From c5a2882a160189bd83954aae59b979c62ba74da6 Mon Sep 17 00:00:00 2001 From: Dan Cunningham Date: Thu, 8 Apr 2021 13:12:44 -0700 Subject: [PATCH] [openhab-cloud] Fixes Jetty upgrade issue (#10487) This removes the ResponseListener class and replaces it with lambda functions for the various Jetty response listeners. This seems to fix our duplicate onContent issue in #10470 , although i'm not at all clear why. Fixes #10470 Signed-off-by: Dan Cunningham --- .../io/openhabcloud/internal/CloudClient.java | 197 +++++++----------- 1 file changed, 81 insertions(+), 116 deletions(-) diff --git a/bundles/org.openhab.io.openhabcloud/src/main/java/org/openhab/io/openhabcloud/internal/CloudClient.java b/bundles/org.openhab.io.openhabcloud/src/main/java/org/openhab/io/openhabcloud/internal/CloudClient.java index 91ba21da1..47ce5cb54 100644 --- a/bundles/org.openhab.io.openhabcloud/src/main/java/org/openhab/io/openhabcloud/internal/CloudClient.java +++ b/bundles/org.openhab.io.openhabcloud/src/main/java/org/openhab/io/openhabcloud/internal/CloudClient.java @@ -18,22 +18,16 @@ import java.net.URI; import java.net.URISyntaxException; import java.net.URL; import java.net.URLEncoder; -import java.nio.ByteBuffer; +import java.nio.charset.StandardCharsets; import java.util.Iterator; import java.util.List; import java.util.Map; import java.util.Set; import java.util.concurrent.ConcurrentHashMap; -import java.util.concurrent.TimeUnit; import org.eclipse.jdt.annotation.Nullable; import org.eclipse.jetty.client.HttpClient; import org.eclipse.jetty.client.api.Request; -import org.eclipse.jetty.client.api.Request.FailureListener; -import org.eclipse.jetty.client.api.Response; -import org.eclipse.jetty.client.api.Response.ContentListener; -import org.eclipse.jetty.client.api.Response.HeadersListener; -import org.eclipse.jetty.client.api.Result; import org.eclipse.jetty.client.util.BytesContentProvider; import org.eclipse.jetty.http.HttpField; import org.eclipse.jetty.http.HttpFields; @@ -44,7 +38,6 @@ import org.eclipse.jetty.util.URIUtil; import org.json.JSONException; import org.json.JSONObject; import org.openhab.core.OpenHAB; -import org.openhab.core.common.ThreadPoolManager; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -282,15 +275,19 @@ public class CloudClient { logger.debug("Got request {}", requestId); // Get request path String requestPath = data.getString("path"); + logger.debug("Path {}", requestPath); // Get request method String requestMethod = data.getString("method"); - // Get request body - String requestBody = data.getString("body"); + logger.debug("Method {}", requestMethod); // Get JSONObject for request headers JSONObject requestHeadersJson = data.getJSONObject("headers"); - logger.debug("{}", requestHeadersJson.toString()); + logger.debug("Headers: {}", requestHeadersJson.toString()); + // Get request body + String requestBody = data.getString("body"); + logger.trace("Body {}", requestBody); // Get JSONObject for request query parameters JSONObject requestQueryJson = data.getJSONObject("query"); + logger.debug("Query {}", requestQueryJson.toString()); // Create URI builder with base request URI of openHAB and path from request String newPath = URIUtil.addPaths(localBaseUrl, requestPath); Iterator queryIterator = requestQueryJson.keys(); @@ -332,8 +329,71 @@ public class CloudClient { logger.warn("Unsupported request method {}", requestMethod); return; } - ResponseListener listener = new ResponseListener(requestId); - request.onResponseHeaders(listener).onResponseContent(listener).onRequestFailure(listener).send(listener); + + request.onResponseHeaders(response -> { + logger.debug("onHeaders {}", requestId); + JSONObject responseJson = new JSONObject(); + try { + responseJson.put("id", requestId); + responseJson.put("headers", getJSONHeaders(response.getHeaders())); + responseJson.put("responseStatusCode", response.getStatus()); + responseJson.put("responseStatusText", "OK"); + socket.emit("responseHeader", responseJson); + logger.trace("Sent headers to request {}", requestId); + logger.trace("{}", responseJson.toString()); + } catch (JSONException e) { + logger.debug("{}", e.getMessage()); + } + }).onResponseContent((theResponse, content) -> { + logger.debug("onResponseContent: {}, content size {}", requestId, String.valueOf(content.remaining())); + JSONObject responseJson = new JSONObject(); + try { + responseJson.put("id", requestId); + responseJson.put("body", BufferUtil.toArray(content)); + if (logger.isTraceEnabled()) { + logger.trace("{}", StandardCharsets.UTF_8.decode(content).toString()); + } + socket.emit("responseContentBinary", responseJson); + logger.trace("Sent content to request {}", requestId); + } catch (JSONException e) { + logger.debug("{}", e.getMessage()); + } + }).onRequestFailure((origRequest, failure) -> { + logger.debug("onRequestFailure: {}, {}", requestId, failure.getMessage()); + JSONObject responseJson = new JSONObject(); + try { + responseJson.put("id", requestId); + responseJson.put("responseStatusText", "openHAB connection error: " + failure.getMessage()); + socket.emit("responseError", responseJson); + } catch (JSONException e) { + logger.debug("{}", e.getMessage()); + } + }).send(result -> { + logger.debug("onComplete: {}", requestId); + // Remove this request from list of running requests + runningRequests.remove(requestId); + if ((result != null && result.isFailed()) + && (result.getResponse() != null && result.getResponse().getStatus() != HttpStatus.OK_200)) { + if (result.getFailure() != null) { + logger.debug("Jetty request {} failed: {}", requestId, result.getFailure().getMessage()); + } + if (result.getRequestFailure() != null) { + logger.debug("Request Failure: {}", result.getRequestFailure().getMessage()); + } + if (result.getResponseFailure() != null) { + logger.debug("Response Failure: {}", result.getResponseFailure().getMessage()); + } + } + JSONObject responseJson = new JSONObject(); + try { + responseJson.put("id", requestId); + socket.emit("responseFinished", responseJson); + logger.debug("Finished responding to request {}", requestId); + } catch (JSONException e) { + logger.debug("{}", e.getMessage()); + } + }); + // If successfully submitted request to http client, add it to the list of currently // running requests to be able to cancel it if needed runningRequests.put(requestId, request); @@ -512,110 +572,15 @@ public class CloudClient { this.listener = listener; } - /* - * An internal class which forwards response headers and data back to the openHAB Cloud - */ - private class ResponseListener - implements Response.CompleteListener, HeadersListener, ContentListener, FailureListener { - - private static final String THREADPOOL_OPENHABCLOUD = "openhabcloud"; - private int mRequestId; - private boolean mHeadersSent = false; - - public ResponseListener(int requestId) { - mRequestId = requestId; - } - - private JSONObject getJSONHeaders(HttpFields httpFields) { - JSONObject headersJSON = new JSONObject(); - try { - for (HttpField field : httpFields) { - headersJSON.put(field.getName(), field.getValue()); - } - } catch (JSONException e) { - logger.warn("Error forming response headers: {}", e.getMessage()); - } - return headersJSON; - } - - @Override - public void onComplete(Result result) { - // Remove this request from list of running requests - runningRequests.remove(mRequestId); - if ((result != null && result.isFailed()) - && (result.getResponse() != null && result.getResponse().getStatus() != HttpStatus.OK_200)) { - if (result.getFailure() != null) { - logger.warn("Jetty request {} failed: {}", mRequestId, result.getFailure().getMessage()); - } - if (result.getRequestFailure() != null) { - logger.warn("Request Failure: {}", result.getRequestFailure().getMessage()); - } - if (result.getResponseFailure() != null) { - logger.warn("Response Failure: {}", result.getResponseFailure().getMessage()); - } - } - - /** - * What is this? In some cases where latency is very low the myopenhab service - * can receive responseFinished before the headers or content are received and I - * cannot find another workaround to prevent it. - */ - ThreadPoolManager.getScheduledPool(THREADPOOL_OPENHABCLOUD).schedule(() -> { - JSONObject responseJson = new JSONObject(); - try { - responseJson.put("id", mRequestId); - socket.emit("responseFinished", responseJson); - logger.debug("Finished responding to request {}", mRequestId); - } catch (JSONException e) { - logger.debug("{}", e.getMessage()); - } - }, 1, TimeUnit.MILLISECONDS); - } - - @Override - public synchronized void onFailure(Request request, Throwable failure) { - JSONObject responseJson = new JSONObject(); - try { - responseJson.put("id", mRequestId); - responseJson.put("responseStatusText", "openHAB connection error: " + failure.getMessage()); - socket.emit("responseError", responseJson); - } catch (JSONException e) { - logger.debug("{}", e.getMessage()); - } - } - - @Override - public void onContent(Response response, ByteBuffer content) { - logger.debug("Jetty received response content of size {}", String.valueOf(content.remaining())); - JSONObject responseJson = new JSONObject(); - try { - responseJson.put("id", mRequestId); - responseJson.put("body", BufferUtil.toArray(content)); - socket.emit("responseContentBinary", responseJson); - logger.debug("Sent content to request {}", mRequestId); - } catch (JSONException e) { - logger.debug("{}", e.getMessage()); - } - } - - @Override - public void onHeaders(Response response) { - if (!mHeadersSent) { - logger.debug("Jetty finished receiving response header"); - JSONObject responseJson = new JSONObject(); - mHeadersSent = true; - try { - responseJson.put("id", mRequestId); - responseJson.put("headers", getJSONHeaders(response.getHeaders())); - responseJson.put("responseStatusCode", response.getStatus()); - responseJson.put("responseStatusText", "OK"); - socket.emit("responseHeader", responseJson); - logger.debug("Sent headers to request {}", mRequestId); - logger.debug("{}", responseJson.toString()); - } catch (JSONException e) { - logger.debug("{}", e.getMessage()); - } + private JSONObject getJSONHeaders(HttpFields httpFields) { + JSONObject headersJSON = new JSONObject(); + try { + for (HttpField field : httpFields) { + headersJSON.put(field.getName(), field.getValue()); } + } catch (JSONException e) { + logger.warn("Error forming response headers: {}", e.getMessage()); } + return headersJSON; } }