Merge pull request #6 from dagardner-nv/david-mdd_dfp-example
Don't copy the dataframe during pre-processing
mdemoret-nv authored Sep 13, 2022
2 parents 2cfff73 + 41fcf09 commit 80332fa
Showing 6 changed files with 48 additions and 16 deletions.
12 changes: 5 additions & 7 deletions examples/dfp_workflow/docker-compose.yml
@@ -29,17 +29,13 @@ services:
networks:
- frontend
- backend
# environment:
# - AWS_ACCESS_KEY_ID=${AWS_ACCESS_KEY_ID}
# - AWS_SECRET_ACCESS_KEY=${AWS_SECRET_ACCESS_KEY}
# - AWS_DEFAULT_REGION=${AWS_DEFAULT_REGION}
# command: mlflow server --backend-store-uri mysql+pymysql://${MYSQL_USER}:${MYSQL_PASSWORD}@db:3306/${MYSQL_DATABASE} --serve-artifacts --artifacts-destination /opt/mlflow/artifacts --host 0.0.0.0
command: mlflow server --backend-store-uri sqlite:////opt/mlflow/dbdata/mlflow.db --serve-artifacts --artifacts-destination /opt/mlflow/artifacts --host 0.0.0.0
command: mlflow server --gunicorn-opts "--log-level debug" --backend-store-uri sqlite:////opt/mlflow/dbdata/mlflow.db --serve-artifacts --artifacts-destination /opt/mlflow/artifacts --host 0.0.0.0
volumes:
- db_data:/opt/mlflow/dbdata
- mlflow_data:/opt/mlflow/artifacts
depends_on:
- db
# depends_on:
# - db

jupyter:
restart: always
@@ -96,6 +92,8 @@ services:
- mlflow
profiles:
- training
cap_add:
- sys_nice
user: "${UID}:${GID}"

# nginx:

@@ -147,7 +147,7 @@ def __init__(self, model_name_formatter: str) -> None:
self._user_model_cache_lock = threading.Lock()
self._model_cache_lock = threading.Lock()

self._existing_models: typing.List[str] = []
self._existing_models: typing.FrozenSet[str] = frozenset()
self._existing_models_updated = datetime(1970, 1, 1)

# Force an update of the existing models
@@ -169,7 +169,7 @@ def _model_exists(self, reg_model_name: str) -> bool:
client = MlflowClient()
models = client.list_registered_models()

self._existing_models = [model.name for model in models]
self._existing_models = frozenset(model.name for model in models)

self._existing_models_updated = now

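The change above swaps the cached list of registered-model names for a frozenset, presumably to make the membership test in _model_exists O(1) and the cached snapshot immutable. A minimal standalone sketch of that caching pattern, using a hypothetical ModelNameCache class and a caller-supplied listing function instead of the real MlflowClient:

```python
import typing
from datetime import datetime, timedelta


class ModelNameCache:
    """Hypothetical sketch: keep registered-model names as an immutable frozenset."""

    def __init__(self, refresh_interval: timedelta = timedelta(minutes=5)) -> None:
        self._existing_models: typing.FrozenSet[str] = frozenset()
        self._existing_models_updated = datetime(1970, 1, 1)
        self._refresh_interval = refresh_interval

    def model_exists(self, name: str, list_models: typing.Callable[[], typing.Iterable[str]]) -> bool:
        now = datetime.now()
        if (now - self._existing_models_updated) > self._refresh_interval:
            # Rebuild the snapshot; frozenset gives O(1) lookups and cannot be mutated in place
            self._existing_models = frozenset(list_models())
            self._existing_models_updated = now
        return name in self._existing_models


cache = ModelNameCache()
print(cache.model_exists("DFP-duo-user123", lambda: ["DFP-duo-user123", "DFP-duo-generic"]))  # True
```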

@@ -113,7 +113,8 @@ def on_data(self, message: MultiAEMessage):
sample_input = model.prepare_df(message.get_meta())

# TODO(MDD) this should work with sample_input
model_sig = infer_signature(message.get_meta(), model.get_anomaly_score(sample_input))
columns = [c for c in message.meta.df.columns if c != '_row_hash']
model_sig = infer_signature(message.get_meta(columns), model.get_anomaly_score(sample_input))

model_info = mlflow.pytorch.log_model(
pytorch_model=model,
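Worth noting: the signature is now inferred from the message columns minus the internal _row_hash bookkeeping column. A rough illustration of that filtering, with a toy DataFrame and made-up anomaly scores standing in for message.get_meta() and model.get_anomaly_score():

```python
import pandas as pd
from mlflow.models.signature import infer_signature

df = pd.DataFrame({"feature_a": [1.0, 2.0], "feature_b": [3.0, 4.0], "_row_hash": [111, 222]})

# Drop bookkeeping columns so they never end up in the logged model signature
columns = [c for c in df.columns if c != '_row_hash']
sample_input = df[columns]

# Toy stand-in for the model's anomaly scores
anomaly_scores = pd.Series([0.1, 0.9], name="anomaly_score")

model_sig = infer_signature(sample_input, anomaly_scores)
print(model_sig)
```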

@@ -12,8 +12,10 @@
# See the License for the specific language governing permissions and
# limitations under the License.

import cProfile
import logging
import os
import pstats
import time
import typing

@@ -449,6 +451,8 @@ def __init__(self,
# self._s3_cache_dir = frame_cache_dir
self._return_format = return_format
self._only_new_batches = only_new_batches
self._do_profile = False
self._profilers = []

@property
def name(self) -> str:
@@ -464,6 +468,10 @@ def process_features(self, message: MultiDFPMessage):
if (message is None):
return None

if self._do_profile:
pr = cProfile.Profile()
pr.enable()

start_time = time.time()

# Process the columns
@@ -477,18 +485,32 @@

duration = (time.time() - start_time) * 1000.0

logger.debug("Preprocessed %s data for logs in %s to %s in %s ms",
message.mess_count,
message.get_meta(self._config.ae.timestamp_column_name).min(),
message.get_meta(self._config.ae.timestamp_column_name).max(),
duration)
if logger.isEnabledFor(logging.DEBUG):
logger.debug("Preprocessed %s data for logs in %s to %s in %s ms",
message.mess_count,
message.get_meta(self._config.ae.timestamp_column_name).min(),
message.get_meta(self._config.ae.timestamp_column_name).max(),
duration)

if self._do_profile:
pr.disable()
self._profilers.append(pr)

return [message]

def _stop_prof(self):
if self._do_profile:
s = pstats.Stats()
for p in reversed(self._profilers):
s.add(p)
s.dump_stats('preproc.prof')

self._profilers.clear()

def _build_single(self, builder: srf.Builder, input_stream: StreamPair) -> StreamPair:

def node_fn(obs: srf.Observable, sub: srf.Subscriber):
obs.pipe(ops.map(self.process_features), ops.flatten()).subscribe(sub)
obs.pipe(ops.map(self.process_features), ops.flatten(), ops.on_completed(self._stop_prof)).subscribe(sub)

node = builder.make_node_full(self.unique_name, node_fn)
builder.make_edge(input_stream[0], node)
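For reference, the optional profiling added here wraps each process_features call in its own cProfile.Profile and merges them all into a single preproc.prof file once the stream completes. A self-contained sketch of that accumulate-then-dump pattern, with a trivial stand-in for the per-message work:

```python
import cProfile
import pstats

_profilers = []


def process(values):
    pr = cProfile.Profile()
    pr.enable()
    result = sorted(values)  # stand-in for the real per-message feature processing
    pr.disable()
    _profilers.append(pr)
    return result


def stop_prof(path: str = "preproc.prof"):
    # Merge every per-call profile into one stats file for offline inspection
    stats = pstats.Stats()
    for pr in reversed(_profilers):
        stats.add(pr)
    stats.dump_stats(path)
    _profilers.clear()


process([3, 1, 2])
process([9, 8, 7])
stop_prof()
# Inspect afterwards with: python -m pstats preproc.prof
```
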
1 change: 1 addition & 0 deletions examples/dfp_workflow/morpheus/dfp/utils/column_info.py
@@ -112,6 +112,7 @@ def __post_init__(self):

def _process_columns(df_in: pd.DataFrame, input_schema: DataFrameInputSchema):

# TODO(MDD): See what causes this to have such a perf impact over using df_in
output_df = pd.DataFrame()

# Iterate over the column info
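This TODO ties back to the commit message: _process_columns now assembles a new, empty output frame rather than copying df_in. A minimal pandas sketch of that shape, with hypothetical per-column transforms in place of the real column_info handlers:

```python
import pandas as pd


def process_columns(df_in: pd.DataFrame, transforms: dict) -> pd.DataFrame:
    # Start from an empty frame rather than df_in.copy(); only the requested columns are built
    output_df = pd.DataFrame()
    for out_name, transform in transforms.items():
        output_df[out_name] = transform(df_in)
    return output_df


df = pd.DataFrame({"user.name": ["alice", "bob"], "reason": ["ok", "denied"]})
out = process_columns(df, {
    "username": lambda d: d["user.name"].astype(str),
    "reason": lambda d: d["reason"].astype(str),
})
print(out)
```
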
10 changes: 10 additions & 0 deletions examples/dfp_workflow/morpheus/dfp_pipeline_duo.py
@@ -17,6 +17,7 @@
import typing
from datetime import datetime
from datetime import timedelta
from functools import partial

import click
import mlflow
@@ -49,6 +50,7 @@
from morpheus.messages.message_meta import UserMessageMeta
from morpheus.pipeline import LinearPipeline
from morpheus.stages.general.monitor_stage import MonitorStage
from morpheus.stages.general.trigger_stage import TriggerStage
from morpheus.stages.output.write_to_file_stage import WriteToFileStage
from morpheus.utils.logger import configure_logging

@@ -151,6 +153,12 @@ def column_locincrement(df: cudf.DataFrame):
# Simple but probably incorrect calculation
return df.groupby([config.ae.userid_column_name, per_day, "locationcity"]).ngroup() + 1

def column_listjoin(df: cudf.DataFrame, col_name):
if col_name in df:
return df[col_name].transform(lambda x: ",".join(x)).astype('string')
else:
return pd.Series(None, dtype='string')

def s3_date_extractor_duo(s3_object):
key_object = s3_object.key

@@ -174,6 +182,7 @@ def s3_date_extractor_duo(s3_object):
RenameColumn(name="reason", dtype=str, input_name="reason"),
RenameColumn(name="username", dtype=str, input_name="user.name"),
RenameColumn(name=config.ae.timestamp_column_name, dtype=datetime, input_name=config.ae.timestamp_column_name),
CustomColumn(name="user.groups", dtype=str, process_column_fn=partial(column_listjoin, col_name="user.groups"))
]

input_schema = DataFrameInputSchema(json_columns=["access_device", "application", "auth_device", "user"],
@@ -206,6 +215,7 @@ def s3_date_extractor_duo(s3_object):
input_schema=input_schema,
filter_null=False,
cache_dir=cache_dir))

elif (source == "file"):
pipeline.set_source(
MultiFileSource(config,
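The new column_listjoin helper collapses a list-valued column (user.groups here) into a comma-separated string column, falling back to an empty string-typed series when the column is absent. A rough pandas-only sketch of how the partial-bound transform behaves on a toy frame:

```python
from functools import partial

import pandas as pd


def column_listjoin(df: pd.DataFrame, col_name: str) -> pd.Series:
    if col_name in df:
        return df[col_name].transform(lambda x: ",".join(x)).astype('string')
    # Mirrors the fallback in the diff: an empty string-dtype series when the column is missing
    return pd.Series(None, dtype='string')


df = pd.DataFrame({"user.groups": [["admins", "eng"], ["eng"]]})

join_groups = partial(column_listjoin, col_name="user.groups")
print(join_groups(df))
# 0    admins,eng
# 1           eng
```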
