From e595793be9d22b8f349f007baf56ae1224338e68 Mon Sep 17 00:00:00 2001 From: David Gardner <96306125+dagardner-nv@users.noreply.github.com> Date: Mon, 15 May 2023 09:59:36 -0700 Subject: [PATCH] Add support for fsspec.core.OpenFile instances to the MonitorStage (#942) * Allows the `MonitorStage` to be placed immediately after stages such as `MultiFileSource` which emits `fsspec.core.OpenFiles` instead of message types. * Adds more tests for `MonitorStage`, some of these test that when the stage does raise a `NotImplementedError` that the pipeline halts propperly which was ultimately fixed in MRC PRs https://github.com/nv-morpheus/MRC/pull/326 and https://github.com/nv-morpheus/MRC/pull/327 fixes #941 Authors: - David Gardner (https://github.com/dagardner-nv) Approvers: - Michael Demoret (https://github.com/mdemoret-nv) URL: https://github.com/nv-morpheus/Morpheus/pull/942 --- morpheus/utils/monitor_utils.py | 3 +- tests/test_error_pipe.py | 92 ++++++++++++++++++++++++++++++ tests/test_monitor_stage.py | 44 ++++++++------ tests/utils/stages/error_raiser.py | 66 +++++++++++++++++++++ 4 files changed, 188 insertions(+), 17 deletions(-) create mode 100755 tests/test_error_pipe.py create mode 100644 tests/utils/stages/error_raiser.py diff --git a/morpheus/utils/monitor_utils.py b/morpheus/utils/monitor_utils.py index e6c36122ca..7bc473828a 100644 --- a/morpheus/utils/monitor_utils.py +++ b/morpheus/utils/monitor_utils.py @@ -16,6 +16,7 @@ import typing from functools import reduce +import fsspec from tqdm import TMonitor from tqdm import TqdmSynchronisationWarning from tqdm import tqdm @@ -315,7 +316,7 @@ def check_df(y): elif (isinstance(x, list)): item_count_fn = self.auto_count_fn(x[0]) return lambda y: reduce(lambda sum, z, item_count_fn=item_count_fn: sum + item_count_fn(z), y, 0) - elif (isinstance(x, str)): + elif (isinstance(x, (str, fsspec.core.OpenFile))): return lambda y: 1 elif (hasattr(x, "__len__")): return len # Return len directly (same as `lambda y: len(y)`) diff --git a/tests/test_error_pipe.py b/tests/test_error_pipe.py new file mode 100755 index 0000000000..09fae093d3 --- /dev/null +++ b/tests/test_error_pipe.py @@ -0,0 +1,92 @@ +#!/usr/bin/env python +# SPDX-FileCopyrightText: Copyright (c) 2023, NVIDIA CORPORATION & AFFILIATES. All rights reserved. +# SPDX-License-Identifier: Apache-2.0 +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +import logging +import typing + +import mrc +import pandas as pd +import pytest + +from morpheus.config import Config +from morpheus.pipeline import LinearPipeline +from morpheus.pipeline.single_output_source import SingleOutputSource +from morpheus.pipeline.stream_pair import StreamPair +from morpheus.stages.general.monitor_stage import MonitorStage +from morpheus.stages.input.in_memory_source_stage import InMemorySourceStage +from morpheus.stages.output.in_memory_sink_stage import InMemorySinkStage +from utils.stages.error_raiser import ErrorRaiserStage + + +class InMemSourceXStage(SingleOutputSource): + """ + InMemorySourceStage subclass that emits whatever you give it and doesn't assume the source data + is a dataframe. + """ + + def __init__(self, c: Config, data: typing.List[typing.Any]): + super().__init__(c) + self._data = data + + @property + def name(self) -> str: + return "from-data" + + def supports_cpp_node(self) -> bool: + return False + + def _emit_data(self) -> typing.Iterator[typing.Any]: + for x in self._data: + yield x + + def _build_source(self, builder: mrc.Builder) -> StreamPair: + node = builder.make_source(self.unique_name, self._emit_data()) + return node, type(self._data[0]) + + +@pytest.mark.parametrize("exception_cls", [RuntimeError, ValueError, NotImplementedError]) +def test_stage_raises_exception(config: Config, filter_probs_df: pd.DataFrame, exception_cls: type[Exception]): + pipe = LinearPipeline(config) + pipe.set_source(InMemorySourceStage(config, [filter_probs_df])) + error_raiser_stage = pipe.add_stage(ErrorRaiserStage(config, exception_cls=exception_cls)) + sink_stage = pipe.add_stage(InMemorySinkStage(config)) + + with pytest.raises(exception_cls): + pipe.run() + + # Ensure that the raised exception was from our stage and not from something else + assert error_raiser_stage.error_raised + assert len(sink_stage.get_messages()) == 0 + + +@pytest.mark.use_python +@pytest.mark.parametrize("delayed_start", [False, True]) +def test_monitor_not_impl(config: Config, delayed_start: bool): + + class UnsupportedType: + pass + + pipe = LinearPipeline(config) + pipe.set_source(InMemSourceXStage(config, [UnsupportedType()])) + monitor_stage = pipe.add_stage(MonitorStage(config, log_level=logging.WARNING, delayed_start=delayed_start)) + sink_stage = pipe.add_stage(InMemorySinkStage(config)) + + assert monitor_stage._mc.is_enabled() + + with pytest.raises(NotImplementedError): + pipe.run() + + assert len(sink_stage.get_messages()) == 0 diff --git a/tests/test_monitor_stage.py b/tests/test_monitor_stage.py index 8e9a22fe14..896fb1b18a 100755 --- a/tests/test_monitor_stage.py +++ b/tests/test_monitor_stage.py @@ -21,6 +21,7 @@ import typing from unittest import mock +import fsspec import mrc import pytest @@ -99,27 +100,38 @@ def test_refresh(mock_morph_tqdm, config): mock_morph_tqdm.refresh.assert_called_once() -def test_auto_count_fn(config): +@pytest.mark.parametrize('value,expected_fn,expected', + [ + (None, False, None), + ([], False, None), + (['s'], True, 1), + ('s', True, 1), + ('test', True, 1), + (cudf.DataFrame(), True, 0), + (cudf.DataFrame(range(12), columns=["test"]), True, 12), + (MultiMessage(meta=MessageMeta(df=cudf.DataFrame(range(12), columns=["test"]))), True, 12), + ({}, True, 0), + (tuple(), True, 0), + (set(), True, 0), + (fsspec.open_files(os.path.join(TEST_DIRS.tests_data_dir, 'filter_probs.csv')), True, 1), + ]) +def test_auto_count_fn(config, value: typing.Any, expected_fn: bool, expected: typing.Union[int, None]): m = MonitorStage(config, log_level=logging.WARNING) - assert m._mc.auto_count_fn(None) is None - assert m._mc.auto_count_fn([]) is None + auto_fn = m._mc.auto_count_fn(value) + if expected_fn: + assert callable(auto_fn) + assert auto_fn(value) == expected + else: + assert auto_fn is None - # Ints not supported, lists are, but lists of unsupported are also unsupported - pytest.raises(NotImplementedError, m._mc.auto_count_fn, 1) - pytest.raises(NotImplementedError, m._mc.auto_count_fn, [1]) - # Just verify that we get a valid function for each supported type - assert inspect.isfunction(m._mc.auto_count_fn(['s'])) - assert inspect.isfunction(m._mc.auto_count_fn('s')) - assert inspect.isfunction(m._mc.auto_count_fn(cudf.DataFrame())) - assert inspect.isfunction( - m._mc.auto_count_fn(MultiMessage(meta=MessageMeta(df=cudf.DataFrame(range(12), columns=["test"]))))) +@pytest.mark.parametrize('value', [1, [1], [2, 0]]) +def test_auto_count_fn_not_impl(config, value: typing.Any): + m = MonitorStage(config, log_level=logging.WARNING) - # Other iterables return the len function - assert m._mc.auto_count_fn({}) is len - assert m._mc.auto_count_fn(()) is len - assert m._mc.auto_count_fn(set()) is len + with pytest.raises(NotImplementedError): + m._mc.auto_count_fn(value) @mock.patch('morpheus.utils.monitor_utils.MorpheusTqdm') diff --git a/tests/utils/stages/error_raiser.py b/tests/utils/stages/error_raiser.py new file mode 100644 index 0000000000..aef1ded487 --- /dev/null +++ b/tests/utils/stages/error_raiser.py @@ -0,0 +1,66 @@ +# SPDX-FileCopyrightText: Copyright (c) 2023, NVIDIA CORPORATION & AFFILIATES. All rights reserved. +# SPDX-License-Identifier: Apache-2.0 +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +import typing + +import mrc +from mrc.core import operators as ops + +from morpheus.config import Config +from morpheus.pipeline.single_port_stage import SinglePortStage +from morpheus.pipeline.stream_pair import StreamPair +from morpheus.utils.atomic_integer import AtomicInteger + + +class ErrorRaiserStage(SinglePortStage): + """ + Stage that raises an exception in the on_data method + """ + + def __init__(self, config: Config, exception_cls: type[Exception] = RuntimeError, raise_on: int = 0): + assert raise_on >= 0 + + super().__init__(config) + self._exception_cls = exception_cls + self._raise_on = raise_on + self._counter = AtomicInteger(0) + self._error_raised = False + + @property + def name(self) -> str: + return "error-raiser" + + def accepted_types(self) -> typing.Tuple: + return (typing.Any, ) + + def supports_cpp_node(self) -> bool: + return False + + def on_data(self, message: typing.Any): + count = self._counter.get_and_inc() + if count >= self._raise_on: + self._error_raised = True + raise self._exception_cls(f"ErrorRaiserStage: raising exception on message {count}") + return message + + @property + def error_raised(self) -> bool: + return self._error_raised + + def _build_single(self, builder: mrc.Builder, input_stream: StreamPair) -> StreamPair: + node = builder.make_node(self.unique_name, ops.map(self.on_data)) + builder.make_edge(input_stream[0], node) + + return node, input_stream[1]