[Neo] Add JumpStart Integration to SM Neo Neuron AOT compilation flow #1854

Merged
merged 1 commit on May 1, 2024
22 changes: 17 additions & 5 deletions serving/docker/partition/partition.py
@@ -166,15 +166,26 @@ def cleanup(self):
entrypoint_dir = Path(self.properties['entryPoint']).parent
shutil.rmtree(entrypoint_dir)

def run_partition(self):
def run_partition(self) -> str:
"""
:return: the output of the partition command captured from stdout
"""
commands = get_partition_cmd(self.properties_manager.is_mpi_mode,
self.properties)
logging.info(f"cmd: {commands}")
self.set_environmental_vars()
result = subprocess.run(commands)
logging.info(result)
if result.returncode == 0:
logging.info(f"Partitioning done.")
partition_stdout = ""
# Use Popen to capture stdout without delaying terminal output
with subprocess.Popen(commands,
stdout=subprocess.PIPE,
bufsize=1,
universal_newlines=True) as proc:
for line in proc.stdout:
partition_stdout += line
print(line, end='')
logging.info(proc)
if proc.returncode == 0:
logging.info("Partitioning done.")
self.properties_manager.validate_and_correct_checkpoints_json()
self.properties_manager.generate_properties_file()
if not self.properties_manager.skip_copy:
@@ -183,6 +194,7 @@ def run_partition(self):
self.load_the_generated_checkpoints()
self.upload_checkpoints_to_s3()
self.cleanup()
return partition_stdout
else:
raise Exception("Partitioning was not successful.")

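For reference, a minimal standalone sketch of the streaming-capture pattern adopted above in run_partition: the child process's stdout is echoed line by line as it arrives while also being accumulated for the caller. The helper name and example command are illustrative only.

import subprocess

def run_and_capture(commands) -> str:
    """Stream a child process's stdout to the terminal and return it as a string."""
    captured = ""
    with subprocess.Popen(commands,
                          stdout=subprocess.PIPE,
                          bufsize=1,                 # line-buffered
                          universal_newlines=True) as proc:  # text mode
        for line in proc.stdout:
            captured += line
            print(line, end='')    # keep real-time terminal output
    # Popen.__exit__ waits for the process, so returncode is set here
    if proc.returncode != 0:
        raise RuntimeError(f"Command failed with return code {proc.returncode}")
    return captured

# e.g. run_and_capture(["echo", "partitioning done"]) prints and returns "partitioning done\n"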
213 changes: 177 additions & 36 deletions serving/docker/partition/sm_neo_neuron_partition.py
@@ -15,17 +15,136 @@
import logging
import os
from types import SimpleNamespace
from typing import Final
from typing import Final, Optional
import argparse
import re
import shutil

from optimum.commands.export.neuronx import parse_args_neuronx

from sm_neo_utils import InputConfiguration, CompilationFatalError, write_error_to_file, get_neo_env_vars, get_neo_compiler_flags
from sm_neo_utils import (InputConfiguration, CompilationFatalError,
write_error_to_file, get_neo_env_vars,
get_neo_compiler_flags, load_jumpstart_metadata)
from utils import extract_python_jar
from properties_manager import PropertiesManager
from partition import PartitionService

PYTHON_CACHE_DIR = '/tmp/djlserving/cache'
_neuronxcc_version: Optional[str] = None


def get_neuronxcc_version() -> str:
"""
Gets version of NeuronX Compiler
Copied from djl-serving/engines/python/setup/djl_python/neuron_utils/utils.py

:return: NeuronX compiler version
"""
global _neuronxcc_version
if _neuronxcc_version is not None:
return _neuronxcc_version
try:
import neuronxcc
except ImportError:
raise ModuleNotFoundError(
"NeuronX Compiler python package is not installed.")
_neuronxcc_version = neuronxcc.__version__
return _neuronxcc_version


class NeoNeuronCacheManager():
"""
This class manages the creation of alternate Neuron cache formats for Neo.
"""
NEURONXCC_DIR = f"neuronxcc-{get_neuronxcc_version()}"

def __init__(self, neuron_compilation_log: str, cache_dir: str):
self.module_ids: list[
str] = NeoNeuronCacheManager.get_neuron_cache_module_ids(
neuron_compilation_log)
self.cache_dir: str = cache_dir

@staticmethod
def get_neuron_cache_module_ids(text: str) -> list[str]:
"""
Searches the input text for Neuron cache module IDs.
These module IDs correspond to NEFF files in the Neuron cache needed to compile the model.

:return: A list of unique module IDs found in the input
"""
uniq_matches = list(set(re.findall(r"MODULE\_\w{20}\+\w{8}", text)))
assert len(
uniq_matches
) > 0, "Unable to find any module IDs in the Neuron compilation logs"
logging.info(
f"The Neuron cache model IDs for this model are: {uniq_matches}")
return uniq_matches

def copy_neuron_cache_modules(self, output_dir: str):
"""
Copies the Neuron Cache modules for the current model to the specified output directory.
"""
logging.info(
f"Saving Neuron Cache NEFFs to {os.path.abspath(output_dir)}")

for module_id in self.module_ids:
src_path = os.path.join(self.cache_dir, self.NEURONXCC_DIR,
module_id)
dst_path = os.path.join(output_dir, module_id)
shutil.copytree(src_path, dst_path, dirs_exist_ok=True)

def create_jumpstart_neuron_cache_in_output_dir(self, output_dir: str):
"""
Saves the Neuron cache in JumpStart format in the given directory.
The format in this case is:
<output dir>/PRE_COMPILED_NEURON_GRAPH_INFER/<Neuron Persistent Cache format>
For example:
<output dir>/PRE_COMPILED_NEURON_GRAPH_INFER/neuronxcc-2.13.68.0+6dfecc895/<Module folders>

:param output_dir: the path to save to. Intended to be the partitioning output directory.
"""
output_dir = os.path.join(output_dir,
"PRE_COMPILED_NEURON_GRAPH_INFER",
self.NEURONXCC_DIR)

logging.info(
"Saving Neuron Cache NEFFs for JumpStart in partition output directory"
)
self.copy_neuron_cache_modules(output_dir)

def create_jumpstart_neuron_cache_in_cache_dir(self,
jumpstart_metadata: dict):
"""
Saves the Neuron cache using the passed metadata in the Neuron cache directory with the
following format:
/<cache dir>/JUMPSTART_COMPILED_GRAPHS/neuronxcc-<compiler version>/<JumpStart model id>/...
/<JumpStart model scope>/PRE_COMPILED_NEURON_GRAPH_INFER/<Neuron Persistent Cache format>

For example:
/<cache dir>/JUMPSTART_COMPILED_GRAPHS/neuronxcc-2.13.68.0+6dfecc895/...
/<JumpStart model id>/inference/PRE_COMPILED_NEURON_GRAPH_INFER/...
/neuronxcc-2.13.68.0+6dfecc895/<Module folders>
"""
try:
jumpstart_model_id = jumpstart_metadata["model_info"]["model_id"]
jumpstart_model_scope = jumpstart_metadata["script_info"]["Scope"]
except KeyError as key:
logging.warning(
f"Missing field {key} from JumpStart metadata. "
"JumpStart cache will not be constructed in Neuron Cache directory"
)
return

output_dir = os.path.join(self.cache_dir, "JUMPSTART_COMPILED_GRAPHS",
self.NEURONXCC_DIR, jumpstart_model_id,
jumpstart_model_scope,
"PRE_COMPILED_NEURON_GRAPH_INFER",
self.NEURONXCC_DIR)

logging.info(
"Saving Neuron Cache NEFFs for JumpStart in Neuron cache directory"
)
self.copy_neuron_cache_modules(output_dir)
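As a hedged illustration of the class above, the same regular expression applied to a made-up fragment of a Neuron compilation log (the module ID, cache directory, and output directory below are hypothetical):

import re

sample_log = (
    "2024-04-30 12:00:00 INFO Compilation started\n"
    "Fetching cache entry MODULE_abcdefghij0123456789+0a1b2c3d from the Neuron cache\n"
)
module_ids = list(set(re.findall(r"MODULE\_\w{20}\+\w{8}", sample_log)))
print(module_ids)  # ['MODULE_abcdefghij0123456789+0a1b2c3d']

# Driving the manager with a real compilation log would then look like:
# manager = NeoNeuronCacheManager(compilation_log, "/tmp/neuron-cache")
# manager.create_jumpstart_neuron_cache_in_output_dir("/opt/ml/model")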


class NeoNeuronPartitionService():
@@ -43,6 +162,12 @@ def __init__(self):
self.OUTPUT_MODEL_DIRECTORY: Final[str] = env[2]
self.COMPILATION_ERROR_FILE: Final[str] = env[3]
self.COMPILER_CACHE_LOCATION: Final[str] = env[4]
# Specific to Neuron compilation
self.CACHE_JUMPSTART_FORMAT: Final[str] = os.environ.get(
"SM_CACHE_JUMPSTART_FORMAT", "false")

self.jumpstart_metadata: dict = load_jumpstart_metadata(
self.INPUT_MODEL_DIRECTORY)

def update_neuron_cache_location(self):
logging.info(
@@ -60,8 +185,8 @@ def initialize_partition_args_namespace(self):
self.args.save_mp_checkpoint_path = self.OUTPUT_MODEL_DIRECTORY
# If skip_copy is not enabled, outputted configs are overwritten, and deployment fails.
self.args.skip_copy = True
# These attributes reflect the default values of the corresponding attributes in the partition argparser.
# PropertiesManager expects these attributes to be defined.
# These attributes reflect the default values of the corresponding attributes
# in the partition argparser. PropertiesManager expects these attributes to be defined.
self.args.model_id = None
self.args.engine = None
self.args.tensor_parallel_degree = None
@@ -80,8 +205,8 @@ def parse_neo_compiler_flags(self):
if "compile_only" in self.compiler_flags and self.compiler_flags[
"compile_only"].lower() == "true":
logging.info(
"Warning: compile_only flag passed. SafeTensors weights or split model weights must be provided to deploy the model."
)
"Warning: compile_only flag passed. SafeTensors weights or split model "
"weights must be provided to deploy the model.")
self.properties["option.partition_schema"] = "compile_only"
del self.compiler_flags["compile_only"]

@@ -90,16 +215,16 @@ def parse_neo_compiler_flags(self):
"compiler_interface"]
del self.compiler_flags["compiler_interface"]
logging.info(
f"{self.compiler_interface} is set as the compiler interface by Neo CompilerOptions."
)
f"{self.compiler_interface} is set as the compiler interface by "
"Neo CompilerOptions.")

@staticmethod
def convert_tnx_options_to_djl_options(options: dict) -> dict:
"""
Converts Transformers-NeuronX options accepted by Neo to the equivalent option in djl-serving.
Only options that have a different name or set of values are converted; the remaining are kept as-is.
Supports an additional option "continuous_batching" (equivalently "batch_size_for_shared_caches") to allow
users to enable continuous batching.
Converts Transformers-NeuronX options accepted by Neo to the equivalent option
in djl-serving. Only options that have a different name or set of values are converted;
the remaining are kept as-is. Supports an additional option "continuous_batching"
(equivalently "batch_size_for_shared_caches") to allow users to enable continuous batching.

:param options: A dictionary containing Transformers-NeuronX options as key-value pairs.
:return: returns the modified dictionary
@@ -135,7 +260,8 @@ def convert_tnx_options_to_djl_options(options: dict) -> dict:

def construct_properties_manager_from_tnx_options(self):
"""
Factory method used to construct a PropertiesManager from Transformers-NeuronX Neo CompilerOptions
Factory method used to construct a PropertiesManager from Transformers-NeuronX
Neo CompilerOptions
"""
self.args.engine = "Python"
# Passing a dummy location because it's expected by PropertiesManager
@@ -145,17 +271,17 @@ def construct_properties_manager_from_tnx_options(self):
self.properties["option.model_loader"] = "tnx"
self.properties |= NeoNeuronPartitionService.convert_tnx_options_to_djl_options(
self.compiler_flags)
logging.debug(
f"Constructing PropertiesManager from TNX options\nargs:{self.args}\nprops:{self.properties}"
)
logging.debug("Constructing PropertiesManager from TNX "
f"options\nargs:{self.args}\nprops:{self.properties}")
self.properties_manager = PropertiesManager(
self.args, addl_properties=self.properties)

@staticmethod
def convert_optimum_flags_to_djl_options(
flags: argparse.Namespace) -> dict:
"""
This takes a namespace created by parsing Optimum CLI flags and maps the values to djlserving options.
This takes a namespace created by parsing Optimum CLI flags and maps the values to
djl-serving options.

:param flags: a namespace object returned by the Optimum ArgumentParser
:return: dictionary containing the converted options
@@ -215,9 +341,8 @@ def construct_properties_manager_from_optimum_flags(self):
self.properties |= NeoNeuronPartitionService.convert_optimum_flags_to_djl_options(
flags)

logging.debug(
f"Constructing PropertiesManager from optimum options\nargs:{self.args}\nprops:{self.properties}"
)
logging.debug("Constructing PropertiesManager from optimum "
f"options\nargs:{self.args}\nprops:{self.properties}")
self.properties_manager = PropertiesManager(
self.args, addl_properties=self.properties)

@@ -227,24 +352,36 @@ def construct_properties_manager_from_serving_properties(self):
"""
self.args.properties_dir = self.INPUT_MODEL_DIRECTORY
logging.debug(
f"Constructing PropertiesManager from serving.properties\nargs:{self.args}\nprops:{self.properties}"
)
"Constructing PropertiesManager from "
f"serving.properties\nargs:{self.args}\nprops:{self.properties}")
self.properties_manager = PropertiesManager(
self.args, addl_properties=self.properties)

def run_partition(self) -> str:
"""
:return: the output of the partition command captured from stdout
"""
partition_service = PartitionService(self.properties_manager)
extract_python_jar(PYTHON_CACHE_DIR)
try:
return partition_service.run_partition()
except Exception as exc:
raise CompilationFatalError(
f"Encountered an error during Transformers-NeuronX compilation: {exc}"
)

def neo_partition(self):
self.update_neuron_cache_location()
self.initialize_partition_args_namespace()
self.compiler_flags = get_neo_compiler_flags(self.NEO_COMPILER_OPTIONS)
self.parse_neo_compiler_flags()
if self.compiler_interface == "transformers-neuronx":
logging.info(
"Reading Neo CompilerOptions in transformers-neuronx format. serving.properties will be ignored"
)
"Reading Neo CompilerOptions in transformers-neuronx format. "
"serving.properties will be ignored")
self.construct_properties_manager_from_tnx_options()
logging.info(
"Reading Neo CompilerOptions in optimum format. serving.properties will be ignored"
)
logging.info("Reading Neo CompilerOptions in optimum format. "
"serving.properties will be ignored")
elif self.compiler_interface == "optimum":
self.construct_properties_manager_from_optimum_flags()
elif self.compiler_interface:
@@ -256,17 +393,21 @@ def neo_partition(self):
"Reading compiler options from provided serving.properties file"
)
self.construct_properties_manager_from_serving_properties()

logging.info(f"Model options: {self.properties_manager.properties}")
partition_output = self.run_partition()

partition_service = PartitionService(self.properties_manager)
extract_python_jar(PYTHON_CACHE_DIR)
try:
partition_service.run_partition()
except Exception as exc:
raise CompilationFatalError(
f"Encountered an error during Transformers-NeuronX compilation: {exc}"
)
if self.CACHE_JUMPSTART_FORMAT.lower() == 'true':
logging.info("JumpStart cache environment variable is set")
if self.jumpstart_metadata:
logging.info(
"JumpStart metadata found. Outputting NEFFs in JumpStart format"
)
cache_manager = NeoNeuronCacheManager(
partition_output, self.COMPILER_CACHE_LOCATION)
cache_manager.create_jumpstart_neuron_cache_in_output_dir(
self.OUTPUT_MODEL_DIRECTORY)
cache_manager.create_jumpstart_neuron_cache_in_cache_dir(
self.jumpstart_metadata)


def main():
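To make the directory layouts described in the docstrings above concrete, a short sketch of the two paths NeoNeuronCacheManager assembles; the cache directory, output directory, JumpStart model ID, and scope are hypothetical, while the compiler-version folder reuses the docstring's example:

import os

neuronxcc_dir = "neuronxcc-2.13.68.0+6dfecc895"  # compiler-version folder name

# NEFFs copied into the partition output directory (JumpStart format):
output_layout = os.path.join("/opt/ml/model",  # hypothetical output directory
                             "PRE_COMPILED_NEURON_GRAPH_INFER",
                             neuronxcc_dir)

# NEFFs copied into the Neuron cache directory, keyed by JumpStart metadata:
cache_layout = os.path.join("/tmp/neuron-cache",  # hypothetical cache directory
                            "JUMPSTART_COMPILED_GRAPHS",
                            neuronxcc_dir,
                            "my-jumpstart-model-id",  # hypothetical model_id
                            "inference",              # hypothetical scope
                            "PRE_COMPILED_NEURON_GRAPH_INFER",
                            neuronxcc_dir)

print(output_layout)
print(cache_layout)
# The module folders (MODULE_... directories) are copied beneath each of these paths.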
22 changes: 22 additions & 0 deletions serving/docker/partition/sm_neo_utils.py
@@ -62,3 +62,25 @@ def get_neo_compiler_flags(compiler_options):
except Exception as exc:
raise InputConfiguration(
f"Failed to parse SageMaker Neo CompilerOptions: {exc}")


def load_jumpstart_metadata(path: str) -> dict:
"""
Loads the JumpStart metadata files __model_info__.json and __script_info__.json into a
dictionary if they exist.
"""
js_metadata = {}
model_info_path = os.path.join(path, "__model_info__.json")
script_info_path = os.path.join(path, "__script_info__.json")

if os.path.exists(model_info_path):
logging.info("JumpStart __model_info__.json found")
with open(model_info_path) as file:
js_metadata["model_info"] = json.load(file)

if os.path.exists(script_info_path):
logging.info("JumpStart __script_info__.json found")
with open(script_info_path) as file:
js_metadata["script_info"] = json.load(file)

return js_metadata
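A brief usage sketch for the helper above (the input directory is hypothetical): when both JSON files are present, the returned dictionary carries "model_info" and "script_info" entries, which the Neuron partition flow reads for "model_id" and "Scope"; missing files simply leave their keys out.

metadata = load_jumpstart_metadata("/opt/ml/input/model")  # hypothetical directory
if "model_info" in metadata and "script_info" in metadata:
    print(metadata["model_info"].get("model_id"),
          metadata["script_info"].get("Scope"))
else:
    print("No JumpStart metadata found; skipping JumpStart cache layout")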