Skip to content

Commit

Permalink
[TEP-0137] Restructure customrun event controller
Browse files Browse the repository at this point in the history
The events controllers for different resources (CustomRun, TaskRun
and PipelineRun) will be almost identical, with only the resource
type being different.

This commit refactors the CustomRun events controller to factor up
as much as possible of the reconciler, controller and test logic
so that we can reuse it in the upcoming commits for the other
resources.

I've done a slight change of strategy in the unit test structure
compared to what we do for the core controller tests.
A set of tests verifies as much as possible of the shared
functions, by mocking the event functionality away. These tests
are independent of the specific target format of the events.

Most of the functionality of the ReconcileKind functions is
handled in reconciler tests, which do not need a controller object
or a config map watcher to run, which reduces the complexity of
these tests without sacrifying coverage.

Finally, a smaller set of tests covers the controller -> reconciler
logic, so verify that our controller works well when invoking
the ReconcileKind indirectly through the generated package.

Signed-off-by: Andrea Frittoli <andrea.frittoli@uk.ibm.com>
  • Loading branch information
afrittoli committed Jul 3, 2023
1 parent ea090ba commit 3be4da5
Show file tree
Hide file tree
Showing 16 changed files with 583 additions and 125 deletions.
3 changes: 2 additions & 1 deletion cmd/events/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -47,7 +47,8 @@ func main() {
}()

// start the events controller
sharedmain.Main(eventsControllerName, customrun.NewController())
sharedmain.Main(eventsControllerName,
customrun.NewController())
}

func handler(w http.ResponseWriter, r *http.Request) {
Expand Down
7 changes: 6 additions & 1 deletion docs/pipeline-api.md
Original file line number Diff line number Diff line change
Expand Up @@ -11445,7 +11445,12 @@ this ResultsType.</p>
<h3 id="tekton.dev/v1beta1.RunObject">RunObject
</h3>
<div>
<p>RunObject is implemented by CustomRun and Run</p>
<p>RunObject is implemented by Run, CustomRun, TaskRun and PipelineRun</p>
</div>
<h3 id="tekton.dev/v1beta1.RunObjectWithRetries">RunObjectWithRetries
</h3>
<div>
<p>RunObject is implemented by Run and CustomRun</p>
</div>
<h3 id="tekton.dev/v1beta1.Sidecar">Sidecar
</h3>
Expand Down
2 changes: 1 addition & 1 deletion pkg/apis/config/store.go
Original file line number Diff line number Diff line change
Expand Up @@ -76,7 +76,7 @@ type Store struct {
func NewStore(logger configmap.Logger, onAfterStore ...func(name string, value interface{})) *Store {
store := &Store{
UntypedStore: configmap.NewUntypedStore(
"defaults/features/artifacts",
"defaults/features/metrics/spire/events",
logger,
configmap.Constructors{
GetDefaultsConfigName(): NewDefaultsFromConfigMap,
Expand Down
5 changes: 5 additions & 0 deletions pkg/apis/pipeline/v1/pipelinerun_types.go
Original file line number Diff line number Diff line change
Expand Up @@ -83,6 +83,11 @@ func (pr *PipelineRun) HasStarted() bool {
return pr.Status.StartTime != nil && !pr.Status.StartTime.IsZero()
}

// IsSuccessful returns true if the TaskRun's status indicates that it has succeeded.
func (tr *PipelineRun) IsSuccessful() bool {
return tr != nil && tr.Status.GetCondition(apis.ConditionSucceeded).IsTrue()
}

// IsCancelled returns true if the PipelineRun's spec status is set to Cancelled state
func (pr *PipelineRun) IsCancelled() bool {
return pr.Spec.Status == PipelineRunSpecStatusCancelled
Expand Down
5 changes: 5 additions & 0 deletions pkg/apis/pipeline/v1beta1/pipelinerun_types.go
Original file line number Diff line number Diff line change
Expand Up @@ -82,6 +82,11 @@ func (pr *PipelineRun) HasStarted() bool {
return pr.Status.StartTime != nil && !pr.Status.StartTime.IsZero()
}

// IsSuccessful returns true if the TaskRun's status indicates that it has succeeded.
func (tr *PipelineRun) IsSuccessful() bool {
return tr != nil && tr.Status.GetCondition(apis.ConditionSucceeded).IsTrue()
}

// IsCancelled returns true if the PipelineRun's spec status is set to Cancelled state
func (pr *PipelineRun) IsCancelled() bool {
return pr.Spec.Status == PipelineRunSpecStatusCancelled
Expand Down
7 changes: 6 additions & 1 deletion pkg/apis/pipeline/v1beta1/run_interface.go
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,7 @@ import (
"knative.dev/pkg/apis"
)

// RunObject is implemented by CustomRun and Run
// RunObject is implemented by Run, CustomRun, TaskRun and PipelineRun
type RunObject interface {
// Object requires GetObjectKind() and DeepCopyObject()
runtime.Object
Expand All @@ -38,6 +38,11 @@ type RunObject interface {
IsCancelled() bool
HasStarted() bool
IsDone() bool
}

// RunObject is implemented by Run and CustomRun
type RunObjectWithRetries interface {
RunObject

GetRetryCount() int
}
53 changes: 53 additions & 0 deletions pkg/reconciler/notifications/controller.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,53 @@
/*
Copyright 2023 The Tekton Authors
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/

package notifications

import (
"context"

"github.com/tektoncd/pipeline/pkg/apis/config"
cacheclient "github.com/tektoncd/pipeline/pkg/reconciler/events/cache"
cloudeventclient "github.com/tektoncd/pipeline/pkg/reconciler/events/cloudevent"
"knative.dev/pkg/configmap"
"knative.dev/pkg/controller"
"knative.dev/pkg/logging"
)

// ConfigStoreFromContext initialise the config store from the context
func ConfigStoreFromContext(ctx context.Context, cmw configmap.Watcher) *config.Store {
logger := logging.FromContext(ctx)
configStore := config.NewStore(logger.Named("config-store"))
configStore.WatchConfigs(cmw)
return configStore
}

// ReconcilerFromContext initialises a Reconciler from the context
func ReconcilerFromContext(ctx context.Context, c Reconciler) {
c.SetCloudEventsClient(cloudeventclient.Get(ctx))
c.SetCacheClient(cacheclient.Get(ctx))
}

// ControllerOptions returns a function that returns options for a controller implementation
func ControllerOptions(name string, store *config.Store) func(impl *controller.Impl) controller.Options {
return func(impl *controller.Impl) controller.Options {
return controller.Options{
AgentName: name,
ConfigStore: store,
SkipStatusUpdates: true,
}
}
}
31 changes: 10 additions & 21 deletions pkg/reconciler/notifications/customrun/controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,39 +19,28 @@ package customrun
import (
"context"

"github.com/tektoncd/pipeline/pkg/apis/config"
"github.com/tektoncd/pipeline/pkg/apis/pipeline"
customruninformer "github.com/tektoncd/pipeline/pkg/client/injection/informers/pipeline/v1beta1/customrun"
customrunreconciler "github.com/tektoncd/pipeline/pkg/client/injection/reconciler/pipeline/v1beta1/customrun"
cacheclient "github.com/tektoncd/pipeline/pkg/reconciler/events/cache"
cloudeventclient "github.com/tektoncd/pipeline/pkg/reconciler/events/cloudevent"
"github.com/tektoncd/pipeline/pkg/reconciler/notifications"
"knative.dev/pkg/configmap"
"knative.dev/pkg/controller"
"knative.dev/pkg/logging"
)

const ControllerName = "CustomRunEvents"

// NewController instantiates a new controller.Impl from knative.dev/pkg/controller
// This is a read-only controller, hence the SkipStatusUpdates set to true
func NewController() func(context.Context, configmap.Watcher) *controller.Impl {
return func(ctx context.Context, cmw configmap.Watcher) *controller.Impl {
logger := logging.FromContext(ctx)
customRunInformer := customruninformer.Get(ctx)

configStore := config.NewStore(logger.Named("config-store"))
configStore.WatchConfigs(cmw)

c := &Reconciler{
cloudEventClient: cloudeventclient.Get(ctx),
cacheClient: cacheclient.Get(ctx),
}
impl := customrunreconciler.NewImpl(ctx, c, func(impl *controller.Impl) controller.Options {
return controller.Options{
AgentName: pipeline.CustomRunControllerName,
ConfigStore: configStore,
SkipStatusUpdates: true,
}
})
configStore := notifications.ConfigStoreFromContext(ctx, cmw)

c := &Reconciler{}
notifications.ReconcilerFromContext(ctx, c)

impl := customrunreconciler.NewImpl(ctx, c, notifications.ControllerOptions("", configStore))

customRunInformer := customruninformer.Get(ctx)
customRunInformer.Informer().AddEventHandler(controller.HandleAll(impl.Enqueue))

return impl
Expand Down
141 changes: 141 additions & 0 deletions pkg/reconciler/notifications/customrun/controller_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,141 @@
/*
Copyright 2019 The Tekton Authors
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/

package customrun_test

import (
"testing"

"github.com/google/go-cmp/cmp"
"github.com/google/go-cmp/cmp/cmpopts"
"github.com/tektoncd/pipeline/pkg/apis/config"
"github.com/tektoncd/pipeline/pkg/apis/pipeline/v1beta1"
"github.com/tektoncd/pipeline/pkg/reconciler/events/cloudevent"
"github.com/tektoncd/pipeline/pkg/reconciler/notifications/customrun"
rtesting "github.com/tektoncd/pipeline/pkg/reconciler/testing"
"github.com/tektoncd/pipeline/test"
corev1 "k8s.io/api/core/v1"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/types"
"knative.dev/pkg/apis"
duckv1 "knative.dev/pkg/apis/duck/v1"
cminformer "knative.dev/pkg/configmap/informer"
pkgreconciler "knative.dev/pkg/reconciler"
"knative.dev/pkg/system"
_ "knative.dev/pkg/system/testing" // Setup system.Namespace()
)

func InitializeTestController(t *testing.T, d test.Data, a test.Assets) test.Assets {
t.Helper()
configMapWatcher := cminformer.NewInformedWatcher(a.Clients.Kube, system.Namespace())
ctl := customrun.NewController()(a.Ctx, configMapWatcher)
if err := configMapWatcher.Start(a.Ctx.Done()); err != nil {
t.Fatalf("error starting configmap watcher: %v", err)
}

if la, ok := ctl.Reconciler.(pkgreconciler.LeaderAware); ok {
la.Promote(pkgreconciler.UniversalBucket(), func(pkgreconciler.Bucket, types.NamespacedName) {})
}
a.Controller = ctl
return a
}

// TestReconcileNewController runs reconcile with a cloud event sink configured
// to ensure that events are sent in different cases
func TestReconcileNewController(t *testing.T) {
ignoreResourceVersion := cmpopts.IgnoreFields(v1beta1.CustomRun{}, "ObjectMeta.ResourceVersion")

cms := []*corev1.ConfigMap{
{
ObjectMeta: metav1.ObjectMeta{Name: config.GetEventsConfigName(), Namespace: system.Namespace()},
Data: map[string]string{
"sink": "http://synk:8080",
},
}, {
ObjectMeta: metav1.ObjectMeta{Name: config.GetFeatureFlagsConfigName(), Namespace: system.Namespace()},
Data: map[string]string{
"send-cloudevents-for-runs": "true",
},
},
}

condition := &apis.Condition{
Type: apis.ConditionSucceeded,
Status: corev1.ConditionTrue,
Reason: v1beta1.CustomRunReasonSuccessful.String(),
}
objectStatus := duckv1.Status{
Conditions: []apis.Condition{},
}
crStatusFields := v1beta1.CustomRunStatusFields{}
objectStatus.Conditions = append(objectStatus.Conditions, *condition)
customRun := v1beta1.CustomRun{
ObjectMeta: metav1.ObjectMeta{
Name: "test-customRun",
Namespace: "foo",
},
Spec: v1beta1.CustomRunSpec{},
Status: v1beta1.CustomRunStatus{
Status: objectStatus,
CustomRunStatusFields: crStatusFields,
},
}
customRuns := []*v1beta1.CustomRun{&customRun}
wantCloudEvents := []string{`(?s)dev.tekton.event.customrun.successful.v1.*test-customRun`}

d := test.Data{
CustomRuns: customRuns,
ConfigMaps: cms,
ExpectedCloudEventCount: len(wantCloudEvents),
}
testAssets, cancel := rtesting.InitializeTestAssets(t, &d)
defer cancel()
clients := testAssets.Clients

// Initialise the controller.
// Verify that the config map watcher and reconciler setup works well
testAssets = InitializeTestController(t, d, testAssets)
c := testAssets.Controller

if err := c.Reconciler.Reconcile(testAssets.Ctx, rtesting.GetTestResourceName(&customRun)); err != nil {
t.Errorf("didn't expect an error, but got one: %v", err)
}

for _, a := range clients.Kube.Actions() {
aVerb := a.GetVerb()
if aVerb != "get" && aVerb != "list" && aVerb != "watch" {
t.Errorf("Expected only read actions to be logged in the kubeclient, got %s", aVerb)
}
}

crAfter, err := clients.Pipeline.TektonV1beta1().CustomRuns(customRun.Namespace).Get(testAssets.Ctx, customRun.Name, metav1.GetOptions{})
if err != nil {
t.Fatalf("getting updated customRun: %v", err)
}

if d := cmp.Diff(&customRun, crAfter, ignoreResourceVersion); d != "" {
t.Fatalf("CustomRun should not have changed, got %v instead", d)
}

ceClient := clients.CloudEvents.(cloudevent.FakeClient)
ceClient.CheckCloudEventsUnordered(t, "controller test", wantCloudEvents)

// Try and reconcile again - expect no event
if err := c.Reconciler.Reconcile(testAssets.Ctx, rtesting.GetTestResourceName(&customRun)); err != nil {
t.Errorf("didn't expect an error, but got one: %v", err)
}
ceClient.CheckCloudEventsUnordered(t, "controller test", []string{})
}
Original file line number Diff line number Diff line change
Expand Up @@ -23,11 +23,8 @@ import (
"github.com/tektoncd/pipeline/pkg/apis/config"
"github.com/tektoncd/pipeline/pkg/apis/pipeline/v1beta1"
customrunreconciler "github.com/tektoncd/pipeline/pkg/client/injection/reconciler/pipeline/v1beta1/customrun"
"github.com/tektoncd/pipeline/pkg/reconciler/events"
"github.com/tektoncd/pipeline/pkg/reconciler/events/cache"
"github.com/tektoncd/pipeline/pkg/reconciler/events/cloudevent"
"knative.dev/pkg/apis"
"knative.dev/pkg/logging"
"github.com/tektoncd/pipeline/pkg/reconciler/notifications"
pkgreconciler "knative.dev/pkg/reconciler"
)

Expand All @@ -37,35 +34,36 @@ type Reconciler struct {
cacheClient *lru.Cache
}

func (c *Reconciler) GetCloudEventsClient() cloudevent.CEClient {
return c.cloudEventClient
}

func (c *Reconciler) GetCacheClient() *lru.Cache {
return c.cacheClient
}

func (c *Reconciler) SetCloudEventsClient(client cloudevent.CEClient) {
c.cloudEventClient = client
}

func (c *Reconciler) SetCacheClient(client *lru.Cache) {
c.cacheClient = client
}

// Check that our Reconciler implements customrunreconciler.Interface
var (
_ customrunreconciler.Interface = (*Reconciler)(nil)
)

// ReconcileKind compares the actual state with the desired, and attempts to
// converge the two. It then updates the Status block of the CustomRun
// resource with the current status of the resource.
// ReconcileKind oberves the resource conditions and triggers notifications accordingly
func (c *Reconciler) ReconcileKind(ctx context.Context, customRun *v1beta1.CustomRun) pkgreconciler.Event {
logger := logging.FromContext(ctx)
configs := config.FromContextOrDefaults(ctx)
ctx = cloudevent.ToContext(ctx, c.cloudEventClient)
ctx = cache.ToContext(ctx, c.cacheClient)
logger.Infof("Reconciling %s", customRun.Name)

// Create a copy of the CustomRun object, just in case, to avoid sync'ing changes
customRunEvents := *customRun.DeepCopy()

if configs.FeatureFlags.SendCloudEventsForRuns {
// Custom task controllers may be sending events for "CustomRuns" associated
// to the custom tasks they control. To avoid sending duplicate events,
// CloudEvents for "CustomRuns" are only sent when enabled

// Read and log the condition
condition := customRunEvents.Status.GetCondition(apis.ConditionSucceeded)
logger.Debugf("Emitting cloudevent for %s, condition: %s", customRunEvents.Name, condition)

events.EmitCloudEvents(ctx, &customRunEvents)
return notifications.ReconcileRuntimeObject(ctx, c, customRun)
}

return nil
}
Loading

0 comments on commit 3be4da5

Please sign in to comment.