Skip to content

Commit

Permalink
Merge pull request quarkusio#29623 from mailq/main
Browse files Browse the repository at this point in the history
OutboundSseEvent is not correctly serialized
  • Loading branch information
geoand authored Dec 3, 2022
2 parents b82d50f + dc5f031 commit 1a5dd2a
Show file tree
Hide file tree
Showing 3 changed files with 47 additions and 1 deletion.
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,10 @@
import javax.ws.rs.GET;
import javax.ws.rs.Path;
import javax.ws.rs.Produces;
import javax.ws.rs.core.Context;
import javax.ws.rs.core.MediaType;
import javax.ws.rs.sse.OutboundSseEvent;
import javax.ws.rs.sse.Sse;

import org.jboss.resteasy.reactive.common.util.MultiCollectors;
import org.reactivestreams.Publisher;
Expand Down Expand Up @@ -152,4 +155,13 @@ public Multi<String> sse() {
public Multi<String> sseThrows() {
throw new IllegalStateException("STOP");
}

@Path("sse/raw")
@GET
@Produces(MediaType.SERVER_SENT_EVENTS)
public Multi<OutboundSseEvent> sseRaw(@Context Sse sse) {
return Multi.createFrom().items(sse.newEventBuilder().id("one").data("uno").name("eins").build(),
sse.newEventBuilder().id("two").data("dos").name("zwei").build(),
sse.newEventBuilder().id("three").data("tres").name("drei").build());
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -237,4 +237,32 @@ public void testSseThrows() throws InterruptedException {
Assertions.assertEquals(1, errors.size());
}
}

@Test
public void testSseForMultiWithOutboundSseEvent() throws InterruptedException {
Client client = ClientBuilder.newBuilder().build();
WebTarget target = client.target(this.uri.toString() + "stream/sse/raw");
try (SseEventSource sse = SseEventSource.target(target).build()) {
CountDownLatch latch = new CountDownLatch(1);
List<Throwable> errors = new CopyOnWriteArrayList<>();
List<String> results = new CopyOnWriteArrayList<>();
List<String> ids = new CopyOnWriteArrayList<>();
List<String> names = new CopyOnWriteArrayList<>();
sse.register(event -> {
results.add(event.readData());
ids.add(event.getId());
names.add(event.getName());
}, error -> {
errors.add(error);
}, () -> {
latch.countDown();
});
sse.open();
Assertions.assertTrue(latch.await(20, TimeUnit.SECONDS));
Assertions.assertEquals(Arrays.asList("uno", "dos", "tres"), results);
Assertions.assertEquals(Arrays.asList("one", "two", "three"), ids);
Assertions.assertEquals(Arrays.asList("eins", "zwei", "drei"), names);
Assertions.assertEquals(0, errors.size());
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@
import java.util.function.Consumer;

import javax.ws.rs.core.MediaType;
import javax.ws.rs.sse.OutboundSseEvent;

import org.jboss.logging.Logger;
import org.jboss.resteasy.reactive.common.util.RestMediaType;
Expand Down Expand Up @@ -49,7 +50,12 @@ private static class SseMultiSubscriber extends AbstractMultiSubscriber {

@Override
public void onNext(Object item) {
OutboundSseEventImpl event = new OutboundSseEventImpl.BuilderImpl().data(item).build();
OutboundSseEvent event;
if (item instanceof OutboundSseEvent) {
event = (OutboundSseEvent) item;
} else {
event = new OutboundSseEventImpl.BuilderImpl().data(item).build();
}
SseUtil.send(requestContext, event, customizers).whenComplete(new BiConsumer<Object, Throwable>() {
@Override
public void accept(Object v, Throwable t) {
Expand Down

0 comments on commit 1a5dd2a

Please sign in to comment.