mirror of
https://github.com/openhab/openhab-addons.git
synced 2025-01-25 14:55:55 +01:00
[influxdb] Write asynchronously to database and improve performance (#14888)
* [influxdb] Write asynchronously to database --------- Signed-off-by: Jan N. Klug <github@klug.nrw>
This commit is contained in:
parent
990700de8d
commit
6009c13966
@ -12,19 +12,34 @@
|
||||
*/
|
||||
package org.openhab.persistence.influxdb;
|
||||
|
||||
import static org.openhab.persistence.influxdb.internal.InfluxDBConstants.*;
|
||||
|
||||
import java.time.Instant;
|
||||
import java.time.ZoneId;
|
||||
import java.time.ZonedDateTime;
|
||||
import java.util.ArrayList;
|
||||
import java.util.HashMap;
|
||||
import java.util.HashSet;
|
||||
import java.util.List;
|
||||
import java.util.Locale;
|
||||
import java.util.Map;
|
||||
import java.util.Objects;
|
||||
import java.util.Set;
|
||||
import java.util.concurrent.BlockingQueue;
|
||||
import java.util.concurrent.CompletableFuture;
|
||||
import java.util.concurrent.LinkedBlockingQueue;
|
||||
import java.util.concurrent.ScheduledFuture;
|
||||
import java.util.concurrent.TimeUnit;
|
||||
import java.util.stream.Collectors;
|
||||
|
||||
import org.eclipse.jdt.annotation.NonNullByDefault;
|
||||
import org.eclipse.jdt.annotation.Nullable;
|
||||
import org.openhab.core.common.ThreadPoolManager;
|
||||
import org.openhab.core.config.core.ConfigurableService;
|
||||
import org.openhab.core.items.Item;
|
||||
import org.openhab.core.items.ItemFactory;
|
||||
import org.openhab.core.items.ItemRegistry;
|
||||
import org.openhab.core.items.ItemUtil;
|
||||
import org.openhab.core.persistence.FilterCriteria;
|
||||
import org.openhab.core.persistence.HistoricItem;
|
||||
import org.openhab.core.persistence.PersistenceItemInfo;
|
||||
@ -32,6 +47,7 @@ import org.openhab.core.persistence.PersistenceService;
|
||||
import org.openhab.core.persistence.QueryablePersistenceService;
|
||||
import org.openhab.core.persistence.strategy.PersistenceStrategy;
|
||||
import org.openhab.core.types.State;
|
||||
import org.openhab.core.types.UnDefType;
|
||||
import org.openhab.persistence.influxdb.internal.FilterCriteriaQueryCreator;
|
||||
import org.openhab.persistence.influxdb.internal.InfluxDBConfiguration;
|
||||
import org.openhab.persistence.influxdb.internal.InfluxDBHistoricItem;
|
||||
@ -40,8 +56,6 @@ import org.openhab.persistence.influxdb.internal.InfluxDBPersistentItemInfo;
|
||||
import org.openhab.persistence.influxdb.internal.InfluxDBRepository;
|
||||
import org.openhab.persistence.influxdb.internal.InfluxDBStateConvertUtils;
|
||||
import org.openhab.persistence.influxdb.internal.InfluxPoint;
|
||||
import org.openhab.persistence.influxdb.internal.ItemToStorePointCreator;
|
||||
import org.openhab.persistence.influxdb.internal.UnexpectedConditionException;
|
||||
import org.openhab.persistence.influxdb.internal.influx1.InfluxDB1RepositoryImpl;
|
||||
import org.openhab.persistence.influxdb.internal.influx2.InfluxDB2RepositoryImpl;
|
||||
import org.osgi.framework.Constants;
|
||||
@ -49,6 +63,8 @@ import org.osgi.service.component.annotations.Activate;
|
||||
import org.osgi.service.component.annotations.Component;
|
||||
import org.osgi.service.component.annotations.Deactivate;
|
||||
import org.osgi.service.component.annotations.Reference;
|
||||
import org.osgi.service.component.annotations.ReferenceCardinality;
|
||||
import org.osgi.service.component.annotations.ReferencePolicy;
|
||||
import org.slf4j.Logger;
|
||||
import org.slf4j.LoggerFactory;
|
||||
|
||||
@ -81,6 +97,7 @@ public class InfluxDBPersistenceService implements QueryablePersistenceService {
|
||||
|
||||
private final Logger logger = LoggerFactory.getLogger(InfluxDBPersistenceService.class);
|
||||
|
||||
private static final int COMMIT_INTERVAL = 3; // in s
|
||||
protected static final String CONFIG_URI = "persistence:influxdb";
|
||||
|
||||
// External dependencies
|
||||
@ -88,9 +105,16 @@ public class InfluxDBPersistenceService implements QueryablePersistenceService {
|
||||
private final InfluxDBMetadataService influxDBMetadataService;
|
||||
|
||||
private final InfluxDBConfiguration configuration;
|
||||
private final ItemToStorePointCreator itemToStorePointCreator;
|
||||
private final InfluxDBRepository influxDBRepository;
|
||||
private boolean tryReconnection;
|
||||
private boolean serviceActivated;
|
||||
|
||||
// storage
|
||||
private final ScheduledFuture<?> storeJob;
|
||||
private final BlockingQueue<InfluxPoint> pointsQueue = new LinkedBlockingQueue<>();
|
||||
|
||||
// conversion
|
||||
private final Set<ItemFactory> itemFactories = new HashSet<>();
|
||||
private Map<String, Class<? extends State>> desiredClasses = new HashMap<>();
|
||||
|
||||
@Activate
|
||||
public InfluxDBPersistenceService(final @Reference ItemRegistry itemRegistry,
|
||||
@ -101,8 +125,9 @@ public class InfluxDBPersistenceService implements QueryablePersistenceService {
|
||||
if (configuration.isValid()) {
|
||||
this.influxDBRepository = createInfluxDBRepository();
|
||||
this.influxDBRepository.connect();
|
||||
this.itemToStorePointCreator = new ItemToStorePointCreator(configuration, influxDBMetadataService);
|
||||
tryReconnection = true;
|
||||
this.storeJob = ThreadPoolManager.getScheduledPool("org.openhab.influxdb")
|
||||
.scheduleWithFixedDelay(this::commit, COMMIT_INTERVAL, COMMIT_INTERVAL, TimeUnit.SECONDS);
|
||||
serviceActivated = true;
|
||||
} else {
|
||||
throw new IllegalArgumentException("Configuration invalid.");
|
||||
}
|
||||
@ -124,7 +149,15 @@ public class InfluxDBPersistenceService implements QueryablePersistenceService {
|
||||
*/
|
||||
@Deactivate
|
||||
public void deactivate() {
|
||||
tryReconnection = false;
|
||||
serviceActivated = false;
|
||||
|
||||
storeJob.cancel(false);
|
||||
commit(); // ensure we at least tried to store the data;
|
||||
|
||||
if (!pointsQueue.isEmpty()) {
|
||||
logger.warn("InfluxDB failed to finally store {} points.", pointsQueue.size());
|
||||
}
|
||||
|
||||
influxDBRepository.disconnect();
|
||||
logger.info("InfluxDB persistence service stopped.");
|
||||
}
|
||||
@ -157,26 +190,26 @@ public class InfluxDBPersistenceService implements QueryablePersistenceService {
|
||||
|
||||
@Override
|
||||
public void store(Item item, @Nullable String alias) {
|
||||
if (checkConnection()) {
|
||||
InfluxPoint point = itemToStorePointCreator.convert(item, alias);
|
||||
if (point != null) {
|
||||
try {
|
||||
influxDBRepository.write(point);
|
||||
logger.trace("Stored item {} in InfluxDB point {}", item, point);
|
||||
} catch (UnexpectedConditionException e) {
|
||||
logger.warn("Failed to store item {} in InfluxDB point {}", point, item);
|
||||
}
|
||||
} else {
|
||||
logger.trace("Ignoring item {}, conversion to an InfluxDB point failed.", item);
|
||||
}
|
||||
} else {
|
||||
logger.debug("store ignored, InfluxDB is not connected");
|
||||
if (!serviceActivated) {
|
||||
logger.warn("InfluxDB service not ready. Storing {} rejected.", item);
|
||||
return;
|
||||
}
|
||||
convert(item, alias).thenAccept(point -> {
|
||||
if (point == null) {
|
||||
logger.trace("Ignoring item {}, conversion to an InfluxDB point failed.", item.getName());
|
||||
return;
|
||||
}
|
||||
if (pointsQueue.offer(point)) {
|
||||
logger.trace("Queued {} for item {}", point, item);
|
||||
} else {
|
||||
logger.warn("Failed to queue {} for item {}", point, item);
|
||||
}
|
||||
});
|
||||
}
|
||||
|
||||
@Override
|
||||
public Iterable<HistoricItem> query(FilterCriteria filter) {
|
||||
if (checkConnection()) {
|
||||
if (serviceActivated && checkConnection()) {
|
||||
logger.trace(
|
||||
"Query-Filter: itemname: {}, ordering: {}, state: {}, operator: {}, getBeginDate: {}, getEndDate: {}, getPageSize: {}, getPageNumber: {}",
|
||||
filter.getItemName(), filter.getOrdering().toString(), filter.getState(), filter.getOperator(),
|
||||
@ -211,10 +244,109 @@ public class InfluxDBPersistenceService implements QueryablePersistenceService {
|
||||
private boolean checkConnection() {
|
||||
if (influxDBRepository.isConnected()) {
|
||||
return true;
|
||||
} else if (tryReconnection) {
|
||||
} else if (serviceActivated) {
|
||||
logger.debug("Connection lost, trying re-connection");
|
||||
return influxDBRepository.connect();
|
||||
}
|
||||
return false;
|
||||
}
|
||||
|
||||
private void commit() {
|
||||
if (!pointsQueue.isEmpty() && checkConnection()) {
|
||||
List<InfluxPoint> points = new ArrayList<>();
|
||||
pointsQueue.drainTo(points);
|
||||
if (!influxDBRepository.write(points)) {
|
||||
logger.warn("Re-queuing {} elements, failed to write batch.", points.size());
|
||||
pointsQueue.addAll(points);
|
||||
} else {
|
||||
logger.trace("Wrote {} elements to database", points.size());
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* Convert incoming data to an {@link InfluxPoint} for further processing. This is needed because storage is
|
||||
* asynchronous and the item data may have changed.
|
||||
* <p />
|
||||
* The method is package-private for testing.
|
||||
*
|
||||
* @param item the {@link Item} that needs conversion
|
||||
* @param storeAlias an (optional) alias for the item
|
||||
* @return a {@link CompletableFuture} that contains either <code>null</code> for item states that cannot be
|
||||
* converted or the corresponding {@link InfluxPoint}
|
||||
*/
|
||||
CompletableFuture<@Nullable InfluxPoint> convert(Item item, @Nullable String storeAlias) {
|
||||
String itemName = item.getName();
|
||||
String itemLabel = item.getLabel();
|
||||
String category = item.getCategory();
|
||||
State state = item.getState();
|
||||
String itemType = item.getType();
|
||||
Instant timeStamp = Instant.now();
|
||||
|
||||
if (state instanceof UnDefType) {
|
||||
return CompletableFuture.completedFuture(null);
|
||||
}
|
||||
|
||||
return CompletableFuture.supplyAsync(() -> {
|
||||
String measurementName = storeAlias != null && !storeAlias.isBlank() ? storeAlias : itemName;
|
||||
measurementName = influxDBMetadataService.getMeasurementNameOrDefault(itemName, measurementName);
|
||||
|
||||
if (configuration.isReplaceUnderscore()) {
|
||||
measurementName = measurementName.replace('_', '.');
|
||||
}
|
||||
|
||||
State storeState = Objects
|
||||
.requireNonNullElse(state.as(desiredClasses.get(ItemUtil.getMainItemType(itemType))), state);
|
||||
Object value = InfluxDBStateConvertUtils.stateToObject(storeState);
|
||||
|
||||
InfluxPoint.Builder pointBuilder = InfluxPoint.newBuilder(measurementName).withTime(timeStamp)
|
||||
.withValue(value).withTag(TAG_ITEM_NAME, itemName);
|
||||
|
||||
if (configuration.isAddCategoryTag()) {
|
||||
String categoryName = Objects.requireNonNullElse(category, "n/a");
|
||||
pointBuilder.withTag(TAG_CATEGORY_NAME, categoryName);
|
||||
}
|
||||
|
||||
if (configuration.isAddTypeTag()) {
|
||||
pointBuilder.withTag(TAG_TYPE_NAME, itemType);
|
||||
}
|
||||
|
||||
if (configuration.isAddLabelTag()) {
|
||||
String labelName = Objects.requireNonNullElse(itemLabel, "n/a");
|
||||
pointBuilder.withTag(TAG_LABEL_NAME, labelName);
|
||||
}
|
||||
|
||||
influxDBMetadataService.getMetaData(itemName)
|
||||
.ifPresent(metadata -> metadata.getConfiguration().forEach(pointBuilder::withTag));
|
||||
|
||||
return pointBuilder.build();
|
||||
});
|
||||
}
|
||||
|
||||
@Reference(cardinality = ReferenceCardinality.AT_LEAST_ONE, policy = ReferencePolicy.DYNAMIC)
|
||||
public void setItemFactory(ItemFactory itemFactory) {
|
||||
itemFactories.add(itemFactory);
|
||||
calculateItemTypeClasses();
|
||||
}
|
||||
|
||||
public void unsetItemFactory(ItemFactory itemFactory) {
|
||||
itemFactories.remove(itemFactory);
|
||||
calculateItemTypeClasses();
|
||||
}
|
||||
|
||||
private synchronized void calculateItemTypeClasses() {
|
||||
Map<String, Class<? extends State>> desiredClasses = new HashMap<>();
|
||||
itemFactories.forEach(factory -> {
|
||||
for (String itemType : factory.getSupportedItemTypes()) {
|
||||
Item item = factory.createItem(itemType, "influxItem");
|
||||
if (item != null) {
|
||||
item.getAcceptedCommandTypes().stream()
|
||||
.filter(commandType -> commandType.isAssignableFrom(State.class)).findFirst()
|
||||
.map(commandType -> (Class<? extends State>) commandType.asSubclass(State.class))
|
||||
.ifPresent(desiredClass -> desiredClasses.put(itemType, desiredClass));
|
||||
}
|
||||
}
|
||||
});
|
||||
this.desiredClasses = desiredClasses;
|
||||
}
|
||||
}
|
||||
|
@ -68,12 +68,12 @@ public interface InfluxDBRepository {
|
||||
List<InfluxRow> query(String query);
|
||||
|
||||
/**
|
||||
* Write point to database
|
||||
* Write points to database
|
||||
*
|
||||
* @param influxPoint Point to write
|
||||
* @throws UnexpectedConditionException when an error occurs
|
||||
* @param influxPoints {@link List<InfluxPoint>} to write
|
||||
* @returns <code>true</code> if points have been written, <code>false</code> otherwise
|
||||
*/
|
||||
void write(InfluxPoint influxPoint) throws UnexpectedConditionException;
|
||||
boolean write(List<InfluxPoint> influxPoints);
|
||||
|
||||
/**
|
||||
* create a query creator on this repository
|
||||
|
@ -1,105 +0,0 @@
|
||||
/**
|
||||
* Copyright (c) 2010-2023 Contributors to the openHAB project
|
||||
*
|
||||
* See the NOTICE file(s) distributed with this work for additional
|
||||
* information.
|
||||
*
|
||||
* This program and the accompanying materials are made available under the
|
||||
* terms of the Eclipse Public License 2.0 which is available at
|
||||
* http://www.eclipse.org/legal/epl-2.0
|
||||
*
|
||||
* SPDX-License-Identifier: EPL-2.0
|
||||
*/
|
||||
package org.openhab.persistence.influxdb.internal;
|
||||
|
||||
import static org.openhab.persistence.influxdb.internal.InfluxDBConstants.TAG_CATEGORY_NAME;
|
||||
import static org.openhab.persistence.influxdb.internal.InfluxDBConstants.TAG_ITEM_NAME;
|
||||
import static org.openhab.persistence.influxdb.internal.InfluxDBConstants.TAG_LABEL_NAME;
|
||||
import static org.openhab.persistence.influxdb.internal.InfluxDBConstants.TAG_TYPE_NAME;
|
||||
|
||||
import java.time.Instant;
|
||||
import java.util.Objects;
|
||||
import java.util.Optional;
|
||||
|
||||
import org.eclipse.jdt.annotation.NonNullByDefault;
|
||||
import org.eclipse.jdt.annotation.Nullable;
|
||||
import org.openhab.core.items.Item;
|
||||
import org.openhab.core.types.State;
|
||||
import org.openhab.core.types.UnDefType;
|
||||
|
||||
/**
|
||||
* Logic to create an InfluxDB {@link InfluxPoint} from an openHAB {@link Item}
|
||||
*
|
||||
* @author Joan Pujol Espinar - Initial contribution
|
||||
*/
|
||||
@NonNullByDefault
|
||||
public class ItemToStorePointCreator {
|
||||
private final InfluxDBConfiguration configuration;
|
||||
private final InfluxDBMetadataService influxDBMetadataService;
|
||||
|
||||
public ItemToStorePointCreator(InfluxDBConfiguration configuration,
|
||||
InfluxDBMetadataService influxDBMetadataService) {
|
||||
this.configuration = configuration;
|
||||
this.influxDBMetadataService = influxDBMetadataService;
|
||||
}
|
||||
|
||||
public @Nullable InfluxPoint convert(Item item, @Nullable String storeAlias) {
|
||||
if (item.getState() instanceof UnDefType) {
|
||||
return null;
|
||||
}
|
||||
|
||||
String measurementName = calculateMeasurementName(item, storeAlias);
|
||||
String itemName = item.getName();
|
||||
State state = getItemState(item);
|
||||
|
||||
Object value = InfluxDBStateConvertUtils.stateToObject(state);
|
||||
|
||||
InfluxPoint.Builder pointBuilder = InfluxPoint.newBuilder(measurementName).withTime(Instant.now())
|
||||
.withValue(value).withTag(TAG_ITEM_NAME, itemName);
|
||||
|
||||
addPointTags(item, pointBuilder);
|
||||
|
||||
return pointBuilder.build();
|
||||
}
|
||||
|
||||
private String calculateMeasurementName(Item item, @Nullable String storeAlias) {
|
||||
String name = storeAlias != null && !storeAlias.isBlank() ? storeAlias : item.getName();
|
||||
name = influxDBMetadataService.getMeasurementNameOrDefault(item.getName(), name);
|
||||
|
||||
if (configuration.isReplaceUnderscore()) {
|
||||
name = name.replace('_', '.');
|
||||
}
|
||||
|
||||
return name;
|
||||
}
|
||||
|
||||
private State getItemState(Item item) {
|
||||
return calculateDesiredTypeConversionToStore(item)
|
||||
.map(desiredClass -> Objects.requireNonNullElseGet(item.getStateAs(desiredClass), item::getState))
|
||||
.orElseGet(item::getState);
|
||||
}
|
||||
|
||||
private Optional<Class<? extends State>> calculateDesiredTypeConversionToStore(Item item) {
|
||||
return item.getAcceptedCommandTypes().stream().filter(commandType -> commandType.isAssignableFrom(State.class))
|
||||
.findFirst().map(commandType -> commandType.asSubclass(State.class));
|
||||
}
|
||||
|
||||
private void addPointTags(Item item, InfluxPoint.Builder pointBuilder) {
|
||||
if (configuration.isAddCategoryTag()) {
|
||||
String categoryName = Objects.requireNonNullElse(item.getCategory(), "n/a");
|
||||
pointBuilder.withTag(TAG_CATEGORY_NAME, categoryName);
|
||||
}
|
||||
|
||||
if (configuration.isAddTypeTag()) {
|
||||
pointBuilder.withTag(TAG_TYPE_NAME, item.getType());
|
||||
}
|
||||
|
||||
if (configuration.isAddLabelTag()) {
|
||||
String labelName = Objects.requireNonNullElse(item.getLabel(), "n/a");
|
||||
pointBuilder.withTag(TAG_LABEL_NAME, labelName);
|
||||
}
|
||||
|
||||
influxDBMetadataService.getMetaData(item.getName())
|
||||
.ifPresent(metadata -> metadata.getConfiguration().forEach(pointBuilder::withTag));
|
||||
}
|
||||
}
|
@ -23,12 +23,14 @@ import java.util.Collections;
|
||||
import java.util.List;
|
||||
import java.util.Map;
|
||||
import java.util.Objects;
|
||||
import java.util.Optional;
|
||||
import java.util.concurrent.TimeUnit;
|
||||
|
||||
import org.eclipse.jdt.annotation.NonNullByDefault;
|
||||
import org.eclipse.jdt.annotation.Nullable;
|
||||
import org.influxdb.InfluxDB;
|
||||
import org.influxdb.InfluxDBFactory;
|
||||
import org.influxdb.dto.BatchPoints;
|
||||
import org.influxdb.dto.Point;
|
||||
import org.influxdb.dto.Pong;
|
||||
import org.influxdb.dto.Query;
|
||||
@ -38,10 +40,11 @@ import org.openhab.persistence.influxdb.internal.InfluxDBConfiguration;
|
||||
import org.openhab.persistence.influxdb.internal.InfluxDBMetadataService;
|
||||
import org.openhab.persistence.influxdb.internal.InfluxDBRepository;
|
||||
import org.openhab.persistence.influxdb.internal.InfluxPoint;
|
||||
import org.openhab.persistence.influxdb.internal.UnexpectedConditionException;
|
||||
import org.slf4j.Logger;
|
||||
import org.slf4j.LoggerFactory;
|
||||
|
||||
import com.influxdb.exceptions.InfluxException;
|
||||
|
||||
/**
|
||||
* Implementation of {@link InfluxDBRepository} for InfluxDB 1.0
|
||||
*
|
||||
@ -113,17 +116,25 @@ public class InfluxDB1RepositoryImpl implements InfluxDBRepository {
|
||||
}
|
||||
|
||||
@Override
|
||||
public void write(InfluxPoint point) throws UnexpectedConditionException {
|
||||
public boolean write(List<InfluxPoint> influxPoints) {
|
||||
final InfluxDB currentClient = this.client;
|
||||
if (currentClient != null) {
|
||||
Point clientPoint = convertPointToClientFormat(point);
|
||||
currentClient.write(configuration.getDatabaseName(), configuration.getRetentionPolicy(), clientPoint);
|
||||
} else {
|
||||
logger.warn("Write point {} ignored due to client isn't connected", point);
|
||||
if (currentClient == null) {
|
||||
return false;
|
||||
}
|
||||
try {
|
||||
List<Point> points = influxPoints.stream().map(this::convertPointToClientFormat).filter(Optional::isPresent)
|
||||
.map(Optional::get).toList();
|
||||
BatchPoints batchPoints = BatchPoints.database(configuration.getDatabaseName())
|
||||
.retentionPolicy(configuration.getRetentionPolicy()).points(points).build();
|
||||
currentClient.write(batchPoints);
|
||||
} catch (InfluxException e) {
|
||||
logger.debug("Writing to database failed", e);
|
||||
return false;
|
||||
}
|
||||
return true;
|
||||
}
|
||||
|
||||
private Point convertPointToClientFormat(InfluxPoint point) throws UnexpectedConditionException {
|
||||
private Optional<Point> convertPointToClientFormat(InfluxPoint point) {
|
||||
Point.Builder clientPoint = Point.measurement(point.getMeasurementName()).time(point.getTime().toEpochMilli(),
|
||||
TimeUnit.MILLISECONDS);
|
||||
Object value = point.getValue();
|
||||
@ -136,10 +147,11 @@ public class InfluxDB1RepositoryImpl implements InfluxDBRepository {
|
||||
} else if (value == null) {
|
||||
clientPoint.addField(FIELD_VALUE_NAME, "null");
|
||||
} else {
|
||||
throw new UnexpectedConditionException("Not expected value type");
|
||||
logger.warn("Could not convert {}, discarding this datapoint", point);
|
||||
return Optional.empty();
|
||||
}
|
||||
point.getTags().forEach(clientPoint::tag);
|
||||
return clientPoint.build();
|
||||
return Optional.of(clientPoint.build());
|
||||
}
|
||||
|
||||
@Override
|
||||
|
@ -20,6 +20,7 @@ import java.util.LinkedHashMap;
|
||||
import java.util.List;
|
||||
import java.util.Map;
|
||||
import java.util.Objects;
|
||||
import java.util.Optional;
|
||||
import java.util.stream.Stream;
|
||||
|
||||
import org.eclipse.jdt.annotation.NonNullByDefault;
|
||||
@ -30,7 +31,6 @@ import org.openhab.persistence.influxdb.internal.InfluxDBConstants;
|
||||
import org.openhab.persistence.influxdb.internal.InfluxDBMetadataService;
|
||||
import org.openhab.persistence.influxdb.internal.InfluxDBRepository;
|
||||
import org.openhab.persistence.influxdb.internal.InfluxPoint;
|
||||
import org.openhab.persistence.influxdb.internal.UnexpectedConditionException;
|
||||
import org.slf4j.Logger;
|
||||
import org.slf4j.LoggerFactory;
|
||||
|
||||
@ -42,6 +42,7 @@ import com.influxdb.client.WriteApi;
|
||||
import com.influxdb.client.domain.Ready;
|
||||
import com.influxdb.client.domain.WritePrecision;
|
||||
import com.influxdb.client.write.Point;
|
||||
import com.influxdb.exceptions.InfluxException;
|
||||
import com.influxdb.query.FluxTable;
|
||||
|
||||
/**
|
||||
@ -120,34 +121,40 @@ public class InfluxDB2RepositoryImpl implements InfluxDBRepository {
|
||||
}
|
||||
|
||||
@Override
|
||||
public void write(InfluxPoint point) throws UnexpectedConditionException {
|
||||
public boolean write(List<InfluxPoint> influxPoints) {
|
||||
final WriteApi currentWriteAPI = writeAPI;
|
||||
if (currentWriteAPI != null) {
|
||||
currentWriteAPI.writePoint(convertPointToClientFormat(point));
|
||||
} else {
|
||||
logger.warn("Write point {} ignored due to writeAPI isn't present", point);
|
||||
if (currentWriteAPI == null) {
|
||||
return false;
|
||||
}
|
||||
try {
|
||||
List<Point> clientPoints = influxPoints.stream().map(this::convertPointToClientFormat)
|
||||
.filter(Optional::isPresent).map(Optional::get).toList();
|
||||
currentWriteAPI.writePoints(clientPoints);
|
||||
} catch (InfluxException e) {
|
||||
logger.debug("Writing to database failed", e);
|
||||
return false;
|
||||
}
|
||||
return true;
|
||||
}
|
||||
|
||||
private Point convertPointToClientFormat(InfluxPoint point) throws UnexpectedConditionException {
|
||||
private Optional<Point> convertPointToClientFormat(InfluxPoint point) {
|
||||
Point clientPoint = Point.measurement(point.getMeasurementName()).time(point.getTime(), WritePrecision.MS);
|
||||
setPointValue(point.getValue(), clientPoint);
|
||||
point.getTags().forEach(clientPoint::addTag);
|
||||
return clientPoint;
|
||||
}
|
||||
|
||||
private void setPointValue(@Nullable Object value, Point point) throws UnexpectedConditionException {
|
||||
@Nullable
|
||||
Object value = point.getValue();
|
||||
if (value instanceof String) {
|
||||
point.addField(FIELD_VALUE_NAME, (String) value);
|
||||
clientPoint.addField(FIELD_VALUE_NAME, (String) value);
|
||||
} else if (value instanceof Number) {
|
||||
point.addField(FIELD_VALUE_NAME, (Number) value);
|
||||
clientPoint.addField(FIELD_VALUE_NAME, (Number) value);
|
||||
} else if (value instanceof Boolean) {
|
||||
point.addField(FIELD_VALUE_NAME, (Boolean) value);
|
||||
clientPoint.addField(FIELD_VALUE_NAME, (Boolean) value);
|
||||
} else if (value == null) {
|
||||
point.addField(FIELD_VALUE_NAME, (String) null);
|
||||
clientPoint.addField(FIELD_VALUE_NAME, (String) null);
|
||||
} else {
|
||||
throw new UnexpectedConditionException("Not expected value type");
|
||||
logger.warn("Could not convert {}, discarding this datapoint)", clientPoint);
|
||||
return Optional.empty();
|
||||
}
|
||||
point.getTags().forEach(clientPoint::addTag);
|
||||
return Optional.of(clientPoint);
|
||||
}
|
||||
|
||||
@Override
|
||||
|
@ -10,7 +10,7 @@
|
||||
*
|
||||
* SPDX-License-Identifier: EPL-2.0
|
||||
*/
|
||||
package org.openhab.persistence.influxdb.internal;
|
||||
package org.openhab.persistence.influxdb;
|
||||
|
||||
import static org.junit.jupiter.api.Assertions.assertThrows;
|
||||
import static org.mockito.ArgumentMatchers.any;
|
||||
@ -30,14 +30,21 @@ import org.junit.jupiter.api.Test;
|
||||
import org.junit.jupiter.api.extension.ExtendWith;
|
||||
import org.mockito.Mock;
|
||||
import org.mockito.junit.jupiter.MockitoExtension;
|
||||
import org.mockito.junit.jupiter.MockitoSettings;
|
||||
import org.mockito.quality.Strictness;
|
||||
import org.openhab.core.items.ItemRegistry;
|
||||
import org.openhab.core.items.MetadataRegistry;
|
||||
import org.openhab.persistence.influxdb.InfluxDBPersistenceService;
|
||||
import org.openhab.persistence.influxdb.internal.InfluxDBMetadataService;
|
||||
import org.openhab.persistence.influxdb.internal.InfluxDBRepository;
|
||||
import org.openhab.persistence.influxdb.internal.InfluxDBVersion;
|
||||
import org.openhab.persistence.influxdb.internal.ItemTestHelper;
|
||||
import org.openhab.persistence.influxdb.internal.UnexpectedConditionException;
|
||||
|
||||
/**
|
||||
* @author Joan Pujol Espinar - Initial contribution
|
||||
*/
|
||||
@ExtendWith(MockitoExtension.class)
|
||||
@MockitoSettings(strictness = Strictness.LENIENT)
|
||||
@NonNullByDefault
|
||||
public class InfluxDBPersistenceServiceTest {
|
||||
private static final Map<String, Object> VALID_V1_CONFIGURATION = Map.of( //
|
||||
@ -106,7 +113,7 @@ public class InfluxDBPersistenceServiceTest {
|
||||
InfluxDBPersistenceService instance = getService(VALID_V2_CONFIGURATION);
|
||||
when(influxDBRepositoryMock.isConnected()).thenReturn(true);
|
||||
instance.store(ItemTestHelper.createNumberItem("number", 5));
|
||||
verify(influxDBRepositoryMock).write(any());
|
||||
verify(influxDBRepositoryMock, timeout(5000)).write(any());
|
||||
}
|
||||
|
||||
@Test
|
@ -10,19 +10,21 @@
|
||||
*
|
||||
* SPDX-License-Identifier: EPL-2.0
|
||||
*/
|
||||
package org.openhab.persistence.influxdb.internal;
|
||||
package org.openhab.persistence.influxdb;
|
||||
|
||||
import static org.hamcrest.MatcherAssert.assertThat;
|
||||
import static org.hamcrest.Matchers.*;
|
||||
import static org.mockito.Mockito.when;
|
||||
import static org.openhab.persistence.influxdb.internal.InfluxDBConfiguration.*;
|
||||
|
||||
import java.math.BigDecimal;
|
||||
import java.util.HashMap;
|
||||
import java.util.Map;
|
||||
import java.util.concurrent.ExecutionException;
|
||||
import java.util.stream.Stream;
|
||||
|
||||
import org.eclipse.jdt.annotation.DefaultLocation;
|
||||
import org.eclipse.jdt.annotation.NonNullByDefault;
|
||||
import org.junit.jupiter.api.AfterEach;
|
||||
import org.junit.jupiter.api.Assertions;
|
||||
import org.junit.jupiter.api.BeforeEach;
|
||||
import org.junit.jupiter.api.Test;
|
||||
@ -31,11 +33,17 @@ import org.junit.jupiter.params.ParameterizedTest;
|
||||
import org.junit.jupiter.params.provider.MethodSource;
|
||||
import org.mockito.Mock;
|
||||
import org.mockito.junit.jupiter.MockitoExtension;
|
||||
import org.openhab.core.items.ItemRegistry;
|
||||
import org.openhab.core.items.Metadata;
|
||||
import org.openhab.core.items.MetadataKey;
|
||||
import org.openhab.core.items.MetadataRegistry;
|
||||
import org.openhab.core.library.CoreItemFactory;
|
||||
import org.openhab.core.library.items.NumberItem;
|
||||
import org.openhab.persistence.influxdb.InfluxDBPersistenceService;
|
||||
import org.openhab.persistence.influxdb.internal.InfluxDBConstants;
|
||||
import org.openhab.persistence.influxdb.internal.InfluxDBMetadataService;
|
||||
import org.openhab.persistence.influxdb.internal.InfluxDBVersion;
|
||||
import org.openhab.persistence.influxdb.internal.InfluxPoint;
|
||||
import org.openhab.persistence.influxdb.internal.ItemTestHelper;
|
||||
|
||||
/**
|
||||
* @author Joan Pujol Espinar - Initial contribution
|
||||
@ -44,33 +52,27 @@ import org.openhab.persistence.influxdb.InfluxDBPersistenceService;
|
||||
@NonNullByDefault(value = { DefaultLocation.PARAMETER, DefaultLocation.RETURN_TYPE })
|
||||
public class ItemToStorePointCreatorTest {
|
||||
|
||||
private @Mock InfluxDBConfiguration influxDBConfiguration;
|
||||
private static final Map<String, Object> BASE_CONFIGURATION = Map.of( //
|
||||
URL_PARAM, "http://localhost:8086", //
|
||||
VERSION_PARAM, InfluxDBVersion.V1.name(), //
|
||||
USER_PARAM, "user", PASSWORD_PARAM, "password", //
|
||||
DATABASE_PARAM, "openhab", //
|
||||
RETENTION_POLICY_PARAM, "default");
|
||||
|
||||
private @Mock ItemRegistry itemRegistryMock;
|
||||
private @Mock MetadataRegistry metadataRegistry;
|
||||
private ItemToStorePointCreator instance;
|
||||
private InfluxDBPersistenceService instance;
|
||||
|
||||
@BeforeEach
|
||||
public void before() {
|
||||
InfluxDBMetadataService influxDBMetadataService = new InfluxDBMetadataService(metadataRegistry);
|
||||
when(influxDBConfiguration.isAddCategoryTag()).thenReturn(false);
|
||||
when(influxDBConfiguration.isAddLabelTag()).thenReturn(false);
|
||||
when(influxDBConfiguration.isAddTypeTag()).thenReturn(false);
|
||||
when(influxDBConfiguration.isReplaceUnderscore()).thenReturn(false);
|
||||
|
||||
instance = new ItemToStorePointCreator(influxDBConfiguration, influxDBMetadataService);
|
||||
}
|
||||
|
||||
@AfterEach
|
||||
public void after() {
|
||||
instance = null;
|
||||
influxDBConfiguration = null;
|
||||
metadataRegistry = null;
|
||||
public void setup() {
|
||||
instance = getService(false, false, false, false);
|
||||
}
|
||||
|
||||
@ParameterizedTest
|
||||
@MethodSource
|
||||
public void convertBasicItem(Number number) {
|
||||
public void convertBasicItem(Number number) throws ExecutionException, InterruptedException {
|
||||
NumberItem item = ItemTestHelper.createNumberItem("myitem", number);
|
||||
InfluxPoint point = instance.convert(item, null);
|
||||
InfluxPoint point = instance.convert(item, null).get();
|
||||
|
||||
if (point == null) {
|
||||
Assertions.fail("'point' is null");
|
||||
@ -88,9 +90,9 @@ public class ItemToStorePointCreatorTest {
|
||||
}
|
||||
|
||||
@Test
|
||||
public void shouldUseAliasAsMeasurementNameIfProvided() {
|
||||
public void shouldUseAliasAsMeasurementNameIfProvided() throws ExecutionException, InterruptedException {
|
||||
NumberItem item = ItemTestHelper.createNumberItem("myitem", 5);
|
||||
InfluxPoint point = instance.convert(item, "aliasName");
|
||||
InfluxPoint point = instance.convert(item, "aliasName").get();
|
||||
|
||||
if (point == null) {
|
||||
Assertions.fail("'point' is null");
|
||||
@ -101,12 +103,12 @@ public class ItemToStorePointCreatorTest {
|
||||
}
|
||||
|
||||
@Test
|
||||
public void shouldStoreCategoryTagIfProvidedAndConfigured() {
|
||||
public void shouldStoreCategoryTagIfProvidedAndConfigured() throws ExecutionException, InterruptedException {
|
||||
NumberItem item = ItemTestHelper.createNumberItem("myitem", 5);
|
||||
item.setCategory("categoryValue");
|
||||
|
||||
when(influxDBConfiguration.isAddCategoryTag()).thenReturn(true);
|
||||
InfluxPoint point = instance.convert(item, null);
|
||||
instance = getService(false, true, false, false);
|
||||
InfluxPoint point = instance.convert(item, null).get();
|
||||
|
||||
if (point == null) {
|
||||
Assertions.fail("'point' is null");
|
||||
@ -115,8 +117,8 @@ public class ItemToStorePointCreatorTest {
|
||||
|
||||
assertThat(point.getTags(), hasEntry(InfluxDBConstants.TAG_CATEGORY_NAME, "categoryValue"));
|
||||
|
||||
when(influxDBConfiguration.isAddCategoryTag()).thenReturn(false);
|
||||
point = instance.convert(item, null);
|
||||
instance = getService(false, false, false, false);
|
||||
point = instance.convert(item, null).get();
|
||||
|
||||
if (point == null) {
|
||||
Assertions.fail("'point' is null");
|
||||
@ -127,11 +129,11 @@ public class ItemToStorePointCreatorTest {
|
||||
}
|
||||
|
||||
@Test
|
||||
public void shouldStoreTypeTagIfProvidedAndConfigured() {
|
||||
public void shouldStoreTypeTagIfProvidedAndConfigured() throws ExecutionException, InterruptedException {
|
||||
NumberItem item = ItemTestHelper.createNumberItem("myitem", 5);
|
||||
|
||||
when(influxDBConfiguration.isAddTypeTag()).thenReturn(true);
|
||||
InfluxPoint point = instance.convert(item, null);
|
||||
instance = getService(false, false, false, true);
|
||||
InfluxPoint point = instance.convert(item, null).get();
|
||||
|
||||
if (point == null) {
|
||||
Assertions.fail("'point' is null");
|
||||
@ -140,8 +142,8 @@ public class ItemToStorePointCreatorTest {
|
||||
|
||||
assertThat(point.getTags(), hasEntry(InfluxDBConstants.TAG_TYPE_NAME, "Number"));
|
||||
|
||||
when(influxDBConfiguration.isAddTypeTag()).thenReturn(false);
|
||||
point = instance.convert(item, null);
|
||||
instance = getService(false, false, false, false);
|
||||
point = instance.convert(item, null).get();
|
||||
|
||||
if (point == null) {
|
||||
Assertions.fail("'point' is null");
|
||||
@ -152,12 +154,12 @@ public class ItemToStorePointCreatorTest {
|
||||
}
|
||||
|
||||
@Test
|
||||
public void shouldStoreTypeLabelIfProvidedAndConfigured() {
|
||||
public void shouldStoreTypeLabelIfProvidedAndConfigured() throws ExecutionException, InterruptedException {
|
||||
NumberItem item = ItemTestHelper.createNumberItem("myitem", 5);
|
||||
item.setLabel("ItemLabel");
|
||||
|
||||
when(influxDBConfiguration.isAddLabelTag()).thenReturn(true);
|
||||
InfluxPoint point = instance.convert(item, null);
|
||||
instance = getService(false, false, true, false);
|
||||
InfluxPoint point = instance.convert(item, null).get();
|
||||
|
||||
if (point == null) {
|
||||
Assertions.fail("'point' is null");
|
||||
@ -166,8 +168,8 @@ public class ItemToStorePointCreatorTest {
|
||||
|
||||
assertThat(point.getTags(), hasEntry(InfluxDBConstants.TAG_LABEL_NAME, "ItemLabel"));
|
||||
|
||||
when(influxDBConfiguration.isAddLabelTag()).thenReturn(false);
|
||||
point = instance.convert(item, null);
|
||||
instance = getService(false, false, false, false);
|
||||
point = instance.convert(item, null).get();
|
||||
|
||||
if (point == null) {
|
||||
Assertions.fail("'point' is null");
|
||||
@ -178,14 +180,14 @@ public class ItemToStorePointCreatorTest {
|
||||
}
|
||||
|
||||
@Test
|
||||
public void shouldStoreMetadataAsTagsIfProvided() {
|
||||
public void shouldStoreMetadataAsTagsIfProvided() throws ExecutionException, InterruptedException {
|
||||
NumberItem item = ItemTestHelper.createNumberItem("myitem", 5);
|
||||
MetadataKey metadataKey = new MetadataKey(InfluxDBPersistenceService.SERVICE_NAME, item.getName());
|
||||
|
||||
when(metadataRegistry.get(metadataKey))
|
||||
.thenReturn(new Metadata(metadataKey, "", Map.of("key1", "val1", "key2", "val2")));
|
||||
|
||||
InfluxPoint point = instance.convert(item, null);
|
||||
InfluxPoint point = instance.convert(item, null).get();
|
||||
|
||||
if (point == null) {
|
||||
Assertions.fail("'point' is null");
|
||||
@ -197,18 +199,18 @@ public class ItemToStorePointCreatorTest {
|
||||
}
|
||||
|
||||
@Test
|
||||
public void shouldUseMeasurementNameFromMetadataIfProvided() {
|
||||
public void shouldUseMeasurementNameFromMetadataIfProvided() throws ExecutionException, InterruptedException {
|
||||
NumberItem item = ItemTestHelper.createNumberItem("myitem", 5);
|
||||
MetadataKey metadataKey = new MetadataKey(InfluxDBPersistenceService.SERVICE_NAME, item.getName());
|
||||
|
||||
InfluxPoint point = instance.convert(item, null);
|
||||
InfluxPoint point = instance.convert(item, null).get();
|
||||
if (point == null) {
|
||||
Assertions.fail();
|
||||
return;
|
||||
}
|
||||
assertThat(point.getMeasurementName(), equalTo(item.getName()));
|
||||
|
||||
point = instance.convert(item, null);
|
||||
point = instance.convert(item, null).get();
|
||||
if (point == null) {
|
||||
Assertions.fail();
|
||||
return;
|
||||
@ -219,7 +221,7 @@ public class ItemToStorePointCreatorTest {
|
||||
when(metadataRegistry.get(metadataKey))
|
||||
.thenReturn(new Metadata(metadataKey, "measurementName", Map.of("key1", "val1", "key2", "val2")));
|
||||
|
||||
point = instance.convert(item, null);
|
||||
point = instance.convert(item, null).get();
|
||||
if (point == null) {
|
||||
Assertions.fail();
|
||||
return;
|
||||
@ -230,7 +232,7 @@ public class ItemToStorePointCreatorTest {
|
||||
when(metadataRegistry.get(metadataKey))
|
||||
.thenReturn(new Metadata(metadataKey, "", Map.of("key1", "val1", "key2", "val2")));
|
||||
|
||||
point = instance.convert(item, null);
|
||||
point = instance.convert(item, null).get();
|
||||
if (point == null) {
|
||||
Assertions.fail();
|
||||
return;
|
||||
@ -238,4 +240,22 @@ public class ItemToStorePointCreatorTest {
|
||||
assertThat(point.getMeasurementName(), equalTo(item.getName()));
|
||||
assertThat(point.getTags(), hasEntry("item", item.getName()));
|
||||
}
|
||||
|
||||
private InfluxDBPersistenceService getService(boolean replaceUnderscore, boolean category, boolean label,
|
||||
boolean typeTag) {
|
||||
InfluxDBMetadataService influxDBMetadataService = new InfluxDBMetadataService(metadataRegistry);
|
||||
|
||||
Map<String, Object> configuration = new HashMap<>();
|
||||
configuration.putAll(BASE_CONFIGURATION);
|
||||
configuration.put(REPLACE_UNDERSCORE_PARAM, replaceUnderscore);
|
||||
configuration.put(ADD_CATEGORY_TAG_PARAM, category);
|
||||
configuration.put(ADD_LABEL_TAG_PARAM, label);
|
||||
configuration.put(ADD_TYPE_TAG_PARAM, typeTag);
|
||||
|
||||
InfluxDBPersistenceService instance = new InfluxDBPersistenceService(itemRegistryMock, influxDBMetadataService,
|
||||
configuration);
|
||||
instance.setItemFactory(new CoreItemFactory());
|
||||
|
||||
return instance;
|
||||
}
|
||||
}
|
Loading…
Reference in New Issue
Block a user