Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Fix issues with ChunkedInputStream when using Apache Connector #4338

Merged
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -156,6 +156,15 @@ public final class ApacheClientProperties {
*/
public static final String KEEPALIVE_STRATEGY = "jersey.config.apache.client.keepAliveStrategy";


/**
* Strategy that closes the Apache Connection. Accepts an instance of {@link ApacheConnectionClosingStrategy}.
*
* @see ApacheConnectionClosingStrategy
* @since 2.30
*/
public static final String CONNECTION_CLOSING_STRATEGY = "jersey.config.apache.client.connectionClosingStrategy";

/**
* Get the value of the specified property.
*
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,97 @@
/*
* Copyright (c) 2019 Oracle and/or its affiliates. All rights reserved.
*
* This program and the accompanying materials are made available under the
* terms of the Eclipse Public License v. 2.0, which is available at
* http://www.eclipse.org/legal/epl-2.0.
*
* This Source Code may also be made available under the following Secondary
* Licenses when the conditions for such availability set forth in the
* Eclipse Public License v. 2.0 are satisfied: GNU General Public License,
* version 2 with the GNU Classpath Exception, which is available at
* https://www.gnu.org/software/classpath/license.html.
*
* SPDX-License-Identifier: EPL-2.0 OR GPL-2.0 WITH Classpath-exception-2.0
*/

package org.glassfish.jersey.apache.connector;

import org.apache.http.client.methods.CloseableHttpResponse;
import org.apache.http.client.methods.HttpUriRequest;
import org.glassfish.jersey.client.ClientRequest;

import java.io.IOException;
import java.io.InputStream;

/**
* /**
* Strategy that defines the way the Apache client releases resources. The client enables closing the content stream
* and the response. From the Apache documentation:
* <pre>
* The difference between closing the content stream and closing the response is that
* the former will attempt to keep the underlying connection alive by consuming the
* entity content while the latter immediately shuts down and discards the connection.
* </pre>
* With Apache Client before 4.5.1, it was ok to close the response and the content stream. This is the default for
* Apache Client 4.5 and older.
* <p/>
* For Apache Client 4.5.1+, first the content stream and the response is should be closed.
* <p/>
* In the case of Chunk content stream, the stream is not closed on the server side, and the client can hung on reading
* the closing chunk. Using the {@link org.glassfish.jersey.client.ClientProperties#READ_TIMEOUT} property can prevent
* this hanging forever and the reading of the closing chunk is terminated when the time is out. The other option, when
* the timeout is not set, is to abort the Apache client request. This is the default for Apache Client 4.5.1+ when the
* read timeout is not set.
* <p/>
* Another option is not to close the content stream, which is possible by the Apache client documentation. In this case,
* however, the server side may not be notified and would not not close its chunk stream.
*/
public interface ApacheConnectionClosingStrategy {
/**
* Method to close the connection.
* @param clientRequest The {@link ClientRequest} to get {@link ClientRequest#getConfiguration() configuration},
* and {@link ClientRequest#resolveProperty(String, Class) resolve properties}.
* @param request Apache {@code HttpUriRequest} that can be {@code abort}ed.
* @param response Apache {@code CloseableHttpResponse} that can be {@code close}d.
* @param stream The entity stream that can be {@link InputStream#close() closed}.
* @throws IOException In case of some of the closing methods throws {@link IOException}
*/
void close(ClientRequest clientRequest, HttpUriRequest request, CloseableHttpResponse response, InputStream stream)
throws IOException;

/**
* Strategy that aborts Apache HttpRequests for the case of Chunked Stream, closes the stream, and response next.
*/
class GracefulClosingStrategy implements ApacheConnectionClosingStrategy {
static final GracefulClosingStrategy INSTANCE = new GracefulClosingStrategy();

@Override
public void close(ClientRequest clientRequest, HttpUriRequest request, CloseableHttpResponse response, InputStream stream)
throws IOException {
if (response.getEntity() != null && response.getEntity().isChunked()) {
request.abort();
}
try {
stream.close();
} catch (IOException ex) {
// Ignore
} finally {
response.close();
}
}
}

/**
* Strategy that closes the response and content stream next. This is a behaviour of Jersey 2.28.
*/
class ImmediateClosingStrategy implements ApacheConnectionClosingStrategy {
static final ImmediateClosingStrategy INSTANCE = new ImmediateClosingStrategy();

@Override
public void close(ClientRequest clientRequest, HttpUriRequest request, CloseableHttpResponse response, InputStream stream)
throws IOException {
response.close();
stream.close();
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -26,14 +26,15 @@
import java.io.OutputStream;
import java.net.URI;
import java.util.ArrayList;
import java.util.LinkedList;
import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.Future;
import java.util.concurrent.atomic.AtomicLong;
import java.util.logging.Level;
import java.util.logging.Logger;
import java.util.stream.Collectors;

import javax.ws.rs.ProcessingException;
import javax.ws.rs.client.Client;
Expand Down Expand Up @@ -173,7 +174,6 @@
class ApacheConnector implements Connector {

private static final Logger LOGGER = Logger.getLogger(ApacheConnector.class.getName());

private static final VersionInfo vi;
private static final String release;

Expand Down Expand Up @@ -325,14 +325,16 @@ class ApacheConnector implements Connector {
}
clientBuilder.setDefaultRequestConfig(requestConfig);

Optional<Object> contract = config.getInstances().stream()
.filter(a -> ApacheHttpClientBuilderConfigurator.class.isInstance(a)).findFirst();
LinkedList<Object> contracts = config.getInstances().stream()
.filter(ApacheHttpClientBuilderConfigurator.class::isInstance)
.collect(Collectors.toCollection(LinkedList::new));

final HttpClientBuilder configuredBuilder = contract.isPresent()
? ((ApacheHttpClientBuilderConfigurator) contract.get()).configure(clientBuilder)
: null;
HttpClientBuilder configuredBuilder = clientBuilder;
for (Object configurator : contracts) {
configuredBuilder = ((ApacheHttpClientBuilderConfigurator) configurator).configure(configuredBuilder);
}

this.client = configuredBuilder != null ? configuredBuilder.build() : clientBuilder.build();
this.client = configuredBuilder.build();
}

private HttpClientConnectionManager getConnectionManager(final Client client,
Expand Down Expand Up @@ -515,7 +517,8 @@ public ClientResponse apply(final ClientRequest clientRequest) throws Processing
}

try {
responseContext.setEntityStream(getInputStream(response));
final ConnectionClosingMechanism closingMechanism = new ConnectionClosingMechanism(clientRequest, request);
responseContext.setEntityStream(getInputStream(response, closingMechanism));
} catch (final IOException e) {
LOGGER.log(Level.SEVERE, null, e);
}
Expand Down Expand Up @@ -655,8 +658,8 @@ private static Map<String, String> writeOutBoundHeaders(final ClientRequest clie
return stringHeaders;
}

private static InputStream getInputStream(final CloseableHttpResponse response) throws IOException {

private static InputStream getInputStream(final CloseableHttpResponse response,
final ConnectionClosingMechanism closingMechanism) throws IOException {
final InputStream inputStream;

if (response.getEntity() == null) {
Expand All @@ -670,18 +673,57 @@ private static InputStream getInputStream(final CloseableHttpResponse response)
}
}

return new FilterInputStream(inputStream) {
@Override
public void close() throws IOException {
try {
super.close();
} catch (IOException ex) {
// Ignore
} finally {
response.close();
return closingMechanism.getEntityStream(inputStream, response);
}

/**
* The way the Apache CloseableHttpResponse is to be closed.
* See https://github.com/eclipse-ee4j/jersey/issues/4321
* {@link ApacheClientProperties#CONNECTION_CLOSING_STRATEGY}
*/
private final class ConnectionClosingMechanism {
private ApacheConnectionClosingStrategy connectionClosingStrategy = null;
private final ClientRequest clientRequest;
private final HttpUriRequest apacheRequest;

private ConnectionClosingMechanism(ClientRequest clientRequest, HttpUriRequest apacheRequest) {
this.clientRequest = clientRequest;
this.apacheRequest = apacheRequest;
Object closingStrategyProperty = clientRequest
.resolveProperty(ApacheClientProperties.CONNECTION_CLOSING_STRATEGY, Object.class);
if (closingStrategyProperty != null) {
if (ApacheConnectionClosingStrategy.class.isInstance(closingStrategyProperty)) {
connectionClosingStrategy = (ApacheConnectionClosingStrategy) closingStrategyProperty;
} else {
LOGGER.log(
Level.WARNING,
LocalizationMessages.IGNORING_VALUE_OF_PROPERTY(
ApacheClientProperties.CONNECTION_CLOSING_STRATEGY,
closingStrategyProperty,
ApacheConnectionClosingStrategy.class.getName())
);
}
}
};

if (connectionClosingStrategy == null) {
if (vi.getRelease().compareTo("4.5") > 0) {
connectionClosingStrategy = ApacheConnectionClosingStrategy.GracefulClosingStrategy.INSTANCE;
} else {
connectionClosingStrategy = ApacheConnectionClosingStrategy.ImmediateClosingStrategy.INSTANCE;
}
}
}

private InputStream getEntityStream(final InputStream inputStream,
final CloseableHttpResponse response) {
InputStream filterStream = new FilterInputStream(inputStream) {
@Override
public void close() throws IOException {
connectionClosingStrategy.close(clientRequest, apacheRequest, response, in);
}
};
return filterStream;
}
}

private static class ConnectionFactory extends ManagedHttpClientConnectionFactory {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,10 +18,12 @@

import java.io.IOException;
import java.io.InputStream;
import java.util.concurrent.atomic.AtomicInteger;

import javax.ws.rs.GET;
import javax.ws.rs.Path;
import javax.ws.rs.Produces;
import javax.ws.rs.client.Invocation;
import javax.ws.rs.client.WebTarget;
import javax.ws.rs.core.Application;
import javax.ws.rs.core.MediaType;
Expand Down Expand Up @@ -49,21 +51,13 @@ public class StreamingTest extends JerseyTest {
* Test that a data stream can be terminated from the client side.
*/
@Test
public void clientCloseTest() throws IOException {
// start streaming
InputStream inputStream = target().path("/streamingEndpoint").request()
.property(ClientProperties.READ_TIMEOUT, 1_000).get(InputStream.class);
public void clientCloseNoTimeoutTest() throws IOException {
clientCloseTest(-1);
}

WebTarget sendTarget = target().path("/streamingEndpoint/send");
// trigger sending 'A' to the stream; OK is sent if everything on the server was OK
assertEquals("OK", sendTarget.request().get().readEntity(String.class));
// check 'A' has been sent
assertEquals('A', inputStream.read());
// closing the stream should tear down the connection
inputStream.close();
// trigger sending another 'A' to the stream; it should fail
// (indicating that the streaming has been terminated on the server)
assertEquals("NOK", sendTarget.request().get().readEntity(String.class));
@Test
public void clientCloseWithTimeOutTest() throws IOException {
clientCloseTest(1_000);
}

/**
Expand Down Expand Up @@ -103,6 +97,43 @@ protected Application configure() {
return new ResourceConfig(StreamingEndpoint.class);
}

/**
* Test that a data stream can be terminated from the client side.
*/
private void clientCloseTest(int readTimeout) throws IOException {
// start streaming
AtomicInteger counter = new AtomicInteger(0);
Invocation.Builder builder = target().path("/streamingEndpoint").request();
if (readTimeout > -1) {
counter.set(1);
builder.property(ClientProperties.READ_TIMEOUT, readTimeout);
builder.property(ApacheClientProperties.CONNECTION_CLOSING_STRATEGY,
(ApacheConnectionClosingStrategy) (config, request, response, stream) -> {
try {
stream.close();
} catch (Exception e) {
// timeout, no chunk ending
} finally {
counter.set(0);
response.close();
}
});
}
InputStream inputStream = builder.get(InputStream.class);

WebTarget sendTarget = target().path("/streamingEndpoint/send");
// trigger sending 'A' to the stream; OK is sent if everything on the server was OK
assertEquals("OK", sendTarget.request().get().readEntity(String.class));
// check 'A' has been sent
assertEquals('A', inputStream.read());
// closing the stream should tear down the connection
inputStream.close();
// trigger sending another 'A' to the stream; it should fail
// (indicating that the streaming has been terminated on the server)
assertEquals("NOK", sendTarget.request().get().readEntity(String.class));
assertEquals(0, counter.get());
}

@Singleton
@Path("streamingEndpoint")
public static class StreamingEndpoint {
Expand Down
Loading