[velux] Improve shutdown exception handling (#12356)

* [velux] add isDisposing flag to accelerate shutdown

Signed-off-by: Andrew Fiddian-Green <software@whitebear.ch>

* [velux] refactor Poller into a separate class

Signed-off-by: Andrew Fiddian-Green <software@whitebear.ch>

* [velux] use new Poller class; fix startup, shutdown, and exception code

Signed-off-by: Andrew Fiddian-Green <software@whitebear.ch>

* [velux] demote confusing log message

Signed-off-by: Andrew Fiddian-Green <software@whitebear.ch>

* [velux] slightly more elegant interrupt flag set / check

Signed-off-by: Andrew Fiddian-Green <software@whitebear.ch>
This commit is contained in:
Andrew Fiddian-Green 2022-03-19 14:00:30 +00:00 committed by GitHub
parent a6f5b48dd5
commit e9cb9c30d0
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
4 changed files with 204 additions and 145 deletions

View File

@ -147,6 +147,9 @@ public class Connection implements Closeable {
} catch (IOException ioe) {
logger.info("io() on {}: Exception occurred during I/O: {}.", host, ioe.getMessage());
lastIOE = ioe;
if (bridgeInstance.isDisposing()) {
break;
} else {
// Error Retries with Exponential Backoff
long waitTime = ((long) Math.pow(2, retryCount)
* bridgeInstance.veluxBridgeConfiguration().timeoutMsecs);
@ -157,6 +160,7 @@ public class Connection implements Closeable {
logger.trace("io() on {}: wait interrupted.", host);
}
}
}
} while (retryCount++ < bridgeInstance.veluxBridgeConfiguration().retries);
if (retryCount >= bridgeInstance.veluxBridgeConfiguration().retries) {
logger.info("io() on {}: socket I/O failed {} times.", host,

View File

@ -15,16 +15,15 @@ package org.openhab.binding.velux.internal.bridge.slip.io;
import java.io.Closeable;
import java.io.IOException;
import java.io.InputStream;
import java.net.SocketTimeoutException;
import java.util.Arrays;
import java.util.NoSuchElementException;
import java.util.Queue;
import java.util.concurrent.Callable;
import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import org.eclipse.jdt.annotation.NonNullByDefault;
import org.eclipse.jdt.annotation.Nullable;
@ -45,141 +44,34 @@ import org.slf4j.LoggerFactory;
@NonNullByDefault
class DataInputStreamWithTimeout implements Closeable {
private static final int QUEUE_SIZE = 512;
private static final int BUFFER_SIZE = 512;
private static final int SLEEP_INTERVAL_MSECS = 50;
// special character that marks the first and last byte of a slip message
private static final byte SLIP_MARK = (byte) 0xc0;
private static final byte SLIP_PROT = 0;
private static final long MAX_WAIT_SECONDS = 15;
private final Logger logger = LoggerFactory.getLogger(DataInputStreamWithTimeout.class);
private final Queue<byte[]> slipMessageQueue = new ConcurrentLinkedQueue<>();
private final InputStream inputStream;
private final VeluxBridgeHandler bridge;
private InputStream inputStream;
private @Nullable String pollException = null;
private @Nullable Poller pollRunner = null;
private ExecutorService executor;
private class Poller implements Callable<Boolean> {
private boolean interrupted = false;
private Future<Boolean> pollerFinished;
public Poller(ExecutorService executor) {
logger.trace("Poller: created");
pollerFinished = executor.submit(this);
}
public void interrupt() {
interrupted = true;
try {
pollerFinished.get();
} catch (InterruptedException | ExecutionException e) {
}
}
/**
* Task that loops to read bytes from {@link InputStream} and build SLIP packets from them. The SLIP packets are
* placed in a {@link ConcurrentLinkedQueue}. It loops continuously until 'interrupt()' or 'Thread.interrupt()'
* are called when terminates early after the next socket read timeout.
*/
@Override
public Boolean call() throws Exception {
logger.trace("Poller.call(): started");
byte[] buf = new byte[BUFFER_SIZE];
int byt;
int i = 0;
// clean start, no exception, empty queue
pollException = null;
slipMessageQueue.clear();
// loop forever; on shutdown interrupt() gets called to break out of the loop
while (true) {
try {
if (interrupted) {
// fully flush the input buffer
inputStream.readAllBytes();
break;
}
byt = inputStream.read();
if (byt < 0) {
// end of stream is OK => keep on polling
continue;
}
buf[i] = (byte) byt;
if ((i > 0) && (buf[i] == SLIP_MARK)) {
// the minimal slip message is 7 bytes [MM PP LL CC CC KK MM]
if ((i > 5) && (buf[0] == SLIP_MARK) && (buf[1] == SLIP_PROT)) {
slipMessageQueue.offer(Arrays.copyOfRange(buf, 0, i + 1));
if (slipMessageQueue.size() > QUEUE_SIZE) {
logger.warn("Poller.call(): slip message queue overflow => PLEASE REPORT !!");
slipMessageQueue.poll();
}
i = 0;
} else {
logger.warn("Poller.call(): non slip messsage discarded => PLEASE REPORT !!");
buf[0] = SLIP_MARK;
i = 1;
}
continue;
}
if (++i >= BUFFER_SIZE) {
logger.warn("Poller.call(): input buffer overrun => PLEASE REPORT !!");
i = 0;
}
} catch (SocketTimeoutException e) {
// socket read time outs are OK => keep on polling; unless interrupted
if (interrupted) {
break;
}
continue;
} catch (IOException e) {
// any other exception => stop polling
String msg = e.getMessage();
pollException = msg != null ? msg : "Generic IOException";
logger.debug("Poller.call(): stopping '{}'", pollException);
break;
}
}
logger.trace("Poller.call(): ended");
// we only get here if shutdown or an error occurs so free ourself so we can be recreated again
pollRunner = null;
return true;
}
}
/**
* Check if there was an exception on the polling loop task and if so, throw it back on the caller thread.
*
* @throws IOException
*/
private void throwIfPollException() throws IOException {
if (pollException != null) {
logger.debug("passPollException() polling loop exception {}", pollException);
throw new IOException(pollException);
}
}
private @Nullable Poller poller;
private @Nullable Future<Boolean> future;
private @Nullable ExecutorService executor;
/**
* Creates a {@link DataInputStreamWithTimeout} as a wrapper around the specified underlying {@link InputStream}
*
* @param stream the specified input stream
* @param inputStream the specified input stream
* @param bridge the actual Bridge Thing instance
*/
public DataInputStreamWithTimeout(InputStream stream, VeluxBridgeHandler bridge) {
inputStream = stream;
executor = Executors.newSingleThreadExecutor(bridge.getThreadFactory());
public DataInputStreamWithTimeout(InputStream inputStream, VeluxBridgeHandler bridge) {
this.inputStream = inputStream;
this.bridge = bridge;
}
/**
* Overridden method of {@link Closeable} interface. Stops the polling thread.
* Overridden method of {@link Closeable} interface. Stops the polling task.
*
* @throws IOException
* @throws IOException (although actually no exceptions are thrown)
*/
@Override
public void close() throws IOException {
@ -192,7 +84,8 @@ class DataInputStreamWithTimeout implements Closeable {
*
* @param timeoutMSecs the timeout period in milliseconds.
* @return the next SLIP message if there is one on the queue, or any empty byte[] array if not.
* @throws IOException
* @throws IOException if the poller task has unexpectedly terminated e.g. via an IOException, or if either the
* poller task, or the calling thread have been interrupted
*/
public synchronized byte[] readSlipMessage(int timeoutMSecs) throws IOException {
startPolling();
@ -203,16 +96,22 @@ class DataInputStreamWithTimeout implements Closeable {
logger.trace("readSlipMessage() => return slip message");
return slip;
} catch (NoSuchElementException e) {
// queue empty, wait and continue
// queue empty, fall through and continue
}
throwIfPollException();
try {
Thread.sleep(SLEEP_INTERVAL_MSECS);
} catch (InterruptedException e) {
logger.debug("readSlipMessage() => thread interrupt");
Future<Boolean> future = this.future;
if ((future != null) && future.isDone()) {
future.get(); // throws ExecutionException, InterruptedException
// future terminated without exception, but prematurely, which is itself an exception
throw new IOException("Poller thread terminated prematurely");
}
Thread.sleep(SLEEP_INTERVAL_MSECS); // throws InterruptedException
} catch (ExecutionException | InterruptedException e) {
// re-cast other exceptions as IOException
throw new IOException(e);
}
}
logger.debug("readSlipMessage() => no slip message after {}mS => time out", timeoutMSecs);
logger.debug("readSlipMessage() => no slip message");
return new byte[0];
}
@ -239,9 +138,12 @@ class DataInputStreamWithTimeout implements Closeable {
* Start the polling task
*/
private void startPolling() {
if (pollRunner == null) {
logger.trace("startPolling()");
pollRunner = new Poller(executor);
if (future == null) {
logger.debug("startPolling() called");
slipMessageQueue.clear();
poller = new Poller(inputStream, slipMessageQueue);
executor = Executors.newSingleThreadExecutor(bridge.getThreadFactory());
future = executor.submit(poller);
}
}
@ -249,11 +151,31 @@ class DataInputStreamWithTimeout implements Closeable {
* Stop the polling task
*/
private void stopPolling() {
Poller pollRunner = this.pollRunner;
if (pollRunner != null) {
logger.trace("stopPolling()");
pollRunner.interrupt();
}
logger.debug("stopPolling() called");
Poller poller = this.poller;
Future<Boolean> future = this.future;
ExecutorService executor = this.executor;
this.poller = null;
this.future = null;
this.executor = null;
if (executor != null) {
executor.shutdown();
}
if (poller != null) {
poller.interrupt();
}
if (future != null) {
try {
future.get(MAX_WAIT_SECONDS, TimeUnit.SECONDS);
} catch (ExecutionException e) {
// expected exception due to e.g. IOException on socket close
} catch (TimeoutException | InterruptedException e) {
// unexpected exception due to e.g. KLF200 'zombie state'
logger.warn("stopPolling() exception '{}' => PLEASE REPORT !!", e.getMessage());
}
}
}
}

View File

@ -0,0 +1,121 @@
/**
* Copyright (c) 2010-2022 Contributors to the openHAB project
*
* See the NOTICE file(s) distributed with this work for additional
* information.
*
* This program and the accompanying materials are made available under the
* terms of the Eclipse Public License 2.0 which is available at
* http://www.eclipse.org/legal/epl-2.0
*
* SPDX-License-Identifier: EPL-2.0
*/
package org.openhab.binding.velux.internal.bridge.slip.io;
import java.io.IOException;
import java.io.InputStream;
import java.net.SocketTimeoutException;
import java.util.Arrays;
import java.util.Queue;
import java.util.concurrent.Callable;
import org.eclipse.jdt.annotation.NonNullByDefault;
import org.eclipse.jdt.annotation.Nullable;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
/**
* This class implements a Callable to read SLIP messages from the input stream.
*
* It implements a secondary polling thread to asynchronously read bytes from the socket input stream into a buffer.
* And it parses the bytes into SLIP messages, which are placed on a message queue.
*
* @author Andrew Fiddian-Green - Initial Contribution; refactored from private class in DataInputStreamWithTimeout
*/
@NonNullByDefault
public class Poller implements Callable<Boolean> {
private static final int BUFFER_SIZE = 512;
private static final int QUEUE_SIZE = 512;
// special character that marks the first and last byte of a slip message
private static final byte SLIP_MARK = (byte) 0xc0;
private static final byte SLIP_PROT = 0;
private final Logger logger = LoggerFactory.getLogger(Poller.class);
private final InputStream inputStream;
private final Queue<byte[]> messageQueue;
private @Nullable volatile Thread thread;
public Poller(InputStream stream, Queue<byte[]> queue) {
logger.trace("Poller: created");
inputStream = stream;
messageQueue = queue;
}
public void interrupt() {
Thread thread = this.thread;
if ((thread != null) && thread.isAlive()) {
thread.interrupt();
}
}
/**
* Task that loops to read bytes from inputStream and build SLIP packets from them. The SLIP packets are placed in
* messageQueue. It runs until 'interrupt()' or 'Thread.interrupt()' are called.
*
* @throws IOException in case of socket read errors
*/
@Override
public Boolean call() throws IOException {
thread = Thread.currentThread();
logger.trace("Poller.call(): started");
byte[] buf = new byte[BUFFER_SIZE];
int byt;
int i = 0;
while (!Thread.currentThread().isInterrupted()) {
try {
byt = inputStream.read(); // throws IOException
// end of stream is OK => continue polling
if (byt < 0) {
continue;
}
} catch (SocketTimeoutException e) {
// socket read time out is OK => continue polling
continue;
}
buf[i] = (byte) byt;
if ((i > 0) && (buf[i] == SLIP_MARK)) {
// the minimal slip message is 7 bytes [MM PP LL CC CC KK MM]
if ((i > 5) && (buf[0] == SLIP_MARK) && (buf[1] == SLIP_PROT)) {
messageQueue.offer(Arrays.copyOfRange(buf, 0, i + 1));
if (messageQueue.size() > QUEUE_SIZE) {
logger.warn("Poller.call(): slip message queue overflow => PLEASE REPORT !!");
messageQueue.poll();
}
i = 0;
} else {
if (logger.isWarnEnabled()) {
StringBuilder sb = new StringBuilder();
for (int j = 0; j <= i; j++) {
sb.append(String.format("%02X ", buf[j]));
}
logger.warn("Poller.call(): non slip messsage {} discarded => PLEASE REPORT !!", sb.toString());
}
buf[0] = SLIP_MARK;
i = 1;
}
continue;
}
if (++i >= BUFFER_SIZE) {
logger.warn("Poller.call(): input buffer overrun => PLEASE REPORT !!");
i = 0;
}
}
logger.trace("Poller.call(): completed");
return true;
}
}

View File

@ -135,6 +135,7 @@ public class VeluxBridgeHandler extends ExtendedBaseBridgeHandler implements Vel
private VeluxBridge myJsonBridge = new JsonVeluxBridge(this);
private VeluxBridge mySlipBridge = new SlipVeluxBridge(this);
private boolean disposing = false;
/*
* **************************************
@ -279,6 +280,7 @@ public class VeluxBridgeHandler extends ExtendedBaseBridgeHandler implements Vel
veluxBridgeConfiguration = new VeluxBinding(getConfigAs(VeluxBridgeConfiguration.class)).checked();
scheduler.execute(() -> {
disposing = false;
initializeSchedulerJob();
});
}
@ -314,6 +316,7 @@ public class VeluxBridgeHandler extends ExtendedBaseBridgeHandler implements Vel
@Override
public void dispose() {
scheduler.submit(() -> {
disposing = true;
disposeSchedulerJob();
});
}
@ -882,4 +885,13 @@ public class VeluxBridgeHandler extends ExtendedBaseBridgeHandler implements Vel
}
return threadFactory;
}
/**
* Indicates if the bridge thing is being disposed.
*
* @return true if the bridge thing is being disposed.
*/
public boolean isDisposing() {
return disposing;
}
}