From 923c0f1db2a2921701753b47db764fa93476919a Mon Sep 17 00:00:00 2001 From: Sami Salonen Date: Tue, 6 Dec 2022 14:02:45 +0200 Subject: [PATCH] [mqtt] Avoid parallel streams with common thread pool to avoid deadlocks (#13621) To mitigate issue https://github.com/openhab/openhab-core/issues/3125 (common thread pool exhaustion when combining parallel streams with synchronization or locks) Signed-off-by: Sami Salonen --- .../binding/mqtt/generic/AbstractMQTTThingHandler.java | 7 +++++-- .../mqtt/homeassistant/internal/DiscoverComponents.java | 6 +++--- .../internal/component/AbstractComponent.java | 4 ++-- .../internal/handler/HomeAssistantThingHandler.java | 4 ++-- 4 files changed, 12 insertions(+), 9 deletions(-) diff --git a/bundles/org.openhab.binding.mqtt.generic/src/main/java/org/openhab/binding/mqtt/generic/AbstractMQTTThingHandler.java b/bundles/org.openhab.binding.mqtt.generic/src/main/java/org/openhab/binding/mqtt/generic/AbstractMQTTThingHandler.java index 9f7cc39be..898463ef4 100644 --- a/bundles/org.openhab.binding.mqtt.generic/src/main/java/org/openhab/binding/mqtt/generic/AbstractMQTTThingHandler.java +++ b/bundles/org.openhab.binding.mqtt.generic/src/main/java/org/openhab/binding/mqtt/generic/AbstractMQTTThingHandler.java @@ -105,8 +105,11 @@ public abstract class AbstractMQTTThingHandler extends BaseThingHandler * @return A future that completes normal on success and exceptionally on any errors. */ protected CompletableFuture<@Nullable Void> start(MqttBrokerConnection connection) { - return availabilityStates.values().parallelStream().map(cChannel -> cChannel.start(connection, scheduler, 0)) - .collect(FutureCollector.allOf()); + return availabilityStates.values().stream().map(cChannel -> { + final CompletableFuture<@Nullable Void> fut = cChannel == null ? CompletableFuture.completedFuture(null) + : cChannel.start(connection, scheduler, 0); + return fut; + }).collect(FutureCollector.allOf()); } /** diff --git a/bundles/org.openhab.binding.mqtt.homeassistant/src/main/java/org/openhab/binding/mqtt/homeassistant/internal/DiscoverComponents.java b/bundles/org.openhab.binding.mqtt.homeassistant/src/main/java/org/openhab/binding/mqtt/homeassistant/internal/DiscoverComponents.java index 1e2077491..3c4bb42d6 100644 --- a/bundles/org.openhab.binding.mqtt.homeassistant/src/main/java/org/openhab/binding/mqtt/homeassistant/internal/DiscoverComponents.java +++ b/bundles/org.openhab.binding.mqtt.homeassistant/src/main/java/org/openhab/binding/mqtt/homeassistant/internal/DiscoverComponents.java @@ -149,7 +149,7 @@ public class DiscoverComponents implements MqttMessageSubscriber { this.connectionRef = new WeakReference<>(connection); // Subscribe to the wildcard topic and start receive MQTT retained topics - this.topics.parallelStream().map(t -> connection.subscribe(t, this)).collect(FutureCollector.allOf()) + this.topics.stream().map(t -> connection.subscribe(t, this)).collect(FutureCollector.allOf()) .thenRun(this::subscribeSuccess).exceptionally(this::subscribeFail); return discoverFinishedFuture; @@ -161,7 +161,7 @@ public class DiscoverComponents implements MqttMessageSubscriber { if (connection != null && discoverTime > 0) { this.stopDiscoveryFuture = scheduler.schedule(() -> { this.stopDiscoveryFuture = null; - this.topics.parallelStream().forEach(t -> connection.unsubscribe(t, this)); + this.topics.stream().forEach(t -> connection.unsubscribe(t, this)); this.discoveredListener = null; discoverFinishedFuture.complete(null); }, discoverTime, TimeUnit.MILLISECONDS); @@ -180,7 +180,7 @@ public class DiscoverComponents implements MqttMessageSubscriber { this.discoveredListener = null; final MqttBrokerConnection connection = connectionRef.get(); if (connection != null) { - this.topics.parallelStream().forEach(t -> connection.unsubscribe(t, this)); + this.topics.stream().forEach(t -> connection.unsubscribe(t, this)); connectionRef.clear(); } discoverFinishedFuture.completeExceptionally(e); diff --git a/bundles/org.openhab.binding.mqtt.homeassistant/src/main/java/org/openhab/binding/mqtt/homeassistant/internal/component/AbstractComponent.java b/bundles/org.openhab.binding.mqtt.homeassistant/src/main/java/org/openhab/binding/mqtt/homeassistant/internal/component/AbstractComponent.java index e5d896339..c286693e9 100644 --- a/bundles/org.openhab.binding.mqtt.homeassistant/src/main/java/org/openhab/binding/mqtt/homeassistant/internal/component/AbstractComponent.java +++ b/bundles/org.openhab.binding.mqtt.homeassistant/src/main/java/org/openhab/binding/mqtt/homeassistant/internal/component/AbstractComponent.java @@ -120,7 +120,7 @@ public abstract class AbstractComponent */ public CompletableFuture<@Nullable Void> start(MqttBrokerConnection connection, ScheduledExecutorService scheduler, int timeout) { - return channels.values().parallelStream().map(cChannel -> cChannel.start(connection, scheduler, timeout)) + return channels.values().stream().map(cChannel -> cChannel.start(connection, scheduler, timeout)) .collect(FutureCollector.allOf()); } @@ -131,7 +131,7 @@ public abstract class AbstractComponent * exceptionally on errors. */ public CompletableFuture<@Nullable Void> stop() { - return channels.values().parallelStream().map(ComponentChannel::stop).collect(FutureCollector.allOf()); + return channels.values().stream().map(ComponentChannel::stop).collect(FutureCollector.allOf()); } /** diff --git a/bundles/org.openhab.binding.mqtt.homeassistant/src/main/java/org/openhab/binding/mqtt/homeassistant/internal/handler/HomeAssistantThingHandler.java b/bundles/org.openhab.binding.mqtt.homeassistant/src/main/java/org/openhab/binding/mqtt/homeassistant/internal/handler/HomeAssistantThingHandler.java index 89f560ab2..820d2b02e 100644 --- a/bundles/org.openhab.binding.mqtt.homeassistant/src/main/java/org/openhab/binding/mqtt/homeassistant/internal/handler/HomeAssistantThingHandler.java +++ b/bundles/org.openhab.binding.mqtt.homeassistant/src/main/java/org/openhab/binding/mqtt/homeassistant/internal/handler/HomeAssistantThingHandler.java @@ -198,7 +198,7 @@ public class HomeAssistantThingHandler extends AbstractMQTTThingHandler // Start all known components and channels within the components and put the Thing offline // if any subscribing failed ( == broker connection lost) CompletableFuture<@Nullable Void> future = CompletableFuture.allOf(super.start(connection), - haComponents.values().parallelStream().map(e -> e.start(connection, scheduler, attributeReceiveTimeout)) + haComponents.values().stream().map(e -> e.start(connection, scheduler, attributeReceiveTimeout)) .reduce(CompletableFuture.completedFuture(null), (a, v) -> a.thenCompose(b -> v)) // reduce to // one .exceptionally(e -> { @@ -216,7 +216,7 @@ public class HomeAssistantThingHandler extends AbstractMQTTThingHandler discoverComponents.stopDiscovery(); delayedProcessing.join(); // haComponents does not need to be synchronised -> the discovery thread is disabled - haComponents.values().parallelStream().map(AbstractComponent::stop) // + haComponents.values().stream().map(AbstractComponent::stop) // // we need to join all the stops, otherwise they might not be done when start is called .collect(FutureCollector.allOf()).join();