Refactor max_connections to max_concurrency #7531

Merged

Changes from 2 commits
sdk/storage/azure-storage-blob/azure/storage/blob/__init__.py (14 changes: 7 additions & 7 deletions)

@@ -94,7 +94,7 @@ def upload_blob_to_url(
         blob_url,  # type: str
         data,  # type: Union[Iterable[AnyStr], IO[AnyStr]]
         overwrite=False,  # type: bool
-        max_connections=1,  # type: int
+        max_concurrency=1,  # type: int
         encoding='UTF-8',  # type: str
         credential=None,  # type: Any
         **kwargs):
@@ -125,22 +125,22 @@ def upload_blob_to_url(
         data=data,
         blob_type=BlobType.BlockBlob,
         overwrite=overwrite,
-        max_connections=max_connections,
+        max_concurrency=max_concurrency,
         encoding=encoding,
         **kwargs)


-def _download_to_stream(client, handle, max_connections, **kwargs):
+def _download_to_stream(client, handle, max_concurrency, **kwargs):
     """Download data to specified open file-handle."""
     stream = client.download_blob(**kwargs)
-    stream.download_to_stream(handle, max_connections=max_connections)
+    stream.download_to_stream(handle, max_concurrency=max_concurrency)


 def download_blob_from_url(
         blob_url,  # type: str
         output,  # type: str
         overwrite=False,  # type: bool
-        max_connections=1,  # type: int
+        max_concurrency=1,  # type: int
         credential=None,  # type: Any
         **kwargs):
     # type: (...) -> None
@@ -166,9 +166,9 @@ def download_blob_from_url(
     """
     with BlobClient(blob_url, credential=credential) as client:
         if hasattr(output, 'write'):
-            _download_to_stream(client, output, max_connections, **kwargs)
+            _download_to_stream(client, output, max_concurrency, **kwargs)
         else:
            if not overwrite and os.path.isfile(output):
                raise ValueError("The file '{}' already exists.".format(output))
            with open(output, 'wb') as file_handle:
-                _download_to_stream(client, file_handle, max_connections, **kwargs)
+                _download_to_stream(client, file_handle, max_concurrency, **kwargs)
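For reviewers, a caller-side sketch of the rename as it lands in these module-level helpers (the account URL, SAS token, and file names are placeholders, not values from this PR):

```python
from azure.storage.blob import upload_blob_to_url, download_blob_from_url

sas_url = "https://myaccount.blob.core.windows.net/mycontainer/myblob?<sas-token>"  # placeholder

with open("data.bin", "rb") as data:
    # was: max_connections=4
    upload_blob_to_url(sas_url, data, overwrite=True, max_concurrency=4)

download_blob_from_url(sas_url, "data_copy.bin", overwrite=True, max_concurrency=4)
```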
@@ -383,32 +383,32 @@ def _initial_request(self):

         return response

-    def content_as_bytes(self, max_connections=1):
+    def content_as_bytes(self, max_concurrency=1):
Member: I guess moving this parameter to the constructor will be a separate PR specifically for the download stream API?

Contributor (author): yes, I'll do it here #7504
"""Download the contents of this file.

This operation is blocking until all data is downloaded.

:param int max_connections:
:param int max_concurrency:
The number of parallel connections with which to download.
:rtype: bytes
"""
stream = BytesIO()
self.download_to_stream(stream, max_connections=max_connections)
self.download_to_stream(stream, max_concurrency=max_concurrency)
return stream.getvalue()

def content_as_text(self, max_connections=1, encoding="UTF-8"):
def content_as_text(self, max_concurrency=1, encoding="UTF-8"):
"""Download the contents of this file, and decode as text.

This operation is blocking until all data is downloaded.

:param int max_connections:
:param int max_concurrency:
The number of parallel connections with which to download.
:rtype: str
"""
content = self.content_as_bytes(max_connections=max_connections)
content = self.content_as_bytes(max_concurrency=max_concurrency)
return content.decode(encoding)

def download_to_stream(self, stream, max_connections=1):
def download_to_stream(self, stream, max_concurrency=1):
"""Download the contents of this file to a stream.

:param stream:
Expand All @@ -419,7 +419,7 @@ def download_to_stream(self, stream, max_connections=1):
:rtype: Any
"""
# the stream must be seekable if parallel download is required
if max_connections > 1:
if max_concurrency > 1:
error_message = "Target stream handle must be seekable."
if sys.version_info >= (3,) and not stream.seekable():
raise ValueError(error_message)
Expand Down Expand Up @@ -447,7 +447,7 @@ def download_to_stream(self, stream, max_connections=1):
# Use the length unless it is over the end of the file
data_end = min(self.file_size, self.length + 1)

downloader_class = ParallelChunkDownloader if max_connections > 1 else SequentialChunkDownloader
downloader_class = ParallelChunkDownloader if max_concurrency > 1 else SequentialChunkDownloader
downloader = downloader_class(
service=self.service,
total_size=self.download_size,
Expand All @@ -462,9 +462,9 @@ def download_to_stream(self, stream, max_connections=1):
**self.request_options
)

if max_connections > 1:
if max_concurrency > 1:
import concurrent.futures
executor = concurrent.futures.ThreadPoolExecutor(max_connections)
executor = concurrent.futures.ThreadPoolExecutor(max_concurrency)
list(executor.map(
with_current_context(downloader.process_chunk),
downloader.get_chunk_offsets()
Expand Down
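The seekability check above is the one user-visible constraint tied to the renamed parameter: parallel chunks are written at arbitrary offsets, so the target handle must support seek(). A minimal sketch (placeholder URL; a real blob and credential are needed to actually run it):

```python
import io
from azure.storage.blob import BlobClient

client = BlobClient("https://myaccount.blob.core.windows.net/mycontainer/myblob?<sas-token>")  # placeholder
downloader = client.download_blob()

buf = io.BytesIO()  # seekable, so a parallel download is allowed
downloader.download_to_stream(buf, max_concurrency=4)

# With max_concurrency > 1 and a non-seekable handle (e.g. a pipe), the method
# raises ValueError("Target stream handle must be seekable.") on Python 3.
```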
@@ -328,32 +328,32 @@ async def _initial_request(self):
         self._download_complete = True
         return response

-    async def content_as_bytes(self, max_connections=1):
+    async def content_as_bytes(self, max_concurrency=1):
         """Download the contents of this file.

         This operation is blocking until all data is downloaded.

-        :param int max_connections:
+        :param int max_concurrency:
             The number of parallel connections with which to download.
         :rtype: bytes
         """
         stream = BytesIO()
-        await self.download_to_stream(stream, max_connections=max_connections)
+        await self.download_to_stream(stream, max_concurrency=max_concurrency)
         return stream.getvalue()

-    async def content_as_text(self, max_connections=1, encoding='UTF-8'):
+    async def content_as_text(self, max_concurrency=1, encoding='UTF-8'):
         """Download the contents of this file, and decode as text.

         This operation is blocking until all data is downloaded.

-        :param int max_connections:
+        :param int max_concurrency:
             The number of parallel connections with which to download.
         :rtype: str
         """
-        content = await self.content_as_bytes(max_connections=max_connections)
+        content = await self.content_as_bytes(max_concurrency=max_concurrency)
         return content.decode(encoding)

-    async def download_to_stream(self, stream, max_connections=1):
+    async def download_to_stream(self, stream, max_concurrency=1):
         """Download the contents of this file to a stream.

         :param stream:
@@ -367,7 +367,7 @@ async def download_to_stream(self, stream, max_connections=1):
             raise ValueError("Stream is currently being iterated.")

         # the stream must be seekable if parallel download is required
-        parallel = max_connections > 1
+        parallel = max_concurrency > 1
         if parallel:
             error_message = "Target stream handle must be seekable."
             if sys.version_info >= (3,) and not stream.seekable():
@@ -412,7 +412,7 @@ async def download_to_stream(self, stream, max_connections=1):
             dl_tasks = downloader.get_chunk_offsets()
             running_futures = [
                 asyncio.ensure_future(downloader.process_chunk(d))
-                for d in islice(dl_tasks, 0, max_connections)
+                for d in islice(dl_tasks, 0, max_concurrency)
             ]
             while running_futures:
                 # Wait for some download to finish before adding a new one
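The async path keeps a rolling window of at most max_concurrency chunk tasks: it starts the first max_concurrency via islice, then tops the window back up as tasks complete. A self-contained sketch of that pattern (generic stand-ins, not the SDK's internal downloader):

```python
import asyncio
from itertools import islice

async def process_chunk(offset):
    await asyncio.sleep(0.01)  # stand-in for downloading one chunk
    return offset

async def run_windowed(chunk_offsets, max_concurrency):
    tasks = iter(chunk_offsets)
    # Prime the window with at most max_concurrency in-flight tasks.
    running = {asyncio.ensure_future(process_chunk(c))
               for c in islice(tasks, 0, max_concurrency)}
    results = []
    while running:
        # Wait for some download to finish before adding a new one.
        done, running = await asyncio.wait(running, return_when=asyncio.FIRST_COMPLETED)
        results.extend(t.result() for t in done)
        for c in islice(tasks, 0, len(done)):  # refill the window
            running.add(asyncio.ensure_future(process_chunk(c)))
    return results  # completion order, not submission order

print(asyncio.get_event_loop().run_until_complete(run_windowed(range(10), 4)))
```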
@@ -49,7 +49,7 @@ def upload_data_chunks(
         uploader_class=None,
         total_size=None,
         chunk_size=None,
-        max_connections=None,
+        max_concurrency=None,
         stream=None,
         validate_content=None,
         encryption_options=None,
@@ -63,7 +63,7 @@ def upload_data_chunks(
         kwargs['encryptor'] = encryptor
         kwargs['padder'] = padder

-    parallel = max_connections > 1
+    parallel = max_concurrency > 1
     if parallel and 'modified_access_conditions' in kwargs:
         # Access conditions do not work with parallelism
         kwargs['modified_access_conditions'] = None
@@ -77,11 +77,11 @@ def upload_data_chunks(
         validate_content=validate_content,
         **kwargs)
     if parallel:
-        executor = futures.ThreadPoolExecutor(max_connections)
+        executor = futures.ThreadPoolExecutor(max_concurrency)
         upload_tasks = uploader.get_chunk_streams()
         running_futures = [
             executor.submit(with_current_context(uploader.process_chunk), u)
-            for u in islice(upload_tasks, 0, max_connections)
+            for u in islice(upload_tasks, 0, max_concurrency)
         ]
         range_ids = _parallel_uploads(executor, uploader.process_chunk, upload_tasks, running_futures)
     else:
@@ -96,10 +96,10 @@ def upload_substream_blocks(
         uploader_class=None,
         total_size=None,
         chunk_size=None,
-        max_connections=None,
+        max_concurrency=None,
         stream=None,
         **kwargs):
-    parallel = max_connections > 1
+    parallel = max_concurrency > 1
     if parallel and 'modified_access_conditions' in kwargs:
         # Access conditions do not work with parallelism
         kwargs['modified_access_conditions'] = None
@@ -112,11 +112,11 @@ def upload_substream_blocks(
         **kwargs)

     if parallel:
-        executor = futures.ThreadPoolExecutor(max_connections)
+        executor = futures.ThreadPoolExecutor(max_concurrency)
         upload_tasks = uploader.get_substream_blocks()
         running_futures = [
             executor.submit(with_current_context(uploader.process_substream_block), u)
-            for u in islice(upload_tasks, 0, max_connections)
+            for u in islice(upload_tasks, 0, max_concurrency)
         ]
         range_ids = _parallel_uploads(executor, uploader.process_substream_block, upload_tasks, running_futures)
     else:
@@ -420,7 +420,7 @@ def read(self, n):
         # or read in just enough data for the current block/sub stream
         current_max_buffer_size = min(self._max_buffer_size, self._length - self._position)

-        # lock is only defined if max_connections > 1 (parallel uploads)
+        # lock is only defined if max_concurrency > 1 (parallel uploads)
         if self._lock:
             with self._lock:
                 # reposition the underlying stream to match the start of the data to read
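The threaded upload path uses the same bounded-window idea: submit max_concurrency chunks to a ThreadPoolExecutor up front, then feed new chunks in as futures complete (the diff's _parallel_uploads helper is approximated inline here with a stand-in worker, not the SDK's uploader). Note the diff also nulls out modified_access_conditions when parallel, per its comment that access conditions do not work with parallelism:

```python
from concurrent import futures
from itertools import islice

def process_chunk(chunk):
    return len(chunk)  # stand-in for uploading one chunk

def upload_windowed(chunks, max_concurrency):
    executor = futures.ThreadPoolExecutor(max_concurrency)
    tasks = iter(chunks)
    # Prime the window, mirroring islice(upload_tasks, 0, max_concurrency) above.
    running = [executor.submit(process_chunk, c) for c in islice(tasks, 0, max_concurrency)]
    results = []
    while running:
        done, not_done = futures.wait(running, return_when=futures.FIRST_COMPLETED)
        results.extend(f.result() for f in done)
        # Refill the window with one new submission per completed future.
        running = list(not_done) + [executor.submit(process_chunk, c)
                                    for c in islice(tasks, 0, len(done))]
    executor.shutdown(wait=True)
    return results

print(upload_windowed([b"aa", b"bbb", b"c"] * 4, max_concurrency=3))
```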
@@ -50,7 +50,7 @@ async def upload_data_chunks(
         uploader_class=None,
         total_size=None,
         chunk_size=None,
-        max_connections=None,
+        max_concurrency=None,
         stream=None,
         encryption_options=None,
         **kwargs):
@@ -63,7 +63,7 @@ async def upload_data_chunks(
         kwargs['encryptor'] = encryptor
         kwargs['padder'] = padder

-    parallel = max_connections > 1
+    parallel = max_concurrency > 1
     if parallel and 'modified_access_conditions' in kwargs:
         # Access conditions do not work with parallelism
         kwargs['modified_access_conditions'] = None
@@ -80,7 +80,7 @@ async def upload_data_chunks(
         upload_tasks = uploader.get_chunk_streams()
         running_futures = [
             asyncio.ensure_future(uploader.process_chunk(u))
-            for u in islice(upload_tasks, 0, max_connections)
+            for u in islice(upload_tasks, 0, max_concurrency)
         ]
         range_ids = await _parallel_uploads(uploader.process_chunk, upload_tasks, running_futures)
     else:
@@ -98,10 +98,10 @@ async def upload_substream_blocks(
         uploader_class=None,
         total_size=None,
         chunk_size=None,
-        max_connections=None,
+        max_concurrency=None,
         stream=None,
         **kwargs):
-    parallel = max_connections > 1
+    parallel = max_concurrency > 1
     if parallel and 'modified_access_conditions' in kwargs:
         # Access conditions do not work with parallelism
         kwargs['modified_access_conditions'] = None
@@ -117,7 +117,7 @@ async def upload_substream_blocks(
         upload_tasks = uploader.get_substream_blocks()
         running_futures = [
             asyncio.ensure_future(uploader.process_substream_block(u))
-            for u in islice(upload_tasks, 0, max_connections)
+            for u in islice(upload_tasks, 0, max_concurrency)
         ]
         range_ids = await _parallel_uploads(uploader.process_substream_block, upload_tasks, running_futures)
     else:
@@ -68,7 +68,7 @@ def upload_block_blob(  # pylint: disable=too-many-locals
         overwrite=None,
         headers=None,
         validate_content=None,
-        max_connections=None,
+        max_concurrency=None,
         blob_settings=None,
         encryption_options=None,
         **kwargs):
@@ -121,7 +121,7 @@ def upload_block_blob(  # pylint: disable=too-many-locals
             uploader_class=BlockBlobChunkUploader,
             total_size=length,
             chunk_size=blob_settings.max_block_size,
-            max_connections=max_connections,
+            max_concurrency=max_concurrency,
             stream=stream,
             validate_content=validate_content,
             encryption_options=encryption_options,
@@ -133,7 +133,7 @@ def upload_block_blob(  # pylint: disable=too-many-locals
             uploader_class=BlockBlobChunkUploader,
             total_size=length,
             chunk_size=blob_settings.max_block_size,
-            max_connections=max_connections,
+            max_concurrency=max_concurrency,
             stream=stream,
             validate_content=validate_content,
             **kwargs
@@ -165,7 +165,7 @@ def upload_page_blob(
         overwrite=None,
         headers=None,
         validate_content=None,
-        max_connections=None,
+        max_concurrency=None,
         blob_settings=None,
         encryption_options=None,
         **kwargs):
@@ -203,7 +203,7 @@ def upload_page_blob(
             total_size=length,
             chunk_size=blob_settings.max_page_size,
             stream=stream,
-            max_connections=max_connections,
+            max_concurrency=max_concurrency,
             validate_content=validate_content,
             encryption_options=encryption_options,
             **kwargs)
@@ -224,7 +224,7 @@ def upload_append_blob(  # pylint: disable=unused-argument
         overwrite=None,
         headers=None,
         validate_content=None,
-        max_connections=None,
+        max_concurrency=None,
         blob_settings=None,
         encryption_options=None,
         **kwargs):
@@ -248,7 +248,7 @@ def upload_append_blob(  # pylint: disable=unused-argument
         total_size=length,
         chunk_size=blob_settings.max_block_size,
         stream=stream,
-        max_connections=max_connections,
+        max_concurrency=max_concurrency,
         validate_content=validate_content,
         append_position_access_conditions=append_conditions,
         **kwargs)
@@ -274,7 +274,7 @@ def upload_append_blob(  # pylint: disable=unused-argument
         total_size=length,
         chunk_size=blob_settings.max_block_size,
         stream=stream,
-        max_connections=max_connections,
+        max_concurrency=max_concurrency,
         validate_content=validate_content,
         append_position_access_conditions=append_conditions,
         **kwargs)
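End-to-end, the public keyword now matches these internal helpers. A caller-side sketch, assuming BlobClient.upload_blob accepts max_concurrency and forwards it down to upload_block_blob/upload_page_blob/upload_append_blob the same way upload_blob_to_url does in the first file of this diff (URL and file names are placeholders):

```python
from azure.storage.blob import BlobClient

client = BlobClient("https://myaccount.blob.core.windows.net/mycontainer/bigblob?<sas-token>")  # placeholder
with open("bigfile.bin", "rb") as data:
    client.upload_blob(data, overwrite=True, max_concurrency=8)  # was max_connections=8
```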