From ebda155b47891e441d287a449ff0408df26303f2 Mon Sep 17 00:00:00 2001 From: Sami Salonen Date: Sun, 14 Feb 2021 18:14:43 +0200 Subject: [PATCH] [modbus] Modbus poolconfig handling (#2199) * [modbus] More strict nullness annotations Taking into consideration that we always have default EndpointPoolConfiguration * [modbus] Utilize default pool configuration consistently * [modbus] Make (internal) ModbusCommunicationInterfaceImpl constr private * [modbus] Handle null config of modbus comm when detecting open conns * [modbus] default connection timeout of 10s with default pool config Signed-off-by: Sami Salonen --- .../io/transport/modbus/ModbusManager.java | 4 +- .../modbus/internal/ModbusManagerImpl.java | 86 +++++++++++-------- .../ModbusSlaveConnectionFactoryImpl.java | 75 +++++++--------- 3 files changed, 79 insertions(+), 86 deletions(-) diff --git a/bundles/org.openhab.core.io.transport.modbus/src/main/java/org/openhab/core/io/transport/modbus/ModbusManager.java b/bundles/org.openhab.core.io.transport.modbus/src/main/java/org/openhab/core/io/transport/modbus/ModbusManager.java index 036b892f4..4b5c2c5eb 100644 --- a/bundles/org.openhab.core.io.transport.modbus/src/main/java/org/openhab/core/io/transport/modbus/ModbusManager.java +++ b/bundles/org.openhab.core.io.transport.modbus/src/main/java/org/openhab/core/io/transport/modbus/ModbusManager.java @@ -29,7 +29,7 @@ public interface ModbusManager { * Open communication interface to endpoint * * @param endpoint endpoint pointing to modbus slave - * @param configuration configuration for the endpoint + * @param configuration configuration for the endpoint. Use null to use default pool configuration * @return Communication interface for interacting with the slave * @throws IllegalArgumentException if there is already open communication interface with same endpoint but * differing configuration @@ -45,5 +45,5 @@ public interface ModbusManager { * @param endpoint endpoint to query * @return general connection settings of the given endpoint */ - public @Nullable EndpointPoolConfiguration getEndpointPoolConfiguration(ModbusSlaveEndpoint endpoint); + public EndpointPoolConfiguration getEndpointPoolConfiguration(ModbusSlaveEndpoint endpoint); } diff --git a/bundles/org.openhab.core.io.transport.modbus/src/main/java/org/openhab/core/io/transport/modbus/internal/ModbusManagerImpl.java b/bundles/org.openhab.core.io.transport.modbus/src/main/java/org/openhab/core/io/transport/modbus/internal/ModbusManagerImpl.java index 8c817519a..7f9800491 100644 --- a/bundles/org.openhab.core.io.transport.modbus/src/main/java/org/openhab/core/io/transport/modbus/internal/ModbusManagerImpl.java +++ b/bundles/org.openhab.core.io.transport.modbus/src/main/java/org/openhab/core/io/transport/modbus/internal/ModbusManagerImpl.java @@ -25,6 +25,7 @@ import java.util.concurrent.ScheduledFuture; import java.util.concurrent.ThreadPoolExecutor; import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicReference; +import java.util.function.Function; import javax.imageio.IIOException; @@ -273,6 +274,11 @@ public class ModbusManagerImpl implements ModbusManager { */ public static final long DEFAULT_SERIAL_INTER_TRANSACTION_DELAY_MILLIS = 35; + /** + * Default connection timeout + */ + public static final int DEFAULT_CONNECT_TIMEOUT_MILLIS = 10_000; + /** * Thread naming for modbus read & write requests. Also used by the monitor thread */ @@ -292,6 +298,39 @@ public class ModbusManagerImpl implements ModbusManager { */ private static final long WARN_QUEUE_SIZE = 500; private static final long MONITOR_QUEUE_INTERVAL_MILLIS = 10000; + private static final Function DEFAULT_POOL_CONFIGURATION = endpoint -> { + return endpoint.accept(new ModbusSlaveEndpointVisitor() { + + @Override + public @NonNull EndpointPoolConfiguration visit(ModbusTCPSlaveEndpoint modbusIPSlavePoolingKey) { + EndpointPoolConfiguration endpointPoolConfig = new EndpointPoolConfiguration(); + endpointPoolConfig.setInterTransactionDelayMillis(DEFAULT_TCP_INTER_TRANSACTION_DELAY_MILLIS); + endpointPoolConfig.setConnectMaxTries(Modbus.DEFAULT_RETRIES); + endpointPoolConfig.setConnectTimeoutMillis(DEFAULT_CONNECT_TIMEOUT_MILLIS); + return endpointPoolConfig; + } + + @Override + public @NonNull EndpointPoolConfiguration visit(ModbusSerialSlaveEndpoint modbusSerialSlavePoolingKey) { + EndpointPoolConfiguration endpointPoolConfig = new EndpointPoolConfiguration(); + // never "disconnect" (close/open serial port) serial connection between borrows + endpointPoolConfig.setReconnectAfterMillis(-1); + endpointPoolConfig.setInterTransactionDelayMillis(DEFAULT_SERIAL_INTER_TRANSACTION_DELAY_MILLIS); + endpointPoolConfig.setConnectMaxTries(Modbus.DEFAULT_RETRIES); + endpointPoolConfig.setConnectTimeoutMillis(DEFAULT_CONNECT_TIMEOUT_MILLIS); + return endpointPoolConfig; + } + + @Override + public @NonNull EndpointPoolConfiguration visit(ModbusUDPSlaveEndpoint modbusUDPSlavePoolingKey) { + EndpointPoolConfiguration endpointPoolConfig = new EndpointPoolConfiguration(); + endpointPoolConfig.setInterTransactionDelayMillis(DEFAULT_TCP_INTER_TRANSACTION_DELAY_MILLIS); + endpointPoolConfig.setConnectMaxTries(Modbus.DEFAULT_RETRIES); + endpointPoolConfig.setConnectTimeoutMillis(DEFAULT_CONNECT_TIMEOUT_MILLIS); + return endpointPoolConfig; + } + }); + }; private final PollOperation pollOperation = new PollOperation(); private final WriteOperation writeOperation = new WriteOperation(); @@ -319,38 +358,8 @@ public class ModbusManagerImpl implements ModbusManager { private volatile Set communicationInterfaces = ConcurrentHashMap.newKeySet(); private void constructConnectionPool() { - ModbusSlaveConnectionFactoryImpl connectionFactory = new ModbusSlaveConnectionFactoryImpl(); - connectionFactory.setDefaultPoolConfigurationFactory(endpoint -> { - return endpoint.accept(new ModbusSlaveEndpointVisitor() { - - @Override - public @NonNull EndpointPoolConfiguration visit(ModbusTCPSlaveEndpoint modbusIPSlavePoolingKey) { - EndpointPoolConfiguration endpointPoolConfig = new EndpointPoolConfiguration(); - endpointPoolConfig.setInterTransactionDelayMillis(DEFAULT_TCP_INTER_TRANSACTION_DELAY_MILLIS); - endpointPoolConfig.setConnectMaxTries(Modbus.DEFAULT_RETRIES); - return endpointPoolConfig; - } - - @Override - public @NonNull EndpointPoolConfiguration visit(ModbusSerialSlaveEndpoint modbusSerialSlavePoolingKey) { - EndpointPoolConfiguration endpointPoolConfig = new EndpointPoolConfiguration(); - // never "disconnect" (close/open serial port) serial connection between borrows - endpointPoolConfig.setReconnectAfterMillis(-1); - endpointPoolConfig.setInterTransactionDelayMillis(DEFAULT_SERIAL_INTER_TRANSACTION_DELAY_MILLIS); - endpointPoolConfig.setConnectMaxTries(Modbus.DEFAULT_RETRIES); - return endpointPoolConfig; - } - - @Override - public @NonNull EndpointPoolConfiguration visit(ModbusUDPSlaveEndpoint modbusUDPSlavePoolingKey) { - EndpointPoolConfiguration endpointPoolConfig = new EndpointPoolConfiguration(); - endpointPoolConfig.setInterTransactionDelayMillis(DEFAULT_TCP_INTER_TRANSACTION_DELAY_MILLIS); - endpointPoolConfig.setConnectMaxTries(Modbus.DEFAULT_RETRIES); - return endpointPoolConfig; - } - }); - }); - + ModbusSlaveConnectionFactoryImpl connectionFactory = new ModbusSlaveConnectionFactoryImpl( + DEFAULT_POOL_CONFIGURATION); GenericKeyedObjectPool genericKeyedObjectPool = new ModbusConnectionPool( connectionFactory); genericKeyedObjectPool.setSwallowedExceptionListener(new SwallowedExceptionListener() { @@ -540,9 +549,7 @@ public class ModbusManagerImpl implements ModbusManager { F failureCallback = task.getFailureCallback(); int maxTries = task.getMaxTries(); AtomicReference<@Nullable Exception> lastError = new AtomicReference<>(); - @SuppressWarnings("null") // since cfg in lambda cannot be really null - long retryDelay = Optional.ofNullable(connectionFactory.getEndpointPoolConfiguration(endpoint)) - .map(cfg -> cfg.getInterTransactionDelayMillis()).orElse(0L); + long retryDelay = getEndpointPoolConfiguration(endpoint).getInterTransactionDelayMillis(); if (maxTries <= 0) { throw new IllegalArgumentException("maxTries should be positive"); @@ -729,7 +736,7 @@ public class ModbusManagerImpl implements ModbusManager { private @Nullable EndpointPoolConfiguration configuration; @SuppressWarnings("null") - public ModbusCommunicationInterfaceImpl(ModbusSlaveEndpoint endpoint, + private ModbusCommunicationInterfaceImpl(ModbusSlaveEndpoint endpoint, @Nullable EndpointPoolConfiguration configuration) { this.endpoint = endpoint; this.configuration = configuration; @@ -879,7 +886,10 @@ public class ModbusManagerImpl implements ModbusManager { @Nullable EndpointPoolConfiguration configuration) throws IllegalArgumentException { boolean openCommFoundWithSameEndpointDifferentConfig = communicationInterfaces.stream() .filter(comm -> comm.endpoint.equals(endpoint)) - .anyMatch(comm -> comm.configuration != null && !comm.configuration.equals(configuration)); + .anyMatch(comm -> !Optional.ofNullable(comm.configuration) + .orElseGet(() -> DEFAULT_POOL_CONFIGURATION.apply(endpoint)) + .equals(Optional.ofNullable(configuration) + .orElseGet(() -> DEFAULT_POOL_CONFIGURATION.apply(endpoint)))); if (openCommFoundWithSameEndpointDifferentConfig) { throw new IllegalArgumentException( "Communication interface is already open with different configuration to this same endpoint"); @@ -891,7 +901,7 @@ public class ModbusManagerImpl implements ModbusManager { } @Override - public @Nullable EndpointPoolConfiguration getEndpointPoolConfiguration(ModbusSlaveEndpoint endpoint) { + public EndpointPoolConfiguration getEndpointPoolConfiguration(ModbusSlaveEndpoint endpoint) { Objects.requireNonNull(connectionFactory, "Not activated!"); return connectionFactory.getEndpointPoolConfiguration(endpoint); } diff --git a/bundles/org.openhab.core.io.transport.modbus/src/main/java/org/openhab/core/io/transport/modbus/internal/pooling/ModbusSlaveConnectionFactoryImpl.java b/bundles/org.openhab.core.io.transport.modbus/src/main/java/org/openhab/core/io/transport/modbus/internal/pooling/ModbusSlaveConnectionFactoryImpl.java index 8063e2462..64e2db57c 100644 --- a/bundles/org.openhab.core.io.transport.modbus/src/main/java/org/openhab/core/io/transport/modbus/internal/pooling/ModbusSlaveConnectionFactoryImpl.java +++ b/bundles/org.openhab.core.io.transport.modbus/src/main/java/org/openhab/core/io/transport/modbus/internal/pooling/ModbusSlaveConnectionFactoryImpl.java @@ -15,6 +15,7 @@ package org.openhab.core.io.transport.modbus.internal.pooling; import java.net.InetAddress; import java.net.UnknownHostException; import java.util.Map; +import java.util.Optional; import java.util.concurrent.ConcurrentHashMap; import java.util.function.Function; @@ -97,9 +98,8 @@ public class ModbusSlaveConnectionFactoryImpl ModbusSlaveConnection connection = getObject(); - @Nullable - EndpointPoolConfiguration configuration = endpointPoolConfigs.get(localEndpoint); - long reconnectAfterMillis = configuration == null ? 0 : configuration.getReconnectAfterMillis(); + EndpointPoolConfiguration configuration = getEndpointPoolConfiguration(localEndpoint); + long reconnectAfterMillis = configuration.getReconnectAfterMillis(); long connectionAgeMillis = System.currentTimeMillis() - localLastConnected; long disconnectIfConnectedBeforeMillis = disconnectIfConnectedBefore.getOrDefault(localEndpoint, -1L); boolean disconnectSinceTooOldConnection = disconnectIfConnectedBeforeMillis < 0L ? false @@ -125,11 +125,16 @@ public class ModbusSlaveConnectionFactoryImpl } private final Logger logger = LoggerFactory.getLogger(ModbusSlaveConnectionFactoryImpl.class); - private volatile Map endpointPoolConfigs = new ConcurrentHashMap<>(); + private volatile Map endpointPoolConfigs = new ConcurrentHashMap<>(); private volatile Map lastPassivateMillis = new ConcurrentHashMap<>(); private volatile Map lastConnectMillis = new ConcurrentHashMap<>(); private volatile Map disconnectIfConnectedBefore = new ConcurrentHashMap<>(); - private volatile Function defaultPoolConfigurationFactory = endpoint -> null; + private final Function defaultPoolConfigurationFactory; + + public ModbusSlaveConnectionFactoryImpl( + Function defaultPoolConfigurationFactory) { + this.defaultPoolConfigurationFactory = defaultPoolConfigurationFactory; + } private @Nullable InetAddress getInetAddress(ModbusIPSlaveEndpoint key) { try { @@ -157,11 +162,7 @@ public class ModbusSlaveConnectionFactoryImpl if (address == null) { return null; } - EndpointPoolConfiguration config = getEndpointPoolConfiguration(key); - int connectTimeoutMillis = 0; - if (config != null) { - connectTimeoutMillis = config.getConnectTimeoutMillis(); - } + int connectTimeoutMillis = getEndpointPoolConfiguration(key).getConnectTimeoutMillis(); TCPMasterConnection connection = new TCPMasterConnection(address, key.getPort(), connectTimeoutMillis, key.getRtuEncoded()); logger.trace("Created connection {} for endpoint {}", connection, key); @@ -204,18 +205,15 @@ public class ModbusSlaveConnectionFactoryImpl } ModbusSlaveConnection connection = obj.getObject(); try { - @Nullable EndpointPoolConfiguration config = getEndpointPoolConfiguration(endpoint); if (!connection.isConnected()) { tryConnect(endpoint, obj, connection, config); } - if (config != null) { - long waited = waitAtleast(lastPassivateMillis.get(endpoint), config.getInterTransactionDelayMillis()); - logger.trace( - "Waited {}ms (interTransactionDelayMillis {}ms) before giving returning connection {} for endpoint {}, to ensure delay between transactions.", - waited, config.getInterTransactionDelayMillis(), obj.getObject(), endpoint); - } + long waited = waitAtleast(lastPassivateMillis.get(endpoint), config.getInterTransactionDelayMillis()); + logger.trace( + "Waited {}ms (interTransactionDelayMillis {}ms) before giving returning connection {} for endpoint {}, to ensure delay between transactions.", + waited, config.getInterTransactionDelayMillis(), obj.getObject(), endpoint); } catch (InterruptedException e) { // Someone wants to cancel us, reset the connection and abort if (connection.isConnected()) { @@ -268,45 +266,30 @@ public class ModbusSlaveConnectionFactoryImpl * @param endpoint endpoint to query * @return general connection settings of the given endpoint */ - @SuppressWarnings("null") - public @Nullable EndpointPoolConfiguration getEndpointPoolConfiguration(ModbusSlaveEndpoint endpoint) { - @Nullable - EndpointPoolConfiguration config = endpointPoolConfigs.computeIfAbsent(endpoint, - defaultPoolConfigurationFactory); - return config; - } - - /** - * Set default factory for {@link EndpointPoolConfiguration} - * - * @param defaultPoolConfigurationFactory function providing defaults for a given endpoint - */ - public void setDefaultPoolConfigurationFactory( - Function defaultPoolConfigurationFactory) { - this.defaultPoolConfigurationFactory = defaultPoolConfigurationFactory; + public EndpointPoolConfiguration getEndpointPoolConfiguration(ModbusSlaveEndpoint endpoint) { + return Optional.ofNullable(endpointPoolConfigs.get(endpoint)) + .orElseGet(() -> defaultPoolConfigurationFactory.apply(endpoint)); } private void tryConnect(ModbusSlaveEndpoint endpoint, PooledObject obj, - ModbusSlaveConnection connection, @Nullable EndpointPoolConfiguration config) throws Exception { + ModbusSlaveConnection connection, EndpointPoolConfiguration config) throws Exception { if (connection.isConnected()) { return; } int tryIndex = 0; Long lastConnect = lastConnectMillis.get(endpoint); - int maxTries = config == null ? 1 : config.getConnectMaxTries(); + int maxTries = config.getConnectMaxTries(); do { try { - if (config != null) { - long waited = waitAtleast(lastConnect, - Math.max(config.getInterConnectDelayMillis(), config.getInterTransactionDelayMillis())); - if (waited > 0) { - logger.trace( - "Waited {}ms (interConnectDelayMillis {}ms, interTransactionDelayMillis {}ms) before " - + "connecting disconnected connection {} for endpoint {}, to allow delay " - + "between connections re-connects", - waited, config.getInterConnectDelayMillis(), config.getInterTransactionDelayMillis(), - obj.getObject(), endpoint); - } + long waited = waitAtleast(lastConnect, + Math.max(config.getInterConnectDelayMillis(), config.getInterTransactionDelayMillis())); + if (waited > 0) { + logger.trace( + "Waited {}ms (interConnectDelayMillis {}ms, interTransactionDelayMillis {}ms) before " + + "connecting disconnected connection {} for endpoint {}, to allow delay " + + "between connections re-connects", + waited, config.getInterConnectDelayMillis(), config.getInterTransactionDelayMillis(), + obj.getObject(), endpoint); } connection.connect(); long curTime = System.currentTimeMillis();