[influxdb] Add option for using metadata value as measurement name (#9943)
* Add option for using metadata value as measurement name Also-by: Joan Pujol <joanpujol@gmail.com> Signed-off-by: Johannes Ott <info@johannes-ott.net>
This commit is contained in:
parent
bc3725a9a3
commit
cdd99c9e5b
@ -1,18 +1,16 @@
|
||||
# InfluxDB (0.9 and newer) Persistence
|
||||
|
||||
This service allows you to persist and query states using the [InfluxDB](https://www.influxdata.com/products/influxdb-overview/) and [InfluxDB 2.0](https://v2.docs.influxdata.com/v2.0/) time series database. The persisted values can be queried from within openHAB.
|
||||
This service allows you to persist and query states using the [InfluxDB](https://www.influxdata.com/products/influxdb-overview/) and [InfluxDB 2.0](https://v2.docs.influxdata.com/v2.0/) time series database. The persisted values can be queried from within openHAB.
|
||||
There also are nice tools on the web for visualizing InfluxDB time series, such as [Grafana](http://grafana.org/) and new Influx DB 2.0 version introduces [powerful data processing features.](https://docs.influxdata.com/influxdb/v2.0/process-data/get-started/)
|
||||
|
||||
## Database Structure
|
||||
|
||||
|
||||
- This service allows you to persist and query states using the time series database.
|
||||
- The states of an item are persisted in *measurements* points with names equal to the name of the item, or the alias, if one is provided. In both variants, a *tag* named "item" is added, containing the item name.
|
||||
All values are stored in a *field* called "value" using the following types:
|
||||
- **float** for DecimalType and QuantityType
|
||||
- **integer** for `OnOffType` and `OpenClosedType` (values are stored using 0 or 1) and `DateTimeType` (milliseconds since 1970-01-01T00:00:00Z)
|
||||
- **string** for the rest of types
|
||||
|
||||
- The states of an item are persisted in _measurements_ points with names equal to the name of the item, its alias, or from some metadata depending on the configuration. In all variants, a tag named "item" is added, containing the item name.
|
||||
All values are stored in a _field_ called "value" using the following types:
|
||||
- **float** for DecimalType and QuantityType
|
||||
- **integer** for `OnOffType` and `OpenClosedType` (values are stored using 0 or 1) and `DateTimeType` (milliseconds since 1970-01-01T00:00:00Z)
|
||||
- **string** for the rest of types
|
||||
- If configured, extra tags for item category, label or type can be added fore each point.
|
||||
|
||||
Some example entries for an item with the name "speedtest" without any further configuration would look like this:
|
||||
@ -22,34 +20,85 @@ Some example entries for an item with the name "speedtest" without any further c
|
||||
|> range(start: -30d)
|
||||
|> filter(fn: (r) => r._measurement == "speedtest")
|
||||
name: speedtest
|
||||
|
||||
|
||||
_time _item _value
|
||||
----- ----- ------
|
||||
1558302027124000000 speedtest 123289369.0
|
||||
1558332852716000000 speedtest 80423789.0
|
||||
|
||||
|
||||
## Prerequisites
|
||||
|
||||
First of all you have to setup and run an InfluxDB 1.X or 2.X server.
|
||||
This is very easy and you will find good documentation on it on the
|
||||
First of all, you have to setup and run an InfluxDB 1.X or 2.X server.
|
||||
This is very easy and you will find good documentation on it on the
|
||||
[InfluxDB web site for 2.X version](https://v2.docs.influxdata.com/v2.0/get-started/) and [InfluxDB web site for 1.X version](https://docs.influxdata.com/influxdb/v1.7/).
|
||||
|
||||
## Configuration
|
||||
|
||||
This service can be configured in the file `services/influxdb.cfg`.
|
||||
|
||||
| Property | Default | Required | Description |
|
||||
|------------------------------------|-------------------------|----------|------------------------------------------|
|
||||
| version | V1 | No | InfluxDB database version V1 for 1.X and V2 for 2.x|
|
||||
| url | http://127.0.0.1:8086 | No | database URL |
|
||||
| user | openhab | No | name of the database user, e.g. `openhab`|
|
||||
| password | | No(*) | password of the database user you choose |
|
||||
| token | | No(*) | token to authenticate the database (only for V2) [Intructions about how to create one](https://v2.docs.influxdata.com/v2.0/security/tokens/create-token/) |
|
||||
| db | openhab | No | name of the database for V1 and name of the organization for V2 |
|
||||
| retentionPolicy | autogen | No | name of the retention policy for V1 and name of the bucket for V2 |
|
||||
| Property | Default | Required | Description |
|
||||
| --------------- | --------------------- | -------- | --------------------------------------------------------------------------------------------------------------------------------------------------------- |
|
||||
| version | V1 | No | InfluxDB database version V1 for 1.X and V2 for 2.x |
|
||||
| url | http://127.0.0.1:8086 | No | database URL |
|
||||
| user | openhab | No | name of the database user, e.g. `openhab` |
|
||||
| password | | No(\*) | password of the database user you choose |
|
||||
| token | | No(\*) | token to authenticate the database (only for V2) [Intructions about how to create one](https://v2.docs.influxdata.com/v2.0/security/tokens/create-token/) |
|
||||
| db | openhab | No | name of the database for V1 and name of the organization for V2 |
|
||||
| retentionPolicy | autogen | No | name of the retention policy for V1 and name of the bucket for V2 |
|
||||
|
||||
(*) For 1.X version you must provide user and password, for 2.X you can use user and password or a token. That means
|
||||
that if you use all default values at minimum you must provide a password or a token.
|
||||
(\*) For 1.X version you must provide user and password, for 2.X you can use user and password or a token. That means
|
||||
that if you use all default values at minimum you must provide a password or a token.
|
||||
|
||||
All item- and event-related configuration is defined in the file `persistence/influxdb.persist`.
|
||||
|
||||
### Additional configuration for customized storage options in InfluxDB
|
||||
|
||||
By default, the plugin writes the data to a `measurement` name equals to the `item's name` and adds a tag with key item and value `item's name` as well.
|
||||
You can customize that behavior and use a single measurement for several items using item metadata.
|
||||
|
||||
#### Measurement name by Item Metadata
|
||||
|
||||
By setting the `influxdb` metadata key you can change the name of the measurement by setting the desired name as metadata value.
|
||||
You can also add additional tags for structuring your data. For example, you can add a floor tag to all sensors to filter all sensors from the first floor or combine all temperature sensors into one measurement.
|
||||
|
||||
The item configuration will look like this:
|
||||
|
||||
```
|
||||
Group:Number:AVG gTempSensors
|
||||
|
||||
Number:Temperature tempLivingRoom (gTempSensors) { influxdb="temperature" [floor="groundfloor"] }
|
||||
Number:Temperature tempKitchen (gTempSensors) { influxdb="temperature" [floor="groundfloor"] }
|
||||
|
||||
|
||||
Number:Temperature tempBedRoom (gTempSensors) { influxdb="temperature" [floor="firstfloor"] }
|
||||
Number:Temperature tempBath (gTempSensors) { influxdb="temperature" [floor="firstfloor"] }
|
||||
|
||||
```
|
||||
|
||||
You can also set the `influxdb` metadata using the UI. From each item configuration screen do:
|
||||
|
||||
`Metadata` → `Add Metadata` → `Enter Custom Namespace` → Enter `influxdb` as namespace name → And enter your desired item name in value field. i.e.:
|
||||
|
||||
value: temperature
|
||||
config: {}
|
||||
|
||||
This will end up with one measurement named temperature and four different series inside:
|
||||
|
||||
```
|
||||
temperature,item=tempLivingRoom,floor=groundfloor
|
||||
temperature,item=tempKitchen,floor=groundfloor
|
||||
temperature,item=tempBedRoom,floor=firstfloor
|
||||
temperature,item=tempBath,floor=firstfloor
|
||||
```
|
||||
|
||||
You can now easily select all temperatures of the firstfloor or the average temperature of the groundfloor.
|
||||
|
||||
#### Extended automatic tagging
|
||||
|
||||
Besides the metadata tags, there are additional configuration parameters to activate different automatic tags generation.
|
||||
|
||||
| Property | Default | Required | Description |
|
||||
| -------------- | ------- | -------- | ---------------------------------------------------------------------------------------------------- |
|
||||
| addCategoryTag | false | no | Should the category of the item be included as tag "category"? If no category is set, "n/a" is used. |
|
||||
| addTypeTag | false | no | Should the item type be included as tag "type"? |
|
||||
| addLabelTag | false | no | Should the item label be included as tag "label"? If no label is set, "n/a" is used. |
|
||||
|
||||
@ -54,20 +54,23 @@ import org.slf4j.Logger;
|
||||
import org.slf4j.LoggerFactory;
|
||||
|
||||
/**
|
||||
* This is the implementation of the InfluxDB {@link PersistenceService}. It persists item values
|
||||
* using the <a href="http://influxdb.org">InfluxDB time series database. The states (
|
||||
* {@link State}) of an {@link Item} are persisted by default in a time series with names equal to the name of
|
||||
* the item.
|
||||
* This is the implementation of the InfluxDB {@link PersistenceService}. It
|
||||
* persists item values using the <a href="http://influxdb.org">InfluxDB time
|
||||
* series database. The states ( {@link State}) of an {@link Item} are persisted
|
||||
* by default in a time series with names equal to the name of the item.
|
||||
*
|
||||
* This addon supports 1.X and 2.X versions, as two versions are incompatible and use different drivers the
|
||||
* specific code for each version is accessed by {@link InfluxDBRepository} and {@link FilterCriteriaQueryCreator}
|
||||
* interfaces and specific implementation reside in {@link org.openhab.persistence.influxdb.internal.influx1} and
|
||||
* This addon supports 1.X and 2.X versions, as two versions are incompatible
|
||||
* and use different drivers the specific code for each version is accessed by
|
||||
* {@link InfluxDBRepository} and {@link FilterCriteriaQueryCreator} interfaces
|
||||
* and specific implementation reside in
|
||||
* {@link org.openhab.persistence.influxdb.internal.influx1} and
|
||||
* {@link org.openhab.persistence.influxdb.internal.influx2} packages
|
||||
*
|
||||
* @author Theo Weiss - Initial contribution, rewrite of org.openhab.persistence.influxdb
|
||||
* @author Joan Pujol Espinar - Addon rewrite refactoring code and adding support for InfluxDB 2.0. Some tag code is
|
||||
* based
|
||||
* from not integrated branch from Dominik Vorreiter
|
||||
* @author Theo Weiss - Initial contribution, rewrite of
|
||||
* org.openhab.persistence.influxdb
|
||||
* @author Joan Pujol Espinar - Addon rewrite refactoring code and adding
|
||||
* support for InfluxDB 2.0. Some tag code is based from not integrated
|
||||
* branch from Dominik Vorreiter
|
||||
*/
|
||||
@NonNullByDefault
|
||||
@Component(service = { PersistenceService.class,
|
||||
@ -222,7 +225,7 @@ public class InfluxDBPersistenceService implements QueryablePersistenceService {
|
||||
filter.getItemName(), filter.getOrdering().toString(), filter.getState(), filter.getOperator(),
|
||||
filter.getBeginDate(), filter.getEndDate(), filter.getPageSize(), filter.getPageNumber());
|
||||
|
||||
String query = RepositoryFactory.createQueryCreator(configuration).createQuery(filter,
|
||||
String query = RepositoryFactory.createQueryCreator(configuration, metadataRegistry).createQuery(filter,
|
||||
configuration.getRetentionPolicy());
|
||||
logger.trace("Query {}", query);
|
||||
List<InfluxRow> results = influxDBRepository.query(query);
|
||||
|
||||
@ -0,0 +1,51 @@
|
||||
/**
|
||||
* Copyright (c) 2010-2021 Contributors to the openHAB project
|
||||
*
|
||||
* See the NOTICE file(s) distributed with this work for additional
|
||||
* information.
|
||||
*
|
||||
* This program and the accompanying materials are made available under the
|
||||
* terms of the Eclipse Public License 2.0 which is available at
|
||||
* http://www.eclipse.org/legal/epl-2.0
|
||||
*
|
||||
* SPDX-License-Identifier: EPL-2.0
|
||||
*/
|
||||
package org.openhab.persistence.influxdb.internal;
|
||||
|
||||
import org.eclipse.jdt.annotation.NonNullByDefault;
|
||||
import org.eclipse.jdt.annotation.Nullable;
|
||||
import org.openhab.core.items.Metadata;
|
||||
import org.openhab.core.items.MetadataKey;
|
||||
import org.openhab.core.items.MetadataRegistry;
|
||||
import org.openhab.persistence.influxdb.InfluxDBPersistenceService;
|
||||
|
||||
/**
|
||||
* Logic to use items metadata from an openHAB {@link Item}
|
||||
*
|
||||
* @author Johannes Ott - Initial contribution
|
||||
*/
|
||||
@NonNullByDefault
|
||||
public class InfluxDBMetadataUtils {
|
||||
|
||||
private InfluxDBMetadataUtils() {
|
||||
}
|
||||
|
||||
public static String calculateMeasurementNameFromMetadataIfPresent(
|
||||
final @Nullable MetadataRegistry currentMetadataRegistry, String name, @Nullable String itemName) {
|
||||
|
||||
if (itemName == null || currentMetadataRegistry == null) {
|
||||
return name;
|
||||
}
|
||||
|
||||
MetadataKey key = new MetadataKey(InfluxDBPersistenceService.SERVICE_NAME, itemName);
|
||||
Metadata metadata = currentMetadataRegistry.get(key);
|
||||
if (metadata != null) {
|
||||
String metaName = metadata.getValue();
|
||||
if (!metaName.isBlank()) {
|
||||
name = metaName;
|
||||
}
|
||||
}
|
||||
|
||||
return name;
|
||||
}
|
||||
}
|
||||
@ -64,6 +64,9 @@ public class ItemToStorePointCreator {
|
||||
private String calculateMeasurementName(Item item, @Nullable String storeAlias) {
|
||||
String name = storeAlias != null && !storeAlias.isBlank() ? storeAlias : item.getName();
|
||||
|
||||
name = InfluxDBMetadataUtils.calculateMeasurementNameFromMetadataIfPresent(metadataRegistry, name,
|
||||
item.getName());
|
||||
|
||||
if (configuration.isReplaceUnderscore()) {
|
||||
name = name.replace('_', '.');
|
||||
}
|
||||
|
||||
@ -13,14 +13,16 @@
|
||||
package org.openhab.persistence.influxdb.internal;
|
||||
|
||||
import org.eclipse.jdt.annotation.NonNullByDefault;
|
||||
import org.openhab.core.items.MetadataRegistry;
|
||||
import org.openhab.persistence.influxdb.internal.influx1.Influx1FilterCriteriaQueryCreatorImpl;
|
||||
import org.openhab.persistence.influxdb.internal.influx1.InfluxDB1RepositoryImpl;
|
||||
import org.openhab.persistence.influxdb.internal.influx2.Influx2FilterCriteriaQueryCreatorImpl;
|
||||
import org.openhab.persistence.influxdb.internal.influx2.InfluxDB2RepositoryImpl;
|
||||
|
||||
/**
|
||||
* Factory that returns {@link InfluxDBRepository} and {@link FilterCriteriaQueryCreator} implementations
|
||||
* depending on InfluxDB version
|
||||
* Factory that returns {@link InfluxDBRepository} and
|
||||
* {@link FilterCriteriaQueryCreator} implementations depending on InfluxDB
|
||||
* version
|
||||
*
|
||||
* @author Joan Pujol Espinar - Initial contribution
|
||||
*/
|
||||
@ -38,12 +40,13 @@ public class RepositoryFactory {
|
||||
}
|
||||
}
|
||||
|
||||
public static FilterCriteriaQueryCreator createQueryCreator(InfluxDBConfiguration influxDBConfiguration) {
|
||||
public static FilterCriteriaQueryCreator createQueryCreator(InfluxDBConfiguration influxDBConfiguration,
|
||||
MetadataRegistry metadataRegistry) {
|
||||
switch (influxDBConfiguration.getVersion()) {
|
||||
case V1:
|
||||
return new Influx1FilterCriteriaQueryCreatorImpl();
|
||||
return new Influx1FilterCriteriaQueryCreatorImpl(influxDBConfiguration, metadataRegistry);
|
||||
case V2:
|
||||
return new Influx2FilterCriteriaQueryCreatorImpl();
|
||||
return new Influx2FilterCriteriaQueryCreatorImpl(influxDBConfiguration, metadataRegistry);
|
||||
default:
|
||||
throw new UnnexpectedConditionException("Not expected version " + influxDBConfiguration.getVersion());
|
||||
}
|
||||
|
||||
@ -17,14 +17,18 @@ import static org.openhab.persistence.influxdb.internal.InfluxDBConstants.*;
|
||||
import static org.openhab.persistence.influxdb.internal.InfluxDBStateConvertUtils.stateToObject;
|
||||
|
||||
import org.eclipse.jdt.annotation.NonNullByDefault;
|
||||
import org.eclipse.jdt.annotation.Nullable;
|
||||
import org.influxdb.dto.Query;
|
||||
import org.influxdb.querybuilder.Appender;
|
||||
import org.influxdb.querybuilder.BuiltQuery;
|
||||
import org.influxdb.querybuilder.Select;
|
||||
import org.influxdb.querybuilder.Where;
|
||||
import org.influxdb.querybuilder.clauses.SimpleClause;
|
||||
import org.openhab.core.items.MetadataRegistry;
|
||||
import org.openhab.core.persistence.FilterCriteria;
|
||||
import org.openhab.persistence.influxdb.internal.FilterCriteriaQueryCreator;
|
||||
import org.openhab.persistence.influxdb.internal.InfluxDBConfiguration;
|
||||
import org.openhab.persistence.influxdb.internal.InfluxDBMetadataUtils;
|
||||
import org.openhab.persistence.influxdb.internal.InfluxDBVersion;
|
||||
|
||||
/**
|
||||
@ -35,20 +39,33 @@ import org.openhab.persistence.influxdb.internal.InfluxDBVersion;
|
||||
@NonNullByDefault
|
||||
public class Influx1FilterCriteriaQueryCreatorImpl implements FilterCriteriaQueryCreator {
|
||||
|
||||
private InfluxDBConfiguration configuration;
|
||||
private MetadataRegistry metadataRegistry;
|
||||
|
||||
public Influx1FilterCriteriaQueryCreatorImpl(InfluxDBConfiguration configuration,
|
||||
MetadataRegistry metadataRegistry) {
|
||||
this.configuration = configuration;
|
||||
this.metadataRegistry = metadataRegistry;
|
||||
}
|
||||
|
||||
@Override
|
||||
public String createQuery(FilterCriteria criteria, String retentionPolicy) {
|
||||
final String tableName;
|
||||
boolean hasCriteriaName = criteria.getItemName() != null;
|
||||
if (hasCriteriaName) {
|
||||
tableName = criteria.getItemName();
|
||||
} else {
|
||||
tableName = "/.*/";
|
||||
}
|
||||
final String itemName = criteria.getItemName();
|
||||
boolean hasCriteriaName = itemName != null;
|
||||
|
||||
Select select = select(COLUMN_VALUE_NAME_V1).fromRaw(null,
|
||||
fullQualifiedTableName(retentionPolicy, tableName, hasCriteriaName));
|
||||
tableName = calculateTableName(itemName);
|
||||
|
||||
Select select = select().column("\"" + COLUMN_VALUE_NAME_V1 + "\"::field")
|
||||
.column("\"" + TAG_ITEM_NAME + "\"::tag")
|
||||
.fromRaw(null, fullQualifiedTableName(retentionPolicy, tableName, hasCriteriaName));
|
||||
|
||||
Where where = select.where();
|
||||
|
||||
if (itemName != null && !tableName.equals(itemName)) {
|
||||
where = where.and(BuiltQuery.QueryBuilder.eq(TAG_ITEM_NAME, itemName));
|
||||
}
|
||||
|
||||
if (criteria.getBeginDate() != null) {
|
||||
where = where.and(
|
||||
BuiltQuery.QueryBuilder.gte(COLUMN_TIME_NAME_V1, criteria.getBeginDate().toInstant().toString()));
|
||||
@ -82,6 +99,22 @@ public class Influx1FilterCriteriaQueryCreatorImpl implements FilterCriteriaQuer
|
||||
return query.getCommand();
|
||||
}
|
||||
|
||||
private String calculateTableName(@Nullable String itemName) {
|
||||
if (itemName == null) {
|
||||
return "/.*/";
|
||||
}
|
||||
|
||||
String name = itemName;
|
||||
|
||||
name = InfluxDBMetadataUtils.calculateMeasurementNameFromMetadataIfPresent(metadataRegistry, name, itemName);
|
||||
|
||||
if (configuration.isReplaceUnderscore()) {
|
||||
name = name.replace('_', '.');
|
||||
}
|
||||
|
||||
return name;
|
||||
}
|
||||
|
||||
private String fullQualifiedTableName(String retentionPolicy, String tableName, boolean escapeTableName) {
|
||||
StringBuilder sb = new StringBuilder();
|
||||
Appender.appendName(retentionPolicy, sb);
|
||||
|
||||
@ -15,6 +15,7 @@ package org.openhab.persistence.influxdb.internal.influx1;
|
||||
import static org.openhab.persistence.influxdb.internal.InfluxDBConstants.COLUMN_TIME_NAME_V1;
|
||||
import static org.openhab.persistence.influxdb.internal.InfluxDBConstants.COLUMN_VALUE_NAME_V1;
|
||||
import static org.openhab.persistence.influxdb.internal.InfluxDBConstants.FIELD_VALUE_NAME;
|
||||
import static org.openhab.persistence.influxdb.internal.InfluxDBConstants.TAG_ITEM_NAME;
|
||||
|
||||
import java.time.Instant;
|
||||
import java.util.ArrayList;
|
||||
@ -42,8 +43,10 @@ import org.slf4j.LoggerFactory;
|
||||
/**
|
||||
* Implementation of {@link InfluxDBRepository} for InfluxDB 1.0
|
||||
*
|
||||
* @author Joan Pujol Espinar - Initial contribution. Most code has been moved from
|
||||
* {@link org.openhab.persistence.influxdb.InfluxDBPersistenceService} where it was in previous version
|
||||
* @author Joan Pujol Espinar - Initial contribution. Most code has been moved
|
||||
* from
|
||||
* {@link org.openhab.persistence.influxdb.InfluxDBPersistenceService}
|
||||
* where it was in previous version
|
||||
*/
|
||||
@NonNullByDefault
|
||||
public class InfluxDB1RepositoryImpl implements InfluxDBRepository {
|
||||
@ -178,12 +181,15 @@ public class InfluxDB1RepositoryImpl implements InfluxDBRepository {
|
||||
if (columns != null) {
|
||||
Integer timestampColumn = null;
|
||||
Integer valueColumn = null;
|
||||
Integer itemNameColumn = null;
|
||||
for (int i = 0; i < columns.size(); i++) {
|
||||
String columnName = columns.get(i);
|
||||
if (columnName.equals(COLUMN_TIME_NAME_V1)) {
|
||||
timestampColumn = i;
|
||||
} else if (columnName.equals(COLUMN_VALUE_NAME_V1)) {
|
||||
valueColumn = i;
|
||||
} else if (columnName.equals(TAG_ITEM_NAME)) {
|
||||
itemNameColumn = i;
|
||||
}
|
||||
}
|
||||
if (valueColumn == null || timestampColumn == null) {
|
||||
@ -193,6 +199,9 @@ public class InfluxDB1RepositoryImpl implements InfluxDBRepository {
|
||||
Double rawTime = (Double) valuess.get(i).get(timestampColumn);
|
||||
Instant time = Instant.ofEpochMilli(rawTime.longValue());
|
||||
Object value = valuess.get(i).get(valueColumn);
|
||||
if (itemNameColumn != null) {
|
||||
itemName = (String) valuess.get(i).get(itemNameColumn);
|
||||
}
|
||||
logger.trace("adding historic item {}: time {} value {}", itemName, time, value);
|
||||
rows.add(new InfluxRow(time, itemName, value));
|
||||
}
|
||||
|
||||
@ -13,14 +13,18 @@
|
||||
package org.openhab.persistence.influxdb.internal.influx2;
|
||||
|
||||
import static com.influxdb.query.dsl.functions.restriction.Restrictions.measurement;
|
||||
import static com.influxdb.query.dsl.functions.restriction.Restrictions.tag;
|
||||
import static org.openhab.persistence.influxdb.internal.InfluxDBConstants.*;
|
||||
import static org.openhab.persistence.influxdb.internal.InfluxDBStateConvertUtils.stateToObject;
|
||||
|
||||
import java.time.temporal.ChronoUnit;
|
||||
|
||||
import org.eclipse.jdt.annotation.NonNullByDefault;
|
||||
import org.openhab.core.items.MetadataRegistry;
|
||||
import org.openhab.core.persistence.FilterCriteria;
|
||||
import org.openhab.persistence.influxdb.internal.FilterCriteriaQueryCreator;
|
||||
import org.openhab.persistence.influxdb.internal.InfluxDBConfiguration;
|
||||
import org.openhab.persistence.influxdb.internal.InfluxDBMetadataUtils;
|
||||
import org.openhab.persistence.influxdb.internal.InfluxDBVersion;
|
||||
|
||||
import com.influxdb.query.dsl.Flux;
|
||||
@ -34,6 +38,16 @@ import com.influxdb.query.dsl.functions.restriction.Restrictions;
|
||||
*/
|
||||
@NonNullByDefault
|
||||
public class Influx2FilterCriteriaQueryCreatorImpl implements FilterCriteriaQueryCreator {
|
||||
|
||||
private InfluxDBConfiguration configuration;
|
||||
private MetadataRegistry metadataRegistry;
|
||||
|
||||
public Influx2FilterCriteriaQueryCreatorImpl(InfluxDBConfiguration configuration,
|
||||
MetadataRegistry metadataRegistry) {
|
||||
this.configuration = configuration;
|
||||
this.metadataRegistry = metadataRegistry;
|
||||
}
|
||||
|
||||
@Override
|
||||
public String createQuery(FilterCriteria criteria, String retentionPolicy) {
|
||||
Flux flux = Flux.from(retentionPolicy);
|
||||
@ -49,8 +63,13 @@ public class Influx2FilterCriteriaQueryCreatorImpl implements FilterCriteriaQuer
|
||||
}
|
||||
flux = range;
|
||||
|
||||
if (criteria.getItemName() != null) {
|
||||
flux = flux.filter(measurement().equal(criteria.getItemName()));
|
||||
String itemName = criteria.getItemName();
|
||||
if (itemName != null) {
|
||||
String measurementName = calculateMeasurementName(itemName);
|
||||
flux = flux.filter(measurement().equal(measurementName));
|
||||
if (!measurementName.equals(itemName)) {
|
||||
flux = flux.filter(tag("item").equal(itemName));
|
||||
}
|
||||
}
|
||||
|
||||
if (criteria.getState() != null && criteria.getOperator() != null) {
|
||||
@ -72,4 +91,16 @@ public class Influx2FilterCriteriaQueryCreatorImpl implements FilterCriteriaQuer
|
||||
|
||||
return flux.toString();
|
||||
}
|
||||
|
||||
private String calculateMeasurementName(String itemName) {
|
||||
String name = itemName;
|
||||
|
||||
name = InfluxDBMetadataUtils.calculateMeasurementNameFromMetadataIfPresent(metadataRegistry, name, itemName);
|
||||
|
||||
if (configuration.isReplaceUnderscore()) {
|
||||
name = name.replace('_', '.');
|
||||
}
|
||||
|
||||
return name;
|
||||
}
|
||||
}
|
||||
|
||||
@ -14,7 +14,7 @@
|
||||
|
||||
<parameter-group name="tags">
|
||||
<label>Additional Tags</label>
|
||||
<description>This group defines additional tags which can be added.</description>
|
||||
<description>This group defines additional tags which can be added to your measurements.</description>
|
||||
<advanced>false</advanced>
|
||||
</parameter-group>
|
||||
|
||||
|
||||
@ -14,25 +14,35 @@ package org.openhab.persistence.influxdb.internal;
|
||||
|
||||
import static org.hamcrest.CoreMatchers.equalTo;
|
||||
import static org.hamcrest.MatcherAssert.assertThat;
|
||||
import static org.mockito.Mockito.when;
|
||||
|
||||
import java.time.ZoneId;
|
||||
import java.time.ZonedDateTime;
|
||||
import java.time.format.DateTimeFormatter;
|
||||
import java.time.temporal.ChronoUnit;
|
||||
import java.util.Map;
|
||||
|
||||
import org.eclipse.jdt.annotation.DefaultLocation;
|
||||
import org.eclipse.jdt.annotation.NonNullByDefault;
|
||||
import org.junit.jupiter.api.AfterEach;
|
||||
import org.junit.jupiter.api.BeforeEach;
|
||||
import org.junit.jupiter.api.Test;
|
||||
import org.junit.jupiter.api.extension.ExtendWith;
|
||||
import org.mockito.Mock;
|
||||
import org.mockito.junit.jupiter.MockitoExtension;
|
||||
import org.openhab.core.items.Metadata;
|
||||
import org.openhab.core.items.MetadataKey;
|
||||
import org.openhab.core.items.MetadataRegistry;
|
||||
import org.openhab.core.library.types.PercentType;
|
||||
import org.openhab.core.persistence.FilterCriteria;
|
||||
import org.openhab.persistence.influxdb.InfluxDBPersistenceService;
|
||||
import org.openhab.persistence.influxdb.internal.influx1.Influx1FilterCriteriaQueryCreatorImpl;
|
||||
import org.openhab.persistence.influxdb.internal.influx2.Influx2FilterCriteriaQueryCreatorImpl;
|
||||
|
||||
/**
|
||||
* @author Joan Pujol Espinar - Initial contribution
|
||||
*/
|
||||
@ExtendWith(MockitoExtension.class)
|
||||
@NonNullByDefault({ DefaultLocation.RETURN_TYPE, DefaultLocation.PARAMETER })
|
||||
public class InfluxFilterCriteriaQueryCreatorImplTest {
|
||||
private static final String RETENTION_POLICY = "origin";
|
||||
@ -41,19 +51,24 @@ public class InfluxFilterCriteriaQueryCreatorImplTest {
|
||||
private static final DateTimeFormatter INFLUX2_DATE_FORMATTER = DateTimeFormatter
|
||||
.ofPattern("yyyy-MM-dd'T'HH:mm:ss.nnnnnnnnn'Z'").withZone(ZoneId.of("UTC"));
|
||||
|
||||
private @Mock InfluxDBConfiguration influxDBConfiguration;
|
||||
private @Mock MetadataRegistry metadataRegistry;
|
||||
|
||||
private Influx1FilterCriteriaQueryCreatorImpl instanceV1;
|
||||
private Influx2FilterCriteriaQueryCreatorImpl instanceV2;
|
||||
|
||||
@BeforeEach
|
||||
public void before() {
|
||||
instanceV1 = new Influx1FilterCriteriaQueryCreatorImpl();
|
||||
instanceV2 = new Influx2FilterCriteriaQueryCreatorImpl();
|
||||
instanceV1 = new Influx1FilterCriteriaQueryCreatorImpl(influxDBConfiguration, metadataRegistry);
|
||||
instanceV2 = new Influx2FilterCriteriaQueryCreatorImpl(influxDBConfiguration, metadataRegistry);
|
||||
}
|
||||
|
||||
@AfterEach
|
||||
public void after() {
|
||||
instanceV1 = null;
|
||||
instanceV2 = null;
|
||||
influxDBConfiguration = null;
|
||||
metadataRegistry = null;
|
||||
}
|
||||
|
||||
@Test
|
||||
@ -61,32 +76,20 @@ public class InfluxFilterCriteriaQueryCreatorImplTest {
|
||||
FilterCriteria criteria = createBaseCriteria();
|
||||
|
||||
String queryV1 = instanceV1.createQuery(criteria, RETENTION_POLICY);
|
||||
assertThat(queryV1, equalTo("SELECT value FROM origin.sampleItem;"));
|
||||
assertThat(queryV1, equalTo("SELECT \"value\"::field,\"item\"::tag FROM origin.sampleItem;"));
|
||||
|
||||
String queryV2 = instanceV2.createQuery(criteria, RETENTION_POLICY);
|
||||
assertThat(queryV2, equalTo("from(bucket:\"origin\")\n\t" + "|> range(start:-100y)\n\t"
|
||||
+ "|> filter(fn: (r) => r[\"_measurement\"] == \"sampleItem\")"));
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testEscapeSimpleItem() {
|
||||
FilterCriteria criteria = createBaseCriteria("sample.Item");
|
||||
|
||||
String queryV1 = instanceV1.createQuery(criteria, RETENTION_POLICY);
|
||||
assertThat(queryV1, equalTo("SELECT value FROM origin.\"sample.Item\";"));
|
||||
|
||||
String queryV2 = instanceV2.createQuery(criteria, RETENTION_POLICY);
|
||||
assertThat(queryV2, equalTo("from(bucket:\"origin\")\n\t" + "|> range(start:-100y)\n\t"
|
||||
+ "|> filter(fn: (r) => r[\"_measurement\"] == \"sample.Item\")"));
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testSimpleUnboundedItemWithoutParams() {
|
||||
FilterCriteria criteria = new FilterCriteria();
|
||||
criteria.setOrdering(null);
|
||||
|
||||
String queryV1 = instanceV1.createQuery(criteria, RETENTION_POLICY);
|
||||
assertThat(queryV1, equalTo("SELECT value FROM origin./.*/;"));
|
||||
assertThat(queryV1, equalTo("SELECT \"value\"::field,\"item\"::tag FROM origin./.*/;"));
|
||||
|
||||
String queryV2 = instanceV2.createQuery(criteria, RETENTION_POLICY);
|
||||
assertThat(queryV2, equalTo("from(bucket:\"origin\")\n\t" + "|> range(start:-100y)"));
|
||||
@ -102,8 +105,8 @@ public class InfluxFilterCriteriaQueryCreatorImplTest {
|
||||
|
||||
String queryV1 = instanceV1.createQuery(criteria, RETENTION_POLICY);
|
||||
String expectedQueryV1 = String.format(
|
||||
"SELECT value FROM origin.sampleItem WHERE time >= '%s' AND time <= '%s';", now.toInstant(),
|
||||
tomorrow.toInstant());
|
||||
"SELECT \"value\"::field,\"item\"::tag FROM origin.sampleItem WHERE time >= '%s' AND time <= '%s';",
|
||||
now.toInstant(), tomorrow.toInstant());
|
||||
assertThat(queryV1, equalTo(expectedQueryV1));
|
||||
|
||||
String queryV2 = instanceV2.createQuery(criteria, RETENTION_POLICY);
|
||||
@ -121,7 +124,7 @@ public class InfluxFilterCriteriaQueryCreatorImplTest {
|
||||
criteria.setState(new PercentType(90));
|
||||
|
||||
String query = instanceV1.createQuery(criteria, RETENTION_POLICY);
|
||||
assertThat(query, equalTo("SELECT value FROM origin.sampleItem WHERE value <= 90;"));
|
||||
assertThat(query, equalTo("SELECT \"value\"::field,\"item\"::tag FROM origin.sampleItem WHERE value <= 90;"));
|
||||
|
||||
String queryV2 = instanceV2.createQuery(criteria, RETENTION_POLICY);
|
||||
assertThat(queryV2,
|
||||
@ -137,7 +140,7 @@ public class InfluxFilterCriteriaQueryCreatorImplTest {
|
||||
criteria.setPageSize(10);
|
||||
|
||||
String query = instanceV1.createQuery(criteria, RETENTION_POLICY);
|
||||
assertThat(query, equalTo("SELECT value FROM origin.sampleItem LIMIT 10 OFFSET 20;"));
|
||||
assertThat(query, equalTo("SELECT \"value\"::field,\"item\"::tag FROM origin.sampleItem LIMIT 10 OFFSET 20;"));
|
||||
|
||||
String queryV2 = instanceV2.createQuery(criteria, RETENTION_POLICY);
|
||||
assertThat(queryV2, equalTo("from(bucket:\"origin\")\n\t" + "|> range(start:-100y)\n\t"
|
||||
@ -150,7 +153,7 @@ public class InfluxFilterCriteriaQueryCreatorImplTest {
|
||||
criteria.setOrdering(FilterCriteria.Ordering.ASCENDING);
|
||||
|
||||
String query = instanceV1.createQuery(criteria, RETENTION_POLICY);
|
||||
assertThat(query, equalTo("SELECT value FROM origin.sampleItem ORDER BY time ASC;"));
|
||||
assertThat(query, equalTo("SELECT \"value\"::field,\"item\"::tag FROM origin.sampleItem ORDER BY time ASC;"));
|
||||
|
||||
String queryV2 = instanceV2.createQuery(criteria, RETENTION_POLICY);
|
||||
assertThat(queryV2,
|
||||
@ -169,4 +172,33 @@ public class InfluxFilterCriteriaQueryCreatorImplTest {
|
||||
criteria.setOrdering(null);
|
||||
return criteria;
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testMeasurementNameFromMetadata() {
|
||||
FilterCriteria criteria = createBaseCriteria();
|
||||
MetadataKey metadataKey = new MetadataKey(InfluxDBPersistenceService.SERVICE_NAME, "sampleItem");
|
||||
|
||||
when(metadataRegistry.get(metadataKey))
|
||||
.thenReturn(new Metadata(metadataKey, "measurementName", Map.of("key1", "val1", "key2", "val2")));
|
||||
|
||||
String queryV1 = instanceV1.createQuery(criteria, RETENTION_POLICY);
|
||||
assertThat(queryV1, equalTo(
|
||||
"SELECT \"value\"::field,\"item\"::tag FROM origin.measurementName WHERE item = 'sampleItem';"));
|
||||
|
||||
String queryV2 = instanceV2.createQuery(criteria, RETENTION_POLICY);
|
||||
assertThat(queryV2,
|
||||
equalTo("from(bucket:\"origin\")\n\t" + "|> range(start:-100y)\n\t"
|
||||
+ "|> filter(fn: (r) => r[\"_measurement\"] == \"measurementName\")\n\t"
|
||||
+ "|> filter(fn: (r) => r[\"item\"] == \"sampleItem\")"));
|
||||
|
||||
when(metadataRegistry.get(metadataKey))
|
||||
.thenReturn(new Metadata(metadataKey, "", Map.of("key1", "val1", "key2", "val2")));
|
||||
|
||||
queryV1 = instanceV1.createQuery(criteria, RETENTION_POLICY);
|
||||
assertThat(queryV1, equalTo("SELECT \"value\"::field,\"item\"::tag FROM origin.sampleItem;"));
|
||||
|
||||
queryV2 = instanceV2.createQuery(criteria, RETENTION_POLICY);
|
||||
assertThat(queryV2, equalTo("from(bucket:\"origin\")\n\t" + "|> range(start:-100y)\n\t"
|
||||
+ "|> filter(fn: (r) => r[\"_measurement\"] == \"sampleItem\")"));
|
||||
}
|
||||
}
|
||||
|
||||
@ -140,4 +140,31 @@ public class ItemToStorePointCreatorTest {
|
||||
assertThat(point.getTags(), hasEntry("key1", "val1"));
|
||||
assertThat(point.getTags(), hasEntry("key2", "val2"));
|
||||
}
|
||||
|
||||
@Test
|
||||
public void shouldUseMeasurementNameFromMetadataIfProvided() {
|
||||
NumberItem item = ItemTestHelper.createNumberItem("myitem", 5);
|
||||
MetadataKey metadataKey = new MetadataKey(InfluxDBPersistenceService.SERVICE_NAME, item.getName());
|
||||
|
||||
InfluxPoint point = instance.convert(item, null);
|
||||
assertThat(point.getMeasurementName(), equalTo(item.getName()));
|
||||
|
||||
point = instance.convert(item, null);
|
||||
assertThat(point.getMeasurementName(), equalTo(item.getName()));
|
||||
assertThat(point.getTags(), hasEntry("item", item.getName()));
|
||||
|
||||
when(metadataRegistry.get(metadataKey))
|
||||
.thenReturn(new Metadata(metadataKey, "measurementName", Map.of("key1", "val1", "key2", "val2")));
|
||||
|
||||
point = instance.convert(item, null);
|
||||
assertThat(point.getMeasurementName(), equalTo("measurementName"));
|
||||
assertThat(point.getTags(), hasEntry("item", item.getName()));
|
||||
|
||||
when(metadataRegistry.get(metadataKey))
|
||||
.thenReturn(new Metadata(metadataKey, "", Map.of("key1", "val1", "key2", "val2")));
|
||||
|
||||
point = instance.convert(item, null);
|
||||
assertThat(point.getMeasurementName(), equalTo(item.getName()));
|
||||
assertThat(point.getTags(), hasEntry("item", item.getName()));
|
||||
}
|
||||
}
|
||||
|
||||
Loading…
x
Reference in New Issue
Block a user