From b7cbf2ba72bf137853c9ffbdf0392f7b0762dfbc Mon Sep 17 00:00:00 2001 From: Gwendal Roulleau Date: Sun, 25 Sep 2022 12:59:30 +0200 Subject: [PATCH] [pulseaudio] Apply real disconnection when needed (#13338) * [pulseaudio] Removing isIdle test The isIdle boolean was not properly handled. When disconnection is called, isIdle is not relevant : we should always honnor the disconnection request. In fact, isIdle prevented disconnection when it is necessary (example : when a IOException occurs when sending audio to sink) +Little bug fix on volume parsing: some volume request doesn't respond with a space after the comma separating left/right channel. * [pulseaudio] Enhancement to the idle detection for disconnection Using a counter to count client instead of a isIdle variable, which was not thread safe. The PulseaudioSimpleProtocolStream parent class is now the sole responsible for closing source or sink stream. * [pulseaudio] Small performance enhancement Avoid a costly synchronized operation for a method called very often. Signed-off-by: Gwendal Roulleau --- .../internal/PulseAudioAudioSink.java | 5 +- .../internal/PulseAudioAudioSource.java | 50 +++++++------ .../PulseaudioSimpleProtocolStream.java | 70 +++++++++++++++---- .../pulseaudio/internal/cli/Parser.java | 2 +- .../internal/handler/PulseaudioHandler.java | 4 -- 5 files changed, 85 insertions(+), 46 deletions(-) diff --git a/bundles/org.openhab.binding.pulseaudio/src/main/java/org/openhab/binding/pulseaudio/internal/PulseAudioAudioSink.java b/bundles/org.openhab.binding.pulseaudio/src/main/java/org/openhab/binding/pulseaudio/internal/PulseAudioAudioSink.java index 8e42bb1c261..dad493e5e85 100644 --- a/bundles/org.openhab.binding.pulseaudio/src/main/java/org/openhab/binding/pulseaudio/internal/PulseAudioAudioSink.java +++ b/bundles/org.openhab.binding.pulseaudio/src/main/java/org/openhab/binding/pulseaudio/internal/PulseAudioAudioSink.java @@ -66,6 +66,7 @@ public class PulseAudioAudioSink extends PulseaudioSimpleProtocolStream implemen if (audioStream == null) { return; } + addClientCount(); try (ConvertedInputStream normalizedPCMStream = new ConvertedInputStream(audioStream)) { for (int countAttempt = 1; countAttempt <= 2; countAttempt++) { // two attempts allowed try { @@ -73,7 +74,6 @@ public class PulseAudioAudioSink extends PulseaudioSimpleProtocolStream implemen final Socket clientSocketLocal = clientSocket; if (clientSocketLocal != null) { // send raw audio to the socket and to pulse audio - setIdle(false); Instant start = Instant.now(); normalizedPCMStream.transferTo(clientSocketLocal.getOutputStream()); if (normalizedPCMStream.getDuration() != -1) { // ensure, if the sound has a duration @@ -108,9 +108,8 @@ public class PulseAudioAudioSink extends PulseaudioSimpleProtocolStream implemen throw new UnsupportedAudioFormatException("Cannot send sound to the pulseaudio sink", audioStream.getFormat(), e); } finally { - scheduleDisconnect(); + minusClientCount(); } - setIdle(true); } @Override diff --git a/bundles/org.openhab.binding.pulseaudio/src/main/java/org/openhab/binding/pulseaudio/internal/PulseAudioAudioSource.java b/bundles/org.openhab.binding.pulseaudio/src/main/java/org/openhab/binding/pulseaudio/internal/PulseAudioAudioSource.java index 5369f600114..f7acb4b0af2 100644 --- a/bundles/org.openhab.binding.pulseaudio/src/main/java/org/openhab/binding/pulseaudio/internal/PulseAudioAudioSource.java +++ b/bundles/org.openhab.binding.pulseaudio/src/main/java/org/openhab/binding/pulseaudio/internal/PulseAudioAudioSource.java @@ -23,7 +23,6 @@ import java.util.Set; import java.util.concurrent.ConcurrentLinkedQueue; import java.util.concurrent.Future; import java.util.concurrent.ScheduledExecutorService; -import java.util.function.Consumer; import org.eclipse.jdt.annotation.NonNullByDefault; import org.eclipse.jdt.annotation.Nullable; @@ -84,7 +83,6 @@ public class PulseAudioAudioSource extends PulseaudioSimpleProtocolStream implem if (!audioFormat.isCompatible(sourceFormat)) { throw new AudioException("Incompatible audio format requested"); } - setIdle(true); var pipeOutput = new PipedOutputStream(); var pipeInput = new PipedInputStream(pipeOutput, 1024 * 10) { @Override @@ -95,14 +93,9 @@ public class PulseAudioAudioSource extends PulseaudioSimpleProtocolStream implem }; registerPipe(pipeOutput); // get raw audio from the pulse audio socket - return new PulseAudioStream(sourceFormat, pipeInput, (idle) -> { - setIdle(idle); - if (idle) { - scheduleDisconnect(); - } else { - // ensure pipe is writing - startPipeWrite(); - } + return new PulseAudioStream(sourceFormat, pipeInput, () -> { + // ensure pipe is writing + startPipeWrite(); }); } catch (IOException e) { disconnect(); // disconnect to force clear connection in case of socket not cleanly shutdown @@ -113,31 +106,40 @@ public class PulseAudioAudioSource extends PulseaudioSimpleProtocolStream implem logger.warn( "Error while trying to get audio from pulseaudio audio source. Cannot connect to {}:{}, error: {}", pulseaudioHandler.getHost(), port, e.getMessage()); - setIdle(true); throw e; } } catch (InterruptedException ie) { logger.info("Interrupted during source audio connection: {}", ie.getMessage()); - setIdle(true); throw new AudioException(ie); } countAttempt++; } } catch (IOException e) { throw new AudioException(e); - } finally { - scheduleDisconnect(); } - setIdle(true); throw new AudioException("Unable to create input stream"); } private synchronized void registerPipe(PipedOutputStream pipeOutput) { - this.pipeOutputs.add(pipeOutput); + boolean isAdded = this.pipeOutputs.add(pipeOutput); + if (isAdded) { + addClientCount(); + } startPipeWrite(); } - private synchronized void startPipeWrite() { + /** + * As startPipeWrite is called for every chunk read, + * this wrapper method make the test before effectively + * locking the object (which is a costly operation) + */ + private void startPipeWrite() { + if (this.pipeWriteTask == null) { + startPipeWriteSynchronized(); + } + } + + private synchronized void startPipeWriteSynchronized() { if (this.pipeWriteTask == null) { this.pipeWriteTask = executor.submit(() -> { int lengthRead; @@ -191,7 +193,10 @@ public class PulseAudioAudioSource extends PulseaudioSimpleProtocolStream implem } private synchronized void unregisterPipe(PipedOutputStream pipeOutput) { - this.pipeOutputs.remove(pipeOutput); + boolean isRemoved = this.pipeOutputs.remove(pipeOutput); + if (isRemoved) { + minusClientCount(); + } try { Thread.sleep(0); } catch (InterruptedException ignored) { @@ -243,13 +248,13 @@ public class PulseAudioAudioSource extends PulseaudioSimpleProtocolStream implem private final Logger logger = LoggerFactory.getLogger(PulseAudioAudioSource.class); private final AudioFormat format; private final InputStream input; - private final Consumer setIdle; + private final Runnable activity; private boolean closed = false; - public PulseAudioStream(AudioFormat format, InputStream input, Consumer setIdle) { + public PulseAudioStream(AudioFormat format, InputStream input, Runnable activity) { this.input = input; this.format = format; - this.setIdle = setIdle; + this.activity = activity; } @Override @@ -282,14 +287,13 @@ public class PulseAudioAudioSource extends PulseaudioSimpleProtocolStream implem if (closed) { throw new IOException("Stream is closed"); } - setIdle.accept(false); + activity.run(); return input.read(b, off, len); } @Override public void close() throws IOException { closed = true; - setIdle.accept(true); input.close(); } }; diff --git a/bundles/org.openhab.binding.pulseaudio/src/main/java/org/openhab/binding/pulseaudio/internal/PulseaudioSimpleProtocolStream.java b/bundles/org.openhab.binding.pulseaudio/src/main/java/org/openhab/binding/pulseaudio/internal/PulseaudioSimpleProtocolStream.java index 98c8bfb3d78..8889a8ea300 100644 --- a/bundles/org.openhab.binding.pulseaudio/src/main/java/org/openhab/binding/pulseaudio/internal/PulseaudioSimpleProtocolStream.java +++ b/bundles/org.openhab.binding.pulseaudio/src/main/java/org/openhab/binding/pulseaudio/internal/PulseaudioSimpleProtocolStream.java @@ -18,6 +18,7 @@ import java.util.Locale; import java.util.concurrent.ScheduledExecutorService; import java.util.concurrent.ScheduledFuture; import java.util.concurrent.TimeUnit; +import java.util.concurrent.locks.ReentrantLock; import org.eclipse.jdt.annotation.NonNullByDefault; import org.eclipse.jdt.annotation.Nullable; @@ -43,7 +44,8 @@ public abstract class PulseaudioSimpleProtocolStream { protected @Nullable Socket clientSocket; - private boolean isIdle = true; + private ReentrantLock countClientLock = new ReentrantLock(); + private Integer countClient = 0; private @Nullable ScheduledFuture scheduledDisconnection; @@ -54,6 +56,7 @@ public abstract class PulseaudioSimpleProtocolStream { /** * Connect to pulseaudio with the simple protocol + * Will schedule an attempt for disconnection after timeout * * @throws IOException * @throws InterruptedException when interrupted during the loading module wait @@ -61,12 +64,13 @@ public abstract class PulseaudioSimpleProtocolStream { public void connectIfNeeded() throws IOException, InterruptedException { Socket clientSocketLocal = clientSocket; if (clientSocketLocal == null || !clientSocketLocal.isConnected() || clientSocketLocal.isClosed()) { - logger.debug("Simple TCP Stream connecting"); + logger.debug("Simple TCP Stream connecting for {}", getLabel(null)); String host = pulseaudioHandler.getHost(); int port = pulseaudioHandler.getSimpleTcpPortAndLoadModuleIfNecessary(); var clientSocketFinal = new Socket(host, port); clientSocketFinal.setSoTimeout(pulseaudioHandler.getBasicProtocolSOTimeout()); clientSocket = clientSocketFinal; + scheduleDisconnectIfNoClient(); } } @@ -75,8 +79,8 @@ public abstract class PulseaudioSimpleProtocolStream { */ public void disconnect() { final Socket clientSocketLocal = clientSocket; - if (clientSocketLocal != null && isIdle) { - logger.debug("Simple TCP Stream disconnecting"); + if (clientSocketLocal != null) { + logger.debug("Simple TCP Stream disconnecting for {}", getLabel(null)); try { clientSocketLocal.close(); } catch (IOException ignored) { @@ -86,15 +90,23 @@ public abstract class PulseaudioSimpleProtocolStream { } } - public void scheduleDisconnect() { - var scheduledDisconnectionFinal = scheduledDisconnection; - if (scheduledDisconnectionFinal != null) { - scheduledDisconnectionFinal.cancel(true); - } - int idleTimeout = pulseaudioHandler.getIdleTimeout(); - if (idleTimeout > -1) { - logger.debug("Scheduling disconnect"); - scheduledDisconnection = scheduler.schedule(this::disconnect, idleTimeout, TimeUnit.MILLISECONDS); + private void scheduleDisconnectIfNoClient() { + countClientLock.lock(); + try { + if (countClient <= 0) { + var scheduledDisconnectionFinal = scheduledDisconnection; + if (scheduledDisconnectionFinal != null) { + logger.debug("Aborting next disconnect"); + scheduledDisconnectionFinal.cancel(true); + } + int idleTimeout = pulseaudioHandler.getIdleTimeout(); + if (idleTimeout > -1) { + logger.debug("Scheduling next disconnect"); + scheduledDisconnection = scheduler.schedule(this::disconnect, idleTimeout, TimeUnit.MILLISECONDS); + } + } + } finally { + countClientLock.unlock(); } } @@ -115,7 +127,35 @@ public abstract class PulseaudioSimpleProtocolStream { return label != null ? label : pulseaudioHandler.getThing().getUID().getId(); } - public void setIdle(boolean idle) { - isIdle = idle; + protected void addClientCount() { + countClientLock.lock(); + try { + countClient += 1; + logger.debug("Adding new client for pulseaudio sink/source {}. Current count: {}", getLabel(null), + countClient); + if (countClient <= 0) { // safe against misuse + countClient = 1; + } + var scheduledDisconnectionFinal = scheduledDisconnection; + if (scheduledDisconnectionFinal != null) { + logger.debug("Aborting next disconnect"); + scheduledDisconnectionFinal.cancel(true); + } + } finally { + countClientLock.unlock(); + } + } + + protected void minusClientCount() { + countClientLock.lock(); + countClient -= 1; + logger.debug("Removing client for pulseaudio sink/source {}. Current count: {}", getLabel(null), countClient); + if (countClient < 0) { // safe against misuse + countClient = 0; + } + countClientLock.unlock(); + if (countClient <= 0) { + scheduleDisconnectIfNoClient(); + } } } diff --git a/bundles/org.openhab.binding.pulseaudio/src/main/java/org/openhab/binding/pulseaudio/internal/cli/Parser.java b/bundles/org.openhab.binding.pulseaudio/src/main/java/org/openhab/binding/pulseaudio/internal/cli/Parser.java index b9bcd09074e..e5bdf08cf27 100644 --- a/bundles/org.openhab.binding.pulseaudio/src/main/java/org/openhab/binding/pulseaudio/internal/cli/Parser.java +++ b/bundles/org.openhab.binding.pulseaudio/src/main/java/org/openhab/binding/pulseaudio/internal/cli/Parser.java @@ -352,7 +352,7 @@ public class Parser { private static int parseVolume(String vol) { int volumeTotal = 0; int nChannels = 0; - for (String channel : vol.split(", ")) { + for (String channel : vol.split(",")) { Matcher matcher = VOLUME_PATTERN.matcher(channel.trim()); if (matcher.find()) { volumeTotal += Integer.valueOf(matcher.group(3)); diff --git a/bundles/org.openhab.binding.pulseaudio/src/main/java/org/openhab/binding/pulseaudio/internal/handler/PulseaudioHandler.java b/bundles/org.openhab.binding.pulseaudio/src/main/java/org/openhab/binding/pulseaudio/internal/handler/PulseaudioHandler.java index a4f48ef0d25..502f2301981 100644 --- a/bundles/org.openhab.binding.pulseaudio/src/main/java/org/openhab/binding/pulseaudio/internal/handler/PulseaudioHandler.java +++ b/bundles/org.openhab.binding.pulseaudio/src/main/java/org/openhab/binding/pulseaudio/internal/handler/PulseaudioHandler.java @@ -140,8 +140,6 @@ public class PulseaudioHandler extends BaseThingHandler { } catch (InterruptedException i) { logger.info("Interrupted during sink audio connection: {}", i.getMessage()); return; - } finally { - audioSink.scheduleDisconnect(); } } }); @@ -194,8 +192,6 @@ public class PulseaudioHandler extends BaseThingHandler { } catch (InterruptedException i) { logger.info("Interrupted during source audio connection: {}", i.getMessage()); return; - } finally { - audioSource.scheduleDisconnect(); } } });