Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

RSS Source Stage for Reading RSS Feeds #1149

Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions docker/conda/environments/cuda11.8_dev.yml
Original file line number Diff line number Diff line change
Expand Up @@ -44,6 +44,7 @@ dependencies:
- docker-py=5.0
- docutils
- faker=12.3.0
- feedparser=6.0.10
- flake8
- flatbuffers=2.0
- gcc_linux-64=11.2
Expand Down
1 change: 1 addition & 0 deletions morpheus/cli/commands.py
Original file line number Diff line number Diff line change
Expand Up @@ -660,6 +660,7 @@ def post_pipeline(ctx: click.Context, *args, **kwargs):
add_command("from-kafka", "morpheus.stages.input.kafka_source_stage.KafkaSourceStage", modes=NOT_AE)
add_command("from-http", "morpheus.stages.input.http_server_source_stage.HttpServerSourceStage", modes=ALL)
add_command("from-http-client", "morpheus.stages.input.http_client_source_stage.HttpClientSourceStage", modes=ALL)
add_command("from-rss", "morpheus.stages.input.rss_source_stage.RSSSourceStage", modes=ALL)
add_command("gen-viz", "morpheus.stages.postprocess.generate_viz_frames_stage.GenerateVizFramesStage", modes=NLP_ONLY)
add_command("inf-identity", "morpheus.stages.inference.identity_inference_stage.IdentityInferenceStage", modes=NOT_AE)
add_command("inf-pytorch",
Expand Down
13 changes: 13 additions & 0 deletions morpheus/controllers/__init__.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,13 @@
# Copyright (c) 2022-2023, NVIDIA CORPORATION.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
160 changes: 160 additions & 0 deletions morpheus/controllers/rss_controller.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,160 @@
# Copyright (c) 2022-2023, NVIDIA CORPORATION.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.

import logging
import typing
from urllib.parse import urlparse

import feedparser

import cudf

logger = logging.getLogger(__name__)


class RSSController:
    """
    RSSController handles fetching and processing of RSS feed entries.

    Parameters
    ----------
    feed_input : str
        The URL or file path of the RSS feed.
    batch_size : int, optional, default = 128
        Number of feed items to accumulate before creating a DataFrame.
    """

    def __init__(self, feed_input: str, batch_size: int = 128):

        self._feed_input = feed_input
        self._batch_size = batch_size
        # IDs of the entries seen on the previous fetch, used to skip duplicates.
        self._previous_entries = set()
        # A URL feed is polled indefinitely; a local file path is read once.
        self._run_indefinitely = RSSController.is_url(feed_input)

    @property
    def run_indefinitely(self) -> bool:
        """True when the source should run indefinitely (i.e. the input is a URL)."""
        return self._run_indefinitely

    def parse_feed(self) -> "feedparser.FeedParserDict":
        """
        Parse the RSS feed using the feedparser library.

        Returns
        -------
        feedparser.FeedParserDict
            The parsed feed content.

        Raises
        ------
        RuntimeError
            If the feed input is invalid or does not contain any entries.
        """
        feed = feedparser.parse(self._feed_input)

        if feed.entries:
            return feed

        raise RuntimeError(f"Invalid feed input: {self._feed_input}. No entries found.")

    def fetch_dataframes(self) -> "typing.Iterator[cudf.DataFrame]":
        """
        Fetch and process RSS feed entries.

        Yields
        ------
        cudf.DataFrame
            DataFrames of new feed entries, each containing at most ``batch_size`` rows.

        Raises
        ------
        RuntimeError
            If there is an error fetching or processing feed entries.
        """
        entry_accumulator = []
        current_entries = set()

        try:

            feed = self.parse_feed()

            for entry in feed.entries:
                entry_id = entry.get('id')
                current_entries.add(entry_id)

                # Only accumulate entries not seen on the previous fetch.
                if entry_id not in self._previous_entries:
                    entry_accumulator.append(entry)

                if len(entry_accumulator) >= self._batch_size:
                    yield self.create_dataframe(entry_accumulator)
                    entry_accumulator.clear()

            self._previous_entries = current_entries

            # Yield any remaining entries.
            if entry_accumulator:
                df = self.create_dataframe(entry_accumulator)
                yield df
            else:
                logger.debug("No new entries found.")

        except Exception as exc:
            raise RuntimeError(f"Error fetching or processing feed entries: {exc}") from exc

    def create_dataframe(self, entries: typing.List[typing.Tuple]) -> "cudf.DataFrame":
        """
        Create a DataFrame from accumulated entry data.

        Parameters
        ----------
        entries : typing.List[typing.Tuple]
            List of accumulated feed entries.

        Returns
        -------
        cudf.DataFrame
            A DataFrame containing feed entry data.

        Raises
        ------
        RuntimeError
            Error creating DataFrame.
        """
        try:
            return cudf.DataFrame(entries)
        except Exception as exc:
            logger.error("Error creating DataFrame: %s", exc)
            raise RuntimeError(f"Error creating DataFrame: {exc}") from exc

    @classmethod
    def is_url(cls, feed_input: str) -> bool:
        """
        Check if the provided input is a valid URL.

        Parameters
        ----------
        feed_input : str
            The input string to be checked.

        Returns
        -------
        bool
            True if the input is a valid URL, False otherwise.
        """
        try:
            parsed_url = urlparse(feed_input)
            # A usable URL needs both a scheme (http/https/...) and a network location.
            return parsed_url.scheme != '' and parsed_url.netloc != ''
        except Exception:
            return False
126 changes: 126 additions & 0 deletions morpheus/stages/input/rss_source_stage.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,126 @@
# Copyright (c) 2022-2023, NVIDIA CORPORATION.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.

import logging
import time

import mrc

from morpheus.cli import register_stage
from morpheus.config import Config
from morpheus.controllers.rss_controller import RSSController
from morpheus.messages import MessageMeta
from morpheus.pipeline.preallocator_mixin import PreallocatorMixin
from morpheus.pipeline.single_output_source import SingleOutputSource
from morpheus.pipeline.stream_pair import StreamPair

logger = logging.getLogger(__name__)


@register_stage("from-rss")
class RSSSourceStage(PreallocatorMixin, SingleOutputSource):
    """
    Load RSS feed items into a pandas DataFrame.

    Parameters
    ----------
    c : morpheus.config.Config
        Pipeline configuration instance.
    feed_input : str
        The URL or file path of the RSS feed.
    interval_secs : float, optional, default = 600
        Interval in seconds between fetching new feed items.
    stop_after: int, default = 0
        Stops ingesting after emitting `stop_after` records (rows in the dataframe). Useful for testing. Disabled if `0`
    max_retries : int, optional, default = 5
        Maximum number of retries for fetching entries on exception.
    """

    def __init__(self,
                 c: Config,
                 feed_input: str,
                 interval_secs: float = 600,
                 stop_after: int = 0,
                 max_retries: int = 5):
        super().__init__(c)
        # Set by stop(), or internally when stop_after / one-shot (file input) conditions are met.
        self._stop_requested = False
        self._stop_after = stop_after
        self._interval_secs = interval_secs
        self._max_retries = max_retries

        # Running count of rows emitted across all DataFrames, compared against stop_after.
        self._records_emitted = 0
        # DataFrame batch size follows the pipeline-wide batch size from the config.
        self._controller = RSSController(feed_input=feed_input, batch_size=c.pipeline_batch_size)

    @property
    def name(self) -> str:
        """Unique stage name as registered on the CLI (`from-rss`)."""
        return "from-rss"

    def stop(self):
        """
        Stop the RSS source stage.
        """
        # Flag is polled by the _fetch_feeds loop; takes effect on its next iteration.
        self._stop_requested = True
        return super().stop()

    def supports_cpp_node(self):
        # Python-only implementation; no C++ node is available for this stage.
        return False

    def _fetch_feeds(self) -> MessageMeta:
        """
        Fetch RSS feed entries and yield as MessageMeta object.

        Generator used as the pipeline source: loops until stopped or until
        `max_retries` consecutive fetch failures, sleeping `interval_secs`
        between successful polls of a URL feed.
        """
        retries = 0

        while (not self._stop_requested) and (retries < self._max_retries):
            try:
                for df in self._controller.fetch_dataframes():
                    df_size = len(df)
                    self._records_emitted += df_size

                    if logger.isEnabledFor(logging.DEBUG):
                        logger.debug("Received %d new entries...", df_size)
                        logger.debug("Emitted %d records so far.", self._records_emitted)

                    yield MessageMeta(df=df)

                # A file-based feed is read exactly once; request a stop after the first pass.
                if not self._controller.run_indefinitely:
                    self._stop_requested = True
                    continue

                if (self._stop_after > 0 and self._records_emitted >= self._stop_after):
                    self._stop_requested = True
                    logger.debug("Stop limit reached...preparing to halt the source.")
                    continue

                logger.debug("Waiting for %d seconds before fetching again...", self._interval_secs)
                time.sleep(self._interval_secs)

            except Exception as exc:
                # For a non-URL input there is nothing to retry (the file is simply bad),
                # so fail fast instead of exhausting max_retries.
                if not self._controller.run_indefinitely:
                    logger.error("The input provided is not a URL or a valid path, therefore, the maximum " +
                                 "retries are being overridden, and early exiting is triggered.")
                    raise RuntimeError(f"Failed to fetch feed entries : {exc}") from exc

                retries += 1
                logger.warning("Error fetching feed entries. Retrying (%d/%d)...", retries, self._max_retries)
                logger.debug("Waiting for 5 secs before retrying...")
                time.sleep(5)  # Wait before retrying

                if retries == self._max_retries:  # Check if retries exceeded the limit
                    logger.error("Max retries reached. Unable to fetch feed entries.")
                    raise RuntimeError(f"Failed to fetch feed entries after max retries: {exc}") from exc

    def _build_source(self, builder: mrc.Builder) -> StreamPair:
        # Single-output source node backed by the _fetch_feeds generator.
        source = builder.make_source(self.unique_name, self._fetch_feeds)
        return source, MessageMeta
Loading