[mqtt] Support Ruuvitags via Ruuvi Gateway (#13315)

Signed-off-by: Sami Salonen <ssalonen@gmail.com>
This commit is contained in:
Sami Salonen
2023-03-27 15:11:07 +03:00
committed by GitHub
parent 210aff461d
commit 18e7d81e4d
26 changed files with 2692 additions and 0 deletions

View File

@@ -0,0 +1,13 @@
This content is produced and maintained by the openHAB project.
* Project home: https://www.openhab.org
== Declared Project Licenses
This program and the accompanying materials are made available under the terms
of the Eclipse Public License 2.0 which is available at
https://www.eclipse.org/legal/epl-2.0/.
== Source Code
https://github.com/openhab/openhab-addons

View File

@@ -0,0 +1,108 @@
-include: ../itest-common.bndrun
Bundle-SymbolicName: ${project.artifactId}
Fragment-Host: org.openhab.binding.mqtt.ruuvigateway
Import-Package: \
com.bugsnag.*;resolution:=optional,\
com.librato.metrics.reporter.*;resolution:=optional,\
*
-includeresource: \
moquette-broker-[0-9.]*.jar;lib:=true
-runrequires: \
bnd.identity;id='org.openhab.binding.mqtt.ruuvigateway.tests'
# We would like to use the "volatile" storage only
-runblacklist: \
bnd.identity;id='org.openhab.core.storage.json'
-runvm.mqtt: \
-Dio.netty.noUnsafe=true,\
-Dmqttbroker.port=${mqttbroker.port}
#
# done
#
-runbundles: \
biz.aQute.tester.junit-platform;version='[6.4.0,6.4.1)',\
ch.qos.logback.classic;version='[1.2.11,1.2.12)',\
ch.qos.logback.core;version='[1.2.11,1.2.12)',\
com.google.dagger;version='[2.27.0,2.27.1)',\
com.google.gson;version='[2.9.1,2.9.2)',\
com.h2database.mvstore;version='[1.4.199,1.4.200)',\
com.hivemq.client.mqtt;version='[1.2.2,1.2.3)',\
com.sun.jna;version='[5.12.1,5.12.2)',\
com.sun.xml.bind.jaxb-osgi;version='[2.3.3,2.3.4)',\
com.zaxxer.HikariCP;version='[2.4.7,2.4.8)',\
io.dropwizard.metrics.core;version='[3.2.2,3.2.3)',\
io.methvin.directory-watcher;version='[0.17.1,0.17.2)',\
io.netty.buffer;version='[4.1.72,4.1.73)',\
io.netty.codec;version='[4.1.72,4.1.73)',\
io.netty.codec-http;version='[4.1.59,4.1.60)',\
io.netty.codec-mqtt;version='[4.1.72,4.1.73)',\
io.netty.codec-socks;version='[4.1.72,4.1.73)',\
io.netty.common;version='[4.1.72,4.1.73)',\
io.netty.handler;version='[4.1.72,4.1.73)',\
io.netty.handler-proxy;version='[4.1.72,4.1.73)',\
io.netty.resolver;version='[4.1.72,4.1.73)',\
io.netty.tcnative-classes;version='[2.0.46,2.0.47)',\
io.netty.transport;version='[4.1.72,4.1.73)',\
io.netty.transport-native-epoll;version='[4.1.59,4.1.60)',\
io.netty.transport-native-unix-common;version='[4.1.59,4.1.60)',\
io.reactivex.rxjava2.rxjava;version='[2.2.19,2.2.20)',\
jakarta.annotation-api;version='[2.0.0,2.0.1)',\
jakarta.inject.jakarta.inject-api;version='[2.0.0,2.0.1)',\
jakarta.ws.rs-api;version='[2.1.6,2.1.7)',\
jakarta.xml.bind-api;version='[2.3.3,2.3.4)',\
javax.measure.unit-api;version='[2.1.2,2.1.3)',\
junit-jupiter-api;version='[5.9.2,5.9.3)',\
junit-jupiter-engine;version='[5.9.2,5.9.3)',\
junit-jupiter-params;version='[5.9.2,5.9.3)',\
junit-platform-commons;version='[1.9.2,1.9.3)',\
junit-platform-engine;version='[1.9.2,1.9.3)',\
junit-platform-launcher;version='[1.9.2,1.9.3)',\
org.apache.aries.javax.jax.rs-api;version='[1.0.1,1.0.2)',\
org.apache.commons.commons-codec;version='[1.15.0,1.15.1)',\
org.apache.felix.configadmin;version='[1.9.26,1.9.27)',\
org.apache.felix.http.servlet-api;version='[1.2.0,1.2.1)',\
org.apache.felix.scr;version='[2.2.4,2.2.5)',\
org.apache.servicemix.specs.activation-api-1.2.1;version='[1.2.1,1.2.2)',\
org.eclipse.equinox.event;version='[1.4.300,1.4.301)',\
org.eclipse.jetty.http;version='[9.4.50,9.4.51)',\
org.eclipse.jetty.io;version='[9.4.50,9.4.51)',\
org.eclipse.jetty.security;version='[9.4.50,9.4.51)',\
org.eclipse.jetty.server;version='[9.4.50,9.4.51)',\
org.eclipse.jetty.servlet;version='[9.4.50,9.4.51)',\
org.eclipse.jetty.util;version='[9.4.50,9.4.51)',\
org.eclipse.jetty.util.ajax;version='[9.4.50,9.4.51)',\
org.glassfish.hk2.external.javax.inject;version='[2.4.0,2.4.1)',\
org.glassfish.hk2.osgi-resource-locator;version='[1.0.3,1.0.4)',\
org.hamcrest;version='[2.2.0,2.2.1)',\
org.jctools.core;version='[2.1.2,2.1.3)',\
org.jsr-305;version='[3.0.2,3.0.3)',\
org.openhab.binding.mqtt;version='[4.0.0,4.0.1)',\
org.openhab.binding.mqtt.generic;version='[4.0.0,4.0.1)',\
org.openhab.binding.mqtt.ruuvigateway;version='[4.0.0,4.0.1)',\
org.openhab.binding.mqtt.ruuvigateway.tests;version='[4.0.0,4.0.1)',\
org.openhab.core;version='[4.0.0,4.0.1)',\
org.openhab.core.config.core;version='[4.0.0,4.0.1)',\
org.openhab.core.config.discovery;version='[4.0.0,4.0.1)',\
org.openhab.core.io.console;version='[4.0.0,4.0.1)',\
org.openhab.core.io.transport.mqtt;version='[4.0.0,4.0.1)',\
org.openhab.core.test;version='[4.0.0,4.0.1)',\
org.openhab.core.thing;version='[4.0.0,4.0.1)',\
org.openhab.core.transform;version='[4.0.0,4.0.1)',\
org.opentest4j;version='[1.2.0,1.2.1)',\
org.ops4j.pax.logging.pax-logging-api;version='[2.2.0,2.2.1)',\
org.osgi.service.component;version='[1.5.0,1.5.1)',\
org.osgi.service.event;version='[1.4.0,1.4.1)',\
org.osgi.util.function;version='[1.2.0,1.2.1)',\
org.osgi.util.promise;version='[1.2.0,1.2.1)',\
org.reactivestreams.reactive-streams;version='[1.0.3,1.0.4)',\
si-units;version='[2.1.0,2.1.1)',\
si.uom.si-quantity;version='[2.1.0,2.1.1)',\
tech.units.indriya;version='[2.1.2,2.1.3)',\
uom-lib-common;version='[2.1.0,2.1.1)',\
xstream;version='[1.4.20,1.4.21)'

View File

@@ -0,0 +1,112 @@
<?xml version="1.0" encoding="UTF-8" standalone="no"?>
<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 https://maven.apache.org/xsd/maven-4.0.0.xsd">
<modelVersion>4.0.0</modelVersion>
<parent>
<groupId>org.openhab.addons.itests</groupId>
<artifactId>org.openhab.addons.reactor.itests</artifactId>
<version>4.0.0-SNAPSHOT</version>
</parent>
<artifactId>org.openhab.binding.mqtt.ruuvigateway.tests</artifactId>
<name>openHAB Add-ons :: Integration Tests :: MQTT Ruuvi Gateway Tests</name>
<properties>
<mqttbroker.port>1884</mqttbroker.port>
</properties>
<dependencies>
<dependency>
<groupId>org.openhab.addons.bundles</groupId>
<artifactId>org.openhab.binding.mqtt</artifactId>
<version>${project.version}</version>
</dependency>
<dependency>
<groupId>org.openhab.addons.bundles</groupId>
<artifactId>org.openhab.binding.mqtt.generic</artifactId>
<version>${project.version}</version>
</dependency>
<dependency>
<groupId>org.openhab.addons.bundles</groupId>
<artifactId>org.openhab.binding.mqtt.ruuvigateway</artifactId>
<version>${project.version}</version>
</dependency>
<dependency>
<groupId>com.h2database</groupId>
<artifactId>h2-mvstore</artifactId>
<version>1.4.199</version>
</dependency>
<dependency>
<groupId>io.moquette</groupId>
<artifactId>moquette-broker</artifactId>
<version>0.15</version>
</dependency>
<dependency>
<groupId>io.netty</groupId>
<artifactId>netty-buffer</artifactId>
<version>${netty.version}</version>
</dependency>
<dependency>
<groupId>io.netty</groupId>
<artifactId>netty-codec</artifactId>
<version>${netty.version}</version>
</dependency>
<dependency>
<groupId>io.netty</groupId>
<artifactId>netty-codec-mqtt</artifactId>
<version>${netty.version}</version>
</dependency>
<dependency>
<groupId>io.netty</groupId>
<artifactId>netty-common</artifactId>
<version>${netty.version}</version>
</dependency>
<dependency>
<groupId>io.netty</groupId>
<artifactId>netty-handler</artifactId>
<version>${netty.version}</version>
</dependency>
<dependency>
<groupId>io.netty</groupId>
<artifactId>netty-handler-proxy</artifactId>
<version>${netty.version}</version>
</dependency>
<dependency>
<groupId>io.netty</groupId>
<artifactId>netty-resolver</artifactId>
<version>${netty.version}</version>
</dependency>
<dependency>
<groupId>io.netty</groupId>
<artifactId>netty-transport</artifactId>
<version>${netty.version}</version>
</dependency>
</dependencies>
<build>
<plugins>
<plugin>
<groupId>org.codehaus.mojo</groupId>
<artifactId>build-helper-maven-plugin</artifactId>
<executions>
<execution>
<id>reserve-mqtt-broker-port</id>
<goals>
<goal>reserve-network-port</goal>
</goals>
<phase>process-resources</phase>
<configuration>
<portNames>
<portName>mqttbroker.port</portName>
</portNames>
</configuration>
</execution>
</executions>
</plugin>
</plugins>
</build>
</project>

View File

@@ -0,0 +1,130 @@
/**
* Copyright (c) 2010-2023 Contributors to the openHAB project
*
* See the NOTICE file(s) distributed with this work for additional
* information.
*
* This program and the accompanying materials are made available under the
* terms of the Eclipse Public License 2.0 which is available at
* http://www.eclipse.org/legal/epl-2.0
*
* SPDX-License-Identifier: EPL-2.0
*/
package org.openhab.binding.mqtt.ruuvigateway;
import static org.hamcrest.CoreMatchers.is;
import static org.hamcrest.MatcherAssert.assertThat;
import static org.junit.jupiter.api.Assertions.assertNotNull;
import java.nio.charset.StandardCharsets;
import java.util.Properties;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.TimeUnit;
import org.eclipse.jdt.annotation.NonNullByDefault;
import org.junit.jupiter.api.AfterEach;
import org.junit.jupiter.api.BeforeEach;
import org.openhab.core.config.discovery.inbox.Inbox;
import org.openhab.core.io.transport.mqtt.MqttBrokerConnection;
import org.openhab.core.io.transport.mqtt.MqttConnectionState;
import org.openhab.core.items.ItemProvider;
import org.openhab.core.items.ItemRegistry;
import org.openhab.core.items.ManagedItemProvider;
import org.openhab.core.test.java.JavaOSGiTest;
import org.openhab.core.thing.ManagedThingProvider;
import org.openhab.core.thing.ThingProvider;
import org.openhab.core.thing.link.ItemChannelLinkProvider;
import org.openhab.core.thing.link.ManagedItemChannelLinkProvider;
import io.moquette.BrokerConstants;
import io.moquette.broker.Server;
/**
* Creates a Moquette MQTT broker instance and a {@link MqttBrokerConnection} for testing MQTT bindings.
*
* @author Wouter Born - Initial contribution
* @author Sami Salonen - Copied to MQTT Ruuvi Gateway addon
*/
@NonNullByDefault
public class MqttOSGiTest extends JavaOSGiTest {
private static final String BROKER_ID = "test-broker";
@SuppressWarnings("null")
private static final int BROKER_PORT = Integer.getInteger("mqttbroker.port", 1883);
protected @NonNullByDefault({}) MqttBrokerConnection brokerConnection;
private Server moquetteServer = new Server();
protected @NonNullByDefault({}) ManagedThingProvider thingProvider;
protected @NonNullByDefault({}) ManagedItemProvider itemProvider;
protected @NonNullByDefault({}) ItemRegistry itemRegistry;
protected @NonNullByDefault({}) ManagedItemChannelLinkProvider itemChannelLinkProvider;
protected @NonNullByDefault({}) Inbox inbox;
@BeforeEach
public void beforeEach() throws Exception {
registerVolatileStorageService();
thingProvider = getService(ThingProvider.class, ManagedThingProvider.class);
assertNotNull(thingProvider, "Could not get ManagedThingProvider");
itemProvider = getService(ItemProvider.class, ManagedItemProvider.class);
assertNotNull(itemProvider, "Could not get ManagedItemProvider");
itemRegistry = getService(ItemRegistry.class);
assertNotNull(itemProvider, "Could not get ItemRegistry");
itemChannelLinkProvider = getService(ItemChannelLinkProvider.class, ManagedItemChannelLinkProvider.class);
assertNotNull(itemChannelLinkProvider, "Could not get ManagedItemChannelLinkProvider");
inbox = getService(Inbox.class);
assertNotNull(inbox, "Could not get Inbox");
moquetteServer = new Server();
moquetteServer.startServer(brokerProperties());
brokerConnection = createBrokerConnection(BROKER_ID);
}
@AfterEach
public void afterEach() throws Exception {
brokerConnection.stop().get(5, TimeUnit.SECONDS);
moquetteServer.stopServer();
}
private Properties brokerProperties() {
Properties properties = new Properties();
properties.put(BrokerConstants.HOST_PROPERTY_NAME, BrokerConstants.HOST);
properties.put(BrokerConstants.PORT_PROPERTY_NAME, String.valueOf(BROKER_PORT));
properties.put(BrokerConstants.SSL_PORT_PROPERTY_NAME, BrokerConstants.DISABLED_PORT_BIND);
properties.put(BrokerConstants.WEB_SOCKET_PORT_PROPERTY_NAME, BrokerConstants.DISABLED_PORT_BIND);
properties.put(BrokerConstants.WSS_PORT_PROPERTY_NAME, BrokerConstants.DISABLED_PORT_BIND);
return properties;
}
protected MqttBrokerConnection createBrokerConnection(String clientId) throws Exception {
MqttBrokerConnection connection = new MqttBrokerConnection(BrokerConstants.HOST, BROKER_PORT, false, clientId);
connection.setQos(1);
connection.start().get(5, TimeUnit.SECONDS);
waitForAssert(() -> assertThat(connection.connectionState(), is(MqttConnectionState.CONNECTED)));
return connection;
}
protected CompletableFuture<Boolean> publish(String topic, String message) {
return brokerConnection.publish(topic, message.getBytes(StandardCharsets.UTF_8), 1, true);
}
/**
* Whether tests are run in Continuous Integration environment, i.e. Jenkins or Travis CI
*
* Travis CI is detected using CI environment variable, see https://docs.travis-ci.com/us>
* Jenkins CI is detected using JENKINS_HOME environment variable
*
* @return
*/
protected boolean isRunningInCI() {
String jenkinsHome = System.getenv("JENKINS_HOME");
return "true".equals(System.getenv("CI")) || (jenkinsHome != null && !jenkinsHome.isBlank());
}
}

View File

@@ -0,0 +1,574 @@
/**
* Copyright (c) 2010-2023 Contributors to the openHAB project
*
* See the NOTICE file(s) distributed with this work for additional
* information.
*
* This program and the accompanying materials are made available under the
* terms of the Eclipse Public License 2.0 which is available at
* http://www.eclipse.org/legal/epl-2.0
*
* SPDX-License-Identifier: EPL-2.0
*/
package org.openhab.binding.mqtt.ruuvigateway;
import static org.hamcrest.CoreMatchers.is;
import static org.hamcrest.MatcherAssert.assertThat;
import static org.junit.jupiter.api.Assertions.*;
import static org.openhab.binding.mqtt.ruuvigateway.internal.RuuviGatewayBindingConstants.*;
import java.lang.reflect.InvocationTargetException;
import java.lang.reflect.Method;
import java.math.BigDecimal;
import java.time.Instant;
import java.time.ZoneId;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Objects;
import java.util.Set;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.ScheduledThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.function.Function;
import java.util.stream.Collectors;
import javax.measure.quantity.Acceleration;
import javax.measure.quantity.Dimensionless;
import javax.measure.quantity.ElectricPotential;
import javax.measure.quantity.Power;
import javax.measure.quantity.Pressure;
import javax.measure.quantity.Temperature;
import org.eclipse.jdt.annotation.NonNullByDefault;
import org.eclipse.jdt.annotation.Nullable;
import org.junit.jupiter.api.AfterEach;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.params.ParameterizedTest;
import org.junit.jupiter.params.provider.CsvSource;
import org.junit.jupiter.params.provider.ValueSource;
import org.openhab.binding.mqtt.discovery.MQTTTopicDiscoveryService;
import org.openhab.binding.mqtt.ruuvigateway.internal.RuuviGatewayBindingConstants;
import org.openhab.binding.mqtt.ruuvigateway.internal.discovery.RuuviGatewayDiscoveryService;
import org.openhab.binding.mqtt.ruuvigateway.internal.handler.RuuviTagHandler;
import org.openhab.core.config.core.Configuration;
import org.openhab.core.config.discovery.DiscoveryResult;
import org.openhab.core.io.transport.mqtt.MqttBrokerConnection;
import org.openhab.core.io.transport.mqtt.MqttConnectionObserver;
import org.openhab.core.io.transport.mqtt.MqttConnectionState;
import org.openhab.core.items.GenericItem;
import org.openhab.core.library.CoreItemFactory;
import org.openhab.core.library.types.DateTimeType;
import org.openhab.core.library.types.DecimalType;
import org.openhab.core.library.types.QuantityType;
import org.openhab.core.library.types.StringType;
import org.openhab.core.library.unit.SIUnits;
import org.openhab.core.library.unit.Units;
import org.openhab.core.thing.Bridge;
import org.openhab.core.thing.Channel;
import org.openhab.core.thing.ChannelUID;
import org.openhab.core.thing.Thing;
import org.openhab.core.thing.ThingStatus;
import org.openhab.core.thing.ThingStatusDetail;
import org.openhab.core.thing.ThingStatusInfo;
import org.openhab.core.thing.ThingTypeUID;
import org.openhab.core.thing.ThingUID;
import org.openhab.core.thing.binding.ThingHandler;
import org.openhab.core.thing.binding.builder.BridgeBuilder;
import org.openhab.core.thing.binding.builder.ChannelBuilder;
import org.openhab.core.thing.binding.builder.ThingBuilder;
import org.openhab.core.thing.link.ItemChannelLink;
import org.openhab.core.types.State;
import org.openhab.core.types.UnDefType;
/**
* A full implementation test, that starts the embedded MQTT broker and publishes test data
*
* @author David Graeff - Initial contribution
* @author Sami Salonen - Adapted and extended to Ruuvi Gateway tests
*/
@NonNullByDefault
public class RuuviGatewayTest extends MqttOSGiTest {
private static final String BASE_TOPIC_RUUVI = "ruuvi";
private static final Map<String, String> CHANNEL_TO_ITEM_TYPE = new HashMap<>();
static {
CHANNEL_TO_ITEM_TYPE.put(CHANNEL_ID_ACCELERATIONX, "Number:Acceleration");
CHANNEL_TO_ITEM_TYPE.put(CHANNEL_ID_ACCELERATIONY, "Number:Acceleration");
CHANNEL_TO_ITEM_TYPE.put(CHANNEL_ID_ACCELERATIONZ, "Number:Acceleration");
CHANNEL_TO_ITEM_TYPE.put(CHANNEL_ID_BATTERY, "Number:ElectricPotential");
CHANNEL_TO_ITEM_TYPE.put(CHANNEL_ID_DATA_FORMAT, "Number");
CHANNEL_TO_ITEM_TYPE.put(CHANNEL_ID_HUMIDITY, "Number:Dimensionless");
CHANNEL_TO_ITEM_TYPE.put(CHANNEL_ID_MEASUREMENT_SEQUENCE_NUMBER, "Number");
CHANNEL_TO_ITEM_TYPE.put(CHANNEL_ID_MOVEMENT_COUNTER, "Number");
CHANNEL_TO_ITEM_TYPE.put(CHANNEL_ID_PRESSURE, "Number:Pressure");
CHANNEL_TO_ITEM_TYPE.put(CHANNEL_ID_TEMPERATURE, "Number:Temperature");
CHANNEL_TO_ITEM_TYPE.put(CHANNEL_ID_TX_POWER, "Number:Power");
CHANNEL_TO_ITEM_TYPE.put(CHANNEL_ID_RSSI, "Number:Power");
CHANNEL_TO_ITEM_TYPE.put(CHANNEL_ID_TS, "DateTime");
CHANNEL_TO_ITEM_TYPE.put(CHANNEL_ID_GWTS, "DateTime");
CHANNEL_TO_ITEM_TYPE.put(CHANNEL_ID_GWMAC, "String");
}
private ThingStatusInfoChangedSubscriber statusSubscriber = new ThingStatusInfoChangedSubscriber();
private @NonNullByDefault({}) MqttBrokerConnection mqttConnection;
private int registeredTopics = 100;
private @NonNullByDefault({}) ScheduledExecutorService scheduler;
/**
* Create an observer that fails the test as soon as the broker client connection changes its connection state
* to something else then CONNECTED.
*/
private MqttConnectionObserver failIfChange = (state, error) -> assertThat(state,
is(MqttConnectionState.CONNECTED));
@SuppressWarnings("unused") // used indirectly with Inbox
private @NonNullByDefault({}) RuuviGatewayDiscoveryService ruuviDiscoveryService;
private Set<Thing> things = new HashSet<>();
private Bridge createMqttBrokerBridge() {
Configuration configuration = new Configuration();
configuration.put("host", "127.0.0.1");
configuration.put("port", brokerConnection.getPort());
Bridge bridge = BridgeBuilder.create(new ThingTypeUID("mqtt", "broker"), "mybroker").withLabel("MQTT Broker")
.withConfiguration(configuration).build();
thingProvider.add(bridge);
waitForAssert(() -> assertNotNull(bridge.getHandler()));
assertNotNull(bridge.getConfiguration());
things.add(bridge);
return bridge;
}
private Thing createRuuviThing(String brokerPrefix, String topic, @Nullable Integer timeoutMillisecs) {
Configuration configuration = new Configuration();
configuration.put(RuuviGatewayBindingConstants.CONFIGURATION_PROPERTY_TOPIC, topic);
if (timeoutMillisecs != null) {
configuration.put(RuuviGatewayBindingConstants.CONFIGURATION_PROPERTY_TIMEOUT, timeoutMillisecs);
}
ThingUID bridgeThingUID = new ThingUID("mqtt", "broker", "mybroker");
ThingUID thingUID = new ThingUID(RuuviGatewayBindingConstants.THING_TYPE_BEACON,
topic.replaceAll("[:_/]", "_"));
ThingBuilder thingBuilder = ThingBuilder.create(RuuviGatewayBindingConstants.THING_TYPE_BEACON, thingUID)
.withBridge(bridgeThingUID).withLabel("Ruuvi " + topic).withConfiguration(configuration);
CHANNEL_TO_ITEM_TYPE.forEach((channelId, _itemType) -> {
thingBuilder.withChannel(ChannelBuilder.create(new ChannelUID(thingUID, channelId)).build());
});
Thing thing = thingBuilder.build();
thingProvider.add(thing);
waitForAssert(() -> assertNotNull(thing.getHandler()));
assertNotNull(thing.getConfiguration());
things.add(thing);
return thing;
}
private void triggerTimeoutHandling(Thing ruuviThing) {
// Simulate some time passing, so that RuuviTagHandler.heartbeat() is called twice
// Two heartbeat calls happens to trigger timeout handling in handler, one is not enough.
// (this is really implementation detail of RuuviTagHandler, making this test slightly
// error prone to possible changes in RuuviTagHandler implementation)
//
// 0. Assume some data received already, RuuviTagHandler.receivedData is true
// 1. First heartbeat sets receivedData=false; no further action is taken yet
// 2. Second heartbeat acts on false receivedData, e.g. updating Thing Status
for (int i = 0; i < 2; i++) {
callInternalHeartbeat(ruuviThing);
}
}
private void callInternalHeartbeat(Thing ruuviThing) {
ThingHandler handler = ruuviThing.getHandler();
Objects.requireNonNull(handler);
assertInstanceOf(RuuviTagHandler.class, handler);
RuuviTagHandler ruuviHandler = (RuuviTagHandler) handler;
try {
Method heartbeatMethod = RuuviTagHandler.class.getDeclaredMethod("heartbeat");
Objects.requireNonNull(heartbeatMethod);
heartbeatMethod.setAccessible(true);
heartbeatMethod.invoke(ruuviHandler);
} catch (NoSuchMethodException | SecurityException | IllegalAccessException | IllegalArgumentException
| InvocationTargetException e) {
fail("Failed to call heartbeat method of thing handler via reflection. Bug in test? Details: "
+ e.getClass().getSimpleName() + ": " + e.getMessage());
throw new RuntimeException(e);
}
}
private String itemName(ChannelUID channelUID) {
return channelUID.getAsString().replace(":", "_");
}
private String linkChannelToAutogeneratedItem(ChannelUID channelUID) {
String itemName = itemName(channelUID);
String itemType = CHANNEL_TO_ITEM_TYPE.get(channelUID.getId());
GenericItem item = new CoreItemFactory().createItem(itemType, itemName);
assertNotNull(item, itemType);
itemProvider.add(item);
itemChannelLinkProvider.add(new ItemChannelLink(itemName, channelUID));
return itemName;
}
@Override
@BeforeEach
public void beforeEach() throws Exception {
super.beforeEach();
statusSubscriber.statusUpdates.clear();
registerService(statusSubscriber);
MQTTTopicDiscoveryService mqttTopicDiscoveryService = getService(MQTTTopicDiscoveryService.class);
assertNotNull(mqttTopicDiscoveryService);
ruuviDiscoveryService = new RuuviGatewayDiscoveryService(mqttTopicDiscoveryService);
createMqttBrokerBridge();
mqttConnection = createBrokerConnection("myclientid");
// If the connection state changes in between -> fail
mqttConnection.addConnectionObserver(failIfChange);
List<CompletableFuture<Boolean>> futures = new ArrayList<>();
futures.add(publish(BASE_TOPIC_RUUVI + "/mygwid/DE:AD:BE:EF:AA:00", "{}"));
registeredTopics = futures.size();
CompletableFuture.allOf(futures.toArray(new CompletableFuture[0])).get(5, TimeUnit.SECONDS);
scheduler = new ScheduledThreadPoolExecutor(6);
}
@Override
@AfterEach
public void afterEach() throws Exception {
if (mqttConnection != null) {
mqttConnection.removeConnectionObserver(failIfChange);
mqttConnection.stop().get(5, TimeUnit.SECONDS);
}
things.stream().map(thing -> thingProvider.remove(thing.getUID()));
unregisterService(statusSubscriber);
if (scheduler != null) {
scheduler.shutdownNow();
}
super.afterEach();
}
@Test
public void retrieveAllRuuviPrefixedTopics() throws Exception {
CountDownLatch c = new CountDownLatch(registeredTopics);
mqttConnection.subscribe(BASE_TOPIC_RUUVI + "/#", (topic, payload) -> c.countDown()).get(5, TimeUnit.SECONDS);
assertTrue(c.await(5, TimeUnit.SECONDS),
"Connection " + mqttConnection.getClientId() + " not retrieving all topics ");
}
private void assertThingStatus(List<ThingStatusInfo> statusUpdates, int index, ThingStatus status,
@Nullable ThingStatusDetail detail, @Nullable String description) {
assertTrue(statusUpdates.size() > index,
String.format("Not enough status updates. Expected %d, but only had %d. Status updates received: %s",
index + 1, statusUpdates.size(),
statusUpdates.stream().map(ThingStatusInfo::getStatus).collect(Collectors.toList())));
assertEquals(status, statusUpdates.get(index).getStatus(), statusUpdates.get(index).toString());
assertEquals(detail, statusUpdates.get(index).getStatusDetail(), statusUpdates.get(index).toString());
assertEquals(description, statusUpdates.get(index).getDescription(), statusUpdates.get(index).toString());
}
@SuppressWarnings("null")
private void assertThingStatusWithDescriptionPattern(List<ThingStatusInfo> statusUpdates, int index,
ThingStatus status, ThingStatusDetail detail, String descriptionPattern) {
assertTrue(statusUpdates.size() > index, "assert " + statusUpdates.size() + " > " + index + " failed");
assertEquals(status, statusUpdates.get(index).getStatus(), statusUpdates.get(index).toString());
assertEquals(detail, statusUpdates.get(index).getStatusDetail(), statusUpdates.get(index).toString());
assertTrue(statusUpdates.get(index).getDescription().matches(descriptionPattern),
statusUpdates.get(index).toString());
}
private void assertThingStatus(List<ThingStatusInfo> statusUpdates, int index, ThingStatus status) {
assertThingStatus(statusUpdates, index, status, ThingStatusDetail.NONE, null);
}
private void assertItems(Function<String, State> channelStateGetter, String temperatureCelsius,
String accelerationXStandardGravity, String accelerationYStandardGravity,
String accelerationZStandardGravity, String batteryVolt, int dataFormat, String humidityPercent,
int measurementSequenceNumber, int movementCounter, String pressurePascal, String txPowerDecibelMilliwatts,
String rssiDecibelMilliwatts, Instant ts, Instant gwts, String gwMac) {
assertEquals(new QuantityType<Temperature>(new BigDecimal(temperatureCelsius), SIUnits.CELSIUS),
channelStateGetter.apply(CHANNEL_ID_TEMPERATURE));
assertEquals(
new QuantityType<Acceleration>(new BigDecimal(accelerationXStandardGravity), Units.STANDARD_GRAVITY),
channelStateGetter.apply(CHANNEL_ID_ACCELERATIONX));
assertEquals(
new QuantityType<Acceleration>(new BigDecimal(accelerationYStandardGravity), Units.STANDARD_GRAVITY),
channelStateGetter.apply(CHANNEL_ID_ACCELERATIONY));
assertEquals(
new QuantityType<Acceleration>(new BigDecimal(accelerationZStandardGravity), Units.STANDARD_GRAVITY),
channelStateGetter.apply(CHANNEL_ID_ACCELERATIONZ));
assertEquals(new QuantityType<ElectricPotential>(new BigDecimal(batteryVolt), Units.VOLT),
channelStateGetter.apply(CHANNEL_ID_BATTERY));
assertEquals(new DecimalType(dataFormat), channelStateGetter.apply(CHANNEL_ID_DATA_FORMAT));
assertEquals(new QuantityType<Dimensionless>(new BigDecimal(humidityPercent), Units.PERCENT),
channelStateGetter.apply(CHANNEL_ID_HUMIDITY));
assertEquals(new DecimalType(new BigDecimal(measurementSequenceNumber)),
channelStateGetter.apply(CHANNEL_ID_MEASUREMENT_SEQUENCE_NUMBER));
assertEquals(new DecimalType(new BigDecimal(movementCounter)),
channelStateGetter.apply(CHANNEL_ID_MOVEMENT_COUNTER));
assertEquals(new QuantityType<Pressure>(new BigDecimal(pressurePascal), SIUnits.PASCAL),
channelStateGetter.apply(CHANNEL_ID_PRESSURE));
assertEquals(new QuantityType<Power>(new BigDecimal(txPowerDecibelMilliwatts), Units.DECIBEL_MILLIWATTS),
channelStateGetter.apply(CHANNEL_ID_TX_POWER));
assertEquals(new QuantityType<Power>(new BigDecimal(rssiDecibelMilliwatts), Units.DECIBEL_MILLIWATTS),
channelStateGetter.apply(CHANNEL_ID_RSSI));
assertEquals(new DateTimeType(ts.atZone(ZoneId.of("UTC"))), channelStateGetter.apply(CHANNEL_ID_TS));
assertEquals(new DateTimeType(gwts.atZone(ZoneId.of("UTC"))), channelStateGetter.apply(CHANNEL_ID_GWTS));
assertEquals(new StringType(gwMac), channelStateGetter.apply(CHANNEL_ID_GWMAC));
}
@ParameterizedTest
@CsvSource(delimiter = '@', value = { //
BASE_TOPIC_RUUVI + "mygwid/DE:AD:BE:EF:AA:01 @" + "{}", // empty json
BASE_TOPIC_RUUVI + "mygwid/DE:AD:BE:EF:BB:03 @" + "invalid json", // invalid json
BASE_TOPIC_RUUVI + "mygwid/DE:AD:BE:EF:BB:04 @" + "0201061BFF990405", // payload too short
BASE_TOPIC_RUUVI + "mygwid/DE:AD:BE:EF:BB:05 @"
+ "0201061BFF99050512FC5394C37C0004FFFC040CAC364200CDCBB8334C884F", // wrong manufacturer id (the
// two bytes after FF do not
// match 99 04)
BASE_TOPIC_RUUVI + "mygwid/DE:AD:BE:EF:BB:06 @"
+ "0201061BFA99040512FC5394C37C0004FFFC040CAC364200CDCBB8334C884F", // unexpected advertisement (no
// FF to indicate 'manufacturer
// specific' advertisement)
BASE_TOPIC_RUUVI + "mygwid/DE:AD:BE:EF:BB:07 @" + "{" + " \"gw_mac\": \"DE:AD:BE:EF:00\","
+ " \"rssi\": -82," + " \"aoa\": [],"
// data field is number, not a string
+ " \"gwts\": \"1659365432\"," + " \"ts\": \"1659365222\"," + " \"data\": 999,"
+ " \"coords\": \"\" }", // wrong json data types
})
public void testInvalidCases(String topic, String val) throws Exception {
final String jsonPayload;
if (val.contains("{")) {
// test argument is specifiying the whole json payload
jsonPayload = val;
} else {
// test argument is only specifiying the data field in the json payload
// Fill rest of the fields with some valid values
jsonPayload = "{" + " \"gw_mac\": \"DE:AD:BE:EF:00\"," + " \"rssi\": -82," + " \"aoa\": [],"
+ " \"gwts\": \"1659365432\"," + " \"ts\": \"1659365222\"," + " \"data\": \"" + val + "\","
+ " \"coords\": \"\" }";
}
Thing ruuviThing = createRuuviThing("mygwid", topic, 100);
waitForAssert(() -> {
List<ThingStatusInfo> statusUpdates = statusSubscriber.statusUpdates.get(ruuviThing.getUID());
assertNotNull(statusUpdates);
int statusUpdateIndex = 0;
assertThingStatus(statusUpdates, statusUpdateIndex++, ThingStatus.INITIALIZING);
assertThingStatus(statusUpdates, statusUpdateIndex++, ThingStatus.UNKNOWN);
assertThingStatus(statusUpdates, statusUpdateIndex++, ThingStatus.ONLINE, ThingStatusDetail.NONE,
"Waiting for initial data");
assertThingStatus(statusUpdates, statusUpdateIndex++, ThingStatus.OFFLINE,
ThingStatusDetail.COMMUNICATION_ERROR, "No valid data received for some time");
scheduler.execute(() -> publish(topic, jsonPayload));
assertThingStatusWithDescriptionPattern(statusUpdates, statusUpdateIndex++, ThingStatus.OFFLINE,
ThingStatusDetail.COMMUNICATION_ERROR, ".*could not be parsed.*");
assertEquals(statusUpdateIndex, statusUpdates.size());
});
}
@SuppressWarnings("null")
@Test
public void testDiscovery() {
scheduler.execute(() -> publish(BASE_TOPIC_RUUVI + "/mygwid/DE:AD:BE:EF:BB:02",
"{" + " \"gw_mac\": \"DE:AD:BE:EF:00\"," + " \"rssi\": -82," + " \"aoa\": [],"
+ " \"gwts\": \"1659365432\"," + " \"ts\": \"1659365222\","
+ " \"data\": \"0201061BFF99040512FC5394C37C0004FFFC040CAC364200CDCBB8334C884F\","
+ " \"coords\": \"\" }"));
waitForAssert(() -> {
assertEquals(2, inbox.getAll().size(), inbox.getAll().toString());
var discovered = new HashSet<DiscoveryResult>();
discovered.addAll(inbox.getAll());
for (var result : discovered) {
assertEquals(THING_TYPE_BEACON, result.getThingTypeUID());
assertEquals("topic", result.getRepresentationProperty());
Object topic = result.getProperties().get("topic");
assertNotNull(topic);
assertTrue(
// published in this test
topic.equals((BASE_TOPIC_RUUVI + "/mygwid/DE:AD:BE:EF:BB:02"))
// published in beforeEach
|| result.getProperties().get("topic")
.equals((BASE_TOPIC_RUUVI + "/mygwid/DE:AD:BE:EF:AA:00")));
}
});
}
@ParameterizedTest
@ValueSource(booleans = { true, false })
public void testHappyFlow(boolean quickTimeout) {
// with quickTimeout=false, heartbeat is effectively disabled. Thing will not "timeout" and go OFFLINE
// with quickTimeout=true, timeout happens very fast. In CI we use infinite timeout and trigger timeout manually
Thing ruuviThing = createRuuviThing("mygwid", BASE_TOPIC_RUUVI + "/mygwid/DE:AD:BE:EF:BB:02",
quickTimeout ? (isRunningInCI() ? 9_000_000 : 100) : 9_000_000);
// Link all channels to freshly created items
ruuviThing.getChannels().stream().map(Channel::getUID).forEach(this::linkChannelToAutogeneratedItem);
@SuppressWarnings("null")
Function<String, State> getItemState = channelId -> itemRegistry
.get(itemName(ruuviThing.getChannel(channelId).getUID())).getState();
AtomicInteger statusUpdateIndex = new AtomicInteger();
waitForAssert(() -> {
List<ThingStatusInfo> statusUpdates = statusSubscriber.statusUpdates.get(ruuviThing.getUID());
assertNotNull(statusUpdates);
assertThingStatus(statusUpdates, statusUpdateIndex.get(), ThingStatus.INITIALIZING);
assertThingStatus(statusUpdates, statusUpdateIndex.get() + 1, ThingStatus.UNKNOWN);
assertThingStatus(statusUpdates, statusUpdateIndex.get() + 2, ThingStatus.ONLINE, ThingStatusDetail.NONE,
"Waiting for initial data");
statusUpdateIndex.set(statusUpdateIndex.get() + 3);
});
List<ThingStatusInfo> statusUpdates = statusSubscriber.statusUpdates.get(ruuviThing.getUID());
assertNotNull(statusUpdates);
if (quickTimeout) {
if (isRunningInCI()) {
triggerTimeoutHandling(ruuviThing);
}
waitForAssert(() -> {
assertThingStatus(statusUpdates, statusUpdateIndex.get(), ThingStatus.OFFLINE,
ThingStatusDetail.COMMUNICATION_ERROR, "No valid data received for some time");
CHANNEL_TO_ITEM_TYPE.keySet()
.forEach(channelId -> assertEquals(UnDefType.UNDEF, getItemState.apply(channelId)));
statusUpdateIndex.incrementAndGet();
});
}
// publish some valid data ("valid case" test vector from
// https://docs.ruuvi.com/communication/bluetooth-advertisements/data-format-5-rawv2)
scheduler.execute(() -> publish(BASE_TOPIC_RUUVI + "/mygwid/DE:AD:BE:EF:BB:02",
"{" + " \"gw_mac\": \"DE:AD:BE:EF:00\"," + " \"rssi\": -82," + " \"aoa\": [],"
+ " \"gwts\": \"1659365432\"," + " \"ts\": \"1659365222\","
+ " \"data\": \"0201061BFF99040512FC5394C37C0004FFFC040CAC364200CDCBB8334C884F\","
+ " \"coords\": \"\" }"));
waitForAssert(() -> {
assertThingStatus(statusUpdates, statusUpdateIndex.get(), ThingStatus.ONLINE);
statusUpdateIndex.incrementAndGet();
});
waitForAssert(() -> {
assertItems(getItemState, //
"24.3", // temperature, Celsius
"0.004", // acc X, g
"-0.004", // acc Y, g
"1.036", // acc Z, g
"2.9770000000000003", // battery, volt
5, // data format
"53.49", // humidity %
205, // measurement seq
66, // movement
"100044", // pressure, pascal
"4", // tx power, dBm
"-82", // RSSI, dBm
Instant.ofEpochSecond(1659365222), // ts
Instant.ofEpochSecond(1659365432), // gwts
"DE:AD:BE:EF:00" // gw mac
);
});
if (quickTimeout) {
if (isRunningInCI()) {
triggerTimeoutHandling(ruuviThing);
}
waitForAssert(() -> {
assertThingStatus(statusUpdates, statusUpdateIndex.get(), ThingStatus.OFFLINE,
ThingStatusDetail.COMMUNICATION_ERROR, "No valid data received for some time");
CHANNEL_TO_ITEM_TYPE.keySet()
.forEach(channelId -> assertEquals(UnDefType.UNDEF, getItemState.apply(channelId)));
statusUpdateIndex.incrementAndGet();
});
}
// Another mqtt update (("minimum values" test vector from
// https://docs.ruuvi.com/communication/bluetooth-advertisements/data-format-5-rawv2)
scheduler.execute(() -> publish(BASE_TOPIC_RUUVI + "/mygwid/DE:AD:BE:EF:BB:02",
"{" + " \"gw_mac\": \"DE:AD:BE:EF:00\"," + " \"rssi\": -66," + " \"aoa\": [],"
+ " \"gwts\": \"1659365431\"," + " \"ts\": \"1659365221\","
+ " \"data\": \"0201061BFF9904058001000000008001800180010000000000CBB8334C884F\","
+ " \"coords\": \"\" }"));
if (quickTimeout) {
// With quick timeout we were previously offline, so now we should be back online
// with valid channels.
waitForAssert(() -> {
assertThingStatus(statusUpdates, statusUpdateIndex.get(), ThingStatus.ONLINE);
statusUpdateIndex.getAndIncrement();
});
// ...after a while all items are updated
waitForAssert(() -> {
assertItems(getItemState, //
"-163.835", // temperature, Celsius
"-32.767", // acc X, g
"-32.767", // acc Y, g
"-32.767", // acc Z, g
"1.6", // battery, volt
5, // data format
"0.0", // humidity %
0, // measurement seq
0, // movement
"50000", // pressure, pascal
"-40", // tx power, dBm
"-66", // RSSI, dBm
Instant.ofEpochSecond(1659365221), // ts
Instant.ofEpochSecond(1659365431), // gwts
"DE:AD:BE:EF:00" // gw mac
);
});
// ...after which timeout will happen again
if (isRunningInCI()) {
triggerTimeoutHandling(ruuviThing);
}
waitForAssert(() -> {
assertThingStatus(statusUpdates, statusUpdateIndex.get(), ThingStatus.OFFLINE,
ThingStatusDetail.COMMUNICATION_ERROR, "No valid data received for some time");
CHANNEL_TO_ITEM_TYPE.keySet()
.forEach(channelId -> assertEquals(UnDefType.UNDEF, getItemState.apply(channelId)));
statusUpdateIndex.getAndIncrement();
});
} else {
// with non-quick timeout we are still online, and items are updated
waitForAssert(() -> {
assertItems(getItemState, //
"-163.835", // temperature, Celsius
"-32.767", // acc X, g
"-32.767", // acc Y, g
"-32.767", // acc Z, g
"1.6", // battery, volt
5, // data format
"0.0", // humidity %
0, // measurement seq
0, // movement
"50000", // pressure, pascal
"-40", // tx power, dBm
"-66", // RSSI, dBm
Instant.ofEpochSecond(1659365221), // ts
Instant.ofEpochSecond(1659365431), // gwts
"DE:AD:BE:EF:00" // gw mac
);
});
}
// assert that we have processed all status updates
assertEquals(statusUpdateIndex.get(), statusUpdates.size());
}
}

View File

@@ -0,0 +1,70 @@
/**
* Copyright (c) 2010-2023 Contributors to the openHAB project
*
* See the NOTICE file(s) distributed with this work for additional
* information.
*
* This program and the accompanying materials are made available under the
* terms of the Eclipse Public License 2.0 which is available at
* http://www.eclipse.org/legal/epl-2.0
*
* SPDX-License-Identifier: EPL-2.0
*/
package org.openhab.binding.mqtt.ruuvigateway;
import static org.junit.jupiter.api.Assertions.assertInstanceOf;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Objects;
import java.util.Set;
import java.util.concurrent.CopyOnWriteArrayList;
import org.eclipse.jdt.annotation.NonNull;
import org.eclipse.jdt.annotation.NonNullByDefault;
import org.eclipse.jdt.annotation.Nullable;
import org.openhab.core.events.Event;
import org.openhab.core.events.EventFilter;
import org.openhab.core.events.EventSubscriber;
import org.openhab.core.thing.ThingStatusInfo;
import org.openhab.core.thing.ThingUID;
import org.openhab.core.thing.events.ThingStatusInfoChangedEvent;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
/**
* Test utility capturing thing status updates
*
* @author Sami Salonen - Initial contribution
*/
@NonNullByDefault
public class ThingStatusInfoChangedSubscriber implements EventSubscriber {
private final Logger logger = LoggerFactory.getLogger(ThingStatusInfoChangedSubscriber.class);
public Map<ThingUID, List<ThingStatusInfo>> statusUpdates = new HashMap<>();
@Override
public Set<@NonNull String> getSubscribedEventTypes() {
return Collections.singleton(ThingStatusInfoChangedEvent.TYPE);
}
@Override
public @Nullable EventFilter getEventFilter() {
return null;
}
@Override
public void receive(Event event) {
// Expecting only state updates in the tests
assertInstanceOf(ThingStatusInfoChangedEvent.class, event);
ThingStatusInfoChangedEvent statusEvent = (ThingStatusInfoChangedEvent) event;
logger.trace("Captured event: {} ", event);
List<ThingStatusInfo> updates = statusUpdates.computeIfAbsent(statusEvent.getThingUID(),
item -> new CopyOnWriteArrayList<>());
Objects.requireNonNull(updates); // To make compiler happy
updates.add(statusEvent.getStatusInfo());
}
}