Fix Failing Tests #7296

Merged

Changes from all commits
@@ -431,7 +431,7 @@ private Mono<Response<BlockBlobItem>> determineUploadFullOrChunked(final Flux<By
             /*
              * Use cutBefore = true as we want to window all data under 4MB into one window.
              * Set the prefetch to 'Integer.MAX_VALUE' to leverage an unbounded fetch limit in case there are numerous
-             * tiny buffers, windowUntil uses a default limit of 256 and once that is hit it will trigger onComplete
+             * tiny buffers, windowUntil uses a default limit of 256 and once that is hit it will trigger onComplete
              * which causes downstream issues.
              */
             }, true, Integer.MAX_VALUE)
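
For readers unfamiliar with the operator, here is a minimal standalone sketch of the windowUntil behavior the comment above describes. It is illustrative only, not the SDK implementation; the 4 MB threshold comes from the comment, while the buffer sizes and counts are invented for the demo:

```java
import java.nio.ByteBuffer;
import java.util.concurrent.atomic.AtomicLong;
import reactor.core.publisher.Flux;

public class WindowUntilDemo {
    public static void main(String[] args) {
        long threshold = 4L * 1024 * 1024; // 4 MB, as in the comment above
        AtomicLong bytesInWindow = new AtomicLong();

        Flux<ByteBuffer> tinyBuffers = Flux.range(0, 8192)
            .map(i -> ByteBuffer.allocate(1024)); // 8192 tiny buffers, 8 MB total

        tinyBuffers
            // cutBefore = true: the buffer that crosses the threshold opens the next window.
            // prefetch = Integer.MAX_VALUE: without it, windowUntil's default limit of 256
            // would complete a window early when the data arrives as many small buffers.
            .windowUntil(buffer -> {
                if (bytesInWindow.addAndGet(buffer.remaining()) >= threshold) {
                    bytesInWindow.set(buffer.remaining()); // this buffer starts the next window
                    return true;
                }
                return false;
            }, true, Integer.MAX_VALUE)
            .flatMap(Flux::count)
            .subscribe(count -> System.out.println("buffers in window: " + count));
    }
}
```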
@@ -538,6 +538,9 @@ public Mono<Void> uploadFromFile(String filePath, boolean overwrite) {
     public Mono<Void> uploadFromFile(String filePath, ParallelTransferOptions parallelTransferOptions,
         BlobHttpHeaders headers, Map<String, String> metadata, AccessTier tier,
         BlobRequestConditions requestConditions) {
+        Integer originalBlockSize = (parallelTransferOptions == null)
+            ? null
+            : parallelTransferOptions.getBlockSize();
         final ParallelTransferOptions finalParallelTransferOptions =
             ModelHelper.populateAndApplyDefaults(parallelTransferOptions);
         try {
@@ -549,8 +552,8 @@ public Mono<Void> uploadFromFile(String filePath, ParallelTransferOptions parall
 
             // If the file is larger than 256MB chunk it and stage it as blocks.
             if (uploadInBlocks(filePath, finalParallelTransferOptions.getMaxSingleUploadSize())) {
-                return uploadBlocks(fileSize, finalParallelTransferOptions, headers, metadata, tier,
-                    requestConditions, channel, blockBlobAsyncClient);
+                return uploadBlocks(fileSize, finalParallelTransferOptions, originalBlockSize, headers,
+                    metadata, tier, requestConditions, channel, blockBlobAsyncClient);
             } else {
                 // Otherwise we know it can be sent in a single request reducing network overhead.
                 return blockBlobAsyncClient.uploadWithResponse(FluxUtil.readFile(channel), fileSize,
@@ -581,7 +584,7 @@ boolean uploadInBlocks(String filePath, Integer maxSingleUploadSize) {
     }
 
     private Mono<Void> uploadBlocks(long fileSize, ParallelTransferOptions parallelTransferOptions,
-        BlobHttpHeaders headers, Map<String, String> metadata, AccessTier tier,
+        Integer originalBlockSize, BlobHttpHeaders headers, Map<String, String> metadata, AccessTier tier,
         BlobRequestConditions requestConditions, AsynchronousFileChannel channel, BlockBlobAsyncClient client) {
         final BlobRequestConditions finalRequestConditions = (requestConditions == null)
             ? new BlobRequestConditions() : requestConditions;
@@ -592,8 +595,7 @@ private Mono<Void> uploadBlocks(long fileSize, ParallelTransferOptions parallelT
         Lock progressLock = new ReentrantLock();
 
         final SortedMap<Long, String> blockIds = new TreeMap<>();
-        return Flux.fromIterable(sliceFile(fileSize, parallelTransferOptions.getBlockSize(),
-            parallelTransferOptions))
+        return Flux.fromIterable(sliceFile(fileSize, originalBlockSize, parallelTransferOptions.getBlockSize()))
             .flatMap(chunk -> {
                 String blockId = getBlockID();
                 blockIds.put(chunk.getOffset(), blockId);
@@ -639,10 +641,9 @@ private String getBlockID() {
         return Base64.getEncoder().encodeToString(UUID.randomUUID().toString().getBytes(StandardCharsets.UTF_8));
     }
 
-    private List<BlobRange> sliceFile(long fileSize, int blockSize, ParallelTransferOptions originalOptions) {
-        boolean enableHtbbOptimization = originalOptions == null || originalOptions.getBlockSize() == null;
+    private List<BlobRange> sliceFile(long fileSize, Integer originalBlockSize, int blockSize) {
         List<BlobRange> ranges = new ArrayList<>();
-        if (fileSize > 100 * Constants.MB && enableHtbbOptimization) {
+        if (fileSize > 100 * Constants.MB && originalBlockSize == null) {
             blockSize = BLOB_DEFAULT_HTBB_UPLOAD_BLOCK_SIZE;
         }
         for (long pos = 0; pos < fileSize; pos += blockSize) {
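
The point of threading originalBlockSize through uploadBlocks is that by the time sliceFile runs, populateAndApplyDefaults has already filled in a block size, so the method can no longer tell a caller-chosen value from a default. A standalone sketch of the decision under stated assumptions: BlobRange is stood in for by a plain offset/length pair, and the 8 MB high-throughput (HTBB) block size is an assumed value, not the SDK constant:

```java
import java.util.ArrayList;
import java.util.List;

public class SliceFileDemo {
    static final int MB = 1024 * 1024;
    // Assumed value for illustration; the real constant lives in the SDK.
    static final int HTBB_BLOCK_SIZE = 8 * MB;

    // Mirrors the new sliceFile logic above: the HTBB block size only applies
    // when the caller never set a block size (originalBlockSize == null).
    static List<long[]> sliceFile(long fileSize, Integer originalBlockSize, int blockSize) {
        if (fileSize > 100L * MB && originalBlockSize == null) {
            blockSize = HTBB_BLOCK_SIZE;
        }
        List<long[]> ranges = new ArrayList<>();
        for (long pos = 0; pos < fileSize; pos += blockSize) {
            long count = Math.min(blockSize, fileSize - pos);
            ranges.add(new long[] { pos, count }); // offset, length
        }
        return ranges;
    }

    public static void main(String[] args) {
        // Caller set 4 MB explicitly: a 200 MB file slices into 50 chunks, HTBB is skipped.
        System.out.println(sliceFile(200L * MB, 4 * MB, 4 * MB).size());
        // Caller set nothing and a default was filled in later: HTBB kicks in, 25 chunks.
        System.out.println(sliceFile(200L * MB, null, 4 * MB).size());
    }
}
```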
@@ -30,13 +30,13 @@ import com.azure.storage.blob.sas.BlobServiceSasSignatureValues
 import com.azure.storage.blob.specialized.BlobClientBase
 import com.azure.storage.blob.specialized.SpecializedBlobClientBuilder
 import com.azure.storage.common.implementation.Constants
+import reactor.core.Exceptions
 import reactor.core.publisher.Hooks
 import reactor.test.StepVerifier
 import spock.lang.Requires
 import spock.lang.Unroll
 
 import java.nio.ByteBuffer
-import java.nio.channels.NonWritableChannelException
 import java.nio.charset.StandardCharsets
 import java.nio.file.FileAlreadyExistsException
 import java.nio.file.Files
@@ -580,8 +580,22 @@ class BlobAPITest extends APISpec
         StepVerifier.create(bac.downloadToFileWithResponse(outFile.toPath().toString(), null, options, null, null, false)
             .doOnSubscribe({ bac.upload(defaultFlux, defaultDataSize, true).delaySubscription(Duration.ofMillis(500)).subscribe() }))
             .verifyErrorSatisfies({
-                assert it instanceof BlobStorageException
-                assert ((BlobStorageException) it).getStatusCode() == 412
+                /*
+                 * If an operation runs on multiple threads and more than one of them throws, Reactor combines the
+                 * exceptions into a composite exception which needs to be unwrapped. If there is only a single
+                 * exception 'Exceptions.unwrapMultiple' will return a singleton list of the exception it was passed.
+                 *
+                 * Each of those exceptions may itself be wrapped, with the exception we expect contained within a
+                 * ReactiveException that needs to be unwrapped. If the passed exception isn't a 'ReactiveException'
+                 * it is returned unmodified by 'Exceptions.unwrap'.
+                 */
+                assert Exceptions.unwrapMultiple(it).stream().anyMatch({ it2 ->
+                    def exception = Exceptions.unwrap(it2)
+                    if (exception instanceof BlobStorageException) {
+                        assert ((BlobStorageException) exception).getStatusCode() == 412
+                        return true
+                    }
+                })
             })
 
         // Give the file a chance to be deleted by the download operation before verifying its deletion
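
For context on the new assertion, a minimal standalone sketch of the two Reactor helpers it relies on; the exception types here are arbitrary stand-ins:

```java
import java.io.IOException;
import java.util.List;
import reactor.core.Exceptions;

public class UnwrapDemo {
    public static void main(String[] args) {
        // A checked exception propagated through Reactor gets wrapped in a ReactiveException.
        Throwable wrapped = Exceptions.propagate(new IOException("disk error"));

        // Two concurrent failures, combined the way parallel operators report them.
        Throwable composite = Exceptions.multiple(wrapped, new IllegalStateException("other failure"));

        // unwrapMultiple flattens a composite into its parts; given a plain exception,
        // it returns a singleton list of that exception.
        List<Throwable> parts = Exceptions.unwrapMultiple(composite);

        for (Throwable part : parts) {
            // unwrap removes the ReactiveException wrapper, or returns the input unchanged.
            System.out.println(Exceptions.unwrap(part).getClass().getSimpleName());
        }
        // Prints: IOException, then IllegalStateException
    }
}
```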
@@ -760,22 +760,41 @@ class BlockBlobAPITest extends APISpec
         file.delete()
     }
 
+    /*
+     * Reports the number of bytes sent when uploading a file. This differs from the other reporters, which track
+     * the number of reportings, because upload from file hooks into the stream that loads data from disk, which
+     * uses a hard-coded read size.
+     */
+    class FileUploadReporter implements ProgressReceiver {
+        private long reportedByteCount
+
+        @Override
+        void reportProgress(long bytesTransferred) {
+            this.reportedByteCount += bytesTransferred
+        }
+
+        long getReportedByteCount() {
+            return this.reportedByteCount
+        }
+    }
+
     @Unroll
     @Requires({ liveMode() })
     def "Upload from file reporter"() {
         when:
-        def uploadReporter = new Reporter(blockSize)
+        def uploadReporter = new FileUploadReporter()
         def file = getRandomFile(size)
 
         ParallelTransferOptions parallelTransferOptions = new ParallelTransferOptions(blockSize, bufferCount,
-            uploadReporter)
+            uploadReporter, blockSize - 1)
 
         then:
         StepVerifier.create(blobAsyncClient.uploadFromFile(file.toPath().toString(), parallelTransferOptions,
             null, null, null, null))
-            .assertNext({
-                assert uploadReporter.getReportingCount() == (long) (size / blockSize)
-            }).verifyComplete()
+            .verifyComplete()
 
+        // Check that the reported size is equal to or greater than the file size in case there are retries.
+        uploadReporter.getReportedByteCount() >= size
+
         cleanup:
         file.delete()
@@ -809,7 +828,7 @@ class BlockBlobAPITest extends APISpec {
         where:
         dataSize | singleUploadSize | blockSize || expectedBlockCount
         BlockBlobAsyncClient.MAX_UPLOAD_BLOB_BYTES - 1 | null | null || 0 // Test that the default for singleUploadSize is the maximum
-        BlockBlobAsyncClient.MAX_UPLOAD_BLOB_BYTES + 1 | null | null || Math.ceil(((double) BlockBlobAsyncClient.MAX_UPLOAD_BLOB_BYTES + 1) / (double) BlobClient.BLOB_DEFAULT_UPLOAD_BLOCK_SIZE) // "". This also validates the default for blockSize
+        BlockBlobAsyncClient.MAX_UPLOAD_BLOB_BYTES + 1 | null | null || Math.ceil(((double) BlockBlobAsyncClient.MAX_UPLOAD_BLOB_BYTES + 1) / (double) BlobClient.BLOB_DEFAULT_HTBB_UPLOAD_BLOCK_SIZE) // "". This also validates the default for blockSize
         100 | 50 | null || 1 // Test that singleUploadSize is respected
         100 | 50 | 20 || 5 // Test that blockSize is respected
     }
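
A quick sanity check of the updated expectedBlockCount expression; the 256 MB and 8 MB values below are assumptions standing in for MAX_UPLOAD_BLOB_BYTES and BLOB_DEFAULT_HTBB_UPLOAD_BLOCK_SIZE, not the SDK constants:

```java
public class BlockCountCheck {
    public static void main(String[] args) {
        long maxUploadBlobBytes = 256L * 1024 * 1024; // assumed value of MAX_UPLOAD_BLOB_BYTES
        long htbbBlockSize = 8L * 1024 * 1024;        // assumed value of BLOB_DEFAULT_HTBB_UPLOAD_BLOCK_SIZE
        long dataSize = maxUploadBlobBytes + 1;       // one byte over the single-shot limit

        // Mirrors the expectedBlockCount expression in the data table above.
        long expectedBlockCount = (long) Math.ceil((double) dataSize / (double) htbbBlockSize);
        System.out.println(expectedBlockCount); // 33 under these assumptions: 32 full blocks plus one 1-byte block
    }
}
```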
@@ -1056,7 +1075,7 @@ class BlockBlobAPITest extends APISpec {
     def "Async buffered upload"() {
         when:
         def data = getRandomData(dataSize)
-        ParallelTransferOptions parallelTransferOptions = new ParallelTransferOptions(bufferSize, numBuffs, null)
+        ParallelTransferOptions parallelTransferOptions = new ParallelTransferOptions(bufferSize, numBuffs, null, 4 * Constants.MB)
         blobAsyncClient.upload(Flux.just(data), parallelTransferOptions, true).block()
         data.position(0)
 
@@ -1127,7 +1146,7 @@ class BlockBlobAPITest extends APISpec {
         def uploadReporter = new Reporter(blockSize)
 
         ParallelTransferOptions parallelTransferOptions = new ParallelTransferOptions(blockSize, bufferCount,
-            uploadReporter)
+            uploadReporter, 4 * Constants.MB)
 
         then:
         StepVerifier.create(blobAsyncClient.uploadWithResponse(Flux.just(getRandomData(size)), parallelTransferOptions,
@@ -1159,8 +1178,8 @@ class BlockBlobAPITest extends APISpec {
         it will be chunked appropriately.
         */
         setup:
-        ParallelTransferOptions parallelTransferOptions = new ParallelTransferOptions(bufferSize * Constants.MB, numBuffers, null)
-        def dataList = [] as List
+        ParallelTransferOptions parallelTransferOptions = new ParallelTransferOptions(bufferSize * Constants.MB, numBuffers, null, 4 * Constants.MB)
+        def dataList = [] as List<ByteBuffer>
         dataSizeList.each { size -> dataList.add(getRandomData(size * Constants.MB)) }
         def uploadOperation = blobAsyncClient.upload(Flux.fromIterable(dataList), parallelTransferOptions, true)
 
@@ -1188,7 +1207,7 @@ class BlockBlobAPITest extends APISpec {
         setup:
         def dataList = [] as List<ByteBuffer>
         dataSizeList.each { size -> dataList.add(getRandomData(size)) }
-        def uploadOperation = blobAsyncClient.upload(Flux.fromIterable(dataList), null, true)
+        def uploadOperation = blobAsyncClient.upload(Flux.fromIterable(dataList), new ParallelTransferOptions(null, null, null, 4 * Constants.MB), true)
 
         expect:
         StepVerifier.create(uploadOperation.then(collectBytesInBuffer(blockBlobAsyncClient.download())))
@@ -1213,7 +1232,7 @@ class BlockBlobAPITest extends APISpec {
         setup:
         def dataList = [] as List<ByteBuffer>
         dataSizeList.each { size -> dataList.add(getRandomData(size)) }
-        def uploadOperation = blobAsyncClient.upload(Flux.fromIterable(dataList).publish().autoConnect(), null, true)
+        def uploadOperation = blobAsyncClient.upload(Flux.fromIterable(dataList).publish().autoConnect(), new ParallelTransferOptions(null, null, null, 4 * Constants.MB), true)
 
         expect:
         StepVerifier.create(uploadOperation.then(collectBytesInBuffer(blockBlobAsyncClient.download())))
@@ -1260,7 +1279,7 @@ class BlockBlobAPITest extends APISpec {
         when:
         def data = getRandomByteArray(dataSize)
         def contentMD5 = validateContentMD5 ? MessageDigest.getInstance("MD5").digest(data) : null
-        def uploadOperation = blobAsyncClient.uploadWithResponse(Flux.just(ByteBuffer.wrap(data)), null, new BlobHttpHeaders()
+        def uploadOperation = blobAsyncClient.uploadWithResponse(Flux.just(ByteBuffer.wrap(data)), new ParallelTransferOptions(null, null, null, 4 * Constants.MB), new BlobHttpHeaders()
             .setCacheControl(cacheControl)
             .setContentDisposition(contentDisposition)
             .setContentEncoding(contentEncoding)
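
Several changes in this file share one shape: a ParallelTransferOptions that previously omitted a fourth argument (or was passed as null) now pins maxSingleUploadSize to 4 * Constants.MB, forcing payloads over that size to be staged as blocks instead of going out as a single put. A minimal sketch of that constructor shape as used above, with import paths assumed for this version of the SDK:

```java
import com.azure.storage.blob.models.ParallelTransferOptions;
import com.azure.storage.common.implementation.Constants;

public class TransferOptionsShape {
    public static void main(String[] args) {
        // (blockSize, numBuffers, progressReceiver, maxSingleUploadSize):
        // null keeps the SDK default for the first three; the fourth pins the
        // threshold above which data is chunked into staged blocks.
        ParallelTransferOptions options =
            new ParallelTransferOptions(null, null, null, 4 * Constants.MB);
        System.out.println(options.getMaxSingleUploadSize()); // 4194304
    }
}
```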
@@ -148,11 +148,8 @@ class QueueServiceAPITests extends APISpec {
     }
 
     def "List empty queues"() {
-        when:
-        System.out.println(methodName);
-        primaryQueueServiceClient.getQueueClient(testResourceName.randomName(methodName, 60))
-
-        then:
+        expect:
+        // The queue was never created with this prefix, so no queues should be listed.
         !primaryQueueServiceClient.listQueues(new QueuesSegmentOptions().setPrefix(methodName), null, null).iterator().hasNext()
     }
 
@@ -155,10 +155,9 @@ class QueueServiceAsyncAPITests extends APISpec {
     }
 
     def "List empty queues"() {
-        when:
-        def listQueueVerifier = StepVerifier.create((primaryQueueServiceAsyncClient.listQueues(new QueuesSegmentOptions())))
-        then:
-        listQueueVerifier
+        expect:
+        // The queue was never created with this prefix, so no queues should be listed.
+        StepVerifier.create(primaryQueueServiceAsyncClient.listQueues(new QueuesSegmentOptions().setPrefix(methodName)))
             .expectNextCount(0)
             .verifyComplete()
     }
@@ -3,23 +3,24 @@
     "Method" : "GET",
     "Uri" : "https://azstoragesdkaccount.queue.core.windows.net?prefix=QueueServiceAPITestsListEmptyQueues&include=&comp=list",
     "Headers" : {
-      "x-ms-version" : "2018-03-28",
-      "User-Agent" : "azsdk-java-azure-storage-queue/12.0.0-preview.5 11.0.4; Windows 10 10.0",
-      "x-ms-client-request-id" : "88c5dc85-ff1a-4a05-8080-5d76127bb06a"
+      "x-ms-version" : "2019-02-02",
+      "User-Agent" : "azsdk-java-azure-storage-queue/12.2.0 (11.0.4; Windows 10 10.0)",
+      "x-ms-client-request-id" : "3c697395-20a7-4ff6-9537-41ad304265bb"
     },
     "Response" : {
       "Transfer-Encoding" : "chunked",
-      "x-ms-version" : "2018-03-28",
+      "x-ms-version" : "2019-02-02",
       "Server" : "Windows-Azure-Queue/1.0 Microsoft-HTTPAPI/2.0",
       "Cache-Control" : "no-cache",
       "retry-after" : "0",
       "StatusCode" : "200",
-      "x-ms-request-id" : "102a03ba-6003-004b-668d-8301ec000000",
+      "x-ms-request-id" : "fa3e26ab-c003-0046-0228-c7eee0000000",
       "Body" : "<?xml version=\"1.0\" encoding=\"utf-8\"?><EnumerationResults ServiceEndpoint=\"https://azstoragesdkaccount.queue.core.windows.net/\"><Prefix>QueueServiceAPITestsListEmptyQueues</Prefix><Queues /><NextMarker /></EnumerationResults>",
-      "Date" : "Tue, 15 Oct 2019 19:20:00 GMT",
+      "Date" : "Thu, 09 Jan 2020 20:10:26 GMT",
+      "x-ms-client-request-id" : "3c697395-20a7-4ff6-9537-41ad304265bb",
       "Content-Type" : "application/xml"
     },
     "Exception" : null
   } ],
-  "variables" : [ "queueserviceapitestslistemptyqueues31768c4b32dabda" ]
+  "variables" : [ ]
 }
@@ -1,11 +1,11 @@
 {
   "networkCallRecords" : [ {
     "Method" : "GET",
-    "Uri" : "https://azstoragesdkaccount.queue.core.windows.net?include=&comp=list",
+    "Uri" : "https://azstoragesdkaccount.queue.core.windows.net?prefix=QueueServiceAsyncAPITestsListEmptyQueues&include=&comp=list",
     "Headers" : {
       "x-ms-version" : "2019-02-02",
-      "User-Agent" : "azsdk-java-azure-storage-queue/12.1.0-beta.1 11.0.4; Windows 10 10.0",
-      "x-ms-client-request-id" : "0bc9e540-beab-4dfb-9054-9fd8309b1101"
+      "User-Agent" : "azsdk-java-azure-storage-queue/12.2.0 (11.0.4; Windows 10 10.0)",
+      "x-ms-client-request-id" : "7ade11c8-92cf-4df2-81ec-326410311e5a"
     },
     "Response" : {
       "Transfer-Encoding" : "chunked",
@@ -14,10 +14,10 @@
       "Cache-Control" : "no-cache",
       "retry-after" : "0",
       "StatusCode" : "200",
-      "x-ms-request-id" : "726ffc85-0003-00d3-48e6-988fd3000000",
-      "Body" : "<?xml version=\"1.0\" encoding=\"utf-8\"?><EnumerationResults ServiceEndpoint=\"https://azstoragesdkaccount.queue.core.windows.net/\"><Queues /><NextMarker /></EnumerationResults>",
-      "Date" : "Mon, 11 Nov 2019 23:20:55 GMT",
-      "x-ms-client-request-id" : "0bc9e540-beab-4dfb-9054-9fd8309b1101",
+      "x-ms-request-id" : "5dffa26a-f003-0105-2228-c7825c000000",
+      "Body" : "<?xml version=\"1.0\" encoding=\"utf-8\"?><EnumerationResults ServiceEndpoint=\"https://azstoragesdkaccount.queue.core.windows.net/\"><Prefix>QueueServiceAsyncAPITestsListEmptyQueues</Prefix><Queues /><NextMarker /></EnumerationResults>",
+      "Date" : "Thu, 09 Jan 2020 20:10:31 GMT",
+      "x-ms-client-request-id" : "7ade11c8-92cf-4df2-81ec-326410311e5a",
       "Content-Type" : "application/xml"
     },
     "Exception" : null