Skip to content

Commit

Permalink
helper for user auth
Browse files Browse the repository at this point in the history
  • Loading branch information
BenediktMKuehne committed Feb 24, 2025
1 parent 891fc8c commit 5c2eee7
Show file tree
Hide file tree
Showing 7 changed files with 59 additions and 26 deletions.
1 change: 1 addition & 0 deletions embark/dashboard/urls.py
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@
# view routing
urlpatterns = [
path('', views.main_dashboard, name='embark-MainDashboard'),
path('dashboard/', views.main_dashboard, name='embark-MainDashboard'),
path('dashboard/main/', views.main_dashboard, name='embark-MainDashboard'),
path('dashboard/service/', views.service_dashboard, name='embark-dashboard-service'),
path('dashboard/report/', views.report_dashboard, name='embark-ReportDashboard'),
Expand Down
32 changes: 19 additions & 13 deletions embark/dashboard/views.py
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,7 @@
from django.views.decorators.http import require_http_methods
from django.contrib import messages
from django.shortcuts import redirect
from embark.helper import user_is_staff
from embark.helper import user_is_auth
from tracker.forms import AssociateForm
from uploader.boundedexecutor import BoundedExecutor
from uploader.forms import LabelForm
Expand All @@ -34,12 +34,10 @@
@require_http_methods(["GET"])
@login_required(login_url='/' + settings.LOGIN_URL)
def main_dashboard(request):
if request.user.is_authenticated:
if FirmwareAnalysis.objects.filter(finished=True, failed=False).count() > 0 and Result.objects.filter(restricted=False).count() > 0:
return render(request, 'dashboard/mainDashboard.html', {'nav_switch': True, 'username': request.user.username})
messages.info(request, "Redirected - There are no Results to display yet")
return redirect('embark-uploader-home')
return HttpResponseForbidden
if FirmwareAnalysis.objects.filter(finished=True, failed=False).count() > 0 and Result.objects.filter(restricted=False).count() > 0:
return render(request, 'dashboard/mainDashboard.html', {'nav_switch': True, 'username': request.user.username})
messages.info(request, "Redirected - There are no Results to display yet")
return redirect('embark-uploader-home')


@permission_required("users.dashboard_permission_advanced", login_url='/')
Expand All @@ -61,7 +59,7 @@ def stop_analysis(request):
analysis = form.cleaned_data['analysis']
analysis_object_ = FirmwareAnalysis.objects.get(id=analysis.id)
# check if user auth
if request.user != analysis_object_.user:
if not user_is_auth(request.user, analysis_object_.user):
return HttpResponseForbidden("You are not authorized!")
logger.info("Stopping analysis with id %s", analysis_object_.id)
pid = analysis_object_.pid
Expand Down Expand Up @@ -142,7 +140,7 @@ def show_log(request, analysis_id):
logger.info("showing log for analyze_id: %s", analysis_id)
firmware = FirmwareAnalysis.objects.get(id=analysis_id)
# check if user auth TODO change to group auth
if request.user != firmware.user or not user_is_staff(request.user):
if not user_is_auth(request.user, firmware.user):
return HttpResponseForbidden("You are not authorized!")
# get the file path
log_file_path_ = f"{Path(firmware.path_to_logs).parent}/emba_run.log"
Expand All @@ -169,7 +167,7 @@ def show_logviewer(request, analysis_id):
logger.info("showing log viewer for analyze_id: %s", analysis_id)
firmware = FirmwareAnalysis.objects.get(id=analysis_id)
# check if user auth
if request.user != firmware.user or not user_is_staff(request.user):
if not user_is_auth(request.user, firmware.user):
return HttpResponseForbidden("You are not authorized!")
# get the file path
log_file_path_ = f"{Path(firmware.path_to_logs).parent}/emba_run.log"
Expand All @@ -194,7 +192,7 @@ def delete_analysis(request, analysis_id):
logger.info("Deleting analyze_id: %s", analysis_id)
analysis = FirmwareAnalysis.objects.get(id=analysis_id)
# check that the user is authorized
if request.user == analysis.user or request.user.is_superuser:
if user_is_auth(request.user, analysis.user):
if analysis.finished is False:
try:
BoundedExecutor.submit_kill(analysis.id)
Expand Down Expand Up @@ -229,7 +227,7 @@ def archive_analysis(request, analysis_id):
logger.info("Archiving Analysis with id: %s", analysis_id)
analysis = FirmwareAnalysis.objects.get(id=analysis_id)
# check if user auth
if request.user != analysis.user and not request.user.is_superuser:
if not user_is_auth(request.user, analysis.user):
return HttpResponseForbidden("You are not authorized!")
if analysis.zip_file is None:
# make archive for uuid
Expand All @@ -252,7 +250,7 @@ def hide_analysis(request, analysis_id):
logger.info("Hiding Analysis with id: %s", analysis_id)
analysis = FirmwareAnalysis.objects.get(id=analysis_id)
# check if user auth
if request.user != analysis.user and not request.user.is_superuser:
if not user_is_auth(request.user, analysis.user):
return HttpResponseForbidden("You are not authorized!")
analysis.hidden = True
analysis.save(update_fields=["hidden"])
Expand Down Expand Up @@ -290,6 +288,10 @@ def add_label(request, analysis_id):
logger.info("User %s tryied to add label %s", request.user.username, new_label.label_name)
# get analysis obj
analysis = FirmwareAnalysis.objects.get(id=analysis_id)
# check auth
if not user_is_auth(request.user, analysis.user):
messages.error(request, 'No permissions for this analysis')
return redirect('..')
analysis.label.add(new_label)
analysis.save()
messages.info(request, 'adding successful of ' + str(new_label))
Expand All @@ -310,6 +312,10 @@ def rm_label(request, analysis_id, label_name):
analysis = FirmwareAnalysis.objects.get(id=analysis_id)
# get lobel obj
label_obj = Label.objects.get(label_name=label_name)
# check auth
if not user_is_auth(request.user, analysis.user):
messages.error(request, 'Removing Label failed, no permissions')
return redirect('..')
analysis.label.remove(label_obj)
analysis.save()
messages.info(request, 'removing successful of ' + str(label_name))
Expand Down
12 changes: 10 additions & 2 deletions embark/embark/helper.py
Original file line number Diff line number Diff line change
Expand Up @@ -144,8 +144,16 @@ def get_version_strings():
return embark_version, emba_version, stable_emba_version, container_version, nvd_version, github_emba_version


def user_is_staff(user):
return user.is_staff
def user_is_auth(req_user, own_user):
if req_user.is_superuser:
return True
elif req_user.is_staff:
return True
elif req_user.team == own_user.team:
return True
elif req_user.groups.filter(name='Administration_Group').exists() and own_user.team is None:
return True
return False


if __name__ == '__main__':
Expand Down
17 changes: 14 additions & 3 deletions embark/porter/views.py
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@
from django.shortcuts import render, redirect
from django.contrib import messages

from embark.helper import user_is_auth
from uploader.boundedexecutor import BoundedExecutor
from uploader.forms import DeviceForm, LabelForm, VendorForm
from uploader.models import FirmwareAnalysis
Expand Down Expand Up @@ -61,7 +62,8 @@ def import_read(request):
if form.is_valid():
logger.debug("Posted Form is valid")
zip_file_obj = form.cleaned_data['zip_log_file']
if zip_file_obj.user != request.user:
# check auth
if not user_is_auth(request.user, zip_file_obj.user):
logger.error("Permission denied - %s", request)
messages.error(request, "You don't have permission")
return redirect('..')
Expand Down Expand Up @@ -135,6 +137,10 @@ def import_delete(request):
logger.debug("Posted Form is valid")
zip_file = form.cleaned_data['zip_file']
logger.info("User %s tryied to delete %s", request.user.username, zip_file)
# check auth
if not user_is_auth(request.user, zip_file.user):
messages.error(request=request, message='Unauthorized')
return redirect('..')
zip_file.delete()
messages.info(request, 'delete successful.')
return redirect('..')
Expand Down Expand Up @@ -172,6 +178,10 @@ def export_analysis(request):
if form.is_valid():
logger.debug("Posted Form is valid")
analysis_obj = form.cleaned_data['analysis']
# check auth
if not user_is_auth(request.user, analysis_obj.user):
messages.error(request=request, message='Unauthorized')
return redirect('..')
response_data = result_json(analysis_obj.id)
return JsonResponse(data=response_data, status=HTTPStatus.OK)
messages.error(request=request, message='form invalid')
Expand All @@ -189,7 +199,7 @@ def make_zip(request, analysis_id):
try:
analysis = FirmwareAnalysis.objects.get(id=analysis_id)
# check that the user is authorized
if request.user == analysis.user or request.user.is_superuser:
if user_is_auth(request.user, analysis.user):
if BoundedExecutor.submit_zip(uuid=analysis_id) is not None:
# success
logger.info("Successfully submitted zip request %s", str(analysis_id))
Expand Down Expand Up @@ -217,7 +227,8 @@ def retry_import(request):
if form.is_valid():
logger.debug("Posted Form is valid")
analysis = form.cleaned_data['analysis']
if analysis.user != request.user:
# check auth
if not user_is_auth(request.user, analysis.user):
logger.error("Permission denied - %s", request)
messages.error(request, "You don't have permission")
return redirect('..')
Expand Down
13 changes: 7 additions & 6 deletions embark/reporter/views.py
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,7 @@
from django.contrib.auth.decorators import login_required, permission_required
from django.views.decorators.http import require_http_methods
from django.urls import reverse
from embark.helper import cleanup_charfield
from embark.helper import cleanup_charfield, user_is_auth
from uploader.boundedexecutor import BoundedExecutor

from uploader.models import FirmwareAnalysis, ResourceTimestamp
Expand Down Expand Up @@ -60,10 +60,11 @@ def html_report(request, analysis_id, html_file):
report_path = Path(f'{settings.EMBA_LOG_ROOT}/{analysis_id}/emba_logs/html-report/{html_file}')
if FirmwareAnalysis.objects.filter(id=analysis_id).exists() and bool(re.match(html_file_pattern, html_file)):
analysis = FirmwareAnalysis.objects.get(id=analysis_id)
if analysis.hidden is False or analysis.user == request.user or request.user.is_superuser:
html_body = get_template(report_path)
logger.debug("html_report - analysis_id: %s html_file: %s", analysis_id, html_file)
return HttpResponse(html_body.render({'embarkBackUrl': reverse('embark-ReportDashboard')}))
if user_is_auth(request.user, analysis.user):
if (analysis.hidden and analysis.user == request.user) or request.user.is_superuser:
html_body = get_template(report_path)
logger.debug("html_report - analysis_id: %s html_file: %s", analysis_id, html_file)
return HttpResponse(html_body.render({'embarkBackUrl': reverse('embark-ReportDashboard')}))
messages.error(request, "User not authorized")
logger.error("could not get template - %s", request)
return redirect("..")
Expand All @@ -81,7 +82,7 @@ def html_report_path(request, analysis_id, html_path, file):
file_pattern = re.compile(r'^[\w\.-]+\.(tar.gz|html)$')
if FirmwareAnalysis.objects.filter(id=analysis_id).exists() and bool(re.match(file_pattern, file)):
analysis = FirmwareAnalysis.objects.get(id=analysis_id)
if analysis.hidden is False or analysis.user == request.user or request.user.is_superuser:
if analysis.user == request.user or (not analysis.hidden and user_is_auth(request.user, analysis.user)):
resource_path = f'{settings.EMBA_LOG_ROOT}/{analysis_id}/emba_logs/html-report/{html_path}/{file}'
parent_path = os.path.abspath(f'{settings.EMBA_LOG_ROOT}/{analysis_id}/emba_logs/html-report/')
if os.path.commonpath([parent_path, resource_path]) == parent_path:
Expand Down
2 changes: 2 additions & 0 deletions embark/templates/dashboard/reportDashboard.html
Original file line number Diff line number Diff line change
Expand Up @@ -118,10 +118,12 @@
<form action={% url 'embark-html-report-index' firmware.id 'index.html' %} method='get'>
<button type="submit" class="btn buttonRowElem" >Open Report</button>
</form>

<form action={% url 'embark-IndividualReportDashboard' firmware.id %} method='get'>
<button type="submit" class="btn buttonRowElem" >Detail View</button>
</form>
<form action={% url 'embark-uploader-home' %} method='get'>
<!--We should try and pre-select the original scan settings-->
<button type="submit" class="btn buttonRowElem" >Rescan/delete</button>
</form>
{% if firmware.archived is False %}
Expand Down
8 changes: 6 additions & 2 deletions embark/uploader/boundedexecutor.py
Original file line number Diff line number Diff line change
Expand Up @@ -103,17 +103,21 @@ def run_emba_cmd(cls, cmd, analysis_id=None, active_analyzer_dir=None):
# get csv log location
csv_log_location = f"{settings.EMBA_LOG_ROOT}/{analysis_id}/emba_logs/csv_logs/f50_base_aggregator.csv"
sbom_log_location = f"{settings.EMBA_LOG_ROOT}/{analysis_id}/emba_logs/SBOM/EMBA_cyclonedx_sbom.json"
error_log_location = f"{settings.EMBA_LOG_ROOT}/{analysis_id}/emba_logs/emba_error.log"

# read f50_aggregator and store it into a Result form
logger.info('Reading report from: %s', csv_log_location)
logger.debug("contents of that dir are %r", Path(csv_log_location).exists())
# if Path(csv_log_location).exists:
if Path(csv_log_location).is_file() or Path(sbom_log_location).is_file():
cls.csv_read(analysis_id=analysis_id, _path=csv_log_location, _cmd=cmd)
else:
elif Path(error_log_location).is_file():
logger.error("No importable log file %s for report: %s generated", csv_log_location, analysis_id)
logger.error("EMBA run was probably not successful!")
logger.error("EMBA run was not successful!")
exit_fail = True
else:
logger.error("EMBA run was probably not successful!")
logger.error("Please check this manually and create a bug report!!")

# take care of cleanup
if active_analyzer_dir:
Expand Down

0 comments on commit 5c2eee7

Please sign in to comment.