Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Extend workflow controller to handle creating pods in namespaces with a resource quota and limit range #1096

Closed
wants to merge 11 commits into from
73 changes: 62 additions & 11 deletions workflow/controller/operator.go
Original file line number Diff line number Diff line change
Expand Up @@ -72,6 +72,9 @@ var (
// for before requeuing the workflow onto the workqueue.
const maxOperationTime time.Duration = 10 * time.Second

// exceededQuota is a string used to check for errors during pod creation that are the result of a resource quota.
const exceededQuota string = "exceeded quota"

// newWorkflowOperationCtx creates and initializes a new wfOperationCtx object.
func newWorkflowOperationCtx(wf *wfv1.Workflow, wfc *WorkflowController) *wfOperationCtx {
// NEVER modify objects from the store. It's a read-only, local cache.
Expand Down Expand Up @@ -456,8 +459,8 @@ func (woc *wfOperationCtx) podReconciliation() error {
// It is now impossible to infer pod status. The only thing we can do at this point is to mark
// the node with Error.
for nodeID, node := range woc.wf.Status.Nodes {
if node.Type != wfv1.NodeTypePod || node.Completed() {
// node is not a pod, or it is already complete
if node.Type != wfv1.NodeTypePod || node.Completed() || strings.Contains(node.Message, exceededQuota) {
// node is not a pod, or it is already complete, or it failed to create because of a quota
continue
}
if _, ok := seenPods[nodeID]; !ok {
Expand Down Expand Up @@ -1160,16 +1163,32 @@ func (woc *wfOperationCtx) checkParallelism(tmpl *wfv1.Template, node *wfv1.Node
}

func (woc *wfOperationCtx) executeContainer(nodeName string, tmpl *wfv1.Template, boundaryID string) *wfv1.NodeStatus {
var skipNodeInitialization bool
node := woc.getNodeByName(nodeName)
if node != nil {
return node
if strings.Contains(node.Message, exceededQuota) {
skipNodeInitialization = true
woc.log.Infof("Node %s exists but a pod does not: %s", nodeName, node.Message)
} else {
return node
}
}
woc.log.Debugf("Executing node %s with container template: %v\n", nodeName, tmpl)
var messages []string
_, err := woc.createWorkflowPod(nodeName, *tmpl.Container, tmpl)
if err != nil {
return woc.initializeNode(nodeName, wfv1.NodeTypePod, tmpl.Name, boundaryID, wfv1.NodeError, err.Error())
if strings.Contains(err.Error(), exceededQuota) {
Copy link
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Move the strings.Contains call into a function called ExceededQuota or something?

messages = append(messages, exceededQuota)
woc.log.Infof("Failed to create pod due to a lack of resources. It will be marked pending: %v", err)
} else {
return woc.initializeNode(nodeName, wfv1.NodeTypePod, tmpl.Name, boundaryID, wfv1.NodeError, err.Error())
}
}
return woc.initializeNode(nodeName, wfv1.NodeTypePod, tmpl.Name, boundaryID, wfv1.NodePending)
if skipNodeInitialization {
Copy link
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This branch is only reached when a node exists but the pod failed to create because of an exceededQuota error. Should a sleep be added here to avoid slamming the master with Pod-create requests?

woc.updated = true
return node
}
return woc.initializeNode(nodeName, wfv1.NodeTypePod, tmpl.Name, boundaryID, wfv1.NodePending, messages...)
}

func (woc *wfOperationCtx) getOutboundNodes(nodeID string) []string {
Expand Down Expand Up @@ -1231,17 +1250,33 @@ func getTemplateOutputsFromScope(tmpl *wfv1.Template, scope *wfScope) (*wfv1.Out
}

func (woc *wfOperationCtx) executeScript(nodeName string, tmpl *wfv1.Template, boundaryID string) *wfv1.NodeStatus {
var skipNodeInitialization bool
node := woc.getNodeByName(nodeName)
if node != nil {
return node
if strings.Contains(node.Message, exceededQuota) {
skipNodeInitialization = true
woc.log.Infof("Node %s exists but a pod does not: %s", nodeName, node.Message)
} else {
return node
}
}
mainCtr := tmpl.Script.Container
mainCtr.Args = append(mainCtr.Args, common.ExecutorScriptSourcePath)
var messages []string
_, err := woc.createWorkflowPod(nodeName, mainCtr, tmpl)
if err != nil {
return woc.initializeNode(nodeName, wfv1.NodeTypePod, tmpl.Name, boundaryID, wfv1.NodeError, err.Error())
if strings.Contains(err.Error(), exceededQuota) {
Copy link
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Move the strings.Contains call into a function called ExceededQuota or something?

messages = append(messages, exceededQuota)
woc.log.Infof("Failed to create pod due to a lack of resources. It will be marked pending: %v", err)
} else {
return woc.initializeNode(nodeName, wfv1.NodeTypePod, tmpl.Name, boundaryID, wfv1.NodeError, err.Error())
}
}
if skipNodeInitialization {
Copy link
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This branch is only reached when a node exists but the pod failed to create because of an exceededQuota error. Should a sleep be added here to avoid slamming the master with Pod-create requests?

woc.updated = true
return node
}
return woc.initializeNode(nodeName, wfv1.NodeTypePod, tmpl.Name, boundaryID, wfv1.NodePending)
return woc.initializeNode(nodeName, wfv1.NodeTypePod, tmpl.Name, boundaryID, wfv1.NodePending, messages...)
}

// processNodeOutputs adds all of a nodes outputs to the local scope with the given prefix, as well
Expand Down Expand Up @@ -1437,9 +1472,15 @@ func (woc *wfOperationCtx) addChildNode(parent string, child string) {

// executeResource runs a kubectl command against a manifest
func (woc *wfOperationCtx) executeResource(nodeName string, tmpl *wfv1.Template, boundaryID string) *wfv1.NodeStatus {
var skipNodeInitialization bool
node := woc.getNodeByName(nodeName)
if node != nil {
return node
if strings.Contains(node.Message, exceededQuota) {
skipNodeInitialization = true
woc.log.Infof("Node %s exists but a pod does not: '%s'", nodeName, node.Message)
} else {
return node
}
}
mainCtr := apiv1.Container{
Image: woc.controller.executorImage(),
Expand All @@ -1450,11 +1491,21 @@ func (woc *wfOperationCtx) executeResource(nodeName string, tmpl *wfv1.Template,
},
Env: execEnvVars,
}
var messages []string
_, err := woc.createWorkflowPod(nodeName, mainCtr, tmpl)
if err != nil {
return woc.initializeNode(nodeName, wfv1.NodeTypePod, tmpl.Name, boundaryID, wfv1.NodeError, err.Error())
if strings.Contains(err.Error(), exceededQuota) {
messages = append(messages, exceededQuota)
woc.log.Infof("Failed to create pod due to a lack of resources. It will be marked pending: %v", err)
} else {
return woc.initializeNode(nodeName, wfv1.NodeTypePod, tmpl.Name, boundaryID, wfv1.NodeError, err.Error())
}
}
if skipNodeInitialization {
Copy link
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This branch is only reached when a node exists but the pod failed to create because of an exceededQuota error. Should a sleep be added here to avoid slamming the master with Pod-create requests?

woc.updated = true
return node
}
return woc.initializeNode(nodeName, wfv1.NodeTypePod, tmpl.Name, boundaryID, wfv1.NodePending)
return woc.initializeNode(nodeName, wfv1.NodeTypePod, tmpl.Name, boundaryID, wfv1.NodePending, messages...)
}

func processItem(fstTmpl *fasttemplate.Template, name string, index int, item wfv1.Item, obj interface{}) (string, error) {
Expand Down