Skip to content

Commit

Permalink
Skip defensive copies and transforms in AsyncRequestBody
Browse files Browse the repository at this point in the history
  • Loading branch information
StephenFlavin committed Apr 28, 2023
1 parent 6530cd7 commit cfb4737
Show file tree
Hide file tree
Showing 6 changed files with 334 additions and 34 deletions.
6 changes: 6 additions & 0 deletions .changes/next-release/feature-AWSSDKforJavav2-5d806ad.json
Original file line number Diff line number Diff line change
@@ -0,0 +1,6 @@
{
"type": "feature",
"category": "AWS SDK for Java v2",
"contributor": "StephenFlavin",
"description": "Skip defensive copies and transforms in AsyncRequestBody"
}
Original file line number Diff line number Diff line change
Expand Up @@ -30,8 +30,8 @@
import software.amazon.awssdk.core.internal.async.ByteArrayAsyncRequestBody;
import software.amazon.awssdk.core.internal.async.FileAsyncRequestBody;
import software.amazon.awssdk.core.internal.async.InputStreamWithExecutorAsyncRequestBody;
import software.amazon.awssdk.core.internal.async.ByteBuffersAsyncRequestBody;
import software.amazon.awssdk.core.internal.util.Mimetype;
import software.amazon.awssdk.utils.BinaryUtils;

/**
* Interface to allow non-blocking streaming of request content. This follows the reactive streams pattern where
Expand Down Expand Up @@ -124,11 +124,11 @@ static AsyncRequestBody fromFile(File file) {
* @param string The string to provide.
* @param cs The {@link Charset} to use.
* @return Implementation of {@link AsyncRequestBody} that uses the specified string.
* @see ByteArrayAsyncRequestBody
* @see ByteBuffersAsyncRequestBody
*/
static AsyncRequestBody fromString(String string, Charset cs) {
return new ByteArrayAsyncRequestBody(string.getBytes(cs),
Mimetype.MIMETYPE_TEXT_PLAIN + "; charset=" + cs.name());
return ByteBuffersAsyncRequestBody.from(Mimetype.MIMETYPE_TEXT_PLAIN + "; charset=" + cs.name(),
string.getBytes(cs));
}

/**
Expand All @@ -143,25 +143,33 @@ static AsyncRequestBody fromString(String string) {
}

/**
* Creates a {@link AsyncRequestBody} from a byte array. The contents of the byte array are copied so modifications to the
* original byte array are not reflected in the {@link AsyncRequestBody}.
* Creates a {@link AsyncRequestBody} from a byte array.
*
* @param bytes The bytes to send to the service.
* @return AsyncRequestBody instance.
*/
static AsyncRequestBody fromBytes(byte[] bytes) {
return new ByteArrayAsyncRequestBody(bytes, Mimetype.MIMETYPE_OCTET_STREAM);
return ByteBuffersAsyncRequestBody.from(bytes);
}

/**
* Creates a {@link AsyncRequestBody} from a {@link ByteBuffer}. Buffer contents are copied so any modifications
* made to the original {@link ByteBuffer} are not reflected in the {@link AsyncRequestBody}.
* Creates a {@link AsyncRequestBody} from a {@link ByteBuffer}.
*
* @param byteBuffer ByteBuffer to send to the service.
* @return AsyncRequestBody instance.
*/
static AsyncRequestBody fromByteBuffer(ByteBuffer byteBuffer) {
return fromBytes(BinaryUtils.copyAllBytesFrom(byteBuffer));
return ByteBuffersAsyncRequestBody.of(null, byteBuffer);
}

/**
* Creates a {@link AsyncRequestBody} from an array of {@link ByteBuffer}.
*
* @param byteBuffers ByteBuffer[] to send to the service.
* @return AsyncRequestBody instance.
*/
static AsyncRequestBody fromByteBuffers(ByteBuffer... byteBuffers) {
return ByteBuffersAsyncRequestBody.of(null, byteBuffers);
}

/**
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -30,8 +30,11 @@
* @see AsyncRequestBody#fromBytes(byte[])
* @see AsyncRequestBody#fromByteBuffer(ByteBuffer)
* @see AsyncRequestBody#fromString(String)
*
* @deprecated by {@link ByteBuffersAsyncRequestBody#from}.
*/
@SdkInternalApi
@Deprecated
public final class ByteArrayAsyncRequestBody implements AsyncRequestBody {
private static final Logger log = Logger.loggerFor(ByteArrayAsyncRequestBody.class);

Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,136 @@
/*
* Copyright Amazon.com, Inc. or its affiliates. All Rights Reserved.
*
* Licensed under the Apache License, Version 2.0 (the "License").
* You may not use this file except in compliance with the License.
* A copy of the License is located at
*
* http://aws.amazon.com/apache2.0
*
* or in the "license" file accompanying this file. This file is distributed
* on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either
* express or implied. See the License for the specific language governing
* permissions and limitations under the License.
*/

package software.amazon.awssdk.core.internal.async;

import java.nio.ByteBuffer;
import java.util.Optional;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;
import org.reactivestreams.Subscriber;
import org.reactivestreams.Subscription;
import software.amazon.awssdk.annotations.SdkInternalApi;
import software.amazon.awssdk.core.async.AsyncRequestBody;
import software.amazon.awssdk.core.internal.util.Mimetype;
import software.amazon.awssdk.utils.BinaryUtils;
import software.amazon.awssdk.utils.Logger;

/**
* An implementation of {@link AsyncRequestBody} for providing data from the supplied {@link ByteBuffer} array. This is created
* using static methods on {@link AsyncRequestBody}
*
* @see AsyncRequestBody#fromBytes(byte[])
* @see AsyncRequestBody#fromByteBuffer(ByteBuffer)
* @see AsyncRequestBody#fromByteBuffers(ByteBuffer...)
* @see AsyncRequestBody#fromString(String)
*/
@SdkInternalApi
public final class ByteBuffersAsyncRequestBody implements AsyncRequestBody {
private static final Logger log = Logger.loggerFor(ByteBuffersAsyncRequestBody.class);

private final String mimetype;
private final Long length;
private final ByteBuffer[] buffers;

private ByteBuffersAsyncRequestBody(String mimetype, Long length, ByteBuffer... buffers) {
this.mimetype = mimetype;
this.length = length;
this.buffers = buffers;
}

@Override
public Optional<Long> contentLength() {
return Optional.ofNullable(length);
}

@Override
public String contentType() {
return mimetype;
}

@Override
public void subscribe(Subscriber<? super ByteBuffer> s) {
// As per rule 1.9 we must throw NullPointerException if the subscriber parameter is null
if (s == null) {
throw new NullPointerException("Subscription MUST NOT be null.");
}

// As per 2.13, this method must return normally (i.e. not throw).
try {
s.onSubscribe(
new Subscription() {
private final AtomicInteger index = new AtomicInteger(0);
private final AtomicBoolean completed = new AtomicBoolean(false);

@Override
public void request(long n) {
if (completed.get()) {
return;
}

if (n > 0) {
int i = index.getAndIncrement();

if (i >= buffers.length) {
return;
}

long remaining = n;

do {
ByteBuffer buffer = buffers[i];
if (!buffer.hasArray()) {
buffer = ByteBuffer.wrap(BinaryUtils.copyBytesFrom(buffer));
}
s.onNext(buffer);
remaining--;
} while (remaining > 0 && (i = index.getAndIncrement()) < buffers.length);

if (i >= buffers.length - 1 && completed.compareAndSet(false, true)) {
s.onComplete();
}
} else {
s.onError(new IllegalArgumentException("§3.9: non-positive requests are not allowed!"));
}
}

@Override
public void cancel() {
completed.set(true);
}
}
);
} catch (Throwable ex) {
log.error(() -> s + " violated the Reactive Streams rule 2.13 by throwing an exception from onSubscribe.", ex);
}
}

public static ByteBuffersAsyncRequestBody of(Long length, ByteBuffer... buffers) {
return new ByteBuffersAsyncRequestBody(Mimetype.MIMETYPE_OCTET_STREAM, length, buffers);
}

public static ByteBuffersAsyncRequestBody of(String mimetype, Long length, ByteBuffer... buffers) {
return new ByteBuffersAsyncRequestBody(mimetype, length, buffers);
}

public static ByteBuffersAsyncRequestBody from(byte[] bytes) {
return new ByteBuffersAsyncRequestBody(Mimetype.MIMETYPE_OCTET_STREAM, (long) bytes.length,
ByteBuffer.wrap(bytes));
}

public static ByteBuffersAsyncRequestBody from(String mimetype, byte[] bytes) {
return new ByteBuffersAsyncRequestBody(mimetype, (long) bytes.length, ByteBuffer.wrap(bytes));
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -15,39 +15,28 @@

package software.amazon.awssdk.core.async;

import static java.nio.charset.StandardCharsets.UTF_8;
import static org.assertj.core.api.Assertions.assertThat;

import com.google.common.jimfs.Configuration;
import com.google.common.jimfs.Jimfs;
import io.reactivex.Flowable;
import java.io.File;
import java.io.FileWriter;
import java.io.IOException;
import java.io.InputStream;
import java.nio.ByteBuffer;
import java.nio.charset.StandardCharsets;
import java.nio.file.FileSystem;
import java.nio.file.Files;
import java.nio.file.Path;
import java.time.Instant;
import java.util.Collections;
import java.util.List;
import java.util.concurrent.Callable;
import java.util.concurrent.CountDownLatch;
import java.util.stream.Collectors;
import org.assertj.core.util.Lists;
import org.junit.Rule;
import org.junit.Test;
import org.junit.rules.TemporaryFolder;
import org.junit.runner.RunWith;
import org.junit.runners.Parameterized;
import org.reactivestreams.Publisher;
import org.reactivestreams.Subscriber;
import software.amazon.awssdk.core.internal.util.Mimetype;
import software.amazon.awssdk.http.async.SimpleSubscriber;
import software.amazon.awssdk.utils.BinaryUtils;
import software.amazon.awssdk.utils.StringInputStream;

@RunWith(Parameterized.class)
public class AsyncRequestBodyTest {
Expand Down Expand Up @@ -164,17 +153,4 @@ public void publisherConstructorHasCorrectContentType() {
AsyncRequestBody requestBody = AsyncRequestBody.fromPublisher(bodyPublisher);
assertThat(requestBody.contentType()).isEqualTo(Mimetype.MIMETYPE_OCTET_STREAM);
}

@Test
public void fromBytes_byteArrayNotNull_createsCopy() {
byte[] original = {0x1, 0x2, 0x3, 0x4};
byte[] toModify = new byte[original.length];
System.arraycopy(original, 0, toModify, 0, original.length);
AsyncRequestBody body = AsyncRequestBody.fromBytes(toModify);
for (int i = 0; i < toModify.length; ++i) {
toModify[i]++;
}
ByteBuffer publishedBb = Flowable.fromPublisher(body).toList().blockingGet().get(0);
assertThat(BinaryUtils.copyAllBytesFrom(publishedBb)).isEqualTo(original);
}
}
Loading

0 comments on commit cfb4737

Please sign in to comment.