DFP MultiFileSource optionally poll for file updates #978

@@ -238,6 +238,8 @@ Both scripts are capable of running either a training or inference pipeline for
| `--log_level` | One of: `CRITICAL`, `FATAL`, `ERROR`, `WARN`, `WARNING`, `INFO`, `DEBUG` | Specify the logging level to use. [default: `WARNING`] |
| `--sample_rate_s` | INTEGER | Minimum time step, in milliseconds, between object logs. [env var: `DFP_SAMPLE_RATE_S`; default: 0] |
| `-f`, `--input_file` | TEXT | List of files to process. Can specify multiple arguments for multiple files. Also accepts glob (*) wildcards and schema prefixes such as `s3://`. For example, to make a local cache of an s3 bucket, use `filecache::s3://mybucket/*`. Refer to [fsspec documentation](https://filesystem-spec.readthedocs.io/en/latest/api.html?highlight=open_files#fsspec.open_files) for list of possible options. |
| `--watch_inputs` | FLAG | Instructs the pipeline to continuously check the paths specified by `--input_file` for new files. This assumes that at least one of the paths contains a wildcard. |
| `--watch_interval` | FLOAT | Amount of time, in seconds, to wait between checks for new files. Only used if `--watch_inputs` is set. [default: `1.0`] |
| `--tracking_uri` | TEXT | The MLflow tracking URI to connect to the tracking backend. [default: `http://localhost:5000`] |
| `--help` | | Show this message and exit. |

@@ -184,6 +184,8 @@ The `MultiFileSource`([`examples/digital_fingerprinting/production/morpheus/dfp/
| -------- | ---- | ----------- |
| `c` | `morpheus.config.Config` | Morpheus config object |
| `filenames` | `List[str]` or `str` | Paths to source files to be read from |
| `watch` | `bool` | Optional: when `True`, repeatedly polls `filenames` for new files. This assumes that at least one of the paths in `filenames` contains a wildcard. Defaults to `False`. |
| `watch_interval` | `float` | When `watch` is True, this is the time in seconds between polling the paths in `filenames` for new files. Ignored when `watch` is False. |
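A minimal construction sketch (assuming the production DFP example environment, where the `dfp` package is importable; the S3 path is hypothetical):

```python
from dfp.stages.multi_file_source import MultiFileSource
from morpheus.config import Config

config = Config()

# Poll the glob every 30 seconds; only files not seen on a previous pass are
# emitted. With watch=False (the default) the matching files are read once.
source = MultiFileSource(config,
                         filenames=["s3://mybucket/duo-logs/*.json"],
                         watch=True,
                         watch_interval=30.0)
```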


#### File Batcher Stage (`DFPFileBatcherStage`)
2 changes: 2 additions & 0 deletions examples/digital_fingerprinting/production/README.md
@@ -94,6 +94,8 @@ Both scripts are capable of running either a training or inference pipeline for
| `--log_level` | One of: `CRITICAL`, `FATAL`, `ERROR`, `WARN`, `WARNING`, `INFO`, `DEBUG` | Specify the logging level to use. [default: `WARNING`] |
| `--sample_rate_s` | INTEGER | Minimum time step, in milliseconds, between object logs. [env var: `DFP_SAMPLE_RATE_S`; default: 0] |
| `-f`, `--input_file` | TEXT | List of files to process. Can specify multiple arguments for multiple files. Also accepts glob (*) wildcards and schema prefixes such as `s3://`. For example, to make a local cache of an s3 bucket, use `filecache::s3://mybucket/*`. Refer to [fsspec documentation](https://filesystem-spec.readthedocs.io/en/latest/api.html?highlight=open_files#fsspec.open_files) for list of possible options. |
| `--watch_inputs` | FLAG | Instructs the pipeline to continuously check the paths specified by `--input_file` for new files. This assumes that at least one of the paths contains a wildcard. |
| `--watch_interval` | FLOAT | Amount of time, in seconds, to wait between checks for new files. Only used if `--watch_inputs` is set. [default: `1.0`] |
| `--tracking_uri` | TEXT | The MLflow tracking URI to connect to the tracking backend. [default: `http://localhost:5000`] |
| `--help` | | Show this message and exit. |

@@ -14,6 +14,7 @@
"""Source stages for reading multiple files from a list of fsspec paths."""

import logging
import time
import typing

import fsspec
@@ -24,7 +25,7 @@
from morpheus.pipeline.single_output_source import SingleOutputSource
from morpheus.pipeline.stream_pair import StreamPair

logger = logging.getLogger("morpheus.{}".format(__name__))
logger = logging.getLogger(f"morpheus.{__name__}")


class MultiFileSource(SingleOutputSource):
@@ -40,12 +41,20 @@ class MultiFileSource(SingleOutputSource):
List of paths to be read from; the paths can be S3 URLs (`s3://path`) and can include wildcard characters `*`
as defined by `fsspec`:
https://filesystem-spec.readthedocs.io/en/latest/api.html?highlight=open_files#fsspec.open_files
watch : bool, default = False
When True, will check `filenames` for new files and emit them as they appear. This assumes that at least one of
the paths in `filenames` contains a wildcard character.
watch_interval : float, default = 1.0
When `watch` is True, this is the time in seconds between polling the paths in `filenames` for new files.
Ignored when `watch` is False.
"""

def __init__(
self,
c: Config,
filenames: typing.List[str],
watch: bool = False,
watch_interval: float = 1.0,
):
super().__init__(c)

@@ -55,6 +64,8 @@ def __init__(

self._input_count = None
self._max_concurrent = c.num_threads
self._watch = watch
self._watch_interval = watch_interval

@property
def name(self) -> str:
@@ -80,10 +91,43 @@ def _generate_frames_fsspec(self) -> typing.Iterable[fsspec.core.OpenFiles]:

yield files

def _polling_generate_frames_fsspec(self) -> typing.Iterable[fsspec.core.OpenFiles]:
files_seen = set()
while True:
file_set = set()
filtered_files = []

files = fsspec.open_files(self._filenames)
for file in files:
file_set.add(file.full_name)
if file.full_name not in files_seen:
filtered_files.append(file)

# Replace files_seen with the new set of files. This prevents a memory leak that could occur if files are
# deleted from the input directory. In addition, if a file with a given name was created, seen/processed by
# the stage, and then deleted, and a new file with the same name appeared sometime later, the stage will
# need to re-ingest that new file.
files_seen = file_set

pre_yield_time = time.time()
if len(filtered_files) > 0:
yield fsspec.core.OpenFiles(filtered_files, fs=files.fs)

post_yield_time = time.time()

# because yielding to the output channel can block, we should only sleep when yielding didn't take longer
# than the watch interval
yield_time = post_yield_time - pre_yield_time
if yield_time < self._watch_interval:
time.sleep(self._watch_interval - yield_time)

def _build_source(self, builder: mrc.Builder) -> StreamPair:

if self._build_cpp_node():
raise RuntimeError("Does not support C++ nodes")

if self._watch:
out_stream = builder.make_source(self.unique_name, self._polling_generate_frames_fsspec())
else:
out_stream = builder.make_source(self.unique_name, self._generate_frames_fsspec())

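To make the polling semantics concrete, here is a small driver sketch (hypothetical local glob; it calls the private generator directly, as the unit tests later in this diff do). A previously seen file is re-emitted only if it disappears from the glob and later reappears:

```python
from dfp.stages.multi_file_source import MultiFileSource
from morpheus.config import Config

source = MultiFileSource(Config(),
                         filenames=["/var/log/duo/*.json"],  # hypothetical path
                         watch=True,
                         watch_interval=5.0)

# Each iteration yields an fsspec OpenFiles collection containing only files
# that were absent on every prior polling pass; between yields the generator
# sleeps for whatever remains of the watch interval.
for open_files in source._polling_generate_frames_fsspec():
    for open_file in open_files:
        print("new file:", open_file.full_name)
```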
Expand Up @@ -11,6 +11,7 @@
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
"""DFP training & inference pipelines for Azure Active Directory logs."""

import functools
import logging
@@ -121,6 +122,17 @@
"For example, to make a local cache of an s3 bucket, use `filecache::s3://mybucket/*`. "
"Refer to fsspec documentation for list of possible options."),
)
@click.option('--watch_inputs',
type=bool,
is_flag=True,
default=False,
help=("Instructs the pipeline to continuously check the paths specified by `--input_file` for new files. "
"This assumes that the at least one paths contains a wildcard."))
@click.option("--watch_interval",
type=float,
default=1.0,
help=("Amount of time, in seconds, to wait between checks for new files. "
"Only used if --watch_inputs is set."))
@click.option('--tracking_uri',
type=str,
default="http://mlflow:5000",
@@ -134,8 +146,9 @@ def run_pipeline(train_users,
log_level,
sample_rate_s,
**kwargs):
"""Runs the DFP pipeline."""
# To include the generic, we must be training all or generic
include_generic = train_users == "all" or train_users == "generic"
include_generic = train_users in ("all", "generic")

# To include individual, we must be either training or inferring
include_individual = train_users != "generic"
@@ -163,7 +176,7 @@
if (len(skip_users) > 0 and len(only_users) > 0):
logging.error("Option --skip_user and --only_user are mutually exclusive. Exiting")

logger = logging.getLogger("morpheus.{}".format(__name__))
logger = logging.getLogger(f"morpheus.{__name__}")

logger.info("Running training pipeline with the following options: ")
logger.info("Train generic_user: %s", include_generic)
@@ -248,7 +261,11 @@ def run_pipeline(train_users,
# Create a linear pipeline object
pipeline = LinearPipeline(config)

pipeline.set_source(MultiFileSource(config, filenames=list(kwargs["input_file"])))
pipeline.set_source(
MultiFileSource(config,
filenames=list(kwargs["input_file"]),
watch=kwargs["watch_inputs"],
watch_interval=kwargs["watch_interval"]))

# Batch files into buckets by time. Use the default ISO date extractor from the filename
pipeline.add_stage(
Expand Up @@ -11,6 +11,7 @@
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
"""DFP training & inference pipelines for Duo Authentication logs."""

import functools
import logging
@@ -122,6 +123,17 @@
"For example, to make a local cache of an s3 bucket, use `filecache::s3://mybucket/*`. "
"Refer to fsspec documentation for list of possible options."),
)
@click.option('--watch_inputs',
type=bool,
is_flag=True,
default=False,
help=("Instructs the pipeline to continuously check the paths specified by `--input_file` for new files. "
"This assumes that the at least one paths contains a wildcard."))
@click.option("--watch_interval",
type=float,
default=1.0,
help=("Amount of time, in seconds, to wait between checks for new files. "
"Only used if --watch_inputs is set."))
@click.option('--tracking_uri',
type=str,
default="http://mlflow:5000",
@@ -135,8 +147,9 @@ def run_pipeline(train_users,
log_level,
sample_rate_s,
**kwargs):
"""Runs the DFP pipeline."""
# To include the generic, we must be training all or generic
include_generic = train_users == "all" or train_users == "generic"
include_generic = train_users in ("all", "generic")

# To include individual, we must be either training or inferring
include_individual = train_users != "generic"
@@ -164,7 +177,7 @@
if (len(skip_users) > 0 and len(only_users) > 0):
logging.error("Option --skip_user and --only_user are mutually exclusive. Exiting")

logger = logging.getLogger("morpheus.{}".format(__name__))
logger = logging.getLogger("morpheus.{__name__}")

logger.info("Running training pipeline with the following options: ")
logger.info("Train generic_user: %s", include_generic)
@@ -243,7 +256,11 @@ def run_pipeline(train_users,
# Create a linear pipeline object
pipeline = LinearPipeline(config)

pipeline.set_source(MultiFileSource(config, filenames=list(kwargs["input_file"])))
pipeline.set_source(
MultiFileSource(config,
filenames=list(kwargs["input_file"]),
watch=kwargs["watch_inputs"],
watch_interval=kwargs["watch_interval"]))

# Batch files into buckets by time. Use the default ISO date extractor from the filename
pipeline.add_stage(
4 changes: 0 additions & 4 deletions morpheus/cli/run.py
@@ -17,10 +17,6 @@
def run_cli():
"""Main entrypoint for the CLI"""
from morpheus.cli.commands import cli

# the `cli` method expects a `ctx` instance which is provided by the `prepare_command` decorator, but pylint
# is unaware of this and will complain about the missing `ctx` parameter. We can safely ignore this error.
# pylint: disable=no-value-for-parameter
cli(obj={}, auto_envvar_prefix='MORPHEUS', show_default=True, prog_name="morpheus")


9 changes: 8 additions & 1 deletion pyproject.toml
@@ -627,7 +627,14 @@ missing-member-max-choices = 1
mixin-class-rgx = ".*[Mm]ixin"

# List of decorators that change the signature of a decorated function.
# signature-mutators =
signature-mutators = [
'click.decorators.option',
'click.decorators.argument',
'click.decorators.version_option',
'click.decorators.help_option',
'click.decorators.pass_context',
'click.decorators.confirmation_option',
]

[tool.pylint.variables]
# List of additional names supposed to be defined in builtins. Remember that you
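For context, a minimal illustration (a hypothetical script, not part of this PR) of why the `click` decorators belong in pylint's `signature-mutators`: click rewrites the decorated function's signature and supplies its parameters at call time, so a bare call site looks under-supplied to static analysis:

```python
import click


@click.command()
@click.option("--watch_interval", type=float, default=1.0)
@click.pass_context
def main(ctx: click.Context, watch_interval: float):
    """Toy command; click supplies `ctx` and `watch_interval` at call time."""
    click.echo(f"polling every {watch_interval}s")


if __name__ == "__main__":
    # Without the signature-mutators entries, pylint reports
    # no-value-for-parameter here (the warning previously suppressed with a
    # pylint disable in morpheus/cli/run.py), because it cannot see that
    # click injects the arguments.
    main(obj={})
```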
46 changes: 41 additions & 5 deletions tests/examples/digital_fingerprinting/test_multifile_source.py
@@ -15,6 +15,7 @@

import glob
import os
from unittest import mock

import pytest

@@ -31,33 +32,68 @@ def test_constructor(config: Config):
config.pipeline_batch_size = batch_size
config.num_threads = n_threads
filenames = ['some/file', '/tmp/some/files-2023-*-*.csv', 's3://some/bucket/2023-*-*.csv.gz']
stage = MultiFileSource(config, filenames=filenames)
stage = MultiFileSource(config, filenames=filenames, watch=False, watch_interval=2.1)

assert isinstance(stage, SingleOutputSource)
assert stage._batch_size == batch_size
assert stage._max_concurrent == n_threads
assert stage._filenames == filenames
assert not stage._watch
assert stage._watch_interval == 2.1


def test_generate_frames_fsspec(config: Config):
def test_generate_frames_fsspec(config: Config, tmp_path: str):
from dfp.stages.multi_file_source import MultiFileSource

file_glob = os.path.join(TEST_DIRS.tests_data_dir, 'appshield', 'snapshot-1', '*.json')
stage = MultiFileSource(config, filenames=[file_glob])
temp_glob = os.path.join(tmp_path, '*.json') # this won't match anything
stage = MultiFileSource(config, filenames=[file_glob, temp_glob], watch=False)

specs = next(stage._generate_frames_fsspec())
fsspec_gen = stage._generate_frames_fsspec()
specs = next(fsspec_gen)

files = sorted(f.path for f in specs)
assert files == sorted(glob.glob(file_glob))

# Verify that new files are not picked up when watch is disabled
with open(os.path.join(tmp_path, 'new-file.json'), 'w', encoding='utf-8') as f:
f.write('{"foo": "bar"}')

with pytest.raises(StopIteration):
next(fsspec_gen)


@mock.patch('time.sleep')
def test_polling_generate_frames_fsspec(mock_sleep: mock.MagicMock, config: Config, tmp_path: str):
from dfp.stages.multi_file_source import MultiFileSource

file_glob = os.path.join(TEST_DIRS.tests_data_dir, 'appshield', 'snapshot-1', '*.json')
temp_glob = os.path.join(tmp_path, '*.json') # this won't match anything
stage = MultiFileSource(config, filenames=[file_glob, temp_glob], watch=True, watch_interval=0.2)

fsspec_gen = stage._polling_generate_frames_fsspec()
specs = next(fsspec_gen)

files = sorted(f.path for f in specs)
assert files == sorted(glob.glob(file_glob))

# Verify that watch mode picks up files created after the initial poll
with open(os.path.join(tmp_path, 'new-file.json'), 'w', encoding='utf-8') as f:
f.write('{"foo": "bar"}')

specs = next(fsspec_gen)
assert len(specs) == 1
assert specs[0].path == os.path.join(tmp_path, 'new-file.json')
mock_sleep.assert_called_once()


def test_generate_frames_fsspec_no_files(config: Config, tmp_path: str):
from dfp.stages.multi_file_source import MultiFileSource

assert os.listdir(tmp_path) == []

filenames = [os.path.join(tmp_path, '*.csv')]
stage = MultiFileSource(config, filenames=filenames)
stage = MultiFileSource(config, filenames=filenames, watch=False)

with pytest.raises(RuntimeError):
next(stage._generate_frames_fsspec())