[batch] use volcano TOS as batch storage #344

Merged · 9 commits · Nov 5, 2024
1 change: 1 addition & 0 deletions python/aibrix/aibrix/batch/driver.py
@@ -26,6 +26,7 @@ def __init__(self):
"""
This is main entrance to bind all components to serve job requests.
"""
_storage.initialize_storage()
self._storage = _storage
self._job_manager = JobManager()
self._scheduler = JobScheduler(self._job_manager, DEFAULT_JOB_POOL_SIZE)
12 changes: 11 additions & 1 deletion python/aibrix/aibrix/batch/storage/__init__.py
@@ -12,17 +12,27 @@
# See the License for the specific language governing permissions and
# limitations under the License.


from aibrix.batch.storage.generic_storage import StorageType
from aibrix.batch.storage.batch_storage import (
get_job_request_len,
get_storage_job_results,
put_storage_job_results,
read_job_requests,
remove_storage_job_data,
upload_input_data,
initialize_batch_storage,
)


def initialize_storage(storage_type=StorageType.LocalDiskFile, params={}):
Collaborator:

nit: what's the difference between initialize_storage and initialize_batch_storage? why do we need two levels?

Contributor (Author):

this is to be consistent with other interfaces.
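For illustration, a minimal sketch of calling the package-level entry point with the TOS backend; the credential, endpoint, and bucket values below are placeholders, not values from this PR:

import aibrix.batch.storage as _storage
from aibrix.batch.storage.generic_storage import StorageType

# Placeholder credentials; params is forwarded as TOSStorage(**params).
_storage.initialize_storage(
    StorageType.TOS,
    params={
        "access_key": "<AK>",
        "secret_key": "<SK>",
        "endpoint": "tos-cn-beijing.volces.com",
        "region": "cn-beijing",
        "bucket_name": "my-batch-bucket",
    },
)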

"""Initialize job storage with storage type.

Args:
storage_type: the storage type, such files or TOS
"""
initialize_batch_storage(storage_type, params)


def submit_job_input(inputDataFile):
"""Upload job input data file to storage.

19 changes: 18 additions & 1 deletion python/aibrix/aibrix/batch/storage/batch_storage.py
@@ -14,14 +14,31 @@

import uuid

# [TODO] Add S3 as another storage backend
from aibrix.batch.storage.tos_storage import TOSStorage
Collaborator:

please also add S3 later. You can use MinIO for testing.

Contributor (Author):

ack
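A speculative sketch of what an S3 counterpart tested against a local MinIO server could look like; the S3Storage class below is not part of this PR, and only write_job_input_data is shown, mirroring TOSStorage's object layout (boto3 is the standard S3 client):

import json
import logging

import boto3

from aibrix.batch.storage.generic_storage import PersistentStorage


class S3Storage(PersistentStorage):
    """Hypothetical S3 backend mirroring TOSStorage's per-request objects."""

    def __init__(self, access_key, secret_key, endpoint, region, bucket_name):
        # For MinIO testing, endpoint would be e.g. "http://localhost:9000".
        self._bucket_name = bucket_name
        self._client = boto3.client(
            "s3",
            aws_access_key_id=access_key,
            aws_secret_access_key=secret_key,
            endpoint_url=endpoint,
            region_name=region,
        )

    def write_job_input_data(self, job_id, inputDataFileName):
        # Same key layout as TOSStorage: {job_id}/input/{request_index}
        with open(inputDataFileName, "r") as f:
            requests = [json.loads(line) for line in f if len(line) > 1]
        for i, req in enumerate(requests):
            self._client.put_object(
                Bucket=self._bucket_name,
                Key=f"{job_id}/input/{i}",
                Body=json.dumps(req).encode("utf-8"),
            )
        logging.info(f"S3 received a job with {len(requests)} requests.")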

from aibrix.batch.storage.generic_storage import LocalDiskFiles
from aibrix.batch.storage.generic_storage import StorageType

current_job_offsets = {}
job_input_requests = {}
p_storage = LocalDiskFiles()
p_storage = None
NUM_REQUESTS_PER_READ = 1024


def initialize_batch_storage(storage_type=StorageType.LocalDiskFile, params={}):
"""Initialize storage type. Now it support files and TOS.
For some storage type, user needs to pass in other parameters to params.
"""
global p_storage
if storage_type == StorageType.LocalDiskFile:
p_storage = LocalDiskFiles()
elif storage_type == StorageType.TOS:
p_storage = TOSStorage(**params)
else:
raise ValueError("Unknown storage type")


def upload_input_data(inputDataFileName):
"""Upload job input data file to storage.
180 changes: 180 additions & 0 deletions python/aibrix/aibrix/batch/storage/tos_storage.py
@@ -0,0 +1,180 @@
# Copyright 2024 The Aibrix Team.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.

import json
import logging
from io import StringIO

import tos

from aibrix.batch.storage.generic_storage import PersistentStorage


class TOSStorage(PersistentStorage):
def __init__(self, access_key, secret_key, endpoint, region, bucket_name):
"""
This initializes client configuration to a
TOS online bucket. All job information will be submitted to this
TOS bucket. The two input keys are from Volcano TOS account.
"""

        self._client = None
        self._bucket_name = bucket_name

        try:
            self._client = tos.TosClientV2(access_key, secret_key, endpoint, region)
            logging.info("Finished creating TOS client.")
except tos.exceptions.TosClientError as e:
            logging.error(
                f"fail with client error, message: {e.message}, cause: {e.cause}"
            )
except tos.exceptions.TosServerError as e:
logging.error(f"fail with server error, code: {e.code}")
logging.error(f"error with request id: {e.request_id}")
logging.error(f"error with message: {e.message}")
logging.error(f"error with http code: {e.status_code}")
logging.error(f"error with ec: {e.ec}".format())
logging.error(f"error with request url: {e.request_url}")
except Exception as e:
logging.error(f"fail with unknown error: {e}")
logging.error("Attempting to create TOS client failed.")

def write_job_input_data(self, job_id, inputDataFileName):
"""
Each request is an object.
the format is jobID_input_requestId
"""
request_list = []
        # The input file is JSONL: each non-empty line is one JSON request.
        with open(inputDataFileName, "r") as file:
            for line in file.readlines():
if len(line) <= 1:
continue
data = json.loads(line)
request_list.append(data)

num_valid_request = len(request_list)
object_prefix = f"{job_id}/input"
try:
for i, req_data in enumerate(request_list):
obj_key = f"{object_prefix}/{i}"
json_str = json.dumps(req_data)
string_io_obj = StringIO(json_str)
self._client.put_object(
self._bucket_name, obj_key, content=string_io_obj
)
except Exception as e:
logging.error(f"TOS write fails with unknown error: {e}")

logging.info(f"TOS receives a job with {num_valid_request} request.")

def read_job_input_data(self, job_id, start_index=0, num_requests=-1):
"""Read job input request data"""

request_inputs = []
object_prefix = f"{job_id}/input"
try:
for i in range(num_requests):
idx = start_index + i
obj_key = f"{object_prefix}/{idx}"
object_stream = self._client.get_object(self._bucket_name, obj_key)
json_obj = json.loads(object_stream.read())
request_inputs.append(json_obj)
except Exception as e:
logging.error(f"TOS reading job input fails with unknown error: {e}")
return request_inputs

def write_job_output_data(self, job_id, start_index, output_list):
"""
Write job results to TOS bucket.
The key of job request output is in the format of
jobID_output_requestId.
"""

object_prefix = f"{job_id}/output"
try:
for i, req_data in enumerate(output_list):
idx = start_index + i
obj_key = f"{object_prefix}/{idx}"
json_str = json.dumps(req_data)
string_io_obj = StringIO(json_str)
self._client.put_object(
self._bucket_name, obj_key, content=string_io_obj
)
except Exception as e:
logging.error("TOS writing output fails with unknown error: {}".format(e))
num_valid_request = len(output_list)
logging.info(f"Write to TOS for job {job_id} with {num_valid_request} request.")

def read_job_output_data(self, job_id, start_index, num_requests):
"""Read job results output from TOS bucket."""

request_results = []
object_prefix = f"{job_id}/output"

try:
for i in range(num_requests):
idx = start_index + i
obj_key = f"{object_prefix}/{idx}"
object_stream = self._client.get_object(self._bucket_name, obj_key)
json_obj = json.loads(object_stream.read())
request_results.append(json_obj)
except Exception as e:
logging.error(f"TOS reading request output fails with unknown error: {e}")
return request_results

def delete_job_data(self, job_id):
"""This deletes all request data for the given job ID,
including both input data and output data.
"""
object_prefix = f"{job_id}/"
is_truncated = True
marker = ""
try:
while is_truncated:
out = self._client.list_objects(
self._bucket_name, prefix=object_prefix, marker=marker
)
for obj in out.contents:
self._client.delete_object(self._bucket_name, obj.key)
is_truncated = out.is_truncated
marker = out.next_marker
except Exception as e:
logging.error(f"Deleting job fails with unknown error: {e}")

def get_job_number_requests(self, job_id):
"""
        This reads the number of requests by listing the job's input objects.
"""
object_prefix = f"{job_id}/input/"
num_requests = 0
try:
is_truncated = True
next_continuation_token = ""

while is_truncated:
out = self._client.list_objects_type2(
self._bucket_name,
delimiter="/",
prefix=object_prefix,
continuation_token=next_continuation_token,
)
is_truncated = out.is_truncated
next_continuation_token = out.next_continuation_token
                # Directories in common_prefixes are ignored; only objects are counted.
num_requests += len(out.contents)
except Exception as e:
logging.error(f"Listing number of reqeusts fails with unknown error: {e}")

return num_requests
3 changes: 3 additions & 0 deletions python/aibrix/tests/test_storage.py
@@ -38,6 +38,7 @@ def generate_input_data(num_requests):
def test_submit_job_input():
num_request = 100
generate_input_data(num_request)
_storage.initialize_storage()
job_id = _storage.submit_job_input("./one_job_input.json")
print("succesfully create job: ", job_id)

@@ -53,6 +54,7 @@ def test_submit_job_input():
def test_read_job_input():
num_request = 100
generate_input_data(num_request)
_storage.initialize_storage()
job_id = _storage.submit_job_input("./one_job_input.json")
print("succesfully create job: ", job_id)

@@ -85,6 +87,7 @@ def test_read_job_input():
def test_job_output():
num_request = 100
generate_input_data(num_request)
_storage.initialize_storage()
job_id = _storage.submit_job_input("./one_job_input.json")
print("succesfully create job: ", job_id)

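Since each test now calls _storage.initialize_storage() before submitting input, one possible follow-up (not in this PR) is a pytest fixture that performs the initialization once per module; a minimal sketch, assuming the local-disk default used by these tests:

import pytest

import aibrix.batch.storage as _storage


@pytest.fixture(scope="module", autouse=True)
def initialized_storage():
    # Defaults to StorageType.LocalDiskFile, matching the tests above.
    _storage.initialize_storage()
    yield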