From e2ba4868a8b50f3b74665db589efcd5024cc4b0a Mon Sep 17 00:00:00 2001 From: Daniel Kec Date: Wed, 10 Jun 2020 10:59:57 +0200 Subject: [PATCH] SSE Flow.Subscriber injectable as event sink * Java Flow api used only by JerseyEventSink in java 11+ * Reactive streams tck tests for JerseyEventSink * Cancel on close propagation * Remove tests contradicting reactive streams spec https://github.com/reactive-streams/reactive-streams-jvm#2.13 Signed-off-by: Daniel Kec --- .../jersey/helidon/connector/sse/SseTest.java | 14 +- .../internal/jsr166/JerseyFlowSubscriber.java | 34 ++ .../internal/jsr166/JerseyFlowSubscriber.java | 20 ++ .../jersey/server/ChunkedOutput.java | 9 + .../server/model/IntrospectionModeller.java | 6 +- .../server/model/ResourceMethodValidator.java | 9 +- .../JavaResourceMethodDispatcherProvider.java | 4 +- .../model/internal/SseTypeResolver.java | 49 +++ .../media/sse/internal/JerseyEventSink.java | 108 ++++-- .../SseEventSinkValueParamProvider.java | 6 +- .../jersey/media/sse/localization.properties | 3 +- .../sse/internal/JerseyEventSinkTest.java | 40 +-- tests/integration/pom.xml | 1 + tests/integration/reactive-streams/pom.xml | 48 +++ .../integration/reactive-streams/sse/pom.xml | 104 ++++++ ...seyEventSinkBlackBoxSubscriberTckTest.java | 41 +++ ...seyEventSinkWhiteBoxSubscriberTckTest.java | 184 ++++++++++ .../sse/internal/JerseyFlowAdapters.java | 73 ++++ .../media/sse/internal/SseSubscriberTest.java | 313 ++++++++++++++++++ .../reactive-streams/sse/tck-suite.xml | 28 ++ 20 files changed, 1017 insertions(+), 77 deletions(-) create mode 100644 core-common/src/main/java11/org/glassfish/jersey/internal/jsr166/JerseyFlowSubscriber.java create mode 100644 core-common/src/main/java8/org/glassfish/jersey/internal/jsr166/JerseyFlowSubscriber.java create mode 100644 core-server/src/main/java/org/glassfish/jersey/server/model/internal/SseTypeResolver.java create mode 100644 tests/integration/reactive-streams/pom.xml create mode 100644 tests/integration/reactive-streams/sse/pom.xml create mode 100644 tests/integration/reactive-streams/sse/src/test/java/org/glassfish/jersey/media/sse/internal/JerseyEventSinkBlackBoxSubscriberTckTest.java create mode 100644 tests/integration/reactive-streams/sse/src/test/java/org/glassfish/jersey/media/sse/internal/JerseyEventSinkWhiteBoxSubscriberTckTest.java create mode 100644 tests/integration/reactive-streams/sse/src/test/java/org/glassfish/jersey/media/sse/internal/JerseyFlowAdapters.java create mode 100644 tests/integration/reactive-streams/sse/src/test/java/org/glassfish/jersey/media/sse/internal/SseSubscriberTest.java create mode 100644 tests/integration/reactive-streams/sse/tck-suite.xml diff --git a/connectors/helidon-connector/src/test/java/org/glassfish/jersey/helidon/connector/sse/SseTest.java b/connectors/helidon-connector/src/test/java/org/glassfish/jersey/helidon/connector/sse/SseTest.java index 61a03bf641..cba0763f2d 100644 --- a/connectors/helidon-connector/src/test/java/org/glassfish/jersey/helidon/connector/sse/SseTest.java +++ b/connectors/helidon-connector/src/test/java/org/glassfish/jersey/helidon/connector/sse/SseTest.java @@ -110,8 +110,10 @@ public void testSend() throws InterruptedException { final StringBuilder sb = new StringBuilder(); final CountDownLatch latch = new CountDownLatch(10); try (SseEventSource source = SseEventSource.target(target().path("simple")).build()) { - source.register((event) -> sb.append(event.readData())); - source.register((event) -> latch.countDown()); + source.register((event) -> { + sb.append(event.readData()); + latch.countDown(); + }); source.open(); latch.await(WAIT_TIME, TimeUnit.MILLISECONDS); @@ -158,9 +160,11 @@ private BroadcasterClient(WebTarget target) { private void register() throws InterruptedException { final CountDownLatch latch = new CountDownLatch(1); - source.register((event) -> message.append(event.readData())); - source.register((event) -> latch.countDown()); - source.register((event) -> messageLatch.countDown()); + source.register((event) -> { + message.append(event.readData()); + latch.countDown(); + messageLatch.countDown(); + }); source.open(); latch.await(WAIT_TIME, TimeUnit.MILLISECONDS); diff --git a/core-common/src/main/java11/org/glassfish/jersey/internal/jsr166/JerseyFlowSubscriber.java b/core-common/src/main/java11/org/glassfish/jersey/internal/jsr166/JerseyFlowSubscriber.java new file mode 100644 index 0000000000..f774e4f37a --- /dev/null +++ b/core-common/src/main/java11/org/glassfish/jersey/internal/jsr166/JerseyFlowSubscriber.java @@ -0,0 +1,34 @@ +/* + * Copyright (c) 2020 Oracle and/or its affiliates. All rights reserved. + * + * This program and the accompanying materials are made available under the + * terms of the Eclipse Public License v. 2.0, which is available at + * http://www.eclipse.org/legal/epl-2.0. + * + * This Source Code may also be made available under the following Secondary + * Licenses when the conditions for such availability set forth in the + * Eclipse Public License v. 2.0 are satisfied: GNU General Public License, + * version 2 with the GNU Classpath Exception, which is available at + * https://www.gnu.org/software/classpath/license.html. + * + * SPDX-License-Identifier: EPL-2.0 OR GPL-2.0 WITH Classpath-exception-2.0 + */ + +package org.glassfish.jersey.internal.jsr166; + +public interface JerseyFlowSubscriber extends Flow.Subscriber, java.util.concurrent.Flow.Subscriber { + @Override + default void onSubscribe(java.util.concurrent.Flow.Subscription subscription) { + this.onSubscribe(new Flow.Subscription() { + @Override + public void request(final long n) { + subscription.request(n); + } + + @Override + public void cancel() { + subscription.cancel(); + } + }); + } +} diff --git a/core-common/src/main/java8/org/glassfish/jersey/internal/jsr166/JerseyFlowSubscriber.java b/core-common/src/main/java8/org/glassfish/jersey/internal/jsr166/JerseyFlowSubscriber.java new file mode 100644 index 0000000000..dd25372cfc --- /dev/null +++ b/core-common/src/main/java8/org/glassfish/jersey/internal/jsr166/JerseyFlowSubscriber.java @@ -0,0 +1,20 @@ +/* + * Copyright (c) 2020 Oracle and/or its affiliates. All rights reserved. + * + * This program and the accompanying materials are made available under the + * terms of the Eclipse Public License v. 2.0, which is available at + * http://www.eclipse.org/legal/epl-2.0. + * + * This Source Code may also be made available under the following Secondary + * Licenses when the conditions for such availability set forth in the + * Eclipse Public License v. 2.0 are satisfied: GNU General Public License, + * version 2 with the GNU Classpath Exception, which is available at + * https://www.gnu.org/software/classpath/license.html. + * + * SPDX-License-Identifier: EPL-2.0 OR GPL-2.0 WITH Classpath-exception-2.0 + */ + +package org.glassfish.jersey.internal.jsr166; + +public interface JerseyFlowSubscriber extends Flow.Subscriber { +} diff --git a/core-server/src/main/java/org/glassfish/jersey/server/ChunkedOutput.java b/core-server/src/main/java/org/glassfish/jersey/server/ChunkedOutput.java index 616374604d..12cecf50b7 100644 --- a/core-server/src/main/java/org/glassfish/jersey/server/ChunkedOutput.java +++ b/core-server/src/main/java/org/glassfish/jersey/server/ChunkedOutput.java @@ -299,6 +299,7 @@ public Void call() throws IOException { closed = true; // remember the exception (it will get rethrown from finally clause, once it does it's work) ex = e; + onClose(e); } finally { if (closed) { try { @@ -349,6 +350,14 @@ public boolean isClosed() { return closed; } + /** + * Executed only in case of close being triggered by client. + * @param e Exception causing the close + */ + protected void onClose(Exception e){ + + } + @SuppressWarnings("EqualsWhichDoesntCheckParameterClass") @Override public boolean equals(final Object obj) { diff --git a/core-server/src/main/java/org/glassfish/jersey/server/model/IntrospectionModeller.java b/core-server/src/main/java/org/glassfish/jersey/server/model/IntrospectionModeller.java index 71e237cd1e..b2a42c9698 100644 --- a/core-server/src/main/java/org/glassfish/jersey/server/model/IntrospectionModeller.java +++ b/core-server/src/main/java/org/glassfish/jersey/server/model/IntrospectionModeller.java @@ -1,5 +1,5 @@ /* - * Copyright (c) 2010, 2019 Oracle and/or its affiliates. All rights reserved. + * Copyright (c) 2010, 2020 Oracle and/or its affiliates. All rights reserved. * * This program and the accompanying materials are made available under the * terms of the Eclipse Public License v. 2.0, which is available at @@ -40,7 +40,6 @@ import javax.ws.rs.container.AsyncResponse; import javax.ws.rs.container.Suspended; import javax.ws.rs.core.MediaType; -import javax.ws.rs.sse.SseEventSink; import org.glassfish.jersey.internal.Errors; import org.glassfish.jersey.internal.util.Producer; @@ -49,6 +48,7 @@ import org.glassfish.jersey.server.ManagedAsync; import org.glassfish.jersey.server.internal.LocalizationMessages; import org.glassfish.jersey.server.model.internal.ModelHelper; +import org.glassfish.jersey.server.model.internal.SseTypeResolver; /** * Utility class for constructing resource model from JAX-RS annotated POJO. @@ -298,7 +298,7 @@ private static void introspectAsyncFeatures(AnnotatedMethod am, ResourceMethod.B } for (Class paramType : am.getParameterTypes()) { - if (SseEventSink.class.equals(paramType)) { + if (SseTypeResolver.isSseSinkParam(paramType)) { resourceMethodBuilder.sse(); } } diff --git a/core-server/src/main/java/org/glassfish/jersey/server/model/ResourceMethodValidator.java b/core-server/src/main/java/org/glassfish/jersey/server/model/ResourceMethodValidator.java index 583982f3d5..aa20dc8a1e 100644 --- a/core-server/src/main/java/org/glassfish/jersey/server/model/ResourceMethodValidator.java +++ b/core-server/src/main/java/org/glassfish/jersey/server/model/ResourceMethodValidator.java @@ -1,5 +1,5 @@ /* - * Copyright (c) 2012, 2018 Oracle and/or its affiliates. All rights reserved. + * Copyright (c) 2012, 2020 Oracle and/or its affiliates. All rights reserved. * * This program and the accompanying materials are made available under the * terms of the Eclipse Public License v. 2.0, which is available at @@ -37,11 +37,11 @@ import javax.ws.rs.Path; import javax.ws.rs.PathParam; import javax.ws.rs.QueryParam; -import javax.ws.rs.sse.SseEventSink; import org.glassfish.jersey.internal.Errors; import org.glassfish.jersey.server.ContainerRequest; import org.glassfish.jersey.server.internal.LocalizationMessages; +import org.glassfish.jersey.server.model.internal.SseTypeResolver; import org.glassfish.jersey.server.spi.internal.ParameterValueHelper; import org.glassfish.jersey.server.spi.internal.ValueParamProvider; @@ -85,7 +85,7 @@ private void checkMethod(ResourceMethod method) { if ("GET".equals(method.getHttpMethod())) { final long eventSinkCount = invocable.getParameters() .stream() - .filter(parameter -> SseEventSink.class.equals(parameter.getRawType())) + .filter(parameter -> SseTypeResolver.isSseSinkParam(parameter.getRawType())) .count(); final boolean isSse = eventSinkCount > 0; @@ -213,7 +213,8 @@ private void checkParameters(ResourceMethod method) { } private boolean isSseInjected(final Invocable invocable) { - return invocable.getParameters().stream().anyMatch(parameter -> SseEventSink.class.equals(parameter.getRawType())); + return invocable.getParameters().stream() + .anyMatch(parameter -> SseTypeResolver.isSseSinkParam(parameter.getRawType())); } private static final Set PARAM_ANNOTATION_SET = createParamAnnotationSet(); diff --git a/core-server/src/main/java/org/glassfish/jersey/server/model/internal/JavaResourceMethodDispatcherProvider.java b/core-server/src/main/java/org/glassfish/jersey/server/model/internal/JavaResourceMethodDispatcherProvider.java index 5c16fcbe30..c4598ac7d2 100644 --- a/core-server/src/main/java/org/glassfish/jersey/server/model/internal/JavaResourceMethodDispatcherProvider.java +++ b/core-server/src/main/java/org/glassfish/jersey/server/model/internal/JavaResourceMethodDispatcherProvider.java @@ -1,5 +1,5 @@ /* - * Copyright (c) 2011, 2019 Oracle and/or its affiliates. All rights reserved. + * Copyright (c) 2011, 2020 Oracle and/or its affiliates. All rights reserved. * * This program and the accompanying materials are made available under the * terms of the Eclipse Public License v. 2.0, which is available at @@ -76,7 +76,7 @@ public ResourceMethodDispatcher create(final Invocable resourceMethod, // return type is void int i = 0; for (final Parameter parameter : resourceMethod.getParameters()) { - if (SseEventSink.class.equals(parameter.getRawType())) { + if (SseTypeResolver.isSseSinkParam(parameter.getRawType())) { resourceMethodDispatcher = new SseEventSinkInvoker(resourceMethod, invocationHandler, valueProviders, validator, i); break; diff --git a/core-server/src/main/java/org/glassfish/jersey/server/model/internal/SseTypeResolver.java b/core-server/src/main/java/org/glassfish/jersey/server/model/internal/SseTypeResolver.java new file mode 100644 index 0000000000..c9abef4cb4 --- /dev/null +++ b/core-server/src/main/java/org/glassfish/jersey/server/model/internal/SseTypeResolver.java @@ -0,0 +1,49 @@ +/* + * Copyright (c) 2020 Oracle and/or its affiliates. All rights reserved. + * + * This program and the accompanying materials are made available under the + * terms of the Eclipse Public License v. 2.0, which is available at + * http://www.eclipse.org/legal/epl-2.0. + * + * This Source Code may also be made available under the following Secondary + * Licenses when the conditions for such availability set forth in the + * Eclipse Public License v. 2.0 are satisfied: GNU General Public License, + * version 2 with the GNU Classpath Exception, which is available at + * https://www.gnu.org/software/classpath/license.html. + * + * SPDX-License-Identifier: EPL-2.0 OR GPL-2.0 WITH Classpath-exception-2.0 + */ +package org.glassfish.jersey.server.model.internal; + +import java.security.AccessController; +import java.util.Collections; +import java.util.HashSet; +import java.util.Set; + +import org.glassfish.jersey.internal.util.ReflectionHelper; + +public final class SseTypeResolver { + + private static final Set> SUPPORTED_SSE_SINK_TYPES; + + private SseTypeResolver() { + } + + static { + Set> set = new HashSet<>(8); + + set.add(org.glassfish.jersey.internal.jsr166.Flow.Subscriber.class); + set.add(javax.ws.rs.sse.SseEventSink.class); + Class clazz = AccessController + .doPrivileged(ReflectionHelper.classForNamePA("java.util.concurrent.Flow$Subscriber", null)); + + if (clazz != null) { + set.add(clazz); + } + SUPPORTED_SSE_SINK_TYPES = Collections.unmodifiableSet(set); + } + + public static boolean isSseSinkParam(Class type) { + return SUPPORTED_SSE_SINK_TYPES.contains(type); + } +} diff --git a/media/sse/src/main/java/org/glassfish/jersey/media/sse/internal/JerseyEventSink.java b/media/sse/src/main/java/org/glassfish/jersey/media/sse/internal/JerseyEventSink.java index 0206b0de8e..b43d5448b9 100644 --- a/media/sse/src/main/java/org/glassfish/jersey/media/sse/internal/JerseyEventSink.java +++ b/media/sse/src/main/java/org/glassfish/jersey/media/sse/internal/JerseyEventSink.java @@ -1,5 +1,5 @@ /* - * Copyright (c) 2017, 2019 Oracle and/or its affiliates. All rights reserved. + * Copyright (c) 2017, 2020 Oracle and/or its affiliates. All rights reserved. * * This program and the accompanying materials are made available under the * terms of the Eclipse Public License v. 2.0, which is available at @@ -18,19 +18,22 @@ import java.io.Flushable; import java.io.IOException; -import java.nio.charset.Charset; +import java.nio.charset.StandardCharsets; import java.util.concurrent.CompletableFuture; import java.util.concurrent.CompletionStage; +import java.util.concurrent.atomic.AtomicBoolean; import java.util.logging.Level; import java.util.logging.Logger; +import javax.inject.Provider; +import javax.ws.rs.core.MediaType; import javax.ws.rs.sse.OutboundSseEvent; import javax.ws.rs.sse.SseEventSink; -import javax.inject.Provider; - import org.glassfish.jersey.internal.jsr166.Flow; +import org.glassfish.jersey.internal.jsr166.JerseyFlowSubscriber; import org.glassfish.jersey.media.sse.LocalizationMessages; +import org.glassfish.jersey.media.sse.OutboundEvent; import org.glassfish.jersey.server.AsyncContext; import org.glassfish.jersey.server.ChunkedOutput; @@ -39,14 +42,16 @@ *

* The reference should be obtained via injection into the resource method. * - * @author Adam Lindenthal] + * @author Adam Lindenthal */ class JerseyEventSink extends ChunkedOutput - implements SseEventSink, Flushable, Flow.Subscriber { + implements SseEventSink, Flushable, JerseyFlowSubscriber { private static final Logger LOGGER = Logger.getLogger(JerseyEventSink.class.getName()); - private static final byte[] SSE_EVENT_DELIMITER = "\n".getBytes(Charset.forName("UTF-8")); + private static final byte[] SSE_EVENT_DELIMITER = "\n".getBytes(StandardCharsets.UTF_8); private Flow.Subscription subscription = null; + private final AtomicBoolean subscribed = new AtomicBoolean(false); + private volatile MediaType implicitMediaType = null; JerseyEventSink(Provider asyncContextProvider) { super(SSE_EVENT_DELIMITER, asyncContextProvider); @@ -54,43 +59,77 @@ class JerseyEventSink extends ChunkedOutput @Override public void onSubscribe(final Flow.Subscription subscription) { - checkClosed(); if (subscription == null) { throw new NullPointerException(LocalizationMessages.PARAM_NULL("subscription")); } + if (subscribed.getAndSet(true)) { + subscription.cancel(); + return; + } + this.subscription = subscription; - subscription.request(Long.MAX_VALUE); + if (isClosed()) { + subscription.cancel(); + } else { + subscription.request(Long.MAX_VALUE); + } } @Override - public void onNext(final OutboundSseEvent item) { - checkClosed(); + public void onNext(final Object item) { if (item == null) { throw new NullPointerException(LocalizationMessages.PARAM_NULL("outboundSseEvent")); } try { - write(item); - } catch (final IOException e) { - onError(e); + checkClosed(); + MediaType implicitType = resolveMediaType(item); + if (MediaType.SERVER_SENT_EVENTS_TYPE.equals(implicitType)) { + // already wrapped + write((OutboundSseEvent) item); + } else { + // implicit wrapping + // TODO: Jersey annotation for explicit media type + write(new OutboundEvent.Builder() + .mediaType(implicitType) + .data(item) + .build()); + } + } catch (final Throwable e) { + // spec allows only NPE to be thrown from onNext + LOGGER.log(Level.SEVERE, LocalizationMessages.EVENT_SINK_NEXT_FAILED(), e); + cancelSubscription(); } } @Override public void onError(final Throwable throwable) { - checkClosed(); if (throwable == null) { throw new NullPointerException(LocalizationMessages.PARAM_NULL("throwable")); } - subscription.cancel(); + try { + LOGGER.log(Level.SEVERE, LocalizationMessages.EVENT_SOURCE_DEFAULT_ONERROR(), throwable); + super.close(); + } catch (IOException e) { + LOGGER.log(Level.SEVERE, LocalizationMessages.EVENT_SINK_CLOSE_FAILED(), e); + } + } + + public void onComplete() { + try { + super.close(); + } catch (Throwable e) { + LOGGER.log(Level.SEVERE, LocalizationMessages.EVENT_SINK_CLOSE_FAILED(), e); + } } @Override public void close() { try { + cancelSubscription(); super.close(); } catch (IOException e) { - LOGGER.log(Level.INFO, LocalizationMessages.EVENT_SINK_CLOSE_FAILED(), e); + LOGGER.log(Level.SEVERE, LocalizationMessages.EVENT_SINK_CLOSE_FAILED(), e); } } @@ -101,7 +140,9 @@ public CompletionStage send(OutboundSseEvent event) { this.write(event); return CompletableFuture.completedFuture(null); } catch (IOException e) { - return CompletableFuture.completedFuture(e); + CompletableFuture future = new CompletableFuture<>(); + future.completeExceptionally(e); + return future; } } @@ -118,15 +159,38 @@ public void flush() throws IOException { super.flushQueue(); } - public void onComplete() { - checkClosed(); - subscription.cancel(); - close(); + @Override + protected void onClose(Exception e) { + cancelSubscription(); + } + + private void cancelSubscription() { + if (subscription != null) { + subscription.cancel(); + } } private void checkClosed() { if (isClosed()) { + cancelSubscription(); throw new IllegalStateException(LocalizationMessages.EVENT_SOURCE_ALREADY_CLOSED()); } } + + private MediaType resolveMediaType(Object item) { + // resolve lazily as all stream items are presumed to be of a same type + if (implicitMediaType == null) { + Class clazz = item.getClass(); + if (String.class.equals(clazz) + || Number.class.isAssignableFrom(clazz) + || Character.class.equals(clazz) + || Boolean.class.equals(clazz)) { + implicitMediaType = MediaType.TEXT_PLAIN_TYPE; + return implicitMediaType; + } + // unknown unwrapped objects are treated as json media type + implicitMediaType = MediaType.APPLICATION_JSON_TYPE; + } + return implicitMediaType; + } } diff --git a/media/sse/src/main/java/org/glassfish/jersey/media/sse/internal/SseEventSinkValueParamProvider.java b/media/sse/src/main/java/org/glassfish/jersey/media/sse/internal/SseEventSinkValueParamProvider.java index fa04efa88c..9bd85bd169 100644 --- a/media/sse/src/main/java/org/glassfish/jersey/media/sse/internal/SseEventSinkValueParamProvider.java +++ b/media/sse/src/main/java/org/glassfish/jersey/media/sse/internal/SseEventSinkValueParamProvider.java @@ -1,5 +1,5 @@ /* - * Copyright (c) 2017, 2019 Oracle and/or its affiliates. All rights reserved. + * Copyright (c) 2017, 2020 Oracle and/or its affiliates. All rights reserved. * * This program and the accompanying materials are made available under the * terms of the Eclipse Public License v. 2.0, which is available at @@ -29,6 +29,7 @@ import org.glassfish.jersey.server.internal.inject.AbstractValueParamProvider; import org.glassfish.jersey.server.internal.inject.MultivaluedParameterExtractorProvider; import org.glassfish.jersey.server.model.Parameter; +import org.glassfish.jersey.server.model.internal.SseTypeResolver; import org.glassfish.jersey.server.spi.internal.ValueParamProvider; /** @@ -59,7 +60,8 @@ protected Function createValueProvider(Parameter } final Class rawParameterType = parameter.getRawType(); - if (rawParameterType == SseEventSink.class && parameter.isAnnotationPresent(Context.class)) { + if (SseTypeResolver.isSseSinkParam(rawParameterType) + && parameter.isAnnotationPresent(Context.class)) { return new SseEventSinkValueSupplier(asyncContextSupplier); } return null; diff --git a/media/sse/src/main/resources/org/glassfish/jersey/media/sse/localization.properties b/media/sse/src/main/resources/org/glassfish/jersey/media/sse/localization.properties index d8ee42b27e..e00058451c 100644 --- a/media/sse/src/main/resources/org/glassfish/jersey/media/sse/localization.properties +++ b/media/sse/src/main/resources/org/glassfish/jersey/media/sse/localization.properties @@ -1,5 +1,5 @@ # -# Copyright (c) 2012, 2018 Oracle and/or its affiliates. All rights reserved. +# Copyright (c) 2012, 2020 Oracle and/or its affiliates. All rights reserved. # # This program and the accompanying materials are made available under the # terms of the Eclipse Public License v. 2.0, which is available at @@ -34,5 +34,6 @@ out.event.not.buildable=Cannot build outbound event. Either a comment or non-nul param.null="{0}" parameter is null. params.null=One or more of parameters is null. event.sink.close.failed=Closing EventSink failed. Could not close chunked output. +event.sink.next.failed=Processing onNext signal failed. unsupported.webtarget.type=Argument {0} is not a valid JerseyWebTarget instance. SseEventSource does not support other \ WebTarget implementations. diff --git a/media/sse/src/test/java/org/glassfish/jersey/media/sse/internal/JerseyEventSinkTest.java b/media/sse/src/test/java/org/glassfish/jersey/media/sse/internal/JerseyEventSinkTest.java index 465227a159..6b7da0d6cf 100644 --- a/media/sse/src/test/java/org/glassfish/jersey/media/sse/internal/JerseyEventSinkTest.java +++ b/media/sse/src/test/java/org/glassfish/jersey/media/sse/internal/JerseyEventSinkTest.java @@ -1,5 +1,5 @@ /* - * Copyright (c) 2017, 2019 Oracle and/or its affiliates. All rights reserved. + * Copyright (c) 2017, 2020 Oracle and/or its affiliates. All rights reserved. * * This program and the accompanying materials are made available under the * terms of the Eclipse Public License v. 2.0, which is available at @@ -16,6 +16,7 @@ package org.glassfish.jersey.media.sse.internal; +import org.glassfish.jersey.media.sse.OutboundEvent; import org.junit.Rule; import org.junit.Test; import org.junit.rules.ExpectedException; @@ -28,43 +29,6 @@ public class JerseyEventSinkTest { @Rule public ExpectedException thrown = ExpectedException.none(); - - @Test - public void onSubscribe() throws Exception { - JerseyEventSink eventSink = new JerseyEventSink(null); - - eventSink.close(); - thrown.expect(IllegalStateException.class); - eventSink.onSubscribe(null); - } - - @Test - public void onNext() throws Exception { - JerseyEventSink eventSink = new JerseyEventSink(null); - - eventSink.close(); - thrown.expect(IllegalStateException.class); - eventSink.onNext(null); - } - - @Test - public void onError() throws Exception { - JerseyEventSink eventSink = new JerseyEventSink(null); - - eventSink.close(); - thrown.expect(IllegalStateException.class); - eventSink.onError(null); - } - - @Test - public void onComplete() throws Exception { - JerseyEventSink eventSink = new JerseyEventSink(null); - - eventSink.close(); - thrown.expect(IllegalStateException.class); - eventSink.onComplete(); - } - @Test public void test() throws Exception { JerseyEventSink eventSink = new JerseyEventSink(null); diff --git a/tests/integration/pom.xml b/tests/integration/pom.xml index 0b722a7d20..181b7f402f 100644 --- a/tests/integration/pom.xml +++ b/tests/integration/pom.xml @@ -133,6 +133,7 @@ spring4 spring5 tracing-support + reactive-streams diff --git a/tests/integration/reactive-streams/pom.xml b/tests/integration/reactive-streams/pom.xml new file mode 100644 index 0000000000..13f75d185b --- /dev/null +++ b/tests/integration/reactive-streams/pom.xml @@ -0,0 +1,48 @@ + + + + + project + org.glassfish.jersey.tests.integration + 2.32.0-SNAPSHOT + + 4.0.0 + pom + + org.glassfish.jersey.tests.integration.reactive + reactive-streams-integration-project + reactive-streams-integration-project + + sse + + + + + + org.apache.maven.plugins + maven-install-plugin + + false + + + + + \ No newline at end of file diff --git a/tests/integration/reactive-streams/sse/pom.xml b/tests/integration/reactive-streams/sse/pom.xml new file mode 100644 index 0000000000..c26b03bf7f --- /dev/null +++ b/tests/integration/reactive-streams/sse/pom.xml @@ -0,0 +1,104 @@ + + + + + reactive-streams-integration-project + org.glassfish.jersey.tests.integration.reactive + 2.32.0-SNAPSHOT + + 4.0.0 + + sse-reactive-streams-tck + + + + org.glassfish.jersey.media + jersey-media-sse + + + org.reactivestreams + reactive-streams-tck + 1.0.3 + test + + + junit + junit + test + + + org.testng + testng + test + + + io.reactivex.rxjava2 + rxjava + ${rxjava2.version} + test + + + org.glassfish.jersey.media + jersey-media-json-binding + ${project.version} + test + + + + org.glassfish.jersey.test-framework.providers + jersey-test-framework-provider-bundle + ${project.version} + pom + test + + + + + + + org.apache.maven.plugins + maven-failsafe-plugin + + + + integration-test + verify + + + + + + **/*TckTest.java + + + + + org.apache.maven.surefire + surefire-testng + 3.0.0-M3 + + + + + + + + \ No newline at end of file diff --git a/tests/integration/reactive-streams/sse/src/test/java/org/glassfish/jersey/media/sse/internal/JerseyEventSinkBlackBoxSubscriberTckTest.java b/tests/integration/reactive-streams/sse/src/test/java/org/glassfish/jersey/media/sse/internal/JerseyEventSinkBlackBoxSubscriberTckTest.java new file mode 100644 index 0000000000..6e6a1d599d --- /dev/null +++ b/tests/integration/reactive-streams/sse/src/test/java/org/glassfish/jersey/media/sse/internal/JerseyEventSinkBlackBoxSubscriberTckTest.java @@ -0,0 +1,41 @@ +/* + * Copyright (c) 2020 Oracle and/or its affiliates. All rights reserved. + * + * This program and the accompanying materials are made available under the + * terms of the Eclipse Public License v. 2.0, which is available at + * http://www.eclipse.org/legal/epl-2.0. + * + * This Source Code may also be made available under the following Secondary + * Licenses when the conditions for such availability set forth in the + * Eclipse Public License v. 2.0 are satisfied: GNU General Public License, + * version 2 with the GNU Classpath Exception, which is available at + * https://www.gnu.org/software/classpath/license.html. + * + * SPDX-License-Identifier: EPL-2.0 OR GPL-2.0 WITH Classpath-exception-2.0 + */ + +package org.glassfish.jersey.media.sse.internal; + +import org.reactivestreams.Subscriber; +import org.reactivestreams.tck.SubscriberBlackboxVerification; +import org.reactivestreams.tck.TestEnvironment; + +public class JerseyEventSinkBlackBoxSubscriberTckTest extends SubscriberBlackboxVerification { + + static final TestEnvironment env = new TestEnvironment(250); + + public JerseyEventSinkBlackBoxSubscriberTckTest() { + super(env); + } + + @Override + public Subscriber createSubscriber() { + JerseyEventSink jerseyEventSink = new JerseyEventSink(null); + return JerseyFlowAdapters.toSubscriber(jerseyEventSink); + } + + @Override + public String createElement(final int i) { + return "test" + i; + } +} diff --git a/tests/integration/reactive-streams/sse/src/test/java/org/glassfish/jersey/media/sse/internal/JerseyEventSinkWhiteBoxSubscriberTckTest.java b/tests/integration/reactive-streams/sse/src/test/java/org/glassfish/jersey/media/sse/internal/JerseyEventSinkWhiteBoxSubscriberTckTest.java new file mode 100644 index 0000000000..6aa5a863c7 --- /dev/null +++ b/tests/integration/reactive-streams/sse/src/test/java/org/glassfish/jersey/media/sse/internal/JerseyEventSinkWhiteBoxSubscriberTckTest.java @@ -0,0 +1,184 @@ +/* + * Copyright (c) 2020 Oracle and/or its affiliates. All rights reserved. + * + * This program and the accompanying materials are made available under the + * terms of the Eclipse Public License v. 2.0, which is available at + * http://www.eclipse.org/legal/epl-2.0. + * + * This Source Code may also be made available under the following Secondary + * Licenses when the conditions for such availability set forth in the + * Eclipse Public License v. 2.0 are satisfied: GNU General Public License, + * version 2 with the GNU Classpath Exception, which is available at + * https://www.gnu.org/software/classpath/license.html. + * + * SPDX-License-Identifier: EPL-2.0 OR GPL-2.0 WITH Classpath-exception-2.0 + */ + +package org.glassfish.jersey.media.sse.internal; + +import java.util.concurrent.CompletableFuture; +import java.util.concurrent.ExecutionException; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.TimeoutException; + +import static org.testng.Assert.fail; + +import org.glassfish.jersey.internal.jsr166.Flow; +import org.reactivestreams.Subscriber; +import org.reactivestreams.tck.SubscriberWhiteboxVerification; +import org.reactivestreams.tck.TestEnvironment; +import org.testng.TestException; +import org.testng.annotations.Test; + +public class JerseyEventSinkWhiteBoxSubscriberTckTest extends SubscriberWhiteboxVerification { + + static final TestEnvironment env = new TestEnvironment(250); + + public JerseyEventSinkWhiteBoxSubscriberTckTest() { + super(env); + } + + @Test + @SuppressWarnings("unchecked") + public void noopOnNextAfterClose() throws InterruptedException { + WhiteboxTestStage stage = new WhiteboxTestStage(env, true); + SubscriberPuppet puppet = stage.puppet(); + WhiteboxSubscriberProbe probe = stage.probe; + JerseyEventSink eventSink = (JerseyEventSink) + ((JerseyFlowAdapters.AdaptedSubscriber) stage.sub()).jerseySubscriber; + puppet.triggerRequest(2); + stage.expectRequest(); + probe.expectNext(stage.signalNext()); + probe.expectNext(stage.signalNext()); + + puppet.triggerRequest(3000); + eventSink.close(); + stage.expectCancelling(); + stage.signalNext(); + } + + @Test + @SuppressWarnings("unchecked") + public void noopOnCompleteAfterClose() throws InterruptedException { + WhiteboxTestStage stage = new WhiteboxTestStage(env, true); + SubscriberPuppet puppet = stage.puppet(); + WhiteboxSubscriberProbe probe = stage.probe; + JerseyEventSink eventSink = (JerseyEventSink) + ((JerseyFlowAdapters.AdaptedSubscriber) stage.sub()).jerseySubscriber; + puppet.triggerRequest(2); + stage.expectRequest(); + probe.expectNext(stage.signalNext()); + probe.expectNext(stage.signalNext()); + + puppet.triggerRequest(3000); + eventSink.close(); + stage.sendCompletion(); + probe.expectCompletion(); + } + + @Test + @SuppressWarnings("unchecked") + public void noopOnErrorAfterClose() throws InterruptedException { + WhiteboxTestStage stage = new WhiteboxTestStage(env, true); + SubscriberPuppet puppet = stage.puppet(); + WhiteboxSubscriberProbe probe = stage.probe; + JerseyEventSink eventSink = (JerseyEventSink) + ((JerseyFlowAdapters.AdaptedSubscriber) stage.sub()).jerseySubscriber; + puppet.triggerRequest(2); + stage.expectRequest(); + probe.expectNext(stage.signalNext()); + probe.expectNext(stage.signalNext()); + + puppet.triggerRequest(3000); + eventSink.close(); + + TestException testException = new TestException("BOOM JERSEY!"); + + stage.sendError(testException); + probe.expectError(testException); + } + + @Test + @SuppressWarnings("unchecked") + public void cancelSubscriptionAfterClose() throws InterruptedException { + WhiteboxTestStage stage = new WhiteboxTestStage(env, true); + SubscriberPuppet puppet = stage.puppet(); + WhiteboxSubscriberProbe probe = stage.probe; + JerseyEventSink eventSink = (JerseyEventSink) + ((JerseyFlowAdapters.AdaptedSubscriber) stage.sub()).jerseySubscriber; + puppet.triggerRequest(2); + stage.expectRequest(); + probe.expectNext(stage.signalNext()); + probe.expectNext(stage.signalNext()); + + puppet.triggerRequest(3000); + eventSink.close(); + + stage.expectCancelling(); + + CompletableFuture cancelled2ndSubscription = new CompletableFuture<>(); + + eventSink.onSubscribe(new Flow.Subscription() { + @Override + public void request(final long n) { + + } + + @Override + public void cancel() { + cancelled2ndSubscription.complete(null); + } + }); + + try { + cancelled2ndSubscription.get(env.defaultTimeoutMillis(), TimeUnit.MILLISECONDS); + } catch (ExecutionException | TimeoutException e) { + fail("Cancel is expected on subscription on closed JerseyEventSink"); + } + } + + @Override + public Subscriber createSubscriber(final WhiteboxSubscriberProbe probe) { + JerseyEventSink jerseyEventSink = new JerseyEventSink(null) { + @Override + public void onSubscribe(final Flow.Subscription subscription) { + super.onSubscribe(subscription); + probe.registerOnSubscribe(new SubscriberPuppet() { + @Override + public void triggerRequest(final long elements) { + subscription.request(elements); + } + + @Override + public void signalCancel() { + subscription.cancel(); + } + }); + } + + @Override + public void onNext(final Object item) { + super.onNext(item); + probe.registerOnNext(item); + } + + @Override + public void onError(final Throwable throwable) { + super.onError(throwable); + probe.registerOnError(throwable); + } + + @Override + public void onComplete() { + super.onComplete(); + probe.registerOnComplete(); + } + }; + return JerseyFlowAdapters.toSubscriber(jerseyEventSink); + } + + @Override + public String createElement(final int i) { + return "test" + i; + } +} diff --git a/tests/integration/reactive-streams/sse/src/test/java/org/glassfish/jersey/media/sse/internal/JerseyFlowAdapters.java b/tests/integration/reactive-streams/sse/src/test/java/org/glassfish/jersey/media/sse/internal/JerseyFlowAdapters.java new file mode 100644 index 0000000000..57403595ab --- /dev/null +++ b/tests/integration/reactive-streams/sse/src/test/java/org/glassfish/jersey/media/sse/internal/JerseyFlowAdapters.java @@ -0,0 +1,73 @@ +/* + * Copyright (c) 2020 Oracle and/or its affiliates. All rights reserved. + * + * This program and the accompanying materials are made available under the + * terms of the Eclipse Public License v. 2.0, which is available at + * http://www.eclipse.org/legal/epl-2.0. + * + * This Source Code may also be made available under the following Secondary + * Licenses when the conditions for such availability set forth in the + * Eclipse Public License v. 2.0 are satisfied: GNU General Public License, + * version 2 with the GNU Classpath Exception, which is available at + * https://www.gnu.org/software/classpath/license.html. + * + * SPDX-License-Identifier: EPL-2.0 OR GPL-2.0 WITH Classpath-exception-2.0 + */ + +package org.glassfish.jersey.media.sse.internal; + +import org.glassfish.jersey.internal.jsr166.Flow; + +public class JerseyFlowAdapters { + + /** + * Adapt {@link org.glassfish.jersey.internal.jsr166.Flow.Subscriber} to + * {@link org.reactivestreams.Subscriber}. + * + * @param jerseySubscriber Jersey's repackaged {@link org.glassfish.jersey.internal.jsr166.Flow.Subscriber} + * @param payload type + * @return Reactive Streams's {@link org.reactivestreams.Subscriber} + */ + static org.reactivestreams.Subscriber toSubscriber(Flow.Subscriber jerseySubscriber) { + return new AdaptedSubscriber(jerseySubscriber); + } + + public static class AdaptedSubscriber implements org.reactivestreams.Subscriber { + + public final Flow.Subscriber jerseySubscriber; + + public AdaptedSubscriber(Flow.Subscriber jerseySubscriber) { + this.jerseySubscriber = jerseySubscriber; + } + + @Override + public void onSubscribe(final org.reactivestreams.Subscription subscription) { + jerseySubscriber.onSubscribe(new Flow.Subscription() { + @Override + public void request(final long n) { + subscription.request(n); + } + + @Override + public void cancel() { + subscription.cancel(); + } + }); + } + + @Override + public void onNext(final T t) { + jerseySubscriber.onNext(t); + } + + @Override + public void onError(final Throwable throwable) { + jerseySubscriber.onError(throwable); + } + + @Override + public void onComplete() { + jerseySubscriber.onComplete(); + } + } +} diff --git a/tests/integration/reactive-streams/sse/src/test/java/org/glassfish/jersey/media/sse/internal/SseSubscriberTest.java b/tests/integration/reactive-streams/sse/src/test/java/org/glassfish/jersey/media/sse/internal/SseSubscriberTest.java new file mode 100644 index 0000000000..27c06b1237 --- /dev/null +++ b/tests/integration/reactive-streams/sse/src/test/java/org/glassfish/jersey/media/sse/internal/SseSubscriberTest.java @@ -0,0 +1,313 @@ +/* + * Copyright (c) 2020 Oracle and/or its affiliates. All rights reserved. + * + * This program and the accompanying materials are made available under the + * terms of the Eclipse Public License v. 2.0, which is available at + * http://www.eclipse.org/legal/epl-2.0. + * + * This Source Code may also be made available under the following Secondary + * Licenses when the conditions for such availability set forth in the + * Eclipse Public License v. 2.0 are satisfied: GNU General Public License, + * version 2 with the GNU Classpath Exception, which is available at + * https://www.gnu.org/software/classpath/license.html. + * + * SPDX-License-Identifier: EPL-2.0 OR GPL-2.0 WITH Classpath-exception-2.0 + */ + +package org.glassfish.jersey.media.sse.internal; + +import java.util.ArrayList; +import java.util.Arrays; +import java.util.Collections; +import java.util.List; +import java.util.Objects; +import java.util.concurrent.CountDownLatch; +import java.util.concurrent.TimeUnit; +import java.util.stream.Collectors; + +import javax.inject.Singleton; +import javax.json.Json; +import javax.json.JsonBuilderFactory; +import javax.json.JsonObject; +import javax.json.bind.Jsonb; +import javax.json.bind.JsonbBuilder; +import javax.ws.rs.GET; +import javax.ws.rs.Path; +import javax.ws.rs.Produces; +import javax.ws.rs.client.WebTarget; +import javax.ws.rs.core.Application; +import javax.ws.rs.core.Context; +import javax.ws.rs.core.MediaType; +import javax.ws.rs.sse.SseEventSource; + +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertTrue; + +import io.reactivex.Flowable; +import org.glassfish.jersey.internal.jsr166.Flow; +import org.glassfish.jersey.server.ResourceConfig; +import org.glassfish.jersey.test.JerseyTest; +import org.junit.Test; + +/** + * @author Daniel Kec + */ +public class SseSubscriberTest extends JerseyTest { + + private static final int NUMBER_OF_TEST_MESSAGES = 5; + private static final String TEST_MESSAGE = "Jersey"; + private static final JsonBuilderFactory JSON_BUILDER = Json.createBuilderFactory(Collections.emptyMap()); + + @Override + protected Application configure() { + return new ResourceConfig(SseEndpoint.class); + } + + @Singleton + @Path("sse") + public static class SseEndpoint { + + @GET + @Path("short") + @Produces(MediaType.SERVER_SENT_EVENTS) + public void sseShort(@Context Flow.Subscriber subscriber) { + Flowable.interval(20, TimeUnit.MILLISECONDS) + .take(NUMBER_OF_TEST_MESSAGES) + .map(Long::shortValue) + .subscribe(JerseyFlowAdapters.toSubscriber(subscriber)); + } + + @GET + @Path("double") + @Produces(MediaType.SERVER_SENT_EVENTS) + public void sseDouble(@Context Flow.Subscriber subscriber) { + Flowable.interval(20, TimeUnit.MILLISECONDS) + .take(NUMBER_OF_TEST_MESSAGES) + .map(Long::doubleValue) + .subscribe(JerseyFlowAdapters.toSubscriber(subscriber)); + } + + @GET + @Path("byte") + @Produces(MediaType.SERVER_SENT_EVENTS) + public void sseByte(@Context Flow.Subscriber subscriber) { + Flowable.interval(20, TimeUnit.MILLISECONDS) + .take(NUMBER_OF_TEST_MESSAGES) + .map(Long::byteValue) + .subscribe(JerseyFlowAdapters.toSubscriber(subscriber)); + } + + @GET + @Path("integer") + @Produces(MediaType.SERVER_SENT_EVENTS) + public void sseInteger(@Context Flow.Subscriber subscriber) { + Flowable.interval(20, TimeUnit.MILLISECONDS) + .take(NUMBER_OF_TEST_MESSAGES) + .map(Long::intValue) + .subscribe(JerseyFlowAdapters.toSubscriber(subscriber)); + } + + @GET + @Path("long") + @Produces(MediaType.SERVER_SENT_EVENTS) + public void sseLong(@Context Flow.Subscriber subscriber) { + Flowable.just(0L, 1L, 2L, 3L, 4L) + .take(NUMBER_OF_TEST_MESSAGES) + .subscribe(JerseyFlowAdapters.toSubscriber(subscriber)); + } + + @GET + @Path("string") + @Produces(MediaType.SERVER_SENT_EVENTS) + public void sseString(@Context Flow.Subscriber subscriber) { + Flowable.interval(20, TimeUnit.MILLISECONDS) + .take(NUMBER_OF_TEST_MESSAGES) + .map(l -> TEST_MESSAGE + l) + .subscribe(JerseyFlowAdapters.toSubscriber(subscriber)); + } + + @GET + @Path("boolean") + @Produces(MediaType.SERVER_SENT_EVENTS) + public void sseBoolean(@Context Flow.Subscriber subscriber) { + Flowable.interval(20, TimeUnit.MILLISECONDS) + .take(NUMBER_OF_TEST_MESSAGES) + .map(l -> (l % 2) == 0) + .subscribe(JerseyFlowAdapters.toSubscriber(subscriber)); + } + + @GET + @Path("char") + @Produces(MediaType.SERVER_SENT_EVENTS) + public void sseChar(@Context Flow.Subscriber subscriber) { + Flowable.just("FRANK") + .flatMap(s -> Flowable.fromArray(s.chars().mapToObj(ch -> (char) ch).toArray(Character[]::new))) + .subscribe(JerseyFlowAdapters.toSubscriber(subscriber)); + } + + @GET + @Path("json-obj") + @Produces(MediaType.SERVER_SENT_EVENTS) + public void sseJsonObj(@Context Flow.Subscriber subscriber) { + Flowable.interval(20, TimeUnit.MILLISECONDS) + .take(NUMBER_OF_TEST_MESSAGES) + .map(l -> JSON_BUILDER.createObjectBuilder() + .add("brand", TEST_MESSAGE) + .add("model", "Model " + l) + .build()) + .subscribe(JerseyFlowAdapters.toSubscriber(subscriber)); + } + + @GET + @Path("json") + @Produces(MediaType.SERVER_SENT_EVENTS) + public void sseJson(@Context Flow.Subscriber subscriber) { + Flowable.interval(20, TimeUnit.MILLISECONDS) + .take(NUMBER_OF_TEST_MESSAGES) + .map(l -> new Car(TEST_MESSAGE, "Model " + l)) + .subscribe(JerseyFlowAdapters.toSubscriber(subscriber)); + } + } + + + @Test + public void testShort() throws InterruptedException { + assertEquals(Arrays.asList((short) 0, (short) 1, (short) 2, (short) 3, (short) 4), receive(Short.class, "sse/short")); + } + + @Test + public void testDouble() throws InterruptedException { + assertEquals(Arrays.asList(0.0, 1.0, 2.0, 3.0, 4.0), receive(Double.class, "sse/double")); + } + + @Test + public void testByte() throws InterruptedException { + assertEquals(Arrays.asList((byte) 0, (byte) 1, (byte) 2, (byte) 3, (byte) 4), receive(Byte.class, "sse/byte")); + } + + @Test + public void testInteger() throws InterruptedException { + assertEquals(Arrays.asList(0, 1, 2, 3, 4), receive(Integer.class, "sse/integer")); + } + + @Test + public void testBoolean() throws InterruptedException { + assertEquals(Arrays.asList(true, false, true, false, true), receive(Boolean.class, "sse/boolean")); + } + + @Test + public void testLong() throws InterruptedException { + assertEquals(Arrays.asList(0L, 1L, 2L, 3L, 4L), receive(Long.class, "sse/long")); + } + + @Test + public void testString() throws InterruptedException { + assertEquals(Arrays.asList(TEST_MESSAGE + 0, TEST_MESSAGE + 1, TEST_MESSAGE + 2, TEST_MESSAGE + 3, TEST_MESSAGE + 4), + receive(String.class, "sse/string")); + } + + @Test + public void testChar() throws InterruptedException { + assertEquals(Arrays.asList('F', 'R', 'A', 'N', 'K'), + receive(Character.class, "sse/char")); + } + + @Test + public void testJsonObj() throws InterruptedException { + Jsonb jsonb = JsonbBuilder.create(); + assertEquals(Arrays.asList( + new Car(TEST_MESSAGE, "Model 0"), + new Car(TEST_MESSAGE, "Model 1"), + new Car(TEST_MESSAGE, "Model 2"), + new Car(TEST_MESSAGE, "Model 3"), + new Car(TEST_MESSAGE, "Model 4") + ), + receive(String.class, "sse/json-obj") + .stream() + .map(s -> jsonb.fromJson(s, Car.class)) + .collect(Collectors.toList())); + } + + @Test + public void testJson() throws InterruptedException { + Jsonb jsonb = JsonbBuilder.create(); + assertEquals(Arrays.asList( + new Car(TEST_MESSAGE, "Model 0"), + new Car(TEST_MESSAGE, "Model 1"), + new Car(TEST_MESSAGE, "Model 2"), + new Car(TEST_MESSAGE, "Model 3"), + new Car(TEST_MESSAGE, "Model 4") + ), + receive(String.class, "sse/json") + .stream() + .map(s -> jsonb.fromJson(s, Car.class)) + .collect(Collectors.toList())); + } + + private List receive(Class type, String path) throws InterruptedException { + WebTarget sseTarget = target(path); + + ArrayList result = new ArrayList<>(NUMBER_OF_TEST_MESSAGES); + + final CountDownLatch eventLatch = new CountDownLatch(NUMBER_OF_TEST_MESSAGES); + SseEventSource eventSource = SseEventSource.target(sseTarget).build(); + eventSource.register((event) -> { + System.out.println("### Client received: " + event); + result.add(event.readData(type)); + eventLatch.countDown(); + }); + eventSource.open(); + + // client waiting for confirmation that resource method ended. + assertTrue(eventLatch.await(2, TimeUnit.SECONDS)); + return result; + } + + public static class Car { + private String brand; + private String model; + + public Car() { + } + + public Car(final String brand, final String model) { + this.brand = brand; + this.model = model; + } + + public String getBrand() { + return brand; + } + + public void setBrand(final String brand) { + this.brand = brand; + } + + public String getModel() { + return model; + } + + public void setModel(final String model) { + this.model = model; + } + + @Override + public String toString() { + return "Car{brand='" + brand + "', model='" + model + "'}"; + } + + @Override + public boolean equals(final Object o) { + if (this == o) return true; + if (o == null || getClass() != o.getClass()) return false; + Car car = (Car) o; + return Objects.equals(brand, car.brand) + && Objects.equals(model, car.model); + } + + @Override + public int hashCode() { + return Objects.hash(brand, model); + } + } +} diff --git a/tests/integration/reactive-streams/sse/tck-suite.xml b/tests/integration/reactive-streams/sse/tck-suite.xml new file mode 100644 index 0000000000..533b8d052d --- /dev/null +++ b/tests/integration/reactive-streams/sse/tck-suite.xml @@ -0,0 +1,28 @@ + + + + + + + + + + + + \ No newline at end of file