diff --git a/sdk/storage/azure-storage-blob/src/main/java/com/azure/storage/blob/BlobAsyncClient.java b/sdk/storage/azure-storage-blob/src/main/java/com/azure/storage/blob/BlobAsyncClient.java index 65eaa5db744a..b747d9afe4e5 100644 --- a/sdk/storage/azure-storage-blob/src/main/java/com/azure/storage/blob/BlobAsyncClient.java +++ b/sdk/storage/azure-storage-blob/src/main/java/com/azure/storage/blob/BlobAsyncClient.java @@ -431,7 +431,7 @@ private Mono> determineUploadFullOrChunked(final Flux uploadFromFile(String filePath, boolean overwrite) { public Mono uploadFromFile(String filePath, ParallelTransferOptions parallelTransferOptions, BlobHttpHeaders headers, Map metadata, AccessTier tier, BlobRequestConditions requestConditions) { + Integer originalBlockSize = (parallelTransferOptions == null) + ? null + : parallelTransferOptions.getBlockSize(); final ParallelTransferOptions finalParallelTransferOptions = ModelHelper.populateAndApplyDefaults(parallelTransferOptions); try { @@ -549,8 +552,8 @@ public Mono uploadFromFile(String filePath, ParallelTransferOptions parall // If the file is larger than 256MB chunk it and stage it as blocks. if (uploadInBlocks(filePath, finalParallelTransferOptions.getMaxSingleUploadSize())) { - return uploadBlocks(fileSize, finalParallelTransferOptions, headers, metadata, tier, - requestConditions, channel, blockBlobAsyncClient); + return uploadBlocks(fileSize, finalParallelTransferOptions, originalBlockSize, headers, + metadata, tier, requestConditions, channel, blockBlobAsyncClient); } else { // Otherwise we know it can be sent in a single request reducing network overhead. 
return blockBlobAsyncClient.uploadWithResponse(FluxUtil.readFile(channel), fileSize, @@ -581,7 +584,7 @@ boolean uploadInBlocks(String filePath, Integer maxSingleUploadSize) { } private Mono uploadBlocks(long fileSize, ParallelTransferOptions parallelTransferOptions, - BlobHttpHeaders headers, Map metadata, AccessTier tier, + Integer originalBlockSize, BlobHttpHeaders headers, Map metadata, AccessTier tier, BlobRequestConditions requestConditions, AsynchronousFileChannel channel, BlockBlobAsyncClient client) { final BlobRequestConditions finalRequestConditions = (requestConditions == null) ? new BlobRequestConditions() : requestConditions; @@ -592,8 +595,7 @@ private Mono uploadBlocks(long fileSize, ParallelTransferOptions parallelT Lock progressLock = new ReentrantLock(); final SortedMap blockIds = new TreeMap<>(); - return Flux.fromIterable(sliceFile(fileSize, parallelTransferOptions.getBlockSize(), - parallelTransferOptions)) + return Flux.fromIterable(sliceFile(fileSize, originalBlockSize, parallelTransferOptions.getBlockSize())) .flatMap(chunk -> { String blockId = getBlockID(); blockIds.put(chunk.getOffset(), blockId); @@ -639,10 +641,9 @@ private String getBlockID() { return Base64.getEncoder().encodeToString(UUID.randomUUID().toString().getBytes(StandardCharsets.UTF_8)); } - private List sliceFile(long fileSize, int blockSize, ParallelTransferOptions originalOptions) { - boolean enableHtbbOptimization = originalOptions == null || originalOptions.getBlockSize() == null; + private List sliceFile(long fileSize, Integer originalBlockSize, int blockSize) { List ranges = new ArrayList<>(); - if (fileSize > 100 * Constants.MB && enableHtbbOptimization) { + if (fileSize > 100 * Constants.MB && originalBlockSize == null) { blockSize = BLOB_DEFAULT_HTBB_UPLOAD_BLOCK_SIZE; } for (long pos = 0; pos < fileSize; pos += blockSize) { diff --git a/sdk/storage/azure-storage-blob/src/test/java/com/azure/storage/blob/BlobAPITest.groovy 
b/sdk/storage/azure-storage-blob/src/test/java/com/azure/storage/blob/BlobAPITest.groovy index 7a59ec06d6cf..fd5717c13bfb 100644 --- a/sdk/storage/azure-storage-blob/src/test/java/com/azure/storage/blob/BlobAPITest.groovy +++ b/sdk/storage/azure-storage-blob/src/test/java/com/azure/storage/blob/BlobAPITest.groovy @@ -30,13 +30,13 @@ import com.azure.storage.blob.sas.BlobServiceSasSignatureValues import com.azure.storage.blob.specialized.BlobClientBase import com.azure.storage.blob.specialized.SpecializedBlobClientBuilder import com.azure.storage.common.implementation.Constants +import reactor.core.Exceptions import reactor.core.publisher.Hooks import reactor.test.StepVerifier import spock.lang.Requires import spock.lang.Unroll import java.nio.ByteBuffer -import java.nio.channels.NonWritableChannelException import java.nio.charset.StandardCharsets import java.nio.file.FileAlreadyExistsException import java.nio.file.Files @@ -580,8 +580,22 @@ class BlobAPITest extends APISpec { StepVerifier.create(bac.downloadToFileWithResponse(outFile.toPath().toString(), null, options, null, null, false) .doOnSubscribe({ bac.upload(defaultFlux, defaultDataSize, true).delaySubscription(Duration.ofMillis(500)).subscribe() })) .verifyErrorSatisfies({ - assert it instanceof BlobStorageException - assert ((BlobStorageException) it).getStatusCode() == 412 + /* + * If an operation is running on multiple threads and multiple threads return exceptions, Reactor will combine + * them into a CompositeException, which needs to be unwrapped. If there is only a single exception, + * 'Exceptions.unwrapMultiple' will return a singleton list of the exception it was passed. + * + * These exceptions may be wrapped exceptions where the exception we are expecting is contained within a + * ReactiveException that needs to be unwrapped. If the passed exception isn't a 'ReactiveException' it + * will be returned unmodified by 'Exceptions.unwrap'.
+ */ + assert Exceptions.unwrapMultiple(it).stream().anyMatch({ it2 -> + def exception = Exceptions.unwrap(it2) + if (exception instanceof BlobStorageException) { + assert ((BlobStorageException) exception).getStatusCode() == 412 + return true + } + }) }) // Give the file a chance to be deleted by the download operation before verifying its deletion diff --git a/sdk/storage/azure-storage-blob/src/test/java/com/azure/storage/blob/specialized/BlockBlobAPITest.groovy b/sdk/storage/azure-storage-blob/src/test/java/com/azure/storage/blob/specialized/BlockBlobAPITest.groovy index 5f98a26dcb9d..abaf8ee58289 100644 --- a/sdk/storage/azure-storage-blob/src/test/java/com/azure/storage/blob/specialized/BlockBlobAPITest.groovy +++ b/sdk/storage/azure-storage-blob/src/test/java/com/azure/storage/blob/specialized/BlockBlobAPITest.groovy @@ -760,22 +760,41 @@ class BlockBlobAPITest extends APISpec { file.delete() } + /* + * Reports the number of bytes sent when uploading a file. This differs from other reporters, which track the + * number of reporting calls; upload from file hooks into the data stream that loads the file from disk, which + * uses a hard-coded read size.
+ */ + class FileUploadReporter implements ProgressReceiver { + private long reportedByteCount + + @Override + void reportProgress(long bytesTransferred) { + this.reportedByteCount += bytesTransferred + } + + long getReportedByteCount() { + return this.reportedByteCount + } + } + @Unroll @Requires({ liveMode() }) def "Upload from file reporter"() { when: - def uploadReporter = new Reporter(blockSize) + def uploadReporter = new FileUploadReporter() def file = getRandomFile(size) ParallelTransferOptions parallelTransferOptions = new ParallelTransferOptions(blockSize, bufferCount, - uploadReporter) + uploadReporter, blockSize - 1) then: StepVerifier.create(blobAsyncClient.uploadFromFile(file.toPath().toString(), parallelTransferOptions, null, null, null, null)) - .assertNext({ - assert uploadReporter.getReportingCount() == (long) (size / blockSize) - }).verifyComplete() + .verifyComplete() + + // Check if the reported size is equal to or greater than the file size in case there are retries. + uploadReporter.getReportedByteCount() >= size cleanup: file.delete() @@ -809,7 +828,7 @@ class BlockBlobAPITest extends APISpec { where: dataSize | singleUploadSize | blockSize || expectedBlockCount BlockBlobAsyncClient.MAX_UPLOAD_BLOB_BYTES - 1 | null | null || 0 // Test that the default for singleUploadSize is the maximum - BlockBlobAsyncClient.MAX_UPLOAD_BLOB_BYTES + 1 | null | null || Math.ceil(((double) BlockBlobAsyncClient.MAX_UPLOAD_BLOB_BYTES + 1) / (double) BlobClient.BLOB_DEFAULT_UPLOAD_BLOCK_SIZE) // "". This also validates the default for blockSize + BlockBlobAsyncClient.MAX_UPLOAD_BLOB_BYTES + 1 | null | null || Math.ceil(((double) BlockBlobAsyncClient.MAX_UPLOAD_BLOB_BYTES + 1) / (double) BlobClient.BLOB_DEFAULT_HTBB_UPLOAD_BLOCK_SIZE) // "".
This also validates the default for blockSize 100 | 50 | null || 1 // Test that singleUploadSize is respected 100 | 50 | 20 || 5 // Test that blockSize is respected } @@ -1056,7 +1075,7 @@ class BlockBlobAPITest extends APISpec { def "Async buffered upload"() { when: def data = getRandomData(dataSize) - ParallelTransferOptions parallelTransferOptions = new ParallelTransferOptions(bufferSize, numBuffs, null) + ParallelTransferOptions parallelTransferOptions = new ParallelTransferOptions(bufferSize, numBuffs, null, 4 * Constants.MB) blobAsyncClient.upload(Flux.just(data), parallelTransferOptions, true).block() data.position(0) @@ -1127,7 +1146,7 @@ class BlockBlobAPITest extends APISpec { def uploadReporter = new Reporter(blockSize) ParallelTransferOptions parallelTransferOptions = new ParallelTransferOptions(blockSize, bufferCount, - uploadReporter) + uploadReporter, 4 * Constants.MB) then: StepVerifier.create(blobAsyncClient.uploadWithResponse(Flux.just(getRandomData(size)), parallelTransferOptions, @@ -1159,8 +1178,8 @@ class BlockBlobAPITest extends APISpec { it will be chunked appropriately. 
*/ setup: - ParallelTransferOptions parallelTransferOptions = new ParallelTransferOptions(bufferSize * Constants.MB, numBuffers, null) - def dataList = [] as List + ParallelTransferOptions parallelTransferOptions = new ParallelTransferOptions(bufferSize * Constants.MB, numBuffers, null, 4 * Constants.MB) + def dataList = [] as List dataSizeList.each { size -> dataList.add(getRandomData(size * Constants.MB)) } def uploadOperation = blobAsyncClient.upload(Flux.fromIterable(dataList), parallelTransferOptions, true) @@ -1188,7 +1207,7 @@ class BlockBlobAPITest extends APISpec { setup: def dataList = [] as List dataSizeList.each { size -> dataList.add(getRandomData(size)) } - def uploadOperation = blobAsyncClient.upload(Flux.fromIterable(dataList), null, true) + def uploadOperation = blobAsyncClient.upload(Flux.fromIterable(dataList), new ParallelTransferOptions(null, null, null, 4 * Constants.MB), true) expect: StepVerifier.create(uploadOperation.then(collectBytesInBuffer(blockBlobAsyncClient.download()))) @@ -1213,7 +1232,7 @@ class BlockBlobAPITest extends APISpec { setup: def dataList = [] as List dataSizeList.each { size -> dataList.add(getRandomData(size)) } - def uploadOperation = blobAsyncClient.upload(Flux.fromIterable(dataList).publish().autoConnect(), null, true) + def uploadOperation = blobAsyncClient.upload(Flux.fromIterable(dataList).publish().autoConnect(), new ParallelTransferOptions(null, null, null, 4 * Constants.MB), true) expect: StepVerifier.create(uploadOperation.then(collectBytesInBuffer(blockBlobAsyncClient.download()))) @@ -1260,7 +1279,7 @@ class BlockBlobAPITest extends APISpec { when: def data = getRandomByteArray(dataSize) def contentMD5 = validateContentMD5 ? 
MessageDigest.getInstance("MD5").digest(data) : null - def uploadOperation = blobAsyncClient.uploadWithResponse(Flux.just(ByteBuffer.wrap(data)), null, new BlobHttpHeaders() + def uploadOperation = blobAsyncClient.uploadWithResponse(Flux.just(ByteBuffer.wrap(data)), new ParallelTransferOptions(null, null, null, 4 * Constants.MB), new BlobHttpHeaders() .setCacheControl(cacheControl) .setContentDisposition(contentDisposition) .setContentEncoding(contentEncoding) diff --git a/sdk/storage/azure-storage-queue/src/test/java/com/azure/storage/queue/QueueServiceAPITests.groovy b/sdk/storage/azure-storage-queue/src/test/java/com/azure/storage/queue/QueueServiceAPITests.groovy index a4351a7a6de0..9edbd3b3cf52 100644 --- a/sdk/storage/azure-storage-queue/src/test/java/com/azure/storage/queue/QueueServiceAPITests.groovy +++ b/sdk/storage/azure-storage-queue/src/test/java/com/azure/storage/queue/QueueServiceAPITests.groovy @@ -148,11 +148,8 @@ class QueueServiceAPITests extends APISpec { } def "List empty queues"() { - when: - System.out.println(methodName); - primaryQueueServiceClient.getQueueClient(testResourceName.randomName(methodName, 60)) - - then: + expect: + // Queue was never made with the prefix, should expect no queues to be listed. 
!primaryQueueServiceClient.listQueues(new QueuesSegmentOptions().setPrefix(methodName), null, null).iterator().hasNext() } diff --git a/sdk/storage/azure-storage-queue/src/test/java/com/azure/storage/queue/QueueServiceAsyncAPITests.groovy b/sdk/storage/azure-storage-queue/src/test/java/com/azure/storage/queue/QueueServiceAsyncAPITests.groovy index 0bf5c231ba95..a607a150a06b 100644 --- a/sdk/storage/azure-storage-queue/src/test/java/com/azure/storage/queue/QueueServiceAsyncAPITests.groovy +++ b/sdk/storage/azure-storage-queue/src/test/java/com/azure/storage/queue/QueueServiceAsyncAPITests.groovy @@ -155,10 +155,9 @@ class QueueServiceAsyncAPITests extends APISpec { } def "List empty queues"() { - when: - def listQueueVerifier = StepVerifier.create((primaryQueueServiceAsyncClient.listQueues(new QueuesSegmentOptions()))) - then: - listQueueVerifier + expect: + // Queue was never made with the prefix, should expect no queues to be listed. + StepVerifier.create(primaryQueueServiceAsyncClient.listQueues(new QueuesSegmentOptions().setPrefix(methodName))) .expectNextCount(0) .verifyComplete() } diff --git a/sdk/storage/azure-storage-queue/src/test/resources/session-records/QueueServiceAPITestsListEmptyQueues.json b/sdk/storage/azure-storage-queue/src/test/resources/session-records/QueueServiceAPITestsListEmptyQueues.json index 49527244326c..21030cd40773 100644 --- a/sdk/storage/azure-storage-queue/src/test/resources/session-records/QueueServiceAPITestsListEmptyQueues.json +++ b/sdk/storage/azure-storage-queue/src/test/resources/session-records/QueueServiceAPITestsListEmptyQueues.json @@ -3,23 +3,24 @@ "Method" : "GET", "Uri" : "https://azstoragesdkaccount.queue.core.windows.net?prefix=QueueServiceAPITestsListEmptyQueues&include=&comp=list", "Headers" : { - "x-ms-version" : "2018-03-28", - "User-Agent" : "azsdk-java-azure-storage-queue/12.0.0-preview.5 11.0.4; Windows 10 10.0", - "x-ms-client-request-id" : "88c5dc85-ff1a-4a05-8080-5d76127bb06a" + "x-ms-version" : 
"2019-02-02", + "User-Agent" : "azsdk-java-azure-storage-queue/12.2.0 (11.0.4; Windows 10 10.0)", + "x-ms-client-request-id" : "3c697395-20a7-4ff6-9537-41ad304265bb" }, "Response" : { "Transfer-Encoding" : "chunked", - "x-ms-version" : "2018-03-28", + "x-ms-version" : "2019-02-02", "Server" : "Windows-Azure-Queue/1.0 Microsoft-HTTPAPI/2.0", "Cache-Control" : "no-cache", "retry-after" : "0", "StatusCode" : "200", - "x-ms-request-id" : "102a03ba-6003-004b-668d-8301ec000000", + "x-ms-request-id" : "fa3e26ab-c003-0046-0228-c7eee0000000", "Body" : "QueueServiceAPITestsListEmptyQueues", - "Date" : "Tue, 15 Oct 2019 19:20:00 GMT", + "Date" : "Thu, 09 Jan 2020 20:10:26 GMT", + "x-ms-client-request-id" : "3c697395-20a7-4ff6-9537-41ad304265bb", "Content-Type" : "application/xml" }, "Exception" : null } ], - "variables" : [ "queueserviceapitestslistemptyqueues31768c4b32dabda" ] + "variables" : [ ] } \ No newline at end of file diff --git a/sdk/storage/azure-storage-queue/src/test/resources/session-records/QueueServiceAsyncAPITestsListEmptyQueues.json b/sdk/storage/azure-storage-queue/src/test/resources/session-records/QueueServiceAsyncAPITestsListEmptyQueues.json index 81fa19c37320..2fe3821006b0 100644 --- a/sdk/storage/azure-storage-queue/src/test/resources/session-records/QueueServiceAsyncAPITestsListEmptyQueues.json +++ b/sdk/storage/azure-storage-queue/src/test/resources/session-records/QueueServiceAsyncAPITestsListEmptyQueues.json @@ -1,11 +1,11 @@ { "networkCallRecords" : [ { "Method" : "GET", - "Uri" : "https://azstoragesdkaccount.queue.core.windows.net?include=&comp=list", + "Uri" : "https://azstoragesdkaccount.queue.core.windows.net?prefix=QueueServiceAsyncAPITestsListEmptyQueues&include=&comp=list", "Headers" : { "x-ms-version" : "2019-02-02", - "User-Agent" : "azsdk-java-azure-storage-queue/12.1.0-beta.1 11.0.4; Windows 10 10.0", - "x-ms-client-request-id" : "0bc9e540-beab-4dfb-9054-9fd8309b1101" + "User-Agent" : "azsdk-java-azure-storage-queue/12.2.0 (11.0.4; 
Windows 10 10.0)", + "x-ms-client-request-id" : "7ade11c8-92cf-4df2-81ec-326410311e5a" }, "Response" : { "Transfer-Encoding" : "chunked", @@ -14,10 +14,10 @@ "Cache-Control" : "no-cache", "retry-after" : "0", "StatusCode" : "200", - "x-ms-request-id" : "726ffc85-0003-00d3-48e6-988fd3000000", - "Body" : "", - "Date" : "Mon, 11 Nov 2019 23:20:55 GMT", - "x-ms-client-request-id" : "0bc9e540-beab-4dfb-9054-9fd8309b1101", + "x-ms-request-id" : "5dffa26a-f003-0105-2228-c7825c000000", + "Body" : "QueueServiceAsyncAPITestsListEmptyQueues", + "Date" : "Thu, 09 Jan 2020 20:10:31 GMT", + "x-ms-client-request-id" : "7ade11c8-92cf-4df2-81ec-326410311e5a", "Content-Type" : "application/xml" }, "Exception" : null