From 6f2f5045bb7c1dabdd9b1c19ce7d2b02163c0eb8 Mon Sep 17 00:00:00 2001 From: BenWhitehead Date: Thu, 13 Jun 2024 15:11:22 -0400 Subject: [PATCH] fix: include x-goog-user-project on resumable upload puts for grpc transport (#2586) In HTTP transport, gcs includes the userProject in the returned location. However, in grpc the uploadId is opaque and does not necessarily include the userProject in a client visible way. For completeness sake, include x-goog-user-project in resumable upload puts. --- .../cloud/storage/DefaultBlobWriteSessionConfig.java | 5 ++++- .../java/com/google/cloud/storage/GrpcStorageImpl.java | 2 +- .../storage/JournalingBlobWriteSessionConfig.java | 10 ++++++++-- 3 files changed, 13 insertions(+), 4 deletions(-) diff --git a/google-cloud-storage/src/main/java/com/google/cloud/storage/DefaultBlobWriteSessionConfig.java b/google-cloud-storage/src/main/java/com/google/cloud/storage/DefaultBlobWriteSessionConfig.java index 4b6c34ef69..5a7ef18198 100644 --- a/google-cloud-storage/src/main/java/com/google/cloud/storage/DefaultBlobWriteSessionConfig.java +++ b/google-cloud-storage/src/main/java/com/google/cloud/storage/DefaultBlobWriteSessionConfig.java @@ -159,7 +159,10 @@ public WritableByteChannelSession writeSession( grpc.startResumableWrite(grpcCallContext, req); return ResumableMedia.gapic() .write() - .byteChannel(grpc.storageClient.writeObjectCallable()) + .byteChannel( + grpc.storageClient + .writeObjectCallable() + .withDefaultCallContext(grpcCallContext)) .setHasher(Hasher.noop()) .setByteStringStrategy(ByteStringStrategy.copy()) .resumable() diff --git a/google-cloud-storage/src/main/java/com/google/cloud/storage/GrpcStorageImpl.java b/google-cloud-storage/src/main/java/com/google/cloud/storage/GrpcStorageImpl.java index c195ce78f3..0d0904ca1c 100644 --- a/google-cloud-storage/src/main/java/com/google/cloud/storage/GrpcStorageImpl.java +++ b/google-cloud-storage/src/main/java/com/google/cloud/storage/GrpcStorageImpl.java @@ -796,7 +796,7 @@ public GrpcBlobWriteChannel writer(BlobInfo blobInfo, BlobWriteOption... options // 3. wrap the result in another future container before constructing the BlobWriteChannel ApiFuture wrapped = ApiFutures.immediateFuture(resumableWrite); return new GrpcBlobWriteChannel( - storageClient.writeObjectCallable(), + storageClient.writeObjectCallable().withDefaultCallContext(grpcCallContext), getOptions(), retryAlgorithmManager.idempotent(), () -> wrapped, diff --git a/google-cloud-storage/src/main/java/com/google/cloud/storage/JournalingBlobWriteSessionConfig.java b/google-cloud-storage/src/main/java/com/google/cloud/storage/JournalingBlobWriteSessionConfig.java index 31784bfded..7d0e46ce5b 100644 --- a/google-cloud-storage/src/main/java/com/google/cloud/storage/JournalingBlobWriteSessionConfig.java +++ b/google-cloud-storage/src/main/java/com/google/cloud/storage/JournalingBlobWriteSessionConfig.java @@ -23,6 +23,7 @@ import com.google.api.core.InternalApi; import com.google.api.gax.grpc.GrpcCallContext; import com.google.api.gax.rpc.ApiExceptions; +import com.google.api.gax.rpc.ClientStreamingCallable; import com.google.cloud.storage.Conversions.Decoder; import com.google.cloud.storage.RecoveryFileManager.RecoveryVolumeSinkFactory; import com.google.cloud.storage.Storage.BlobWriteOption; @@ -34,6 +35,7 @@ import com.google.common.collect.ImmutableList; import com.google.common.util.concurrent.MoreExecutors; import com.google.storage.v2.ServiceConstants.Values; +import com.google.storage.v2.WriteObjectRequest; import com.google.storage.v2.WriteObjectResponse; import java.io.IOException; import java.io.ObjectInputStream; @@ -184,16 +186,20 @@ public WritableByteChannelSession writeSession( if (storage instanceof GrpcStorageImpl) { GrpcStorageImpl grpcStorage = (GrpcStorageImpl) storage; RecoveryFile recoveryFile = recoveryFileManager.newRecoveryFile(info); + GrpcCallContext grpcCallContext = + opts.grpcMetadataMapper().apply(GrpcCallContext.createDefault()); ApiFuture f = grpcStorage.startResumableWrite( - GrpcCallContext.createDefault(), grpcStorage.getWriteObjectRequest(info, opts)); + grpcCallContext, grpcStorage.getWriteObjectRequest(info, opts)); ApiFuture> start = ApiFutures.transform(f, WriteCtx::new, MoreExecutors.directExecutor()); + ClientStreamingCallable write = + grpcStorage.storageClient.writeObjectCallable().withDefaultCallContext(grpcCallContext); BufferedWritableByteChannelSession session = ResumableMedia.gapic() .write() - .byteChannel(grpcStorage.storageClient.writeObjectCallable()) + .byteChannel(write) .setHasher(Hasher.noop()) .setByteStringStrategy(ByteStringStrategy.copy()) .journaling()