diff --git a/google-cloud-storage/src/main/java/com/google/cloud/storage/transfermanager/TransferManagerImpl.java b/google-cloud-storage/src/main/java/com/google/cloud/storage/transfermanager/TransferManagerImpl.java index 4bbbeb9393..5489c1c890 100644 --- a/google-cloud-storage/src/main/java/com/google/cloud/storage/transfermanager/TransferManagerImpl.java +++ b/google-cloud-storage/src/main/java/com/google/cloud/storage/transfermanager/TransferManagerImpl.java @@ -26,6 +26,7 @@ import com.google.cloud.storage.BlobInfo; import com.google.cloud.storage.BlobWriteSessionConfigs; import com.google.cloud.storage.ParallelCompositeUploadBlobWriteSessionConfig; +import com.google.cloud.storage.ParallelCompositeUploadBlobWriteSessionConfig.BufferAllocationStrategy; import com.google.cloud.storage.ParallelCompositeUploadBlobWriteSessionConfig.ExecutorSupplier; import com.google.cloud.storage.Storage; import com.google.cloud.storage.Storage.BlobWriteOption; @@ -83,7 +84,11 @@ final class TransferManagerImpl implements TransferManager { if (transferManagerConfig.isAllowParallelCompositeUpload()) { ParallelCompositeUploadBlobWriteSessionConfig pcuConfig = BlobWriteSessionConfigs.parallelCompositeUpload() - .withExecutorSupplier(ExecutorSupplier.useExecutor(executor)); + .withExecutorSupplier(ExecutorSupplier.useExecutor(executor)) + .withBufferAllocationStrategy( + BufferAllocationStrategy.fixedPool( + transferManagerConfig.getMaxWorkers(), + transferManagerConfig.getPerWorkerBufferSize())); storageOptions = storageOptions.toBuilder().setBlobWriteSessionConfig(pcuConfig).build(); } this.pcuQueue = new ConcurrentLinkedDeque<>(); @@ -264,8 +269,13 @@ public void run() { return; } - UploadResult result = poll.callable.call(); - poll.resultFuture.set(result); + try { + UploadResult result = poll.callable.call(); + poll.resultFuture.set(result); + } catch (Throwable e) { + poll.resultFuture.setException(e); + throw e; + } } while (true); }