diff --git a/bundles/org.openhab.binding.http/README.md b/bundles/org.openhab.binding.http/README.md index 74b5084d4..44dce4aad 100644 --- a/bundles/org.openhab.binding.http/README.md +++ b/bundles/org.openhab.binding.http/README.md @@ -15,6 +15,7 @@ It can be extended with different channels. | `refresh` | no | 30 | Time in seconds between two refresh calls for the channels of this thing. | | `timeout` | no | 3000 | Timeout for HTTP requests in ms. | | `bufferSize` | no | 2048 | The buffer size for the response data (in kB). | +| `delay` | no | 0 | Delay between two requests in ms (advanced parameter). | | `username` | yes | - | Username for authentication (advanced parameter). | | `password` | yes | - | Password for authentication (advanced parameter). | | `authMode` | no | BASIC | Authentication mode, `BASIC` or `DIGEST` (advanced parameter). | @@ -26,6 +27,8 @@ It can be extended with different channels. *Note:* optional "no" means that you have to configure a value unless a default is provided and you are ok with that setting. +*Note:* If you rate-limit requests by using the `delay` parameter you have to make sure that the time between two refreshes is larger than the time needed for one refresh cycle. + ## Channels Each item type has its own channel-type. diff --git a/bundles/org.openhab.binding.http/src/main/java/org/openhab/binding/http/internal/HttpHandlerFactory.java b/bundles/org.openhab.binding.http/src/main/java/org/openhab/binding/http/internal/HttpHandlerFactory.java index 7b1444156..ae153ca51 100644 --- a/bundles/org.openhab.binding.http/src/main/java/org/openhab/binding/http/internal/HttpHandlerFactory.java +++ b/bundles/org.openhab.binding.http/src/main/java/org/openhab/binding/http/internal/HttpHandlerFactory.java @@ -101,7 +101,7 @@ public class HttpHandlerFactory extends BaseThingHandlerFactory @Override public ValueTransformation getValueTransformation(@Nullable String pattern) { - if (pattern == null) { + if (pattern == null || pattern.isEmpty()) { return NoOpValueTransformation.getInstance(); } return new CascadedValueTransformationImpl(pattern, diff --git a/bundles/org.openhab.binding.http/src/main/java/org/openhab/binding/http/internal/HttpThingHandler.java b/bundles/org.openhab.binding.http/src/main/java/org/openhab/binding/http/internal/HttpThingHandler.java index f7ce74a0f..343145a92 100644 --- a/bundles/org.openhab.binding.http/src/main/java/org/openhab/binding/http/internal/HttpThingHandler.java +++ b/bundles/org.openhab.binding.http/src/main/java/org/openhab/binding/http/internal/HttpThingHandler.java @@ -44,10 +44,7 @@ import org.openhab.binding.http.internal.converter.ImageItemConverter; import org.openhab.binding.http.internal.converter.ItemValueConverter; import org.openhab.binding.http.internal.converter.PlayerItemConverter; import org.openhab.binding.http.internal.converter.RollershutterItemConverter; -import org.openhab.binding.http.internal.http.Content; -import org.openhab.binding.http.internal.http.HttpAuthException; -import org.openhab.binding.http.internal.http.HttpResponseListener; -import org.openhab.binding.http.internal.http.RefreshingUrlCache; +import org.openhab.binding.http.internal.http.*; import org.openhab.binding.http.internal.transform.ValueTransformationProvider; import org.openhab.core.library.types.DateTimeType; import org.openhab.core.library.types.DecimalType; @@ -81,6 +78,7 @@ public class HttpThingHandler extends BaseThingHandler { private final ValueTransformationProvider valueTransformationProvider; private final HttpClientProvider httpClientProvider; private HttpClient httpClient; + private RateLimitedHttpClient rateLimitedHttpClient; private final HttpDynamicStateDescriptionProvider httpDynamicStateDescriptionProvider; private HttpThingConfig config = new HttpThingConfig(); @@ -95,6 +93,7 @@ public class HttpThingHandler extends BaseThingHandler { super(thing); this.httpClientProvider = httpClientProvider; this.httpClient = httpClientProvider.getSecureClient(); + this.rateLimitedHttpClient = new RateLimitedHttpClient(httpClient, scheduler); this.valueTransformationProvider = valueTransformationProvider; this.httpDynamicStateDescriptionProvider = httpDynamicStateDescriptionProvider; } @@ -139,6 +138,26 @@ public class HttpThingHandler extends BaseThingHandler { "Parameter baseURL must not be empty!"); return; } + + if (config.ignoreSSLErrors) { + logger.info("Using the insecure client for thing '{}'.", thing.getUID()); + httpClient = httpClientProvider.getInsecureClient(); + } else { + logger.info("Using the secure client for thing '{}'.", thing.getUID()); + httpClient = httpClientProvider.getSecureClient(); + } + rateLimitedHttpClient.setHttpClient(httpClient); + rateLimitedHttpClient.setDelay(config.delay); + + int channelCount = thing.getChannels().size(); + if (channelCount * config.delay > config.refresh * 1000) { + // this should prevent the rate limit queue from filling up + config.refresh = (channelCount * config.delay) / 1000 + 1; + logger.warn( + "{} channels in thing {} with a delay of {} incompatible with the configured refresh time. Refresh-Time increased to the minimum of {}", + channelCount, thing.getUID(), config.delay, config.refresh); + } + authentication = null; if (!config.username.isEmpty()) { try { @@ -170,14 +189,6 @@ public class HttpThingHandler extends BaseThingHandler { logger.debug("No authentication configured for thing '{}'", thing.getUID()); } - if (config.ignoreSSLErrors) { - logger.info("Using the insecure client for thing '{}'.", thing.getUID()); - httpClient = httpClientProvider.getInsecureClient(); - } else { - logger.info("Using the secure client for thing '{}'.", thing.getUID()); - httpClient = httpClientProvider.getSecureClient(); - } - thing.getChannels().forEach(this::createChannel); updateStatus(ThingStatus.ONLINE); @@ -187,6 +198,7 @@ public class HttpThingHandler extends BaseThingHandler { public void dispose() { // stop update tasks urlHandlers.values().forEach(RefreshingUrlCache::stop); + rateLimitedHttpClient.shutdown(); // clear lists urlHandlers.clear(); @@ -266,7 +278,9 @@ public class HttpThingHandler extends BaseThingHandler { channels.put(channelUID, itemValueConverter); if (channelConfig.mode != HttpChannelMode.WRITEONLY) { channelUrls.put(channelUID, stateUrl); - urlHandlers.computeIfAbsent(stateUrl, url -> new RefreshingUrlCache(scheduler, httpClient, url, config)) + urlHandlers + .computeIfAbsent(stateUrl, + url -> new RefreshingUrlCache(scheduler, rateLimitedHttpClient, url, config)) .addConsumer(itemValueConverter::process); } diff --git a/bundles/org.openhab.binding.http/src/main/java/org/openhab/binding/http/internal/config/HttpThingConfig.java b/bundles/org.openhab.binding.http/src/main/java/org/openhab/binding/http/internal/config/HttpThingConfig.java index 19aeae84d..258f94466 100644 --- a/bundles/org.openhab.binding.http/src/main/java/org/openhab/binding/http/internal/config/HttpThingConfig.java +++ b/bundles/org.openhab.binding.http/src/main/java/org/openhab/binding/http/internal/config/HttpThingConfig.java @@ -29,6 +29,7 @@ public class HttpThingConfig { public String baseURL = ""; public int refresh = 30; public int timeout = 3000; + public int delay = 0; public String username = ""; public String password = ""; diff --git a/bundles/org.openhab.binding.http/src/main/java/org/openhab/binding/http/internal/http/RateLimitedHttpClient.java b/bundles/org.openhab.binding.http/src/main/java/org/openhab/binding/http/internal/http/RateLimitedHttpClient.java new file mode 100644 index 000000000..d639c04c7 --- /dev/null +++ b/bundles/org.openhab.binding.http/src/main/java/org/openhab/binding/http/internal/http/RateLimitedHttpClient.java @@ -0,0 +1,129 @@ +/** + * Copyright (c) 2010-2020 Contributors to the openHAB project + * + * See the NOTICE file(s) distributed with this work for additional + * information. + * + * This program and the accompanying materials are made available under the + * terms of the Eclipse Public License 2.0 which is available at + * http://www.eclipse.org/legal/epl-2.0 + * + * SPDX-License-Identifier: EPL-2.0 + */ +package org.openhab.binding.http.internal.http; + +import java.net.URI; +import java.util.concurrent.*; + +import org.eclipse.jdt.annotation.NonNullByDefault; +import org.eclipse.jdt.annotation.Nullable; +import org.eclipse.jetty.client.HttpClient; +import org.eclipse.jetty.client.api.AuthenticationStore; +import org.eclipse.jetty.client.api.Request; + +/** + * The {@link RateLimitedHttpClient} is a wrapper for a Jetty HTTP client that limits the number of requests by delaying + * the request creation + * + * @author Jan N. Klug - Initial contribution + */ +@NonNullByDefault +public class RateLimitedHttpClient { + private static final int MAX_QUEUE_SIZE = 1000; // maximum queue size + private HttpClient httpClient; + private int delay = 0; // in ms + private final ScheduledExecutorService scheduler; + private final LinkedBlockingQueue requestQueue = new LinkedBlockingQueue<>(MAX_QUEUE_SIZE); + + private @Nullable ScheduledFuture processJob; + + public RateLimitedHttpClient(HttpClient httpClient, ScheduledExecutorService scheduler) { + this.httpClient = httpClient; + this.scheduler = scheduler; + } + + /** + * Stop processing the queue and clear it + */ + public void shutdown() { + stopProcessJob(); + requestQueue.forEach(queueEntry -> queueEntry.future.completeExceptionally(new CancellationException())); + } + + /** + * Set a new delay + * + * @param delay in ms between to requests + */ + public void setDelay(int delay) { + if (delay < 0) { + throw new IllegalArgumentException("Delay needs to be larger or equal to zero"); + } + this.delay = delay; + stopProcessJob(); + if (delay != 0) { + processJob = scheduler.scheduleWithFixedDelay(this::processQueue, 0, delay, TimeUnit.MILLISECONDS); + } + } + + /** + * Set the HTTP client + * + * @param httpClient secure or insecure Jetty http client + */ + public void setHttpClient(HttpClient httpClient) { + this.httpClient = httpClient; + } + + /** + * Create a new request to the given URL respecting rate-limits + * + * @param finalUrl the request URL + * @return a CompletableFuture that completes with the request + */ + public CompletableFuture newRequest(URI finalUrl) { + // if no delay is set, return a completed CompletableFuture + if (delay == 0) { + return CompletableFuture.completedFuture(httpClient.newRequest(finalUrl)); + } + CompletableFuture future = new CompletableFuture<>(); + if (!requestQueue.offer(new RequestQueueEntry(finalUrl, future))) { + future.completeExceptionally(new RejectedExecutionException("Maximum queue size exceeded.")); + } + return future; + } + + /** + * Get the AuthenticationStore from the wrapped client + * + * @return + */ + public AuthenticationStore getAuthenticationStore() { + return httpClient.getAuthenticationStore(); + } + + private void stopProcessJob() { + ScheduledFuture processJob = this.processJob; + if (processJob != null) { + processJob.cancel(false); + this.processJob = null; + } + } + + private void processQueue() { + RequestQueueEntry queueEntry = requestQueue.poll(); + if (queueEntry != null) { + queueEntry.future.complete(httpClient.newRequest(queueEntry.finalUrl)); + } + } + + private static class RequestQueueEntry { + public URI finalUrl; + public CompletableFuture future; + + public RequestQueueEntry(URI finalUrl, CompletableFuture future) { + this.finalUrl = finalUrl; + this.future = future; + } + } +} diff --git a/bundles/org.openhab.binding.http/src/main/java/org/openhab/binding/http/internal/http/RefreshingUrlCache.java b/bundles/org.openhab.binding.http/src/main/java/org/openhab/binding/http/internal/http/RefreshingUrlCache.java index 0a0a5a5de..f0ae9c9ec 100644 --- a/bundles/org.openhab.binding.http/src/main/java/org/openhab/binding/http/internal/http/RefreshingUrlCache.java +++ b/bundles/org.openhab.binding.http/src/main/java/org/openhab/binding/http/internal/http/RefreshingUrlCache.java @@ -18,19 +18,13 @@ import java.util.Date; import java.util.List; import java.util.Optional; import java.util.Set; -import java.util.concurrent.CompletableFuture; -import java.util.concurrent.ConcurrentHashMap; -import java.util.concurrent.ScheduledExecutorService; -import java.util.concurrent.ScheduledFuture; -import java.util.concurrent.TimeUnit; +import java.util.concurrent.*; import java.util.function.Consumer; import org.eclipse.jdt.annotation.NonNullByDefault; import org.eclipse.jdt.annotation.Nullable; -import org.eclipse.jetty.client.HttpClient; import org.eclipse.jetty.client.api.Authentication; import org.eclipse.jetty.client.api.AuthenticationStore; -import org.eclipse.jetty.client.api.Request; import org.openhab.binding.http.internal.Util; import org.openhab.binding.http.internal.config.HttpThingConfig; import org.slf4j.Logger; @@ -47,7 +41,7 @@ public class RefreshingUrlCache { private final Logger logger = LoggerFactory.getLogger(RefreshingUrlCache.class); private final String url; - private final HttpClient httpClient; + private final RateLimitedHttpClient httpClient; private final int timeout; private final int bufferSize; private final @Nullable String fallbackEncoding; @@ -57,7 +51,7 @@ public class RefreshingUrlCache { private final ScheduledFuture future; private @Nullable Content lastContent; - public RefreshingUrlCache(ScheduledExecutorService executor, HttpClient httpClient, String url, + public RefreshingUrlCache(ScheduledExecutorService executor, RateLimitedHttpClient httpClient, String url, HttpThingConfig thingConfig) { this.httpClient = httpClient; this.url = url; @@ -85,43 +79,53 @@ public class RefreshingUrlCache { URI finalUrl = new URI(String.format(this.url, new Date())); logger.trace("Requesting refresh (retry={}) from '{}' with timeout {}ms", isRetry, finalUrl, timeout); - Request request = httpClient.newRequest(finalUrl).timeout(timeout, TimeUnit.MILLISECONDS); - headers.forEach(header -> { - String[] keyValuePair = header.split("=", 2); - if (keyValuePair.length == 2) { - request.header(keyValuePair[0].trim(), keyValuePair[1].trim()); - } else { - logger.warn("Splitting header '{}' failed. No '=' was found. Ignoring", header); - } - }); + httpClient.newRequest(finalUrl).thenAccept(request -> { + request.timeout(timeout, TimeUnit.MILLISECONDS); - CompletableFuture<@Nullable Content> response = new CompletableFuture<>(); - response.exceptionally(e -> { - if (e instanceof HttpAuthException) { - if (isRetry) { - logger.warn("Retry after authentication failure failed again for '{}', failing here", - finalUrl); + headers.forEach(header -> { + String[] keyValuePair = header.split("=", 2); + if (keyValuePair.length == 2) { + request.header(keyValuePair[0].trim(), keyValuePair[1].trim()); } else { - AuthenticationStore authStore = httpClient.getAuthenticationStore(); - Authentication.Result authResult = authStore.findAuthenticationResult(finalUrl); - if (authResult != null) { - authStore.removeAuthenticationResult(authResult); - logger.debug("Cleared authentication result for '{}', retrying immediately", finalUrl); - refresh(true); + logger.warn("Splitting header '{}' failed. No '=' was found. Ignoring", header); + } + }); + + CompletableFuture<@Nullable Content> response = new CompletableFuture<>(); + response.exceptionally(e -> { + if (e instanceof HttpAuthException) { + if (isRetry) { + logger.warn("Retry after authentication failure failed again for '{}', failing here", + finalUrl); } else { - logger.warn("Could not find authentication result for '{}', failing here", finalUrl); + AuthenticationStore authStore = httpClient.getAuthenticationStore(); + Authentication.Result authResult = authStore.findAuthenticationResult(finalUrl); + if (authResult != null) { + authStore.removeAuthenticationResult(authResult); + logger.debug("Cleared authentication result for '{}', retrying immediately", finalUrl); + refresh(true); + } else { + logger.warn("Could not find authentication result for '{}', failing here", finalUrl); + } } } + return null; + }).thenAccept(this::processResult); + + if (logger.isTraceEnabled()) { + logger.trace("Sending to '{}': {}", finalUrl, Util.requestToLogString(request)); + } + + request.send(new HttpResponseListener(response, fallbackEncoding, bufferSize)); + }).exceptionally(e -> { + if (e instanceof CancellationException) { + logger.debug("Request to URL {} was cancelled by thing handler.", finalUrl); + } else { + logger.warn("Request to URL {} failed: {}", finalUrl, e.getMessage()); } return null; - }).thenAccept(this::processResult); - - if (logger.isTraceEnabled()) { - logger.trace("Sending to '{}': {}", finalUrl, Util.requestToLogString(request)); - } - - request.send(new HttpResponseListener(response, fallbackEncoding, bufferSize)); + }); } catch (IllegalArgumentException | URISyntaxException e) { logger.warn("Creating request for '{}' failed: {}", url, e.getMessage()); } diff --git a/bundles/org.openhab.binding.http/src/main/java/org/openhab/binding/http/internal/transform/CascadedValueTransformationImpl.java b/bundles/org.openhab.binding.http/src/main/java/org/openhab/binding/http/internal/transform/CascadedValueTransformationImpl.java index a163c9fdc..e5e8c9c02 100644 --- a/bundles/org.openhab.binding.http/src/main/java/org/openhab/binding/http/internal/transform/CascadedValueTransformationImpl.java +++ b/bundles/org.openhab.binding.http/src/main/java/org/openhab/binding/http/internal/transform/CascadedValueTransformationImpl.java @@ -21,6 +21,8 @@ import java.util.stream.Collectors; import org.eclipse.jdt.annotation.NonNullByDefault; import org.eclipse.jdt.annotation.Nullable; import org.openhab.core.transform.TransformationService; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; /** * The {@link CascadedValueTransformationImpl} implements {@link ValueTransformation for a cascaded set of @@ -30,13 +32,21 @@ import org.openhab.core.transform.TransformationService; */ @NonNullByDefault public class CascadedValueTransformationImpl implements ValueTransformation { + private final Logger logger = LoggerFactory.getLogger(CascadedValueTransformationImpl.class); private final List transformations; public CascadedValueTransformationImpl(String transformationString, Function transformationServiceSupplier) { - transformations = Arrays.stream(transformationString.split("∩")).filter(s -> !s.isEmpty()) - .map(transformation -> new SingleValueTransformation(transformation, transformationServiceSupplier)) - .collect(Collectors.toList()); + List transformations; + try { + transformations = Arrays.stream(transformationString.split("∩")).filter(s -> !s.isEmpty()) + .map(transformation -> new SingleValueTransformation(transformation, transformationServiceSupplier)) + .collect(Collectors.toList()); + } catch (IllegalArgumentException e) { + transformations = List.of(NoOpValueTransformation.getInstance()); + logger.warn("Transformation ignore, failed to parse {}: {}", transformationString, e.getMessage()); + } + this.transformations = transformations; } @Override diff --git a/bundles/org.openhab.binding.http/src/main/resources/OH-INF/thing/thing-types.xml b/bundles/org.openhab.binding.http/src/main/resources/OH-INF/thing/thing-types.xml index b3907c83c..f298a53c5 100644 --- a/bundles/org.openhab.binding.http/src/main/resources/OH-INF/thing/thing-types.xml +++ b/bundles/org.openhab.binding.http/src/main/resources/OH-INF/thing/thing-types.xml @@ -25,6 +25,12 @@ The timeout in ms for each request 3000 + + + Delay between to requests + 0 + true + Size of the response buffer (default 2048 kB)