Skip to content

Commit

Permalink
Skip defensive copies and transforms in AsyncRequestBody
Browse files Browse the repository at this point in the history
  • Loading branch information
StephenFlavin committed May 17, 2023
1 parent 68329a7 commit 71d4ce4
Show file tree
Hide file tree
Showing 9 changed files with 718 additions and 248 deletions.
6 changes: 6 additions & 0 deletions .changes/next-release/feature-AWSSDKforJavav2-5d806ad.json
Original file line number Diff line number Diff line change
@@ -0,0 +1,6 @@
{
"type": "feature",
"category": "AWS SDK for Java v2",
"contributor": "StephenFlavin",
"description": "Skip defensive copies and transforms in AsyncRequestBody"
}
Original file line number Diff line number Diff line change
Expand Up @@ -22,12 +22,13 @@
import java.nio.charset.Charset;
import java.nio.charset.StandardCharsets;
import java.nio.file.Path;
import java.util.Arrays;
import java.util.Optional;
import java.util.concurrent.ExecutorService;
import org.reactivestreams.Publisher;
import org.reactivestreams.Subscriber;
import software.amazon.awssdk.annotations.SdkPublicApi;
import software.amazon.awssdk.core.internal.async.ByteArrayAsyncRequestBody;
import software.amazon.awssdk.core.internal.async.ByteBuffersAsyncRequestBody;
import software.amazon.awssdk.core.internal.async.FileAsyncRequestBody;
import software.amazon.awssdk.core.internal.async.InputStreamWithExecutorAsyncRequestBody;
import software.amazon.awssdk.core.internal.util.Mimetype;
Expand All @@ -52,7 +53,7 @@
* </p>
*
* @see FileAsyncRequestBody
* @see ByteArrayAsyncRequestBody
* @see ByteBuffersAsyncRequestBody
*/
@SdkPublicApi
public interface AsyncRequestBody extends SdkPublisher<ByteBuffer> {
Expand Down Expand Up @@ -124,11 +125,11 @@ static AsyncRequestBody fromFile(File file) {
* @param string The string to provide.
* @param cs The {@link Charset} to use.
* @return Implementation of {@link AsyncRequestBody} that uses the specified string.
* @see ByteArrayAsyncRequestBody
* @see ByteBuffersAsyncRequestBody
*/
static AsyncRequestBody fromString(String string, Charset cs) {
return new ByteArrayAsyncRequestBody(string.getBytes(cs),
Mimetype.MIMETYPE_TEXT_PLAIN + "; charset=" + cs.name());
return ByteBuffersAsyncRequestBody.from(Mimetype.MIMETYPE_TEXT_PLAIN + "; charset=" + cs.name(),
string.getBytes(cs));
}

/**
Expand All @@ -150,18 +151,69 @@ static AsyncRequestBody fromString(String string) {
* @return AsyncRequestBody instance.
*/
static AsyncRequestBody fromBytes(byte[] bytes) {
return new ByteArrayAsyncRequestBody(bytes, Mimetype.MIMETYPE_OCTET_STREAM);
byte[] clonedBytes = bytes.clone();
return ByteBuffersAsyncRequestBody.from(clonedBytes);
}

/**
* Creates a {@link AsyncRequestBody} from a {@link ByteBuffer}. Buffer contents are copied so any modifications
* made to the original {@link ByteBuffer} are not reflected in the {@link AsyncRequestBody}.
* Creates a {@link AsyncRequestBody} from a byte array. The contents of the byte array are copied so modifications to the
* original byte array are not reflected in the {@link AsyncRequestBody}.
*
* @param bytes The bytes to send to the service.
* @return AsyncRequestBody instance.
*/
static AsyncRequestBody fromBytesUnsafe(byte[] bytes) {
return ByteBuffersAsyncRequestBody.from(bytes);
}

/**
* Creates a {@link AsyncRequestBody} from a {@link ByteBuffer}. Buffer contents are copied with the position set to zero, the
* mark if defined is discarded, this is to ensure modifications made to the original {@link ByteBuffer} are not reflected in
* the {@link AsyncRequestBody}.
*
* @param byteBuffer ByteBuffer to send to the service.
* @return AsyncRequestBody instance.
*/
static AsyncRequestBody fromByteBuffer(ByteBuffer byteBuffer) {
return fromBytes(BinaryUtils.copyAllBytesFrom(byteBuffer));
ByteBuffer immutableCopy = BinaryUtils.immutableCopyOf(byteBuffer);
immutableCopy.rewind();
return ByteBuffersAsyncRequestBody.of(null, immutableCopy);
}

/**
* Creates a {@link AsyncRequestBody} from a {@link ByteBuffer}.
*
* @param byteBuffer ByteBuffer to send to the service.
* @return AsyncRequestBody instance.
*/
static AsyncRequestBody fromByteBufferUnsafe(ByteBuffer byteBuffer) {
return ByteBuffersAsyncRequestBody.of(null, byteBuffer);
}

/**
* Creates a {@link AsyncRequestBody} from an array of {@link ByteBuffer}. Each Buffers contents are copied with their
* positions set to zero and marks if defined are discarded, this is to ensure modifications made to the original array of
* {@link ByteBuffer} are not reflected in the {@link AsyncRequestBody}.
*
* @param byteBuffers ByteBuffer[] to send to the service.
* @return AsyncRequestBody instance.
*/
static AsyncRequestBody fromByteBuffers(ByteBuffer... byteBuffers) {
ByteBuffer[] immutableCopy = Arrays.stream(byteBuffers)
.map(BinaryUtils::immutableCopyOf)
.peek(ByteBuffer::rewind)
.toArray(ByteBuffer[]::new);
return ByteBuffersAsyncRequestBody.of(null, immutableCopy);
}

/**
* Creates a {@link AsyncRequestBody} from an array of {@link ByteBuffer}.
*
* @param byteBuffers ByteBuffer[] to send to the service.
* @return AsyncRequestBody instance.
*/
static AsyncRequestBody fromByteBuffersUnsafe(ByteBuffer... byteBuffers) {
return ByteBuffersAsyncRequestBody.of(null, byteBuffers);
}

/**
Expand Down

This file was deleted.

Original file line number Diff line number Diff line change
@@ -0,0 +1,139 @@
/*
* Copyright Amazon.com, Inc. or its affiliates. All Rights Reserved.
*
* Licensed under the Apache License, Version 2.0 (the "License").
* You may not use this file except in compliance with the License.
* A copy of the License is located at
*
* http://aws.amazon.com/apache2.0
*
* or in the "license" file accompanying this file. This file is distributed
* on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either
* express or implied. See the License for the specific language governing
* permissions and limitations under the License.
*/

package software.amazon.awssdk.core.internal.async;

import java.nio.ByteBuffer;
import java.util.Optional;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;
import org.reactivestreams.Subscriber;
import org.reactivestreams.Subscription;
import software.amazon.awssdk.annotations.SdkInternalApi;
import software.amazon.awssdk.core.async.AsyncRequestBody;
import software.amazon.awssdk.core.internal.util.Mimetype;
import software.amazon.awssdk.utils.BinaryUtils;
import software.amazon.awssdk.utils.Logger;

/**
* An implementation of {@link AsyncRequestBody} for providing data from the supplied {@link ByteBuffer} array. This is created
* using static methods on {@link AsyncRequestBody}
*
* @see AsyncRequestBody#fromBytes(byte[])
* @see AsyncRequestBody#fromByteBuffer(ByteBuffer)
* @see AsyncRequestBody#fromByteBuffers(ByteBuffer...)
* @see AsyncRequestBody#fromString(String)
*/
@SdkInternalApi
public final class ByteBuffersAsyncRequestBody implements AsyncRequestBody {
private static final Logger log = Logger.loggerFor(ByteBuffersAsyncRequestBody.class);

private final String mimetype;
private final Long length;
private final ByteBuffer[] buffers;

private ByteBuffersAsyncRequestBody(String mimetype, Long length, ByteBuffer... buffers) {
this.mimetype = mimetype;
this.length = length;
this.buffers = buffers;
}

@Override
public Optional<Long> contentLength() {
return Optional.ofNullable(length);
}

@Override
public String contentType() {
return mimetype;
}

@Override
public void subscribe(Subscriber<? super ByteBuffer> s) {
// As per rule 1.9 we must throw NullPointerException if the subscriber parameter is null
if (s == null) {
throw new NullPointerException("Subscription MUST NOT be null.");
}

// As per 2.13, this method must return normally (i.e. not throw).
try {
s.onSubscribe(
new Subscription() {
private final AtomicInteger index = new AtomicInteger(0);
private final AtomicBoolean completed = new AtomicBoolean(false);

@Override
public void request(long n) {
if (completed.get()) {
return;
}

if (n > 0) {
int i = index.getAndIncrement();

if (i >= buffers.length) {
return;
}

long remaining = n;

do {
ByteBuffer buffer = buffers[i];

// Pending discussions on https://github.com/aws/aws-sdk-java-v2/issues/3928
if (buffer.isDirect()) {
buffer = BinaryUtils.toNonDirectBuffer(buffer);
}

s.onNext(buffer);
remaining--;
} while (remaining > 0 && (i = index.getAndIncrement()) < buffers.length);

if (i >= buffers.length - 1 && completed.compareAndSet(false, true)) {
s.onComplete();
}
} else {
s.onError(new IllegalArgumentException("§3.9: non-positive requests are not allowed!"));
}
}

@Override
public void cancel() {
completed.set(true);
}
}
);
} catch (Throwable ex) {
log.error(() -> s + " violated the Reactive Streams rule 2.13 by throwing an exception from onSubscribe.", ex);
}
}

public static ByteBuffersAsyncRequestBody of(Long length, ByteBuffer... buffers) {
return new ByteBuffersAsyncRequestBody(Mimetype.MIMETYPE_OCTET_STREAM, length, buffers);
}

public static ByteBuffersAsyncRequestBody of(String mimetype, Long length, ByteBuffer... buffers) {
return new ByteBuffersAsyncRequestBody(mimetype, length, buffers);
}

public static ByteBuffersAsyncRequestBody from(byte[] bytes) {
return new ByteBuffersAsyncRequestBody(Mimetype.MIMETYPE_OCTET_STREAM, (long) bytes.length,
ByteBuffer.wrap(bytes));
}

public static ByteBuffersAsyncRequestBody from(String mimetype, byte[] bytes) {
return new ByteBuffersAsyncRequestBody(mimetype, (long) bytes.length, ByteBuffer.wrap(bytes));
}
}
Loading

0 comments on commit 71d4ce4

Please sign in to comment.