diff --git a/.changes/next-release/feature-AWSSDKforJavav2-5d806ad.json b/.changes/next-release/feature-AWSSDKforJavav2-5d806ad.json new file mode 100644 index 000000000000..2feca3894f78 --- /dev/null +++ b/.changes/next-release/feature-AWSSDKforJavav2-5d806ad.json @@ -0,0 +1,6 @@ +{ + "type": "feature", + "category": "AWS SDK for Java v2", + "contributor": "StephenFlavin", + "description": "Add \"unsafe\" and \"fromRemaining\" AsyncRequestBody constructors for byte arrays and ByteBuffers" +} diff --git a/core/sdk-core/src/main/java/software/amazon/awssdk/core/async/AsyncRequestBody.java b/core/sdk-core/src/main/java/software/amazon/awssdk/core/async/AsyncRequestBody.java index cad4236d241a..3bd3d7136d47 100644 --- a/core/sdk-core/src/main/java/software/amazon/awssdk/core/async/AsyncRequestBody.java +++ b/core/sdk-core/src/main/java/software/amazon/awssdk/core/async/AsyncRequestBody.java @@ -22,13 +22,14 @@ import java.nio.charset.Charset; import java.nio.charset.StandardCharsets; import java.nio.file.Path; +import java.util.Arrays; import java.util.Optional; import java.util.concurrent.CompletableFuture; import java.util.concurrent.ExecutorService; import org.reactivestreams.Publisher; import org.reactivestreams.Subscriber; import software.amazon.awssdk.annotations.SdkPublicApi; -import software.amazon.awssdk.core.internal.async.ByteArrayAsyncRequestBody; +import software.amazon.awssdk.core.internal.async.ByteBuffersAsyncRequestBody; import software.amazon.awssdk.core.internal.async.FileAsyncRequestBody; import software.amazon.awssdk.core.internal.async.InputStreamWithExecutorAsyncRequestBody; import software.amazon.awssdk.core.internal.async.SplittingPublisher; @@ -37,25 +38,25 @@ import software.amazon.awssdk.utils.Validate; /** - * Interface to allow non-blocking streaming of request content. This follows the reactive streams pattern where - * this interface is the {@link Publisher} of data (specifically {@link ByteBuffer} chunks) and the HTTP client is the Subscriber - * of the data (i.e. to write that data on the wire). + * Interface to allow non-blocking streaming of request content. This follows the reactive streams pattern where this interface is + * the {@link Publisher} of data (specifically {@link ByteBuffer} chunks) and the HTTP client is the Subscriber of the data (i.e. + * to write that data on the wire). * *

* {@link #subscribe(Subscriber)} should be implemented to tie this publisher to a subscriber. Ideally each call to subscribe - * should reproduce the content (i.e if you are reading from a file each subscribe call should produce a {@link - * org.reactivestreams.Subscription} that reads the file fully). This allows for automatic retries to be performed in the SDK. If - * the content is not reproducible, an exception may be thrown from any subsequent {@link #subscribe(Subscriber)} calls. + * should reproduce the content (i.e if you are reading from a file each subscribe call should produce a + * {@link org.reactivestreams.Subscription} that reads the file fully). This allows for automatic retries to be performed in the + * SDK. If the content is not reproducible, an exception may be thrown from any subsequent {@link #subscribe(Subscriber)} calls. *

* *

- * It is important to only send the number of chunks that the subscriber requests to avoid out of memory situations. - * The subscriber does it's own buffering so it's usually not needed to buffer in the publisher. Additional permits - * for chunks will be notified via the {@link org.reactivestreams.Subscription#request(long)} method. + * It is important to only send the number of chunks that the subscriber requests to avoid out of memory situations. The + * subscriber does it's own buffering so it's usually not needed to buffer in the publisher. Additional permits for chunks will be + * notified via the {@link org.reactivestreams.Subscription#request(long)} method. *

* * @see FileAsyncRequestBody - * @see ByteArrayAsyncRequestBody + * @see ByteBuffersAsyncRequestBody */ @SdkPublicApi public interface AsyncRequestBody extends SdkPublisher { @@ -73,8 +74,8 @@ default String contentType() { } /** - * Creates an {@link AsyncRequestBody} the produces data from the input ByteBuffer publisher. - * The data is delivered when the publisher publishes the data. + * Creates an {@link AsyncRequestBody} the produces data from the input ByteBuffer publisher. The data is delivered when the + * publisher publishes the data. * * @param publisher Publisher of source data * @return Implementation of {@link AsyncRequestBody} that produces data send by the publisher @@ -127,11 +128,11 @@ static AsyncRequestBody fromFile(File file) { * @param string The string to provide. * @param cs The {@link Charset} to use. * @return Implementation of {@link AsyncRequestBody} that uses the specified string. - * @see ByteArrayAsyncRequestBody + * @see ByteBuffersAsyncRequestBody */ static AsyncRequestBody fromString(String string, Charset cs) { - return new ByteArrayAsyncRequestBody(string.getBytes(cs), - Mimetype.MIMETYPE_TEXT_PLAIN + "; charset=" + cs.name()); + return ByteBuffersAsyncRequestBody.from(Mimetype.MIMETYPE_TEXT_PLAIN + "; charset=" + cs.name(), + string.getBytes(cs)); } /** @@ -146,29 +147,181 @@ static AsyncRequestBody fromString(String string) { } /** - * Creates a {@link AsyncRequestBody} from a byte array. The contents of the byte array are copied so modifications to the - * original byte array are not reflected in the {@link AsyncRequestBody}. + * Creates an {@link AsyncRequestBody} from a byte array. This will copy the contents of the byte array to prevent + * modifications to the provided byte array from being reflected in the {@link AsyncRequestBody}. * * @param bytes The bytes to send to the service. * @return AsyncRequestBody instance. */ static AsyncRequestBody fromBytes(byte[] bytes) { - return new ByteArrayAsyncRequestBody(bytes, Mimetype.MIMETYPE_OCTET_STREAM); + byte[] clonedBytes = bytes.clone(); + return ByteBuffersAsyncRequestBody.from(clonedBytes); } /** - * Creates a {@link AsyncRequestBody} from a {@link ByteBuffer}. Buffer contents are copied so any modifications - * made to the original {@link ByteBuffer} are not reflected in the {@link AsyncRequestBody}. + * Creates an {@link AsyncRequestBody} from a byte array without copying the contents of the byte array. This + * introduces concurrency risks, allowing: (1) the caller to modify the byte array stored in this {@code AsyncRequestBody} + * implementation AND (2) any users of {@link #fromBytesUnsafe(byte[])} to modify the byte array passed into this + * {@code AsyncRequestBody} implementation. + * + *

As the method name implies, this is unsafe. Use {@link #fromBytes(byte[])} unless you're sure you know the risks. + * + * @param bytes The bytes to send to the service. + * @return AsyncRequestBody instance. + */ + static AsyncRequestBody fromBytesUnsafe(byte[] bytes) { + return ByteBuffersAsyncRequestBody.from(bytes); + } + + /** + * Creates an {@link AsyncRequestBody} from a {@link ByteBuffer}. This will copy the contents of the {@link ByteBuffer} to + * prevent modifications to the provided {@link ByteBuffer} from being reflected in the {@link AsyncRequestBody}. + *

+ * NOTE: This method ignores the current read position. Use {@link #fromRemainingByteBuffer(ByteBuffer)} if you need + * it to copy only the remaining readable bytes. * * @param byteBuffer ByteBuffer to send to the service. * @return AsyncRequestBody instance. */ static AsyncRequestBody fromByteBuffer(ByteBuffer byteBuffer) { - return fromBytes(BinaryUtils.copyAllBytesFrom(byteBuffer)); + ByteBuffer immutableCopy = BinaryUtils.immutableCopyOf(byteBuffer); + immutableCopy.rewind(); + return ByteBuffersAsyncRequestBody.of((long) immutableCopy.remaining(), immutableCopy); + } + + /** + * Creates an {@link AsyncRequestBody} from the remaining readable bytes from a {@link ByteBuffer}. This will copy the + * remaining contents of the {@link ByteBuffer} to prevent modifications to the provided {@link ByteBuffer} from being + * reflected in the {@link AsyncRequestBody}. + *

Unlike {@link #fromByteBuffer(ByteBuffer)}, this method respects the current read position of the buffer and reads + * only the remaining bytes. + * + * @param byteBuffer ByteBuffer to send to the service. + * @return AsyncRequestBody instance. + */ + static AsyncRequestBody fromRemainingByteBuffer(ByteBuffer byteBuffer) { + ByteBuffer immutableCopy = BinaryUtils.immutableCopyOfRemaining(byteBuffer); + return ByteBuffersAsyncRequestBody.of((long) immutableCopy.remaining(), immutableCopy); + } + + /** + * Creates an {@link AsyncRequestBody} from a {@link ByteBuffer} without copying the contents of the + * {@link ByteBuffer}. This introduces concurrency risks, allowing the caller to modify the {@link ByteBuffer} stored in this + * {@code AsyncRequestBody} implementation. + *

+ * NOTE: This method ignores the current read position. Use {@link #fromRemainingByteBufferUnsafe(ByteBuffer)} if you + * need it to copy only the remaining readable bytes. + * + *

As the method name implies, this is unsafe. Use {@link #fromByteBuffer(ByteBuffer)}} unless you're sure you know the + * risks. + * + * @param byteBuffer ByteBuffer to send to the service. + * @return AsyncRequestBody instance. + */ + static AsyncRequestBody fromByteBufferUnsafe(ByteBuffer byteBuffer) { + ByteBuffer readOnlyBuffer = byteBuffer.asReadOnlyBuffer(); + readOnlyBuffer.rewind(); + return ByteBuffersAsyncRequestBody.of((long) readOnlyBuffer.remaining(), readOnlyBuffer); + } + + /** + * Creates an {@link AsyncRequestBody} from a {@link ByteBuffer} without copying the contents of the + * {@link ByteBuffer}. This introduces concurrency risks, allowing the caller to modify the {@link ByteBuffer} stored in this + * {@code AsyncRequestBody} implementation. + *

Unlike {@link #fromByteBufferUnsafe(ByteBuffer)}, this method respects the current read position of + * the buffer and reads only the remaining bytes. + * + *

As the method name implies, this is unsafe. Use {@link #fromByteBuffer(ByteBuffer)}} unless you're sure you know the + * risks. + * + * @param byteBuffer ByteBuffer to send to the service. + * @return AsyncRequestBody instance. + */ + static AsyncRequestBody fromRemainingByteBufferUnsafe(ByteBuffer byteBuffer) { + ByteBuffer readOnlyBuffer = byteBuffer.asReadOnlyBuffer(); + return ByteBuffersAsyncRequestBody.of((long) readOnlyBuffer.remaining(), readOnlyBuffer); + } + + /** + * Creates an {@link AsyncRequestBody} from a {@link ByteBuffer} array. This will copy the contents of each {@link ByteBuffer} + * to prevent modifications to any provided {@link ByteBuffer} from being reflected in the {@link AsyncRequestBody}. + *

+ * NOTE: This method ignores the current read position of each {@link ByteBuffer}. Use + * {@link #fromRemainingByteBuffers(ByteBuffer...)} if you need it to copy only the remaining readable bytes. + * + * @param byteBuffers ByteBuffer array to send to the service. + * @return AsyncRequestBody instance. + */ + static AsyncRequestBody fromByteBuffers(ByteBuffer... byteBuffers) { + ByteBuffer[] immutableCopy = Arrays.stream(byteBuffers) + .map(BinaryUtils::immutableCopyOf) + .peek(ByteBuffer::rewind) + .toArray(ByteBuffer[]::new); + return ByteBuffersAsyncRequestBody.of(immutableCopy); + } + + /** + * Creates an {@link AsyncRequestBody} from a {@link ByteBuffer} array. This will copy the remaining contents of each + * {@link ByteBuffer} to prevent modifications to any provided {@link ByteBuffer} from being reflected in the + * {@link AsyncRequestBody}. + *

Unlike {@link #fromByteBufferUnsafe(ByteBuffer)}, + * this method respects the current read position of each buffer and reads only the remaining bytes. + * + * @param byteBuffers ByteBuffer array to send to the service. + * @return AsyncRequestBody instance. + */ + static AsyncRequestBody fromRemainingByteBuffers(ByteBuffer... byteBuffers) { + ByteBuffer[] immutableCopy = Arrays.stream(byteBuffers) + .map(BinaryUtils::immutableCopyOfRemaining) + .peek(ByteBuffer::rewind) + .toArray(ByteBuffer[]::new); + return ByteBuffersAsyncRequestBody.of(immutableCopy); + } + + /** + * Creates an {@link AsyncRequestBody} from a {@link ByteBuffer} array without copying the contents of each + * {@link ByteBuffer}. This introduces concurrency risks, allowing the caller to modify any {@link ByteBuffer} stored in this + * {@code AsyncRequestBody} implementation. + *

+ * NOTE: This method ignores the current read position of each {@link ByteBuffer}. Use + * {@link #fromRemainingByteBuffers(ByteBuffer...)} if you need it to copy only the remaining readable bytes. + * + *

As the method name implies, this is unsafe. Use {@link #fromByteBuffers(ByteBuffer...)} unless you're sure you know the + * risks. + * + * @param byteBuffers ByteBuffer array to send to the service. + * @return AsyncRequestBody instance. + */ + static AsyncRequestBody fromByteBuffersUnsafe(ByteBuffer... byteBuffers) { + ByteBuffer[] readOnlyBuffers = Arrays.stream(byteBuffers) + .map(ByteBuffer::asReadOnlyBuffer) + .peek(ByteBuffer::rewind) + .toArray(ByteBuffer[]::new); + return ByteBuffersAsyncRequestBody.of(readOnlyBuffers); + } + + /** + * Creates an {@link AsyncRequestBody} from a {@link ByteBuffer} array without copying the contents of each + * {@link ByteBuffer}. This introduces concurrency risks, allowing the caller to modify any {@link ByteBuffer} stored in this + * {@code AsyncRequestBody} implementation. + *

Unlike {@link #fromByteBuffersUnsafe(ByteBuffer...)}, + * this method respects the current read position of each buffer and reads only the remaining bytes. + * + *

As the method name implies, this is unsafe. Use {@link #fromByteBuffers(ByteBuffer...)} unless you're sure you know the + * risks. + * + * @param byteBuffers ByteBuffer array to send to the service. + * @return AsyncRequestBody instance. + */ + static AsyncRequestBody fromRemainingByteBuffersUnsafe(ByteBuffer... byteBuffers) { + ByteBuffer[] readOnlyBuffers = Arrays.stream(byteBuffers) + .map(ByteBuffer::asReadOnlyBuffer) + .toArray(ByteBuffer[]::new); + return ByteBuffersAsyncRequestBody.of(readOnlyBuffers); } /** - * Creates a {@link AsyncRequestBody} from a {@link InputStream}. + * Creates an {@link AsyncRequestBody} from an {@link InputStream}. * *

An {@link ExecutorService} is required in order to perform the blocking data reads, to prevent blocking the * non-blocking event loop threads owned by the SDK. @@ -242,7 +395,7 @@ static BlockingOutputStreamAsyncRequestBody forBlockingOutputStream(Long content } /** - * Creates a {@link AsyncRequestBody} with no content. + * Creates an {@link AsyncRequestBody} with no content. * * @return AsyncRequestBody instance. */ diff --git a/core/sdk-core/src/main/java/software/amazon/awssdk/core/internal/async/ByteArrayAsyncRequestBody.java b/core/sdk-core/src/main/java/software/amazon/awssdk/core/internal/async/ByteArrayAsyncRequestBody.java deleted file mode 100644 index 29205479b798..000000000000 --- a/core/sdk-core/src/main/java/software/amazon/awssdk/core/internal/async/ByteArrayAsyncRequestBody.java +++ /dev/null @@ -1,98 +0,0 @@ -/* - * Copyright Amazon.com, Inc. or its affiliates. All Rights Reserved. - * - * Licensed under the Apache License, Version 2.0 (the "License"). - * You may not use this file except in compliance with the License. - * A copy of the License is located at - * - * http://aws.amazon.com/apache2.0 - * - * or in the "license" file accompanying this file. This file is distributed - * on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either - * express or implied. See the License for the specific language governing - * permissions and limitations under the License. - */ - -package software.amazon.awssdk.core.internal.async; - -import java.nio.ByteBuffer; -import java.util.Optional; -import org.reactivestreams.Subscriber; -import org.reactivestreams.Subscription; -import software.amazon.awssdk.annotations.SdkInternalApi; -import software.amazon.awssdk.core.async.AsyncRequestBody; -import software.amazon.awssdk.utils.Logger; - -/** - * An implementation of {@link AsyncRequestBody} for providing data from memory. This is created using static - * methods on {@link AsyncRequestBody} - * - * @see AsyncRequestBody#fromBytes(byte[]) - * @see AsyncRequestBody#fromByteBuffer(ByteBuffer) - * @see AsyncRequestBody#fromString(String) - */ -@SdkInternalApi -public final class ByteArrayAsyncRequestBody implements AsyncRequestBody { - private static final Logger log = Logger.loggerFor(ByteArrayAsyncRequestBody.class); - - private final byte[] bytes; - - private final String mimetype; - - public ByteArrayAsyncRequestBody(byte[] bytes, String mimetype) { - this.bytes = bytes.clone(); - this.mimetype = mimetype; - } - - @Override - public Optional contentLength() { - return Optional.of((long) bytes.length); - } - - @Override - public String contentType() { - return mimetype; - } - - @Override - public void subscribe(Subscriber s) { - // As per rule 1.9 we must throw NullPointerException if the subscriber parameter is null - if (s == null) { - throw new NullPointerException("Subscription MUST NOT be null."); - } - - // As per 2.13, this method must return normally (i.e. not throw). - try { - s.onSubscribe( - new Subscription() { - private boolean done = false; - - @Override - public void request(long n) { - if (done) { - return; - } - if (n > 0) { - done = true; - s.onNext(ByteBuffer.wrap(bytes)); - s.onComplete(); - } else { - s.onError(new IllegalArgumentException("§3.9: non-positive requests are not allowed!")); - } - } - - @Override - public void cancel() { - synchronized (this) { - if (!done) { - done = true; - } - } - } - } - ); - } catch (Throwable ex) { - log.error(() -> s + " violated the Reactive Streams rule 2.13 by throwing an exception from onSubscribe.", ex); - } - } -} diff --git a/core/sdk-core/src/main/java/software/amazon/awssdk/core/internal/async/ByteBuffersAsyncRequestBody.java b/core/sdk-core/src/main/java/software/amazon/awssdk/core/internal/async/ByteBuffersAsyncRequestBody.java new file mode 100644 index 000000000000..e7e9d00dd0e5 --- /dev/null +++ b/core/sdk-core/src/main/java/software/amazon/awssdk/core/internal/async/ByteBuffersAsyncRequestBody.java @@ -0,0 +1,157 @@ +/* + * Copyright Amazon.com, Inc. or its affiliates. All Rights Reserved. + * + * Licensed under the Apache License, Version 2.0 (the "License"). + * You may not use this file except in compliance with the License. + * A copy of the License is located at + * + * http://aws.amazon.com/apache2.0 + * + * or in the "license" file accompanying this file. This file is distributed + * on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either + * express or implied. See the License for the specific language governing + * permissions and limitations under the License. + */ + +package software.amazon.awssdk.core.internal.async; + +import java.nio.ByteBuffer; +import java.util.Arrays; +import java.util.Optional; +import java.util.concurrent.atomic.AtomicBoolean; +import java.util.concurrent.atomic.AtomicInteger; +import org.reactivestreams.Subscriber; +import org.reactivestreams.Subscription; +import software.amazon.awssdk.annotations.SdkInternalApi; +import software.amazon.awssdk.core.async.AsyncRequestBody; +import software.amazon.awssdk.core.internal.util.Mimetype; +import software.amazon.awssdk.utils.BinaryUtils; +import software.amazon.awssdk.utils.Logger; + +/** + * An implementation of {@link AsyncRequestBody} for providing data from the supplied {@link ByteBuffer} array. This is created + * using static methods on {@link AsyncRequestBody} + * + * @see AsyncRequestBody#fromBytes(byte[]) + * @see AsyncRequestBody#fromBytesUnsafe(byte[]) + * @see AsyncRequestBody#fromByteBuffer(ByteBuffer) + * @see AsyncRequestBody#fromByteBufferUnsafe(ByteBuffer) + * @see AsyncRequestBody#fromByteBuffers(ByteBuffer...) + * @see AsyncRequestBody#fromByteBuffersUnsafe(ByteBuffer...) + * @see AsyncRequestBody#fromString(String) + */ +@SdkInternalApi +public final class ByteBuffersAsyncRequestBody implements AsyncRequestBody { + private static final Logger log = Logger.loggerFor(ByteBuffersAsyncRequestBody.class); + + private final String mimetype; + private final Long length; + private final ByteBuffer[] buffers; + + private ByteBuffersAsyncRequestBody(String mimetype, Long length, ByteBuffer... buffers) { + this.mimetype = mimetype; + this.length = length; + this.buffers = buffers; + } + + @Override + public Optional contentLength() { + return Optional.ofNullable(length); + } + + @Override + public String contentType() { + return mimetype; + } + + @Override + public void subscribe(Subscriber s) { + // As per rule 1.9 we must throw NullPointerException if the subscriber parameter is null + if (s == null) { + throw new NullPointerException("Subscription MUST NOT be null."); + } + + // As per 2.13, this method must return normally (i.e. not throw). + try { + s.onSubscribe( + new Subscription() { + private final AtomicInteger index = new AtomicInteger(0); + private final AtomicBoolean completed = new AtomicBoolean(false); + + @Override + public void request(long n) { + if (completed.get()) { + return; + } + + if (n > 0) { + int i = index.getAndIncrement(); + + if (i >= buffers.length) { + return; + } + + long remaining = n; + + do { + ByteBuffer buffer = buffers[i]; + + // Pending discussions on https://github.com/aws/aws-sdk-java-v2/issues/3928 + if (buffer.isDirect()) { + buffer = BinaryUtils.toNonDirectBuffer(buffer); + } + + s.onNext(buffer.asReadOnlyBuffer()); + remaining--; + } while (remaining > 0 && (i = index.getAndIncrement()) < buffers.length); + + if (i >= buffers.length - 1 && completed.compareAndSet(false, true)) { + s.onComplete(); + } + } else { + s.onError(new IllegalArgumentException("§3.9: non-positive requests are not allowed!")); + } + } + + @Override + public void cancel() { + completed.set(true); + } + } + ); + } catch (Throwable ex) { + log.error(() -> s + " violated the Reactive Streams rule 2.13 by throwing an exception from onSubscribe.", ex); + } + } + + public static ByteBuffersAsyncRequestBody of(ByteBuffer... buffers) { + long length = Arrays.stream(buffers) + .mapToLong(ByteBuffer::remaining) + .sum(); + return new ByteBuffersAsyncRequestBody(Mimetype.MIMETYPE_OCTET_STREAM, length, buffers); + } + + public static ByteBuffersAsyncRequestBody of(Long length, ByteBuffer... buffers) { + return new ByteBuffersAsyncRequestBody(Mimetype.MIMETYPE_OCTET_STREAM, length, buffers); + } + + public static ByteBuffersAsyncRequestBody of(String mimetype, ByteBuffer... buffers) { + long length = Arrays.stream(buffers) + .mapToLong(ByteBuffer::remaining) + .sum(); + return new ByteBuffersAsyncRequestBody(mimetype, length, buffers); + } + + public static ByteBuffersAsyncRequestBody of(String mimetype, Long length, ByteBuffer... buffers) { + return new ByteBuffersAsyncRequestBody(mimetype, length, buffers); + } + + public static ByteBuffersAsyncRequestBody from(byte[] bytes) { + return new ByteBuffersAsyncRequestBody(Mimetype.MIMETYPE_OCTET_STREAM, (long) bytes.length, + ByteBuffer.wrap(bytes)); + } + + public static ByteBuffersAsyncRequestBody from(String mimetype, byte[] bytes) { + return new ByteBuffersAsyncRequestBody(mimetype, (long) bytes.length, ByteBuffer.wrap(bytes)); + } +} diff --git a/core/sdk-core/src/main/java/software/amazon/awssdk/core/internal/async/ChunkBuffer.java b/core/sdk-core/src/main/java/software/amazon/awssdk/core/internal/async/ChunkBuffer.java index 8fd7f0260b76..93d6d09578a6 100644 --- a/core/sdk-core/src/main/java/software/amazon/awssdk/core/internal/async/ChunkBuffer.java +++ b/core/sdk-core/src/main/java/software/amazon/awssdk/core/internal/async/ChunkBuffer.java @@ -22,6 +22,7 @@ import java.util.List; import java.util.concurrent.atomic.AtomicLong; import software.amazon.awssdk.annotations.SdkInternalApi; +import software.amazon.awssdk.utils.BinaryUtils; import software.amazon.awssdk.utils.Validate; import software.amazon.awssdk.utils.builder.SdkBuilder; @@ -58,10 +59,11 @@ public synchronized Iterable bufferAndCreateChunks(ByteBuffer buffer int availableToRead = bufferSize - bufferedBytes; int bytesToMove = Math.min(availableToRead, currentBytesRead - startPosition); + byte[] bytes = BinaryUtils.copyAllBytesFrom(buffer); if (bufferedBytes == 0) { - currentBuffer.put(buffer.array(), startPosition, bytesToMove); + currentBuffer.put(bytes, startPosition, bytesToMove); } else { - currentBuffer.put(buffer.array(), 0, bytesToMove); + currentBuffer.put(bytes, 0, bytesToMove); } startPosition = startPosition + bytesToMove; diff --git a/core/sdk-core/src/test/java/software/amazon/awssdk/core/internal/async/ByteArrayAsyncRequestBodyTest.java b/core/sdk-core/src/test/java/software/amazon/awssdk/core/internal/async/ByteArrayAsyncRequestBodyTest.java deleted file mode 100644 index 378fbf2f59c3..000000000000 --- a/core/sdk-core/src/test/java/software/amazon/awssdk/core/internal/async/ByteArrayAsyncRequestBodyTest.java +++ /dev/null @@ -1,65 +0,0 @@ -/* - * Copyright Amazon.com, Inc. or its affiliates. All Rights Reserved. - * - * Licensed under the Apache License, Version 2.0 (the "License"). - * You may not use this file except in compliance with the License. - * A copy of the License is located at - * - * http://aws.amazon.com/apache2.0 - * - * or in the "license" file accompanying this file. This file is distributed - * on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either - * express or implied. See the License for the specific language governing - * permissions and limitations under the License. - */ - -package software.amazon.awssdk.core.internal.async; - -import static org.junit.jupiter.api.Assertions.assertTrue; - -import java.nio.ByteBuffer; -import java.util.concurrent.atomic.AtomicBoolean; -import org.junit.jupiter.api.Test; -import org.reactivestreams.Subscriber; -import org.reactivestreams.Subscription; -import software.amazon.awssdk.core.internal.util.Mimetype; - -public class ByteArrayAsyncRequestBodyTest { - private class testSubscriber implements Subscriber { - private Subscription subscription; - protected AtomicBoolean onCompleteCalled = new AtomicBoolean(false); - - @Override - public void onSubscribe(Subscription s) { - this.subscription = s; - s.request(1); - } - - @Override - public void onNext(ByteBuffer byteBuffer) { - - } - - @Override - public void onError(Throwable throwable) { - - } - - @Override - public void onComplete() { - subscription.request(1); - onCompleteCalled.set(true); - } - } - - testSubscriber subscriber = new testSubscriber(); - - @Test - public void concurrentRequests_shouldCompleteNormally() { - ByteArrayAsyncRequestBody byteArrayReq = new ByteArrayAsyncRequestBody("Hello World!".getBytes(), - Mimetype.MIMETYPE_OCTET_STREAM); - byteArrayReq.subscribe(subscriber); - assertTrue(subscriber.onCompleteCalled.get()); - } - -} diff --git a/core/sdk-core/src/test/java/software/amazon/awssdk/core/internal/async/ByteBuffersAsyncRequestBodyTest.java b/core/sdk-core/src/test/java/software/amazon/awssdk/core/internal/async/ByteBuffersAsyncRequestBodyTest.java new file mode 100644 index 000000000000..b4073247f8b9 --- /dev/null +++ b/core/sdk-core/src/test/java/software/amazon/awssdk/core/internal/async/ByteBuffersAsyncRequestBodyTest.java @@ -0,0 +1,227 @@ +/* + * Copyright Amazon.com, Inc. or its affiliates. All Rights Reserved. + * + * Licensed under the Apache License, Version 2.0 (the "License"). + * You may not use this file except in compliance with the License. + * A copy of the License is located at + * + * http://aws.amazon.com/apache2.0 + * + * or in the "license" file accompanying this file. This file is distributed + * on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either + * express or implied. See the License for the specific language governing + * permissions and limitations under the License. + */ + +package software.amazon.awssdk.core.internal.async; + +import static org.junit.jupiter.api.Assertions.assertArrayEquals; +import static org.junit.jupiter.api.Assertions.assertEquals; +import static org.junit.jupiter.api.Assertions.assertFalse; +import static org.junit.jupiter.api.Assertions.assertTrue; + +import java.nio.ByteBuffer; +import java.nio.charset.StandardCharsets; +import java.util.ArrayList; +import java.util.Collections; +import java.util.List; +import java.util.concurrent.ExecutorService; +import java.util.concurrent.Executors; +import java.util.concurrent.TimeUnit; +import java.util.stream.IntStream; +import org.junit.jupiter.api.Test; +import org.reactivestreams.Subscriber; +import org.reactivestreams.Subscription; +import software.amazon.awssdk.core.async.AsyncRequestBody; +import software.amazon.awssdk.utils.BinaryUtils; + +class ByteBuffersAsyncRequestBodyTest { + + private static class TestSubscriber implements Subscriber { + private Subscription subscription; + private boolean onCompleteCalled = false; + private int callsToComplete = 0; + private final List publishedResults = Collections.synchronizedList(new ArrayList<>()); + + public void request(long n) { + subscription.request(n); + } + + @Override + public void onSubscribe(Subscription s) { + this.subscription = s; + } + + @Override + public void onNext(ByteBuffer byteBuffer) { + publishedResults.add(byteBuffer); + } + + @Override + public void onError(Throwable throwable) { + throw new IllegalStateException(throwable); + } + + @Override + public void onComplete() { + onCompleteCalled = true; + callsToComplete++; + } + } + + @Test + public void subscriberIsMarkedAsCompleted() { + AsyncRequestBody requestBody = ByteBuffersAsyncRequestBody.from("Hello World!".getBytes(StandardCharsets.UTF_8)); + + TestSubscriber subscriber = new TestSubscriber(); + requestBody.subscribe(subscriber); + subscriber.request(1); + + assertTrue(subscriber.onCompleteCalled); + assertEquals(1, subscriber.publishedResults.size()); + } + + @Test + public void subscriberIsMarkedAsCompletedWhenARequestIsMadeForMoreBuffersThanAreAvailable() { + AsyncRequestBody requestBody = ByteBuffersAsyncRequestBody.from("Hello World!".getBytes(StandardCharsets.UTF_8)); + + TestSubscriber subscriber = new TestSubscriber(); + requestBody.subscribe(subscriber); + subscriber.request(2); + + assertTrue(subscriber.onCompleteCalled); + assertEquals(1, subscriber.publishedResults.size()); + } + + @Test + public void subscriberIsThreadSafeAndMarkedAsCompletedExactlyOnce() throws InterruptedException { + int numBuffers = 100; + AsyncRequestBody requestBody = ByteBuffersAsyncRequestBody.of(IntStream.range(0, numBuffers) + .mapToObj(i -> ByteBuffer.wrap(new byte[1])) + .toArray(ByteBuffer[]::new)); + + TestSubscriber subscriber = new TestSubscriber(); + requestBody.subscribe(subscriber); + + int parallelism = 8; + ExecutorService executorService = Executors.newFixedThreadPool(parallelism); + for (int i = 0; i < parallelism; i++) { + executorService.submit(() -> { + for (int j = 0; j < numBuffers; j++) { + subscriber.request(2); + } + }); + } + executorService.shutdown(); + executorService.awaitTermination(1, TimeUnit.MINUTES); + + assertTrue(subscriber.onCompleteCalled); + assertEquals(1, subscriber.callsToComplete); + assertEquals(numBuffers, subscriber.publishedResults.size()); + } + + @Test + public void subscriberIsNotMarkedAsCompletedWhenThereAreRemainingBuffersToPublish() { + byte[] helloWorld = "Hello World!".getBytes(StandardCharsets.UTF_8); + byte[] goodbyeWorld = "Goodbye World!".getBytes(StandardCharsets.UTF_8); + AsyncRequestBody requestBody = ByteBuffersAsyncRequestBody.of((long) (helloWorld.length + goodbyeWorld.length), + ByteBuffer.wrap(helloWorld), + ByteBuffer.wrap(goodbyeWorld)); + + TestSubscriber subscriber = new TestSubscriber(); + requestBody.subscribe(subscriber); + subscriber.request(1); + + assertFalse(subscriber.onCompleteCalled); + assertEquals(1, subscriber.publishedResults.size()); + } + + @Test + public void subscriberReceivesAllBuffers() { + byte[] helloWorld = "Hello World!".getBytes(StandardCharsets.UTF_8); + byte[] goodbyeWorld = "Goodbye World!".getBytes(StandardCharsets.UTF_8); + + AsyncRequestBody requestBody = ByteBuffersAsyncRequestBody.of((long) (helloWorld.length + goodbyeWorld.length), + ByteBuffer.wrap(helloWorld), + ByteBuffer.wrap(goodbyeWorld)); + + TestSubscriber subscriber = new TestSubscriber(); + requestBody.subscribe(subscriber); + subscriber.request(2); + + assertEquals(2, subscriber.publishedResults.size()); + assertTrue(subscriber.onCompleteCalled); + assertArrayEquals(helloWorld, BinaryUtils.copyAllBytesFrom(subscriber.publishedResults.get(0))); + assertArrayEquals(goodbyeWorld, BinaryUtils.copyAllBytesFrom(subscriber.publishedResults.get(1))); + } + + @Test + public void multipleSubscribersReceiveTheSameResults() { + ByteBuffer sourceBuffer = ByteBuffer.wrap("Hello World!".getBytes(StandardCharsets.UTF_8)); + AsyncRequestBody requestBody = ByteBuffersAsyncRequestBody.of(sourceBuffer); + + TestSubscriber subscriber = new TestSubscriber(); + requestBody.subscribe(subscriber); + subscriber.request(1); + TestSubscriber otherSubscriber = new TestSubscriber(); + requestBody.subscribe(otherSubscriber); + otherSubscriber.request(1); + + ByteBuffer publishedBuffer = subscriber.publishedResults.get(0); + ByteBuffer otherPublishedBuffer = otherSubscriber.publishedResults.get(0); + + assertEquals(publishedBuffer, otherPublishedBuffer); + } + + @Test + public void canceledSubscriberDoesNotReturnNewResults() { + AsyncRequestBody requestBody = ByteBuffersAsyncRequestBody.of(ByteBuffer.wrap(new byte[0])); + + TestSubscriber subscriber = new TestSubscriber(); + requestBody.subscribe(subscriber); + + subscriber.subscription.cancel(); + subscriber.request(1); + + assertTrue(subscriber.publishedResults.isEmpty()); + } + + // Pending discussions on https://github.com/aws/aws-sdk-java-v2/issues/3928 + @Test + public void directBuffersAreCoppiedToNonDirectBuffers() { + byte[] bytes = "Hello World!".getBytes(StandardCharsets.UTF_8); + ByteBuffer buffer = ByteBuffer.allocateDirect(bytes.length) + .put(bytes); + buffer.flip(); + AsyncRequestBody requestBody = ByteBuffersAsyncRequestBody.of(buffer); + + TestSubscriber subscriber = new TestSubscriber(); + requestBody.subscribe(subscriber); + subscriber.request(1); + + ByteBuffer publishedBuffer = subscriber.publishedResults.get(0); + assertFalse(publishedBuffer.isDirect()); + byte[] publishedBytes = new byte[publishedBuffer.remaining()]; + publishedBuffer.get(publishedBytes); + assertArrayEquals(bytes, publishedBytes); + } + + @Test + public void staticOfByteBufferConstructorSetsLengthBasedOnBufferRemaining() { + ByteBuffer bb1 = ByteBuffer.allocate(2); + ByteBuffer bb2 = ByteBuffer.allocate(2); + bb2.position(1); + ByteBuffersAsyncRequestBody body = ByteBuffersAsyncRequestBody.of(bb1, bb2); + assertTrue(body.contentLength().isPresent()); + assertEquals(bb1.remaining() + bb2.remaining(), body.contentLength().get()); + } + + @Test + public void staticFromBytesConstructorSetsLengthBasedOnArrayLength() { + byte[] bytes = new byte[2]; + ByteBuffersAsyncRequestBody body = ByteBuffersAsyncRequestBody.from(bytes); + assertTrue(body.contentLength().isPresent()); + assertEquals(bytes.length, body.contentLength().get()); + } + +} diff --git a/utils/src/main/java/software/amazon/awssdk/utils/BinaryUtils.java b/utils/src/main/java/software/amazon/awssdk/utils/BinaryUtils.java index e7fd8c015e1d..192ea7cead9b 100644 --- a/utils/src/main/java/software/amazon/awssdk/utils/BinaryUtils.java +++ b/utils/src/main/java/software/amazon/awssdk/utils/BinaryUtils.java @@ -117,6 +117,80 @@ public static ByteArrayInputStream toStream(ByteBuffer byteBuffer) { return new ByteArrayInputStream(copyBytesFrom(byteBuffer)); } + /** + * Returns an immutable copy of the given {@code ByteBuffer}. + *

+ * The new buffer's position will be set to the position of the given {@code ByteBuffer}, but the mark if defined will be + * ignored. + *

+ * NOTE: this method intentionally converts direct buffers to non-direct though there is no guarantee this will always + * be the case, if this is required see {@link #toNonDirectBuffer(ByteBuffer)} + * + * @param bb the source {@code ByteBuffer} to copy. + * @return a read only {@code ByteBuffer}. + */ + public static ByteBuffer immutableCopyOf(ByteBuffer bb) { + if (bb == null) { + return null; + } + int sourceBufferPosition = bb.position(); + ByteBuffer readOnlyCopy = bb.asReadOnlyBuffer(); + readOnlyCopy.rewind(); + ByteBuffer cloned = ByteBuffer.allocate(readOnlyCopy.capacity()) + .put(readOnlyCopy); + cloned.position(sourceBufferPosition); + return cloned.asReadOnlyBuffer(); + } + + /** + * Returns an immutable copy of the remaining bytes of the given {@code ByteBuffer}. + *

+ * NOTE: this method intentionally converts direct buffers to non-direct though there is no guarantee this will always + * be the case, if this is required see {@link #toNonDirectBuffer(ByteBuffer)} + * + * @param bb the source {@code ByteBuffer} to copy. + * @return a read only {@code ByteBuffer}. + */ + public static ByteBuffer immutableCopyOfRemaining(ByteBuffer bb) { + if (bb == null) { + return null; + } + ByteBuffer readOnlyCopy = bb.asReadOnlyBuffer(); + ByteBuffer cloned = ByteBuffer.allocate(readOnlyCopy.remaining()) + .put(readOnlyCopy); + cloned.flip(); + return cloned.asReadOnlyBuffer(); + } + + /** + * Returns a copy of the given {@code DirectByteBuffer} from its current position as a non-direct {@code HeapByteBuffer} + *

+ * The new buffer's position will be set to the position of the given {@code ByteBuffer}, but the mark if defined will be + * ignored. + * + * @param bb the source {@code ByteBuffer} to copy. + * @return {@code ByteBuffer}. + */ + public static ByteBuffer toNonDirectBuffer(ByteBuffer bb) { + if (bb == null) { + return null; + } + if (!bb.isDirect()) { + throw new IllegalArgumentException("Provided ByteBuffer is already non-direct"); + } + int sourceBufferPosition = bb.position(); + ByteBuffer readOnlyCopy = bb.asReadOnlyBuffer(); + readOnlyCopy.rewind(); + ByteBuffer cloned = ByteBuffer.allocate(bb.capacity()) + .put(readOnlyCopy); + cloned.rewind(); + cloned.position(sourceBufferPosition); + if (bb.isReadOnly()) { + return cloned.asReadOnlyBuffer(); + } + return cloned; + } + /** * Returns a copy of all the bytes from the given ByteBuffer, * from the beginning to the buffer's limit; or null if the input is null. diff --git a/utils/src/test/java/software/amazon/awssdk/utils/BinaryUtilsTest.java b/utils/src/test/java/software/amazon/awssdk/utils/BinaryUtilsTest.java index 5f255d347adc..4e416ea9e3b6 100644 --- a/utils/src/test/java/software/amazon/awssdk/utils/BinaryUtilsTest.java +++ b/utils/src/test/java/software/amazon/awssdk/utils/BinaryUtilsTest.java @@ -16,9 +16,11 @@ package software.amazon.awssdk.utils; import static org.assertj.core.api.Assertions.assertThat; +import static org.junit.jupiter.api.Assertions.assertArrayEquals; import static org.junit.jupiter.api.Assertions.assertEquals; import static org.junit.jupiter.api.Assertions.assertFalse; import static org.junit.jupiter.api.Assertions.assertNull; +import static org.junit.jupiter.api.Assertions.assertThrows; import static org.junit.jupiter.api.Assertions.assertTrue; import java.nio.ByteBuffer; @@ -32,13 +34,11 @@ public class BinaryUtilsTest { public void testHex() { { String hex = BinaryUtils.toHex(new byte[] {0}); - System.out.println(hex); String hex2 = Base16Lower.encodeAsString(new byte[] {0}); assertEquals(hex, hex2); } { String hex = BinaryUtils.toHex(new byte[] {-1}); - System.out.println(hex); String hex2 = Base16Lower.encodeAsString(new byte[] {-1}); assertEquals(hex, hex2); } @@ -169,7 +169,7 @@ public void testCopyRemainingBytesFrom_nullBuffer() { @Test public void testCopyRemainingBytesFrom_noRemainingBytes() { ByteBuffer bb = ByteBuffer.allocate(1); - bb.put(new byte[]{1}); + bb.put(new byte[] {1}); bb.flip(); bb.get(); @@ -180,7 +180,7 @@ public void testCopyRemainingBytesFrom_noRemainingBytes() { @Test public void testCopyRemainingBytesFrom_fullBuffer() { ByteBuffer bb = ByteBuffer.allocate(4); - bb.put(new byte[]{1, 2, 3, 4}); + bb.put(new byte[] {1, 2, 3, 4}); bb.flip(); byte[] copy = BinaryUtils.copyRemainingBytesFrom(bb); @@ -191,7 +191,7 @@ public void testCopyRemainingBytesFrom_fullBuffer() { @Test public void testCopyRemainingBytesFrom_partiallyReadBuffer() { ByteBuffer bb = ByteBuffer.allocate(4); - bb.put(new byte[]{1, 2, 3, 4}); + bb.put(new byte[] {1, 2, 3, 4}); bb.flip(); bb.get(); @@ -201,4 +201,137 @@ public void testCopyRemainingBytesFrom_partiallyReadBuffer() { assertThat(bb).isEqualTo(ByteBuffer.wrap(copy)); assertThat(copy).hasSize(2); } + + @Test + public void testImmutableCopyOfByteBuffer() { + ByteBuffer sourceBuffer = ByteBuffer.allocate(4); + byte[] originalBytesInSource = {1, 2, 3, 4}; + sourceBuffer.put(originalBytesInSource); + sourceBuffer.flip(); + + ByteBuffer immutableCopy = BinaryUtils.immutableCopyOf(sourceBuffer); + + byte[] bytesInSourceAfterCopy = {-1, -2, -3, -4}; + sourceBuffer.put(bytesInSourceAfterCopy); + sourceBuffer.flip(); + + assertTrue(immutableCopy.isReadOnly()); + byte[] fromImmutableCopy = new byte[originalBytesInSource.length]; + immutableCopy.get(fromImmutableCopy); + assertArrayEquals(originalBytesInSource, fromImmutableCopy); + + assertEquals(0, sourceBuffer.position()); + byte[] fromSource = new byte[bytesInSourceAfterCopy.length]; + sourceBuffer.get(fromSource); + assertArrayEquals(bytesInSourceAfterCopy, fromSource); + } + + @Test + public void testImmutableCopyOfByteBuffer_nullBuffer() { + assertNull(BinaryUtils.immutableCopyOf(null)); + } + + @Test + public void testImmutableCopyOfByteBuffer_partiallyReadBuffer() { + ByteBuffer sourceBuffer = ByteBuffer.allocate(4); + byte[] bytes = {1, 2, 3, 4}; + sourceBuffer.put(bytes); + sourceBuffer.position(2); + + ByteBuffer immutableCopy = BinaryUtils.immutableCopyOf(sourceBuffer); + + assertEquals(sourceBuffer.position(), immutableCopy.position()); + immutableCopy.rewind(); + byte[] fromImmutableCopy = new byte[bytes.length]; + immutableCopy.get(fromImmutableCopy); + assertArrayEquals(bytes, fromImmutableCopy); + } + + @Test + public void testImmutableCopyOfRemainingByteBuffer() { + ByteBuffer sourceBuffer = ByteBuffer.allocate(4); + byte[] originalBytesInSource = {1, 2, 3, 4}; + sourceBuffer.put(originalBytesInSource); + sourceBuffer.flip(); + + ByteBuffer immutableCopy = BinaryUtils.immutableCopyOfRemaining(sourceBuffer); + + byte[] bytesInSourceAfterCopy = {-1, -2, -3, -4}; + sourceBuffer.put(bytesInSourceAfterCopy); + sourceBuffer.flip(); + + assertTrue(immutableCopy.isReadOnly()); + byte[] fromImmutableCopy = new byte[originalBytesInSource.length]; + immutableCopy.get(fromImmutableCopy); + assertArrayEquals(originalBytesInSource, fromImmutableCopy); + + assertEquals(0, sourceBuffer.position()); + byte[] fromSource = new byte[bytesInSourceAfterCopy.length]; + sourceBuffer.get(fromSource); + assertArrayEquals(bytesInSourceAfterCopy, fromSource); + } + + @Test + public void testImmutableCopyOfByteBufferRemaining_nullBuffer() { + assertNull(BinaryUtils.immutableCopyOfRemaining(null)); + } + + @Test + public void testImmutableCopyOfByteBufferRemaining_partiallyReadBuffer() { + ByteBuffer sourceBuffer = ByteBuffer.allocate(4); + byte[] bytes = {1, 2, 3, 4}; + sourceBuffer.put(bytes); + sourceBuffer.position(2); + + ByteBuffer immutableCopy = BinaryUtils.immutableCopyOfRemaining(sourceBuffer); + + assertEquals(2, immutableCopy.capacity()); + assertEquals(2, immutableCopy.remaining()); + assertEquals(0, immutableCopy.position()); + assertEquals((byte) 3, immutableCopy.get()); + assertEquals((byte) 4, immutableCopy.get()); + } + + @Test + public void testToNonDirectBuffer() { + ByteBuffer bb = ByteBuffer.allocateDirect(4); + byte[] expected = {1, 2, 3, 4}; + bb.put(expected); + bb.flip(); + + ByteBuffer nonDirectBuffer = BinaryUtils.toNonDirectBuffer(bb); + + assertFalse(nonDirectBuffer.isDirect()); + byte[] bytes = new byte[expected.length]; + nonDirectBuffer.get(bytes); + assertArrayEquals(expected, bytes); + } + + @Test + public void testToNonDirectBuffer_nullBuffer() { + assertNull(BinaryUtils.toNonDirectBuffer(null)); + } + + @Test + public void testToNonDirectBuffer_partiallyReadBuffer() { + ByteBuffer sourceBuffer = ByteBuffer.allocateDirect(4); + byte[] bytes = {1, 2, 3, 4}; + sourceBuffer.put(bytes); + sourceBuffer.position(2); + + ByteBuffer nonDirectBuffer = BinaryUtils.toNonDirectBuffer(sourceBuffer); + + assertEquals(sourceBuffer.position(), nonDirectBuffer.position()); + nonDirectBuffer.rewind(); + byte[] fromNonDirectBuffer = new byte[bytes.length]; + nonDirectBuffer.get(fromNonDirectBuffer); + assertArrayEquals(bytes, fromNonDirectBuffer); + } + + @Test + public void testToNonDirectBuffer_nonDirectBuffer() { + ByteBuffer nonDirectBuffer = ByteBuffer.allocate(0); + assertThrows(IllegalArgumentException.class, () -> BinaryUtils.toNonDirectBuffer(nonDirectBuffer)); + } + }