Skip to content

Commit

Permalink
feat: Retry pending nodes (#2385)
Browse files Browse the repository at this point in the history
Co-authored-by: Simon Behar <simbeh7@gmail.com>
  • Loading branch information
jamhed and simster7 authored Mar 13, 2020
1 parent 7094433 commit 91d2988
Show file tree
Hide file tree
Showing 22 changed files with 679 additions and 345 deletions.
5 changes: 5 additions & 0 deletions api/openapi-spec/swagger.json
Original file line number Diff line number Diff line change
Expand Up @@ -2431,6 +2431,11 @@
"title": "Resource template subtype which can run k8s resources",
"$ref": "#/definitions/io.argoproj.workflow.v1alpha1.ResourceTemplate"
},
"resubmitPendingPods": {
"type": "boolean",
"format": "boolean",
"title": "ResubmitPendingPods is a flag to enable resubmitting pods that remain Pending after initial submission"
},
"retryStrategy": {
"title": "RetryStrategy describes how to retry a template when it fails",
"$ref": "#/definitions/io.argoproj.workflow.v1alpha1.RetryStrategy"
Expand Down
Original file line number Diff line number Diff line change
@@ -1,6 +1,15 @@
apiVersion: v1
data:
config: |
executor:
imagePullPolicy: IfNotPresent
resources:
requests:
cpu: 0.1
memory: 64Mi
limits:
cpu: 0.5
memory: 128Mi
artifactRepository:
archiveLogs: true
s3:
Expand Down
Original file line number Diff line number Diff line change
@@ -1,6 +1,15 @@
apiVersion: v1
data:
config: |
executor:
imagePullPolicy: IfNotPresent
resources:
requests:
cpu: 0.1
memory: 64Mi
limits:
cpu: 0.5
memory: 128Mi
artifactRepository:
archiveLogs: true
s3:
Expand Down
Original file line number Diff line number Diff line change
@@ -1,6 +1,15 @@
apiVersion: v1
data:
config: |
executor:
imagePullPolicy: IfNotPresent
resources:
requests:
cpu: 0.1
memory: 64Mi
limits:
cpu: 0.5
memory: 128Mi
artifactRepository:
archiveLogs: true
s3:
Expand Down
5 changes: 5 additions & 0 deletions pkg/apiclient/cronworkflow/cron-workflow.swagger.json
Original file line number Diff line number Diff line change
Expand Up @@ -1374,6 +1374,11 @@
"podSpecPatch": {
"type": "string",
"description": "PodSpecPatch holds strategic merge patch to apply against the pod spec. Allows parameterization of\ncontainer fields which are not strings (e.g. resource limits)."
},
"resubmitPendingPods": {
"type": "boolean",
"format": "boolean",
"title": "ResubmitPendingPods is a flag to enable resubmitting pods that remain Pending after initial submission"
}
},
"title": "Template is a reusable and composable unit of execution in a workflow"
Expand Down
5 changes: 5 additions & 0 deletions pkg/apiclient/workflow/workflow.swagger.json
Original file line number Diff line number Diff line change
Expand Up @@ -1694,6 +1694,11 @@
"podSpecPatch": {
"type": "string",
"description": "PodSpecPatch holds strategic merge patch to apply against the pod spec. Allows parameterization of\ncontainer fields which are not strings (e.g. resource limits)."
},
"resubmitPendingPods": {
"type": "boolean",
"format": "boolean",
"title": "ResubmitPendingPods is a flag to enable resubmitting pods that remain Pending after initial submission"
}
},
"title": "Template is a reusable and composable unit of execution in a workflow"
Expand Down
5 changes: 5 additions & 0 deletions pkg/apiclient/workflowarchive/workflow-archive.swagger.json
Original file line number Diff line number Diff line change
Expand Up @@ -1163,6 +1163,11 @@
"podSpecPatch": {
"type": "string",
"description": "PodSpecPatch holds strategic merge patch to apply against the pod spec. Allows parameterization of\ncontainer fields which are not strings (e.g. resource limits)."
},
"resubmitPendingPods": {
"type": "boolean",
"format": "boolean",
"title": "ResubmitPendingPods is a flag to enable resubmitting pods that remain Pending after initial submission"
}
},
"title": "Template is a reusable and composable unit of execution in a workflow"
Expand Down
5 changes: 5 additions & 0 deletions pkg/apiclient/workflowtemplate/workflow-template.swagger.json
Original file line number Diff line number Diff line change
Expand Up @@ -1204,6 +1204,11 @@
"podSpecPatch": {
"type": "string",
"description": "PodSpecPatch holds strategic merge patch to apply against the pod spec. Allows parameterization of\ncontainer fields which are not strings (e.g. resource limits)."
},
"resubmitPendingPods": {
"type": "boolean",
"format": "boolean",
"title": "ResubmitPendingPods is a flag to enable resubmitting pods that remain Pending after initial submission"
}
},
"title": "Template is a reusable and composable unit of execution in a workflow"
Expand Down
703 changes: 371 additions & 332 deletions pkg/apis/workflow/v1alpha1/generated.pb.go

Large diffs are not rendered by default.

3 changes: 3 additions & 0 deletions pkg/apis/workflow/v1alpha1/generated.proto

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

10 changes: 9 additions & 1 deletion pkg/apis/workflow/v1alpha1/workflow_types.go
Original file line number Diff line number Diff line change
Expand Up @@ -458,6 +458,9 @@ type Template struct {
// PodSpecPatch holds strategic merge patch to apply against the pod spec. Allows parameterization of
// container fields which are not strings (e.g. resource limits).
PodSpecPatch string `json:"podSpecPatch,omitempty" protobuf:"bytes,31,opt,name=podSpecPatch"`

// ResubmitPendingPods is a flag to enable resubmitting pods that remain Pending after initial submission
ResubmitPendingPods *bool `json:"resubmitPendingPods,omitempty" protobuf:"varint,34,opt,name=resubmitPendingPods"`
}

var _ TemplateHolder = &Template{}
Expand Down Expand Up @@ -1019,11 +1022,16 @@ func (in *WorkflowStatus) AnyActiveSuspendNode() bool {
return in.Nodes.Any(func(node NodeStatus) bool { return node.IsActiveSuspendNode() })
}

// Remove returns whether or not the node has completed execution
// Completed returns whether or not the node has completed execution
func (n NodeStatus) Completed() bool {
return isCompletedPhase(n.Phase) || n.IsDaemoned() && n.Phase != NodePending
}

// Pending returns whether or not the node is in pending state
func (n NodeStatus) Pending() bool {
return n.Phase == NodePending
}

// IsDaemoned returns whether or not the node is deamoned
func (n NodeStatus) IsDaemoned() bool {
if n.Daemoned == nil || !*n.Daemoned {
Expand Down
5 changes: 5 additions & 0 deletions pkg/apis/workflow/v1alpha1/zz_generated.deepcopy.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

13 changes: 13 additions & 0 deletions test/e2e/fixtures/e2e_suite.go
Original file line number Diff line number Diff line change
Expand Up @@ -180,6 +180,19 @@ func (s *E2ESuite) DeleteResources(label string) {
panic(err)
}
}

// Delete all resourcequotas
rqList, err := s.KubeClient.CoreV1().ResourceQuotas(Namespace).List(metav1.ListOptions{LabelSelector: label})
if err != nil {
panic(err)
}
for _, rq := range rqList.Items {
log.WithField("resourcequota", rq.Name).Info("Deleting resource quota")
err = s.KubeClient.CoreV1().ResourceQuotas(Namespace).Delete(rq.Name, nil)
if err != nil {
panic(err)
}
}
}

func (s *E2ESuite) GetBasicAuthToken() string {
Expand Down
21 changes: 21 additions & 0 deletions test/e2e/fixtures/when.go
Original file line number Diff line number Diff line change
Expand Up @@ -9,12 +9,14 @@ import (
"k8s.io/client-go/kubernetes"

log "github.com/sirupsen/logrus"
corev1 "k8s.io/api/core/v1"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/fields"

"github.com/argoproj/argo/persist/sqldb"
wfv1 "github.com/argoproj/argo/pkg/apis/workflow/v1alpha1"
"github.com/argoproj/argo/pkg/client/clientset/versioned/typed/workflow/v1alpha1"
"github.com/argoproj/argo/test/util"
"github.com/argoproj/argo/workflow/packer"
)

Expand All @@ -32,6 +34,7 @@ type When struct {
wfTemplateNames []string
cronWorkflowName string
kubeClient kubernetes.Interface
resourceQuota *corev1.ResourceQuota
}

func (w *When) SubmitWorkflow() *When {
Expand Down Expand Up @@ -173,6 +176,24 @@ func (w *When) RunCli(args []string, block func(t *testing.T, output string, err
return w
}

func (w *When) MemoryQuota(quota string) *When {
obj, err := util.CreateHardMemoryQuota(w.kubeClient, "argo", "memory-quota", quota)
if err != nil {
w.t.Fatal(err)
}
w.resourceQuota = obj
return w
}

func (w *When) DeleteQuota() *When {
err := util.DeleteQuota(w.kubeClient, w.resourceQuota)
if err != nil {
w.t.Fatal(err)
}
w.resourceQuota = nil
return w
}

func (w *When) Then() *Then {
return &Then{
t: w.t,
Expand Down
15 changes: 9 additions & 6 deletions test/e2e/functional/param-aggregation.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -30,7 +30,7 @@ spec:
parameters:
- name: message
value: "{{item}}"
withParam: "{{steps.divide-by-2.outputs.result}}"
withParam: "{{steps.divide-by-2.outputs.parameters}}"

# odd-or-even accepts a number and returns whether or not that number is odd or even
- name: odd-or-even
Expand Down Expand Up @@ -63,12 +63,15 @@ spec:
inputs:
parameters:
- name: num
script:
container:
image: alpine:latest
command: [sh, -x]
source: |
#!/bin/sh
echo $(({{inputs.parameters.num}}/2))
command: [sh, -c]
args: ["echo $(({{inputs.parameters.num}}/2)) > /tmp/res"]
outputs:
parameters:
- name: res
valueFrom:
path: /tmp/res

# whalesay prints a number using whalesay
- name: whalesay
Expand Down
113 changes: 110 additions & 3 deletions test/e2e/functional_test.go
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
package e2e

import (
"regexp"
"strings"
"testing"
"time"
Expand Down Expand Up @@ -178,7 +179,111 @@ func (s *FunctionalSuite) TestLoopEmptyParam() {
})
}

func (s *FunctionalSuite) TestparameterAggregation() {
// 128M is for argo executor
func (s *FunctionalSuite) TestPendingRetryWorkflow() {
s.Given().
Workflow(`
apiVersion: argoproj.io/v1alpha1
kind: Workflow
metadata:
name: dag-limited-1
labels:
argo-e2e: true
spec:
entrypoint: dag
templates:
- name: cowsay
resubmitPendingPods: true
container:
image: cowsay:v1
command: [sh, -c]
args: ["cowsay a"]
resources:
limits:
memory: 128M
- name: dag
dag:
tasks:
- name: a
template: cowsay
- name: b
template: cowsay
`).
When().
MemoryQuota("130M").
SubmitWorkflow().
WaitForWorkflowToStart(5*time.Second).
WaitForWorkflowCondition(func(wf *wfv1.Workflow) bool {
a := wf.Status.Nodes.FindByDisplayName("a")
b := wf.Status.Nodes.FindByDisplayName("b")
return wfv1.NodePending == a.Phase &&
regexp.MustCompile(`^Pending \d+\.\d+s$`).MatchString(a.Message) &&
wfv1.NodePending == b.Phase &&
regexp.MustCompile(`^Pending \d+\.\d+s$`).MatchString(b.Message)
}, "pods pending", 20*time.Second).
DeleteQuota().
WaitForWorkflowCondition(func(wf *wfv1.Workflow) bool {
a := wf.Status.Nodes.FindByDisplayName("a")
b := wf.Status.Nodes.FindByDisplayName("b")
return wfv1.NodeSucceeded == a.Phase && wfv1.NodeSucceeded == b.Phase
}, "pods succeeded", 20*time.Second)
s.TearDownSuite()
}

// 128M is for argo executor
func (s *FunctionalSuite) TestPendingRetryWorkflowWithRetryStrategy() {
s.Given().
Workflow(`
apiVersion: argoproj.io/v1alpha1
kind: Workflow
metadata:
name: dag-limited-2
labels:
argo-e2e: true
spec:
entrypoint: dag
templates:
- name: cowsay
resubmitPendingPods: true
retryStrategy:
limit: 1
container:
image: cowsay:v1
command: [sh, -c]
args: ["cowsay a"]
resources:
limits:
memory: 128M
- name: dag
dag:
tasks:
- name: a
template: cowsay
- name: b
template: cowsay
`).
When().
MemoryQuota("130M").
SubmitWorkflow().
WaitForWorkflowToStart(5*time.Second).
WaitForWorkflowCondition(func(wf *wfv1.Workflow) bool {
a := wf.Status.Nodes.FindByDisplayName("a(0)")
b := wf.Status.Nodes.FindByDisplayName("b(0)")
return wfv1.NodePending == a.Phase &&
regexp.MustCompile(`^Pending \d+\.\d+s$`).MatchString(a.Message) &&
wfv1.NodePending == b.Phase &&
regexp.MustCompile(`^Pending \d+\.\d+s$`).MatchString(b.Message)
}, "pods pending", 20*time.Second).
DeleteQuota().
WaitForWorkflowCondition(func(wf *wfv1.Workflow) bool {
a := wf.Status.Nodes.FindByDisplayName("a(0)")
b := wf.Status.Nodes.FindByDisplayName("b(0)")
return wfv1.NodeSucceeded == a.Phase && wfv1.NodeSucceeded == b.Phase
}, "pods succeeded", 20*time.Second)
s.TearDownSuite()
}

func (s *FunctionalSuite) TestParameterAggregation() {
s.Given().
Workflow("@functional/param-aggregation.yaml").
When().
Expand All @@ -187,8 +292,10 @@ func (s *FunctionalSuite) TestparameterAggregation() {
Then().
ExpectWorkflow(func(t *testing.T, _ *metav1.ObjectMeta, status *wfv1.WorkflowStatus) {
assert.Equal(t, wfv1.NodeSucceeded, status.Phase)
nodeStatus := status.Nodes.FindByDisplayName("print(0:1)")
assert.Equal(t, wfv1.NodeSucceeded, nodeStatus.Phase)
nodeStatus := status.Nodes.FindByDisplayName("print(0:res:1)")
if assert.NotNil(t, nodeStatus) {
assert.Equal(t, wfv1.NodeSucceeded, nodeStatus.Phase)
}
})
}

Expand Down
Loading

0 comments on commit 91d2988

Please sign in to comment.