[modbus] Modbus poolconfig handling (#2199)

* [modbus] More strict nullness annotations
Taking into consideration that we always have default
EndpointPoolConfiguration
* [modbus] Utilize default pool configuration consistently
* [modbus] Make (internal) ModbusCommunicationInterfaceImpl constr private
* [modbus] Handle null config of modbus comm when detecting open conns
* [modbus] default connection timeout of 10s with default pool config

Signed-off-by: Sami Salonen <ssalonen@gmail.com>
This commit is contained in:
Sami Salonen 2021-02-14 18:14:43 +02:00 committed by GitHub
parent f061512dd5
commit ebda155b47
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
3 changed files with 79 additions and 86 deletions

View File

@ -29,7 +29,7 @@ public interface ModbusManager {
* Open communication interface to endpoint
*
* @param endpoint endpoint pointing to modbus slave
* @param configuration configuration for the endpoint
* @param configuration configuration for the endpoint. Use null to use default pool configuration
* @return Communication interface for interacting with the slave
* @throws IllegalArgumentException if there is already open communication interface with same endpoint but
* differing configuration
@ -45,5 +45,5 @@ public interface ModbusManager {
* @param endpoint endpoint to query
* @return general connection settings of the given endpoint
*/
public @Nullable EndpointPoolConfiguration getEndpointPoolConfiguration(ModbusSlaveEndpoint endpoint);
public EndpointPoolConfiguration getEndpointPoolConfiguration(ModbusSlaveEndpoint endpoint);
}

View File

@ -25,6 +25,7 @@ import java.util.concurrent.ScheduledFuture;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicReference;
import java.util.function.Function;
import javax.imageio.IIOException;
@ -273,6 +274,11 @@ public class ModbusManagerImpl implements ModbusManager {
*/
public static final long DEFAULT_SERIAL_INTER_TRANSACTION_DELAY_MILLIS = 35;
/**
* Default connection timeout
*/
public static final int DEFAULT_CONNECT_TIMEOUT_MILLIS = 10_000;
/**
* Thread naming for modbus read & write requests. Also used by the monitor thread
*/
@ -292,6 +298,39 @@ public class ModbusManagerImpl implements ModbusManager {
*/
private static final long WARN_QUEUE_SIZE = 500;
private static final long MONITOR_QUEUE_INTERVAL_MILLIS = 10000;
private static final Function<ModbusSlaveEndpoint, EndpointPoolConfiguration> DEFAULT_POOL_CONFIGURATION = endpoint -> {
return endpoint.accept(new ModbusSlaveEndpointVisitor<EndpointPoolConfiguration>() {
@Override
public @NonNull EndpointPoolConfiguration visit(ModbusTCPSlaveEndpoint modbusIPSlavePoolingKey) {
EndpointPoolConfiguration endpointPoolConfig = new EndpointPoolConfiguration();
endpointPoolConfig.setInterTransactionDelayMillis(DEFAULT_TCP_INTER_TRANSACTION_DELAY_MILLIS);
endpointPoolConfig.setConnectMaxTries(Modbus.DEFAULT_RETRIES);
endpointPoolConfig.setConnectTimeoutMillis(DEFAULT_CONNECT_TIMEOUT_MILLIS);
return endpointPoolConfig;
}
@Override
public @NonNull EndpointPoolConfiguration visit(ModbusSerialSlaveEndpoint modbusSerialSlavePoolingKey) {
EndpointPoolConfiguration endpointPoolConfig = new EndpointPoolConfiguration();
// never "disconnect" (close/open serial port) serial connection between borrows
endpointPoolConfig.setReconnectAfterMillis(-1);
endpointPoolConfig.setInterTransactionDelayMillis(DEFAULT_SERIAL_INTER_TRANSACTION_DELAY_MILLIS);
endpointPoolConfig.setConnectMaxTries(Modbus.DEFAULT_RETRIES);
endpointPoolConfig.setConnectTimeoutMillis(DEFAULT_CONNECT_TIMEOUT_MILLIS);
return endpointPoolConfig;
}
@Override
public @NonNull EndpointPoolConfiguration visit(ModbusUDPSlaveEndpoint modbusUDPSlavePoolingKey) {
EndpointPoolConfiguration endpointPoolConfig = new EndpointPoolConfiguration();
endpointPoolConfig.setInterTransactionDelayMillis(DEFAULT_TCP_INTER_TRANSACTION_DELAY_MILLIS);
endpointPoolConfig.setConnectMaxTries(Modbus.DEFAULT_RETRIES);
endpointPoolConfig.setConnectTimeoutMillis(DEFAULT_CONNECT_TIMEOUT_MILLIS);
return endpointPoolConfig;
}
});
};
private final PollOperation pollOperation = new PollOperation();
private final WriteOperation writeOperation = new WriteOperation();
@ -319,38 +358,8 @@ public class ModbusManagerImpl implements ModbusManager {
private volatile Set<ModbusCommunicationInterfaceImpl> communicationInterfaces = ConcurrentHashMap.newKeySet();
private void constructConnectionPool() {
ModbusSlaveConnectionFactoryImpl connectionFactory = new ModbusSlaveConnectionFactoryImpl();
connectionFactory.setDefaultPoolConfigurationFactory(endpoint -> {
return endpoint.accept(new ModbusSlaveEndpointVisitor<EndpointPoolConfiguration>() {
@Override
public @NonNull EndpointPoolConfiguration visit(ModbusTCPSlaveEndpoint modbusIPSlavePoolingKey) {
EndpointPoolConfiguration endpointPoolConfig = new EndpointPoolConfiguration();
endpointPoolConfig.setInterTransactionDelayMillis(DEFAULT_TCP_INTER_TRANSACTION_DELAY_MILLIS);
endpointPoolConfig.setConnectMaxTries(Modbus.DEFAULT_RETRIES);
return endpointPoolConfig;
}
@Override
public @NonNull EndpointPoolConfiguration visit(ModbusSerialSlaveEndpoint modbusSerialSlavePoolingKey) {
EndpointPoolConfiguration endpointPoolConfig = new EndpointPoolConfiguration();
// never "disconnect" (close/open serial port) serial connection between borrows
endpointPoolConfig.setReconnectAfterMillis(-1);
endpointPoolConfig.setInterTransactionDelayMillis(DEFAULT_SERIAL_INTER_TRANSACTION_DELAY_MILLIS);
endpointPoolConfig.setConnectMaxTries(Modbus.DEFAULT_RETRIES);
return endpointPoolConfig;
}
@Override
public @NonNull EndpointPoolConfiguration visit(ModbusUDPSlaveEndpoint modbusUDPSlavePoolingKey) {
EndpointPoolConfiguration endpointPoolConfig = new EndpointPoolConfiguration();
endpointPoolConfig.setInterTransactionDelayMillis(DEFAULT_TCP_INTER_TRANSACTION_DELAY_MILLIS);
endpointPoolConfig.setConnectMaxTries(Modbus.DEFAULT_RETRIES);
return endpointPoolConfig;
}
});
});
ModbusSlaveConnectionFactoryImpl connectionFactory = new ModbusSlaveConnectionFactoryImpl(
DEFAULT_POOL_CONFIGURATION);
GenericKeyedObjectPool<ModbusSlaveEndpoint, ModbusSlaveConnection> genericKeyedObjectPool = new ModbusConnectionPool(
connectionFactory);
genericKeyedObjectPool.setSwallowedExceptionListener(new SwallowedExceptionListener() {
@ -540,9 +549,7 @@ public class ModbusManagerImpl implements ModbusManager {
F failureCallback = task.getFailureCallback();
int maxTries = task.getMaxTries();
AtomicReference<@Nullable Exception> lastError = new AtomicReference<>();
@SuppressWarnings("null") // since cfg in lambda cannot be really null
long retryDelay = Optional.ofNullable(connectionFactory.getEndpointPoolConfiguration(endpoint))
.map(cfg -> cfg.getInterTransactionDelayMillis()).orElse(0L);
long retryDelay = getEndpointPoolConfiguration(endpoint).getInterTransactionDelayMillis();
if (maxTries <= 0) {
throw new IllegalArgumentException("maxTries should be positive");
@ -729,7 +736,7 @@ public class ModbusManagerImpl implements ModbusManager {
private @Nullable EndpointPoolConfiguration configuration;
@SuppressWarnings("null")
public ModbusCommunicationInterfaceImpl(ModbusSlaveEndpoint endpoint,
private ModbusCommunicationInterfaceImpl(ModbusSlaveEndpoint endpoint,
@Nullable EndpointPoolConfiguration configuration) {
this.endpoint = endpoint;
this.configuration = configuration;
@ -879,7 +886,10 @@ public class ModbusManagerImpl implements ModbusManager {
@Nullable EndpointPoolConfiguration configuration) throws IllegalArgumentException {
boolean openCommFoundWithSameEndpointDifferentConfig = communicationInterfaces.stream()
.filter(comm -> comm.endpoint.equals(endpoint))
.anyMatch(comm -> comm.configuration != null && !comm.configuration.equals(configuration));
.anyMatch(comm -> !Optional.ofNullable(comm.configuration)
.orElseGet(() -> DEFAULT_POOL_CONFIGURATION.apply(endpoint))
.equals(Optional.ofNullable(configuration)
.orElseGet(() -> DEFAULT_POOL_CONFIGURATION.apply(endpoint))));
if (openCommFoundWithSameEndpointDifferentConfig) {
throw new IllegalArgumentException(
"Communication interface is already open with different configuration to this same endpoint");
@ -891,7 +901,7 @@ public class ModbusManagerImpl implements ModbusManager {
}
@Override
public @Nullable EndpointPoolConfiguration getEndpointPoolConfiguration(ModbusSlaveEndpoint endpoint) {
public EndpointPoolConfiguration getEndpointPoolConfiguration(ModbusSlaveEndpoint endpoint) {
Objects.requireNonNull(connectionFactory, "Not activated!");
return connectionFactory.getEndpointPoolConfiguration(endpoint);
}

View File

@ -15,6 +15,7 @@ package org.openhab.core.io.transport.modbus.internal.pooling;
import java.net.InetAddress;
import java.net.UnknownHostException;
import java.util.Map;
import java.util.Optional;
import java.util.concurrent.ConcurrentHashMap;
import java.util.function.Function;
@ -97,9 +98,8 @@ public class ModbusSlaveConnectionFactoryImpl
ModbusSlaveConnection connection = getObject();
@Nullable
EndpointPoolConfiguration configuration = endpointPoolConfigs.get(localEndpoint);
long reconnectAfterMillis = configuration == null ? 0 : configuration.getReconnectAfterMillis();
EndpointPoolConfiguration configuration = getEndpointPoolConfiguration(localEndpoint);
long reconnectAfterMillis = configuration.getReconnectAfterMillis();
long connectionAgeMillis = System.currentTimeMillis() - localLastConnected;
long disconnectIfConnectedBeforeMillis = disconnectIfConnectedBefore.getOrDefault(localEndpoint, -1L);
boolean disconnectSinceTooOldConnection = disconnectIfConnectedBeforeMillis < 0L ? false
@ -125,11 +125,16 @@ public class ModbusSlaveConnectionFactoryImpl
}
private final Logger logger = LoggerFactory.getLogger(ModbusSlaveConnectionFactoryImpl.class);
private volatile Map<ModbusSlaveEndpoint, @Nullable EndpointPoolConfiguration> endpointPoolConfigs = new ConcurrentHashMap<>();
private volatile Map<ModbusSlaveEndpoint, EndpointPoolConfiguration> endpointPoolConfigs = new ConcurrentHashMap<>();
private volatile Map<ModbusSlaveEndpoint, Long> lastPassivateMillis = new ConcurrentHashMap<>();
private volatile Map<ModbusSlaveEndpoint, Long> lastConnectMillis = new ConcurrentHashMap<>();
private volatile Map<ModbusSlaveEndpoint, Long> disconnectIfConnectedBefore = new ConcurrentHashMap<>();
private volatile Function<ModbusSlaveEndpoint, @Nullable EndpointPoolConfiguration> defaultPoolConfigurationFactory = endpoint -> null;
private final Function<ModbusSlaveEndpoint, EndpointPoolConfiguration> defaultPoolConfigurationFactory;
public ModbusSlaveConnectionFactoryImpl(
Function<ModbusSlaveEndpoint, EndpointPoolConfiguration> defaultPoolConfigurationFactory) {
this.defaultPoolConfigurationFactory = defaultPoolConfigurationFactory;
}
private @Nullable InetAddress getInetAddress(ModbusIPSlaveEndpoint key) {
try {
@ -157,11 +162,7 @@ public class ModbusSlaveConnectionFactoryImpl
if (address == null) {
return null;
}
EndpointPoolConfiguration config = getEndpointPoolConfiguration(key);
int connectTimeoutMillis = 0;
if (config != null) {
connectTimeoutMillis = config.getConnectTimeoutMillis();
}
int connectTimeoutMillis = getEndpointPoolConfiguration(key).getConnectTimeoutMillis();
TCPMasterConnection connection = new TCPMasterConnection(address, key.getPort(), connectTimeoutMillis,
key.getRtuEncoded());
logger.trace("Created connection {} for endpoint {}", connection, key);
@ -204,18 +205,15 @@ public class ModbusSlaveConnectionFactoryImpl
}
ModbusSlaveConnection connection = obj.getObject();
try {
@Nullable
EndpointPoolConfiguration config = getEndpointPoolConfiguration(endpoint);
if (!connection.isConnected()) {
tryConnect(endpoint, obj, connection, config);
}
if (config != null) {
long waited = waitAtleast(lastPassivateMillis.get(endpoint), config.getInterTransactionDelayMillis());
logger.trace(
"Waited {}ms (interTransactionDelayMillis {}ms) before giving returning connection {} for endpoint {}, to ensure delay between transactions.",
waited, config.getInterTransactionDelayMillis(), obj.getObject(), endpoint);
}
long waited = waitAtleast(lastPassivateMillis.get(endpoint), config.getInterTransactionDelayMillis());
logger.trace(
"Waited {}ms (interTransactionDelayMillis {}ms) before giving returning connection {} for endpoint {}, to ensure delay between transactions.",
waited, config.getInterTransactionDelayMillis(), obj.getObject(), endpoint);
} catch (InterruptedException e) {
// Someone wants to cancel us, reset the connection and abort
if (connection.isConnected()) {
@ -268,45 +266,30 @@ public class ModbusSlaveConnectionFactoryImpl
* @param endpoint endpoint to query
* @return general connection settings of the given endpoint
*/
@SuppressWarnings("null")
public @Nullable EndpointPoolConfiguration getEndpointPoolConfiguration(ModbusSlaveEndpoint endpoint) {
@Nullable
EndpointPoolConfiguration config = endpointPoolConfigs.computeIfAbsent(endpoint,
defaultPoolConfigurationFactory);
return config;
}
/**
* Set default factory for {@link EndpointPoolConfiguration}
*
* @param defaultPoolConfigurationFactory function providing defaults for a given endpoint
*/
public void setDefaultPoolConfigurationFactory(
Function<ModbusSlaveEndpoint, @Nullable EndpointPoolConfiguration> defaultPoolConfigurationFactory) {
this.defaultPoolConfigurationFactory = defaultPoolConfigurationFactory;
public EndpointPoolConfiguration getEndpointPoolConfiguration(ModbusSlaveEndpoint endpoint) {
return Optional.ofNullable(endpointPoolConfigs.get(endpoint))
.orElseGet(() -> defaultPoolConfigurationFactory.apply(endpoint));
}
private void tryConnect(ModbusSlaveEndpoint endpoint, PooledObject<ModbusSlaveConnection> obj,
ModbusSlaveConnection connection, @Nullable EndpointPoolConfiguration config) throws Exception {
ModbusSlaveConnection connection, EndpointPoolConfiguration config) throws Exception {
if (connection.isConnected()) {
return;
}
int tryIndex = 0;
Long lastConnect = lastConnectMillis.get(endpoint);
int maxTries = config == null ? 1 : config.getConnectMaxTries();
int maxTries = config.getConnectMaxTries();
do {
try {
if (config != null) {
long waited = waitAtleast(lastConnect,
Math.max(config.getInterConnectDelayMillis(), config.getInterTransactionDelayMillis()));
if (waited > 0) {
logger.trace(
"Waited {}ms (interConnectDelayMillis {}ms, interTransactionDelayMillis {}ms) before "
+ "connecting disconnected connection {} for endpoint {}, to allow delay "
+ "between connections re-connects",
waited, config.getInterConnectDelayMillis(), config.getInterTransactionDelayMillis(),
obj.getObject(), endpoint);
}
long waited = waitAtleast(lastConnect,
Math.max(config.getInterConnectDelayMillis(), config.getInterTransactionDelayMillis()));
if (waited > 0) {
logger.trace(
"Waited {}ms (interConnectDelayMillis {}ms, interTransactionDelayMillis {}ms) before "
+ "connecting disconnected connection {} for endpoint {}, to allow delay "
+ "between connections re-connects",
waited, config.getInterConnectDelayMillis(), config.getInterTransactionDelayMillis(),
obj.getObject(), endpoint);
}
connection.connect();
long curTime = System.currentTimeMillis();