Skip to content

Commit

Permalink
feat: API to list executions and filter by entity type (#1103)
Browse files Browse the repository at this point in the history
* feat: API to list executions and filter by entity and date range with pagination, ordering

* Moved execution app into workflow_manager, added API to list file executions for an execution with latest logs, schema migration for indexes on some columns

---------

Co-authored-by: Gayathri <142381512+gaya3-zipstack@users.noreply.github.com>
  • Loading branch information
chandrasekharan-zipstack and gaya3-zipstack authored Feb 10, 2025
1 parent b7b81e3 commit cad3bd5
Show file tree
Hide file tree
Showing 22 changed files with 281 additions and 13 deletions.
1 change: 1 addition & 0 deletions backend/backend/settings/base.py
Original file line number Diff line number Diff line change
Expand Up @@ -236,6 +236,7 @@ def get_required_setting(
"workflow_manager.file_execution",
"workflow_manager.endpoint_v2",
"workflow_manager.workflow_v2",
"workflow_manager.execution",
"tool_instance_v2",
"pipeline_v2",
"platform_settings_v2",
Expand Down
1 change: 1 addition & 0 deletions backend/backend/urls_v2.py
Original file line number Diff line number Diff line change
Expand Up @@ -59,4 +59,5 @@
include("prompt_studio.prompt_studio_index_manager_v2.urls"),
),
path("tags/", include("tags.urls")),
path("execution/", include("workflow_manager.execution.urls")),
]
8 changes: 3 additions & 5 deletions backend/pipeline_v2/execution_view.py
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
from permissions.permission import IsOwner
from pipeline_v2.serializers.execute import DateRangeSerializer
from rest_framework import viewsets
from rest_framework.versioning import URLPathVersioning
from utils.date import DateRangeKeys, DateRangeSerializer
from utils.pagination import CustomPagination
from workflow_manager.workflow_v2.models.execution import WorkflowExecution
from workflow_manager.workflow_v2.serializers import WorkflowExecutionSerializer
Expand All @@ -14,8 +14,6 @@ class PipelineExecutionViewSet(viewsets.ModelViewSet):
pagination_class = CustomPagination

CREATED_AT_FIELD_DESC = "-created_at"
START_DATE_FIELD = "start_date"
END_DATE_FIELD = "end_date"

def get_queryset(self):
# Get the pipeline_id from the URL path
Expand All @@ -25,8 +23,8 @@ def get_queryset(self):
# Validate start_date and end_date parameters using DateRangeSerializer
date_range_serializer = DateRangeSerializer(data=self.request.query_params)
date_range_serializer.is_valid(raise_exception=True)
start_date = date_range_serializer.validated_data.get(self.START_DATE_FIELD)
end_date = date_range_serializer.validated_data.get(self.END_DATE_FIELD)
start_date = date_range_serializer.validated_data.get(DateRangeKeys.START_DATE)
end_date = date_range_serializer.validated_data.get(DateRangeKeys.END_DATE)

if start_date and end_date:
queryset = queryset.filter(created_at__range=(start_date, end_date))
Expand Down
5 changes: 0 additions & 5 deletions backend/pipeline_v2/serializers/execute.py
Original file line number Diff line number Diff line change
Expand Up @@ -17,8 +17,3 @@ def validate_pipeline_id(self, value: str) -> str:
except Pipeline.DoesNotExist:
raise serializers.ValidationError("Invalid pipeline ID")
return value


class DateRangeSerializer(serializers.Serializer):
start_date = serializers.DateTimeField(required=False)
end_date = serializers.DateTimeField(required=False)
1 change: 0 additions & 1 deletion backend/usage_v2/utils.py
Original file line number Diff line number Diff line change
Expand Up @@ -116,7 +116,6 @@ def filter_date_range(cls, value: str) -> Optional[DateRange]:
if not preset:
return None
start_date, end_date = preset.get_date_range()
print(start_date, end_date)
return cls._validate_date_range(start_date=start_date, end_date=end_date)

@classmethod
Expand Down
2 changes: 2 additions & 0 deletions backend/utils/date/__init__.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,2 @@
from .constants import DateRangeKeys # noqa: F401
from .serializer import DateRangeSerializer # noqa: F401
3 changes: 3 additions & 0 deletions backend/utils/date/constants.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,3 @@
class DateRangeKeys:
START_DATE = "start_date"
END_DATE = "end_date"
6 changes: 6 additions & 0 deletions backend/utils/date/serializer.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,6 @@
from rest_framework import serializers


class DateRangeSerializer(serializers.Serializer):
start_date = serializers.DateTimeField(required=False)
end_date = serializers.DateTimeField(required=False)
Empty file.
6 changes: 6 additions & 0 deletions backend/workflow_manager/execution/apps.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,6 @@
from django.apps import AppConfig


class ExecutionConfig(AppConfig):
default_auto_field = "django.db.models.BigAutoField"
name = "workflow_manager.execution"
8 changes: 8 additions & 0 deletions backend/workflow_manager/execution/enum.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,8 @@
from enum import Enum


class ExecutionEntity(Enum):
ETL = "ETL"
API = "API"
TASK = "TASK"
WORKFLOW = "WF"
2 changes: 2 additions & 0 deletions backend/workflow_manager/execution/serializer/__init__.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,2 @@
from .execution import ExecutionSerializer # noqa: F401
from .file_centric import FileCentricExecutionSerializer # noqa: F401
22 changes: 22 additions & 0 deletions backend/workflow_manager/execution/serializer/execution.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,22 @@
from typing import Optional

from rest_framework import serializers
from workflow_manager.workflow_v2.models import WorkflowExecution


# TODO: Optimize with select_related / prefetch_related to reduce DB queries
class ExecutionSerializer(serializers.ModelSerializer):
workflow_name = serializers.SerializerMethodField()
pipeline_name = serializers.SerializerMethodField()

class Meta:
model = WorkflowExecution
exclude = ["task_id", "execution_log_id", "execution_type"]

def get_workflow_name(self, obj: WorkflowExecution) -> Optional[str]:
"""Fetch the workflow name using workflow_id"""
return obj.workflow_name

def get_pipeline_name(self, obj: WorkflowExecution) -> Optional[str]:
"""Fetch the pipeline or API deployment name"""
return obj.pipeline_name
22 changes: 22 additions & 0 deletions backend/workflow_manager/execution/serializer/file_centric.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,22 @@
from typing import Optional

from rest_framework import serializers
from workflow_manager.file_execution.models import (
WorkflowFileExecution as FileExecution,
)


class FileCentricExecutionSerializer(serializers.ModelSerializer):
latest_log = serializers.SerializerMethodField()

class Meta:
model = FileExecution
exclude = ["file_hash"]

def get_latest_log(self, obj: FileExecution) -> Optional[dict[str, any]]:
latest_log = (
obj.execution_logs.exclude(data__log_level__in=["DEBUG", "WARN"])
.order_by("-event_time")
.first()
)
return latest_log.data if latest_log else None
25 changes: 25 additions & 0 deletions backend/workflow_manager/execution/urls.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,25 @@
from django.urls import path
from rest_framework.urlpatterns import format_suffix_patterns
from workflow_manager.execution.views import (
ExecutionViewSet,
FileCentricExecutionViewSet,
)
from workflow_manager.workflow_v2.execution_log_view import (
WorkflowExecutionLogViewSet as ExecutionLogViewSet,
)

execution_list = ExecutionViewSet.as_view(
{
"get": "list",
}
)
file_centric_list = FileCentricExecutionViewSet.as_view({"get": "list"})
execution_log_list = ExecutionLogViewSet.as_view({"get": "list"})

urlpatterns = format_suffix_patterns(
[
path("", execution_list, name="execution-list"),
path("<uuid:pk>/files/", file_centric_list, name="file-centric-execution-list"),
path("<uuid:pk>/logs/", execution_log_list, name="execution-log"),
]
)
2 changes: 2 additions & 0 deletions backend/workflow_manager/execution/views/__init__.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,2 @@
from .execution import ExecutionViewSet # noqa: F401
from .file_centric import FileCentricExecutionViewSet # noqa: F401
69 changes: 69 additions & 0 deletions backend/workflow_manager/execution/views/execution.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,69 @@
import logging
from typing import Optional

from api_v2.models import APIDeployment
from django.db.models import Q, QuerySet
from pipeline_v2.models import Pipeline
from rest_framework import viewsets
from rest_framework.filters import OrderingFilter
from rest_framework.permissions import IsAuthenticated
from utils.date import DateRangeKeys, DateRangeSerializer
from utils.pagination import CustomPagination
from workflow_manager.execution.enum import ExecutionEntity
from workflow_manager.execution.serializer import ExecutionSerializer
from workflow_manager.workflow_v2.models import Workflow, WorkflowExecution

logger = logging.getLogger(__name__)


class ExecutionViewSet(viewsets.ReadOnlyModelViewSet):
permission_classes = [IsAuthenticated]
serializer_class = ExecutionSerializer
pagination_class = CustomPagination
filter_backends = [OrderingFilter]
ordering_fields = ["created_at"]
ordering = ["-created_at"]

def get_queryset(self) -> Optional[QuerySet]:
execution_entity = self.request.query_params.get("execution_entity")

queryset = WorkflowExecution.objects.all()

# Filter based on execution entity
if execution_entity == ExecutionEntity.API:
queryset = queryset.filter(
pipeline_id__in=APIDeployment.objects.values_list("id", flat=True)
)
elif execution_entity == ExecutionEntity.ETL:
queryset = queryset.filter(
pipeline_id__in=Pipeline.objects.filter(
pipeline_type=Pipeline.PipelineType.ETL
).values_list("id", flat=True)
)
elif execution_entity == ExecutionEntity.TASK:
queryset = queryset.filter(
pipeline_id__in=Pipeline.objects.filter(
pipeline_type=Pipeline.PipelineType.TASK
).values_list("id", flat=True)
)
elif execution_entity == ExecutionEntity.WORKFLOW:
queryset = queryset.filter(
pipeline_id=None,
workflow_id__in=Workflow.objects.values_list("id", flat=True),
)

# Parse and apply date filters
date_range_serializer = DateRangeSerializer(data=self.request.query_params)
date_range_serializer.is_valid(raise_exception=True)

filters = Q()
if start_date := date_range_serializer.validated_data.get(
DateRangeKeys.START_DATE
):
filters &= Q(created_at__gte=start_date)
if end_date := date_range_serializer.validated_data.get(DateRangeKeys.END_DATE):
filters &= Q(created_at__lte=end_date)

queryset = queryset.filter(filters)

return queryset
25 changes: 25 additions & 0 deletions backend/workflow_manager/execution/views/file_centric.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,25 @@
import logging

from rest_framework import viewsets
from rest_framework.filters import OrderingFilter
from rest_framework.permissions import IsAuthenticated
from utils.pagination import CustomPagination
from workflow_manager.execution.serializer import FileCentricExecutionSerializer
from workflow_manager.file_execution.models import (
WorkflowFileExecution as FileExecution,
)

logger = logging.getLogger(__name__)


class FileCentricExecutionViewSet(viewsets.ReadOnlyModelViewSet):
permission_classes = [IsAuthenticated]
serializer_class = FileCentricExecutionSerializer
pagination_class = CustomPagination
filter_backends = [OrderingFilter]
ordering_fields = ["created_at"]
ordering = ["created_at"]

def get_queryset(self):
execution_id = self.kwargs.get("pk")
return FileExecution.objects.filter(workflow_execution_id=execution_id)
Original file line number Diff line number Diff line change
@@ -0,0 +1,27 @@
# Generated by Django 4.2.1 on 2025-02-04 04:12

from django.db import migrations, models


class Migration(migrations.Migration):

dependencies = [
("workflow_v2", "0005_workflowexecution_tags"),
]

operations = [
migrations.AddIndex(
model_name="workflowexecution",
index=models.Index(
fields=["workflow_id", "-created_at"],
name="workflow_ex_workflo_5942c9_idx",
),
),
migrations.AddIndex(
model_name="workflowexecution",
index=models.Index(
fields=["pipeline_id", "-created_at"],
name="workflow_ex_pipelin_126dbf_idx",
),
),
]
7 changes: 5 additions & 2 deletions backend/workflow_manager/workflow_v2/models/__init__.py
Original file line number Diff line number Diff line change
@@ -1,4 +1,7 @@
from .execution import WorkflowExecution # noqa: F401
# isort:skip_file

# Do not change the order of the imports below to avoid circular dependency issues
from .workflow import Workflow # noqa: F401
from .execution_log import ExecutionLog # noqa: F401
from .execution import WorkflowExecution # noqa: F401
from .file_history import FileHistory # noqa: F401
from .workflow import Workflow # noqa: F401
45 changes: 45 additions & 0 deletions backend/workflow_manager/workflow_v2/models/execution.py
Original file line number Diff line number Diff line change
@@ -1,8 +1,17 @@
import logging
import uuid
from typing import Optional

from api_v2.models import APIDeployment
from django.core.exceptions import ObjectDoesNotExist
from django.db import models
from pipeline_v2.models import Pipeline
from tags.models import Tag
from utils.models.base_model import BaseModel
from workflow_manager.workflow_v2.models import Workflow

logger = logging.getLogger(__name__)


EXECUTION_ERROR_LENGTH = 256

Expand Down Expand Up @@ -32,6 +41,7 @@ class Type(models.TextChoices):
null=True,
db_comment="task id of asynchronous execution",
)
# TODO: Make as foreign key to access the instance directly
workflow_id = models.UUIDField(
editable=False, db_comment="Id of workflow to be executed"
)
Expand Down Expand Up @@ -68,12 +78,47 @@ class Meta:
verbose_name = "Workflow Execution"
verbose_name_plural = "Workflow Executions"
db_table = "workflow_execution"
indexes = [
models.Index(fields=["workflow_id", "-created_at"]),
models.Index(fields=["pipeline_id", "-created_at"]),
]

@property
def tag_names(self) -> list[str]:
"""Return a list of tag names associated with the workflow execution."""
return list(self.tags.values_list("name", flat=True))

@property
def workflow_name(self) -> Optional[str]:
"""Obtains the workflow's name associated to this execution."""
try:
return Workflow.objects.get(id=self.workflow_id).workflow_name
except ObjectDoesNotExist:
logger.warning(
f"Expected workflow ID '{self.workflow_id}' to exist but missing"
)
return None

@property
def pipeline_name(self) -> Optional[str]:
"""Obtains the pipeline's name associated to this execution.
It could be ETL / TASK / API pipeline, None returned if there's no such pipeline
"""
if not self.pipeline_id:
return None

try:
return APIDeployment.objects.get(id=self.pipeline_id).display_name
except ObjectDoesNotExist:
pass

try:
return Pipeline.objects.get(id=self.pipeline_id).pipeline_name
except ObjectDoesNotExist:
pass

return None

def __str__(self) -> str:
return (
f"Workflow execution: {self.id} ("
Expand Down
7 changes: 7 additions & 0 deletions backend/workflow_manager/workflow_v2/workflow_helper.py
Original file line number Diff line number Diff line change
Expand Up @@ -259,6 +259,11 @@ def _process_file(
except Exception as e:
error = f"Error processing file '{os.path.basename(input_file)}'. {str(e)}"
execution_service.publish_log(error, level=LogLevel.ERROR)
workflow_file_execution.update_status(
status=ExecutionStatus.ERROR,
execution_error=error,
)
# Not propagating error here to continue execution for other files
execution_service.publish_update_log(
LogState.RUNNING,
f"Processing output for {file_name}",
Expand Down Expand Up @@ -369,6 +374,8 @@ def run_workflow(
)
raise
finally:
# TODO: Handle error gracefully during delete
# Mark status as an ERROR correctly
destination.delete_execution_directory()

@staticmethod
Expand Down

0 comments on commit cad3bd5

Please sign in to comment.