[hue] Improve connection stability (#15477)
Signed-off-by: Andrew Fiddian-Green <software@whitebear.ch>
This commit is contained in:
parent
1cf57e7dfe
commit
7fb9efc885
|
@ -37,6 +37,9 @@ import java.util.concurrent.Future;
|
||||||
import java.util.concurrent.Semaphore;
|
import java.util.concurrent.Semaphore;
|
||||||
import java.util.concurrent.TimeUnit;
|
import java.util.concurrent.TimeUnit;
|
||||||
import java.util.concurrent.TimeoutException;
|
import java.util.concurrent.TimeoutException;
|
||||||
|
import java.util.concurrent.locks.Lock;
|
||||||
|
import java.util.concurrent.locks.ReadWriteLock;
|
||||||
|
import java.util.concurrent.locks.ReentrantReadWriteLock;
|
||||||
import java.util.stream.Collectors;
|
import java.util.stream.Collectors;
|
||||||
|
|
||||||
import javax.ws.rs.core.MediaType;
|
import javax.ws.rs.core.MediaType;
|
||||||
|
@ -55,6 +58,7 @@ import org.eclipse.jetty.http.HttpURI;
|
||||||
import org.eclipse.jetty.http.HttpVersion;
|
import org.eclipse.jetty.http.HttpVersion;
|
||||||
import org.eclipse.jetty.http.MetaData;
|
import org.eclipse.jetty.http.MetaData;
|
||||||
import org.eclipse.jetty.http.MetaData.Response;
|
import org.eclipse.jetty.http.MetaData.Response;
|
||||||
|
import org.eclipse.jetty.http2.ErrorCode;
|
||||||
import org.eclipse.jetty.http2.api.Session;
|
import org.eclipse.jetty.http2.api.Session;
|
||||||
import org.eclipse.jetty.http2.api.Stream;
|
import org.eclipse.jetty.http2.api.Stream;
|
||||||
import org.eclipse.jetty.http2.client.HTTP2Client;
|
import org.eclipse.jetty.http2.client.HTTP2Client;
|
||||||
|
@ -116,6 +120,7 @@ public class Clip2Bridge implements Closeable {
|
||||||
private abstract class BaseStreamListenerAdapter<T> extends Stream.Listener.Adapter {
|
private abstract class BaseStreamListenerAdapter<T> extends Stream.Listener.Adapter {
|
||||||
protected final CompletableFuture<T> completable = new CompletableFuture<T>();
|
protected final CompletableFuture<T> completable = new CompletableFuture<T>();
|
||||||
private String contentType = "UNDEFINED";
|
private String contentType = "UNDEFINED";
|
||||||
|
private int status;
|
||||||
|
|
||||||
protected T awaitResult() throws ExecutionException, InterruptedException, TimeoutException {
|
protected T awaitResult() throws ExecutionException, InterruptedException, TimeoutException {
|
||||||
return completable.get(TIMEOUT_SECONDS, TimeUnit.SECONDS);
|
return completable.get(TIMEOUT_SECONDS, TimeUnit.SECONDS);
|
||||||
|
@ -130,7 +135,22 @@ public class Clip2Bridge implements Closeable {
|
||||||
return contentType;
|
return contentType;
|
||||||
}
|
}
|
||||||
|
|
||||||
protected void handleHttp2Error(Http2Error error) {
|
/**
|
||||||
|
* Return the HTTP status code.
|
||||||
|
*
|
||||||
|
* @return status code e.g. 200
|
||||||
|
*/
|
||||||
|
protected int getStatus() {
|
||||||
|
return status;
|
||||||
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Handle an HTTP2 error.
|
||||||
|
*
|
||||||
|
* @param error the type of error.
|
||||||
|
* @param session the session on which the error occurred.
|
||||||
|
*/
|
||||||
|
protected void handleHttp2Error(Http2Error error, Session session) {
|
||||||
Http2Exception e = new Http2Exception(error);
|
Http2Exception e = new Http2Exception(error);
|
||||||
if (Http2Error.UNAUTHORIZED.equals(error)) {
|
if (Http2Error.UNAUTHORIZED.equals(error)) {
|
||||||
// for external error handling, abstract authorization errors into a separate exception
|
// for external error handling, abstract authorization errors into a separate exception
|
||||||
|
@ -138,7 +158,7 @@ public class Clip2Bridge implements Closeable {
|
||||||
} else {
|
} else {
|
||||||
completable.completeExceptionally(e);
|
completable.completeExceptionally(e);
|
||||||
}
|
}
|
||||||
fatalErrorDelayed(this, e);
|
fatalErrorDelayed(this, e, session);
|
||||||
}
|
}
|
||||||
|
|
||||||
/**
|
/**
|
||||||
|
@ -146,18 +166,19 @@ public class Clip2Bridge implements Closeable {
|
||||||
*/
|
*/
|
||||||
@Override
|
@Override
|
||||||
public void onHeaders(@Nullable Stream stream, @Nullable HeadersFrame frame) {
|
public void onHeaders(@Nullable Stream stream, @Nullable HeadersFrame frame) {
|
||||||
|
Objects.requireNonNull(stream);
|
||||||
Objects.requireNonNull(frame);
|
Objects.requireNonNull(frame);
|
||||||
MetaData metaData = frame.getMetaData();
|
MetaData metaData = frame.getMetaData();
|
||||||
if (metaData.isResponse()) {
|
if (metaData.isResponse()) {
|
||||||
Response responseMetaData = (Response) metaData;
|
Response responseMetaData = (Response) metaData;
|
||||||
int httpStatus = responseMetaData.getStatus();
|
contentType = responseMetaData.getFields().get(HttpHeader.CONTENT_TYPE).toLowerCase();
|
||||||
switch (httpStatus) {
|
status = responseMetaData.getStatus();
|
||||||
|
switch (status) {
|
||||||
case HttpStatus.UNAUTHORIZED_401:
|
case HttpStatus.UNAUTHORIZED_401:
|
||||||
case HttpStatus.FORBIDDEN_403:
|
case HttpStatus.FORBIDDEN_403:
|
||||||
handleHttp2Error(Http2Error.UNAUTHORIZED);
|
handleHttp2Error(Http2Error.UNAUTHORIZED, stream.getSession());
|
||||||
default:
|
default:
|
||||||
}
|
}
|
||||||
contentType = responseMetaData.getFields().get(HttpHeader.CONTENT_TYPE).toLowerCase();
|
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
@ -193,13 +214,15 @@ public class Clip2Bridge implements Closeable {
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
public boolean onIdleTimeout(@Nullable Stream stream, @Nullable Throwable x) {
|
public boolean onIdleTimeout(@Nullable Stream stream, @Nullable Throwable x) {
|
||||||
handleHttp2Error(Http2Error.IDLE);
|
Objects.requireNonNull(stream);
|
||||||
|
handleHttp2Error(Http2Error.IDLE, stream.getSession());
|
||||||
return true;
|
return true;
|
||||||
}
|
}
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
public void onTimeout(@Nullable Stream stream, @Nullable Throwable x) {
|
public void onTimeout(@Nullable Stream stream, @Nullable Throwable x) {
|
||||||
handleHttp2Error(Http2Error.TIMEOUT);
|
Objects.requireNonNull(stream);
|
||||||
|
handleHttp2Error(Http2Error.TIMEOUT, stream.getSession());
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -255,7 +278,8 @@ public class Clip2Bridge implements Closeable {
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
public void onClosed(@Nullable Stream stream) {
|
public void onClosed(@Nullable Stream stream) {
|
||||||
handleHttp2Error(Http2Error.CLOSED);
|
Objects.requireNonNull(stream);
|
||||||
|
handleHttp2Error(Http2Error.CLOSED, stream.getSession());
|
||||||
}
|
}
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
|
@ -300,7 +324,8 @@ public class Clip2Bridge implements Closeable {
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
public void onReset(@Nullable Stream stream, @Nullable ResetFrame frame) {
|
public void onReset(@Nullable Stream stream, @Nullable ResetFrame frame) {
|
||||||
handleHttp2Error(Http2Error.RESET);
|
Objects.requireNonNull(stream);
|
||||||
|
handleHttp2Error(Http2Error.RESET, stream.getSession());
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -349,17 +374,29 @@ public class Clip2Bridge implements Closeable {
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
public void onClose(@Nullable Session session, @Nullable GoAwayFrame frame) {
|
public void onClose(@Nullable Session session, @Nullable GoAwayFrame frame) {
|
||||||
fatalErrorDelayed(this, new Http2Exception(Http2Error.CLOSED));
|
Objects.requireNonNull(session);
|
||||||
|
fatalErrorDelayed(this, new Http2Exception(Http2Error.CLOSED), session);
|
||||||
}
|
}
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
public void onFailure(@Nullable Session session, @Nullable Throwable failure) {
|
public void onFailure(@Nullable Session session, @Nullable Throwable failure) {
|
||||||
fatalErrorDelayed(this, new Http2Exception(Http2Error.FAILURE));
|
Objects.requireNonNull(session);
|
||||||
|
fatalErrorDelayed(this, new Http2Exception(Http2Error.FAILURE), session);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* The Hue bridge uses the 'nginx' web server which sends HTTP2 GO_AWAY frames after a certain number (normally
|
||||||
|
* 999) of GET/PUT calls. This is normal behaviour so we just start a new thread to close and reopen the
|
||||||
|
* session.
|
||||||
|
*/
|
||||||
@Override
|
@Override
|
||||||
public void onGoAway(@Nullable Session session, @Nullable GoAwayFrame frame) {
|
public void onGoAway(@Nullable Session session, @Nullable GoAwayFrame frame) {
|
||||||
fatalErrorDelayed(this, new Http2Exception(Http2Error.GO_AWAY));
|
Objects.requireNonNull(session);
|
||||||
|
if (http2Session == session) {
|
||||||
|
Thread recreateThread = new Thread(() -> recreateSession());
|
||||||
|
Clip2Bridge.this.recreateThread = recreateThread;
|
||||||
|
recreateThread.start();
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
|
@ -369,15 +406,39 @@ public class Clip2Bridge implements Closeable {
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
public void onPing(@Nullable Session session, @Nullable PingFrame frame) {
|
public void onPing(@Nullable Session session, @Nullable PingFrame frame) {
|
||||||
checkAliveOk();
|
Objects.requireNonNull(session);
|
||||||
if (Objects.nonNull(session) && Objects.nonNull(frame) && !frame.isReply()) {
|
Objects.requireNonNull(frame);
|
||||||
session.ping(new PingFrame(true), Callback.NOOP);
|
if (http2Session == session) {
|
||||||
|
checkAliveOk();
|
||||||
|
if (!frame.isReply()) {
|
||||||
|
session.ping(new PingFrame(true), Callback.NOOP);
|
||||||
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
public void onReset(@Nullable Session session, @Nullable ResetFrame frame) {
|
public void onReset(@Nullable Session session, @Nullable ResetFrame frame) {
|
||||||
fatalErrorDelayed(this, new Http2Exception(Http2Error.RESET));
|
Objects.requireNonNull(session);
|
||||||
|
fatalErrorDelayed(this, new Http2Exception(Http2Error.RESET), session);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Synchronizer for accessing the HTTP2 session object. This method wraps the 'sessionUseCreateLock' ReadWriteLock
|
||||||
|
* so that GET/PUT methods can access the session on multiple concurrent threads via the 'read' access lock, yet are
|
||||||
|
* forced to wait if the session is being created via its single thread access 'write' lock.
|
||||||
|
*/
|
||||||
|
private class SessionSynchronizer implements AutoCloseable {
|
||||||
|
private final Optional<Lock> lockOptional;
|
||||||
|
|
||||||
|
SessionSynchronizer(boolean requireExclusiveAccess) throws InterruptedException {
|
||||||
|
Lock lock = requireExclusiveAccess ? sessionUseCreateLock.writeLock() : sessionUseCreateLock.readLock();
|
||||||
|
lockOptional = lock.tryLock(TIMEOUT_SECONDS, TimeUnit.SECONDS) ? Optional.of(lock) : Optional.empty();
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public void close() {
|
||||||
|
lockOptional.ifPresent(lock -> lock.unlock());
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -494,16 +555,19 @@ public class Clip2Bridge implements Closeable {
|
||||||
private final String applicationKey;
|
private final String applicationKey;
|
||||||
private final Clip2BridgeHandler bridgeHandler;
|
private final Clip2BridgeHandler bridgeHandler;
|
||||||
private final Gson jsonParser = new Gson();
|
private final Gson jsonParser = new Gson();
|
||||||
private final Semaphore streamMutex = new Semaphore(MAX_CONCURRENT_STREAMS, true);
|
private final Semaphore streamMutex = new Semaphore(MAX_CONCURRENT_STREAMS, true); // i.e. fair
|
||||||
|
private final ReadWriteLock sessionUseCreateLock = new ReentrantReadWriteLock(true); // i.e. fair
|
||||||
|
private final Map<Integer, Future<?>> fatalErrorTasks = new ConcurrentHashMap<>();
|
||||||
|
|
||||||
|
private boolean recreatingSession;
|
||||||
private boolean closing;
|
private boolean closing;
|
||||||
private State onlineState = State.CLOSED;
|
private State onlineState = State.CLOSED;
|
||||||
private Optional<Instant> lastRequestTime = Optional.empty();
|
private Optional<Instant> lastRequestTime = Optional.empty();
|
||||||
private Instant sessionExpireTime = Instant.MAX;
|
private Instant sessionExpireTime = Instant.MAX;
|
||||||
private @Nullable Session http2Session;
|
|
||||||
|
|
||||||
|
private @Nullable Session http2Session;
|
||||||
|
private @Nullable Thread recreateThread;
|
||||||
private @Nullable Future<?> checkAliveTask;
|
private @Nullable Future<?> checkAliveTask;
|
||||||
private Map<Integer, Future<?>> fatalErrorTasks = new ConcurrentHashMap<>();
|
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* Constructor.
|
* Constructor.
|
||||||
|
@ -521,11 +585,7 @@ public class Clip2Bridge implements Closeable {
|
||||||
http2Client = httpClientFactory.createHttp2Client("hue-clip2", httpClient.getSslContextFactory());
|
http2Client = httpClientFactory.createHttp2Client("hue-clip2", httpClient.getSslContextFactory());
|
||||||
http2Client.setConnectTimeout(Clip2Bridge.TIMEOUT_SECONDS * 1000);
|
http2Client.setConnectTimeout(Clip2Bridge.TIMEOUT_SECONDS * 1000);
|
||||||
http2Client.setIdleTimeout(-1);
|
http2Client.setIdleTimeout(-1);
|
||||||
try {
|
startHttp2Client();
|
||||||
http2Client.start();
|
|
||||||
} catch (Exception e) {
|
|
||||||
throw new ApiException("Error starting HTTP/2 client", e);
|
|
||||||
}
|
|
||||||
this.bridgeHandler = bridgeHandler;
|
this.bridgeHandler = bridgeHandler;
|
||||||
this.hostName = hostName;
|
this.hostName = hostName;
|
||||||
this.applicationKey = applicationKey;
|
this.applicationKey = applicationKey;
|
||||||
|
@ -557,9 +617,9 @@ public class Clip2Bridge implements Closeable {
|
||||||
Session session = http2Session;
|
Session session = http2Session;
|
||||||
if (Objects.nonNull(session)) {
|
if (Objects.nonNull(session)) {
|
||||||
session.ping(new PingFrame(false), Callback.NOOP);
|
session.ping(new PingFrame(false), Callback.NOOP);
|
||||||
}
|
if (Instant.now().isAfter(sessionExpireTime)) {
|
||||||
if (Instant.now().isAfter(sessionExpireTime)) {
|
fatalError(this, new Http2Exception(Http2Error.TIMEOUT), session.hashCode());
|
||||||
fatalError(this, new Http2Exception(Http2Error.TIMEOUT));
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -578,10 +638,14 @@ public class Clip2Bridge implements Closeable {
|
||||||
@Override
|
@Override
|
||||||
public void close() {
|
public void close() {
|
||||||
closing = true;
|
closing = true;
|
||||||
|
Thread recreateThread = this.recreateThread;
|
||||||
|
if (Objects.nonNull(recreateThread) && recreateThread.isAlive()) {
|
||||||
|
recreateThread.interrupt();
|
||||||
|
}
|
||||||
close2();
|
close2();
|
||||||
try {
|
try {
|
||||||
http2Client.stop();
|
stopHttp2Client();
|
||||||
} catch (Exception e) {
|
} catch (ApiException e) {
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -591,7 +655,7 @@ public class Clip2Bridge implements Closeable {
|
||||||
private void close2() {
|
private void close2() {
|
||||||
synchronized (this) {
|
synchronized (this) {
|
||||||
LOGGER.debug("close2()");
|
LOGGER.debug("close2()");
|
||||||
boolean notifyHandler = onlineState == State.ACTIVE && !closing;
|
boolean notifyHandler = onlineState == State.ACTIVE && !closing && !recreatingSession;
|
||||||
onlineState = State.CLOSED;
|
onlineState = State.CLOSED;
|
||||||
synchronized (fatalErrorTasks) {
|
synchronized (fatalErrorTasks) {
|
||||||
fatalErrorTasks.values().forEach(task -> cancelTask(task, true));
|
fatalErrorTasks.values().forEach(task -> cancelTask(task, true));
|
||||||
|
@ -599,6 +663,7 @@ public class Clip2Bridge implements Closeable {
|
||||||
}
|
}
|
||||||
cancelTask(checkAliveTask, true);
|
cancelTask(checkAliveTask, true);
|
||||||
checkAliveTask = null;
|
checkAliveTask = null;
|
||||||
|
closeEventStream();
|
||||||
closeSession();
|
closeSession();
|
||||||
if (notifyHandler) {
|
if (notifyHandler) {
|
||||||
bridgeHandler.onConnectionOffline();
|
bridgeHandler.onConnectionOffline();
|
||||||
|
@ -606,39 +671,74 @@ public class Clip2Bridge implements Closeable {
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Close the event stream(s) if necessary.
|
||||||
|
*/
|
||||||
|
private void closeEventStream() {
|
||||||
|
Session session = http2Session;
|
||||||
|
if (Objects.nonNull(session)) {
|
||||||
|
final int sessionId = session.hashCode();
|
||||||
|
session.getStreams().stream().filter(s -> Objects.nonNull(s.getAttribute(EVENT_STREAM_ID)) && !s.isReset())
|
||||||
|
.forEach(s -> {
|
||||||
|
int streamId = s.getId();
|
||||||
|
LOGGER.debug("closeEventStream() sessionId:{}, streamId:{}", sessionId, streamId);
|
||||||
|
s.reset(new ResetFrame(streamId, ErrorCode.CANCEL_STREAM_ERROR.code), Callback.NOOP);
|
||||||
|
});
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* Close the HTTP 2 session if necessary.
|
* Close the HTTP 2 session if necessary.
|
||||||
*/
|
*/
|
||||||
private void closeSession() {
|
private void closeSession() {
|
||||||
LOGGER.debug("closeSession()");
|
|
||||||
Session session = http2Session;
|
Session session = http2Session;
|
||||||
if (Objects.nonNull(session) && !session.isClosed()) {
|
if (Objects.nonNull(session)) {
|
||||||
session.close(0, null, Callback.NOOP);
|
LOGGER.debug("closeSession() sessionId:{}, openStreamCount:{}", session.hashCode(),
|
||||||
|
session.getStreams().size());
|
||||||
|
session.close(ErrorCode.NO_ERROR.code, "closeSession", Callback.NOOP);
|
||||||
}
|
}
|
||||||
http2Session = null;
|
http2Session = null;
|
||||||
}
|
}
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* Method that is called back in case of fatal stream or session events. Note: under normal operation, the Hue
|
* Close the given stream.
|
||||||
* Bridge sends a 'soft' GO_AWAY command every nine or ten hours, so we handle such soft errors by attempting to
|
*
|
||||||
* silently close and re-open the connection without notifying the handler of an actual 'hard' error.
|
* @param stream to be closed.
|
||||||
|
*/
|
||||||
|
private void closeStream(@Nullable Stream stream) {
|
||||||
|
if (Objects.nonNull(stream) && !stream.isReset()) {
|
||||||
|
stream.reset(new ResetFrame(stream.getId(), ErrorCode.NO_ERROR.code), Callback.NOOP);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Method that is called back in case of fatal stream or session events. The error is only processed if the
|
||||||
|
* connection is online, not in process of closing, and the identities of the current session and the session that
|
||||||
|
* caused the error are the same. In other words it ignores errors relating to expired sessions.
|
||||||
*
|
*
|
||||||
* @param listener the entity that caused this method to be called.
|
* @param listener the entity that caused this method to be called.
|
||||||
* @param cause the exception that caused the error.
|
* @param cause the type of exception that caused the error.
|
||||||
|
* @param sessionId the identity of the session on which the error occurred.
|
||||||
*/
|
*/
|
||||||
private synchronized void fatalError(Object listener, Http2Exception cause) {
|
private synchronized void fatalError(Object listener, Http2Exception cause, int sessionId) {
|
||||||
if (onlineState == State.CLOSED || closing) {
|
if (onlineState == State.CLOSED || closing) {
|
||||||
return;
|
return;
|
||||||
}
|
}
|
||||||
String causeId = listener.getClass().getSimpleName();
|
Session session = http2Session;
|
||||||
|
if (Objects.isNull(session) || session.hashCode() != sessionId) {
|
||||||
|
return;
|
||||||
|
}
|
||||||
|
String listenerId = listener.getClass().getSimpleName();
|
||||||
if (listener instanceof ContentStreamListenerAdapter) {
|
if (listener instanceof ContentStreamListenerAdapter) {
|
||||||
// on GET / PUT requests the caller handles errors and closes the stream; the session is still OK
|
// on GET / PUT requests the caller handles errors and closes the stream; the session is still OK
|
||||||
LOGGER.debug("fatalError() {} {} ignoring", causeId, cause.error);
|
LOGGER.debug("fatalError() listener:{}, sessionId:{}, error:{} => ignoring", listenerId, sessionId,
|
||||||
|
cause.error);
|
||||||
} else {
|
} else {
|
||||||
if (LOGGER.isDebugEnabled()) {
|
if (LOGGER.isDebugEnabled()) {
|
||||||
LOGGER.debug("fatalError() {} {} closing", causeId, cause.error, cause);
|
LOGGER.debug("fatalError() listener:{}, sessionId:{}, error:{} => closing", listenerId, sessionId,
|
||||||
|
cause.error, cause);
|
||||||
} else {
|
} else {
|
||||||
LOGGER.warn("Fatal error {} {} => closing session.", causeId, cause.error);
|
LOGGER.warn("Fatal error '{}' from '{}' => closing session.", cause.error, listenerId);
|
||||||
}
|
}
|
||||||
close2();
|
close2();
|
||||||
}
|
}
|
||||||
|
@ -649,13 +749,15 @@ public class Clip2Bridge implements Closeable {
|
||||||
* delay in order to prevent sequencing issues.
|
* delay in order to prevent sequencing issues.
|
||||||
*
|
*
|
||||||
* @param listener the entity that caused this method to be called.
|
* @param listener the entity that caused this method to be called.
|
||||||
* @param cause the exception that caused the error.
|
* @param cause the type of exception that caused the error.
|
||||||
|
* @param session the session on which the error occurred.
|
||||||
*/
|
*/
|
||||||
protected void fatalErrorDelayed(Object listener, Http2Exception cause) {
|
protected void fatalErrorDelayed(Object listener, Http2Exception cause, Session session) {
|
||||||
synchronized (fatalErrorTasks) {
|
synchronized (fatalErrorTasks) {
|
||||||
final int index = fatalErrorTasks.size();
|
final int index = fatalErrorTasks.size();
|
||||||
|
final int sessionId = session.hashCode();
|
||||||
fatalErrorTasks.put(index, bridgeHandler.getScheduler().schedule(() -> {
|
fatalErrorTasks.put(index, bridgeHandler.getScheduler().schedule(() -> {
|
||||||
fatalError(listener, cause);
|
fatalError(listener, cause, sessionId);
|
||||||
fatalErrorTasks.remove(index);
|
fatalErrorTasks.remove(index);
|
||||||
}, 1, TimeUnit.SECONDS));
|
}, 1, TimeUnit.SECONDS));
|
||||||
}
|
}
|
||||||
|
@ -676,14 +778,16 @@ public class Clip2Bridge implements Closeable {
|
||||||
* @throws InterruptedException
|
* @throws InterruptedException
|
||||||
*/
|
*/
|
||||||
public Resources getResources(ResourceReference reference) throws ApiException, InterruptedException {
|
public Resources getResources(ResourceReference reference) throws ApiException, InterruptedException {
|
||||||
if (onlineState == State.CLOSED) {
|
if (onlineState == State.CLOSED && !recreatingSession) {
|
||||||
throw new ApiException("getResources() offline");
|
throw new ApiException("Connection is closed");
|
||||||
}
|
}
|
||||||
return getResourcesImpl(reference);
|
return getResourcesImpl(reference);
|
||||||
}
|
}
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* Internal method to send an HTTP 2 GET request to the Hue Bridge and process its response.
|
* Internal method to send an HTTP 2 GET request to the Hue Bridge and process its response. Uses a Throttler to
|
||||||
|
* prevent too many concurrent calls, and to prevent too frequent calls on the Hue bridge server. Also uses a
|
||||||
|
* SessionSynchronizer to delay accessing the session while it is being recreated.
|
||||||
*
|
*
|
||||||
* @param reference the Reference class to get.
|
* @param reference the Reference class to get.
|
||||||
* @return a Resource object containing either a list of Resources or a list of Errors.
|
* @return a Resource object containing either a list of Resources or a list of Errors.
|
||||||
|
@ -693,11 +797,16 @@ public class Clip2Bridge implements Closeable {
|
||||||
*/
|
*/
|
||||||
private Resources getResourcesImpl(ResourceReference reference)
|
private Resources getResourcesImpl(ResourceReference reference)
|
||||||
throws HttpUnauthorizedException, ApiException, InterruptedException {
|
throws HttpUnauthorizedException, ApiException, InterruptedException {
|
||||||
Session session = http2Session;
|
// work around for issue #15468 (and similar)
|
||||||
if (Objects.isNull(session) || session.isClosed()) {
|
ResourceType resourceType = reference.getType();
|
||||||
throw new ApiException("HTTP 2 session is null or closed");
|
if (resourceType == ResourceType.ERROR) {
|
||||||
|
LOGGER.warn("Resource '{}' type '{}' unknown => GET aborted", reference.getId(), resourceType);
|
||||||
|
return new Resources();
|
||||||
}
|
}
|
||||||
try (Throttler throttler = new Throttler(1)) {
|
Stream stream = null;
|
||||||
|
try (Throttler throttler = new Throttler(1);
|
||||||
|
SessionSynchronizer sessionSynchronizer = new SessionSynchronizer(false)) {
|
||||||
|
Session session = getSession();
|
||||||
String url = getUrl(reference);
|
String url = getUrl(reference);
|
||||||
LOGGER.trace("GET {} HTTP/2", url);
|
LOGGER.trace("GET {} HTTP/2", url);
|
||||||
HeadersFrame headers = prepareHeaders(url, MediaType.APPLICATION_JSON);
|
HeadersFrame headers = prepareHeaders(url, MediaType.APPLICATION_JSON);
|
||||||
|
@ -705,11 +814,15 @@ public class Clip2Bridge implements Closeable {
|
||||||
ContentStreamListenerAdapter contentStreamListener = new ContentStreamListenerAdapter();
|
ContentStreamListenerAdapter contentStreamListener = new ContentStreamListenerAdapter();
|
||||||
session.newStream(headers, streamPromise, contentStreamListener);
|
session.newStream(headers, streamPromise, contentStreamListener);
|
||||||
// wait for stream to be opened
|
// wait for stream to be opened
|
||||||
Objects.requireNonNull(streamPromise.get(TIMEOUT_SECONDS, TimeUnit.SECONDS));
|
stream = Objects.requireNonNull(streamPromise.get(TIMEOUT_SECONDS, TimeUnit.SECONDS));
|
||||||
// wait for HTTP response contents
|
// wait for HTTP response contents
|
||||||
String contentJson = contentStreamListener.awaitResult();
|
String contentJson = contentStreamListener.awaitResult();
|
||||||
String contentType = contentStreamListener.getContentType();
|
String contentType = contentStreamListener.getContentType();
|
||||||
LOGGER.trace("HTTP/2 200 OK (Content-Type: {}) << {}", contentType, contentJson);
|
int status = contentStreamListener.getStatus();
|
||||||
|
LOGGER.trace("HTTP/2 {} (Content-Type: {}) << {}", status, contentType, contentJson);
|
||||||
|
if (status != HttpStatus.OK_200) {
|
||||||
|
throw new ApiException(String.format("Unexpected HTTP status '%d'", status));
|
||||||
|
}
|
||||||
if (!MediaType.APPLICATION_JSON.equals(contentType)) {
|
if (!MediaType.APPLICATION_JSON.equals(contentType)) {
|
||||||
throw new ApiException("Unexpected Content-Type: " + contentType);
|
throw new ApiException("Unexpected Content-Type: " + contentType);
|
||||||
}
|
}
|
||||||
|
@ -730,9 +843,25 @@ public class Clip2Bridge implements Closeable {
|
||||||
throw new ApiException("Error sending request", e);
|
throw new ApiException("Error sending request", e);
|
||||||
} catch (TimeoutException e) {
|
} catch (TimeoutException e) {
|
||||||
throw new ApiException("Error sending request", e);
|
throw new ApiException("Error sending request", e);
|
||||||
|
} finally {
|
||||||
|
closeStream(stream);
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Safe access to the session object.
|
||||||
|
*
|
||||||
|
* @return the session.
|
||||||
|
* @throws ApiException if session is null or closed.
|
||||||
|
*/
|
||||||
|
private Session getSession() throws ApiException {
|
||||||
|
Session session = http2Session;
|
||||||
|
if (Objects.isNull(session) || session.isClosed()) {
|
||||||
|
throw new ApiException("HTTP/2 session is null or closed");
|
||||||
|
}
|
||||||
|
return session;
|
||||||
|
}
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* Build a full path to a server end point, based on a Reference class instance. If the reference contains only
|
* Build a full path to a server end point, based on a Reference class instance. If the reference contains only
|
||||||
* a resource type, the method returns the end point url to get all resources of the given resource type. Whereas if
|
* a resource type, the method returns the end point url to get all resources of the given resource type. Whereas if
|
||||||
|
@ -756,7 +885,7 @@ public class Clip2Bridge implements Closeable {
|
||||||
* @param data the incoming (presumed to be JSON) text.
|
* @param data the incoming (presumed to be JSON) text.
|
||||||
*/
|
*/
|
||||||
protected void onEventData(String data) {
|
protected void onEventData(String data) {
|
||||||
if (onlineState != State.ACTIVE) {
|
if (onlineState != State.ACTIVE && !recreatingSession) {
|
||||||
return;
|
return;
|
||||||
}
|
}
|
||||||
if (LOGGER.isTraceEnabled()) {
|
if (LOGGER.isTraceEnabled()) {
|
||||||
|
@ -842,18 +971,14 @@ public class Clip2Bridge implements Closeable {
|
||||||
* @throws InterruptedException
|
* @throws InterruptedException
|
||||||
*/
|
*/
|
||||||
private void openEventStream() throws ApiException, InterruptedException {
|
private void openEventStream() throws ApiException, InterruptedException {
|
||||||
Session session = http2Session;
|
Session session = getSession();
|
||||||
if (Objects.isNull(session) || session.isClosed()) {
|
|
||||||
throw new ApiException("HTTP 2 session is null or closed");
|
|
||||||
}
|
|
||||||
if (session.getStreams().stream().anyMatch(stream -> Objects.nonNull(stream.getAttribute(EVENT_STREAM_ID)))) {
|
if (session.getStreams().stream().anyMatch(stream -> Objects.nonNull(stream.getAttribute(EVENT_STREAM_ID)))) {
|
||||||
return;
|
return;
|
||||||
}
|
}
|
||||||
LOGGER.debug("openEventStream()");
|
|
||||||
HeadersFrame headers = prepareHeaders(eventUrl, MediaType.SERVER_SENT_EVENTS);
|
|
||||||
LOGGER.trace("GET {} HTTP/2", eventUrl);
|
LOGGER.trace("GET {} HTTP/2", eventUrl);
|
||||||
Stream stream = null;
|
Stream stream = null;
|
||||||
try {
|
try {
|
||||||
|
HeadersFrame headers = prepareHeaders(eventUrl, MediaType.SERVER_SENT_EVENTS);
|
||||||
Completable<@Nullable Stream> streamPromise = new Completable<>();
|
Completable<@Nullable Stream> streamPromise = new Completable<>();
|
||||||
EventStreamListenerAdapter eventStreamListener = new EventStreamListenerAdapter();
|
EventStreamListenerAdapter eventStreamListener = new EventStreamListenerAdapter();
|
||||||
session.newStream(headers, streamPromise, eventStreamListener);
|
session.newStream(headers, streamPromise, eventStreamListener);
|
||||||
|
@ -863,9 +988,10 @@ public class Clip2Bridge implements Closeable {
|
||||||
stream.setAttribute(EVENT_STREAM_ID, session);
|
stream.setAttribute(EVENT_STREAM_ID, session);
|
||||||
// wait for "hi" from the bridge
|
// wait for "hi" from the bridge
|
||||||
eventStreamListener.awaitResult();
|
eventStreamListener.awaitResult();
|
||||||
|
LOGGER.debug("openEventStream() sessionId:{} streamId:{}", session.hashCode(), stream.getId());
|
||||||
} catch (ExecutionException | TimeoutException e) {
|
} catch (ExecutionException | TimeoutException e) {
|
||||||
if (Objects.nonNull(stream)) {
|
if (Objects.nonNull(stream) && !stream.isReset()) {
|
||||||
stream.reset(new ResetFrame(stream.getId(), 0), Callback.NOOP);
|
stream.reset(new ResetFrame(stream.getId(), ErrorCode.HTTP_CONNECT_ERROR.code), Callback.NOOP);
|
||||||
}
|
}
|
||||||
throw new ApiException("Error opening event stream", e);
|
throw new ApiException("Error opening event stream", e);
|
||||||
}
|
}
|
||||||
|
@ -898,17 +1024,18 @@ public class Clip2Bridge implements Closeable {
|
||||||
if (Objects.nonNull(session) && !session.isClosed()) {
|
if (Objects.nonNull(session) && !session.isClosed()) {
|
||||||
return;
|
return;
|
||||||
}
|
}
|
||||||
LOGGER.debug("openSession()");
|
|
||||||
InetSocketAddress address = new InetSocketAddress(hostName, 443);
|
|
||||||
try {
|
try {
|
||||||
|
InetSocketAddress address = new InetSocketAddress(hostName, 443);
|
||||||
SessionListenerAdapter sessionListener = new SessionListenerAdapter();
|
SessionListenerAdapter sessionListener = new SessionListenerAdapter();
|
||||||
Completable<@Nullable Session> sessionPromise = new Completable<>();
|
Completable<@Nullable Session> sessionPromise = new Completable<>();
|
||||||
http2Client.connect(http2Client.getBean(SslContextFactory.class), address, sessionListener, sessionPromise);
|
http2Client.connect(http2Client.getBean(SslContextFactory.class), address, sessionListener, sessionPromise);
|
||||||
// wait for the (SSL) session to be opened
|
// wait for the (SSL) session to be opened
|
||||||
http2Session = Objects.requireNonNull(sessionPromise.get(TIMEOUT_SECONDS, TimeUnit.SECONDS));
|
session = Objects.requireNonNull(sessionPromise.get(TIMEOUT_SECONDS, TimeUnit.SECONDS));
|
||||||
|
LOGGER.debug("openSession() sessionId:{}", session.hashCode());
|
||||||
|
http2Session = session;
|
||||||
checkAliveOk(); // initialise the session timeout window
|
checkAliveOk(); // initialise the session timeout window
|
||||||
} catch (ExecutionException | TimeoutException e) {
|
} catch (ExecutionException | TimeoutException e) {
|
||||||
throw new ApiException("Error opening HTTP 2 session", e);
|
throw new ApiException("Error opening HTTP/2 session", e);
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -949,23 +1076,19 @@ public class Clip2Bridge implements Closeable {
|
||||||
}
|
}
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* Use an HTTP/2 PUT command to send a resource to the server. Note: the Hue Bridge can get confused by parallel
|
* Use an HTTP/2 PUT command to send a resource to the server. Uses a Throttler to prevent too many concurrent
|
||||||
* overlapping PUT resp. GET commands which cause it to respond with an HTML error page. So this method acquires all
|
* calls, and to prevent too frequent calls on the Hue bridge server. Also uses a SessionSynchronizer to delay
|
||||||
* of the stream access permits (given by MAX_CONCURRENT_STREAMS) in order to prevent such overlaps.
|
* accessing the session while it is being recreated.
|
||||||
*
|
*
|
||||||
* @param resource the resource to put.
|
* @param resource the resource to put.
|
||||||
* @throws ApiException if something fails.
|
* @throws ApiException if something fails.
|
||||||
* @throws InterruptedException
|
* @throws InterruptedException
|
||||||
*/
|
*/
|
||||||
public void putResource(Resource resource) throws ApiException, InterruptedException {
|
public void putResource(Resource resource) throws ApiException, InterruptedException {
|
||||||
if (onlineState == State.CLOSED) {
|
Stream stream = null;
|
||||||
return;
|
try (Throttler throttler = new Throttler(MAX_CONCURRENT_STREAMS);
|
||||||
}
|
SessionSynchronizer sessionSynchronizer = new SessionSynchronizer(false)) {
|
||||||
Session session = http2Session;
|
Session session = getSession();
|
||||||
if (Objects.isNull(session) || session.isClosed()) {
|
|
||||||
throw new ApiException("HTTP 2 session is null or closed");
|
|
||||||
}
|
|
||||||
try (Throttler throttler = new Throttler(MAX_CONCURRENT_STREAMS)) {
|
|
||||||
String requestJson = jsonParser.toJson(resource);
|
String requestJson = jsonParser.toJson(resource);
|
||||||
ByteBuffer requestBytes = ByteBuffer.wrap(requestJson.getBytes(StandardCharsets.UTF_8));
|
ByteBuffer requestBytes = ByteBuffer.wrap(requestJson.getBytes(StandardCharsets.UTF_8));
|
||||||
String url = getUrl(new ResourceReference().setId(resource.getId()).setType(resource.getType()));
|
String url = getUrl(new ResourceReference().setId(resource.getId()).setType(resource.getType()));
|
||||||
|
@ -976,12 +1099,16 @@ public class Clip2Bridge implements Closeable {
|
||||||
ContentStreamListenerAdapter contentStreamListener = new ContentStreamListenerAdapter();
|
ContentStreamListenerAdapter contentStreamListener = new ContentStreamListenerAdapter();
|
||||||
session.newStream(headers, streamPromise, contentStreamListener);
|
session.newStream(headers, streamPromise, contentStreamListener);
|
||||||
// wait for stream to be opened
|
// wait for stream to be opened
|
||||||
Stream stream = Objects.requireNonNull(streamPromise.get(TIMEOUT_SECONDS, TimeUnit.SECONDS));
|
stream = Objects.requireNonNull(streamPromise.get(TIMEOUT_SECONDS, TimeUnit.SECONDS));
|
||||||
stream.data(new DataFrame(stream.getId(), requestBytes, true), Callback.NOOP);
|
stream.data(new DataFrame(stream.getId(), requestBytes, true), Callback.NOOP);
|
||||||
// wait for HTTP response
|
// wait for HTTP response
|
||||||
String contentJson = contentStreamListener.awaitResult();
|
String contentJson = contentStreamListener.awaitResult();
|
||||||
String contentType = contentStreamListener.getContentType();
|
String contentType = contentStreamListener.getContentType();
|
||||||
LOGGER.trace("HTTP/2 200 OK (Content-Type: {}) << {}", contentType, contentJson);
|
int status = contentStreamListener.getStatus();
|
||||||
|
LOGGER.trace("HTTP/2 {} (Content-Type: {}) << {}", status, contentType, contentJson);
|
||||||
|
if (status != HttpStatus.OK_200) {
|
||||||
|
throw new ApiException(String.format("Unexpected HTTP status '%d'", status));
|
||||||
|
}
|
||||||
if (!MediaType.APPLICATION_JSON.equals(contentType)) {
|
if (!MediaType.APPLICATION_JSON.equals(contentType)) {
|
||||||
throw new ApiException("Unexpected Content-Type: " + contentType);
|
throw new ApiException("Unexpected Content-Type: " + contentType);
|
||||||
}
|
}
|
||||||
|
@ -995,7 +1122,41 @@ public class Clip2Bridge implements Closeable {
|
||||||
throw new ApiException("Parsing error", e);
|
throw new ApiException("Parsing error", e);
|
||||||
}
|
}
|
||||||
} catch (ExecutionException | TimeoutException e) {
|
} catch (ExecutionException | TimeoutException e) {
|
||||||
throw new ApiException("putResource() error sending request", e);
|
throw new ApiException("Error sending PUT request", e);
|
||||||
|
} finally {
|
||||||
|
closeStream(stream);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Close and re-open the session. Called when the server sends a GO_AWAY message. Acquires a SessionSynchronizer
|
||||||
|
* 'write' lock to ensure single thread access while the new session is being created. Therefore it waits for any
|
||||||
|
* already running GET/PUT method calls, which have a 'read' lock, to complete. And also causes any new GET/PUT
|
||||||
|
* method calls to wait until this method releases the 'write' lock again. Whereby such GET/PUT calls are postponed
|
||||||
|
* to the new session.
|
||||||
|
*/
|
||||||
|
private synchronized void recreateSession() {
|
||||||
|
try (SessionSynchronizer sessionSynchronizer = new SessionSynchronizer(true)) {
|
||||||
|
LOGGER.debug("recreateSession()");
|
||||||
|
recreatingSession = true;
|
||||||
|
State onlineState = this.onlineState;
|
||||||
|
close2();
|
||||||
|
stopHttp2Client();
|
||||||
|
//
|
||||||
|
startHttp2Client();
|
||||||
|
openPassive();
|
||||||
|
if (onlineState == State.ACTIVE) {
|
||||||
|
openActive();
|
||||||
|
}
|
||||||
|
} catch (ApiException | InterruptedException e) {
|
||||||
|
if (LOGGER.isDebugEnabled()) {
|
||||||
|
LOGGER.debug("recreateSession() exception", e);
|
||||||
|
} else {
|
||||||
|
LOGGER.warn("recreateSession() {}: {}", e.getClass().getSimpleName(), e.getMessage());
|
||||||
|
}
|
||||||
|
} finally {
|
||||||
|
recreatingSession = false;
|
||||||
|
LOGGER.debug("recreateSession() done");
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -1029,7 +1190,7 @@ public class Clip2Bridge implements Closeable {
|
||||||
json = contentResponse.getContentAsString().trim();
|
json = contentResponse.getContentAsString().trim();
|
||||||
LOGGER.trace("HTTP/1.1 {} {} << {}", httpStatus, contentResponse.getReason(), json);
|
LOGGER.trace("HTTP/1.1 {} {} << {}", httpStatus, contentResponse.getReason(), json);
|
||||||
if (httpStatus != HttpStatus.OK_200) {
|
if (httpStatus != HttpStatus.OK_200) {
|
||||||
throw new ApiException("HTTP bad response");
|
throw new ApiException(String.format("HTTP bad response '%d'", httpStatus));
|
||||||
}
|
}
|
||||||
try {
|
try {
|
||||||
List<SuccessResponse> entries = jsonParser.fromJson(json, SuccessResponse.GSON_TYPE);
|
List<SuccessResponse> entries = jsonParser.fromJson(json, SuccessResponse.GSON_TYPE);
|
||||||
|
@ -1049,6 +1210,22 @@ public class Clip2Bridge implements Closeable {
|
||||||
throw new HttpUnauthorizedException("Application key registration failed");
|
throw new HttpUnauthorizedException("Application key registration failed");
|
||||||
}
|
}
|
||||||
|
|
||||||
|
private void startHttp2Client() throws ApiException {
|
||||||
|
try {
|
||||||
|
http2Client.start();
|
||||||
|
} catch (Exception e) {
|
||||||
|
throw new ApiException("Error starting HTTP/2 client", e);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
private void stopHttp2Client() throws ApiException {
|
||||||
|
try {
|
||||||
|
http2Client.stop();
|
||||||
|
} catch (Exception e) {
|
||||||
|
throw new ApiException("Error stopping HTTP/2 client", e);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* Test the Hue Bridge connection state by attempting to connect and trying to execute a basic command that requires
|
* Test the Hue Bridge connection state by attempting to connect and trying to execute a basic command that requires
|
||||||
* authentication.
|
* authentication.
|
||||||
|
|
|
@ -66,12 +66,20 @@ public enum ResourceType {
|
||||||
// fall through
|
// fall through
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
return ERROR;
|
return ERROR.setUnknownTypeId(value);
|
||||||
|
}
|
||||||
|
|
||||||
|
private @Nullable String unknownTypeId;
|
||||||
|
|
||||||
|
private ResourceType setUnknownTypeId(@Nullable String value) {
|
||||||
|
unknownTypeId = value;
|
||||||
|
return this;
|
||||||
}
|
}
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
public String toString() {
|
public String toString() {
|
||||||
String s = this.name().replace("_", " ");
|
String s = this.name().replace("_", " ");
|
||||||
return s.substring(0, 1).toUpperCase() + s.substring(1).toLowerCase();
|
s = s.substring(0, 1).toUpperCase() + s.substring(1).toLowerCase();
|
||||||
|
return unknownTypeId == null ? s : s + String.format(" (%s)", unknownTypeId);
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
Loading…
Reference in New Issue