Skip to content

Commit

Permalink
Change TimeoutHandler to respect namespace scoping
Browse files Browse the repository at this point in the history
* Pass `--namespace` value to taskrun and pipelinerun NewController
to facilitate namespace-scoped behaviors
* Change TimeoutHandler `CheckTimeouts` to timeout taskrun/pipelinerun's
in the scoped namespace or all namespaces (default if unset)

Related: #2603
  • Loading branch information
dghubble committed May 12, 2020
1 parent 62f41d8 commit b850226
Show file tree
Hide file tree
Showing 7 changed files with 113 additions and 22 deletions.
4 changes: 2 additions & 2 deletions cmd/controller/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -70,7 +70,7 @@ func main() {
ImageDigestExporterImage: *imageDigestExporterImage,
}
sharedmain.MainWithContext(injection.WithNamespaceScope(signals.NewContext(), *namespace), ControllerLogKey,
taskrun.NewController(images),
pipelinerun.NewController(images),
taskrun.NewController(*namespace, images),
pipelinerun.NewController(*namespace, images),
)
}
4 changes: 2 additions & 2 deletions pkg/reconciler/pipelinerun/controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -45,7 +45,7 @@ const (
)

// NewController instantiates a new controller.Impl from knative.dev/pkg/controller
func NewController(images pipeline.Images) func(context.Context, configmap.Watcher) *controller.Impl {
func NewController(namespace string, images pipeline.Images) func(context.Context, configmap.Watcher) *controller.Impl {
return func(ctx context.Context, cmw configmap.Watcher) *controller.Impl {
logger := logging.FromContext(ctx)
kubeclientset := kubeclient.Get(ctx)
Expand Down Expand Up @@ -87,7 +87,7 @@ func NewController(images pipeline.Images) func(context.Context, configmap.Watch
impl := controller.NewImpl(c, c.Logger, pipeline.PipelineRunControllerName)

timeoutHandler.SetPipelineRunCallbackFunc(impl.Enqueue)
timeoutHandler.CheckTimeouts(kubeclientset, pipelineclientset)
timeoutHandler.CheckTimeouts(namespace, kubeclientset, pipelineclientset)

c.Logger.Info("Setting up event handlers")
pipelineRunInformer.Informer().AddEventHandler(cache.ResourceEventHandlerFuncs{
Expand Down
3 changes: 2 additions & 1 deletion pkg/reconciler/pipelinerun/pipelinerun_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -47,6 +47,7 @@ import (
)

var (
namespace = ""
ignoreLastTransitionTime = cmpopts.IgnoreTypes(apis.Condition{}.LastTransitionTime.Inner.Time)
images = pipeline.Images{
EntrypointImage: "override-with-entrypoint:latest",
Expand Down Expand Up @@ -75,7 +76,7 @@ func getPipelineRunController(t *testing.T, d test.Data) (test.Assets, func()) {
configMapWatcher := configmap.NewInformedWatcher(c.Kube, system.GetNamespace())
ctx, cancel := context.WithCancel(ctx)
return test.Assets{
Controller: NewController(images)(ctx, configMapWatcher),
Controller: NewController(namespace, images)(ctx, configMapWatcher),
Clients: c,
}, cancel
}
Expand Down
4 changes: 2 additions & 2 deletions pkg/reconciler/taskrun/controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -45,7 +45,7 @@ const (
)

// NewController instantiates a new controller.Impl from knative.dev/pkg/controller
func NewController(images pipeline.Images) func(context.Context, configmap.Watcher) *controller.Impl {
func NewController(namespace string, images pipeline.Images) func(context.Context, configmap.Watcher) *controller.Impl {
return func(ctx context.Context, cmw configmap.Watcher) *controller.Impl {
logger := logging.FromContext(ctx)
kubeclientset := kubeclient.Get(ctx)
Expand Down Expand Up @@ -90,7 +90,7 @@ func NewController(images pipeline.Images) func(context.Context, configmap.Watch
impl := controller.NewImpl(c, c.Logger, pipeline.TaskRunControllerName)

timeoutHandler.SetTaskRunCallbackFunc(impl.Enqueue)
timeoutHandler.CheckTimeouts(kubeclientset, pipelineclientset)
timeoutHandler.CheckTimeouts(namespace, kubeclientset, pipelineclientset)

c.Logger.Info("Setting up event handlers")
taskRunInformer.Informer().AddEventHandler(cache.ResourceEventHandlerFuncs{
Expand Down
5 changes: 3 additions & 2 deletions pkg/reconciler/taskrun/taskrun_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -65,7 +65,8 @@ const (
)

var (
images = pipeline.Images{
namespace = "" // all namespaces
images = pipeline.Images{
EntrypointImage: "override-with-entrypoint:latest",
NopImage: "tianon/true",
GitImage: "override-with-git:latest",
Expand Down Expand Up @@ -268,7 +269,7 @@ func getTaskRunController(t *testing.T, d test.Data) (test.Assets, func()) {
c, _ := test.SeedTestData(t, ctx, d)
configMapWatcher := configmap.NewInformedWatcher(c.Kube, system.GetNamespace())
return test.Assets{
Controller: NewController(images)(ctx, configMapWatcher),
Controller: NewController(namespace, images)(ctx, configMapWatcher),
Clients: c,
}, cancel
}
Expand Down
30 changes: 20 additions & 10 deletions pkg/reconciler/timeout_handler.go
Original file line number Diff line number Diff line change
Expand Up @@ -184,17 +184,27 @@ func (t *TimeoutSet) checkPipelineRunTimeouts(namespace string, pipelineclientse
}
}

// CheckTimeouts function iterates through all namespaces and calls corresponding
// taskrun/pipelinerun timeout functions
func (t *TimeoutSet) CheckTimeouts(kubeclientset kubernetes.Interface, pipelineclientset clientset.Interface) {
namespaces, err := kubeclientset.CoreV1().Namespaces().List(metav1.ListOptions{})
if err != nil {
t.logger.Errorf("Can't get namespaces list: %s", err)
return
// CheckTimeouts function iterates through a given namespace or all namespaces
// (if empty string) and calls corresponding taskrun/pipelinerun timeout functions
func (t *TimeoutSet) CheckTimeouts(namespace string, kubeclientset kubernetes.Interface, pipelineclientset clientset.Interface) {
// scoped namespace
namespaceNames := []string{namespace}
// all namespaces
if namespace == "" {
namespaces, err := kubeclientset.CoreV1().Namespaces().List(metav1.ListOptions{})
if err != nil {
t.logger.Errorf("Can't get namespaces list: %s", err)
return
}
namespaceNames = make([]string, len(namespaces.Items))
for i, namespace := range namespaces.Items {
namespaceNames[i] = namespace.GetName()
}
}
for _, namespace := range namespaces.Items {
t.checkTaskRunTimeouts(namespace.GetName(), pipelineclientset)
t.checkPipelineRunTimeouts(namespace.GetName(), pipelineclientset)

for _, namespace := range namespaceNames {
t.checkTaskRunTimeouts(namespace, pipelineclientset)
t.checkPipelineRunTimeouts(namespace, pipelineclientset)
}
}

Expand Down
85 changes: 82 additions & 3 deletions pkg/reconciler/timeout_handler_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -36,6 +36,7 @@ import (
)

var (
allNs = ""
testNs = "foo"
simpleStep = tb.Step(testNs, tb.StepCommand("/mycmd"))
simpleTask = tb.Task("test-task", tb.TaskSpec(simpleStep))
Expand Down Expand Up @@ -109,7 +110,7 @@ func TestTaskRunCheckTimeouts(t *testing.T) {
}

th.SetTaskRunCallbackFunc(f)
th.CheckTimeouts(c.Kube, c.Pipeline)
th.CheckTimeouts(allNs, c.Kube, c.Pipeline)

for _, tc := range []struct {
name string
Expand Down Expand Up @@ -157,6 +158,84 @@ func TestTaskRunCheckTimeouts(t *testing.T) {

}

func TestTaskRunSingleNamespaceCheckTimeouts(t *testing.T) {
taskRunTimedout := tb.TaskRun("test-taskrun-run-timedout-foo", tb.TaskRunNamespace(testNs), tb.TaskRunSpec(
tb.TaskRunTaskRef(simpleTask.Name, tb.TaskRefAPIVersion("a1")),
tb.TaskRunTimeout(1*time.Second),
), tb.TaskRunStatus(tb.StatusCondition(apis.Condition{
Type: apis.ConditionSucceeded,
Status: corev1.ConditionUnknown}),
tb.TaskRunStartTime(time.Now().Add(-10*time.Second)),
))

taskRunTimedoutOtherNS := tb.TaskRun("test-taskrun-run-timedout-bar", tb.TaskRunNamespace("otherNS"), tb.TaskRunSpec(
tb.TaskRunTaskRef(simpleTask.Name, tb.TaskRefAPIVersion("a1")),
tb.TaskRunTimeout(1*time.Second),
), tb.TaskRunStatus(tb.StatusCondition(apis.Condition{
Type: apis.ConditionSucceeded,
Status: corev1.ConditionUnknown}),
tb.TaskRunStartTime(time.Now().Add(-10*time.Second)),
))

d := test.Data{
TaskRuns: []*v1alpha1.TaskRun{taskRunTimedout, taskRunTimedoutOtherNS},
Tasks: []*v1alpha1.Task{simpleTask},
Namespaces: []*corev1.Namespace{{
ObjectMeta: metav1.ObjectMeta{
Name: testNs,
},
}},
}
ctx, _ := ttesting.SetupFakeContext(t)
c, _ := test.SeedTestData(t, ctx, d)
stopCh := make(chan struct{})
defer close(stopCh)
observer, _ := observer.New(zap.InfoLevel)

th := NewTimeoutHandler(stopCh, zap.New(observer).Sugar())
gotCallback := sync.Map{}
f := func(tr interface{}) {
trNew := tr.(*v1alpha1.TaskRun)
gotCallback.Store(trNew.Name, struct{}{})
}

th.SetTaskRunCallbackFunc(f)
th.CheckTimeouts(testNs, c.Kube, c.Pipeline)

for _, tc := range []struct {
name string
taskRun *v1alpha1.TaskRun
expectCallback bool
}{{
name: "timedout",
taskRun: taskRunTimedout,
expectCallback: true,
}, {
name: "timedout",
taskRun: taskRunTimedoutOtherNS,
expectCallback: false,
}} {
t.Run(tc.name, func(t *testing.T) {
if err := wait.PollImmediate(100*time.Millisecond, 3*time.Second, func() (bool, error) {
if tc.expectCallback {
if _, ok := gotCallback.Load(tc.taskRun.Name); ok {
return true, nil
}
return false, nil
}
// not expecting callback
if _, ok := gotCallback.Load(tc.taskRun.Name); ok {
return false, fmt.Errorf("did not expect call back for %s why", tc.taskRun.Name)
}
return true, nil
}); err != nil {
t.Fatalf("Expected %s callback to be %t but got error: %s", tc.name, tc.expectCallback, err)
}
})
}

}

func TestPipelinRunCheckTimeouts(t *testing.T) {
simplePipeline := tb.Pipeline("test-pipeline", tb.PipelineNamespace(testNs), tb.PipelineSpec(
tb.PipelineTask("hello-world-1", "hello-world"),
Expand Down Expand Up @@ -235,7 +314,7 @@ func TestPipelinRunCheckTimeouts(t *testing.T) {
}

th.SetPipelineRunCallbackFunc(f)
th.CheckTimeouts(c.Kube, c.Pipeline)
th.CheckTimeouts(allNs, c.Kube, c.Pipeline)
for _, tc := range []struct {
name string
pr *v1alpha1.PipelineRun
Expand Down Expand Up @@ -314,7 +393,7 @@ func TestWithNoFunc(t *testing.T) {
t.Fatal("Expected CheckTimeouts function not to panic")
}
}()
testHandler.CheckTimeouts(c.Kube, c.Pipeline)
testHandler.CheckTimeouts(allNs, c.Kube, c.Pipeline)

}

Expand Down

0 comments on commit b850226

Please sign in to comment.