Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[Bug] Fix download bugs during download benchmark #134

Merged
merged 7 commits into from
Sep 6, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
31 changes: 14 additions & 17 deletions python/aibrix/aibrix/downloader/base.py
Original file line number Diff line number Diff line change
Expand Up @@ -89,11 +89,11 @@ def download_directory(self, local_path: Path):

if not self._support_range_download():
# download using multiple threads
st = time.perf_counter()
num_threads = envs.DOWNLOADER_NUM_THREADS
logger.info(
f"Downloader {self.__class__.__name__} does not support "
f"range download, use {num_threads} threads to download."
f"Downloader {self.__class__.__name__} download "
f"{len(filtered_files)} files from {self.model_uri} "
f"use {num_threads} threads to download."
)

executor = ThreadPoolExecutor(num_threads)
Expand All @@ -108,26 +108,16 @@ def download_directory(self, local_path: Path):
for file in filtered_files
]
wait(futures)
duration = time.perf_counter() - st

else:
logger.info(
f"Downloader {self.__class__.__name__} download "
f"{len(filtered_files)} files from {self.model_uri} "
f"using {num_threads} threads, "
f"duration: {duration:.2f} seconds."
f"using range support methods."
)

else:
st = time.perf_counter()
for file in filtered_files:
# use range download to speed up the download
self.download(local_path, file, self.bucket_name, True)
duration = time.perf_counter() - st
logger.info(
f"Downloader {self.__class__.__name__} download "
f"{len(filtered_files)} files from {self.model_uri} "
f"using range support methods, "
f"duration: {duration:.2f} seconds."
)

def download_model(self, local_path: Optional[str] = None):
if local_path is None:
Expand All @@ -139,7 +129,7 @@ def download_model(self, local_path: Optional[str] = None):
model_path.mkdir(parents=True, exist_ok=True)

# TODO: check whether the local file already exists

st = time.perf_counter()
if self._is_directory():
self.download_directory(local_path=model_path)
else:
Expand All @@ -149,6 +139,13 @@ def download_model(self, local_path: Optional[str] = None):
bucket_name=self.bucket_name,
enable_range=self._support_range_download(),
)
duration = time.perf_counter() - st
logger.info(
f"Downloader {self.__class__.__name__} download "
f"from {self.model_uri} "
f"duration: {duration:.2f} seconds."
)

return model_path

def __hash__(self):
Expand Down
13 changes: 13 additions & 0 deletions python/aibrix/aibrix/downloader/s3.py
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@

import boto3
from boto3.s3.transfer import TransferConfig
from botocore.config import Config, MAX_POOL_CONNECTIONS
from tqdm import tqdm

from aibrix import envs
Expand All @@ -40,12 +41,24 @@ def __init__(self, model_uri):
region = envs.DOWNLOADER_AWS_REGION
bucket_name, bucket_path = _parse_bucket_info_from_uri(model_uri)

# Avoid the "Connection pool is full" warning log by making the connection pool at least as large as the number of download threads
# Refs: https://github.com/boto/botocore/issues/619#issuecomment-583511406
max_pool_connections = (
envs.DOWNLOADER_NUM_THREADS
if envs.DOWNLOADER_NUM_THREADS > MAX_POOL_CONNECTIONS
else MAX_POOL_CONNECTIONS
)
client_config = Config(
max_pool_connections=max_pool_connections,
)

self.client = boto3.client(
service_name="s3",
region_name=region,
endpoint_url=endpoint,
aws_access_key_id=ak,
aws_secret_access_key=sk,
config=client_config,
)

super().__init__(
Expand Down
9 changes: 6 additions & 3 deletions python/aibrix/aibrix/downloader/tos.py
Original file line number Diff line number Diff line change
Expand Up @@ -42,9 +42,12 @@ def __init__(self, model_uri):
sk = envs.DOWNLOADER_TOS_SECRET_KEY
endpoint = envs.DOWNLOADER_TOS_ENDPOINT or ""
region = envs.DOWNLOADER_TOS_REGION or ""
enable_crc = envs.DOWNLOADER_TOS_ENABLE_CRC
bucket_name, bucket_path = _parse_bucket_info_from_uri(model_uri)

self.client = tos.TosClientV2(ak=ak, sk=sk, endpoint=endpoint, region=region)
self.client = tos.TosClientV2(
ak=ak, sk=sk, endpoint=endpoint, region=region, enable_crc=enable_crc
)

super().__init__(
model_uri=model_uri,
Expand Down Expand Up @@ -109,8 +112,8 @@ def download(
task_num = envs.DOWNLOADER_NUM_THREADS if enable_range else 1

download_kwargs = {}
if envs.DOWNLOADER_PART_THRESHOLD is not None:
download_kwargs["multipart_threshold"] = envs.DOWNLOADER_PART_THRESHOLD
if envs.DOWNLOADER_PART_CHUNKSIZE is not None:
download_kwargs["part_size"] = envs.DOWNLOADER_PART_CHUNKSIZE

# download file
total_length = meta_data.content_length
Expand Down
1 change: 1 addition & 0 deletions python/aibrix/aibrix/envs.py
Original file line number Diff line number Diff line change
Expand Up @@ -64,6 +64,7 @@ def _parse_int_or_none(value: Optional[str]) -> Optional[int]:
DOWNLOADER_TOS_SECRET_KEY = os.getenv("TOS_SECRET_KEY")
DOWNLOADER_TOS_ENDPOINT = os.getenv("TOS_ENDPOINT")
DOWNLOADER_TOS_REGION = os.getenv("TOS_REGION")
DOWNLOADER_TOS_ENABLE_CRC = _is_true(os.getenv("TOS_ENABLE_CRC"))

# Downloader AWS S3 Envs
DOWNLOADER_AWS_ACCESS_KEY = os.getenv("AWS_ACCESS_KEY_ID")
Expand Down
9 changes: 8 additions & 1 deletion runtime.Dockerfile
Original file line number Diff line number Diff line change
Expand Up @@ -5,8 +5,15 @@ FROM ${BASE_IMAGE} AS base

WORKDIR /app

# Install Poetry
ARG POETRY_VERSION=1.8.3

# Install dependencies
RUN apt-get update \
&& apt-get install -y python3-dev build-essential \
&& apt-get clean && \
rm -rf /var/lib/apt/lists/*

# Install Poetry
RUN python3 -m pip install poetry==${POETRY_VERSION}

# Copy the runtime source
Expand Down
Loading