Skip to content

Commit

Permalink
Support different Python executables in Neo dispatcher (#2698)
Browse files Browse the repository at this point in the history
  • Loading branch information
ethnzhng authored Jan 31, 2025
1 parent dc08153 commit a9fafa7
Show file tree
Hide file tree
Showing 2 changed files with 23 additions and 10 deletions.
29 changes: 20 additions & 9 deletions serving/docker/partition/sm_neo_dispatcher.py
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,11 @@

VALID_LOAD_FORMATS = ["sagemaker_fast_model_loader"]

# Paths to each Python executable
LMI_DIST_VENV_EXEC = "/opt/djl/lmi_dist_venv/bin/python"
VLLM_VENV_EXEC = "/opt/djl/vllm_venv/bin/python"
SYSTEM_PY_EXEC = "/usr/bin/python3"


class NeoTask(Enum):
"""
Expand Down Expand Up @@ -75,17 +80,18 @@ def is_valid_sharding_config(self):
return True
return False

def _get_mpirun_command(self, task: NeoTask, num_processes: int):
def _get_mpirun_command(self, task: NeoTask, num_processes: int,
python_exec: str):
return [
"mpirun", "--allow-run-as-root", "--bind-to", "none", "--mca",
"btl_vader_single_copy_mechanism", "none", "--tag-output", "-x",
"FI_PROVIDER=efa", "-x", "RDMAV_FORK_SAFE=1", "-x",
"FI_EFA_USE_DEVICE_RDMA=1", "-x", "LD_LIBRARY_PATH", "-x",
"PYTHONPATH", "-x", "MKL_DYNAMIC=FALSE", "-np",
str(num_processes), "/usr/bin/python3", task.script_path
str(num_processes), python_exec, task.script_path
]

def run_task(self, task: NeoTask):
def run_task(self, task: NeoTask, python_exec: str):
"""
Run the specified task as a subprocess, while forwarding its output to the console.
For sharding jobs, use mpirun to launch multiple processes.
Expand All @@ -97,9 +103,9 @@ def run_task(self, task: NeoTask):
pp_degree = self.properties.get(
"option.pipeline_parallel_degree", 1)
world_size = int(tp_degree) * int(pp_degree)
cmd = self._get_mpirun_command(task, world_size)
cmd = self._get_mpirun_command(task, world_size, python_exec)
else:
cmd = ["/usr/bin/python3", task.script_path]
cmd = [python_exec, task.script_path]

with subprocess.Popen(cmd,
env=os.environ,
Expand All @@ -125,14 +131,19 @@ def dispatch(self):
match self.serving_features:
case "vllm,lmi-dist":
if self.is_valid_sharding_config():
if self.properties.get("option.rolling_batch",
"lmi-dist").lower() == "vllm":
python_exec = VLLM_VENV_EXEC
else:
python_exec = LMI_DIST_VENV_EXEC
print(f"Sharding Model...")
self.run_task(NeoTask.SHARDING)
self.run_task(NeoTask.SHARDING, python_exec)
else:
self.run_task(NeoTask.QUANTIZATION)
self.run_task(NeoTask.QUANTIZATION, LMI_DIST_VENV_EXEC)
case "trtllm":
self.run_task(NeoTask.TENSORRT_LLM)
self.run_task(NeoTask.TENSORRT_LLM, SYSTEM_PY_EXEC)
case "vllm,lmi-dist,tnx":
self.run_task(NeoTask.NEURON)
self.run_task(NeoTask.NEURON, SYSTEM_PY_EXEC)
case _:
raise ValueError(
"Container does not support SageMaker Neo context")
Expand Down
4 changes: 3 additions & 1 deletion serving/docker/partition/sm_neo_shard.py
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,6 @@
from utils import (update_kwargs_with_env_vars, load_properties)

import torch
import sagemaker_fast_model_loader_rust as sm_fml
from mpi4py import MPI

from lmi_dist.init_engine import engine_from_args
Expand All @@ -50,6 +49,7 @@ def __init__(self):

def save_configs(self, pp_degree: int, tp_degree: int, input_dir: str,
output_dir: str, configs: list) -> None:
import sagemaker_fast_model_loader_rust as sm_fml
py_version = "{}.{}.{}".format(*sys.version_info[:3])
conf = sm_fml.ModelConfig(
pipeline_parallel_size=pp_degree,
Expand Down Expand Up @@ -160,6 +160,8 @@ def shard_lmi_dist_model(self, input_dir: str, output_dir: str,
engine_configs = engine_args.create_engine_configs()
engine_worker = load_model_for_sharding(engine_configs)

# Lazy import to avoid MPI not-inited errors
import sagemaker_fast_model_loader_rust as sm_fml
model_dir = os.path.join(output_dir, sm_fml.MODEL_DIR_NAME)
os.makedirs(model_dir, exist_ok=True)

Expand Down

0 comments on commit a9fafa7

Please sign in to comment.