Skip to content

Commit

Permalink
Set pipeline status when all tasks complete
Browse files Browse the repository at this point in the history
We used to set the pipeline status to failed as soon as the first
task in the pipeline failed or was cancelled.

As soon as the first task in the pipeline fails or is cancelled, we
stop scheduling new tasks, as we did before, but we will report
status Unknown until all Tasks are complete.

This allows us to:
- set the completion time at the same time that the status is set,
  which avoids inconsistencies
- wait until all tasks are complete before we cleanup the pipeline
  artifact storage, affinity assistant and record metrics
- report the correct number of failed / cancelled tasks, as there
  may be more than one. Other tasks that were already running
  when the first failure happened may fail too
- prepare the pipeline controller for more complex workflows, where
  the controller may continue scheduling tasks after failures
  • Loading branch information
afrittoli committed Jun 7, 2020
1 parent 194102f commit e72a32e
Show file tree
Hide file tree
Showing 3 changed files with 257 additions and 164 deletions.
61 changes: 39 additions & 22 deletions pkg/reconciler/pipelinerun/pipelinerun.go
Original file line number Diff line number Diff line change
Expand Up @@ -470,9 +470,48 @@ func (c *Reconciler) reconcile(ctx context.Context, pr *v1beta1.PipelineRun) err
}
}

var as artifacts.ArtifactStorageInterface

if as, err = artifacts.InitializeArtifactStorage(c.Images, pr, pipelineSpec, c.KubeClientSet, c.Logger); err != nil {
c.Logger.Infof("PipelineRun failed to initialize artifact storage %s", pr.Name)
return err
}

// When the pipeline run is stopping, we don't schedule any new task and only
// wait for all running tasks to complete and report their status
if !pipelineState.IsStopping() {
err = c.runNextSchedulableTask(pr, &d, pipelineState)
if err != nil {
return err
}
}

before := pr.Status.GetCondition(apis.ConditionSucceeded)
after := resources.GetPipelineConditionStatus(pr, pipelineState, c.Logger, d)
switch after.Status {
case corev1.ConditionTrue:
pr.Status.MarkSucceeded(after.Reason, after.Message)
case corev1.ConditionFalse:
pr.Status.MarkFailed(after.Reason, after.Message)
case corev1.ConditionUnknown:
pr.Status.MarkRunning(after.Reason, after.Message)
}
// Read the condition the way it was set by the Mark* helpers
after = pr.Status.GetCondition(apis.ConditionSucceeded)
events.Emit(c.Recorder, before, after, pr)

pr.Status.TaskRuns = getTaskRunsStatus(pr, pipelineState)
c.Logger.Infof("PipelineRun %s status is being set to %s", pr.Name, after)
return nil
}

// runNextSchedulableTask gets the next schedulable Tasks from the dag based on the current
// pipeline run state, and starts them
func (c *Reconciler) runNextSchedulableTask(pr v1beta1.PipelineRun, d *dag.Graph, pipelineState resources.PipelineRunState) error {
candidateTasks, err := dag.GetSchedulable(d, pipelineState.SuccessfulPipelineTaskNames()...)
if err != nil {
c.Logger.Errorf("Error getting potential next tasks for valid pipelinerun %s: %v", pr.Name, err)
return
}

nextRprts := pipelineState.GetNextTasks(candidateTasks)
Expand All @@ -484,13 +523,6 @@ func (c *Reconciler) reconcile(ctx context.Context, pr *v1beta1.PipelineRun) err
}
resources.ApplyTaskResults(nextRprts, resolvedResultRefs)

var as artifacts.ArtifactStorageInterface

if as, err = artifacts.InitializeArtifactStorage(c.Images, pr, pipelineSpec, c.KubeClientSet, c.Logger); err != nil {
c.Logger.Infof("PipelineRun failed to initialize artifact storage %s", pr.Name)
return err
}

for _, rprt := range nextRprts {
if rprt == nil {
continue
Expand All @@ -511,21 +543,6 @@ func (c *Reconciler) reconcile(ctx context.Context, pr *v1beta1.PipelineRun) err
}
}
}
before := pr.Status.GetCondition(apis.ConditionSucceeded)
after := resources.GetPipelineConditionStatus(pr, pipelineState, c.Logger, d)
switch after.Status {
case corev1.ConditionTrue:
pr.Status.MarkSucceeded(after.Reason, after.Message)
case corev1.ConditionFalse:
pr.Status.MarkFailed(after.Reason, after.Message)
case corev1.ConditionUnknown:
pr.Status.MarkRunning(after.Reason, after.Message)
}
events.Emit(c.Recorder, before, after, pr)

pr.Status.TaskRuns = getTaskRunsStatus(pr, pipelineState)
c.Logger.Infof("PipelineRun %s status is being set to %s", pr.Name, after)
return nil
}

func getPipelineRunResults(pipelineSpec *v1beta1.PipelineSpec, resolvedResultRefs resources.ResolvedResultRefs) []v1beta1.PipelineRunResult {
Expand Down
119 changes: 74 additions & 45 deletions pkg/reconciler/pipelinerun/resources/pipelinerunresolution.go
Original file line number Diff line number Diff line change
Expand Up @@ -62,6 +62,10 @@ const (
// ReasonConditionCheckFailed indicates that the reason for the failure status is that the
// condition check associated to the pipeline task evaluated to false
ReasonConditionCheckFailed = "ConditionCheckFailed"

// ReasonStopping indicates that no new Tasks will be scheduled by the controller, and the
// pipeline will stop once all running tasks complete their work
ReasonStopping = "PipelineRunStopping"
)

// TaskNotFoundError indicates that the resolution failed because a referenced Task couldn't be retrieved
Expand Down Expand Up @@ -181,6 +185,20 @@ func (state PipelineRunState) IsBeforeFirstTaskRun() bool {
return true
}

// IsStopping reports whether the PipelineRun has stopped scheduling new Tasks:
// once any task has failed or been cancelled, no further tasks are started.
func (state PipelineRunState) IsStopping() bool {
	for _, rprt := range state {
		if rprt.IsCancelled() || rprt.IsFailure() {
			return true
		}
	}
	return false
}

// GetNextTasks will return the next ResolvedPipelineRunTasks to execute, which are the ones in the
// list of candidateTasks which aren't yet indicated in state to be running.
func (state PipelineRunState) GetNextTasks(candidateTasks map[string]struct{}) []*ResolvedPipelineRunTask {
Expand Down Expand Up @@ -410,7 +428,7 @@ func GetTaskRunName(taskRunsStatus map[string]*v1beta1.PipelineRunTaskRunStatus,
func GetPipelineConditionStatus(pr *v1beta1.PipelineRun, state PipelineRunState, logger *zap.SugaredLogger, dag *dag.Graph) *apis.Condition {
// We have 4 different states here:
// 1. Timed out -> Failed
// 2. Any one TaskRun has failed - >Failed. This should change with #1020 and #1023
// 2. All tasks are done and at least one has failed or has been cancelled -> Failed
// 3. All tasks are done or are skipped (i.e. condition check failed).-> Success
// 4. A Task or Condition is running right now or there are things left to run -> Running
if pr.IsTimedOut() {
Expand All @@ -422,67 +440,78 @@ func GetPipelineConditionStatus(pr *v1beta1.PipelineRun, state PipelineRunState,
}
}

// A single failed task means we fail the pipeline
for _, rprt := range state {
if rprt.IsCancelled() {
logger.Infof("TaskRun %s is cancelled, so PipelineRun %s is cancelled", rprt.TaskRunName, pr.Name)
return &apis.Condition{
Type: apis.ConditionSucceeded,
Status: corev1.ConditionFalse,
Reason: ReasonCancelled,
Message: fmt.Sprintf("TaskRun %s has cancelled", rprt.TaskRun.Name),
}
}

if rprt.IsFailure() { //IsDone ensures we have crossed the retry limit
logger.Infof("TaskRun %s has failed, so PipelineRun %s has failed, retries done: %b", rprt.TaskRunName, pr.Name, len(rprt.TaskRun.Status.RetriesStatus))
return &apis.Condition{
Type: apis.ConditionSucceeded,
Status: corev1.ConditionFalse,
Reason: ReasonFailed,
Message: fmt.Sprintf("TaskRun %s has failed", rprt.TaskRun.Name),
}
}
}

allTasks := []string{}
successOrSkipTasks := []string{}
withStatusTasks := []string{}
skipTasks := int(0)
failedTasks := int(0)
cancelledTasks := int(0)
reason := ReasonSucceeded

// Check to see if all tasks are success or skipped
//
// The completion reason is also calculated here, but it will only be used
// if all tasks are completed.
//
// The pipeline run completion reason is set from the taskrun completion reason
// according to the following logic:
//
// - All successful: ReasonSucceeded
// - Some successful, some skipped: ReasonCompleted
// - Some cancelled, none failed: ReasonCancelled
// - At least one failed: ReasonFailed
for _, rprt := range state {
allTasks = append(allTasks, rprt.PipelineTask.Name)
if rprt.IsSuccessful() {
successOrSkipTasks = append(successOrSkipTasks, rprt.PipelineTask.Name)
}
if isSkipped(rprt, state.ToMap(), dag) {
switch {
case rprt.IsSuccessful():
withStatusTasks = append(withStatusTasks, rprt.PipelineTask.Name)
case isSkipped(rprt, state.ToMap(), dag):
skipTasks++
successOrSkipTasks = append(successOrSkipTasks, rprt.PipelineTask.Name)
withStatusTasks = append(withStatusTasks, rprt.PipelineTask.Name)
// At least one is skipped and no failure yet, mark as completed
if reason == ReasonSucceeded {
reason = ReasonCompleted
}
case rprt.IsCancelled():
cancelledTasks++
withStatusTasks = append(withStatusTasks, rprt.PipelineTask.Name)
if reason != ReasonFailed {
reason = ReasonCancelled
}
case rprt.IsFailure():
withStatusTasks = append(withStatusTasks, rprt.PipelineTask.Name)
failedTasks++
reason = ReasonFailed
}
}

if reflect.DeepEqual(allTasks, successOrSkipTasks) {
logger.Infof("All TaskRuns have finished for PipelineRun %s so it has finished", pr.Name)
reason := ReasonSucceeded
if skipTasks != 0 {
reason = ReasonCompleted
if reflect.DeepEqual(allTasks, withStatusTasks) {
status := corev1.ConditionTrue
if failedTasks > 0 || cancelledTasks > 0 {
status = corev1.ConditionFalse
}

logger.Infof("All TaskRuns have finished for PipelineRun %s so it has finished", pr.Name)
return &apis.Condition{
Type: apis.ConditionSucceeded,
Status: corev1.ConditionTrue,
Reason: reason,
Message: fmt.Sprintf("Tasks Completed: %d, Skipped: %d", len(successOrSkipTasks)-skipTasks, skipTasks),
Type: apis.ConditionSucceeded,
Status: status,
Reason: reason,
Message: fmt.Sprintf("Tasks Completed: %d (Failed: %d, Cancelled %d), Skipped: %d",
len(allTasks)-skipTasks, failedTasks, cancelledTasks, skipTasks),
}
}

// Hasn't timed out; no taskrun failed yet; and not all tasks have finished....
// Hasn't timed out; not all tasks have finished....
// Must keep running then....
if failedTasks > 0 || cancelledTasks > 0 {
reason = ReasonStopping
} else {
reason = ReasonRunning
}
return &apis.Condition{
Type: apis.ConditionSucceeded,
Status: corev1.ConditionUnknown,
Reason: ReasonRunning,
Message: fmt.Sprintf("Tasks Completed: %d, Incomplete: %d, Skipped: %d", len(successOrSkipTasks)-skipTasks, len(allTasks)-len(successOrSkipTasks), skipTasks),
Type: apis.ConditionSucceeded,
Status: corev1.ConditionUnknown,
Reason: reason,
Message: fmt.Sprintf("Tasks Completed: %d (Failed: %d, Cancelled %d), Incomplete: %d, Skipped: %d",
len(withStatusTasks)-skipTasks, failedTasks, cancelledTasks, len(allTasks)-len(withStatusTasks), skipTasks),
}
}

Expand Down
Loading

0 comments on commit e72a32e

Please sign in to comment.