DFP pipeline module (#510)

- Morpheus modules implementation for DFP (Azure, Duo) pipeline training and preprocessing stages
- Added modules utility
- Moved `column_info.py` to `morpheus.utils`
- Added tests
- Updated digital fingerprinting production example README.md
- Added dask and distributed packages
- Updated DFP production examples notebooks

Authors:
  - Bhargav Suryadevara (https://github.com/bsuryadevara)
  - https://github.com/bsuryadev

Approvers:
  - Michael Demoret (https://github.com/mdemoret-nv)

URL: #510
bsuryadevara authored Jan 24, 2023
1 parent 3a13a03 commit 83479a5
Showing 41 changed files with 4,021 additions and 318 deletions.
172 changes: 165 additions & 7 deletions docs/source/developer_guide/guides/5_digital_fingerprinting.md
@@ -338,12 +338,12 @@ Other attributes which might be needed:
### Schema Definition

#### DataFrame Input Schema (`DataFrameInputSchema`)
-The `DataFrameInputSchema` ([examples/digital_fingerprinting/production/morpheus/dfp/utils/column_info.py](/examples/digital_fingerprinting/production/morpheus/dfp/utils/column_info.py)) class defines the schema specifying the columns to be included in the output `DataFrame`. Within the DFP pipeline there are two stages where pre-processing is performed, the `DFPFileToDataFrameStage` stage and the `DFPPreprocessingStage`. This decoupling of the pre-processing stages from the actual operations to be performed allows the schema to be user-defined in the pipeline and the stages to be re-used. It is up to the user to define the fields which will appear in the `DataFrame`. Any column in the input data that isn't specified in either the `column_info` or `preserve_columns` constructor arguments will not appear in the output. The exception to this is JSON fields, specified in the `json_columns` argument, which defines the JSON fields to be normalized.
+The `DataFrameInputSchema` ([morpheus/utils/column_info.py](/morpheus/utils/column_info.py)) class defines the schema specifying the columns to be included in the output `DataFrame`. Within the DFP pipeline there are two stages where pre-processing is performed, the `DFPFileToDataFrameStage` stage and the `DFPPreprocessingStage`. This decoupling of the pre-processing stages from the actual operations to be performed allows the schema to be user-defined in the pipeline and the stages to be re-used. It is up to the user to define the fields which will appear in the `DataFrame`. Any column in the input data that isn't specified in either the `column_info` or `preserve_columns` constructor arguments will not appear in the output. The exception to this is JSON fields, specified in the `json_columns` argument, which defines the JSON fields to be normalized.

It is important to note that the fields defined in `json_columns` are normalized prior to the processing of the fields in `column_info`, allowing for processing to be performed on fields nested in JSON columns. For example, say we had a JSON field named `event` containing a key named `timestamp`, which appears in the JSON data as an ISO 8601 formatted date string; we could ensure it is converted to a datetime object for downstream stages with the following:
```python
-from dfp.utils.column_info import DataFrameInputSchema
-from dfp.utils.column_info import DateTimeColumn
+from morpheus.utils.column_info import DataFrameInputSchema
+from morpheus.utils.column_info import DateTimeColumn
```
```python
schema = DataFrameInputSchema(
```
@@ -392,7 +392,7 @@ Subclass of `ColumnInfo`, adds the ability to also perform a rename.
#### Boolean Column (`BoolColumn`)
Subclass of `RenameColumn`, adds the ability to map a set of custom values to boolean values. For example, say we had a string input field containing one of five possible enum values: `OK`, `SUCCESS`, `DENIED`, `CANCELED` and `EXPIRED`; we could map these values into a single boolean field as:
```python
-from dfp.utils.column_info import BoolColumn
+from morpheus.utils.column_info import BoolColumn
```
```python
field = BoolColumn(name="result",
```
@@ -480,14 +480,14 @@ The `DFPFileBatcherStage` ([`examples/digital_fingerprinting/production/morpheus
| `start_time` | `datetime` | Optional, default=`None`. When not `None`, incoming data files will be filtered, excluding any files created prior to `start_time` |
| `end_time` | `datetime` | Optional, default=`None`. When not `None`, incoming data files will be filtered, excluding any files created after `end_time` |

-For situations where the creation date of the log file is encoded in the filename, the `date_extractor` in the [`examples/digital_fingerprinting/production/morpheus/dfp/utils/file_utils.py`](/examples/digital_fingerprinting/production/morpheus/dfp/utils/file_utils.py) module can be used. The `date_extractor` assumes that the timestamps are localized to UTC, and will need to have a regex pattern bound to it before being passed in as a parameter to `DFPFileBatcherStage`. The regex pattern will need to contain the following named groups: `year`, `month`, `day`, `hour`, `minute`, `second`, and optionally `microsecond`. In cases where the regular expression does not match, the `date_extractor` function will fall back to using the modified time of the file.
+For situations where the creation date of the log file is encoded in the filename, the `date_extractor` in the [`morpheus/utils/file_utils.py`](/morpheus/utils/file_utils.py) module can be used. The `date_extractor` assumes that the timestamps are localized to UTC, and will need to have a regex pattern bound to it before being passed in as a parameter to `DFPFileBatcherStage`. The regex pattern will need to contain the following named groups: `year`, `month`, `day`, `hour`, `minute`, `second`, and optionally `microsecond`. In cases where the regular expression does not match, the `date_extractor` function will fall back to using the modified time of the file.
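
For instance, a filename pattern with the required named groups might look like the following. This pattern is illustrative only, written for a hypothetical filename layout; it is not the shipped `iso_date_regex`:
```python
import re

# Hypothetical pattern matching filenames such as:
#   AZUREAD_2022-08-01T03_02_01.123456Z.json
filename_date_regex = re.compile(
    r"(?P<year>\d{4})-(?P<month>\d{2})-(?P<day>\d{2})"
    r"T(?P<hour>\d{2})_(?P<minute>\d{2})_(?P<second>\d{2})"
    r"(?:\.(?P<microsecond>\d{6}))?Z")
```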

For input files containing an ISO 8601 formatted date string, the `iso_date_regex` regex can be used, for example:
```python
from functools import partial

-from dfp.utils.file_utils import date_extractor
-from dfp.utils.file_utils import iso_date_regex
+from morpheus.utils.file_utils import date_extractor
+from dfp.utils.regex_utils import iso_date_regex
```
```python
# Batch files into buckets by time. Use the default ISO date extractor from the filename
```
@@ -648,3 +648,161 @@ This stage filters the output from the inference stage for any anomalous message

#### Post Processing Stage (`DFPPostprocessingStage`)
This stage adds a new `event_time` column to the DataFrame indicating the time at which Morpheus detected the anomalous messages, and replaces any `NaN` values with a string value of `'NaN'`.
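
Conceptually, the post-processing amounts to something like the following. This is a hedged sketch of the operation described above, not the stage's actual implementation:
```python
import pandas as pd

def post_process(df: pd.DataFrame, event_time: str) -> pd.DataFrame:
    # Record when Morpheus flagged these messages...
    df["event_time"] = event_time

    # ...and replace any NaN values with the literal string 'NaN'.
    return df.fillna("NaN")
```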

## Morpheus Pipeline with Modules

A module is a reusable unit of work that can be utilized within a Morpheus stage and registered with an MRC segment module registry. Modules are beneficial whenever a unit of work may be reused. Once registered, a module can be loaded into multiple contexts without any familiarity with its inner workings; all that is needed is to pass it an input and receive its output.

Let's first look at the module implementation structure before diving deeper into the DFP Training pipeline as a module.

> Note: Modules can be used for more than just creating intermediate nodes to connect sources and sinks; they can also be used to construct source and sink nodes themselves.
```py
import typing

import mrc
from mrc.core import operators as ops

from morpheus.utils.module_utils import get_module_config
from morpheus.utils.module_utils import register_module


@register_module("SimpleModule", "morpheus_modules")
def module_init(builder: mrc.Builder):

    module_id = "SimpleModule"

    config: typing.Dict[str, str] = get_module_config(module_id, builder)

    sep = config.get("sep", ",")

    def on_data(message: str):
        # Your implementation goes here...
        pass

    def node_fn(obs: mrc.Observable, sub: mrc.Subscriber):
        obs.pipe(ops.map(on_data), ops.filter(lambda x: x is not None)).subscribe(sub)

    # Here we are creating a node.
    node = builder.make_node_full(module_id, node_fn)

    # Register input and output port names for the module.
    builder.register_module_input("<input port name>", node)
    builder.register_module_output("<output port name>", node)
```

The `register_module` decorator on the module initializer function registers the module under the module ID `SimpleModule` in the `morpheus_modules` namespace.

> Note: When registering a module, the user chooses the input and output port names. Once the module has been registered, the same names must be used to obtain the input/output port connections.
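
In the structure above, `on_data` is left as a stub. Purely for illustration, a hypothetical body that uses the configured `sep` might look like this; it slots into `module_init` in place of the stub:
```py
def on_data(message: str):
    # Hypothetical implementation: drop empty messages and split the
    # rest on the configured separator.
    if not message:
        return None

    return message.split(sep)
```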

The required meta fields for a module configuration are shown below.
- `module_id` : Unique identifier for a module in the module registry.
- `module_name` : Specifies the module name.
- `namespace` : Namespace used to virtually group modules.

```py
# Module configuration
module_config = {
    "module_id": "SimpleModule",
    "module_name": "simple_module",
    "namespace": "morpheus_modules",
    "sep": ":"
}
```

A module must be wrapped in a stage, as illustrated below, in order to be used in a Morpheus pipeline.

```py
from morpheus.config import Config
from morpheus.stages.general.linear_modules_stage import LinearModulesStage

# Morpheus configuration
c = Config()

module_stage = LinearModulesStage(c,
                                  module_config,
                                  input_port_name="<input port name>",
                                  output_port_name="<output port name>",
                                  output_type="<module output type>")
```
[LinearModulesStage](/morpheus/stages/general/linear_modules_stage.py) is a utility stage that loads an existing, registered MRC `SegmentModule` and wraps it as a Morpheus `SinglePortStage`.

| Argument | Type | Description |
| -------- | ---- | ----------- |
| `c` | `morpheus.config.Config` | Morpheus config object |
| `module_config` | `dict` or `None` | Module configuration |
| `input_port_name` | `str` | Name of a module input port, as used during registration |
| `output_port_name` | `str` | Name of a module output port, as used during registration |
| `output_type` | `typing.Any` (default) | Module output data type |
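
As a usage sketch, the wrapped module can then be placed in a pipeline like any other stage, continuing the `c` and `module_stage` example above. The in-memory source and sink below are stand-ins chosen for illustration; any compatible stages will do:
```py
import cudf

from morpheus.pipeline import LinearPipeline
from morpheus.stages.input.in_memory_source_stage import InMemorySourceStage
from morpheus.stages.output.in_memory_sink_stage import InMemorySinkStage

pipeline = LinearPipeline(c)

# Feed the pipeline from an in-memory DataFrame (placeholder input data).
pipeline.set_source(InMemorySourceStage(c, dataframes=[cudf.DataFrame({"data": ["a:b", "c:d"]})]))

# The module wrapped as a stage, as constructed above.
pipeline.add_stage(module_stage)

# Collect the output in memory for inspection.
sink = pipeline.add_stage(InMemorySinkStage(c))

pipeline.run()
```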


A module can serve as a wrapper for a chain of complex constructs containing child modules. The example below demonstrates how to build a chained module, presuming `module_1` through `module_n` are already registered.

```py
import typing

import mrc

from morpheus.utils.module_utils import get_module_config
from morpheus.utils.module_utils import load_module
from morpheus.utils.module_utils import register_module


@register_module("ChainModule", "morpheus_modules")
def chain_module(builder: mrc.Builder):

    module_id = "ChainModule"

    config: typing.Dict[str, str] = get_module_config(module_id, builder)

    # Get the child module configurations
    module_1_config = config.get("module_1", None)
    ...
    module_n_config = config.get("module_n", None)

    # Load the child modules from the registry
    module_1 = load_module(module_1_config, builder=builder)
    ...
    module_n = load_module(module_n_config, builder=builder)

    # Make an edge between the modules, using the input/output port names chosen at
    # 'builder.register_module_input' and 'builder.register_module_output' time.
    builder.make_edge(module_1.output_port("<output port name>"), module_2.input_port("<input port name>"))
    ...
    builder.make_edge(module_n_minus_1.output_port("<output port name>"), module_n.input_port("<input port name>"))

    # Register the input and output ports for the chained module.
    builder.register_module_input("<your input port name>", module_1.input_port("<input port name>"))
    builder.register_module_output("<your output port name>", module_n.output_port("<output port name>"))
```
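
The configuration for such a chained module nests one configuration dictionary per child, keyed by the names the parent looks up (`module_1` through `module_n`). A sketch follows; the key names and values are illustrative only:
```py
chain_module_config = {
    "module_id": "ChainModule",
    "module_name": "chain_module",
    "namespace": "morpheus_modules",
    "module_1": {
        "module_id": "SimpleModule",
        "module_name": "simple_module_1",
        "namespace": "morpheus_modules",
        "sep": ":"
    },
    # ... one nested entry per child module, up to "module_n"
}
```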

Let's look at the module-based DFP training process. At a high level, the complete DFP training process has been divided into two modules.

* [**DFPPreprocessing**](/examples/digital_fingerprinting/production/morpheus/dfp/modules/dfp_preprocessing.py)

This module constructs a chained module by combining the separate modules listed below into a single module that contains all of the internal components for preprocessing Azure Active Directory and Duo Authentication logs.
- [FileBatcher](/morpheus/modules/file_batcher.py)
- [FileToDF](/morpheus/modules/file_to_df.py)
- [DFPSplitUsers](/examples/digital_fingerprinting/production/morpheus/dfp/modules/dfp_split_users.py)
- [DFPRollingWindow](/examples/digital_fingerprinting/production/morpheus/dfp/modules/dfp_rolling_window.py)
- [DFPDataPrep](/examples/digital_fingerprinting/production/morpheus/dfp/modules/dfp_data_prep.py)

* [**DFPModelTrainDeploy**](/examples/digital_fingerprinting/production/morpheus/dfp/modules/dfp_model_train_deploy.py)

This module creates a chained module by integrating the individual modules described below into a single module that incorporates all of the internals for consuming preprocessed data, training the model, and writing the trained model to an MLflow server to serve inference requests.
- [DFPTraining](/examples/digital_fingerprinting/production/morpheus/dfp/modules/dfp_training.py)
- [MLFlowModelWriter](/morpheus/modules/mlflow_model_writer.py)


#### Run DFP Training Pipeline with Modules
To run the DFP pipelines using modules with the example datasets, run the following within the container:

* Duo Training Pipeline
```bash
python dfp_duo_modules_pipeline.py --train_users=all --start_time="2022-08-01" --input_file="/workspace/examples/data/dfp/duo-training-data/*.json"
```
* Azure Training Pipeline
```bash
python dfp_azure_modules_pipeline.py --train_users=all --start_time="2022-08-01" --input_file="/workspace/examples/data/dfp/azure-training-data/*.json"
```
1 change: 1 addition & 0 deletions examples/digital_fingerprinting/production/conda_env.yml
@@ -25,6 +25,7 @@ dependencies:
   - boto3
   - dask
   - dill
+  - distributed
   - kfp
   - librdkafka
   - mlflow>1.29.0,<2
@@ -0,0 +1,13 @@
# Copyright (c) 2022-2023, NVIDIA CORPORATION.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
@@ -0,0 +1,82 @@
# Copyright (c) 2022-2023, NVIDIA CORPORATION.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.

import logging
import pickle
import time

import mrc
from mrc.core import operators as ops

from morpheus.utils.column_info import process_dataframe
from morpheus.utils.module_ids import MODULE_NAMESPACE
from morpheus.utils.module_utils import get_module_config
from morpheus.utils.module_utils import register_module

from ..messages.multi_dfp_message import MultiDFPMessage
from ..utils.module_ids import DFP_DATA_PREP

logger = logging.getLogger(__name__)


@register_module(DFP_DATA_PREP, MODULE_NAMESPACE)
def dfp_data_prep(builder: mrc.Builder):
    """
    This module function prepares data for either inference or model training.

    Parameters
    ----------
    builder : mrc.Builder
        Pipeline builder instance.
    """

    config = get_module_config(DFP_DATA_PREP, builder)

    schema_config = config.get("schema", None)
    schema_str = schema_config.get("schema_str", None)
    encoding = schema_config.get("encoding", None)
    timestamp_column_name = config.get("timestamp_column_name", None)

    schema = pickle.loads(bytes(schema_str, encoding))

    def process_features(message: MultiDFPMessage):
        if (message is None):
            return None

        start_time = time.time()

        # Process the columns
        df_processed = process_dataframe(message.get_meta_dataframe(), schema)

        # Apply the new dataframe, only the rows in the offset
        message.set_meta_dataframe(list(df_processed.columns), df_processed)

        if logger.isEnabledFor(logging.DEBUG):
            duration = (time.time() - start_time) * 1000.0

            logger.debug("Preprocessed %s data for logs in %s to %s in %s ms",
                         message.mess_count,
                         message.get_meta(timestamp_column_name).min(),
                         message.get_meta(timestamp_column_name).max(),
                         duration)

        return message

    def node_fn(obs: mrc.Observable, sub: mrc.Subscriber):
        obs.pipe(ops.map(process_features)).subscribe(sub)

    node = builder.make_node_full(DFP_DATA_PREP, node_fn)

    builder.register_module_input("input", node)
    builder.register_module_output("output", node)
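
For context on the `schema_str`/`encoding` pair above: the schema is evidently pickled and decoded into a string on the producer side before being placed in the module configuration. A hedged sketch of how a caller might build that configuration follows; the column definition and encoding choice here are assumptions for illustration:
```python
import pickle

from morpheus.utils.column_info import ColumnInfo
from morpheus.utils.column_info import DataFrameInputSchema

schema = DataFrameInputSchema(column_info=[ColumnInfo(name="username", dtype=str)])

# Pickle the schema, then decode the bytes with a byte-preserving encoding
# so it can travel inside a plain dict config.
encoding = "latin1"
schema_str = str(pickle.dumps(schema), encoding=encoding)

data_prep_config = {
    "module_id": "DFPDataPrep",
    "module_name": "dfp_data_prep",
    "namespace": "morpheus_modules",
    "timestamp_column_name": "timestamp",
    "schema": {
        "schema_str": schema_str,
        "encoding": encoding
    }
}
```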
@@ -0,0 +1,58 @@
# Copyright (c) 2022-2023, NVIDIA CORPORATION.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.

import logging

import dfp.modules.dfp_training # noqa: F401
import mrc

import morpheus.modules.mlflow_model_writer # noqa: F401
from morpheus.utils.module_ids import MLFLOW_MODEL_WRITER
from morpheus.utils.module_ids import MODULE_NAMESPACE
from morpheus.utils.module_utils import get_module_config
from morpheus.utils.module_utils import load_module
from morpheus.utils.module_utils import register_module

from ..utils.module_ids import DFP_MODEL_TRAIN_DEPLOY
from ..utils.module_ids import DFP_TRAINING

logger = logging.getLogger(__name__)


@register_module(DFP_MODEL_TRAIN_DEPLOY, MODULE_NAMESPACE)
def dfp_model_train_deploy(builder: mrc.Builder):
    """
    This module function allows for the consolidation of multiple dfp training and mlflow model deployment modules into
    a single module.

    Parameters
    ----------
    builder : mrc.Builder
        Pipeline builder instance.
    """

    config = get_module_config(DFP_MODEL_TRAIN_DEPLOY, builder)

    dfp_training_conf = config.get(DFP_TRAINING, None)
    mlflow_model_writer_conf = config.get(MLFLOW_MODEL_WRITER, None)

    dfp_training_module = load_module(dfp_training_conf, builder=builder)
    mlflow_model_writer_module = load_module(mlflow_model_writer_conf, builder=builder)

    # Make an edge between the modules.
    builder.make_edge(dfp_training_module.output_port("output"), mlflow_model_writer_module.input_port("input"))

    # Register input and output port for a module.
    builder.register_module_input("input", dfp_training_module.input_port("input"))
    builder.register_module_output("output", mlflow_model_writer_module.output_port("output"))
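
For reference, the configuration this chained module consumes nests one entry per child, keyed by the module ID constants (`DFP_TRAINING`, `MLFLOW_MODEL_WRITER`). The string values and nested settings in this sketch are assumptions for illustration:
```python
model_train_deploy_config = {
    "module_id": "DFPModelTrainDeploy",
    "module_name": "dfp_model_train_deploy",
    "namespace": "morpheus_modules",
    "DFPTraining": {
        "module_id": "DFPTraining",
        "module_name": "dfp_training",
        "namespace": "morpheus_modules",
        # model / feature settings would go here
    },
    "MLFlowModelWriter": {
        "module_id": "MLFlowModelWriter",
        "module_name": "mlflow_model_writer",
        "namespace": "morpheus_modules",
        # MLflow experiment / registry settings would go here
    }
}
```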