diff --git a/bundles/org.openhab.binding.mqtt.generic/src/main/java/org/openhab/binding/mqtt/generic/AbstractMQTTThingHandler.java b/bundles/org.openhab.binding.mqtt.generic/src/main/java/org/openhab/binding/mqtt/generic/AbstractMQTTThingHandler.java index 1c11e4628..14ecdc851 100644 --- a/bundles/org.openhab.binding.mqtt.generic/src/main/java/org/openhab/binding/mqtt/generic/AbstractMQTTThingHandler.java +++ b/bundles/org.openhab.binding.mqtt.generic/src/main/java/org/openhab/binding/mqtt/generic/AbstractMQTTThingHandler.java @@ -12,7 +12,6 @@ */ package org.openhab.binding.mqtt.generic; -import java.util.Collection; import java.util.HashSet; import java.util.Map; import java.util.Optional; @@ -23,12 +22,10 @@ import java.util.concurrent.ExecutionException; import java.util.concurrent.TimeUnit; import java.util.concurrent.TimeoutException; import java.util.concurrent.atomic.AtomicBoolean; -import java.util.stream.Collectors; import org.eclipse.jdt.annotation.NonNull; import org.eclipse.jdt.annotation.NonNullByDefault; import org.eclipse.jdt.annotation.Nullable; -import org.openhab.binding.mqtt.generic.utils.FutureCollector; import org.openhab.binding.mqtt.generic.values.OnOffValue; import org.openhab.binding.mqtt.generic.values.Value; import org.openhab.binding.mqtt.handler.AbstractBrokerHandler; @@ -195,19 +192,7 @@ public abstract class AbstractMQTTThingHandler extends BaseThingHandler // We do not set the thing to ONLINE here in the AbstractBase, that is the responsibility of a derived // class. try { - Collection> futures = availabilityStates.values().stream().map(s -> { - if (s != null) { - return s.start(connection, scheduler, 0); - } - return CompletableFuture.allOf(); - }).collect(Collectors.toList()); - - futures.add(start(connection)); - - futures.stream().collect(FutureCollector.allOf()).exceptionally(e -> { - updateStatus(ThingStatus.OFFLINE, ThingStatusDetail.COMMUNICATION_ERROR, e.getLocalizedMessage()); - return null; - }).get(subscribeTimeout, TimeUnit.MILLISECONDS); + start(connection).get(subscribeTimeout, TimeUnit.MILLISECONDS); } catch (InterruptedException | ExecutionException | TimeoutException ignored) { updateStatus(ThingStatus.OFFLINE, ThingStatusDetail.COMMUNICATION_ERROR, "Did not receive all required topics"); diff --git a/bundles/org.openhab.binding.mqtt.generic/src/main/java/org/openhab/binding/mqtt/generic/internal/handler/GenericMQTTThingHandler.java b/bundles/org.openhab.binding.mqtt.generic/src/main/java/org/openhab/binding/mqtt/generic/internal/handler/GenericMQTTThingHandler.java index 8cc05429e..c157506fb 100644 --- a/bundles/org.openhab.binding.mqtt.generic/src/main/java/org/openhab/binding/mqtt/generic/internal/handler/GenericMQTTThingHandler.java +++ b/bundles/org.openhab.binding.mqtt.generic/src/main/java/org/openhab/binding/mqtt/generic/internal/handler/GenericMQTTThingHandler.java @@ -85,6 +85,9 @@ public class GenericMQTTThingHandler extends AbstractMQTTThingHandler implements */ @Override protected CompletableFuture<@Nullable Void> start(MqttBrokerConnection connection) { + // availability topics are also started asynchronously, so no problem here + clearAllAvailabilityTopics(); + initializeAvailabilityTopicsFromConfig(); return channelStateByChannelUID.values().stream().map(c -> c.start(connection, scheduler, 0)) .collect(FutureCollector.allOf()).thenRun(this::calculateThingStatus); } @@ -142,15 +145,7 @@ public class GenericMQTTThingHandler extends AbstractMQTTThingHandler implements @Override public void initialize() { - GenericThingConfiguration config = getConfigAs(GenericThingConfiguration.class); - - String availabilityTopic = config.availabilityTopic; - - if (availabilityTopic != null) { - addAvailabilityTopic(availabilityTopic, config.payloadAvailable, config.payloadNotAvailable); - } else { - clearAllAvailabilityTopics(); - } + initializeAvailabilityTopicsFromConfig(); List configErrors = new ArrayList<>(); for (Channel channel : thing.getChannels()) { @@ -193,4 +188,16 @@ public class GenericMQTTThingHandler extends AbstractMQTTThingHandler implements updateStatus(ThingStatus.OFFLINE, ThingStatusDetail.NONE); } } + + private void initializeAvailabilityTopicsFromConfig() { + GenericThingConfiguration config = getConfigAs(GenericThingConfiguration.class); + + String availabilityTopic = config.availabilityTopic; + + if (availabilityTopic != null) { + addAvailabilityTopic(availabilityTopic, config.payloadAvailable, config.payloadNotAvailable); + } else { + clearAllAvailabilityTopics(); + } + } } diff --git a/bundles/org.openhab.binding.mqtt.generic/src/test/java/org/openhab/binding/mqtt/generic/internal/handler/GenericThingHandlerTests.java b/bundles/org.openhab.binding.mqtt.generic/src/test/java/org/openhab/binding/mqtt/generic/internal/handler/GenericThingHandlerTests.java index 13cf9946e..d9f41464a 100644 --- a/bundles/org.openhab.binding.mqtt.generic/src/test/java/org/openhab/binding/mqtt/generic/internal/handler/GenericThingHandlerTests.java +++ b/bundles/org.openhab.binding.mqtt.generic/src/test/java/org/openhab/binding/mqtt/generic/internal/handler/GenericThingHandlerTests.java @@ -192,4 +192,16 @@ public class GenericThingHandlerTests { verify(callback).stateUpdated(eq(textChannelUID), argThat(arg -> "UPDATE".equals(arg.toString()))); assertThat(textValue.getChannelState().toString(), is("UPDATE")); } + + @Test + public void handleBridgeStatusChange() { + Configuration config = new Configuration(); + config.put("availabilityTopic", "test/LWT"); + when(thing.getConfiguration()).thenReturn(config); + thingHandler.initialize(); + thingHandler + .bridgeStatusChanged(new ThingStatusInfo(ThingStatus.OFFLINE, ThingStatusDetail.BRIDGE_OFFLINE, null)); + thingHandler.bridgeStatusChanged(new ThingStatusInfo(ThingStatus.ONLINE, ThingStatusDetail.NONE, null)); + verify(connection, times(2)).subscribe(eq("test/LWT"), any()); + } }