-
Notifications
You must be signed in to change notification settings - Fork 7
Commit
This commit does not belong to any branch on this repository, and may belong to a fork outside of the repository.
Merge pull request #802 from cisagov/AL-add-user-logs
Add User Event Logs Functionality
- Loading branch information
Showing
11 changed files
with
664 additions
and
9 deletions.
There are no files selected for viewing
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
144 changes: 144 additions & 0 deletions
144
backend/src/xfd_django/xfd_api/api_methods/user_log_search.py
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,144 @@ | ||
"""User log search.""" | ||
|
||
# Standard Python Libraries | ||
from datetime import datetime | ||
import json | ||
import re | ||
import traceback | ||
from typing import Any, Dict | ||
|
||
# Third-Party Libraries | ||
from django.db.models import Q | ||
from fastapi import HTTPException | ||
|
||
from ..auth import is_global_view_admin | ||
from ..models import Log | ||
|
||
|
||
def parse_query_string(query): | ||
""" | ||
Parse a query string into a dictionary for JSONField filtering. | ||
Example Input: "user.id:12345 user.name:John Doe" | ||
Output: {"user__id": "12345", "user__name": "John Doe"} | ||
""" | ||
result = {} | ||
# Match key:value pairs, allowing values with spaces | ||
pattern = re.compile(r'(\w+(\.\w+)*)\s*:\s*("[^"]+"|\'[^\']+\'|\S+)') | ||
matches = pattern.findall(query) | ||
|
||
for match in matches: | ||
key, _, value = match | ||
# Remove quotes if present | ||
if (value.startswith('"') and value.endswith('"')) or ( | ||
value.startswith("'") and value.endswith("'") | ||
): | ||
value = value[1:-1] | ||
# Replace dots with double underscores for Django ORM | ||
orm_key = key.replace(".", "__") | ||
result[orm_key] = value | ||
return result | ||
|
||
|
||
def generate_date_condition(filter_obj: Dict[str, Any]) -> Q: | ||
"""Generate date condition.""" | ||
operator = filter_obj.get("operator") | ||
value = filter_obj.get("value", "") | ||
|
||
try: | ||
date_obj = datetime.fromisoformat(value) | ||
except ValueError: | ||
raise ValueError("Invalid date format. Use ISO format.") | ||
|
||
if operator == "is": | ||
return Q(createdAt__exact=date_obj) | ||
elif operator == "not": | ||
return ~Q(createdAt__exact=date_obj) | ||
elif operator == "after": | ||
return Q(createdAt__gt=date_obj) | ||
elif operator == "onOrAfter": | ||
return Q(createdAt__gte=date_obj) | ||
elif operator == "before": | ||
return Q(createdAt__lt=date_obj) | ||
elif operator == "onOrBefore": | ||
return Q(createdAt__lte=date_obj) | ||
elif operator == "empty": | ||
return Q(createdAt__isnull=True) | ||
elif operator == "notEmpty": | ||
return Q(createdAt__isnull=False) | ||
else: | ||
raise ValueError("Invalid date operator.") | ||
|
||
|
||
def generate_filter_qs(search: Dict[str, Any]) -> Q: | ||
"""Generate a Q object based on the search filters.""" | ||
q = Q() | ||
if "eventType" in search and search["eventType"]: | ||
event_filter = search["eventType"] | ||
q &= Q(eventType__icontains=event_filter["value"]) | ||
|
||
if "result" in search and search["result"]: | ||
result_filter = search["result"] | ||
q &= Q(result__icontains=result_filter["value"]) | ||
|
||
if "timestamp" in search and search["timestamp"]: | ||
timestamp_filter = search["timestamp"] | ||
# Use the correct field name "createdAt" instead of "created_at" | ||
q &= generate_date_condition(timestamp_filter) | ||
|
||
if "payload" in search and search["payload"]: | ||
payload_filters = parse_query_string(search["payload"]) | ||
for key, value in payload_filters.items(): | ||
# This assumes your keys in the payload match your search keys. | ||
q &= Q(**{f"payload__{key}": value}) | ||
|
||
return q | ||
|
||
|
||
# POST: /log/search | ||
def search_logs(search_data, current_user): | ||
"""Search logs based on filters.""" | ||
try: | ||
# Check if the user is a GlobalViewAdmin | ||
if not is_global_view_admin(current_user): | ||
raise HTTPException(status_code=403, detail="Unauthorized access.") | ||
|
||
# Convert Pydantic model to dict and remove None values | ||
search_dict = search_data.dict(exclude_unset=True) | ||
|
||
# Generate Q object for filters | ||
q_object = generate_filter_qs(search_dict) | ||
|
||
# As Django ORM is synchronous, use sync_to_async | ||
logs_qs = Log.objects.filter(q_object) | ||
|
||
# Get count | ||
count = logs_qs.count() | ||
|
||
# Serialize logs | ||
logs_serialized = [] | ||
for log in logs_qs: | ||
try: | ||
payload_dict = json.loads(log.payload) | ||
except (ValueError, TypeError): | ||
# If somehow it's not valid JSON, just keep it as a string | ||
payload_dict = log.payload | ||
|
||
logs_serialized.append( | ||
{ | ||
"id": str(log.id), | ||
"eventType": log.eventType, | ||
"result": log.result, | ||
"payload": payload_dict, | ||
"createdAt": log.createdAt.isoformat(), | ||
} | ||
) | ||
|
||
return logs_serialized, count | ||
|
||
except ValueError as ve: | ||
raise HTTPException(status_code=500, detail=str(ve)) | ||
except Exception as e: | ||
print(e) | ||
print(traceback.format_exc()) | ||
raise HTTPException(status_code=500, detail=str(e)) |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
78 changes: 78 additions & 0 deletions
78
backend/src/xfd_django/xfd_api/schema_models/user_log_schema.py
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,78 @@ | ||
"""User event log schema.""" | ||
|
||
# Standard Python Libraries | ||
from typing import Any, List, Optional | ||
|
||
# Third-Party Libraries | ||
from pydantic import BaseModel, validator | ||
|
||
|
||
class Filter(BaseModel): | ||
"""Filter schema.""" | ||
|
||
value: str | ||
operator: Optional[str] = "contains" | ||
|
||
@validator("operator") | ||
def validate_operator(cls, v): | ||
"""Validate operator.""" | ||
allowed = [ | ||
"contains", | ||
"exact", | ||
"iexact", | ||
"startswith", | ||
"istartswith", | ||
"endswith", | ||
"iendswith", | ||
] | ||
if v and v not in allowed: | ||
raise ValueError(f"Operator must be one of {allowed}") | ||
return v | ||
|
||
|
||
class DateFilter(BaseModel): | ||
"""Date filter schema.""" | ||
|
||
value: str | ||
operator: str | ||
|
||
@validator("operator") | ||
def validate_operator(cls, v): | ||
"""Validate operator.""" | ||
allowed = [ | ||
"is", | ||
"not", | ||
"after", | ||
"onOrAfter", | ||
"before", | ||
"onOrBefore", | ||
"empty", | ||
"notEmpty", | ||
] | ||
if v not in allowed: | ||
raise ValueError(f"Operator must be one of {allowed}") | ||
return v | ||
|
||
|
||
class LogSearch(BaseModel): | ||
"""Log search schema.""" | ||
|
||
eventType: Optional[Filter] = None | ||
result: Optional[Filter] = None | ||
timestamp: Optional[DateFilter] = None | ||
payload: Optional[str] = None | ||
|
||
@validator("payload") | ||
def validate_payload(cls, v): | ||
"""Validate payload.""" | ||
if v: | ||
if not isinstance(v, str): | ||
raise ValueError("Payload must be a string") | ||
return v | ||
|
||
|
||
class LogSearchResponse(BaseModel): | ||
"""Log search response model.""" | ||
|
||
result: List[Any] | ||
count: int |
Oops, something went wrong.