diff --git a/sdk/core/azure-core/src/main/java/com/azure/core/http/rest/RestProxy.java b/sdk/core/azure-core/src/main/java/com/azure/core/http/rest/RestProxy.java index a23a4bc1cf9a..3fad2a3f49d2 100644 --- a/sdk/core/azure-core/src/main/java/com/azure/core/http/rest/RestProxy.java +++ b/sdk/core/azure-core/src/main/java/com/azure/core/http/rest/RestProxy.java @@ -390,7 +390,7 @@ private Mono ensureExpectedStatus(final HttpDecodedResponse // asyncResult = bodyAsString.flatMap((Function>) responseContent -> { // bodyAsString() emits non-empty string, now look for decoded version of same string - Mono decodedErrorBody = decodedResponse.getDecodedBody(); + Mono decodedErrorBody = decodedResponse.getDecodedBody(responseContent); // return decodedErrorBody .flatMap((Function>) responseDecodedErrorObject -> { @@ -506,7 +506,7 @@ private Mono handleBodyReturnType(final HttpDecodedResponse response, asyncResult = Mono.just(response.getSourceResponse().getBody()); } else { // Mono or Mono> - asyncResult = response.getDecodedBody(); + asyncResult = response.getDecodedBody(null); } return asyncResult; } diff --git a/sdk/core/azure-core/src/main/java/com/azure/core/http/rest/StreamResponse.java b/sdk/core/azure-core/src/main/java/com/azure/core/http/rest/StreamResponse.java index bd55b51eda12..8da0b68c1321 100644 --- a/sdk/core/azure-core/src/main/java/com/azure/core/http/rest/StreamResponse.java +++ b/sdk/core/azure-core/src/main/java/com/azure/core/http/rest/StreamResponse.java @@ -13,6 +13,8 @@ * REST response with a streaming content. */ public final class StreamResponse extends SimpleResponse> implements Closeable { + private volatile boolean consumed; + /** * Creates a {@link StreamResponse}. * @@ -32,7 +34,7 @@ public StreamResponse(HttpRequest request, int statusCode, HttpHeaders headers, */ @Override public Flux getValue() { - return super.getValue(); + return super.getValue().doFinally(t -> this.consumed = true); } /** @@ -40,10 +42,11 @@ public Flux getValue() { */ @Override public void close() { - final Flux value = getValue(); - - if (value != null) { - value.subscribe().dispose(); + if (this.consumed) { + return; } + this.consumed = true; + final Flux value = getValue(); + value.subscribe().dispose(); } } diff --git a/sdk/core/azure-core/src/main/java/com/azure/core/implementation/http/BufferedHttpResponse.java b/sdk/core/azure-core/src/main/java/com/azure/core/implementation/http/BufferedHttpResponse.java index 532f5a9e274e..8e7fd42f9547 100644 --- a/sdk/core/azure-core/src/main/java/com/azure/core/implementation/http/BufferedHttpResponse.java +++ b/sdk/core/azure-core/src/main/java/com/azure/core/implementation/http/BufferedHttpResponse.java @@ -28,7 +28,10 @@ public final class BufferedHttpResponse extends HttpResponse { public BufferedHttpResponse(HttpResponse innerHttpResponse) { super(innerHttpResponse.getRequest()); this.innerHttpResponse = innerHttpResponse; - this.cachedBody = innerHttpResponse.getBody().cache(); + this.cachedBody = FluxUtil.collectBytesInByteBufferStream(innerHttpResponse.getBody()) + .map(ByteBuffer::wrap) + .flux() + .cache(); } @Override @@ -53,7 +56,7 @@ public Flux getBody() { @Override public Mono getBodyAsByteArray() { - return FluxUtil.collectBytesInByteBufferStream(cachedBody.map(ByteBuffer::duplicate)); + return cachedBody.next().map(ByteBuffer::array); } @Override diff --git a/sdk/core/azure-core/src/main/java/com/azure/core/implementation/serializer/HttpResponseBodyDecoder.java b/sdk/core/azure-core/src/main/java/com/azure/core/implementation/serializer/HttpResponseBodyDecoder.java index 9381bbfe38a9..036e0618dc28 100644 --- a/sdk/core/azure-core/src/main/java/com/azure/core/implementation/serializer/HttpResponseBodyDecoder.java +++ b/sdk/core/azure-core/src/main/java/com/azure/core/implementation/serializer/HttpResponseBodyDecoder.java @@ -41,20 +41,25 @@ final class HttpResponseBodyDecoder { * The content reading and decoding happens when caller subscribe to the returned {@code Mono}, * if the response body is not decodable then {@code Mono.empty()} will be returned. * + * @param body the response body to decode, null for this parameter + * indicate read body from {@code httpResponse} parameter and decode it. * @param httpResponse the response containing the body to be decoded * @param serializer the adapter to use for decoding * @param decodeData the necessary data required to decode a Http response * @return publisher that emits decoded response body upon subscription if body is decodable, * no emission if the body is not-decodable */ - static Mono decode(HttpResponse httpResponse, SerializerAdapter serializer, + static Mono decode(String body, HttpResponse httpResponse, SerializerAdapter serializer, HttpResponseDecodeData decodeData) { ensureRequestSet(httpResponse); final ClientLogger logger = new ClientLogger(HttpResponseBodyDecoder.class); // return Mono.defer(() -> { if (isErrorStatus(httpResponse, decodeData)) { - return httpResponse.getBodyAsString() + Mono bodyMono = body == null + ? httpResponse.getBodyAsString() + : Mono.just(body); + return bodyMono .flatMap(bodyString -> { try { final Object decodedErrorEntity = deserializeBody(bodyString, @@ -74,7 +79,10 @@ static Mono decode(HttpResponse httpResponse, SerializerAdapter serializ } else if (!isReturnTypeDecodable(decodeData)) { return Mono.empty(); } else { - return httpResponse.getBodyAsString() + Mono bodyMono = body == null + ? httpResponse.getBodyAsString() + : Mono.just(body); + return bodyMono .flatMap(bodyString -> { try { final Object decodedSuccessEntity = deserializeBody(bodyString, diff --git a/sdk/core/azure-core/src/main/java/com/azure/core/implementation/serializer/HttpResponseDecoder.java b/sdk/core/azure-core/src/main/java/com/azure/core/implementation/serializer/HttpResponseDecoder.java index a735a53fcefd..d3c82090b4df 100644 --- a/sdk/core/azure-core/src/main/java/com/azure/core/implementation/serializer/HttpResponseDecoder.java +++ b/sdk/core/azure-core/src/main/java/com/azure/core/implementation/serializer/HttpResponseDecoder.java @@ -60,11 +60,7 @@ public static final class HttpDecodedResponse implements Closeable { */ HttpDecodedResponse(final HttpResponse response, SerializerAdapter serializer, HttpResponseDecodeData decodeData) { - if (HttpResponseBodyDecoder.isDecodable(response, decodeData)) { - this.response = response.buffer(); - } else { - this.response = response; - } + this.response = response; this.serializer = serializer; this.decodeData = decodeData; } @@ -81,11 +77,15 @@ public HttpResponse getSourceResponse() { * and emitted. {@code Mono.empty()} gets emitted if the content is not * decodable. * + * @param body the response body to decode, null for this parameter + * indicate read body from source response and decode it. + * * @return publisher that emits decoded http content */ - public Mono getDecodedBody() { + public Mono getDecodedBody(String body) { if (this.bodyCached == null) { - this.bodyCached = HttpResponseBodyDecoder.decode(this.response, + this.bodyCached = HttpResponseBodyDecoder.decode(body, + this.response, this.serializer, this.decodeData).cache(); } diff --git a/sdk/core/azure-core/src/test/java/com/azure/core/implementation/http/HttpResponseTests.java b/sdk/core/azure-core/src/test/java/com/azure/core/implementation/http/HttpResponseTests.java new file mode 100644 index 000000000000..9e2682352dd5 --- /dev/null +++ b/sdk/core/azure-core/src/test/java/com/azure/core/implementation/http/HttpResponseTests.java @@ -0,0 +1,98 @@ +// Copyright (c) Microsoft Corporation. All rights reserved. +// Licensed under the MIT License. + +package com.azure.core.implementation.http; + +import com.azure.core.http.HttpHeaders; +import com.azure.core.http.HttpMethod; +import com.azure.core.http.HttpRequest; +import com.azure.core.http.HttpResponse; +import org.junit.jupiter.api.Test; +import reactor.core.publisher.Flux; +import reactor.core.publisher.Mono; +import reactor.test.StepVerifier; +import reactor.util.function.Tuple2; + +import java.nio.ByteBuffer; +import java.nio.charset.Charset; + +import static org.junit.jupiter.api.Assertions.assertFalse; + +public class HttpResponseTests { + @Test + public void testBufferedResponseSubscribeOnceAndDoDeepCopy() { + // A source Response that throws if body is subscribed more than once. + SelfDisposedHttpResponse sourceHttpResponse = new SelfDisposedHttpResponse(); + // A Buffered response based on source response. + Flux bufferedContentFlux = sourceHttpResponse.buffer().getBody(); + Flux> zipped + = bufferedContentFlux.zipWith(sourceHttpResponse.getInnerContentFlux()); + // Validate that buffered Response is not replaying source Response body. + StepVerifier.create(zipped) + .thenConsumeWhile(o -> { + assertFalse(o.getT1() == o.getT2(), + "Buffered response should not cache shallow copy of source."); + return true; + }) + .verifyComplete(); + } + + // A Type to mimic Response with body content released/disposed as it consumed + private static class SelfDisposedHttpResponse extends HttpResponse { + private final Mono contentMono; + private final HttpHeaders headers = new HttpHeaders(); + private volatile boolean consumed; + + protected SelfDisposedHttpResponse() { + super(new HttpRequest(HttpMethod.GET, "http://localhost")); + this.contentMono = Mono.just(ByteBuffer.wrap("long_long_content".getBytes())); + } + + Flux getInnerContentFlux() { + return this.contentMono.flux(); + } + + @Override + public int getStatusCode() { + return 200; + } + + @Override + public String getHeaderValue(String name) { + return null; + } + + @Override + public HttpHeaders getHeaders() { + return headers; + } + + @Override + public Flux getBody() { + return this.contentMono + .doOnNext(bb -> { + // This ensure BufferedHttpResponse subscribes only once. + assertFalse(consumed, "content is already consumed"); + consumed = true; + }) + .flux(); + } + + @Override + public Mono getBodyAsByteArray() { + return this.getBody() + .map(bb -> new byte[bb.remaining()]) + .next(); + } + + @Override + public Mono getBodyAsString() { + throw new RuntimeException("Not implemented"); + } + + @Override + public Mono getBodyAsString(Charset charset) { + throw new RuntimeException("Not implemented"); + } + } +}