Mirror of https://github.com/openhab/openhab-addons.git (synced 2025-01-26 15:21:41 +01:00)

[influxdb] Code improvements and enhancements (#14304)

* [influxdb] code improvements

Signed-off-by: Jan N. Klug <github@klug.nrw>

parent 0e246aa313
commit ed7159c780
@@ -16,82 +16,87 @@
<properties>
<bnd.importpackage>
-!javax.annotation;!android.*,!com.android.*,!com.google.appengine.*,!dalvik.system,!kotlin.*,!kotlinx.*,!org.conscrypt,!sun.security.ssl,!org.apache.harmony.*,!org.apache.http.*,!rx.*,!org.msgpack.*
+!javax.annotation.*;!android.*,!com.android.*,!com.google.appengine.*,!dalvik.system,!kotlin.*,!kotlinx.*,!org.conscrypt,!sun.security.ssl,!org.apache.harmony.*,!org.apache.http.*,!rx.*,!org.msgpack.*
</bnd.importpackage>
+<okhttp3.version>3.14.9</okhttp3.version>
+<retrofit.version>2.7.2</retrofit.version>
+<influx2.version>1.15.0</influx2.version>
+<influx1.version>2.21</influx1.version>
</properties>

<dependencies>
<!-- START InfluxDB 2.0 -->
-<!-- START influxdb-client-java -->
<dependency>
<groupId>com.influxdb</groupId>
<artifactId>influxdb-client-java</artifactId>
-<version>1.6.0</version>
+<version>${influx2.version}</version>
</dependency>
<dependency>
-<artifactId>influxdb-client-core</artifactId>
<groupId>com.influxdb</groupId>
-<version>1.6.0</version>
+<artifactId>influxdb-client-core</artifactId>
+<version>${influx2.version}</version>
</dependency>
<dependency>
+<groupId>com.influxdb</groupId>
+<artifactId>flux-dsl</artifactId>
+<version>${influx2.version}</version>
+</dependency>
+
+<dependency>
+<groupId>com.squareup.retrofit2</groupId>
<artifactId>converter-gson</artifactId>
-<groupId>com.squareup.retrofit2</groupId>
-<version>2.5.0</version>
+<version>${retrofit.version}</version>
</dependency>
<dependency>
+<groupId>com.squareup.retrofit2</groupId>
<artifactId>converter-scalars</artifactId>
-<groupId>com.squareup.retrofit2</groupId>
-<version>2.5.0</version>
-</dependency>
-<dependency>
-<artifactId>gson</artifactId>
-<groupId>com.google.code.gson</groupId>
-<version>2.9.1</version>
-</dependency>
-<dependency>
-<artifactId>gson-fire</artifactId>
-<groupId>io.gsonfire</groupId>
-<version>1.8.0</version>
-</dependency>
-<dependency>
-<artifactId>okio</artifactId>
-<groupId>com.squareup.okio</groupId>
-<version>1.17.3</version>
-</dependency>
-<dependency>
-<artifactId>commons-csv</artifactId>
-<groupId>org.apache.commons</groupId>
-<version>1.6</version>
-</dependency>
-<dependency>
-<artifactId>json</artifactId>
-<groupId>org.json</groupId>
-<version>20180813</version>
-</dependency>
-<dependency>
-<artifactId>okhttp</artifactId>
-<groupId>com.squareup.okhttp3</groupId>
-<version>${okhttp.version}</version>
+<version>${retrofit.version}</version>
</dependency>
<dependency>
<artifactId>retrofit</artifactId>
<groupId>com.squareup.retrofit2</groupId>
-<version>2.6.2</version>
+<version>${retrofit.version}</version>
</dependency>

<dependency>
-<artifactId>jsr305</artifactId>
-<groupId>com.google.code.findbugs</groupId>
-<version>3.0.2</version>
-</dependency>
-<dependency>
-<artifactId>logging-interceptor</artifactId>
<groupId>com.squareup.okhttp3</groupId>
-<version>${okhttp.version}</version>
+<artifactId>okhttp</artifactId>
+<version>${okhttp3.version}</version>
+</dependency>
+<dependency>
+<groupId>com.squareup.okhttp3</groupId>
+<artifactId>logging-interceptor</artifactId>
+<version>${okhttp3.version}</version>
+</dependency>
+<dependency>
+<groupId>com.google.code.gson</groupId>
+<artifactId>gson</artifactId>
+<version>2.9.1</version>
+</dependency>
+<dependency>
+<groupId>io.gsonfire</groupId>
+<artifactId>gson-fire</artifactId>
+<version>1.8.4</version>
+</dependency>
+<dependency>
+<groupId>com.squareup.okio</groupId>
+<artifactId>okio</artifactId>
+<version>1.17.3</version>
+</dependency>
+<dependency>
+<groupId>org.apache.commons</groupId>
+<artifactId>commons-csv</artifactId>
+<version>1.8</version>
+</dependency>
+<dependency>
+<artifactId>json</artifactId>
+<groupId>org.json</groupId>
+<version>20200518</version>
</dependency>
<dependency>
<artifactId>rxjava</artifactId>
<groupId>io.reactivex.rxjava2</groupId>
-<version>2.2.17</version>
+<version>2.2.19</version>
</dependency>
<dependency>
<artifactId>reactive-streams</artifactId>
@@ -101,29 +106,20 @@
<dependency>
<artifactId>swagger-annotations</artifactId>
<groupId>io.swagger</groupId>
-<version>1.5.22</version>
+<version>1.6.1</version>
</dependency>
-<!--END influxdb-client-java -->
-
-<dependency>
-<groupId>com.influxdb</groupId>
-<artifactId>flux-dsl</artifactId>
-<version>1.6.0</version>
-</dependency>

<!--END InfluxDB 2.0 -->

<!--START InfluxDB 1.0 -->
<dependency>
<groupId>org.influxdb</groupId>
<artifactId>influxdb-java</artifactId>
-<version>2.17</version>
+<version>${influx1.version}</version>
</dependency>
<dependency>
<groupId>com.squareup.retrofit2</groupId>
<artifactId>converter-moshi</artifactId>
-<version>2.6.2</version>
+<version>${retrofit.version}</version>
</dependency>
<dependency>
<groupId>com.squareup.moshi</groupId>
@@ -135,4 +131,5 @@
<!-- END InfluxDB 1.0 -->
</dependencies>

</project>
@@ -25,7 +25,6 @@ import org.eclipse.jdt.annotation.Nullable;
import org.openhab.core.config.core.ConfigurableService;
import org.openhab.core.items.Item;
import org.openhab.core.items.ItemRegistry;
-import org.openhab.core.items.MetadataRegistry;
import org.openhab.core.persistence.FilterCriteria;
import org.openhab.core.persistence.HistoricItem;
import org.openhab.core.persistence.PersistenceItemInfo;
@@ -36,18 +35,19 @@ import org.openhab.core.types.State;
import org.openhab.persistence.influxdb.internal.FilterCriteriaQueryCreator;
import org.openhab.persistence.influxdb.internal.InfluxDBConfiguration;
import org.openhab.persistence.influxdb.internal.InfluxDBHistoricItem;
+import org.openhab.persistence.influxdb.internal.InfluxDBMetadataService;
import org.openhab.persistence.influxdb.internal.InfluxDBPersistentItemInfo;
import org.openhab.persistence.influxdb.internal.InfluxDBRepository;
import org.openhab.persistence.influxdb.internal.InfluxDBStateConvertUtils;
import org.openhab.persistence.influxdb.internal.InfluxPoint;
-import org.openhab.persistence.influxdb.internal.InfluxRow;
import org.openhab.persistence.influxdb.internal.ItemToStorePointCreator;
-import org.openhab.persistence.influxdb.internal.RepositoryFactory;
+import org.openhab.persistence.influxdb.internal.UnexpectedConditionException;
+import org.openhab.persistence.influxdb.internal.influx1.InfluxDB1RepositoryImpl;
+import org.openhab.persistence.influxdb.internal.influx2.InfluxDB2RepositoryImpl;
import org.osgi.framework.Constants;
import org.osgi.service.component.annotations.Activate;
import org.osgi.service.component.annotations.Component;
import org.osgi.service.component.annotations.Deactivate;
-import org.osgi.service.component.annotations.Modified;
import org.osgi.service.component.annotations.Reference;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -85,47 +85,38 @@ public class InfluxDBPersistenceService implements QueryablePersistenceService {

// External dependencies
private final ItemRegistry itemRegistry;
-private final MetadataRegistry metadataRegistry;
+private final InfluxDBMetadataService influxDBMetadataService;

-// Internal dependencies/state
-private InfluxDBConfiguration configuration = InfluxDBConfiguration.NO_CONFIGURATION;
-// Relax rules because can only be null if component is not active
-private @NonNullByDefault({}) ItemToStorePointCreator itemToStorePointCreator;
-private @NonNullByDefault({}) InfluxDBRepository influxDBRepository;
-
-private boolean tryReconnection = false;
+private final InfluxDBConfiguration configuration;
+private final ItemToStorePointCreator itemToStorePointCreator;
+private final InfluxDBRepository influxDBRepository;
+private boolean tryReconnection;

@Activate
public InfluxDBPersistenceService(final @Reference ItemRegistry itemRegistry,
-final @Reference MetadataRegistry metadataRegistry) {
+final @Reference InfluxDBMetadataService influxDBMetadataService, Map<String, Object> config) {
this.itemRegistry = itemRegistry;
-this.metadataRegistry = metadataRegistry;
-}
-
-/**
-* Connect to database when service is activated
-*/
-@Activate
-public void activate(final @Nullable Map<String, Object> config) {
-logger.debug("InfluxDB persistence service is being activated");
-
-if (loadConfiguration(config)) {
-itemToStorePointCreator = new ItemToStorePointCreator(configuration, metadataRegistry);
-influxDBRepository = createInfluxDBRepository();
-influxDBRepository.connect();
+this.influxDBMetadataService = influxDBMetadataService;
+this.configuration = new InfluxDBConfiguration(config);
+if (configuration.isValid()) {
+this.influxDBRepository = createInfluxDBRepository();
+this.influxDBRepository.connect();
+this.itemToStorePointCreator = new ItemToStorePointCreator(configuration, influxDBMetadataService);
tryReconnection = true;
} else {
-logger.error("Cannot load configuration, persistence service wont work");
-tryReconnection = false;
+throw new IllegalArgumentException("Configuration invalid.");
}

-logger.debug("InfluxDB persistence service is now activated");
+logger.info("InfluxDB persistence service started.");
}

// Visible for testing
-protected InfluxDBRepository createInfluxDBRepository() {
-return RepositoryFactory.createRepository(configuration);
+protected InfluxDBRepository createInfluxDBRepository() throws IllegalArgumentException {
+return switch (configuration.getVersion()) {
+case V1 -> new InfluxDB1RepositoryImpl(configuration, influxDBMetadataService);
+case V2 -> new InfluxDB2RepositoryImpl(configuration, influxDBMetadataService);
+default -> throw new IllegalArgumentException("Failed to instantiate repository.");
+};
}

/**
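Note on the hunk above: the former activate()/modified() lifecycle methods are folded into the constructor, so the configuration is parsed and validated exactly once and an invalid configuration aborts activation by throwing. Below is a minimal, self-contained sketch of that constructor-injection pattern; the component and its single "url" option are hypothetical and only illustrate the idea, they are not the binding's actual code.

import java.util.Map;

import org.osgi.service.component.annotations.Activate;
import org.osgi.service.component.annotations.Component;

@Component
public class ExampleService {

    private final String url;

    @Activate
    public ExampleService(Map<String, Object> config) {
        // Parse and validate the configuration once, in the constructor.
        Object url = config.get("url");
        if (!(url instanceof String) || ((String) url).isBlank()) {
            // Throwing here keeps the component from activating at all,
            // mirroring the "Configuration invalid." behaviour above.
            throw new IllegalArgumentException("Configuration invalid.");
        }
        this.url = (String) url;
    }
}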
@@ -133,48 +124,9 @@ public class InfluxDBPersistenceService implements QueryablePersistenceService {
*/
@Deactivate
public void deactivate() {
-logger.debug("InfluxDB persistence service deactivated");
tryReconnection = false;
-if (influxDBRepository != null) {
-influxDBRepository.disconnect();
-influxDBRepository = null;
-}
-if (itemToStorePointCreator != null) {
-itemToStorePointCreator = null;
-}
-}
-
-/**
-* Rerun deactivation/activation code each time configuration is changed
-*/
-@Modified
-protected void modified(@Nullable Map<String, Object> config) {
-if (config != null) {
-logger.debug("Config has been modified will deactivate/activate with new config");
-
-deactivate();
-activate(config);
-} else {
-logger.warn("Null configuration, ignoring");
-}
-}
-
-private boolean loadConfiguration(@Nullable Map<String, Object> config) {
-boolean configurationIsValid;
-if (config != null) {
-configuration = new InfluxDBConfiguration(config);
-configurationIsValid = configuration.isValid();
-if (configurationIsValid) {
-logger.debug("Loaded configuration {}", config);
-} else {
-logger.warn("Some configuration properties are not valid {}", config);
-}
-} else {
-configuration = InfluxDBConfiguration.NO_CONFIGURATION;
-configurationIsValid = false;
-logger.warn("Ignoring configuration because it's null");
-}
-return configurationIsValid;
+influxDBRepository.disconnect();
+logger.info("InfluxDB persistence service stopped.");
}

@Override
@@ -190,18 +142,17 @@ public class InfluxDBPersistenceService implements QueryablePersistenceService {
@Override
public Set<PersistenceItemInfo> getItemInfo() {
if (checkConnection()) {
-return influxDBRepository.getStoredItemsCount().entrySet().stream()
-.map(entry -> new InfluxDBPersistentItemInfo(entry.getKey(), entry.getValue()))
+return influxDBRepository.getStoredItemsCount().entrySet().stream().map(InfluxDBPersistentItemInfo::new)
.collect(Collectors.toUnmodifiableSet());
} else {
-logger.info("getItemInfo ignored, InfluxDB is not yet connected");
+logger.info("getItemInfo ignored, InfluxDB is not connected");
return Set.of();
}
}

@Override
public void store(Item item) {
-store(item, item.getName());
+store(item, null);
}

@Override
@@ -209,41 +160,42 @@
if (checkConnection()) {
InfluxPoint point = itemToStorePointCreator.convert(item, alias);
if (point != null) {
-logger.trace("Storing item {} in InfluxDB point {}", item, point);
+try {
influxDBRepository.write(point);
+logger.trace("Stored item {} in InfluxDB point {}", item, point);
+} catch (UnexpectedConditionException e) {
+logger.warn("Failed to store item {} in InfluxDB point {}", point, item);
+}
} else {
-logger.trace("Ignoring item {} as is cannot be converted to an InfluxDB point", item);
+logger.trace("Ignoring item {}, conversion to an InfluxDB point failed.", item);
}
} else {
-logger.debug("store ignored, InfluxDB is not yet connected");
+logger.debug("store ignored, InfluxDB is not connected");
}
}

@Override
public Iterable<HistoricItem> query(FilterCriteria filter) {
-logger.debug("Got a query for historic points!");
-
if (checkConnection()) {
logger.trace(
-"Filter: itemname: {}, ordering: {}, state: {}, operator: {}, getBeginDate: {}, getEndDate: {}, getPageSize: {}, getPageNumber: {}",
+"Query-Filter: itemname: {}, ordering: {}, state: {}, operator: {}, getBeginDate: {}, getEndDate: {}, getPageSize: {}, getPageNumber: {}",
filter.getItemName(), filter.getOrdering().toString(), filter.getState(), filter.getOperator(),
filter.getBeginDate(), filter.getEndDate(), filter.getPageSize(), filter.getPageNumber());
-String query = RepositoryFactory.createQueryCreator(configuration, metadataRegistry).createQuery(filter,
+String query = influxDBRepository.createQueryCreator().createQuery(filter,
configuration.getRetentionPolicy());
logger.trace("Query {}", query);
-List<InfluxRow> results = influxDBRepository.query(query);
-return results.stream().map(this::mapRow2HistoricItem).collect(Collectors.toList());
+List<InfluxDBRepository.InfluxRow> results = influxDBRepository.query(query);
+return results.stream().map(this::mapRowToHistoricItem).collect(Collectors.toList());
} else {
-logger.debug("query ignored, InfluxDB is not yet connected");
+logger.debug("Query for persisted data ignored, InfluxDB is not connected");
return List.of();
}
}

-private HistoricItem mapRow2HistoricItem(InfluxRow row) {
-State state = InfluxDBStateConvertUtils.objectToState(row.getValue(), row.getItemName(), itemRegistry);
-return new InfluxDBHistoricItem(row.getItemName(), state,
-ZonedDateTime.ofInstant(row.getTime(), ZoneId.systemDefault()));
+private HistoricItem mapRowToHistoricItem(InfluxDBRepository.InfluxRow row) {
+State state = InfluxDBStateConvertUtils.objectToState(row.value(), row.itemName(), itemRegistry);
+return new InfluxDBHistoricItem(row.itemName(), state,
+ZonedDateTime.ofInstant(row.time(), ZoneId.systemDefault()));
}

@Override
@@ -257,14 +209,11 @@
* @return true if connected
*/
private boolean checkConnection() {
-if (influxDBRepository == null) {
-return false;
-} else if (influxDBRepository.isConnected()) {
+if (influxDBRepository.isConnected()) {
return true;
} else if (tryReconnection) {
logger.debug("Connection lost, trying re-connection");
-influxDBRepository.connect();
-return influxDBRepository.isConnected();
+return influxDBRepository.connect();
}
return false;
}
@@ -32,21 +32,13 @@ public interface FilterCriteriaQueryCreator {
String createQuery(FilterCriteria criteria, String retentionPolicy);

default String getOperationSymbol(FilterCriteria.Operator operator, InfluxDBVersion version) {
-switch (operator) {
-case EQ:
-return "=";
-case LT:
-return "<";
-case LTE:
-return "<=";
-case GT:
-return ">";
-case GTE:
-return ">=";
-case NEQ:
-return version == InfluxDBVersion.V1 ? "<>" : "!=";
-default:
-throw new UnnexpectedConditionException("Not expected operator " + operator);
-}
+return switch (operator) {
+case EQ -> "=";
+case LT -> "<";
+case LTE -> "<=";
+case GT -> ">";
+case GTE -> ">=";
+case NEQ -> version == InfluxDBVersion.V1 ? "<>" : "!=";
+};
}
}
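The hunk above rewrites getOperationSymbol() as an arrow-form switch expression; because the expression covers every enum constant, the old default branch that threw UnnexpectedConditionException is no longer needed. The standalone sketch below shows the same construct with an illustrative enum, not the binding's real types.

public class OperatorSymbolExample {

    enum Operator {
        EQ, LT, LTE, GT, GTE, NEQ
    }

    // Exhaustive switch expression: no default branch is required.
    static String symbol(Operator operator) {
        return switch (operator) {
            case EQ -> "=";
            case LT -> "<";
            case LTE -> "<=";
            case GT -> ">";
            case GTE -> ">=";
            case NEQ -> "!=";
        };
    }

    public static void main(String[] args) {
        System.out.println(symbol(Operator.GTE)); // prints ">="
    }
}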
@@ -12,13 +12,13 @@
*/
package org.openhab.persistence.influxdb.internal;

-import java.util.Collections;
import java.util.Map;
import java.util.Optional;
import java.util.StringJoiner;

import org.eclipse.jdt.annotation.NonNullByDefault;
import org.eclipse.jdt.annotation.Nullable;
+import org.openhab.core.config.core.ConfigParser;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

@@ -40,7 +40,6 @@ public class InfluxDBConfiguration {
public static final String ADD_CATEGORY_TAG_PARAM = "addCategoryTag";
public static final String ADD_LABEL_TAG_PARAM = "addLabelTag";
public static final String ADD_TYPE_TAG_PARAM = "addTypeTag";
-public static InfluxDBConfiguration NO_CONFIGURATION = new InfluxDBConfiguration(Collections.emptyMap());
private final Logger logger = LoggerFactory.getLogger(InfluxDBConfiguration.class);
private final String url;
private final String user;
@@ -49,36 +48,23 @@ public class InfluxDBConfiguration {
private final String databaseName;
private final String retentionPolicy;
private final InfluxDBVersion version;

private final boolean replaceUnderscore;
private final boolean addCategoryTag;
private final boolean addTypeTag;
private final boolean addLabelTag;

public InfluxDBConfiguration(Map<String, Object> config) {
-url = (String) config.getOrDefault(URL_PARAM, "http://127.0.0.1:8086");
-user = (String) config.getOrDefault(USER_PARAM, "openhab");
-password = (String) config.getOrDefault(PASSWORD_PARAM, "");
-token = (String) config.getOrDefault(TOKEN_PARAM, "");
-databaseName = (String) config.getOrDefault(DATABASE_PARAM, "openhab");
-retentionPolicy = (String) config.getOrDefault(RETENTION_POLICY_PARAM, "autogen");
+url = ConfigParser.valueAsOrElse(config.get(URL_PARAM), String.class, "http://127.0.0.1:8086");
+user = ConfigParser.valueAsOrElse(config.get(USER_PARAM), String.class, "openhab");
+password = ConfigParser.valueAsOrElse(config.get(PASSWORD_PARAM), String.class, "");
+token = ConfigParser.valueAsOrElse(config.get(TOKEN_PARAM), String.class, "");
+databaseName = ConfigParser.valueAsOrElse(config.get(DATABASE_PARAM), String.class, "openhab");
+retentionPolicy = ConfigParser.valueAsOrElse(config.get(RETENTION_POLICY_PARAM), String.class, "autogen");
version = parseInfluxVersion((String) config.getOrDefault(VERSION_PARAM, InfluxDBVersion.V1.name()));
-replaceUnderscore = getConfigBooleanValue(config, REPLACE_UNDERSCORE_PARAM, false);
-addCategoryTag = getConfigBooleanValue(config, ADD_CATEGORY_TAG_PARAM, false);
-addLabelTag = getConfigBooleanValue(config, ADD_LABEL_TAG_PARAM, false);
-addTypeTag = getConfigBooleanValue(config, ADD_TYPE_TAG_PARAM, false);
-}
-
-private static boolean getConfigBooleanValue(Map<String, Object> config, String key, boolean defaultValue) {
-Object object = config.get(key);
-if (object instanceof Boolean) {
-return (Boolean) object;
-} else if (object instanceof String) {
-return "true".equalsIgnoreCase((String) object);
-} else {
-return defaultValue;
-}
+replaceUnderscore = ConfigParser.valueAsOrElse(config.get(REPLACE_UNDERSCORE_PARAM), Boolean.class, false);
+addCategoryTag = ConfigParser.valueAsOrElse(config.get(ADD_CATEGORY_TAG_PARAM), Boolean.class, false);
+addLabelTag = ConfigParser.valueAsOrElse(config.get(ADD_LABEL_TAG_PARAM), Boolean.class, false);
+addTypeTag = ConfigParser.valueAsOrElse(config.get(ADD_TYPE_TAG_PARAM), Boolean.class, false);
}

private InfluxDBVersion parseInfluxVersion(@Nullable String value) {
@@ -171,19 +157,10 @@

@Override
public String toString() {
-String sb = "InfluxDBConfiguration{" + "url='" + url + '\'' + ", user='" + user + '\'' + ", password='"
-+ password.length() + " chars" + '\'' + ", token='" + token.length() + " chars" + '\''
-+ ", databaseName='" + databaseName + '\'' + ", retentionPolicy='" + retentionPolicy + '\''
-+ ", version=" + version + ", replaceUnderscore=" + replaceUnderscore + ", addCategoryTag="
-+ addCategoryTag + ", addTypeTag=" + addTypeTag + ", addLabelTag=" + addLabelTag + '}';
-return sb;
-}
-
-public int getTokenLength() {
-return token.length();
-}
-
-public char[] getTokenAsCharArray() {
-return token.toCharArray();
+return "InfluxDBConfiguration{url='" + url + "', user='" + user + "', password='" + password.length()
++ " chars', token='" + token.length() + " chars', databaseName='" + databaseName
++ "', retentionPolicy='" + retentionPolicy + "', version=" + version + ", replaceUnderscore="
++ replaceUnderscore + ", addCategoryTag=" + addCategoryTag + ", addTypeTag=" + addTypeTag
++ ", addLabelTag=" + addLabelTag + '}';
}
}
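The configuration hunk above replaces the manual casts and the hand-rolled getConfigBooleanValue() helper with openHAB's ConfigParser.valueAsOrElse(value, type, default). The simplified helper below is not the openHAB implementation; it only mimics the observable behaviour relied on here (return the value if it has the expected type, otherwise the default), and the config keys are illustrative.

import java.util.Map;

public class ConfigDefaultsExample {

    // Simplified stand-in for the "typed value or default" pattern.
    static <T> T valueAsOrElse(Object value, Class<T> type, T defaultValue) {
        return type.isInstance(value) ? type.cast(value) : defaultValue;
    }

    public static void main(String[] args) {
        Map<String, Object> config = Map.of("url", "http://localhost:8086");
        String url = valueAsOrElse(config.get("url"), String.class, "http://127.0.0.1:8086");
        Boolean addTypeTag = valueAsOrElse(config.get("addTypeTag"), Boolean.class, Boolean.FALSE);
        System.out.println(url + " addTypeTag=" + addTypeTag);
    }
}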
@@ -18,7 +18,6 @@ import java.time.ZonedDateTime;
import org.eclipse.jdt.annotation.NonNullByDefault;
import org.openhab.core.persistence.HistoricItem;
import org.openhab.core.types.State;
-import org.openhab.core.types.UnDefType;

/**
* Java bean used to return items queries results from InfluxDB.
@@ -30,8 +29,8 @@ import org.openhab.core.types.UnDefType;
public class InfluxDBHistoricItem implements HistoricItem {

private String name = "";
-private State state = UnDefType.NULL;
-private ZonedDateTime timestamp;
+private final State state;
+private final ZonedDateTime timestamp;

public InfluxDBHistoricItem(String name, State state, ZonedDateTime timestamp) {
this.name = name;
@@ -53,19 +52,11 @@ public class InfluxDBHistoricItem implements HistoricItem {
return state;
}

-public void setState(State state) {
-this.state = state;
-}
-
@Override
public ZonedDateTime getTimestamp() {
return timestamp;
}

-public void setTimestamp(ZonedDateTime timestamp) {
-this.timestamp = timestamp;
-}
-
@Override
public String toString() {
return DateFormat.getDateTimeInstance().format(timestamp) + ": " + name + " -> " + state.toString();
@@ -0,0 +1,70 @@
+/**
+* Copyright (c) 2010-2023 Contributors to the openHAB project
+*
+* See the NOTICE file(s) distributed with this work for additional
+* information.
+*
+* This program and the accompanying materials are made available under the
+* terms of the Eclipse Public License 2.0 which is available at
+* http://www.eclipse.org/legal/epl-2.0
+*
+* SPDX-License-Identifier: EPL-2.0
+*/
+package org.openhab.persistence.influxdb.internal;
+
+import java.util.Optional;
+
+import org.eclipse.jdt.annotation.NonNullByDefault;
+import org.openhab.core.items.Metadata;
+import org.openhab.core.items.MetadataKey;
+import org.openhab.core.items.MetadataRegistry;
+import org.openhab.persistence.influxdb.InfluxDBPersistenceService;
+import org.osgi.service.component.annotations.Activate;
+import org.osgi.service.component.annotations.Component;
+import org.osgi.service.component.annotations.Reference;
+
+/**
+* Utility service for using item metadata in InfluxDB
+*
+* @author Jan N. Klug - Initial contribution
+*/
+@NonNullByDefault
+@Component(service = InfluxDBMetadataService.class)
+public class InfluxDBMetadataService {
+private final MetadataRegistry metadataRegistry;
+
+@Activate
+public InfluxDBMetadataService(@Reference MetadataRegistry metadataRegistry) {
+this.metadataRegistry = metadataRegistry;
+}
+
+/**
+* get the measurement name from the item metadata or return the provided default
+*
+* @param itemName the item name
+* @param defaultName the default measurement name (
+* @return the metadata measurement name if present, defaultName otherwise
+*/
+public String getMeasurementNameOrDefault(String itemName, String defaultName) {
+Optional<Metadata> metadata = getMetaData(itemName);
+if (metadata.isPresent()) {
+String metaName = metadata.get().getValue();
+if (!metaName.isBlank()) {
+return metaName;
+}
+}
+
+return defaultName;
+}
+
+/**
+* get an Optional of the metadata for an item
+*
+* @param itemName the item name
+* @return Optional with the metadata (may be empty)
+*/
+public Optional<Metadata> getMetaData(String itemName) {
+MetadataKey key = new MetadataKey(InfluxDBPersistenceService.SERVICE_NAME, itemName);
+return Optional.ofNullable(metadataRegistry.get(key));
+}
+}
@@ -1,51 +0,0 @@
-/**
-* Copyright (c) 2010-2023 Contributors to the openHAB project
-*
-* See the NOTICE file(s) distributed with this work for additional
-* information.
-*
-* This program and the accompanying materials are made available under the
-* terms of the Eclipse Public License 2.0 which is available at
-* http://www.eclipse.org/legal/epl-2.0
-*
-* SPDX-License-Identifier: EPL-2.0
-*/
-package org.openhab.persistence.influxdb.internal;
-
-import org.eclipse.jdt.annotation.NonNullByDefault;
-import org.eclipse.jdt.annotation.Nullable;
-import org.openhab.core.items.Metadata;
-import org.openhab.core.items.MetadataKey;
-import org.openhab.core.items.MetadataRegistry;
-import org.openhab.persistence.influxdb.InfluxDBPersistenceService;
-
-/**
-* Logic to use items metadata from an openHAB {@link Item}
-*
-* @author Johannes Ott - Initial contribution
-*/
-@NonNullByDefault
-public class InfluxDBMetadataUtils {
-
-private InfluxDBMetadataUtils() {
-}
-
-public static String calculateMeasurementNameFromMetadataIfPresent(
-final @Nullable MetadataRegistry currentMetadataRegistry, String name, @Nullable String itemName) {
-
-if (itemName == null || currentMetadataRegistry == null) {
-return name;
-}
-
-MetadataKey key = new MetadataKey(InfluxDBPersistenceService.SERVICE_NAME, itemName);
-Metadata metadata = currentMetadataRegistry.get(key);
-if (metadata != null) {
-String metaName = metadata.getValue();
-if (!metaName.isBlank()) {
-name = metaName;
-}
-}
-
-return name;
-}
-}
@@ -13,6 +13,7 @@
package org.openhab.persistence.influxdb.internal;

import java.util.Date;
+import java.util.Map;

import org.eclipse.jdt.annotation.NonNullByDefault;
import org.eclipse.jdt.annotation.Nullable;
@@ -28,9 +29,9 @@ public class InfluxDBPersistentItemInfo implements PersistenceItemInfo {
private final String name;
private final Integer count;

-public InfluxDBPersistentItemInfo(String name, Integer count) {
-this.name = name;
-this.count = count;
+public InfluxDBPersistentItemInfo(Map.Entry<String, Integer> itemInfo) {
+this.name = itemInfo.getKey();
+this.count = itemInfo.getValue();
}

@Override
@@ -12,6 +12,7 @@
*/
package org.openhab.persistence.influxdb.internal;

+import java.time.Instant;
import java.util.List;
import java.util.Map;

@@ -34,7 +35,7 @@ public interface InfluxDBRepository {
/**
* Connect to InfluxDB server
*
-* @return True if successful, otherwise false
+* @return <code>true</code> if successful, otherwise <code>false</code>
*/
boolean connect();

@@ -46,12 +47,12 @@ public interface InfluxDBRepository {
/**
* Check if connection is currently ready
*
-* @return True if its ready, otherwise false
+* @return True if it's ready, otherwise false
*/
boolean checkConnectionStatus();

/**
-* Return all stored item names with it's count of stored points
+* Return all stored item names with its count of stored points
*
* @return Map with <ItemName,ItemCount> entries
*/
@@ -62,6 +63,7 @@ public interface InfluxDBRepository {
*
* @param query Query
* @return Query results
+*
*/
List<InfluxRow> query(String query);

@@ -69,6 +71,17 @@ public interface InfluxDBRepository {
* Write point to database
*
* @param influxPoint Point to write
+* @throws UnexpectedConditionException when an error occurs
*/
-void write(InfluxPoint influxPoint);
+void write(InfluxPoint influxPoint) throws UnexpectedConditionException;
+
+/**
+* create a query creator on this repository
+*
+* @return the query creator for this repository
+*/
+FilterCriteriaQueryCreator createQueryCreator();
+
+record InfluxRow(Instant time, String itemName, Object value) {
+}
}
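The repository hunk above nests InfluxRow as a Java record, which is why the service code earlier in this commit switches from getTime()/getItemName()/getValue() to the generated accessors time()/itemName()/value(). Below is a minimal runnable sketch of that record pattern; the sample values are illustrative only.

import java.time.Instant;

public class InfluxRowRecordExample {

    // A record generates the constructor, accessors, equals/hashCode and toString.
    record InfluxRow(Instant time, String itemName, Object value) {
    }

    public static void main(String[] args) {
        InfluxRow row = new InfluxRow(Instant.now(), "Temperature", 21.5);
        // Accessors carry the component names instead of get-prefixed methods.
        System.out.println(row.itemName() + " = " + row.value() + " @ " + row.time());
    }
}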
@@ -53,7 +53,7 @@ import org.slf4j.LoggerFactory;
public class InfluxDBStateConvertUtils {
static final Number DIGITAL_VALUE_OFF = 0; // Visible for testing
static final Number DIGITAL_VALUE_ON = 1; // Visible for testing
-private static Logger logger = LoggerFactory.getLogger(InfluxDBStateConvertUtils.class);
+private static final Logger LOGGER = LoggerFactory.getLogger(InfluxDBStateConvertUtils.class);

/**
* Converts {@link State} to objects fitting into influxdb values.
@@ -67,7 +67,7 @@ public class InfluxDBStateConvertUtils {
if (state instanceof HSBType) {
value = state.toString();
} else if (state instanceof PointType) {
-value = point2String((PointType) state);
+value = state.toString();
} else if (state instanceof DecimalType) {
value = ((DecimalType) state).toBigDecimal();
} else if (state instanceof QuantityType<?>) {
@@ -93,22 +93,15 @@ public class InfluxDBStateConvertUtils {
* @return the state of the item represented by the itemName parameter, else the string value of
* the Object parameter
*/
-public static State objectToState(@Nullable Object value, String itemName, @Nullable ItemRegistry itemRegistry) {
-State state = null;
-if (itemRegistry != null) {
-try {
-Item item = itemRegistry.getItem(itemName);
-state = objectToState(value, item);
-} catch (ItemNotFoundException e) {
-logger.info("Could not find item '{}' in registry", itemName);
-}
+public static State objectToState(Object value, String itemName, ItemRegistry itemRegistry) {
+try {
+Item item = itemRegistry.getItem(itemName);
+return objectToState(value, item);
+} catch (ItemNotFoundException e) {
+LOGGER.info("Could not find item '{}' in registry", itemName);
}

-if (state == null) {
-state = new StringType(String.valueOf(value));
-}
-
-return state;
+return new StringType(String.valueOf(value));
}

public static State objectToState(@Nullable Object value, Item itemToSetState) {
@@ -128,7 +121,7 @@ public class InfluxDBStateConvertUtils {
} else if (item instanceof DimmerItem) {
return new PercentType(valueStr);
} else if (item instanceof SwitchItem) {
-return toBoolean(valueStr) ? OnOffType.ON : OnOffType.OFF;
+return OnOffType.from(toBoolean(valueStr));
} else if (item instanceof ContactItem) {
return toBoolean(valueStr) ? OpenClosedType.OPEN : OpenClosedType.CLOSED;
} else if (item instanceof RollershutterItem) {
@@ -149,22 +142,10 @@ public class InfluxDBStateConvertUtils {
if ("1".equals(object) || "1.0".equals(object)) {
return true;
} else {
-return Boolean.valueOf(String.valueOf(object));
+return Boolean.parseBoolean(String.valueOf(object));
}
} else {
return false;
}
}

-private static String point2String(PointType point) {
-StringBuilder buf = new StringBuilder();
-buf.append(point.getLatitude().toString());
-buf.append(",");
-buf.append(point.getLongitude().toString());
-if (!point.getAltitude().equals(DecimalType.ZERO)) {
-buf.append(",");
-buf.append(point.getAltitude().toString());
-}
-return buf.toString(); // latitude, longitude, altitude
-}
}
@@ -27,10 +27,10 @@ import org.eclipse.jdt.annotation.NonNullByDefault;
*/
@NonNullByDefault({ DefaultLocation.PARAMETER })
public class InfluxPoint {
-private String measurementName;
-private Instant time;
-private Object value;
-private Map<String, String> tags;
+private final String measurementName;
+private final Instant time;
+private final Object value;
+private final Map<String, String> tags;

private InfluxPoint(Builder builder) {
measurementName = builder.measurementName;
@@ -60,10 +60,10 @@
}

public static final class Builder {
-private String measurementName;
+private final String measurementName;
private Instant time;
private Object value;
-private Map<String, String> tags = new HashMap<>();
+private final Map<String, String> tags = new HashMap<>();

private Builder(String measurementName) {
this.measurementName = measurementName;
@@ -79,8 +79,8 @@
return this;
}

-public Builder withTag(String name, String value) {
-tags.put(name, value);
+public Builder withTag(String name, Object value) {
+tags.put(name, value.toString());
return this;
}

@@ -1,48 +0,0 @@
-/**
-* Copyright (c) 2010-2023 Contributors to the openHAB project
-*
-* See the NOTICE file(s) distributed with this work for additional
-* information.
-*
-* This program and the accompanying materials are made available under the
-* terms of the Eclipse Public License 2.0 which is available at
-* http://www.eclipse.org/legal/epl-2.0
-*
-* SPDX-License-Identifier: EPL-2.0
-*/
-package org.openhab.persistence.influxdb.internal;
-
-import java.time.Instant;
-
-import org.eclipse.jdt.annotation.NonNullByDefault;
-import org.eclipse.jdt.annotation.Nullable;
-
-/**
-* Row data returned from database query
-*
-* @author Joan Pujol Espinar - Initial contribution
-*/
-@NonNullByDefault
-public class InfluxRow {
-private final String itemName;
-private final Instant time;
-private final @Nullable Object value;
-
-public InfluxRow(Instant time, String itemName, @Nullable Object value) {
-this.time = time;
-this.itemName = itemName;
-this.value = value;
-}
-
-public Instant getTime() {
-return time;
-}
-
-public String getItemName() {
-return itemName;
-}
-
-public @Nullable Object getValue() {
-return value;
-}
-}
@ -12,20 +12,20 @@
|
|||||||
*/
|
*/
|
||||||
package org.openhab.persistence.influxdb.internal;
|
package org.openhab.persistence.influxdb.internal;
|
||||||
|
|
||||||
import static org.openhab.persistence.influxdb.internal.InfluxDBConstants.*;
|
import static org.openhab.persistence.influxdb.internal.InfluxDBConstants.TAG_CATEGORY_NAME;
|
||||||
|
import static org.openhab.persistence.influxdb.internal.InfluxDBConstants.TAG_ITEM_NAME;
|
||||||
|
import static org.openhab.persistence.influxdb.internal.InfluxDBConstants.TAG_LABEL_NAME;
|
||||||
|
import static org.openhab.persistence.influxdb.internal.InfluxDBConstants.TAG_TYPE_NAME;
|
||||||
|
|
||||||
import java.time.Instant;
|
import java.time.Instant;
|
||||||
|
import java.util.Objects;
|
||||||
import java.util.Optional;
|
import java.util.Optional;
|
||||||
|
|
||||||
import org.eclipse.jdt.annotation.NonNullByDefault;
|
import org.eclipse.jdt.annotation.NonNullByDefault;
|
||||||
import org.eclipse.jdt.annotation.Nullable;
|
import org.eclipse.jdt.annotation.Nullable;
|
||||||
import org.openhab.core.items.Item;
|
import org.openhab.core.items.Item;
|
||||||
import org.openhab.core.items.Metadata;
|
|
||||||
import org.openhab.core.items.MetadataKey;
|
|
||||||
import org.openhab.core.items.MetadataRegistry;
|
|
||||||
import org.openhab.core.types.State;
|
import org.openhab.core.types.State;
|
||||||
import org.openhab.core.types.UnDefType;
|
import org.openhab.core.types.UnDefType;
|
||||||
import org.openhab.persistence.influxdb.InfluxDBPersistenceService;
|
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* Logic to create an InfluxDB {@link InfluxPoint} from an openHAB {@link Item}
|
* Logic to create an InfluxDB {@link InfluxPoint} from an openHAB {@link Item}
|
||||||
@ -35,11 +35,12 @@ import org.openhab.persistence.influxdb.InfluxDBPersistenceService;
|
|||||||
@NonNullByDefault
|
@NonNullByDefault
|
||||||
public class ItemToStorePointCreator {
|
public class ItemToStorePointCreator {
|
||||||
private final InfluxDBConfiguration configuration;
|
private final InfluxDBConfiguration configuration;
|
||||||
private final @Nullable MetadataRegistry metadataRegistry;
|
private final InfluxDBMetadataService influxDBMetadataService;
|
||||||
|
|
||||||
public ItemToStorePointCreator(InfluxDBConfiguration configuration, @Nullable MetadataRegistry metadataRegistry) {
|
public ItemToStorePointCreator(InfluxDBConfiguration configuration,
|
||||||
|
InfluxDBMetadataService influxDBMetadataService) {
|
||||||
this.configuration = configuration;
|
this.configuration = configuration;
|
||||||
this.metadataRegistry = metadataRegistry;
|
this.influxDBMetadataService = influxDBMetadataService;
|
||||||
}
|
}
|
||||||
|
|
||||||
public @Nullable InfluxPoint convert(Item item, @Nullable String storeAlias) {
|
public @Nullable InfluxPoint convert(Item item, @Nullable String storeAlias) {
|
||||||
@ -53,19 +54,17 @@ public class ItemToStorePointCreator {
|
|||||||
|
|
||||||
Object value = InfluxDBStateConvertUtils.stateToObject(state);
|
Object value = InfluxDBStateConvertUtils.stateToObject(state);
|
||||||
|
|
||||||
InfluxPoint.Builder point = InfluxPoint.newBuilder(measurementName).withTime(Instant.now()).withValue(value)
|
InfluxPoint.Builder pointBuilder = InfluxPoint.newBuilder(measurementName).withTime(Instant.now())
|
||||||
.withTag(TAG_ITEM_NAME, itemName);
|
.withValue(value).withTag(TAG_ITEM_NAME, itemName);
|
||||||
|
|
||||||
addPointTags(item, point);
|
addPointTags(item, pointBuilder);
|
||||||
|
|
||||||
return point.build();
|
return pointBuilder.build();
|
||||||
}
|
}
|
||||||
|
|
||||||
private String calculateMeasurementName(Item item, @Nullable String storeAlias) {
|
private String calculateMeasurementName(Item item, @Nullable String storeAlias) {
|
||||||
String name = storeAlias != null && !storeAlias.isBlank() ? storeAlias : item.getName();
|
String name = storeAlias != null && !storeAlias.isBlank() ? storeAlias : item.getName();
|
||||||
|
+        name = influxDBMetadataService.getMeasurementNameOrDefault(item.getName(), name);
-        name = InfluxDBMetadataUtils.calculateMeasurementNameFromMetadataIfPresent(metadataRegistry, name,
-                item.getName());
 
         if (configuration.isReplaceUnderscore()) {
             name = name.replace('_', '.');
@@ -75,19 +74,9 @@ public class ItemToStorePointCreator {
         }
 
     private State getItemState(Item item) {
-        final State state;
-        final Optional<Class<? extends State>> desiredConversion = calculateDesiredTypeConversionToStore(item);
-        if (desiredConversion.isPresent()) {
-            State convertedState = item.getStateAs(desiredConversion.get());
-            if (convertedState != null) {
-                state = convertedState;
-            } else {
-                state = item.getState();
-            }
-        } else {
-            state = item.getState();
-        }
-        return state;
+        return calculateDesiredTypeConversionToStore(item)
+                .map(desiredClass -> Objects.requireNonNullElseGet(item.getStateAs(desiredClass), item::getState))
+                .orElseGet(item::getState);
     }
 
     private Optional<Class<? extends State>> calculateDesiredTypeConversionToStore(Item item) {
@@ -95,36 +84,22 @@ public class ItemToStorePointCreator {
                 .findFirst().map(commandType -> commandType.asSubclass(State.class));
     }
 
-    private void addPointTags(Item item, InfluxPoint.Builder point) {
+    private void addPointTags(Item item, InfluxPoint.Builder pointBuilder) {
         if (configuration.isAddCategoryTag()) {
-            String categoryName = item.getCategory();
-            if (categoryName == null) {
-                categoryName = "n/a";
-            }
-            point.withTag(TAG_CATEGORY_NAME, categoryName);
+            String categoryName = Objects.requireNonNullElse(item.getCategory(), "n/a");
+            pointBuilder.withTag(TAG_CATEGORY_NAME, categoryName);
         }
 
         if (configuration.isAddTypeTag()) {
-            point.withTag(TAG_TYPE_NAME, item.getType());
+            pointBuilder.withTag(TAG_TYPE_NAME, item.getType());
         }
 
         if (configuration.isAddLabelTag()) {
-            String labelName = item.getLabel();
-            if (labelName == null) {
-                labelName = "n/a";
-            }
-            point.withTag(TAG_LABEL_NAME, labelName);
+            String labelName = Objects.requireNonNullElse(item.getLabel(), "n/a");
+            pointBuilder.withTag(TAG_LABEL_NAME, labelName);
         }
 
-        final MetadataRegistry currentMetadataRegistry = metadataRegistry;
-        if (currentMetadataRegistry != null) {
-            MetadataKey key = new MetadataKey(InfluxDBPersistenceService.SERVICE_NAME, item.getName());
-            Metadata metadata = currentMetadataRegistry.get(key);
-            if (metadata != null) {
-                metadata.getConfiguration().forEach((tagName, tagValue) -> {
-                    point.withTag(tagName, tagValue.toString());
-                });
-            }
-        }
+        influxDBMetadataService.getMetaData(item.getName())
+                .ifPresent(metadata -> metadata.getConfiguration().forEach(pointBuilder::withTag));
     }
 }
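The refactored addPointTags(...) above drops the explicit null checks in favour of Objects.requireNonNullElse and pulls item metadata through the new InfluxDBMetadataService. A minimal, self-contained sketch of the null-default idiom; the example values are made up for illustration:

    import java.util.Objects;

    class TagDefaults {
        // Falls back to "n/a" when the item has no category/label, mirroring the change above.
        static String orNa(String value) { // value may be null
            return Objects.requireNonNullElse(value, "n/a");
        }

        public static void main(String[] args) {
            System.out.println(orNa(null));      // prints: n/a
            System.out.println(orNa("Climate")); // prints: Climate
        }
    }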
@@ -1,54 +0,0 @@
-/**
- * Copyright (c) 2010-2023 Contributors to the openHAB project
- *
- * See the NOTICE file(s) distributed with this work for additional
- * information.
- *
- * This program and the accompanying materials are made available under the
- * terms of the Eclipse Public License 2.0 which is available at
- * http://www.eclipse.org/legal/epl-2.0
- *
- * SPDX-License-Identifier: EPL-2.0
- */
-package org.openhab.persistence.influxdb.internal;
-
-import org.eclipse.jdt.annotation.NonNullByDefault;
-import org.openhab.core.items.MetadataRegistry;
-import org.openhab.persistence.influxdb.internal.influx1.Influx1FilterCriteriaQueryCreatorImpl;
-import org.openhab.persistence.influxdb.internal.influx1.InfluxDB1RepositoryImpl;
-import org.openhab.persistence.influxdb.internal.influx2.Influx2FilterCriteriaQueryCreatorImpl;
-import org.openhab.persistence.influxdb.internal.influx2.InfluxDB2RepositoryImpl;
-
-/**
- * Factory that returns {@link InfluxDBRepository} and
- * {@link FilterCriteriaQueryCreator} implementations depending on InfluxDB
- * version
- *
- * @author Joan Pujol Espinar - Initial contribution
- */
-@NonNullByDefault
-public class RepositoryFactory {
-
-    public static InfluxDBRepository createRepository(InfluxDBConfiguration influxDBConfiguration) {
-        switch (influxDBConfiguration.getVersion()) {
-            case V1:
-                return new InfluxDB1RepositoryImpl(influxDBConfiguration);
-            case V2:
-                return new InfluxDB2RepositoryImpl(influxDBConfiguration);
-            default:
-                throw new UnnexpectedConditionException("Not expected version " + influxDBConfiguration.getVersion());
-        }
-    }
-
-    public static FilterCriteriaQueryCreator createQueryCreator(InfluxDBConfiguration influxDBConfiguration,
-            MetadataRegistry metadataRegistry) {
-        switch (influxDBConfiguration.getVersion()) {
-            case V1:
-                return new Influx1FilterCriteriaQueryCreatorImpl(influxDBConfiguration, metadataRegistry);
-            case V2:
-                return new Influx2FilterCriteriaQueryCreatorImpl(influxDBConfiguration, metadataRegistry);
-            default:
-                throw new UnnexpectedConditionException("Not expected version " + influxDBConfiguration.getVersion());
-        }
-    }
-}
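The RepositoryFactory class is removed rather than renamed: version dispatch no longer lives in a static factory. From the rest of this commit, each repository implementation now exposes createQueryCreator() and is constructed with the new InfluxDBMetadataService. A sketch of how the selection could look on the caller side; the surrounding service code is not part of this excerpt, so treat the method shape as an assumption:

    // Sketch only: repository selection by configured version. The constructors match
    // the diffs further down; the enclosing method is hypothetical.
    InfluxDBRepository createRepository(InfluxDBConfiguration config, InfluxDBMetadataService metadataService)
            throws UnexpectedConditionException {
        switch (config.getVersion()) {
            case V1:
                return new InfluxDB1RepositoryImpl(config, metadataService);
            case V2:
                return new InfluxDB2RepositoryImpl(config, metadataService);
            default:
                throw new UnexpectedConditionException("Not expected version " + config.getVersion());
        }
    }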
@@ -15,19 +15,15 @@ package org.openhab.persistence.influxdb.internal;
 import org.eclipse.jdt.annotation.NonNullByDefault;
 
 /**
- * Throw to indicate an unnexpected condition that should not have happened (a bug)
+ * Throw to indicate an unexpected condition that should not have happened (a bug)
  *
  * @author Joan Pujol Espinar - Initial contribution
  */
 @NonNullByDefault
-public class UnnexpectedConditionException extends RuntimeException {
+public class UnexpectedConditionException extends Exception {
     private static final long serialVersionUID = 1128380327167959556L;
 
-    public UnnexpectedConditionException(String message) {
+    public UnexpectedConditionException(String message) {
         super(message);
     }
-
-    public UnnexpectedConditionException(String message, Throwable cause) {
-        super(message, cause);
-    }
 }
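Besides fixing the spelling, the exception changes contract: UnexpectedConditionException is now a checked Exception instead of a RuntimeException, which is why write(...) and the point-conversion helpers in both repositories gain a throws clause further down. A small usage fragment, not taken from the commit, showing what callers now have to do:

    // Fragment: handling the now-checked exception around a repository write.
    try {
        repository.write(point);
    } catch (UnexpectedConditionException e) {
        logger.warn("Could not convert point: {}", e.getMessage());
    }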
@@ -23,11 +23,10 @@ import org.influxdb.querybuilder.BuiltQuery;
 import org.influxdb.querybuilder.Select;
 import org.influxdb.querybuilder.Where;
 import org.influxdb.querybuilder.clauses.SimpleClause;
-import org.openhab.core.items.MetadataRegistry;
 import org.openhab.core.persistence.FilterCriteria;
 import org.openhab.persistence.influxdb.internal.FilterCriteriaQueryCreator;
 import org.openhab.persistence.influxdb.internal.InfluxDBConfiguration;
-import org.openhab.persistence.influxdb.internal.InfluxDBMetadataUtils;
+import org.openhab.persistence.influxdb.internal.InfluxDBMetadataService;
 import org.openhab.persistence.influxdb.internal.InfluxDBVersion;
 
 /**
@@ -36,24 +35,22 @@ import org.openhab.persistence.influxdb.internal.InfluxDBVersion;
  * @author Joan Pujol Espinar - Initial contribution
  */
 @NonNullByDefault
-public class Influx1FilterCriteriaQueryCreatorImpl implements FilterCriteriaQueryCreator {
+public class InfluxDB1FilterCriteriaQueryCreatorImpl implements FilterCriteriaQueryCreator {
 
-    private InfluxDBConfiguration configuration;
-    private MetadataRegistry metadataRegistry;
+    private final InfluxDBConfiguration configuration;
+    private final InfluxDBMetadataService influxDBMetadataService;
 
-    public Influx1FilterCriteriaQueryCreatorImpl(InfluxDBConfiguration configuration,
-            MetadataRegistry metadataRegistry) {
+    public InfluxDB1FilterCriteriaQueryCreatorImpl(InfluxDBConfiguration configuration,
+            InfluxDBMetadataService influxDBMetadataService) {
         this.configuration = configuration;
-        this.metadataRegistry = metadataRegistry;
+        this.influxDBMetadataService = influxDBMetadataService;
     }
 
     @Override
     public String createQuery(FilterCriteria criteria, String retentionPolicy) {
-        final String tableName;
         final String itemName = criteria.getItemName();
-        boolean hasCriteriaName = itemName != null;
-
-        tableName = calculateTableName(itemName);
+        final String tableName = getTableName(itemName);
+        final boolean hasCriteriaName = itemName != null;
 
         Select select = select().column("\"" + COLUMN_VALUE_NAME_V1 + "\"::field")
                 .column("\"" + TAG_ITEM_NAME + "\"::tag")
@@ -62,20 +59,17 @@ public class Influx1FilterCriteriaQueryCreatorImpl implements FilterCriteriaQuer
         Where where = select.where();
 
         if (itemName != null && !tableName.equals(itemName)) {
-            where = where.and(BuiltQuery.QueryBuilder.eq(TAG_ITEM_NAME, itemName));
+            where.and(BuiltQuery.QueryBuilder.eq(TAG_ITEM_NAME, itemName));
         }
 
         if (criteria.getBeginDate() != null) {
-            where = where.and(
-                    BuiltQuery.QueryBuilder.gte(COLUMN_TIME_NAME_V1, criteria.getBeginDate().toInstant().toString()));
+            where.and(BuiltQuery.QueryBuilder.gte(COLUMN_TIME_NAME_V1, criteria.getBeginDate().toInstant().toString()));
         }
         if (criteria.getEndDate() != null) {
-            where = where.and(
-                    BuiltQuery.QueryBuilder.lte(COLUMN_TIME_NAME_V1, criteria.getEndDate().toInstant().toString()));
+            where.and(BuiltQuery.QueryBuilder.lte(COLUMN_TIME_NAME_V1, criteria.getEndDate().toInstant().toString()));
         }
 
         if (criteria.getState() != null && criteria.getOperator() != null) {
-            where = where.and(new SimpleClause(COLUMN_VALUE_NAME_V1,
+            where.and(new SimpleClause(COLUMN_VALUE_NAME_V1,
                     getOperationSymbol(criteria.getOperator(), InfluxDBVersion.V1),
                     stateToObject(criteria.getState())));
         }
@@ -88,24 +82,21 @@ public class Influx1FilterCriteriaQueryCreatorImpl implements FilterCriteriaQuer
 
         if (criteria.getPageSize() != Integer.MAX_VALUE) {
             if (criteria.getPageNumber() != 0) {
-                select = select.limit(criteria.getPageSize(), criteria.getPageSize() * criteria.getPageNumber());
+                select = select.limit(criteria.getPageSize(), (long) criteria.getPageSize() * criteria.getPageNumber());
             } else {
                 select = select.limit(criteria.getPageSize());
            }
        }
 
-        final Query query = (Query) select;
-        return query.getCommand();
+        return ((Query) select).getCommand();
     }
 
-    private String calculateTableName(@Nullable String itemName) {
+    private String getTableName(@Nullable String itemName) {
         if (itemName == null) {
             return "/.*/";
         }
 
-        String name = itemName;
-
-        name = InfluxDBMetadataUtils.calculateMeasurementNameFromMetadataIfPresent(metadataRegistry, name, itemName);
+        String name = influxDBMetadataService.getMeasurementNameOrDefault(itemName, itemName);
 
         if (configuration.isReplaceUnderscore()) {
             name = name.replace('_', '.');
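One subtle fix in this hunk is the (long) cast in select.limit(pageSize, offset): the offset is pageSize * pageNumber, and multiplying two ints can silently overflow before the result is widened. A standalone illustration with made-up page values:

    // Why the (long) cast matters: int multiplication overflows before widening.
    public class OffsetOverflow {
        public static void main(String[] args) {
            int pageSize = 100_000;
            int pageNumber = 30_000;
            System.out.println(pageSize * pageNumber);        // overflows: -1294967296
            System.out.println((long) pageSize * pageNumber); // correct: 3000000000
        }
    }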
@@ -23,7 +23,6 @@ import java.util.Collections;
 import java.util.List;
 import java.util.Map;
 import java.util.Objects;
-import java.util.Optional;
 import java.util.concurrent.TimeUnit;
 
 import org.eclipse.jdt.annotation.NonNullByDefault;
@@ -34,11 +33,12 @@ import org.influxdb.dto.Point;
 import org.influxdb.dto.Pong;
 import org.influxdb.dto.Query;
 import org.influxdb.dto.QueryResult;
+import org.openhab.persistence.influxdb.internal.FilterCriteriaQueryCreator;
 import org.openhab.persistence.influxdb.internal.InfluxDBConfiguration;
+import org.openhab.persistence.influxdb.internal.InfluxDBMetadataService;
 import org.openhab.persistence.influxdb.internal.InfluxDBRepository;
 import org.openhab.persistence.influxdb.internal.InfluxPoint;
 import org.openhab.persistence.influxdb.internal.InfluxRow;
-import org.openhab.persistence.influxdb.internal.UnnexpectedConditionException;
+import org.openhab.persistence.influxdb.internal.UnexpectedConditionException;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -53,12 +53,14 @@ import org.slf4j.LoggerFactory;
 @NonNullByDefault
 public class InfluxDB1RepositoryImpl implements InfluxDBRepository {
     private final Logger logger = LoggerFactory.getLogger(InfluxDB1RepositoryImpl.class);
-    private InfluxDBConfiguration configuration;
-    @Nullable
-    private InfluxDB client;
+    private final InfluxDBConfiguration configuration;
+    private final InfluxDBMetadataService influxDBMetadataService;
+    private @Nullable InfluxDB client;
 
-    public InfluxDB1RepositoryImpl(InfluxDBConfiguration configuration) {
+    public InfluxDB1RepositoryImpl(InfluxDBConfiguration configuration,
+            InfluxDBMetadataService influxDBMetadataService) {
         this.configuration = configuration;
+        this.influxDBMetadataService = influxDBMetadataService;
     }
 
     @Override
@@ -79,12 +81,15 @@ public class InfluxDB1RepositoryImpl implements InfluxDBRepository {
 
     @Override
     public void disconnect() {
+        final InfluxDB currentClient = client;
+        if (currentClient != null) {
+            currentClient.close();
+        }
         this.client = null;
     }
 
     @Override
     public boolean checkConnectionStatus() {
-        boolean dbStatus = false;
         final InfluxDB currentClient = client;
         if (currentClient != null) {
             try {
@@ -92,30 +97,23 @@ public class InfluxDB1RepositoryImpl implements InfluxDBRepository {
                 String version = pong.getVersion();
                 // may be check for version >= 0.9
                 if (version != null && !version.contains("unknown")) {
-                    dbStatus = true;
                     logger.debug("database status is OK, version is {}", version);
+                    return true;
                 } else {
                     logger.warn("database ping error, version is: \"{}\" response time was \"{}\"", version,
                             pong.getResponseTime());
-                    dbStatus = false;
                 }
             } catch (RuntimeException e) {
-                dbStatus = false;
-                logger.error("database connection failed", e);
-                handleDatabaseException(e);
+                logger.warn("database error: {}", e.getMessage(), e);
             }
         } else {
             logger.warn("checkConnection: database is not connected");
         }
-        return dbStatus;
-    }
-
-    private void handleDatabaseException(Exception e) {
-        logger.warn("database error: {}", e.getMessage(), e);
+        return false;
     }
 
     @Override
-    public void write(InfluxPoint point) {
+    public void write(InfluxPoint point) throws UnexpectedConditionException {
         final InfluxDB currentClient = this.client;
         if (currentClient != null) {
             Point clientPoint = convertPointToClientFormat(point);
@@ -125,26 +123,23 @@ public class InfluxDB1RepositoryImpl implements InfluxDBRepository {
         }
     }
 
-    private Point convertPointToClientFormat(InfluxPoint point) {
+    private Point convertPointToClientFormat(InfluxPoint point) throws UnexpectedConditionException {
         Point.Builder clientPoint = Point.measurement(point.getMeasurementName()).time(point.getTime().toEpochMilli(),
                 TimeUnit.MILLISECONDS);
-        setPointValue(point.getValue(), clientPoint);
-        point.getTags().entrySet().forEach(e -> clientPoint.tag(e.getKey(), e.getValue()));
-        return clientPoint.build();
-    }
-
-    private void setPointValue(@Nullable Object value, Point.Builder point) {
+        Object value = point.getValue();
         if (value instanceof String) {
-            point.addField(FIELD_VALUE_NAME, (String) value);
+            clientPoint.addField(FIELD_VALUE_NAME, (String) value);
         } else if (value instanceof Number) {
-            point.addField(FIELD_VALUE_NAME, (Number) value);
+            clientPoint.addField(FIELD_VALUE_NAME, (Number) value);
        } else if (value instanceof Boolean) {
-            point.addField(FIELD_VALUE_NAME, (Boolean) value);
+            clientPoint.addField(FIELD_VALUE_NAME, (Boolean) value);
        } else if (value == null) {
-            point.addField(FIELD_VALUE_NAME, (String) null);
+            clientPoint.addField(FIELD_VALUE_NAME, "null");
        } else {
-            throw new UnnexpectedConditionException("Not expected value type");
+            throw new UnexpectedConditionException("Not expected value type");
        }
+        point.getTags().forEach(clientPoint::tag);
+        return clientPoint.build();
     }
 
     @Override
@@ -153,58 +148,47 @@ public class InfluxDB1RepositoryImpl implements InfluxDBRepository {
         if (currentClient != null) {
             Query parsedQuery = new Query(query, configuration.getDatabaseName());
             List<QueryResult.Result> results = currentClient.query(parsedQuery, TimeUnit.MILLISECONDS).getResults();
-            return convertClientResutToRepository(results);
+            return convertClientResultToRepository(results);
         } else {
             logger.warn("Returning empty list because queryAPI isn't present");
-            return Collections.emptyList();
+            return List.of();
         }
     }
 
-    private List<InfluxRow> convertClientResutToRepository(List<QueryResult.Result> results) {
+    private List<InfluxRow> convertClientResultToRepository(List<QueryResult.Result> results) {
         List<InfluxRow> rows = new ArrayList<>();
         for (QueryResult.Result result : results) {
-            List<QueryResult.Series> seriess = result.getSeries();
+            List<QueryResult.Series> allSeries = result.getSeries();
             if (result.getError() != null) {
                 logger.warn("{}", result.getError());
                 continue;
             }
-            if (seriess == null) {
+            if (allSeries == null) {
                 logger.debug("query returned no series");
             } else {
-                for (QueryResult.Series series : seriess) {
-                    logger.trace("series {}", series.toString());
-                    List<List<@Nullable Object>> valuess = series.getValues();
-                    if (valuess == null) {
+                for (QueryResult.Series series : allSeries) {
+                    logger.trace("series {}", series);
+                    String defaultItemName = series.getName();
+                    List<List<Object>> allValues = series.getValues();
+                    if (allValues == null) {
                         logger.debug("query returned no values");
                     } else {
                         List<String> columns = series.getColumns();
                         logger.trace("columns {}", columns);
                         if (columns != null) {
-                            Integer timestampColumn = null;
-                            Integer valueColumn = null;
-                            Integer itemNameColumn = null;
-                            for (int i = 0; i < columns.size(); i++) {
-                                String columnName = columns.get(i);
-                                if (columnName.equals(COLUMN_TIME_NAME_V1)) {
-                                    timestampColumn = i;
-                                } else if (columnName.equals(COLUMN_VALUE_NAME_V1)) {
-                                    valueColumn = i;
-                                } else if (columnName.equals(TAG_ITEM_NAME)) {
-                                    itemNameColumn = i;
-                                }
-                            }
-                            if (valueColumn == null || timestampColumn == null) {
+                            int timestampColumn = columns.indexOf(COLUMN_TIME_NAME_V1);
+                            int valueColumn = columns.indexOf(COLUMN_VALUE_NAME_V1);
+                            int itemNameColumn = columns.indexOf(TAG_ITEM_NAME);
+                            if (valueColumn == -1 || timestampColumn == -1) {
                                 throw new IllegalStateException("missing column");
                             }
-                            for (int i = 0; i < valuess.size(); i++) {
-                                Double rawTime = (Double) Objects.requireNonNull(valuess.get(i).get(timestampColumn));
+                            for (List<Object> valueObject : allValues) {
+                                Double rawTime = (Double) valueObject.get(timestampColumn);
                                 Instant time = Instant.ofEpochMilli(rawTime.longValue());
-                                @Nullable
-                                Object value = valuess.get(i).get(valueColumn);
-                                var currentI = i;
-                                String itemName = Optional.ofNullable(itemNameColumn)
-                                        .flatMap(inc -> Optional.ofNullable((String) valuess.get(currentI).get(inc)))
-                                        .orElse(series.getName());
+                                Object value = valueObject.get(valueColumn);
+                                String itemName = itemNameColumn == -1 ? defaultItemName
+                                        : Objects.requireNonNullElse((String) valueObject.get(itemNameColumn),
+                                                defaultItemName);
                                 logger.trace("adding historic item {}: time {} value {}", itemName, time, value);
                                 rows.add(new InfluxRow(time, itemName, value));
                             }
@@ -220,4 +204,9 @@ public class InfluxDB1RepositoryImpl implements InfluxDBRepository {
     public Map<String, Integer> getStoredItemsCount() {
         return Collections.emptyMap();
     }
+
+    @Override
+    public FilterCriteriaQueryCreator createQueryCreator() {
+        return new InfluxDB1FilterCriteriaQueryCreatorImpl(configuration, influxDBMetadataService);
+    }
 }
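The column lookup in convertClientResultToRepository(...) now uses List.indexOf, with -1 meaning the column is absent, instead of hand-rolled nullable Integer indexes. A self-contained sketch of the same pattern; the literal column names stand in for COLUMN_TIME_NAME_V1, COLUMN_VALUE_NAME_V1 and TAG_ITEM_NAME:

    import java.util.List;

    public class ColumnLookup {
        public static void main(String[] args) {
            List<String> columns = List.of("time", "value", "item");
            int timestampColumn = columns.indexOf("time");   // 0
            int valueColumn = columns.indexOf("value");      // 1
            int itemNameColumn = columns.indexOf("missing"); // -1 when the tag column is absent
            if (valueColumn == -1 || timestampColumn == -1) {
                throw new IllegalStateException("missing column");
            }
            System.out.println(timestampColumn + " " + valueColumn + " " + itemNameColumn);
        }
    }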
@@ -20,11 +20,10 @@ import static org.openhab.persistence.influxdb.internal.InfluxDBStateConvertUtil
 import java.time.temporal.ChronoUnit;
 
 import org.eclipse.jdt.annotation.NonNullByDefault;
-import org.openhab.core.items.MetadataRegistry;
 import org.openhab.core.persistence.FilterCriteria;
 import org.openhab.persistence.influxdb.internal.FilterCriteriaQueryCreator;
 import org.openhab.persistence.influxdb.internal.InfluxDBConfiguration;
-import org.openhab.persistence.influxdb.internal.InfluxDBMetadataUtils;
+import org.openhab.persistence.influxdb.internal.InfluxDBMetadataService;
 import org.openhab.persistence.influxdb.internal.InfluxDBVersion;
 
 import com.influxdb.query.dsl.Flux;
@@ -37,15 +36,14 @@ import com.influxdb.query.dsl.functions.restriction.Restrictions;
  * @author Joan Pujol Espinar - Initial contribution
  */
 @NonNullByDefault
-public class Influx2FilterCriteriaQueryCreatorImpl implements FilterCriteriaQueryCreator {
-
-    private InfluxDBConfiguration configuration;
-    private MetadataRegistry metadataRegistry;
-
-    public Influx2FilterCriteriaQueryCreatorImpl(InfluxDBConfiguration configuration,
-            MetadataRegistry metadataRegistry) {
+public class InfluxDB2FilterCriteriaQueryCreatorImpl implements FilterCriteriaQueryCreator {
+    private final InfluxDBConfiguration configuration;
+    private final InfluxDBMetadataService influxDBMetadataService;
+
+    public InfluxDB2FilterCriteriaQueryCreatorImpl(InfluxDBConfiguration configuration,
+            InfluxDBMetadataService influxDBMetadataService) {
         this.configuration = configuration;
-        this.metadataRegistry = metadataRegistry;
+        this.influxDBMetadataService = influxDBMetadataService;
     }
 
     @Override
@@ -54,26 +52,22 @@ public class Influx2FilterCriteriaQueryCreatorImpl implements FilterCriteriaQuer
 
         RangeFlux range = flux.range();
         if (criteria.getBeginDate() != null) {
-            range = range.withStart(criteria.getBeginDate().toInstant());
+            range.withStart(criteria.getBeginDate().toInstant());
         } else {
             range = flux.range(-100L, ChronoUnit.YEARS); // Flux needs a mandatory start range
         }
         if (criteria.getEndDate() != null) {
-            range = range.withStop(criteria.getEndDate().toInstant());
+            range.withStop(criteria.getEndDate().toInstant());
         }
         flux = range;
 
         String itemName = criteria.getItemName();
         if (itemName != null) {
-            String measurementName = calculateMeasurementName(itemName);
-            boolean needsToUseItemTagName = !measurementName.equals(itemName);
+            String name = influxDBMetadataService.getMeasurementNameOrDefault(itemName, itemName);
+            String measurementName = configuration.isReplaceUnderscore() ? name.replace('_', '.') : name;
 
             flux = flux.filter(measurement().equal(measurementName));
-            if (needsToUseItemTagName) {
+            if (!measurementName.equals(itemName)) {
                 flux = flux.filter(tag(TAG_ITEM_NAME).equal(itemName));
-            }
-
-            if (needsToUseItemTagName) {
                 flux = flux.keep(new String[] { FIELD_MEASUREMENT_NAME, COLUMN_TIME_NAME_V2, COLUMN_VALUE_NAME_V2,
                         TAG_ITEM_NAME });
             } else {
@@ -112,16 +106,4 @@ public class Influx2FilterCriteriaQueryCreatorImpl implements FilterCriteriaQuer
         }
         return flux;
     }
-
-    private String calculateMeasurementName(String itemName) {
-        String name = itemName;
-
-        name = InfluxDBMetadataUtils.calculateMeasurementNameFromMetadataIfPresent(metadataRegistry, name, itemName);
-
-        if (configuration.isReplaceUnderscore()) {
-            name = name.replace('_', '.');
-        }
-
-        return name;
-    }
 }
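The V2 query creator inlines the old calculateMeasurementName(...) helper: the measurement comes from the metadata service, optionally with underscores replaced by dots, and the extra item-name tag filter is only added when the resulting measurement no longer matches the item name. A standalone illustration of that condition; the item name and flag are hypothetical:

    // When replaceUnderscore is enabled, "my_item" is stored under measurement "my.item",
    // so the measurement differs from the item name and the additional tag filter applies.
    public class MeasurementNameExample {
        public static void main(String[] args) {
            String itemName = "my_item";      // hypothetical item
            boolean replaceUnderscore = true; // stands for configuration.isReplaceUnderscore()
            String measurementName = replaceUnderscore ? itemName.replace('_', '.') : itemName;
            System.out.println(measurementName);                    // my.item
            System.out.println(!measurementName.equals(itemName));  // true -> tag filter on the item name added
        }
    }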
@@ -19,17 +19,18 @@ import java.util.Collections;
 import java.util.LinkedHashMap;
 import java.util.List;
 import java.util.Map;
-import java.util.stream.Collectors;
+import java.util.Objects;
 import java.util.stream.Stream;
 
 import org.eclipse.jdt.annotation.NonNullByDefault;
 import org.eclipse.jdt.annotation.Nullable;
+import org.openhab.persistence.influxdb.internal.FilterCriteriaQueryCreator;
 import org.openhab.persistence.influxdb.internal.InfluxDBConfiguration;
 import org.openhab.persistence.influxdb.internal.InfluxDBConstants;
+import org.openhab.persistence.influxdb.internal.InfluxDBMetadataService;
 import org.openhab.persistence.influxdb.internal.InfluxDBRepository;
 import org.openhab.persistence.influxdb.internal.InfluxPoint;
-import org.openhab.persistence.influxdb.internal.InfluxRow;
-import org.openhab.persistence.influxdb.internal.UnnexpectedConditionException;
+import org.openhab.persistence.influxdb.internal.UnexpectedConditionException;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -51,38 +52,29 @@ import com.influxdb.query.FluxTable;
 @NonNullByDefault
 public class InfluxDB2RepositoryImpl implements InfluxDBRepository {
     private final Logger logger = LoggerFactory.getLogger(InfluxDB2RepositoryImpl.class);
-    private InfluxDBConfiguration configuration;
-    @Nullable
-    private InfluxDBClient client;
-    @Nullable
-    private QueryApi queryAPI;
-    @Nullable
-    private WriteApi writeAPI;
-
-    public InfluxDB2RepositoryImpl(InfluxDBConfiguration configuration) {
+    private final InfluxDBConfiguration configuration;
+    private final InfluxDBMetadataService influxDBMetadataService;
+    private @Nullable InfluxDBClient client;
+    private @Nullable QueryApi queryAPI;
+    private @Nullable WriteApi writeAPI;
+
+    public InfluxDB2RepositoryImpl(InfluxDBConfiguration configuration,
+            InfluxDBMetadataService influxDBMetadataService) {
         this.configuration = configuration;
+        this.influxDBMetadataService = influxDBMetadataService;
     }
 
-    /**
-     * Returns if the client has been successfully connected to server
-     *
-     * @return True if it's connected, otherwise false
-     */
     @Override
     public boolean isConnected() {
         return client != null;
     }
 
-    /**
-     * Connect to InfluxDB server
-     *
-     * @return True if successful, otherwise false
-     */
     @Override
     public boolean connect() {
         InfluxDBClientOptions.Builder optionsBuilder = InfluxDBClientOptions.builder().url(configuration.getUrl())
                 .org(configuration.getDatabaseName()).bucket(configuration.getRetentionPolicy());
-        char[] token = configuration.getTokenAsCharArray();
+        char[] token = configuration.getToken().toCharArray();
         if (token.length > 0) {
             optionsBuilder.authenticateToken(token);
         } else {
@@ -92,15 +84,14 @@ public class InfluxDB2RepositoryImpl implements InfluxDBRepository {
 
         final InfluxDBClient createdClient = InfluxDBClientFactory.create(clientOptions);
         this.client = createdClient;
-        logger.debug("Succesfully connected to InfluxDB. Instance ready={}", createdClient.ready());
         queryAPI = createdClient.getQueryApi();
         writeAPI = createdClient.getWriteApi();
+        logger.debug("Successfully connected to InfluxDB. Instance ready={}", createdClient.ready());
 
         return checkConnectionStatus();
     }
 
-    /**
-     * Disconnect from InfluxDB server
-     */
     @Override
     public void disconnect() {
         final InfluxDBClient currentClient = this.client;
@@ -110,11 +101,6 @@ public class InfluxDB2RepositoryImpl implements InfluxDBRepository {
         this.client = null;
     }
 
-    /**
-     * Check if connection is currently ready
-     *
-     * @return True if its ready, otherwise false
-     */
     @Override
     public boolean checkConnectionStatus() {
         final InfluxDBClient currentClient = client;
@@ -133,13 +119,8 @@ public class InfluxDB2RepositoryImpl implements InfluxDBRepository {
         }
     }
 
-    /**
-     * Write point to database
-     *
-     * @param point
-     */
     @Override
-    public void write(InfluxPoint point) {
+    public void write(InfluxPoint point) throws UnexpectedConditionException {
         final WriteApi currentWriteAPI = writeAPI;
         if (currentWriteAPI != null) {
             currentWriteAPI.writePoint(convertPointToClientFormat(point));
@@ -148,14 +129,14 @@ public class InfluxDB2RepositoryImpl implements InfluxDBRepository {
         }
     }
 
-    private Point convertPointToClientFormat(InfluxPoint point) {
+    private Point convertPointToClientFormat(InfluxPoint point) throws UnexpectedConditionException {
         Point clientPoint = Point.measurement(point.getMeasurementName()).time(point.getTime(), WritePrecision.MS);
         setPointValue(point.getValue(), clientPoint);
-        point.getTags().entrySet().forEach(e -> clientPoint.addTag(e.getKey(), e.getValue()));
+        point.getTags().forEach(clientPoint::addTag);
         return clientPoint;
     }
 
-    private void setPointValue(@Nullable Object value, Point point) {
+    private void setPointValue(@Nullable Object value, Point point) throws UnexpectedConditionException {
         if (value instanceof String) {
             point.addField(FIELD_VALUE_NAME, (String) value);
         } else if (value instanceof Number) {
@@ -165,49 +146,31 @@ public class InfluxDB2RepositoryImpl implements InfluxDBRepository {
         } else if (value == null) {
             point.addField(FIELD_VALUE_NAME, (String) null);
         } else {
-            throw new UnnexpectedConditionException("Not expected value type");
+            throw new UnexpectedConditionException("Not expected value type");
         }
     }
 
-    /**
-     * Executes Flux query
-     *
-     * @param query Query
-     * @return Query results
-     */
     @Override
     public List<InfluxRow> query(String query) {
         final QueryApi currentQueryAPI = queryAPI;
         if (currentQueryAPI != null) {
             List<FluxTable> clientResult = currentQueryAPI.query(query);
-            return convertClientResutToRepository(clientResult);
+            return clientResult.stream().flatMap(this::mapRawResultToHistoric).toList();
         } else {
             logger.warn("Returning empty list because queryAPI isn't present");
-            return Collections.emptyList();
+            return List.of();
         }
     }
 
-    private List<InfluxRow> convertClientResutToRepository(List<FluxTable> clientResult) {
-        return clientResult.stream().flatMap(this::mapRawResultToHistoric).collect(Collectors.toList());
-    }
-
     private Stream<InfluxRow> mapRawResultToHistoric(FluxTable rawRow) {
         return rawRow.getRecords().stream().map(r -> {
             String itemName = (String) r.getValueByKey(InfluxDBConstants.TAG_ITEM_NAME);
-            if (itemName == null) { // use measurement name if item is not tagged
-                itemName = r.getMeasurement();
-            }
             Object value = r.getValueByKey(COLUMN_VALUE_NAME_V2);
             Instant time = (Instant) r.getValueByKey(COLUMN_TIME_NAME_V2);
             return new InfluxRow(time, itemName, value);
         });
     }
 
-    /**
-     * Return all stored item names with it's count of stored points
-     *
-     * @return Map with <ItemName,ItemCount> entries
-     */
     @Override
     public Map<String, Integer> getStoredItemsCount() {
         final QueryApi currentQueryAPI = queryAPI;
@@ -221,13 +184,19 @@ public class InfluxDB2RepositoryImpl implements InfluxDBRepository {
                 + " |> group()";
 
             List<FluxTable> queryResult = currentQueryAPI.query(query);
-            queryResult.stream().findFirst().orElse(new FluxTable()).getRecords().forEach(row -> {
-                result.put((String) row.getValueByKey(TAG_ITEM_NAME), ((Number) row.getValue()).intValue());
-            });
+            Objects.requireNonNull(queryResult.stream().findFirst().orElse(new FluxTable())).getRecords()
+                    .forEach(row -> {
+                        result.put((String) row.getValueByKey(TAG_ITEM_NAME), ((Number) row.getValue()).intValue());
+                    });
             return result;
         } else {
             logger.warn("Returning empty result because queryAPI isn't present");
             return Collections.emptyMap();
         }
     }
+
+    @Override
+    public FilterCriteriaQueryCreator createQueryCreator() {
+        return new InfluxDB2FilterCriteriaQueryCreatorImpl(configuration, influxDBMetadataService);
+    }
 }
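As in the V1 repository, createQueryCreator() makes the repository the single place that knows which query-creator implementation matches its InfluxDB version, which is what allows RepositoryFactory to be dropped. A fragment (not from the commit) showing how a caller can stay version-agnostic; the signatures match the diffs above:

    // Fragment: the repository supplies its own version-specific query creator.
    String buildQuery(InfluxDBRepository repository, FilterCriteria criteria, String retentionPolicy) {
        FilterCriteriaQueryCreator queryCreator = repository.createQueryCreator();
        return queryCreator.createQuery(criteria, retentionPolicy);
    }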
@@ -1,47 +0,0 @@
-/**
- * Copyright (c) 2010-2023 Contributors to the openHAB project
- *
- * See the NOTICE file(s) distributed with this work for additional
- * information.
- *
- * This program and the accompanying materials are made available under the
- * terms of the Eclipse Public License 2.0 which is available at
- * http://www.eclipse.org/legal/epl-2.0
- *
- * SPDX-License-Identifier: EPL-2.0
- */
-package org.openhab.persistence.influxdb.internal;
-
-import static org.openhab.persistence.influxdb.internal.InfluxDBConfiguration.*;
-
-import java.util.HashMap;
-import java.util.Map;
-
-import org.eclipse.jdt.annotation.NonNullByDefault;
-
-/**
- * @author Joan Pujol Espinar - Initial contribution
- */
-@NonNullByDefault
-public class ConfigurationTestHelper {
-
-    public static Map<String, Object> createValidConfigurationParameters() {
-        Map<String, Object> config = new HashMap<>();
-        config.put(URL_PARAM, "http://localhost:8086");
-        config.put(VERSION_PARAM, InfluxDBVersion.V2.name());
-        config.put(TOKEN_PARAM, "sampletoken");
-        config.put(DATABASE_PARAM, "openhab");
-        config.put(RETENTION_POLICY_PARAM, "default");
-        return config;
-    }
-
-    public static InfluxDBConfiguration createValidConfiguration() {
-        return new InfluxDBConfiguration(createValidConfigurationParameters());
-    }
-
-    public static Map<String, Object> createInvalidConfigurationParameters() {
-        Map<String, Object> config = createValidConfigurationParameters();
-        config.remove(TOKEN_PARAM);
-        return config;
-    }
-}
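The deleted ConfigurationTestHelper is replaced by inline configuration constants in the tests that follow. The style is roughly this sketch; the values mirror the removed helper, so nothing new is introduced:

    // Inline test configuration, replacing ConfigurationTestHelper.createValidConfigurationParameters().
    Map<String, Object> validV2Config = Map.of(
            URL_PARAM, "http://localhost:8086",
            VERSION_PARAM, InfluxDBVersion.V2.name(),
            TOKEN_PARAM, "sampletoken",
            DATABASE_PARAM, "openhab",
            RETENTION_POLICY_PARAM, "default");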
@@ -12,15 +12,20 @@
  */
 package org.openhab.persistence.influxdb.internal;
 
+import static org.junit.jupiter.api.Assertions.assertThrows;
 import static org.mockito.ArgumentMatchers.any;
 import static org.mockito.Mockito.*;
+import static org.openhab.persistence.influxdb.internal.InfluxDBConfiguration.DATABASE_PARAM;
+import static org.openhab.persistence.influxdb.internal.InfluxDBConfiguration.PASSWORD_PARAM;
+import static org.openhab.persistence.influxdb.internal.InfluxDBConfiguration.RETENTION_POLICY_PARAM;
+import static org.openhab.persistence.influxdb.internal.InfluxDBConfiguration.TOKEN_PARAM;
+import static org.openhab.persistence.influxdb.internal.InfluxDBConfiguration.URL_PARAM;
+import static org.openhab.persistence.influxdb.internal.InfluxDBConfiguration.USER_PARAM;
+import static org.openhab.persistence.influxdb.internal.InfluxDBConfiguration.VERSION_PARAM;
 
 import java.util.Map;
 
-import org.eclipse.jdt.annotation.DefaultLocation;
 import org.eclipse.jdt.annotation.NonNullByDefault;
-import org.junit.jupiter.api.AfterEach;
-import org.junit.jupiter.api.BeforeEach;
 import org.junit.jupiter.api.Test;
 import org.junit.jupiter.api.extension.ExtendWith;
 import org.mockito.Mock;
@@ -33,74 +38,91 @@ import org.openhab.persistence.influxdb.InfluxDBPersistenceService;
  * @author Joan Pujol Espinar - Initial contribution
  */
 @ExtendWith(MockitoExtension.class)
-@NonNullByDefault(value = { DefaultLocation.PARAMETER, DefaultLocation.RETURN_TYPE })
+@NonNullByDefault
 public class InfluxDBPersistenceServiceTest {
-    private InfluxDBPersistenceService instance;
-
-    private @Mock InfluxDBRepository influxDBRepository;
-
-    private Map<String, Object> validConfig;
-    private Map<String, Object> invalidConfig;
-
-    @BeforeEach
-    public void before() {
-        instance = new InfluxDBPersistenceService(mock(ItemRegistry.class), mock(MetadataRegistry.class)) {
-            @Override
-            protected InfluxDBRepository createInfluxDBRepository() {
-                return influxDBRepository;
-            }
-        };
-
-        validConfig = ConfigurationTestHelper.createValidConfigurationParameters();
-        invalidConfig = ConfigurationTestHelper.createInvalidConfigurationParameters();
-    }
-
-    @AfterEach
-    public void after() {
-        validConfig = null;
-        invalidConfig = null;
-        instance = null;
-        influxDBRepository = null;
-    }
+    private static final Map<String, Object> VALID_V1_CONFIGURATION = Map.of( //
+            URL_PARAM, "http://localhost:8086", //
+            VERSION_PARAM, InfluxDBVersion.V1.name(), //
+            USER_PARAM, "user", PASSWORD_PARAM, "password", //
+            DATABASE_PARAM, "openhab", //
+            RETENTION_POLICY_PARAM, "default");
+
+    private static final Map<String, Object> VALID_V2_CONFIGURATION = Map.of( //
+            URL_PARAM, "http://localhost:8086", //
+            VERSION_PARAM, InfluxDBVersion.V2.name(), //
+            TOKEN_PARAM, "sampletoken", //
+            DATABASE_PARAM, "openhab", //
+            RETENTION_POLICY_PARAM, "default");
+
+    private static final Map<String, Object> INVALID_V1_CONFIGURATION = Map.of(//
+            URL_PARAM, "http://localhost:8086", //
+            VERSION_PARAM, InfluxDBVersion.V1.name(), //
+            USER_PARAM, "user", //
+            DATABASE_PARAM, "openhab", //
+            RETENTION_POLICY_PARAM, "default");
+
+    private static final Map<String, Object> INVALID_V2_CONFIGURATION = Map.of( //
+            URL_PARAM, "http://localhost:8086", //
+            VERSION_PARAM, InfluxDBVersion.V2.name(), //
+            DATABASE_PARAM, "openhab", //
+            RETENTION_POLICY_PARAM, "default");
+
+    private @Mock @NonNullByDefault({}) InfluxDBRepository influxDBRepositoryMock;
+
+    private final InfluxDBMetadataService influxDBMetadataService = new InfluxDBMetadataService(
+            mock(MetadataRegistry.class));
+
+    @Test
+    public void activateWithValidV1ConfigShouldConnectRepository() {
+        getService(VALID_V1_CONFIGURATION);
+        verify(influxDBRepositoryMock).connect();
+    }
 
     @Test
-    public void activateWithValidConfigShouldConnectRepository() {
-        instance.activate(validConfig);
-        verify(influxDBRepository).connect();
+    public void activateWithValidV2ConfigShouldConnectRepository() {
+        getService(VALID_V2_CONFIGURATION);
+        verify(influxDBRepositoryMock).connect();
     }
 
     @Test
-    public void activateWithInvalidConfigShouldNotConnectRepository() {
-        instance.activate(invalidConfig);
-        verify(influxDBRepository, never()).connect();
+    public void activateWithInvalidV1ConfigShouldFail() {
+        assertThrows(IllegalArgumentException.class, () -> getService(INVALID_V1_CONFIGURATION));
     }
 
     @Test
-    public void activateWithNullConfigShouldNotConnectRepository() {
-        instance.activate(null);
-        verify(influxDBRepository, never()).connect();
+    public void activateWithInvalidV2ShouldFail() {
+        assertThrows(IllegalArgumentException.class, () -> getService(INVALID_V2_CONFIGURATION));
     }
 
     @Test
     public void deactivateShouldDisconnectRepository() {
-        instance.activate(validConfig);
+        InfluxDBPersistenceService instance = getService(VALID_V2_CONFIGURATION);
         instance.deactivate();
-        verify(influxDBRepository).disconnect();
+        verify(influxDBRepositoryMock).disconnect();
     }
 
     @Test
-    public void storeItemWithConnectedRepository() {
-        instance.activate(validConfig);
-        when(influxDBRepository.isConnected()).thenReturn(true);
+    public void storeItemWithConnectedRepository() throws UnexpectedConditionException {
+        InfluxDBPersistenceService instance = getService(VALID_V2_CONFIGURATION);
+        when(influxDBRepositoryMock.isConnected()).thenReturn(true);
         instance.store(ItemTestHelper.createNumberItem("number", 5));
-        verify(influxDBRepository).write(any());
+        verify(influxDBRepositoryMock).write(any());
     }
 
     @Test
-    public void storeItemWithDisconnectedRepositoryIsIgnored() {
-        instance.activate(validConfig);
-        when(influxDBRepository.isConnected()).thenReturn(false);
+    public void storeItemWithDisconnectedRepositoryIsIgnored() throws UnexpectedConditionException {
+        InfluxDBPersistenceService instance = getService(VALID_V2_CONFIGURATION);
+        when(influxDBRepositoryMock.isConnected()).thenReturn(false);
         instance.store(ItemTestHelper.createNumberItem("number", 5));
-        verify(influxDBRepository, never()).write(any());
+        verify(influxDBRepositoryMock, never()).write(any());
+    }
+
+    private InfluxDBPersistenceService getService(Map<String, Object> config) {
+        return new InfluxDBPersistenceService(mock(ItemRegistry.class), influxDBMetadataService, config) {
+            @Override
+            protected InfluxDBRepository createInfluxDBRepository() {
+                return influxDBRepositoryMock;
+            }
+        };
     }
 }
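The rewritten test builds a fresh service per test via getService(...) and overrides the protected repository factory to inject the mock; invalid configurations are now expected to fail fast at construction time instead of being silently ignored. A minimal, self-contained JUnit 5 sketch of the assertThrows pattern used here; the thrown message is a stand-in, not taken from the binding:

    import static org.junit.jupiter.api.Assertions.assertThrows;
    import org.junit.jupiter.api.Test;

    class FailFastSketchTest {
        @Test
        void invalidConfigThrows() {
            // Stand-in for getService(INVALID_V1_CONFIGURATION), which is expected to reject
            // a configuration that lacks the required credentials.
            assertThrows(IllegalArgumentException.class, () -> {
                throw new IllegalArgumentException("incomplete configuration");
            });
        }
    }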
@ -36,8 +36,8 @@ import org.openhab.core.items.MetadataRegistry;
|
|||||||
import org.openhab.core.library.types.PercentType;
|
import org.openhab.core.library.types.PercentType;
|
||||||
import org.openhab.core.persistence.FilterCriteria;
|
import org.openhab.core.persistence.FilterCriteria;
|
||||||
import org.openhab.persistence.influxdb.InfluxDBPersistenceService;
|
import org.openhab.persistence.influxdb.InfluxDBPersistenceService;
|
||||||
import org.openhab.persistence.influxdb.internal.influx1.Influx1FilterCriteriaQueryCreatorImpl;
|
import org.openhab.persistence.influxdb.internal.influx1.InfluxDB1FilterCriteriaQueryCreatorImpl;
|
||||||
import org.openhab.persistence.influxdb.internal.influx2.Influx2FilterCriteriaQueryCreatorImpl;
|
import org.openhab.persistence.influxdb.internal.influx2.InfluxDB2FilterCriteriaQueryCreatorImpl;
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* @author Joan Pujol Espinar - Initial contribution
|
* @author Joan Pujol Espinar - Initial contribution
|
||||||
@ -54,13 +54,14 @@ public class InfluxFilterCriteriaQueryCreatorImplTest {
|
|||||||
private @Mock InfluxDBConfiguration influxDBConfiguration;
|
private @Mock InfluxDBConfiguration influxDBConfiguration;
|
||||||
private @Mock MetadataRegistry metadataRegistry;
|
private @Mock MetadataRegistry metadataRegistry;
|
||||||
|
|
||||||
private Influx1FilterCriteriaQueryCreatorImpl instanceV1;
|
private InfluxDB1FilterCriteriaQueryCreatorImpl instanceV1;
|
||||||
private Influx2FilterCriteriaQueryCreatorImpl instanceV2;
|
private InfluxDB2FilterCriteriaQueryCreatorImpl instanceV2;
|
||||||
|
|
||||||
@BeforeEach
|
@BeforeEach
|
||||||
public void before() {
|
public void before() {
|
||||||
instanceV1 = new Influx1FilterCriteriaQueryCreatorImpl(influxDBConfiguration, metadataRegistry);
|
InfluxDBMetadataService influxDBMetadataService = new InfluxDBMetadataService(metadataRegistry);
|
||||||
instanceV2 = new Influx2FilterCriteriaQueryCreatorImpl(influxDBConfiguration, metadataRegistry);
|
instanceV1 = new InfluxDB1FilterCriteriaQueryCreatorImpl(influxDBConfiguration, influxDBMetadataService);
|
||||||
|
instanceV2 = new InfluxDB2FilterCriteriaQueryCreatorImpl(influxDBConfiguration, influxDBMetadataService);
|
||||||
}
|
}
|
||||||
|
|
||||||
@AfterEach
|
@AfterEach
|
||||||
@ -79,10 +80,11 @@ public class InfluxFilterCriteriaQueryCreatorImplTest {
|
|||||||
assertThat(queryV1, equalTo("SELECT \"value\"::field,\"item\"::tag FROM \"origin\".\"sampleItem\";"));
|
assertThat(queryV1, equalTo("SELECT \"value\"::field,\"item\"::tag FROM \"origin\".\"sampleItem\";"));
|
||||||
|
|
||||||
String queryV2 = instanceV2.createQuery(criteria, RETENTION_POLICY);
|
String queryV2 = instanceV2.createQuery(criteria, RETENTION_POLICY);
|
||||||
assertThat(queryV2,
|
assertThat(queryV2, equalTo("""
|
||||||
equalTo("from(bucket:\"origin\")\n\t" + "|> range(start:-100y)\n\t"
|
from(bucket:"origin")
|
||||||
+ "|> filter(fn: (r) => r[\"_measurement\"] == \"sampleItem\")\n\t"
|
\t|> range(start:-100y)
|
||||||
+ "|> keep(columns:[\"_measurement\", \"_time\", \"_value\"])"));
|
\t|> filter(fn: (r) => r["_measurement"] == "sampleItem")
|
||||||
|
\t|> keep(columns:["_measurement", "_time", "_value"])"""));
|
||||||
}
|
}
|
||||||
|
|
||||||
@Test
|
@Test
|
||||||
@ -112,10 +114,11 @@ public class InfluxFilterCriteriaQueryCreatorImplTest {
|
|||||||
assertThat(queryV1, equalTo(expectedQueryV1));
|
assertThat(queryV1, equalTo(expectedQueryV1));
|
||||||
|
|
||||||
String queryV2 = instanceV2.createQuery(criteria, RETENTION_POLICY);
|
String queryV2 = instanceV2.createQuery(criteria, RETENTION_POLICY);
|
||||||
String expectedQueryV2 = String.format(
|
String expectedQueryV2 = String.format("""
|
||||||
"from(bucket:\"origin\")\n\t" + "|> range(start:%s, stop:%s)\n\t"
|
from(bucket:"origin")
|
||||||
+ "|> filter(fn: (r) => r[\"_measurement\"] == \"sampleItem\")\n\t"
|
\t|> range(start:%s, stop:%s)
|
||||||
+ "|> keep(columns:[\"_measurement\", \"_time\", \"_value\"])",
|
\t|> filter(fn: (r) => r["_measurement"] == "sampleItem")
|
||||||
|
\t|> keep(columns:["_measurement", "_time", "_value"])""",
|
||||||
INFLUX2_DATE_FORMATTER.format(now.toInstant()), INFLUX2_DATE_FORMATTER.format(tomorrow.toInstant()));
|
INFLUX2_DATE_FORMATTER.format(now.toInstant()), INFLUX2_DATE_FORMATTER.format(tomorrow.toInstant()));
|
||||||
assertThat(queryV2, equalTo(expectedQueryV2));
|
assertThat(queryV2, equalTo(expectedQueryV2));
|
||||||
}
|
}
|
||||||
@ -131,11 +134,12 @@ public class InfluxFilterCriteriaQueryCreatorImplTest {
|
|||||||
equalTo("SELECT \"value\"::field,\"item\"::tag FROM \"origin\".\"sampleItem\" WHERE value <= 90;"));
|
equalTo("SELECT \"value\"::field,\"item\"::tag FROM \"origin\".\"sampleItem\" WHERE value <= 90;"));
|
||||||
|
|
||||||
String queryV2 = instanceV2.createQuery(criteria, RETENTION_POLICY);
|
String queryV2 = instanceV2.createQuery(criteria, RETENTION_POLICY);
|
||||||
assertThat(queryV2,
|
assertThat(queryV2, equalTo("""
|
||||||
equalTo("from(bucket:\"origin\")\n\t" + "|> range(start:-100y)\n\t"
|
from(bucket:"origin")
|
||||||
+ "|> filter(fn: (r) => r[\"_measurement\"] == \"sampleItem\")\n\t"
|
\t|> range(start:-100y)
|
||||||
+ "|> keep(columns:[\"_measurement\", \"_time\", \"_value\"])\n\t"
|
\t|> filter(fn: (r) => r["_measurement"] == "sampleItem")
|
||||||
+ "|> filter(fn: (r) => (r[\"_field\"] == \"value\" and r[\"_value\"] <= 90))"));
|
\t|> keep(columns:["_measurement", "_time", "_value"])
|
||||||
|
\t|> filter(fn: (r) => (r["_field"] == "value" and r["_value"] <= 90))"""));
|
||||||
}
|
}
|
||||||
|
|
||||||
@Test
|
@Test
|
||||||
@ -149,9 +153,12 @@ public class InfluxFilterCriteriaQueryCreatorImplTest {
equalTo("SELECT \"value\"::field,\"item\"::tag FROM \"origin\".\"sampleItem\" LIMIT 10 OFFSET 20;"));
equalTo("SELECT \"value\"::field,\"item\"::tag FROM \"origin\".\"sampleItem\" LIMIT 10 OFFSET 20;"));

String queryV2 = instanceV2.createQuery(criteria, RETENTION_POLICY);
String queryV2 = instanceV2.createQuery(criteria, RETENTION_POLICY);
assertThat(queryV2, equalTo("from(bucket:\"origin\")\n\t" + "|> range(start:-100y)\n\t"
assertThat(queryV2, equalTo("""
+ "|> filter(fn: (r) => r[\"_measurement\"] == \"sampleItem\")\n\t"
from(bucket:"origin")
+ "|> keep(columns:[\"_measurement\", \"_time\", \"_value\"])\n\t" + "|> limit(n:10, offset:20)"));
\t|> range(start:-100y)
\t|> filter(fn: (r) => r["_measurement"] == "sampleItem")
\t|> keep(columns:["_measurement", "_time", "_value"])
\t|> limit(n:10, offset:20)"""));
}
}

@Test
@Test
@ -164,11 +171,12 @@ public class InfluxFilterCriteriaQueryCreatorImplTest {
equalTo("SELECT \"value\"::field,\"item\"::tag FROM \"origin\".\"sampleItem\" ORDER BY time ASC;"));
equalTo("SELECT \"value\"::field,\"item\"::tag FROM \"origin\".\"sampleItem\" ORDER BY time ASC;"));

String queryV2 = instanceV2.createQuery(criteria, RETENTION_POLICY);
String queryV2 = instanceV2.createQuery(criteria, RETENTION_POLICY);
assertThat(queryV2,
assertThat(queryV2, equalTo("""
equalTo("from(bucket:\"origin\")\n\t" + "|> range(start:-100y)\n\t"
from(bucket:"origin")
+ "|> filter(fn: (r) => r[\"_measurement\"] == \"sampleItem\")\n\t"
\t|> range(start:-100y)
+ "|> keep(columns:[\"_measurement\", \"_time\", \"_value\"])\n\t"
\t|> filter(fn: (r) => r["_measurement"] == "sampleItem")
+ "|> sort(desc:false, columns:[\"_time\"])"));
\t|> keep(columns:["_measurement", "_time", "_value"])
\t|> sort(desc:false, columns:["_time"])"""));
}
}

@Test
@Test
@ -177,19 +185,17 @@ public class InfluxFilterCriteriaQueryCreatorImplTest {
criteria.setOrdering(FilterCriteria.Ordering.DESCENDING);
criteria.setOrdering(FilterCriteria.Ordering.DESCENDING);
criteria.setPageSize(1);
criteria.setPageSize(1);
String queryV2 = instanceV2.createQuery(criteria, RETENTION_POLICY);
String queryV2 = instanceV2.createQuery(criteria, RETENTION_POLICY);
assertThat(queryV2,
assertThat(queryV2, equalTo("""
equalTo("from(bucket:\"origin\")\n\t" + "|> range(start:-100y)\n\t"
from(bucket:"origin")
+ "|> filter(fn: (r) => r[\"_measurement\"] == \"sampleItem\")\n\t"
\t|> range(start:-100y)
+ "|> keep(columns:[\"_measurement\", \"_time\", \"_value\"])\n\t" + "|> last()"));
\t|> filter(fn: (r) => r["_measurement"] == "sampleItem")
\t|> keep(columns:["_measurement", "_time", "_value"])
\t|> last()"""));
}
}

private FilterCriteria createBaseCriteria() {
private FilterCriteria createBaseCriteria() {
return createBaseCriteria(ITEM_NAME);
}

private FilterCriteria createBaseCriteria(String sampleItem) {
FilterCriteria criteria = new FilterCriteria();
FilterCriteria criteria = new FilterCriteria();
criteria.setItemName(sampleItem);
criteria.setItemName(ITEM_NAME);
criteria.setOrdering(null);
criteria.setOrdering(null);
return criteria;
return criteria;
}
}
@ -207,12 +213,12 @@ public class InfluxFilterCriteriaQueryCreatorImplTest {
"SELECT \"value\"::field,\"item\"::tag FROM \"origin\".\"measurementName\" WHERE item = 'sampleItem';"));
"SELECT \"value\"::field,\"item\"::tag FROM \"origin\".\"measurementName\" WHERE item = 'sampleItem';"));

String queryV2 = instanceV2.createQuery(criteria, RETENTION_POLICY);
String queryV2 = instanceV2.createQuery(criteria, RETENTION_POLICY);
assertThat(queryV2,
assertThat(queryV2, equalTo("""
equalTo("from(bucket:\"origin\")\n\t" + "|> range(start:-100y)\n\t"
from(bucket:"origin")
+ "|> filter(fn: (r) => r[\"_measurement\"] == \"measurementName\")\n\t"
\t|> range(start:-100y)
+ "|> filter(fn: (r) => r[\"item\"] == \"sampleItem\")\n\t"
\t|> filter(fn: (r) => r["_measurement"] == "measurementName")
+ "|> keep(columns:[\"_measurement\", \"_time\", \"_value\", \"item\"])"));
\t|> filter(fn: (r) => r["item"] == "sampleItem")
\t|> keep(columns:["_measurement", "_time", "_value", "item"])"""));
when(metadataRegistry.get(metadataKey))
when(metadataRegistry.get(metadataKey))
.thenReturn(new Metadata(metadataKey, "", Map.of("key1", "val1", "key2", "val2")));
.thenReturn(new Metadata(metadataKey, "", Map.of("key1", "val1", "key2", "val2")));
@ -220,9 +226,10 @@ public class InfluxFilterCriteriaQueryCreatorImplTest {
assertThat(queryV1, equalTo("SELECT \"value\"::field,\"item\"::tag FROM \"origin\".\"sampleItem\";"));
assertThat(queryV1, equalTo("SELECT \"value\"::field,\"item\"::tag FROM \"origin\".\"sampleItem\";"));

queryV2 = instanceV2.createQuery(criteria, RETENTION_POLICY);
queryV2 = instanceV2.createQuery(criteria, RETENTION_POLICY);
assertThat(queryV2,
assertThat(queryV2, equalTo("""
equalTo("from(bucket:\"origin\")\n\t" + "|> range(start:-100y)\n\t"
from(bucket:"origin")
+ "|> filter(fn: (r) => r[\"_measurement\"] == \"sampleItem\")\n\t"
\t|> range(start:-100y)
+ "|> keep(columns:[\"_measurement\", \"_time\", \"_value\"])"));
\t|> filter(fn: (r) => r["_measurement"] == "sampleItem")
\t|> keep(columns:["_measurement", "_time", "_value"])"""));
}
}
}
}
@ -23,6 +23,7 @@ import java.util.stream.Stream;
import org.eclipse.jdt.annotation.DefaultLocation;
import org.eclipse.jdt.annotation.DefaultLocation;
import org.eclipse.jdt.annotation.NonNullByDefault;
import org.eclipse.jdt.annotation.NonNullByDefault;
import org.junit.jupiter.api.AfterEach;
import org.junit.jupiter.api.AfterEach;
import org.junit.jupiter.api.Assertions;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.api.extension.ExtendWith;
import org.junit.jupiter.api.extension.ExtendWith;
@ -40,7 +41,6 @@ import org.openhab.persistence.influxdb.InfluxDBPersistenceService;
* @author Joan Pujol Espinar - Initial contribution
* @author Joan Pujol Espinar - Initial contribution
*/
*/
@ExtendWith(MockitoExtension.class)
@ExtendWith(MockitoExtension.class)
@SuppressWarnings("null") // In case of any NPE it will cause test fail that it's the expected result
@NonNullByDefault(value = { DefaultLocation.PARAMETER, DefaultLocation.RETURN_TYPE })
@NonNullByDefault(value = { DefaultLocation.PARAMETER, DefaultLocation.RETURN_TYPE })
public class ItemToStorePointCreatorTest {
public class ItemToStorePointCreatorTest {
@ -50,12 +50,13 @@ public class ItemToStorePointCreatorTest {

@BeforeEach
@BeforeEach
public void before() {
public void before() {
InfluxDBMetadataService influxDBMetadataService = new InfluxDBMetadataService(metadataRegistry);
when(influxDBConfiguration.isAddCategoryTag()).thenReturn(false);
when(influxDBConfiguration.isAddCategoryTag()).thenReturn(false);
when(influxDBConfiguration.isAddLabelTag()).thenReturn(false);
when(influxDBConfiguration.isAddLabelTag()).thenReturn(false);
when(influxDBConfiguration.isAddTypeTag()).thenReturn(false);
when(influxDBConfiguration.isAddTypeTag()).thenReturn(false);
when(influxDBConfiguration.isReplaceUnderscore()).thenReturn(false);
when(influxDBConfiguration.isReplaceUnderscore()).thenReturn(false);

instance = new ItemToStorePointCreator(influxDBConfiguration, metadataRegistry);
instance = new ItemToStorePointCreator(influxDBConfiguration, influxDBMetadataService);
}
}

@AfterEach
@AfterEach
@ -71,11 +72,17 @@ public class ItemToStorePointCreatorTest {
NumberItem item = ItemTestHelper.createNumberItem("myitem", number);
NumberItem item = ItemTestHelper.createNumberItem("myitem", number);
InfluxPoint point = instance.convert(item, null);
InfluxPoint point = instance.convert(item, null);

if (point == null) {
Assertions.fail("'point' is null");
return;
}

assertThat(point.getMeasurementName(), equalTo(item.getName()));
assertThat(point.getMeasurementName(), equalTo(item.getName()));
assertThat("Must Store item name", point.getTags(), hasEntry("item", item.getName()));
assertThat("Must Store item name", point.getTags(), hasEntry("item", item.getName()));
assertThat(point.getValue(), equalTo(new BigDecimal(number.toString())));
assertThat(point.getValue(), equalTo(new BigDecimal(number.toString())));
}
}

@SuppressWarnings("unused")
private static Stream<Number> convertBasicItem() {
private static Stream<Number> convertBasicItem() {
return Stream.of(5, 5.5, 5L);
return Stream.of(5, 5.5, 5L);
}
}
@ -84,6 +91,12 @@ public class ItemToStorePointCreatorTest {
public void shouldUseAliasAsMeasurementNameIfProvided() {
public void shouldUseAliasAsMeasurementNameIfProvided() {
NumberItem item = ItemTestHelper.createNumberItem("myitem", 5);
NumberItem item = ItemTestHelper.createNumberItem("myitem", 5);
InfluxPoint point = instance.convert(item, "aliasName");
InfluxPoint point = instance.convert(item, "aliasName");

if (point == null) {
Assertions.fail("'point' is null");
return;
}

assertThat(point.getMeasurementName(), is("aliasName"));
assertThat(point.getMeasurementName(), is("aliasName"));
}
}
@ -94,10 +107,22 @@ public class ItemToStorePointCreatorTest {

when(influxDBConfiguration.isAddCategoryTag()).thenReturn(true);
when(influxDBConfiguration.isAddCategoryTag()).thenReturn(true);
InfluxPoint point = instance.convert(item, null);
InfluxPoint point = instance.convert(item, null);

if (point == null) {
Assertions.fail("'point' is null");
return;
}

assertThat(point.getTags(), hasEntry(InfluxDBConstants.TAG_CATEGORY_NAME, "categoryValue"));
assertThat(point.getTags(), hasEntry(InfluxDBConstants.TAG_CATEGORY_NAME, "categoryValue"));

when(influxDBConfiguration.isAddCategoryTag()).thenReturn(false);
when(influxDBConfiguration.isAddCategoryTag()).thenReturn(false);
point = instance.convert(item, null);
point = instance.convert(item, null);

if (point == null) {
Assertions.fail("'point' is null");
return;
}

assertThat(point.getTags(), not(hasKey(InfluxDBConstants.TAG_CATEGORY_NAME)));
assertThat(point.getTags(), not(hasKey(InfluxDBConstants.TAG_CATEGORY_NAME)));
}
}
@ -107,10 +132,22 @@ public class ItemToStorePointCreatorTest {

when(influxDBConfiguration.isAddTypeTag()).thenReturn(true);
when(influxDBConfiguration.isAddTypeTag()).thenReturn(true);
InfluxPoint point = instance.convert(item, null);
InfluxPoint point = instance.convert(item, null);

if (point == null) {
Assertions.fail("'point' is null");
return;
}

assertThat(point.getTags(), hasEntry(InfluxDBConstants.TAG_TYPE_NAME, "Number"));
assertThat(point.getTags(), hasEntry(InfluxDBConstants.TAG_TYPE_NAME, "Number"));

when(influxDBConfiguration.isAddTypeTag()).thenReturn(false);
when(influxDBConfiguration.isAddTypeTag()).thenReturn(false);
point = instance.convert(item, null);
point = instance.convert(item, null);

if (point == null) {
Assertions.fail("'point' is null");
return;
}

assertThat(point.getTags(), not(hasKey(InfluxDBConstants.TAG_TYPE_NAME)));
assertThat(point.getTags(), not(hasKey(InfluxDBConstants.TAG_TYPE_NAME)));
}
}
@ -121,10 +158,22 @@ public class ItemToStorePointCreatorTest {

when(influxDBConfiguration.isAddLabelTag()).thenReturn(true);
when(influxDBConfiguration.isAddLabelTag()).thenReturn(true);
InfluxPoint point = instance.convert(item, null);
InfluxPoint point = instance.convert(item, null);

if (point == null) {
Assertions.fail("'point' is null");
return;
}

assertThat(point.getTags(), hasEntry(InfluxDBConstants.TAG_LABEL_NAME, "ItemLabel"));
assertThat(point.getTags(), hasEntry(InfluxDBConstants.TAG_LABEL_NAME, "ItemLabel"));

when(influxDBConfiguration.isAddLabelTag()).thenReturn(false);
when(influxDBConfiguration.isAddLabelTag()).thenReturn(false);
point = instance.convert(item, null);
point = instance.convert(item, null);

if (point == null) {
Assertions.fail("'point' is null");
return;
}

assertThat(point.getTags(), not(hasKey(InfluxDBConstants.TAG_LABEL_NAME)));
assertThat(point.getTags(), not(hasKey(InfluxDBConstants.TAG_LABEL_NAME)));
}
}
@ -137,6 +186,12 @@ public class ItemToStorePointCreatorTest {
.thenReturn(new Metadata(metadataKey, "", Map.of("key1", "val1", "key2", "val2")));
.thenReturn(new Metadata(metadataKey, "", Map.of("key1", "val1", "key2", "val2")));

InfluxPoint point = instance.convert(item, null);
InfluxPoint point = instance.convert(item, null);

if (point == null) {
Assertions.fail("'point' is null");
return;
}

assertThat(point.getTags(), hasEntry("key1", "val1"));
assertThat(point.getTags(), hasEntry("key1", "val1"));
assertThat(point.getTags(), hasEntry("key2", "val2"));
assertThat(point.getTags(), hasEntry("key2", "val2"));
}
}
@ -147,9 +202,17 @@ public class ItemToStorePointCreatorTest {
MetadataKey metadataKey = new MetadataKey(InfluxDBPersistenceService.SERVICE_NAME, item.getName());
MetadataKey metadataKey = new MetadataKey(InfluxDBPersistenceService.SERVICE_NAME, item.getName());

InfluxPoint point = instance.convert(item, null);
InfluxPoint point = instance.convert(item, null);
if (point == null) {
Assertions.fail();
return;
}
assertThat(point.getMeasurementName(), equalTo(item.getName()));
assertThat(point.getMeasurementName(), equalTo(item.getName()));

point = instance.convert(item, null);
point = instance.convert(item, null);
if (point == null) {
Assertions.fail();
return;
}
assertThat(point.getMeasurementName(), equalTo(item.getName()));
assertThat(point.getMeasurementName(), equalTo(item.getName()));
assertThat(point.getTags(), hasEntry("item", item.getName()));
assertThat(point.getTags(), hasEntry("item", item.getName()));
@ -157,6 +220,10 @@ public class ItemToStorePointCreatorTest {
.thenReturn(new Metadata(metadataKey, "measurementName", Map.of("key1", "val1", "key2", "val2")));
.thenReturn(new Metadata(metadataKey, "measurementName", Map.of("key1", "val1", "key2", "val2")));

point = instance.convert(item, null);
point = instance.convert(item, null);
if (point == null) {
Assertions.fail();
return;
}
assertThat(point.getMeasurementName(), equalTo("measurementName"));
assertThat(point.getMeasurementName(), equalTo("measurementName"));
assertThat(point.getTags(), hasEntry("item", item.getName()));
assertThat(point.getTags(), hasEntry("item", item.getName()));
@ -164,6 +231,10 @@ public class ItemToStorePointCreatorTest {
.thenReturn(new Metadata(metadataKey, "", Map.of("key1", "val1", "key2", "val2")));
.thenReturn(new Metadata(metadataKey, "", Map.of("key1", "val1", "key2", "val2")));

point = instance.convert(item, null);
point = instance.convert(item, null);
if (point == null) {
Assertions.fail();
return;
}
assertThat(point.getMeasurementName(), equalTo(item.getName()));
assertThat(point.getMeasurementName(), equalTo(item.getName()));
assertThat(point.getTags(), hasEntry("item", item.getName()));
assertThat(point.getTags(), hasEntry("item", item.getName()));
}
}