[energidataservice] Introduce subscription-based providers (#17456)

* Introduce subscription-based providers

Signed-off-by: Jacob Laursen <jacob-github@vindvejr.dk>
Signed-off-by: Ciprian Pascu <contact@ciprianpascu.ro>
This commit is contained in:
Jacob Laursen 2024-10-11 16:14:58 +02:00 committed by Ciprian Pascu
parent 21c47c85d7
commit 05399bd819
32 changed files with 2392 additions and 812 deletions

View File

@ -27,6 +27,7 @@ import java.util.Objects;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import java.util.function.Supplier;
import java.util.stream.Collectors;
import org.eclipse.jdt.annotation.NonNullByDefault;
@ -86,12 +87,12 @@ public class ApiController {
.create();
private final HttpClient httpClient;
private final TimeZoneProvider timeZoneProvider;
private final String userAgent;
private final Supplier<String> userAgentSupplier;
public ApiController(HttpClient httpClient, TimeZoneProvider timeZoneProvider) {
this.httpClient = httpClient;
this.timeZoneProvider = timeZoneProvider;
userAgent = "openHAB/" + FrameworkUtil.getBundle(this.getClass()).getVersion().toString();
this.userAgentSupplier = this::getUserAgent;
}
/**
@ -116,7 +117,7 @@ public class ApiController {
.param("start", start.toString()) //
.param("filter", "{\"" + FILTER_KEY_PRICE_AREA + "\":\"" + priceArea + "\"}") //
.param("columns", "HourUTC,SpotPrice" + currency) //
.agent(userAgent) //
.agent(userAgentSupplier.get()) //
.method(HttpMethod.GET);
if (!end.isEmpty()) {
@ -138,6 +139,10 @@ public class ApiController {
}
}
private String getUserAgent() {
return "openHAB/" + FrameworkUtil.getBundle(this.getClass()).getVersion().toString();
}
private String sendRequest(Request request, Map<String, String> properties)
throws TimeoutException, ExecutionException, InterruptedException, DataServiceException {
logger.trace("GET request for {}", request.getURI());
@ -210,7 +215,7 @@ public class ApiController {
.timeout(REQUEST_TIMEOUT_SECONDS, TimeUnit.SECONDS) //
.param("filter", mapToFilter(filterMap)) //
.param("columns", columns) //
.agent(userAgent) //
.agent(userAgentSupplier.get()) //
.method(HttpMethod.GET);
DateQueryParameter start = tariffFilter.getStart();
@ -277,7 +282,7 @@ public class ApiController {
.param("filter", "{\"" + FILTER_KEY_PRICE_AREA + "\":\"" + priceArea + "\"}") //
.param("columns", "Minutes5UTC,CO2Emission") //
.param("sort", "Minutes5UTC DESC") //
.agent(userAgent) //
.agent(userAgentSupplier.get()) //
.method(HttpMethod.GET);
try {

View File

@ -1,297 +0,0 @@
/**
* Copyright (c) 2010-2024 Contributors to the openHAB project
*
* See the NOTICE file(s) distributed with this work for additional
* information.
*
* This program and the accompanying materials are made available under the
* terms of the Eclipse Public License 2.0 which is available at
* http://www.eclipse.org/legal/epl-2.0
*
* SPDX-License-Identifier: EPL-2.0
*/
package org.openhab.binding.energidataservice.internal;
import static org.openhab.binding.energidataservice.internal.EnergiDataServiceBindingConstants.*;
import java.math.BigDecimal;
import java.time.Clock;
import java.time.Instant;
import java.time.LocalDate;
import java.time.LocalDateTime;
import java.time.LocalTime;
import java.time.ZonedDateTime;
import java.time.temporal.ChronoUnit;
import java.util.ArrayList;
import java.util.Collection;
import java.util.Currency;
import java.util.HashMap;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import org.eclipse.jdt.annotation.NonNullByDefault;
import org.eclipse.jdt.annotation.Nullable;
import org.openhab.binding.energidataservice.internal.api.dto.DatahubPricelistRecord;
import org.openhab.binding.energidataservice.internal.api.dto.ElspotpriceRecord;
/**
* The {@link CacheManager} is responsible for maintaining a cache of received
* data from Energi Data Service.
*
* @author Jacob Laursen - Initial contribution
*/
@NonNullByDefault
public class CacheManager {
public static final int NUMBER_OF_HISTORIC_HOURS = 24;
public static final int SPOT_PRICE_MAX_CACHE_SIZE = 24 + 11 + NUMBER_OF_HISTORIC_HOURS;
public static final int TARIFF_MAX_CACHE_SIZE = 24 * 2 + NUMBER_OF_HISTORIC_HOURS;
private final Clock clock;
private final PriceListParser priceListParser = new PriceListParser();
private Map<Instant, BigDecimal> spotPriceMap = new ConcurrentHashMap<>(SPOT_PRICE_MAX_CACHE_SIZE);
private Map<DatahubTariff, Collection<DatahubPricelistRecord>> datahubRecordsMap = new HashMap<>();
private Map<DatahubTariff, Map<Instant, BigDecimal>> tariffsMap = new ConcurrentHashMap<>();
public CacheManager() {
this(Clock.systemDefaultZone());
}
public CacheManager(Clock clock) {
this.clock = clock.withZone(NORD_POOL_TIMEZONE);
for (DatahubTariff datahubTariff : DatahubTariff.values()) {
datahubRecordsMap.put(datahubTariff, new ArrayList<>());
tariffsMap.put(datahubTariff, new ConcurrentHashMap<>(TARIFF_MAX_CACHE_SIZE));
}
}
/**
* Clear all cached data.
*/
public void clear() {
spotPriceMap.clear();
for (DatahubTariff datahubTariff : DatahubTariff.values()) {
Collection<DatahubPricelistRecord> datahubRecords = datahubRecordsMap.get(datahubTariff);
if (datahubRecords != null) {
datahubRecords.clear();
}
Map<Instant, BigDecimal> tariffs = tariffsMap.get(datahubTariff);
if (tariffs != null) {
tariffs.clear();
}
}
}
/**
* Convert and cache the supplied {@link ElspotpriceRecord}s.
*
* @param records The records as received from Energi Data Service.
* @param currency The currency in which the records were requested.
*/
public void putSpotPrices(ElspotpriceRecord[] records, Currency currency) {
boolean isDKK = EnergiDataServiceBindingConstants.CURRENCY_DKK.equals(currency);
for (ElspotpriceRecord record : records) {
spotPriceMap.put(record.hour(),
(isDKK ? record.spotPriceDKK() : record.spotPriceEUR()).divide(BigDecimal.valueOf(1000)));
}
cleanup();
}
/**
* Replace current "raw"/unprocessed tariff records in cache.
* Map of hourly tariffs will be updated automatically.
*
* @param records to cache
*/
public void putTariffs(DatahubTariff datahubTariff, Collection<DatahubPricelistRecord> records) {
Collection<DatahubPricelistRecord> datahubRecords = datahubRecordsMap.get(datahubTariff);
if (datahubRecords == null) {
throw new IllegalStateException("Datahub records not initialized");
}
putDatahubRecords(datahubRecords, records);
updateTariffs(datahubTariff);
}
private void putDatahubRecords(Collection<DatahubPricelistRecord> destination,
Collection<DatahubPricelistRecord> source) {
LocalDateTime localHourStart = LocalDateTime.now(clock.withZone(DATAHUB_TIMEZONE))
.minus(NUMBER_OF_HISTORIC_HOURS, ChronoUnit.HOURS).truncatedTo(ChronoUnit.HOURS);
destination.clear();
destination.addAll(source.stream().filter(r -> !r.validTo().isBefore(localHourStart)).toList());
}
/**
* Update map of hourly tariffs from internal cache.
*/
public void updateTariffs(DatahubTariff datahubTariff) {
Collection<DatahubPricelistRecord> datahubRecords = datahubRecordsMap.get(datahubTariff);
if (datahubRecords == null) {
throw new IllegalStateException("Datahub records not initialized");
}
tariffsMap.put(datahubTariff, priceListParser.toHourly(datahubRecords));
cleanup();
}
/**
* Get current spot price.
*
* @return spot price currently valid
*/
public @Nullable BigDecimal getSpotPrice() {
return getSpotPrice(Instant.now(clock));
}
/**
* Get spot price valid at provided instant.
*
* @param time {@link Instant} for which to get the spot price
* @return spot price at given time or null if not available
*/
public @Nullable BigDecimal getSpotPrice(Instant time) {
return spotPriceMap.get(getHourStart(time));
}
/**
* Get map of all cached spot prices.
*
* @return spot prices currently available, {@link #NUMBER_OF_HISTORIC_HOURS} back
*/
public Map<Instant, BigDecimal> getSpotPrices() {
return new HashMap<>(spotPriceMap);
}
/**
* Get current tariff.
*
* @return tariff currently valid
*/
public @Nullable BigDecimal getTariff(DatahubTariff datahubTariff) {
return getTariff(datahubTariff, Instant.now(clock));
}
/**
* Get tariff valid at provided instant.
*
* @param time {@link Instant} for which to get the tariff
* @return tariff at given time or null if not available
*/
public @Nullable BigDecimal getTariff(DatahubTariff datahubTariff, Instant time) {
Map<Instant, BigDecimal> tariffs = tariffsMap.get(datahubTariff);
if (tariffs == null) {
throw new IllegalStateException("Tariffs not initialized");
}
return tariffs.get(getHourStart(time));
}
/**
* Get map of all cached tariffs.
*
* @return tariffs currently available, {@link #NUMBER_OF_HISTORIC_HOURS} back
*/
public Map<Instant, BigDecimal> getTariffs(DatahubTariff datahubTariff) {
Map<Instant, BigDecimal> tariffs = tariffsMap.get(datahubTariff);
if (tariffs == null) {
throw new IllegalStateException("Tariffs not initialized");
}
return new HashMap<>(tariffs);
}
/**
* Get number of future spot prices including current hour.
*
* @return number of future spot prices
*/
public long getNumberOfFutureSpotPrices() {
Instant currentHourStart = getCurrentHourStart();
return spotPriceMap.entrySet().stream().filter(p -> !p.getKey().isBefore(currentHourStart)).count();
}
/**
* Check if historic spot prices ({@link #NUMBER_OF_HISTORIC_HOURS}) are cached.
*
* @return true if historic spot prices are cached
*/
public boolean areHistoricSpotPricesCached() {
return arePricesCached(spotPriceMap, getCurrentHourStart().minus(1, ChronoUnit.HOURS));
}
/**
* Check if all current spot prices are cached taking into consideration that next day's spot prices
* should be available at 13:00 CET.
*
* @return true if spot prices are fully cached
*/
public boolean areSpotPricesFullyCached() {
Instant end = ZonedDateTime.of(LocalDate.now(clock), LocalTime.of(23, 0), NORD_POOL_TIMEZONE).toInstant();
LocalTime now = LocalTime.now(clock);
if (now.isAfter(DAILY_REFRESH_TIME_CET)) {
end = end.plus(24, ChronoUnit.HOURS);
}
return arePricesCached(spotPriceMap, end);
}
private boolean arePricesCached(Map<Instant, BigDecimal> priceMap, Instant end) {
for (Instant hourStart = getFirstHourStart(); hourStart.compareTo(end) <= 0; hourStart = hourStart.plus(1,
ChronoUnit.HOURS)) {
if (priceMap.get(hourStart) == null) {
return false;
}
}
return true;
}
/**
* Check if we have "raw" tariff records cached which are valid tomorrow.
*
* @return true if tariff records for tomorrow are cached
*/
public boolean areTariffsValidTomorrow(DatahubTariff datahubTariff) {
Collection<DatahubPricelistRecord> datahubRecords = datahubRecordsMap.get(datahubTariff);
if (datahubRecords == null) {
throw new IllegalStateException("Datahub records not initialized");
}
return isValidNextDay(datahubRecords);
}
/**
* Remove historic prices.
*/
public void cleanup() {
Instant firstHourStart = getFirstHourStart();
spotPriceMap.entrySet().removeIf(entry -> entry.getKey().isBefore(firstHourStart));
for (Map<Instant, BigDecimal> tariffs : tariffsMap.values()) {
tariffs.entrySet().removeIf(entry -> entry.getKey().isBefore(firstHourStart));
}
}
private boolean isValidNextDay(Collection<DatahubPricelistRecord> records) {
LocalDateTime localHourStart = LocalDateTime.now(EnergiDataServiceBindingConstants.DATAHUB_TIMEZONE)
.truncatedTo(ChronoUnit.HOURS);
LocalDateTime localMidnight = localHourStart.plusDays(1).truncatedTo(ChronoUnit.DAYS);
return records.stream().anyMatch(r -> r.validTo().isAfter(localMidnight));
}
private Instant getCurrentHourStart() {
return getHourStart(Instant.now(clock));
}
private Instant getFirstHourStart() {
return getHourStart(Instant.now(clock).minus(NUMBER_OF_HISTORIC_HOURS, ChronoUnit.HOURS));
}
private Instant getHourStart(Instant instant) {
return instant.truncatedTo(ChronoUnit.HOURS);
}
}

View File

@ -17,6 +17,8 @@ import java.time.LocalTime;
import java.time.ZoneId;
import java.util.Currency;
import java.util.Set;
import java.util.stream.Collectors;
import java.util.stream.Stream;
import org.eclipse.jdt.annotation.NonNullByDefault;
import org.openhab.core.thing.ChannelUID;
@ -62,6 +64,12 @@ public class EnergiDataServiceBindingConstants {
CHANNEL_SYSTEM_TARIFF, CHANNEL_TRANSMISSION_GRID_TARIFF, CHANNEL_ELECTRICITY_TAX,
CHANNEL_REDUCED_ELECTRICITY_TAX);
public static final Set<String> CO2_EMISSION_CHANNELS = Set.of(CHANNEL_CO2_EMISSION_PROGNOSIS,
CHANNEL_CO2_EMISSION_REALTIME);
public static final Set<String> SUBSCRIPTION_CHANNELS = Stream
.concat(ELECTRICITY_CHANNELS.stream(), CO2_EMISSION_CHANNELS.stream()).collect(Collectors.toSet());
// List of all properties
public static final String PROPERTY_REMAINING_CALLS = "remainingCalls";
public static final String PROPERTY_TOTAL_CALLS = "totalCalls";

View File

@ -26,6 +26,8 @@ import java.util.concurrent.ConcurrentHashMap;
import org.eclipse.jdt.annotation.NonNullByDefault;
import org.eclipse.jdt.annotation.Nullable;
import org.openhab.binding.energidataservice.internal.api.dto.DatahubPricelistRecord;
import org.openhab.binding.energidataservice.internal.provider.cache.DatahubPriceSubscriptionCache;
import org.openhab.binding.energidataservice.internal.provider.cache.ElectricityPriceSubscriptionCache;
/**
* Parses results from {@link org.openhab.binding.energidataservice.internal.api.dto.DatahubPricelistRecords}
@ -47,7 +49,8 @@ public class PriceListParser {
}
public Map<Instant, BigDecimal> toHourly(Collection<DatahubPricelistRecord> records) {
Instant firstHourStart = Instant.now(clock).minus(CacheManager.NUMBER_OF_HISTORIC_HOURS, ChronoUnit.HOURS)
Instant firstHourStart = Instant.now(clock)
.minus(ElectricityPriceSubscriptionCache.NUMBER_OF_HISTORIC_HOURS, ChronoUnit.HOURS)
.truncatedTo(ChronoUnit.HOURS);
Instant lastHourStart = Instant.now(clock).truncatedTo(ChronoUnit.HOURS).plus(2, ChronoUnit.DAYS)
.truncatedTo(ChronoUnit.DAYS);
@ -57,7 +60,7 @@ public class PriceListParser {
public Map<Instant, BigDecimal> toHourly(Collection<DatahubPricelistRecord> records, Instant firstHourStart,
Instant lastHourStart) {
Map<Instant, BigDecimal> totalMap = new ConcurrentHashMap<>(CacheManager.TARIFF_MAX_CACHE_SIZE);
Map<Instant, BigDecimal> totalMap = new ConcurrentHashMap<>(DatahubPriceSubscriptionCache.MAX_CACHE_SIZE);
records.stream().map(record -> record.chargeTypeCode()).distinct().forEach(chargeTypeCode -> {
Map<Instant, BigDecimal> currentMap = toHourly(records, chargeTypeCode, firstHourStart, lastHourStart);
for (Entry<Instant, BigDecimal> current : currentMap.entrySet()) {
@ -74,7 +77,7 @@ public class PriceListParser {
private Map<Instant, BigDecimal> toHourly(Collection<DatahubPricelistRecord> records, String chargeTypeCode,
Instant firstHourStart, Instant lastHourStart) {
Map<Instant, BigDecimal> tariffMap = new ConcurrentHashMap<>(CacheManager.TARIFF_MAX_CACHE_SIZE);
Map<Instant, BigDecimal> tariffMap = new ConcurrentHashMap<>(DatahubPriceSubscriptionCache.MAX_CACHE_SIZE);
LocalDateTime previousValidFrom = LocalDateTime.MAX;
LocalDateTime previousValidTo = LocalDateTime.MIN;

View File

@ -13,6 +13,7 @@
package org.openhab.binding.energidataservice.internal.api;
import org.eclipse.jdt.annotation.NonNullByDefault;
import org.eclipse.jdt.annotation.Nullable;
/**
* Charge type code for DatahubPricelist dataset.
@ -43,6 +44,16 @@ public class ChargeTypeCode {
return code;
}
@Override
public boolean equals(@Nullable Object o) {
return o == this || (o instanceof ChargeTypeCode other && code.equals(other.code));
}
@Override
public int hashCode() {
return code.hashCode();
}
public static ChargeTypeCode of(String code) {
return new ChargeTypeCode(code);
}

View File

@ -13,9 +13,11 @@
package org.openhab.binding.energidataservice.internal.api;
import java.util.Collection;
import java.util.Objects;
import java.util.Set;
import org.eclipse.jdt.annotation.NonNullByDefault;
import org.eclipse.jdt.annotation.Nullable;
/**
* Filter for the DatahubPricelist dataset.
@ -69,4 +71,27 @@ public class DatahubTariffFilter {
public DateQueryParameter getEnd() {
return end;
}
@Override
public boolean equals(@Nullable Object o) {
if (o == this) {
return true;
}
if (!(o instanceof DatahubTariffFilter other)) {
return false;
}
return chargeTypeCodes.equals(other.chargeTypeCodes) && notes.equals(other.notes) && start.equals(other.start)
&& end.equals(other.end);
}
@Override
public int hashCode() {
return Objects.hash(chargeTypeCodes, notes, start, end);
}
@Override
public String toString() {
return chargeTypeCodes.toString() + "," + notes.toString() + "," + start + "," + end;
}
}

View File

@ -14,6 +14,7 @@ package org.openhab.binding.energidataservice.internal.api;
import java.time.Duration;
import java.time.LocalDate;
import java.util.Objects;
import org.eclipse.jdt.annotation.NonNullByDefault;
import org.eclipse.jdt.annotation.Nullable;
@ -36,16 +37,16 @@ public class DateQueryParameter {
private DateQueryParameter() {
}
public DateQueryParameter(LocalDate date) {
private DateQueryParameter(LocalDate date) {
this.date = date;
}
public DateQueryParameter(DateQueryParameterType dateType, Duration offset) {
private DateQueryParameter(DateQueryParameterType dateType, Duration offset) {
this.dateType = dateType;
this.offset = offset;
}
public DateQueryParameter(DateQueryParameterType dateType) {
private DateQueryParameter(DateQueryParameterType dateType) {
this.dateType = dateType;
}
@ -68,6 +69,24 @@ public class DateQueryParameter {
return "null";
}
@Override
public boolean equals(@Nullable Object o) {
if (o == this) {
return true;
}
if (!(o instanceof DateQueryParameter other)) {
return false;
}
return Objects.equals(date, other.date) && Objects.equals(offset, other.offset)
&& Objects.equals(dateType, other.dateType);
}
@Override
public int hashCode() {
return Objects.hash(date, offset, dateType);
}
public boolean isEmpty() {
return this == EMPTY;
}

View File

@ -13,6 +13,7 @@
package org.openhab.binding.energidataservice.internal.api;
import org.eclipse.jdt.annotation.NonNullByDefault;
import org.eclipse.jdt.annotation.Nullable;
/**
* Global Location Number.
@ -43,6 +44,16 @@ public class GlobalLocationNumber {
return gln;
}
@Override
public boolean equals(@Nullable Object o) {
return o == this || (o instanceof GlobalLocationNumber other && gln.equals(other.gln));
}
@Override
public int hashCode() {
return gln.hashCode();
}
public boolean isEmpty() {
return this == EMPTY;
}

View File

@ -126,9 +126,9 @@ public class EnergiDataServiceCommandExtension extends AbstractConsoleCommandExt
LocalDate startDate;
LocalDate endDate;
private int ARGUMENT_POSITION_PRICE_COMPONENT = 1;
private int ARGUMENT_POSITION_START_DATE = 2;
private int ARGUMENT_POSITION_END_DATE = 3;
private static final int ARGUMENT_POSITION_PRICE_COMPONENT = 1;
private static final int ARGUMENT_POSITION_START_DATE = 2;
private static final int ARGUMENT_POSITION_END_DATE = 3;
ParsedUpdateParameters(String[] args) {
if (args.length < 3 || args.length > 4) {

View File

@ -20,6 +20,8 @@ import org.eclipse.jdt.annotation.NonNullByDefault;
import org.eclipse.jdt.annotation.Nullable;
import org.eclipse.jetty.client.HttpClient;
import org.openhab.binding.energidataservice.internal.handler.EnergiDataServiceHandler;
import org.openhab.binding.energidataservice.internal.provider.Co2EmissionProvider;
import org.openhab.binding.energidataservice.internal.provider.ElectricityPriceProvider;
import org.openhab.core.i18n.TimeZoneProvider;
import org.openhab.core.io.net.http.HttpClientFactory;
import org.openhab.core.thing.Thing;
@ -46,13 +48,19 @@ public class EnergiDataServiceHandlerFactory extends BaseThingHandlerFactory {
private final HttpClient httpClient;
private final TimeZoneProvider timeZoneProvider;
private final ElectricityPriceProvider electricityPriceProvider;
private final Co2EmissionProvider co2EmissionProvider;
@Activate
public EnergiDataServiceHandlerFactory(final @Reference HttpClientFactory httpClientFactory,
final @Reference TimeZoneProvider timeZoneProvider, ComponentContext componentContext) {
final @Reference TimeZoneProvider timeZoneProvider,
final @Reference ElectricityPriceProvider electricityPriceProvider,
final @Reference Co2EmissionProvider co2EmissionProvider, ComponentContext componentContext) {
super.activate(componentContext);
this.httpClient = httpClientFactory.getCommonHttpClient();
this.timeZoneProvider = timeZoneProvider;
this.electricityPriceProvider = electricityPriceProvider;
this.co2EmissionProvider = co2EmissionProvider;
}
@Override
@ -65,7 +73,8 @@ public class EnergiDataServiceHandlerFactory extends BaseThingHandlerFactory {
ThingTypeUID thingTypeUID = thing.getThingTypeUID();
if (THING_TYPE_SERVICE.equals(thingTypeUID)) {
return new EnergiDataServiceHandler(thing, httpClient, timeZoneProvider);
return new EnergiDataServiceHandler(thing, httpClient, timeZoneProvider, electricityPriceProvider,
co2EmissionProvider);
}
return null;

View File

@ -19,50 +19,50 @@ import java.math.BigDecimal;
import java.time.Duration;
import java.time.Instant;
import java.time.LocalDate;
import java.time.LocalDateTime;
import java.time.LocalTime;
import java.time.ZoneId;
import java.time.format.DateTimeFormatter;
import java.time.temporal.ChronoUnit;
import java.util.Arrays;
import java.util.Collection;
import java.util.Comparator;
import java.util.Currency;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Map.Entry;
import java.util.Set;
import java.util.concurrent.ScheduledFuture;
import java.util.concurrent.TimeUnit;
import java.util.function.Function;
import java.util.stream.Collectors;
import javax.measure.Unit;
import org.eclipse.jdt.annotation.NonNullByDefault;
import org.eclipse.jdt.annotation.Nullable;
import org.eclipse.jetty.client.HttpClient;
import org.eclipse.jetty.http.HttpStatus;
import org.openhab.binding.energidataservice.internal.ApiController;
import org.openhab.binding.energidataservice.internal.CacheManager;
import org.openhab.binding.energidataservice.internal.DatahubTariff;
import org.openhab.binding.energidataservice.internal.EnergiDataServiceBindingConstants;
import org.openhab.binding.energidataservice.internal.PriceListParser;
import org.openhab.binding.energidataservice.internal.action.EnergiDataServiceActions;
import org.openhab.binding.energidataservice.internal.api.ChargeType;
import org.openhab.binding.energidataservice.internal.api.ChargeTypeCode;
import org.openhab.binding.energidataservice.internal.api.DatahubTariffFilter;
import org.openhab.binding.energidataservice.internal.api.DatahubTariffFilterFactory;
import org.openhab.binding.energidataservice.internal.api.Dataset;
import org.openhab.binding.energidataservice.internal.api.DateQueryParameter;
import org.openhab.binding.energidataservice.internal.api.DateQueryParameterType;
import org.openhab.binding.energidataservice.internal.api.GlobalLocationNumber;
import org.openhab.binding.energidataservice.internal.api.dto.CO2EmissionRecord;
import org.openhab.binding.energidataservice.internal.api.dto.DatahubPricelistRecord;
import org.openhab.binding.energidataservice.internal.api.dto.ElspotpriceRecord;
import org.openhab.binding.energidataservice.internal.config.DatahubPriceConfiguration;
import org.openhab.binding.energidataservice.internal.config.EnergiDataServiceConfiguration;
import org.openhab.binding.energidataservice.internal.exception.DataServiceException;
import org.openhab.binding.energidataservice.internal.retry.RetryPolicyFactory;
import org.openhab.binding.energidataservice.internal.retry.RetryStrategy;
import org.openhab.binding.energidataservice.internal.provider.Co2EmissionProvider;
import org.openhab.binding.energidataservice.internal.provider.ElectricityPriceProvider;
import org.openhab.binding.energidataservice.internal.provider.cache.ElectricityPriceSubscriptionCache;
import org.openhab.binding.energidataservice.internal.provider.listener.Co2EmissionListener;
import org.openhab.binding.energidataservice.internal.provider.listener.ElectricityPriceListener;
import org.openhab.binding.energidataservice.internal.provider.subscription.Co2EmissionSubscription;
import org.openhab.binding.energidataservice.internal.provider.subscription.DatahubPriceSubscription;
import org.openhab.binding.energidataservice.internal.provider.subscription.ElectricityPriceSubscription;
import org.openhab.binding.energidataservice.internal.provider.subscription.SpotPriceSubscription;
import org.openhab.binding.energidataservice.internal.provider.subscription.Subscription;
import org.openhab.core.i18n.TimeZoneProvider;
import org.openhab.core.library.types.DecimalType;
import org.openhab.core.library.types.QuantityType;
@ -90,29 +90,29 @@ import org.slf4j.LoggerFactory;
* @author Jacob Laursen - Initial contribution
*/
@NonNullByDefault
public class EnergiDataServiceHandler extends BaseThingHandler {
public class EnergiDataServiceHandler extends BaseThingHandler
implements ElectricityPriceListener, Co2EmissionListener {
private static final Duration emissionPrognosisJobInterval = Duration.ofMinutes(15);
private static final Duration emissionRealtimeJobInterval = Duration.ofMinutes(5);
private static final Map<String, DatahubTariff> CHANNEL_ID_TO_DATAHUB_TARIFF = Arrays.stream(DatahubTariff.values())
.collect(Collectors.toMap(DatahubTariff::getChannelId, Function.identity()));
private final Logger logger = LoggerFactory.getLogger(EnergiDataServiceHandler.class);
private final TimeZoneProvider timeZoneProvider;
private final ApiController apiController;
private final CacheManager cacheManager;
private final ElectricityPriceProvider electricityPriceProvider;
private final Co2EmissionProvider co2EmissionProvider;
private final Set<Subscription> activeSubscriptions = new HashSet<>();
private EnergiDataServiceConfiguration config;
private RetryStrategy retryPolicy = RetryPolicyFactory.initial();
private boolean realtimeEmissionsFetchedFirstTime = false;
private @Nullable ScheduledFuture<?> refreshPriceFuture;
private @Nullable ScheduledFuture<?> refreshEmissionPrognosisFuture;
private @Nullable ScheduledFuture<?> refreshEmissionRealtimeFuture;
private @Nullable ScheduledFuture<?> priceUpdateFuture;
public EnergiDataServiceHandler(Thing thing, HttpClient httpClient, TimeZoneProvider timeZoneProvider) {
public EnergiDataServiceHandler(final Thing thing, final HttpClient httpClient,
final TimeZoneProvider timeZoneProvider, final ElectricityPriceProvider electricityPriceProvider,
final Co2EmissionProvider co2EmissionProvider) {
super(thing);
this.timeZoneProvider = timeZoneProvider;
this.apiController = new ApiController(httpClient, timeZoneProvider);
this.cacheManager = new CacheManager();
this.electricityPriceProvider = electricityPriceProvider;
this.co2EmissionProvider = co2EmissionProvider;
// Default configuration
this.config = new EnergiDataServiceConfiguration();
@ -126,12 +126,15 @@ public class EnergiDataServiceHandler extends BaseThingHandler {
String channelId = channelUID.getId();
if (ELECTRICITY_CHANNELS.contains(channelId)) {
refreshElectricityPrices();
} else if (CHANNEL_CO2_EMISSION_PROGNOSIS.equals(channelId)) {
rescheduleEmissionPrognosisJob();
} else if (CHANNEL_CO2_EMISSION_REALTIME.equals(channelId)) {
realtimeEmissionsFetchedFirstTime = false;
rescheduleEmissionRealtimeJob();
if (!electricityPriceProvider.forceRefreshPrices(getChannelSubscription(channelId))) {
// All subscriptions are automatically notified upon actual changes after download.
// If cached values are the same, we will update the requested channel directly.
updateChannelFromCache(getChannelSubscription(channelId), channelId);
}
} else if (CO2_EMISSION_CHANNELS.contains(channelId)) {
Subscription subscription = getChannelSubscription(channelId);
unsubscribe(subscription);
subscribe(subscription);
}
}
@ -157,42 +160,19 @@ public class EnergiDataServiceHandler extends BaseThingHandler {
return;
}
updateStatus(ThingStatus.UNKNOWN);
refreshPriceFuture = scheduler.schedule(this::refreshElectricityPrices, 0, TimeUnit.SECONDS);
if (isLinked(CHANNEL_CO2_EMISSION_PROGNOSIS)) {
rescheduleEmissionPrognosisJob();
}
if (isLinked(CHANNEL_CO2_EMISSION_REALTIME)) {
rescheduleEmissionRealtimeJob();
if (SUBSCRIPTION_CHANNELS.stream().anyMatch(this::isLinked)) {
updateStatus(ThingStatus.UNKNOWN);
subscribeLinkedChannels();
} else {
updateStatus(ThingStatus.ONLINE);
}
}
@Override
public void dispose() {
ScheduledFuture<?> refreshPriceFuture = this.refreshPriceFuture;
if (refreshPriceFuture != null) {
refreshPriceFuture.cancel(true);
this.refreshPriceFuture = null;
}
ScheduledFuture<?> refreshEmissionPrognosisFuture = this.refreshEmissionPrognosisFuture;
if (refreshEmissionPrognosisFuture != null) {
refreshEmissionPrognosisFuture.cancel(true);
this.refreshEmissionPrognosisFuture = null;
}
ScheduledFuture<?> refreshEmissionRealtimeFuture = this.refreshEmissionRealtimeFuture;
if (refreshEmissionRealtimeFuture != null) {
refreshEmissionRealtimeFuture.cancel(true);
this.refreshEmissionRealtimeFuture = null;
}
ScheduledFuture<?> priceUpdateFuture = this.priceUpdateFuture;
if (priceUpdateFuture != null) {
priceUpdateFuture.cancel(true);
this.priceUpdateFuture = null;
}
cacheManager.clear();
electricityPriceProvider.unsubscribe(this);
co2EmissionProvider.unsubscribe(this);
activeSubscriptions.clear();
}
@Override
@ -202,13 +182,30 @@ public class EnergiDataServiceHandler extends BaseThingHandler {
@Override
public void channelLinked(ChannelUID channelUID) {
super.channelLinked(channelUID);
String channelId = channelUID.getId();
if (!SUBSCRIPTION_CHANNELS.contains(channelId)) {
// Do not trigger REFRESH command for subscription-based channels, we will trigger
// a state update ourselves through relevant provider.
super.channelLinked(channelUID);
}
if (!"DK1".equals(config.priceArea) && !"DK2".equals(config.priceArea)
&& (CHANNEL_CO2_EMISSION_PROGNOSIS.equals(channelUID.getId())
|| CHANNEL_CO2_EMISSION_REALTIME.contains(channelUID.getId()))) {
logger.warn("Item linked to channel '{}', but price area {} is not supported for this channel",
channelUID.getId(), config.priceArea);
if (ELECTRICITY_CHANNELS.contains(channelId)) {
Subscription subscription = getChannelSubscription(channelId);
if (subscribe(subscription)) {
logger.debug("First item linked to channel '{}', starting {}", channelId, subscription);
} else {
updateChannelFromCache(subscription, channelId);
}
} else if (CO2_EMISSION_CHANNELS.contains(channelId)) {
if ("DK1".equals(config.priceArea) || "DK2".equals(config.priceArea)) {
Subscription subscription = getChannelSubscription(channelId);
if (subscribe(subscription)) {
logger.debug("First item linked to channel '{}', starting {}", channelId, subscription);
}
} else {
logger.warn("Item linked to channel '{}', but price area {} is not supported for this channel",
channelId, config.priceArea);
}
}
}
@ -216,115 +213,146 @@ public class EnergiDataServiceHandler extends BaseThingHandler {
public void channelUnlinked(ChannelUID channelUID) {
super.channelUnlinked(channelUID);
if (CHANNEL_CO2_EMISSION_PROGNOSIS.equals(channelUID.getId()) && !isLinked(CHANNEL_CO2_EMISSION_PROGNOSIS)) {
logger.debug("No more items linked to channel '{}', stopping emission prognosis refresh job",
channelUID.getId());
ScheduledFuture<?> refreshEmissionPrognosisFuture = this.refreshEmissionPrognosisFuture;
if (refreshEmissionPrognosisFuture != null) {
refreshEmissionPrognosisFuture.cancel(true);
this.refreshEmissionPrognosisFuture = null;
}
} else if (CHANNEL_CO2_EMISSION_REALTIME.contains(channelUID.getId())
&& !isLinked(CHANNEL_CO2_EMISSION_REALTIME)) {
logger.debug("No more items linked to channel '{}', stopping realtime emission refresh job",
channelUID.getId());
ScheduledFuture<?> refreshEmissionRealtimeFuture = this.refreshEmissionRealtimeFuture;
if (refreshEmissionRealtimeFuture != null) {
refreshEmissionRealtimeFuture.cancel(true);
this.refreshEmissionRealtimeFuture = null;
}
String channelId = channelUID.getId();
if (SUBSCRIPTION_CHANNELS.contains(channelId) && !isLinked(channelId)) {
Subscription subscription = getChannelSubscription(channelId);
logger.debug("No more items linked to channel '{}', stopping {}", channelId, subscription);
unsubscribe(getChannelSubscription(channelId));
}
}
private void refreshElectricityPrices() {
RetryStrategy retryPolicy;
try {
boolean spotPricesDownloaded = false;
if (isLinked(CHANNEL_SPOT_PRICE)) {
spotPricesDownloaded = downloadSpotPrices();
}
for (DatahubTariff datahubTariff : DatahubTariff.values()) {
if (isLinked(datahubTariff.getChannelId())) {
downloadTariffs(datahubTariff);
}
}
updateStatus(ThingStatus.ONLINE);
updatePrices();
updateElectricityTimeSeriesFromCache();
if (isLinked(CHANNEL_SPOT_PRICE)) {
long numberOfFutureSpotPrices = cacheManager.getNumberOfFutureSpotPrices();
LocalTime now = LocalTime.now(NORD_POOL_TIMEZONE);
if (numberOfFutureSpotPrices >= 13 || (numberOfFutureSpotPrices == 12
&& now.isAfter(DAILY_REFRESH_TIME_CET.minusHours(1)) && now.isBefore(DAILY_REFRESH_TIME_CET))) {
if (spotPricesDownloaded) {
triggerChannel(CHANNEL_EVENT, EVENT_DAY_AHEAD_AVAILABLE);
}
retryPolicy = RetryPolicyFactory.atFixedTime(DAILY_REFRESH_TIME_CET, NORD_POOL_TIMEZONE);
} else {
logger.warn("Spot prices are not available, retry scheduled (see details in Thing properties)");
retryPolicy = RetryPolicyFactory.whenExpectedSpotPriceDataMissing();
}
} else {
retryPolicy = RetryPolicyFactory.atFixedTime(LocalTime.MIDNIGHT, timeZoneProvider.getTimeZone());
}
} catch (DataServiceException e) {
if (e.getHttpStatus() != 0) {
updateStatus(ThingStatus.OFFLINE, ThingStatusDetail.OFFLINE.COMMUNICATION_ERROR,
HttpStatus.getCode(e.getHttpStatus()).getMessage());
} else {
updateStatus(ThingStatus.OFFLINE, ThingStatusDetail.OFFLINE.COMMUNICATION_ERROR, e.getMessage());
}
if (e.getCause() != null) {
logger.debug("Error retrieving prices", e);
}
retryPolicy = RetryPolicyFactory.fromThrowable(e);
} catch (InterruptedException e) {
logger.debug("Refresh job interrupted");
Thread.currentThread().interrupt();
return;
}
reschedulePriceRefreshJob(retryPolicy);
@Override
public void onDayAheadAvailable() {
triggerChannel(CHANNEL_EVENT, EVENT_DAY_AHEAD_AVAILABLE);
}
private boolean downloadSpotPrices() throws InterruptedException, DataServiceException {
if (cacheManager.areSpotPricesFullyCached()) {
logger.debug("Cached spot prices still valid, skipping download.");
@Override
public void onCurrentSpotPrice(@Nullable BigDecimal price, Currency currency) {
updateStatus(ThingStatus.ONLINE);
updatePriceState(CHANNEL_SPOT_PRICE, price, currency);
}
@Override
public void onSpotPrices(Map<Instant, BigDecimal> spotPrices, Currency currency) {
updateStatus(ThingStatus.ONLINE);
updatePriceTimeSeries(CHANNEL_SPOT_PRICE, spotPrices, currency, false);
}
@Override
public void onCurrentTariff(DatahubTariff datahubTariff, @Nullable BigDecimal tariff) {
updateStatus(ThingStatus.ONLINE);
updatePriceState(datahubTariff.getChannelId(), tariff, CURRENCY_DKK);
}
@Override
public void onTariffs(DatahubTariff datahubTariff, Map<Instant, BigDecimal> tariffs) {
updateStatus(ThingStatus.ONLINE);
updatePriceTimeSeries(datahubTariff.getChannelId(), tariffs, CURRENCY_DKK, true);
}
@Override
public void onCurrentEmission(Co2EmissionSubscription.Type type, BigDecimal emission) {
updateStatus(ThingStatus.ONLINE);
updateState(type == Co2EmissionSubscription.Type.Prognosis ? CHANNEL_CO2_EMISSION_PROGNOSIS
: CHANNEL_CO2_EMISSION_REALTIME, new QuantityType<>(emission, Units.GRAM_PER_KILOWATT_HOUR));
}
@Override
public void onEmissions(Co2EmissionSubscription.Type type, Map<Instant, BigDecimal> emissions) {
updateStatus(ThingStatus.ONLINE);
TimeSeries timeSeries = new TimeSeries(REPLACE);
for (Entry<Instant, BigDecimal> emission : emissions.entrySet()) {
timeSeries.add(emission.getKey(), new QuantityType<>(emission.getValue(), Units.GRAM_PER_KILOWATT_HOUR));
}
sendTimeSeries(type == Co2EmissionSubscription.Type.Prognosis ? CHANNEL_CO2_EMISSION_PROGNOSIS
: CHANNEL_CO2_EMISSION_REALTIME, timeSeries);
}
@Override
public void onPropertiesUpdated(Map<String, String> properties) {
updateProperties(properties);
}
@Override
public void onCommunicationError(@Nullable String description) {
updateStatus(ThingStatus.OFFLINE, ThingStatusDetail.OFFLINE.COMMUNICATION_ERROR, description);
}
@Override
public String toString() {
return this.thing.getUID().getAsString();
}
private void subscribeLinkedChannels() {
if (isLinked(CHANNEL_SPOT_PRICE)) {
subscribe(getChannelSubscription(CHANNEL_SPOT_PRICE));
}
Arrays.stream(DatahubTariff.values()).filter(tariff -> isLinked(tariff.getChannelId()))
.map(tariff -> DatahubPriceSubscription.of(tariff, getGlobalLocationNumber(tariff),
getDatahubTariffFilter(tariff)))
.forEach(this::subscribe);
if ("DK1".equals(config.priceArea) || "DK2".equals(config.priceArea)) {
CO2_EMISSION_CHANNELS.stream().filter(this::isLinked)
.forEach(channelId -> subscribe(getChannelSubscription(channelId)));
}
}
private boolean subscribe(Subscription subscription) {
if (activeSubscriptions.add(subscription)) {
if (subscription instanceof ElectricityPriceSubscription) {
electricityPriceProvider.subscribe(this, subscription);
} else if (subscription instanceof Co2EmissionSubscription) {
co2EmissionProvider.subscribe(this, subscription);
} else {
throw new IllegalArgumentException(subscription.getClass().getName() + " is not supported");
}
return true;
} else {
return false;
}
DateQueryParameter start;
if (cacheManager.areHistoricSpotPricesCached()) {
start = DateQueryParameter.of(DateQueryParameterType.UTC_NOW);
} else {
start = DateQueryParameter.of(DateQueryParameterType.UTC_NOW,
Duration.ofHours(-CacheManager.NUMBER_OF_HISTORIC_HOURS));
}
Map<String, String> properties = editProperties();
try {
ElspotpriceRecord[] spotPriceRecords = apiController.getSpotPrices(config.priceArea, config.getCurrency(),
start, DateQueryParameter.EMPTY, properties);
cacheManager.putSpotPrices(spotPriceRecords, config.getCurrency());
} finally {
updateProperties(properties);
}
return true;
}
private void downloadTariffs(DatahubTariff datahubTariff) throws InterruptedException, DataServiceException {
GlobalLocationNumber globalLocationNumber = getGlobalLocationNumber(datahubTariff);
if (globalLocationNumber.isEmpty()) {
return;
private void unsubscribe(Subscription subscription) {
if (activeSubscriptions.remove(subscription)) {
if (subscription instanceof ElectricityPriceSubscription) {
electricityPriceProvider.unsubscribe(this, subscription);
} else if (subscription instanceof Co2EmissionSubscription) {
co2EmissionProvider.unsubscribe(this, subscription);
} else {
throw new IllegalArgumentException(subscription.getClass().getName() + " is not supported");
}
}
if (cacheManager.areTariffsValidTomorrow(datahubTariff)) {
logger.debug("Cached tariffs of type {} still valid, skipping download.", datahubTariff);
cacheManager.updateTariffs(datahubTariff);
}
private Subscription getChannelSubscription(String channelId) {
if (CHANNEL_SPOT_PRICE.equals(channelId)) {
return SpotPriceSubscription.of(config.priceArea, config.getCurrency());
} else if (CHANNEL_CO2_EMISSION_PROGNOSIS.equals(channelId)) {
return Co2EmissionSubscription.of(config.priceArea, Co2EmissionSubscription.Type.Prognosis);
} else if (CHANNEL_CO2_EMISSION_REALTIME.equals(channelId)) {
return Co2EmissionSubscription.of(config.priceArea, Co2EmissionSubscription.Type.Realtime);
} else {
DatahubTariffFilter filter = getDatahubTariffFilter(datahubTariff);
cacheManager.putTariffs(datahubTariff, downloadPriceLists(globalLocationNumber, filter));
DatahubTariff tariff = CHANNEL_ID_TO_DATAHUB_TARIFF.get(channelId);
if (tariff != null) {
return DatahubPriceSubscription.of(tariff, getGlobalLocationNumber(tariff),
getDatahubTariffFilter(tariff));
}
}
throw new IllegalArgumentException("Could not create subscription for channel id " + channelId);
}
private void updateChannelFromCache(Subscription subscription, String channelId) {
BigDecimal currentPrice = electricityPriceProvider.getCurrentPriceIfCached(subscription);
Map<Instant, BigDecimal> prices = electricityPriceProvider.getPricesIfCached(subscription);
if (subscription instanceof SpotPriceSubscription) {
updatePriceState(channelId, currentPrice, config.getCurrency());
updatePriceTimeSeries(channelId, prices, config.getCurrency(), false);
} else if (subscription instanceof DatahubPriceSubscription) {
updatePriceState(channelId, currentPrice, CURRENCY_DKK);
updatePriceTimeSeries(channelId, prices, CURRENCY_DKK, true);
}
}
@ -345,16 +373,6 @@ public class EnergiDataServiceHandler extends BaseThingHandler {
};
}
private Collection<DatahubPricelistRecord> downloadPriceLists(GlobalLocationNumber globalLocationNumber,
DatahubTariffFilter filter) throws InterruptedException, DataServiceException {
Map<String, String> properties = editProperties();
try {
return apiController.getDatahubPriceLists(globalLocationNumber, ChargeType.Tariff, filter, properties);
} finally {
updateProperties(properties);
}
}
private DatahubTariffFilter getGridTariffFilter() {
Channel channel = getThing().getChannel(CHANNEL_GRID_TARIFF);
if (channel == null) {
@ -387,110 +405,8 @@ public class EnergiDataServiceHandler extends BaseThingHandler {
start);
}
return new DatahubTariffFilter(filter,
DateQueryParameter.of(filter.getStart(), Duration.ofHours(-CacheManager.NUMBER_OF_HISTORIC_HOURS)));
}
private void refreshCo2EmissionPrognosis() {
try {
updateCo2Emissions(Dataset.CO2EmissionPrognosis, CHANNEL_CO2_EMISSION_PROGNOSIS,
DateQueryParameter.of(DateQueryParameterType.UTC_NOW, Duration.ofMinutes(-5)));
updateStatus(ThingStatus.ONLINE);
} catch (DataServiceException e) {
if (e.getHttpStatus() != 0) {
updateStatus(ThingStatus.OFFLINE, ThingStatusDetail.OFFLINE.COMMUNICATION_ERROR,
HttpStatus.getCode(e.getHttpStatus()).getMessage());
} else {
updateStatus(ThingStatus.OFFLINE, ThingStatusDetail.OFFLINE.COMMUNICATION_ERROR, e.getMessage());
}
if (e.getCause() != null) {
logger.debug("Error retrieving CO2 emission prognosis", e);
}
} catch (InterruptedException e) {
logger.debug("Emission prognosis refresh job interrupted");
Thread.currentThread().interrupt();
return;
}
}
private void refreshCo2EmissionRealtime() {
try {
updateCo2Emissions(Dataset.CO2Emission, CHANNEL_CO2_EMISSION_REALTIME,
DateQueryParameter.of(DateQueryParameterType.UTC_NOW,
realtimeEmissionsFetchedFirstTime ? Duration.ofMinutes(-5) : Duration.ofHours(-24)));
realtimeEmissionsFetchedFirstTime = true;
updateStatus(ThingStatus.ONLINE);
} catch (DataServiceException e) {
if (e.getHttpStatus() != 0) {
updateStatus(ThingStatus.OFFLINE, ThingStatusDetail.OFFLINE.COMMUNICATION_ERROR,
HttpStatus.getCode(e.getHttpStatus()).getMessage());
} else {
updateStatus(ThingStatus.OFFLINE, ThingStatusDetail.OFFLINE.COMMUNICATION_ERROR, e.getMessage());
}
if (e.getCause() != null) {
logger.debug("Error retrieving CO2 realtime emissions", e);
}
} catch (InterruptedException e) {
logger.debug("Emission realtime refresh job interrupted");
Thread.currentThread().interrupt();
return;
}
}
private void updateCo2Emissions(Dataset dataset, String channelId, DateQueryParameter dateQueryParameter)
throws InterruptedException, DataServiceException {
if (!"DK1".equals(config.priceArea) && !"DK2".equals(config.priceArea)) {
// Dataset is only for Denmark.
return;
}
Map<String, String> properties = editProperties();
CO2EmissionRecord[] emissionRecords = apiController.getCo2Emissions(dataset, config.priceArea,
dateQueryParameter, properties);
updateProperties(properties);
TimeSeries timeSeries = new TimeSeries(REPLACE);
Instant now = Instant.now();
if (dataset == Dataset.CO2Emission && emissionRecords.length > 0) {
// Records are sorted descending, first record is current.
updateState(channelId, new QuantityType<>(emissionRecords[0].emission(), Units.GRAM_PER_KILOWATT_HOUR));
}
for (CO2EmissionRecord emissionRecord : emissionRecords) {
State state = new QuantityType<>(emissionRecord.emission(), Units.GRAM_PER_KILOWATT_HOUR);
timeSeries.add(emissionRecord.start(), state);
if (dataset == Dataset.CO2EmissionPrognosis && now.compareTo(emissionRecord.start()) >= 0
&& now.compareTo(emissionRecord.end()) < 0) {
updateState(channelId, state);
}
}
sendTimeSeries(channelId, timeSeries);
}
private void updatePrices() {
cacheManager.cleanup();
updateCurrentSpotPrice();
Arrays.stream(DatahubTariff.values())
.forEach(tariff -> updateCurrentTariff(tariff.getChannelId(), cacheManager.getTariff(tariff)));
reschedulePriceUpdateJob();
}
private void updateCurrentSpotPrice() {
if (!isLinked(CHANNEL_SPOT_PRICE)) {
return;
}
BigDecimal spotPrice = cacheManager.getSpotPrice();
updatePriceState(CHANNEL_SPOT_PRICE, spotPrice, config.getCurrency());
}
private void updateCurrentTariff(String channelId, @Nullable BigDecimal tariff) {
if (!isLinked(channelId)) {
return;
}
updatePriceState(channelId, tariff, CURRENCY_DKK);
return new DatahubTariffFilter(filter, DateQueryParameter.of(filter.getStart(),
Duration.ofHours(-ElectricityPriceSubscriptionCache.NUMBER_OF_HISTORIC_HOURS)));
}
private void updatePriceState(String channelID, @Nullable BigDecimal price, Currency currency) {
@ -529,7 +445,7 @@ public class EnergiDataServiceHandler extends BaseThingHandler {
Currency currency = config.getCurrency();
ElspotpriceRecord[] spotPriceRecords = apiController.getSpotPrices(config.priceArea, currency,
DateQueryParameter.of(startDate), DateQueryParameter.of(endDate.plusDays(1)), properties);
boolean isDKK = EnergiDataServiceBindingConstants.CURRENCY_DKK.equals(currency);
boolean isDKK = CURRENCY_DKK.equals(currency);
TimeSeries spotPriceTimeSeries = new TimeSeries(REPLACE);
if (spotPriceRecords.length == 0) {
return 0;
@ -584,12 +500,13 @@ public class EnergiDataServiceHandler extends BaseThingHandler {
return updatePriceTimeSeries(datahubTariff.getChannelId(), tariffMap, CURRENCY_DKK, true);
}
private void updateElectricityTimeSeriesFromCache() {
updatePriceTimeSeries(CHANNEL_SPOT_PRICE, cacheManager.getSpotPrices(), config.getCurrency(), false);
for (DatahubTariff datahubTariff : DatahubTariff.values()) {
String channelId = datahubTariff.getChannelId();
updatePriceTimeSeries(channelId, cacheManager.getTariffs(datahubTariff), CURRENCY_DKK, true);
private Collection<DatahubPricelistRecord> downloadPriceLists(GlobalLocationNumber globalLocationNumber,
DatahubTariffFilter filter) throws InterruptedException, DataServiceException {
Map<String, String> properties = editProperties();
try {
return apiController.getDatahubPriceLists(globalLocationNumber, ChargeType.Tariff, filter, properties);
} finally {
updateProperties(properties);
}
}
@ -634,19 +551,7 @@ public class EnergiDataServiceHandler extends BaseThingHandler {
* @return Map of future spot prices
*/
public Map<Instant, BigDecimal> getSpotPrices() {
try {
downloadSpotPrices();
} catch (DataServiceException e) {
if (logger.isDebugEnabled()) {
logger.warn("Error retrieving spot prices", e);
} else {
logger.warn("Error retrieving spot prices: {}", e.getMessage());
}
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
}
return cacheManager.getSpotPrices();
return this.getPrices(getChannelSubscription(CHANNEL_SPOT_PRICE));
}
/**
@ -656,19 +561,20 @@ public class EnergiDataServiceHandler extends BaseThingHandler {
* @return Map of future tariffs
*/
public Map<Instant, BigDecimal> getTariffs(DatahubTariff datahubTariff) {
return this.getPrices(DatahubPriceSubscription.of(datahubTariff, getGlobalLocationNumber(datahubTariff),
getDatahubTariffFilter(datahubTariff)));
}
private Map<Instant, BigDecimal> getPrices(Subscription subscription) {
try {
downloadTariffs(datahubTariff);
return electricityPriceProvider.getPrices(subscription);
} catch (DataServiceException e) {
if (logger.isDebugEnabled()) {
logger.warn("Error retrieving tariffs", e);
} else {
logger.warn("Error retrieving tariffs of type {}: {}", datahubTariff, e.getMessage());
}
logger.warn("Error retrieving prices for subscription {}: {}", subscription, e.getMessage());
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
}
return cacheManager.getTariffs(datahubTariff);
return Map.of();
}
/**
@ -679,66 +585,4 @@ public class EnergiDataServiceHandler extends BaseThingHandler {
public boolean isReducedElectricityTax() {
return config.reducedElectricityTax;
}
private void reschedulePriceUpdateJob() {
ScheduledFuture<?> priceUpdateJob = this.priceUpdateFuture;
if (priceUpdateJob != null) {
// Do not interrupt ourselves.
priceUpdateJob.cancel(false);
this.priceUpdateFuture = null;
}
Instant now = Instant.now();
long millisUntilNextClockHour = Duration
.between(now, now.plus(1, ChronoUnit.HOURS).truncatedTo(ChronoUnit.HOURS)).toMillis() + 1;
this.priceUpdateFuture = scheduler.schedule(this::updatePrices, millisUntilNextClockHour,
TimeUnit.MILLISECONDS);
logger.debug("Price update job rescheduled in {} milliseconds", millisUntilNextClockHour);
}
private void reschedulePriceRefreshJob(RetryStrategy retryPolicy) {
// Preserve state of previous retry policy when configuration is the same.
if (!retryPolicy.equals(this.retryPolicy)) {
this.retryPolicy = retryPolicy;
}
ScheduledFuture<?> refreshJob = this.refreshPriceFuture;
long secondsUntilNextRefresh = this.retryPolicy.getDuration().getSeconds();
Instant timeOfNextRefresh = Instant.now().plusSeconds(secondsUntilNextRefresh);
this.refreshPriceFuture = scheduler.schedule(this::refreshElectricityPrices, secondsUntilNextRefresh,
TimeUnit.SECONDS);
logger.debug("Price refresh job rescheduled in {} seconds: {}", secondsUntilNextRefresh, timeOfNextRefresh);
DateTimeFormatter formatter = DateTimeFormatter.ofPattern(PROPERTY_DATETIME_FORMAT);
updateProperty(PROPERTY_NEXT_CALL, LocalDateTime.ofInstant(timeOfNextRefresh, timeZoneProvider.getTimeZone())
.truncatedTo(ChronoUnit.SECONDS).format(formatter));
if (refreshJob != null) {
refreshJob.cancel(true);
}
}
private void rescheduleEmissionPrognosisJob() {
logger.debug("Scheduling emission prognosis refresh job now and every {}", emissionPrognosisJobInterval);
ScheduledFuture<?> refreshEmissionPrognosisFuture = this.refreshEmissionPrognosisFuture;
if (refreshEmissionPrognosisFuture != null) {
refreshEmissionPrognosisFuture.cancel(true);
}
this.refreshEmissionPrognosisFuture = scheduler.scheduleWithFixedDelay(this::refreshCo2EmissionPrognosis, 0,
emissionPrognosisJobInterval.toSeconds(), TimeUnit.SECONDS);
}
private void rescheduleEmissionRealtimeJob() {
logger.debug("Scheduling emission realtime refresh job now and every {}", emissionRealtimeJobInterval);
ScheduledFuture<?> refreshEmissionFuture = this.refreshEmissionRealtimeFuture;
if (refreshEmissionFuture != null) {
refreshEmissionFuture.cancel(true);
}
this.refreshEmissionRealtimeFuture = scheduler.scheduleWithFixedDelay(this::refreshCo2EmissionRealtime, 0,
emissionRealtimeJobInterval.toSeconds(), TimeUnit.SECONDS);
}
}

View File

@ -0,0 +1,108 @@
/**
* Copyright (c) 2010-2024 Contributors to the openHAB project
*
* See the NOTICE file(s) distributed with this work for additional
* information.
*
* This program and the accompanying materials are made available under the
* terms of the Eclipse Public License 2.0 which is available at
* http://www.eclipse.org/legal/epl-2.0
*
* SPDX-License-Identifier: EPL-2.0
*/
package org.openhab.binding.energidataservice.internal.provider;
import java.util.Map;
import java.util.Objects;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import org.eclipse.jdt.annotation.NonNullByDefault;
import org.openhab.binding.energidataservice.internal.provider.listener.SubscriptionListener;
import org.openhab.binding.energidataservice.internal.provider.subscription.Subscription;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
/**
* The {@link AbstractProvider} is responsible for managing subscriptions.
*
* @author Jacob Laursen - Initial contribution
*/
@NonNullByDefault
public abstract class AbstractProvider<L extends SubscriptionListener> {
protected final Map<L, Set<Subscription>> listenerToSubscriptions = new ConcurrentHashMap<>();
protected final Map<Subscription, Set<L>> subscriptionToListeners = new ConcurrentHashMap<>();
private final Logger logger = LoggerFactory.getLogger(AbstractProvider.class);
protected boolean subscribeInternal(L listener, Subscription subscription) {
Set<Subscription> subscriptionsForListener = Objects
.requireNonNull(listenerToSubscriptions.computeIfAbsent(listener, k -> ConcurrentHashMap.newKeySet()));
if (subscriptionsForListener.contains(subscription)) {
throw new IllegalArgumentException(
"Duplicate listener registration for " + listener.getClass().getName() + ": " + subscription);
}
subscriptionsForListener.add(subscription);
Set<L> listenersForSubscription = subscriptionToListeners.get(subscription);
boolean isFirstDistinctSubscription = false;
if (listenersForSubscription == null) {
isFirstDistinctSubscription = true;
listenersForSubscription = ConcurrentHashMap.newKeySet();
subscriptionToListeners.put(subscription, listenersForSubscription);
}
listenersForSubscription.add(listener);
logger.debug("Listener {} started {}", listener, subscription);
return isFirstDistinctSubscription;
}
protected boolean unsubscribeInternal(L listener, Subscription subscription) {
Set<Subscription> listenerSubscriptions = listenerToSubscriptions.get(listener);
if (listenerSubscriptions == null || !listenerSubscriptions.contains(subscription)) {
throw new IllegalArgumentException(
"Listener is not subscribed to the specified subscription: " + subscription);
}
listenerSubscriptions.remove(subscription);
if (listenerSubscriptions.isEmpty()) {
listenerToSubscriptions.remove(listener);
}
Set<L> listenersForSubscription = subscriptionToListeners.get(subscription);
boolean isLastDistinctSubscription = false;
if (listenersForSubscription != null) {
listenersForSubscription.remove(listener);
if (listenersForSubscription.isEmpty()) {
subscriptionToListeners.remove(subscription);
isLastDistinctSubscription = true;
}
}
logger.debug("Listener {} stopped {}", listener, subscription);
return isLastDistinctSubscription;
}
public void unsubscribe(L listener) {
Set<Subscription> listenerSubscriptions = listenerToSubscriptions.get(listener);
if (listenerSubscriptions == null) {
return;
}
for (Subscription subscription : listenerSubscriptions) {
unsubscribeInternal(listener, subscription);
}
}
protected Set<L> getListeners(Subscription subscription) {
return subscriptionToListeners.getOrDefault(subscription, ConcurrentHashMap.newKeySet());
}
}

View File

@ -0,0 +1,223 @@
/**
* Copyright (c) 2010-2024 Contributors to the openHAB project
*
* See the NOTICE file(s) distributed with this work for additional
* information.
*
* This program and the accompanying materials are made available under the
* terms of the Eclipse Public License 2.0 which is available at
* http://www.eclipse.org/legal/epl-2.0
*
* SPDX-License-Identifier: EPL-2.0
*/
package org.openhab.binding.energidataservice.internal.provider;
import java.math.BigDecimal;
import java.time.Duration;
import java.time.Instant;
import java.util.HashMap;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.ScheduledFuture;
import org.eclipse.jdt.annotation.NonNullByDefault;
import org.eclipse.jdt.annotation.Nullable;
import org.eclipse.jetty.http.HttpStatus;
import org.openhab.binding.energidataservice.internal.ApiController;
import org.openhab.binding.energidataservice.internal.api.Dataset;
import org.openhab.binding.energidataservice.internal.api.DateQueryParameter;
import org.openhab.binding.energidataservice.internal.api.DateQueryParameterType;
import org.openhab.binding.energidataservice.internal.api.dto.CO2EmissionRecord;
import org.openhab.binding.energidataservice.internal.exception.DataServiceException;
import org.openhab.binding.energidataservice.internal.provider.listener.Co2EmissionListener;
import org.openhab.binding.energidataservice.internal.provider.subscription.Co2EmissionSubscription;
import org.openhab.binding.energidataservice.internal.provider.subscription.Subscription;
import org.openhab.core.i18n.TimeZoneProvider;
import org.openhab.core.io.net.http.HttpClientFactory;
import org.openhab.core.scheduler.PeriodicScheduler;
import org.osgi.service.component.annotations.Activate;
import org.osgi.service.component.annotations.Component;
import org.osgi.service.component.annotations.Deactivate;
import org.osgi.service.component.annotations.Reference;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
/**
* The {@link Co2EmissionProvider} is responsible for fetching CO2 emission
* data and providing it to subscribed listeners.
*
* @author Jacob Laursen - Initial contribution
*/
@NonNullByDefault
@Component(service = Co2EmissionProvider.class)
public class Co2EmissionProvider extends AbstractProvider<Co2EmissionListener> {
private static final Duration EMISSION_PROGNOSIS_JOB_INTERVAL = Duration.ofMinutes(15);
private static final Duration EMISSION_REALTIME_JOB_INTERVAL = Duration.ofMinutes(5);
private final Logger logger = LoggerFactory.getLogger(Co2EmissionProvider.class);
private final PeriodicScheduler scheduler;
private final ApiController apiController;
private boolean realtimeEmissionsFetchedFirstTime = false;
private @Nullable ScheduledFuture<?> refreshEmissionPrognosisFuture;
private @Nullable ScheduledFuture<?> refreshEmissionRealtimeFuture;
@Activate
public Co2EmissionProvider(final @Reference PeriodicScheduler scheduler,
final @Reference HttpClientFactory httpClientFactory, final @Reference TimeZoneProvider timeZoneProvider) {
this.scheduler = scheduler;
this.apiController = new ApiController(httpClientFactory.getCommonHttpClient(), timeZoneProvider);
}
@Deactivate
public void deactivate() {
stopJobs();
}
public void subscribe(Co2EmissionListener listener, Subscription subscription) {
if (!(subscription instanceof Co2EmissionSubscription co2EmissionSubscription)) {
throw new IllegalArgumentException(subscription.getClass().getName() + " is not supported");
}
if (!"DK1".equals(co2EmissionSubscription.getPriceArea())
&& !"DK2".equals(co2EmissionSubscription.getPriceArea())) {
// Dataset is only for Denmark.
throw new IllegalArgumentException("Only price areas DK1 and DK2 are supported");
}
subscribeInternal(listener, subscription);
if (Co2EmissionSubscription.Type.Prognosis == co2EmissionSubscription.getType()) {
rescheduleEmissionPrognosisJob();
} else if (Co2EmissionSubscription.Type.Realtime == co2EmissionSubscription.getType()) {
rescheduleEmissionRealtimeJob();
}
}
public void unsubscribe(Co2EmissionListener listener, Subscription subscription) {
unsubscribeInternal(listener, subscription);
if (!subscriptionToListeners.keySet().stream().filter(Co2EmissionSubscription.class::isInstance)
.map(Co2EmissionSubscription.class::cast)
.anyMatch(s -> s.getType() == Co2EmissionSubscription.Type.Prognosis)) {
logger.trace("Last prognosis subscriber, stop job");
stopPrognosisJob();
}
if (!subscriptionToListeners.keySet().stream().filter(Co2EmissionSubscription.class::isInstance)
.map(Co2EmissionSubscription.class::cast)
.anyMatch(s -> s.getType() == Co2EmissionSubscription.Type.Realtime)) {
logger.trace("Last realtime subscriber, stop job");
stopRealtimeJob();
realtimeEmissionsFetchedFirstTime = false;
}
}
private void stopJobs() {
stopPrognosisJob();
stopRealtimeJob();
}
private void stopPrognosisJob() {
ScheduledFuture<?> refreshEmissionPrognosisFuture = this.refreshEmissionPrognosisFuture;
if (refreshEmissionPrognosisFuture != null) {
refreshEmissionPrognosisFuture.cancel(true);
this.refreshEmissionPrognosisFuture = null;
}
}
private void stopRealtimeJob() {
ScheduledFuture<?> refreshEmissionRealtimeFuture = this.refreshEmissionRealtimeFuture;
if (refreshEmissionRealtimeFuture != null) {
refreshEmissionRealtimeFuture.cancel(true);
this.refreshEmissionRealtimeFuture = null;
}
}
private void rescheduleEmissionPrognosisJob() {
logger.debug("Scheduling emission prognosis refresh job now and every {}", EMISSION_PROGNOSIS_JOB_INTERVAL);
stopPrognosisJob();
refreshEmissionPrognosisFuture = scheduler.schedule(this::refreshCo2EmissionPrognosis, Duration.ZERO,
EMISSION_PROGNOSIS_JOB_INTERVAL);
}
private void rescheduleEmissionRealtimeJob() {
logger.debug("Scheduling emission realtime refresh job now and every {}", EMISSION_REALTIME_JOB_INTERVAL);
stopRealtimeJob();
refreshEmissionRealtimeFuture = scheduler.schedule(this::refreshCo2EmissionRealtime, Duration.ZERO,
EMISSION_REALTIME_JOB_INTERVAL);
}
private void refreshCo2EmissionPrognosis() {
refreshCo2Emission(Co2EmissionSubscription.Type.Prognosis);
}
private void refreshCo2EmissionRealtime() {
refreshCo2Emission(Co2EmissionSubscription.Type.Realtime);
}
private void refreshCo2Emission(Co2EmissionSubscription.Type type) {
try {
for (Subscription subscription : subscriptionToListeners.keySet()) {
if (!(subscription instanceof Co2EmissionSubscription co2EmissionSubscription)
|| co2EmissionSubscription.getType() != type) {
continue;
}
updateCo2Emissions(co2EmissionSubscription,
DateQueryParameter.of(DateQueryParameterType.UTC_NOW,
realtimeEmissionsFetchedFirstTime || type == Co2EmissionSubscription.Type.Prognosis
? Duration.ofMinutes(-5)
: Duration.ofHours(-24)));
if (type == Co2EmissionSubscription.Type.Realtime) {
realtimeEmissionsFetchedFirstTime = true;
}
}
} catch (DataServiceException e) {
if (e.getHttpStatus() != 0) {
listenerToSubscriptions.keySet().forEach(
listener -> listener.onCommunicationError(HttpStatus.getCode(e.getHttpStatus()).getMessage()));
} else {
listenerToSubscriptions.keySet().forEach(listener -> listener.onCommunicationError(e.getMessage()));
}
if (e.getCause() != null) {
logger.debug("Error retrieving CO2 emissions", e);
}
} catch (InterruptedException e) {
logger.debug("Emission refresh job {} interrupted", type);
Thread.currentThread().interrupt();
return;
}
}
private void updateCo2Emissions(Co2EmissionSubscription subscription, DateQueryParameter dateQueryParameter)
throws InterruptedException, DataServiceException {
Dataset dataset = subscription.getType().getDataset();
Map<String, String> properties = new HashMap<>();
CO2EmissionRecord[] emissionRecords = apiController.getCo2Emissions(dataset, subscription.getPriceArea(),
dateQueryParameter, properties);
Set<Co2EmissionListener> listeners = getListeners(subscription);
listenerToSubscriptions.keySet().forEach(listener -> listener.onPropertiesUpdated(properties));
Instant now = Instant.now();
if (dataset == Dataset.CO2Emission && emissionRecords.length > 0) {
// Records are sorted descending, first record is current.
listeners.forEach(
listener -> listener.onCurrentEmission(subscription.getType(), emissionRecords[0].emission()));
}
Map<Instant, BigDecimal> emissions = new HashMap<>();
for (CO2EmissionRecord emissionRecord : emissionRecords) {
emissions.put(emissionRecord.start(), emissionRecord.emission());
if (dataset == Dataset.CO2EmissionPrognosis && now.compareTo(emissionRecord.start()) >= 0
&& now.compareTo(emissionRecord.end()) < 0) {
listeners.forEach(
listener -> listener.onCurrentEmission(subscription.getType(), emissionRecord.emission()));
}
}
listeners.forEach(listener -> listener.onEmissions(subscription.getType(), emissions));
}
}

View File

@ -0,0 +1,442 @@
/**
* Copyright (c) 2010-2024 Contributors to the openHAB project
*
* See the NOTICE file(s) distributed with this work for additional
* information.
*
* This program and the accompanying materials are made available under the
* terms of the Eclipse Public License 2.0 which is available at
* http://www.eclipse.org/legal/epl-2.0
*
* SPDX-License-Identifier: EPL-2.0
*/
package org.openhab.binding.energidataservice.internal.provider;
import static org.openhab.binding.energidataservice.internal.EnergiDataServiceBindingConstants.*;
import java.math.BigDecimal;
import java.time.Duration;
import java.time.Instant;
import java.time.LocalDateTime;
import java.time.LocalTime;
import java.time.format.DateTimeFormatter;
import java.time.temporal.ChronoUnit;
import java.util.Collection;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Map;
import java.util.Map.Entry;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ScheduledFuture;
import org.eclipse.jdt.annotation.NonNullByDefault;
import org.eclipse.jdt.annotation.Nullable;
import org.eclipse.jetty.http.HttpStatus;
import org.openhab.binding.energidataservice.internal.ApiController;
import org.openhab.binding.energidataservice.internal.api.ChargeType;
import org.openhab.binding.energidataservice.internal.api.DateQueryParameter;
import org.openhab.binding.energidataservice.internal.api.DateQueryParameterType;
import org.openhab.binding.energidataservice.internal.api.GlobalLocationNumber;
import org.openhab.binding.energidataservice.internal.api.dto.DatahubPricelistRecord;
import org.openhab.binding.energidataservice.internal.api.dto.ElspotpriceRecord;
import org.openhab.binding.energidataservice.internal.exception.DataServiceException;
import org.openhab.binding.energidataservice.internal.provider.cache.DatahubPriceSubscriptionCache;
import org.openhab.binding.energidataservice.internal.provider.cache.ElectricityPriceSubscriptionCache;
import org.openhab.binding.energidataservice.internal.provider.cache.SpotPriceSubscriptionCache;
import org.openhab.binding.energidataservice.internal.provider.cache.SubscriptionDataCache;
import org.openhab.binding.energidataservice.internal.provider.listener.ElectricityPriceListener;
import org.openhab.binding.energidataservice.internal.provider.subscription.DatahubPriceSubscription;
import org.openhab.binding.energidataservice.internal.provider.subscription.ElectricityPriceSubscription;
import org.openhab.binding.energidataservice.internal.provider.subscription.SpotPriceSubscription;
import org.openhab.binding.energidataservice.internal.provider.subscription.Subscription;
import org.openhab.binding.energidataservice.internal.retry.RetryPolicyFactory;
import org.openhab.binding.energidataservice.internal.retry.RetryStrategy;
import org.openhab.core.i18n.TimeZoneProvider;
import org.openhab.core.io.net.http.HttpClientFactory;
import org.openhab.core.scheduler.Scheduler;
import org.osgi.service.component.annotations.Activate;
import org.osgi.service.component.annotations.Component;
import org.osgi.service.component.annotations.Deactivate;
import org.osgi.service.component.annotations.Reference;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
/**
* The {@link ElectricityPriceProvider} is responsible for fetching electricity
* prices and providing them to subscribed listeners.
*
* @author Jacob Laursen - Initial contribution
*/
@NonNullByDefault
@Component(service = ElectricityPriceProvider.class)
public class ElectricityPriceProvider extends AbstractProvider<ElectricityPriceListener> {
private final Logger logger = LoggerFactory.getLogger(ElectricityPriceProvider.class);
private final TimeZoneProvider timeZoneProvider;
private final Scheduler scheduler;
private final ApiController apiController;
private final Map<Subscription, SubscriptionDataCache<?>> subscriptionDataCaches = new ConcurrentHashMap<>();
private @Nullable ScheduledFuture<?> refreshFuture;
private @Nullable ScheduledFuture<?> priceUpdateFuture;
private RetryStrategy retryPolicy = RetryPolicyFactory.initial();
@Activate
public ElectricityPriceProvider(final @Reference Scheduler scheduler,
final @Reference HttpClientFactory httpClientFactory, final @Reference TimeZoneProvider timeZoneProvider) {
this.scheduler = scheduler;
this.timeZoneProvider = timeZoneProvider;
this.apiController = new ApiController(httpClientFactory.getCommonHttpClient(), timeZoneProvider);
}
@Deactivate
public void deactivate() {
stopJobs();
}
public void subscribe(ElectricityPriceListener listener, Subscription subscription) {
if (!(subscription instanceof ElectricityPriceSubscription)) {
throw new IllegalArgumentException(subscription.getClass().getName() + " is not supported");
}
boolean isFirstDistinctSubscription = subscribeInternal(listener, subscription);
if (isFirstDistinctSubscription) {
ScheduledFuture<?> refreshFuture = this.refreshFuture;
if (refreshFuture != null) {
refreshFuture.cancel(true);
}
this.refreshFuture = scheduler.at(this::refreshElectricityPrices, Instant.now());
} else {
publishCurrentPriceFromCache(subscription, Set.of(listener));
publishPricesFromCache(subscription, Set.of(listener));
}
}
public void unsubscribe(ElectricityPriceListener listener, Subscription subscription) {
boolean isLastDistinctSubscription = unsubscribeInternal(listener, subscription);
if (isLastDistinctSubscription) {
subscriptionDataCaches.remove(subscription);
}
if (subscriptionToListeners.isEmpty()) {
logger.trace("Last subscriber, stop jobs");
stopJobs();
}
}
private void stopJobs() {
ScheduledFuture<?> refreshFuture = this.refreshFuture;
if (refreshFuture != null) {
refreshFuture.cancel(true);
this.refreshFuture = null;
}
ScheduledFuture<?> priceUpdateFuture = this.priceUpdateFuture;
if (priceUpdateFuture != null) {
priceUpdateFuture.cancel(true);
this.priceUpdateFuture = null;
}
}
private void refreshElectricityPrices() {
RetryStrategy retryPolicy;
try {
Set<ElectricityPriceListener> spotPricesUpdatedListeners = new HashSet<>();
boolean spotPricesSubscribed = false;
long numberOfFutureSpotPrices = 0;
for (Entry<Subscription, Set<ElectricityPriceListener>> subscriptionListener : subscriptionToListeners
.entrySet()) {
Subscription subscription = subscriptionListener.getKey();
Set<ElectricityPriceListener> listeners = subscriptionListener.getValue();
boolean pricesUpdated = downloadPrices(subscription, false);
if (subscription instanceof SpotPriceSubscription) {
spotPricesSubscribed = true;
if (pricesUpdated) {
spotPricesUpdatedListeners.addAll(listeners);
}
long numberOfFutureSpotPricesForSubscription = getSpotPriceSubscriptionDataCache(subscription)
.getNumberOfFuturePrices();
if (numberOfFutureSpotPrices == 0
|| numberOfFutureSpotPricesForSubscription < numberOfFutureSpotPrices) {
numberOfFutureSpotPrices = numberOfFutureSpotPricesForSubscription;
}
}
updateCurrentPrices(subscription);
publishPricesFromCache(subscription, listeners);
}
reschedulePriceUpdateJob();
if (spotPricesSubscribed) {
LocalTime now = LocalTime.now(NORD_POOL_TIMEZONE);
if (numberOfFutureSpotPrices >= 13 || (numberOfFutureSpotPrices == 12
&& now.isAfter(DAILY_REFRESH_TIME_CET.minusHours(1)) && now.isBefore(DAILY_REFRESH_TIME_CET))) {
spotPricesUpdatedListeners.forEach(listener -> listener.onDayAheadAvailable());
retryPolicy = RetryPolicyFactory.atFixedTime(DAILY_REFRESH_TIME_CET, NORD_POOL_TIMEZONE);
} else {
logger.warn("Spot prices are not available, retry scheduled (see details in Thing properties)");
retryPolicy = RetryPolicyFactory.whenExpectedSpotPriceDataMissing();
}
} else {
retryPolicy = RetryPolicyFactory.atFixedTime(LocalTime.MIDNIGHT, timeZoneProvider.getTimeZone());
}
} catch (DataServiceException e) {
if (e.getHttpStatus() != 0) {
listenerToSubscriptions.keySet().forEach(
listener -> listener.onCommunicationError(HttpStatus.getCode(e.getHttpStatus()).getMessage()));
} else {
listenerToSubscriptions.keySet().forEach(listener -> listener.onCommunicationError(e.getMessage()));
}
if (e.getCause() != null) {
logger.debug("Error retrieving prices", e);
}
retryPolicy = RetryPolicyFactory.fromThrowable(e);
} catch (InterruptedException e) {
logger.debug("Refresh job interrupted");
Thread.currentThread().interrupt();
return;
}
reschedulePriceRefreshJob(retryPolicy);
}
/**
* Get current price if cached, otherwise null.
*
* @param subscription
* @return current price
*/
public @Nullable BigDecimal getCurrentPriceIfCached(Subscription subscription) {
return getSubscriptionDataCache(subscription).get(Instant.now());
}
/**
* Get prices if cached, otherwise null.
*
* @param subscription
* @return Map of prices
*/
public Map<Instant, BigDecimal> getPricesIfCached(Subscription subscription) {
return getSubscriptionDataCache(subscription).get();
}
/**
* Force refresh prices for {@link Subscription} even if already cached.
* The prices are not returned, but will be stored in the cache and can
* be obtained by {@link #getCurrentPriceIfCached(Subscription)}
* or {@link #getPricesIfCached(Subscription)}.
*
* @return true if cached values were changed as a result of the refresh
*/
public boolean forceRefreshPrices(Subscription subscription) {
try {
return downloadPrices(subscription, true);
} catch (DataServiceException e) {
logger.debug("Error force retrieving prices", e);
return false;
} catch (InterruptedException e) {
logger.debug("Force refresh interrupted");
Thread.currentThread().interrupt();
return false;
}
}
/**
* Get all prices for given {@link Subscription}.
* If the prices are not already cached, they will be fetched
* from the service.
*
* @param subscription Subscription for which to get prices
* @return Map of available prices
* @throws InterruptedException
* @throws DataServiceException
*/
public Map<Instant, BigDecimal> getPrices(Subscription subscription)
throws InterruptedException, DataServiceException {
downloadPrices(subscription, false);
return getSubscriptionDataCache(subscription).get();
}
private boolean downloadPrices(Subscription subscription, boolean force)
throws InterruptedException, DataServiceException {
if (subscription instanceof SpotPriceSubscription spotPriceSubscription) {
return downloadSpotPrices(spotPriceSubscription, false);
} else if (subscription instanceof DatahubPriceSubscription datahubPriceSubscription) {
return downloadTariffs(datahubPriceSubscription, false);
}
throw new IllegalArgumentException("Subscription " + subscription + " is not supported");
}
private boolean downloadSpotPrices(SpotPriceSubscription subscription, boolean force)
throws InterruptedException, DataServiceException {
SpotPriceSubscriptionCache cache = getSpotPriceSubscriptionDataCache(subscription);
if (!force && cache.arePricesFullyCached()) {
logger.debug("Cached spot prices still valid, skipping download.");
return false;
}
DateQueryParameter start;
if (!force && cache.areHistoricPricesCached()) {
start = DateQueryParameter.of(DateQueryParameterType.UTC_NOW);
} else {
start = DateQueryParameter.of(DateQueryParameterType.UTC_NOW,
Duration.ofHours(-ElectricityPriceSubscriptionCache.NUMBER_OF_HISTORIC_HOURS));
}
Map<String, String> properties = new HashMap<>();
boolean isUpdated = false;
try {
ElspotpriceRecord[] spotPriceRecords = apiController.getSpotPrices(subscription.getPriceArea(),
subscription.getCurrency(), start, DateQueryParameter.EMPTY, properties);
isUpdated = cache.put(spotPriceRecords);
} finally {
listenerToSubscriptions.keySet().forEach(listener -> listener.onPropertiesUpdated(properties));
}
return isUpdated;
}
private boolean downloadTariffs(DatahubPriceSubscription subscription, boolean force)
throws InterruptedException, DataServiceException {
GlobalLocationNumber globalLocationNumber = subscription.getGlobalLocationNumber();
if (globalLocationNumber.isEmpty()) {
return false;
}
DatahubPriceSubscriptionCache cache = getDatahubPriceSubscriptionDataCache(subscription);
if (!force && cache.areTariffsValidTomorrow()) {
logger.debug("Cached tariffs of type {} still valid, skipping download.", subscription.getDatahubTariff());
cache.update();
return false;
} else {
return cache.put(downloadPriceLists(subscription));
}
}
private Collection<DatahubPricelistRecord> downloadPriceLists(DatahubPriceSubscription subscription)
throws InterruptedException, DataServiceException {
Map<String, String> properties = new HashMap<>();
try {
return apiController.getDatahubPriceLists(subscription.getGlobalLocationNumber(), ChargeType.Tariff,
subscription.getFilter(), properties);
} finally {
listenerToSubscriptions.keySet().forEach(listener -> listener.onPropertiesUpdated(properties));
}
}
private SpotPriceSubscriptionCache getSpotPriceSubscriptionDataCache(Subscription subscription) {
if (!(subscription instanceof SpotPriceSubscription)) {
throw new IllegalArgumentException("Invalid cache requested for subscription " + subscription);
}
SubscriptionDataCache<?> dataCache = getSubscriptionDataCache(subscription);
if (dataCache instanceof SpotPriceSubscriptionCache spotPriceSubscriptionCache) {
return spotPriceSubscriptionCache;
}
throw new IllegalArgumentException("Unexpected cache for subscription " + subscription);
}
private DatahubPriceSubscriptionCache getDatahubPriceSubscriptionDataCache(Subscription subscription) {
if (!(subscription instanceof DatahubPriceSubscription)) {
throw new IllegalArgumentException("Invalid cache requested for subscription " + subscription);
}
SubscriptionDataCache<?> dataCache = getSubscriptionDataCache(subscription);
if (dataCache instanceof DatahubPriceSubscriptionCache datahubPriceSubscriptionCache) {
return datahubPriceSubscriptionCache;
}
throw new IllegalArgumentException("Unexpected cache for subscription " + subscription);
}
private SubscriptionDataCache<?> getSubscriptionDataCache(Subscription subscription) {
SubscriptionDataCache<?> dataCache = subscriptionDataCaches.get(subscription);
if (dataCache != null) {
return dataCache;
}
if (subscription instanceof SpotPriceSubscription spotPriceSubscription) {
dataCache = new SpotPriceSubscriptionCache(spotPriceSubscription);
} else if (subscription instanceof DatahubPriceSubscription) {
dataCache = new DatahubPriceSubscriptionCache();
} else {
throw new IllegalArgumentException("No supported cache for subscription " + subscription);
}
subscriptionDataCaches.put(subscription, dataCache);
return dataCache;
}
private void publishPricesFromCache(Subscription subscription, Set<ElectricityPriceListener> listeners) {
if (subscription instanceof SpotPriceSubscription spotPriceSubscription) {
SpotPriceSubscriptionCache cache = getSpotPriceSubscriptionDataCache(subscription);
listeners.forEach(listener -> listener.onSpotPrices(cache.get(), spotPriceSubscription.getCurrency()));
} else if (subscription instanceof DatahubPriceSubscription datahubPriceSubscription) {
DatahubPriceSubscriptionCache cache = getDatahubPriceSubscriptionDataCache(subscription);
listeners.forEach(listener -> listener.onTariffs(datahubPriceSubscription.getDatahubTariff(), cache.get()));
}
}
private void updatePricesForAllSubscriptions() {
subscriptionToListeners.keySet().stream().forEach(this::updateCurrentPrices);
// Clean up caches not directly related to listener subscriptions, e.g. from Thing
// actions when having no linked channels.
subscriptionDataCaches.entrySet().stream().filter(entry -> !subscriptionToListeners.containsKey(entry.getKey()))
.forEach(entry -> entry.getValue().flush());
reschedulePriceUpdateJob();
}
private void updateCurrentPrices(Subscription subscription) {
getSubscriptionDataCache(subscription).flush();
publishCurrentPriceFromCache(subscription, getListeners(subscription));
}
private void publishCurrentPriceFromCache(Subscription subscription, Set<ElectricityPriceListener> listeners) {
BigDecimal currentPrice = getSubscriptionDataCache(subscription).get(Instant.now());
if (subscription instanceof SpotPriceSubscription spotPriceSubscription) {
listeners.forEach(
listener -> listener.onCurrentSpotPrice(currentPrice, spotPriceSubscription.getCurrency()));
} else if (subscription instanceof DatahubPriceSubscription datahubPriceSubscription) {
listeners.forEach(
listener -> listener.onCurrentTariff(datahubPriceSubscription.getDatahubTariff(), currentPrice));
}
}
private void reschedulePriceUpdateJob() {
ScheduledFuture<?> priceUpdateJob = this.priceUpdateFuture;
if (priceUpdateJob != null) {
// Do not interrupt ourselves.
priceUpdateJob.cancel(false);
this.priceUpdateFuture = null;
}
Instant nextUpdate = Instant.now().plus(1, ChronoUnit.HOURS).truncatedTo(ChronoUnit.HOURS);
this.priceUpdateFuture = scheduler.at(this::updatePricesForAllSubscriptions, nextUpdate);
logger.debug("Price update job rescheduled at {}", nextUpdate);
}
private void reschedulePriceRefreshJob(RetryStrategy retryPolicy) {
// Preserve state of previous retry policy when configuration is the same.
if (!retryPolicy.equals(this.retryPolicy)) {
this.retryPolicy = retryPolicy;
}
ScheduledFuture<?> refreshJob = this.refreshFuture;
long secondsUntilNextRefresh = this.retryPolicy.getDuration().getSeconds();
Instant timeOfNextRefresh = Instant.now().plusSeconds(secondsUntilNextRefresh);
this.refreshFuture = scheduler.at(this::refreshElectricityPrices, timeOfNextRefresh);
logger.debug("Price refresh job rescheduled in {} seconds: {}", secondsUntilNextRefresh, timeOfNextRefresh);
DateTimeFormatter formatter = DateTimeFormatter.ofPattern(PROPERTY_DATETIME_FORMAT);
String nextCall = LocalDateTime.ofInstant(timeOfNextRefresh, timeZoneProvider.getTimeZone())
.truncatedTo(ChronoUnit.SECONDS).format(formatter);
Map<String, String> propertyMap = Map.of(PROPERTY_NEXT_CALL, nextCall);
listenerToSubscriptions.keySet().forEach(listener -> listener.onPropertiesUpdated(propertyMap));
if (refreshJob != null) {
refreshJob.cancel(true);
}
}
}

View File

@ -0,0 +1,91 @@
/**
* Copyright (c) 2010-2024 Contributors to the openHAB project
*
* See the NOTICE file(s) distributed with this work for additional
* information.
*
* This program and the accompanying materials are made available under the
* terms of the Eclipse Public License 2.0 which is available at
* http://www.eclipse.org/legal/epl-2.0
*
* SPDX-License-Identifier: EPL-2.0
*/
package org.openhab.binding.energidataservice.internal.provider.cache;
import static org.openhab.binding.energidataservice.internal.EnergiDataServiceBindingConstants.*;
import java.time.Clock;
import java.time.LocalDateTime;
import java.time.temporal.ChronoUnit;
import java.util.Collection;
import java.util.List;
import java.util.concurrent.CopyOnWriteArrayList;
import org.eclipse.jdt.annotation.NonNullByDefault;
import org.openhab.binding.energidataservice.internal.PriceListParser;
import org.openhab.binding.energidataservice.internal.api.dto.DatahubPricelistRecord;
/**
* Datahub price (tariff) specific {@link ElectricityPriceSubscriptionCache} implementation.
*
* @author Jacob Laursen - Initial contribution
*/
@NonNullByDefault
public class DatahubPriceSubscriptionCache
extends ElectricityPriceSubscriptionCache<Collection<DatahubPricelistRecord>> {
public static final int MAX_CACHE_SIZE = 24 * 2 + NUMBER_OF_HISTORIC_HOURS;
private final PriceListParser priceListParser = new PriceListParser();
private Collection<DatahubPricelistRecord> datahubRecords = new CopyOnWriteArrayList<>();
public DatahubPriceSubscriptionCache() {
this(Clock.systemDefaultZone());
}
public DatahubPriceSubscriptionCache(Clock clock) {
super(clock, MAX_CACHE_SIZE);
}
/**
* Replace current "raw"/unprocessed tariff records in cache.
* Map of hourly tariffs will be updated automatically.
*
* @param records The records as received from Energi Data Service.
*/
@Override
public boolean put(Collection<DatahubPricelistRecord> records) {
LocalDateTime localHourStart = LocalDateTime.now(clock.withZone(DATAHUB_TIMEZONE))
.minus(NUMBER_OF_HISTORIC_HOURS, ChronoUnit.HOURS).truncatedTo(ChronoUnit.HOURS);
List<DatahubPricelistRecord> newRecords = records.stream().filter(r -> !r.validTo().isBefore(localHourStart))
.toList();
boolean recordsAreEqual = datahubRecords.containsAll(newRecords) && newRecords.containsAll(datahubRecords);
datahubRecords = new CopyOnWriteArrayList<>(newRecords);
update();
return !recordsAreEqual;
}
/**
* Update map of hourly tariffs from internal cache.
*/
public void update() {
priceMap.putAll(priceListParser.toHourly(datahubRecords));
flush();
}
/**
* Check if we have "raw" tariff records cached which are valid tomorrow.
*
* @return true if tariff records for tomorrow are cached
*/
public boolean areTariffsValidTomorrow() {
LocalDateTime localHourStart = LocalDateTime.now(clock.withZone(DATAHUB_TIMEZONE))
.truncatedTo(ChronoUnit.HOURS);
LocalDateTime localMidnight = localHourStart.plusDays(1).truncatedTo(ChronoUnit.DAYS);
return datahubRecords.stream().anyMatch(r -> r.validTo().isAfter(localMidnight));
}
}

View File

@ -0,0 +1,118 @@
/**
* Copyright (c) 2010-2024 Contributors to the openHAB project
*
* See the NOTICE file(s) distributed with this work for additional
* information.
*
* This program and the accompanying materials are made available under the
* terms of the Eclipse Public License 2.0 which is available at
* http://www.eclipse.org/legal/epl-2.0
*
* SPDX-License-Identifier: EPL-2.0
*/
package org.openhab.binding.energidataservice.internal.provider.cache;
import static org.openhab.binding.energidataservice.internal.EnergiDataServiceBindingConstants.NORD_POOL_TIMEZONE;
import java.math.BigDecimal;
import java.time.Clock;
import java.time.Instant;
import java.time.temporal.ChronoUnit;
import java.util.HashMap;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import org.eclipse.jdt.annotation.NonNullByDefault;
import org.eclipse.jdt.annotation.Nullable;
/**
* Electricity price specific {@link SubscriptionDataCache} implementation.
*
* @author Jacob Laursen - Initial contribution
*/
@NonNullByDefault
public abstract class ElectricityPriceSubscriptionCache<R> implements SubscriptionDataCache<R> {
public static final int NUMBER_OF_HISTORIC_HOURS = 24;
protected final Map<Instant, BigDecimal> priceMap;
protected final Clock clock;
protected ElectricityPriceSubscriptionCache(Clock clock, int initialCapacity) {
this.clock = clock.withZone(NORD_POOL_TIMEZONE);
this.priceMap = new ConcurrentHashMap<>(initialCapacity);
}
@Override
public void flush() {
Instant firstHourStart = getFirstHourStart();
priceMap.entrySet().removeIf(entry -> entry.getKey().isBefore(firstHourStart));
}
/**
* Get map of all cached prices.
*
* @return prices currently available, {@link #NUMBER_OF_HISTORIC_HOURS} back
*/
@Override
public Map<Instant, BigDecimal> get() {
return new HashMap<>(priceMap);
}
/**
* Get price valid at provided instant.
*
* @param time {@link Instant} for which to get the price
* @return price at given time or null if not available
*/
@Override
public @Nullable BigDecimal get(Instant time) {
return priceMap.get(getHourStart(time));
}
/**
* Get number of future prices including current hour.
*
* @return number of future prices
*/
@Override
public long getNumberOfFuturePrices() {
Instant currentHourStart = getCurrentHourStart();
return priceMap.entrySet().stream().filter(p -> !p.getKey().isBefore(currentHourStart)).count();
}
/**
* Check if historic prices ({@link #NUMBER_OF_HISTORIC_HOURS}) are cached.
*
* @return true if historic prices are cached
*/
@Override
public boolean areHistoricPricesCached() {
return arePricesCached(getCurrentHourStart().minus(1, ChronoUnit.HOURS));
}
protected boolean arePricesCached(Instant end) {
for (Instant hourStart = getFirstHourStart(); hourStart.compareTo(end) <= 0; hourStart = hourStart.plus(1,
ChronoUnit.HOURS)) {
if (priceMap.get(hourStart) == null) {
return false;
}
}
return true;
}
protected Instant getCurrentHourStart() {
return getHourStart(Instant.now(clock));
}
protected Instant getFirstHourStart() {
return getHourStart(Instant.now(clock).minus(NUMBER_OF_HISTORIC_HOURS, ChronoUnit.HOURS));
}
protected Instant getHourStart(Instant instant) {
return instant.truncatedTo(ChronoUnit.HOURS);
}
}

View File

@ -0,0 +1,90 @@
/**
* Copyright (c) 2010-2024 Contributors to the openHAB project
*
* See the NOTICE file(s) distributed with this work for additional
* information.
*
* This program and the accompanying materials are made available under the
* terms of the Eclipse Public License 2.0 which is available at
* http://www.eclipse.org/legal/epl-2.0
*
* SPDX-License-Identifier: EPL-2.0
*/
package org.openhab.binding.energidataservice.internal.provider.cache;
import static org.openhab.binding.energidataservice.internal.EnergiDataServiceBindingConstants.*;
import java.math.BigDecimal;
import java.time.Clock;
import java.time.Instant;
import java.time.LocalDate;
import java.time.LocalTime;
import java.time.ZonedDateTime;
import java.time.temporal.ChronoUnit;
import org.eclipse.jdt.annotation.NonNullByDefault;
import org.openhab.binding.energidataservice.internal.api.dto.ElspotpriceRecord;
import org.openhab.binding.energidataservice.internal.provider.subscription.SpotPriceSubscription;
/**
* Spot price specific {@link ElectricityPriceSubscriptionCache} implementation.
*
* @author Jacob Laursen - Initial contribution
*/
@NonNullByDefault
public class SpotPriceSubscriptionCache extends ElectricityPriceSubscriptionCache<ElspotpriceRecord[]> {
private static final int MAX_CACHE_SIZE = 24 + 11 + NUMBER_OF_HISTORIC_HOURS;
private final SpotPriceSubscription subscription;
public SpotPriceSubscriptionCache(SpotPriceSubscription subscription) {
this(subscription, Clock.systemDefaultZone());
}
public SpotPriceSubscriptionCache(SpotPriceSubscription subscription, Clock clock) {
super(clock, MAX_CACHE_SIZE);
this.subscription = subscription;
}
/**
* Convert and cache the supplied {@link ElspotpriceRecord}s.
*
* @param records The records as received from Energi Data Service.
* @return true if the provided records resulted in any cache changes
*/
@Override
public boolean put(ElspotpriceRecord[] records) {
boolean isDKK = CURRENCY_DKK.equals(subscription.getCurrency());
boolean anyChanges = false;
int oldSize = priceMap.size();
for (ElspotpriceRecord record : records) {
BigDecimal newValue = (isDKK ? record.spotPriceDKK() : record.spotPriceEUR())
.divide(BigDecimal.valueOf(1000));
BigDecimal oldValue = priceMap.put(record.hour(), newValue);
if (oldValue == null || newValue.compareTo(oldValue) != 0) {
anyChanges = true;
}
}
anyChanges |= oldSize != priceMap.size();
flush();
return anyChanges;
}
/**
* Check if all current spot prices are cached taking into consideration that next day's spot prices
* should be available at 13:00 CET.
*
* @return true if spot prices are fully cached
*/
public boolean arePricesFullyCached() {
Instant end = ZonedDateTime.of(LocalDate.now(clock), LocalTime.of(23, 0), NORD_POOL_TIMEZONE).toInstant();
LocalTime now = LocalTime.now(clock);
if (now.isAfter(DAILY_REFRESH_TIME_CET)) {
end = end.plus(24, ChronoUnit.HOURS);
}
return arePricesCached(end);
}
}

View File

@ -0,0 +1,73 @@
/**
* Copyright (c) 2010-2024 Contributors to the openHAB project
*
* See the NOTICE file(s) distributed with this work for additional
* information.
*
* This program and the accompanying materials are made available under the
* terms of the Eclipse Public License 2.0 which is available at
* http://www.eclipse.org/legal/epl-2.0
*
* SPDX-License-Identifier: EPL-2.0
*/
package org.openhab.binding.energidataservice.internal.provider.cache;
import java.math.BigDecimal;
import java.time.Instant;
import java.util.Map;
import org.eclipse.jdt.annotation.NonNullByDefault;
import org.eclipse.jdt.annotation.Nullable;
/**
* Generic interface for caching prices related to subscription.
*
* @author Jacob Laursen - Initial contribution
*/
@NonNullByDefault
public interface SubscriptionDataCache<R> {
/**
* Add records to cache.
*
* @param records Records to add to cache
* @return true if the provided records resulted in any cache changes
*/
boolean put(R records);
/**
* Get cached prices.
*
* @return Map of cached key/value pairs
*/
Map<Instant, BigDecimal> get();
/**
* Get cached price for specific {@link Instant}.
*
* @param time Get cached value at this time
* @return Price at given time
*/
@Nullable
BigDecimal get(Instant time);
/**
* Flush expired cached values.
*/
void flush();
/**
* Get the number of future prices in the cache.
*
* @return number of cached future prices
*/
long getNumberOfFuturePrices();
/**
* Check if all required historic values are cached, considering
* {@link ElectricityPriceSubscriptionCache#NUMBER_OF_HISTORIC_HOURS}.
*
* @return true if historic values are fully cached
*/
boolean areHistoricPricesCached();
}

View File

@ -0,0 +1,48 @@
/**
* Copyright (c) 2010-2024 Contributors to the openHAB project
*
* See the NOTICE file(s) distributed with this work for additional
* information.
*
* This program and the accompanying materials are made available under the
* terms of the Eclipse Public License 2.0 which is available at
* http://www.eclipse.org/legal/epl-2.0
*
* SPDX-License-Identifier: EPL-2.0
*/
package org.openhab.binding.energidataservice.internal.provider.listener;
import java.math.BigDecimal;
import java.time.Instant;
import java.util.Map;
import org.eclipse.jdt.annotation.NonNullByDefault;
import org.openhab.binding.energidataservice.internal.provider.subscription.Co2EmissionSubscription;
/**
* {@link Co2EmissionListener} provides an interface for receiving
* CO2 emission data.
*
* @author Jacob Laursen - Initial contribution
*/
@NonNullByDefault
public interface Co2EmissionListener extends SubscriptionListener {
/**
* Current realtime emission or prognosis has been updated.
*
* @param type The type (either {@link Co2EmissionSubscription.Type#Realtime} or
* {@link Co2EmissionSubscription.Type#Prognosis})
* @param emission Emission in g/kWh
*/
void onCurrentEmission(Co2EmissionSubscription.Type type, BigDecimal emission);
/**
* Realtime emissions or prognosis have changed.
* Can be used to update time series.
*
* @param type The type (either {@link Co2EmissionSubscription.Type#Realtime} or
* {@link Co2EmissionSubscription.Type#Prognosis})
* @param emissions Emissions in g/kWh
*/
void onEmissions(Co2EmissionSubscription.Type type, Map<Instant, BigDecimal> emissions);
}

View File

@ -0,0 +1,70 @@
/**
* Copyright (c) 2010-2024 Contributors to the openHAB project
*
* See the NOTICE file(s) distributed with this work for additional
* information.
*
* This program and the accompanying materials are made available under the
* terms of the Eclipse Public License 2.0 which is available at
* http://www.eclipse.org/legal/epl-2.0
*
* SPDX-License-Identifier: EPL-2.0
*/
package org.openhab.binding.energidataservice.internal.provider.listener;
import java.math.BigDecimal;
import java.time.Instant;
import java.util.Currency;
import java.util.Map;
import org.eclipse.jdt.annotation.NonNullByDefault;
import org.eclipse.jdt.annotation.Nullable;
import org.openhab.binding.energidataservice.internal.DatahubTariff;
/**
* {@link ElectricityPriceListener} provides an interface for receiving
* electricity price data.
*
* @author Jacob Laursen - Initial contribution
*/
@NonNullByDefault
public interface ElectricityPriceListener extends SubscriptionListener {
/**
* New day-ahead spot prices are available.
*/
void onDayAheadAvailable();
/**
* Current spot price has been updated (every hour).
*
* @param price New current price
* @param currency Currency
*/
void onCurrentSpotPrice(@Nullable BigDecimal price, Currency currency);
/**
* Spot prices have changed.
* Can be used to update time series.
*
* @param spotPrices New spot prices
* @param currency Currency
*/
void onSpotPrices(Map<Instant, BigDecimal> spotPrices, Currency currency);
/**
* Current tariff has been updated.
*
* @param datahubTariff Tariff type that was updated
* @param tariff New tariff
*/
void onCurrentTariff(DatahubTariff datahubTariff, @Nullable BigDecimal tariff);
/**
* Tariffs have changed.
* Can be used to update time series.
*
* @param datahubTariff Tariff type that was updated
* @param tariffs New tariffs
*/
void onTariffs(DatahubTariff datahubTariff, Map<Instant, BigDecimal> tariffs);
}

View File

@ -0,0 +1,41 @@
/**
* Copyright (c) 2010-2024 Contributors to the openHAB project
*
* See the NOTICE file(s) distributed with this work for additional
* information.
*
* This program and the accompanying materials are made available under the
* terms of the Eclipse Public License 2.0 which is available at
* http://www.eclipse.org/legal/epl-2.0
*
* SPDX-License-Identifier: EPL-2.0
*/
package org.openhab.binding.energidataservice.internal.provider.listener;
import java.util.Map;
import org.eclipse.jdt.annotation.NonNullByDefault;
import org.eclipse.jdt.annotation.Nullable;
/**
* {@link SubscriptionListener} provides a generic interface for receiving data
* from different providers.
*
* @author Jacob Laursen - Initial contribution
*/
@NonNullByDefault
public interface SubscriptionListener {
/**
* Properties (such as lastCall and nextCall) have been updated.
*
* @param properties
*/
void onPropertiesUpdated(Map<String, String> properties);
/**
* A communication error has occurred when calling the service.
*
* @param description Error description
*/
void onCommunicationError(@Nullable String description);
}

View File

@ -0,0 +1,84 @@
/**
* Copyright (c) 2010-2024 Contributors to the openHAB project
*
* See the NOTICE file(s) distributed with this work for additional
* information.
*
* This program and the accompanying materials are made available under the
* terms of the Eclipse Public License 2.0 which is available at
* http://www.eclipse.org/legal/epl-2.0
*
* SPDX-License-Identifier: EPL-2.0
*/
package org.openhab.binding.energidataservice.internal.provider.subscription;
import java.util.Objects;
import org.eclipse.jdt.annotation.NonNullByDefault;
import org.eclipse.jdt.annotation.Nullable;
import org.openhab.binding.energidataservice.internal.api.Dataset;
/**
* Class for CO2 emission subscription.
*
* @author Jacob Laursen - Initial contribution
*/
@NonNullByDefault
public class Co2EmissionSubscription implements Subscription {
private final String priceArea;
private final Type type;
public enum Type {
Prognosis(Dataset.CO2EmissionPrognosis),
Realtime(Dataset.CO2Emission);
private final Dataset dataset;
Type(Dataset dataset) {
this.dataset = dataset;
}
public Dataset getDataset() {
return dataset;
}
}
private Co2EmissionSubscription(String priceArea, Type type) {
this.priceArea = priceArea;
this.type = type;
}
@Override
public boolean equals(@Nullable Object o) {
if (o == this) {
return true;
}
if (!(o instanceof Co2EmissionSubscription other)) {
return false;
}
return this.priceArea.equals(other.priceArea) && this.type.equals((other.type));
}
@Override
public int hashCode() {
return Objects.hash(priceArea, type);
}
@Override
public String toString() {
return "Co2EmissionSubscription: PriceArea=" + priceArea + ", Type=" + type;
}
public String getPriceArea() {
return priceArea;
}
public Type getType() {
return type;
}
public static Co2EmissionSubscription of(String priceArea, Type type) {
return new Co2EmissionSubscription(priceArea, type);
}
}

View File

@ -0,0 +1,79 @@
/**
* Copyright (c) 2010-2024 Contributors to the openHAB project
*
* See the NOTICE file(s) distributed with this work for additional
* information.
*
* This program and the accompanying materials are made available under the
* terms of the Eclipse Public License 2.0 which is available at
* http://www.eclipse.org/legal/epl-2.0
*
* SPDX-License-Identifier: EPL-2.0
*/
package org.openhab.binding.energidataservice.internal.provider.subscription;
import java.util.Objects;
import org.eclipse.jdt.annotation.NonNullByDefault;
import org.eclipse.jdt.annotation.Nullable;
import org.openhab.binding.energidataservice.internal.DatahubTariff;
import org.openhab.binding.energidataservice.internal.api.DatahubTariffFilter;
import org.openhab.binding.energidataservice.internal.api.GlobalLocationNumber;
/**
* Class for datahub price subscription.
*
* @author Jacob Laursen - Initial contribution
*/
@NonNullByDefault
public class DatahubPriceSubscription implements ElectricityPriceSubscription {
private final DatahubTariff datahubTariff;
private final GlobalLocationNumber globalLocationNumber;
private final DatahubTariffFilter filter;
private DatahubPriceSubscription(DatahubTariff datahubTariff, GlobalLocationNumber globalLocationNumber,
DatahubTariffFilter filter) {
this.datahubTariff = datahubTariff;
this.globalLocationNumber = globalLocationNumber;
this.filter = filter;
}
@Override
public boolean equals(@Nullable Object o) {
if (o == this) {
return true;
}
if (!(o instanceof DatahubPriceSubscription other)) {
return false;
}
return this.globalLocationNumber.equals(other.globalLocationNumber) && this.filter.equals(other.filter);
}
@Override
public int hashCode() {
return Objects.hash(globalLocationNumber, filter);
}
@Override
public String toString() {
return "DatahubPriceSubscription: GLN=" + globalLocationNumber + ", Filter=" + filter;
}
public DatahubTariff getDatahubTariff() {
return datahubTariff;
}
public GlobalLocationNumber getGlobalLocationNumber() {
return globalLocationNumber;
}
public DatahubTariffFilter getFilter() {
return filter;
}
public static DatahubPriceSubscription of(DatahubTariff datahubTariff, GlobalLocationNumber globalLocationNumber,
DatahubTariffFilter filter) {
return new DatahubPriceSubscription(datahubTariff, globalLocationNumber, filter);
}
}

View File

@ -0,0 +1,24 @@
/**
* Copyright (c) 2010-2024 Contributors to the openHAB project
*
* See the NOTICE file(s) distributed with this work for additional
* information.
*
* This program and the accompanying materials are made available under the
* terms of the Eclipse Public License 2.0 which is available at
* http://www.eclipse.org/legal/epl-2.0
*
* SPDX-License-Identifier: EPL-2.0
*/
package org.openhab.binding.energidataservice.internal.provider.subscription;
import org.eclipse.jdt.annotation.NonNullByDefault;
/**
* Super interface for electricity price subscription.
*
* @author Jacob Laursen - Initial contribution
*/
@NonNullByDefault
public interface ElectricityPriceSubscription extends Subscription {
}

View File

@ -0,0 +1,69 @@
/**
* Copyright (c) 2010-2024 Contributors to the openHAB project
*
* See the NOTICE file(s) distributed with this work for additional
* information.
*
* This program and the accompanying materials are made available under the
* terms of the Eclipse Public License 2.0 which is available at
* http://www.eclipse.org/legal/epl-2.0
*
* SPDX-License-Identifier: EPL-2.0
*/
package org.openhab.binding.energidataservice.internal.provider.subscription;
import java.util.Currency;
import java.util.Objects;
import org.eclipse.jdt.annotation.NonNullByDefault;
import org.eclipse.jdt.annotation.Nullable;
/**
* Class for spot price subscription.
*
* @author Jacob Laursen - Initial contribution
*/
@NonNullByDefault
public class SpotPriceSubscription implements ElectricityPriceSubscription {
private final String priceArea;
private final Currency currency;
private SpotPriceSubscription(String priceArea, Currency currency) {
this.priceArea = priceArea;
this.currency = currency;
}
@Override
public boolean equals(@Nullable Object o) {
if (o == this) {
return true;
}
if (!(o instanceof SpotPriceSubscription other)) {
return false;
}
return this.priceArea.equals(other.priceArea) && this.currency.equals(other.currency);
}
@Override
public int hashCode() {
return Objects.hash(priceArea, currency);
}
@Override
public String toString() {
return "SpotPriceSubscription: PriceArea=" + priceArea + ", Currency=" + currency;
}
public String getPriceArea() {
return priceArea;
}
public Currency getCurrency() {
return currency;
}
public static SpotPriceSubscription of(String priceArea, Currency currency) {
return new SpotPriceSubscription(priceArea, currency);
}
}

View File

@ -0,0 +1,24 @@
/**
* Copyright (c) 2010-2024 Contributors to the openHAB project
*
* See the NOTICE file(s) distributed with this work for additional
* information.
*
* This program and the accompanying materials are made available under the
* terms of the Eclipse Public License 2.0 which is available at
* http://www.eclipse.org/legal/epl-2.0
*
* SPDX-License-Identifier: EPL-2.0
*/
package org.openhab.binding.energidataservice.internal.provider.subscription;
import org.eclipse.jdt.annotation.NonNullByDefault;
/**
* Generic interface for subscription.
*
* @author Jacob Laursen - Initial contribution
*/
@NonNullByDefault
public interface Subscription {
}

View File

@ -82,10 +82,9 @@ public class ExponentialBackoff implements RetryStrategy {
if (o == this) {
return true;
}
if (!(o instanceof ExponentialBackoff)) {
if (!(o instanceof ExponentialBackoff other)) {
return false;
}
ExponentialBackoff other = (ExponentialBackoff) o;
return this.factor == other.factor && this.jitter == other.jitter && this.minimum.equals(other.minimum)
&& this.maximum.equals(other.maximum);

View File

@ -75,10 +75,9 @@ public class Linear implements RetryStrategy {
if (o == this) {
return true;
}
if (!(o instanceof Linear)) {
if (!(o instanceof Linear other)) {
return false;
}
Linear other = (Linear) o;
return this.jitter == other.jitter && this.minimum.equals(other.minimum) && this.maximum.equals(other.maximum);
}

View File

@ -1,126 +0,0 @@
/**
* Copyright (c) 2010-2024 Contributors to the openHAB project
*
* See the NOTICE file(s) distributed with this work for additional
* information.
*
* This program and the accompanying materials are made available under the
* terms of the Eclipse Public License 2.0 which is available at
* http://www.eclipse.org/legal/epl-2.0
*
* SPDX-License-Identifier: EPL-2.0
*/
package org.openhab.binding.energidataservice.internal;
import static org.hamcrest.CoreMatchers.*;
import static org.hamcrest.MatcherAssert.assertThat;
import java.math.BigDecimal;
import java.time.Clock;
import java.time.Duration;
import java.time.Instant;
import java.time.ZoneId;
import java.time.temporal.ChronoUnit;
import org.eclipse.jdt.annotation.NonNullByDefault;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.api.extension.ExtendWith;
import org.mockito.junit.jupiter.MockitoExtension;
import org.openhab.binding.energidataservice.internal.api.dto.ElspotpriceRecord;
/**
* Tests for {@link CacheManager}.
*
* @author Jacob Laursen - Initial contribution
*/
@NonNullByDefault
@ExtendWith(MockitoExtension.class)
public class CacheManagerTest {
@Test
void areSpotPricesFullyCachedToday() {
Instant now = Instant.parse("2023-02-07T08:38:47Z");
Instant first = Instant.parse("2023-02-06T08:00:00Z");
Instant last = Instant.parse("2023-02-07T22:00:00Z");
Clock clock = Clock.fixed(now, EnergiDataServiceBindingConstants.NORD_POOL_TIMEZONE);
CacheManager cacheManager = new CacheManager(clock);
populateWithSpotPrices(cacheManager, first, last);
assertThat(cacheManager.areSpotPricesFullyCached(), is(true));
}
@Test
void areSpotPricesFullyCachedTodayMissingAtStart() {
Instant now = Instant.parse("2023-02-07T08:38:47Z");
Instant first = Instant.parse("2023-02-06T21:00:00Z");
Instant last = Instant.parse("2023-02-07T22:00:00Z");
Clock clock = Clock.fixed(now, EnergiDataServiceBindingConstants.NORD_POOL_TIMEZONE);
CacheManager cacheManager = new CacheManager(clock);
populateWithSpotPrices(cacheManager, first, last);
assertThat(cacheManager.areSpotPricesFullyCached(), is(false));
}
@Test
void areSpotPricesFullyCachedTodayMissingAtEnd() {
Instant now = Instant.parse("2023-02-07T08:38:47Z");
Instant first = Instant.parse("2023-02-06T20:00:00Z");
Instant last = Instant.parse("2023-02-07T21:00:00Z");
Clock clock = Clock.fixed(now, EnergiDataServiceBindingConstants.NORD_POOL_TIMEZONE);
CacheManager cacheManager = new CacheManager(clock);
populateWithSpotPrices(cacheManager, first, last);
assertThat(cacheManager.areSpotPricesFullyCached(), is(false));
}
@Test
void areSpotPricesFullyCachedTodayOtherTimezoneIsIgnored() {
Instant now = Instant.parse("2023-02-07T08:38:47Z");
Instant first = Instant.parse("2023-02-06T08:00:00Z");
Instant last = Instant.parse("2023-02-07T22:00:00Z");
Clock clock = Clock.fixed(now, ZoneId.of("Asia/Tokyo"));
CacheManager cacheManager = new CacheManager(clock);
populateWithSpotPrices(cacheManager, first, last);
assertThat(cacheManager.areSpotPricesFullyCached(), is(true));
}
@Test
void areSpotPricesFullyCachedTomorrow() {
Instant now = Instant.parse("2023-02-07T12:00:00Z");
Instant first = Instant.parse("2023-02-06T12:00:00Z");
Instant last = Instant.parse("2023-02-08T22:00:00Z");
Clock clock = Clock.fixed(now, EnergiDataServiceBindingConstants.NORD_POOL_TIMEZONE);
CacheManager cacheManager = new CacheManager(clock);
populateWithSpotPrices(cacheManager, first, last);
assertThat(cacheManager.areSpotPricesFullyCached(), is(true));
}
@Test
void areHistoricSpotPricesCached() {
Instant now = Instant.parse("2023-02-07T08:38:47Z");
Instant first = Instant.parse("2023-02-06T08:00:00Z");
Instant last = Instant.parse("2023-02-07T07:00:00Z");
Clock clock = Clock.fixed(now, EnergiDataServiceBindingConstants.NORD_POOL_TIMEZONE);
CacheManager cacheManager = new CacheManager(clock);
populateWithSpotPrices(cacheManager, first, last);
assertThat(cacheManager.areHistoricSpotPricesCached(), is(true));
}
@Test
void areHistoricSpotPricesCachedFirstHourMissing() {
Instant now = Instant.parse("2023-02-07T08:38:47Z");
Instant first = Instant.parse("2023-02-06T21:00:00Z");
Instant last = Instant.parse("2023-02-07T08:00:00Z");
Clock clock = Clock.fixed(now, EnergiDataServiceBindingConstants.NORD_POOL_TIMEZONE);
CacheManager cacheManager = new CacheManager(clock);
populateWithSpotPrices(cacheManager, first, last);
assertThat(cacheManager.areHistoricSpotPricesCached(), is(false));
}
private void populateWithSpotPrices(CacheManager cacheManager, Instant first, Instant last) {
int size = (int) Duration.between(first, last).getSeconds() / 60 / 60 + 1;
ElspotpriceRecord[] records = new ElspotpriceRecord[size];
int i = 0;
for (Instant hourStart = first; !hourStart.isAfter(last); hourStart = hourStart.plus(1, ChronoUnit.HOURS)) {
records[i++] = new ElspotpriceRecord(hourStart, BigDecimal.ONE, BigDecimal.ZERO);
}
cacheManager.putSpotPrices(records, EnergiDataServiceBindingConstants.CURRENCY_DKK);
}
}

View File

@ -0,0 +1,161 @@
/**
* Copyright (c) 2010-2024 Contributors to the openHAB project
*
* See the NOTICE file(s) distributed with this work for additional
* information.
*
* This program and the accompanying materials are made available under the
* terms of the Eclipse Public License 2.0 which is available at
* http://www.eclipse.org/legal/epl-2.0
*
* SPDX-License-Identifier: EPL-2.0
*/
package org.openhab.binding.energidataservice.internal.provider;
import static org.hamcrest.CoreMatchers.*;
import static org.junit.jupiter.api.Assertions.assertThrows;
import static org.mockito.ArgumentMatchers.any;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.times;
import static org.mockito.Mockito.verify;
import static org.mockito.Mockito.when;
import java.math.BigDecimal;
import java.time.Instant;
import java.util.Currency;
import java.util.Map;
import org.eclipse.jdt.annotation.NonNullByDefault;
import org.eclipse.jdt.annotation.Nullable;
import org.junit.jupiter.api.AfterEach;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.api.extension.ExtendWith;
import org.mockito.Mock;
import org.mockito.junit.jupiter.MockitoExtension;
import org.mockito.junit.jupiter.MockitoSettings;
import org.mockito.quality.Strictness;
import org.openhab.binding.energidataservice.internal.DatahubTariff;
import org.openhab.binding.energidataservice.internal.provider.listener.ElectricityPriceListener;
import org.openhab.binding.energidataservice.internal.provider.subscription.SpotPriceSubscription;
import org.openhab.binding.energidataservice.internal.provider.subscription.Subscription;
import org.openhab.core.i18n.TimeZoneProvider;
import org.openhab.core.io.net.http.HttpClientFactory;
import org.openhab.core.scheduler.ScheduledCompletableFuture;
import org.openhab.core.scheduler.Scheduler;
import org.openhab.core.scheduler.SchedulerRunnable;
/**
* Tests for {@link ElectricityPriceProvider}.
*
* @author Jacob Laursen - Initial contribution
*/
@NonNullByDefault
@ExtendWith(MockitoExtension.class)
@MockitoSettings(strictness = Strictness.LENIENT)
public class ElectricityPriceProviderTest {
private @NonNullByDefault({}) @Mock Scheduler scheduler;
private @NonNullByDefault({}) @Mock HttpClientFactory httpClientFactory;
private @NonNullByDefault({}) @Mock TimeZoneProvider timeZoneProvider;
private @NonNullByDefault({}) @Mock MockedListener listener1;
private @NonNullByDefault({}) @Mock MockedListener listener2;
private @NonNullByDefault({}) ElectricityPriceProvider provider;
@SuppressWarnings("unchecked")
@BeforeEach
void setUp() {
ScheduledCompletableFuture<@Nullable Void> futureMock = (ScheduledCompletableFuture<@Nullable Void>) mock(
ScheduledCompletableFuture.class);
when(scheduler.at(any(SchedulerRunnable.class), any(Instant.class))).thenReturn(futureMock);
provider = new ElectricityPriceProvider(scheduler, httpClientFactory, timeZoneProvider);
}
@AfterEach
void teardown() {
provider.unsubscribe(listener1);
provider.unsubscribe(listener2);
}
@Test
void subscribeDuplicateRegistrationThrowsIllegalArgumentException() {
provider.subscribe(listener1, SpotPriceSubscription.of("DK1", Currency.getInstance("DKK")));
assertThrows(IllegalArgumentException.class, () -> {
provider.subscribe(listener1, SpotPriceSubscription.of("DK1", Currency.getInstance("DKK")));
});
}
@Test
void subscribeFirstSubscriptionSchedulesRefreshJob() {
provider.subscribe(listener1, SpotPriceSubscription.of("DK1", Currency.getInstance("DKK")));
verify(scheduler, times(1)).at(any(SchedulerRunnable.class), any(Instant.class));
}
@Test
void subscribeSecondSubscriptionReschedulesRefreshJob() {
provider.subscribe(listener1, SpotPriceSubscription.of("DK1", Currency.getInstance("DKK")));
provider.subscribe(listener1, SpotPriceSubscription.of("DK1", Currency.getInstance("EUR")));
verify(scheduler, times(2)).at(any(SchedulerRunnable.class), any(Instant.class));
}
@Test
void subscribeSecondSubscriptionFromOtherListenerReschedulesRefreshJob() {
provider.subscribe(listener1, SpotPriceSubscription.of("DK1", Currency.getInstance("DKK")));
provider.subscribe(listener2, SpotPriceSubscription.of("DK1", Currency.getInstance("EUR")));
verify(scheduler, times(2)).at(any(SchedulerRunnable.class), any(Instant.class));
}
@Test
void subscribeSecondSameSubscriptionFromOtherListenerDoesNotScheduleRefreshJob() {
provider.subscribe(listener1, SpotPriceSubscription.of("DK1", Currency.getInstance("DKK")));
provider.subscribe(listener2, SpotPriceSubscription.of("DK1", Currency.getInstance("DKK")));
verify(scheduler, times(1)).at(any(SchedulerRunnable.class), any(Instant.class));
}
@Test
void subscribeAfterUnsubscribeSchedulesRefreshJobAgain() {
Subscription subscription = SpotPriceSubscription.of("DK1", Currency.getInstance("DKK"));
provider.subscribe(listener1, subscription);
provider.unsubscribe(listener1, subscription);
provider.subscribe(listener1, subscription);
verify(scheduler, times(2)).at(any(SchedulerRunnable.class), any(Instant.class));
}
@Test
void unsubscribeUnknownSubscriptionThrowsIllegalArgumentException() {
assertThrows(IllegalArgumentException.class, () -> {
provider.unsubscribe(listener1, SpotPriceSubscription.of("DK1", Currency.getInstance("DKK")));
});
}
private class MockedListener implements ElectricityPriceListener {
@Override
public void onDayAheadAvailable() {
}
@Override
public void onCurrentSpotPrice(@Nullable BigDecimal price, Currency currency) {
}
@Override
public void onSpotPrices(Map<Instant, BigDecimal> spotPrices, Currency currency) {
}
@Override
public void onCurrentTariff(DatahubTariff datahubTariff, @Nullable BigDecimal tariff) {
}
@Override
public void onTariffs(DatahubTariff datahubTariff, Map<Instant, BigDecimal> tariffs) {
}
@Override
public void onPropertiesUpdated(Map<String, String> properties) {
}
@Override
public void onCommunicationError(@Nullable String description) {
}
}
}

View File

@ -0,0 +1,138 @@
/**
* Copyright (c) 2010-2024 Contributors to the openHAB project
*
* See the NOTICE file(s) distributed with this work for additional
* information.
*
* This program and the accompanying materials are made available under the
* terms of the Eclipse Public License 2.0 which is available at
* http://www.eclipse.org/legal/epl-2.0
*
* SPDX-License-Identifier: EPL-2.0
*/
package org.openhab.binding.energidataservice.internal.provider.cache;
import static org.hamcrest.CoreMatchers.*;
import static org.hamcrest.MatcherAssert.assertThat;
import static org.openhab.binding.energidataservice.internal.EnergiDataServiceBindingConstants.*;
import java.math.BigDecimal;
import java.time.Clock;
import java.time.Instant;
import java.time.LocalDateTime;
import java.util.ArrayList;
import org.eclipse.jdt.annotation.NonNullByDefault;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.api.extension.ExtendWith;
import org.mockito.junit.jupiter.MockitoExtension;
import org.openhab.binding.energidataservice.internal.api.dto.DatahubPricelistRecord;
/**
* Tests for {@link DatahubPriceSubscriptionCache}.
*
* @author Jacob Laursen - Initial contribution
*/
@NonNullByDefault
@ExtendWith(MockitoExtension.class)
public class DatahubPriceSubscriptionCacheTest {
@Test
void areTariffsValidTomorrowTwoDaysBeforeEnding() {
Instant now = Instant.parse("2024-09-29T09:22:00Z");
LocalDateTime from = LocalDateTime.parse("2024-08-01T00:00:00");
LocalDateTime to = LocalDateTime.parse("2024-10-01T00:00:00");
Clock clock = Clock.fixed(now, DATAHUB_TIMEZONE);
DatahubPriceSubscriptionCache cache = new DatahubPriceSubscriptionCache(clock);
populateWithDatahubPrices(cache, from, to);
assertThat(cache.areTariffsValidTomorrow(), is(true));
}
@Test
void areTariffsValidTomorrowOneDayBeforeEnding() {
Instant now = Instant.parse("2024-09-30T09:22:00Z");
LocalDateTime from = LocalDateTime.parse("2024-08-01T00:00:00");
LocalDateTime to = LocalDateTime.parse("2024-10-01T00:00:00");
Clock clock = Clock.fixed(now, DATAHUB_TIMEZONE);
DatahubPriceSubscriptionCache cache = new DatahubPriceSubscriptionCache(clock);
populateWithDatahubPrices(cache, from, to);
assertThat(cache.areTariffsValidTomorrow(), is(false));
}
@Test
void updateCacheIsNotChanged() {
Instant now = Instant.parse("2024-09-30T09:22:00Z");
LocalDateTime from = LocalDateTime.parse("2024-08-01T00:00:00");
LocalDateTime to = LocalDateTime.parse("2024-10-01T00:00:00");
Clock clock = Clock.fixed(now, DATAHUB_TIMEZONE);
DatahubPriceSubscriptionCache cache = new DatahubPriceSubscriptionCache(clock);
populateWithDatahubPrices(cache, from, to);
assertThat(populateWithDatahubPrices(cache, from, to), is(false));
}
@Test
void updateCacheIsNotChangedSameValue() {
Instant now = Instant.parse("2024-09-30T09:22:00Z");
LocalDateTime from = LocalDateTime.parse("2024-08-01T00:00:00");
LocalDateTime to = LocalDateTime.parse("2024-10-01T00:00:00");
Clock clock = Clock.fixed(now, DATAHUB_TIMEZONE);
DatahubPriceSubscriptionCache cache = new DatahubPriceSubscriptionCache(clock);
populateWithDatahubPrices(cache, from, to);
from = LocalDateTime.parse("2024-10-01T00:00:00");
to = LocalDateTime.parse("2024-11-01T00:00:00");
populateWithDatahubPrices(cache, from, to);
var changedRecords = new ArrayList<DatahubPricelistRecord>();
changedRecords.add(new DatahubPricelistRecord(from, to, "CD", BigDecimal.ZERO, BigDecimal.ZERO, BigDecimal.ZERO,
BigDecimal.ZERO, BigDecimal.ZERO, BigDecimal.ZERO, BigDecimal.ZERO, BigDecimal.ZERO, BigDecimal.ZERO,
BigDecimal.ZERO, BigDecimal.ZERO, BigDecimal.ZERO, BigDecimal.ZERO, BigDecimal.ZERO, BigDecimal.ZERO,
BigDecimal.ZERO, BigDecimal.ZERO, BigDecimal.ZERO, BigDecimal.ZERO, BigDecimal.ZERO, BigDecimal.ZERO,
BigDecimal.ZERO, BigDecimal.ZERO, BigDecimal.ZERO));
assertThat(cache.put(changedRecords), is(false));
}
@Test
void updateCacheIsChangedByOneValue() {
Instant now = Instant.parse("2024-09-30T09:22:00Z");
LocalDateTime from = LocalDateTime.parse("2024-08-01T00:00:00");
LocalDateTime to = LocalDateTime.parse("2024-10-01T00:00:00");
Clock clock = Clock.fixed(now, DATAHUB_TIMEZONE);
DatahubPriceSubscriptionCache cache = new DatahubPriceSubscriptionCache(clock);
populateWithDatahubPrices(cache, from, to);
from = LocalDateTime.parse("2024-10-01T00:00:00");
to = LocalDateTime.parse("2024-11-01T00:00:00");
populateWithDatahubPrices(cache, from, to);
var changedRecords = new ArrayList<DatahubPricelistRecord>();
changedRecords.add(new DatahubPricelistRecord(from, to, "CD", BigDecimal.ONE, BigDecimal.ZERO, BigDecimal.ZERO,
BigDecimal.ZERO, BigDecimal.ZERO, BigDecimal.ZERO, BigDecimal.ZERO, BigDecimal.ZERO, BigDecimal.ZERO,
BigDecimal.ZERO, BigDecimal.ZERO, BigDecimal.ZERO, BigDecimal.ZERO, BigDecimal.ZERO, BigDecimal.ZERO,
BigDecimal.ZERO, BigDecimal.ZERO, BigDecimal.ZERO, BigDecimal.ZERO, BigDecimal.ZERO, BigDecimal.ZERO,
BigDecimal.ZERO, BigDecimal.ZERO, BigDecimal.ZERO));
assertThat(cache.put(changedRecords), is(true));
}
@Test
void updateCacheIsChangedByAdditionalKey() {
Instant now = Instant.parse("2024-09-30T09:22:00Z");
LocalDateTime from = LocalDateTime.parse("2024-08-01T00:00:00");
LocalDateTime to = LocalDateTime.parse("2024-10-01T00:00:00");
Clock clock = Clock.fixed(now, DATAHUB_TIMEZONE);
DatahubPriceSubscriptionCache cache = new DatahubPriceSubscriptionCache(clock);
populateWithDatahubPrices(cache, from, to);
assertThat(populateWithDatahubPrices(cache, to, to.plusMonths(1)), is(true));
}
private boolean populateWithDatahubPrices(DatahubPriceSubscriptionCache cache, LocalDateTime validFrom,
LocalDateTime validTo) {
var records = new ArrayList<DatahubPricelistRecord>();
records.add(new DatahubPricelistRecord(validFrom, validTo, "CD", BigDecimal.ZERO, BigDecimal.ZERO,
BigDecimal.ZERO, BigDecimal.ZERO, BigDecimal.ZERO, BigDecimal.ZERO, BigDecimal.ZERO, BigDecimal.ZERO,
BigDecimal.ZERO, BigDecimal.ZERO, BigDecimal.ZERO, BigDecimal.ZERO, BigDecimal.ZERO, BigDecimal.ZERO,
BigDecimal.ZERO, BigDecimal.ZERO, BigDecimal.ZERO, BigDecimal.ZERO, BigDecimal.ZERO, BigDecimal.ZERO,
BigDecimal.ZERO, BigDecimal.ZERO, BigDecimal.ZERO, BigDecimal.ZERO));
return cache.put(records);
}
}

View File

@ -0,0 +1,187 @@
/**
* Copyright (c) 2010-2024 Contributors to the openHAB project
*
* See the NOTICE file(s) distributed with this work for additional
* information.
*
* This program and the accompanying materials are made available under the
* terms of the Eclipse Public License 2.0 which is available at
* http://www.eclipse.org/legal/epl-2.0
*
* SPDX-License-Identifier: EPL-2.0
*/
package org.openhab.binding.energidataservice.internal.provider.cache;
import static org.hamcrest.CoreMatchers.*;
import static org.hamcrest.MatcherAssert.assertThat;
import static org.openhab.binding.energidataservice.internal.EnergiDataServiceBindingConstants.*;
import java.math.BigDecimal;
import java.time.Clock;
import java.time.Duration;
import java.time.Instant;
import java.time.ZoneId;
import java.time.temporal.ChronoUnit;
import org.eclipse.jdt.annotation.NonNullByDefault;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.api.extension.ExtendWith;
import org.mockito.junit.jupiter.MockitoExtension;
import org.openhab.binding.energidataservice.internal.api.dto.ElspotpriceRecord;
import org.openhab.binding.energidataservice.internal.provider.subscription.SpotPriceSubscription;
/**
* Tests for {@link SpotPriceSubscriptionCache}.
*
* @author Jacob Laursen - Initial contribution
*/
@NonNullByDefault
@ExtendWith(MockitoExtension.class)
public class SpotPriceSubscriptionCacheTest {
@Test
void areSpotPricesFullyCachedToday() {
Instant now = Instant.parse("2023-02-07T08:38:47Z");
Instant first = Instant.parse("2023-02-06T08:00:00Z");
Instant last = Instant.parse("2023-02-07T22:00:00Z");
Clock clock = Clock.fixed(now, NORD_POOL_TIMEZONE);
SpotPriceSubscriptionCache cache = new SpotPriceSubscriptionCache(SpotPriceSubscription.of("DK1", CURRENCY_DKK),
clock);
populateWithSpotPrices(cache, first, last);
assertThat(cache.arePricesFullyCached(), is(true));
}
@Test
void areSpotPricesFullyCachedTodayMissingAtStart() {
Instant now = Instant.parse("2023-02-07T08:38:47Z");
Instant first = Instant.parse("2023-02-06T21:00:00Z");
Instant last = Instant.parse("2023-02-07T22:00:00Z");
Clock clock = Clock.fixed(now, NORD_POOL_TIMEZONE);
SpotPriceSubscriptionCache cache = new SpotPriceSubscriptionCache(SpotPriceSubscription.of("DK1", CURRENCY_DKK),
clock);
populateWithSpotPrices(cache, first, last);
assertThat(cache.arePricesFullyCached(), is(false));
}
@Test
void areSpotPricesFullyCachedTodayMissingAtEnd() {
Instant now = Instant.parse("2023-02-07T08:38:47Z");
Instant first = Instant.parse("2023-02-06T20:00:00Z");
Instant last = Instant.parse("2023-02-07T21:00:00Z");
Clock clock = Clock.fixed(now, NORD_POOL_TIMEZONE);
SpotPriceSubscriptionCache cache = new SpotPriceSubscriptionCache(SpotPriceSubscription.of("DK1", CURRENCY_DKK),
clock);
populateWithSpotPrices(cache, first, last);
assertThat(cache.arePricesFullyCached(), is(false));
}
@Test
void areSpotPricesFullyCachedTodayOtherTimezoneIsIgnored() {
Instant now = Instant.parse("2023-02-07T08:38:47Z");
Instant first = Instant.parse("2023-02-06T08:00:00Z");
Instant last = Instant.parse("2023-02-07T22:00:00Z");
Clock clock = Clock.fixed(now, ZoneId.of("Asia/Tokyo"));
SpotPriceSubscriptionCache cache = new SpotPriceSubscriptionCache(SpotPriceSubscription.of("DK1", CURRENCY_DKK),
clock);
populateWithSpotPrices(cache, first, last);
assertThat(cache.arePricesFullyCached(), is(true));
}
@Test
void areSpotPricesFullyCachedTomorrow() {
Instant now = Instant.parse("2023-02-07T12:00:00Z");
Instant first = Instant.parse("2023-02-06T12:00:00Z");
Instant last = Instant.parse("2023-02-08T22:00:00Z");
Clock clock = Clock.fixed(now, NORD_POOL_TIMEZONE);
SpotPriceSubscriptionCache cache = new SpotPriceSubscriptionCache(SpotPriceSubscription.of("DK1", CURRENCY_DKK),
clock);
populateWithSpotPrices(cache, first, last);
assertThat(cache.arePricesFullyCached(), is(true));
}
@Test
void areHistoricSpotPricesCached() {
Instant now = Instant.parse("2023-02-07T08:38:47Z");
Instant first = Instant.parse("2023-02-06T08:00:00Z");
Instant last = Instant.parse("2023-02-07T07:00:00Z");
Clock clock = Clock.fixed(now, NORD_POOL_TIMEZONE);
SpotPriceSubscriptionCache cache = new SpotPriceSubscriptionCache(SpotPriceSubscription.of("DK1", CURRENCY_DKK),
clock);
populateWithSpotPrices(cache, first, last);
assertThat(cache.areHistoricPricesCached(), is(true));
}
@Test
void areHistoricSpotPricesCachedFirstHourMissing() {
Instant now = Instant.parse("2023-02-07T08:38:47Z");
Instant first = Instant.parse("2023-02-06T21:00:00Z");
Instant last = Instant.parse("2023-02-07T08:00:00Z");
Clock clock = Clock.fixed(now, NORD_POOL_TIMEZONE);
SpotPriceSubscriptionCache cache = new SpotPriceSubscriptionCache(SpotPriceSubscription.of("DK1", CURRENCY_DKK),
clock);
populateWithSpotPrices(cache, first, last);
assertThat(cache.areHistoricPricesCached(), is(false));
}
@Test
void updateCacheIsNotChanged() {
Instant now = Instant.parse("2023-02-07T08:38:47Z");
Instant first = Instant.parse("2023-02-06T08:00:00Z");
Instant last = Instant.parse("2023-02-07T07:00:00Z");
Clock clock = Clock.fixed(now, NORD_POOL_TIMEZONE);
SpotPriceSubscriptionCache cache = new SpotPriceSubscriptionCache(SpotPriceSubscription.of("DK1", CURRENCY_DKK),
clock);
populateWithSpotPrices(cache, first, last);
assertThat(populateWithSpotPrices(cache, first, last), is(false));
}
@Test
void updateCacheIsNotChangedSameValue() {
Instant now = Instant.parse("2023-02-07T08:38:47Z");
Instant first = Instant.parse("2023-02-06T08:00:00Z");
Instant last = Instant.parse("2023-02-07T07:00:00Z");
Clock clock = Clock.fixed(now, NORD_POOL_TIMEZONE);
SpotPriceSubscriptionCache cache = new SpotPriceSubscriptionCache(SpotPriceSubscription.of("DK1", CURRENCY_DKK),
clock);
populateWithSpotPrices(cache, first, last);
ElspotpriceRecord[] changedRecords = new ElspotpriceRecord[1];
changedRecords[0] = new ElspotpriceRecord(last, BigDecimal.ONE, BigDecimal.ZERO);
assertThat(cache.put(changedRecords), is(false));
}
@Test
void updateCacheIsChangedByOneValue() {
Instant now = Instant.parse("2023-02-07T08:38:47Z");
Instant first = Instant.parse("2023-02-06T08:00:00Z");
Instant last = Instant.parse("2023-02-07T07:00:00Z");
Clock clock = Clock.fixed(now, NORD_POOL_TIMEZONE);
SpotPriceSubscriptionCache cache = new SpotPriceSubscriptionCache(SpotPriceSubscription.of("DK1", CURRENCY_DKK),
clock);
populateWithSpotPrices(cache, first, last);
ElspotpriceRecord[] changedRecords = new ElspotpriceRecord[1];
changedRecords[0] = new ElspotpriceRecord(last, BigDecimal.TEN, BigDecimal.ZERO);
assertThat(cache.put(changedRecords), is(true));
}
@Test
void updateCacheIsChangedByAdditionalKey() {
Instant now = Instant.parse("2023-02-07T08:38:47Z");
Instant first = Instant.parse("2023-02-06T08:00:00Z");
Instant last = Instant.parse("2023-02-07T07:00:00Z");
Clock clock = Clock.fixed(now, NORD_POOL_TIMEZONE);
SpotPriceSubscriptionCache cache = new SpotPriceSubscriptionCache(SpotPriceSubscription.of("DK1", CURRENCY_DKK),
clock);
populateWithSpotPrices(cache, first, last);
assertThat(populateWithSpotPrices(cache, first, last.plus(1, ChronoUnit.HOURS)), is(true));
}
private boolean populateWithSpotPrices(SpotPriceSubscriptionCache cache, Instant first, Instant last) {
int size = (int) Duration.between(first, last).getSeconds() / 60 / 60 + 1;
ElspotpriceRecord[] records = new ElspotpriceRecord[size];
int i = 0;
for (Instant hourStart = first; !hourStart.isAfter(last); hourStart = hourStart.plus(1, ChronoUnit.HOURS)) {
records[i++] = new ElspotpriceRecord(hourStart, BigDecimal.ONE, BigDecimal.ZERO);
}
return cache.put(records);
}
}