[homeconnect] Fix invalid authorization header during Server-Sent Events (SSE) client creation (#10848)

* [homeconnect] Improve logging of SSE connection and add backoff interval in case of connection error

Signed-off-by: Jonas Brüstel <jonas@bruestel.net>

* [homeconnect] Fix SSE authorization header problem

Signed-off-by: Jonas Brüstel <jonas@bruestel.net>

* [homeconnect] Fix synchronized monitor

Signed-off-by: Jonas Brüstel <jonas@bruestel.net>
This commit is contained in:
bruestel 2021-06-18 11:25:13 +02:00 committed by GitHub
parent 1630430705
commit ca1de9dc4d
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
4 changed files with 127 additions and 58 deletions

View File

@ -95,11 +95,11 @@ public class HomeConnectEventSourceClient {
if (!eventSourceConnections.containsKey(eventListener)) {
logger.debug("Create new event source listener for '{}'.", haId);
Client client = clientBuilder.readTimeout(SSE_REQUEST_READ_TIMEOUT, TimeUnit.SECONDS).register(
new HomeConnectStreamingRequestFilter(HttpHelper.getAuthorizationHeader(oAuthClientService)))
.build();
SseEventSource eventSource = eventSourceFactory
.newSource(client.target(apiUrl + "/api/homeappliances/" + haId + "/events"));
String target = apiUrl + "/api/homeappliances/" + haId + "/events";
Client client = createClient(target);
SseEventSource eventSource = eventSourceFactory.newSource(client.target(target));
HomeConnectEventSourceListener eventSourceListener = new HomeConnectEventSourceListener(haId, eventListener,
this, scheduler, eventQueue);
eventSource.register(eventSourceListener::onEvent, eventSourceListener::onError,
@ -149,9 +149,11 @@ public class HomeConnectEventSourceClient {
}
private void closeEventSource(SseEventSource eventSource, boolean immediate, boolean completed) {
if (eventSource.isOpen() && !completed) {
logger.debug("Close event source (immediate = {})", immediate);
eventSource.close(immediate ? 0 : 10, TimeUnit.SECONDS);
var open = eventSource.isOpen();
logger.debug("Closing event source. open={}, completed={}, immediate={}", open, completed, immediate);
if (open && !completed) {
eventSource.close(immediate ? 0 : 5, TimeUnit.SECONDS);
logger.debug("Event source closed.");
}
HomeConnectEventSourceListener eventSourceListener = eventSourceListeners.get(eventSource);
if (eventSourceListener != null) {
@ -159,6 +161,26 @@ public class HomeConnectEventSourceClient {
}
}
private Client createClient(String target) throws CommunicationException, AuthorizationException {
boolean filterRegistered = clientBuilder.getConfiguration()
.isRegistered(HomeConnectStreamingRequestFilter.class);
Client client;
HomeConnectStreamingRequestFilter filter;
if (filterRegistered) {
filter = clientBuilder.getConfiguration().getInstances().stream()
.filter(instance -> instance instanceof HomeConnectStreamingRequestFilter)
.map(instance -> (HomeConnectStreamingRequestFilter) instance).findAny().orElseThrow();
client = clientBuilder.readTimeout(SSE_REQUEST_READ_TIMEOUT, TimeUnit.SECONDS).build();
} else {
filter = new HomeConnectStreamingRequestFilter();
client = clientBuilder.readTimeout(SSE_REQUEST_READ_TIMEOUT, TimeUnit.SECONDS).register(filter).build();
}
filter.setAuthorizationHeader(target, HttpHelper.getAuthorizationHeader(oAuthClientService));
return client;
}
/**
* Connection count.
*

View File

@ -27,6 +27,7 @@ import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.ScheduledFuture;
import java.util.concurrent.TimeUnit;
import javax.ws.rs.InternalServerErrorException;
import javax.ws.rs.NotAuthorizedException;
import javax.ws.rs.sse.InboundSseEvent;
@ -138,9 +139,14 @@ public class HomeConnectEventSourceListener {
// seconds. So we wait few seconds before trying again.
if (error instanceof NotAuthorizedException) {
logger.debug(
"Event source listener connection failure due to unauthorized exception : wait 10 seconds... haId={}",
"Event source listener connection failure due to unauthorized exception : wait 20 seconds... haId={}",
haId);
scheduledExecutorService.schedule(() -> eventListener.onClosed(), 10, TimeUnit.SECONDS);
scheduledExecutorService.schedule(() -> eventListener.onClosed(), 20, TimeUnit.SECONDS);
} else if (error instanceof InternalServerErrorException) {
logger.debug(
"Event source listener connection failure due to internal server exception : wait 2 seconds... haId={}",
haId);
scheduledExecutorService.schedule(() -> eventListener.onClosed(), 2, TimeUnit.SECONDS);
} else {
eventListener.onClosed();
}

View File

@ -13,14 +13,19 @@
package org.openhab.binding.homeconnect.internal.client;
import java.io.IOException;
import java.util.concurrent.ConcurrentHashMap;
import javax.ws.rs.client.ClientRequestContext;
import javax.ws.rs.client.ClientRequestFilter;
import javax.ws.rs.client.ClientResponseContext;
import javax.ws.rs.client.ClientResponseFilter;
import javax.ws.rs.core.HttpHeaders;
import javax.ws.rs.core.MultivaluedMap;
import org.eclipse.jdt.annotation.NonNullByDefault;
import org.eclipse.jdt.annotation.Nullable;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
/**
* Inserts Authorization header for requests on the streaming REST API.
@ -28,23 +33,49 @@ import org.eclipse.jdt.annotation.Nullable;
* @author Laurent Garnier - Initial contribution
*/
@NonNullByDefault
public class HomeConnectStreamingRequestFilter implements ClientRequestFilter {
public class HomeConnectStreamingRequestFilter implements ClientRequestFilter, ClientResponseFilter {
private static final String TEXT_EVENT_STREAM = "text/event-stream";
private final String authorizationHeader;
public HomeConnectStreamingRequestFilter(String authorizationHeader) {
this.authorizationHeader = authorizationHeader;
}
private final Logger logger = LoggerFactory.getLogger(HomeConnectStreamingRequestFilter.class);
private final ConcurrentHashMap<String, String> authorizationHeaders = new ConcurrentHashMap<>();
@Override
public void filter(@Nullable ClientRequestContext requestContext) throws IOException {
if (requestContext != null) {
MultivaluedMap<String, Object> headers = requestContext.getHeaders();
headers.putSingle(HttpHeaders.AUTHORIZATION, authorizationHeader);
String authorizationHeader = authorizationHeaders.get(requestContext.getUri().toString());
if (authorizationHeader != null) {
headers.putSingle(HttpHeaders.AUTHORIZATION, authorizationHeader);
} else {
logger.warn("No authorization header set! uri={}", requestContext.getUri());
}
headers.putSingle(HttpHeaders.CACHE_CONTROL, "no-cache");
headers.putSingle(HttpHeaders.ACCEPT, TEXT_EVENT_STREAM);
}
}
@Override
public void filter(@Nullable ClientRequestContext requestContext, @Nullable ClientResponseContext responseContext)
throws IOException {
if (logger.isDebugEnabled() && requestContext != null) {
StringBuilder sb = new StringBuilder();
sb.append("SSE connection: ");
sb.append(requestContext.getUri()).append("\n");
requestContext.getHeaders()
.forEach((name, value) -> sb.append("> ").append(name).append(": ").append(value).append("\n"));
if (responseContext != null) {
responseContext.getHeaders()
.forEach((name, value) -> sb.append("< ").append(name).append(": ").append(value).append("\n"));
}
logger.debug("{}", sb);
}
}
public void setAuthorizationHeader(String target, String header) {
logger.debug("Set authorization header. target={}, header={}", target, header);
authorizationHeaders.put(target, header);
}
}

View File

@ -60,6 +60,8 @@ public class HttpHelper {
private static final Gson GSON = new GsonBuilder().setPrettyPrinting().create();
private static final JsonParser JSON_PARSER = new JsonParser();
private static final Map<String, Bucket> BUCKET_MAP = new HashMap<>();
private static final Object AUTHORIZATION_HEADER_MONITOR = new Object();
private static final Object BUCKET_MONITOR = new Object();
private static @Nullable String lastAccessToken = null;
@ -90,36 +92,42 @@ public class HttpHelper {
public static String getAuthorizationHeader(OAuthClientService oAuthClientService)
throws AuthorizationException, CommunicationException {
try {
AccessTokenResponse accessTokenResponse = oAuthClientService.getAccessTokenResponse();
// refresh the token if it's about to expire
if (accessTokenResponse != null
&& accessTokenResponse.isExpired(LocalDateTime.now(), OAUTH_EXPIRE_BUFFER)) {
LoggerFactory.getLogger(HttpHelper.class).debug("Requesting a refresh of the access token.");
accessTokenResponse = oAuthClientService.refreshToken();
}
if (accessTokenResponse != null) {
String lastToken = lastAccessToken;
if (lastToken == null) {
LoggerFactory.getLogger(HttpHelper.class).debug("The used access token was created at {}",
accessTokenResponse.getCreatedOn().format(DateTimeFormatter.ISO_LOCAL_DATE_TIME));
} else if (!lastToken.equals(accessTokenResponse.getAccessToken())) {
LoggerFactory.getLogger(HttpHelper.class).debug("The access token changed. New one created at {}",
accessTokenResponse.getCreatedOn().format(DateTimeFormatter.ISO_LOCAL_DATE_TIME));
synchronized (AUTHORIZATION_HEADER_MONITOR) {
try {
AccessTokenResponse accessTokenResponse = oAuthClientService.getAccessTokenResponse();
// refresh the token if it's about to expire
if (accessTokenResponse != null
&& accessTokenResponse.isExpired(LocalDateTime.now(), OAUTH_EXPIRE_BUFFER)) {
LoggerFactory.getLogger(HttpHelper.class).debug("Requesting a refresh of the access token.");
accessTokenResponse = oAuthClientService.refreshToken();
}
lastAccessToken = accessTokenResponse.getAccessToken();
return BEARER + accessTokenResponse.getAccessToken();
} else {
LoggerFactory.getLogger(HttpHelper.class).error("No access token available! Fatal error.");
throw new AuthorizationException("No access token available!");
if (accessTokenResponse != null) {
String lastToken = lastAccessToken;
if (lastToken == null) {
LoggerFactory.getLogger(HttpHelper.class).debug("The used access token was created at {}",
accessTokenResponse.getCreatedOn().format(DateTimeFormatter.ISO_LOCAL_DATE_TIME));
} else if (!lastToken.equals(accessTokenResponse.getAccessToken())) {
LoggerFactory.getLogger(HttpHelper.class).debug(
"The access token changed. New one created at {}",
accessTokenResponse.getCreatedOn().format(DateTimeFormatter.ISO_LOCAL_DATE_TIME));
}
lastAccessToken = accessTokenResponse.getAccessToken();
LoggerFactory.getLogger(HttpHelper.class).debug("Current access token: {}",
accessTokenResponse.getAccessToken());
return BEARER + accessTokenResponse.getAccessToken();
} else {
LoggerFactory.getLogger(HttpHelper.class).error("No access token available! Fatal error.");
throw new AuthorizationException("No access token available!");
}
} catch (IOException e) {
String errorMessage = e.getMessage();
throw new CommunicationException(errorMessage != null ? errorMessage : "IOException", e);
} catch (OAuthException | OAuthResponseException e) {
String errorMessage = e.getMessage();
throw new AuthorizationException(errorMessage != null ? errorMessage : "oAuth exception", e);
}
} catch (IOException e) {
String errorMessage = e.getMessage();
throw new CommunicationException(errorMessage != null ? errorMessage : "IOException", e);
} catch (OAuthException | OAuthResponseException e) {
String errorMessage = e.getMessage();
throw new AuthorizationException(errorMessage != null ? errorMessage : "oAuth exception", e);
}
}
@ -127,20 +135,22 @@ public class HttpHelper {
return JSON_PARSER.parse(json);
}
private static synchronized Bucket getBucket(String clientId) {
Bucket bucket = null;
if (BUCKET_MAP.containsKey(clientId)) {
bucket = BUCKET_MAP.get(clientId);
}
private static Bucket getBucket(String clientId) {
synchronized (BUCKET_MONITOR) {
Bucket bucket = null;
if (BUCKET_MAP.containsKey(clientId)) {
bucket = BUCKET_MAP.get(clientId);
}
if (bucket == null) {
bucket = Bucket4j.builder()
// allows 50 tokens per minute (added 10 second buffer)
.addLimit(classic(50, intervally(50, Duration.ofSeconds(70))).withInitialTokens(40))
// but not often then 50 tokens per second
.addLimit(classic(10, intervally(10, Duration.ofSeconds(1)))).build();
BUCKET_MAP.put(clientId, bucket);
if (bucket == null) {
bucket = Bucket4j.builder()
// allows 50 tokens per minute (added 10 second buffer)
.addLimit(classic(50, intervally(50, Duration.ofSeconds(70))).withInitialTokens(40))
// but not often then 50 tokens per second
.addLimit(classic(10, intervally(10, Duration.ofSeconds(1)))).build();
BUCKET_MAP.put(clientId, bucket);
}
return bucket;
}
return bucket;
}
}