From d84e25559afecc27026c7f4fe7aeaf0d0ce705b4 Mon Sep 17 00:00:00 2001 From: BenWhitehead Date: Wed, 27 Mar 2024 21:41:43 -0400 Subject: [PATCH] feat: port BufferToDiskThenUpload to work with HttpStorageOptions (#2473) --- .../storage/BlobWriteSessionConfigs.java | 8 +-- .../cloud/storage/BufferToDiskThenUpload.java | 4 +- .../com/google/cloud/storage/StorageImpl.java | 59 +++++++++++++++++-- .../storage/it/ITBlobWriteSessionTest.java | 7 ++- 4 files changed, 65 insertions(+), 13 deletions(-) diff --git a/google-cloud-storage/src/main/java/com/google/cloud/storage/BlobWriteSessionConfigs.java b/google-cloud-storage/src/main/java/com/google/cloud/storage/BlobWriteSessionConfigs.java index 781411c6d8..43da35cc6e 100644 --- a/google-cloud-storage/src/main/java/com/google/cloud/storage/BlobWriteSessionConfigs.java +++ b/google-cloud-storage/src/main/java/com/google/cloud/storage/BlobWriteSessionConfigs.java @@ -86,7 +86,7 @@ * Buffer bytes to a temporary file on disk. On {@link WritableByteChannel#close() close()} * upload the entire files contents to Cloud Storage. Delete the temporary file. * - * gRPC + * gRPC, HTTP * *
    *
  1. A Resumable Upload Session will be used to upload the file on disk.
  2. @@ -272,7 +272,7 @@ public static BidiBlobWriteSessionConfig bidiWrite() { * @since 2.26.0 This new api is in preview and is subject to breaking changes. */ @BetaApi - @TransportCompatibility({Transport.GRPC}) + @TransportCompatibility({Transport.GRPC, Transport.HTTP}) public static BlobWriteSessionConfig bufferToTempDirThenUpload() throws IOException { return bufferToDiskThenUpload( Paths.get(System.getProperty("java.io.tmpdir"), "google-cloud-storage")); @@ -289,7 +289,7 @@ public static BlobWriteSessionConfig bufferToTempDirThenUpload() throws IOExcept * @since 2.26.0 This new api is in preview and is subject to breaking changes. */ @BetaApi - @TransportCompatibility({Transport.GRPC}) + @TransportCompatibility({Transport.GRPC, Transport.HTTP}) public static BufferToDiskThenUpload bufferToDiskThenUpload(Path path) throws IOException { return bufferToDiskThenUpload(ImmutableList.of(path)); } @@ -308,7 +308,7 @@ public static BufferToDiskThenUpload bufferToDiskThenUpload(Path path) throws IO * @since 2.26.0 This new api is in preview and is subject to breaking changes. */ @BetaApi - @TransportCompatibility({Transport.GRPC}) + @TransportCompatibility({Transport.GRPC, Transport.HTTP}) public static BufferToDiskThenUpload bufferToDiskThenUpload(Collection paths) throws IOException { return new BufferToDiskThenUpload(ImmutableList.copyOf(paths), false); diff --git a/google-cloud-storage/src/main/java/com/google/cloud/storage/BufferToDiskThenUpload.java b/google-cloud-storage/src/main/java/com/google/cloud/storage/BufferToDiskThenUpload.java index 8c9cb7c107..58ce102932 100644 --- a/google-cloud-storage/src/main/java/com/google/cloud/storage/BufferToDiskThenUpload.java +++ b/google-cloud-storage/src/main/java/com/google/cloud/storage/BufferToDiskThenUpload.java @@ -59,9 +59,9 @@ */ @Immutable @BetaApi -@TransportCompatibility({Transport.GRPC}) +@TransportCompatibility({Transport.GRPC, Transport.HTTP}) public final class BufferToDiskThenUpload extends BlobWriteSessionConfig - implements BlobWriteSessionConfig.GrpcCompatible { + implements BlobWriteSessionConfig.HttpCompatible, BlobWriteSessionConfig.GrpcCompatible { private static final long serialVersionUID = 9059242302276891867L; /** diff --git a/google-cloud-storage/src/main/java/com/google/cloud/storage/StorageImpl.java b/google-cloud-storage/src/main/java/com/google/cloud/storage/StorageImpl.java index 1a9bd8d03c..ea9e62f070 100644 --- a/google-cloud-storage/src/main/java/com/google/cloud/storage/StorageImpl.java +++ b/google-cloud-storage/src/main/java/com/google/cloud/storage/StorageImpl.java @@ -91,6 +91,7 @@ import java.util.concurrent.TimeoutException; import java.util.function.Function; import java.util.function.Supplier; +import org.checkerframework.checker.nullness.qual.NonNull; import org.checkerframework.checker.nullness.qual.Nullable; final class StorageImpl extends BaseService implements Storage, StorageInternal { @@ -147,7 +148,8 @@ public Blob create(BlobInfo blobInfo, BlobTargetOption... options) { .setMd5(EMPTY_BYTE_ARRAY_MD5) .setCrc32c(EMPTY_BYTE_ARRAY_CRC32C) .build(); - return internalCreate(updatedInfo, EMPTY_BYTE_ARRAY, 0, 0, options); + final Opts objectTargetOptOpts = Opts.unwrap(options).resolveFrom(updatedInfo); + return internalCreate(updatedInfo, EMPTY_BYTE_ARRAY, 0, 0, objectTargetOptOpts); } @Override @@ -161,7 +163,8 @@ public Blob create(BlobInfo blobInfo, byte[] content, BlobTargetOption... option BaseEncoding.base64() .encode(Ints.toByteArray(Hashing.crc32c().hashBytes(content).asInt()))) .build(); - return internalCreate(updatedInfo, content, 0, content.length, options); + final Opts objectTargetOptOpts = Opts.unwrap(options).resolveFrom(updatedInfo); + return internalCreate(updatedInfo, content, 0, content.length, objectTargetOptOpts); } @Override @@ -180,7 +183,8 @@ public Blob create( Ints.toByteArray( Hashing.crc32c().hashBytes(content, offset, length).asInt()))) .build(); - return internalCreate(updatedInfo, content, offset, length, options); + final Opts objectTargetOptOpts = Opts.unwrap(options).resolveFrom(updatedInfo); + return internalCreate(updatedInfo, content, offset, length, objectTargetOptOpts); } @Override @@ -203,12 +207,11 @@ public Blob create(BlobInfo blobInfo, InputStream content, BlobWriteOption... op private Blob internalCreate( BlobInfo info, - final byte[] content, + final byte @NonNull [] content, final int offset, final int length, - BlobTargetOption... options) { + Opts opts) { Preconditions.checkNotNull(content); - Opts opts = Opts.unwrap(options).resolveFrom(info); final Map optionsMap = opts.getRpcOptions(); BlobInfo updated = opts.blobInfoMapper().apply(info.toBuilder()).build(); @@ -1647,4 +1650,48 @@ public BlobWriteSession blobWriteSession(BlobInfo blobInfo, BlobWriteOption... o writerFactory.writeSession(this, blobInfo, opts); return BlobWriteSessions.of(writableByteChannelSession); } + + @Override + public BlobInfo internalCreateFrom(Path path, BlobInfo info, Opts opts) + throws IOException { + if (Files.isDirectory(path)) { + throw new StorageException(0, path + " is a directory"); + } + long size = Files.size(path); + if (size == 0L) { + return internalCreate(info, EMPTY_BYTE_ARRAY, 0, 0, opts); + } + final Map optionsMap = opts.getRpcOptions(); + BlobInfo.Builder builder = info.toBuilder().setMd5(null).setCrc32c(null); + BlobInfo updated = opts.blobInfoMapper().apply(builder).build(); + StorageObject encode = codecs.blobInfo().encode(updated); + + Supplier uploadIdSupplier = + ResumableMedia.startUploadForBlobInfo( + getOptions(), + updated, + optionsMap, + retryAlgorithmManager.getForResumableUploadSessionCreate(optionsMap)); + JsonResumableWrite jsonResumableWrite = + JsonResumableWrite.of(encode, optionsMap, uploadIdSupplier.get(), 0); + + JsonResumableSession session = + ResumableSession.json( + HttpClientContext.from(storageRpc), + getOptions().asRetryDependencies(), + retryAlgorithmManager.idempotent(), + jsonResumableWrite); + HttpContentRange contentRange = + HttpContentRange.of(ByteRangeSpec.relativeLength(0L, size), size); + ResumableOperationResult put = + session.put(RewindableContent.of(path), contentRange); + // all exception translation is taken care of down in the JsonResumableSession + StorageObject object = put.getObject(); + if (object == null) { + // if by some odd chance the put didn't get the StorageObject, query for it + ResumableOperationResult query = session.query(); + object = query.getObject(); + } + return codecs.blobInfo().decode(object); + } } diff --git a/google-cloud-storage/src/test/java/com/google/cloud/storage/it/ITBlobWriteSessionTest.java b/google-cloud-storage/src/test/java/com/google/cloud/storage/it/ITBlobWriteSessionTest.java index b8f89726e3..6cff689114 100644 --- a/google-cloud-storage/src/test/java/com/google/cloud/storage/it/ITBlobWriteSessionTest.java +++ b/google-cloud-storage/src/test/java/com/google/cloud/storage/it/ITBlobWriteSessionTest.java @@ -69,7 +69,6 @@ public void allDefaults() throws Exception { } @Test - @CrossRun.Exclude(transports = Transport.HTTP) public void bufferToTempDirThenUpload() throws Exception { StorageOptions options = null; if (transport == Transport.GRPC) { @@ -78,6 +77,12 @@ public void bufferToTempDirThenUpload() throws Exception { .toBuilder() .setBlobWriteSessionConfig(BlobWriteSessionConfigs.bufferToTempDirThenUpload()) .build(); + } else if (transport == Transport.HTTP) { + options = + ((HttpStorageOptions) storage.getOptions()) + .toBuilder() + .setBlobWriteSessionConfig(BlobWriteSessionConfigs.bufferToTempDirThenUpload()) + .build(); } assertWithMessage("unable to resolve options").that(options).isNotNull(); //noinspection DataFlowIssue