RSS Source Stage for Reading RSS Feeds #1149

Merged
Changes from 6 commits

1 change: 1 addition & 0 deletions docker/conda/environments/cuda11.8_dev.yml
@@ -44,6 +44,7 @@ dependencies:
- docker-py=5.0
- docutils
- faker=12.3.0
- feedparser=6.0.10
- flake8
- flatbuffers=2.0
- gcc_linux-64=11.2
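For context, a minimal sketch of the feedparser API that this new dependency pin provides; the feed URL is illustrative (borrowed from the tests further down):

import feedparser

# feedparser.parse() accepts either a URL or a local file path.
feed = feedparser.parse("https://realpython.com/atom.xml")

# Each entry is a FeedParserDict; "id" and "title" are common keys.
for entry in feed.entries:
    print(entry.get("id"), entry.get("title"))
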
2 changes: 2 additions & 0 deletions morpheus/cli/commands.py
@@ -658,6 +658,8 @@ def post_pipeline(ctx: click.Context, *args, **kwargs):
add_command("from-duo", "morpheus.stages.input.duo_source_stage.DuoSourceStage", modes=AE_ONLY)
add_command("from-file", "morpheus.stages.input.file_source_stage.FileSourceStage", modes=NOT_AE)
add_command("from-kafka", "morpheus.stages.input.kafka_source_stage.KafkaSourceStage", modes=NOT_AE)
add_command("from-http", "morpheus.stages.input.http_server_source_stage.HttpServerSourceStage", modes=ALL)
add_command("from-http-client", "morpheus.stages.input.http_client_source_stage.HttpClientSourceStage", modes=ALL)
add_command("from-rss", "morpheus.stages.input.rss_source_stage.RSSSourceStage", modes=ALL)
add_command("gen-viz", "morpheus.stages.postprocess.generate_viz_frames_stage.GenerateVizFramesStage", modes=NLP_ONLY)
add_command("inf-identity", "morpheus.stages.inference.identity_inference_stage.IdentityInferenceStage", modes=NOT_AE)
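As a side note, each add_command() entry above maps a CLI name to a stage class via a dotted module path. A small sketch of how such a string resolves to the class (standard importlib behavior, nothing Morpheus-specific):

import importlib

path = "morpheus.stages.input.rss_source_stage.RSSSourceStage"
module_path, class_name = path.rsplit(".", 1)

# Import the module and pull the stage class off it.
stage_cls = getattr(importlib.import_module(module_path), class_name)
print(stage_cls.__name__)  # -> "RSSSourceStage"
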
59 changes: 34 additions & 25 deletions morpheus/stages/input/rss_source_stage.py
@@ -13,26 +13,23 @@
# limitations under the License.

import logging
import feedparser
import cudf
from morpheus.utils.controllers.rss_controller import RSSController
import time

import mrc

from morpheus.cli import register_stage
import time
import feedparser
import pandas as pd
from datetime import datetime, timedelta
from morpheus.config import Config
from morpheus.config import PipelineModes
from morpheus.messages import ControlMessage
from morpheus.messages import MessageMeta
from morpheus.pipeline.preallocator_mixin import PreallocatorMixin
from morpheus.pipeline.single_output_source import SingleOutputSource
from morpheus.pipeline.stream_pair import StreamPair

from morpheus.utils.controllers.rss_controller import RSSController

logger = logging.getLogger(__name__)

@register_stage("from-rss", modes=[PipelineModes.AE, PipelineModes.FIL, PipelineModes.NLP, PipelineModes.OTHER])

@register_stage("from-rss")
class RSSSourceStage(PreallocatorMixin, SingleOutputSource):
"""
Load RSS feed items into a cuDF DataFrame.
@@ -42,21 +39,28 @@ class RSSSourceStage(PreallocatorMixin, SingleOutputSource):
c : morpheus.config.Config
Pipeline configuration instance.
feed_input : str
The URL of the RSS feed.
The URL or file path of the RSS feed.
interval_secs : float, optional, default = 600
Interval in seconds between fetching new feed items.
stop_after : int, optional, default = 0
Stops ingesting after emitting `stop_after` records (rows in the DataFrame). Useful for testing. Disabled if `0`.
max_retries : int, optional, default = 5
Maximum number of retries for fetching entries on exception.
"""
def __init__(self, c: Config, feed_input: str, interval_secs: float = 600, stop_after: int = 0, max_retries: int = 5):

def __init__(self,
c: Config,
feed_input: str,
interval_secs: float = 600,
stop_after: int = 0,
max_retries: int = 5):
super().__init__(c)
self._stop_requested = False
self._records_emitted = 0
self._stop_after = stop_after
self._interval_secs = interval_secs
self._max_retries = max_retries

self._records_emitted = 0
self._controller = RSSController(feed_input=feed_input, batch_size=c.pipeline_batch_size)

@property
@@ -73,23 +77,23 @@ def stop(self):
def supports_cpp_node(self):
return False

def _fetch_feeds(self):
def _fetch_feeds(self) -> MessageMeta:
"""
Fetch RSS feed entries and yield as MessageMeta messages.
Fetch RSS feed entries and yield them as MessageMeta objects.
"""
retries = 0

while (not self._stop_requested) and (retries < self._max_retries):
try:
for entry_accumulator in self._controller.fetch_entries():
if entry_accumulator is not None and entry_accumulator:
logger.debug("Processing %d new entries...", len(entry_accumulator))
df = self._controller.create_dataframe(entry_accumulator)
self._records_emitted += len(df)
for df in self._controller.fetch_dataframes():
df_size = len(df)
self._records_emitted += df_size

if logger.isEnabledFor(logging.DEBUG):
logger.debug("Received %d new entries...", df_size)
logger.debug("Emitted %d records so far.", self._records_emitted)
yield MessageMeta(df=df)
continue
logger.debug("New entries not found.")

yield MessageMeta(df=df)

if not self._controller.run_indefinitely:
self._stop_requested = True
@@ -104,15 +108,20 @@ def _fetch_feeds(self):
time.sleep(self._interval_secs)

except Exception as exc:
if not self._controller.run_indefinitely:
logger.error("The input provided is not a URL or a valid path, therefore, the maximum " +
"retries are being overridden, and early exiting is triggered.")
raise Exception(f"Failed to fetch feed entries : {exc}") from exc # pylint: disable=W0719

retries += 1
logger.warning("Error fetching feed entries. Retrying (%d/%d)...", retries, self._max_retries)
logger.debug("Waiting for 5 secs before retrying...")
time.sleep(5) # Wait before retrying

if retries == self._max_retries: # Check if retries exceeded the limit
logger.error("Max retries reached. Unable to fetch feed entries.")
raise Exception("Failed to fetch feed entries after max retries: %s", exc)

# pylint: disable=W0719
raise Exception(f"Failed to fetch feed entries after max retries: {exc}") from exc

def _build_source(self, builder: mrc.Builder) -> StreamPair:
source = builder.make_source(self.unique_name, self._fetch_feeds)
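To show how the stage above is meant to be wired up, here is a minimal pipeline sketch assuming the same pattern as the tests at the end of this diff; the feed URL and the stop_after value are illustrative:

from morpheus.config import Config
from morpheus.pipeline.pipeline import Pipeline
from morpheus.stages.input.rss_source_stage import RSSSourceStage
from morpheus.stages.output.in_memory_sink_stage import InMemorySinkStage

config = Config()
pipe = Pipeline(config)

# stop_after bounds the run; with a URL input the stage would otherwise poll every interval_secs.
source = pipe.add_stage(RSSSourceStage(config, feed_input="https://realpython.com/atom.xml", stop_after=10))
sink = pipe.add_stage(InMemorySinkStage(config))
pipe.add_edge(source, sink)

pipe.run()

# The sink now holds MessageMeta objects, each wrapping a cudf.DataFrame of feed entries.
print(len(sink.get_messages()), "messages received")
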
31 changes: 20 additions & 11 deletions morpheus/utils/controllers/rss_controller.py
@@ -13,14 +13,18 @@
# limitations under the License.

import logging
import feedparser
import cudf
import typing
from urllib.parse import urlparse

import feedparser

import cudf

logger = logging.getLogger(__name__)


class RSSController:

def __init__(self, feed_input: str, batch_size: int = 128):
"""
RSSController handles fetching and processing of RSS feed entries.
@@ -36,13 +40,13 @@ def __init__(self, feed_input: str, batch_size: int = 128):
self._batch_size = batch_size
self._previous_entires = set() # Stores the IDs of previous entries to prevent the processing of duplicates.
# If feed_input is a URL, the controller runs indefinitely.
self._run_indefinitely = True if RSSController.is_url(feed_input) else False
self._run_indefinitely = RSSController.is_url(feed_input)

@property
def run_indefinitely(self):
return self._run_indefinitely

def parse_feed(self):
def parse_feed(self) -> feedparser.FeedParserDict:
"""
Parse the RSS feed using the feedparser library.

@@ -60,15 +64,14 @@ def parse_feed(self):

if feed.entries:
return feed
else:
raise Exception(f"Invalid feed input: {self._feed_input}. No entries found.")

raise Exception(f"Invalid feed input: {self._feed_input}. No entries found.") # pylint: disable=W0719

def fetch_entries(self) -> typing.Union[typing.List[typing.Tuple], typing.List]:
def fetch_dataframes(self) -> typing.Iterator[cudf.DataFrame]:
"""
Fetch and process RSS feed entries.

Returns
Yields
------
cudf.DataFrame
DataFrames of new feed entries, yielded in batches of up to `batch_size` rows.
@@ -93,15 +96,20 @@ def fetch_entries(self) -> typing.Union[typing.List[typing.Tuple], typing.List]:
entry_accumulator.append(entry)

if len(entry_accumulator) >= self._batch_size:
yield entry_accumulator
yield self.create_dataframe(entry_accumulator)
entry_accumulator.clear()

self._previous_entires = current_entries

yield entry_accumulator
# Yield any remaining entries.
if entry_accumulator:
df = self.create_dataframe(entry_accumulator)
yield df
else:
logger.debug("No new entries found.")

except Exception as exc:
raise Exception("Error fetching or processing feed entries: %s", exc)
raise Exception(f"Error fetching or processing feed entries: {exc}") from exc # pylint: disable=W0719

def create_dataframe(self, entries: typing.List[typing.Tuple]) -> cudf.DataFrame:
"""
@@ -121,6 +129,7 @@ def create_dataframe(self, entries: typing.List[typing.Tuple]) -> cudf.DataFrame:
return cudf.DataFrame(entries)
except Exception as exc:
logger.error("Error creating DataFrame: %s", exc)
raise Exception(f"Error creating DataFrame: {exc}") from exc # pylint: disable=W0719

@classmethod
def is_url(cls, feed_input: str) -> bool:
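For completeness, a sketch of driving RSSController directly, outside a pipeline; the file path is illustrative, and a file input makes the generator finite because run_indefinitely is False:

from morpheus.utils.controllers.rss_controller import RSSController

controller = RSSController(feed_input="tests/tests_data/rss_feed_atom.xml", batch_size=16)

# fetch_dataframes() yields cudf.DataFrames of up to batch_size entries each.
for df in controller.fetch_dataframes():
    print(len(df), "entries")
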
93 changes: 93 additions & 0 deletions tests/test_rss_source_pipe.py
@@ -0,0 +1,93 @@
#!/usr/bin/env python
# SPDX-FileCopyrightText: Copyright (c) 2022-2023 NVIDIA CORPORATION & AFFILIATES. All rights reserved.
# SPDX-License-Identifier: Apache-2.0
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.

import os

import pytest

from _utils import TEST_DIRS
from morpheus.pipeline.pipeline import Pipeline
from morpheus.stages.input.rss_source_stage import RSSSourceStage
from morpheus.stages.output.in_memory_sink_stage import InMemorySinkStage


@pytest.mark.use_python
def test_constructor(config):

url_feed_input = "https://realpython.com/atom.xml"
rss_source_stage = RSSSourceStage(config, feed_input=url_feed_input)

file_feed_input = os.path.join(TEST_DIRS.tests_data_dir, "rss_feed_atom.xml")
rss_source_stage_2 = RSSSourceStage(config,
feed_input=file_feed_input,
interval_secs=5,
stop_after=10,
max_retries=2)

ctlr = rss_source_stage._controller
ctlr_2 = rss_source_stage_2._controller

assert ctlr._feed_input == "https://realpython.com/atom.xml"
assert ctlr._run_indefinitely is True
assert ctlr._batch_size == config.pipeline_batch_size
assert rss_source_stage._interval_secs == 600
assert rss_source_stage._stop_after == 0
assert rss_source_stage._max_retries == 5

assert ctlr_2._feed_input == file_feed_input
assert ctlr_2._run_indefinitely is False
assert ctlr_2._batch_size == config.pipeline_batch_size
assert rss_source_stage_2._interval_secs == 5
assert rss_source_stage_2._stop_after == 10
assert rss_source_stage_2._max_retries == 2

assert rss_source_stage.supports_cpp_node() is False
assert rss_source_stage_2.supports_cpp_node() is False


@pytest.mark.use_python
@pytest.mark.parametrize("batch_size, expected_count", [(30, 1), (12, 3), (15, 2)])
def test_rss_source_stage_pipe(config, batch_size, expected_count) -> None:

feed_input = os.path.join(TEST_DIRS.tests_data_dir, "rss_feed_atom.xml")
config.pipeline_batch_size = batch_size
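# Assumption from the parametrized cases above: the fixture appears to hold 30 entries,
# so ceil(30 / batch_size) messages are expected (30 -> 1, 12 -> 3, 15 -> 2).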

pipe = Pipeline(config)

rss_source_stage = pipe.add_stage(RSSSourceStage(config, feed_input=feed_input))
sink_stage = pipe.add_stage(InMemorySinkStage(config))

pipe.add_edge(rss_source_stage, sink_stage)

pipe.run()

assert len(sink_stage.get_messages()) == expected_count


@pytest.mark.use_python
def test_invalid_input_rss_source_stage_pipe(config) -> None:

feed_input = os.path.join(TEST_DIRS.tests_data_dir, "rss_feed_atom.xm")

pipe = Pipeline(config)

rss_source_stage = pipe.add_stage(RSSSourceStage(config, feed_input=feed_input, interval_secs=1, max_retries=1))
sink_stage = pipe.add_stage(InMemorySinkStage(config))

pipe.add_edge(rss_source_stage, sink_stage)

with pytest.raises(Exception):
pipe.run()