From cef498ab6fe4c25d0b18190a358b0e72333163a7 Mon Sep 17 00:00:00 2001 From: David Gardner <96306125+dagardner-nv@users.noreply.github.com> Date: Mon, 15 May 2023 08:49:41 -0700 Subject: [PATCH] Tests for gnn_fraud_detection_pipeline & ransomware_detection (#904) * Renamed `examples/gnn_fraud_detection_pipeline/requirements.yml` to `docker/conda/environments/cuda11.8_examples.yml`, replacing the original with a symlink. * Test stage includes additional packages needed for gnn_fraud_detection_pipeline & ransomware_detection pipelines * Locally pytest will skip these if the deps are missing * Add a new `--fail_missing` flag which will cause tests to fail instead of skip on missing deps * Remove unused redundant apt install of nodejs & npm * Use `openjdk-11-jre-headless` for Kafka instead of `openjdk-11-jdk`, removing un-needed deps from the image like alsa and GL. gnn_fraud_detection_pipeline changes: * Perform pre-allocation of needed columns in `ClassificationStage` * Replace deprecated usage of `StellarGraph` constructor in `FraudGraphConstructionStage` with `StellarGraph.from_networkx` * Work-around Stellargraph/Python 3.10 incompatibility fixes #907 ransomware_detection changes: * Explicitly exclude `ldrmodules_df_path` from `model_features` * Document that C++ execution is currently unsupported * Move nested methods in `CreateFeaturesRWStage` to methods on the class allowing them to be tested * Perform pre-allocation of needed columns in `PreprocessingRWStage` * Update dependencies for training script, and update due to API changes in Tensorflow Authors: - David Gardner (https://github.com/dagardner-nv) Approvers: - Michael Demoret (https://github.com/mdemoret-nv) URL: https://github.com/nv-morpheus/Morpheus/pull/904 --- .github/workflows/ci_pipe.yml | 3 + .github/workflows/pull_request.yml | 4 +- ci/runner/Dockerfile | 19 +- ci/scripts/github/common.sh | 64 +++-- ci/scripts/github/docs.sh | 3 - ci/scripts/github/test.sh | 2 +- 
.../conda/environments/cuda11.8_examples.yml | 34 +++ docs/conda_docs.yml | 9 +- docs/requirements.txt | 22 -- .../gnn_fraud_detection_pipeline/README.md | 3 + .../requirements.yml | 4 +- .../stages/classification_stage.py | 6 +- .../stages/graph_construction_stage.py | 2 +- .../config/ransomware_detection.yaml | 9 +- examples/ransomware_detection/run.py | 10 +- .../stages/create_features.py | 105 ++++---- .../stages/preprocessing.py | 8 +- .../fraud-detection-models/requirements.txt | 10 +- .../fraud-detection-models/training.py | 2 +- .../stages/input/appshield_source_stage.py | 2 +- pyproject.toml | 3 +- tests/conftest.py | 233 ++++++++++-------- .../gnn_fraud_detection_pipeline/conftest.py | 140 +++++++++++ .../test_classification_stage.py | 68 +++++ .../test_graph_construction_stage.py | 119 +++++++++ .../test_graph_sage_stage.py | 103 ++++++++ .../log_parsing/test_postprocessing.py | 2 +- .../examples/ransomware_detection/conftest.py | 76 ++++++ .../test_create_features.py | 198 +++++++++++++++ .../test_preprocessing.py | 164 ++++++++++++ tests/test_add_classifications_stage.py | 12 +- tests/test_add_scores_stage.py | 12 +- tests/test_deserialize_stage_pipe.py | 2 +- tests/test_file_in_out.py | 2 +- tests/test_message_meta.py | 4 +- tests/test_multi_message.py | 54 ++-- tests/test_write_to_kafka_stage_pipe.py | 9 +- .../inductive_emb.csv | 3 + .../predictions.csv | 3 + .../ransomware_detection/dask_results.csv | 3 + tests/utils/__init__.py | 18 ++ tests/utils/dataset_manager.py | 46 +++- 42 files changed, 1299 insertions(+), 296 deletions(-) create mode 100644 docker/conda/environments/cuda11.8_examples.yml delete mode 100644 docs/requirements.txt create mode 100644 tests/examples/gnn_fraud_detection_pipeline/conftest.py create mode 100644 tests/examples/gnn_fraud_detection_pipeline/test_classification_stage.py create mode 100644 tests/examples/gnn_fraud_detection_pipeline/test_graph_construction_stage.py create mode 100644 
tests/examples/gnn_fraud_detection_pipeline/test_graph_sage_stage.py create mode 100644 tests/examples/ransomware_detection/conftest.py create mode 100644 tests/examples/ransomware_detection/test_create_features.py create mode 100644 tests/examples/ransomware_detection/test_preprocessing.py create mode 100644 tests/tests_data/examples/gnn_fraud_detection_pipeline/inductive_emb.csv create mode 100644 tests/tests_data/examples/gnn_fraud_detection_pipeline/predictions.csv create mode 100644 tests/tests_data/examples/ransomware_detection/dask_results.csv diff --git a/.github/workflows/ci_pipe.yml b/.github/workflows/ci_pipe.yml index 45f35f0500..7a31b2f399 100644 --- a/.github/workflows/ci_pipe.yml +++ b/.github/workflows/ci_pipe.yml @@ -133,6 +133,7 @@ jobs: env: NVIDIA_VISIBLE_DEVICES: ${{ env.NVIDIA_VISIBLE_DEVICES }} PARALLEL_LEVEL: '10' + MERGE_EXAMPLES_YAML: '1' strategy: fail-fast: true @@ -164,6 +165,8 @@ jobs: username: '$oauthtoken' password: ${{ secrets.NGC_API_KEY }} image: ${{ inputs.container }} + env: + MERGE_DOCS_YAML: '1' strategy: fail-fast: true diff --git a/.github/workflows/pull_request.yml b/.github/workflows/pull_request.yml index 34d2b58bcf..4698370584 100644 --- a/.github/workflows/pull_request.yml +++ b/.github/workflows/pull_request.yml @@ -46,7 +46,7 @@ jobs: uses: ./.github/workflows/ci_pipe.yml with: run_check: ${{ startsWith(github.ref_name, 'pull-request/') }} - container: nvcr.io/ea-nvidia-morpheus/morpheus:morpheus-ci-build-230414 - test_container: nvcr.io/ea-nvidia-morpheus/morpheus:morpheus-ci-test-230414 + container: nvcr.io/ea-nvidia-morpheus/morpheus:morpheus-ci-build-230510 + test_container: nvcr.io/ea-nvidia-morpheus/morpheus:morpheus-ci-test-230510 secrets: NGC_API_KEY: ${{ secrets.NGC_API_KEY }} diff --git a/ci/runner/Dockerfile b/ci/runner/Dockerfile index fe3181ebb9..372b80cd4e 100644 --- a/ci/runner/Dockerfile +++ b/ci/runner/Dockerfile @@ -45,6 +45,7 @@ RUN apt update && \ COPY ./docker/conda/environments/* /tmp/conda/ RUN 
CONDA_ALWAYS_YES=true /opt/conda/bin/mamba env create -n ${PROJ_NAME} -q --file /tmp/conda/cuda${CUDA_SHORT_VER}_dev.yml && \ + /opt/conda/bin/mamba install -n morpheus -c conda-forge "conda-merge>=0.2" && \ sed -i "s/conda activate base/conda activate ${PROJ_NAME}/g" ~/.bashrc && \ conda clean -afy && \ rm -rf /tmp/conda @@ -68,25 +69,33 @@ RUN apt update && \ apt clean && \ rm -rf /var/lib/apt/lists/* - # ============ test ================== FROM base as test # Add any test only dependencies here. ARG PROJ_NAME +ARG CUDA_SHORT_VER RUN apt update && \ DEBIAN_FRONTEND=noninteractive TZ=Etc/UTC \ apt install --no-install-recommends -y \ - nodejs \ - npm \ - openjdk-11-jdk && \ + openjdk-11-jre-headless && \ apt clean && \ rm -rf /var/lib/apt/lists/* +COPY ./docker/conda/environments/cuda${CUDA_SHORT_VER}_examples.yml /tmp/conda/cuda${CUDA_SHORT_VER}_examples.yml + +# Install extra deps needed for gnn_fraud_detection_pipeline & ransomware_detection examples +RUN CONDA_ALWAYS_YES=true /opt/conda/bin/mamba env update -n ${PROJ_NAME} -q --file /tmp/conda/cuda${CUDA_SHORT_VER}_examples.yml && \ + conda clean -afy && \ + source activate ${PROJ_NAME} && \ + pip install --ignore-requires-python stellargraph==1.2.1 && \ + rm -rf /tmp/conda + # Install camouflage needed for unittests to mock a triton server -RUN npm install -g camouflage-server@0.9 && \ +RUN source activate ${PROJ_NAME} && \ + npm install -g camouflage-server@0.9 && \ npm cache clean --force # Install pytest-kafka diff --git a/ci/scripts/github/common.sh b/ci/scripts/github/common.sh index 5e88ca862c..cc431abcf3 100644 --- a/ci/scripts/github/common.sh +++ b/ci/scripts/github/common.sh @@ -60,6 +60,11 @@ export SCCACHE_REGION="us-east-2" export SCCACHE_IDLE_TIMEOUT=32768 #export SCCACHE_LOG=debug +export CONDA_ENV_YML=${MORPHEUS_ROOT}/docker/conda/environments/cuda${CUDA_VER}_dev.yml +export CONDA_EXAMPLES_YML=${MORPHEUS_ROOT}/docker/conda/environments/cuda${CUDA_VER}_examples.yml +export 
CONDA_DOCS_YML=${MORPHEUS_ROOT}/docs/conda_docs.yml +export PIP_REQUIREMENTS=${MORPHEUS_ROOT}/docker/conda/environments/requirements.txt + export CMAKE_BUILD_ALL_FEATURES="-DCMAKE_MESSAGE_CONTEXT_SHOW=ON -DMORPHEUS_CUDA_ARCHITECTURES=60;70;75;80 -DMORPHEUS_BUILD_BENCHMARKS=ON -DMORPHEUS_BUILD_EXAMPLES=ON -DMORPHEUS_BUILD_TESTS=ON -DMORPHEUS_USE_CONDA=ON -DMORPHEUS_PYTHON_INPLACE_BUILD=OFF -DMORPHEUS_PYTHON_BUILD_STUBS=ON -DMORPHEUS_USE_CCACHE=ON" export FETCH_STATUS=0 @@ -67,19 +72,38 @@ export FETCH_STATUS=0 print_env_vars function update_conda_env() { - rapids-logger "Checking for updates to conda env" - # Deactivate the environment first before updating conda deactivate - # Update the packages with --prune to remove any extra packages - rapids-mamba-retry env update -n morpheus --prune -q --file ${MORPHEUS_ROOT}/docker/conda/environments/cuda${CUDA_VER}_dev.yml + ENV_YAML=${CONDA_ENV_YML} + if [[ "${MERGE_EXAMPLES_YAML}" == "1" || "${MERGE_DOCS_YAML}" == "1" ]]; then + # Merge the dev, docs and examples envs, otherwise --prune will remove the examples packages + ENV_YAML=${condatmpdir}/merged_env.yml + YAMLS="${CONDA_ENV_YML}" + if [[ "${MERGE_EXAMPLES_YAML}" == "1" ]]; then + YAMLS="${YAMLS} ${CONDA_EXAMPLES_YML}" + fi + if [[ "${MERGE_DOCS_YAML}" == "1" ]]; then + YAMLS="${YAMLS} ${CONDA_DOCS_YML}" + fi + + # Conda is going to expect a requirements.txt file to be in the same directory as the env yaml + cp ${PIP_REQUIREMENTS} ${condatmpdir}/requirements.txt + + rapids-logger "Merging conda envs: ${YAMLS}" + conda run -n morpheus --live-stream conda-merge ${YAMLS} > ${ENV_YAML} + fi + + rapids-logger "Checking for updates to conda env" + + # Update the packages + rapids-mamba-retry env update -n morpheus --prune -q --file ${ENV_YAML} # Finally, reactivate conda activate morpheus rapids-logger "Final Conda Environment" - conda list + show_conda_info } function fetch_base_branch() { @@ -100,36 +124,6 @@ function fetch_base_branch() { rapids-logger "Base branch: 
${BASE_BRANCH}" } -function fetch_s3() { - ENDPOINT=$1 - DESTINATION=$2 - if [[ "${USE_S3_CURL}" == "1" ]]; then - curl -f "${DISPLAY_URL}${ENDPOINT}" -o "${DESTINATION}" - FETCH_STATUS=$? - else - aws s3 cp --no-progress "${S3_URL}${ENDPOINT}" "${DESTINATION}" - FETCH_STATUS=$? - fi -} - -function restore_conda_env() { - - rapids-logger "Downloading build artifacts from ${DISPLAY_ARTIFACT_URL}" - fetch_s3 "${ARTIFACT_ENDPOINT}/conda_env.tar.gz" "${WORKSPACE_TMP}/conda_env.tar.gz" - fetch_s3 "${ARTIFACT_ENDPOINT}/wheel.tar.bz" "${WORKSPACE_TMP}/wheel.tar.bz" - - rapids-logger "Extracting" - mkdir -p /opt/conda/envs/morpheus - - # We are using the --no-same-owner flag since user id & group id's are inconsistent between nodes in our CI pool - tar xf "${WORKSPACE_TMP}/conda_env.tar.gz" --no-same-owner --directory /opt/conda/envs/morpheus - tar xf "${WORKSPACE_TMP}/wheel.tar.bz" --no-same-owner --directory ${MORPHEUS_ROOT} - - rapids-logger "Setting conda env" - conda activate morpheus - conda-unpack -} - function show_conda_info() { rapids-logger "Check Conda info" diff --git a/ci/scripts/github/docs.sh b/ci/scripts/github/docs.sh index 93301b040e..f3073dd28c 100755 --- a/ci/scripts/github/docs.sh +++ b/ci/scripts/github/docs.sh @@ -32,9 +32,6 @@ cd ${MORPHEUS_ROOT} git lfs install ${MORPHEUS_ROOT}/scripts/fetch_data.py fetch docs examples -rapids-logger "Installing Documentation dependencies" -mamba env update -f ${MORPHEUS_ROOT}/docs/conda_docs.yml - git submodule update --init --recursive rapids-logger "Configuring for docs" diff --git a/ci/scripts/github/test.sh b/ci/scripts/github/test.sh index 2ad57f7ce0..edefe6e4c2 100755 --- a/ci/scripts/github/test.sh +++ b/ci/scripts/github/test.sh @@ -69,7 +69,7 @@ done rapids-logger "Running Python tests" set +e -python -I -m pytest --run_slow --run_kafka \ +python -I -m pytest --run_slow --run_kafka --fail_missing \ --junit-xml=${REPORTS_DIR}/report_pytest.xml \ --cov=morpheus \ --cov-report term-missing \ diff --git 
a/docker/conda/environments/cuda11.8_examples.yml b/docker/conda/environments/cuda11.8_examples.yml new file mode 100644 index 0000000000..6a714c2269 --- /dev/null +++ b/docker/conda/environments/cuda11.8_examples.yml @@ -0,0 +1,34 @@ +# SPDX-FileCopyrightText: Copyright (c) 2022-2023, NVIDIA CORPORATION & AFFILIATES. All rights reserved. +# SPDX-License-Identifier: Apache-2.0 +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +# Additional dependencies needed by some of the Morpheus examples. +# The intended usage is to first create the conda environment from the `cuda11.8_dev.yml` file, and then update the +# env with this file. ex: +# mamba env create -n morpheus --file docker/conda/environments/cuda11.8_dev.yml +# conda activate morpheus +# mamba env update -n morpheus --file docker/conda/environments/cuda11.8_examples.yml +channels: + - rapidsai + - nvidia + - conda-forge +dependencies: + - chardet=5.0.0 + - cuml=23.02 + - dask==2023.1.1 + - distributed==2023.1.1 + - pip + - pip: + # tensorflow exists in conda-forge but is tied to CUDA-11.3 + - tensorflow==2.12.0 diff --git a/docs/conda_docs.yml b/docs/conda_docs.yml index 4967eb88ef..cd77bee757 100644 --- a/docs/conda_docs.yml +++ b/docs/conda_docs.yml @@ -21,5 +21,10 @@ dependencies: - pip ####### Morpheus Pip Dependencies (keep sorted!) 
####### - pip: - # Ensure all runtime requirements are installed using the requirements file - - --requirement requirements.txt + - breathe==4.34.0 + - exhale==0.3.6 + - ipython + - myst-parser==0.17.2 + - nbsphinx + - sphinx + - sphinx_rtd_theme diff --git a/docs/requirements.txt b/docs/requirements.txt deleted file mode 100644 index 1406ac0e68..0000000000 --- a/docs/requirements.txt +++ /dev/null @@ -1,22 +0,0 @@ -# SPDX-FileCopyrightText: Copyright (c) 2021, NVIDIA CORPORATION & AFFILIATES. All rights reserved. -# SPDX-License-Identifier: Apache-2.0 -# -# Licensed under the Apache License, Version 2.0 (the "License"); -# you may not use this file except in compliance with the License. -# You may obtain a copy of the License at -# -# http://www.apache.org/licenses/LICENSE-2.0 -# -# Unless required by applicable law or agreed to in writing, software -# distributed under the License is distributed on an "AS IS" BASIS, -# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -# See the License for the specific language governing permissions and -# limitations under the License. - -breathe==4.34.0 -exhale==0.3.6 -ipython -myst-parser==0.17.2 -nbsphinx -sphinx -sphinx_rtd_theme diff --git a/examples/gnn_fraud_detection_pipeline/README.md b/examples/gnn_fraud_detection_pipeline/README.md index ef00c11576..1a2b0ed3a3 100644 --- a/examples/gnn_fraud_detection_pipeline/README.md +++ b/examples/gnn_fraud_detection_pipeline/README.md @@ -22,8 +22,11 @@ Prior to running the GNN fraud detection pipeline, additional requirements must ```bash mamba env update -n ${CONDA_DEFAULT_ENV} -f examples/gnn_fraud_detection_pipeline/requirements.yml +pip install --ignore-requires-python stellargraph==1.2.1 ``` +> **Note**: The `--ignore-requires-python` is needed because Stellargraph only officially supports Python versions prior to 3.9 ([stellargraph/stellargraph#1960](https://github.com/stellargraph/stellargraph/issues/1960)). 
+ ## Running ##### Setup Env Variable diff --git a/examples/gnn_fraud_detection_pipeline/requirements.yml b/examples/gnn_fraud_detection_pipeline/requirements.yml index 85150ba54b..fa3f867ee9 100644 --- a/examples/gnn_fraud_detection_pipeline/requirements.yml +++ b/examples/gnn_fraud_detection_pipeline/requirements.yml @@ -20,7 +20,9 @@ channels: dependencies: - chardet=5.0.0 - cuml=23.02 + - dask==2023.1.1 + - distributed==2023.1.1 + - pip - pip: # tensorflow exists in conda-forge but is tied to CUDA-11.3 - - stellargraph==1.2.1 - tensorflow==2.12.0 diff --git a/examples/gnn_fraud_detection_pipeline/stages/classification_stage.py b/examples/gnn_fraud_detection_pipeline/stages/classification_stage.py index 2444d33438..3abdcd5561 100644 --- a/examples/gnn_fraud_detection_pipeline/stages/classification_stage.py +++ b/examples/gnn_fraud_detection_pipeline/stages/classification_stage.py @@ -21,6 +21,7 @@ import cuml from morpheus.cli.register_stage import register_stage +from morpheus.common import TypeId from morpheus.config import Config from morpheus.config import PipelineModes from morpheus.messages import MultiMessage @@ -48,6 +49,7 @@ def __init__(self, c: Config, model_xgb_file: str): super().__init__(c) self._xgb_model = cuml.ForestInference.load(model_xgb_file, output_class=True) + self._needed_columns.update({'node_id': TypeId.INT64, 'prediction': TypeId.FLOAT32}) @property def name(self) -> str: @@ -61,9 +63,11 @@ def supports_cpp_node(self): def _process_message(self, message: GraphSAGEMultiMessage): ind_emb_columns = message.get_meta(message.inductive_embedding_column_names) - message.set_meta("node_id", message.node_identifiers) + # The XGBoost model is returning two probabilities for the binary classification. The first (column 0) is + # probability that the transaction is in the benign class, and the second (column 1) is the probability that + # the transaction is in the fraudulent class. Added together the two values will always equal 1. 
prediction = self._xgb_model.predict_proba(ind_emb_columns).iloc[:, 1] message.set_meta("prediction", prediction) diff --git a/examples/gnn_fraud_detection_pipeline/stages/graph_construction_stage.py b/examples/gnn_fraud_detection_pipeline/stages/graph_construction_stage.py index 2bc377720c..cae0a6a81b 100644 --- a/examples/gnn_fraud_detection_pipeline/stages/graph_construction_stage.py +++ b/examples/gnn_fraud_detection_pipeline/stages/graph_construction_stage.py @@ -90,7 +90,7 @@ def _graph_construction(nodes, edges, node_features) -> "stellargraph.StellarGra for edge in edges: g_nx.add_edges_from(edge) - return StellarGraph(g_nx, node_type_name="ntype", node_features=node_features) + return StellarGraph.from_networkx(g_nx, node_type_attr='ntype', node_features=node_features) @staticmethod def _build_graph_features(dataset: pd.DataFrame) -> "stellargraph.StellarGraph": diff --git a/examples/ransomware_detection/config/ransomware_detection.yaml b/examples/ransomware_detection/config/ransomware_detection.yaml index ba1a3889bb..882c75baa7 100644 --- a/examples/ransomware_detection/config/ransomware_detection.yaml +++ b/examples/ransomware_detection/config/ransomware_detection.yaml @@ -13,8 +13,8 @@ # See the License for the specific language governing permissions and # limitations under the License. 
---- -file_extensions: +--- +file_extensions: - doc - docx - html @@ -38,7 +38,7 @@ file_extensions: - 7z - rar - msg -model_features: +model_features: - envirs_pathext - count_double_extension_count_handles - page_readonly_vads_count @@ -138,8 +138,9 @@ model_features: - page_execute_readwrite_vads_count - handles_df_type_unique_ratio - page_execute_readwrite_count +features: - ldrmodules_df_path -raw_columns: +raw_columns: - Base - Block - CommitCharge diff --git a/examples/ransomware_detection/run.py b/examples/ransomware_detection/run.py index bf1e7337d3..8950a7ee47 100644 --- a/examples/ransomware_detection/run.py +++ b/examples/ransomware_detection/run.py @@ -36,7 +36,7 @@ @click.command() @click.option('--debug', default=False) -@click.option('--use_cpp', default=False) +@click.option('--use_cpp', default=False, help="Enable C++ execution for this pipeline, currently this is unsupported.") @click.option( "--num_threads", default=os.cpu_count(), @@ -147,7 +147,10 @@ def run_pipeline(debug, cols_interested_plugins = rwd_conf['raw_columns'] # Feature columns used by the model. - feature_columns = rwd_conf['model_features'] + model_features = rwd_conf['model_features'] + + # Features to include in the DF, superset of model_features along with a few that the model doesn't receive + feature_columns = model_features + rwd_conf['features'] # File extensions. file_extns = rwd_conf['file_extensions'] @@ -185,8 +188,7 @@ def run_pipeline(debug, # Add preprocessing stage. # This stage generates snapshot sequences using sliding window for each pid_process. - pipeline.add_stage(PreprocessingRWStage(config, feature_columns=feature_columns[:-1], - sliding_window=sliding_window)) + pipeline.add_stage(PreprocessingRWStage(config, feature_columns=model_features, sliding_window=sliding_window)) # Add a monitor stage # This stage logs the metrics (msg/sec) from the above stage. 
diff --git a/examples/ransomware_detection/stages/create_features.py b/examples/ransomware_detection/stages/create_features.py index c5898dac13..bd74bdb83b 100644 --- a/examples/ransomware_detection/stages/create_features.py +++ b/examples/ransomware_detection/stages/create_features.py @@ -24,7 +24,6 @@ from morpheus.cli.register_stage import register_stage from morpheus.config import Config from morpheus.config import PipelineModes -from morpheus.messages import MessageMeta from morpheus.messages import MultiMessage from morpheus.pipeline.multi_message_stage import MultiMessageStage from morpheus.pipeline.stream_pair import StreamPair @@ -83,86 +82,86 @@ def accepted_types(self) -> typing.Tuple: def supports_cpp_node(self): return False - def _build_single(self, builder: mrc.Builder, input_stream: StreamPair) -> StreamPair: + def on_next(self, x: AppShieldMessageMeta): - stream = input_stream[0] + snapshot_fea_dfs = [] - def on_next(x: AppShieldMessageMeta): + df = x.df - snapshot_fea_dfs = [] + # Type cast CommitCharge. + df["CommitCharge"] = df["CommitCharge"].astype("float").astype("Int32") + df["Name"] = df["Name"].str.lower() - df = x.df + # Create PID_Process feature. + df['PID_Process'] = df.PID + '_' + df.Process - # Type cast CommitCharge. - df["CommitCharge"] = df["CommitCharge"].astype("float").astype("Int32") - df["Name"] = df["Name"].str.lower() + snapshot_ids = df.snapshot_id.unique() - # Create PID_Process feature. - df['PID_Process'] = df.PID + '_' + df.Process + if len(snapshot_ids) > 1: + # Group snapshot rows using snapshot id. + all_dfs = [df[df.snapshot_id == snapshot_id] for snapshot_id in snapshot_ids] + else: + all_dfs = [df] - snapshot_ids = df.snapshot_id.unique() + extract_func = self._fe.extract_features + combine_func = FeatureExtractor.combine_features - if len(snapshot_ids) > 1: - # Group snapshot rows using snapshot id. 
- all_dfs = [df[df.snapshot_id == snapshot_id] for snapshot_id in snapshot_ids] - else: - all_dfs = [df] + # Schedule dask task `extract_features` per snapshot. + snapshot_fea_dfs = self._client.map(extract_func, all_dfs, feas_all_zeros=self._feas_all_zeros) - extract_func = self._fe.extract_features - combine_func = FeatureExtractor.combine_features + # Combined `extract_features` results. + features_df = self._client.submit(combine_func, snapshot_fea_dfs) - # Schedule dask task `extract_features` per snapshot. - snapshot_fea_dfs = self._client.map(extract_func, all_dfs, feas_all_zeros=self._feas_all_zeros) + # Gather features from all the snapshots. + features_df = features_df.result() - # Combined `extract_features` results. - features_df = self._client.submit(combine_func, snapshot_fea_dfs) + # Snapshot sequence will be generated using `source_pid_process`. + # Determines which source generated the snapshot messages. + # There's a chance of receiving the same snapshots names from multiple sources(hosts) + features_df['source_pid_process'] = x.source + '_' + features_df.pid_process - # Gather features from all the snapshots. - features_df = features_df.result() + # Sort entries by pid_process and snapshot_id + features_df = features_df.sort_values(by=["pid_process", "snapshot_id"]).reset_index(drop=True) - # Snapshot sequence will be generated using `source_pid_process`. - # Determines which source generated the snapshot messages. - # There's a chance of receiving the same snapshots names from multiple sources(hosts) - features_df['source_pid_process'] = x.source + '_' + features_df.pid_process + # Create AppShieldMessageMeta with extracted features information. + meta = AppShieldMessageMeta(features_df, x.source) - # Sort entries by pid_process and snapshot_id - features_df = features_df.sort_values(by=["pid_process", "snapshot_id"]).reset_index(drop=True) + return meta - # Create AppShieldMessageMeta with extracted features information. 
- meta = AppShieldMessageMeta(features_df, x.source) + def create_multi_messages(self, x: AppShieldMessageMeta) -> typing.List[MultiMessage]: - return meta + multi_messages = [] - def create_multi_messages(x: MessageMeta) -> typing.List[MultiMessage]: + df = x.df - multi_messages = [] + pid_processes = df.pid_process.unique() - df = x.df + # Create multi messaage per pid_process, this assumes that the DF has been sorted by the `pid_process` column + for pid_process in pid_processes: - pid_processes = df.pid_process.unique() + pid_process_index = df[df.pid_process == pid_process].index - # Create multi messaage per pid_process - for pid_process in pid_processes: + start = pid_process_index.min() + stop = pid_process_index.max() + 1 + mess_count = stop - start - pid_process_index = df[df.pid_process == pid_process].index + multi_message = MultiMessage(meta=x, mess_offset=start, mess_count=mess_count) + multi_messages.append(multi_message) - start = pid_process_index.min() - stop = pid_process_index.max() + 1 - mess_count = stop - start + return multi_messages - multi_message = MultiMessage(meta=x, mess_offset=start, mess_count=mess_count) - multi_messages.append(multi_message) + def on_completed(self): + # Close dask client when pipeline initiates shutdown + self._client.close() - return multi_messages + def _build_single(self, builder: mrc.Builder, input_stream: StreamPair) -> StreamPair: - def on_completed(): - # Close dask client when pipeline initiates shutdown - self._client.close() + stream = input_stream[0] node = builder.make_node(self.unique_name, - ops.map(on_next), - ops.map(create_multi_messages), - ops.on_completed(on_completed), + ops.map(self.on_next), + ops.map(self.create_multi_messages), + ops.on_completed(self.on_completed), ops.flatten()) builder.make_edge(stream, node) stream = node diff --git a/examples/ransomware_detection/stages/preprocessing.py b/examples/ransomware_detection/stages/preprocessing.py index de90b57c8c..9f8abfe423 100644 --- 
a/examples/ransomware_detection/stages/preprocessing.py +++ b/examples/ransomware_detection/stages/preprocessing.py @@ -20,6 +20,7 @@ from common.data_models import SnapshotData from morpheus.cli.register_stage import register_stage +from morpheus.common import TypeId from morpheus.config import Config from morpheus.config import PipelineModes from morpheus.messages import InferenceMemoryFIL @@ -32,7 +33,7 @@ @register_stage("ransomware-preprocess", modes=[PipelineModes.FIL]) class PreprocessingRWStage(PreprocessBaseStage): """ - This class extends PreprocessBaseStage and process the features that aree derived from Appshield data. + This class extends PreprocessBaseStage and process the features that are derived from Appshield data. It also arranges the snapshots of Appshield data in a sequential order using provided sliding window. Parameters @@ -58,6 +59,7 @@ def __init__(self, c: Config, feature_columns: typing.List[str], sliding_window: # Padding data to map inference response with input messages. 
self._padding_data = [0 for i in range(self._features_len * sliding_window)] + self._needed_columns.update({'sequence': TypeId.STRING}) @property def name(self) -> str: @@ -67,10 +69,12 @@ def supports_cpp_node(self): return False def _sliding_window_offsets(self, ids: typing.List[int], ids_len: int, - window: int) -> typing.List[typing.List[int]]: + window: int) -> typing.List[typing.Tuple[int]]: """ Create snapshot_id's sliding sequence for a given window """ + assert ids_len == len(ids) + assert ids_len >= window sliding_window_offsets = [] diff --git a/models/training-tuning-scripts/fraud-detection-models/requirements.txt b/models/training-tuning-scripts/fraud-detection-models/requirements.txt index 139f666431..45d60f1018 100644 --- a/models/training-tuning-scripts/fraud-detection-models/requirements.txt +++ b/models/training-tuning-scripts/fraud-detection-models/requirements.txt @@ -1,9 +1,9 @@ dateparser==1.1.1 -matplotlib==3.6.0 -networkx==2.8.6 -numpy==1.22.4 +matplotlib==3.7.1 +networkx==2.8.8 +numpy==1.23.5 pandas==1.3.5 scikit_learn==1.1.2 stellargraph==1.2.1 -tensorflow==2.9.0 -xgboost==1.6.2 +tensorflow==2.12.0 +xgboost==1.6.2 \ No newline at end of file diff --git a/models/training-tuning-scripts/fraud-detection-models/training.py b/models/training-tuning-scripts/fraud-detection-models/training.py index 193058e6e0..80c7144a06 100644 --- a/models/training-tuning-scripts/fraud-detection-models/training.py +++ b/models/training-tuning-scripts/fraud-detection-models/training.py @@ -167,7 +167,7 @@ def inductive_step_hinsage(S, trained_model, inductive_node_identifiers, batch_s generator = HinSAGENodeGenerator(S, batch_size, num_samples, head_node_type="transaction") test_gen_not_shuffled = generator.flow(inductive_node_identifiers, shuffle=False) - inductive_emb = trained_model.predict(test_gen_not_shuffled, verbose=1) + inductive_emb = np.concatenate([trained_model.predict(row[0], verbose=1) for row in test_gen_not_shuffled]) inductive_emb = 
pd.DataFrame(inductive_emb, index=inductive_node_identifiers) return inductive_emb diff --git a/morpheus/stages/input/appshield_source_stage.py b/morpheus/stages/input/appshield_source_stage.py index c37e81f78c..9a800b7a83 100644 --- a/morpheus/stages/input/appshield_source_stage.py +++ b/morpheus/stages/input/appshield_source_stage.py @@ -294,7 +294,7 @@ def files_to_dfs(x: typing.List[str], cols_include: typing.List[str], cols_exclude: typing.List[str], plugins_include: typing.List[str], - encoding: str) -> pd.DataFrame: + encoding: str) -> typing.Dict[str, pd.DataFrame]: """ Load plugin files into a dataframe, then segment the dataframe by source. diff --git a/pyproject.toml b/pyproject.toml index a374aa34f7..6d559c6eea 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -19,7 +19,8 @@ markers = [ "use_cudf: Test supports cuDF datasets", "use_pandas: Test supports Pandas datasets", "replace_callback: Replaces the results_callback in cli", - "reload_modules: Reloads a set of python modules after running the current test" + "reload_modules: Reloads a set of python modules after running the current test", + "import_mod: Import python modules not currently in the Python search path by file name", ] filterwarnings = [ diff --git a/tests/conftest.py b/tests/conftest.py index d9a0c1cc0c..bee7a03421 100644 --- a/tests/conftest.py +++ b/tests/conftest.py @@ -112,6 +112,14 @@ def pytest_addoption(parser: pytest.Parser): help="A specific log level to use during testing. Defaults to WARNING if not set.", ) + parser.addoption( + "--fail_missing", + action="store_true", + dest="fail_missing", + help=("Tests requiring unmet dependencies are normally skipped. 
" + "Setting this flag will instead cause them to be reported as a failure"), + ) + def pytest_generate_tests(metafunc: pytest.Metafunc): """ @@ -449,110 +457,28 @@ def chdir_tmpdir(request: pytest.FixtureRequest, tmp_path): @pytest.fixture(scope="function") -def dataset(df_type: typing.Literal['cudf', 'pandas']): - """ - Yields a DatasetLoader instance with `df_type` as the default DataFrame type. - Users of this fixture can still explicitly request either a cudf or pandas dataframe with the `cudf` and `pandas` - properties: - ``` - def test_something(dataset: DatasetManager): - df = dataset["filter_probs.csv"] # type will match the df_type parameter - if dataset.default_df_type == 'pandas': - assert isinstance(df, pd.DataFrame) - else: - assert isinstance(df, cudf.DataFrame) - - pdf = dataset.pandas["filter_probs.csv"] - cdf = dataset.cudf["filter_probs.csv"] - - ``` - - A test that requests this fixture will parameterize on the type of DataFrame returned by the DatasetManager. - If a test requests both this fixture and the `use_cpp` fixture, or indirectly via the `config` fixture, then - the test will parameterize over both df_type:[cudf, pandas] and use_cpp[True, False]. However it will remove the - df_type=pandas & use_cpp=True combinations as this will cause an unsupported usage of Pandas dataframes with the - C++ implementation of message classes. - - This behavior can also be overridden by using the `use_cudf`, `use_pandas`, `use_cpp` or `use_pandas` marks ex: - ``` - # This test will only run once with C++ enabled and cudf dataframes - @pytest.mark.use_cpp - def test something(dataset: DatasetManager): - ... - # This test will run once for each dataframe type, with C++ disabled both times - @pytest.mark.use_python - def test something(dataset: DatasetManager): - ... - # This test will run twice with C++ mode enabled/disabled, using cudf dataframes both times - @pytest.mark.use_cudf - def test something(use_cpp: bool, dataset: DatasetManager): - ... 
- # This test will run only once - @pytest.mark.use_cudf - @pytest.mark.use_python - def test something(dataset: DatasetManager): - ... - # This test creates an incompatible combination and will raise a RuntimeError without being executed - @pytest.mark.use_pandas - @pytest.mark.use_cpp - def test something(dataset: DatasetManager): - ``` - - Users who don't want to parametarize over the DataFrame should use the `dataset_pandas` or `dataset_cudf` fixtures. - """ - from utils import dataset_manager - yield dataset_manager.DatasetManager(df_type=df_type) - - -@pytest.fixture(scope="function") -def dataset_pandas(): - """ - Yields a DatasetLoader instance with pandas as the default DataFrame type. - - Note: This fixture won't prevent a user from writing a test requiring C++ mode execution and requesting Pandas - dataframes. This is quite useful for tests like `tests/test_add_scores_stage_pipe.py` where we want to test with - both Python & C++ executions, but we use Pandas to build up the expected DataFrame to validate the test against. - - In addition to this, users can use this fixture to explicitly request a cudf Dataframe as well, allowing for a test - that looks like: - ``` - @pytest.mark.use_cpp - def test_something(dataset_pandas: DatasetManager): - input_df = dataset_pandas.cudf["filter_probs.csv"] # Feed our source stage a cudf DF - - # Perform pandas transformations to mimic the add scores stage - expected_df = dataset["filter_probs.csv"] - expected_df = expected_df.rename(columns=dict(zip(expected_df.columns, class_labels))) - ``` - """ - from utils import dataset_manager - yield dataset_manager.DatasetManager(df_type='pandas') +def reset_plugin_manger(): + from morpheus.cli.plugin_manager import PluginManager + PluginManager._singleton = None + yield @pytest.fixture(scope="function") -def dataset_cudf(): - """ - Yields a DatasetLoader instance with cudf as the default DataFrame type. 
- - Users who wish to have both cudf and pandas DataFrames can do so with this fixture and using the `pandas` property: - def test_something(dataset_cudf: DatasetManager): - cdf = dataset_cudf["filter_probs.csv"] - pdf = dataset_cudf.pandas["filter_probs.csv"] - """ - from utils import dataset_manager - yield dataset_manager.DatasetManager(df_type='cudf') +def reset_global_stage_registry(): + from morpheus.cli.stage_registry import GlobalStageRegistry + from morpheus.cli.stage_registry import StageRegistry + GlobalStageRegistry._global_registry = StageRegistry() + yield @pytest.fixture(scope="function") -def filter_probs_df(dataset, use_cpp: bool): +def reset_plugins(reset_plugin_manger, reset_global_stage_registry): """ - Shortcut fixture for loading the filter_probs.csv dataset. - - Unless your test uses the `use_pandas` or `use_cudf` marks this fixture will parametarize over the two dataframe - types. Similarly unless your test uses the `use_cpp` or `use_python` marks this fixture will also parametarize over - that as well, while excluding the combination of C++ execution and Pandas dataframes. + Reset both the plugin manager and the global stage registry. + Some of the tests for examples import modules dynamically, which in some cases can cause register_stage to be + called more than once for the same stage. """ - yield dataset["filter_probs.csv"] + yield def wait_for_camouflage(host="localhost", port=8000, timeout=5): @@ -745,6 +671,16 @@ def _wrap_set_log_level(log_level: int): set_log_level(old_level) +@pytest.fixture(scope="session") +def fail_missing(pytestconfig: pytest.Config) -> bool: + """ + Returns the value of the `fail_missing` flag, when false tests requiring unmet dependencies will be skipped, when + True they will fail.
+ """ + yield pytestconfig.getoption("fail_missing") + + +# ==== Logging Fixtures ==== @pytest.fixture(scope="function") def reset_loglevel(): """ @@ -802,3 +738,108 @@ def loglevel_fatal(): # ==== DataFrame Fixtures ==== +@pytest.fixture(scope="function") +def dataset(df_type: typing.Literal['cudf', 'pandas']): + """ + Yields a DatasetManager instance with `df_type` as the default DataFrame type. + Users of this fixture can still explicitly request either a cudf or pandas dataframe with the `cudf` and `pandas` + properties: + ``` + def test_something(dataset: DatasetManager): + df = dataset["filter_probs.csv"] # type will match the df_type parameter + if dataset.default_df_type == 'pandas': + assert isinstance(df, pd.DataFrame) + else: + assert isinstance(df, cudf.DataFrame) + + pdf = dataset.pandas["filter_probs.csv"] + cdf = dataset.cudf["filter_probs.csv"] + + ``` + + A test that requests this fixture will parameterize on the type of DataFrame returned by the DatasetManager. + If a test requests both this fixture and the `use_cpp` fixture, or indirectly via the `config` fixture, then + the test will parameterize over both df_type:[cudf, pandas] and use_cpp[True, False]. However it will remove the + df_type=pandas & use_cpp=True combinations as this will cause an unsupported usage of Pandas dataframes with the + C++ implementation of message classes. + + This behavior can also be overridden by using the `use_cudf`, `use_pandas`, `use_cpp` or `use_python` marks ex: + ``` + # This test will only run once with C++ enabled and cudf dataframes + @pytest.mark.use_cpp + def test_something(dataset: DatasetManager): + ... + # This test will run once for each dataframe type, with C++ disabled both times + @pytest.mark.use_python + def test_something(dataset: DatasetManager): + ... + # This test will run twice with C++ mode enabled/disabled, using cudf dataframes both times + @pytest.mark.use_cudf + def test_something(use_cpp: bool, dataset: DatasetManager): + ...
+ # This test will run only once + @pytest.mark.use_cudf + @pytest.mark.use_python + def test_something(dataset: DatasetManager): + ... + # This test creates an incompatible combination and will raise a RuntimeError without being executed + @pytest.mark.use_pandas + @pytest.mark.use_cpp + def test_something(dataset: DatasetManager): + ``` + + Users who don't want to parameterize over the DataFrame should use the `dataset_pandas` or `dataset_cudf` fixtures. + """ + from utils import dataset_manager + yield dataset_manager.DatasetManager(df_type=df_type) + + +@pytest.fixture(scope="function") +def dataset_pandas(): + """ + Yields a DatasetManager instance with pandas as the default DataFrame type. + + Note: This fixture won't prevent a user from writing a test requiring C++ mode execution and requesting Pandas + dataframes. This is quite useful for tests like `tests/test_add_scores_stage_pipe.py` where we want to test with + both Python & C++ executions, but we use Pandas to build up the expected DataFrame to validate the test against. + + In addition to this, users can use this fixture to explicitly request a cudf Dataframe as well, allowing for a test + that looks like: + ``` + @pytest.mark.use_cpp + def test_something(dataset_pandas: DatasetManager): + input_df = dataset_pandas.cudf["filter_probs.csv"] # Feed our source stage a cudf DF + + # Perform pandas transformations to mimic the add scores stage + expected_df = dataset_pandas["filter_probs.csv"] + expected_df = expected_df.rename(columns=dict(zip(expected_df.columns, class_labels))) + ``` + """ + from utils import dataset_manager + yield dataset_manager.DatasetManager(df_type='pandas') + + +@pytest.fixture(scope="function") +def dataset_cudf(): + """ + Yields a DatasetManager instance with cudf as the default DataFrame type.
+ + Users who wish to have both cudf and pandas DataFrames can do so with this fixture and using the `pandas` property: + def test_something(dataset_cudf: DatasetManager): + cdf = dataset_cudf["filter_probs.csv"] + pdf = dataset_cudf.pandas["filter_probs.csv"] + """ + from utils import dataset_manager + yield dataset_manager.DatasetManager(df_type='cudf') + + +@pytest.fixture(scope="function") +def filter_probs_df(dataset, use_cpp: bool): + """ + Shortcut fixture for loading the filter_probs.csv dataset. + + Unless your test uses the `use_pandas` or `use_cudf` marks this fixture will parameterize over the two dataframe + types. Similarly unless your test uses the `use_cpp` or `use_python` marks this fixture will also parameterize over + that as well, while excluding the combination of C++ execution and Pandas dataframes. + """ + yield dataset["filter_probs.csv"] diff --git a/tests/examples/gnn_fraud_detection_pipeline/conftest.py b/tests/examples/gnn_fraud_detection_pipeline/conftest.py new file mode 100644 index 0000000000..ed8e690878 --- /dev/null +++ b/tests/examples/gnn_fraud_detection_pipeline/conftest.py @@ -0,0 +1,140 @@ +# SPDX-FileCopyrightText: Copyright (c) 2023, NVIDIA CORPORATION & AFFILIATES. All rights reserved. +# SPDX-License-Identifier: Apache-2.0 +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License.
+ +import os +import sys + +import pytest + +from utils import TEST_DIRS +from utils import import_or_skip + +SKIP_REASON = ("Tests for the gnn_fraud_detection_pipeline example require a number of packages not installed in the " + "Morpheus development environment. See `examples/gnn_fraud_detection_pipeline/README.md` for details on " + "installing these additional dependencies") + + +@pytest.fixture(autouse=True, scope='session') +def stellargraph(fail_missing: bool): + """ + All of the tests in this subdir require stellargraph + """ + yield import_or_skip("stellargraph", reason=SKIP_REASON, fail_missing=fail_missing) + + +@pytest.fixture(autouse=True, scope='session') +def cuml(fail_missing: bool): + """ + All of the tests in this subdir require cuml + """ + yield import_or_skip("cuml", reason=SKIP_REASON, fail_missing=fail_missing) + + +@pytest.fixture(autouse=True, scope='session') +def tensorflow(fail_missing: bool): + """ + All of the tests in this subdir require tensorflow + """ + yield import_or_skip("tensorflow", reason=SKIP_REASON, fail_missing=fail_missing) + + +@pytest.fixture +def config(config): + """ + The GNN fraud detection pipeline utilizes the "other" pipeline mode. 
+ """ + from morpheus.config import PipelineModes + config.mode = PipelineModes.OTHER + yield config + + +@pytest.fixture +def example_dir(): + yield os.path.join(TEST_DIRS.examples_dir, 'gnn_fraud_detection_pipeline') + + +@pytest.fixture +def training_file(example_dir: str): + yield os.path.join(example_dir, 'training.csv') + + +@pytest.fixture +def hinsage_model(example_dir: str): + yield os.path.join(example_dir, 'model/hinsage-model.pt') + + +@pytest.fixture +def xgb_model(example_dir: str): + yield os.path.join(example_dir, 'model/xgb-model.pt') + + +# Some of the code inside gnn_fraud_detection_pipeline performs some relative imports in the form of: +# from .mod import Class +# For this reason we need to ensure that the examples dir is in the sys.path first +@pytest.fixture +def gnn_fraud_detection_pipeline(request: pytest.FixtureRequest, restore_sys_path, reset_plugins): + sys.path.append(TEST_DIRS.examples_dir) + import gnn_fraud_detection_pipeline + yield gnn_fraud_detection_pipeline + + +@pytest.fixture +def test_data(): + """ + Construct test data, a small DF of 10 rows which we will build a graph from + The nodes in our graph will be the unique values from each of our three columns, and the index is also + representing our transaction ids. 
+ There is only one duplicated value (2697) in our dataset so we should expect 29 nodes + Our expected edges will be each value in client_node and merchant_node to their associated index value ex: + (795, 2) & (8567, 2) + thus we should expect 20 edges, although 2697 is duplicated in the client_node column we should expect two + unique edges for each entry (2697, 14) & (2697, 91) + """ + import pandas as pd + index = [2, 14, 16, 26, 41, 42, 70, 91, 93, 95] + client_data = [795, 2697, 5531, 415, 2580, 3551, 6547, 2697, 3503, 7173] + merchant_data = [8567, 4609, 2781, 7844, 629, 6915, 7071, 570, 2446, 8110] + + df_data = { + 'index': index, + 'client_node': client_data, + 'merchant_node': merchant_data, + 'fraud_label': [1 for _ in range(len(index))] + } + + # Fill in the other columns so that we match the shape the model is expecting + for i in range(1000, 1113): + # these two values are skipped, apparently place-holders for client_node & merchant_node + if i not in (1002, 1003): + df_data[str(i)] = [0 for _ in range(len(index))] + + df = pd.DataFrame(df_data, index=index) + + expected_nodes = set(index + client_data + merchant_data) + assert len(expected_nodes) == 29 # ensuring test data & assumptions are correct + + expected_edges = set() + for data in (client_data, merchant_data): + for (i, val) in enumerate(data): + expected_edges.add((val, index[i])) + + assert len(expected_edges) == 20 # ensuring test data & assumptions are correct + + yield dict(index=index, + client_data=client_data, + merchant_data=merchant_data, + df=df, + expected_nodes=expected_nodes, + expected_edges=expected_edges) diff --git a/tests/examples/gnn_fraud_detection_pipeline/test_classification_stage.py b/tests/examples/gnn_fraud_detection_pipeline/test_classification_stage.py new file mode 100644 index 0000000000..ae164ecace --- /dev/null +++ b/tests/examples/gnn_fraud_detection_pipeline/test_classification_stage.py @@ -0,0 +1,68 @@ +# SPDX-FileCopyrightText: Copyright (c) 2023, NVIDIA 
CORPORATION & AFFILIATES. All rights reserved. +# SPDX-License-Identifier: Apache-2.0 +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +import types + +import pytest + +from morpheus.config import Config +from morpheus.messages import MessageMeta +from utils.dataset_manager import DatasetManager + + +@pytest.mark.use_python +class TestClassificationStage: + + def test_constructor(self, + config: Config, + xgb_model: str, + gnn_fraud_detection_pipeline: types.ModuleType, + cuml: types.ModuleType): + from gnn_fraud_detection_pipeline.stages.classification_stage import ClassificationStage + + stage = ClassificationStage(config, xgb_model) + assert isinstance(stage._xgb_model, cuml.ForestInference) + + def test_process_message(self, + config: Config, + xgb_model: str, + gnn_fraud_detection_pipeline: types.ModuleType, + dataset_cudf: DatasetManager): + from gnn_fraud_detection_pipeline.stages.classification_stage import ClassificationStage + from gnn_fraud_detection_pipeline.stages.graph_sage_stage import GraphSAGEMultiMessage + + df = dataset_cudf['examples/gnn_fraud_detection_pipeline/inductive_emb.csv'] + df.rename(lambda x: "ind_emb_{}".format(x), axis=1, inplace=True) + + expected_df = dataset_cudf.pandas['examples/gnn_fraud_detection_pipeline/predictions.csv'] + assert len(df) == len(expected_df) + + # The exact values of the node_identifiers aren't important to this stage, we just need to verify that they're + # inserted into a "node_id" column in the DF +
node_identifiers = expected_df['node_id'].tolist() + + ind_emb_columns = list(df.columns) + + meta = MessageMeta(df) + msg = GraphSAGEMultiMessage(meta=meta, + node_identifiers=node_identifiers, + inductive_embedding_column_names=ind_emb_columns) + + stage = ClassificationStage(config, xgb_model) + results = stage._process_message(msg) + print(results.get_meta(['prediction', 'node_id'])) + + # The stage actually edits the message in place, and returns it, but we don't need to assert that + dataset_cudf.assert_compare_df(results.get_meta(['prediction', 'node_id']), expected_df) diff --git a/tests/examples/gnn_fraud_detection_pipeline/test_graph_construction_stage.py b/tests/examples/gnn_fraud_detection_pipeline/test_graph_construction_stage.py new file mode 100644 index 0000000000..d39c00d994 --- /dev/null +++ b/tests/examples/gnn_fraud_detection_pipeline/test_graph_construction_stage.py @@ -0,0 +1,119 @@ +# SPDX-FileCopyrightText: Copyright (c) 2023, NVIDIA CORPORATION & AFFILIATES. All rights reserved. +# SPDX-License-Identifier: Apache-2.0 +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. 
+ +import os +import types +import typing +from io import StringIO + +import pandas as pd +import pytest + +import cudf + +from morpheus.config import Config +from morpheus.messages import MessageMeta +from morpheus.messages import MultiMessage +from utils import TEST_DIRS + + +@pytest.mark.use_python +@pytest.mark.import_mod( + [os.path.join(TEST_DIRS.examples_dir, 'gnn_fraud_detection_pipeline/stages/graph_construction_stage.py')]) +class TestGraphConstructionStage: + + def test_constructor(self, config: Config, training_file: str, import_mod: typing.List[types.ModuleType]): + graph_construction_stage = import_mod[0] + stage = graph_construction_stage.FraudGraphConstructionStage(config, training_file) + assert isinstance(stage._training_data, cudf.DataFrame) + + # The training datafile contains many more columns than this, but these are the four columns + # that are depended upon in the code + assert {'client_node', 'index', 'fraud_label', 'merchant_node'}.issubset(stage._column_names) + + def _check_graph( + self, + stellargraph: types.ModuleType, + sg: "stellargraph.StellarGraph", # noqa: F821 + expected_nodes, + expected_edges): + assert isinstance(sg, stellargraph.StellarGraph) + sg.check_graph_for_ml(features=True, expensive_check=True) # this will raise if it doesn't pass + assert not sg.is_directed() + + nodes = sg.nodes() + assert set(nodes) == expected_nodes + + edges = sg.edges() + assert set(edges) == expected_edges + + def test_graph_construction(self, + import_mod: typing.List[types.ModuleType], + stellargraph: types.ModuleType, + test_data: dict): + graph_construction_stage = import_mod[0] + df = test_data['df'] + + client_features = pd.DataFrame({0: 1}, index=list(set(test_data['client_data']))) + merchant_features = pd.DataFrame({0: 1}, index=test_data['merchant_data']) + + # Call _graph_construction + sg = graph_construction_stage.FraudGraphConstructionStage._graph_construction( + nodes={ + 'client': df.client_node, 'merchant': df.merchant_node, 
'transaction': df.index + }, + edges=[ + zip(df.client_node, df.index), + zip(df.merchant_node, df.index), + ], + node_features={ + "transaction": df[['client_node', 'merchant_node']], + "client": client_features, + "merchant": merchant_features + }) + + self._check_graph(stellargraph, sg, test_data['expected_nodes'], test_data['expected_edges']) + + def test_build_graph_features(self, + import_mod: typing.List[types.ModuleType], + stellargraph: types.ModuleType, + test_data: dict): + graph_construction_stage = import_mod[0] + sg = graph_construction_stage.FraudGraphConstructionStage._build_graph_features(test_data['df']) + self._check_graph(stellargraph, sg, test_data['expected_nodes'], test_data['expected_edges']) + + def test_process_message(self, + config: Config, + import_mod: typing.List[types.ModuleType], + stellargraph: types.ModuleType, + test_data: dict): + graph_construction_stage = import_mod[0] + df = test_data['df'] + + # The stage wants a csv file from the first 5 rows + training_data = StringIO(df[0:5].to_csv(index=False)) + stage = graph_construction_stage.FraudGraphConstructionStage(config, training_data) + + # Since we used the first 5 rows as the training data, send the second 5 as inference data + meta = MessageMeta(cudf.DataFrame(df)) + mm = MultiMessage(meta=meta, mess_offset=5, mess_count=5) + fgmm = stage._process_message(mm) + + assert isinstance(fgmm, graph_construction_stage.FraudGraphMultiMessage) + assert fgmm.meta is meta + assert fgmm.mess_offset == 5 + assert fgmm.mess_count == 5 + + self._check_graph(stellargraph, fgmm.graph, test_data['expected_nodes'], test_data['expected_edges']) diff --git a/tests/examples/gnn_fraud_detection_pipeline/test_graph_sage_stage.py b/tests/examples/gnn_fraud_detection_pipeline/test_graph_sage_stage.py new file mode 100644 index 0000000000..b8a449de48 --- /dev/null +++ b/tests/examples/gnn_fraud_detection_pipeline/test_graph_sage_stage.py @@ -0,0 +1,103 @@ +# SPDX-FileCopyrightText: Copyright (c) 
2023, NVIDIA CORPORATION & AFFILIATES. All rights reserved. +# SPDX-License-Identifier: Apache-2.0 +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +import types + +import pytest + +import cudf + +from morpheus.config import Config +from morpheus.messages import MessageMeta +from utils.dataset_manager import DatasetManager + + +@pytest.mark.use_python +class TestGraphSageStage: + + def test_constructor(self, + config: Config, + hinsage_model: str, + gnn_fraud_detection_pipeline: types.ModuleType, + tensorflow): + from gnn_fraud_detection_pipeline.stages.graph_sage_stage import GraphSAGEStage + stage = GraphSAGEStage(config, + model_hinsage_file=hinsage_model, + batch_size=10, + sample_size=[4, 64], + record_id="test_id", + target_node="test_node") + + assert isinstance(stage._keras_model, tensorflow.keras.models.Model) + assert stage._batch_size == 10 + assert stage._sample_size == [4, 64] + assert stage._record_id == "test_id" + assert stage._target_node == "test_node" + + def test_inductive_step_hinsage(self, + config: Config, + hinsage_model: str, + gnn_fraud_detection_pipeline: types.ModuleType, + test_data: dict, + dataset_pandas: DatasetManager): + from gnn_fraud_detection_pipeline.stages.graph_construction_stage import FraudGraphConstructionStage + from gnn_fraud_detection_pipeline.stages.graph_sage_stage import GraphSAGEStage + + # The column names in the saved test data will be strings, in the results they will be ints + expected_df = 
dataset_pandas['examples/gnn_fraud_detection_pipeline/inductive_emb.csv'] + expected_df.rename(lambda x: int(x), axis=1, inplace=True) + + df = test_data['df'] + + graph = FraudGraphConstructionStage._build_graph_features(df) + + stage = GraphSAGEStage(config, model_hinsage_file=hinsage_model) + results = stage._inductive_step_hinsage(graph, stage._keras_model, test_data['index']) + + assert isinstance(results, cudf.DataFrame) + assert results.index.to_arrow().to_pylist() == test_data['index'] + dataset_pandas.assert_compare_df(results, expected_df) + + def test_process_message(self, + config: Config, + hinsage_model: str, + gnn_fraud_detection_pipeline: types.ModuleType, + test_data: dict, + dataset_pandas: DatasetManager): + from gnn_fraud_detection_pipeline.stages.graph_construction_stage import FraudGraphConstructionStage + from gnn_fraud_detection_pipeline.stages.graph_construction_stage import FraudGraphMultiMessage + from gnn_fraud_detection_pipeline.stages.graph_sage_stage import GraphSAGEMultiMessage + from gnn_fraud_detection_pipeline.stages.graph_sage_stage import GraphSAGEStage + + expected_df = dataset_pandas['examples/gnn_fraud_detection_pipeline/inductive_emb.csv'] + expected_df.rename(lambda x: "ind_emb_{}".format(x), axis=1, inplace=True) + + df = test_data['df'] + meta = MessageMeta(cudf.DataFrame(df)) + graph = FraudGraphConstructionStage._build_graph_features(df) + msg = FraudGraphMultiMessage(meta=meta, graph=graph) + + stage = GraphSAGEStage(config, model_hinsage_file=hinsage_model) + results = stage._process_message(msg) + + assert isinstance(results, GraphSAGEMultiMessage) + assert results.meta is meta + assert results.mess_offset == 0 + assert results.mess_count == len(df) + assert results.node_identifiers == test_data['index'] + assert sorted(results.inductive_embedding_column_names) == sorted(expected_df.columns) + + ind_emb_df = results.get_meta(results.inductive_embedding_column_names) + 
dataset_pandas.assert_compare_df(ind_emb_df.to_pandas(), expected_df) diff --git a/tests/examples/log_parsing/test_postprocessing.py b/tests/examples/log_parsing/test_postprocessing.py index f40d936437..ec2c6e9f4a 100644 --- a/tests/examples/log_parsing/test_postprocessing.py +++ b/tests/examples/log_parsing/test_postprocessing.py @@ -73,4 +73,4 @@ def test_log_parsing_post_processing_stage(config: Config, out_meta = stage._postprocess(post_proc_message) assert isinstance(out_meta, MessageMeta) - DatasetManager.assert_df_equal(out_meta._df, expected_df) + DatasetManager.assert_compare_df(out_meta._df, expected_df) diff --git a/tests/examples/ransomware_detection/conftest.py b/tests/examples/ransomware_detection/conftest.py new file mode 100644 index 0000000000..5a52f30449 --- /dev/null +++ b/tests/examples/ransomware_detection/conftest.py @@ -0,0 +1,76 @@ +# SPDX-FileCopyrightText: Copyright (c) 2023, NVIDIA CORPORATION & AFFILIATES. All rights reserved. +# SPDX-License-Identifier: Apache-2.0 +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +import os +import sys + +import pytest +import yaml + +from utils import TEST_DIRS +from utils import import_or_skip + +SKIP_REASON = ("Tests for the ransomware_detection example require a number of packages not installed in the Morpheus " + "development environment. 
See `examples/ransomware_detection/README.md` " + "for details on installing these additional dependencies") + + +@pytest.fixture(autouse=True, scope='session') +def dask_distributed(fail_missing: bool): + """ + All of the tests in this subdir require dask.distributed + """ + yield import_or_skip("dask.distributed", reason=SKIP_REASON, fail_missing=fail_missing) + + +@pytest.fixture +def config(config): + """ + The ransomware detection pipeline utilizes the FIL pipeline mode. + """ + from morpheus.config import PipelineModes + config.mode = PipelineModes.FIL + yield config + + +@pytest.fixture +def example_dir(): + yield os.path.join(TEST_DIRS.examples_dir, 'ransomware_detection') + + +@pytest.fixture +def conf_file(example_dir): + yield os.path.join(example_dir, 'config/ransomware_detection.yaml') + + +@pytest.fixture +def rwd_conf(conf_file): + with open(conf_file, encoding='UTF-8') as fh: + conf = yaml.safe_load(fh) + + yield conf + + +@pytest.fixture +def interested_plugins(): + yield ['ldrmodules', 'threadlist', 'envars', 'vadinfo', 'handles'] + + +# Some of the code inside ransomware_detection performs imports in the form of: +# from common.... +# For this reason we need to ensure that the examples/ransomware_detection dir is in the sys.path first +@pytest.fixture(autouse=True) +def ransomware_detection_in_sys_path(request: pytest.FixtureRequest, restore_sys_path, reset_plugins, example_dir): + sys.path.append(example_dir) diff --git a/tests/examples/ransomware_detection/test_create_features.py b/tests/examples/ransomware_detection/test_create_features.py new file mode 100644 index 0000000000..5b0fccc2b1 --- /dev/null +++ b/tests/examples/ransomware_detection/test_create_features.py @@ -0,0 +1,198 @@ +# SPDX-FileCopyrightText: Copyright (c) 2023, NVIDIA CORPORATION & AFFILIATES. All rights reserved.
+# SPDX-License-Identifier: Apache-2.0 +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +import glob +import os +import types +import typing +from unittest import mock + +import pytest + +from morpheus.config import Config +from morpheus.messages import MultiMessage +from morpheus.messages.message_meta import AppShieldMessageMeta +from morpheus.pipeline.multi_message_stage import MultiMessageStage +from morpheus.stages.input.appshield_source_stage import AppShieldSourceStage +from utils import TEST_DIRS +from utils.dataset_manager import DatasetManager + + +@pytest.mark.use_python +class TestCreateFeaturesRWStage: + + def test_constructor(self, + config: Config, + dask_distributed: types.ModuleType, + rwd_conf: dict, + interested_plugins: typing.List[str]): + from common.data_models import FeatureConfig + from common.feature_extractor import FeatureExtractor + from stages.create_features import CreateFeaturesRWStage + + n_workers = 12 + threads_per_worker = 8 + stage = CreateFeaturesRWStage(config, + interested_plugins=interested_plugins, + feature_columns=rwd_conf['model_features'], + file_extns=rwd_conf['file_extensions'], + n_workers=n_workers, + threads_per_worker=threads_per_worker) + + assert isinstance(stage, MultiMessageStage) + assert isinstance(stage._client, dask_distributed.Client) + scheduler_info = stage._client.scheduler_info() + assert len(scheduler_info['workers']) == n_workers + for worker in scheduler_info['workers'].values(): + assert worker['nthreads']
== threads_per_worker + + assert isinstance(stage._feature_config, FeatureConfig) + assert stage._feature_config.file_extns == rwd_conf['file_extensions'] + assert stage._feature_config.interested_plugins == interested_plugins + + assert stage._feas_all_zeros == {c: 0 for c in rwd_conf['model_features']} + + assert isinstance(stage._fe, FeatureExtractor) + assert stage._fe._config is stage._feature_config + + @mock.patch('stages.create_features.Client') + def test_on_next(self, + mock_dask_client, + config: Config, + rwd_conf: dict, + interested_plugins: typing.List[str], + dataset_pandas: DatasetManager): + from stages.create_features import CreateFeaturesRWStage + + test_data_dir = os.path.join(TEST_DIRS.tests_data_dir, 'examples/ransomware_detection') + + mock_dask_client.return_value = mock_dask_client + mock_dask_client.map.return_value = mock.MagicMock() + + dask_results = dataset_pandas[os.path.join(test_data_dir, 'dask_results.csv')] + + mock_dask_future = mock.MagicMock() + mock_dask_future.result.return_value = dask_results + mock_dask_client.submit.return_value = mock_dask_future + + input_glob = os.path.join(TEST_DIRS.tests_data_dir, 'appshield', 'snapshot-1', '*.json') + input_data = AppShieldSourceStage.files_to_dfs(glob.glob(input_glob), + cols_include=rwd_conf['raw_columns'], + cols_exclude=["SHA256"], + plugins_include=interested_plugins, + encoding='latin1') + + input_metas = AppShieldSourceStage._build_metadata(input_data) + + # Make sure the input test date looks the way we expect it + assert len(input_metas) == 1 + input_meta = input_metas[0] + assert input_meta.source == 'appshield' + + stage = CreateFeaturesRWStage(config, + interested_plugins=interested_plugins, + feature_columns=rwd_conf['model_features'], + file_extns=rwd_conf['file_extensions'], + n_workers=5, + threads_per_worker=6) + + # make sure we have a mocked dask client + assert stage._client is mock_dask_client + + meta = stage.on_next(input_meta) + assert isinstance(meta, 
AppShieldMessageMeta) + assert meta.source == input_meta.source + + expected_df = dataset_pandas[os.path.join(test_data_dir, 'dask_results.csv')] + expected_df['source_pid_process'] = 'appshield_' + expected_df.pid_process + expected_df.sort_values(by=["pid_process", "snapshot_id"], inplace=True) + expected_df.reset_index(drop=True, inplace=True) + dataset_pandas.assert_compare_df(meta.copy_dataframe(), expected_df) + + @mock.patch('stages.create_features.Client') + def test_create_multi_messages(self, + mock_dask_client, + config: Config, + rwd_conf: dict, + interested_plugins: typing.List[str], + dataset_pandas: DatasetManager): + from stages.create_features import CreateFeaturesRWStage + mock_dask_client.return_value = mock_dask_client + + pids = [75956, 118469, 1348612, 2698363, 2721362, 2788672] + df = dataset_pandas["filter_probs.csv"] + df['pid_process'] = [ + 2788672, + 75956, + 75956, + 2788672, + 2788672, + 2698363, + 2721362, + 118469, + 1348612, + 2698363, + 118469, + 2698363, + 1348612, + 118469, + 75956, + 2721362, + 75956, + 118469, + 118469, + 118469 + ] + df = df.sort_values(by=["pid_process"]).reset_index(drop=True) + + stage = CreateFeaturesRWStage(config, + interested_plugins=interested_plugins, + feature_columns=rwd_conf['model_features'], + file_extns=rwd_conf['file_extensions'], + n_workers=5, + threads_per_worker=6) + + meta = AppShieldMessageMeta(df, source='tests') + multi_messages = stage.create_multi_messages(meta) + assert len(multi_messages) == len(pids) + + prev_loc = 0 + for (i, mm) in enumerate(multi_messages): + assert isinstance(mm, MultiMessage) + pid = pids[i] + (mm.get_meta(['pid_process']) == pid).all() + assert mm.mess_offset == prev_loc + prev_loc = mm.mess_offset + mm.mess_count + + assert prev_loc == len(df) + + @mock.patch('stages.create_features.Client') + def test_on_completed(self, mock_dask_client, config: Config, rwd_conf: dict, interested_plugins: typing.List[str]): + from stages.create_features import 
CreateFeaturesRWStage + mock_dask_client.return_value = mock_dask_client + + stage = CreateFeaturesRWStage(config, + interested_plugins=interested_plugins, + feature_columns=rwd_conf['model_features'], + file_extns=rwd_conf['file_extensions'], + n_workers=5, + threads_per_worker=6) + + assert stage._client is mock_dask_client + mock_dask_client.close.assert_not_called() + + stage.on_completed() + + mock_dask_client.close.assert_called_once() diff --git a/tests/examples/ransomware_detection/test_preprocessing.py b/tests/examples/ransomware_detection/test_preprocessing.py new file mode 100644 index 0000000000..af86df90b1 --- /dev/null +++ b/tests/examples/ransomware_detection/test_preprocessing.py @@ -0,0 +1,164 @@ +# SPDX-FileCopyrightText: Copyright (c) 2023, NVIDIA CORPORATION & AFFILIATES. All rights reserved. +# SPDX-License-Identifier: Apache-2.0 +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. 
+ +import cupy as cp +import pandas as pd +import pytest + +from morpheus.config import Config +from morpheus.messages import MultiMessage +from morpheus.messages.message_meta import AppShieldMessageMeta +from morpheus.messages.multi_inference_message import MultiInferenceFILMessage +from morpheus.stages.preprocess.preprocess_base_stage import PreprocessBaseStage +from utils.dataset_manager import DatasetManager + + +@pytest.mark.use_python +class TestPreprocessingRWStage: + + def test_constructor(self, config: Config, rwd_conf: dict): + from stages.preprocessing import PreprocessingRWStage + + stage = PreprocessingRWStage(config, feature_columns=rwd_conf['model_features'], sliding_window=6) + assert isinstance(stage, PreprocessBaseStage) + assert stage._feature_columns == rwd_conf['model_features'] + assert stage._features_len == len(rwd_conf['model_features']) + assert stage._snapshot_dict == {} + assert len(stage._padding_data) == len(rwd_conf['model_features']) * 6 + for i in stage._padding_data: + assert i == 0 + + def test_sliding_window_offsets(self, config: Config, rwd_conf: dict): + from stages.preprocessing import PreprocessingRWStage + + stage = PreprocessingRWStage(config, feature_columns=rwd_conf['model_features'], sliding_window=6) + + window = 3 + ids = [17, 18, 19, 20, 21, 22, 23, 31, 32, 33] + results = stage._sliding_window_offsets(ids, len(ids), window=window) + assert results == [(0, 3), (1, 4), (2, 5), (3, 6), (4, 7), (7, 10)] + + # Non-consecutive ids don't create sliding windows + stage._sliding_window_offsets(list(reversed(ids)), len(ids), window=window) == [] + + def test_sliding_window_offsets_errors(self, config: Config, rwd_conf: dict): + from stages.preprocessing import PreprocessingRWStage + + stage = PreprocessingRWStage(config, feature_columns=rwd_conf['model_features'], sliding_window=6) + + # ids_len doesn't match the length of the ids list + with pytest.raises(AssertionError): + stage._sliding_window_offsets(ids=[5, 6, 7], 
ids_len=12, window=2) + + # Window is larger than ids + with pytest.raises(AssertionError): + stage._sliding_window_offsets(ids=[5, 6, 7], ids_len=3, window=4) + + def test_rollover_pending_snapshots(self, config: Config, rwd_conf: dict, dataset_pandas: DatasetManager): + from stages.preprocessing import PreprocessingRWStage + + snapshot_ids = [5, 8, 10, 13] + source_pid_process = "123_test.exe" + df = dataset_pandas['examples/ransomware_detection/dask_results.csv'] + assert len(df) == len(snapshot_ids) + + # The snapshot_id's in the test data set are all '1', set them to different values + df['snapshot_id'] = snapshot_ids + df.index = df.snapshot_id + + stage = PreprocessingRWStage(config, feature_columns=rwd_conf['model_features'], sliding_window=4) + stage._rollover_pending_snapshots(snapshot_ids, source_pid_process, df) + + assert list(stage._snapshot_dict.keys()) == [source_pid_process] + + # Due to the sliding window we should have all but the first snapshot_id in the results + expected_snapshot_ids = snapshot_ids[1:] + snapshots = stage._snapshot_dict[source_pid_process] + + assert len(snapshots) == len(expected_snapshot_ids) + for (i, snapshot) in enumerate(snapshots): + expected_snapshot_id = expected_snapshot_ids[i] + assert snapshot.snapshot_id == expected_snapshot_id + expected_data = df.loc[expected_snapshot_id].fillna('').values + assert (pd.Series(snapshot.data).fillna('').values == expected_data).all() + + def test_rollover_pending_snapshots_empty_results(self, + config: Config, + rwd_conf: dict, + dataset_pandas: DatasetManager): + from stages.preprocessing import PreprocessingRWStage + + snapshot_ids = [] + source_pid_process = "123_test.exe" + df = dataset_pandas['examples/ransomware_detection/dask_results.csv'] + + stage = PreprocessingRWStage(config, feature_columns=rwd_conf['model_features'], sliding_window=4) + stage._rollover_pending_snapshots(snapshot_ids, source_pid_process, df) + assert len(stage._snapshot_dict) == 0 + + def 
test_merge_curr_and_prev_snapshots(self, config: Config, rwd_conf: dict, dataset_pandas: DatasetManager): + from common.data_models import SnapshotData + from stages.preprocessing import PreprocessingRWStage + + snapshot_ids = [5, 8, 10, 13] + source_pid_process = "123_test.exe" + df = dataset_pandas['examples/ransomware_detection/dask_results.csv'] + assert len(df) == len(snapshot_ids) + df['snapshot_id'] = snapshot_ids + df.index = df.snapshot_id + + stage = PreprocessingRWStage(config, feature_columns=rwd_conf['model_features'], sliding_window=4) + test_row_8 = df.loc[8].copy(deep=True) + test_row_8.pid_process = 'test_val1' + + test_row_13 = df.loc[13].copy(deep=True) + test_row_13.pid_process = 'test_val2' + + stage._snapshot_dict = { + source_pid_process: [SnapshotData(8, test_row_8.values), SnapshotData(13, test_row_13.values)] + } + + expected_df = dataset_pandas['examples/ransomware_detection/dask_results.csv'].fillna('') + expected_df['pid_process'][1] = 'test_val1' + expected_df['pid_process'][3] = 'test_val2' + + expected_df['snapshot_id'] = snapshot_ids + expected_df.index = expected_df.snapshot_id + + stage._merge_curr_and_prev_snapshots(df, source_pid_process) + dataset_pandas.assert_compare_df(df.fillna(''), expected_df) + + def test_pre_process_batch(self, config: Config, rwd_conf: dict, dataset_pandas: DatasetManager): + from stages.preprocessing import PreprocessingRWStage + df = dataset_pandas['examples/ransomware_detection/dask_results.csv'] + df['source_pid_process'] = 'appshield_' + df.pid_process + expected_df = df.copy(deep=True).fillna('') + meta = AppShieldMessageMeta(df=df, source='tests') + mm = MultiMessage(meta=meta) + + sliding_window = 4 + stage = PreprocessingRWStage(config, feature_columns=rwd_conf['model_features'], sliding_window=sliding_window) + results = stage._pre_process_batch(mm) + assert isinstance(results, MultiInferenceFILMessage) + + expected_df['sequence'] = ['dummy' for _ in range(len(expected_df))] + 
expected_input__0 = cp.asarray([0 for i in range(len(rwd_conf['model_features']) * sliding_window)]) + expected_seq_ids = cp.zeros((len(expected_df), 3), dtype=cp.uint32) + expected_seq_ids[:, 0] = cp.arange(0, len(expected_df), dtype=cp.uint32) + expected_seq_ids[:, 2] = len(rwd_conf['model_features']) * 3 + + dataset_pandas.assert_compare_df(results.get_meta().fillna(''), expected_df) + assert (results.get_tensor('input__0') == expected_input__0).all() + assert (results.get_tensor('seq_ids') == expected_seq_ids).all() diff --git a/tests/test_add_classifications_stage.py b/tests/test_add_classifications_stage.py index a8abaade05..5b90235d1b 100755 --- a/tests/test_add_classifications_stage.py +++ b/tests/test_add_classifications_stage.py @@ -65,9 +65,9 @@ def test_add_labels(): labeled = AddClassificationsStage._add_labels(message, idx2label=class_labels, threshold=threshold) - assert DatasetManager.assert_df_equal(labeled.get_meta("frogs"), probs_array_bool[:, 0]) - assert DatasetManager.assert_df_equal(labeled.get_meta("lizards"), probs_array_bool[:, 1]) - assert DatasetManager.assert_df_equal(labeled.get_meta("toads"), probs_array_bool[:, 2]) + DatasetManager.assert_df_equal(labeled.get_meta("frogs"), probs_array_bool[:, 0]) + DatasetManager.assert_df_equal(labeled.get_meta("lizards"), probs_array_bool[:, 1]) + DatasetManager.assert_df_equal(labeled.get_meta("toads"), probs_array_bool[:, 2]) # Same thing but change the probs tensor name message = MultiResponseMessage(meta=MessageMeta(df), @@ -76,9 +76,9 @@ def test_add_labels(): labeled = AddClassificationsStage._add_labels(message, idx2label=class_labels, threshold=threshold) - assert DatasetManager.assert_df_equal(labeled.get_meta("frogs"), probs_array_bool[:, 0]) - assert DatasetManager.assert_df_equal(labeled.get_meta("lizards"), probs_array_bool[:, 1]) - assert DatasetManager.assert_df_equal(labeled.get_meta("toads"), probs_array_bool[:, 2]) + DatasetManager.assert_df_equal(labeled.get_meta("frogs"), 
probs_array_bool[:, 0]) + DatasetManager.assert_df_equal(labeled.get_meta("lizards"), probs_array_bool[:, 1]) + DatasetManager.assert_df_equal(labeled.get_meta("toads"), probs_array_bool[:, 2]) # Fail in missing probs data message = MultiResponseMessage(meta=MessageMeta(df), diff --git a/tests/test_add_scores_stage.py b/tests/test_add_scores_stage.py index ab7a1fa144..e08642d7bc 100755 --- a/tests/test_add_scores_stage.py +++ b/tests/test_add_scores_stage.py @@ -63,9 +63,9 @@ def test_add_labels(): labeled = AddClassificationsStage._add_labels(message, idx2label=class_labels, threshold=None) - assert DatasetManager.assert_df_equal(labeled.get_meta("frogs"), probs_array[:, 0]) - assert DatasetManager.assert_df_equal(labeled.get_meta("lizards"), probs_array[:, 1]) - assert DatasetManager.assert_df_equal(labeled.get_meta("toads"), probs_array[:, 2]) + DatasetManager.assert_df_equal(labeled.get_meta("frogs"), probs_array[:, 0]) + DatasetManager.assert_df_equal(labeled.get_meta("lizards"), probs_array[:, 1]) + DatasetManager.assert_df_equal(labeled.get_meta("toads"), probs_array[:, 2]) # Same thing but change the probs tensor name message = MultiResponseMessage(meta=MessageMeta(df), @@ -74,9 +74,9 @@ def test_add_labels(): labeled = AddClassificationsStage._add_labels(message, idx2label=class_labels, threshold=None) - assert DatasetManager.assert_df_equal(labeled.get_meta("frogs"), probs_array[:, 0]) - assert DatasetManager.assert_df_equal(labeled.get_meta("lizards"), probs_array[:, 1]) - assert DatasetManager.assert_df_equal(labeled.get_meta("toads"), probs_array[:, 2]) + DatasetManager.assert_df_equal(labeled.get_meta("frogs"), probs_array[:, 0]) + DatasetManager.assert_df_equal(labeled.get_meta("lizards"), probs_array[:, 1]) + DatasetManager.assert_df_equal(labeled.get_meta("toads"), probs_array[:, 2]) # Fail in missing probs data message = MultiResponseMessage(meta=MessageMeta(df), diff --git a/tests/test_deserialize_stage_pipe.py 
b/tests/test_deserialize_stage_pipe.py index dbda223bd4..68e55e30cb 100755 --- a/tests/test_deserialize_stage_pipe.py +++ b/tests/test_deserialize_stage_pipe.py @@ -44,7 +44,7 @@ def test_fixing_non_unique_indexes(use_cpp: bool, dataset: DatasetManager): assert not meta.has_sliceable_index() assert "_index_" not in meta.df.columns - assert dataset.assert_df_equal(meta.df, df) + dataset.assert_df_equal(meta.df, df) DeserializeStage.process_dataframe(meta, 5, ensure_sliceable_index=True) diff --git a/tests/test_file_in_out.py b/tests/test_file_in_out.py index eab5664bc9..a52549de66 100755 --- a/tests/test_file_in_out.py +++ b/tests/test_file_in_out.py @@ -197,7 +197,7 @@ def test_read_cpp_compare(input_file: str): CppConfig.set_should_use_cpp(True) df_cpp = read_file_to_df(input_file, df_type='cudf') - assert DatasetManager.assert_df_equal(df_python, df_cpp) + DatasetManager.assert_df_equal(df_python, df_cpp) @pytest.mark.slow diff --git a/tests/test_message_meta.py b/tests/test_message_meta.py index 57ac593dc9..105c732d69 100644 --- a/tests/test_message_meta.py +++ b/tests/test_message_meta.py @@ -130,10 +130,10 @@ def test_copy_dataframe(df: cudf.DataFrame): copied_df = meta.copy_dataframe() - assert DatasetManager.assert_df_equal(copied_df, df), "Should be identical" + DatasetManager.assert_df_equal(copied_df, df), "Should be identical" assert copied_df is not df, "But should be different instances" # Try setting a single value on the copy cdf = meta.copy_dataframe() cdf['v2'].iloc[3] = 47 - assert DatasetManager.assert_df_equal(meta.copy_dataframe(), df), "Should be identical" + DatasetManager.assert_df_equal(meta.copy_dataframe(), df), "Should be identical" diff --git a/tests/test_multi_message.py b/tests/test_multi_message.py index 524f3b60d3..5431f85f31 100644 --- a/tests/test_multi_message.py +++ b/tests/test_multi_message.py @@ -120,18 +120,18 @@ def _test_get_meta(df: typing.Union[cudf.DataFrame, pd.DataFrame]): # Manually slice the dataframe according to 
the multi settings df_sliced: cudf.DataFrame = df.iloc[multi.mess_offset:multi.mess_offset + multi.mess_count, :] - assert DatasetManager.assert_df_equal(multi.get_meta(), df_sliced) + DatasetManager.assert_df_equal(multi.get_meta(), df_sliced) # Make sure we return a table here, not a series col_name = df_sliced.columns[0] - assert DatasetManager.assert_df_equal(multi.get_meta(col_name), df_sliced[col_name]) + DatasetManager.assert_df_equal(multi.get_meta(col_name), df_sliced[col_name]) col_name = [df_sliced.columns[0], df_sliced.columns[2]] - assert DatasetManager.assert_df_equal(multi.get_meta(col_name), df_sliced[col_name]) + DatasetManager.assert_df_equal(multi.get_meta(col_name), df_sliced[col_name]) # Out of order columns col_name = [df_sliced.columns[3], df_sliced.columns[0]] - assert DatasetManager.assert_df_equal(multi.get_meta(col_name), df_sliced[col_name]) + DatasetManager.assert_df_equal(multi.get_meta(col_name), df_sliced[col_name]) # Should fail with missing column with pytest.raises(KeyError): @@ -139,7 +139,7 @@ def _test_get_meta(df: typing.Union[cudf.DataFrame, pd.DataFrame]): # Finally, check that we dont overwrite the original dataframe multi.get_meta(col_name).iloc[:] = 5 - assert DatasetManager.assert_df_equal(multi.get_meta(col_name), df_sliced[col_name]) + DatasetManager.assert_df_equal(multi.get_meta(col_name), df_sliced[col_name]) def test_get_meta(filter_probs_df: typing.Union[cudf.DataFrame, pd.DataFrame]): @@ -167,10 +167,10 @@ def test_set_meta(use_cpp: bool, dataset: DatasetManager): def test_value(columns, value): multi.set_meta(columns, value) - assert dataset.assert_df_equal(multi.get_meta(columns), value) + dataset.assert_df_equal(multi.get_meta(columns), value) # Now make sure the original dataframe is untouched - assert dataset.assert_df_equal(df_saved[saved_mask], meta.df[saved_mask]) + dataset.assert_df_equal(df_saved[saved_mask], meta.df[saved_mask]) single_column = "v2" two_columns = ["v1", "v3"] @@ -207,17 +207,17 @@ def 
_test_set_meta_new_column(df: typing.Union[cudf.DataFrame, pd.DataFrame], df # Set a list val_to_set = list(range(multi.mess_count)) multi.set_meta("list_column", val_to_set) - assert DatasetManager.assert_df_equal(multi.get_meta("list_column"), val_to_set) + DatasetManager.assert_df_equal(multi.get_meta("list_column"), val_to_set) # Set a string val_to_set = "string to set" multi.set_meta("string_column", val_to_set) - assert DatasetManager.assert_df_equal(multi.get_meta("string_column"), val_to_set) + DatasetManager.assert_df_equal(multi.get_meta("string_column"), val_to_set) # Set a date val_to_set = pd.date_range("2018-01-01", periods=multi.mess_count, freq="H") multi.set_meta("date_column", val_to_set) - assert DatasetManager.assert_df_equal(multi.get_meta("date_column"), val_to_set) + DatasetManager.assert_df_equal(multi.get_meta("date_column"), val_to_set) if (df_type == "cudf"): # cudf isnt capable of setting more than one new column at a time @@ -226,7 +226,7 @@ def _test_set_meta_new_column(df: typing.Union[cudf.DataFrame, pd.DataFrame], df # Now set one with new and old columns val_to_set = np.random.randn(multi.mess_count, 2) multi.set_meta(["v2", "new_column2"], val_to_set) - assert DatasetManager.assert_df_equal(multi.get_meta(["v2", "new_column2"]), val_to_set) + DatasetManager.assert_df_equal(multi.get_meta(["v2", "new_column2"]), val_to_set) def test_set_meta_new_column(use_cpp: bool, dataset: DatasetManager): @@ -272,7 +272,7 @@ def _test_copy_ranges(df: typing.Union[cudf.DataFrame, pd.DataFrame]): assert mm2.meta.df is not df assert mm2.mess_offset == 0 assert mm2.mess_count == 6 - 2 - assert DatasetManager.assert_df_equal(mm2.get_meta(), df.iloc[2:6]) + DatasetManager.assert_df_equal(mm2.get_meta(), df.iloc[2:6]) # slice two different ranges of rows mm3 = mm.copy_ranges([(2, 6), (12, 15)]) @@ -293,7 +293,7 @@ def _test_copy_ranges(df: typing.Union[cudf.DataFrame, pd.DataFrame]): expected_df = concat_fn([df.iloc[2:6], df.iloc[12:15]]) - assert 
DatasetManager.assert_df_equal(mm3.get_meta(), expected_df) + DatasetManager.assert_df_equal(mm3.get_meta(), expected_df) def test_copy_ranges(filter_probs_df: typing.Union[cudf.DataFrame, pd.DataFrame]): @@ -367,54 +367,54 @@ def _test_get_slice_values(df: typing.Union[cudf.DataFrame, pd.DataFrame]): multi_full = MultiMessage(meta=meta) # Single slice - assert DatasetManager.assert_df_equal(multi_full.get_slice(3, 8).get_meta(), df.iloc[3:8]) + DatasetManager.assert_df_equal(multi_full.get_slice(3, 8).get_meta(), df.iloc[3:8]) # Single slice with one columns - assert DatasetManager.assert_df_equal(multi_full.get_slice(3, 8).get_meta("v1"), df.iloc[3:8]["v1"]) + DatasetManager.assert_df_equal(multi_full.get_slice(3, 8).get_meta("v1"), df.iloc[3:8]["v1"]) # Single slice with multiple columns - assert DatasetManager.assert_df_equal( + DatasetManager.assert_df_equal( multi_full.get_slice(3, 8).get_meta(["v4", "v3", "v1"]), df.iloc[3:8][["v4", "v3", "v1"]]) # Chained slice - assert DatasetManager.assert_df_equal( + DatasetManager.assert_df_equal( multi_full.get_slice(2, 18).get_slice(5, 9).get_meta(), df.iloc[2 + 5:(2 + 5) + (9 - 5)]) # Chained slice one column - assert DatasetManager.assert_df_equal( + DatasetManager.assert_df_equal( multi_full.get_slice(2, 18).get_slice(5, 9).get_meta("v1"), df.iloc[2 + 5:(2 + 5) + (9 - 5)]["v1"]) # Chained slice multi column - assert DatasetManager.assert_df_equal( + DatasetManager.assert_df_equal( multi_full.get_slice(2, 18).get_slice(5, 9).get_meta(["v4", "v3", "v1"]), df.iloc[2 + 5:(2 + 5) + (9 - 5)][["v4", "v3", "v1"]]) # Set values multi_full.get_slice(4, 10).set_meta(None, 1.15) - assert DatasetManager.assert_df_equal(multi_full.get_slice(4, 10).get_meta(), df.iloc[4:10]) + DatasetManager.assert_df_equal(multi_full.get_slice(4, 10).get_meta(), df.iloc[4:10]) # Set values one column multi_full.get_slice(1, 6).set_meta("v3", 5.3) - assert DatasetManager.assert_df_equal(multi_full.get_slice(1, 6).get_meta("v3"), 
df.iloc[1:6]["v3"]) + DatasetManager.assert_df_equal(multi_full.get_slice(1, 6).get_meta("v3"), df.iloc[1:6]["v3"]) # Set values multi column multi_full.get_slice(5, 8).set_meta(["v4", "v1", "v3"], 7) - assert DatasetManager.assert_df_equal( + DatasetManager.assert_df_equal( multi_full.get_slice(5, 8).get_meta(["v4", "v1", "v3"]), df.iloc[5:8][["v4", "v1", "v3"]]) # Chained Set values multi_full.get_slice(10, 20).get_slice(1, 4).set_meta(None, 8) - assert DatasetManager.assert_df_equal( + DatasetManager.assert_df_equal( multi_full.get_slice(10, 20).get_slice(1, 4).get_meta(), df.iloc[10 + 1:(10 + 1) + (4 - 1)]) # Chained Set values one column multi_full.get_slice(10, 20).get_slice(3, 5).set_meta("v4", 112) - assert DatasetManager.assert_df_equal( + DatasetManager.assert_df_equal( multi_full.get_slice(10, 20).get_slice(3, 5).get_meta("v4"), df.iloc[10 + 3:(10 + 3) + (5 - 3)]["v4"]) # Chained Set values multi column multi_full.get_slice(10, 20).get_slice(5, 8).set_meta(["v4", "v1", "v2"], 22) - assert DatasetManager.assert_df_equal( + DatasetManager.assert_df_equal( multi_full.get_slice(10, 20).get_slice(5, 8).get_meta(["v4", "v1", "v2"]), df.iloc[10 + 5:(10 + 5) + (8 - 5)][["v4", "v1", "v2"]]) @@ -769,7 +769,7 @@ def test_tensor_slicing(use_cpp: bool, dataset: DatasetManager): assert multi_slice.mess_count == equiv_slice.mess_count assert multi_slice.offset == equiv_slice.offset assert multi_slice.count == equiv_slice.count - assert dataset.assert_df_equal(multi_slice.get_meta(), equiv_slice.get_meta()) + dataset.assert_df_equal(multi_slice.get_meta(), equiv_slice.get_meta()) # Finally, compare a double slice to a single memory = InferenceMemory(count=tensor_count, tensors={"seq_ids": seq_ids, "probs": probs}) @@ -781,4 +781,4 @@ def test_tensor_slicing(use_cpp: bool, dataset: DatasetManager): assert double_slice.offset == single_slice.offset assert double_slice.count == single_slice.count assert cp.all(double_slice.get_tensor("probs") == 
single_slice.get_tensor("probs")) - assert dataset.assert_df_equal(double_slice.get_meta(), single_slice.get_meta()) + dataset.assert_df_equal(double_slice.get_meta(), single_slice.get_meta()) diff --git a/tests/test_write_to_kafka_stage_pipe.py b/tests/test_write_to_kafka_stage_pipe.py index 6e7bad114d..397b1c680e 100644 --- a/tests/test_write_to_kafka_stage_pipe.py +++ b/tests/test_write_to_kafka_stage_pipe.py @@ -25,8 +25,7 @@ from morpheus.stages.output.write_to_kafka_stage import WriteToKafkaStage from morpheus.stages.postprocess.serialize_stage import SerializeStage from morpheus.stages.preprocess.deserialize_stage import DeserializeStage -from morpheus.utils import compare_df -from utils import assert_results +from utils.dataset_manager import DatasetManager if (typing.TYPE_CHECKING): from kafka import KafkaConsumer @@ -35,7 +34,7 @@ @pytest.mark.kafka @pytest.mark.use_cudf def test_write_to_kafka_stage_pipe(config, - filter_probs_df, + dataset_cudf: DatasetManager, kafka_bootstrap_servers: str, kafka_consumer: "KafkaConsumer", kafka_topics: typing.Tuple[str, str]) -> None: @@ -43,6 +42,8 @@ def test_write_to_kafka_stage_pipe(config, Even though WriteToKafkaStage only has a Python impl, testing with both C++ and Python execution to ensure it works just as well with the C++ impls of the message classes. 
""" + + filter_probs_df = dataset_cudf['filter_probs.csv'] pipe = LinearPipeline(config) pipe.set_source(InMemorySourceStage(config, [filter_probs_df])) pipe.add_stage(DeserializeStage(config)) @@ -62,4 +63,4 @@ def test_write_to_kafka_stage_pipe(config, assert len(output_df) == len(filter_probs_df) - assert_results(compare_df.compare_df(filter_probs_df.to_pandas(), output_df)) + dataset_cudf.assert_compare_df(filter_probs_df, output_df) diff --git a/tests/tests_data/examples/gnn_fraud_detection_pipeline/inductive_emb.csv b/tests/tests_data/examples/gnn_fraud_detection_pipeline/inductive_emb.csv new file mode 100644 index 0000000000..3c8eae2372 --- /dev/null +++ b/tests/tests_data/examples/gnn_fraud_detection_pipeline/inductive_emb.csv @@ -0,0 +1,3 @@ +version https://git-lfs.github.com/spec/v1 +oid sha256:278c304032df75c9c2e13d0367c08ad4d652fb77a86a50ea7b1d26c37e282a76 +size 8142 diff --git a/tests/tests_data/examples/gnn_fraud_detection_pipeline/predictions.csv b/tests/tests_data/examples/gnn_fraud_detection_pipeline/predictions.csv new file mode 100644 index 0000000000..87256ef044 --- /dev/null +++ b/tests/tests_data/examples/gnn_fraud_detection_pipeline/predictions.csv @@ -0,0 +1,3 @@ +version https://git-lfs.github.com/spec/v1 +oid sha256:4baf153412823c07ebc35684ef88e2237aeef159ba0857dd98ef80190c639e44 +size 168 diff --git a/tests/tests_data/examples/ransomware_detection/dask_results.csv b/tests/tests_data/examples/ransomware_detection/dask_results.csv new file mode 100644 index 0000000000..57392631fd --- /dev/null +++ b/tests/tests_data/examples/ransomware_detection/dask_results.csv @@ -0,0 +1,3 @@ +version https://git-lfs.github.com/spec/v1 +oid sha256:d88740c15ee80c199e5d1a59fc1c469ba188e3717fa88f2ece0286df70860b2a +size 4456 diff --git a/tests/utils/__init__.py b/tests/utils/__init__.py index fa25defca5..d854bd988d 100755 --- a/tests/utils/__init__.py +++ b/tests/utils/__init__.py @@ -17,8 +17,11 @@ import json import os import time +import types import 
typing +import pytest + from morpheus.io.deserializers import read_file_to_df from .test_directories import TestDirectories @@ -138,3 +141,18 @@ def assert_results(results: dict) -> dict: assert results["diff_cols"] == 0, f"Expected diff_cols=0 : {results}" assert results["diff_rows"] == 0, f"Expected diff_rows=0 : {results}" return results + + +def import_or_skip(modname: str, + minversion: str = None, + reason: str = None, + fail_missing: bool = False) -> types.ModuleType: + """ + Wrapper for `pytest.importorskip` will re-raise any `Skipped` exceptions as `ImportError` if `fail_missing` is True. + """ + try: + return pytest.importorskip(modname, minversion=minversion, reason=reason) + except pytest.skip.Exception as e: + if fail_missing: + raise ImportError(e) + raise diff --git a/tests/utils/dataset_manager.py b/tests/utils/dataset_manager.py index 61fdd839da..05f67b1498 100644 --- a/tests/utils/dataset_manager.py +++ b/tests/utils/dataset_manager.py @@ -24,7 +24,9 @@ import cudf as cdf # rename to avoid clash with property method from morpheus.io.deserializers import read_file_to_df +from morpheus.utils import compare_df from utils import TEST_DIRS +from utils import assert_results class DatasetManager(object): @@ -180,19 +182,47 @@ def dup_index(cls, # Return a new dataframe where we replace some index values with others return cls.replace_index(df, replace_dict) - @staticmethod - def assert_df_equal(df_to_check: typing.Union[pd.DataFrame, cdf.DataFrame], val_to_check: typing.Any) -> bool: + def value_as_pandas(val: typing.Union[pd.DataFrame, cdf.DataFrame, cdf.Series], assert_is_pandas=True): + if (isinstance(val, cdf.DataFrame) or isinstance(val, cdf.Series)): + return val.to_pandas() + + if assert_is_pandas: + assert isinstance(val, (pd.DataFrame, pd.Series)), type(val) + + return val + + @classmethod + def assert_df_equal(cls, df_to_check: typing.Union[pd.DataFrame, cdf.DataFrame], val_to_check: typing.Any): """Compare a DataFrame against a validation 
dataset which can either be a DataFrame, Series or CuPy array.""" # Comparisons work better in cudf so convert everything to that - if (isinstance(df_to_check, cdf.DataFrame) or isinstance(df_to_check, cdf.Series)): - df_to_check = df_to_check.to_pandas() + df_to_check = cls.value_as_pandas(df_to_check) - if (isinstance(val_to_check, cdf.DataFrame) or isinstance(val_to_check, cdf.Series)): - val_to_check = val_to_check.to_pandas() - elif (isinstance(val_to_check, cp.ndarray)): + if (isinstance(val_to_check, cp.ndarray)): val_to_check = val_to_check.get() + else: + val_to_check = cls.value_as_pandas(val_to_check, assert_is_pandas=False) bool_df = df_to_check == val_to_check - return bool(bool_df.all(axis=None)) + assert bool(bool_df.all(axis=None)) + + @classmethod + def compare_df(cls, + dfa: typing.Union[pd.DataFrame, cdf.DataFrame], + dfb: typing.Union[pd.DataFrame, cdf.DataFrame], + **compare_args): + """ + Wrapper for morpheus.utils.compare_df.compare_df + """ + return compare_df.compare_df(cls.value_as_pandas(dfa), cls.value_as_pandas(dfb), **compare_args) + + @classmethod + def assert_compare_df(cls, + dfa: typing.Union[pd.DataFrame, cdf.DataFrame], + dfb: typing.Union[pd.DataFrame, cdf.DataFrame], + **compare_args): + """ + Convenience method for calling compare_df and asserting that the results are the same + """ + assert_results(cls.compare_df(dfa, dfb, **compare_args))