Make sure that energy meter packets are not queued up. (#16841)

Hide `open()` socket call beneath PacketListener, so caller do not need to care about that.
Signed-off-by: Łukasz Dywicki <luke@code-house.org>
This commit is contained in:
Łukasz Dywicki 2024-06-25 21:27:43 +02:00 committed by GitHub
parent 4401de57aa
commit 44b0c52690
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
7 changed files with 138 additions and 40 deletions

View File

@ -20,12 +20,12 @@ No binding configuration required.
Usually no manual configuration is required, as the multicast IP address and the port remain on their factory set values. Usually no manual configuration is required, as the multicast IP address and the port remain on their factory set values.
Optionally, a refresh interval (in seconds) can be defined. Optionally, a refresh interval (in seconds) can be defined.
| Parameter | Name | Description | Required | Default | | Parameter | Name | Description | Required | Default |
|------------------|-----------------|---------------------------------------|----------|-----------------| |------------------|-----------------|------------------------------------------------------------|----------|-----------------|
| `serialNumber` | Serial number | Serial number of a meter. | yes | | | `serialNumber` | Serial number | Serial number of a meter. | yes | |
| `mcastGroup` | Multicast Group | Multicast group used by meter. | yes | 239.12.255.254 | | `mcastGroup` | Multicast Group | Multicast group used by meter. | yes | 239.12.255.254 |
| `port` | Port | Port number used by meter. | no | 9522 | | `port` | Port | Port number used by meter. | no | 9522 |
| `pollingPeriod` | Polling Period | Polling period used to readout meter. | no | 30 | | `pollingPeriod` | Polling Period | Polling period used to publish meter reading (in seconds). | no | 30 |
The polling period parameter is used to trigger readout of meter. In case if two consecutive readout attempts fail thing will report offline status. The polling period parameter is used to trigger readout of meter. In case if two consecutive readout attempts fail thing will report offline status.

View File

@ -74,7 +74,6 @@ public class SMAEnergyMeterDiscoveryService extends AbstractDiscoveryService imp
try { try {
packetListener = listenerRegistry.getListener(PacketListener.DEFAULT_MCAST_GRP, packetListener = listenerRegistry.getListener(PacketListener.DEFAULT_MCAST_GRP,
PacketListener.DEFAULT_MCAST_PORT); PacketListener.DEFAULT_MCAST_PORT);
packetListener.open(30);
} catch (IOException e) { } catch (IOException e) {
logger.warn("Could not start background discovery", e); logger.warn("Could not start background discovery", e);
return; return;

View File

@ -15,12 +15,15 @@ package org.openhab.binding.smaenergymeter.internal.handler;
import static org.openhab.binding.smaenergymeter.internal.SMAEnergyMeterBindingConstants.*; import static org.openhab.binding.smaenergymeter.internal.SMAEnergyMeterBindingConstants.*;
import java.io.IOException; import java.io.IOException;
import java.util.concurrent.TimeUnit;
import org.eclipse.jdt.annotation.Nullable; import org.eclipse.jdt.annotation.Nullable;
import org.openhab.binding.smaenergymeter.internal.configuration.EnergyMeterConfig; import org.openhab.binding.smaenergymeter.internal.configuration.EnergyMeterConfig;
import org.openhab.binding.smaenergymeter.internal.packet.FilteringPayloadHandler;
import org.openhab.binding.smaenergymeter.internal.packet.PacketListener; import org.openhab.binding.smaenergymeter.internal.packet.PacketListener;
import org.openhab.binding.smaenergymeter.internal.packet.PacketListenerRegistry; import org.openhab.binding.smaenergymeter.internal.packet.PacketListenerRegistry;
import org.openhab.binding.smaenergymeter.internal.packet.PayloadHandler; import org.openhab.binding.smaenergymeter.internal.packet.PayloadHandler;
import org.openhab.binding.smaenergymeter.internal.packet.ThrottlingPayloadHandler;
import org.openhab.core.thing.ChannelUID; import org.openhab.core.thing.ChannelUID;
import org.openhab.core.thing.Thing; import org.openhab.core.thing.Thing;
import org.openhab.core.thing.ThingStatus; import org.openhab.core.thing.ThingStatus;
@ -42,7 +45,8 @@ public class SMAEnergyMeterHandler extends BaseThingHandler implements PayloadHa
private final Logger logger = LoggerFactory.getLogger(SMAEnergyMeterHandler.class); private final Logger logger = LoggerFactory.getLogger(SMAEnergyMeterHandler.class);
private final PacketListenerRegistry listenerRegistry; private final PacketListenerRegistry listenerRegistry;
private @Nullable PacketListener listener; private @Nullable PacketListener listener;
private @Nullable String serialNumber; private @Nullable PayloadHandler handler;
private String serialNumber;
public SMAEnergyMeterHandler(Thing thing, PacketListenerRegistry listenerRegistry) { public SMAEnergyMeterHandler(Thing thing, PacketListenerRegistry listenerRegistry) {
super(thing); super(thing);
@ -84,9 +88,13 @@ public class SMAEnergyMeterHandler extends BaseThingHandler implements PayloadHa
updateStatus(ThingStatus.UNKNOWN); updateStatus(ThingStatus.UNKNOWN);
logger.debug("Activated handler for SMA Energy Meter with S/N '{}'", serialNumber); logger.debug("Activated handler for SMA Energy Meter with S/N '{}'", serialNumber);
listener.addPayloadHandler(this); if (config.getPollingPeriod() <= 1) {
listener.addPayloadHandler(handler = new FilteringPayloadHandler(this, serialNumber));
listener.open(config.getPollingPeriod()); } else {
listener.addPayloadHandler(handler = new FilteringPayloadHandler(
new ThrottlingPayloadHandler(this, TimeUnit.SECONDS.toMillis(config.getPollingPeriod())),
serialNumber));
}
this.listener = listener; this.listener = listener;
logger.debug("Polling job scheduled to run every {} sec. for '{}'", config.getPollingPeriod(), logger.debug("Polling job scheduled to run every {} sec. for '{}'", config.getPollingPeriod(),
getThing().getUID()); getThing().getUID());
@ -100,18 +108,15 @@ public class SMAEnergyMeterHandler extends BaseThingHandler implements PayloadHa
public void dispose() { public void dispose() {
logger.debug("Disposing SMAEnergyMeter handler '{}'", getThing().getUID()); logger.debug("Disposing SMAEnergyMeter handler '{}'", getThing().getUID());
PacketListener listener = this.listener; PacketListener listener = this.listener;
if (listener != null) { PayloadHandler handler = this.handler;
listener.removePayloadHandler(this); if (listener != null && handler != null) {
listener.removePayloadHandler(handler);
this.listener = null; this.listener = null;
} }
} }
@Override @Override
public void handle(EnergyMeter energyMeter) { public void handle(EnergyMeter energyMeter) {
String serialNumber = this.serialNumber;
if (serialNumber == null || !serialNumber.equals(energyMeter.getSerialNumber())) {
return;
}
updateStatus(ThingStatus.ONLINE); updateStatus(ThingStatus.ONLINE);
logger.debug("Update SMAEnergyMeter {} data '{}'", serialNumber, getThing().getUID()); logger.debug("Update SMAEnergyMeter {} data '{}'", serialNumber, getThing().getUID());

View File

@ -16,7 +16,6 @@ import java.io.IOException;
import java.util.Map; import java.util.Map;
import java.util.Map.Entry; import java.util.Map.Entry;
import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService; import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.ScheduledFuture; import java.util.concurrent.ScheduledFuture;
import java.util.concurrent.TimeUnit; import java.util.concurrent.TimeUnit;
@ -24,6 +23,7 @@ import java.util.concurrent.TimeUnit;
import org.eclipse.jdt.annotation.NonNullByDefault; import org.eclipse.jdt.annotation.NonNullByDefault;
import org.openhab.binding.smaenergymeter.internal.SMAEnergyMeterBindingConstants; import org.openhab.binding.smaenergymeter.internal.SMAEnergyMeterBindingConstants;
import org.openhab.binding.smaenergymeter.internal.packet.PacketListener.ReceivingTask; import org.openhab.binding.smaenergymeter.internal.packet.PacketListener.ReceivingTask;
import org.openhab.core.common.ThreadPoolManager;
import org.osgi.service.component.annotations.Component; import org.osgi.service.component.annotations.Component;
import org.osgi.service.component.annotations.Deactivate; import org.osgi.service.component.annotations.Deactivate;
import org.slf4j.Logger; import org.slf4j.Logger;
@ -40,9 +40,8 @@ import org.slf4j.LoggerFactory;
public class DefaultPacketListenerRegistry implements PacketListenerRegistry { public class DefaultPacketListenerRegistry implements PacketListenerRegistry {
private final Logger logger = LoggerFactory.getLogger(DefaultPacketListenerRegistry.class); private final Logger logger = LoggerFactory.getLogger(DefaultPacketListenerRegistry.class);
private final ScheduledExecutorService scheduler = Executors.newScheduledThreadPool(1, private final ScheduledExecutorService scheduler = ThreadPoolManager
(runnable) -> new Thread(runnable, .getScheduledPool("OH-binding-" + SMAEnergyMeterBindingConstants.BINDING_ID + "-listener");
"OH-binding-" + SMAEnergyMeterBindingConstants.BINDING_ID + "-listener"));
private final Map<String, PacketListener> listeners = new ConcurrentHashMap<>(); private final Map<String, PacketListener> listeners = new ConcurrentHashMap<>();
@Override @Override
@ -68,8 +67,8 @@ public class DefaultPacketListenerRegistry implements PacketListenerRegistry {
scheduler.shutdownNow(); scheduler.shutdownNow();
} }
public ScheduledFuture<?> addTask(Runnable runnable, int intervalSec) { public ScheduledFuture<?> addTask(ReceivingTask runnable) {
return scheduler.scheduleWithFixedDelay(runnable, 0, intervalSec, TimeUnit.SECONDS); return scheduler.scheduleWithFixedDelay(runnable, 0, 1000, TimeUnit.MILLISECONDS);
} }
public void execute(ReceivingTask receivingTask) { public void execute(ReceivingTask receivingTask) {

View File

@ -0,0 +1,42 @@
/**
* Copyright (c) 2010-2024 Contributors to the openHAB project
*
* See the NOTICE file(s) distributed with this work for additional
* information.
*
* This program and the accompanying materials are made available under the
* terms of the Eclipse Public License 2.0 which is available at
* http://www.eclipse.org/legal/epl-2.0
*
* SPDX-License-Identifier: EPL-2.0
*/
package org.openhab.binding.smaenergymeter.internal.packet;
import java.io.IOException;
import org.eclipse.jdt.annotation.NonNullByDefault;
import org.openhab.binding.smaenergymeter.internal.handler.EnergyMeter;
/**
* Payload handler which define acceptance criteria for received meter data.
*
* @author Łukasz Dywicki - Initial contribution
*/
@NonNullByDefault
public class FilteringPayloadHandler implements PayloadHandler {
private final PayloadHandler delegate;
private final String serialNumber;
public FilteringPayloadHandler(PayloadHandler delegate, String serialNumber) {
this.delegate = delegate;
this.serialNumber = serialNumber;
}
@Override
public void handle(EnergyMeter energyMeter) throws IOException {
if (this.serialNumber.equals(energyMeter.getSerialNumber())) {
delegate.handle(energyMeter);
}
}
}

View File

@ -56,6 +56,9 @@ public class PacketListener {
} }
public void addPayloadHandler(PayloadHandler handler) { public void addPayloadHandler(PayloadHandler handler) {
if (handlers.isEmpty()) {
open();
}
handlers.add(handler); handlers.add(handler);
} }
@ -72,18 +75,22 @@ public class PacketListener {
return socket != null && socket.isConnected(); return socket != null && socket.isConnected();
} }
public void open(int intervalSec) throws IOException { private void open() {
if (isOpen()) { if (isOpen()) {
// no need to bind socket second time // no need to bind socket second time
return; return;
} }
MulticastSocket socket = new MulticastSocket(port); try {
socket.setSoTimeout(5000); MulticastSocket socket = new MulticastSocket(port);
InetAddress address = InetAddress.getByName(multicastGroup); socket.setSoTimeout(5000);
socket.joinGroup(address); InetAddress address = InetAddress.getByName(multicastGroup);
socket.joinGroup(address);
future = registry.addTask(new ReceivingTask(socket, multicastGroup + ":" + port, handlers), intervalSec); future = registry.addTask(new ReceivingTask(socket, multicastGroup + ":" + port, handlers));
this.socket = socket; this.socket = socket;
} catch (IOException e) {
throw new RuntimeException("Could not open socket", e);
}
} }
void close() throws IOException { void close() throws IOException {
@ -122,24 +129,25 @@ public class PacketListener {
} }
public void run() { public void run() {
try { byte[] bytes = new byte[608];
byte[] bytes = new byte[608]; DatagramPacket msgPacket = new DatagramPacket(bytes, bytes.length);
DatagramPacket msgPacket = new DatagramPacket(bytes, bytes.length); DatagramSocket socket = this.socket;
DatagramSocket socket = this.socket;
socket.receive(msgPacket);
try { try {
do {
// this loop is intended to receive all packets queued on the socket,
// having a receive() call without loop causes packets to get queued over time,
// if more than one meter present because we consume one packet per second
socket.receive(msgPacket);
EnergyMeter meter = new EnergyMeter(); EnergyMeter meter = new EnergyMeter();
meter.parse(bytes); meter.parse(bytes);
for (PayloadHandler handler : handlers) { for (PayloadHandler handler : handlers) {
handler.handle(meter); handler.handle(meter);
} }
} catch (IOException e) { } while (msgPacket.getLength() == 608);
logger.debug("Unexpected payload received for group {}", group, e);
}
} catch (IOException e) { } catch (IOException e) {
logger.warn("Failed to receive data for multicast group {}", group, e); logger.debug("Unexpected payload received for group {}", group, e);
} }
} }
} }

View File

@ -0,0 +1,45 @@
/**
* Copyright (c) 2010-2024 Contributors to the openHAB project
*
* See the NOTICE file(s) distributed with this work for additional
* information.
*
* This program and the accompanying materials are made available under the
* terms of the Eclipse Public License 2.0 which is available at
* http://www.eclipse.org/legal/epl-2.0
*
* SPDX-License-Identifier: EPL-2.0
*/
package org.openhab.binding.smaenergymeter.internal.packet;
import java.io.IOException;
import org.eclipse.jdt.annotation.NonNullByDefault;
import org.openhab.binding.smaenergymeter.internal.handler.EnergyMeter;
/**
* Payload handler which defer publishing of meter data by given amount of time.
*
* @author Łukasz Dywicki - Initial contribution
*/
@NonNullByDefault
public class ThrottlingPayloadHandler implements PayloadHandler {
private final PayloadHandler delegate;
private final long pollingPeriodMs;
private long publishTime = 0;
public ThrottlingPayloadHandler(PayloadHandler delegate, long pollingPeriodMs) {
this.delegate = delegate;
this.pollingPeriodMs = pollingPeriodMs;
}
@Override
public void handle(EnergyMeter energyMeter) throws IOException {
long ts = System.currentTimeMillis();
if (publishTime <= ts) {
delegate.handle(energyMeter);
publishTime = ts + pollingPeriodMs;
}
}
}