From b9772316b8d175a36a479f0003031d28c5309163 Mon Sep 17 00:00:00 2001 From: Jorge Bescos Gascon Date: Thu, 8 Jul 2021 10:48:49 +0200 Subject: [PATCH] Pending requests is accessed concurrently Signed-off-by: Jorge Bescos Gascon --- .../internal/DestinationConnectionPool.java | 5 +- .../jdk/connector/internal/StressTest.java | 84 ++++++++----------- 2 files changed, 35 insertions(+), 54 deletions(-) diff --git a/connectors/jdk-connector/src/main/java/org/glassfish/jersey/jdk/connector/internal/DestinationConnectionPool.java b/connectors/jdk-connector/src/main/java/org/glassfish/jersey/jdk/connector/internal/DestinationConnectionPool.java index 2ef9b9c4dfd..11319253b7e 100644 --- a/connectors/jdk-connector/src/main/java/org/glassfish/jersey/jdk/connector/internal/DestinationConnectionPool.java +++ b/connectors/jdk-connector/src/main/java/org/glassfish/jersey/jdk/connector/internal/DestinationConnectionPool.java @@ -1,5 +1,5 @@ /* - * Copyright (c) 2015, 2019 Oracle and/or its affiliates. All rights reserved. + * Copyright (c) 2015, 2021 Oracle and/or its affiliates. All rights reserved. * * This program and the accompanying materials are made available under the * terms of the Eclipse Public License v. 2.0, which is available at @@ -20,7 +20,6 @@ import java.net.CookieManager; import java.net.URI; import java.util.Collections; -import java.util.HashMap; import java.util.Map; import java.util.Queue; import java.util.Set; @@ -37,7 +36,7 @@ class DestinationConnectionPool { private final Queue idleConnections = new ConcurrentLinkedDeque<>(); private final Set connections = Collections.newSetFromMap(new ConcurrentHashMap<>()); private final Queue pendingRequests = new ConcurrentLinkedDeque<>(); - private final Map requestsInProgress = new HashMap<>(); + private final Map requestsInProgress = new ConcurrentHashMap<>(); private final CookieManager cookieManager; private final ScheduledExecutorService scheduler; private final ConnectionStateListener connectionStateListener; diff --git a/connectors/jdk-connector/src/test/java/org/glassfish/jersey/jdk/connector/internal/StressTest.java b/connectors/jdk-connector/src/test/java/org/glassfish/jersey/jdk/connector/internal/StressTest.java index 40d5bc9fff7..2b6c9d9691a 100644 --- a/connectors/jdk-connector/src/test/java/org/glassfish/jersey/jdk/connector/internal/StressTest.java +++ b/connectors/jdk-connector/src/test/java/org/glassfish/jersey/jdk/connector/internal/StressTest.java @@ -19,10 +19,8 @@ import static org.junit.Assert.assertEquals; import static org.junit.Assert.assertTrue; -import java.net.URI; import java.util.ArrayList; import java.util.List; -import java.util.Optional; import java.util.concurrent.CountDownLatch; import java.util.concurrent.ExecutionException; import java.util.concurrent.ExecutorService; @@ -30,16 +28,14 @@ import java.util.concurrent.Future; import java.util.concurrent.TimeUnit; -import javax.net.ssl.SSLContext; -import javax.net.ssl.SSLParameters; import javax.ws.rs.GET; import javax.ws.rs.Path; import javax.ws.rs.core.Application; import javax.ws.rs.core.Response; -import javax.ws.rs.core.UriBuilder; -import org.glassfish.jersey.SslConfigurator; import org.glassfish.jersey.client.ClientConfig; +import org.glassfish.jersey.internal.guava.ThreadFactoryBuilder; +import org.glassfish.jersey.jdk.connector.JdkConnectorProperties; import org.glassfish.jersey.jdk.connector.JdkConnectorProvider; import org.glassfish.jersey.server.ResourceConfig; import org.glassfish.jersey.test.JerseyTest; @@ -48,44 +44,25 @@ public class StressTest extends JerseyTest { - private static final int SSL_PORT = 29999; - private static final int ITERATIONS = 10000; - // Must be less than the server thread pool. Is there any way to get that value?. - private static final int N_REQUESTS = 10; - private static final SSLContext SSL_CONTEXT; - private static final ExecutorService executor = Executors.newFixedThreadPool(N_REQUESTS); + private static final int PARALLELISM = 10; + private static final int IDLE_TIMEOUT = 50; + private static final int ITERATIONS = 1000; private static CountDownLatch requests; private static CountDownLatch latch; - static { - System.setProperty("javax.net.ssl.keyStore", SslFilterTest.class.getResource("/keystore_server").getPath()); - System.setProperty("javax.net.ssl.keyStorePassword", "asdfgh"); - System.setProperty("javax.net.ssl.trustStore", SslFilterTest.class.getResource("/truststore_server").getPath()); - System.setProperty("javax.net.ssl.trustStorePassword", "asdfgh"); - try { - SSL_CONTEXT = SslConfigurator.newInstance() - .trustStoreFile(SslFilterTest.class.getResource("/truststore_server").getPath()) - .trustStorePassword("asdfgh") - .keyStoreFile(SslFilterTest.class.getResource("/keystore_server").getPath()) - .keyStorePassword("asdfgh").createSSLContext(); - } catch (Exception e) { - e.printStackTrace(); - throw new IllegalStateException(e); - } - } - @Path("/test") public static class TestResource { @GET @Path("/1") public String test1() throws InterruptedException { requests.countDown(); - if (latch.await(100, TimeUnit.SECONDS)) { + if (latch.await(10, TimeUnit.SECONDS)) { return "test1"; } else { throw new IllegalStateException("Timeout"); } } + @GET @Path("/2") public String test2() { @@ -102,49 +79,41 @@ protected Application configure() { @Override protected void configureClient(ClientConfig config) { + config.property(JdkConnectorProperties.MAX_CONNECTIONS_PER_DESTINATION, PARALLELISM); + config.property(JdkConnectorProperties.CONNECTION_IDLE_TIMEOUT, IDLE_TIMEOUT); config.connectorProvider(new JdkConnectorProvider()); } - @Override - protected Optional getSslContext() { - return Optional.of(SSL_CONTEXT); - } - - @Override - protected Optional getSslParameters() { - return Optional.of(SSL_CONTEXT.createSSLEngine("localhost", SSL_PORT).getSSLParameters()); - } - - @Override - protected URI getBaseUri() { - return UriBuilder.fromUri("https://localhost/").port(SSL_PORT).build(); - } - @Test public void hangAllRequestsStatus200() throws InterruptedException, ExecutionException { - assertEquals("https", getBaseUri().getScheme()); + ExecutorService executor = Executors.newFixedThreadPool(PARALLELISM, + new ThreadFactoryBuilder().setNameFormat("client-%d").build()); for (int i = 0; i < ITERATIONS; i++) { - requests = new CountDownLatch(N_REQUESTS); + requests = new CountDownLatch(PARALLELISM); latch = new CountDownLatch(1); List> responses = new ArrayList<>(); - for (int j = 0; j < N_REQUESTS; j++) { + for (int j = 0; j < PARALLELISM; j++) { Future future = executor.submit(() -> target("/test/1").request().get()); responses.add(future); } - assertTrue(requests.await(100, TimeUnit.SECONDS)); + assertTrue(requests.await(20, TimeUnit.SECONDS)); latch.countDown(); for (Future response : responses) { assertEquals(200, response.get().getStatus()); } } + executor.shutdown(); + assertTrue(executor.awaitTermination(10, TimeUnit.SECONDS)); } @Test public void randomnessStatus200() throws InterruptedException, ExecutionException { - assertEquals("https", getBaseUri().getScheme()); + ExecutorService executor = Executors.newFixedThreadPool(PARALLELISM, + new ThreadFactoryBuilder().setNameFormat("client-%d").build()); for (int i = 0; i < ITERATIONS; i++) { + System.out.println("Iteration " + i); List> responses = new ArrayList<>(); - for (int j = 0; j < N_REQUESTS; j++) { + for (int j = 0; j < 100; j++) { Future future = executor.submit(() -> target("/test/2").request().get()); responses.add(future); } @@ -152,6 +121,19 @@ public void randomnessStatus200() throws InterruptedException, ExecutionExceptio assertEquals(200, response.get().getStatus()); } } + executor.shutdown(); + assertTrue(executor.awaitTermination(10, TimeUnit.SECONDS)); } + @Test + public void syncTest() { + Response response = target("/test/2").request().get(); + assertEquals(200, response.getStatus()); + } + + @Test + public void asyncTest() throws InterruptedException, ExecutionException { + Future response = target("/test/2").request().async().get(); + assertEquals(200, response.get().getStatus()); + } }