From 294f5a250908ef1eba8ce90da26c78b5b4adc327 Mon Sep 17 00:00:00 2001
From: David Gardner
Date: Mon, 29 Aug 2022 09:13:57 -0700
Subject: [PATCH 01/10] Don't copy the dataframe

---
 examples/dfp_workflow/morpheus/dfp/utils/column_info.py | 4 ++--
 1 file changed, 2 insertions(+), 2 deletions(-)

diff --git a/examples/dfp_workflow/morpheus/dfp/utils/column_info.py b/examples/dfp_workflow/morpheus/dfp/utils/column_info.py
index 04db9a5eb8..f8716ba48b 100644
--- a/examples/dfp_workflow/morpheus/dfp/utils/column_info.py
+++ b/examples/dfp_workflow/morpheus/dfp/utils/column_info.py
@@ -112,13 +112,13 @@ def __post_init__(self):
 
 
 def _process_columns(df_in: pd.DataFrame, input_schema: DataFrameInputSchema):
 
-    output_df = pd.DataFrame()
+    output_df = df_in
 
     # Iterate over the column info
     for ci in input_schema.column_info:
         output_df[ci.name] = ci.process_column(df_in)
 
-    if (input_schema.preserve_columns is not None):
+    if (False and input_schema.preserve_columns is not None):
 
         # Get the list of remaining columns not already added
         df_in_columns = set(df_in.columns) - set(output_df.columns)
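Note on PATCH 01/10: writing the processed columns straight into df_in skips allocating and filling a second DataFrame, at the cost of mutating the caller's frame; the `False and` guard also short-circuits the preserve_columns branch, presumably as part of the same performance experiment (Patch 10 later reverts this). A minimal pandas sketch of the trade-off, with made-up column names:

    import pandas as pd

    df_in = pd.DataFrame({"a": [1, 2, 3]})

    # Copying approach: df_in stays untouched, but every output column must
    # be inserted into a brand-new frame.
    out_copy = pd.DataFrame()
    out_copy["a_doubled"] = df_in["a"] * 2

    # In-place approach (this patch): no new frame, but the extra column now
    # also appears on the caller's df_in as a side effect.
    out_shared = df_in
    out_shared["a_doubled"] = df_in["a"] * 2
    assert out_shared is df_in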
From 440a6efdc6140449a6510e72cae95d2a3f7123e2 Mon Sep 17 00:00:00 2001
From: David Gardner
Date: Mon, 29 Aug 2022 09:31:39 -0700
Subject: [PATCH 02/10] Avoid potentially costly debug statements if debug
 logging isn't enabled; add optional profiling

---
 .../dfp/stages/dfp_preprocessing_stage.py | 34 ++++++++++++++++++++++++++++------
 1 file changed, 28 insertions(+), 6 deletions(-)

diff --git a/examples/dfp_workflow/morpheus/dfp/stages/dfp_preprocessing_stage.py b/examples/dfp_workflow/morpheus/dfp/stages/dfp_preprocessing_stage.py
index e3afb4e2c2..f10798a575 100644
--- a/examples/dfp_workflow/morpheus/dfp/stages/dfp_preprocessing_stage.py
+++ b/examples/dfp_workflow/morpheus/dfp/stages/dfp_preprocessing_stage.py
@@ -12,9 +12,11 @@
 # See the License for the specific language governing permissions and
 # limitations under the License.
 
+import cProfile
 import dataclasses
 import logging
 import os
+import pstats
 import time
 import typing
 
@@ -451,6 +453,8 @@ def __init__(self,
         # self._s3_cache_dir = frame_cache_dir
         self._return_format = return_format
         self._only_last_batch = only_last_batch
+        self._do_profile = False
+        self._profilers = []
 
     @property
     def name(self) -> str:
@@ -466,6 +470,10 @@ def process_features(self, message: UserMessageMeta):
         if (message is None):
             return None
 
+        if self._do_profile:
+            pr = cProfile.Profile()
+            pr.enable()
+
         start_time = time.time()
 
         # Process the columns
@@ -480,18 +488,32 @@ def process_features(self, message: UserMessageMeta):
 
         duration = (time.time() - start_time) * 1000.0
 
-        logger.debug("Preprocessed %s data for logs in %s to %s in %s ms",
-                     message.count,
-                     message.df[self._config.ae.timestamp_column_name].min(),
-                     message.df[self._config.ae.timestamp_column_name].max(),
-                     duration)
+        if logger.isEnabledFor(logging.DEBUG):
+            logger.debug("Preprocessed %s data for logs in %s to %s in %s ms",
+                         message.count,
+                         message.df[self._config.ae.timestamp_column_name].min(),
+                         message.df[self._config.ae.timestamp_column_name].max(),
+                         duration)
+
+        if self._do_profile:
+            pr.disable()
+            self._profilers.append(pr)
 
         return [output_message]
 
+    def _stop_prof(self):
+        if self._do_profile:
+            s = pstats.Stats()
+            for p in reversed(self._profilers):
+                s.add(p)
+            s.dump_stats('preproc.prof')
+
+            self._profilers.clear()
+
     def _build_single(self, builder: srf.Builder, input_stream: StreamPair) -> StreamPair:
 
         def node_fn(obs: srf.Observable, sub: srf.Subscriber):
-            obs.pipe(ops.map(self.process_features), ops.flatten()).subscribe(sub)
+            obs.pipe(ops.map(self.process_features), ops.flatten(), ops.on_completed(self._stop_prof)).subscribe(sub)
 
         node = builder.make_node_full(self.unique_name, node_fn)
         builder.make_edge(input_stream[0], node)

From d6850c820acff1356cc662a89e9e6061446a735f Mon Sep 17 00:00:00 2001
From: David Gardner
Date: Mon, 29 Aug 2022 13:27:28 -0700
Subject: [PATCH 03/10] Use a relative import since examples doesn't have an
 __init__.py

---
 .../morpheus/dfp/stages/dfp_mlflow_model_writer.py  | 3 ++-
 .../morpheus/dfp/stages/dfp_postprocessing_stage.py | 2 +-
 2 files changed, 3 insertions(+), 2 deletions(-)

diff --git a/examples/dfp_workflow/morpheus/dfp/stages/dfp_mlflow_model_writer.py b/examples/dfp_workflow/morpheus/dfp/stages/dfp_mlflow_model_writer.py
index 64619eb77b..2557963983 100644
--- a/examples/dfp_workflow/morpheus/dfp/stages/dfp_mlflow_model_writer.py
+++ b/examples/dfp_workflow/morpheus/dfp/stages/dfp_mlflow_model_writer.py
@@ -113,7 +113,8 @@ def on_data(self, message: MultiAEMessage):
         sample_input = model.prepare_df(message.get_meta())
 
         # TODO(MDD) this should work with sample_input
-        model_sig = infer_signature(message.get_meta(), model.get_anomaly_score(sample_input))
+        columns = [c for c in message.meta.df.columns if c != '_row_hash']
+        model_sig = infer_signature(message.get_meta(columns), model.get_anomaly_score(sample_input))
 
         model_info = mlflow.pytorch.log_model(
             pytorch_model=model,
diff --git a/examples/dfp_workflow/morpheus/dfp/stages/dfp_postprocessing_stage.py b/examples/dfp_workflow/morpheus/dfp/stages/dfp_postprocessing_stage.py
index 46e5c715a1..feff0d1edb 100644
--- a/examples/dfp_workflow/morpheus/dfp/stages/dfp_postprocessing_stage.py
+++ b/examples/dfp_workflow/morpheus/dfp/stages/dfp_postprocessing_stage.py
@@ -23,7 +23,6 @@
 from srf.core import operators as ops
 
 import cudf
 
-from examples.dfp_workflow.morpheus.dfp.stages.multi_dfp_message import DFPMessageMeta
 from morpheus.config import Config
 from morpheus.messages.multi_ae_message import MultiAEMessage
@@ -31,6 +30,7 @@
 from morpheus.pipeline.stream_pair import StreamPair
 
 from .dfp_autoencoder import DFPAutoEncoder
+from .multi_dfp_message import DFPMessageMeta
 
 logger = logging.getLogger("morpheus.{}".format(__name__))
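Note on PATCH 02/10: %-style logging defers only the string formatting, not the evaluation of the arguments, so the min()/max() scans over the timestamp column ran on every message even with DEBUG disabled. Guarding the call with isEnabledFor skips that work entirely. The pattern in isolation, with a stand-in for the costly arguments:

    import logging

    logger = logging.getLogger(__name__)

    def expensive_summary():
        return 42  # stand-in for something costly, e.g. df[col].min()

    # logger.debug("x=%s", expensive_summary()) would still call
    # expensive_summary() when DEBUG is off; the guard avoids that.
    if logger.isEnabledFor(logging.DEBUG):
        logger.debug("x=%s", expensive_summary())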
From 16383c9a043a434a2604efbb47289feb04f4f8a2 Mon Sep 17 00:00:00 2001
From: David Gardner
Date: Mon, 29 Aug 2022 13:30:40 -0700
Subject: [PATCH 04/10] Flatten the user.groups column to a string

---
 examples/dfp_workflow/docker-compose.yml            | 28 +++++++++++++++++++++-------
 .../dfp/stages/dfp_inference_stage.py               | 30 +++++++++++++++++++++++++++---
 examples/dfp_workflow/morpheus/dfp_pipeline_duo.py  | 10 ++++++++++
 scripts/compile.sh                                  |  2 +-
 4 files changed, 59 insertions(+), 11 deletions(-)

diff --git a/examples/dfp_workflow/docker-compose.yml b/examples/dfp_workflow/docker-compose.yml
index d95be4ac21..a6e55b1908 100644
--- a/examples/dfp_workflow/docker-compose.yml
+++ b/examples/dfp_workflow/docker-compose.yml
@@ -2,10 +2,10 @@ version: '3.3'
 
 services:
-  db:
+  mlflow-db:
     restart: always
     image: mysql/mysql-server
-    container_name: mlflow_db
+    container_name: mlflow-db
     expose:
       - "3306"
     networks:
       - backend
@@ -27,19 +27,21 @@
     ports:
       - "5000:5000"
     networks:
-      - frontend
       - backend
+      - frontend
     # environment:
     #   - AWS_ACCESS_KEY_ID=${AWS_ACCESS_KEY_ID}
     #   - AWS_SECRET_ACCESS_KEY=${AWS_SECRET_ACCESS_KEY}
     #   - AWS_DEFAULT_REGION=${AWS_DEFAULT_REGION}
-    # command: mlflow server --backend-store-uri mysql+pymysql://${MYSQL_USER}:${MYSQL_PASSWORD}@db:3306/${MYSQL_DATABASE} --serve-artifacts --artifacts-destination /opt/mlflow/artifacts --host 0.0.0.0
-    command: mlflow server --backend-store-uri sqlite:////opt/mlflow/dbdata/mlflow.db --serve-artifacts --artifacts-destination /opt/mlflow/artifacts --host 0.0.0.0
+    #command: mlflow server --backend-store-uri mysql+pymysql://${MYSQL_USER}:${MYSQL_PASSWORD}@mlflow-db:3306/${MYSQL_DATABASE} --serve-artifacts --artifacts-destination /opt/mlflow/artifacts --host 0.0.0.0
+    #command: mlflow server --backend-store-uri postgresql://mlflow_user:mlflow@localhost/mlflow_db --serve-artifacts --artifacts-destination /opt/mlflow/artifacts --host 0.0.0.0
+    command: mlflow server --gunicorn-opts "--log-level debug" --backend-store-uri sqlite:////opt/mlflow/dbdata/mlflow.db --serve-artifacts --artifacts-destination /opt/mlflow/artifacts --host 0.0.0.0
+    #command: mlflow server -w 64 --backend-store-uri /opt/mlflow/dstore --serve-artifacts --artifacts-destination /opt/mlflow/artifacts --host 0.0.0.0
     volumes:
       - db_data:/opt/mlflow/dbdata
       - mlflow_data:/opt/mlflow/artifacts
-    depends_on:
-      - db
+    # depends_on:
+    #   - mlflow-db
 
   jupyter:
     restart: always
@@ -83,6 +85,16 @@
       # PS1: "$$(whoami):$$(pwd) $$ "
       VAULT_ROLE_ID: "${VAULT_ROLE_ID}"
       VAULT_SECRET_ID: "${VAULT_SECRET_ID}"
+      VAULT_ADDR: "${VAULT_ADDR}"
+      VAULT_NAMESPACE: heimdall
+      VAULT_SECRET_PATH: aws/heimdall/sts/heimdall-deployments
+      VAULT_LOGIN_PATH: auth/heimdall-deployments/login
+      VAULT_TOKEN: "${VAULT_TOKEN}"
+      AWS_CREDENTIALS: "${AWS_CREDENTIALS}"
+      AWS_ACCESS_KEY_ID: "${AWS_ACCESS_KEY_ID}"
+      AWS_SECRET_ACCESS_KEY: "${AWS_SECRET_ACCESS_KEY}"
+      AWS_SESSION_TOKEN: "${AWS_SESSION_TOKEN}"
+      NGC_CLI_API_KEY: "${NGC_CLI_API_KEY}"
       DFP_CACHE_DIR: "/work/.cache/dfp"
       DFP_TRACKING_URI: "http://mlflow:5000"
     command: ./launch.sh --train_users=generic --duration=1d
@@ -96,6 +108,8 @@
       - mlflow
     profiles:
       - training
+    cap_add:
+      - sys_nice
     user: "${UID}:${GID}"
 
 # nginx:
diff --git a/examples/dfp_workflow/morpheus/dfp/stages/dfp_inference_stage.py b/examples/dfp_workflow/morpheus/dfp/stages/dfp_inference_stage.py
index 67ee5711bc..218d1d998c 100644
--- a/examples/dfp_workflow/morpheus/dfp/stages/dfp_inference_stage.py
+++ b/examples/dfp_workflow/morpheus/dfp/stages/dfp_inference_stage.py
@@ -12,8 +12,10 @@
 # See the License for the specific language governing permissions and
 # limitations under the License.
 
+import cProfile
 import logging
 import os
+import pstats
 import threading
 import time
 import typing
@@ -23,6 +25,7 @@
 import srf
 from mlflow.exceptions import MlflowException
 from mlflow.tracking.client import MlflowClient
+from srf.core import operators as ops
 
 from morpheus.config import Config
 from morpheus.messages.multi_ae_message import MultiAEMessage
@@ -244,6 +247,8 @@ def __init__(self, c: Config, model_name_formatter: str):
         self._cache_timeout_sec = 600
 
         self._model_manager = ModelManager(model_name_formatter=model_name_formatter)
+        self._do_profile = False
+        self._profilers = []
 
     @property
     def name(self) -> str:
@@ -262,8 +267,9 @@ def get_model(self, user: str) -> ModelCache:
                                             fallback_user_ids=[self._config.ae.fallback_username])
 
     def on_data(self, message: MultiDFPMessage):
-        if (not message or message.mess_count == 0):
-            return None
+        if self._do_profile:
+            pr = cProfile.Profile()
+            pr.enable()
 
         start_time = time.time()
 
@@ -306,10 +312,28 @@ def on_data(self, message: MultiDFPMessage):
                          df_user[self._config.ae.timestamp_column_name].min(),
                          df_user[self._config.ae.timestamp_column_name].max())
 
+        if self._do_profile:
+            pr.disable()
+            self._profilers.append(pr)
+
         return output_message
 
+    def _stop_prof(self):
+        if self._do_profile:
+            s = pstats.Stats()
+            for p in reversed(self._profilers):
+                s.add(p)
+            s.dump_stats('inf_col.{}.prof'.format(time.time()))
+
+            self._profilers.clear()
+
     def _build_single(self, builder: srf.Builder, input_stream: StreamPair) -> StreamPair:
-        node = builder.make_node(self.unique_name, self.on_data)
+        #node = builder.make_node(self.unique_name, self.on_data)
+        def node_fn(obs: srf.Observable, sub: srf.Subscriber):
+
+            obs.pipe(ops.map(self.on_data), ops.on_completed(self._stop_prof)).subscribe(sub)
+
+        node = builder.make_node_full(self.unique_name, node_fn)
 
         builder.make_edge(input_stream[0], node)
         node.launch_options.pe_count = self._config.num_threads
diff --git a/examples/dfp_workflow/morpheus/dfp_pipeline_duo.py b/examples/dfp_workflow/morpheus/dfp_pipeline_duo.py
index 3338c5d492..bde29b452b 100644
--- a/examples/dfp_workflow/morpheus/dfp_pipeline_duo.py
+++ b/examples/dfp_workflow/morpheus/dfp_pipeline_duo.py
@@ -17,6 +17,7 @@
 import typing
 from datetime import datetime
 from datetime import timedelta
+from functools import partial
 
 import click
 import mlflow
@@ -49,6 +50,7 @@
 from morpheus.messages.message_meta import UserMessageMeta
 from morpheus.pipeline import LinearPipeline
 from morpheus.stages.general.monitor_stage import MonitorStage
+from morpheus.stages.general.trigger_stage import TriggerStage
 from morpheus.stages.output.write_to_file_stage import WriteToFileStage
 from morpheus.utils.logger import configure_logging
 
@@ -151,6 +153,12 @@ def column_locincrement(df: cudf.DataFrame):
         # Simple but probably incorrect calculation
         return df.groupby([config.ae.userid_column_name, per_day, "locationcity"]).ngroup() + 1
 
+    def column_listjoin(df: cudf.DataFrame, col_name):
+        if col_name in df:
+            return df[col_name].transform(lambda x: ",".join(x)).astype('string')
+        else:
+            return pd.Series(None, dtype='string')
+
     def s3_date_extractor_duo(s3_object):
         key_object = s3_object.key
 
@@ -174,6 +182,7 @@ def s3_date_extractor_duo(s3_object):
         RenameColumn(name="reason", dtype=str, input_name="reason"),
         RenameColumn(name="username", dtype=str, input_name="user.name"),
         RenameColumn(name=config.ae.timestamp_column_name, dtype=datetime, input_name=config.ae.timestamp_column_name),
+        CustomColumn(name="user.groups", dtype=str, process_column_fn=partial(column_listjoin, col_name="user.groups"))
     ]
 
     input_schema = DataFrameInputSchema(json_columns=["access_device", "application", "auth_device", "user"],
@@ -206,6 +215,7 @@
                                  input_schema=input_schema,
                                  filter_null=False,
                                  cache_dir=cache_dir))
+
     elif (source == "file"):
         pipeline.set_source(
             MultiFileSource(config,
diff --git a/scripts/compile.sh b/scripts/compile.sh
index 6ac2e14701..4d3eb8c715 100755
--- a/scripts/compile.sh
+++ b/scripts/compile.sh
@@ -24,7 +24,7 @@ echo "Runing CMake configure..."
 cmake -B ${BUILD_DIR} -GNinja \
     -DCMAKE_MESSAGE_CONTEXT_SHOW=ON \
     -DMORPHEUS_USE_CLANG_TIDY=OFF \
-    -DMORPHEUS_PYTHON_INPLACE_BUILD=ON \
+    -DMORPHEUS_PYTHON_INPLACE_BUILD=OFF \
     -DMORPHEUS_USE_CCACHE=ON \
     -DMORPHEUS_USE_CONDA=${MORPHEUS_USE_CONDA:-"ON"} \
     ${INSTALL_PREFIX:+"-DCMAKE_INSTALL_PREFIX=${INSTALL_PREFIX}"} \
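Note on PATCH 04/10: column_listjoin flattens a list-of-strings column (e.g. user.groups) into a single comma-joined string column, falling back to a null string column when the input column is absent. The same idea restated against plain pandas (column contents here are made up; the patch binds col_name via functools.partial so the function fits the CustomColumn signature):

    import pandas as pd

    def column_listjoin(df: pd.DataFrame, col_name: str) -> pd.Series:
        # Join each row's list of strings into one comma-separated string.
        if col_name in df:
            return df[col_name].transform(lambda x: ",".join(x)).astype('string')
        # Column missing: return nulls, aligned to the frame's index.
        return pd.Series(None, index=df.index, dtype='string')

    df = pd.DataFrame({"user.groups": [["admins", "eng"], ["eng"]]})
    print(column_listjoin(df, "user.groups"))  # "admins,eng", "eng"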
From d9ae2f82a1192b33adc845536c588734a0bc6d7f Mon Sep 17 00:00:00 2001
From: David Gardner
Date: Mon, 29 Aug 2022 13:31:26 -0700
Subject: [PATCH 05/10] Revert "Flatten the user.groups column to a string"

This reverts commit 16383c9a043a434a2604efbb47289feb04f4f8a2.
---
 examples/dfp_workflow/docker-compose.yml            | 28 +++++++-------------------
 .../dfp/stages/dfp_inference_stage.py               | 30 +++-----------------------------
 examples/dfp_workflow/morpheus/dfp_pipeline_duo.py  | 10 ----------
 scripts/compile.sh                                  |  2 +-
 4 files changed, 11 insertions(+), 59 deletions(-)

diff --git a/examples/dfp_workflow/docker-compose.yml b/examples/dfp_workflow/docker-compose.yml
index a6e55b1908..d95be4ac21 100644
--- a/examples/dfp_workflow/docker-compose.yml
+++ b/examples/dfp_workflow/docker-compose.yml
@@ -2,10 +2,10 @@ version: '3.3'
 
 services:
-  mlflow-db:
+  db:
     restart: always
     image: mysql/mysql-server
-    container_name: mlflow-db
+    container_name: mlflow_db
     expose:
       - "3306"
     networks:
       - backend
@@ -27,21 +27,19 @@
     ports:
       - "5000:5000"
     networks:
-      - backend
       - frontend
+      - backend
     # environment:
     #   - AWS_ACCESS_KEY_ID=${AWS_ACCESS_KEY_ID}
     #   - AWS_SECRET_ACCESS_KEY=${AWS_SECRET_ACCESS_KEY}
     #   - AWS_DEFAULT_REGION=${AWS_DEFAULT_REGION}
-    #command: mlflow server --backend-store-uri mysql+pymysql://${MYSQL_USER}:${MYSQL_PASSWORD}@mlflow-db:3306/${MYSQL_DATABASE} --serve-artifacts --artifacts-destination /opt/mlflow/artifacts --host 0.0.0.0
-    #command: mlflow server --backend-store-uri postgresql://mlflow_user:mlflow@localhost/mlflow_db --serve-artifacts --artifacts-destination /opt/mlflow/artifacts --host 0.0.0.0
-    command: mlflow server --gunicorn-opts "--log-level debug" --backend-store-uri sqlite:////opt/mlflow/dbdata/mlflow.db --serve-artifacts --artifacts-destination /opt/mlflow/artifacts --host 0.0.0.0
-    #command: mlflow server -w 64 --backend-store-uri /opt/mlflow/dstore --serve-artifacts --artifacts-destination /opt/mlflow/artifacts --host 0.0.0.0
+    # command: mlflow server --backend-store-uri mysql+pymysql://${MYSQL_USER}:${MYSQL_PASSWORD}@db:3306/${MYSQL_DATABASE} --serve-artifacts --artifacts-destination /opt/mlflow/artifacts --host 0.0.0.0
+    command: mlflow server --backend-store-uri sqlite:////opt/mlflow/dbdata/mlflow.db --serve-artifacts --artifacts-destination /opt/mlflow/artifacts --host 0.0.0.0
     volumes:
       - db_data:/opt/mlflow/dbdata
       - mlflow_data:/opt/mlflow/artifacts
-    # depends_on:
-    #   - mlflow-db
+    depends_on:
+      - db
 
   jupyter:
     restart: always
@@ -85,16 +83,6 @@
       # PS1: "$$(whoami):$$(pwd) $$ "
       VAULT_ROLE_ID: "${VAULT_ROLE_ID}"
       VAULT_SECRET_ID: "${VAULT_SECRET_ID}"
-      VAULT_ADDR: "${VAULT_ADDR}"
-      VAULT_NAMESPACE: heimdall
-      VAULT_SECRET_PATH: aws/heimdall/sts/heimdall-deployments
-      VAULT_LOGIN_PATH: auth/heimdall-deployments/login
-      VAULT_TOKEN: "${VAULT_TOKEN}"
-      AWS_CREDENTIALS: "${AWS_CREDENTIALS}"
-      AWS_ACCESS_KEY_ID: "${AWS_ACCESS_KEY_ID}"
-      AWS_SECRET_ACCESS_KEY: "${AWS_SECRET_ACCESS_KEY}"
-      AWS_SESSION_TOKEN: "${AWS_SESSION_TOKEN}"
-      NGC_CLI_API_KEY: "${NGC_CLI_API_KEY}"
       DFP_CACHE_DIR: "/work/.cache/dfp"
       DFP_TRACKING_URI: "http://mlflow:5000"
     command: ./launch.sh --train_users=generic --duration=1d
@@ -108,8 +96,6 @@
       - mlflow
     profiles:
       - training
-    cap_add:
-      - sys_nice
     user: "${UID}:${GID}"
 
 # nginx:
diff --git a/examples/dfp_workflow/morpheus/dfp/stages/dfp_inference_stage.py b/examples/dfp_workflow/morpheus/dfp/stages/dfp_inference_stage.py
index 218d1d998c..67ee5711bc 100644
--- a/examples/dfp_workflow/morpheus/dfp/stages/dfp_inference_stage.py
+++ b/examples/dfp_workflow/morpheus/dfp/stages/dfp_inference_stage.py
@@ -12,10 +12,8 @@
 # See the License for the specific language governing permissions and
 # limitations under the License.
 
-import cProfile
 import logging
 import os
-import pstats
 import threading
 import time
 import typing
@@ -25,7 +23,6 @@
 import srf
 from mlflow.exceptions import MlflowException
 from mlflow.tracking.client import MlflowClient
-from srf.core import operators as ops
 
 from morpheus.config import Config
 from morpheus.messages.multi_ae_message import MultiAEMessage
@@ -247,8 +244,6 @@ def __init__(self, c: Config, model_name_formatter: str):
         self._cache_timeout_sec = 600
 
         self._model_manager = ModelManager(model_name_formatter=model_name_formatter)
-        self._do_profile = False
-        self._profilers = []
 
     @property
     def name(self) -> str:
@@ -267,9 +262,8 @@ def get_model(self, user: str) -> ModelCache:
                                             fallback_user_ids=[self._config.ae.fallback_username])
 
     def on_data(self, message: MultiDFPMessage):
-        if self._do_profile:
-            pr = cProfile.Profile()
-            pr.enable()
+        if (not message or message.mess_count == 0):
+            return None
 
         start_time = time.time()
 
@@ -312,28 +306,10 @@ def on_data(self, message: MultiDFPMessage):
                          df_user[self._config.ae.timestamp_column_name].min(),
                          df_user[self._config.ae.timestamp_column_name].max())
 
-        if self._do_profile:
-            pr.disable()
-            self._profilers.append(pr)
-
         return output_message
 
-    def _stop_prof(self):
-        if self._do_profile:
-            s = pstats.Stats()
-            for p in reversed(self._profilers):
-                s.add(p)
-            s.dump_stats('inf_col.{}.prof'.format(time.time()))
-
-            self._profilers.clear()
-
     def _build_single(self, builder: srf.Builder, input_stream: StreamPair) -> StreamPair:
-        #node = builder.make_node(self.unique_name, self.on_data)
-        def node_fn(obs: srf.Observable, sub: srf.Subscriber):
-
-            obs.pipe(ops.map(self.on_data), ops.on_completed(self._stop_prof)).subscribe(sub)
-
-        node = builder.make_node_full(self.unique_name, node_fn)
+        node = builder.make_node(self.unique_name, self.on_data)
 
         builder.make_edge(input_stream[0], node)
         node.launch_options.pe_count = self._config.num_threads
diff --git a/examples/dfp_workflow/morpheus/dfp_pipeline_duo.py b/examples/dfp_workflow/morpheus/dfp_pipeline_duo.py
index bde29b452b..3338c5d492 100644
--- a/examples/dfp_workflow/morpheus/dfp_pipeline_duo.py
+++ b/examples/dfp_workflow/morpheus/dfp_pipeline_duo.py
@@ -17,7 +17,6 @@
 import typing
 from datetime import datetime
 from datetime import timedelta
-from functools import partial
 
 import click
 import mlflow
@@ -50,7 +49,6 @@
 from morpheus.messages.message_meta import UserMessageMeta
 from morpheus.pipeline import LinearPipeline
 from morpheus.stages.general.monitor_stage import MonitorStage
-from morpheus.stages.general.trigger_stage import TriggerStage
 from morpheus.stages.output.write_to_file_stage import WriteToFileStage
 from morpheus.utils.logger import configure_logging
 
@@ -153,12 +151,6 @@ def column_locincrement(df: cudf.DataFrame):
         # Simple but probably incorrect calculation
         return df.groupby([config.ae.userid_column_name, per_day, "locationcity"]).ngroup() + 1
 
-    def column_listjoin(df: cudf.DataFrame, col_name):
-        if col_name in df:
-            return df[col_name].transform(lambda x: ",".join(x)).astype('string')
-        else:
-            return pd.Series(None, dtype='string')
-
     def s3_date_extractor_duo(s3_object):
         key_object = s3_object.key
 
@@ -182,7 +174,6 @@ def s3_date_extractor_duo(s3_object):
         RenameColumn(name="reason", dtype=str, input_name="reason"),
         RenameColumn(name="username", dtype=str, input_name="user.name"),
         RenameColumn(name=config.ae.timestamp_column_name, dtype=datetime, input_name=config.ae.timestamp_column_name),
-        CustomColumn(name="user.groups", dtype=str, process_column_fn=partial(column_listjoin, col_name="user.groups"))
     ]
 
     input_schema = DataFrameInputSchema(json_columns=["access_device", "application", "auth_device", "user"],
@@ -215,7 +206,6 @@
                                  input_schema=input_schema,
                                  filter_null=False,
                                  cache_dir=cache_dir))
-
     elif (source == "file"):
         pipeline.set_source(
             MultiFileSource(config,
diff --git a/scripts/compile.sh b/scripts/compile.sh
index 4d3eb8c715..6ac2e14701 100755
--- a/scripts/compile.sh
+++ b/scripts/compile.sh
@@ -24,7 +24,7 @@ echo "Runing CMake configure..."
 cmake -B ${BUILD_DIR} -GNinja \
     -DCMAKE_MESSAGE_CONTEXT_SHOW=ON \
     -DMORPHEUS_USE_CLANG_TIDY=OFF \
-    -DMORPHEUS_PYTHON_INPLACE_BUILD=OFF \
+    -DMORPHEUS_PYTHON_INPLACE_BUILD=ON \
    -DMORPHEUS_USE_CCACHE=ON \
     -DMORPHEUS_USE_CONDA=${MORPHEUS_USE_CONDA:-"ON"} \
     ${INSTALL_PREFIX:+"-DCMAKE_INSTALL_PREFIX=${INSTALL_PREFIX}"} \
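Note on PATCH 05/10: this is a plain git revert of Patch 04; Patch 06 then re-applies only the pipeline change. The optional-profiling pattern that Patch 04 briefly added to the inference stage (and which Patch 02 kept in the preprocessing stage) is worth seeing in isolation: one cProfile.Profile per call, merged with pstats when the stream completes. A self-contained sketch of that accumulate-and-merge pattern:

    import cProfile
    import pstats

    profilers = []

    def process_once():
        pr = cProfile.Profile()
        pr.enable()
        sum(i * i for i in range(100000))  # stand-in for the real per-message work
        pr.disable()
        profilers.append(pr)

    for _ in range(3):
        process_once()

    # On completion, merge the per-call profiles into a single stats dump,
    # mirroring what the stages' _stop_prof() does.
    stats = pstats.Stats()
    for p in reversed(profilers):
        stats.add(p)
    stats.dump_stats('preproc.prof')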
From bf7866927e1fccfe4209743e9e3f91e4fd240925 Mon Sep 17 00:00:00 2001
From: David Gardner
Date: Mon, 29 Aug 2022 13:33:29 -0700
Subject: [PATCH 06/10] Flatten the user.groups column to a string

---
 examples/dfp_workflow/morpheus/dfp_pipeline_duo.py | 10 ++++++++++
 1 file changed, 10 insertions(+)

diff --git a/examples/dfp_workflow/morpheus/dfp_pipeline_duo.py b/examples/dfp_workflow/morpheus/dfp_pipeline_duo.py
index 3338c5d492..bde29b452b 100644
--- a/examples/dfp_workflow/morpheus/dfp_pipeline_duo.py
+++ b/examples/dfp_workflow/morpheus/dfp_pipeline_duo.py
@@ -17,6 +17,7 @@
 import typing
 from datetime import datetime
 from datetime import timedelta
+from functools import partial
 
 import click
 import mlflow
@@ -49,6 +50,7 @@
 from morpheus.messages.message_meta import UserMessageMeta
 from morpheus.pipeline import LinearPipeline
 from morpheus.stages.general.monitor_stage import MonitorStage
+from morpheus.stages.general.trigger_stage import TriggerStage
 from morpheus.stages.output.write_to_file_stage import WriteToFileStage
 from morpheus.utils.logger import configure_logging
 
@@ -151,6 +153,12 @@ def column_locincrement(df: cudf.DataFrame):
         # Simple but probably incorrect calculation
         return df.groupby([config.ae.userid_column_name, per_day, "locationcity"]).ngroup() + 1
 
+    def column_listjoin(df: cudf.DataFrame, col_name):
+        if col_name in df:
+            return df[col_name].transform(lambda x: ",".join(x)).astype('string')
+        else:
+            return pd.Series(None, dtype='string')
+
     def s3_date_extractor_duo(s3_object):
         key_object = s3_object.key
 
@@ -174,6 +182,7 @@ def s3_date_extractor_duo(s3_object):
         RenameColumn(name="reason", dtype=str, input_name="reason"),
         RenameColumn(name="username", dtype=str, input_name="user.name"),
         RenameColumn(name=config.ae.timestamp_column_name, dtype=datetime, input_name=config.ae.timestamp_column_name),
+        CustomColumn(name="user.groups", dtype=str, process_column_fn=partial(column_listjoin, col_name="user.groups"))
     ]
 
     input_schema = DataFrameInputSchema(json_columns=["access_device", "application", "auth_device", "user"],
@@ -206,6 +215,7 @@
                                  input_schema=input_schema,
                                  filter_null=False,
                                  cache_dir=cache_dir))
+
     elif (source == "file"):
         pipeline.set_source(
             MultiFileSource(config,

From 9ee3a4bae1ea74aae8b0f476b087095c4b4561e0 Mon Sep 17 00:00:00 2001
From: David Gardner
Date: Tue, 6 Sep 2022 13:15:20 -0700
Subject: [PATCH 07/10] Remove commented-out env settings, drop mlflow's
 dependency on the db, add sys_nice to morpheus_training

---
 examples/dfp_workflow/docker-compose.yml | 12 +++++-------
 1 file changed, 5 insertions(+), 7 deletions(-)

diff --git a/examples/dfp_workflow/docker-compose.yml b/examples/dfp_workflow/docker-compose.yml
index 47c21fcdd9..1e0c262206 100644
--- a/examples/dfp_workflow/docker-compose.yml
+++ b/examples/dfp_workflow/docker-compose.yml
@@ -29,17 +29,13 @@ services:
     networks:
       - frontend
       - backend
-    # environment:
-    #   - AWS_ACCESS_KEY_ID=${AWS_ACCESS_KEY_ID}
-    #   - AWS_SECRET_ACCESS_KEY=${AWS_SECRET_ACCESS_KEY}
-    #   - AWS_DEFAULT_REGION=${AWS_DEFAULT_REGION}
     # command: mlflow server --backend-store-uri mysql+pymysql://${MYSQL_USER}:${MYSQL_PASSWORD}@db:3306/${MYSQL_DATABASE} --serve-artifacts --artifacts-destination /opt/mlflow/artifacts --host 0.0.0.0
-    command: mlflow server --backend-store-uri sqlite:////opt/mlflow/dbdata/mlflow.db --serve-artifacts --artifacts-destination /opt/mlflow/artifacts --host 0.0.0.0
+    command: mlflow server --gunicorn-opts "--log-level debug" --backend-store-uri sqlite:////opt/mlflow/dbdata/mlflow.db --serve-artifacts --artifacts-destination /opt/mlflow/artifacts --host 0.0.0.0
     volumes:
       - db_data:/opt/mlflow/dbdata
       - mlflow_data:/opt/mlflow/artifacts
-    depends_on:
-      - db
+    # depends_on:
+    #   - db
 
   jupyter:
     restart: always
@@ -96,6 +92,8 @@
       - mlflow
     profiles:
       - training
+    cap_add:
+      - sys_nice
     user: "${UID}:${GID}"
 
 # nginx:

From 6bda14386c2279f4c02e202df7995c6e661a07a1 Mon Sep 17 00:00:00 2001
From: David Gardner
Date: Tue, 6 Sep 2022 13:16:11 -0700
Subject: [PATCH 08/10] Use a set instead of a list for existing_models cache

---
 .../dfp_workflow/morpheus/dfp/stages/dfp_inference_stage.py | 2 +-
 1 file changed, 1 insertion(+), 1 deletion(-)

diff --git a/examples/dfp_workflow/morpheus/dfp/stages/dfp_inference_stage.py b/examples/dfp_workflow/morpheus/dfp/stages/dfp_inference_stage.py
index 4c7a52e793..5462faa1e5 100644
--- a/examples/dfp_workflow/morpheus/dfp/stages/dfp_inference_stage.py
+++ b/examples/dfp_workflow/morpheus/dfp/stages/dfp_inference_stage.py
@@ -169,7 +169,7 @@ def _model_exists(self, reg_model_name: str) -> bool:
                 client = MlflowClient()
                 models = client.list_registered_models()
 
-                self._existing_models = [model.name for model in models]
+                self._existing_models = set(model.name for model in models)
 
                 self._existing_models_updated = now

From 23f493b78d33fb5b82636662ffc7271b86e33a2a Mon Sep 17 00:00:00 2001
From: David Gardner
Date: Tue, 6 Sep 2022 13:24:58 -0700
Subject: [PATCH 09/10] Use frozenset since existing_models is not mutated,
 fix typing

---
 .../dfp_workflow/morpheus/dfp/stages/dfp_inference_stage.py | 4 ++--
 1 file changed, 2 insertions(+), 2 deletions(-)

diff --git a/examples/dfp_workflow/morpheus/dfp/stages/dfp_inference_stage.py b/examples/dfp_workflow/morpheus/dfp/stages/dfp_inference_stage.py
index 5462faa1e5..0795b670f9 100644
--- a/examples/dfp_workflow/morpheus/dfp/stages/dfp_inference_stage.py
+++ b/examples/dfp_workflow/morpheus/dfp/stages/dfp_inference_stage.py
@@ -147,7 +147,7 @@ def __init__(self, model_name_formatter: str) -> None:
         self._user_model_cache_lock = threading.Lock()
         self._model_cache_lock = threading.Lock()
 
-        self._existing_models: typing.List[str] = []
+        self._existing_models: typing.FrozenSet[str] = frozenset()
         self._existing_models_updated = datetime(1970, 1, 1)
 
         # Force an update of the existing models
@@ -169,7 +169,7 @@ def _model_exists(self, reg_model_name: str) -> bool:
                 client = MlflowClient()
                 models = client.list_registered_models()
 
-                self._existing_models = set(model.name for model in models)
+                self._existing_models = frozenset(model.name for model in models)
 
                 self._existing_models_updated = now
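Note on PATCHES 08-09/10: the _existing_models cache is only ever rebuilt wholesale and probed with `name in cache`, so a hashed container fits better than a list -- membership checks drop from an O(n) scan to O(1) on average, and frozenset additionally documents that the snapshot is never mutated in place. A sketch of the difference:

    names = ["model_%d" % i for i in range(100000)]

    as_list = names               # "x in as_list" scans element by element
    as_frozen = frozenset(names)  # "x in as_frozen" is a single hash lookup

    assert "model_99999" in as_frozen
    # as_frozen.add("new") would raise AttributeError: the snapshot is
    # replaced wholesale (as _model_exists does), never mutated.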
From 41fcf094f5aeb522faa844a03303fb50079df7e1 Mon Sep 17 00:00:00 2001
From: Michael Demoret
Date: Mon, 12 Sep 2022 20:39:04 -0600
Subject: [PATCH 10/10] Revert using the same DF in process_columns

---
 examples/dfp_workflow/morpheus/dfp/utils/column_info.py | 5 +++--
 1 file changed, 3 insertions(+), 2 deletions(-)

diff --git a/examples/dfp_workflow/morpheus/dfp/utils/column_info.py b/examples/dfp_workflow/morpheus/dfp/utils/column_info.py
index f8716ba48b..7a9aadbaa7 100644
--- a/examples/dfp_workflow/morpheus/dfp/utils/column_info.py
+++ b/examples/dfp_workflow/morpheus/dfp/utils/column_info.py
@@ -112,13 +112,14 @@ def __post_init__(self):
 
 
 def _process_columns(df_in: pd.DataFrame, input_schema: DataFrameInputSchema):
 
-    output_df = df_in
+    # TODO(MDD): See what causes this to have such a perf impact over using df_in
+    output_df = pd.DataFrame()
 
     # Iterate over the column info
     for ci in input_schema.column_info:
         output_df[ci.name] = ci.process_column(df_in)
 
-    if (False and input_schema.preserve_columns is not None):
+    if (input_schema.preserve_columns is not None):
 
         # Get the list of remaining columns not already added
         df_in_columns = set(df_in.columns) - set(output_df.columns)
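Note on PATCH 10/10: this backs out Patch 01 after writing columns into the shared df_in proved slower in practice than filling a fresh frame (hence the TODO(MDD) left in the code). A timeit sketch for investigating that kind of question, using a synthetic frame in place of the real log data:

    import timeit

    import pandas as pd

    df_in = pd.DataFrame({"c%d" % i: range(1000) for i in range(50)})

    def into_new_frame():
        out = pd.DataFrame()
        for c in df_in.columns:
            out[c] = df_in[c]
        return out

    def into_shared_frame():
        out = df_in  # same object, as Patch 01 did
        for c in df_in.columns:
            out[c] = df_in[c]
        return out

    print("new frame:   ", timeit.timeit(into_new_frame, number=100))
    print("shared frame:", timeit.timeit(into_shared_frame, number=100))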