diff --git a/examples/dfp_workflow/docker-compose.yml b/examples/dfp_workflow/docker-compose.yml
index 47c21fcdd9..1e0c262206 100644
--- a/examples/dfp_workflow/docker-compose.yml
+++ b/examples/dfp_workflow/docker-compose.yml
@@ -29,17 +29,13 @@ services:
     networks:
       - frontend
       - backend
-    # environment:
-    #   - AWS_ACCESS_KEY_ID=${AWS_ACCESS_KEY_ID}
-    #   - AWS_SECRET_ACCESS_KEY=${AWS_SECRET_ACCESS_KEY}
-    #   - AWS_DEFAULT_REGION=${AWS_DEFAULT_REGION}
     # command: mlflow server --backend-store-uri mysql+pymysql://${MYSQL_USER}:${MYSQL_PASSWORD}@db:3306/${MYSQL_DATABASE} --serve-artifacts --artifacts-destination /opt/mlflow/artifacts --host 0.0.0.0
-    command: mlflow server --backend-store-uri sqlite:////opt/mlflow/dbdata/mlflow.db --serve-artifacts --artifacts-destination /opt/mlflow/artifacts --host 0.0.0.0
+    command: mlflow server --gunicorn-opts "--log-level debug" --backend-store-uri sqlite:////opt/mlflow/dbdata/mlflow.db --serve-artifacts --artifacts-destination /opt/mlflow/artifacts --host 0.0.0.0
     volumes:
       - db_data:/opt/mlflow/dbdata
      - mlflow_data:/opt/mlflow/artifacts
-    depends_on:
-      - db
+    # depends_on:
+    #   - db
 
   jupyter:
     restart: always
@@ -96,6 +92,8 @@ services:
       - mlflow
     profiles:
       - training
+    cap_add:
+      - sys_nice
     user: "${UID}:${GID}"
 
   # nginx:
diff --git a/examples/dfp_workflow/morpheus/dfp/stages/dfp_inference_stage.py b/examples/dfp_workflow/morpheus/dfp/stages/dfp_inference_stage.py
index 4c7a52e793..0795b670f9 100644
--- a/examples/dfp_workflow/morpheus/dfp/stages/dfp_inference_stage.py
+++ b/examples/dfp_workflow/morpheus/dfp/stages/dfp_inference_stage.py
@@ -147,7 +147,7 @@ def __init__(self, model_name_formatter: str) -> None:
         self._user_model_cache_lock = threading.Lock()
         self._model_cache_lock = threading.Lock()
 
-        self._existing_models: typing.List[str] = []
+        self._existing_models: typing.FrozenSet[str] = frozenset()
         self._existing_models_updated = datetime(1970, 1, 1)
 
         # Force an update of the existing models
@@ -169,7 +169,7 @@ def _model_exists(self, reg_model_name: str) -> bool:
                     client = MlflowClient()
                     models = client.list_registered_models()
 
-                    self._existing_models = [model.name for model in models]
+                    self._existing_models = frozenset(model.name for model in models)
 
                     self._existing_models_updated = now
 
diff --git a/examples/dfp_workflow/morpheus/dfp/stages/dfp_mlflow_model_writer.py b/examples/dfp_workflow/morpheus/dfp/stages/dfp_mlflow_model_writer.py
index 64619eb77b..2557963983 100644
--- a/examples/dfp_workflow/morpheus/dfp/stages/dfp_mlflow_model_writer.py
+++ b/examples/dfp_workflow/morpheus/dfp/stages/dfp_mlflow_model_writer.py
@@ -113,7 +113,8 @@ def on_data(self, message: MultiAEMessage):
         sample_input = model.prepare_df(message.get_meta())
 
         # TODO(MDD) this should work with sample_input
-        model_sig = infer_signature(message.get_meta(), model.get_anomaly_score(sample_input))
+        columns = [c for c in message.meta.df.columns if c != '_row_hash']
+        model_sig = infer_signature(message.get_meta(columns), model.get_anomaly_score(sample_input))
 
         model_info = mlflow.pytorch.log_model(
             pytorch_model=model,
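Note on the `dfp_inference_stage.py` hunks above: swapping the `typing.List` cache of registered model names for a `typing.FrozenSet` turns the per-message `reg_model_name in self._existing_models` check from an O(n) scan into an O(1) hash lookup, and makes the read-only intent of the cache explicit (it is replaced wholesale on refresh, never mutated in place). A minimal standalone sketch of the pattern; the class name and refresh interval are illustrative assumptions, while `MlflowClient.list_registered_models` is the real client call used in the hunk:

```python
import threading
import typing
from datetime import datetime, timedelta

from mlflow.tracking import MlflowClient


class ModelNameCache:
    """Caches registered-model names so existence checks are O(1) hash lookups."""

    REFRESH_INTERVAL = timedelta(seconds=600)  # assumed refresh period

    def __init__(self) -> None:
        self._lock = threading.Lock()
        self._names: typing.FrozenSet[str] = frozenset()
        self._updated = datetime(1970, 1, 1)  # force a refresh on first use

    def exists(self, name: str) -> bool:
        if (datetime.now() - self._updated) > self.REFRESH_INTERVAL:
            with self._lock:
                # Replace the cache wholesale; frozenset is immutable, so readers
                # never observe a half-updated collection.
                self._names = frozenset(m.name for m in MlflowClient().list_registered_models())
                self._updated = datetime.now()
        return name in self._names
```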
diff --git a/examples/dfp_workflow/morpheus/dfp/stages/dfp_preprocessing_stage.py b/examples/dfp_workflow/morpheus/dfp/stages/dfp_preprocessing_stage.py
index 2ad5c05499..bdb1b293bd 100644
--- a/examples/dfp_workflow/morpheus/dfp/stages/dfp_preprocessing_stage.py
+++ b/examples/dfp_workflow/morpheus/dfp/stages/dfp_preprocessing_stage.py
@@ -12,8 +12,10 @@
 # See the License for the specific language governing permissions and
 # limitations under the License.
 
+import cProfile
 import logging
 import os
+import pstats
 import time
 import typing
 
@@ -449,6 +451,8 @@ def __init__(self,
         # self._s3_cache_dir = frame_cache_dir
         self._return_format = return_format
         self._only_new_batches = only_new_batches
+        self._do_profile = False
+        self._profilers = []
 
     @property
     def name(self) -> str:
@@ -464,6 +468,10 @@ def process_features(self, message: MultiDFPMessage):
         if (message is None):
             return None
 
+        if self._do_profile:
+            pr = cProfile.Profile()
+            pr.enable()
+
         start_time = time.time()
 
         # Process the columns
@@ -477,18 +485,32 @@ def process_features(self, message: MultiDFPMessage):
         duration = (time.time() - start_time) * 1000.0
 
-        logger.debug("Preprocessed %s data for logs in %s to %s in %s ms",
-                     message.mess_count,
-                     message.get_meta(self._config.ae.timestamp_column_name).min(),
-                     message.get_meta(self._config.ae.timestamp_column_name).max(),
-                     duration)
+        if logger.isEnabledFor(logging.DEBUG):
+            logger.debug("Preprocessed %s data for logs in %s to %s in %s ms",
+                         message.mess_count,
+                         message.get_meta(self._config.ae.timestamp_column_name).min(),
+                         message.get_meta(self._config.ae.timestamp_column_name).max(),
+                         duration)
+
+        if self._do_profile:
+            pr.disable()
+            self._profilers.append(pr)
 
         return [message]
 
+    def _stop_prof(self):
+        if self._do_profile:
+            s = pstats.Stats()
+            for p in reversed(self._profilers):
+                s.add(p)
+            s.dump_stats('preproc.prof')
+
+            self._profilers.clear()
+
     def _build_single(self, builder: srf.Builder, input_stream: StreamPair) -> StreamPair:
 
         def node_fn(obs: srf.Observable, sub: srf.Subscriber):
-            obs.pipe(ops.map(self.process_features), ops.flatten()).subscribe(sub)
+            obs.pipe(ops.map(self.process_features), ops.flatten(), ops.on_completed(self._stop_prof)).subscribe(sub)
 
         node = builder.make_node_full(self.unique_name, node_fn)
         builder.make_edge(input_stream[0], node)
 
diff --git a/examples/dfp_workflow/morpheus/dfp/utils/column_info.py b/examples/dfp_workflow/morpheus/dfp/utils/column_info.py
index 04db9a5eb8..7a9aadbaa7 100644
--- a/examples/dfp_workflow/morpheus/dfp/utils/column_info.py
+++ b/examples/dfp_workflow/morpheus/dfp/utils/column_info.py
@@ -112,6 +112,7 @@ def __post_init__(self):
 
 def _process_columns(df_in: pd.DataFrame, input_schema: DataFrameInputSchema):
 
+    # TODO(MDD): See what causes this to have such a perf impact over using df_in
     output_df = pd.DataFrame()
 
     # Iterate over the column info
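Note on the `dfp_preprocessing_stage.py` hunks above: they add opt-in profiling, with one `cProfile.Profile` per batch collected in `process_features` and all of them merged via `pstats.Stats.add` into a single `.prof` file when the stream completes. A condensed sketch of that pattern outside of SRF; the class name, `_work` body, and output file name are illustrative assumptions:

```python
import cProfile
import pstats
import typing


class ProfiledStage:
    """Opt-in profiling: one Profile per batch, merged into a single .prof file."""

    def __init__(self, do_profile: bool = False) -> None:
        self._do_profile = do_profile
        self._profilers: typing.List[cProfile.Profile] = []

    def _work(self, batch):
        # Stand-in for the real per-batch feature processing
        return [x * 2 for x in batch]

    def process(self, batch):
        if self._do_profile:
            pr = cProfile.Profile()
            pr.enable()

        result = self._work(batch)

        if self._do_profile:
            pr.disable()
            self._profilers.append(pr)
        return result

    def on_completed(self) -> None:
        # Mirrors _stop_prof above: merge per-batch profiles, write one file
        if not self._do_profile:
            return
        stats = pstats.Stats()
        for pr in self._profilers:
            stats.add(pr)
        stats.dump_stats("stage.prof")  # inspect with `python -m pstats stage.prof`
        self._profilers.clear()


stage = ProfiledStage(do_profile=True)
stage.process(list(range(1000)))
stage.on_completed()
```

The `logger.isEnabledFor(logging.DEBUG)` guard in the same hunk follows the same spirit: the `.min()`/`.max()` scans over the timestamp column are now only paid when debug logging is actually enabled.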
diff --git a/examples/dfp_workflow/morpheus/dfp_pipeline_duo.py b/examples/dfp_workflow/morpheus/dfp_pipeline_duo.py
index e3b82f1042..88fa879382 100644
--- a/examples/dfp_workflow/morpheus/dfp_pipeline_duo.py
+++ b/examples/dfp_workflow/morpheus/dfp_pipeline_duo.py
@@ -17,6 +17,7 @@
 import typing
 from datetime import datetime
 from datetime import timedelta
+from functools import partial
 
 import click
 import mlflow
@@ -49,6 +50,7 @@
 from morpheus.messages.message_meta import UserMessageMeta
 from morpheus.pipeline import LinearPipeline
 from morpheus.stages.general.monitor_stage import MonitorStage
+from morpheus.stages.general.trigger_stage import TriggerStage
 from morpheus.stages.output.write_to_file_stage import WriteToFileStage
 from morpheus.utils.logger import configure_logging
 
@@ -151,6 +153,12 @@ def column_locincrement(df: cudf.DataFrame):
         # Simple but probably incorrect calculation
         return df.groupby([config.ae.userid_column_name, per_day, "locationcity"]).ngroup() + 1
 
+    def column_listjoin(df: cudf.DataFrame, col_name):
+        if col_name in df:
+            return df[col_name].transform(lambda x: ",".join(x)).astype('string')
+        else:
+            return pd.Series(None, dtype='string')
+
     def s3_date_extractor_duo(s3_object):
         key_object = s3_object.key
 
@@ -174,6 +182,7 @@ def s3_date_extractor_duo(s3_object):
         RenameColumn(name="reason", dtype=str, input_name="reason"),
         RenameColumn(name="username", dtype=str, input_name="user.name"),
         RenameColumn(name=config.ae.timestamp_column_name, dtype=datetime, input_name=config.ae.timestamp_column_name),
+        CustomColumn(name="user.groups", dtype=str, process_column_fn=partial(column_listjoin, col_name="user.groups"))
     ]
 
     input_schema = DataFrameInputSchema(json_columns=["access_device", "application", "auth_device", "user"],
@@ -206,6 +215,7 @@ def s3_date_extractor_duo(s3_object):
                             input_schema=input_schema,
                             filter_null=False,
                             cache_dir=cache_dir))
+
     elif (source == "file"):
         pipeline.set_source(
             MultiFileSource(config,
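Note on the `dfp_pipeline_duo.py` hunks above: the new `CustomColumn` entry flattens the list-valued `user.groups` field into a comma-separated string, with `functools.partial` binding the column name so the same handler could serve other list columns. A pandas-only sketch of the handler; the pipeline passes cuDF frames, `apply` stands in for the `transform` call (equivalent for this elementwise join), and the `index=df.index` fallback is an illustrative variation that keeps the null column aligned with the batch:

```python
from functools import partial

import pandas as pd


def column_listjoin(df: pd.DataFrame, col_name: str) -> pd.Series:
    """Join the list-valued entries of col_name into comma-separated strings."""
    if col_name in df:
        return df[col_name].apply(lambda x: ",".join(x)).astype('string')
    # Column absent from this batch: an all-null string column aligned to the frame
    return pd.Series(None, index=df.index, dtype='string')


df = pd.DataFrame({"user.groups": [["admin", "ops"], ["dev"]]})
process_fn = partial(column_listjoin, col_name="user.groups")
print(process_fn(df).tolist())  # ['admin,ops', 'dev']
```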