Skip to content

Commit

Permalink
[Neo] JumpStart integration to Neo Neuron flow
Browse files Browse the repository at this point in the history
  • Loading branch information
a-ys committed Apr 30, 2024
1 parent 31a3a74 commit 1b2f31c
Show file tree
Hide file tree
Showing 3 changed files with 216 additions and 41 deletions.
22 changes: 17 additions & 5 deletions serving/docker/partition/partition.py
Original file line number Diff line number Diff line change
Expand Up @@ -166,15 +166,26 @@ def cleanup(self):
entrypoint_dir = Path(self.properties['entryPoint']).parent
shutil.rmtree(entrypoint_dir)

def run_partition(self):
def run_partition(self) -> str:
"""
:return: the output of the partition command captured from stdout
"""
commands = get_partition_cmd(self.properties_manager.is_mpi_mode,
self.properties)
logging.info(f"cmd: {commands}")
self.set_environmental_vars()
result = subprocess.run(commands)
logging.info(result)
if result.returncode == 0:
logging.info(f"Partitioning done.")
partition_stdout = ""
# Use Popen to capture stdout without delaying terminal output
with subprocess.Popen(commands,
stdout=subprocess.PIPE,
bufsize=1,
universal_newlines=True) as proc:
for line in proc.stdout:
partition_stdout += line
print(line, end='')
logging.info(proc)
if proc.returncode == 0:
logging.info("Partitioning done.")
self.properties_manager.validate_and_correct_checkpoints_json()
self.properties_manager.generate_properties_file()
if not self.properties_manager.skip_copy:
Expand All @@ -183,6 +194,7 @@ def run_partition(self):
self.load_the_generated_checkpoints()
self.upload_checkpoints_to_s3()
self.cleanup()
return partition_stdout
else:
raise Exception("Partitioning was not successful.")

Expand Down
213 changes: 177 additions & 36 deletions serving/docker/partition/sm_neo_neuron_partition.py
Original file line number Diff line number Diff line change
Expand Up @@ -15,17 +15,136 @@
import logging
import os
from types import SimpleNamespace
from typing import Final
from typing import Final, Optional
import argparse
import re
import shutil

from optimum.commands.export.neuronx import parse_args_neuronx

from sm_neo_utils import InputConfiguration, CompilationFatalError, write_error_to_file, get_neo_env_vars, get_neo_compiler_flags
from sm_neo_utils import (InputConfiguration, CompilationFatalError,
write_error_to_file, get_neo_env_vars,
get_neo_compiler_flags, load_jumpstart_metadata)
from utils import extract_python_jar
from properties_manager import PropertiesManager
from partition import PartitionService

PYTHON_CACHE_DIR = '/tmp/djlserving/cache'
# Module-level cache for the NeuronX compiler version; filled on the first
# successful call to get_neuronxcc_version().
_neuronxcc_version: Optional[str] = None


def get_neuronxcc_version() -> str:
    """
    Gets version of NeuronX Compiler
    Copied from djl-serving/engines/python/setup/djl_python/neuron_utils/utils.py

    The version is read from ``neuronxcc.__version__`` once and then served from
    the module-level cache on later calls.

    :return: NeuronX compiler version
    :raises ModuleNotFoundError: if the ``neuronxcc`` package cannot be imported
    """
    global _neuronxcc_version
    if _neuronxcc_version is not None:
        return _neuronxcc_version
    try:
        import neuronxcc
    except ImportError as exc:
        # Chain explicitly so the underlying import failure is reported as the
        # direct cause instead of an unrelated "during handling" context.
        raise ModuleNotFoundError(
            "NeuronX Compiler python package is not installed.") from exc
    _neuronxcc_version = neuronxcc.__version__
    return _neuronxcc_version


class NeoNeuronCacheManager():
    """
    This class manages the creation of alternate Neuron cache formats for Neo.

    It extracts Neuron cache module IDs from a compilation log and copies the
    matching module folders out of the Neuron cache directory into other layouts.
    """
    # Name of the compiler-version folder inside the Neuron cache, for example
    # "neuronxcc-2.13.68.0+6dfecc895". Evaluated once, when the class is defined.
    NEURONXCC_DIR = f"neuronxcc-{get_neuronxcc_version()}"

    def __init__(self, neuron_compilation_log: str, cache_dir: str):
        """
        :param neuron_compilation_log: text that is searched for Neuron cache module IDs
        :param cache_dir: Neuron cache directory the module folders are copied from
        :raises AssertionError: if no module ID is found in the log text
        """
        self.module_ids: list[
            str] = NeoNeuronCacheManager.get_neuron_cache_module_ids(
                neuron_compilation_log)
        self.cache_dir: str = cache_dir

    @staticmethod
    def get_neuron_cache_module_ids(text: str) -> list[str]:
        """
        Searches the input text for Neuron cache module IDs.
        These module IDs correspond to NEFF files in the Neuron cache needed to compile the model.

        :param text: the text to search, e.g. the captured compilation output
        :return: A sorted list of unique module IDs found in the input
        :raises AssertionError: if no module ID is found
        """
        # Sorting makes the result (and the copy order) deterministic; a bare
        # set has arbitrary iteration order.
        uniq_matches = sorted(set(re.findall(r"MODULE\_\w{20}\+\w{8}", text)))
        # Raised explicitly instead of using an assert statement, so the check
        # is still enforced when Python runs with -O. The exception type is unchanged.
        if not uniq_matches:
            raise AssertionError(
                "Unable to find any module IDs in the Neuron compilation logs")
        logging.info(
            f"The Neuron cache model IDs for this model are: {uniq_matches}")
        return uniq_matches

    def copy_neuron_cache_modules(self, output_dir: str):
        """
        Copies the Neuron Cache modules for the current model to the specified output directory.

        Each module folder is read from <cache dir>/<NEURONXCC_DIR>/<module id> and
        written to <output_dir>/<module id>. Existing destination folders are reused.

        :param output_dir: directory that receives one sub-folder per module ID
        """
        logging.info(
            f"Saving Neuron Cache NEFFs to {os.path.abspath(output_dir)}")

        for module_id in self.module_ids:
            src_path = os.path.join(self.cache_dir, self.NEURONXCC_DIR,
                                    module_id)
            dst_path = os.path.join(output_dir, module_id)
            shutil.copytree(src_path, dst_path, dirs_exist_ok=True)

    def create_jumpstart_neuron_cache_in_output_dir(self, output_dir: str):
        """
        Saves the Neuron cache in JumpStart format in the given directory.
        The format in this case is:
        <output dir>/PRE_COMPILED_NEURON_GRAPH_INFER/<Neuron Persistent Cache format>
        For example:
        <output dir>/PRE_COMPILED_NEURON_GRAPH_INFER/neuronxcc-2.13.68.0+6dfecc895/<Module folders>

        :param output_dir: the path to save to. Intended to be the partitioning output directory.
        """
        output_dir = os.path.join(output_dir,
                                  "PRE_COMPILED_NEURON_GRAPH_INFER",
                                  self.NEURONXCC_DIR)

        logging.info(
            "Saving Neuron Cache NEFFs for JumpStart in partition output directory"
        )
        self.copy_neuron_cache_modules(output_dir)

    def create_jumpstart_neuron_cache_in_cache_dir(self,
                                                   jumpstart_metadata: dict):
        """
        Saves the Neuron cache using the passed metadata in the Neuron cache directory with the
        following format:
        /<cache dir>/JUMPSTART_COMPILED_GRAPHS/neuronxcc-<compiler version>/<JumpStart model id>/...
            /<JumpStart model scope>/PRE_COMPILED_NEURON_GRAPH_INFER/<Neuron Persistent Cache format>
        For example:
        /<cache dir>/JUMPSTART_COMPILED_GRAPHS/neuronxcc-2.13.68.0+6dfecc895/...
            /<JumpStart model id>/inference/PRE_COMPILED_NEURON_GRAPH_INFER/...
            /neuronxcc-2.13.68.0+6dfecc895/<Module folders>

        If the metadata lacks ["model_info"]["model_id"] or ["script_info"]["Scope"],
        a warning is logged and nothing is copied.

        :param jumpstart_metadata: dictionary holding the "model_info" and "script_info" entries
        """
        try:
            jumpstart_model_id = jumpstart_metadata["model_info"]["model_id"]
            jumpstart_model_scope = jumpstart_metadata["script_info"]["Scope"]
        except KeyError as key:
            # Best effort: a missing field skips this cache layout rather than failing the job.
            logging.warning(
                f"Missing field {key} from JumpStart metadata. "
                "JumpStart cache will not be constructed in Neuron Cache directory"
            )
            return

        output_dir = os.path.join(self.cache_dir, "JUMPSTART_COMPILED_GRAPHS",
                                  self.NEURONXCC_DIR, jumpstart_model_id,
                                  jumpstart_model_scope,
                                  "PRE_COMPILED_NEURON_GRAPH_INFER",
                                  self.NEURONXCC_DIR)

        logging.info(
            "Saving Neuron Cache NEFFs for JumpStart in Neuron cache directory"
        )
        self.copy_neuron_cache_modules(output_dir)


class NeoNeuronPartitionService():
Expand All @@ -43,6 +162,12 @@ def __init__(self):
self.OUTPUT_MODEL_DIRECTORY: Final[str] = env[2]
self.COMPILATION_ERROR_FILE: Final[str] = env[3]
self.COMPILER_CACHE_LOCATION: Final[str] = env[4]
# Specific to Neuron compilation
self.CACHE_JUMPSTART_FORMAT: Final[str] = os.environ.get(
"SM_CACHE_JUMPSTART_FORMAT", "false")

self.jumpstart_metadata: dict = load_jumpstart_metadata(
self.INPUT_MODEL_DIRECTORY)

def update_neuron_cache_location(self):
logging.info(
Expand All @@ -60,8 +185,8 @@ def initialize_partition_args_namespace(self):
self.args.save_mp_checkpoint_path = self.OUTPUT_MODEL_DIRECTORY
# If skip_copy is not enabled, outputted configs are overwritten, and deployment fails.
self.args.skip_copy = True
# These attributes reflect the default values of the corresponding attributes in the partition argparser.
# PropertiesManager expects these attributes to be defined.
# These attributes reflect the default values of the corresponding attributes
# in the partition argparser. PropertiesManager expects these attributes to be defined.
self.args.model_id = None
self.args.engine = None
self.args.tensor_parallel_degree = None
Expand All @@ -80,8 +205,8 @@ def parse_neo_compiler_flags(self):
if "compile_only" in self.compiler_flags and self.compiler_flags[
"compile_only"].lower() == "true":
logging.info(
"Warning: compile_only flag passed. SafeTensors weights or split model weights must be provided to deploy the model."
)
"Warning: compile_only flag passed. SafeTensors weights or split model "
"weights must be provided to deploy the model.")
self.properties["option.partition_schema"] = "compile_only"
del self.compiler_flags["compile_only"]

Expand All @@ -90,16 +215,16 @@ def parse_neo_compiler_flags(self):
"compiler_interface"]
del self.compiler_flags["compiler_interface"]
logging.info(
f"{self.compiler_interface} is set as the compiler interface by Neo CompilerOptions."
)
f"{self.compiler_interface} is set as the compiler interface by "
"Neo CompilerOptions.")

@staticmethod
def convert_tnx_options_to_djl_options(options: dict) -> dict:
"""
Converts Transformers-NeuronX options accepted by Neo to the equivalent option in djl-serving.
Only options that have a different name or set of values are converted; the remaining are kept as-is.
Supports an additional option "continuous_batching" (equivalently "batch_size_for_shared_caches") to allow
users to enable continuous batching.
Converts Transformers-NeuronX options accepted by Neo to the equivalent option
in djl-serving. Only options that have a different name or set of values are converted;
the remaining are kept as-is. Supports an additional option "continuous_batching"
(equivalently "batch_size_for_shared_caches") to allow users to enable continuous batching.
:param options: A dictionary containing Transformers-NeuronX options as key-value pairs.
:return: returns the modified dictionary
Expand Down Expand Up @@ -135,7 +260,8 @@ def convert_tnx_options_to_djl_options(options: dict) -> dict:

def construct_properties_manager_from_tnx_options(self):
"""
Factory method used to construct a PropertiesManager from Transformers-NeuronX Neo CompilerOptions
Factory method used to construct a PropertiesManager from Transformers-NeuronX
Neo CompilerOptions
"""
self.args.engine = "Python"
# Passing a dummy location because it's expected by PropertiesManager
Expand All @@ -145,17 +271,17 @@ def construct_properties_manager_from_tnx_options(self):
self.properties["option.model_loader"] = "tnx"
self.properties |= NeoNeuronPartitionService.convert_tnx_options_to_djl_options(
self.compiler_flags)
logging.debug(
f"Constructing PropertiesManager from TNX options\nargs:{self.args}\nprops:{self.properties}"
)
logging.debug("Constructing PropertiesManager from TNX "
f"options\nargs:{self.args}\nprops:{self.properties}")
self.properties_manager = PropertiesManager(
self.args, addl_properties=self.properties)

@staticmethod
def convert_optimum_flags_to_djl_options(
flags: argparse.Namespace) -> dict:
"""
This takes a namespace created by parsing Optimum CLI flags and maps the values to djlserving options.
This takes a namespace created by parsing Optimum CLI flags and maps the values to
djl-serving options.
:param flags: a namespace object returned by the Optimum ArgumentParser
:return: dictionary containing the converted options
Expand Down Expand Up @@ -215,9 +341,8 @@ def construct_properties_manager_from_optimum_flags(self):
self.properties |= NeoNeuronPartitionService.convert_optimum_flags_to_djl_options(
flags)

logging.debug(
f"Constructing PropertiesManager from optimum options\nargs:{self.args}\nprops:{self.properties}"
)
logging.debug("Constructing PropertiesManager from optimum "
f"options\nargs:{self.args}\nprops:{self.properties}")
self.properties_manager = PropertiesManager(
self.args, addl_properties=self.properties)

Expand All @@ -227,24 +352,36 @@ def construct_properties_manager_from_serving_properties(self):
"""
self.args.properties_dir = self.INPUT_MODEL_DIRECTORY
logging.debug(
f"Constructing PropertiesManager from serving.properties\nargs:{self.args}\nprops:{self.properties}"
)
"Constructing PropertiesManager from "
f"serving.properties\nargs:{self.args}\nprops:{self.properties}")
self.properties_manager = PropertiesManager(
self.args, addl_properties=self.properties)

def run_partition(self) -> str:
    """
    Runs the partition command for the previously constructed PropertiesManager.

    :return: the output of the partition command captured from stdout
    :raises CompilationFatalError: if the partition service raises any exception
    """
    partition_service = PartitionService(self.properties_manager)
    extract_python_jar(PYTHON_CACHE_DIR)
    try:
        return partition_service.run_partition()
    except Exception as exc:
        # Chain the original exception so its traceback is kept as the direct
        # cause of the CompilationFatalError.
        raise CompilationFatalError(
            f"Encountered an error during Transformers-NeuronX compilation: {exc}"
        ) from exc

def neo_partition(self):
self.update_neuron_cache_location()
self.initialize_partition_args_namespace()
self.compiler_flags = get_neo_compiler_flags(self.NEO_COMPILER_OPTIONS)
self.parse_neo_compiler_flags()
if self.compiler_interface == "transformers-neuronx":
logging.info(
"Reading Neo CompilerOptions in transformers-neuronx format. serving.properties will be ignored"
)
"Reading Neo CompilerOptions in transformers-neuronx format. "
"serving.properties will be ignored")
self.construct_properties_manager_from_tnx_options()
logging.info(
"Reading Neo CompilerOptions in optimum format. serving.properties will be ignored"
)
logging.info("Reading Neo CompilerOptions in optimum format. "
"serving.properties will be ignored")
elif self.compiler_interface == "optimum":
self.construct_properties_manager_from_optimum_flags()
elif self.compiler_interface:
Expand All @@ -256,17 +393,21 @@ def neo_partition(self):
"Reading compiler options from provided serving.properties file"
)
self.construct_properties_manager_from_serving_properties()

logging.info(f"Model options: {self.properties_manager.properties}")
partition_output = self.run_partition()

partition_service = PartitionService(self.properties_manager)
extract_python_jar(PYTHON_CACHE_DIR)
try:
partition_service.run_partition()
except Exception as exc:
raise CompilationFatalError(
f"Encountered an error during Transformers-NeuronX compilation: {exc}"
)
if self.CACHE_JUMPSTART_FORMAT.lower() == 'true':
logging.info("JumpStart cache environment variable is set")
if self.jumpstart_metadata:
logging.info(
"JumpStart metadata found. Outputting NEFFs in JumpStart format"
)
cache_manager = NeoNeuronCacheManager(
partition_output, self.COMPILER_CACHE_LOCATION)
cache_manager.create_jumpstart_neuron_cache_in_output_dir(
self.OUTPUT_MODEL_DIRECTORY)
cache_manager.create_jumpstart_neuron_cache_in_cache_dir(
self.jumpstart_metadata)


def main():
Expand Down
22 changes: 22 additions & 0 deletions serving/docker/partition/sm_neo_utils.py
Original file line number Diff line number Diff line change
Expand Up @@ -62,3 +62,25 @@ def get_neo_compiler_flags(compiler_options):
except Exception as exc:
raise InputConfiguration(
f"Failed to parse SageMaker Neo CompilerOptions: {exc}")


def load_jumpstart_metadata(path: str) -> dict:
    """
    Loads the JumpStart metadata files __model_info__.json and __script_info__.json
    to a dictionary if they exist.

    :param path: directory that is searched for the two metadata files
    :return: dictionary with the parsed content under "model_info" and/or
        "script_info"; a key is absent when its file does not exist, so the
        result is empty when neither file is present
    """
    # (result key, file name) pairs, processed in a fixed order.
    metadata_files = (
        ("model_info", "__model_info__.json"),
        ("script_info", "__script_info__.json"),
    )

    js_metadata = {}
    for key, filename in metadata_files:
        file_path = os.path.join(path, filename)
        if not os.path.exists(file_path):
            continue
        logging.info(f"JumpStart {filename} found")
        # Read as UTF-8 explicitly so parsing does not depend on the locale's
        # default encoding.
        with open(file_path, encoding="utf-8") as file:
            js_metadata[key] = json.load(file)

    return js_metadata

0 comments on commit 1b2f31c

Please sign in to comment.