nlp_si_detection example improvements #193

Merged: 7 commits, Jun 27, 2022

Changes from 5 commits
examples/nlp_si_detection/README.md (54 additions, 18 deletions)

@@ -104,7 +104,7 @@ The following command line is the entire command to build and launch the pipeline
```bash
export MORPHEUS_ROOT=../..
# Launch Morpheus printing debug messages
-morpheus --debug --log_level=DEBUG \
+morpheus --log_level=DEBUG \
`# Run a pipeline with 8 threads and a model batch size of 32 (Must match Triton config)` \
run --num_threads=8 --pipeline_batch_size=1024 --model_max_batch_size=32 \
`# Specify a NLP pipeline with 256 sequence length (Must match Triton config)` \
@@ -116,16 +116,14 @@ morpheus --debug --log_level=DEBUG \
`# 3rd Stage: Preprocessing converts the input data into BERT tokens` \
preprocess --vocab_hash_file=$MORPHEUS_ROOT/morpheus/data/bert-base-uncased-hash.txt --do_lower_case=True --truncation=True \
`# 4th Stage: Send messages to Triton for inference. Specify the model loaded in Setup` \
-inf-triton --model_name=sid-minibert-onnx --server_url=localhost:8001 --force_convert_inputs=True \
+inf-triton --model_name=sid-minibert-onnx --server_url=localhost:8000 --force_convert_inputs=True \
`# 5th Stage: Monitor stage prints throughput information to the console` \
monitor --description "Inference Rate" --smoothing=0.001 --unit inf \
`# 6th Stage: Add results from inference to the messages` \
add-class \
-`# 7th Stage: Filtering removes any messages that did not detect SI` \
-filter \
-`# 8th Stage: Convert from objects back into strings` \
+`# 7th Stage: Convert from objects back into strings` \
serialize --exclude '^_ts_' \
-`# 9th Stage: Write out the JSON lines to the detections.jsonlines file` \
+`# 8th Stage: Write out the JSON lines to the detections.jsonlines file` \
to-file --filename=detections.jsonlines --overwrite
```
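For readers who prefer the Python API to the CLI, below is a minimal sketch of the same eight-stage pipeline. It assumes the stage classes and import paths used by the Morpheus 22.06 Python examples; the paths, server URL, and class labels mirror the command above and should be treated as placeholders for your environment.

```python
# Minimal sketch of the CLI pipeline above using the Morpheus Python API.
# Assumes Morpheus 22.06 import paths; adjust paths and URLs for your setup.
import os

from morpheus.config import Config, PipelineModes
from morpheus.pipeline import LinearPipeline
from morpheus.stages.general.monitor_stage import MonitorStage
from morpheus.stages.inference.triton_inference_stage import TritonInferenceStage
from morpheus.stages.input.file_source_stage import FileSourceStage
from morpheus.stages.output.write_to_file_stage import WriteToFileStage
from morpheus.stages.postprocess.add_classifications_stage import AddClassificationsStage
from morpheus.stages.postprocess.serialize_stage import SerializeStage
from morpheus.stages.preprocess.deserialize_stage import DeserializeStage
from morpheus.stages.preprocess.preprocess_nlp_stage import PreprocessNLPStage

morpheus_root = os.environ.get("MORPHEUS_ROOT", "../..")

config = Config()
config.mode = PipelineModes.NLP
config.num_threads = 8
config.pipeline_batch_size = 1024
config.model_max_batch_size = 32  # must match the Triton config
config.feature_length = 256       # sequence length; must match the Triton config
config.class_labels = ["address", "bank_acct", "credit_card", "email", "govt_id",
                       "name", "password", "phone_num", "secret_keys", "user"]

pipeline = LinearPipeline(config)
# 1st stage: read the PCAP dump from disk
pipeline.set_source(FileSourceStage(config, filename=f"{morpheus_root}/examples/data/pcap_dump.jsonlines"))
# 2nd stage: deserialize the raw JSON lines into messages
pipeline.add_stage(DeserializeStage(config))
# 3rd stage: convert the input data into BERT tokens
pipeline.add_stage(PreprocessNLPStage(config,
                                      vocab_hash_file=f"{morpheus_root}/morpheus/data/bert-base-uncased-hash.txt",
                                      do_lower_case=True,
                                      truncation=True))
# 4th stage: send messages to Triton for inference
pipeline.add_stage(TritonInferenceStage(config,
                                        model_name="sid-minibert-onnx",
                                        server_url="localhost:8000",
                                        force_convert_inputs=True))
# 5th stage: print throughput information to the console
pipeline.add_stage(MonitorStage(config, description="Inference Rate", smoothing=0.001, unit="inf"))
# 6th stage: add classification results to the messages
pipeline.add_stage(AddClassificationsStage(config))
# 7th stage: convert from objects back into strings
pipeline.add_stage(SerializeStage(config, exclude=["^_ts_"]))
# 8th stage: write the JSON lines to detections.jsonlines
pipeline.add_stage(WriteToFileStage(config, filename="detections.jsonlines", overwrite=True))

pipeline.run()
```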

@@ -150,7 +148,7 @@ Config:
"secret_keys",
"user"
],
"debug": true,
"debug": false,
"edge_buffer_size": 128,
"feature_length": 256,
"fil": null,
@@ -165,29 +163,67 @@ CPP Enabled: True
====Registering Pipeline====
====Registering Pipeline Complete!====
====Starting Pipeline====
-====Pipeline Started====
====Building Pipeline====
-Added source: <from-file-0; FileSourceStage(filename=/home/dagardner/work/examples/data/pcap_dump.jsonlines, iterative=False, file_type=FileTypes.Auto, repeat=1, filter_null=True, cudf_kwargs=None)>
+Added source: <from-file-0; FileSourceStage(filename=/home/dagardner/work/morpheus/examples/data/pcap_dump.jsonlines, iterative=False, file_type=FileTypes.Auto, repeat=1, filter_null=True, cudf_kwargs=None)>
└─> morpheus.MessageMeta
Added stage: <deserialize-1; DeserializeStage()>
└─ morpheus.MessageMeta -> morpheus.MultiMessage
-Added stage: <preprocess-nlp-2; PreprocessNLPStage(vocab_hash_file=/home/dagardner/work/morpheus/data/bert-base-uncased-hash.txt, truncation=True, do_lower_case=True, add_special_tokens=False, stride=-1)>
+Added stage: <preprocess-nlp-2; PreprocessNLPStage(vocab_hash_file=/home/dagardner/work/morpheus/morpheus/data/bert-base-uncased-hash.txt, truncation=True, do_lower_case=True, add_special_tokens=False, stride=-1)>
└─ morpheus.MultiMessage -> morpheus.MultiInferenceNLPMessage
-Added stage: <inference-3; TritonInferenceStage(model_name=sid-minibert-onnx, server_url=localhost:8001, force_convert_inputs=True, use_shared_memory=False)>
+Added stage: <inference-3; TritonInferenceStage(model_name=sid-minibert-onnx, server_url=localhost:8000, force_convert_inputs=True, use_shared_memory=False)>
└─ morpheus.MultiInferenceNLPMessage -> morpheus.MultiResponseProbsMessage
Added stage: <monitor-4; MonitorStage(description=Inference Rate, smoothing=0.001, unit=inf, delayed_start=False, determine_count_fn=None)>
└─ morpheus.MultiResponseProbsMessage -> morpheus.MultiResponseProbsMessage
Added stage: <add-class-5; AddClassificationsStage(threshold=0.5, labels=[], prefix=)>
└─ morpheus.MultiResponseProbsMessage -> morpheus.MultiResponseProbsMessage
-Added stage: <filter-6; FilterDetectionsStage(threshold=0.5)>
-└─ morpheus.MultiResponseProbsMessage -> morpheus.MultiResponseProbsMessage
-Added stage: <serialize-7; SerializeStage(include=[], exclude=['^_ts_'], fixed_columns=True)>
+Added stage: <serialize-6; SerializeStage(include=[], exclude=['^_ts_'], fixed_columns=True)>
└─ morpheus.MultiResponseProbsMessage -> morpheus.MessageMeta
-Added stage: <to-file-8; WriteToFileStage(filename=detections.jsonlines, overwrite=True, file_type=FileTypes.Auto)>
+Added stage: <to-file-7; WriteToFileStage(filename=detections.jsonlines, overwrite=True, file_type=FileTypes.Auto)>
└─ morpheus.MessageMeta -> morpheus.MessageMeta
====Building Pipeline Complete!====
-Starting! Time: 1651079123.1867409
-Inference Rate[Complete]: 93085inf [00:06, 153.30inf/s]
+====Pipeline Started====
+Starting! Time: 1656106589.013337
+Inference Rate[Complete]: 93085inf [00:56, 1637.89inf/s]
====Pipeline Complete====
```

-The output file `detections.jsonlines` will contain PCAP messages that contain some SI (any class with a predection greater that 0.5).
+The output file `detections.jsonlines` will contain the original PCAP messages with the following additional fields added:
+* address
+* bank_acct
+* credit_card
+* email
+* govt_id
+* name
+* password
+* phone_num
+* secret_keys
+* user
+
+The value for these fields will be either a `1`, indicating a detection, or a `0`, indicating no detection. An example row with a detection looks like:
+```json
+{
+  "timestamp": 1616381019580,
+  "host_ip": "10.188.40.56",
+  "data_len": "129",
+  "data": "\"{\\\"X-Postmark-Server-Token\\\": \\\"76904958 O7FWqd9p TzIBfSYk\\\"}\"",
+  "src_mac": "04:3f:72:bf:af:74",
+  "dest_mac": "b4:a9:fc:3c:46:f8",
+  "protocol": "6",
+  "src_ip": "10.20.16.248",
+  "dest_ip": "10.244.0.60",
+  "src_port": "51374",
+  "dest_port": "80",
+  "flags": "24",
+  "is_pii": false,
+  "address": 0,
+  "bank_acct": 0,
+  "credit_card": 0,
+  "email": 0,
+  "govt_id": 0,
+  "name": 0,
+  "password": 0,
+  "phone_num": 0,
+  "secret_keys": 1,
+  "user": 0
+}
+```
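A quick way to sanity-check the output file is to load it with any JSON-lines reader; the sketch below uses pandas (an assumption of convenience, the example itself does not depend on it).

```python
# Sketch: inspect detections.jsonlines with pandas (assumed available).
import pandas as pd

detections = pd.read_json("detections.jsonlines", lines=True)

# Each SI field is 1 for a detection and 0 otherwise; tally detections per class.
si_columns = ["address", "bank_acct", "credit_card", "email", "govt_id",
              "name", "password", "phone_num", "secret_keys", "user"]
print(detections[si_columns].sum())

# Show the payloads that leaked secret keys.
print(detections.loc[detections["secret_keys"] == 1, "data"].head())
```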
morpheus/_lib/src/python_modules/messages.cpp (18 additions, 0 deletions)

@@ -30,6 +30,8 @@
#include <morpheus/objects/tensor.hpp>
#include <morpheus/utilities/cudf_util.hpp>

+#include <srf/node/edge_connector.hpp>
+
#include <pybind11/cast.h>
#include <pybind11/functional.h> // IWYU pragma: keep
#include <pybind11/pybind11.h>
@@ -68,6 +70,22 @@ PYBIND11_MODULE(messages, m)
    // Allows python objects to keep DataTable objects alive
    py::class_<IDataTable, std::shared_ptr<IDataTable>>(m, "DataTable");

+    // EdgeConnectors for derived classes of MultiMessage to MultiMessage
+    srf::node::EdgeConnector<std::shared_ptr<morpheus::MultiInferenceMessage>,
+                             std::shared_ptr<morpheus::MultiMessage>>::register_converter();
+
+    srf::node::EdgeConnector<std::shared_ptr<morpheus::MultiInferenceFILMessage>,
+                             std::shared_ptr<morpheus::MultiMessage>>::register_converter();
+
+    srf::node::EdgeConnector<std::shared_ptr<morpheus::MultiInferenceNLPMessage>,
+                             std::shared_ptr<morpheus::MultiMessage>>::register_converter();
+
+    srf::node::EdgeConnector<std::shared_ptr<morpheus::MultiResponseMessage>,
+                             std::shared_ptr<morpheus::MultiMessage>>::register_converter();
+
+    srf::node::EdgeConnector<std::shared_ptr<morpheus::MultiResponseProbsMessage>,
+                             std::shared_ptr<morpheus::MultiMessage>>::register_converter();
+
    py::class_<MessageMeta, std::shared_ptr<MessageMeta>>(m, "MessageMeta")
        .def(py::init<>(&MessageMetaInterfaceProxy::init_python), py::arg("df"))
        .def_property_readonly("count", &MessageMetaInterfaceProxy::count)
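For context, the practical effect of the `EdgeConnector` registrations above is that, when running with the C++ implementation, an upstream port emitting a derived message type (e.g. `MultiResponseProbsMessage`) can be wired to a downstream port that expects the base `MultiMessage`. The toy Python sketch below mimics that converter registry purely for illustration; it is not part of the SRF or Morpheus API.

```python
# Toy analogue of srf::node::EdgeConnector::register_converter(): a registry of
# (source_type, sink_type) -> converter consulted when two differently-typed
# ports are connected. Illustrative only; the real mechanism is C++.
from typing import Callable, Dict, Tuple


class MultiMessage: ...
class MultiInferenceMessage(MultiMessage): ...
class MultiResponseProbsMessage(MultiMessage): ...


_converters: Dict[Tuple[type, type], Callable] = {}


def register_converter(src: type, sink: type) -> None:
    # The C++ converter re-wraps the shared_ptr as the base type; an identity
    # function suffices here because Python subclassing already permits it.
    _converters[(src, sink)] = lambda msg: msg


def connect(src: type, sink: type, msg):
    if src is sink or (src, sink) in _converters:
        return _converters.get((src, sink), lambda m: m)(msg)
    raise TypeError(f"no edge converter registered for {src.__name__} -> {sink.__name__}")


# Mirror two of the registrations made in messages.cpp:
register_converter(MultiInferenceMessage, MultiMessage)
register_converter(MultiResponseProbsMessage, MultiMessage)

msg = MultiResponseProbsMessage()
connect(MultiResponseProbsMessage, MultiMessage, msg)   # succeeds
# connect(MultiMessage, MultiInferenceMessage, msg)     # would raise TypeError
```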