From 00f08b157ed403f728b237fb362bce3a5e7a88dc Mon Sep 17 00:00:00 2001 From: Saravanan Balasubramanian Date: Thu, 4 Apr 2019 13:18:33 -0700 Subject: [PATCH 01/21] CheckandEstimate implementation --- workflow/controller/operator.go | 67 ++++++++++++++++++++++++++++----- 1 file changed, 58 insertions(+), 9 deletions(-) diff --git a/workflow/controller/operator.go b/workflow/controller/operator.go index bfc1ed6b52df..b04c045956d6 100644 --- a/workflow/controller/operator.go +++ b/workflow/controller/operator.go @@ -61,6 +61,15 @@ type wfOperationCtx struct { // workflowDeadline is the deadline which the workflow is expected to complete before we // terminate the workflow. workflowDeadline *time.Time + + // currentWFSize is current Workflow size + currentWFSize int + + // unSavedNodeStatusSize is unsaved workflow size + unSavedNodeStatusSize int + + // wfFailed is workflow failed status + isWFFailed bool } var ( @@ -124,7 +133,12 @@ func (woc *wfOperationCtx) operate() { woc.log.Errorf("Recovered from panic: %+v\n%s", r, debug.Stack()) } }() + woc.log.Infof("Processing workflow") + + // Initialize Workflow failed status + woc.wfFailed = false + // Perform one-time workflow validation if woc.wf.Status.Phase == "" { woc.markWorkflowRunning() @@ -453,9 +467,9 @@ func (woc *wfOperationCtx) podReconciliation() error { seenPodLock := &sync.Mutex{} wfNodesLock := &sync.RWMutex{} - performAssessment := func(pod *apiv1.Pod) { + performAssessment := func(pod *apiv1.Pod) string { if pod == nil { - return + return "" } nodeNameForPod := pod.Annotations[common.AnnotationKeyNodeName] nodeID := woc.wf.NodeID(nodeNameForPod) @@ -475,16 +489,20 @@ func (woc *wfOperationCtx) podReconciliation() error { if node.Completed() && !node.IsDaemoned() { if tmpVal, tmpOk := pod.Labels[common.LabelKeyCompleted]; tmpOk { if tmpVal == "true" { - return + return nodeID } } woc.completedPods[pod.ObjectMeta.Name] = true } } + return nodeID } parallelPodNum := make(chan string, 500) var wg sync.WaitGroup + + woc.currentWFSize = woc.getSize() + for _, pod := range podList.Items { parallelPodNum <- pod.Name wg.Add(1) @@ -493,20 +511,23 @@ func (woc *wfOperationCtx) podReconciliation() error { wfNodesLock.Lock() origNodeStatus := *woc.wf.Status.DeepCopy() wfNodesLock.Unlock() - performAssessment(&tmpPod) + nodeID := performAssessment(&tmpPod) err = woc.applyExecutionControl(&tmpPod, wfNodesLock) if err != nil { woc.log.Warnf("Failed to apply execution control to pod %s", tmpPod.Name) } wfNodesLock.Lock() defer wfNodesLock.Unlock() - err = woc.checkAndCompress() + err = woc.checkAndEstimate(nodeID) if err != nil { woc.wf.Status = origNodeStatus nodeNameForPod := tmpPod.Annotations[common.AnnotationKeyNodeName] woc.log.Warnf("%v", err) woc.markNodeErrorClearOuput(nodeNameForPod, err) err = woc.checkAndCompress() + if err != nil { + woc.wfFailed = true + } } <-parallelPodNum }(pod) @@ -1664,17 +1685,19 @@ func (woc *wfOperationCtx) getSize() int { // The compressed content will be assign to compressedNodes element and clear the nodestatus map. func (woc *wfOperationCtx) checkAndCompress() error { - if woc.wf.Status.CompressedNodes != "" || (woc.wf.Status.CompressedNodes == "" && woc.getSize() >= maxWorkflowSize) { - + if woc.isWFFailed == false && (woc.wf.Status.CompressedNodes != "" || (woc.wf.Status.CompressedNodes == "" && woc.getSize() >= maxWorkflowSize)) { + start := time.Now() nodeContent, err := json.Marshal(woc.wf.Status.Nodes) if err != nil { return errors.InternalWrapError(err) } buff := string(nodeContent) woc.wf.Status.CompressedNodes = file.CompressEncodeString(buff) - + fmt.Println("checkAndCompress: %s", time.Since(start)) } - if woc.wf.Status.CompressedNodes != "" && woc.getSize() >= maxWorkflowSize { + + if woc.isWFFailed || (woc.wf.Status.CompressedNodes != "" && woc.getSize() >= maxWorkflowSize) { + woc.isWFFailed = true return errors.InternalError(fmt.Sprintf("Workflow is longer than maximum allowed size. Size=%d", woc.getSize())) } return nil @@ -1698,3 +1721,29 @@ func (woc *wfOperationCtx) checkAndDecompress() error { } return nil } + +// checkAndEstimate will check and estimate the workflow size with current nodestatus +func (woc *wfOperationCtx) checkAndEstimate(nodeID string) error { + if nodeID == "" { + return nil + } + + if woc.isWFFailed { + return errors.InternalError(fmt.Sprintf("Workflow is longer than maximum allowed size. Size=%d", woc.currentWFSize+woc.unSavedNodeStatusSize)) + } + + if woc.wf.Status.CompressedNodes != "" { + if node, ok := woc.wf.Status.Nodes[nodeID]; ok { + content, err := json.Marshal(node) + if err != nil { + return errors.InternalWrapError(err) + } + nodeSize := len(file.CompressEncodeString(string(content))) + if (nodeSize + woc.unSavedNodeStatusSize + woc.currentWFSize) >= maxWorkflowSize { + return errors.InternalError(fmt.Sprintf("Workflow is longer than maximum allowed size. Size=%d", woc.currentWFSize+nodeSize+woc.unSavedNodeStatusSize)) + } + woc.unSavedNodeStatusSize += nodeSize + } + } + return nil +} From e7ac42ad6996b40bda9408f05535d56c562049de Mon Sep 17 00:00:00 2001 From: Saravanan Balasubramanian Date: Thu, 4 Apr 2019 14:35:50 -0700 Subject: [PATCH 02/21] fixed variable rename --- workflow/controller/operator.go | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/workflow/controller/operator.go b/workflow/controller/operator.go index b04c045956d6..f9cad161aa00 100644 --- a/workflow/controller/operator.go +++ b/workflow/controller/operator.go @@ -137,7 +137,7 @@ func (woc *wfOperationCtx) operate() { woc.log.Infof("Processing workflow") // Initialize Workflow failed status - woc.wfFailed = false + woc.isWFFailed = false // Perform one-time workflow validation if woc.wf.Status.Phase == "" { @@ -526,7 +526,7 @@ func (woc *wfOperationCtx) podReconciliation() error { woc.markNodeErrorClearOuput(nodeNameForPod, err) err = woc.checkAndCompress() if err != nil { - woc.wfFailed = true + woc.isWFFailed = true } } <-parallelPodNum From d956bfcd3300ce3a3581e93b2c47ece2360624f7 Mon Sep 17 00:00:00 2001 From: Saravanan Balasubramanian Date: Thu, 4 Apr 2019 17:02:17 -0700 Subject: [PATCH 03/21] fixed gofmt --- workflow/controller/operator.go | 2 -- 1 file changed, 2 deletions(-) diff --git a/workflow/controller/operator.go b/workflow/controller/operator.go index f9cad161aa00..aa5eb64b983b 100644 --- a/workflow/controller/operator.go +++ b/workflow/controller/operator.go @@ -1686,14 +1686,12 @@ func (woc *wfOperationCtx) getSize() int { func (woc *wfOperationCtx) checkAndCompress() error { if woc.isWFFailed == false && (woc.wf.Status.CompressedNodes != "" || (woc.wf.Status.CompressedNodes == "" && woc.getSize() >= maxWorkflowSize)) { - start := time.Now() nodeContent, err := json.Marshal(woc.wf.Status.Nodes) if err != nil { return errors.InternalWrapError(err) } buff := string(nodeContent) woc.wf.Status.CompressedNodes = file.CompressEncodeString(buff) - fmt.Println("checkAndCompress: %s", time.Since(start)) } if woc.isWFFailed || (woc.wf.Status.CompressedNodes != "" && woc.getSize() >= maxWorkflowSize) { From be9b3b5037f86043754cbee8433980ec4f3b3135 Mon Sep 17 00:00:00 2001 From: Saravanan Balasubramanian Date: Fri, 5 Apr 2019 13:47:42 -0700 Subject: [PATCH 04/21] fixed feedbacks --- workflow/controller/operator.go | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/workflow/controller/operator.go b/workflow/controller/operator.go index aa5eb64b983b..64d80cdcf755 100644 --- a/workflow/controller/operator.go +++ b/workflow/controller/operator.go @@ -1727,7 +1727,7 @@ func (woc *wfOperationCtx) checkAndEstimate(nodeID string) error { } if woc.isWFFailed { - return errors.InternalError(fmt.Sprintf("Workflow is longer than maximum allowed size. Size=%d", woc.currentWFSize+woc.unSavedNodeStatusSize)) + return errors.InternalErrorf("Workflow is longer than maximum allowed size. Size=%d", woc.currentWFSize+woc.unSavedNodeStatusSize) } if woc.wf.Status.CompressedNodes != "" { @@ -1738,7 +1738,7 @@ func (woc *wfOperationCtx) checkAndEstimate(nodeID string) error { } nodeSize := len(file.CompressEncodeString(string(content))) if (nodeSize + woc.unSavedNodeStatusSize + woc.currentWFSize) >= maxWorkflowSize { - return errors.InternalError(fmt.Sprintf("Workflow is longer than maximum allowed size. Size=%d", woc.currentWFSize+nodeSize+woc.unSavedNodeStatusSize)) + return errors.InternalErrorf("Workflow is longer than maximum allowed size. Size=%d", woc.currentWFSize+nodeSize+woc.unSavedNodeStatusSize) } woc.unSavedNodeStatusSize += nodeSize } From b0e4288b27d67bbe05b45765f9a12633f6dae994 Mon Sep 17 00:00:00 2001 From: Saravanan Balasubramanian Date: Wed, 3 Jul 2019 14:57:14 -0700 Subject: [PATCH 05/21] Implement the exclude script output in step and DAG workflow --- cmd/argoexec/commands/wait.go | 40 +++++++++++----- workflow/common/common.go | 4 ++ workflow/controller/operator.go | 68 +++++++++++++++++++++++++-- workflow/controller/operator_test.go | 70 ++++++++++++++++++++++++++++ workflow/controller/workflowpod.go | 3 +- workflow/executor/executor.go | 54 +++++++++++++++++++++ 6 files changed, 224 insertions(+), 15 deletions(-) diff --git a/cmd/argoexec/commands/wait.go b/cmd/argoexec/commands/wait.go index 1e933f29fdcf..f08421f5c8bf 100644 --- a/cmd/argoexec/commands/wait.go +++ b/cmd/argoexec/commands/wait.go @@ -1,6 +1,10 @@ package commands import ( + "github.com/argoproj/argo/workflow/common" + "github.com/argoproj/argo/workflow/executor" + "strconv" + "strings" "time" "github.com/argoproj/pkg/stats" @@ -28,8 +32,18 @@ func waitContainer() error { defer stats.LogStats() stats.StartStatsTicker(5 * time.Minute) + includeOutput, err := executor.GetAnnotationField(wfExecutor.PodAnnotationsPath, common.AnnotationIncludeOutputs) + if err != nil { + wfExecutor.AddError(err) + } + + includeOutputFlag, err := strconv.ParseBool(strings.Trim(includeOutput, "\"")) + + if err != nil { + wfExecutor.AddError(err) + } // Wait for main container to complete - err := wfExecutor.Wait() + err = wfExecutor.Wait() if err != nil { wfExecutor.AddError(err) // do not return here so we can still try to kill sidecars & save outputs @@ -56,16 +70,20 @@ func waitContainer() error { wfExecutor.AddError(err) return err } - // Capture output script result - err = wfExecutor.CaptureScriptResult() - if err != nil { - wfExecutor.AddError(err) - return err - } - err = wfExecutor.AnnotateOutputs(logArt) - if err != nil { - wfExecutor.AddError(err) - return err + + if includeOutputFlag { + // Capture output script result + err = wfExecutor.CaptureScriptResult() + if err != nil { + wfExecutor.AddError(err) + return err + } + + err = wfExecutor.AnnotateOutputs(logArt) + if err != nil { + wfExecutor.AddError(err) + return err + } } return nil } diff --git a/workflow/common/common.go b/workflow/common/common.go index 68909a755877..d4f80b10c2d4 100644 --- a/workflow/common/common.go +++ b/workflow/common/common.go @@ -34,6 +34,10 @@ const ( // AnnotationKeyNodeName is the pod metadata annotation key containing the workflow node name AnnotationKeyNodeName = workflow.FullName + "/node-name" + + // AnnotationIncludeOutputs is the pod metadata annotation key containing flag to include script output + AnnotationIncludeOutputs = workflow.FullName + "/include-output" + // AnnotationKeyNodeMessage is the pod metadata annotation key the executor will use to // communicate errors encountered by the executor during artifact load/save, etc... AnnotationKeyNodeMessage = workflow.FullName + "/node-message" diff --git a/workflow/controller/operator.go b/workflow/controller/operator.go index 9b354a96b275..f5dd4bc30f98 100644 --- a/workflow/controller/operator.go +++ b/workflow/controller/operator.go @@ -1290,7 +1290,7 @@ func (woc *wfOperationCtx) executeContainer(nodeName string, tmpl *wfv1.Template return node } woc.log.Debugf("Executing node %s with container template: %v\n", nodeName, tmpl) - _, err := woc.createWorkflowPod(nodeName, *tmpl.Container, tmpl) + _, err := woc.createWorkflowPod(nodeName, *tmpl.Container, tmpl, false) if err != nil { return woc.initializeNode(nodeName, wfv1.NodeTypePod, tmpl.Name, boundaryID, wfv1.NodeError, err.Error()) } @@ -1362,14 +1362,76 @@ func getTemplateOutputsFromScope(tmpl *wfv1.Template, scope *wfScope) (*wfv1.Out return &outputs, nil } +func HasOutputResultRef(name string, parentTmpl *wfv1.Template) bool { + + nodeName := "." + name + "." + + if parentTmpl.DAG != nil { + for _, dagTask := range parentTmpl.DAG.Tasks { + + if strings.Contains(dagTask.When, nodeName) { + return true + } + + for _, artf := range dagTask.Arguments.Parameters { + if strings.Contains(*artf.Value, "{{") && (strings.Contains(*artf.Value, nodeName) || (artf.ValueFrom != nil && strings.Contains(artf.ValueFrom.Parameter, nodeName))) { + return true + } + } + } + } + + if parentTmpl.Steps != nil { + for idx := range parentTmpl.Steps { + stepGroup := parentTmpl.Steps[idx] + for _, step := range stepGroup { + + if strings.Contains(step.When, nodeName) { + return true + } + for _, artf := range step.Arguments.Parameters { + if strings.Contains(*artf.Value, "{{") && (strings.Contains(*artf.Value, nodeName) || (artf.ValueFrom != nil && strings.Contains(artf.ValueFrom.Parameter, nodeName))) { + return true + } + } + } + } + } + + for _, param := range parentTmpl.Outputs.Parameters { + if strings.Contains(*param.Value, "{{") && (strings.Contains(*param.Value, nodeName) || (param.ValueFrom != nil && strings.Contains(param.ValueFrom.Parameter, nodeName))) { + return true + } + } + + return false +} + +func GetStepOrDAGTaskName(nodeName string) string { + if strings.Contains(nodeName, ".") { + return nodeName[strings.LastIndex(nodeName, ".")+1:] + } + return nodeName +} + func (woc *wfOperationCtx) executeScript(nodeName string, tmpl *wfv1.Template, boundaryID string) *wfv1.NodeStatus { + + boundaryNode := woc.wf.Status.Nodes[boundaryID] + parentTemplate := woc.wf.GetTemplate(boundaryNode.TemplateName) + + includeScriptOutput := false + if parentTemplate != nil { + name := GetStepOrDAGTaskName(nodeName) + includeScriptOutput = HasOutputResultRef(name, parentTemplate) + } node := woc.getNodeByName(nodeName) + if node != nil { return node } mainCtr := tmpl.Script.Container mainCtr.Args = append(mainCtr.Args, common.ExecutorScriptSourcePath) - _, err := woc.createWorkflowPod(nodeName, mainCtr, tmpl) + _, err := woc.createWorkflowPod(nodeName, mainCtr, tmpl, includeScriptOutput) if err != nil { return woc.initializeNode(nodeName, wfv1.NodeTypePod, tmpl.Name, boundaryID, wfv1.NodeError, err.Error()) } @@ -1604,7 +1666,7 @@ func (woc *wfOperationCtx) executeResource(nodeName string, tmpl *wfv1.Template, mainCtr.VolumeMounts = []apiv1.VolumeMount{ volumeMountPodMetadata, } - _, err = woc.createWorkflowPod(nodeName, *mainCtr, tmpl) + _, err = woc.createWorkflowPod(nodeName, *mainCtr, tmpl, false) if err != nil { return woc.initializeNode(nodeName, wfv1.NodeTypePod, tmpl.Name, boundaryID, wfv1.NodeError, err.Error()) } diff --git a/workflow/controller/operator_test.go b/workflow/controller/operator_test.go index fbe94183869c..b6891c436315 100644 --- a/workflow/controller/operator_test.go +++ b/workflow/controller/operator_test.go @@ -1159,3 +1159,73 @@ func TestResourceWithOwnerReferenceTemplate(t *testing.T) { assert.Equal(t, "resource-with-ownerreference-template", objectMetas["resource-cm-3"].OwnerReferences[1].Name) } } + +var stepScriptTmpl = ` +apiVersion: argoproj.io/v1alpha1 +kind: Workflow +metadata: + name: resource-with-ownerreference-template +spec: + entrypoint: start + templates: + - name: start + steps: + - - name: resource-1 + template: resource-1 + - name: resource-2 + template: resource-2 + arguments: + parameters: [{name: message, value: "{{steps.resource-1.output.result}}"}] + - name: resource-3 + template: resource-3 ` + +var dagScriptTmpl = ` +apiVersion: argoproj.io/v1alpha1 +kind: Workflow +metadata: + generateName: dag-target- +spec: + entrypoint: dag-target + arguments: + parameters: + - name: target + value: E + + templates: + - name: dag-target + dag: + tasks: + - name: A + template: echo + arguments: + parameters: [{name: message, value: A}] + - name: B + dependencies: [A] + template: echo + arguments: + parameters: [{name: message, value: B}] + - name: C + dependencies: [A] + template: echo + arguments: + parameters: [{name: message, value: "{{dag.A.output.result}}"}] + - name: D + dependencies: [B, C] + template: echo + arguments: + parameters: [{name: message, value: D}]` + +func TestGetNodeName(t *testing.T) { + + // Steps Workflow + wf := unmarshalWF(stepScriptTmpl) + + assert.True(t, HasOutputResultRef("resource-1", &wf.Spec.Templates[0])) + assert.False(t, HasOutputResultRef("resource-2", &wf.Spec.Templates[0])) + + // DAG workflow + wf = unmarshalWF(dagScriptTmpl) + + assert.True(t, HasOutputResultRef("A", &wf.Spec.Templates[0])) + assert.False(t, HasOutputResultRef("B", &wf.Spec.Templates[0])) +} diff --git a/workflow/controller/workflowpod.go b/workflow/controller/workflowpod.go index 8ca546dd65de..b456f9afa038 100644 --- a/workflow/controller/workflowpod.go +++ b/workflow/controller/workflowpod.go @@ -85,7 +85,7 @@ func (woc *wfOperationCtx) getVolumeDockerSock() apiv1.Volume { } } -func (woc *wfOperationCtx) createWorkflowPod(nodeName string, mainCtr apiv1.Container, tmpl *wfv1.Template) (*apiv1.Pod, error) { +func (woc *wfOperationCtx) createWorkflowPod(nodeName string, mainCtr apiv1.Container, tmpl *wfv1.Template, includeScriptOutput bool) (*apiv1.Pod, error) { nodeID := woc.wf.NodeID(nodeName) woc.log.Debugf("Creating Pod: %s (%s)", nodeName, nodeID) tmpl = tmpl.DeepCopy() @@ -106,6 +106,7 @@ func (woc *wfOperationCtx) createWorkflowPod(nodeName string, mainCtr apiv1.Cont }, Annotations: map[string]string{ common.AnnotationKeyNodeName: nodeName, + common.AnnotationIncludeOutputs: strconv.FormatBool(includeScriptOutput), }, OwnerReferences: []metav1.OwnerReference{ *metav1.NewControllerRef(woc.wf, wfv1.SchemaGroupVersionKind), diff --git a/workflow/executor/executor.go b/workflow/executor/executor.go index 698b36dc16eb..934d403d71bf 100644 --- a/workflow/executor/executor.go +++ b/workflow/executor/executor.go @@ -69,6 +69,8 @@ type WorkflowExecutor struct { // list of errors that occurred during execution. // the first of these is used as the overall message of the node errors []error + + ExcludeOutput bool } // ContainerRuntimeExecutor is the interface for interacting with a container runtime (e.g. docker) @@ -1101,6 +1103,58 @@ func LoadTemplate(path string) (*wfv1.Template, error) { return &tmpl, nil } +func GetAnnotationField(filePath string, key string) (string, error) { + // Read the annotation file + file, err := os.Open(filePath) + if err != nil { + log.Errorf("ERROR opening annotation file from %s", filePath) + return "", errors.InternalWrapError(err) + } + content := "" + defer func() { + _ = file.Close() + }() + reader := bufio.NewReader(file) + + // Prefix of key property in the annotation file + prefix := fmt.Sprintf("%s=", key) + + for { + // Read line-by-line + var buffer bytes.Buffer + var l []byte + var isPrefix bool + for { + l, isPrefix, err = reader.ReadLine() + buffer.Write(l) + // If we've reached the end of the line, stop reading. + if !isPrefix { + break + } + // If we're just at the EOF, break + if err != nil { + break + } + } + + line := buffer.String() + + // Read property + if strings.HasPrefix(line, prefix) { + // Trim the prefix + content = strings.TrimPrefix(line, prefix) + break + } + + // The end of the annotation file + if err == io.EOF { + break + } + } + + return content, nil +} + // unmarshalAnnotationField unmarshals the value of an annotation key into the supplied interface // from the downward api annotation volume file func unmarshalAnnotationField(filePath string, key string, into interface{}) error { From 14af0f72874a62322a419a6884b0da37bc55e11f Mon Sep 17 00:00:00 2001 From: Saravanan Balasubramanian Date: Wed, 3 Jul 2019 15:44:33 -0700 Subject: [PATCH 06/21] Update operator.go --- workflow/controller/operator.go | 7 +++++++ 1 file changed, 7 insertions(+) diff --git a/workflow/controller/operator.go b/workflow/controller/operator.go index f5dd4bc30f98..8ef83656d8f3 100644 --- a/workflow/controller/operator.go +++ b/workflow/controller/operator.go @@ -1365,6 +1365,7 @@ func getTemplateOutputsFromScope(tmpl *wfv1.Template, scope *wfScope) (*wfv1.Out func HasOutputResultRef(name string, parentTmpl *wfv1.Template) bool { nodeName := "." + name + "." + fmt.Println(nodeName) if parentTmpl.DAG != nil { for _, dagTask := range parentTmpl.DAG.Tasks { @@ -1409,6 +1410,11 @@ func HasOutputResultRef(name string, parentTmpl *wfv1.Template) bool { func GetStepOrDAGTaskName(nodeName string) string { if strings.Contains(nodeName, ".") { + name := nodeName[strings.LastIndex(nodeName, ".")+1:] + // Check retry scenario + if strings.Contains(name, "(") { + return name[0:strings.Index(name, "(")] + } return nodeName[strings.LastIndex(nodeName, ".")+1:] } return nodeName @@ -1424,6 +1430,7 @@ func (woc *wfOperationCtx) executeScript(nodeName string, tmpl *wfv1.Template, b name := GetStepOrDAGTaskName(nodeName) includeScriptOutput = HasOutputResultRef(name, parentTemplate) } + fmt.Println(includeScriptOutput) node := woc.getNodeByName(nodeName) if node != nil { From e94f5b6bbeec99cdddd532c863bf129d4a51319e Mon Sep 17 00:00:00 2001 From: Saravanan Balasubramanian Date: Wed, 3 Jul 2019 15:51:50 -0700 Subject: [PATCH 07/21] Update workflowpod.go --- workflow/controller/workflowpod.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/workflow/controller/workflowpod.go b/workflow/controller/workflowpod.go index b456f9afa038..33dddbe89d19 100644 --- a/workflow/controller/workflowpod.go +++ b/workflow/controller/workflowpod.go @@ -105,7 +105,7 @@ func (woc *wfOperationCtx) createWorkflowPod(nodeName string, mainCtr apiv1.Cont common.LabelKeyCompleted: "false", // Allows filtering by incomplete workflow pods }, Annotations: map[string]string{ - common.AnnotationKeyNodeName: nodeName, + common.AnnotationKeyNodeName: nodeName, common.AnnotationIncludeOutputs: strconv.FormatBool(includeScriptOutput), }, OwnerReferences: []metav1.OwnerReference{ From 9693af7220c095adad758a83d290280642dfbf77 Mon Sep 17 00:00:00 2001 From: Saravanan Balasubramanian Date: Tue, 16 Jul 2019 13:07:50 -0700 Subject: [PATCH 08/21] updated Review Comments --- cmd/argoexec/commands/wait.go | 18 ++---------------- workflow/common/common.go | 5 ++--- workflow/controller/operator.go | 1 - workflow/controller/workflowpod.go | 28 ++++++++++++++++------------ 4 files changed, 20 insertions(+), 32 deletions(-) diff --git a/cmd/argoexec/commands/wait.go b/cmd/argoexec/commands/wait.go index f08421f5c8bf..4f0591f4472e 100644 --- a/cmd/argoexec/commands/wait.go +++ b/cmd/argoexec/commands/wait.go @@ -1,10 +1,6 @@ package commands import ( - "github.com/argoproj/argo/workflow/common" - "github.com/argoproj/argo/workflow/executor" - "strconv" - "strings" "time" "github.com/argoproj/pkg/stats" @@ -32,18 +28,8 @@ func waitContainer() error { defer stats.LogStats() stats.StartStatsTicker(5 * time.Minute) - includeOutput, err := executor.GetAnnotationField(wfExecutor.PodAnnotationsPath, common.AnnotationIncludeOutputs) - if err != nil { - wfExecutor.AddError(err) - } - - includeOutputFlag, err := strconv.ParseBool(strings.Trim(includeOutput, "\"")) - - if err != nil { - wfExecutor.AddError(err) - } // Wait for main container to complete - err = wfExecutor.Wait() + err := wfExecutor.Wait() if err != nil { wfExecutor.AddError(err) // do not return here so we can still try to kill sidecars & save outputs @@ -71,7 +57,7 @@ func waitContainer() error { return err } - if includeOutputFlag { + if wfExecutor.ExecutionControl != nil && wfExecutor.ExecutionControl.IncludeScriptOutput { // Capture output script result err = wfExecutor.CaptureScriptResult() if err != nil { diff --git a/workflow/common/common.go b/workflow/common/common.go index d4f80b10c2d4..633a685ef3e2 100644 --- a/workflow/common/common.go +++ b/workflow/common/common.go @@ -35,9 +35,6 @@ const ( // AnnotationKeyNodeName is the pod metadata annotation key containing the workflow node name AnnotationKeyNodeName = workflow.FullName + "/node-name" - // AnnotationIncludeOutputs is the pod metadata annotation key containing flag to include script output - AnnotationIncludeOutputs = workflow.FullName + "/include-output" - // AnnotationKeyNodeMessage is the pod metadata annotation key the executor will use to // communicate errors encountered by the executor during artifact load/save, etc... AnnotationKeyNodeMessage = workflow.FullName + "/node-message" @@ -132,6 +129,8 @@ type ExecutionControl struct { // It is used to signal the executor to terminate a daemoned container. In the future it will be // used to support workflow or steps/dag level timeouts. Deadline *time.Time `json:"deadline,omitempty"` + // IncludeScriptOutput is containing flag to include script output + IncludeScriptOutput bool `json:"includeScriptOutput,omitemptysssss"` } type ResourceInterface interface { diff --git a/workflow/controller/operator.go b/workflow/controller/operator.go index 8ef83656d8f3..ab3ba4e96f44 100644 --- a/workflow/controller/operator.go +++ b/workflow/controller/operator.go @@ -1365,7 +1365,6 @@ func getTemplateOutputsFromScope(tmpl *wfv1.Template, scope *wfScope) (*wfv1.Out func HasOutputResultRef(name string, parentTmpl *wfv1.Template) bool { nodeName := "." + name + "." - fmt.Println(nodeName) if parentTmpl.DAG != nil { for _, dagTask := range parentTmpl.DAG.Tasks { diff --git a/workflow/controller/workflowpod.go b/workflow/controller/workflowpod.go index 33dddbe89d19..95a3348cc94d 100644 --- a/workflow/controller/workflowpod.go +++ b/workflow/controller/workflowpod.go @@ -105,8 +105,7 @@ func (woc *wfOperationCtx) createWorkflowPod(nodeName string, mainCtr apiv1.Cont common.LabelKeyCompleted: "false", // Allows filtering by incomplete workflow pods }, Annotations: map[string]string{ - common.AnnotationKeyNodeName: nodeName, - common.AnnotationIncludeOutputs: strconv.FormatBool(includeScriptOutput), + common.AnnotationKeyNodeName: nodeName, }, OwnerReferences: []metav1.OwnerReference{ *metav1.NewControllerRef(woc.wf, wfv1.SchemaGroupVersionKind), @@ -169,7 +168,7 @@ func (woc *wfOperationCtx) createWorkflowPod(nodeName string, mainCtr apiv1.Cont } addSchedulingConstraints(pod, wfSpec, tmpl) - woc.addMetadata(pod, tmpl) + woc.addMetadata(pod, tmpl, includeScriptOutput) err = addVolumeReferences(pod, woc.volumes, tmpl, woc.wf.Status.PersistentVolumeClaims) if err != nil { @@ -447,23 +446,28 @@ func isResourcesSpecified(ctr *apiv1.Container) bool { } // addMetadata applies metadata specified in the template -func (woc *wfOperationCtx) addMetadata(pod *apiv1.Pod, tmpl *wfv1.Template) { +func (woc *wfOperationCtx) addMetadata(pod *apiv1.Pod, tmpl *wfv1.Template, includeScriptOutput bool) { for k, v := range tmpl.Metadata.Annotations { pod.ObjectMeta.Annotations[k] = v } for k, v := range tmpl.Metadata.Labels { pod.ObjectMeta.Labels[k] = v } + + execCtl := common.ExecutionControl{ + IncludeScriptOutput: includeScriptOutput, + } + if woc.workflowDeadline != nil { - execCtl := common.ExecutionControl{ - Deadline: woc.workflowDeadline, - } - execCtlBytes, err := json.Marshal(execCtl) - if err != nil { - panic(err) - } - pod.ObjectMeta.Annotations[common.AnnotationKeyExecutionControl] = string(execCtlBytes) + execCtl.Deadline = woc.workflowDeadline + } + execCtlBytes, err := json.Marshal(execCtl) + if err != nil { + panic(err) + } + + pod.ObjectMeta.Annotations[common.AnnotationKeyExecutionControl] = string(execCtlBytes) } // addSchedulingConstraints applies any node selectors or affinity rules to the pod, either set in the workflow or the template From 730c7637f744ad80babdb17959779c6fff77aa8b Mon Sep 17 00:00:00 2001 From: Saravanan Balasubramanian Date: Wed, 17 Jul 2019 14:56:43 -0700 Subject: [PATCH 09/21] Update exec_control.go --- workflow/controller/exec_control.go | 19 ++++++++++--------- 1 file changed, 10 insertions(+), 9 deletions(-) diff --git a/workflow/controller/exec_control.go b/workflow/controller/exec_control.go index b9ab95e4603f..fa2c4f5f0cc5 100644 --- a/workflow/controller/exec_control.go +++ b/workflow/controller/exec_control.go @@ -48,10 +48,6 @@ func (woc *wfOperationCtx) applyExecutionControl(pod *apiv1.Pod, wfNodesLock *sy } } - // Now ensure the pod's current annotation matches our desired deadline - desiredExecCtl := common.ExecutionControl{ - Deadline: woc.workflowDeadline, - } var podExecCtl common.ExecutionControl if execCtlStr, ok := pod.Annotations[common.AnnotationKeyExecutionControl]; ok && execCtlStr != "" { err := json.Unmarshal([]byte(execCtlStr), &podExecCtl) @@ -59,21 +55,26 @@ func (woc *wfOperationCtx) applyExecutionControl(pod *apiv1.Pod, wfNodesLock *sy woc.log.Warnf("Failed to unmarshal execution control from pod %s", pod.Name) } } - if podExecCtl.Deadline == nil && desiredExecCtl.Deadline == nil { + if podExecCtl.Deadline == nil && woc.workflowDeadline == nil { return nil - } else if podExecCtl.Deadline != nil && desiredExecCtl.Deadline != nil { - if podExecCtl.Deadline.Equal(*desiredExecCtl.Deadline) { + } else if podExecCtl.Deadline != nil && woc.workflowDeadline != nil { + if podExecCtl.Deadline.Equal(*woc.workflowDeadline) { return nil } } + if podExecCtl.Deadline != nil && podExecCtl.Deadline.IsZero() { // If the pod has already been explicitly signaled to terminate, then do nothing. // This can happen when daemon steps are terminated. woc.log.Infof("Skipping sync of execution control of pod %s. pod has been signaled to terminate", pod.Name) return nil } - woc.log.Infof("Execution control for pod %s out-of-sync desired: %v, actual: %v", pod.Name, desiredExecCtl.Deadline, podExecCtl.Deadline) - return woc.updateExecutionControl(pod.Name, desiredExecCtl) + + // Assign new Dealing value to PodExeCtl + podExecCtl.Deadline = woc.workflowDeadline + + woc.log.Infof("Execution control for pod %s out-of-sync desired: %v, actual: %v", pod.Name, woc.workflowDeadline, podExecCtl.Deadline) + return woc.updateExecutionControl(pod.Name, podExecCtl) } // killDaemonedChildren kill any daemoned pods of a steps or DAG template node. From 3a7a544c0a16b4aaad9844cf8c2c09af52a0b5b1 Mon Sep 17 00:00:00 2001 From: Saravanan Balasubramanian Date: Thu, 18 Jul 2019 17:39:25 -0700 Subject: [PATCH 10/21] fixed typo --- workflow/common/common.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/workflow/common/common.go b/workflow/common/common.go index 633a685ef3e2..cb6ef2aa6501 100644 --- a/workflow/common/common.go +++ b/workflow/common/common.go @@ -130,7 +130,7 @@ type ExecutionControl struct { // used to support workflow or steps/dag level timeouts. Deadline *time.Time `json:"deadline,omitempty"` // IncludeScriptOutput is containing flag to include script output - IncludeScriptOutput bool `json:"includeScriptOutput,omitemptysssss"` + IncludeScriptOutput bool `json:"includeScriptOutput,omitempty"` } type ResourceInterface interface { From 9fd31099b686f55847963ebe6c5b8c6d26156b3e Mon Sep 17 00:00:00 2001 From: Saravanan Balasubramanian Date: Fri, 19 Jul 2019 10:43:12 -0700 Subject: [PATCH 11/21] Updated Review comments --- cmd/argoexec/commands/wait.go | 12 ++++--- workflow/controller/exec_control.go | 2 +- workflow/controller/operator.go | 32 ++++++++++++----- workflow/controller/operator_test.go | 8 ++--- workflow/controller/workflowpod.go | 12 ++++--- workflow/executor/executor.go | 54 ---------------------------- 6 files changed, 42 insertions(+), 78 deletions(-) diff --git a/cmd/argoexec/commands/wait.go b/cmd/argoexec/commands/wait.go index 4f0591f4472e..e0b826ceb252 100644 --- a/cmd/argoexec/commands/wait.go +++ b/cmd/argoexec/commands/wait.go @@ -65,11 +65,13 @@ func waitContainer() error { return err } - err = wfExecutor.AnnotateOutputs(logArt) - if err != nil { - wfExecutor.AddError(err) - return err - } } + + err = wfExecutor.AnnotateOutputs(logArt) + if err != nil { + wfExecutor.AddError(err) + return err + } + return nil } diff --git a/workflow/controller/exec_control.go b/workflow/controller/exec_control.go index fa2c4f5f0cc5..c1b7ab1aa5d9 100644 --- a/workflow/controller/exec_control.go +++ b/workflow/controller/exec_control.go @@ -70,7 +70,7 @@ func (woc *wfOperationCtx) applyExecutionControl(pod *apiv1.Pod, wfNodesLock *sy return nil } - // Assign new Dealing value to PodExeCtl + // Assign new deadline value to PodExeCtl podExecCtl.Deadline = woc.workflowDeadline woc.log.Infof("Execution control for pod %s out-of-sync desired: %v, actual: %v", pod.Name, woc.workflowDeadline, podExecCtl.Deadline) diff --git a/workflow/controller/operator.go b/workflow/controller/operator.go index ab3ba4e96f44..12e3fd05d1e5 100644 --- a/workflow/controller/operator.go +++ b/workflow/controller/operator.go @@ -1362,19 +1362,23 @@ func getTemplateOutputsFromScope(tmpl *wfv1.Template, scope *wfScope) (*wfv1.Out return &outputs, nil } -func HasOutputResultRef(name string, parentTmpl *wfv1.Template) bool { +// hasOutputResultRef will check given template output has any reference +func hasOutputResultRef(name string, parentTmpl *wfv1.Template) bool { nodeName := "." + name + "." if parentTmpl.DAG != nil { + nodeOutputName := "{{tasks." + name + ".outputs.result}}" for _, dagTask := range parentTmpl.DAG.Tasks { - if strings.Contains(dagTask.When, nodeName) { + if dagTask.When == nodeOutputName { return true } for _, artf := range dagTask.Arguments.Parameters { - if strings.Contains(*artf.Value, "{{") && (strings.Contains(*artf.Value, nodeName) || (artf.ValueFrom != nil && strings.Contains(artf.ValueFrom.Parameter, nodeName))) { + if strings.Contains(*artf.Value, "{{") && + (strings.Contains(*artf.Value, nodeName) || + (artf.ValueFrom != nil && artf.ValueFrom.Parameter == nodeOutputName)) { return true } } @@ -1382,15 +1386,18 @@ func HasOutputResultRef(name string, parentTmpl *wfv1.Template) bool { } if parentTmpl.Steps != nil { + nodeOutputName := "{{steps." + name + ".outputs.result}}" for idx := range parentTmpl.Steps { stepGroup := parentTmpl.Steps[idx] for _, step := range stepGroup { - if strings.Contains(step.When, nodeName) { + if step.When == nodeOutputName { return true } for _, artf := range step.Arguments.Parameters { - if strings.Contains(*artf.Value, "{{") && (strings.Contains(*artf.Value, nodeName) || (artf.ValueFrom != nil && strings.Contains(artf.ValueFrom.Parameter, nodeName))) { + if strings.Contains(*artf.Value, "{{") && + (strings.Contains(*artf.Value, nodeName) || + (artf.ValueFrom != nil && artf.ValueFrom.Parameter == nodeOutputName)) { return true } } @@ -1399,7 +1406,13 @@ func HasOutputResultRef(name string, parentTmpl *wfv1.Template) bool { } for _, param := range parentTmpl.Outputs.Parameters { - if strings.Contains(*param.Value, "{{") && (strings.Contains(*param.Value, nodeName) || (param.ValueFrom != nil && strings.Contains(param.ValueFrom.Parameter, nodeName))) { + taskNodeOutputName := "{{tasks." + name + ".outputs.result}}" + stepNodeOutputName := "{{steps." + name + ".outputs.result}}" + + if strings.Contains(*param.Value, "{{") && + (strings.Contains(*param.Value, nodeName) || + (param.ValueFrom != nil && (param.ValueFrom.Parameter == taskNodeOutputName || + param.ValueFrom.Parameter == stepNodeOutputName))) { return true } } @@ -1407,7 +1420,8 @@ func HasOutputResultRef(name string, parentTmpl *wfv1.Template) bool { return false } -func GetStepOrDAGTaskName(nodeName string) string { +// getStepOrDAGTaskName will extract the node from NodeStatus Name +func getStepOrDAGTaskName(nodeName string) string { if strings.Contains(nodeName, ".") { name := nodeName[strings.LastIndex(nodeName, ".")+1:] // Check retry scenario @@ -1426,8 +1440,8 @@ func (woc *wfOperationCtx) executeScript(nodeName string, tmpl *wfv1.Template, b includeScriptOutput := false if parentTemplate != nil { - name := GetStepOrDAGTaskName(nodeName) - includeScriptOutput = HasOutputResultRef(name, parentTemplate) + name := getStepOrDAGTaskName(nodeName) + includeScriptOutput = hasOutputResultRef(name, parentTemplate) } fmt.Println(includeScriptOutput) node := woc.getNodeByName(nodeName) diff --git a/workflow/controller/operator_test.go b/workflow/controller/operator_test.go index b6891c436315..557c94381ec9 100644 --- a/workflow/controller/operator_test.go +++ b/workflow/controller/operator_test.go @@ -1220,12 +1220,12 @@ func TestGetNodeName(t *testing.T) { // Steps Workflow wf := unmarshalWF(stepScriptTmpl) - assert.True(t, HasOutputResultRef("resource-1", &wf.Spec.Templates[0])) - assert.False(t, HasOutputResultRef("resource-2", &wf.Spec.Templates[0])) + assert.True(t, hasOutputResultRef("resource-1", &wf.Spec.Templates[0])) + assert.False(t, hasOutputResultRef("resource-2", &wf.Spec.Templates[0])) // DAG workflow wf = unmarshalWF(dagScriptTmpl) - assert.True(t, HasOutputResultRef("A", &wf.Spec.Templates[0])) - assert.False(t, HasOutputResultRef("B", &wf.Spec.Templates[0])) + assert.True(t, hasOutputResultRef("A", &wf.Spec.Templates[0])) + assert.False(t, hasOutputResultRef("B", &wf.Spec.Templates[0])) } diff --git a/workflow/controller/workflowpod.go b/workflow/controller/workflowpod.go index 95a3348cc94d..4323aa1c5b6b 100644 --- a/workflow/controller/workflowpod.go +++ b/workflow/controller/workflowpod.go @@ -462,12 +462,14 @@ func (woc *wfOperationCtx) addMetadata(pod *apiv1.Pod, tmpl *wfv1.Template, incl execCtl.Deadline = woc.workflowDeadline } - execCtlBytes, err := json.Marshal(execCtl) - if err != nil { - panic(err) - } + if woc.workflowDeadline != nil || includeScriptOutput { + execCtlBytes, err := json.Marshal(execCtl) + if err != nil { + panic(err) + } - pod.ObjectMeta.Annotations[common.AnnotationKeyExecutionControl] = string(execCtlBytes) + pod.ObjectMeta.Annotations[common.AnnotationKeyExecutionControl] = string(execCtlBytes) + } } // addSchedulingConstraints applies any node selectors or affinity rules to the pod, either set in the workflow or the template diff --git a/workflow/executor/executor.go b/workflow/executor/executor.go index 934d403d71bf..698b36dc16eb 100644 --- a/workflow/executor/executor.go +++ b/workflow/executor/executor.go @@ -69,8 +69,6 @@ type WorkflowExecutor struct { // list of errors that occurred during execution. // the first of these is used as the overall message of the node errors []error - - ExcludeOutput bool } // ContainerRuntimeExecutor is the interface for interacting with a container runtime (e.g. docker) @@ -1103,58 +1101,6 @@ func LoadTemplate(path string) (*wfv1.Template, error) { return &tmpl, nil } -func GetAnnotationField(filePath string, key string) (string, error) { - // Read the annotation file - file, err := os.Open(filePath) - if err != nil { - log.Errorf("ERROR opening annotation file from %s", filePath) - return "", errors.InternalWrapError(err) - } - content := "" - defer func() { - _ = file.Close() - }() - reader := bufio.NewReader(file) - - // Prefix of key property in the annotation file - prefix := fmt.Sprintf("%s=", key) - - for { - // Read line-by-line - var buffer bytes.Buffer - var l []byte - var isPrefix bool - for { - l, isPrefix, err = reader.ReadLine() - buffer.Write(l) - // If we've reached the end of the line, stop reading. - if !isPrefix { - break - } - // If we're just at the EOF, break - if err != nil { - break - } - } - - line := buffer.String() - - // Read property - if strings.HasPrefix(line, prefix) { - // Trim the prefix - content = strings.TrimPrefix(line, prefix) - break - } - - // The end of the annotation file - if err == io.EOF { - break - } - } - - return content, nil -} - // unmarshalAnnotationField unmarshals the value of an annotation key into the supplied interface // from the downward api annotation volume file func unmarshalAnnotationField(filePath string, key string, into interface{}) error { From 4d65f7567a80e42df3627fc640451f92abdefaa0 Mon Sep 17 00:00:00 2001 From: Saravanan Balasubramanian Date: Mon, 22 Jul 2019 13:51:00 -0700 Subject: [PATCH 12/21] updated review comments --- cmd/argoexec/commands/wait.go | 3 +- workflow/controller/operator.go | 4 +- workflow/controller/operator_test.go | 98 +++++++++++++++++++++------- workflow/executor/executor.go | 5 ++ 4 files changed, 84 insertions(+), 26 deletions(-) diff --git a/cmd/argoexec/commands/wait.go b/cmd/argoexec/commands/wait.go index e0b826ceb252..1de38adb5ce2 100644 --- a/cmd/argoexec/commands/wait.go +++ b/cmd/argoexec/commands/wait.go @@ -57,7 +57,7 @@ func waitContainer() error { return err } - if wfExecutor.ExecutionControl != nil && wfExecutor.ExecutionControl.IncludeScriptOutput { + // Capture output script result err = wfExecutor.CaptureScriptResult() if err != nil { @@ -65,7 +65,6 @@ func waitContainer() error { return err } - } err = wfExecutor.AnnotateOutputs(logArt) if err != nil { diff --git a/workflow/controller/operator.go b/workflow/controller/operator.go index 12e3fd05d1e5..95adca43942d 100644 --- a/workflow/controller/operator.go +++ b/workflow/controller/operator.go @@ -1422,13 +1422,14 @@ func hasOutputResultRef(name string, parentTmpl *wfv1.Template) bool { // getStepOrDAGTaskName will extract the node from NodeStatus Name func getStepOrDAGTaskName(nodeName string) string { + fmt.Println("NodeName:",nodeName) if strings.Contains(nodeName, ".") { name := nodeName[strings.LastIndex(nodeName, ".")+1:] // Check retry scenario if strings.Contains(name, "(") { return name[0:strings.Index(name, "(")] } - return nodeName[strings.LastIndex(nodeName, ".")+1:] + return name } return nodeName } @@ -1443,7 +1444,6 @@ func (woc *wfOperationCtx) executeScript(nodeName string, tmpl *wfv1.Template, b name := getStepOrDAGTaskName(nodeName) includeScriptOutput = hasOutputResultRef(name, parentTemplate) } - fmt.Println(includeScriptOutput) node := woc.getNodeByName(nodeName) if node != nil { diff --git a/workflow/controller/operator_test.go b/workflow/controller/operator_test.go index 557c94381ec9..a4d864f0ca8c 100644 --- a/workflow/controller/operator_test.go +++ b/workflow/controller/operator_test.go @@ -3,6 +3,7 @@ package controller import ( "fmt" "github.com/argoproj/argo/workflow/config" + "strings" "testing" wfv1 "github.com/argoproj/argo/pkg/apis/workflow/v1alpha1" @@ -1164,20 +1165,37 @@ var stepScriptTmpl = ` apiVersion: argoproj.io/v1alpha1 kind: Workflow metadata: - name: resource-with-ownerreference-template + generateName: scripts-bash- spec: - entrypoint: start + entrypoint: bash-script-example templates: - - name: start + - name: bash-script-example steps: - - - name: resource-1 - template: resource-1 - - name: resource-2 - template: resource-2 + - - name: generate + template: gen-random-int + - - name: print + template: print-message arguments: - parameters: [{name: message, value: "{{steps.resource-1.output.result}}"}] - - name: resource-3 - template: resource-3 ` + parameters: + - name: message + value: "{{steps.generate.outputs.result}}" + + - name: gen-random-int + script: + image: debian:9.4 + command: [bash] + source: | + cat /dev/urandom | od -N2 -An -i | awk -v f=1 -v r=100 '{printf "%i\n", f + r * $1 / 65536}' + + - name: print-message + inputs: + parameters: + - name: message + container: + image: alpine:latest + command: [sh, -c] + args: ["echo result was: {{inputs.parameters.message}}"] +` var dagScriptTmpl = ` apiVersion: argoproj.io/v1alpha1 @@ -1200,7 +1218,6 @@ spec: arguments: parameters: [{name: message, value: A}] - name: B - dependencies: [A] template: echo arguments: parameters: [{name: message, value: B}] @@ -1209,23 +1226,60 @@ spec: template: echo arguments: parameters: [{name: message, value: "{{dag.A.output.result}}"}] - - name: D - dependencies: [B, C] - template: echo - arguments: - parameters: [{name: message, value: D}]` + - name: echo + script: + image: debian:9.4 + command: [bash] + source: | + cat /dev/urandom | od -N2 -An -i | awk -v f=1 -v r=100 '{printf "%i\n", f + r * $1 / 65536}'` -func TestGetNodeName(t *testing.T) { +func TestStepWFGetNodeName(t *testing.T) { - // Steps Workflow + controller := newController() + wfcset := controller.wfclientset.ArgoprojV1alpha1().Workflows("") + + // operate the workflow. it should create a pod. wf := unmarshalWF(stepScriptTmpl) + wf, err := wfcset.Create(wf) + assert.Nil(t, err) + assert.True(t, hasOutputResultRef("generate", &wf.Spec.Templates[0])) + assert.False(t, hasOutputResultRef("print-message", &wf.Spec.Templates[0])) + woc := newWorkflowOperationCtx(wf, controller) + woc.operate() + wf, err = wfcset.Get(wf.ObjectMeta.Name, metav1.GetOptions{}) + assert.Nil(t, err) + assert.Equal(t, 3, len(wf.Status.Nodes)) + for _, node := range wf.Status.Nodes { + if strings.Contains(node.Name, "generate") { + assert.True(t, getStepOrDAGTaskName(node.Name) == "generate") + } else if strings.Contains(node.Name, "print-message") { + assert.True(t, getStepOrDAGTaskName(node.Name) == "print-message") + } + } +} - assert.True(t, hasOutputResultRef("resource-1", &wf.Spec.Templates[0])) - assert.False(t, hasOutputResultRef("resource-2", &wf.Spec.Templates[0])) +func TestDAGWFGetNodeName(t *testing.T) { - // DAG workflow - wf = unmarshalWF(dagScriptTmpl) + controller := newController() + wfcset := controller.wfclientset.ArgoprojV1alpha1().Workflows("") + // operate the workflow. it should create a pod. + wf := unmarshalWF(dagScriptTmpl) + wf, err := wfcset.Create(wf) + assert.Nil(t, err) assert.True(t, hasOutputResultRef("A", &wf.Spec.Templates[0])) assert.False(t, hasOutputResultRef("B", &wf.Spec.Templates[0])) + woc := newWorkflowOperationCtx(wf, controller) + woc.operate() + wf, err = wfcset.Get(wf.ObjectMeta.Name, metav1.GetOptions{}) + assert.Nil(t, err) + assert.Equal(t, 3, len(wf.Status.Nodes)) + for _, node := range wf.Status.Nodes { + if strings.Contains(node.Name, ".A") { + assert.True(t, getStepOrDAGTaskName(node.Name) == "A") + } + if strings.Contains(node.Name, ".B") { + assert.True(t, getStepOrDAGTaskName(node.Name) == "B") + } + } } diff --git a/workflow/executor/executor.go b/workflow/executor/executor.go index 698b36dc16eb..2cd1de42010c 100644 --- a/workflow/executor/executor.go +++ b/workflow/executor/executor.go @@ -732,6 +732,11 @@ func (we *WorkflowExecutor) GetMainContainerID() (string, error) { // CaptureScriptResult will add the stdout of a script template as output result func (we *WorkflowExecutor) CaptureScriptResult() error { + + if we.ExecutionControl != nil && we.ExecutionControl.IncludeScriptOutput { + log.Infof("No Script output reference in workflow. Capturing script output ignored") + return nil + } if we.Template.Script == nil { return nil } From 08779f0ab28f9c4b8e3d8c8a0322db6855d58a75 Mon Sep 17 00:00:00 2001 From: Saravanan Balasubramanian Date: Mon, 22 Jul 2019 14:22:03 -0700 Subject: [PATCH 13/21] Update wait.go --- cmd/argoexec/commands/wait.go | 14 ++++++-------- 1 file changed, 6 insertions(+), 8 deletions(-) diff --git a/cmd/argoexec/commands/wait.go b/cmd/argoexec/commands/wait.go index 1de38adb5ce2..0fd10630725b 100644 --- a/cmd/argoexec/commands/wait.go +++ b/cmd/argoexec/commands/wait.go @@ -57,14 +57,12 @@ func waitContainer() error { return err } - - // Capture output script result - err = wfExecutor.CaptureScriptResult() - if err != nil { - wfExecutor.AddError(err) - return err - } - + // Capture output script result + err = wfExecutor.CaptureScriptResult() + if err != nil { + wfExecutor.AddError(err) + return err + } err = wfExecutor.AnnotateOutputs(logArt) if err != nil { From a8c5c97360558a0012fdb1a6d7f10e1143857b08 Mon Sep 17 00:00:00 2001 From: Saravanan Balasubramanian Date: Mon, 22 Jul 2019 15:12:15 -0700 Subject: [PATCH 14/21] Update operator.go --- workflow/controller/operator.go | 1 - 1 file changed, 1 deletion(-) diff --git a/workflow/controller/operator.go b/workflow/controller/operator.go index 95adca43942d..df2833d0a574 100644 --- a/workflow/controller/operator.go +++ b/workflow/controller/operator.go @@ -1422,7 +1422,6 @@ func hasOutputResultRef(name string, parentTmpl *wfv1.Template) bool { // getStepOrDAGTaskName will extract the node from NodeStatus Name func getStepOrDAGTaskName(nodeName string) string { - fmt.Println("NodeName:",nodeName) if strings.Contains(nodeName, ".") { name := nodeName[strings.LastIndex(nodeName, ".")+1:] // Check retry scenario From 413c1f579ef0cf233ca9f38ddbb3cc376473bbc9 Mon Sep 17 00:00:00 2001 From: Saravanan Balasubramanian Date: Mon, 22 Jul 2019 16:00:32 -0700 Subject: [PATCH 15/21] fixed merge issue --- workflow/controller/dag.go | 2 +- workflow/controller/operator_test.go | 5 +++-- 2 files changed, 4 insertions(+), 3 deletions(-) diff --git a/workflow/controller/dag.go b/workflow/controller/dag.go index 8d338c4ae891..80b17e3b6aaf 100644 --- a/workflow/controller/dag.go +++ b/workflow/controller/dag.go @@ -66,7 +66,7 @@ func (d *dagContext) assertBranchFinished(targetTaskName string) bool { // We should ensure that from the bottom to the top, // all the nodes of this branch have at least one failure. // If successful, we should continue to run down until the leaf node - taskNode := d.getTaskNode(targetTaskName) + taskNode := d.GetTaskNode(targetTaskName) if taskNode == nil { taskObject := d.getTask(targetTaskName) if taskObject != nil { diff --git a/workflow/controller/operator_test.go b/workflow/controller/operator_test.go index 90f17704d0b2..0de8bcdf54a3 100644 --- a/workflow/controller/operator_test.go +++ b/workflow/controller/operator_test.go @@ -1282,6 +1282,7 @@ func TestDAGWFGetNodeName(t *testing.T) { assert.True(t, getStepOrDAGTaskName(node.Name) == "B") } } +} var withParamAsJsonList = ` apiVersion: argoproj.io/v1alpha1 @@ -1314,7 +1315,7 @@ spec: args: ["echo result was: {{inputs.parameters.message}}"] ` -func TestWithParamAsJsonList(t *testing.T) { +func TestWithParamAsJsonList(t *testing.T){ controller := newController() wfcset := controller.wfclientset.ArgoprojV1alpha1().Workflows("") @@ -1327,4 +1328,4 @@ func TestWithParamAsJsonList(t *testing.T) { pods, err := controller.kubeclientset.CoreV1().Pods("").List(metav1.ListOptions{}) assert.Nil(t, err) assert.Equal(t, 4, len(pods.Items)) -} +} \ No newline at end of file From a70662d6f59ac091258d40cdad1fd2237c3d2aaa Mon Sep 17 00:00:00 2001 From: Saravanan Balasubramanian Date: Mon, 22 Jul 2019 16:31:32 -0700 Subject: [PATCH 16/21] Update operator_test.go --- workflow/controller/operator_test.go | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/workflow/controller/operator_test.go b/workflow/controller/operator_test.go index 0de8bcdf54a3..4ef124acebc4 100644 --- a/workflow/controller/operator_test.go +++ b/workflow/controller/operator_test.go @@ -1315,7 +1315,7 @@ spec: args: ["echo result was: {{inputs.parameters.message}}"] ` -func TestWithParamAsJsonList(t *testing.T){ +func TestWithParamAsJsonList(t *testing.T) { controller := newController() wfcset := controller.wfclientset.ArgoprojV1alpha1().Workflows("") @@ -1328,4 +1328,4 @@ func TestWithParamAsJsonList(t *testing.T){ pods, err := controller.kubeclientset.CoreV1().Pods("").List(metav1.ListOptions{}) assert.Nil(t, err) assert.Equal(t, 4, len(pods.Items)) -} \ No newline at end of file +} From 164519b9301e9753898ee30696cfe17c3f28b86a Mon Sep 17 00:00:00 2001 From: Saravanan Balasubramanian Date: Wed, 24 Jul 2019 18:38:49 -0700 Subject: [PATCH 17/21] updated review comments --- workflow/controller/operator.go | 66 +++++++++------------------------ workflow/executor/executor.go | 2 +- 2 files changed, 18 insertions(+), 50 deletions(-) diff --git a/workflow/controller/operator.go b/workflow/controller/operator.go index 726cd86a9b15..b319bcb4d380 100644 --- a/workflow/controller/operator.go +++ b/workflow/controller/operator.go @@ -1365,62 +1365,30 @@ func getTemplateOutputsFromScope(tmpl *wfv1.Template, scope *wfScope) (*wfv1.Out return &outputs, nil } -// hasOutputResultRef will check given template output has any reference -func hasOutputResultRef(name string, parentTmpl *wfv1.Template) bool { - - nodeName := "." + name + "." - - if parentTmpl.DAG != nil { - nodeOutputName := "{{tasks." + name + ".outputs.result}}" - for _, dagTask := range parentTmpl.DAG.Tasks { - - if dagTask.When == nodeOutputName { - return true - } - - for _, artf := range dagTask.Arguments.Parameters { - if strings.Contains(*artf.Value, "{{") && - (strings.Contains(*artf.Value, nodeName) || - (artf.ValueFrom != nil && artf.ValueFrom.Parameter == nodeOutputName)) { - return true - } - } - } +func checkVariableRefInTmpl(template *wfv1.Template, variable string) bool { + jsonValue, err := json.Marshal(template) + if err != nil { + log.Warnf("Unable to marshal the template. %v, %v", template, err) } + jsonStr := string(jsonValue) - if parentTmpl.Steps != nil { - nodeOutputName := "{{steps." + name + ".outputs.result}}" - for idx := range parentTmpl.Steps { - stepGroup := parentTmpl.Steps[idx] - for _, step := range stepGroup { - - if step.When == nodeOutputName { - return true - } - for _, artf := range step.Arguments.Parameters { - if strings.Contains(*artf.Value, "{{") && - (strings.Contains(*artf.Value, nodeName) || - (artf.ValueFrom != nil && artf.ValueFrom.Parameter == nodeOutputName)) { - return true - } - } - } - } + if strings.Contains(jsonStr, variable) { + return true } + return false +} - for _, param := range parentTmpl.Outputs.Parameters { - taskNodeOutputName := "{{tasks." + name + ".outputs.result}}" - stepNodeOutputName := "{{steps." + name + ".outputs.result}}" +// hasOutputResultRef will check given template output has any reference +func hasOutputResultRef(name string, parentTmpl *wfv1.Template) bool { - if strings.Contains(*param.Value, "{{") && - (strings.Contains(*param.Value, nodeName) || - (param.ValueFrom != nil && (param.ValueFrom.Parameter == taskNodeOutputName || - param.ValueFrom.Parameter == stepNodeOutputName))) { - return true - } + var variableRefName string + if parentTmpl.DAG != nil { + variableRefName = "{{tasks." + name + ".outputs.result}}" + } else if parentTmpl.Steps != nil { + variableRefName = "{{steps." + name + ".outputs.result}}" } - return false + return checkVariableRefInTmpl(parentTmpl, variableRefName) } // getStepOrDAGTaskName will extract the node from NodeStatus Name diff --git a/workflow/executor/executor.go b/workflow/executor/executor.go index dca071de4f88..9c6084d06a94 100644 --- a/workflow/executor/executor.go +++ b/workflow/executor/executor.go @@ -733,7 +733,7 @@ func (we *WorkflowExecutor) GetMainContainerID() (string, error) { // CaptureScriptResult will add the stdout of a script template as output result func (we *WorkflowExecutor) CaptureScriptResult() error { - if we.ExecutionControl != nil && we.ExecutionControl.IncludeScriptOutput { + if we.ExecutionControl == nil || !we.ExecutionControl.IncludeScriptOutput { log.Infof("No Script output reference in workflow. Capturing script output ignored") return nil } From a4a79b3a8998596d9cef78417debea34dac3fd33 Mon Sep 17 00:00:00 2001 From: Saravanan Balasubramanian Date: Wed, 24 Jul 2019 18:59:55 -0700 Subject: [PATCH 18/21] updated comments --- workflow/controller/operator.go | 10 ++++++---- workflow/controller/operator_test.go | 12 +++++------- 2 files changed, 11 insertions(+), 11 deletions(-) diff --git a/workflow/controller/operator.go b/workflow/controller/operator.go index b319bcb4d380..ffb461186f9c 100644 --- a/workflow/controller/operator.go +++ b/workflow/controller/operator.go @@ -1392,12 +1392,14 @@ func hasOutputResultRef(name string, parentTmpl *wfv1.Template) bool { } // getStepOrDAGTaskName will extract the node from NodeStatus Name -func getStepOrDAGTaskName(nodeName string) string { +func getStepOrDAGTaskName(nodeName string, hasRetryStrategy bool) string { if strings.Contains(nodeName, ".") { name := nodeName[strings.LastIndex(nodeName, ".")+1:] // Check retry scenario - if strings.Contains(name, "(") { - return name[0:strings.Index(name, "(")] + if hasRetryStrategy { + if indx := strings.LastIndex(name, "("); indx > 0 { + return name[0:indx] + } } return name } @@ -1411,7 +1413,7 @@ func (woc *wfOperationCtx) executeScript(nodeName string, tmpl *wfv1.Template, b includeScriptOutput := false if parentTemplate != nil { - name := getStepOrDAGTaskName(nodeName) + name := getStepOrDAGTaskName(nodeName, tmpl.RetryStrategy != nil) includeScriptOutput = hasOutputResultRef(name, parentTemplate) } node := woc.getNodeByName(nodeName) diff --git a/workflow/controller/operator_test.go b/workflow/controller/operator_test.go index 4ef124acebc4..cad23bcb3c7f 100644 --- a/workflow/controller/operator_test.go +++ b/workflow/controller/operator_test.go @@ -1225,7 +1225,7 @@ spec: dependencies: [A] template: echo arguments: - parameters: [{name: message, value: "{{dag.A.output.result}}"}] + parameters: [{name: message, value: "{{tasks.A.outputs.result}}"}] - name: echo script: image: debian:9.4 @@ -1248,12 +1248,11 @@ func TestStepWFGetNodeName(t *testing.T) { woc.operate() wf, err = wfcset.Get(wf.ObjectMeta.Name, metav1.GetOptions{}) assert.Nil(t, err) - assert.Equal(t, 3, len(wf.Status.Nodes)) for _, node := range wf.Status.Nodes { if strings.Contains(node.Name, "generate") { - assert.True(t, getStepOrDAGTaskName(node.Name) == "generate") + assert.True(t, getStepOrDAGTaskName(node.Name, &wf.Spec.Templates[0].RetryStrategy != nil) == "generate") } else if strings.Contains(node.Name, "print-message") { - assert.True(t, getStepOrDAGTaskName(node.Name) == "print-message") + assert.True(t, getStepOrDAGTaskName(node.Name, &wf.Spec.Templates[0].RetryStrategy != nil ) == "print-message") } } } @@ -1273,13 +1272,12 @@ func TestDAGWFGetNodeName(t *testing.T) { woc.operate() wf, err = wfcset.Get(wf.ObjectMeta.Name, metav1.GetOptions{}) assert.Nil(t, err) - assert.Equal(t, 3, len(wf.Status.Nodes)) for _, node := range wf.Status.Nodes { if strings.Contains(node.Name, ".A") { - assert.True(t, getStepOrDAGTaskName(node.Name) == "A") + assert.True(t, getStepOrDAGTaskName(node.Name, wf.Spec.Templates[0].RetryStrategy != nil) == "A") } if strings.Contains(node.Name, ".B") { - assert.True(t, getStepOrDAGTaskName(node.Name) == "B") + assert.True(t, getStepOrDAGTaskName(node.Name, wf.Spec.Templates[0].RetryStrategy != nil) == "B") } } } From 737c59567aa20a907f940f5e7c5df2a79962577f Mon Sep 17 00:00:00 2001 From: Saravanan Balasubramanian Date: Thu, 25 Jul 2019 07:12:54 -0700 Subject: [PATCH 19/21] updated --- cmd/argoexec/commands/wait.go | 2 -- workflow/controller/operator.go | 5 +---- 2 files changed, 1 insertion(+), 6 deletions(-) diff --git a/cmd/argoexec/commands/wait.go b/cmd/argoexec/commands/wait.go index 0fd10630725b..3e475d9ba2ef 100644 --- a/cmd/argoexec/commands/wait.go +++ b/cmd/argoexec/commands/wait.go @@ -56,14 +56,12 @@ func waitContainer() error { wfExecutor.AddError(err) return err } - // Capture output script result err = wfExecutor.CaptureScriptResult() if err != nil { wfExecutor.AddError(err) return err } - err = wfExecutor.AnnotateOutputs(logArt) if err != nil { wfExecutor.AddError(err) diff --git a/workflow/controller/operator.go b/workflow/controller/operator.go index ffb461186f9c..30841bc108fb 100644 --- a/workflow/controller/operator.go +++ b/workflow/controller/operator.go @@ -1372,10 +1372,7 @@ func checkVariableRefInTmpl(template *wfv1.Template, variable string) bool { } jsonStr := string(jsonValue) - if strings.Contains(jsonStr, variable) { - return true - } - return false + return strings.Contains(jsonStr, variable) } // hasOutputResultRef will check given template output has any reference From f21ca515f02f3a0d4948c20e2a32c89f8d9a7cea Mon Sep 17 00:00:00 2001 From: Saravanan Balasubramanian Date: Thu, 25 Jul 2019 07:59:57 -0700 Subject: [PATCH 20/21] Update operator_test.go --- workflow/controller/operator_test.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/workflow/controller/operator_test.go b/workflow/controller/operator_test.go index cad23bcb3c7f..c8e0f3b55667 100644 --- a/workflow/controller/operator_test.go +++ b/workflow/controller/operator_test.go @@ -1252,7 +1252,7 @@ func TestStepWFGetNodeName(t *testing.T) { if strings.Contains(node.Name, "generate") { assert.True(t, getStepOrDAGTaskName(node.Name, &wf.Spec.Templates[0].RetryStrategy != nil) == "generate") } else if strings.Contains(node.Name, "print-message") { - assert.True(t, getStepOrDAGTaskName(node.Name, &wf.Spec.Templates[0].RetryStrategy != nil ) == "print-message") + assert.True(t, getStepOrDAGTaskName(node.Name, &wf.Spec.Templates[0].RetryStrategy != nil) == "print-message") } } } From c4d2b50b75febe75cf6f0b07f94dd4a3b0c1b2a2 Mon Sep 17 00:00:00 2001 From: Saravanan Balasubramanian Date: Mon, 29 Jul 2019 15:39:46 -0700 Subject: [PATCH 21/21] updated comments --- cmd/argoexec/commands/wait.go | 1 - workflow/controller/operator.go | 17 ++++++----------- 2 files changed, 6 insertions(+), 12 deletions(-) diff --git a/cmd/argoexec/commands/wait.go b/cmd/argoexec/commands/wait.go index 3e475d9ba2ef..1e933f29fdcf 100644 --- a/cmd/argoexec/commands/wait.go +++ b/cmd/argoexec/commands/wait.go @@ -67,6 +67,5 @@ func waitContainer() error { wfExecutor.AddError(err) return err } - return nil } diff --git a/workflow/controller/operator.go b/workflow/controller/operator.go index 30841bc108fb..48862e04dc41 100644 --- a/workflow/controller/operator.go +++ b/workflow/controller/operator.go @@ -1365,16 +1365,6 @@ func getTemplateOutputsFromScope(tmpl *wfv1.Template, scope *wfScope) (*wfv1.Out return &outputs, nil } -func checkVariableRefInTmpl(template *wfv1.Template, variable string) bool { - jsonValue, err := json.Marshal(template) - if err != nil { - log.Warnf("Unable to marshal the template. %v, %v", template, err) - } - jsonStr := string(jsonValue) - - return strings.Contains(jsonStr, variable) -} - // hasOutputResultRef will check given template output has any reference func hasOutputResultRef(name string, parentTmpl *wfv1.Template) bool { @@ -1385,7 +1375,12 @@ func hasOutputResultRef(name string, parentTmpl *wfv1.Template) bool { variableRefName = "{{steps." + name + ".outputs.result}}" } - return checkVariableRefInTmpl(parentTmpl, variableRefName) + jsonValue, err := json.Marshal(parentTmpl) + if err != nil { + log.Warnf("Unable to marshal the template. %v, %v", parentTmpl, err) + } + + return strings.Contains(string(jsonValue), variableRefName) } // getStepOrDAGTaskName will extract the node from NodeStatus Name