[enocean] Fix disposal of running tasks (#15329)
* Fix disposal of running tasks Fixes #15328 --------- Signed-off-by: Jacob Laursen <jacob-github@vindvejr.dk>
This commit is contained in:
parent
92238bcb8b
commit
a9d6d805f4
|
@ -296,10 +296,10 @@ public class EnOceanBaseActuatorHandler extends EnOceanBaseSensorHandler {
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
public void dispose() {
|
public void dispose() {
|
||||||
ScheduledFuture<?> localRefreshJob = refreshJob;
|
ScheduledFuture<?> refreshJob = this.refreshJob;
|
||||||
if (localRefreshJob != null && !localRefreshJob.isCancelled()) {
|
if (refreshJob != null) {
|
||||||
localRefreshJob.cancel(true);
|
refreshJob.cancel(true);
|
||||||
refreshJob = null;
|
this.refreshJob = null;
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
|
@ -202,9 +202,9 @@ public class EnOceanBaseSensorHandler extends EnOceanBaseThingHandler implements
|
||||||
// fire trigger for receive
|
// fire trigger for receive
|
||||||
triggerChannel(prepareAnswer, "requestAnswer");
|
triggerChannel(prepareAnswer, "requestAnswer");
|
||||||
// Send response after 100ms
|
// Send response after 100ms
|
||||||
ScheduledFuture<?> localResponseFuture = responseFuture;
|
ScheduledFuture<?> responseFuture = this.responseFuture;
|
||||||
if (localResponseFuture == null || localResponseFuture.isDone()) {
|
if (responseFuture == null || responseFuture.isDone()) {
|
||||||
localResponseFuture = scheduler.schedule(this::sendRequestResponse, 100, TimeUnit.MILLISECONDS);
|
this.responseFuture = scheduler.schedule(this::sendRequestResponse, 100, TimeUnit.MILLISECONDS);
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
|
@ -165,17 +165,14 @@ public class EnOceanBridgeHandler extends ConfigStatusBridgeHandler implements T
|
||||||
public void initialize() {
|
public void initialize() {
|
||||||
updateStatus(ThingStatus.OFFLINE, ThingStatusDetail.CONFIGURATION_PENDING, "trying to connect to gateway...");
|
updateStatus(ThingStatus.OFFLINE, ThingStatusDetail.CONFIGURATION_PENDING, "trying to connect to gateway...");
|
||||||
|
|
||||||
ScheduledFuture<?> localConnectorTask = connectorTask;
|
connectorTask = scheduler.scheduleWithFixedDelay(new Runnable() {
|
||||||
if (localConnectorTask == null || localConnectorTask.isDone()) {
|
@Override
|
||||||
localConnectorTask = scheduler.scheduleWithFixedDelay(new Runnable() {
|
public void run() {
|
||||||
@Override
|
if (thing.getStatus() != ThingStatus.ONLINE) {
|
||||||
public void run() {
|
initTransceiver();
|
||||||
if (thing.getStatus() != ThingStatus.ONLINE) {
|
|
||||||
initTransceiver();
|
|
||||||
}
|
|
||||||
}
|
}
|
||||||
}, 0, 60, TimeUnit.SECONDS);
|
}
|
||||||
}
|
}, 0, 60, TimeUnit.SECONDS);
|
||||||
}
|
}
|
||||||
|
|
||||||
private synchronized void initTransceiver() {
|
private synchronized void initTransceiver() {
|
||||||
|
@ -303,16 +300,16 @@ public class EnOceanBridgeHandler extends ConfigStatusBridgeHandler implements T
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
public synchronized void dispose() {
|
public synchronized void dispose() {
|
||||||
EnOceanTransceiver localTransceiver = transceiver;
|
EnOceanTransceiver transceiver = this.transceiver;
|
||||||
if (localTransceiver != null) {
|
if (transceiver != null) {
|
||||||
localTransceiver.shutDown();
|
transceiver.shutDown();
|
||||||
transceiver = null;
|
this.transceiver = null;
|
||||||
}
|
}
|
||||||
|
|
||||||
ScheduledFuture<?> localConnectorTask = connectorTask;
|
ScheduledFuture<?> connectorTask = this.connectorTask;
|
||||||
if (localConnectorTask != null && !localConnectorTask.isDone()) {
|
if (connectorTask != null) {
|
||||||
localConnectorTask.cancel(true);
|
connectorTask.cancel(true);
|
||||||
connectorTask = null;
|
this.connectorTask = null;
|
||||||
}
|
}
|
||||||
|
|
||||||
super.dispose();
|
super.dispose();
|
||||||
|
|
|
@ -276,13 +276,12 @@ public class EnOceanClassicDeviceHandler extends EnOceanBaseActuatorHandler {
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
public void handleRemoval() {
|
public void handleRemoval() {
|
||||||
ScheduledFuture<?> future = releaseFuture;
|
ScheduledFuture<?> releaseFuture = this.releaseFuture;
|
||||||
if (future != null && !future.isDone()) {
|
if (releaseFuture != null) {
|
||||||
future.cancel(true);
|
releaseFuture.cancel(true);
|
||||||
future = null;
|
this.releaseFuture = null;
|
||||||
}
|
}
|
||||||
|
|
||||||
releaseFuture = null;
|
|
||||||
super.handleRemoval();
|
super.handleRemoval();
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
|
@ -59,7 +59,7 @@ public abstract class EnOceanTransceiver implements SerialPortEventListener {
|
||||||
|
|
||||||
// Thread management
|
// Thread management
|
||||||
protected @Nullable Future<?> readingTask = null;
|
protected @Nullable Future<?> readingTask = null;
|
||||||
private @Nullable Future<?> timeOut = null;
|
private @Nullable Future<?> timeOutTask = null;
|
||||||
|
|
||||||
protected Logger logger = LoggerFactory.getLogger(EnOceanTransceiver.class);
|
protected Logger logger = LoggerFactory.getLogger(EnOceanTransceiver.class);
|
||||||
|
|
||||||
|
@ -121,14 +121,14 @@ public abstract class EnOceanTransceiver implements SerialPortEventListener {
|
||||||
localOutPutStream.write(b);
|
localOutPutStream.write(b);
|
||||||
localOutPutStream.flush();
|
localOutPutStream.flush();
|
||||||
}
|
}
|
||||||
Future<?> localTimeOut = timeOut;
|
Future<?> localTimeOutTask = timeOutTask;
|
||||||
if (localTimeOut != null) {
|
if (localTimeOutTask != null) {
|
||||||
localTimeOut.cancel(true);
|
localTimeOutTask.cancel(true);
|
||||||
}
|
}
|
||||||
|
|
||||||
// slowdown sending of message to avoid hickups at receivers
|
// slowdown sending of message to avoid hickups at receivers
|
||||||
// Todo tweak sending intervall (250 ist just a first try)
|
// Todo tweak sending intervall (250 ist just a first try)
|
||||||
timeOut = scheduler.schedule(() -> {
|
timeOutTask = scheduler.schedule(() -> {
|
||||||
try {
|
try {
|
||||||
sendNext();
|
sendNext();
|
||||||
} catch (IOException e) {
|
} catch (IOException e) {
|
||||||
|
@ -222,9 +222,9 @@ public abstract class EnOceanTransceiver implements SerialPortEventListener {
|
||||||
|
|
||||||
public void startReceiving(ScheduledExecutorService scheduler) {
|
public void startReceiving(ScheduledExecutorService scheduler) {
|
||||||
@Nullable
|
@Nullable
|
||||||
Future<?> localReadingTask = readingTask;
|
Future<?> readingTask = this.readingTask;
|
||||||
if (localReadingTask == null || localReadingTask.isCancelled()) {
|
if (readingTask == null || readingTask.isCancelled()) {
|
||||||
readingTask = scheduler.submit(new Runnable() {
|
this.readingTask = scheduler.submit(new Runnable() {
|
||||||
@Override
|
@Override
|
||||||
public void run() {
|
public void run() {
|
||||||
receivePackets();
|
receivePackets();
|
||||||
|
@ -238,14 +238,15 @@ public abstract class EnOceanTransceiver implements SerialPortEventListener {
|
||||||
logger.debug("shutting down transceiver");
|
logger.debug("shutting down transceiver");
|
||||||
logger.debug("Interrupt rx Thread");
|
logger.debug("Interrupt rx Thread");
|
||||||
|
|
||||||
Future<?> localTimeOut = timeOut;
|
Future<?> timeOutTask = this.timeOutTask;
|
||||||
if (localTimeOut != null) {
|
if (timeOutTask != null) {
|
||||||
localTimeOut.cancel(true);
|
timeOutTask.cancel(true);
|
||||||
|
this.timeOutTask = null;
|
||||||
}
|
}
|
||||||
|
|
||||||
Future<?> localReadingTask = readingTask;
|
Future<?> readingTask = this.readingTask;
|
||||||
if (localReadingTask != null) {
|
if (readingTask != null) {
|
||||||
localReadingTask.cancel(true);
|
readingTask.cancel(true);
|
||||||
|
|
||||||
InputStream localInputStream = inputStream;
|
InputStream localInputStream = inputStream;
|
||||||
if (localInputStream != null) {
|
if (localInputStream != null) {
|
||||||
|
@ -255,10 +256,9 @@ public abstract class EnOceanTransceiver implements SerialPortEventListener {
|
||||||
logger.debug("IOException occured while closing the stream", e);
|
logger.debug("IOException occured while closing the stream", e);
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
this.readingTask = null;
|
||||||
}
|
}
|
||||||
|
|
||||||
readingTask = null;
|
|
||||||
timeOut = null;
|
|
||||||
listeners.clear();
|
listeners.clear();
|
||||||
eventListeners.clear();
|
eventListeners.clear();
|
||||||
teachInListener = null;
|
teachInListener = null;
|
||||||
|
@ -298,8 +298,8 @@ public abstract class EnOceanTransceiver implements SerialPortEventListener {
|
||||||
private void receivePackets() {
|
private void receivePackets() {
|
||||||
byte[] buffer = new byte[1];
|
byte[] buffer = new byte[1];
|
||||||
|
|
||||||
Future<?> localReadingTask = readingTask;
|
Future<?> readingTask = this.readingTask;
|
||||||
while (localReadingTask != null && !localReadingTask.isCancelled()) {
|
while (readingTask != null && !readingTask.isCancelled()) {
|
||||||
int bytesRead = read(buffer, 1);
|
int bytesRead = read(buffer, 1);
|
||||||
if (bytesRead > 0) {
|
if (bytesRead > 0) {
|
||||||
processMessage(buffer[0]);
|
processMessage(buffer[0]);
|
||||||
|
@ -320,10 +320,10 @@ public abstract class EnOceanTransceiver implements SerialPortEventListener {
|
||||||
}
|
}
|
||||||
} else {
|
} else {
|
||||||
logger.warn("Cannot read from null stream");
|
logger.warn("Cannot read from null stream");
|
||||||
Future<?> localReadingTask = readingTask;
|
Future<?> readingTask = this.readingTask;
|
||||||
if (localReadingTask != null) {
|
if (readingTask != null) {
|
||||||
localReadingTask.cancel(true);
|
readingTask.cancel(true);
|
||||||
readingTask = null;
|
this.readingTask = null;
|
||||||
}
|
}
|
||||||
TransceiverErrorListener localListener = errorListener;
|
TransceiverErrorListener localListener = errorListener;
|
||||||
if (localListener != null) {
|
if (localListener != null) {
|
||||||
|
|
Loading…
Reference in New Issue