Skip to content

Commit

Permalink
Add support for fsspec.core.OpenFile instances to the MonitorStage (n…
Browse files Browse the repository at this point in the history
…v-morpheus#942)

* Allows the `MonitorStage` to be placed immediately after stages such as `MultiFileSource`, which emit `fsspec.core.OpenFiles` instead of message types.
* Adds more tests for `MonitorStage`. Some of these verify that when the stage raises a `NotImplementedError` the pipeline halts properly, which was ultimately fixed in MRC PRs nv-morpheus/MRC#326 and nv-morpheus/MRC#327

fixes nv-morpheus#941

Authors:
  - David Gardner (https://github.com/dagardner-nv)

Approvers:
  - Michael Demoret (https://github.com/mdemoret-nv)

URL: nv-morpheus#942
  • Loading branch information
dagardner-nv authored May 15, 2023
1 parent cef498a commit e595793
Show file tree
Hide file tree
Showing 4 changed files with 188 additions and 17 deletions.
3 changes: 2 additions & 1 deletion morpheus/utils/monitor_utils.py
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@
import typing
from functools import reduce

import fsspec
from tqdm import TMonitor
from tqdm import TqdmSynchronisationWarning
from tqdm import tqdm
Expand Down Expand Up @@ -315,7 +316,7 @@ def check_df(y):
elif (isinstance(x, list)):
item_count_fn = self.auto_count_fn(x[0])
return lambda y: reduce(lambda sum, z, item_count_fn=item_count_fn: sum + item_count_fn(z), y, 0)
elif (isinstance(x, str)):
elif (isinstance(x, (str, fsspec.core.OpenFile))):
return lambda y: 1
elif (hasattr(x, "__len__")):
return len # Return len directly (same as `lambda y: len(y)`)
Expand Down
92 changes: 92 additions & 0 deletions tests/test_error_pipe.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,92 @@
#!/usr/bin/env python
# SPDX-FileCopyrightText: Copyright (c) 2023, NVIDIA CORPORATION & AFFILIATES. All rights reserved.
# SPDX-License-Identifier: Apache-2.0
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.

import logging
import typing

import mrc
import pandas as pd
import pytest

from morpheus.config import Config
from morpheus.pipeline import LinearPipeline
from morpheus.pipeline.single_output_source import SingleOutputSource
from morpheus.pipeline.stream_pair import StreamPair
from morpheus.stages.general.monitor_stage import MonitorStage
from morpheus.stages.input.in_memory_source_stage import InMemorySourceStage
from morpheus.stages.output.in_memory_sink_stage import InMemorySinkStage
from utils.stages.error_raiser import ErrorRaiserStage


class InMemSourceXStage(SingleOutputSource):
    """
    Source stage that emits the items of an in-memory list unchanged.

    No assumption is made that the items are dataframes; the output type reported for the stage is the
    type of the first item in `data`.

    Parameters
    ----------
    c : `morpheus.config.Config`
        Config object forwarded to the base class constructor.
    data : typing.List[typing.Any]
        Items to emit, in order. The first item is indexed when the source is built, so the list must
        not be empty.
    """

    def __init__(self, c: Config, data: typing.List[typing.Any]):
        super().__init__(c)
        self._data = data

    @property
    def name(self) -> str:
        return "from-data"

    def supports_cpp_node(self) -> bool:
        # Python-only stage
        return False

    def _emit_data(self) -> typing.Iterator[typing.Any]:
        # Generator handed to the source node; yields each stored item as-is
        yield from self._data

    def _build_source(self, builder: mrc.Builder) -> StreamPair:
        source_node = builder.make_source(self.unique_name, self._emit_data())

        # The emitted type is taken from the first item rather than being hard-coded
        emitted_type = type(self._data[0])
        return source_node, emitted_type


@pytest.mark.parametrize("exception_cls", [RuntimeError, ValueError, NotImplementedError])
def test_stage_raises_exception(config: Config, filter_probs_df: pd.DataFrame, exception_cls: type[Exception]):
    """
    An exception raised inside a stage should propagate out of `run()` and no messages should reach the sink.
    """
    pipeline = LinearPipeline(config)
    pipeline.set_source(InMemorySourceStage(config, [filter_probs_df]))
    raiser = pipeline.add_stage(ErrorRaiserStage(config, exception_cls=exception_cls))
    sink = pipeline.add_stage(InMemorySinkStage(config))

    with pytest.raises(exception_cls):
        pipeline.run()

    # Confirm the exception originated from the error raiser stage rather than some other part of the pipeline
    assert raiser.error_raised

    received = sink.get_messages()
    assert len(received) == 0


@pytest.mark.use_python
@pytest.mark.parametrize("delayed_start", [False, True])
def test_monitor_not_impl(config: Config, delayed_start: bool):
    """
    Feeding the monitor stage an object of a locally defined, otherwise empty class should cause `run()`
    to raise `NotImplementedError`, with nothing delivered to the sink.
    """

    class UnsupportedType:
        pass

    source = InMemSourceXStage(config, [UnsupportedType()])
    monitor = MonitorStage(config, log_level=logging.WARNING, delayed_start=delayed_start)

    pipeline = LinearPipeline(config)
    pipeline.set_source(source)
    monitor_stage = pipeline.add_stage(monitor)
    sink = pipeline.add_stage(InMemorySinkStage(config))

    # Sanity check: the monitor must be enabled for this test to be exercising it
    assert monitor_stage._mc.is_enabled()

    with pytest.raises(NotImplementedError):
        pipeline.run()

    received = sink.get_messages()
    assert len(received) == 0
44 changes: 28 additions & 16 deletions tests/test_monitor_stage.py
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@
import typing
from unittest import mock

import fsspec
import mrc
import pytest

Expand Down Expand Up @@ -99,27 +100,38 @@ def test_refresh(mock_morph_tqdm, config):
mock_morph_tqdm.refresh.assert_called_once()


def test_auto_count_fn(config):
@pytest.mark.parametrize('value,expected_fn,expected',
[
(None, False, None),
([], False, None),
(['s'], True, 1),
('s', True, 1),
('test', True, 1),
(cudf.DataFrame(), True, 0),
(cudf.DataFrame(range(12), columns=["test"]), True, 12),
(MultiMessage(meta=MessageMeta(df=cudf.DataFrame(range(12), columns=["test"]))), True, 12),
({}, True, 0),
(tuple(), True, 0),
(set(), True, 0),
(fsspec.open_files(os.path.join(TEST_DIRS.tests_data_dir, 'filter_probs.csv')), True, 1),
])
def test_auto_count_fn(config, value: typing.Any, expected_fn: bool, expected: typing.Union[int, None]):
m = MonitorStage(config, log_level=logging.WARNING)

assert m._mc.auto_count_fn(None) is None
assert m._mc.auto_count_fn([]) is None
auto_fn = m._mc.auto_count_fn(value)
if expected_fn:
assert callable(auto_fn)
assert auto_fn(value) == expected
else:
assert auto_fn is None

# Ints not supported, lists are, but lists of unsupported are also unsupported
pytest.raises(NotImplementedError, m._mc.auto_count_fn, 1)
pytest.raises(NotImplementedError, m._mc.auto_count_fn, [1])

# Just verify that we get a valid function for each supported type
assert inspect.isfunction(m._mc.auto_count_fn(['s']))
assert inspect.isfunction(m._mc.auto_count_fn('s'))
assert inspect.isfunction(m._mc.auto_count_fn(cudf.DataFrame()))
assert inspect.isfunction(
m._mc.auto_count_fn(MultiMessage(meta=MessageMeta(df=cudf.DataFrame(range(12), columns=["test"])))))
@pytest.mark.parametrize('value', [1, [1], [2, 0]])
def test_auto_count_fn_not_impl(config, value: typing.Any):
m = MonitorStage(config, log_level=logging.WARNING)

# Other iterables return the len function
assert m._mc.auto_count_fn({}) is len
assert m._mc.auto_count_fn(()) is len
assert m._mc.auto_count_fn(set()) is len
with pytest.raises(NotImplementedError):
m._mc.auto_count_fn(value)


@mock.patch('morpheus.utils.monitor_utils.MorpheusTqdm')
Expand Down
66 changes: 66 additions & 0 deletions tests/utils/stages/error_raiser.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,66 @@
# SPDX-FileCopyrightText: Copyright (c) 2023, NVIDIA CORPORATION & AFFILIATES. All rights reserved.
# SPDX-License-Identifier: Apache-2.0
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.

import typing

import mrc
from mrc.core import operators as ops

from morpheus.config import Config
from morpheus.pipeline.single_port_stage import SinglePortStage
from morpheus.pipeline.stream_pair import StreamPair
from morpheus.utils.atomic_integer import AtomicInteger


class ErrorRaiserStage(SinglePortStage):
    """
    Stage whose `on_data` method raises an exception.

    Each call to `on_data` reads and increments an `AtomicInteger` counter that starts at 0. While the value
    read is below `raise_on` the message is returned unchanged; once it is greater than or equal to
    `raise_on`, `exception_cls` is raised.

    Parameters
    ----------
    config : `morpheus.config.Config`
        Config object forwarded to the base class constructor.
    exception_cls : type[Exception], default = RuntimeError
        Exception class to instantiate and raise.
    raise_on : int, default = 0
        Counter value at which raising starts. Must be non-negative.
    """

    def __init__(self, config: Config, exception_cls: type[Exception] = RuntimeError, raise_on: int = 0):
        assert raise_on >= 0

        super().__init__(config)
        self._raise_on = raise_on
        self._exception_cls = exception_cls
        self._error_raised = False
        self._counter = AtomicInteger(0)

    @property
    def name(self) -> str:
        return "error-raiser"

    @property
    def error_raised(self) -> bool:
        # True once `on_data` has reached the raising branch
        return self._error_raised

    def accepted_types(self) -> typing.Tuple:
        return (typing.Any, )

    def supports_cpp_node(self) -> bool:
        return False

    def on_data(self, message: typing.Any):
        count = self._counter.get_and_inc()
        if count < self._raise_on:
            # Threshold not reached yet, pass the message through untouched
            return message

        # Record that the exception came from this stage before raising it
        self._error_raised = True
        raise self._exception_cls(f"ErrorRaiserStage: raising exception on message {count}")

    def _build_single(self, builder: mrc.Builder, input_stream: StreamPair) -> StreamPair:
        node = builder.make_node(self.unique_name, ops.map(self.on_data))

        upstream_node = input_stream[0]
        builder.make_edge(upstream_node, node)

        # The second element of the incoming pair is returned unchanged alongside the new node
        return node, input_stream[1]

0 comments on commit e595793

Please sign in to comment.