added migrated 2.x add-ons

Signed-off-by: Kai Kreuzer <kai@openhab.org>
This commit is contained in:
Kai Kreuzer
2020-09-21 01:58:32 +02:00
parent bbf1a7fd29
commit 6df6783b60
11662 changed files with 1302875 additions and 11 deletions

View File

@@ -0,0 +1,38 @@
<?xml version="1.0" encoding="UTF-8"?>
<classpath>
<classpathentry kind="src" output="target/classes" path="src/main/java">
<attributes>
<attribute name="optional" value="true"/>
<attribute name="maven.pomderived" value="true"/>
</attributes>
</classpathentry>
<classpathentry excluding="**" kind="src" output="target/classes" path="src/main/resources">
<attributes>
<attribute name="maven.pomderived" value="true"/>
</attributes>
</classpathentry>
<classpathentry kind="src" output="target/test-classes" path="src/test/java">
<attributes>
<attribute name="optional" value="true"/>
<attribute name="maven.pomderived" value="true"/>
<attribute name="test" value="true"/>
</attributes>
</classpathentry>
<classpathentry kind="con" path="org.eclipse.jdt.launching.JRE_CONTAINER/org.eclipse.jdt.internal.debug.ui.launcher.StandardVMType/JavaSE-11">
<attributes>
<attribute name="maven.pomderived" value="true"/>
</attributes>
</classpathentry>
<classpathentry kind="con" path="org.eclipse.m2e.MAVEN2_CLASSPATH_CONTAINER">
<attributes>
<attribute name="maven.pomderived" value="true"/>
</attributes>
</classpathentry>
<classpathentry excluding="**" kind="src" output="target/test-classes" path="src/test/resources">
<attributes>
<attribute name="maven.pomderived" value="true"/>
<attribute name="test" value="true"/>
</attributes>
</classpathentry>
<classpathentry kind="output" path="target/classes"/>
</classpath>

View File

@@ -0,0 +1,23 @@
<?xml version="1.0" encoding="UTF-8"?>
<projectDescription>
<name>org.openhab.binding.mqtt</name>
<comment></comment>
<projects>
</projects>
<buildSpec>
<buildCommand>
<name>org.eclipse.jdt.core.javabuilder</name>
<arguments>
</arguments>
</buildCommand>
<buildCommand>
<name>org.eclipse.m2e.core.maven2Builder</name>
<arguments>
</arguments>
</buildCommand>
</buildSpec>
<natures>
<nature>org.eclipse.jdt.core.javanature</nature>
<nature>org.eclipse.m2e.core.maven2Nature</nature>
</natures>
</projectDescription>

View File

@@ -0,0 +1,13 @@
This content is produced and maintained by the openHAB project.
* Project home: https://www.openhab.org
== Declared Project Licenses
This program and the accompanying materials are made available under the terms
of the Eclipse Public License 2.0 which is available at
https://www.eclipse.org/legal/epl-2.0/.
== Source Code
https://github.com/openhab/openhab-addons

View File

@@ -0,0 +1,76 @@
# MQTT Binding
> MQTT is a machine-to-machine (M2M)/"Internet of Things" connectivity protocol.
> It was designed as an extremely lightweight publish/subscribe messaging transport.
MQTT is a server/client architecture.
A server, also called broker is not provided within this binding.
You can use any of the freely available MQTT Brokers like [Mosquitto](https://mosquitto.org/)
or [Moquette](https://moquette-io.github.io/moquette/) or install the [included Moquette broker](https://www.openhab.org/addons/integrations/mqttembeddedbroker/) as add-on.
This particular binding allows to configure connections to brokers via openHAB Things.
This binding does NOT allow you to link Channels to MQTT topics or perform auto-discovery of available
MQTT topics. Please check out the available extensions:
<!--list-subs-->
## Supported Bridges
* Broker: This bridge represents an MQTT Broker connection, configured and managed by this binding.
* SystemBroker: A system configured broker cannot be changed by this binding and will be listed as read-only system-broker.
## Bridge Configuration
Required configuration parameters are:
* __host__: The IP/Hostname of the MQTT broker. Be aware that this binding allows only one bridge / one connection per unique host:port.
* __port__: The optional port of the MQTT broker. If none is provided, the typical ports 1883 and 8883 (SSL) are used. Be aware that this binding allows only one bridge / one connection per unique host:port.
* __secure__: Uses TLS/SSL to establish a secure connection to the broker. Can be true or false. Defaults to false.
Additionally the following parameters can be set:
* __qos__: Quality of Service. Can be 0, 1 or 2. Please read the MQTT specification for details. Defaults to 0.
* __clientID__: Use a fixed client ID. Defaults to empty which means a user ID is generated for this connection.
Reconnect parameters are:
* __reconnectTime__: Reconnect time in ms. If a connection is lost, the binding will wait this time before it tries to reconnect. Defaults to 60000 (60s).
* __keepAlive__: Keep alive / heartbeat timer in s. It can take up to this time to determine if a server connection is lost. A lower value may keep the broker unnecessarily busy for no or little additional value. Defaults to 60s.
An MQTT last will and testament can be configured:
* __lwtMessage__: An optional last will and testament message. Defaults to empty.
* __lwtTopic__: The last will topic. Defaults to empty and therefore disables the last will.
* __lwtQos__: The optional qos of the last will. Defaults to 0.
* __lwtRetain__: Retain last will message. Defaults to false.
For more security, the following optional parameters can be altered:
* __username__: The MQTT username (since MQTT 3.1). Defaults to empty.
* __password__: The MQTT password (since MQTT 3.1). Defaults to empty.
* __certificatepin__: If this is set: After the next connection has been successfully established, the certificate is pinned. The connection will be refused if another certificate is used. Clear **certificate** to allow a new certificate for the next connection attempt. This option will increase security.
* __publickeypin__: If this is set: After the next connection has been successfully established, the public key of the broker is pinned. The connection will be refused if another public key is used. Clear **publickey** to allow a new public key for the next connection attempt. This option will increase security.
* __certificate__: The certificate hash. If **certificatepin** is set this hash is used to verify the connection. Clear to allow a new certificate pinning on the next connection attempt. If empty will be filled automatically by the next successful connection. An example input would be `SHA-256:83F9171E06A313118889F7D79302BD1B7A2042EE0CFD029ABF8DD06FFA6CD9D3`.
* __publickey__: The public key hash. If **publickeypin** is set this hash is used to verify the connection. Clear to allow a new public key pinning on the next connection attempt. If empty will be filled automatically by the next successful connection. An example input would be `SHA-256:83F9171E06A313118889F7D79302BD1B7A2042EE0CFD029ABF8DD06FFA6CD9D3`.
By default discovery services (like homie or homeassistant) are enabled on a broker.
This behaviour can be controlled with a configuration parameter.
* __enableDiscovery__:If set to true, enables discovery on this broker, if set to false, disables discovery services on this broker.
## Supported Channels
You can extend your broker connection bridges with a channel:
* __publishTrigger__: This channel is triggered when a value is published to the configured MQTT topic on this broker connection. The event payload (in `receivedEvent`) will be the received MQTT topic and its value, separated by the hash character (`#`).
Configuration parameters are:
* __stateTopic__: This channel will trigger on this MQTT topic. This topic can contain wildcards like + and # for example "all/in/#" or "sensors/+/config".
* __payload__: An optional condition on the value of the MQTT topic that must match before this channel is triggered.
## Legacy MQTT1-Binding
This binding is not supposed to run in parallel to the old mqtt1-binding.
Please uninstall the old binding before installing this binding.

View File

@@ -0,0 +1,17 @@
<?xml version="1.0" encoding="UTF-8" standalone="no"?>
<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
<modelVersion>4.0.0</modelVersion>
<parent>
<groupId>org.openhab.addons.bundles</groupId>
<artifactId>org.openhab.addons.reactor.bundles</artifactId>
<version>3.0.0-SNAPSHOT</version>
</parent>
<artifactId>org.openhab.binding.mqtt</artifactId>
<name>openHAB Add-ons :: Bundles :: MQTT Broker Binding</name>
</project>

View File

@@ -0,0 +1,11 @@
<?xml version="1.0" encoding="UTF-8"?>
<features name="org.openhab.binding.mqtt-${project.version}" xmlns="http://karaf.apache.org/xmlns/features/v1.4.0">
<repository>mvn:org.openhab.core.features.karaf/org.openhab.core.features.karaf.openhab-core/${ohc.version}/xml/features</repository>
<feature name="openhab-binding-mqtt" description="MQTT Binding" version="${project.version}">
<feature>openhab-runtime-base</feature>
<feature>openhab-transport-mqtt</feature>
<bundle start-level="80">mvn:org.openhab.addons.bundles/org.openhab.binding.mqtt/${project.version}</bundle>
</feature>
</features>

View File

@@ -0,0 +1,33 @@
/**
* Copyright (c) 2010-2020 Contributors to the openHAB project
*
* See the NOTICE file(s) distributed with this work for additional
* information.
*
* This program and the accompanying materials are made available under the
* terms of the Eclipse Public License 2.0 which is available at
* http://www.eclipse.org/legal/epl-2.0
*
* SPDX-License-Identifier: EPL-2.0
*/
package org.openhab.binding.mqtt;
import org.eclipse.jdt.annotation.NonNullByDefault;
import org.openhab.core.thing.ThingTypeUID;
/**
* The {@link MqttBindingConstants} class defines common constants, which are
* used across the whole binding.
*
* @author David Graeff - Initial contribution
*/
@NonNullByDefault
public class MqttBindingConstants {
private static final String BINDING_ID = "mqtt";
// List of all Thing Type UIDs
public static final ThingTypeUID BRIDGE_TYPE_SYSTEMBROKER = new ThingTypeUID(BINDING_ID, "systemBroker");
public static final ThingTypeUID BRIDGE_TYPE_BROKER = new ThingTypeUID(BINDING_ID, "broker");
public static final String PUBLISH_TRIGGER_CHANNEL = "publishTrigger";
}

View File

@@ -0,0 +1,27 @@
/**
* Copyright (c) 2010-2020 Contributors to the openHAB project
*
* See the NOTICE file(s) distributed with this work for additional
* information.
*
* This program and the accompanying materials are made available under the
* terms of the Eclipse Public License 2.0 which is available at
* http://www.eclipse.org/legal/epl-2.0
*
* SPDX-License-Identifier: EPL-2.0
*/
package org.openhab.binding.mqtt.action;
import org.eclipse.jdt.annotation.NonNullByDefault;
import org.eclipse.jdt.annotation.Nullable;
/**
* The {@link IMQTTActions} defines the interface for all thing actions supported by the binding.
*
* @author Laurent Garnier - Initial contribution
*/
@NonNullByDefault
public interface IMQTTActions {
public void publishMQTT(@Nullable String topic, @Nullable String value, @Nullable Boolean retain);
}

View File

@@ -0,0 +1,125 @@
/**
* Copyright (c) 2010-2020 Contributors to the openHAB project
*
* See the NOTICE file(s) distributed with this work for additional
* information.
*
* This program and the accompanying materials are made available under the
* terms of the Eclipse Public License 2.0 which is available at
* http://www.eclipse.org/legal/epl-2.0
*
* SPDX-License-Identifier: EPL-2.0
*/
package org.openhab.binding.mqtt.action;
import java.lang.reflect.Method;
import java.lang.reflect.Proxy;
import org.eclipse.jdt.annotation.NonNullByDefault;
import org.eclipse.jdt.annotation.Nullable;
import org.openhab.binding.mqtt.handler.AbstractBrokerHandler;
import org.openhab.core.automation.annotation.ActionInput;
import org.openhab.core.automation.annotation.RuleAction;
import org.openhab.core.io.transport.mqtt.MqttBrokerConnection;
import org.openhab.core.thing.binding.ThingActions;
import org.openhab.core.thing.binding.ThingActionsScope;
import org.openhab.core.thing.binding.ThingHandler;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
/**
* This is the automation engine action handler service for the publishMQTT action.
* <p>
* <b>Note:</b>The static method <b>invokeMethodOf</b> handles the case where
* the test <i>actions instanceof MQTTActions</i> fails. This test can fail
* due to an issue in openHAB core v2.5.0 where the {@link MQTTActions} class
* can be loaded by a different classloader than the <i>actions</i> instance.
*
* @author David Graeff - Initial contribution
*/
@ThingActionsScope(name = "mqtt")
@NonNullByDefault
public class MQTTActions implements ThingActions, IMQTTActions {
private final Logger logger = LoggerFactory.getLogger(MQTTActions.class);
private @Nullable AbstractBrokerHandler handler;
@Override
public void setThingHandler(@Nullable ThingHandler handler) {
this.handler = (AbstractBrokerHandler) handler;
}
@Override
public @Nullable ThingHandler getThingHandler() {
return this.handler;
}
@RuleAction(label = "@text/actionLabel", description = "@text/actionDesc")
public void publishMQTT(
@ActionInput(name = "topic", label = "@text/actionInputTopicLabel", description = "@text/actionInputTopicDesc") @Nullable String topic,
@ActionInput(name = "value", label = "@text/actionInputValueLabel", description = "@text/actionInputValueDesc") @Nullable String value) {
publishMQTT(topic, value, null);
}
@Override
@RuleAction(label = "@text/actionLabel", description = "@text/actionDesc")
public void publishMQTT(
@ActionInput(name = "topic", label = "@text/actionInputTopicLabel", description = "@text/actionInputTopicDesc") @Nullable String topic,
@ActionInput(name = "value", label = "@text/actionInputValueLabel", description = "@text/actionInputValueDesc") @Nullable String value,
@ActionInput(name = "retain", label = "@text/actionInputRetainlabel", description = "@text/actionInputRetainDesc") @Nullable Boolean retain) {
AbstractBrokerHandler brokerHandler = handler;
if (brokerHandler == null) {
logger.warn("MQTT Action service ThingHandler is null!");
return;
}
MqttBrokerConnection connection = brokerHandler.getConnection();
if (connection == null) {
logger.warn("MQTT Action service ThingHandler connection is null!");
return;
}
if (value == null) {
logger.debug("skipping MQTT publishing to topic '{}' due to null value.", topic);
return;
}
if (topic == null) {
logger.debug("skipping MQTT publishing of value '{}' as topic is null.", value);
return;
}
if (retain == null) {
retain = connection.isRetain();
}
connection.publish(topic, value.getBytes(), connection.getQos(), retain).thenRun(() -> {
logger.debug("MQTT publish to {} performed", topic);
}).exceptionally(e -> {
logger.warn("MQTT publish to {} failed!", topic);
return null;
});
}
public static void publishMQTT(@Nullable ThingActions actions, @Nullable String topic, @Nullable String value) {
publishMQTT(actions, topic, value, null);
}
public static void publishMQTT(@Nullable ThingActions actions, @Nullable String topic, @Nullable String value,
@Nullable Boolean retain) {
invokeMethodOf(actions).publishMQTT(topic, value, retain);
}
private static IMQTTActions invokeMethodOf(@Nullable ThingActions actions) {
if (actions == null) {
throw new IllegalArgumentException("actions cannot be null");
}
if (actions.getClass().getName().equals(MQTTActions.class.getName())) {
if (actions instanceof IMQTTActions) {
return (IMQTTActions) actions;
} else {
return (IMQTTActions) Proxy.newProxyInstance(IMQTTActions.class.getClassLoader(),
new Class[] { IMQTTActions.class }, (Object proxy, Method method, Object[] args) -> {
Method m = actions.getClass().getDeclaredMethod(method.getName(),
method.getParameterTypes());
return m.invoke(actions, args);
});
}
}
throw new IllegalArgumentException("Actions is not an instance of MQTTActions");
}
}

View File

@@ -0,0 +1,128 @@
/**
* Copyright (c) 2010-2020 Contributors to the openHAB project
*
* See the NOTICE file(s) distributed with this work for additional
* information.
*
* This program and the accompanying materials are made available under the
* terms of the Eclipse Public License 2.0 which is available at
* http://www.eclipse.org/legal/epl-2.0
*
* SPDX-License-Identifier: EPL-2.0
*/
package org.openhab.binding.mqtt.discovery;
import java.util.Date;
import java.util.Set;
import java.util.concurrent.ScheduledFuture;
import java.util.concurrent.TimeUnit;
import org.eclipse.jdt.annotation.NonNullByDefault;
import org.eclipse.jdt.annotation.Nullable;
import org.openhab.core.config.discovery.AbstractDiscoveryService;
import org.openhab.core.thing.ThingTypeUID;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
/**
* Base MQTT discovery class. Responsible for connecting to the {@link MQTTTopicDiscoveryService}.
*
* Implement MQTT discovery services on top of this. You still need to reference
* the MQTTTopicDiscoveryService like in:
*
* <pre>
* &#64;NonNullByDefault({})
* &#64;Reference
* protected MQTTTopicDiscoveryService mqttTopicDiscovery;
* </pre>
*
* @author David Graeff - Initial contribution
*/
@NonNullByDefault
public abstract class AbstractMQTTDiscovery extends AbstractDiscoveryService implements MQTTTopicDiscoveryParticipant {
private final Logger logger = LoggerFactory.getLogger(AbstractMQTTDiscovery.class);
protected final String subscribeTopic;
private int timeout;
private @Nullable ScheduledFuture<?> scheduledStop;
public AbstractMQTTDiscovery(@Nullable Set<ThingTypeUID> supportedThingTypes, int timeout,
boolean backgroundDiscoveryEnabledByDefault, String baseTopic) {
super(supportedThingTypes, 0, backgroundDiscoveryEnabledByDefault);
this.subscribeTopic = baseTopic;
this.timeout = timeout;
}
/**
* Return the topic discovery service.
*/
protected abstract MQTTTopicDiscoveryService getDiscoveryService();
private synchronized void stopTimeout() {
if (scheduledStop != null) {
scheduledStop.cancel(false);
scheduledStop = null;
}
}
protected synchronized void resetTimeout() {
stopTimeout();
// schedule an automatic call of stopScan when timeout is reached
if (timeout > 0) {
Runnable runnable = new Runnable() {
@Override
public void run() {
try {
stopScan();
} catch (Exception e) {
logger.debug("Exception occurred during execution: {}", e.getMessage(), e);
}
}
};
scheduledStop = scheduler.schedule(runnable, timeout, TimeUnit.SECONDS);
}
}
@Override
protected void startScan() {
if (isBackgroundDiscoveryEnabled()) {
super.stopScan();
return;
}
resetTimeout();
getDiscoveryService().subscribe(this, subscribeTopic);
}
@Override
protected synchronized void stopScan() {
if (isBackgroundDiscoveryEnabled()) {
super.stopScan();
return;
}
stopTimeout();
getDiscoveryService().unsubscribe(this);
super.stopScan();
}
@Override
public synchronized void abortScan() {
stopTimeout();
super.abortScan();
}
@Override
protected void startBackgroundDiscovery() {
// Remove results that are restored after a restart
removeOlderResults(new Date().getTime());
getDiscoveryService().subscribe(this, subscribeTopic);
}
@Override
protected void stopBackgroundDiscovery() {
getDiscoveryService().unsubscribe(this);
}
}

View File

@@ -0,0 +1,44 @@
/**
* Copyright (c) 2010-2020 Contributors to the openHAB project
*
* See the NOTICE file(s) distributed with this work for additional
* information.
*
* This program and the accompanying materials are made available under the
* terms of the Eclipse Public License 2.0 which is available at
* http://www.eclipse.org/legal/epl-2.0
*
* SPDX-License-Identifier: EPL-2.0
*/
package org.openhab.binding.mqtt.discovery;
import org.eclipse.jdt.annotation.NonNullByDefault;
import org.openhab.core.io.transport.mqtt.MqttBrokerConnection;
import org.openhab.core.thing.ThingUID;
/**
* Implement this interface to get notified of received values and vanished topics.
*
* @author David Graeff - Initial contribution
*/
@NonNullByDefault
public interface MQTTTopicDiscoveryParticipant {
/**
* Called whenever a message on the subscribed topic got published or a retained message was received.
*
* @param thingUID The MQTT thing UID of the Thing that established/created the given broker connection.
* @param connection The broker connection
* @param topic The topic
* @param payload The topic payload
*/
void receivedMessage(ThingUID thingUID, MqttBrokerConnection connection, String topic, byte[] payload);
/**
* A MQTT topic vanished.
*
* @param thingUID The MQTT thing UID of the Thing that established/created the given broker connection.
* @param connection The broker connection
* @param topic The topic
*/
void topicVanished(ThingUID thingUID, MqttBrokerConnection connection, String topic);
}

View File

@@ -0,0 +1,49 @@
/**
* Copyright (c) 2010-2020 Contributors to the openHAB project
*
* See the NOTICE file(s) distributed with this work for additional
* information.
*
* This program and the accompanying materials are made available under the
* terms of the Eclipse Public License 2.0 which is available at
* http://www.eclipse.org/legal/epl-2.0
*
* SPDX-License-Identifier: EPL-2.0
*/
package org.openhab.binding.mqtt.discovery;
import org.eclipse.jdt.annotation.NonNullByDefault;
/**
*
* The {@link MQTTTopicDiscoveryService} service is responsible for subscribing to a topic on
* all currently available broker connections as well as later on appearing broker connections.
*
* @author David Graeff - Initial contribution
*/
@NonNullByDefault
public interface MQTTTopicDiscoveryService {
/**
* Subscribe to the given topic and get notified of messages on that topic via the listener.
* Subscribing happens on a best-effort strategy. Any errors on any connections are suppressed.
*
* @param listener A listener. Need to be a strong reference.
* @param topic The topic. Can contain wildcards.
*/
void subscribe(MQTTTopicDiscoveryParticipant listener, String topic);
/**
* Unsubscribe the given listener.
*
* @param listener A listener that has subscribed before.
*/
void unsubscribe(MQTTTopicDiscoveryParticipant listener);
/**
* Publish a message to all connected brokers
*
* @param topic The topic to publish on
* @param payload The message to publish
*/
void publish(String topic, byte[] payload);
}

View File

@@ -0,0 +1,97 @@
/**
* Copyright (c) 2010-2020 Contributors to the openHAB project
*
* See the NOTICE file(s) distributed with this work for additional
* information.
*
* This program and the accompanying materials are made available under the
* terms of the Eclipse Public License 2.0 which is available at
* http://www.eclipse.org/legal/epl-2.0
*
* SPDX-License-Identifier: EPL-2.0
*/
package org.openhab.binding.mqtt.discovery;
import java.util.concurrent.CompletableFuture;
import org.eclipse.jdt.annotation.NonNullByDefault;
import org.eclipse.jdt.annotation.Nullable;
import org.openhab.core.io.transport.mqtt.MqttBrokerConnection;
import org.openhab.core.io.transport.mqtt.MqttMessageSubscriber;
import org.openhab.core.thing.ThingUID;
/**
* Represents a MQTT subscription for one specific topic. This is an immutable class.
*
* @author David Graeff - Initial contribution
*/
@NonNullByDefault
public class TopicSubscribe implements MqttMessageSubscriber {
final @Nullable MqttBrokerConnection connection;
final ThingUID thing;
final String topic;
final MQTTTopicDiscoveryParticipant topicDiscoveredListener;
private boolean isStarted = false;
/**
* Creates a {@link TopicSubscribe} object.
*
* @param connection The broker connection
* @param topic The topic
* @param topicDiscoveredListener A listener
* @param thing A thing, used as an argument to the listener callback.
*/
public TopicSubscribe(@Nullable MqttBrokerConnection connection, String topic,
MQTTTopicDiscoveryParticipant topicDiscoveredListener, ThingUID thing) {
this.connection = connection;
this.thing = thing;
this.topic = topic;
this.topicDiscoveredListener = topicDiscoveredListener;
}
@Override
public void processMessage(String topic, byte[] payload) {
final MqttBrokerConnection connection = this.connection;
if (connection == null)
return;
if (payload.length > 0) {
topicDiscoveredListener.receivedMessage(thing, connection, topic, payload);
} else {
topicDiscoveredListener.topicVanished(thing, connection, topic);
}
}
/**
* Subscribe to the topic
*
* @return Completes with true if successful. Completes with false if not connected yet. Exceptionally otherwise.
*/
public CompletableFuture<Boolean> start() {
CompletableFuture<Boolean> startFuture = connection == null ? CompletableFuture.completedFuture(true)
: connection.subscribe(topic, this);
isStarted = true;
return startFuture;
}
/**
* Unsubscribes from the topic
*
* @return Completes with true if successful. Exceptionally otherwise.
*/
public CompletableFuture<Boolean> stop() {
CompletableFuture<Boolean> stopFuture = connection == null ? CompletableFuture.completedFuture(true)
: connection.unsubscribe(topic, this);
isStarted = false;
return stopFuture;
}
/**
* status of this topic subscription
*
* @return true if started
*/
public boolean isStarted() {
return isStarted;
}
}

View File

@@ -0,0 +1,259 @@
/**
* Copyright (c) 2010-2020 Contributors to the openHAB project
*
* See the NOTICE file(s) distributed with this work for additional
* information.
*
* This program and the accompanying materials are made available under the
* terms of the Eclipse Public License 2.0 which is available at
* http://www.eclipse.org/legal/epl-2.0
*
* SPDX-License-Identifier: EPL-2.0
*/
package org.openhab.binding.mqtt.handler;
import java.util.Collection;
import java.util.Collections;
import java.util.HashMap;
import java.util.Map;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.TimeoutException;
import org.eclipse.jdt.annotation.NonNullByDefault;
import org.eclipse.jdt.annotation.Nullable;
import org.openhab.binding.mqtt.action.MQTTActions;
import org.openhab.binding.mqtt.discovery.MQTTTopicDiscoveryParticipant;
import org.openhab.binding.mqtt.discovery.TopicSubscribe;
import org.openhab.core.io.transport.mqtt.MqttBrokerConnection;
import org.openhab.core.io.transport.mqtt.MqttConnectionObserver;
import org.openhab.core.io.transport.mqtt.MqttConnectionState;
import org.openhab.core.io.transport.mqtt.MqttService;
import org.openhab.core.thing.*;
import org.openhab.core.thing.binding.BaseBridgeHandler;
import org.openhab.core.thing.binding.ThingHandlerService;
import org.openhab.core.types.Command;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
/**
* This base implementation handles connection changes of the {@link MqttBrokerConnection}
* and puts the Thing on or offline. It also handles adding/removing notifications of the
* {@link MqttService} and provides a basic dispose() implementation.
*
* @author David Graeff - Initial contribution
*/
@NonNullByDefault
public abstract class AbstractBrokerHandler extends BaseBridgeHandler implements MqttConnectionObserver {
public static final int TIMEOUT_DEFAULT = 1200; /* timeout in milliseconds */
private final Logger logger = LoggerFactory.getLogger(AbstractBrokerHandler.class);
final Map<ChannelUID, PublishTriggerChannel> channelStateByChannelUID = new HashMap<>();
private final Map<String, @Nullable Map<MQTTTopicDiscoveryParticipant, @Nullable TopicSubscribe>> discoveryTopics = new HashMap<>();
protected @Nullable MqttBrokerConnection connection;
protected CompletableFuture<MqttBrokerConnection> connectionFuture = new CompletableFuture<>();
public AbstractBrokerHandler(Bridge thing) {
super(thing);
}
@Override
public Collection<Class<? extends ThingHandlerService>> getServices() {
return Collections.singleton(MQTTActions.class);
}
/**
* Returns the underlying {@link MqttBrokerConnection} either immediately or after {@link #initialize()} has
* performed.
*/
public CompletableFuture<MqttBrokerConnection> getConnectionAsync() {
return connectionFuture;
}
/**
* Returns the underlying {@link MqttBrokerConnection}.
*/
public @Nullable MqttBrokerConnection getConnection() {
return connection;
}
/**
* Does nothing in the base implementation.
*/
@Override
public void handleCommand(ChannelUID channelUID, Command command) {
// No commands to handle
}
/**
* Registers a connection status listener and attempts a connection if there is none so far.
*/
@Override
public void initialize() {
final MqttBrokerConnection connection = this.connection;
if (connection == null) {
logger.warn("Trying to initialize {} but connection is null. This is most likely a bug.", thing.getUID());
return;
}
for (Channel channel : thing.getChannels()) {
final PublishTriggerChannelConfig channelConfig = channel.getConfiguration()
.as(PublishTriggerChannelConfig.class);
PublishTriggerChannel c = new PublishTriggerChannel(channelConfig, channel.getUID(), connection, this);
channelStateByChannelUID.put(channel.getUID(), c);
}
connection.addConnectionObserver(this);
connection.start().exceptionally(e -> {
connectionStateChanged(MqttConnectionState.DISCONNECTED, e);
return false;
}).thenAccept(v -> {
if (!v) {
connectionStateChanged(MqttConnectionState.DISCONNECTED, new TimeoutException("Timeout"));
} else {
connectionStateChanged(MqttConnectionState.CONNECTED, null);
}
});
connectionFuture.complete(connection);
discoveryTopics.forEach((topic, listenerMap) -> {
listenerMap.replaceAll((listener, oldTopicSubscribe) -> {
if (oldTopicSubscribe.isStarted()) {
oldTopicSubscribe.stop();
}
TopicSubscribe topicSubscribe = new TopicSubscribe(connection, topic, listener, thing.getUID());
if (discoveryEnabled()) {
topicSubscribe.start().handle((result, ex) -> {
if (ex != null) {
logger.warn("Failed to subscribe {} to discovery topic {} on broker {}", listener, topic,
thing.getUID());
} else {
logger.trace("Subscribed {} to discovery topic {} on broker {}", listener, topic,
thing.getUID());
}
return null;
});
}
return topicSubscribe;
});
});
}
@Override
public void connectionStateChanged(MqttConnectionState state, @Nullable Throwable error) {
if (state == MqttConnectionState.CONNECTED) {
updateStatus(ThingStatus.ONLINE);
channelStateByChannelUID.values().forEach(PublishTriggerChannel::start);
} else {
channelStateByChannelUID.values().forEach(PublishTriggerChannel::stop);
if (error == null) {
updateStatus(ThingStatus.OFFLINE);
} else {
updateStatus(ThingStatus.OFFLINE, ThingStatusDetail.COMMUNICATION_ERROR, error.getMessage());
}
}
}
@Override
protected void triggerChannel(ChannelUID channelUID, String event) {
super.triggerChannel(channelUID, event);
}
/**
* Removes listeners to the {@link MqttBrokerConnection}.
*/
@Override
public void dispose() {
channelStateByChannelUID.values().forEach(PublishTriggerChannel::stop);
channelStateByChannelUID.clear();
// keep topics, but stop subscriptions
discoveryTopics.forEach((topic, listenerMap) -> {
listenerMap.forEach((listener, topicSubscribe) -> {
topicSubscribe.stop();
});
});
if (connection != null) {
connection.removeConnectionObserver(this);
} else {
logger.warn("Trying to dispose handler {} but connection is already null. Most likely this is a bug.",
thing.getUID());
}
this.connection = null;
connectionFuture = new CompletableFuture<>();
super.dispose();
}
/**
* register a discovery listener to a specified topic on this broker (used by the handler factory)
*
* @param listener the discovery participant that wishes to be notified about this topic
* @param topic the topic (wildcards supported)
*/
public final void registerDiscoveryListener(MQTTTopicDiscoveryParticipant listener, String topic) {
Map<MQTTTopicDiscoveryParticipant, @Nullable TopicSubscribe> topicListeners = discoveryTopics
.computeIfAbsent(topic, t -> new HashMap<>());
topicListeners.compute(listener, (k, v) -> {
if (v != null) {
logger.warn("Duplicate subscription for {} to discovery topic {} on broker {}. Check discovery logic!",
listener, topic, thing.getUID());
v.stop();
}
TopicSubscribe topicSubscribe = new TopicSubscribe(connection, topic, listener, thing.getUID());
if (discoveryEnabled()) {
topicSubscribe.start().handle((result, ex) -> {
if (ex != null) {
logger.warn("Failed to subscribe {} to discovery topic {} on broker {}", listener, topic,
thing.getUID());
} else {
logger.trace("Subscribed {} to discovery topic {} on broker {}", listener, topic,
thing.getUID());
}
return null;
});
}
return topicSubscribe;
});
}
/**
* unregisters a discovery listener from a specified topic on this broker (used by the handler factory)
*
* @param listener the discovery participant that wishes no notifications about this topic
* @param topic the topic (as specified during registration)
*/
public final void unregisterDiscoveryListener(MQTTTopicDiscoveryParticipant listener, String topic) {
Map<MQTTTopicDiscoveryParticipant, @Nullable TopicSubscribe> topicListeners = discoveryTopics.compute(topic,
(k, v) -> {
if (v == null) {
logger.warn(
"Tried to unsubscribe {} from discovery topic {} on broker {} but topic not registered at all. Check discovery logic!",
listener, topic, thing.getUID());
return null;
}
v.compute(listener, (l, w) -> {
if (w == null) {
logger.warn(
"Tried to unsubscribe {} from discovery topic {} on broker {} but topic not registered for listener. Check discovery logic!",
listener, topic, thing.getUID());
} else {
w.stop();
logger.trace("Unsubscribed {} from discovery topic {} on broker {}", listener, topic,
thing.getUID());
}
return null;
});
return v.isEmpty() ? null : v;
});
}
/**
* check whether discovery is disabled on this broker
*
* @return true if discovery disabled
*/
public abstract boolean discoveryEnabled();
}

View File

@@ -0,0 +1,240 @@
/**
* Copyright (c) 2010-2020 Contributors to the openHAB project
*
* See the NOTICE file(s) distributed with this work for additional
* information.
*
* This program and the accompanying materials are made available under the
* terms of the Eclipse Public License 2.0 which is available at
* http://www.eclipse.org/legal/epl-2.0
*
* SPDX-License-Identifier: EPL-2.0
*/
package org.openhab.binding.mqtt.handler;
import java.security.NoSuchAlgorithmException;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import javax.net.ssl.TrustManager;
import org.apache.commons.lang.StringUtils;
import org.eclipse.jdt.annotation.NonNullByDefault;
import org.eclipse.jdt.annotation.Nullable;
import org.openhab.binding.mqtt.internal.ssl.Pin;
import org.openhab.binding.mqtt.internal.ssl.PinMessageDigest;
import org.openhab.binding.mqtt.internal.ssl.PinTrustManager;
import org.openhab.binding.mqtt.internal.ssl.PinType;
import org.openhab.binding.mqtt.internal.ssl.PinnedCallback;
import org.openhab.core.config.core.Configuration;
import org.openhab.core.io.transport.mqtt.MqttBrokerConnection;
import org.openhab.core.io.transport.mqtt.MqttConnectionState;
import org.openhab.core.io.transport.mqtt.MqttService;
import org.openhab.core.io.transport.mqtt.MqttWillAndTestament;
import org.openhab.core.io.transport.mqtt.reconnect.PeriodicReconnectStrategy;
import org.openhab.core.thing.Bridge;
import org.openhab.core.util.HexUtils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
/**
* This handler provided more detailed connection information from a
* {@link MqttBrokerConnection} via a Thing property, put the Thing
* offline or online depending on the connection and adds the configured
* connection to the {@link MqttService}.
*
* @author David Graeff - Initial contribution
*/
@NonNullByDefault
public class BrokerHandler extends AbstractBrokerHandler implements PinnedCallback {
private final Logger logger = LoggerFactory.getLogger(BrokerHandler.class);
protected BrokerHandlerConfig config = new BrokerHandlerConfig();
public BrokerHandler(Bridge thing) {
super(thing);
}
@Override
public void connectionStateChanged(MqttConnectionState state, @Nullable Throwable error) {
super.connectionStateChanged(state, error);
// Store generated client ID if none was set by the user
final MqttBrokerConnection connection = this.connection;
if (connection != null && state == MqttConnectionState.CONNECTED && StringUtils.isBlank(config.clientID)) {
config.clientID = connection.getClientId();
Configuration editConfig = editConfiguration();
editConfig.put("clientid", config.clientID);
updateConfiguration(editConfig);
}
}
/**
* This method gets called by the {@link PinningSSLContextProvider} if a new public key
* or certificate hash got pinned. The hash is stored in the thing configuration.
*/
@Override
public void pinnedLearnedHash(Pin pin) {
byte[] hash = pin.getHash();
if (hash == null) {
logger.error("Received pins hash is empty!");
return;
}
String configKey = null;
try {
switch (pin.getType()) {
case CERTIFICATE_TYPE:
configKey = BrokerHandlerConfig.class.getDeclaredField("certificate").getName();
break;
case PUBLIC_KEY_TYPE:
configKey = BrokerHandlerConfig.class.getDeclaredField("publickey").getName();
break;
}
} catch (NoSuchFieldException e) {
logger.error("Field name changed!", e);
return;
}
Configuration thingConfig = editConfiguration();
thingConfig.put(configKey, HexUtils.bytesToHex(hash));
updateConfiguration(thingConfig);
}
@Override
public void pinnedConnectionDenied(Pin pin) {
// We don't need to handle this here, because the {@link PinningSSLContextProvider}
// will throw a CertificateException if the connection fails.
}
@Override
public void pinnedConnectionAccepted() {
}
@Override
public void dispose() {
try {
if (connection != null) {
connection.stop().get(1000, TimeUnit.MILLISECONDS);
} else {
logger.warn("Trying to dispose handler {} but connection is already null. Most likely this is a bug.",
thing.getUID());
}
} catch (InterruptedException | ExecutionException | TimeoutException ignore) {
}
super.dispose();
}
@Override
public boolean discoveryEnabled() {
return config.enableDiscovery;
}
/**
* Reads the thing configuration related to public key or certificate pinning, creates an appropriate a
* {@link PinningSSLContextProvider} and assigns it to the {@link MqttBrokerConnection} instance.
* The instance need to be set before calling this method. If the SHA-256 algorithm is not supported
* by the platform, this method will do nothing.
*
* @throws IllegalArgumentException Throws this exception, if provided hash values cannot be
* assigned to the {@link PinningSSLContextProvider}.
*/
protected void assignSSLContextProvider(BrokerHandlerConfig config, MqttBrokerConnection connection,
PinnedCallback callback) throws IllegalArgumentException {
final PinTrustManager trustManager = new PinTrustManager();
connection.setTrustManagers(new TrustManager[] { trustManager });
trustManager.setCallback(callback);
if (config.certificatepin) {
try {
Pin pin;
if (StringUtils.isBlank(config.certificate)) {
pin = Pin.LearningPin(PinType.CERTIFICATE_TYPE);
} else {
String[] split = config.certificate.split(":");
if (split.length != 2) {
throw new NoSuchAlgorithmException("Algorithm is missing");
}
pin = Pin.CheckingPin(PinType.CERTIFICATE_TYPE, new PinMessageDigest(split[0]),
HexUtils.hexToBytes(split[1]));
}
trustManager.addPinning(pin);
} catch (NoSuchAlgorithmException e) {
throw new IllegalArgumentException(e);
}
}
if (config.publickeypin) {
try {
Pin pin;
if (StringUtils.isBlank(config.publickey)) {
pin = Pin.LearningPin(PinType.PUBLIC_KEY_TYPE);
} else {
String[] split = config.publickey.split(":");
if (split.length != 2) {
throw new NoSuchAlgorithmException("Algorithm is missing");
}
pin = Pin.CheckingPin(PinType.PUBLIC_KEY_TYPE, new PinMessageDigest(split[0]),
HexUtils.hexToBytes(split[1]));
}
trustManager.addPinning(pin);
} catch (NoSuchAlgorithmException e) {
throw new IllegalArgumentException(e);
}
}
}
/**
* Creates a broker connection based on the configuration of {@link #config}.
*
* @return Returns a valid MqttBrokerConnection
* @throws IllegalArgumentException If the configuration is invalid, this exception is thrown.
*/
protected MqttBrokerConnection createBrokerConnection() throws IllegalArgumentException {
String host = config.host;
if (StringUtils.isBlank(host) || host == null) {
throw new IllegalArgumentException("Host is empty!");
}
final MqttBrokerConnection connection = new MqttBrokerConnection(host, config.port, config.secure,
config.clientID);
final String username = config.username;
final String password = config.password;
if (StringUtils.isNotBlank(username) && password != null) {
connection.setCredentials(username, password); // Empty passwords are allowed
}
final String topic = config.lwtTopic;
if (topic != null) {
final String msg = config.lwtMessage;
MqttWillAndTestament will = new MqttWillAndTestament(topic, msg != null ? msg.getBytes() : null,
config.lwtQos, config.lwtRetain);
connection.setLastWill(will);
}
connection.setQos(config.qos);
if (config.reconnectTime != null) {
connection.setReconnectStrategy(new PeriodicReconnectStrategy(config.reconnectTime, 10000));
}
final Integer keepAlive = config.keepAlive;
if (keepAlive != null) {
connection.setKeepAliveInterval(keepAlive);
}
if (config.timeoutInMs != null) {
connection.setTimeoutExecutor(scheduler, TIMEOUT_DEFAULT);
}
connection.setRetain(config.retainMessages);
return connection;
}
@Override
public void initialize() {
config = getConfigAs(BrokerHandlerConfig.class);
final MqttBrokerConnection connection = createBrokerConnection();
assignSSLContextProvider(config, connection, this);
this.connection = connection;
super.initialize();
}
}

View File

@@ -0,0 +1,39 @@
/**
* Copyright (c) 2010-2020 Contributors to the openHAB project
*
* See the NOTICE file(s) distributed with this work for additional
* information.
*
* This program and the accompanying materials are made available under the
* terms of the Eclipse Public License 2.0 which is available at
* http://www.eclipse.org/legal/epl-2.0
*
* SPDX-License-Identifier: EPL-2.0
*/
package org.openhab.binding.mqtt.handler;
import org.eclipse.jdt.annotation.NonNullByDefault;
import org.eclipse.jdt.annotation.Nullable;
import org.openhab.core.io.transport.mqtt.MqttBrokerConnectionConfig;
/**
* Holds the configuration of a {@link BrokerHandler} Thing. Parameters are inherited
* from {@link MqttBrokerConnectionConfig}, Additionally some
* reconnect and security related parameters are defined here.
*
* @author David Graeff - Initial contribution
*/
@NonNullByDefault
public class BrokerHandlerConfig extends MqttBrokerConnectionConfig {
public @Nullable Integer reconnectTime;
public @Nullable Integer timeoutInMs;
// For more security, the following optional parameters can be altered
public boolean certificatepin = false;
public boolean publickeypin = false;
public String certificate = "";
public String publickey = "";
public boolean enableDiscovery = true;
}

View File

@@ -0,0 +1,65 @@
/**
* Copyright (c) 2010-2020 Contributors to the openHAB project
*
* See the NOTICE file(s) distributed with this work for additional
* information.
*
* This program and the accompanying materials are made available under the
* terms of the Eclipse Public License 2.0 which is available at
* http://www.eclipse.org/legal/epl-2.0
*
* SPDX-License-Identifier: EPL-2.0
*/
package org.openhab.binding.mqtt.handler;
import java.util.concurrent.CompletableFuture;
import org.eclipse.jdt.annotation.NonNullByDefault;
import org.openhab.core.io.transport.mqtt.MqttBrokerConnection;
import org.openhab.core.io.transport.mqtt.MqttMessageSubscriber;
import org.openhab.core.thing.ChannelUID;
/**
* Subscribes to a state topic and calls {@link AbstractBrokerHandler#triggerChannel(ChannelUID, String)} if a value got
* received.
*
* @author David Graeff - Initial contribution
*/
@NonNullByDefault
public class PublishTriggerChannel implements MqttMessageSubscriber {
private final MqttBrokerConnection connection;
private final PublishTriggerChannelConfig config;
private final ChannelUID uid;
private final AbstractBrokerHandler handler;
PublishTriggerChannel(PublishTriggerChannelConfig config, ChannelUID uid, MqttBrokerConnection connection,
AbstractBrokerHandler handler) {
this.config = config;
this.uid = uid;
this.connection = connection;
this.handler = handler;
}
CompletableFuture<Boolean> start() {
return stop().thenCompose(b -> connection.subscribe(config.stateTopic, this));
}
@Override
public void processMessage(String topic, byte[] payload) {
String value = new String(payload);
// Check condition
String expectedPayload = config.payload;
if (expectedPayload != null && !value.equals(expectedPayload)) {
return;
}
if (config.separator.isEmpty()) {
handler.triggerChannel(uid, value);
} else {
handler.triggerChannel(uid, topic + config.separator + value);
}
}
public CompletableFuture<Boolean> stop() {
return connection.unsubscribe(config.stateTopic, this);
}
}

View File

@@ -0,0 +1,28 @@
/**
* Copyright (c) 2010-2020 Contributors to the openHAB project
*
* See the NOTICE file(s) distributed with this work for additional
* information.
*
* This program and the accompanying materials are made available under the
* terms of the Eclipse Public License 2.0 which is available at
* http://www.eclipse.org/legal/epl-2.0
*
* SPDX-License-Identifier: EPL-2.0
*/
package org.openhab.binding.mqtt.handler;
import org.eclipse.jdt.annotation.NonNullByDefault;
import org.eclipse.jdt.annotation.Nullable;
/**
* Holds the configuration of a {@link PublishTriggerChannel}.
*
* @author David Graeff - Initial contribution
*/
@NonNullByDefault
public class PublishTriggerChannelConfig {
public String stateTopic = "";
public String separator = "";
public @Nullable String payload;
}

View File

@@ -0,0 +1,143 @@
/**
* Copyright (c) 2010-2020 Contributors to the openHAB project
*
* See the NOTICE file(s) distributed with this work for additional
* information.
*
* This program and the accompanying materials are made available under the
* terms of the Eclipse Public License 2.0 which is available at
* http://www.eclipse.org/legal/epl-2.0
*
* SPDX-License-Identifier: EPL-2.0
*/
package org.openhab.binding.mqtt.handler;
import java.util.HashMap;
import java.util.Map;
import org.eclipse.jdt.annotation.NonNullByDefault;
import org.eclipse.jdt.annotation.Nullable;
import org.openhab.core.io.transport.mqtt.MqttBrokerConnection;
import org.openhab.core.io.transport.mqtt.MqttConnectionState;
import org.openhab.core.io.transport.mqtt.MqttService;
import org.openhab.core.io.transport.mqtt.MqttServiceObserver;
import org.openhab.core.io.transport.mqtt.MqttWillAndTestament;
import org.openhab.core.io.transport.mqtt.reconnect.PeriodicReconnectStrategy;
import org.openhab.core.thing.Bridge;
import org.openhab.core.thing.ThingStatus;
import org.openhab.core.thing.ThingStatusDetail;
/**
* This handler does not much except providing all information from a
* {@link MqttBrokerConnection} via Thing properties and put the Thing
* offline or online depending on the connection.
*
* @author David Graeff - Initial contribution
*/
@NonNullByDefault
public class SystemBrokerHandler extends AbstractBrokerHandler implements MqttServiceObserver {
// Properties
public static final String PROPERTY_URL = "url";
public static final String PROPERTY_USERNAME = "username";
public static final String PROPERTY_PASSWORD = "password";
public static final String PROPERTY_QOS = "qos";
public static final String PROPERTY_RETAIN = "retain";
public static final String PROPERTY_LAST_WILL = "lastwill";
public static final String PROPERTY_RECONNECT_TIME = "reconnect_time_ms";
public static final String PROPERTY_KEEP_ALIVE_TIME = "keep_alive_time_ms";
public static final String PROPERTY_CONNECT_TIMEOUT = "connect_timeout_ms";
protected final MqttService service;
protected String brokerID = "";
protected boolean discoveryEnabled = true;
public SystemBrokerHandler(Bridge thing, MqttService service) {
super(thing);
this.service = service;
}
@Override
public void connectionStateChanged(MqttConnectionState state, @Nullable Throwable error) {
Map<String, String> properties = new HashMap<>();
properties.put(PROPERTY_URL, connection.getHost() + ":" + String.valueOf(connection.getPort()));
final String username = connection.getUser();
final String password = connection.getPassword();
if (username != null && password != null) {
properties.put(PROPERTY_USERNAME, username);
properties.put(PROPERTY_PASSWORD, password);
}
properties.put(PROPERTY_QOS, String.valueOf(connection.getQos()));
properties.put(PROPERTY_RETAIN, String.valueOf(connection.isRetain()));
final MqttWillAndTestament lastWill = connection.getLastWill();
if (lastWill != null) {
properties.put(PROPERTY_LAST_WILL, lastWill.toString());
} else {
properties.put(PROPERTY_LAST_WILL, "");
}
if (connection.getReconnectStrategy() instanceof PeriodicReconnectStrategy) {
final PeriodicReconnectStrategy strategy = (PeriodicReconnectStrategy) connection.getReconnectStrategy();
if (strategy != null) {
properties.put(PROPERTY_RECONNECT_TIME, String.valueOf(strategy.getReconnectFrequency()));
}
}
properties.put(PROPERTY_KEEP_ALIVE_TIME, String.valueOf(connection.getKeepAliveInterval()));
updateProperties(properties);
super.connectionStateChanged(state, error);
}
/**
* The base implementation will set the connection variable to the given broker
* if it matches the brokerID and will start to connect to the broker if there
* is no connection established yet.
*/
@Override
public void brokerAdded(String connectionName, MqttBrokerConnection addedConnection) {
if (!connectionName.equals(brokerID) || connection == addedConnection) {
return;
}
this.connection = addedConnection;
super.initialize();
}
@Override
public void brokerRemoved(String connectionName, MqttBrokerConnection removedConnection) {
final MqttBrokerConnection connection = this.connection;
if (removedConnection.equals(connection)) {
connection.removeConnectionObserver(this);
this.connection = null;
updateStatus(ThingStatus.OFFLINE, ThingStatusDetail.CONFIGURATION_ERROR, "@text/offline.sharedremoved");
return;
}
}
@Override
public void initialize() {
this.brokerID = getThing().getConfiguration().get("brokerid").toString();
this.discoveryEnabled = (Boolean) getThing().getConfiguration().get("enableDiscovery");
service.addBrokersListener(this);
connection = service.getBrokerConnection(brokerID);
if (connection == null) {
updateStatus(ThingStatus.OFFLINE, ThingStatusDetail.CONFIGURATION_ERROR,
"@text/offline.notextualconfig [\"" + brokerID + "\"");
return;
}
super.initialize();
}
@Override
public void dispose() {
service.removeBrokersListener(this);
super.dispose();
}
@Override
public boolean discoveryEnabled() {
return discoveryEnabled;
}
}

View File

@@ -0,0 +1,137 @@
/**
* Copyright (c) 2010-2020 Contributors to the openHAB project
*
* See the NOTICE file(s) distributed with this work for additional
* information.
*
* This program and the accompanying materials are made available under the
* terms of the Eclipse Public License 2.0 which is available at
* http://www.eclipse.org/legal/epl-2.0
*
* SPDX-License-Identifier: EPL-2.0
*/
package org.openhab.binding.mqtt.internal;
import java.util.*;
import java.util.stream.Collectors;
import java.util.stream.Stream;
import org.eclipse.jdt.annotation.NonNullByDefault;
import org.eclipse.jdt.annotation.Nullable;
import org.openhab.binding.mqtt.MqttBindingConstants;
import org.openhab.binding.mqtt.discovery.MQTTTopicDiscoveryParticipant;
import org.openhab.binding.mqtt.discovery.MQTTTopicDiscoveryService;
import org.openhab.binding.mqtt.handler.AbstractBrokerHandler;
import org.openhab.binding.mqtt.handler.BrokerHandler;
import org.openhab.binding.mqtt.handler.SystemBrokerHandler;
import org.openhab.core.io.transport.mqtt.MqttService;
import org.openhab.core.thing.Bridge;
import org.openhab.core.thing.Thing;
import org.openhab.core.thing.ThingTypeUID;
import org.openhab.core.thing.binding.BaseThingHandlerFactory;
import org.openhab.core.thing.binding.ThingHandler;
import org.openhab.core.thing.binding.ThingHandlerFactory;
import org.osgi.service.component.annotations.Activate;
import org.osgi.service.component.annotations.Component;
import org.osgi.service.component.annotations.Reference;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
/**
* The {@link MqttBrokerHandlerFactory} is responsible for creating things and thing
* handlers. It keeps reference to all handlers and implements the {@link MQTTTopicDiscoveryService} service
* interface, so service consumers can subscribe to a topic on all available broker connections.
*
* @author David Graeff - Initial contribution
*/
@NonNullByDefault
@Component(service = { ThingHandlerFactory.class,
MQTTTopicDiscoveryService.class }, configurationPid = "MqttBrokerHandlerFactory")
public class MqttBrokerHandlerFactory extends BaseThingHandlerFactory implements MQTTTopicDiscoveryService {
private static final Set<ThingTypeUID> SUPPORTED_THING_TYPES_UIDS = Stream
.of(MqttBindingConstants.BRIDGE_TYPE_SYSTEMBROKER, MqttBindingConstants.BRIDGE_TYPE_BROKER)
.collect(Collectors.toSet());
private final Logger logger = LoggerFactory.getLogger(MqttBrokerHandlerFactory.class);
protected final Map<String, List<MQTTTopicDiscoveryParticipant>> discoveryTopics = new HashMap<>();
protected final Set<AbstractBrokerHandler> handlers = Collections
.synchronizedSet(Collections.newSetFromMap(new WeakHashMap<>()));
private MqttService mqttService;
@Activate
public MqttBrokerHandlerFactory(@Reference MqttService mqttService) {
this.mqttService = mqttService;
}
@Override
public boolean supportsThingType(ThingTypeUID thingTypeUID) {
return SUPPORTED_THING_TYPES_UIDS.contains(thingTypeUID);
}
/**
* Add the given broker connection to all listeners.
*/
protected void createdHandler(AbstractBrokerHandler handler) {
handlers.add(handler);
discoveryTopics.forEach((topic, listenerList) -> {
listenerList.forEach(listener -> {
handler.registerDiscoveryListener(listener, topic);
});
});
}
@Override
protected @Nullable ThingHandler createHandler(Thing thing) {
if (mqttService == null) {
throw new IllegalStateException("MqttService must be bound, before ThingHandlers can be created");
}
if (!(thing instanceof Bridge)) {
throw new IllegalStateException("A bridge type is expected");
}
final ThingTypeUID thingTypeUID = thing.getThingTypeUID();
final AbstractBrokerHandler handler;
if (thingTypeUID.equals(MqttBindingConstants.BRIDGE_TYPE_SYSTEMBROKER)) {
handler = new SystemBrokerHandler((Bridge) thing, mqttService);
} else if (thingTypeUID.equals(MqttBindingConstants.BRIDGE_TYPE_BROKER)) {
handler = new BrokerHandler((Bridge) thing);
} else {
throw new IllegalStateException("Not supported " + thingTypeUID.toString());
}
createdHandler(handler);
return handler;
}
/**
* This factory also implements {@link MQTTTopicDiscoveryService} so consumers can subscribe to
* a MQTT topic that is registered on all available broker connections.
*/
@Override
public void subscribe(MQTTTopicDiscoveryParticipant listener, String topic) {
List<MQTTTopicDiscoveryParticipant> listenerList = discoveryTopics.computeIfAbsent(topic,
t -> new ArrayList<>());
listenerList.add(listener);
handlers.forEach(broker -> broker.registerDiscoveryListener(listener, topic));
}
/**
* Unsubscribe a listener from all available broker connections.
*/
@Override
@SuppressWarnings("null")
public void unsubscribe(MQTTTopicDiscoveryParticipant listener) {
discoveryTopics.forEach((topic, listenerList) -> {
listenerList.remove(listener);
handlers.forEach(broker -> broker.unregisterDiscoveryListener(listener, topic));
});
}
@Override
public void publish(String topic, byte[] payload) {
handlers.forEach(handler -> {
handler.getConnectionAsync().thenAccept(connection -> {
connection.publish(topic, payload);
});
});
}
}

View File

@@ -0,0 +1,45 @@
/**
* Copyright (c) 2010-2020 Contributors to the openHAB project
*
* See the NOTICE file(s) distributed with this work for additional
* information.
*
* This program and the accompanying materials are made available under the
* terms of the Eclipse Public License 2.0 which is available at
* http://www.eclipse.org/legal/epl-2.0
*
* SPDX-License-Identifier: EPL-2.0
*/
package org.openhab.binding.mqtt.internal;
import org.openhab.binding.mqtt.MqttBindingConstants;
import org.openhab.core.thing.ThingUID;
/**
* Maps a Mqtt broker URL to a possible ThingUID id.
*
* @author David Graeff - Initial contribution
*/
public class MqttThingID {
/**
* Convert the url (tcp://122.123.111.123:1883) to a version without colons, dots or slashes
* for creating a ThingUID with that string (tcp_122_123_111_123_1883).
*/
public static String getThingID(String host, int port) {
return host.replace('.', '_').replace("://", "_") + "_" + String.valueOf(port);
}
/**
* Creates a normal broker thing with an url that will be converted to a thing ID.
*
* @param url The broker connection url
* @return Returns a ThingUID
*/
public static ThingUID getThingUID(String host, int port) {
return new ThingUID(MqttBindingConstants.BRIDGE_TYPE_BROKER, getThingID(host, port));
}
public static ThingUID getTextualThingUID(String host, int port) {
return new ThingUID(MqttBindingConstants.BRIDGE_TYPE_SYSTEMBROKER, getThingID(host, port));
}
}

View File

@@ -0,0 +1,116 @@
/**
* Copyright (c) 2010-2020 Contributors to the openHAB project
*
* See the NOTICE file(s) distributed with this work for additional
* information.
*
* This program and the accompanying materials are made available under the
* terms of the Eclipse Public License 2.0 which is available at
* http://www.eclipse.org/legal/epl-2.0
*
* SPDX-License-Identifier: EPL-2.0
*/
package org.openhab.binding.mqtt.internal.discovery;
import java.util.HashMap;
import java.util.Map;
import java.util.stream.Collectors;
import java.util.stream.Stream;
import org.openhab.binding.mqtt.MqttBindingConstants;
import org.openhab.core.config.discovery.AbstractDiscoveryService;
import org.openhab.core.config.discovery.DiscoveryResultBuilder;
import org.openhab.core.config.discovery.DiscoveryService;
import org.openhab.core.io.transport.mqtt.MqttBrokerConnection;
import org.openhab.core.io.transport.mqtt.MqttService;
import org.openhab.core.io.transport.mqtt.MqttServiceObserver;
import org.openhab.core.thing.ThingUID;
import org.osgi.service.component.annotations.Activate;
import org.osgi.service.component.annotations.Component;
import org.osgi.service.component.annotations.Deactivate;
import org.osgi.service.component.annotations.Reference;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
/**
* The {@link MqttServiceDiscoveryService} is responsible for discovering connections on
* the MqttService shared connection pool.
*
* @author David Graeff - Initial contribution
*/
@Component(immediate = true, service = DiscoveryService.class, configurationPid = "discovery.systemmqttbroker")
public class MqttServiceDiscoveryService extends AbstractDiscoveryService implements MqttServiceObserver {
private final Logger logger = LoggerFactory.getLogger(MqttServiceDiscoveryService.class);
MqttService mqttService;
public MqttServiceDiscoveryService() {
super(Stream.of(MqttBindingConstants.BRIDGE_TYPE_SYSTEMBROKER, MqttBindingConstants.BRIDGE_TYPE_BROKER)
.collect(Collectors.toSet()), 0, true);
}
@Override
@Activate
protected void activate(Map<String, Object> config) {
super.activate(config);
}
@Override
@Deactivate
protected void deactivate() {
super.deactivate();
}
@Reference
public void setMqttService(MqttService service) {
mqttService = service;
}
public void unsetMqttService(MqttService service) {
mqttService = null;
}
@Override
protected void startScan() {
mqttService.addBrokersListener(this);
mqttService.getAllBrokerConnections().forEach((brokerId, broker) -> brokerAdded(brokerId, broker));
stopScan();
}
@Override
protected void startBackgroundDiscovery() {
if (mqttService == null) {
return;
}
mqttService.addBrokersListener(this);
mqttService.getAllBrokerConnections().forEach((brokerId, broker) -> brokerAdded(brokerId, broker));
}
@Override
protected void stopBackgroundDiscovery() {
if (mqttService == null) {
return;
}
mqttService.removeBrokersListener(this);
}
@Override
public void brokerAdded(String brokerId, MqttBrokerConnection broker) {
logger.trace("Found broker connection {}", brokerId);
Map<String, Object> properties = new HashMap<>();
properties.put("host", broker.getHost());
properties.put("port", broker.getPort());
properties.put("brokerid", brokerId);
ThingUID thingUID;
thingUID = new ThingUID(MqttBindingConstants.BRIDGE_TYPE_SYSTEMBROKER, brokerId);
thingDiscovered(DiscoveryResultBuilder.create(thingUID).withProperties(properties)
.withRepresentationProperty("brokerid").withLabel("MQTT Broker").build());
}
@Override
public void brokerRemoved(String brokerId, MqttBrokerConnection broker) {
ThingUID thingUID;
thingUID = new ThingUID(MqttBindingConstants.BRIDGE_TYPE_SYSTEMBROKER, brokerId);
thingRemoved(thingUID);
}
}

View File

@@ -0,0 +1,116 @@
/**
* Copyright (c) 2010-2020 Contributors to the openHAB project
*
* See the NOTICE file(s) distributed with this work for additional
* information.
*
* This program and the accompanying materials are made available under the
* terms of the Eclipse Public License 2.0 which is available at
* http://www.eclipse.org/legal/epl-2.0
*
* SPDX-License-Identifier: EPL-2.0
*/
package org.openhab.binding.mqtt.internal.ssl;
import java.util.Arrays;
import org.eclipse.jdt.annotation.NonNullByDefault;
import org.eclipse.jdt.annotation.Nullable;
/**
* A pin is either a public key pin or certificate pin and consists of the binary data
* and the used hash algorithm.
*
* @author David Graeff - Initial contribution
*/
@NonNullByDefault
public class Pin {
protected byte @Nullable [] pinData;
protected @Nullable PinMessageDigest hashDigest;
protected boolean learning;
protected final PinType type;
/**
* To simplify the creating of a Pin instance, you can use the factory
* methods `newCertificatePin` and `newPublicKeyPin` of this class.
*
* @param type The pin type
* @param hashDigest The hash method
* @param learning If the Pin is in learning mode.
* @param pinData The pinned data
*/
Pin(PinType type, @Nullable PinMessageDigest hashDigest, boolean learning, byte @Nullable [] pinData) {
this.type = type;
this.hashDigest = hashDigest;
this.learning = learning;
this.pinData = pinData;
}
public PinType getType() {
return type;
}
public byte @Nullable [] getHash() {
return pinData;
}
public void setLearningMode() {
this.learning = true;
this.pinData = null;
}
/**
* This sets the pin instance to checking mode. The given
* data is expected to be hashed in the Pins hashMethod.
*
* @param pinMessageDigest The signature algorithm message digest
* @param data For instance SHA-256 hash data
*/
public void setCheckMode(PinMessageDigest pinMessageDigest, byte[] data) {
this.hashDigest = pinMessageDigest;
this.learning = false;
this.pinData = data;
}
public static Pin LearningPin(PinType pinType) {
return new Pin(pinType, null, true, null);
}
public static Pin CheckingPin(PinType pinType, PinMessageDigest method, byte[] pinData) {
return new Pin(pinType, method, false, pinData);
}
/**
* Returns true if this pin is still learning.
*/
public boolean isLearning() {
return learning;
}
/**
* This method is used to determine if the given digest is equal to the
* one of this Pin. If this Pin is still learning, it will always return true.
*
* @param digestData SHA256 hash data of a public key or a certificate.
* @return Returns true if equal
*/
public boolean isEqual(byte[] digestData) {
if (learning) {
return true;
}
return Arrays.equals(pinData, digestData);
}
@Override
public String toString() {
byte[] pinData = this.pinData;
PinMessageDigest hashDigest = this.hashDigest;
if (hashDigest != null && pinData != null) {
return type.name() + ":" + hashDigest.toHexString(pinData);
} else if (hashDigest != null) {
return type.name() + ":" + hashDigest.getMethod();
} else {
return type.name();
}
}
}

View File

@@ -0,0 +1,64 @@
/**
* Copyright (c) 2010-2020 Contributors to the openHAB project
*
* See the NOTICE file(s) distributed with this work for additional
* information.
*
* This program and the accompanying materials are made available under the
* terms of the Eclipse Public License 2.0 which is available at
* http://www.eclipse.org/legal/epl-2.0
*
* SPDX-License-Identifier: EPL-2.0
*/
package org.openhab.binding.mqtt.internal.ssl;
import java.security.MessageDigest;
import java.security.NoSuchAlgorithmException;
import org.eclipse.jdt.annotation.NonNullByDefault;
import org.openhab.core.util.HexUtils;
/**
* Encapsulates a {@link MessageDigest} with a specific Hash method. Extracts the digest data of
* a certificate.
*
* @author David Graeff - Initial contribution
*/
@NonNullByDefault
public class PinMessageDigest {
protected final MessageDigest messageDigest;
private final String method;
/**
* Creates a message digest for a certificate/public key pinning.
*
* @param method The hash method to use
* @throws NoSuchAlgorithmException
*/
public PinMessageDigest(String method) throws NoSuchAlgorithmException {
this.method = method;
this.messageDigest = MessageDigest.getInstance(method);
}
/**
* Outputs a string like "SHA-256:83F9171E06A313118889F7D79302BD1B7A2042EE0CFD029ABF8DD06FFA6CD9D3"
*
* @param digestData Digest data
*/
public String toHexString(byte[] digestData) {
return getMethod() + ":" + HexUtils.bytesToHex(digestData);
}
byte[] digest(byte[] origData) {
synchronized (messageDigest) {
return messageDigest.digest(origData);
}
}
/**
* @return Return the digest method for instance SHA-256
*/
public String getMethod() {
return method;
}
}

View File

@@ -0,0 +1,164 @@
/**
* Copyright (c) 2010-2020 Contributors to the openHAB project
*
* See the NOTICE file(s) distributed with this work for additional
* information.
*
* This program and the accompanying materials are made available under the
* terms of the Eclipse Public License 2.0 which is available at
* http://www.eclipse.org/legal/epl-2.0
*
* SPDX-License-Identifier: EPL-2.0
*/
package org.openhab.binding.mqtt.internal.ssl;
import java.net.Socket;
import java.security.NoSuchAlgorithmException;
import java.security.cert.CertificateEncodingException;
import java.security.cert.CertificateException;
import java.security.cert.X509Certificate;
import java.util.ArrayList;
import java.util.List;
import java.util.regex.Matcher;
import java.util.regex.Pattern;
import javax.net.ssl.SSLEngine;
import javax.net.ssl.X509ExtendedTrustManager;
import org.eclipse.jdt.annotation.NonNullByDefault;
import org.eclipse.jdt.annotation.Nullable;
/**
* This is a custom {@link X509ExtendedTrustManager}. {@link Pin} objects can be added and will
* be used in the checkServerTrusted() method to determine if a connection can be trusted.
*
* @author David Graeff - Initial contribution
*/
@NonNullByDefault
public class PinTrustManager extends X509ExtendedTrustManager {
List<Pin> pins = new ArrayList<>();
protected @Nullable PinnedCallback callback;
/**
* Adds a pin (certificate key, public key) to the trust manager. If a connections has assigned pins,
* it will not accept any other certificates or public keys anymore!
*
* @param pin The pin
*/
public void addPinning(Pin pin) {
pins.add(pin);
}
public void setCallback(PinnedCallback callback) {
this.callback = callback;
}
@Override
public void checkClientTrusted(X509Certificate @Nullable [] chain, @Nullable String authType)
throws CertificateException {
throw new UnsupportedOperationException();
}
protected byte[] getEncoded(PinType type, X509Certificate cert) throws CertificateEncodingException {
switch (type) {
case CERTIFICATE_TYPE:
return cert.getEncoded();
case PUBLIC_KEY_TYPE:
return cert.getPublicKey().getEncoded();
}
throw new CertificateEncodingException("Type unknown");
}
/**
* A signature name depends on the security provider but usually follows
* https://docs.oracle.com/javase/8/docs/technotes/guides/security/StandardNames.html#Signature.
* E.g.: "SHA256withRSA". We need "SHA" and "256" to initialize a {@link PinMessageDigest}.
*/
PinMessageDigest getMessageDigestForSigAlg(String sigAlg) throws CertificateException {
final Matcher matcher = Pattern.compile("(\\D*)(\\d+)").matcher(sigAlg);
matcher.find();
final String sigAlgName = matcher.group(1);
final String sigAlgBits = matcher.group(2);
try {
return new PinMessageDigest(sigAlgName + "-" + sigAlgBits);
} catch (NoSuchAlgorithmException e) {
throw new CertificateException(e);
}
}
@Override
public void checkServerTrusted(X509Certificate @Nullable [] chainN, @Nullable String authType)
throws CertificateException {
X509Certificate[] chain = chainN;
if (chain == null) {
return;
}
final PinMessageDigest digestForSigAlg = getMessageDigestForSigAlg(chain[0].getSigAlgName());
final PinnedCallback callback = this.callback;
// All pins have to accept the connection
for (Pin pin : pins) {
byte[] origData = getEncoded(pin.getType(), chain[0]);
// If in learning mode: Learn new signature algorithm and hash and notify listeners
if (pin.isLearning()) {
pin.setCheckMode(digestForSigAlg, digestForSigAlg.digest(origData));
if (callback != null) {
callback.pinnedLearnedHash(pin);
}
continue;
} else {
final PinMessageDigest hashDigest = pin.hashDigest;
if (hashDigest == null) {
throw new CertificateException("No hashDigest given!");
}
// Check if hash is equal
final byte[] digestData = hashDigest.digest(origData);
if (pin.isEqual(digestData)) {
continue;
}
// This pin does not accept the connection
if (callback != null) {
callback.pinnedConnectionDenied(pin);
}
throw new CertificateException(pin.getType().name() + " pinning denied access. Destination pin is "
+ hashDigest.toHexString(digestData) + "' but expected: " + pin.toString());
}
}
// All pin instances passed, the connection is accepted
if (callback != null) {
callback.pinnedConnectionAccepted();
}
}
@Override
public X509Certificate[] getAcceptedIssuers() {
return new X509Certificate[0];
}
@Override
public void checkClientTrusted(X509Certificate @Nullable [] chain, @Nullable String authType,
@Nullable Socket socket) throws CertificateException {
checkClientTrusted(chain, authType);
}
@Override
public void checkClientTrusted(X509Certificate @Nullable [] chain, @Nullable String authType,
@Nullable SSLEngine sslEngine) throws CertificateException {
checkClientTrusted(chain, authType);
}
@Override
public void checkServerTrusted(X509Certificate @Nullable [] chain, @Nullable String authType,
@Nullable Socket socket) throws CertificateException {
checkServerTrusted(chain, authType);
}
@Override
public void checkServerTrusted(X509Certificate @Nullable [] chain, @Nullable String authType,
@Nullable SSLEngine sslEngine) throws CertificateException {
checkServerTrusted(chain, authType);
}
}

View File

@@ -0,0 +1,23 @@
/**
* Copyright (c) 2010-2020 Contributors to the openHAB project
*
* See the NOTICE file(s) distributed with this work for additional
* information.
*
* This program and the accompanying materials are made available under the
* terms of the Eclipse Public License 2.0 which is available at
* http://www.eclipse.org/legal/epl-2.0
*
* SPDX-License-Identifier: EPL-2.0
*/
package org.openhab.binding.mqtt.internal.ssl;
/**
* A {@link Pin} is either a Public Key or Certificate Pin.
*
* @author David Graeff - Initial contribution
*/
public enum PinType {
PUBLIC_KEY_TYPE,
CERTIFICATE_TYPE
}

View File

@@ -0,0 +1,44 @@
/**
* Copyright (c) 2010-2020 Contributors to the openHAB project
*
* See the NOTICE file(s) distributed with this work for additional
* information.
*
* This program and the accompanying materials are made available under the
* terms of the Eclipse Public License 2.0 which is available at
* http://www.eclipse.org/legal/epl-2.0
*
* SPDX-License-Identifier: EPL-2.0
*/
package org.openhab.binding.mqtt.internal.ssl;
import org.eclipse.jdt.annotation.NonNullByDefault;
/**
* Implement this to be notified by the {@link PinTrustManager} if a connection was
* accepted or denied and if a Pin switched from learning mode to checking mode.
*
* @author David Graeff - Initial contribution
*/
@NonNullByDefault
public interface PinnedCallback {
/**
* A public key or certificate hash has been learned. The given pin can be switched
* to checking mode now.
*
* @param pin Public Key or Certificate pin
*/
void pinnedLearnedHash(Pin pin);
/**
* A connection has been accepted
*/
void pinnedConnectionAccepted();
/**
* A connection has been denied
*
* @param pin The pin object that denied the connection
*/
void pinnedConnectionDenied(Pin pin);
}

View File

@@ -0,0 +1,10 @@
<?xml version="1.0" encoding="UTF-8"?>
<binding:binding id="mqtt" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xmlns:binding="https://openhab.org/schemas/binding/v1.0.0"
xsi:schemaLocation="https://openhab.org/schemas/binding/v1.0.0 https://openhab.org/schemas/binding-1.0.0.xsd">
<name>MQTT Binding</name>
<description>Allows management of MQTT broker connections and linking
of MQTT topics to Things and Channels</description>
<author>David Graeff</author>
</binding:binding>

View File

@@ -0,0 +1,16 @@
binding.mqtt.name = MQTT Binding
binding.mqtt.description = Erlaubt die Verwaltung von MQTT Verbindungen und das Verknüpfen von MQTT Topics
offline.notextualconfig=Die Systemverbindung mit dem Namen {0} existiert nicht mehr.
offline.dyninsteadoftextual=Eine dynamische Verbindung wurde gefunden, statt der erwarteten Systemverbindung: {0}.
offline.textualinsteadofdny=Eine Systemverbindung wurde gefunden, statt der erwarteten dynamischen Verbindung: {0}.
offline.sharedremoved=Eine andere Erweiterung hat unerwartet die Broker Verbindung entfernt.
actionLabel=sende eine MQTT Nachricht
actionDesc=Sendet einen Wert an das gegebene MQTT Topic.
actionInputTopicLabel=MQTT Topic
actionInputTopicDesc=Ein MQTT Topic an welches der Wert versendet wird
actionInputValueLabel=Wert
actionInputValueDesc=Der zu versendende Wert
actionInputRetainLabel=Retain
actionInputRetainDesc=Ob die Nachricht retained werden soll

View File

@@ -0,0 +1,14 @@
offline.notextualconfig=The system connection with the name {0} doesn't exist anymore.
offline.dyninsteadoftextual=A binding owned connection was found instead of a system connection for the broker name: {0}.
offline.textualinsteadofdny=A system connection was found instead of a dynamic connection for the broker name: {0}.
offline.sharedremoved=Another binding unexpectedly removed the internal broker connection.
actionLabel=publish an MQTT message
actionDesc=Publishes a value to the given MQTT topic.
actionInputTopicLabel=MQTT Topic
actionInputTopicDesc=The topic to publish a value to.
actionInputValueLabel=Value
actionInputValueDesc=The value to publish
actionInputRetainLabel=Retain
actionInputRetainDesc=Retain message

View File

@@ -0,0 +1,201 @@
<?xml version="1.0" encoding="UTF-8"?>
<thing:thing-descriptions bindingId="mqtt"
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xmlns:thing="https://openhab.org/schemas/thing-description/v1.0.0"
xsi:schemaLocation="https://openhab.org/schemas/thing-description/v1.0.0 https://openhab.org/schemas/thing-description-1.0.0.xsd">
<bridge-type id="broker" extensible="publishTrigger">
<label>MQTT Broker</label>
<description>A connection to a MQTT broker</description>
<config-description>
<parameter name="host" type="text" required="true">
<label>Broker Hostname/IP</label>
<description>The IP/Hostname of the MQTT broker</description>
<context>network-address</context>
</parameter>
<parameter name="port" type="integer">
<label>Broker Port</label>
<description>The port is optional, if none is provided, the typical
ports 1883 and 8883 (SSL) are used.</description>
<context>network-address</context>
<advanced>true</advanced>
</parameter>
<parameter name="secure" type="boolean" required="true">
<label>Secure Connection</label>
<description>Uses TLS/SSL to establish a secure connection to the broker.</description>
<default>false</default>
</parameter>
<parameter name="qos" type="integer">
<label>Quality of Service</label>
<options>
<option value="0">At most once (0)</option>
<option value="1">At least once (1)</option>
<option value="2">Exactly once (2)</option>
</options>
<default>0</default>
<advanced>true</advanced>
</parameter>
<parameter name="clientID" type="text">
<label>Client ID</label>
<description>Use a fixed client ID. Defaults to empty which means a
client ID is generated for this connection.</description>
<advanced>true</advanced>
</parameter>
<parameter name="reconnectTime" type="integer">
<label>Reconnect Time</label>
<description>Reconnect time in ms. If a connection is lost, the binding will wait this time before it tries to
reconnect.</description>
<default>60000</default>
<advanced>true</advanced>
</parameter>
<parameter name="keepAlive" type="integer">
<label>Heartbeat</label>
<description>Keep alive / heartbeat timer in s. It can take up to this time to determine if a server connection is
lost. A lower value may keep the broker unnecessarily busy for no or little additional value.</description>
<default>60</default>
<advanced>true</advanced>
</parameter>
<parameter name="lwtMessage" type="text">
<label>Last Will Message</label>
<description>The last will message.</description>
<advanced>true</advanced>
</parameter>
<parameter name="lwtTopic" type="text">
<label>Last Will Topic</label>
<description>Defaults to empty and therefore disables the last will.</description>
<advanced>true</advanced>
</parameter>
<parameter name="lwtQos" type="integer">
<label>Last Will QoS</label>
<description>The quality of service parameter of the last will.</description>
<options>
<option value="0">At most once (0)</option>
<option value="1">At least once (1)</option>
<option value="2">Exactly once (2)</option>
</options>
<default>0</default>
<advanced>true</advanced>
</parameter>
<parameter name="lwtRetain" type="boolean">
<label>Last Will Retain</label>
<description>True if last Will should be retained (defaults to false)</description>
<default>true</default>
<advanced>true</advanced>
</parameter>
<parameter name="username" type="text">
<label>Username</label>
<description>The MQTT username</description>
<advanced>true</advanced>
</parameter>
<parameter name="password" type="text">
<label>Password</label>
<description>The MQTT password</description>
<context>password</context>
<advanced>true</advanced>
</parameter>
<parameter name="certificatepin" type="boolean">
<label>Certificate Pinning</label>
<description>If this and SSL is set: After the next connection has been successfully established, the certificate is
pinned. The connection will be refused if another certificate is used. Clear **certificate** to allow a new
certificate for the next connection attempt. This option can increase security.</description>
<default>true</default>
<advanced>true</advanced>
</parameter>
<parameter name="certificate" type="text">
<label>Certificate Hash</label>
<description>If **certificatepin** is set this hash is used to verify the connection. Clear to allow a new
certificate pinning on the next connection attempt. If empty will be filled automatically by the next successful
connection. An example input would be `SHA-256:83F9171E06A313118889F7D79302BD1B7A2042EE0CFD029ABF8DD06FFA6CD9D3`.</description>
<advanced>true</advanced>
</parameter>
<parameter name="publickeypin" type="boolean">
<label>Public Key Pinning</label>
<description>If this and SSL is set: After the next connection has been successfully established, the public key of
the broker is pinned. The connection will be refused if another public key is used. Clear **publickey** to allow a
new public key for the next connection attempt. This option can increase security.</description>
<default>true</default>
<advanced>true</advanced>
</parameter>
<parameter name="publickey" type="text">
<label>Public Key Hash</label>
<description>If **publickeypin** is set this hash is used to verify the connection. Clear to allow a new public key
pinning on the next connection attempt. If empty will be filled automatically by the next successful connection. An
example input would be `SHA-256:83F9171E06A313118889F7D79302BD1B7A2042EE0CFD029ABF8DD06FFA6CD9D3`</description>
<advanced>true</advanced>
</parameter>
<parameter name="enableDiscovery" type="boolean">
<label>Enable Discovery</label>
<description>If set to true enables this broker for all discovery services.</description>
<advanced>true</advanced>
<default>true</default>
</parameter>
</config-description>
</bridge-type>
<bridge-type id="systemBroker" extensible="publishTrigger">
<label>System MQTT Broker</label>
<description>A system configured and therefore read-only broker connection. Properties are reflecting the
configuration and internal connection status.</description>
<properties>
<property name="url"/>
<property name="username"/>
<property name="password"/>
<property name="qos"/>
<property name="retain"/>
<property name="lastwill"/>
<property name="reconnect_time_ms"/>
<property name="keep_alive_time_ms"/>
</properties>
<config-description>
<parameter name="brokerid" type="text" required="true">
<label>Broker ID</label>
<description>Each system wide configured MQTT broker has a unique broker ID.</description>
</parameter>
<parameter name="enableDiscovery" type="boolean">
<label>Enable Discovery</label>
<description>If set to true enables this broker for all discovery services.</description>
<advanced>true</advanced>
<default>true</default>
</parameter>
</config-description>
</bridge-type>
<channel-type id="publishTrigger">
<kind>trigger</kind>
<label>Publish Trigger</label>
<description>This channel is triggered when a value is published to the configured MQTT topic on this broker
connection. The event payload will be the received MQTT topic value.</description>
<event></event>
<config-description>
<parameter name="stateTopic" type="text" required="true">
<label>MQTT Topic</label>
<description>This channel will trigger on this MQTT topic. This topic can contain wildcards like + and # for example
"all/in/#" or "sensors/+/config".</description>
</parameter>
<parameter name="payload" type="text" required="false">
<label>Payload Condition</label>
<description>An optional condition on the value of the MQTT topic that must match before this channel is triggered.</description>
</parameter>
<parameter name="separator" type="text" required="false">
<label>Separator Character</label>
<description>The trigger channel payload usually only contains the received MQTT topic value. If you define a
separator character, for example '#', the topic and received value will be in the trigger channel payload. For
example: my_topic#my_received_value.</description>
</parameter>
</config-description>
</channel-type>
</thing:thing-descriptions>

View File

@@ -0,0 +1,104 @@
/**
* Copyright (c) 2010-2020 Contributors to the openHAB project
*
* See the NOTICE file(s) distributed with this work for additional
* information.
*
* This program and the accompanying materials are made available under the
* terms of the Eclipse Public License 2.0 which is available at
* http://www.eclipse.org/legal/epl-2.0
*
* SPDX-License-Identifier: EPL-2.0
*/
package org.openhab.binding.mqtt.handler;
import static org.hamcrest.CoreMatchers.is;
import static org.junit.Assert.*;
import static org.mockito.ArgumentMatchers.*;
import static org.mockito.Mockito.*;
import java.util.Collections;
import org.junit.Before;
import org.junit.Test;
import org.mockito.Mock;
import org.mockito.MockitoAnnotations;
import org.openhab.binding.mqtt.internal.MqttThingID;
import org.openhab.core.config.core.Configuration;
import org.openhab.core.io.transport.mqtt.MqttBrokerConnection;
import org.openhab.core.io.transport.mqtt.MqttConnectionState;
import org.openhab.core.io.transport.mqtt.MqttException;
import org.openhab.core.io.transport.mqtt.MqttService;
import org.openhab.core.thing.Bridge;
import org.openhab.core.thing.binding.ThingHandlerCallback;
import org.osgi.service.cm.ConfigurationException;
/**
* Tests cases for {@link org.openhab.binding.mqtt.handler.AbstractBrokerHandler}.
*
* @author David Graeff - Initial contribution
*/
public class AbstractBrokerHandlerTest {
private final String HOST = "tcp://123.1.2.3";
private final int PORT = 80;
private SystemBrokerHandler handler;
int stateChangeCounter = 0;
@Mock
private ThingHandlerCallback callback;
@Mock
private Bridge thing;
@Mock
private MqttService service;
@Before
public void setUp() {
MockitoAnnotations.initMocks(this);
doReturn(MqttThingID.getThingUID(HOST, PORT)).when(thing).getUID();
doReturn(new Configuration(Collections.singletonMap("brokerid", MqttThingID.getThingUID(HOST, PORT).getId())))
.when(thing).getConfiguration();
handler = new SystemBrokerHandler(thing, service);
handler.setCallback(callback);
assertThat(handler.getThing().getConfiguration().get("brokerid"), is(MqttThingID.getThingID(HOST, PORT)));
stateChangeCounter = 0;
}
@Test
public void brokerAddedWrongID() throws ConfigurationException, MqttException {
MqttBrokerConnection brokerConnection = mock(MqttBrokerConnection.class);
when(brokerConnection.connectionState()).thenReturn(MqttConnectionState.CONNECTED);
handler.brokerAdded("nonsense_id", brokerConnection);
assertNull(handler.connection);
// We do not expect a status change, because brokerAdded will do nothing with invalid connections.
verify(callback, times(0)).statusUpdated(any(), any());
}
@Test
public void brokerRemovedBroker() throws ConfigurationException, MqttException {
MqttBrokerConnectionEx connection = spy(
new MqttBrokerConnectionEx("10.10.0.10", 80, false, "BrokerHandlerTest"));
handler.brokerAdded(handler.brokerID, connection);
assertThat(handler.connection, is(connection));
handler.brokerRemoved("something", connection);
assertNull(handler.connection);
}
@Test
public void brokerAdded() throws ConfigurationException, MqttException {
MqttBrokerConnectionEx connection = spy(
new MqttBrokerConnectionEx("10.10.0.10", 80, false, "BrokerHandlerTest"));
doReturn(connection).when(service).getBrokerConnection(eq(handler.brokerID));
verify(callback, times(0)).statusUpdated(any(), any());
handler.brokerAdded(handler.brokerID, connection);
assertThat(handler.connection, is(connection));
verify(connection).start();
// First connecting then connected and another connected after the future completes
verify(callback, times(3)).statusUpdated(any(), any());
}
}

View File

@@ -0,0 +1,44 @@
/**
* Copyright (c) 2010-2020 Contributors to the openHAB project
*
* See the NOTICE file(s) distributed with this work for additional
* information.
*
* This program and the accompanying materials are made available under the
* terms of the Eclipse Public License 2.0 which is available at
* http://www.eclipse.org/legal/epl-2.0
*
* SPDX-License-Identifier: EPL-2.0
*/
package org.openhab.binding.mqtt.handler;
import static org.mockito.Mockito.verify;
import org.eclipse.jdt.annotation.NonNull;
import org.mockito.Mockito;
import org.openhab.core.io.transport.mqtt.MqttBrokerConnection;
import org.openhab.core.thing.Bridge;
/**
* Overwrite BrokerHandler to return our mocked/extended MqttBrokerConnection in
* {@link #createBrokerConnection()}.
*
* @author David Graeff - Initial contribution
*/
public class BrokerHandlerEx extends BrokerHandler {
final MqttBrokerConnectionEx e;
public BrokerHandlerEx(Bridge thing, MqttBrokerConnectionEx e) {
super(thing);
this.e = e;
}
@Override
protected @NonNull MqttBrokerConnection createBrokerConnection() throws IllegalArgumentException {
return e;
}
public static void verifyCreateBrokerConnection(BrokerHandler handler, int times) {
verify(handler, Mockito.times(times)).createBrokerConnection();
}
}

View File

@@ -0,0 +1,140 @@
/**
* Copyright (c) 2010-2020 Contributors to the openHAB project
*
* See the NOTICE file(s) distributed with this work for additional
* information.
*
* This program and the accompanying materials are made available under the
* terms of the Eclipse Public License 2.0 which is available at
* http://www.eclipse.org/legal/epl-2.0
*
* SPDX-License-Identifier: EPL-2.0
*/
package org.openhab.binding.mqtt.handler;
import static org.hamcrest.CoreMatchers.is;
import static org.junit.Assert.assertThat;
import static org.mockito.ArgumentMatchers.*;
import static org.mockito.Mockito.*;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.ScheduledThreadPoolExecutor;
import org.junit.After;
import org.junit.Assert;
import org.junit.Before;
import org.junit.Test;
import org.mockito.ArgumentCaptor;
import org.mockito.Mock;
import org.mockito.MockitoAnnotations;
import org.openhab.binding.mqtt.internal.MqttThingID;
import org.openhab.core.config.core.Configuration;
import org.openhab.core.io.transport.mqtt.MqttBrokerConnection;
import org.openhab.core.io.transport.mqtt.MqttConnectionState;
import org.openhab.core.io.transport.mqtt.MqttException;
import org.openhab.core.io.transport.mqtt.MqttService;
import org.openhab.core.thing.Bridge;
import org.openhab.core.thing.ThingStatus;
import org.openhab.core.thing.ThingStatusInfo;
import org.openhab.core.thing.binding.ThingHandlerCallback;
import org.osgi.service.cm.ConfigurationException;
/**
* Test cases for {@link BrokerHandler}.
*
* @author David Graeff - Initial contribution
*/
public class BrokerHandlerTest {
private ScheduledExecutorService scheduler;
@Mock
private ThingHandlerCallback callback;
@Mock
private Bridge thing;
@Mock
private MqttService service;
private MqttBrokerConnectionEx connection;
private BrokerHandler handler;
@Before
public void setUp() throws ConfigurationException, MqttException {
scheduler = new ScheduledThreadPoolExecutor(1);
MockitoAnnotations.initMocks(this);
when(thing.getUID()).thenReturn(MqttThingID.getThingUID("10.10.0.10", 80));
connection = spy(new MqttBrokerConnectionEx("10.10.0.10", 80, false, "BrokerHandlerTest"));
connection.setTimeoutExecutor(scheduler, 10);
connection.setConnectionCallback(connection);
Configuration config = new Configuration();
when(thing.getConfiguration()).thenReturn(config);
handler = spy(new BrokerHandlerEx(thing, connection));
handler.setCallback(callback);
}
@After
public void tearDown() {
scheduler.shutdownNow();
}
@Test(expected = IllegalArgumentException.class)
public void handlerInitWithoutUrl()
throws InterruptedException, IllegalArgumentException, MqttException, ConfigurationException {
// Assume it is a real handler and not a mock as defined above
handler = new BrokerHandler(thing);
assertThat(initializeHandlerWaitForTimeout(), is(true));
}
@Test
public void createBrokerConnection() {
Configuration config = new Configuration();
config.put("host", "10.10.0.10");
config.put("port", 80);
when(thing.getConfiguration()).thenReturn(config);
handler.initialize();
verify(handler).createBrokerConnection();
}
@Test
public void handlerInit()
throws InterruptedException, IllegalArgumentException, MqttException, ConfigurationException {
assertThat(initializeHandlerWaitForTimeout(), is(true));
ArgumentCaptor<ThingStatusInfo> statusInfoCaptor = ArgumentCaptor.forClass(ThingStatusInfo.class);
verify(callback, atLeast(3)).statusUpdated(eq(thing), statusInfoCaptor.capture());
Assert.assertThat(statusInfoCaptor.getValue().getStatus(), is(ThingStatus.ONLINE));
}
/**
* Utility method for tests that need the handler to be initialized to go on.
*
* @return Return true if successful. You usually want to use:
* assertThat(initializeHandlerWaitForTimeout(), is(true));
* @throws InterruptedException
* @throws IllegalArgumentException
* @throws MqttException
* @throws ConfigurationException
*/
boolean initializeHandlerWaitForTimeout()
throws InterruptedException, IllegalArgumentException, MqttException, ConfigurationException {
MqttBrokerConnection c = connection;
MqttConnectionObserverEx o = new MqttConnectionObserverEx();
c.addConnectionObserver(o);
assertThat(connection.connectionState(), is(MqttConnectionState.DISCONNECTED));
handler.initialize();
verify(connection, times(2)).addConnectionObserver(any());
verify(connection, times(1)).start();
// First we expect a CONNECTING state and then a CONNECTED unique state change
assertThat(o.counter, is(2));
// First we expect a CONNECTING state and then a CONNECTED state change
// (and other CONNECTED after the future completes)
verify(handler, times(3)).connectionStateChanged(any(), any());
return true;
}
}

View File

@@ -0,0 +1,113 @@
/**
* Copyright (c) 2010-2020 Contributors to the openHAB project
*
* See the NOTICE file(s) distributed with this work for additional
* information.
*
* This program and the accompanying materials are made available under the
* terms of the Eclipse Public License 2.0 which is available at
* http://www.eclipse.org/legal/epl-2.0
*
* SPDX-License-Identifier: EPL-2.0
*/
package org.openhab.binding.mqtt.handler;
import static org.mockito.ArgumentMatchers.*;
import static org.mockito.Mockito.*;
import java.util.Map;
import java.util.concurrent.CompletableFuture;
import org.eclipse.jdt.annotation.NonNull;
import org.eclipse.jdt.annotation.NonNullByDefault;
import org.eclipse.jdt.annotation.Nullable;
import org.openhab.core.io.transport.mqtt.MqttBrokerConnection;
import org.openhab.core.io.transport.mqtt.MqttConnectionState;
import org.openhab.core.io.transport.mqtt.internal.Subscription;
import org.openhab.core.io.transport.mqtt.internal.client.MqttAsyncClientWrapper;
import com.hivemq.client.mqtt.MqttClientState;
/**
* We need an extended MqttBrokerConnection to overwrite the protected `connectionCallbacks` with
* an instance that takes the mocked version of `MqttBrokerConnection` and overwrite the connection state.
*
* We also replace the internal MqttAsyncClient with a spied one, that in respect to the success flags
* immediately succeed or fail with publish, subscribe, unsubscribe, connect, disconnect.
*
* @author David Graeff - Initial contribution
*/
@NonNullByDefault
public class MqttBrokerConnectionEx extends MqttBrokerConnection {
public MqttConnectionState connectionStateOverwrite = MqttConnectionState.DISCONNECTED;
public boolean publishSuccess = true;
public boolean subscribeSuccess = true;
public boolean unsubscribeSuccess = true;
public boolean disconnectSuccess = true;
public boolean connectSuccess = true;
public boolean connectTimeout = false;
public MqttBrokerConnectionEx(String host, @Nullable Integer port, boolean secure, String clientId) {
super(host, port, secure, clientId);
}
public void setConnectionCallback(MqttBrokerConnectionEx o) {
connectionCallback = spy(new ConnectionCallback(o));
}
public Map<String, Subscription> getSubscribers() {
return subscribers;
}
@Override
protected MqttAsyncClientWrapper createClient() {
MqttAsyncClientWrapper mockedClient = mock(MqttAsyncClientWrapper.class);
// connect
doAnswer(i -> {
if (!connectTimeout) {
connectionCallback.onConnected(null);
connectionStateOverwrite = MqttConnectionState.CONNECTED;
return CompletableFuture.completedFuture(null);
}
return new CompletableFuture<>();
}).when(mockedClient).connect(any(), anyInt(), any(), any());
doAnswer(i -> {
if (disconnectSuccess) {
connectionCallback.onDisconnected(new Throwable("disconnect called"));
connectionStateOverwrite = MqttConnectionState.DISCONNECTED;
return CompletableFuture.completedFuture(null);
}
return new CompletableFuture<>();
}).when(mockedClient).disconnect();
// subscribe
doAnswer(i -> {
if (subscribeSuccess) {
return CompletableFuture.completedFuture(null);
} else {
CompletableFuture<Void> future = new CompletableFuture<>();
future.completeExceptionally(new Throwable("subscription failed"));
return future;
}
}).when(mockedClient).subscribe(any(), anyInt(), any());
// unsubscribe
doAnswer(i -> {
if (unsubscribeSuccess) {
return CompletableFuture.completedFuture(null);
} else {
CompletableFuture<Void> future = new CompletableFuture<>();
future.completeExceptionally(new Throwable("unsubscription failed"));
return future;
}
}).when(mockedClient).unsubscribe(any());
// state
doAnswer(i -> {
return MqttClientState.CONNECTED;
}).when(mockedClient).getState();
return mockedClient;
}
@Override
public @NonNull MqttConnectionState connectionState() {
return connectionStateOverwrite;
}
}

View File

@@ -0,0 +1,47 @@
/**
* Copyright (c) 2010-2020 Contributors to the openHAB project
*
* See the NOTICE file(s) distributed with this work for additional
* information.
*
* This program and the accompanying materials are made available under the
* terms of the Eclipse Public License 2.0 which is available at
* http://www.eclipse.org/legal/epl-2.0
*
* SPDX-License-Identifier: EPL-2.0
*/
package org.openhab.binding.mqtt.handler;
import java.util.concurrent.Semaphore;
import org.eclipse.jdt.annotation.NonNull;
import org.eclipse.jdt.annotation.Nullable;
import org.openhab.core.io.transport.mqtt.MqttConnectionObserver;
import org.openhab.core.io.transport.mqtt.MqttConnectionState;
/**
* We need an extended MqttConnectionObserverEx for testing if the state changes are coming in the right order.
*
* @author David Graeff - Initial contribution
*/
public class MqttConnectionObserverEx implements MqttConnectionObserver {
public int counter = 0;
public Semaphore semaphore = new Semaphore(1);
public MqttConnectionObserverEx() throws InterruptedException {
semaphore.acquire();
}
@Override
public void connectionStateChanged(@NonNull MqttConnectionState state, @Nullable Throwable error) {
// First we expect a CONNECTING state and then a DISCONNECTED state change
if (counter == 0 && state == MqttConnectionState.CONNECTING) {
counter = 1;
} else if (counter == 1 && state == MqttConnectionState.CONNECTED) {
counter = 2;
semaphore.release();
} else if (counter == 1 && state == MqttConnectionState.DISCONNECTED) {
counter = 2;
}
}
}

View File

@@ -0,0 +1,154 @@
/**
* Copyright (c) 2010-2020 Contributors to the openHAB project
*
* See the NOTICE file(s) distributed with this work for additional
* information.
*
* This program and the accompanying materials are made available under the
* terms of the Eclipse Public License 2.0 which is available at
* http://www.eclipse.org/legal/epl-2.0
*
* SPDX-License-Identifier: EPL-2.0
*/
package org.openhab.binding.mqtt.internal;
import static org.junit.Assert.assertTrue;
import static org.mockito.ArgumentMatchers.eq;
import static org.mockito.Mockito.*;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.ScheduledThreadPoolExecutor;
import org.openhab.core.config.core.Configuration;
import org.openhab.core.thing.Bridge;
import org.openhab.core.thing.binding.ThingHandlerCallback;
import org.openhab.core.io.transport.mqtt.MqttException;
import org.openhab.core.io.transport.mqtt.MqttService;
import org.openhab.core.io.transport.mqtt.internal.Subscription;
import org.junit.After;
import org.junit.Before;
import org.junit.Test;
import org.mockito.Mock;
import org.mockito.MockitoAnnotations;
import org.openhab.binding.mqtt.discovery.MQTTTopicDiscoveryParticipant;
import org.openhab.binding.mqtt.discovery.MQTTTopicDiscoveryService;
import org.openhab.binding.mqtt.handler.BrokerHandler;
import org.openhab.binding.mqtt.handler.BrokerHandlerEx;
import org.openhab.binding.mqtt.handler.MqttBrokerConnectionEx;
import org.osgi.service.cm.ConfigurationException;
/**
* Test cases for the {@link MQTTTopicDiscoveryService} service.
*
* @author David Graeff - Initial contribution
*/
public class MQTTTopicDiscoveryServiceTest {
private ScheduledExecutorService scheduler;
private MqttBrokerHandlerFactory subject;
@Mock
private MqttService mqttService;
@Mock
private Bridge thing;
@Mock
private ThingHandlerCallback callback;
@Mock
MQTTTopicDiscoveryParticipant listener;
private MqttBrokerConnectionEx connection;
private BrokerHandler handler;
@Before
public void setUp() throws ConfigurationException, MqttException {
scheduler = new ScheduledThreadPoolExecutor(1);
MockitoAnnotations.initMocks(this);
when(thing.getUID()).thenReturn(MqttThingID.getThingUID("10.10.0.10", 80));
connection = spy(new MqttBrokerConnectionEx("10.10.0.10", 80, false, "BrokerHandlerTest"));
connection.setTimeoutExecutor(scheduler, 10);
connection.setConnectionCallback(connection);
Configuration config = new Configuration();
config.put("host", "10.10.0.10");
config.put("port", 80);
when(thing.getConfiguration()).thenReturn(config);
handler = spy(new BrokerHandlerEx(thing, connection));
handler.setCallback(callback);
subject = new MqttBrokerHandlerFactory(mqttService);
}
@After
public void tearDown() {
scheduler.shutdownNow();
}
@Test
public void firstSubscribeThenHandler() {
handler.initialize();
BrokerHandlerEx.verifyCreateBrokerConnection(handler, 1);
subject.subscribe(listener, "topic");
subject.createdHandler(handler);
assertTrue(subject.discoveryTopics.get("topic").contains(listener));
// Simulate receiving
final byte[] bytes = "TEST".getBytes();
connection.getSubscribers().get("topic").forEach(s -> s.processMessage("topic", bytes));
verify(listener).receivedMessage(eq(thing.getUID()), eq(connection), eq("topic"), eq(bytes));
}
@Test
public void firstHandlerThenSubscribe() {
handler.initialize();
BrokerHandlerEx.verifyCreateBrokerConnection(handler, 1);
subject.createdHandler(handler);
subject.subscribe(listener, "topic");
assertTrue(subject.discoveryTopics.get("topic").contains(listener));
// Simulate receiving
final byte[] bytes = "TEST".getBytes();
connection.getSubscribers().get("topic").forEach(s -> s.processMessage("topic", bytes));
verify(listener).receivedMessage(eq(thing.getUID()), eq(connection), eq("topic"), eq(bytes));
}
@Test
public void handlerInitializeAfterSubscribe() {
subject.createdHandler(handler);
subject.subscribe(listener, "topic");
assertTrue(subject.discoveryTopics.get("topic").contains(listener));
// Init handler -> create connection
handler.initialize();
BrokerHandlerEx.verifyCreateBrokerConnection(handler, 1);
// Simulate receiving
final byte[] bytes = "TEST".getBytes();
connection.getSubscribers().getOrDefault("topic", new Subscription("topic"))
.forEach(s -> s.processMessage("topic", bytes));
verify(listener).receivedMessage(eq(thing.getUID()), eq(connection), eq("topic"), eq(bytes));
}
@Test
public void topicVanished() {
handler.initialize();
BrokerHandlerEx.verifyCreateBrokerConnection(handler, 1);
subject.createdHandler(handler);
subject.subscribe(listener, "topic");
assertTrue(subject.discoveryTopics.get("topic").contains(listener));
// Simulate receiving
final byte[] bytes = "".getBytes();
connection.getSubscribers().getOrDefault("topic", new Subscription("topic"))
.forEach(s -> s.processMessage("topic", bytes));
verify(listener).topicVanished(eq(thing.getUID()), eq(connection), eq("topic"));
}
}

View File

@@ -0,0 +1,82 @@
/**
* Copyright (c) 2010-2020 Contributors to the openHAB project
*
* See the NOTICE file(s) distributed with this work for additional
* information.
*
* This program and the accompanying materials are made available under the
* terms of the Eclipse Public License 2.0 which is available at
* http://www.eclipse.org/legal/epl-2.0
*
* SPDX-License-Identifier: EPL-2.0
*/
package org.openhab.binding.mqtt.internal.discovery;
import static org.hamcrest.CoreMatchers.is;
import static org.junit.Assert.assertThat;
import static org.mockito.ArgumentMatchers.eq;
import static org.mockito.Mockito.*;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;
import javax.naming.ConfigurationException;
import org.junit.Before;
import org.junit.Test;
import org.mockito.ArgumentCaptor;
import org.mockito.Mock;
import org.mockito.MockitoAnnotations;
import org.openhab.binding.mqtt.MqttBindingConstants;
import org.openhab.core.config.discovery.DiscoveryListener;
import org.openhab.core.config.discovery.DiscoveryResult;
import org.openhab.core.io.transport.mqtt.MqttBrokerConnection;
import org.openhab.core.io.transport.mqtt.MqttService;
/**
* Tests cases for {@link org.openhab.binding.mqtt.internal.discovery.MqttServiceDiscoveryService}.
*
* @author David Graeff - Initial contribution
*/
public class ServiceDiscoveryServiceTest {
@Mock
private MqttService service;
@Mock
private DiscoveryListener discoverListener;
@Before
public void initMocks() throws ConfigurationException {
MockitoAnnotations.initMocks(this);
Map<String, MqttBrokerConnection> brokers = new TreeMap<>();
brokers.put("testname", new MqttBrokerConnection("tcp://123.123.123.123", null, false, null));
brokers.put("textual", new MqttBrokerConnection("tcp://123.123.123.123", null, true, null));
when(service.getAllBrokerConnections()).thenReturn(brokers);
}
@Test
public void testDiscovery() throws ConfigurationException {
// Setting the MqttService will enable the background scanner
MqttServiceDiscoveryService d = new MqttServiceDiscoveryService();
d.addDiscoveryListener(discoverListener);
d.setMqttService(service);
d.startScan();
// We expect 3 discoveries. An embedded thing, a textual configured one, a non-textual one
ArgumentCaptor<DiscoveryResult> discoveryCapture = ArgumentCaptor.forClass(DiscoveryResult.class);
verify(discoverListener, times(2)).thingDiscovered(eq(d), discoveryCapture.capture());
List<DiscoveryResult> discoveryResults = discoveryCapture.getAllValues();
assertThat(discoveryResults.size(), is(2));
assertThat(discoveryResults.get(0).getThingTypeUID(), is(MqttBindingConstants.BRIDGE_TYPE_SYSTEMBROKER));
assertThat(discoveryResults.get(1).getThingTypeUID(), is(MqttBindingConstants.BRIDGE_TYPE_SYSTEMBROKER));
// Add another thing
d.brokerAdded("anotherone", new MqttBrokerConnection("tcp://123.123.123.123", null, false, null));
discoveryCapture = ArgumentCaptor.forClass(DiscoveryResult.class);
verify(discoverListener, times(3)).thingDiscovered(eq(d), discoveryCapture.capture());
discoveryResults = discoveryCapture.getAllValues();
assertThat(discoveryResults.size(), is(3));
assertThat(discoveryResults.get(2).getThingTypeUID(), is(MqttBindingConstants.BRIDGE_TYPE_SYSTEMBROKER));
}
}

View File

@@ -0,0 +1,177 @@
/**
* Copyright (c) 2010-2020 Contributors to the openHAB project
*
* See the NOTICE file(s) distributed with this work for additional
* information.
*
* This program and the accompanying materials are made available under the
* terms of the Eclipse Public License 2.0 which is available at
* http://www.eclipse.org/legal/epl-2.0
*
* SPDX-License-Identifier: EPL-2.0
*/
package org.openhab.binding.mqtt.internal.ssl;
import static org.hamcrest.CoreMatchers.is;
import static org.junit.Assert.assertThat;
import static org.mockito.ArgumentMatchers.eq;
import static org.mockito.Mockito.*;
import java.io.FileNotFoundException;
import java.io.InputStream;
import java.security.NoSuchAlgorithmException;
import java.security.PublicKey;
import java.security.cert.CertificateException;
import java.security.cert.CertificateFactory;
import java.security.cert.X509Certificate;
import org.eclipse.jdt.annotation.NonNull;
import org.junit.Test;
import org.openhab.core.util.HexUtils;
/**
* Tests cases for {@link PinTrustManager}.
*
* @author David Graeff - Initial contribution
*/
public class PinningSSLContextProviderTest {
@Test
public void getDigestDataFor() throws NoSuchAlgorithmException, CertificateException, FileNotFoundException {
// Load test certificate
InputStream inputCert = getClass().getResourceAsStream("cert.pem");
X509Certificate certificate = (X509Certificate) CertificateFactory.getInstance("X.509")
.generateCertificate(inputCert);
PinTrustManager pinTrustManager = new PinTrustManager();
PinMessageDigest pinMessageDigest = pinTrustManager.getMessageDigestForSigAlg(certificate.getSigAlgName());
String hashForCert = HexUtils
.bytesToHex(pinMessageDigest.digest(pinTrustManager.getEncoded(PinType.CERTIFICATE_TYPE, certificate)));
String expectedHash = "41fa6d40d1e8f53ac81a395ac13b1efa10917718f1ebe3ac278925716d630b72".toUpperCase();
assertThat(hashForCert, is(expectedHash));
String hashForPublicKey = HexUtils
.bytesToHex(pinMessageDigest.digest(pinTrustManager.getEncoded(PinType.PUBLIC_KEY_TYPE, certificate)));
String expectedPubKeyHash = "9a6f30e67ae9723579da2575c35daf7da3b370b04ac0bde031f5e1f5e4617eb8".toUpperCase();
assertThat(hashForPublicKey, is(expectedPubKeyHash));
}
// Test if X509Certificate.getEncoded() is called if it is a certificate pin and
// X509Certificate.getPublicKey().getEncoded() is called if it is a public key pinning.
@Test
public void certPinCallsX509CertificateGetEncoded() throws NoSuchAlgorithmException, CertificateException {
PinTrustManager pinTrustManager = new PinTrustManager();
pinTrustManager.addPinning(Pin.LearningPin(PinType.CERTIFICATE_TYPE));
// Mock a certificate
X509Certificate certificate = mock(X509Certificate.class);
when(certificate.getEncoded()).thenReturn(new byte[0]);
when(certificate.getSigAlgName()).thenReturn("SHA256withRSA");
pinTrustManager.checkServerTrusted(new X509Certificate[] { certificate }, null);
verify(certificate).getEncoded();
}
// Test if X509Certificate.getEncoded() is called if it is a certificate pin and
// X509Certificate.getPublicKey().getEncoded() is called if it is a public key pinning.
@Test
public void pubKeyPinCallsX509CertificateGetPublicKey() throws NoSuchAlgorithmException, CertificateException {
PinTrustManager pinTrustManager = new PinTrustManager();
pinTrustManager.addPinning(Pin.LearningPin(PinType.PUBLIC_KEY_TYPE));
// Mock a certificate
PublicKey publicKey = mock(PublicKey.class);
when(publicKey.getEncoded()).thenReturn(new byte[0]);
X509Certificate certificate = mock(X509Certificate.class);
when(certificate.getSigAlgName()).thenReturn("SHA256withRSA");
when(certificate.getPublicKey()).thenReturn(publicKey);
pinTrustManager.checkServerTrusted(new X509Certificate[] { certificate }, null);
verify(publicKey).getEncoded();
}
/**
* Overwrite {@link #getMessageDigestForSigAlg(String)} method and return a pre-defined {@link PinMessageDigest}.
*/
public static class PinTrustManagerEx extends PinTrustManager {
private final PinMessageDigest pinMessageDigest;
PinTrustManagerEx(PinMessageDigest pinMessageDigest) {
this.pinMessageDigest = pinMessageDigest;
}
@Override
@NonNull
PinMessageDigest getMessageDigestForSigAlg(@NonNull String sigAlg) throws CertificateException {
return pinMessageDigest;
}
}
@Test
public void learningMode() throws NoSuchAlgorithmException, CertificateException {
PinMessageDigest pinMessageDigest = new PinMessageDigest("SHA-256");
PinTrustManager pinTrustManager = new PinTrustManagerEx(pinMessageDigest);
byte[] testCert = { 1, 2, 3 };
byte[] digestOfTestCert = pinMessageDigest.digest(testCert);
// Add a certificate pin in learning mode to a trust manager
Pin pin = Pin.LearningPin(PinType.CERTIFICATE_TYPE);
pinTrustManager.addPinning(pin);
assertThat(pinTrustManager.pins.size(), is(1));
// Mock a callback
PinnedCallback callback = mock(PinnedCallback.class);
pinTrustManager.setCallback(callback);
// Mock a certificate
X509Certificate certificate = mock(X509Certificate.class);
when(certificate.getEncoded()).thenReturn(testCert);
when(certificate.getSigAlgName()).thenReturn("SHA256withRSA");
// Perform an SSL certificate check
pinTrustManager.checkServerTrusted(new X509Certificate[] { certificate }, null);
// After a first connect learning mode should turn into check mode. It should have learned the hash data and
// message digest, returned by PinTrustManager.getMessageDigestForSigAlg().
assertThat(pin.learning, is(false));
assertThat(pin.pinData, is(digestOfTestCert));
assertThat(pin.hashDigest, is(pinMessageDigest));
// We expect callbacks
verify(callback).pinnedLearnedHash(eq(pin));
verify(callback).pinnedConnectionAccepted();
}
@Test
public void checkMode() throws NoSuchAlgorithmException, CertificateException {
PinTrustManager pinTrustManager = new PinTrustManager();
PinMessageDigest pinMessageDigest = new PinMessageDigest("SHA-256");
byte[] testCert = { 1, 2, 3 };
byte[] digestOfTestCert = pinMessageDigest.digest(testCert);
// Add a certificate pin in checking mode to a trust manager
Pin pin = Pin.CheckingPin(PinType.CERTIFICATE_TYPE, pinMessageDigest, digestOfTestCert);
pinTrustManager.addPinning(pin);
assertThat(pinTrustManager.pins.size(), is(1));
// Mock a callback
PinnedCallback callback = mock(PinnedCallback.class);
pinTrustManager.setCallback(callback);
// Mock a certificate
X509Certificate certificate = mock(X509Certificate.class);
when(certificate.getEncoded()).thenReturn(testCert);
when(certificate.getSigAlgName()).thenReturn("SHA256withRSA");
// Perform an SSL certificate check
pinTrustManager.checkServerTrusted(new X509Certificate[] { certificate }, null);
// After a first connect learning mode should turn into check mode
assertThat(pin.learning, is(false));
assertThat(pin.pinData, is(digestOfTestCert));
assertThat(pin.hashDigest, is(pinMessageDigest));
// We expect callbacks
verify(callback, times(0)).pinnedLearnedHash(eq(pin));
verify(callback).pinnedConnectionAccepted();
}
}

View File

@@ -0,0 +1,17 @@
-----BEGIN CERTIFICATE-----
MIICtTCCAh6gAwIBAgIJAP3uXk/Ty+/kMA0GCSqGSIb3DQEBCwUAMHIxCzAJBgNV
BAYTAkRFMRIwEAYDVQQIDAlUZXN0U3RhdGUxETAPBgNVBAcMCFRlc3RDaXR5MRow
GAYDVQQKDBFFY2xpcHNlIFNtYXJ0aG9tZTEgMB4GCSqGSIb3DQEJARYRdGVzdG1h
aWxAdGVzdC50c3QwHhcNMTcwNzE4MDkxOTIyWhcNMTgwNzE4MDkxOTIyWjByMQsw
CQYDVQQGEwJERTESMBAGA1UECAwJVGVzdFN0YXRlMREwDwYDVQQHDAhUZXN0Q2l0
eTEaMBgGA1UECgwRRWNsaXBzZSBTbWFydGhvbWUxIDAeBgkqhkiG9w0BCQEWEXRl
c3RtYWlsQHRlc3QudHN0MIGfMA0GCSqGSIb3DQEBAQUAA4GNADCBiQKBgQDWITto
A/kssy/fLcUA+6gTFVhvtaZpNdFIYFXx2xJVx0Zoh7AHa2jraPmMXIZKtJN1Ylga
kB4MJAheeZic08FccxAK057+3xQGpmRyNm26vNx205TPshzrxRQ6Q5mM92habhli
V0MBy92vPUMoxydUE9Exa1cLRA9MzHRqfzB5XQIDAQABo1MwUTAdBgNVHQ4EFgQU
azTlD8frRKkVB4t1FhjQjE6fx+AwHwYDVR0jBBgwFoAUazTlD8frRKkVB4t1FhjQ
jE6fx+AwDwYDVR0TAQH/BAUwAwEB/zANBgkqhkiG9w0BAQsFAAOBgQAelsTWJSD4
mf8w/eOeZmS6VlfaJiRycOgWNufFMMR8YDOLLX1Mw0sqOmeis3XsSXgMXWNw2nfH
h4bstGuHM61ibs48hu/Pnk5qxg56tb7CCBD/tdoIqVH2yIytCIG9uXeukIPjFnaw
EJOuwnkIo8bYKL8VRq/d8ALF1Q9dg6Z+9Q==
-----END CERTIFICATE-----

View File

@@ -0,0 +1,16 @@
-----BEGIN PRIVATE KEY-----
MIICeAIBADANBgkqhkiG9w0BAQEFAASCAmIwggJeAgEAAoGBANYhO2gD+SyzL98t
xQD7qBMVWG+1pmk10UhgVfHbElXHRmiHsAdraOto+Yxchkq0k3ViWBqQHgwkCF55
mJzTwVxzEArTnv7fFAamZHI2bbq83HbTlM+yHOvFFDpDmYz3aFpuGWJXQwHL3a89
QyjHJ1QT0TFrVwtED0zMdGp/MHldAgMBAAECgYAd5jAEWyGs4yxZDmwGsh0K5R0f
JA8je7dIUuNNTRinT5b+O4wRzSauUE8gET9TKRm590x0ERGRAmsEvhfYNh02iRkP
OaOyUZXOsXVExQbwYr+Bodbi65Ql7J2mXaRXkK0xXSzRnfKpgpshUOeiClu1LnwA
IyxhsmoS7ZbJPK7NhQJBAO6MEYXbQK7EZ747FmHqYIpmYbZF7yB3bGWoe+4QdtYV
NCPBoGDkvcXbIfaonCVkSV+oXnEvyqNPOstoFi6WTI8CQQDly9UyRG3ywyZpJ96i
I1u3bgsBfILlNMWYyB4j//Jgo6H8ZiuvzqgjiKN8rlRKUZ2fnuO6uCOYcrWq8uel
gWlTAkEAr3IzXRjR7PglOSNqJd/k20XLreynoGBVSDtv0rsnO/NiYr4BP+JctQ2j
YC/IkDO/R2yk8WhuCEi4fGv0jJUcfwJBAMOqUudRYvkxd7RUMXqHduHyPkbOuTnn
PFUCGL/4gG4PBq++Y3Z4Fazj/Kj+W2FIq1kt1qS3g/+btNpRqDLBxWcCQQCVpToA
pj3iwnJE5ex5ll2yg+rlwlJBVt3NE/Yl+eu35KPgY5P0+ePbv+4s4IP03/sWUPwk
8IKEK6CWT39Dbem7
-----END PRIVATE KEY-----

View File

@@ -0,0 +1,47 @@
## Secure connection
In a first example a very secure connection to a broker is defined. It pins the returned certificate and public key.
If someone tries a man in the middle attack later on, this broker connection will recognize it and refuse a connection.
Be aware that if your brokers certificate changes, you need to remove the connection entry and add it again.
`mqttConnections.things`:
```xtend
mqtt:broker:mySecureBroker [ host="192.168.0.41", secure=true, certificatepin=true, publickeypin=true ]
```
## Plain, unsecured connection
The second connection is a plain, unsecured one. Unsecure connections are default, if you do not provide the "secure" parameter. Use this only for local MQTT Brokers.
`mqttConnections.things`:
```xtend
mqtt:broker:myUnsecureBroker [ host="192.168.0.42", secure=false ]
```
## Authentication with username and password
A third connection uses a username and password for authentication.
Secure is set to false in this example. This is a bad idea!
The credentials are plain values on the wire, therefore you should only use this on a secure connection.
`mqttConnections.things`:
```xtend
mqtt:broker:myAuthentificatedBroker [ host="192.168.0.43", secure=false, username="user", password="password" ]
```
## Public key pinning
In a fourth connection, the public key pinning is enabled again.
This time, a public key hash is provided to pin the connection to a specific server.
It follows the form "hashname:hashvalue". Valid *hashnames* are SHA-1, SHA-224, SHA-256, SHA-384, SHA-512 and all others listed
in [Java MessageDigest Algorithms](https://docs.oracle.com/javase/9/docs/specs/security/standard-names.html#messagedigest-algorithms).
`mqttConnections.things`:
```xtend
mqtt:broker:pinToPublicKey [ host="192.168.0.44", secure=true, publickeypin=true, publickey="SHA-256:9a6f30e67ae9723579da2575c35daf7da3b370b04ac0bde031f5e1f5e4617eb8" ]
```