Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat: Retry pending nodes #2385

Merged
merged 21 commits into from
Mar 13, 2020
Merged
Show file tree
Hide file tree
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
7 changes: 6 additions & 1 deletion pkg/apis/workflow/v1alpha1/workflow_types.go
Original file line number Diff line number Diff line change
Expand Up @@ -987,11 +987,16 @@ func (in *WorkflowStatus) AnyActiveSuspendNode() bool {
return in.Nodes.Any(func(node NodeStatus) bool { return node.IsActiveSuspendNode() })
}

// Remove returns whether or not the node has completed execution
// Completed returns whether or not the node has completed execution
func (n NodeStatus) Completed() bool {
return isCompletedPhase(n.Phase) || n.IsDaemoned() && n.Phase != NodePending
}

// Pending returns whether or not the node has completed execution
func (n NodeStatus) Pending() bool {
return n.Phase == NodePending
}

// IsDaemoned returns whether or not the node is deamoned
func (n NodeStatus) IsDaemoned() bool {
if n.Daemoned == nil || !*n.Daemoned {
Expand Down
28 changes: 26 additions & 2 deletions workflow/controller/operator.go
Original file line number Diff line number Diff line change
Expand Up @@ -748,7 +748,7 @@ func (woc *wfOperationCtx) podReconciliation() error {
// It is now impossible to infer pod status. The only thing we can do at this point is to mark
// the node with Error.
for nodeID, node := range woc.wf.Status.Nodes {
if node.Type != wfv1.NodeTypePod || node.Completed() || node.StartedAt.IsZero() {
if node.Type != wfv1.NodeTypePod || node.Completed() || node.StartedAt.IsZero() || node.Pending() {
// node is not a pod, it is already complete, or it can be re-run.
continue
}
Expand Down Expand Up @@ -1338,6 +1338,16 @@ func (woc *wfOperationCtx) executeTemplate(nodeName string, orgTmpl wfv1.Templat
if err != nil {
return woc.markNodeError(retryNodeName, err), err
}
if lastChildNode != nil && lastChildNode.Pending() && processedTmpl.GetType() == wfv1.TemplateTypeContainer {
_, err := woc.createWorkflowPod(lastChildNode.Name, *processedTmpl.Container, processedTmpl, false)
if apierr.IsForbidden(err) {
return woc.markNodePending(lastChildNode.Name, err), nil
}
if err != nil {
return woc.markNodeError(lastChildNode.Name, err), err
}
return woc.markNodePhase(lastChildNode.Name, wfv1.NodeRunning), nil
}
if lastChildNode != nil && !lastChildNode.Completed() {
// Last child node is still running.
nodeName = lastChildNode.Name
Expand All @@ -1360,7 +1370,11 @@ func (woc *wfOperationCtx) executeTemplate(nodeName string, orgTmpl wfv1.Templat

switch processedTmpl.GetType() {
case wfv1.TemplateTypeContainer:
node, err = woc.executeContainer(nodeName, templateScope, processedTmpl, orgTmpl, boundaryID)
if node != nil && node.Pending() {
_, err = woc.createWorkflowPod(node.Name, *processedTmpl.Container, processedTmpl, false)
} else {
node, err = woc.executeContainer(nodeName, templateScope, processedTmpl, orgTmpl, boundaryID)
}
case wfv1.TemplateTypeSteps:
node, err = woc.executeSteps(nodeName, newTmplCtx, templateScope, processedTmpl, orgTmpl, boundaryID)
case wfv1.TemplateTypeScript:
Expand All @@ -1375,6 +1389,9 @@ func (woc *wfOperationCtx) executeTemplate(nodeName string, orgTmpl wfv1.Templat
err = errors.Errorf(errors.CodeBadRequest, "Template '%s' missing specification", processedTmpl.Name)
return woc.initializeNode(nodeName, wfv1.NodeTypeSkipped, templateScope, orgTmpl, boundaryID, wfv1.NodeError, err.Error()), err
}
if apierr.IsForbidden(err) {
return woc.markNodePending(node.Name, err), nil
}
if err != nil {
node = woc.markNodeError(node.Name, err)
// If retry policy is not set, or if it is not set to Always or OnError, we won't attempt to retry an errored container
Expand Down Expand Up @@ -1592,6 +1609,13 @@ func (woc *wfOperationCtx) markNodeError(nodeName string, err error) *wfv1.NodeS
return woc.markNodePhase(nodeName, wfv1.NodeError, err.Error())
}

// markNodePending is a convenience method to mark a node and set the message from the error
func (woc *wfOperationCtx) markNodePending(nodeName string, err error) *wfv1.NodeStatus {
woc.log.Infof("Mark node %s as Pending, due to: %+v", nodeName, err)
node := woc.getNodeByName(nodeName)
return woc.markNodePhase(nodeName, wfv1.NodePending, fmt.Sprintf("Pending %s", time.Since(node.StartedAt.Time)))
}

// checkParallelism checks if the given template is able to be executed, considering the current active pods and workflow/template parallelism
func (woc *wfOperationCtx) checkParallelism(tmpl *wfv1.Template, node *wfv1.NodeStatus, boundaryID string) error {
if woc.wf.Spec.Parallelism != nil && woc.activePods >= *woc.wf.Spec.Parallelism {
Expand Down
3 changes: 3 additions & 0 deletions workflow/controller/workflowpod.go
Original file line number Diff line number Diff line change
Expand Up @@ -292,6 +292,9 @@ func (woc *wfOperationCtx) createWorkflowPod(nodeName string, mainCtr apiv1.Cont
woc.log.Infof("Skipped pod %s (%s) creation: already exists", nodeName, nodeID)
return created, nil
}
if apierr.IsForbidden(err) {
return nil, err
}
woc.log.Infof("Failed to create pod %s (%s): %v", nodeName, nodeID, err)
return nil, errors.InternalWrapError(err)
}
Expand Down