Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Actually stop trying to time out finished Runs ⏰ #3078

Merged
merged 1 commit into from
Sep 9, 2020
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
11 changes: 5 additions & 6 deletions pkg/apis/pipeline/v1beta1/pipelinerun_types.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,14 +17,14 @@ limitations under the License.
package v1beta1

import (
"fmt"
"time"

"github.com/tektoncd/pipeline/pkg/apis/config"
"github.com/tektoncd/pipeline/pkg/apis/pipeline"
corev1 "k8s.io/api/core/v1"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/runtime/schema"
"k8s.io/apimachinery/pkg/types"
"knative.dev/pkg/apis"
duckv1beta1 "knative.dev/pkg/apis/duck/v1beta1"
)
Expand Down Expand Up @@ -98,10 +98,9 @@ func (pr *PipelineRun) IsCancelled() bool {
return pr.Spec.Status == PipelineRunSpecStatusCancelled
}

// GetRunKey return the pipelinerun key for timeout handler map
func (pr *PipelineRun) GetRunKey() string {
// The address of the pointer is a threadsafe unique identifier for the pipelinerun
return fmt.Sprintf("%s/%p", pipeline.PipelineRunControllerName, pr)
// GetNamespacedName returns a k8s namespaced name that identifies this PipelineRun
func (pr *PipelineRun) GetNamespacedName() types.NamespacedName {
return types.NamespacedName{Namespace: pr.Namespace, Name: pr.Name}
vdemeester marked this conversation as resolved.
Show resolved Hide resolved
}

// IsTimedOut returns true if a pipelinerun has exceeded its spec.Timeout based on its status.Timeout
Expand Down Expand Up @@ -231,7 +230,7 @@ const (
PipelineRunReasonCancelled PipelineRunReason = "Cancelled"
// PipelineRunReasonTimedOut is the reason set when the PipelineRun has timed out
PipelineRunReasonTimedOut PipelineRunReason = "PipelineRunTimeout"
// ReasonStopping indicates that no new Tasks will be scheduled by the controller, and the
// PipelineRunReasonStopping indicates that no new Tasks will be scheduled by the controller, and the
// pipeline will stop once all running tasks complete their work
PipelineRunReasonStopping PipelineRunReason = "PipelineRunStopping"
)
Expand Down
13 changes: 6 additions & 7 deletions pkg/apis/pipeline/v1beta1/pipelinerun_types_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,12 +17,10 @@ limitations under the License.
package v1beta1_test

import (
"fmt"
"testing"
"time"

"github.com/google/go-cmp/cmp"
tb "github.com/tektoncd/pipeline/internal/builder/v1beta1"
"github.com/tektoncd/pipeline/pkg/apis/pipeline/v1beta1"
"github.com/tektoncd/pipeline/test/diff"
corev1 "k8s.io/api/core/v1"
Expand Down Expand Up @@ -167,11 +165,12 @@ func TestPipelineRunHasVolumeClaimTemplate(t *testing.T) {
}
}

func TestPipelineRunKey(t *testing.T) {
pr := tb.PipelineRun("prunname")
expectedKey := fmt.Sprintf("PipelineRun/%p", pr)
if pr.GetRunKey() != expectedKey {
t.Fatalf("Expected taskrun key to be %s but got %s", expectedKey, pr.GetRunKey())
func TestGetNamespacedName(t *testing.T) {
pr := &v1beta1.PipelineRun{ObjectMeta: metav1.ObjectMeta{Namespace: "foo", Name: "prunname"}}
n := pr.GetNamespacedName()
expected := "foo/prunname"
if n.String() != expected {
t.Fatalf("Expected name to be %s but got %s", expected, n.String())
}
}

Expand Down
8 changes: 4 additions & 4 deletions pkg/apis/pipeline/v1beta1/taskrun_types.go
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@ import (
corev1 "k8s.io/api/core/v1"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/runtime/schema"
"k8s.io/apimachinery/pkg/types"
"knative.dev/pkg/apis"
duckv1beta1 "knative.dev/pkg/apis/duck/v1beta1"
)
Expand Down Expand Up @@ -411,10 +412,9 @@ func (tr *TaskRun) GetTimeout() time.Duration {
return tr.Spec.Timeout.Duration
}

// GetRunKey return the taskrun key for timeout handler map
func (tr *TaskRun) GetRunKey() string {
// The address of the pointer is a threadsafe unique identifier for the taskrun
return fmt.Sprintf("%s/%p", pipeline.TaskRunControllerName, tr)
// GetNamespacedName returns a k8s namespaced name that identifies this TaskRun
func (tr *TaskRun) GetNamespacedName() types.NamespacedName {
return types.NamespacedName{Namespace: tr.Namespace, Name: tr.Name}
}

// IsPartOfPipeline return true if TaskRun is a part of a Pipeline.
Expand Down
14 changes: 5 additions & 9 deletions pkg/apis/pipeline/v1beta1/taskrun_types_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,6 @@ limitations under the License.
package v1beta1_test

import (
"fmt"
"testing"
"time"

Expand Down Expand Up @@ -172,14 +171,11 @@ func TestTaskRunHasVolumeClaimTemplate(t *testing.T) {
}

func TestTaskRunKey(t *testing.T) {
tr := &v1beta1.TaskRun{
ObjectMeta: metav1.ObjectMeta{
Name: "taskrunname",
},
}
expectedKey := fmt.Sprintf("TaskRun/%p", tr)
if tr.GetRunKey() != expectedKey {
t.Fatalf("Expected taskrun key to be %s but got %s", expectedKey, tr.GetRunKey())
tr := &v1beta1.TaskRun{ObjectMeta: metav1.ObjectMeta{Namespace: "foo", Name: "trunname"}}
n := tr.GetNamespacedName()
expected := "foo/trunname"
if n.String() != expected {
t.Fatalf("Expected name to be %s but got %s", expected, n.String())
}
}

Expand Down
4 changes: 2 additions & 2 deletions pkg/reconciler/pipelinerun/controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -86,8 +86,8 @@ func NewController(namespace string, images pipeline.Images) func(context.Contex
}
})

timeoutHandler.SetPipelineRunCallbackFunc(impl.Enqueue)
timeoutHandler.CheckTimeouts(namespace, kubeclientset, pipelineclientset)
timeoutHandler.SetCallbackFunc(impl.EnqueueKey)
timeoutHandler.CheckTimeouts(ctx, namespace, kubeclientset, pipelineclientset)

logger.Info("Setting up event handlers")
pipelineRunInformer.Informer().AddEventHandler(cache.ResourceEventHandlerFuncs{
Expand Down
6 changes: 3 additions & 3 deletions pkg/reconciler/pipelinerun/pipelinerun.go
Original file line number Diff line number Diff line change
Expand Up @@ -140,11 +140,11 @@ func (c *Reconciler) ReconcileKind(ctx context.Context, pr *v1beta1.PipelineRun)
pr.Status.InitializeConditions()
// In case node time was not synchronized, when controller has been scheduled to other nodes.
if pr.Status.StartTime.Sub(pr.CreationTimestamp.Time) < 0 {
logger.Warnf("PipelineRun %s createTimestamp %s is after the pipelineRun started %s", pr.GetRunKey(), pr.CreationTimestamp, pr.Status.StartTime)
logger.Warnf("PipelineRun %s createTimestamp %s is after the pipelineRun started %s", pr.GetNamespacedName().String(), pr.CreationTimestamp, pr.Status.StartTime)
pr.Status.StartTime = &pr.CreationTimestamp
}
// start goroutine to track pipelinerun timeout only startTime is not set
go c.timeoutHandler.WaitPipelineRun(pr, pr.Status.StartTime)
go c.timeoutHandler.Wait(pr.GetNamespacedName(), *pr.Status.StartTime, *pr.Spec.Timeout)
// Emit events. During the first reconcile the status of the PipelineRun may change twice
// from not Started to Started and then to Running, so we need to sent the event here
// and at the end of 'Reconcile' again.
Expand All @@ -171,7 +171,7 @@ func (c *Reconciler) ReconcileKind(ctx context.Context, pr *v1beta1.PipelineRun)
logger.Errorf("Failed to delete StatefulSet for PipelineRun %s: %v", pr.Name, err)
return c.finishReconcileUpdateEmitEvents(ctx, pr, before, err)
}
c.timeoutHandler.Release(pr)
c.timeoutHandler.Release(pr.GetNamespacedName())
if err := c.updateTaskRunsStatusDirectly(pr); err != nil {
logger.Errorf("Failed to update TaskRun status for PipelineRun %s: %v", pr.Name, err)
return c.finishReconcileUpdateEmitEvents(ctx, pr, before, err)
Expand Down
4 changes: 2 additions & 2 deletions pkg/reconciler/taskrun/controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -87,8 +87,8 @@ func NewController(namespace string, images pipeline.Images) func(context.Contex
}
})

timeoutHandler.SetTaskRunCallbackFunc(impl.Enqueue)
timeoutHandler.CheckTimeouts(namespace, kubeclientset, pipelineclientset)
timeoutHandler.SetCallbackFunc(impl.EnqueueKey)
timeoutHandler.CheckTimeouts(ctx, namespace, kubeclientset, pipelineclientset)

logger.Info("Setting up event handlers")
taskRunInformer.Informer().AddEventHandler(cache.ResourceEventHandlerFuncs{
Expand Down
10 changes: 5 additions & 5 deletions pkg/reconciler/taskrun/taskrun.go
Original file line number Diff line number Diff line change
Expand Up @@ -93,7 +93,7 @@ func (c *Reconciler) ReconcileKind(ctx context.Context, tr *v1beta1.TaskRun) pkg
tr.Status.InitializeConditions()
// In case node time was not synchronized, when controller has been scheduled to other nodes.
if tr.Status.StartTime.Sub(tr.CreationTimestamp.Time) < 0 {
logger.Warnf("TaskRun %s createTimestamp %s is after the taskRun started %s", tr.GetRunKey(), tr.CreationTimestamp, tr.Status.StartTime)
logger.Warnf("TaskRun %s createTimestamp %s is after the taskRun started %s", tr.GetNamespacedName().String(), tr.CreationTimestamp, tr.Status.StartTime)
tr.Status.StartTime = &tr.CreationTimestamp
}
// Emit events. During the first reconcile the status of the TaskRun may change twice
Expand All @@ -120,7 +120,7 @@ func (c *Reconciler) ReconcileKind(ctx context.Context, tr *v1beta1.TaskRun) pkg
// send cloud events. So we stop here an return errors encountered this far.
return merr.ErrorOrNil()
}
c.timeoutHandler.Release(tr)
c.timeoutHandler.Release(tr.GetNamespacedName())
pod, err := c.KubeClientSet.CoreV1().Pods(tr.Namespace).Get(tr.Status.PodName, metav1.GetOptions{})
if err == nil {
err = podconvert.StopSidecars(c.Images.NopImage, c.KubeClientSet, *pod)
Expand Down Expand Up @@ -379,7 +379,7 @@ func (c *Reconciler) reconcile(ctx context.Context, tr *v1beta1.TaskRun,
logger.Error("Failed to create task run pod for task %q: %v", tr.Name, newErr)
return newErr
}
go c.timeoutHandler.WaitTaskRun(tr, tr.Status.StartTime)
go c.timeoutHandler.Wait(tr.GetNamespacedName(), *tr.Status.StartTime, *tr.Spec.Timeout)
}
if err := c.tracker.Track(tr.GetBuildPodRef(), tr); err != nil {
logger.Errorf("Failed to create tracker for build pod %q for taskrun %q: %v", tr.Name, tr.Name, err)
Expand Down Expand Up @@ -460,9 +460,9 @@ func (c *Reconciler) updateLabelsAndAnnotations(tr *v1beta1.TaskRun) (*v1beta1.T
func (c *Reconciler) handlePodCreationError(ctx context.Context, tr *v1beta1.TaskRun, err error) error {
var msg string
if isExceededResourceQuotaError(err) {
backoff, currentlyBackingOff := c.timeoutHandler.GetBackoff(tr)
backoff, currentlyBackingOff := c.timeoutHandler.GetBackoff(tr.GetNamespacedName(), *tr.Status.StartTime, *tr.Spec.Timeout)
if !currentlyBackingOff {
go c.timeoutHandler.SetTaskRunTimer(tr, time.Until(backoff.NextAttempt))
go c.timeoutHandler.SetTimer(tr.GetNamespacedName(), time.Until(backoff.NextAttempt))
}
msg = fmt.Sprintf("TaskRun Pod exceeded available resources, reattempted %d times", backoff.NumAttempts)
tr.Status.SetCondition(&apis.Condition{
Expand Down
2 changes: 1 addition & 1 deletion pkg/reconciler/taskrun/taskrun_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -2075,7 +2075,7 @@ func TestHandlePodCreationError(t *testing.T) {
}

// Prevent backoff timer from starting
c.timeoutHandler.SetTaskRunCallbackFunc(nil)
c.timeoutHandler.SetCallbackFunc(nil)

testcases := []struct {
description string
Expand Down
Loading