Dsegog 271 handling failed ingestion #148

Merged: 25 commits on Feb 4, 2025
Changes from all commits
Commits (25)
6a190a3
Added function decorator to handle db issues
moonraker595 Nov 29, 2024
50273e9
Created function decorator to handle db issues
moonraker595 Nov 29, 2024
f9f491a
Added test to check that a db error is caught and returned
moonraker595 Nov 29, 2024
c974569
Wrapped the upload in a try/catch and return the failed channel if ne…
moonraker595 Jan 20, 2025
cbd62c5
Added a function to remove a channel from the record. Used when the c…
moonraker595 Jan 20, 2025
63d2366
Wrapped the upload in a try/catch and return the failed channel if ne…
moonraker595 Jan 20, 2025
b89d078
Updated versions
moonraker595 Jan 20, 2025
2270606
Black formatting
moonraker595 Jan 20, 2025
ce92c48
Added to list of aliases
moonraker595 Jan 20, 2025
eb73d4d
Linting
moonraker595 Jan 20, 2025
18ad3b6
Added to list of aliases
moonraker595 Jan 20, 2025
2f6bd61
Created a way to monitor the result of the echo uploads and remove th…
moonraker595 Jan 21, 2025
1a4dd98
added tests to check various db and echo related failures
moonraker595 Jan 21, 2025
b89d73a
Linting
moonraker595 Jan 21, 2025
f7c53a4
Update vulnerable dependencies
moonraker595 Jan 21, 2025
2d623db
Added error handling around the query_to_list function
moonraker595 Jan 30, 2025
5da3e01
Removing redundant KeyError
moonraker595 Jan 30, 2025
b65af8e
Added passthrough for DatabaseErrors
moonraker595 Jan 31, 2025
f048abc
Updated to fix ingest boto issue
moonraker595 Jan 31, 2025
6d07ad4
Updated tests for coverage
moonraker595 Feb 3, 2025
abfe6fd
Updated test to cover for propagated db errors.
moonraker595 Feb 3, 2025
6cce922
Merge branch 'main' into DSEGOG-271-Handling-Failed-Ingestion
moonraker595 Feb 3, 2025
fdc7309
updated to working versions
moonraker595 Feb 3, 2025
1dea424
testing downgrading of certify
moonraker595 Feb 4, 2025
70bbd4e
testing upgrade of certify
moonraker595 Feb 4, 2025
16 changes: 14 additions & 2 deletions operationsgateway_api/src/mongo/interface.py
@@ -14,6 +14,7 @@
from operationsgateway_api.src.config import Config
from operationsgateway_api.src.exceptions import DatabaseError
from operationsgateway_api.src.mongo.connection import ConnectionInstance
from operationsgateway_api.src.mongo.mongo_error_handling import mongodb_error_handling

log = logging.getLogger()
ProjectionAlias = Optional[Union[Mapping[str, Any], Iterable[str]]]
@@ -22,16 +23,17 @@
class MongoDBInterface:
"""
An implementation of various PyMongo and Motor functions that suit our specific
database and colllection names
database and collection names

Motor doesn't support type annotations (see
https://jira.mongodb.org/browse/MOTOR-331 for any updates) so type annotations are
used from PyMongo which from a user perspective, acts almost identically (exlcuding
used from PyMongo which from a user perspective, acts almost identically (excluding
async support of course). This means the type hinting can actually be useful for
developers of this repo
"""

@staticmethod
@mongodb_error_handling("get_collection_object")
def get_collection_object(collection_name: str) -> Collection:
"""
Simple getter function which gets a particular collection so it can be
@@ -44,6 +46,7 @@ def get_collection_object(collection_name: str) -> Collection:
raise DatabaseError("Invalid collection name given") from exc

@staticmethod
@mongodb_error_handling("find")
def find(
collection_name: str = "images",
filter_: dict = None,
@@ -83,6 +86,7 @@ def find(
)

@staticmethod
@mongodb_error_handling("query_to_list")
async def query_to_list(query: Cursor) -> List[Dict[str, Any]]:
"""
Sends the query to MongoDB and converts the query results into a list
@@ -99,6 +103,7 @@ async def query_to_list(query: Cursor) -> List[Dict[str, Any]]:
return await query.to_list(length=Config.config.mongodb.max_documents)

@staticmethod
@mongodb_error_handling("find_one")
async def find_one(
collection_name: str,
filter_: Dict[str, Any] = None,
@@ -120,6 +125,7 @@ def find_one(
return await collection.find_one(filter_, sort=sort, projection=projection)

@staticmethod
@mongodb_error_handling("update_one")
async def update_one(
collection_name: str,
filter_: Dict[str, Any] = None,
@@ -153,6 +159,7 @@ async def update_one(
) from exc

@staticmethod
@mongodb_error_handling("update_many")
async def update_many(
collection_name: str,
filter_: Dict[str, Any] = {}, # noqa: B006
@@ -177,6 +184,7 @@ async def update_many(
) from exc

@staticmethod
@mongodb_error_handling("insert_one")
async def insert_one(collection_name: str, data: Dict[str, Any]) -> InsertOneResult:
"""
Using the input data, insert a single document into a given collection
@@ -194,6 +202,7 @@ async def insert_one(collection_name: str, data: Dict[str, Any]) -> InsertOneRes
) from exc

@staticmethod
@mongodb_error_handling("insert_many")
async def insert_many(
collection_name: str,
data: List[Dict[str, Any]],
@@ -215,6 +224,7 @@ async def insert_many(
) from exc

@staticmethod
@mongodb_error_handling("delete_one")
async def delete_one(
collection_name: str,
filter_: Dict[str, Any] = None,
@@ -245,6 +255,7 @@ async def delete_one(
) from exc

@staticmethod
@mongodb_error_handling("count_documents")
async def count_documents(
collection_name: str,
filter_: Dict[str, Any] = None,
@@ -273,6 +284,7 @@ async def count_documents(
) from exc

@staticmethod
@mongodb_error_handling("aggregate")
async def aggregate(
collection_name: str,
pipeline,
48 changes: 48 additions & 0 deletions operationsgateway_api/src/mongo/mongo_error_handling.py
@@ -0,0 +1,48 @@
from functools import wraps
from inspect import iscoroutinefunction
import logging

from operationsgateway_api.src.exceptions import DatabaseError

log = logging.getLogger()


def mongodb_error_handling(operation: str):
"""
Decorator for consistent error handling in MongoDB operations, supporting both
sync and async functions.
"""

def decorator(func):
if iscoroutinefunction(func): # Check if the function is async

@wraps(func)
async def async_wrapper(*args, **kwargs):
try:
return await func(*args, **kwargs)
except DatabaseError:
raise # If it's already a DatabaseError, propagate it as-is
except Exception as exc:
log.error("Database operation: %s failed", operation)
raise DatabaseError(
f"Database operation failed during {operation}",
) from exc

return async_wrapper
else: # Handle synchronous functions

@wraps(func)
def sync_wrapper(*args, **kwargs):
try:
return func(*args, **kwargs)
except DatabaseError:
raise # If it's already a DatabaseError, propagate it as-is
except Exception as exc:
log.error("Database operation: %s failed", operation)
raise DatabaseError(
f"Database operation failed during {operation}",
) from exc

return sync_wrapper

return decorator
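
For reference, a minimal usage sketch of the decorator above (not part of the diff): demo_find is a hypothetical function, the import paths are the ones this PR introduces, and the behaviour shown assumes DatabaseError is importable as it is in interface.py.

import asyncio

from operationsgateway_api.src.exceptions import DatabaseError
from operationsgateway_api.src.mongo.mongo_error_handling import mongodb_error_handling


@mongodb_error_handling("demo_find")
async def demo_find():
    # Hypothetical operation: simulate an unexpected driver failure
    raise RuntimeError("connection dropped")


async def main():
    try:
        await demo_find()
    except DatabaseError:
        # The decorator logs the failure and re-raises it as a DatabaseError,
        # so callers handle a single exception type. A DatabaseError raised
        # inside the wrapped function would propagate unchanged.
        print("caught DatabaseError for operation 'demo_find'")


asyncio.run(main())

The same decorator applied to a plain (non-async) function takes the sync_wrapper branch, so the synchronous and asynchronous helpers in interface.py can share it.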
14 changes: 11 additions & 3 deletions operationsgateway_api/src/records/image.py
@@ -2,7 +2,7 @@

from io import BytesIO
import logging
from typing import Tuple
from typing import Optional, Tuple

from botocore.exceptions import ClientError
import numpy as np
@@ -79,7 +79,7 @@ def extract_metadata_from_path(self) -> Tuple[str, str]:
return record_id, channel_name

@staticmethod
def upload_image(input_image: Image) -> None:
def upload_image(input_image: Image) -> Optional[str]:
"""
Save the image on Echo S3 object storage
"""
@@ -96,7 +96,15 @@ def upload_image(input_image: Image) -> None:
echo = EchoInterface()
storage_path = Image.get_full_path(input_image.image.path)
log.info("Storing image on S3: %s", storage_path)
echo.upload_file_object(image_bytes, storage_path)

try:
echo.upload_file_object(image_bytes, storage_path)
return None # No failure
except EchoS3Error:
# Extract the channel name and propagate it
record_id, channel_name = input_image.extract_metadata_from_path()
log.error("Failed to upload image for channel: %s", channel_name)
return channel_name

@staticmethod
async def get_image(
7 changes: 7 additions & 0 deletions operationsgateway_api/src/records/record.py
@@ -109,6 +109,13 @@ async def update(self) -> None:
},
)

def remove_channel(self, channel_name: str) -> None:
if channel_name in self.record.channels:
log.info("Removing channel '%s' from record.", channel_name)
del self.record.channels[channel_name]
else:
log.error("Channel '%s' not found in record.", channel_name)

async def find_existing_record(self) -> Union[RecordModel, None]:
"""
Using the ID, check if the object's record is currently stored in the database
21 changes: 16 additions & 5 deletions operationsgateway_api/src/records/waveform.py
@@ -2,6 +2,7 @@
from io import BytesIO
import json
import logging
from typing import Optional

from botocore.exceptions import ClientError
import matplotlib
@@ -35,17 +36,27 @@ def to_json(self):
b.seek(0)
return b

def insert_waveform(self) -> None:
def insert_waveform(self) -> Optional[str]:
"""
Store the waveform from this object in Echo
"""
log.info("Storing waveform: %s", self.waveform.path)
bytes_json = self.to_json()
echo = EchoInterface()
echo.upload_file_object(
bytes_json,
Waveform.get_full_path(self.waveform.path),
)
try:
echo.upload_file_object(
bytes_json,
Waveform.get_full_path(self.waveform.path),
)
return None # Successful upload
except EchoS3Error:
# Extract the channel name and propagate it
channel_name = self.get_channel_name_from_id()
log.exception(
"Failed to upload waveform for channel '%s'",
channel_name,
)
return channel_name

def create_thumbnail(self) -> None:
"""
42 changes: 35 additions & 7 deletions operationsgateway_api/src/routes/ingest_data.py
@@ -111,24 +111,52 @@ async def submit_hdf(
record = Record(record_data)

log.debug("Processing waveforms")
failed_waveform_uploads = []
for w in waveforms:
waveform = Waveform(w)
waveform.insert_waveform()
waveform.create_thumbnail()
record.store_thumbnail(waveform)

# Call insert_waveform and track failures
failed_upload = waveform.insert_waveform() # Returns channel name if failed
# if the upload to echo fails, don't process the waveform any further
if failed_upload:
failed_waveform_uploads.append(failed_upload)
else:
waveform.create_thumbnail()
record.store_thumbnail(waveform) # in the record not echo

# This section distributes the Image.upload_image calls across
# the threads in the pool. It takes the image_instances list,
# applies the Image.upload_image function to each item, and collects
# the return values in upload_results. The map function blocks the main
# thread until all the tasks in the pool are complete.
log.debug("Processing images")
failed_image_uploads = []
image_instances = [Image(i) for i in images]
for image in image_instances:
image.create_thumbnail()
record.store_thumbnail(image)

record.store_thumbnail(image) # in the record not echo
if len(image_instances) > 0:
pool = ThreadPool(processes=Config.config.images.upload_image_threads)
pool.map(Image.upload_image, image_instances)
upload_results = pool.map(Image.upload_image, image_instances)
# Filter out successful uploads, collect only failed ones
failed_image_uploads = [channel for channel in upload_results if channel]
pool.close()
image_instances = None

# Combine failed channels from waveforms and images and remove them from the record
# Update the channel checker to reflect failed uploads
all_failed_upload_channels = failed_waveform_uploads + failed_image_uploads
for channel in all_failed_upload_channels:
record.remove_channel(channel)
if channel in checker_response["accepted_channels"]:
# Remove from accepted_channels and add to rejected_channels
checker_response["accepted_channels"].remove(channel)
checker_response["rejected_channels"][channel] = ["Upload to Echo failed"]
elif channel in checker_response["rejected_channels"]:
# Append the failure reason to the existing reasons
checker_response["rejected_channels"][channel].append(
"Upload to Echo failed",
)

if stored_record and accept_type == "accept_merge":
log.debug(
"Record matching ID %s already exists in the database, updating existing"