Fix MQTT transport deprecations (#8570)

Signed-off-by: Wouter Born <github@maindrain.net>
This commit is contained in:
Wouter Born 2020-09-24 22:50:46 +02:00 committed by GitHub
parent 5788ad095e
commit b503dce9fd
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
8 changed files with 30 additions and 27 deletions

View File

@ -79,7 +79,6 @@ public class ChannelStateTests {
doReturn(voidFutureComplete).when(connection).unsubscribeAll(); doReturn(voidFutureComplete).when(connection).unsubscribeAll();
doReturn(CompletableFuture.completedFuture(true)).when(connection).subscribe(any(), any()); doReturn(CompletableFuture.completedFuture(true)).when(connection).subscribe(any(), any());
doReturn(CompletableFuture.completedFuture(true)).when(connection).unsubscribe(any(), any()); doReturn(CompletableFuture.completedFuture(true)).when(connection).unsubscribe(any(), any());
doReturn(CompletableFuture.completedFuture(true)).when(connection).publish(any(), any());
doReturn(CompletableFuture.completedFuture(true)).when(connection).publish(any(), any(), anyInt(), doReturn(CompletableFuture.completedFuture(true)).when(connection).publish(any(), any(), anyInt(),
anyBoolean()); anyBoolean());

View File

@ -84,7 +84,6 @@ public class GenericThingHandlerTests {
doReturn(voidFutureComplete).when(connection).unsubscribeAll(); doReturn(voidFutureComplete).when(connection).unsubscribeAll();
doReturn(CompletableFuture.completedFuture(true)).when(connection).subscribe(any(), any()); doReturn(CompletableFuture.completedFuture(true)).when(connection).subscribe(any(), any());
doReturn(CompletableFuture.completedFuture(true)).when(connection).unsubscribe(any(), any()); doReturn(CompletableFuture.completedFuture(true)).when(connection).unsubscribe(any(), any());
doReturn(CompletableFuture.completedFuture(true)).when(connection).publish(any(), any());
doReturn(CompletableFuture.completedFuture(true)).when(connection).publish(any(), any(), anyInt(), doReturn(CompletableFuture.completedFuture(true)).when(connection).publish(any(), any(), anyInt(),
anyBoolean()); anyBoolean());

View File

@ -63,9 +63,9 @@ public class MQTTActions implements ThingActions, IMQTTActions {
@Override @Override
@RuleAction(label = "@text/actionLabel", description = "@text/actionDesc") @RuleAction(label = "@text/actionLabel", description = "@text/actionDesc")
public void publishMQTT( public void publishMQTT(
@ActionInput(name = "topic", label = "@text/actionInputTopicLabel", description = "@text/actionInputTopicDesc") @Nullable String topic, @ActionInput(name = "topic", label = "@text/actionInputTopicLabel", description = "@text/actionInputTopicDesc") @Nullable final String topic,
@ActionInput(name = "value", label = "@text/actionInputValueLabel", description = "@text/actionInputValueDesc") @Nullable String value, @ActionInput(name = "value", label = "@text/actionInputValueLabel", description = "@text/actionInputValueDesc") @Nullable final String value,
@ActionInput(name = "retain", label = "@text/actionInputRetainlabel", description = "@text/actionInputRetainDesc") @Nullable Boolean retain) { @ActionInput(name = "retain", label = "@text/actionInputRetainlabel", description = "@text/actionInputRetainDesc") @Nullable final Boolean retain) {
AbstractBrokerHandler brokerHandler = handler; AbstractBrokerHandler brokerHandler = handler;
if (brokerHandler == null) { if (brokerHandler == null) {
logger.warn("MQTT Action service ThingHandler is null!"); logger.warn("MQTT Action service ThingHandler is null!");
@ -84,15 +84,14 @@ public class MQTTActions implements ThingActions, IMQTTActions {
logger.debug("skipping MQTT publishing of value '{}' as topic is null.", value); logger.debug("skipping MQTT publishing of value '{}' as topic is null.", value);
return; return;
} }
if (retain == null) {
retain = connection.isRetain(); connection.publish(topic, value.getBytes(), connection.getQos(), retain != null && retain.booleanValue())
} .thenRun(() -> {
connection.publish(topic, value.getBytes(), connection.getQos(), retain).thenRun(() -> { logger.debug("MQTT publish to {} performed", topic);
logger.debug("MQTT publish to {} performed", topic); }).exceptionally(e -> {
}).exceptionally(e -> { logger.warn("MQTT publish to {} failed!", topic);
logger.warn("MQTT publish to {} failed!", topic); return null;
return null; });
});
} }
public static void publishMQTT(@Nullable ThingActions actions, @Nullable String topic, @Nullable String value) { public static void publishMQTT(@Nullable ThingActions actions, @Nullable String topic, @Nullable String value) {

View File

@ -42,8 +42,10 @@ public interface MQTTTopicDiscoveryService {
/** /**
* Publish a message to all connected brokers * Publish a message to all connected brokers
* *
* @param topic The topic to publish on * @param topic The topic
* @param payload The message to publish * @param payload The message payload
* @param qos The quality of service for this message
* @param retain Set to true to retain the message on the broker
*/ */
void publish(String topic, byte[] payload); void publish(String topic, byte[] payload, int qos, boolean retain);
} }

View File

@ -223,8 +223,6 @@ public class BrokerHandler extends AbstractBrokerHandler implements PinnedCallba
connection.setTimeoutExecutor(scheduler, TIMEOUT_DEFAULT); connection.setTimeoutExecutor(scheduler, TIMEOUT_DEFAULT);
} }
connection.setRetain(config.retainMessages);
return connection; return connection;
} }

View File

@ -69,7 +69,6 @@ public class SystemBrokerHandler extends AbstractBrokerHandler implements MqttSe
properties.put(PROPERTY_PASSWORD, password); properties.put(PROPERTY_PASSWORD, password);
} }
properties.put(PROPERTY_QOS, String.valueOf(connection.getQos())); properties.put(PROPERTY_QOS, String.valueOf(connection.getQos()));
properties.put(PROPERTY_RETAIN, String.valueOf(connection.isRetain()));
final MqttWillAndTestament lastWill = connection.getLastWill(); final MqttWillAndTestament lastWill = connection.getLastWill();
if (lastWill != null) { if (lastWill != null) {
properties.put(PROPERTY_LAST_WILL, lastWill.toString()); properties.put(PROPERTY_LAST_WILL, lastWill.toString());

View File

@ -12,7 +12,13 @@
*/ */
package org.openhab.binding.mqtt.internal; package org.openhab.binding.mqtt.internal;
import java.util.*; import java.util.ArrayList;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.WeakHashMap;
import java.util.stream.Collectors; import java.util.stream.Collectors;
import java.util.stream.Stream; import java.util.stream.Stream;
@ -127,10 +133,10 @@ public class MqttBrokerHandlerFactory extends BaseThingHandlerFactory implements
} }
@Override @Override
public void publish(String topic, byte[] payload) { public void publish(String topic, byte[] payload, int qos, boolean retain) {
handlers.forEach(handler -> { handlers.forEach(handler -> {
handler.getConnectionAsync().thenAccept(connection -> { handler.getConnectionAsync().thenAccept(connection -> {
connection.publish(topic, payload); connection.publish(topic, payload, qos, retain);
}); });
}); });
} }

View File

@ -42,6 +42,7 @@ import org.mockito.Mock;
import org.mockito.junit.jupiter.MockitoExtension; import org.mockito.junit.jupiter.MockitoExtension;
import org.openhab.core.OpenHAB; import org.openhab.core.OpenHAB;
import org.openhab.core.io.transport.mqtt.MqttBrokerConnection; import org.openhab.core.io.transport.mqtt.MqttBrokerConnection;
import org.openhab.core.io.transport.mqtt.MqttBrokerConnection.MqttVersion;
import org.openhab.core.io.transport.mqtt.MqttBrokerConnection.Protocol; import org.openhab.core.io.transport.mqtt.MqttBrokerConnection.Protocol;
import org.openhab.core.io.transport.mqtt.MqttConnectionObserver; import org.openhab.core.io.transport.mqtt.MqttConnectionObserver;
import org.openhab.core.io.transport.mqtt.MqttConnectionState; import org.openhab.core.io.transport.mqtt.MqttConnectionState;
@ -115,8 +116,8 @@ public class MqttEmbeddedBrokerServiceTest extends JavaTest {
verify(service).addBrokerConnection(anyString(), eq(c)); verify(service).addBrokerConnection(anyString(), eq(c));
// Connect with a second connection but wrong credentials // Connect with a second connection but wrong credentials
MqttBrokerConnection wrongCredentials = new MqttBrokerConnection(Protocol.TCP, c.getHost(), c.getPort(), false, MqttBrokerConnection wrongCredentials = new MqttBrokerConnection(Protocol.TCP, MqttVersion.V3, c.getHost(),
"wrongCred"); c.getPort(), false, "wrongCred");
wrongCredentials.setCredentials("someUser", "somePassword"); wrongCredentials.setCredentials("someUser", "somePassword");
if (wrongCredentials.start().get()) { if (wrongCredentials.start().get()) {
@ -126,8 +127,8 @@ public class MqttEmbeddedBrokerServiceTest extends JavaTest {
wrongCredentials.stop().get(); wrongCredentials.stop().get();
// Connect with a second connection but correct credentials // Connect with a second connection but correct credentials
MqttBrokerConnection correctCredentials = new MqttBrokerConnection(Protocol.TCP, c.getHost(), c.getPort(), MqttBrokerConnection correctCredentials = new MqttBrokerConnection(Protocol.TCP, MqttVersion.V3, c.getHost(),
false, "correctCred"); c.getPort(), false, "correctCred");
correctCredentials.setCredentials(c.getUser(), c.getPassword()); correctCredentials.setCredentials(c.getUser(), c.getPassword());
if (!correctCredentials.start().get()) { if (!correctCredentials.start().get()) {