[pulseaudio] source: use thread safe collection and force reconnection (#12441)

* [pulseaudio] use thread safe collection
* [pulseaudio] source: connect pipe before store ref
* [pulseaudio] source: improve warning messages
* [pulseaudio] fix IOException when closing all sources
* [pulseaudio] prevent warning when InterruptedIOException on source close

Signed-off-by: Miguel Álvarez Díez <miguelwork92@gmail.com>
This commit is contained in:
GiviMAD 2022-03-18 00:41:00 +01:00 committed by GitHub
parent 409a4a6fac
commit fee7f7e9c5
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23

View File

@ -14,11 +14,13 @@ package org.openhab.binding.pulseaudio.internal;
import java.io.IOException; import java.io.IOException;
import java.io.InputStream; import java.io.InputStream;
import java.io.InterruptedIOException;
import java.io.PipedInputStream; import java.io.PipedInputStream;
import java.io.PipedOutputStream; import java.io.PipedOutputStream;
import java.net.Socket; import java.net.Socket;
import java.util.HashSet; import java.util.HashSet;
import java.util.Set; import java.util.Set;
import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.concurrent.Future; import java.util.concurrent.Future;
import java.util.concurrent.ScheduledExecutorService; import java.util.concurrent.ScheduledExecutorService;
import java.util.function.Consumer; import java.util.function.Consumer;
@ -44,7 +46,7 @@ import org.slf4j.LoggerFactory;
public class PulseAudioAudioSource extends PulseaudioSimpleProtocolStream implements AudioSource { public class PulseAudioAudioSource extends PulseaudioSimpleProtocolStream implements AudioSource {
private final Logger logger = LoggerFactory.getLogger(PulseAudioAudioSource.class); private final Logger logger = LoggerFactory.getLogger(PulseAudioAudioSource.class);
private final Set<PipedOutputStream> pipeOutputs = new HashSet<>(); private final ConcurrentLinkedQueue<PipedOutputStream> pipeOutputs = new ConcurrentLinkedQueue<>();
private final ScheduledExecutorService executor; private final ScheduledExecutorService executor;
private @Nullable Future<?> pipeWriteTask; private @Nullable Future<?> pipeWriteTask;
@ -84,14 +86,14 @@ public class PulseAudioAudioSource extends PulseaudioSimpleProtocolStream implem
} }
setIdle(true); setIdle(true);
var pipeOutput = new PipedOutputStream(); var pipeOutput = new PipedOutputStream();
registerPipe(pipeOutput); var pipeInput = new PipedInputStream(pipeOutput, 1024 * 10) {
var pipeInput = new PipedInputStream(pipeOutput, 1024 * 20) {
@Override @Override
public void close() throws IOException { public void close() throws IOException {
unregisterPipe(pipeOutput); unregisterPipe(pipeOutput);
super.close(); super.close();
} }
}; };
registerPipe(pipeOutput);
// get raw audio from the pulse audio socket // get raw audio from the pulse audio socket
return new PulseAudioStream(sourceFormat, pipeInput, (idle) -> { return new PulseAudioStream(sourceFormat, pipeInput, (idle) -> {
setIdle(idle); setIdle(idle);
@ -103,7 +105,7 @@ public class PulseAudioAudioSource extends PulseaudioSimpleProtocolStream implem
} }
}); });
} catch (IOException e) { } catch (IOException e) {
disconnect(); // disconnect force to clear connection in case of socket not cleanly shutdown disconnect(); // disconnect to force clear connection in case of socket not cleanly shutdown
if (countAttempt == 2) { // we won't retry : log and quit if (countAttempt == 2) { // we won't retry : log and quit
String port = clientSocket != null ? Integer.toString(clientSocket.getPort()) : "unknown"; String port = clientSocket != null ? Integer.toString(clientSocket.getPort()) : "unknown";
logger.warn( logger.warn(
@ -133,24 +135,46 @@ public class PulseAudioAudioSource extends PulseaudioSimpleProtocolStream implem
startPipeWrite(); startPipeWrite();
} }
private void startPipeWrite() { private synchronized void startPipeWrite() {
if (pipeWriteTask == null) { if (this.pipeWriteTask == null) {
this.pipeWriteTask = executor.submit(() -> { this.pipeWriteTask = executor.submit(() -> {
int lengthRead; int lengthRead;
byte[] buffer = new byte[1024]; byte[] buffer = new byte[1024];
int readRetries = 3;
while (!pipeOutputs.isEmpty()) { while (!pipeOutputs.isEmpty()) {
var stream = getSourceInputStream(); var stream = getSourceInputStream();
if (stream != null) { if (stream != null) {
try { try {
lengthRead = stream.read(buffer); lengthRead = stream.read(buffer);
readRetries = 3;
for (var output : pipeOutputs) { for (var output : pipeOutputs) {
try {
output.write(buffer, 0, lengthRead); output.write(buffer, 0, lengthRead);
if (pipeOutputs.contains(output)) {
output.flush(); output.flush();
} }
} catch (IOException e) { } catch (IOException e) {
logger.warn("IOException while reading from pulse source: {}", e.getMessage()); if (e instanceof InterruptedIOException && pipeOutputs.isEmpty()) {
// task has been ended while writing
return;
}
logger.warn("IOException while writing to from pulse source pipe: {}",
getExceptionMessage(e));
} catch (RuntimeException e) { } catch (RuntimeException e) {
logger.warn("RuntimeException while reading from pulse source: {}", e.getMessage()); logger.warn("RuntimeException while writing to pulse source pipe: {}",
getExceptionMessage(e));
}
}
} catch (IOException e) {
logger.warn("IOException while reading from pulse source: {}", getExceptionMessage(e));
if (readRetries == 0) {
// force reconnection on persistent IOException
super.disconnect();
} else {
readRetries--;
}
} catch (RuntimeException e) {
logger.warn("RuntimeException while reading from pulse source: {}", getExceptionMessage(e));
} }
} else { } else {
logger.warn("Unable to get source input stream"); logger.warn("Unable to get source input stream");
@ -163,6 +187,10 @@ public class PulseAudioAudioSource extends PulseaudioSimpleProtocolStream implem
private synchronized void unregisterPipe(PipedOutputStream pipeOutput) { private synchronized void unregisterPipe(PipedOutputStream pipeOutput) {
this.pipeOutputs.remove(pipeOutput); this.pipeOutputs.remove(pipeOutput);
try {
Thread.sleep(0);
} catch (InterruptedException ignored) {
}
stopPipeWriteTask(); stopPipeWriteTask();
try { try {
pipeOutput.close(); pipeOutput.close();
@ -170,7 +198,7 @@ public class PulseAudioAudioSource extends PulseaudioSimpleProtocolStream implem
} }
} }
private void stopPipeWriteTask() { private synchronized void stopPipeWriteTask() {
var pipeWriteTask = this.pipeWriteTask; var pipeWriteTask = this.pipeWriteTask;
if (pipeOutputs.isEmpty() && pipeWriteTask != null) { if (pipeOutputs.isEmpty() && pipeWriteTask != null) {
pipeWriteTask.cancel(true); pipeWriteTask.cancel(true);
@ -178,6 +206,15 @@ public class PulseAudioAudioSource extends PulseaudioSimpleProtocolStream implem
} }
} }
private @Nullable String getExceptionMessage(Exception e) {
String message = e.getMessage();
var cause = e.getCause();
if (message == null && cause != null) {
message = cause.getMessage();
}
return message;
}
private @Nullable InputStream getSourceInputStream() { private @Nullable InputStream getSourceInputStream() {
try { try {
connectIfNeeded(); connectIfNeeded();