Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Implemented Conditionally annotate outputs of script template only when consumed #1359 #1462

Merged
merged 34 commits into from
Jul 30, 2019
Merged
Show file tree
Hide file tree
Changes from 18 commits
Commits
Show all changes
34 commits
Select commit Hold shift + click to select a range
00f08b1
CheckandEstimate implementation
sarabala1979 Apr 4, 2019
e7ac42a
fixed variable rename
sarabala1979 Apr 4, 2019
d956bfc
fixed gofmt
sarabala1979 Apr 5, 2019
be9b3b5
fixed feedbacks
sarabala1979 Apr 5, 2019
0d15503
merge
sarabala1979 Apr 9, 2019
d967df3
Merge branch 'master' of https://github.com/argoproj/argo
sarabala1979 Apr 9, 2019
4c9b645
Merge branch 'master' of https://github.com/argoproj/argo
sarabala1979 Apr 10, 2019
c40cc5f
Merge branch 'master' of https://github.com/argoproj/argo
sarabala1979 Apr 11, 2019
51e2d5f
Merge branch 'master' of https://github.com/argoproj/argo
sarabala1979 Apr 26, 2019
5f41f3e
Merge branch 'master' of https://github.com/argoproj/argo
sarabala1979 May 9, 2019
9a95a48
Merge branch 'master' of https://github.com/argoproj/argo
sarabala1979 May 31, 2019
6a080b5
Merge branch 'master' of https://github.com/argoproj/argo
sarabala1979 Jun 7, 2019
0a48649
Merge branch 'master' of https://github.com/argoproj/argo
sarabala1979 Jun 11, 2019
7b273ab
Merge branch 'master' of https://github.com/argoproj/argo
sarabala1979 Jun 17, 2019
0ba8910
Merge remote-tracking branch 'upstream/master' into Issue1359
sarabala1979 Jul 1, 2019
b0e4288
Implement the exclude script output in step and DAG workflow
sarabala1979 Jul 3, 2019
14af0f7
Update operator.go
sarabala1979 Jul 3, 2019
e94f5b6
Update workflowpod.go
sarabala1979 Jul 3, 2019
9693af7
updated Review Comments
sarabala1979 Jul 16, 2019
730c763
Update exec_control.go
sarabala1979 Jul 17, 2019
3a7a544
fixed typo
sarabala1979 Jul 19, 2019
9fd3109
Updated Review comments
sarabala1979 Jul 19, 2019
4d65f75
updated review comments
sarabala1979 Jul 22, 2019
08779f0
Update wait.go
sarabala1979 Jul 22, 2019
a8c5c97
Update operator.go
sarabala1979 Jul 22, 2019
c9716aa
Merge remote-tracking branch 'upstream/master' into Issue1359
sarabala1979 Jul 22, 2019
2d6b309
Merge branch 'master' into Issue1359
sarabala1979 Jul 22, 2019
413c1f5
fixed merge issue
sarabala1979 Jul 22, 2019
a70662d
Update operator_test.go
sarabala1979 Jul 22, 2019
164519b
updated review comments
sarabala1979 Jul 25, 2019
a4a79b3
updated comments
sarabala1979 Jul 25, 2019
737c595
updated
sarabala1979 Jul 25, 2019
f21ca51
Update operator_test.go
sarabala1979 Jul 25, 2019
c4d2b50
updated comments
sarabala1979 Jul 29, 2019
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
40 changes: 29 additions & 11 deletions cmd/argoexec/commands/wait.go
Original file line number Diff line number Diff line change
@@ -1,6 +1,10 @@
package commands

import (
"github.com/argoproj/argo/workflow/common"
"github.com/argoproj/argo/workflow/executor"
"strconv"
"strings"
"time"

"github.com/argoproj/pkg/stats"
Expand Down Expand Up @@ -28,8 +32,18 @@ func waitContainer() error {
defer stats.LogStats()
stats.StartStatsTicker(5 * time.Minute)

includeOutput, err := executor.GetAnnotationField(wfExecutor.PodAnnotationsPath, common.AnnotationIncludeOutputs)
if err != nil {
wfExecutor.AddError(err)
}

includeOutputFlag, err := strconv.ParseBool(strings.Trim(includeOutput, "\""))

if err != nil {
wfExecutor.AddError(err)
}
// Wait for main container to complete
err := wfExecutor.Wait()
err = wfExecutor.Wait()
if err != nil {
wfExecutor.AddError(err)
// do not return here so we can still try to kill sidecars & save outputs
Expand All @@ -56,16 +70,20 @@ func waitContainer() error {
wfExecutor.AddError(err)
return err
}
// Capture output script result
err = wfExecutor.CaptureScriptResult()
if err != nil {
wfExecutor.AddError(err)
return err
}
err = wfExecutor.AnnotateOutputs(logArt)
if err != nil {
wfExecutor.AddError(err)
return err

if includeOutputFlag {
// Capture output script result
err = wfExecutor.CaptureScriptResult()
if err != nil {
wfExecutor.AddError(err)
return err
}

err = wfExecutor.AnnotateOutputs(logArt)
if err != nil {
wfExecutor.AddError(err)
return err
}
}
return nil
}
4 changes: 4 additions & 0 deletions workflow/common/common.go
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,10 @@ const (

// AnnotationKeyNodeName is the pod metadata annotation key containing the workflow node name
AnnotationKeyNodeName = workflow.FullName + "/node-name"

// AnnotationIncludeOutputs is the pod metadata annotation key containing flag to include script output
AnnotationIncludeOutputs = workflow.FullName + "/include-output"

// AnnotationKeyNodeMessage is the pod metadata annotation key the executor will use to
// communicate errors encountered by the executor during artifact load/save, etc...
AnnotationKeyNodeMessage = workflow.FullName + "/node-message"
Expand Down
75 changes: 72 additions & 3 deletions workflow/controller/operator.go
Original file line number Diff line number Diff line change
Expand Up @@ -1290,7 +1290,7 @@ func (woc *wfOperationCtx) executeContainer(nodeName string, tmpl *wfv1.Template
return node
}
woc.log.Debugf("Executing node %s with container template: %v\n", nodeName, tmpl)
_, err := woc.createWorkflowPod(nodeName, *tmpl.Container, tmpl)
_, err := woc.createWorkflowPod(nodeName, *tmpl.Container, tmpl, false)
if err != nil {
return woc.initializeNode(nodeName, wfv1.NodeTypePod, tmpl.Name, boundaryID, wfv1.NodeError, err.Error())
}
Expand Down Expand Up @@ -1362,14 +1362,83 @@ func getTemplateOutputsFromScope(tmpl *wfv1.Template, scope *wfScope) (*wfv1.Out
return &outputs, nil
}

func HasOutputResultRef(name string, parentTmpl *wfv1.Template) bool {

nodeName := "." + name + "."
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

These strings.Contains checks against nodeName are very fragile. We should be able to formulate the explicit {{steps.XXX.output.result}} string to search for.

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Why is nodeName still defined when you have the nodeOutputName below?

fmt.Println(nodeName)

if parentTmpl.DAG != nil {
for _, dagTask := range parentTmpl.DAG.Tasks {

if strings.Contains(dagTask.When, nodeName) {
return true
}

for _, artf := range dagTask.Arguments.Parameters {
if strings.Contains(*artf.Value, "{{") && (strings.Contains(*artf.Value, nodeName) || (artf.ValueFrom != nil && strings.Contains(artf.ValueFrom.Parameter, nodeName))) {
return true
}
}
}
}

if parentTmpl.Steps != nil {
for idx := range parentTmpl.Steps {
stepGroup := parentTmpl.Steps[idx]
for _, step := range stepGroup {

if strings.Contains(step.When, nodeName) {
return true
}
for _, artf := range step.Arguments.Parameters {
if strings.Contains(*artf.Value, "{{") && (strings.Contains(*artf.Value, nodeName) || (artf.ValueFrom != nil && strings.Contains(artf.ValueFrom.Parameter, nodeName))) {
return true
}
}
}
}
}

for _, param := range parentTmpl.Outputs.Parameters {
if strings.Contains(*param.Value, "{{") && (strings.Contains(*param.Value, nodeName) || (param.ValueFrom != nil && strings.Contains(param.ValueFrom.Parameter, nodeName))) {
return true
}
}

return false
}

func GetStepOrDAGTaskName(nodeName string) string {
if strings.Contains(nodeName, ".") {
name := nodeName[strings.LastIndex(nodeName, ".")+1:]
// Check retry scenario
if strings.Contains(name, "(") {
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

We don't need to do the parenthesis hack. We already have the template information and already know if it will be retried ahead of time.

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think we need to handle the retry scenario with "(". This function is getting NodeName: scripts-bash-g7vnp[0].generate(1). We need to get step/task name from nodename.

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

resolved it

return name[0:strings.Index(name, "(")]
}
return nodeName[strings.LastIndex(nodeName, ".")+1:]
}
return nodeName
}

func (woc *wfOperationCtx) executeScript(nodeName string, tmpl *wfv1.Template, boundaryID string) *wfv1.NodeStatus {

boundaryNode := woc.wf.Status.Nodes[boundaryID]
parentTemplate := woc.wf.GetTemplate(boundaryNode.TemplateName)

includeScriptOutput := false
if parentTemplate != nil {
name := GetStepOrDAGTaskName(nodeName)
includeScriptOutput = HasOutputResultRef(name, parentTemplate)
}
fmt.Println(includeScriptOutput)
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Please remove your debug code

node := woc.getNodeByName(nodeName)

if node != nil {
return node
}
mainCtr := tmpl.Script.Container
mainCtr.Args = append(mainCtr.Args, common.ExecutorScriptSourcePath)
_, err := woc.createWorkflowPod(nodeName, mainCtr, tmpl)
_, err := woc.createWorkflowPod(nodeName, mainCtr, tmpl, includeScriptOutput)
if err != nil {
return woc.initializeNode(nodeName, wfv1.NodeTypePod, tmpl.Name, boundaryID, wfv1.NodeError, err.Error())
}
Expand Down Expand Up @@ -1604,7 +1673,7 @@ func (woc *wfOperationCtx) executeResource(nodeName string, tmpl *wfv1.Template,
mainCtr.VolumeMounts = []apiv1.VolumeMount{
volumeMountPodMetadata,
}
_, err = woc.createWorkflowPod(nodeName, *mainCtr, tmpl)
_, err = woc.createWorkflowPod(nodeName, *mainCtr, tmpl, false)
if err != nil {
return woc.initializeNode(nodeName, wfv1.NodeTypePod, tmpl.Name, boundaryID, wfv1.NodeError, err.Error())
}
Expand Down
70 changes: 70 additions & 0 deletions workflow/controller/operator_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -1159,3 +1159,73 @@ func TestResourceWithOwnerReferenceTemplate(t *testing.T) {
assert.Equal(t, "resource-with-ownerreference-template", objectMetas["resource-cm-3"].OwnerReferences[1].Name)
}
}

var stepScriptTmpl = `
apiVersion: argoproj.io/v1alpha1
kind: Workflow
metadata:
name: resource-with-ownerreference-template
spec:
entrypoint: start
templates:
- name: start
steps:
- - name: resource-1
template: resource-1
- name: resource-2
template: resource-2
arguments:
parameters: [{name: message, value: "{{steps.resource-1.output.result}}"}]
- name: resource-3
template: resource-3 `

var dagScriptTmpl = `
apiVersion: argoproj.io/v1alpha1
kind: Workflow
metadata:
generateName: dag-target-
spec:
entrypoint: dag-target
arguments:
parameters:
- name: target
value: E

templates:
- name: dag-target
dag:
tasks:
- name: A
template: echo
arguments:
parameters: [{name: message, value: A}]
- name: B
dependencies: [A]
template: echo
arguments:
parameters: [{name: message, value: B}]
- name: C
dependencies: [A]
template: echo
arguments:
parameters: [{name: message, value: "{{dag.A.output.result}}"}]
- name: D
dependencies: [B, C]
template: echo
arguments:
parameters: [{name: message, value: D}]`

func TestGetNodeName(t *testing.T) {

// Steps Workflow
wf := unmarshalWF(stepScriptTmpl)

assert.True(t, HasOutputResultRef("resource-1", &wf.Spec.Templates[0]))
assert.False(t, HasOutputResultRef("resource-2", &wf.Spec.Templates[0]))

// DAG workflow
wf = unmarshalWF(dagScriptTmpl)

assert.True(t, HasOutputResultRef("A", &wf.Spec.Templates[0]))
assert.False(t, HasOutputResultRef("B", &wf.Spec.Templates[0]))
}
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Please include a test which verifies that the pod is created correctly with execution control set

5 changes: 3 additions & 2 deletions workflow/controller/workflowpod.go
Original file line number Diff line number Diff line change
Expand Up @@ -85,7 +85,7 @@ func (woc *wfOperationCtx) getVolumeDockerSock() apiv1.Volume {
}
}

func (woc *wfOperationCtx) createWorkflowPod(nodeName string, mainCtr apiv1.Container, tmpl *wfv1.Template) (*apiv1.Pod, error) {
func (woc *wfOperationCtx) createWorkflowPod(nodeName string, mainCtr apiv1.Container, tmpl *wfv1.Template, includeScriptOutput bool) (*apiv1.Pod, error) {
nodeID := woc.wf.NodeID(nodeName)
woc.log.Debugf("Creating Pod: %s (%s)", nodeName, nodeID)
tmpl = tmpl.DeepCopy()
Expand All @@ -105,7 +105,8 @@ func (woc *wfOperationCtx) createWorkflowPod(nodeName string, mainCtr apiv1.Cont
common.LabelKeyCompleted: "false", // Allows filtering by incomplete workflow pods
},
Annotations: map[string]string{
common.AnnotationKeyNodeName: nodeName,
common.AnnotationKeyNodeName: nodeName,
common.AnnotationIncludeOutputs: strconv.FormatBool(includeScriptOutput),
},
OwnerReferences: []metav1.OwnerReference{
*metav1.NewControllerRef(woc.wf, wfv1.SchemaGroupVersionKind),
Expand Down
54 changes: 54 additions & 0 deletions workflow/executor/executor.go
Original file line number Diff line number Diff line change
Expand Up @@ -69,6 +69,8 @@ type WorkflowExecutor struct {
// list of errors that occurred during execution.
// the first of these is used as the overall message of the node
errors []error

ExcludeOutput bool
}

// ContainerRuntimeExecutor is the interface for interacting with a container runtime (e.g. docker)
Expand Down Expand Up @@ -1101,6 +1103,58 @@ func LoadTemplate(path string) (*wfv1.Template, error) {
return &tmpl, nil
}

func GetAnnotationField(filePath string, key string) (string, error) {
// Read the annotation file
file, err := os.Open(filePath)
if err != nil {
log.Errorf("ERROR opening annotation file from %s", filePath)
return "", errors.InternalWrapError(err)
}
content := ""
defer func() {
_ = file.Close()
}()
reader := bufio.NewReader(file)

// Prefix of key property in the annotation file
prefix := fmt.Sprintf("%s=", key)

for {
// Read line-by-line
var buffer bytes.Buffer
var l []byte
var isPrefix bool
for {
l, isPrefix, err = reader.ReadLine()
buffer.Write(l)
// If we've reached the end of the line, stop reading.
if !isPrefix {
break
}
// If we're just at the EOF, break
if err != nil {
break
}
}

line := buffer.String()

// Read property
if strings.HasPrefix(line, prefix) {
// Trim the prefix
content = strings.TrimPrefix(line, prefix)
break
}

// The end of the annotation file
if err == io.EOF {
break
}
}

return content, nil
}

// unmarshalAnnotationField unmarshals the value of an annotation key into the supplied interface
// from the downward api annotation volume file
func unmarshalAnnotationField(filePath string, key string, into interface{}) error {
Expand Down