[openhab-cloud] Fixes Jetty upgrade issue (#10487)

This removes the ResponseListener class and replaces it with lambda functions for the various Jetty response listeners. This seems to fix our duplicate onContent issue in #10470 , although i'm not at all clear why.
Fixes #10470

Signed-off-by: Dan Cunningham <dan@digitaldan.com>
This commit is contained in:
Dan Cunningham 2021-04-08 13:12:44 -07:00 committed by GitHub
parent f166220068
commit c5a2882a16
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
1 changed files with 81 additions and 116 deletions

View File

@ -18,22 +18,16 @@ import java.net.URI;
import java.net.URISyntaxException;
import java.net.URL;
import java.net.URLEncoder;
import java.nio.ByteBuffer;
import java.nio.charset.StandardCharsets;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.TimeUnit;
import org.eclipse.jdt.annotation.Nullable;
import org.eclipse.jetty.client.HttpClient;
import org.eclipse.jetty.client.api.Request;
import org.eclipse.jetty.client.api.Request.FailureListener;
import org.eclipse.jetty.client.api.Response;
import org.eclipse.jetty.client.api.Response.ContentListener;
import org.eclipse.jetty.client.api.Response.HeadersListener;
import org.eclipse.jetty.client.api.Result;
import org.eclipse.jetty.client.util.BytesContentProvider;
import org.eclipse.jetty.http.HttpField;
import org.eclipse.jetty.http.HttpFields;
@ -44,7 +38,6 @@ import org.eclipse.jetty.util.URIUtil;
import org.json.JSONException;
import org.json.JSONObject;
import org.openhab.core.OpenHAB;
import org.openhab.core.common.ThreadPoolManager;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@ -282,15 +275,19 @@ public class CloudClient {
logger.debug("Got request {}", requestId);
// Get request path
String requestPath = data.getString("path");
logger.debug("Path {}", requestPath);
// Get request method
String requestMethod = data.getString("method");
// Get request body
String requestBody = data.getString("body");
logger.debug("Method {}", requestMethod);
// Get JSONObject for request headers
JSONObject requestHeadersJson = data.getJSONObject("headers");
logger.debug("{}", requestHeadersJson.toString());
logger.debug("Headers: {}", requestHeadersJson.toString());
// Get request body
String requestBody = data.getString("body");
logger.trace("Body {}", requestBody);
// Get JSONObject for request query parameters
JSONObject requestQueryJson = data.getJSONObject("query");
logger.debug("Query {}", requestQueryJson.toString());
// Create URI builder with base request URI of openHAB and path from request
String newPath = URIUtil.addPaths(localBaseUrl, requestPath);
Iterator<String> queryIterator = requestQueryJson.keys();
@ -332,8 +329,71 @@ public class CloudClient {
logger.warn("Unsupported request method {}", requestMethod);
return;
}
ResponseListener listener = new ResponseListener(requestId);
request.onResponseHeaders(listener).onResponseContent(listener).onRequestFailure(listener).send(listener);
request.onResponseHeaders(response -> {
logger.debug("onHeaders {}", requestId);
JSONObject responseJson = new JSONObject();
try {
responseJson.put("id", requestId);
responseJson.put("headers", getJSONHeaders(response.getHeaders()));
responseJson.put("responseStatusCode", response.getStatus());
responseJson.put("responseStatusText", "OK");
socket.emit("responseHeader", responseJson);
logger.trace("Sent headers to request {}", requestId);
logger.trace("{}", responseJson.toString());
} catch (JSONException e) {
logger.debug("{}", e.getMessage());
}
}).onResponseContent((theResponse, content) -> {
logger.debug("onResponseContent: {}, content size {}", requestId, String.valueOf(content.remaining()));
JSONObject responseJson = new JSONObject();
try {
responseJson.put("id", requestId);
responseJson.put("body", BufferUtil.toArray(content));
if (logger.isTraceEnabled()) {
logger.trace("{}", StandardCharsets.UTF_8.decode(content).toString());
}
socket.emit("responseContentBinary", responseJson);
logger.trace("Sent content to request {}", requestId);
} catch (JSONException e) {
logger.debug("{}", e.getMessage());
}
}).onRequestFailure((origRequest, failure) -> {
logger.debug("onRequestFailure: {}, {}", requestId, failure.getMessage());
JSONObject responseJson = new JSONObject();
try {
responseJson.put("id", requestId);
responseJson.put("responseStatusText", "openHAB connection error: " + failure.getMessage());
socket.emit("responseError", responseJson);
} catch (JSONException e) {
logger.debug("{}", e.getMessage());
}
}).send(result -> {
logger.debug("onComplete: {}", requestId);
// Remove this request from list of running requests
runningRequests.remove(requestId);
if ((result != null && result.isFailed())
&& (result.getResponse() != null && result.getResponse().getStatus() != HttpStatus.OK_200)) {
if (result.getFailure() != null) {
logger.debug("Jetty request {} failed: {}", requestId, result.getFailure().getMessage());
}
if (result.getRequestFailure() != null) {
logger.debug("Request Failure: {}", result.getRequestFailure().getMessage());
}
if (result.getResponseFailure() != null) {
logger.debug("Response Failure: {}", result.getResponseFailure().getMessage());
}
}
JSONObject responseJson = new JSONObject();
try {
responseJson.put("id", requestId);
socket.emit("responseFinished", responseJson);
logger.debug("Finished responding to request {}", requestId);
} catch (JSONException e) {
logger.debug("{}", e.getMessage());
}
});
// If successfully submitted request to http client, add it to the list of currently
// running requests to be able to cancel it if needed
runningRequests.put(requestId, request);
@ -512,110 +572,15 @@ public class CloudClient {
this.listener = listener;
}
/*
* An internal class which forwards response headers and data back to the openHAB Cloud
*/
private class ResponseListener
implements Response.CompleteListener, HeadersListener, ContentListener, FailureListener {
private static final String THREADPOOL_OPENHABCLOUD = "openhabcloud";
private int mRequestId;
private boolean mHeadersSent = false;
public ResponseListener(int requestId) {
mRequestId = requestId;
}
private JSONObject getJSONHeaders(HttpFields httpFields) {
JSONObject headersJSON = new JSONObject();
try {
for (HttpField field : httpFields) {
headersJSON.put(field.getName(), field.getValue());
}
} catch (JSONException e) {
logger.warn("Error forming response headers: {}", e.getMessage());
}
return headersJSON;
}
@Override
public void onComplete(Result result) {
// Remove this request from list of running requests
runningRequests.remove(mRequestId);
if ((result != null && result.isFailed())
&& (result.getResponse() != null && result.getResponse().getStatus() != HttpStatus.OK_200)) {
if (result.getFailure() != null) {
logger.warn("Jetty request {} failed: {}", mRequestId, result.getFailure().getMessage());
}
if (result.getRequestFailure() != null) {
logger.warn("Request Failure: {}", result.getRequestFailure().getMessage());
}
if (result.getResponseFailure() != null) {
logger.warn("Response Failure: {}", result.getResponseFailure().getMessage());
}
}
/**
* What is this? In some cases where latency is very low the myopenhab service
* can receive responseFinished before the headers or content are received and I
* cannot find another workaround to prevent it.
*/
ThreadPoolManager.getScheduledPool(THREADPOOL_OPENHABCLOUD).schedule(() -> {
JSONObject responseJson = new JSONObject();
try {
responseJson.put("id", mRequestId);
socket.emit("responseFinished", responseJson);
logger.debug("Finished responding to request {}", mRequestId);
} catch (JSONException e) {
logger.debug("{}", e.getMessage());
}
}, 1, TimeUnit.MILLISECONDS);
}
@Override
public synchronized void onFailure(Request request, Throwable failure) {
JSONObject responseJson = new JSONObject();
try {
responseJson.put("id", mRequestId);
responseJson.put("responseStatusText", "openHAB connection error: " + failure.getMessage());
socket.emit("responseError", responseJson);
} catch (JSONException e) {
logger.debug("{}", e.getMessage());
}
}
@Override
public void onContent(Response response, ByteBuffer content) {
logger.debug("Jetty received response content of size {}", String.valueOf(content.remaining()));
JSONObject responseJson = new JSONObject();
try {
responseJson.put("id", mRequestId);
responseJson.put("body", BufferUtil.toArray(content));
socket.emit("responseContentBinary", responseJson);
logger.debug("Sent content to request {}", mRequestId);
} catch (JSONException e) {
logger.debug("{}", e.getMessage());
}
}
@Override
public void onHeaders(Response response) {
if (!mHeadersSent) {
logger.debug("Jetty finished receiving response header");
JSONObject responseJson = new JSONObject();
mHeadersSent = true;
try {
responseJson.put("id", mRequestId);
responseJson.put("headers", getJSONHeaders(response.getHeaders()));
responseJson.put("responseStatusCode", response.getStatus());
responseJson.put("responseStatusText", "OK");
socket.emit("responseHeader", responseJson);
logger.debug("Sent headers to request {}", mRequestId);
logger.debug("{}", responseJson.toString());
} catch (JSONException e) {
logger.debug("{}", e.getMessage());
}
private JSONObject getJSONHeaders(HttpFields httpFields) {
JSONObject headersJSON = new JSONObject();
try {
for (HttpField field : httpFields) {
headersJSON.put(field.getName(), field.getValue());
}
} catch (JSONException e) {
logger.warn("Error forming response headers: {}", e.getMessage());
}
return headersJSON;
}
}