[pulseaudio] Apply real disconnection when needed (#13338)

* [pulseaudio] Removing isIdle test

The isIdle boolean was not properly handled.
When disconnection is called, isIdle is not relevant : we should always honnor the disconnection request.
In fact, isIdle prevented disconnection when it is necessary (example : when a IOException occurs when sending audio to sink)

+Little bug fix on volume parsing: some volume request doesn't respond with a space after the comma separating left/right channel.

* [pulseaudio] Enhancement to the idle detection for disconnection

Using a counter to count client instead of a isIdle variable, which was not thread safe.
The PulseaudioSimpleProtocolStream parent class is now the sole responsible for closing source or sink stream.

* [pulseaudio] Small performance enhancement

Avoid a costly synchronized operation for a method called very often.

Signed-off-by: Gwendal Roulleau <gwendal.roulleau@gmail.com>
This commit is contained in:
Gwendal Roulleau 2022-09-25 12:59:30 +02:00 committed by GitHub
parent af0ac6e474
commit b7cbf2ba72
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
5 changed files with 85 additions and 46 deletions

View File

@ -66,6 +66,7 @@ public class PulseAudioAudioSink extends PulseaudioSimpleProtocolStream implemen
if (audioStream == null) { if (audioStream == null) {
return; return;
} }
addClientCount();
try (ConvertedInputStream normalizedPCMStream = new ConvertedInputStream(audioStream)) { try (ConvertedInputStream normalizedPCMStream = new ConvertedInputStream(audioStream)) {
for (int countAttempt = 1; countAttempt <= 2; countAttempt++) { // two attempts allowed for (int countAttempt = 1; countAttempt <= 2; countAttempt++) { // two attempts allowed
try { try {
@ -73,7 +74,6 @@ public class PulseAudioAudioSink extends PulseaudioSimpleProtocolStream implemen
final Socket clientSocketLocal = clientSocket; final Socket clientSocketLocal = clientSocket;
if (clientSocketLocal != null) { if (clientSocketLocal != null) {
// send raw audio to the socket and to pulse audio // send raw audio to the socket and to pulse audio
setIdle(false);
Instant start = Instant.now(); Instant start = Instant.now();
normalizedPCMStream.transferTo(clientSocketLocal.getOutputStream()); normalizedPCMStream.transferTo(clientSocketLocal.getOutputStream());
if (normalizedPCMStream.getDuration() != -1) { // ensure, if the sound has a duration if (normalizedPCMStream.getDuration() != -1) { // ensure, if the sound has a duration
@ -108,9 +108,8 @@ public class PulseAudioAudioSink extends PulseaudioSimpleProtocolStream implemen
throw new UnsupportedAudioFormatException("Cannot send sound to the pulseaudio sink", throw new UnsupportedAudioFormatException("Cannot send sound to the pulseaudio sink",
audioStream.getFormat(), e); audioStream.getFormat(), e);
} finally { } finally {
scheduleDisconnect(); minusClientCount();
} }
setIdle(true);
} }
@Override @Override

View File

@ -23,7 +23,6 @@ import java.util.Set;
import java.util.concurrent.ConcurrentLinkedQueue; import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.concurrent.Future; import java.util.concurrent.Future;
import java.util.concurrent.ScheduledExecutorService; import java.util.concurrent.ScheduledExecutorService;
import java.util.function.Consumer;
import org.eclipse.jdt.annotation.NonNullByDefault; import org.eclipse.jdt.annotation.NonNullByDefault;
import org.eclipse.jdt.annotation.Nullable; import org.eclipse.jdt.annotation.Nullable;
@ -84,7 +83,6 @@ public class PulseAudioAudioSource extends PulseaudioSimpleProtocolStream implem
if (!audioFormat.isCompatible(sourceFormat)) { if (!audioFormat.isCompatible(sourceFormat)) {
throw new AudioException("Incompatible audio format requested"); throw new AudioException("Incompatible audio format requested");
} }
setIdle(true);
var pipeOutput = new PipedOutputStream(); var pipeOutput = new PipedOutputStream();
var pipeInput = new PipedInputStream(pipeOutput, 1024 * 10) { var pipeInput = new PipedInputStream(pipeOutput, 1024 * 10) {
@Override @Override
@ -95,14 +93,9 @@ public class PulseAudioAudioSource extends PulseaudioSimpleProtocolStream implem
}; };
registerPipe(pipeOutput); registerPipe(pipeOutput);
// get raw audio from the pulse audio socket // get raw audio from the pulse audio socket
return new PulseAudioStream(sourceFormat, pipeInput, (idle) -> { return new PulseAudioStream(sourceFormat, pipeInput, () -> {
setIdle(idle); // ensure pipe is writing
if (idle) { startPipeWrite();
scheduleDisconnect();
} else {
// ensure pipe is writing
startPipeWrite();
}
}); });
} catch (IOException e) { } catch (IOException e) {
disconnect(); // disconnect to force clear connection in case of socket not cleanly shutdown disconnect(); // disconnect to force clear connection in case of socket not cleanly shutdown
@ -113,31 +106,40 @@ public class PulseAudioAudioSource extends PulseaudioSimpleProtocolStream implem
logger.warn( logger.warn(
"Error while trying to get audio from pulseaudio audio source. Cannot connect to {}:{}, error: {}", "Error while trying to get audio from pulseaudio audio source. Cannot connect to {}:{}, error: {}",
pulseaudioHandler.getHost(), port, e.getMessage()); pulseaudioHandler.getHost(), port, e.getMessage());
setIdle(true);
throw e; throw e;
} }
} catch (InterruptedException ie) { } catch (InterruptedException ie) {
logger.info("Interrupted during source audio connection: {}", ie.getMessage()); logger.info("Interrupted during source audio connection: {}", ie.getMessage());
setIdle(true);
throw new AudioException(ie); throw new AudioException(ie);
} }
countAttempt++; countAttempt++;
} }
} catch (IOException e) { } catch (IOException e) {
throw new AudioException(e); throw new AudioException(e);
} finally {
scheduleDisconnect();
} }
setIdle(true);
throw new AudioException("Unable to create input stream"); throw new AudioException("Unable to create input stream");
} }
private synchronized void registerPipe(PipedOutputStream pipeOutput) { private synchronized void registerPipe(PipedOutputStream pipeOutput) {
this.pipeOutputs.add(pipeOutput); boolean isAdded = this.pipeOutputs.add(pipeOutput);
if (isAdded) {
addClientCount();
}
startPipeWrite(); startPipeWrite();
} }
private synchronized void startPipeWrite() { /**
* As startPipeWrite is called for every chunk read,
* this wrapper method make the test before effectively
* locking the object (which is a costly operation)
*/
private void startPipeWrite() {
if (this.pipeWriteTask == null) {
startPipeWriteSynchronized();
}
}
private synchronized void startPipeWriteSynchronized() {
if (this.pipeWriteTask == null) { if (this.pipeWriteTask == null) {
this.pipeWriteTask = executor.submit(() -> { this.pipeWriteTask = executor.submit(() -> {
int lengthRead; int lengthRead;
@ -191,7 +193,10 @@ public class PulseAudioAudioSource extends PulseaudioSimpleProtocolStream implem
} }
private synchronized void unregisterPipe(PipedOutputStream pipeOutput) { private synchronized void unregisterPipe(PipedOutputStream pipeOutput) {
this.pipeOutputs.remove(pipeOutput); boolean isRemoved = this.pipeOutputs.remove(pipeOutput);
if (isRemoved) {
minusClientCount();
}
try { try {
Thread.sleep(0); Thread.sleep(0);
} catch (InterruptedException ignored) { } catch (InterruptedException ignored) {
@ -243,13 +248,13 @@ public class PulseAudioAudioSource extends PulseaudioSimpleProtocolStream implem
private final Logger logger = LoggerFactory.getLogger(PulseAudioAudioSource.class); private final Logger logger = LoggerFactory.getLogger(PulseAudioAudioSource.class);
private final AudioFormat format; private final AudioFormat format;
private final InputStream input; private final InputStream input;
private final Consumer<Boolean> setIdle; private final Runnable activity;
private boolean closed = false; private boolean closed = false;
public PulseAudioStream(AudioFormat format, InputStream input, Consumer<Boolean> setIdle) { public PulseAudioStream(AudioFormat format, InputStream input, Runnable activity) {
this.input = input; this.input = input;
this.format = format; this.format = format;
this.setIdle = setIdle; this.activity = activity;
} }
@Override @Override
@ -282,14 +287,13 @@ public class PulseAudioAudioSource extends PulseaudioSimpleProtocolStream implem
if (closed) { if (closed) {
throw new IOException("Stream is closed"); throw new IOException("Stream is closed");
} }
setIdle.accept(false); activity.run();
return input.read(b, off, len); return input.read(b, off, len);
} }
@Override @Override
public void close() throws IOException { public void close() throws IOException {
closed = true; closed = true;
setIdle.accept(true);
input.close(); input.close();
} }
}; };

View File

@ -18,6 +18,7 @@ import java.util.Locale;
import java.util.concurrent.ScheduledExecutorService; import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.ScheduledFuture; import java.util.concurrent.ScheduledFuture;
import java.util.concurrent.TimeUnit; import java.util.concurrent.TimeUnit;
import java.util.concurrent.locks.ReentrantLock;
import org.eclipse.jdt.annotation.NonNullByDefault; import org.eclipse.jdt.annotation.NonNullByDefault;
import org.eclipse.jdt.annotation.Nullable; import org.eclipse.jdt.annotation.Nullable;
@ -43,7 +44,8 @@ public abstract class PulseaudioSimpleProtocolStream {
protected @Nullable Socket clientSocket; protected @Nullable Socket clientSocket;
private boolean isIdle = true; private ReentrantLock countClientLock = new ReentrantLock();
private Integer countClient = 0;
private @Nullable ScheduledFuture<?> scheduledDisconnection; private @Nullable ScheduledFuture<?> scheduledDisconnection;
@ -54,6 +56,7 @@ public abstract class PulseaudioSimpleProtocolStream {
/** /**
* Connect to pulseaudio with the simple protocol * Connect to pulseaudio with the simple protocol
* Will schedule an attempt for disconnection after timeout
* *
* @throws IOException * @throws IOException
* @throws InterruptedException when interrupted during the loading module wait * @throws InterruptedException when interrupted during the loading module wait
@ -61,12 +64,13 @@ public abstract class PulseaudioSimpleProtocolStream {
public void connectIfNeeded() throws IOException, InterruptedException { public void connectIfNeeded() throws IOException, InterruptedException {
Socket clientSocketLocal = clientSocket; Socket clientSocketLocal = clientSocket;
if (clientSocketLocal == null || !clientSocketLocal.isConnected() || clientSocketLocal.isClosed()) { if (clientSocketLocal == null || !clientSocketLocal.isConnected() || clientSocketLocal.isClosed()) {
logger.debug("Simple TCP Stream connecting"); logger.debug("Simple TCP Stream connecting for {}", getLabel(null));
String host = pulseaudioHandler.getHost(); String host = pulseaudioHandler.getHost();
int port = pulseaudioHandler.getSimpleTcpPortAndLoadModuleIfNecessary(); int port = pulseaudioHandler.getSimpleTcpPortAndLoadModuleIfNecessary();
var clientSocketFinal = new Socket(host, port); var clientSocketFinal = new Socket(host, port);
clientSocketFinal.setSoTimeout(pulseaudioHandler.getBasicProtocolSOTimeout()); clientSocketFinal.setSoTimeout(pulseaudioHandler.getBasicProtocolSOTimeout());
clientSocket = clientSocketFinal; clientSocket = clientSocketFinal;
scheduleDisconnectIfNoClient();
} }
} }
@ -75,8 +79,8 @@ public abstract class PulseaudioSimpleProtocolStream {
*/ */
public void disconnect() { public void disconnect() {
final Socket clientSocketLocal = clientSocket; final Socket clientSocketLocal = clientSocket;
if (clientSocketLocal != null && isIdle) { if (clientSocketLocal != null) {
logger.debug("Simple TCP Stream disconnecting"); logger.debug("Simple TCP Stream disconnecting for {}", getLabel(null));
try { try {
clientSocketLocal.close(); clientSocketLocal.close();
} catch (IOException ignored) { } catch (IOException ignored) {
@ -86,15 +90,23 @@ public abstract class PulseaudioSimpleProtocolStream {
} }
} }
public void scheduleDisconnect() { private void scheduleDisconnectIfNoClient() {
var scheduledDisconnectionFinal = scheduledDisconnection; countClientLock.lock();
if (scheduledDisconnectionFinal != null) { try {
scheduledDisconnectionFinal.cancel(true); if (countClient <= 0) {
} var scheduledDisconnectionFinal = scheduledDisconnection;
int idleTimeout = pulseaudioHandler.getIdleTimeout(); if (scheduledDisconnectionFinal != null) {
if (idleTimeout > -1) { logger.debug("Aborting next disconnect");
logger.debug("Scheduling disconnect"); scheduledDisconnectionFinal.cancel(true);
scheduledDisconnection = scheduler.schedule(this::disconnect, idleTimeout, TimeUnit.MILLISECONDS); }
int idleTimeout = pulseaudioHandler.getIdleTimeout();
if (idleTimeout > -1) {
logger.debug("Scheduling next disconnect");
scheduledDisconnection = scheduler.schedule(this::disconnect, idleTimeout, TimeUnit.MILLISECONDS);
}
}
} finally {
countClientLock.unlock();
} }
} }
@ -115,7 +127,35 @@ public abstract class PulseaudioSimpleProtocolStream {
return label != null ? label : pulseaudioHandler.getThing().getUID().getId(); return label != null ? label : pulseaudioHandler.getThing().getUID().getId();
} }
public void setIdle(boolean idle) { protected void addClientCount() {
isIdle = idle; countClientLock.lock();
try {
countClient += 1;
logger.debug("Adding new client for pulseaudio sink/source {}. Current count: {}", getLabel(null),
countClient);
if (countClient <= 0) { // safe against misuse
countClient = 1;
}
var scheduledDisconnectionFinal = scheduledDisconnection;
if (scheduledDisconnectionFinal != null) {
logger.debug("Aborting next disconnect");
scheduledDisconnectionFinal.cancel(true);
}
} finally {
countClientLock.unlock();
}
}
protected void minusClientCount() {
countClientLock.lock();
countClient -= 1;
logger.debug("Removing client for pulseaudio sink/source {}. Current count: {}", getLabel(null), countClient);
if (countClient < 0) { // safe against misuse
countClient = 0;
}
countClientLock.unlock();
if (countClient <= 0) {
scheduleDisconnectIfNoClient();
}
} }
} }

View File

@ -352,7 +352,7 @@ public class Parser {
private static int parseVolume(String vol) { private static int parseVolume(String vol) {
int volumeTotal = 0; int volumeTotal = 0;
int nChannels = 0; int nChannels = 0;
for (String channel : vol.split(", ")) { for (String channel : vol.split(",")) {
Matcher matcher = VOLUME_PATTERN.matcher(channel.trim()); Matcher matcher = VOLUME_PATTERN.matcher(channel.trim());
if (matcher.find()) { if (matcher.find()) {
volumeTotal += Integer.valueOf(matcher.group(3)); volumeTotal += Integer.valueOf(matcher.group(3));

View File

@ -140,8 +140,6 @@ public class PulseaudioHandler extends BaseThingHandler {
} catch (InterruptedException i) { } catch (InterruptedException i) {
logger.info("Interrupted during sink audio connection: {}", i.getMessage()); logger.info("Interrupted during sink audio connection: {}", i.getMessage());
return; return;
} finally {
audioSink.scheduleDisconnect();
} }
} }
}); });
@ -194,8 +192,6 @@ public class PulseaudioHandler extends BaseThingHandler {
} catch (InterruptedException i) { } catch (InterruptedException i) {
logger.info("Interrupted during source audio connection: {}", i.getMessage()); logger.info("Interrupted during source audio connection: {}", i.getMessage());
return; return;
} finally {
audioSource.scheduleDisconnect();
} }
} }
}); });