diff --git a/sdk/storage/azure-storage-blob/src/main/java/com/azure/storage/blob/BlobAsyncClient.java b/sdk/storage/azure-storage-blob/src/main/java/com/azure/storage/blob/BlobAsyncClient.java
index 1db1debd2558..873969b6d9e8 100644
--- a/sdk/storage/azure-storage-blob/src/main/java/com/azure/storage/blob/BlobAsyncClient.java
+++ b/sdk/storage/azure-storage-blob/src/main/java/com/azure/storage/blob/BlobAsyncClient.java
@@ -63,13 +63,20 @@
* #getPageBlobAsyncClient() getPageBlobAsyncClient} to construct a client that allows blob specific operations.
*
*
- * Please refer to the Azure
+ * Please refer to the
+ * Azure
* Docs for more information.
*/
public class BlobAsyncClient extends BlobAsyncClientBase {
public static final int BLOB_DEFAULT_UPLOAD_BLOCK_SIZE = 4 * Constants.MB;
+ public static final int BLOB_DEFAULT_NUMBER_OF_BUFFERS = 8;
+ /**
+ * If a blob is known to be greater than 100MB, using a larger block size will trigger some server-side
+ * optimizations. If the block size is not set and the size of the blob is known to be greater than 100MB, this
+ * value will be used.
+ */
+ public static final int BLOB_DEFAULT_HTBB_UPLOAD_BLOCK_SIZE = 8 * Constants.MB;
static final int BLOB_MAX_UPLOAD_BLOCK_SIZE = 100 * Constants.MB;
-
private final ClientLogger logger = new ClientLogger(BlobAsyncClient.class);
/**
@@ -94,6 +101,7 @@ protected BlobAsyncClient(HttpPipeline pipeline, String url, BlobServiceVersion
* Creates a new {@link BlobAsyncClient} linked to the {@code snapshot} of this blob resource.
*
* @param snapshot the identifier for a specific snapshot of this blob
+ *
* @return a {@link BlobAsyncClient} used to interact with the specific snapshot.
*/
@Override
@@ -174,6 +182,7 @@ private SpecializedBlobClientBuilder prepareBuilder() {
* {@code Flux} be replayable. In other words, it does not have to support multiple subscribers and is not expected
* to produce the same values across subscriptions.
* @param parallelTransferOptions {@link ParallelTransferOptions} used to configure buffered uploading.
+ *
* @return A reactive response containing the information of the uploaded block blob.
*/
public Mono<BlockBlobItem> upload(Flux<ByteBuffer> data, ParallelTransferOptions parallelTransferOptions) {
@@ -193,8 +202,8 @@ public Mono<BlockBlobItem> upload(Flux<ByteBuffer> data, ParallelTransferOptions
* see the Azure Docs for Put Block and
* the Azure Docs for Put Block List.
*
- * The data passed need not support multiple subscriptions/be replayable as is required in other upload methods when
- * retries are enabled, and the length of the data need not be known in advance. Therefore, this method should
+ * The data passed need not support multiple subscriptions/be replayable as is required in other upload methods
+ * when retries are enabled, and the length of the data need not be known in advance. Therefore, this method should
* support uploading any arbitrary data source, including network streams. This behavior is possible because this
* method will perform some internal buffering as configured by the blockSize and numBuffers parameters, so while
* this method may offer additional convenience, it will not be as performant as other options, which should be
@@ -202,8 +211,8 @@ public Mono upload(Flux data, ParallelTransferOptions
*
* Typically, the greater the number of buffers used, the greater the possible parallelism when transferring the
* data. Larger buffers means we will have to stage fewer blocks and therefore require fewer IO operations. The
- * trade-offs between these values are context-dependent, so some experimentation may be required to optimize inputs
- * for a given scenario.
+ * trade-offs between these values are context-dependent, so some experimentation may be required to optimize
+ * inputs for a given scenario.
*
*
Code Samples
*
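As a caller-side illustration of the blockSize/numBuffers trade-off described in the Javadoc above — the class name BufferedUploadExample and the chosen values are examples only, not part of this patch:

import com.azure.storage.blob.BlobAsyncClient;
import com.azure.storage.blob.models.ParallelTransferOptions;
import reactor.core.publisher.Flux;

import java.nio.ByteBuffer;

final class BufferedUploadExample {
    // The memory ceiling for the buffered upload is roughly blockSize * numBuffers
    // (here ~32MB); larger blocks mean fewer staged blocks and fewer IO operations,
    // while more buffers increase the potential parallelism.
    static void uploadBuffered(BlobAsyncClient client, Flux<ByteBuffer> data) {
        ParallelTransferOptions options = new ParallelTransferOptions()
            .setBlockSize(8 * 1024 * 1024) // fewer, larger Put Block calls
            .setNumBuffers(4);             // at most four 8MB buffers in flight
        client.upload(data, options).subscribe();
    }
}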
@@ -232,18 +241,16 @@ public Mono<Response<BlockBlobItem>> uploadWithResponse(Flux<ByteBuffer> data,
Objects.requireNonNull(data, "'data' must not be null");
BlobRequestConditions accessConditionsFinal = accessConditions == null
? new BlobRequestConditions() : accessConditions;
- final ParallelTransferOptions finalParallelTransferOptions = parallelTransferOptions == null
- ? new ParallelTransferOptions() : parallelTransferOptions;
- int blockSize = finalParallelTransferOptions.getBlockSize();
- int numBuffers = finalParallelTransferOptions.getNumBuffers();
- ProgressReceiver progressReceiver = finalParallelTransferOptions.getProgressReceiver();
+ final ParallelTransferOptions finalParallelTransferOptions = new ParallelTransferOptions();
+ finalParallelTransferOptions.populateAndApplyDefaults(parallelTransferOptions);
// See ProgressReporter for an explanation on why this lock is necessary and why we use AtomicLong.
AtomicLong totalProgress = new AtomicLong(0);
Lock progressLock = new ReentrantLock();
// Validation done in the constructor.
- UploadBufferPool pool = new UploadBufferPool(numBuffers, blockSize);
+ UploadBufferPool pool = new UploadBufferPool(finalParallelTransferOptions.getNumBuffers(),
+ finalParallelTransferOptions.getBlockSize());
/*
Break the source Flux into chunks that are <= chunk size. This makes filling the pooled buffers much easier
@@ -254,29 +261,30 @@ as we can guarantee we only need at most two buffers for any call to write (two
Flux<ByteBuffer> chunkedSource = data
.filter(ByteBuffer::hasRemaining)
.flatMapSequential(buffer -> {
- if (buffer.remaining() <= blockSize) {
+ if (buffer.remaining() <= finalParallelTransferOptions.getBlockSize()) {
return Flux.just(buffer);
}
- int numSplits = (int) Math.ceil(buffer.remaining() / (double) blockSize);
+ int numSplits =
+ (int) Math.ceil(buffer.remaining() / (double) finalParallelTransferOptions.getBlockSize());
return Flux.range(0, numSplits)
.map(i -> {
ByteBuffer duplicate = buffer.duplicate().asReadOnlyBuffer();
- duplicate.position(i * blockSize);
- duplicate.limit(Math.min(duplicate.limit(), (i + 1) * blockSize));
+ duplicate.position(i * finalParallelTransferOptions.getBlockSize());
+ duplicate.limit(Math.min(duplicate.limit(),
+ (i + 1) * finalParallelTransferOptions.getBlockSize()));
return duplicate;
});
});
- /*
- Write to the pool and upload the output.
- */
+ /*
+ Write to the pool and upload the output.
+ */
return chunkedSource.concatMap(pool::write)
.concatWith(Flux.defer(pool::flush))
.flatMapSequential(buffer -> {
// Report progress as necessary.
Flux<ByteBuffer> progressData = ProgressReporter.addParallelProgressReporting(Flux.just(buffer),
- progressReceiver, progressLock, totalProgress);
-
+ finalParallelTransferOptions.getProgressReceiver(), progressLock, totalProgress);
final String blockId = Base64.getEncoder().encodeToString(
UUID.randomUUID().toString().getBytes(UTF_8));
@@ -296,7 +304,6 @@ as we can guarantee we only need at most two buffers for any call to write (two
} catch (RuntimeException ex) {
return monoError(logger, ex);
}
-
}
/**
@@ -308,6 +315,7 @@ as we can guarantee we only need at most two buffers for any call to write (two
* {@codesnippet com.azure.storage.blob.BlobAsyncClient.uploadFromFile#String}
*
* @param filePath Path to the upload file
+ *
* @return An empty response
*/
public Mono<Void> uploadFromFile(String filePath) {
@@ -334,6 +342,7 @@ public Mono<Void> uploadFromFile(String filePath) {
* @param tier {@link AccessTier} for the destination blob.
* @param accessConditions {@link BlobRequestConditions}
* @return An empty response
+ *
* @throws IllegalArgumentException If {@code blockSize} is less than 0 or greater than 100MB
* @throws UncheckedIOException If an I/O error occurs
*/
@@ -342,10 +351,8 @@ public Mono<Void> uploadFromFile(String filePath, ParallelTransferOptions parall
BlobHttpHeaders headers, Map<String, String> metadata, AccessTier tier,
BlobRequestConditions accessConditions) {
try {
- final ParallelTransferOptions finalParallelTransferOptions = parallelTransferOptions == null
- ? new ParallelTransferOptions()
- : parallelTransferOptions;
- ProgressReceiver progressReceiver = finalParallelTransferOptions.getProgressReceiver();
+ ParallelTransferOptions finalParallelTransferOptions = new ParallelTransferOptions();
+ finalParallelTransferOptions.populateAndApplyDefaults(parallelTransferOptions);
// See ProgressReporter for an explanation on why this lock is necessary and why we use AtomicLong.
AtomicLong totalProgress = new AtomicLong(0);
@@ -354,14 +361,15 @@ public Mono<Void> uploadFromFile(String filePath, ParallelTransferOptions parall
return Mono.using(() -> uploadFileResourceSupplier(filePath),
channel -> {
final SortedMap<Long, String> blockIds = new TreeMap<>();
- return Flux.fromIterable(sliceFile(filePath, finalParallelTransferOptions.getBlockSize()))
+ return Flux.fromIterable(sliceFile(filePath, finalParallelTransferOptions.getBlockSize(),
+ parallelTransferOptions == null || parallelTransferOptions.getBlockSize() == null))
.doOnNext(chunk -> blockIds.put(chunk.getOffset(), getBlockID()))
.flatMap(chunk -> {
String blockId = blockIds.get(chunk.getOffset());
Flux<ByteBuffer> progressData = ProgressReporter.addParallelProgressReporting(
FluxUtil.readFile(channel, chunk.getOffset(), chunk.getCount()),
- progressReceiver, progressLock, totalProgress);
+ finalParallelTransferOptions.getProgressReceiver(), progressLock, totalProgress);
return getBlockBlobAsyncClient()
.stageBlockWithResponse(blockId, progressData, chunk.getCount(), null);
@@ -386,7 +394,9 @@ public Mono<Void> uploadFromFile(String filePath, ParallelTransferOptions parall
* Resource Supplier for UploadFile
*
* @param filePath The path for the file
+ *
* @return {@code AsynchronousFileChannel}
+ *
* @throws UncheckedIOException If an I/O error occurs
*/
protected AsynchronousFileChannel uploadFileResourceSupplier(String filePath) {
@@ -409,9 +419,12 @@ private String getBlockID() {
return Base64.getEncoder().encodeToString(UUID.randomUUID().toString().getBytes(StandardCharsets.UTF_8));
}
- private List<BlobRange> sliceFile(String path, int blockSize) {
+ private List<BlobRange> sliceFile(String path, int blockSize, boolean enableHtbbOptimizations) {
File file = new File(path);
assert file.exists();
+ if (file.length() > 100 * Constants.MB && enableHtbbOptimizations) {
+ blockSize = BLOB_DEFAULT_HTBB_UPLOAD_BLOCK_SIZE;
+ }
List<BlobRange> ranges = new ArrayList<>();
for (long pos = 0; pos < file.length(); pos += blockSize) {
long count = blockSize;
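A minimal sketch of the block-size selection that the new sliceFile overload performs; the class and helper names below are illustrative only and do not appear in this patch:

import java.io.File;

final class HtbbSelectionSketch {
    static final int MB = 1024 * 1024;
    static final int BLOB_DEFAULT_UPLOAD_BLOCK_SIZE = 4 * MB;
    static final int BLOB_DEFAULT_HTBB_UPLOAD_BLOCK_SIZE = 8 * MB;

    // enableHtbbOptimizations mirrors the new sliceFile parameter: it is true only
    // when the caller left ParallelTransferOptions.blockSize unset, so an explicit
    // block size always suppresses the high-throughput default.
    static int chooseUploadBlockSize(File file, int resolvedBlockSize, boolean enableHtbbOptimizations) {
        if (file.length() > 100L * MB && enableHtbbOptimizations) {
            return BLOB_DEFAULT_HTBB_UPLOAD_BLOCK_SIZE; // triggers server-side HTBB optimizations
        }
        return resolvedBlockSize;
    }
}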
diff --git a/sdk/storage/azure-storage-blob/src/main/java/com/azure/storage/blob/models/ParallelTransferOptions.java b/sdk/storage/azure-storage-blob/src/main/java/com/azure/storage/blob/models/ParallelTransferOptions.java
index 9bce7e90712d..c4eae55a8e45 100644
--- a/sdk/storage/azure-storage-blob/src/main/java/com/azure/storage/blob/models/ParallelTransferOptions.java
+++ b/sdk/storage/azure-storage-blob/src/main/java/com/azure/storage/blob/models/ParallelTransferOptions.java
@@ -5,40 +5,36 @@
import com.azure.core.annotation.Fluent;
import com.azure.storage.blob.ProgressReceiver;
+import com.azure.storage.blob.specialized.BlockBlobAsyncClient;
import com.azure.storage.common.implementation.Constants;
import com.azure.storage.common.implementation.StorageImplUtils;
+import static com.azure.storage.blob.BlobAsyncClient.BLOB_DEFAULT_NUMBER_OF_BUFFERS;
+import static com.azure.storage.blob.BlobAsyncClient.BLOB_DEFAULT_UPLOAD_BLOCK_SIZE;
+
/**
* This class contains configuration used to parallelize data transfer operations.
*/
@Fluent
-public class ParallelTransferOptions {
-
- private static final int BLOB_DEFAULT_UPLOAD_BLOCK_SIZE = 4 * Constants.MB;
- private static final int BLOB_MAX_BLOCK_SIZE = 100 * Constants.MB;
+public final class ParallelTransferOptions {
- private static final int BLOB_DEFAULT_NUMBER_OF_PARALLEL_TRANSFERS = 8;
+ private static final int BLOB_MAX_UPLOAD_BLOCK_SIZE = 100 * Constants.MB;
- private int blockSize;
- private int numBuffers;
+ private Integer blockSize;
+ private Integer numBuffers;
private ProgressReceiver progressReceiver;
/**
* Creates a new {@link ParallelTransferOptions} with default parameters applied.
- * blockSize = 4MB
- * numBuffers = 8
*/
public ParallelTransferOptions() {
- this.blockSize = BLOB_DEFAULT_UPLOAD_BLOCK_SIZE;
- this.numBuffers = BLOB_DEFAULT_NUMBER_OF_PARALLEL_TRANSFERS;
- this.progressReceiver = null;
}
/**
* Gets the block size (chunk size) to transfer at a time.
* @return The block size.
*/
- public int getBlockSize() {
+ public Integer getBlockSize() {
return this.blockSize;
}
@@ -46,7 +42,7 @@ public int getBlockSize() {
* Gets the number of buffers being used for a transfer operation.
* @return The number of buffers.
*/
- public int getNumBuffers() {
+ public Integer getNumBuffers() {
return this.numBuffers;
}
@@ -68,8 +64,10 @@ public ProgressReceiver getProgressReceiver() {
* @return The updated ParallelTransferOptions object.
* @throws IllegalArgumentException when block size is less than 0 or greater than max blob block size (100MB).
*/
- public ParallelTransferOptions setBlockSize(int blockSize) {
- StorageImplUtils.assertInBounds("blockSize", blockSize, 0, BLOB_MAX_BLOCK_SIZE);
+ public ParallelTransferOptions setBlockSize(Integer blockSize) {
+ if (blockSize != null) {
+ StorageImplUtils.assertInBounds("blockSize", blockSize, 0, BlockBlobAsyncClient.MAX_STAGE_BLOCK_BYTES);
+ }
this.blockSize = blockSize;
return this;
}
@@ -77,21 +75,45 @@ public ParallelTransferOptions setBlockSize(int blockSize) {
/**
* Sets the number of buffers being used for an upload/download operation.
* @param numBuffers The number of buffers.
- * For upload, The number of buffers is the maximum number of buffers this method should allocate.
+ * For buffered upload only, the number of buffers is the maximum number of buffers this method should allocate.
* Must be at least two. Typically, the larger the number of buffers, the more parallel, and thus faster, the
* upload portion of this operation will be. The amount of memory consumed by this method may be up to
* blockSize * numBuffers.
* @return The updated ParallelTransferOptions object.
* @throws IllegalArgumentException when numBuffers is less than 2.
*/
- public ParallelTransferOptions setNumBuffers(int numBuffers) {
- StorageImplUtils.assertInBounds("numBuffers", numBuffers, 2, Integer.MAX_VALUE);
+ public ParallelTransferOptions setNumBuffers(Integer numBuffers) {
+ if (numBuffers != null) {
+ StorageImplUtils.assertInBounds("numBuffers", numBuffers, 2, Integer.MAX_VALUE);
+ }
this.numBuffers = numBuffers;
return this;
}
+ /**
+ * Sets the progress receiver for parallel reporting.
+ * @param progressReceiver The progress receiver.
+ * @return The updated ParallelTransferOptions object.
+ */
public ParallelTransferOptions setProgressReceiver(ProgressReceiver progressReceiver) {
this.progressReceiver = progressReceiver;
return this;
}
+
+ /**
+ * RESERVED FOR INTERNAL USE.
+ *
+ * @param other The customer provided transfer options. If it has non-null values, they will be used, otherwise
+ * defaults will be set.
+ */
+ public void populateAndApplyDefaults(ParallelTransferOptions other) {
+ if (other == null) {
+ other = new ParallelTransferOptions();
+ }
+ this.setBlockSize(other.getBlockSize() == null
+ ? Integer.valueOf(BLOB_DEFAULT_UPLOAD_BLOCK_SIZE) : other.getBlockSize());
+ this.setNumBuffers(other.getNumBuffers() == null
+ ? Integer.valueOf(BLOB_DEFAULT_NUMBER_OF_BUFFERS) : other.getNumBuffers());
+ this.setProgressReceiver(other.getProgressReceiver());
+ }
}
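To make the new defaulting order concrete, a small example of how populateAndApplyDefaults is expected to resolve values; the wrapper class is illustrative only:

import com.azure.storage.blob.models.ParallelTransferOptions;

final class DefaultsResolutionExample {
    public static void main(String[] args) {
        // The caller sets only numBuffers and leaves blockSize null.
        ParallelTransferOptions userOptions = new ParallelTransferOptions().setNumBuffers(16);

        ParallelTransferOptions resolved = new ParallelTransferOptions();
        resolved.populateAndApplyDefaults(userOptions);

        System.out.println(resolved.getBlockSize() == 4 * 1024 * 1024); // true: BLOB_DEFAULT_UPLOAD_BLOCK_SIZE applied
        System.out.println(resolved.getNumBuffers() == 16);             // true: caller value preserved
        System.out.println(resolved.getProgressReceiver() == null);     // true: no default receiver
    }
}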
diff --git a/sdk/storage/azure-storage-blob/src/main/java/com/azure/storage/blob/specialized/BlobAsyncClientBase.java b/sdk/storage/azure-storage-blob/src/main/java/com/azure/storage/blob/specialized/BlobAsyncClientBase.java
index fa0ee80e33fb..2e094145e6e7 100644
--- a/sdk/storage/azure-storage-blob/src/main/java/com/azure/storage/blob/specialized/BlobAsyncClientBase.java
+++ b/sdk/storage/azure-storage-blob/src/main/java/com/azure/storage/blob/specialized/BlobAsyncClientBase.java
@@ -3,6 +3,7 @@
package com.azure.storage.blob.specialized;
+import static com.azure.core.implementation.util.FluxUtil.withContext;
import com.azure.core.http.HttpPipeline;
import com.azure.core.http.HttpResponse;
import com.azure.core.http.RequestConditions;
@@ -65,7 +66,6 @@
import static com.azure.core.implementation.util.FluxUtil.fluxError;
import static com.azure.core.implementation.util.FluxUtil.monoError;
-import static com.azure.core.implementation.util.FluxUtil.withContext;
/**
* This class provides a client that contains all operations that apply to any blob type.
@@ -747,11 +747,9 @@ public Mono<Response<BlobProperties>> downloadToFileWithResponse(String filePath
// TODO (gapra) : Investigate if this is can be parallelized, and include the parallelTransfers parameter.
Mono<Response<BlobProperties>> downloadToFileWithResponse(String filePath, BlobRange range,
ParallelTransferOptions parallelTransferOptions, ReliableDownloadOptions options,
- BlobRequestConditions accessConditions, boolean rangeGetContentMd5, Context context) {
- final ParallelTransferOptions finalParallelTransferOptions = parallelTransferOptions == null
- ? new ParallelTransferOptions()
- : parallelTransferOptions;
- ProgressReceiver progressReceiver = finalParallelTransferOptions.getProgressReceiver();
+ BlobRequestConditions accessConditions, boolean rangeGetContentMD5, Context context) {
+ ParallelTransferOptions finalParallelTransferOptions = new ParallelTransferOptions();
+ finalParallelTransferOptions.populateAndApplyDefaults(parallelTransferOptions);
// See ProgressReporter for an explanation on why this lock is necessary and why we use AtomicLong.
AtomicLong totalProgress = new AtomicLong(0);
@@ -760,9 +758,9 @@ Mono<Response<BlobProperties>> downloadToFileWithResponse(String filePath, BlobR
return Mono.using(() -> downloadToFileResourceSupplier(filePath),
channel -> getPropertiesWithResponse(accessConditions)
.flatMap(response -> processInRange(channel, response,
- range, finalParallelTransferOptions.getBlockSize(), options, accessConditions, rangeGetContentMd5,
- context, totalProgress, progressLock, progressReceiver)), this::downloadToFileCleanup);
-
+ range, finalParallelTransferOptions.getBlockSize(), options, accessConditions, rangeGetContentMD5,
+ context, totalProgress, progressLock, finalParallelTransferOptions.getProgressReceiver())),
+ this::downloadToFileCleanup);
}
private Mono<Response<BlobProperties>> processInRange(AsynchronousFileChannel channel,
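Both the upload and download paths now share this option resolution and the AtomicLong/lock progress pattern that the "See ProgressReporter" comments reference. A rough, self-contained sketch of that pattern, with ProgressReceiver approximated here by a LongConsumer:

import java.util.concurrent.atomic.AtomicLong;
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReentrantLock;
import java.util.function.LongConsumer;

final class ParallelProgressSketch {
    private final AtomicLong totalProgress = new AtomicLong(0);
    private final Lock progressLock = new ReentrantLock();

    // Each parallel chunk reports bytes as they complete. Updating and delivering
    // the running total under one lock keeps the totals the receiver observes
    // nondecreasing even when many chunks finish concurrently.
    void reportChunkProgress(long bytesTransferred, LongConsumer receiver) {
        progressLock.lock();
        try {
            receiver.accept(totalProgress.addAndGet(bytesTransferred));
        } finally {
            progressLock.unlock();
        }
    }
}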
diff --git a/sdk/storage/azure-storage-blob/src/test/java/com/azure/storage/blob/APISpec.groovy b/sdk/storage/azure-storage-blob/src/test/java/com/azure/storage/blob/APISpec.groovy
index c99dc35509d9..096f2cc49810 100644
--- a/sdk/storage/azure-storage-blob/src/test/java/com/azure/storage/blob/APISpec.groovy
+++ b/sdk/storage/azure-storage-blob/src/test/java/com/azure/storage/blob/APISpec.groovy
@@ -475,6 +475,18 @@ class APISpec extends Specification {
return ByteBuffer.wrap(getRandomByteArray(size))
}
+ /*
+ We only allow int because anything larger than 2GB (which would require a long) is left to stress/perf.
+ */
+ File getRandomFile(int size) {
+ File file = File.createTempFile(UUID.randomUUID().toString(), ".txt")
+ file.deleteOnExit()
+ FileOutputStream fos = new FileOutputStream(file)
+ fos.write(getRandomData(size).array())
+ fos.close()
+ return file
+ }
+
/**
* This will retrieve the etag to be used in testing match conditions. The result will typically be assigned to
* the ifMatch condition when testing success and the ifNoneMatch condition when testing failure.
diff --git a/sdk/storage/azure-storage-blob/src/test/java/com/azure/storage/blob/specialized/BlockBlobAPITest.groovy b/sdk/storage/azure-storage-blob/src/test/java/com/azure/storage/blob/specialized/BlockBlobAPITest.groovy
index 53340ca41b55..7cb4f3c44d25 100644
--- a/sdk/storage/azure-storage-blob/src/test/java/com/azure/storage/blob/specialized/BlockBlobAPITest.groovy
+++ b/sdk/storage/azure-storage-blob/src/test/java/com/azure/storage/blob/specialized/BlockBlobAPITest.groovy
@@ -35,7 +35,10 @@ import spock.lang.Requires
import spock.lang.Unroll
import java.nio.ByteBuffer
+import java.nio.channels.AsynchronousFileChannel
import java.nio.charset.StandardCharsets
+import java.nio.file.Paths
+import java.nio.file.StandardOpenOption
import java.security.MessageDigest
class BlockBlobAPITest extends APISpec {
@@ -593,17 +596,56 @@ class BlockBlobAPITest extends APISpec {
*/
@Requires({ liveMode() })
+ @Unroll
def "Upload from file"() {
- given:
- def file = new File(this.getClass().getResource("/testfiles/uploadFromFileTestData.txt").getPath())
- def outStream = new ByteArrayOutputStream()
+ setup:
+ def file = getRandomFile(fileSize)
+ def channel = AsynchronousFileChannel.open(file.toPath())
when:
- blobClient.uploadFromFile(file.getAbsolutePath())
+ // Block length will be ignored for single shot.
+ blobac.uploadFromFile(file.toPath().toString(), new ParallelTransferOptions().setBlockSize(blockSize),
+ null, null, null, null).block()
then:
- bc.download(outStream)
- outStream.toByteArray() == new Scanner(file).useDelimiter("\\z").next().getBytes(StandardCharsets.UTF_8)
+ def outFile = file.getPath().toString() + "result"
+ def outChannel = AsynchronousFileChannel.open(Paths.get(outFile), StandardOpenOption.CREATE, StandardOpenOption.WRITE)
+ FluxUtil.writeFile(blobac.download(), outChannel).block() == null
+
+ compareFiles(file, new File(outFile))
+ blobac.getBlockBlobAsyncClient().listBlocks(BlockListType.COMMITTED).block().getCommittedBlocks().size() ==
+ committedBlockCount
+
+ cleanup:
+ channel.close()
+
+ where:
+ fileSize | blockSize || committedBlockCount
+ 0 | null || 0
+ 10 | null || 1
+ 10 * 1024 | null || 1
+ 50 * 1024 * 1024 | null || Math.ceil((50 * 1024 * 1024) / BlobAsyncClient.BLOB_DEFAULT_UPLOAD_BLOCK_SIZE)
+ BlockBlobAsyncClient.MAX_UPLOAD_BLOB_BYTES + 1 | null || Math.ceil((BlockBlobAsyncClient.MAX_UPLOAD_BLOB_BYTES + 1) / BlobAsyncClient.BLOB_DEFAULT_HTBB_UPLOAD_BLOCK_SIZE) // HTBB optimizations should trigger when file size is >100MB and defaults are used.
+ 101 * 1024 * 1024 | 4 * 1024 * 1024 || 26 // Making the block size explicit should cancel the optimization
+ }
+
+ def compareFiles(File file1, File file2) {
+ FileInputStream fis1 = new FileInputStream(file1)
+ FileInputStream fis2 = new FileInputStream(file2)
+
+ try {
+ // InputStream.read() returns an int in [0, 255], or -1 at end of stream. Reading
+ // into an int (not a byte) keeps a 0xFF data byte from being mistaken for EOF.
+ int b1 = fis1.read()
+ int b2 = fis2.read()
+
+ while (b1 != -1 && b2 != -1) {
+ if (b1 != b2) {
+ return false
+ }
+ b1 = fis1.read()
+ b2 = fis2.read()
+ }
+ return b1 == b2
+ } finally {
+ fis1.close()
+ fis2.close()
+ }
}
@Requires({ liveMode() })
@@ -724,7 +766,6 @@ class BlockBlobAPITest extends APISpec {
.setIfModifiedSince(modified)
.setIfUnmodifiedSince(unmodified)
-
expect:
bc.uploadWithResponse(defaultInputStream.get(), defaultDataSize, null, null, null, bac, null, null).getStatusCode() == 201