From a9fafa783f257aac7842df1f2d4d34514ec9f813 Mon Sep 17 00:00:00 2001
From: ethnzhng <26497102+ethnzhng@users.noreply.github.com>
Date: Fri, 31 Jan 2025 15:12:21 -0800
Subject: [PATCH] Support different Python executables in Neo dispatcher
 (#2698)

---
 serving/docker/partition/sm_neo_dispatcher.py | 29 +++++++++++++------
 serving/docker/partition/sm_neo_shard.py      |  4 ++-
 2 files changed, 23 insertions(+), 10 deletions(-)

diff --git a/serving/docker/partition/sm_neo_dispatcher.py b/serving/docker/partition/sm_neo_dispatcher.py
index 6216a01ef..db765f9cf 100644
--- a/serving/docker/partition/sm_neo_dispatcher.py
+++ b/serving/docker/partition/sm_neo_dispatcher.py
@@ -20,6 +20,11 @@
 
 VALID_LOAD_FORMATS = ["sagemaker_fast_model_loader"]
 
+# Paths to each Python executable
+LMI_DIST_VENV_EXEC = "/opt/djl/lmi_dist_venv/bin/python"
+VLLM_VENV_EXEC = "/opt/djl/vllm_venv/bin/python"
+SYSTEM_PY_EXEC = "/usr/bin/python3"
+
 
 class NeoTask(Enum):
     """
@@ -75,17 +80,18 @@ def is_valid_sharding_config(self):
                 return True
         return False
 
-    def _get_mpirun_command(self, task: NeoTask, num_processes: int):
+    def _get_mpirun_command(self, task: NeoTask, num_processes: int,
+                            python_exec: str):
         return [
             "mpirun", "--allow-run-as-root", "--bind-to", "none", "--mca",
             "btl_vader_single_copy_mechanism", "none", "--tag-output", "-x",
             "FI_PROVIDER=efa", "-x", "RDMAV_FORK_SAFE=1", "-x",
             "FI_EFA_USE_DEVICE_RDMA=1", "-x", "LD_LIBRARY_PATH", "-x",
             "PYTHONPATH", "-x", "MKL_DYNAMIC=FALSE", "-np",
-            str(num_processes), "/usr/bin/python3", task.script_path
+            str(num_processes), python_exec, task.script_path
         ]
 
-    def run_task(self, task: NeoTask):
+    def run_task(self, task: NeoTask, python_exec: str):
         """
         Run the specified task as a subprocess, while forwarding its output
         to the console. For sharding jobs, use mpirun to launch multiple processes.
@@ -97,9 +103,9 @@ def run_task(self, task: NeoTask):
             pp_degree = self.properties.get(
                 "option.pipeline_parallel_degree", 1)
             world_size = int(tp_degree) * int(pp_degree)
-            cmd = self._get_mpirun_command(task, world_size)
+            cmd = self._get_mpirun_command(task, world_size, python_exec)
         else:
-            cmd = ["/usr/bin/python3", task.script_path]
+            cmd = [python_exec, task.script_path]
 
         with subprocess.Popen(cmd,
                               env=os.environ,
@@ -125,14 +131,19 @@ def dispatch(self):
         match self.serving_features:
             case "vllm,lmi-dist":
                 if self.is_valid_sharding_config():
+                    if self.properties.get("option.rolling_batch",
+                                           "lmi-dist").lower() == "vllm":
+                        python_exec = VLLM_VENV_EXEC
+                    else:
+                        python_exec = LMI_DIST_VENV_EXEC
                     print(f"Sharding Model...")
-                    self.run_task(NeoTask.SHARDING)
+                    self.run_task(NeoTask.SHARDING, python_exec)
                 else:
-                    self.run_task(NeoTask.QUANTIZATION)
+                    self.run_task(NeoTask.QUANTIZATION, LMI_DIST_VENV_EXEC)
             case "trtllm":
-                self.run_task(NeoTask.TENSORRT_LLM)
+                self.run_task(NeoTask.TENSORRT_LLM, SYSTEM_PY_EXEC)
             case "vllm,lmi-dist,tnx":
-                self.run_task(NeoTask.NEURON)
+                self.run_task(NeoTask.NEURON, SYSTEM_PY_EXEC)
             case _:
                 raise ValueError(
                     "Container does not support SageMaker Neo context")
diff --git a/serving/docker/partition/sm_neo_shard.py b/serving/docker/partition/sm_neo_shard.py
index fceee853f..b48c9d203 100644
--- a/serving/docker/partition/sm_neo_shard.py
+++ b/serving/docker/partition/sm_neo_shard.py
@@ -23,7 +23,6 @@
 from utils import (update_kwargs_with_env_vars, load_properties)
 
 import torch
-import sagemaker_fast_model_loader_rust as sm_fml
 from mpi4py import MPI
 
 from lmi_dist.init_engine import engine_from_args
@@ -50,6 +49,7 @@ def __init__(self):
 
     def save_configs(self, pp_degree: int, tp_degree: int, input_dir: str,
                      output_dir: str, configs: list) -> None:
+        import sagemaker_fast_model_loader_rust as sm_fml
        py_version = "{}.{}.{}".format(*sys.version_info[:3])
         conf = sm_fml.ModelConfig(
             pipeline_parallel_size=pp_degree,
@@ -160,6 +160,8 @@ def shard_lmi_dist_model(self, input_dir: str, output_dir: str,
         engine_configs = engine_args.create_engine_configs()
         engine_worker = load_model_for_sharding(engine_configs)
 
+        # Lazy import to avoid MPI not-inited errors
+        import sagemaker_fast_model_loader_rust as sm_fml
         model_dir = os.path.join(output_dir, sm_fml.MODEL_DIR_NAME)
         os.makedirs(model_dir, exist_ok=True)
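
Illustration (not part of the patch): the dispatcher change reduces to picking an interpreter per serving stack and passing it through to run_task. Below is a minimal standalone sketch of that selection, assuming a hypothetical select_python_exec helper and a plain dict standing in for the dispatcher's self.properties; per the patch, quantization always uses the lmi-dist venv, while the TensorRT-LLM and Neuron paths stay on the system interpreter.

#!/usr/bin/env python3
# Sketch of the interpreter-selection logic added in sm_neo_dispatcher.py.
# select_python_exec is a hypothetical helper, not part of the patch.

LMI_DIST_VENV_EXEC = "/opt/djl/lmi_dist_venv/bin/python"
VLLM_VENV_EXEC = "/opt/djl/vllm_venv/bin/python"
SYSTEM_PY_EXEC = "/usr/bin/python3"


def select_python_exec(serving_features: str, properties: dict,
                       sharding: bool) -> str:
    """Choose the Python executable used to launch a Neo task."""
    if serving_features == "vllm,lmi-dist":
        if sharding and properties.get("option.rolling_batch",
                                       "lmi-dist").lower() == "vllm":
            # Sharding with the vLLM rolling-batch backend runs in the vLLM venv.
            return VLLM_VENV_EXEC
        # Sharding with lmi-dist, and all quantization jobs, use the lmi-dist venv.
        return LMI_DIST_VENV_EXEC
    # TensorRT-LLM ("trtllm") and Neuron ("vllm,lmi-dist,tnx") containers
    # keep using the system interpreter.
    return SYSTEM_PY_EXEC


if __name__ == "__main__":
    # e.g. a sharding job configured for the vLLM rolling-batch backend
    props = {"option.rolling_batch": "vllm"}
    print(select_python_exec("vllm,lmi-dist", props, sharding=True))
    # -> /opt/djl/vllm_venv/bin/python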