diff --git a/dependencies/pom.xml b/dependencies/pom.xml index eed10ed1b69..fe18857c4b3 100644 --- a/dependencies/pom.xml +++ b/dependencies/pom.xml @@ -73,7 +73,7 @@ 7.6.0.Final 1.0.0.Final 3.1.0 - 2.31 + 2.32 1.0.2 1.1.6 1.1.6 diff --git a/microprofile/server/src/test/java/io/helidon/microprofile/server/ServerSseTest.java b/microprofile/server/src/test/java/io/helidon/microprofile/server/ServerSseTest.java index e26740e13b7..76a44189da5 100644 --- a/microprofile/server/src/test/java/io/helidon/microprofile/server/ServerSseTest.java +++ b/microprofile/server/src/test/java/io/helidon/microprofile/server/ServerSseTest.java @@ -1,5 +1,5 @@ /* - * Copyright (c) 2019 Oracle and/or its affiliates. All rights reserved. + * Copyright (c) 2019, 2020 Oracle and/or its affiliates. All rights reserved. * * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. @@ -18,6 +18,9 @@ import java.util.Set; import java.util.concurrent.CompletableFuture; +import java.util.concurrent.CountDownLatch; +import java.util.concurrent.Executors; +import java.util.concurrent.Flow; import java.util.concurrent.TimeUnit; import javax.ws.rs.GET; @@ -33,10 +36,13 @@ import javax.ws.rs.sse.SseEventSink; import javax.ws.rs.sse.SseEventSource; +import io.helidon.common.reactive.Multi; + import org.junit.jupiter.api.AfterAll; import org.junit.jupiter.api.BeforeAll; import org.junit.jupiter.api.Test; +import static org.hamcrest.MatcherAssert.assertThat; import static org.junit.jupiter.api.Assertions.fail; /** @@ -45,7 +51,8 @@ class ServerSseTest { private static Client client; - private CompletableFuture connClosedFuture = new CompletableFuture<>(); + private final CompletableFuture connClosedFuture = new CompletableFuture<>(); + private final CompletableFuture multiTestFuture = new CompletableFuture<>(); @BeforeAll static void initClass() { @@ -59,6 +66,15 @@ static void destroyClass() { @Test void testSse() throws Exception { + innerTest("test1", connClosedFuture); + } + + @Test + void testSseMulti() throws Exception { + innerTest("test2", multiTestFuture); + } + + private void innerTest(String endpoint, CompletableFuture future) throws InterruptedException { Server server = Server.builder() .addApplication("/", new TestApplication1()) .build(); @@ -66,18 +82,22 @@ void testSse() throws Exception { try { // Set up SSE event source - WebTarget target = client.target("http://localhost:" + server.port()).path("test1").path("sse"); + WebTarget target = client.target("http://localhost:" + server.port()).path(endpoint).path("sse"); SseEventSource sseEventSource = SseEventSource.target(target).build(); - sseEventSource.register(event -> System.out.println(event.readData(String.class))); + CountDownLatch count = new CountDownLatch(4); + sseEventSource.register(event -> { + event.readData(String.class); + count.countDown(); + }); // Open SSE source for a few millis and then close it sseEventSource.open(); - Thread.sleep(200); + assertThat("Await method should have not timeout", count.await(250, TimeUnit.MILLISECONDS)); sseEventSource.close(); // Wait for server to detect connection closed try { - connClosedFuture.get(2000, TimeUnit.MILLISECONDS); + future.get(2000, TimeUnit.MILLISECONDS); } catch (Exception e) { fail("Closing of SSE connection not detected!"); } @@ -89,7 +109,7 @@ void testSse() throws Exception { private final class TestApplication1 extends Application { @Override public Set getSingletons() { - return Set.of(new TestResource1()); + return Set.of(new TestResource1(), new TestResource2()); } } @@ -100,18 +120,34 @@ public final class TestResource1 { @Produces(MediaType.SERVER_SENT_EVENTS) public void listenToEvents(@Context SseEventSink eventSink, @Context Sse sse) { while (true) { - eventSink.send(sse.newEvent("hello")).thenAccept(t -> { - if (t != null) { - System.out.println(t); - connClosedFuture.complete(null); - } - }); try { - Thread.sleep(50); + eventSink.send(sse.newEvent("hello")).thenAccept(t -> { + if (t != null) { + System.out.println(t); + connClosedFuture.complete(null); + } + }); + TimeUnit.MILLISECONDS.sleep(5); } catch (InterruptedException e) { // falls through + } catch (IllegalStateException e) { + //https://github.com/oracle/helidon/issues/1290 + connClosedFuture.complete(null); } } } } + + @Path("/test2") + public final class TestResource2 { + @GET + @Path("sse") + @Produces(MediaType.SERVER_SENT_EVENTS) + public void listenToEvents(@Context Flow.Subscriber sub, @Context Sse sse) { + Multi.interval(5, TimeUnit.MILLISECONDS, Executors.newScheduledThreadPool(1)) + .map(String::valueOf) + .onCancel(() -> multiTestFuture.complete(null)) + .subscribe(sub); + } + } }