diff --git a/bundles/org.openhab.io.mqttembeddedbroker/pom.xml b/bundles/org.openhab.io.mqttembeddedbroker/pom.xml
index ae9f42ed1..d733a1154 100644
--- a/bundles/org.openhab.io.mqttembeddedbroker/pom.xml
+++ b/bundles/org.openhab.io.mqttembeddedbroker/pom.xml
@@ -92,4 +92,27 @@
+
+
+
+ org.codehaus.mojo
+ build-helper-maven-plugin
+
+
+ reserve-network-port
+
+ reserve-network-port
+
+ process-resources
+
+
+ mqttembeddedbroker.port
+
+
+
+
+
+
+
+
diff --git a/bundles/org.openhab.io.mqttembeddedbroker/src/main/java/org/openhab/io/mqttembeddedbroker/Constants.java b/bundles/org.openhab.io.mqttembeddedbroker/src/main/java/org/openhab/io/mqttembeddedbroker/Constants.java
index 9b888f8c6..34b85e590 100644
--- a/bundles/org.openhab.io.mqttembeddedbroker/src/main/java/org/openhab/io/mqttembeddedbroker/Constants.java
+++ b/bundles/org.openhab.io.mqttembeddedbroker/src/main/java/org/openhab/io/mqttembeddedbroker/Constants.java
@@ -26,4 +26,14 @@ public class Constants {
*
*/
public static final String CLIENTID = "embedded-mqtt-broker";
+
+ /**
+ * The broker persistent identifier used for identifying configurations.
+ */
+ public static final String PID = "org.openhab.core.mqttembeddedbroker";
+
+ /**
+ * The configuration key used for configuring the embedded broker port.
+ */
+ public static final String PORT = "port";
}
diff --git a/bundles/org.openhab.io.mqttembeddedbroker/src/main/java/org/openhab/io/mqttembeddedbroker/internal/EmbeddedBrokerService.java b/bundles/org.openhab.io.mqttembeddedbroker/src/main/java/org/openhab/io/mqttembeddedbroker/internal/EmbeddedBrokerService.java
index bbdc53320..a87cada20 100644
--- a/bundles/org.openhab.io.mqttembeddedbroker/src/main/java/org/openhab/io/mqttembeddedbroker/internal/EmbeddedBrokerService.java
+++ b/bundles/org.openhab.io.mqttembeddedbroker/src/main/java/org/openhab/io/mqttembeddedbroker/internal/EmbeddedBrokerService.java
@@ -79,8 +79,8 @@ import io.netty.handler.ssl.SslContextBuilder;
*
* @author David Graeff - Initial contribution
*/
-@Component(immediate = true, service = EmbeddedBrokerService.class, configurationPid = "org.openhab.core.mqttembeddedbroker", //
- property = org.osgi.framework.Constants.SERVICE_PID + "=org.openhab.core.mqttembeddedbroker")
+@Component(immediate = true, service = EmbeddedBrokerService.class, configurationPid = Constants.PID, //
+ property = org.osgi.framework.Constants.SERVICE_PID + "=" + Constants.PID)
@ConfigurableService(category = "MQTT", label = "MQTT Embedded Broker", description_uri = "mqtt:mqttembeddedbroker")
@NonNullByDefault
public class EmbeddedBrokerService
@@ -309,7 +309,12 @@ public class EmbeddedBrokerService
// retry starting broker, if it fails again, don't catch exception
server.startServer(new MemoryConfig(properties), null, sslContextCreator, authentificator, authorizer);
}
+ } catch (Exception e) {
+ logger.warn("Failed to start embedded MQTT server: {}", e.getMessage());
+ server.stopServer();
+ return;
}
+
this.server = server;
server.addInterceptHandler(metrics);
ScheduledExecutorService s = new ScheduledThreadPoolExecutor(1);
diff --git a/itests/org.openhab.binding.mqtt.homeassistant.tests/itest.bndrun b/itests/org.openhab.binding.mqtt.homeassistant.tests/itest.bndrun
index bafc46417..49b1796db 100644
--- a/itests/org.openhab.binding.mqtt.homeassistant.tests/itest.bndrun
+++ b/itests/org.openhab.binding.mqtt.homeassistant.tests/itest.bndrun
@@ -13,6 +13,10 @@ Fragment-Host: org.openhab.binding.mqtt.homeassistant
-runblacklist: \
bnd.identity;id='org.openhab.core.storage.json'
+-runvm: \
+ -Dio.netty.noUnsafe=true,\
+ -Dmqttembeddedbroker.port=${mqttembeddedbroker.port}
+
#
# done
#
@@ -87,7 +91,4 @@ Fragment-Host: org.openhab.binding.mqtt.homeassistant
org.openhab.core.transform;version='[3.0.0,3.0.1)',\
org.openhab.io.mqttembeddedbroker;version='[3.0.0,3.0.1)',\
org.opentest4j;version='[1.2.0,1.2.1)',\
- org.reactivestreams.reactive-streams;version='[1.0.2,1.0.3)',\
- moquette-broker;version='[0.13.0,0.13.1)'
-
--runvm: -Dio.netty.noUnsafe=true
+ org.reactivestreams.reactive-streams;version='[1.0.2,1.0.3)'
diff --git a/itests/org.openhab.binding.mqtt.homeassistant.tests/pom.xml b/itests/org.openhab.binding.mqtt.homeassistant.tests/pom.xml
index 110ca397a..2c8e11c8a 100644
--- a/itests/org.openhab.binding.mqtt.homeassistant.tests/pom.xml
+++ b/itests/org.openhab.binding.mqtt.homeassistant.tests/pom.xml
@@ -100,4 +100,27 @@
+
+
+
+ org.codehaus.mojo
+ build-helper-maven-plugin
+
+
+ reserve-network-port
+
+ reserve-network-port
+
+ process-resources
+
+
+ mqttembeddedbroker.port
+
+
+
+
+
+
+
+
diff --git a/itests/org.openhab.binding.mqtt.homeassistant.tests/src/main/java/org/openhab/binding/mqtt/EmbeddedBrokerTools.java b/itests/org.openhab.binding.mqtt.homeassistant.tests/src/main/java/org/openhab/binding/mqtt/EmbeddedBrokerTools.java
index c318ed535..de7e511ad 100644
--- a/itests/org.openhab.binding.mqtt.homeassistant.tests/src/main/java/org/openhab/binding/mqtt/EmbeddedBrokerTools.java
+++ b/itests/org.openhab.binding.mqtt.homeassistant.tests/src/main/java/org/openhab/binding/mqtt/EmbeddedBrokerTools.java
@@ -14,6 +14,9 @@ package org.openhab.binding.mqtt;
import static org.junit.jupiter.api.Assertions.assertTrue;
+import java.io.IOException;
+import java.util.Dictionary;
+import java.util.Hashtable;
import java.util.concurrent.Semaphore;
import java.util.concurrent.TimeUnit;
@@ -25,23 +28,40 @@ import org.openhab.core.io.transport.mqtt.MqttConnectionState;
import org.openhab.core.io.transport.mqtt.MqttService;
import org.openhab.core.io.transport.mqtt.MqttServiceObserver;
import org.openhab.io.mqttembeddedbroker.Constants;
+import org.osgi.service.cm.Configuration;
+import org.osgi.service.cm.ConfigurationAdmin;
/**
* A full implementation test, that starts the embedded MQTT broker and publishes a homeassistant MQTT discovery device
* tree.
*
* @author David Graeff - Initial contribution
+ * @author Wouter Born - Support running MQTT itests in parallel by reconfiguring embedded broker port
*/
@NonNullByDefault
public class EmbeddedBrokerTools {
- public @Nullable MqttBrokerConnection embeddedConnection = null;
+
+ private static final int BROKER_PORT = Integer.getInteger("mqttembeddedbroker.port", 1883);
+
+ private final ConfigurationAdmin configurationAdmin;
+ private final MqttService mqttService;
+
+ public @Nullable MqttBrokerConnection embeddedConnection;
+
+ public EmbeddedBrokerTools(ConfigurationAdmin configurationAdmin, MqttService mqttService) {
+ this.configurationAdmin = configurationAdmin;
+ this.mqttService = mqttService;
+ }
/**
* Request the embedded broker connection from the {@link MqttService} and wait for a connection to be established.
*
* @throws InterruptedException
+ * @throws IOException
*/
- public MqttBrokerConnection waitForConnection(MqttService mqttService) throws InterruptedException {
+ public MqttBrokerConnection waitForConnection() throws InterruptedException, IOException {
+ reconfigurePort();
+
embeddedConnection = mqttService.getBrokerConnection(Constants.CLIENTID);
if (embeddedConnection == null) {
Semaphore semaphore = new Semaphore(1);
@@ -61,7 +81,7 @@ public class EmbeddedBrokerTools {
}
};
mqttService.addBrokersListener(observer);
- assertTrue(semaphore.tryAcquire(1000, TimeUnit.MILLISECONDS), "Wait for embedded connection client failed");
+ assertTrue(semaphore.tryAcquire(5, TimeUnit.SECONDS), "Wait for embedded connection client failed");
}
MqttBrokerConnection embeddedConnection = this.embeddedConnection;
if (embeddedConnection == null) {
@@ -79,8 +99,25 @@ public class EmbeddedBrokerTools {
if (embeddedConnection.connectionState() == MqttConnectionState.CONNECTED) {
semaphore.release();
}
- assertTrue(semaphore.tryAcquire(500, TimeUnit.MILLISECONDS), "Connection " + embeddedConnection.getClientId()
+ assertTrue(semaphore.tryAcquire(5, TimeUnit.SECONDS), "Connection " + embeddedConnection.getClientId()
+ " failed. State: " + embeddedConnection.connectionState());
return embeddedConnection;
}
+
+ public void reconfigurePort() throws IOException {
+ Configuration configuration = configurationAdmin.getConfiguration(Constants.PID, null);
+
+ Dictionary properties = configuration.getProperties();
+ if (properties == null) {
+ properties = new Hashtable<>();
+ }
+
+ Integer currentPort = (Integer) properties.get(Constants.PORT);
+ if (currentPort == null || currentPort.intValue() != BROKER_PORT) {
+ properties.put(Constants.PORT, BROKER_PORT);
+ configuration.update(properties);
+ // Remove the connection to make sure the test waits for the new connection to become available
+ mqttService.removeBrokerConnection(Constants.CLIENTID);
+ }
+ }
}
diff --git a/itests/org.openhab.binding.mqtt.homeassistant.tests/src/main/java/org/openhab/binding/mqtt/HomeAssistantMQTTImplementationTest.java b/itests/org.openhab.binding.mqtt.homeassistant.tests/src/main/java/org/openhab/binding/mqtt/HomeAssistantMQTTImplementationTest.java
index a4ae5fc8d..96aaf90bf 100644
--- a/itests/org.openhab.binding.mqtt.homeassistant.tests/src/main/java/org/openhab/binding/mqtt/HomeAssistantMQTTImplementationTest.java
+++ b/itests/org.openhab.binding.mqtt.homeassistant.tests/src/main/java/org/openhab/binding/mqtt/HomeAssistantMQTTImplementationTest.java
@@ -57,6 +57,7 @@ import org.openhab.core.test.java.JavaOSGiTest;
import org.openhab.core.types.State;
import org.openhab.core.types.UnDefType;
import org.openhab.core.util.UIDUtils;
+import org.osgi.service.cm.ConfigurationAdmin;
import com.google.gson.Gson;
import com.google.gson.GsonBuilder;
@@ -69,6 +70,7 @@ import com.google.gson.GsonBuilder;
*/
@NonNullByDefault
public class HomeAssistantMQTTImplementationTest extends JavaOSGiTest {
+ private @NonNullByDefault({}) ConfigurationAdmin configurationAdmin;
private @NonNullByDefault({}) MqttService mqttService;
private @NonNullByDefault({}) MqttBrokerConnection embeddedConnection;
private @NonNullByDefault({}) MqttBrokerConnection connection;
@@ -94,14 +96,15 @@ public class HomeAssistantMQTTImplementationTest extends JavaOSGiTest {
public void beforeEach() throws Exception {
registerVolatileStorageService();
mocksCloseable = openMocks(this);
+ configurationAdmin = getService(ConfigurationAdmin.class);
mqttService = getService(MqttService.class);
// Wait for the EmbeddedBrokerService internal connection to be connected
- embeddedConnection = new EmbeddedBrokerTools().waitForConnection(mqttService);
+ embeddedConnection = new EmbeddedBrokerTools(configurationAdmin, mqttService).waitForConnection();
connection = new MqttBrokerConnection(embeddedConnection.getHost(), embeddedConnection.getPort(),
embeddedConnection.isSecure(), "ha_mqtt");
- connection.start().get(1000, TimeUnit.MILLISECONDS);
+ connection.start().get(2, TimeUnit.SECONDS);
assertThat(connection.connectionState(), is(MqttConnectionState.CONNECTED));
// If the connection state changes in between -> fail
@@ -117,7 +120,7 @@ public class HomeAssistantMQTTImplementationTest extends JavaOSGiTest {
futures.add(embeddedConnection.publish(testObjectTopic + "/state", "ON".getBytes(), 0, true));
registeredTopics = futures.size();
- CompletableFuture.allOf(futures.toArray(new CompletableFuture[0])).get(1000, TimeUnit.MILLISECONDS);
+ CompletableFuture.allOf(futures.toArray(new CompletableFuture[0])).get(2, TimeUnit.SECONDS);
failure = null;
@@ -128,7 +131,7 @@ public class HomeAssistantMQTTImplementationTest extends JavaOSGiTest {
public void afterEach() throws Exception {
if (connection != null) {
connection.removeConnectionObserver(failIfChange);
- connection.stop().get(1000, TimeUnit.MILLISECONDS);
+ connection.stop().get(2, TimeUnit.SECONDS);
}
mocksCloseable.close();
@@ -137,18 +140,18 @@ public class HomeAssistantMQTTImplementationTest extends JavaOSGiTest {
@Test
public void reconnectTest() throws InterruptedException, ExecutionException, TimeoutException {
connection.removeConnectionObserver(failIfChange);
- connection.stop().get(2000, TimeUnit.MILLISECONDS);
+ connection.stop().get(2, TimeUnit.SECONDS);
connection = new MqttBrokerConnection(embeddedConnection.getHost(), embeddedConnection.getPort(),
embeddedConnection.isSecure(), "ha_mqtt");
- connection.start().get(2000, TimeUnit.MILLISECONDS);
+ connection.start().get(2, TimeUnit.SECONDS);
}
@Test
public void retrieveAllTopics() throws InterruptedException, ExecutionException, TimeoutException {
CountDownLatch c = new CountDownLatch(registeredTopics);
connection.subscribe("homeassistant/+/+/" + ThingChannelConstants.testHomeAssistantThing.getId() + "/#",
- (topic, payload) -> c.countDown()).get(1000, TimeUnit.MILLISECONDS);
- assertTrue(c.await(1000, TimeUnit.MILLISECONDS),
+ (topic, payload) -> c.countDown()).get(2, TimeUnit.SECONDS);
+ assertTrue(c.await(2, TimeUnit.SECONDS),
"Connection " + connection.getClientId() + " not retrieving all topics");
}
@@ -183,8 +186,8 @@ public class HomeAssistantMQTTImplementationTest extends JavaOSGiTest {
return null;
});
- assertTrue(latch.await(4000, TimeUnit.MILLISECONDS));
- future.get(2000, TimeUnit.MILLISECONDS);
+ assertTrue(latch.await(4, TimeUnit.SECONDS));
+ future.get(2, TimeUnit.SECONDS);
// No failure expected and one discovered result
assertNull(failure);
diff --git a/itests/org.openhab.binding.mqtt.homie.tests/itest.bndrun b/itests/org.openhab.binding.mqtt.homie.tests/itest.bndrun
index e3be5b0f1..8d9ecadab 100644
--- a/itests/org.openhab.binding.mqtt.homie.tests/itest.bndrun
+++ b/itests/org.openhab.binding.mqtt.homie.tests/itest.bndrun
@@ -13,6 +13,10 @@ Fragment-Host: org.openhab.binding.mqtt.homie
-runblacklist: \
bnd.identity;id='org.openhab.core.storage.json'
+-runvm: \
+ -Dio.netty.noUnsafe=true,\
+ -Dmqttembeddedbroker.port=${mqttembeddedbroker.port}
+
#
# done
#
@@ -89,4 +93,4 @@ Fragment-Host: org.openhab.binding.mqtt.homie
org.opentest4j;version='[1.2.0,1.2.1)',\
org.reactivestreams.reactive-streams;version='[1.0.2,1.0.3)',\
moquette-broker;version='[0.13.0,0.13.1)'
--runvm: -Dio.netty.noUnsafe=true
+
diff --git a/itests/org.openhab.binding.mqtt.homie.tests/pom.xml b/itests/org.openhab.binding.mqtt.homie.tests/pom.xml
index 90f6b77ec..d6965a099 100644
--- a/itests/org.openhab.binding.mqtt.homie.tests/pom.xml
+++ b/itests/org.openhab.binding.mqtt.homie.tests/pom.xml
@@ -82,7 +82,6 @@
com.h2database
h2-mvstore
1.4.199
-
io.netty
@@ -101,4 +100,27 @@
+
+
+
+ org.codehaus.mojo
+ build-helper-maven-plugin
+
+
+ reserve-network-port
+
+ reserve-network-port
+
+ process-resources
+
+
+ mqttembeddedbroker.port
+
+
+
+
+
+
+
+
diff --git a/itests/org.openhab.binding.mqtt.homie.tests/src/main/java/org/openhab/binding/mqtt/EmbeddedBrokerTools.java b/itests/org.openhab.binding.mqtt.homie.tests/src/main/java/org/openhab/binding/mqtt/EmbeddedBrokerTools.java
index ec6923875..de7e511ad 100644
--- a/itests/org.openhab.binding.mqtt.homie.tests/src/main/java/org/openhab/binding/mqtt/EmbeddedBrokerTools.java
+++ b/itests/org.openhab.binding.mqtt.homie.tests/src/main/java/org/openhab/binding/mqtt/EmbeddedBrokerTools.java
@@ -14,6 +14,9 @@ package org.openhab.binding.mqtt;
import static org.junit.jupiter.api.Assertions.assertTrue;
+import java.io.IOException;
+import java.util.Dictionary;
+import java.util.Hashtable;
import java.util.concurrent.Semaphore;
import java.util.concurrent.TimeUnit;
@@ -25,26 +28,40 @@ import org.openhab.core.io.transport.mqtt.MqttConnectionState;
import org.openhab.core.io.transport.mqtt.MqttService;
import org.openhab.core.io.transport.mqtt.MqttServiceObserver;
import org.openhab.io.mqttembeddedbroker.Constants;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
+import org.osgi.service.cm.Configuration;
+import org.osgi.service.cm.ConfigurationAdmin;
/**
* A full implementation test, that starts the embedded MQTT broker and publishes a homeassistant MQTT discovery device
* tree.
*
* @author David Graeff - Initial contribution
+ * @author Wouter Born - Support running MQTT itests in parallel by reconfiguring embedded broker port
*/
@NonNullByDefault
public class EmbeddedBrokerTools {
- private final Logger logger = LoggerFactory.getLogger(EmbeddedBrokerTools.class);
- public @Nullable MqttBrokerConnection embeddedConnection = null;
+
+ private static final int BROKER_PORT = Integer.getInteger("mqttembeddedbroker.port", 1883);
+
+ private final ConfigurationAdmin configurationAdmin;
+ private final MqttService mqttService;
+
+ public @Nullable MqttBrokerConnection embeddedConnection;
+
+ public EmbeddedBrokerTools(ConfigurationAdmin configurationAdmin, MqttService mqttService) {
+ this.configurationAdmin = configurationAdmin;
+ this.mqttService = mqttService;
+ }
/**
* Request the embedded broker connection from the {@link MqttService} and wait for a connection to be established.
*
* @throws InterruptedException
+ * @throws IOException
*/
- public MqttBrokerConnection waitForConnection(MqttService mqttService) throws InterruptedException {
+ public MqttBrokerConnection waitForConnection() throws InterruptedException, IOException {
+ reconfigurePort();
+
embeddedConnection = mqttService.getBrokerConnection(Constants.CLIENTID);
if (embeddedConnection == null) {
Semaphore semaphore = new Semaphore(1);
@@ -64,14 +81,13 @@ public class EmbeddedBrokerTools {
}
};
mqttService.addBrokersListener(observer);
- assertTrue(semaphore.tryAcquire(700, TimeUnit.MILLISECONDS), "Wait for embedded connection client failed");
+ assertTrue(semaphore.tryAcquire(5, TimeUnit.SECONDS), "Wait for embedded connection client failed");
}
MqttBrokerConnection embeddedConnection = this.embeddedConnection;
if (embeddedConnection == null) {
throw new IllegalStateException();
}
- logger.warn("waitForConnection {}", embeddedConnection.connectionState());
Semaphore semaphore = new Semaphore(1);
semaphore.acquire();
MqttConnectionObserver mqttConnectionObserver = (state, error) -> {
@@ -83,8 +99,25 @@ public class EmbeddedBrokerTools {
if (embeddedConnection.connectionState() == MqttConnectionState.CONNECTED) {
semaphore.release();
}
- assertTrue(semaphore.tryAcquire(500, TimeUnit.MILLISECONDS), "Connection " + embeddedConnection.getClientId()
+ assertTrue(semaphore.tryAcquire(5, TimeUnit.SECONDS), "Connection " + embeddedConnection.getClientId()
+ " failed. State: " + embeddedConnection.connectionState());
return embeddedConnection;
}
+
+ public void reconfigurePort() throws IOException {
+ Configuration configuration = configurationAdmin.getConfiguration(Constants.PID, null);
+
+ Dictionary properties = configuration.getProperties();
+ if (properties == null) {
+ properties = new Hashtable<>();
+ }
+
+ Integer currentPort = (Integer) properties.get(Constants.PORT);
+ if (currentPort == null || currentPort.intValue() != BROKER_PORT) {
+ properties.put(Constants.PORT, BROKER_PORT);
+ configuration.update(properties);
+ // Remove the connection to make sure the test waits for the new connection to become available
+ mqttService.removeBrokerConnection(Constants.CLIENTID);
+ }
+ }
}
diff --git a/itests/org.openhab.binding.mqtt.homie.tests/src/main/java/org/openhab/binding/mqtt/HomieImplementationTest.java b/itests/org.openhab.binding.mqtt.homie.tests/src/main/java/org/openhab/binding/mqtt/HomieImplementationTest.java
index 776f8fb91..816ec3b75 100644
--- a/itests/org.openhab.binding.mqtt.homie.tests/src/main/java/org/openhab/binding/mqtt/HomieImplementationTest.java
+++ b/itests/org.openhab.binding.mqtt.homie.tests/src/main/java/org/openhab/binding/mqtt/HomieImplementationTest.java
@@ -59,6 +59,7 @@ import org.openhab.core.library.types.DecimalType;
import org.openhab.core.library.types.OnOffType;
import org.openhab.core.test.java.JavaOSGiTest;
import org.openhab.core.types.UnDefType;
+import org.osgi.service.cm.ConfigurationAdmin;
/**
* A full implementation test, that starts the embedded MQTT broker and publishes a homie device tree.
@@ -71,6 +72,7 @@ public class HomieImplementationTest extends JavaOSGiTest {
private static final String DEVICE_ID = ThingChannelConstants.testHomieThing.getId();
private static final String DEVICE_TOPIC = BASE_TOPIC + "/" + DEVICE_ID;
+ private @NonNullByDefault({}) ConfigurationAdmin configurationAdmin;
private @NonNullByDefault({}) MqttService mqttService;
private @NonNullByDefault({}) MqttBrokerConnection embeddedConnection;
private @NonNullByDefault({}) MqttBrokerConnection connection;
@@ -99,15 +101,17 @@ public class HomieImplementationTest extends JavaOSGiTest {
public void beforeEach() throws Exception {
registerVolatileStorageService();
mocksCloseable = openMocks(this);
+ configurationAdmin = getService(ConfigurationAdmin.class);
mqttService = getService(MqttService.class);
- embeddedConnection = new EmbeddedBrokerTools().waitForConnection(mqttService);
+ // Wait for the EmbeddedBrokerService internal connection to be connected
+ embeddedConnection = new EmbeddedBrokerTools(configurationAdmin, mqttService).waitForConnection();
embeddedConnection.setQos(1);
connection = new MqttBrokerConnection(embeddedConnection.getHost(), embeddedConnection.getPort(),
embeddedConnection.isSecure(), "homie");
connection.setQos(1);
- connection.start().get(500, TimeUnit.MILLISECONDS);
+ connection.start().get(5, TimeUnit.SECONDS);
assertThat(connection.connectionState(), is(MqttConnectionState.CONNECTED));
// If the connection state changes in between -> fail
connection.addConnectionObserver(failIfChange);
@@ -146,7 +150,7 @@ public class HomieImplementationTest extends JavaOSGiTest {
futures.add(publish(propertyTestTopic + "/$datatype", "boolean"));
registeredTopics = futures.size();
- CompletableFuture.allOf(futures.toArray(new CompletableFuture[0])).get(1000, TimeUnit.MILLISECONDS);
+ CompletableFuture.allOf(futures.toArray(new CompletableFuture[0])).get(2, TimeUnit.SECONDS);
scheduler = new ScheduledThreadPoolExecutor(6);
}
@@ -159,9 +163,11 @@ public class HomieImplementationTest extends JavaOSGiTest {
public void afterEach() throws Exception {
if (connection != null) {
connection.removeConnectionObserver(failIfChange);
- connection.stop().get(500, TimeUnit.MILLISECONDS);
+ connection.stop().get(2, TimeUnit.SECONDS);
+ }
+ if (scheduler != null) {
+ scheduler.shutdownNow();
}
- scheduler.shutdownNow();
mocksCloseable.close();
}
@@ -169,9 +175,8 @@ public class HomieImplementationTest extends JavaOSGiTest {
public void retrieveAllTopics() throws InterruptedException, ExecutionException, TimeoutException {
// four topics are not under /testnode !
CountDownLatch c = new CountDownLatch(registeredTopics - 4);
- connection.subscribe(DEVICE_TOPIC + "/testnode/#", (topic, payload) -> c.countDown()).get(5000,
- TimeUnit.MILLISECONDS);
- assertTrue(c.await(5000, TimeUnit.MILLISECONDS),
+ connection.subscribe(DEVICE_TOPIC + "/testnode/#", (topic, payload) -> c.countDown()).get(5, TimeUnit.SECONDS);
+ assertTrue(c.await(5, TimeUnit.SECONDS),
"Connection " + connection.getClientId() + " not retrieving all topics ");
}
diff --git a/itests/org.openhab.io.mqttembeddedbroker.tests/itest.bndrun b/itests/org.openhab.io.mqttembeddedbroker.tests/itest.bndrun
index 82764488e..e690bf692 100644
--- a/itests/org.openhab.io.mqttembeddedbroker.tests/itest.bndrun
+++ b/itests/org.openhab.io.mqttembeddedbroker.tests/itest.bndrun
@@ -10,6 +10,10 @@ Fragment-Host: org.openhab.io.mqttembeddedbroker
-runblacklist: \
bnd.identity;id='org.openhab.core.storage.json'
+-runvm: \
+ -Dio.netty.noUnsafe=true,\
+ -Dmqttembeddedbroker.port=${mqttembeddedbroker.port}
+
#
# done
#
@@ -73,4 +77,3 @@ Fragment-Host: org.openhab.io.mqttembeddedbroker
org.opentest4j;version='[1.2.0,1.2.1)',\
org.reactivestreams.reactive-streams;version='[1.0.2,1.0.3)',\
moquette-broker;version='[0.13.0,0.13.1)'
--runvm: -Dio.netty.noUnsafe=true
diff --git a/itests/org.openhab.io.mqttembeddedbroker.tests/pom.xml b/itests/org.openhab.io.mqttembeddedbroker.tests/pom.xml
index ac22da343..3b7d7792d 100644
--- a/itests/org.openhab.io.mqttembeddedbroker.tests/pom.xml
+++ b/itests/org.openhab.io.mqttembeddedbroker.tests/pom.xml
@@ -66,4 +66,27 @@
+
+
+
+ org.codehaus.mojo
+ build-helper-maven-plugin
+
+
+ reserve-network-port
+
+ reserve-network-port
+
+ process-resources
+
+
+ mqttembeddedbroker.port
+
+
+
+
+
+
+
+
diff --git a/itests/org.openhab.io.mqttembeddedbroker.tests/src/main/java/org/openhab/io/mqttembeddedbroker/EmbeddedBrokerTools.java b/itests/org.openhab.io.mqttembeddedbroker.tests/src/main/java/org/openhab/io/mqttembeddedbroker/EmbeddedBrokerTools.java
index cc41838a0..9ebdf503c 100644
--- a/itests/org.openhab.io.mqttembeddedbroker.tests/src/main/java/org/openhab/io/mqttembeddedbroker/EmbeddedBrokerTools.java
+++ b/itests/org.openhab.io.mqttembeddedbroker.tests/src/main/java/org/openhab/io/mqttembeddedbroker/EmbeddedBrokerTools.java
@@ -14,6 +14,9 @@ package org.openhab.io.mqttembeddedbroker;
import static org.junit.jupiter.api.Assertions.assertTrue;
+import java.io.IOException;
+import java.util.Dictionary;
+import java.util.Hashtable;
import java.util.concurrent.Semaphore;
import java.util.concurrent.TimeUnit;
@@ -24,26 +27,40 @@ import org.openhab.core.io.transport.mqtt.MqttConnectionObserver;
import org.openhab.core.io.transport.mqtt.MqttConnectionState;
import org.openhab.core.io.transport.mqtt.MqttService;
import org.openhab.core.io.transport.mqtt.MqttServiceObserver;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
+import org.osgi.service.cm.Configuration;
+import org.osgi.service.cm.ConfigurationAdmin;
/**
* A full implementation test, that starts the embedded MQTT broker and publishes a homeassistant MQTT discovery device
* tree.
*
* @author David Graeff - Initial contribution
+ * @author Wouter Born - Support running MQTT itests in parallel by reconfiguring embedded broker port
*/
@NonNullByDefault
public class EmbeddedBrokerTools {
- private final Logger logger = LoggerFactory.getLogger(EmbeddedBrokerTools.class);
- public @Nullable MqttBrokerConnection embeddedConnection = null;
+
+ private static final int BROKER_PORT = Integer.getInteger("mqttembeddedbroker.port", 1883);
+
+ private final ConfigurationAdmin configurationAdmin;
+ private final MqttService mqttService;
+
+ public @Nullable MqttBrokerConnection embeddedConnection;
+
+ public EmbeddedBrokerTools(ConfigurationAdmin configurationAdmin, MqttService mqttService) {
+ this.configurationAdmin = configurationAdmin;
+ this.mqttService = mqttService;
+ }
/**
* Request the embedded broker connection from the {@link MqttService} and wait for a connection to be established.
*
* @throws InterruptedException
+ * @throws IOException
*/
- public MqttBrokerConnection waitForConnection(MqttService mqttService) throws InterruptedException {
+ public MqttBrokerConnection waitForConnection() throws InterruptedException, IOException {
+ reconfigurePort();
+
embeddedConnection = mqttService.getBrokerConnection(Constants.CLIENTID);
if (embeddedConnection == null) {
Semaphore semaphore = new Semaphore(1);
@@ -63,14 +80,13 @@ public class EmbeddedBrokerTools {
}
};
mqttService.addBrokersListener(observer);
- assertTrue(semaphore.tryAcquire(700, TimeUnit.MILLISECONDS), "Wait for embedded connection client failed");
+ assertTrue(semaphore.tryAcquire(5, TimeUnit.SECONDS), "Wait for embedded connection client failed");
}
MqttBrokerConnection embeddedConnection = this.embeddedConnection;
if (embeddedConnection == null) {
throw new IllegalStateException();
}
- logger.warn("waitForConnection {}", embeddedConnection.connectionState());
Semaphore semaphore = new Semaphore(1);
semaphore.acquire();
MqttConnectionObserver mqttConnectionObserver = (state, error) -> {
@@ -82,8 +98,25 @@ public class EmbeddedBrokerTools {
if (embeddedConnection.connectionState() == MqttConnectionState.CONNECTED) {
semaphore.release();
}
- assertTrue(semaphore.tryAcquire(500, TimeUnit.MILLISECONDS), "Connection " + embeddedConnection.getClientId()
+ assertTrue(semaphore.tryAcquire(5, TimeUnit.SECONDS), "Connection " + embeddedConnection.getClientId()
+ " failed. State: " + embeddedConnection.connectionState());
return embeddedConnection;
}
+
+ public void reconfigurePort() throws IOException {
+ Configuration configuration = configurationAdmin.getConfiguration(Constants.PID, null);
+
+ Dictionary properties = configuration.getProperties();
+ if (properties == null) {
+ properties = new Hashtable<>();
+ }
+
+ Integer currentPort = (Integer) properties.get(Constants.PORT);
+ if (currentPort == null || currentPort.intValue() != BROKER_PORT) {
+ properties.put(Constants.PORT, BROKER_PORT);
+ configuration.update(properties);
+ // Remove the connection to make sure the test waits for the new connection to become available
+ mqttService.removeBrokerConnection(Constants.CLIENTID);
+ }
+ }
}
diff --git a/itests/org.openhab.io.mqttembeddedbroker.tests/src/main/java/org/openhab/io/mqttembeddedbroker/MoquetteTest.java b/itests/org.openhab.io.mqttembeddedbroker.tests/src/main/java/org/openhab/io/mqttembeddedbroker/MoquetteTest.java
index f15a4245e..a22a69477 100644
--- a/itests/org.openhab.io.mqttembeddedbroker.tests/src/main/java/org/openhab/io/mqttembeddedbroker/MoquetteTest.java
+++ b/itests/org.openhab.io.mqttembeddedbroker.tests/src/main/java/org/openhab/io/mqttembeddedbroker/MoquetteTest.java
@@ -35,6 +35,7 @@ import org.openhab.core.io.transport.mqtt.MqttConnectionObserver;
import org.openhab.core.io.transport.mqtt.MqttConnectionState;
import org.openhab.core.io.transport.mqtt.MqttService;
import org.openhab.core.test.java.JavaOSGiTest;
+import org.osgi.service.cm.ConfigurationAdmin;
/**
* Moquette test
@@ -47,6 +48,7 @@ public class MoquetteTest extends JavaOSGiTest {
private @NonNullByDefault({}) AutoCloseable mocksCloseable;
+ private @NonNullByDefault({}) ConfigurationAdmin configurationAdmin;
private @NonNullByDefault({}) MqttService mqttService;
private @NonNullByDefault({}) MqttBrokerConnection embeddedConnection;
private @NonNullByDefault({}) MqttBrokerConnection clientConnection;
@@ -62,10 +64,11 @@ public class MoquetteTest extends JavaOSGiTest {
public void beforeEach() throws Exception {
registerVolatileStorageService();
mocksCloseable = openMocks(this);
+ configurationAdmin = getService(ConfigurationAdmin.class);
mqttService = getService(MqttService.class);
// Wait for the EmbeddedBrokerService internal connection to be connected
- embeddedConnection = new EmbeddedBrokerTools().waitForConnection(mqttService);
+ embeddedConnection = new EmbeddedBrokerTools(configurationAdmin, mqttService).waitForConnection();
embeddedConnection.setQos(1);
clientConnection = new MqttBrokerConnection(embeddedConnection.getHost(), embeddedConnection.getPort(),