From 1448d2da0ee8ef6e6a82a2f1fde4fd860c3963be Mon Sep 17 00:00:00 2001
From: xianlubird
Date: Mon, 8 Apr 2019 14:10:11 +0800
Subject: [PATCH 1/2] Refactor checkandEstimate to optimize podReconciliation

---
 workflow/controller/operator.go | 79 ++++++++------------------------
 1 file changed, 18 insertions(+), 61 deletions(-)

diff --git a/workflow/controller/operator.go b/workflow/controller/operator.go
index acc4b335bb15..ca59d5073147 100644
--- a/workflow/controller/operator.go
+++ b/workflow/controller/operator.go
@@ -61,15 +61,6 @@ type wfOperationCtx struct {
 	// workflowDeadline is the deadline which the workflow is expected to complete before we
 	// terminate the workflow.
 	workflowDeadline *time.Time
-
-	// currentWFSize is current Workflow size
-	currentWFSize int
-
-	// unSavedNodeStatusSize is unsaved workflow size
-	unSavedNodeStatusSize int
-
-	// isWFCompressionFailed is workflow compression failed status
-	isWFCompressionFailed bool
 }
 
 var (
@@ -464,9 +455,9 @@ func (woc *wfOperationCtx) podReconciliation() error {
 	seenPodLock := &sync.Mutex{}
 	wfNodesLock := &sync.RWMutex{}
 
-	performAssessment := func(pod *apiv1.Pod) string {
+	performAssessment := func(pod *apiv1.Pod) {
 		if pod == nil {
-			return ""
+			return
 		}
 		nodeNameForPod := pod.Annotations[common.AnnotationKeyNodeName]
 		nodeID := woc.wf.NodeID(nodeNameForPod)
@@ -486,51 +477,43 @@ func (woc *wfOperationCtx) podReconciliation() error {
 			if node.Completed() && !node.IsDaemoned() {
 				if tmpVal, tmpOk := pod.Labels[common.LabelKeyCompleted]; tmpOk {
 					if tmpVal == "true" {
-						return nodeID
+						return
 					}
 				}
 				woc.completedPods[pod.ObjectMeta.Name] = true
 			}
 		}
-		return nodeID
+		return
 	}
 
 	parallelPodNum := make(chan string, 500)
 	var wg sync.WaitGroup
-
-	woc.currentWFSize = woc.getSize()
+	origNodeStatus := *woc.wf.Status.DeepCopy()
 
 	for _, pod := range podList.Items {
 		parallelPodNum <- pod.Name
 		wg.Add(1)
 		go func(tmpPod apiv1.Pod) {
 			defer wg.Done()
-			wfNodesLock.Lock()
-			origNodeStatus := *woc.wf.Status.DeepCopy()
-			wfNodesLock.Unlock()
-			nodeID := performAssessment(&tmpPod)
+			performAssessment(&tmpPod)
 			err = woc.applyExecutionControl(&tmpPod, wfNodesLock)
 			if err != nil {
 				woc.log.Warnf("Failed to apply execution control to pod %s", tmpPod.Name)
 			}
-			wfNodesLock.Lock()
-			defer wfNodesLock.Unlock()
-			err = woc.checkAndEstimate(nodeID)
-			if err != nil {
-				woc.wf.Status = origNodeStatus
-				nodeNameForPod := tmpPod.Annotations[common.AnnotationKeyNodeName]
-				woc.log.Warnf("%v", err)
-				woc.markNodeErrorClearOuput(nodeNameForPod, err)
-				err = woc.checkAndCompress()
-				if err != nil {
-					woc.isWFCompressionFailed = true
-				}
-			}
 			<-parallelPodNum
 		}(pod)
 	}
 
 	wg.Wait()
+
+	err = woc.checkAndCompress()
+	if err != nil {
+		woc.wf.Status = origNodeStatus
+		woc.log.Warnf("%v", err)
+		err = woc.checkAndCompress()
+		woc.markWorkflowFailed(err.Error())
+	}
+
 	// Now check for deleted pods. Iterate our nodes. If any one of our nodes does not show up in
 	// the seen list it implies that the pod was deleted without the controller seeing the event.
 	// It is now impossible to infer pod status. The only thing we can do at this point is to mark
@@ -1682,7 +1665,7 @@ func (woc *wfOperationCtx) getSize() int {
 // The compressed content will be assign to compressedNodes element and clear the nodestatus map.
 func (woc *wfOperationCtx) checkAndCompress() error {
 
-	if !woc.isWFCompressionFailed && (woc.wf.Status.CompressedNodes != "" || (woc.wf.Status.CompressedNodes == "" && woc.getSize() >= maxWorkflowSize)) {
+	if woc.wf.Status.CompressedNodes != "" || (woc.wf.Status.CompressedNodes == "" && woc.getSize() >= maxWorkflowSize) {
 		nodeContent, err := json.Marshal(woc.wf.Status.Nodes)
 		if err != nil {
 			return errors.InternalWrapError(err)
@@ -1691,10 +1674,10 @@ func (woc *wfOperationCtx) checkAndCompress() error {
 		woc.wf.Status.CompressedNodes = file.CompressEncodeString(buff)
 	}
 
-	if woc.isWFCompressionFailed || (woc.wf.Status.CompressedNodes != "" && woc.getSize() >= maxWorkflowSize) {
-		woc.isWFCompressionFailed = true
+	if woc.wf.Status.CompressedNodes != "" && woc.getSize() >= maxWorkflowSize {
 		return errors.InternalError(fmt.Sprintf("Workflow is longer than maximum allowed size. Size=%d", woc.getSize()))
 	}
+
 	return nil
 }
 
@@ -1716,29 +1699,3 @@ func (woc *wfOperationCtx) checkAndDecompress() error {
 	}
 	return nil
 }
-
-// checkAndEstimate will check and estimate the workflow size with current nodestatus
-func (woc *wfOperationCtx) checkAndEstimate(nodeID string) error {
-	if nodeID == "" {
-		return nil
-	}
-
-	if woc.isWFCompressionFailed {
-		return errors.InternalErrorf("Workflow is longer than maximum allowed size. Size=%d", woc.currentWFSize+woc.unSavedNodeStatusSize)
-	}
-
-	if woc.wf.Status.CompressedNodes != "" {
-		if node, ok := woc.wf.Status.Nodes[nodeID]; ok {
-			content, err := json.Marshal(node)
-			if err != nil {
-				return errors.InternalWrapError(err)
-			}
-			nodeSize := len(file.CompressEncodeString(string(content)))
-			if (nodeSize + woc.unSavedNodeStatusSize + woc.currentWFSize) >= maxWorkflowSize {
-				return errors.InternalErrorf("Workflow is longer than maximum allowed size. Size=%d", woc.currentWFSize+nodeSize+woc.unSavedNodeStatusSize)
-			}
-			woc.unSavedNodeStatusSize += nodeSize
-		}
-	}
-	return nil
-}

From 3dfffc13adbff417061f0ed6c0eb4f2fde29fceb Mon Sep 17 00:00:00 2001
From: xianlubird
Date: Tue, 9 Apr 2019 10:10:04 +0800
Subject: [PATCH 2/2] Move compress function to persistUpdates

---
 workflow/controller/operator.go | 10 +---------
 1 file changed, 1 insertion(+), 9 deletions(-)

diff --git a/workflow/controller/operator.go b/workflow/controller/operator.go
index ca59d5073147..44e72be6340c 100644
--- a/workflow/controller/operator.go
+++ b/workflow/controller/operator.go
@@ -291,6 +291,7 @@ func (woc *wfOperationCtx) persistUpdates() {
 	err := woc.checkAndCompress()
 	if err != nil {
 		woc.log.Warnf("Error compressing workflow: %v", err)
+		woc.markWorkflowFailed(err.Error())
 	}
 	if woc.wf.Status.CompressedNodes != "" {
 		woc.wf.Status.Nodes = nil
@@ -488,7 +489,6 @@ func (woc *wfOperationCtx) podReconciliation() error {
 
 	parallelPodNum := make(chan string, 500)
 	var wg sync.WaitGroup
-	origNodeStatus := *woc.wf.Status.DeepCopy()
 
 	for _, pod := range podList.Items {
 		parallelPodNum <- pod.Name
@@ -506,14 +506,6 @@ func (woc *wfOperationCtx) podReconciliation() error {
 
 	wg.Wait()
 
-	err = woc.checkAndCompress()
-	if err != nil {
-		woc.wf.Status = origNodeStatus
-		woc.log.Warnf("%v", err)
-		err = woc.checkAndCompress()
-		woc.markWorkflowFailed(err.Error())
-	}
-
 	// Now check for deleted pods. Iterate our nodes. If any one of our nodes does not show up in
 	// the seen list it implies that the pod was deleted without the controller seeing the event.
 	// It is now impossible to infer pod status. The only thing we can do at this point is to mark
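
Editor's note: taken together, the two patches drop the per-pod checkAndEstimate bookkeeping and instead run a single checkAndCompress once per reconciliation, with the final call moved into persistUpdates, which marks the workflow failed if even the compressed status exceeds the size cap. Below is a minimal, standalone Go sketch of that size-check-and-compress pattern. It is not the Argo controller code: Status, compressEncode, size, and the maxWorkflowSize value are simplified stand-ins for wfv1.WorkflowStatus, file.CompressEncodeString, woc.getSize(), and the real limit.

// sketch.go — illustrative only; names and the 1 MiB cap are assumptions.
package main

import (
	"bytes"
	"compress/gzip"
	"encoding/base64"
	"encoding/json"
	"fmt"
)

// Hypothetical cap on the persisted object size, analogous to the controller's maxWorkflowSize.
const maxWorkflowSize = 1 << 20

// Status is a simplified stand-in for the workflow status object.
type Status struct {
	Nodes           map[string]string `json:"nodes,omitempty"`
	CompressedNodes string            `json:"compressedNodes,omitempty"`
}

// compressEncode gzips and base64-encodes a string (the same idea as CompressEncodeString).
func compressEncode(s string) string {
	var buf bytes.Buffer
	zw := gzip.NewWriter(&buf)
	_, _ = zw.Write([]byte(s))
	_ = zw.Close()
	return base64.StdEncoding.EncodeToString(buf.Bytes())
}

// size reports the marshalled size of the status, playing the role of getSize().
func size(st *Status) int {
	data, _ := json.Marshal(st)
	return len(data)
}

// checkAndCompress compresses the node map when the status is already compressed or too
// large, and returns an error if even the compressed form still exceeds the cap.
func checkAndCompress(st *Status) error {
	if st.CompressedNodes != "" || size(st) >= maxWorkflowSize {
		nodeContent, err := json.Marshal(st.Nodes)
		if err != nil {
			return err
		}
		st.CompressedNodes = compressEncode(string(nodeContent))
	}
	if st.CompressedNodes != "" && size(st) >= maxWorkflowSize {
		return fmt.Errorf("workflow is longer than maximum allowed size. Size=%d", size(st))
	}
	return nil
}

func main() {
	st := &Status{Nodes: map[string]string{"step-1": "Succeeded"}}
	if err := checkAndCompress(st); err != nil {
		// In the controller this is the point where the workflow would be marked failed
		// before persisting, as patch 2 does in persistUpdates.
		fmt.Println("persist would fail the workflow:", err)
		return
	}
	if st.CompressedNodes != "" {
		st.Nodes = nil // persist only the compressed form, mirroring persistUpdates
	}
	fmt.Println("status size after check:", size(st))
}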