removed embedded broker (#8649)

Also-by: Wouter Born <github@maindrain.net>
Signed-off-by: Kai Kreuzer <kai@openhab.org>
This commit is contained in:
Kai Kreuzer
2020-10-04 13:43:44 +02:00
committed by GitHub
parent f24f4971d6
commit 112f46c95e
28 changed files with 44 additions and 1533 deletions

View File

@@ -1,13 +0,0 @@
This content is produced and maintained by the openHAB project.
* Project home: https://www.openhab.org
== Declared Project Licenses
This program and the accompanying materials are made available under the terms
of the Eclipse Public License 2.0 which is available at
https://www.eclipse.org/legal/epl-2.0/.
== Source Code
https://github.com/openhab/openhab-addons

View File

@@ -1,42 +0,0 @@
# MQTT Broker Moquette
**Attention:** Moquette is no longer maintained upstream and this add-on is therefore considered deprecated in openHAB.
It is recommended to switch to an external broker like mosquitto.
The Moquette add-on will be removed in a future release of openHAB.
MQTT is a machine-to-machine (M2M)/"Internet of Things" connectivity protocol.
It was designed as an extremely lightweight publish/subscribe messaging transport.
To allow MQTT capable devices to communicate with each other you need a software called MQTT Broker.
You can either install one of the many MQTT Broker offerings like the free [Mosquitto](https://mosquitto.org/) broker or use this pre-configured [Moquette](https://github.com/andsel/moquette) broker.
Check your Inbox after installation.
## Service Configuration
All parameters are optional and can be set by file or the REST interface (user-interfaces).
* __port__: The port, the embedded broker should run on. Defaults to not set, which means the typical ports 1883 and 8883 (SSL) are used.
* __username__: The user name that clients need to provide to connect to this broker.
* __password__: The password that clients need to provide to connect to this broker.
* __secure__: If set, hosts a secure SSL connection on port 8883 or otherwise a non secure connection on port 1883 (if not overwritten by the port parameter).
* __persistence_file__: An optional persistence file. Retained messages are stored in this file. Can be empty to not store anything. The default is "userdata/mqttembedded.bin". If it starts with "/" on Linux/macOS or with a drive letter and colon (eg "c:/") it will be treated as an absolute path. Be careful to select a path that you have write access to.
## TLS connections
The keystore that is included to allow to start a TLS encrypted connection is generated by:
```
keytool -genkey -v -keystore serverkeystore.keystore -alias main -keyalg RSA -keysize 2048 -validity 100000 -deststoretype pkcs12
```
The keystore is embedded into the bundle and cannot be replaced.
## Plans for the future
* The moquette MQTT broker supports ACL (access control lists), so allows to restrict read/write access per topic per user or client id. That need to be exposed as configuration values at some point.
* Multiple users are supported by the broker software. openHAB does not yet have a user management though.
* The keystore need to be replaceable as soon as openHAB gained a way to configure SSL truststores framework wide.

View File

@@ -1,118 +0,0 @@
<?xml version="1.0" encoding="UTF-8" standalone="no"?>
<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
<modelVersion>4.0.0</modelVersion>
<parent>
<groupId>org.openhab.addons.bundles</groupId>
<artifactId>org.openhab.addons.reactor.bundles</artifactId>
<version>3.0.0-SNAPSHOT</version>
</parent>
<artifactId>org.openhab.io.mqttembeddedbroker</artifactId>
<name>openHAB Add-ons :: Bundles :: IO :: MQTT Broker Moquette</name>
<properties>
<netty.version>4.1.42.Final</netty.version>
<bnd.importpackage>com.bugsnag.*;resolution:="optional",com.codahale.metrics.*;resolution:="optional",com.librato.metrics.reporter.*;resolution:="optional",com.zaxxer.hikari.*;resolution:="optional",io.netty.channel.epoll.*;resolution:="optional",io.netty.handler.codec.http.*;resolution:="optional"</bnd.importpackage>
<dep.noembedding>commons-codec,h2-mvstore,netty-common,netty-buffer,netty-transport,netty-codec,netty-codec-mqtt,netty-resolver,netty-handler</dep.noembedding>
</properties>
<dependencies>
<dependency>
<groupId>com.github.j-n-k</groupId>
<artifactId>moquette-broker</artifactId>
<version>0.13.0.OH3</version>
<scope>compile</scope>
<exclusions>
<exclusion>
<groupId>org.slf4j</groupId>
<artifactId>slf4j-api</artifactId>
</exclusion>
<exclusion>
<groupId>org.slf4j</groupId>
<artifactId>slf4j-log4j12</artifactId>
</exclusion>
<exclusion>
<groupId>org.mockito</groupId>
<artifactId>mockito-core</artifactId>
</exclusion>
</exclusions>
</dependency>
<!-- https://mvnrepository.com/artifact/com.h2database/h2-mvstore -->
<dependency>
<groupId>com.h2database</groupId>
<artifactId>h2-mvstore</artifactId>
<version>1.4.199</version>
<scope>test</scope>
</dependency>
<dependency>
<groupId>io.netty</groupId>
<artifactId>netty-common</artifactId>
<version>${netty.version}</version>
<scope>compile</scope>
</dependency>
<dependency>
<groupId>io.netty</groupId>
<artifactId>netty-buffer</artifactId>
<version>${netty.version}</version>
<scope>compile</scope>
</dependency>
<dependency>
<groupId>io.netty</groupId>
<artifactId>netty-transport</artifactId>
<version>${netty.version}</version>
<scope>compile</scope>
</dependency>
<dependency>
<groupId>io.netty</groupId>
<artifactId>netty-codec</artifactId>
<version>${netty.version}</version>
<scope>compile</scope>
</dependency>
<dependency>
<groupId>io.netty</groupId>
<artifactId>netty-codec-mqtt</artifactId>
<version>${netty.version}</version>
<scope>compile</scope>
</dependency>
<dependency>
<groupId>io.netty</groupId>
<artifactId>netty-resolver</artifactId>
<version>${netty.version}</version>
<scope>compile</scope>
</dependency>
<dependency>
<groupId>io.netty</groupId>
<artifactId>netty-handler</artifactId>
<version>${netty.version}</version>
<scope>compile</scope>
</dependency>
</dependencies>
<build>
<plugins>
<plugin>
<groupId>org.codehaus.mojo</groupId>
<artifactId>build-helper-maven-plugin</artifactId>
<executions>
<execution>
<id>reserve-network-port</id>
<goals>
<goal>reserve-network-port</goal>
</goals>
<phase>process-resources</phase>
<configuration>
<portNames>
<portName>mqttembeddedbroker.port</portName>
</portNames>
</configuration>
</execution>
</executions>
</plugin>
</plugins>
</build>
</project>

View File

@@ -1,13 +0,0 @@
<?xml version="1.0" encoding="UTF-8"?>
<features name="org.openhab.io.mqttembeddedbroker-${project.version}" xmlns="http://karaf.apache.org/xmlns/features/v1.4.0">
<repository>mvn:org.openhab.core.features.karaf/org.openhab.core.features.karaf.openhab-core/${ohc.version}/xml/features</repository>
<feature name="openhab-misc-mqttbroker" description="MQTT Broker Moquette" version="${project.version}">
<feature>openhab-runtime-base</feature>
<feature>openhab-transport-mqtt</feature>
<feature>openhab.tp-netty</feature>
<bundle dependency="true">mvn:com.h2database/h2-mvstore/1.4.199</bundle>
<bundle dependency="true">mvn:commons-codec/commons-codec/1.10</bundle>
<bundle start-level="75">mvn:org.openhab.addons.bundles/org.openhab.io.mqttembeddedbroker/${project.version}</bundle>
</feature>
</features>

View File

@@ -1,39 +0,0 @@
/**
* Copyright (c) 2010-2020 Contributors to the openHAB project
*
* See the NOTICE file(s) distributed with this work for additional
* information.
*
* This program and the accompanying materials are made available under the
* terms of the Eclipse Public License 2.0 which is available at
* http://www.eclipse.org/legal/epl-2.0
*
* SPDX-License-Identifier: EPL-2.0
*/
package org.openhab.io.mqttembeddedbroker;
/**
* MQTT embedded broker constants
*
* @author David Graeff - Initial contribution
*/
public class Constants {
/**
* The broker connection client ID. You can request the embedded broker connection via the MqttService:
*
* <pre>
* MqttBrokerConnection c = mqttService.getBrokerConnection(Constants.CLIENTID);
* </pre>
*/
public static final String CLIENTID = "embedded-mqtt-broker";
/**
* The broker persistent identifier used for identifying configurations.
*/
public static final String PID = "org.openhab.core.mqttembeddedbroker";
/**
* The configuration key used for configuring the embedded broker port.
*/
public static final String PORT = "port";
}

View File

@@ -1,395 +0,0 @@
/**
* Copyright (c) 2010-2020 Contributors to the openHAB project
*
* See the NOTICE file(s) distributed with this work for additional
* information.
*
* This program and the accompanying materials are made available under the
* terms of the Eclipse Public License 2.0 which is available at
* http://www.eclipse.org/legal/epl-2.0
*
* SPDX-License-Identifier: EPL-2.0
*/
package org.openhab.io.mqttembeddedbroker.internal;
import java.io.File;
import java.io.IOException;
import java.io.InputStream;
import java.nio.file.Files;
import java.nio.file.Path;
import java.nio.file.Paths;
import java.security.KeyStore;
import java.security.KeyStoreException;
import java.security.NoSuchAlgorithmException;
import java.security.UnrecoverableKeyException;
import java.security.cert.CertificateException;
import java.util.Map;
import java.util.Properties;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.ScheduledThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import javax.net.ssl.KeyManagerFactory;
import org.eclipse.jdt.annotation.NonNullByDefault;
import org.eclipse.jdt.annotation.Nullable;
import org.openhab.core.OpenHAB;
import org.openhab.core.config.core.ConfigurableService;
import org.openhab.core.config.core.Configuration;
import org.openhab.core.io.transport.mqtt.MqttBrokerConnection;
import org.openhab.core.io.transport.mqtt.MqttConnectionObserver;
import org.openhab.core.io.transport.mqtt.MqttConnectionState;
import org.openhab.core.io.transport.mqtt.MqttService;
import org.openhab.core.io.transport.mqtt.MqttServiceObserver;
import org.openhab.io.mqttembeddedbroker.Constants;
import org.openhab.io.mqttembeddedbroker.internal.MqttEmbeddedBrokerDetectStart.MqttEmbeddedBrokerStartedListener;
import org.osgi.service.component.annotations.Activate;
import org.osgi.service.component.annotations.Component;
import org.osgi.service.component.annotations.Deactivate;
import org.osgi.service.component.annotations.Modified;
import org.osgi.service.component.annotations.Reference;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import io.moquette.BrokerConstants;
import io.moquette.broker.ISslContextCreator;
import io.moquette.broker.Server;
import io.moquette.broker.config.MemoryConfig;
import io.moquette.broker.security.IAuthenticator;
import io.moquette.broker.security.IAuthorizatorPolicy;
import io.moquette.interception.InterceptHandler;
import io.moquette.interception.messages.InterceptAcknowledgedMessage;
import io.moquette.interception.messages.InterceptConnectMessage;
import io.moquette.interception.messages.InterceptConnectionLostMessage;
import io.moquette.interception.messages.InterceptDisconnectMessage;
import io.moquette.interception.messages.InterceptPublishMessage;
import io.moquette.interception.messages.InterceptSubscribeMessage;
import io.moquette.interception.messages.InterceptUnsubscribeMessage;
import io.netty.handler.ssl.SslContext;
import io.netty.handler.ssl.SslContextBuilder;
/**
* The {@link EmbeddedBrokerService} starts the embedded broker, creates a
* {@link MqttBrokerConnection} and adds it to the {@link MqttService}.
* <p>
* For now tls connections are offered with an accept-all trust manager
* and a predefined keystore if "secure" is set to true.
*
* @author David Graeff - Initial contribution
*/
@Component(service = EmbeddedBrokerService.class, configurationPid = Constants.PID, //
property = org.osgi.framework.Constants.SERVICE_PID + "=" + Constants.PID)
@ConfigurableService(category = "MQTT", label = "MQTT Embedded Broker", description_uri = "mqtt:mqttembeddedbroker")
@NonNullByDefault
public class EmbeddedBrokerService
implements MqttConnectionObserver, MqttServiceObserver, MqttEmbeddedBrokerStartedListener {
private final MqttService service;
private String persistenceFilename = "";
// private NetworkServerTls networkServerTls; //TODO wait for NetworkServerTls implementation
@NonNullByDefault({})
class BrokerMetricsListenerEx implements InterceptHandler {
@Override
public String getID() {
return "logger";
}
@Override
public Class<?>[] getInterceptedMessageTypes() {
return new Class<?>[] { InterceptConnectMessage.class, InterceptDisconnectMessage.class };
}
@Override
public void onConnect(InterceptConnectMessage arg0) {
logger.debug("MQTT Client connected: {}", arg0.getClientID());
}
@Override
public void onConnectionLost(InterceptConnectionLostMessage arg0) {
}
@Override
public void onDisconnect(InterceptDisconnectMessage arg0) {
logger.debug("MQTT Client disconnected: {}", arg0.getClientID());
}
@Override
public void onMessageAcknowledged(InterceptAcknowledgedMessage arg0) {
}
@Override
public void onPublish(InterceptPublishMessage arg0) {
}
@Override
public void onSubscribe(InterceptSubscribeMessage arg0) {
}
@Override
public void onUnsubscribe(InterceptUnsubscribeMessage arg0) {
}
}
protected @Nullable Server server;
private final Logger logger = LoggerFactory.getLogger(EmbeddedBrokerService.class);
protected MqttEmbeddedBrokerDetectStart detectStart = new MqttEmbeddedBrokerDetectStart(this);
protected BrokerMetricsListenerEx metrics = new BrokerMetricsListenerEx();
private @Nullable MqttBrokerConnection connection;
@Activate
public EmbeddedBrokerService(@Reference MqttService mqttService, Map<String, Object> configuration)
throws IOException {
this.service = mqttService;
initialize(configuration);
}
@Modified
public void modified(Map<String, Object> configuration) throws IOException {
deactivate();
initialize(configuration);
}
public void initialize(Map<String, Object> configuration) throws IOException {
ServiceConfiguration config = new Configuration(configuration).as(ServiceConfiguration.class);
int port = config.port == null ? (config.port = config.secure ? 8883 : 1883) : config.port;
// Create MqttBrokerConnection
connection = service.getBrokerConnection(Constants.CLIENTID);
if (connection != null) {
// Close the existing connection and remove it from the service
connection.stop();
service.removeBrokerConnection(Constants.CLIENTID);
}
connection = new MqttBrokerConnection("localhost", config.port, config.secure, Constants.CLIENTID);
connection.addConnectionObserver(this);
if (config.username != null) {
connection.setCredentials(config.username, config.password);
}
if (!config.persistenceFile.isEmpty()) {
final String persistenceFilename = config.persistenceFile;
if (!Paths.get(persistenceFilename).isAbsolute()) {
Path path = Paths.get(OpenHAB.getUserDataFolder()).toAbsolutePath();
Files.createDirectories(path);
this.persistenceFilename = path.resolve(persistenceFilename).toString();
}
logger.info("Broker persistence file: {}", persistenceFilename);
} else {
logger.info("Using in-memory persistence. No persistence file has been set!");
}
// Start embedded server
startEmbeddedServer(port, config.secure, config.username, config.password);
}
@Deactivate
public void deactivate() {
if (service != null) {
service.removeBrokersListener(this);
}
MqttBrokerConnection connection = this.connection;
if (connection == null) {
if (server != null) {
server.stopServer();
}
server = null;
return;
}
// Clean shutdown: Stop connection, wait for process to finish, shutdown server
connection.removeConnectionObserver(this);
try {
connection.stop().thenRun(() -> {
if (server != null) {
server.stopServer();
server = null;
}
}).get(10, TimeUnit.SECONDS);
} catch (InterruptedException | ExecutionException | TimeoutException e) {
logger.warn("Could not cleanly shutdown connection or server.", e);
}
connection = null;
}
@Override
public void brokerAdded(String brokerID, MqttBrokerConnection broker) {
}
@SuppressWarnings("null")
@Override
public void brokerRemoved(String brokerID, MqttBrokerConnection broker) {
// Do not allow this connection to be removed. Add it again.
if (broker.equals(connection)) {
service.addBrokerConnection(brokerID, broker);
}
}
/**
* For TLS connections we need to setup a keystore and provide Moquette/Netty with an {@link SslContext}.
* <p>
* If a context is requested by Moquette, this creator
* will use the bundled "serverkeystore.keystore" with password "openhab".
*
* @return An SslContext creator (not be confused with javas SSLContext).
*/
ISslContextCreator nettySSLcontextCreator() {
return () -> {
try {
InputStream inputStream = getClass().getClassLoader().getResourceAsStream("serverkeystore.keystore");
KeyStore keyStore = KeyStore.getInstance("jks");
keyStore.load(inputStream, "openhab".toCharArray());
KeyManagerFactory factory = KeyManagerFactory.getInstance("SunX509");
factory.init(keyStore, "openhab".toCharArray());
return SslContextBuilder.forServer(factory).build();
} catch (NoSuchAlgorithmException | CertificateException | IOException | KeyStoreException
| UnrecoverableKeyException e) {
logger.warn("Failed to create an SSL context");
return null;
}
};
}
public void startEmbeddedServer(@Nullable Integer portParam, boolean secure, @Nullable String username,
@Nullable String password) throws IOException {
Server server = new Server();
Properties properties = new Properties();
// Host and port
properties.put(BrokerConstants.HOST_PROPERTY_NAME, "0.0.0.0");
int port;
if (secure) {
port = (portParam == null) ? port = 8883 : portParam;
properties.put(BrokerConstants.SSL_PORT_PROPERTY_NAME, Integer.toString(port));
properties.put(BrokerConstants.PORT_PROPERTY_NAME, BrokerConstants.DISABLED_PORT_BIND);
properties.put(BrokerConstants.KEY_MANAGER_PASSWORD_PROPERTY_NAME, "esheshesh");
properties.put(BrokerConstants.JKS_PATH_PROPERTY_NAME, "serverkeystore.jks");
} else {
port = (portParam == null) ? port = 1883 : portParam;
// with SSL_PORT_PROPERTY_NAME set, netty tries to evaluate the SSL context and shuts down immediately.
// properties.put(BrokerConstants.SSL_PORT_PROPERTY_NAME, BrokerConstants.DISABLED_PORT_BIND);
properties.put(BrokerConstants.PORT_PROPERTY_NAME, Integer.toString(port));
}
// Authentication
IAuthenticator authentificator = null;
if (username != null && password != null && username.length() > 0 && password.length() > 0) {
properties.setProperty(BrokerConstants.ALLOW_ANONYMOUS_PROPERTY_NAME, Boolean.FALSE.toString());
properties.put(BrokerConstants.AUTHENTICATOR_CLASS_NAME,
MqttEmbeddedBrokerUserAuthenticator.class.getName());
authentificator = new MqttEmbeddedBrokerUserAuthenticator(username, password.getBytes());
logger.debug("Broker authentication is enabled");
} else {
properties.put(BrokerConstants.ALLOW_ANONYMOUS_PROPERTY_NAME, Boolean.TRUE.toString());
logger.debug("Broker anonymous access enabled");
}
if (!persistenceFilename.isEmpty()) { // Persistence: If not set, an in-memory database is used.
properties.put(BrokerConstants.PERSISTENT_STORE_PROPERTY_NAME, persistenceFilename);
properties.put(BrokerConstants.AUTOSAVE_INTERVAL_PROPERTY_NAME, "30"); // in seconds
}
// We may provide ACL functionality at some point as well
IAuthorizatorPolicy authorizer = null;
ISslContextCreator sslContextCreator = secure ? nettySSLcontextCreator() : null;
try {
server.startServer(new MemoryConfig(properties), null, sslContextCreator, authentificator, authorizer);
} catch (IllegalArgumentException e) {
if (e.getMessage().contains("Could not deserialize")) {
Path persistenceFilePath = Paths.get((new File(persistenceFilename)).getAbsolutePath());
logger.warn("persistence corrupt: {}, deleting {}", e.getMessage(), persistenceFilePath);
Files.delete(persistenceFilePath);
// retry starting broker, if it fails again, don't catch exception
server.startServer(new MemoryConfig(properties), null, sslContextCreator, authentificator, authorizer);
}
} catch (Exception e) {
logger.warn("Failed to start embedded MQTT server: {}", e.getMessage());
server.stopServer();
return;
}
this.server = server;
server.addInterceptHandler(metrics);
ScheduledExecutorService s = new ScheduledThreadPoolExecutor(1);
detectStart.startBrokerStartedDetection(port, s);
}
public void stopEmbeddedServer() {
Server server = this.server;
if (server != null) {
server.removeInterceptHandler(metrics);
detectStart.stopBrokerStartDetection();
server.stopServer();
this.server = null;
}
}
/**
* For testing: Returns true if the embedded server confirms that the MqttBrokerConnection is connected.
*/
protected boolean serverConfirmsEmbeddedClient() {
return server != null && server.listConnectedClients().stream()
.anyMatch(client -> Constants.CLIENTID.equals(client.getClientID()));
}
@Override
public void connectionStateChanged(MqttConnectionState state, @Nullable Throwable error) {
if (state == MqttConnectionState.CONNECTED) {
logger.debug("Embedded broker connection connected");
} else if (state == MqttConnectionState.CONNECTING) {
logger.debug("Embedded broker connection still connecting");
} else {
if (error == null) {
logger.warn("Embedded broker offline - Reason unknown");
} else {
logger.warn("Embedded broker offline", error);
}
}
if (state != MqttConnectionState.CONNECTED && state != MqttConnectionState.CONNECTING) {
stopEmbeddedServer();
}
}
/**
* The callback from the detectStart.startBrokerStartedDetection() call within
* {@link #startEmbeddedServer(Integer, boolean, String, String, String)}.
*/
@Override
public void mqttEmbeddedBrokerStarted(boolean timeout) {
MqttBrokerConnection connection = this.connection;
MqttService service = this.service;
if (connection == null || service == null) {
return;
}
service.addBrokerConnection(Constants.CLIENTID, connection);
connection.start().exceptionally(e -> {
connectionStateChanged(MqttConnectionState.DISCONNECTED, e);
return false;
}).thenAccept(v -> {
if (!v) {
connectionStateChanged(MqttConnectionState.DISCONNECTED, new TimeoutException("Timeout"));
}
});
}
public @Nullable MqttBrokerConnection getConnection() {
return connection;
}
public String getPersistenceFilename() {
return persistenceFilename;
}
public void setPersistenceFilename(String persistenceFilename) {
this.persistenceFilename = persistenceFilename;
}
}

View File

@@ -1,111 +0,0 @@
/**
* Copyright (c) 2010-2020 Contributors to the openHAB project
*
* See the NOTICE file(s) distributed with this work for additional
* information.
*
* This program and the accompanying materials are made available under the
* terms of the Eclipse Public License 2.0 which is available at
* http://www.eclipse.org/legal/epl-2.0
*
* SPDX-License-Identifier: EPL-2.0
*/
package org.openhab.io.mqttembeddedbroker.internal;
import java.io.IOException;
import java.net.InetSocketAddress;
import java.net.Socket;
import java.net.SocketAddress;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.ScheduledFuture;
import java.util.concurrent.TimeUnit;
import org.eclipse.jdt.annotation.NonNullByDefault;
import org.eclipse.jdt.annotation.Nullable;
import io.moquette.broker.Server;
/**
* Unfortunately there is no listener interface for the Moquette MQTT Broker
* to get notified when it is started and ready to accept connections.
* We therefore try to connect to the socket with a Socket object until a timeout is reached.
*
* @author David Graeff - Inital contriution
*/
@NonNullByDefault
public class MqttEmbeddedBrokerDetectStart {
protected @Nullable Server server;
protected final MqttEmbeddedBrokerStartedListener startedListener;
protected long startTime;
protected int port;
protected int timeout = 2000;
protected @Nullable ScheduledExecutorService scheduler;
protected @Nullable ScheduledFuture<?> schedule;
/**
* Implement this interface to be notified if a connection to the given tcp port can be established.
*/
public static interface MqttEmbeddedBrokerStartedListener {
public void mqttEmbeddedBrokerStarted(boolean timeout);
}
/**
* Registers the given listener. Start with {@link #startBrokerStartedDetection(int, ScheduledExecutorService)}.
*
* @param startedListener A listener
*/
public MqttEmbeddedBrokerDetectStart(MqttEmbeddedBrokerStartedListener startedListener) {
this.startedListener = startedListener;
}
/**
* Performs a tcp socket open/close process. Will notify the registered listener on success
* and retry until a timeout is reached otherwise.
*/
protected void servicePing() {
ScheduledExecutorService scheduler = this.scheduler;
if (scheduler == null) {
return;
}
try {
SocketAddress socketAddress = new InetSocketAddress("127.0.0.1", port);
Socket socket = new Socket();
socket.connect(socketAddress, 500);
socket.close();
schedule = null;
startedListener.mqttEmbeddedBrokerStarted(false);
return;
} catch (IOException ignored) {
}
if (System.currentTimeMillis() - startTime < timeout) {
schedule = scheduler.schedule(() -> servicePing(), 100, TimeUnit.MILLISECONDS);
} else {
startedListener.mqttEmbeddedBrokerStarted(true);
}
}
/**
* Start the broker server reachable detection
*
* @param port The Mqtt Server port
* @param scheduler A scheduler
*/
public void startBrokerStartedDetection(int port, ScheduledExecutorService scheduler) {
this.port = port;
this.scheduler = scheduler;
this.startTime = System.currentTimeMillis();
this.schedule = null;
servicePing();
}
/**
* Stops the broker server reachable detection if it is still running.
*/
public void stopBrokerStartDetection() {
if (schedule != null) {
schedule.cancel(true);
schedule = null;
}
}
}

View File

@@ -1,42 +0,0 @@
/**
* Copyright (c) 2010-2020 Contributors to the openHAB project
*
* See the NOTICE file(s) distributed with this work for additional
* information.
*
* This program and the accompanying materials are made available under the
* terms of the Eclipse Public License 2.0 which is available at
* http://www.eclipse.org/legal/epl-2.0
*
* SPDX-License-Identifier: EPL-2.0
*/
package org.openhab.io.mqttembeddedbroker.internal;
import java.util.Arrays;
import org.eclipse.jdt.annotation.NonNullByDefault;
import org.eclipse.jdt.annotation.Nullable;
import io.moquette.broker.security.IAuthenticator;
/**
* Provides a {@link IAuthenticator} for the Moquette server, that accepts given user name and password.
* If ESH gains user credentials at some point, those should be accepted as well.
*
* @author David Graeff - Initial contribution
*/
@NonNullByDefault
public class MqttEmbeddedBrokerUserAuthenticator implements IAuthenticator {
final String username;
final byte[] password;
public MqttEmbeddedBrokerUserAuthenticator(String username, byte[] password) {
this.username = username;
this.password = password;
}
@Override
public boolean checkValid(@Nullable String clientId, @Nullable String username, byte @Nullable [] password) {
return this.username.equals(username) && Arrays.equals(this.password, password);
}
}

View File

@@ -1,31 +0,0 @@
/**
* Copyright (c) 2010-2020 Contributors to the openHAB project
*
* See the NOTICE file(s) distributed with this work for additional
* information.
*
* This program and the accompanying materials are made available under the
* terms of the Eclipse Public License 2.0 which is available at
* http://www.eclipse.org/legal/epl-2.0
*
* SPDX-License-Identifier: EPL-2.0
*/
package org.openhab.io.mqttembeddedbroker.internal;
import org.eclipse.jdt.annotation.NonNullByDefault;
import org.eclipse.jdt.annotation.Nullable;
/**
* Configuration of the {@link EmbeddedBrokerService}.
*
* @author David Graeff - Initial contribution
*/
@NonNullByDefault
public class ServiceConfiguration {
public @Nullable Integer port;
public Boolean secure = false;
public String persistenceFile = "mqttembedded.bin";
public @Nullable String username;
public @Nullable String password;
}

View File

@@ -1,43 +0,0 @@
<?xml version="1.0" encoding="UTF-8"?>
<config-description:config-descriptions
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xmlns:config-description="https://openhab.org/schemas/config-description/v1.0.0"
xsi:schemaLocation="https://openhab.org/schemas/config-description/v1.0.0
https://openhab.org/schemas/config-description-1.0.0.xsd">
<config-description uri="mqtt:mqttembeddedbroker">
<parameter name="port" type="integer" required="false">
<label>Embedded Broker Port</label>
<description>A custom broker connection port. Leave empty to use the default MQTT ports 1883 and 8883 (SSL) for
secure or non-secure connections.</description>
<default></default>
</parameter>
<parameter name="secure" type="boolean" required="true">
<label>Secure Connection?</label>
<description>If set, hosts a secure SSL connection on port 8883 or otherwise a non secure connection on port 1883 (if
not overwritten by the port parameter)</description>
<default>false</default>
</parameter>
<parameter name="username" type="text" required="false">
<label>Embedded Broker Username</label>
<description>Connections need to provide this username to access the broker.</description>
<default></default>
</parameter>
<parameter name="password" type="text" required="false">
<label>Embedded Broker Password</label>
<description>Connections need to provide this password to access the broker. Should only be used if it is a secure
connection, because the password is transferred plain over the wire.</description>
<default></default>
</parameter>
<parameter name="persistenceFile" type="text" required="false">
<label>Persistence File</label>
<description>An optional persistence file. Retained messages are stored in this file. Can be empty to not store
anything. If it starts with "/" on Linux/MacOS or with a drive letter and colon (eg "c:/") it will be treated as an
absolute path. Be careful to select a path that you have write access to.</description>
<default>mqttembedded.bin</default>
</parameter>
</config-description>
</config-description:config-descriptions>

View File

@@ -1,197 +0,0 @@
/**
* Copyright (c) 2010-2020 Contributors to the openHAB project
*
* See the NOTICE file(s) distributed with this work for additional
* information.
*
* This program and the accompanying materials are made available under the
* terms of the Eclipse Public License 2.0 which is available at
* http://www.eclipse.org/legal/epl-2.0
*
* SPDX-License-Identifier: EPL-2.0
*/
package org.openhab.io.mqttembeddedbroker.internal;
import static org.hamcrest.CoreMatchers.is;
import static org.hamcrest.MatcherAssert.assertThat;
import static org.junit.jupiter.api.Assertions.*;
import static org.mockito.ArgumentMatchers.*;
import static org.mockito.Mockito.verify;
import java.io.File;
import java.io.IOException;
import java.nio.file.Path;
import java.nio.file.Paths;
import java.security.GeneralSecurityException;
import java.util.HashMap;
import java.util.Map;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.Semaphore;
import java.util.concurrent.TimeUnit;
import javax.naming.ConfigurationException;
import org.apache.commons.io.FileUtils;
import org.h2.mvstore.MVMap;
import org.h2.mvstore.MVStore;
import org.junit.jupiter.api.AfterEach;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.api.extension.ExtendWith;
import org.mockito.Mock;
import org.mockito.junit.jupiter.MockitoExtension;
import org.openhab.core.OpenHAB;
import org.openhab.core.io.transport.mqtt.MqttBrokerConnection;
import org.openhab.core.io.transport.mqtt.MqttBrokerConnection.MqttVersion;
import org.openhab.core.io.transport.mqtt.MqttBrokerConnection.Protocol;
import org.openhab.core.io.transport.mqtt.MqttConnectionObserver;
import org.openhab.core.io.transport.mqtt.MqttConnectionState;
import org.openhab.core.io.transport.mqtt.MqttException;
import org.openhab.core.io.transport.mqtt.MqttService;
import org.openhab.core.test.java.JavaTest;
import io.moquette.broker.RetainedMessage;
import io.moquette.broker.subscriptions.Topic;
/**
* Tests connections with the embedded broker. Checks for credential based login,
* check for SSL connections.
*
* @author David Graeff - Initial contribution
*/
@ExtendWith(MockitoExtension.class)
public class MqttEmbeddedBrokerServiceTest extends JavaTest {
private EmbeddedBrokerService subject;
private Map<String, Object> config = new HashMap<>();
private @Mock MqttService service;
@BeforeEach
public void setUp() throws ConfigurationException, MqttException, GeneralSecurityException, IOException {
config.put("username", "username");
config.put("password", "password");
config.put("port", 12345);
config.put("secure", false);
config.put("persistenceFile", "");
subject = new EmbeddedBrokerService(service, config);
}
@AfterEach
public void cleanUp() {
subject.deactivate();
}
public void waitForConnectionChange(MqttBrokerConnection c, MqttConnectionState expectedState)
throws InterruptedException {
Semaphore semaphore = new Semaphore(1);
semaphore.acquire();
MqttConnectionObserver mqttConnectionObserver = (state, error) -> {
if (state == expectedState) {
semaphore.release();
}
};
c.addConnectionObserver(mqttConnectionObserver);
if (c.connectionState() == expectedState) {
semaphore.release();
}
// Start the connection and wait until timeout or connected callback returns.
semaphore.tryAcquire(3000, TimeUnit.MILLISECONDS);
c.removeConnectionObserver(mqttConnectionObserver);
}
@Test
public void connectUnsecureAndTestCredentials() throws InterruptedException, IOException, ExecutionException {
MqttBrokerConnection c = subject.getConnection();
assertNotNull(c);
waitForConnectionChange(c, MqttConnectionState.CONNECTED);
assertThat(c.getUser(), is("username"));
assertThat(c.getPassword(), is("password"));
assertThat(c.connectionState(), is(MqttConnectionState.CONNECTED));
verify(service).addBrokerConnection(anyString(), eq(c));
// Connect with a second connection but wrong credentials
MqttBrokerConnection wrongCredentials = new MqttBrokerConnection(Protocol.TCP, MqttVersion.V3, c.getHost(),
c.getPort(), false, "wrongCred");
wrongCredentials.setCredentials("someUser", "somePassword");
if (wrongCredentials.start().get()) {
fail("Wrong credentials accepted!");
}
wrongCredentials.stop().get();
// Connect with a second connection but correct credentials
MqttBrokerConnection correctCredentials = new MqttBrokerConnection(Protocol.TCP, MqttVersion.V3, c.getHost(),
c.getPort(), false, "correctCred");
correctCredentials.setCredentials(c.getUser(), c.getPassword());
if (!correctCredentials.start().get()) {
fail("Couldn't connect although correct credentials");
}
correctCredentials.stop().get();
}
@Test
public void connectSecure() throws InterruptedException, IOException {
config.put("secure", true);
subject.modified(config);
MqttBrokerConnection c = subject.getConnection();
assertNotNull(c);
waitForConnectionChange(c, MqttConnectionState.CONNECTED);
assertThat(c.getUser(), is("username"));
assertThat(c.getPassword(), is("password"));
assertThat(c.connectionState(), is(MqttConnectionState.CONNECTED));
verify(service).addBrokerConnection(anyString(), eq(c));
}
@Test
public void testPersistence() throws InterruptedException, IOException, ExecutionException {
config.put("persistenceFile", "persist.mqtt");
Path path = Paths.get(OpenHAB.getUserDataFolder()).toAbsolutePath();
File jksFile = path.resolve("persist.mqtt").toFile();
if (jksFile.exists()) {
jksFile.delete();
}
subject.modified(config);
MqttBrokerConnection c = subject.getConnection();
assertNotNull(c);
waitForConnectionChange(c, MqttConnectionState.CONNECTED);
c.publish("demotopic", "testtest".getBytes(), 2, true).get();
// Stop server -> close persistence storage and sync it to disk
subject.deactivate();
assertTrue(jksFile.exists());
// this is needed to ensure the file is correctly written
waitForAssert(() -> assertEquals(12288, jksFile.length()));
// The original file is still open, create a temp file for examination
File temp = File.createTempFile("abc", ".tmp");
temp.deleteOnExit();
FileUtils.copyFile(jksFile, temp);
MVStore mvStore = new MVStore.Builder().fileName(temp.getAbsolutePath()).autoCommitDisabled().open();
MVMap<Topic, RetainedMessage> openMap = mvStore.openMap("retained_store");
assertThat(openMap.size(), is(1));
for (Map.Entry<Topic, RetainedMessage> entry : openMap.entrySet()) {
assertThat(entry.getKey().toString(), is("demotopic"));
assertThat(new String(entry.getValue().getPayload()), is("testtest"));
}
}
}

View File

@@ -1,17 +0,0 @@
-----BEGIN CERTIFICATE-----
MIICtTCCAh6gAwIBAgIJAP3uXk/Ty+/kMA0GCSqGSIb3DQEBCwUAMHIxCzAJBgNV
BAYTAkRFMRIwEAYDVQQIDAlUZXN0U3RhdGUxETAPBgNVBAcMCFRlc3RDaXR5MRow
GAYDVQQKDBFFY2xpcHNlIFNtYXJ0aG9tZTEgMB4GCSqGSIb3DQEJARYRdGVzdG1h
aWxAdGVzdC50c3QwHhcNMTcwNzE4MDkxOTIyWhcNMTgwNzE4MDkxOTIyWjByMQsw
CQYDVQQGEwJERTESMBAGA1UECAwJVGVzdFN0YXRlMREwDwYDVQQHDAhUZXN0Q2l0
eTEaMBgGA1UECgwRRWNsaXBzZSBTbWFydGhvbWUxIDAeBgkqhkiG9w0BCQEWEXRl
c3RtYWlsQHRlc3QudHN0MIGfMA0GCSqGSIb3DQEBAQUAA4GNADCBiQKBgQDWITto
A/kssy/fLcUA+6gTFVhvtaZpNdFIYFXx2xJVx0Zoh7AHa2jraPmMXIZKtJN1Ylga
kB4MJAheeZic08FccxAK057+3xQGpmRyNm26vNx205TPshzrxRQ6Q5mM92habhli
V0MBy92vPUMoxydUE9Exa1cLRA9MzHRqfzB5XQIDAQABo1MwUTAdBgNVHQ4EFgQU
azTlD8frRKkVB4t1FhjQjE6fx+AwHwYDVR0jBBgwFoAUazTlD8frRKkVB4t1FhjQ
jE6fx+AwDwYDVR0TAQH/BAUwAwEB/zANBgkqhkiG9w0BAQsFAAOBgQAelsTWJSD4
mf8w/eOeZmS6VlfaJiRycOgWNufFMMR8YDOLLX1Mw0sqOmeis3XsSXgMXWNw2nfH
h4bstGuHM61ibs48hu/Pnk5qxg56tb7CCBD/tdoIqVH2yIytCIG9uXeukIPjFnaw
EJOuwnkIo8bYKL8VRq/d8ALF1Q9dg6Z+9Q==
-----END CERTIFICATE-----

View File

@@ -1,16 +0,0 @@
-----BEGIN PRIVATE KEY-----
MIICeAIBADANBgkqhkiG9w0BAQEFAASCAmIwggJeAgEAAoGBANYhO2gD+SyzL98t
xQD7qBMVWG+1pmk10UhgVfHbElXHRmiHsAdraOto+Yxchkq0k3ViWBqQHgwkCF55
mJzTwVxzEArTnv7fFAamZHI2bbq83HbTlM+yHOvFFDpDmYz3aFpuGWJXQwHL3a89
QyjHJ1QT0TFrVwtED0zMdGp/MHldAgMBAAECgYAd5jAEWyGs4yxZDmwGsh0K5R0f
JA8je7dIUuNNTRinT5b+O4wRzSauUE8gET9TKRm590x0ERGRAmsEvhfYNh02iRkP
OaOyUZXOsXVExQbwYr+Bodbi65Ql7J2mXaRXkK0xXSzRnfKpgpshUOeiClu1LnwA
IyxhsmoS7ZbJPK7NhQJBAO6MEYXbQK7EZ747FmHqYIpmYbZF7yB3bGWoe+4QdtYV
NCPBoGDkvcXbIfaonCVkSV+oXnEvyqNPOstoFi6WTI8CQQDly9UyRG3ywyZpJ96i
I1u3bgsBfILlNMWYyB4j//Jgo6H8ZiuvzqgjiKN8rlRKUZ2fnuO6uCOYcrWq8uel
gWlTAkEAr3IzXRjR7PglOSNqJd/k20XLreynoGBVSDtv0rsnO/NiYr4BP+JctQ2j
YC/IkDO/R2yk8WhuCEi4fGv0jJUcfwJBAMOqUudRYvkxd7RUMXqHduHyPkbOuTnn
PFUCGL/4gG4PBq++Y3Z4Fazj/Kj+W2FIq1kt1qS3g/+btNpRqDLBxWcCQQCVpToA
pj3iwnJE5ex5ll2yg+rlwlJBVt3NE/Yl+eu35KPgY5P0+ePbv+4s4IP03/sWUPwk
8IKEK6CWT39Dbem7
-----END PRIVATE KEY-----

View File

@@ -21,7 +21,6 @@
<module>org.openhab.io.homekit</module>
<module>org.openhab.io.hueemulation</module>
<module>org.openhab.io.imperihome</module>
<module>org.openhab.io.mqttembeddedbroker</module>
<module>org.openhab.io.neeo</module>
<module>org.openhab.io.openhabcloud</module>
<module>org.openhab.io.transport.modbus</module>