From 717addee9ae6e69e6f93131be192cda1bc3cd96a Mon Sep 17 00:00:00 2001 From: J-N-K Date: Sat, 13 May 2023 12:37:48 +0200 Subject: [PATCH] [influxdb] Implement ModifiablePersistenceService (#14959) * [influxdb] Implement ModifiablePersistenceService Signed-off-by: Jan N. Klug --- .../influxdb/InfluxDBPersistenceService.java | 40 ++++++++++---- .../influxdb/internal/InfluxDBRepository.java | 12 +++-- .../influx1/InfluxDB1RepositoryImpl.java | 18 ++++--- .../influx2/InfluxDB2RepositoryImpl.java | 53 ++++++++++++++++--- .../influxdb/ItemToStorePointCreatorTest.java | 27 +++++----- 5 files changed, 111 insertions(+), 39 deletions(-) diff --git a/bundles/org.openhab.persistence.influxdb/src/main/java/org/openhab/persistence/influxdb/InfluxDBPersistenceService.java b/bundles/org.openhab.persistence.influxdb/src/main/java/org/openhab/persistence/influxdb/InfluxDBPersistenceService.java index 783624651f4..4e7dfb33daa 100644 --- a/bundles/org.openhab.persistence.influxdb/src/main/java/org/openhab/persistence/influxdb/InfluxDBPersistenceService.java +++ b/bundles/org.openhab.persistence.influxdb/src/main/java/org/openhab/persistence/influxdb/InfluxDBPersistenceService.java @@ -42,6 +42,7 @@ import org.openhab.core.items.ItemRegistry; import org.openhab.core.items.ItemUtil; import org.openhab.core.persistence.FilterCriteria; import org.openhab.core.persistence.HistoricItem; +import org.openhab.core.persistence.ModifiablePersistenceService; import org.openhab.core.persistence.PersistenceItemInfo; import org.openhab.core.persistence.PersistenceService; import org.openhab.core.persistence.QueryablePersistenceService; @@ -92,7 +93,7 @@ import org.slf4j.LoggerFactory; QueryablePersistenceService.class }, configurationPid = "org.openhab.influxdb", // property = Constants.SERVICE_PID + "=org.openhab.influxdb") @ConfigurableService(category = "persistence", label = "InfluxDB Persistence Service", description_uri = InfluxDBPersistenceService.CONFIG_URI) -public class InfluxDBPersistenceService implements QueryablePersistenceService { +public class InfluxDBPersistenceService implements ModifiablePersistenceService { public static final String SERVICE_NAME = "influxdb"; private final Logger logger = LoggerFactory.getLogger(InfluxDBPersistenceService.class); @@ -190,11 +191,20 @@ public class InfluxDBPersistenceService implements QueryablePersistenceService { @Override public void store(Item item, @Nullable String alias) { + store(item, ZonedDateTime.now(), item.getState(), alias); + } + + @Override + public void store(Item item, ZonedDateTime date, State state) { + store(item, date, state, null); + } + + public void store(Item item, ZonedDateTime date, State state, @Nullable String alias) { if (!serviceActivated) { logger.warn("InfluxDB service not ready. Storing {} rejected.", item); return; } - convert(item, alias).thenAccept(point -> { + convert(item, state, date.toInstant(), null).thenAccept(point -> { if (point == null) { logger.trace("Ignoring item {}, conversion to an InfluxDB point failed.", item.getName()); return; @@ -207,6 +217,20 @@ public class InfluxDBPersistenceService implements QueryablePersistenceService { }); } + @Override + public boolean remove(FilterCriteria filter) throws IllegalArgumentException { + if (serviceActivated && checkConnection()) { + if (filter.getItemName() == null) { + logger.warn("Item name is missing in filter {} when trying to remove data.", filter); + return false; + } + return influxDBRepository.remove(filter); + } else { + logger.debug("Remove query {} ignored, InfluxDB is not connected.", filter); + return false; + } + } + @Override public Iterable query(FilterCriteria filter) { if (serviceActivated && checkConnection()) { @@ -215,13 +239,12 @@ public class InfluxDBPersistenceService implements QueryablePersistenceService { filter.getItemName(), filter.getOrdering().toString(), filter.getState(), filter.getOperator(), filter.getBeginDate(), filter.getEndDate(), filter.getPageSize(), filter.getPageNumber()); if (filter.getItemName() == null) { - logger.warn("Item name is missing in filter {}", filter); + logger.warn("Item name is missing in filter {} when querying data.", filter); return List.of(); } - String query = influxDBRepository.createQueryCreator().createQuery(filter, + + List results = influxDBRepository.query(filter, configuration.getRetentionPolicy()); - logger.trace("Query {}", query); - List results = influxDBRepository.query(query); return results.stream().map(this::mapRowToHistoricItem).collect(Collectors.toList()); } else { logger.debug("Query for persisted data ignored, InfluxDB is not connected"); @@ -279,13 +302,12 @@ public class InfluxDBPersistenceService implements QueryablePersistenceService { * @return a {@link CompletableFuture} that contains either null for item states that cannot be * converted or the corresponding {@link InfluxPoint} */ - CompletableFuture<@Nullable InfluxPoint> convert(Item item, @Nullable String storeAlias) { + CompletableFuture<@Nullable InfluxPoint> convert(Item item, State state, Instant timeStamp, + @Nullable String storeAlias) { String itemName = item.getName(); String itemLabel = item.getLabel(); String category = item.getCategory(); - State state = item.getState(); String itemType = item.getType(); - Instant timeStamp = Instant.now(); if (state instanceof UnDefType) { return CompletableFuture.completedFuture(null); diff --git a/bundles/org.openhab.persistence.influxdb/src/main/java/org/openhab/persistence/influxdb/internal/InfluxDBRepository.java b/bundles/org.openhab.persistence.influxdb/src/main/java/org/openhab/persistence/influxdb/internal/InfluxDBRepository.java index f955a6b3c1c..f73b1514907 100644 --- a/bundles/org.openhab.persistence.influxdb/src/main/java/org/openhab/persistence/influxdb/internal/InfluxDBRepository.java +++ b/bundles/org.openhab.persistence.influxdb/src/main/java/org/openhab/persistence/influxdb/internal/InfluxDBRepository.java @@ -17,6 +17,7 @@ import java.util.List; import java.util.Map; import org.eclipse.jdt.annotation.NonNullByDefault; +import org.openhab.core.persistence.FilterCriteria; /** * Manages InfluxDB server interaction maintaining client connection @@ -61,11 +62,11 @@ public interface InfluxDBRepository { /** * Executes Flux query * - * @param query Query + * @param filter the query filter * @return Query results * */ - List query(String query); + List query(FilterCriteria filter, String retentionPolicy); /** * Write points to database @@ -76,11 +77,12 @@ public interface InfluxDBRepository { boolean write(List influxPoints); /** - * create a query creator on this repository + * Execute delete query * - * @return the query creator for this repository + * @param filter the query filter + * @return true if query executed successfully, false otherwise */ - FilterCriteriaQueryCreator createQueryCreator(); + boolean remove(FilterCriteria filter); record InfluxRow(Instant time, String itemName, Object value) { } diff --git a/bundles/org.openhab.persistence.influxdb/src/main/java/org/openhab/persistence/influxdb/internal/influx1/InfluxDB1RepositoryImpl.java b/bundles/org.openhab.persistence.influxdb/src/main/java/org/openhab/persistence/influxdb/internal/influx1/InfluxDB1RepositoryImpl.java index 0319603e5f3..ef66df495e6 100644 --- a/bundles/org.openhab.persistence.influxdb/src/main/java/org/openhab/persistence/influxdb/internal/influx1/InfluxDB1RepositoryImpl.java +++ b/bundles/org.openhab.persistence.influxdb/src/main/java/org/openhab/persistence/influxdb/internal/influx1/InfluxDB1RepositoryImpl.java @@ -35,6 +35,7 @@ import org.influxdb.dto.Point; import org.influxdb.dto.Pong; import org.influxdb.dto.Query; import org.influxdb.dto.QueryResult; +import org.openhab.core.persistence.FilterCriteria; import org.openhab.persistence.influxdb.internal.FilterCriteriaQueryCreator; import org.openhab.persistence.influxdb.internal.InfluxDBConfiguration; import org.openhab.persistence.influxdb.internal.InfluxDBMetadataService; @@ -58,12 +59,14 @@ public class InfluxDB1RepositoryImpl implements InfluxDBRepository { private final Logger logger = LoggerFactory.getLogger(InfluxDB1RepositoryImpl.class); private final InfluxDBConfiguration configuration; private final InfluxDBMetadataService influxDBMetadataService; + private final FilterCriteriaQueryCreator queryCreator; private @Nullable InfluxDB client; public InfluxDB1RepositoryImpl(InfluxDBConfiguration configuration, InfluxDBMetadataService influxDBMetadataService) { this.configuration = configuration; this.influxDBMetadataService = influxDBMetadataService; + this.queryCreator = new InfluxDB1FilterCriteriaQueryCreatorImpl(configuration, influxDBMetadataService); } @Override @@ -134,6 +137,12 @@ public class InfluxDB1RepositoryImpl implements InfluxDBRepository { return true; } + @Override + public boolean remove(FilterCriteria filter) { + logger.warn("Removing data is not supported in InfluxDB v1."); + return false; + } + private Optional convertPointToClientFormat(InfluxPoint point) { Point.Builder clientPoint = Point.measurement(point.getMeasurementName()).time(point.getTime().toEpochMilli(), TimeUnit.MILLISECONDS); @@ -155,9 +164,11 @@ public class InfluxDB1RepositoryImpl implements InfluxDBRepository { } @Override - public List query(String query) { + public List query(FilterCriteria filter, String retentionPolicy) { final InfluxDB currentClient = client; if (currentClient != null) { + String query = queryCreator.createQuery(filter, retentionPolicy); + logger.trace("Query {}", query); Query parsedQuery = new Query(query, configuration.getDatabaseName()); List results = currentClient.query(parsedQuery, TimeUnit.MILLISECONDS).getResults(); return convertClientResultToRepository(results); @@ -216,9 +227,4 @@ public class InfluxDB1RepositoryImpl implements InfluxDBRepository { public Map getStoredItemsCount() { return Collections.emptyMap(); } - - @Override - public FilterCriteriaQueryCreator createQueryCreator() { - return new InfluxDB1FilterCriteriaQueryCreatorImpl(configuration, influxDBMetadataService); - } } diff --git a/bundles/org.openhab.persistence.influxdb/src/main/java/org/openhab/persistence/influxdb/internal/influx2/InfluxDB2RepositoryImpl.java b/bundles/org.openhab.persistence.influxdb/src/main/java/org/openhab/persistence/influxdb/internal/influx2/InfluxDB2RepositoryImpl.java index dade69cf59b..f675bf8241e 100644 --- a/bundles/org.openhab.persistence.influxdb/src/main/java/org/openhab/persistence/influxdb/internal/influx2/InfluxDB2RepositoryImpl.java +++ b/bundles/org.openhab.persistence.influxdb/src/main/java/org/openhab/persistence/influxdb/internal/influx2/InfluxDB2RepositoryImpl.java @@ -15,6 +15,8 @@ package org.openhab.persistence.influxdb.internal.influx2; import static org.openhab.persistence.influxdb.internal.InfluxDBConstants.*; import java.time.Instant; +import java.time.OffsetDateTime; +import java.time.ZonedDateTime; import java.util.Collections; import java.util.LinkedHashMap; import java.util.List; @@ -25,6 +27,7 @@ import java.util.stream.Stream; import org.eclipse.jdt.annotation.NonNullByDefault; import org.eclipse.jdt.annotation.Nullable; +import org.openhab.core.persistence.FilterCriteria; import org.openhab.persistence.influxdb.internal.FilterCriteriaQueryCreator; import org.openhab.persistence.influxdb.internal.InfluxDBConfiguration; import org.openhab.persistence.influxdb.internal.InfluxDBConstants; @@ -34,6 +37,7 @@ import org.openhab.persistence.influxdb.internal.InfluxPoint; import org.slf4j.Logger; import org.slf4j.LoggerFactory; +import com.influxdb.client.DeleteApi; import com.influxdb.client.InfluxDBClient; import com.influxdb.client.InfluxDBClientFactory; import com.influxdb.client.InfluxDBClientOptions; @@ -55,15 +59,18 @@ public class InfluxDB2RepositoryImpl implements InfluxDBRepository { private final Logger logger = LoggerFactory.getLogger(InfluxDB2RepositoryImpl.class); private final InfluxDBConfiguration configuration; private final InfluxDBMetadataService influxDBMetadataService; + private final FilterCriteriaQueryCreator queryCreator; private @Nullable InfluxDBClient client; private @Nullable QueryApi queryAPI; private @Nullable WriteApi writeAPI; + private @Nullable DeleteApi deleteAPI; public InfluxDB2RepositoryImpl(InfluxDBConfiguration configuration, InfluxDBMetadataService influxDBMetadataService) { this.configuration = configuration; this.influxDBMetadataService = influxDBMetadataService; + this.queryCreator = new InfluxDB2FilterCriteriaQueryCreatorImpl(configuration, influxDBMetadataService); } @Override @@ -88,6 +95,7 @@ public class InfluxDB2RepositoryImpl implements InfluxDBRepository { queryAPI = createdClient.getQueryApi(); writeAPI = createdClient.getWriteApi(); + deleteAPI = createdClient.getDeleteApi(); logger.debug("Successfully connected to InfluxDB. Instance ready={}", createdClient.ready()); return checkConnectionStatus(); @@ -137,6 +145,42 @@ public class InfluxDB2RepositoryImpl implements InfluxDBRepository { return true; } + @Override + public boolean remove(FilterCriteria filter) { + final DeleteApi currentDeleteApi = deleteAPI; + if (currentDeleteApi == null) { + return false; + } + + if (filter.getState() != null) { + logger.warn("Deleting by value is not supported in InfluxDB v2."); + return false; + } + OffsetDateTime start = Objects.requireNonNullElse(filter.getBeginDate(), ZonedDateTime.now().minusYears(100)) + .toOffsetDateTime(); + OffsetDateTime stop = Objects.requireNonNullElse(filter.getEndDate(), ZonedDateTime.now().plusYears(100)) + .toOffsetDateTime(); + + // create predicate + String predicate = ""; + String itemName = filter.getItemName(); + if (itemName != null) { + String name = influxDBMetadataService.getMeasurementNameOrDefault(itemName, itemName); + String measurementName = configuration.isReplaceUnderscore() ? name.replace('_', '.') : name; + predicate = "(_measurement=\"" + measurementName + "\")"; + } + + try { + deleteAPI.delete(start, stop, predicate, configuration.getRetentionPolicy(), + configuration.getDatabaseName()); + } catch (InfluxException e) { + logger.debug("Deleting from database failed", e); + return false; + } + + return true; + } + private Optional convertPointToClientFormat(InfluxPoint point) { Point clientPoint = Point.measurement(point.getMeasurementName()).time(point.getTime(), WritePrecision.MS); @Nullable @@ -158,9 +202,11 @@ public class InfluxDB2RepositoryImpl implements InfluxDBRepository { } @Override - public List query(String query) { + public List query(FilterCriteria filter, String retentionPolicy) { final QueryApi currentQueryAPI = queryAPI; if (currentQueryAPI != null) { + String query = queryCreator.createQuery(filter, retentionPolicy); + logger.trace("Query {}", query); List clientResult = currentQueryAPI.query(query); return clientResult.stream().flatMap(this::mapRawResultToHistoric).toList(); } else { @@ -204,9 +250,4 @@ public class InfluxDB2RepositoryImpl implements InfluxDBRepository { return Collections.emptyMap(); } } - - @Override - public FilterCriteriaQueryCreator createQueryCreator() { - return new InfluxDB2FilterCriteriaQueryCreatorImpl(configuration, influxDBMetadataService); - } } diff --git a/bundles/org.openhab.persistence.influxdb/src/test/java/org/openhab/persistence/influxdb/ItemToStorePointCreatorTest.java b/bundles/org.openhab.persistence.influxdb/src/test/java/org/openhab/persistence/influxdb/ItemToStorePointCreatorTest.java index 397ba8e1dd2..8f7387f15b6 100644 --- a/bundles/org.openhab.persistence.influxdb/src/test/java/org/openhab/persistence/influxdb/ItemToStorePointCreatorTest.java +++ b/bundles/org.openhab.persistence.influxdb/src/test/java/org/openhab/persistence/influxdb/ItemToStorePointCreatorTest.java @@ -18,6 +18,7 @@ import static org.mockito.Mockito.when; import static org.openhab.persistence.influxdb.internal.InfluxDBConfiguration.*; import java.math.BigDecimal; +import java.time.Instant; import java.util.HashMap; import java.util.Map; import java.util.concurrent.ExecutionException; @@ -74,7 +75,7 @@ public class ItemToStorePointCreatorTest { @MethodSource public void convertBasicItem(Number number) throws ExecutionException, InterruptedException { NumberItem item = ItemTestHelper.createNumberItem("myitem", number); - InfluxPoint point = instance.convert(item, null).get(); + InfluxPoint point = instance.convert(item, item.getState(), Instant.now(), null).get(); if (point == null) { Assertions.fail("'point' is null"); @@ -94,7 +95,7 @@ public class ItemToStorePointCreatorTest { @Test public void shouldUseAliasAsMeasurementNameIfProvided() throws ExecutionException, InterruptedException { NumberItem item = ItemTestHelper.createNumberItem("myitem", 5); - InfluxPoint point = instance.convert(item, "aliasName").get(); + InfluxPoint point = instance.convert(item, item.getState(), Instant.now(), "aliasName").get(); if (point == null) { Assertions.fail("'point' is null"); @@ -110,7 +111,7 @@ public class ItemToStorePointCreatorTest { item.setCategory("categoryValue"); instance = getService(false, true, false, false); - InfluxPoint point = instance.convert(item, null).get(); + InfluxPoint point = instance.convert(item, item.getState(), Instant.now(), null).get(); if (point == null) { Assertions.fail("'point' is null"); @@ -120,7 +121,7 @@ public class ItemToStorePointCreatorTest { assertThat(point.getTags(), hasEntry(InfluxDBConstants.TAG_CATEGORY_NAME, "categoryValue")); instance = getService(false, false, false, false); - point = instance.convert(item, null).get(); + point = instance.convert(item, item.getState(), Instant.now(), null).get(); if (point == null) { Assertions.fail("'point' is null"); @@ -135,7 +136,7 @@ public class ItemToStorePointCreatorTest { NumberItem item = ItemTestHelper.createNumberItem("myitem", 5); instance = getService(false, false, false, true); - InfluxPoint point = instance.convert(item, null).get(); + InfluxPoint point = instance.convert(item, item.getState(), Instant.now(), null).get(); if (point == null) { Assertions.fail("'point' is null"); @@ -145,7 +146,7 @@ public class ItemToStorePointCreatorTest { assertThat(point.getTags(), hasEntry(InfluxDBConstants.TAG_TYPE_NAME, "Number")); instance = getService(false, false, false, false); - point = instance.convert(item, null).get(); + point = instance.convert(item, item.getState(), Instant.now(), null).get(); if (point == null) { Assertions.fail("'point' is null"); @@ -161,7 +162,7 @@ public class ItemToStorePointCreatorTest { item.setLabel("ItemLabel"); instance = getService(false, false, true, false); - InfluxPoint point = instance.convert(item, null).get(); + InfluxPoint point = instance.convert(item, item.getState(), Instant.now(), null).get(); if (point == null) { Assertions.fail("'point' is null"); @@ -171,7 +172,7 @@ public class ItemToStorePointCreatorTest { assertThat(point.getTags(), hasEntry(InfluxDBConstants.TAG_LABEL_NAME, "ItemLabel")); instance = getService(false, false, false, false); - point = instance.convert(item, null).get(); + point = instance.convert(item, item.getState(), Instant.now(), null).get(); if (point == null) { Assertions.fail("'point' is null"); @@ -189,7 +190,7 @@ public class ItemToStorePointCreatorTest { when(metadataRegistry.get(metadataKey)) .thenReturn(new Metadata(metadataKey, "", Map.of("key1", "val1", "key2", "val2"))); - InfluxPoint point = instance.convert(item, null).get(); + InfluxPoint point = instance.convert(item, item.getState(), Instant.now(), null).get(); if (point == null) { Assertions.fail("'point' is null"); @@ -205,14 +206,14 @@ public class ItemToStorePointCreatorTest { NumberItem item = ItemTestHelper.createNumberItem("myitem", 5); MetadataKey metadataKey = new MetadataKey(InfluxDBPersistenceService.SERVICE_NAME, item.getName()); - InfluxPoint point = instance.convert(item, null).get(); + InfluxPoint point = instance.convert(item, item.getState(), Instant.now(), null).get(); if (point == null) { Assertions.fail(); return; } assertThat(point.getMeasurementName(), equalTo(item.getName())); - point = instance.convert(item, null).get(); + point = instance.convert(item, item.getState(), Instant.now(), null).get(); if (point == null) { Assertions.fail(); return; @@ -223,7 +224,7 @@ public class ItemToStorePointCreatorTest { when(metadataRegistry.get(metadataKey)) .thenReturn(new Metadata(metadataKey, "measurementName", Map.of("key1", "val1", "key2", "val2"))); - point = instance.convert(item, null).get(); + point = instance.convert(item, item.getState(), Instant.now(), null).get(); if (point == null) { Assertions.fail(); return; @@ -234,7 +235,7 @@ public class ItemToStorePointCreatorTest { when(metadataRegistry.get(metadataKey)) .thenReturn(new Metadata(metadataKey, "", Map.of("key1", "val1", "key2", "val2"))); - point = instance.convert(item, null).get(); + point = instance.convert(item, item.getState(), Instant.now(), null).get(); if (point == null) { Assertions.fail(); return;