diff --git a/bundles/org.openhab.binding.novafinedust/src/main/java/org/openhab/binding/novafinedust/internal/SDS011Handler.java b/bundles/org.openhab.binding.novafinedust/src/main/java/org/openhab/binding/novafinedust/internal/SDS011Handler.java index 656896215..87cec1ad1 100644 --- a/bundles/org.openhab.binding.novafinedust/src/main/java/org/openhab/binding/novafinedust/internal/SDS011Handler.java +++ b/bundles/org.openhab.binding.novafinedust/src/main/java/org/openhab/binding/novafinedust/internal/SDS011Handler.java @@ -52,6 +52,7 @@ import org.slf4j.LoggerFactory; @NonNullByDefault public class SDS011Handler extends BaseThingHandler { private static final Duration CONNECTION_MONITOR_START_DELAY_OFFSET = Duration.ofSeconds(10); + private static final Duration RETRY_INIT_DELAY = Duration.ofSeconds(10); private final Logger logger = LoggerFactory.getLogger(SDS011Handler.class); private final SerialPortManager serialPortManager; @@ -59,8 +60,9 @@ public class SDS011Handler extends BaseThingHandler { private NovaFineDustConfiguration config = new NovaFineDustConfiguration(); private @Nullable SDS011Communicator communicator; - private @Nullable ScheduledFuture pollingJob; + private @Nullable ScheduledFuture dataReadJob; private @Nullable ScheduledFuture connectionMonitor; + private @Nullable ScheduledFuture retryInitJob; private ZonedDateTime lastCommunication = ZonedDateTime.now(); @@ -100,14 +102,16 @@ public class SDS011Handler extends BaseThingHandler { return; } - // parse ports and if the port is found, initialize the reader + // parse port and if the port is found, initialize the reader SerialPortIdentifier portId = serialPortManager.getIdentifier(config.port); if (portId == null) { updateStatus(ThingStatus.OFFLINE, ThingStatusDetail.OFFLINE.CONFIGURATION_ERROR, "Port is not known!"); + logger.debug("Serial port {} was not found, retrying in {}.", config.port, RETRY_INIT_DELAY); + retryInitJob = scheduler.schedule(this::initialize, RETRY_INIT_DELAY.getSeconds(), TimeUnit.SECONDS); return; } - this.communicator = new SDS011Communicator(this, portId); + this.communicator = new SDS011Communicator(this, portId, scheduler); if (config.reporting) { timeBetweenDataShouldArrive = Duration.ofMinutes(config.reportingInterval); @@ -116,37 +120,24 @@ public class SDS011Handler extends BaseThingHandler { timeBetweenDataShouldArrive = Duration.ofSeconds(config.pollingInterval); scheduler.submit(() -> initializeCommunicator(WorkMode.POLLING, timeBetweenDataShouldArrive)); } - - Duration connectionMonitorStartDelay = timeBetweenDataShouldArrive.plus(CONNECTION_MONITOR_START_DELAY_OFFSET); - connectionMonitor = scheduler.scheduleWithFixedDelay(this::verifyIfStillConnected, - connectionMonitorStartDelay.getSeconds(), timeBetweenDataShouldArrive.getSeconds(), TimeUnit.SECONDS); } private void initializeCommunicator(WorkMode mode, Duration interval) { SDS011Communicator localCommunicator = communicator; if (localCommunicator == null) { updateStatus(ThingStatus.OFFLINE, ThingStatusDetail.OFFLINE.COMMUNICATION_ERROR, - "Could not create communicator instance"); + "Communicator instance is null in initializeCommunicator()"); return; } boolean initSuccessful = false; - try { - initSuccessful = localCommunicator.initialize(mode, interval); - } catch (final IOException ex) { - updateStatus(ThingStatus.OFFLINE, ThingStatusDetail.OFFLINE.COMMUNICATION_ERROR, "I/O error!"); - return; - } catch (PortInUseException e) { - updateStatus(ThingStatus.OFFLINE, ThingStatusDetail.OFFLINE.COMMUNICATION_ERROR, "Port is in use!"); - return; - } catch (TooManyListenersException e) { - updateStatus(ThingStatus.OFFLINE, ThingStatusDetail.OFFLINE.COMMUNICATION_ERROR, - "Cannot attach listener to port!"); - return; - } catch (UnsupportedCommOperationException e) { - updateStatus(ThingStatus.OFFLINE, ThingStatusDetail.OFFLINE.COMMUNICATION_ERROR, - "Cannot set serial port parameters"); - return; + int retryInit = 3; + int retryCount = 0; + // sometimes the device is a little difficult and needs multiple configuration attempts + while (!initSuccessful && retryCount < retryInit) { + logger.trace("Trying to initialize device attempt={}", retryCount); + initSuccessful = doInit(localCommunicator, mode, interval); + retryCount++; } if (initSuccessful) { @@ -154,7 +145,7 @@ public class SDS011Handler extends BaseThingHandler { updateStatus(ThingStatus.ONLINE); if (mode == WorkMode.POLLING) { - pollingJob = scheduler.scheduleWithFixedDelay(() -> { + dataReadJob = scheduler.scheduleWithFixedDelay(() -> { try { localCommunicator.requestSensorData(); } catch (IOException e) { @@ -162,15 +153,57 @@ public class SDS011Handler extends BaseThingHandler { "Cannot query data from device"); } }, 2, config.pollingInterval, TimeUnit.SECONDS); + } else { + // start a job that reads the port until data arrives + int reportingReadStartDelay = 10; + int startReadBeforeDataArrives = 5; + long readReportedDataInterval = (config.reportingInterval * 60) - reportingReadStartDelay + - startReadBeforeDataArrives; + logger.trace("Scheduling job to receive reported values"); + dataReadJob = scheduler.scheduleWithFixedDelay(() -> { + try { + localCommunicator.readSensorData(); + } catch (IOException e) { + updateStatus(ThingStatus.OFFLINE, ThingStatusDetail.OFFLINE.COMMUNICATION_ERROR, + "Cannot query data from device, because: " + e.getMessage()); + } + }, reportingReadStartDelay, readReportedDataInterval, TimeUnit.SECONDS); } + + Duration connectionMonitorStartDelay = timeBetweenDataShouldArrive + .plus(CONNECTION_MONITOR_START_DELAY_OFFSET); + connectionMonitor = scheduler.scheduleWithFixedDelay(this::verifyIfStillConnected, + connectionMonitorStartDelay.getSeconds(), timeBetweenDataShouldArrive.getSeconds(), + TimeUnit.SECONDS); } else { updateStatus(ThingStatus.OFFLINE, ThingStatusDetail.OFFLINE.COMMUNICATION_ERROR, "Commands and replies from the device don't seem to match"); - logger.debug("Could not configure sensor -> setting Thing to OFFLINE and disposing the handler"); - dispose(); + logger.debug( + "Could not configure sensor -> setting Thing to OFFLINE, disposing the handler and reschedule initialize in {} seconds", + RETRY_INIT_DELAY); + doDispose(false); + retryInitJob = scheduler.schedule(this::initialize, RETRY_INIT_DELAY.getSeconds(), TimeUnit.SECONDS); } } + private boolean doInit(SDS011Communicator localCommunicator, WorkMode mode, Duration interval) { + boolean initSuccessful = false; + try { + initSuccessful = localCommunicator.initialize(mode, interval); + } catch (final IOException ex) { + updateStatus(ThingStatus.OFFLINE, ThingStatusDetail.OFFLINE.COMMUNICATION_ERROR, "I/O error!"); + } catch (PortInUseException e) { + updateStatus(ThingStatus.OFFLINE, ThingStatusDetail.OFFLINE.COMMUNICATION_ERROR, "Port is in use!"); + } catch (TooManyListenersException e) { + updateStatus(ThingStatus.OFFLINE, ThingStatusDetail.OFFLINE.COMMUNICATION_ERROR, + "Cannot attach listener to port, because there are too many listeners!"); + } catch (UnsupportedCommOperationException e) { + updateStatus(ThingStatus.OFFLINE, ThingStatusDetail.OFFLINE.COMMUNICATION_ERROR, + "Cannot set serial port parameters"); + } + return initSuccessful; + } + private boolean validateConfiguration() { if (config.port.isEmpty()) { updateStatus(ThingStatus.OFFLINE, ThingStatusDetail.OFFLINE.CONFIGURATION_ERROR, "Port must be set!"); @@ -194,10 +227,14 @@ public class SDS011Handler extends BaseThingHandler { @Override public void dispose() { - ScheduledFuture localPollingJob = this.pollingJob; + doDispose(true); + } + + private void doDispose(boolean sendDeviceToSleep) { + ScheduledFuture localPollingJob = this.dataReadJob; if (localPollingJob != null) { localPollingJob.cancel(true); - this.pollingJob = null; + this.dataReadJob = null; } ScheduledFuture localConnectionMonitor = this.connectionMonitor; @@ -206,9 +243,15 @@ public class SDS011Handler extends BaseThingHandler { this.connectionMonitor = null; } + ScheduledFuture localRetryOpenPortJob = this.retryInitJob; + if (localRetryOpenPortJob != null) { + localRetryOpenPortJob.cancel(true); + this.retryInitJob = null; + } + SDS011Communicator localCommunicator = this.communicator; if (localCommunicator != null) { - localCommunicator.dispose(); + localCommunicator.dispose(sendDeviceToSleep); } this.statePM10 = UnDefType.UNDEF; @@ -248,7 +291,7 @@ public class SDS011Handler extends BaseThingHandler { "Check connection cable and afterwards disable and enable this thing to make it work again"); // in case someone has pulled the plug, we dispose ourselves and the user has to deactivate/activate the // thing once the cable is plugged in again - dispose(); + doDispose(false); } else { logger.trace("Check Alive timer: All OK: lastCommunication={}, interval={}, tollerance={}", lastCommunication, timeBetweenDataShouldArrive, dataCanBeLateTolerance); diff --git a/bundles/org.openhab.binding.novafinedust/src/main/java/org/openhab/binding/novafinedust/internal/sds011protocol/SDS011Communicator.java b/bundles/org.openhab.binding.novafinedust/src/main/java/org/openhab/binding/novafinedust/internal/sds011protocol/SDS011Communicator.java index db438376a..3e3c11e32 100644 --- a/bundles/org.openhab.binding.novafinedust/src/main/java/org/openhab/binding/novafinedust/internal/sds011protocol/SDS011Communicator.java +++ b/bundles/org.openhab.binding.novafinedust/src/main/java/org/openhab/binding/novafinedust/internal/sds011protocol/SDS011Communicator.java @@ -18,6 +18,11 @@ import java.io.OutputStream; import java.time.Duration; import java.util.Arrays; import java.util.TooManyListenersException; +import java.util.concurrent.ExecutionException; +import java.util.concurrent.Future; +import java.util.concurrent.ScheduledExecutorService; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.TimeoutException; import org.eclipse.jdt.annotation.NonNullByDefault; import org.eclipse.jdt.annotation.Nullable; @@ -32,8 +37,6 @@ import org.openhab.binding.novafinedust.internal.sds011protocol.messages.SleepRe import org.openhab.binding.novafinedust.internal.sds011protocol.messages.WorkingPeriodReply; import org.openhab.core.io.transport.serial.PortInUseException; import org.openhab.core.io.transport.serial.SerialPort; -import org.openhab.core.io.transport.serial.SerialPortEvent; -import org.openhab.core.io.transport.serial.SerialPortEventListener; import org.openhab.core.io.transport.serial.SerialPortIdentifier; import org.openhab.core.io.transport.serial.UnsupportedCommOperationException; import org.openhab.core.util.HexUtils; @@ -47,7 +50,9 @@ import org.slf4j.LoggerFactory; * */ @NonNullByDefault -public class SDS011Communicator implements SerialPortEventListener { +public class SDS011Communicator { + + private static final int MAX_SENDOR_REPORTINGS_UNTIL_EXPECTED_REPLY = 20; private final Logger logger = LoggerFactory.getLogger(SDS011Communicator.class); @@ -57,10 +62,13 @@ public class SDS011Communicator implements SerialPortEventListener { private @Nullable OutputStream outputStream; private @Nullable InputStream inputStream; + private @Nullable ScheduledExecutorService scheduler; - public SDS011Communicator(SDS011Handler thingHandler, SerialPortIdentifier portId) { + public SDS011Communicator(SDS011Handler thingHandler, SerialPortIdentifier portId, + ScheduledExecutorService scheduler) { this.thingHandler = thingHandler; this.portId = portId; + this.scheduler = scheduler; } /** @@ -78,8 +86,12 @@ public class SDS011Communicator implements SerialPortEventListener { throws PortInUseException, TooManyListenersException, IOException, UnsupportedCommOperationException { boolean initSuccessful = true; + logger.trace("Initializing with mode={}, interval={}", mode, interval); + SerialPort localSerialPort = portId.open(thingHandler.getThing().getUID().toString(), 2000); + logger.trace("Port opened, object is={}", localSerialPort); localSerialPort.setSerialPortParams(9600, 8, 1, 0); + logger.trace("Serial parameters set on port"); outputStream = localSerialPort.getOutputStream(); inputStream = localSerialPort.getInputStream(); @@ -87,24 +99,27 @@ public class SDS011Communicator implements SerialPortEventListener { if (inputStream == null || outputStream == null) { throw new IOException("Could not create input or outputstream for the port"); } + logger.trace("Input and Outputstream opened for the port"); // wake up the device initSuccessful &= sendSleep(false); + logger.trace("Wake up call done, initSuccessful={}", initSuccessful); initSuccessful &= getFirmware(); + logger.trace("Firmware requested, initSuccessful={}", initSuccessful); if (mode == WorkMode.POLLING) { initSuccessful &= setMode(WorkMode.POLLING); + logger.trace("Polling mode set, initSuccessful={}", initSuccessful); initSuccessful &= setWorkingPeriod((byte) 0); + logger.trace("Working period for polling set, initSuccessful={}", initSuccessful); } else { // reporting initSuccessful &= setWorkingPeriod((byte) interval.toMinutes()); + logger.trace("Working period for reporting set, initSuccessful={}", initSuccessful); initSuccessful &= setMode(WorkMode.REPORTING); + logger.trace("Reporting mode set, initSuccessful={}", initSuccessful); } - // enable listeners only after we have configured the sensor above because for configuring we send and read data - // sequentially - localSerialPort.notifyOnDataAvailable(true); - localSerialPort.addEventListener(this); this.serialPort = localSerialPort; return initSuccessful; @@ -116,7 +131,12 @@ public class SDS011Communicator implements SerialPortEventListener { logger.debug("Will send command: {} ({})", HexUtils.bytesToHex(commandData), Arrays.toString(commandData)); } - write(commandData); + try { + write(commandData); + } catch (IOException ioex) { + logger.debug("Got an exception while writing a command, will not try to fetch a reply for it.", ioex); + throw ioex; + } try { // Give the sensor some time to handle the command @@ -127,9 +147,13 @@ public class SDS011Communicator implements SerialPortEventListener { } SensorReply reply = readReply(); // in case there is still another reporting active, we want to discard the sensor data and read the reply to our - // command again - if (reply instanceof SensorMeasuredDataReply) { - reply = readReply(); + // command again, this might happen more often in case the sensor has buffered some data + for (int i = 0; i < MAX_SENDOR_REPORTINGS_UNTIL_EXPECTED_REPLY; i++) { + if (reply instanceof SensorMeasuredDataReply) { + reply = readReply(); + } else { + break; + } } return reply; } @@ -218,7 +242,7 @@ public class SDS011Communicator implements SerialPortEventListener { } /** - * Request data from the device, they will be returned via the serialEvent callback + * Request data from the device * * @throws IOException */ @@ -229,6 +253,13 @@ public class SDS011Communicator implements SerialPortEventListener { logger.debug("Requesting sensor data, will send: {}", HexUtils.bytesToHex(data)); } write(data); + try { + Thread.sleep(200); // give the device some time to handle the command + } catch (InterruptedException e) { + logger.warn("Interrupted while waiting before reading a reply to our rquest data command."); + Thread.currentThread().interrupt(); + } + readSensorData(); } private @Nullable SensorReply readReply() throws IOException { @@ -237,7 +268,8 @@ public class SDS011Communicator implements SerialPortEventListener { InputStream localInpuStream = inputStream; int b = -1; - if (localInpuStream != null && localInpuStream.available() > 0) { + if (localInpuStream != null) { + logger.trace("Reading for reply until first byte is found"); while ((b = localInpuStream.read()) != Constants.MESSAGE_START_AS_INT) { logger.debug("Trying to find first reply byte now..."); } @@ -252,25 +284,16 @@ public class SDS011Communicator implements SerialPortEventListener { return null; } - /** - * Data from the device is arriving and will be parsed accordingly - */ - @Override - public void serialEvent(SerialPortEvent event) { - if (event.getEventType() == SerialPortEvent.DATA_AVAILABLE) { - // we get here if data has been received - SensorReply reply = null; - try { - reply = readReply(); - logger.debug("Got data from sensor: {}", reply); - } catch (IOException e) { - logger.warn("Could not read available data from the serial port: {}", e.getMessage()); - } - if (reply instanceof SensorMeasuredDataReply) { - SensorMeasuredDataReply sensorData = (SensorMeasuredDataReply) reply; - if (sensorData.isValidData()) { - thingHandler.updateChannels(sensorData); - } + public void readSensorData() throws IOException { + logger.trace("readSensorData() called"); + SensorReply reply = readReply(); + logger.trace("readSensorData(): Read reply={}", reply); + if (reply instanceof SensorMeasuredDataReply) { + SensorMeasuredDataReply sensorData = (SensorMeasuredDataReply) reply; + logger.trace("We received sensor data"); + if (sensorData.isValidData()) { + logger.trace("Sensor data is valid => updating channels"); + thingHandler.updateChannels(sensorData); } } } @@ -278,38 +301,46 @@ public class SDS011Communicator implements SerialPortEventListener { /** * Shutdown the communication, i.e. send the device to sleep and close the serial port */ - public void dispose() { + public void dispose(boolean sendtoSleep) { SerialPort localSerialPort = serialPort; if (localSerialPort != null) { + if (sendtoSleep) { + sendDeviceToSleepOnDispose(); + } + + logger.debug("Closing the port now"); + localSerialPort.close(); + + serialPort = null; + } + this.scheduler = null; + } + + private void sendDeviceToSleepOnDispose() { + @Nullable + ScheduledExecutorService localScheduler = scheduler; + if (localScheduler != null) { + Future sleepJob = null; try { - // send the device to sleep to preserve power and extend the lifetime of the sensor - sendSleep(true); - } catch (IOException e) { - // ignore because we are shutting down anyway - logger.debug("Exception while disposing communicator (will ignore it)", e); - } finally { - localSerialPort.removeEventListener(); - localSerialPort.close(); - serialPort = null; + sleepJob = localScheduler.submit(() -> { + try { + sendSleep(true); + } catch (IOException e) { + logger.debug("Exception while sending sleep on disposing the communicator (will ignore it)", e); + } + }); + sleepJob.get(5, TimeUnit.SECONDS); + } catch (TimeoutException e) { + logger.warn("Could not send device to sleep, because command takes longer than 5 seconds."); + sleepJob.cancel(true); + } catch (ExecutionException e) { + logger.debug("Could not execute sleep command.", e); + } catch (InterruptedException e) { + logger.debug("Sending device to sleep was interrupted."); + Thread.currentThread().interrupt(); } - } - - try { - InputStream localInputStream = inputStream; - if (localInputStream != null) { - localInputStream.close(); - } - } catch (IOException e) { - logger.debug("Error while closing the input stream: {}", e.getMessage()); - } - - try { - OutputStream localOutputStream = outputStream; - if (localOutputStream != null) { - localOutputStream.close(); - } - } catch (IOException e) { - logger.debug("Error while closing the output stream: {}", e.getMessage()); + } else { + logger.debug("Scheduler was null, could not send device to sleep."); } } }