Skip to content

Commit

Permalink
[0.27.0-DLC][cherrypick][tnx] 0.27.0 neo script update (#1854) (#1856)
Browse files Browse the repository at this point in the history
Co-authored-by: Andrew Song <40076917+a-ys@users.noreply.github.com>
  • Loading branch information
tosterberg and a-ys authored May 1, 2024
1 parent 90b92ab commit ad77111
Show file tree
Hide file tree
Showing 3 changed files with 231 additions and 28 deletions.
22 changes: 17 additions & 5 deletions serving/docker/partition/partition.py
Original file line number Diff line number Diff line change
Expand Up @@ -166,15 +166,26 @@ def cleanup(self):
entrypoint_dir = Path(self.properties['entryPoint']).parent
shutil.rmtree(entrypoint_dir)

def run_partition(self):
def run_partition(self) -> str:
"""
:return: the output of the partition command captured from stdout
"""
commands = get_partition_cmd(self.properties_manager.is_mpi_mode,
self.properties)
logging.info(f"cmd: {commands}")
self.set_environmental_vars()
result = subprocess.run(commands)
logging.info(result)
if result.returncode == 0:
logging.info(f"Partitioning done.")
partition_stdout = ""
# Use Popen to capture stdout without delaying terminal output
with subprocess.Popen(commands,
stdout=subprocess.PIPE,
bufsize=1,
universal_newlines=True) as proc:
for line in proc.stdout:
partition_stdout += line
print(line, end='')
logging.info(proc)
if proc.returncode == 0:
logging.info("Partitioning done.")
self.properties_manager.validate_and_correct_checkpoints_json()
self.properties_manager.generate_properties_file()
if not self.properties_manager.skip_copy:
Expand All @@ -183,6 +194,7 @@ def run_partition(self):
self.load_the_generated_checkpoints()
self.upload_checkpoints_to_s3()
self.cleanup()
return partition_stdout
else:
raise Exception("Partitioning was not successful.")

Expand Down
215 changes: 192 additions & 23 deletions serving/docker/partition/sm_neo_neuron_partition.py
Original file line number Diff line number Diff line change
Expand Up @@ -15,17 +15,136 @@
import logging
import os
from types import SimpleNamespace
from typing import Final
from typing import Final, Optional
import argparse
import re
import shutil

from optimum.commands.export.neuronx import parse_args_neuronx

from sm_neo_utils import InputConfiguration, CompilationFatalError, write_error_to_file, get_neo_env_vars, get_neo_compiler_flags
from sm_neo_utils import (InputConfiguration, CompilationFatalError,
write_error_to_file, get_neo_env_vars,
get_neo_compiler_flags, load_jumpstart_metadata)
from utils import extract_python_jar
from properties_manager import PropertiesManager
from partition import PartitionService

PYTHON_CACHE_DIR = '/tmp/djlserving/cache'
# Module-level memo for the NeuronX compiler version; populated lazily on
# the first call to get_neuronxcc_version().
_neuronxcc_version: Optional[str] = None


def get_neuronxcc_version() -> str:
    """
    Gets version of NeuronX Compiler
    Copied from djl-serving/engines/python/setup/djl_python/neuron_utils/utils.py
    :return: NeuronX compiler version
    :raises ModuleNotFoundError: if the neuronxcc package is not installed
    """
    global _neuronxcc_version
    if _neuronxcc_version is None:
        try:
            import neuronxcc
        except ImportError:
            raise ModuleNotFoundError(
                "NeuronX Compiler python package is not installed.")
        _neuronxcc_version = neuronxcc.__version__
    return _neuronxcc_version


class NeoNeuronCacheManager():
    """
    This class manages the creation of alternate Neuron cache formats for Neo.

    Given the captured stdout of a Neuron compilation and the Neuron
    Persistent Cache directory, it locates the compiled NEFF modules and
    copies them into the directory layouts expected downstream.
    """
    # Versioned compiler subdirectory inside the Neuron cache, e.g.
    # "neuronxcc-2.13.68.0+6dfecc895". Evaluated once at class-creation time.
    NEURONXCC_DIR = f"neuronxcc-{get_neuronxcc_version()}"

    def __init__(self, neuron_compilation_log: str, cache_dir: str):
        # Module IDs parsed from the compilation log; identifies which NEFF
        # folders in the cache belong to this model.
        self.module_ids: list[str] = self.get_neuron_cache_module_ids(
            neuron_compilation_log)
        # Root of the Neuron Persistent Cache directory.
        self.cache_dir: str = cache_dir

    @staticmethod
    def get_neuron_cache_module_ids(text: str) -> list[str]:
        """
        Searches the input text for Neuron cache module IDs.
        These module IDs correspond to NEFF files in the Neuron cache needed to compile the model.
        :return: A list of unique module IDs found in the input
        """
        module_ids = list(set(re.findall(r"MODULE\_\w{20}\+\w{8}", text)))
        assert len(module_ids
                   ) > 0, "Unable to find any module IDs in the Neuron compilation logs"
        logging.info(
            f"The Neuron cache model IDs for this model are: {module_ids}")
        return module_ids

    def copy_neuron_cache_modules(self, output_dir: str):
        """
        Copies the Neuron Cache modules for the current model to the specified output directory.
        """
        logging.info(
            f"Saving Neuron Cache NEFFs to {os.path.abspath(output_dir)}")

        source_root = os.path.join(self.cache_dir, self.NEURONXCC_DIR)
        for module_id in self.module_ids:
            shutil.copytree(os.path.join(source_root, module_id),
                            os.path.join(output_dir, module_id),
                            dirs_exist_ok=True)

    def create_jumpstart_neuron_cache_in_output_dir(self, output_dir: str):
        """
        Saves the Neuron cache in JumpStart format in the given directory.
        The format in this case is:
        <output dir>/PRE_COMPILED_NEURON_GRAPH_INFER/<Neuron Persistent Cache format>
        For example:
        <output dir>/PRE_COMPILED_NEURON_GRAPH_INFER/neuronxcc-2.13.68.0+6dfecc895/<Module folders>
        :param output_dir: the path to saved to. Intended to be the partitioning output directory.
        """
        target_dir = os.path.join(output_dir,
                                  "PRE_COMPILED_NEURON_GRAPH_INFER",
                                  self.NEURONXCC_DIR)

        logging.info(
            "Saving Neuron Cache NEFFs for JumpStart in partition output directory"
        )
        self.copy_neuron_cache_modules(target_dir)

    def create_jumpstart_neuron_cache_in_cache_dir(self,
                                                   jumpstart_metadata: dict):
        """
        Saves the Neuron cache using the passed metadata in the Neuron cache directory with the
        following format:
        /<cache dir>/JUMPSTART_COMPILED_GRAPHS/neuronxcc-<compiler version>/<JumpStart model id>/...
        /<JumpStart model scope>/PRE_COMPILED_NEURON_GRAPH_INFER/<Neuron Persistent Cache format>
        For example:
        /<cache dir>/JUMPSTART_COMPILED_GRAPHS/neuronxcc-2.13.68.0+6dfecc895/...
        /<JumpStart model id>/inference/PRE_COMPILED_NEURON_GRAPH_INFER/...
        /neuronxcc-2.13.68.0+6dfecc895/<Module folders>
        :param jumpstart_metadata: dict expected to contain
            ["model_info"]["model_id"] and ["script_info"]["Scope"];
            missing keys are logged and the cache copy is skipped.
        """
        try:
            model_id = jumpstart_metadata["model_info"]["model_id"]
            model_scope = jumpstart_metadata["script_info"]["Scope"]
        except KeyError as key:
            # Best-effort: without the JumpStart identifiers we cannot build
            # the cache path, so warn and bail out instead of failing.
            logging.warning(
                f"Missing field {key} from JumpStart metadata. "
                "JumpStart cache will not be constructed in Neuron Cache directory"
            )
            return

        target_dir = os.path.join(self.cache_dir, "JUMPSTART_COMPILED_GRAPHS",
                                  self.NEURONXCC_DIR, model_id, model_scope,
                                  "PRE_COMPILED_NEURON_GRAPH_INFER",
                                  self.NEURONXCC_DIR)

        logging.info(
            "Saving Neuron Cache NEFFs for JumpStart in Neuron cache directory"
        )
        self.copy_neuron_cache_modules(target_dir)


class NeoNeuronPartitionService():
Expand All @@ -43,8 +162,17 @@ def __init__(self):
self.OUTPUT_MODEL_DIRECTORY: Final[str] = env[2]
self.COMPILATION_ERROR_FILE: Final[str] = env[3]
self.COMPILER_CACHE_LOCATION: Final[str] = env[4]
# Specific to Neuron compilation
self.CACHE_JUMPSTART_FORMAT: Final[str] = os.environ.get(
"SM_CACHE_JUMPSTART_FORMAT", "false")

self.jumpstart_metadata: dict = load_jumpstart_metadata(
self.INPUT_MODEL_DIRECTORY)

def update_neuron_cache_location(self):
logging.info(
f"Updating Neuron Persistent Cache directory to: {self.COMPILER_CACHE_LOCATION}"
)
os.environ['NEURON_COMPILE_CACHE_URL'] = self.COMPILER_CACHE_LOCATION

def initialize_partition_args_namespace(self):
Expand All @@ -57,14 +185,18 @@ def initialize_partition_args_namespace(self):
self.args.save_mp_checkpoint_path = self.OUTPUT_MODEL_DIRECTORY
# If skip_copy is not enabled, outputted configs are overwritten, and deployment fails.
self.args.skip_copy = True
# These attributes reflect the default values of the corresponding attributes in the partition argparser.
# PropertiesManager expects these attributes to be defined.
# These attributes reflect the default values of the corresponding attributes
# in the partition argparser. PropertiesManager expects these attributes to be defined.
self.args.model_id = None
self.args.engine = None
self.args.tensor_parallel_degree = None

def parse_neo_compiler_flags(self):
"""
Parses the compiler_flags field of Neo CompilerOptions.
"""
if self.compiler_flags:
logging.debug(f"Compiler Flags: {self.compiler_flags}")
if not isinstance(self.compiler_flags, dict):
raise InputConfiguration(
"Invalid compiler flags. Ensure that the input is a valid JSON dictionary."
Expand All @@ -73,8 +205,8 @@ def parse_neo_compiler_flags(self):
if "compile_only" in self.compiler_flags and self.compiler_flags[
"compile_only"].lower() == "true":
logging.info(
"Warning: compile_only flag passed. SafeTensors weights or split model weights must be provided to deploy the model."
)
"Warning: compile_only flag passed. SafeTensors weights or split model "
"weights must be provided to deploy the model.")
self.properties["option.partition_schema"] = "compile_only"
del self.compiler_flags["compile_only"]

Expand All @@ -83,16 +215,16 @@ def parse_neo_compiler_flags(self):
"compiler_interface"]
del self.compiler_flags["compiler_interface"]
logging.info(
f"{self.compiler_interface} is set as the compiler interface by Neo CompilerOptions."
)
f"{self.compiler_interface} is set as the compiler interface by "
"Neo CompilerOptions.")

@staticmethod
def convert_tnx_options_to_djl_options(options: dict) -> dict:
"""
Converts Transformers-NeuronX options accepted by Neo to the equivalent option in djl-serving.
Only options that have a different name or set of values are converted; the remaining are kept as-is.
Supports an additional option "continuous_batching" (equivalently "batch_size_for_shared_caches") to allow
users to enable continuous batching.
Converts Transformers-NeuronX options accepted by Neo to the equivalent option
in djl-serving. Only options that have a different name or set of values are converted;
the remaining are kept as-is. Supports an additional option "continuous_batching"
(equivalently "batch_size_for_shared_caches") to allow users to enable continuous batching.
:param options: A dictionary containing Transformers-NeuronX options as key-value pairs.
:return: returns the modified dictionary
Expand All @@ -104,6 +236,7 @@ def convert_tnx_options_to_djl_options(options: dict) -> dict:
's8': 'int8'
}

logging.debug(f"tnx options dict: {options}")
if "amp" in options:
options["option.dtype"] = amp_dtype_map[options["amp"]]
del options["amp"]
Expand All @@ -127,7 +260,8 @@ def convert_tnx_options_to_djl_options(options: dict) -> dict:

def construct_properties_manager_from_tnx_options(self):
"""
Factory method used to construct a PropertiesManager from Transformers-NeuronX Neo CompilerOptions
Factory method used to construct a PropertiesManager from Transformers-NeuronX
Neo CompilerOptions
"""
self.args.engine = "Python"
# Passing a dummy location because it's expected by PropertiesManager
Expand All @@ -137,14 +271,17 @@ def construct_properties_manager_from_tnx_options(self):
self.properties["option.model_loader"] = "tnx"
self.properties |= NeoNeuronPartitionService.convert_tnx_options_to_djl_options(
self.compiler_flags)
logging.debug("Constructing PropertiesManager from TNX "
f"options\nargs:{self.args}\nprops:{self.properties}")
self.properties_manager = PropertiesManager(
self.args, addl_properties=self.properties)

@staticmethod
def convert_optimum_flags_to_djl_options(
flags: argparse.Namespace) -> dict:
"""
This takes a namespace created by parsing Optimum CLI flags and maps the values to djlserving options.
This takes a namespace created by parsing Optimum CLI flags and maps the values to
djl-serving options.
:param flags: a namespace object returned by the Optimum ArgumentParser
:return: dictionary containing the converted options
Expand All @@ -161,6 +298,8 @@ def convert_optimum_flags_to_djl_options(

props = {}

logging.debug(f"optimum flags: {flags}")

# Iterating through the attributes of the namespace
for flag, value in vars(flags).items():
if flag == "task" and value == "auto":
Expand Down Expand Up @@ -202,21 +341,47 @@ def construct_properties_manager_from_optimum_flags(self):
self.properties |= NeoNeuronPartitionService.convert_optimum_flags_to_djl_options(
flags)

logging.debug("Constructing PropertiesManager from optimum "
f"options\nargs:{self.args}\nprops:{self.properties}")
self.properties_manager = PropertiesManager(
self.args, addl_properties=self.properties)

def construct_properties_manager_from_serving_properties(self):
"""
Factory method used to construct a PropertiesManager from serving.properties
"""
self.args.properties_dir = self.INPUT_MODEL_DIRECTORY
logging.debug(
"Constructing PropertiesManager from "
f"serving.properties\nargs:{self.args}\nprops:{self.properties}")
self.properties_manager = PropertiesManager(
self.args, addl_properties=self.properties)

def run_partition(self) -> str:
"""
:return: the output of the partition command captured from stdout
"""
partition_service = PartitionService(self.properties_manager)
extract_python_jar(PYTHON_CACHE_DIR)
try:
return partition_service.run_partition()
except Exception as exc:
raise CompilationFatalError(
f"Encountered an error during Transformers-NeuronX compilation: {exc}"
)

def neo_partition(self):
self.update_neuron_cache_location()
self.initialize_partition_args_namespace()
self.compiler_flags = get_neo_compiler_flags(self.NEO_COMPILER_OPTIONS)
self.parse_neo_compiler_flags()
if self.compiler_interface == "transformers-neuronx":
logging.info(
"Reading Neo CompilerOptions in transformers-neuronx format. "
"serving.properties will be ignored")
self.construct_properties_manager_from_tnx_options()
logging.info("Reading Neo CompilerOptions in optimum format. "
"serving.properties will be ignored")
elif self.compiler_interface == "optimum":
self.construct_properties_manager_from_optimum_flags()
elif self.compiler_interface:
Expand All @@ -228,17 +393,21 @@ def neo_partition(self):
"Reading compiler options from provided serving.properties file"
)
self.construct_properties_manager_from_serving_properties()

logging.info(f"Model options: {self.properties_manager.properties}")
partition_output = self.run_partition()

partition_service = PartitionService(self.properties_manager)
extract_python_jar(PYTHON_CACHE_DIR)
try:
partition_service.run_partition()
except Exception as exc:
raise CompilationFatalError(
f"Encountered an error during Transformers-NeuronX compilation: {exc}"
)
if self.CACHE_JUMPSTART_FORMAT.lower() == 'true':
logging.info("JumpStart cache environment variable is set")
if self.jumpstart_metadata:
logging.info(
"JumpStart metadata found. Outputting NEFFs in JumpStart format"
)
cache_manager = NeoNeuronCacheManager(
partition_output, self.COMPILER_CACHE_LOCATION)
cache_manager.create_jumpstart_neuron_cache_in_output_dir(
self.OUTPUT_MODEL_DIRECTORY)
cache_manager.create_jumpstart_neuron_cache_in_cache_dir(
self.jumpstart_metadata)


def main():
Expand Down
Loading

0 comments on commit ad77111

Please sign in to comment.