[deconz] improve websocket connection process (#9298)

* improve websocket connection

Signed-off-by: Jan N. Klug <jan.n.klug@rub.de>
This commit is contained in:
J-N-K 2020-12-09 23:04:40 +01:00 committed by GitHub
parent 7e9b27ec41
commit 654db27939
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
2 changed files with 42 additions and 22 deletions

View File

@ -232,10 +232,11 @@ public class DeconzBridgeHandler extends BaseBridgeHandler implements WebSocketC
/** /**
* Starts the websocket connection. * Starts the websocket connection.
*
* {@link #requestFullState} need to be called first to obtain the websocket port. * {@link #requestFullState} need to be called first to obtain the websocket port.
*/ */
private void startWebsocket() { private void startWebsocket() {
if (websocket.isConnected() || websocketPort == 0) { if (websocket.isConnected() || websocketPort == 0 || websocketReconnect == false) {
return; return;
} }
@ -288,10 +289,8 @@ public class DeconzBridgeHandler extends BaseBridgeHandler implements WebSocketC
} }
stopTimer(); stopTimer();
// Wait for POLL_FREQUENCY_SEC after a connection error before trying again // Wait for POLL_FREQUENCY_SEC after a connection error before trying again
if (websocketReconnect) {
scheduledFuture = scheduler.schedule(this::startWebsocket, POLL_FREQUENCY_SEC, TimeUnit.SECONDS); scheduledFuture = scheduler.schedule(this::startWebsocket, POLL_FREQUENCY_SEC, TimeUnit.SECONDS);
} }
}
@Override @Override
public void connectionEstablished() { public void connectionEstablished() {
@ -302,10 +301,8 @@ public class DeconzBridgeHandler extends BaseBridgeHandler implements WebSocketC
@Override @Override
public void connectionLost(String reason) { public void connectionLost(String reason) {
updateStatus(ThingStatus.OFFLINE, ThingStatusDetail.COMMUNICATION_ERROR, reason); updateStatus(ThingStatus.OFFLINE, ThingStatusDetail.COMMUNICATION_ERROR, reason);
if (websocketReconnect) {
startWebsocket(); startWebsocket();
} }
}
/** /**
* Return the websocket connection. * Return the websocket connection.

View File

@ -17,6 +17,7 @@ import java.util.Map;
import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.ConcurrentHashMap;
import org.eclipse.jdt.annotation.NonNullByDefault; import org.eclipse.jdt.annotation.NonNullByDefault;
import org.eclipse.jetty.util.thread.QueuedThreadPool;
import org.eclipse.jetty.websocket.api.Session; import org.eclipse.jetty.websocket.api.Session;
import org.eclipse.jetty.websocket.api.annotations.OnWebSocketClose; import org.eclipse.jetty.websocket.api.annotations.OnWebSocketClose;
import org.eclipse.jetty.websocket.api.annotations.OnWebSocketConnect; import org.eclipse.jetty.websocket.api.annotations.OnWebSocketConnect;
@ -51,29 +52,33 @@ public class WebSocketConnection {
private final Logger logger = LoggerFactory.getLogger(WebSocketConnection.class); private final Logger logger = LoggerFactory.getLogger(WebSocketConnection.class);
private final WebSocketClient client; private final WebSocketClient client;
private final String socketName;
private final Gson gson;
private final WebSocketConnectionListener connectionListener; private final WebSocketConnectionListener connectionListener;
private final Map<Map.Entry<ResourceType, String>, WebSocketMessageListener> listeners = new ConcurrentHashMap<>(); private final Map<Map.Entry<ResourceType, String>, WebSocketMessageListener> listeners = new ConcurrentHashMap<>();
private final Gson gson; private ConnectionState connectionState = ConnectionState.DISCONNECTED;
private boolean connected = false;
public WebSocketConnection(WebSocketConnectionListener listener, WebSocketClient client, Gson gson) { public WebSocketConnection(WebSocketConnectionListener listener, WebSocketClient client, Gson gson) {
this.connectionListener = listener; this.connectionListener = listener;
this.client = client; this.client = client;
this.client.setMaxIdleTimeout(0); this.client.setMaxIdleTimeout(0);
this.gson = gson; this.gson = gson;
this.socketName = ((QueuedThreadPool) client.getExecutor()).getName() + "$" + this.hashCode();
} }
public void start(String ip) { public void start(String ip) {
if (connected) { if (connectionState == ConnectionState.CONNECTED) {
return;
} else if (connectionState == ConnectionState.CONNECTING) {
logger.debug("{} already connecting", socketName);
return; return;
} }
try { try {
URI destUri = URI.create("ws://" + ip); URI destUri = URI.create("ws://" + ip);
client.start(); client.start();
logger.debug("Trying to connect {} to {}", socketName, destUri);
logger.debug("Connecting to: {}", destUri);
client.connect(this, destUri).get(); client.connect(this, destUri).get();
} catch (Exception e) { } catch (Exception e) {
connectionListener.connectionError(e); connectionListener.connectionError(e);
@ -82,10 +87,10 @@ public class WebSocketConnection {
public void close() { public void close() {
try { try {
connected = false; connectionState = ConnectionState.DISCONNECTING;
client.stop(); client.stop();
} catch (Exception e) { } catch (Exception e) {
logger.debug("Error while closing connection", e); logger.debug("{} encountered an error while closing connection", socketName, e);
} }
client.destroy(); client.destroy();
} }
@ -98,17 +103,18 @@ public class WebSocketConnection {
listeners.remove(Map.entry(resourceType, sensorID)); listeners.remove(Map.entry(resourceType, sensorID));
} }
@SuppressWarnings("unused")
@OnWebSocketConnect @OnWebSocketConnect
public void onConnect(Session session) { public void onConnect(Session session) {
connected = true; connectionState = ConnectionState.CONNECTED;
logger.debug("Connect: {}", session.getRemoteAddress().getAddress()); logger.debug("{} successfully connected to {}", socketName, session.getRemoteAddress().getAddress());
connectionListener.connectionEstablished(); connectionListener.connectionEstablished();
} }
@SuppressWarnings("null") @SuppressWarnings("null, unused")
@OnWebSocketMessage @OnWebSocketMessage
public void onMessage(String message) { public void onMessage(String message) {
logger.trace("Raw data received by websocket: {}", message); logger.trace("Raw data received by websocket {}: {}", socketName, message);
DeconzBaseMessage changedMessage = gson.fromJson(message, DeconzBaseMessage.class); DeconzBaseMessage changedMessage = gson.fromJson(message, DeconzBaseMessage.class);
if (changedMessage.r == ResourceType.UNKNOWN) { if (changedMessage.r == ResourceType.UNKNOWN) {
@ -137,19 +143,36 @@ public class WebSocketConnection {
} }
} }
@SuppressWarnings("unused")
@OnWebSocketError @OnWebSocketError
public void onError(Throwable cause) { public void onError(Throwable cause) {
connected = false; connectionState = ConnectionState.DISCONNECTED;
connectionListener.connectionError(cause); connectionListener.connectionError(cause);
} }
@SuppressWarnings("unused")
@OnWebSocketClose @OnWebSocketClose
public void onClose(int statusCode, String reason) { public void onClose(int statusCode, String reason) {
connected = false; connectionState = ConnectionState.DISCONNECTED;
connectionListener.connectionLost(reason); connectionListener.connectionLost(reason);
} }
/**
* check connection state (successfully connected)
*
* @return true if connected, false if connecting, disconnecting or disconnected
*/
public boolean isConnected() { public boolean isConnected() {
return connected; return connectionState == ConnectionState.CONNECTED;
}
/**
* used internally to represent the connection state
*/
private enum ConnectionState {
CONNECTING,
CONNECTED,
DISCONNECTING,
DISCONNECTED
} }
} }