Skip to content

Commit

Permalink
Set pipeline status when all tasks complete
Browse files Browse the repository at this point in the history
We used to set the pipeline status to failed as soon as the first
task in the pipeline failed or was cancelled.

As soon as the first task in the pipeline fails or is cancelled, we
stop scheduling new tasks, as we did before, but we will report
status Unknown until all Tasks are complete, with reason "stopping".

This allows to:
- the completion time at the same time that the status is set
  and avoid inconsistencies
- wait until all tasks are complete before we cleanup the pipeline
  artifact storage, affinity assistant and record metrics
- report the correct number of failed / cancelled tasks, as there
  may be more than one. Other tasks that were already running
  when the first failure happened may fail too
- prepare the pipeline controller more complex workflows, where
  the controller may continue working scheduling after failures

Add test coverage for isSkipped and extend the pipelineresolution
module unit test coverage to capture more input pipeline state.

The DAG / GetNextTasks functions have not been touched at all.
The pipeline run reconciler, when the pipeline run is in stopping
mode, stops asking for next tasks to run. Once all running tasks
finish, the pipeline run finally gets in failed state.

When the pipeline run is in stopping mode, tasks that have not
been started yet are also counted as skipped in the status message
reported to the user.
  • Loading branch information
afrittoli authored and tekton-robot committed Jun 11, 2020
1 parent 64678e3 commit 3b95494
Show file tree
Hide file tree
Showing 5 changed files with 592 additions and 162 deletions.
2 changes: 2 additions & 0 deletions docs/events.md
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@ No events are emitted for `Conditions` today (https://github.com/tektoncd/pipeli
## TaskRuns

`TaskRun` events are generated for the following `Reasons`:

- `Started`: this is triggered the first time the `TaskRun` is picked by the
reconciler from its work queue, so it only happens if web-hook validation was
successful. Note that this event does not imply that a step started executing,
Expand All @@ -32,6 +33,7 @@ No events are emitted for `Conditions` today (https://github.com/tektoncd/pipeli
## PipelineRuns

`PipelineRun` events are generated for the following `Reasons`:

- `Succeeded`: this is triggered once all `Tasks` reachable via the DAG are
executed successfully.
- `Failed`: this is triggered if the `PipelineRun` is completed, but not
Expand Down
3 changes: 3 additions & 0 deletions pkg/apis/pipeline/v1beta1/pipelinerun_types.go
Original file line number Diff line number Diff line change
Expand Up @@ -231,6 +231,9 @@ const (
PipelineRunReasonCancelled PipelineRunReason = "Cancelled"
// PipelineRunReasonTimedOut is the reason set when the PipelineRun has timed out
PipelineRunReasonTimedOut PipelineRunReason = "PipelineRunTimeout"
// ReasonStopping indicates that no new Tasks will be scheduled by the controller, and the
// pipeline will stop once all running tasks complete their work
PipelineRunReasonStopping PipelineRunReason = "PipelineRunStopping"
)

func (t PipelineRunReason) String() string {
Expand Down
63 changes: 42 additions & 21 deletions pkg/reconciler/pipelinerun/pipelinerun.go
Original file line number Diff line number Diff line change
Expand Up @@ -426,9 +426,51 @@ func (c *Reconciler) reconcile(ctx context.Context, pr *v1beta1.PipelineRun) err
}
}

as, err := artifacts.InitializeArtifactStorage(c.Images, pr, pipelineSpec, c.KubeClientSet, logger)
if err != nil {
logger.Infof("PipelineRun failed to initialize artifact storage %s", pr.Name)
return err
}

// When the pipeline run is stopping, we don't schedule any new task and only
// wait for all running tasks to complete and report their status
if !pipelineState.IsStopping() {
err = c.runNextSchedulableTask(ctx, pr, d, pipelineState, as)
if err != nil {
return err
}
}

before := pr.Status.GetCondition(apis.ConditionSucceeded)
after := resources.GetPipelineConditionStatus(pr, pipelineState, logger, d)
switch after.Status {
case corev1.ConditionTrue:
pr.Status.MarkSucceeded(after.Reason, after.Message)
case corev1.ConditionFalse:
pr.Status.MarkFailed(after.Reason, after.Message)
case corev1.ConditionUnknown:
pr.Status.MarkRunning(after.Reason, after.Message)
}
// Read the condition the way it was set by the Mark* helpers
after = pr.Status.GetCondition(apis.ConditionSucceeded)
events.Emit(recorder, before, after, pr)

pr.Status.TaskRuns = getTaskRunsStatus(pr, pipelineState)
logger.Infof("PipelineRun %s status is being set to %s", pr.Name, after)
return nil
}

// runNextSchedulableTask gets the next schedulable Tasks from the dag based on the current
// pipeline run state, and starts them
func (c *Reconciler) runNextSchedulableTask(ctx context.Context, pr *v1beta1.PipelineRun, d *dag.Graph, pipelineState resources.PipelineRunState, as artifacts.ArtifactStorageInterface) error {

logger := logging.FromContext(ctx)
recorder := controller.GetEventRecorder(ctx)

candidateTasks, err := dag.GetSchedulable(d, pipelineState.SuccessfulPipelineTaskNames()...)
if err != nil {
logger.Errorf("Error getting potential next tasks for valid pipelinerun %s: %v", pr.Name, err)
return nil
}

nextRprts := pipelineState.GetNextTasks(candidateTasks)
Expand All @@ -440,13 +482,6 @@ func (c *Reconciler) reconcile(ctx context.Context, pr *v1beta1.PipelineRun) err
}
resources.ApplyTaskResults(nextRprts, resolvedResultRefs)

var as artifacts.ArtifactStorageInterface

if as, err = artifacts.InitializeArtifactStorage(c.Images, pr, pipelineSpec, c.KubeClientSet, logger); err != nil {
logger.Infof("PipelineRun failed to initialize artifact storage %s", pr.Name)
return err
}

for _, rprt := range nextRprts {
if rprt == nil {
continue
Expand All @@ -467,20 +502,6 @@ func (c *Reconciler) reconcile(ctx context.Context, pr *v1beta1.PipelineRun) err
}
}
}
before := pr.Status.GetCondition(apis.ConditionSucceeded)
after := resources.GetPipelineConditionStatus(pr, pipelineState, logger, d)
switch after.Status {
case corev1.ConditionTrue:
pr.Status.MarkSucceeded(after.Reason, after.Message)
case corev1.ConditionFalse:
pr.Status.MarkFailed(after.Reason, after.Message)
case corev1.ConditionUnknown:
pr.Status.MarkRunning(after.Reason, after.Message)
}
events.Emit(recorder, before, after, pr)

pr.Status.TaskRuns = getTaskRunsStatus(pr, pipelineState)
logger.Infof("PipelineRun %s status is being set to %s", pr.Name, after)
return nil
}

Expand Down
141 changes: 94 additions & 47 deletions pkg/reconciler/pipelinerun/resources/pipelinerunresolution.go
Original file line number Diff line number Diff line change
Expand Up @@ -127,6 +127,20 @@ func (t ResolvedPipelineRunTask) IsCancelled() bool {
return c.IsFalse() && c.Reason == v1beta1.TaskRunReasonCancelled.String()
}

// IsStarted returns true only if the PipelineRunTask itself has a TaskRun associated
func (t ResolvedPipelineRunTask) IsStarted() bool {
if t.TaskRun == nil {
return false
}

c := t.TaskRun.Status.GetCondition(apis.ConditionSucceeded)
if c == nil {
return false
}

return true
}

// ToMap returns a map that maps pipeline task name to the resolved pipeline run task
func (state PipelineRunState) ToMap() map[string]*ResolvedPipelineRunTask {
m := make(map[string]*ResolvedPipelineRunTask)
Expand Down Expand Up @@ -160,6 +174,20 @@ func (state PipelineRunState) IsBeforeFirstTaskRun() bool {
return true
}

// IsStopping returns true if the PipelineRun won't be scheduling any new Task because
// at least one task already failed or was cancelled
func (state PipelineRunState) IsStopping() bool {
for _, t := range state {
if t.IsCancelled() {
return true
}
if t.IsFailure() {
return true
}
}
return false
}

// GetNextTasks will return the next ResolvedPipelineRunTasks to execute, which are the ones in the
// list of candidateTasks which aren't yet indicated in state to be running.
func (state PipelineRunState) GetNextTasks(candidateTasks map[string]struct{}) []*ResolvedPipelineRunTask {
Expand Down Expand Up @@ -389,9 +417,9 @@ func GetTaskRunName(taskRunsStatus map[string]*v1beta1.PipelineRunTaskRunStatus,
func GetPipelineConditionStatus(pr *v1beta1.PipelineRun, state PipelineRunState, logger *zap.SugaredLogger, dag *dag.Graph) *apis.Condition {
// We have 4 different states here:
// 1. Timed out -> Failed
// 2. Any one TaskRun has failed - >Failed. This should change with #1020 and #1023
// 2. All tasks are done and at least one has failed or has been cancelled -> Failed
// 3. All tasks are done or are skipped (i.e. condition check failed).-> Success
// 4. A Task or Condition is running right now or there are things left to run -> Running
// 4. A Task or Condition is running right now or there are things left to run -> Running
if pr.IsTimedOut() {
return &apis.Condition{
Type: apis.ConditionSucceeded,
Expand All @@ -401,72 +429,91 @@ func GetPipelineConditionStatus(pr *v1beta1.PipelineRun, state PipelineRunState,
}
}

// A single failed task mean we fail the pipeline
for _, rprt := range state {
if rprt.IsCancelled() {
logger.Infof("TaskRun %s is cancelled, so PipelineRun %s is cancelled", rprt.TaskRunName, pr.Name)
return &apis.Condition{
Type: apis.ConditionSucceeded,
Status: corev1.ConditionFalse,
Reason: v1beta1.PipelineRunReasonCancelled.String(),
Message: fmt.Sprintf("TaskRun %s has cancelled", rprt.TaskRun.Name),
}
}

if rprt.IsFailure() { //IsDone ensures we have crossed the retry limit
logger.Infof("TaskRun %s has failed, so PipelineRun %s has failed, retries done: %b", rprt.TaskRunName, pr.Name, len(rprt.TaskRun.Status.RetriesStatus))
return &apis.Condition{
Type: apis.ConditionSucceeded,
Status: corev1.ConditionFalse,
Reason: v1beta1.PipelineRunReasonFailed.String(),
Message: fmt.Sprintf("TaskRun %s has failed", rprt.TaskRun.Name),
}
}
}

allTasks := []string{}
successOrSkipTasks := []string{}
withStatusTasks := []string{}
skipTasks := int(0)
failedTasks := int(0)
cancelledTasks := int(0)
reason := v1beta1.PipelineRunReasonSuccessful.String()
stateAsMap := state.ToMap()
isStopping := state.IsStopping()

// Check to see if all tasks are success or skipped
//
// The completion reason is also calculated here, but it will only be used
// if all tasks are completed.
//
// The pipeline run completion reason is set from the taskrun completion reason
// according to the following logic:
//
// - All successful: ReasonSucceeded
// - Some successful, some skipped: ReasonCompleted
// - Some cancelled, none failed: ReasonCancelled
// - At least one failed: ReasonFailed
for _, rprt := range state {
allTasks = append(allTasks, rprt.PipelineTask.Name)
if rprt.IsSuccessful() {
successOrSkipTasks = append(successOrSkipTasks, rprt.PipelineTask.Name)
}
if isSkipped(rprt, state.ToMap(), dag) {
switch {
case !rprt.IsStarted() && isStopping:
// If the pipeline is in stopping mode, all tasks that are not running
// already will be skipped. Otherwise these tasks end up in the
// incomplete count.
skipTasks++
withStatusTasks = append(withStatusTasks, rprt.PipelineTask.Name)
case rprt.IsSuccessful():
withStatusTasks = append(withStatusTasks, rprt.PipelineTask.Name)
case isSkipped(rprt, stateAsMap, dag):
skipTasks++
successOrSkipTasks = append(successOrSkipTasks, rprt.PipelineTask.Name)
withStatusTasks = append(withStatusTasks, rprt.PipelineTask.Name)
// At least one is skipped and no failure yet, mark as completed
if reason == v1beta1.PipelineRunReasonSuccessful.String() {
reason = v1beta1.PipelineRunReasonCompleted.String()
}
case rprt.IsCancelled():
cancelledTasks++
withStatusTasks = append(withStatusTasks, rprt.PipelineTask.Name)
if reason != v1beta1.PipelineRunReasonFailed.String() {
reason = v1beta1.PipelineRunReasonCancelled.String()
}
case rprt.IsFailure():
withStatusTasks = append(withStatusTasks, rprt.PipelineTask.Name)
failedTasks++
reason = v1beta1.PipelineRunReasonFailed.String()
}
}

if reflect.DeepEqual(allTasks, successOrSkipTasks) {
logger.Infof("All TaskRuns have finished for PipelineRun %s so it has finished", pr.Name)
reason := v1beta1.PipelineRunReasonSuccessful.String()
if skipTasks != 0 {
reason = v1beta1.PipelineRunReasonCompleted.String()
if reflect.DeepEqual(allTasks, withStatusTasks) {
status := corev1.ConditionTrue
if failedTasks > 0 || cancelledTasks > 0 {
status = corev1.ConditionFalse
}

logger.Infof("All TaskRuns have finished for PipelineRun %s so it has finished", pr.Name)
return &apis.Condition{
Type: apis.ConditionSucceeded,
Status: corev1.ConditionTrue,
Reason: reason,
Message: fmt.Sprintf("Tasks Completed: %d, Skipped: %d", len(successOrSkipTasks)-skipTasks, skipTasks),
Type: apis.ConditionSucceeded,
Status: status,
Reason: reason,
Message: fmt.Sprintf("Tasks Completed: %d (Failed: %d, Cancelled %d), Skipped: %d",
len(allTasks)-skipTasks, failedTasks, cancelledTasks, skipTasks),
}
}

// Hasn't timed out; no taskrun failed yet; and not all tasks have finished....
// Hasn't timed out; not all tasks have finished....
// Must keep running then....
if failedTasks > 0 || cancelledTasks > 0 {
reason = v1beta1.PipelineRunReasonStopping.String()
} else {
reason = v1beta1.PipelineRunReasonRunning.String()
}
return &apis.Condition{
Type: apis.ConditionSucceeded,
Status: corev1.ConditionUnknown,
Reason: v1beta1.PipelineRunReasonRunning.String(),
Message: fmt.Sprintf("Tasks Completed: %d, Incomplete: %d, Skipped: %d", len(successOrSkipTasks)-skipTasks, len(allTasks)-len(successOrSkipTasks), skipTasks),
Type: apis.ConditionSucceeded,
Status: corev1.ConditionUnknown,
Reason: reason,
Message: fmt.Sprintf("Tasks Completed: %d (Failed: %d, Cancelled %d), Incomplete: %d, Skipped: %d",
len(withStatusTasks)-skipTasks, failedTasks, cancelledTasks, len(allTasks)-len(withStatusTasks), skipTasks),
}
}

// isSkipped returns true if a Task in a TaskRun will not be run either because
// its Condition Checks failed or because one of the parent tasks's conditions failed
// its Condition Checks failed or because one of the parent tasks' conditions failed
// Note that this means isSkipped returns false if a conditionCheck is in progress
func isSkipped(rprt *ResolvedPipelineRunTask, stateMap map[string]*ResolvedPipelineRunTask, d *dag.Graph) bool {
// Taskrun not skipped if it already exists
Expand Down
Loading

0 comments on commit 3b95494

Please sign in to comment.