Improving inference model lookup using David's suggestions
mdemoret-nv committed Aug 29, 2022
1 parent 5f36840 commit 2cfff73
Showing 7 changed files with 753 additions and 35 deletions.
2 changes: 1 addition & 1 deletion examples/dfp_workflow/docker-compose.yml
@@ -44,7 +44,7 @@ services:
jupyter:
restart: always
build:
context: ./morpheus_jupyter
context: ./jupyter
args:
- MORPHEUS_CONTAINER_VERSION=v22.08.00a-runtime
image: dfp_morpheus_jupyter
@@ -50,6 +50,9 @@ WORKDIR /work/examples/dfp_workflow/morpheus
# # This will get used by pipelines for the --s3_cache option
# ENV DFP_S3_CACHE="/work/examples/dfp_workflow/morpheus/.s3_cache"

# Set the tracking URI for mlflow
ENV MLFLOW_TRACKING_URI="http://mlflow:5000"

# Copy the sources
COPY . ./

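
For context, MLflow clients resolve their tracking server from this environment variable when no URI is passed explicitly, so code running inside the container needs no hard-coded endpoint. A minimal sketch of how a pipeline might pick it up (the file-store fallback mirrors the CLI default mentioned below and is an assumption, not part of this commit):

import os

import mlflow
from mlflow.tracking import MlflowClient

# MLflow reads MLFLOW_TRACKING_URI automatically; the explicit call just makes the fallback visible
mlflow.set_tracking_uri(os.environ.get("MLFLOW_TRACKING_URI", "file:///mlruns"))
client = MlflowClient()  # talks to http://mlflow:5000 inside the compose network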
@@ -35,15 +35,6 @@
logger = logging.getLogger("morpheus.{}".format(__name__))


def get_registered_models():
client = MlflowClient(os.environ.get('DFP_TRACKING_URI'))
models = client.list_registered_models()
return set(model.name for model in models)


REGISTERED_MODELS = get_registered_models()


class ModelCache:

def __init__(self, reg_model_name: str, model_uri: str) -> None:
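
The deleted helper snapshotted the registry once at module import, so models registered after startup would not be picked up without a restart. A lazy per-lookup query along these lines avoids that; this is only a sketch reusing the same MlflowClient API as the removed code, not the replacement actually introduced by this commit:

from mlflow.exceptions import MlflowException
from mlflow.tracking import MlflowClient


def model_exists(client: MlflowClient, reg_model_name: str) -> bool:
    # Ask the registry at call time instead of caching its contents at import
    try:
        client.get_registered_model(reg_model_name)
        return True
    except MlflowException:
        return False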
@@ -15,22 +15,19 @@
import logging
import time
import typing
from datetime import date
from datetime import datetime

import numpy as np
import srf
from srf.core import operators as ops

import cudf
from examples.dfp_workflow.morpheus.dfp.stages.multi_dfp_message import DFPMessageMeta

from morpheus.config import Config
from morpheus.messages.multi_ae_message import MultiAEMessage
from morpheus.pipeline.single_port_stage import SinglePortStage
from morpheus.pipeline.stream_pair import StreamPair

from .dfp_autoencoder import DFPAutoEncoder
from .multi_dfp_message import DFPMessageMeta

logger = logging.getLogger("morpheus.{}".format(__name__))

@@ -93,7 +90,7 @@ def on_data(self, message: MultiAEMessage):
if (extracted_events is None):
return None

return DFPMessageMeta(extracted_events, user_id=message.user_id)
return DFPMessageMeta(extracted_events, user_id=message.meta.user_id)

def _build_single(self, builder: srf.Builder, input_stream: StreamPair) -> StreamPair:

527 changes: 527 additions & 0 deletions examples/dfp_workflow/morpheus/dfp_morpheus_duo_inference.ipynb

Large diffs are not rendered by default.

236 changes: 218 additions & 18 deletions examples/dfp_workflow/morpheus/dfp_morpheus_duo_training.ipynb

Large diffs are not rendered by default.

4 changes: 2 additions & 2 deletions examples/dfp_workflow/morpheus/dfp_pipeline_duo.py
@@ -97,7 +97,7 @@
"'file:///mlruns' relative to the current directory"))
def run_pipeline(train_users, skip_user: typing.Tuple[str], duration, cache_dir, sample_rate_s, **kwargs):

source = "s3"
source = "file"

# To include the generic, we must be training all or generic
include_generic = train_users == "all" or train_users == "generic"
@@ -273,7 +273,7 @@ def s3_date_extractor_duo(s3_object):

pipeline.add_stage(MonitorStage(config, description="Inference rate", smoothing=0.001))

pipeline.add_stage(DFPPostprocessingStage(config, z_score_threshold=5.0))
pipeline.add_stage(DFPPostprocessingStage(config, z_score_threshold=3.0))

if (source == "file"):
pipeline.add_stage(WriteToFileStage(config, filename="dfp_detections.csv", overwrite=True))
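
Lowering z_score_threshold from 5.0 to 3.0 makes the postprocessing stage flag more events as anomalous. Conceptually the threshold is a cut on the anomaly z-score, roughly as in the sketch below; the column name mean_abs_z is illustrative and not taken from the stage's actual implementation:

import cudf


def filter_anomalies(df: cudf.DataFrame, z_score_threshold: float = 3.0) -> cudf.DataFrame:
    # Keep only rows whose anomaly z-score exceeds the threshold
    return df[df["mean_abs_z"] > z_score_threshold]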
