diff --git a/bundles/org.openhab.binding.homeconnect/src/main/java/org/openhab/binding/homeconnect/internal/client/HomeConnectEventSourceClient.java b/bundles/org.openhab.binding.homeconnect/src/main/java/org/openhab/binding/homeconnect/internal/client/HomeConnectEventSourceClient.java index ab0faf124d5..94fcb5e587b 100644 --- a/bundles/org.openhab.binding.homeconnect/src/main/java/org/openhab/binding/homeconnect/internal/client/HomeConnectEventSourceClient.java +++ b/bundles/org.openhab.binding.homeconnect/src/main/java/org/openhab/binding/homeconnect/internal/client/HomeConnectEventSourceClient.java @@ -95,11 +95,11 @@ public class HomeConnectEventSourceClient { if (!eventSourceConnections.containsKey(eventListener)) { logger.debug("Create new event source listener for '{}'.", haId); - Client client = clientBuilder.readTimeout(SSE_REQUEST_READ_TIMEOUT, TimeUnit.SECONDS).register( - new HomeConnectStreamingRequestFilter(HttpHelper.getAuthorizationHeader(oAuthClientService))) - .build(); - SseEventSource eventSource = eventSourceFactory - .newSource(client.target(apiUrl + "/api/homeappliances/" + haId + "/events")); + String target = apiUrl + "/api/homeappliances/" + haId + "/events"; + + Client client = createClient(target); + + SseEventSource eventSource = eventSourceFactory.newSource(client.target(target)); HomeConnectEventSourceListener eventSourceListener = new HomeConnectEventSourceListener(haId, eventListener, this, scheduler, eventQueue); eventSource.register(eventSourceListener::onEvent, eventSourceListener::onError, @@ -149,9 +149,11 @@ public class HomeConnectEventSourceClient { } private void closeEventSource(SseEventSource eventSource, boolean immediate, boolean completed) { - if (eventSource.isOpen() && !completed) { - logger.debug("Close event source (immediate = {})", immediate); - eventSource.close(immediate ? 0 : 10, TimeUnit.SECONDS); + var open = eventSource.isOpen(); + logger.debug("Closing event source. open={}, completed={}, immediate={}", open, completed, immediate); + if (open && !completed) { + eventSource.close(immediate ? 0 : 5, TimeUnit.SECONDS); + logger.debug("Event source closed."); } HomeConnectEventSourceListener eventSourceListener = eventSourceListeners.get(eventSource); if (eventSourceListener != null) { @@ -159,6 +161,26 @@ public class HomeConnectEventSourceClient { } } + private Client createClient(String target) throws CommunicationException, AuthorizationException { + boolean filterRegistered = clientBuilder.getConfiguration() + .isRegistered(HomeConnectStreamingRequestFilter.class); + + Client client; + HomeConnectStreamingRequestFilter filter; + if (filterRegistered) { + filter = clientBuilder.getConfiguration().getInstances().stream() + .filter(instance -> instance instanceof HomeConnectStreamingRequestFilter) + .map(instance -> (HomeConnectStreamingRequestFilter) instance).findAny().orElseThrow(); + client = clientBuilder.readTimeout(SSE_REQUEST_READ_TIMEOUT, TimeUnit.SECONDS).build(); + } else { + filter = new HomeConnectStreamingRequestFilter(); + client = clientBuilder.readTimeout(SSE_REQUEST_READ_TIMEOUT, TimeUnit.SECONDS).register(filter).build(); + } + filter.setAuthorizationHeader(target, HttpHelper.getAuthorizationHeader(oAuthClientService)); + + return client; + } + /** * Connection count. * diff --git a/bundles/org.openhab.binding.homeconnect/src/main/java/org/openhab/binding/homeconnect/internal/client/HomeConnectEventSourceListener.java b/bundles/org.openhab.binding.homeconnect/src/main/java/org/openhab/binding/homeconnect/internal/client/HomeConnectEventSourceListener.java index e33b0456835..8341d5e47cb 100644 --- a/bundles/org.openhab.binding.homeconnect/src/main/java/org/openhab/binding/homeconnect/internal/client/HomeConnectEventSourceListener.java +++ b/bundles/org.openhab.binding.homeconnect/src/main/java/org/openhab/binding/homeconnect/internal/client/HomeConnectEventSourceListener.java @@ -27,6 +27,7 @@ import java.util.concurrent.ScheduledExecutorService; import java.util.concurrent.ScheduledFuture; import java.util.concurrent.TimeUnit; +import javax.ws.rs.InternalServerErrorException; import javax.ws.rs.NotAuthorizedException; import javax.ws.rs.sse.InboundSseEvent; @@ -138,9 +139,14 @@ public class HomeConnectEventSourceListener { // seconds. So we wait few seconds before trying again. if (error instanceof NotAuthorizedException) { logger.debug( - "Event source listener connection failure due to unauthorized exception : wait 10 seconds... haId={}", + "Event source listener connection failure due to unauthorized exception : wait 20 seconds... haId={}", haId); - scheduledExecutorService.schedule(() -> eventListener.onClosed(), 10, TimeUnit.SECONDS); + scheduledExecutorService.schedule(() -> eventListener.onClosed(), 20, TimeUnit.SECONDS); + } else if (error instanceof InternalServerErrorException) { + logger.debug( + "Event source listener connection failure due to internal server exception : wait 2 seconds... haId={}", + haId); + scheduledExecutorService.schedule(() -> eventListener.onClosed(), 2, TimeUnit.SECONDS); } else { eventListener.onClosed(); } diff --git a/bundles/org.openhab.binding.homeconnect/src/main/java/org/openhab/binding/homeconnect/internal/client/HomeConnectStreamingRequestFilter.java b/bundles/org.openhab.binding.homeconnect/src/main/java/org/openhab/binding/homeconnect/internal/client/HomeConnectStreamingRequestFilter.java index c93acd25bc1..66984e89178 100644 --- a/bundles/org.openhab.binding.homeconnect/src/main/java/org/openhab/binding/homeconnect/internal/client/HomeConnectStreamingRequestFilter.java +++ b/bundles/org.openhab.binding.homeconnect/src/main/java/org/openhab/binding/homeconnect/internal/client/HomeConnectStreamingRequestFilter.java @@ -13,14 +13,19 @@ package org.openhab.binding.homeconnect.internal.client; import java.io.IOException; +import java.util.concurrent.ConcurrentHashMap; import javax.ws.rs.client.ClientRequestContext; import javax.ws.rs.client.ClientRequestFilter; +import javax.ws.rs.client.ClientResponseContext; +import javax.ws.rs.client.ClientResponseFilter; import javax.ws.rs.core.HttpHeaders; import javax.ws.rs.core.MultivaluedMap; import org.eclipse.jdt.annotation.NonNullByDefault; import org.eclipse.jdt.annotation.Nullable; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; /** * Inserts Authorization header for requests on the streaming REST API. @@ -28,23 +33,49 @@ import org.eclipse.jdt.annotation.Nullable; * @author Laurent Garnier - Initial contribution */ @NonNullByDefault -public class HomeConnectStreamingRequestFilter implements ClientRequestFilter { +public class HomeConnectStreamingRequestFilter implements ClientRequestFilter, ClientResponseFilter { private static final String TEXT_EVENT_STREAM = "text/event-stream"; - private final String authorizationHeader; - - public HomeConnectStreamingRequestFilter(String authorizationHeader) { - this.authorizationHeader = authorizationHeader; - } + private final Logger logger = LoggerFactory.getLogger(HomeConnectStreamingRequestFilter.class); + private final ConcurrentHashMap authorizationHeaders = new ConcurrentHashMap<>(); @Override public void filter(@Nullable ClientRequestContext requestContext) throws IOException { if (requestContext != null) { MultivaluedMap headers = requestContext.getHeaders(); - headers.putSingle(HttpHeaders.AUTHORIZATION, authorizationHeader); + String authorizationHeader = authorizationHeaders.get(requestContext.getUri().toString()); + if (authorizationHeader != null) { + headers.putSingle(HttpHeaders.AUTHORIZATION, authorizationHeader); + } else { + logger.warn("No authorization header set! uri={}", requestContext.getUri()); + } headers.putSingle(HttpHeaders.CACHE_CONTROL, "no-cache"); headers.putSingle(HttpHeaders.ACCEPT, TEXT_EVENT_STREAM); } } + + @Override + public void filter(@Nullable ClientRequestContext requestContext, @Nullable ClientResponseContext responseContext) + throws IOException { + if (logger.isDebugEnabled() && requestContext != null) { + StringBuilder sb = new StringBuilder(); + sb.append("SSE connection: "); + sb.append(requestContext.getUri()).append("\n"); + requestContext.getHeaders() + .forEach((name, value) -> sb.append("> ").append(name).append(": ").append(value).append("\n")); + + if (responseContext != null) { + responseContext.getHeaders() + .forEach((name, value) -> sb.append("< ").append(name).append(": ").append(value).append("\n")); + } + + logger.debug("{}", sb); + } + } + + public void setAuthorizationHeader(String target, String header) { + logger.debug("Set authorization header. target={}, header={}", target, header); + authorizationHeaders.put(target, header); + } } diff --git a/bundles/org.openhab.binding.homeconnect/src/main/java/org/openhab/binding/homeconnect/internal/client/HttpHelper.java b/bundles/org.openhab.binding.homeconnect/src/main/java/org/openhab/binding/homeconnect/internal/client/HttpHelper.java index c03f5896115..3fdaedb3807 100644 --- a/bundles/org.openhab.binding.homeconnect/src/main/java/org/openhab/binding/homeconnect/internal/client/HttpHelper.java +++ b/bundles/org.openhab.binding.homeconnect/src/main/java/org/openhab/binding/homeconnect/internal/client/HttpHelper.java @@ -60,6 +60,8 @@ public class HttpHelper { private static final Gson GSON = new GsonBuilder().setPrettyPrinting().create(); private static final JsonParser JSON_PARSER = new JsonParser(); private static final Map BUCKET_MAP = new HashMap<>(); + private static final Object AUTHORIZATION_HEADER_MONITOR = new Object(); + private static final Object BUCKET_MONITOR = new Object(); private static @Nullable String lastAccessToken = null; @@ -90,36 +92,42 @@ public class HttpHelper { public static String getAuthorizationHeader(OAuthClientService oAuthClientService) throws AuthorizationException, CommunicationException { - try { - AccessTokenResponse accessTokenResponse = oAuthClientService.getAccessTokenResponse(); - // refresh the token if it's about to expire - if (accessTokenResponse != null - && accessTokenResponse.isExpired(LocalDateTime.now(), OAUTH_EXPIRE_BUFFER)) { - LoggerFactory.getLogger(HttpHelper.class).debug("Requesting a refresh of the access token."); - accessTokenResponse = oAuthClientService.refreshToken(); - } - - if (accessTokenResponse != null) { - String lastToken = lastAccessToken; - if (lastToken == null) { - LoggerFactory.getLogger(HttpHelper.class).debug("The used access token was created at {}", - accessTokenResponse.getCreatedOn().format(DateTimeFormatter.ISO_LOCAL_DATE_TIME)); - } else if (!lastToken.equals(accessTokenResponse.getAccessToken())) { - LoggerFactory.getLogger(HttpHelper.class).debug("The access token changed. New one created at {}", - accessTokenResponse.getCreatedOn().format(DateTimeFormatter.ISO_LOCAL_DATE_TIME)); + synchronized (AUTHORIZATION_HEADER_MONITOR) { + try { + AccessTokenResponse accessTokenResponse = oAuthClientService.getAccessTokenResponse(); + // refresh the token if it's about to expire + if (accessTokenResponse != null + && accessTokenResponse.isExpired(LocalDateTime.now(), OAUTH_EXPIRE_BUFFER)) { + LoggerFactory.getLogger(HttpHelper.class).debug("Requesting a refresh of the access token."); + accessTokenResponse = oAuthClientService.refreshToken(); } - lastAccessToken = accessTokenResponse.getAccessToken(); - return BEARER + accessTokenResponse.getAccessToken(); - } else { - LoggerFactory.getLogger(HttpHelper.class).error("No access token available! Fatal error."); - throw new AuthorizationException("No access token available!"); + + if (accessTokenResponse != null) { + String lastToken = lastAccessToken; + if (lastToken == null) { + LoggerFactory.getLogger(HttpHelper.class).debug("The used access token was created at {}", + accessTokenResponse.getCreatedOn().format(DateTimeFormatter.ISO_LOCAL_DATE_TIME)); + } else if (!lastToken.equals(accessTokenResponse.getAccessToken())) { + LoggerFactory.getLogger(HttpHelper.class).debug( + "The access token changed. New one created at {}", + accessTokenResponse.getCreatedOn().format(DateTimeFormatter.ISO_LOCAL_DATE_TIME)); + } + lastAccessToken = accessTokenResponse.getAccessToken(); + + LoggerFactory.getLogger(HttpHelper.class).debug("Current access token: {}", + accessTokenResponse.getAccessToken()); + return BEARER + accessTokenResponse.getAccessToken(); + } else { + LoggerFactory.getLogger(HttpHelper.class).error("No access token available! Fatal error."); + throw new AuthorizationException("No access token available!"); + } + } catch (IOException e) { + String errorMessage = e.getMessage(); + throw new CommunicationException(errorMessage != null ? errorMessage : "IOException", e); + } catch (OAuthException | OAuthResponseException e) { + String errorMessage = e.getMessage(); + throw new AuthorizationException(errorMessage != null ? errorMessage : "oAuth exception", e); } - } catch (IOException e) { - String errorMessage = e.getMessage(); - throw new CommunicationException(errorMessage != null ? errorMessage : "IOException", e); - } catch (OAuthException | OAuthResponseException e) { - String errorMessage = e.getMessage(); - throw new AuthorizationException(errorMessage != null ? errorMessage : "oAuth exception", e); } } @@ -127,20 +135,22 @@ public class HttpHelper { return JSON_PARSER.parse(json); } - private static synchronized Bucket getBucket(String clientId) { - Bucket bucket = null; - if (BUCKET_MAP.containsKey(clientId)) { - bucket = BUCKET_MAP.get(clientId); - } + private static Bucket getBucket(String clientId) { + synchronized (BUCKET_MONITOR) { + Bucket bucket = null; + if (BUCKET_MAP.containsKey(clientId)) { + bucket = BUCKET_MAP.get(clientId); + } - if (bucket == null) { - bucket = Bucket4j.builder() - // allows 50 tokens per minute (added 10 second buffer) - .addLimit(classic(50, intervally(50, Duration.ofSeconds(70))).withInitialTokens(40)) - // but not often then 50 tokens per second - .addLimit(classic(10, intervally(10, Duration.ofSeconds(1)))).build(); - BUCKET_MAP.put(clientId, bucket); + if (bucket == null) { + bucket = Bucket4j.builder() + // allows 50 tokens per minute (added 10 second buffer) + .addLimit(classic(50, intervally(50, Duration.ofSeconds(70))).withInitialTokens(40)) + // but not often then 50 tokens per second + .addLimit(classic(10, intervally(10, Duration.ofSeconds(1)))).build(); + BUCKET_MAP.put(clientId, bucket); + } + return bucket; } - return bucket; } }