[pulseaudio] Make the process method asynchronous (#15179)

* [pulseaudio] Make the process method asynchronous

And use the new 'complete' system to signal core that the sound is fully played.

---------

Signed-off-by: Gwendal Roulleau <gwendal.roulleau@gmail.com>
This commit is contained in:
Gwendal Roulleau 2023-07-06 19:21:12 +02:00 committed by GitHub
parent 7e07abbcab
commit 94a761f84e
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
4 changed files with 84 additions and 28 deletions

View File

@ -28,7 +28,7 @@ import org.eclipse.jdt.annotation.NonNullByDefault;
import org.eclipse.jdt.annotation.Nullable;
import org.openhab.core.audio.AudioFormat;
import org.openhab.core.audio.AudioStream;
import org.openhab.core.audio.FixedLengthAudioStream;
import org.openhab.core.audio.SizeableAudioStream;
import org.openhab.core.audio.UnsupportedAudioFormatException;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@ -58,8 +58,8 @@ public class ConvertedInputStream extends InputStream {
throws UnsupportedAudioFormatException, UnsupportedAudioFileException, IOException {
this.audioFormat = innerInputStream.getFormat();
if (innerInputStream instanceof FixedLengthAudioStream) {
length = ((FixedLengthAudioStream) innerInputStream).length();
if (innerInputStream instanceof SizeableAudioStream sizeableAudioStream) {
length = sizeableAudioStream.length();
}
pcmNormalizedInputStream = getPCMStreamNormalized(getPCMStream(new BufferedInputStream(innerInputStream)));

View File

@ -16,9 +16,10 @@ import java.io.IOException;
import java.net.Socket;
import java.time.Duration;
import java.time.Instant;
import java.util.HashSet;
import java.util.Set;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import javax.sound.sampled.UnsupportedAudioFileException;
@ -28,9 +29,11 @@ import org.openhab.binding.pulseaudio.internal.handler.PulseaudioHandler;
import org.openhab.core.audio.AudioFormat;
import org.openhab.core.audio.AudioSink;
import org.openhab.core.audio.AudioStream;
import org.openhab.core.audio.FixedLengthAudioStream;
import org.openhab.core.audio.FileAudioStream;
import org.openhab.core.audio.UnsupportedAudioFormatException;
import org.openhab.core.audio.UnsupportedAudioStreamException;
import org.openhab.core.audio.utils.AudioSinkUtils;
import org.openhab.core.common.Disposable;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@ -47,24 +50,29 @@ public class PulseAudioAudioSink extends PulseaudioSimpleProtocolStream implemen
private final Logger logger = LoggerFactory.getLogger(PulseAudioAudioSink.class);
private static final HashSet<AudioFormat> SUPPORTED_FORMATS = new HashSet<>();
private static final HashSet<Class<? extends AudioStream>> SUPPORTED_STREAMS = new HashSet<>();
private AudioSinkUtils audioSinkUtils;
static {
SUPPORTED_FORMATS.add(AudioFormat.WAV);
SUPPORTED_FORMATS.add(AudioFormat.MP3);
SUPPORTED_STREAMS.add(FixedLengthAudioStream.class);
}
private static final Set<AudioFormat> SUPPORTED_FORMATS = Set.of(AudioFormat.WAV, AudioFormat.MP3);
private static final Set<Class<? extends AudioStream>> SUPPORTED_STREAMS = Set.of(AudioStream.class);
private static final AudioFormat TARGET_FORMAT = new AudioFormat(AudioFormat.CONTAINER_WAVE,
AudioFormat.CODEC_PCM_SIGNED, false, 16, 4 * 44100, 44100L, 2);
public PulseAudioAudioSink(PulseaudioHandler pulseaudioHandler, ScheduledExecutorService scheduler) {
public PulseAudioAudioSink(PulseaudioHandler pulseaudioHandler, ScheduledExecutorService scheduler,
AudioSinkUtils audioSinkUtils) {
super(pulseaudioHandler, scheduler);
this.audioSinkUtils = audioSinkUtils;
}
@Override
public void process(@Nullable AudioStream audioStream)
throws UnsupportedAudioFormatException, UnsupportedAudioStreamException {
processAndComplete(audioStream);
}
@Override
public CompletableFuture<@Nullable Void> processAndComplete(@Nullable AudioStream audioStream) {
if (audioStream == null) {
return;
return CompletableFuture.completedFuture(null);
}
addClientCount();
try (ConvertedInputStream normalizedPCMStream = new ConvertedInputStream(audioStream)) {
@ -75,18 +83,38 @@ public class PulseAudioAudioSink extends PulseaudioSimpleProtocolStream implemen
if (clientSocketLocal != null) {
// send raw audio to the socket and to pulse audio
Instant start = Instant.now();
normalizedPCMStream.transferTo(clientSocketLocal.getOutputStream());
if (normalizedPCMStream.getDuration() != -1) { // ensure, if the sound has a duration
if (normalizedPCMStream.getDuration() != -1) {
// ensure, if the sound has a duration
// that we let at least this time for the system to play
normalizedPCMStream.transferTo(clientSocketLocal.getOutputStream());
Instant end = Instant.now();
long millisSecondTimedToSendAudioData = Duration.between(start, end).toMillis();
if (millisSecondTimedToSendAudioData < normalizedPCMStream.getDuration()) {
long timeToSleep = normalizedPCMStream.getDuration() - millisSecondTimedToSendAudioData;
logger.debug("Sleep time to let the system play sound : {}", timeToSleep);
Thread.sleep(timeToSleep);
CompletableFuture<@Nullable Void> soundPlayed = new CompletableFuture<>();
long timeToWait = normalizedPCMStream.getDuration() - millisSecondTimedToSendAudioData;
logger.debug("Some time to let the system play sound : {}", timeToWait);
scheduler.schedule(() -> soundPlayed.complete(null), timeToWait, TimeUnit.MILLISECONDS);
return soundPlayed;
} else {
return CompletableFuture.completedFuture(null);
}
} else {
// We have a second method available to guess the duration, and it is during transfer
Long timeStampEnd = audioSinkUtils.transferAndAnalyzeLength(normalizedPCMStream,
clientSocketLocal.getOutputStream(), TARGET_FORMAT);
CompletableFuture<@Nullable Void> soundPlayed = new CompletableFuture<>();
if (timeStampEnd != null) {
long now = System.nanoTime();
long timeToWait = timeStampEnd - now;
if (timeToWait > 0) {
scheduler.schedule(() -> soundPlayed.complete(null), timeToWait,
TimeUnit.NANOSECONDS);
}
return soundPlayed;
} else {
return CompletableFuture.completedFuture(null);
}
}
break;
}
} catch (IOException e) {
disconnect(); // disconnect force to clear connection in case of socket not cleanly shutdown
@ -97,19 +125,34 @@ public class PulseAudioAudioSink extends PulseaudioSimpleProtocolStream implemen
logger.warn(
"Error while trying to send audio to pulseaudio audio sink. Cannot connect to {}:{}, error: {}",
pulseaudioHandler.getHost(), port, e.getMessage());
break;
return CompletableFuture.completedFuture(null);
}
} catch (InterruptedException ie) {
logger.info("Interrupted during sink audio connection: {}", ie.getMessage());
break;
return CompletableFuture.completedFuture(null);
}
}
} catch (UnsupportedAudioFileException | IOException e) {
throw new UnsupportedAudioFormatException("Cannot send sound to the pulseaudio sink",
audioStream.getFormat(), e);
} catch (UnsupportedAudioFileException | UnsupportedAudioFormatException | IOException e) {
return CompletableFuture.failedFuture(new UnsupportedAudioFormatException(
"Cannot send sound to the pulseaudio sink", audioStream.getFormat(), e));
} finally {
minusClientCount();
// if the stream is not needed anymore, then we should call back the AudioStream to let it a chance
// to auto dispose.
if (audioStream instanceof Disposable disposableAudioStream) {
try {
disposableAudioStream.dispose();
} catch (IOException e) {
String fileName = audioStream instanceof FileAudioStream file ? file.toString() : "unknown";
if (logger.isDebugEnabled()) {
logger.debug("Cannot dispose of stream {}", fileName, e);
} else {
logger.warn("Cannot dispose of stream {}, reason {}", fileName, e.getMessage());
}
}
}
}
return CompletableFuture.completedFuture(null);
}
@Override

View File

@ -25,6 +25,7 @@ import org.eclipse.jdt.annotation.Nullable;
import org.openhab.binding.pulseaudio.internal.discovery.PulseaudioDeviceDiscoveryService;
import org.openhab.binding.pulseaudio.internal.handler.PulseaudioBridgeHandler;
import org.openhab.binding.pulseaudio.internal.handler.PulseaudioHandler;
import org.openhab.core.audio.utils.AudioSinkUtils;
import org.openhab.core.config.core.Configuration;
import org.openhab.core.config.discovery.DiscoveryService;
import org.openhab.core.thing.Bridge;
@ -39,6 +40,7 @@ import org.osgi.service.component.ComponentContext;
import org.osgi.service.component.annotations.Activate;
import org.osgi.service.component.annotations.Component;
import org.osgi.service.component.annotations.Modified;
import org.osgi.service.component.annotations.Reference;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@ -61,6 +63,13 @@ public class PulseaudioHandlerFactory extends BaseThingHandlerFactory {
private PulseAudioBindingConfiguration configuration = new PulseAudioBindingConfiguration();
private AudioSinkUtils audioSinkUtils;
@Activate
public PulseaudioHandlerFactory(@Reference AudioSinkUtils audioSinkUtils) {
this.audioSinkUtils = audioSinkUtils;
}
@Override
public boolean supportsThingType(ThingTypeUID thingTypeUID) {
return SUPPORTED_THING_TYPES_UIDS.contains(thingTypeUID);
@ -119,7 +128,7 @@ public class PulseaudioHandlerFactory extends BaseThingHandlerFactory {
registerDeviceDiscoveryService(handler);
return handler;
} else if (PulseaudioHandler.SUPPORTED_THING_TYPES_UIDS.contains(thingTypeUID)) {
return new PulseaudioHandler(thing, bundleContext);
return new PulseaudioHandler(thing, bundleContext, audioSinkUtils);
}
return null;

View File

@ -40,6 +40,7 @@ import org.openhab.binding.pulseaudio.internal.items.Source;
import org.openhab.core.audio.AudioFormat;
import org.openhab.core.audio.AudioSink;
import org.openhab.core.audio.AudioSource;
import org.openhab.core.audio.utils.AudioSinkUtils;
import org.openhab.core.config.core.Configuration;
import org.openhab.core.library.types.DecimalType;
import org.openhab.core.library.types.IncreaseDecreaseType;
@ -89,9 +90,12 @@ public class PulseaudioHandler extends BaseThingHandler {
private final BundleContext bundleContext;
public PulseaudioHandler(Thing thing, BundleContext bundleContext) {
private AudioSinkUtils audioSinkUtils;
public PulseaudioHandler(Thing thing, BundleContext bundleContext, AudioSinkUtils audioSinkUtils) {
super(thing);
this.bundleContext = bundleContext;
this.audioSinkUtils = audioSinkUtils;
}
@Override
@ -127,7 +131,7 @@ public class PulseaudioHandler extends BaseThingHandler {
return;
}
final PulseaudioHandler thisHandler = this;
PulseAudioAudioSink audioSink = new PulseAudioAudioSink(thisHandler, scheduler);
PulseAudioAudioSink audioSink = new PulseAudioAudioSink(thisHandler, scheduler, audioSinkUtils);
scheduler.submit(new Runnable() {
@Override
public void run() {