diff --git a/cmd/controller/main.go b/cmd/controller/main.go index d7834b79331..e8b3e09548c 100644 --- a/cmd/controller/main.go +++ b/cmd/controller/main.go @@ -70,7 +70,7 @@ func main() { ImageDigestExporterImage: *imageDigestExporterImage, } sharedmain.MainWithContext(injection.WithNamespaceScope(signals.NewContext(), *namespace), ControllerLogKey, - taskrun.NewController(images), - pipelinerun.NewController(images), + taskrun.NewController(*namespace, images), + pipelinerun.NewController(*namespace, images), ) } diff --git a/pkg/reconciler/pipelinerun/controller.go b/pkg/reconciler/pipelinerun/controller.go index 98cfda94771..f07c458ffb3 100644 --- a/pkg/reconciler/pipelinerun/controller.go +++ b/pkg/reconciler/pipelinerun/controller.go @@ -45,7 +45,7 @@ const ( ) // NewController instantiates a new controller.Impl from knative.dev/pkg/controller -func NewController(images pipeline.Images) func(context.Context, configmap.Watcher) *controller.Impl { +func NewController(namespace string, images pipeline.Images) func(context.Context, configmap.Watcher) *controller.Impl { return func(ctx context.Context, cmw configmap.Watcher) *controller.Impl { logger := logging.FromContext(ctx) kubeclientset := kubeclient.Get(ctx) @@ -87,7 +87,7 @@ func NewController(images pipeline.Images) func(context.Context, configmap.Watch impl := controller.NewImpl(c, c.Logger, pipeline.PipelineRunControllerName) timeoutHandler.SetPipelineRunCallbackFunc(impl.Enqueue) - timeoutHandler.CheckTimeouts(kubeclientset, pipelineclientset) + timeoutHandler.CheckTimeouts(namespace, kubeclientset, pipelineclientset) c.Logger.Info("Setting up event handlers") pipelineRunInformer.Informer().AddEventHandler(cache.ResourceEventHandlerFuncs{ diff --git a/pkg/reconciler/pipelinerun/pipelinerun_test.go b/pkg/reconciler/pipelinerun/pipelinerun_test.go index a8aca96c10b..42934845106 100644 --- a/pkg/reconciler/pipelinerun/pipelinerun_test.go +++ b/pkg/reconciler/pipelinerun/pipelinerun_test.go @@ -49,6 +49,7 @@ import ( ) var ( + namespace = "" ignoreLastTransitionTime = cmpopts.IgnoreTypes(apis.Condition{}.LastTransitionTime.Inner.Time) images = pipeline.Images{ EntrypointImage: "override-with-entrypoint:latest", @@ -77,7 +78,7 @@ func getPipelineRunController(t *testing.T, d test.Data) (test.Assets, func()) { configMapWatcher := configmap.NewInformedWatcher(c.Kube, system.GetNamespace()) ctx, cancel := context.WithCancel(ctx) return test.Assets{ - Controller: NewController(images)(ctx, configMapWatcher), + Controller: NewController(namespace, images)(ctx, configMapWatcher), Clients: c, }, cancel } diff --git a/pkg/reconciler/taskrun/controller.go b/pkg/reconciler/taskrun/controller.go index 1ef766049b9..9bb05f33971 100644 --- a/pkg/reconciler/taskrun/controller.go +++ b/pkg/reconciler/taskrun/controller.go @@ -45,7 +45,7 @@ const ( ) // NewController instantiates a new controller.Impl from knative.dev/pkg/controller -func NewController(images pipeline.Images) func(context.Context, configmap.Watcher) *controller.Impl { +func NewController(namespace string, images pipeline.Images) func(context.Context, configmap.Watcher) *controller.Impl { return func(ctx context.Context, cmw configmap.Watcher) *controller.Impl { logger := logging.FromContext(ctx) kubeclientset := kubeclient.Get(ctx) @@ -90,7 +90,7 @@ func NewController(images pipeline.Images) func(context.Context, configmap.Watch impl := controller.NewImpl(c, c.Logger, pipeline.TaskRunControllerName) timeoutHandler.SetTaskRunCallbackFunc(impl.Enqueue) - timeoutHandler.CheckTimeouts(kubeclientset, pipelineclientset) + timeoutHandler.CheckTimeouts(namespace, kubeclientset, pipelineclientset) c.Logger.Info("Setting up event handlers") taskRunInformer.Informer().AddEventHandler(cache.ResourceEventHandlerFuncs{ diff --git a/pkg/reconciler/taskrun/taskrun_test.go b/pkg/reconciler/taskrun/taskrun_test.go index a2864db453d..12d3707dea0 100644 --- a/pkg/reconciler/taskrun/taskrun_test.go +++ b/pkg/reconciler/taskrun/taskrun_test.go @@ -63,7 +63,8 @@ const ( ) var ( - images = pipeline.Images{ + namespace = "" // all namespaces + images = pipeline.Images{ EntrypointImage: "override-with-entrypoint:latest", NopImage: "tianon/true", GitImage: "override-with-git:latest", @@ -267,7 +268,7 @@ func getTaskRunController(t *testing.T, d test.Data) (test.Assets, func()) { c, _ := test.SeedTestData(t, ctx, d) configMapWatcher := configmap.NewInformedWatcher(c.Kube, system.GetNamespace()) return test.Assets{ - Controller: NewController(images)(ctx, configMapWatcher), + Controller: NewController(namespace, images)(ctx, configMapWatcher), Clients: c, }, cancel } diff --git a/pkg/reconciler/timeout_handler.go b/pkg/reconciler/timeout_handler.go index d186e5d688f..52eb9d55cce 100644 --- a/pkg/reconciler/timeout_handler.go +++ b/pkg/reconciler/timeout_handler.go @@ -184,17 +184,27 @@ func (t *TimeoutSet) checkPipelineRunTimeouts(namespace string, pipelineclientse } } -// CheckTimeouts function iterates through all namespaces and calls corresponding -// taskrun/pipelinerun timeout functions -func (t *TimeoutSet) CheckTimeouts(kubeclientset kubernetes.Interface, pipelineclientset clientset.Interface) { - namespaces, err := kubeclientset.CoreV1().Namespaces().List(metav1.ListOptions{}) - if err != nil { - t.logger.Errorf("Can't get namespaces list: %s", err) - return +// CheckTimeouts function iterates through a given namespace or all namespaces +// (if empty string) and calls corresponding taskrun/pipelinerun timeout functions +func (t *TimeoutSet) CheckTimeouts(namespace string, kubeclientset kubernetes.Interface, pipelineclientset clientset.Interface) { + // scoped namespace + namespaceNames := []string{namespace} + // all namespaces + if namespace == "" { + namespaces, err := kubeclientset.CoreV1().Namespaces().List(metav1.ListOptions{}) + if err != nil { + t.logger.Errorf("Can't get namespaces list: %s", err) + return + } + namespaceNames = make([]string, len(namespaces.Items)) + for i, namespace := range namespaces.Items { + namespaceNames[i] = namespace.GetName() + } } - for _, namespace := range namespaces.Items { - t.checkTaskRunTimeouts(namespace.GetName(), pipelineclientset) - t.checkPipelineRunTimeouts(namespace.GetName(), pipelineclientset) + + for _, namespace := range namespaceNames { + t.checkTaskRunTimeouts(namespace, pipelineclientset) + t.checkPipelineRunTimeouts(namespace, pipelineclientset) } } diff --git a/pkg/reconciler/timeout_handler_test.go b/pkg/reconciler/timeout_handler_test.go index ee78ac03186..411a9fd7f3b 100644 --- a/pkg/reconciler/timeout_handler_test.go +++ b/pkg/reconciler/timeout_handler_test.go @@ -36,6 +36,7 @@ import ( ) var ( + allNs = "" testNs = "foo" simpleStep = tb.Step(testNs, tb.StepCommand("/mycmd")) simpleTask = tb.Task("test-task", tb.TaskSpec(simpleStep)) @@ -109,7 +110,7 @@ func TestTaskRunCheckTimeouts(t *testing.T) { } th.SetTaskRunCallbackFunc(f) - th.CheckTimeouts(c.Kube, c.Pipeline) + th.CheckTimeouts(allNs, c.Kube, c.Pipeline) for _, tc := range []struct { name string @@ -157,6 +158,84 @@ func TestTaskRunCheckTimeouts(t *testing.T) { } +func TestTaskRunSingleNamespaceCheckTimeouts(t *testing.T) { + taskRunTimedout := tb.TaskRun("test-taskrun-run-timedout-foo", tb.TaskRunNamespace(testNs), tb.TaskRunSpec( + tb.TaskRunTaskRef(simpleTask.Name, tb.TaskRefAPIVersion("a1")), + tb.TaskRunTimeout(1*time.Second), + ), tb.TaskRunStatus(tb.StatusCondition(apis.Condition{ + Type: apis.ConditionSucceeded, + Status: corev1.ConditionUnknown}), + tb.TaskRunStartTime(time.Now().Add(-10*time.Second)), + )) + + taskRunTimedoutOtherNS := tb.TaskRun("test-taskrun-run-timedout-bar", tb.TaskRunNamespace("otherNS"), tb.TaskRunSpec( + tb.TaskRunTaskRef(simpleTask.Name, tb.TaskRefAPIVersion("a1")), + tb.TaskRunTimeout(1*time.Second), + ), tb.TaskRunStatus(tb.StatusCondition(apis.Condition{ + Type: apis.ConditionSucceeded, + Status: corev1.ConditionUnknown}), + tb.TaskRunStartTime(time.Now().Add(-10*time.Second)), + )) + + d := test.Data{ + TaskRuns: []*v1beta1.TaskRun{taskRunTimedout, taskRunTimedoutOtherNS}, + Tasks: []*v1beta1.Task{simpleTask}, + Namespaces: []*corev1.Namespace{{ + ObjectMeta: metav1.ObjectMeta{ + Name: testNs, + }, + }}, + } + ctx, _ := ttesting.SetupFakeContext(t) + c, _ := test.SeedTestData(t, ctx, d) + stopCh := make(chan struct{}) + defer close(stopCh) + observer, _ := observer.New(zap.InfoLevel) + + th := NewTimeoutHandler(stopCh, zap.New(observer).Sugar()) + gotCallback := sync.Map{} + f := func(tr interface{}) { + trNew := tr.(*v1beta1.TaskRun) + gotCallback.Store(trNew.Name, struct{}{}) + } + + th.SetTaskRunCallbackFunc(f) + th.CheckTimeouts(testNs, c.Kube, c.Pipeline) + + for _, tc := range []struct { + name string + taskRun *v1beta1.TaskRun + expectCallback bool + }{{ + name: "timedout", + taskRun: taskRunTimedout, + expectCallback: true, + }, { + name: "timedout", + taskRun: taskRunTimedoutOtherNS, + expectCallback: false, + }} { + t.Run(tc.name, func(t *testing.T) { + if err := wait.PollImmediate(100*time.Millisecond, 3*time.Second, func() (bool, error) { + if tc.expectCallback { + if _, ok := gotCallback.Load(tc.taskRun.Name); ok { + return true, nil + } + return false, nil + } + // not expecting callback + if _, ok := gotCallback.Load(tc.taskRun.Name); ok { + return false, fmt.Errorf("did not expect call back for %s why", tc.taskRun.Name) + } + return true, nil + }); err != nil { + t.Fatalf("Expected %s callback to be %t but got error: %s", tc.name, tc.expectCallback, err) + } + }) + } + +} + func TestPipelinRunCheckTimeouts(t *testing.T) { simplePipeline := tb.Pipeline("test-pipeline", tb.PipelineNamespace(testNs), tb.PipelineSpec( tb.PipelineTask("hello-world-1", "hello-world"), @@ -235,7 +314,7 @@ func TestPipelinRunCheckTimeouts(t *testing.T) { } th.SetPipelineRunCallbackFunc(f) - th.CheckTimeouts(c.Kube, c.Pipeline) + th.CheckTimeouts(allNs, c.Kube, c.Pipeline) for _, tc := range []struct { name string pr *v1beta1.PipelineRun @@ -314,7 +393,7 @@ func TestWithNoFunc(t *testing.T) { t.Fatal("Expected CheckTimeouts function not to panic") } }() - testHandler.CheckTimeouts(c.Kube, c.Pipeline) + testHandler.CheckTimeouts(allNs, c.Kube, c.Pipeline) }