Set pipeline status when all tasks complete #2774

Merged · 1 commit · Jun 11, 2020
2 changes: 2 additions & 0 deletions docs/events.md
@@ -16,6 +16,7 @@ No events are emitted for `Conditions` today (https://github.com/tektoncd/pipeli
## TaskRuns

`TaskRun` events are generated for the following `Reasons`:

- `Started`: this is triggered the first time the `TaskRun` is picked by the
reconciler from its work queue, so it only happens if web-hook validation was
successful. Note that this event does not imply that a step started executing,
@@ -32,6 +33,7 @@ No events are emitted for `Conditions` today (https://github.com/tektoncd/pipeli
## PipelineRuns

`PipelineRun` events are generated for the following `Reasons`:

- `Succeeded`: this is triggered once all `Tasks` reachable via the DAG are
executed successfully.
- `Failed`: this is triggered if the `PipelineRun` is completed, but not
3 changes: 3 additions & 0 deletions pkg/apis/pipeline/v1beta1/pipelinerun_types.go
@@ -231,6 +231,9 @@ const (
PipelineRunReasonCancelled PipelineRunReason = "Cancelled"
// PipelineRunReasonTimedOut is the reason set when the PipelineRun has timed out
PipelineRunReasonTimedOut PipelineRunReason = "PipelineRunTimeout"
// ReasonStopping indicates that no new Tasks will be scheduled by the controller, and the
// pipeline will stop once all running tasks complete their work
PipelineRunReasonStopping PipelineRunReason = "PipelineRunStopping"
A Member commented:

I think it looks a little odd. I ran a simple pipeline with these changes and the state transitions from stopping to failed; the pipeline stopped while a task was still running 😕

kubectl get pr
NAME                                  SUCCEEDED   REASON                STARTTIME   COMPLETIONTIME
pipelinerun-one-failure-two-success   Unknown     PipelineRunStopping   21s

kubectl get tr
NAME                                               SUCCEEDED   REASON      STARTTIME   COMPLETIONTIME
pipelinerun-one-failure-two-success-task-a-qdphk   False       Failed      25s         14s
pipelinerun-one-failure-two-success-task-b-z8ctx   True        Succeeded   25s         7s
pipelinerun-one-failure-two-success-task-c-pwkhv   Unknown     Running     25s

kubectl get pr
NAME                                  SUCCEEDED   REASON   STARTTIME   COMPLETIONTIME
pipelinerun-one-failure-two-success   False       Failed   31s         5s

The Member Author replied:

Thanks for trying it out on a real cloud - I kind of relied on unit and E2E tests :)

The reason changes as follows:

  • start / running / stopping / failed
    which is what I was hoping to achieve.
    When we extend the pipeline with finally, it could look something like:
  • start / running / stopping / running-finally / failed

The message could also help provide more details.
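The reason transitions described above can be sketched as a small pure function. This is a hypothetical standalone sketch, not the actual Tekton code: `reasonFor` and its counter parameters are invented here to mirror the counting logic in `GetPipelineConditionStatus` from this PR's diff.

```go
package main

import "fmt"

// reasonFor sketches the reason-selection logic discussed above. The counters
// are assumed to mirror those computed in GetPipelineConditionStatus:
// failed/cancelled/skipped tasks plus tasks still incomplete.
func reasonFor(failed, cancelled, incomplete, skipped int) string {
	if incomplete > 0 {
		// Still running; a failure or cancellation means no new tasks
		// will be scheduled, so the run is "stopping".
		if failed > 0 || cancelled > 0 {
			return "PipelineRunStopping"
		}
		return "Running"
	}
	// All tasks are done: pick the completion reason.
	switch {
	case failed > 0:
		return "Failed"
	case cancelled > 0:
		return "Cancelled"
	case skipped > 0:
		return "Completed"
	default:
		return "Succeeded"
	}
}

func main() {
	// task-a failed while task-c was still running, then task-c finished:
	fmt.Println(reasonFor(1, 0, 1, 0)) // PipelineRunStopping
	fmt.Println(reasonFor(1, 0, 0, 0)) // Failed
}
```

This matches the observed `kubectl` output above: `PipelineRunStopping` while a task is still running, then `Failed` once everything completes.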

)

func (t PipelineRunReason) String() string {
63 changes: 42 additions & 21 deletions pkg/reconciler/pipelinerun/pipelinerun.go
@@ -426,9 +426,51 @@ func (c *Reconciler) reconcile(ctx context.Context, pr *v1beta1.PipelineRun) err
}
}

as, err := artifacts.InitializeArtifactStorage(c.Images, pr, pipelineSpec, c.KubeClientSet, logger)
if err != nil {
logger.Infof("PipelineRun failed to initialize artifact storage %s", pr.Name)
return err
}

// When the pipeline run is stopping, we don't schedule any new task and only
// wait for all running tasks to complete and report their status
if !pipelineState.IsStopping() {
The Member Author commented:

The diff does not help very much, but the only changes here are:

  • if !pipelineState.IsStopping()
  • the code that used to be here moved "as-is" into runNextSchedulableTask

@pritidesai (Member) commented on Jun 10, 2020:

Maybe this could be simplified and kept as is; instead, change GetNextTasks() so that no tasks are returned if the pipeline is in stopping mode:

func (state PipelineRunState) GetNextTasks(candidateTasks map[string]struct{}) []*ResolvedPipelineRunTask {
	tasks := []*ResolvedPipelineRunTask{}
	if state.IsStopping() {
		return tasks
	}

With an empty execution queue, everything in runNextSchedulableTask is skipped except InitializeArtifactStorage and ResolveResultRefs (which could be wrapped in the condition).

Don't worry if you are not comfortable changing it; I will have to change it in the finally PR, as final tasks have to be executed after the pipeline starts stopping and once all running tasks are finished.
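The suggested change can be sketched end to end. This is a standalone, simplified sketch of the idea, not the real Tekton types: the struct fields below stand in for the actual `ResolvedPipelineRunTask` state checks.

```go
package main

import "fmt"

// Minimal stand-ins for the Tekton types (hypothetical simplifications).
type ResolvedPipelineRunTask struct {
	Name    string
	Started bool
	Failed  bool
}

type PipelineRunState []*ResolvedPipelineRunTask

// IsStopping reports whether any task has already failed, so no new
// tasks should be scheduled.
func (state PipelineRunState) IsStopping() bool {
	for _, t := range state {
		if t.Failed {
			return true
		}
	}
	return false
}

// GetNextTasks returns candidate tasks not yet started — and, per the
// suggestion above, returns nothing at all once the pipeline is stopping,
// making the reconciler's scheduling loop a no-op without an outer guard.
func (state PipelineRunState) GetNextTasks(candidates map[string]struct{}) []*ResolvedPipelineRunTask {
	tasks := []*ResolvedPipelineRunTask{}
	if state.IsStopping() {
		return tasks
	}
	for _, t := range state {
		if _, ok := candidates[t.Name]; ok && !t.Started {
			tasks = append(tasks, t)
		}
	}
	return tasks
}

func main() {
	state := PipelineRunState{
		{Name: "task-a", Started: true, Failed: true},
		{Name: "task-b"},
	}
	next := state.GetNextTasks(map[string]struct{}{"task-b": {}})
	fmt.Println(len(next)) // 0: the run is stopping, so nothing is scheduled
}
```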

err = c.runNextSchedulableTask(ctx, pr, d, pipelineState, as)
if err != nil {
return err
}
}

before := pr.Status.GetCondition(apis.ConditionSucceeded)
after := resources.GetPipelineConditionStatus(pr, pipelineState, logger, d)
switch after.Status {
case corev1.ConditionTrue:
pr.Status.MarkSucceeded(after.Reason, after.Message)
case corev1.ConditionFalse:
pr.Status.MarkFailed(after.Reason, after.Message)
case corev1.ConditionUnknown:
pr.Status.MarkRunning(after.Reason, after.Message)
}
// Read the condition the way it was set by the Mark* helpers
after = pr.Status.GetCondition(apis.ConditionSucceeded)
events.Emit(recorder, before, after, pr)

pr.Status.TaskRuns = getTaskRunsStatus(pr, pipelineState)
logger.Infof("PipelineRun %s status is being set to %s", pr.Name, after)
return nil
}

// runNextSchedulableTask gets the next schedulable Tasks from the dag based on the current
// pipeline run state, and starts them
func (c *Reconciler) runNextSchedulableTask(ctx context.Context, pr *v1beta1.PipelineRun, d *dag.Graph, pipelineState resources.PipelineRunState, as artifacts.ArtifactStorageInterface) error {
A Member commented:

NIT: how do we avoid adding more reconciler receiver functions while at the same time not letting reconcile itself explode in size? 🤔

The Member Author replied:

Good point!
Getting the recorder and the logger from the context helps with that.
This function depends on other receiver functions, so it still needs c, but perhaps it is possible now to reduce the number of receiver functions - in a separate PR


logger := logging.FromContext(ctx)
recorder := controller.GetEventRecorder(ctx)

candidateTasks, err := dag.GetSchedulable(d, pipelineState.SuccessfulPipelineTaskNames()...)
if err != nil {
logger.Errorf("Error getting potential next tasks for valid pipelinerun %s: %v", pr.Name, err)
return nil
}

nextRprts := pipelineState.GetNextTasks(candidateTasks)
@@ -440,13 +482,6 @@ func (c *Reconciler) reconcile(ctx context.Context, pr *v1beta1.PipelineRun) err
}
resources.ApplyTaskResults(nextRprts, resolvedResultRefs)

var as artifacts.ArtifactStorageInterface

if as, err = artifacts.InitializeArtifactStorage(c.Images, pr, pipelineSpec, c.KubeClientSet, logger); err != nil {
logger.Infof("PipelineRun failed to initialize artifact storage %s", pr.Name)
return err
}

for _, rprt := range nextRprts {
if rprt == nil {
continue
@@ -467,20 +502,6 @@ func (c *Reconciler) reconcile(ctx context.Context, pr *v1beta1.PipelineRun) err
}
}
}
before := pr.Status.GetCondition(apis.ConditionSucceeded)
after := resources.GetPipelineConditionStatus(pr, pipelineState, logger, d)
switch after.Status {
case corev1.ConditionTrue:
pr.Status.MarkSucceeded(after.Reason, after.Message)
case corev1.ConditionFalse:
pr.Status.MarkFailed(after.Reason, after.Message)
case corev1.ConditionUnknown:
pr.Status.MarkRunning(after.Reason, after.Message)
}
events.Emit(recorder, before, after, pr)

pr.Status.TaskRuns = getTaskRunsStatus(pr, pipelineState)
logger.Infof("PipelineRun %s status is being set to %s", pr.Name, after)
return nil
}

141 changes: 94 additions & 47 deletions pkg/reconciler/pipelinerun/resources/pipelinerunresolution.go
@@ -127,6 +127,20 @@ func (t ResolvedPipelineRunTask) IsCancelled() bool {
return c.IsFalse() && c.Reason == v1beta1.TaskRunReasonCancelled.String()
}

// IsStarted returns true only if the PipelineRunTask itself has a TaskRun associated
func (t ResolvedPipelineRunTask) IsStarted() bool {
if t.TaskRun == nil {
return false
}

c := t.TaskRun.Status.GetCondition(apis.ConditionSucceeded)
@pritidesai (Member) commented on Jun 10, 2020:

A task with a condition should be covered here; that should help mark the pipeline as started when a condition container has started and the condition is executing.

if c == nil {
return false
}

return true
}

// ToMap returns a map that maps pipeline task name to the resolved pipeline run task
func (state PipelineRunState) ToMap() map[string]*ResolvedPipelineRunTask {
m := make(map[string]*ResolvedPipelineRunTask)
@@ -160,6 +174,20 @@ func (state PipelineRunState) IsBeforeFirstTaskRun() bool {
return true
}

// IsStopping returns true if the PipelineRun won't be scheduling any new Task because
// at least one task already failed or was cancelled
func (state PipelineRunState) IsStopping() bool {
for _, t := range state {
if t.IsCancelled() {
return true
}
if t.IsFailure() {
return true
}
}
return false
}

// GetNextTasks will return the next ResolvedPipelineRunTasks to execute, which are the ones in the
// list of candidateTasks which aren't yet indicated in state to be running.
func (state PipelineRunState) GetNextTasks(candidateTasks map[string]struct{}) []*ResolvedPipelineRunTask {
@@ -389,9 +417,9 @@ func GetTaskRunName(taskRunsStatus map[string]*v1beta1.PipelineRunTaskRunStatus,
func GetPipelineConditionStatus(pr *v1beta1.PipelineRun, state PipelineRunState, logger *zap.SugaredLogger, dag *dag.Graph) *apis.Condition {
// We have 4 different states here:
// 1. Timed out -> Failed
// 2. Any one TaskRun has failed - >Failed. This should change with #1020 and #1023
// 2. All tasks are done and at least one has failed or has been cancelled -> Failed
// 3. All tasks are done or are skipped (i.e. condition check failed).-> Success
// 4. A Task or Condition is running right now or there are things left to run -> Running
// 4. A Task or Condition is running right now or there are things left to run -> Running
if pr.IsTimedOut() {
return &apis.Condition{
Type: apis.ConditionSucceeded,
@@ -401,72 +429,91 @@ func GetPipelineConditionStatus(pr *v1beta1.PipelineRun, state PipelineRunState,
}
}

// A single failed task mean we fail the pipeline
for _, rprt := range state {
if rprt.IsCancelled() {
logger.Infof("TaskRun %s is cancelled, so PipelineRun %s is cancelled", rprt.TaskRunName, pr.Name)
return &apis.Condition{
Type: apis.ConditionSucceeded,
Status: corev1.ConditionFalse,
Reason: v1beta1.PipelineRunReasonCancelled.String(),
Message: fmt.Sprintf("TaskRun %s has cancelled", rprt.TaskRun.Name),
}
}

if rprt.IsFailure() { //IsDone ensures we have crossed the retry limit
logger.Infof("TaskRun %s has failed, so PipelineRun %s has failed, retries done: %b", rprt.TaskRunName, pr.Name, len(rprt.TaskRun.Status.RetriesStatus))
return &apis.Condition{
Type: apis.ConditionSucceeded,
Status: corev1.ConditionFalse,
Reason: v1beta1.PipelineRunReasonFailed.String(),
Message: fmt.Sprintf("TaskRun %s has failed", rprt.TaskRun.Name),
}
}
}

allTasks := []string{}
successOrSkipTasks := []string{}
withStatusTasks := []string{}
skipTasks := int(0)
failedTasks := int(0)
cancelledTasks := int(0)
reason := v1beta1.PipelineRunReasonSuccessful.String()
stateAsMap := state.ToMap()
isStopping := state.IsStopping()

// Check to see if all tasks are success or skipped
//
// The completion reason is also calculated here, but it will only be used
// if all tasks are completed.
//
// The pipeline run completion reason is set from the taskrun completion reason
// according to the following logic:
//
// - All successful: ReasonSucceeded
// - Some successful, some skipped: ReasonCompleted
// - Some cancelled, none failed: ReasonCancelled
// - At least one failed: ReasonFailed
for _, rprt := range state {
allTasks = append(allTasks, rprt.PipelineTask.Name)
if rprt.IsSuccessful() {
successOrSkipTasks = append(successOrSkipTasks, rprt.PipelineTask.Name)
}
if isSkipped(rprt, state.ToMap(), dag) {
switch {
case !rprt.IsStarted() && isStopping:
// If the pipeline is in stopping mode, all tasks that are not running
// already will be skipped. Otherwise these tasks end up in the
// incomplete count.
skipTasks++
withStatusTasks = append(withStatusTasks, rprt.PipelineTask.Name)
case rprt.IsSuccessful():
withStatusTasks = append(withStatusTasks, rprt.PipelineTask.Name)
case isSkipped(rprt, stateAsMap, dag):
afrittoli marked this conversation as resolved.
skipTasks++
successOrSkipTasks = append(successOrSkipTasks, rprt.PipelineTask.Name)
withStatusTasks = append(withStatusTasks, rprt.PipelineTask.Name)
// At least one is skipped and no failure yet, mark as completed
if reason == v1beta1.PipelineRunReasonSuccessful.String() {
reason = v1beta1.PipelineRunReasonCompleted.String()
}
case rprt.IsCancelled():
cancelledTasks++
withStatusTasks = append(withStatusTasks, rprt.PipelineTask.Name)
if reason != v1beta1.PipelineRunReasonFailed.String() {
reason = v1beta1.PipelineRunReasonCancelled.String()
}
case rprt.IsFailure():
withStatusTasks = append(withStatusTasks, rprt.PipelineTask.Name)
failedTasks++
reason = v1beta1.PipelineRunReasonFailed.String()
}
}

if reflect.DeepEqual(allTasks, successOrSkipTasks) {
logger.Infof("All TaskRuns have finished for PipelineRun %s so it has finished", pr.Name)
reason := v1beta1.PipelineRunReasonSuccessful.String()
if skipTasks != 0 {
reason = v1beta1.PipelineRunReasonCompleted.String()
if reflect.DeepEqual(allTasks, withStatusTasks) {
status := corev1.ConditionTrue
if failedTasks > 0 || cancelledTasks > 0 {
status = corev1.ConditionFalse
}

logger.Infof("All TaskRuns have finished for PipelineRun %s so it has finished", pr.Name)
return &apis.Condition{
Type: apis.ConditionSucceeded,
Status: corev1.ConditionTrue,
Reason: reason,
Message: fmt.Sprintf("Tasks Completed: %d, Skipped: %d", len(successOrSkipTasks)-skipTasks, skipTasks),
Type: apis.ConditionSucceeded,
Status: status,
Reason: reason,
Message: fmt.Sprintf("Tasks Completed: %d (Failed: %d, Cancelled %d), Skipped: %d",
len(allTasks)-skipTasks, failedTasks, cancelledTasks, skipTasks),
}
}

// Hasn't timed out; no taskrun failed yet; and not all tasks have finished....
// Hasn't timed out; not all tasks have finished....
// Must keep running then....
if failedTasks > 0 || cancelledTasks > 0 {
afrittoli marked this conversation as resolved.
reason = v1beta1.PipelineRunReasonStopping.String()
} else {
reason = v1beta1.PipelineRunReasonRunning.String()
}
return &apis.Condition{
Type: apis.ConditionSucceeded,
Status: corev1.ConditionUnknown,
Reason: v1beta1.PipelineRunReasonRunning.String(),
Message: fmt.Sprintf("Tasks Completed: %d, Incomplete: %d, Skipped: %d", len(successOrSkipTasks)-skipTasks, len(allTasks)-len(successOrSkipTasks), skipTasks),
Type: apis.ConditionSucceeded,
Status: corev1.ConditionUnknown,
Reason: reason,
Message: fmt.Sprintf("Tasks Completed: %d (Failed: %d, Cancelled %d), Incomplete: %d, Skipped: %d",
len(withStatusTasks)-skipTasks, failedTasks, cancelledTasks, len(allTasks)-len(withStatusTasks), skipTasks),
}
}

// isSkipped returns true if a Task in a TaskRun will not be run either because
// its Condition Checks failed or because one of the parent tasks's conditions failed
// its Condition Checks failed or because one of the parent tasks' conditions failed
// Note that this means isSkipped returns false if a conditionCheck is in progress
func isSkipped(rprt *ResolvedPipelineRunTask, stateMap map[string]*ResolvedPipelineRunTask, d *dag.Graph) bool {
// Taskrun not skipped if it already exists