[mqtt] Discovery services shall not unsubscribe unless they have already subscribed (#10566)
* [mqqt] do not allow unsubscribe unless already subscribed Signed-off-by: Andrew Fiddian-Green <software@whitebear.ch>
This commit is contained in:
parent
bcd89086f2
commit
2e8700e7c4
@ -16,6 +16,7 @@ import java.util.Date;
|
|||||||
import java.util.Set;
|
import java.util.Set;
|
||||||
import java.util.concurrent.ScheduledFuture;
|
import java.util.concurrent.ScheduledFuture;
|
||||||
import java.util.concurrent.TimeUnit;
|
import java.util.concurrent.TimeUnit;
|
||||||
|
import java.util.concurrent.atomic.AtomicBoolean;
|
||||||
|
|
||||||
import org.eclipse.jdt.annotation.NonNullByDefault;
|
import org.eclipse.jdt.annotation.NonNullByDefault;
|
||||||
import org.eclipse.jdt.annotation.Nullable;
|
import org.eclipse.jdt.annotation.Nullable;
|
||||||
@ -48,11 +49,32 @@ public abstract class AbstractMQTTDiscovery extends AbstractDiscoveryService imp
|
|||||||
|
|
||||||
private @Nullable ScheduledFuture<?> scheduledStop;
|
private @Nullable ScheduledFuture<?> scheduledStop;
|
||||||
|
|
||||||
|
private AtomicBoolean isSubscribed;
|
||||||
|
|
||||||
public AbstractMQTTDiscovery(@Nullable Set<ThingTypeUID> supportedThingTypes, int timeout,
|
public AbstractMQTTDiscovery(@Nullable Set<ThingTypeUID> supportedThingTypes, int timeout,
|
||||||
boolean backgroundDiscoveryEnabledByDefault, String baseTopic) {
|
boolean backgroundDiscoveryEnabledByDefault, String baseTopic) {
|
||||||
super(supportedThingTypes, 0, backgroundDiscoveryEnabledByDefault);
|
super(supportedThingTypes, 0, backgroundDiscoveryEnabledByDefault);
|
||||||
this.subscribeTopic = baseTopic;
|
this.subscribeTopic = baseTopic;
|
||||||
this.timeout = timeout;
|
this.timeout = timeout;
|
||||||
|
isSubscribed = new AtomicBoolean(false);
|
||||||
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Only subscribe if we were not already subscribed
|
||||||
|
*/
|
||||||
|
private void subscribe() {
|
||||||
|
if (!isSubscribed.getAndSet(true)) {
|
||||||
|
getDiscoveryService().subscribe(this, subscribeTopic);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Only unsubscribe if we were already subscribed
|
||||||
|
*/
|
||||||
|
private void unSubscribe() {
|
||||||
|
if (isSubscribed.getAndSet(false)) {
|
||||||
|
getDiscoveryService().unsubscribe(this);
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
/**
|
/**
|
||||||
@ -94,7 +116,7 @@ public abstract class AbstractMQTTDiscovery extends AbstractDiscoveryService imp
|
|||||||
return;
|
return;
|
||||||
}
|
}
|
||||||
resetTimeout();
|
resetTimeout();
|
||||||
getDiscoveryService().subscribe(this, subscribeTopic);
|
subscribe();
|
||||||
}
|
}
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
@ -104,7 +126,7 @@ public abstract class AbstractMQTTDiscovery extends AbstractDiscoveryService imp
|
|||||||
return;
|
return;
|
||||||
}
|
}
|
||||||
stopTimeout();
|
stopTimeout();
|
||||||
getDiscoveryService().unsubscribe(this);
|
unSubscribe();
|
||||||
super.stopScan();
|
super.stopScan();
|
||||||
}
|
}
|
||||||
|
|
||||||
@ -118,11 +140,11 @@ public abstract class AbstractMQTTDiscovery extends AbstractDiscoveryService imp
|
|||||||
protected void startBackgroundDiscovery() {
|
protected void startBackgroundDiscovery() {
|
||||||
// Remove results that are restored after a restart
|
// Remove results that are restored after a restart
|
||||||
removeOlderResults(new Date().getTime());
|
removeOlderResults(new Date().getTime());
|
||||||
getDiscoveryService().subscribe(this, subscribeTopic);
|
subscribe();
|
||||||
}
|
}
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
protected void stopBackgroundDiscovery() {
|
protected void stopBackgroundDiscovery() {
|
||||||
getDiscoveryService().unsubscribe(this);
|
unSubscribe();
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|||||||
@ -12,13 +12,11 @@
|
|||||||
*/
|
*/
|
||||||
package org.openhab.binding.mqtt.internal;
|
package org.openhab.binding.mqtt.internal;
|
||||||
|
|
||||||
import java.util.ArrayList;
|
|
||||||
import java.util.Collections;
|
import java.util.Collections;
|
||||||
import java.util.HashMap;
|
|
||||||
import java.util.List;
|
|
||||||
import java.util.Map;
|
import java.util.Map;
|
||||||
import java.util.Set;
|
import java.util.Set;
|
||||||
import java.util.WeakHashMap;
|
import java.util.WeakHashMap;
|
||||||
|
import java.util.concurrent.ConcurrentHashMap;
|
||||||
import java.util.stream.Collectors;
|
import java.util.stream.Collectors;
|
||||||
import java.util.stream.Stream;
|
import java.util.stream.Stream;
|
||||||
|
|
||||||
@ -54,11 +52,22 @@ import org.slf4j.LoggerFactory;
|
|||||||
@Component(service = { ThingHandlerFactory.class,
|
@Component(service = { ThingHandlerFactory.class,
|
||||||
MQTTTopicDiscoveryService.class }, configurationPid = "MqttBrokerHandlerFactory")
|
MQTTTopicDiscoveryService.class }, configurationPid = "MqttBrokerHandlerFactory")
|
||||||
public class MqttBrokerHandlerFactory extends BaseThingHandlerFactory implements MQTTTopicDiscoveryService {
|
public class MqttBrokerHandlerFactory extends BaseThingHandlerFactory implements MQTTTopicDiscoveryService {
|
||||||
|
|
||||||
private static final Set<ThingTypeUID> SUPPORTED_THING_TYPES_UIDS = Stream
|
private static final Set<ThingTypeUID> SUPPORTED_THING_TYPES_UIDS = Stream
|
||||||
.of(MqttBindingConstants.BRIDGE_TYPE_SYSTEMBROKER, MqttBindingConstants.BRIDGE_TYPE_BROKER)
|
.of(MqttBindingConstants.BRIDGE_TYPE_SYSTEMBROKER, MqttBindingConstants.BRIDGE_TYPE_BROKER)
|
||||||
.collect(Collectors.toSet());
|
.collect(Collectors.toSet());
|
||||||
|
|
||||||
private final Logger logger = LoggerFactory.getLogger(MqttBrokerHandlerFactory.class);
|
private final Logger logger = LoggerFactory.getLogger(MqttBrokerHandlerFactory.class);
|
||||||
protected final Map<String, List<MQTTTopicDiscoveryParticipant>> discoveryTopics = new HashMap<>();
|
|
||||||
|
/**
|
||||||
|
* This Map provides a lookup between a Topic string (key) and a Set of MQTTTopicDiscoveryParticipants (value),
|
||||||
|
* where the Set itself is a list of participants which are subscribed to the respective Topic.
|
||||||
|
*/
|
||||||
|
protected final Map<String, Set<MQTTTopicDiscoveryParticipant>> discoveryTopics = new ConcurrentHashMap<>();
|
||||||
|
|
||||||
|
/**
|
||||||
|
* This Set contains a list of all the Broker handlers that have been created by this factory
|
||||||
|
*/
|
||||||
protected final Set<AbstractBrokerHandler> handlers = Collections
|
protected final Set<AbstractBrokerHandler> handlers = Collections
|
||||||
.synchronizedSet(Collections.newSetFromMap(new WeakHashMap<>()));
|
.synchronizedSet(Collections.newSetFromMap(new WeakHashMap<>()));
|
||||||
|
|
||||||
@ -75,12 +84,13 @@ public class MqttBrokerHandlerFactory extends BaseThingHandlerFactory implements
|
|||||||
}
|
}
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* Add the given broker connection to all listeners.
|
* Add the given broker handler to the list of known handlers. And then iterate over all topics and their respective
|
||||||
|
* list of listeners, and register the respective new listener and topic with the given new broker handler.
|
||||||
*/
|
*/
|
||||||
protected void createdHandler(AbstractBrokerHandler handler) {
|
protected void createdHandler(AbstractBrokerHandler handler) {
|
||||||
handlers.add(handler);
|
handlers.add(handler);
|
||||||
discoveryTopics.forEach((topic, listenerList) -> {
|
discoveryTopics.forEach((topic, listeners) -> {
|
||||||
listenerList.forEach(listener -> {
|
listeners.forEach(listener -> {
|
||||||
handler.registerDiscoveryListener(listener, topic);
|
handler.registerDiscoveryListener(listener, topic);
|
||||||
});
|
});
|
||||||
});
|
});
|
||||||
@ -111,24 +121,33 @@ public class MqttBrokerHandlerFactory extends BaseThingHandlerFactory implements
|
|||||||
/**
|
/**
|
||||||
* This factory also implements {@link MQTTTopicDiscoveryService} so consumers can subscribe to
|
* This factory also implements {@link MQTTTopicDiscoveryService} so consumers can subscribe to
|
||||||
* a MQTT topic that is registered on all available broker connections.
|
* a MQTT topic that is registered on all available broker connections.
|
||||||
*/
|
*
|
||||||
@Override
|
* Checks each topic, and if the listener is not already in the listener list for that topic, adds itself from that
|
||||||
public void subscribe(MQTTTopicDiscoveryParticipant listener, String topic) {
|
* list, and registers itself and the respective topic with all the known brokers.
|
||||||
List<MQTTTopicDiscoveryParticipant> listenerList = discoveryTopics.computeIfAbsent(topic,
|
|
||||||
t -> new ArrayList<>());
|
|
||||||
listenerList.add(listener);
|
|
||||||
handlers.forEach(broker -> broker.registerDiscoveryListener(listener, topic));
|
|
||||||
}
|
|
||||||
|
|
||||||
/**
|
|
||||||
* Unsubscribe a listener from all available broker connections.
|
|
||||||
*/
|
*/
|
||||||
@Override
|
@Override
|
||||||
@SuppressWarnings("null")
|
@SuppressWarnings("null")
|
||||||
|
public void subscribe(MQTTTopicDiscoveryParticipant listener, String topic) {
|
||||||
|
Set<MQTTTopicDiscoveryParticipant> listeners = discoveryTopics.computeIfAbsent(topic,
|
||||||
|
t -> ConcurrentHashMap.newKeySet());
|
||||||
|
if (listeners.add(listener)) {
|
||||||
|
handlers.forEach(broker -> broker.registerDiscoveryListener(listener, topic));
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* This factory also implements {@link MQTTTopicDiscoveryService} so consumers can unsubscribe from
|
||||||
|
* a MQTT topic that is registered on all available broker connections.
|
||||||
|
*
|
||||||
|
* Checks each topic, and if the listener is in the listener list for that topic, removes itself from that list, and
|
||||||
|
* unregisters itself and the respective topic from all the known brokers.
|
||||||
|
*/
|
||||||
|
@Override
|
||||||
public void unsubscribe(MQTTTopicDiscoveryParticipant listener) {
|
public void unsubscribe(MQTTTopicDiscoveryParticipant listener) {
|
||||||
discoveryTopics.forEach((topic, listenerList) -> {
|
discoveryTopics.forEach((topic, listeners) -> {
|
||||||
listenerList.remove(listener);
|
if (listeners.remove(listener)) {
|
||||||
handlers.forEach(broker -> broker.unregisterDiscoveryListener(listener, topic));
|
handlers.forEach(broker -> broker.unregisterDiscoveryListener(listener, topic));
|
||||||
|
}
|
||||||
});
|
});
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|||||||
Loading…
x
Reference in New Issue
Block a user