diff --git a/containers/netty-http/pom.xml b/containers/netty-http/pom.xml index 5b8282875ef..f0d8169ad09 100644 --- a/containers/netty-http/pom.xml +++ b/containers/netty-http/pom.xml @@ -42,6 +42,27 @@ jersey-netty-connector ${project.version} + + org.glassfish.jersey.test-framework + jersey-test-framework-core + ${project.version} + test + + + com.google.guava + guava + test + + + org.glassfish.jersey.inject + jersey-hk2 + ${project.version} + + + org.glassfish.jersey.core + jersey-server + ${project.version} + diff --git a/containers/netty-http/src/test/java/org/glassfish/jersey/netty/httpserver/HelloWorldTest.java b/containers/netty-http/src/test/java/org/glassfish/jersey/netty/httpserver/HelloWorldTest.java new file mode 100644 index 00000000000..c3791938156 --- /dev/null +++ b/containers/netty-http/src/test/java/org/glassfish/jersey/netty/httpserver/HelloWorldTest.java @@ -0,0 +1,175 @@ +/* + * Copyright (c) 2016, 2019 Oracle and/or its affiliates. All rights reserved. + * + * This program and the accompanying materials are made available under the + * terms of the Eclipse Public License v. 2.0, which is available at + * http://www.eclipse.org/legal/epl-2.0. + * + * This Source Code may also be made available under the following Secondary + * Licenses when the conditions for such availability set forth in the + * Eclipse Public License v. 2.0 are satisfied: GNU General Public License, + * version 2 with the GNU Classpath Exception, which is available at + * https://www.gnu.org/software/classpath/license.html. + * + * SPDX-License-Identifier: EPL-2.0 OR GPL-2.0 WITH Classpath-exception-2.0 + */ + +package org.glassfish.jersey.netty.httpserver; + +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertTrue; +import java.util.concurrent.CountDownLatch; +import java.util.concurrent.TimeUnit; +import java.util.logging.Logger; +import javax.ws.rs.GET; +import javax.ws.rs.Path; +import javax.ws.rs.Produces; +import javax.ws.rs.client.InvocationCallback; +import javax.ws.rs.core.Application; +import javax.ws.rs.core.MediaType; +import javax.ws.rs.core.Response; +import org.glassfish.jersey.client.ClientConfig; +import org.glassfish.jersey.client.ClientProperties; +import org.glassfish.jersey.logging.LoggingFeature; +import org.glassfish.jersey.netty.connector.NettyConnectorProvider; +import org.glassfish.jersey.server.ResourceConfig; +import org.glassfish.jersey.test.JerseyTest; +import org.junit.Test; + +/** + * @author Pavel Bucek + */ +public class HelloWorldTest extends JerseyTest { + + private static final Logger LOGGER = Logger.getLogger(HelloWorldTest.class.getName()); + private static final String ROOT_PATH = "helloworld"; + + public HelloWorldTest() { + super(new NettyTestContainerFactory()); + } + + @Path("helloworld") + public static class HelloWorldResource { + public static final String CLICHED_MESSAGE = "Hello World!"; + + @GET + @Produces("text/plain") + public String getHello() { + return CLICHED_MESSAGE; + } + } + + @Override + protected Application configure() { + ResourceConfig config = new ResourceConfig(HelloWorldResource.class); + config.register(new LoggingFeature(LOGGER, LoggingFeature.Verbosity.PAYLOAD_ANY)); + return config; + } + + @Override + protected void configureClient(ClientConfig config) { + config.property(ClientProperties.ASYNC_THREADPOOL_SIZE, 20); + config.connectorProvider(new NettyConnectorProvider()); + } + + @Test + public void testConnection() { + Response response = target().path(ROOT_PATH).request("text/plain").get(); + assertEquals(200, response.getStatus()); + } + + @Test + public void testClientStringResponse() { + String s = target().path(ROOT_PATH).request().get(String.class); + assertEquals(HelloWorldResource.CLICHED_MESSAGE, s); + } + + @Test + public void testAsyncClientRequests() throws InterruptedException { + final int REQUESTS = 20; + final CountDownLatch latch = new CountDownLatch(REQUESTS); + final long tic = System.currentTimeMillis(); + for (int i = 0; i < REQUESTS; i++) { + final int id = i; + target().path(ROOT_PATH).request().async().get(new InvocationCallback() { + @Override + public void completed(Response response) { + try { + final String result = response.readEntity(String.class); + assertEquals(HelloWorldResource.CLICHED_MESSAGE, result); + } finally { + latch.countDown(); + } + } + + @Override + public void failed(Throwable error) { + error.printStackTrace(); + latch.countDown(); + } + }); + } + assertTrue(latch.await(10 * getAsyncTimeoutMultiplier(), TimeUnit.SECONDS)); + final long toc = System.currentTimeMillis(); + Logger.getLogger(HelloWorldTest.class.getName()).info("Executed in: " + (toc - tic)); + } + + @Test + public void testHead() { + Response response = target().path(ROOT_PATH).request().head(); + assertEquals(200, response.getStatus()); + assertEquals(MediaType.TEXT_PLAIN_TYPE, response.getMediaType()); + } + + @Test + public void testFooBarOptions() { + Response response = target().path(ROOT_PATH).request().header("Accept", "foo/bar").options(); + assertEquals(200, response.getStatus()); + final String allowHeader = response.getHeaderString("Allow"); + _checkAllowContent(allowHeader); + assertEquals("foo/bar", response.getMediaType().toString()); + assertEquals(0, response.getLength()); + } + + @Test + public void testTextPlainOptions() { + Response response = target().path(ROOT_PATH).request().header("Accept", MediaType.TEXT_PLAIN).options(); + assertEquals(200, response.getStatus()); + final String allowHeader = response.getHeaderString("Allow"); + _checkAllowContent(allowHeader); + assertEquals(MediaType.TEXT_PLAIN_TYPE, response.getMediaType()); + final String responseBody = response.readEntity(String.class); + _checkAllowContent(responseBody); + } + + public void testJson() { + Response response = target().path(ROOT_PATH).request().header("Accept", MediaType.APPLICATION_JSON).options(); + assertEquals(200, response.getStatus()); + final String allowHeader = response.getHeaderString("Allow"); + _checkAllowContent(allowHeader); + assertEquals(MediaType.APPLICATION_JSON, response.getMediaType()); + final String responseBody = response.readEntity(String.class); + _checkAllowContent(responseBody); + + } + + private void _checkAllowContent(final String content) { + assertTrue(content.contains("GET")); + assertTrue(content.contains("HEAD")); + assertTrue(content.contains("OPTIONS")); + } + + @Test + public void testMissingResourceNotFound() { + Response response; + + response = target().path(ROOT_PATH + "arbitrary").request().get(); + assertEquals(404, response.getStatus()); + response.close(); + + response = target().path(ROOT_PATH).path("arbitrary").request().get(); + assertEquals(404, response.getStatus()); + response.close(); + } + +} diff --git a/containers/netty-http/src/test/java/org/glassfish/jersey/netty/httpserver/Helper.java b/containers/netty-http/src/test/java/org/glassfish/jersey/netty/httpserver/Helper.java new file mode 100644 index 00000000000..fa64d2f1a5e --- /dev/null +++ b/containers/netty-http/src/test/java/org/glassfish/jersey/netty/httpserver/Helper.java @@ -0,0 +1,89 @@ +/* + * Copyright (c) 2016, 2019 Oracle and/or its affiliates. All rights reserved. + * + * This program and the accompanying materials are made available under the + * terms of the Eclipse Public License v. 2.0, which is available at + * http://www.eclipse.org/legal/epl-2.0. + * + * This Source Code may also be made available under the following Secondary + * Licenses when the conditions for such availability set forth in the + * Eclipse Public License v. 2.0 are satisfied: GNU General Public License, + * version 2 with the GNU Classpath Exception, which is available at + * https://www.gnu.org/software/classpath/license.html. + * + * SPDX-License-Identifier: EPL-2.0 OR GPL-2.0 WITH Classpath-exception-2.0 + */ + +package org.glassfish.jersey.netty.httpserver; + +import java.io.IOException; +import java.io.InputStream; +import java.io.OutputStream; +import java.lang.annotation.Annotation; +import java.lang.reflect.Type; +import javax.ws.rs.WebApplicationException; +import javax.ws.rs.core.MediaType; +import javax.ws.rs.core.MultivaluedMap; +import javax.ws.rs.ext.MessageBodyWriter; +import org.glassfish.jersey.netty.httpserver.HugeEntityTest.TestEntity; + +public class Helper { + + public static final int ONE_MB_IN_BYTES = 1024 * 1024; // 1M + public static final long TWENTY_GB_IN_BYTES = 20L * 1024L * 1024L * 1024L; // 20G seems sufficient + + public static long drainAndCountInputStream(InputStream in) throws IOException { + long totalBytesRead = 0L; + + byte[] buffer = new byte[ONE_MB_IN_BYTES]; + int read; + do { + read = in.read(buffer); + if (read > 0) { + totalBytesRead += read; + } + } while (read != -1); + + return totalBytesRead; + } + + /** + * Utility writer that generates that many zero bytes as given by the input entity size field. + */ + public static class TestEntityWriter implements MessageBodyWriter { + + @Override + public boolean isWriteable(Class type, Type genericType, Annotation[] annotations, MediaType mediaType) { + return type == TestEntity.class; + } + + @Override + public long getSize(TestEntity t, Class type, Type genericType, Annotation[] annotations, MediaType mediaType) { + return -1; // no matter what we return here, the output will get chunk-encoded + } + + @Override + public void writeTo(TestEntity t, + Class type, + Type genericType, + Annotation[] annotations, + MediaType mediaType, + MultivaluedMap httpHeaders, + OutputStream entityStream) throws IOException, WebApplicationException { + + final byte[] buffer = new byte[Helper.ONE_MB_IN_BYTES]; + final long bufferCount = t.size / Helper.ONE_MB_IN_BYTES; + final int remainder = (int) (t.size % Helper.ONE_MB_IN_BYTES); + + for (long b = 0; b < bufferCount; b++) { + entityStream.write(buffer); + } + + if (remainder > 0) { + entityStream.write(buffer, 0, remainder); + } + } + } + + +} diff --git a/containers/netty-http/src/test/java/org/glassfish/jersey/netty/httpserver/HugeEntityTest.java b/containers/netty-http/src/test/java/org/glassfish/jersey/netty/httpserver/HugeEntityTest.java new file mode 100644 index 00000000000..ede70800898 --- /dev/null +++ b/containers/netty-http/src/test/java/org/glassfish/jersey/netty/httpserver/HugeEntityTest.java @@ -0,0 +1,166 @@ +/* + * Copyright (c) 2016, 2019 Oracle and/or its affiliates. All rights reserved. + * + * This program and the accompanying materials are made available under the + * terms of the Eclipse Public License v. 2.0, which is available at + * http://www.eclipse.org/legal/epl-2.0. + * + * This Source Code may also be made available under the following Secondary + * Licenses when the conditions for such availability set forth in the + * Eclipse Public License v. 2.0 are satisfied: GNU General Public License, + * version 2 with the GNU Classpath Exception, which is available at + * https://www.gnu.org/software/classpath/license.html. + * + * SPDX-License-Identifier: EPL-2.0 OR GPL-2.0 WITH Classpath-exception-2.0 + */ + +package org.glassfish.jersey.netty.httpserver; + +import static org.hamcrest.CoreMatchers.equalTo; +import static org.hamcrest.MatcherAssert.assertThat; +import static org.junit.Assert.assertEquals; +import java.io.ByteArrayInputStream; +import java.io.IOException; +import java.io.InputStream; +import java.util.concurrent.Future; +import javax.ws.rs.POST; +import javax.ws.rs.Path; +import javax.ws.rs.ProcessingException; +import javax.ws.rs.Produces; +import javax.ws.rs.client.Entity; +import javax.ws.rs.core.Application; +import javax.ws.rs.core.MediaType; +import javax.ws.rs.core.Response; +import org.glassfish.jersey.client.ClientConfig; +import org.glassfish.jersey.client.ClientProperties; +import org.glassfish.jersey.client.RequestEntityProcessing; +import org.glassfish.jersey.netty.connector.NettyConnectorProvider; +import org.glassfish.jersey.netty.httpserver.Helper.TestEntityWriter; +import org.glassfish.jersey.server.ResourceConfig; +import org.glassfish.jersey.test.JerseyTest; +import org.junit.Assert; +import org.junit.Test; + +/** + * Test to make sure huge entity gets chunk-encoded. + * + * @author Jakub Podlesak + */ +public class HugeEntityTest extends JerseyTest { + + public HugeEntityTest() { + super(new NettyTestContainerFactory()); + } + /** + * JERSEY-2337 reproducer. The resource is used to check the right amount of data + * is being received from the client and also gives us ability to check we receive + * correct data. + */ + @Path("/") + public static class ConsumerResource { + + /** + * Return back the count of bytes received. + * This way, we should be able to consume a huge amount of data. + */ + @POST + @Path("size") + public String post(InputStream in) throws IOException { + return String.valueOf(Helper.drainAndCountInputStream(in)); + } + + @POST + @Path("echo") + public String echo(String s) { + return s; + } + + @POST + @Path("buffer_overflow_test") + @Produces(MediaType.TEXT_PLAIN) + public Response bufferOverflowTest(InputStream in) throws IOException { + assertEquals(Helper.ONE_MB_IN_BYTES, Helper.drainAndCountInputStream(in)); + return Response.ok(new ByteArrayInputStream(new byte[Helper.ONE_MB_IN_BYTES]), MediaType.TEXT_PLAIN).build(); + } + } + + @Override + protected Application configure() { + return new ResourceConfig(ConsumerResource.class); + } + + @Override + protected void configureClient(ClientConfig config) { + config.register(TestEntityWriter.class); + config.property(ClientProperties.REQUEST_ENTITY_PROCESSING, RequestEntityProcessing.CHUNKED); + config.connectorProvider(new NettyConnectorProvider()); + } + + public static class TestEntity { + + final long size; + + public TestEntity(long size) { + this.size = size; + } + } + /** + * JERSEY-2337 reproducer. We are going to send huge amount of data over the wire. + * Should not the data have been chunk-encoded, we would easily run out of memory. + * + * @throws Exception in case of a test error. + */ + @Test + public void testPost() throws Exception { + Response response = target("/size").request() + .post(Entity.entity(new TestEntity(Helper.TWENTY_GB_IN_BYTES), + MediaType.APPLICATION_OCTET_STREAM_TYPE)); + String content = response.readEntity(String.class); + assertThat(Long.parseLong(content), equalTo(Helper.TWENTY_GB_IN_BYTES)); + + // just to check the right data have been transfered. + response = target("/echo").request().post(Entity.text("Hey Sync!")); + assertThat(response.readEntity(String.class), equalTo("Hey Sync!")); + } + + /** + * Tests buffer overflow - https://github.com/eclipse-ee4j/jersey/issues/3500 + * @throws IOException + * + * @throws Exception + */ + @Test + public void testPostResponse() throws IOException { + Response response = target("/buffer_overflow_test").request().post(Entity + .entity(new TestEntity(Helper.ONE_MB_IN_BYTES), MediaType.APPLICATION_OCTET_STREAM_TYPE)); + try { + InputStream is = response.readEntity(InputStream.class); + assertEquals(Helper.ONE_MB_IN_BYTES, Helper.drainAndCountInputStream(is)); + } catch (ProcessingException pe) { + Assert.fail(); + } + + // just to check the right data have been transfered. + response = target("/echo").request().post(Entity.text("Hey Sync!")); + assertThat(response.readEntity(String.class), equalTo("Hey Sync!")); + } + + /** + * JERSEY-2337 reproducer. We are going to send huge amount of data over the wire. This time in an async fashion. + * Should not the data have been chunk-encoded, we would easily run out of memory. + * + * @throws Exception in case of a test error. + */ + @Test + public void testAsyncPost() throws Exception { + Future response = target("/size").request().async() + .post(Entity.entity(new TestEntity(Helper.TWENTY_GB_IN_BYTES), + MediaType.APPLICATION_OCTET_STREAM_TYPE)); + final String content = response.get().readEntity(String.class); + assertThat(Long.parseLong(content), equalTo(Helper.TWENTY_GB_IN_BYTES)); + + // just to check the right data have been transfered. + response = target("/echo").request().async().post(Entity.text("Hey Async!")); + assertThat(response.get().readEntity(String.class), equalTo("Hey Async!")); + } +} diff --git a/containers/netty-http/src/test/java/org/glassfish/jersey/netty/httpserver/NettyTestContainerFactory.java b/containers/netty-http/src/test/java/org/glassfish/jersey/netty/httpserver/NettyTestContainerFactory.java new file mode 100644 index 00000000000..1b957e2d929 --- /dev/null +++ b/containers/netty-http/src/test/java/org/glassfish/jersey/netty/httpserver/NettyTestContainerFactory.java @@ -0,0 +1,81 @@ +/* + * Copyright (c) 2016, 2019 Oracle and/or its affiliates. All rights reserved. + * + * This program and the accompanying materials are made available under the + * terms of the Eclipse Public License v. 2.0, which is available at + * http://www.eclipse.org/legal/epl-2.0. + * + * This Source Code may also be made available under the following Secondary + * Licenses when the conditions for such availability set forth in the + * Eclipse Public License v. 2.0 are satisfied: GNU General Public License, + * version 2 with the GNU Classpath Exception, which is available at + * https://www.gnu.org/software/classpath/license.html. + * + * SPDX-License-Identifier: EPL-2.0 OR GPL-2.0 WITH Classpath-exception-2.0 + */ + +package org.glassfish.jersey.netty.httpserver; + +import java.net.URI; +import javax.ws.rs.core.UriBuilder; +import org.glassfish.jersey.client.ClientConfig; +import org.glassfish.jersey.test.DeploymentContext; +import org.glassfish.jersey.test.spi.TestContainer; +import org.glassfish.jersey.test.spi.TestContainerFactory; +import io.netty.channel.Channel; + +/** + * Netty test container factory. + * + * @author Pavel Bucek + */ +public class NettyTestContainerFactory implements TestContainerFactory { + + @Override + public TestContainer create(URI baseUri, DeploymentContext deploymentContext) { + return new NettyTestContainer(baseUri, deploymentContext); + } + + /** + * Netty Test Container. + *

+ * All functionality is deferred to {@link NettyHttpContainerProvider}. + */ + private static class NettyTestContainer implements TestContainer { + + private final URI baseUri; + private final DeploymentContext deploymentContext; + + private volatile Channel server; + + NettyTestContainer(URI baseUri, DeploymentContext deploymentContext) { + this.baseUri = UriBuilder.fromUri(baseUri).path(deploymentContext.getContextPath()).build(); + this.deploymentContext = deploymentContext; + } + + @Override + public ClientConfig getClientConfig() { + return null; + } + + @Override + public URI getBaseUri() { + return baseUri; + } + + @Override + public void start() { + server = NettyHttpContainerProvider.createServer(getBaseUri(), deploymentContext.getResourceConfig(), false); + } + + @Override + public void stop() { + try { + server.close().sync(); + } catch (InterruptedException e) { + throw new RuntimeException(e); + } + } + } + +} diff --git a/containers/netty-http/src/test/java/org/glassfish/jersey/netty/httpserver/ParallelTest.java b/containers/netty-http/src/test/java/org/glassfish/jersey/netty/httpserver/ParallelTest.java new file mode 100644 index 00000000000..1fa616d7a85 --- /dev/null +++ b/containers/netty-http/src/test/java/org/glassfish/jersey/netty/httpserver/ParallelTest.java @@ -0,0 +1,164 @@ +/* + * Copyright (c) 2016, 2018 Oracle and/or its affiliates. All rights reserved. + * + * This program and the accompanying materials are made available under the + * terms of the Eclipse Public License v. 2.0, which is available at + * http://www.eclipse.org/legal/epl-2.0. + * + * This Source Code may also be made available under the following Secondary + * Licenses when the conditions for such availability set forth in the + * Eclipse Public License v. 2.0 are satisfied: GNU General Public License, + * version 2 with the GNU Classpath Exception, which is available at + * https://www.gnu.org/software/classpath/license.html. + * + * SPDX-License-Identifier: EPL-2.0 OR GPL-2.0 WITH Classpath-exception-2.0 + */ + +package org.glassfish.jersey.netty.httpserver; + +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertTrue; +import java.io.ByteArrayInputStream; +import java.io.IOException; +import java.io.InputStream; +import java.util.concurrent.BrokenBarrierException; +import java.util.concurrent.CountDownLatch; +import java.util.concurrent.CyclicBarrier; +import java.util.concurrent.Executors; +import java.util.concurrent.ScheduledExecutorService; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.TimeoutException; +import java.util.concurrent.atomic.AtomicInteger; +import java.util.logging.Level; +import java.util.logging.Logger; +import javax.ws.rs.GET; +import javax.ws.rs.POST; +import javax.ws.rs.Path; +import javax.ws.rs.Produces; +import javax.ws.rs.client.Entity; +import javax.ws.rs.client.WebTarget; +import javax.ws.rs.core.Application; +import javax.ws.rs.core.MediaType; +import javax.ws.rs.core.Response; +import org.glassfish.jersey.client.ClientConfig; +import org.glassfish.jersey.client.ClientProperties; +import org.glassfish.jersey.client.RequestEntityProcessing; +import org.glassfish.jersey.netty.connector.NettyConnectorProvider; +import org.glassfish.jersey.netty.httpserver.Helper.TestEntityWriter; +import org.glassfish.jersey.netty.httpserver.HugeEntityTest.TestEntity; +import org.glassfish.jersey.server.ResourceConfig; +import org.glassfish.jersey.test.JerseyTest; +import org.junit.Assert; +import org.junit.Test; + +/** + * Tests the parallel execution of multiple requests. + * + * @author Stepan Kopriva + */ +public class ParallelTest extends JerseyTest { + private static final Logger LOGGER = Logger.getLogger(ParallelTest.class.getName()); + + private static final int PARALLEL_CLIENTS = 10; + private static final String PATH = "test"; + private static final AtomicInteger receivedCounter = new AtomicInteger(0); + private static final AtomicInteger resourceCounter = new AtomicInteger(0); + private static final CyclicBarrier startBarrier = new CyclicBarrier(PARALLEL_CLIENTS + 1); + private static final CountDownLatch doneLatch = new CountDownLatch(PARALLEL_CLIENTS); + + public ParallelTest() { + super(new NettyTestContainerFactory()); + } + + @Path(PATH) + public static class MyResource { + + @GET + public String get() { + sleep(); + resourceCounter.addAndGet(1); + return "GET"; + } + + /** + * If connections are leaked, this test will fail. + * Validates https://github.com/eclipse-ee4j/jersey/issues/3568 + * + * @return + */ + @POST + @Produces(MediaType.TEXT_PLAIN) + public Response post(InputStream in) throws IOException { + sleep(); + resourceCounter.addAndGet(1); + return Response.ok(new ByteArrayInputStream(new byte[Helper.ONE_MB_IN_BYTES]), MediaType.TEXT_PLAIN).build(); + } + + private void sleep() { + try { + Thread.sleep(10); + } catch (InterruptedException ex) { + Logger.getLogger(ParallelTest.class.getName()).log(Level.SEVERE, null, ex); + } + } + } + + @Override + protected Application configure() { + return new ResourceConfig(MyResource.class); + } + + @Override + protected void configureClient(ClientConfig config) { + config.register(TestEntityWriter.class); + config.property(ClientProperties.REQUEST_ENTITY_PROCESSING, RequestEntityProcessing.CHUNKED); + config.connectorProvider(new NettyConnectorProvider()); + } + + @Test + public void testParallel() throws BrokenBarrierException, InterruptedException, TimeoutException, IOException { + final ScheduledExecutorService executor = Executors.newScheduledThreadPool(PARALLEL_CLIENTS); + try { + final WebTarget target = target(); + for (int i = 1; i <= PARALLEL_CLIENTS; i++) { + final int id = i; + executor.submit(new Runnable() { + @Override + public void run() { + try { + startBarrier.await(); + Response response; + response = target.path(PATH).request().post(Entity.entity( + new TestEntity(Helper.ONE_MB_IN_BYTES), MediaType.APPLICATION_OCTET_STREAM_TYPE)); + assertEquals(Helper.ONE_MB_IN_BYTES, + Helper.drainAndCountInputStream(response.readEntity(InputStream.class))); + receivedCounter.incrementAndGet(); + } catch (InterruptedException ex) { + Thread.currentThread().interrupt(); + LOGGER.log(Level.WARNING, "Client thread " + id + " interrupted.", ex); + } catch (BrokenBarrierException ex) { + LOGGER.log(Level.INFO, "Client thread " + id + " failed on broken barrier.", ex); + } catch (Throwable t) { + t.printStackTrace(); + LOGGER.log(Level.WARNING, "Client thread " + id + " failed on unexpected exception.", t); + } finally { + doneLatch.countDown(); + } + } + }); + } + + startBarrier.await(1, TimeUnit.SECONDS); + + assertTrue("Waiting for clients to finish has timed out.", doneLatch.await(5 * getAsyncTimeoutMultiplier(), + TimeUnit.SECONDS)); + + assertEquals("Resource counter", PARALLEL_CLIENTS, resourceCounter.get()); + + assertEquals("Received counter", PARALLEL_CLIENTS, receivedCounter.get()); + } finally { + executor.shutdownNow(); + Assert.assertTrue("Executor termination", executor.awaitTermination(5, TimeUnit.SECONDS)); + } + } +}