From 0e99ef6013594234a9f2d762a711b13fd4f9a21e Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?=C5=81ukasz=20Dywicki?= Date: Tue, 25 Jun 2024 21:27:43 +0200 Subject: [PATCH] Make sure that energy meter packets are not queued up. (#16841) MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Hide `open()` socket call beneath PacketListener, so caller do not need to care about that. Signed-off-by: Łukasz Dywicki Signed-off-by: Ciprian Pascu --- .../README.md | 12 ++--- .../SMAEnergyMeterDiscoveryService.java | 1 - .../handler/SMAEnergyMeterHandler.java | 25 ++++++----- .../packet/DefaultPacketListenerRegistry.java | 11 +++-- .../packet/FilteringPayloadHandler.java | 42 +++++++++++++++++ .../internal/packet/PacketListener.java | 42 ++++++++++------- .../packet/ThrottlingPayloadHandler.java | 45 +++++++++++++++++++ 7 files changed, 138 insertions(+), 40 deletions(-) create mode 100644 bundles/org.openhab.binding.smaenergymeter/src/main/java/org/openhab/binding/smaenergymeter/internal/packet/FilteringPayloadHandler.java create mode 100644 bundles/org.openhab.binding.smaenergymeter/src/main/java/org/openhab/binding/smaenergymeter/internal/packet/ThrottlingPayloadHandler.java diff --git a/bundles/org.openhab.binding.smaenergymeter/README.md b/bundles/org.openhab.binding.smaenergymeter/README.md index 8f6be8db0d0..eb5ef308802 100644 --- a/bundles/org.openhab.binding.smaenergymeter/README.md +++ b/bundles/org.openhab.binding.smaenergymeter/README.md @@ -20,12 +20,12 @@ No binding configuration required. Usually no manual configuration is required, as the multicast IP address and the port remain on their factory set values. Optionally, a refresh interval (in seconds) can be defined. -| Parameter | Name | Description | Required | Default | -|------------------|-----------------|---------------------------------------|----------|-----------------| -| `serialNumber` | Serial number | Serial number of a meter. | yes | | -| `mcastGroup` | Multicast Group | Multicast group used by meter. | yes | 239.12.255.254 | -| `port` | Port | Port number used by meter. | no | 9522 | -| `pollingPeriod` | Polling Period | Polling period used to readout meter. | no | 30 | +| Parameter | Name | Description | Required | Default | +|------------------|-----------------|------------------------------------------------------------|----------|-----------------| +| `serialNumber` | Serial number | Serial number of a meter. | yes | | +| `mcastGroup` | Multicast Group | Multicast group used by meter. | yes | 239.12.255.254 | +| `port` | Port | Port number used by meter. | no | 9522 | +| `pollingPeriod` | Polling Period | Polling period used to publish meter reading (in seconds). | no | 30 | The polling period parameter is used to trigger readout of meter. In case if two consecutive readout attempts fail thing will report offline status. diff --git a/bundles/org.openhab.binding.smaenergymeter/src/main/java/org/openhab/binding/smaenergymeter/internal/discovery/SMAEnergyMeterDiscoveryService.java b/bundles/org.openhab.binding.smaenergymeter/src/main/java/org/openhab/binding/smaenergymeter/internal/discovery/SMAEnergyMeterDiscoveryService.java index e959f046646..295f3926665 100644 --- a/bundles/org.openhab.binding.smaenergymeter/src/main/java/org/openhab/binding/smaenergymeter/internal/discovery/SMAEnergyMeterDiscoveryService.java +++ b/bundles/org.openhab.binding.smaenergymeter/src/main/java/org/openhab/binding/smaenergymeter/internal/discovery/SMAEnergyMeterDiscoveryService.java @@ -74,7 +74,6 @@ public class SMAEnergyMeterDiscoveryService extends AbstractDiscoveryService imp try { packetListener = listenerRegistry.getListener(PacketListener.DEFAULT_MCAST_GRP, PacketListener.DEFAULT_MCAST_PORT); - packetListener.open(30); } catch (IOException e) { logger.warn("Could not start background discovery", e); return; diff --git a/bundles/org.openhab.binding.smaenergymeter/src/main/java/org/openhab/binding/smaenergymeter/internal/handler/SMAEnergyMeterHandler.java b/bundles/org.openhab.binding.smaenergymeter/src/main/java/org/openhab/binding/smaenergymeter/internal/handler/SMAEnergyMeterHandler.java index 6308a4bf16a..be470a88191 100644 --- a/bundles/org.openhab.binding.smaenergymeter/src/main/java/org/openhab/binding/smaenergymeter/internal/handler/SMAEnergyMeterHandler.java +++ b/bundles/org.openhab.binding.smaenergymeter/src/main/java/org/openhab/binding/smaenergymeter/internal/handler/SMAEnergyMeterHandler.java @@ -15,12 +15,15 @@ package org.openhab.binding.smaenergymeter.internal.handler; import static org.openhab.binding.smaenergymeter.internal.SMAEnergyMeterBindingConstants.*; import java.io.IOException; +import java.util.concurrent.TimeUnit; import org.eclipse.jdt.annotation.Nullable; import org.openhab.binding.smaenergymeter.internal.configuration.EnergyMeterConfig; +import org.openhab.binding.smaenergymeter.internal.packet.FilteringPayloadHandler; import org.openhab.binding.smaenergymeter.internal.packet.PacketListener; import org.openhab.binding.smaenergymeter.internal.packet.PacketListenerRegistry; import org.openhab.binding.smaenergymeter.internal.packet.PayloadHandler; +import org.openhab.binding.smaenergymeter.internal.packet.ThrottlingPayloadHandler; import org.openhab.core.thing.ChannelUID; import org.openhab.core.thing.Thing; import org.openhab.core.thing.ThingStatus; @@ -42,7 +45,8 @@ public class SMAEnergyMeterHandler extends BaseThingHandler implements PayloadHa private final Logger logger = LoggerFactory.getLogger(SMAEnergyMeterHandler.class); private final PacketListenerRegistry listenerRegistry; private @Nullable PacketListener listener; - private @Nullable String serialNumber; + private @Nullable PayloadHandler handler; + private String serialNumber; public SMAEnergyMeterHandler(Thing thing, PacketListenerRegistry listenerRegistry) { super(thing); @@ -84,9 +88,13 @@ public class SMAEnergyMeterHandler extends BaseThingHandler implements PayloadHa updateStatus(ThingStatus.UNKNOWN); logger.debug("Activated handler for SMA Energy Meter with S/N '{}'", serialNumber); - listener.addPayloadHandler(this); - - listener.open(config.getPollingPeriod()); + if (config.getPollingPeriod() <= 1) { + listener.addPayloadHandler(handler = new FilteringPayloadHandler(this, serialNumber)); + } else { + listener.addPayloadHandler(handler = new FilteringPayloadHandler( + new ThrottlingPayloadHandler(this, TimeUnit.SECONDS.toMillis(config.getPollingPeriod())), + serialNumber)); + } this.listener = listener; logger.debug("Polling job scheduled to run every {} sec. for '{}'", config.getPollingPeriod(), getThing().getUID()); @@ -100,18 +108,15 @@ public class SMAEnergyMeterHandler extends BaseThingHandler implements PayloadHa public void dispose() { logger.debug("Disposing SMAEnergyMeter handler '{}'", getThing().getUID()); PacketListener listener = this.listener; - if (listener != null) { - listener.removePayloadHandler(this); + PayloadHandler handler = this.handler; + if (listener != null && handler != null) { + listener.removePayloadHandler(handler); this.listener = null; } } @Override public void handle(EnergyMeter energyMeter) { - String serialNumber = this.serialNumber; - if (serialNumber == null || !serialNumber.equals(energyMeter.getSerialNumber())) { - return; - } updateStatus(ThingStatus.ONLINE); logger.debug("Update SMAEnergyMeter {} data '{}'", serialNumber, getThing().getUID()); diff --git a/bundles/org.openhab.binding.smaenergymeter/src/main/java/org/openhab/binding/smaenergymeter/internal/packet/DefaultPacketListenerRegistry.java b/bundles/org.openhab.binding.smaenergymeter/src/main/java/org/openhab/binding/smaenergymeter/internal/packet/DefaultPacketListenerRegistry.java index a479d6acae8..9acd53572a9 100644 --- a/bundles/org.openhab.binding.smaenergymeter/src/main/java/org/openhab/binding/smaenergymeter/internal/packet/DefaultPacketListenerRegistry.java +++ b/bundles/org.openhab.binding.smaenergymeter/src/main/java/org/openhab/binding/smaenergymeter/internal/packet/DefaultPacketListenerRegistry.java @@ -16,7 +16,6 @@ import java.io.IOException; import java.util.Map; import java.util.Map.Entry; import java.util.concurrent.ConcurrentHashMap; -import java.util.concurrent.Executors; import java.util.concurrent.ScheduledExecutorService; import java.util.concurrent.ScheduledFuture; import java.util.concurrent.TimeUnit; @@ -24,6 +23,7 @@ import java.util.concurrent.TimeUnit; import org.eclipse.jdt.annotation.NonNullByDefault; import org.openhab.binding.smaenergymeter.internal.SMAEnergyMeterBindingConstants; import org.openhab.binding.smaenergymeter.internal.packet.PacketListener.ReceivingTask; +import org.openhab.core.common.ThreadPoolManager; import org.osgi.service.component.annotations.Component; import org.osgi.service.component.annotations.Deactivate; import org.slf4j.Logger; @@ -40,9 +40,8 @@ import org.slf4j.LoggerFactory; public class DefaultPacketListenerRegistry implements PacketListenerRegistry { private final Logger logger = LoggerFactory.getLogger(DefaultPacketListenerRegistry.class); - private final ScheduledExecutorService scheduler = Executors.newScheduledThreadPool(1, - (runnable) -> new Thread(runnable, - "OH-binding-" + SMAEnergyMeterBindingConstants.BINDING_ID + "-listener")); + private final ScheduledExecutorService scheduler = ThreadPoolManager + .getScheduledPool("OH-binding-" + SMAEnergyMeterBindingConstants.BINDING_ID + "-listener"); private final Map listeners = new ConcurrentHashMap<>(); @Override @@ -68,8 +67,8 @@ public class DefaultPacketListenerRegistry implements PacketListenerRegistry { scheduler.shutdownNow(); } - public ScheduledFuture addTask(Runnable runnable, int intervalSec) { - return scheduler.scheduleWithFixedDelay(runnable, 0, intervalSec, TimeUnit.SECONDS); + public ScheduledFuture addTask(ReceivingTask runnable) { + return scheduler.scheduleWithFixedDelay(runnable, 0, 1000, TimeUnit.MILLISECONDS); } public void execute(ReceivingTask receivingTask) { diff --git a/bundles/org.openhab.binding.smaenergymeter/src/main/java/org/openhab/binding/smaenergymeter/internal/packet/FilteringPayloadHandler.java b/bundles/org.openhab.binding.smaenergymeter/src/main/java/org/openhab/binding/smaenergymeter/internal/packet/FilteringPayloadHandler.java new file mode 100644 index 00000000000..576af21c007 --- /dev/null +++ b/bundles/org.openhab.binding.smaenergymeter/src/main/java/org/openhab/binding/smaenergymeter/internal/packet/FilteringPayloadHandler.java @@ -0,0 +1,42 @@ +/** + * Copyright (c) 2010-2024 Contributors to the openHAB project + * + * See the NOTICE file(s) distributed with this work for additional + * information. + * + * This program and the accompanying materials are made available under the + * terms of the Eclipse Public License 2.0 which is available at + * http://www.eclipse.org/legal/epl-2.0 + * + * SPDX-License-Identifier: EPL-2.0 + */ +package org.openhab.binding.smaenergymeter.internal.packet; + +import java.io.IOException; + +import org.eclipse.jdt.annotation.NonNullByDefault; +import org.openhab.binding.smaenergymeter.internal.handler.EnergyMeter; + +/** + * Payload handler which define acceptance criteria for received meter data. + * + * @author Łukasz Dywicki - Initial contribution + */ +@NonNullByDefault +public class FilteringPayloadHandler implements PayloadHandler { + + private final PayloadHandler delegate; + private final String serialNumber; + + public FilteringPayloadHandler(PayloadHandler delegate, String serialNumber) { + this.delegate = delegate; + this.serialNumber = serialNumber; + } + + @Override + public void handle(EnergyMeter energyMeter) throws IOException { + if (this.serialNumber.equals(energyMeter.getSerialNumber())) { + delegate.handle(energyMeter); + } + } +} diff --git a/bundles/org.openhab.binding.smaenergymeter/src/main/java/org/openhab/binding/smaenergymeter/internal/packet/PacketListener.java b/bundles/org.openhab.binding.smaenergymeter/src/main/java/org/openhab/binding/smaenergymeter/internal/packet/PacketListener.java index 976c617712c..49a4a911dc2 100644 --- a/bundles/org.openhab.binding.smaenergymeter/src/main/java/org/openhab/binding/smaenergymeter/internal/packet/PacketListener.java +++ b/bundles/org.openhab.binding.smaenergymeter/src/main/java/org/openhab/binding/smaenergymeter/internal/packet/PacketListener.java @@ -56,6 +56,9 @@ public class PacketListener { } public void addPayloadHandler(PayloadHandler handler) { + if (handlers.isEmpty()) { + open(); + } handlers.add(handler); } @@ -72,18 +75,22 @@ public class PacketListener { return socket != null && socket.isConnected(); } - public void open(int intervalSec) throws IOException { + private void open() { if (isOpen()) { // no need to bind socket second time return; } - MulticastSocket socket = new MulticastSocket(port); - socket.setSoTimeout(5000); - InetAddress address = InetAddress.getByName(multicastGroup); - socket.joinGroup(address); + try { + MulticastSocket socket = new MulticastSocket(port); + socket.setSoTimeout(5000); + InetAddress address = InetAddress.getByName(multicastGroup); + socket.joinGroup(address); - future = registry.addTask(new ReceivingTask(socket, multicastGroup + ":" + port, handlers), intervalSec); - this.socket = socket; + future = registry.addTask(new ReceivingTask(socket, multicastGroup + ":" + port, handlers)); + this.socket = socket; + } catch (IOException e) { + throw new RuntimeException("Could not open socket", e); + } } void close() throws IOException { @@ -122,24 +129,25 @@ public class PacketListener { } public void run() { - try { - byte[] bytes = new byte[608]; - DatagramPacket msgPacket = new DatagramPacket(bytes, bytes.length); - DatagramSocket socket = this.socket; - socket.receive(msgPacket); + byte[] bytes = new byte[608]; + DatagramPacket msgPacket = new DatagramPacket(bytes, bytes.length); + DatagramSocket socket = this.socket; - try { + try { + do { + // this loop is intended to receive all packets queued on the socket, + // having a receive() call without loop causes packets to get queued over time, + // if more than one meter present because we consume one packet per second + socket.receive(msgPacket); EnergyMeter meter = new EnergyMeter(); meter.parse(bytes); for (PayloadHandler handler : handlers) { handler.handle(meter); } - } catch (IOException e) { - logger.debug("Unexpected payload received for group {}", group, e); - } + } while (msgPacket.getLength() == 608); } catch (IOException e) { - logger.warn("Failed to receive data for multicast group {}", group, e); + logger.debug("Unexpected payload received for group {}", group, e); } } } diff --git a/bundles/org.openhab.binding.smaenergymeter/src/main/java/org/openhab/binding/smaenergymeter/internal/packet/ThrottlingPayloadHandler.java b/bundles/org.openhab.binding.smaenergymeter/src/main/java/org/openhab/binding/smaenergymeter/internal/packet/ThrottlingPayloadHandler.java new file mode 100644 index 00000000000..bdd173a4282 --- /dev/null +++ b/bundles/org.openhab.binding.smaenergymeter/src/main/java/org/openhab/binding/smaenergymeter/internal/packet/ThrottlingPayloadHandler.java @@ -0,0 +1,45 @@ +/** + * Copyright (c) 2010-2024 Contributors to the openHAB project + * + * See the NOTICE file(s) distributed with this work for additional + * information. + * + * This program and the accompanying materials are made available under the + * terms of the Eclipse Public License 2.0 which is available at + * http://www.eclipse.org/legal/epl-2.0 + * + * SPDX-License-Identifier: EPL-2.0 + */ +package org.openhab.binding.smaenergymeter.internal.packet; + +import java.io.IOException; + +import org.eclipse.jdt.annotation.NonNullByDefault; +import org.openhab.binding.smaenergymeter.internal.handler.EnergyMeter; + +/** + * Payload handler which defer publishing of meter data by given amount of time. + * + * @author Łukasz Dywicki - Initial contribution + */ +@NonNullByDefault +public class ThrottlingPayloadHandler implements PayloadHandler { + + private final PayloadHandler delegate; + private final long pollingPeriodMs; + private long publishTime = 0; + + public ThrottlingPayloadHandler(PayloadHandler delegate, long pollingPeriodMs) { + this.delegate = delegate; + this.pollingPeriodMs = pollingPeriodMs; + } + + @Override + public void handle(EnergyMeter energyMeter) throws IOException { + long ts = System.currentTimeMillis(); + if (publishTime <= ts) { + delegate.handle(energyMeter); + publishTime = ts + pollingPeriodMs; + } + } +}