[ipcamera] Fix multiple mjpeg issues and allow stream to stay alive (#11921)

* Fix for a camera that has a space in boundary
* Fixes to ipcamera.mjpeg

Signed-off-by: Matthew Skinner <matt@pcmus.com>
This commit is contained in:
Matthew Skinner 2022-01-09 23:42:16 +11:00 committed by GitHub
parent 46ba275edb
commit cb3496f967
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
5 changed files with 54 additions and 25 deletions

View File

@ -212,7 +212,7 @@ public class DahuaHandler extends ChannelDuplexHandler {
} }
try { try {
String content = msg.toString(); String content = msg.toString();
if (content.startsWith("--myboundary")) { if (content.startsWith("--myboundary") || content.startsWith("-- myboundary")) {
processEvent(content); processEvent(content);
return; return;
} }

View File

@ -232,17 +232,17 @@ public class IpCameraHandler extends BaseThingHandler {
} }
} }
if (contentType.contains("multipart")) { if (contentType.contains("multipart")) {
boundary = Helper.searchString(contentType, "boundary=");
if (mjpegUri.equals(requestUrl)) { if (mjpegUri.equals(requestUrl)) {
if (msg instanceof HttpMessage) { if (msg instanceof HttpMessage) {
// very start of stream only // very start of stream only
mjpegContentType = contentType; mjpegContentType = contentType;
CameraServlet localServlet = servlet; CameraServlet localServlet = servlet;
if (localServlet != null) { if (localServlet != null) {
localServlet.openStreams.updateContentType(contentType); logger.debug("Setting Content-Type to:{}", contentType);
localServlet.openStreams.updateContentType(contentType, boundary);
} }
} }
} else {
boundary = Helper.searchString(contentType, "boundary=");
} }
} else if (contentType.contains("image/jp")) { } else if (contentType.contains("image/jp")) {
if (bytesToRecieve == 0) { if (bytesToRecieve == 0) {
@ -669,8 +669,13 @@ public class IpCameraHandler extends BaseThingHandler {
} }
public void openCamerasStream() { public void openCamerasStream() {
if (mjpegUri.isEmpty() || "ffmpeg".equals(mjpegUri)) {
setupFfmpegFormat(FFmpegFormat.MJPEG);
return;
}
closeChannel(getTinyUrl(mjpegUri)); closeChannel(getTinyUrl(mjpegUri));
mainEventLoopGroup.schedule(this::openMjpegStream, 0, TimeUnit.MILLISECONDS); // Dahua cameras crash if you refresh (close and open) the stream without this delay.
mainEventLoopGroup.schedule(this::openMjpegStream, 300, TimeUnit.MILLISECONDS);
} }
private void openMjpegStream() { private void openMjpegStream() {
@ -1311,6 +1316,12 @@ public class IpCameraHandler extends BaseThingHandler {
pollCameraJob = threadPool.scheduleWithFixedDelay(this::pollCameraRunnable, 1000, 8000, TimeUnit.MILLISECONDS); pollCameraJob = threadPool.scheduleWithFixedDelay(this::pollCameraRunnable, 1000, 8000, TimeUnit.MILLISECONDS);
// auto restart mjpeg stream now camera is back online.
CameraServlet localServlet = servlet;
if (localServlet != null && !localServlet.openStreams.isEmpty()) {
openCamerasStream();
}
if (!rtspUri.isEmpty()) { if (!rtspUri.isEmpty()) {
updateState(CHANNEL_RTSP_URL, new StringType(rtspUri)); updateState(CHANNEL_RTSP_URL, new StringType(rtspUri));
} }
@ -1342,6 +1353,7 @@ public class IpCameraHandler extends BaseThingHandler {
} }
void pollingCameraConnection() { void pollingCameraConnection() {
keepMjpegRunning();
if (thing.getThingTypeUID().getId().equals(GENERIC_THING)) { if (thing.getThingTypeUID().getId().equals(GENERIC_THING)) {
if (rtspUri.isEmpty()) { if (rtspUri.isEmpty()) {
logger.warn("Binding has not been supplied with a FFmpeg Input URL, so some features will not work."); logger.warn("Binding has not been supplied with a FFmpeg Input URL, so some features will not work.");
@ -1643,7 +1655,17 @@ public class IpCameraHandler extends BaseThingHandler {
// Only use ONVIF events if it is not an API camera. // Only use ONVIF events if it is not an API camera.
onvifCamera.connect(thing.getThingTypeUID().getId().equals(ONVIF_THING)); onvifCamera.connect(thing.getThingTypeUID().getId().equals(ONVIF_THING));
} }
cameraConnectionJob = threadPool.scheduleWithFixedDelay(this::pollingCameraConnection, 4, 30, TimeUnit.SECONDS); cameraConnectionJob = threadPool.scheduleWithFixedDelay(this::pollingCameraConnection, 4, 8, TimeUnit.SECONDS);
}
private void keepMjpegRunning() {
CameraServlet localServlet = servlet;
if (localServlet != null && !localServlet.openStreams.isEmpty()) {
if (!mjpegUri.isEmpty() && !"ffmpeg".equals(mjpegUri)) {
localServlet.openStreams.queueFrame(("--" + localServlet.openStreams.boundary + "\r\n\r\n").getBytes());
}
localServlet.openStreams.queueFrame(getSnapshot());
}
} }
// What the camera needs to re-connect if the initialize() is not called. // What the camera needs to re-connect if the initialize() is not called.

View File

@ -175,17 +175,17 @@ public class CameraServlet extends IpCameraServlet {
} }
} while (true); } while (true);
case "/ipcamera.mjpeg": case "/ipcamera.mjpeg":
if (handler.mjpegUri.isEmpty() || "ffmpeg".equals(handler.mjpegUri)) {
if (openStreams.isEmpty()) { if (openStreams.isEmpty()) {
handler.setupFfmpegFormat(FFmpegFormat.MJPEG);
}
output = new StreamOutput(resp);
openStreams.addStream(output);
} else if (openStreams.isEmpty()) {
logger.debug("First stream requested, opening up stream from camera"); logger.debug("First stream requested, opening up stream from camera");
handler.openCamerasStream(); handler.openCamerasStream();
if (handler.mjpegUri.isEmpty() || "ffmpeg".equals(handler.mjpegUri)) {
output = new StreamOutput(resp);
} else {
output = new StreamOutput(resp, handler.mjpegContentType); output = new StreamOutput(resp, handler.mjpegContentType);
openStreams.addStream(output); }
} else {
if (handler.mjpegUri.isEmpty() || "ffmpeg".equals(handler.mjpegUri)) {
output = new StreamOutput(resp);
} else { } else {
ChannelTracking tracker = handler.channelTrackingMap.get(handler.mjpegUri); ChannelTracking tracker = handler.channelTrackingMap.get(handler.mjpegUri);
if (tracker == null || !tracker.getChannel().isOpen()) { if (tracker == null || !tracker.getChannel().isOpen()) {
@ -194,8 +194,9 @@ public class CameraServlet extends IpCameraServlet {
openStreams.closeAllStreams(); openStreams.closeAllStreams();
} }
output = new StreamOutput(resp, handler.mjpegContentType); output = new StreamOutput(resp, handler.mjpegContentType);
openStreams.addStream(output);
} }
}
openStreams.addStream(output);
do { do {
try { try {
output.sendFrame(); output.sendFrame();

View File

@ -29,6 +29,7 @@ import org.eclipse.jdt.annotation.NonNullByDefault;
@NonNullByDefault @NonNullByDefault
public class OpenStreams { public class OpenStreams {
private List<StreamOutput> openStreams = Collections.synchronizedList(new ArrayList<StreamOutput>()); private List<StreamOutput> openStreams = Collections.synchronizedList(new ArrayList<StreamOutput>());
public String boundary = "thisMjpegStream";
public synchronized void addStream(StreamOutput stream) { public synchronized void addStream(StreamOutput stream) {
openStreams.add(stream); openStreams.add(stream);
@ -46,7 +47,8 @@ public class OpenStreams {
return openStreams.isEmpty(); return openStreams.isEmpty();
} }
public synchronized void updateContentType(String contentType) { public synchronized void updateContentType(String contentType, String boundary) {
this.boundary = boundary;
for (StreamOutput stream : openStreams) { for (StreamOutput stream : openStreams) {
stream.updateContentType(contentType); stream.updateContentType(contentType);
} }

View File

@ -20,6 +20,8 @@ import javax.servlet.ServletOutputStream;
import javax.servlet.http.HttpServletResponse; import javax.servlet.http.HttpServletResponse;
import org.eclipse.jdt.annotation.NonNullByDefault; import org.eclipse.jdt.annotation.NonNullByDefault;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
/** /**
* The {@link StreamOutput} Streams mjpeg out to a client * The {@link StreamOutput} Streams mjpeg out to a client
@ -29,11 +31,12 @@ import org.eclipse.jdt.annotation.NonNullByDefault;
@NonNullByDefault @NonNullByDefault
public class StreamOutput { public class StreamOutput {
public final Logger logger = LoggerFactory.getLogger(getClass());
private final HttpServletResponse response; private final HttpServletResponse response;
private final String boundary; private final String boundary;
private String contentType; private String contentType;
private final ServletOutputStream output; private final ServletOutputStream output;
private BlockingQueue<byte[]> fifo = new ArrayBlockingQueue<byte[]>(6); private BlockingQueue<byte[]> fifo = new ArrayBlockingQueue<byte[]>(30);
private boolean connected = false; private boolean connected = false;
public boolean isSnapshotBased = false; public boolean isSnapshotBased = false;
@ -76,6 +79,7 @@ public class StreamOutput {
try { try {
fifo.add(frame); fifo.add(frame);
} catch (IllegalStateException e) { } catch (IllegalStateException e) {
logger.debug("FIFO buffer has run out of space:{}", e.getMessage());
fifo.remove(); fifo.remove();
fifo.add(frame); fifo.add(frame);
} }