[mqtt] Fix avail topics subscription after Brige Restart (#9851)

Fixes #9850

Signed-off-by: Florian Albrecht <cw.florian.albrecht@gmx.de>
This commit is contained in:
Florian Albrecht 2021-12-11 17:57:13 +01:00 committed by GitHub
parent ba3dfe3ed6
commit 13ca0d5500
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
3 changed files with 29 additions and 25 deletions

View File

@ -12,7 +12,6 @@
*/
package org.openhab.binding.mqtt.generic;
import java.util.Collection;
import java.util.HashSet;
import java.util.Map;
import java.util.Optional;
@ -23,12 +22,10 @@ import java.util.concurrent.ExecutionException;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.stream.Collectors;
import org.eclipse.jdt.annotation.NonNull;
import org.eclipse.jdt.annotation.NonNullByDefault;
import org.eclipse.jdt.annotation.Nullable;
import org.openhab.binding.mqtt.generic.utils.FutureCollector;
import org.openhab.binding.mqtt.generic.values.OnOffValue;
import org.openhab.binding.mqtt.generic.values.Value;
import org.openhab.binding.mqtt.handler.AbstractBrokerHandler;
@ -195,19 +192,7 @@ public abstract class AbstractMQTTThingHandler extends BaseThingHandler
// We do not set the thing to ONLINE here in the AbstractBase, that is the responsibility of a derived
// class.
try {
Collection<CompletableFuture<@Nullable Void>> futures = availabilityStates.values().stream().map(s -> {
if (s != null) {
return s.start(connection, scheduler, 0);
}
return CompletableFuture.allOf();
}).collect(Collectors.toList());
futures.add(start(connection));
futures.stream().collect(FutureCollector.allOf()).exceptionally(e -> {
updateStatus(ThingStatus.OFFLINE, ThingStatusDetail.COMMUNICATION_ERROR, e.getLocalizedMessage());
return null;
}).get(subscribeTimeout, TimeUnit.MILLISECONDS);
start(connection).get(subscribeTimeout, TimeUnit.MILLISECONDS);
} catch (InterruptedException | ExecutionException | TimeoutException ignored) {
updateStatus(ThingStatus.OFFLINE, ThingStatusDetail.COMMUNICATION_ERROR,
"Did not receive all required topics");

View File

@ -85,6 +85,9 @@ public class GenericMQTTThingHandler extends AbstractMQTTThingHandler implements
*/
@Override
protected CompletableFuture<@Nullable Void> start(MqttBrokerConnection connection) {
// availability topics are also started asynchronously, so no problem here
clearAllAvailabilityTopics();
initializeAvailabilityTopicsFromConfig();
return channelStateByChannelUID.values().stream().map(c -> c.start(connection, scheduler, 0))
.collect(FutureCollector.allOf()).thenRun(this::calculateThingStatus);
}
@ -142,15 +145,7 @@ public class GenericMQTTThingHandler extends AbstractMQTTThingHandler implements
@Override
public void initialize() {
GenericThingConfiguration config = getConfigAs(GenericThingConfiguration.class);
String availabilityTopic = config.availabilityTopic;
if (availabilityTopic != null) {
addAvailabilityTopic(availabilityTopic, config.payloadAvailable, config.payloadNotAvailable);
} else {
clearAllAvailabilityTopics();
}
initializeAvailabilityTopicsFromConfig();
List<ChannelUID> configErrors = new ArrayList<>();
for (Channel channel : thing.getChannels()) {
@ -193,4 +188,16 @@ public class GenericMQTTThingHandler extends AbstractMQTTThingHandler implements
updateStatus(ThingStatus.OFFLINE, ThingStatusDetail.NONE);
}
}
private void initializeAvailabilityTopicsFromConfig() {
GenericThingConfiguration config = getConfigAs(GenericThingConfiguration.class);
String availabilityTopic = config.availabilityTopic;
if (availabilityTopic != null) {
addAvailabilityTopic(availabilityTopic, config.payloadAvailable, config.payloadNotAvailable);
} else {
clearAllAvailabilityTopics();
}
}
}

View File

@ -192,4 +192,16 @@ public class GenericThingHandlerTests {
verify(callback).stateUpdated(eq(textChannelUID), argThat(arg -> "UPDATE".equals(arg.toString())));
assertThat(textValue.getChannelState().toString(), is("UPDATE"));
}
@Test
public void handleBridgeStatusChange() {
Configuration config = new Configuration();
config.put("availabilityTopic", "test/LWT");
when(thing.getConfiguration()).thenReturn(config);
thingHandler.initialize();
thingHandler
.bridgeStatusChanged(new ThingStatusInfo(ThingStatus.OFFLINE, ThingStatusDetail.BRIDGE_OFFLINE, null));
thingHandler.bridgeStatusChanged(new ThingStatusInfo(ThingStatus.ONLINE, ThingStatusDetail.NONE, null));
verify(connection, times(2)).subscribe(eq("test/LWT"), any());
}
}