[mqtt] Avoid parallel streams with common thread pool to avoid deadlocks (#13621)
To mitigate issue https://github.com/openhab/openhab-core/issues/3125 (common thread pool exhaustion when combining parallel streams with synchronization or locks) Signed-off-by: Sami Salonen <ssalonen@gmail.com>
This commit is contained in:
parent
be6bfb0f9b
commit
923c0f1db2
|
@ -105,8 +105,11 @@ public abstract class AbstractMQTTThingHandler extends BaseThingHandler
|
||||||
* @return A future that completes normal on success and exceptionally on any errors.
|
* @return A future that completes normal on success and exceptionally on any errors.
|
||||||
*/
|
*/
|
||||||
protected CompletableFuture<@Nullable Void> start(MqttBrokerConnection connection) {
|
protected CompletableFuture<@Nullable Void> start(MqttBrokerConnection connection) {
|
||||||
return availabilityStates.values().parallelStream().map(cChannel -> cChannel.start(connection, scheduler, 0))
|
return availabilityStates.values().stream().map(cChannel -> {
|
||||||
.collect(FutureCollector.allOf());
|
final CompletableFuture<@Nullable Void> fut = cChannel == null ? CompletableFuture.completedFuture(null)
|
||||||
|
: cChannel.start(connection, scheduler, 0);
|
||||||
|
return fut;
|
||||||
|
}).collect(FutureCollector.allOf());
|
||||||
}
|
}
|
||||||
|
|
||||||
/**
|
/**
|
||||||
|
|
|
@ -149,7 +149,7 @@ public class DiscoverComponents implements MqttMessageSubscriber {
|
||||||
this.connectionRef = new WeakReference<>(connection);
|
this.connectionRef = new WeakReference<>(connection);
|
||||||
|
|
||||||
// Subscribe to the wildcard topic and start receive MQTT retained topics
|
// Subscribe to the wildcard topic and start receive MQTT retained topics
|
||||||
this.topics.parallelStream().map(t -> connection.subscribe(t, this)).collect(FutureCollector.allOf())
|
this.topics.stream().map(t -> connection.subscribe(t, this)).collect(FutureCollector.allOf())
|
||||||
.thenRun(this::subscribeSuccess).exceptionally(this::subscribeFail);
|
.thenRun(this::subscribeSuccess).exceptionally(this::subscribeFail);
|
||||||
|
|
||||||
return discoverFinishedFuture;
|
return discoverFinishedFuture;
|
||||||
|
@ -161,7 +161,7 @@ public class DiscoverComponents implements MqttMessageSubscriber {
|
||||||
if (connection != null && discoverTime > 0) {
|
if (connection != null && discoverTime > 0) {
|
||||||
this.stopDiscoveryFuture = scheduler.schedule(() -> {
|
this.stopDiscoveryFuture = scheduler.schedule(() -> {
|
||||||
this.stopDiscoveryFuture = null;
|
this.stopDiscoveryFuture = null;
|
||||||
this.topics.parallelStream().forEach(t -> connection.unsubscribe(t, this));
|
this.topics.stream().forEach(t -> connection.unsubscribe(t, this));
|
||||||
this.discoveredListener = null;
|
this.discoveredListener = null;
|
||||||
discoverFinishedFuture.complete(null);
|
discoverFinishedFuture.complete(null);
|
||||||
}, discoverTime, TimeUnit.MILLISECONDS);
|
}, discoverTime, TimeUnit.MILLISECONDS);
|
||||||
|
@ -180,7 +180,7 @@ public class DiscoverComponents implements MqttMessageSubscriber {
|
||||||
this.discoveredListener = null;
|
this.discoveredListener = null;
|
||||||
final MqttBrokerConnection connection = connectionRef.get();
|
final MqttBrokerConnection connection = connectionRef.get();
|
||||||
if (connection != null) {
|
if (connection != null) {
|
||||||
this.topics.parallelStream().forEach(t -> connection.unsubscribe(t, this));
|
this.topics.stream().forEach(t -> connection.unsubscribe(t, this));
|
||||||
connectionRef.clear();
|
connectionRef.clear();
|
||||||
}
|
}
|
||||||
discoverFinishedFuture.completeExceptionally(e);
|
discoverFinishedFuture.completeExceptionally(e);
|
||||||
|
|
|
@ -120,7 +120,7 @@ public abstract class AbstractComponent<C extends AbstractChannelConfiguration>
|
||||||
*/
|
*/
|
||||||
public CompletableFuture<@Nullable Void> start(MqttBrokerConnection connection, ScheduledExecutorService scheduler,
|
public CompletableFuture<@Nullable Void> start(MqttBrokerConnection connection, ScheduledExecutorService scheduler,
|
||||||
int timeout) {
|
int timeout) {
|
||||||
return channels.values().parallelStream().map(cChannel -> cChannel.start(connection, scheduler, timeout))
|
return channels.values().stream().map(cChannel -> cChannel.start(connection, scheduler, timeout))
|
||||||
.collect(FutureCollector.allOf());
|
.collect(FutureCollector.allOf());
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -131,7 +131,7 @@ public abstract class AbstractComponent<C extends AbstractChannelConfiguration>
|
||||||
* exceptionally on errors.
|
* exceptionally on errors.
|
||||||
*/
|
*/
|
||||||
public CompletableFuture<@Nullable Void> stop() {
|
public CompletableFuture<@Nullable Void> stop() {
|
||||||
return channels.values().parallelStream().map(ComponentChannel::stop).collect(FutureCollector.allOf());
|
return channels.values().stream().map(ComponentChannel::stop).collect(FutureCollector.allOf());
|
||||||
}
|
}
|
||||||
|
|
||||||
/**
|
/**
|
||||||
|
|
|
@ -198,7 +198,7 @@ public class HomeAssistantThingHandler extends AbstractMQTTThingHandler
|
||||||
// Start all known components and channels within the components and put the Thing offline
|
// Start all known components and channels within the components and put the Thing offline
|
||||||
// if any subscribing failed ( == broker connection lost)
|
// if any subscribing failed ( == broker connection lost)
|
||||||
CompletableFuture<@Nullable Void> future = CompletableFuture.allOf(super.start(connection),
|
CompletableFuture<@Nullable Void> future = CompletableFuture.allOf(super.start(connection),
|
||||||
haComponents.values().parallelStream().map(e -> e.start(connection, scheduler, attributeReceiveTimeout))
|
haComponents.values().stream().map(e -> e.start(connection, scheduler, attributeReceiveTimeout))
|
||||||
.reduce(CompletableFuture.completedFuture(null), (a, v) -> a.thenCompose(b -> v)) // reduce to
|
.reduce(CompletableFuture.completedFuture(null), (a, v) -> a.thenCompose(b -> v)) // reduce to
|
||||||
// one
|
// one
|
||||||
.exceptionally(e -> {
|
.exceptionally(e -> {
|
||||||
|
@ -216,7 +216,7 @@ public class HomeAssistantThingHandler extends AbstractMQTTThingHandler
|
||||||
discoverComponents.stopDiscovery();
|
discoverComponents.stopDiscovery();
|
||||||
delayedProcessing.join();
|
delayedProcessing.join();
|
||||||
// haComponents does not need to be synchronised -> the discovery thread is disabled
|
// haComponents does not need to be synchronised -> the discovery thread is disabled
|
||||||
haComponents.values().parallelStream().map(AbstractComponent::stop) //
|
haComponents.values().stream().map(AbstractComponent::stop) //
|
||||||
// we need to join all the stops, otherwise they might not be done when start is called
|
// we need to join all the stops, otherwise they might not be done when start is called
|
||||||
.collect(FutureCollector.allOf()).join();
|
.collect(FutureCollector.allOf()).join();
|
||||||
|
|
||||||
|
|
Loading…
Reference in New Issue