-
Notifications
You must be signed in to change notification settings - Fork 161
Commit
This commit does not belong to any branch on this repository, and may belong to a fork outside of the repository.
RSS Source Stage for Reading RSS Feeds (#1149)
- RSS Source Stage implementation and tests closes #1143 Authors: - Bhargav Suryadevara (https://github.com/bsuryadevara) Approvers: - Michael Demoret (https://github.com/mdemoret-nv) URL: #1149
- Loading branch information
1 parent
485c983
commit 214232c
Showing
8 changed files
with
493 additions
and
0 deletions.
There are no files selected for viewing
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,13 @@ | ||
# Copyright (c) 2022-2023, NVIDIA CORPORATION. | ||
# | ||
# Licensed under the Apache License, Version 2.0 (the "License"); | ||
# you may not use this file except in compliance with the License. | ||
# You may obtain a copy of the License at | ||
# | ||
# http://www.apache.org/licenses/LICENSE-2.0 | ||
# | ||
# Unless required by applicable law or agreed to in writing, software | ||
# distributed under the License is distributed on an "AS IS" BASIS, | ||
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. | ||
# See the License for the specific language governing permissions and | ||
# limitations under the License. |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,160 @@ | ||
# Copyright (c) 2022-2023, NVIDIA CORPORATION. | ||
# | ||
# Licensed under the Apache License, Version 2.0 (the "License"); | ||
# you may not use this file except in compliance with the License. | ||
# You may obtain a copy of the License at | ||
# | ||
# http://www.apache.org/licenses/LICENSE-2.0 | ||
# | ||
# Unless required by applicable law or agreed to in writing, software | ||
# distributed under the License is distributed on an "AS IS" BASIS, | ||
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. | ||
# See the License for the specific language governing permissions and | ||
# limitations under the License. | ||
|
||
import logging | ||
import typing | ||
from urllib.parse import urlparse | ||
|
||
import feedparser | ||
|
||
import cudf | ||
|
||
logger = logging.getLogger(__name__) | ||
|
||
|
||
class RSSController:
    """
    RSSController handles fetching and processing of RSS feed entries.

    Parameters
    ----------
    feed_input : str
        The URL or file path of the RSS feed.
    batch_size : int, optional, default = 128
        Number of feed items to accumulate before creating a DataFrame.
    """

    def __init__(self, feed_input: str, batch_size: int = 128):
        self._feed_input = feed_input
        self._batch_size = batch_size
        # IDs of entries seen on the previous fetch, used to skip duplicates
        # across successive polls of the same feed.
        # (Fixed attribute-name typo: was `_previous_entires`.)
        self._previous_entries = set()
        # A URL feed is polled indefinitely; a local file is read once.
        self._run_indefinitely = RSSController.is_url(feed_input)

    @property
    def run_indefinitely(self) -> bool:
        """True when the source should keep polling (i.e. the feed input is a URL)."""
        return self._run_indefinitely

    def parse_feed(self) -> "feedparser.FeedParserDict":
        """
        Parse the RSS feed using the feedparser library.

        Returns
        -------
        feedparser.FeedParserDict
            The parsed feed content.

        Raises
        ------
        RuntimeError
            If the feed input is invalid or does not contain any entries.
        """
        # NOTE: annotation corrected — this returns the parsed feed object,
        # not a list[dict] as previously declared.
        feed = feedparser.parse(self._feed_input)

        if feed.entries:
            return feed

        raise RuntimeError(f"Invalid feed input: {self._feed_input}. No entries found.")

    def fetch_dataframes(self) -> typing.Iterator["cudf.DataFrame"]:
        """
        Fetch and process RSS feed entries.

        Yields
        ------
        cudf.DataFrame
            DataFrames of new (not previously seen) feed entries, in batches
            of at most ``batch_size`` rows.

        Raises
        ------
        RuntimeError
            If there is an error fetching or processing feed entries.
        """
        entry_accumulator = []
        current_entries = set()

        try:
            feed = self.parse_feed()

            for entry in feed.entries:
                # NOTE(review): entries without an 'id' yield None here, so
                # multiple id-less entries would collapse into one dedup key —
                # confirm feeds in use always provide ids.
                entry_id = entry.get('id')
                current_entries.add(entry_id)

                if entry_id not in self._previous_entries:
                    entry_accumulator.append(entry)

                    if len(entry_accumulator) >= self._batch_size:
                        yield self.create_dataframe(entry_accumulator)
                        entry_accumulator.clear()

            # Remember exactly the ids present in this fetch; entries that drop
            # out of the feed are forgotten and would be re-emitted if they return.
            self._previous_entries = current_entries

            # Yield any remaining entries.
            if entry_accumulator:
                df = self.create_dataframe(entry_accumulator)
                yield df
            else:
                logger.debug("No new entries found.")

        except Exception as exc:
            raise RuntimeError(f"Error fetching or processing feed entries: {exc}") from exc

    def create_dataframe(self, entries: typing.List[typing.Tuple]) -> "cudf.DataFrame":
        """
        Create a DataFrame from accumulated entry data.

        Parameters
        ----------
        entries : typing.List[typing.Tuple]
            List of accumulated feed entries.

        Returns
        -------
        cudf.DataFrame
            A DataFrame containing feed entry data.

        Raises
        ------
        RuntimeError
            Error creating DataFrame.
        """
        try:
            return cudf.DataFrame(entries)
        except Exception as exc:
            logger.error("Error creating DataFrame: %s", exc)
            raise RuntimeError(f"Error creating DataFrame: {exc}") from exc

    @classmethod
    def is_url(cls, feed_input: str) -> bool:
        """
        Check if the provided input is a valid URL.

        Parameters
        ----------
        feed_input : str
            The input string to be checked.

        Returns
        -------
        bool
            True if the input is a valid URL, False otherwise.
        """
        try:
            parsed_url = urlparse(feed_input)
            # A usable URL needs both a scheme (e.g. "https") and a host.
            return parsed_url.scheme != '' and parsed_url.netloc != ''
        except Exception:
            return False
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,126 @@ | ||
# Copyright (c) 2022-2023, NVIDIA CORPORATION. | ||
# | ||
# Licensed under the Apache License, Version 2.0 (the "License"); | ||
# you may not use this file except in compliance with the License. | ||
# You may obtain a copy of the License at | ||
# | ||
# http://www.apache.org/licenses/LICENSE-2.0 | ||
# | ||
# Unless required by applicable law or agreed to in writing, software | ||
# distributed under the License is distributed on an "AS IS" BASIS, | ||
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. | ||
# See the License for the specific language governing permissions and | ||
# limitations under the License. | ||
|
||
import logging | ||
import time | ||
|
||
import mrc | ||
|
||
from morpheus.cli import register_stage | ||
from morpheus.config import Config | ||
from morpheus.controllers.rss_controller import RSSController | ||
from morpheus.messages import MessageMeta | ||
from morpheus.pipeline.preallocator_mixin import PreallocatorMixin | ||
from morpheus.pipeline.single_output_source import SingleOutputSource | ||
from morpheus.pipeline.stream_pair import StreamPair | ||
|
||
logger = logging.getLogger(__name__) | ||
|
||
|
||
@register_stage("from-rss")
class RSSSourceStage(PreallocatorMixin, SingleOutputSource):
    """
    Load RSS feed items into a pandas DataFrame.

    Parameters
    ----------
    c : morpheus.config.Config
        Pipeline configuration instance.
    feed_input : str
        The URL or file path of the RSS feed.
    interval_secs : float, optional, default = 600
        Interval in seconds between fetching new feed items.
    stop_after: int, default = 0
        Stops ingesting after emitting `stop_after` records (rows in the dataframe). Useful for testing. Disabled if `0`
    max_retries : int, optional, default = 5
        Maximum number of retries for fetching entries on exception.
    """

    def __init__(self,
                 c: Config,
                 feed_input: str,
                 interval_secs: float = 600,
                 stop_after: int = 0,
                 max_retries: int = 5):
        super().__init__(c)
        self._stop_requested = False
        self._stop_after = stop_after
        self._interval_secs = interval_secs
        self._max_retries = max_retries

        self._records_emitted = 0
        self._controller = RSSController(feed_input=feed_input, batch_size=c.pipeline_batch_size)

    @property
    def name(self) -> str:
        return "from-rss"

    def stop(self):
        """
        Stop the RSS source stage.
        """
        self._stop_requested = True
        return super().stop()

    def supports_cpp_node(self):
        return False

    def _fetch_feeds(self) -> MessageMeta:
        """
        Fetch RSS feed entries and yield as MessageMeta object.

        Raises
        ------
        RuntimeError
            If the input is not a URL/valid path (early exit), or if all
            ``max_retries`` attempts fail.
        """
        retries = 0
        # Keep a reference to the most recent failure: the name bound by
        # `except ... as exc` is deleted when the except clause exits, so
        # referencing `exc` after the loop would raise NameError.
        last_exc = None

        while (not self._stop_requested) and (retries < self._max_retries):
            try:
                for df in self._controller.fetch_dataframes():
                    df_size = len(df)
                    self._records_emitted += df_size

                    if logger.isEnabledFor(logging.DEBUG):
                        logger.debug("Received %d new entries...", df_size)
                        logger.debug("Emitted %d records so far.", self._records_emitted)

                    yield MessageMeta(df=df)

                # A file-based feed is read once; request a stop instead of polling.
                if not self._controller.run_indefinitely:
                    self._stop_requested = True
                    continue

                if (self._stop_after > 0 and self._records_emitted >= self._stop_after):
                    self._stop_requested = True
                    logger.debug("Stop limit reached...preparing to halt the source.")
                    continue

                logger.debug("Waiting for %d seconds before fetching again...", self._interval_secs)
                time.sleep(self._interval_secs)

            except Exception as exc:
                if not self._controller.run_indefinitely:
                    logger.error("The input provided is not a URL or a valid path, therefore, the maximum " +
                                 "retries are being overridden, and early exiting is triggered.")
                    raise RuntimeError(f"Failed to fetch feed entries : {exc}") from exc

                last_exc = exc
                retries += 1
                logger.warning("Error fetching feed entries. Retrying (%d/%d)...", retries, self._max_retries)
                logger.debug("Waiting for 5 secs before retrying...")
                time.sleep(5)  # Wait before retrying

        if retries == self._max_retries:  # Check if retries exceeded the limit
            logger.error("Max retries reached. Unable to fetch feed entries.")
            raise RuntimeError(f"Failed to fetch feed entries after max retries: {last_exc}") from last_exc

    def _build_source(self, builder: mrc.Builder) -> StreamPair:
        # Single-output source node driven by the _fetch_feeds generator.
        source = builder.make_source(self.unique_name, self._fetch_feeds)
        return source, MessageMeta
Oops, something went wrong.