From 94a761f84eab95b70331e7047256bd7162eb1e65 Mon Sep 17 00:00:00 2001 From: Gwendal Roulleau Date: Thu, 6 Jul 2023 19:21:12 +0200 Subject: [PATCH] [pulseaudio] Make the process method asynchronous (#15179) * [pulseaudio] Make the process method asynchronous And use the new 'complete' system to signal core that the sound is fully played. --------- Signed-off-by: Gwendal Roulleau --- .../internal/ConvertedInputStream.java | 6 +- .../internal/PulseAudioAudioSink.java | 87 ++++++++++++++----- .../internal/PulseaudioHandlerFactory.java | 11 ++- .../internal/handler/PulseaudioHandler.java | 8 +- 4 files changed, 84 insertions(+), 28 deletions(-) diff --git a/bundles/org.openhab.binding.pulseaudio/src/main/java/org/openhab/binding/pulseaudio/internal/ConvertedInputStream.java b/bundles/org.openhab.binding.pulseaudio/src/main/java/org/openhab/binding/pulseaudio/internal/ConvertedInputStream.java index cdda35fa6..d96512562 100644 --- a/bundles/org.openhab.binding.pulseaudio/src/main/java/org/openhab/binding/pulseaudio/internal/ConvertedInputStream.java +++ b/bundles/org.openhab.binding.pulseaudio/src/main/java/org/openhab/binding/pulseaudio/internal/ConvertedInputStream.java @@ -28,7 +28,7 @@ import org.eclipse.jdt.annotation.NonNullByDefault; import org.eclipse.jdt.annotation.Nullable; import org.openhab.core.audio.AudioFormat; import org.openhab.core.audio.AudioStream; -import org.openhab.core.audio.FixedLengthAudioStream; +import org.openhab.core.audio.SizeableAudioStream; import org.openhab.core.audio.UnsupportedAudioFormatException; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -58,8 +58,8 @@ public class ConvertedInputStream extends InputStream { throws UnsupportedAudioFormatException, UnsupportedAudioFileException, IOException { this.audioFormat = innerInputStream.getFormat(); - if (innerInputStream instanceof FixedLengthAudioStream) { - length = ((FixedLengthAudioStream) innerInputStream).length(); + if (innerInputStream instanceof SizeableAudioStream sizeableAudioStream) { + length = sizeableAudioStream.length(); } pcmNormalizedInputStream = getPCMStreamNormalized(getPCMStream(new BufferedInputStream(innerInputStream))); diff --git a/bundles/org.openhab.binding.pulseaudio/src/main/java/org/openhab/binding/pulseaudio/internal/PulseAudioAudioSink.java b/bundles/org.openhab.binding.pulseaudio/src/main/java/org/openhab/binding/pulseaudio/internal/PulseAudioAudioSink.java index 077bb5881..b9bd298ca 100644 --- a/bundles/org.openhab.binding.pulseaudio/src/main/java/org/openhab/binding/pulseaudio/internal/PulseAudioAudioSink.java +++ b/bundles/org.openhab.binding.pulseaudio/src/main/java/org/openhab/binding/pulseaudio/internal/PulseAudioAudioSink.java @@ -16,9 +16,10 @@ import java.io.IOException; import java.net.Socket; import java.time.Duration; import java.time.Instant; -import java.util.HashSet; import java.util.Set; +import java.util.concurrent.CompletableFuture; import java.util.concurrent.ScheduledExecutorService; +import java.util.concurrent.TimeUnit; import javax.sound.sampled.UnsupportedAudioFileException; @@ -28,9 +29,11 @@ import org.openhab.binding.pulseaudio.internal.handler.PulseaudioHandler; import org.openhab.core.audio.AudioFormat; import org.openhab.core.audio.AudioSink; import org.openhab.core.audio.AudioStream; -import org.openhab.core.audio.FixedLengthAudioStream; +import org.openhab.core.audio.FileAudioStream; import org.openhab.core.audio.UnsupportedAudioFormatException; import org.openhab.core.audio.UnsupportedAudioStreamException; +import org.openhab.core.audio.utils.AudioSinkUtils; +import org.openhab.core.common.Disposable; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -47,24 +50,29 @@ public class PulseAudioAudioSink extends PulseaudioSimpleProtocolStream implemen private final Logger logger = LoggerFactory.getLogger(PulseAudioAudioSink.class); - private static final HashSet SUPPORTED_FORMATS = new HashSet<>(); - private static final HashSet> SUPPORTED_STREAMS = new HashSet<>(); + private AudioSinkUtils audioSinkUtils; - static { - SUPPORTED_FORMATS.add(AudioFormat.WAV); - SUPPORTED_FORMATS.add(AudioFormat.MP3); - SUPPORTED_STREAMS.add(FixedLengthAudioStream.class); - } + private static final Set SUPPORTED_FORMATS = Set.of(AudioFormat.WAV, AudioFormat.MP3); + private static final Set> SUPPORTED_STREAMS = Set.of(AudioStream.class); + private static final AudioFormat TARGET_FORMAT = new AudioFormat(AudioFormat.CONTAINER_WAVE, + AudioFormat.CODEC_PCM_SIGNED, false, 16, 4 * 44100, 44100L, 2); - public PulseAudioAudioSink(PulseaudioHandler pulseaudioHandler, ScheduledExecutorService scheduler) { + public PulseAudioAudioSink(PulseaudioHandler pulseaudioHandler, ScheduledExecutorService scheduler, + AudioSinkUtils audioSinkUtils) { super(pulseaudioHandler, scheduler); + this.audioSinkUtils = audioSinkUtils; } @Override public void process(@Nullable AudioStream audioStream) throws UnsupportedAudioFormatException, UnsupportedAudioStreamException { + processAndComplete(audioStream); + } + + @Override + public CompletableFuture<@Nullable Void> processAndComplete(@Nullable AudioStream audioStream) { if (audioStream == null) { - return; + return CompletableFuture.completedFuture(null); } addClientCount(); try (ConvertedInputStream normalizedPCMStream = new ConvertedInputStream(audioStream)) { @@ -75,18 +83,38 @@ public class PulseAudioAudioSink extends PulseaudioSimpleProtocolStream implemen if (clientSocketLocal != null) { // send raw audio to the socket and to pulse audio Instant start = Instant.now(); - normalizedPCMStream.transferTo(clientSocketLocal.getOutputStream()); - if (normalizedPCMStream.getDuration() != -1) { // ensure, if the sound has a duration + if (normalizedPCMStream.getDuration() != -1) { + // ensure, if the sound has a duration // that we let at least this time for the system to play + normalizedPCMStream.transferTo(clientSocketLocal.getOutputStream()); Instant end = Instant.now(); long millisSecondTimedToSendAudioData = Duration.between(start, end).toMillis(); if (millisSecondTimedToSendAudioData < normalizedPCMStream.getDuration()) { - long timeToSleep = normalizedPCMStream.getDuration() - millisSecondTimedToSendAudioData; - logger.debug("Sleep time to let the system play sound : {}", timeToSleep); - Thread.sleep(timeToSleep); + CompletableFuture<@Nullable Void> soundPlayed = new CompletableFuture<>(); + long timeToWait = normalizedPCMStream.getDuration() - millisSecondTimedToSendAudioData; + logger.debug("Some time to let the system play sound : {}", timeToWait); + scheduler.schedule(() -> soundPlayed.complete(null), timeToWait, TimeUnit.MILLISECONDS); + return soundPlayed; + } else { + return CompletableFuture.completedFuture(null); + } + } else { + // We have a second method available to guess the duration, and it is during transfer + Long timeStampEnd = audioSinkUtils.transferAndAnalyzeLength(normalizedPCMStream, + clientSocketLocal.getOutputStream(), TARGET_FORMAT); + CompletableFuture<@Nullable Void> soundPlayed = new CompletableFuture<>(); + if (timeStampEnd != null) { + long now = System.nanoTime(); + long timeToWait = timeStampEnd - now; + if (timeToWait > 0) { + scheduler.schedule(() -> soundPlayed.complete(null), timeToWait, + TimeUnit.NANOSECONDS); + } + return soundPlayed; + } else { + return CompletableFuture.completedFuture(null); } } - break; } } catch (IOException e) { disconnect(); // disconnect force to clear connection in case of socket not cleanly shutdown @@ -97,19 +125,34 @@ public class PulseAudioAudioSink extends PulseaudioSimpleProtocolStream implemen logger.warn( "Error while trying to send audio to pulseaudio audio sink. Cannot connect to {}:{}, error: {}", pulseaudioHandler.getHost(), port, e.getMessage()); - break; + return CompletableFuture.completedFuture(null); } } catch (InterruptedException ie) { logger.info("Interrupted during sink audio connection: {}", ie.getMessage()); - break; + return CompletableFuture.completedFuture(null); } } - } catch (UnsupportedAudioFileException | IOException e) { - throw new UnsupportedAudioFormatException("Cannot send sound to the pulseaudio sink", - audioStream.getFormat(), e); + } catch (UnsupportedAudioFileException | UnsupportedAudioFormatException | IOException e) { + return CompletableFuture.failedFuture(new UnsupportedAudioFormatException( + "Cannot send sound to the pulseaudio sink", audioStream.getFormat(), e)); } finally { minusClientCount(); + // if the stream is not needed anymore, then we should call back the AudioStream to let it a chance + // to auto dispose. + if (audioStream instanceof Disposable disposableAudioStream) { + try { + disposableAudioStream.dispose(); + } catch (IOException e) { + String fileName = audioStream instanceof FileAudioStream file ? file.toString() : "unknown"; + if (logger.isDebugEnabled()) { + logger.debug("Cannot dispose of stream {}", fileName, e); + } else { + logger.warn("Cannot dispose of stream {}, reason {}", fileName, e.getMessage()); + } + } + } } + return CompletableFuture.completedFuture(null); } @Override diff --git a/bundles/org.openhab.binding.pulseaudio/src/main/java/org/openhab/binding/pulseaudio/internal/PulseaudioHandlerFactory.java b/bundles/org.openhab.binding.pulseaudio/src/main/java/org/openhab/binding/pulseaudio/internal/PulseaudioHandlerFactory.java index 2d3d9ab41..73952f249 100644 --- a/bundles/org.openhab.binding.pulseaudio/src/main/java/org/openhab/binding/pulseaudio/internal/PulseaudioHandlerFactory.java +++ b/bundles/org.openhab.binding.pulseaudio/src/main/java/org/openhab/binding/pulseaudio/internal/PulseaudioHandlerFactory.java @@ -25,6 +25,7 @@ import org.eclipse.jdt.annotation.Nullable; import org.openhab.binding.pulseaudio.internal.discovery.PulseaudioDeviceDiscoveryService; import org.openhab.binding.pulseaudio.internal.handler.PulseaudioBridgeHandler; import org.openhab.binding.pulseaudio.internal.handler.PulseaudioHandler; +import org.openhab.core.audio.utils.AudioSinkUtils; import org.openhab.core.config.core.Configuration; import org.openhab.core.config.discovery.DiscoveryService; import org.openhab.core.thing.Bridge; @@ -39,6 +40,7 @@ import org.osgi.service.component.ComponentContext; import org.osgi.service.component.annotations.Activate; import org.osgi.service.component.annotations.Component; import org.osgi.service.component.annotations.Modified; +import org.osgi.service.component.annotations.Reference; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -61,6 +63,13 @@ public class PulseaudioHandlerFactory extends BaseThingHandlerFactory { private PulseAudioBindingConfiguration configuration = new PulseAudioBindingConfiguration(); + private AudioSinkUtils audioSinkUtils; + + @Activate + public PulseaudioHandlerFactory(@Reference AudioSinkUtils audioSinkUtils) { + this.audioSinkUtils = audioSinkUtils; + } + @Override public boolean supportsThingType(ThingTypeUID thingTypeUID) { return SUPPORTED_THING_TYPES_UIDS.contains(thingTypeUID); @@ -119,7 +128,7 @@ public class PulseaudioHandlerFactory extends BaseThingHandlerFactory { registerDeviceDiscoveryService(handler); return handler; } else if (PulseaudioHandler.SUPPORTED_THING_TYPES_UIDS.contains(thingTypeUID)) { - return new PulseaudioHandler(thing, bundleContext); + return new PulseaudioHandler(thing, bundleContext, audioSinkUtils); } return null; diff --git a/bundles/org.openhab.binding.pulseaudio/src/main/java/org/openhab/binding/pulseaudio/internal/handler/PulseaudioHandler.java b/bundles/org.openhab.binding.pulseaudio/src/main/java/org/openhab/binding/pulseaudio/internal/handler/PulseaudioHandler.java index 4472ca39a..9658e5741 100644 --- a/bundles/org.openhab.binding.pulseaudio/src/main/java/org/openhab/binding/pulseaudio/internal/handler/PulseaudioHandler.java +++ b/bundles/org.openhab.binding.pulseaudio/src/main/java/org/openhab/binding/pulseaudio/internal/handler/PulseaudioHandler.java @@ -40,6 +40,7 @@ import org.openhab.binding.pulseaudio.internal.items.Source; import org.openhab.core.audio.AudioFormat; import org.openhab.core.audio.AudioSink; import org.openhab.core.audio.AudioSource; +import org.openhab.core.audio.utils.AudioSinkUtils; import org.openhab.core.config.core.Configuration; import org.openhab.core.library.types.DecimalType; import org.openhab.core.library.types.IncreaseDecreaseType; @@ -89,9 +90,12 @@ public class PulseaudioHandler extends BaseThingHandler { private final BundleContext bundleContext; - public PulseaudioHandler(Thing thing, BundleContext bundleContext) { + private AudioSinkUtils audioSinkUtils; + + public PulseaudioHandler(Thing thing, BundleContext bundleContext, AudioSinkUtils audioSinkUtils) { super(thing); this.bundleContext = bundleContext; + this.audioSinkUtils = audioSinkUtils; } @Override @@ -127,7 +131,7 @@ public class PulseaudioHandler extends BaseThingHandler { return; } final PulseaudioHandler thisHandler = this; - PulseAudioAudioSink audioSink = new PulseAudioAudioSink(thisHandler, scheduler); + PulseAudioAudioSink audioSink = new PulseAudioAudioSink(thisHandler, scheduler, audioSinkUtils); scheduler.submit(new Runnable() { @Override public void run() {