[mqtt] Only unsubscribe if we subscribed before (#9758)

Fix for 9730

Signed-off-by: Jochen Klein <git@jochen.susca.de>
This commit is contained in:
Jochen Klein 2021-01-10 16:25:54 +01:00 committed by GitHub
parent 79b94e3e34
commit b6eea715fa
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23

View File

@ -53,8 +53,9 @@ public class TopicSubscribe implements MqttMessageSubscriber {
@Override @Override
public void processMessage(String topic, byte[] payload) { public void processMessage(String topic, byte[] payload) {
final MqttBrokerConnection connection = this.connection; final MqttBrokerConnection connection = this.connection;
if (connection == null) if (connection == null) {
return; return;
}
if (payload.length > 0) { if (payload.length > 0) {
topicDiscoveredListener.receivedMessage(thing, connection, topic, payload); topicDiscoveredListener.receivedMessage(thing, connection, topic, payload);
} else { } else {
@ -80,7 +81,8 @@ public class TopicSubscribe implements MqttMessageSubscriber {
* @return Completes with true if successful. Exceptionally otherwise. * @return Completes with true if successful. Exceptionally otherwise.
*/ */
public CompletableFuture<Boolean> stop() { public CompletableFuture<Boolean> stop() {
CompletableFuture<Boolean> stopFuture = connection == null ? CompletableFuture.completedFuture(true) CompletableFuture<Boolean> stopFuture = connection == null || !isStarted
? CompletableFuture.completedFuture(true)
: connection.unsubscribe(topic, this); : connection.unsubscribe(topic, this);
isStarted = false; isStarted = false;
return stopFuture; return stopFuture;