From 947d61ff58157fc9d03af8d6b6111c086d5fd8c8 Mon Sep 17 00:00:00 2001 From: Katrina Rogan Date: Fri, 25 Jun 2021 09:02:03 -0700 Subject: [PATCH] Store task execution event reason & task type (#211) --- flyteadmin/pkg/repositories/transformers/task_execution.go | 5 +++++ .../pkg/repositories/transformers/task_execution_test.go | 6 ++++++ 2 files changed, 11 insertions(+) diff --git a/flyteadmin/pkg/repositories/transformers/task_execution.go b/flyteadmin/pkg/repositories/transformers/task_execution.go index 3e91f5fec7..be3a2ef3e8 100644 --- a/flyteadmin/pkg/repositories/transformers/task_execution.go +++ b/flyteadmin/pkg/repositories/transformers/task_execution.go @@ -100,6 +100,8 @@ func CreateTaskExecutionModel(input CreateTaskExecutionModelInput) (*models.Task CreatedAt: input.Request.Event.OccurredAt, Logs: input.Request.Event.Logs, CustomInfo: input.Request.Event.CustomInfo, + Reason: input.Request.Event.Reason, + TaskType: input.Request.Event.TaskType, } eventPhase := input.Request.Event.Phase @@ -209,6 +211,9 @@ func UpdateTaskExecutionModel(request *admin.TaskExecutionEventRequest, taskExec taskExecutionClosure.Phase = request.Event.Phase taskExecutionClosure.UpdatedAt = request.Event.OccurredAt taskExecutionClosure.Logs = mergeLogs(taskExecutionClosure.Logs, request.Event.Logs) + if len(request.Event.Reason) > 0 { + taskExecutionClosure.Reason = request.Event.Reason + } if (existingTaskPhase == core.TaskExecution_QUEUED.String() || existingTaskPhase == core.TaskExecution_UNDEFINED.String()) && taskExecutionModel.Phase == core.TaskExecution_RUNNING.String() { err = addTaskStartedState(request, taskExecutionModel, &taskExecutionClosure) if err != nil { diff --git a/flyteadmin/pkg/repositories/transformers/task_execution_test.go b/flyteadmin/pkg/repositories/transformers/task_execution_test.go index 70f31a34e7..ee9ef01644 100644 --- a/flyteadmin/pkg/repositories/transformers/task_execution_test.go +++ b/flyteadmin/pkg/repositories/transformers/task_execution_test.go @@ -151,6 +151,8 @@ func TestCreateTaskExecutionModelQueued(t *testing.T) { RetryAttempt: 1, InputUri: "input uri", OccurredAt: taskEventOccurredAtProto, + Reason: "Task was scheduled", + TaskType: "sidecar", }, }, }) @@ -161,6 +163,8 @@ func TestCreateTaskExecutionModelQueued(t *testing.T) { StartedAt: nil, CreatedAt: taskEventOccurredAtProto, UpdatedAt: taskEventOccurredAtProto, + Reason: "Task was scheduled", + TaskType: "sidecar", } expectedClosureBytes, err := proto.Marshal(expectedClosure) @@ -350,6 +354,7 @@ func TestUpdateTaskExecutionModelRunningToFailed(t *testing.T) { CustomInfo: transformMapToStructPB(t, map[string]string{ "key1": "value1 updated", }), + Reason: "task failed", }, } @@ -379,6 +384,7 @@ func TestUpdateTaskExecutionModelRunningToFailed(t *testing.T) { CustomInfo: transformMapToStructPB(t, map[string]string{ "key1": "value1 updated", }), + Reason: "task failed", } expectedClosureBytes, err := proto.Marshal(expectedClosure)