[rest] stream json without starting a new thread (#4136)

Signed-off-by: Jörg Sautter <joerg.sautter@gmx.net>
This commit is contained in:
joerg1985 2024-03-10 09:46:39 +01:00 committed by GitHub
parent 942929973d
commit 094e4a6e04
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
5 changed files with 74 additions and 49 deletions

View File

@ -17,6 +17,7 @@ import static org.hamcrest.MatcherAssert.assertThat;
import static org.mockito.ArgumentMatchers.*;
import static org.mockito.Mockito.when;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.InputStream;
import java.net.URI;
@ -24,6 +25,7 @@ import java.nio.charset.StandardCharsets;
import java.util.List;
import javax.ws.rs.core.Response;
import javax.ws.rs.core.StreamingOutput;
import org.eclipse.jdt.annotation.NonNullByDefault;
import org.junit.jupiter.api.BeforeEach;
@ -79,7 +81,7 @@ public class ConfigDescriptionResourceTest {
public void shouldReturnAllConfigDescriptions() throws IOException {
Response response = resource.getAll(null, null);
assertThat(response.getStatus(), is(200));
assertThat(new String(((InputStream) response.getEntity()).readAllBytes(), StandardCharsets.UTF_8), is(
assertThat(toString(response.getEntity()), is(
"[{\"uri\":\"system:i18n\",\"parameters\":[{\"default\":\"test\",\"name\":\"name\",\"required\":false,\"type\":\"TEXT\",\"readOnly\":false,\"multiple\":false,\"advanced\":false,\"verify\":false,\"limitToOptions\":true,\"options\":[],\"filterCriteria\":[]}],\"parameterGroups\":[]},{\"uri\":\"system:ephemeris\",\"parameters\":[{\"name\":\"country\",\"required\":false,\"type\":\"TEXT\",\"readOnly\":false,\"multiple\":false,\"advanced\":false,\"verify\":false,\"limitToOptions\":true,\"options\":[],\"filterCriteria\":[]}],\"parameterGroups\":[]}]"));
}
@ -87,7 +89,7 @@ public class ConfigDescriptionResourceTest {
public void shouldReturnAConfigDescription() throws IOException {
Response response = resource.getByURI(null, CONFIG_DESCRIPTION_SYSTEM_I18N_URI);
assertThat(response.getStatus(), is(200));
assertThat(new String(((InputStream) response.getEntity()).readAllBytes(), StandardCharsets.UTF_8), is(
assertThat(toString(response.getEntity()), is(
"{\"uri\":\"system:i18n\",\"parameters\":[{\"default\":\"test\",\"name\":\"name\",\"required\":false,\"type\":\"TEXT\",\"readOnly\":false,\"multiple\":false,\"advanced\":false,\"verify\":false,\"limitToOptions\":true,\"options\":[],\"filterCriteria\":[]}],\"parameterGroups\":[]}"));
}
@ -96,4 +98,17 @@ public class ConfigDescriptionResourceTest {
Response response = resource.getByURI(null, "uri:invalid");
assertThat(response.getStatus(), is(404));
}
public String toString(Object entity) throws IOException {
byte[] bytes;
if (entity instanceof StreamingOutput streaming) {
try (ByteArrayOutputStream buffer = new ByteArrayOutputStream()) {
streaming.write(buffer);
bytes = buffer.toByteArray();
}
} else {
bytes = ((InputStream) entity).readAllBytes();
}
return new String(bytes, StandardCharsets.UTF_8);
}
}

View File

@ -23,6 +23,7 @@ import javax.ws.rs.WebApplicationException;
import javax.ws.rs.core.MediaType;
import javax.ws.rs.core.Response;
import javax.ws.rs.core.Response.ResponseBuilder;
import javax.ws.rs.core.StreamingOutput;
import org.openhab.core.library.types.DateTimeType;
import org.slf4j.Logger;
@ -31,7 +32,6 @@ import org.slf4j.LoggerFactory;
import com.google.gson.Gson;
import com.google.gson.GsonBuilder;
import com.google.gson.JsonElement;
import com.google.gson.JsonIOException;
import com.google.gson.JsonObject;
import com.google.gson.stream.JsonWriter;
@ -40,6 +40,7 @@ import com.google.gson.stream.JsonWriter;
*
* @author Joerg Plewe - Initial contribution
* @author Henning Treu - Provide streaming capabilities
* @author Jörg Sautter - Improve streaming capabilities
*/
public class JSONResponse {
@ -152,33 +153,15 @@ public class JSONResponse {
return rp.build();
}
// The PipedOutputStream will only be closed by the writing thread
// since closing it during this method call would be too early.
// The receiver of the response will read from the pipe after this method returns.
PipedOutputStream out = new PipedOutputStream();
rp.entity((StreamingOutput) (target) -> {
// target must not be closed, see javadoc of javax.ws.rs.ext.MessageBodyWriter
JsonWriter jsonWriter = new JsonWriter(
new BufferedWriter(new OutputStreamWriter(target, StandardCharsets.UTF_8)));
try {
// we will not actively close the PipedInputStream since it is read by the receiving end
// and will be GC'ed once the response is consumed.
PipedJSONInputStream in = new PipedJSONInputStream(out);
rp.entity(in);
} catch (IOException e) {
throw new IllegalStateException(e);
}
Thread writerThread = new Thread(() -> {
try (JsonWriter jsonWriter = new JsonWriter(
new BufferedWriter(new OutputStreamWriter(out, StandardCharsets.UTF_8)))) {
gson.toJson(entity, entity.getClass(), jsonWriter);
jsonWriter.flush();
} catch (IOException | JsonIOException e) {
logger.debug("Error streaming JSON through PipedInputStream / PipedOutputStream.", e);
}
gson.toJson(entity, entity.getClass(), jsonWriter);
jsonWriter.flush();
});
writerThread.setDaemon(true); // daemonize thread to permit the JVM shutdown even if we stream JSON.
writerThread.start();
return rp.build();
}

View File

@ -17,8 +17,8 @@ import static org.hamcrest.MatcherAssert.assertThat;
import static org.hamcrest.object.IsCompatibleType.typeCompatibleWith;
import static org.junit.jupiter.api.Assertions.assertTrue;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.InputStream;
import java.math.BigDecimal;
import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
@ -27,6 +27,7 @@ import java.util.List;
import javax.ws.rs.core.MediaType;
import javax.ws.rs.core.Response;
import javax.ws.rs.core.Response.Status;
import javax.ws.rs.core.StreamingOutput;
import org.eclipse.jdt.annotation.NonNullByDefault;
import org.junit.jupiter.api.Test;
@ -92,12 +93,11 @@ public class JSONResponseTest {
assertThat(response.getMediaType(), is(MediaType.APPLICATION_JSON_TYPE));
Object entity = response.getEntity();
assertThat(entity.getClass(), is(typeCompatibleWith(InputStream.class)));
assertThat(entity.getClass(), is(typeCompatibleWith(StreamingOutput.class)));
try (InputStream entityInStream = (InputStream) entity) {
byte[] entityValue = new byte[ENTITY_JSON_VALUE.length()];
entityInStream.read(entityValue);
assertThat(new String(entityValue), is(ENTITY_JSON_VALUE));
try (ByteArrayOutputStream buffer = new ByteArrayOutputStream()) {
((StreamingOutput) entity).write(buffer);
assertThat(new String(buffer.toByteArray(), StandardCharsets.UTF_8), is(ENTITY_JSON_VALUE));
}
}
@ -120,10 +120,11 @@ public class JSONResponseTest {
assertThat(response.getMediaType(), is(MediaType.APPLICATION_JSON_TYPE));
Object entity = response.getEntity();
assertThat(entity.getClass(), is(typeCompatibleWith(InputStream.class)));
assertThat(entity.getClass(), is(typeCompatibleWith(StreamingOutput.class)));
try (InputStream entityInStream = (InputStream) entity) {
String largeEntityJSON = new String(entityInStream.readAllBytes(), StandardCharsets.UTF_8);
try (ByteArrayOutputStream buffer = new ByteArrayOutputStream()) {
((StreamingOutput) entity).write(buffer);
String largeEntityJSON = new String(buffer.toByteArray(), StandardCharsets.UTF_8);
assertThat(largeEntityJSON, is(notNullValue()));
assertTrue(largeEntityJSON.startsWith("{"));
assertTrue(largeEntityJSON.endsWith("}"));

View File

@ -20,6 +20,7 @@ import static org.junit.jupiter.api.Assertions.*;
import static org.mockito.ArgumentMatchers.*;
import static org.mockito.Mockito.*;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.InputStream;
import java.net.URI;
@ -33,6 +34,7 @@ import javax.ws.rs.core.HttpHeaders;
import javax.ws.rs.core.Request;
import javax.ws.rs.core.Response;
import javax.ws.rs.core.Response.Status;
import javax.ws.rs.core.StreamingOutput;
import javax.ws.rs.core.UriBuilder;
import javax.ws.rs.core.UriInfo;
@ -204,8 +206,7 @@ public class ItemResourceOSGiTest extends JavaOSGiTest {
Response response = itemResource.getItems(uriInfoMock, httpHeadersMock, request, null, null, "MyTag", null,
false, "type,name", false);
JsonElement result = JsonParser
.parseString(new String(((InputStream) response.getEntity()).readAllBytes(), StandardCharsets.UTF_8));
JsonElement result = JsonParser.parseString(toString(response.getEntity()));
JsonElement expected = JsonParser.parseString("[{type: \"Switch\", name: \"Switch\"}]");
assertEquals(expected, result);
}
@ -227,12 +228,12 @@ public class ItemResourceOSGiTest extends JavaOSGiTest {
}
private List<String> readItemNamesFromResponse(Response response) throws IOException {
String jsonResponse = new String(((InputStream) response.getEntity()).readAllBytes(), StandardCharsets.UTF_8);
String jsonResponse = toString(response.getEntity());
return JsonPath.read(jsonResponse, "$..name");
}
private List<String> readItemLabelsFromResponse(Response response) throws IOException, TransformationException {
String jsonResponse = new String(((InputStream) response.getEntity()).readAllBytes(), StandardCharsets.UTF_8);
String jsonResponse = toString(response.getEntity());
return JsonPath.read(jsonResponse, "$..label");
}
@ -256,7 +257,7 @@ public class ItemResourceOSGiTest extends JavaOSGiTest {
items = itemList.toArray(items);
Response response = itemResource.createOrUpdateItems(items);
String jsonResponse = new String(((InputStream) response.getEntity()).readAllBytes(), StandardCharsets.UTF_8);
String jsonResponse = toString(response.getEntity());
List<String> statusCodes = JsonPath.read(jsonResponse, "$..status");
// expect 2x created
@ -274,7 +275,7 @@ public class ItemResourceOSGiTest extends JavaOSGiTest {
items = itemList.toArray(items);
response = itemResource.createOrUpdateItems(items);
jsonResponse = new String(((InputStream) response.getEntity()).readAllBytes(), StandardCharsets.UTF_8);
jsonResponse = toString(response.getEntity());
statusCodes = JsonPath.read(jsonResponse, "$..status");
// expect error and updated
@ -380,4 +381,17 @@ public class ItemResourceOSGiTest extends JavaOSGiTest {
assertThat(response.getStatus(), is(404));
}
}
public String toString(Object entity) throws IOException {
byte[] bytes;
if (entity instanceof StreamingOutput streaming) {
try (ByteArrayOutputStream buffer = new ByteArrayOutputStream()) {
streaming.write(buffer);
bytes = buffer.toByteArray();
}
} else {
bytes = ((InputStream) entity).readAllBytes();
}
return new String(bytes, StandardCharsets.UTF_8);
}
}

View File

@ -19,6 +19,7 @@ import static org.junit.jupiter.api.Assertions.*;
import static org.mockito.ArgumentMatchers.*;
import static org.mockito.Mockito.when;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.InputStream;
import java.net.URI;
@ -27,6 +28,7 @@ import java.util.List;
import javax.ws.rs.core.HttpHeaders;
import javax.ws.rs.core.Response;
import javax.ws.rs.core.StreamingOutput;
import javax.ws.rs.core.UriBuilder;
import javax.ws.rs.core.UriInfo;
@ -148,29 +150,39 @@ public class ItemChannelLinkResourceOSGiTest extends JavaOSGiTest {
public void shouldIncludeEditableFields() throws IOException, JsonSyntaxException {
managedItemChannelLinkProvider.add(link1);
Response response = itemChannelLinkResource.getLink(ITEM_NAME1, CHANNEL_UID1);
JsonElement result = JsonParser
.parseString(new String(((InputStream) response.getEntity()).readAllBytes(), StandardCharsets.UTF_8));
JsonElement result = JsonParser.parseString(toString(response.getEntity()));
JsonElement expected = JsonParser.parseString("{channelUID:\"" + CHANNEL_UID1
+ "\", configuration:{}, editable:true, itemName:\"" + ITEM_NAME1 + "\"}");
assertEquals(expected, result);
response = itemChannelLinkResource.getAll(CHANNEL_UID1, ITEM_NAME1);
result = JsonParser
.parseString(new String(((InputStream) response.getEntity()).readAllBytes(), StandardCharsets.UTF_8));
result = JsonParser.parseString(toString(response.getEntity()));
expected = JsonParser.parseString("[{channelUID:\"" + CHANNEL_UID1
+ "\", configuration:{}, editable:true, itemName:\"" + ITEM_NAME1 + "\"}]");
assertEquals(expected, result);
response = itemChannelLinkResource.getLink(ITEM_NAME2, CHANNEL_UID2);
result = JsonParser
.parseString(new String(((InputStream) response.getEntity()).readAllBytes(), StandardCharsets.UTF_8));
result = JsonParser.parseString(toString(response.getEntity()));
expected = JsonParser.parseString("{channelUID:\"" + CHANNEL_UID2
+ "\", configuration:{}, editable:false, itemName:\"" + ITEM_NAME2 + "\", configuration:{}}");
assertEquals(expected, result);
}
private List<String> readItemNamesFromResponse(Response response) throws IOException {
String jsonResponse = new String(((InputStream) response.getEntity()).readAllBytes(), StandardCharsets.UTF_8);
String jsonResponse = toString(response.getEntity());
return JsonPath.read(jsonResponse, "$..itemName");
}
public String toString(Object entity) throws IOException {
byte[] bytes;
if (entity instanceof StreamingOutput streaming) {
try (ByteArrayOutputStream buffer = new ByteArrayOutputStream()) {
streaming.write(buffer);
bytes = buffer.toByteArray();
}
} else {
bytes = ((InputStream) entity).readAllBytes();
}
return new String(bytes, StandardCharsets.UTF_8);
}
}