[rrd4j] Write asynchronously to database (#14913)
* [rrd4j] Write asynchronously to database * add timestamps to log message --------- Signed-off-by: Jan N. Klug <github@klug.nrw>
This commit is contained in:
parent
a2dcf2fcef
commit
5fe99a18a9
|
@ -145,13 +145,52 @@ public class RRD4jPersistenceService implements QueryablePersistenceService {
|
||||||
}
|
}
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
public synchronized void store(final Item item, @Nullable final String alias) {
|
public void store(final Item item, @Nullable final String alias) {
|
||||||
if (!isSupportedItemType(item)) {
|
if (!isSupportedItemType(item)) {
|
||||||
logger.trace("Ignoring item '{}' since its type {} is not supported", item.getName(), item.getType());
|
logger.trace("Ignoring item '{}' since its type {} is not supported", item.getName(), item.getType());
|
||||||
return;
|
return;
|
||||||
}
|
}
|
||||||
final String name = alias == null ? item.getName() : alias;
|
final String name = alias == null ? item.getName() : alias;
|
||||||
|
|
||||||
|
Double value;
|
||||||
|
|
||||||
|
if (item instanceof NumberItem && item.getState() instanceof QuantityType) {
|
||||||
|
NumberItem nItem = (NumberItem) item;
|
||||||
|
QuantityType<?> qState = (QuantityType<?>) item.getState();
|
||||||
|
Unit<? extends Quantity<?>> unit = nItem.getUnit();
|
||||||
|
if (unit != null) {
|
||||||
|
QuantityType<?> convertedState = qState.toUnit(unit);
|
||||||
|
if (convertedState != null) {
|
||||||
|
value = convertedState.doubleValue();
|
||||||
|
} else {
|
||||||
|
value = null;
|
||||||
|
logger.warn(
|
||||||
|
"Failed to convert state '{}' to unit '{}'. Please check your item definition for correctness.",
|
||||||
|
qState, unit);
|
||||||
|
}
|
||||||
|
} else {
|
||||||
|
value = qState.doubleValue();
|
||||||
|
}
|
||||||
|
} else {
|
||||||
|
DecimalType state = item.getStateAs(DecimalType.class);
|
||||||
|
if (state != null) {
|
||||||
|
value = state.toBigDecimal().doubleValue();
|
||||||
|
} else {
|
||||||
|
value = null;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
if (value == null) {
|
||||||
|
// we could not convert the value
|
||||||
|
return;
|
||||||
|
}
|
||||||
|
|
||||||
|
long now = System.currentTimeMillis() / 1000;
|
||||||
|
|
||||||
|
scheduler.schedule(() -> internalStore(name, value, now, true), 0, TimeUnit.SECONDS);
|
||||||
|
}
|
||||||
|
|
||||||
|
private synchronized void internalStore(String name, double value, long now, boolean retry) {
|
||||||
RrdDb db = null;
|
RrdDb db = null;
|
||||||
try {
|
try {
|
||||||
db = getDB(name);
|
db = getDB(name);
|
||||||
|
@ -163,7 +202,6 @@ public class RRD4jPersistenceService implements QueryablePersistenceService {
|
||||||
}
|
}
|
||||||
|
|
||||||
ConsolFun function = getConsolidationFunction(db);
|
ConsolFun function = getConsolidationFunction(db);
|
||||||
long now = System.currentTimeMillis() / 1000;
|
|
||||||
if (function != ConsolFun.AVERAGE) {
|
if (function != ConsolFun.AVERAGE) {
|
||||||
try {
|
try {
|
||||||
// we store the last value again, so that the value change
|
// we store the last value again, so that the value change
|
||||||
|
@ -177,7 +215,8 @@ public class RRD4jPersistenceService implements QueryablePersistenceService {
|
||||||
sample.setTime(now - 1);
|
sample.setTime(now - 1);
|
||||||
sample.setValue(DATASOURCE_STATE, lastValue);
|
sample.setValue(DATASOURCE_STATE, lastValue);
|
||||||
sample.update();
|
sample.update();
|
||||||
logger.debug("Stored '{}' as value '{}' in rrd4j database (again)", name, lastValue);
|
logger.debug("Stored '{}' as value '{}' with timestamp {} in rrd4j database (again)", name,
|
||||||
|
lastValue, now - 1);
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
} catch (IOException e) {
|
} catch (IOException e) {
|
||||||
|
@ -187,50 +226,24 @@ public class RRD4jPersistenceService implements QueryablePersistenceService {
|
||||||
try {
|
try {
|
||||||
Sample sample = db.createSample();
|
Sample sample = db.createSample();
|
||||||
sample.setTime(now);
|
sample.setTime(now);
|
||||||
|
double storeValue = value;
|
||||||
Double value = null;
|
if (db.getDatasource(DATASOURCE_STATE).getType() == DsType.COUNTER) { // counter values must be
|
||||||
|
// adjusted by stepsize
|
||||||
if (item instanceof NumberItem && item.getState() instanceof QuantityType) {
|
storeValue = value * db.getRrdDef().getStep();
|
||||||
NumberItem nItem = (NumberItem) item;
|
|
||||||
QuantityType<?> qState = (QuantityType<?>) item.getState();
|
|
||||||
Unit<? extends Quantity<?>> unit = nItem.getUnit();
|
|
||||||
if (unit != null) {
|
|
||||||
QuantityType<?> convertedState = qState.toUnit(unit);
|
|
||||||
if (convertedState != null) {
|
|
||||||
value = convertedState.doubleValue();
|
|
||||||
} else {
|
|
||||||
logger.warn(
|
|
||||||
"Failed to convert state '{}' to unit '{}'. Please check your item definition for correctness.",
|
|
||||||
qState, unit);
|
|
||||||
}
|
|
||||||
} else {
|
|
||||||
value = qState.doubleValue();
|
|
||||||
}
|
|
||||||
} else {
|
|
||||||
DecimalType state = item.getStateAs(DecimalType.class);
|
|
||||||
if (state != null) {
|
|
||||||
value = state.toBigDecimal().doubleValue();
|
|
||||||
}
|
|
||||||
}
|
|
||||||
if (value != null) {
|
|
||||||
if (db.getDatasource(DATASOURCE_STATE).getType() == DsType.COUNTER) { // counter values must be
|
|
||||||
// adjusted by stepsize
|
|
||||||
value = value * db.getRrdDef().getStep();
|
|
||||||
}
|
|
||||||
sample.setValue(DATASOURCE_STATE, value);
|
|
||||||
sample.update();
|
|
||||||
logger.debug("Stored '{}' as value '{}' in rrd4j database", name, value);
|
|
||||||
}
|
}
|
||||||
|
sample.setValue(DATASOURCE_STATE, storeValue);
|
||||||
|
sample.update();
|
||||||
|
logger.debug("Stored '{}' as value '{}' with timestamp {} in rrd4j database", name, storeValue, now);
|
||||||
} catch (IllegalArgumentException e) {
|
} catch (IllegalArgumentException e) {
|
||||||
String message = e.getMessage();
|
String message = e.getMessage();
|
||||||
if (message != null && message.contains("at least one second step is required")) {
|
if (message != null && message.contains("at least one second step is required") && retry) {
|
||||||
// we try to store the value one second later
|
// we try to store the value one second later
|
||||||
ScheduledFuture<?> job = scheduledJobs.get(name);
|
ScheduledFuture<?> job = scheduledJobs.get(name);
|
||||||
if (job != null) {
|
if (job != null) {
|
||||||
job.cancel(true);
|
job.cancel(true);
|
||||||
scheduledJobs.remove(name);
|
scheduledJobs.remove(name);
|
||||||
}
|
}
|
||||||
job = scheduler.schedule(() -> store(item, name), 1, TimeUnit.SECONDS);
|
job = scheduler.schedule(() -> internalStore(name, value, now + 1, false), 1, TimeUnit.SECONDS);
|
||||||
scheduledJobs.put(name, job);
|
scheduledJobs.put(name, job);
|
||||||
} else {
|
} else {
|
||||||
logger.warn("Could not persist '{}' to rrd4j database: {}", name, e.getMessage());
|
logger.warn("Could not persist '{}' to rrd4j database: {}", name, e.getMessage());
|
||||||
|
|
Loading…
Reference in New Issue