From 8059d6df1a50edf93d1c4066da658951eee829cb Mon Sep 17 00:00:00 2001 From: Katrina Rogan Date: Tue, 29 Sep 2020 15:15:59 -0700 Subject: [PATCH] Add TaskExecutionMetadata to TaskExecutionEvent (#190) --- go.mod | 2 +- go.sum | 10 ++-------- pkg/controller/nodes/task/handler.go | 9 +++++---- pkg/controller/nodes/task/handler_test.go | 3 +++ pkg/controller/nodes/task/transformer.go | 9 ++++++++- pkg/controller/nodes/task/transformer_test.go | 17 ++++++++++++++--- 6 files changed, 33 insertions(+), 17 deletions(-) diff --git a/go.mod b/go.mod index cb9d983d55..d427cc443f 100644 --- a/go.mod +++ b/go.mod @@ -22,7 +22,7 @@ require ( github.com/grpc-ecosystem/grpc-gateway v1.14.3 // indirect github.com/imdario/mergo v0.3.8 // indirect github.com/lyft/datacatalog v0.2.1 - github.com/lyft/flyteidl v0.18.6 + github.com/lyft/flyteidl v0.18.7 github.com/lyft/flyteplugins v0.5.1 github.com/lyft/flytestdlib v0.3.9 github.com/magiconair/properties v1.8.1 diff --git a/go.sum b/go.sum index c3f88fa08a..48950afd7a 100644 --- a/go.sum +++ b/go.sum @@ -397,14 +397,8 @@ github.com/lyft/datacatalog v0.2.1 h1:W7LsAjaS297iLCtSH9ZaAAG3YPofwkbbgIaqkfdeM0 github.com/lyft/datacatalog v0.2.1/go.mod h1:ktrPvzTDUwHO5Lv0hLH38zLHnOJ++rGoAO0iQ/sIPJ4= github.com/lyft/flyteidl v0.17.0/go.mod h1:/zQXxuHO11u/saxTTZc8oYExIGEShXB+xCB1/F1Cu20= github.com/lyft/flyteidl v0.18.0/go.mod h1:/zQXxuHO11u/saxTTZc8oYExIGEShXB+xCB1/F1Cu20= -github.com/lyft/flyteidl v0.18.1 h1:COKkZi5k6bQvUYOk5gE70+FJX9/NUn0WOQ1uMrw3Qio= -github.com/lyft/flyteidl v0.18.1/go.mod h1:/zQXxuHO11u/saxTTZc8oYExIGEShXB+xCB1/F1Cu20= -github.com/lyft/flyteidl v0.18.6 h1:HGbxHI8avEDvoPqcO2+/BoJVcP9sjOj4qwJ/wNRWuoA= -github.com/lyft/flyteidl v0.18.6/go.mod h1:/zQXxuHO11u/saxTTZc8oYExIGEShXB+xCB1/F1Cu20= -github.com/lyft/flyteplugins v0.4.7-0.20200904001213-41861229003a h1:NcURTOidN/PUOMRSKvaKMJmSdGcdLJHPwZFmpSX+KVk= -github.com/lyft/flyteplugins v0.4.7-0.20200904001213-41861229003a/go.mod h1:8zhqFG9BzbHNQGEXzGYltTJLD+KTmQZkanxXgeFI25c= -github.com/lyft/flyteplugins v0.4.8 h1:fGihSTfOw1LEGuwh7QfRp3l2dT28DfGFAu2d0axr8Q4= -github.com/lyft/flyteplugins v0.4.8/go.mod h1:8zhqFG9BzbHNQGEXzGYltTJLD+KTmQZkanxXgeFI25c= +github.com/lyft/flyteidl v0.18.7 h1:R8gSt2Tze9BlHbFHZPDPWl630272US+MbSjqoeVkflg= +github.com/lyft/flyteidl v0.18.7/go.mod h1:/zQXxuHO11u/saxTTZc8oYExIGEShXB+xCB1/F1Cu20= github.com/lyft/flyteplugins v0.5.1 h1:76FpQFohLCy4Eo490sES2empRAi31DJiERfbOSV9pCg= github.com/lyft/flyteplugins v0.5.1/go.mod h1:8zhqFG9BzbHNQGEXzGYltTJLD+KTmQZkanxXgeFI25c= github.com/lyft/flytestdlib v0.3.0/go.mod h1:LJPPJlkFj+wwVWMrQT3K5JZgNhZi2mULsCG4ZYhinhU= diff --git a/pkg/controller/nodes/task/handler.go b/pkg/controller/nodes/task/handler.go index b25484adc1..b92dc30396 100644 --- a/pkg/controller/nodes/task/handler.go +++ b/pkg/controller/nodes/task/handler.go @@ -112,12 +112,13 @@ func (p *pluginRequestedTransition) TransitionPreviouslyRecorded() { p.previouslyObserved = true } -func (p *pluginRequestedTransition) FinalTaskEvent(id *core.TaskExecutionIdentifier, in io.InputFilePaths, out io.OutputFilePaths) (*event.TaskExecutionEvent, error) { +func (p *pluginRequestedTransition) FinalTaskEvent(id *core.TaskExecutionIdentifier, in io.InputFilePaths, out io.OutputFilePaths, + nodeExecutionMetadata handler.NodeExecutionMetadata) (*event.TaskExecutionEvent, error) { if p.previouslyObserved { return nil, nil } - return ToTaskExecutionEvent(id, in, out, p.pInfo) + return ToTaskExecutionEvent(id, in, out, p.pInfo, nodeExecutionMetadata) } func (p *pluginRequestedTransition) ObserveSuccess(outputPath storage.DataReference, taskMetadata *event.TaskNodeMetadata) { @@ -542,7 +543,7 @@ func (t Handler) Handle(ctx context.Context, nCtx handler.NodeExecutionContext) // STEP 4: Send buffered events! logger.Debugf(ctx, "Sending buffered Task events.") for _, ev := range tCtx.ber.GetAll(ctx) { - evInfo, err := ToTaskExecutionEvent(&execID, nCtx.InputReader(), tCtx.ow, ev) + evInfo, err := ToTaskExecutionEvent(&execID, nCtx.InputReader(), tCtx.ow, ev, nCtx.NodeExecutionMetadata()) if err != nil { return handler.UnknownTransition, err } @@ -556,7 +557,7 @@ func (t Handler) Handle(ctx context.Context, nCtx handler.NodeExecutionContext) // STEP 5: Send Transition events logger.Debugf(ctx, "Sending transition event for plugin phase [%s]", pluginTrns.pInfo.Phase().String()) - evInfo, err := pluginTrns.FinalTaskEvent(&execID, nCtx.InputReader(), tCtx.ow) + evInfo, err := pluginTrns.FinalTaskEvent(&execID, nCtx.InputReader(), tCtx.ow, nCtx.NodeExecutionMetadata()) if err != nil { logger.Errorf(ctx, "failed to convert plugin transition to TaskExecutionEvent. Error: %s", err.Error()) return handler.UnknownTransition, err diff --git a/pkg/controller/nodes/task/handler_test.go b/pkg/controller/nodes/task/handler_test.go index 129101d4b1..6b74e9899c 100644 --- a/pkg/controller/nodes/task/handler_test.go +++ b/pkg/controller/nodes/task/handler_test.go @@ -359,6 +359,7 @@ func Test_task_Handle_NoCatalog(t *testing.T) { Kind: "sample", Name: "name", }) + nm.OnIsInterruptible().Return(false) tk := &core.TaskTemplate{ Id: &core.Identifier{ResourceType: core.ResourceType_TASK, Project: "proj", Domain: "dom", Version: "ver"}, @@ -680,6 +681,7 @@ func Test_task_Handle_Catalog(t *testing.T) { Kind: "sample", Name: "name", }) + nm.OnIsInterruptible().Return(true) taskID := &core.Identifier{} tk := &core.TaskTemplate{ @@ -902,6 +904,7 @@ func Test_task_Handle_Barrier(t *testing.T) { Kind: "sample", Name: "name", }) + nm.OnIsInterruptible().Return(true) taskID := &core.Identifier{} tk := &core.TaskTemplate{ diff --git a/pkg/controller/nodes/task/transformer.go b/pkg/controller/nodes/task/transformer.go index 3e51b805ee..c762e24a9f 100644 --- a/pkg/controller/nodes/task/transformer.go +++ b/pkg/controller/nodes/task/transformer.go @@ -50,7 +50,8 @@ func trimErrorMessage(original string, maxLength int) string { return original[0:maxLength/2] + original[len(original)-maxLength/2:] } -func ToTaskExecutionEvent(taskExecID *core.TaskExecutionIdentifier, in io.InputFilePaths, out io.OutputFilePaths, info pluginCore.PhaseInfo) (*event.TaskExecutionEvent, error) { +func ToTaskExecutionEvent(taskExecID *core.TaskExecutionIdentifier, in io.InputFilePaths, out io.OutputFilePaths, info pluginCore.PhaseInfo, + nodeExecutionMetadata handler.NodeExecutionMetadata) (*event.TaskExecutionEvent, error) { // Transitions to a new phase tm := ptypes.TimestampNow() @@ -90,6 +91,12 @@ func ToTaskExecutionEvent(taskExecID *core.TaskExecutionIdentifier, in io.InputF tev.CustomInfo = info.Info().CustomInfo } + if nodeExecutionMetadata.IsInterruptible() { + tev.Metadata = &event.TaskExecutionMetadata{InstanceClass: event.TaskExecutionMetadata_INTERRUPTIBLE} + } else { + tev.Metadata = &event.TaskExecutionMetadata{InstanceClass: event.TaskExecutionMetadata_DEFAULT} + } + return tev, nil } diff --git a/pkg/controller/nodes/task/transformer_test.go b/pkg/controller/nodes/task/transformer_test.go index 618417f273..915115c736 100644 --- a/pkg/controller/nodes/task/transformer_test.go +++ b/pkg/controller/nodes/task/transformer_test.go @@ -4,6 +4,8 @@ import ( "testing" "time" + "github.com/lyft/flyteidl/gen/pb-go/flyteidl/event" + "github.com/golang/protobuf/ptypes" structpb "github.com/golang/protobuf/ptypes/struct" "github.com/lyft/flyteidl/gen/pb-go/flyteidl/core" @@ -13,6 +15,7 @@ import ( "github.com/stretchr/testify/assert" "github.com/lyft/flytepropeller/pkg/controller/nodes/handler" + handlerMocks "github.com/lyft/flytepropeller/pkg/controller/nodes/handler/mocks" ) func TestToTaskEventPhase(t *testing.T) { @@ -68,7 +71,10 @@ func TestToTaskExecutionEvent(t *testing.T) { const outputPath = "out" out.On("GetOutputPath").Return(storage.DataReference(outputPath)) - tev, err := ToTaskExecutionEvent(id, in, out, pluginCore.PhaseInfoWaitingForResources(n, 0, "reason")) + nodeExecutionMetadata := handlerMocks.NodeExecutionMetadata{} + nodeExecutionMetadata.OnIsInterruptible().Return(true) + tev, err := ToTaskExecutionEvent(id, in, out, pluginCore.PhaseInfoWaitingForResources(n, 0, "reason"), + &nodeExecutionMetadata) assert.NoError(t, err) assert.Nil(t, tev.Logs) assert.Equal(t, core.TaskExecution_WAITING_FOR_RESOURCES, tev.Phase) @@ -78,6 +84,7 @@ func TestToTaskExecutionEvent(t *testing.T) { assert.Equal(t, nodeID, tev.ParentNodeExecutionId) assert.Equal(t, inputPath, tev.InputUri) assert.Nil(t, tev.OutputResult) + assert.Equal(t, event.TaskExecutionMetadata_INTERRUPTIBLE, tev.Metadata.InstanceClass) l := []*core.TaskLog{ {Uri: "x", Name: "y", MessageFormat: core.TaskLog_JSON}, @@ -87,7 +94,7 @@ func TestToTaskExecutionEvent(t *testing.T) { OccurredAt: &n, Logs: l, CustomInfo: c, - })) + }), &nodeExecutionMetadata) assert.NoError(t, err) assert.Equal(t, core.TaskExecution_RUNNING, tev.Phase) assert.Equal(t, uint32(1), tev.PhaseVersion) @@ -98,12 +105,15 @@ func TestToTaskExecutionEvent(t *testing.T) { assert.Equal(t, nodeID, tev.ParentNodeExecutionId) assert.Equal(t, inputPath, tev.InputUri) assert.Nil(t, tev.OutputResult) + assert.Equal(t, event.TaskExecutionMetadata_INTERRUPTIBLE, tev.Metadata.InstanceClass) + defaultNodeExecutionMetadata := handlerMocks.NodeExecutionMetadata{} + defaultNodeExecutionMetadata.OnIsInterruptible().Return(false) tev, err = ToTaskExecutionEvent(id, in, out, pluginCore.PhaseInfoSuccess(&pluginCore.TaskInfo{ OccurredAt: &n, Logs: l, CustomInfo: c, - })) + }), &defaultNodeExecutionMetadata) assert.NoError(t, err) assert.Equal(t, core.TaskExecution_SUCCEEDED, tev.Phase) assert.Equal(t, uint32(0), tev.PhaseVersion) @@ -116,6 +126,7 @@ func TestToTaskExecutionEvent(t *testing.T) { assert.NotNil(t, tev.OutputResult) assert.Equal(t, inputPath, tev.InputUri) assert.Equal(t, outputPath, tev.GetOutputUri()) + assert.Empty(t, event.TaskExecutionMetadata_DEFAULT, tev.Metadata.InstanceClass) } func TestToTransitionType(t *testing.T) {