[influxdb] Fix problems when querying old historical data (#10680)
Fix problem with historical Influx1 data that was generated with addon version <3.0.0 that didn't write generate item tag Signed-off-by: Joan Pujol <joanpujol@gmail.com>
This commit is contained in:
parent
bafa841390
commit
801100cee1
@ -14,6 +14,7 @@ package org.openhab.persistence.influxdb.internal;
|
|||||||
|
|
||||||
import java.util.Collections;
|
import java.util.Collections;
|
||||||
import java.util.Map;
|
import java.util.Map;
|
||||||
|
import java.util.Optional;
|
||||||
import java.util.StringJoiner;
|
import java.util.StringJoiner;
|
||||||
|
|
||||||
import org.eclipse.jdt.annotation.NonNullByDefault;
|
import org.eclipse.jdt.annotation.NonNullByDefault;
|
||||||
@ -61,7 +62,7 @@ public class InfluxDBConfiguration {
|
|||||||
token = (String) config.getOrDefault(TOKEN_PARAM, "");
|
token = (String) config.getOrDefault(TOKEN_PARAM, "");
|
||||||
databaseName = (String) config.getOrDefault(DATABASE_PARAM, "openhab");
|
databaseName = (String) config.getOrDefault(DATABASE_PARAM, "openhab");
|
||||||
retentionPolicy = (String) config.getOrDefault(RETENTION_POLICY_PARAM, "autogen");
|
retentionPolicy = (String) config.getOrDefault(RETENTION_POLICY_PARAM, "autogen");
|
||||||
version = parseInfluxVersion(config.getOrDefault(VERSION_PARAM, InfluxDBVersion.V1.name()));
|
version = parseInfluxVersion((String) config.getOrDefault(VERSION_PARAM, InfluxDBVersion.V1.name()));
|
||||||
|
|
||||||
replaceUnderscore = getConfigBooleanValue(config, REPLACE_UNDERSCORE_PARAM, false);
|
replaceUnderscore = getConfigBooleanValue(config, REPLACE_UNDERSCORE_PARAM, false);
|
||||||
addCategoryTag = getConfigBooleanValue(config, ADD_CATEGORY_TAG_PARAM, false);
|
addCategoryTag = getConfigBooleanValue(config, ADD_CATEGORY_TAG_PARAM, false);
|
||||||
@ -80,9 +81,9 @@ public class InfluxDBConfiguration {
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
private InfluxDBVersion parseInfluxVersion(@Nullable Object value) {
|
private InfluxDBVersion parseInfluxVersion(@Nullable String value) {
|
||||||
try {
|
try {
|
||||||
return InfluxDBVersion.valueOf((String) value);
|
return Optional.ofNullable(value).map(InfluxDBVersion::valueOf).orElse(InfluxDBVersion.UNKNOWN);
|
||||||
} catch (RuntimeException e) {
|
} catch (RuntimeException e) {
|
||||||
logger.warn("Invalid version {}", value);
|
logger.warn("Invalid version {}", value);
|
||||||
return InfluxDBVersion.UNKNOWN;
|
return InfluxDBVersion.UNKNOWN;
|
||||||
|
|||||||
@ -93,7 +93,7 @@ public class InfluxDBStateConvertUtils {
|
|||||||
* @return the state of the item represented by the itemName parameter, else the string value of
|
* @return the state of the item represented by the itemName parameter, else the string value of
|
||||||
* the Object parameter
|
* the Object parameter
|
||||||
*/
|
*/
|
||||||
public static State objectToState(Object value, String itemName, @Nullable ItemRegistry itemRegistry) {
|
public static State objectToState(@Nullable Object value, String itemName, @Nullable ItemRegistry itemRegistry) {
|
||||||
State state = null;
|
State state = null;
|
||||||
if (itemRegistry != null) {
|
if (itemRegistry != null) {
|
||||||
try {
|
try {
|
||||||
@ -111,9 +111,10 @@ public class InfluxDBStateConvertUtils {
|
|||||||
return state;
|
return state;
|
||||||
}
|
}
|
||||||
|
|
||||||
public static State objectToState(Object value, Item itemToSetState) {
|
public static State objectToState(@Nullable Object value, Item itemToSetState) {
|
||||||
String valueStr = String.valueOf(value);
|
String valueStr = String.valueOf(value);
|
||||||
|
|
||||||
|
@Nullable
|
||||||
Item item = itemToSetState;
|
Item item = itemToSetState;
|
||||||
if (item instanceof GroupItem) {
|
if (item instanceof GroupItem) {
|
||||||
item = ((GroupItem) item).getBaseItem();
|
item = ((GroupItem) item).getBaseItem();
|
||||||
|
|||||||
@ -12,11 +12,14 @@
|
|||||||
*/
|
*/
|
||||||
package org.openhab.persistence.influxdb.internal;
|
package org.openhab.persistence.influxdb.internal;
|
||||||
|
|
||||||
|
import org.eclipse.jdt.annotation.NonNullByDefault;
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* InfluxDB version
|
* InfluxDB version
|
||||||
*
|
*
|
||||||
* @author Joan Pujol Espinar - Initial contribution
|
* @author Joan Pujol Espinar - Initial contribution
|
||||||
*/
|
*/
|
||||||
|
@NonNullByDefault
|
||||||
public enum InfluxDBVersion {
|
public enum InfluxDBVersion {
|
||||||
V1,
|
V1,
|
||||||
V2,
|
V2,
|
||||||
|
|||||||
@ -15,6 +15,7 @@ package org.openhab.persistence.influxdb.internal;
|
|||||||
import java.time.Instant;
|
import java.time.Instant;
|
||||||
|
|
||||||
import org.eclipse.jdt.annotation.NonNullByDefault;
|
import org.eclipse.jdt.annotation.NonNullByDefault;
|
||||||
|
import org.eclipse.jdt.annotation.Nullable;
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* Row data returned from database query
|
* Row data returned from database query
|
||||||
@ -25,9 +26,9 @@ import org.eclipse.jdt.annotation.NonNullByDefault;
|
|||||||
public class InfluxRow {
|
public class InfluxRow {
|
||||||
private final String itemName;
|
private final String itemName;
|
||||||
private final Instant time;
|
private final Instant time;
|
||||||
private final Object value;
|
private final @Nullable Object value;
|
||||||
|
|
||||||
public InfluxRow(Instant time, String itemName, Object value) {
|
public InfluxRow(Instant time, String itemName, @Nullable Object value) {
|
||||||
this.time = time;
|
this.time = time;
|
||||||
this.itemName = itemName;
|
this.itemName = itemName;
|
||||||
this.value = value;
|
this.value = value;
|
||||||
@ -41,7 +42,7 @@ public class InfluxRow {
|
|||||||
return itemName;
|
return itemName;
|
||||||
}
|
}
|
||||||
|
|
||||||
public Object getValue() {
|
public @Nullable Object getValue() {
|
||||||
return value;
|
return value;
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|||||||
@ -22,6 +22,8 @@ import java.util.ArrayList;
|
|||||||
import java.util.Collections;
|
import java.util.Collections;
|
||||||
import java.util.List;
|
import java.util.List;
|
||||||
import java.util.Map;
|
import java.util.Map;
|
||||||
|
import java.util.Objects;
|
||||||
|
import java.util.Optional;
|
||||||
import java.util.concurrent.TimeUnit;
|
import java.util.concurrent.TimeUnit;
|
||||||
|
|
||||||
import org.eclipse.jdt.annotation.NonNullByDefault;
|
import org.eclipse.jdt.annotation.NonNullByDefault;
|
||||||
@ -171,8 +173,7 @@ public class InfluxDB1RepositoryImpl implements InfluxDBRepository {
|
|||||||
} else {
|
} else {
|
||||||
for (QueryResult.Series series : seriess) {
|
for (QueryResult.Series series : seriess) {
|
||||||
logger.trace("series {}", series.toString());
|
logger.trace("series {}", series.toString());
|
||||||
String itemName = series.getName();
|
List<List<@Nullable Object>> valuess = series.getValues();
|
||||||
List<List<Object>> valuess = series.getValues();
|
|
||||||
if (valuess == null) {
|
if (valuess == null) {
|
||||||
logger.debug("query returned no values");
|
logger.debug("query returned no values");
|
||||||
} else {
|
} else {
|
||||||
@ -196,12 +197,14 @@ public class InfluxDB1RepositoryImpl implements InfluxDBRepository {
|
|||||||
throw new IllegalStateException("missing column");
|
throw new IllegalStateException("missing column");
|
||||||
}
|
}
|
||||||
for (int i = 0; i < valuess.size(); i++) {
|
for (int i = 0; i < valuess.size(); i++) {
|
||||||
Double rawTime = (Double) valuess.get(i).get(timestampColumn);
|
Double rawTime = (Double) Objects.requireNonNull(valuess.get(i).get(timestampColumn));
|
||||||
Instant time = Instant.ofEpochMilli(rawTime.longValue());
|
Instant time = Instant.ofEpochMilli(rawTime.longValue());
|
||||||
|
@Nullable
|
||||||
Object value = valuess.get(i).get(valueColumn);
|
Object value = valuess.get(i).get(valueColumn);
|
||||||
if (itemNameColumn != null) {
|
var currentI = i;
|
||||||
itemName = (String) valuess.get(i).get(itemNameColumn);
|
String itemName = Optional.ofNullable(itemNameColumn)
|
||||||
}
|
.flatMap(inc -> Optional.ofNullable((String) valuess.get(currentI).get(inc)))
|
||||||
|
.orElse(series.getName());
|
||||||
logger.trace("adding historic item {}: time {} value {}", itemName, time, value);
|
logger.trace("adding historic item {}: time {} value {}", itemName, time, value);
|
||||||
rows.add(new InfluxRow(time, itemName, value));
|
rows.add(new InfluxRow(time, itemName, value));
|
||||||
}
|
}
|
||||||
|
|||||||
Loading…
x
Reference in New Issue
Block a user