Merge pull request Azure#242 from rickle-msft/dev
Added MarkableFileStream to improve memory efficiency of upload and u…
jofriedm-msft authored Dec 13, 2017
2 parents 5b14642 + 5e43425 commit fab3d2f
Showing 4 changed files with 139 additions and 9 deletions.
@@ -14,6 +14,9 @@
*/
package com.microsoft.azure.storage.blob;

import java.io.File;
import java.io.FileInputStream;
import java.io.FileOutputStream;
import java.io.IOException;
import java.net.URISyntaxException;
import java.util.ArrayList;
@@ -310,4 +313,55 @@ public void eventOccurred(SendingRequestEvent eventArg) {
container.deleteIfExists();
}
}

@Test
@Category({ DevFabricTests.class, DevStoreTests.class, CloudTests.class })
public void testUploadBlobFromFileSinglePut() throws URISyntaxException, StorageException, IOException {
CloudBlobClient bClient = BlobTestHelper.createCloudBlobClient();

final ArrayList<Boolean> callList = new ArrayList<Boolean>();
OperationContext sendingRequestEventContext = new OperationContext();
sendingRequestEventContext.getSendingRequestEventHandler().addListener(new StorageEvent<SendingRequestEvent>() {

@Override
public void eventOccurred(SendingRequestEvent eventArg) {
assertEquals(eventArg.getRequestResult(), eventArg.getOpContext().getLastResult());
callList.add(true);
}
});

assertEquals(0, callList.size());

CloudBlobContainer container = null;
File sourceFile = File.createTempFile("sourceFile", ".tmp");
File destinationFile = new File(sourceFile.getParentFile(), "destinationFile.tmp");
try {
container = bClient.getContainerReference(BlobTestHelper.generateRandomContainerName());
container.createIfNotExists();
CloudBlockBlob blob = container.getBlockBlobReference(BlobTestHelper
.generateRandomBlobNameWithPrefix("uploadThreshold"));

sourceFile = File.createTempFile("sourceFile", ".tmp");
destinationFile = new File(sourceFile.getParentFile(), "destinationFile.tmp");

int fileSize = 10 * 1024;
byte[] buffer = BlobTestHelper.getRandomBuffer(fileSize);
FileOutputStream fos = new FileOutputStream(sourceFile);
fos.write(buffer);
fos.close();

// This should make a single call even though FileInputStream is not seekable because of the optimizations
// from wrapping it in a MarkableFileStream
blob.upload(new FileInputStream(sourceFile), fileSize - 1, null, null, sendingRequestEventContext);

assertEquals(1, callList.size());
}
finally {
container.deleteIfExists();

if (sourceFile.exists()) {
sourceFile.delete();
}
}
}
}
@@ -1765,7 +1765,7 @@ public void uploadFromFile(final String path, final AccessCondit
OperationContext opContext) throws StorageException, IOException {
File file = new File(path);
long fileLength = file.length();
InputStream inputStream = new BufferedInputStream(new FileInputStream(file));
InputStream inputStream = new FileInputStream(file); // The call to upload supports FileInputStream efficiently.
this.upload(inputStream, fileLength, accessCondition, options, opContext);
inputStream.close();
}
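The change above removes the BufferedInputStream wrapper from uploadFromFile so that upload() receives the raw FileInputStream and can wrap it in the new MarkableFileStream instead of buffering. A minimal caller-side sketch, assuming a connection string is available in an environment variable; the variable name, container name, blob name, file path, and demo class are placeholders:

import com.microsoft.azure.storage.CloudStorageAccount;
import com.microsoft.azure.storage.blob.CloudBlobClient;
import com.microsoft.azure.storage.blob.CloudBlobContainer;
import com.microsoft.azure.storage.blob.CloudBlockBlob;

public class UploadFromFileSketch {
    public static void main(String[] args) throws Exception {
        CloudStorageAccount account =
                CloudStorageAccount.parse(System.getenv("AZURE_STORAGE_CONNECTION_STRING"));
        CloudBlobClient client = account.createCloudBlobClient();
        CloudBlobContainer container = client.getContainerReference("example-container");
        container.createIfNotExists();

        CloudBlockBlob blob = container.getBlockBlobReference("example.bin");
        // uploadFromFile now hands upload() a plain FileInputStream; small files can
        // therefore go up as a single PutBlob without being copied into memory first.
        blob.uploadFromFile("/path/to/example.bin", null, null, null);
    }
}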
@@ -25,6 +25,7 @@
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.InputStream;
import java.io.FileInputStream;
import java.net.HttpURLConnection;
import java.net.URI;
import java.net.URISyntaxException;
@@ -644,8 +645,16 @@ public void upload(final InputStream sourceStream, final long length, final Acce

StreamMd5AndLength descriptor = new StreamMd5AndLength();
descriptor.setLength(length);

InputStream inputDataStream = sourceStream;

// If the sourceStream is a FileInputStream, wrap it in a MarkableFileStream.
// This allows for single shot upload on FileInputStreams.
InputStream inputDataStream;
if(!sourceStream.markSupported() && sourceStream instanceof FileInputStream) {
inputDataStream = new MarkableFileStream((FileInputStream)sourceStream);
}
else {
inputDataStream = sourceStream;
}

// Initial check - skip the PutBlob operation if the input stream isn't markable, or if the length is known to
// be greater than the threshold.
@@ -1018,27 +1027,37 @@ public void uploadBlock(final String blockId, final InputStream sourceStream, fi
throw new IllegalArgumentException(SR.INVALID_BLOCK_ID);
}

if (sourceStream.markSupported()) {
// If the sourceStream is a FileInputStream, wrap it in a MarkableFileStream.
// This prevents buffering the entire block into memory.
InputStream bufferedStreamReference;
if(!sourceStream.markSupported() && sourceStream instanceof FileInputStream) {
bufferedStreamReference = new MarkableFileStream((FileInputStream)sourceStream);
}
else {
bufferedStreamReference = sourceStream;
}

if (bufferedStreamReference.markSupported()) {
// Mark sourceStream for current position.
sourceStream.mark(Constants.MAX_MARK_LENGTH);
bufferedStreamReference.mark(Constants.MAX_MARK_LENGTH);
}

InputStream bufferedStreamReference = sourceStream;
StreamMd5AndLength descriptor = new StreamMd5AndLength();
descriptor.setLength(length);

if (!sourceStream.markSupported()) {
if (!bufferedStreamReference.markSupported()) {
// needs buffering
// TODO: Change to a BufferedInputStream to avoid the extra buffering and copying.
final ByteArrayOutputStream byteStream = new ByteArrayOutputStream();
descriptor = Utility.writeToOutputStream(sourceStream, byteStream, length, false /* rewindSourceStream */,
descriptor = Utility.writeToOutputStream(bufferedStreamReference, byteStream, length, false /* rewindSourceStream */,
options.getUseTransactionalContentMD5(), opContext, options);

bufferedStreamReference = new ByteArrayInputStream(byteStream.toByteArray());
}
else if (length < 0 || options.getUseTransactionalContentMD5()) {
// If the stream is of unknown length or we need to calculate the
// MD5, then we need to read the stream contents first
descriptor = Utility.analyzeStream(sourceStream, length, -1L, true /* rewindSourceStream */,
descriptor = Utility.analyzeStream(bufferedStreamReference, length, -1L, true /* rewindSourceStream */,
options.getUseTransactionalContentMD5());
}

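Both upload() and uploadBlock() now apply the same rule before deciding whether to buffer: an unmarkable FileInputStream is wrapped, anything else passes through. Condensed into a stand-alone helper purely for illustration; the helper class and method name below are made up and not part of the SDK:

import java.io.FileInputStream;
import java.io.InputStream;

import com.microsoft.azure.storage.core.MarkableFileStream;

final class StreamWrappingSketch {
    // Mirrors the check added in upload() and uploadBlock(): only a raw FileInputStream
    // that lacks mark/reset support is wrapped; markable streams pass through untouched.
    static InputStream toMarkable(InputStream source) {
        if (!source.markSupported() && source instanceof FileInputStream) {
            return new MarkableFileStream((FileInputStream) source);
        }
        return source;
    }

    private StreamWrappingSketch() {
    }
}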
@@ -0,0 +1,57 @@
/**
* Copyright Microsoft Corporation
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package com.microsoft.azure.storage.core;

import java.io.FileInputStream;
import java.io.FilterInputStream;
import java.io.IOException;
import java.nio.channels.FileChannel;

/**
* RESERVED FOR INTERNAL USE. Wraps a FileInputStream to allow for more memory-efficient uploading.
*/
public final class MarkableFileStream extends FilterInputStream {
private long mark = -1;
private FileChannel fileChannel;

public MarkableFileStream(FileInputStream stream) {
super(stream);
this.fileChannel = stream.getChannel();
}

@Override
public synchronized void mark(int readlimit) {
try {
this.mark = this.fileChannel.position();
}
catch (IOException e) {
this.mark = -1;
}
}

@Override
public synchronized void reset() throws IOException {
if(this.mark == -1){
throw new IOException("Stream must be marked before calling reset");
}

this.fileChannel.position(this.mark);
}

@Override
public boolean markSupported() {
return true;
}
}
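A minimal behavioral sketch of the new wrapper (the demo class and temp-file contents are made up): a raw FileInputStream reports markSupported() == false, while the wrapper records the FileChannel position on mark() and seeks back to it on reset(), so rewinding never buffers data in memory.

import java.io.File;
import java.io.FileInputStream;
import java.io.FileOutputStream;
import java.io.InputStream;

import com.microsoft.azure.storage.core.MarkableFileStream;

public class MarkableFileStreamSketch {
    public static void main(String[] args) throws Exception {
        File file = File.createTempFile("markable", ".tmp");
        try (FileOutputStream out = new FileOutputStream(file)) {
            out.write(new byte[] { 1, 2, 3, 4, 5 });
        }

        try (InputStream in = new MarkableFileStream(new FileInputStream(file))) {
            System.out.println(in.markSupported()); // true; a bare FileInputStream would print false

            in.mark(Integer.MAX_VALUE);             // remembers the channel position (0)
            in.read(new byte[3]);                   // consume three bytes
            in.reset();                             // repositions the channel back to 0
            System.out.println(in.read());          // prints 1 again, with no in-memory copy
        } finally {
            file.delete();
        }
    }
}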
