Codebase as of c53e4aed26 as an initial commit for the shrunk repo

Signed-off-by: Kai Kreuzer <kai@openhab.org>
This commit is contained in:
Kai Kreuzer
2010-02-20 19:23:32 +01:00
committed by Kai Kreuzer
commit bbf1a7fd29
302 changed files with 29726 additions and 0 deletions

View File

@@ -0,0 +1,11 @@
<?xml version="1.0" encoding="UTF-8"?>
<features name="org.openhab.persistence.dynamodb-${project.version}" xmlns="http://karaf.apache.org/xmlns/features/v1.4.0">
<repository>mvn:org.openhab.core.features.karaf/org.openhab.core.features.karaf.openhab-core/${ohc.version}/xml/features</repository>
<feature name="openhab-persistence-dynamodb" description="DynamoDB Persistence" version="${project.version}">
<feature>openhab-runtime-base</feature>
<bundle start-level="80">mvn:org.openhab.addons.bundles/org.openhab.persistence.dynamodb/${project.version}</bundle>
<configfile finalname="${openhab.conf}/services/dynamodb.cfg" override="false">mvn:${project.groupId}/openhab-addons-external3/${project.version}/cfg/dynamodb</configfile>
</feature>
</features>

View File

@@ -0,0 +1,129 @@
/**
* Copyright (c) 2010-2020 Contributors to the openHAB project
*
* See the NOTICE file(s) distributed with this work for additional
* information.
*
* This program and the accompanying materials are made available under the
* terms of the Eclipse Public License 2.0 which is available at
* http://www.eclipse.org/legal/epl-2.0
*
* SPDX-License-Identifier: EPL-2.0
*/
package org.openhab.persistence.dynamodb.internal;
import java.time.Instant;
import java.time.ZoneId;
import java.time.ZonedDateTime;
import java.util.UUID;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.TimeUnit;
import org.eclipse.jdt.annotation.NonNullByDefault;
import org.eclipse.jdt.annotation.Nullable;
import org.openhab.core.items.Item;
import org.openhab.core.persistence.PersistenceService;
import org.openhab.core.types.State;
import org.openhab.core.types.UnDefType;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
/**
 * Abstract base class for persistence services that buffer data before writing it to the backing store.
 *
 * Subclasses provide the conversion from openHAB {@link State} to the store-specific representation
 * as well as the actual flushing logic.
 *
 * @param <T> Type of the state as accepted by the AWS SDK.
 *
 * @author Sami Salonen - Initial contribution
 * @author Kai Kreuzer - Migration to 3.x
 *
 */
@NonNullByDefault
public abstract class AbstractBufferedPersistenceService<T> implements PersistenceService {

    private static final long BUFFER_OFFER_TIMEOUT_MILLIS = 500;

    private final Logger logger = LoggerFactory.getLogger(AbstractBufferedPersistenceService.class);

    // Null until resetWithBufferSize has been called; may be replaced on reconfiguration.
    protected @Nullable BlockingQueue<T> buffer;

    // true when bufferSize == 0, i.e. every store() call flushes synchronously
    private boolean writeImmediately;

    /**
     * (Re-)initialize the internal buffer.
     *
     * @param bufferSize desired buffer capacity; 0 means "write immediately" (a capacity-1 queue is
     *            still allocated and used as a scratch area for the synchronous flush)
     */
    protected void resetWithBufferSize(int bufferSize) {
        int capacity = Math.max(1, bufferSize);
        buffer = new ArrayBlockingQueue<>(capacity, true);
        writeImmediately = bufferSize == 0;
    }

    /**
     * Convert the given state to the store-specific persistence item.
     */
    protected abstract T persistenceItemFromState(String name, State state, ZonedDateTime time);

    /**
     * @return whether the service is ready to accept data (e.g. connection established)
     */
    protected abstract boolean isReadyToStore();

    /**
     * Write all currently buffered data to the backing store.
     */
    protected abstract void flushBufferedData();

    @Override
    public void store(Item item) {
        store(item, null);
    }

    @Override
    public void store(Item item, @Nullable String alias) {
        long storeStart = System.currentTimeMillis();
        String uuid = UUID.randomUUID().toString();
        if (item.getState() instanceof UnDefType) {
            logger.debug("Undefined item state received. Not storing item {}.", item.getName());
            return;
        }
        if (!isReadyToStore()) {
            return;
        }
        if (buffer == null) {
            throw new IllegalStateException("Buffer not initialized with resetWithBufferSize. Bug?");
        }
        ZonedDateTime time = ZonedDateTime.ofInstant(Instant.ofEpochMilli(storeStart), ZoneId.systemDefault());
        String realName = item.getName();
        String name = (alias != null) ? alias : realName;
        State state = item.getState();
        T persistenceItem = persistenceItemFromState(name, state, time);
        logger.trace("store() called with item {}, which was converted to {} [{}]", item, persistenceItem, uuid);
        if (writeImmediately) {
            logger.debug("Writing immediately item {} [{}]", realName, uuid);
            // We want to write everything immediately
            // Synchronous behavior to ensure buffer does not get full.
            synchronized (this) {
                boolean buffered = addToBuffer(persistenceItem);
                if (!buffered) {
                    // Should not happen: the capacity-1 buffer is drained by the flush below.
                    // Previously checked with a bare assert, which is a no-op in production.
                    logger.warn("Failed to buffer item for immediate write. Discarding data [{}]", uuid);
                }
                flushBufferedData();
            }
        } else {
            long bufferStart = System.currentTimeMillis();
            boolean buffered = addToBuffer(persistenceItem);
            if (buffered) {
                logger.debug("Buffered item {} in {} ms. Total time for store(): {} [{}]", realName,
                        System.currentTimeMillis() - bufferStart, System.currentTimeMillis() - storeStart, uuid);
            } else {
                logger.debug(
                        "Buffer is full. Writing buffered data immediately and trying again. Consider increasing bufferSize");
                // Buffer is full, commit it immediately
                flushBufferedData();
                boolean buffered2 = addToBuffer(persistenceItem);
                if (buffered2) {
                    logger.debug("Buffered item in {} ms (2nd try, flushed buffer in-between) [{}]",
                            System.currentTimeMillis() - bufferStart, uuid);
                } else {
                    // The unlikely case happened -- buffer got full again immediately
                    logger.warn("Buffering failed for the second time -- Too small bufferSize? Discarding data [{}]",
                            uuid);
                }
            }
        }
    }

    /**
     * Offer the item to the buffer, waiting up to {@link #BUFFER_OFFER_TIMEOUT_MILLIS} milliseconds.
     *
     * @param persistenceItem item to buffer
     * @return true when the item was accepted by the buffer
     */
    protected boolean addToBuffer(T persistenceItem) {
        // Snapshot the field: it is @Nullable and may be replaced by resetWithBufferSize concurrently.
        BlockingQueue<T> localBuffer = buffer;
        if (localBuffer == null) {
            return false;
        }
        try {
            return localBuffer.offer(persistenceItem, BUFFER_OFFER_TIMEOUT_MILLIS, TimeUnit.MILLISECONDS);
        } catch (InterruptedException e) {
            // Restore the interrupt status so callers further up the stack can react to it.
            Thread.currentThread().interrupt();
            logger.warn("Interrupted when trying to buffer data! Dropping data");
            return false;
        }
    }
}

View File

@@ -0,0 +1,216 @@
/**
* Copyright (c) 2010-2020 Contributors to the openHAB project
*
* See the NOTICE file(s) distributed with this work for additional
* information.
*
* This program and the accompanying materials are made available under the
* terms of the Eclipse Public License 2.0 which is available at
* http://www.eclipse.org/legal/epl-2.0
*
* SPDX-License-Identifier: EPL-2.0
*/
package org.openhab.persistence.dynamodb.internal;
import java.math.BigDecimal;
import java.text.DateFormat;
import java.time.ZoneId;
import java.time.ZonedDateTime;
import java.time.format.DateTimeFormatter;
import java.time.format.DateTimeParseException;
import java.util.HashMap;
import java.util.Map;
import org.openhab.core.items.Item;
import org.openhab.core.library.items.CallItem;
import org.openhab.core.library.items.ColorItem;
import org.openhab.core.library.items.ContactItem;
import org.openhab.core.library.items.DateTimeItem;
import org.openhab.core.library.items.DimmerItem;
import org.openhab.core.library.items.LocationItem;
import org.openhab.core.library.items.NumberItem;
import org.openhab.core.library.items.PlayerItem;
import org.openhab.core.library.items.RollershutterItem;
import org.openhab.core.library.items.StringItem;
import org.openhab.core.library.items.SwitchItem;
import org.openhab.core.library.types.DateTimeType;
import org.openhab.core.library.types.DecimalType;
import org.openhab.core.library.types.HSBType;
import org.openhab.core.library.types.OnOffType;
import org.openhab.core.library.types.OpenClosedType;
import org.openhab.core.library.types.PercentType;
import org.openhab.core.library.types.PlayPauseType;
import org.openhab.core.library.types.PointType;
import org.openhab.core.library.types.RewindFastforwardType;
import org.openhab.core.library.types.StringListType;
import org.openhab.core.library.types.StringType;
import org.openhab.core.library.types.UpDownType;
import org.openhab.core.persistence.HistoricItem;
import org.openhab.core.types.State;
import org.openhab.core.types.UnDefType;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
/**
 * Base class for all DynamoDBItem. Represents openHAB Item serialized in a suitable format for the database
 *
 * @param <T> Type of the state as accepted by the AWS SDK.
 *
 * @author Sami Salonen - Initial contribution
 */
public abstract class AbstractDynamoDBItem<T> implements DynamoDBItem<T> {

    /** Formatter for serializing timestamps; pinned to UTC so stored values are unambiguous. */
    public static final DateTimeFormatter DATEFORMATTER = DateTimeFormatter.ofPattern(DATE_FORMAT)
            .withZone(ZoneId.of("UTC"));

    // Sentinel stored in the string table to represent UnDefType.UNDEF states
    private static final String UNDEFINED_PLACEHOLDER = "<org.openhab.core.types.UnDefType.UNDEF>";

    // Maps openHAB item classes to the DynamoDB item class (number vs. string serialization) used to store them
    private static final Map<Class<? extends Item>, Class<? extends DynamoDBItem<?>>> ITEM_CLASS_MAP = new HashMap<>();

    static {
        ITEM_CLASS_MAP.put(CallItem.class, DynamoDBStringItem.class);
        ITEM_CLASS_MAP.put(ContactItem.class, DynamoDBBigDecimalItem.class);
        ITEM_CLASS_MAP.put(DateTimeItem.class, DynamoDBStringItem.class);
        ITEM_CLASS_MAP.put(LocationItem.class, DynamoDBStringItem.class);
        ITEM_CLASS_MAP.put(NumberItem.class, DynamoDBBigDecimalItem.class);
        ITEM_CLASS_MAP.put(RollershutterItem.class, DynamoDBBigDecimalItem.class);
        ITEM_CLASS_MAP.put(StringItem.class, DynamoDBStringItem.class);
        ITEM_CLASS_MAP.put(SwitchItem.class, DynamoDBBigDecimalItem.class);
        ITEM_CLASS_MAP.put(DimmerItem.class, DynamoDBBigDecimalItem.class); // inherited from SwitchItem (!)
        ITEM_CLASS_MAP.put(ColorItem.class, DynamoDBStringItem.class); // inherited from DimmerItem
        ITEM_CLASS_MAP.put(PlayerItem.class, DynamoDBStringItem.class);
    }

    /**
     * Resolve the DynamoDB item class used to persist the given openHAB item class.
     *
     * @param itemClass openHAB item class
     * @return DynamoDB item class (number or string serialization)
     * @throws IllegalArgumentException when the item class is not supported.
     *             (The previous declaration of NullPointerException was wrong: the method
     *             throws IllegalArgumentException on unknown classes, never NPE.)
     */
    public static final Class<DynamoDBItem<?>> getDynamoItemClass(Class<? extends Item> itemClass)
            throws IllegalArgumentException {
        @SuppressWarnings("unchecked")
        Class<DynamoDBItem<?>> dtoclass = (Class<DynamoDBItem<?>>) ITEM_CLASS_MAP.get(itemClass);
        if (dtoclass == null) {
            throw new IllegalArgumentException(String.format("Unknown item class %s", itemClass));
        }
        return dtoclass;
    }

    private final Logger logger = LoggerFactory.getLogger(AbstractDynamoDBItem.class);

    protected String name;
    protected T state;
    protected ZonedDateTime time;

    public AbstractDynamoDBItem(String name, T state, ZonedDateTime time) {
        this.name = name;
        this.state = state;
        this.time = time;
    }

    /**
     * Convert an openHAB state to the matching DynamoDBItem (number or string serialization).
     *
     * @param name item name
     * @param state state to serialize
     * @param time timestamp of the value
     * @return DynamoDBItem representing the state
     */
    public static DynamoDBItem<?> fromState(String name, State state, ZonedDateTime time) {
        if (state instanceof DecimalType && !(state instanceof HSBType)) {
            // also covers PercentType which is inherited from DecimalType
            return new DynamoDBBigDecimalItem(name, ((DecimalType) state).toBigDecimal(), time);
        } else if (state instanceof OnOffType) {
            return new DynamoDBBigDecimalItem(name,
                    ((OnOffType) state) == OnOffType.ON ? BigDecimal.ONE : BigDecimal.ZERO, time);
        } else if (state instanceof OpenClosedType) {
            return new DynamoDBBigDecimalItem(name,
                    ((OpenClosedType) state) == OpenClosedType.OPEN ? BigDecimal.ONE : BigDecimal.ZERO, time);
        } else if (state instanceof UpDownType) {
            return new DynamoDBBigDecimalItem(name,
                    ((UpDownType) state) == UpDownType.UP ? BigDecimal.ONE : BigDecimal.ZERO, time);
        } else if (state instanceof DateTimeType) {
            return new DynamoDBStringItem(name, ((DateTimeType) state).getZonedDateTime().format(DATEFORMATTER), time);
        } else if (state instanceof UnDefType) {
            return new DynamoDBStringItem(name, UNDEFINED_PLACEHOLDER, time);
        } else if (state instanceof StringListType) {
            return new DynamoDBStringItem(name, state.toFullString(), time);
        } else {
            // HSBType, PointType, PlayPauseType and StringType
            return new DynamoDBStringItem(name, state.toFullString(), time);
        }
    }

    @Override
    public HistoricItem asHistoricItem(final Item item) {
        final State[] state = new State[1];
        accept(new DynamoDBItemVisitor() {
            @Override
            public void visit(DynamoDBStringItem dynamoStringItem) {
                if (item instanceof ColorItem) {
                    state[0] = new HSBType(dynamoStringItem.getState());
                } else if (item instanceof LocationItem) {
                    state[0] = new PointType(dynamoStringItem.getState());
                } else if (item instanceof PlayerItem) {
                    String value = dynamoStringItem.getState();
                    try {
                        state[0] = PlayPauseType.valueOf(value);
                    } catch (IllegalArgumentException e) {
                        state[0] = RewindFastforwardType.valueOf(value);
                    }
                } else if (item instanceof DateTimeItem) {
                    try {
                        // Parse ZoneDateTime from string. DATEFORMATTER assumes UTC in case it is not clear
                        // from the string (should be).
                        // We convert to default/local timezone for user convenience (e.g. display)
                        state[0] = new DateTimeType(ZonedDateTime.parse(dynamoStringItem.getState(), DATEFORMATTER)
                                .withZoneSameInstant(ZoneId.systemDefault()));
                    } catch (DateTimeParseException e) {
                        logger.warn("Failed to parse {} as date. Outputting UNDEF instead",
                                dynamoStringItem.getState());
                        state[0] = UnDefType.UNDEF;
                    }
                } else if (dynamoStringItem.getState().equals(UNDEFINED_PLACEHOLDER)) {
                    state[0] = UnDefType.UNDEF;
                } else if (item instanceof CallItem) {
                    // Stored as comma-separated "orig,dest". Pass all parts to StringListType so that
                    // malformed values without a comma do not raise ArrayIndexOutOfBoundsException
                    // (previously strings[1] was accessed unconditionally).
                    String[] strings = dynamoStringItem.getState().split(",");
                    state[0] = new StringListType(strings);
                } else {
                    state[0] = new StringType(dynamoStringItem.getState());
                }
            }

            @Override
            public void visit(DynamoDBBigDecimalItem dynamoBigDecimalItem) {
                if (item instanceof NumberItem) {
                    state[0] = new DecimalType(dynamoBigDecimalItem.getState());
                } else if (item instanceof DimmerItem) {
                    state[0] = new PercentType(dynamoBigDecimalItem.getState());
                } else if (item instanceof SwitchItem) {
                    state[0] = dynamoBigDecimalItem.getState().compareTo(BigDecimal.ONE) == 0 ? OnOffType.ON
                            : OnOffType.OFF;
                } else if (item instanceof ContactItem) {
                    state[0] = dynamoBigDecimalItem.getState().compareTo(BigDecimal.ONE) == 0 ? OpenClosedType.OPEN
                            : OpenClosedType.CLOSED;
                } else if (item instanceof RollershutterItem) {
                    state[0] = new PercentType(dynamoBigDecimalItem.getState());
                } else {
                    logger.warn("Not sure how to convert big decimal item {} to type {}. Using StringType as fallback",
                            dynamoBigDecimalItem.getName(), item.getClass());
                    state[0] = new StringType(dynamoBigDecimalItem.getState().toString());
                }
            }
        });
        return new DynamoDBHistoricItem(getName(), state[0], getTime());
    }

    /**
     * We define all getter and setters in the child class implement those. Having the getter
     * and setter implementations here in the parent class does not work with introspection done by AWS SDK (1.11.56).
     */
    /*
     * (non-Javadoc)
     *
     * @see org.openhab.persistence.dynamodb.internal.DynamoItem#accept(org.openhab.persistence.dynamodb.internal.
     * DynamoItemVisitor)
     */
    @Override
    public abstract void accept(DynamoDBItemVisitor visitor);

    @Override
    public String toString() {
        // java.text.DateFormat.format(Object) accepts only Date/Number and would throw
        // IllegalArgumentException for a ZonedDateTime; format with DATEFORMATTER instead.
        return DATEFORMATTER.format(time) + ": " + name + " -> " + state.toString();
    }
}

View File

@@ -0,0 +1,95 @@
/**
* Copyright (c) 2010-2020 Contributors to the openHAB project
*
* See the NOTICE file(s) distributed with this work for additional
* information.
*
* This program and the accompanying materials are made available under the
* terms of the Eclipse Public License 2.0 which is available at
* http://www.eclipse.org/legal/epl-2.0
*
* SPDX-License-Identifier: EPL-2.0
*/
package org.openhab.persistence.dynamodb.internal;
import java.math.BigDecimal;
import java.math.MathContext;
import java.time.ZonedDateTime;
import com.amazonaws.services.dynamodbv2.datamodeling.DynamoDBAttribute;
import com.amazonaws.services.dynamodbv2.datamodeling.DynamoDBDocument;
import com.amazonaws.services.dynamodbv2.datamodeling.DynamoDBHashKey;
import com.amazonaws.services.dynamodbv2.datamodeling.DynamoDBRangeKey;
/**
 * DynamoDBItem for items that can be serialized as DynamoDB number
 *
 * @author Sami Salonen - Initial contribution
 */
@DynamoDBDocument
public class DynamoDBBigDecimalItem extends AbstractDynamoDBItem<BigDecimal> {

    /**
     * DynamoDB rejects numbers with more than 38 significant digits:
     * "Attempting to store more than 38 significant digits in a Number"
     *
     * See "Data types" section in
     * http://docs.aws.amazon.com/amazondynamodb/latest/developerguide/Limits.html
     */
    private static final int MAX_DIGITS_SUPPORTED_BY_AMAZON = 38;

    /** No-argument constructor required by the AWS SDK data mapper. */
    public DynamoDBBigDecimalItem() {
        this(null, null, null);
    }

    public DynamoDBBigDecimalItem(String name, BigDecimal state, ZonedDateTime time) {
        super(name, state, time);
    }

    @Override
    @DynamoDBAttribute(attributeName = DynamoDBItem.ATTRIBUTE_NAME_ITEMSTATE)
    public BigDecimal getState() {
        // Round on the way out so the serialized value never exceeds DynamoDB's digit limit.
        return loseDigits(state);
    }

    @Override
    @DynamoDBHashKey(attributeName = DynamoDBItem.ATTRIBUTE_NAME_ITEMNAME)
    public String getName() {
        return name;
    }

    @Override
    @DynamoDBRangeKey(attributeName = ATTRIBUTE_NAME_TIMEUTC)
    public ZonedDateTime getTime() {
        return time;
    }

    @Override
    public void setName(String name) {
        this.name = name;
    }

    @Override
    public void setState(BigDecimal state) {
        this.state = state;
    }

    @Override
    public void setTime(ZonedDateTime time) {
        this.time = time;
    }

    @Override
    public void accept(DynamoDBItemVisitor visitor) {
        visitor.visit(this);
    }

    /**
     * Round the given number to at most {@value #MAX_DIGITS_SUPPORTED_BY_AMAZON} significant digits.
     *
     * @param number number to round; may be null
     * @return rounded number, or null when the input was null
     */
    static BigDecimal loseDigits(BigDecimal number) {
        return number == null ? null : number.round(new MathContext(MAX_DIGITS_SUPPORTED_BY_AMAZON));
    }
}

View File

@@ -0,0 +1,66 @@
/**
* Copyright (c) 2010-2020 Contributors to the openHAB project
*
* See the NOTICE file(s) distributed with this work for additional
* information.
*
* This program and the accompanying materials are made available under the
* terms of the Eclipse Public License 2.0 which is available at
* http://www.eclipse.org/legal/epl-2.0
*
* SPDX-License-Identifier: EPL-2.0
*/
package org.openhab.persistence.dynamodb.internal;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import com.amazonaws.auth.AWSCredentials;
import com.amazonaws.auth.AWSStaticCredentialsProvider;
import com.amazonaws.regions.Regions;
import com.amazonaws.services.dynamodbv2.AmazonDynamoDB;
import com.amazonaws.services.dynamodbv2.AmazonDynamoDBClientBuilder;
import com.amazonaws.services.dynamodbv2.document.DynamoDB;
/**
 * Shallow wrapper for Dynamo DB wrappers
 *
 * @author Sami Salonen - Initial contribution
 */
public class DynamoDBClient {

    private final Logger logger = LoggerFactory.getLogger(DynamoDBClient.class);

    // Both references are created once in the constructor and never replaced.
    private final DynamoDB dynamo;
    private final AmazonDynamoDB client;

    /**
     * @param credentials AWS credentials used to authenticate
     * @param region AWS region hosting the DynamoDB tables
     */
    public DynamoDBClient(AWSCredentials credentials, Regions region) {
        client = AmazonDynamoDBClientBuilder.standard().withRegion(region)
                .withCredentials(new AWSStaticCredentialsProvider(credentials)).build();
        dynamo = new DynamoDB(client);
    }

    public DynamoDBClient(DynamoDBConfig clientConfig) {
        this(clientConfig.getCredentials(), clientConfig.getRegion());
    }

    /** @return the low-level AWS SDK client */
    public AmazonDynamoDB getDynamoClient() {
        return client;
    }

    /** @return the document-API wrapper around the client */
    public DynamoDB getDynamoDB() {
        return dynamo;
    }

    /** Release all resources held by the underlying client. */
    public void shutdown() {
        dynamo.shutdown();
    }

    /**
     * Verify connectivity by issuing a cheap listTables(limit=1) request.
     *
     * @return true when the request succeeds, false otherwise
     */
    public boolean checkConnection() {
        try {
            dynamo.listTables(1).firstPage();
        } catch (Exception e) {
            // Broad catch on purpose: any failure (authentication, network, service error) means the
            // connection is unusable. Log the actual exception type rather than claiming it was an
            // "internal server error", which the previous message did for every failure.
            logger.warn("Got {} when trying to list tables: {}", e.getClass().getSimpleName(), e.getMessage());
            return false;
        }
        return true;
    }
}

View File

@@ -0,0 +1,195 @@
/**
* Copyright (c) 2010-2020 Contributors to the openHAB project
*
* See the NOTICE file(s) distributed with this work for additional
* information.
*
* This program and the accompanying materials are made available under the
* terms of the Eclipse Public License 2.0 which is available at
* http://www.eclipse.org/legal/epl-2.0
*
* SPDX-License-Identifier: EPL-2.0
*/
package org.openhab.persistence.dynamodb.internal;
import java.util.Arrays;
import java.util.Map;
import java.util.stream.Collectors;
import org.eclipse.jdt.annotation.NonNullByDefault;
import org.eclipse.jdt.annotation.Nullable;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import com.amazonaws.auth.AWSCredentials;
import com.amazonaws.auth.BasicAWSCredentials;
import com.amazonaws.auth.profile.ProfilesConfigFile;
import com.amazonaws.regions.Regions;
/**
 * Configuration for DynamoDB connections
 *
 * @author Sami Salonen - Initial contribution
 */
@NonNullByDefault
public class DynamoDBConfig {
    public static final String DEFAULT_TABLE_PREFIX = "openhab-";
    public static final boolean DEFAULT_CREATE_TABLE_ON_DEMAND = true;
    public static final long DEFAULT_READ_CAPACITY_UNITS = 1;
    public static final long DEFAULT_WRITE_CAPACITY_UNITS = 1;
    public static final long DEFAULT_BUFFER_COMMIT_INTERVAL_MILLIS = 1000;
    public static final int DEFAULT_BUFFER_SIZE = 1000;

    private static final Logger LOGGER = LoggerFactory.getLogger(DynamoDBConfig.class);

    private String tablePrefix = DEFAULT_TABLE_PREFIX;
    private Regions region;
    private AWSCredentials credentials;
    private boolean createTable = DEFAULT_CREATE_TABLE_ON_DEMAND;
    private long readCapacityUnits = DEFAULT_READ_CAPACITY_UNITS;
    private long writeCapacityUnits = DEFAULT_WRITE_CAPACITY_UNITS;
    private long bufferCommitIntervalMillis = DEFAULT_BUFFER_COMMIT_INTERVAL_MILLIS;
    private int bufferSize = DEFAULT_BUFFER_SIZE;

    /**
     *
     * @param config persistence service configuration
     * @return DynamoDB configuration. Returns null in case of configuration errors
     */
    public static @Nullable DynamoDBConfig fromConfig(Map<String, Object> config) {
        try {
            String regionName = (String) config.get("region");
            if (regionName == null) {
                return null;
            }
            final Regions region;
            try {
                region = Regions.fromName(regionName);
            } catch (IllegalArgumentException e) {
                LOGGER.error("Specify valid AWS region to use, got {}. Valid values include: {}", regionName, Arrays
                        .asList(Regions.values()).stream().map(r -> r.getName()).collect(Collectors.joining(",")));
                return null;
            }

            AWSCredentials credentials = resolveCredentials(config);
            if (credentials == null) {
                return null;
            }

            String table = nonBlankOrNull(config, "tablePrefix");
            if (table == null) {
                LOGGER.debug("Using default table name {}", DEFAULT_TABLE_PREFIX);
                table = DEFAULT_TABLE_PREFIX;
            }

            final boolean createTable;
            String createTableParam = nonBlankOrNull(config, "createTable");
            if (createTableParam == null) {
                LOGGER.debug("Creating table on demand: {}", DEFAULT_CREATE_TABLE_ON_DEMAND);
                createTable = DEFAULT_CREATE_TABLE_ON_DEMAND;
            } else {
                createTable = Boolean.parseBoolean(createTableParam);
            }

            final long readCapacityUnits;
            String readCapacityUnitsParam = nonBlankOrNull(config, "readCapacityUnits");
            if (readCapacityUnitsParam == null) {
                LOGGER.debug("Read capacity units: {}", DEFAULT_READ_CAPACITY_UNITS);
                readCapacityUnits = DEFAULT_READ_CAPACITY_UNITS;
            } else {
                readCapacityUnits = Long.parseLong(readCapacityUnitsParam);
            }

            final long writeCapacityUnits;
            String writeCapacityUnitsParam = nonBlankOrNull(config, "writeCapacityUnits");
            if (writeCapacityUnitsParam == null) {
                LOGGER.debug("Write capacity units: {}", DEFAULT_WRITE_CAPACITY_UNITS);
                writeCapacityUnits = DEFAULT_WRITE_CAPACITY_UNITS;
            } else {
                writeCapacityUnits = Long.parseLong(writeCapacityUnitsParam);
            }

            final long bufferCommitIntervalMillis;
            String bufferCommitIntervalMillisParam = nonBlankOrNull(config, "bufferCommitIntervalMillis");
            if (bufferCommitIntervalMillisParam == null) {
                LOGGER.debug("Buffer commit interval millis: {}", DEFAULT_BUFFER_COMMIT_INTERVAL_MILLIS);
                bufferCommitIntervalMillis = DEFAULT_BUFFER_COMMIT_INTERVAL_MILLIS;
            } else {
                bufferCommitIntervalMillis = Long.parseLong(bufferCommitIntervalMillisParam);
            }

            final int bufferSize;
            String bufferSizeParam = nonBlankOrNull(config, "bufferSize");
            if (bufferSizeParam == null) {
                LOGGER.debug("Buffer size: {}", DEFAULT_BUFFER_SIZE);
                bufferSize = DEFAULT_BUFFER_SIZE;
            } else {
                bufferSize = Integer.parseInt(bufferSizeParam);
            }

            return new DynamoDBConfig(region, credentials, table, createTable, readCapacityUnits, writeCapacityUnits,
                    bufferCommitIntervalMillis, bufferSize);
        } catch (Exception e) {
            // Deliberate catch-all: any malformed value (bad number, wrong type) disables the service
            // with a logged error instead of propagating to the framework.
            LOGGER.error("Error with configuration", e);
            return null;
        }
    }

    /**
     * Fetch a string-valued configuration parameter.
     *
     * @param config configuration map
     * @param key parameter key
     * @return the value, or null when missing or blank
     */
    private static @Nullable String nonBlankOrNull(Map<String, Object> config, String key) {
        String value = (String) config.get(key);
        return (value == null || value.isBlank()) ? null : value;
    }

    /**
     * Resolve AWS credentials either from explicit accessKey/secretKey parameters or from a
     * profiles configuration file.
     *
     * @param config configuration map
     * @return credentials, or null (with an error logged) when neither method is fully configured
     */
    private static @Nullable AWSCredentials resolveCredentials(Map<String, Object> config) {
        String accessKey = nonBlankOrNull(config, "accessKey");
        String secretKey = nonBlankOrNull(config, "secretKey");
        if (accessKey != null && secretKey != null) {
            LOGGER.debug("accessKey and secretKey specified. Using those.");
            return new BasicAWSCredentials(accessKey, secretKey);
        }
        LOGGER.debug("accessKey and/or secretKey blank. Checking profilesConfigFile and profile.");
        String profilesConfigFile = nonBlankOrNull(config, "profilesConfigFile");
        String profile = nonBlankOrNull(config, "profile");
        if (profilesConfigFile == null || profile == null) {
            LOGGER.error("Specify either 1) accessKey and secretKey; or 2) profilesConfigFile and "
                    + "profile for providing AWS credentials");
            return null;
        }
        return new ProfilesConfigFile(profilesConfigFile).getCredentials(profile);
    }

    public DynamoDBConfig(Regions region, AWSCredentials credentials, String table, boolean createTable,
            long readCapacityUnits, long writeCapacityUnits, long bufferCommitIntervalMillis, int bufferSize) {
        this.region = region;
        this.credentials = credentials;
        this.tablePrefix = table;
        this.createTable = createTable;
        this.readCapacityUnits = readCapacityUnits;
        this.writeCapacityUnits = writeCapacityUnits;
        this.bufferCommitIntervalMillis = bufferCommitIntervalMillis;
        this.bufferSize = bufferSize;
    }

    public AWSCredentials getCredentials() {
        return credentials;
    }

    public String getTablePrefix() {
        return tablePrefix;
    }

    public Regions getRegion() {
        return region;
    }

    public boolean isCreateTable() {
        return createTable;
    }

    public long getReadCapacityUnits() {
        return readCapacityUnits;
    }

    public long getWriteCapacityUnits() {
        return writeCapacityUnits;
    }

    public long getBufferCommitIntervalMillis() {
        return bufferCommitIntervalMillis;
    }

    public int getBufferSize() {
        return bufferSize;
    }
}

View File

@@ -0,0 +1,58 @@
/**
* Copyright (c) 2010-2020 Contributors to the openHAB project
*
* See the NOTICE file(s) distributed with this work for additional
* information.
*
* This program and the accompanying materials are made available under the
* terms of the Eclipse Public License 2.0 which is available at
* http://www.eclipse.org/legal/epl-2.0
*
* SPDX-License-Identifier: EPL-2.0
*/
package org.openhab.persistence.dynamodb.internal;
import java.text.DateFormat;
import java.time.ZonedDateTime;
import org.eclipse.jdt.annotation.NonNullByDefault;
import org.openhab.core.persistence.HistoricItem;
import org.openhab.core.types.State;
/**
 * This is a Java bean used to return historic items from Dynamodb.
 *
 * @author Sami Salonen - Initial contribution
 */
@NonNullByDefault
public class DynamoDBHistoricItem implements HistoricItem {

    private final String name;
    private final State state;
    private final ZonedDateTime timestamp;

    /**
     * @param name item name
     * @param state historic state value
     * @param timestamp time of the value
     */
    public DynamoDBHistoricItem(String name, State state, ZonedDateTime timestamp) {
        this.name = name;
        this.state = state;
        this.timestamp = timestamp;
    }

    @Override
    public String getName() {
        return name;
    }

    @Override
    public ZonedDateTime getTimestamp() {
        return timestamp;
    }

    @Override
    public State getState() {
        return state;
    }

    @Override
    public String toString() {
        // java.text.DateFormat.format(Object) accepts only Date/Number and throws
        // IllegalArgumentException for a ZonedDateTime, so the previous implementation failed at
        // runtime. Rely on ZonedDateTime's ISO-8601 toString() instead.
        return timestamp + ": " + name + " -> " + state;
    }
}

View File

@@ -0,0 +1,58 @@
/**
* Copyright (c) 2010-2020 Contributors to the openHAB project
*
* See the NOTICE file(s) distributed with this work for additional
* information.
*
* This program and the accompanying materials are made available under the
* terms of the Eclipse Public License 2.0 which is available at
* http://www.eclipse.org/legal/epl-2.0
*
* SPDX-License-Identifier: EPL-2.0
*/
package org.openhab.persistence.dynamodb.internal;
import java.time.ZonedDateTime;
import org.openhab.core.items.Item;
import org.openhab.core.persistence.HistoricItem;
/**
 * Represents openHAB Item serialized in a suitable format for the database
 *
 * @param <T> Type of the state as accepted by the AWS SDK.
 *
 * @author Sami Salonen - Initial contribution
 */
public interface DynamoDBItem<T> {

    /** Pattern used to serialize timestamps; the literal 'Z' suffix marks stored values as UTC. */
    static final String DATE_FORMAT = "yyyy-MM-dd'T'HH:mm:ss.SSS'Z'";

    /** DynamoDB attribute name holding the item timestamp (UTC). */
    static final String ATTRIBUTE_NAME_TIMEUTC = "timeutc";

    /** DynamoDB attribute name holding the item name. */
    static final String ATTRIBUTE_NAME_ITEMNAME = "itemname";

    /** DynamoDB attribute name holding the serialized item state. */
    static final String ATTRIBUTE_NAME_ITEMSTATE = "itemstate";

    /**
     * Convert this AbstractDynamoItem as HistoricItem.
     *
     * @param item Item representing this item. Used to determine item type.
     * @return HistoricItem representing this DynamoDBItem.
     */
    HistoricItem asHistoricItem(Item item);

    /** @return name of the item */
    String getName();

    /** @return state in the database-specific representation */
    T getState();

    /** @return timestamp of the value */
    ZonedDateTime getTime();

    void setName(String name);

    void setState(T state);

    void setTime(ZonedDateTime time);

    /**
     * Visitor-pattern dispatch; implementations call back the matching
     * {@code DynamoDBItemVisitor.visit} overload for their concrete type.
     */
    void accept(DynamoDBItemVisitor visitor);
}

View File

@@ -0,0 +1,29 @@
/**
* Copyright (c) 2010-2020 Contributors to the openHAB project
*
* See the NOTICE file(s) distributed with this work for additional
* information.
*
* This program and the accompanying materials are made available under the
* terms of the Eclipse Public License 2.0 which is available at
* http://www.eclipse.org/legal/epl-2.0
*
* SPDX-License-Identifier: EPL-2.0
*/
package org.openhab.persistence.dynamodb.internal;
import org.eclipse.jdt.annotation.NonNullByDefault;
/**
 * Visitor for DynamoDBItem
 *
 * @author Sami Salonen - Initial contribution
 *
 */
@NonNullByDefault
public interface DynamoDBItemVisitor {

    /** Visit an item whose state is serialized as a DynamoDB number. */
    public void visit(DynamoDBBigDecimalItem dynamoBigDecimalItem);

    /** Visit an item whose state is serialized as a DynamoDB string. */
    public void visit(DynamoDBStringItem dynamoStringItem);
}

View File

@@ -0,0 +1,569 @@
/**
* Copyright (c) 2010-2020 Contributors to the openHAB project
*
* See the NOTICE file(s) distributed with this work for additional
* information.
*
* This program and the accompanying materials are made available under the
* terms of the Eclipse Public License 2.0 which is available at
* http://www.eclipse.org/legal/epl-2.0
*
* SPDX-License-Identifier: EPL-2.0
*/
package org.openhab.persistence.dynamodb.internal;
import java.time.ZonedDateTime;
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.Collections;
import java.util.Deque;
import java.util.HashMap;
import java.util.List;
import java.util.Locale;
import java.util.Map;
import java.util.Map.Entry;
import java.util.Set;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.ScheduledFuture;
import java.util.concurrent.TimeUnit;
import java.util.function.Function;
import org.eclipse.jdt.annotation.NonNullByDefault;
import org.eclipse.jdt.annotation.Nullable;
import org.openhab.core.common.NamedThreadFactory;
import org.openhab.core.config.core.ConfigurableService;
import org.openhab.core.items.Item;
import org.openhab.core.items.ItemNotFoundException;
import org.openhab.core.items.ItemRegistry;
import org.openhab.core.persistence.FilterCriteria;
import org.openhab.core.persistence.HistoricItem;
import org.openhab.core.persistence.PersistenceItemInfo;
import org.openhab.core.persistence.PersistenceService;
import org.openhab.core.persistence.QueryablePersistenceService;
import org.openhab.core.persistence.strategy.PersistenceStrategy;
import org.openhab.core.types.State;
import org.osgi.framework.BundleContext;
import org.osgi.framework.Constants;
import org.osgi.service.component.annotations.Activate;
import org.osgi.service.component.annotations.Component;
import org.osgi.service.component.annotations.Deactivate;
import org.osgi.service.component.annotations.Reference;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import com.amazonaws.AmazonClientException;
import com.amazonaws.AmazonServiceException;
import com.amazonaws.services.dynamodbv2.datamodeling.DynamoDBMapper;
import com.amazonaws.services.dynamodbv2.datamodeling.DynamoDBMapper.FailedBatch;
import com.amazonaws.services.dynamodbv2.datamodeling.DynamoDBMapperConfig;
import com.amazonaws.services.dynamodbv2.datamodeling.DynamoDBMapperConfig.PaginationLoadingStrategy;
import com.amazonaws.services.dynamodbv2.datamodeling.DynamoDBQueryExpression;
import com.amazonaws.services.dynamodbv2.datamodeling.PaginatedQueryList;
import com.amazonaws.services.dynamodbv2.document.BatchWriteItemOutcome;
import com.amazonaws.services.dynamodbv2.model.CreateTableRequest;
import com.amazonaws.services.dynamodbv2.model.GlobalSecondaryIndex;
import com.amazonaws.services.dynamodbv2.model.ProvisionedThroughput;
import com.amazonaws.services.dynamodbv2.model.ResourceNotFoundException;
import com.amazonaws.services.dynamodbv2.model.TableDescription;
import com.amazonaws.services.dynamodbv2.model.TableStatus;
import com.amazonaws.services.dynamodbv2.model.WriteRequest;
/**
* This is the implementation of the DynamoDB {@link PersistenceService}. It persists item values
* using the <a href="https://aws.amazon.com/dynamodb/">Amazon DynamoDB</a> database. The states (
* {@link State}) of an {@link Item} are persisted in DynamoDB tables.
*
* The service creates tables automatically, one for numbers, and one for strings.
*
* @see AbstractDynamoDBItem.fromState for details how different items are persisted
*
* @author Sami Salonen - Initial contribution
* @author Kai Kreuzer - Migration to 3.x
*
*/
@NonNullByDefault
@Component(service = { PersistenceService.class,
        QueryablePersistenceService.class }, configurationPid = "org.openhab.dynamodb", //
        property = Constants.SERVICE_PID + "=org.openhab.dynamodb")
@ConfigurableService(category = "persistence", label = "DynamoDB Persistence Service", description_uri = DynamoDBPersistenceService.CONFIG_URI)
public class DynamoDBPersistenceService extends AbstractBufferedPersistenceService<DynamoDBItem<?>>
        implements QueryablePersistenceService {

    protected static final String CONFIG_URI = "persistence:dynamodb";

    /**
     * Retries writing of unprocessed DynamoDB write requests with a back-off schedule defined by
     * {@link #WAIT_MILLIS_IN_RETRIES}. Although this implements {@link Runnable}, callers invoke
     * {@code run()} directly, i.e. the retries happen synchronously in the calling thread.
     */
    private class ExponentialBackoffRetry implements Runnable {
        private int retry;
        private Map<String, List<WriteRequest>> unprocessedItems;
        private @Nullable Exception lastException;

        public ExponentialBackoffRetry(Map<String, List<WriteRequest>> unprocessedItems) {
            this.unprocessedItems = unprocessedItems;
        }

        @Override
        public void run() {
            logger.debug("Error storing object to dynamo, unprocessed items: {}. Retrying with exponential back-off",
                    unprocessedItems);
            lastException = null;
            while (!unprocessedItems.isEmpty() && retry < WAIT_MILLIS_IN_RETRIES.length) {
                if (!sleep()) {
                    // Interrupted
                    return;
                }
                retry++;
                try {
                    // Capture the nullable field locally: resetClient() may null it concurrently
                    DynamoDBClient localDb = DynamoDBPersistenceService.this.db;
                    if (localDb == null) {
                        logger.warn("DynamoDB client is no longer available. Aborting retry of unprocessed items");
                        return;
                    }
                    BatchWriteItemOutcome outcome = localDb.getDynamoDB().batchWriteItemUnprocessed(unprocessedItems);
                    unprocessedItems = outcome.getUnprocessedItems();
                    lastException = null;
                } catch (AmazonServiceException e) {
                    if (e instanceof ResourceNotFoundException) {
                        logger.debug(
                                "DynamoDB write raised unexpected exception: {}. This might happen if table was recently created",
                                e.getMessage());
                    } else {
                        logger.debug("DynamoDB write raised unexpected exception: {}.", e.getMessage());
                    }
                    lastException = e;
                    continue;
                }
            }
            if (unprocessedItems.isEmpty()) {
                logger.debug("After {} retries successfully wrote all unprocessed items", retry);
            } else {
                logger.warn(
                        "Even after retries failed to write some items. Last exception: {} {}, unprocessed items: {}",
                        lastException == null ? "null" : lastException.getClass().getName(),
                        lastException == null ? "null" : lastException.getMessage(), unprocessedItems);
            }
        }

        /**
         * Sleep according to the back-off schedule.
         *
         * @return true when the sleep completed, false when the thread was interrupted
         */
        private boolean sleep() {
            try {
                long sleepTime;
                // On the first retry after a missing-table error wait longer: freshly created
                // tables take a while to become usable.
                if (retry == 1 && lastException instanceof ResourceNotFoundException) {
                    sleepTime = WAIT_ON_FIRST_RESOURCE_NOT_FOUND_MILLIS;
                } else {
                    sleepTime = WAIT_MILLIS_IN_RETRIES[retry];
                }
                Thread.sleep(sleepTime);
                return true;
            } catch (InterruptedException e) {
                logger.debug("Interrupted while writing data!");
                // Restore the interrupt status so callers/executors can see the interruption
                Thread.currentThread().interrupt();
                return false;
            }
        }

        /**
         * @return items that remained unprocessed after all retries
         */
        public Map<String, List<WriteRequest>> getUnprocessedItems() {
            return unprocessedItems;
        }
    }

    /** Extra wait (ms) on first retry when the table did not exist yet. */
    private static final int WAIT_ON_FIRST_RESOURCE_NOT_FOUND_MILLIS = 5000;
    /** Back-off schedule (ms) for batch-write retries; also bounds the retry count. */
    private static final int[] WAIT_MILLIS_IN_RETRIES = new int[] { 100, 100, 200, 300, 500 };
    private static final String DYNAMODB_THREADPOOL_NAME = "dynamodbPersistenceService";

    private final ItemRegistry itemRegistry;
    private @Nullable DynamoDBClient db;
    private final Logger logger = LoggerFactory.getLogger(DynamoDBPersistenceService.class);
    private boolean isProperlyConfigured;
    private @NonNullByDefault({}) DynamoDBConfig dbConfig;
    private @NonNullByDefault({}) DynamoDBTableNameResolver tableNameResolver;
    private final ScheduledExecutorService scheduler = Executors.newScheduledThreadPool(1,
            new NamedThreadFactory(DYNAMODB_THREADPOOL_NAME));
    private @Nullable ScheduledFuture<?> writeBufferedDataFuture;

    @Activate
    public DynamoDBPersistenceService(final @Reference ItemRegistry itemRegistry) {
        this.itemRegistry = itemRegistry;
    }

    /**
     * For testing. Allows access to underlying DynamoDBClient.
     *
     * @return DynamoDBClient connected to AWS Dynamo DB.
     */
    @Nullable
    DynamoDBClient getDb() {
        return db;
    }

    /**
     * Activates the service: parses configuration, creates the DynamoDB client and (optionally)
     * schedules periodic flushing of the write buffer.
     *
     * @param bundleContext OSGi bundle context (unused)
     * @param config service configuration properties
     */
    @Activate
    public void activate(final @Nullable BundleContext bundleContext, final Map<String, Object> config) {
        resetClient();
        dbConfig = DynamoDBConfig.fromConfig(config);
        if (dbConfig == null) {
            // Configuration was invalid. Abort service activation.
            // Error is already logged in fromConfig.
            return;
        }
        tableNameResolver = new DynamoDBTableNameResolver(dbConfig.getTablePrefix());
        try {
            if (!ensureClient()) {
                logger.error("Error creating dynamodb database client. Aborting service activation.");
                return;
            }
        } catch (Exception e) {
            logger.error("Error constructing dynamodb client", e);
            return;
        }

        writeBufferedDataFuture = null;
        resetWithBufferSize(dbConfig.getBufferSize());
        long commitIntervalMillis = dbConfig.getBufferCommitIntervalMillis();
        if (commitIntervalMillis > 0) {
            writeBufferedDataFuture = scheduler.scheduleWithFixedDelay(() -> {
                try {
                    DynamoDBPersistenceService.this.flushBufferedData();
                } catch (RuntimeException e) {
                    // We want to catch all unexpected exceptions since all unhandled exceptions make
                    // ScheduledExecutorService halt the regular running of the task.
                    // It is better to print out the exception, and try again
                    // (on next cycle)
                    logger.warn(
                            "Execution of scheduled flushing of buffered data failed unexpectedly. Ignoring exception, trying again according to configured commit interval of {} ms.",
                            commitIntervalMillis, e);
                }
            }, 0, commitIntervalMillis, TimeUnit.MILLISECONDS);
        }
        isProperlyConfigured = true;
        logger.debug("dynamodb persistence service activated");
    }

    /**
     * Deactivates the service: cancels the periodic flush task and shuts down the client.
     */
    @Deactivate
    public void deactivate() {
        logger.debug("dynamodb persistence service deactivated");
        if (writeBufferedDataFuture != null) {
            writeBufferedDataFuture.cancel(false);
            writeBufferedDataFuture = null;
        }
        resetClient();
    }

    /**
     * Initializes DynamoDBClient (db field)
     *
     * If DynamoDBClient constructor throws an exception, error is logged and false is returned.
     *
     * @return whether initialization was successful.
     */
    private boolean ensureClient() {
        if (db == null) {
            try {
                db = new DynamoDBClient(dbConfig);
            } catch (Exception e) {
                logger.error("Error constructing dynamodb client", e);
                return false;
            }
        }
        return true;
    }

    @Override
    public DynamoDBItem<?> persistenceItemFromState(String name, State state, ZonedDateTime time) {
        return AbstractDynamoDBItem.fromState(name, state, time);
    }

    /**
     * Create table (if not present) and wait for table to become active.
     *
     * Synchronized in order to ensure that at most single thread is creating the table at a time
     *
     * @param mapper
     * @param dtoClass
     * @return whether table creation succeeded.
     */
    private synchronized boolean createTable(DynamoDBMapper mapper, Class<?> dtoClass) {
        DynamoDBClient localDb = db;
        if (localDb == null) {
            return false;
        }
        String tableName;
        try {
            ProvisionedThroughput provisionedThroughput = new ProvisionedThroughput(dbConfig.getReadCapacityUnits(),
                    dbConfig.getWriteCapacityUnits());
            CreateTableRequest request = mapper.generateCreateTableRequest(dtoClass);
            request.setProvisionedThroughput(provisionedThroughput);
            // Provisioned throughput has to be set explicitly on all global secondary indices too
            if (request.getGlobalSecondaryIndexes() != null) {
                for (GlobalSecondaryIndex index : request.getGlobalSecondaryIndexes()) {
                    index.setProvisionedThroughput(provisionedThroughput);
                }
            }
            tableName = request.getTableName();
            try {
                localDb.getDynamoClient().describeTable(tableName);
            } catch (ResourceNotFoundException e) {
                // No table present, continue with creation
                localDb.getDynamoClient().createTable(request);
            } catch (AmazonClientException e) {
                logger.error("Table creation failed due to error in describeTable operation", e);
                return false;
            }
            // table found or just created, wait
            return waitForTableToBecomeActive(tableName);
        } catch (AmazonClientException e) {
            logger.error("Exception when creating table", e);
            return false;
        }
    }

    /**
     * Blocks until the given table reaches ACTIVE status (or the wait fails).
     *
     * @param tableName name of the table to wait for
     * @return true when the table is ACTIVE, false on any failure or interruption
     */
    private boolean waitForTableToBecomeActive(String tableName) {
        DynamoDBClient localDb = db;
        if (localDb == null) {
            // Client was reset (e.g. service deactivated) in the meantime
            return false;
        }
        try {
            logger.debug("Checking if table '{}' is created...", tableName);
            final TableDescription tableDescription;
            try {
                tableDescription = localDb.getDynamoDB().getTable(tableName).waitForActive();
            } catch (IllegalArgumentException e) {
                logger.warn("Table '{}' is being deleted: {} {}", tableName, e.getClass().getSimpleName(),
                        e.getMessage());
                return false;
            } catch (ResourceNotFoundException e) {
                logger.warn("Table '{}' was deleted unexpectedly: {} {}", tableName, e.getClass().getSimpleName(),
                        e.getMessage());
                return false;
            }
            boolean success = TableStatus.ACTIVE.equals(TableStatus.fromValue(tableDescription.getTableStatus()));
            if (success) {
                logger.debug("Creation of table '{}' successful, table status is now {}", tableName,
                        tableDescription.getTableStatus());
            } else {
                logger.warn("Creation of table '{}' unsuccessful, table status is now {}", tableName,
                        tableDescription.getTableStatus());
            }
            return success;
        } catch (AmazonClientException e) {
            logger.error("Exception when checking table status (describe): {}", e.getMessage());
            return false;
        } catch (InterruptedException e) {
            logger.error("Interrupted while trying to check table status: {}", e.getMessage());
            // Restore the interrupt status for the caller
            Thread.currentThread().interrupt();
            return false;
        }
    }

    /**
     * Shuts down the client (if any) and clears all configuration-derived state.
     */
    private void resetClient() {
        if (db == null) {
            return;
        }
        db.shutdown();
        db = null;
        dbConfig = null;
        tableNameResolver = null;
        isProperlyConfigured = false;
    }

    /**
     * Creates a DynamoDBMapper targeting the given table with lazy-loading pagination.
     *
     * @param tableName table the mapper should read/write
     * @return mapper bound to the table
     * @throws IllegalStateException when the client has not been initialized
     */
    private DynamoDBMapper getDBMapper(String tableName) {
        DynamoDBClient localDb = db;
        if (localDb == null) {
            // ensureClient() should have been called by all callers; fail fast with a clear message
            throw new IllegalStateException("DynamoDB client not initialized");
        }
        try {
            DynamoDBMapperConfig mapperConfig = new DynamoDBMapperConfig.Builder()
                    .withTableNameOverride(new DynamoDBMapperConfig.TableNameOverride(tableName))
                    .withPaginationLoadingStrategy(PaginationLoadingStrategy.LAZY_LOADING).build();
            return new DynamoDBMapper(localDb.getDynamoClient(), mapperConfig);
        } catch (AmazonClientException e) {
            logger.error("Error getting db mapper: {}", e.getMessage());
            throw e;
        }
    }

    @Override
    protected boolean isReadyToStore() {
        return isProperlyConfigured && ensureClient();
    }

    @Override
    public String getId() {
        return "dynamodb";
    }

    @Override
    public String getLabel(@Nullable Locale locale) {
        return "DynamoDB";
    }

    @Override
    public Set<PersistenceItemInfo> getItemInfo() {
        return Collections.emptySet();
    }

    /**
     * Drains the write buffer completely, writing its contents table by table.
     * Loops until the buffer is empty so that items added concurrently are also flushed.
     */
    @Override
    protected void flushBufferedData() {
        // Note: buffer is inherited from AbstractBufferedPersistenceService and may be null
        if (buffer == null || buffer.isEmpty()) {
            return;
        }
        logger.debug("Writing buffered data. Buffer size: {}", buffer.size());

        for (;;) {
            Map<String, Deque<DynamoDBItem<?>>> itemsByTable = readBuffer();
            // Write batch of data, one table at a time
            for (Entry<String, Deque<DynamoDBItem<?>>> entry : itemsByTable.entrySet()) {
                String tableName = entry.getKey();
                Deque<DynamoDBItem<?>> batch = entry.getValue();
                if (!batch.isEmpty()) {
                    flushBatch(getDBMapper(tableName), batch);
                }
            }
            if (buffer == null || buffer.isEmpty()) {
                break;
            }
        }
    }

    /**
     * Drains the current buffer contents and groups them by target table name.
     *
     * @return buffered items keyed by table name
     */
    private Map<String, Deque<DynamoDBItem<?>>> readBuffer() {
        Map<String, Deque<DynamoDBItem<?>>> batchesByTable = new HashMap<>(2);
        // Get batch of data
        while (!buffer.isEmpty()) {
            DynamoDBItem<?> dynamoItem = buffer.poll();
            if (dynamoItem == null) {
                break;
            }
            String tableName = tableNameResolver.fromItem(dynamoItem);
            Deque<DynamoDBItem<?>> batch = batchesByTable.computeIfAbsent(tableName, t -> new ArrayDeque<>());
            batch.add(dynamoItem);
        }
        return batchesByTable;
    }

    /**
     * Flush batch of data to DynamoDB
     *
     * @param mapper mapper associated with the batch
     * @param batch batch of data to write to DynamoDB
     */
    private void flushBatch(DynamoDBMapper mapper, Deque<DynamoDBItem<?>> batch) {
        long currentTimeMillis = System.currentTimeMillis();
        List<FailedBatch> failed = mapper.batchSave(batch);
        for (FailedBatch failedBatch : failed) {
            if (failedBatch.getException() instanceof ResourceNotFoundException) {
                // Table did not exist. Try again after creating table
                retryFlushAfterCreatingTable(mapper, batch, failedBatch);
            } else {
                logger.debug("Batch failed with {}. Retrying next with exponential back-off",
                        failedBatch.getException().getMessage());
                // run() is executed synchronously in this thread on purpose
                new ExponentialBackoffRetry(failedBatch.getUnprocessedItems()).run();
            }
        }
        if (failed.isEmpty()) {
            logger.debug("flushBatch ended with {} items in {} ms: {}", batch.size(),
                    System.currentTimeMillis() - currentTimeMillis, batch);
        } else {
            logger.warn(
                    "flushBatch ended with {} items in {} ms: {}. There were some failed batches that were retried -- check logs for ERRORs to see if writes were successful",
                    batch.size(), System.currentTimeMillis() - currentTimeMillis, batch);
        }
    }

    /**
     * Retry flushing data after creating table associated with mapper
     *
     * @param mapper mapper associated with the batch
     * @param batch original batch of data. Used for logging and to determine table name
     * @param failedBatch failed batch that should be retried
     */
    private void retryFlushAfterCreatingTable(DynamoDBMapper mapper, Deque<DynamoDBItem<?>> batch,
            FailedBatch failedBatch) {
        DynamoDBItem<?> firstItem = batch.peek();
        if (firstItem == null) {
            // Callers always pass a non-empty batch; guard against NPE nevertheless
            return;
        }
        logger.debug("Table was not found. Trying to create table and try saving again");
        if (createTable(mapper, firstItem.getClass())) {
            logger.debug("Table creation successful, trying to save again");
            if (!failedBatch.getUnprocessedItems().isEmpty()) {
                ExponentialBackoffRetry retry = new ExponentialBackoffRetry(failedBatch.getUnprocessedItems());
                retry.run();
                if (retry.getUnprocessedItems().isEmpty()) {
                    logger.debug("Successfully saved items after table creation");
                }
            }
        } else {
            logger.warn("Table creation failed. Not storing some parts of batch: {}. Unprocessed items: {}", batch,
                    failedBatch.getUnprocessedItems());
        }
    }

    /**
     * Queries historic states of a single item from DynamoDB according to the filter criteria.
     * Returns an empty collection when the service is misconfigured, the client is unavailable,
     * the item is unknown, or the query fails.
     */
    @Override
    public Iterable<HistoricItem> query(FilterCriteria filter) {
        logger.debug("got a query");
        if (!isProperlyConfigured) {
            logger.debug("Configuration for dynamodb not yet loaded or broken. Not storing item.");
            return Collections.emptyList();
        }
        if (!ensureClient()) {
            logger.warn("DynamoDB not connected. Not storing item.");
            return Collections.emptyList();
        }
        String itemName = filter.getItemName();
        Item item = getItemFromRegistry(itemName);
        if (item == null) {
            logger.warn("Could not get item {} from registry!", itemName);
            return Collections.emptyList();
        }
        // The item type determines whether states are stored as numbers or strings,
        // which in turn determines the DTO class and table to query
        Class<DynamoDBItem<?>> dtoClass = AbstractDynamoDBItem.getDynamoItemClass(item.getClass());
        String tableName = tableNameResolver.fromClass(dtoClass);
        DynamoDBMapper mapper = getDBMapper(tableName);
        logger.debug("item {} (class {}) will be tried to query using dto class {} from table {}", itemName,
                item.getClass(), dtoClass, tableName);

        List<HistoricItem> historicItems = new ArrayList<>();

        DynamoDBQueryExpression<DynamoDBItem<?>> queryExpression = DynamoDBQueryUtils.createQueryExpression(dtoClass,
                filter);
        @SuppressWarnings("rawtypes")
        final PaginatedQueryList<? extends DynamoDBItem> paginatedList;
        try {
            paginatedList = mapper.query(dtoClass, queryExpression);
        } catch (AmazonServiceException e) {
            logger.error(
                    "DynamoDB query raised unexpected exception: {}. Returning empty collection. "
                            + "Status code 400 (resource not found) might occur if table was just created.",
                    e.getMessage());
            return Collections.emptyList();
        }
        // Manually page through the lazily-loading result list according to the filter's paging
        for (int itemIndexOnPage = 0; itemIndexOnPage < filter.getPageSize(); itemIndexOnPage++) {
            int itemIndex = filter.getPageNumber() * filter.getPageSize() + itemIndexOnPage;
            DynamoDBItem<?> dynamoItem;
            try {
                dynamoItem = paginatedList.get(itemIndex);
            } catch (IndexOutOfBoundsException e) {
                logger.debug("Index {} is out-of-bounds", itemIndex);
                break;
            }
            if (dynamoItem != null) {
                HistoricItem historicItem = dynamoItem.asHistoricItem(item);
                logger.trace("Dynamo item {} converted to historic item: {}", item, historicItem);
                historicItems.add(historicItem);
            }
        }
        return historicItems;
    }

    /**
     * Retrieves the item for the given name from the item registry
     *
     * @param itemName
     * @return item with the given name, or null if no such item exists in item registry.
     */
    private @Nullable Item getItemFromRegistry(String itemName) {
        try {
            // itemRegistry is injected via constructor @Reference and therefore never null
            return itemRegistry.getItem(itemName);
        } catch (ItemNotFoundException e1) {
            logger.error("Unable to get item {} from registry", itemName);
            return null;
        }
    }

    @Override
    public List<PersistenceStrategy> getDefaultStrategies() {
        return List.of(PersistenceStrategy.Globals.RESTORE, PersistenceStrategy.Globals.CHANGE);
    }
}

View File

@@ -0,0 +1,148 @@
/**
* Copyright (c) 2010-2020 Contributors to the openHAB project
*
* See the NOTICE file(s) distributed with this work for additional
* information.
*
* This program and the accompanying materials are made available under the
* terms of the Eclipse Public License 2.0 which is available at
* http://www.eclipse.org/legal/epl-2.0
*
* SPDX-License-Identifier: EPL-2.0
*/
package org.openhab.persistence.dynamodb.internal;
import java.time.ZonedDateTime;
import java.util.Collections;
import org.eclipse.jdt.annotation.NonNullByDefault;
import org.eclipse.jdt.annotation.Nullable;
import org.openhab.core.persistence.FilterCriteria;
import org.openhab.core.persistence.FilterCriteria.Operator;
import org.openhab.core.persistence.FilterCriteria.Ordering;
import com.amazonaws.services.dynamodbv2.datamodeling.DynamoDBQueryExpression;
import com.amazonaws.services.dynamodbv2.model.AttributeValue;
import com.amazonaws.services.dynamodbv2.model.ComparisonOperator;
import com.amazonaws.services.dynamodbv2.model.Condition;
/**
* Utility class
*
* @author Sami Salonen - Initial contribution
*/
@NonNullByDefault
public class DynamoDBQueryUtils {
    /**
     * Construct dynamodb query from filter
     *
     * @param dtoClass concrete DTO class (string or big-decimal flavour) to query
     * @param filter openHAB filter criteria to translate
     * @return DynamoDBQueryExpression corresponding to the given FilterCriteria
     */
    public static DynamoDBQueryExpression<DynamoDBItem<?>> createQueryExpression(
            Class<? extends DynamoDBItem<?>> dtoClass, FilterCriteria filter) {
        DynamoDBItem<?> item = getDynamoDBHashKey(dtoClass, filter.getItemName());
        final DynamoDBQueryExpression<DynamoDBItem<?>> queryExpression = new DynamoDBQueryExpression<DynamoDBItem<?>>()
                .withHashKeyValues(item).withScanIndexForward(filter.getOrdering() == Ordering.ASCENDING)
                .withLimit(filter.getPageSize());
        maybeAddTimeFilter(queryExpression, filter);
        maybeAddStateFilter(filter, queryExpression);
        return queryExpression;
    }

    /**
     * Creates an empty DTO instance carrying only the item name, to be used as the query hash key.
     *
     * @throws IllegalStateException when the DTO class cannot be instantiated reflectively
     */
    private static DynamoDBItem<?> getDynamoDBHashKey(Class<? extends DynamoDBItem<?>> dtoClass, String itemName) {
        DynamoDBItem<?> item;
        try {
            // Class.newInstance() is deprecated; call the no-argument constructor explicitly
            item = dtoClass.getDeclaredConstructor().newInstance();
        } catch (ReflectiveOperationException e) {
            // Preserve the cause so instantiation failures remain diagnosable
            throw new IllegalStateException("Could not instantiate " + dtoClass, e);
        }
        item.setName(itemName);
        return item;
    }

    /**
     * Adds a filter expression on the item state attribute when the filter specifies both
     * an operator and a state. No-op otherwise.
     */
    private static void maybeAddStateFilter(FilterCriteria filter,
            final DynamoDBQueryExpression<DynamoDBItem<?>> queryExpression) {
        if (filter.getOperator() != null && filter.getState() != null) {
            // Convert filter's state to DynamoDBItem in order get suitable string representation for the state
            final DynamoDBItem<?> filterState = AbstractDynamoDBItem.fromState(filter.getItemName(), filter.getState(),
                    ZonedDateTime.now());
            queryExpression.setFilterExpression(String.format("%s %s :opstate", DynamoDBItem.ATTRIBUTE_NAME_ITEMSTATE,
                    operatorAsString(filter.getOperator())));

            // The visitor picks the correct DynamoDB attribute type (S vs N) for the comparison value
            filterState.accept(new DynamoDBItemVisitor() {

                @Override
                public void visit(DynamoDBStringItem dynamoStringItem) {
                    queryExpression.setExpressionAttributeValues(Collections.singletonMap(":opstate",
                            new AttributeValue().withS(dynamoStringItem.getState())));
                }

                @Override
                public void visit(DynamoDBBigDecimalItem dynamoBigDecimalItem) {
                    queryExpression.setExpressionAttributeValues(Collections.singletonMap(":opstate",
                            new AttributeValue().withN(dynamoBigDecimalItem.getState().toPlainString())));
                }
            });
        }
    }

    /**
     * Adds a range-key (timestamp) condition to the query when the filter has begin and/or end dates.
     *
     * @return the condition that was added, or null when the filter has no time bounds
     */
    private static @Nullable Condition maybeAddTimeFilter(
            final DynamoDBQueryExpression<DynamoDBItem<?>> queryExpression, final FilterCriteria filter) {
        final Condition timeCondition = constructTimeCondition(filter);
        if (timeCondition != null) {
            queryExpression.setRangeKeyConditions(
                    Collections.singletonMap(DynamoDBItem.ATTRIBUTE_NAME_TIMEUTC, timeCondition));
        }
        return timeCondition;
    }

    /**
     * Builds GE / LE / BETWEEN condition on the timestamp attribute from the filter's
     * begin/end dates, or null when neither bound is set.
     */
    private static @Nullable Condition constructTimeCondition(FilterCriteria filter) {
        boolean hasBegin = filter.getBeginDate() != null;
        boolean hasEnd = filter.getEndDate() != null;

        final Condition timeCondition;
        if (!hasBegin && !hasEnd) {
            timeCondition = null;
        } else if (hasBegin && !hasEnd) {
            timeCondition = new Condition().withComparisonOperator(ComparisonOperator.GE).withAttributeValueList(
                    new AttributeValue().withS(filter.getBeginDate().format(AbstractDynamoDBItem.DATEFORMATTER)));
        } else if (!hasBegin && hasEnd) {
            timeCondition = new Condition().withComparisonOperator(ComparisonOperator.LE).withAttributeValueList(
                    new AttributeValue().withS(filter.getEndDate().format(AbstractDynamoDBItem.DATEFORMATTER)));
        } else {
            timeCondition = new Condition().withComparisonOperator(ComparisonOperator.BETWEEN).withAttributeValueList(
                    new AttributeValue().withS(filter.getBeginDate().format(AbstractDynamoDBItem.DATEFORMATTER)),
                    new AttributeValue().withS(filter.getEndDate().format(AbstractDynamoDBItem.DATEFORMATTER)));
        }
        return timeCondition;
    }

    /**
     * Convert op to string suitable for dynamodb filter expression
     *
     * @param op
     * @return string representation corresponding to the given the Operator
     * @throws IllegalStateException for unknown operators
     */
    private static String operatorAsString(Operator op) {
        switch (op) {
            case EQ:
                return "=";
            case NEQ:
                return "<>";
            case LT:
                return "<";
            case LTE:
                return "<=";
            case GT:
                return ">";
            case GTE:
                return ">=";
            default:
                throw new IllegalStateException("Unknown operator " + op);
        }
    }
}

View File

@@ -0,0 +1,75 @@
/**
* Copyright (c) 2010-2020 Contributors to the openHAB project
*
* See the NOTICE file(s) distributed with this work for additional
* information.
*
* This program and the accompanying materials are made available under the
* terms of the Eclipse Public License 2.0 which is available at
* http://www.eclipse.org/legal/epl-2.0
*
* SPDX-License-Identifier: EPL-2.0
*/
package org.openhab.persistence.dynamodb.internal;
import java.time.ZonedDateTime;
import com.amazonaws.services.dynamodbv2.datamodeling.DynamoDBAttribute;
import com.amazonaws.services.dynamodbv2.datamodeling.DynamoDBDocument;
import com.amazonaws.services.dynamodbv2.datamodeling.DynamoDBHashKey;
import com.amazonaws.services.dynamodbv2.datamodeling.DynamoDBRangeKey;
/**
* DynamoDBItem for items that can be serialized as DynamoDB string
*
* @author Sami Salonen - Initial contribution
*/
@DynamoDBDocument
public class DynamoDBStringItem extends AbstractDynamoDBItem<String> {

    /** No-argument constructor required by the DynamoDB mapper. */
    public DynamoDBStringItem() {
        this(null, null, null);
    }

    public DynamoDBStringItem(String name, String state, ZonedDateTime time) {
        super(name, state, time);
    }

    // Item name acts as the DynamoDB hash key
    @DynamoDBHashKey(attributeName = DynamoDBItem.ATTRIBUTE_NAME_ITEMNAME)
    @Override
    public String getName() {
        return name;
    }

    @Override
    public void setName(String name) {
        this.name = name;
    }

    @DynamoDBAttribute(attributeName = DynamoDBItem.ATTRIBUTE_NAME_ITEMSTATE)
    @Override
    public String getState() {
        return state;
    }

    @Override
    public void setState(String state) {
        this.state = state;
    }

    // Timestamp acts as the DynamoDB range key
    @Override
    @DynamoDBRangeKey(attributeName = ATTRIBUTE_NAME_TIMEUTC)
    public ZonedDateTime getTime() {
        return time;
    }

    @Override
    public void setTime(ZonedDateTime time) {
        this.time = time;
    }

    @Override
    public void accept(DynamoDBItemVisitor visitor) {
        visitor.visit(this);
    }
}

View File

@@ -0,0 +1,65 @@
/**
* Copyright (c) 2010-2020 Contributors to the openHAB project
*
* See the NOTICE file(s) distributed with this work for additional
* information.
*
* This program and the accompanying materials are made available under the
* terms of the Eclipse Public License 2.0 which is available at
* http://www.eclipse.org/legal/epl-2.0
*
* SPDX-License-Identifier: EPL-2.0
*/
package org.openhab.persistence.dynamodb.internal;
/**
* The DynamoDBTableNameResolver resolves DynamoDB table name for a given item.
*
* @author Sami Salonen - Initial contribution
*
*/
public class DynamoDBTableNameResolver {

    /** Prefix prepended to all generated table names, e.g. "openhab-". */
    private final String tablePrefix;

    public DynamoDBTableNameResolver(String tablePrefix) {
        this.tablePrefix = tablePrefix;
    }

    /**
     * Resolves the DynamoDB table name for the given item.
     *
     * @param item item to resolve the table name for
     * @return table name, tablePrefix + "bigdecimal" or tablePrefix + "string" depending on item type
     */
    public String fromItem(DynamoDBItem<?> item) {
        final String[] tableName = new String[1];
        // Use the visitor pattern to deduce the table name
        item.accept(new DynamoDBItemVisitor() {

            @Override
            public void visit(DynamoDBBigDecimalItem dynamoBigDecimalItem) {
                tableName[0] = tablePrefix + "bigdecimal";
            }

            @Override
            public void visit(DynamoDBStringItem dynamoStringItem) {
                tableName[0] = tablePrefix + "string";
            }
        });
        return tableName[0];
    }

    /**
     * Construct DynamoDBTableNameResolver corresponding to DynamoDBItem class
     *
     * @param clazz
     * @return table name corresponding to the given DTO class
     * @throws IllegalStateException when the class has no accessible no-argument constructor
     */
    public String fromClass(Class<? extends DynamoDBItem<?>> clazz) {
        DynamoDBItem<?> dummy;
        try {
            // Construct new instance of this class (assuming a no-argument constructor is present)
            // in order to re-use fromItem(DynamoDBItem)
            dummy = clazz.getConstructor().newInstance();
        } catch (Exception e) {
            // Include the cause so reflective failures remain diagnosable
            throw new IllegalStateException(String.format("Could not find suitable constructor for class %s", clazz),
                    e);
        }
        return this.fromItem(dummy);
    }
}

View File

@@ -0,0 +1,118 @@
<?xml version="1.0" encoding="UTF-8"?>
<config-description:config-descriptions
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xmlns:config-description="https://openhab.org/schemas/config-description/v1.0.0"
xsi:schemaLocation="https://openhab.org/schemas/config-description/v1.0.0
https://openhab.org/schemas/config-description-1.0.0.xsd">
<config-description uri="persistence:dynamodb">
<!--
############################ Amazon DynamoDB Persistence Service ##################################
#
# The following parameters are used to configure Amazon DynamoDB Persistence.
#
# Further details at https://docs.openhab.org/addons/persistence/dynamodb/readme.html
#
#
# CONNECTION SETTINGS (follow OPTION 1 or OPTION 2)
#
# OPTION 1 (using accessKey and secretKey)
#accessKey=AKIAIOSFODNN7EXAMPLE
#secretKey=3+AAAAABBBbbbCCCCCCdddddd+7mnbIOLH
#region=eu-west-1
# OPTION 2 (using profilesConfigFile and profile)
# where profilesConfigFile points to AWS credentials file
#profilesConfigFile=/etc/openhab2/aws_creds
#profile=fooprofile
#region=eu-west-1
# Credentials file example:
#
# [fooprofile]
# aws_access_key_id=AKIAIOSFODNN7EXAMPLE
# aws_secret_access_key=3+AAAAABBBbbbCCCCCCdddddd+7mnbIOLH
#
# ADVANCED CONFIGURATION (OPTIONAL)
#
# read capacity for the created tables
#readCapacityUnits=1
# write capacity for the created tables
#writeCapacityUnits=1
# table prefix used in the name of created tables
#tablePrefix=openhab-
-->
<parameter name="region" type="text" required="true">
<label>AWS region ID</label>
<description><![CDATA[AWS region ID as described in Step 2 in Setting up Amazon account.<br />
The region needs to match the region of the AWS user that will access Amazon DynamoDB.<br />
For example, eu-west-1.]]></description>
</parameter>
<parameter name="accessKey" type="text" required="false">
<label>AWS access key</label>
<description><![CDATA[AWS access key of the AWS user that will access Amazon DynamoDB.
<br />
Give either 1) access key and secret key, or 2) credentials file and profile name.
]]></description>
</parameter>
<parameter name="secretKey" type="text" required="false">
<label>AWS secret key</label>
<description><![CDATA[AWS secret key of the AWS user that will access Amazon DynamoDB.
<br />
Give either 1) access key and secret key, or 2) credentials file and profile name.
]]></description>
</parameter>
<parameter name="profilesConfigFile" type="text" required="false">
<label>AWS credentials file</label>
<description><![CDATA[Path to the AWS credentials file. <br />
For example, /etc/openhab2/aws_creds.
        Please note that the user that runs openHAB must have appropriate read rights to the credential file.
<br />
Give either 1) access key and secret key, or 2) credentials file and profile name.
]]></description>
</parameter>
<parameter name="profile" type="text" required="false">
<label>Profile name</label>
<description><![CDATA[Name of the profile to use in AWS credentials file
<br />
Give either 1) access key and secret key, or 2) credentials file and profile name.
]]></description>
</parameter>
<parameter name="readCapacityUnits" type="integer" required="false" min="1">
<description>Read capacity for the created tables. Default is 1.</description>
<label>Read capacity</label>
<advanced>true</advanced>
</parameter>
<parameter name="writeCapacityUnits" type="integer" required="false" min="1">
<label>Write capacity</label>
<description>Write capacity for the created tables. Default is 1.</description>
<advanced>true</advanced>
</parameter>
<parameter name="tablePrefix" type="text" required="false">
<label>Table prefix</label>
<description>Table prefix used in the name of created tables. Default is openhab-</description>
<advanced>true</advanced>
</parameter>
</config-description>
</config-description:config-descriptions>