Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add Stats endpoints, settings and management commands #707

Merged
merged 11 commits into from
Nov 20, 2024
6 changes: 3 additions & 3 deletions backend/src/xfd_django/xfd_api/auth.py
Original file line number Diff line number Diff line change
Expand Up @@ -22,10 +22,10 @@
from .models import ApiKey, OrganizationTag, User

# JWT_ALGORITHM = "RS256"
JWT_SECRET = os.getenv("JWT_SECRET")
JWT_SECRET = settings.JWT_SECRET
SECRET_KEY = settings.SECRET_KEY
JWT_ALGORITHM = "HS256"
JWT_TIMEOUT_HOURS = 4
JWT_ALGORITHM = settings.JWT_ALGORITHM
JWT_TIMEOUT_HOURS = settings.JWT_TIMEOUT_HOURS

oauth2_scheme = OAuth2PasswordBearer(tokenUrl="token")
api_key_header = APIKeyHeader(name="X-API-KEY", auto_error=False)
Expand Down
Empty file.
Empty file.
Original file line number Diff line number Diff line change
@@ -0,0 +1,55 @@
from django.core.management.base import BaseCommand
from django.conf import settings
import redis
import json

class Command(BaseCommand):
help = 'Inspects and displays the most_common_vulnerabilities from AWS ElastiCache (Redis).'

def handle(self, *args, **options):
"""
Connects to AWS ElastiCache (Redis), retrieves the 'most_common_vulnerabilities' key,
and displays its contents.
"""

try:
# Initialize Redis client
redis_client = redis.StrictRedis(
host=settings.ELASTICACHE_ENDPOINT,
port=6379,
db=0,
decode_responses=True # Automatically decode responses as UTF-8 strings
)

# Check if the key exists
if redis_client.exists('most_common_vulnerabilities'):
self.stdout.write(self.style.SUCCESS("'most_common_vulnerabilities' key exists in Redis."))

# Retrieve the JSON string
vulnerabilities_json = redis_client.get('most_common_vulnerabilities')

if vulnerabilities_json:
# Parse the JSON string into Python objects
vulnerabilities_data = json.loads(vulnerabilities_json)
self.stdout.write("Most Common Vulnerabilities data:")
for vuln in vulnerabilities_data:
self.stdout.write(
f"Title: {vuln.get('title')}, "
f"Description: {vuln.get('description')}, "
f"Severity: {vuln.get('severity')}, "
f"Count: {vuln.get('count')}, "
f"Domain: {vuln.get('domain')}"
)
else:
self.stdout.write(self.style.WARNING("'most_common_vulnerabilities' key is empty."))
else:
self.stdout.write(self.style.WARNING("'most_common_vulnerabilities' key does not exist in Redis."))

except redis.ConnectionError as conn_err:
self.stderr.write(self.style.ERROR(f"Redis connection error: {conn_err}"))
except redis.RedisError as redis_err:
self.stderr.write(self.style.ERROR(f"Redis error: {redis_err}"))
except json.JSONDecodeError as json_err:
self.stderr.write(self.style.ERROR(f"JSON decode error: {json_err}"))
except Exception as e:
self.stderr.write(self.style.ERROR(f"An unexpected error occurred: {e}"))
Original file line number Diff line number Diff line change
@@ -0,0 +1,52 @@
from django.core.management.base import BaseCommand
from django.contrib.auth import get_user_model
from xfd_api.models import User # Adjust if your User model is in a different app
from xfd_api.models import Role, Organization # Import your models
from xfd_api.auth import get_org_memberships # Import the function

#TO run the command: python manage.py test_get_org_memberships 090d87fd-a7be-4bdc-b3a9-5c0bbf24dc40


class Command(BaseCommand):
help = 'Test get_org_memberships function for a given user'

def add_arguments(self, parser):
parser.add_argument(
'user_identifier',
type=str,
help='User ID or email to identify the user',
)

def handle(self, *args, **options):
user_identifier = options['user_identifier']

# Try to retrieve the user by ID or email
try:
user = User.objects.get(id=user_identifier)
user = user.id



except (User.DoesNotExist, ValueError):
try:
user = User.objects.get(email=user_identifier)
except User.DoesNotExist:
self.stderr.write(
self.style.ERROR(f'User with ID or email "{user_identifier}" does not exist.')
)
return

# Call the get_org_memberships function
org_memberships = get_org_memberships(user)

# Output the results
if org_memberships:
self.stdout.write(
self.style.SUCCESS(
f'User "{user}" is a member of organizations: {org_memberships}'
)
)
else:
self.stdout.write(
self.style.WARNING(f'User "{user}" is not a member of any organizations.')
)
Original file line number Diff line number Diff line change
@@ -0,0 +1,27 @@
from django.core.management.base import BaseCommand
from xfd_api.models import Role

class Command(BaseCommand):
help = 'Finds user IDs associated with services having ports.'

def handle(self, *args, **options):
# Find user IDs associated with services that have ports
user_ids_with_ports = Role.objects.filter(
organizationId__domain__service__port__isnull=False
).values_list('userId', flat=True).distinct()

# Convert the QuerySet to a list for better readability
user_ids_list = list(user_ids_with_ports)

# Display the user IDs
if user_ids_list:
self.stdout.write(self.style.SUCCESS(
f"User IDs with ports in service table: {user_ids_list}"
))
# Pick the first user ID for testing
test_user_id = user_ids_list[0]
self.stdout.write(f"Using user_id: {test_user_id} for testing")
else:
self.stdout.write(self.style.WARNING(
"No users found with services having ports."
))
Original file line number Diff line number Diff line change
@@ -0,0 +1,28 @@
from django.core.management.base import BaseCommand
from xfd_api.models import User, Role, Domain
from django.db.models import Q

class Command(BaseCommand):
help = 'Lists user IDs and emails of users who have domains via their organizations.'

def handle(self, *args, **options):
# Step 1: Get organization IDs that have domains
organization_ids_with_domains = Domain.objects.values_list('organizationId', flat=True).distinct()

# Step 2: Get user IDs that have roles in these organizations
user_ids_with_domains = Role.objects.filter(
organizationId__in=organization_ids_with_domains
).values_list('userId', flat=True).distinct()

# Step 3: Get the count of user IDs
user_count = user_ids_with_domains.count()

# Output the count
self.stdout.write(f"Total users with domains: {user_count}")

# Step 4: Fetch users
users_with_domains = User.objects.filter(id__in=user_ids_with_domains)

# Step 5: Output the user IDs and emails
for user in users_with_domains:
self.stdout.write(f"User ID: {user.id}, Email: {user.email}")
Original file line number Diff line number Diff line change
@@ -0,0 +1,31 @@
from django.core.management.base import BaseCommand
from xfd_api.auth import get_user_domains # Adjust the import path as needed
import asyncio

class Command(BaseCommand):
help = 'Tests the get_user_domains function for a given user_id and prints the output.'

def add_arguments(self, parser):
parser.add_argument(
'user_id',
type=str,
help='The ID of the user to retrieve domains for.'
)

def handle(self, *args, **options):
user_id = options['user_id']

try:
# Call the synchronous get_user_domains function
domains = get_user_domains(user_id)

# Print the result
if domains:
self.stdout.write(self.style.SUCCESS(f"Domains for user_id {user_id}:"))
for domain in domains:
self.stdout.write(f"- {domain}")
else:
self.stdout.write(self.style.WARNING(f"No domains found for user_id {user_id}."))

except Exception as e:
self.stderr.write(self.style.ERROR(f"An error occurred: {e}"))
Original file line number Diff line number Diff line change
@@ -0,0 +1,56 @@
from django.core.management.base import BaseCommand
from xfd_api.models import Role, Domain, Service, User # Adjust the import paths as necessary

class Command(BaseCommand):
help = 'Test get_user_service_ids function for a given user ID'

def add_arguments(self, parser):
parser.add_argument(
'user_id',
type=str,
help='User ID to retrieve service IDs for',
)

def handle(self, *args, **options):
user_id = options['user_id']

# Verify that the user exists
try:
user = User.objects.get(id=user_id)
except User.DoesNotExist:
self.stderr.write(self.style.ERROR(f'User with ID {user_id} does not exist.'))
return

# Call the get_user_service_ids function
service_ids = self.get_user_service_ids(user_id)

if service_ids:
self.stdout.write(self.style.SUCCESS(f'Service IDs for user {user_id}:'))
for service_id in service_ids:
self.stdout.write(f'- {service_id}')
else:
self.stdout.write(self.style.WARNING(f'No services found for user {user_id}.'))

def get_user_service_ids(self, user_id):
"""
Retrieves service IDs associated with the organizations the user belongs to.
"""
# Get organization IDs the user is a member of
organization_ids = Role.objects.filter(userId=user_id).values_list('organizationId', flat=True)

# Debug: Print organization IDs
self.stdout.write(f'Organization IDs for user {user_id}: {list(organization_ids)}')

# Get domain IDs associated with these organizations
domain_ids = Domain.objects.filter(organizationId__in=organization_ids).values_list('id', flat=True)

# Debug: Print domain IDs
self.stdout.write(f'Domain IDs for organizations: {list(domain_ids)}')

# Get service IDs associated with these domains
service_ids = Service.objects.filter(domainId__in=domain_ids).values_list('id', flat=True)

# Debug: Print service IDs
self.stdout.write(f'Service IDs for domains: {list(service_ids)}')

return list(map(str, service_ids)) # Convert UUIDs to strings if necessary
Original file line number Diff line number Diff line change
@@ -0,0 +1,55 @@
from django.core.management.base import BaseCommand
from django.conf import settings
import redis
import json



class Command(BaseCommand):
help = 'Inspects and displays the latest_vulnerabilities from AWS ElastiCache (Redis).'

def handle(self, *args, **options):
"""
Connects to AWS ElastiCache (Redis), retrieves the 'latest_vulnerabilities' key,
and displays its contents.
"""

try:
# Initialize Redis client
redis_client = redis.StrictRedis(
host=settings.ELASTICACHE_ENDPOINT,
port=6379,
db=0,
decode_responses=True # Automatically decode responses as UTF-8 strings
)

# Check if the key exists
if redis_client.exists('latest_vulnerabilities'):
self.stdout.write(self.style.SUCCESS("'latest_vulnerabilities' key exists in Redis."))

# Retrieve the JSON string
vulnerabilities_json = redis_client.get('latest_vulnerabilities')

if vulnerabilities_json:
# Parse the JSON string into Python objects
vulnerabilities_data = json.loads(vulnerabilities_json)
self.stdout.write("Vulnerabilities data:")
for vuln in vulnerabilities_data:
self.stdout.write(
f"ID: {vuln['id']}, Title: {vuln['title']}, "
f"State: {vuln['state']}, CreatedAt: {vuln['createdAt']}, "
f"Domain: {vuln['domain']}"
)
else:
self.stdout.write(self.style.WARNING("'latest_vulnerabilities' key is empty."))
else:
self.stdout.write(self.style.WARNING("'latest_vulnerabilities' key does not exist in Redis."))

except redis.ConnectionError as conn_err:
self.stderr.write(self.style.ERROR(f"Redis connection error: {conn_err}"))
except redis.RedisError as redis_err:
self.stderr.write(self.style.ERROR(f"Redis error: {redis_err}"))
except json.JSONDecodeError as json_err:
self.stderr.write(self.style.ERROR(f"JSON decode error: {json_err}"))
except Exception as e:
self.stderr.write(self.style.ERROR(f"An unexpected error occurred: {e}"))
Original file line number Diff line number Diff line change
@@ -0,0 +1,47 @@
from django.core.management.base import BaseCommand
from django.conf import settings
import redis
import json



class Command(BaseCommand):
help = 'Inspects and displays the num_vulnerabilities_stats from AWS ElastiCache (Redis).'

def handle(self, *args, **options):
"""
Connects to AWS ElastiCache (Redis), retrieves the 'num_vulnerabilities_stats' hash,
and displays its contents.
"""


try:
# Initialize Redis client
redis_client = redis.StrictRedis(
host=settings.ELASTICACHE_ENDPOINT,
port=6379,
db=0,
decode_responses=True
# Automatically decode responses as UTF-8 strings
)

# Check if the key exists
if redis_client.exists('num_vulnerabilities_stats'):
self.stdout.write(self.style.SUCCESS("'num_vulnerabilities_stats' key exists in Redis."))
vulnerabilities_stats = redis_client.hgetall('num_vulnerabilities_stats')

if vulnerabilities_stats:
self.stdout.write("Vulnerabilities data:")
for key, value in vulnerabilities_stats.items():
self.stdout.write(f"{key}: {value}")
else:
self.stdout.write(self.style.WARNING("'num_vulnerabilities_stats' key is empty."))
else:
self.stdout.write(self.style.WARNING("'num_vulnerabilities_stats' key does not exist in Redis."))

except redis.ConnectionError as conn_err:
self.stderr.write(self.style.ERROR(f"Redis connection error: {conn_err}"))
except redis.RedisError as redis_err:
self.stderr.write(self.style.ERROR(f"Redis error: {redis_err}"))
except Exception as e:
self.stderr.write(self.style.ERROR(f"An unexpected error occurred: {e}"))
Loading
Loading