From 87e9678ba6bec9579b57186946be8392aa1433c1 Mon Sep 17 00:00:00 2001 From: "Mateusz \"Serafin\" Gajewski" Date: Wed, 21 Jun 2023 15:25:51 +0200 Subject: [PATCH] Use AsyncRequestBody.fromByteBufferUnsafe and drop custom implementation It was introduced in SDK v2 2.20.84 by https://github.com/aws/aws-sdk-java-v2/pull/3925 --- .../s3/ByteBufferAsyncRequestBody.java | 116 ------------------ .../s3/S3FileSystemExchangeStorage.java | 5 +- 2 files changed, 3 insertions(+), 118 deletions(-) delete mode 100644 plugin/trino-exchange-filesystem/src/main/java/io/trino/plugin/exchange/filesystem/s3/ByteBufferAsyncRequestBody.java diff --git a/plugin/trino-exchange-filesystem/src/main/java/io/trino/plugin/exchange/filesystem/s3/ByteBufferAsyncRequestBody.java b/plugin/trino-exchange-filesystem/src/main/java/io/trino/plugin/exchange/filesystem/s3/ByteBufferAsyncRequestBody.java deleted file mode 100644 index cf141169323c..000000000000 --- a/plugin/trino-exchange-filesystem/src/main/java/io/trino/plugin/exchange/filesystem/s3/ByteBufferAsyncRequestBody.java +++ /dev/null @@ -1,116 +0,0 @@ -/* - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package io.trino.plugin.exchange.filesystem.s3; - -import io.airlift.log.Logger; -import org.reactivestreams.Subscriber; -import org.reactivestreams.Subscription; -import software.amazon.awssdk.core.async.AsyncRequestBody; -import software.amazon.awssdk.core.internal.async.ByteArrayAsyncRequestBody; -import software.amazon.awssdk.core.internal.util.Mimetype; - -import java.nio.ByteBuffer; -import java.util.Optional; - -import static java.util.Objects.requireNonNull; - -/** - * This class mimics the implementation of {@link ByteArrayAsyncRequestBody} except for we use a ByteBuffer - * to avoid unnecessary memory copy - * - * An implementation of {@link AsyncRequestBody} for providing data from memory. This is created using static - * methods on {@link AsyncRequestBody} - * - * @see AsyncRequestBody#fromBytes(byte[]) - * @see AsyncRequestBody#fromByteBuffer(ByteBuffer) - * @see AsyncRequestBody#fromString(String) - */ -public final class ByteBufferAsyncRequestBody - implements AsyncRequestBody -{ - private static final Logger log = Logger.get(ByteBufferAsyncRequestBody.class); - - private final ByteBuffer byteBuffer; - - private final String mimetype; - - public ByteBufferAsyncRequestBody(ByteBuffer byteBuffer, String mimetype) - { - this.byteBuffer = requireNonNull(byteBuffer, "byteBuffer is null"); - this.mimetype = requireNonNull(mimetype, "mimetype is null"); - } - - @Override - public Optional contentLength() - { - return Optional.of((long) byteBuffer.remaining()); - } - - @Override - public String contentType() - { - return mimetype; - } - - @Override - public void subscribe(Subscriber s) - { - // As per rule 1.9 we must throw NullPointerException if the subscriber parameter is null - if (s == null) { - throw new NullPointerException("Subscription MUST NOT be null."); - } - - // As per 2.13, this method must return normally (i.e. not throw). - try { - s.onSubscribe( - new Subscription() { - private boolean done; - - @Override - public void request(long n) - { - if (done) { - return; - } - if (n > 0) { - done = true; - s.onNext(byteBuffer.asReadOnlyBuffer()); - s.onComplete(); - } - else { - s.onError(new IllegalArgumentException("ยง3.9: non-positive requests are not allowed!")); - } - } - - @Override - public void cancel() - { - synchronized (this) { - if (!done) { - done = true; - } - } - } - }); - } - catch (Throwable ex) { - log.error(ex, " violated the Reactive Streams rule 2.13 by throwing an exception from onSubscribe."); - } - } - - static AsyncRequestBody fromByteBuffer(ByteBuffer byteBuffer) - { - return new ByteBufferAsyncRequestBody(byteBuffer, Mimetype.MIMETYPE_OCTET_STREAM); - } -} diff --git a/plugin/trino-exchange-filesystem/src/main/java/io/trino/plugin/exchange/filesystem/s3/S3FileSystemExchangeStorage.java b/plugin/trino-exchange-filesystem/src/main/java/io/trino/plugin/exchange/filesystem/s3/S3FileSystemExchangeStorage.java index bca0e0685336..85b1de895ceb 100644 --- a/plugin/trino-exchange-filesystem/src/main/java/io/trino/plugin/exchange/filesystem/s3/S3FileSystemExchangeStorage.java +++ b/plugin/trino-exchange-filesystem/src/main/java/io/trino/plugin/exchange/filesystem/s3/S3FileSystemExchangeStorage.java @@ -120,6 +120,7 @@ import static java.util.Objects.requireNonNull; import static java.util.Objects.requireNonNullElseGet; import static java.util.concurrent.TimeUnit.SECONDS; +import static software.amazon.awssdk.core.async.AsyncRequestBody.fromByteBufferUnsafe; import static software.amazon.awssdk.core.client.config.SdkAdvancedClientOption.USER_AGENT_PREFIX; import static software.amazon.awssdk.core.client.config.SdkAdvancedClientOption.USER_AGENT_SUFFIX; @@ -712,7 +713,7 @@ public ListenableFuture write(Slice slice) .key(key) .storageClass(storageClass); directUploadFuture = translateFailures(toListenableFuture(s3AsyncClient.putObject(putObjectRequestBuilder.build(), - ByteBufferAsyncRequestBody.fromByteBuffer(slice.toByteBuffer())))); + fromByteBufferUnsafe(slice.toByteBuffer())))); stats.getPutObject().record(directUploadFuture); stats.getPutObjectDataSizeInBytes().add(slice.length()); return directUploadFuture; @@ -804,7 +805,7 @@ private ListenableFuture uploadPart(String uploadId, Slice slice, .partNumber(partNumber); UploadPartRequest uploadPartRequest = uploadPartRequestBuilder.build(); stats.getUploadPartDataSizeInBytes().add(slice.length()); - return stats.getUploadPart().record(Futures.transform(toListenableFuture(s3AsyncClient.uploadPart(uploadPartRequest, ByteBufferAsyncRequestBody.fromByteBuffer(slice.toByteBuffer()))), + return stats.getUploadPart().record(Futures.transform(toListenableFuture(s3AsyncClient.uploadPart(uploadPartRequest, fromByteBufferUnsafe(slice.toByteBuffer()))), uploadPartResponse -> CompletedPart.builder().eTag(uploadPartResponse.eTag()).partNumber(partNumber).build(), directExecutor())); }