[http] Fix refresh time check and calculation (#16288)

Signed-off-by: Jan N. Klug <github@klug.nrw>
Signed-off-by: Ciprian Pascu <contact@ciprianpascu.ro>
This commit is contained in:
J-N-K 2024-01-17 21:21:30 +01:00 committed by Ciprian Pascu
parent f0dd5c6a79
commit d78843cfc3
4 changed files with 40 additions and 31 deletions

View File

@ -168,15 +168,6 @@ public class HttpThingHandler extends BaseThingHandler implements HttpStatusList
}
rateLimitedHttpClient.setDelay(config.delay);
int urlHandlerCount = urlHandlers.size();
if (urlHandlerCount * config.delay > config.refresh * 1000) {
// this should prevent the rate limit queue from filling up
config.refresh = (urlHandlerCount * config.delay) / 1000 + 1;
logger.warn(
"{} channels in thing {} with a delay of {} incompatible with the configured refresh time. Refresh-Time increased to the minimum of {}",
urlHandlerCount, thing.getUID(), config.delay, config.refresh);
}
// remove empty headers
config.headers.removeIf(String::isBlank);
@ -236,6 +227,17 @@ public class HttpThingHandler extends BaseThingHandler implements HttpStatusList
// create channels
thing.getChannels().forEach(this::createChannel);
int urlHandlerCount = urlHandlers.size();
if (urlHandlerCount * config.delay > config.refresh * 1000) {
// this should prevent the rate limit queue from filling up
config.refresh = (urlHandlerCount * config.delay) / 1000 + 1;
logger.warn(
"{} channels in thing {} with a delay of {} incompatible with the configured refresh time. Refresh-Time increased to the minimum of {}",
urlHandlerCount, thing.getUID(), config.delay, config.refresh);
}
urlHandlers.values().forEach(urlHandler -> urlHandler.start(scheduler, config.refresh));
updateStatus(ThingStatus.UNKNOWN);
}
@ -330,8 +332,9 @@ public class HttpThingHandler extends BaseThingHandler implements HttpStatusList
// we need a key consisting of stateContent and URL, only if both are equal, we can use the same cache
String key = channelConfig.stateContent + "$" + stateUrl;
channelUrls.put(channelUID, key);
Objects.requireNonNull(urlHandlers.computeIfAbsent(key,
k -> new RefreshingUrlCache(scheduler, rateLimitedHttpClient, stateUrl, config,
Objects.requireNonNull(
urlHandlers.computeIfAbsent(key,
k -> new RefreshingUrlCache(rateLimitedHttpClient, stateUrl, config,
channelConfig.stateContent, config.contentType, this)))
.addConsumer(itemValueConverter::process);
}

View File

@ -59,12 +59,11 @@ public class RefreshingUrlCache {
private final @Nullable String httpContentType;
private final HttpStatusListener httpStatusListener;
private final ScheduledFuture<?> future;
private @Nullable ScheduledFuture<?> future;
private @Nullable ChannelHandlerContent lastContent;
public RefreshingUrlCache(ScheduledExecutorService executor, RateLimitedHttpClient httpClient, String url,
HttpThingConfig thingConfig, String httpContent, @Nullable String httpContentType,
HttpStatusListener httpStatusListener) {
public RefreshingUrlCache(RateLimitedHttpClient httpClient, String url, HttpThingConfig thingConfig,
String httpContent, @Nullable String httpContentType, HttpStatusListener httpStatusListener) {
this.httpClient = httpClient;
this.url = url;
this.strictErrorHandling = thingConfig.strictErrorHandling;
@ -76,9 +75,25 @@ public class RefreshingUrlCache {
this.httpContentType = httpContentType;
this.httpStatusListener = httpStatusListener;
fallbackEncoding = thingConfig.encoding;
}
future = executor.scheduleWithFixedDelay(this::refresh, 1, thingConfig.refresh, TimeUnit.SECONDS);
logger.trace("Started refresh task for URL '{}' with interval {}s", url, thingConfig.refresh);
public void start(ScheduledExecutorService executor, int refreshTime) {
if (future != null) {
logger.warn("Starting refresh task requested but it is already started. This is bug.");
return;
}
future = executor.scheduleWithFixedDelay(this::refresh, 1, refreshTime, TimeUnit.SECONDS);
logger.trace("Started refresh task for URL '{}' with interval {}s", url, refreshTime);
}
public void stop() {
// clearing all listeners to prevent further updates
consumers.clear();
ScheduledFuture<?> future = this.future;
if (future != null) {
future.cancel(true);
logger.trace("Stopped refresh task for URL '{}'", url);
}
}
private void refresh() {
@ -132,13 +147,6 @@ public class RefreshingUrlCache {
}
}
public void stop() {
// clearing all listeners to prevent further updates
consumers.clear();
future.cancel(false);
logger.trace("Stopped refresh task for URL '{}'", url);
}
public void addConsumer(Consumer<@Nullable ChannelHandlerContent> consumer) {
consumers.add(consumer);
}

View File

@ -99,7 +99,7 @@ public class RateLimitedHttpClientTest extends AbstractWireMockTest {
assertThat((int) msBetween, allOf(greaterThanOrEqualTo(1000), lessThan(1100)));
}
private List<Response> doLimitTest(int setDelay, List<Boolean> config) {
private void doLimitTest(int setDelay, List<Boolean> config) {
stubFor(get(urlEqualTo(TEST_LOCATION)).willReturn(aResponse().withBody(TEST_CONTENT)));
RateLimitedHttpClient rateLimitedHttpClient = new RateLimitedHttpClient(httpClient, scheduler);
@ -129,8 +129,6 @@ public class RateLimitedHttpClientTest extends AbstractWireMockTest {
// wait until we got all results
waitForAssert(() -> assertEquals(config.size(), responses.size()));
rateLimitedHttpClient.shutdown();
return responses;
}
private static class Response {

View File

@ -252,10 +252,10 @@ public class RefreshingUrlCacheTest extends AbstractWireMockTest {
* @return the cache object
*/
private RefreshingUrlCache getUrlCache(String content) {
RefreshingUrlCache urlCache = new RefreshingUrlCache(scheduler, rateLimitedHttpClient, url, thingConfig,
content, null, statusListener);
RefreshingUrlCache urlCache = new RefreshingUrlCache(rateLimitedHttpClient, url, thingConfig, content, null,
statusListener);
urlCache.addConsumer(contentWrappers::add);
urlCache.start(scheduler, thingConfig.refresh);
return urlCache;
}
}