[openthermgateway] Various improvements (#12507)

* [openthermgateway] various tweaks

Signed-off-by: Andrew Fiddian-Green <software@whitebear.ch>
This commit is contained in:
Andrew Fiddian-Green 2022-03-28 20:40:35 +01:00 committed by GitHub
parent bd1f5f8bc4
commit 4f4dfcca20
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
4 changed files with 171 additions and 53 deletions

View File

@ -50,6 +50,8 @@ import org.slf4j.LoggerFactory;
*/ */
@NonNullByDefault @NonNullByDefault
public class OpenThermGatewayHandler extends BaseBridgeHandler implements OpenThermGatewayCallback { public class OpenThermGatewayHandler extends BaseBridgeHandler implements OpenThermGatewayCallback {
private static final String PROPERTY_GATEWAY_ID_NAME = "gatewayId";
private static final String PROPERTY_GATEWAY_ID_TAG = "PR: A=";
private final Logger logger = LoggerFactory.getLogger(OpenThermGatewayHandler.class); private final Logger logger = LoggerFactory.getLogger(OpenThermGatewayHandler.class);
@ -181,6 +183,18 @@ public class OpenThermGatewayHandler extends BaseBridgeHandler implements OpenTh
} }
} }
@Override
public void receiveAcknowledgement(String message) {
scheduler.submit(() -> receiveAcknowledgementTask(message));
}
private void receiveAcknowledgementTask(String message) {
if (message.startsWith(PROPERTY_GATEWAY_ID_TAG)) {
getThing().setProperty(PROPERTY_GATEWAY_ID_NAME,
message.substring(PROPERTY_GATEWAY_ID_TAG.length()).strip());
}
}
@Override @Override
public void handleRemoval() { public void handleRemoval() {
logger.debug("Removing OpenThermGateway handler"); logger.debug("Removing OpenThermGateway handler");

View File

@ -26,4 +26,6 @@ public interface OpenThermGatewayCallback {
void connectionStateChanged(ConnectionState state); void connectionStateChanged(ConnectionState state);
void receiveMessage(Message message); void receiveMessage(Message message);
void receiveAcknowledgement(String message);
} }

View File

@ -20,10 +20,9 @@ import java.io.InputStreamReader;
import java.io.PrintWriter; import java.io.PrintWriter;
import java.net.InetSocketAddress; import java.net.InetSocketAddress;
import java.net.Socket; import java.net.Socket;
import java.util.AbstractMap; import java.time.Instant;
import java.util.Map; import java.util.ArrayDeque;
import java.util.Map.Entry; import java.util.Queue;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ExecutionException; import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService; import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors; import java.util.concurrent.Executors;
@ -42,11 +41,15 @@ import org.slf4j.LoggerFactory;
* *
* @author Arjen Korevaar - Initial contribution * @author Arjen Korevaar - Initial contribution
* @author Arjan Mels - Improved robustness by re-sending commands, handling all message types (not only Boiler) * @author Arjan Mels - Improved robustness by re-sending commands, handling all message types (not only Boiler)
* @author Andrew Fiddian-Green - Improve thread interruption, socket timeouts, exception handling, FIFO command queue
*/ */
@NonNullByDefault @NonNullByDefault
public class OpenThermGatewaySocketConnector implements OpenThermGatewayConnector { public class OpenThermGatewaySocketConnector implements OpenThermGatewayConnector {
private static final int COMMAND_RESPONSE_MIN_WAIT_TIME_MILLISECONDS = 100; private static final int COMMAND_RESPONSE_MIN_WAIT_TIME_MILLISECONDS = 100;
private static final int COMMAND_RESPONSE_MAX_WAIT_TIME_MILLISECONDS = 5000; private static final int COMMAND_RESPONSE_MAX_WAIT_TIME_MILLISECONDS = 5000;
private static final int MAXIMUM_FIFO_BUFFER_SIZE = 20;
private static final String WDT_RESET_RESPONSE_MESSAGE = "WDT reset";
private final Logger logger = LoggerFactory.getLogger(OpenThermGatewaySocketConnector.class); private final Logger logger = LoggerFactory.getLogger(OpenThermGatewaySocketConnector.class);
@ -61,7 +64,56 @@ public class OpenThermGatewaySocketConnector implements OpenThermGatewayConnecto
private @Nullable Future<Boolean> future; private @Nullable Future<Boolean> future;
private @Nullable ExecutorService executor; private @Nullable ExecutorService executor;
private Map<String, Entry<Long, GatewayCommand>> pendingCommands = new ConcurrentHashMap<>(); /**
* FIFO queue of commands that are pending being sent to the gateway. That is commands that are either not yet sent,
* or sent but not yet acknowledged and pending possible re-sending.
*
* Note: we must use 'synchronized' when accessing this object to ensure proper thread safety.
*/
private final Queue<PendingCommand> pendingCommands = new ArrayDeque<>(MAXIMUM_FIFO_BUFFER_SIZE);
/**
* Wrapper for a command entry in the pending command FIFO queue.
*
* @author AndrewFG - initial contribution
*/
private class PendingCommand {
protected final GatewayCommand command;
protected final Instant expiryTime = Instant.now().plusMillis(COMMAND_RESPONSE_MAX_WAIT_TIME_MILLISECONDS);
protected Instant sentTime = Instant.MIN;
protected PendingCommand(GatewayCommand command) {
this.command = command;
}
/**
* Check if the command has been sent to the gateway.
*
* @return true if it has been sent
*/
protected boolean sent() {
return Instant.MIN.isBefore(sentTime);
}
/**
* Check if the command is ready to send (or re-send) to the gateway.
*
* @return true if the command has either not been sent, or sent but not acknowledged within due time i.e. it
* needs to be re-sent
*/
protected boolean readyToSend() {
return sentTime.isBefore(Instant.now().minusMillis(COMMAND_RESPONSE_MIN_WAIT_TIME_MILLISECONDS));
}
/**
* Check if the command has expired.
*
* @return true if the expiry time has expired
*/
protected boolean expired() {
return Instant.now().isAfter(expiryTime);
}
}
public OpenThermGatewaySocketConnector(OpenThermGatewayCallback callback, OpenThermGatewayConfiguration config) { public OpenThermGatewaySocketConnector(OpenThermGatewayCallback callback, OpenThermGatewayConfiguration config) {
this.callback = callback; this.callback = callback;
@ -98,12 +150,15 @@ public class OpenThermGatewaySocketConnector implements OpenThermGatewayConnecto
String message = reader.readLine(); String message = reader.readLine();
if (message != null) { if (message != null) {
logger.trace("Read: {}", message);
handleMessage(message); handleMessage(message);
} else { } else {
logger.debug("Received NULL message from OpenTherm Gateway (EOF)"); logger.debug("Received NULL message from OpenTherm Gateway (EOF)");
break; break;
} }
} }
// disable reporting every message (for cleaner re-starting)
sendCommand(GatewayCommand.parse(GatewayCommandCode.PRINTSUMMARY, "1"));
} catch (IOException ex) { } catch (IOException ex) {
logger.warn("Error communicating with OpenTherm Gateway: '{}'", ex.getMessage()); logger.warn("Error communicating with OpenTherm Gateway: '{}'", ex.getMessage());
} }
@ -162,61 +217,108 @@ public class OpenThermGatewaySocketConnector implements OpenThermGatewayConnecto
} }
@Override @Override
public synchronized void sendCommand(GatewayCommand command) { public void sendCommand(GatewayCommand command) {
PrintWriter wrt = writer; synchronized (pendingCommands) {
// append the command to the end of the FIFO queue
pendingCommands.put(command.getCode(), if (MAXIMUM_FIFO_BUFFER_SIZE < pendingCommands.size()
new AbstractMap.SimpleImmutableEntry<>(System.currentTimeMillis(), command)); || !pendingCommands.offer(new PendingCommand(command))) {
logger.warn("Command refused: FIFO buffer overrun => PLEASE REPORT !!");
String msg = command.toFullString();
if (isConnected() && (wrt != null)) {
logger.debug("Sending message: {}", msg);
wrt.print(msg + "\r\n");
wrt.flush();
if (wrt.checkError()) {
logger.warn("sendCommand() error sending message to OpenTherm Gateway => PLEASE REPORT !!");
stop();
} }
// send the FIFO head command, which may or may not be the one just added
pendingCommandsSendHeadCommandIfReady();
} // release the pendingCommands lock
}
/**
* Process the incoming message. Remove any expired commands from the queue. Check if the incoming message is an
* acknowledgement. If it is the acknowledgement for the FIFO head command, remove it from the queue. Try to send
* the (next) FIFO head command, if it exists, and is ready to send. And finally if the message is not an
* acknowledgement, check if it is a valid message, and if so, pass it to the gateway Thing handler for processing.
*
* @param message the incoming message received from the gateway
*/
private void handleMessage(String message) {
// check if the message is a command acknowledgement e.g. having the form "XX: yyy"
boolean isCommandAcknowledgement = (message.length() > 2) && (message.charAt(2) == ':');
synchronized (pendingCommands) {
// remove all expired commands
pendingCommandsRemoveAllExpiredCommands();
// if acknowledgement is for the FIFO head command, remove it from the queue
if (isCommandAcknowledgement) {
pendingCommandsRemoveHeadCommandIfAcknowledgement(message);
}
// (re-)send the FIFO head command, if it exists and is ready to send
pendingCommandsSendHeadCommandIfReady();
} // release the pendingCommands lock
if (isCommandAcknowledgement) {
callback.receiveAcknowledgement(message);
} else if (message.startsWith(WDT_RESET_RESPONSE_MESSAGE)) {
logger.warn("OpenTherm Gateway was reset by its Watch-Dog Timer!");
} else { } else {
logger.debug("Unable to send message: {}. OpenThermGatewaySocketConnector is not connected.", msg); Message msg = Message.parse(message);
// ignore and log bad messages
if (msg == null) {
logger.debug("Received message: {}, (unknown)", message);
return;
}
// pass good messages to the Thing handler for processing
if (msg.getMessageType() == MessageType.READACK || msg.getMessageType() == MessageType.WRITEDATA
|| msg.getID() == 0 || msg.getID() == 1) {
callback.receiveMessage(msg);
}
} }
} }
private void handleMessage(String message) { /**
if (message.length() > 2 && message.charAt(2) == ':') { * If there is a FIFO head command that is ready to (re-)send, then send it.
String code = message.substring(0, 2); */
String value = message.substring(3); private void pendingCommandsSendHeadCommandIfReady() {
// process the command at the head of the queue
PendingCommand headCommand = pendingCommands.peek();
if (headCommand != null && headCommand.readyToSend()) {
String message = headCommand.command.toFullString();
logger.debug("Received command confirmation: {}: {}", code, value); // transmit the command string
pendingCommands.remove(code); PrintWriter writer = this.writer;
return; if (isConnected() && (writer != null)) {
} writer.print(message + "\r\n");
writer.flush();
long currentTime = System.currentTimeMillis(); if (writer.checkError()) {
logger.warn("Error sending command to OpenTherm Gateway => PLEASE REPORT !!");
for (Entry<Long, GatewayCommand> timeAndCommand : pendingCommands.values()) { stop();
long responseTime = timeAndCommand.getKey() + COMMAND_RESPONSE_MIN_WAIT_TIME_MILLISECONDS; }
long timeoutTime = timeAndCommand.getKey() + COMMAND_RESPONSE_MAX_WAIT_TIME_MILLISECONDS; if (logger.isTraceEnabled()) {
logger.trace("Sent: {}{}", message, headCommand.sent() ? " (repeat)" : "");
if (currentTime > responseTime && currentTime <= timeoutTime) { }
logger.debug("Resending command: {}", timeAndCommand.getValue()); headCommand.sentTime = Instant.now();
sendCommand(timeAndCommand.getValue()); } else {
} else if (currentTime > timeoutTime) { logger.debug("Unable to send command: {}. OpenThermGatewaySocketConnector is not connected.", message);
pendingCommands.remove(timeAndCommand.getValue().getCode());
} }
} }
}
Message msg = Message.parse(message); /**
* If the acknowledgement message corresponds to the FIFO head command then remove it from the queue.
*
* @param message must be an acknowledgement message in the form "XX: yyy"
*/
private void pendingCommandsRemoveHeadCommandIfAcknowledgement(String message) {
PendingCommand headCommand = pendingCommands.peek();
if (headCommand != null && headCommand.command.getCode().equals(message.substring(0, 2))) {
pendingCommands.poll();
}
}
if (msg == null) { /**
logger.trace("Received message: {}, (unknown)", message); * Remove all expired commands from the queue.
return; */
} private void pendingCommandsRemoveAllExpiredCommands() {
logger.trace("Received message: {}, {} {} {}", message, msg.getID(), msg.getCodeType(), msg.getMessageType()); pendingCommands.removeIf(pendingCommand -> pendingCommand.expired());
if (msg.getMessageType() == MessageType.READACK || msg.getMessageType() == MessageType.WRITEDATA
|| msg.getID() == 0 || msg.getID() == 1) {
callback.receiveMessage(msg);
}
} }
} }

View File

@ -10,7 +10,7 @@
<channel id="sendcommand" typeId="sendcommand"/> <channel id="sendcommand" typeId="sendcommand"/>
</channels> </channels>
<properties> <properties>
<property name="version">2.2.0</property> <property name="version">2.2.1</property>
</properties> </properties>
<config-description-ref uri="thing-type:openthermgateway:openthermgateway"/> <config-description-ref uri="thing-type:openthermgateway:openthermgateway"/>
</bridge-type> </bridge-type>