diff --git a/bundles/org.openhab.persistence.rrd4j/src/main/java/org/openhab/persistence/rrd4j/internal/RRD4jPersistenceService.java b/bundles/org.openhab.persistence.rrd4j/src/main/java/org/openhab/persistence/rrd4j/internal/RRD4jPersistenceService.java index b0d309560..9e8ef7136 100644 --- a/bundles/org.openhab.persistence.rrd4j/src/main/java/org/openhab/persistence/rrd4j/internal/RRD4jPersistenceService.java +++ b/bundles/org.openhab.persistence.rrd4j/src/main/java/org/openhab/persistence/rrd4j/internal/RRD4jPersistenceService.java @@ -20,13 +20,13 @@ import java.time.ZoneId; import java.time.ZonedDateTime; import java.util.ArrayList; import java.util.Collections; -import java.util.HashMap; import java.util.Iterator; import java.util.List; import java.util.Locale; import java.util.Map; import java.util.Set; import java.util.concurrent.ConcurrentHashMap; +import java.util.concurrent.ConcurrentSkipListMap; import java.util.concurrent.Executors; import java.util.concurrent.RejectedExecutionException; import java.util.concurrent.ScheduledExecutorService; @@ -71,6 +71,7 @@ import org.openhab.core.types.State; import org.osgi.service.component.annotations.Activate; import org.osgi.service.component.annotations.Component; import org.osgi.service.component.annotations.ConfigurationPolicy; +import org.osgi.service.component.annotations.Deactivate; import org.osgi.service.component.annotations.Modified; import org.osgi.service.component.annotations.Reference; import org.rrd4j.ConsolFun; @@ -108,11 +109,13 @@ public class RRD4jPersistenceService implements QueryablePersistenceService { private static final Set SUPPORTED_TYPES = Set.of(CoreItemFactory.SWITCH, CoreItemFactory.CONTACT, CoreItemFactory.DIMMER, CoreItemFactory.NUMBER, CoreItemFactory.ROLLERSHUTTER, CoreItemFactory.COLOR); - private final ScheduledExecutorService scheduler = Executors.newScheduledThreadPool(3, + private final ScheduledExecutorService scheduler = Executors.newScheduledThreadPool(1, new NamedThreadFactory("RRD4j")); private final Map rrdDefs = new ConcurrentHashMap<>(); + private final ConcurrentSkipListMap> storageMap = new ConcurrentSkipListMap<>(); + private static final String DATASOURCE_STATE = "state"; private static final Path DB_FOLDER = Path.of(OpenHAB.getUserDataFolder(), "persistence", "rrd4j").toAbsolutePath(); @@ -120,10 +123,8 @@ public class RRD4jPersistenceService implements QueryablePersistenceService { private static final RrdDbPool DATABASE_POOL = new RrdDbPool(); private final Logger logger = LoggerFactory.getLogger(RRD4jPersistenceService.class); - - private final Map> scheduledJobs = new HashMap<>(); - private final ItemRegistry itemRegistry; + private boolean active = false; public static Path getDatabasePath(String name) { return DB_FOLDER.resolve(name + ".rrd"); @@ -133,9 +134,132 @@ public class RRD4jPersistenceService implements QueryablePersistenceService { return DATABASE_POOL; } + private final ScheduledFuture storeJob; + @Activate - public RRD4jPersistenceService(final @Reference ItemRegistry itemRegistry) { + public RRD4jPersistenceService(final @Reference ItemRegistry itemRegistry, Map config) { this.itemRegistry = itemRegistry; + storeJob = scheduler.scheduleWithFixedDelay(() -> doStore(false), 1, 1, TimeUnit.SECONDS); + modified(config); + active = true; + } + + @Modified + protected void modified(final Map config) { + // clean existing definitions + rrdDefs.clear(); + + // add default configurations + + RrdDefConfig defaultNumeric = new RrdDefConfig(DEFAULT_NUMERIC); + // use 10 seconds as a step size for numeric values and allow a 10 minute silence between updates + defaultNumeric.setDef("GAUGE,600,U,U,10"); + // define 5 different boxes: + // 1. granularity of 10s for the last hour + // 2. granularity of 1m for the last week + // 3. granularity of 15m for the last year + // 4. granularity of 1h for the last 5 years + // 5. granularity of 1d for the last 10 years + defaultNumeric + .addArchives("LAST,0.5,1,360:LAST,0.5,6,10080:LAST,0.5,90,36500:LAST,0.5,360,43800:LAST,0.5,8640,3650"); + rrdDefs.put(DEFAULT_NUMERIC, defaultNumeric); + + RrdDefConfig defaultQuantifiable = new RrdDefConfig(DEFAULT_QUANTIFIABLE); + // use 10 seconds as a step size for numeric values and allow a 10 minute silence between updates + defaultQuantifiable.setDef("GAUGE,600,U,U,10"); + // define 5 different boxes: + // 1. granularity of 10s for the last hour + // 2. granularity of 1m for the last week + // 3. granularity of 15m for the last year + // 4. granularity of 1h for the last 5 years + // 5. granularity of 1d for the last 10 years + defaultQuantifiable.addArchives( + "AVERAGE,0.5,1,360:AVERAGE,0.5,6,10080:AVERAGE,0.5,90,36500:AVERAGE,0.5,360,43800:AVERAGE,0.5,8640,3650"); + rrdDefs.put(DEFAULT_QUANTIFIABLE, defaultQuantifiable); + + RrdDefConfig defaultOther = new RrdDefConfig(DEFAULT_OTHER); + // use 5 seconds as a step size for discrete values and allow a 1h silence between updates + defaultOther.setDef("GAUGE,3600,U,U,5"); + // define 4 different boxes: + // 1. granularity of 5s for the last hour + // 2. granularity of 1m for the last week + // 3. granularity of 15m for the last year + // 4. granularity of 4h for the last 10 years + defaultOther.addArchives("LAST,0.5,1,720:LAST,0.5,12,10080:LAST,0.5,180,35040:LAST,0.5,2880,21900"); + rrdDefs.put(DEFAULT_OTHER, defaultOther); + + if (config.isEmpty()) { + logger.debug("using default configuration only"); + return; + } + + Iterator keys = config.keySet().iterator(); + while (keys.hasNext()) { + String key = keys.next(); + + if ("service.pid".equals(key) || "component.name".equals(key)) { + // ignore service.pid and name + continue; + } + + String[] subkeys = key.split("\\."); + if (subkeys.length != 2) { + logger.debug("config '{}' should have the format 'name.configkey'", key); + continue; + } + + Object v = config.get(key); + if (v instanceof String) { + String value = (String) v; + String name = subkeys[0].toLowerCase(); + String property = subkeys[1].toLowerCase(); + + if (value.isBlank()) { + logger.trace("Config is empty: {}", property); + continue; + } else { + logger.trace("Processing config: {} = {}", property, value); + } + + RrdDefConfig rrdDef = rrdDefs.get(name); + if (rrdDef == null) { + rrdDef = new RrdDefConfig(name); + rrdDefs.put(name, rrdDef); + } + + try { + if ("def".equals(property)) { + rrdDef.setDef(value); + } else if ("archives".equals(property)) { + rrdDef.addArchives(value); + } else if ("items".equals(property)) { + rrdDef.addItems(value); + } else { + logger.debug("Unknown property {} : {}", property, value); + } + } catch (IllegalArgumentException e) { + logger.warn("Ignoring illegal configuration: {}", e.getMessage()); + } + } + } + + for (RrdDefConfig rrdDef : rrdDefs.values()) { + if (rrdDef.isValid()) { + logger.debug("Created {}", rrdDef); + } else { + logger.info("Removing invalid definition {}", rrdDef); + rrdDefs.remove(rrdDef.name); + } + } + } + + @Deactivate + protected void deactivate() { + active = false; + storeJob.cancel(false); + + // make sure we really store everything + doStore(true); } @Override @@ -150,6 +274,11 @@ public class RRD4jPersistenceService implements QueryablePersistenceService { @Override public void store(final Item item, @Nullable final String alias) { + if (!active) { + logger.warn("Tried to store {} but service is not yet ready (or shutting down).", item); + return; + } + if (!isSupportedItemType(item)) { logger.trace("Ignoring item '{}' since its type {} is not supported", item.getName(), item.getType()); return; @@ -158,9 +287,7 @@ public class RRD4jPersistenceService implements QueryablePersistenceService { Double value; - if (item instanceof NumberItem && item.getState() instanceof QuantityType) { - NumberItem nItem = (NumberItem) item; - QuantityType qState = (QuantityType) item.getState(); + if (item instanceof NumberItem nItem && item.getState() instanceof QuantityType qState) { Unit> unit = nItem.getUnit(); if (unit != null) { QuantityType convertedState = qState.toUnit(unit); @@ -190,11 +317,30 @@ public class RRD4jPersistenceService implements QueryablePersistenceService { } long now = System.currentTimeMillis() / 1000; - - scheduler.schedule(() -> internalStore(name, value, now, true), 0, TimeUnit.SECONDS); + Double oldValue = storageMap.computeIfAbsent(now, t -> new ConcurrentHashMap<>()).put(name, value); + if (oldValue != null && !oldValue.equals(value)) { + logger.debug( + "Discarding value {} for item {} with timestamp {} because a new value ({}) arrived with the same timestamp.", + oldValue, name, now, value); + } } - private synchronized void internalStore(String name, double value, long now, boolean retry) { + private void doStore(boolean force) { + while (!storageMap.isEmpty()) { + long timestamp = storageMap.firstKey(); + long now = System.currentTimeMillis() / 1000; + if (now > timestamp || force) { + // no new elements can be added for this timestamp because we are already past that time or the service + // requires forced storing + Map values = storageMap.pollFirstEntry().getValue(); + values.forEach((name, value) -> writePointToDatabase(name, value, timestamp)); + } else { + return; + } + } + } + + private synchronized void writePointToDatabase(String name, double value, long timestamp) { RrdDb db = null; try { db = getDB(name, true); @@ -205,41 +351,31 @@ public class RRD4jPersistenceService implements QueryablePersistenceService { return; } - try { - if (now < db.getLastUpdateTime()) { - logger.warn("RRD4J does not support adding past value this={}, last update={}. Discarding {} - {}", now, - db.getLastUpdateTime(), name, value); - return; - } - } catch (IOException ignored) { - // we can ignore that here, we'll fail again later. - } - ConsolFun function = getConsolidationFunction(db); if (function != ConsolFun.AVERAGE) { try { // we store the last value again, so that the value change // in the database is not interpolated, but // happens right at this spot - if (now - 1 > db.getLastUpdateTime()) { + if (timestamp - 1 > db.getLastUpdateTime()) { // only do it if there is not already a value double lastValue = db.getLastDatasourceValue(DATASOURCE_STATE); if (!Double.isNaN(lastValue)) { Sample sample = db.createSample(); - sample.setTime(now - 1); + sample.setTime(timestamp - 1); sample.setValue(DATASOURCE_STATE, lastValue); sample.update(); logger.debug("Stored '{}' as value '{}' with timestamp {} in rrd4j database (again)", name, - lastValue, now - 1); + lastValue, timestamp - 1); } } } catch (IOException e) { - logger.debug("Error storing last value (again): {}", e.getMessage()); + logger.debug("Error storing last value (again) for {}: {}", e.getMessage(), name); } } try { Sample sample = db.createSample(); - sample.setTime(now); + sample.setTime(timestamp); double storeValue = value; if (db.getDatasource(DATASOURCE_STATE).getType() == DsType.COUNTER) { // counter values must be adjusted by stepsize @@ -247,20 +383,7 @@ public class RRD4jPersistenceService implements QueryablePersistenceService { } sample.setValue(DATASOURCE_STATE, storeValue); sample.update(); - logger.debug("Stored '{}' as value '{}' with timestamp {} in rrd4j database", name, storeValue, now); - } catch (IllegalArgumentException e) { - String message = e.getMessage(); - if (message != null && message.contains("at least one second step is required") && retry) { - // we try to store the value one second later - ScheduledFuture job = scheduledJobs.get(name); - if (job != null) { - job.cancel(true); - scheduledJobs.remove(name); - } - internalStore(name, value, now + 1, false); - } else { - logger.warn("Could not persist '{}' to rrd4j database: {}", name, e.getMessage()); - } + logger.debug("Stored '{}' as value '{}' with timestamp {} in rrd4j database", name, storeValue, timestamp); } catch (Exception e) { logger.warn("Could not persist '{}' to rrd4j database: {}", name, e.getMessage()); } @@ -524,120 +647,6 @@ public class RRD4jPersistenceService implements QueryablePersistenceService { } } - @Activate - protected void activate(final Map config) { - modified(config); - } - - @Modified - protected void modified(final Map config) { - // clean existing definitions - rrdDefs.clear(); - - // add default configurations - - RrdDefConfig defaultNumeric = new RrdDefConfig(DEFAULT_NUMERIC); - // use 10 seconds as a step size for numeric values and allow a 10 minute silence between updates - defaultNumeric.setDef("GAUGE,600,U,U,10"); - // define 5 different boxes: - // 1. granularity of 10s for the last hour - // 2. granularity of 1m for the last week - // 3. granularity of 15m for the last year - // 4. granularity of 1h for the last 5 years - // 5. granularity of 1d for the last 10 years - defaultNumeric - .addArchives("LAST,0.5,1,360:LAST,0.5,6,10080:LAST,0.5,90,36500:LAST,0.5,360,43800:LAST,0.5,8640,3650"); - rrdDefs.put(DEFAULT_NUMERIC, defaultNumeric); - - RrdDefConfig defaultQuantifiable = new RrdDefConfig(DEFAULT_QUANTIFIABLE); - // use 10 seconds as a step size for numeric values and allow a 10 minute silence between updates - defaultQuantifiable.setDef("GAUGE,600,U,U,10"); - // define 5 different boxes: - // 1. granularity of 10s for the last hour - // 2. granularity of 1m for the last week - // 3. granularity of 15m for the last year - // 4. granularity of 1h for the last 5 years - // 5. granularity of 1d for the last 10 years - defaultQuantifiable.addArchives( - "AVERAGE,0.5,1,360:AVERAGE,0.5,6,10080:AVERAGE,0.5,90,36500:AVERAGE,0.5,360,43800:AVERAGE,0.5,8640,3650"); - rrdDefs.put(DEFAULT_QUANTIFIABLE, defaultQuantifiable); - - RrdDefConfig defaultOther = new RrdDefConfig(DEFAULT_OTHER); - // use 5 seconds as a step size for discrete values and allow a 1h silence between updates - defaultOther.setDef("GAUGE,3600,U,U,5"); - // define 4 different boxes: - // 1. granularity of 5s for the last hour - // 2. granularity of 1m for the last week - // 3. granularity of 15m for the last year - // 4. granularity of 4h for the last 10 years - defaultOther.addArchives("LAST,0.5,1,720:LAST,0.5,12,10080:LAST,0.5,180,35040:LAST,0.5,2880,21900"); - rrdDefs.put(DEFAULT_OTHER, defaultOther); - - if (config.isEmpty()) { - logger.debug("using default configuration only"); - return; - } - - Iterator keys = config.keySet().iterator(); - while (keys.hasNext()) { - String key = keys.next(); - - if ("service.pid".equals(key) || "component.name".equals(key)) { - // ignore service.pid and name - continue; - } - - String[] subkeys = key.split("\\."); - if (subkeys.length != 2) { - logger.debug("config '{}' should have the format 'name.configkey'", key); - continue; - } - - Object v = config.get(key); - if (v instanceof String) { - String value = (String) v; - String name = subkeys[0].toLowerCase(); - String property = subkeys[1].toLowerCase(); - - if (value.isBlank()) { - logger.trace("Config is empty: {}", property); - continue; - } else { - logger.trace("Processing config: {} = {}", property, value); - } - - RrdDefConfig rrdDef = rrdDefs.get(name); - if (rrdDef == null) { - rrdDef = new RrdDefConfig(name); - rrdDefs.put(name, rrdDef); - } - - try { - if ("def".equals(property)) { - rrdDef.setDef(value); - } else if ("archives".equals(property)) { - rrdDef.addArchives(value); - } else if ("items".equals(property)) { - rrdDef.addItems(value); - } else { - logger.debug("Unknown property {} : {}", property, value); - } - } catch (IllegalArgumentException e) { - logger.warn("Ignoring illegal configuration: {}", e.getMessage()); - } - } - } - - for (RrdDefConfig rrdDef : rrdDefs.values()) { - if (rrdDef.isValid()) { - logger.debug("Created {}", rrdDef); - } else { - logger.info("Removing invalid definition {}", rrdDef); - rrdDefs.remove(rrdDef.name); - } - } - } - private static class RrdArchiveDef { public @Nullable ConsolFun fcn; public double xff;