Skip to content

Commit

Permalink
Ensure BufferedResponse type is not caching un-released buffer (Azure…
Browse files Browse the repository at this point in the history
…#7837)

* Making BufferedResponse to do deep caching (it was doing shallow caching)

* Make BufferedResponse really buffer and updating decoder to not to use BufferedResponse

* Update sdk/core/azure-core/src/main/java/com/azure/core/implementation/http/BufferedHttpResponse.java

Co-Authored-By: Jonathan Giles <jonathan@jonathangiles.net>

* Update sdk/core/azure-core/src/main/java/com/azure/core/implementation/http/BufferedHttpResponse.java

Co-Authored-By: Jonathan Giles <jonathan@jonathangiles.net>

Co-authored-by: Jonathan Giles <jonathan@jonathangiles.net>
  • Loading branch information
2 people authored and srnagar committed Feb 4, 2020
1 parent 58c75b8 commit c02098b
Show file tree
Hide file tree
Showing 6 changed files with 131 additions and 19 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -390,7 +390,7 @@ private Mono<HttpDecodedResponse> ensureExpectedStatus(final HttpDecodedResponse
//
asyncResult = bodyAsString.flatMap((Function<String, Mono<HttpDecodedResponse>>) responseContent -> {
// bodyAsString() emits non-empty string, now look for decoded version of same string
Mono<Object> decodedErrorBody = decodedResponse.getDecodedBody();
Mono<Object> decodedErrorBody = decodedResponse.getDecodedBody(responseContent);
//
return decodedErrorBody
.flatMap((Function<Object, Mono<HttpDecodedResponse>>) responseDecodedErrorObject -> {
Expand Down Expand Up @@ -506,7 +506,7 @@ private Mono<?> handleBodyReturnType(final HttpDecodedResponse response,
asyncResult = Mono.just(response.getSourceResponse().getBody());
} else {
// Mono<Object> or Mono<Page<T>>
asyncResult = response.getDecodedBody();
asyncResult = response.getDecodedBody(null);
}
return asyncResult;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,8 @@
* REST response with a streaming content.
*/
public final class StreamResponse extends SimpleResponse<Flux<ByteBuffer>> implements Closeable {
private volatile boolean consumed;

/**
* Creates a {@link StreamResponse}.
*
Expand All @@ -32,18 +34,19 @@ public StreamResponse(HttpRequest request, int statusCode, HttpHeaders headers,
*/
@Override
public Flux<ByteBuffer> getValue() {
return super.getValue();
return super.getValue().doFinally(t -> this.consumed = true);
}

/**
* Disposes the connection associated with this {@link StreamResponse}.
*/
@Override
public void close() {
final Flux<ByteBuffer> value = getValue();

if (value != null) {
value.subscribe().dispose();
if (this.consumed) {
return;
}
this.consumed = true;
final Flux<ByteBuffer> value = getValue();
value.subscribe().dispose();
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -28,7 +28,10 @@ public final class BufferedHttpResponse extends HttpResponse {
public BufferedHttpResponse(HttpResponse innerHttpResponse) {
super(innerHttpResponse.getRequest());
this.innerHttpResponse = innerHttpResponse;
this.cachedBody = innerHttpResponse.getBody().cache();
this.cachedBody = FluxUtil.collectBytesInByteBufferStream(innerHttpResponse.getBody())
.map(ByteBuffer::wrap)
.flux()
.cache();
}

@Override
Expand All @@ -53,7 +56,7 @@ public Flux<ByteBuffer> getBody() {

@Override
public Mono<byte[]> getBodyAsByteArray() {
return FluxUtil.collectBytesInByteBufferStream(cachedBody.map(ByteBuffer::duplicate));
return cachedBody.next().map(ByteBuffer::array);
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -41,20 +41,25 @@ final class HttpResponseBodyDecoder {
* The content reading and decoding happens when caller subscribe to the returned {@code Mono<Object>},
* if the response body is not decodable then {@code Mono.empty()} will be returned.
*
* @param body the response body to decode, null for this parameter
* indicate read body from {@code httpResponse} parameter and decode it.
* @param httpResponse the response containing the body to be decoded
* @param serializer the adapter to use for decoding
* @param decodeData the necessary data required to decode a Http response
* @return publisher that emits decoded response body upon subscription if body is decodable,
* no emission if the body is not-decodable
*/
static Mono<Object> decode(HttpResponse httpResponse, SerializerAdapter serializer,
static Mono<Object> decode(String body, HttpResponse httpResponse, SerializerAdapter serializer,
HttpResponseDecodeData decodeData) {
ensureRequestSet(httpResponse);
final ClientLogger logger = new ClientLogger(HttpResponseBodyDecoder.class);
//
return Mono.defer(() -> {
if (isErrorStatus(httpResponse, decodeData)) {
return httpResponse.getBodyAsString()
Mono<String> bodyMono = body == null
? httpResponse.getBodyAsString()
: Mono.just(body);
return bodyMono
.flatMap(bodyString -> {
try {
final Object decodedErrorEntity = deserializeBody(bodyString,
Expand All @@ -74,7 +79,10 @@ static Mono<Object> decode(HttpResponse httpResponse, SerializerAdapter serializ
} else if (!isReturnTypeDecodable(decodeData)) {
return Mono.empty();
} else {
return httpResponse.getBodyAsString()
Mono<String> bodyMono = body == null
? httpResponse.getBodyAsString()
: Mono.just(body);
return bodyMono
.flatMap(bodyString -> {
try {
final Object decodedSuccessEntity = deserializeBody(bodyString,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -60,11 +60,7 @@ public static final class HttpDecodedResponse implements Closeable {
*/
HttpDecodedResponse(final HttpResponse response, SerializerAdapter serializer,
HttpResponseDecodeData decodeData) {
if (HttpResponseBodyDecoder.isDecodable(response, decodeData)) {
this.response = response.buffer();
} else {
this.response = response;
}
this.response = response;
this.serializer = serializer;
this.decodeData = decodeData;
}
Expand All @@ -81,11 +77,15 @@ public HttpResponse getSourceResponse() {
* and emitted. {@code Mono.empty()} gets emitted if the content is not
* decodable.
*
* @param body the response body to decode, null for this parameter
* indicate read body from source response and decode it.
*
* @return publisher that emits decoded http content
*/
public Mono<Object> getDecodedBody() {
public Mono<Object> getDecodedBody(String body) {
if (this.bodyCached == null) {
this.bodyCached = HttpResponseBodyDecoder.decode(this.response,
this.bodyCached = HttpResponseBodyDecoder.decode(body,
this.response,
this.serializer,
this.decodeData).cache();
}
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,98 @@
// Copyright (c) Microsoft Corporation. All rights reserved.
// Licensed under the MIT License.

package com.azure.core.implementation.http;

import com.azure.core.http.HttpHeaders;
import com.azure.core.http.HttpMethod;
import com.azure.core.http.HttpRequest;
import com.azure.core.http.HttpResponse;
import org.junit.jupiter.api.Test;
import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;
import reactor.test.StepVerifier;
import reactor.util.function.Tuple2;

import java.nio.ByteBuffer;
import java.nio.charset.Charset;

import static org.junit.jupiter.api.Assertions.assertFalse;

public class HttpResponseTests {
@Test
public void testBufferedResponseSubscribeOnceAndDoDeepCopy() {
// A source Response that throws if body is subscribed more than once.
SelfDisposedHttpResponse sourceHttpResponse = new SelfDisposedHttpResponse();
// A Buffered response based on source response.
Flux<ByteBuffer> bufferedContentFlux = sourceHttpResponse.buffer().getBody();
Flux<Tuple2<ByteBuffer, ByteBuffer>> zipped
= bufferedContentFlux.zipWith(sourceHttpResponse.getInnerContentFlux());
// Validate that buffered Response is not replaying source Response body.
StepVerifier.create(zipped)
.thenConsumeWhile(o -> {
assertFalse(o.getT1() == o.getT2(),
"Buffered response should not cache shallow copy of source.");
return true;
})
.verifyComplete();
}

// A Type to mimic Response with body content released/disposed as it consumed
private static class SelfDisposedHttpResponse extends HttpResponse {
private final Mono<ByteBuffer> contentMono;
private final HttpHeaders headers = new HttpHeaders();
private volatile boolean consumed;

protected SelfDisposedHttpResponse() {
super(new HttpRequest(HttpMethod.GET, "http://localhost"));
this.contentMono = Mono.just(ByteBuffer.wrap("long_long_content".getBytes()));
}

Flux<ByteBuffer> getInnerContentFlux() {
return this.contentMono.flux();
}

@Override
public int getStatusCode() {
return 200;
}

@Override
public String getHeaderValue(String name) {
return null;
}

@Override
public HttpHeaders getHeaders() {
return headers;
}

@Override
public Flux<ByteBuffer> getBody() {
return this.contentMono
.doOnNext(bb -> {
// This ensure BufferedHttpResponse subscribes only once.
assertFalse(consumed, "content is already consumed");
consumed = true;
})
.flux();
}

@Override
public Mono<byte[]> getBodyAsByteArray() {
return this.getBody()
.map(bb -> new byte[bb.remaining()])
.next();
}

@Override
public Mono<String> getBodyAsString() {
throw new RuntimeException("Not implemented");
}

@Override
public Mono<String> getBodyAsString(Charset charset) {
throw new RuntimeException("Not implemented");
}
}
}

0 comments on commit c02098b

Please sign in to comment.