From fbafc365dac3ea75dbe2d1044b52c65aa297d2bc Mon Sep 17 00:00:00 2001 From: Wouter Born Date: Wed, 30 Sep 2020 19:36:47 +0200 Subject: [PATCH] Fix parallel MQTT itests execution (#8617) * Improve exception handling of the embedded MQTT broker so the port can be reconfigured when it is already bound and it properly unlocks files * Rework MQTT integration tests so they each run the embedded broker on their own reserved port Signed-off-by: Wouter Born --- .../org.openhab.io.mqttembeddedbroker/pom.xml | 23 +++++++++ .../io/mqttembeddedbroker/Constants.java | 10 ++++ .../internal/EmbeddedBrokerService.java | 9 +++- .../itest.bndrun | 9 ++-- .../pom.xml | 23 +++++++++ .../binding/mqtt/EmbeddedBrokerTools.java | 45 +++++++++++++++-- .../HomeAssistantMQTTImplementationTest.java | 23 +++++---- .../itest.bndrun | 6 ++- .../pom.xml | 24 ++++++++- .../binding/mqtt/EmbeddedBrokerTools.java | 49 ++++++++++++++++--- .../binding/mqtt/HomieImplementationTest.java | 21 +++++--- .../itest.bndrun | 5 +- .../pom.xml | 23 +++++++++ .../EmbeddedBrokerTools.java | 49 ++++++++++++++++--- .../io/mqttembeddedbroker/MoquetteTest.java | 5 +- 15 files changed, 276 insertions(+), 48 deletions(-) diff --git a/bundles/org.openhab.io.mqttembeddedbroker/pom.xml b/bundles/org.openhab.io.mqttembeddedbroker/pom.xml index ae9f42ed169..d733a11549f 100644 --- a/bundles/org.openhab.io.mqttembeddedbroker/pom.xml +++ b/bundles/org.openhab.io.mqttembeddedbroker/pom.xml @@ -92,4 +92,27 @@ + + + + org.codehaus.mojo + build-helper-maven-plugin + + + reserve-network-port + + reserve-network-port + + process-resources + + + mqttembeddedbroker.port + + + + + + + + diff --git a/bundles/org.openhab.io.mqttembeddedbroker/src/main/java/org/openhab/io/mqttembeddedbroker/Constants.java b/bundles/org.openhab.io.mqttembeddedbroker/src/main/java/org/openhab/io/mqttembeddedbroker/Constants.java index 9b888f8c6e8..34b85e59082 100644 --- a/bundles/org.openhab.io.mqttembeddedbroker/src/main/java/org/openhab/io/mqttembeddedbroker/Constants.java +++ b/bundles/org.openhab.io.mqttembeddedbroker/src/main/java/org/openhab/io/mqttembeddedbroker/Constants.java @@ -26,4 +26,14 @@ public class Constants { * */ public static final String CLIENTID = "embedded-mqtt-broker"; + + /** + * The broker persistent identifier used for identifying configurations. + */ + public static final String PID = "org.openhab.core.mqttembeddedbroker"; + + /** + * The configuration key used for configuring the embedded broker port. + */ + public static final String PORT = "port"; } diff --git a/bundles/org.openhab.io.mqttembeddedbroker/src/main/java/org/openhab/io/mqttembeddedbroker/internal/EmbeddedBrokerService.java b/bundles/org.openhab.io.mqttembeddedbroker/src/main/java/org/openhab/io/mqttembeddedbroker/internal/EmbeddedBrokerService.java index bbdc533206f..a87cada2015 100644 --- a/bundles/org.openhab.io.mqttembeddedbroker/src/main/java/org/openhab/io/mqttembeddedbroker/internal/EmbeddedBrokerService.java +++ b/bundles/org.openhab.io.mqttembeddedbroker/src/main/java/org/openhab/io/mqttembeddedbroker/internal/EmbeddedBrokerService.java @@ -79,8 +79,8 @@ import io.netty.handler.ssl.SslContextBuilder; * * @author David Graeff - Initial contribution */ -@Component(immediate = true, service = EmbeddedBrokerService.class, configurationPid = "org.openhab.core.mqttembeddedbroker", // - property = org.osgi.framework.Constants.SERVICE_PID + "=org.openhab.core.mqttembeddedbroker") +@Component(immediate = true, service = EmbeddedBrokerService.class, configurationPid = Constants.PID, // + property = org.osgi.framework.Constants.SERVICE_PID + "=" + Constants.PID) @ConfigurableService(category = "MQTT", label = "MQTT Embedded Broker", description_uri = "mqtt:mqttembeddedbroker") @NonNullByDefault public class EmbeddedBrokerService @@ -309,7 +309,12 @@ public class EmbeddedBrokerService // retry starting broker, if it fails again, don't catch exception server.startServer(new MemoryConfig(properties), null, sslContextCreator, authentificator, authorizer); } + } catch (Exception e) { + logger.warn("Failed to start embedded MQTT server: {}", e.getMessage()); + server.stopServer(); + return; } + this.server = server; server.addInterceptHandler(metrics); ScheduledExecutorService s = new ScheduledThreadPoolExecutor(1); diff --git a/itests/org.openhab.binding.mqtt.homeassistant.tests/itest.bndrun b/itests/org.openhab.binding.mqtt.homeassistant.tests/itest.bndrun index bafc464171d..49b1796db10 100644 --- a/itests/org.openhab.binding.mqtt.homeassistant.tests/itest.bndrun +++ b/itests/org.openhab.binding.mqtt.homeassistant.tests/itest.bndrun @@ -13,6 +13,10 @@ Fragment-Host: org.openhab.binding.mqtt.homeassistant -runblacklist: \ bnd.identity;id='org.openhab.core.storage.json' +-runvm: \ + -Dio.netty.noUnsafe=true,\ + -Dmqttembeddedbroker.port=${mqttembeddedbroker.port} + # # done # @@ -87,7 +91,4 @@ Fragment-Host: org.openhab.binding.mqtt.homeassistant org.openhab.core.transform;version='[3.0.0,3.0.1)',\ org.openhab.io.mqttembeddedbroker;version='[3.0.0,3.0.1)',\ org.opentest4j;version='[1.2.0,1.2.1)',\ - org.reactivestreams.reactive-streams;version='[1.0.2,1.0.3)',\ - moquette-broker;version='[0.13.0,0.13.1)' - --runvm: -Dio.netty.noUnsafe=true + org.reactivestreams.reactive-streams;version='[1.0.2,1.0.3)' diff --git a/itests/org.openhab.binding.mqtt.homeassistant.tests/pom.xml b/itests/org.openhab.binding.mqtt.homeassistant.tests/pom.xml index 110ca397a2a..2c8e11c8a84 100644 --- a/itests/org.openhab.binding.mqtt.homeassistant.tests/pom.xml +++ b/itests/org.openhab.binding.mqtt.homeassistant.tests/pom.xml @@ -100,4 +100,27 @@ + + + + org.codehaus.mojo + build-helper-maven-plugin + + + reserve-network-port + + reserve-network-port + + process-resources + + + mqttembeddedbroker.port + + + + + + + + diff --git a/itests/org.openhab.binding.mqtt.homeassistant.tests/src/main/java/org/openhab/binding/mqtt/EmbeddedBrokerTools.java b/itests/org.openhab.binding.mqtt.homeassistant.tests/src/main/java/org/openhab/binding/mqtt/EmbeddedBrokerTools.java index c318ed535d1..de7e511ade1 100644 --- a/itests/org.openhab.binding.mqtt.homeassistant.tests/src/main/java/org/openhab/binding/mqtt/EmbeddedBrokerTools.java +++ b/itests/org.openhab.binding.mqtt.homeassistant.tests/src/main/java/org/openhab/binding/mqtt/EmbeddedBrokerTools.java @@ -14,6 +14,9 @@ package org.openhab.binding.mqtt; import static org.junit.jupiter.api.Assertions.assertTrue; +import java.io.IOException; +import java.util.Dictionary; +import java.util.Hashtable; import java.util.concurrent.Semaphore; import java.util.concurrent.TimeUnit; @@ -25,23 +28,40 @@ import org.openhab.core.io.transport.mqtt.MqttConnectionState; import org.openhab.core.io.transport.mqtt.MqttService; import org.openhab.core.io.transport.mqtt.MqttServiceObserver; import org.openhab.io.mqttembeddedbroker.Constants; +import org.osgi.service.cm.Configuration; +import org.osgi.service.cm.ConfigurationAdmin; /** * A full implementation test, that starts the embedded MQTT broker and publishes a homeassistant MQTT discovery device * tree. * * @author David Graeff - Initial contribution + * @author Wouter Born - Support running MQTT itests in parallel by reconfiguring embedded broker port */ @NonNullByDefault public class EmbeddedBrokerTools { - public @Nullable MqttBrokerConnection embeddedConnection = null; + + private static final int BROKER_PORT = Integer.getInteger("mqttembeddedbroker.port", 1883); + + private final ConfigurationAdmin configurationAdmin; + private final MqttService mqttService; + + public @Nullable MqttBrokerConnection embeddedConnection; + + public EmbeddedBrokerTools(ConfigurationAdmin configurationAdmin, MqttService mqttService) { + this.configurationAdmin = configurationAdmin; + this.mqttService = mqttService; + } /** * Request the embedded broker connection from the {@link MqttService} and wait for a connection to be established. * * @throws InterruptedException + * @throws IOException */ - public MqttBrokerConnection waitForConnection(MqttService mqttService) throws InterruptedException { + public MqttBrokerConnection waitForConnection() throws InterruptedException, IOException { + reconfigurePort(); + embeddedConnection = mqttService.getBrokerConnection(Constants.CLIENTID); if (embeddedConnection == null) { Semaphore semaphore = new Semaphore(1); @@ -61,7 +81,7 @@ public class EmbeddedBrokerTools { } }; mqttService.addBrokersListener(observer); - assertTrue(semaphore.tryAcquire(1000, TimeUnit.MILLISECONDS), "Wait for embedded connection client failed"); + assertTrue(semaphore.tryAcquire(5, TimeUnit.SECONDS), "Wait for embedded connection client failed"); } MqttBrokerConnection embeddedConnection = this.embeddedConnection; if (embeddedConnection == null) { @@ -79,8 +99,25 @@ public class EmbeddedBrokerTools { if (embeddedConnection.connectionState() == MqttConnectionState.CONNECTED) { semaphore.release(); } - assertTrue(semaphore.tryAcquire(500, TimeUnit.MILLISECONDS), "Connection " + embeddedConnection.getClientId() + assertTrue(semaphore.tryAcquire(5, TimeUnit.SECONDS), "Connection " + embeddedConnection.getClientId() + " failed. State: " + embeddedConnection.connectionState()); return embeddedConnection; } + + public void reconfigurePort() throws IOException { + Configuration configuration = configurationAdmin.getConfiguration(Constants.PID, null); + + Dictionary properties = configuration.getProperties(); + if (properties == null) { + properties = new Hashtable<>(); + } + + Integer currentPort = (Integer) properties.get(Constants.PORT); + if (currentPort == null || currentPort.intValue() != BROKER_PORT) { + properties.put(Constants.PORT, BROKER_PORT); + configuration.update(properties); + // Remove the connection to make sure the test waits for the new connection to become available + mqttService.removeBrokerConnection(Constants.CLIENTID); + } + } } diff --git a/itests/org.openhab.binding.mqtt.homeassistant.tests/src/main/java/org/openhab/binding/mqtt/HomeAssistantMQTTImplementationTest.java b/itests/org.openhab.binding.mqtt.homeassistant.tests/src/main/java/org/openhab/binding/mqtt/HomeAssistantMQTTImplementationTest.java index a4ae5fc8d4c..96aaf90bfe0 100644 --- a/itests/org.openhab.binding.mqtt.homeassistant.tests/src/main/java/org/openhab/binding/mqtt/HomeAssistantMQTTImplementationTest.java +++ b/itests/org.openhab.binding.mqtt.homeassistant.tests/src/main/java/org/openhab/binding/mqtt/HomeAssistantMQTTImplementationTest.java @@ -57,6 +57,7 @@ import org.openhab.core.test.java.JavaOSGiTest; import org.openhab.core.types.State; import org.openhab.core.types.UnDefType; import org.openhab.core.util.UIDUtils; +import org.osgi.service.cm.ConfigurationAdmin; import com.google.gson.Gson; import com.google.gson.GsonBuilder; @@ -69,6 +70,7 @@ import com.google.gson.GsonBuilder; */ @NonNullByDefault public class HomeAssistantMQTTImplementationTest extends JavaOSGiTest { + private @NonNullByDefault({}) ConfigurationAdmin configurationAdmin; private @NonNullByDefault({}) MqttService mqttService; private @NonNullByDefault({}) MqttBrokerConnection embeddedConnection; private @NonNullByDefault({}) MqttBrokerConnection connection; @@ -94,14 +96,15 @@ public class HomeAssistantMQTTImplementationTest extends JavaOSGiTest { public void beforeEach() throws Exception { registerVolatileStorageService(); mocksCloseable = openMocks(this); + configurationAdmin = getService(ConfigurationAdmin.class); mqttService = getService(MqttService.class); // Wait for the EmbeddedBrokerService internal connection to be connected - embeddedConnection = new EmbeddedBrokerTools().waitForConnection(mqttService); + embeddedConnection = new EmbeddedBrokerTools(configurationAdmin, mqttService).waitForConnection(); connection = new MqttBrokerConnection(embeddedConnection.getHost(), embeddedConnection.getPort(), embeddedConnection.isSecure(), "ha_mqtt"); - connection.start().get(1000, TimeUnit.MILLISECONDS); + connection.start().get(2, TimeUnit.SECONDS); assertThat(connection.connectionState(), is(MqttConnectionState.CONNECTED)); // If the connection state changes in between -> fail @@ -117,7 +120,7 @@ public class HomeAssistantMQTTImplementationTest extends JavaOSGiTest { futures.add(embeddedConnection.publish(testObjectTopic + "/state", "ON".getBytes(), 0, true)); registeredTopics = futures.size(); - CompletableFuture.allOf(futures.toArray(new CompletableFuture[0])).get(1000, TimeUnit.MILLISECONDS); + CompletableFuture.allOf(futures.toArray(new CompletableFuture[0])).get(2, TimeUnit.SECONDS); failure = null; @@ -128,7 +131,7 @@ public class HomeAssistantMQTTImplementationTest extends JavaOSGiTest { public void afterEach() throws Exception { if (connection != null) { connection.removeConnectionObserver(failIfChange); - connection.stop().get(1000, TimeUnit.MILLISECONDS); + connection.stop().get(2, TimeUnit.SECONDS); } mocksCloseable.close(); @@ -137,18 +140,18 @@ public class HomeAssistantMQTTImplementationTest extends JavaOSGiTest { @Test public void reconnectTest() throws InterruptedException, ExecutionException, TimeoutException { connection.removeConnectionObserver(failIfChange); - connection.stop().get(2000, TimeUnit.MILLISECONDS); + connection.stop().get(2, TimeUnit.SECONDS); connection = new MqttBrokerConnection(embeddedConnection.getHost(), embeddedConnection.getPort(), embeddedConnection.isSecure(), "ha_mqtt"); - connection.start().get(2000, TimeUnit.MILLISECONDS); + connection.start().get(2, TimeUnit.SECONDS); } @Test public void retrieveAllTopics() throws InterruptedException, ExecutionException, TimeoutException { CountDownLatch c = new CountDownLatch(registeredTopics); connection.subscribe("homeassistant/+/+/" + ThingChannelConstants.testHomeAssistantThing.getId() + "/#", - (topic, payload) -> c.countDown()).get(1000, TimeUnit.MILLISECONDS); - assertTrue(c.await(1000, TimeUnit.MILLISECONDS), + (topic, payload) -> c.countDown()).get(2, TimeUnit.SECONDS); + assertTrue(c.await(2, TimeUnit.SECONDS), "Connection " + connection.getClientId() + " not retrieving all topics"); } @@ -183,8 +186,8 @@ public class HomeAssistantMQTTImplementationTest extends JavaOSGiTest { return null; }); - assertTrue(latch.await(4000, TimeUnit.MILLISECONDS)); - future.get(2000, TimeUnit.MILLISECONDS); + assertTrue(latch.await(4, TimeUnit.SECONDS)); + future.get(2, TimeUnit.SECONDS); // No failure expected and one discovered result assertNull(failure); diff --git a/itests/org.openhab.binding.mqtt.homie.tests/itest.bndrun b/itests/org.openhab.binding.mqtt.homie.tests/itest.bndrun index e3be5b0f16d..8d9ecadab7d 100644 --- a/itests/org.openhab.binding.mqtt.homie.tests/itest.bndrun +++ b/itests/org.openhab.binding.mqtt.homie.tests/itest.bndrun @@ -13,6 +13,10 @@ Fragment-Host: org.openhab.binding.mqtt.homie -runblacklist: \ bnd.identity;id='org.openhab.core.storage.json' +-runvm: \ + -Dio.netty.noUnsafe=true,\ + -Dmqttembeddedbroker.port=${mqttembeddedbroker.port} + # # done # @@ -89,4 +93,4 @@ Fragment-Host: org.openhab.binding.mqtt.homie org.opentest4j;version='[1.2.0,1.2.1)',\ org.reactivestreams.reactive-streams;version='[1.0.2,1.0.3)',\ moquette-broker;version='[0.13.0,0.13.1)' --runvm: -Dio.netty.noUnsafe=true + diff --git a/itests/org.openhab.binding.mqtt.homie.tests/pom.xml b/itests/org.openhab.binding.mqtt.homie.tests/pom.xml index 90f6b77ece5..d6965a09999 100644 --- a/itests/org.openhab.binding.mqtt.homie.tests/pom.xml +++ b/itests/org.openhab.binding.mqtt.homie.tests/pom.xml @@ -82,7 +82,6 @@ com.h2database h2-mvstore 1.4.199 - io.netty @@ -101,4 +100,27 @@ + + + + org.codehaus.mojo + build-helper-maven-plugin + + + reserve-network-port + + reserve-network-port + + process-resources + + + mqttembeddedbroker.port + + + + + + + + diff --git a/itests/org.openhab.binding.mqtt.homie.tests/src/main/java/org/openhab/binding/mqtt/EmbeddedBrokerTools.java b/itests/org.openhab.binding.mqtt.homie.tests/src/main/java/org/openhab/binding/mqtt/EmbeddedBrokerTools.java index ec69238753f..de7e511ade1 100644 --- a/itests/org.openhab.binding.mqtt.homie.tests/src/main/java/org/openhab/binding/mqtt/EmbeddedBrokerTools.java +++ b/itests/org.openhab.binding.mqtt.homie.tests/src/main/java/org/openhab/binding/mqtt/EmbeddedBrokerTools.java @@ -14,6 +14,9 @@ package org.openhab.binding.mqtt; import static org.junit.jupiter.api.Assertions.assertTrue; +import java.io.IOException; +import java.util.Dictionary; +import java.util.Hashtable; import java.util.concurrent.Semaphore; import java.util.concurrent.TimeUnit; @@ -25,26 +28,40 @@ import org.openhab.core.io.transport.mqtt.MqttConnectionState; import org.openhab.core.io.transport.mqtt.MqttService; import org.openhab.core.io.transport.mqtt.MqttServiceObserver; import org.openhab.io.mqttembeddedbroker.Constants; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; +import org.osgi.service.cm.Configuration; +import org.osgi.service.cm.ConfigurationAdmin; /** * A full implementation test, that starts the embedded MQTT broker and publishes a homeassistant MQTT discovery device * tree. * * @author David Graeff - Initial contribution + * @author Wouter Born - Support running MQTT itests in parallel by reconfiguring embedded broker port */ @NonNullByDefault public class EmbeddedBrokerTools { - private final Logger logger = LoggerFactory.getLogger(EmbeddedBrokerTools.class); - public @Nullable MqttBrokerConnection embeddedConnection = null; + + private static final int BROKER_PORT = Integer.getInteger("mqttembeddedbroker.port", 1883); + + private final ConfigurationAdmin configurationAdmin; + private final MqttService mqttService; + + public @Nullable MqttBrokerConnection embeddedConnection; + + public EmbeddedBrokerTools(ConfigurationAdmin configurationAdmin, MqttService mqttService) { + this.configurationAdmin = configurationAdmin; + this.mqttService = mqttService; + } /** * Request the embedded broker connection from the {@link MqttService} and wait for a connection to be established. * * @throws InterruptedException + * @throws IOException */ - public MqttBrokerConnection waitForConnection(MqttService mqttService) throws InterruptedException { + public MqttBrokerConnection waitForConnection() throws InterruptedException, IOException { + reconfigurePort(); + embeddedConnection = mqttService.getBrokerConnection(Constants.CLIENTID); if (embeddedConnection == null) { Semaphore semaphore = new Semaphore(1); @@ -64,14 +81,13 @@ public class EmbeddedBrokerTools { } }; mqttService.addBrokersListener(observer); - assertTrue(semaphore.tryAcquire(700, TimeUnit.MILLISECONDS), "Wait for embedded connection client failed"); + assertTrue(semaphore.tryAcquire(5, TimeUnit.SECONDS), "Wait for embedded connection client failed"); } MqttBrokerConnection embeddedConnection = this.embeddedConnection; if (embeddedConnection == null) { throw new IllegalStateException(); } - logger.warn("waitForConnection {}", embeddedConnection.connectionState()); Semaphore semaphore = new Semaphore(1); semaphore.acquire(); MqttConnectionObserver mqttConnectionObserver = (state, error) -> { @@ -83,8 +99,25 @@ public class EmbeddedBrokerTools { if (embeddedConnection.connectionState() == MqttConnectionState.CONNECTED) { semaphore.release(); } - assertTrue(semaphore.tryAcquire(500, TimeUnit.MILLISECONDS), "Connection " + embeddedConnection.getClientId() + assertTrue(semaphore.tryAcquire(5, TimeUnit.SECONDS), "Connection " + embeddedConnection.getClientId() + " failed. State: " + embeddedConnection.connectionState()); return embeddedConnection; } + + public void reconfigurePort() throws IOException { + Configuration configuration = configurationAdmin.getConfiguration(Constants.PID, null); + + Dictionary properties = configuration.getProperties(); + if (properties == null) { + properties = new Hashtable<>(); + } + + Integer currentPort = (Integer) properties.get(Constants.PORT); + if (currentPort == null || currentPort.intValue() != BROKER_PORT) { + properties.put(Constants.PORT, BROKER_PORT); + configuration.update(properties); + // Remove the connection to make sure the test waits for the new connection to become available + mqttService.removeBrokerConnection(Constants.CLIENTID); + } + } } diff --git a/itests/org.openhab.binding.mqtt.homie.tests/src/main/java/org/openhab/binding/mqtt/HomieImplementationTest.java b/itests/org.openhab.binding.mqtt.homie.tests/src/main/java/org/openhab/binding/mqtt/HomieImplementationTest.java index 776f8fb91f8..816ec3b7588 100644 --- a/itests/org.openhab.binding.mqtt.homie.tests/src/main/java/org/openhab/binding/mqtt/HomieImplementationTest.java +++ b/itests/org.openhab.binding.mqtt.homie.tests/src/main/java/org/openhab/binding/mqtt/HomieImplementationTest.java @@ -59,6 +59,7 @@ import org.openhab.core.library.types.DecimalType; import org.openhab.core.library.types.OnOffType; import org.openhab.core.test.java.JavaOSGiTest; import org.openhab.core.types.UnDefType; +import org.osgi.service.cm.ConfigurationAdmin; /** * A full implementation test, that starts the embedded MQTT broker and publishes a homie device tree. @@ -71,6 +72,7 @@ public class HomieImplementationTest extends JavaOSGiTest { private static final String DEVICE_ID = ThingChannelConstants.testHomieThing.getId(); private static final String DEVICE_TOPIC = BASE_TOPIC + "/" + DEVICE_ID; + private @NonNullByDefault({}) ConfigurationAdmin configurationAdmin; private @NonNullByDefault({}) MqttService mqttService; private @NonNullByDefault({}) MqttBrokerConnection embeddedConnection; private @NonNullByDefault({}) MqttBrokerConnection connection; @@ -99,15 +101,17 @@ public class HomieImplementationTest extends JavaOSGiTest { public void beforeEach() throws Exception { registerVolatileStorageService(); mocksCloseable = openMocks(this); + configurationAdmin = getService(ConfigurationAdmin.class); mqttService = getService(MqttService.class); - embeddedConnection = new EmbeddedBrokerTools().waitForConnection(mqttService); + // Wait for the EmbeddedBrokerService internal connection to be connected + embeddedConnection = new EmbeddedBrokerTools(configurationAdmin, mqttService).waitForConnection(); embeddedConnection.setQos(1); connection = new MqttBrokerConnection(embeddedConnection.getHost(), embeddedConnection.getPort(), embeddedConnection.isSecure(), "homie"); connection.setQos(1); - connection.start().get(500, TimeUnit.MILLISECONDS); + connection.start().get(5, TimeUnit.SECONDS); assertThat(connection.connectionState(), is(MqttConnectionState.CONNECTED)); // If the connection state changes in between -> fail connection.addConnectionObserver(failIfChange); @@ -146,7 +150,7 @@ public class HomieImplementationTest extends JavaOSGiTest { futures.add(publish(propertyTestTopic + "/$datatype", "boolean")); registeredTopics = futures.size(); - CompletableFuture.allOf(futures.toArray(new CompletableFuture[0])).get(1000, TimeUnit.MILLISECONDS); + CompletableFuture.allOf(futures.toArray(new CompletableFuture[0])).get(2, TimeUnit.SECONDS); scheduler = new ScheduledThreadPoolExecutor(6); } @@ -159,9 +163,11 @@ public class HomieImplementationTest extends JavaOSGiTest { public void afterEach() throws Exception { if (connection != null) { connection.removeConnectionObserver(failIfChange); - connection.stop().get(500, TimeUnit.MILLISECONDS); + connection.stop().get(2, TimeUnit.SECONDS); + } + if (scheduler != null) { + scheduler.shutdownNow(); } - scheduler.shutdownNow(); mocksCloseable.close(); } @@ -169,9 +175,8 @@ public class HomieImplementationTest extends JavaOSGiTest { public void retrieveAllTopics() throws InterruptedException, ExecutionException, TimeoutException { // four topics are not under /testnode ! CountDownLatch c = new CountDownLatch(registeredTopics - 4); - connection.subscribe(DEVICE_TOPIC + "/testnode/#", (topic, payload) -> c.countDown()).get(5000, - TimeUnit.MILLISECONDS); - assertTrue(c.await(5000, TimeUnit.MILLISECONDS), + connection.subscribe(DEVICE_TOPIC + "/testnode/#", (topic, payload) -> c.countDown()).get(5, TimeUnit.SECONDS); + assertTrue(c.await(5, TimeUnit.SECONDS), "Connection " + connection.getClientId() + " not retrieving all topics "); } diff --git a/itests/org.openhab.io.mqttembeddedbroker.tests/itest.bndrun b/itests/org.openhab.io.mqttembeddedbroker.tests/itest.bndrun index 82764488e87..e690bf692d5 100644 --- a/itests/org.openhab.io.mqttembeddedbroker.tests/itest.bndrun +++ b/itests/org.openhab.io.mqttembeddedbroker.tests/itest.bndrun @@ -10,6 +10,10 @@ Fragment-Host: org.openhab.io.mqttembeddedbroker -runblacklist: \ bnd.identity;id='org.openhab.core.storage.json' +-runvm: \ + -Dio.netty.noUnsafe=true,\ + -Dmqttembeddedbroker.port=${mqttembeddedbroker.port} + # # done # @@ -73,4 +77,3 @@ Fragment-Host: org.openhab.io.mqttembeddedbroker org.opentest4j;version='[1.2.0,1.2.1)',\ org.reactivestreams.reactive-streams;version='[1.0.2,1.0.3)',\ moquette-broker;version='[0.13.0,0.13.1)' --runvm: -Dio.netty.noUnsafe=true diff --git a/itests/org.openhab.io.mqttembeddedbroker.tests/pom.xml b/itests/org.openhab.io.mqttembeddedbroker.tests/pom.xml index ac22da3434c..3b7d7792d28 100644 --- a/itests/org.openhab.io.mqttembeddedbroker.tests/pom.xml +++ b/itests/org.openhab.io.mqttembeddedbroker.tests/pom.xml @@ -66,4 +66,27 @@ + + + + org.codehaus.mojo + build-helper-maven-plugin + + + reserve-network-port + + reserve-network-port + + process-resources + + + mqttembeddedbroker.port + + + + + + + + diff --git a/itests/org.openhab.io.mqttembeddedbroker.tests/src/main/java/org/openhab/io/mqttembeddedbroker/EmbeddedBrokerTools.java b/itests/org.openhab.io.mqttembeddedbroker.tests/src/main/java/org/openhab/io/mqttembeddedbroker/EmbeddedBrokerTools.java index cc41838a063..9ebdf503cb1 100644 --- a/itests/org.openhab.io.mqttembeddedbroker.tests/src/main/java/org/openhab/io/mqttembeddedbroker/EmbeddedBrokerTools.java +++ b/itests/org.openhab.io.mqttembeddedbroker.tests/src/main/java/org/openhab/io/mqttembeddedbroker/EmbeddedBrokerTools.java @@ -14,6 +14,9 @@ package org.openhab.io.mqttembeddedbroker; import static org.junit.jupiter.api.Assertions.assertTrue; +import java.io.IOException; +import java.util.Dictionary; +import java.util.Hashtable; import java.util.concurrent.Semaphore; import java.util.concurrent.TimeUnit; @@ -24,26 +27,40 @@ import org.openhab.core.io.transport.mqtt.MqttConnectionObserver; import org.openhab.core.io.transport.mqtt.MqttConnectionState; import org.openhab.core.io.transport.mqtt.MqttService; import org.openhab.core.io.transport.mqtt.MqttServiceObserver; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; +import org.osgi.service.cm.Configuration; +import org.osgi.service.cm.ConfigurationAdmin; /** * A full implementation test, that starts the embedded MQTT broker and publishes a homeassistant MQTT discovery device * tree. * * @author David Graeff - Initial contribution + * @author Wouter Born - Support running MQTT itests in parallel by reconfiguring embedded broker port */ @NonNullByDefault public class EmbeddedBrokerTools { - private final Logger logger = LoggerFactory.getLogger(EmbeddedBrokerTools.class); - public @Nullable MqttBrokerConnection embeddedConnection = null; + + private static final int BROKER_PORT = Integer.getInteger("mqttembeddedbroker.port", 1883); + + private final ConfigurationAdmin configurationAdmin; + private final MqttService mqttService; + + public @Nullable MqttBrokerConnection embeddedConnection; + + public EmbeddedBrokerTools(ConfigurationAdmin configurationAdmin, MqttService mqttService) { + this.configurationAdmin = configurationAdmin; + this.mqttService = mqttService; + } /** * Request the embedded broker connection from the {@link MqttService} and wait for a connection to be established. * * @throws InterruptedException + * @throws IOException */ - public MqttBrokerConnection waitForConnection(MqttService mqttService) throws InterruptedException { + public MqttBrokerConnection waitForConnection() throws InterruptedException, IOException { + reconfigurePort(); + embeddedConnection = mqttService.getBrokerConnection(Constants.CLIENTID); if (embeddedConnection == null) { Semaphore semaphore = new Semaphore(1); @@ -63,14 +80,13 @@ public class EmbeddedBrokerTools { } }; mqttService.addBrokersListener(observer); - assertTrue(semaphore.tryAcquire(700, TimeUnit.MILLISECONDS), "Wait for embedded connection client failed"); + assertTrue(semaphore.tryAcquire(5, TimeUnit.SECONDS), "Wait for embedded connection client failed"); } MqttBrokerConnection embeddedConnection = this.embeddedConnection; if (embeddedConnection == null) { throw new IllegalStateException(); } - logger.warn("waitForConnection {}", embeddedConnection.connectionState()); Semaphore semaphore = new Semaphore(1); semaphore.acquire(); MqttConnectionObserver mqttConnectionObserver = (state, error) -> { @@ -82,8 +98,25 @@ public class EmbeddedBrokerTools { if (embeddedConnection.connectionState() == MqttConnectionState.CONNECTED) { semaphore.release(); } - assertTrue(semaphore.tryAcquire(500, TimeUnit.MILLISECONDS), "Connection " + embeddedConnection.getClientId() + assertTrue(semaphore.tryAcquire(5, TimeUnit.SECONDS), "Connection " + embeddedConnection.getClientId() + " failed. State: " + embeddedConnection.connectionState()); return embeddedConnection; } + + public void reconfigurePort() throws IOException { + Configuration configuration = configurationAdmin.getConfiguration(Constants.PID, null); + + Dictionary properties = configuration.getProperties(); + if (properties == null) { + properties = new Hashtable<>(); + } + + Integer currentPort = (Integer) properties.get(Constants.PORT); + if (currentPort == null || currentPort.intValue() != BROKER_PORT) { + properties.put(Constants.PORT, BROKER_PORT); + configuration.update(properties); + // Remove the connection to make sure the test waits for the new connection to become available + mqttService.removeBrokerConnection(Constants.CLIENTID); + } + } } diff --git a/itests/org.openhab.io.mqttembeddedbroker.tests/src/main/java/org/openhab/io/mqttembeddedbroker/MoquetteTest.java b/itests/org.openhab.io.mqttembeddedbroker.tests/src/main/java/org/openhab/io/mqttembeddedbroker/MoquetteTest.java index f15a4245e11..a22a6947768 100644 --- a/itests/org.openhab.io.mqttembeddedbroker.tests/src/main/java/org/openhab/io/mqttembeddedbroker/MoquetteTest.java +++ b/itests/org.openhab.io.mqttembeddedbroker.tests/src/main/java/org/openhab/io/mqttembeddedbroker/MoquetteTest.java @@ -35,6 +35,7 @@ import org.openhab.core.io.transport.mqtt.MqttConnectionObserver; import org.openhab.core.io.transport.mqtt.MqttConnectionState; import org.openhab.core.io.transport.mqtt.MqttService; import org.openhab.core.test.java.JavaOSGiTest; +import org.osgi.service.cm.ConfigurationAdmin; /** * Moquette test @@ -47,6 +48,7 @@ public class MoquetteTest extends JavaOSGiTest { private @NonNullByDefault({}) AutoCloseable mocksCloseable; + private @NonNullByDefault({}) ConfigurationAdmin configurationAdmin; private @NonNullByDefault({}) MqttService mqttService; private @NonNullByDefault({}) MqttBrokerConnection embeddedConnection; private @NonNullByDefault({}) MqttBrokerConnection clientConnection; @@ -62,10 +64,11 @@ public class MoquetteTest extends JavaOSGiTest { public void beforeEach() throws Exception { registerVolatileStorageService(); mocksCloseable = openMocks(this); + configurationAdmin = getService(ConfigurationAdmin.class); mqttService = getService(MqttService.class); // Wait for the EmbeddedBrokerService internal connection to be connected - embeddedConnection = new EmbeddedBrokerTools().waitForConnection(mqttService); + embeddedConnection = new EmbeddedBrokerTools(configurationAdmin, mqttService).waitForConnection(); embeddedConnection.setQos(1); clientConnection = new MqttBrokerConnection(embeddedConnection.getHost(), embeddedConnection.getPort(),