diff --git a/bundles/org.openhab.binding.mqtt.generic/src/test/java/org/openhab/binding/mqtt/generic/ChannelStateTests.java b/bundles/org.openhab.binding.mqtt.generic/src/test/java/org/openhab/binding/mqtt/generic/ChannelStateTests.java index a4c4e46d4..3d0d9876e 100644 --- a/bundles/org.openhab.binding.mqtt.generic/src/test/java/org/openhab/binding/mqtt/generic/ChannelStateTests.java +++ b/bundles/org.openhab.binding.mqtt.generic/src/test/java/org/openhab/binding/mqtt/generic/ChannelStateTests.java @@ -79,7 +79,6 @@ public class ChannelStateTests { doReturn(voidFutureComplete).when(connection).unsubscribeAll(); doReturn(CompletableFuture.completedFuture(true)).when(connection).subscribe(any(), any()); doReturn(CompletableFuture.completedFuture(true)).when(connection).unsubscribe(any(), any()); - doReturn(CompletableFuture.completedFuture(true)).when(connection).publish(any(), any()); doReturn(CompletableFuture.completedFuture(true)).when(connection).publish(any(), any(), anyInt(), anyBoolean()); diff --git a/bundles/org.openhab.binding.mqtt.generic/src/test/java/org/openhab/binding/mqtt/generic/internal/handler/GenericThingHandlerTests.java b/bundles/org.openhab.binding.mqtt.generic/src/test/java/org/openhab/binding/mqtt/generic/internal/handler/GenericThingHandlerTests.java index 18a194890..25571e961 100644 --- a/bundles/org.openhab.binding.mqtt.generic/src/test/java/org/openhab/binding/mqtt/generic/internal/handler/GenericThingHandlerTests.java +++ b/bundles/org.openhab.binding.mqtt.generic/src/test/java/org/openhab/binding/mqtt/generic/internal/handler/GenericThingHandlerTests.java @@ -84,7 +84,6 @@ public class GenericThingHandlerTests { doReturn(voidFutureComplete).when(connection).unsubscribeAll(); doReturn(CompletableFuture.completedFuture(true)).when(connection).subscribe(any(), any()); doReturn(CompletableFuture.completedFuture(true)).when(connection).unsubscribe(any(), any()); - doReturn(CompletableFuture.completedFuture(true)).when(connection).publish(any(), any()); doReturn(CompletableFuture.completedFuture(true)).when(connection).publish(any(), any(), anyInt(), anyBoolean()); diff --git a/bundles/org.openhab.binding.mqtt/src/main/java/org/openhab/binding/mqtt/action/MQTTActions.java b/bundles/org.openhab.binding.mqtt/src/main/java/org/openhab/binding/mqtt/action/MQTTActions.java index 93d89bd71..339236a72 100644 --- a/bundles/org.openhab.binding.mqtt/src/main/java/org/openhab/binding/mqtt/action/MQTTActions.java +++ b/bundles/org.openhab.binding.mqtt/src/main/java/org/openhab/binding/mqtt/action/MQTTActions.java @@ -63,9 +63,9 @@ public class MQTTActions implements ThingActions, IMQTTActions { @Override @RuleAction(label = "@text/actionLabel", description = "@text/actionDesc") public void publishMQTT( - @ActionInput(name = "topic", label = "@text/actionInputTopicLabel", description = "@text/actionInputTopicDesc") @Nullable String topic, - @ActionInput(name = "value", label = "@text/actionInputValueLabel", description = "@text/actionInputValueDesc") @Nullable String value, - @ActionInput(name = "retain", label = "@text/actionInputRetainlabel", description = "@text/actionInputRetainDesc") @Nullable Boolean retain) { + @ActionInput(name = "topic", label = "@text/actionInputTopicLabel", description = "@text/actionInputTopicDesc") @Nullable final String topic, + @ActionInput(name = "value", label = "@text/actionInputValueLabel", description = "@text/actionInputValueDesc") @Nullable final String value, + @ActionInput(name = "retain", label = "@text/actionInputRetainlabel", description = "@text/actionInputRetainDesc") @Nullable final Boolean retain) { AbstractBrokerHandler brokerHandler = handler; if (brokerHandler == null) { logger.warn("MQTT Action service ThingHandler is null!"); @@ -84,15 +84,14 @@ public class MQTTActions implements ThingActions, IMQTTActions { logger.debug("skipping MQTT publishing of value '{}' as topic is null.", value); return; } - if (retain == null) { - retain = connection.isRetain(); - } - connection.publish(topic, value.getBytes(), connection.getQos(), retain).thenRun(() -> { - logger.debug("MQTT publish to {} performed", topic); - }).exceptionally(e -> { - logger.warn("MQTT publish to {} failed!", topic); - return null; - }); + + connection.publish(topic, value.getBytes(), connection.getQos(), retain != null && retain.booleanValue()) + .thenRun(() -> { + logger.debug("MQTT publish to {} performed", topic); + }).exceptionally(e -> { + logger.warn("MQTT publish to {} failed!", topic); + return null; + }); } public static void publishMQTT(@Nullable ThingActions actions, @Nullable String topic, @Nullable String value) { diff --git a/bundles/org.openhab.binding.mqtt/src/main/java/org/openhab/binding/mqtt/discovery/MQTTTopicDiscoveryService.java b/bundles/org.openhab.binding.mqtt/src/main/java/org/openhab/binding/mqtt/discovery/MQTTTopicDiscoveryService.java index a6843f467..a2361f100 100644 --- a/bundles/org.openhab.binding.mqtt/src/main/java/org/openhab/binding/mqtt/discovery/MQTTTopicDiscoveryService.java +++ b/bundles/org.openhab.binding.mqtt/src/main/java/org/openhab/binding/mqtt/discovery/MQTTTopicDiscoveryService.java @@ -42,8 +42,10 @@ public interface MQTTTopicDiscoveryService { /** * Publish a message to all connected brokers * - * @param topic The topic to publish on - * @param payload The message to publish + * @param topic The topic + * @param payload The message payload + * @param qos The quality of service for this message + * @param retain Set to true to retain the message on the broker */ - void publish(String topic, byte[] payload); + void publish(String topic, byte[] payload, int qos, boolean retain); } diff --git a/bundles/org.openhab.binding.mqtt/src/main/java/org/openhab/binding/mqtt/handler/BrokerHandler.java b/bundles/org.openhab.binding.mqtt/src/main/java/org/openhab/binding/mqtt/handler/BrokerHandler.java index 3e9f7f61a..e186fda76 100644 --- a/bundles/org.openhab.binding.mqtt/src/main/java/org/openhab/binding/mqtt/handler/BrokerHandler.java +++ b/bundles/org.openhab.binding.mqtt/src/main/java/org/openhab/binding/mqtt/handler/BrokerHandler.java @@ -223,8 +223,6 @@ public class BrokerHandler extends AbstractBrokerHandler implements PinnedCallba connection.setTimeoutExecutor(scheduler, TIMEOUT_DEFAULT); } - connection.setRetain(config.retainMessages); - return connection; } diff --git a/bundles/org.openhab.binding.mqtt/src/main/java/org/openhab/binding/mqtt/handler/SystemBrokerHandler.java b/bundles/org.openhab.binding.mqtt/src/main/java/org/openhab/binding/mqtt/handler/SystemBrokerHandler.java index b70dd7738..65a892ab9 100644 --- a/bundles/org.openhab.binding.mqtt/src/main/java/org/openhab/binding/mqtt/handler/SystemBrokerHandler.java +++ b/bundles/org.openhab.binding.mqtt/src/main/java/org/openhab/binding/mqtt/handler/SystemBrokerHandler.java @@ -69,7 +69,6 @@ public class SystemBrokerHandler extends AbstractBrokerHandler implements MqttSe properties.put(PROPERTY_PASSWORD, password); } properties.put(PROPERTY_QOS, String.valueOf(connection.getQos())); - properties.put(PROPERTY_RETAIN, String.valueOf(connection.isRetain())); final MqttWillAndTestament lastWill = connection.getLastWill(); if (lastWill != null) { properties.put(PROPERTY_LAST_WILL, lastWill.toString()); diff --git a/bundles/org.openhab.binding.mqtt/src/main/java/org/openhab/binding/mqtt/internal/MqttBrokerHandlerFactory.java b/bundles/org.openhab.binding.mqtt/src/main/java/org/openhab/binding/mqtt/internal/MqttBrokerHandlerFactory.java index 1f98ed65d..804783eb2 100644 --- a/bundles/org.openhab.binding.mqtt/src/main/java/org/openhab/binding/mqtt/internal/MqttBrokerHandlerFactory.java +++ b/bundles/org.openhab.binding.mqtt/src/main/java/org/openhab/binding/mqtt/internal/MqttBrokerHandlerFactory.java @@ -12,7 +12,13 @@ */ package org.openhab.binding.mqtt.internal; -import java.util.*; +import java.util.ArrayList; +import java.util.Collections; +import java.util.HashMap; +import java.util.List; +import java.util.Map; +import java.util.Set; +import java.util.WeakHashMap; import java.util.stream.Collectors; import java.util.stream.Stream; @@ -127,10 +133,10 @@ public class MqttBrokerHandlerFactory extends BaseThingHandlerFactory implements } @Override - public void publish(String topic, byte[] payload) { + public void publish(String topic, byte[] payload, int qos, boolean retain) { handlers.forEach(handler -> { handler.getConnectionAsync().thenAccept(connection -> { - connection.publish(topic, payload); + connection.publish(topic, payload, qos, retain); }); }); } diff --git a/bundles/org.openhab.io.mqttembeddedbroker/src/test/java/org/openhab/io/mqttembeddedbroker/internal/MqttEmbeddedBrokerServiceTest.java b/bundles/org.openhab.io.mqttembeddedbroker/src/test/java/org/openhab/io/mqttembeddedbroker/internal/MqttEmbeddedBrokerServiceTest.java index 92eb4f408..19ee819bc 100644 --- a/bundles/org.openhab.io.mqttembeddedbroker/src/test/java/org/openhab/io/mqttembeddedbroker/internal/MqttEmbeddedBrokerServiceTest.java +++ b/bundles/org.openhab.io.mqttembeddedbroker/src/test/java/org/openhab/io/mqttembeddedbroker/internal/MqttEmbeddedBrokerServiceTest.java @@ -42,6 +42,7 @@ import org.mockito.Mock; import org.mockito.junit.jupiter.MockitoExtension; import org.openhab.core.OpenHAB; import org.openhab.core.io.transport.mqtt.MqttBrokerConnection; +import org.openhab.core.io.transport.mqtt.MqttBrokerConnection.MqttVersion; import org.openhab.core.io.transport.mqtt.MqttBrokerConnection.Protocol; import org.openhab.core.io.transport.mqtt.MqttConnectionObserver; import org.openhab.core.io.transport.mqtt.MqttConnectionState; @@ -115,8 +116,8 @@ public class MqttEmbeddedBrokerServiceTest extends JavaTest { verify(service).addBrokerConnection(anyString(), eq(c)); // Connect with a second connection but wrong credentials - MqttBrokerConnection wrongCredentials = new MqttBrokerConnection(Protocol.TCP, c.getHost(), c.getPort(), false, - "wrongCred"); + MqttBrokerConnection wrongCredentials = new MqttBrokerConnection(Protocol.TCP, MqttVersion.V3, c.getHost(), + c.getPort(), false, "wrongCred"); wrongCredentials.setCredentials("someUser", "somePassword"); if (wrongCredentials.start().get()) { @@ -126,8 +127,8 @@ public class MqttEmbeddedBrokerServiceTest extends JavaTest { wrongCredentials.stop().get(); // Connect with a second connection but correct credentials - MqttBrokerConnection correctCredentials = new MqttBrokerConnection(Protocol.TCP, c.getHost(), c.getPort(), - false, "correctCred"); + MqttBrokerConnection correctCredentials = new MqttBrokerConnection(Protocol.TCP, MqttVersion.V3, c.getHost(), + c.getPort(), false, "correctCred"); correctCredentials.setCredentials(c.getUser(), c.getPassword()); if (!correctCredentials.start().get()) {