[http] Add rate-limiting to channel refresh (#9509)

* add rate limiter for requests and catch transformation exception
* address review comment
* address review comments

Signed-off-by: Jan N. Klug <jan.n.klug@rub.de>
This commit is contained in:
J-N-K 2020-12-28 18:18:06 +01:00 committed by GitHub
parent 06690bd37e
commit c556f49691
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
8 changed files with 222 additions and 55 deletions

View File

@ -15,6 +15,7 @@ It can be extended with different channels.
| `refresh` | no | 30 | Time in seconds between two refresh calls for the channels of this thing. | | `refresh` | no | 30 | Time in seconds between two refresh calls for the channels of this thing. |
| `timeout` | no | 3000 | Timeout for HTTP requests in ms. | | `timeout` | no | 3000 | Timeout for HTTP requests in ms. |
| `bufferSize` | no | 2048 | The buffer size for the response data (in kB). | | `bufferSize` | no | 2048 | The buffer size for the response data (in kB). |
| `delay` | no | 0 | Delay between two requests in ms (advanced parameter). |
| `username` | yes | - | Username for authentication (advanced parameter). | | `username` | yes | - | Username for authentication (advanced parameter). |
| `password` | yes | - | Password for authentication (advanced parameter). | | `password` | yes | - | Password for authentication (advanced parameter). |
| `authMode` | no | BASIC | Authentication mode, `BASIC` or `DIGEST` (advanced parameter). | | `authMode` | no | BASIC | Authentication mode, `BASIC` or `DIGEST` (advanced parameter). |
@ -26,6 +27,8 @@ It can be extended with different channels.
*Note:* optional "no" means that you have to configure a value unless a default is provided and you are ok with that setting. *Note:* optional "no" means that you have to configure a value unless a default is provided and you are ok with that setting.
*Note:* If you rate-limit requests by using the `delay` parameter you have to make sure that the time between two refreshes is larger than the time needed for one refresh cycle.
## Channels ## Channels
Each item type has its own channel-type. Each item type has its own channel-type.

View File

@ -101,7 +101,7 @@ public class HttpHandlerFactory extends BaseThingHandlerFactory
@Override @Override
public ValueTransformation getValueTransformation(@Nullable String pattern) { public ValueTransformation getValueTransformation(@Nullable String pattern) {
if (pattern == null) { if (pattern == null || pattern.isEmpty()) {
return NoOpValueTransformation.getInstance(); return NoOpValueTransformation.getInstance();
} }
return new CascadedValueTransformationImpl(pattern, return new CascadedValueTransformationImpl(pattern,

View File

@ -44,10 +44,7 @@ import org.openhab.binding.http.internal.converter.ImageItemConverter;
import org.openhab.binding.http.internal.converter.ItemValueConverter; import org.openhab.binding.http.internal.converter.ItemValueConverter;
import org.openhab.binding.http.internal.converter.PlayerItemConverter; import org.openhab.binding.http.internal.converter.PlayerItemConverter;
import org.openhab.binding.http.internal.converter.RollershutterItemConverter; import org.openhab.binding.http.internal.converter.RollershutterItemConverter;
import org.openhab.binding.http.internal.http.Content; import org.openhab.binding.http.internal.http.*;
import org.openhab.binding.http.internal.http.HttpAuthException;
import org.openhab.binding.http.internal.http.HttpResponseListener;
import org.openhab.binding.http.internal.http.RefreshingUrlCache;
import org.openhab.binding.http.internal.transform.ValueTransformationProvider; import org.openhab.binding.http.internal.transform.ValueTransformationProvider;
import org.openhab.core.library.types.DateTimeType; import org.openhab.core.library.types.DateTimeType;
import org.openhab.core.library.types.DecimalType; import org.openhab.core.library.types.DecimalType;
@ -81,6 +78,7 @@ public class HttpThingHandler extends BaseThingHandler {
private final ValueTransformationProvider valueTransformationProvider; private final ValueTransformationProvider valueTransformationProvider;
private final HttpClientProvider httpClientProvider; private final HttpClientProvider httpClientProvider;
private HttpClient httpClient; private HttpClient httpClient;
private RateLimitedHttpClient rateLimitedHttpClient;
private final HttpDynamicStateDescriptionProvider httpDynamicStateDescriptionProvider; private final HttpDynamicStateDescriptionProvider httpDynamicStateDescriptionProvider;
private HttpThingConfig config = new HttpThingConfig(); private HttpThingConfig config = new HttpThingConfig();
@ -95,6 +93,7 @@ public class HttpThingHandler extends BaseThingHandler {
super(thing); super(thing);
this.httpClientProvider = httpClientProvider; this.httpClientProvider = httpClientProvider;
this.httpClient = httpClientProvider.getSecureClient(); this.httpClient = httpClientProvider.getSecureClient();
this.rateLimitedHttpClient = new RateLimitedHttpClient(httpClient, scheduler);
this.valueTransformationProvider = valueTransformationProvider; this.valueTransformationProvider = valueTransformationProvider;
this.httpDynamicStateDescriptionProvider = httpDynamicStateDescriptionProvider; this.httpDynamicStateDescriptionProvider = httpDynamicStateDescriptionProvider;
} }
@ -139,6 +138,26 @@ public class HttpThingHandler extends BaseThingHandler {
"Parameter baseURL must not be empty!"); "Parameter baseURL must not be empty!");
return; return;
} }
if (config.ignoreSSLErrors) {
logger.info("Using the insecure client for thing '{}'.", thing.getUID());
httpClient = httpClientProvider.getInsecureClient();
} else {
logger.info("Using the secure client for thing '{}'.", thing.getUID());
httpClient = httpClientProvider.getSecureClient();
}
rateLimitedHttpClient.setHttpClient(httpClient);
rateLimitedHttpClient.setDelay(config.delay);
int channelCount = thing.getChannels().size();
if (channelCount * config.delay > config.refresh * 1000) {
// this should prevent the rate limit queue from filling up
config.refresh = (channelCount * config.delay) / 1000 + 1;
logger.warn(
"{} channels in thing {} with a delay of {} incompatible with the configured refresh time. Refresh-Time increased to the minimum of {}",
channelCount, thing.getUID(), config.delay, config.refresh);
}
authentication = null; authentication = null;
if (!config.username.isEmpty()) { if (!config.username.isEmpty()) {
try { try {
@ -170,14 +189,6 @@ public class HttpThingHandler extends BaseThingHandler {
logger.debug("No authentication configured for thing '{}'", thing.getUID()); logger.debug("No authentication configured for thing '{}'", thing.getUID());
} }
if (config.ignoreSSLErrors) {
logger.info("Using the insecure client for thing '{}'.", thing.getUID());
httpClient = httpClientProvider.getInsecureClient();
} else {
logger.info("Using the secure client for thing '{}'.", thing.getUID());
httpClient = httpClientProvider.getSecureClient();
}
thing.getChannels().forEach(this::createChannel); thing.getChannels().forEach(this::createChannel);
updateStatus(ThingStatus.ONLINE); updateStatus(ThingStatus.ONLINE);
@ -187,6 +198,7 @@ public class HttpThingHandler extends BaseThingHandler {
public void dispose() { public void dispose() {
// stop update tasks // stop update tasks
urlHandlers.values().forEach(RefreshingUrlCache::stop); urlHandlers.values().forEach(RefreshingUrlCache::stop);
rateLimitedHttpClient.shutdown();
// clear lists // clear lists
urlHandlers.clear(); urlHandlers.clear();
@ -266,7 +278,9 @@ public class HttpThingHandler extends BaseThingHandler {
channels.put(channelUID, itemValueConverter); channels.put(channelUID, itemValueConverter);
if (channelConfig.mode != HttpChannelMode.WRITEONLY) { if (channelConfig.mode != HttpChannelMode.WRITEONLY) {
channelUrls.put(channelUID, stateUrl); channelUrls.put(channelUID, stateUrl);
urlHandlers.computeIfAbsent(stateUrl, url -> new RefreshingUrlCache(scheduler, httpClient, url, config)) urlHandlers
.computeIfAbsent(stateUrl,
url -> new RefreshingUrlCache(scheduler, rateLimitedHttpClient, url, config))
.addConsumer(itemValueConverter::process); .addConsumer(itemValueConverter::process);
} }

View File

@ -29,6 +29,7 @@ public class HttpThingConfig {
public String baseURL = ""; public String baseURL = "";
public int refresh = 30; public int refresh = 30;
public int timeout = 3000; public int timeout = 3000;
public int delay = 0;
public String username = ""; public String username = "";
public String password = ""; public String password = "";

View File

@ -0,0 +1,129 @@
/**
* Copyright (c) 2010-2020 Contributors to the openHAB project
*
* See the NOTICE file(s) distributed with this work for additional
* information.
*
* This program and the accompanying materials are made available under the
* terms of the Eclipse Public License 2.0 which is available at
* http://www.eclipse.org/legal/epl-2.0
*
* SPDX-License-Identifier: EPL-2.0
*/
package org.openhab.binding.http.internal.http;
import java.net.URI;
import java.util.concurrent.*;
import org.eclipse.jdt.annotation.NonNullByDefault;
import org.eclipse.jdt.annotation.Nullable;
import org.eclipse.jetty.client.HttpClient;
import org.eclipse.jetty.client.api.AuthenticationStore;
import org.eclipse.jetty.client.api.Request;
/**
* The {@link RateLimitedHttpClient} is a wrapper for a Jetty HTTP client that limits the number of requests by delaying
* the request creation
*
* @author Jan N. Klug - Initial contribution
*/
@NonNullByDefault
public class RateLimitedHttpClient {
private static final int MAX_QUEUE_SIZE = 1000; // maximum queue size
private HttpClient httpClient;
private int delay = 0; // in ms
private final ScheduledExecutorService scheduler;
private final LinkedBlockingQueue<RequestQueueEntry> requestQueue = new LinkedBlockingQueue<>(MAX_QUEUE_SIZE);
private @Nullable ScheduledFuture<?> processJob;
public RateLimitedHttpClient(HttpClient httpClient, ScheduledExecutorService scheduler) {
this.httpClient = httpClient;
this.scheduler = scheduler;
}
/**
* Stop processing the queue and clear it
*/
public void shutdown() {
stopProcessJob();
requestQueue.forEach(queueEntry -> queueEntry.future.completeExceptionally(new CancellationException()));
}
/**
* Set a new delay
*
* @param delay in ms between to requests
*/
public void setDelay(int delay) {
if (delay < 0) {
throw new IllegalArgumentException("Delay needs to be larger or equal to zero");
}
this.delay = delay;
stopProcessJob();
if (delay != 0) {
processJob = scheduler.scheduleWithFixedDelay(this::processQueue, 0, delay, TimeUnit.MILLISECONDS);
}
}
/**
* Set the HTTP client
*
* @param httpClient secure or insecure Jetty http client
*/
public void setHttpClient(HttpClient httpClient) {
this.httpClient = httpClient;
}
/**
* Create a new request to the given URL respecting rate-limits
*
* @param finalUrl the request URL
* @return a CompletableFuture that completes with the request
*/
public CompletableFuture<Request> newRequest(URI finalUrl) {
// if no delay is set, return a completed CompletableFuture
if (delay == 0) {
return CompletableFuture.completedFuture(httpClient.newRequest(finalUrl));
}
CompletableFuture<Request> future = new CompletableFuture<>();
if (!requestQueue.offer(new RequestQueueEntry(finalUrl, future))) {
future.completeExceptionally(new RejectedExecutionException("Maximum queue size exceeded."));
}
return future;
}
/**
* Get the AuthenticationStore from the wrapped client
*
* @return
*/
public AuthenticationStore getAuthenticationStore() {
return httpClient.getAuthenticationStore();
}
private void stopProcessJob() {
ScheduledFuture<?> processJob = this.processJob;
if (processJob != null) {
processJob.cancel(false);
this.processJob = null;
}
}
private void processQueue() {
RequestQueueEntry queueEntry = requestQueue.poll();
if (queueEntry != null) {
queueEntry.future.complete(httpClient.newRequest(queueEntry.finalUrl));
}
}
private static class RequestQueueEntry {
public URI finalUrl;
public CompletableFuture<Request> future;
public RequestQueueEntry(URI finalUrl, CompletableFuture<Request> future) {
this.finalUrl = finalUrl;
this.future = future;
}
}
}

View File

@ -18,19 +18,13 @@ import java.util.Date;
import java.util.List; import java.util.List;
import java.util.Optional; import java.util.Optional;
import java.util.Set; import java.util.Set;
import java.util.concurrent.CompletableFuture; import java.util.concurrent.*;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.ScheduledFuture;
import java.util.concurrent.TimeUnit;
import java.util.function.Consumer; import java.util.function.Consumer;
import org.eclipse.jdt.annotation.NonNullByDefault; import org.eclipse.jdt.annotation.NonNullByDefault;
import org.eclipse.jdt.annotation.Nullable; import org.eclipse.jdt.annotation.Nullable;
import org.eclipse.jetty.client.HttpClient;
import org.eclipse.jetty.client.api.Authentication; import org.eclipse.jetty.client.api.Authentication;
import org.eclipse.jetty.client.api.AuthenticationStore; import org.eclipse.jetty.client.api.AuthenticationStore;
import org.eclipse.jetty.client.api.Request;
import org.openhab.binding.http.internal.Util; import org.openhab.binding.http.internal.Util;
import org.openhab.binding.http.internal.config.HttpThingConfig; import org.openhab.binding.http.internal.config.HttpThingConfig;
import org.slf4j.Logger; import org.slf4j.Logger;
@ -47,7 +41,7 @@ public class RefreshingUrlCache {
private final Logger logger = LoggerFactory.getLogger(RefreshingUrlCache.class); private final Logger logger = LoggerFactory.getLogger(RefreshingUrlCache.class);
private final String url; private final String url;
private final HttpClient httpClient; private final RateLimitedHttpClient httpClient;
private final int timeout; private final int timeout;
private final int bufferSize; private final int bufferSize;
private final @Nullable String fallbackEncoding; private final @Nullable String fallbackEncoding;
@ -57,7 +51,7 @@ public class RefreshingUrlCache {
private final ScheduledFuture<?> future; private final ScheduledFuture<?> future;
private @Nullable Content lastContent; private @Nullable Content lastContent;
public RefreshingUrlCache(ScheduledExecutorService executor, HttpClient httpClient, String url, public RefreshingUrlCache(ScheduledExecutorService executor, RateLimitedHttpClient httpClient, String url,
HttpThingConfig thingConfig) { HttpThingConfig thingConfig) {
this.httpClient = httpClient; this.httpClient = httpClient;
this.url = url; this.url = url;
@ -85,43 +79,53 @@ public class RefreshingUrlCache {
URI finalUrl = new URI(String.format(this.url, new Date())); URI finalUrl = new URI(String.format(this.url, new Date()));
logger.trace("Requesting refresh (retry={}) from '{}' with timeout {}ms", isRetry, finalUrl, timeout); logger.trace("Requesting refresh (retry={}) from '{}' with timeout {}ms", isRetry, finalUrl, timeout);
Request request = httpClient.newRequest(finalUrl).timeout(timeout, TimeUnit.MILLISECONDS);
headers.forEach(header -> { httpClient.newRequest(finalUrl).thenAccept(request -> {
String[] keyValuePair = header.split("=", 2); request.timeout(timeout, TimeUnit.MILLISECONDS);
if (keyValuePair.length == 2) {
request.header(keyValuePair[0].trim(), keyValuePair[1].trim());
} else {
logger.warn("Splitting header '{}' failed. No '=' was found. Ignoring", header);
}
});
CompletableFuture<@Nullable Content> response = new CompletableFuture<>(); headers.forEach(header -> {
response.exceptionally(e -> { String[] keyValuePair = header.split("=", 2);
if (e instanceof HttpAuthException) { if (keyValuePair.length == 2) {
if (isRetry) { request.header(keyValuePair[0].trim(), keyValuePair[1].trim());
logger.warn("Retry after authentication failure failed again for '{}', failing here",
finalUrl);
} else { } else {
AuthenticationStore authStore = httpClient.getAuthenticationStore(); logger.warn("Splitting header '{}' failed. No '=' was found. Ignoring", header);
Authentication.Result authResult = authStore.findAuthenticationResult(finalUrl); }
if (authResult != null) { });
authStore.removeAuthenticationResult(authResult);
logger.debug("Cleared authentication result for '{}', retrying immediately", finalUrl); CompletableFuture<@Nullable Content> response = new CompletableFuture<>();
refresh(true); response.exceptionally(e -> {
if (e instanceof HttpAuthException) {
if (isRetry) {
logger.warn("Retry after authentication failure failed again for '{}', failing here",
finalUrl);
} else { } else {
logger.warn("Could not find authentication result for '{}', failing here", finalUrl); AuthenticationStore authStore = httpClient.getAuthenticationStore();
Authentication.Result authResult = authStore.findAuthenticationResult(finalUrl);
if (authResult != null) {
authStore.removeAuthenticationResult(authResult);
logger.debug("Cleared authentication result for '{}', retrying immediately", finalUrl);
refresh(true);
} else {
logger.warn("Could not find authentication result for '{}', failing here", finalUrl);
}
} }
} }
return null;
}).thenAccept(this::processResult);
if (logger.isTraceEnabled()) {
logger.trace("Sending to '{}': {}", finalUrl, Util.requestToLogString(request));
}
request.send(new HttpResponseListener(response, fallbackEncoding, bufferSize));
}).exceptionally(e -> {
if (e instanceof CancellationException) {
logger.debug("Request to URL {} was cancelled by thing handler.", finalUrl);
} else {
logger.warn("Request to URL {} failed: {}", finalUrl, e.getMessage());
} }
return null; return null;
}).thenAccept(this::processResult); });
if (logger.isTraceEnabled()) {
logger.trace("Sending to '{}': {}", finalUrl, Util.requestToLogString(request));
}
request.send(new HttpResponseListener(response, fallbackEncoding, bufferSize));
} catch (IllegalArgumentException | URISyntaxException e) { } catch (IllegalArgumentException | URISyntaxException e) {
logger.warn("Creating request for '{}' failed: {}", url, e.getMessage()); logger.warn("Creating request for '{}' failed: {}", url, e.getMessage());
} }

View File

@ -21,6 +21,8 @@ import java.util.stream.Collectors;
import org.eclipse.jdt.annotation.NonNullByDefault; import org.eclipse.jdt.annotation.NonNullByDefault;
import org.eclipse.jdt.annotation.Nullable; import org.eclipse.jdt.annotation.Nullable;
import org.openhab.core.transform.TransformationService; import org.openhab.core.transform.TransformationService;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
/** /**
* The {@link CascadedValueTransformationImpl} implements {@link ValueTransformation for a cascaded set of * The {@link CascadedValueTransformationImpl} implements {@link ValueTransformation for a cascaded set of
@ -30,13 +32,21 @@ import org.openhab.core.transform.TransformationService;
*/ */
@NonNullByDefault @NonNullByDefault
public class CascadedValueTransformationImpl implements ValueTransformation { public class CascadedValueTransformationImpl implements ValueTransformation {
private final Logger logger = LoggerFactory.getLogger(CascadedValueTransformationImpl.class);
private final List<ValueTransformation> transformations; private final List<ValueTransformation> transformations;
public CascadedValueTransformationImpl(String transformationString, public CascadedValueTransformationImpl(String transformationString,
Function<String, @Nullable TransformationService> transformationServiceSupplier) { Function<String, @Nullable TransformationService> transformationServiceSupplier) {
transformations = Arrays.stream(transformationString.split("")).filter(s -> !s.isEmpty()) List<ValueTransformation> transformations;
.map(transformation -> new SingleValueTransformation(transformation, transformationServiceSupplier)) try {
.collect(Collectors.toList()); transformations = Arrays.stream(transformationString.split("")).filter(s -> !s.isEmpty())
.map(transformation -> new SingleValueTransformation(transformation, transformationServiceSupplier))
.collect(Collectors.toList());
} catch (IllegalArgumentException e) {
transformations = List.of(NoOpValueTransformation.getInstance());
logger.warn("Transformation ignore, failed to parse {}: {}", transformationString, e.getMessage());
}
this.transformations = transformations;
} }
@Override @Override

View File

@ -25,6 +25,12 @@
<description>The timeout in ms for each request</description> <description>The timeout in ms for each request</description>
<default>3000</default> <default>3000</default>
</parameter> </parameter>
<parameter name="delay" type="integer" unit="ms" min="0">
<label>Delay</label>
<description>Delay between to requests</description>
<default>0</default>
<advanced>true</advanced>
</parameter>
<parameter name="bufferSize" type="integer" min="0"> <parameter name="bufferSize" type="integer" min="0">
<label>Buffer Size</label> <label>Buffer Size</label>
<description>Size of the response buffer (default 2048 kB)</description> <description>Size of the response buffer (default 2048 kB)</description>