diff --git a/cmd/events/main.go b/cmd/events/main.go
index 8636eed1663..7b4a20e9712 100644
--- a/cmd/events/main.go
+++ b/cmd/events/main.go
@@ -47,7 +47,8 @@ func main() {
}()
// start the events controller
- sharedmain.Main(eventsControllerName, customrun.NewController())
+ sharedmain.Main(eventsControllerName,
+ customrun.NewController())
}
func handler(w http.ResponseWriter, r *http.Request) {
diff --git a/docs/pipeline-api.md b/docs/pipeline-api.md
index 4f7bba7bbea..74573b4464f 100644
--- a/docs/pipeline-api.md
+++ b/docs/pipeline-api.md
@@ -11445,7 +11445,12 @@ this ResultsType.
RunObject
-
RunObject is implemented by CustomRun and Run
+
RunObject is implemented by Run, CustomRun, TaskRun and PipelineRun
+
+RunObjectWithRetries
+
+
+
RunObject is implemented by Run and CustomRun
Sidecar
diff --git a/pkg/apis/config/store.go b/pkg/apis/config/store.go
index 76307196936..75e40d85fe9 100644
--- a/pkg/apis/config/store.go
+++ b/pkg/apis/config/store.go
@@ -76,7 +76,7 @@ type Store struct {
func NewStore(logger configmap.Logger, onAfterStore ...func(name string, value interface{})) *Store {
store := &Store{
UntypedStore: configmap.NewUntypedStore(
- "defaults/features/artifacts",
+ "defaults/features/metrics/spire/events",
logger,
configmap.Constructors{
GetDefaultsConfigName(): NewDefaultsFromConfigMap,
diff --git a/pkg/apis/pipeline/v1/pipelinerun_types.go b/pkg/apis/pipeline/v1/pipelinerun_types.go
index 88120472ea8..2cfddefef0a 100644
--- a/pkg/apis/pipeline/v1/pipelinerun_types.go
+++ b/pkg/apis/pipeline/v1/pipelinerun_types.go
@@ -83,6 +83,11 @@ func (pr *PipelineRun) HasStarted() bool {
return pr.Status.StartTime != nil && !pr.Status.StartTime.IsZero()
}
+// IsSuccessful returns true if the TaskRun's status indicates that it has succeeded.
+func (tr *PipelineRun) IsSuccessful() bool {
+ return tr != nil && tr.Status.GetCondition(apis.ConditionSucceeded).IsTrue()
+}
+
// IsCancelled returns true if the PipelineRun's spec status is set to Cancelled state
func (pr *PipelineRun) IsCancelled() bool {
return pr.Spec.Status == PipelineRunSpecStatusCancelled
diff --git a/pkg/apis/pipeline/v1beta1/pipelinerun_types.go b/pkg/apis/pipeline/v1beta1/pipelinerun_types.go
index 53c1f738c0f..766530c821a 100644
--- a/pkg/apis/pipeline/v1beta1/pipelinerun_types.go
+++ b/pkg/apis/pipeline/v1beta1/pipelinerun_types.go
@@ -82,6 +82,11 @@ func (pr *PipelineRun) HasStarted() bool {
return pr.Status.StartTime != nil && !pr.Status.StartTime.IsZero()
}
+// IsSuccessful returns true if the TaskRun's status indicates that it has succeeded.
+func (tr *PipelineRun) IsSuccessful() bool {
+ return tr != nil && tr.Status.GetCondition(apis.ConditionSucceeded).IsTrue()
+}
+
// IsCancelled returns true if the PipelineRun's spec status is set to Cancelled state
func (pr *PipelineRun) IsCancelled() bool {
return pr.Spec.Status == PipelineRunSpecStatusCancelled
diff --git a/pkg/apis/pipeline/v1beta1/run_interface.go b/pkg/apis/pipeline/v1beta1/run_interface.go
index 2a0ada19d68..107affe304e 100644
--- a/pkg/apis/pipeline/v1beta1/run_interface.go
+++ b/pkg/apis/pipeline/v1beta1/run_interface.go
@@ -23,7 +23,7 @@ import (
"knative.dev/pkg/apis"
)
-// RunObject is implemented by CustomRun and Run
+// RunObject is implemented by Run, CustomRun, TaskRun and PipelineRun
type RunObject interface {
// Object requires GetObjectKind() and DeepCopyObject()
runtime.Object
@@ -38,6 +38,11 @@ type RunObject interface {
IsCancelled() bool
HasStarted() bool
IsDone() bool
+}
+
+// RunObject is implemented by Run and CustomRun
+type RunObjectWithRetries interface {
+ RunObject
GetRetryCount() int
}
diff --git a/pkg/reconciler/notifications/controller.go b/pkg/reconciler/notifications/controller.go
new file mode 100644
index 00000000000..f1fce17f520
--- /dev/null
+++ b/pkg/reconciler/notifications/controller.go
@@ -0,0 +1,53 @@
+/*
+Copyright 2023 The Tekton Authors
+
+Licensed under the Apache License, Version 2.0 (the "License");
+you may not use this file except in compliance with the License.
+You may obtain a copy of the License at
+
+ http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing, software
+distributed under the License is distributed on an "AS IS" BASIS,
+WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+See the License for the specific language governing permissions and
+limitations under the License.
+*/
+
+package notifications
+
+import (
+ "context"
+
+ "github.com/tektoncd/pipeline/pkg/apis/config"
+ cacheclient "github.com/tektoncd/pipeline/pkg/reconciler/events/cache"
+ cloudeventclient "github.com/tektoncd/pipeline/pkg/reconciler/events/cloudevent"
+ "knative.dev/pkg/configmap"
+ "knative.dev/pkg/controller"
+ "knative.dev/pkg/logging"
+)
+
+// ConfigStoreFromContext initialise the config store from the context
+func ConfigStoreFromContext(ctx context.Context, cmw configmap.Watcher) *config.Store {
+ logger := logging.FromContext(ctx)
+ configStore := config.NewStore(logger.Named("config-store"))
+ configStore.WatchConfigs(cmw)
+ return configStore
+}
+
+// ReconcilerFromContext initialises a Reconciler from the context
+func ReconcilerFromContext(ctx context.Context, c Reconciler) {
+ c.SetCloudEventsClient(cloudeventclient.Get(ctx))
+ c.SetCacheClient(cacheclient.Get(ctx))
+}
+
+// ControllerOptions returns a function that returns options for a controller implementation
+func ControllerOptions(name string, store *config.Store) func(impl *controller.Impl) controller.Options {
+ return func(impl *controller.Impl) controller.Options {
+ return controller.Options{
+ AgentName: name,
+ ConfigStore: store,
+ SkipStatusUpdates: true,
+ }
+ }
+}
diff --git a/pkg/reconciler/notifications/customrun/controller.go b/pkg/reconciler/notifications/customrun/controller.go
index 5e198c9cf54..e7b1a47c201 100644
--- a/pkg/reconciler/notifications/customrun/controller.go
+++ b/pkg/reconciler/notifications/customrun/controller.go
@@ -19,39 +19,28 @@ package customrun
import (
"context"
- "github.com/tektoncd/pipeline/pkg/apis/config"
- "github.com/tektoncd/pipeline/pkg/apis/pipeline"
customruninformer "github.com/tektoncd/pipeline/pkg/client/injection/informers/pipeline/v1beta1/customrun"
customrunreconciler "github.com/tektoncd/pipeline/pkg/client/injection/reconciler/pipeline/v1beta1/customrun"
- cacheclient "github.com/tektoncd/pipeline/pkg/reconciler/events/cache"
- cloudeventclient "github.com/tektoncd/pipeline/pkg/reconciler/events/cloudevent"
+ "github.com/tektoncd/pipeline/pkg/reconciler/notifications"
"knative.dev/pkg/configmap"
"knative.dev/pkg/controller"
- "knative.dev/pkg/logging"
)
+const ControllerName = "CustomRunEvents"
+
// NewController instantiates a new controller.Impl from knative.dev/pkg/controller
// This is a read-only controller, hence the SkipStatusUpdates set to true
func NewController() func(context.Context, configmap.Watcher) *controller.Impl {
return func(ctx context.Context, cmw configmap.Watcher) *controller.Impl {
- logger := logging.FromContext(ctx)
- customRunInformer := customruninformer.Get(ctx)
- configStore := config.NewStore(logger.Named("config-store"))
- configStore.WatchConfigs(cmw)
-
- c := &Reconciler{
- cloudEventClient: cloudeventclient.Get(ctx),
- cacheClient: cacheclient.Get(ctx),
- }
- impl := customrunreconciler.NewImpl(ctx, c, func(impl *controller.Impl) controller.Options {
- return controller.Options{
- AgentName: pipeline.CustomRunControllerName,
- ConfigStore: configStore,
- SkipStatusUpdates: true,
- }
- })
+ configStore := notifications.ConfigStoreFromContext(ctx, cmw)
+
+ c := &Reconciler{}
+ notifications.ReconcilerFromContext(ctx, c)
+ impl := customrunreconciler.NewImpl(ctx, c, notifications.ControllerOptions("", configStore))
+
+ customRunInformer := customruninformer.Get(ctx)
customRunInformer.Informer().AddEventHandler(controller.HandleAll(impl.Enqueue))
return impl
diff --git a/pkg/reconciler/notifications/customrun/controller_test.go b/pkg/reconciler/notifications/customrun/controller_test.go
new file mode 100644
index 00000000000..b99e10defe1
--- /dev/null
+++ b/pkg/reconciler/notifications/customrun/controller_test.go
@@ -0,0 +1,141 @@
+/*
+Copyright 2019 The Tekton Authors
+
+Licensed under the Apache License, Version 2.0 (the "License");
+you may not use this file except in compliance with the License.
+You may obtain a copy of the License at
+
+ http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing, software
+distributed under the License is distributed on an "AS IS" BASIS,
+WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+See the License for the specific language governing permissions and
+limitations under the License.
+*/
+
+package customrun_test
+
+import (
+ "testing"
+
+ "github.com/google/go-cmp/cmp"
+ "github.com/google/go-cmp/cmp/cmpopts"
+ "github.com/tektoncd/pipeline/pkg/apis/config"
+ "github.com/tektoncd/pipeline/pkg/apis/pipeline/v1beta1"
+ "github.com/tektoncd/pipeline/pkg/reconciler/events/cloudevent"
+ "github.com/tektoncd/pipeline/pkg/reconciler/notifications/customrun"
+ rtesting "github.com/tektoncd/pipeline/pkg/reconciler/testing"
+ "github.com/tektoncd/pipeline/test"
+ corev1 "k8s.io/api/core/v1"
+ metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
+ "k8s.io/apimachinery/pkg/types"
+ "knative.dev/pkg/apis"
+ duckv1 "knative.dev/pkg/apis/duck/v1"
+ cminformer "knative.dev/pkg/configmap/informer"
+ pkgreconciler "knative.dev/pkg/reconciler"
+ "knative.dev/pkg/system"
+ _ "knative.dev/pkg/system/testing" // Setup system.Namespace()
+)
+
+func InitializeTestController(t *testing.T, d test.Data, a test.Assets) test.Assets {
+ t.Helper()
+ configMapWatcher := cminformer.NewInformedWatcher(a.Clients.Kube, system.Namespace())
+ ctl := customrun.NewController()(a.Ctx, configMapWatcher)
+ if err := configMapWatcher.Start(a.Ctx.Done()); err != nil {
+ t.Fatalf("error starting configmap watcher: %v", err)
+ }
+
+ if la, ok := ctl.Reconciler.(pkgreconciler.LeaderAware); ok {
+ la.Promote(pkgreconciler.UniversalBucket(), func(pkgreconciler.Bucket, types.NamespacedName) {})
+ }
+ a.Controller = ctl
+ return a
+}
+
+// TestReconcileNewController runs reconcile with a cloud event sink configured
+// to ensure that events are sent in different cases
+func TestReconcileNewController(t *testing.T) {
+ ignoreResourceVersion := cmpopts.IgnoreFields(v1beta1.CustomRun{}, "ObjectMeta.ResourceVersion")
+
+ cms := []*corev1.ConfigMap{
+ {
+ ObjectMeta: metav1.ObjectMeta{Name: config.GetEventsConfigName(), Namespace: system.Namespace()},
+ Data: map[string]string{
+ "sink": "http://synk:8080",
+ },
+ }, {
+ ObjectMeta: metav1.ObjectMeta{Name: config.GetFeatureFlagsConfigName(), Namespace: system.Namespace()},
+ Data: map[string]string{
+ "send-cloudevents-for-runs": "true",
+ },
+ },
+ }
+
+ condition := &apis.Condition{
+ Type: apis.ConditionSucceeded,
+ Status: corev1.ConditionTrue,
+ Reason: v1beta1.CustomRunReasonSuccessful.String(),
+ }
+ objectStatus := duckv1.Status{
+ Conditions: []apis.Condition{},
+ }
+ crStatusFields := v1beta1.CustomRunStatusFields{}
+ objectStatus.Conditions = append(objectStatus.Conditions, *condition)
+ customRun := v1beta1.CustomRun{
+ ObjectMeta: metav1.ObjectMeta{
+ Name: "test-customRun",
+ Namespace: "foo",
+ },
+ Spec: v1beta1.CustomRunSpec{},
+ Status: v1beta1.CustomRunStatus{
+ Status: objectStatus,
+ CustomRunStatusFields: crStatusFields,
+ },
+ }
+ customRuns := []*v1beta1.CustomRun{&customRun}
+ wantCloudEvents := []string{`(?s)dev.tekton.event.customrun.successful.v1.*test-customRun`}
+
+ d := test.Data{
+ CustomRuns: customRuns,
+ ConfigMaps: cms,
+ ExpectedCloudEventCount: len(wantCloudEvents),
+ }
+ testAssets, cancel := rtesting.InitializeTestAssets(t, &d)
+ defer cancel()
+ clients := testAssets.Clients
+
+ // Initialise the controller.
+ // Verify that the config map watcher and reconciler setup works well
+ testAssets = InitializeTestController(t, d, testAssets)
+ c := testAssets.Controller
+
+ if err := c.Reconciler.Reconcile(testAssets.Ctx, rtesting.GetTestResourceName(&customRun)); err != nil {
+ t.Errorf("didn't expect an error, but got one: %v", err)
+ }
+
+ for _, a := range clients.Kube.Actions() {
+ aVerb := a.GetVerb()
+ if aVerb != "get" && aVerb != "list" && aVerb != "watch" {
+ t.Errorf("Expected only read actions to be logged in the kubeclient, got %s", aVerb)
+ }
+ }
+
+ crAfter, err := clients.Pipeline.TektonV1beta1().CustomRuns(customRun.Namespace).Get(testAssets.Ctx, customRun.Name, metav1.GetOptions{})
+ if err != nil {
+ t.Fatalf("getting updated customRun: %v", err)
+ }
+
+ if d := cmp.Diff(&customRun, crAfter, ignoreResourceVersion); d != "" {
+ t.Fatalf("CustomRun should not have changed, got %v instead", d)
+ }
+
+ ceClient := clients.CloudEvents.(cloudevent.FakeClient)
+ ceClient.CheckCloudEventsUnordered(t, "controller test", wantCloudEvents)
+
+ // Try and reconcile again - expect no event
+ if err := c.Reconciler.Reconcile(testAssets.Ctx, rtesting.GetTestResourceName(&customRun)); err != nil {
+ t.Errorf("didn't expect an error, but got one: %v", err)
+ }
+ ceClient.CheckCloudEventsUnordered(t, "controller test", []string{})
+}
diff --git a/pkg/reconciler/notifications/customrun/customrun.go b/pkg/reconciler/notifications/customrun/reconciler.go
similarity index 65%
rename from pkg/reconciler/notifications/customrun/customrun.go
rename to pkg/reconciler/notifications/customrun/reconciler.go
index 9ec495f5855..7290e2aa44d 100644
--- a/pkg/reconciler/notifications/customrun/customrun.go
+++ b/pkg/reconciler/notifications/customrun/reconciler.go
@@ -23,11 +23,8 @@ import (
"github.com/tektoncd/pipeline/pkg/apis/config"
"github.com/tektoncd/pipeline/pkg/apis/pipeline/v1beta1"
customrunreconciler "github.com/tektoncd/pipeline/pkg/client/injection/reconciler/pipeline/v1beta1/customrun"
- "github.com/tektoncd/pipeline/pkg/reconciler/events"
- "github.com/tektoncd/pipeline/pkg/reconciler/events/cache"
"github.com/tektoncd/pipeline/pkg/reconciler/events/cloudevent"
- "knative.dev/pkg/apis"
- "knative.dev/pkg/logging"
+ "github.com/tektoncd/pipeline/pkg/reconciler/notifications"
pkgreconciler "knative.dev/pkg/reconciler"
)
@@ -37,35 +34,36 @@ type Reconciler struct {
cacheClient *lru.Cache
}
+func (c *Reconciler) GetCloudEventsClient() cloudevent.CEClient {
+ return c.cloudEventClient
+}
+
+func (c *Reconciler) GetCacheClient() *lru.Cache {
+ return c.cacheClient
+}
+
+func (c *Reconciler) SetCloudEventsClient(client cloudevent.CEClient) {
+ c.cloudEventClient = client
+}
+
+func (c *Reconciler) SetCacheClient(client *lru.Cache) {
+ c.cacheClient = client
+}
+
// Check that our Reconciler implements customrunreconciler.Interface
var (
_ customrunreconciler.Interface = (*Reconciler)(nil)
)
-// ReconcileKind compares the actual state with the desired, and attempts to
-// converge the two. It then updates the Status block of the CustomRun
-// resource with the current status of the resource.
+// ReconcileKind oberves the resource conditions and triggers notifications accordingly
func (c *Reconciler) ReconcileKind(ctx context.Context, customRun *v1beta1.CustomRun) pkgreconciler.Event {
- logger := logging.FromContext(ctx)
configs := config.FromContextOrDefaults(ctx)
- ctx = cloudevent.ToContext(ctx, c.cloudEventClient)
- ctx = cache.ToContext(ctx, c.cacheClient)
- logger.Infof("Reconciling %s", customRun.Name)
-
- // Create a copy of the CustomRun object, just in case, to avoid sync'ing changes
- customRunEvents := *customRun.DeepCopy()
-
if configs.FeatureFlags.SendCloudEventsForRuns {
// Custom task controllers may be sending events for "CustomRuns" associated
// to the custom tasks they control. To avoid sending duplicate events,
// CloudEvents for "CustomRuns" are only sent when enabled
- // Read and log the condition
- condition := customRunEvents.Status.GetCondition(apis.ConditionSucceeded)
- logger.Debugf("Emitting cloudevent for %s, condition: %s", customRunEvents.Name, condition)
-
- events.EmitCloudEvents(ctx, &customRunEvents)
+ return notifications.ReconcileRuntimeObject(ctx, c, customRun)
}
-
return nil
}
diff --git a/pkg/reconciler/notifications/customrun/customrun_test.go b/pkg/reconciler/notifications/customrun/reconciler_test.go
similarity index 62%
rename from pkg/reconciler/notifications/customrun/customrun_test.go
rename to pkg/reconciler/notifications/customrun/reconciler_test.go
index 38333e0deac..0de13fc35b7 100644
--- a/pkg/reconciler/notifications/customrun/customrun_test.go
+++ b/pkg/reconciler/notifications/customrun/reconciler_test.go
@@ -17,8 +17,6 @@ limitations under the License.
package customrun_test
import (
- "context"
- "strings"
"testing"
"github.com/google/go-cmp/cmp"
@@ -26,73 +24,27 @@ import (
"github.com/tektoncd/pipeline/pkg/apis/config"
"github.com/tektoncd/pipeline/pkg/apis/pipeline/v1beta1"
"github.com/tektoncd/pipeline/pkg/reconciler/events/cloudevent"
- "github.com/tektoncd/pipeline/pkg/reconciler/notifications/customrun"
- ttesting "github.com/tektoncd/pipeline/pkg/reconciler/testing"
+ "github.com/tektoncd/pipeline/pkg/reconciler/notifications"
+ rtesting "github.com/tektoncd/pipeline/pkg/reconciler/testing"
"github.com/tektoncd/pipeline/test"
- "github.com/tektoncd/pipeline/test/names"
corev1 "k8s.io/api/core/v1"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
- "k8s.io/apimachinery/pkg/types"
- "k8s.io/client-go/tools/record"
"knative.dev/pkg/apis"
duckv1 "knative.dev/pkg/apis/duck/v1"
- cminformer "knative.dev/pkg/configmap/informer"
- "knative.dev/pkg/controller"
- "knative.dev/pkg/logging"
- pkgreconciler "knative.dev/pkg/reconciler"
"knative.dev/pkg/system"
_ "knative.dev/pkg/system/testing" // Setup system.Namespace()
)
-func initializeCustomRunControllerAssets(t *testing.T, d test.Data) (test.Assets, func()) {
- t.Helper()
- ctx, _ := ttesting.SetupFakeContext(t)
- ctx = ttesting.SetupFakeCloudClientContext(ctx, d.ExpectedCloudEventCount)
- ctx, cancel := context.WithCancel(ctx)
- test.EnsureConfigurationConfigMapsExist(&d)
- c, informers := test.SeedTestData(t, ctx, d)
- configMapWatcher := cminformer.NewInformedWatcher(c.Kube, system.Namespace())
- ctl := customrun.NewController()(ctx, configMapWatcher)
- if err := configMapWatcher.Start(ctx.Done()); err != nil {
- t.Fatalf("error starting configmap watcher: %v", err)
- }
-
- if la, ok := ctl.Reconciler.(pkgreconciler.LeaderAware); ok {
- la.Promote(pkgreconciler.UniversalBucket(), func(pkgreconciler.Bucket, types.NamespacedName) {})
- }
-
- return test.Assets{
- Logger: logging.FromContext(ctx),
- Controller: ctl,
- Clients: c,
- Informers: informers,
- Recorder: controller.GetEventRecorder(ctx).(*record.FakeRecorder),
- Ctx: ctx,
- }, cancel
-}
-
-func getCustomRunName(customRun v1beta1.CustomRun) string {
- return strings.Join([]string{customRun.Namespace, customRun.Name}, "/")
-}
-
-// getCustomRunController returns an instance of the CustomRun controller/reconciler that has been seeded with
-// d, where d represents the state of the system (existing resources) needed for the test.
-func getCustomRunController(t *testing.T, d test.Data) (test.Assets, func()) {
- t.Helper()
- names.TestingSeed()
- return initializeCustomRunControllerAssets(t, d)
-}
-
-// TestReconcile_CloudEvents runs reconcile with a cloud event sink configured
+// TestReconcileKind_CloudEvents runs reconcile with a cloud event sink configured
// to ensure that events are sent in different cases
-func TestReconcile_CloudEvents(t *testing.T) {
+func TestReconcileKind_CloudEvents(t *testing.T) {
ignoreResourceVersion := cmpopts.IgnoreFields(v1beta1.CustomRun{}, "ObjectMeta.ResourceVersion")
cms := []*corev1.ConfigMap{
{
- ObjectMeta: metav1.ObjectMeta{Name: config.GetDefaultsConfigName(), Namespace: system.Namespace()},
+ ObjectMeta: metav1.ObjectMeta{Name: config.GetEventsConfigName(), Namespace: system.Namespace()},
Data: map[string]string{
- "default-cloud-events-sink": "http://synk:8080",
+ "sink": "http://synk:8080",
},
}, {
ObjectMeta: metav1.ObjectMeta{Name: config.GetFeatureFlagsConfigName(), Namespace: system.Namespace()},
@@ -101,6 +53,7 @@ func TestReconcile_CloudEvents(t *testing.T) {
},
},
}
+
testcases := []struct {
name string
condition *apis.Condition
@@ -162,13 +115,14 @@ func TestReconcile_CloudEvents(t *testing.T) {
ConfigMaps: cms,
ExpectedCloudEventCount: len(tc.wantCloudEvents),
}
- testAssets, cancel := getCustomRunController(t, d)
+ testAssets, cancel := rtesting.InitializeTestAssets(t, &d)
defer cancel()
- c := testAssets.Controller
clients := testAssets.Clients
+ reconciler := &rtesting.FakeReconciler{}
+ notifications.ReconcilerFromContext(testAssets.Ctx, reconciler)
- if err := c.Reconciler.Reconcile(testAssets.Ctx, getCustomRunName(customRun)); err != nil {
- t.Fatal("Didn't expect an error, but got one.")
+ if err := notifications.ReconcileRuntimeObject(testAssets.Ctx, reconciler, &customRun); err != nil {
+ t.Fatalf("didn't expect an error, but got one: %v", err)
}
for _, a := range clients.Kube.Actions() {
@@ -178,12 +132,12 @@ func TestReconcile_CloudEvents(t *testing.T) {
}
}
- updatedCR, err := clients.Pipeline.TektonV1beta1().CustomRuns(customRun.Namespace).Get(testAssets.Ctx, customRun.Name, metav1.GetOptions{})
+ crAfter, err := clients.Pipeline.TektonV1beta1().CustomRuns(customRun.Namespace).Get(testAssets.Ctx, customRun.Name, metav1.GetOptions{})
if err != nil {
t.Fatalf("getting updated customRun: %v", err)
}
- if d := cmp.Diff(&customRun, updatedCR, ignoreResourceVersion); d != "" {
+ if d := cmp.Diff(&customRun, crAfter, ignoreResourceVersion); d != "" {
t.Fatalf("CustomRun should not have changed, got %v instead", d)
}
@@ -191,8 +145,8 @@ func TestReconcile_CloudEvents(t *testing.T) {
ceClient.CheckCloudEventsUnordered(t, tc.name, tc.wantCloudEvents)
// Try and reconcile again - expect no event
- if err := c.Reconciler.Reconcile(testAssets.Ctx, getCustomRunName(customRun)); err != nil {
- t.Fatal("Didn't expect an error, but got one.")
+ if err := notifications.ReconcileRuntimeObject(testAssets.Ctx, reconciler, &customRun); err != nil {
+ t.Fatalf("didn't expect an error, but got one: %v", err)
}
ceClient.CheckCloudEventsUnordered(t, tc.name, []string{})
})
@@ -201,15 +155,15 @@ func TestReconcile_CloudEvents(t *testing.T) {
func TestReconcile_CloudEvents_Disabled(t *testing.T) {
cmSinkOn := &corev1.ConfigMap{
- ObjectMeta: metav1.ObjectMeta{Name: config.GetDefaultsConfigName(), Namespace: system.Namespace()},
+ ObjectMeta: metav1.ObjectMeta{Name: config.GetEventsConfigName(), Namespace: system.Namespace()},
Data: map[string]string{
- "default-cloud-events-sink": "http://synk:8080",
+ "sink": "http://synk:8080",
},
}
cmSinkOff := &corev1.ConfigMap{
- ObjectMeta: metav1.ObjectMeta{Name: config.GetDefaultsConfigName(), Namespace: system.Namespace()},
+ ObjectMeta: metav1.ObjectMeta{Name: config.GetEventsConfigName(), Namespace: system.Namespace()},
Data: map[string]string{
- "default-cloud-events-sink": "",
+ "sink": "",
},
}
cmRunsOn := &corev1.ConfigMap{
@@ -224,7 +178,7 @@ func TestReconcile_CloudEvents_Disabled(t *testing.T) {
"send-cloudevents-for-runs": "false",
},
}
- testcases := []struct {
+ for _, tc := range []struct {
name string
cms []*corev1.ConfigMap
}{{
@@ -236,9 +190,7 @@ func TestReconcile_CloudEvents_Disabled(t *testing.T) {
}, {
name: "CustomRuns Disabled",
cms: []*corev1.ConfigMap{cmSinkOn, cmRunsOff},
- }}
-
- for _, tc := range testcases {
+ }} {
t.Run(tc.name, func(t *testing.T) {
objectStatus := duckv1.Status{
Conditions: []apis.Condition{
@@ -267,24 +219,22 @@ func TestReconcile_CloudEvents_Disabled(t *testing.T) {
CustomRuns: customRuns,
ConfigMaps: tc.cms,
}
- testAssets, cancel := getCustomRunController(t, d)
+ testAssets, cancel := rtesting.InitializeTestAssets(t, &d)
defer cancel()
- c := testAssets.Controller
clients := testAssets.Clients
+ reconciler := &rtesting.FakeReconciler{}
+ notifications.ReconcilerFromContext(testAssets.Ctx, reconciler)
- if err := c.Reconciler.Reconcile(testAssets.Ctx, getCustomRunName(customRun)); err != nil {
- t.Fatal("Didn't expect an error, but got one.")
- }
- if len(clients.Kube.Actions()) == 0 {
- t.Errorf("Expected actions to be logged in the kubeclient, got none")
+ if err := notifications.ReconcileRuntimeObject(testAssets.Ctx, reconciler, &customRun); err != nil {
+ t.Fatalf("didn't expect an error, but got one: %v", err)
}
- updatedCR, err := clients.Pipeline.TektonV1beta1().CustomRuns(customRun.Namespace).Get(testAssets.Ctx, customRun.Name, metav1.GetOptions{})
+ crAfter, err := clients.Pipeline.TektonV1beta1().CustomRuns(customRun.Namespace).Get(testAssets.Ctx, customRun.Name, metav1.GetOptions{})
if err != nil {
t.Fatalf("getting updated customRun: %v", err)
}
- if d := cmp.Diff(customRun.Status, updatedCR.Status); d != "" {
+ if d := cmp.Diff(customRun.Status, crAfter.Status); d != "" {
t.Fatalf("CustomRun should not have changed, got %v instead", d)
}
diff --git a/pkg/reconciler/notifications/runtimeobject.go b/pkg/reconciler/notifications/runtimeobject.go
new file mode 100644
index 00000000000..6a3f6033065
--- /dev/null
+++ b/pkg/reconciler/notifications/runtimeobject.go
@@ -0,0 +1,53 @@
+/*
+Copyright 2023 The Tekton Authors
+
+Licensed under the Apache License, Version 2.0 (the "License");
+you may not use this file except in compliance with the License.
+You may obtain a copy of the License at
+
+ http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing, software
+distributed under the License is distributed on an "AS IS" BASIS,
+WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+See the License for the specific language governing permissions and
+limitations under the License.
+*/
+
+package notifications
+
+import (
+ "context"
+
+ lru "github.com/hashicorp/golang-lru"
+ "github.com/tektoncd/pipeline/pkg/apis/pipeline/v1beta1"
+ "github.com/tektoncd/pipeline/pkg/reconciler/events"
+ "github.com/tektoncd/pipeline/pkg/reconciler/events/cache"
+ "github.com/tektoncd/pipeline/pkg/reconciler/events/cloudevent"
+ "knative.dev/pkg/apis"
+ "knative.dev/pkg/logging"
+ pkgreconciler "knative.dev/pkg/reconciler"
+)
+
+// Reconciler implements controller.Reconciler for Configuration resources.
+type Reconciler interface {
+ GetCloudEventsClient() cloudevent.CEClient
+ GetCacheClient() *lru.Cache
+ SetCloudEventsClient(cloudevent.CEClient)
+ SetCacheClient(*lru.Cache)
+}
+
+// ReconcileRuntimeObject observes a v1beta1.RunObject and triggers notifications
+func ReconcileRuntimeObject(ctx context.Context, c Reconciler, readOnlyRun v1beta1.RunObject) pkgreconciler.Event {
+ logger := logging.FromContext(ctx)
+ ctx = cloudevent.ToContext(ctx, c.GetCloudEventsClient())
+ ctx = cache.ToContext(ctx, c.GetCacheClient())
+
+ logger.Infof("reconciling %s", readOnlyRun.GetObjectMeta().GetName())
+
+ condition := readOnlyRun.GetStatusCondition().GetCondition(apis.ConditionSucceeded)
+ logger.Debugf("customRun %s, condition: %s", readOnlyRun.GetObjectMeta().GetName(), condition)
+
+ events.EmitCloudEvents(ctx, readOnlyRun)
+ return nil
+}
diff --git a/pkg/reconciler/notifications/runtimeobject_test.go b/pkg/reconciler/notifications/runtimeobject_test.go
new file mode 100644
index 00000000000..7f7302c3724
--- /dev/null
+++ b/pkg/reconciler/notifications/runtimeobject_test.go
@@ -0,0 +1,126 @@
+/*
+Copyright 2023 The Tekton Authors
+
+Licensed under the Apache License, Version 2.0 (the "License");
+you may not use this file except in compliance with the License.
+You may obtain a copy of the License at
+
+ http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing, software
+distributed under the License is distributed on an "AS IS" BASIS,
+WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+See the License for the specific language governing permissions and
+limitations under the License.
+*/
+
+package notifications_test
+
+import (
+ "context"
+ "testing"
+
+ "github.com/tektoncd/pipeline/pkg/apis/config"
+ v1 "github.com/tektoncd/pipeline/pkg/apis/pipeline/v1"
+ "github.com/tektoncd/pipeline/pkg/apis/pipeline/v1beta1"
+ "github.com/tektoncd/pipeline/pkg/reconciler/events"
+ "github.com/tektoncd/pipeline/pkg/reconciler/events/cache"
+ "github.com/tektoncd/pipeline/pkg/reconciler/events/cloudevent"
+ "github.com/tektoncd/pipeline/pkg/reconciler/notifications"
+ rtesting "github.com/tektoncd/pipeline/pkg/reconciler/testing"
+ "github.com/tektoncd/pipeline/test"
+ corev1 "k8s.io/api/core/v1"
+ metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
+ "k8s.io/apimachinery/pkg/runtime"
+ "knative.dev/pkg/system"
+ _ "knative.dev/pkg/system/testing" // Setup system.Namespace()
+)
+
+// TestReconcileRuntimeObject runs reconcile with a cloud event sink configured
+// and ensures that the event logic is correctly invoked for all supported types
+func TestReconcileRuntimeObject(t *testing.T) {
+
+ cms := []*corev1.ConfigMap{
+ {
+ ObjectMeta: metav1.ObjectMeta{Name: config.GetEventsConfigName(), Namespace: system.Namespace()},
+ Data: map[string]string{
+ "sink": "http://synk:8080",
+ },
+ }, {
+ ObjectMeta: metav1.ObjectMeta{Name: config.GetFeatureFlagsConfigName(), Namespace: system.Namespace()},
+ Data: map[string]string{
+ "send-cloudevents-for-runs": "true",
+ },
+ },
+ }
+
+ for _, tc := range []struct {
+ name string
+ runObject v1beta1.RunObject
+ }{{
+ name: "v1 TaskRun",
+ runObject: &v1.TaskRun{},
+ }, {
+ name: "v1 PipelineRun",
+ runObject: &v1.PipelineRun{},
+ }, {
+ name: "v1beta1 TaskRun",
+ runObject: &v1beta1.TaskRun{},
+ }, {
+ name: "v1beta1 PipelineRun",
+ runObject: &v1beta1.PipelineRun{},
+ }, {
+ name: "v1beta1 CustomRun",
+ runObject: &v1beta1.CustomRun{},
+ }} {
+ t.Run(tc.name, func(t *testing.T) {
+ // Setup mock EmitCloudEvents
+ calls := []rtesting.TestEmitCloudEventsParams{}
+ events.EmitCloudEvents = func(ctx context.Context, object runtime.Object) {
+ calls = append(calls, rtesting.TestEmitCloudEventsParams{
+ Ctx: ctx,
+ Object: object,
+ })
+ }
+
+ d := test.Data{
+ ConfigMaps: cms,
+ }
+ testAssets, cancel := rtesting.InitializeTestAssets(t, &d)
+ defer cancel()
+ clients := testAssets.Clients
+ reconciler := &rtesting.FakeReconciler{}
+ notifications.ReconcilerFromContext(testAssets.Ctx, reconciler)
+
+ if err := notifications.ReconcileRuntimeObject(testAssets.Ctx, reconciler, tc.runObject); err != nil {
+ t.Errorf("didn't expect an error, but got one: %v", err)
+ }
+
+ if len(calls) != 1 {
+ t.Errorf("expected one call to EmitCloudEvents, got: %d", len(calls))
+ }
+
+ // Check the context
+ ctx := calls[0].Ctx
+ if ceClient := cloudevent.Get(ctx); ceClient == nil {
+ t.Error("expected the cloudevents client in the context, but got none")
+ }
+ if cacheClient := cache.Get(ctx); cacheClient == nil {
+ t.Error("expected the cache client in the context, but got none")
+ }
+
+ for _, a := range clients.Kube.Actions() {
+ aVerb := a.GetVerb()
+ if aVerb != "get" && aVerb != "list" && aVerb != "watch" {
+ t.Errorf("Expected only read actions to be logged in the kubeclient, got %s", aVerb)
+ }
+ }
+
+ // Check that the object is the same passed to reconcile
+ runObject := calls[0].Object
+ if runObject != tc.runObject {
+ t.Error("expected EmitCloudEvents to receive exactly the same object from the reconcile")
+ }
+ })
+ }
+}
diff --git a/pkg/reconciler/testing/notifications.go b/pkg/reconciler/testing/notifications.go
new file mode 100644
index 00000000000..4875886de3c
--- /dev/null
+++ b/pkg/reconciler/testing/notifications.go
@@ -0,0 +1,113 @@
+/*
+Copyright 2023 The Tekton Authors
+
+Licensed under the Apache License, Version 2.0 (the "License");
+you may not use this file except in compliance with the License.
+You may obtain a copy of the License at
+
+ http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing, software
+distributed under the License is distributed on an "AS IS" BASIS,
+WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+See the License for the specific language governing permissions and
+limitations under the License.
+*/
+
+package testing
+
+import (
+ "context"
+ "strings"
+ "testing"
+
+ lru "github.com/hashicorp/golang-lru"
+ "github.com/tektoncd/pipeline/pkg/apis/config"
+ "github.com/tektoncd/pipeline/pkg/reconciler/events/cloudevent"
+ "github.com/tektoncd/pipeline/test"
+ "github.com/tektoncd/pipeline/test/names"
+ metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
+ "k8s.io/apimachinery/pkg/runtime"
+ "k8s.io/client-go/tools/record"
+ "knative.dev/pkg/controller"
+ "knative.dev/pkg/logging"
+)
+
+// TestEmitCloudEventsParams matches the parameter of the `EmitCloudEvents`
+// function, it is used to record invocations of the function
+type TestEmitCloudEventsParams struct {
+ Ctx context.Context //nolint:containedctx
+ Object runtime.Object
+}
+
+// Reconciler implements the Reconciler interface from the notification package
+type FakeReconciler struct {
+ cloudEventClient cloudevent.CEClient
+ cacheClient *lru.Cache
+}
+
+func (c *FakeReconciler) GetCloudEventsClient() cloudevent.CEClient {
+ return c.cloudEventClient
+}
+
+func (c *FakeReconciler) GetCacheClient() *lru.Cache {
+ return c.cacheClient
+}
+
+func (c *FakeReconciler) SetCloudEventsClient(client cloudevent.CEClient) {
+ c.cloudEventClient = client
+}
+
+func (c *FakeReconciler) SetCacheClient(client *lru.Cache) {
+ c.cacheClient = client
+}
+
+func configFromConfigMap(d test.Data) config.Config {
+ testConfig := config.Config{}
+ for _, cm := range d.ConfigMaps {
+ switch cm.Name {
+ case config.GetDefaultsConfigName():
+ testConfig.Defaults, _ = config.NewDefaultsFromConfigMap(cm)
+ case config.GetFeatureFlagsConfigName():
+ testConfig.FeatureFlags, _ = config.NewFeatureFlagsFromConfigMap(cm)
+ case config.GetEventsConfigName():
+ testConfig.Events, _ = config.NewEventsFromConfigMap(cm)
+ }
+ }
+ return testConfig
+}
+
+// InitializeTestAssets sets up test assets to be used for direct testing
+// of the ReconcileKind method (i.e. with no controller object)
+// Config maps are loaded into the context and no config map watcher is setup
+//
+// Example usage:
+//
+// testAssets, cancel := rtesting.InitializeTestAssets(t, &d)
+// defer cancel()
+// reconciler := &rtesting.FakeReconciler{}
+// notifications.ReconcilerFromContext(testAssets.Ctx, reconciler)
+func InitializeTestAssets(t *testing.T, d *test.Data) (test.Assets, func()) {
+ t.Helper()
+ names.TestingSeed()
+ ctx, _ := SetupFakeContext(t)
+ ctx = SetupFakeCloudClientContext(ctx, d.ExpectedCloudEventCount)
+ ctx, cancel := context.WithCancel(ctx)
+ // Ensure all cm exists before seeding the data
+ test.EnsureConfigurationConfigMapsExist(d)
+ c, informers := test.SeedTestData(t, ctx, *d)
+ testConfig := configFromConfigMap(*d)
+ ctx = config.ToContext(ctx, &testConfig)
+
+ return test.Assets{
+ Logger: logging.FromContext(ctx),
+ Clients: c,
+ Informers: informers,
+ Recorder: controller.GetEventRecorder(ctx).(*record.FakeRecorder),
+ Ctx: ctx,
+ }, cancel
+}
+
+func GetTestResourceName(run metav1.ObjectMetaAccessor) string {
+ return strings.Join([]string{run.GetObjectMeta().GetNamespace(), run.GetObjectMeta().GetName()}, "/")
+}
diff --git a/vendor/knative.dev/pkg/environment/client_config.go b/vendor/knative.dev/pkg/environment/client_config.go
index 04d4220b0ac..4a835d985da 100644
--- a/vendor/knative.dev/pkg/environment/client_config.go
+++ b/vendor/knative.dev/pkg/environment/client_config.go
@@ -21,6 +21,7 @@ import (
"fmt"
"math"
"os"
+ "runtime"
"k8s.io/client-go/rest"
"k8s.io/client-go/tools/clientcmd"
@@ -37,6 +38,12 @@ type ClientConfig struct {
}
func (c *ClientConfig) InitFlags(fs *flag.FlagSet) {
+ pc, _, _, ok := runtime.Caller(1)
+ details := runtime.FuncForPC(pc)
+ if ok && details != nil {
+ fmt.Printf("InitFlags called from %s\n", details.Name())
+ }
+
fs.StringVar(&c.Cluster, "cluster", "", "Defaults to the current cluster in kubeconfig.")
fs.StringVar(&c.ServerURL, "server", "",
diff --git a/vendor/knative.dev/pkg/test/e2e_flags.go b/vendor/knative.dev/pkg/test/e2e_flags.go
index ef9b5519e70..05e0da111b6 100644
--- a/vendor/knative.dev/pkg/test/e2e_flags.go
+++ b/vendor/knative.dev/pkg/test/e2e_flags.go
@@ -22,6 +22,8 @@ package test
import (
"bytes"
"flag"
+ "fmt"
+ "runtime"
"text/template"
env "knative.dev/pkg/environment"
@@ -44,6 +46,11 @@ type EnvironmentFlags struct {
}
func initializeFlags() *EnvironmentFlags {
+ pc, _, _, ok := runtime.Caller(1)
+ details := runtime.FuncForPC(pc)
+ if ok && details != nil {
+ fmt.Printf("initializeFlags called from %s\n", details.Name())
+ }
f := new(EnvironmentFlags)
f.ClientConfig.InitFlags(flag.CommandLine)