Skip to content

Commit

Permalink
Use Flowable<ByteBuffer> instead of Flowable<byte[]> (Azure#361)
Browse files Browse the repository at this point in the history
* byte[] -> ByteBuffer and experimental ByteBufAllocator

* Fix checkstyle

* Fix build errors. Use standard ByteBufAllocator.

* Add memory mapped upload test

* Add ByteBuffer splitting and read/write memory mapping

* Fix build errors

* Use ByteBuffer.remaining()
  • Loading branch information
RikkiGibson authored Feb 2, 2018
1 parent 47f7cd1 commit 1fbbfe3
Show file tree
Hide file tree
Showing 17 changed files with 229 additions and 127 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@
import io.reactivex.Single;

import java.io.IOException;
import java.nio.ByteBuffer;

public class MockHttpResponse extends HttpResponse {
private final static SerializerAdapter<?> serializer = new JacksonAdapter();
Expand Down Expand Up @@ -75,8 +76,8 @@ public Single<byte[]> bodyAsByteArrayAsync() {
}

@Override
public Flowable<byte[]> streamBodyAsync() {
return Flowable.just(byteArray);
public Flowable<ByteBuffer> streamBodyAsync() {
return Flowable.just(ByteBuffer.wrap(byteArray));
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@
import java.io.ByteArrayInputStream;
import java.io.IOException;
import java.io.InputStream;
import java.nio.ByteBuffer;

public class MockAzureHttpResponse extends HttpResponse {
private final static SerializerAdapter<?> serializer = new JacksonAdapter();
Expand Down Expand Up @@ -78,8 +79,8 @@ public Single<byte[]> bodyAsByteArrayAsync() {
}

@Override
public Flowable<byte[]> streamBodyAsync() {
return Flowable.just(bodyBytes);
public Flowable<ByteBuffer> streamBodyAsync() {
return Flowable.just(ByteBuffer.wrap(bodyBytes));
}

@Override
Expand Down
4 changes: 2 additions & 2 deletions client-runtime/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -152,7 +152,7 @@
<dependency>
<groupId>io.netty</groupId>
<artifactId>netty-transport-native-kqueue</artifactId>
<version>4.1.19.Final</version>
<version>${netty.version}</version>
<classifier>${os.detected.name}-${os.detected.arch}</classifier>
<scope>test</scope>
</dependency>
Expand All @@ -170,7 +170,7 @@
<dependency>
<groupId>io.netty</groupId>
<artifactId>netty-transport-native-epoll</artifactId>
<version>4.1.19.Final</version>
<version>${netty.version}</version>
<classifier>${os.detected.name}-${os.detected.arch}</classifier>
<scope>test</scope>
</dependency>
Expand Down
21 changes: 6 additions & 15 deletions client-runtime/src/main/java/com/microsoft/rest/v2/RestProxy.java
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@
import com.microsoft.rest.v2.protocol.SerializerAdapter;
import com.microsoft.rest.v2.protocol.SerializerEncoding;
import com.microsoft.rest.v2.serializer.JacksonAdapter;
import com.microsoft.rest.v2.util.FlowableUtil;
import io.reactivex.Completable;
import io.reactivex.Flowable;
import io.reactivex.Maybe;
Expand All @@ -43,6 +44,7 @@
import java.lang.reflect.Proxy;
import java.lang.reflect.Type;
import java.net.URL;
import java.nio.ByteBuffer;
import java.util.Map;
import java.util.NoSuchElementException;

Expand Down Expand Up @@ -184,10 +186,10 @@ private HttpRequest createHttpRequest(SwaggerMethodParser methodParser, Object[]
final String bodyContentString = serializer.serialize(bodyContentObject, SerializerEncoding.JSON);
request.withBody(bodyContentString);
}
else if (isFlowableByteArray(TypeToken.of(methodParser.bodyJavaType()))) {
else if (FlowableUtil.isFlowableByteBuffer(TypeToken.of(methodParser.bodyJavaType()))) {
// Content-Length or Transfer-Encoding: chunked must be provided by a user-specified header when a Flowable<byte[]> is given for the body.
//noinspection ConstantConditions
request.withBody((Flowable<byte[]>) bodyContentObject);
request.withBody((Flowable<ByteBuffer>) bodyContentObject);
}
else if (bodyContentObject instanceof byte[]) {
request.withBody((byte[]) bodyContentObject);
Expand Down Expand Up @@ -314,17 +316,6 @@ private Single<?> handleRestResponseReturnTypeAsync(HttpResponse response, Swagg
return asyncResult;
}

private boolean isFlowableByteArray(TypeToken entityTypeToken) {
if (entityTypeToken.isSubtypeOf(Flowable.class)) {
final Type innerType = ((ParameterizedType) entityTypeToken.getType()).getActualTypeArguments()[0];
final TypeToken innerTypeToken = TypeToken.of(innerType);
if (innerTypeToken.isSubtypeOf(byte[].class)) {
return true;
}
}
return false;
}

protected final Maybe<?> handleBodyReturnTypeAsync(final HttpResponse response, final SwaggerMethodParser methodParser, final Type entityType) {
final TypeToken entityTypeToken = TypeToken.of(entityType);
final int responseStatusCode = response.statusCode();
Expand All @@ -347,7 +338,7 @@ public byte[] apply(byte[] base64UrlBytes) {
});
}
asyncResult = responseBodyBytesAsync;
} else if (isFlowableByteArray(entityTypeToken)) {
} else if (FlowableUtil.isFlowableByteBuffer(entityTypeToken)) {
asyncResult = Maybe.just(response.streamBodyAsync());
} else {
Object result = response.deserializedBody();
Expand Down Expand Up @@ -406,7 +397,7 @@ public Single<?> apply(HttpResponse response) throws Exception {
else if (returnTypeToken.isSubtypeOf(Observable.class)) {
throw new InvalidReturnTypeException("RestProxy does not support swagger interface methods (such as " + methodParser.fullyQualifiedMethodName() + "()) with a return type of " + returnType.toString());
}
else if (isFlowableByteArray(returnTypeToken)) {
else if (FlowableUtil.isFlowableByteBuffer(returnTypeToken)) {
result = asyncExpectedResponse.flatMapPublisher(new Function<HttpResponse, Publisher<?>>() {
@Override
public Publisher<?> apply(HttpResponse httpResponse) throws Exception {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,9 @@
import io.reactivex.Flowable;
import io.reactivex.Single;
import io.reactivex.functions.Function;
import org.reactivestreams.Publisher;

import java.nio.ByteBuffer;

/**
* HTTP response which will buffer the response's body when/if it is read.
Expand Down Expand Up @@ -57,8 +60,13 @@ public byte[] apply(byte[] bytes) {
}

@Override
public Flowable<byte[]> streamBodyAsync() {
return bodyAsByteArrayAsync().toFlowable();
public Flowable<ByteBuffer> streamBodyAsync() {
return bodyAsByteArrayAsync().flatMapPublisher(new Function<byte[], Publisher<? extends ByteBuffer>>() {
@Override
public Publisher<? extends ByteBuffer> apply(byte[] bytes) throws Exception {
return Flowable.just(ByteBuffer.wrap(bytes));
}
});
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@
import io.reactivex.Flowable;

import java.net.URL;
import java.nio.ByteBuffer;
import java.nio.charset.StandardCharsets;

/**
Expand All @@ -20,7 +21,7 @@ public class HttpRequest {
private HttpMethod httpMethod;
private URL url;
private HttpHeaders headers;
private Flowable<byte[]> body;
private Flowable<ByteBuffer> body;
private final HttpResponseDecoder responseDecoder;

/**
Expand Down Expand Up @@ -49,7 +50,7 @@ public HttpRequest(String callerMethod, HttpMethod httpMethod, URL url, HttpResp
* @param body The body of this HTTP request.
* @param responseDecoder the which decodes messages sent in response to this HttpRequest.
*/
public HttpRequest(String callerMethod, HttpMethod httpMethod, URL url, HttpHeaders headers, Flowable<byte[]> body, HttpResponseDecoder responseDecoder) {
public HttpRequest(String callerMethod, HttpMethod httpMethod, URL url, HttpHeaders headers, Flowable<ByteBuffer> body, HttpResponseDecoder responseDecoder) {
this.callerMethod = callerMethod;
this.httpMethod = httpMethod;
this.url = url;
Expand Down Expand Up @@ -153,7 +154,7 @@ public HttpRequest withHeader(String headerName, String headerValue) {
* Get the body for this HttpRequest.
* @return The body for this HttpRequest.
*/
public Flowable<byte[]> body() {
public Flowable<ByteBuffer> body() {
return body;
}

Expand All @@ -175,7 +176,7 @@ public HttpRequest withBody(String body) {
*/
public HttpRequest withBody(byte[] body) {
headers.set("Content-Length", String.valueOf(body.length));
return withBody(Flowable.just(body));
return withBody(Flowable.just(ByteBuffer.wrap(body)));
}

/**
Expand All @@ -185,7 +186,7 @@ public HttpRequest withBody(byte[] body) {
* @param body The body of this HTTP request.
* @return This HttpRequest so that multiple operations can be chained together.
*/
public HttpRequest withBody(Flowable<byte[]> body) {
public HttpRequest withBody(Flowable<ByteBuffer> body) {
this.body = body;
return this;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@
import io.reactivex.Single;

import java.io.Closeable;
import java.nio.ByteBuffer;

/**
* This class contains all of the details necessary for reacting to a HTTP response from a
Expand Down Expand Up @@ -43,7 +44,7 @@ public abstract class HttpResponse implements Closeable {
* Stream this response's body content.
* @return This response's body as an asynchronous sequence of byte[].
*/
public abstract Flowable<byte[]> streamBodyAsync();
public abstract Flowable<ByteBuffer> streamBodyAsync();

/**
* Get this response object's body as a byte[]. If this response object doesn't have a body,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,6 @@
import io.netty.channel.socket.SocketChannel;
import io.netty.channel.socket.nio.NioSocketChannel;
import io.netty.handler.codec.EncoderException;
import io.netty.handler.codec.http.DefaultHttpContent;
import io.netty.handler.codec.http.DefaultHttpRequest;
import io.netty.handler.codec.http.DefaultLastHttpContent;
import io.netty.handler.codec.http.HttpClientCodec;
Expand Down Expand Up @@ -49,6 +48,7 @@
import java.net.Proxy;
import java.net.URI;
import java.net.URISyntaxException;
import java.nio.ByteBuffer;
import java.util.ArrayDeque;
import java.util.Queue;
import java.util.concurrent.TimeUnit;
Expand Down Expand Up @@ -282,14 +282,8 @@ public void operationComplete(Future<? super Void> future) throws Exception {
}
});
} else {
Flowable<ByteBuf> byteBufContent = request.body().map(new Function<byte[], ByteBuf>() {
@Override
public ByteBuf apply(byte[] bytes) throws Exception {
return Unpooled.wrappedBuffer(bytes);
}
});

byteBufContent.observeOn(Schedulers.from(channel.eventLoop())).subscribe(new FlowableSubscriber<ByteBuf>() {
request.body().observeOn(Schedulers.from(channel.eventLoop())).subscribe(new FlowableSubscriber<ByteBuffer>() {
Subscription subscription;
@Override
public void onSubscribe(Subscription s) {
Expand All @@ -311,11 +305,11 @@ public void operationComplete(Future<? super Void> future) throws Exception {
};

@Override
public void onNext(ByteBuf buf) {
public void onNext(ByteBuffer buf) {
if (!channel.eventLoop().inEventLoop()) {
throw new IllegalStateException("onNext must be called from the event loop managing the channel.");
}
channel.writeAndFlush(new DefaultHttpContent(buf))
channel.writeAndFlush(Unpooled.wrappedBuffer(buf))
.addListener(onChannelWriteComplete);

if (channel.isWritable()) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@
import io.reactivex.functions.Function;
import org.reactivestreams.Subscription;

import java.nio.ByteBuffer;
import java.util.List;
import java.util.Map.Entry;

Expand Down Expand Up @@ -70,22 +71,22 @@ public byte[] apply(ByteBuf byteBuf) {
}

static byte[] toByteArray(ByteBuf byteBuf) {
if (byteBuf.hasArray()) {
return byteBuf.array();
} else {
byte[] res = new byte[byteBuf.readableBytes()];
byteBuf.readBytes(res);
byteBuf.release();
return res;
}
byte[] res = new byte[byteBuf.readableBytes()];
byteBuf.readBytes(res);
byteBuf.release();
return res;
}

@Override
public Flowable<byte[]> streamBodyAsync() {
return contentStream.map(new Function<ByteBuf, byte[]>() {
public Flowable<ByteBuffer> streamBodyAsync() {
return contentStream.map(new Function<ByteBuf, ByteBuffer>() {
@Override
public byte[] apply(ByteBuf byteBuf) {
return toByteArray(byteBuf);
public ByteBuffer apply(ByteBuf byteBuf) {
ByteBuffer dst = ByteBuffer.allocate(byteBuf.readableBytes());
byteBuf.readBytes(dst);
byteBuf.release();
dst.flip();
return dst;
}
});
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -64,7 +64,7 @@ public Single<HttpResponse> decode(final HttpResponse response) {

final TypeToken entityTypeToken = getEntityType();

boolean isSerializableBody = !FlowableUtil.isFlowableByteArray(entityTypeToken)
boolean isSerializableBody = !FlowableUtil.isFlowableByteBuffer(entityTypeToken)
&& !entityTypeToken.isSubtypeOf(Completable.class)
&& !entityTypeToken.isSubtypeOf(byte[].class)
&& !entityTypeToken.isSubtypeOf(Boolean.TYPE) && !entityTypeToken.isSubtypeOf(Boolean.class)
Expand Down
Loading

0 comments on commit 1fbbfe3

Please sign in to comment.