[deconz] refactoring and fix connection issues (#9614)
* fix connection attempts when thing disposed Signed-off-by: Jan N. Klug <jan.n.klug@rub.de> * refactor session handling Signed-off-by: Jan N. Klug <jan.n.klug@rub.de> * rename Signed-off-by: Jan N. Klug <jan.n.klug@rub.de> * refactoring Signed-off-by: Jan N. Klug <jan.n.klug@rub.de>
This commit is contained in:
parent
924e0c7faf
commit
d2bd5114fd
@ -12,16 +12,15 @@
|
|||||||
*/
|
*/
|
||||||
package org.openhab.binding.deconz.internal.handler;
|
package org.openhab.binding.deconz.internal.handler;
|
||||||
|
|
||||||
import static org.openhab.binding.deconz.internal.Util.buildUrl;
|
|
||||||
|
|
||||||
import java.util.concurrent.ScheduledFuture;
|
import java.util.concurrent.ScheduledFuture;
|
||||||
import java.util.concurrent.TimeUnit;
|
import java.util.concurrent.TimeUnit;
|
||||||
import java.util.function.Consumer;
|
import java.util.function.Consumer;
|
||||||
|
import java.util.stream.Collectors;
|
||||||
|
import java.util.stream.Stream;
|
||||||
|
|
||||||
import org.eclipse.jdt.annotation.NonNullByDefault;
|
import org.eclipse.jdt.annotation.NonNullByDefault;
|
||||||
import org.eclipse.jdt.annotation.Nullable;
|
import org.eclipse.jdt.annotation.Nullable;
|
||||||
import org.openhab.binding.deconz.internal.dto.DeconzBaseMessage;
|
import org.openhab.binding.deconz.internal.dto.DeconzBaseMessage;
|
||||||
import org.openhab.binding.deconz.internal.netutils.AsyncHttpClient;
|
|
||||||
import org.openhab.binding.deconz.internal.netutils.WebSocketConnection;
|
import org.openhab.binding.deconz.internal.netutils.WebSocketConnection;
|
||||||
import org.openhab.binding.deconz.internal.netutils.WebSocketMessageListener;
|
import org.openhab.binding.deconz.internal.netutils.WebSocketMessageListener;
|
||||||
import org.openhab.binding.deconz.internal.types.ResourceType;
|
import org.openhab.binding.deconz.internal.types.ResourceType;
|
||||||
@ -47,11 +46,9 @@ public abstract class DeconzBaseThingHandler extends BaseThingHandler implements
|
|||||||
private final Logger logger = LoggerFactory.getLogger(DeconzBaseThingHandler.class);
|
private final Logger logger = LoggerFactory.getLogger(DeconzBaseThingHandler.class);
|
||||||
protected final ResourceType resourceType;
|
protected final ResourceType resourceType;
|
||||||
protected ThingConfig config = new ThingConfig();
|
protected ThingConfig config = new ThingConfig();
|
||||||
protected DeconzBridgeConfig bridgeConfig = new DeconzBridgeConfig();
|
|
||||||
protected final Gson gson;
|
protected final Gson gson;
|
||||||
private @Nullable ScheduledFuture<?> initializationJob;
|
private @Nullable ScheduledFuture<?> initializationJob;
|
||||||
protected @Nullable WebSocketConnection connection;
|
protected @Nullable WebSocketConnection connection;
|
||||||
protected @Nullable AsyncHttpClient http;
|
|
||||||
|
|
||||||
public DeconzBaseThingHandler(Thing thing, Gson gson, ResourceType resourceType) {
|
public DeconzBaseThingHandler(Thing thing, Gson gson, ResourceType resourceType) {
|
||||||
super(thing);
|
super(thing);
|
||||||
@ -111,8 +108,6 @@ public abstract class DeconzBaseThingHandler extends BaseThingHandler implements
|
|||||||
|
|
||||||
final WebSocketConnection webSocketConnection = bridgeHandler.getWebsocketConnection();
|
final WebSocketConnection webSocketConnection = bridgeHandler.getWebsocketConnection();
|
||||||
this.connection = webSocketConnection;
|
this.connection = webSocketConnection;
|
||||||
this.http = bridgeHandler.getHttp();
|
|
||||||
this.bridgeConfig = bridgeHandler.getBridgeConfig();
|
|
||||||
|
|
||||||
updateStatus(ThingStatus.UNKNOWN, ThingStatusDetail.NONE);
|
updateStatus(ThingStatus.UNKNOWN, ThingStatusDetail.NONE);
|
||||||
|
|
||||||
@ -182,17 +177,14 @@ public abstract class DeconzBaseThingHandler extends BaseThingHandler implements
|
|||||||
*/
|
*/
|
||||||
protected void sendCommand(@Nullable Object object, Command originalCommand, ChannelUID channelUID,
|
protected void sendCommand(@Nullable Object object, Command originalCommand, ChannelUID channelUID,
|
||||||
String commandUrl, @Nullable Runnable acceptProcessing) {
|
String commandUrl, @Nullable Runnable acceptProcessing) {
|
||||||
AsyncHttpClient asyncHttpClient = http;
|
DeconzBridgeHandler bridgeHandler = getBridgeHandler();
|
||||||
if (asyncHttpClient == null) {
|
if (bridgeHandler == null) {
|
||||||
return;
|
return;
|
||||||
}
|
}
|
||||||
String url = buildUrl(bridgeConfig.host, bridgeConfig.httpPort, bridgeConfig.apikey,
|
String endpoint = Stream.of(resourceType.getIdentifier(), config.id, commandUrl)
|
||||||
resourceType.getIdentifier(), config.id, commandUrl);
|
.collect(Collectors.joining("/"));
|
||||||
|
|
||||||
String json = object == null ? null : gson.toJson(object);
|
bridgeHandler.sendObject(endpoint, object).thenAccept(v -> {
|
||||||
logger.trace("Sending {} to {} {} via {}", json, resourceType, config.id, url);
|
|
||||||
|
|
||||||
asyncHttpClient.put(url, json, bridgeConfig.timeout).thenAccept(v -> {
|
|
||||||
if (acceptProcessing != null) {
|
if (acceptProcessing != null) {
|
||||||
acceptProcessing.run();
|
acceptProcessing.run();
|
||||||
}
|
}
|
||||||
|
|||||||
@ -70,7 +70,7 @@ public class DeconzBridgeHandler extends BaseBridgeHandler implements WebSocketC
|
|||||||
private int websocketPort = 0;
|
private int websocketPort = 0;
|
||||||
/** Prevent a dispose/init cycle while this flag is set. Use for property updates */
|
/** Prevent a dispose/init cycle while this flag is set. Use for property updates */
|
||||||
private boolean ignoreConfigurationUpdate;
|
private boolean ignoreConfigurationUpdate;
|
||||||
private boolean websocketReconnect = false;
|
private boolean thingDisposing = false;
|
||||||
|
|
||||||
private final ExpiringCacheAsync<Optional<BridgeFullState>> fullStateCache = new ExpiringCacheAsync<>(1000);
|
private final ExpiringCacheAsync<Optional<BridgeFullState>> fullStateCache = new ExpiringCacheAsync<>(1000);
|
||||||
|
|
||||||
@ -92,7 +92,7 @@ public class DeconzBridgeHandler extends BaseBridgeHandler implements WebSocketC
|
|||||||
|
|
||||||
@Override
|
@Override
|
||||||
public Collection<Class<? extends ThingHandlerService>> getServices() {
|
public Collection<Class<? extends ThingHandlerService>> getServices() {
|
||||||
return Collections.singleton(ThingDiscoveryService.class);
|
return Set.of(ThingDiscoveryService.class);
|
||||||
}
|
}
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
@ -123,11 +123,15 @@ public class DeconzBridgeHandler extends BaseBridgeHandler implements WebSocketC
|
|||||||
* @param r The response
|
* @param r The response
|
||||||
*/
|
*/
|
||||||
private void parseAPIKeyResponse(AsyncHttpClient.Result r) {
|
private void parseAPIKeyResponse(AsyncHttpClient.Result r) {
|
||||||
|
if (thingDisposing) {
|
||||||
|
// discard response if thing handler is already disposing
|
||||||
|
return;
|
||||||
|
}
|
||||||
if (r.getResponseCode() == 403) {
|
if (r.getResponseCode() == 403) {
|
||||||
updateStatus(ThingStatus.OFFLINE, ThingStatusDetail.CONFIGURATION_PENDING,
|
updateStatus(ThingStatus.OFFLINE, ThingStatusDetail.CONFIGURATION_PENDING,
|
||||||
"Allow authentication for 3rd party apps. Trying again in " + POLL_FREQUENCY_SEC + " seconds");
|
"Allow authentication for 3rd party apps. Trying again in " + POLL_FREQUENCY_SEC + " seconds");
|
||||||
stopTimer();
|
stopTimer();
|
||||||
scheduledFuture = scheduler.schedule(() -> requestApiKey(), POLL_FREQUENCY_SEC, TimeUnit.SECONDS);
|
scheduledFuture = scheduler.schedule(this::requestApiKey, POLL_FREQUENCY_SEC, TimeUnit.SECONDS);
|
||||||
} else if (r.getResponseCode() == 200) {
|
} else if (r.getResponseCode() == 200) {
|
||||||
ApiKeyMessage[] response = Objects.requireNonNull(gson.fromJson(r.getBody(), ApiKeyMessage[].class));
|
ApiKeyMessage[] response = Objects.requireNonNull(gson.fromJson(r.getBody(), ApiKeyMessage[].class));
|
||||||
if (response.length == 0) {
|
if (response.length == 0) {
|
||||||
@ -160,7 +164,7 @@ public class DeconzBridgeHandler extends BaseBridgeHandler implements WebSocketC
|
|||||||
*/
|
*/
|
||||||
private CompletableFuture<Optional<BridgeFullState>> refreshFullStateCache() {
|
private CompletableFuture<Optional<BridgeFullState>> refreshFullStateCache() {
|
||||||
logger.trace("{} starts refreshing the fullStateCache", thing.getUID());
|
logger.trace("{} starts refreshing the fullStateCache", thing.getUID());
|
||||||
if (config.apikey == null) {
|
if (config.apikey == null || thingDisposing) {
|
||||||
return CompletableFuture.completedFuture(Optional.empty());
|
return CompletableFuture.completedFuture(Optional.empty());
|
||||||
}
|
}
|
||||||
String url = buildUrl(config.getHostWithoutPort(), config.httpPort, config.apikey);
|
String url = buildUrl(config.getHostWithoutPort(), config.httpPort, config.apikey);
|
||||||
@ -191,6 +195,10 @@ public class DeconzBridgeHandler extends BaseBridgeHandler implements WebSocketC
|
|||||||
*/
|
*/
|
||||||
public void initializeBridgeState() {
|
public void initializeBridgeState() {
|
||||||
getBridgeFullState().thenAccept(fullState -> fullState.ifPresentOrElse(state -> {
|
getBridgeFullState().thenAccept(fullState -> fullState.ifPresentOrElse(state -> {
|
||||||
|
if (thingDisposing) {
|
||||||
|
// discard response if thing handler is already disposing
|
||||||
|
return;
|
||||||
|
}
|
||||||
if (state.config.name.isEmpty()) {
|
if (state.config.name.isEmpty()) {
|
||||||
updateStatus(ThingStatus.OFFLINE, ThingStatusDetail.NONE,
|
updateStatus(ThingStatus.OFFLINE, ThingStatusDetail.NONE,
|
||||||
"You are connected to a HUE bridge, not a deCONZ software!");
|
"You are connected to a HUE bridge, not a deCONZ software!");
|
||||||
@ -216,11 +224,12 @@ public class DeconzBridgeHandler extends BaseBridgeHandler implements WebSocketC
|
|||||||
|
|
||||||
// Use requested websocket port if no specific port is given
|
// Use requested websocket port if no specific port is given
|
||||||
websocketPort = config.port == 0 ? state.config.websocketport : config.port;
|
websocketPort = config.port == 0 ? state.config.websocketport : config.port;
|
||||||
websocketReconnect = true;
|
|
||||||
startWebsocket();
|
startWebsocket();
|
||||||
}, () -> {
|
}, () -> {
|
||||||
// initial response was empty, re-trying in POLL_FREQUENCY_SEC seconds
|
// initial response was empty, re-trying in POLL_FREQUENCY_SEC seconds
|
||||||
|
if (!thingDisposing) {
|
||||||
scheduledFuture = scheduler.schedule(this::initializeBridgeState, POLL_FREQUENCY_SEC, TimeUnit.SECONDS);
|
scheduledFuture = scheduler.schedule(this::initializeBridgeState, POLL_FREQUENCY_SEC, TimeUnit.SECONDS);
|
||||||
|
}
|
||||||
})).exceptionally(e -> {
|
})).exceptionally(e -> {
|
||||||
if (e != null) {
|
if (e != null) {
|
||||||
updateStatus(ThingStatus.OFFLINE, ThingStatusDetail.NONE, e.getMessage());
|
updateStatus(ThingStatus.OFFLINE, ThingStatusDetail.NONE, e.getMessage());
|
||||||
@ -237,7 +246,7 @@ public class DeconzBridgeHandler extends BaseBridgeHandler implements WebSocketC
|
|||||||
* {@link #initializeBridgeState} need to be called first to obtain the websocket port.
|
* {@link #initializeBridgeState} need to be called first to obtain the websocket port.
|
||||||
*/
|
*/
|
||||||
private void startWebsocket() {
|
private void startWebsocket() {
|
||||||
if (websocket.isConnected() || websocketPort == 0 || websocketReconnect == false) {
|
if (websocket.isConnected() || websocketPort == 0 || thingDisposing) {
|
||||||
return;
|
return;
|
||||||
}
|
}
|
||||||
|
|
||||||
@ -251,11 +260,11 @@ public class DeconzBridgeHandler extends BaseBridgeHandler implements WebSocketC
|
|||||||
* Perform a request to the REST API for generating an API key.
|
* Perform a request to the REST API for generating an API key.
|
||||||
*
|
*
|
||||||
*/
|
*/
|
||||||
private CompletableFuture<?> requestApiKey() {
|
private void requestApiKey() {
|
||||||
updateStatus(ThingStatus.OFFLINE, ThingStatusDetail.CONFIGURATION_PENDING, "Requesting API Key");
|
updateStatus(ThingStatus.OFFLINE, ThingStatusDetail.CONFIGURATION_PENDING, "Requesting API Key");
|
||||||
stopTimer();
|
stopTimer();
|
||||||
String url = buildUrl(config.getHostWithoutPort(), config.httpPort);
|
String url = buildUrl(config.getHostWithoutPort(), config.httpPort);
|
||||||
return http.post(url, "{\"devicetype\":\"openHAB\"}", config.timeout).thenAccept(this::parseAPIKeyResponse)
|
http.post(url, "{\"devicetype\":\"openHAB\"}", config.timeout).thenAccept(this::parseAPIKeyResponse)
|
||||||
.exceptionally(e -> {
|
.exceptionally(e -> {
|
||||||
updateStatus(ThingStatus.OFFLINE, ThingStatusDetail.COMMUNICATION_ERROR, e.getMessage());
|
updateStatus(ThingStatus.OFFLINE, ThingStatusDetail.COMMUNICATION_ERROR, e.getMessage());
|
||||||
logger.warn("Authorisation failed", e);
|
logger.warn("Authorisation failed", e);
|
||||||
@ -265,7 +274,8 @@ public class DeconzBridgeHandler extends BaseBridgeHandler implements WebSocketC
|
|||||||
|
|
||||||
@Override
|
@Override
|
||||||
public void initialize() {
|
public void initialize() {
|
||||||
logger.debug("Start initializing!");
|
logger.debug("Start initializing bridge {}", thing.getUID());
|
||||||
|
thingDisposing = false;
|
||||||
config = getConfigAs(DeconzBridgeConfig.class);
|
config = getConfigAs(DeconzBridgeConfig.class);
|
||||||
if (config.apikey == null) {
|
if (config.apikey == null) {
|
||||||
requestApiKey();
|
requestApiKey();
|
||||||
@ -276,23 +286,11 @@ public class DeconzBridgeHandler extends BaseBridgeHandler implements WebSocketC
|
|||||||
|
|
||||||
@Override
|
@Override
|
||||||
public void dispose() {
|
public void dispose() {
|
||||||
websocketReconnect = false;
|
thingDisposing = true;
|
||||||
stopTimer();
|
stopTimer();
|
||||||
websocket.close();
|
websocket.close();
|
||||||
}
|
}
|
||||||
|
|
||||||
@Override
|
|
||||||
public void connectionError(@Nullable Throwable e) {
|
|
||||||
if (e != null) {
|
|
||||||
updateStatus(ThingStatus.OFFLINE, ThingStatusDetail.COMMUNICATION_ERROR, e.getMessage());
|
|
||||||
} else {
|
|
||||||
updateStatus(ThingStatus.OFFLINE, ThingStatusDetail.COMMUNICATION_ERROR, "Unknown reason");
|
|
||||||
}
|
|
||||||
stopTimer();
|
|
||||||
// Wait for POLL_FREQUENCY_SEC after a connection error before trying again
|
|
||||||
scheduledFuture = scheduler.schedule(this::startWebsocket, POLL_FREQUENCY_SEC, TimeUnit.SECONDS);
|
|
||||||
}
|
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
public void connectionEstablished() {
|
public void connectionEstablished() {
|
||||||
stopTimer();
|
stopTimer();
|
||||||
@ -302,7 +300,10 @@ public class DeconzBridgeHandler extends BaseBridgeHandler implements WebSocketC
|
|||||||
@Override
|
@Override
|
||||||
public void connectionLost(String reason) {
|
public void connectionLost(String reason) {
|
||||||
updateStatus(ThingStatus.OFFLINE, ThingStatusDetail.COMMUNICATION_ERROR, reason);
|
updateStatus(ThingStatus.OFFLINE, ThingStatusDetail.COMMUNICATION_ERROR, reason);
|
||||||
startWebsocket();
|
|
||||||
|
stopTimer();
|
||||||
|
// Wait for POLL_FREQUENCY_SEC after a connection was closed before trying again
|
||||||
|
scheduledFuture = scheduler.schedule(this::startWebsocket, POLL_FREQUENCY_SEC, TimeUnit.SECONDS);
|
||||||
}
|
}
|
||||||
|
|
||||||
/**
|
/**
|
||||||
@ -313,16 +314,17 @@ public class DeconzBridgeHandler extends BaseBridgeHandler implements WebSocketC
|
|||||||
}
|
}
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* Return the http connection.
|
* Send an object to the gateway
|
||||||
|
*
|
||||||
|
* @param endPoint the endpoint (e.g. "lights/2/state")
|
||||||
|
* @param object the object (or null if no object)
|
||||||
|
* @return CompletableFuture of the result
|
||||||
*/
|
*/
|
||||||
public AsyncHttpClient getHttp() {
|
public CompletableFuture<AsyncHttpClient.Result> sendObject(String endPoint, @Nullable Object object) {
|
||||||
return http;
|
String json = object == null ? null : gson.toJson(object);
|
||||||
}
|
String url = buildUrl(config.host, config.httpPort, config.apikey, endPoint);
|
||||||
|
logger.trace("Sending {} via {}", json, url);
|
||||||
|
|
||||||
/**
|
return http.put(url, json, config.timeout);
|
||||||
* Return the bridge configuration.
|
|
||||||
*/
|
|
||||||
public DeconzBridgeConfig getBridgeConfig() {
|
|
||||||
return config;
|
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|||||||
@ -16,9 +16,10 @@ import java.net.URI;
|
|||||||
import java.util.Map;
|
import java.util.Map;
|
||||||
import java.util.Objects;
|
import java.util.Objects;
|
||||||
import java.util.concurrent.ConcurrentHashMap;
|
import java.util.concurrent.ConcurrentHashMap;
|
||||||
|
import java.util.concurrent.atomic.AtomicInteger;
|
||||||
|
|
||||||
import org.eclipse.jdt.annotation.NonNullByDefault;
|
import org.eclipse.jdt.annotation.NonNullByDefault;
|
||||||
import org.eclipse.jetty.util.thread.QueuedThreadPool;
|
import org.eclipse.jdt.annotation.Nullable;
|
||||||
import org.eclipse.jetty.websocket.api.Session;
|
import org.eclipse.jetty.websocket.api.Session;
|
||||||
import org.eclipse.jetty.websocket.api.annotations.OnWebSocketClose;
|
import org.eclipse.jetty.websocket.api.annotations.OnWebSocketClose;
|
||||||
import org.eclipse.jetty.websocket.api.annotations.OnWebSocketConnect;
|
import org.eclipse.jetty.websocket.api.annotations.OnWebSocketConnect;
|
||||||
@ -43,6 +44,7 @@ import com.google.gson.Gson;
|
|||||||
@WebSocket
|
@WebSocket
|
||||||
@NonNullByDefault
|
@NonNullByDefault
|
||||||
public class WebSocketConnection {
|
public class WebSocketConnection {
|
||||||
|
private static final AtomicInteger INSTANCE_COUNTER = new AtomicInteger();
|
||||||
private final Logger logger = LoggerFactory.getLogger(WebSocketConnection.class);
|
private final Logger logger = LoggerFactory.getLogger(WebSocketConnection.class);
|
||||||
|
|
||||||
private final WebSocketClient client;
|
private final WebSocketClient client;
|
||||||
@ -50,16 +52,17 @@ public class WebSocketConnection {
|
|||||||
private final Gson gson;
|
private final Gson gson;
|
||||||
|
|
||||||
private final WebSocketConnectionListener connectionListener;
|
private final WebSocketConnectionListener connectionListener;
|
||||||
private final Map<Map.Entry<ResourceType, String>, WebSocketMessageListener> listeners = new ConcurrentHashMap<>();
|
private final Map<String, WebSocketMessageListener> listeners = new ConcurrentHashMap<>();
|
||||||
|
|
||||||
private ConnectionState connectionState = ConnectionState.DISCONNECTED;
|
private ConnectionState connectionState = ConnectionState.DISCONNECTED;
|
||||||
|
private @Nullable Session session;
|
||||||
|
|
||||||
public WebSocketConnection(WebSocketConnectionListener listener, WebSocketClient client, Gson gson) {
|
public WebSocketConnection(WebSocketConnectionListener listener, WebSocketClient client, Gson gson) {
|
||||||
this.connectionListener = listener;
|
this.connectionListener = listener;
|
||||||
this.client = client;
|
this.client = client;
|
||||||
this.client.setMaxIdleTimeout(0);
|
this.client.setMaxIdleTimeout(0);
|
||||||
this.gson = gson;
|
this.gson = gson;
|
||||||
this.socketName = ((QueuedThreadPool) client.getExecutor()).getName() + "$" + this.hashCode();
|
this.socketName = "Websocket$" + System.currentTimeMillis() + "-" + INSTANCE_COUNTER.incrementAndGet();
|
||||||
}
|
}
|
||||||
|
|
||||||
public void start(String ip) {
|
public void start(String ip) {
|
||||||
@ -68,6 +71,8 @@ public class WebSocketConnection {
|
|||||||
} else if (connectionState == ConnectionState.CONNECTING) {
|
} else if (connectionState == ConnectionState.CONNECTING) {
|
||||||
logger.debug("{} already connecting", socketName);
|
logger.debug("{} already connecting", socketName);
|
||||||
return;
|
return;
|
||||||
|
} else if (connectionState == ConnectionState.DISCONNECTING) {
|
||||||
|
logger.warn("{} trying to re-connect while still disconnecting", socketName);
|
||||||
}
|
}
|
||||||
try {
|
try {
|
||||||
URI destUri = URI.create("ws://" + ip);
|
URI destUri = URI.create("ws://" + ip);
|
||||||
@ -75,7 +80,7 @@ public class WebSocketConnection {
|
|||||||
logger.debug("Trying to connect {} to {}", socketName, destUri);
|
logger.debug("Trying to connect {} to {}", socketName, destUri);
|
||||||
client.connect(this, destUri).get();
|
client.connect(this, destUri).get();
|
||||||
} catch (Exception e) {
|
} catch (Exception e) {
|
||||||
connectionListener.connectionError(e);
|
connectionListener.connectionLost("Error while connecting: " + e.getMessage());
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
@ -90,33 +95,40 @@ public class WebSocketConnection {
|
|||||||
}
|
}
|
||||||
|
|
||||||
public void registerListener(ResourceType resourceType, String sensorID, WebSocketMessageListener listener) {
|
public void registerListener(ResourceType resourceType, String sensorID, WebSocketMessageListener listener) {
|
||||||
listeners.put(Map.entry(resourceType, sensorID), listener);
|
listeners.put(getListenerId(resourceType, sensorID), listener);
|
||||||
}
|
}
|
||||||
|
|
||||||
public void unregisterListener(ResourceType resourceType, String sensorID) {
|
public void unregisterListener(ResourceType resourceType, String sensorID) {
|
||||||
listeners.remove(Map.entry(resourceType, sensorID));
|
listeners.remove(getListenerId(resourceType, sensorID));
|
||||||
}
|
}
|
||||||
|
|
||||||
@SuppressWarnings("unused")
|
@SuppressWarnings("unused")
|
||||||
@OnWebSocketConnect
|
@OnWebSocketConnect
|
||||||
public void onConnect(Session session) {
|
public void onConnect(Session session) {
|
||||||
connectionState = ConnectionState.CONNECTED;
|
connectionState = ConnectionState.CONNECTED;
|
||||||
logger.debug("{} successfully connected to {}", socketName, session.getRemoteAddress().getAddress());
|
logger.debug("{} successfully connected to {}: {}", socketName, session.getRemoteAddress().getAddress(),
|
||||||
|
session.hashCode());
|
||||||
connectionListener.connectionEstablished();
|
connectionListener.connectionEstablished();
|
||||||
|
this.session = session;
|
||||||
}
|
}
|
||||||
|
|
||||||
@SuppressWarnings({ "null", "unused" })
|
@SuppressWarnings({ "null", "unused" })
|
||||||
@OnWebSocketMessage
|
@OnWebSocketMessage
|
||||||
public void onMessage(String message) {
|
public void onMessage(Session session, String message) {
|
||||||
logger.trace("Raw data received by websocket {}: {}", socketName, message);
|
if (!session.equals(this.session)) {
|
||||||
|
handleWrongSession(session, message);
|
||||||
|
return;
|
||||||
|
}
|
||||||
|
logger.trace("{} received raw data: {}", socketName, message);
|
||||||
|
|
||||||
|
try {
|
||||||
DeconzBaseMessage changedMessage = Objects.requireNonNull(gson.fromJson(message, DeconzBaseMessage.class));
|
DeconzBaseMessage changedMessage = Objects.requireNonNull(gson.fromJson(message, DeconzBaseMessage.class));
|
||||||
if (changedMessage.r == ResourceType.UNKNOWN) {
|
if (changedMessage.r == ResourceType.UNKNOWN) {
|
||||||
logger.trace("Received message has unknown resource type. Skipping message.");
|
logger.trace("Received message has unknown resource type. Skipping message.");
|
||||||
return;
|
return;
|
||||||
}
|
}
|
||||||
|
|
||||||
WebSocketMessageListener listener = listeners.get(Map.entry(changedMessage.r, changedMessage.id));
|
WebSocketMessageListener listener = listeners.get(getListenerId(changedMessage.r, changedMessage.id));
|
||||||
if (listener == null) {
|
if (listener == null) {
|
||||||
logger.debug(
|
logger.debug(
|
||||||
"Couldn't find listener for id {} with resource type {}. Either no thing for this id has been defined or this is a bug.",
|
"Couldn't find listener for id {} with resource type {}. Either no thing for this id has been defined or this is a bug.",
|
||||||
@ -126,7 +138,8 @@ public class WebSocketConnection {
|
|||||||
|
|
||||||
Class<? extends DeconzBaseMessage> expectedMessageType = changedMessage.r.getExpectedMessageType();
|
Class<? extends DeconzBaseMessage> expectedMessageType = changedMessage.r.getExpectedMessageType();
|
||||||
if (expectedMessageType == null) {
|
if (expectedMessageType == null) {
|
||||||
logger.warn("BUG! Could not get expected message type for resource type {}. Please report this incident.",
|
logger.warn(
|
||||||
|
"BUG! Could not get expected message type for resource type {}. Please report this incident.",
|
||||||
changedMessage.r);
|
changedMessage.r);
|
||||||
return;
|
return;
|
||||||
}
|
}
|
||||||
@ -134,23 +147,52 @@ public class WebSocketConnection {
|
|||||||
DeconzBaseMessage deconzMessage = gson.fromJson(message, expectedMessageType);
|
DeconzBaseMessage deconzMessage = gson.fromJson(message, expectedMessageType);
|
||||||
if (deconzMessage != null) {
|
if (deconzMessage != null) {
|
||||||
listener.messageReceived(changedMessage.id, deconzMessage);
|
listener.messageReceived(changedMessage.id, deconzMessage);
|
||||||
|
|
||||||
|
}
|
||||||
|
} catch (RuntimeException e) {
|
||||||
|
// we need to catch all processing exceptions, otherwise they could affect the connection
|
||||||
|
logger.warn("{} encountered an error while processing the message {}: {}", socketName, message,
|
||||||
|
e.getMessage());
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
@SuppressWarnings("unused")
|
@SuppressWarnings("unused")
|
||||||
@OnWebSocketError
|
@OnWebSocketError
|
||||||
public void onError(Throwable cause) {
|
public void onError(Session session, Throwable cause) {
|
||||||
connectionState = ConnectionState.DISCONNECTED;
|
if (!session.equals(this.session)) {
|
||||||
connectionListener.connectionError(cause);
|
handleWrongSession(session, "Connection error: " + cause.getMessage());
|
||||||
|
return;
|
||||||
|
}
|
||||||
|
logger.warn("{} connection errored, closing: {}", socketName, cause.getMessage());
|
||||||
|
|
||||||
|
Session storedSession = this.session;
|
||||||
|
if (storedSession != null && storedSession.isOpen()) {
|
||||||
|
storedSession.close(-1, "Processing error");
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
@SuppressWarnings("unused")
|
@SuppressWarnings("unused")
|
||||||
@OnWebSocketClose
|
@OnWebSocketClose
|
||||||
public void onClose(int statusCode, String reason) {
|
public void onClose(Session session, int statusCode, String reason) {
|
||||||
|
if (!session.equals(this.session)) {
|
||||||
|
handleWrongSession(session, "Connection closed: " + statusCode + " / " + reason);
|
||||||
|
return;
|
||||||
|
}
|
||||||
|
logger.trace("{} closed connection: {} / {}", socketName, statusCode, reason);
|
||||||
connectionState = ConnectionState.DISCONNECTED;
|
connectionState = ConnectionState.DISCONNECTED;
|
||||||
|
this.session = null;
|
||||||
connectionListener.connectionLost(reason);
|
connectionListener.connectionLost(reason);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
private void handleWrongSession(Session session, String message) {
|
||||||
|
logger.warn("{}/{} received and discarded message for other session {}: {}.", socketName, session.hashCode(),
|
||||||
|
session.hashCode(), message);
|
||||||
|
if (session.isOpen()) {
|
||||||
|
// Close the session if it is still open. It should already be closed anyway
|
||||||
|
session.close();
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* check connection state (successfully connected)
|
* check connection state (successfully connected)
|
||||||
*
|
*
|
||||||
@ -160,6 +202,17 @@ public class WebSocketConnection {
|
|||||||
return connectionState == ConnectionState.CONNECTED;
|
return connectionState == ConnectionState.CONNECTED;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* create a unique identifier for a listener
|
||||||
|
*
|
||||||
|
* @param resourceType the listener resource-type (LIGHT, SENSOR, ...)
|
||||||
|
* @param id the listener id (same as deconz-id)
|
||||||
|
* @return a unique string for this listener
|
||||||
|
*/
|
||||||
|
private String getListenerId(ResourceType resourceType, String id) {
|
||||||
|
return resourceType.name() + "$" + id;
|
||||||
|
}
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* used internally to represent the connection state
|
* used internally to represent the connection state
|
||||||
*/
|
*/
|
||||||
|
|||||||
@ -21,13 +21,6 @@ import org.eclipse.jdt.annotation.NonNullByDefault;
|
|||||||
*/
|
*/
|
||||||
@NonNullByDefault
|
@NonNullByDefault
|
||||||
public interface WebSocketConnectionListener {
|
public interface WebSocketConnectionListener {
|
||||||
/**
|
|
||||||
* An error occurred during connection or while connecting.
|
|
||||||
*
|
|
||||||
* @param e The error
|
|
||||||
*/
|
|
||||||
void connectionError(Throwable e);
|
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* Connection successfully established.
|
* Connection successfully established.
|
||||||
*/
|
*/
|
||||||
|
|||||||
Loading…
x
Reference in New Issue
Block a user