[influxdb] Implement ModifiablePersistenceService (#14959)

* [influxdb] Implement ModifiablePersistenceService

Signed-off-by: Jan N. Klug <github@klug.nrw>
This commit is contained in:
J-N-K 2023-05-13 12:37:48 +02:00 committed by GitHub
parent 01add04f9e
commit 717addee9a
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
5 changed files with 111 additions and 39 deletions

View File

@ -42,6 +42,7 @@ import org.openhab.core.items.ItemRegistry;
import org.openhab.core.items.ItemUtil;
import org.openhab.core.persistence.FilterCriteria;
import org.openhab.core.persistence.HistoricItem;
import org.openhab.core.persistence.ModifiablePersistenceService;
import org.openhab.core.persistence.PersistenceItemInfo;
import org.openhab.core.persistence.PersistenceService;
import org.openhab.core.persistence.QueryablePersistenceService;
@ -92,7 +93,7 @@ import org.slf4j.LoggerFactory;
QueryablePersistenceService.class }, configurationPid = "org.openhab.influxdb", //
property = Constants.SERVICE_PID + "=org.openhab.influxdb")
@ConfigurableService(category = "persistence", label = "InfluxDB Persistence Service", description_uri = InfluxDBPersistenceService.CONFIG_URI)
public class InfluxDBPersistenceService implements QueryablePersistenceService {
public class InfluxDBPersistenceService implements ModifiablePersistenceService {
public static final String SERVICE_NAME = "influxdb";
private final Logger logger = LoggerFactory.getLogger(InfluxDBPersistenceService.class);
@ -190,11 +191,20 @@ public class InfluxDBPersistenceService implements QueryablePersistenceService {
@Override
public void store(Item item, @Nullable String alias) {
store(item, ZonedDateTime.now(), item.getState(), alias);
}
@Override
public void store(Item item, ZonedDateTime date, State state) {
store(item, date, state, null);
}
public void store(Item item, ZonedDateTime date, State state, @Nullable String alias) {
if (!serviceActivated) {
logger.warn("InfluxDB service not ready. Storing {} rejected.", item);
return;
}
convert(item, alias).thenAccept(point -> {
convert(item, state, date.toInstant(), null).thenAccept(point -> {
if (point == null) {
logger.trace("Ignoring item {}, conversion to an InfluxDB point failed.", item.getName());
return;
@ -207,6 +217,20 @@ public class InfluxDBPersistenceService implements QueryablePersistenceService {
});
}
@Override
public boolean remove(FilterCriteria filter) throws IllegalArgumentException {
if (serviceActivated && checkConnection()) {
if (filter.getItemName() == null) {
logger.warn("Item name is missing in filter {} when trying to remove data.", filter);
return false;
}
return influxDBRepository.remove(filter);
} else {
logger.debug("Remove query {} ignored, InfluxDB is not connected.", filter);
return false;
}
}
@Override
public Iterable<HistoricItem> query(FilterCriteria filter) {
if (serviceActivated && checkConnection()) {
@ -215,13 +239,12 @@ public class InfluxDBPersistenceService implements QueryablePersistenceService {
filter.getItemName(), filter.getOrdering().toString(), filter.getState(), filter.getOperator(),
filter.getBeginDate(), filter.getEndDate(), filter.getPageSize(), filter.getPageNumber());
if (filter.getItemName() == null) {
logger.warn("Item name is missing in filter {}", filter);
logger.warn("Item name is missing in filter {} when querying data.", filter);
return List.of();
}
String query = influxDBRepository.createQueryCreator().createQuery(filter,
List<InfluxDBRepository.InfluxRow> results = influxDBRepository.query(filter,
configuration.getRetentionPolicy());
logger.trace("Query {}", query);
List<InfluxDBRepository.InfluxRow> results = influxDBRepository.query(query);
return results.stream().map(this::mapRowToHistoricItem).collect(Collectors.toList());
} else {
logger.debug("Query for persisted data ignored, InfluxDB is not connected");
@ -279,13 +302,12 @@ public class InfluxDBPersistenceService implements QueryablePersistenceService {
* @return a {@link CompletableFuture} that contains either <code>null</code> for item states that cannot be
* converted or the corresponding {@link InfluxPoint}
*/
CompletableFuture<@Nullable InfluxPoint> convert(Item item, @Nullable String storeAlias) {
CompletableFuture<@Nullable InfluxPoint> convert(Item item, State state, Instant timeStamp,
@Nullable String storeAlias) {
String itemName = item.getName();
String itemLabel = item.getLabel();
String category = item.getCategory();
State state = item.getState();
String itemType = item.getType();
Instant timeStamp = Instant.now();
if (state instanceof UnDefType) {
return CompletableFuture.completedFuture(null);

View File

@ -17,6 +17,7 @@ import java.util.List;
import java.util.Map;
import org.eclipse.jdt.annotation.NonNullByDefault;
import org.openhab.core.persistence.FilterCriteria;
/**
* Manages InfluxDB server interaction maintaining client connection
@ -61,11 +62,11 @@ public interface InfluxDBRepository {
/**
* Executes Flux query
*
* @param query Query
* @param filter the query filter
* @return Query results
*
*/
List<InfluxRow> query(String query);
List<InfluxRow> query(FilterCriteria filter, String retentionPolicy);
/**
* Write points to database
@ -76,11 +77,12 @@ public interface InfluxDBRepository {
boolean write(List<InfluxPoint> influxPoints);
/**
* create a query creator on this repository
* Execute delete query
*
* @return the query creator for this repository
* @param filter the query filter
* @return <code>true</code> if query executed successfully, <code>false</code> otherwise
*/
FilterCriteriaQueryCreator createQueryCreator();
boolean remove(FilterCriteria filter);
record InfluxRow(Instant time, String itemName, Object value) {
}

View File

@ -35,6 +35,7 @@ import org.influxdb.dto.Point;
import org.influxdb.dto.Pong;
import org.influxdb.dto.Query;
import org.influxdb.dto.QueryResult;
import org.openhab.core.persistence.FilterCriteria;
import org.openhab.persistence.influxdb.internal.FilterCriteriaQueryCreator;
import org.openhab.persistence.influxdb.internal.InfluxDBConfiguration;
import org.openhab.persistence.influxdb.internal.InfluxDBMetadataService;
@ -58,12 +59,14 @@ public class InfluxDB1RepositoryImpl implements InfluxDBRepository {
private final Logger logger = LoggerFactory.getLogger(InfluxDB1RepositoryImpl.class);
private final InfluxDBConfiguration configuration;
private final InfluxDBMetadataService influxDBMetadataService;
private final FilterCriteriaQueryCreator queryCreator;
private @Nullable InfluxDB client;
public InfluxDB1RepositoryImpl(InfluxDBConfiguration configuration,
InfluxDBMetadataService influxDBMetadataService) {
this.configuration = configuration;
this.influxDBMetadataService = influxDBMetadataService;
this.queryCreator = new InfluxDB1FilterCriteriaQueryCreatorImpl(configuration, influxDBMetadataService);
}
@Override
@ -134,6 +137,12 @@ public class InfluxDB1RepositoryImpl implements InfluxDBRepository {
return true;
}
@Override
public boolean remove(FilterCriteria filter) {
logger.warn("Removing data is not supported in InfluxDB v1.");
return false;
}
private Optional<Point> convertPointToClientFormat(InfluxPoint point) {
Point.Builder clientPoint = Point.measurement(point.getMeasurementName()).time(point.getTime().toEpochMilli(),
TimeUnit.MILLISECONDS);
@ -155,9 +164,11 @@ public class InfluxDB1RepositoryImpl implements InfluxDBRepository {
}
@Override
public List<InfluxRow> query(String query) {
public List<InfluxRow> query(FilterCriteria filter, String retentionPolicy) {
final InfluxDB currentClient = client;
if (currentClient != null) {
String query = queryCreator.createQuery(filter, retentionPolicy);
logger.trace("Query {}", query);
Query parsedQuery = new Query(query, configuration.getDatabaseName());
List<QueryResult.Result> results = currentClient.query(parsedQuery, TimeUnit.MILLISECONDS).getResults();
return convertClientResultToRepository(results);
@ -216,9 +227,4 @@ public class InfluxDB1RepositoryImpl implements InfluxDBRepository {
public Map<String, Integer> getStoredItemsCount() {
return Collections.emptyMap();
}
@Override
public FilterCriteriaQueryCreator createQueryCreator() {
return new InfluxDB1FilterCriteriaQueryCreatorImpl(configuration, influxDBMetadataService);
}
}

View File

@ -15,6 +15,8 @@ package org.openhab.persistence.influxdb.internal.influx2;
import static org.openhab.persistence.influxdb.internal.InfluxDBConstants.*;
import java.time.Instant;
import java.time.OffsetDateTime;
import java.time.ZonedDateTime;
import java.util.Collections;
import java.util.LinkedHashMap;
import java.util.List;
@ -25,6 +27,7 @@ import java.util.stream.Stream;
import org.eclipse.jdt.annotation.NonNullByDefault;
import org.eclipse.jdt.annotation.Nullable;
import org.openhab.core.persistence.FilterCriteria;
import org.openhab.persistence.influxdb.internal.FilterCriteriaQueryCreator;
import org.openhab.persistence.influxdb.internal.InfluxDBConfiguration;
import org.openhab.persistence.influxdb.internal.InfluxDBConstants;
@ -34,6 +37,7 @@ import org.openhab.persistence.influxdb.internal.InfluxPoint;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import com.influxdb.client.DeleteApi;
import com.influxdb.client.InfluxDBClient;
import com.influxdb.client.InfluxDBClientFactory;
import com.influxdb.client.InfluxDBClientOptions;
@ -55,15 +59,18 @@ public class InfluxDB2RepositoryImpl implements InfluxDBRepository {
private final Logger logger = LoggerFactory.getLogger(InfluxDB2RepositoryImpl.class);
private final InfluxDBConfiguration configuration;
private final InfluxDBMetadataService influxDBMetadataService;
private final FilterCriteriaQueryCreator queryCreator;
private @Nullable InfluxDBClient client;
private @Nullable QueryApi queryAPI;
private @Nullable WriteApi writeAPI;
private @Nullable DeleteApi deleteAPI;
public InfluxDB2RepositoryImpl(InfluxDBConfiguration configuration,
InfluxDBMetadataService influxDBMetadataService) {
this.configuration = configuration;
this.influxDBMetadataService = influxDBMetadataService;
this.queryCreator = new InfluxDB2FilterCriteriaQueryCreatorImpl(configuration, influxDBMetadataService);
}
@Override
@ -88,6 +95,7 @@ public class InfluxDB2RepositoryImpl implements InfluxDBRepository {
queryAPI = createdClient.getQueryApi();
writeAPI = createdClient.getWriteApi();
deleteAPI = createdClient.getDeleteApi();
logger.debug("Successfully connected to InfluxDB. Instance ready={}", createdClient.ready());
return checkConnectionStatus();
@ -137,6 +145,42 @@ public class InfluxDB2RepositoryImpl implements InfluxDBRepository {
return true;
}
@Override
public boolean remove(FilterCriteria filter) {
final DeleteApi currentDeleteApi = deleteAPI;
if (currentDeleteApi == null) {
return false;
}
if (filter.getState() != null) {
logger.warn("Deleting by value is not supported in InfluxDB v2.");
return false;
}
OffsetDateTime start = Objects.requireNonNullElse(filter.getBeginDate(), ZonedDateTime.now().minusYears(100))
.toOffsetDateTime();
OffsetDateTime stop = Objects.requireNonNullElse(filter.getEndDate(), ZonedDateTime.now().plusYears(100))
.toOffsetDateTime();
// create predicate
String predicate = "";
String itemName = filter.getItemName();
if (itemName != null) {
String name = influxDBMetadataService.getMeasurementNameOrDefault(itemName, itemName);
String measurementName = configuration.isReplaceUnderscore() ? name.replace('_', '.') : name;
predicate = "(_measurement=\"" + measurementName + "\")";
}
try {
deleteAPI.delete(start, stop, predicate, configuration.getRetentionPolicy(),
configuration.getDatabaseName());
} catch (InfluxException e) {
logger.debug("Deleting from database failed", e);
return false;
}
return true;
}
private Optional<Point> convertPointToClientFormat(InfluxPoint point) {
Point clientPoint = Point.measurement(point.getMeasurementName()).time(point.getTime(), WritePrecision.MS);
@Nullable
@ -158,9 +202,11 @@ public class InfluxDB2RepositoryImpl implements InfluxDBRepository {
}
@Override
public List<InfluxRow> query(String query) {
public List<InfluxRow> query(FilterCriteria filter, String retentionPolicy) {
final QueryApi currentQueryAPI = queryAPI;
if (currentQueryAPI != null) {
String query = queryCreator.createQuery(filter, retentionPolicy);
logger.trace("Query {}", query);
List<FluxTable> clientResult = currentQueryAPI.query(query);
return clientResult.stream().flatMap(this::mapRawResultToHistoric).toList();
} else {
@ -204,9 +250,4 @@ public class InfluxDB2RepositoryImpl implements InfluxDBRepository {
return Collections.emptyMap();
}
}
@Override
public FilterCriteriaQueryCreator createQueryCreator() {
return new InfluxDB2FilterCriteriaQueryCreatorImpl(configuration, influxDBMetadataService);
}
}

View File

@ -18,6 +18,7 @@ import static org.mockito.Mockito.when;
import static org.openhab.persistence.influxdb.internal.InfluxDBConfiguration.*;
import java.math.BigDecimal;
import java.time.Instant;
import java.util.HashMap;
import java.util.Map;
import java.util.concurrent.ExecutionException;
@ -74,7 +75,7 @@ public class ItemToStorePointCreatorTest {
@MethodSource
public void convertBasicItem(Number number) throws ExecutionException, InterruptedException {
NumberItem item = ItemTestHelper.createNumberItem("myitem", number);
InfluxPoint point = instance.convert(item, null).get();
InfluxPoint point = instance.convert(item, item.getState(), Instant.now(), null).get();
if (point == null) {
Assertions.fail("'point' is null");
@ -94,7 +95,7 @@ public class ItemToStorePointCreatorTest {
@Test
public void shouldUseAliasAsMeasurementNameIfProvided() throws ExecutionException, InterruptedException {
NumberItem item = ItemTestHelper.createNumberItem("myitem", 5);
InfluxPoint point = instance.convert(item, "aliasName").get();
InfluxPoint point = instance.convert(item, item.getState(), Instant.now(), "aliasName").get();
if (point == null) {
Assertions.fail("'point' is null");
@ -110,7 +111,7 @@ public class ItemToStorePointCreatorTest {
item.setCategory("categoryValue");
instance = getService(false, true, false, false);
InfluxPoint point = instance.convert(item, null).get();
InfluxPoint point = instance.convert(item, item.getState(), Instant.now(), null).get();
if (point == null) {
Assertions.fail("'point' is null");
@ -120,7 +121,7 @@ public class ItemToStorePointCreatorTest {
assertThat(point.getTags(), hasEntry(InfluxDBConstants.TAG_CATEGORY_NAME, "categoryValue"));
instance = getService(false, false, false, false);
point = instance.convert(item, null).get();
point = instance.convert(item, item.getState(), Instant.now(), null).get();
if (point == null) {
Assertions.fail("'point' is null");
@ -135,7 +136,7 @@ public class ItemToStorePointCreatorTest {
NumberItem item = ItemTestHelper.createNumberItem("myitem", 5);
instance = getService(false, false, false, true);
InfluxPoint point = instance.convert(item, null).get();
InfluxPoint point = instance.convert(item, item.getState(), Instant.now(), null).get();
if (point == null) {
Assertions.fail("'point' is null");
@ -145,7 +146,7 @@ public class ItemToStorePointCreatorTest {
assertThat(point.getTags(), hasEntry(InfluxDBConstants.TAG_TYPE_NAME, "Number"));
instance = getService(false, false, false, false);
point = instance.convert(item, null).get();
point = instance.convert(item, item.getState(), Instant.now(), null).get();
if (point == null) {
Assertions.fail("'point' is null");
@ -161,7 +162,7 @@ public class ItemToStorePointCreatorTest {
item.setLabel("ItemLabel");
instance = getService(false, false, true, false);
InfluxPoint point = instance.convert(item, null).get();
InfluxPoint point = instance.convert(item, item.getState(), Instant.now(), null).get();
if (point == null) {
Assertions.fail("'point' is null");
@ -171,7 +172,7 @@ public class ItemToStorePointCreatorTest {
assertThat(point.getTags(), hasEntry(InfluxDBConstants.TAG_LABEL_NAME, "ItemLabel"));
instance = getService(false, false, false, false);
point = instance.convert(item, null).get();
point = instance.convert(item, item.getState(), Instant.now(), null).get();
if (point == null) {
Assertions.fail("'point' is null");
@ -189,7 +190,7 @@ public class ItemToStorePointCreatorTest {
when(metadataRegistry.get(metadataKey))
.thenReturn(new Metadata(metadataKey, "", Map.of("key1", "val1", "key2", "val2")));
InfluxPoint point = instance.convert(item, null).get();
InfluxPoint point = instance.convert(item, item.getState(), Instant.now(), null).get();
if (point == null) {
Assertions.fail("'point' is null");
@ -205,14 +206,14 @@ public class ItemToStorePointCreatorTest {
NumberItem item = ItemTestHelper.createNumberItem("myitem", 5);
MetadataKey metadataKey = new MetadataKey(InfluxDBPersistenceService.SERVICE_NAME, item.getName());
InfluxPoint point = instance.convert(item, null).get();
InfluxPoint point = instance.convert(item, item.getState(), Instant.now(), null).get();
if (point == null) {
Assertions.fail();
return;
}
assertThat(point.getMeasurementName(), equalTo(item.getName()));
point = instance.convert(item, null).get();
point = instance.convert(item, item.getState(), Instant.now(), null).get();
if (point == null) {
Assertions.fail();
return;
@ -223,7 +224,7 @@ public class ItemToStorePointCreatorTest {
when(metadataRegistry.get(metadataKey))
.thenReturn(new Metadata(metadataKey, "measurementName", Map.of("key1", "val1", "key2", "val2")));
point = instance.convert(item, null).get();
point = instance.convert(item, item.getState(), Instant.now(), null).get();
if (point == null) {
Assertions.fail();
return;
@ -234,7 +235,7 @@ public class ItemToStorePointCreatorTest {
when(metadataRegistry.get(metadataKey))
.thenReturn(new Metadata(metadataKey, "", Map.of("key1", "val1", "key2", "val2")));
point = instance.convert(item, null).get();
point = instance.convert(item, item.getState(), Instant.now(), null).get();
if (point == null) {
Assertions.fail();
return;