Set pipeline status when all tasks complete #2774
@@ -426,9 +426,51 @@ func (c *Reconciler) reconcile(ctx context.Context, pr *v1beta1.PipelineRun) err
		}
	}

	as, err := artifacts.InitializeArtifactStorage(c.Images, pr, pipelineSpec, c.KubeClientSet, logger)
	if err != nil {
		logger.Infof("PipelineRun failed to initialize artifact storage %s", pr.Name)
		return err
	}

	// When the pipeline run is stopping, we don't schedule any new task and only
	// wait for all running tasks to complete and report their status
	if !pipelineState.IsStopping() {
The diff does not help very much, but the only changes here are: …

Maybe this could be simplified and kept as is; instead change … With an empty execution queue, everything from … Don't worry if you are not comfortable changing it, I will have to change it in …
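A minimal sketch of that suggestion, assuming the function being referred to is GetNextTasks (the comment is truncated): short-circuit candidate selection when the state is stopping, so the caller needs no explicit IsStopping guard. The wrapper name is hypothetical and not part of this PR.

```go
// Hypothetical wrapper (sketch only, not from this PR): return an empty
// execution queue while the pipeline is stopping, so everything downstream
// of the queue becomes a no-op.
func (state PipelineRunState) getNextTasksUnlessStopping(candidateTasks map[string]struct{}) []*ResolvedPipelineRunTask {
	if state.IsStopping() {
		return nil // no new task is scheduled once one failed or was cancelled
	}
	return state.GetNextTasks(candidateTasks)
}
```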
		err = c.runNextSchedulableTask(ctx, pr, d, pipelineState, as)
		if err != nil {
			return err
		}
	}

	before := pr.Status.GetCondition(apis.ConditionSucceeded)
	after := resources.GetPipelineConditionStatus(pr, pipelineState, logger, d)
	switch after.Status {
	case corev1.ConditionTrue:
		pr.Status.MarkSucceeded(after.Reason, after.Message)
	case corev1.ConditionFalse:
		pr.Status.MarkFailed(after.Reason, after.Message)
	case corev1.ConditionUnknown:
		pr.Status.MarkRunning(after.Reason, after.Message)
	}
	// Read the condition the way it was set by the Mark* helpers
	after = pr.Status.GetCondition(apis.ConditionSucceeded)
	events.Emit(recorder, before, after, pr)

	pr.Status.TaskRuns = getTaskRunsStatus(pr, pipelineState)
	logger.Infof("PipelineRun %s status is being set to %s", pr.Name, after)
	return nil
}
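The before/after pair around the Mark* calls drives event emission: the condition is re-read afterwards so that Emit compares what was actually stored (the Mark* helpers go through knative's condition manager, which can adjust fields such as the transition time). A rough sketch of the emit-on-transition idea; the helper name is hypothetical and this is not the actual events.Emit implementation:

```go
// Hypothetical sketch of emit-on-transition (the real logic lives in the
// events package): send a Kubernetes event only when the Succeeded
// condition changed between two reconcile snapshots.
func emitIfTransitioned(recorder record.EventRecorder, before, after *apis.Condition, pr *v1beta1.PipelineRun) {
	if after == nil || (before != nil && before.Status == after.Status) {
		return // no transition, nothing to report
	}
	eventType := corev1.EventTypeNormal
	if after.Status == corev1.ConditionFalse {
		eventType = corev1.EventTypeWarning
	}
	recorder.Event(pr, eventType, after.Reason, after.Message)
}
```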
// runNextSchedulableTask gets the next schedulable Tasks from the dag based on the current
// pipeline run state, and starts them
func (c *Reconciler) runNextSchedulableTask(ctx context.Context, pr *v1beta1.PipelineRun, d *dag.Graph, pipelineState resources.PipelineRunState, as artifacts.ArtifactStorageInterface) error {
NIT: how to avoid any more reconciler receiver functions and at the same time not explode the …

Good point!
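One hypothetical way to address the NIT, sketched here rather than taken from the PR: move the scheduling step onto a small purpose-built type that owns just the dependencies it needs, so the Reconciler's method set stops growing and the parameter list stays short. Names and field types are illustrative assumptions based on how c.Images and c.KubeClientSet are used above.

```go
// Sketch only (not from this PR): a narrow scheduler type instead of
// another *Reconciler receiver method.
type taskScheduler struct {
	images        pipeline.Images
	kubeClientSet kubernetes.Interface
}

func (s taskScheduler) runNextSchedulableTask(ctx context.Context, pr *v1beta1.PipelineRun, d *dag.Graph, pipelineState resources.PipelineRunState, as artifacts.ArtifactStorageInterface) error {
	// ...same body as the method above, but the Reconciler no longer grows
	return nil
}
```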
	logger := logging.FromContext(ctx)
	recorder := controller.GetEventRecorder(ctx)

	candidateTasks, err := dag.GetSchedulable(d, pipelineState.SuccessfulPipelineTaskNames()...)
	if err != nil {
		logger.Errorf("Error getting potential next tasks for valid pipelinerun %s: %v", pr.Name, err)
		return nil
	}

	nextRprts := pipelineState.GetNextTasks(candidateTasks)
@@ -440,13 +482,6 @@ func (c *Reconciler) reconcile(ctx context.Context, pr *v1beta1.PipelineRun) err
	}
	resources.ApplyTaskResults(nextRprts, resolvedResultRefs)

	var as artifacts.ArtifactStorageInterface

	if as, err = artifacts.InitializeArtifactStorage(c.Images, pr, pipelineSpec, c.KubeClientSet, logger); err != nil {
		logger.Infof("PipelineRun failed to initialize artifact storage %s", pr.Name)
		return err
	}

	for _, rprt := range nextRprts {
		if rprt == nil {
			continue
@@ -467,20 +502,6 @@ func (c *Reconciler) reconcile(ctx context.Context, pr *v1beta1.PipelineRun) err
			}
		}
	}
	before := pr.Status.GetCondition(apis.ConditionSucceeded)
	after := resources.GetPipelineConditionStatus(pr, pipelineState, logger, d)
	switch after.Status {
	case corev1.ConditionTrue:
		pr.Status.MarkSucceeded(after.Reason, after.Message)
	case corev1.ConditionFalse:
		pr.Status.MarkFailed(after.Reason, after.Message)
	case corev1.ConditionUnknown:
		pr.Status.MarkRunning(after.Reason, after.Message)
	}
	events.Emit(recorder, before, after, pr)

	pr.Status.TaskRuns = getTaskRunsStatus(pr, pipelineState)
	logger.Infof("PipelineRun %s status is being set to %s", pr.Name, after)
	return nil
}
@@ -127,6 +127,20 @@ func (t ResolvedPipelineRunTask) IsCancelled() bool {
	return c.IsFalse() && c.Reason == v1beta1.TaskRunReasonCancelled.String()
}

// IsStarted returns true only if the PipelineRunTask itself has a TaskRun associated
func (t ResolvedPipelineRunTask) IsStarted() bool {
	if t.TaskRun == nil {
		return false
	}

	c := t.TaskRun.Status.GetCondition(apis.ConditionSucceeded)
A task with a condition should be covered here and should help mark the pipeline as …
	if c == nil {
		return false
	}

	return true
}
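A hypothetical table-style check of the IsStarted contract (not part of this PR; it assumes the package's existing test imports plus knative's duck v1beta1 types, which TaskRunStatus embeds in this codebase): no TaskRun means not started, and any Succeeded condition, even Unknown, means started.

```go
// Hypothetical test sketch, not from this PR.
func TestIsStarted(t *testing.T) {
	notStarted := ResolvedPipelineRunTask{} // no TaskRun yet
	started := ResolvedPipelineRunTask{
		TaskRun: &v1beta1.TaskRun{
			Status: v1beta1.TaskRunStatus{
				Status: duckv1beta1.Status{
					Conditions: duckv1beta1.Conditions{{
						Type:   apis.ConditionSucceeded,
						Status: corev1.ConditionUnknown, // still running counts as started
					}},
				},
			},
		},
	}
	if notStarted.IsStarted() {
		t.Error("a task with no TaskRun must not count as started")
	}
	if !started.IsStarted() {
		t.Error("a TaskRun with any Succeeded condition counts as started")
	}
}
```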
// ToMap returns a map that maps pipeline task name to the resolved pipeline run task
func (state PipelineRunState) ToMap() map[string]*ResolvedPipelineRunTask {
	m := make(map[string]*ResolvedPipelineRunTask)

@@ -160,6 +174,20 @@ func (state PipelineRunState) IsBeforeFirstTaskRun() bool {
	return true
}
// IsStopping returns true if the PipelineRun won't be scheduling any new Task because
// at least one task already failed or was cancelled
func (state PipelineRunState) IsStopping() bool {
	for _, t := range state {
		if t.IsCancelled() {
			return true
		}
		if t.IsFailure() {
			return true
		}
	}
	return false
}
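IsStopping is derived state: it rescans the resolved tasks on every call rather than storing a flag, so it stays consistent with the TaskRun conditions. A small illustration of how a single cancelled TaskRun flips it (hypothetical snippet, same duck-typed status assumption as the test sketch above):

```go
// Hypothetical snippet, not from this PR: one cancelled TaskRun is enough
// for the whole state to report stopping.
func exampleIsStopping() {
	cancelled := &ResolvedPipelineRunTask{
		TaskRun: &v1beta1.TaskRun{
			Status: v1beta1.TaskRunStatus{
				Status: duckv1beta1.Status{
					Conditions: duckv1beta1.Conditions{{
						Type:   apis.ConditionSucceeded,
						Status: corev1.ConditionFalse,
						Reason: v1beta1.TaskRunReasonCancelled.String(),
					}},
				},
			},
		},
	}
	state := PipelineRunState{cancelled}
	fmt.Println(state.IsStopping()) // true: no new task will be scheduled
}
```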
// GetNextTasks will return the next ResolvedPipelineRunTasks to execute, which are the ones in the
// list of candidateTasks which aren't yet indicated in state to be running.
func (state PipelineRunState) GetNextTasks(candidateTasks map[string]struct{}) []*ResolvedPipelineRunTask {

@@ -389,9 +417,9 @@ func GetTaskRunName(taskRunsStatus map[string]*v1beta1.PipelineRunTaskRunStatus,
func GetPipelineConditionStatus(pr *v1beta1.PipelineRun, state PipelineRunState, logger *zap.SugaredLogger, dag *dag.Graph) *apis.Condition {
	// We have 4 different states here:
	// 1. Timed out -> Failed
	// 2. Any one TaskRun has failed -> Failed. This should change with #1020 and #1023
	// 2. All tasks are done and at least one has failed or has been cancelled -> Failed
	// 3. All tasks are done or are skipped (i.e. condition check failed) -> Success
	// 4. A Task or Condition is running right now or there are things left to run -> Running
	if pr.IsTimedOut() {
		return &apis.Condition{
			Type: apis.ConditionSucceeded,
@@ -401,72 +429,91 @@ func GetPipelineConditionStatus(pr *v1beta1.PipelineRun, state PipelineRunState,
		}
	}

	// A single failed task means we fail the pipeline
	for _, rprt := range state {
		if rprt.IsCancelled() {
			logger.Infof("TaskRun %s is cancelled, so PipelineRun %s is cancelled", rprt.TaskRunName, pr.Name)
			return &apis.Condition{
				Type:    apis.ConditionSucceeded,
				Status:  corev1.ConditionFalse,
				Reason:  v1beta1.PipelineRunReasonCancelled.String(),
				Message: fmt.Sprintf("TaskRun %s has cancelled", rprt.TaskRun.Name),
			}
		}

		if rprt.IsFailure() { //IsDone ensures we have crossed the retry limit
			logger.Infof("TaskRun %s has failed, so PipelineRun %s has failed, retries done: %b", rprt.TaskRunName, pr.Name, len(rprt.TaskRun.Status.RetriesStatus))
			return &apis.Condition{
				Type:    apis.ConditionSucceeded,
				Status:  corev1.ConditionFalse,
				Reason:  v1beta1.PipelineRunReasonFailed.String(),
				Message: fmt.Sprintf("TaskRun %s has failed", rprt.TaskRun.Name),
			}
		}
	}
	allTasks := []string{}
	successOrSkipTasks := []string{}
	withStatusTasks := []string{}
	skipTasks := int(0)
	failedTasks := int(0)
	cancelledTasks := int(0)
	reason := v1beta1.PipelineRunReasonSuccessful.String()
	stateAsMap := state.ToMap()
	isStopping := state.IsStopping()
	// Check to see if all tasks are success or skipped
	//
	// The completion reason is also calculated here, but it will only be used
	// if all tasks are completed.
	//
	// The pipeline run completion reason is set from the taskrun completion reason
	// according to the following logic:
	//
	// - All successful: ReasonSucceeded
	// - Some successful, some skipped: ReasonCompleted
	// - Some cancelled, none failed: ReasonCancelled
	// - At least one failed: ReasonFailed
	for _, rprt := range state {
		allTasks = append(allTasks, rprt.PipelineTask.Name)
		if rprt.IsSuccessful() {
			successOrSkipTasks = append(successOrSkipTasks, rprt.PipelineTask.Name)
		}
		if isSkipped(rprt, state.ToMap(), dag) {
		switch {
		case !rprt.IsStarted() && isStopping:
			// If the pipeline is in stopping mode, all tasks that are not running
			// already will be skipped. Otherwise these tasks end up in the
			// incomplete count.
			skipTasks++
			withStatusTasks = append(withStatusTasks, rprt.PipelineTask.Name)
		case rprt.IsSuccessful():
			withStatusTasks = append(withStatusTasks, rprt.PipelineTask.Name)
		case isSkipped(rprt, stateAsMap, dag):
			skipTasks++
			successOrSkipTasks = append(successOrSkipTasks, rprt.PipelineTask.Name)
			withStatusTasks = append(withStatusTasks, rprt.PipelineTask.Name)
			// At least one is skipped and no failure yet, mark as completed
			if reason == v1beta1.PipelineRunReasonSuccessful.String() {
				reason = v1beta1.PipelineRunReasonCompleted.String()
			}
		case rprt.IsCancelled():
			cancelledTasks++
			withStatusTasks = append(withStatusTasks, rprt.PipelineTask.Name)
			if reason != v1beta1.PipelineRunReasonFailed.String() {
				reason = v1beta1.PipelineRunReasonCancelled.String()
			}
		case rprt.IsFailure():
			withStatusTasks = append(withStatusTasks, rprt.PipelineTask.Name)
			failedTasks++
			reason = v1beta1.PipelineRunReasonFailed.String()
		}
	}
	if reflect.DeepEqual(allTasks, successOrSkipTasks) {
		logger.Infof("All TaskRuns have finished for PipelineRun %s so it has finished", pr.Name)
		reason := v1beta1.PipelineRunReasonSuccessful.String()
		if skipTasks != 0 {
			reason = v1beta1.PipelineRunReasonCompleted.String()
	if reflect.DeepEqual(allTasks, withStatusTasks) {
		status := corev1.ConditionTrue
		if failedTasks > 0 || cancelledTasks > 0 {
			status = corev1.ConditionFalse
		}

		logger.Infof("All TaskRuns have finished for PipelineRun %s so it has finished", pr.Name)
		return &apis.Condition{
			Type:    apis.ConditionSucceeded,
			Status:  corev1.ConditionTrue,
			Reason:  reason,
			Message: fmt.Sprintf("Tasks Completed: %d, Skipped: %d", len(successOrSkipTasks)-skipTasks, skipTasks),
			Type:    apis.ConditionSucceeded,
			Status:  status,
			Reason:  reason,
			Message: fmt.Sprintf("Tasks Completed: %d (Failed: %d, Cancelled %d), Skipped: %d",
				len(allTasks)-skipTasks, failedTasks, cancelledTasks, skipTasks),
		}
	}
	// Hasn't timed out; no taskrun failed yet; and not all tasks have finished....
	// Hasn't timed out; not all tasks have finished....
	// Must keep running then....
	if failedTasks > 0 || cancelledTasks > 0 {
		reason = v1beta1.PipelineRunReasonStopping.String()
	} else {
		reason = v1beta1.PipelineRunReasonRunning.String()
	}
	return &apis.Condition{
		Type:    apis.ConditionSucceeded,
		Status:  corev1.ConditionUnknown,
		Reason:  v1beta1.PipelineRunReasonRunning.String(),
		Message: fmt.Sprintf("Tasks Completed: %d, Incomplete: %d, Skipped: %d", len(successOrSkipTasks)-skipTasks, len(allTasks)-len(successOrSkipTasks), skipTasks),
		Type:    apis.ConditionSucceeded,
		Status:  corev1.ConditionUnknown,
		Reason:  reason,
		Message: fmt.Sprintf("Tasks Completed: %d (Failed: %d, Cancelled %d), Incomplete: %d, Skipped: %d",
			len(withStatusTasks)-skipTasks, failedTasks, cancelledTasks, len(allTasks)-len(withStatusTasks), skipTasks),
	}
}
// isSkipped returns true if a Task in a TaskRun will not be run either because
// its Condition Checks failed or because one of the parent tasks's conditions failed
// its Condition Checks failed or because one of the parent tasks' conditions failed
// Note that this means isSkipped returns false if a conditionCheck is in progress
func isSkipped(rprt *ResolvedPipelineRunTask, stateMap map[string]*ResolvedPipelineRunTask, d *dag.Graph) bool {
	// Taskrun not skipped if it already exists
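Taken together, the counting loop and the two return paths implement a small aggregation over per-task states. A self-contained sketch of that logic with simplified stand-in types (hypothetical; it mirrors but does not reuse the real code, and it omits the detail that not-yet-started tasks are counted as skipped once the pipeline is stopping):

```go
package main

import "fmt"

type taskState int

const (
	succeeded taskState = iota
	skipped
	cancelled
	failed
	running
)

// aggregate mirrors the reason logic described above: all successful ->
// Succeeded; some skipped -> Completed; cancelled but none failed ->
// Cancelled; at least one failed -> Failed; otherwise Running or Stopping
// depending on whether a failure/cancellation has already happened.
func aggregate(tasks []taskState) (status, reason string) {
	var done, skippedN, cancelledN, failedN int
	for _, t := range tasks {
		switch t {
		case succeeded, skipped, cancelled, failed:
			done++ // terminal states count toward completion
		}
		switch t {
		case skipped:
			skippedN++
		case cancelled:
			cancelledN++
		case failed:
			failedN++
		}
	}
	if done == len(tasks) {
		status = "True"
		if failedN > 0 || cancelledN > 0 {
			status = "False"
		}
		switch {
		case failedN > 0:
			reason = "Failed"
		case cancelledN > 0:
			reason = "Cancelled"
		case skippedN > 0:
			reason = "Completed"
		default:
			reason = "Succeeded"
		}
		return status, reason
	}
	// Not everything is done yet: status stays Unknown, and the reason is
	// Stopping once a failure or cancellation happened, Running otherwise.
	status = "Unknown"
	reason = "Running"
	if failedN > 0 || cancelledN > 0 {
		reason = "Stopping"
	}
	return status, reason
}

func main() {
	fmt.Println(aggregate([]taskState{succeeded, skipped}))         // True Completed
	fmt.Println(aggregate([]taskState{succeeded, failed, running})) // Unknown Stopping
	fmt.Println(aggregate([]taskState{succeeded, failed, skipped})) // False Failed
}
```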
I think it looks a little odd: I ran a simple pipeline with these changes and the state transitions from stopping to failed, i.e. the pipeline stopped while a task was still running 😕

Thanks for trying it out on a real cluster - I kind of relied on unit and E2E tests :)

The reason changes as follows: … which is what I was hoping to achieve.

When we extend the pipeline with finally, it could look something like: …

The message could also help provide more details.