From d2bd5114fd94b50888e1888e9bae19c069667847 Mon Sep 17 00:00:00 2001 From: J-N-K Date: Thu, 31 Dec 2020 15:03:57 +0100 Subject: [PATCH] [deconz] refactoring and fix connection issues (#9614) * fix connection attempts when thing disposed Signed-off-by: Jan N. Klug * refactor session handling Signed-off-by: Jan N. Klug * rename Signed-off-by: Jan N. Klug * refactoring Signed-off-by: Jan N. Klug --- .../handler/DeconzBaseThingHandler.java | 22 ++-- .../internal/handler/DeconzBridgeHandler.java | 68 +++++----- .../netutils/WebSocketConnection.java | 119 +++++++++++++----- .../netutils/WebSocketConnectionListener.java | 7 -- 4 files changed, 128 insertions(+), 88 deletions(-) diff --git a/bundles/org.openhab.binding.deconz/src/main/java/org/openhab/binding/deconz/internal/handler/DeconzBaseThingHandler.java b/bundles/org.openhab.binding.deconz/src/main/java/org/openhab/binding/deconz/internal/handler/DeconzBaseThingHandler.java index 200b811bb..26285db53 100644 --- a/bundles/org.openhab.binding.deconz/src/main/java/org/openhab/binding/deconz/internal/handler/DeconzBaseThingHandler.java +++ b/bundles/org.openhab.binding.deconz/src/main/java/org/openhab/binding/deconz/internal/handler/DeconzBaseThingHandler.java @@ -12,16 +12,15 @@ */ package org.openhab.binding.deconz.internal.handler; -import static org.openhab.binding.deconz.internal.Util.buildUrl; - import java.util.concurrent.ScheduledFuture; import java.util.concurrent.TimeUnit; import java.util.function.Consumer; +import java.util.stream.Collectors; +import java.util.stream.Stream; import org.eclipse.jdt.annotation.NonNullByDefault; import org.eclipse.jdt.annotation.Nullable; import org.openhab.binding.deconz.internal.dto.DeconzBaseMessage; -import org.openhab.binding.deconz.internal.netutils.AsyncHttpClient; import org.openhab.binding.deconz.internal.netutils.WebSocketConnection; import org.openhab.binding.deconz.internal.netutils.WebSocketMessageListener; import org.openhab.binding.deconz.internal.types.ResourceType; @@ -47,11 +46,9 @@ public abstract class DeconzBaseThingHandler extends BaseThingHandler implements private final Logger logger = LoggerFactory.getLogger(DeconzBaseThingHandler.class); protected final ResourceType resourceType; protected ThingConfig config = new ThingConfig(); - protected DeconzBridgeConfig bridgeConfig = new DeconzBridgeConfig(); protected final Gson gson; private @Nullable ScheduledFuture initializationJob; protected @Nullable WebSocketConnection connection; - protected @Nullable AsyncHttpClient http; public DeconzBaseThingHandler(Thing thing, Gson gson, ResourceType resourceType) { super(thing); @@ -111,8 +108,6 @@ public abstract class DeconzBaseThingHandler extends BaseThingHandler implements final WebSocketConnection webSocketConnection = bridgeHandler.getWebsocketConnection(); this.connection = webSocketConnection; - this.http = bridgeHandler.getHttp(); - this.bridgeConfig = bridgeHandler.getBridgeConfig(); updateStatus(ThingStatus.UNKNOWN, ThingStatusDetail.NONE); @@ -182,17 +177,14 @@ public abstract class DeconzBaseThingHandler extends BaseThingHandler implements */ protected void sendCommand(@Nullable Object object, Command originalCommand, ChannelUID channelUID, String commandUrl, @Nullable Runnable acceptProcessing) { - AsyncHttpClient asyncHttpClient = http; - if (asyncHttpClient == null) { + DeconzBridgeHandler bridgeHandler = getBridgeHandler(); + if (bridgeHandler == null) { return; } - String url = buildUrl(bridgeConfig.host, bridgeConfig.httpPort, bridgeConfig.apikey, - resourceType.getIdentifier(), config.id, commandUrl); + String endpoint = Stream.of(resourceType.getIdentifier(), config.id, commandUrl) + .collect(Collectors.joining("/")); - String json = object == null ? null : gson.toJson(object); - logger.trace("Sending {} to {} {} via {}", json, resourceType, config.id, url); - - asyncHttpClient.put(url, json, bridgeConfig.timeout).thenAccept(v -> { + bridgeHandler.sendObject(endpoint, object).thenAccept(v -> { if (acceptProcessing != null) { acceptProcessing.run(); } diff --git a/bundles/org.openhab.binding.deconz/src/main/java/org/openhab/binding/deconz/internal/handler/DeconzBridgeHandler.java b/bundles/org.openhab.binding.deconz/src/main/java/org/openhab/binding/deconz/internal/handler/DeconzBridgeHandler.java index 7a4b1633d..2a1672813 100644 --- a/bundles/org.openhab.binding.deconz/src/main/java/org/openhab/binding/deconz/internal/handler/DeconzBridgeHandler.java +++ b/bundles/org.openhab.binding.deconz/src/main/java/org/openhab/binding/deconz/internal/handler/DeconzBridgeHandler.java @@ -70,7 +70,7 @@ public class DeconzBridgeHandler extends BaseBridgeHandler implements WebSocketC private int websocketPort = 0; /** Prevent a dispose/init cycle while this flag is set. Use for property updates */ private boolean ignoreConfigurationUpdate; - private boolean websocketReconnect = false; + private boolean thingDisposing = false; private final ExpiringCacheAsync> fullStateCache = new ExpiringCacheAsync<>(1000); @@ -92,7 +92,7 @@ public class DeconzBridgeHandler extends BaseBridgeHandler implements WebSocketC @Override public Collection> getServices() { - return Collections.singleton(ThingDiscoveryService.class); + return Set.of(ThingDiscoveryService.class); } @Override @@ -123,11 +123,15 @@ public class DeconzBridgeHandler extends BaseBridgeHandler implements WebSocketC * @param r The response */ private void parseAPIKeyResponse(AsyncHttpClient.Result r) { + if (thingDisposing) { + // discard response if thing handler is already disposing + return; + } if (r.getResponseCode() == 403) { updateStatus(ThingStatus.OFFLINE, ThingStatusDetail.CONFIGURATION_PENDING, "Allow authentication for 3rd party apps. Trying again in " + POLL_FREQUENCY_SEC + " seconds"); stopTimer(); - scheduledFuture = scheduler.schedule(() -> requestApiKey(), POLL_FREQUENCY_SEC, TimeUnit.SECONDS); + scheduledFuture = scheduler.schedule(this::requestApiKey, POLL_FREQUENCY_SEC, TimeUnit.SECONDS); } else if (r.getResponseCode() == 200) { ApiKeyMessage[] response = Objects.requireNonNull(gson.fromJson(r.getBody(), ApiKeyMessage[].class)); if (response.length == 0) { @@ -160,7 +164,7 @@ public class DeconzBridgeHandler extends BaseBridgeHandler implements WebSocketC */ private CompletableFuture> refreshFullStateCache() { logger.trace("{} starts refreshing the fullStateCache", thing.getUID()); - if (config.apikey == null) { + if (config.apikey == null || thingDisposing) { return CompletableFuture.completedFuture(Optional.empty()); } String url = buildUrl(config.getHostWithoutPort(), config.httpPort, config.apikey); @@ -191,6 +195,10 @@ public class DeconzBridgeHandler extends BaseBridgeHandler implements WebSocketC */ public void initializeBridgeState() { getBridgeFullState().thenAccept(fullState -> fullState.ifPresentOrElse(state -> { + if (thingDisposing) { + // discard response if thing handler is already disposing + return; + } if (state.config.name.isEmpty()) { updateStatus(ThingStatus.OFFLINE, ThingStatusDetail.NONE, "You are connected to a HUE bridge, not a deCONZ software!"); @@ -216,11 +224,12 @@ public class DeconzBridgeHandler extends BaseBridgeHandler implements WebSocketC // Use requested websocket port if no specific port is given websocketPort = config.port == 0 ? state.config.websocketport : config.port; - websocketReconnect = true; startWebsocket(); }, () -> { // initial response was empty, re-trying in POLL_FREQUENCY_SEC seconds - scheduledFuture = scheduler.schedule(this::initializeBridgeState, POLL_FREQUENCY_SEC, TimeUnit.SECONDS); + if (!thingDisposing) { + scheduledFuture = scheduler.schedule(this::initializeBridgeState, POLL_FREQUENCY_SEC, TimeUnit.SECONDS); + } })).exceptionally(e -> { if (e != null) { updateStatus(ThingStatus.OFFLINE, ThingStatusDetail.NONE, e.getMessage()); @@ -237,7 +246,7 @@ public class DeconzBridgeHandler extends BaseBridgeHandler implements WebSocketC * {@link #initializeBridgeState} need to be called first to obtain the websocket port. */ private void startWebsocket() { - if (websocket.isConnected() || websocketPort == 0 || websocketReconnect == false) { + if (websocket.isConnected() || websocketPort == 0 || thingDisposing) { return; } @@ -251,11 +260,11 @@ public class DeconzBridgeHandler extends BaseBridgeHandler implements WebSocketC * Perform a request to the REST API for generating an API key. * */ - private CompletableFuture requestApiKey() { + private void requestApiKey() { updateStatus(ThingStatus.OFFLINE, ThingStatusDetail.CONFIGURATION_PENDING, "Requesting API Key"); stopTimer(); String url = buildUrl(config.getHostWithoutPort(), config.httpPort); - return http.post(url, "{\"devicetype\":\"openHAB\"}", config.timeout).thenAccept(this::parseAPIKeyResponse) + http.post(url, "{\"devicetype\":\"openHAB\"}", config.timeout).thenAccept(this::parseAPIKeyResponse) .exceptionally(e -> { updateStatus(ThingStatus.OFFLINE, ThingStatusDetail.COMMUNICATION_ERROR, e.getMessage()); logger.warn("Authorisation failed", e); @@ -265,7 +274,8 @@ public class DeconzBridgeHandler extends BaseBridgeHandler implements WebSocketC @Override public void initialize() { - logger.debug("Start initializing!"); + logger.debug("Start initializing bridge {}", thing.getUID()); + thingDisposing = false; config = getConfigAs(DeconzBridgeConfig.class); if (config.apikey == null) { requestApiKey(); @@ -276,23 +286,11 @@ public class DeconzBridgeHandler extends BaseBridgeHandler implements WebSocketC @Override public void dispose() { - websocketReconnect = false; + thingDisposing = true; stopTimer(); websocket.close(); } - @Override - public void connectionError(@Nullable Throwable e) { - if (e != null) { - updateStatus(ThingStatus.OFFLINE, ThingStatusDetail.COMMUNICATION_ERROR, e.getMessage()); - } else { - updateStatus(ThingStatus.OFFLINE, ThingStatusDetail.COMMUNICATION_ERROR, "Unknown reason"); - } - stopTimer(); - // Wait for POLL_FREQUENCY_SEC after a connection error before trying again - scheduledFuture = scheduler.schedule(this::startWebsocket, POLL_FREQUENCY_SEC, TimeUnit.SECONDS); - } - @Override public void connectionEstablished() { stopTimer(); @@ -302,7 +300,10 @@ public class DeconzBridgeHandler extends BaseBridgeHandler implements WebSocketC @Override public void connectionLost(String reason) { updateStatus(ThingStatus.OFFLINE, ThingStatusDetail.COMMUNICATION_ERROR, reason); - startWebsocket(); + + stopTimer(); + // Wait for POLL_FREQUENCY_SEC after a connection was closed before trying again + scheduledFuture = scheduler.schedule(this::startWebsocket, POLL_FREQUENCY_SEC, TimeUnit.SECONDS); } /** @@ -313,16 +314,17 @@ public class DeconzBridgeHandler extends BaseBridgeHandler implements WebSocketC } /** - * Return the http connection. + * Send an object to the gateway + * + * @param endPoint the endpoint (e.g. "lights/2/state") + * @param object the object (or null if no object) + * @return CompletableFuture of the result */ - public AsyncHttpClient getHttp() { - return http; - } + public CompletableFuture sendObject(String endPoint, @Nullable Object object) { + String json = object == null ? null : gson.toJson(object); + String url = buildUrl(config.host, config.httpPort, config.apikey, endPoint); + logger.trace("Sending {} via {}", json, url); - /** - * Return the bridge configuration. - */ - public DeconzBridgeConfig getBridgeConfig() { - return config; + return http.put(url, json, config.timeout); } } diff --git a/bundles/org.openhab.binding.deconz/src/main/java/org/openhab/binding/deconz/internal/netutils/WebSocketConnection.java b/bundles/org.openhab.binding.deconz/src/main/java/org/openhab/binding/deconz/internal/netutils/WebSocketConnection.java index 4d0fb4c4e..7bc096e7a 100644 --- a/bundles/org.openhab.binding.deconz/src/main/java/org/openhab/binding/deconz/internal/netutils/WebSocketConnection.java +++ b/bundles/org.openhab.binding.deconz/src/main/java/org/openhab/binding/deconz/internal/netutils/WebSocketConnection.java @@ -16,9 +16,10 @@ import java.net.URI; import java.util.Map; import java.util.Objects; import java.util.concurrent.ConcurrentHashMap; +import java.util.concurrent.atomic.AtomicInteger; import org.eclipse.jdt.annotation.NonNullByDefault; -import org.eclipse.jetty.util.thread.QueuedThreadPool; +import org.eclipse.jdt.annotation.Nullable; import org.eclipse.jetty.websocket.api.Session; import org.eclipse.jetty.websocket.api.annotations.OnWebSocketClose; import org.eclipse.jetty.websocket.api.annotations.OnWebSocketConnect; @@ -43,6 +44,7 @@ import com.google.gson.Gson; @WebSocket @NonNullByDefault public class WebSocketConnection { + private static final AtomicInteger INSTANCE_COUNTER = new AtomicInteger(); private final Logger logger = LoggerFactory.getLogger(WebSocketConnection.class); private final WebSocketClient client; @@ -50,16 +52,17 @@ public class WebSocketConnection { private final Gson gson; private final WebSocketConnectionListener connectionListener; - private final Map, WebSocketMessageListener> listeners = new ConcurrentHashMap<>(); + private final Map listeners = new ConcurrentHashMap<>(); private ConnectionState connectionState = ConnectionState.DISCONNECTED; + private @Nullable Session session; public WebSocketConnection(WebSocketConnectionListener listener, WebSocketClient client, Gson gson) { this.connectionListener = listener; this.client = client; this.client.setMaxIdleTimeout(0); this.gson = gson; - this.socketName = ((QueuedThreadPool) client.getExecutor()).getName() + "$" + this.hashCode(); + this.socketName = "Websocket$" + System.currentTimeMillis() + "-" + INSTANCE_COUNTER.incrementAndGet(); } public void start(String ip) { @@ -68,6 +71,8 @@ public class WebSocketConnection { } else if (connectionState == ConnectionState.CONNECTING) { logger.debug("{} already connecting", socketName); return; + } else if (connectionState == ConnectionState.DISCONNECTING) { + logger.warn("{} trying to re-connect while still disconnecting", socketName); } try { URI destUri = URI.create("ws://" + ip); @@ -75,7 +80,7 @@ public class WebSocketConnection { logger.debug("Trying to connect {} to {}", socketName, destUri); client.connect(this, destUri).get(); } catch (Exception e) { - connectionListener.connectionError(e); + connectionListener.connectionLost("Error while connecting: " + e.getMessage()); } } @@ -90,67 +95,104 @@ public class WebSocketConnection { } public void registerListener(ResourceType resourceType, String sensorID, WebSocketMessageListener listener) { - listeners.put(Map.entry(resourceType, sensorID), listener); + listeners.put(getListenerId(resourceType, sensorID), listener); } public void unregisterListener(ResourceType resourceType, String sensorID) { - listeners.remove(Map.entry(resourceType, sensorID)); + listeners.remove(getListenerId(resourceType, sensorID)); } @SuppressWarnings("unused") @OnWebSocketConnect public void onConnect(Session session) { connectionState = ConnectionState.CONNECTED; - logger.debug("{} successfully connected to {}", socketName, session.getRemoteAddress().getAddress()); + logger.debug("{} successfully connected to {}: {}", socketName, session.getRemoteAddress().getAddress(), + session.hashCode()); connectionListener.connectionEstablished(); + this.session = session; } @SuppressWarnings({ "null", "unused" }) @OnWebSocketMessage - public void onMessage(String message) { - logger.trace("Raw data received by websocket {}: {}", socketName, message); - - DeconzBaseMessage changedMessage = Objects.requireNonNull(gson.fromJson(message, DeconzBaseMessage.class)); - if (changedMessage.r == ResourceType.UNKNOWN) { - logger.trace("Received message has unknown resource type. Skipping message."); + public void onMessage(Session session, String message) { + if (!session.equals(this.session)) { + handleWrongSession(session, message); return; } + logger.trace("{} received raw data: {}", socketName, message); - WebSocketMessageListener listener = listeners.get(Map.entry(changedMessage.r, changedMessage.id)); - if (listener == null) { - logger.debug( - "Couldn't find listener for id {} with resource type {}. Either no thing for this id has been defined or this is a bug.", - changedMessage.id, changedMessage.r); - return; - } + try { + DeconzBaseMessage changedMessage = Objects.requireNonNull(gson.fromJson(message, DeconzBaseMessage.class)); + if (changedMessage.r == ResourceType.UNKNOWN) { + logger.trace("Received message has unknown resource type. Skipping message."); + return; + } - Class expectedMessageType = changedMessage.r.getExpectedMessageType(); - if (expectedMessageType == null) { - logger.warn("BUG! Could not get expected message type for resource type {}. Please report this incident.", - changedMessage.r); - return; - } + WebSocketMessageListener listener = listeners.get(getListenerId(changedMessage.r, changedMessage.id)); + if (listener == null) { + logger.debug( + "Couldn't find listener for id {} with resource type {}. Either no thing for this id has been defined or this is a bug.", + changedMessage.id, changedMessage.r); + return; + } - DeconzBaseMessage deconzMessage = gson.fromJson(message, expectedMessageType); - if (deconzMessage != null) { - listener.messageReceived(changedMessage.id, deconzMessage); + Class expectedMessageType = changedMessage.r.getExpectedMessageType(); + if (expectedMessageType == null) { + logger.warn( + "BUG! Could not get expected message type for resource type {}. Please report this incident.", + changedMessage.r); + return; + } + + DeconzBaseMessage deconzMessage = gson.fromJson(message, expectedMessageType); + if (deconzMessage != null) { + listener.messageReceived(changedMessage.id, deconzMessage); + + } + } catch (RuntimeException e) { + // we need to catch all processing exceptions, otherwise they could affect the connection + logger.warn("{} encountered an error while processing the message {}: {}", socketName, message, + e.getMessage()); } } @SuppressWarnings("unused") @OnWebSocketError - public void onError(Throwable cause) { - connectionState = ConnectionState.DISCONNECTED; - connectionListener.connectionError(cause); + public void onError(Session session, Throwable cause) { + if (!session.equals(this.session)) { + handleWrongSession(session, "Connection error: " + cause.getMessage()); + return; + } + logger.warn("{} connection errored, closing: {}", socketName, cause.getMessage()); + + Session storedSession = this.session; + if (storedSession != null && storedSession.isOpen()) { + storedSession.close(-1, "Processing error"); + } } @SuppressWarnings("unused") @OnWebSocketClose - public void onClose(int statusCode, String reason) { + public void onClose(Session session, int statusCode, String reason) { + if (!session.equals(this.session)) { + handleWrongSession(session, "Connection closed: " + statusCode + " / " + reason); + return; + } + logger.trace("{} closed connection: {} / {}", socketName, statusCode, reason); connectionState = ConnectionState.DISCONNECTED; + this.session = null; connectionListener.connectionLost(reason); } + private void handleWrongSession(Session session, String message) { + logger.warn("{}/{} received and discarded message for other session {}: {}.", socketName, session.hashCode(), + session.hashCode(), message); + if (session.isOpen()) { + // Close the session if it is still open. It should already be closed anyway + session.close(); + } + } + /** * check connection state (successfully connected) * @@ -160,6 +202,17 @@ public class WebSocketConnection { return connectionState == ConnectionState.CONNECTED; } + /** + * create a unique identifier for a listener + * + * @param resourceType the listener resource-type (LIGHT, SENSOR, ...) + * @param id the listener id (same as deconz-id) + * @return a unique string for this listener + */ + private String getListenerId(ResourceType resourceType, String id) { + return resourceType.name() + "$" + id; + } + /** * used internally to represent the connection state */ diff --git a/bundles/org.openhab.binding.deconz/src/main/java/org/openhab/binding/deconz/internal/netutils/WebSocketConnectionListener.java b/bundles/org.openhab.binding.deconz/src/main/java/org/openhab/binding/deconz/internal/netutils/WebSocketConnectionListener.java index 01749714e..ccf689544 100644 --- a/bundles/org.openhab.binding.deconz/src/main/java/org/openhab/binding/deconz/internal/netutils/WebSocketConnectionListener.java +++ b/bundles/org.openhab.binding.deconz/src/main/java/org/openhab/binding/deconz/internal/netutils/WebSocketConnectionListener.java @@ -21,13 +21,6 @@ import org.eclipse.jdt.annotation.NonNullByDefault; */ @NonNullByDefault public interface WebSocketConnectionListener { - /** - * An error occurred during connection or while connecting. - * - * @param e The error - */ - void connectionError(Throwable e); - /** * Connection successfully established. */