Enhancement for websocket re-econnectivity & fix item/channel response (#12191)

Signed-off-by: Stian Kjoglum <stiankj@online.no>
Signed-off-by: kjoglum <stiankj@online.no>
This commit is contained in:
Stian Kjoglum 2022-02-05 11:09:07 +01:00 committed by GitHub
parent 97422175c7
commit 882e52647d
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
1 changed files with 42 additions and 25 deletions

View File

@ -20,7 +20,6 @@ import java.math.BigDecimal;
import java.net.URI; import java.net.URI;
import java.net.URISyntaxException; import java.net.URISyntaxException;
import java.util.Properties; import java.util.Properties;
import java.util.concurrent.Executor;
import java.util.concurrent.Future; import java.util.concurrent.Future;
import java.util.concurrent.ScheduledFuture; import java.util.concurrent.ScheduledFuture;
import java.util.concurrent.TimeUnit; import java.util.concurrent.TimeUnit;
@ -37,7 +36,6 @@ import org.eclipse.jetty.websocket.api.annotations.WebSocket;
import org.eclipse.jetty.websocket.client.ClientUpgradeRequest; import org.eclipse.jetty.websocket.client.ClientUpgradeRequest;
import org.eclipse.jetty.websocket.client.WebSocketClient; import org.eclipse.jetty.websocket.client.WebSocketClient;
import org.openhab.binding.tibber.internal.config.TibberConfiguration; import org.openhab.binding.tibber.internal.config.TibberConfiguration;
import org.openhab.core.common.ThreadPoolManager;
import org.openhab.core.io.net.http.HttpUtil; import org.openhab.core.io.net.http.HttpUtil;
import org.openhab.core.library.types.DateTimeType; import org.openhab.core.library.types.DateTimeType;
import org.openhab.core.library.types.DecimalType; import org.openhab.core.library.types.DecimalType;
@ -69,9 +67,8 @@ public class TibberHandler extends BaseThingHandler {
private static final int REQUEST_TIMEOUT = (int) TimeUnit.SECONDS.toMillis(20); private static final int REQUEST_TIMEOUT = (int) TimeUnit.SECONDS.toMillis(20);
private final Logger logger = LoggerFactory.getLogger(TibberHandler.class); private final Logger logger = LoggerFactory.getLogger(TibberHandler.class);
private final Properties httpHeader = new Properties(); private final Properties httpHeader = new Properties();
private final SslContextFactory sslContextFactory = new SslContextFactory.Client(true);
private final Executor websocketExecutor = ThreadPoolManager.getPool("tibber.websocket");
private TibberConfiguration tibberConfig = new TibberConfiguration(); private TibberConfiguration tibberConfig = new TibberConfiguration();
private @Nullable SslContextFactory sslContextFactory;
private @Nullable TibberWebSocketListener socket; private @Nullable TibberWebSocketListener socket;
private @Nullable Session session; private @Nullable Session session;
private @Nullable WebSocketClient client; private @Nullable WebSocketClient client;
@ -125,10 +122,10 @@ public class TibberHandler extends BaseThingHandler {
.getAsJsonObject("features").get("realTimeConsumptionEnabled").toString(); .getAsJsonObject("features").get("realTimeConsumptionEnabled").toString();
if ("true".equals(rtEnabled)) { if ("true".equals(rtEnabled)) {
logger.info("Pulse associated with HomeId: Live stream will be started"); logger.debug("Pulse associated with HomeId: Live stream will be started");
open(); open();
} else { } else {
logger.info("No Pulse associated with HomeId: No live stream will be started"); logger.debug("No Pulse associated with HomeId: No live stream will be started");
} }
} else { } else {
updateStatus(ThingStatus.OFFLINE, ThingStatusDetail.CONFIGURATION_ERROR, updateStatus(ThingStatus.OFFLINE, ThingStatusDetail.CONFIGURATION_ERROR,
@ -163,7 +160,8 @@ public class TibberHandler extends BaseThingHandler {
updateState(CURRENT_TOTAL, new DecimalType(myObject.get("total").toString())); updateState(CURRENT_TOTAL, new DecimalType(myObject.get("total").toString()));
String timestamp = myObject.get("startsAt").toString().substring(1, 20); String timestamp = myObject.get("startsAt").toString().substring(1, 20);
updateState(CURRENT_STARTSAT, new DateTimeType(timestamp)); updateState(CURRENT_STARTSAT, new DateTimeType(timestamp));
updateState(CURRENT_LEVEL, new StringType(myObject.get("level").toString())); updateState(CURRENT_LEVEL,
new StringType(myObject.get("level").toString().replaceAll("^\"|\"$", "")));
} catch (JsonSyntaxException e) { } catch (JsonSyntaxException e) {
updateStatus(ThingStatus.OFFLINE, ThingStatusDetail.COMMUNICATION_ERROR, updateStatus(ThingStatus.OFFLINE, ThingStatusDetail.COMMUNICATION_ERROR,
@ -246,7 +244,7 @@ public class TibberHandler extends BaseThingHandler {
public void updateRequest() throws IOException { public void updateRequest() throws IOException {
getURLInput(BASE_URL); getURLInput(BASE_URL);
if ("true".equals(rtEnabled) && !isConnected()) { if ("true".equals(rtEnabled) && !isConnected()) {
logger.info("Attempting to reopen Websocket connection"); logger.debug("Attempting to reopen Websocket connection");
open(); open();
} }
} }
@ -288,12 +286,12 @@ public class TibberHandler extends BaseThingHandler {
WebSocketClient client = this.client; WebSocketClient client = this.client;
if (client != null) { if (client != null) {
try { try {
logger.warn("Stopping and Terminating Websocket connection"); logger.debug("DISPOSE - Stopping and Terminating Websocket connection");
client.stop(); client.stop();
client.destroy();
} catch (Exception e) { } catch (Exception e) {
logger.warn("Websocket Client Stop Exception: {}", e.getMessage()); logger.warn("Websocket Client Stop Exception: {}", e.getMessage());
} }
client.destroy();
this.client = null; this.client = null;
} }
} }
@ -302,23 +300,26 @@ public class TibberHandler extends BaseThingHandler {
public void open() { public void open() {
WebSocketClient client = this.client; WebSocketClient client = this.client;
if (client == null || !client.isRunning()) { if (client == null || !client.isRunning() || !isConnected()) {
if (client != null) { if (client != null) {
try { try {
client.stop(); client.stop();
} catch (Exception e) { } catch (Exception e) {
logger.warn("Failed to stop websocket client: {}", e.getMessage()); logger.warn("OPEN FRAME - Failed to stop websocket client: {}", e.getMessage());
} }
client.destroy();
} }
sslContextFactory = new SslContextFactory.Client(true);
sslContextFactory.setTrustAll(true); sslContextFactory.setTrustAll(true);
sslContextFactory.setEndpointIdentificationAlgorithm(null); sslContextFactory.setEndpointIdentificationAlgorithm(null);
client = new WebSocketClient(sslContextFactory, websocketExecutor); client = new WebSocketClient(sslContextFactory);
client.setMaxIdleTimeout(600 * 1000); client.setMaxIdleTimeout(30 * 1000);
this.client = client; this.client = client;
TibberWebSocketListener socket = this.socket; TibberWebSocketListener socket = this.socket;
if (socket == null) { if (socket == null) {
logger.debug("New socket being created");
socket = new TibberWebSocketListener(); socket = new TibberWebSocketListener();
this.socket = socket; this.socket = socket;
} }
@ -328,14 +329,25 @@ public class TibberHandler extends BaseThingHandler {
newRequest.setSubProtocols("graphql-subscriptions"); newRequest.setSubProtocols("graphql-subscriptions");
try { try {
logger.info("Starting Websocket connection"); logger.debug("Starting Websocket connection");
client.start(); client.start();
} catch (Exception e) { } catch (Exception e) {
logger.warn("Websocket Start Exception: {}", e.getMessage()); logger.warn("Websocket Start Exception: {}", e.getMessage());
} }
try { try {
logger.info("Connecting Websocket connection"); logger.debug("Connecting Websocket connection");
sessionFuture = client.connect(socket, new URI(SUBSCRIPTION_URL), newRequest); sessionFuture = client.connect(socket, new URI(SUBSCRIPTION_URL), newRequest);
try {
Thread.sleep(10 * 1000);
} catch (InterruptedException e) {
}
Session session = this.session;
if (!session.isOpen()) {
close();
logger.warn("Unable to establish websocket session");
} else {
logger.debug("Websocket session established");
}
} catch (IOException e) { } catch (IOException e) {
logger.warn("Websocket Connect Exception: {}", e.getMessage()); logger.warn("Websocket Connect Exception: {}", e.getMessage());
} catch (URISyntaxException e) { } catch (URISyntaxException e) {
@ -353,15 +365,19 @@ public class TibberHandler extends BaseThingHandler {
try { try {
TibberWebSocketListener socket = this.socket; TibberWebSocketListener socket = this.socket;
if (socket != null) { if (socket != null) {
logger.info("Sending websocket disconnect message"); logger.debug("Sending websocket disconnect message");
socket.sendMessage(disconnect); socket.sendMessage(disconnect);
} else { } else {
logger.debug("Socket unable to send disconnect message: Socket is null"); logger.warn("Socket unable to send disconnect message: Socket is null");
} }
} catch (IOException e) { } catch (IOException e) {
logger.warn("Websocket Close Exception: {}", e.getMessage()); logger.warn("Websocket Close Exception: {}", e.getMessage());
} }
session.close(); try {
session.close();
} catch (Exception e) {
logger.warn("Unable to disconnect session");
}
this.session = null; this.session = null;
this.socket = null; this.socket = null;
} }
@ -374,8 +390,9 @@ public class TibberHandler extends BaseThingHandler {
try { try {
client.stop(); client.stop();
} catch (Exception e) { } catch (Exception e) {
logger.warn("Failed to stop websocket client: {}", e.getMessage()); logger.warn("CLOSE FRAME - Failed to stop websocket client: {}", e.getMessage());
} }
client.destroy();
} }
} }
@ -395,7 +412,7 @@ public class TibberHandler extends BaseThingHandler {
String connection = "{\"type\":\"connection_init\", \"payload\":\"token=" + tibberConfig.getToken() + "\"}"; String connection = "{\"type\":\"connection_init\", \"payload\":\"token=" + tibberConfig.getToken() + "\"}";
try { try {
if (socket != null) { if (socket != null) {
logger.info("Sending websocket connect message"); logger.debug("Sending websocket connect message");
socket.sendMessage(connection); socket.sendMessage(connection);
} else { } else {
logger.debug("Socket unable to send connect message: Socket is null"); logger.debug("Socket unable to send connect message: Socket is null");
@ -407,11 +424,11 @@ public class TibberHandler extends BaseThingHandler {
@OnWebSocketClose @OnWebSocketClose
public void onClose(int statusCode, String reason) { public void onClose(int statusCode, String reason) {
logger.info("Closing a WebSocket due to {}", reason); logger.debug("Closing a WebSocket due to {}", reason);
WebSocketClient client = TibberHandler.this.client; WebSocketClient client = TibberHandler.this.client;
if (client != null && client.isRunning()) { if (client != null && client.isRunning()) {
try { try {
logger.info("Stopping and Terminating Websocket connection"); logger.debug("ONCLOSE - Stopping and Terminating Websocket connection");
client.stop(); client.stop();
} catch (Exception e) { } catch (Exception e) {
logger.warn("Websocket Client Stop Exception: {}", e.getMessage()); logger.warn("Websocket Client Stop Exception: {}", e.getMessage());
@ -423,13 +440,13 @@ public class TibberHandler extends BaseThingHandler {
public void onWebSocketError(Throwable e) { public void onWebSocketError(Throwable e) {
String message = e.getMessage(); String message = e.getMessage();
logger.debug("Error during websocket communication: {}", message); logger.debug("Error during websocket communication: {}", message);
onClose(0, message != null ? message : "null"); close();
} }
@OnWebSocketMessage @OnWebSocketMessage
public void onMessage(String message) { public void onMessage(String message) {
if (message.contains("connection_ack")) { if (message.contains("connection_ack")) {
logger.info("Connected to Server"); logger.debug("Connected to Server");
startSubscription(); startSubscription();
} else if (message.contains("error") || message.contains("terminate")) { } else if (message.contains("error") || message.contains("terminate")) {
logger.debug("Error/terminate received from server: {}", message); logger.debug("Error/terminate received from server: {}", message);