Make sure that energy meter packets are not queued up. (#16841)

Hide `open()` socket call beneath PacketListener, so caller do not need to care about that.
Signed-off-by: Łukasz Dywicki <luke@code-house.org>
Signed-off-by: Ciprian Pascu <contact@ciprianpascu.ro>
This commit is contained in:
Łukasz Dywicki 2024-06-25 21:27:43 +02:00 committed by Ciprian Pascu
parent b9eb939e54
commit 0e99ef6013
7 changed files with 138 additions and 40 deletions

View File

@ -20,12 +20,12 @@ No binding configuration required.
Usually no manual configuration is required, as the multicast IP address and the port remain on their factory set values.
Optionally, a refresh interval (in seconds) can be defined.
| Parameter | Name | Description | Required | Default |
|------------------|-----------------|---------------------------------------|----------|-----------------|
| `serialNumber` | Serial number | Serial number of a meter. | yes | |
| `mcastGroup` | Multicast Group | Multicast group used by meter. | yes | 239.12.255.254 |
| `port` | Port | Port number used by meter. | no | 9522 |
| `pollingPeriod` | Polling Period | Polling period used to readout meter. | no | 30 |
| Parameter | Name | Description | Required | Default |
|------------------|-----------------|------------------------------------------------------------|----------|-----------------|
| `serialNumber` | Serial number | Serial number of a meter. | yes | |
| `mcastGroup` | Multicast Group | Multicast group used by meter. | yes | 239.12.255.254 |
| `port` | Port | Port number used by meter. | no | 9522 |
| `pollingPeriod` | Polling Period | Polling period used to publish meter reading (in seconds). | no | 30 |
The polling period parameter is used to trigger readout of meter. In case if two consecutive readout attempts fail thing will report offline status.

View File

@ -74,7 +74,6 @@ public class SMAEnergyMeterDiscoveryService extends AbstractDiscoveryService imp
try {
packetListener = listenerRegistry.getListener(PacketListener.DEFAULT_MCAST_GRP,
PacketListener.DEFAULT_MCAST_PORT);
packetListener.open(30);
} catch (IOException e) {
logger.warn("Could not start background discovery", e);
return;

View File

@ -15,12 +15,15 @@ package org.openhab.binding.smaenergymeter.internal.handler;
import static org.openhab.binding.smaenergymeter.internal.SMAEnergyMeterBindingConstants.*;
import java.io.IOException;
import java.util.concurrent.TimeUnit;
import org.eclipse.jdt.annotation.Nullable;
import org.openhab.binding.smaenergymeter.internal.configuration.EnergyMeterConfig;
import org.openhab.binding.smaenergymeter.internal.packet.FilteringPayloadHandler;
import org.openhab.binding.smaenergymeter.internal.packet.PacketListener;
import org.openhab.binding.smaenergymeter.internal.packet.PacketListenerRegistry;
import org.openhab.binding.smaenergymeter.internal.packet.PayloadHandler;
import org.openhab.binding.smaenergymeter.internal.packet.ThrottlingPayloadHandler;
import org.openhab.core.thing.ChannelUID;
import org.openhab.core.thing.Thing;
import org.openhab.core.thing.ThingStatus;
@ -42,7 +45,8 @@ public class SMAEnergyMeterHandler extends BaseThingHandler implements PayloadHa
private final Logger logger = LoggerFactory.getLogger(SMAEnergyMeterHandler.class);
private final PacketListenerRegistry listenerRegistry;
private @Nullable PacketListener listener;
private @Nullable String serialNumber;
private @Nullable PayloadHandler handler;
private String serialNumber;
public SMAEnergyMeterHandler(Thing thing, PacketListenerRegistry listenerRegistry) {
super(thing);
@ -84,9 +88,13 @@ public class SMAEnergyMeterHandler extends BaseThingHandler implements PayloadHa
updateStatus(ThingStatus.UNKNOWN);
logger.debug("Activated handler for SMA Energy Meter with S/N '{}'", serialNumber);
listener.addPayloadHandler(this);
listener.open(config.getPollingPeriod());
if (config.getPollingPeriod() <= 1) {
listener.addPayloadHandler(handler = new FilteringPayloadHandler(this, serialNumber));
} else {
listener.addPayloadHandler(handler = new FilteringPayloadHandler(
new ThrottlingPayloadHandler(this, TimeUnit.SECONDS.toMillis(config.getPollingPeriod())),
serialNumber));
}
this.listener = listener;
logger.debug("Polling job scheduled to run every {} sec. for '{}'", config.getPollingPeriod(),
getThing().getUID());
@ -100,18 +108,15 @@ public class SMAEnergyMeterHandler extends BaseThingHandler implements PayloadHa
public void dispose() {
logger.debug("Disposing SMAEnergyMeter handler '{}'", getThing().getUID());
PacketListener listener = this.listener;
if (listener != null) {
listener.removePayloadHandler(this);
PayloadHandler handler = this.handler;
if (listener != null && handler != null) {
listener.removePayloadHandler(handler);
this.listener = null;
}
}
@Override
public void handle(EnergyMeter energyMeter) {
String serialNumber = this.serialNumber;
if (serialNumber == null || !serialNumber.equals(energyMeter.getSerialNumber())) {
return;
}
updateStatus(ThingStatus.ONLINE);
logger.debug("Update SMAEnergyMeter {} data '{}'", serialNumber, getThing().getUID());

View File

@ -16,7 +16,6 @@ import java.io.IOException;
import java.util.Map;
import java.util.Map.Entry;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.ScheduledFuture;
import java.util.concurrent.TimeUnit;
@ -24,6 +23,7 @@ import java.util.concurrent.TimeUnit;
import org.eclipse.jdt.annotation.NonNullByDefault;
import org.openhab.binding.smaenergymeter.internal.SMAEnergyMeterBindingConstants;
import org.openhab.binding.smaenergymeter.internal.packet.PacketListener.ReceivingTask;
import org.openhab.core.common.ThreadPoolManager;
import org.osgi.service.component.annotations.Component;
import org.osgi.service.component.annotations.Deactivate;
import org.slf4j.Logger;
@ -40,9 +40,8 @@ import org.slf4j.LoggerFactory;
public class DefaultPacketListenerRegistry implements PacketListenerRegistry {
private final Logger logger = LoggerFactory.getLogger(DefaultPacketListenerRegistry.class);
private final ScheduledExecutorService scheduler = Executors.newScheduledThreadPool(1,
(runnable) -> new Thread(runnable,
"OH-binding-" + SMAEnergyMeterBindingConstants.BINDING_ID + "-listener"));
private final ScheduledExecutorService scheduler = ThreadPoolManager
.getScheduledPool("OH-binding-" + SMAEnergyMeterBindingConstants.BINDING_ID + "-listener");
private final Map<String, PacketListener> listeners = new ConcurrentHashMap<>();
@Override
@ -68,8 +67,8 @@ public class DefaultPacketListenerRegistry implements PacketListenerRegistry {
scheduler.shutdownNow();
}
public ScheduledFuture<?> addTask(Runnable runnable, int intervalSec) {
return scheduler.scheduleWithFixedDelay(runnable, 0, intervalSec, TimeUnit.SECONDS);
public ScheduledFuture<?> addTask(ReceivingTask runnable) {
return scheduler.scheduleWithFixedDelay(runnable, 0, 1000, TimeUnit.MILLISECONDS);
}
public void execute(ReceivingTask receivingTask) {

View File

@ -0,0 +1,42 @@
/**
* Copyright (c) 2010-2024 Contributors to the openHAB project
*
* See the NOTICE file(s) distributed with this work for additional
* information.
*
* This program and the accompanying materials are made available under the
* terms of the Eclipse Public License 2.0 which is available at
* http://www.eclipse.org/legal/epl-2.0
*
* SPDX-License-Identifier: EPL-2.0
*/
package org.openhab.binding.smaenergymeter.internal.packet;
import java.io.IOException;
import org.eclipse.jdt.annotation.NonNullByDefault;
import org.openhab.binding.smaenergymeter.internal.handler.EnergyMeter;
/**
* Payload handler which define acceptance criteria for received meter data.
*
* @author Łukasz Dywicki - Initial contribution
*/
@NonNullByDefault
public class FilteringPayloadHandler implements PayloadHandler {
private final PayloadHandler delegate;
private final String serialNumber;
public FilteringPayloadHandler(PayloadHandler delegate, String serialNumber) {
this.delegate = delegate;
this.serialNumber = serialNumber;
}
@Override
public void handle(EnergyMeter energyMeter) throws IOException {
if (this.serialNumber.equals(energyMeter.getSerialNumber())) {
delegate.handle(energyMeter);
}
}
}

View File

@ -56,6 +56,9 @@ public class PacketListener {
}
public void addPayloadHandler(PayloadHandler handler) {
if (handlers.isEmpty()) {
open();
}
handlers.add(handler);
}
@ -72,18 +75,22 @@ public class PacketListener {
return socket != null && socket.isConnected();
}
public void open(int intervalSec) throws IOException {
private void open() {
if (isOpen()) {
// no need to bind socket second time
return;
}
MulticastSocket socket = new MulticastSocket(port);
socket.setSoTimeout(5000);
InetAddress address = InetAddress.getByName(multicastGroup);
socket.joinGroup(address);
try {
MulticastSocket socket = new MulticastSocket(port);
socket.setSoTimeout(5000);
InetAddress address = InetAddress.getByName(multicastGroup);
socket.joinGroup(address);
future = registry.addTask(new ReceivingTask(socket, multicastGroup + ":" + port, handlers), intervalSec);
this.socket = socket;
future = registry.addTask(new ReceivingTask(socket, multicastGroup + ":" + port, handlers));
this.socket = socket;
} catch (IOException e) {
throw new RuntimeException("Could not open socket", e);
}
}
void close() throws IOException {
@ -122,24 +129,25 @@ public class PacketListener {
}
public void run() {
try {
byte[] bytes = new byte[608];
DatagramPacket msgPacket = new DatagramPacket(bytes, bytes.length);
DatagramSocket socket = this.socket;
socket.receive(msgPacket);
byte[] bytes = new byte[608];
DatagramPacket msgPacket = new DatagramPacket(bytes, bytes.length);
DatagramSocket socket = this.socket;
try {
try {
do {
// this loop is intended to receive all packets queued on the socket,
// having a receive() call without loop causes packets to get queued over time,
// if more than one meter present because we consume one packet per second
socket.receive(msgPacket);
EnergyMeter meter = new EnergyMeter();
meter.parse(bytes);
for (PayloadHandler handler : handlers) {
handler.handle(meter);
}
} catch (IOException e) {
logger.debug("Unexpected payload received for group {}", group, e);
}
} while (msgPacket.getLength() == 608);
} catch (IOException e) {
logger.warn("Failed to receive data for multicast group {}", group, e);
logger.debug("Unexpected payload received for group {}", group, e);
}
}
}

View File

@ -0,0 +1,45 @@
/**
* Copyright (c) 2010-2024 Contributors to the openHAB project
*
* See the NOTICE file(s) distributed with this work for additional
* information.
*
* This program and the accompanying materials are made available under the
* terms of the Eclipse Public License 2.0 which is available at
* http://www.eclipse.org/legal/epl-2.0
*
* SPDX-License-Identifier: EPL-2.0
*/
package org.openhab.binding.smaenergymeter.internal.packet;
import java.io.IOException;
import org.eclipse.jdt.annotation.NonNullByDefault;
import org.openhab.binding.smaenergymeter.internal.handler.EnergyMeter;
/**
* Payload handler which defer publishing of meter data by given amount of time.
*
* @author Łukasz Dywicki - Initial contribution
*/
@NonNullByDefault
public class ThrottlingPayloadHandler implements PayloadHandler {
private final PayloadHandler delegate;
private final long pollingPeriodMs;
private long publishTime = 0;
public ThrottlingPayloadHandler(PayloadHandler delegate, long pollingPeriodMs) {
this.delegate = delegate;
this.pollingPeriodMs = pollingPeriodMs;
}
@Override
public void handle(EnergyMeter energyMeter) throws IOException {
long ts = System.currentTimeMillis();
if (publishTime <= ts) {
delegate.handle(energyMeter);
publishTime = ts + pollingPeriodMs;
}
}
}