From 094e4a6e040795d58463c4a0b5e74593543aab22 Mon Sep 17 00:00:00 2001 From: joerg1985 <16140691+joerg1985@users.noreply.github.com> Date: Sun, 10 Mar 2024 09:46:39 +0100 Subject: [PATCH] [rest] stream json without starting a new thread (#4136) MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Signed-off-by: Jörg Sautter --- .../config/ConfigDescriptionResourceTest.java | 19 +++++++++-- .../openhab/core/io/rest/JSONResponse.java | 33 +++++-------------- .../core/io/rest/JSONResponseTest.java | 19 ++++++----- .../internal/item/ItemResourceOSGiTest.java | 26 +++++++++++---- .../link/ItemChannelLinkResourceOSGiTest.java | 26 +++++++++++---- 5 files changed, 74 insertions(+), 49 deletions(-) diff --git a/bundles/org.openhab.core.io.rest.core/src/test/java/org/openhab/core/io/rest/core/internal/config/ConfigDescriptionResourceTest.java b/bundles/org.openhab.core.io.rest.core/src/test/java/org/openhab/core/io/rest/core/internal/config/ConfigDescriptionResourceTest.java index 3e3bc0447..1fa018aab 100644 --- a/bundles/org.openhab.core.io.rest.core/src/test/java/org/openhab/core/io/rest/core/internal/config/ConfigDescriptionResourceTest.java +++ b/bundles/org.openhab.core.io.rest.core/src/test/java/org/openhab/core/io/rest/core/internal/config/ConfigDescriptionResourceTest.java @@ -17,6 +17,7 @@ import static org.hamcrest.MatcherAssert.assertThat; import static org.mockito.ArgumentMatchers.*; import static org.mockito.Mockito.when; +import java.io.ByteArrayOutputStream; import java.io.IOException; import java.io.InputStream; import java.net.URI; @@ -24,6 +25,7 @@ import java.nio.charset.StandardCharsets; import java.util.List; import javax.ws.rs.core.Response; +import javax.ws.rs.core.StreamingOutput; import org.eclipse.jdt.annotation.NonNullByDefault; import org.junit.jupiter.api.BeforeEach; @@ -79,7 +81,7 @@ public class ConfigDescriptionResourceTest { public void shouldReturnAllConfigDescriptions() throws IOException { Response response = resource.getAll(null, null); assertThat(response.getStatus(), is(200)); - assertThat(new String(((InputStream) response.getEntity()).readAllBytes(), StandardCharsets.UTF_8), is( + assertThat(toString(response.getEntity()), is( "[{\"uri\":\"system:i18n\",\"parameters\":[{\"default\":\"test\",\"name\":\"name\",\"required\":false,\"type\":\"TEXT\",\"readOnly\":false,\"multiple\":false,\"advanced\":false,\"verify\":false,\"limitToOptions\":true,\"options\":[],\"filterCriteria\":[]}],\"parameterGroups\":[]},{\"uri\":\"system:ephemeris\",\"parameters\":[{\"name\":\"country\",\"required\":false,\"type\":\"TEXT\",\"readOnly\":false,\"multiple\":false,\"advanced\":false,\"verify\":false,\"limitToOptions\":true,\"options\":[],\"filterCriteria\":[]}],\"parameterGroups\":[]}]")); } @@ -87,7 +89,7 @@ public class ConfigDescriptionResourceTest { public void shouldReturnAConfigDescription() throws IOException { Response response = resource.getByURI(null, CONFIG_DESCRIPTION_SYSTEM_I18N_URI); assertThat(response.getStatus(), is(200)); - assertThat(new String(((InputStream) response.getEntity()).readAllBytes(), StandardCharsets.UTF_8), is( + assertThat(toString(response.getEntity()), is( "{\"uri\":\"system:i18n\",\"parameters\":[{\"default\":\"test\",\"name\":\"name\",\"required\":false,\"type\":\"TEXT\",\"readOnly\":false,\"multiple\":false,\"advanced\":false,\"verify\":false,\"limitToOptions\":true,\"options\":[],\"filterCriteria\":[]}],\"parameterGroups\":[]}")); } @@ -96,4 +98,17 @@ public class ConfigDescriptionResourceTest { Response response = resource.getByURI(null, "uri:invalid"); assertThat(response.getStatus(), is(404)); } + + public String toString(Object entity) throws IOException { + byte[] bytes; + if (entity instanceof StreamingOutput streaming) { + try (ByteArrayOutputStream buffer = new ByteArrayOutputStream()) { + streaming.write(buffer); + bytes = buffer.toByteArray(); + } + } else { + bytes = ((InputStream) entity).readAllBytes(); + } + return new String(bytes, StandardCharsets.UTF_8); + } } diff --git a/bundles/org.openhab.core.io.rest/src/main/java/org/openhab/core/io/rest/JSONResponse.java b/bundles/org.openhab.core.io.rest/src/main/java/org/openhab/core/io/rest/JSONResponse.java index 9032b2150..acf74f0ef 100644 --- a/bundles/org.openhab.core.io.rest/src/main/java/org/openhab/core/io/rest/JSONResponse.java +++ b/bundles/org.openhab.core.io.rest/src/main/java/org/openhab/core/io/rest/JSONResponse.java @@ -23,6 +23,7 @@ import javax.ws.rs.WebApplicationException; import javax.ws.rs.core.MediaType; import javax.ws.rs.core.Response; import javax.ws.rs.core.Response.ResponseBuilder; +import javax.ws.rs.core.StreamingOutput; import org.openhab.core.library.types.DateTimeType; import org.slf4j.Logger; @@ -31,7 +32,6 @@ import org.slf4j.LoggerFactory; import com.google.gson.Gson; import com.google.gson.GsonBuilder; import com.google.gson.JsonElement; -import com.google.gson.JsonIOException; import com.google.gson.JsonObject; import com.google.gson.stream.JsonWriter; @@ -40,6 +40,7 @@ import com.google.gson.stream.JsonWriter; * * @author Joerg Plewe - Initial contribution * @author Henning Treu - Provide streaming capabilities + * @author Jörg Sautter - Improve streaming capabilities */ public class JSONResponse { @@ -152,33 +153,15 @@ public class JSONResponse { return rp.build(); } - // The PipedOutputStream will only be closed by the writing thread - // since closing it during this method call would be too early. - // The receiver of the response will read from the pipe after this method returns. - PipedOutputStream out = new PipedOutputStream(); + rp.entity((StreamingOutput) (target) -> { + // target must not be closed, see javadoc of javax.ws.rs.ext.MessageBodyWriter + JsonWriter jsonWriter = new JsonWriter( + new BufferedWriter(new OutputStreamWriter(target, StandardCharsets.UTF_8))); - try { - // we will not actively close the PipedInputStream since it is read by the receiving end - // and will be GC'ed once the response is consumed. - PipedJSONInputStream in = new PipedJSONInputStream(out); - rp.entity(in); - } catch (IOException e) { - throw new IllegalStateException(e); - } - - Thread writerThread = new Thread(() -> { - try (JsonWriter jsonWriter = new JsonWriter( - new BufferedWriter(new OutputStreamWriter(out, StandardCharsets.UTF_8)))) { - gson.toJson(entity, entity.getClass(), jsonWriter); - jsonWriter.flush(); - } catch (IOException | JsonIOException e) { - logger.debug("Error streaming JSON through PipedInputStream / PipedOutputStream.", e); - } + gson.toJson(entity, entity.getClass(), jsonWriter); + jsonWriter.flush(); }); - writerThread.setDaemon(true); // daemonize thread to permit the JVM shutdown even if we stream JSON. - writerThread.start(); - return rp.build(); } diff --git a/bundles/org.openhab.core.io.rest/src/test/java/org/openhab/core/io/rest/JSONResponseTest.java b/bundles/org.openhab.core.io.rest/src/test/java/org/openhab/core/io/rest/JSONResponseTest.java index 329b2af83..ad851227e 100644 --- a/bundles/org.openhab.core.io.rest/src/test/java/org/openhab/core/io/rest/JSONResponseTest.java +++ b/bundles/org.openhab.core.io.rest/src/test/java/org/openhab/core/io/rest/JSONResponseTest.java @@ -17,8 +17,8 @@ import static org.hamcrest.MatcherAssert.assertThat; import static org.hamcrest.object.IsCompatibleType.typeCompatibleWith; import static org.junit.jupiter.api.Assertions.assertTrue; +import java.io.ByteArrayOutputStream; import java.io.IOException; -import java.io.InputStream; import java.math.BigDecimal; import java.nio.charset.StandardCharsets; import java.util.ArrayList; @@ -27,6 +27,7 @@ import java.util.List; import javax.ws.rs.core.MediaType; import javax.ws.rs.core.Response; import javax.ws.rs.core.Response.Status; +import javax.ws.rs.core.StreamingOutput; import org.eclipse.jdt.annotation.NonNullByDefault; import org.junit.jupiter.api.Test; @@ -92,12 +93,11 @@ public class JSONResponseTest { assertThat(response.getMediaType(), is(MediaType.APPLICATION_JSON_TYPE)); Object entity = response.getEntity(); - assertThat(entity.getClass(), is(typeCompatibleWith(InputStream.class))); + assertThat(entity.getClass(), is(typeCompatibleWith(StreamingOutput.class))); - try (InputStream entityInStream = (InputStream) entity) { - byte[] entityValue = new byte[ENTITY_JSON_VALUE.length()]; - entityInStream.read(entityValue); - assertThat(new String(entityValue), is(ENTITY_JSON_VALUE)); + try (ByteArrayOutputStream buffer = new ByteArrayOutputStream()) { + ((StreamingOutput) entity).write(buffer); + assertThat(new String(buffer.toByteArray(), StandardCharsets.UTF_8), is(ENTITY_JSON_VALUE)); } } @@ -120,10 +120,11 @@ public class JSONResponseTest { assertThat(response.getMediaType(), is(MediaType.APPLICATION_JSON_TYPE)); Object entity = response.getEntity(); - assertThat(entity.getClass(), is(typeCompatibleWith(InputStream.class))); + assertThat(entity.getClass(), is(typeCompatibleWith(StreamingOutput.class))); - try (InputStream entityInStream = (InputStream) entity) { - String largeEntityJSON = new String(entityInStream.readAllBytes(), StandardCharsets.UTF_8); + try (ByteArrayOutputStream buffer = new ByteArrayOutputStream()) { + ((StreamingOutput) entity).write(buffer); + String largeEntityJSON = new String(buffer.toByteArray(), StandardCharsets.UTF_8); assertThat(largeEntityJSON, is(notNullValue())); assertTrue(largeEntityJSON.startsWith("{")); assertTrue(largeEntityJSON.endsWith("}")); diff --git a/itests/org.openhab.core.io.rest.core.tests/src/main/java/org/openhab/core/io/rest/core/internal/item/ItemResourceOSGiTest.java b/itests/org.openhab.core.io.rest.core.tests/src/main/java/org/openhab/core/io/rest/core/internal/item/ItemResourceOSGiTest.java index bb1afad1e..f897c1474 100644 --- a/itests/org.openhab.core.io.rest.core.tests/src/main/java/org/openhab/core/io/rest/core/internal/item/ItemResourceOSGiTest.java +++ b/itests/org.openhab.core.io.rest.core.tests/src/main/java/org/openhab/core/io/rest/core/internal/item/ItemResourceOSGiTest.java @@ -20,6 +20,7 @@ import static org.junit.jupiter.api.Assertions.*; import static org.mockito.ArgumentMatchers.*; import static org.mockito.Mockito.*; +import java.io.ByteArrayOutputStream; import java.io.IOException; import java.io.InputStream; import java.net.URI; @@ -33,6 +34,7 @@ import javax.ws.rs.core.HttpHeaders; import javax.ws.rs.core.Request; import javax.ws.rs.core.Response; import javax.ws.rs.core.Response.Status; +import javax.ws.rs.core.StreamingOutput; import javax.ws.rs.core.UriBuilder; import javax.ws.rs.core.UriInfo; @@ -204,8 +206,7 @@ public class ItemResourceOSGiTest extends JavaOSGiTest { Response response = itemResource.getItems(uriInfoMock, httpHeadersMock, request, null, null, "MyTag", null, false, "type,name", false); - JsonElement result = JsonParser - .parseString(new String(((InputStream) response.getEntity()).readAllBytes(), StandardCharsets.UTF_8)); + JsonElement result = JsonParser.parseString(toString(response.getEntity())); JsonElement expected = JsonParser.parseString("[{type: \"Switch\", name: \"Switch\"}]"); assertEquals(expected, result); } @@ -227,12 +228,12 @@ public class ItemResourceOSGiTest extends JavaOSGiTest { } private List readItemNamesFromResponse(Response response) throws IOException { - String jsonResponse = new String(((InputStream) response.getEntity()).readAllBytes(), StandardCharsets.UTF_8); + String jsonResponse = toString(response.getEntity()); return JsonPath.read(jsonResponse, "$..name"); } private List readItemLabelsFromResponse(Response response) throws IOException, TransformationException { - String jsonResponse = new String(((InputStream) response.getEntity()).readAllBytes(), StandardCharsets.UTF_8); + String jsonResponse = toString(response.getEntity()); return JsonPath.read(jsonResponse, "$..label"); } @@ -256,7 +257,7 @@ public class ItemResourceOSGiTest extends JavaOSGiTest { items = itemList.toArray(items); Response response = itemResource.createOrUpdateItems(items); - String jsonResponse = new String(((InputStream) response.getEntity()).readAllBytes(), StandardCharsets.UTF_8); + String jsonResponse = toString(response.getEntity()); List statusCodes = JsonPath.read(jsonResponse, "$..status"); // expect 2x created @@ -274,7 +275,7 @@ public class ItemResourceOSGiTest extends JavaOSGiTest { items = itemList.toArray(items); response = itemResource.createOrUpdateItems(items); - jsonResponse = new String(((InputStream) response.getEntity()).readAllBytes(), StandardCharsets.UTF_8); + jsonResponse = toString(response.getEntity()); statusCodes = JsonPath.read(jsonResponse, "$..status"); // expect error and updated @@ -380,4 +381,17 @@ public class ItemResourceOSGiTest extends JavaOSGiTest { assertThat(response.getStatus(), is(404)); } } + + public String toString(Object entity) throws IOException { + byte[] bytes; + if (entity instanceof StreamingOutput streaming) { + try (ByteArrayOutputStream buffer = new ByteArrayOutputStream()) { + streaming.write(buffer); + bytes = buffer.toByteArray(); + } + } else { + bytes = ((InputStream) entity).readAllBytes(); + } + return new String(bytes, StandardCharsets.UTF_8); + } } diff --git a/itests/org.openhab.core.io.rest.core.tests/src/main/java/org/openhab/core/io/rest/core/internal/link/ItemChannelLinkResourceOSGiTest.java b/itests/org.openhab.core.io.rest.core.tests/src/main/java/org/openhab/core/io/rest/core/internal/link/ItemChannelLinkResourceOSGiTest.java index 012cd6087..e8aaffef9 100644 --- a/itests/org.openhab.core.io.rest.core.tests/src/main/java/org/openhab/core/io/rest/core/internal/link/ItemChannelLinkResourceOSGiTest.java +++ b/itests/org.openhab.core.io.rest.core.tests/src/main/java/org/openhab/core/io/rest/core/internal/link/ItemChannelLinkResourceOSGiTest.java @@ -19,6 +19,7 @@ import static org.junit.jupiter.api.Assertions.*; import static org.mockito.ArgumentMatchers.*; import static org.mockito.Mockito.when; +import java.io.ByteArrayOutputStream; import java.io.IOException; import java.io.InputStream; import java.net.URI; @@ -27,6 +28,7 @@ import java.util.List; import javax.ws.rs.core.HttpHeaders; import javax.ws.rs.core.Response; +import javax.ws.rs.core.StreamingOutput; import javax.ws.rs.core.UriBuilder; import javax.ws.rs.core.UriInfo; @@ -148,29 +150,39 @@ public class ItemChannelLinkResourceOSGiTest extends JavaOSGiTest { public void shouldIncludeEditableFields() throws IOException, JsonSyntaxException { managedItemChannelLinkProvider.add(link1); Response response = itemChannelLinkResource.getLink(ITEM_NAME1, CHANNEL_UID1); - JsonElement result = JsonParser - .parseString(new String(((InputStream) response.getEntity()).readAllBytes(), StandardCharsets.UTF_8)); + JsonElement result = JsonParser.parseString(toString(response.getEntity())); JsonElement expected = JsonParser.parseString("{channelUID:\"" + CHANNEL_UID1 + "\", configuration:{}, editable:true, itemName:\"" + ITEM_NAME1 + "\"}"); assertEquals(expected, result); response = itemChannelLinkResource.getAll(CHANNEL_UID1, ITEM_NAME1); - result = JsonParser - .parseString(new String(((InputStream) response.getEntity()).readAllBytes(), StandardCharsets.UTF_8)); + result = JsonParser.parseString(toString(response.getEntity())); expected = JsonParser.parseString("[{channelUID:\"" + CHANNEL_UID1 + "\", configuration:{}, editable:true, itemName:\"" + ITEM_NAME1 + "\"}]"); assertEquals(expected, result); response = itemChannelLinkResource.getLink(ITEM_NAME2, CHANNEL_UID2); - result = JsonParser - .parseString(new String(((InputStream) response.getEntity()).readAllBytes(), StandardCharsets.UTF_8)); + result = JsonParser.parseString(toString(response.getEntity())); expected = JsonParser.parseString("{channelUID:\"" + CHANNEL_UID2 + "\", configuration:{}, editable:false, itemName:\"" + ITEM_NAME2 + "\", configuration:{}}"); assertEquals(expected, result); } private List readItemNamesFromResponse(Response response) throws IOException { - String jsonResponse = new String(((InputStream) response.getEntity()).readAllBytes(), StandardCharsets.UTF_8); + String jsonResponse = toString(response.getEntity()); return JsonPath.read(jsonResponse, "$..itemName"); } + + public String toString(Object entity) throws IOException { + byte[] bytes; + if (entity instanceof StreamingOutput streaming) { + try (ByteArrayOutputStream buffer = new ByteArrayOutputStream()) { + streaming.write(buffer); + bytes = buffer.toByteArray(); + } + } else { + bytes = ((InputStream) entity).readAllBytes(); + } + return new String(bytes, StandardCharsets.UTF_8); + } }