Fix parallel MQTT itests execution (#8617)

* Improve exception handling of the embedded MQTT broker so the port can be reconfigured when it is already bound and it properly unlocks files
* Rework MQTT integration tests so they each run the embedded broker on their own reserved port

Signed-off-by: Wouter Born <github@maindrain.net>
This commit is contained in:
Wouter Born 2020-09-30 19:36:47 +02:00 committed by GitHub
parent ea55540f8b
commit fbafc365da
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
15 changed files with 276 additions and 48 deletions

View File

@ -92,4 +92,27 @@
</dependency> </dependency>
</dependencies> </dependencies>
<build>
<plugins>
<plugin>
<groupId>org.codehaus.mojo</groupId>
<artifactId>build-helper-maven-plugin</artifactId>
<executions>
<execution>
<id>reserve-network-port</id>
<goals>
<goal>reserve-network-port</goal>
</goals>
<phase>process-resources</phase>
<configuration>
<portNames>
<portName>mqttembeddedbroker.port</portName>
</portNames>
</configuration>
</execution>
</executions>
</plugin>
</plugins>
</build>
</project> </project>

View File

@ -26,4 +26,14 @@ public class Constants {
* </pre> * </pre>
*/ */
public static final String CLIENTID = "embedded-mqtt-broker"; public static final String CLIENTID = "embedded-mqtt-broker";
/**
* The broker persistent identifier used for identifying configurations.
*/
public static final String PID = "org.openhab.core.mqttembeddedbroker";
/**
* The configuration key used for configuring the embedded broker port.
*/
public static final String PORT = "port";
} }

View File

@ -79,8 +79,8 @@ import io.netty.handler.ssl.SslContextBuilder;
* *
* @author David Graeff - Initial contribution * @author David Graeff - Initial contribution
*/ */
@Component(immediate = true, service = EmbeddedBrokerService.class, configurationPid = "org.openhab.core.mqttembeddedbroker", // @Component(immediate = true, service = EmbeddedBrokerService.class, configurationPid = Constants.PID, //
property = org.osgi.framework.Constants.SERVICE_PID + "=org.openhab.core.mqttembeddedbroker") property = org.osgi.framework.Constants.SERVICE_PID + "=" + Constants.PID)
@ConfigurableService(category = "MQTT", label = "MQTT Embedded Broker", description_uri = "mqtt:mqttembeddedbroker") @ConfigurableService(category = "MQTT", label = "MQTT Embedded Broker", description_uri = "mqtt:mqttembeddedbroker")
@NonNullByDefault @NonNullByDefault
public class EmbeddedBrokerService public class EmbeddedBrokerService
@ -309,7 +309,12 @@ public class EmbeddedBrokerService
// retry starting broker, if it fails again, don't catch exception // retry starting broker, if it fails again, don't catch exception
server.startServer(new MemoryConfig(properties), null, sslContextCreator, authentificator, authorizer); server.startServer(new MemoryConfig(properties), null, sslContextCreator, authentificator, authorizer);
} }
} catch (Exception e) {
logger.warn("Failed to start embedded MQTT server: {}", e.getMessage());
server.stopServer();
return;
} }
this.server = server; this.server = server;
server.addInterceptHandler(metrics); server.addInterceptHandler(metrics);
ScheduledExecutorService s = new ScheduledThreadPoolExecutor(1); ScheduledExecutorService s = new ScheduledThreadPoolExecutor(1);

View File

@ -13,6 +13,10 @@ Fragment-Host: org.openhab.binding.mqtt.homeassistant
-runblacklist: \ -runblacklist: \
bnd.identity;id='org.openhab.core.storage.json' bnd.identity;id='org.openhab.core.storage.json'
-runvm: \
-Dio.netty.noUnsafe=true,\
-Dmqttembeddedbroker.port=${mqttembeddedbroker.port}
# #
# done # done
# #
@ -87,7 +91,4 @@ Fragment-Host: org.openhab.binding.mqtt.homeassistant
org.openhab.core.transform;version='[3.0.0,3.0.1)',\ org.openhab.core.transform;version='[3.0.0,3.0.1)',\
org.openhab.io.mqttembeddedbroker;version='[3.0.0,3.0.1)',\ org.openhab.io.mqttembeddedbroker;version='[3.0.0,3.0.1)',\
org.opentest4j;version='[1.2.0,1.2.1)',\ org.opentest4j;version='[1.2.0,1.2.1)',\
org.reactivestreams.reactive-streams;version='[1.0.2,1.0.3)',\ org.reactivestreams.reactive-streams;version='[1.0.2,1.0.3)'
moquette-broker;version='[0.13.0,0.13.1)'
-runvm: -Dio.netty.noUnsafe=true

View File

@ -100,4 +100,27 @@
</dependency> </dependency>
</dependencies> </dependencies>
<build>
<plugins>
<plugin>
<groupId>org.codehaus.mojo</groupId>
<artifactId>build-helper-maven-plugin</artifactId>
<executions>
<execution>
<id>reserve-network-port</id>
<goals>
<goal>reserve-network-port</goal>
</goals>
<phase>process-resources</phase>
<configuration>
<portNames>
<portName>mqttembeddedbroker.port</portName>
</portNames>
</configuration>
</execution>
</executions>
</plugin>
</plugins>
</build>
</project> </project>

View File

@ -14,6 +14,9 @@ package org.openhab.binding.mqtt;
import static org.junit.jupiter.api.Assertions.assertTrue; import static org.junit.jupiter.api.Assertions.assertTrue;
import java.io.IOException;
import java.util.Dictionary;
import java.util.Hashtable;
import java.util.concurrent.Semaphore; import java.util.concurrent.Semaphore;
import java.util.concurrent.TimeUnit; import java.util.concurrent.TimeUnit;
@ -25,23 +28,40 @@ import org.openhab.core.io.transport.mqtt.MqttConnectionState;
import org.openhab.core.io.transport.mqtt.MqttService; import org.openhab.core.io.transport.mqtt.MqttService;
import org.openhab.core.io.transport.mqtt.MqttServiceObserver; import org.openhab.core.io.transport.mqtt.MqttServiceObserver;
import org.openhab.io.mqttembeddedbroker.Constants; import org.openhab.io.mqttembeddedbroker.Constants;
import org.osgi.service.cm.Configuration;
import org.osgi.service.cm.ConfigurationAdmin;
/** /**
* A full implementation test, that starts the embedded MQTT broker and publishes a homeassistant MQTT discovery device * A full implementation test, that starts the embedded MQTT broker and publishes a homeassistant MQTT discovery device
* tree. * tree.
* *
* @author David Graeff - Initial contribution * @author David Graeff - Initial contribution
* @author Wouter Born - Support running MQTT itests in parallel by reconfiguring embedded broker port
*/ */
@NonNullByDefault @NonNullByDefault
public class EmbeddedBrokerTools { public class EmbeddedBrokerTools {
public @Nullable MqttBrokerConnection embeddedConnection = null;
private static final int BROKER_PORT = Integer.getInteger("mqttembeddedbroker.port", 1883);
private final ConfigurationAdmin configurationAdmin;
private final MqttService mqttService;
public @Nullable MqttBrokerConnection embeddedConnection;
public EmbeddedBrokerTools(ConfigurationAdmin configurationAdmin, MqttService mqttService) {
this.configurationAdmin = configurationAdmin;
this.mqttService = mqttService;
}
/** /**
* Request the embedded broker connection from the {@link MqttService} and wait for a connection to be established. * Request the embedded broker connection from the {@link MqttService} and wait for a connection to be established.
* *
* @throws InterruptedException * @throws InterruptedException
* @throws IOException
*/ */
public MqttBrokerConnection waitForConnection(MqttService mqttService) throws InterruptedException { public MqttBrokerConnection waitForConnection() throws InterruptedException, IOException {
reconfigurePort();
embeddedConnection = mqttService.getBrokerConnection(Constants.CLIENTID); embeddedConnection = mqttService.getBrokerConnection(Constants.CLIENTID);
if (embeddedConnection == null) { if (embeddedConnection == null) {
Semaphore semaphore = new Semaphore(1); Semaphore semaphore = new Semaphore(1);
@ -61,7 +81,7 @@ public class EmbeddedBrokerTools {
} }
}; };
mqttService.addBrokersListener(observer); mqttService.addBrokersListener(observer);
assertTrue(semaphore.tryAcquire(1000, TimeUnit.MILLISECONDS), "Wait for embedded connection client failed"); assertTrue(semaphore.tryAcquire(5, TimeUnit.SECONDS), "Wait for embedded connection client failed");
} }
MqttBrokerConnection embeddedConnection = this.embeddedConnection; MqttBrokerConnection embeddedConnection = this.embeddedConnection;
if (embeddedConnection == null) { if (embeddedConnection == null) {
@ -79,8 +99,25 @@ public class EmbeddedBrokerTools {
if (embeddedConnection.connectionState() == MqttConnectionState.CONNECTED) { if (embeddedConnection.connectionState() == MqttConnectionState.CONNECTED) {
semaphore.release(); semaphore.release();
} }
assertTrue(semaphore.tryAcquire(500, TimeUnit.MILLISECONDS), "Connection " + embeddedConnection.getClientId() assertTrue(semaphore.tryAcquire(5, TimeUnit.SECONDS), "Connection " + embeddedConnection.getClientId()
+ " failed. State: " + embeddedConnection.connectionState()); + " failed. State: " + embeddedConnection.connectionState());
return embeddedConnection; return embeddedConnection;
} }
public void reconfigurePort() throws IOException {
Configuration configuration = configurationAdmin.getConfiguration(Constants.PID, null);
Dictionary<String, Object> properties = configuration.getProperties();
if (properties == null) {
properties = new Hashtable<>();
}
Integer currentPort = (Integer) properties.get(Constants.PORT);
if (currentPort == null || currentPort.intValue() != BROKER_PORT) {
properties.put(Constants.PORT, BROKER_PORT);
configuration.update(properties);
// Remove the connection to make sure the test waits for the new connection to become available
mqttService.removeBrokerConnection(Constants.CLIENTID);
}
}
} }

View File

@ -57,6 +57,7 @@ import org.openhab.core.test.java.JavaOSGiTest;
import org.openhab.core.types.State; import org.openhab.core.types.State;
import org.openhab.core.types.UnDefType; import org.openhab.core.types.UnDefType;
import org.openhab.core.util.UIDUtils; import org.openhab.core.util.UIDUtils;
import org.osgi.service.cm.ConfigurationAdmin;
import com.google.gson.Gson; import com.google.gson.Gson;
import com.google.gson.GsonBuilder; import com.google.gson.GsonBuilder;
@ -69,6 +70,7 @@ import com.google.gson.GsonBuilder;
*/ */
@NonNullByDefault @NonNullByDefault
public class HomeAssistantMQTTImplementationTest extends JavaOSGiTest { public class HomeAssistantMQTTImplementationTest extends JavaOSGiTest {
private @NonNullByDefault({}) ConfigurationAdmin configurationAdmin;
private @NonNullByDefault({}) MqttService mqttService; private @NonNullByDefault({}) MqttService mqttService;
private @NonNullByDefault({}) MqttBrokerConnection embeddedConnection; private @NonNullByDefault({}) MqttBrokerConnection embeddedConnection;
private @NonNullByDefault({}) MqttBrokerConnection connection; private @NonNullByDefault({}) MqttBrokerConnection connection;
@ -94,14 +96,15 @@ public class HomeAssistantMQTTImplementationTest extends JavaOSGiTest {
public void beforeEach() throws Exception { public void beforeEach() throws Exception {
registerVolatileStorageService(); registerVolatileStorageService();
mocksCloseable = openMocks(this); mocksCloseable = openMocks(this);
configurationAdmin = getService(ConfigurationAdmin.class);
mqttService = getService(MqttService.class); mqttService = getService(MqttService.class);
// Wait for the EmbeddedBrokerService internal connection to be connected // Wait for the EmbeddedBrokerService internal connection to be connected
embeddedConnection = new EmbeddedBrokerTools().waitForConnection(mqttService); embeddedConnection = new EmbeddedBrokerTools(configurationAdmin, mqttService).waitForConnection();
connection = new MqttBrokerConnection(embeddedConnection.getHost(), embeddedConnection.getPort(), connection = new MqttBrokerConnection(embeddedConnection.getHost(), embeddedConnection.getPort(),
embeddedConnection.isSecure(), "ha_mqtt"); embeddedConnection.isSecure(), "ha_mqtt");
connection.start().get(1000, TimeUnit.MILLISECONDS); connection.start().get(2, TimeUnit.SECONDS);
assertThat(connection.connectionState(), is(MqttConnectionState.CONNECTED)); assertThat(connection.connectionState(), is(MqttConnectionState.CONNECTED));
// If the connection state changes in between -> fail // If the connection state changes in between -> fail
@ -117,7 +120,7 @@ public class HomeAssistantMQTTImplementationTest extends JavaOSGiTest {
futures.add(embeddedConnection.publish(testObjectTopic + "/state", "ON".getBytes(), 0, true)); futures.add(embeddedConnection.publish(testObjectTopic + "/state", "ON".getBytes(), 0, true));
registeredTopics = futures.size(); registeredTopics = futures.size();
CompletableFuture.allOf(futures.toArray(new CompletableFuture[0])).get(1000, TimeUnit.MILLISECONDS); CompletableFuture.allOf(futures.toArray(new CompletableFuture[0])).get(2, TimeUnit.SECONDS);
failure = null; failure = null;
@ -128,7 +131,7 @@ public class HomeAssistantMQTTImplementationTest extends JavaOSGiTest {
public void afterEach() throws Exception { public void afterEach() throws Exception {
if (connection != null) { if (connection != null) {
connection.removeConnectionObserver(failIfChange); connection.removeConnectionObserver(failIfChange);
connection.stop().get(1000, TimeUnit.MILLISECONDS); connection.stop().get(2, TimeUnit.SECONDS);
} }
mocksCloseable.close(); mocksCloseable.close();
@ -137,18 +140,18 @@ public class HomeAssistantMQTTImplementationTest extends JavaOSGiTest {
@Test @Test
public void reconnectTest() throws InterruptedException, ExecutionException, TimeoutException { public void reconnectTest() throws InterruptedException, ExecutionException, TimeoutException {
connection.removeConnectionObserver(failIfChange); connection.removeConnectionObserver(failIfChange);
connection.stop().get(2000, TimeUnit.MILLISECONDS); connection.stop().get(2, TimeUnit.SECONDS);
connection = new MqttBrokerConnection(embeddedConnection.getHost(), embeddedConnection.getPort(), connection = new MqttBrokerConnection(embeddedConnection.getHost(), embeddedConnection.getPort(),
embeddedConnection.isSecure(), "ha_mqtt"); embeddedConnection.isSecure(), "ha_mqtt");
connection.start().get(2000, TimeUnit.MILLISECONDS); connection.start().get(2, TimeUnit.SECONDS);
} }
@Test @Test
public void retrieveAllTopics() throws InterruptedException, ExecutionException, TimeoutException { public void retrieveAllTopics() throws InterruptedException, ExecutionException, TimeoutException {
CountDownLatch c = new CountDownLatch(registeredTopics); CountDownLatch c = new CountDownLatch(registeredTopics);
connection.subscribe("homeassistant/+/+/" + ThingChannelConstants.testHomeAssistantThing.getId() + "/#", connection.subscribe("homeassistant/+/+/" + ThingChannelConstants.testHomeAssistantThing.getId() + "/#",
(topic, payload) -> c.countDown()).get(1000, TimeUnit.MILLISECONDS); (topic, payload) -> c.countDown()).get(2, TimeUnit.SECONDS);
assertTrue(c.await(1000, TimeUnit.MILLISECONDS), assertTrue(c.await(2, TimeUnit.SECONDS),
"Connection " + connection.getClientId() + " not retrieving all topics"); "Connection " + connection.getClientId() + " not retrieving all topics");
} }
@ -183,8 +186,8 @@ public class HomeAssistantMQTTImplementationTest extends JavaOSGiTest {
return null; return null;
}); });
assertTrue(latch.await(4000, TimeUnit.MILLISECONDS)); assertTrue(latch.await(4, TimeUnit.SECONDS));
future.get(2000, TimeUnit.MILLISECONDS); future.get(2, TimeUnit.SECONDS);
// No failure expected and one discovered result // No failure expected and one discovered result
assertNull(failure); assertNull(failure);

View File

@ -13,6 +13,10 @@ Fragment-Host: org.openhab.binding.mqtt.homie
-runblacklist: \ -runblacklist: \
bnd.identity;id='org.openhab.core.storage.json' bnd.identity;id='org.openhab.core.storage.json'
-runvm: \
-Dio.netty.noUnsafe=true,\
-Dmqttembeddedbroker.port=${mqttembeddedbroker.port}
# #
# done # done
# #
@ -89,4 +93,4 @@ Fragment-Host: org.openhab.binding.mqtt.homie
org.opentest4j;version='[1.2.0,1.2.1)',\ org.opentest4j;version='[1.2.0,1.2.1)',\
org.reactivestreams.reactive-streams;version='[1.0.2,1.0.3)',\ org.reactivestreams.reactive-streams;version='[1.0.2,1.0.3)',\
moquette-broker;version='[0.13.0,0.13.1)' moquette-broker;version='[0.13.0,0.13.1)'
-runvm: -Dio.netty.noUnsafe=true

View File

@ -82,7 +82,6 @@
<groupId>com.h2database</groupId> <groupId>com.h2database</groupId>
<artifactId>h2-mvstore</artifactId> <artifactId>h2-mvstore</artifactId>
<version>1.4.199</version> <version>1.4.199</version>
</dependency> </dependency>
<dependency> <dependency>
<groupId>io.netty</groupId> <groupId>io.netty</groupId>
@ -101,4 +100,27 @@
</dependency> </dependency>
</dependencies> </dependencies>
<build>
<plugins>
<plugin>
<groupId>org.codehaus.mojo</groupId>
<artifactId>build-helper-maven-plugin</artifactId>
<executions>
<execution>
<id>reserve-network-port</id>
<goals>
<goal>reserve-network-port</goal>
</goals>
<phase>process-resources</phase>
<configuration>
<portNames>
<portName>mqttembeddedbroker.port</portName>
</portNames>
</configuration>
</execution>
</executions>
</plugin>
</plugins>
</build>
</project> </project>

View File

@ -14,6 +14,9 @@ package org.openhab.binding.mqtt;
import static org.junit.jupiter.api.Assertions.assertTrue; import static org.junit.jupiter.api.Assertions.assertTrue;
import java.io.IOException;
import java.util.Dictionary;
import java.util.Hashtable;
import java.util.concurrent.Semaphore; import java.util.concurrent.Semaphore;
import java.util.concurrent.TimeUnit; import java.util.concurrent.TimeUnit;
@ -25,26 +28,40 @@ import org.openhab.core.io.transport.mqtt.MqttConnectionState;
import org.openhab.core.io.transport.mqtt.MqttService; import org.openhab.core.io.transport.mqtt.MqttService;
import org.openhab.core.io.transport.mqtt.MqttServiceObserver; import org.openhab.core.io.transport.mqtt.MqttServiceObserver;
import org.openhab.io.mqttembeddedbroker.Constants; import org.openhab.io.mqttembeddedbroker.Constants;
import org.slf4j.Logger; import org.osgi.service.cm.Configuration;
import org.slf4j.LoggerFactory; import org.osgi.service.cm.ConfigurationAdmin;
/** /**
* A full implementation test, that starts the embedded MQTT broker and publishes a homeassistant MQTT discovery device * A full implementation test, that starts the embedded MQTT broker and publishes a homeassistant MQTT discovery device
* tree. * tree.
* *
* @author David Graeff - Initial contribution * @author David Graeff - Initial contribution
* @author Wouter Born - Support running MQTT itests in parallel by reconfiguring embedded broker port
*/ */
@NonNullByDefault @NonNullByDefault
public class EmbeddedBrokerTools { public class EmbeddedBrokerTools {
private final Logger logger = LoggerFactory.getLogger(EmbeddedBrokerTools.class);
public @Nullable MqttBrokerConnection embeddedConnection = null; private static final int BROKER_PORT = Integer.getInteger("mqttembeddedbroker.port", 1883);
private final ConfigurationAdmin configurationAdmin;
private final MqttService mqttService;
public @Nullable MqttBrokerConnection embeddedConnection;
public EmbeddedBrokerTools(ConfigurationAdmin configurationAdmin, MqttService mqttService) {
this.configurationAdmin = configurationAdmin;
this.mqttService = mqttService;
}
/** /**
* Request the embedded broker connection from the {@link MqttService} and wait for a connection to be established. * Request the embedded broker connection from the {@link MqttService} and wait for a connection to be established.
* *
* @throws InterruptedException * @throws InterruptedException
* @throws IOException
*/ */
public MqttBrokerConnection waitForConnection(MqttService mqttService) throws InterruptedException { public MqttBrokerConnection waitForConnection() throws InterruptedException, IOException {
reconfigurePort();
embeddedConnection = mqttService.getBrokerConnection(Constants.CLIENTID); embeddedConnection = mqttService.getBrokerConnection(Constants.CLIENTID);
if (embeddedConnection == null) { if (embeddedConnection == null) {
Semaphore semaphore = new Semaphore(1); Semaphore semaphore = new Semaphore(1);
@ -64,14 +81,13 @@ public class EmbeddedBrokerTools {
} }
}; };
mqttService.addBrokersListener(observer); mqttService.addBrokersListener(observer);
assertTrue(semaphore.tryAcquire(700, TimeUnit.MILLISECONDS), "Wait for embedded connection client failed"); assertTrue(semaphore.tryAcquire(5, TimeUnit.SECONDS), "Wait for embedded connection client failed");
} }
MqttBrokerConnection embeddedConnection = this.embeddedConnection; MqttBrokerConnection embeddedConnection = this.embeddedConnection;
if (embeddedConnection == null) { if (embeddedConnection == null) {
throw new IllegalStateException(); throw new IllegalStateException();
} }
logger.warn("waitForConnection {}", embeddedConnection.connectionState());
Semaphore semaphore = new Semaphore(1); Semaphore semaphore = new Semaphore(1);
semaphore.acquire(); semaphore.acquire();
MqttConnectionObserver mqttConnectionObserver = (state, error) -> { MqttConnectionObserver mqttConnectionObserver = (state, error) -> {
@ -83,8 +99,25 @@ public class EmbeddedBrokerTools {
if (embeddedConnection.connectionState() == MqttConnectionState.CONNECTED) { if (embeddedConnection.connectionState() == MqttConnectionState.CONNECTED) {
semaphore.release(); semaphore.release();
} }
assertTrue(semaphore.tryAcquire(500, TimeUnit.MILLISECONDS), "Connection " + embeddedConnection.getClientId() assertTrue(semaphore.tryAcquire(5, TimeUnit.SECONDS), "Connection " + embeddedConnection.getClientId()
+ " failed. State: " + embeddedConnection.connectionState()); + " failed. State: " + embeddedConnection.connectionState());
return embeddedConnection; return embeddedConnection;
} }
public void reconfigurePort() throws IOException {
Configuration configuration = configurationAdmin.getConfiguration(Constants.PID, null);
Dictionary<String, Object> properties = configuration.getProperties();
if (properties == null) {
properties = new Hashtable<>();
}
Integer currentPort = (Integer) properties.get(Constants.PORT);
if (currentPort == null || currentPort.intValue() != BROKER_PORT) {
properties.put(Constants.PORT, BROKER_PORT);
configuration.update(properties);
// Remove the connection to make sure the test waits for the new connection to become available
mqttService.removeBrokerConnection(Constants.CLIENTID);
}
}
} }

View File

@ -59,6 +59,7 @@ import org.openhab.core.library.types.DecimalType;
import org.openhab.core.library.types.OnOffType; import org.openhab.core.library.types.OnOffType;
import org.openhab.core.test.java.JavaOSGiTest; import org.openhab.core.test.java.JavaOSGiTest;
import org.openhab.core.types.UnDefType; import org.openhab.core.types.UnDefType;
import org.osgi.service.cm.ConfigurationAdmin;
/** /**
* A full implementation test, that starts the embedded MQTT broker and publishes a homie device tree. * A full implementation test, that starts the embedded MQTT broker and publishes a homie device tree.
@ -71,6 +72,7 @@ public class HomieImplementationTest extends JavaOSGiTest {
private static final String DEVICE_ID = ThingChannelConstants.testHomieThing.getId(); private static final String DEVICE_ID = ThingChannelConstants.testHomieThing.getId();
private static final String DEVICE_TOPIC = BASE_TOPIC + "/" + DEVICE_ID; private static final String DEVICE_TOPIC = BASE_TOPIC + "/" + DEVICE_ID;
private @NonNullByDefault({}) ConfigurationAdmin configurationAdmin;
private @NonNullByDefault({}) MqttService mqttService; private @NonNullByDefault({}) MqttService mqttService;
private @NonNullByDefault({}) MqttBrokerConnection embeddedConnection; private @NonNullByDefault({}) MqttBrokerConnection embeddedConnection;
private @NonNullByDefault({}) MqttBrokerConnection connection; private @NonNullByDefault({}) MqttBrokerConnection connection;
@ -99,15 +101,17 @@ public class HomieImplementationTest extends JavaOSGiTest {
public void beforeEach() throws Exception { public void beforeEach() throws Exception {
registerVolatileStorageService(); registerVolatileStorageService();
mocksCloseable = openMocks(this); mocksCloseable = openMocks(this);
configurationAdmin = getService(ConfigurationAdmin.class);
mqttService = getService(MqttService.class); mqttService = getService(MqttService.class);
embeddedConnection = new EmbeddedBrokerTools().waitForConnection(mqttService); // Wait for the EmbeddedBrokerService internal connection to be connected
embeddedConnection = new EmbeddedBrokerTools(configurationAdmin, mqttService).waitForConnection();
embeddedConnection.setQos(1); embeddedConnection.setQos(1);
connection = new MqttBrokerConnection(embeddedConnection.getHost(), embeddedConnection.getPort(), connection = new MqttBrokerConnection(embeddedConnection.getHost(), embeddedConnection.getPort(),
embeddedConnection.isSecure(), "homie"); embeddedConnection.isSecure(), "homie");
connection.setQos(1); connection.setQos(1);
connection.start().get(500, TimeUnit.MILLISECONDS); connection.start().get(5, TimeUnit.SECONDS);
assertThat(connection.connectionState(), is(MqttConnectionState.CONNECTED)); assertThat(connection.connectionState(), is(MqttConnectionState.CONNECTED));
// If the connection state changes in between -> fail // If the connection state changes in between -> fail
connection.addConnectionObserver(failIfChange); connection.addConnectionObserver(failIfChange);
@ -146,7 +150,7 @@ public class HomieImplementationTest extends JavaOSGiTest {
futures.add(publish(propertyTestTopic + "/$datatype", "boolean")); futures.add(publish(propertyTestTopic + "/$datatype", "boolean"));
registeredTopics = futures.size(); registeredTopics = futures.size();
CompletableFuture.allOf(futures.toArray(new CompletableFuture[0])).get(1000, TimeUnit.MILLISECONDS); CompletableFuture.allOf(futures.toArray(new CompletableFuture[0])).get(2, TimeUnit.SECONDS);
scheduler = new ScheduledThreadPoolExecutor(6); scheduler = new ScheduledThreadPoolExecutor(6);
} }
@ -159,9 +163,11 @@ public class HomieImplementationTest extends JavaOSGiTest {
public void afterEach() throws Exception { public void afterEach() throws Exception {
if (connection != null) { if (connection != null) {
connection.removeConnectionObserver(failIfChange); connection.removeConnectionObserver(failIfChange);
connection.stop().get(500, TimeUnit.MILLISECONDS); connection.stop().get(2, TimeUnit.SECONDS);
}
if (scheduler != null) {
scheduler.shutdownNow();
} }
scheduler.shutdownNow();
mocksCloseable.close(); mocksCloseable.close();
} }
@ -169,9 +175,8 @@ public class HomieImplementationTest extends JavaOSGiTest {
public void retrieveAllTopics() throws InterruptedException, ExecutionException, TimeoutException { public void retrieveAllTopics() throws InterruptedException, ExecutionException, TimeoutException {
// four topics are not under /testnode ! // four topics are not under /testnode !
CountDownLatch c = new CountDownLatch(registeredTopics - 4); CountDownLatch c = new CountDownLatch(registeredTopics - 4);
connection.subscribe(DEVICE_TOPIC + "/testnode/#", (topic, payload) -> c.countDown()).get(5000, connection.subscribe(DEVICE_TOPIC + "/testnode/#", (topic, payload) -> c.countDown()).get(5, TimeUnit.SECONDS);
TimeUnit.MILLISECONDS); assertTrue(c.await(5, TimeUnit.SECONDS),
assertTrue(c.await(5000, TimeUnit.MILLISECONDS),
"Connection " + connection.getClientId() + " not retrieving all topics "); "Connection " + connection.getClientId() + " not retrieving all topics ");
} }

View File

@ -10,6 +10,10 @@ Fragment-Host: org.openhab.io.mqttembeddedbroker
-runblacklist: \ -runblacklist: \
bnd.identity;id='org.openhab.core.storage.json' bnd.identity;id='org.openhab.core.storage.json'
-runvm: \
-Dio.netty.noUnsafe=true,\
-Dmqttembeddedbroker.port=${mqttembeddedbroker.port}
# #
# done # done
# #
@ -73,4 +77,3 @@ Fragment-Host: org.openhab.io.mqttembeddedbroker
org.opentest4j;version='[1.2.0,1.2.1)',\ org.opentest4j;version='[1.2.0,1.2.1)',\
org.reactivestreams.reactive-streams;version='[1.0.2,1.0.3)',\ org.reactivestreams.reactive-streams;version='[1.0.2,1.0.3)',\
moquette-broker;version='[0.13.0,0.13.1)' moquette-broker;version='[0.13.0,0.13.1)'
-runvm: -Dio.netty.noUnsafe=true

View File

@ -66,4 +66,27 @@
</dependency> </dependency>
</dependencies> </dependencies>
<build>
<plugins>
<plugin>
<groupId>org.codehaus.mojo</groupId>
<artifactId>build-helper-maven-plugin</artifactId>
<executions>
<execution>
<id>reserve-network-port</id>
<goals>
<goal>reserve-network-port</goal>
</goals>
<phase>process-resources</phase>
<configuration>
<portNames>
<portName>mqttembeddedbroker.port</portName>
</portNames>
</configuration>
</execution>
</executions>
</plugin>
</plugins>
</build>
</project> </project>

View File

@ -14,6 +14,9 @@ package org.openhab.io.mqttembeddedbroker;
import static org.junit.jupiter.api.Assertions.assertTrue; import static org.junit.jupiter.api.Assertions.assertTrue;
import java.io.IOException;
import java.util.Dictionary;
import java.util.Hashtable;
import java.util.concurrent.Semaphore; import java.util.concurrent.Semaphore;
import java.util.concurrent.TimeUnit; import java.util.concurrent.TimeUnit;
@ -24,26 +27,40 @@ import org.openhab.core.io.transport.mqtt.MqttConnectionObserver;
import org.openhab.core.io.transport.mqtt.MqttConnectionState; import org.openhab.core.io.transport.mqtt.MqttConnectionState;
import org.openhab.core.io.transport.mqtt.MqttService; import org.openhab.core.io.transport.mqtt.MqttService;
import org.openhab.core.io.transport.mqtt.MqttServiceObserver; import org.openhab.core.io.transport.mqtt.MqttServiceObserver;
import org.slf4j.Logger; import org.osgi.service.cm.Configuration;
import org.slf4j.LoggerFactory; import org.osgi.service.cm.ConfigurationAdmin;
/** /**
* A full implementation test, that starts the embedded MQTT broker and publishes a homeassistant MQTT discovery device * A full implementation test, that starts the embedded MQTT broker and publishes a homeassistant MQTT discovery device
* tree. * tree.
* *
* @author David Graeff - Initial contribution * @author David Graeff - Initial contribution
* @author Wouter Born - Support running MQTT itests in parallel by reconfiguring embedded broker port
*/ */
@NonNullByDefault @NonNullByDefault
public class EmbeddedBrokerTools { public class EmbeddedBrokerTools {
private final Logger logger = LoggerFactory.getLogger(EmbeddedBrokerTools.class);
public @Nullable MqttBrokerConnection embeddedConnection = null; private static final int BROKER_PORT = Integer.getInteger("mqttembeddedbroker.port", 1883);
private final ConfigurationAdmin configurationAdmin;
private final MqttService mqttService;
public @Nullable MqttBrokerConnection embeddedConnection;
public EmbeddedBrokerTools(ConfigurationAdmin configurationAdmin, MqttService mqttService) {
this.configurationAdmin = configurationAdmin;
this.mqttService = mqttService;
}
/** /**
* Request the embedded broker connection from the {@link MqttService} and wait for a connection to be established. * Request the embedded broker connection from the {@link MqttService} and wait for a connection to be established.
* *
* @throws InterruptedException * @throws InterruptedException
* @throws IOException
*/ */
public MqttBrokerConnection waitForConnection(MqttService mqttService) throws InterruptedException { public MqttBrokerConnection waitForConnection() throws InterruptedException, IOException {
reconfigurePort();
embeddedConnection = mqttService.getBrokerConnection(Constants.CLIENTID); embeddedConnection = mqttService.getBrokerConnection(Constants.CLIENTID);
if (embeddedConnection == null) { if (embeddedConnection == null) {
Semaphore semaphore = new Semaphore(1); Semaphore semaphore = new Semaphore(1);
@ -63,14 +80,13 @@ public class EmbeddedBrokerTools {
} }
}; };
mqttService.addBrokersListener(observer); mqttService.addBrokersListener(observer);
assertTrue(semaphore.tryAcquire(700, TimeUnit.MILLISECONDS), "Wait for embedded connection client failed"); assertTrue(semaphore.tryAcquire(5, TimeUnit.SECONDS), "Wait for embedded connection client failed");
} }
MqttBrokerConnection embeddedConnection = this.embeddedConnection; MqttBrokerConnection embeddedConnection = this.embeddedConnection;
if (embeddedConnection == null) { if (embeddedConnection == null) {
throw new IllegalStateException(); throw new IllegalStateException();
} }
logger.warn("waitForConnection {}", embeddedConnection.connectionState());
Semaphore semaphore = new Semaphore(1); Semaphore semaphore = new Semaphore(1);
semaphore.acquire(); semaphore.acquire();
MqttConnectionObserver mqttConnectionObserver = (state, error) -> { MqttConnectionObserver mqttConnectionObserver = (state, error) -> {
@ -82,8 +98,25 @@ public class EmbeddedBrokerTools {
if (embeddedConnection.connectionState() == MqttConnectionState.CONNECTED) { if (embeddedConnection.connectionState() == MqttConnectionState.CONNECTED) {
semaphore.release(); semaphore.release();
} }
assertTrue(semaphore.tryAcquire(500, TimeUnit.MILLISECONDS), "Connection " + embeddedConnection.getClientId() assertTrue(semaphore.tryAcquire(5, TimeUnit.SECONDS), "Connection " + embeddedConnection.getClientId()
+ " failed. State: " + embeddedConnection.connectionState()); + " failed. State: " + embeddedConnection.connectionState());
return embeddedConnection; return embeddedConnection;
} }
public void reconfigurePort() throws IOException {
Configuration configuration = configurationAdmin.getConfiguration(Constants.PID, null);
Dictionary<String, Object> properties = configuration.getProperties();
if (properties == null) {
properties = new Hashtable<>();
}
Integer currentPort = (Integer) properties.get(Constants.PORT);
if (currentPort == null || currentPort.intValue() != BROKER_PORT) {
properties.put(Constants.PORT, BROKER_PORT);
configuration.update(properties);
// Remove the connection to make sure the test waits for the new connection to become available
mqttService.removeBrokerConnection(Constants.CLIENTID);
}
}
} }

View File

@ -35,6 +35,7 @@ import org.openhab.core.io.transport.mqtt.MqttConnectionObserver;
import org.openhab.core.io.transport.mqtt.MqttConnectionState; import org.openhab.core.io.transport.mqtt.MqttConnectionState;
import org.openhab.core.io.transport.mqtt.MqttService; import org.openhab.core.io.transport.mqtt.MqttService;
import org.openhab.core.test.java.JavaOSGiTest; import org.openhab.core.test.java.JavaOSGiTest;
import org.osgi.service.cm.ConfigurationAdmin;
/** /**
* Moquette test * Moquette test
@ -47,6 +48,7 @@ public class MoquetteTest extends JavaOSGiTest {
private @NonNullByDefault({}) AutoCloseable mocksCloseable; private @NonNullByDefault({}) AutoCloseable mocksCloseable;
private @NonNullByDefault({}) ConfigurationAdmin configurationAdmin;
private @NonNullByDefault({}) MqttService mqttService; private @NonNullByDefault({}) MqttService mqttService;
private @NonNullByDefault({}) MqttBrokerConnection embeddedConnection; private @NonNullByDefault({}) MqttBrokerConnection embeddedConnection;
private @NonNullByDefault({}) MqttBrokerConnection clientConnection; private @NonNullByDefault({}) MqttBrokerConnection clientConnection;
@ -62,10 +64,11 @@ public class MoquetteTest extends JavaOSGiTest {
public void beforeEach() throws Exception { public void beforeEach() throws Exception {
registerVolatileStorageService(); registerVolatileStorageService();
mocksCloseable = openMocks(this); mocksCloseable = openMocks(this);
configurationAdmin = getService(ConfigurationAdmin.class);
mqttService = getService(MqttService.class); mqttService = getService(MqttService.class);
// Wait for the EmbeddedBrokerService internal connection to be connected // Wait for the EmbeddedBrokerService internal connection to be connected
embeddedConnection = new EmbeddedBrokerTools().waitForConnection(mqttService); embeddedConnection = new EmbeddedBrokerTools(configurationAdmin, mqttService).waitForConnection();
embeddedConnection.setQos(1); embeddedConnection.setQos(1);
clientConnection = new MqttBrokerConnection(embeddedConnection.getHost(), embeddedConnection.getPort(), clientConnection = new MqttBrokerConnection(embeddedConnection.getHost(), embeddedConnection.getPort(),