mirror of
https://github.com/danieldemus/openhab-core.git
synced 2025-01-10 13:21:53 +01:00
Allow sending TimeSeries for items (#3597)
Signed-off-by: Jan N. Klug <github@klug.nrw>
This commit is contained in:
parent
fb6f1923e8
commit
cdbca4dd0a
@ -12,6 +12,7 @@
|
||||
*/
|
||||
package org.openhab.core.automation.module.script.defaultscope;
|
||||
|
||||
import java.time.ZonedDateTime;
|
||||
import java.util.Map;
|
||||
|
||||
import org.eclipse.jdt.annotation.NonNullByDefault;
|
||||
@ -19,6 +20,7 @@ import org.eclipse.jdt.annotation.Nullable;
|
||||
import org.openhab.core.items.Item;
|
||||
import org.openhab.core.types.Command;
|
||||
import org.openhab.core.types.State;
|
||||
import org.openhab.core.types.TimeSeries;
|
||||
|
||||
/**
|
||||
* The static methods of this class are made available as functions in the scripts.
|
||||
@ -98,7 +100,6 @@ public interface ScriptBusEvent {
|
||||
|
||||
/**
|
||||
* Posts a status update for a specified item to the event bus.
|
||||
* t
|
||||
*
|
||||
* @param item the item to send the status update for
|
||||
* @param state the new state of the item
|
||||
@ -106,6 +107,26 @@ public interface ScriptBusEvent {
|
||||
@Nullable
|
||||
Object postUpdate(@Nullable Item item, @Nullable State state);
|
||||
|
||||
/**
|
||||
* Sends a time series to the event bus
|
||||
*
|
||||
* @param item the item to send the time series for
|
||||
* @param timeSeries a {@link TimeSeries} containing policy and values
|
||||
*/
|
||||
@Nullable
|
||||
Object sendTimeSeries(@Nullable Item item, @Nullable TimeSeries timeSeries);
|
||||
|
||||
/**
|
||||
* Sends a time series to the event bus
|
||||
*
|
||||
* @param itemName the name of the item to send the status update for
|
||||
* @param values a {@link Map} containing the timeseries, composed of pairs of {@link ZonedDateTime} and
|
||||
* {@link State}
|
||||
* @param policy either <code>ADD</code> or <code>REPLACE</code>
|
||||
*/
|
||||
@Nullable
|
||||
Object sendTimeSeries(@Nullable String itemName, @Nullable Map<ZonedDateTime, State> values, String policy);
|
||||
|
||||
/**
|
||||
* Stores the current states for a list of items in a map.
|
||||
* A group item is not itself put into the map, but instead all its members.
|
||||
|
@ -12,6 +12,7 @@
|
||||
*/
|
||||
package org.openhab.core.automation.module.script.internal.defaultscope;
|
||||
|
||||
import java.time.ZonedDateTime;
|
||||
import java.util.HashMap;
|
||||
import java.util.Map;
|
||||
import java.util.Map.Entry;
|
||||
@ -27,6 +28,7 @@ import org.openhab.core.items.ItemRegistry;
|
||||
import org.openhab.core.items.events.ItemEventFactory;
|
||||
import org.openhab.core.types.Command;
|
||||
import org.openhab.core.types.State;
|
||||
import org.openhab.core.types.TimeSeries;
|
||||
import org.openhab.core.types.TypeParser;
|
||||
import org.slf4j.LoggerFactory;
|
||||
|
||||
@ -153,6 +155,31 @@ public class ScriptBusEventImpl implements ScriptBusEvent {
|
||||
return null;
|
||||
}
|
||||
|
||||
@Override
|
||||
public @Nullable Object sendTimeSeries(@Nullable Item item, @Nullable TimeSeries timeSeries) {
|
||||
EventPublisher eventPublisher1 = this.eventPublisher;
|
||||
if (eventPublisher1 != null && item != null && timeSeries != null) {
|
||||
eventPublisher1.post(ItemEventFactory.createTimeSeriesEvent(item.getName(), timeSeries, null));
|
||||
}
|
||||
return null;
|
||||
}
|
||||
|
||||
@Override
|
||||
public @Nullable Object sendTimeSeries(@Nullable String itemName, @Nullable Map<ZonedDateTime, State> values,
|
||||
@Nullable String policy) {
|
||||
EventPublisher eventPublisher1 = this.eventPublisher;
|
||||
if (eventPublisher1 != null && itemName != null && values != null && policy != null) {
|
||||
try {
|
||||
TimeSeries timeSeries = new TimeSeries(TimeSeries.Policy.valueOf(policy));
|
||||
values.forEach((key, value) -> timeSeries.add(key.toInstant(), value));
|
||||
eventPublisher1.post(ItemEventFactory.createTimeSeriesEvent(itemName, timeSeries, null));
|
||||
} catch (IllegalArgumentException e) {
|
||||
LoggerFactory.getLogger(ScriptBusEventImpl.class).warn("Policy '{}' does not exist.", policy);
|
||||
}
|
||||
}
|
||||
return null;
|
||||
}
|
||||
|
||||
@Override
|
||||
public Map<Item, State> storeStates(Item @Nullable... items) {
|
||||
Map<Item, State> statesMap = new HashMap<>();
|
||||
|
@ -43,4 +43,10 @@ public class GlobalStrategies {
|
||||
return "restoreOnStartup";
|
||||
};
|
||||
};
|
||||
public static final Strategy FORECAST = new StrategyImpl() {
|
||||
@Override
|
||||
public String getName() {
|
||||
return "forecast";
|
||||
};
|
||||
};
|
||||
}
|
||||
|
@ -38,6 +38,7 @@ public class PersistenceGlobalScopeProvider extends AbstractGlobalScopeProvider
|
||||
res.getContents().add(GlobalStrategies.UPDATE);
|
||||
res.getContents().add(GlobalStrategies.CHANGE);
|
||||
res.getContents().add(GlobalStrategies.RESTORE);
|
||||
res.getContents().add(GlobalStrategies.FORECAST);
|
||||
}
|
||||
|
||||
@Override
|
||||
|
@ -12,11 +12,13 @@
|
||||
*/
|
||||
package org.openhab.core.model.script.actions;
|
||||
|
||||
import java.time.ZonedDateTime;
|
||||
import java.util.HashMap;
|
||||
import java.util.List;
|
||||
import java.util.Map;
|
||||
import java.util.Map.Entry;
|
||||
|
||||
import org.eclipse.jdt.annotation.Nullable;
|
||||
import org.openhab.core.events.EventPublisher;
|
||||
import org.openhab.core.items.GroupItem;
|
||||
import org.openhab.core.items.Item;
|
||||
@ -26,6 +28,7 @@ import org.openhab.core.items.events.ItemEventFactory;
|
||||
import org.openhab.core.model.script.ScriptServiceUtil;
|
||||
import org.openhab.core.types.Command;
|
||||
import org.openhab.core.types.State;
|
||||
import org.openhab.core.types.TimeSeries;
|
||||
import org.openhab.core.types.TypeParser;
|
||||
import org.slf4j.LoggerFactory;
|
||||
|
||||
@ -146,7 +149,7 @@ public class BusEvent {
|
||||
* Posts a status update for a specified item to the event bus.
|
||||
*
|
||||
* @param itemName the name of the item to send the status update for
|
||||
* @param stateAsString the new state of the item
|
||||
* @param stateString the new state of the item
|
||||
*/
|
||||
public static Object postUpdate(String itemName, String stateString) {
|
||||
ItemRegistry registry = ScriptServiceUtil.getItemRegistry();
|
||||
@ -169,6 +172,43 @@ public class BusEvent {
|
||||
return null;
|
||||
}
|
||||
|
||||
/**
|
||||
* Sends a time series to the event bus
|
||||
*
|
||||
* @param item the item to send the time series for
|
||||
* @param timeSeries a {@link TimeSeries} containing policy and values
|
||||
*/
|
||||
public static Object sendTimeSeries(@Nullable Item item, @Nullable TimeSeries timeSeries) {
|
||||
EventPublisher eventPublisher1 = ScriptServiceUtil.getEventPublisher();
|
||||
if (eventPublisher1 != null && item != null && timeSeries != null) {
|
||||
eventPublisher1.post(ItemEventFactory.createTimeSeriesEvent(item.getName(), timeSeries, null));
|
||||
}
|
||||
return null;
|
||||
}
|
||||
|
||||
/**
|
||||
* Sends a time series to the event bus
|
||||
*
|
||||
* @param itemName the name of the item to send the status update for
|
||||
* @param values a {@link Map} containing the timeseries, composed of pairs of {@link ZonedDateTime} and
|
||||
* {@link State}
|
||||
* @param policy either <code>ADD</code> or <code>REPLACE</code>
|
||||
*/
|
||||
public static Object sendTimeSeries(@Nullable String itemName, @Nullable Map<ZonedDateTime, State> values,
|
||||
String policy) {
|
||||
EventPublisher eventPublisher1 = ScriptServiceUtil.getEventPublisher();
|
||||
if (eventPublisher1 != null && itemName != null && values != null && policy != null) {
|
||||
try {
|
||||
TimeSeries timeSeries = new TimeSeries(TimeSeries.Policy.valueOf(policy));
|
||||
values.forEach((key, value) -> timeSeries.add(key.toInstant(), value));
|
||||
eventPublisher1.post(ItemEventFactory.createTimeSeriesEvent(itemName, timeSeries, null));
|
||||
} catch (IllegalArgumentException e) {
|
||||
LoggerFactory.getLogger(BusEvent.class).warn("Policy '{}' does not exist.", policy);
|
||||
}
|
||||
}
|
||||
return null;
|
||||
}
|
||||
|
||||
private static <T extends State> List<String> getAcceptedDataTypeNames(Item item) {
|
||||
return item.getAcceptedDataTypes().stream().map(Class::getSimpleName).toList();
|
||||
}
|
||||
@ -232,6 +272,4 @@ public class BusEvent {
|
||||
}
|
||||
return null;
|
||||
}
|
||||
|
||||
// static public JobKey timer(AbstractInstant instant, Object)
|
||||
}
|
||||
|
@ -15,6 +15,7 @@ package org.openhab.core.persistence;
|
||||
import java.time.ZonedDateTime;
|
||||
|
||||
import org.eclipse.jdt.annotation.NonNullByDefault;
|
||||
import org.eclipse.jdt.annotation.Nullable;
|
||||
import org.openhab.core.items.Item;
|
||||
import org.openhab.core.types.State;
|
||||
|
||||
@ -46,6 +47,25 @@ public interface ModifiablePersistenceService extends QueryablePersistenceServic
|
||||
*/
|
||||
void store(Item item, ZonedDateTime date, State state);
|
||||
|
||||
/**
|
||||
* <p>
|
||||
* Stores the historic item value under a specified alias. This allows the item, time and value to be specified.
|
||||
*
|
||||
* <p>
|
||||
* Adding data with the same time as an existing record should update the current record value rather than adding a
|
||||
* new record.
|
||||
*
|
||||
* <p>
|
||||
* Implementors should keep in mind that all registered {@link PersistenceService}s are called synchronously. Hence
|
||||
* long running operations should be processed asynchronously. E.g. <code>store</code> adds things to a queue which
|
||||
* is processed by some asynchronous workers (Quartz Job, Thread, etc.).
|
||||
*
|
||||
* @param item the data to be stored
|
||||
* @param date the date of the record
|
||||
* @param state the state to be recorded
|
||||
*/
|
||||
void store(Item item, ZonedDateTime date, State state, @Nullable String alias);
|
||||
|
||||
/**
|
||||
* Removes data associated with an item from a persistence service.
|
||||
* If all data is removed for the specified item, the persistence service should free any resources associated with
|
||||
|
@ -12,6 +12,14 @@
|
||||
*/
|
||||
package org.openhab.core.persistence.internal;
|
||||
|
||||
import static org.openhab.core.persistence.FilterCriteria.Ordering.ASCENDING;
|
||||
import static org.openhab.core.persistence.strategy.PersistenceStrategy.Globals.FORECAST;
|
||||
import static org.openhab.core.persistence.strategy.PersistenceStrategy.Globals.RESTORE;
|
||||
import static org.openhab.core.persistence.strategy.PersistenceStrategy.Globals.UPDATE;
|
||||
|
||||
import java.time.Instant;
|
||||
import java.time.ZoneId;
|
||||
import java.time.ZonedDateTime;
|
||||
import java.time.format.DateTimeFormatter;
|
||||
import java.util.Collection;
|
||||
import java.util.HashSet;
|
||||
@ -23,6 +31,7 @@ import java.util.Set;
|
||||
import java.util.concurrent.ConcurrentHashMap;
|
||||
import java.util.concurrent.ExecutorService;
|
||||
import java.util.concurrent.Executors;
|
||||
import java.util.concurrent.ScheduledFuture;
|
||||
import java.util.concurrent.TimeUnit;
|
||||
import java.util.stream.Stream;
|
||||
|
||||
@ -37,8 +46,10 @@ import org.openhab.core.items.ItemNotFoundException;
|
||||
import org.openhab.core.items.ItemRegistry;
|
||||
import org.openhab.core.items.ItemRegistryChangeListener;
|
||||
import org.openhab.core.items.StateChangeListener;
|
||||
import org.openhab.core.items.TimeSeriesListener;
|
||||
import org.openhab.core.persistence.FilterCriteria;
|
||||
import org.openhab.core.persistence.HistoricItem;
|
||||
import org.openhab.core.persistence.ModifiablePersistenceService;
|
||||
import org.openhab.core.persistence.PersistenceItemConfiguration;
|
||||
import org.openhab.core.persistence.PersistenceService;
|
||||
import org.openhab.core.persistence.QueryablePersistenceService;
|
||||
@ -53,12 +64,14 @@ import org.openhab.core.persistence.strategy.PersistenceCronStrategy;
|
||||
import org.openhab.core.persistence.strategy.PersistenceStrategy;
|
||||
import org.openhab.core.scheduler.CronScheduler;
|
||||
import org.openhab.core.scheduler.ScheduledCompletableFuture;
|
||||
import org.openhab.core.scheduler.Scheduler;
|
||||
import org.openhab.core.service.ReadyMarker;
|
||||
import org.openhab.core.service.ReadyMarkerFilter;
|
||||
import org.openhab.core.service.ReadyService;
|
||||
import org.openhab.core.service.ReadyService.ReadyTracker;
|
||||
import org.openhab.core.service.StartLevelService;
|
||||
import org.openhab.core.types.State;
|
||||
import org.openhab.core.types.TimeSeries;
|
||||
import org.openhab.core.types.UnDefType;
|
||||
import org.osgi.service.component.annotations.Activate;
|
||||
import org.osgi.service.component.annotations.Component;
|
||||
@ -75,18 +88,19 @@ import org.slf4j.LoggerFactory;
|
||||
* @author Kai Kreuzer - Initial contribution
|
||||
* @author Markus Rathgeb - Separation of persistence core and model, drop Quartz usage.
|
||||
* @author Jan N. Klug - Refactored to use service configuration registry
|
||||
* @author Jan N. Klug - Added time series support
|
||||
*/
|
||||
@Component(immediate = true)
|
||||
@NonNullByDefault
|
||||
public class PersistenceManager implements ItemRegistryChangeListener, StateChangeListener, ReadyTracker,
|
||||
PersistenceServiceConfigurationRegistryChangeListener {
|
||||
|
||||
PersistenceServiceConfigurationRegistryChangeListener, TimeSeriesListener {
|
||||
private final Logger logger = LoggerFactory.getLogger(PersistenceManager.class);
|
||||
|
||||
private final ReadyMarker marker = new ReadyMarker("persistence", "restore");
|
||||
|
||||
// the scheduler used for timer events
|
||||
private final CronScheduler scheduler;
|
||||
private final CronScheduler cronScheduler;
|
||||
private final Scheduler scheduler;
|
||||
private final ItemRegistry itemRegistry;
|
||||
private final SafeCaller safeCaller;
|
||||
private final ReadyService readyService;
|
||||
@ -97,9 +111,11 @@ public class PersistenceManager implements ItemRegistryChangeListener, StateChan
|
||||
private final Map<String, PersistenceServiceContainer> persistenceServiceContainers = new ConcurrentHashMap<>();
|
||||
|
||||
@Activate
|
||||
public PersistenceManager(final @Reference CronScheduler scheduler, final @Reference ItemRegistry itemRegistry,
|
||||
final @Reference SafeCaller safeCaller, final @Reference ReadyService readyService,
|
||||
public PersistenceManager(final @Reference CronScheduler cronScheduler, final @Reference Scheduler scheduler,
|
||||
final @Reference ItemRegistry itemRegistry, final @Reference SafeCaller safeCaller,
|
||||
final @Reference ReadyService readyService,
|
||||
final @Reference PersistenceServiceConfigurationRegistry persistenceServiceConfigurationRegistry) {
|
||||
this.cronScheduler = cronScheduler;
|
||||
this.scheduler = scheduler;
|
||||
this.itemRegistry = itemRegistry;
|
||||
this.safeCaller = safeCaller;
|
||||
@ -118,6 +134,7 @@ public class PersistenceManager implements ItemRegistryChangeListener, StateChan
|
||||
started = false;
|
||||
|
||||
persistenceServiceContainers.values().forEach(PersistenceServiceContainer::cancelPersistJobs);
|
||||
persistenceServiceContainers.values().forEach(PersistenceServiceContainer::cancelForecastJobs);
|
||||
|
||||
// remove item state change listeners
|
||||
itemRegistry.stream().filter(GenericItem.class::isInstance)
|
||||
@ -136,6 +153,7 @@ public class PersistenceManager implements ItemRegistryChangeListener, StateChan
|
||||
if (oldContainer != null) { // cancel all jobs if the persistence service is set and an old configuration is
|
||||
// already present
|
||||
oldContainer.cancelPersistJobs();
|
||||
oldContainer.cancelForecastJobs();
|
||||
}
|
||||
|
||||
if (started) {
|
||||
@ -147,6 +165,7 @@ public class PersistenceManager implements ItemRegistryChangeListener, StateChan
|
||||
PersistenceServiceContainer container = persistenceServiceContainers.remove(persistenceService.getId());
|
||||
if (container != null) {
|
||||
container.cancelPersistJobs();
|
||||
container.cancelForecastJobs();
|
||||
}
|
||||
}
|
||||
|
||||
@ -237,61 +256,8 @@ public class PersistenceManager implements ItemRegistryChangeListener, StateChan
|
||||
return items;
|
||||
}
|
||||
|
||||
/**
|
||||
* Handles the "restoreOnStartup" strategy for the item.
|
||||
* If the item state is still undefined when entering this method, all persistence configurations are checked,
|
||||
* if they have the "restoreOnStartup" strategy configured for the item. If so, the item state will be set
|
||||
* to its last persisted value.
|
||||
*
|
||||
* @param item the item to restore the state for
|
||||
*/
|
||||
private void restoreItemStateIfNeeded(Item item) {
|
||||
// get the last persisted state from the persistence service if no state is yet set
|
||||
if (UnDefType.NULL.equals(item.getState()) && item instanceof GenericItem) {
|
||||
List<PersistenceServiceContainer> matchingContainers = persistenceServiceContainers.values().stream() //
|
||||
.filter(container -> container.getPersistenceService() instanceof QueryablePersistenceService) //
|
||||
.filter(container -> container.getMatchingConfigurations(PersistenceStrategy.Globals.RESTORE)
|
||||
.anyMatch(itemConfig -> appliesToItem(itemConfig, item)))
|
||||
.toList();
|
||||
|
||||
for (PersistenceServiceContainer container : matchingContainers) {
|
||||
QueryablePersistenceService queryService = (QueryablePersistenceService) container
|
||||
.getPersistenceService();
|
||||
FilterCriteria filter = new FilterCriteria().setItemName(item.getName()).setPageSize(1);
|
||||
Iterable<HistoricItem> result = safeCaller.create(queryService, QueryablePersistenceService.class)
|
||||
.onTimeout(() -> {
|
||||
logger.warn("Querying persistence service '{}' to restore '{}' takes more than {}ms.",
|
||||
queryService.getId(), item.getName(), SafeCaller.DEFAULT_TIMEOUT);
|
||||
})
|
||||
.onException(e -> logger.error(
|
||||
"Exception occurred while querying persistence service '{}' to restore '{}': {}",
|
||||
queryService.getId(), item.getName(), e.getMessage(), e))
|
||||
.build().query(filter);
|
||||
if (result == null) {
|
||||
// in case of an exception or timeout, the safe caller returns null
|
||||
continue;
|
||||
}
|
||||
Iterator<HistoricItem> it = result.iterator();
|
||||
if (it.hasNext()) {
|
||||
HistoricItem historicItem = it.next();
|
||||
GenericItem genericItem = (GenericItem) item;
|
||||
genericItem.removeStateChangeListener(this);
|
||||
genericItem.setState(historicItem.getState());
|
||||
genericItem.addStateChangeListener(this);
|
||||
if (logger.isDebugEnabled()) {
|
||||
logger.debug("Restored item state from '{}' for item '{}' -> '{}'",
|
||||
DateTimeFormatter.ISO_ZONED_DATE_TIME.format(historicItem.getTimestamp()),
|
||||
item.getName(), historicItem.getState());
|
||||
}
|
||||
return;
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
private void startEventHandling(PersistenceServiceContainer serviceContainer) {
|
||||
serviceContainer.getMatchingConfigurations(PersistenceStrategy.Globals.RESTORE)
|
||||
.forEach(itemConfig -> getAllItems(itemConfig).forEach(this::restoreItemStateIfNeeded));
|
||||
serviceContainer.restoreStatesAndScheduleForecastJobs();
|
||||
serviceContainer.schedulePersistJobs();
|
||||
}
|
||||
|
||||
@ -304,16 +270,19 @@ public class PersistenceManager implements ItemRegistryChangeListener, StateChan
|
||||
|
||||
@Override
|
||||
public void added(Item item) {
|
||||
restoreItemStateIfNeeded(item);
|
||||
persistenceServiceContainers.values().forEach(container -> container.addItem(item));
|
||||
if (item instanceof GenericItem genericItem) {
|
||||
genericItem.addStateChangeListener(this);
|
||||
genericItem.addTimeSeriesListener(this);
|
||||
}
|
||||
}
|
||||
|
||||
@Override
|
||||
public void removed(Item item) {
|
||||
persistenceServiceContainers.values().forEach(container -> container.removeItem(item.getName()));
|
||||
if (item instanceof GenericItem genericItem) {
|
||||
genericItem.removeStateChangeListener(this);
|
||||
genericItem.removeTimeSeriesListener(this);
|
||||
}
|
||||
}
|
||||
|
||||
@ -333,6 +302,50 @@ public class PersistenceManager implements ItemRegistryChangeListener, StateChan
|
||||
handleStateEvent(item, false);
|
||||
}
|
||||
|
||||
@Override
|
||||
public void timeSeriesUpdated(Item item, TimeSeries timeSeries) {
|
||||
if (timeSeries.size() == 0) {
|
||||
// discard empty time series
|
||||
return;
|
||||
}
|
||||
persistenceServiceContainers.values().stream()
|
||||
.filter(psc -> psc.persistenceService instanceof ModifiablePersistenceService)
|
||||
.forEach(container -> Stream
|
||||
.concat(container.getMatchingConfigurations(UPDATE),
|
||||
container.getMatchingConfigurations(FORECAST))
|
||||
.distinct().filter(itemConfig -> appliesToItem(itemConfig, item)).forEach(itemConfig -> {
|
||||
ModifiablePersistenceService service = (ModifiablePersistenceService) container
|
||||
.getPersistenceService();
|
||||
// remove old values if replace selected
|
||||
if (timeSeries.getPolicy() == TimeSeries.Policy.REPLACE) {
|
||||
ZonedDateTime begin = timeSeries.getBegin().atZone(ZoneId.systemDefault());
|
||||
ZonedDateTime end = timeSeries.getEnd().atZone(ZoneId.systemDefault());
|
||||
FilterCriteria removeFilter = new FilterCriteria().setItemName(item.getName())
|
||||
.setBeginDate(begin).setEndDate(end);
|
||||
service.remove(removeFilter);
|
||||
ScheduledCompletableFuture<?> forecastJob = container.forecastJobs.get(item.getName());
|
||||
if (forecastJob != null && forecastJob.getScheduledTime().isAfter(begin)
|
||||
&& forecastJob.getScheduledTime().isBefore(end)) {
|
||||
forecastJob.cancel(true);
|
||||
container.forecastJobs.remove(item.getName());
|
||||
}
|
||||
}
|
||||
// update states
|
||||
timeSeries.getStates().forEach(
|
||||
e -> service.store(item, e.timestamp().atZone(ZoneId.systemDefault()), e.state()));
|
||||
timeSeries.getStates().filter(s -> s.timestamp().isAfter(Instant.now())).findFirst()
|
||||
.ifPresent(s -> {
|
||||
ScheduledCompletableFuture<?> forecastJob = container.forecastJobs
|
||||
.get(item.getName());
|
||||
if (forecastJob == null || forecastJob.getScheduledTime()
|
||||
.isAfter(s.timestamp().atZone(ZoneId.systemDefault()))) {
|
||||
container.scheduleNextForecastForItem(item.getName(), s.timestamp(),
|
||||
s.state());
|
||||
}
|
||||
});
|
||||
}));
|
||||
}
|
||||
|
||||
@Override
|
||||
public void onReadyMarkerAdded(ReadyMarker readyMarker) {
|
||||
ExecutorService scheduler = Executors.newSingleThreadExecutor(new NamedThreadFactory("persistenceManager"));
|
||||
@ -381,7 +394,9 @@ public class PersistenceManager implements ItemRegistryChangeListener, StateChan
|
||||
|
||||
private class PersistenceServiceContainer {
|
||||
private final PersistenceService persistenceService;
|
||||
private final Set<ScheduledCompletableFuture<?>> jobs = new HashSet<>();
|
||||
private final Set<ScheduledCompletableFuture<?>> persistJobs = new HashSet<>();
|
||||
private final Map<String, ScheduledCompletableFuture<?>> forecastJobs = new ConcurrentHashMap<>();
|
||||
private final Map<PersistenceStrategy, Collection<PersistenceItemConfiguration>> strategyCache = new ConcurrentHashMap<>();
|
||||
|
||||
private PersistenceServiceConfiguration configuration;
|
||||
|
||||
@ -403,19 +418,25 @@ public class PersistenceManager implements ItemRegistryChangeListener, StateChan
|
||||
*/
|
||||
public void setConfiguration(@Nullable PersistenceServiceConfiguration configuration) {
|
||||
cancelPersistJobs();
|
||||
cancelForecastJobs();
|
||||
this.configuration = Objects.requireNonNullElseGet(configuration, this::getDefaultConfig);
|
||||
strategyCache.clear();
|
||||
}
|
||||
|
||||
/**
|
||||
* Get all item configurations from this service that match a certain strategy
|
||||
*
|
||||
* @param strategy the {@link PersistenceStrategy} to look for
|
||||
* @return a @link Stream<PersistenceItemConfiguration>} of the result
|
||||
* @return a {@link Stream<PersistenceItemConfiguration>} of the result
|
||||
*/
|
||||
public Stream<PersistenceItemConfiguration> getMatchingConfigurations(PersistenceStrategy strategy) {
|
||||
boolean matchesDefaultStrategies = configuration.getDefaults().contains(strategy);
|
||||
return configuration.getConfigs().stream().filter(itemConfig -> itemConfig.strategies().contains(strategy)
|
||||
|| (itemConfig.strategies().isEmpty() && matchesDefaultStrategies));
|
||||
return Objects.requireNonNull(strategyCache.computeIfAbsent(strategy, s -> {
|
||||
boolean matchesDefaultStrategies = configuration.getDefaults().contains(strategy);
|
||||
return configuration.getConfigs().stream()
|
||||
.filter(itemConfig -> itemConfig.strategies().contains(strategy)
|
||||
|| (itemConfig.strategies().isEmpty() && matchesDefaultStrategies))
|
||||
.toList();
|
||||
}).stream());
|
||||
}
|
||||
|
||||
private PersistenceServiceConfiguration getDefaultConfig() {
|
||||
@ -430,11 +451,19 @@ public class PersistenceManager implements ItemRegistryChangeListener, StateChan
|
||||
* Cancel all scheduled cron jobs / strategies for this service
|
||||
*/
|
||||
public void cancelPersistJobs() {
|
||||
synchronized (jobs) {
|
||||
jobs.forEach(job -> job.cancel(true));
|
||||
jobs.clear();
|
||||
synchronized (persistJobs) {
|
||||
persistJobs.forEach(job -> job.cancel(true));
|
||||
persistJobs.clear();
|
||||
}
|
||||
logger.debug("Removed scheduled cron job for persistence service '{}'", configuration.getUID());
|
||||
logger.debug("Removed scheduled cron jobs for persistence service '{}'", configuration.getUID());
|
||||
}
|
||||
|
||||
public void cancelForecastJobs() {
|
||||
synchronized (forecastJobs) {
|
||||
forecastJobs.values().forEach(job -> job.cancel(true));
|
||||
forecastJobs.clear();
|
||||
}
|
||||
logger.debug("Removed scheduled forecast jobs for persistence service '{}'", configuration.getUID());
|
||||
}
|
||||
|
||||
/**
|
||||
@ -446,7 +475,7 @@ public class PersistenceManager implements ItemRegistryChangeListener, StateChan
|
||||
PersistenceCronStrategy cronStrategy = (PersistenceCronStrategy) strategy;
|
||||
String cronExpression = cronStrategy.getCronExpression();
|
||||
List<PersistenceItemConfiguration> itemConfigs = getMatchingConfigurations(strategy).toList();
|
||||
jobs.add(scheduler.schedule(() -> persistJob(itemConfigs), cronExpression));
|
||||
persistJobs.add(cronScheduler.schedule(() -> persistJob(itemConfigs), cronExpression));
|
||||
|
||||
logger.debug("Scheduled strategy {} with cron expression {} for service {}",
|
||||
cronStrategy.getName(), cronExpression, configuration.getUID());
|
||||
@ -454,6 +483,108 @@ public class PersistenceManager implements ItemRegistryChangeListener, StateChan
|
||||
});
|
||||
}
|
||||
|
||||
public void restoreStatesAndScheduleForecastJobs() {
|
||||
itemRegistry.getItems().forEach(this::addItem);
|
||||
}
|
||||
|
||||
public void addItem(Item item) {
|
||||
if (persistenceService instanceof QueryablePersistenceService) {
|
||||
if (UnDefType.NULL.equals(item.getState())
|
||||
&& (getMatchingConfigurations(RESTORE)
|
||||
.anyMatch(configuration -> appliesToItem(configuration, item)))
|
||||
|| getMatchingConfigurations(FORECAST)
|
||||
.anyMatch(configuration -> appliesToItem(configuration, item))) {
|
||||
restoreItemStateIfPossible(item);
|
||||
}
|
||||
if (getMatchingConfigurations(FORECAST).anyMatch(configuration -> appliesToItem(configuration, item))) {
|
||||
scheduleNextPersistedForecastForItem(item.getName());
|
||||
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
public void removeItem(String itemName) {
|
||||
ScheduledCompletableFuture<?> job = forecastJobs.remove(itemName);
|
||||
if (job != null) {
|
||||
job.cancel(true);
|
||||
}
|
||||
}
|
||||
|
||||
private void restoreItemStateIfPossible(Item item) {
|
||||
QueryablePersistenceService queryService = (QueryablePersistenceService) persistenceService;
|
||||
|
||||
FilterCriteria filter = new FilterCriteria().setItemName(item.getName()).setEndDate(ZonedDateTime.now())
|
||||
.setPageSize(1);
|
||||
Iterable<HistoricItem> result = safeCaller.create(queryService, QueryablePersistenceService.class)
|
||||
.onTimeout(
|
||||
() -> logger.warn("Querying persistence service '{}' to restore '{}' takes more than {}ms.",
|
||||
queryService.getId(), item.getName(), SafeCaller.DEFAULT_TIMEOUT))
|
||||
.onException(e -> logger.error(
|
||||
"Exception occurred while querying persistence service '{}' to restore '{}': {}",
|
||||
queryService.getId(), item.getName(), e.getMessage(), e))
|
||||
.build().query(filter);
|
||||
if (result == null) {
|
||||
// in case of an exception or timeout, the safe caller returns null
|
||||
return;
|
||||
}
|
||||
Iterator<HistoricItem> it = result.iterator();
|
||||
if (it.hasNext()) {
|
||||
HistoricItem historicItem = it.next();
|
||||
GenericItem genericItem = (GenericItem) item;
|
||||
if (!UnDefType.NULL.equals(item.getState())) {
|
||||
// someone else already restored the state or a new state was set
|
||||
return;
|
||||
}
|
||||
genericItem.removeStateChangeListener(PersistenceManager.this);
|
||||
genericItem.setState(historicItem.getState());
|
||||
genericItem.addStateChangeListener(PersistenceManager.this);
|
||||
if (logger.isDebugEnabled()) {
|
||||
logger.debug("Restored item state from '{}' for item '{}' -> '{}'",
|
||||
DateTimeFormatter.ISO_ZONED_DATE_TIME.format(historicItem.getTimestamp()), item.getName(),
|
||||
historicItem.getState());
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
public void scheduleNextForecastForItem(String itemName, Instant time, State state) {
|
||||
ScheduledFuture<?> oldJob = forecastJobs.remove(itemName);
|
||||
if (oldJob != null) {
|
||||
oldJob.cancel(true);
|
||||
}
|
||||
forecastJobs.put(itemName, scheduler.at(() -> restoreItemState(itemName, state), time));
|
||||
logger.trace("Scheduled forecasted value for {} at {}", itemName, time);
|
||||
}
|
||||
|
||||
public void scheduleNextPersistedForecastForItem(String itemName) {
|
||||
Item item = itemRegistry.get(itemName);
|
||||
if (item instanceof GenericItem) {
|
||||
QueryablePersistenceService queryService = (QueryablePersistenceService) persistenceService;
|
||||
FilterCriteria filter = new FilterCriteria().setItemName(itemName).setBeginDate(ZonedDateTime.now())
|
||||
.setOrdering(ASCENDING);
|
||||
Iterator<HistoricItem> result = safeCaller.create(queryService, QueryablePersistenceService.class)
|
||||
.onTimeout(() -> logger.warn("Querying persistence service '{}' takes more than {}ms.",
|
||||
queryService.getId(), SafeCaller.DEFAULT_TIMEOUT))
|
||||
.onException(e -> logger.error("Exception occurred while querying persistence service '{}': {}",
|
||||
queryService.getId(), e.getMessage(), e))
|
||||
.build().query(filter).iterator();
|
||||
while (result.hasNext()) {
|
||||
HistoricItem next = result.next();
|
||||
if (next.getTimestamp().isAfter(ZonedDateTime.now())) {
|
||||
scheduleNextForecastForItem(itemName, next.getTimestamp().toInstant(), next.getState());
|
||||
break;
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
private void restoreItemState(String itemName, State state) {
|
||||
Item item = itemRegistry.get(itemName);
|
||||
if (item != null) {
|
||||
((GenericItem) item).setState(state);
|
||||
}
|
||||
scheduleNextPersistedForecastForItem(itemName);
|
||||
}
|
||||
|
||||
private void persistJob(List<PersistenceItemConfiguration> itemConfigs) {
|
||||
itemConfigs.forEach(itemConfig -> {
|
||||
for (Item item : getAllItems(itemConfig)) {
|
||||
|
@ -29,9 +29,12 @@ public class PersistenceStrategy {
|
||||
public static final PersistenceStrategy UPDATE = new PersistenceStrategy("everyUpdate");
|
||||
public static final PersistenceStrategy CHANGE = new PersistenceStrategy("everyChange");
|
||||
public static final PersistenceStrategy RESTORE = new PersistenceStrategy("restoreOnStartup");
|
||||
|
||||
public static final Map<String, PersistenceStrategy> STRATEGIES = Map.of(UPDATE.name, UPDATE, CHANGE.name,
|
||||
CHANGE, RESTORE.name, RESTORE);
|
||||
public static final PersistenceStrategy FORECAST = new PersistenceStrategy("forecast");
|
||||
public static final Map<String, PersistenceStrategy> STRATEGIES = Map.of( //
|
||||
UPDATE.name, UPDATE, //
|
||||
CHANGE.name, CHANGE, //
|
||||
RESTORE.name, RESTORE, //
|
||||
FORECAST.name, FORECAST);
|
||||
}
|
||||
|
||||
private final String name;
|
||||
|
@ -14,17 +14,20 @@ package org.openhab.core.persistence.internal;
|
||||
|
||||
import static org.hamcrest.MatcherAssert.assertThat;
|
||||
import static org.hamcrest.Matchers.is;
|
||||
import static org.mockito.ArgumentMatchers.any;
|
||||
import static org.mockito.Mockito.clearInvocations;
|
||||
import static org.mockito.Mockito.timeout;
|
||||
import static org.mockito.Mockito.times;
|
||||
import static org.mockito.Mockito.verify;
|
||||
import static org.mockito.Mockito.verifyNoMoreInteractions;
|
||||
import static org.mockito.Mockito.when;
|
||||
import static org.mockito.ArgumentMatchers.*;
|
||||
import static org.mockito.Mockito.*;
|
||||
|
||||
import java.math.BigDecimal;
|
||||
import java.time.Instant;
|
||||
import java.time.ZoneId;
|
||||
import java.time.ZonedDateTime;
|
||||
import java.util.ArrayList;
|
||||
import java.util.HashMap;
|
||||
import java.util.List;
|
||||
import java.util.Locale;
|
||||
import java.util.Map;
|
||||
import java.util.Objects;
|
||||
import java.util.Set;
|
||||
|
||||
import org.eclipse.jdt.annotation.NonNullByDefault;
|
||||
import org.eclipse.jdt.annotation.Nullable;
|
||||
@ -32,6 +35,7 @@ import org.junit.jupiter.api.BeforeEach;
|
||||
import org.junit.jupiter.api.Test;
|
||||
import org.junit.jupiter.api.extension.ExtendWith;
|
||||
import org.mockito.ArgumentCaptor;
|
||||
import org.mockito.InOrder;
|
||||
import org.mockito.Mock;
|
||||
import org.mockito.junit.jupiter.MockitoExtension;
|
||||
import org.mockito.junit.jupiter.MockitoSettings;
|
||||
@ -39,14 +43,18 @@ import org.mockito.quality.Strictness;
|
||||
import org.openhab.core.common.SafeCaller;
|
||||
import org.openhab.core.common.SafeCallerBuilder;
|
||||
import org.openhab.core.items.GroupItem;
|
||||
import org.openhab.core.items.Item;
|
||||
import org.openhab.core.items.ItemNotFoundException;
|
||||
import org.openhab.core.items.ItemRegistry;
|
||||
import org.openhab.core.library.items.NumberItem;
|
||||
import org.openhab.core.library.items.StringItem;
|
||||
import org.openhab.core.library.types.DecimalType;
|
||||
import org.openhab.core.library.types.StringType;
|
||||
import org.openhab.core.persistence.FilterCriteria;
|
||||
import org.openhab.core.persistence.HistoricItem;
|
||||
import org.openhab.core.persistence.ModifiablePersistenceService;
|
||||
import org.openhab.core.persistence.PersistenceItemConfiguration;
|
||||
import org.openhab.core.persistence.PersistenceItemInfo;
|
||||
import org.openhab.core.persistence.PersistenceService;
|
||||
import org.openhab.core.persistence.QueryablePersistenceService;
|
||||
import org.openhab.core.persistence.config.PersistenceAllConfig;
|
||||
@ -61,10 +69,12 @@ import org.openhab.core.persistence.strategy.PersistenceCronStrategy;
|
||||
import org.openhab.core.persistence.strategy.PersistenceStrategy;
|
||||
import org.openhab.core.scheduler.CronScheduler;
|
||||
import org.openhab.core.scheduler.ScheduledCompletableFuture;
|
||||
import org.openhab.core.scheduler.Scheduler;
|
||||
import org.openhab.core.scheduler.SchedulerRunnable;
|
||||
import org.openhab.core.service.ReadyMarker;
|
||||
import org.openhab.core.service.ReadyService;
|
||||
import org.openhab.core.types.State;
|
||||
import org.openhab.core.types.TimeSeries;
|
||||
import org.openhab.core.types.UnDefType;
|
||||
|
||||
/**
|
||||
@ -108,7 +118,10 @@ public class PersistenceManagerTest {
|
||||
private static final String TEST_PERSISTENCE_SERVICE_ID = "testPersistenceService";
|
||||
private static final String TEST_QUERYABLE_PERSISTENCE_SERVICE_ID = "testQueryablePersistenceService";
|
||||
|
||||
private static final String TEST_MODIFIABLE_PERSISTENCE_SERVICE_ID = "testModifiablePersistenceService";
|
||||
|
||||
private @NonNullByDefault({}) @Mock CronScheduler cronSchedulerMock;
|
||||
private @NonNullByDefault({}) @Mock Scheduler schedulerMock;
|
||||
private @NonNullByDefault({}) @Mock ScheduledCompletableFuture<Void> scheduledFutureMock;
|
||||
private @NonNullByDefault({}) @Mock ItemRegistry itemRegistryMock;
|
||||
private @NonNullByDefault({}) @Mock SafeCaller safeCallerMock;
|
||||
@ -118,6 +131,7 @@ public class PersistenceManagerTest {
|
||||
|
||||
private @NonNullByDefault({}) @Mock PersistenceService persistenceServiceMock;
|
||||
private @NonNullByDefault({}) @Mock QueryablePersistenceService queryablePersistenceServiceMock;
|
||||
private @NonNullByDefault({}) @Mock ModifiablePersistenceService modifiablePersistenceServiceMock;
|
||||
|
||||
private @NonNullByDefault({}) PersistenceManager manager;
|
||||
|
||||
@ -139,13 +153,15 @@ public class PersistenceManagerTest {
|
||||
when(persistenceServiceMock.getId()).thenReturn(TEST_PERSISTENCE_SERVICE_ID);
|
||||
when(queryablePersistenceServiceMock.getId()).thenReturn(TEST_QUERYABLE_PERSISTENCE_SERVICE_ID);
|
||||
when(queryablePersistenceServiceMock.query(any())).thenReturn(List.of(TEST_HISTORIC_ITEM));
|
||||
when(modifiablePersistenceServiceMock.getId()).thenReturn(TEST_MODIFIABLE_PERSISTENCE_SERVICE_ID);
|
||||
|
||||
manager = new PersistenceManager(cronSchedulerMock, itemRegistryMock, safeCallerMock, readyServiceMock,
|
||||
persistenceServiceConfigurationRegistryMock);
|
||||
manager = new PersistenceManager(cronSchedulerMock, schedulerMock, itemRegistryMock, safeCallerMock,
|
||||
readyServiceMock, persistenceServiceConfigurationRegistryMock);
|
||||
manager.addPersistenceService(persistenceServiceMock);
|
||||
manager.addPersistenceService(queryablePersistenceServiceMock);
|
||||
manager.addPersistenceService(modifiablePersistenceServiceMock);
|
||||
|
||||
clearInvocations(persistenceServiceMock, queryablePersistenceServiceMock);
|
||||
clearInvocations(persistenceServiceMock, queryablePersistenceServiceMock, modifiablePersistenceServiceMock);
|
||||
}
|
||||
|
||||
@Test
|
||||
@ -299,6 +315,82 @@ public class PersistenceManagerTest {
|
||||
verifyNoMoreInteractions(persistenceServiceMock);
|
||||
}
|
||||
|
||||
@Test
|
||||
public void storeTimeSeriesAndForecastsScheduled() {
|
||||
List<ScheduledCompletableFuture<?>> futures = new ArrayList<>();
|
||||
TestModifiablePersistenceService service = spy(new TestModifiablePersistenceService());
|
||||
manager.addPersistenceService(service);
|
||||
|
||||
when(schedulerMock.at(any(SchedulerRunnable.class), any(Instant.class))).thenAnswer(i -> {
|
||||
ScheduledCompletableFuture<?> future = mock(ScheduledCompletableFuture.class);
|
||||
when(future.getScheduledTime()).thenReturn(((Instant) i.getArgument(1)).atZone(ZoneId.systemDefault()));
|
||||
futures.add(future);
|
||||
return future;
|
||||
});
|
||||
|
||||
addConfiguration(TestModifiablePersistenceService.ID, new PersistenceAllConfig(),
|
||||
PersistenceStrategy.Globals.FORECAST, null);
|
||||
|
||||
Instant time1 = Instant.now().minusSeconds(1000);
|
||||
Instant time2 = Instant.now().plusSeconds(1000);
|
||||
Instant time3 = Instant.now().plusSeconds(2000);
|
||||
Instant time4 = Instant.now().plusSeconds(3000);
|
||||
|
||||
// add elements
|
||||
TimeSeries timeSeries = new TimeSeries(TimeSeries.Policy.ADD);
|
||||
timeSeries.add(time1, new StringType("one"));
|
||||
timeSeries.add(time2, new StringType("two"));
|
||||
timeSeries.add(time3, new StringType("three"));
|
||||
timeSeries.add(time4, new StringType("four"));
|
||||
|
||||
manager.timeSeriesUpdated(TEST_ITEM, timeSeries);
|
||||
InOrder inOrder = inOrder(service, schedulerMock);
|
||||
|
||||
// verify elements are stored
|
||||
timeSeries.getStates().forEach(entry -> inOrder.verify(service).store(any(Item.class),
|
||||
eq(entry.timestamp().atZone(ZoneId.systemDefault())), eq(entry.state())));
|
||||
|
||||
// first element not scheduled, because it is in the past, check if second is scheduled
|
||||
inOrder.verify(schedulerMock).at(any(SchedulerRunnable.class), eq(time2));
|
||||
inOrder.verifyNoMoreInteractions();
|
||||
|
||||
// replace elements
|
||||
TimeSeries timeSeries2 = new TimeSeries(TimeSeries.Policy.REPLACE);
|
||||
timeSeries2.add(time3, new StringType("three2"));
|
||||
timeSeries2.add(time4, new StringType("four2"));
|
||||
|
||||
manager.timeSeriesUpdated(TEST_ITEM, timeSeries2);
|
||||
|
||||
// verify removal of old elements from service
|
||||
ArgumentCaptor<FilterCriteria> filterCaptor = ArgumentCaptor.forClass(FilterCriteria.class);
|
||||
inOrder.verify(service).remove(filterCaptor.capture());
|
||||
FilterCriteria filterCriteria = filterCaptor.getValue();
|
||||
assertThat(filterCriteria.getItemName(), is(TEST_ITEM_NAME));
|
||||
assertThat(filterCriteria.getBeginDate(), is(time3.atZone(ZoneId.systemDefault())));
|
||||
assertThat(filterCriteria.getEndDate(), is(time4.atZone(ZoneId.systemDefault())));
|
||||
|
||||
// verify restore future is not cancelled
|
||||
verify(futures.get(0), never()).cancel(anyBoolean());
|
||||
|
||||
// verify new values are stored
|
||||
inOrder.verify(service, times(2)).store(any(Item.class), any(ZonedDateTime.class), any(State.class));
|
||||
inOrder.verifyNoMoreInteractions();
|
||||
|
||||
// try adding a new element in front and check it is correctly scheduled
|
||||
Instant time5 = Instant.now().plusSeconds(500);
|
||||
// add elements
|
||||
TimeSeries timeSeries3 = new TimeSeries(TimeSeries.Policy.ADD);
|
||||
timeSeries3.add(time5, new StringType("five"));
|
||||
|
||||
manager.timeSeriesUpdated(TEST_ITEM, timeSeries3);
|
||||
// verify old restore future is cancelled
|
||||
inOrder.verify(service, times(1)).store(any(Item.class), any(ZonedDateTime.class), any(State.class));
|
||||
verify(futures.get(0)).cancel(true);
|
||||
|
||||
// verify new restore future is properly created
|
||||
inOrder.verify(schedulerMock).at(any(SchedulerRunnable.class), eq(time5));
|
||||
}
|
||||
|
||||
@Test
|
||||
public void cronStrategyIsScheduledAndCancelledAndPersistsValue() throws Exception {
|
||||
ArgumentCaptor<SchedulerRunnable> runnableCaptor = ArgumentCaptor.forClass(SchedulerRunnable.class);
|
||||
@ -402,4 +494,85 @@ public class PersistenceManagerTest {
|
||||
|
||||
return serviceConfiguration;
|
||||
}
|
||||
|
||||
private static class TestModifiablePersistenceService implements ModifiablePersistenceService {
|
||||
public static final String ID = "TMPS";
|
||||
private final Map<ZonedDateTime, State> states = new HashMap<>();
|
||||
|
||||
@Override
|
||||
public void store(Item item, ZonedDateTime date, State state) {
|
||||
states.put(date, state);
|
||||
}
|
||||
|
||||
@Override
|
||||
public void store(Item item, ZonedDateTime date, State state, @Nullable String alias) {
|
||||
store(item, date, state);
|
||||
}
|
||||
|
||||
@Override
|
||||
public boolean remove(FilterCriteria filter) throws IllegalArgumentException {
|
||||
ZonedDateTime begin = Objects.requireNonNull(filter.getBeginDate());
|
||||
ZonedDateTime end = Objects.requireNonNull(filter.getEndDate());
|
||||
List<ZonedDateTime> keys = states.keySet().stream().filter(t -> t.isAfter(begin) && t.isBefore(end))
|
||||
.toList();
|
||||
keys.forEach(states::remove);
|
||||
return !keys.isEmpty();
|
||||
}
|
||||
|
||||
@Override
|
||||
public String getId() {
|
||||
return ID;
|
||||
}
|
||||
|
||||
@Override
|
||||
public String getLabel(@Nullable Locale locale) {
|
||||
return ID;
|
||||
}
|
||||
|
||||
@Override
|
||||
public void store(Item item) {
|
||||
throw new UnsupportedOperationException();
|
||||
}
|
||||
|
||||
@Override
|
||||
public void store(Item item, @Nullable String alias) {
|
||||
throw new UnsupportedOperationException();
|
||||
}
|
||||
|
||||
@Override
|
||||
public List<PersistenceStrategy> getDefaultStrategies() {
|
||||
return List.of();
|
||||
}
|
||||
|
||||
@Override
|
||||
@SuppressWarnings("unchecked")
|
||||
public Iterable<HistoricItem> query(FilterCriteria filter) {
|
||||
ZonedDateTime begin = Objects.requireNonNull(filter.getBeginDate());
|
||||
ZonedDateTime end = Objects.requireNonNull(filter.getEndDate());
|
||||
List<ZonedDateTime> keys = states.keySet().stream().filter(t -> t.isAfter(begin) && t.isBefore(end))
|
||||
.toList();
|
||||
return (Iterable<HistoricItem>) states.entrySet().stream().filter(e -> keys.contains(e.getKey()))
|
||||
.map(e -> new HistoricItem() {
|
||||
@Override
|
||||
public ZonedDateTime getTimestamp() {
|
||||
return e.getKey();
|
||||
}
|
||||
|
||||
@Override
|
||||
public State getState() {
|
||||
return e.getValue();
|
||||
}
|
||||
|
||||
@Override
|
||||
public String getName() {
|
||||
return "item";
|
||||
}
|
||||
}).iterator();
|
||||
}
|
||||
|
||||
@Override
|
||||
public Set<PersistenceItemInfo> getItemInfo() {
|
||||
return Set.of();
|
||||
}
|
||||
}
|
||||
}
|
||||
|
@ -48,6 +48,7 @@ public class MagicBindingConstants {
|
||||
public static final ThingTypeUID THING_TYPE_DYNAMIC_STATE_DESCRIPTION = new ThingTypeUID(BINDING_ID,
|
||||
"dynamic-state-description");
|
||||
public static final ThingTypeUID THING_TYPE_ONLINE_OFFLINE = new ThingTypeUID(BINDING_ID, "online-offline");
|
||||
public static final ThingTypeUID THING_TYPE_TIMESERIES = new ThingTypeUID(BINDING_ID, "timeseries");
|
||||
|
||||
// bridged things
|
||||
public static final ThingTypeUID THING_TYPE_BRIDGE_1 = new ThingTypeUID(BINDING_ID, "magic-bridge1");
|
||||
@ -67,7 +68,7 @@ public class MagicBindingConstants {
|
||||
public static final String CHANNEL_BATTERY_LEVEL = "battery-level";
|
||||
public static final String CHANNEL_SYSTEM_COMMAND = "systemcommand";
|
||||
public static final String CHANNEL_SIGNAL_STRENGTH = "signal-strength";
|
||||
|
||||
public static final String CHANNEL_FORECAST = "forecast";
|
||||
// Firmware update needed models
|
||||
public static final String UPDATE_MODEL_PROPERTY = "updateModel";
|
||||
|
||||
|
@ -0,0 +1,111 @@
|
||||
/**
|
||||
* Copyright (c) 2010-2023 Contributors to the openHAB project
|
||||
*
|
||||
* See the NOTICE file(s) distributed with this work for additional
|
||||
* information.
|
||||
*
|
||||
* This program and the accompanying materials are made available under the
|
||||
* terms of the Eclipse Public License 2.0 which is available at
|
||||
* http://www.eclipse.org/legal/epl-2.0
|
||||
*
|
||||
* SPDX-License-Identifier: EPL-2.0
|
||||
*/
|
||||
package org.openhab.core.magic.binding.handler;
|
||||
|
||||
import static org.openhab.core.magic.binding.MagicBindingConstants.CHANNEL_FORECAST;
|
||||
import static org.openhab.core.types.TimeSeries.Policy.ADD;
|
||||
|
||||
import java.time.Duration;
|
||||
import java.time.Instant;
|
||||
import java.util.concurrent.ScheduledFuture;
|
||||
import java.util.concurrent.TimeUnit;
|
||||
|
||||
import org.eclipse.jdt.annotation.NonNullByDefault;
|
||||
import org.eclipse.jdt.annotation.Nullable;
|
||||
import org.openhab.core.library.types.DecimalType;
|
||||
import org.openhab.core.thing.ChannelUID;
|
||||
import org.openhab.core.thing.Thing;
|
||||
import org.openhab.core.thing.ThingStatus;
|
||||
import org.openhab.core.thing.binding.BaseThingHandler;
|
||||
import org.openhab.core.types.Command;
|
||||
import org.openhab.core.types.TimeSeries;
|
||||
|
||||
/**
|
||||
* The {@link MagicTimeSeriesHandler} is capable of providing a series of different forecasts
|
||||
*
|
||||
* @author Jan N. Klug - Initial contribution
|
||||
*/
|
||||
@NonNullByDefault
|
||||
public class MagicTimeSeriesHandler extends BaseThingHandler {
|
||||
|
||||
private @Nullable ScheduledFuture<?> scheduledJob;
|
||||
private Configuration configuration = new Configuration();
|
||||
|
||||
public MagicTimeSeriesHandler(Thing thing) {
|
||||
super(thing);
|
||||
}
|
||||
|
||||
@Override
|
||||
public void handleCommand(ChannelUID channelUID, Command command) {
|
||||
// no-op
|
||||
}
|
||||
|
||||
@Override
|
||||
public void initialize() {
|
||||
configuration = getConfigAs(Configuration.class);
|
||||
startScheduledJob();
|
||||
|
||||
updateStatus(ThingStatus.ONLINE);
|
||||
}
|
||||
|
||||
@Override
|
||||
public void dispose() {
|
||||
stopScheduledJob();
|
||||
}
|
||||
|
||||
private void startScheduledJob() {
|
||||
ScheduledFuture<?> localScheduledJob = scheduledJob;
|
||||
if (localScheduledJob == null || localScheduledJob.isCancelled()) {
|
||||
scheduledJob = scheduler.scheduleWithFixedDelay(() -> {
|
||||
Instant now = Instant.now();
|
||||
TimeSeries timeSeries = new TimeSeries(ADD);
|
||||
Duration stepSize = Duration.ofSeconds(configuration.interval / configuration.count);
|
||||
double range = configuration.max - configuration.min;
|
||||
for (int i = 1; i <= configuration.count; i++) {
|
||||
double value = switch (configuration.type) {
|
||||
case RND -> Math.random() * range + configuration.min;
|
||||
case ASC -> (range / configuration.count) * i + configuration.min;
|
||||
case DESC -> configuration.max + (range / configuration.count) * i;
|
||||
};
|
||||
timeSeries.add(now.plus(stepSize.multipliedBy(i)), new DecimalType(value));
|
||||
}
|
||||
sendTimeSeries(CHANNEL_FORECAST, timeSeries);
|
||||
}, 0, configuration.interval, TimeUnit.SECONDS);
|
||||
}
|
||||
}
|
||||
|
||||
private void stopScheduledJob() {
|
||||
ScheduledFuture<?> localScheduledJob = scheduledJob;
|
||||
if (localScheduledJob != null && !localScheduledJob.isCancelled()) {
|
||||
localScheduledJob.cancel(true);
|
||||
scheduledJob = null;
|
||||
}
|
||||
}
|
||||
|
||||
public static class Configuration {
|
||||
public int interval = 600;
|
||||
public Type type = Type.RND;
|
||||
public double min = 0.0;
|
||||
public double max = 100.0;
|
||||
public int count = 10;
|
||||
|
||||
public Configuration() {
|
||||
}
|
||||
}
|
||||
|
||||
public enum Type {
|
||||
RND,
|
||||
ASC,
|
||||
DESC
|
||||
}
|
||||
}
|
@ -38,6 +38,7 @@ import org.openhab.core.magic.binding.handler.MagicOnlineOfflineHandler;
|
||||
import org.openhab.core.magic.binding.handler.MagicPlayerHandler;
|
||||
import org.openhab.core.magic.binding.handler.MagicRollershutterHandler;
|
||||
import org.openhab.core.magic.binding.handler.MagicThermostatThingHandler;
|
||||
import org.openhab.core.magic.binding.handler.MagicTimeSeriesHandler;
|
||||
import org.openhab.core.thing.Bridge;
|
||||
import org.openhab.core.thing.Thing;
|
||||
import org.openhab.core.thing.ThingTypeUID;
|
||||
@ -62,8 +63,8 @@ public class MagicHandlerFactory extends BaseThingHandlerFactory {
|
||||
THING_TYPE_CONTACT_SENSOR, THING_TYPE_CONFIG_THING, THING_TYPE_DELAYED_THING, THING_TYPE_LOCATION,
|
||||
THING_TYPE_THERMOSTAT, THING_TYPE_FIRMWARE_UPDATE, THING_TYPE_BRIDGE_1, THING_TYPE_BRIDGE_2,
|
||||
THING_TYPE_BRIDGED_THING, THING_TYPE_CHATTY_THING, THING_TYPE_ROLLERSHUTTER, THING_TYPE_PLAYER,
|
||||
THING_TYPE_IMAGE, THING_TYPE_ACTION_MODULE, THING_TYPE_DYNAMIC_STATE_DESCRIPTION,
|
||||
THING_TYPE_ONLINE_OFFLINE);
|
||||
THING_TYPE_IMAGE, THING_TYPE_ACTION_MODULE, THING_TYPE_DYNAMIC_STATE_DESCRIPTION, THING_TYPE_ONLINE_OFFLINE,
|
||||
THING_TYPE_TIMESERIES);
|
||||
|
||||
private final MagicDynamicCommandDescriptionProvider commandDescriptionProvider;
|
||||
private final MagicDynamicStateDescriptionProvider stateDescriptionProvider;
|
||||
@ -125,6 +126,8 @@ public class MagicHandlerFactory extends BaseThingHandlerFactory {
|
||||
return new MagicOnlineOfflineHandler(thing);
|
||||
} else if (THING_TYPE_BRIDGE_1.equals(thingTypeUID) || THING_TYPE_BRIDGE_2.equals(thingTypeUID)) {
|
||||
return new MagicBridgeHandler((Bridge) thing);
|
||||
} else if (THING_TYPE_TIMESERIES.equals(thingTypeUID)) {
|
||||
return new MagicTimeSeriesHandler(thing);
|
||||
}
|
||||
|
||||
return null;
|
||||
|
@ -160,4 +160,8 @@
|
||||
<category>time</category>
|
||||
</channel-type>
|
||||
|
||||
<channel-type id="forecast">
|
||||
<item-type>Number</item-type>
|
||||
<label>Forecast</label>
|
||||
</channel-type>
|
||||
</thing:thing-descriptions>
|
||||
|
@ -234,4 +234,44 @@
|
||||
</parameter>
|
||||
</config-description>
|
||||
</thing-type>
|
||||
|
||||
<thing-type id="timeseries">
|
||||
<label>Magic Timeseries Updates Thing</label>
|
||||
<description>Demonstrates the use of TimeSeries as forecast.</description>
|
||||
<channels>
|
||||
<channel id="forecast" typeId="forecast"/>
|
||||
</channels>
|
||||
<config-description>
|
||||
<parameter name="interval" type="integer" min="60" unit="s">
|
||||
<label>Interval</label>
|
||||
<description>The interval to send the generated data.</description>
|
||||
<default>600</default>
|
||||
</parameter>
|
||||
<parameter name="type" type="text">
|
||||
<label>Type</label>
|
||||
<description>How to generate the values.</description>
|
||||
<options>
|
||||
<option value="RND">Random</option>
|
||||
<option value="ASC">Ascending</option>
|
||||
<option value="DESC">Descending</option>
|
||||
</options>
|
||||
<default>RND</default>
|
||||
</parameter>
|
||||
<parameter name="min" type="decimal">
|
||||
<label>Minimum</label>
|
||||
<description>The minimum value.</description>
|
||||
<default>0</default>
|
||||
</parameter>
|
||||
<parameter name="max" type="decimal">
|
||||
<label>Maximum</label>
|
||||
<description>The maximum value.</description>
|
||||
<default>100</default>
|
||||
</parameter>
|
||||
<parameter name="count" type="integer" min="0">
|
||||
<label>Value Count</label>
|
||||
<description>The number of values to generate.</description>
|
||||
<default>10</default>
|
||||
</parameter>
|
||||
</config-description>
|
||||
</thing-type>
|
||||
</thing:thing-descriptions>
|
||||
|
@ -41,6 +41,7 @@ import org.openhab.core.thing.util.ThingHandlerHelper;
|
||||
import org.openhab.core.types.Command;
|
||||
import org.openhab.core.types.RefreshType;
|
||||
import org.openhab.core.types.State;
|
||||
import org.openhab.core.types.TimeSeries;
|
||||
import org.slf4j.Logger;
|
||||
import org.slf4j.LoggerFactory;
|
||||
|
||||
@ -60,6 +61,7 @@ import org.slf4j.LoggerFactory;
|
||||
* @author Stefan Bußweiler - Added new thing status handling, refactorings thing/bridge life cycle
|
||||
* @author Kai Kreuzer - Refactored isLinked method to not use deprecated functions anymore
|
||||
* @author Christoph Weitkamp - Moved OSGI ServiceTracker from BaseThingHandler to ThingHandlerCallback
|
||||
* @author Jan N. Klug - added time series support
|
||||
*/
|
||||
@NonNullByDefault
|
||||
public abstract class BaseThingHandler implements ThingHandler {
|
||||
@ -287,6 +289,36 @@ public abstract class BaseThingHandler implements ThingHandler {
|
||||
updateState(channelUID, state);
|
||||
}
|
||||
|
||||
/**
|
||||
* Send a time series to the channel. This can be used to transfer historic data or forecasts.
|
||||
*
|
||||
* @param channelUID unique id of the channel
|
||||
* @param timeSeries the {@link TimeSeries} that is sent
|
||||
*/
|
||||
protected void sendTimeSeries(ChannelUID channelUID, TimeSeries timeSeries) {
|
||||
synchronized (this) {
|
||||
ThingHandlerCallback callback1 = this.callback;
|
||||
if (callback1 != null) {
|
||||
callback1.sendTimeSeries(channelUID, timeSeries);
|
||||
} else {
|
||||
logger.warn(
|
||||
"Handler {} of thing {} tried sending to channel {} although the handler was already disposed.",
|
||||
this.getClass().getSimpleName(), channelUID.getThingUID(), channelUID.getId());
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* Send a time series to the channel. This can be used to transfer historic data or forecasts.
|
||||
*
|
||||
* @param channelID id of the channel
|
||||
* @param timeSeries the {@link TimeSeries} that is sent
|
||||
*/
|
||||
protected void sendTimeSeries(String channelID, TimeSeries timeSeries) {
|
||||
ChannelUID channelUID = new ChannelUID(this.getThing().getUID(), channelID);
|
||||
sendTimeSeries(channelUID, timeSeries);
|
||||
}
|
||||
|
||||
/**
|
||||
* Emits an event for the given channel.
|
||||
*
|
||||
|
@ -35,6 +35,7 @@ import org.openhab.core.thing.type.ChannelType;
|
||||
import org.openhab.core.thing.type.ChannelTypeUID;
|
||||
import org.openhab.core.types.Command;
|
||||
import org.openhab.core.types.State;
|
||||
import org.openhab.core.types.TimeSeries;
|
||||
|
||||
/**
|
||||
* {@link ThingHandlerCallback} is callback interface for {@link ThingHandler}s. The implementation of a
|
||||
@ -65,6 +66,14 @@ public interface ThingHandlerCallback {
|
||||
*/
|
||||
void postCommand(ChannelUID channelUID, Command command);
|
||||
|
||||
/**
|
||||
* Informs about a time series, whcihs is send from the channel.
|
||||
*
|
||||
* @param channelUID channel UID
|
||||
* @param timeSeries time series
|
||||
*/
|
||||
void sendTimeSeries(ChannelUID channelUID, TimeSeries timeSeries);
|
||||
|
||||
/**
|
||||
* Informs about an updated status of a thing.
|
||||
*
|
||||
|
@ -71,9 +71,11 @@ import org.openhab.core.thing.profiles.ProfileContext;
|
||||
import org.openhab.core.thing.profiles.ProfileFactory;
|
||||
import org.openhab.core.thing.profiles.ProfileTypeUID;
|
||||
import org.openhab.core.thing.profiles.StateProfile;
|
||||
import org.openhab.core.thing.profiles.TimeSeriesProfile;
|
||||
import org.openhab.core.thing.profiles.TriggerProfile;
|
||||
import org.openhab.core.types.Command;
|
||||
import org.openhab.core.types.State;
|
||||
import org.openhab.core.types.TimeSeries;
|
||||
import org.openhab.core.types.Type;
|
||||
import org.osgi.service.component.annotations.Activate;
|
||||
import org.osgi.service.component.annotations.Component;
|
||||
@ -90,6 +92,7 @@ import org.slf4j.LoggerFactory;
|
||||
* It mainly mediates commands, state updates and triggers from ThingHandlers to the framework and vice versa.
|
||||
*
|
||||
* @author Simon Kaufmann - Initial contribution factored out of ThingManger
|
||||
* @author Jan N. Klug - Added time series support
|
||||
*/
|
||||
@NonNullByDefault
|
||||
@Component(service = { EventSubscriber.class, CommunicationManager.class }, immediate = true)
|
||||
@ -520,6 +523,20 @@ public class CommunicationManager implements EventSubscriber, RegistryChangeList
|
||||
});
|
||||
}
|
||||
|
||||
public void sendTimeSeries(ChannelUID channelUID, TimeSeries timeSeries) {
|
||||
ThingUID thingUID = channelUID.getThingUID();
|
||||
Thing thing = thingRegistry.get(thingUID);
|
||||
handleCallFromHandler(channelUID, thing, profile -> {
|
||||
// TODO: check which profiles need enhancements
|
||||
if (profile instanceof TimeSeriesProfile timeSeriesProfile) {
|
||||
timeSeriesProfile.onTimeSeriesFromHandler(timeSeries);
|
||||
} else {
|
||||
logger.warn("Profile '{}' on channel {} does not support time series.", profile.getProfileTypeUID(),
|
||||
channelUID);
|
||||
}
|
||||
});
|
||||
}
|
||||
|
||||
private void handleCallFromHandler(ChannelUID channelUID, @Nullable Thing thing, Consumer<Profile> action) {
|
||||
itemChannelLinkRegistry.getLinks(channelUID).forEach(link -> {
|
||||
final Item item = getItem(link.getItemName());
|
||||
|
@ -42,6 +42,7 @@ import org.openhab.core.thing.type.ThingType;
|
||||
import org.openhab.core.thing.util.ThingHandlerHelper;
|
||||
import org.openhab.core.types.Command;
|
||||
import org.openhab.core.types.State;
|
||||
import org.openhab.core.types.TimeSeries;
|
||||
import org.slf4j.Logger;
|
||||
import org.slf4j.LoggerFactory;
|
||||
|
||||
@ -70,6 +71,11 @@ class ThingHandlerCallbackImpl implements ThingHandlerCallback {
|
||||
thingManager.communicationManager.postCommand(channelUID, command);
|
||||
}
|
||||
|
||||
@Override
|
||||
public void sendTimeSeries(ChannelUID channelUID, TimeSeries timeSeries) {
|
||||
thingManager.communicationManager.sendTimeSeries(channelUID, timeSeries);
|
||||
}
|
||||
|
||||
@Override
|
||||
public void channelTriggered(Thing thing, ChannelUID channelUID, String event) {
|
||||
thingManager.communicationManager.channelTriggered(thing, channelUID, event);
|
||||
|
@ -33,6 +33,7 @@ import org.openhab.core.thing.profiles.ProfileCallback;
|
||||
import org.openhab.core.thing.util.ThingHandlerHelper;
|
||||
import org.openhab.core.types.Command;
|
||||
import org.openhab.core.types.State;
|
||||
import org.openhab.core.types.TimeSeries;
|
||||
import org.openhab.core.types.TypeParser;
|
||||
import org.slf4j.Logger;
|
||||
import org.slf4j.LoggerFactory;
|
||||
@ -145,6 +146,19 @@ public class ProfileCallbackImpl implements ProfileCallback {
|
||||
ItemEventFactory.createStateEvent(link.getItemName(), acceptedState, link.getLinkedUID().toString()));
|
||||
}
|
||||
|
||||
@Override
|
||||
public void sendTimeSeries(TimeSeries timeSeries) {
|
||||
Item item = itemProvider.apply(link.getItemName());
|
||||
if (item == null) {
|
||||
logger.warn("Cannot send time series event '{}' for item '{}', because no item could be found.", timeSeries,
|
||||
link.getItemName());
|
||||
return;
|
||||
}
|
||||
|
||||
eventPublisher.post(
|
||||
ItemEventFactory.createTimeSeriesEvent(link.getItemName(), timeSeries, link.getLinkedUID().toString()));
|
||||
}
|
||||
|
||||
@FunctionalInterface
|
||||
public interface AcceptedTypeConverter {
|
||||
@Nullable
|
||||
|
@ -16,10 +16,11 @@ import org.eclipse.jdt.annotation.NonNullByDefault;
|
||||
import org.openhab.core.thing.binding.ThingHandler;
|
||||
import org.openhab.core.thing.profiles.ProfileCallback;
|
||||
import org.openhab.core.thing.profiles.ProfileTypeUID;
|
||||
import org.openhab.core.thing.profiles.StateProfile;
|
||||
import org.openhab.core.thing.profiles.SystemProfiles;
|
||||
import org.openhab.core.thing.profiles.TimeSeriesProfile;
|
||||
import org.openhab.core.types.Command;
|
||||
import org.openhab.core.types.State;
|
||||
import org.openhab.core.types.TimeSeries;
|
||||
|
||||
/**
|
||||
* This is the default profile for stateful channels.
|
||||
@ -30,7 +31,7 @@ import org.openhab.core.types.State;
|
||||
* @author Simon Kaufmann - Initial contribution
|
||||
*/
|
||||
@NonNullByDefault
|
||||
public class SystemDefaultProfile implements StateProfile {
|
||||
public class SystemDefaultProfile implements TimeSeriesProfile {
|
||||
|
||||
private final ProfileCallback callback;
|
||||
|
||||
@ -58,6 +59,11 @@ public class SystemDefaultProfile implements StateProfile {
|
||||
callback.sendCommand(command);
|
||||
}
|
||||
|
||||
@Override
|
||||
public void onTimeSeriesFromHandler(TimeSeries timeSeries) {
|
||||
callback.sendTimeSeries(timeSeries);
|
||||
}
|
||||
|
||||
@Override
|
||||
public void onStateUpdateFromItem(State state) {
|
||||
}
|
||||
|
@ -16,6 +16,7 @@ import org.eclipse.jdt.annotation.NonNullByDefault;
|
||||
import org.openhab.core.thing.link.ItemChannelLink;
|
||||
import org.openhab.core.types.Command;
|
||||
import org.openhab.core.types.State;
|
||||
import org.openhab.core.types.TimeSeries;
|
||||
|
||||
/**
|
||||
* Gives access to the framework features for continuing the communication flow.
|
||||
@ -52,4 +53,11 @@ public interface ProfileCallback {
|
||||
* @param state
|
||||
*/
|
||||
void sendUpdate(State state);
|
||||
|
||||
/**
|
||||
* Send a {@link TimeSeries} update to the framework.
|
||||
*
|
||||
* @param timeSeries
|
||||
*/
|
||||
void sendTimeSeries(TimeSeries timeSeries);
|
||||
}
|
||||
|
@ -0,0 +1,32 @@
|
||||
/**
|
||||
* Copyright (c) 2010-2023 Contributors to the openHAB project
|
||||
*
|
||||
* See the NOTICE file(s) distributed with this work for additional
|
||||
* information.
|
||||
*
|
||||
* This program and the accompanying materials are made available under the
|
||||
* terms of the Eclipse Public License 2.0 which is available at
|
||||
* http://www.eclipse.org/legal/epl-2.0
|
||||
*
|
||||
* SPDX-License-Identifier: EPL-2.0
|
||||
*/
|
||||
package org.openhab.core.thing.profiles;
|
||||
|
||||
import org.eclipse.jdt.annotation.NonNullByDefault;
|
||||
import org.openhab.core.types.TimeSeries;
|
||||
|
||||
/**
|
||||
* The {@link TimeSeriesProfile} extends the {@link StateProfile} to support {@link TimeSeries} updates
|
||||
*
|
||||
* @author Jan N. Klug - Initial contribution
|
||||
*/
|
||||
@NonNullByDefault
|
||||
public interface TimeSeriesProfile extends StateProfile {
|
||||
|
||||
/**
|
||||
* If a binding sends a time-series to a channel, this method will be called for each linked item.
|
||||
*
|
||||
* @param timeSeries the time-series
|
||||
*/
|
||||
void onTimeSeriesFromHandler(TimeSeries timeSeries);
|
||||
}
|
@ -1,141 +0,0 @@
|
||||
/**
|
||||
* Copyright (c) 2010-2023 Contributors to the openHAB project
|
||||
*
|
||||
* See the NOTICE file(s) distributed with this work for additional
|
||||
* information.
|
||||
*
|
||||
* This program and the accompanying materials are made available under the
|
||||
* terms of the Eclipse Public License 2.0 which is available at
|
||||
* http://www.eclipse.org/legal/epl-2.0
|
||||
*
|
||||
* SPDX-License-Identifier: EPL-2.0
|
||||
*/
|
||||
package org.openhab.core.thing;
|
||||
|
||||
import static org.junit.jupiter.api.Assertions.*;
|
||||
|
||||
import java.lang.reflect.InvocationTargetException;
|
||||
import java.util.ArrayList;
|
||||
import java.util.List;
|
||||
import java.util.stream.Stream;
|
||||
|
||||
import org.eclipse.jdt.annotation.NonNullByDefault;
|
||||
import org.junit.jupiter.api.Disabled;
|
||||
import org.junit.jupiter.params.ParameterizedTest;
|
||||
import org.junit.jupiter.params.provider.Arguments;
|
||||
import org.junit.jupiter.params.provider.MethodSource;
|
||||
import org.openhab.core.items.Item;
|
||||
import org.openhab.core.library.items.CallItem;
|
||||
import org.openhab.core.library.items.ColorItem;
|
||||
import org.openhab.core.library.items.ContactItem;
|
||||
import org.openhab.core.library.items.DateTimeItem;
|
||||
import org.openhab.core.library.items.DimmerItem;
|
||||
import org.openhab.core.library.items.ImageItem;
|
||||
import org.openhab.core.library.items.LocationItem;
|
||||
import org.openhab.core.library.items.PlayerItem;
|
||||
import org.openhab.core.library.items.RollershutterItem;
|
||||
import org.openhab.core.library.items.StringItem;
|
||||
import org.openhab.core.library.types.DateTimeType;
|
||||
import org.openhab.core.library.types.DecimalType;
|
||||
import org.openhab.core.library.types.HSBType;
|
||||
import org.openhab.core.library.types.IncreaseDecreaseType;
|
||||
import org.openhab.core.library.types.NextPreviousType;
|
||||
import org.openhab.core.library.types.OnOffType;
|
||||
import org.openhab.core.library.types.OpenClosedType;
|
||||
import org.openhab.core.library.types.PercentType;
|
||||
import org.openhab.core.library.types.PlayPauseType;
|
||||
import org.openhab.core.library.types.PointType;
|
||||
import org.openhab.core.library.types.QuantityType;
|
||||
import org.openhab.core.library.types.RawType;
|
||||
import org.openhab.core.library.types.RewindFastforwardType;
|
||||
import org.openhab.core.library.types.StringType;
|
||||
import org.openhab.core.library.types.UpDownType;
|
||||
import org.openhab.core.types.Command;
|
||||
import org.openhab.core.types.State;
|
||||
import org.openhab.core.types.Type;
|
||||
import org.openhab.core.types.UnDefType;
|
||||
|
||||
/**
|
||||
* @author Jan N. Klug - Initial contribution
|
||||
*/
|
||||
@NonNullByDefault
|
||||
public class CommunicationManagerConversionTest {
|
||||
// TODO: remove test - only to show CommunicationManager is too complex
|
||||
|
||||
private static final List<Class<? extends Item>> ITEM_TYPES = List.of(CallItem.class, ColorItem.class,
|
||||
ContactItem.class, DateTimeItem.class, DimmerItem.class, ImageItem.class, LocationItem.class,
|
||||
PlayerItem.class, RollershutterItem.class, StringItem.class);
|
||||
|
||||
private static final List<Class<? extends Type>> TYPES = List.of(DateTimeType.class, DecimalType.class,
|
||||
HSBType.class, IncreaseDecreaseType.class, NextPreviousType.class, OnOffType.class, OpenClosedType.class,
|
||||
PercentType.class, PlayPauseType.class, PointType.class, QuantityType.class, RawType.class,
|
||||
RewindFastforwardType.class, StringType.class, UpDownType.class, UnDefType.class);
|
||||
|
||||
private static Stream<Arguments> arguments()
|
||||
throws NoSuchMethodException, InvocationTargetException, InstantiationException, IllegalAccessException {
|
||||
List<Arguments> arguments = new ArrayList<>();
|
||||
for (Class<? extends Item> itemType : ITEM_TYPES) {
|
||||
Item item = itemType.getDeclaredConstructor(String.class).newInstance("testItem");
|
||||
for (Class<? extends Type> type : TYPES) {
|
||||
if (type.isEnum()) {
|
||||
arguments.add(Arguments.of(item, type.getEnumConstants()[0]));
|
||||
} else if (type == RawType.class) {
|
||||
arguments.add(Arguments.of(item, new RawType(new byte[] {}, "mimeType")));
|
||||
} else {
|
||||
arguments.add(Arguments.of(item, type.getDeclaredConstructor().newInstance()));
|
||||
}
|
||||
}
|
||||
}
|
||||
return arguments.stream();
|
||||
}
|
||||
|
||||
@Disabled
|
||||
@MethodSource("arguments")
|
||||
@ParameterizedTest
|
||||
public void testCommand(Item item, Type originalType) {
|
||||
Type returnType = null;
|
||||
|
||||
List<Class<? extends Command>> acceptedTypes = item.getAcceptedCommandTypes();
|
||||
if (acceptedTypes.contains(originalType.getClass())) {
|
||||
returnType = originalType;
|
||||
} else {
|
||||
// Look for class hierarchy and convert appropriately
|
||||
for (Class<? extends Type> typeClass : acceptedTypes) {
|
||||
if (!typeClass.isEnum() && typeClass.isAssignableFrom(originalType.getClass()) //
|
||||
&& State.class.isAssignableFrom(typeClass) && originalType instanceof State state) {
|
||||
returnType = state.as((Class<? extends State>) typeClass);
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
if (returnType != null && !returnType.getClass().equals(originalType.getClass())) {
|
||||
fail("CommunicationManager did a conversion for target item " + item.getType() + " from "
|
||||
+ originalType.getClass() + " to " + returnType.getClass());
|
||||
}
|
||||
}
|
||||
|
||||
@MethodSource("arguments")
|
||||
@ParameterizedTest
|
||||
public void testState(Item item, Type originalType) {
|
||||
Type returnType = null;
|
||||
|
||||
List<Class<? extends State>> acceptedTypes = item.getAcceptedDataTypes();
|
||||
if (acceptedTypes.contains(originalType.getClass())) {
|
||||
returnType = originalType;
|
||||
} else {
|
||||
// Look for class hierarchy and convert appropriately
|
||||
for (Class<? extends Type> typeClass : acceptedTypes) {
|
||||
if (!typeClass.isEnum() && typeClass.isAssignableFrom(originalType.getClass()) //
|
||||
&& State.class.isAssignableFrom(typeClass) && originalType instanceof State state) {
|
||||
returnType = state.as((Class<? extends State>) typeClass);
|
||||
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
if (returnType != null && !returnType.equals(originalType)) {
|
||||
fail("CommunicationManager did a conversion for target item " + item.getType() + " from "
|
||||
+ originalType.getClass() + " to " + returnType.getClass());
|
||||
}
|
||||
}
|
||||
}
|
@ -22,6 +22,7 @@ import org.mockito.Mock;
|
||||
import org.mockito.junit.jupiter.MockitoExtension;
|
||||
import org.openhab.core.library.types.OnOffType;
|
||||
import org.openhab.core.thing.profiles.ProfileCallback;
|
||||
import org.openhab.core.types.TimeSeries;
|
||||
|
||||
/**
|
||||
*
|
||||
@ -60,4 +61,15 @@ public class SystemDefaultProfileTest {
|
||||
verify(callbackMock).sendCommand(eq(OnOffType.ON));
|
||||
verifyNoMoreInteractions(callbackMock);
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testSendTimeSeries() {
|
||||
TimeSeries timeSeries = new TimeSeries(TimeSeries.Policy.ADD);
|
||||
|
||||
SystemDefaultProfile profile = new SystemDefaultProfile(callbackMock);
|
||||
profile.onTimeSeriesFromHandler(timeSeries);
|
||||
|
||||
verify(callbackMock).sendTimeSeries(timeSeries);
|
||||
verifyNoMoreInteractions(callbackMock);
|
||||
}
|
||||
}
|
||||
|
@ -22,6 +22,7 @@ import org.openhab.core.items.ItemRegistry;
|
||||
import org.openhab.core.items.events.AbstractItemEventSubscriber;
|
||||
import org.openhab.core.items.events.ItemCommandEvent;
|
||||
import org.openhab.core.items.events.ItemStateEvent;
|
||||
import org.openhab.core.items.events.ItemTimeSeriesEvent;
|
||||
import org.openhab.core.types.State;
|
||||
import org.osgi.service.component.annotations.Activate;
|
||||
import org.osgi.service.component.annotations.Component;
|
||||
@ -96,4 +97,16 @@ public class ItemUpdater extends AbstractItemEventSubscriber {
|
||||
logger.debug("Received command for non-existing item: {}", e.getMessage());
|
||||
}
|
||||
}
|
||||
|
||||
@Override
|
||||
protected void receiveTimeSeries(ItemTimeSeriesEvent timeSeriesEvent) {
|
||||
try {
|
||||
Item item = itemRegistry.getItem(timeSeriesEvent.getItemName());
|
||||
if (!(item instanceof GroupItem) && item instanceof GenericItem genericItem) {
|
||||
genericItem.setTimeSeries(timeSeriesEvent.getTimeSeries());
|
||||
}
|
||||
} catch (ItemNotFoundException e) {
|
||||
logger.debug("Received command for non-existing item: {}", e.getMessage());
|
||||
}
|
||||
}
|
||||
}
|
||||
|
@ -39,6 +39,7 @@ import org.openhab.core.types.CommandOption;
|
||||
import org.openhab.core.types.RefreshType;
|
||||
import org.openhab.core.types.State;
|
||||
import org.openhab.core.types.StateDescription;
|
||||
import org.openhab.core.types.TimeSeries;
|
||||
import org.openhab.core.types.UnDefType;
|
||||
import org.slf4j.Logger;
|
||||
import org.slf4j.LoggerFactory;
|
||||
@ -51,6 +52,7 @@ import org.slf4j.LoggerFactory;
|
||||
* @author Kai Kreuzer - Initial contribution
|
||||
* @author Andre Fuechsel - Added tags
|
||||
* @author Stefan Bußweiler - Migration to new ESH event concept
|
||||
* @author Jan N. Klug - Added time series support
|
||||
*/
|
||||
@NonNullByDefault
|
||||
public abstract class GenericItem implements ActiveItem {
|
||||
@ -64,6 +66,9 @@ public abstract class GenericItem implements ActiveItem {
|
||||
protected Set<StateChangeListener> listeners = new CopyOnWriteArraySet<>(
|
||||
Collections.newSetFromMap(new WeakHashMap<>()));
|
||||
|
||||
protected Set<TimeSeriesListener> timeSeriesListeners = new CopyOnWriteArraySet<>(
|
||||
Collections.newSetFromMap(new WeakHashMap<>()));
|
||||
|
||||
protected List<String> groupNames = new ArrayList<>();
|
||||
|
||||
protected Set<String> tags = new HashSet<>();
|
||||
@ -229,6 +234,50 @@ public abstract class GenericItem implements ActiveItem {
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* Set a new time series.
|
||||
* <p/>
|
||||
* Subclasses may override this method in order to do necessary conversions upfront. Afterwards,
|
||||
* {@link #applyTimeSeries(TimeSeries)} should be called by classes overriding this method.
|
||||
* <p/>
|
||||
* A time series may only contain events that are compatible with the item's internal state.
|
||||
*
|
||||
* @param timeSeries new time series of this item
|
||||
*/
|
||||
public void setTimeSeries(TimeSeries timeSeries) {
|
||||
applyTimeSeries(timeSeries);
|
||||
}
|
||||
|
||||
/**
|
||||
* Sets new time series, notifies listeners and sends events.
|
||||
* <p />
|
||||
* Classes overriding the {@link #setTimeSeries(TimeSeries)} method should call this method in order to actually set
|
||||
* the time series, inform listeners and send the event.
|
||||
* <p/>
|
||||
* A time series may only contain events that are compatible with the item's internal state.
|
||||
*
|
||||
* @param timeSeries new time series of this item
|
||||
*/
|
||||
protected final void applyTimeSeries(TimeSeries timeSeries) {
|
||||
// notify listeners
|
||||
Set<TimeSeriesListener> clonedListeners = new CopyOnWriteArraySet<>(timeSeriesListeners);
|
||||
ExecutorService pool = ThreadPoolManager.getPool(ITEM_THREADPOOLNAME);
|
||||
clonedListeners.forEach(listener -> pool.execute(() -> {
|
||||
try {
|
||||
listener.timeSeriesUpdated(GenericItem.this, timeSeries);
|
||||
} catch (Exception e) {
|
||||
logger.warn("failed notifying listener '{}' about timeseries update of item {}: {}", listener,
|
||||
GenericItem.this.getName(), e.getMessage(), e);
|
||||
}
|
||||
}));
|
||||
|
||||
// send event
|
||||
EventPublisher eventPublisher1 = this.eventPublisher;
|
||||
if (eventPublisher1 != null) {
|
||||
eventPublisher1.post(ItemEventFactory.createTimeSeriesUpdatedEvent(this.name, timeSeries, null));
|
||||
}
|
||||
}
|
||||
|
||||
private void sendStateUpdatedEvent(State newState) {
|
||||
EventPublisher eventPublisher1 = this.eventPublisher;
|
||||
if (eventPublisher1 != null) {
|
||||
@ -314,6 +363,18 @@ public abstract class GenericItem implements ActiveItem {
|
||||
}
|
||||
}
|
||||
|
||||
public void addTimeSeriesListener(TimeSeriesListener listener) {
|
||||
synchronized (timeSeriesListeners) {
|
||||
timeSeriesListeners.add(listener);
|
||||
}
|
||||
}
|
||||
|
||||
public void removeTimeSeriesListener(TimeSeriesListener listener) {
|
||||
synchronized (timeSeriesListeners) {
|
||||
timeSeriesListeners.remove(listener);
|
||||
}
|
||||
}
|
||||
|
||||
@Override
|
||||
public int hashCode() {
|
||||
final int prime = 31;
|
||||
@ -437,8 +498,7 @@ public abstract class GenericItem implements ActiveItem {
|
||||
* @return true if state is an acceptedDataType or subclass thereof
|
||||
*/
|
||||
public boolean isAcceptedState(List<Class<? extends State>> acceptedDataTypes, State state) {
|
||||
return acceptedDataTypes.stream().map(clazz -> clazz.isAssignableFrom(state.getClass())).filter(found -> found)
|
||||
.findAny().isPresent();
|
||||
return acceptedDataTypes.stream().anyMatch(clazz -> clazz.isAssignableFrom(state.getClass()));
|
||||
}
|
||||
|
||||
protected void logSetTypeError(State state) {
|
||||
@ -446,7 +506,12 @@ public abstract class GenericItem implements ActiveItem {
|
||||
state.getClass().getSimpleName(), getName(), getClass().getSimpleName());
|
||||
}
|
||||
|
||||
private @Nullable CommandDescription stateOptions2CommandOptions(StateDescription stateDescription) {
|
||||
protected void logSetTypeError(TimeSeries timeSeries) {
|
||||
logger.error("Tried to set invalid state in time series {} on item {} of type {}, ignoring it", timeSeries,
|
||||
getName(), getClass().getSimpleName());
|
||||
}
|
||||
|
||||
private CommandDescription stateOptions2CommandOptions(StateDescription stateDescription) {
|
||||
CommandDescriptionBuilder builder = CommandDescriptionBuilder.create();
|
||||
stateDescription.getOptions()
|
||||
.forEach(so -> builder.withCommandOption(new CommandOption(so.getValue(), so.getLabel())));
|
||||
|
@ -0,0 +1,38 @@
|
||||
/**
|
||||
* Copyright (c) 2010-2023 Contributors to the openHAB project
|
||||
*
|
||||
* See the NOTICE file(s) distributed with this work for additional
|
||||
* information.
|
||||
*
|
||||
* This program and the accompanying materials are made available under the
|
||||
* terms of the Eclipse Public License 2.0 which is available at
|
||||
* http://www.eclipse.org/legal/epl-2.0
|
||||
*
|
||||
* SPDX-License-Identifier: EPL-2.0
|
||||
*/
|
||||
package org.openhab.core.items;
|
||||
|
||||
import org.eclipse.jdt.annotation.NonNullByDefault;
|
||||
import org.openhab.core.types.TimeSeries;
|
||||
|
||||
/**
|
||||
* <p>
|
||||
* This interface must be implemented by all classes that want to be notified about |@link TimeSeries} updates of an
|
||||
* item.
|
||||
*
|
||||
* <p>
|
||||
* The {@link GenericItem} class provides the possibility to register such listeners.
|
||||
*
|
||||
* @author Jan N. Klug - Initial contribution
|
||||
*/
|
||||
@NonNullByDefault
|
||||
public interface TimeSeriesListener {
|
||||
|
||||
/**
|
||||
* This method is called, if a time series update was sent to the item.
|
||||
*
|
||||
* @param item the item the timeseries was updated for
|
||||
* @param timeSeries the time series
|
||||
*/
|
||||
void timeSeriesUpdated(Item item, TimeSeries timeSeries);
|
||||
}
|
@ -31,7 +31,8 @@ import org.openhab.core.events.EventSubscriber;
|
||||
@NonNullByDefault
|
||||
public abstract class AbstractItemEventSubscriber implements EventSubscriber {
|
||||
|
||||
private final Set<String> subscribedEventTypes = Set.of(ItemStateEvent.TYPE, ItemCommandEvent.TYPE);
|
||||
private final Set<String> subscribedEventTypes = Set.of(ItemStateEvent.TYPE, ItemCommandEvent.TYPE,
|
||||
ItemTimeSeriesEvent.TYPE);
|
||||
|
||||
@Override
|
||||
public Set<String> getSubscribedEventTypes() {
|
||||
@ -44,6 +45,8 @@ public abstract class AbstractItemEventSubscriber implements EventSubscriber {
|
||||
receiveUpdate(stateEvent);
|
||||
} else if (event instanceof ItemCommandEvent commandEvent) {
|
||||
receiveCommand(commandEvent);
|
||||
} else if (event instanceof ItemTimeSeriesEvent timeSeriesEvent) {
|
||||
receiveTimeSeries(timeSeriesEvent);
|
||||
}
|
||||
}
|
||||
|
||||
@ -66,4 +69,14 @@ public abstract class AbstractItemEventSubscriber implements EventSubscriber {
|
||||
// Default implementation: do nothing.
|
||||
// Can be implemented by subclass in order to handle item updates.
|
||||
}
|
||||
|
||||
/**
|
||||
* Callback method for receiving item timeseries events from the openHAB event bus.
|
||||
*
|
||||
* @param timeSeriesEvent the timeseries event
|
||||
*/
|
||||
protected void receiveTimeSeries(ItemTimeSeriesEvent timeSeriesEvent) {
|
||||
// Default implementation: do nothing.
|
||||
// Can be implemented by subclass in order to handle timeseries updates.
|
||||
}
|
||||
}
|
||||
|
@ -14,6 +14,7 @@ package org.openhab.core.items.events;
|
||||
|
||||
import java.lang.reflect.InvocationTargetException;
|
||||
import java.lang.reflect.Method;
|
||||
import java.time.Instant;
|
||||
import java.util.LinkedList;
|
||||
import java.util.List;
|
||||
import java.util.Set;
|
||||
@ -29,6 +30,7 @@ import org.openhab.core.items.dto.ItemDTOMapper;
|
||||
import org.openhab.core.types.Command;
|
||||
import org.openhab.core.types.RefreshType;
|
||||
import org.openhab.core.types.State;
|
||||
import org.openhab.core.types.TimeSeries;
|
||||
import org.openhab.core.types.Type;
|
||||
import org.openhab.core.types.UnDefType;
|
||||
import org.osgi.service.component.annotations.Component;
|
||||
@ -51,6 +53,8 @@ public class ItemEventFactory extends AbstractEventFactory {
|
||||
private static final String ITEM_STATE_EVENT_TOPIC = "openhab/items/{itemName}/state";
|
||||
|
||||
private static final String ITEM_STATE_UPDATED_EVENT_TOPIC = "openhab/items/{itemName}/stateupdated";
|
||||
private static final String ITEM_TIME_SERIES_EVENT_TOPIC = "openhab/items/{itemName}/timeseries";
|
||||
private static final String ITEM_TIME_SERIES_UPDATED_EVENT_TOPIC = "openhab/items/{itemName}/timeseriesupdated";
|
||||
|
||||
private static final String ITEM_STATE_PREDICTED_EVENT_TOPIC = "openhab/items/{itemName}/statepredicted";
|
||||
|
||||
@ -72,7 +76,8 @@ public class ItemEventFactory extends AbstractEventFactory {
|
||||
public ItemEventFactory() {
|
||||
super(Set.of(ItemCommandEvent.TYPE, ItemStateEvent.TYPE, ItemStatePredictedEvent.TYPE,
|
||||
ItemStateUpdatedEvent.TYPE, ItemStateChangedEvent.TYPE, ItemAddedEvent.TYPE, ItemUpdatedEvent.TYPE,
|
||||
ItemRemovedEvent.TYPE, GroupStateUpdatedEvent.TYPE, GroupItemStateChangedEvent.TYPE));
|
||||
ItemRemovedEvent.TYPE, GroupStateUpdatedEvent.TYPE, GroupItemStateChangedEvent.TYPE,
|
||||
ItemTimeSeriesEvent.TYPE, ItemTimeSeriesUpdatedEvent.TYPE));
|
||||
}
|
||||
|
||||
@Override
|
||||
@ -88,6 +93,10 @@ public class ItemEventFactory extends AbstractEventFactory {
|
||||
return createStateUpdatedEvent(topic, payload);
|
||||
} else if (ItemStateChangedEvent.TYPE.equals(eventType)) {
|
||||
return createStateChangedEvent(topic, payload);
|
||||
} else if (ItemTimeSeriesEvent.TYPE.equals(eventType)) {
|
||||
return createTimeSeriesEvent(topic, payload);
|
||||
} else if (ItemTimeSeriesUpdatedEvent.TYPE.equals(eventType)) {
|
||||
return createTimeSeriesUpdatedEvent(topic, payload);
|
||||
} else if (ItemAddedEvent.TYPE.equals(eventType)) {
|
||||
return createAddedEvent(topic, payload);
|
||||
} else if (ItemUpdatedEvent.TYPE.equals(eventType)) {
|
||||
@ -155,6 +164,20 @@ public class ItemEventFactory extends AbstractEventFactory {
|
||||
return new ItemStateChangedEvent(topic, payload, itemName, state, oldState);
|
||||
}
|
||||
|
||||
private Event createTimeSeriesEvent(String topic, String payload) {
|
||||
String itemName = getItemName(topic);
|
||||
ItemTimeSeriesEventPayloadBean bean = deserializePayload(payload, ItemTimeSeriesEventPayloadBean.class);
|
||||
TimeSeries timeSeries = bean.getTimeSeries();
|
||||
return new ItemTimeSeriesEvent(topic, payload, itemName, timeSeries, null);
|
||||
}
|
||||
|
||||
private Event createTimeSeriesUpdatedEvent(String topic, String payload) {
|
||||
String itemName = getItemName(topic);
|
||||
ItemTimeSeriesEventPayloadBean bean = deserializePayload(payload, ItemTimeSeriesEventPayloadBean.class);
|
||||
TimeSeries timeSeries = bean.getTimeSeries();
|
||||
return new ItemTimeSeriesUpdatedEvent(topic, payload, itemName, timeSeries, null);
|
||||
}
|
||||
|
||||
private State getState(String type, String value) {
|
||||
return parseType(type, value, State.class);
|
||||
}
|
||||
@ -175,7 +198,7 @@ public class ItemEventFactory extends AbstractEventFactory {
|
||||
return topicElements[3];
|
||||
}
|
||||
|
||||
private <T> T parseType(String typeName, String valueToParse, Class<T> desiredClass) {
|
||||
private static <T> T parseType(String typeName, String valueToParse, Class<T> desiredClass) {
|
||||
Object parsedObject = null;
|
||||
String simpleClassName = typeName + TYPE_POSTFIX;
|
||||
parsedObject = parseSimpleClassName(simpleClassName, valueToParse);
|
||||
@ -190,7 +213,7 @@ public class ItemEventFactory extends AbstractEventFactory {
|
||||
return desiredClass.cast(parsedObject);
|
||||
}
|
||||
|
||||
private @Nullable Object parseSimpleClassName(String simpleClassName, String valueToParse) {
|
||||
private static @Nullable Object parseSimpleClassName(String simpleClassName, String valueToParse) {
|
||||
if (simpleClassName.equals(UnDefType.class.getSimpleName())) {
|
||||
return UnDefType.valueOf(valueToParse);
|
||||
}
|
||||
@ -320,6 +343,22 @@ public class ItemEventFactory extends AbstractEventFactory {
|
||||
return new ItemStateUpdatedEvent(topic, payload, itemName, state, source);
|
||||
}
|
||||
|
||||
public static ItemTimeSeriesEvent createTimeSeriesEvent(String itemName, TimeSeries timeSeries,
|
||||
@Nullable String source) {
|
||||
String topic = buildTopic(ITEM_TIME_SERIES_EVENT_TOPIC, itemName);
|
||||
ItemTimeSeriesEventPayloadBean bean = new ItemTimeSeriesEventPayloadBean(timeSeries);
|
||||
String payload = serializePayload(bean);
|
||||
return new ItemTimeSeriesEvent(topic, payload, itemName, timeSeries, source);
|
||||
}
|
||||
|
||||
public static ItemTimeSeriesUpdatedEvent createTimeSeriesUpdatedEvent(String itemName, TimeSeries timeSeries,
|
||||
@Nullable String source) {
|
||||
String topic = buildTopic(ITEM_TIME_SERIES_UPDATED_EVENT_TOPIC, itemName);
|
||||
ItemTimeSeriesEventPayloadBean bean = new ItemTimeSeriesEventPayloadBean(timeSeries);
|
||||
String payload = serializePayload(bean);
|
||||
return new ItemTimeSeriesUpdatedEvent(topic, payload, itemName, timeSeries, source);
|
||||
}
|
||||
|
||||
/**
|
||||
* Creates a group item state updated event.
|
||||
*
|
||||
@ -585,4 +624,58 @@ public class ItemEventFactory extends AbstractEventFactory {
|
||||
return oldValue;
|
||||
}
|
||||
}
|
||||
|
||||
private static class ItemTimeSeriesEventPayloadBean {
|
||||
private @NonNullByDefault({}) List<TimeSeriesPayload> timeSeries;
|
||||
private @NonNullByDefault({}) String policy;
|
||||
|
||||
@SuppressWarnings("unused")
|
||||
private ItemTimeSeriesEventPayloadBean() {
|
||||
// do not remove, GSON needs it
|
||||
}
|
||||
|
||||
public ItemTimeSeriesEventPayloadBean(TimeSeries timeSeries) {
|
||||
this.timeSeries = timeSeries.getStates().map(TimeSeriesPayload::new).toList();
|
||||
this.policy = timeSeries.getPolicy().name();
|
||||
}
|
||||
|
||||
public TimeSeries getTimeSeries() {
|
||||
TimeSeries timeSeries1 = new TimeSeries(TimeSeries.Policy.valueOf(policy));
|
||||
timeSeries.forEach(e -> {
|
||||
State state = parseType(e.getType(), e.getValue(), State.class);
|
||||
Instant instant = Instant.parse(e.getTimestamp());
|
||||
timeSeries1.add(instant, state);
|
||||
});
|
||||
return timeSeries1;
|
||||
}
|
||||
|
||||
private static class TimeSeriesPayload {
|
||||
private @NonNullByDefault({}) String type;
|
||||
private @NonNullByDefault({}) String value;
|
||||
private @NonNullByDefault({}) String timestamp;
|
||||
|
||||
@SuppressWarnings("unused")
|
||||
private TimeSeriesPayload() {
|
||||
// do not remove, GSON needs it
|
||||
}
|
||||
|
||||
public TimeSeriesPayload(TimeSeries.Entry entry) {
|
||||
type = getStateType(entry.state());
|
||||
value = entry.state().toFullString();
|
||||
timestamp = entry.timestamp().toString();
|
||||
}
|
||||
|
||||
public String getType() {
|
||||
return type;
|
||||
}
|
||||
|
||||
public String getValue() {
|
||||
return value;
|
||||
}
|
||||
|
||||
public String getTimestamp() {
|
||||
return timestamp;
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
@ -0,0 +1,65 @@
|
||||
/**
|
||||
* Copyright (c) 2010-2023 Contributors to the openHAB project
|
||||
*
|
||||
* See the NOTICE file(s) distributed with this work for additional
|
||||
* information.
|
||||
*
|
||||
* This program and the accompanying materials are made available under the
|
||||
* terms of the Eclipse Public License 2.0 which is available at
|
||||
* http://www.eclipse.org/legal/epl-2.0
|
||||
*
|
||||
* SPDX-License-Identifier: EPL-2.0
|
||||
*/
|
||||
package org.openhab.core.items.events;
|
||||
|
||||
import org.eclipse.jdt.annotation.NonNullByDefault;
|
||||
import org.eclipse.jdt.annotation.Nullable;
|
||||
import org.openhab.core.types.TimeSeries;
|
||||
|
||||
/**
|
||||
* The {@link ItemTimeSeriesEvent} can be used to report item time series updates through the openHAB event bus.
|
||||
* Time series events must be created with the {@link ItemEventFactory}.
|
||||
*
|
||||
* @author Jan N. Klug - Initial contribution
|
||||
*/
|
||||
@NonNullByDefault
|
||||
public class ItemTimeSeriesEvent extends ItemEvent {
|
||||
|
||||
public static final String TYPE = ItemTimeSeriesEvent.class.getSimpleName();
|
||||
|
||||
protected final TimeSeries timeSeries;
|
||||
|
||||
/**
|
||||
* Constructs a new item time series event.
|
||||
*
|
||||
* @param topic the topic
|
||||
* @param payload the payload
|
||||
* @param itemName the item name
|
||||
* @param timeSeries the time series
|
||||
* @param source the source, can be null
|
||||
*/
|
||||
protected ItemTimeSeriesEvent(String topic, String payload, String itemName, TimeSeries timeSeries,
|
||||
@Nullable String source) {
|
||||
super(topic, payload, itemName, source);
|
||||
this.timeSeries = timeSeries;
|
||||
}
|
||||
|
||||
@Override
|
||||
public String getType() {
|
||||
return TYPE;
|
||||
}
|
||||
|
||||
/**
|
||||
* Gets the item time series.
|
||||
*
|
||||
* @return the item time series
|
||||
*/
|
||||
public TimeSeries getTimeSeries() {
|
||||
return timeSeries;
|
||||
}
|
||||
|
||||
@Override
|
||||
public String toString() {
|
||||
return String.format("Item '%s' shall process timeseries %s", itemName, timeSeries.getStates().toList());
|
||||
}
|
||||
}
|
@ -0,0 +1,65 @@
|
||||
/**
|
||||
* Copyright (c) 2010-2023 Contributors to the openHAB project
|
||||
*
|
||||
* See the NOTICE file(s) distributed with this work for additional
|
||||
* information.
|
||||
*
|
||||
* This program and the accompanying materials are made available under the
|
||||
* terms of the Eclipse Public License 2.0 which is available at
|
||||
* http://www.eclipse.org/legal/epl-2.0
|
||||
*
|
||||
* SPDX-License-Identifier: EPL-2.0
|
||||
*/
|
||||
package org.openhab.core.items.events;
|
||||
|
||||
import org.eclipse.jdt.annotation.NonNullByDefault;
|
||||
import org.eclipse.jdt.annotation.Nullable;
|
||||
import org.openhab.core.types.TimeSeries;
|
||||
|
||||
/**
|
||||
* The {@link ItemTimeSeriesUpdatedEvent} can be used to report item time series updates through the openHAB event bus.
|
||||
* Time series events must be created with the {@link ItemEventFactory}.
|
||||
*
|
||||
* @author Jan N. Klug - Initial contribution
|
||||
*/
|
||||
@NonNullByDefault
|
||||
public class ItemTimeSeriesUpdatedEvent extends ItemEvent {
|
||||
|
||||
public static final String TYPE = ItemTimeSeriesUpdatedEvent.class.getSimpleName();
|
||||
|
||||
protected final TimeSeries timeSeries;
|
||||
|
||||
/**
|
||||
* Constructs a new item time series updated event.
|
||||
*
|
||||
* @param topic the topic
|
||||
* @param payload the payload
|
||||
* @param itemName the item name
|
||||
* @param timeSeries the time series
|
||||
* @param source the source, can be null
|
||||
*/
|
||||
protected ItemTimeSeriesUpdatedEvent(String topic, String payload, String itemName, TimeSeries timeSeries,
|
||||
@Nullable String source) {
|
||||
super(topic, payload, itemName, source);
|
||||
this.timeSeries = timeSeries;
|
||||
}
|
||||
|
||||
@Override
|
||||
public String getType() {
|
||||
return TYPE;
|
||||
}
|
||||
|
||||
/**
|
||||
* Gets the item time series.
|
||||
*
|
||||
* @return the item time series
|
||||
*/
|
||||
public TimeSeries getTimeSeries() {
|
||||
return timeSeries;
|
||||
}
|
||||
|
||||
@Override
|
||||
public String toString() {
|
||||
return String.format("Item '%s' updated timeseries %s", itemName, timeSeries.getStates().toList());
|
||||
}
|
||||
}
|
@ -21,6 +21,7 @@ import org.openhab.core.library.types.StringListType;
|
||||
import org.openhab.core.types.Command;
|
||||
import org.openhab.core.types.RefreshType;
|
||||
import org.openhab.core.types.State;
|
||||
import org.openhab.core.types.TimeSeries;
|
||||
import org.openhab.core.types.UnDefType;
|
||||
|
||||
/**
|
||||
@ -53,9 +54,18 @@ public class CallItem extends GenericItem {
|
||||
@Override
|
||||
public void setState(State state) {
|
||||
if (isAcceptedState(ACCEPTED_DATA_TYPES, state)) {
|
||||
super.setState(state);
|
||||
applyState(state);
|
||||
} else {
|
||||
logSetTypeError(state);
|
||||
}
|
||||
}
|
||||
|
||||
@Override
|
||||
public void setTimeSeries(TimeSeries timeSeries) {
|
||||
if (timeSeries.getStates().allMatch(s -> isAcceptedState(ACCEPTED_DATA_TYPES, s.state()))) {
|
||||
applyTimeSeries(timeSeries);
|
||||
} else {
|
||||
logSetTypeError(timeSeries);
|
||||
}
|
||||
}
|
||||
}
|
||||
|
@ -25,6 +25,7 @@ import org.openhab.core.library.types.PercentType;
|
||||
import org.openhab.core.types.Command;
|
||||
import org.openhab.core.types.RefreshType;
|
||||
import org.openhab.core.types.State;
|
||||
import org.openhab.core.types.TimeSeries;
|
||||
import org.openhab.core.types.UnDefType;
|
||||
|
||||
/**
|
||||
@ -92,4 +93,13 @@ public class ColorItem extends DimmerItem {
|
||||
logSetTypeError(state);
|
||||
}
|
||||
}
|
||||
|
||||
@Override
|
||||
public void setTimeSeries(TimeSeries timeSeries) {
|
||||
if (timeSeries.getStates().allMatch(s -> s.state() instanceof HSBType)) {
|
||||
applyTimeSeries(timeSeries);
|
||||
} else {
|
||||
logSetTypeError(timeSeries);
|
||||
}
|
||||
}
|
||||
}
|
||||
|
@ -21,6 +21,7 @@ import org.openhab.core.library.types.OpenClosedType;
|
||||
import org.openhab.core.types.Command;
|
||||
import org.openhab.core.types.RefreshType;
|
||||
import org.openhab.core.types.State;
|
||||
import org.openhab.core.types.TimeSeries;
|
||||
import org.openhab.core.types.UnDefType;
|
||||
|
||||
/**
|
||||
@ -53,9 +54,18 @@ public class ContactItem extends GenericItem {
|
||||
@Override
|
||||
public void setState(State state) {
|
||||
if (isAcceptedState(ACCEPTED_DATA_TYPES, state)) {
|
||||
super.setState(state);
|
||||
applyState(state);
|
||||
} else {
|
||||
logSetTypeError(state);
|
||||
}
|
||||
}
|
||||
|
||||
@Override
|
||||
public void setTimeSeries(TimeSeries timeSeries) {
|
||||
if (timeSeries.getStates().allMatch(s -> s.state() instanceof OpenClosedType)) {
|
||||
applyTimeSeries(timeSeries);
|
||||
} else {
|
||||
logSetTypeError(timeSeries);
|
||||
}
|
||||
}
|
||||
}
|
||||
|
@ -21,6 +21,7 @@ import org.openhab.core.library.types.DateTimeType;
|
||||
import org.openhab.core.types.Command;
|
||||
import org.openhab.core.types.RefreshType;
|
||||
import org.openhab.core.types.State;
|
||||
import org.openhab.core.types.TimeSeries;
|
||||
import org.openhab.core.types.UnDefType;
|
||||
|
||||
/**
|
||||
@ -69,4 +70,13 @@ public class DateTimeItem extends GenericItem {
|
||||
logSetTypeError(state);
|
||||
}
|
||||
}
|
||||
|
||||
@Override
|
||||
public void setTimeSeries(TimeSeries timeSeries) {
|
||||
if (timeSeries.getStates().allMatch(s -> s.state() instanceof DateTimeType)) {
|
||||
applyTimeSeries(timeSeries);
|
||||
} else {
|
||||
logSetTypeError(timeSeries);
|
||||
}
|
||||
}
|
||||
}
|
||||
|
@ -22,6 +22,7 @@ import org.openhab.core.library.types.PercentType;
|
||||
import org.openhab.core.types.Command;
|
||||
import org.openhab.core.types.RefreshType;
|
||||
import org.openhab.core.types.State;
|
||||
import org.openhab.core.types.TimeSeries;
|
||||
import org.openhab.core.types.UnDefType;
|
||||
|
||||
/**
|
||||
@ -79,4 +80,13 @@ public class DimmerItem extends SwitchItem {
|
||||
logSetTypeError(state);
|
||||
}
|
||||
}
|
||||
|
||||
@Override
|
||||
public void setTimeSeries(TimeSeries timeSeries) {
|
||||
if (timeSeries.getStates().allMatch(s -> s.state() instanceof PercentType)) {
|
||||
super.applyTimeSeries(timeSeries);
|
||||
} else {
|
||||
logSetTypeError(timeSeries);
|
||||
}
|
||||
}
|
||||
}
|
||||
|
@ -21,6 +21,7 @@ import org.openhab.core.library.types.RawType;
|
||||
import org.openhab.core.types.Command;
|
||||
import org.openhab.core.types.RefreshType;
|
||||
import org.openhab.core.types.State;
|
||||
import org.openhab.core.types.TimeSeries;
|
||||
import org.openhab.core.types.UnDefType;
|
||||
|
||||
/**
|
||||
@ -51,9 +52,18 @@ public class ImageItem extends GenericItem {
|
||||
@Override
|
||||
public void setState(State state) {
|
||||
if (isAcceptedState(ACCEPTED_DATA_TYPES, state)) {
|
||||
super.setState(state);
|
||||
applyState(state);
|
||||
} else {
|
||||
logSetTypeError(state);
|
||||
}
|
||||
}
|
||||
|
||||
@Override
|
||||
public void setTimeSeries(TimeSeries timeSeries) {
|
||||
if (timeSeries.getStates().allMatch(s -> s.state() instanceof RawType)) {
|
||||
applyTimeSeries(timeSeries);
|
||||
} else {
|
||||
logSetTypeError(timeSeries);
|
||||
}
|
||||
}
|
||||
}
|
||||
|
@ -23,6 +23,7 @@ import org.openhab.core.library.types.PointType;
|
||||
import org.openhab.core.types.Command;
|
||||
import org.openhab.core.types.RefreshType;
|
||||
import org.openhab.core.types.State;
|
||||
import org.openhab.core.types.TimeSeries;
|
||||
import org.openhab.core.types.UnDefType;
|
||||
|
||||
/**
|
||||
@ -75,9 +76,18 @@ public class LocationItem extends GenericItem {
|
||||
@Override
|
||||
public void setState(State state) {
|
||||
if (isAcceptedState(ACCEPTED_DATA_TYPES, state)) {
|
||||
super.setState(state);
|
||||
applyState(state);
|
||||
} else {
|
||||
logSetTypeError(state);
|
||||
}
|
||||
}
|
||||
|
||||
@Override
|
||||
public void setTimeSeries(TimeSeries timeSeries) {
|
||||
if (timeSeries.getStates().allMatch(s -> s.state() instanceof PointType)) {
|
||||
applyTimeSeries(timeSeries);
|
||||
} else {
|
||||
logSetTypeError(timeSeries);
|
||||
}
|
||||
}
|
||||
}
|
||||
|
@ -14,6 +14,7 @@ package org.openhab.core.library.items;
|
||||
|
||||
import java.util.List;
|
||||
import java.util.Locale;
|
||||
import java.util.Objects;
|
||||
|
||||
import javax.measure.Dimension;
|
||||
import javax.measure.Quantity;
|
||||
@ -35,6 +36,7 @@ import org.openhab.core.types.RefreshType;
|
||||
import org.openhab.core.types.State;
|
||||
import org.openhab.core.types.StateDescription;
|
||||
import org.openhab.core.types.StateDescriptionFragmentBuilder;
|
||||
import org.openhab.core.types.TimeSeries;
|
||||
import org.openhab.core.types.UnDefType;
|
||||
import org.openhab.core.types.util.UnitUtils;
|
||||
import org.slf4j.Logger;
|
||||
@ -136,13 +138,12 @@ public class NumberItem extends GenericItem implements MetadataAwareItem {
|
||||
return dimension;
|
||||
}
|
||||
|
||||
@Override
|
||||
public void setState(State state) {
|
||||
private @Nullable State getInternalState(State state) {
|
||||
if (state instanceof QuantityType<?> quantityType) {
|
||||
if (dimension == null) {
|
||||
// QuantityType update to a NumberItem without unit, strip unit
|
||||
DecimalType plainState = new DecimalType(quantityType.toBigDecimal());
|
||||
super.applyState(plainState);
|
||||
return plainState;
|
||||
} else {
|
||||
// QuantityType update to a NumberItem with unit, convert to item unit (if possible)
|
||||
Unit<?> stateUnit = quantityType.getUnit();
|
||||
@ -150,7 +151,7 @@ public class NumberItem extends GenericItem implements MetadataAwareItem {
|
||||
? quantityType.toInvertibleUnit(unit)
|
||||
: null;
|
||||
if (convertedState != null) {
|
||||
super.applyState(convertedState);
|
||||
return convertedState;
|
||||
} else {
|
||||
logger.warn("Failed to update item '{}' because '{}' could not be converted to the item unit '{}'",
|
||||
name, state, unit);
|
||||
@ -159,18 +160,44 @@ public class NumberItem extends GenericItem implements MetadataAwareItem {
|
||||
} else if (state instanceof DecimalType decimalType) {
|
||||
if (dimension == null) {
|
||||
// DecimalType update to NumberItem with unit
|
||||
super.applyState(decimalType);
|
||||
return decimalType;
|
||||
} else {
|
||||
// DecimalType update for a NumberItem with dimension, convert to QuantityType
|
||||
super.applyState(new QuantityType<>(decimalType.doubleValue(), unit));
|
||||
return new QuantityType<>(decimalType.doubleValue(), unit);
|
||||
}
|
||||
}
|
||||
return null;
|
||||
}
|
||||
|
||||
@Override
|
||||
public void setState(State state) {
|
||||
if (state instanceof DecimalType || state instanceof QuantityType<?>) {
|
||||
State internalState = getInternalState(state);
|
||||
if (internalState != null) {
|
||||
applyState(internalState);
|
||||
}
|
||||
} else if (state instanceof UnDefType) {
|
||||
super.applyState(state);
|
||||
applyState(state);
|
||||
} else {
|
||||
logSetTypeError(state);
|
||||
}
|
||||
}
|
||||
|
||||
@Override
|
||||
public void setTimeSeries(TimeSeries timeSeries) {
|
||||
TimeSeries internalSeries = new TimeSeries(timeSeries.getPolicy());
|
||||
timeSeries.getStates().forEach(s -> internalSeries.add(s.timestamp(),
|
||||
Objects.requireNonNullElse(getInternalState(s.state()), UnDefType.NULL)));
|
||||
|
||||
if (dimension != null && internalSeries.getStates().allMatch(s -> s.state() instanceof QuantityType<?>)) {
|
||||
applyTimeSeries(internalSeries);
|
||||
} else if (internalSeries.getStates().allMatch(s -> s.state() instanceof DecimalType)) {
|
||||
applyTimeSeries(internalSeries);
|
||||
} else {
|
||||
logSetTypeError(timeSeries);
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* Returns the optional unit symbol for this {@link NumberItem}.
|
||||
*
|
||||
|
@ -23,6 +23,7 @@ import org.openhab.core.library.types.RewindFastforwardType;
|
||||
import org.openhab.core.types.Command;
|
||||
import org.openhab.core.types.RefreshType;
|
||||
import org.openhab.core.types.State;
|
||||
import org.openhab.core.types.TimeSeries;
|
||||
import org.openhab.core.types.UnDefType;
|
||||
|
||||
/**
|
||||
@ -71,9 +72,19 @@ public class PlayerItem extends GenericItem {
|
||||
@Override
|
||||
public void setState(State state) {
|
||||
if (isAcceptedState(ACCEPTED_DATA_TYPES, state)) {
|
||||
super.setState(state);
|
||||
applyState(state);
|
||||
} else {
|
||||
logSetTypeError(state);
|
||||
}
|
||||
}
|
||||
|
||||
@Override
|
||||
public void setTimeSeries(TimeSeries timeSeries) {
|
||||
if (timeSeries.getStates()
|
||||
.allMatch(s -> s.state() instanceof PlayPauseType || s.state() instanceof RewindFastforwardType)) {
|
||||
applyTimeSeries(timeSeries);
|
||||
} else {
|
||||
logSetTypeError(timeSeries);
|
||||
}
|
||||
}
|
||||
}
|
||||
|
@ -23,6 +23,7 @@ import org.openhab.core.library.types.UpDownType;
|
||||
import org.openhab.core.types.Command;
|
||||
import org.openhab.core.types.RefreshType;
|
||||
import org.openhab.core.types.State;
|
||||
import org.openhab.core.types.TimeSeries;
|
||||
import org.openhab.core.types.UnDefType;
|
||||
|
||||
/**
|
||||
@ -80,4 +81,13 @@ public class RollershutterItem extends GenericItem {
|
||||
logSetTypeError(state);
|
||||
}
|
||||
}
|
||||
|
||||
@Override
|
||||
public void setTimeSeries(TimeSeries timeSeries) {
|
||||
if (timeSeries.getStates().allMatch(s -> isAcceptedState(ACCEPTED_DATA_TYPES, s.state()))) {
|
||||
applyTimeSeries(timeSeries);
|
||||
} else {
|
||||
logSetTypeError(timeSeries);
|
||||
}
|
||||
}
|
||||
}
|
||||
|
@ -24,6 +24,7 @@ import org.openhab.core.library.types.StringType;
|
||||
import org.openhab.core.types.Command;
|
||||
import org.openhab.core.types.RefreshType;
|
||||
import org.openhab.core.types.State;
|
||||
import org.openhab.core.types.TimeSeries;
|
||||
import org.openhab.core.types.TypeParser;
|
||||
import org.openhab.core.types.UnDefType;
|
||||
|
||||
@ -76,9 +77,18 @@ public class StringItem extends GenericItem {
|
||||
@Override
|
||||
public void setState(State state) {
|
||||
if (isAcceptedState(ACCEPTED_DATA_TYPES, state)) {
|
||||
super.setState(state);
|
||||
applyState(state);
|
||||
} else {
|
||||
logSetTypeError(state);
|
||||
}
|
||||
}
|
||||
|
||||
@Override
|
||||
public void setTimeSeries(TimeSeries timeSeries) {
|
||||
if (timeSeries.getStates().allMatch(s -> s.state() instanceof StringType)) {
|
||||
applyTimeSeries(timeSeries);
|
||||
} else {
|
||||
logSetTypeError(timeSeries);
|
||||
}
|
||||
}
|
||||
}
|
||||
|
@ -21,6 +21,7 @@ import org.openhab.core.library.types.OnOffType;
|
||||
import org.openhab.core.types.Command;
|
||||
import org.openhab.core.types.RefreshType;
|
||||
import org.openhab.core.types.State;
|
||||
import org.openhab.core.types.TimeSeries;
|
||||
import org.openhab.core.types.UnDefType;
|
||||
|
||||
/**
|
||||
@ -61,9 +62,18 @@ public class SwitchItem extends GenericItem {
|
||||
@Override
|
||||
public void setState(State state) {
|
||||
if (isAcceptedState(ACCEPTED_DATA_TYPES, state)) {
|
||||
super.setState(state);
|
||||
applyState(state);
|
||||
} else {
|
||||
logSetTypeError(state);
|
||||
}
|
||||
}
|
||||
|
||||
@Override
|
||||
public void setTimeSeries(TimeSeries timeSeries) {
|
||||
if (timeSeries.getStates().allMatch(s -> s.state() instanceof OnOffType)) {
|
||||
applyTimeSeries(timeSeries);
|
||||
} else {
|
||||
logSetTypeError(timeSeries);
|
||||
}
|
||||
}
|
||||
}
|
||||
|
@ -0,0 +1,124 @@
|
||||
/**
|
||||
* Copyright (c) 2010-2023 Contributors to the openHAB project
|
||||
*
|
||||
* See the NOTICE file(s) distributed with this work for additional
|
||||
* information.
|
||||
*
|
||||
* This program and the accompanying materials are made available under the
|
||||
* terms of the Eclipse Public License 2.0 which is available at
|
||||
* http://www.eclipse.org/legal/epl-2.0
|
||||
*
|
||||
* SPDX-License-Identifier: EPL-2.0
|
||||
*/
|
||||
package org.openhab.core.types;
|
||||
|
||||
import java.time.Instant;
|
||||
import java.util.Comparator;
|
||||
import java.util.List;
|
||||
import java.util.Objects;
|
||||
import java.util.TreeSet;
|
||||
import java.util.stream.Stream;
|
||||
|
||||
import org.eclipse.jdt.annotation.NonNullByDefault;
|
||||
import org.eclipse.jdt.annotation.Nullable;
|
||||
|
||||
/**
|
||||
* The {@link TimeSeries} is used to transport a set of states together with their timestamp.
|
||||
* It can be used for persisting historic state or forecasts.
|
||||
*
|
||||
* @author Jan N. Klug - Initial contribution
|
||||
*/
|
||||
@NonNullByDefault
|
||||
public class TimeSeries {
|
||||
private final TreeSet<Entry> states = new TreeSet<>(Comparator.comparing(e -> e.timestamp));
|
||||
private final Policy policy;
|
||||
|
||||
public TimeSeries(Policy policy) {
|
||||
this.policy = policy;
|
||||
}
|
||||
|
||||
/**
|
||||
* Get the persistence policy of this series.
|
||||
* <p/>
|
||||
* {@link Policy#ADD} add the content to the persistence, {@link Policy#REPLACE} first removes all persisted
|
||||
* elements in the timespan given by {@link #getBegin()} and {@link #getEnd()}.
|
||||
*
|
||||
* @return
|
||||
*/
|
||||
public Policy getPolicy() {
|
||||
return policy;
|
||||
}
|
||||
|
||||
/**
|
||||
* Get the timestamp of the first element in this series.
|
||||
*
|
||||
* @return the {@link Instant} of the first element
|
||||
*/
|
||||
public Instant getBegin() {
|
||||
return states.isEmpty() ? Instant.MAX : states.first().timestamp();
|
||||
}
|
||||
|
||||
/**
|
||||
* Get the timestamp of the last element in this series.
|
||||
*
|
||||
* @return the {@link Instant} of the last element
|
||||
*/
|
||||
public Instant getEnd() {
|
||||
return states.isEmpty() ? Instant.MIN : states.last().timestamp();
|
||||
}
|
||||
|
||||
/**
|
||||
* Get the number of elements in this series.
|
||||
*
|
||||
* @return the number of elements
|
||||
*/
|
||||
public int size() {
|
||||
return states.size();
|
||||
}
|
||||
|
||||
/**
|
||||
* Add a new element to this series.
|
||||
* <p/>
|
||||
* Elements can be added in an arbitrary order and are sorted chronologically.
|
||||
*
|
||||
* @param timestamp an {@link Instant} for the given state
|
||||
* @param state the {@link State} at the given timestamp
|
||||
*/
|
||||
public void add(Instant timestamp, State state) {
|
||||
states.add(new Entry(timestamp, state));
|
||||
}
|
||||
|
||||
/**
|
||||
* Get the content of this series.
|
||||
* <p/>
|
||||
* The entries are returned in chronological order, earlier entries before later entries.
|
||||
*
|
||||
* @return a {@link <Stream<Entry>} with the content of this series.
|
||||
*/
|
||||
public Stream<Entry> getStates() {
|
||||
return List.copyOf(states).stream();
|
||||
}
|
||||
|
||||
public record Entry(Instant timestamp, State state) {
|
||||
}
|
||||
|
||||
@Override
|
||||
public boolean equals(@Nullable Object o) {
|
||||
if (this == o)
|
||||
return true;
|
||||
if (o == null || getClass() != o.getClass())
|
||||
return false;
|
||||
TimeSeries that = (TimeSeries) o;
|
||||
return Objects.equals(states, that.states) && policy == that.policy;
|
||||
}
|
||||
|
||||
@Override
|
||||
public int hashCode() {
|
||||
return Objects.hash(states, policy);
|
||||
}
|
||||
|
||||
public enum Policy {
|
||||
ADD,
|
||||
REPLACE
|
||||
}
|
||||
}
|
@ -0,0 +1,66 @@
|
||||
/**
|
||||
* Copyright (c) 2010-2023 Contributors to the openHAB project
|
||||
*
|
||||
* See the NOTICE file(s) distributed with this work for additional
|
||||
* information.
|
||||
*
|
||||
* This program and the accompanying materials are made available under the
|
||||
* terms of the Eclipse Public License 2.0 which is available at
|
||||
* http://www.eclipse.org/legal/epl-2.0
|
||||
*
|
||||
* SPDX-License-Identifier: EPL-2.0
|
||||
*/
|
||||
package org.openhab.core.types;
|
||||
|
||||
import static org.hamcrest.MatcherAssert.assertThat;
|
||||
import static org.hamcrest.Matchers.greaterThan;
|
||||
import static org.hamcrest.Matchers.is;
|
||||
|
||||
import java.time.Instant;
|
||||
import java.util.List;
|
||||
|
||||
import org.eclipse.jdt.annotation.NonNullByDefault;
|
||||
import org.junit.jupiter.api.Test;
|
||||
import org.openhab.core.library.types.DecimalType;
|
||||
|
||||
/**
|
||||
* The {@link TimeSeriesTest} contains tests for {@link TimeSeries}
|
||||
*
|
||||
* @author Jan N. Klug - Initial contribution
|
||||
*/
|
||||
@NonNullByDefault
|
||||
public class TimeSeriesTest {
|
||||
|
||||
@Test
|
||||
public void testAdditionOrderDoesNotMatter() {
|
||||
Instant time1 = Instant.now();
|
||||
Instant time2 = time1.plusSeconds(1000);
|
||||
Instant time3 = time1.minusSeconds(1000);
|
||||
Instant time4 = time1.plusSeconds(50);
|
||||
|
||||
TimeSeries timeSeries = new TimeSeries(TimeSeries.Policy.ADD);
|
||||
assertThat(timeSeries.getPolicy(), is(TimeSeries.Policy.ADD));
|
||||
|
||||
timeSeries.add(time1, new DecimalType(time1.toEpochMilli()));
|
||||
timeSeries.add(time2, new DecimalType(time2.toEpochMilli()));
|
||||
timeSeries.add(time3, new DecimalType(time3.toEpochMilli()));
|
||||
timeSeries.add(time4, new DecimalType(time4.toEpochMilli()));
|
||||
|
||||
assertThat(timeSeries.size(), is(4));
|
||||
|
||||
// assert begin end time
|
||||
assertThat(timeSeries.getBegin(), is(time3));
|
||||
assertThat(timeSeries.getEnd(), is(time2));
|
||||
|
||||
// assert order of events and content
|
||||
List<TimeSeries.Entry> entries = timeSeries.getStates().toList();
|
||||
for (int i = 0; i < entries.size(); i++) {
|
||||
if (i > 0) {
|
||||
// assert order
|
||||
assertThat(entries.get(i).timestamp(), is(greaterThan(entries.get(i - 1).timestamp())));
|
||||
}
|
||||
assertThat(entries.get(i).timestamp().toEpochMilli(),
|
||||
is(entries.get(i).state().as(DecimalType.class).longValue()));
|
||||
}
|
||||
}
|
||||
}
|
@ -14,11 +14,13 @@ package org.openhab.core.internal.items;
|
||||
|
||||
import static org.junit.jupiter.api.Assertions.*;
|
||||
|
||||
import java.time.Instant;
|
||||
import java.util.Queue;
|
||||
import java.util.Set;
|
||||
import java.util.concurrent.ConcurrentLinkedQueue;
|
||||
|
||||
import org.eclipse.jdt.annotation.NonNullByDefault;
|
||||
import org.junit.jupiter.api.AfterEach;
|
||||
import org.junit.jupiter.api.BeforeEach;
|
||||
import org.junit.jupiter.api.Test;
|
||||
import org.openhab.core.events.Event;
|
||||
@ -28,9 +30,13 @@ import org.openhab.core.items.Item;
|
||||
import org.openhab.core.items.ItemRegistry;
|
||||
import org.openhab.core.items.events.ItemEventFactory;
|
||||
import org.openhab.core.items.events.ItemStateChangedEvent;
|
||||
import org.openhab.core.items.events.ItemStateUpdatedEvent;
|
||||
import org.openhab.core.items.events.ItemTimeSeriesUpdatedEvent;
|
||||
import org.openhab.core.library.items.SwitchItem;
|
||||
import org.openhab.core.library.types.OnOffType;
|
||||
import org.openhab.core.test.java.JavaOSGiTest;
|
||||
import org.openhab.core.types.TimeSeries;
|
||||
import org.openhab.core.types.UnDefType;
|
||||
|
||||
/**
|
||||
* The {@link ItemUpdaterOSGiTest} runs inside an OSGi container and tests the {@link ItemRegistry}.
|
||||
@ -44,6 +50,8 @@ public class ItemUpdaterOSGiTest extends JavaOSGiTest {
|
||||
|
||||
private @NonNullByDefault({}) EventPublisher eventPublisher;
|
||||
private @NonNullByDefault({}) ItemRegistry itemRegistry;
|
||||
private @NonNullByDefault({}) SwitchItem switchItem;
|
||||
|
||||
private final Queue<Event> receivedEvents = new ConcurrentLinkedQueue<>();
|
||||
|
||||
@BeforeEach
|
||||
@ -55,7 +63,8 @@ public class ItemUpdaterOSGiTest extends JavaOSGiTest {
|
||||
itemRegistry = getService(ItemRegistry.class);
|
||||
assertNotNull(itemRegistry);
|
||||
|
||||
itemRegistry.add(new SwitchItem("switch"));
|
||||
switchItem = new SwitchItem("switch");
|
||||
itemRegistry.add(switchItem);
|
||||
|
||||
EventSubscriber eventSubscriber = new EventSubscriber() {
|
||||
@Override
|
||||
@ -65,12 +74,18 @@ public class ItemUpdaterOSGiTest extends JavaOSGiTest {
|
||||
|
||||
@Override
|
||||
public Set<String> getSubscribedEventTypes() {
|
||||
return Set.of(ItemStateChangedEvent.TYPE);
|
||||
return Set.of(ItemStateChangedEvent.TYPE, ItemStateUpdatedEvent.TYPE, ItemTimeSeriesUpdatedEvent.TYPE);
|
||||
}
|
||||
};
|
||||
registerService(eventSubscriber);
|
||||
}
|
||||
|
||||
@AfterEach
|
||||
public void tearDown() {
|
||||
receivedEvents.clear();
|
||||
itemRegistry.remove(switchItem.getName());
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testItemUpdaterSetsItemState() {
|
||||
eventPublisher.post(ItemEventFactory.createStateEvent("switch", OnOffType.ON));
|
||||
@ -79,32 +94,91 @@ public class ItemUpdaterOSGiTest extends JavaOSGiTest {
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testItemUpdaterSendsStateChangedEvent() throws Exception {
|
||||
public void testItemUpdaterSendsStateUpdatedEvent() throws Exception {
|
||||
eventPublisher.post(ItemEventFactory.createStateEvent("switch", OnOffType.ON));
|
||||
|
||||
Item switchItem = itemRegistry.get("switch");
|
||||
waitForAssert(() -> assertEquals(OnOffType.ON, switchItem.getState()));
|
||||
|
||||
// wait for the initial events (updated and changed, because it was NULL before)
|
||||
waitForAssert(() -> {
|
||||
assertEquals(2, receivedEvents.size());
|
||||
ItemStateUpdatedEvent updatedEvent = (ItemStateUpdatedEvent) receivedEvents.poll();
|
||||
assertNotNull(updatedEvent);
|
||||
assertEquals(OnOffType.ON, updatedEvent.getItemState());
|
||||
ItemStateChangedEvent changedEvent = (ItemStateChangedEvent) receivedEvents.poll();
|
||||
assertNotNull(changedEvent);
|
||||
assertEquals(UnDefType.NULL, changedEvent.getOldItemState());
|
||||
assertEquals(OnOffType.ON, changedEvent.getItemState());
|
||||
});
|
||||
|
||||
// update with same value
|
||||
eventPublisher.post(ItemEventFactory.createStateEvent("switch", OnOffType.ON));
|
||||
|
||||
// wait for the updated event
|
||||
waitForAssert(() -> {
|
||||
assertEquals(1, receivedEvents.size());
|
||||
ItemStateUpdatedEvent updatedEvent = (ItemStateUpdatedEvent) receivedEvents.poll();
|
||||
assertNotNull(updatedEvent);
|
||||
assertEquals(OnOffType.ON, updatedEvent.getItemState());
|
||||
});
|
||||
|
||||
// ensure no other events send
|
||||
Thread.sleep(1000);
|
||||
assertTrue(receivedEvents.isEmpty());
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testItemUpdaterSendsStateChangedEvent() throws Exception {
|
||||
eventPublisher.post(ItemEventFactory.createStateEvent("switch", OnOffType.ON));
|
||||
|
||||
// wait for the initial events (updated and changed, because it was NULL before)
|
||||
waitForAssert(() -> {
|
||||
assertEquals(2, receivedEvents.size());
|
||||
ItemStateUpdatedEvent updatedEvent = (ItemStateUpdatedEvent) receivedEvents.poll();
|
||||
assertNotNull(updatedEvent);
|
||||
assertEquals(OnOffType.ON, updatedEvent.getItemState());
|
||||
ItemStateChangedEvent changedEvent = (ItemStateChangedEvent) receivedEvents.poll();
|
||||
assertNotNull(changedEvent);
|
||||
assertEquals(UnDefType.NULL, changedEvent.getOldItemState());
|
||||
assertEquals(OnOffType.ON, changedEvent.getItemState());
|
||||
});
|
||||
|
||||
// change state
|
||||
eventPublisher.post(ItemEventFactory.createStateEvent("switch", OnOffType.OFF));
|
||||
|
||||
// wait for an event that change the state from OFF to ON
|
||||
// there could be one remaining event from the 'ItemUpdater sets item state' test
|
||||
// wait for two events: the updated event and the changed event
|
||||
waitForAssert(() -> {
|
||||
assertFalse(receivedEvents.isEmpty());
|
||||
assertEquals(2, receivedEvents.size());
|
||||
ItemStateUpdatedEvent updatedEvent = (ItemStateUpdatedEvent) receivedEvents.poll();
|
||||
assertNotNull(updatedEvent);
|
||||
assertEquals(OnOffType.OFF, updatedEvent.getItemState());
|
||||
ItemStateChangedEvent changedEvent = (ItemStateChangedEvent) receivedEvents.poll();
|
||||
assertNotNull(changedEvent);
|
||||
assertEquals(OnOffType.ON, changedEvent.getOldItemState());
|
||||
assertEquals(OnOffType.OFF, changedEvent.getItemState());
|
||||
});
|
||||
|
||||
// send update for same state
|
||||
eventPublisher.post(ItemEventFactory.createStateEvent("switch", OnOffType.OFF));
|
||||
// wait a second and make sure no other events have been sent
|
||||
Thread.sleep(1000);
|
||||
assertTrue(receivedEvents.isEmpty());
|
||||
}
|
||||
|
||||
// wait a few milliseconds
|
||||
Thread.sleep(100);
|
||||
@Test
|
||||
public void testItemUpdaterSetsTimeSeries() throws InterruptedException {
|
||||
TimeSeries timeSeries = new TimeSeries(TimeSeries.Policy.ADD);
|
||||
timeSeries.add(Instant.now(), OnOffType.ON);
|
||||
eventPublisher.post(ItemEventFactory.createTimeSeriesEvent("switch", timeSeries, null));
|
||||
|
||||
// make sure no state changed event has been sent
|
||||
// wait for the event
|
||||
waitForAssert(() -> {
|
||||
assertEquals(1, receivedEvents.size());
|
||||
ItemTimeSeriesUpdatedEvent updatedEvent = (ItemTimeSeriesUpdatedEvent) receivedEvents.poll();
|
||||
assertNotNull(updatedEvent);
|
||||
assertEquals(timeSeries, updatedEvent.getTimeSeries());
|
||||
});
|
||||
|
||||
Thread.sleep(1000);
|
||||
assertTrue(receivedEvents.isEmpty());
|
||||
}
|
||||
}
|
||||
|
@ -72,12 +72,13 @@ import org.openhab.core.thing.profiles.ProfileContext;
|
||||
import org.openhab.core.thing.profiles.ProfileFactory;
|
||||
import org.openhab.core.thing.profiles.ProfileTypeProvider;
|
||||
import org.openhab.core.thing.profiles.ProfileTypeUID;
|
||||
import org.openhab.core.thing.profiles.StateProfile;
|
||||
import org.openhab.core.thing.profiles.TimeSeriesProfile;
|
||||
import org.openhab.core.thing.profiles.TriggerProfile;
|
||||
import org.openhab.core.thing.type.ChannelKind;
|
||||
import org.openhab.core.thing.type.ChannelType;
|
||||
import org.openhab.core.thing.type.ChannelTypeUID;
|
||||
import org.openhab.core.types.Command;
|
||||
import org.openhab.core.types.TimeSeries;
|
||||
|
||||
/**
|
||||
*
|
||||
@ -148,7 +149,7 @@ public class CommunicationManagerOSGiTest extends JavaOSGiTest {
|
||||
private @Mock @NonNullByDefault({}) ItemStateConverter itemStateConverterMock;
|
||||
private @Mock @NonNullByDefault({}) ProfileAdvisor profileAdvisorMock;
|
||||
private @Mock @NonNullByDefault({}) ProfileFactory profileFactoryMock;
|
||||
private @Mock @NonNullByDefault({}) StateProfile stateProfileMock;
|
||||
private @Mock @NonNullByDefault({}) TimeSeriesProfile stateProfileMock;
|
||||
private @Mock @NonNullByDefault({}) ThingHandler thingHandlerMock;
|
||||
private @Mock @NonNullByDefault({}) ThingRegistry thingRegistryMock;
|
||||
private @Mock @NonNullByDefault({}) TriggerProfile triggerProfileMock;
|
||||
@ -272,6 +273,32 @@ public class CommunicationManagerOSGiTest extends JavaOSGiTest {
|
||||
verifyNoMoreInteractions(triggerProfileMock);
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testTimeSeriesSingleLink() {
|
||||
TimeSeries timeSeries = new TimeSeries(TimeSeries.Policy.REPLACE);
|
||||
|
||||
manager.sendTimeSeries(STATE_CHANNEL_UID_1, timeSeries);
|
||||
|
||||
waitForAssert(() -> {
|
||||
verify(stateProfileMock).onTimeSeriesFromHandler(eq(timeSeries));
|
||||
});
|
||||
verifyNoMoreInteractions(stateProfileMock);
|
||||
verifyNoMoreInteractions(triggerProfileMock);
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testTimeSeriesMultiLink() {
|
||||
TimeSeries timeSeries = new TimeSeries(TimeSeries.Policy.REPLACE);
|
||||
|
||||
manager.sendTimeSeries(STATE_CHANNEL_UID_2, timeSeries);
|
||||
|
||||
waitForAssert(() -> {
|
||||
verify(stateProfileMock, times(2)).onTimeSeriesFromHandler(eq(timeSeries));
|
||||
});
|
||||
verifyNoMoreInteractions(stateProfileMock);
|
||||
verifyNoMoreInteractions(triggerProfileMock);
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testItemCommandEventSingleLink() {
|
||||
manager.receive(ItemEventFactory.createCommandEvent(ITEM_NAME_2, OnOffType.ON));
|
||||
|
Loading…
Reference in New Issue
Block a user