[bluetooth] Add some utility classes (#9064)

* Add some utility classes that will be used by other bluetooth bindings.
* Add handle field to BluetoothDescriptor

Signed-off-by: Connor Petty <mistercpp2000+gitsignoff@gmail.com>
This commit is contained in:
Connor Petty 2020-11-23 02:34:39 -08:00 committed by GitHub
parent 38876647ad
commit 0b163f655c
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
10 changed files with 612 additions and 78 deletions

View File

@ -396,7 +396,7 @@ public class BlueZBluetoothDevice extends BaseBluetoothDevice implements BlueZEv
for (BluetoothGattDescriptor dBusBlueZDescriptor : dBusBlueZCharacteristic.getGattDescriptors()) {
BluetoothDescriptor descriptor = new BluetoothDescriptor(characteristic,
UUID.fromString(dBusBlueZDescriptor.getUuid()));
UUID.fromString(dBusBlueZDescriptor.getUuid()), 0);
characteristic.addDescriptor(descriptor);
}
service.addCharacteristic(characteristic);

View File

@ -12,9 +12,7 @@
*/
package org.openhab.binding.bluetooth.bluez.internal;
import java.util.concurrent.Callable;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.Future;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
@ -22,6 +20,8 @@ import java.util.concurrent.atomic.AtomicInteger;
import org.eclipse.jdt.annotation.NonNullByDefault;
import org.eclipse.jdt.annotation.Nullable;
import org.freedesktop.dbus.exceptions.DBusException;
import org.openhab.binding.bluetooth.util.RetryException;
import org.openhab.binding.bluetooth.util.RetryFuture;
import org.openhab.core.common.ThreadPoolManager;
import org.osgi.service.component.annotations.Activate;
import org.osgi.service.component.annotations.Component;
@ -71,7 +71,7 @@ public class DeviceManagerFactory {
public void initialize() {
logger.debug("initializing DeviceManagerFactory");
var stage1 = this.deviceManagerFuture = callAsync(() -> {
var stage1 = this.deviceManagerFuture = RetryFuture.callWithRetry(() -> {
try {
// if this is the first call to the library, this call
// should throw an exception (that we are catching)
@ -83,12 +83,10 @@ public class DeviceManagerFactory {
}
}, scheduler);
stage1.thenCompose(devManager -> {
this.deviceManagerWrapperFuture = stage1.thenCompose(devManager -> {
// lambdas can't modify outside variables due to scoping, so instead we use an AtomicInteger.
AtomicInteger tryCount = new AtomicInteger();
// We need to set deviceManagerWrapperFuture here since we want to be able to cancel the underlying
// AsyncCompletableFuture instance
return this.deviceManagerWrapperFuture = callAsync(() -> {
return RetryFuture.callWithRetry(() -> {
int count = tryCount.incrementAndGet();
try {
logger.debug("Registering property handler attempt: {}", count);
@ -127,60 +125,4 @@ public class DeviceManagerFactory {
}
this.deviceManagerWrapperFuture = null;
}
private static <T> CompletableFuture<T> callAsync(Callable<T> callable, ScheduledExecutorService scheduler) {
return new AsyncCompletableFuture<>(callable, scheduler);
}
// this is a utility class that allows use of Callable with CompletableFutures in a way such that the
// async future is cancellable thru this CompletableFuture instance.
private static class AsyncCompletableFuture<T> extends CompletableFuture<T> implements Runnable {
private final Callable<T> callable;
private final ScheduledExecutorService scheduler;
private final Object futureLock = new Object();
private Future<?> future;
public AsyncCompletableFuture(Callable<T> callable, ScheduledExecutorService scheduler) {
this.callable = callable;
this.scheduler = scheduler;
future = scheduler.submit(this);
}
@Override
public boolean cancel(boolean mayInterruptIfRunning) {
synchronized (futureLock) {
future.cancel(mayInterruptIfRunning);
}
return super.cancel(mayInterruptIfRunning);
}
@Override
public void run() {
try {
complete(callable.call());
} catch (RetryException e) {
synchronized (futureLock) {
if (!future.isCancelled()) {
future = scheduler.schedule(this, e.delay, e.unit);
}
}
} catch (Exception e) {
completeExceptionally(e);
}
}
}
// this is a special exception to indicate to a AsyncCompletableFuture that the task needs to be retried.
private static class RetryException extends Exception {
private static final long serialVersionUID = 8512275408512109328L;
private long delay;
private TimeUnit unit;
public RetryException(long delay, TimeUnit unit) {
this.delay = delay;
this.unit = unit;
}
}
}

View File

@ -46,15 +46,21 @@ public class BluetoothBindingConstants {
public static final long BLUETOOTH_BASE_UUID = 0x800000805f9b34fbL;
// Bluetooth profile UUID definitions
public static final UUID PROFILE_GATT = UUID.fromString("00001801-0000-1000-8000-00805f9b34fb");
public static final UUID PROFILE_A2DP_SOURCE = UUID.fromString("0000110a-0000-1000-8000-00805f9b34fb");
public static final UUID PROFILE_A2DP_SINK = UUID.fromString("0000110b-0000-1000-8000-00805f9b34fb");
public static final UUID PROFILE_A2DP = UUID.fromString("0000110d-0000-1000-8000-00805f9b34fb");
public static final UUID PROFILE_AVRCP_REMOTE = UUID.fromString("0000110c-0000-1000-8000-00805f9b34fb");
public static final UUID PROFILE_CORDLESS_TELEPHONE = UUID.fromString("00001109-0000-1000-8000-00805f9b34fb");
public static final UUID PROFILE_DID_PNPINFO = UUID.fromString("00001200-0000-1000-8000-00805f9b34fb");
public static final UUID PROFILE_HEADSET = UUID.fromString("00001108-0000-1000-8000-00805f9b34fb");
public static final UUID PROFILE_HFP = UUID.fromString("0000111e-0000-1000-8000-00805f9b34fb");
public static final UUID PROFILE_HFP_AUDIOGATEWAY = UUID.fromString("0000111f-0000-1000-8000-00805f9b34fb");
public static UUID createBluetoothUUID(long uuid16) {
return new UUID((uuid16 << 32) | 0x1000, BluetoothBindingConstants.BLUETOOTH_BASE_UUID);
}
// Bluetooth profile UUID definitions
public static final UUID PROFILE_GATT = createBluetoothUUID(0x1801);
public static final UUID PROFILE_A2DP_SOURCE = createBluetoothUUID(0x110a);
public static final UUID PROFILE_A2DP_SINK = createBluetoothUUID(0x110b);
public static final UUID PROFILE_A2DP = createBluetoothUUID(0x110d);
public static final UUID PROFILE_AVRCP_REMOTE = createBluetoothUUID(0x110c);
public static final UUID PROFILE_CORDLESS_TELEPHONE = createBluetoothUUID(0x1109);
public static final UUID PROFILE_DID_PNPINFO = createBluetoothUUID(0x1200);
public static final UUID PROFILE_HEADSET = createBluetoothUUID(0x1108);
public static final UUID PROFILE_HFP = createBluetoothUUID(0x111e);
public static final UUID PROFILE_HFP_AUDIOGATEWAY = createBluetoothUUID(0x111f);
public static final UUID ATTR_CHARACTERISTIC_DECLARATION = createBluetoothUUID(0x2803);
}

View File

@ -672,7 +672,7 @@ public class BluetoothCharacteristic {
private UUID uuid;
private GattCharacteristic(long key) {
this.uuid = new UUID((key << 32) | 0x1000, BluetoothBindingConstants.BLUETOOTH_BASE_UUID);
this.uuid = BluetoothBindingConstants.createBluetoothUUID(key);
}
private static void initMapping() {

View File

@ -30,6 +30,7 @@ public class BluetoothDescriptor {
protected final BluetoothCharacteristic characteristic;
protected final UUID uuid;
protected final int handle;
protected byte[] value;
/**
@ -38,9 +39,10 @@ public class BluetoothDescriptor {
* @param characteristic the characteristic that this class describes
* @param uuid the uuid of the descriptor
*/
public BluetoothDescriptor(BluetoothCharacteristic characteristic, UUID uuid) {
public BluetoothDescriptor(BluetoothCharacteristic characteristic, UUID uuid, int handle) {
this.characteristic = characteristic;
this.uuid = uuid;
this.handle = handle;
}
/**
@ -70,6 +72,15 @@ public class BluetoothDescriptor {
return uuid;
}
/**
* Returns the handle for this descriptor
*
* @return the handle for the descriptor
*/
public int getHandle() {
return handle;
}
/**
* Returns the stored value for this descriptor. It doesn't read remote data.
*
@ -111,7 +122,7 @@ public class BluetoothDescriptor {
private final UUID uuid;
private GattDescriptor(long key) {
this.uuid = new UUID((key << 32) | 0x1000, BluetoothBindingConstants.BLUETOOTH_BASE_UUID);
this.uuid = BluetoothBindingConstants.createBluetoothUUID(key);
}
private static void initMapping() {

View File

@ -246,7 +246,7 @@ public class BluetoothService {
private UUID uuid;
private GattService(long key) {
this.uuid = new UUID((key << 32) | 0x1000, BluetoothBindingConstants.BLUETOOTH_BASE_UUID);
this.uuid = BluetoothBindingConstants.createBluetoothUUID(key);
}
private static void initMapping() {

View File

@ -0,0 +1,209 @@
/**
* Copyright (c) 2010-2020 Contributors to the openHAB project
*
* See the NOTICE file(s) distributed with this work for additional
* information.
*
* This program and the accompanying materials are made available under the
* terms of the Eclipse Public License 2.0 which is available at
* http://www.eclipse.org/legal/epl-2.0
*
* SPDX-License-Identifier: EPL-2.0
*/
package org.openhab.binding.bluetooth.util;
import java.util.concurrent.CancellationException;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CompletionStage;
import java.util.concurrent.Executor;
import java.util.concurrent.Future;
import java.util.function.Function;
import java.util.function.Supplier;
import org.eclipse.jdt.annotation.NonNullByDefault;
import org.eclipse.jdt.annotation.Nullable;
/**
* The {@code HeritableFuture} class extends {@link CompletableFuture} and adds the ability
* to cancel upstream CompletableFuture tasks. Normally when a CompletableFuture
* is cancelled only dependent futures cancel. This class will also cancel the parent
* HeritableFuture instances as well. All of the {@code CompletionStage} methods will
* return HeritableFuture children and thus by only maintaining a reference to the final future
* in the task chain it would be possible to cancel the entire chain by calling {@code cancel}.
* <p>
* Due to child futures now having a link to their parent futures, it is no longer possible
* for HeritableFuture to be garbage collected as upstream futures complete. It is highly
* advisable to only use a HeritableFuture for defining finite (preferably small) task trees. Do not use
* HeritableFuture in situations where you would endlessly append new tasks otherwise you will eventually
* cause an OutOfMemoryError.
*
* @author Connor Petty - Initial contribution
*
*/
@NonNullByDefault
public class HeritableFuture<T> extends CompletableFuture<T> {
protected final Object futureLock = new Object();
protected @Nullable Future<?> parentFuture;
public HeritableFuture() {
}
public HeritableFuture(Future<?> parent) {
this.parentFuture = parent;
}
/**
*
* {@inheritDoc}
*
* @implSpec
* This implementation returns a new HeritableFuture instance that uses
* the current instance as a parent. Cancellation of the child will result in
* cancellation of the parent.
*/
@Override
public <U> CompletableFuture<U> newIncompleteFuture() {
return new HeritableFuture<>(this);
}
protected void setParentFuture(Supplier<@Nullable Future<?>> futureSupplier) {
synchronized (futureLock) {
var future = futureSupplier.get();
if (future != this) {
if (isCancelled() && future != null) {
future.cancel(true);
} else {
parentFuture = future;
}
}
}
}
/**
*
* {@inheritDoc}
*
* @implSpec
* This implementation cancels this future first, then cancels the parent future.
*/
@Override
public boolean cancel(boolean mayInterruptIfRunning) {
if (completeExceptionally(new CancellationException())) {
synchronized (futureLock) {
var future = parentFuture;
parentFuture = null;
if (future != null) {
future.cancel(mayInterruptIfRunning);
}
}
return true;
}
return isCancelled();
}
/**
*
* {@inheritDoc}
*
* @implSpec
* This implementation will treat the future returned by the function as a parent future.
*/
@Override
@NonNullByDefault({}) // the generics here don't play well with the null checker
public <U> CompletableFuture<U> thenCompose(Function<? super T, ? extends CompletionStage<U>> fn) {
return new ComposeFunctionWrapper<>(fn, false, null).returnedFuture;
}
/**
*
* {@inheritDoc}
*
* @implSpec
* This implementation will treat the future returned by the function as a parent future.
*/
@Override
@NonNullByDefault({}) // the generics here don't play well with the null checker
public <U> CompletableFuture<U> thenComposeAsync(Function<? super T, ? extends CompletionStage<U>> fn) {
return new ComposeFunctionWrapper<>(fn, true, null).returnedFuture;
}
/**
*
* {@inheritDoc}
*
* @implSpec
* This implementation will treat the future returned by the function as a parent future.
*/
@Override
@NonNullByDefault({}) // the generics here don't play well with the null checker
public <U> CompletableFuture<U> thenComposeAsync(Function<? super T, ? extends CompletionStage<U>> fn,
Executor executor) {
return new ComposeFunctionWrapper<>(fn, true, executor).returnedFuture;
}
/**
* This class is responsible for wrapping the supplied compose function.
* The instant the function returns the next CompletionStage, the parentFuture of the downstream HeritableFuture
* will be reassigned to the completion stage. This way cancellations of
* downstream futures will be able to reach the future returned by the supplied function.
*
* Most of the complexity going on in this class is due to the fact that the apply function might be
* called while calling `super.thenCompose`. This would happen if the current future is already complete
* since the next stage would be started immediately either on the current thread or asynchronously.
*
* @param <U> the type to be returned by the composed future
*/
private class ComposeFunctionWrapper<U> implements Function<T, CompletionStage<U>> {
private final Object fieldsLock = new Object();
private final Function<? super T, ? extends CompletionStage<U>> fn;
private @Nullable HeritableFuture<U> composedFuture;
private @Nullable CompletionStage<U> innerStage;
// The final composed future to be used by users of this wrapper class
final HeritableFuture<U> returnedFuture;
public ComposeFunctionWrapper(Function<? super T, ? extends CompletionStage<U>> fn, boolean async,
@Nullable Executor executor) {
this.fn = fn;
var f = (HeritableFuture<U>) thenCompose(async, executor);
synchronized (fieldsLock) {
this.composedFuture = f;
var stage = innerStage;
if (stage != null) {
// getting here means that the `apply` function was run before `composedFuture` was initialized.
f.setParentFuture(stage::toCompletableFuture);
}
}
this.returnedFuture = f;
}
private CompletableFuture<U> thenCompose(boolean async, @Nullable Executor executor) {
if (!async) {
return HeritableFuture.super.thenCompose(this);
}
if (executor == null) {
return HeritableFuture.super.thenComposeAsync(this);
}
return HeritableFuture.super.thenComposeAsync(this, executor);
}
@Override
public CompletionStage<U> apply(T t) {
CompletionStage<U> stage = fn.apply(t);
synchronized (fieldsLock) {
var f = composedFuture;
if (f == null) {
// We got here before the wrapper finished initializing, so that
// means that the enclosing future was already complete at the time `super.thenCompose` was called.
// In which case the best we can do is save this stage so that the constructor can finish the job.
innerStage = stage;
} else {
f.setParentFuture(stage::toCompletableFuture);
}
}
return stage;
}
}
}

View File

@ -0,0 +1,37 @@
/**
* Copyright (c) 2010-2020 Contributors to the openHAB project
*
* See the NOTICE file(s) distributed with this work for additional
* information.
*
* This program and the accompanying materials are made available under the
* terms of the Eclipse Public License 2.0 which is available at
* http://www.eclipse.org/legal/epl-2.0
*
* SPDX-License-Identifier: EPL-2.0
*/
package org.openhab.binding.bluetooth.util;
import java.util.concurrent.TimeUnit;
import org.eclipse.jdt.annotation.NonNullByDefault;
/**
* This is a special exception that can be thrown by Callable instances
* used by a RetryFuture.
*
* @author Connor Petty - Initial contribution
*
*/
@NonNullByDefault
public class RetryException extends Exception {
private static final long serialVersionUID = 8512275408512109328L;
final long delay;
final TimeUnit unit;
public RetryException(long delay, TimeUnit unit) {
this.delay = delay;
this.unit = unit;
}
}

View File

@ -0,0 +1,161 @@
/**
* Copyright (c) 2010-2020 Contributors to the openHAB project
*
* See the NOTICE file(s) distributed with this work for additional
* information.
*
* This program and the accompanying materials are made available under the
* terms of the Eclipse Public License 2.0 which is available at
* http://www.eclipse.org/legal/epl-2.0
*
* SPDX-License-Identifier: EPL-2.0
*/
package org.openhab.binding.bluetooth.util;
import java.util.concurrent.Callable;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CompletionException;
import java.util.concurrent.Executor;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import java.util.function.Supplier;
import org.eclipse.jdt.annotation.NonNullByDefault;
/**
* This is a utility class that allows adding {@link CompletableFuture} capabilities to a {@link Callable}.
* The provided callable will be executed asynchronously and the result will be used
* to complete the {@code RetryFuture} instance. As per its namesake, the RetryFuture allows
* the callable to reschedule itself by throwing a {@link RetryException}. Any other exception
* will simply complete the RetryFuture exceptionally as per {@link CompletableFuture#completeExceptionally(Throwable)}.
*
* @author Connor Petty - Initial contribution
*
*/
@NonNullByDefault
public class RetryFuture<T> extends HeritableFuture<T> {
private final ScheduledExecutorService scheduler;
public RetryFuture(Callable<T> callable, ScheduledExecutorService scheduler, long delay, TimeUnit unit) {
this.scheduler = scheduler;
setParentFuture(() -> scheduler.schedule(new CallableTask(callable), delay, unit));
}
public RetryFuture(Supplier<CompletableFuture<T>> supplier, ScheduledExecutorService scheduler, long delay,
TimeUnit unit) {
this.scheduler = scheduler;
setParentFuture(() -> scheduler.schedule(new ComposeTask(supplier), delay, unit));
}
@Override
public Executor defaultExecutor() {
return scheduler;
}
private class CallableTask implements Runnable {
private final Callable<T> callable;
public CallableTask(Callable<T> callable) {
this.callable = callable;
}
@Override
public void run() {
try {
complete(callable.call());
} catch (RetryException e) {
setParentFuture(() -> {
if (!isDone()) {
return scheduler.schedule(this, e.delay, e.unit);
}
return null;
});
} catch (Exception e) {
completeExceptionally(e);
}
}
}
private class ComposeTask implements Runnable {
private final Supplier<CompletableFuture<T>> supplier;
public ComposeTask(Supplier<CompletableFuture<T>> supplier) {
this.supplier = supplier;
}
@Override
public void run() {
CompletableFuture<T> future = supplier.get();
setParentFuture(() -> future);
future.whenComplete((result, th) -> {
if (th instanceof CompletionException) {
th = th.getCause();
}
if (th instanceof RetryException) {
RetryException e = (RetryException) th;
setParentFuture(() -> {
if (!isDone()) {
return scheduler.schedule(this, e.delay, e.unit);
}
return null;
});
} else if (th != null) {
completeExceptionally(th);
} else {
complete(result);
}
});
}
}
/**
* This is a convinience method for calling {@code new RetryFuture<>(callable, scheduler)}
*
* @param <T> the result type of the callable task.
* @param callable the task to execute
* @param scheduler the scheduler to use
* @return a CompletableFuture that will return the result of the callable.
*/
public static <T> CompletableFuture<T> callWithRetry(Callable<T> callable, ScheduledExecutorService scheduler) {
return new RetryFuture<>(callable, scheduler, 0, TimeUnit.NANOSECONDS);
}
public static <T> CompletableFuture<T> scheduleWithRetry(Callable<T> callable, ScheduledExecutorService scheduler,
long delay, TimeUnit unit) {
return new RetryFuture<>(callable, scheduler, delay, unit);
}
@SafeVarargs
public static <T> CompletableFuture<T> scheduleWithRetryForExceptions(Callable<T> callable,
ScheduledExecutorService scheduler, long initDelay, long retryDelay, TimeUnit unit,
Class<? extends Exception>... exceptions) {
Callable<T> task = () -> {
try {
return callable.call();
} catch (RetryException ex) {
throw ex;
} catch (Exception ex) {
for (Class<? extends Exception> exClass : exceptions) {
if (exClass.isInstance(ex)) {
throw new RetryException(retryDelay, unit);
}
}
throw ex;
}
};
return new RetryFuture<>(task, scheduler, initDelay, unit);
}
public static <T> CompletableFuture<T> composeWithRetry(Supplier<CompletableFuture<T>> supplier,
ScheduledExecutorService scheduler) {
return new RetryFuture<>(supplier, scheduler, 0, TimeUnit.NANOSECONDS);
}
public static <T> CompletableFuture<T> composeWithRetry(Supplier<CompletableFuture<T>> supplier,
ScheduledExecutorService scheduler, long initDelay, TimeUnit unit) {
return new RetryFuture<>(supplier, scheduler, initDelay, unit);
}
}

View File

@ -0,0 +1,168 @@
/**
* Copyright (c) 2010-2020 Contributors to the openHAB project
*
* See the NOTICE file(s) distributed with this work for additional
* information.
*
* This program and the accompanying materials are made available under the
* terms of the Eclipse Public License 2.0 which is available at
* http://www.eclipse.org/legal/epl-2.0
*
* SPDX-License-Identifier: EPL-2.0
*/
package org.openhab.binding.bluetooth.util;
import static org.junit.jupiter.api.Assertions.*;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.Future;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.ScheduledThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import java.util.concurrent.atomic.AtomicInteger;
import org.junit.jupiter.api.AfterEach;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;
import org.openhab.core.common.NamedThreadFactory;
/**
* @author Connor Petty - Initial contribution
*
*/
class RetryFutureTest {
private ScheduledExecutorService scheduler;
@BeforeEach
public void init() {
ScheduledThreadPoolExecutor scheduler = new ScheduledThreadPoolExecutor(1,
new NamedThreadFactory("RetryFutureTest", true));
scheduler.setExecuteExistingDelayedTasksAfterShutdownPolicy(false);
scheduler.setRemoveOnCancelPolicy(true);
this.scheduler = scheduler;
}
@AfterEach
public void cleanup() {
scheduler.shutdownNow();
}
@Test
void callWithRetryNormal() throws InterruptedException {
Future<String> retryFuture = RetryFuture.callWithRetry(() -> "test", scheduler);
try {
assertEquals("test", retryFuture.get(100, TimeUnit.MILLISECONDS));
} catch (InterruptedException | ExecutionException | TimeoutException e) {
fail(e);
}
}
@Test
void callWithRetry1() throws InterruptedException {
AtomicInteger visitCount = new AtomicInteger();
Future<String> retryFuture = RetryFuture.callWithRetry(() -> {
if (visitCount.getAndIncrement() == 0) {
throw new RetryException(0, TimeUnit.SECONDS);
}
return "test";
}, scheduler);
try {
assertEquals("test", retryFuture.get(100, TimeUnit.MILLISECONDS));
} catch (InterruptedException | ExecutionException | TimeoutException e) {
fail(e);
}
}
@Test
void composeWithRetryNormal() throws InterruptedException {
CompletableFuture<?> composedFuture = new CompletableFuture<>();
Future<?> retryFuture = RetryFuture.composeWithRetry(() -> {
composedFuture.complete(null);
return composedFuture;
}, scheduler);
try {
retryFuture.get(100, TimeUnit.MILLISECONDS);
} catch (InterruptedException | ExecutionException | TimeoutException e) {
fail(e);
}
assertTrue(composedFuture.isDone());
}
@Test
void composeWithRetryThrow() throws InterruptedException {
CompletableFuture<?> composedFuture = new CompletableFuture<>();
Future<?> retryFuture = RetryFuture.composeWithRetry(() -> {
composedFuture.completeExceptionally(new DummyException());
return composedFuture;
}, scheduler);
try {
retryFuture.get(100, TimeUnit.MILLISECONDS);
} catch (InterruptedException | TimeoutException e) {
fail(e);
} catch (ExecutionException ex) {
assertTrue(ex.getCause() instanceof DummyException);
}
assertTrue(composedFuture.isDone());
}
@Test
void composeWithRetry1() throws InterruptedException {
AtomicInteger visitCount = new AtomicInteger();
CompletableFuture<String> composedFuture = new CompletableFuture<>();
Future<String> retryFuture = RetryFuture.composeWithRetry(() -> {
if (visitCount.getAndIncrement() == 0) {
return CompletableFuture.failedFuture(new RetryException(0, TimeUnit.SECONDS));
}
composedFuture.complete("test");
return composedFuture;
}, scheduler);
try {
assertEquals("test", retryFuture.get(100, TimeUnit.MILLISECONDS));
} catch (InterruptedException | ExecutionException | TimeoutException e) {
fail(e);
}
assertEquals(2, visitCount.get());
assertTrue(composedFuture.isDone());
}
@Test
void composeWithRetry1Cancel() throws InterruptedException {
CountDownLatch latch = new CountDownLatch(1);
AtomicInteger visitCount = new AtomicInteger();
CompletableFuture<String> composedFuture = new CompletableFuture<>();
Future<String> retryFuture = RetryFuture.composeWithRetry(() -> {
if (visitCount.getAndIncrement() == 0) {
return CompletableFuture.failedFuture(new RetryException(0, TimeUnit.SECONDS));
}
latch.countDown();
return composedFuture;
}, scheduler);
try {
if (!latch.await(100, TimeUnit.MILLISECONDS)) {
fail("Timeout while waiting for latch");
}
Thread.sleep(1);
retryFuture.cancel(false);
assertTrue(composedFuture.isCancelled());
} catch (InterruptedException e) {
fail(e);
}
assertEquals(2, visitCount.get());
assertTrue(composedFuture.isDone());
}
private static class DummyException extends Exception {
}
}