Refactor checkandEstimate to optimize podReconciliation #1311

Merged: 2 commits, Apr 9, 2019
71 changes: 10 additions & 61 deletions workflow/controller/operator.go
@@ -61,15 +61,6 @@ type wfOperationCtx struct {
// workflowDeadline is the deadline which the workflow is expected to complete before we
// terminate the workflow.
workflowDeadline *time.Time

// currentWFSize is current Workflow size
currentWFSize int

// unSavedNodeStatusSize is unsaved workflow size
unSavedNodeStatusSize int

// isWFCompressionFailed is workflow compression failed status
isWFCompressionFailed bool
}

var (
@@ -300,6 +291,7 @@ func (woc *wfOperationCtx) persistUpdates() {
err := woc.checkAndCompress()
if err != nil {
woc.log.Warnf("Error compressing workflow: %v", err)
woc.markWorkflowFailed(err.Error())
}
if woc.wf.Status.CompressedNodes != "" {
woc.wf.Status.Nodes = nil
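
The net effect of this hunk is that a failure in checkAndCompress now fails the workflow immediately via markWorkflowFailed, instead of being remembered in the removed isWFCompressionFailed flag and rechecked later. A minimal sketch of that fail-fast pattern, using hypothetical stand-in types rather than the controller's actual wfOperationCtx:

package main

import (
	"errors"
	"fmt"
)

// status is a hypothetical stand-in for the workflow status persistUpdates saves.
type status struct {
	Nodes           map[string]string
	CompressedNodes string
	Phase, Message  string
}

// compress stands in for checkAndCompress; assume it fails when even the
// compressed status would exceed the size limit.
func compress(s *status) error {
	return errors.New("workflow is longer than maximum allowed size")
}

func persist(s *status) {
	// Fail fast: on a compression error, mark the workflow Failed right away
	// instead of carrying a "compression failed" flag into later calls.
	if err := compress(s); err != nil {
		fmt.Printf("Error compressing workflow: %v\n", err)
		s.Phase, s.Message = "Failed", err.Error()
	}
	if s.CompressedNodes != "" {
		s.Nodes = nil // the compressed copy supersedes the uncompressed map
	}
}

func main() {
	s := &status{Nodes: map[string]string{"node-1": "Succeeded"}}
	persist(s)
	fmt.Println(s.Phase+":", s.Message)
}
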
@@ -464,9 +456,9 @@ func (woc *wfOperationCtx) podReconciliation() error {
seenPodLock := &sync.Mutex{}
wfNodesLock := &sync.RWMutex{}

performAssessment := func(pod *apiv1.Pod) string {
performAssessment := func(pod *apiv1.Pod) {
if pod == nil {
return ""
return
}
nodeNameForPod := pod.Annotations[common.AnnotationKeyNodeName]
nodeID := woc.wf.NodeID(nodeNameForPod)
@@ -486,51 +478,34 @@ func (woc *wfOperationCtx) podReconciliation() error {
if node.Completed() && !node.IsDaemoned() {
if tmpVal, tmpOk := pod.Labels[common.LabelKeyCompleted]; tmpOk {
if tmpVal == "true" {
return nodeID
return
}
}
woc.completedPods[pod.ObjectMeta.Name] = true
}
}
return nodeID
return
}

parallelPodNum := make(chan string, 500)
var wg sync.WaitGroup

woc.currentWFSize = woc.getSize()

for _, pod := range podList.Items {
parallelPodNum <- pod.Name
wg.Add(1)
go func(tmpPod apiv1.Pod) {
defer wg.Done()
wfNodesLock.Lock()
origNodeStatus := *woc.wf.Status.DeepCopy()
wfNodesLock.Unlock()
nodeID := performAssessment(&tmpPod)
performAssessment(&tmpPod)
err = woc.applyExecutionControl(&tmpPod, wfNodesLock)
if err != nil {
woc.log.Warnf("Failed to apply execution control to pod %s", tmpPod.Name)
}
wfNodesLock.Lock()
defer wfNodesLock.Unlock()
err = woc.checkAndEstimate(nodeID)
if err != nil {
woc.wf.Status = origNodeStatus
nodeNameForPod := tmpPod.Annotations[common.AnnotationKeyNodeName]
woc.log.Warnf("%v", err)
woc.markNodeErrorClearOuput(nodeNameForPod, err)
err = woc.checkAndCompress()
if err != nil {
woc.isWFCompressionFailed = true
}
}
<-parallelPodNum
}(pod)
}

wg.Wait()

// Now check for deleted pods. Iterate our nodes. If any one of our nodes does not show up in
// the seen list it implies that the pod was deleted without the controller seeing the event.
// It is now impossible to infer pod status. The only thing we can do at this point is to mark
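
The comment above describes the second half of podReconciliation: after the bounded parallel assessment of live pods, any incomplete node whose pod never showed up must have lost its pod and can only be marked as errored. A schematic sketch of both halves, with hypothetical pod and node types in place of the controller's real ones:

package main

import (
	"fmt"
	"sync"
)

// Hypothetical, simplified stand-ins for the controller's pod and node records.
type pod struct{ Name, NodeID string }
type node struct{ ID, Phase string }

func reconcile(pods []pod, nodes map[string]*node) {
	seen := make(map[string]bool)
	var seenMu sync.Mutex

	// Bound the fan-out with a buffered channel used as a semaphore,
	// mirroring the parallelPodNum channel in podReconciliation.
	sem := make(chan struct{}, 500)
	var wg sync.WaitGroup

	for _, p := range pods {
		sem <- struct{}{}
		wg.Add(1)
		go func(p pod) {
			defer wg.Done()
			seenMu.Lock()
			seen[p.NodeID] = true // assess the pod and record that we saw it
			seenMu.Unlock()
			<-sem
		}(p)
	}
	wg.Wait()

	// Any incomplete node whose pod was never seen had its pod deleted out
	// from under the controller; its status can no longer be inferred, so
	// mark it errored.
	for id, n := range nodes {
		if n.Phase != "Succeeded" && n.Phase != "Failed" && !seen[id] {
			n.Phase = "Error"
			fmt.Printf("node %s marked Error: pod deleted\n", id)
		}
	}
}

func main() {
	nodes := map[string]*node{"a": {ID: "a", Phase: "Running"}, "b": {ID: "b", Phase: "Running"}}
	reconcile([]pod{{Name: "pod-a", NodeID: "a"}}, nodes)
}
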
@@ -1682,7 +1657,7 @@ func (woc *wfOperationCtx) getSize() int {
// The compressed content will be assign to compressedNodes element and clear the nodestatus map.
func (woc *wfOperationCtx) checkAndCompress() error {

if !woc.isWFCompressionFailed && (woc.wf.Status.CompressedNodes != "" || (woc.wf.Status.CompressedNodes == "" && woc.getSize() >= maxWorkflowSize)) {
if woc.wf.Status.CompressedNodes != "" || (woc.wf.Status.CompressedNodes == "" && woc.getSize() >= maxWorkflowSize) {
nodeContent, err := json.Marshal(woc.wf.Status.Nodes)
if err != nil {
return errors.InternalWrapError(err)
@@ -1691,10 +1666,10 @@ func (woc *wfOperationCtx) checkAndCompress() error {
woc.wf.Status.CompressedNodes = file.CompressEncodeString(buff)
}

if woc.isWFCompressionFailed || (woc.wf.Status.CompressedNodes != "" && woc.getSize() >= maxWorkflowSize) {
woc.isWFCompressionFailed = true
if woc.wf.Status.CompressedNodes != "" && woc.getSize() >= maxWorkflowSize {
return errors.InternalError(fmt.Sprintf("Workflow is longer than maximum allowed size. Size=%d", woc.getSize()))
}

return nil
}

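checkAndCompress, shown above, is now the single place where the size limit is enforced: the node status map is marshalled and compressed when the object is already compressed or over the limit, and an error is returned only if even the compressed form is still too large. A self-contained sketch of that shape; gzip plus base64 is an assumption about what file.CompressEncodeString does, and maxSize merely stands in for maxWorkflowSize:

package main

import (
	"bytes"
	"compress/gzip"
	"encoding/base64"
	"encoding/json"
	"fmt"
)

const maxSize = 1 << 20 // stand-in for the controller's maxWorkflowSize limit

// compressEncode gzips and base64-encodes a string; this is an assumption
// about what file.CompressEncodeString does, not a copy of it.
func compressEncode(s string) (string, error) {
	var buf bytes.Buffer
	zw := gzip.NewWriter(&buf)
	if _, err := zw.Write([]byte(s)); err != nil {
		return "", err
	}
	if err := zw.Close(); err != nil {
		return "", err
	}
	return base64.StdEncoding.EncodeToString(buf.Bytes()), nil
}

// checkAndCompress mirrors the post-refactor logic: compress when the status
// is already compressed or the uncompressed object is over the limit, and
// report an error only if even the compressed form is still too large.
func checkAndCompress(nodes map[string]string, compressed *string, size func() int) error {
	if *compressed != "" || size() >= maxSize {
		content, err := json.Marshal(nodes)
		if err != nil {
			return err
		}
		enc, err := compressEncode(string(content))
		if err != nil {
			return err
		}
		*compressed = enc
	}
	if *compressed != "" && size() >= maxSize {
		return fmt.Errorf("workflow is longer than maximum allowed size. Size=%d", size())
	}
	return nil
}

func main() {
	nodes := map[string]string{"node-1": "Succeeded", "node-2": "Running"}
	var compressed string
	// Pretend the uncompressed object is over the limit so compression kicks
	// in, then re-measure using the much smaller compressed form.
	size := func() int {
		if compressed == "" {
			return maxSize + 1
		}
		return len(compressed)
	}
	fmt.Println(checkAndCompress(nodes, &compressed, size), len(compressed))
}
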
@@ -1716,29 +1691,3 @@ func (woc *wfOperationCtx) checkAndDecompress() error {
}
return nil
}

// checkAndEstimate will check and estimate the workflow size with current nodestatus
func (woc *wfOperationCtx) checkAndEstimate(nodeID string) error {
if nodeID == "" {
return nil
}

if woc.isWFCompressionFailed {
return errors.InternalErrorf("Workflow is longer than maximum allowed size. Size=%d", woc.currentWFSize+woc.unSavedNodeStatusSize)
}

if woc.wf.Status.CompressedNodes != "" {
if node, ok := woc.wf.Status.Nodes[nodeID]; ok {
content, err := json.Marshal(node)
if err != nil {
return errors.InternalWrapError(err)
}
nodeSize := len(file.CompressEncodeString(string(content)))
if (nodeSize + woc.unSavedNodeStatusSize + woc.currentWFSize) >= maxWorkflowSize {
return errors.InternalErrorf("Workflow is longer than maximum allowed size. Size=%d", woc.currentWFSize+nodeSize+woc.unSavedNodeStatusSize)
}
woc.unSavedNodeStatusSize += nodeSize
}
}
return nil
}
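
For context on what was removed: checkAndEstimate compressed each reconciled pod's node status individually, under the node-status lock, to predict whether the workflow would outgrow the limit, which added one marshal-and-compress per pod to every reconciliation pass. The refactor drops that in favor of the single compression in checkAndCompress. A rough, self-contained comparison of the two shapes, using synthetic node data and the same gzip+base64 assumption as above; it only prints timings, it does not assert a result:

package main

import (
	"bytes"
	"compress/gzip"
	"encoding/base64"
	"encoding/json"
	"fmt"
	"time"
)

// compressEncode is the same gzip+base64 assumption used in the earlier sketch.
func compressEncode(b []byte) string {
	var buf bytes.Buffer
	zw := gzip.NewWriter(&buf)
	zw.Write(b)
	zw.Close()
	return base64.StdEncoding.EncodeToString(buf.Bytes())
}

func main() {
	// Synthetic node statuses standing in for wf.Status.Nodes.
	nodes := make(map[string]string, 2000)
	for i := 0; i < 2000; i++ {
		nodes[fmt.Sprintf("node-%d", i)] = `{"phase":"Succeeded","message":"ok"}`
	}

	// Old shape: checkAndEstimate marshalled and compressed one node per
	// reconciled pod, so the work scales with the number of pods.
	start := time.Now()
	for _, n := range nodes {
		content, _ := json.Marshal(n)
		_ = compressEncode(content)
	}
	perNode := time.Since(start)

	// New shape: a single compression of the whole status in checkAndCompress.
	start = time.Now()
	content, _ := json.Marshal(nodes)
	_ = compressEncode(content)
	oneShot := time.Since(start)

	fmt.Printf("per-node: %v, one-shot: %v\n", perNode, oneShot)
}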