
Commit

[Blob][Fix] Fix downloader.chunks() returning chunks of different sizes (#17559)

* [Blob][Fix]fix downloader.chunks()

* [FileShare][Fix]fix downloader.chunks()

* [FileShare][Fix]fix downloader.chunks()

* add doc

* modify doc

* modify doc

* Update _download.py

* Update _download.py

* Update _download.py
xiafu-msft authored Apr 20, 2021
1 parent 2ada230 commit 68095e0
Showing 29 changed files with 34,035 additions and 484 deletions.
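
In short, chunks() used to yield the initial GET's content as a single piece and every subsequent range GET as-is, so chunk sizes varied from call to call. With this commit the downloader buffers downloaded content and slices it, so every chunk except the last comes out at the configured max_chunk_get_size. A minimal sketch of the intended behavior, assuming a hypothetical connection string, container, and blob name, and the default 4 MiB max_chunk_get_size:

    from azure.storage.blob import BlobClient

    blob = BlobClient.from_connection_string(
        "<connection-string>",            # assumed placeholder
        container_name="mycontainer",     # assumed
        blob_name="myblob")               # assumed
    stream = blob.download_blob()         # StorageStreamDownloader
    chunks = list(stream.chunks())
    # Intended invariant after this fix: every chunk except the last
    # is exactly max_chunk_get_size bytes (4 MiB by default).
    assert all(len(c) == 4 * 1024 * 1024 for c in chunks[:-1])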
@@ -1027,7 +1027,7 @@ def download_blob(self, blob, offset=None, length=None, **kwargs):
         # type: (Union[str, BlobProperties], Optional[int], Optional[int], **Any) -> StorageStreamDownloader
         """Downloads a blob to the StorageStreamDownloader. The readall() method must
         be used to read all the content or readinto() must be used to download the blob into
-        a stream.
+        a stream. Using chunks() returns an iterator which allows the user to iterate over the content in chunks.
         :param blob: The blob with which to interact. If specified, this value will override
             a blob value specified in the blob URL.
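
As a hedged sketch of the pattern this docstring now describes (the client construction and names are assumptions; download_blob() and chunks() are the APIs shown in this diff):

    from azure.storage.blob import ContainerClient

    container = ContainerClient.from_connection_string(
        "<connection-string>", container_name="mycontainer")  # assumed
    stream = container.download_blob("myblob")  # returns a StorageStreamDownloader
    downloaded = 0
    for chunk in stream.chunks():  # each chunk is bytes
        downloaded += len(chunk)
    assert downloaded == stream.size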
49 changes: 42 additions & 7 deletions sdk/storage/azure-storage-blob/azure/storage/blob/_download.py
@@ -9,6 +9,7 @@
 import warnings
 from io import BytesIO
 
+from typing import Iterator
 from azure.core.exceptions import HttpResponseError
 from azure.core.tracing.common import with_current_context
 from ._shared.encryption import decrypt_blob
@@ -213,8 +214,9 @@ def _download_chunk(self, chunk_start, chunk_end):
 class _ChunkIterator(object):
     """Async iterator for chunks in blob download stream."""
 
-    def __init__(self, size, content, downloader):
+    def __init__(self, size, content, downloader, chunk_size):
         self.size = size
+        self._chunk_size = chunk_size
         self._current_content = content
         self._iter_downloader = downloader
         self._iter_chunks = None
@@ -231,21 +233,39 @@ def __next__(self):
         if self._complete:
             raise StopIteration("Download complete")
         if not self._iter_downloader:
-            # If no iterator was supplied, the download completed with
-            # the initial GET, so we just return that data
+            # cut the data obtained from initial GET into chunks
+            if len(self._current_content) > self._chunk_size:
+                return self._get_chunk_data()
             self._complete = True
             return self._current_content
 
         if not self._iter_chunks:
             self._iter_chunks = self._iter_downloader.get_chunk_offsets()
-        else:
+
+        # initial GET result still has more than _chunk_size bytes of data
+        if len(self._current_content) >= self._chunk_size:
+            return self._get_chunk_data()
+
+        try:
             chunk = next(self._iter_chunks)
-            self._current_content = self._iter_downloader.yield_chunk(chunk)
+            self._current_content += self._iter_downloader.yield_chunk(chunk)
+        except StopIteration as e:
+            self._complete = True
+            if self._current_content:
+                return self._current_content
+            raise e
 
-        return self._current_content
+        # the current content from the first get is still there but smaller than chunk size
+        # therefore we want to make sure its also included
+        return self._get_chunk_data()
 
     next = __next__  # Python 2 compatibility.
 
+    def _get_chunk_data(self):
+        chunk_data = self._current_content[: self._chunk_size]
+        self._current_content = self._current_content[self._chunk_size:]
+        return chunk_data
 
 
 class StorageStreamDownloader(object):  # pylint: disable=too-many-instance-attributes
     """A streaming object to download from Azure Storage.
@@ -426,6 +446,20 @@ def _initial_request(self):
         return response
 
     def chunks(self):
+        # type: () -> Iterator[bytes]
+        """Iterate over chunks in the download stream.
+
+        :rtype: Iterator[bytes]
+
+        .. admonition:: Example:
+
+            .. literalinclude:: ../samples/blob_samples_hello_world.py
+                :start-after: [START download_a_blob_in_chunk]
+                :end-before: [END download_a_blob_in_chunk]
+                :language: python
+                :dedent: 12
+                :caption: Download a blob using chunks().
+        """
         if self.size == 0 or self._download_complete:
             iter_downloader = None
         else:
@@ -451,7 +485,8 @@ def chunks(self):
         return _ChunkIterator(
             size=self.size,
             content=self._current_content,
-            downloader=iter_downloader)
+            downloader=iter_downloader,
+            chunk_size=self._config.max_chunk_get_size)
 
     def readall(self):
         """Download the contents of this blob.
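
The core of the fix is the buffer-and-slice pattern above: fetched data accumulates in _current_content, and _get_chunk_data() emits exactly _chunk_size bytes at a time until only the tail is left. The same technique in isolation, as a sketch (rechunk is a hypothetical helper, not part of the SDK):

    from typing import Iterable, Iterator

    def rechunk(parts: Iterable[bytes], chunk_size: int) -> Iterator[bytes]:
        # Buffer variably sized parts and re-slice them into uniform chunks,
        # mirroring _ChunkIterator's _current_content / _get_chunk_data logic.
        buffer = b""
        for part in parts:
            buffer += part
            while len(buffer) >= chunk_size:
                yield buffer[:chunk_size]
                buffer = buffer[chunk_size:]
        if buffer:
            yield buffer  # the final, possibly shorter, chunk

    # e.g. list(rechunk([b"abc", b"defgh", b"ij"], 4)) == [b"abcd", b"efgh", b"ij"]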
@@ -379,7 +379,7 @@ async def download_blob(self, offset=None, length=None, **kwargs):
         # type: (Optional[int], Optional[int], Any) -> StorageStreamDownloader
         """Downloads a blob to the StorageStreamDownloader. The readall() method must
         be used to read all the content or readinto() must be used to download the blob into
-        a stream. Using chunks() returns an iterator which allows the user to iterate over the content in chunks.
+        a stream. Using chunks() returns an async iterator which allows the user to iterate over the content in chunks.
         :param int offset:
             Start of byte range to use for downloading a section of the blob.
@@ -892,7 +892,7 @@ async def download_blob(self, blob, offset=None, length=None, **kwargs):
         # type: (Union[str, BlobProperties], Optional[int], Optional[int], Any) -> StorageStreamDownloader
         """Downloads a blob to the StorageStreamDownloader. The readall() method must
         be used to read all the content or readinto() must be used to download the blob into
-        a stream.
+        a stream. Using chunks() returns an async iterator which allows the user to iterate over the content in chunks.
         :param blob: The blob with which to interact. If specified, this value will override
             a blob value specified in the blob URL.
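
An async counterpart of the earlier usage sketch, again with an assumed connection string and names (the aio client and the async chunks() iterator are what this diff documents):

    import asyncio
    from azure.storage.blob.aio import BlobClient

    async def main():
        blob = BlobClient.from_connection_string(
            "<connection-string>",            # assumed placeholder
            container_name="mycontainer",     # assumed
            blob_name="myblob")               # assumed
        async with blob:
            stream = await blob.download_blob()
            async for chunk in stream.chunks():   # async iterator of bytes
                print(len(chunk))                 # uniform except possibly the last

    asyncio.run(main())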
@@ -11,6 +11,7 @@
 from itertools import islice
 import warnings
 
+from typing import AsyncIterator
 from azure.core.exceptions import HttpResponseError
 from .._shared.encryption import decrypt_blob
 from .._shared.request_handlers import validate_and_format_range_headers
@@ -115,8 +116,9 @@ async def _download_chunk(self, chunk_start, chunk_end):
 class _AsyncChunkIterator(object):
     """Async iterator for chunks in blob download stream."""
 
-    def __init__(self, size, content, downloader):
+    def __init__(self, size, content, downloader, chunk_size):
         self.size = size
+        self._chunk_size = chunk_size
         self._current_content = content
         self._iter_downloader = downloader
         self._iter_chunks = None
@@ -136,21 +138,35 @@ async def __anext__(self):
         if self._complete:
             raise StopAsyncIteration("Download complete")
         if not self._iter_downloader:
-            # If no iterator was supplied, the download completed with
-            # the initial GET, so we just return that data
+            # cut the data obtained from initial GET into chunks
+            if len(self._current_content) > self._chunk_size:
+                return self._get_chunk_data()
             self._complete = True
             return self._current_content
 
         if not self._iter_chunks:
             self._iter_chunks = self._iter_downloader.get_chunk_offsets()
-        else:
-            try:
-                chunk = next(self._iter_chunks)
-            except StopIteration:
-                raise StopAsyncIteration("Download complete")
-            self._current_content = await self._iter_downloader.yield_chunk(chunk)
 
-        return self._current_content
+        # initial GET result still has more than _chunk_size bytes of data
+        if len(self._current_content) >= self._chunk_size:
+            return self._get_chunk_data()
+
+        try:
+            chunk = next(self._iter_chunks)
+            self._current_content += await self._iter_downloader.yield_chunk(chunk)
+        except StopIteration:
+            self._complete = True
+            # it's likely that there some data left in self._current_content
+            if self._current_content:
+                return self._current_content
+            raise StopAsyncIteration("Download complete")
+
+        return self._get_chunk_data()
+
+    def _get_chunk_data(self):
+        chunk_data = self._current_content[: self._chunk_size]
+        self._current_content = self._current_content[self._chunk_size:]
+        return chunk_data
 
 
 class StorageStreamDownloader(object):  # pylint: disable=too-many-instance-attributes
@@ -325,9 +341,19 @@ async def _initial_request(self):
         return response
 
     def chunks(self):
+        # type: () -> AsyncIterator[bytes]
         """Iterate over chunks in the download stream.
-        :rtype: Iterable[bytes]
+        :rtype: AsyncIterator[bytes]
+
+        .. admonition:: Example:
+
+            .. literalinclude:: ../samples/blob_samples_hello_world_async.py
+                :start-after: [START download_a_blob_in_chunk]
+                :end-before: [END download_a_blob_in_chunk]
+                :language: python
+                :dedent: 16
+                :caption: Download a blob using chunks().
         """
         if self.size == 0 or self._download_complete:
             iter_downloader = None
         else:
@@ -353,7 +379,8 @@ def chunks(self):
         return _AsyncChunkIterator(
             size=self.size,
             content=self._current_content,
-            downloader=iter_downloader)
+            downloader=iter_downloader,
+            chunk_size=self._config.max_chunk_get_size)
 
     async def readall(self):
         """Download the contents of this blob.
@@ -110,7 +110,7 @@ def stream_block_blob(self):
         source_blob_client.upload_blob(data, blob_type="BlockBlob")
 
         destination_blob_client = container_client.get_blob_client("destination_blob")
-
+        # [START download_a_blob_in_chunk]
         # This returns a StorageStreamDownloader.
         stream = source_blob_client.download_blob()
         block_list = []
@@ -122,6 +122,8 @@ def stream_block_blob(self):
             destination_blob_client.stage_block(block_id=block_id, data=chunk)
             block_list.append(BlobBlock(block_id=block_id))
 
+        # [END download_a_blob_in_chunk]
+
         # Upload the whole chunk to azure storage and make up one blob
         destination_blob_client.commit_block_list(block_list)
@@ -116,6 +116,7 @@ async def stream_block_blob(self):
 
         destination_blob_client = container_client.get_blob_client("destination_blob")
 
+        # [START download_a_blob_in_chunk]
         # This returns a StorageStreamDownloader.
         stream = await source_blob_client.download_blob()
         block_list = []
@@ -126,6 +127,7 @@ async def stream_block_blob(self):
             block_id = str(uuid.uuid4())
             await destination_blob_client.stage_block(block_id=block_id, data=chunk)
             block_list.append(BlobBlock(block_id=block_id))
+        # [END download_a_blob_in_chunk]
 
         # Upload the whole chunk to azure storage and make up one blob
         await destination_blob_client.commit_block_list(block_list)
