[velux] Stability checks and improvements in slip io (#10119)

This commit is contained in:
Andrew Fiddian-Green 2021-03-26 16:13:37 +00:00 committed by GitHub
parent 6e1119a49a
commit 361a6726fc
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
3 changed files with 47 additions and 22 deletions

View File

@ -52,7 +52,7 @@ In addition there are some optional Configuration Parameters.
|-------------------------|------------------|:--------:|--------------------------------------------------------------| |-------------------------|------------------|:--------:|--------------------------------------------------------------|
| ipAddress | | Yes | Hostname or address for accessing the Velux Bridge. | | ipAddress | | Yes | Hostname or address for accessing the Velux Bridge. |
| password | velux123 | Yes | Password for authentication against the Velux Bridge.(\*\*) | | password | velux123 | Yes | Password for authentication against the Velux Bridge.(\*\*) |
| timeoutMsecs | 2000 | No | Communication timeout in milliseconds. | | timeoutMsecs | 3000 | No | Communication timeout in milliseconds. |
| protocol | slip | No | Underlying communication protocol (http/https/slip). | | protocol | slip | No | Underlying communication protocol (http/https/slip). |
| tcpPort | 51200 | No | TCP port (80 or 51200) for accessing the Velux Bridge. | | tcpPort | 51200 | No | TCP port (80 or 51200) for accessing the Velux Bridge. |
| retries | 5 | No | Number of retries during I/O. | | retries | 5 | No | Number of retries during I/O. |

View File

@ -21,8 +21,10 @@ import java.util.NoSuchElementException;
import java.util.Queue; import java.util.Queue;
import java.util.concurrent.Callable; import java.util.concurrent.Callable;
import java.util.concurrent.ConcurrentLinkedQueue; import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService; import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors; import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import org.eclipse.jdt.annotation.NonNullByDefault; import org.eclipse.jdt.annotation.NonNullByDefault;
import org.eclipse.jdt.annotation.Nullable; import org.eclipse.jdt.annotation.Nullable;
@ -49,6 +51,7 @@ class DataInputStreamWithTimeout implements Closeable {
// special character that marks the first and last byte of a slip message // special character that marks the first and last byte of a slip message
private static final byte SLIP_MARK = (byte) 0xc0; private static final byte SLIP_MARK = (byte) 0xc0;
private static final byte SLIP_PROT = 0;
private final Logger logger = LoggerFactory.getLogger(DataInputStreamWithTimeout.class); private final Logger logger = LoggerFactory.getLogger(DataInputStreamWithTimeout.class);
@ -63,9 +66,19 @@ class DataInputStreamWithTimeout implements Closeable {
private class Poller implements Callable<Boolean> { private class Poller implements Callable<Boolean> {
private boolean interrupted = false; private boolean interrupted = false;
private Future<Boolean> pollerFinished;
public Poller(ExecutorService executor) {
logger.trace("Poller: created");
pollerFinished = executor.submit(this);
}
public void interrupt() { public void interrupt() {
interrupted = true; interrupted = true;
try {
pollerFinished.get();
} catch (InterruptedException | ExecutionException e) {
}
} }
/** /**
@ -75,34 +88,47 @@ class DataInputStreamWithTimeout implements Closeable {
*/ */
@Override @Override
public Boolean call() throws Exception { public Boolean call() throws Exception {
logger.trace("Poller.call(): started");
byte[] buf = new byte[BUFFER_SIZE]; byte[] buf = new byte[BUFFER_SIZE];
byte byt; int byt;
int i = 0; int i = 0;
// clean start, no exception, empty queue // clean start, no exception, empty queue
pollException = null; pollException = null;
slipMessageQueue.clear(); slipMessageQueue.clear();
// loop forever or until internally or externally interrupted // loop forever or until externally interrupted
while ((!interrupted) && (!Thread.interrupted())) { while (!Thread.interrupted()) {
try { try {
buf[i] = byt = (byte) inputStream.read(); if (interrupted) {
if (byt == SLIP_MARK) { // fully flush the input buffer
if (i > 0) { inputStream.readAllBytes();
// the minimal slip message is 7 bytes [MM PP LL CC CC KK MM] break;
if ((i > 5) && (buf[0] == SLIP_MARK)) { }
slipMessageQueue.offer(Arrays.copyOfRange(buf, 0, i + 1)); byt = inputStream.read();
if (slipMessageQueue.size() > QUEUE_SIZE) { if (byt < 0) {
logger.warn("pollRunner() => slip message queue overflow => PLEASE REPORT !!"); // end of stream is OK => keep on polling
slipMessageQueue.poll(); continue;
} }
buf[i] = (byte) byt;
if ((i > 0) && (buf[i] == SLIP_MARK)) {
// the minimal slip message is 7 bytes [MM PP LL CC CC KK MM]
if ((i > 5) && (buf[0] == SLIP_MARK) && (buf[1] == SLIP_PROT)) {
slipMessageQueue.offer(Arrays.copyOfRange(buf, 0, i + 1));
if (slipMessageQueue.size() > QUEUE_SIZE) {
logger.warn("Poller.call(): slip message queue overflow => PLEASE REPORT !!");
slipMessageQueue.poll();
} }
i = 0; i = 0;
} else {
logger.warn("Poller.call(): non slip messsage discarded => PLEASE REPORT !!");
buf[0] = SLIP_MARK; buf[0] = SLIP_MARK;
continue; i = 1;
} }
continue;
} }
if (++i >= BUFFER_SIZE) { if (++i >= BUFFER_SIZE) {
logger.warn("Poller.call(): input buffer overrun => PLEASE REPORT !!");
i = 0; i = 0;
} }
} catch (SocketTimeoutException e) { } catch (SocketTimeoutException e) {
@ -112,11 +138,12 @@ class DataInputStreamWithTimeout implements Closeable {
// any other exception => stop polling // any other exception => stop polling
String msg = e.getMessage(); String msg = e.getMessage();
pollException = msg != null ? msg : "Generic IOException"; pollException = msg != null ? msg : "Generic IOException";
logger.debug("pollRunner() stopping '{}'", pollException); logger.debug("Poller.call(): stopping '{}'", pollException);
break; break;
} }
} }
logger.trace("Poller.call(): ended");
// we only get here if shutdown or an error occurs so free ourself so we can be recreated again // we only get here if shutdown or an error occurs so free ourself so we can be recreated again
pollRunner = null; pollRunner = null;
return true; return true;
@ -210,11 +237,9 @@ class DataInputStreamWithTimeout implements Closeable {
* Start the polling task * Start the polling task
*/ */
private void startPolling() { private void startPolling() {
Poller pollRunner = this.pollRunner;
if (pollRunner == null) { if (pollRunner == null) {
logger.trace("startPolling()"); logger.trace("startPolling()");
pollRunner = this.pollRunner = new Poller(); pollRunner = new Poller(executor);
executor.submit(pollRunner);
} }
} }
@ -226,7 +251,6 @@ class DataInputStreamWithTimeout implements Closeable {
if (pollRunner != null) { if (pollRunner != null) {
logger.trace("stopPolling()"); logger.trace("stopPolling()");
pollRunner.interrupt(); pollRunner.interrupt();
this.pollRunner = null;
} }
executor.shutdown(); executor.shutdown();
} }

View File

@ -39,10 +39,11 @@
<!-- Velux Bridge factory default --> <!-- Velux Bridge factory default -->
<default>velux123</default> <default>velux123</default>
</parameter> </parameter>
<parameter name="timeoutMsecs" type="integer" min="500" step="1" max="5000" required="false"> <parameter name="timeoutMsecs" type="integer" min="500" step="1" max="10000">
<label>@text/config.velux.bridge.timeoutMsecs.label</label> <label>@text/config.velux.bridge.timeoutMsecs.label</label>
<description>@text/config.velux.bridge.timeoutMsecs.description</description> <description>@text/config.velux.bridge.timeoutMsecs.description</description>
<default>2000</default> <required>false</required>
<default>3000</default>
<advanced>true</advanced> <advanced>true</advanced>
</parameter> </parameter>
<parameter name="retries" type="integer" min="0" step="1" max="10" required="false"> <parameter name="retries" type="integer" min="0" step="1" max="10" required="false">