From d76eb152eb36b9a77887985ab0ff3be923261bfb Mon Sep 17 00:00:00 2001 From: Sean Lin Date: Thu, 2 Sep 2021 11:16:03 -0700 Subject: [PATCH] Adopt flyteidl's ordered variable map change (#198) --- go.mod | 2 +- go.sum | 4 +- .../pluginmachinery/flytek8s/copilot_test.go | 182 ++++++++++++++---- go/tasks/pluginmachinery/utils/task.go | 25 +++ go/tasks/pluginmachinery/utils/task_test.go | 54 ++++++ go/tasks/plugins/array/array_tests_base.go | 26 ++- .../array/awsbatch/job_definition_test.go | 18 +- go/tasks/plugins/array/catalog.go | 10 +- go/tasks/plugins/array/catalog_test.go | 15 +- go/tasks/plugins/array/outputs.go | 4 +- go/tasks/plugins/array/outputs_test.go | 9 +- go/tasks/plugins/hive/execution_state.go | 9 +- go/tasks/plugins/hive/test_helpers.go | 11 +- go/tasks/plugins/k8s/sagemaker/outputs.go | 5 +- .../k8s/sagemaker/plugin_test_utils.go | 26 ++- go/tasks/plugins/presto/execution_state.go | 6 +- go/tasks/plugins/webapi/athena/utils.go | 8 +- go/tasks/plugins/webapi/athena/utils_test.go | 37 ++-- 18 files changed, 340 insertions(+), 111 deletions(-) create mode 100644 go/tasks/pluginmachinery/utils/task.go create mode 100644 go/tasks/pluginmachinery/utils/task_test.go diff --git a/go.mod b/go.mod index 3c591277ae..fe0cd344d6 100644 --- a/go.mod +++ b/go.mod @@ -13,7 +13,7 @@ require ( github.com/aws/aws-sdk-go-v2/config v1.0.0 github.com/aws/aws-sdk-go-v2/service/athena v1.0.0 github.com/coocood/freecache v1.1.1 - github.com/flyteorg/flyteidl v0.19.2 + github.com/flyteorg/flyteidl v0.20.0 github.com/flyteorg/flytestdlib v0.3.33 github.com/go-logr/zapr v0.4.0 // indirect github.com/go-test/deep v1.0.7 diff --git a/go.sum b/go.sum index 1f7d36a9c8..db496ddacb 100644 --- a/go.sum +++ b/go.sum @@ -226,8 +226,8 @@ github.com/fatih/color v1.7.0/go.mod h1:Zm6kSWBoL9eyXnKyktHP6abPY2pDugNf5Kwzbycv github.com/fatih/color v1.10.0 h1:s36xzo75JdqLaaWoiEHk767eHiwo0598uUxyfiPkDsg= github.com/fatih/color v1.10.0/go.mod h1:ELkj/draVOlAH/xkhN6mQ50Qd0MPOk5AAr3maGEBuJM= github.com/fatih/structtag v1.2.0/go.mod h1:mBJUNpUnHmRKrKlQQlmCrh5PuhftFbNv8Ys4/aAZl94= -github.com/flyteorg/flyteidl v0.19.2 h1:jXuRrLJEzSo33N9pw7bMEd6mRYSL7LCz/vnazz5XcOg= -github.com/flyteorg/flyteidl v0.19.2/go.mod h1:576W2ViEyjTpT+kEVHAGbrTP3HARNUZ/eCwrNPmdx9U= +github.com/flyteorg/flyteidl v0.20.0 h1:g5xGayFfPSzFJxJedgL390WFSEbGYjFiPey+NXAB030= +github.com/flyteorg/flyteidl v0.20.0/go.mod h1:576W2ViEyjTpT+kEVHAGbrTP3HARNUZ/eCwrNPmdx9U= github.com/flyteorg/flytestdlib v0.3.13/go.mod h1:Tz8JCECAbX6VWGwFT6cmEQ+RJpZ/6L9pswu3fzWs220= github.com/flyteorg/flytestdlib v0.3.33 h1:+oCx3zXUIldL7CWmNMD7PMFPXvGqaPgYkSKn9wB6qvY= github.com/flyteorg/flytestdlib v0.3.33/go.mod h1:7cDWkY3v7xsoesFcDdu6DSW5Q2U2W5KlHUbUHSwBG1Q= diff --git a/go/tasks/pluginmachinery/flytek8s/copilot_test.go b/go/tasks/pluginmachinery/flytek8s/copilot_test.go index 3c6803cacf..38d77d83a6 100644 --- a/go/tasks/pluginmachinery/flytek8s/copilot_test.go +++ b/go/tasks/pluginmachinery/flytek8s/copilot_test.go @@ -102,9 +102,19 @@ func TestDownloadCommandArgs(t *testing.T) { assert.Error(t, err) iFace := &core.VariableMap{ - Variables: map[string]*core.Variable{ - "x": {Type: &core.LiteralType{Type: &core.LiteralType_Simple{Simple: core.SimpleType_INTEGER}}}, - "y": {Type: &core.LiteralType{Type: &core.LiteralType_Simple{Simple: core.SimpleType_INTEGER}}}, + Variables: []*core.VariableMapEntry{ + { + Name: "x", + Var: &core.Variable{ + Type: &core.LiteralType{Type: &core.LiteralType_Simple{Simple: core.SimpleType_INTEGER}}, + }, + }, + { + Name: "y", + Var: &core.Variable{ + Type: &core.LiteralType{Type: &core.LiteralType_Simple{Simple: core.SimpleType_INTEGER}}, + }, + }, }, } d, err := DownloadCommandArgs("s3://from", "s3://output-meta", "/to", core.DataLoadingConfig_JSON, iFace) @@ -121,10 +131,10 @@ func TestDownloadCommandArgs(t *testing.T) { vm := &core.VariableMap{} assert.NoError(t, proto.Unmarshal(serIFaceBytes, vm)) assert.Len(t, vm.Variables, 2) - for k, v := range iFace.Variables { - v2, ok := vm.Variables[k] - assert.True(t, ok) - assert.Equal(t, v.Type.GetSimple(), v2.Type.GetSimple(), "for %s, types do not match", k) + for i, v := range iFace.Variables { + v2 := vm.Variables[i] + assert.Equal(t, v.Name, v2.Name, "for index %d, keys do not match", i) + assert.Equal(t, v.Var.Type.GetSimple(), v2.Var.Type.GetSimple(), "for %s, types do not match", v.Name) } } } @@ -136,9 +146,19 @@ func TestSidecarCommandArgs(t *testing.T) { iFace := &core.TypedInterface{ Outputs: &core.VariableMap{ - Variables: map[string]*core.Variable{ - "x": {Type: &core.LiteralType{Type: &core.LiteralType_Simple{Simple: core.SimpleType_INTEGER}}}, - "y": {Type: &core.LiteralType{Type: &core.LiteralType_Simple{Simple: core.SimpleType_INTEGER}}}, + Variables: []*core.VariableMapEntry{ + { + Name: "x", + Var: &core.Variable{ + Type: &core.LiteralType{Type: &core.LiteralType_Simple{Simple: core.SimpleType_INTEGER}}, + }, + }, + { + Name: "y", + Var: &core.Variable{ + Type: &core.LiteralType{Type: &core.LiteralType_Simple{Simple: core.SimpleType_INTEGER}}, + }, + }, }, }, } @@ -156,10 +176,10 @@ func TestSidecarCommandArgs(t *testing.T) { if2 := &core.TypedInterface{} assert.NoError(t, proto.Unmarshal(serIFaceBytes, if2)) assert.Len(t, if2.Outputs.Variables, 2) - for k, v := range iFace.Outputs.Variables { - v2, ok := if2.Outputs.Variables[k] - assert.True(t, ok) - assert.Equal(t, v.Type.GetSimple(), v2.Type.GetSimple(), "for %s, types do not match", k) + for i, v := range iFace.Outputs.Variables { + v2 := if2.Outputs.Variables[i] + assert.Equal(t, v.Name, v2.Name, "for index %d, keys do not match", i) + assert.Equal(t, v.Var.Type.GetSimple(), v2.Var.Type.GetSimple(), "for %s, types do not match", v.Name) } } } @@ -358,14 +378,29 @@ func TestAddCoPilotToContainer(t *testing.T) { c := v1.Container{} iface := &core.TypedInterface{ Inputs: &core.VariableMap{ - Variables: map[string]*core.Variable{ - "x": {Type: &core.LiteralType{Type: &core.LiteralType_Simple{Simple: core.SimpleType_INTEGER}}}, - "y": {Type: &core.LiteralType{Type: &core.LiteralType_Simple{Simple: core.SimpleType_INTEGER}}}, + Variables: []*core.VariableMapEntry{ + { + Name: "x", + Var: &core.Variable{ + Type: &core.LiteralType{Type: &core.LiteralType_Simple{Simple: core.SimpleType_INTEGER}}, + }, + }, + { + Name: "y", + Var: &core.Variable{ + Type: &core.LiteralType{Type: &core.LiteralType_Simple{Simple: core.SimpleType_INTEGER}}, + }, + }, }, }, Outputs: &core.VariableMap{ - Variables: map[string]*core.Variable{ - "o": {Type: &core.LiteralType{Type: &core.LiteralType_Simple{Simple: core.SimpleType_INTEGER}}}, + Variables: []*core.VariableMapEntry{ + { + Name: "o", + Var: &core.Variable{ + Type: &core.LiteralType{Type: &core.LiteralType_Simple{Simple: core.SimpleType_INTEGER}}, + }, + }, }, }, } @@ -380,14 +415,29 @@ func TestAddCoPilotToContainer(t *testing.T) { c := v1.Container{} iface := &core.TypedInterface{ Inputs: &core.VariableMap{ - Variables: map[string]*core.Variable{ - "x": {Type: &core.LiteralType{Type: &core.LiteralType_Simple{Simple: core.SimpleType_INTEGER}}}, - "y": {Type: &core.LiteralType{Type: &core.LiteralType_Simple{Simple: core.SimpleType_INTEGER}}}, + Variables: []*core.VariableMapEntry{ + { + Name: "x", + Var: &core.Variable{ + Type: &core.LiteralType{Type: &core.LiteralType_Simple{Simple: core.SimpleType_INTEGER}}, + }, + }, + { + Name: "y", + Var: &core.Variable{ + Type: &core.LiteralType{Type: &core.LiteralType_Simple{Simple: core.SimpleType_INTEGER}}, + }, + }, }, }, Outputs: &core.VariableMap{ - Variables: map[string]*core.Variable{ - "o": {Type: &core.LiteralType{Type: &core.LiteralType_Simple{Simple: core.SimpleType_INTEGER}}}, + Variables: []*core.VariableMapEntry{ + { + Name: "o", + Var: &core.Variable{ + Type: &core.LiteralType{Type: &core.LiteralType_Simple{Simple: core.SimpleType_INTEGER}}, + }, + }, }, }, } @@ -406,9 +456,19 @@ func TestAddCoPilotToContainer(t *testing.T) { c := v1.Container{} iface := &core.TypedInterface{ Inputs: &core.VariableMap{ - Variables: map[string]*core.Variable{ - "x": {Type: &core.LiteralType{Type: &core.LiteralType_Simple{Simple: core.SimpleType_INTEGER}}}, - "y": {Type: &core.LiteralType{Type: &core.LiteralType_Simple{Simple: core.SimpleType_INTEGER}}}, + Variables: []*core.VariableMapEntry{ + { + Name: "x", + Var: &core.Variable{ + Type: &core.LiteralType{Type: &core.LiteralType_Simple{Simple: core.SimpleType_INTEGER}}, + }, + }, + { + Name: "y", + Var: &core.Variable{ + Type: &core.LiteralType{Type: &core.LiteralType_Simple{Simple: core.SimpleType_INTEGER}}, + }, + }, }, }, } @@ -427,8 +487,13 @@ func TestAddCoPilotToContainer(t *testing.T) { c := v1.Container{} iface := &core.TypedInterface{ Outputs: &core.VariableMap{ - Variables: map[string]*core.Variable{ - "o": {Type: &core.LiteralType{Type: &core.LiteralType_Simple{Simple: core.SimpleType_INTEGER}}}, + Variables: []*core.VariableMapEntry{ + { + Name: "o", + Var: &core.Variable{ + Type: &core.LiteralType{Type: &core.LiteralType_Simple{Simple: core.SimpleType_INTEGER}}, + }, + }, }, }, } @@ -507,14 +572,29 @@ func TestAddCoPilotToPod(t *testing.T) { pod := v1.PodSpec{} iface := &core.TypedInterface{ Inputs: &core.VariableMap{ - Variables: map[string]*core.Variable{ - "x": {Type: &core.LiteralType{Type: &core.LiteralType_Simple{Simple: core.SimpleType_INTEGER}}}, - "y": {Type: &core.LiteralType{Type: &core.LiteralType_Simple{Simple: core.SimpleType_INTEGER}}}, + Variables: []*core.VariableMapEntry{ + { + Name: "x", + Var: &core.Variable{ + Type: &core.LiteralType{Type: &core.LiteralType_Simple{Simple: core.SimpleType_INTEGER}}, + }, + }, + { + Name: "y", + Var: &core.Variable{ + Type: &core.LiteralType{Type: &core.LiteralType_Simple{Simple: core.SimpleType_INTEGER}}, + }, + }, }, }, Outputs: &core.VariableMap{ - Variables: map[string]*core.Variable{ - "o": {Type: &core.LiteralType{Type: &core.LiteralType_Simple{Simple: core.SimpleType_INTEGER}}}, + Variables: []*core.VariableMapEntry{ + { + Name: "o", + Var: &core.Variable{ + Type: &core.LiteralType{Type: &core.LiteralType_Simple{Simple: core.SimpleType_INTEGER}}, + }, + }, }, }, } @@ -544,9 +624,19 @@ func TestAddCoPilotToPod(t *testing.T) { pod := v1.PodSpec{} iface := &core.TypedInterface{ Inputs: &core.VariableMap{ - Variables: map[string]*core.Variable{ - "x": {Type: &core.LiteralType{Type: &core.LiteralType_Simple{Simple: core.SimpleType_INTEGER}}}, - "y": {Type: &core.LiteralType{Type: &core.LiteralType_Simple{Simple: core.SimpleType_INTEGER}}}, + Variables: []*core.VariableMapEntry{ + { + Name: "x", + Var: &core.Variable{ + Type: &core.LiteralType{Type: &core.LiteralType_Simple{Simple: core.SimpleType_INTEGER}}, + }, + }, + { + Name: "y", + Var: &core.Variable{ + Type: &core.LiteralType{Type: &core.LiteralType_Simple{Simple: core.SimpleType_INTEGER}}, + }, + }, }, }, } @@ -564,8 +654,13 @@ func TestAddCoPilotToPod(t *testing.T) { pod := v1.PodSpec{} iface := &core.TypedInterface{ Outputs: &core.VariableMap{ - Variables: map[string]*core.Variable{ - "o": {Type: &core.LiteralType{Type: &core.LiteralType_Simple{Simple: core.SimpleType_INTEGER}}}, + Variables: []*core.VariableMapEntry{ + { + Name: "o", + Var: &core.Variable{ + Type: &core.LiteralType{Type: &core.LiteralType_Simple{Simple: core.SimpleType_INTEGER}}, + }, + }, }, }, } @@ -583,8 +678,13 @@ func TestAddCoPilotToPod(t *testing.T) { pod := v1.PodSpec{} iface := &core.TypedInterface{ Outputs: &core.VariableMap{ - Variables: map[string]*core.Variable{ - "o": {Type: &core.LiteralType{Type: &core.LiteralType_Simple{Simple: core.SimpleType_INTEGER}}}, + Variables: []*core.VariableMapEntry{ + { + Name: "o", + Var: &core.Variable{ + Type: &core.LiteralType{Type: &core.LiteralType_Simple{Simple: core.SimpleType_INTEGER}}, + }, + }, }, }, } diff --git a/go/tasks/pluginmachinery/utils/task.go b/go/tasks/pluginmachinery/utils/task.go new file mode 100644 index 0000000000..27f27010a8 --- /dev/null +++ b/go/tasks/pluginmachinery/utils/task.go @@ -0,0 +1,25 @@ +package utils + +import "github.com/flyteorg/flyteidl/gen/pb-go/flyteidl/core" + +// This function extracts the variable named "results" from the TaskTemplate +func GetResultsVariable(taskTemplate *core.TaskTemplate) (results *core.Variable, exists bool) { + if taskTemplate == nil { + return nil, false + } + if taskTemplate.Interface == nil { + return nil, false + } + if taskTemplate.Interface.Outputs == nil { + return nil, false + } + if taskTemplate.Interface.Outputs.Variables == nil { + return nil, false + } + for _, e := range taskTemplate.Interface.Outputs.Variables { + if e.Name == "results" { + return e.Var, true + } + } + return nil, false +} diff --git a/go/tasks/pluginmachinery/utils/task_test.go b/go/tasks/pluginmachinery/utils/task_test.go new file mode 100644 index 0000000000..896604d2fc --- /dev/null +++ b/go/tasks/pluginmachinery/utils/task_test.go @@ -0,0 +1,54 @@ +package utils + +import ( + "testing" + + "github.com/flyteorg/flyteidl/gen/pb-go/flyteidl/core" + "github.com/stretchr/testify/assert" +) + +func TestGetResultsVariable(t *testing.T) { + t.Run("variable not found with nil pointer", func(t *testing.T) { + emptyTaskTemplate := &core.TaskTemplate{ + Interface: nil, + } + _, found := GetResultsVariable(emptyTaskTemplate) + assert.False(t, found) + }) + + t.Run("variable not found", func(t *testing.T) { + taskTemplate := &core.TaskTemplate{ + Interface: &core.TypedInterface{ + Outputs: &core.VariableMap{ + Variables: []*core.VariableMapEntry{ + { + Name: "o0", + }, + }, + }, + }, + } + _, found := GetResultsVariable(taskTemplate) + assert.False(t, found) + }) + + t.Run("happy case", func(t *testing.T) { + taskTemplate := &core.TaskTemplate{ + Interface: &core.TypedInterface{ + Outputs: &core.VariableMap{ + Variables: []*core.VariableMapEntry{ + { + Name: "results", + Var: &core.Variable{ + Description: "athena result", + }, + }, + }, + }, + }, + } + v, found := GetResultsVariable(taskTemplate) + assert.True(t, found) + assert.Equal(t, "athena result", v.Description) + }) +} diff --git a/go/tasks/plugins/array/array_tests_base.go b/go/tasks/plugins/array/array_tests_base.go index dd8551bc01..ebf4067294 100644 --- a/go/tasks/plugins/array/array_tests_base.go +++ b/go/tasks/plugins/array/array_tests_base.go @@ -36,11 +36,14 @@ func RunArrayTestsEndToEnd(t *testing.T, executor core.Plugin, iter AdvanceItera template.Interface = &idlCore.TypedInterface{ Inputs: nil, Outputs: &idlCore.VariableMap{ - Variables: map[string]*idlCore.Variable{ - "x": { - Type: &idlCore.LiteralType{ - Type: &idlCore.LiteralType_CollectionType{ - CollectionType: &idlCore.LiteralType{Type: &idlCore.LiteralType_Simple{Simple: idlCore.SimpleType_INTEGER}}, + Variables: []*idlCore.VariableMapEntry{ + { + Name: "x", + Var: &idlCore.Variable{ + Type: &idlCore.LiteralType{ + Type: &idlCore.LiteralType_CollectionType{ + CollectionType: &idlCore.LiteralType{Type: &idlCore.LiteralType_Simple{Simple: idlCore.SimpleType_INTEGER}}, + }, }, }, }, @@ -71,11 +74,14 @@ func RunArrayTestsEndToEnd(t *testing.T, executor core.Plugin, iter AdvanceItera template.Interface = &idlCore.TypedInterface{ Inputs: nil, Outputs: &idlCore.VariableMap{ - Variables: map[string]*idlCore.Variable{ - "x": { - Type: &idlCore.LiteralType{ - Type: &idlCore.LiteralType_CollectionType{ - CollectionType: &idlCore.LiteralType{Type: &idlCore.LiteralType_Simple{Simple: idlCore.SimpleType_INTEGER}}, + Variables: []*idlCore.VariableMapEntry{ + { + Name: "x", + Var: &idlCore.Variable{ + Type: &idlCore.LiteralType{ + Type: &idlCore.LiteralType_CollectionType{ + CollectionType: &idlCore.LiteralType{Type: &idlCore.LiteralType_Simple{Simple: idlCore.SimpleType_INTEGER}}, + }, }, }, }, diff --git a/go/tasks/plugins/array/awsbatch/job_definition_test.go b/go/tasks/plugins/array/awsbatch/job_definition_test.go index a7dbd97754..cacb7aa9e3 100644 --- a/go/tasks/plugins/array/awsbatch/job_definition_test.go +++ b/go/tasks/plugins/array/awsbatch/job_definition_test.go @@ -44,7 +44,14 @@ func TestEnsureJobDefinition(t *testing.T) { tReader.OnReadMatch(mock.Anything).Return(&core.TaskTemplate{ Interface: &core.TypedInterface{ Outputs: &core.VariableMap{ - Variables: map[string]*core.Variable{"var1": {Type: &core.LiteralType{Type: &core.LiteralType_Simple{Simple: core.SimpleType_INTEGER}}}}, + Variables: []*core.VariableMapEntry{ + { + Name: "var1", + Var: &core.Variable{ + Type: &core.LiteralType{Type: &core.LiteralType_Simple{Simple: core.SimpleType_INTEGER}}, + }, + }, + }, }, }, Target: &core.TaskTemplate_Container{ @@ -109,7 +116,14 @@ func TestEnsureJobDefinitionWithSecurityContext(t *testing.T) { tReader.OnReadMatch(mock.Anything).Return(&core.TaskTemplate{ Interface: &core.TypedInterface{ Outputs: &core.VariableMap{ - Variables: map[string]*core.Variable{"var1": {Type: &core.LiteralType{Type: &core.LiteralType_Simple{Simple: core.SimpleType_INTEGER}}}}, + Variables: []*core.VariableMapEntry{ + { + Name: "var1", + Var: &core.Variable{ + Type: &core.LiteralType{Type: &core.LiteralType_Simple{Simple: core.SimpleType_INTEGER}}, + }, + }, + }, }, }, Target: &core.TaskTemplate_Container{ diff --git a/go/tasks/plugins/array/catalog.go b/go/tasks/plugins/array/catalog.go index 6dc10f4d07..f7679dc90c 100644 --- a/go/tasks/plugins/array/catalog.go +++ b/go/tasks/plugins/array/catalog.go @@ -344,14 +344,14 @@ func makeSingularTaskInterface(varMap *idlCore.VariableMap) *idlCore.VariableMap } res := &idlCore.VariableMap{ - Variables: make(map[string]*idlCore.Variable, len(varMap.Variables)), + Variables: make([]*idlCore.VariableMapEntry, len(varMap.Variables)), } - for key, val := range varMap.Variables { - if val.GetType().GetCollectionType() != nil { - res.Variables[key] = &idlCore.Variable{Type: val.GetType().GetCollectionType()} + for i, val := range varMap.Variables { + if val.GetVar().GetType().GetCollectionType() != nil { + res.Variables[i] = &idlCore.VariableMapEntry{Name: val.GetName(), Var: &idlCore.Variable{Type: val.GetVar().GetType().GetCollectionType()}} } else { - res.Variables[key] = val + res.Variables[i] = val } } diff --git a/go/tasks/plugins/array/catalog_test.go b/go/tasks/plugins/array/catalog_test.go index aad365dd04..3c17970a43 100644 --- a/go/tasks/plugins/array/catalog_test.go +++ b/go/tasks/plugins/array/catalog_test.go @@ -163,8 +163,8 @@ func TestDetermineDiscoverability(t *testing.T) { Version: "1", }, Interface: &core.TypedInterface{ - Inputs: &core.VariableMap{Variables: map[string]*core.Variable{}}, - Outputs: &core.VariableMap{Variables: map[string]*core.Variable{}}, + Inputs: &core.VariableMap{Variables: []*core.VariableMapEntry{}}, + Outputs: &core.VariableMap{Variables: []*core.VariableMapEntry{}}, }, Target: &core.TaskTemplate_Container{ Container: &core.Container{ @@ -261,12 +261,15 @@ func TestDiscoverabilityTaskType1(t *testing.T) { Version: "1", }, Interface: &core.TypedInterface{ - Inputs: &core.VariableMap{Variables: map[string]*core.Variable{ - "foo": { - Description: "foo", + Inputs: &core.VariableMap{Variables: []*core.VariableMapEntry{ + { + Name: "foo", + Var: &core.Variable{ + Description: "foo", + }, }, }}, - Outputs: &core.VariableMap{Variables: map[string]*core.Variable{}}, + Outputs: &core.VariableMap{Variables: []*core.VariableMapEntry{}}, }, Target: &core.TaskTemplate_Container{ Container: &core.Container{ diff --git a/go/tasks/plugins/array/outputs.go b/go/tasks/plugins/array/outputs.go index ab31021952..c049b80809 100644 --- a/go/tasks/plugins/array/outputs.go +++ b/go/tasks/plugins/array/outputs.go @@ -177,8 +177,8 @@ func AssembleFinalOutputs(ctx context.Context, assemblyQueue OutputAssembler, tC state.GetOriginalArraySize(), state.GetArrayStatus().Detailed.ItemsCount) varNames := make([]string, 0, len(outputVariables.GetVariables())) - for varName := range outputVariables.GetVariables() { - varNames = append(varNames, varName) + for _, v := range outputVariables.GetVariables() { + varNames = append(varNames, v.Name) } finalPhases := buildFinalPhases(state.GetArrayStatus().Detailed, diff --git a/go/tasks/plugins/array/outputs_test.go b/go/tasks/plugins/array/outputs_test.go index 743a3366b2..f36675a7e9 100644 --- a/go/tasks/plugins/array/outputs_test.go +++ b/go/tasks/plugins/array/outputs_test.go @@ -295,7 +295,14 @@ func TestAssembleFinalOutputs(t *testing.T) { tReader.OnReadMatch(mock.Anything).Return(&core.TaskTemplate{ Interface: &core.TypedInterface{ Outputs: &core.VariableMap{ - Variables: map[string]*core.Variable{"var1": {Type: &core.LiteralType{Type: &core.LiteralType_Simple{Simple: core.SimpleType_INTEGER}}}}, + Variables: []*core.VariableMapEntry{ + { + Name: "var1", + Var: &core.Variable{ + Type: &core.LiteralType{Type: &core.LiteralType_Simple{Simple: core.SimpleType_INTEGER}}, + }, + }, + }, }, }, }, nil) diff --git a/go/tasks/plugins/hive/execution_state.go b/go/tasks/plugins/hive/execution_state.go index cbc45cc06d..6d10791067 100644 --- a/go/tasks/plugins/hive/execution_state.go +++ b/go/tasks/plugins/hive/execution_state.go @@ -514,9 +514,10 @@ func WriteOutputs(ctx context.Context, tCtx core.TaskExecutionContext, currentSt return currentState, errors.Errorf(errors.BadTaskSpecification, "Hive tasks must have zero or one output: [%d] found", len(outputs)) } if len(outputs) == 1 { - if results, ok := outputs["results"]; ok { - if results.GetType().GetSchema() == nil { - return currentState, errors.Errorf(errors.BadTaskSpecification, "A non-SchemaType was found [%v]", results.GetType()) + results := outputs[0] + if results.GetName() == "results" { + if results.GetVar().GetType().GetSchema() == nil { + return currentState, errors.Errorf(errors.BadTaskSpecification, "A non-SchemaType was found [%v]", results.GetVar().GetType()) } logger.Debugf(ctx, "Writing outputs file for Hive task at [%s]", tCtx.OutputWriter().GetOutputPrefixPath()) err = tCtx.OutputWriter().Put(ctx, ioutils.NewInMemoryOutputReader( @@ -527,7 +528,7 @@ func WriteOutputs(ctx context.Context, tCtx core.TaskExecutionContext, currentSt Scalar: &idlCore.Scalar{Value: &idlCore.Scalar_Schema{ Schema: &idlCore.Schema{ Uri: externalLocation.String(), - Type: results.GetType().GetSchema(), + Type: results.GetVar().GetType().GetSchema(), }, }, }, diff --git a/go/tasks/plugins/hive/test_helpers.go b/go/tasks/plugins/hive/test_helpers.go index 63b7fd564a..e427dbb80f 100644 --- a/go/tasks/plugins/hive/test_helpers.go +++ b/go/tasks/plugins/hive/test_helpers.go @@ -49,10 +49,13 @@ func GetSingleHiveQueryTaskTemplate() idlCore.TaskTemplate { }, Interface: &idlCore.TypedInterface{ Outputs: &idlCore.VariableMap{ - Variables: map[string]*idlCore.Variable{ - "results": &idlCore.Variable{ - Type: &idlCore.LiteralType{ - Type: &idlCore.LiteralType_Schema{Schema: &idlCore.SchemaType{}}, + Variables: []*idlCore.VariableMapEntry{ + { + Name: "results", + Var: &idlCore.Variable{ + Type: &idlCore.LiteralType{ + Type: &idlCore.LiteralType_Schema{Schema: &idlCore.SchemaType{}}, + }, }, }, }, diff --git a/go/tasks/plugins/k8s/sagemaker/outputs.go b/go/tasks/plugins/k8s/sagemaker/outputs.go index e4da7ea953..a2bed446d1 100644 --- a/go/tasks/plugins/k8s/sagemaker/outputs.go +++ b/go/tasks/plugins/k8s/sagemaker/outputs.go @@ -17,10 +17,11 @@ import ( func createOutputLiteralMap(tk *core.TaskTemplate, outputPath string) *core.LiteralMap { op := &core.LiteralMap{} - for k := range tk.Interface.Outputs.Variables { + for _, v := range tk.Interface.Outputs.Variables { // if v != core.LiteralType_Blob{} op.Literals = make(map[string]*core.Literal) - op.Literals[k] = &core.Literal{ + // make literal map ordered? + op.Literals[v.Name] = &core.Literal{ Value: &core.Literal_Scalar{ Scalar: &core.Scalar{ Value: &core.Scalar_Blob{ diff --git a/go/tasks/plugins/k8s/sagemaker/plugin_test_utils.go b/go/tasks/plugins/k8s/sagemaker/plugin_test_utils.go index 9814c953f3..d223faf0c8 100644 --- a/go/tasks/plugins/k8s/sagemaker/plugin_test_utils.go +++ b/go/tasks/plugins/k8s/sagemaker/plugin_test_utils.go @@ -114,22 +114,28 @@ func generateMockHyperparameterTuningJobTaskTemplate(id string, hpoJobCustomObj Custom: &structObj, Interface: &flyteIdlCore.TypedInterface{ Inputs: &flyteIdlCore.VariableMap{ - Variables: map[string]*flyteIdlCore.Variable{ - "input": { - Type: &flyteIdlCore.LiteralType{ - Type: &flyteIdlCore.LiteralType_CollectionType{ - CollectionType: &flyteIdlCore.LiteralType{Type: &flyteIdlCore.LiteralType_Simple{Simple: flyteIdlCore.SimpleType_INTEGER}}, + Variables: []*flyteIdlCore.VariableMapEntry{ + { + Name: "input", + Var: &flyteIdlCore.Variable{ + Type: &flyteIdlCore.LiteralType{ + Type: &flyteIdlCore.LiteralType_CollectionType{ + CollectionType: &flyteIdlCore.LiteralType{Type: &flyteIdlCore.LiteralType_Simple{Simple: flyteIdlCore.SimpleType_INTEGER}}, + }, }, }, }, }, }, Outputs: &flyteIdlCore.VariableMap{ - Variables: map[string]*flyteIdlCore.Variable{ - "output": { - Type: &flyteIdlCore.LiteralType{ - Type: &flyteIdlCore.LiteralType_CollectionType{ - CollectionType: &flyteIdlCore.LiteralType{Type: &flyteIdlCore.LiteralType_Simple{Simple: flyteIdlCore.SimpleType_INTEGER}}, + Variables: []*flyteIdlCore.VariableMapEntry{ + { + Name: "output", + Var: &flyteIdlCore.Variable{ + Type: &flyteIdlCore.LiteralType{ + Type: &flyteIdlCore.LiteralType_CollectionType{ + CollectionType: &flyteIdlCore.LiteralType{Type: &flyteIdlCore.LiteralType_Simple{Simple: flyteIdlCore.SimpleType_INTEGER}}, + }, }, }, }, diff --git a/go/tasks/plugins/presto/execution_state.go b/go/tasks/plugins/presto/execution_state.go index 3370b0b951..44d669477a 100644 --- a/go/tasks/plugins/presto/execution_state.go +++ b/go/tasks/plugins/presto/execution_state.go @@ -447,7 +447,11 @@ func writeOutput(ctx context.Context, tCtx core.TaskExecutionContext, externalLo return err } - results := taskTemplate.Interface.Outputs.Variables["results"] + results, exists := utils.GetResultsVariable(taskTemplate) + if !exists { + logger.Infof(ctx, "The task declares no outputs. Skipping writing the outputs.") + return nil + } return tCtx.OutputWriter().Put(ctx, ioutils.NewInMemoryOutputReader( &pb.LiteralMap{ diff --git a/go/tasks/plugins/webapi/athena/utils.go b/go/tasks/plugins/webapi/athena/utils.go index a4b89b4e66..7a2ffc0479 100644 --- a/go/tasks/plugins/webapi/athena/utils.go +++ b/go/tasks/plugins/webapi/athena/utils.go @@ -12,6 +12,7 @@ import ( pb "github.com/flyteorg/flyteidl/gen/pb-go/flyteidl/core" "github.com/flyteorg/flyteplugins/go/tasks/pluginmachinery/ioutils" + pluginUtils "github.com/flyteorg/flyteplugins/go/tasks/pluginmachinery/utils" "github.com/flyteorg/flyteplugins/go/tasks/pluginmachinery/webapi" "github.com/flyteorg/flytestdlib/logger" ) @@ -22,12 +23,7 @@ func writeOutput(ctx context.Context, tCtx webapi.StatusContext, externalLocatio return err } - if taskTemplate.Interface == nil || taskTemplate.Interface.Outputs == nil || taskTemplate.Interface.Outputs.Variables == nil { - logger.Infof(ctx, "The task declares no outputs. Skipping writing the outputs.") - return nil - } - - resultsSchema, exists := taskTemplate.Interface.Outputs.Variables["results"] + resultsSchema, exists := pluginUtils.GetResultsVariable(taskTemplate) if !exists { logger.Infof(ctx, "The task declares no outputs. Skipping writing the outputs.") return nil diff --git a/go/tasks/plugins/webapi/athena/utils_test.go b/go/tasks/plugins/webapi/athena/utils_test.go index 4fae06cba8..b4f943fd61 100644 --- a/go/tasks/plugins/webapi/athena/utils_test.go +++ b/go/tasks/plugins/webapi/athena/utils_test.go @@ -41,8 +41,11 @@ func Test_writeOutput(t *testing.T) { taskReader.OnRead(ctx).Return(&core.TaskTemplate{ Interface: &core.TypedInterface{ Outputs: &core.VariableMap{ - Variables: map[string]*core.Variable{ - "myOutput": &core.Variable{}, + Variables: []*core.VariableMapEntry{ + { + Name: "myOutput", + Var: &core.Variable{}, + }, }, }, }, @@ -73,12 +76,15 @@ func Test_writeOutput(t *testing.T) { taskReader.OnRead(ctx).Return(&core.TaskTemplate{ Interface: &core.TypedInterface{ Outputs: &core.VariableMap{ - Variables: map[string]*core.Variable{ - "results": { - Type: &core.LiteralType{ - Type: &core.LiteralType_Schema{ - Schema: &core.SchemaType{ - Columns: []*core.SchemaType_SchemaColumn{}, + Variables: []*core.VariableMapEntry{ + { + Name: "results", + Var: &core.Variable{ + Type: &core.LiteralType{ + Type: &core.LiteralType_Schema{ + Schema: &core.SchemaType{ + Columns: []*core.SchemaType_SchemaColumn{}, + }, }, }, }, @@ -158,12 +164,15 @@ func Test_ExtractQueryInfo(t *testing.T) { Type: validProto.taskType, Interface: &core.TypedInterface{ Outputs: &core.VariableMap{ - Variables: map[string]*core.Variable{ - "results": { - Type: &core.LiteralType{ - Type: &core.LiteralType_Schema{ - Schema: &core.SchemaType{ - Columns: []*core.SchemaType_SchemaColumn{}, + Variables: []*core.VariableMapEntry{ + { + Name: "results", + Var: &core.Variable{ + Type: &core.LiteralType{ + Type: &core.LiteralType_Schema{ + Schema: &core.SchemaType{ + Columns: []*core.SchemaType_SchemaColumn{}, + }, }, }, },