From b8206459115c469fae4a34a12d68213e31c260d9 Mon Sep 17 00:00:00 2001
From: Rick Ley
Date: Tue, 22 Oct 2019 14:12:28 -0700
Subject: [PATCH] Optimizations for default block size on HTBB

---
 .../azure/storage/blob/BlobAsyncClient.java   | 71 +++++++++++--------
 .../blob/models/ParallelTransferOptions.java  | 60 +++++++++++-----
 .../blob/specialized/BlobAsyncClientBase.java | 16 ++---
 .../com/azure/storage/blob/APISpec.groovy     | 12 ++++
 .../blob/specialized/BlockBlobAPITest.groovy  | 55 ++++++++++++--
 5 files changed, 150 insertions(+), 64 deletions(-)

diff --git a/sdk/storage/azure-storage-blob/src/main/java/com/azure/storage/blob/BlobAsyncClient.java b/sdk/storage/azure-storage-blob/src/main/java/com/azure/storage/blob/BlobAsyncClient.java
index 1db1debd2558..873969b6d9e8 100644
--- a/sdk/storage/azure-storage-blob/src/main/java/com/azure/storage/blob/BlobAsyncClient.java
+++ b/sdk/storage/azure-storage-blob/src/main/java/com/azure/storage/blob/BlobAsyncClient.java
@@ -63,13 +63,20 @@
 * {@link #getPageBlobAsyncClient() getPageBlobAsyncClient} to construct a client that allows blob specific operations.
 *
 *

- * Please refer to the Azure
+ * Please refer to the
+ * Azure
 * Docs for more information.
 */
public class BlobAsyncClient extends BlobAsyncClientBase {
    public static final int BLOB_DEFAULT_UPLOAD_BLOCK_SIZE = 4 * Constants.MB;
+    public static final int BLOB_DEFAULT_NUMBER_OF_BUFFERS = 8;
+    /**
+     * If a blob is known to be greater than 100MB, using a larger block size will trigger some server-side
+     * optimizations. If the block size is not set and the size of the blob is known to be greater than 100MB, this
+     * value will be used.
+     */
+    public static final int BLOB_DEFAULT_HTBB_UPLOAD_BLOCK_SIZE = 8 * Constants.MB;
    static final int BLOB_MAX_UPLOAD_BLOCK_SIZE = 100 * Constants.MB;
-    private final ClientLogger logger = new ClientLogger(BlobAsyncClient.class);
    /**
@@ -94,6 +101,7 @@ protected BlobAsyncClient(HttpPipeline pipeline, String url, BlobServiceVersion
     * Creates a new {@link BlobAsyncClient} linked to the {@code snapshot} of this blob resource.
     *
     * @param snapshot the identifier for a specific snapshot of this blob
+     *
     * @return a {@link BlobAsyncClient} used to interact with the specific snapshot.
     */
    @Override
@@ -174,6 +182,7 @@ private SpecializedBlobClientBuilder prepareBuilder() {
     * {@code Flux} be replayable. In other words, it does not have to support multiple subscribers and is not expected
     * to produce the same values across subscriptions.
     * @param parallelTransferOptions {@link ParallelTransferOptions} used to configure buffered uploading.
+     *
     * @return A reactive response containing the information of the uploaded block blob.
     */
    public Mono<BlockBlobItem> upload(Flux<ByteBuffer> data, ParallelTransferOptions parallelTransferOptions) {
@@ -193,8 +202,8 @@ public Mono<BlockBlobItem> upload(Flux<ByteBuffer> data, ParallelTransferOptions
     * see the Azure Docs for Put Block and
     * the Azure Docs for Put Block List.
     *

- * The data passed need not support multiple subscriptions/be replayable as is required in other upload methods when
- * retries are enabled, and the length of the data need not be known in advance. Therefore, this method should
+ * The data passed need not support multiple subscriptions/be replayable as is required in other upload methods
+ * when retries are enabled, and the length of the data need not be known in advance. Therefore, this method should
 * support uploading any arbitrary data source, including network streams. This behavior is possible because this
 * method will perform some internal buffering as configured by the blockSize and numBuffers parameters, so while
 * this method may offer additional convenience, it will not be as performant as other options, which should be
 * preferred when possible.

 * Typically, the greater the number of buffers used, the greater the possible parallelism when transferring the
 * data. Larger buffers mean we will have to stage fewer blocks and therefore require fewer IO operations. The
- * trade-offs between these values are context-dependent, so some experimentation may be required to optimize inputs
- * for a given scenario.
+ * trade-offs between these values are context-dependent, so some experimentation may be required to optimize
+ * inputs for a given scenario.
 *
 *
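 *
 * For illustration, one way a caller might tune these values (a sketch, not part of this patch: it assumes an
 * existing {@code BlobAsyncClient client} and a {@code Flux<ByteBuffer> data}; the 8MB/4 values are arbitrary):
 *
 *     ParallelTransferOptions options = new ParallelTransferOptions()
 *         .setBlockSize(8 * 1024 * 1024) // stage 8MB blocks -> fewer Put Block calls
 *         .setNumBuffers(4);             // buffer at most 4 blocks (~32MB) in memory
 *     client.upload(data, options)
 *         .subscribe(item -> System.out.println("Committed, eTag=" + item.getETag()));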

Code Samples

 *

@@ -232,18 +241,16 @@ public Mono<Response<BlockBlobItem>> uploadWithResponse(Flux<ByteBuffer> data,
             Objects.requireNonNull(data, "'data' must not be null");
             BlobRequestConditions accessConditionsFinal = accessConditions == null
                 ? new BlobRequestConditions() : accessConditions;
-            final ParallelTransferOptions finalParallelTransferOptions = parallelTransferOptions == null
-                ? new ParallelTransferOptions() : parallelTransferOptions;
-            int blockSize = finalParallelTransferOptions.getBlockSize();
-            int numBuffers = finalParallelTransferOptions.getNumBuffers();
-            ProgressReceiver progressReceiver = finalParallelTransferOptions.getProgressReceiver();
+            final ParallelTransferOptions finalParallelTransferOptions = new ParallelTransferOptions();
+            finalParallelTransferOptions.populateAndApplyDefaults(parallelTransferOptions);

             // See ProgressReporter for an explanation on why this lock is necessary and why we use AtomicLong.
             AtomicLong totalProgress = new AtomicLong(0);
             Lock progressLock = new ReentrantLock();

             // Validation done in the constructor.
-            UploadBufferPool pool = new UploadBufferPool(numBuffers, blockSize);
+            UploadBufferPool pool = new UploadBufferPool(finalParallelTransferOptions.getNumBuffers(),
+                finalParallelTransferOptions.getBlockSize());

             /*
             Break the source Flux into chunks that are <= chunk size. This makes filling the pooled buffers much easier
@@ -254,29 +261,30 @@ as we can guarantee we only need at most two buffers for any call to write (two
             Flux<ByteBuffer> chunkedSource = data
                 .filter(ByteBuffer::hasRemaining)
                 .flatMapSequential(buffer -> {
-                    if (buffer.remaining() <= blockSize) {
+                    if (buffer.remaining() <= finalParallelTransferOptions.getBlockSize()) {
                         return Flux.just(buffer);
                     }
-                    int numSplits = (int) Math.ceil(buffer.remaining() / (double) blockSize);
+                    int numSplits =
+                        (int) Math.ceil(buffer.remaining() / (double) finalParallelTransferOptions.getBlockSize());
                     return Flux.range(0, numSplits)
                         .map(i -> {
                             ByteBuffer duplicate = buffer.duplicate().asReadOnlyBuffer();
-                            duplicate.position(i * blockSize);
-                            duplicate.limit(Math.min(duplicate.limit(), (i + 1) * blockSize));
+                            duplicate.position(i * finalParallelTransferOptions.getBlockSize());
+                            duplicate.limit(Math.min(duplicate.limit(),
+                                (i + 1) * finalParallelTransferOptions.getBlockSize()));
                             return duplicate;
                         });
                 });

-        /*
-        Write to the pool and upload the output.
-        */
+            /*
+            Write to the pool and upload the output.
+            */
             return chunkedSource.concatMap(pool::write)
                 .concatWith(Flux.defer(pool::flush))
                 .flatMapSequential(buffer -> {
                     // Report progress as necessary.
                     Flux<ByteBuffer> progressData = ProgressReporter.addParallelProgressReporting(Flux.just(buffer),
-                        progressReceiver, progressLock, totalProgress);
-
+                        finalParallelTransferOptions.getProgressReceiver(), progressLock, totalProgress);
                     final String blockId = Base64.getEncoder().encodeToString(
                         UUID.randomUUID().toString().getBytes(UTF_8));
@@ -296,7 +304,6 @@ as we can guarantee we only need at most two buffers for any call to write (two
         } catch (RuntimeException ex) {
             return monoError(logger, ex);
         }
-
     }

     /**
@@ -308,6 +315,7 @@ as we can guarantee we only need at most two buffers for any call to write (two
      * {@codesnippet com.azure.storage.blob.BlobAsyncClient.uploadFromFile#String}
      *
      * @param filePath Path to the upload file
+     *
      * @return An empty response
      */
     public Mono<Void> uploadFromFile(String filePath) {
@@ -334,6 +342,7 @@ public Mono<Void> uploadFromFile(String filePath) {
      * @param tier {@link AccessTier} for the destination blob.
     * @param accessConditions {@link BlobRequestConditions}
     * @return An empty response
+     *
     * @throws IllegalArgumentException If {@code blockSize} is less than 0 or greater than 100MB
     * @throws UncheckedIOException If an I/O error occurs
     */
@@ -342,10 +351,8 @@ public Mono<Void> uploadFromFile(String filePath, ParallelTransferOptions parall
         BlobHttpHeaders headers, Map<String, String> metadata, AccessTier tier,
         BlobRequestConditions accessConditions) {
         try {
-            final ParallelTransferOptions finalParallelTransferOptions = parallelTransferOptions == null
-                ? new ParallelTransferOptions()
-                : parallelTransferOptions;
-            ProgressReceiver progressReceiver = finalParallelTransferOptions.getProgressReceiver();
+            ParallelTransferOptions finalParallelTransferOptions = new ParallelTransferOptions();
+            finalParallelTransferOptions.populateAndApplyDefaults(parallelTransferOptions);

             // See ProgressReporter for an explanation on why this lock is necessary and why we use AtomicLong.
             AtomicLong totalProgress = new AtomicLong(0);
@@ -354,14 +361,15 @@ public Mono<Void> uploadFromFile(String filePath, ParallelTransferOptions parall
             return Mono.using(() -> uploadFileResourceSupplier(filePath),
                 channel -> {
                     final SortedMap<Long, String> blockIds = new TreeMap<>();
-                    return Flux.fromIterable(sliceFile(filePath, finalParallelTransferOptions.getBlockSize()))
+                    return Flux.fromIterable(sliceFile(filePath, finalParallelTransferOptions.getBlockSize(),
+                        parallelTransferOptions == null || parallelTransferOptions.getBlockSize() == null))
                         .doOnNext(chunk -> blockIds.put(chunk.getOffset(), getBlockID()))
                         .flatMap(chunk -> {
                             String blockId = blockIds.get(chunk.getOffset());
                             Flux<ByteBuffer> progressData = ProgressReporter.addParallelProgressReporting(
                                 FluxUtil.readFile(channel, chunk.getOffset(), chunk.getCount()),
-                                progressReceiver, progressLock, totalProgress);
+                                finalParallelTransferOptions.getProgressReceiver(), progressLock, totalProgress);

                             return getBlockBlobAsyncClient()
                                 .stageBlockWithResponse(blockId, progressData, chunk.getCount(), null);
@@ -386,7 +394,9 @@ public Mono<Void> uploadFromFile(String filePath, ParallelTransferOptions parall
     * Resource Supplier for UploadFile
     *
     * @param filePath The path for the file
+     *
     * @return {@code AsynchronousFileChannel}
+     *
     * @throws UncheckedIOException an input output exception.
     */
    protected AsynchronousFileChannel uploadFileResourceSupplier(String filePath) {
@@ -409,9 +419,12 @@ private String getBlockID() {
         return Base64.getEncoder().encodeToString(UUID.randomUUID().toString().getBytes(StandardCharsets.UTF_8));
     }

-    private List<BlobRange> sliceFile(String path, int blockSize) {
+    private List<BlobRange> sliceFile(String path, int blockSize, boolean enableHtbbOptimizations) {
         File file = new File(path);
         assert file.exists();
+        if (file.length() > 100 * Constants.MB && enableHtbbOptimizations) {
+            blockSize = BLOB_DEFAULT_HTBB_UPLOAD_BLOCK_SIZE;
+        }
         List<BlobRange> ranges = new ArrayList<>();
         for (long pos = 0; pos < file.length(); pos += blockSize) {
             long count = blockSize;
diff --git a/sdk/storage/azure-storage-blob/src/main/java/com/azure/storage/blob/models/ParallelTransferOptions.java b/sdk/storage/azure-storage-blob/src/main/java/com/azure/storage/blob/models/ParallelTransferOptions.java
index 9bce7e90712d..c4eae55a8e45 100644
--- a/sdk/storage/azure-storage-blob/src/main/java/com/azure/storage/blob/models/ParallelTransferOptions.java
+++ b/sdk/storage/azure-storage-blob/src/main/java/com/azure/storage/blob/models/ParallelTransferOptions.java
@@ -5,40 +5,36 @@
 import com.azure.core.annotation.Fluent;
 import com.azure.storage.blob.ProgressReceiver;
+import com.azure.storage.blob.specialized.BlockBlobAsyncClient;
 import com.azure.storage.common.implementation.Constants;
 import com.azure.storage.common.implementation.StorageImplUtils;

+import static com.azure.storage.blob.BlobAsyncClient.BLOB_DEFAULT_NUMBER_OF_BUFFERS;
+import static com.azure.storage.blob.BlobAsyncClient.BLOB_DEFAULT_UPLOAD_BLOCK_SIZE;
+
 /**
 * This class contains configuration used to parallelize data transfer operations.
 */
 @Fluent
-public class ParallelTransferOptions {
-
-    private static final int BLOB_DEFAULT_UPLOAD_BLOCK_SIZE = 4 * Constants.MB;
-    private static final int BLOB_MAX_BLOCK_SIZE = 100 * Constants.MB;
+public final class ParallelTransferOptions {

-    private static final int BLOB_DEFAULT_NUMBER_OF_PARALLEL_TRANSFERS = 8;
+    private static final int BLOB_MAX_UPLOAD_BLOCK_SIZE = 100 * Constants.MB;

-    private int blockSize;
-    private int numBuffers;
+    private Integer blockSize;
+    private Integer numBuffers;
     private ProgressReceiver progressReceiver;

     /**
     * Creates a new {@link ParallelTransferOptions} with default parameters applied.
-    * blockSize = 4MB
-    * numBuffers = 8
     */
     public ParallelTransferOptions() {
-        this.blockSize = BLOB_DEFAULT_UPLOAD_BLOCK_SIZE;
-        this.numBuffers = BLOB_DEFAULT_NUMBER_OF_PARALLEL_TRANSFERS;
-        this.progressReceiver = null;
     }

     /**
     * Gets the block size (chunk size) to transfer at a time.
     * @return The block size.
     */
-    public int getBlockSize() {
+    public Integer getBlockSize() {
         return this.blockSize;
     }

@@ -46,7 +42,7 @@ public int getBlockSize() {
     * Gets the number of buffers being used for a transfer operation.
     * @return The number of buffers.
     */
-    public int getNumBuffers() {
+    public Integer getNumBuffers() {
         return this.numBuffers;
     }

@@ -68,8 +64,10 @@ public ProgressReceiver getProgressReceiver() {
     * @return The updated ParallelTransferOptions object.
     * @throws IllegalArgumentException when block size is less than 0 or greater than max blob block size (100MB).
     */
-    public ParallelTransferOptions setBlockSize(int blockSize) {
-        StorageImplUtils.assertInBounds("blockSize", blockSize, 0, BLOB_MAX_BLOCK_SIZE);
+    public ParallelTransferOptions setBlockSize(Integer blockSize) {
+        if (blockSize != null) {
+            StorageImplUtils.assertInBounds("blockSize", blockSize, 0, BlockBlobAsyncClient.MAX_STAGE_BLOCK_BYTES);
+        }
         this.blockSize = blockSize;
         return this;
     }
@@ -77,21 +75,45 @@ public ParallelTransferOptions setBlockSize(int blockSize) {
     /**
     * Sets the number of buffers being used for an upload/download operation.
     * @param numBuffers The number of buffers.
-    * For upload, The number of buffers is the maximum number of buffers this method should allocate.
+    * For buffered upload only, the number of buffers is the maximum number of buffers this method should allocate.
     * Must be at least two. Typically, the larger the number of buffers, the more parallel, and thus faster, the
     * upload portion of this operation will be. The amount of memory consumed by this method may be up to
     * blockSize * numBuffers.
     * @return The updated ParallelTransferOptions object.
     * @throws IllegalArgumentException when numBuffers is less than 2.
     */
-    public ParallelTransferOptions setNumBuffers(int numBuffers) {
-        StorageImplUtils.assertInBounds("numBuffers", numBuffers, 2, Integer.MAX_VALUE);
+    public ParallelTransferOptions setNumBuffers(Integer numBuffers) {
+        if (numBuffers != null) {
+            StorageImplUtils.assertInBounds("numBuffers", numBuffers, 2, Integer.MAX_VALUE);
+        }
         this.numBuffers = numBuffers;
         return this;
     }

+    /**
+    * Sets the progress receiver for parallel reporting.
+    * @param progressReceiver The progress receiver.
+    * @return The updated ParallelTransferOptions object.
+    */
     public ParallelTransferOptions setProgressReceiver(ProgressReceiver progressReceiver) {
         this.progressReceiver = progressReceiver;
         return this;
     }
+
+    /**
+    * RESERVED FOR INTERNAL USE.
+    *
+    * @param other The customer provided transfer options. If it has non-null values, they will be used, otherwise
+    * defaults will be set.
+    */
+    public void populateAndApplyDefaults(ParallelTransferOptions other) {
+        if (other == null) {
+            other = new ParallelTransferOptions();
+        }
+        this.setBlockSize(other.getBlockSize() == null
+            ? Integer.valueOf(BLOB_DEFAULT_UPLOAD_BLOCK_SIZE) : other.getBlockSize());
+        this.setNumBuffers(other.getNumBuffers() == null
+            ?
              Integer.valueOf(BLOB_DEFAULT_NUMBER_OF_BUFFERS) : other.getNumBuffers());
+        this.setProgressReceiver(other.getProgressReceiver());
+    }
 }
diff --git a/sdk/storage/azure-storage-blob/src/main/java/com/azure/storage/blob/specialized/BlobAsyncClientBase.java b/sdk/storage/azure-storage-blob/src/main/java/com/azure/storage/blob/specialized/BlobAsyncClientBase.java
index fa0ee80e33fb..2e094145e6e7 100644
--- a/sdk/storage/azure-storage-blob/src/main/java/com/azure/storage/blob/specialized/BlobAsyncClientBase.java
+++ b/sdk/storage/azure-storage-blob/src/main/java/com/azure/storage/blob/specialized/BlobAsyncClientBase.java
@@ -3,6 +3,7 @@
 package com.azure.storage.blob.specialized;

+import static com.azure.core.implementation.util.FluxUtil.withContext;
 import com.azure.core.http.HttpPipeline;
 import com.azure.core.http.HttpResponse;
 import com.azure.core.http.RequestConditions;
@@ -65,7 +66,6 @@
 import static com.azure.core.implementation.util.FluxUtil.fluxError;
 import static com.azure.core.implementation.util.FluxUtil.monoError;
-import static com.azure.core.implementation.util.FluxUtil.withContext;

 /**
 * This class provides a client that contains all operations that apply to any blob type.
@@ -747,11 +747,9 @@ public Mono<Response<BlobProperties>> downloadToFileWithResponse(String filePath
     // TODO (gapra) : Investigate if this can be parallelized, and include the parallelTransfers parameter.
     Mono<Response<BlobProperties>> downloadToFileWithResponse(String filePath, BlobRange range,
         ParallelTransferOptions parallelTransferOptions, ReliableDownloadOptions options,
-        BlobRequestConditions accessConditions, boolean rangeGetContentMd5, Context context) {
-        final ParallelTransferOptions finalParallelTransferOptions = parallelTransferOptions == null
-            ? new ParallelTransferOptions()
-            : parallelTransferOptions;
-        ProgressReceiver progressReceiver = finalParallelTransferOptions.getProgressReceiver();
+        BlobRequestConditions accessConditions, boolean rangeGetContentMD5, Context context) {
+        ParallelTransferOptions finalParallelTransferOptions = new ParallelTransferOptions();
+        finalParallelTransferOptions.populateAndApplyDefaults(parallelTransferOptions);

         // See ProgressReporter for an explanation on why this lock is necessary and why we use AtomicLong.
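         // (Progress updates arrive concurrently from the parallel chunk transfers: the AtomicLong keeps the
         // running total consistent, and the lock serializes calls into the user's ProgressReceiver.)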
         AtomicLong totalProgress = new AtomicLong(0);
@@ -760,9 +758,9 @@ Mono<Response<BlobProperties>> downloadToFileWithResponse(String filePath, BlobR
         return Mono.using(() -> downloadToFileResourceSupplier(filePath),
             channel -> getPropertiesWithResponse(accessConditions)
                 .flatMap(response -> processInRange(channel, response,
-                    range, finalParallelTransferOptions.getBlockSize(), options, accessConditions, rangeGetContentMd5,
-                    context, totalProgress, progressLock, progressReceiver)), this::downloadToFileCleanup);
-
+                    range, finalParallelTransferOptions.getBlockSize(), options, accessConditions, rangeGetContentMD5,
+                    context, totalProgress, progressLock, finalParallelTransferOptions.getProgressReceiver())),
+                    this::downloadToFileCleanup);
     }

     private Mono<Response<BlobProperties>> processInRange(AsynchronousFileChannel channel,
diff --git a/sdk/storage/azure-storage-blob/src/test/java/com/azure/storage/blob/APISpec.groovy b/sdk/storage/azure-storage-blob/src/test/java/com/azure/storage/blob/APISpec.groovy
index c99dc35509d9..096f2cc49810 100644
--- a/sdk/storage/azure-storage-blob/src/test/java/com/azure/storage/blob/APISpec.groovy
+++ b/sdk/storage/azure-storage-blob/src/test/java/com/azure/storage/blob/APISpec.groovy
@@ -475,6 +475,18 @@ class APISpec extends Specification {
         return ByteBuffer.wrap(getRandomByteArray(size))
     }

+    /*
+    We only allow int because anything larger than 2GB (which would require a long) is left to stress/perf.
+     */
+    File getRandomFile(int size) {
+        File file = File.createTempFile(UUID.randomUUID().toString(), ".txt")
+        file.deleteOnExit()
+        FileOutputStream fos = new FileOutputStream(file)
+        fos.write(getRandomData(size).array())
+        fos.close()
+        return file
+    }
+
     /**
      * This will retrieve the etag to be used in testing match conditions. The result will typically be assigned to
      * the ifMatch condition when testing success and the ifNoneMatch condition when testing failure.
diff --git a/sdk/storage/azure-storage-blob/src/test/java/com/azure/storage/blob/specialized/BlockBlobAPITest.groovy b/sdk/storage/azure-storage-blob/src/test/java/com/azure/storage/blob/specialized/BlockBlobAPITest.groovy
index 53340ca41b55..7cb4f3c44d25 100644
--- a/sdk/storage/azure-storage-blob/src/test/java/com/azure/storage/blob/specialized/BlockBlobAPITest.groovy
+++ b/sdk/storage/azure-storage-blob/src/test/java/com/azure/storage/blob/specialized/BlockBlobAPITest.groovy
@@ -35,7 +35,10 @@ import spock.lang.Requires
 import spock.lang.Unroll

 import java.nio.ByteBuffer
+import java.nio.channels.AsynchronousFileChannel
 import java.nio.charset.StandardCharsets
+import java.nio.file.Paths
+import java.nio.file.StandardOpenOption
 import java.security.MessageDigest

 class BlockBlobAPITest extends APISpec {
@@ -593,17 +596,56 @@ class BlockBlobAPITest extends APISpec {
      */
     @Requires({ liveMode() })
+    @Unroll
     def "Upload from file"() {
-        given:
-        def file = new File(this.getClass().getResource("/testfiles/uploadFromFileTestData.txt").getPath())
-        def outStream = new ByteArrayOutputStream()
+        setup:
+        def file = getRandomFile(fileSize)
+        def channel = AsynchronousFileChannel.open(file.toPath())

         when:
-        blobClient.uploadFromFile(file.getAbsolutePath())
+        // Block length will be ignored for single shot.
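+        // With a null blockSize and a file over 100MB, sliceFile is expected to fall back to the 8MB
+        // BLOB_DEFAULT_HTBB_UPLOAD_BLOCK_SIZE; passing an explicit blockSize suppresses that fallback.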
+        blobac.uploadFromFile(file.toPath().toString(), new ParallelTransferOptions().setBlockSize(blockSize),
+            null, null, null, null).block()

         then:
-        bc.download(outStream)
-        outStream.toByteArray() == new Scanner(file).useDelimiter("\\z").next().getBytes(StandardCharsets.UTF_8)
+        def outFile = file.getPath().toString() + "result"
+        def outChannel = AsynchronousFileChannel.open(Paths.get(outFile), StandardOpenOption.CREATE,
+            StandardOpenOption.WRITE)
+        FluxUtil.writeFile(blobac.download(), outChannel).block() == null
+
+        compareFiles(file, new File(outFile))
+        blobac.getBlockBlobAsyncClient().listBlocks(BlockListType.COMMITTED).block().getCommittedBlocks().size() ==
+            committedBlockCount
+
+        cleanup:
+        channel.close()
+
+        where:
+        fileSize                                       | blockSize       || committedBlockCount
+        0                                              | null            || 0
+        10                                             | null            || 1
+        10 * 1024                                      | null            || 1
+        50 * 1024 * 1024                               | null            || Math.ceil((50 * 1024 * 1024) / BlobAsyncClient.BLOB_DEFAULT_UPLOAD_BLOCK_SIZE)
+        BlockBlobAsyncClient.MAX_UPLOAD_BLOB_BYTES + 1 | null            || Math.ceil((BlockBlobAsyncClient.MAX_UPLOAD_BLOB_BYTES + 1) / BlobAsyncClient.BLOB_DEFAULT_HTBB_UPLOAD_BLOCK_SIZE) // HTBB optimizations should trigger when file size is >100MB and defaults are used.
+        101 * 1024 * 1024                              | 4 * 1024 * 1024 || 26 // Making the block size explicit should cancel the optimization.
+    }
+
+    def compareFiles(File file1, File file2) {
+        FileInputStream fis1 = new FileInputStream(file1)
+        FileInputStream fis2 = new FileInputStream(file2)
+
+        try {
+            // read() returns an int in [0, 255] or -1 at EOF; storing it in a byte would fold 0xFF into -1 and
+            // end the comparison early, so the values must stay ints.
+            int b1 = fis1.read()
+            int b2 = fis2.read()
+
+            while (b1 != -1 && b2 != -1) {
+                if (b1 != b2) {
+                    return false
+                }
+                b1 = fis1.read()
+                b2 = fis2.read()
+            }
+            return b1 == b2
+        } finally {
+            fis1.close()
+            fis2.close()
+        }
+    }

     @Requires({ liveMode() })
@@ -724,7 +766,6 @@ class BlockBlobAPITest extends APISpec {
             .setIfModifiedSince(modified)
             .setIfUnmodifiedSince(unmodified)
-
         expect:
         bc.uploadWithResponse(defaultInputStream.get(), defaultDataSize, null, null, null, bac, null, null)
             .getStatusCode() == 201
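
Note on the committed-block counts asserted above: once the effective block size is chosen, the count reduces to a
ceiling division. A self-contained sketch (the method name expectedBlockCount is illustrative, not SDK API; the
100MB threshold and the 4MB/8MB defaults are the values this patch introduces):

    static int expectedBlockCount(long fileSize, Integer userBlockSize) {
        final long defaultBlock = 4L * 1024 * 1024;  // BLOB_DEFAULT_UPLOAD_BLOCK_SIZE
        final long htbbBlock = 8L * 1024 * 1024;     // BLOB_DEFAULT_HTBB_UPLOAD_BLOCK_SIZE
        long block = (userBlockSize != null) ? userBlockSize
            : (fileSize > 100L * 1024 * 1024 ? htbbBlock : defaultBlock);
        return (int) ((fileSize + block - 1) / block); // ceiling division; 0 bytes -> 0 blocks
    }

For example, expectedBlockCount(50L * 1024 * 1024, null) == 13 (default 4MB blocks, no HTBB fallback), while
expectedBlockCount(101L * 1024 * 1024, 4 * 1024 * 1024) == 26, matching the explicit-block-size row in the table.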