diff --git a/.changes/next-release/feature-AWSSDKforJavav2-5d806ad.json b/.changes/next-release/feature-AWSSDKforJavav2-5d806ad.json
new file mode 100644
index 000000000000..097e330352d5
--- /dev/null
+++ b/.changes/next-release/feature-AWSSDKforJavav2-5d806ad.json
@@ -0,0 +1,6 @@
+{
+ "type": "feature",
+ "category": "AWS SDK for Java v2",
+ "contributor": "StephenFlavin",
+ "description": "Add \"unsafe\" AsyncRequestBody constructors for byte arrays and ByteBuffers"
+}
diff --git a/core/sdk-core/src/main/java/software/amazon/awssdk/core/async/AsyncRequestBody.java b/core/sdk-core/src/main/java/software/amazon/awssdk/core/async/AsyncRequestBody.java
index 7a1738f51d97..052ec4b95a6e 100644
--- a/core/sdk-core/src/main/java/software/amazon/awssdk/core/async/AsyncRequestBody.java
+++ b/core/sdk-core/src/main/java/software/amazon/awssdk/core/async/AsyncRequestBody.java
@@ -22,37 +22,38 @@
import java.nio.charset.Charset;
import java.nio.charset.StandardCharsets;
import java.nio.file.Path;
+import java.util.Arrays;
import java.util.Optional;
import java.util.concurrent.ExecutorService;
import org.reactivestreams.Publisher;
import org.reactivestreams.Subscriber;
import software.amazon.awssdk.annotations.SdkPublicApi;
-import software.amazon.awssdk.core.internal.async.ByteArrayAsyncRequestBody;
+import software.amazon.awssdk.core.internal.async.ByteBuffersAsyncRequestBody;
import software.amazon.awssdk.core.internal.async.FileAsyncRequestBody;
import software.amazon.awssdk.core.internal.async.InputStreamWithExecutorAsyncRequestBody;
import software.amazon.awssdk.core.internal.util.Mimetype;
import software.amazon.awssdk.utils.BinaryUtils;
/**
- * Interface to allow non-blocking streaming of request content. This follows the reactive streams pattern where
- * this interface is the {@link Publisher} of data (specifically {@link ByteBuffer} chunks) and the HTTP client is the Subscriber
- * of the data (i.e. to write that data on the wire).
+ * Interface to allow non-blocking streaming of request content. This follows the reactive streams pattern where this interface is
+ * the {@link Publisher} of data (specifically {@link ByteBuffer} chunks) and the HTTP client is the Subscriber of the data (i.e.
+ * to write that data on the wire).
*
*
* {@link #subscribe(Subscriber)} should be implemented to tie this publisher to a subscriber. Ideally each call to subscribe
- * should reproduce the content (i.e if you are reading from a file each subscribe call should produce a {@link
- * org.reactivestreams.Subscription} that reads the file fully). This allows for automatic retries to be performed in the SDK. If
- * the content is not reproducible, an exception may be thrown from any subsequent {@link #subscribe(Subscriber)} calls.
+ * should reproduce the content (i.e if you are reading from a file each subscribe call should produce a
+ * {@link org.reactivestreams.Subscription} that reads the file fully). This allows for automatic retries to be performed in the
+ * SDK. If the content is not reproducible, an exception may be thrown from any subsequent {@link #subscribe(Subscriber)} calls.
*
*
*
- * It is important to only send the number of chunks that the subscriber requests to avoid out of memory situations.
- * The subscriber does it's own buffering so it's usually not needed to buffer in the publisher. Additional permits
- * for chunks will be notified via the {@link org.reactivestreams.Subscription#request(long)} method.
+ * It is important to only send the number of chunks that the subscriber requests to avoid out of memory situations. The
+ * subscriber does it's own buffering so it's usually not needed to buffer in the publisher. Additional permits for chunks will be
+ * notified via the {@link org.reactivestreams.Subscription#request(long)} method.
*
*
* @see FileAsyncRequestBody
- * @see ByteArrayAsyncRequestBody
+ * @see ByteBuffersAsyncRequestBody
*/
@SdkPublicApi
public interface AsyncRequestBody extends SdkPublisher {
@@ -70,8 +71,8 @@ default String contentType() {
}
/**
- * Creates an {@link AsyncRequestBody} the produces data from the input ByteBuffer publisher.
- * The data is delivered when the publisher publishes the data.
+ * Creates an {@link AsyncRequestBody} the produces data from the input ByteBuffer publisher. The data is delivered when the
+ * publisher publishes the data.
*
* @param publisher Publisher of source data
* @return Implementation of {@link AsyncRequestBody} that produces data send by the publisher
@@ -124,11 +125,11 @@ static AsyncRequestBody fromFile(File file) {
* @param string The string to provide.
* @param cs The {@link Charset} to use.
* @return Implementation of {@link AsyncRequestBody} that uses the specified string.
- * @see ByteArrayAsyncRequestBody
+ * @see ByteBuffersAsyncRequestBody
*/
static AsyncRequestBody fromString(String string, Charset cs) {
- return new ByteArrayAsyncRequestBody(string.getBytes(cs),
- Mimetype.MIMETYPE_TEXT_PLAIN + "; charset=" + cs.name());
+ return ByteBuffersAsyncRequestBody.from(Mimetype.MIMETYPE_TEXT_PLAIN + "; charset=" + cs.name(),
+ string.getBytes(cs));
}
/**
@@ -143,29 +144,96 @@ static AsyncRequestBody fromString(String string) {
}
/**
- * Creates a {@link AsyncRequestBody} from a byte array. The contents of the byte array are copied so modifications to the
- * original byte array are not reflected in the {@link AsyncRequestBody}.
+ * Creates an {@link AsyncRequestBody} from a byte array. This will copy the contents of the byte array to prevent
+ * modifications to the provided byte array from being reflected in the {@link AsyncRequestBody}.
*
* @param bytes The bytes to send to the service.
* @return AsyncRequestBody instance.
*/
static AsyncRequestBody fromBytes(byte[] bytes) {
- return new ByteArrayAsyncRequestBody(bytes, Mimetype.MIMETYPE_OCTET_STREAM);
+ byte[] clonedBytes = bytes.clone();
+ return ByteBuffersAsyncRequestBody.from(clonedBytes);
}
/**
- * Creates a {@link AsyncRequestBody} from a {@link ByteBuffer}. Buffer contents are copied so any modifications
- * made to the original {@link ByteBuffer} are not reflected in the {@link AsyncRequestBody}.
+ * Creates an {@link AsyncRequestBody} from a byte array without copying the contents of the byte array. This
+ * introduces concurrency risks, allowing: (1) the caller to modify the byte array stored in this {@code AsyncRequestBody}
+ * implementation AND (2) any users of {@link #fromBytesUnsafe(byte[])} to modify the byte array passed into this
+ * {@code AsyncRequestBody} implementation.
+ *
+ * As the method name implies, this is unsafe. Use {@link #fromBytes(byte[])} unless you're sure you know the risks.
+ *
+ * @param bytes The bytes to send to the service.
+ * @return AsyncRequestBody instance.
+ */
+ static AsyncRequestBody fromBytesUnsafe(byte[] bytes) {
+ return ByteBuffersAsyncRequestBody.from(bytes);
+ }
+
+ /**
+ * Creates an {@link AsyncRequestBody} from a {@link ByteBuffer}. This will copy the contents of the {@link ByteBuffer} to
+ * prevent modifications to the provided {@link ByteBuffer} from being reflected in the {@link AsyncRequestBody}.
+ *
The position is set to 0 in the copied {@link ByteBuffer} and the mark if defined is discarded.
*
* @param byteBuffer ByteBuffer to send to the service.
* @return AsyncRequestBody instance.
*/
static AsyncRequestBody fromByteBuffer(ByteBuffer byteBuffer) {
- return fromBytes(BinaryUtils.copyAllBytesFrom(byteBuffer));
+ ByteBuffer immutableCopy = BinaryUtils.immutableCopyOf(byteBuffer);
+ immutableCopy.rewind();
+ return ByteBuffersAsyncRequestBody.of(null, immutableCopy);
+ }
+
+ /**
+ * Creates an {@link AsyncRequestBody} from a {@link ByteBuffer} without copying the contents of the
+ * {@link ByteBuffer}. This introduces concurrency risks, allowing: (1) the caller to modify the {@link ByteBuffer} stored in
+ * this {@code AsyncRequestBody} implementation AND (2) any users of {@link #fromByteBufferUnsafe(ByteBuffer)} to modify the
+ * {@link ByteBuffer} passed into this {@code AsyncRequestBody} implementation.
+ *
+ *
As the method name implies, this is unsafe. Use {@link #fromByteBuffer(ByteBuffer)}} unless you're sure you know the
+ * risks.
+ *
+ * @param byteBuffer ByteBuffer to send to the service.
+ * @return AsyncRequestBody instance.
+ */
+ static AsyncRequestBody fromByteBufferUnsafe(ByteBuffer byteBuffer) {
+ return ByteBuffersAsyncRequestBody.of(null, byteBuffer);
+ }
+
+ /**
+ * Creates an {@link AsyncRequestBody} from a {@link ByteBuffer} array. This will copy the contents of each {@link ByteBuffer}
+ * to prevent modifications to any provided {@link ByteBuffer} from being reflected in the {@link AsyncRequestBody}.
+ *
The position is set to 0 in each copied {@link ByteBuffer} and the mark if defined is discarded.
+ *
+ * @param byteBuffers ByteBuffer array to send to the service.
+ * @return AsyncRequestBody instance.
+ */
+ static AsyncRequestBody fromByteBuffers(ByteBuffer... byteBuffers) {
+ ByteBuffer[] immutableCopy = Arrays.stream(byteBuffers)
+ .map(BinaryUtils::immutableCopyOf)
+ .peek(ByteBuffer::rewind)
+ .toArray(ByteBuffer[]::new);
+ return ByteBuffersAsyncRequestBody.of(null, immutableCopy);
+ }
+
+ /**
+ * Creates an {@link AsyncRequestBody} from a {@link ByteBuffer} array without copying the contents of each
+ * {@link ByteBuffer}. This introduces concurrency risks, allowing: (1) the caller to modify any {@link ByteBuffer} stored in
+ * this {@code AsyncRequestBody} implementation AND (2) any users of {@link #fromByteBufferUnsafe(ByteBuffer)} to modify any
+ * {@link ByteBuffer} passed into this {@code AsyncRequestBody} implementation.
+ *
+ *
As the method name implies, this is unsafe. Use {@link #fromByteBuffers(ByteBuffer...)} unless you're sure you know the
+ * risks.
+ *
+ * @param byteBuffers ByteBuffer array to send to the service.
+ * @return AsyncRequestBody instance.
+ */
+ static AsyncRequestBody fromByteBuffersUnsafe(ByteBuffer... byteBuffers) {
+ return ByteBuffersAsyncRequestBody.of(null, byteBuffers);
}
/**
- * Creates a {@link AsyncRequestBody} from a {@link InputStream}.
+ * Creates an {@link AsyncRequestBody} from an {@link InputStream}.
*
*
An {@link ExecutorService} is required in order to perform the blocking data reads, to prevent blocking the
* non-blocking event loop threads owned by the SDK.
@@ -239,7 +307,7 @@ static BlockingOutputStreamAsyncRequestBody forBlockingOutputStream(Long content
}
/**
- * Creates a {@link AsyncRequestBody} with no content.
+ * Creates an {@link AsyncRequestBody} with no content.
*
* @return AsyncRequestBody instance.
*/
diff --git a/core/sdk-core/src/main/java/software/amazon/awssdk/core/internal/async/ByteArrayAsyncRequestBody.java b/core/sdk-core/src/main/java/software/amazon/awssdk/core/internal/async/ByteArrayAsyncRequestBody.java
deleted file mode 100644
index 29205479b798..000000000000
--- a/core/sdk-core/src/main/java/software/amazon/awssdk/core/internal/async/ByteArrayAsyncRequestBody.java
+++ /dev/null
@@ -1,98 +0,0 @@
-/*
- * Copyright Amazon.com, Inc. or its affiliates. All Rights Reserved.
- *
- * Licensed under the Apache License, Version 2.0 (the "License").
- * You may not use this file except in compliance with the License.
- * A copy of the License is located at
- *
- * http://aws.amazon.com/apache2.0
- *
- * or in the "license" file accompanying this file. This file is distributed
- * on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either
- * express or implied. See the License for the specific language governing
- * permissions and limitations under the License.
- */
-
-package software.amazon.awssdk.core.internal.async;
-
-import java.nio.ByteBuffer;
-import java.util.Optional;
-import org.reactivestreams.Subscriber;
-import org.reactivestreams.Subscription;
-import software.amazon.awssdk.annotations.SdkInternalApi;
-import software.amazon.awssdk.core.async.AsyncRequestBody;
-import software.amazon.awssdk.utils.Logger;
-
-/**
- * An implementation of {@link AsyncRequestBody} for providing data from memory. This is created using static
- * methods on {@link AsyncRequestBody}
- *
- * @see AsyncRequestBody#fromBytes(byte[])
- * @see AsyncRequestBody#fromByteBuffer(ByteBuffer)
- * @see AsyncRequestBody#fromString(String)
- */
-@SdkInternalApi
-public final class ByteArrayAsyncRequestBody implements AsyncRequestBody {
- private static final Logger log = Logger.loggerFor(ByteArrayAsyncRequestBody.class);
-
- private final byte[] bytes;
-
- private final String mimetype;
-
- public ByteArrayAsyncRequestBody(byte[] bytes, String mimetype) {
- this.bytes = bytes.clone();
- this.mimetype = mimetype;
- }
-
- @Override
- public Optional contentLength() {
- return Optional.of((long) bytes.length);
- }
-
- @Override
- public String contentType() {
- return mimetype;
- }
-
- @Override
- public void subscribe(Subscriber super ByteBuffer> s) {
- // As per rule 1.9 we must throw NullPointerException if the subscriber parameter is null
- if (s == null) {
- throw new NullPointerException("Subscription MUST NOT be null.");
- }
-
- // As per 2.13, this method must return normally (i.e. not throw).
- try {
- s.onSubscribe(
- new Subscription() {
- private boolean done = false;
-
- @Override
- public void request(long n) {
- if (done) {
- return;
- }
- if (n > 0) {
- done = true;
- s.onNext(ByteBuffer.wrap(bytes));
- s.onComplete();
- } else {
- s.onError(new IllegalArgumentException("§3.9: non-positive requests are not allowed!"));
- }
- }
-
- @Override
- public void cancel() {
- synchronized (this) {
- if (!done) {
- done = true;
- }
- }
- }
- }
- );
- } catch (Throwable ex) {
- log.error(() -> s + " violated the Reactive Streams rule 2.13 by throwing an exception from onSubscribe.", ex);
- }
- }
-}
diff --git a/core/sdk-core/src/main/java/software/amazon/awssdk/core/internal/async/ByteBuffersAsyncRequestBody.java b/core/sdk-core/src/main/java/software/amazon/awssdk/core/internal/async/ByteBuffersAsyncRequestBody.java
new file mode 100644
index 000000000000..22736c081493
--- /dev/null
+++ b/core/sdk-core/src/main/java/software/amazon/awssdk/core/internal/async/ByteBuffersAsyncRequestBody.java
@@ -0,0 +1,142 @@
+/*
+ * Copyright Amazon.com, Inc. or its affiliates. All Rights Reserved.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License").
+ * You may not use this file except in compliance with the License.
+ * A copy of the License is located at
+ *
+ * http://aws.amazon.com/apache2.0
+ *
+ * or in the "license" file accompanying this file. This file is distributed
+ * on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either
+ * express or implied. See the License for the specific language governing
+ * permissions and limitations under the License.
+ */
+
+package software.amazon.awssdk.core.internal.async;
+
+import java.nio.ByteBuffer;
+import java.util.Optional;
+import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.concurrent.atomic.AtomicInteger;
+import org.reactivestreams.Subscriber;
+import org.reactivestreams.Subscription;
+import software.amazon.awssdk.annotations.SdkInternalApi;
+import software.amazon.awssdk.core.async.AsyncRequestBody;
+import software.amazon.awssdk.core.internal.util.Mimetype;
+import software.amazon.awssdk.utils.BinaryUtils;
+import software.amazon.awssdk.utils.Logger;
+
+/**
+ * An implementation of {@link AsyncRequestBody} for providing data from the supplied {@link ByteBuffer} array. This is created
+ * using static methods on {@link AsyncRequestBody}
+ *
+ * @see AsyncRequestBody#fromBytes(byte[])
+ * @see AsyncRequestBody#fromBytesUnsafe(byte[])
+ * @see AsyncRequestBody#fromByteBuffer(ByteBuffer)
+ * @see AsyncRequestBody#fromByteBufferUnsafe(ByteBuffer)
+ * @see AsyncRequestBody#fromByteBuffers(ByteBuffer...)
+ * @see AsyncRequestBody#fromByteBuffersUnsafe(ByteBuffer...)
+ * @see AsyncRequestBody#fromString(String)
+ */
+@SdkInternalApi
+public final class ByteBuffersAsyncRequestBody implements AsyncRequestBody {
+ private static final Logger log = Logger.loggerFor(ByteBuffersAsyncRequestBody.class);
+
+ private final String mimetype;
+ private final Long length;
+ private final ByteBuffer[] buffers;
+
+ private ByteBuffersAsyncRequestBody(String mimetype, Long length, ByteBuffer... buffers) {
+ this.mimetype = mimetype;
+ this.length = length;
+ this.buffers = buffers;
+ }
+
+ @Override
+ public Optional contentLength() {
+ return Optional.ofNullable(length);
+ }
+
+ @Override
+ public String contentType() {
+ return mimetype;
+ }
+
+ @Override
+ public void subscribe(Subscriber super ByteBuffer> s) {
+ // As per rule 1.9 we must throw NullPointerException if the subscriber parameter is null
+ if (s == null) {
+ throw new NullPointerException("Subscription MUST NOT be null.");
+ }
+
+ // As per 2.13, this method must return normally (i.e. not throw).
+ try {
+ s.onSubscribe(
+ new Subscription() {
+ private final AtomicInteger index = new AtomicInteger(0);
+ private final AtomicBoolean completed = new AtomicBoolean(false);
+
+ @Override
+ public void request(long n) {
+ if (completed.get()) {
+ return;
+ }
+
+ if (n > 0) {
+ int i = index.getAndIncrement();
+
+ if (i >= buffers.length) {
+ return;
+ }
+
+ long remaining = n;
+
+ do {
+ ByteBuffer buffer = buffers[i];
+
+ // Pending discussions on https://github.com/aws/aws-sdk-java-v2/issues/3928
+ if (buffer.isDirect()) {
+ buffer = BinaryUtils.toNonDirectBuffer(buffer);
+ }
+
+ s.onNext(buffer);
+ remaining--;
+ } while (remaining > 0 && (i = index.getAndIncrement()) < buffers.length);
+
+ if (i >= buffers.length - 1 && completed.compareAndSet(false, true)) {
+ s.onComplete();
+ }
+ } else {
+ s.onError(new IllegalArgumentException("§3.9: non-positive requests are not allowed!"));
+ }
+ }
+
+ @Override
+ public void cancel() {
+ completed.set(true);
+ }
+ }
+ );
+ } catch (Throwable ex) {
+ log.error(() -> s + " violated the Reactive Streams rule 2.13 by throwing an exception from onSubscribe.", ex);
+ }
+ }
+
+ public static ByteBuffersAsyncRequestBody of(Long length, ByteBuffer... buffers) {
+ return new ByteBuffersAsyncRequestBody(Mimetype.MIMETYPE_OCTET_STREAM, length, buffers);
+ }
+
+ public static ByteBuffersAsyncRequestBody of(String mimetype, Long length, ByteBuffer... buffers) {
+ return new ByteBuffersAsyncRequestBody(mimetype, length, buffers);
+ }
+
+ public static ByteBuffersAsyncRequestBody from(byte[] bytes) {
+ return new ByteBuffersAsyncRequestBody(Mimetype.MIMETYPE_OCTET_STREAM, (long) bytes.length,
+ ByteBuffer.wrap(bytes));
+ }
+
+ public static ByteBuffersAsyncRequestBody from(String mimetype, byte[] bytes) {
+ return new ByteBuffersAsyncRequestBody(mimetype, (long) bytes.length, ByteBuffer.wrap(bytes));
+ }
+}
diff --git a/core/sdk-core/src/test/java/software/amazon/awssdk/core/async/AsyncRequestBodyTest.java b/core/sdk-core/src/test/java/software/amazon/awssdk/core/async/AsyncRequestBodyTest.java
index e0252c9ba6d2..fa76c681ac51 100644
--- a/core/sdk-core/src/test/java/software/amazon/awssdk/core/async/AsyncRequestBodyTest.java
+++ b/core/sdk-core/src/test/java/software/amazon/awssdk/core/async/AsyncRequestBodyTest.java
@@ -15,44 +15,39 @@
package software.amazon.awssdk.core.async;
-import static java.nio.charset.StandardCharsets.UTF_8;
-import static org.assertj.core.api.Assertions.assertThat;
+import static org.junit.jupiter.api.Assertions.assertArrayEquals;
+import static org.junit.jupiter.api.Assertions.assertEquals;
import com.google.common.jimfs.Configuration;
import com.google.common.jimfs.Jimfs;
import io.reactivex.Flowable;
-import java.io.File;
-import java.io.FileWriter;
import java.io.IOException;
-import java.io.InputStream;
import java.nio.ByteBuffer;
+import java.nio.charset.Charset;
import java.nio.charset.StandardCharsets;
import java.nio.file.FileSystem;
import java.nio.file.Files;
import java.nio.file.Path;
-import java.time.Instant;
-import java.util.Collections;
import java.util.List;
-import java.util.concurrent.Callable;
import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.atomic.AtomicReference;
+import java.util.function.Function;
import java.util.stream.Collectors;
import org.assertj.core.util.Lists;
-import org.junit.Rule;
-import org.junit.Test;
-import org.junit.rules.TemporaryFolder;
-import org.junit.runner.RunWith;
-import org.junit.runners.Parameterized;
+import org.junit.jupiter.api.Test;
+import org.junit.jupiter.params.ParameterizedTest;
+import org.junit.jupiter.params.provider.MethodSource;
+import org.junit.jupiter.params.provider.ValueSource;
import org.reactivestreams.Publisher;
import org.reactivestreams.Subscriber;
import software.amazon.awssdk.core.internal.util.Mimetype;
import software.amazon.awssdk.http.async.SimpleSubscriber;
import software.amazon.awssdk.utils.BinaryUtils;
-import software.amazon.awssdk.utils.StringInputStream;
-@RunWith(Parameterized.class)
public class AsyncRequestBodyTest {
- private final static String testString = "Hello!";
- private final static Path path;
+
+ private static final String testString = "Hello!";
+ private static final Path path;
static {
FileSystem fs = Jimfs.newFileSystem(Configuration.unix());
@@ -64,27 +59,16 @@ public class AsyncRequestBodyTest {
}
}
- @Parameterized.Parameters
- public static AsyncRequestBody[] data() {
- return new AsyncRequestBody[]{
- AsyncRequestBody.fromString(testString),
- AsyncRequestBody.fromFile(path)
- };
+ @ParameterizedTest
+ @MethodSource("contentIntegrityChecks")
+ void hasCorrectLength(AsyncRequestBody asyncRequestBody) {
+ assertEquals(testString.length(), asyncRequestBody.contentLength().get());
}
- private AsyncRequestBody provider;
- public AsyncRequestBodyTest(AsyncRequestBody provider) {
- this.provider = provider;
- }
-
- @Test
- public void hasCorrectLength() {
- assertThat(provider.contentLength().get()).isEqualTo(testString.length());
- }
-
- @Test
- public void hasCorrectContent() throws InterruptedException {
+ @ParameterizedTest
+ @MethodSource("contentIntegrityChecks")
+ void hasCorrectContent(AsyncRequestBody asyncRequestBody) throws InterruptedException {
StringBuilder sb = new StringBuilder();
CountDownLatch done = new CountDownLatch(1);
@@ -106,75 +90,196 @@ public void onComplete() {
}
};
- provider.subscribe(subscriber);
+ asyncRequestBody.subscribe(subscriber);
done.await();
- assertThat(sb.toString()).isEqualTo(testString);
+ assertEquals(testString, sb.toString());
+ }
+
+ private static AsyncRequestBody[] contentIntegrityChecks() {
+ return new AsyncRequestBody[] {
+ AsyncRequestBody.fromString(testString),
+ AsyncRequestBody.fromFile(path)
+ };
}
@Test
- public void stringConstructorHasCorrectContentType() {
- AsyncRequestBody requestBody = AsyncRequestBody.fromString("hello world");
- assertThat(requestBody.contentType()).isEqualTo("text/plain; charset=UTF-8");
+ void fromBytesCopiesTheProvidedByteArray() {
+ byte[] bytes = testString.getBytes(StandardCharsets.UTF_8);
+ byte[] bytesClone = bytes.clone();
+
+ AsyncRequestBody asyncRequestBody = AsyncRequestBody.fromBytes(bytes);
+
+ for (int i = 0; i < bytes.length; i++) {
+ bytes[i] += 1;
+ }
+
+ AtomicReference publishedBuffer = new AtomicReference<>();
+ Subscriber subscriber = new SimpleSubscriber(publishedBuffer::set);
+
+ asyncRequestBody.subscribe(subscriber);
+
+ byte[] publishedByteArray = BinaryUtils.copyAllBytesFrom(publishedBuffer.get());
+ assertArrayEquals(bytesClone, publishedByteArray);
}
@Test
- public void stringWithEncoding1ConstructorHasCorrectContentType() {
- AsyncRequestBody requestBody = AsyncRequestBody.fromString("hello world", StandardCharsets.ISO_8859_1);
- assertThat(requestBody.contentType()).isEqualTo("text/plain; charset=ISO-8859-1");
+ void fromBytesUnsafeDoesNotCopyTheProvidedByteArray() {
+ byte[] bytes = testString.getBytes(StandardCharsets.UTF_8);
+
+ AsyncRequestBody asyncRequestBody = AsyncRequestBody.fromBytesUnsafe(bytes);
+
+ for (int i = 0; i < bytes.length; i++) {
+ bytes[i] += 1;
+ }
+
+ AtomicReference publishedBuffer = new AtomicReference<>();
+ Subscriber subscriber = new SimpleSubscriber(publishedBuffer::set);
+
+ asyncRequestBody.subscribe(subscriber);
+
+ byte[] publishedByteArray = BinaryUtils.copyAllBytesFrom(publishedBuffer.get());
+ assertArrayEquals(bytes, publishedByteArray);
+ }
+
+ @ParameterizedTest
+ @MethodSource("safeByteBufferBodyBuilders")
+ void safeByteBufferBuildersCopyTheProvidedBuffer(Function bodyBuilder) {
+ byte[] bytes = testString.getBytes(StandardCharsets.UTF_8);
+ byte[] bytesClone = bytes.clone();
+
+ AsyncRequestBody asyncRequestBody = bodyBuilder.apply(ByteBuffer.wrap(bytes));
+
+ for (int i = 0; i < bytes.length; i++) {
+ bytes[i] += 1;
+ }
+
+ AtomicReference publishedBuffer = new AtomicReference<>();
+ Subscriber subscriber = new SimpleSubscriber(publishedBuffer::set);
+
+ asyncRequestBody.subscribe(subscriber);
+
+ byte[] publishedByteArray = BinaryUtils.copyAllBytesFrom(publishedBuffer.get());
+ assertArrayEquals(bytesClone, publishedByteArray);
+ }
+
+ @ParameterizedTest
+ @MethodSource("safeByteBufferBodyBuilders")
+ void safeByteBufferBuildersRewindTheInputByteBuffer(Function bodyBuilder) {
+ byte[] bytes = testString.getBytes(StandardCharsets.UTF_8);
+ ByteBuffer bb = ByteBuffer.wrap(bytes);
+ bb.position(bytes.length - 1);
+
+
+ AsyncRequestBody asyncRequestBody = bodyBuilder.apply(bb);
+
+ for (int i = 0; i < bytes.length; i++) {
+ bytes[i] += 1;
+ }
+
+ AtomicReference publishedBuffer = new AtomicReference<>();
+ Subscriber subscriber = new SimpleSubscriber(publishedBuffer::set);
+
+ asyncRequestBody.subscribe(subscriber);
+
+ assertEquals(0, publishedBuffer.get().position());
+ }
+
+ private static Function[] safeByteBufferBodyBuilders() {
+ Function fromByteBuffer = AsyncRequestBody::fromByteBuffer;
+ Function fromByteBuffers = AsyncRequestBody::fromByteBuffers;
+ return new Function[] {fromByteBuffer, fromByteBuffers};
+ }
+
+ @ParameterizedTest
+ @MethodSource("unsafeByteBufferBodyBuilders")
+ void unsafeByteBufferBuildersDoNotCopyTheProvidedBuffer(Function bodyBuilder) {
+ byte[] bytes = testString.getBytes(StandardCharsets.UTF_8);
+
+ AsyncRequestBody asyncRequestBody = bodyBuilder.apply(ByteBuffer.wrap(bytes));
+
+ for (int i = 0; i < bytes.length; i++) {
+ bytes[i] += 1;
+ }
+
+ AtomicReference publishedBuffer = new AtomicReference<>();
+ Subscriber subscriber = new SimpleSubscriber(publishedBuffer::set);
+
+ asyncRequestBody.subscribe(subscriber);
+
+ byte[] publishedByteArray = BinaryUtils.copyAllBytesFrom(publishedBuffer.get());
+ assertArrayEquals(bytes, publishedByteArray);
+ }
+
+ @ParameterizedTest
+ @MethodSource("unsafeByteBufferBodyBuilders")
+ void safeByteBufferBuildersDoNotRewindTheInputByteBuffer(Function bodyBuilder) {
+ byte[] bytes = testString.getBytes(StandardCharsets.UTF_8);
+ ByteBuffer wrap = ByteBuffer.wrap(bytes);
+ int expectedPosition = bytes.length - 1;
+ wrap.position(expectedPosition);
+
+ AsyncRequestBody asyncRequestBody = bodyBuilder.apply(wrap);
+
+ AtomicReference publishedBuffer = new AtomicReference<>();
+ Subscriber subscriber = new SimpleSubscriber(publishedBuffer::set);
+
+ asyncRequestBody.subscribe(subscriber);
+
+ assertEquals(expectedPosition, publishedBuffer.get().position());
+ }
+
+ private static Function[] unsafeByteBufferBodyBuilders() {
+ Function fromByteBuffer = AsyncRequestBody::fromByteBufferUnsafe;
+ Function fromByteBuffers = AsyncRequestBody::fromByteBuffersUnsafe;
+ return new Function[] {fromByteBuffer, fromByteBuffers};
+ }
+
+ @ParameterizedTest
+ @ValueSource(strings = {"US-ASCII", "ISO-8859-1", "UTF-8", "UTF-16BE", "UTF-16LE", "UTF-16"})
+ void charsetsAreConvertedToTheCorrectContentType(Charset charset) {
+ AsyncRequestBody requestBody = AsyncRequestBody.fromString("hello world", charset);
+ assertEquals("text/plain; charset=" + charset.name(), requestBody.contentType());
}
@Test
- public void stringWithEncoding2ConstructorHasCorrectContentType() {
- AsyncRequestBody requestBody = AsyncRequestBody.fromString("hello world", StandardCharsets.UTF_16BE);
- assertThat(requestBody.contentType()).isEqualTo("text/plain; charset=UTF-16BE");
+ void stringConstructorHasCorrectDefaultContentType() {
+ AsyncRequestBody requestBody = AsyncRequestBody.fromString("hello world");
+ assertEquals("text/plain; charset=UTF-8", requestBody.contentType());
}
@Test
- public void fileConstructorHasCorrectContentType() {
+ void fileConstructorHasCorrectContentType() {
AsyncRequestBody requestBody = AsyncRequestBody.fromFile(path);
- assertThat(requestBody.contentType()).isEqualTo(Mimetype.MIMETYPE_OCTET_STREAM);
+ assertEquals(Mimetype.MIMETYPE_OCTET_STREAM, requestBody.contentType());
}
@Test
- public void bytesArrayConstructorHasCorrectContentType() {
+ void bytesArrayConstructorHasCorrectContentType() {
AsyncRequestBody requestBody = AsyncRequestBody.fromBytes("hello world".getBytes());
- assertThat(requestBody.contentType()).isEqualTo(Mimetype.MIMETYPE_OCTET_STREAM);
+ assertEquals(Mimetype.MIMETYPE_OCTET_STREAM, requestBody.contentType());
}
@Test
- public void bytesBufferConstructorHasCorrectContentType() {
+ void bytesBufferConstructorHasCorrectContentType() {
ByteBuffer byteBuffer = ByteBuffer.wrap("hello world".getBytes());
AsyncRequestBody requestBody = AsyncRequestBody.fromByteBuffer(byteBuffer);
- assertThat(requestBody.contentType()).isEqualTo(Mimetype.MIMETYPE_OCTET_STREAM);
+ assertEquals(Mimetype.MIMETYPE_OCTET_STREAM, requestBody.contentType());
}
@Test
- public void emptyBytesConstructorHasCorrectContentType() {
+ void emptyBytesConstructorHasCorrectContentType() {
AsyncRequestBody requestBody = AsyncRequestBody.empty();
- assertThat(requestBody.contentType()).isEqualTo(Mimetype.MIMETYPE_OCTET_STREAM);
+ assertEquals(Mimetype.MIMETYPE_OCTET_STREAM, requestBody.contentType());
}
@Test
- public void publisherConstructorHasCorrectContentType() {
+ void publisherConstructorHasCorrectContentType() {
List requestBodyStrings = Lists.newArrayList("A", "B", "C");
List bodyBytes = requestBodyStrings.stream()
- .map(s -> ByteBuffer.wrap(s.getBytes(StandardCharsets.UTF_8)))
- .collect(Collectors.toList());
+ .map(s -> ByteBuffer.wrap(s.getBytes(StandardCharsets.UTF_8)))
+ .collect(Collectors.toList());
Publisher bodyPublisher = Flowable.fromIterable(bodyBytes);
AsyncRequestBody requestBody = AsyncRequestBody.fromPublisher(bodyPublisher);
- assertThat(requestBody.contentType()).isEqualTo(Mimetype.MIMETYPE_OCTET_STREAM);
- }
-
- @Test
- public void fromBytes_byteArrayNotNull_createsCopy() {
- byte[] original = {0x1, 0x2, 0x3, 0x4};
- byte[] toModify = new byte[original.length];
- System.arraycopy(original, 0, toModify, 0, original.length);
- AsyncRequestBody body = AsyncRequestBody.fromBytes(toModify);
- for (int i = 0; i < toModify.length; ++i) {
- toModify[i]++;
- }
- ByteBuffer publishedBb = Flowable.fromPublisher(body).toList().blockingGet().get(0);
- assertThat(BinaryUtils.copyAllBytesFrom(publishedBb)).isEqualTo(original);
+ assertEquals(Mimetype.MIMETYPE_OCTET_STREAM, requestBody.contentType());
}
}
diff --git a/core/sdk-core/src/test/java/software/amazon/awssdk/core/internal/async/ByteArrayAsyncRequestBodyTest.java b/core/sdk-core/src/test/java/software/amazon/awssdk/core/internal/async/ByteArrayAsyncRequestBodyTest.java
deleted file mode 100644
index 378fbf2f59c3..000000000000
--- a/core/sdk-core/src/test/java/software/amazon/awssdk/core/internal/async/ByteArrayAsyncRequestBodyTest.java
+++ /dev/null
@@ -1,65 +0,0 @@
-/*
- * Copyright Amazon.com, Inc. or its affiliates. All Rights Reserved.
- *
- * Licensed under the Apache License, Version 2.0 (the "License").
- * You may not use this file except in compliance with the License.
- * A copy of the License is located at
- *
- * http://aws.amazon.com/apache2.0
- *
- * or in the "license" file accompanying this file. This file is distributed
- * on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either
- * express or implied. See the License for the specific language governing
- * permissions and limitations under the License.
- */
-
-package software.amazon.awssdk.core.internal.async;
-
-import static org.junit.jupiter.api.Assertions.assertTrue;
-
-import java.nio.ByteBuffer;
-import java.util.concurrent.atomic.AtomicBoolean;
-import org.junit.jupiter.api.Test;
-import org.reactivestreams.Subscriber;
-import org.reactivestreams.Subscription;
-import software.amazon.awssdk.core.internal.util.Mimetype;
-
-public class ByteArrayAsyncRequestBodyTest {
- private class testSubscriber implements Subscriber {
- private Subscription subscription;
- protected AtomicBoolean onCompleteCalled = new AtomicBoolean(false);
-
- @Override
- public void onSubscribe(Subscription s) {
- this.subscription = s;
- s.request(1);
- }
-
- @Override
- public void onNext(ByteBuffer byteBuffer) {
-
- }
-
- @Override
- public void onError(Throwable throwable) {
-
- }
-
- @Override
- public void onComplete() {
- subscription.request(1);
- onCompleteCalled.set(true);
- }
- }
-
- testSubscriber subscriber = new testSubscriber();
-
- @Test
- public void concurrentRequests_shouldCompleteNormally() {
- ByteArrayAsyncRequestBody byteArrayReq = new ByteArrayAsyncRequestBody("Hello World!".getBytes(),
- Mimetype.MIMETYPE_OCTET_STREAM);
- byteArrayReq.subscribe(subscriber);
- assertTrue(subscriber.onCompleteCalled.get());
- }
-
-}
diff --git a/core/sdk-core/src/test/java/software/amazon/awssdk/core/internal/async/ByteBuffersAsyncRequestBodyTest.java b/core/sdk-core/src/test/java/software/amazon/awssdk/core/internal/async/ByteBuffersAsyncRequestBodyTest.java
new file mode 100644
index 000000000000..c4e4c4c0040d
--- /dev/null
+++ b/core/sdk-core/src/test/java/software/amazon/awssdk/core/internal/async/ByteBuffersAsyncRequestBodyTest.java
@@ -0,0 +1,189 @@
+/*
+ * Copyright Amazon.com, Inc. or its affiliates. All Rights Reserved.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License").
+ * You may not use this file except in compliance with the License.
+ * A copy of the License is located at
+ *
+ * http://aws.amazon.com/apache2.0
+ *
+ * or in the "license" file accompanying this file. This file is distributed
+ * on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either
+ * express or implied. See the License for the specific language governing
+ * permissions and limitations under the License.
+ */
+
+package software.amazon.awssdk.core.internal.async;
+
+import static org.junit.jupiter.api.Assertions.assertArrayEquals;
+import static org.junit.jupiter.api.Assertions.assertEquals;
+import static org.junit.jupiter.api.Assertions.assertFalse;
+import static org.junit.jupiter.api.Assertions.assertTrue;
+
+import java.nio.ByteBuffer;
+import java.nio.charset.StandardCharsets;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.List;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.concurrent.TimeUnit;
+import java.util.stream.IntStream;
+import org.junit.jupiter.api.Test;
+import org.reactivestreams.Subscriber;
+import org.reactivestreams.Subscription;
+import software.amazon.awssdk.core.async.AsyncRequestBody;
+import software.amazon.awssdk.utils.BinaryUtils;
+
+class ByteBuffersAsyncRequestBodyTest {
+
+ private static class TestSubscriber implements Subscriber {
+ private Subscription subscription;
+ private boolean onCompleteCalled = false;
+ private int callsToComplete = 0;
+ private final List publishedResults = Collections.synchronizedList(new ArrayList<>());
+
+ public void request(long n) {
+ subscription.request(n);
+ }
+
+ @Override
+ public void onSubscribe(Subscription s) {
+ this.subscription = s;
+ }
+
+ @Override
+ public void onNext(ByteBuffer byteBuffer) {
+ publishedResults.add(byteBuffer);
+ }
+
+ @Override
+ public void onError(Throwable throwable) {
+ throw new IllegalStateException(throwable);
+ }
+
+ @Override
+ public void onComplete() {
+ onCompleteCalled = true;
+ callsToComplete++;
+ }
+ }
+
+ @Test
+ public void subscriberIsMarkedAsCompleted() {
+ AsyncRequestBody requestBody = ByteBuffersAsyncRequestBody.from("Hello World!".getBytes(StandardCharsets.UTF_8));
+
+ TestSubscriber subscriber = new TestSubscriber();
+ requestBody.subscribe(subscriber);
+ subscriber.request(1);
+
+ assertTrue(subscriber.onCompleteCalled);
+ assertEquals(1, subscriber.publishedResults.size());
+ }
+
+ @Test
+ public void subscriberIsMarkedAsCompletedWhenARequestIsMadeForMoreBuffersThanAreAvailable() {
+ AsyncRequestBody requestBody = ByteBuffersAsyncRequestBody.from("Hello World!".getBytes(StandardCharsets.UTF_8));
+
+ TestSubscriber subscriber = new TestSubscriber();
+ requestBody.subscribe(subscriber);
+ subscriber.request(2);
+
+ assertTrue(subscriber.onCompleteCalled);
+ assertEquals(1, subscriber.publishedResults.size());
+ }
+
+ @Test
+ public void subscriberIsThreadSafeAndMarkedAsCompletedExactlyOnce() throws InterruptedException {
+ int numBuffers = 100;
+ AsyncRequestBody requestBody = ByteBuffersAsyncRequestBody.of(null, IntStream.range(0, numBuffers)
+ .mapToObj(i -> ByteBuffer.wrap(new byte[1]))
+ .toArray(ByteBuffer[]::new));
+
+ TestSubscriber subscriber = new TestSubscriber();
+ requestBody.subscribe(subscriber);
+
+ int parallelism = 8;
+ ExecutorService executorService = Executors.newFixedThreadPool(parallelism);
+ for (int i = 0; i < parallelism; i++) {
+ executorService.submit(() -> {
+ for (int j = 0; j < numBuffers; j++) {
+ subscriber.request(2);
+ }
+ });
+ }
+ executorService.shutdown();
+ executorService.awaitTermination(1, TimeUnit.MINUTES);
+
+ assertTrue(subscriber.onCompleteCalled);
+ assertEquals(1, subscriber.callsToComplete);
+ assertEquals(numBuffers, subscriber.publishedResults.size());
+ }
+
+ @Test
+ public void subscriberIsNotMarkedAsCompletedWhenThereAreRemainingBuffersToPublish() {
+ byte[] helloWorld = "Hello World!".getBytes(StandardCharsets.UTF_8);
+ byte[] goodbyeWorld = "Goodbye World!".getBytes(StandardCharsets.UTF_8);
+ AsyncRequestBody requestBody = ByteBuffersAsyncRequestBody.of((long) (helloWorld.length + goodbyeWorld.length),
+ ByteBuffer.wrap(helloWorld),
+ ByteBuffer.wrap(goodbyeWorld));
+
+ TestSubscriber subscriber = new TestSubscriber();
+ requestBody.subscribe(subscriber);
+ subscriber.request(1);
+
+ assertFalse(subscriber.onCompleteCalled);
+ assertEquals(1, subscriber.publishedResults.size());
+ }
+
+ @Test
+ public void subscriberReceivesAllBuffers() {
+ byte[] helloWorld = "Hello World!".getBytes(StandardCharsets.UTF_8);
+ byte[] goodbyeWorld = "Goodbye World!".getBytes(StandardCharsets.UTF_8);
+
+ AsyncRequestBody requestBody = ByteBuffersAsyncRequestBody.of((long) (helloWorld.length + goodbyeWorld.length),
+ ByteBuffer.wrap(helloWorld),
+ ByteBuffer.wrap(goodbyeWorld));
+
+ TestSubscriber subscriber = new TestSubscriber();
+ requestBody.subscribe(subscriber);
+ subscriber.request(2);
+
+ assertEquals(2, subscriber.publishedResults.size());
+ assertTrue(subscriber.onCompleteCalled);
+ assertArrayEquals(helloWorld, BinaryUtils.copyAllBytesFrom(subscriber.publishedResults.get(0)));
+ assertArrayEquals(goodbyeWorld, BinaryUtils.copyAllBytesFrom(subscriber.publishedResults.get(1)));
+ }
+
+ @Test
+ public void canceledSubscriberDoesNotReturnNewResults() {
+ AsyncRequestBody requestBody = ByteBuffersAsyncRequestBody.of(null, ByteBuffer.wrap(new byte[0]));
+
+ TestSubscriber subscriber = new TestSubscriber();
+ requestBody.subscribe(subscriber);
+
+ subscriber.subscription.cancel();
+ subscriber.request(1);
+
+ assertTrue(subscriber.publishedResults.isEmpty());
+ }
+
+ // Pending discussions on https://github.com/aws/aws-sdk-java-v2/issues/3928
+ @Test
+ public void directBuffersAreCoppiedToNonDirectBuffers() {
+ byte[] bytes = "Hello World!".getBytes(StandardCharsets.UTF_8);
+ ByteBuffer buffer = ByteBuffer.allocateDirect(bytes.length)
+ .put(bytes);
+ buffer.flip();
+ AsyncRequestBody requestBody = ByteBuffersAsyncRequestBody.of(null, buffer);
+
+ TestSubscriber subscriber = new TestSubscriber();
+ requestBody.subscribe(subscriber);
+ subscriber.request(1);
+
+ ByteBuffer publishedBuffer = subscriber.publishedResults.get(0);
+ assertFalse(publishedBuffer.isDirect());
+ assertArrayEquals(bytes, publishedBuffer.array());
+ }
+
+}
diff --git a/utils/src/main/java/software/amazon/awssdk/utils/BinaryUtils.java b/utils/src/main/java/software/amazon/awssdk/utils/BinaryUtils.java
index e7fd8c015e1d..543bfc9a2a62 100644
--- a/utils/src/main/java/software/amazon/awssdk/utils/BinaryUtils.java
+++ b/utils/src/main/java/software/amazon/awssdk/utils/BinaryUtils.java
@@ -117,6 +117,60 @@ public static ByteArrayInputStream toStream(ByteBuffer byteBuffer) {
return new ByteArrayInputStream(copyBytesFrom(byteBuffer));
}
+ /**
+ * Returns an immutable copy of the given {@code ByteBuffer}.
+ *
+ * The new buffer's position will be set to the position of the given {@code ByteBuffer}, but the mark if defined will be
+ * ignored.
+ *
+ * Note: this method intentionally converts direct buffers to non-direct though there is no guarantee this will always be
+ * the case, if this is required see {@link #toNonDirectBuffer(ByteBuffer)}
+ *
+ * @param bb the source {@code ByteBuffer} to copy.
+ * @return a read only {@code ByteBuffer}.
+ */
+ public static ByteBuffer immutableCopyOf(ByteBuffer bb) {
+ if (bb == null) {
+ return null;
+ }
+ int sourceBufferPosition = bb.position();
+ ByteBuffer readOnlyCopy = bb.asReadOnlyBuffer();
+ readOnlyCopy.rewind();
+ ByteBuffer cloned = ByteBuffer.allocate(bb.capacity())
+ .put(readOnlyCopy);
+ cloned.position(sourceBufferPosition);
+ return cloned.asReadOnlyBuffer();
+ }
+
+ /**
+ * Returns a copy of the given {@code DirectByteBuffer} from its current position as a non-direct {@code HeapByteBuffer}
+ *
+ * The new buffer's position will be set to the position of the given {@code ByteBuffer}, but the mark if defined will be
+ * ignored.
+ *
+ * @param bb the source {@code ByteBuffer} to copy.
+ * @return {@code ByteBuffer}.
+ */
+ public static ByteBuffer toNonDirectBuffer(ByteBuffer bb) {
+ if (bb == null) {
+ return null;
+ }
+ if (!bb.isDirect()) {
+ throw new IllegalArgumentException("Provided ByteBuffer is already non-direct");
+ }
+ int sourceBufferPosition = bb.position();
+ ByteBuffer readOnlyCopy = bb.asReadOnlyBuffer();
+ readOnlyCopy.rewind();
+ ByteBuffer cloned = ByteBuffer.allocate(bb.capacity())
+ .put(readOnlyCopy);
+ cloned.rewind();
+ cloned.position(sourceBufferPosition);
+ if (bb.isReadOnly()) {
+ return cloned.asReadOnlyBuffer();
+ }
+ return cloned;
+ }
+
/**
* Returns a copy of all the bytes from the given ByteBuffer
,
* from the beginning to the buffer's limit; or null if the input is null.
diff --git a/utils/src/test/java/software/amazon/awssdk/utils/BinaryUtilsTest.java b/utils/src/test/java/software/amazon/awssdk/utils/BinaryUtilsTest.java
index 5f255d347adc..90b73b187937 100644
--- a/utils/src/test/java/software/amazon/awssdk/utils/BinaryUtilsTest.java
+++ b/utils/src/test/java/software/amazon/awssdk/utils/BinaryUtilsTest.java
@@ -16,9 +16,11 @@
package software.amazon.awssdk.utils;
import static org.assertj.core.api.Assertions.assertThat;
+import static org.junit.jupiter.api.Assertions.assertArrayEquals;
import static org.junit.jupiter.api.Assertions.assertEquals;
import static org.junit.jupiter.api.Assertions.assertFalse;
import static org.junit.jupiter.api.Assertions.assertNull;
+import static org.junit.jupiter.api.Assertions.assertThrows;
import static org.junit.jupiter.api.Assertions.assertTrue;
import java.nio.ByteBuffer;
@@ -32,13 +34,11 @@ public class BinaryUtilsTest {
public void testHex() {
{
String hex = BinaryUtils.toHex(new byte[] {0});
- System.out.println(hex);
String hex2 = Base16Lower.encodeAsString(new byte[] {0});
assertEquals(hex, hex2);
}
{
String hex = BinaryUtils.toHex(new byte[] {-1});
- System.out.println(hex);
String hex2 = Base16Lower.encodeAsString(new byte[] {-1});
assertEquals(hex, hex2);
}
@@ -169,7 +169,7 @@ public void testCopyRemainingBytesFrom_nullBuffer() {
@Test
public void testCopyRemainingBytesFrom_noRemainingBytes() {
ByteBuffer bb = ByteBuffer.allocate(1);
- bb.put(new byte[]{1});
+ bb.put(new byte[] {1});
bb.flip();
bb.get();
@@ -180,7 +180,7 @@ public void testCopyRemainingBytesFrom_noRemainingBytes() {
@Test
public void testCopyRemainingBytesFrom_fullBuffer() {
ByteBuffer bb = ByteBuffer.allocate(4);
- bb.put(new byte[]{1, 2, 3, 4});
+ bb.put(new byte[] {1, 2, 3, 4});
bb.flip();
byte[] copy = BinaryUtils.copyRemainingBytesFrom(bb);
@@ -191,7 +191,7 @@ public void testCopyRemainingBytesFrom_fullBuffer() {
@Test
public void testCopyRemainingBytesFrom_partiallyReadBuffer() {
ByteBuffer bb = ByteBuffer.allocate(4);
- bb.put(new byte[]{1, 2, 3, 4});
+ bb.put(new byte[] {1, 2, 3, 4});
bb.flip();
bb.get();
@@ -201,4 +201,92 @@ public void testCopyRemainingBytesFrom_partiallyReadBuffer() {
assertThat(bb).isEqualTo(ByteBuffer.wrap(copy));
assertThat(copy).hasSize(2);
}
+
+ @Test
+ public void testImmutableCopyOfByteBuffer() {
+ ByteBuffer sourceBuffer = ByteBuffer.allocate(4);
+ byte[] originalBytesInSource = {1, 2, 3, 4};
+ sourceBuffer.put(originalBytesInSource);
+ sourceBuffer.flip();
+
+ ByteBuffer immutableCopy = BinaryUtils.immutableCopyOf(sourceBuffer);
+
+ byte[] bytesInSourceAfterCopy = {-1, -2, -3, -4};
+ sourceBuffer.put(bytesInSourceAfterCopy);
+ sourceBuffer.flip();
+
+ assertTrue(immutableCopy.isReadOnly());
+ byte[] fromImmutableCopy = new byte[originalBytesInSource.length];
+ immutableCopy.get(fromImmutableCopy);
+ assertArrayEquals(originalBytesInSource, fromImmutableCopy);
+
+ assertEquals(0, sourceBuffer.position());
+ byte[] fromSource = new byte[bytesInSourceAfterCopy.length];
+ sourceBuffer.get(fromSource);
+ assertArrayEquals(bytesInSourceAfterCopy, fromSource);
+ }
+
+ @Test
+ public void testImmutableCopyOfByteBuffer_nullBuffer() {
+ assertNull(BinaryUtils.immutableCopyOf(null));
+ }
+
+ @Test
+ public void testImmutableCopyOfByteBuffer_partiallyReadBuffer() {
+ ByteBuffer sourceBuffer = ByteBuffer.allocate(4);
+ byte[] bytes = {1, 2, 3, 4};
+ sourceBuffer.put(bytes);
+ sourceBuffer.position(2);
+
+ ByteBuffer immutableCopy = BinaryUtils.immutableCopyOf(sourceBuffer);
+
+ assertEquals(sourceBuffer.position(), immutableCopy.position());
+ immutableCopy.rewind();
+ byte[] fromImmutableCopy = new byte[bytes.length];
+ immutableCopy.get(fromImmutableCopy);
+ assertArrayEquals(bytes, fromImmutableCopy);
+ }
+
+ @Test
+ public void testToNonDirectBuffer() {
+ ByteBuffer bb = ByteBuffer.allocateDirect(4);
+ byte[] expected = {1, 2, 3, 4};
+ bb.put(expected);
+ bb.flip();
+
+ ByteBuffer nonDirectBuffer = BinaryUtils.toNonDirectBuffer(bb);
+
+ assertFalse(nonDirectBuffer.isDirect());
+ byte[] bytes = new byte[expected.length];
+ nonDirectBuffer.get(bytes);
+ assertArrayEquals(expected, bytes);
+ }
+
+ @Test
+ public void testToNonDirectBuffer_nullBuffer() {
+ assertNull(BinaryUtils.toNonDirectBuffer(null));
+ }
+
+ @Test
+ public void testToNonDirectBuffer_partiallyReadBuffer() {
+ ByteBuffer sourceBuffer = ByteBuffer.allocateDirect(4);
+ byte[] bytes = {1, 2, 3, 4};
+ sourceBuffer.put(bytes);
+ sourceBuffer.position(2);
+
+ ByteBuffer nonDirectBuffer = BinaryUtils.toNonDirectBuffer(sourceBuffer);
+
+ assertEquals(sourceBuffer.position(), nonDirectBuffer.position());
+ nonDirectBuffer.rewind();
+ byte[] fromNonDirectBuffer = new byte[bytes.length];
+ nonDirectBuffer.get(fromNonDirectBuffer);
+ assertArrayEquals(bytes, fromNonDirectBuffer);
+ }
+
+ @Test
+ public void testToNonDirectBuffer_nonDirectBuffer() {
+ ByteBuffer nonDirectBuffer = ByteBuffer.allocate(0);
+ assertThrows(IllegalArgumentException.class, () -> BinaryUtils.toNonDirectBuffer(nonDirectBuffer));
+ }
+
}