diff --git a/flytepropeller/pkg/controller/nodes/array/event_recorder.go b/flytepropeller/pkg/controller/nodes/array/event_recorder.go index c4e3004011..c2e0c96ed8 100644 --- a/flytepropeller/pkg/controller/nodes/array/event_recorder.go +++ b/flytepropeller/pkg/controller/nodes/array/event_recorder.go @@ -3,21 +3,24 @@ package array import ( "context" "fmt" + "strconv" "time" "github.com/golang/protobuf/ptypes" idlcore "github.com/flyteorg/flyte/flyteidl/gen/pb-go/flyteidl/core" "github.com/flyteorg/flyte/flyteidl/gen/pb-go/flyteidl/event" + "github.com/flyteorg/flyte/flyteplugins/go/tasks/pluginmachinery/encoding" "github.com/flyteorg/flyte/flytepropeller/pkg/apis/flyteworkflow/v1alpha1" "github.com/flyteorg/flyte/flytepropeller/pkg/controller/config" "github.com/flyteorg/flyte/flytepropeller/pkg/controller/nodes/common" "github.com/flyteorg/flyte/flytepropeller/pkg/controller/nodes/interfaces" + "github.com/flyteorg/flyte/flytepropeller/pkg/controller/nodes/task" ) type arrayEventRecorder interface { interfaces.EventRecorder - process(ctx context.Context, nCtx interfaces.NodeExecutionContext, index int, retryAttempt uint32) + process(ctx context.Context, nCtx interfaces.NodeExecutionContext, index int, retryAttempt uint32) error finalize(ctx context.Context, nCtx interfaces.NodeExecutionContext, taskPhase idlcore.TaskExecution_Phase, taskPhaseVersion uint32, eventConfig *config.EventConfig) error finalizeRequired(ctx context.Context) bool } @@ -39,8 +42,23 @@ func (e *externalResourcesEventRecorder) RecordTaskEvent(ctx context.Context, ev return nil } -func (e *externalResourcesEventRecorder) process(ctx context.Context, nCtx interfaces.NodeExecutionContext, index int, retryAttempt uint32) { - externalResourceID := fmt.Sprintf("%s-%d", buildSubNodeID(nCtx, index), retryAttempt) +func (e *externalResourcesEventRecorder) process(ctx context.Context, nCtx interfaces.NodeExecutionContext, index int, retryAttempt uint32) error { + // generate externalResourceID + currentNodeUniqueID := nCtx.NodeID() + if nCtx.ExecutionContext().GetEventVersion() != v1alpha1.EventVersion0 { + var err error + currentNodeUniqueID, err = common.GenerateUniqueID(nCtx.ExecutionContext().GetParentInfo(), nCtx.NodeID()) + if err != nil { + return err + } + } + + uniqueID, err := encoding.FixedLengthUniqueIDForParts(task.IDMaxLength, []string{nCtx.NodeExecutionMetadata().GetOwnerID().Name, currentNodeUniqueID, strconv.Itoa(int(retryAttempt))}) + if err != nil { + return err + } + + externalResourceID := fmt.Sprintf("%s-n%d-%d", uniqueID, index, retryAttempt) // process events cacheStatus := idlcore.CatalogCacheStatus_CACHE_DISABLED @@ -83,6 +101,8 @@ func (e *externalResourcesEventRecorder) process(ctx context.Context, nCtx inter // clear nodeEvents and taskEvents e.nodeEvents = e.nodeEvents[:0] e.taskEvents = e.taskEvents[:0] + + return nil } func (e *externalResourcesEventRecorder) finalize(ctx context.Context, nCtx interfaces.NodeExecutionContext, @@ -94,6 +114,17 @@ func (e *externalResourcesEventRecorder) finalize(ctx context.Context, nCtx inte return err } + var taskID *idlcore.Identifier + subNode := nCtx.Node().GetArrayNode().GetSubNodeSpec() + if subNode != nil && subNode.Kind == v1alpha1.NodeKindTask { + executableTask, err := nCtx.ExecutionContext().GetTask(*subNode.GetTaskID()) + if err != nil { + return err + } + + taskID = executableTask.CoreTask().GetId() + } + nodeExecutionID := *nCtx.NodeExecutionMetadata().GetNodeExecutionID() if nCtx.ExecutionContext().GetEventVersion() != v1alpha1.EventVersion0 { currentNodeUniqueID, err := common.GenerateUniqueID(nCtx.ExecutionContext().GetParentInfo(), nodeExecutionID.NodeId) @@ -103,16 +134,8 @@ func (e *externalResourcesEventRecorder) finalize(ctx context.Context, nCtx inte nodeExecutionID.NodeId = currentNodeUniqueID } - workflowExecutionID := nodeExecutionID.ExecutionId - taskExecutionEvent := &event.TaskExecutionEvent{ - TaskId: &idlcore.Identifier{ - ResourceType: idlcore.ResourceType_TASK, - Project: workflowExecutionID.Project, - Domain: workflowExecutionID.Domain, - Name: nCtx.NodeID(), - Version: "v1", // this value is irrelevant but necessary for the identifier to be valid - }, + TaskId: taskID, ParentNodeExecutionId: &nodeExecutionID, RetryAttempt: 0, // ArrayNode will never retry Phase: taskPhase, @@ -120,9 +143,9 @@ func (e *externalResourcesEventRecorder) finalize(ctx context.Context, nCtx inte OccurredAt: occurredAt, Metadata: &event.TaskExecutionMetadata{ ExternalResources: e.externalResources, - PluginIdentifier: "container", + PluginIdentifier: "k8s-array", }, - TaskType: "k8s-array", + TaskType: "container_array", EventVersion: 1, } @@ -165,7 +188,8 @@ type passThroughEventRecorder struct { interfaces.EventRecorder } -func (*passThroughEventRecorder) process(ctx context.Context, nCtx interfaces.NodeExecutionContext, index int, retryAttempt uint32) { +func (*passThroughEventRecorder) process(ctx context.Context, nCtx interfaces.NodeExecutionContext, index int, retryAttempt uint32) error { + return nil } func (*passThroughEventRecorder) finalize(ctx context.Context, nCtx interfaces.NodeExecutionContext, diff --git a/flytepropeller/pkg/controller/nodes/array/handler.go b/flytepropeller/pkg/controller/nodes/array/handler.go index 06a693334e..72b2c511eb 100644 --- a/flytepropeller/pkg/controller/nodes/array/handler.go +++ b/flytepropeller/pkg/controller/nodes/array/handler.go @@ -98,7 +98,9 @@ func (a *arrayNodeHandler) Abort(ctx context.Context, nCtx interfaces.NodeExecut logger.Warnf(ctx, "failed to record ArrayNode events: %v", err) } - eventRecorder.process(ctx, nCtx, i, retryAttempt) + if err := eventRecorder.process(ctx, nCtx, i, retryAttempt); err != nil { + logger.Warnf(ctx, "failed to record ArrayNode events: %v", err) + } } } } @@ -241,7 +243,9 @@ func (a *arrayNodeHandler) Handle(ctx context.Context, nCtx interfaces.NodeExecu logger.Warnf(ctx, "failed to record ArrayNode events: %v", err) } - eventRecorder.process(ctx, nCtx, i, 0) + if err := eventRecorder.process(ctx, nCtx, i, 0); err != nil { + logger.Warnf(ctx, "failed to record ArrayNode events: %v", err) + } } // transition ArrayNode to `ArrayNodePhaseExecuting` @@ -331,7 +335,9 @@ func (a *arrayNodeHandler) Handle(ctx context.Context, nCtx interfaces.NodeExecu } } } - eventRecorder.process(ctx, nCtx, index, subNodeStatus.GetAttempts()) + if err := eventRecorder.process(ctx, nCtx, index, subNodeStatus.GetAttempts()); err != nil { + return handler.UnknownTransition, err + } // update subNode state arrayNodeState.SubNodePhases.SetItem(index, uint64(subNodeStatus.GetPhase())) diff --git a/flytepropeller/pkg/controller/nodes/array/handler_test.go b/flytepropeller/pkg/controller/nodes/array/handler_test.go index b0328250ab..f4107c1f11 100644 --- a/flytepropeller/pkg/controller/nodes/array/handler_test.go +++ b/flytepropeller/pkg/controller/nodes/array/handler_test.go @@ -7,6 +7,7 @@ import ( "github.com/stretchr/testify/assert" "github.com/stretchr/testify/mock" + "k8s.io/apimachinery/pkg/types" idlcore "github.com/flyteorg/flyte/flyteidl/gen/pb-go/flyteidl/core" "github.com/flyteorg/flyte/flyteidl/gen/pb-go/flyteidl/event" @@ -145,6 +146,10 @@ func createNodeExecutionContext(dataStore *storage.DataStore, eventRecorder inte Name: "name", }, }) + nodeExecutionMetadata.OnGetOwnerID().Return(types.NamespacedName{ + Namespace: "wf-namespace", + Name: "wf-name", + }) nCtx.OnNodeExecutionMetadata().Return(nodeExecutionMetadata) // NodeID diff --git a/flytepropeller/pkg/controller/nodes/array/mocks/array_event_recorder.go b/flytepropeller/pkg/controller/nodes/array/mocks/array_event_recorder.go index b51ba86931..7e235daa25 100644 --- a/flytepropeller/pkg/controller/nodes/array/mocks/array_event_recorder.go +++ b/flytepropeller/pkg/controller/nodes/array/mocks/array_event_recorder.go @@ -149,7 +149,34 @@ func (_m *arrayEventRecorder) finalizeRequired(ctx context.Context) bool { return r0 } +type arrayEventRecorder_process struct { + *mock.Call +} + +func (_m arrayEventRecorder_process) Return(_a0 error) *arrayEventRecorder_process { + return &arrayEventRecorder_process{Call: _m.Call.Return(_a0)} +} + +func (_m *arrayEventRecorder) Onprocess(ctx context.Context, nCtx interfaces.NodeExecutionContext, index int, retryAttempt uint32) *arrayEventRecorder_process { + c_call := _m.On("process", ctx, nCtx, index, retryAttempt) + return &arrayEventRecorder_process{Call: c_call} +} + +func (_m *arrayEventRecorder) OnprocessMatch(matchers ...interface{}) *arrayEventRecorder_process { + c_call := _m.On("process", matchers...) + return &arrayEventRecorder_process{Call: c_call} +} + // process provides a mock function with given fields: ctx, nCtx, index, retryAttempt -func (_m *arrayEventRecorder) process(ctx context.Context, nCtx interfaces.NodeExecutionContext, index int, retryAttempt uint32) { - _m.Called(ctx, nCtx, index, retryAttempt) +func (_m *arrayEventRecorder) process(ctx context.Context, nCtx interfaces.NodeExecutionContext, index int, retryAttempt uint32) error { + ret := _m.Called(ctx, nCtx, index, retryAttempt) + + var r0 error + if rf, ok := ret.Get(0).(func(context.Context, interfaces.NodeExecutionContext, int, uint32) error); ok { + r0 = rf(ctx, nCtx, index, retryAttempt) + } else { + r0 = ret.Error(0) + } + + return r0 }