-
Notifications
You must be signed in to change notification settings - Fork 7
Commit
This commit does not belong to any branch on this repository, and may belong to a fork outside of the repository.
- Loading branch information
Showing
11 changed files
with
600 additions
and
9 deletions.
There are no files selected for viewing
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
136 changes: 136 additions & 0 deletions
136
backend/src/xfd_django/xfd_api/api_methods/user_log_search.py
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,136 @@ | ||
"""User log search.""" | ||
|
||
from fastapi import HTTPException | ||
import traceback | ||
import json | ||
|
||
from typing import Dict, Any | ||
from django.db.models import Q | ||
from datetime import datetime | ||
import re | ||
from ..models import Log | ||
from ..auth import is_global_view_admin | ||
|
||
def parse_query_string(query): | ||
""" | ||
Parses a query string into a dictionary for JSONField filtering. | ||
Example Input: "user.id:12345 user.name:John Doe" | ||
Output: {"user__id": "12345", "user__name": "John Doe"} | ||
""" | ||
result = {} | ||
# Match key:value pairs, allowing values with spaces | ||
pattern = re.compile(r'(\w+(\.\w+)*)\s*:\s*("[^"]+"|\'[^\']+\'|\S+)') | ||
matches = pattern.findall(query) | ||
|
||
for match in matches: | ||
key, _, value = match | ||
# Remove quotes if present | ||
if (value.startswith('"') and value.endswith('"')) or (value.startswith("'") and value.endswith("'")): | ||
value = value[1:-1] | ||
# Replace dots with double underscores for Django ORM | ||
orm_key = key.replace('.', '__') | ||
result[orm_key] = value | ||
return result | ||
|
||
def generate_date_condition(filter_obj: Dict[str, Any]) -> Q: | ||
"""Generate date condition.""" | ||
operator = filter_obj.get('operator') | ||
value = filter_obj.get('value', "") | ||
|
||
try: | ||
date_obj = datetime.fromisoformat(value) | ||
except ValueError: | ||
raise ValueError('Invalid date format. Use ISO format.') | ||
|
||
if operator == 'is': | ||
return Q(createdAt__exact=date_obj) | ||
elif operator == 'not': | ||
return ~Q(createdAt__exact=date_obj) | ||
elif operator == 'after': | ||
return Q(createdAt__gt=date_obj) | ||
elif operator == 'onOrAfter': | ||
return Q(createdAt__gte=date_obj) | ||
elif operator == 'before': | ||
return Q(createdAt__lt=date_obj) | ||
elif operator == 'onOrBefore': | ||
return Q(createdAt__lte=date_obj) | ||
elif operator == 'empty': | ||
return Q(createdAt__isnull=True) | ||
elif operator == 'notEmpty': | ||
return Q(createdAt__isnull=False) | ||
else: | ||
raise ValueError('Invalid date operator.') | ||
|
||
|
||
def generate_filter_qs(search: Dict[str, Any]) -> Q: | ||
""" | ||
Generates a Q object based on the search filters. | ||
""" | ||
q = Q() | ||
if 'eventType' in search and search['eventType']: | ||
event_filter = search['eventType'] | ||
q &= Q(eventType__icontains=event_filter['value']) | ||
|
||
if 'result' in search and search['result']: | ||
result_filter = search['result'] | ||
q &= Q(result__icontains=result_filter['value']) | ||
|
||
if 'timestamp' in search and search['timestamp']: | ||
timestamp_filter = search['timestamp'] | ||
# Use the correct field name "createdAt" instead of "created_at" | ||
q &= generate_date_condition(timestamp_filter) | ||
|
||
if 'payload' in search and search['payload']: | ||
payload_filters = parse_query_string(search['payload']) | ||
for key, value in payload_filters.items(): | ||
# This assumes your keys in the payload match your search keys. | ||
q &= Q(**{f'payload__{key}': value}) | ||
|
||
return q | ||
|
||
# POST: /log/search | ||
def search_logs(search_data, current_user): | ||
"""Search logs based on filters.""" | ||
try: | ||
# Check if the user is a GlobalViewAdmin | ||
if not is_global_view_admin(current_user): | ||
raise HTTPException(status_code=403, detail="Unauthorized access.") | ||
|
||
# Convert Pydantic model to dict and remove None values | ||
search_dict = search_data.dict(exclude_unset=True) | ||
|
||
# Generate Q object for filters | ||
q_object = generate_filter_qs(search_dict) | ||
|
||
# As Django ORM is synchronous, use sync_to_async | ||
logs_qs = Log.objects.filter(q_object) | ||
|
||
# Get count | ||
count = logs_qs.count() | ||
|
||
# Serialize logs | ||
logs_serialized = [] | ||
for log in logs_qs: | ||
|
||
try: | ||
payload_dict = json.loads(log.payload) | ||
except (ValueError, TypeError): | ||
# If somehow it's not valid JSON, just keep it as a string | ||
payload_dict = log.payload | ||
|
||
logs_serialized.append({ | ||
"id": str(log.id), | ||
"eventType": log.eventType, | ||
"result": log.result, | ||
"payload": payload_dict, | ||
"createdAt": log.createdAt.isoformat() | ||
}) | ||
|
||
return logs_serialized, count | ||
|
||
except ValueError as ve: | ||
raise HTTPException(status_code=500, detail=str(ve)) | ||
except Exception as e: | ||
print(e) | ||
print(traceback.format_exc()) | ||
raise HTTPException(status_code=500, detail=str(e)) |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
45 changes: 45 additions & 0 deletions
45
backend/src/xfd_django/xfd_api/schema_models/user_log_schema.py
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,45 @@ | ||
"""User event log schema.""" | ||
|
||
from typing import Optional, Any, List | ||
from pydantic import BaseModel, validator | ||
|
||
class Filter(BaseModel): | ||
value: str | ||
operator: Optional[str] = 'contains' | ||
|
||
@validator('operator') | ||
def validate_operator(cls, v): | ||
allowed = ['contains', 'exact', 'iexact', 'startswith', 'istartswith', 'endswith', 'iendswith'] | ||
if v and v not in allowed: | ||
raise ValueError(f'Operator must be one of {allowed}') | ||
return v | ||
|
||
class DateFilter(BaseModel): | ||
value: str | ||
operator: str | ||
|
||
@validator('operator') | ||
def validate_operator(cls, v): | ||
allowed = ['is', 'not', 'after', 'onOrAfter', 'before', 'onOrBefore', 'empty', 'notEmpty'] | ||
if v not in allowed: | ||
raise ValueError(f'Operator must be one of {allowed}') | ||
return v | ||
|
||
class LogSearch(BaseModel): | ||
eventType: Optional[Filter] = None | ||
result: Optional[Filter] = None | ||
timestamp: Optional[DateFilter] = None | ||
payload: Optional[str] = None | ||
|
||
@validator('payload') | ||
def validate_payload(cls, v): | ||
if v: | ||
if not isinstance(v, str): | ||
raise ValueError('Payload must be a string') | ||
return v | ||
|
||
class LogSearchResponse(BaseModel): | ||
"""Log search response model.""" | ||
result: List[Any] | ||
count: int | ||
|
Oops, something went wrong.