Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Fix iso_date_regex_pattern config in file_batcher module and allow override #1580

Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
22 changes: 19 additions & 3 deletions morpheus/modules/file_batcher.py
Original file line number Diff line number Diff line change
Expand Up @@ -80,7 +80,7 @@ def file_batcher(builder: mrc.Builder):
sampling = config.get("sampling", None)
sampling_rate_s = config.get("sampling_rate_s", None)

iso_date_regex_pattern = config.get("batch_iso_date_regex_pattern", DEFAULT_ISO_DATE_REGEX_PATTERN)
iso_date_regex_pattern = config.get("iso_date_regex_pattern", DEFAULT_ISO_DATE_REGEX_PATTERN)
iso_date_regex = re.compile(iso_date_regex_pattern)

if (sampling_rate_s is not None and sampling_rate_s > 0):
Expand All @@ -99,6 +99,7 @@ def file_batcher(builder: mrc.Builder):
"sampling": sampling,
"start_time": config.get("start_time"),
"end_time": config.get("end_time"),
"iso_date_regex_pattern": iso_date_regex_pattern
}

default_file_to_df_opts = {
Expand All @@ -123,11 +124,19 @@ def build_period_batches(files: typing.List[str],
params: typing.Dict[any, any]) -> typing.List[typing.Tuple[typing.List[str], int]]:
file_objects: fsspec.core.OpenFiles = fsspec.open_files(files)

nonlocal iso_date_regex_pattern
nonlocal iso_date_regex

if params["iso_date_regex_pattern"] != iso_date_regex_pattern:
iso_date_regex_pattern = params["iso_date_regex_pattern"]
iso_date_regex = re.compile(iso_date_regex_pattern)

try:
start_time = params["start_time"]
end_time = params["end_time"]
period = params["period"]
sampling_rate_s = params["sampling_rate_s"]
sampling = params["sampling"]

if not isinstance(start_time, (str, type(None))) or (start_time is not None
and not re.match(r"\d{4}-\d{2}-\d{2}", start_time)):
Expand All @@ -137,8 +146,15 @@ def build_period_batches(files: typing.List[str],
and not re.match(r"\d{4}-\d{2}-\d{2}", end_time)):
raise ValueError(f"Invalid 'end_time' value: {end_time}")

if not isinstance(sampling_rate_s, int) or sampling_rate_s < 0:
raise ValueError(f"Invalid 'sampling_rate_s' value: {sampling_rate_s}")
if (sampling_rate_s is not None and sampling_rate_s > 0):
assert sampling is None, "Cannot set both sampling and sampling_rate_s at the same time"

# Show the deprecation message
warnings.warn(("The `sampling_rate_s` argument has been deprecated. "
"Please use `sampling={sampling_rate_s}S` instead"),
DeprecationWarning)

sampling = f"{sampling_rate_s}S"

if (start_time is not None):
start_time = datetime.datetime.strptime(start_time, '%Y-%m-%d').replace(tzinfo=datetime.timezone.utc)
Expand Down
300 changes: 300 additions & 0 deletions tests/modules/test_file_batcher.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,300 @@
#!/usr/bin/env python
# SPDX-FileCopyrightText: Copyright (c) 2024, NVIDIA CORPORATION & AFFILIATES. All rights reserved.
# SPDX-License-Identifier: Apache-2.0
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.

import pytest

import cudf

import morpheus.modules # noqa: F401 # pylint: disable=unused-import
from morpheus.config import Config
from morpheus.messages import ControlMessage
from morpheus.messages.message_meta import MessageMeta
from morpheus.pipeline import LinearPipeline
from morpheus.pipeline.stage_decorator import source
from morpheus.stages.general.linear_modules_stage import LinearModulesStage
from morpheus.stages.output.in_memory_sink_stage import InMemorySinkStage
from morpheus.utils.module_ids import FILE_BATCHER
from morpheus.utils.module_ids import MORPHEUS_MODULE_NAMESPACE

# pylint: disable=redundant-keyword-arg


@source
def source_test_stage(filenames: list[str], cm_batching_options: dict) -> ControlMessage:
    """Emit one ControlMessage whose payload holds *filenames* in a 'files'
    column and whose metadata carries *cm_batching_options* for file_batcher."""

    message = ControlMessage()
    message.set_metadata("batching_options", cm_batching_options)
    message.set_metadata("data_type", "payload")

    files_df = cudf.DataFrame(filenames, columns=['files'])
    message.payload(MessageMeta(df=files_df))

    yield message


@pytest.fixture(name="default_module_config")
def default_module_config_fixture():
    """Baseline module configuration for the file_batcher stage under test."""
    module_config = {
        "module_id": FILE_BATCHER,
        "module_name": "file_batcher",
        "namespace": MORPHEUS_MODULE_NAMESPACE,
        "sampling_rate_s": 0,
        "start_time": "2022-08-01",
        "end_time": "2022-08-31",
        "parser_kwargs": None,
        "schema": {
            "schema_str": None, "encoding": None
        }
    }
    yield module_config


@pytest.fixture(name="default_file_list")
def default_file_list_fixture():
    """Six DUO log filenames spanning two days (2022-08-01 and 2022-08-02)."""
    file_names = [
        "DUO_2022-08-01T00_05_06.806Z.json",
        "DUO_2022-08-01T03_02_04.418Z.json",
        "DUO_2022-08-01T06_05_05.064Z.json",
        "DUO_2022-08-02T00_05_06.806Z.json",
        "DUO_2022-08-02T03_02_04.418Z.json",
        "DUO_2022-08-02T06_05_05.064Z.json"
    ]
    yield file_names


def test_no_overrides(config: Config, default_module_config, default_file_list):
    """With batching options matching the module defaults, the six files
    should be split into two per-day batches of three files each."""
    cm_batching_opts = {
        "sampling_rate_s": 0,
        "start_time": "2022-08-01",
        "end_time": "2022-08-31",
        "parser_kwargs": None,
        "schema": {
            "schema_str": None, "encoding": None
        }
    }

    pipeline = LinearPipeline(config)
    pipeline.set_source(source_test_stage(config, filenames=default_file_list, cm_batching_options=cm_batching_opts))
    pipeline.add_stage(
        LinearModulesStage(config, default_module_config, input_port_name="input", output_port_name="output"))
    sink = pipeline.add_stage(InMemorySinkStage(config))

    pipeline.run()

    messages = sink.get_messages()
    assert len(messages) == 2
    for message in messages:
        load_task = message.get_tasks()["load"][0]
        assert len(load_task["files"]) == 3
        assert load_task["n_groups"] == 2


def test_no_date_matches(config: Config, default_module_config, default_file_list):
    """A September window excludes every August file, so the sink must
    receive no messages at all."""
    cm_batching_opts = {
        "sampling_rate_s": 0,
        "start_time": "2022-09-01",
        "end_time": "2022-09-30",
        "parser_kwargs": None,
        "schema": {
            "schema_str": None, "encoding": None
        }
    }

    pipeline = LinearPipeline(config)
    pipeline.set_source(source_test_stage(config, filenames=default_file_list, cm_batching_options=cm_batching_opts))
    pipeline.add_stage(
        LinearModulesStage(config, default_module_config, input_port_name="input", output_port_name="output"))
    sink = pipeline.add_stage(InMemorySinkStage(config))

    pipeline.run()

    assert len(sink.get_messages()) == 0


def test_partial_date_matches(config: Config, default_module_config, default_file_list):
    """Only files inside the [start_time, end_time) window are batched: with
    an end of 2022-08-02, only the three 2022-08-01 files should match,
    yielding a single one-group batch.
    """
    pipeline = LinearPipeline(config)

    cm_batching_opts = {
        "sampling_rate_s": 0,
        "start_time": "2022-07-30",
        "end_time": "2022-08-02",
        "parser_kwargs": None,
        "schema": {
            "schema_str": None, "encoding": None
        }
    }

    pipeline.set_source(source_test_stage(config, filenames=default_file_list, cm_batching_options=cm_batching_opts))

    pipeline.add_stage(
        LinearModulesStage(config, default_module_config, input_port_name="input", output_port_name="output"))

    sink_stage = pipeline.add_stage(InMemorySinkStage(config))

    pipeline.run()

    # Fetch the sink messages once (the original accidentally called
    # get_messages() twice in a row; the first result was discarded).
    sink_messages = sink_stage.get_messages()
    assert len(sink_messages) == 1
    assert len(sink_messages[0].get_tasks()["load"][0]["files"]) == 3
    assert sink_messages[0].get_tasks()["load"][0]["n_groups"] == 1


def test_override_date_regex(config: Config, default_module_config):
    """Filenames using '_' instead of 'T' between date and time only parse
    with an overridden iso_date_regex_pattern; with the override supplied via
    the control message, batching should behave like the default case."""
    underscore_filenames = [
        "DUO_2022-08-01_00_05_06.806Z.json",
        "DUO_2022-08-01_03_02_04.418Z.json",
        "DUO_2022-08-01_06_05_05.064Z.json",
        "DUO_2022-08-02_00_05_06.806Z.json",
        "DUO_2022-08-02_03_02_04.418Z.json",
        "DUO_2022-08-02_06_05_05.064Z.json"
    ]

    # Same as the default pattern but with '_' (or ':') separators and a '_'
    # between the date and time components.
    cm_date_regex_pattern = (
        r"(?P<year>\d{4})-(?P<month>\d{1,2})-(?P<day>\d{1,2})"
        r"_(?P<hour>\d{1,2})(:|_)(?P<minute>\d{1,2})(:|_)(?P<second>\d{1,2})(?P<microsecond>\.\d{1,6})?Z")

    cm_batching_opts = {
        "sampling_rate_s": 0,
        "start_time": "2022-08-01",
        "end_time": "2022-08-31",
        "iso_date_regex_pattern": cm_date_regex_pattern,
        "parser_kwargs": None,
        "schema": {
            "schema_str": None, "encoding": None
        }
    }

    pipeline = LinearPipeline(config)
    pipeline.set_source(source_test_stage(config, filenames=underscore_filenames,
                                          cm_batching_options=cm_batching_opts))
    pipeline.add_stage(
        LinearModulesStage(config, default_module_config, input_port_name="input", output_port_name="output"))
    sink = pipeline.add_stage(InMemorySinkStage(config))

    pipeline.run()

    messages = sink.get_messages()
    assert len(messages) == 2
    for message in messages:
        load_task = message.get_tasks()["load"][0]
        assert len(load_task["files"]) == 3
        assert load_task["n_groups"] == 2


def test_sampling_freq(config: Config, default_module_config):
    """A '30S' sampling frequency keeps one file per 30-second bucket per
    day, so each of the two daily batches carries a single file."""
    clustered_filenames = [
        "DUO_2022-08-01T00_05_06.806Z.json",
        "DUO_2022-08-01T00_05_08.418Z.json",
        "DUO_2022-08-01T00_05_12.064Z.json",
        "DUO_2022-08-02T03_02_06.806Z.json",
        "DUO_2022-08-02T03_02_14.418Z.json",
        "DUO_2022-08-02T03_02_17.064Z.json"
    ]

    cm_batching_opts = {
        "sampling_rate_s": None,
        "sampling": "30S",
        "start_time": "2022-08-01",
        "end_time": "2022-08-31",
        "parser_kwargs": None,
        "schema": {
            "schema_str": None, "encoding": None
        }
    }

    pipeline = LinearPipeline(config)
    pipeline.set_source(source_test_stage(config, filenames=clustered_filenames,
                                          cm_batching_options=cm_batching_opts))
    pipeline.add_stage(
        LinearModulesStage(config, default_module_config, input_port_name="input", output_port_name="output"))
    sink = pipeline.add_stage(InMemorySinkStage(config))

    pipeline.run()

    messages = sink.get_messages()
    assert len(messages) == 2
    for message in messages:
        load_task = message.get_tasks()["load"][0]
        assert len(load_task["files"]) == 1
        assert load_task["n_groups"] == 2


def test_sampling_pct(config: Config, default_module_config, default_file_list):
    """Fractional sampling of 0.5 keeps half of the six files (three total
    across all emitted batches)."""
    cm_batching_opts = {
        "sampling_rate_s": None,
        "sampling": 0.5,
        "start_time": "2022-08-01",
        "end_time": "2022-08-31",
        "parser_kwargs": None,
        "schema": {
            "schema_str": None, "encoding": None
        }
    }

    pipeline = LinearPipeline(config)
    pipeline.set_source(source_test_stage(config, filenames=default_file_list, cm_batching_options=cm_batching_opts))
    pipeline.add_stage(
        LinearModulesStage(config, default_module_config, input_port_name="input", output_port_name="output"))
    sink = pipeline.add_stage(InMemorySinkStage(config))

    pipeline.run()

    total_files = sum(len(m.get_tasks()["load"][0]["files"]) for m in sink.get_messages())
    assert total_files == 3


def test_sampling_fixed(config: Config, default_module_config, default_file_list):
    """Integer sampling of 5 keeps exactly five of the six files in total
    across all emitted batches."""
    cm_batching_opts = {
        "sampling_rate_s": None,
        "sampling": 5,
        "start_time": "2022-08-01",
        "end_time": "2022-08-31",
        "parser_kwargs": None,
        "schema": {
            "schema_str": None, "encoding": None
        }
    }

    pipeline = LinearPipeline(config)
    pipeline.set_source(source_test_stage(config, filenames=default_file_list, cm_batching_options=cm_batching_opts))
    pipeline.add_stage(
        LinearModulesStage(config, default_module_config, input_port_name="input", output_port_name="output"))
    sink = pipeline.add_stage(InMemorySinkStage(config))

    pipeline.run()

    total_files = sum(len(m.get_tasks()["load"][0]["files"]) for m in sink.get_messages())
    assert total_files == 5
Loading