[influxdb] Fix re-connection issues (#14296)

Signed-off-by: Jan N. Klug <github@klug.nrw>
This commit is contained in:
J-N-K 2023-01-30 20:04:57 +01:00 committed by GitHub
parent 9a0d0ba409
commit e5e067b79b
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23

View File

@ -14,7 +14,6 @@ package org.openhab.persistence.influxdb;
import java.time.ZoneId; import java.time.ZoneId;
import java.time.ZonedDateTime; import java.time.ZonedDateTime;
import java.util.Collections;
import java.util.List; import java.util.List;
import java.util.Locale; import java.util.Locale;
import java.util.Map; import java.util.Map;
@ -95,6 +94,8 @@ public class InfluxDBPersistenceService implements QueryablePersistenceService {
private @NonNullByDefault({}) ItemToStorePointCreator itemToStorePointCreator; private @NonNullByDefault({}) ItemToStorePointCreator itemToStorePointCreator;
private @NonNullByDefault({}) InfluxDBRepository influxDBRepository; private @NonNullByDefault({}) InfluxDBRepository influxDBRepository;
private boolean tryReconnection = false;
@Activate @Activate
public InfluxDBPersistenceService(final @Reference ItemRegistry itemRegistry, public InfluxDBPersistenceService(final @Reference ItemRegistry itemRegistry,
final @Reference MetadataRegistry metadataRegistry) { final @Reference MetadataRegistry metadataRegistry) {
@ -113,8 +114,10 @@ public class InfluxDBPersistenceService implements QueryablePersistenceService {
itemToStorePointCreator = new ItemToStorePointCreator(configuration, metadataRegistry); itemToStorePointCreator = new ItemToStorePointCreator(configuration, metadataRegistry);
influxDBRepository = createInfluxDBRepository(); influxDBRepository = createInfluxDBRepository();
influxDBRepository.connect(); influxDBRepository.connect();
tryReconnection = true;
} else { } else {
logger.error("Cannot load configuration, persistence service wont work"); logger.error("Cannot load configuration, persistence service wont work");
tryReconnection = false;
} }
logger.debug("InfluxDB persistence service is now activated"); logger.debug("InfluxDB persistence service is now activated");
@ -131,6 +134,7 @@ public class InfluxDBPersistenceService implements QueryablePersistenceService {
@Deactivate @Deactivate
public void deactivate() { public void deactivate() {
logger.debug("InfluxDB persistence service deactivated"); logger.debug("InfluxDB persistence service deactivated");
tryReconnection = false;
if (influxDBRepository != null) { if (influxDBRepository != null) {
influxDBRepository.disconnect(); influxDBRepository.disconnect();
influxDBRepository = null; influxDBRepository = null;
@ -185,13 +189,13 @@ public class InfluxDBPersistenceService implements QueryablePersistenceService {
@Override @Override
public Set<PersistenceItemInfo> getItemInfo() { public Set<PersistenceItemInfo> getItemInfo() {
if (influxDBRepository != null && influxDBRepository.isConnected()) { if (checkConnection()) {
return influxDBRepository.getStoredItemsCount().entrySet().stream() return influxDBRepository.getStoredItemsCount().entrySet().stream()
.map(entry -> new InfluxDBPersistentItemInfo(entry.getKey(), entry.getValue())) .map(entry -> new InfluxDBPersistentItemInfo(entry.getKey(), entry.getValue()))
.collect(Collectors.toUnmodifiableSet()); .collect(Collectors.toUnmodifiableSet());
} else { } else {
logger.info("getItemInfo ignored, InfluxDB is not yet connected"); logger.info("getItemInfo ignored, InfluxDB is not yet connected");
return Collections.emptySet(); return Set.of();
} }
} }
@ -202,7 +206,7 @@ public class InfluxDBPersistenceService implements QueryablePersistenceService {
@Override @Override
public void store(Item item, @Nullable String alias) { public void store(Item item, @Nullable String alias) {
if (influxDBRepository != null && influxDBRepository.isConnected()) { if (checkConnection()) {
InfluxPoint point = itemToStorePointCreator.convert(item, alias); InfluxPoint point = itemToStorePointCreator.convert(item, alias);
if (point != null) { if (point != null) {
logger.trace("Storing item {} in InfluxDB point {}", item, point); logger.trace("Storing item {} in InfluxDB point {}", item, point);
@ -219,7 +223,7 @@ public class InfluxDBPersistenceService implements QueryablePersistenceService {
public Iterable<HistoricItem> query(FilterCriteria filter) { public Iterable<HistoricItem> query(FilterCriteria filter) {
logger.debug("Got a query for historic points!"); logger.debug("Got a query for historic points!");
if (influxDBRepository != null && influxDBRepository.isConnected()) { if (checkConnection()) {
logger.trace( logger.trace(
"Filter: itemname: {}, ordering: {}, state: {}, operator: {}, getBeginDate: {}, getEndDate: {}, getPageSize: {}, getPageNumber: {}", "Filter: itemname: {}, ordering: {}, state: {}, operator: {}, getBeginDate: {}, getEndDate: {}, getPageSize: {}, getPageNumber: {}",
filter.getItemName(), filter.getOrdering().toString(), filter.getState(), filter.getOperator(), filter.getItemName(), filter.getOrdering().toString(), filter.getState(), filter.getOperator(),
@ -232,7 +236,7 @@ public class InfluxDBPersistenceService implements QueryablePersistenceService {
return results.stream().map(this::mapRow2HistoricItem).collect(Collectors.toList()); return results.stream().map(this::mapRow2HistoricItem).collect(Collectors.toList());
} else { } else {
logger.debug("query ignored, InfluxDB is not yet connected"); logger.debug("query ignored, InfluxDB is not yet connected");
return Collections.emptyList(); return List.of();
} }
} }
@ -246,4 +250,22 @@ public class InfluxDBPersistenceService implements QueryablePersistenceService {
public List<PersistenceStrategy> getDefaultStrategies() { public List<PersistenceStrategy> getDefaultStrategies() {
return List.of(PersistenceStrategy.Globals.RESTORE, PersistenceStrategy.Globals.CHANGE); return List.of(PersistenceStrategy.Globals.RESTORE, PersistenceStrategy.Globals.CHANGE);
} }
/**
* check connection and try reconnect
*
* @return true if connected
*/
private boolean checkConnection() {
if (influxDBRepository == null) {
return false;
} else if (influxDBRepository.isConnected()) {
return true;
} else if (tryReconnection) {
logger.debug("Connection lost, trying re-connection");
influxDBRepository.connect();
return influxDBRepository.isConnected();
}
return false;
}
} }