From fee7f7e9c57704af8d3ebc8cfb810f31ecd48e9f Mon Sep 17 00:00:00 2001 From: GiviMAD Date: Fri, 18 Mar 2022 00:41:00 +0100 Subject: [PATCH] [pulseaudio] source: use thread safe collection and force reconnection (#12441) MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit * [pulseaudio] use thread safe collection * [pulseaudio] source: connect pipe before store ref * [pulseaudio] source: improve warning messages * [pulseaudio] fix IOException when closing all sources * [pulseaudio] prevent warning when InterruptedIOException on source close Signed-off-by: Miguel Álvarez Díez --- .../internal/PulseAudioAudioSource.java | 59 +++++++++++++++---- 1 file changed, 48 insertions(+), 11 deletions(-) diff --git a/bundles/org.openhab.binding.pulseaudio/src/main/java/org/openhab/binding/pulseaudio/internal/PulseAudioAudioSource.java b/bundles/org.openhab.binding.pulseaudio/src/main/java/org/openhab/binding/pulseaudio/internal/PulseAudioAudioSource.java index a44b2badc..04bc37636 100644 --- a/bundles/org.openhab.binding.pulseaudio/src/main/java/org/openhab/binding/pulseaudio/internal/PulseAudioAudioSource.java +++ b/bundles/org.openhab.binding.pulseaudio/src/main/java/org/openhab/binding/pulseaudio/internal/PulseAudioAudioSource.java @@ -14,11 +14,13 @@ package org.openhab.binding.pulseaudio.internal; import java.io.IOException; import java.io.InputStream; +import java.io.InterruptedIOException; import java.io.PipedInputStream; import java.io.PipedOutputStream; import java.net.Socket; import java.util.HashSet; import java.util.Set; +import java.util.concurrent.ConcurrentLinkedQueue; import java.util.concurrent.Future; import java.util.concurrent.ScheduledExecutorService; import java.util.function.Consumer; @@ -44,7 +46,7 @@ import org.slf4j.LoggerFactory; public class PulseAudioAudioSource extends PulseaudioSimpleProtocolStream implements AudioSource { private final Logger logger = LoggerFactory.getLogger(PulseAudioAudioSource.class); - private final Set pipeOutputs = new HashSet<>(); + private final ConcurrentLinkedQueue pipeOutputs = new ConcurrentLinkedQueue<>(); private final ScheduledExecutorService executor; private @Nullable Future pipeWriteTask; @@ -84,14 +86,14 @@ public class PulseAudioAudioSource extends PulseaudioSimpleProtocolStream implem } setIdle(true); var pipeOutput = new PipedOutputStream(); - registerPipe(pipeOutput); - var pipeInput = new PipedInputStream(pipeOutput, 1024 * 20) { + var pipeInput = new PipedInputStream(pipeOutput, 1024 * 10) { @Override public void close() throws IOException { unregisterPipe(pipeOutput); super.close(); } }; + registerPipe(pipeOutput); // get raw audio from the pulse audio socket return new PulseAudioStream(sourceFormat, pipeInput, (idle) -> { setIdle(idle); @@ -103,7 +105,7 @@ public class PulseAudioAudioSource extends PulseaudioSimpleProtocolStream implem } }); } catch (IOException e) { - disconnect(); // disconnect force to clear connection in case of socket not cleanly shutdown + disconnect(); // disconnect to force clear connection in case of socket not cleanly shutdown if (countAttempt == 2) { // we won't retry : log and quit String port = clientSocket != null ? Integer.toString(clientSocket.getPort()) : "unknown"; logger.warn( @@ -133,24 +135,46 @@ public class PulseAudioAudioSource extends PulseaudioSimpleProtocolStream implem startPipeWrite(); } - private void startPipeWrite() { - if (pipeWriteTask == null) { + private synchronized void startPipeWrite() { + if (this.pipeWriteTask == null) { this.pipeWriteTask = executor.submit(() -> { int lengthRead; byte[] buffer = new byte[1024]; + int readRetries = 3; while (!pipeOutputs.isEmpty()) { var stream = getSourceInputStream(); if (stream != null) { try { lengthRead = stream.read(buffer); + readRetries = 3; for (var output : pipeOutputs) { - output.write(buffer, 0, lengthRead); - output.flush(); + try { + output.write(buffer, 0, lengthRead); + if (pipeOutputs.contains(output)) { + output.flush(); + } + } catch (IOException e) { + if (e instanceof InterruptedIOException && pipeOutputs.isEmpty()) { + // task has been ended while writing + return; + } + logger.warn("IOException while writing to from pulse source pipe: {}", + getExceptionMessage(e)); + } catch (RuntimeException e) { + logger.warn("RuntimeException while writing to pulse source pipe: {}", + getExceptionMessage(e)); + } } } catch (IOException e) { - logger.warn("IOException while reading from pulse source: {}", e.getMessage()); + logger.warn("IOException while reading from pulse source: {}", getExceptionMessage(e)); + if (readRetries == 0) { + // force reconnection on persistent IOException + super.disconnect(); + } else { + readRetries--; + } } catch (RuntimeException e) { - logger.warn("RuntimeException while reading from pulse source: {}", e.getMessage()); + logger.warn("RuntimeException while reading from pulse source: {}", getExceptionMessage(e)); } } else { logger.warn("Unable to get source input stream"); @@ -163,6 +187,10 @@ public class PulseAudioAudioSource extends PulseaudioSimpleProtocolStream implem private synchronized void unregisterPipe(PipedOutputStream pipeOutput) { this.pipeOutputs.remove(pipeOutput); + try { + Thread.sleep(0); + } catch (InterruptedException ignored) { + } stopPipeWriteTask(); try { pipeOutput.close(); @@ -170,7 +198,7 @@ public class PulseAudioAudioSource extends PulseaudioSimpleProtocolStream implem } } - private void stopPipeWriteTask() { + private synchronized void stopPipeWriteTask() { var pipeWriteTask = this.pipeWriteTask; if (pipeOutputs.isEmpty() && pipeWriteTask != null) { pipeWriteTask.cancel(true); @@ -178,6 +206,15 @@ public class PulseAudioAudioSource extends PulseaudioSimpleProtocolStream implem } } + private @Nullable String getExceptionMessage(Exception e) { + String message = e.getMessage(); + var cause = e.getCause(); + if (message == null && cause != null) { + message = cause.getMessage(); + } + return message; + } + private @Nullable InputStream getSourceInputStream() { try { connectIfNeeded();