diff --git a/bundles/org.openhab.persistence.influxdb/src/main/java/org/openhab/persistence/influxdb/InfluxDBPersistenceService.java b/bundles/org.openhab.persistence.influxdb/src/main/java/org/openhab/persistence/influxdb/InfluxDBPersistenceService.java index 31dff022172..168bc8e4747 100644 --- a/bundles/org.openhab.persistence.influxdb/src/main/java/org/openhab/persistence/influxdb/InfluxDBPersistenceService.java +++ b/bundles/org.openhab.persistence.influxdb/src/main/java/org/openhab/persistence/influxdb/InfluxDBPersistenceService.java @@ -12,19 +12,34 @@ */ package org.openhab.persistence.influxdb; +import static org.openhab.persistence.influxdb.internal.InfluxDBConstants.*; + +import java.time.Instant; import java.time.ZoneId; import java.time.ZonedDateTime; +import java.util.ArrayList; +import java.util.HashMap; +import java.util.HashSet; import java.util.List; import java.util.Locale; import java.util.Map; +import java.util.Objects; import java.util.Set; +import java.util.concurrent.BlockingQueue; +import java.util.concurrent.CompletableFuture; +import java.util.concurrent.LinkedBlockingQueue; +import java.util.concurrent.ScheduledFuture; +import java.util.concurrent.TimeUnit; import java.util.stream.Collectors; import org.eclipse.jdt.annotation.NonNullByDefault; import org.eclipse.jdt.annotation.Nullable; +import org.openhab.core.common.ThreadPoolManager; import org.openhab.core.config.core.ConfigurableService; import org.openhab.core.items.Item; +import org.openhab.core.items.ItemFactory; import org.openhab.core.items.ItemRegistry; +import org.openhab.core.items.ItemUtil; import org.openhab.core.persistence.FilterCriteria; import org.openhab.core.persistence.HistoricItem; import org.openhab.core.persistence.PersistenceItemInfo; @@ -32,6 +47,7 @@ import org.openhab.core.persistence.PersistenceService; import org.openhab.core.persistence.QueryablePersistenceService; import org.openhab.core.persistence.strategy.PersistenceStrategy; import org.openhab.core.types.State; +import org.openhab.core.types.UnDefType; import org.openhab.persistence.influxdb.internal.FilterCriteriaQueryCreator; import org.openhab.persistence.influxdb.internal.InfluxDBConfiguration; import org.openhab.persistence.influxdb.internal.InfluxDBHistoricItem; @@ -40,8 +56,6 @@ import org.openhab.persistence.influxdb.internal.InfluxDBPersistentItemInfo; import org.openhab.persistence.influxdb.internal.InfluxDBRepository; import org.openhab.persistence.influxdb.internal.InfluxDBStateConvertUtils; import org.openhab.persistence.influxdb.internal.InfluxPoint; -import org.openhab.persistence.influxdb.internal.ItemToStorePointCreator; -import org.openhab.persistence.influxdb.internal.UnexpectedConditionException; import org.openhab.persistence.influxdb.internal.influx1.InfluxDB1RepositoryImpl; import org.openhab.persistence.influxdb.internal.influx2.InfluxDB2RepositoryImpl; import org.osgi.framework.Constants; @@ -49,6 +63,8 @@ import org.osgi.service.component.annotations.Activate; import org.osgi.service.component.annotations.Component; import org.osgi.service.component.annotations.Deactivate; import org.osgi.service.component.annotations.Reference; +import org.osgi.service.component.annotations.ReferenceCardinality; +import org.osgi.service.component.annotations.ReferencePolicy; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -81,6 +97,7 @@ public class InfluxDBPersistenceService implements QueryablePersistenceService { private final Logger logger = LoggerFactory.getLogger(InfluxDBPersistenceService.class); + 
private static final int COMMIT_INTERVAL = 3; // in s
     protected static final String CONFIG_URI = "persistence:influxdb";
 
     // External dependencies
@@ -88,9 +105,16 @@ public class InfluxDBPersistenceService implements QueryablePersistenceService {
     private final InfluxDBMetadataService influxDBMetadataService;
 
     private final InfluxDBConfiguration configuration;
-    private final ItemToStorePointCreator itemToStorePointCreator;
     private final InfluxDBRepository influxDBRepository;
-    private boolean tryReconnection;
+    private boolean serviceActivated;
+
+    // storage
+    private final ScheduledFuture<?> storeJob;
+    private final BlockingQueue<InfluxPoint> pointsQueue = new LinkedBlockingQueue<>();
+
+    // conversion
+    private final Set<ItemFactory> itemFactories = new HashSet<>();
+    private Map<String, Class<? extends State>> desiredClasses = new HashMap<>();
 
     @Activate
     public InfluxDBPersistenceService(final @Reference ItemRegistry itemRegistry,
@@ -101,8 +125,9 @@ public class InfluxDBPersistenceService implements QueryablePersistenceService {
         if (configuration.isValid()) {
             this.influxDBRepository = createInfluxDBRepository();
             this.influxDBRepository.connect();
-            this.itemToStorePointCreator = new ItemToStorePointCreator(configuration, influxDBMetadataService);
-            tryReconnection = true;
+            this.storeJob = ThreadPoolManager.getScheduledPool("org.openhab.influxdb")
+                    .scheduleWithFixedDelay(this::commit, COMMIT_INTERVAL, COMMIT_INTERVAL, TimeUnit.SECONDS);
+            serviceActivated = true;
         } else {
             throw new IllegalArgumentException("Configuration invalid.");
         }
@@ -124,7 +149,15 @@
      */
     @Deactivate
     public void deactivate() {
-        tryReconnection = false;
+        serviceActivated = false;
+
+        storeJob.cancel(false);
+        commit(); // ensure we at least tried to store the data
+
+        if (!pointsQueue.isEmpty()) {
+            logger.warn("InfluxDB failed to finally store {} points.", pointsQueue.size());
+        }
+
         influxDBRepository.disconnect();
         logger.info("InfluxDB persistence service stopped.");
     }
@@ -157,26 +190,26 @@
     @Override
     public void store(Item item, @Nullable String alias) {
-        if (checkConnection()) {
-            InfluxPoint point = itemToStorePointCreator.convert(item, alias);
-            if (point != null) {
-                try {
-                    influxDBRepository.write(point);
-                    logger.trace("Stored item {} in InfluxDB point {}", item, point);
-                } catch (UnexpectedConditionException e) {
-                    logger.warn("Failed to store item {} in InfluxDB point {}", point, item);
-                }
-            } else {
-                logger.trace("Ignoring item {}, conversion to an InfluxDB point failed.", item);
-            }
-        } else {
-            logger.debug("store ignored, InfluxDB is not connected");
+        if (!serviceActivated) {
+            logger.warn("InfluxDB service not ready. Storing {} rejected.", item);
+            return;
         }
+        convert(item, alias).thenAccept(point -> {
+            if (point == null) {
+                logger.trace("Ignoring item {}, conversion to an InfluxDB point failed.", item.getName());
+                return;
+            }
+            if (pointsQueue.offer(point)) {
+                logger.trace("Queued {} for item {}", point, item);
+            } else {
+                logger.warn("Failed to queue {} for item {}", point, item);
+            }
+        });
     }
 
     @Override
     public Iterable<HistoricItem> query(FilterCriteria filter) {
-        if (checkConnection()) {
+        if (serviceActivated && checkConnection()) {
             logger.trace(
                     "Query-Filter: itemname: {}, ordering: {}, state: {}, operator: {}, getBeginDate: {}, getEndDate: {}, getPageSize: {}, getPageNumber: {}",
                     filter.getItemName(), filter.getOrdering().toString(), filter.getState(), filter.getOperator(),
@@ -211,10 +244,109 @@
     private boolean checkConnection() {
         if (influxDBRepository.isConnected()) {
             return true;
-        } else if (tryReconnection) {
+        } else if (serviceActivated) {
             logger.debug("Connection lost, trying re-connection");
             return influxDBRepository.connect();
         }
         return false;
     }
+
+    private void commit() {
+        if (!pointsQueue.isEmpty() && checkConnection()) {
+            List<InfluxPoint> points = new ArrayList<>();
+            pointsQueue.drainTo(points);
+            if (!influxDBRepository.write(points)) {
+                logger.warn("Re-queuing {} elements, failed to write batch.", points.size());
+                pointsQueue.addAll(points);
+            } else {
+                logger.trace("Wrote {} elements to database", points.size());
+            }
+        }
+    }
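
The commit() method above is the consumer side of a simple batch-and-retry pattern: store() offers points to an unbounded queue, a scheduled job drains the queue every COMMIT_INTERVAL seconds and writes the whole batch, and a failed batch is re-queued for the next cycle. A minimal, self-contained sketch of the same pattern, using plain java.util.concurrent with a String stand-in for InfluxPoint and a stubbed write (not the binding's actual classes):

// Sketch: queue + scheduled flush with re-queue on failure; assumes nothing from openHAB.
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.Executors;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;

class BatchingWriterSketch {
    private final BlockingQueue<String> queue = new LinkedBlockingQueue<>();
    private final ScheduledExecutorService scheduler = Executors.newSingleThreadScheduledExecutor();

    BatchingWriterSketch() {
        // Same shape as the service: flush every few seconds with a fixed delay.
        scheduler.scheduleWithFixedDelay(this::flush, 3, 3, TimeUnit.SECONDS);
    }

    void submit(String point) {
        queue.offer(point); // never blocks on an unbounded LinkedBlockingQueue
    }

    private void flush() {
        if (queue.isEmpty()) {
            return;
        }
        List<String> batch = new ArrayList<>();
        queue.drainTo(batch); // moves everything currently queued into the batch
        if (!writeBatch(batch)) {
            // Nothing is lost on failure, but re-queued points may interleave
            // with points submitted while the write was in flight.
            queue.addAll(batch);
        }
    }

    private boolean writeBatch(List<String> batch) {
        System.out.println("writing " + batch.size() + " points"); // stand-in for the repository write
        return true;
    }

    void shutdown() {
        scheduler.shutdown();
        flush(); // mirrors deactivate(): one last attempt before disconnecting
    }
}
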
+
+    /**
+     * Convert incoming data to an {@link InfluxPoint} for further processing. This is needed because storage is
+     * asynchronous and the item data may have changed.
+     *
+     * The method is package-private for testing.
+     *
+     * @param item the {@link Item} that needs conversion
+     * @param storeAlias an (optional) alias for the item
+     * @return a {@link CompletableFuture} that contains either null for item states that cannot be
+     *         converted or the corresponding {@link InfluxPoint}
+     */
+    CompletableFuture<@Nullable InfluxPoint> convert(Item item, @Nullable String storeAlias) {
+        String itemName = item.getName();
+        String itemLabel = item.getLabel();
+        String category = item.getCategory();
+        State state = item.getState();
+        String itemType = item.getType();
+        Instant timeStamp = Instant.now();
+
+        if (state instanceof UnDefType) {
+            return CompletableFuture.completedFuture(null);
+        }
+
+        return CompletableFuture.supplyAsync(() -> {
+            String measurementName = storeAlias != null && !storeAlias.isBlank() ? storeAlias : itemName;
+            measurementName = influxDBMetadataService.getMeasurementNameOrDefault(itemName, measurementName);
+
+            if (configuration.isReplaceUnderscore()) {
+                measurementName = measurementName.replace('_', '.');
+            }
+
+            State storeState = Objects
+                    .requireNonNullElse(state.as(desiredClasses.get(ItemUtil.getMainItemType(itemType))), state);
+            Object value = InfluxDBStateConvertUtils.stateToObject(storeState);
+
+            InfluxPoint.Builder pointBuilder = InfluxPoint.newBuilder(measurementName).withTime(timeStamp)
+                    .withValue(value).withTag(TAG_ITEM_NAME, itemName);
+
+            if (configuration.isAddCategoryTag()) {
+                String categoryName = Objects.requireNonNullElse(category, "n/a");
+                pointBuilder.withTag(TAG_CATEGORY_NAME, categoryName);
+            }
+
+            if (configuration.isAddTypeTag()) {
+                pointBuilder.withTag(TAG_TYPE_NAME, itemType);
+            }
+
+            if (configuration.isAddLabelTag()) {
+                String labelName = Objects.requireNonNullElse(itemLabel, "n/a");
+                pointBuilder.withTag(TAG_LABEL_NAME, labelName);
+            }
+
+            influxDBMetadataService.getMetaData(itemName)
+                    .ifPresent(metadata -> metadata.getConfiguration().forEach(pointBuilder::withTag));
+
+            return pointBuilder.build();
+        });
+    }
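
Note how convert() reads the item's name, label, category, state and type and takes the timestamp before handing off to CompletableFuture.supplyAsync(): the async stage works only on captured values, so a state change that happens after store() returns cannot leak into the point that is eventually written. A small stand-alone sketch of that capture-then-compute pattern (illustrative types only, not openHAB classes):

import java.time.Instant;
import java.util.concurrent.CompletableFuture;

class CaptureBeforeAsyncSketch {
    static class MutableReading { // stand-in for an Item whose state may change at any time
        volatile String state = "23.5";
    }

    record Snapshot(String state, Instant timestamp) {
    }

    static CompletableFuture<Snapshot> convert(MutableReading reading) {
        // Capture on the caller's thread, before going asynchronous.
        String state = reading.state;
        Instant timestamp = Instant.now();
        // The async stage only uses the captured locals, never the live object.
        return CompletableFuture.supplyAsync(() -> new Snapshot(state, timestamp));
    }

    public static void main(String[] args) throws Exception {
        MutableReading reading = new MutableReading();
        CompletableFuture<Snapshot> future = convert(reading);
        reading.state = "99.9"; // mutation after the hand-off
        System.out.println(future.get().state()); // still prints 23.5
    }
}
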
+
+    @Reference(cardinality = ReferenceCardinality.AT_LEAST_ONE, policy = ReferencePolicy.DYNAMIC)
+    public void setItemFactory(ItemFactory itemFactory) {
+        itemFactories.add(itemFactory);
+        calculateItemTypeClasses();
+    }
+
+    public void unsetItemFactory(ItemFactory itemFactory) {
+        itemFactories.remove(itemFactory);
+        calculateItemTypeClasses();
+    }
+
+    private synchronized void calculateItemTypeClasses() {
+        Map<String, Class<? extends State>> desiredClasses = new HashMap<>();
+        itemFactories.forEach(factory -> {
+            for (String itemType : factory.getSupportedItemTypes()) {
+                Item item = factory.createItem(itemType, "influxItem");
+                if (item != null) {
+                    item.getAcceptedCommandTypes().stream()
+                            .filter(commandType -> commandType.isAssignableFrom(State.class)).findFirst()
+                            .map(commandType -> (Class<? extends State>) commandType.asSubclass(State.class))
+                            .ifPresent(desiredClass -> desiredClasses.put(itemType, desiredClass));
+                }
+            }
+        });
+        this.desiredClasses = desiredClasses;
+    }
 }
diff --git a/bundles/org.openhab.persistence.influxdb/src/main/java/org/openhab/persistence/influxdb/internal/InfluxDBRepository.java b/bundles/org.openhab.persistence.influxdb/src/main/java/org/openhab/persistence/influxdb/internal/InfluxDBRepository.java
index efb749a8269..f955a6b3c1c 100644
--- a/bundles/org.openhab.persistence.influxdb/src/main/java/org/openhab/persistence/influxdb/internal/InfluxDBRepository.java
+++ b/bundles/org.openhab.persistence.influxdb/src/main/java/org/openhab/persistence/influxdb/internal/InfluxDBRepository.java
@@ -68,12 +68,12 @@
public interface InfluxDBRepository { List query(String query); /** - * Write point to database + * Write points to database * - * @param influxPoint Point to write - * @throws UnexpectedConditionException when an error occurs + * @param influxPoints {@link List} to write + * @returns true if points have been written, false otherwise */ - void write(InfluxPoint influxPoint) throws UnexpectedConditionException; + boolean write(List influxPoints); /** * create a query creator on this repository diff --git a/bundles/org.openhab.persistence.influxdb/src/main/java/org/openhab/persistence/influxdb/internal/ItemToStorePointCreator.java b/bundles/org.openhab.persistence.influxdb/src/main/java/org/openhab/persistence/influxdb/internal/ItemToStorePointCreator.java deleted file mode 100644 index 25929d80926..00000000000 --- a/bundles/org.openhab.persistence.influxdb/src/main/java/org/openhab/persistence/influxdb/internal/ItemToStorePointCreator.java +++ /dev/null @@ -1,105 +0,0 @@ -/** - * Copyright (c) 2010-2023 Contributors to the openHAB project - * - * See the NOTICE file(s) distributed with this work for additional - * information. - * - * This program and the accompanying materials are made available under the - * terms of the Eclipse Public License 2.0 which is available at - * http://www.eclipse.org/legal/epl-2.0 - * - * SPDX-License-Identifier: EPL-2.0 - */ -package org.openhab.persistence.influxdb.internal; - -import static org.openhab.persistence.influxdb.internal.InfluxDBConstants.TAG_CATEGORY_NAME; -import static org.openhab.persistence.influxdb.internal.InfluxDBConstants.TAG_ITEM_NAME; -import static org.openhab.persistence.influxdb.internal.InfluxDBConstants.TAG_LABEL_NAME; -import static org.openhab.persistence.influxdb.internal.InfluxDBConstants.TAG_TYPE_NAME; - -import java.time.Instant; -import java.util.Objects; -import java.util.Optional; - -import org.eclipse.jdt.annotation.NonNullByDefault; -import org.eclipse.jdt.annotation.Nullable; -import org.openhab.core.items.Item; -import org.openhab.core.types.State; -import org.openhab.core.types.UnDefType; - -/** - * Logic to create an InfluxDB {@link InfluxPoint} from an openHAB {@link Item} - * - * @author Joan Pujol Espinar - Initial contribution - */ -@NonNullByDefault -public class ItemToStorePointCreator { - private final InfluxDBConfiguration configuration; - private final InfluxDBMetadataService influxDBMetadataService; - - public ItemToStorePointCreator(InfluxDBConfiguration configuration, - InfluxDBMetadataService influxDBMetadataService) { - this.configuration = configuration; - this.influxDBMetadataService = influxDBMetadataService; - } - - public @Nullable InfluxPoint convert(Item item, @Nullable String storeAlias) { - if (item.getState() instanceof UnDefType) { - return null; - } - - String measurementName = calculateMeasurementName(item, storeAlias); - String itemName = item.getName(); - State state = getItemState(item); - - Object value = InfluxDBStateConvertUtils.stateToObject(state); - - InfluxPoint.Builder pointBuilder = InfluxPoint.newBuilder(measurementName).withTime(Instant.now()) - .withValue(value).withTag(TAG_ITEM_NAME, itemName); - - addPointTags(item, pointBuilder); - - return pointBuilder.build(); - } - - private String calculateMeasurementName(Item item, @Nullable String storeAlias) { - String name = storeAlias != null && !storeAlias.isBlank() ? 
storeAlias : item.getName(); - name = influxDBMetadataService.getMeasurementNameOrDefault(item.getName(), name); - - if (configuration.isReplaceUnderscore()) { - name = name.replace('_', '.'); - } - - return name; - } - - private State getItemState(Item item) { - return calculateDesiredTypeConversionToStore(item) - .map(desiredClass -> Objects.requireNonNullElseGet(item.getStateAs(desiredClass), item::getState)) - .orElseGet(item::getState); - } - - private Optional> calculateDesiredTypeConversionToStore(Item item) { - return item.getAcceptedCommandTypes().stream().filter(commandType -> commandType.isAssignableFrom(State.class)) - .findFirst().map(commandType -> commandType.asSubclass(State.class)); - } - - private void addPointTags(Item item, InfluxPoint.Builder pointBuilder) { - if (configuration.isAddCategoryTag()) { - String categoryName = Objects.requireNonNullElse(item.getCategory(), "n/a"); - pointBuilder.withTag(TAG_CATEGORY_NAME, categoryName); - } - - if (configuration.isAddTypeTag()) { - pointBuilder.withTag(TAG_TYPE_NAME, item.getType()); - } - - if (configuration.isAddLabelTag()) { - String labelName = Objects.requireNonNullElse(item.getLabel(), "n/a"); - pointBuilder.withTag(TAG_LABEL_NAME, labelName); - } - - influxDBMetadataService.getMetaData(item.getName()) - .ifPresent(metadata -> metadata.getConfiguration().forEach(pointBuilder::withTag)); - } -} diff --git a/bundles/org.openhab.persistence.influxdb/src/main/java/org/openhab/persistence/influxdb/internal/influx1/InfluxDB1RepositoryImpl.java b/bundles/org.openhab.persistence.influxdb/src/main/java/org/openhab/persistence/influxdb/internal/influx1/InfluxDB1RepositoryImpl.java index dda799785e6..0319603e5f3 100644 --- a/bundles/org.openhab.persistence.influxdb/src/main/java/org/openhab/persistence/influxdb/internal/influx1/InfluxDB1RepositoryImpl.java +++ b/bundles/org.openhab.persistence.influxdb/src/main/java/org/openhab/persistence/influxdb/internal/influx1/InfluxDB1RepositoryImpl.java @@ -23,12 +23,14 @@ import java.util.Collections; import java.util.List; import java.util.Map; import java.util.Objects; +import java.util.Optional; import java.util.concurrent.TimeUnit; import org.eclipse.jdt.annotation.NonNullByDefault; import org.eclipse.jdt.annotation.Nullable; import org.influxdb.InfluxDB; import org.influxdb.InfluxDBFactory; +import org.influxdb.dto.BatchPoints; import org.influxdb.dto.Point; import org.influxdb.dto.Pong; import org.influxdb.dto.Query; @@ -38,10 +40,11 @@ import org.openhab.persistence.influxdb.internal.InfluxDBConfiguration; import org.openhab.persistence.influxdb.internal.InfluxDBMetadataService; import org.openhab.persistence.influxdb.internal.InfluxDBRepository; import org.openhab.persistence.influxdb.internal.InfluxPoint; -import org.openhab.persistence.influxdb.internal.UnexpectedConditionException; import org.slf4j.Logger; import org.slf4j.LoggerFactory; +import com.influxdb.exceptions.InfluxException; + /** * Implementation of {@link InfluxDBRepository} for InfluxDB 1.0 * @@ -113,17 +116,25 @@ public class InfluxDB1RepositoryImpl implements InfluxDBRepository { } @Override - public void write(InfluxPoint point) throws UnexpectedConditionException { + public boolean write(List influxPoints) { final InfluxDB currentClient = this.client; - if (currentClient != null) { - Point clientPoint = convertPointToClientFormat(point); - currentClient.write(configuration.getDatabaseName(), configuration.getRetentionPolicy(), clientPoint); - } else { - logger.warn("Write point {} ignored due to client isn't 
connected", point); + if (currentClient == null) { + return false; } + try { + List points = influxPoints.stream().map(this::convertPointToClientFormat).filter(Optional::isPresent) + .map(Optional::get).toList(); + BatchPoints batchPoints = BatchPoints.database(configuration.getDatabaseName()) + .retentionPolicy(configuration.getRetentionPolicy()).points(points).build(); + currentClient.write(batchPoints); + } catch (InfluxException e) { + logger.debug("Writing to database failed", e); + return false; + } + return true; } - private Point convertPointToClientFormat(InfluxPoint point) throws UnexpectedConditionException { + private Optional convertPointToClientFormat(InfluxPoint point) { Point.Builder clientPoint = Point.measurement(point.getMeasurementName()).time(point.getTime().toEpochMilli(), TimeUnit.MILLISECONDS); Object value = point.getValue(); @@ -136,10 +147,11 @@ public class InfluxDB1RepositoryImpl implements InfluxDBRepository { } else if (value == null) { clientPoint.addField(FIELD_VALUE_NAME, "null"); } else { - throw new UnexpectedConditionException("Not expected value type"); + logger.warn("Could not convert {}, discarding this datapoint", point); + return Optional.empty(); } point.getTags().forEach(clientPoint::tag); - return clientPoint.build(); + return Optional.of(clientPoint.build()); } @Override diff --git a/bundles/org.openhab.persistence.influxdb/src/main/java/org/openhab/persistence/influxdb/internal/influx2/InfluxDB2RepositoryImpl.java b/bundles/org.openhab.persistence.influxdb/src/main/java/org/openhab/persistence/influxdb/internal/influx2/InfluxDB2RepositoryImpl.java index 43d578d727d..dade69cf59b 100644 --- a/bundles/org.openhab.persistence.influxdb/src/main/java/org/openhab/persistence/influxdb/internal/influx2/InfluxDB2RepositoryImpl.java +++ b/bundles/org.openhab.persistence.influxdb/src/main/java/org/openhab/persistence/influxdb/internal/influx2/InfluxDB2RepositoryImpl.java @@ -20,6 +20,7 @@ import java.util.LinkedHashMap; import java.util.List; import java.util.Map; import java.util.Objects; +import java.util.Optional; import java.util.stream.Stream; import org.eclipse.jdt.annotation.NonNullByDefault; @@ -30,7 +31,6 @@ import org.openhab.persistence.influxdb.internal.InfluxDBConstants; import org.openhab.persistence.influxdb.internal.InfluxDBMetadataService; import org.openhab.persistence.influxdb.internal.InfluxDBRepository; import org.openhab.persistence.influxdb.internal.InfluxPoint; -import org.openhab.persistence.influxdb.internal.UnexpectedConditionException; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -42,6 +42,7 @@ import com.influxdb.client.WriteApi; import com.influxdb.client.domain.Ready; import com.influxdb.client.domain.WritePrecision; import com.influxdb.client.write.Point; +import com.influxdb.exceptions.InfluxException; import com.influxdb.query.FluxTable; /** @@ -120,34 +121,40 @@ public class InfluxDB2RepositoryImpl implements InfluxDBRepository { } @Override - public void write(InfluxPoint point) throws UnexpectedConditionException { + public boolean write(List influxPoints) { final WriteApi currentWriteAPI = writeAPI; - if (currentWriteAPI != null) { - currentWriteAPI.writePoint(convertPointToClientFormat(point)); - } else { - logger.warn("Write point {} ignored due to writeAPI isn't present", point); + if (currentWriteAPI == null) { + return false; } + try { + List clientPoints = influxPoints.stream().map(this::convertPointToClientFormat) + .filter(Optional::isPresent).map(Optional::get).toList(); + 
currentWriteAPI.writePoints(clientPoints); + } catch (InfluxException e) { + logger.debug("Writing to database failed", e); + return false; + } + return true; } - private Point convertPointToClientFormat(InfluxPoint point) throws UnexpectedConditionException { + private Optional convertPointToClientFormat(InfluxPoint point) { Point clientPoint = Point.measurement(point.getMeasurementName()).time(point.getTime(), WritePrecision.MS); - setPointValue(point.getValue(), clientPoint); - point.getTags().forEach(clientPoint::addTag); - return clientPoint; - } - - private void setPointValue(@Nullable Object value, Point point) throws UnexpectedConditionException { + @Nullable + Object value = point.getValue(); if (value instanceof String) { - point.addField(FIELD_VALUE_NAME, (String) value); + clientPoint.addField(FIELD_VALUE_NAME, (String) value); } else if (value instanceof Number) { - point.addField(FIELD_VALUE_NAME, (Number) value); + clientPoint.addField(FIELD_VALUE_NAME, (Number) value); } else if (value instanceof Boolean) { - point.addField(FIELD_VALUE_NAME, (Boolean) value); + clientPoint.addField(FIELD_VALUE_NAME, (Boolean) value); } else if (value == null) { - point.addField(FIELD_VALUE_NAME, (String) null); + clientPoint.addField(FIELD_VALUE_NAME, (String) null); } else { - throw new UnexpectedConditionException("Not expected value type"); + logger.warn("Could not convert {}, discarding this datapoint)", clientPoint); + return Optional.empty(); } + point.getTags().forEach(clientPoint::addTag); + return Optional.of(clientPoint); } @Override diff --git a/bundles/org.openhab.persistence.influxdb/src/test/java/org/openhab/persistence/influxdb/internal/InfluxDBPersistenceServiceTest.java b/bundles/org.openhab.persistence.influxdb/src/test/java/org/openhab/persistence/influxdb/InfluxDBPersistenceServiceTest.java similarity index 89% rename from bundles/org.openhab.persistence.influxdb/src/test/java/org/openhab/persistence/influxdb/internal/InfluxDBPersistenceServiceTest.java rename to bundles/org.openhab.persistence.influxdb/src/test/java/org/openhab/persistence/influxdb/InfluxDBPersistenceServiceTest.java index 05ead8fb55e..2207d35ce48 100644 --- a/bundles/org.openhab.persistence.influxdb/src/test/java/org/openhab/persistence/influxdb/internal/InfluxDBPersistenceServiceTest.java +++ b/bundles/org.openhab.persistence.influxdb/src/test/java/org/openhab/persistence/influxdb/InfluxDBPersistenceServiceTest.java @@ -10,7 +10,7 @@ * * SPDX-License-Identifier: EPL-2.0 */ -package org.openhab.persistence.influxdb.internal; +package org.openhab.persistence.influxdb; import static org.junit.jupiter.api.Assertions.assertThrows; import static org.mockito.ArgumentMatchers.any; @@ -30,14 +30,21 @@ import org.junit.jupiter.api.Test; import org.junit.jupiter.api.extension.ExtendWith; import org.mockito.Mock; import org.mockito.junit.jupiter.MockitoExtension; +import org.mockito.junit.jupiter.MockitoSettings; +import org.mockito.quality.Strictness; import org.openhab.core.items.ItemRegistry; import org.openhab.core.items.MetadataRegistry; -import org.openhab.persistence.influxdb.InfluxDBPersistenceService; +import org.openhab.persistence.influxdb.internal.InfluxDBMetadataService; +import org.openhab.persistence.influxdb.internal.InfluxDBRepository; +import org.openhab.persistence.influxdb.internal.InfluxDBVersion; +import org.openhab.persistence.influxdb.internal.ItemTestHelper; +import org.openhab.persistence.influxdb.internal.UnexpectedConditionException; /** * @author Joan Pujol Espinar - Initial 
contribution */ @ExtendWith(MockitoExtension.class) +@MockitoSettings(strictness = Strictness.LENIENT) @NonNullByDefault public class InfluxDBPersistenceServiceTest { private static final Map VALID_V1_CONFIGURATION = Map.of( // @@ -106,7 +113,7 @@ public class InfluxDBPersistenceServiceTest { InfluxDBPersistenceService instance = getService(VALID_V2_CONFIGURATION); when(influxDBRepositoryMock.isConnected()).thenReturn(true); instance.store(ItemTestHelper.createNumberItem("number", 5)); - verify(influxDBRepositoryMock).write(any()); + verify(influxDBRepositoryMock, timeout(5000)).write(any()); } @Test diff --git a/bundles/org.openhab.persistence.influxdb/src/test/java/org/openhab/persistence/influxdb/internal/ItemToStorePointCreatorTest.java b/bundles/org.openhab.persistence.influxdb/src/test/java/org/openhab/persistence/influxdb/ItemToStorePointCreatorTest.java similarity index 62% rename from bundles/org.openhab.persistence.influxdb/src/test/java/org/openhab/persistence/influxdb/internal/ItemToStorePointCreatorTest.java rename to bundles/org.openhab.persistence.influxdb/src/test/java/org/openhab/persistence/influxdb/ItemToStorePointCreatorTest.java index 364489c5e33..10a653b96a7 100644 --- a/bundles/org.openhab.persistence.influxdb/src/test/java/org/openhab/persistence/influxdb/internal/ItemToStorePointCreatorTest.java +++ b/bundles/org.openhab.persistence.influxdb/src/test/java/org/openhab/persistence/influxdb/ItemToStorePointCreatorTest.java @@ -10,19 +10,21 @@ * * SPDX-License-Identifier: EPL-2.0 */ -package org.openhab.persistence.influxdb.internal; +package org.openhab.persistence.influxdb; import static org.hamcrest.MatcherAssert.assertThat; import static org.hamcrest.Matchers.*; import static org.mockito.Mockito.when; +import static org.openhab.persistence.influxdb.internal.InfluxDBConfiguration.*; import java.math.BigDecimal; +import java.util.HashMap; import java.util.Map; +import java.util.concurrent.ExecutionException; import java.util.stream.Stream; import org.eclipse.jdt.annotation.DefaultLocation; import org.eclipse.jdt.annotation.NonNullByDefault; -import org.junit.jupiter.api.AfterEach; import org.junit.jupiter.api.Assertions; import org.junit.jupiter.api.BeforeEach; import org.junit.jupiter.api.Test; @@ -31,11 +33,17 @@ import org.junit.jupiter.params.ParameterizedTest; import org.junit.jupiter.params.provider.MethodSource; import org.mockito.Mock; import org.mockito.junit.jupiter.MockitoExtension; +import org.openhab.core.items.ItemRegistry; import org.openhab.core.items.Metadata; import org.openhab.core.items.MetadataKey; import org.openhab.core.items.MetadataRegistry; +import org.openhab.core.library.CoreItemFactory; import org.openhab.core.library.items.NumberItem; -import org.openhab.persistence.influxdb.InfluxDBPersistenceService; +import org.openhab.persistence.influxdb.internal.InfluxDBConstants; +import org.openhab.persistence.influxdb.internal.InfluxDBMetadataService; +import org.openhab.persistence.influxdb.internal.InfluxDBVersion; +import org.openhab.persistence.influxdb.internal.InfluxPoint; +import org.openhab.persistence.influxdb.internal.ItemTestHelper; /** * @author Joan Pujol Espinar - Initial contribution @@ -44,33 +52,27 @@ import org.openhab.persistence.influxdb.InfluxDBPersistenceService; @NonNullByDefault(value = { DefaultLocation.PARAMETER, DefaultLocation.RETURN_TYPE }) public class ItemToStorePointCreatorTest { - private @Mock InfluxDBConfiguration influxDBConfiguration; + private static final Map BASE_CONFIGURATION = Map.of( // + URL_PARAM, 
"http://localhost:8086", // + VERSION_PARAM, InfluxDBVersion.V1.name(), // + USER_PARAM, "user", PASSWORD_PARAM, "password", // + DATABASE_PARAM, "openhab", // + RETENTION_POLICY_PARAM, "default"); + + private @Mock ItemRegistry itemRegistryMock; private @Mock MetadataRegistry metadataRegistry; - private ItemToStorePointCreator instance; + private InfluxDBPersistenceService instance; @BeforeEach - public void before() { - InfluxDBMetadataService influxDBMetadataService = new InfluxDBMetadataService(metadataRegistry); - when(influxDBConfiguration.isAddCategoryTag()).thenReturn(false); - when(influxDBConfiguration.isAddLabelTag()).thenReturn(false); - when(influxDBConfiguration.isAddTypeTag()).thenReturn(false); - when(influxDBConfiguration.isReplaceUnderscore()).thenReturn(false); - - instance = new ItemToStorePointCreator(influxDBConfiguration, influxDBMetadataService); - } - - @AfterEach - public void after() { - instance = null; - influxDBConfiguration = null; - metadataRegistry = null; + public void setup() { + instance = getService(false, false, false, false); } @ParameterizedTest @MethodSource - public void convertBasicItem(Number number) { + public void convertBasicItem(Number number) throws ExecutionException, InterruptedException { NumberItem item = ItemTestHelper.createNumberItem("myitem", number); - InfluxPoint point = instance.convert(item, null); + InfluxPoint point = instance.convert(item, null).get(); if (point == null) { Assertions.fail("'point' is null"); @@ -88,9 +90,9 @@ public class ItemToStorePointCreatorTest { } @Test - public void shouldUseAliasAsMeasurementNameIfProvided() { + public void shouldUseAliasAsMeasurementNameIfProvided() throws ExecutionException, InterruptedException { NumberItem item = ItemTestHelper.createNumberItem("myitem", 5); - InfluxPoint point = instance.convert(item, "aliasName"); + InfluxPoint point = instance.convert(item, "aliasName").get(); if (point == null) { Assertions.fail("'point' is null"); @@ -101,12 +103,12 @@ public class ItemToStorePointCreatorTest { } @Test - public void shouldStoreCategoryTagIfProvidedAndConfigured() { + public void shouldStoreCategoryTagIfProvidedAndConfigured() throws ExecutionException, InterruptedException { NumberItem item = ItemTestHelper.createNumberItem("myitem", 5); item.setCategory("categoryValue"); - when(influxDBConfiguration.isAddCategoryTag()).thenReturn(true); - InfluxPoint point = instance.convert(item, null); + instance = getService(false, true, false, false); + InfluxPoint point = instance.convert(item, null).get(); if (point == null) { Assertions.fail("'point' is null"); @@ -115,8 +117,8 @@ public class ItemToStorePointCreatorTest { assertThat(point.getTags(), hasEntry(InfluxDBConstants.TAG_CATEGORY_NAME, "categoryValue")); - when(influxDBConfiguration.isAddCategoryTag()).thenReturn(false); - point = instance.convert(item, null); + instance = getService(false, false, false, false); + point = instance.convert(item, null).get(); if (point == null) { Assertions.fail("'point' is null"); @@ -127,11 +129,11 @@ public class ItemToStorePointCreatorTest { } @Test - public void shouldStoreTypeTagIfProvidedAndConfigured() { + public void shouldStoreTypeTagIfProvidedAndConfigured() throws ExecutionException, InterruptedException { NumberItem item = ItemTestHelper.createNumberItem("myitem", 5); - when(influxDBConfiguration.isAddTypeTag()).thenReturn(true); - InfluxPoint point = instance.convert(item, null); + instance = getService(false, false, false, true); + InfluxPoint point = instance.convert(item, 
null).get(); if (point == null) { Assertions.fail("'point' is null"); @@ -140,8 +142,8 @@ public class ItemToStorePointCreatorTest { assertThat(point.getTags(), hasEntry(InfluxDBConstants.TAG_TYPE_NAME, "Number")); - when(influxDBConfiguration.isAddTypeTag()).thenReturn(false); - point = instance.convert(item, null); + instance = getService(false, false, false, false); + point = instance.convert(item, null).get(); if (point == null) { Assertions.fail("'point' is null"); @@ -152,12 +154,12 @@ public class ItemToStorePointCreatorTest { } @Test - public void shouldStoreTypeLabelIfProvidedAndConfigured() { + public void shouldStoreTypeLabelIfProvidedAndConfigured() throws ExecutionException, InterruptedException { NumberItem item = ItemTestHelper.createNumberItem("myitem", 5); item.setLabel("ItemLabel"); - when(influxDBConfiguration.isAddLabelTag()).thenReturn(true); - InfluxPoint point = instance.convert(item, null); + instance = getService(false, false, true, false); + InfluxPoint point = instance.convert(item, null).get(); if (point == null) { Assertions.fail("'point' is null"); @@ -166,8 +168,8 @@ public class ItemToStorePointCreatorTest { assertThat(point.getTags(), hasEntry(InfluxDBConstants.TAG_LABEL_NAME, "ItemLabel")); - when(influxDBConfiguration.isAddLabelTag()).thenReturn(false); - point = instance.convert(item, null); + instance = getService(false, false, false, false); + point = instance.convert(item, null).get(); if (point == null) { Assertions.fail("'point' is null"); @@ -178,14 +180,14 @@ public class ItemToStorePointCreatorTest { } @Test - public void shouldStoreMetadataAsTagsIfProvided() { + public void shouldStoreMetadataAsTagsIfProvided() throws ExecutionException, InterruptedException { NumberItem item = ItemTestHelper.createNumberItem("myitem", 5); MetadataKey metadataKey = new MetadataKey(InfluxDBPersistenceService.SERVICE_NAME, item.getName()); when(metadataRegistry.get(metadataKey)) .thenReturn(new Metadata(metadataKey, "", Map.of("key1", "val1", "key2", "val2"))); - InfluxPoint point = instance.convert(item, null); + InfluxPoint point = instance.convert(item, null).get(); if (point == null) { Assertions.fail("'point' is null"); @@ -197,18 +199,18 @@ public class ItemToStorePointCreatorTest { } @Test - public void shouldUseMeasurementNameFromMetadataIfProvided() { + public void shouldUseMeasurementNameFromMetadataIfProvided() throws ExecutionException, InterruptedException { NumberItem item = ItemTestHelper.createNumberItem("myitem", 5); MetadataKey metadataKey = new MetadataKey(InfluxDBPersistenceService.SERVICE_NAME, item.getName()); - InfluxPoint point = instance.convert(item, null); + InfluxPoint point = instance.convert(item, null).get(); if (point == null) { Assertions.fail(); return; } assertThat(point.getMeasurementName(), equalTo(item.getName())); - point = instance.convert(item, null); + point = instance.convert(item, null).get(); if (point == null) { Assertions.fail(); return; @@ -219,7 +221,7 @@ public class ItemToStorePointCreatorTest { when(metadataRegistry.get(metadataKey)) .thenReturn(new Metadata(metadataKey, "measurementName", Map.of("key1", "val1", "key2", "val2"))); - point = instance.convert(item, null); + point = instance.convert(item, null).get(); if (point == null) { Assertions.fail(); return; @@ -230,7 +232,7 @@ public class ItemToStorePointCreatorTest { when(metadataRegistry.get(metadataKey)) .thenReturn(new Metadata(metadataKey, "", Map.of("key1", "val1", "key2", "val2"))); - point = instance.convert(item, null); + point = 
instance.convert(item, null).get(); if (point == null) { Assertions.fail(); return; @@ -238,4 +240,22 @@ public class ItemToStorePointCreatorTest { assertThat(point.getMeasurementName(), equalTo(item.getName())); assertThat(point.getTags(), hasEntry("item", item.getName())); } + + private InfluxDBPersistenceService getService(boolean replaceUnderscore, boolean category, boolean label, + boolean typeTag) { + InfluxDBMetadataService influxDBMetadataService = new InfluxDBMetadataService(metadataRegistry); + + Map configuration = new HashMap<>(); + configuration.putAll(BASE_CONFIGURATION); + configuration.put(REPLACE_UNDERSCORE_PARAM, replaceUnderscore); + configuration.put(ADD_CATEGORY_TAG_PARAM, category); + configuration.put(ADD_LABEL_TAG_PARAM, label); + configuration.put(ADD_TYPE_TAG_PARAM, typeTag); + + InfluxDBPersistenceService instance = new InfluxDBPersistenceService(itemRegistryMock, influxDBMetadataService, + configuration); + instance.setItemFactory(new CoreItemFactory()); + + return instance; + } }
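
Since store() now only queues a point and the write happens later on the commit job, InfluxDBPersistenceServiceTest above switches to Mockito's timeout() verification, and the former ItemToStorePointCreatorTest awaits the CompletableFuture returned by convert(). A self-contained sketch of the timeout-based verification pattern (JUnit 5 + Mockito, with a stand-in repository interface rather than the real InfluxDBRepository):

import static org.mockito.ArgumentMatchers.anyList;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.timeout;
import static org.mockito.Mockito.verify;

import java.util.List;
import java.util.concurrent.CompletableFuture;

import org.junit.jupiter.api.Test;

class AsyncVerificationSketchTest {

    interface Repository { // stand-in for InfluxDBRepository
        boolean write(List<String> points);
    }

    @Test
    void asyncWriteIsVerifiedWithTimeout() {
        Repository repository = mock(Repository.class);

        // Simulate the service: the write happens later, on another thread.
        CompletableFuture.runAsync(() -> repository.write(List.of("point")));

        // timeout() keeps checking until the interaction happens or 5 s elapse,
        // which is why the store() test above uses verify(..., timeout(5000)).
        verify(repository, timeout(5000)).write(anyList());
    }
}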