Skip to content

Commit

Permalink
Add "unsafe" AsyncRequestBody constructors for byte[] and ByteBuffers (
Browse files Browse the repository at this point in the history
  • Loading branch information
StephenFlavin authored Jun 9, 2023
1 parent 43505cc commit be45eb0
Show file tree
Hide file tree
Showing 10 changed files with 1,031 additions and 265 deletions.
6 changes: 6 additions & 0 deletions .changes/next-release/feature-AWSSDKforJavav2-5d806ad.json
Original file line number Diff line number Diff line change
@@ -0,0 +1,6 @@
{
"type": "feature",
"category": "AWS SDK for Java v2",
"contributor": "StephenFlavin",
"description": "Add \"unsafe\" and \"fromRemaining\" AsyncRequestBody constructors for byte arrays and ByteBuffers"
}
Original file line number Diff line number Diff line change
Expand Up @@ -22,37 +22,38 @@
import java.nio.charset.Charset;
import java.nio.charset.StandardCharsets;
import java.nio.file.Path;
import java.util.Arrays;
import java.util.Optional;
import java.util.concurrent.ExecutorService;
import org.reactivestreams.Publisher;
import org.reactivestreams.Subscriber;
import software.amazon.awssdk.annotations.SdkPublicApi;
import software.amazon.awssdk.core.internal.async.ByteArrayAsyncRequestBody;
import software.amazon.awssdk.core.internal.async.ByteBuffersAsyncRequestBody;
import software.amazon.awssdk.core.internal.async.FileAsyncRequestBody;
import software.amazon.awssdk.core.internal.async.InputStreamWithExecutorAsyncRequestBody;
import software.amazon.awssdk.core.internal.util.Mimetype;
import software.amazon.awssdk.utils.BinaryUtils;

/**
* Interface to allow non-blocking streaming of request content. This follows the reactive streams pattern where
* this interface is the {@link Publisher} of data (specifically {@link ByteBuffer} chunks) and the HTTP client is the Subscriber
* of the data (i.e. to write that data on the wire).
* Interface to allow non-blocking streaming of request content. This follows the reactive streams pattern where this interface is
* the {@link Publisher} of data (specifically {@link ByteBuffer} chunks) and the HTTP client is the Subscriber of the data (i.e.
* to write that data on the wire).
*
* <p>
* {@link #subscribe(Subscriber)} should be implemented to tie this publisher to a subscriber. Ideally each call to subscribe
* should reproduce the content (i.e if you are reading from a file each subscribe call should produce a {@link
* org.reactivestreams.Subscription} that reads the file fully). This allows for automatic retries to be performed in the SDK. If
* the content is not reproducible, an exception may be thrown from any subsequent {@link #subscribe(Subscriber)} calls.
* should reproduce the content (i.e if you are reading from a file each subscribe call should produce a
* {@link org.reactivestreams.Subscription} that reads the file fully). This allows for automatic retries to be performed in the
* SDK. If the content is not reproducible, an exception may be thrown from any subsequent {@link #subscribe(Subscriber)} calls.
* </p>
*
* <p>
* It is important to only send the number of chunks that the subscriber requests to avoid out of memory situations.
* The subscriber does it's own buffering so it's usually not needed to buffer in the publisher. Additional permits
* for chunks will be notified via the {@link org.reactivestreams.Subscription#request(long)} method.
* It is important to only send the number of chunks that the subscriber requests to avoid out of memory situations. The
* subscriber does it's own buffering so it's usually not needed to buffer in the publisher. Additional permits for chunks will be
* notified via the {@link org.reactivestreams.Subscription#request(long)} method.
* </p>
*
* @see FileAsyncRequestBody
* @see ByteArrayAsyncRequestBody
* @see ByteBuffersAsyncRequestBody
*/
@SdkPublicApi
public interface AsyncRequestBody extends SdkPublisher<ByteBuffer> {
Expand All @@ -70,8 +71,8 @@ default String contentType() {
}

/**
* Creates an {@link AsyncRequestBody} the produces data from the input ByteBuffer publisher.
* The data is delivered when the publisher publishes the data.
* Creates an {@link AsyncRequestBody} the produces data from the input ByteBuffer publisher. The data is delivered when the
* publisher publishes the data.
*
* @param publisher Publisher of source data
* @return Implementation of {@link AsyncRequestBody} that produces data send by the publisher
Expand Down Expand Up @@ -124,11 +125,11 @@ static AsyncRequestBody fromFile(File file) {
* @param string The string to provide.
* @param cs The {@link Charset} to use.
* @return Implementation of {@link AsyncRequestBody} that uses the specified string.
* @see ByteArrayAsyncRequestBody
* @see ByteBuffersAsyncRequestBody
*/
static AsyncRequestBody fromString(String string, Charset cs) {
return new ByteArrayAsyncRequestBody(string.getBytes(cs),
Mimetype.MIMETYPE_TEXT_PLAIN + "; charset=" + cs.name());
return ByteBuffersAsyncRequestBody.from(Mimetype.MIMETYPE_TEXT_PLAIN + "; charset=" + cs.name(),
string.getBytes(cs));
}

/**
Expand All @@ -143,29 +144,181 @@ static AsyncRequestBody fromString(String string) {
}

/**
* Creates a {@link AsyncRequestBody} from a byte array. The contents of the byte array are copied so modifications to the
* original byte array are not reflected in the {@link AsyncRequestBody}.
* Creates an {@link AsyncRequestBody} from a byte array. This will copy the contents of the byte array to prevent
* modifications to the provided byte array from being reflected in the {@link AsyncRequestBody}.
*
* @param bytes The bytes to send to the service.
* @return AsyncRequestBody instance.
*/
static AsyncRequestBody fromBytes(byte[] bytes) {
return new ByteArrayAsyncRequestBody(bytes, Mimetype.MIMETYPE_OCTET_STREAM);
byte[] clonedBytes = bytes.clone();
return ByteBuffersAsyncRequestBody.from(clonedBytes);
}

/**
* Creates a {@link AsyncRequestBody} from a {@link ByteBuffer}. Buffer contents are copied so any modifications
* made to the original {@link ByteBuffer} are not reflected in the {@link AsyncRequestBody}.
* Creates an {@link AsyncRequestBody} from a byte array <b>without</b> copying the contents of the byte array. This
* introduces concurrency risks, allowing: (1) the caller to modify the byte array stored in this {@code AsyncRequestBody}
* implementation AND (2) any users of {@link #fromBytesUnsafe(byte[])} to modify the byte array passed into this
* {@code AsyncRequestBody} implementation.
*
* <p>As the method name implies, this is unsafe. Use {@link #fromBytes(byte[])} unless you're sure you know the risks.
*
* @param bytes The bytes to send to the service.
* @return AsyncRequestBody instance.
*/
static AsyncRequestBody fromBytesUnsafe(byte[] bytes) {
return ByteBuffersAsyncRequestBody.from(bytes);
}

/**
* Creates an {@link AsyncRequestBody} from a {@link ByteBuffer}. This will copy the contents of the {@link ByteBuffer} to
* prevent modifications to the provided {@link ByteBuffer} from being reflected in the {@link AsyncRequestBody}.
* <p>
* <b>NOTE:</b> This method ignores the current read position. Use {@link #fromRemainingByteBuffer(ByteBuffer)} if you need
* it to copy only the remaining readable bytes.
*
* @param byteBuffer ByteBuffer to send to the service.
* @return AsyncRequestBody instance.
*/
static AsyncRequestBody fromByteBuffer(ByteBuffer byteBuffer) {
return fromBytes(BinaryUtils.copyAllBytesFrom(byteBuffer));
ByteBuffer immutableCopy = BinaryUtils.immutableCopyOf(byteBuffer);
immutableCopy.rewind();
return ByteBuffersAsyncRequestBody.of((long) immutableCopy.remaining(), immutableCopy);
}

/**
* Creates an {@link AsyncRequestBody} from the remaining readable bytes from a {@link ByteBuffer}. This will copy the
* remaining contents of the {@link ByteBuffer} to prevent modifications to the provided {@link ByteBuffer} from being
* reflected in the {@link AsyncRequestBody}.
* <p> Unlike {@link #fromByteBuffer(ByteBuffer)}, this method respects the current read position of the buffer and reads
* only the remaining bytes.
*
* @param byteBuffer ByteBuffer to send to the service.
* @return AsyncRequestBody instance.
*/
static AsyncRequestBody fromRemainingByteBuffer(ByteBuffer byteBuffer) {
ByteBuffer immutableCopy = BinaryUtils.immutableCopyOfRemaining(byteBuffer);
return ByteBuffersAsyncRequestBody.of((long) immutableCopy.remaining(), immutableCopy);
}

/**
* Creates an {@link AsyncRequestBody} from a {@link ByteBuffer} <b>without</b> copying the contents of the
* {@link ByteBuffer}. This introduces concurrency risks, allowing the caller to modify the {@link ByteBuffer} stored in this
* {@code AsyncRequestBody} implementation.
* <p>
* <b>NOTE:</b> This method ignores the current read position. Use {@link #fromRemainingByteBufferUnsafe(ByteBuffer)} if you
* need it to copy only the remaining readable bytes.
*
* <p>As the method name implies, this is unsafe. Use {@link #fromByteBuffer(ByteBuffer)}} unless you're sure you know the
* risks.
*
* @param byteBuffer ByteBuffer to send to the service.
* @return AsyncRequestBody instance.
*/
static AsyncRequestBody fromByteBufferUnsafe(ByteBuffer byteBuffer) {
ByteBuffer readOnlyBuffer = byteBuffer.asReadOnlyBuffer();
readOnlyBuffer.rewind();
return ByteBuffersAsyncRequestBody.of((long) readOnlyBuffer.remaining(), readOnlyBuffer);
}

/**
* Creates an {@link AsyncRequestBody} from a {@link ByteBuffer} <b>without</b> copying the contents of the
* {@link ByteBuffer}. This introduces concurrency risks, allowing the caller to modify the {@link ByteBuffer} stored in this
* {@code AsyncRequestBody} implementation.
* <p>Unlike {@link #fromByteBufferUnsafe(ByteBuffer)}, this method respects the current read position of
* the buffer and reads only the remaining bytes.
*
* <p>As the method name implies, this is unsafe. Use {@link #fromByteBuffer(ByteBuffer)}} unless you're sure you know the
* risks.
*
* @param byteBuffer ByteBuffer to send to the service.
* @return AsyncRequestBody instance.
*/
static AsyncRequestBody fromRemainingByteBufferUnsafe(ByteBuffer byteBuffer) {
ByteBuffer readOnlyBuffer = byteBuffer.asReadOnlyBuffer();
return ByteBuffersAsyncRequestBody.of((long) readOnlyBuffer.remaining(), readOnlyBuffer);
}

/**
* Creates an {@link AsyncRequestBody} from a {@link ByteBuffer} array. This will copy the contents of each {@link ByteBuffer}
* to prevent modifications to any provided {@link ByteBuffer} from being reflected in the {@link AsyncRequestBody}.
* <p>
* <b>NOTE:</b> This method ignores the current read position of each {@link ByteBuffer}. Use
* {@link #fromRemainingByteBuffers(ByteBuffer...)} if you need it to copy only the remaining readable bytes.
*
* @param byteBuffers ByteBuffer array to send to the service.
* @return AsyncRequestBody instance.
*/
static AsyncRequestBody fromByteBuffers(ByteBuffer... byteBuffers) {
ByteBuffer[] immutableCopy = Arrays.stream(byteBuffers)
.map(BinaryUtils::immutableCopyOf)
.peek(ByteBuffer::rewind)
.toArray(ByteBuffer[]::new);
return ByteBuffersAsyncRequestBody.of(immutableCopy);
}

/**
* Creates an {@link AsyncRequestBody} from a {@link ByteBuffer} array. This will copy the remaining contents of each
* {@link ByteBuffer} to prevent modifications to any provided {@link ByteBuffer} from being reflected in the
* {@link AsyncRequestBody}.
* <p>Unlike {@link #fromByteBufferUnsafe(ByteBuffer)},
* this method respects the current read position of each buffer and reads only the remaining bytes.
*
* @param byteBuffers ByteBuffer array to send to the service.
* @return AsyncRequestBody instance.
*/
static AsyncRequestBody fromRemainingByteBuffers(ByteBuffer... byteBuffers) {
ByteBuffer[] immutableCopy = Arrays.stream(byteBuffers)
.map(BinaryUtils::immutableCopyOfRemaining)
.peek(ByteBuffer::rewind)
.toArray(ByteBuffer[]::new);
return ByteBuffersAsyncRequestBody.of(immutableCopy);
}

/**
* Creates an {@link AsyncRequestBody} from a {@link ByteBuffer} array <b>without</b> copying the contents of each
* {@link ByteBuffer}. This introduces concurrency risks, allowing the caller to modify any {@link ByteBuffer} stored in this
* {@code AsyncRequestBody} implementation.
* <p>
* <b>NOTE:</b> This method ignores the current read position of each {@link ByteBuffer}. Use
* {@link #fromRemainingByteBuffers(ByteBuffer...)} if you need it to copy only the remaining readable bytes.
*
* <p>As the method name implies, this is unsafe. Use {@link #fromByteBuffers(ByteBuffer...)} unless you're sure you know the
* risks.
*
* @param byteBuffers ByteBuffer array to send to the service.
* @return AsyncRequestBody instance.
*/
static AsyncRequestBody fromByteBuffersUnsafe(ByteBuffer... byteBuffers) {
ByteBuffer[] readOnlyBuffers = Arrays.stream(byteBuffers)
.map(ByteBuffer::asReadOnlyBuffer)
.peek(ByteBuffer::rewind)
.toArray(ByteBuffer[]::new);
return ByteBuffersAsyncRequestBody.of(readOnlyBuffers);
}

/**
* Creates an {@link AsyncRequestBody} from a {@link ByteBuffer} array <b>without</b> copying the contents of each
* {@link ByteBuffer}. This introduces concurrency risks, allowing the caller to modify any {@link ByteBuffer} stored in this
* {@code AsyncRequestBody} implementation.
* <p>Unlike {@link #fromByteBuffersUnsafe(ByteBuffer...)},
* this method respects the current read position of each buffer and reads only the remaining bytes.
*
* <p>As the method name implies, this is unsafe. Use {@link #fromByteBuffers(ByteBuffer...)} unless you're sure you know the
* risks.
*
* @param byteBuffers ByteBuffer array to send to the service.
* @return AsyncRequestBody instance.
*/
static AsyncRequestBody fromRemainingByteBuffersUnsafe(ByteBuffer... byteBuffers) {
ByteBuffer[] readOnlyBuffers = Arrays.stream(byteBuffers)
.map(ByteBuffer::asReadOnlyBuffer)
.toArray(ByteBuffer[]::new);
return ByteBuffersAsyncRequestBody.of(readOnlyBuffers);
}

/**
* Creates a {@link AsyncRequestBody} from a {@link InputStream}.
* Creates an {@link AsyncRequestBody} from an {@link InputStream}.
*
* <p>An {@link ExecutorService} is required in order to perform the blocking data reads, to prevent blocking the
* non-blocking event loop threads owned by the SDK.
Expand Down Expand Up @@ -239,7 +392,7 @@ static BlockingOutputStreamAsyncRequestBody forBlockingOutputStream(Long content
}

/**
* Creates a {@link AsyncRequestBody} with no content.
* Creates an {@link AsyncRequestBody} with no content.
*
* @return AsyncRequestBody instance.
*/
Expand Down

This file was deleted.

Loading

0 comments on commit be45eb0

Please sign in to comment.