diff --git a/docs/resources.md b/docs/resources.md index 43c95decd71..87aeefb83a5 100644 --- a/docs/resources.md +++ b/docs/resources.md @@ -52,6 +52,7 @@ The following `PipelineResources` are currently supported: - [Storage Resource](#storage-resource) - [GCS Storage Resource](#gcs-storage-resource) - [BuildGCS Storage Resource](#buildgcs-storage-resource) +- [Cloud Event Resource](#cloud-event-resource) ### Git Resource @@ -609,6 +610,68 @@ the container image [gcr.io/cloud-builders//gcs-fetcher](https://github.com/GoogleCloudPlatform/cloud-builders/tree/master/gcs-fetcher) does not support configuring secrets. +### Cloud Event Resource + +The Cloud Event Resource represents a [cloud event](https://github.com/cloudevents/spec) +that is sent to a target `URI` upon completion of a `TaskRun`. +The Cloud Event Resource sends Tekton specific events; the body of the event includes +the entire TaskRun spec plus status; the types of events defined for now are: + +- dev.tekton.event.task.unknown +- dev.tekton.event.task.successful +- dev.tekton.event.task.failed + +Cloud event resources are useful to notify a third party upon the completion and +status of a `TaskRun`. In combinations with the [Tekton triggers](https://github.com/tektoncd/triggers) +project they can be used to link `Task/PipelineRuns` asynchronously. + +To create a CloudEvent resource using the `PipelineResource` CRD: + +```yaml +apiVersion: tekton.dev/v1alpha1 +kind: PipelineResource +metadata: + name: event-to-sink +spec: + type: cloudevent + params: + - name: targetURI + value: http://sink:8080 +``` + +The content of an event is for example: + +```yaml +Context Attributes, + SpecVersion: 0.2 + Type: dev.tekton.event.task.successful + Source: /apis/tekton.dev/v1alpha1/namespaces/default/taskruns/pipeline-run-api-16aa55-source-to-image-task-rpndl + ID: pipeline-run-api-16aa55-source-to-image-task-rpndl + Time: 2019-07-04T11:03:53.058694712Z + ContentType: application/json +Transport Context, + URI: / + Host: my-sink.default.my-cluster.containers.appdomain.cloud + Method: POST +Data, + { + "taskRun": { + "metadata": {...} + "spec": { + "inputs": {...} + "outputs": {...} + "serviceAccount": "default", + "taskRef": { + "name": "source-to-image", + "kind": "Task" + }, + "timeout": "1h0m0s" + }, + "status": {...} + } + } +``` + Except as otherwise noted, the content of this page is licensed under the [Creative Commons Attribution 4.0 License](https://creativecommons.org/licenses/by/4.0/), and code samples are licensed under the diff --git a/examples/taskruns/taskrun-cloud-event.yaml b/examples/taskruns/taskrun-cloud-event.yaml new file mode 100644 index 00000000000..d855c1af2cf --- /dev/null +++ b/examples/taskruns/taskrun-cloud-event.yaml @@ -0,0 +1,187 @@ +--- +apiVersion: v1 +kind: Service +metadata: + name: sink + namespace: default +spec: + selector: + app: cloudevent + ports: + - protocol: TCP + port: 8080 + targetPort: 8080 +--- +apiVersion: v1 +kind: Pod +metadata: + labels: + app: cloudevent + name: message-sink + namespace: default +spec: + containers: + - env: + - name: PORT + value: "8080" + name: cloudeventlistener + image: python:3-alpine + imagePullPolicy: IfNotPresent + command: ["/bin/sh"] + args: + - -ce + - | + cat <

POST!

') + + def do_GET(self): + with open("content.txt", mode="rb") as f: + content = f.read() + self.send_response(200 if content else 404) + self.send_header('Content-type', 'text/plain') + self.end_headers() + self.wfile.write(content) + + if __name__ == "__main__": + open("content.txt", 'a').close() + httpd = HTTPServer(('', $PORT), GetAndPostHandler) + print('Starting httpd...') + httpd.serve_forever() + EOF + ports: + - containerPort: 8080 + name: user-port + protocol: TCP +--- +apiVersion: tekton.dev/v1alpha1 +kind: PipelineResource +metadata: + name: to-message-sink +spec: + type: cloudEvent + params: + - name: targetURI + value: http://sink.default:8080 +--- +apiVersion: tekton.dev/v1alpha1 +kind: PipelineResource +metadata: + name: fake-image +spec: + type: image + params: + - name: url + value: fake-registry/test/fake-image +--- +apiVersion: tekton.dev/v1alpha1 +kind: Task +metadata: + name: send-cloud-event-task +spec: + outputs: + resources: + - name: myimage + type: image + - name: notification + type: cloudEvent + steps: + - name: build-index-json + image: busybox + command: + - /bin/sh + args: + - -ce + - | + set -e + mkdir -p /builder/home/image-outputs/myimage/ + cat < /builder/home/image-outputs/myimage/index.json + { + "schemaVersion": 2, + "manifests": [ + { + "mediaType": "application/vnd.oci.image.index.v1+json", + "size": 314, + "digest": "sha256:NOTAREALDIGEST" + } + ] + } + EOF +--- +apiVersion: tekton.dev/v1alpha1 +kind: Task +metadata: + name: poll-for-content-task +spec: + steps: + - name: polling + image: python:3-alpine + imagePullPolicy: IfNotPresent + command: ["/bin/sh"] + args: + - -ce + - | + cat < 0 { - initialState := CloudEventDeliveryState{ - Condition: CloudEventConditionUnknown, - RetryCount: 0, - } - events := make([]CloudEventDelivery, len(targets)) - for idx, target := range targets { - events[idx] = CloudEventDelivery{ - Target: target, - Status: initialState, - } - } - tr.CloudEvents = events - } -} - // StepState reports the results of running a step in the Task. type StepState struct { corev1.ContainerState diff --git a/pkg/apis/pipeline/v1alpha1/taskrun_types_test.go b/pkg/apis/pipeline/v1alpha1/taskrun_types_test.go index aaf3fc69e04..0b0e46ea942 100644 --- a/pkg/apis/pipeline/v1alpha1/taskrun_types_test.go +++ b/pkg/apis/pipeline/v1alpha1/taskrun_types_test.go @@ -151,53 +151,3 @@ func TestTaskRunHasStarted(t *testing.T) { }) } } - -func TestInitializeCloudEvents(t *testing.T) { - tests := []struct { - name string - targets []string - wantCloudEvents []v1alpha1.CloudEventDelivery - }{{ - name: "testWithNilTarget", - targets: nil, - wantCloudEvents: nil, - }, { - name: "testWithEmptyListTarget", - targets: make([]string, 0), - wantCloudEvents: nil, - }, { - name: "testWithTwoTargets", - targets: []string{"target1", "target2"}, - wantCloudEvents: []v1alpha1.CloudEventDelivery{ - { - Target: "target1", - Status: v1alpha1.CloudEventDeliveryState{ - Condition: v1alpha1.CloudEventConditionUnknown, - SentAt: nil, - Error: "", - RetryCount: 0, - }, - }, - { - Target: "target2", - Status: v1alpha1.CloudEventDeliveryState{ - Condition: v1alpha1.CloudEventConditionUnknown, - SentAt: nil, - Error: "", - RetryCount: 0, - }, - }, - }, - }} - for _, tc := range tests { - t.Run(tc.name, func(t *testing.T) { - tr := tb.TaskRun("taskrunname", "testns", tb.TaskRunStatus()) - trs := tr.Status - trs.InitializeCloudEvents(tc.targets) - gotCloudEvents := trs.CloudEvents - if diff := cmp.Diff(tc.wantCloudEvents, gotCloudEvents); diff != "" { - t.Errorf("Wrong Cloud Events (-want +got) = %s", diff) - } - }) - } -} diff --git a/pkg/reconciler/v1alpha1/taskrun/controller.go b/pkg/reconciler/v1alpha1/taskrun/controller.go index 2c6e2618507..1f77b050cbd 100644 --- a/pkg/reconciler/v1alpha1/taskrun/controller.go +++ b/pkg/reconciler/v1alpha1/taskrun/controller.go @@ -27,6 +27,7 @@ import ( taskruninformer "github.com/tektoncd/pipeline/pkg/client/injection/informers/pipeline/v1alpha1/taskrun" "github.com/tektoncd/pipeline/pkg/reconciler" "github.com/tektoncd/pipeline/pkg/reconciler/v1alpha1/taskrun/entrypoint" + cloudeventclient "github.com/tektoncd/pipeline/pkg/reconciler/v1alpha1/taskrun/resources/cloudevent" "k8s.io/client-go/tools/cache" "knative.dev/pkg/configmap" "knative.dev/pkg/controller" @@ -69,6 +70,7 @@ func NewController( clusterTaskLister: clusterTaskInformer.Lister(), resourceLister: resourceInformer.Lister(), timeoutHandler: timeoutHandler, + cloudEventClient: cloudeventclient.Get(ctx), } impl := controller.NewImpl(c, c.Logger, taskRunControllerName) diff --git a/pkg/reconciler/v1alpha1/taskrun/resources/cloudevent/cloud_event_controller.go b/pkg/reconciler/v1alpha1/taskrun/resources/cloudevent/cloud_event_controller.go new file mode 100644 index 00000000000..e2c3082104f --- /dev/null +++ b/pkg/reconciler/v1alpha1/taskrun/resources/cloudevent/cloud_event_controller.go @@ -0,0 +1,97 @@ +/* +Copyright 2019 The Tekton Authors + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package cloudevent + +import ( + "time" + + "github.com/hashicorp/go-multierror" + "github.com/tektoncd/pipeline/pkg/apis/pipeline/v1alpha1" + "go.uber.org/zap" + metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" +) + +// InitializeCloudEvents initializes the CloudEvents part of the +// TaskRunStatus from a slice of PipelineResources +func InitializeCloudEvents(tr *v1alpha1.TaskRun, prs []*v1alpha1.PipelineResource) { + // FIXME(afrittoli) If there are no events this is run every time + if len(tr.Status.CloudEvents) == 0 { + var targets []string + for _, output := range prs { + if output.Spec.Type == v1alpha1.PipelineResourceTypeCloudEvent { + cer, _ := v1alpha1.NewCloudEventResource(output) + targets = append(targets, cer.TargetURI) + } + } + if len(targets) > 0 { + tr.Status.CloudEvents = cloudEventDeliveryFromTargets(targets) + } + } +} + +func cloudEventDeliveryFromTargets(targets []string) []v1alpha1.CloudEventDelivery { + // len(nil slice) is 0 + if len(targets) > 0 { + initialState := v1alpha1.CloudEventDeliveryState{ + Condition: v1alpha1.CloudEventConditionUnknown, + RetryCount: 0, + } + events := make([]v1alpha1.CloudEventDelivery, len(targets)) + for idx, target := range targets { + events[idx] = v1alpha1.CloudEventDelivery{ + Target: target, + Status: initialState, + } + } + return events + } + return nil +} + +// SendCloudEvents is used by the TaskRun controller to send cloud events once +// the TaskRun is complete. `tr` is used to obtain the list of targets but also +// to construct the body of the +func SendCloudEvents(tr *v1alpha1.TaskRun, ceclient CEClient, logger *zap.SugaredLogger) error { + // Using multierror here so we can attempt to send all cloud events defined, + // regardless of whether they fail or not, and report all failed ones + var merr *multierror.Error + for idx, cloudEventDelivery := range tr.Status.CloudEvents { + eventStatus := &(tr.Status.CloudEvents[idx].Status) + // Skip events that have already been sent (successfully or unsuccessfully) + // Ensure we try to send all events once (possibly through different reconcile calls) + if eventStatus.Condition != v1alpha1.CloudEventConditionUnknown || eventStatus.RetryCount > 0 { + continue + } + _, err := SendTaskRunCloudEvent(cloudEventDelivery.Target, tr, logger, ceclient) + eventStatus.SentAt = &metav1.Time{Time: time.Now()} + eventStatus.RetryCount = eventStatus.RetryCount + 1 + if err != nil { + merr = multierror.Append(merr, err) + eventStatus.Condition = v1alpha1.CloudEventConditionFailed + eventStatus.Error = merr.Error() + } else { + logger.Infof("Sent event for target %s", cloudEventDelivery.Target) + eventStatus.Condition = v1alpha1.CloudEventConditionSent + } + } + if merr != nil && merr.Len() > 0 { + logger.Errorf("Failed to send %d cloud events for TaskRun %s", merr.Len(), tr.Name) + // Return all send error + return merr + } + return merr +} diff --git a/pkg/reconciler/v1alpha1/taskrun/resources/cloudevent/cloud_event_controller_test.go b/pkg/reconciler/v1alpha1/taskrun/resources/cloudevent/cloud_event_controller_test.go new file mode 100644 index 00000000000..b907a65d6f3 --- /dev/null +++ b/pkg/reconciler/v1alpha1/taskrun/resources/cloudevent/cloud_event_controller_test.go @@ -0,0 +1,251 @@ +/* +Copyright 2019 The Tekton Authors + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package cloudevent + +import ( + "testing" + + "github.com/google/go-cmp/cmp" + "github.com/tektoncd/pipeline/pkg/apis/pipeline/v1alpha1" + "github.com/tektoncd/pipeline/pkg/logging" + tb "github.com/tektoncd/pipeline/test/builder" +) + +func TestCloudEventDeliveryFromTargets(t *testing.T) { + tests := []struct { + name string + targets []string + wantCloudEvents []v1alpha1.CloudEventDelivery + }{{ + name: "testWithNilTarget", + targets: nil, + wantCloudEvents: nil, + }, { + name: "testWithEmptyListTarget", + targets: make([]string, 0), + wantCloudEvents: nil, + }, { + name: "testWithTwoTargets", + targets: []string{"target1", "target2"}, + wantCloudEvents: []v1alpha1.CloudEventDelivery{ + { + Target: "target1", + Status: v1alpha1.CloudEventDeliveryState{ + Condition: v1alpha1.CloudEventConditionUnknown, + SentAt: nil, + Error: "", + RetryCount: 0, + }, + }, + { + Target: "target2", + Status: v1alpha1.CloudEventDeliveryState{ + Condition: v1alpha1.CloudEventConditionUnknown, + SentAt: nil, + Error: "", + RetryCount: 0, + }, + }, + }, + }} + for _, tc := range tests { + t.Run(tc.name, func(t *testing.T) { + gotCloudEvents := cloudEventDeliveryFromTargets(tc.targets) + if diff := cmp.Diff(tc.wantCloudEvents, gotCloudEvents); diff != "" { + t.Errorf("Wrong Cloud Events (-want +got) = %s", diff) + } + }) + } +} + +func TestSendCloudEvents(t *testing.T) { + tests := []struct { + name string + taskRun *v1alpha1.TaskRun + wantTaskRun *v1alpha1.TaskRun + }{{ + name: "testWithMultipleMixedCloudEvents", + taskRun: tb.TaskRun("test-taskrun-multiple-cloudeventdelivery", "foo", + tb.TaskRunSelfLink("/task/1234"), + tb.TaskRunSpec( + tb.TaskRunTaskRef("fakeTaskName"), + ), + tb.TaskRunStatus( + tb.TaskRunCloudEvent("http//notattemptedunknown", "", 0, v1alpha1.CloudEventConditionUnknown), + tb.TaskRunCloudEvent("http//notattemptedfailed", "somehow", 0, v1alpha1.CloudEventConditionFailed), + tb.TaskRunCloudEvent("http//notattemptedsucceeded", "", 0, v1alpha1.CloudEventConditionSent), + tb.TaskRunCloudEvent("http//attemptedunknown", "", 1, v1alpha1.CloudEventConditionUnknown), + tb.TaskRunCloudEvent("http//attemptedfailed", "iknewit", 1, v1alpha1.CloudEventConditionFailed), + tb.TaskRunCloudEvent("http//attemptedsucceeded", "", 1, v1alpha1.CloudEventConditionSent), + ), + ), + wantTaskRun: tb.TaskRun("test-taskrun-multiple-cloudeventdelivery", "foo", + tb.TaskRunSpec( + tb.TaskRunTaskRef("fakeTaskName"), + ), + tb.TaskRunStatus( + tb.TaskRunCloudEvent("http//notattemptedunknown", "", 1, v1alpha1.CloudEventConditionSent), + tb.TaskRunCloudEvent("http//notattemptedfailed", "somehow", 0, v1alpha1.CloudEventConditionFailed), + tb.TaskRunCloudEvent("http//notattemptedsucceeded", "", 0, v1alpha1.CloudEventConditionSent), + tb.TaskRunCloudEvent("http//attemptedunknown", "", 1, v1alpha1.CloudEventConditionUnknown), + tb.TaskRunCloudEvent("http//attemptedfailed", "iknewit", 1, v1alpha1.CloudEventConditionFailed), + tb.TaskRunCloudEvent("http//attemptedsucceeded", "", 1, v1alpha1.CloudEventConditionSent), + ), + ), + }} + for _, tc := range tests { + t.Run(tc.name, func(t *testing.T) { + logger, _ := logging.NewLogger("", "") + successfulBehaviour := FakeClientBehaviour{ + SendSuccessfully: true, + } + err := SendCloudEvents(tc.taskRun, NewFakeClient(&successfulBehaviour), logger) + if err == nil { + t.Fatalf("Unexpected error sending cloud events: %v", err) + } + opts := GetCloudEventDeliveryCompareOptions() + if diff := cmp.Diff(tc.wantTaskRun.Status, tc.taskRun.Status, opts...); diff != "" { + t.Errorf("Wrong Cloud Events Status (-want +got) = %s", diff) + } + }) + } +} + +func TestSendCloudEventsErrors(t *testing.T) { + tests := []struct { + name string + taskRun *v1alpha1.TaskRun + wantTaskRun *v1alpha1.TaskRun + }{{ + name: "testWithMultipleMixedCloudEvents", + taskRun: tb.TaskRun("test-taskrun-multiple-cloudeventdelivery", "foo", + tb.TaskRunSelfLink("/task/1234"), + tb.TaskRunSpec( + tb.TaskRunTaskRef("fakeTaskName"), + ), + tb.TaskRunStatus( + tb.TaskRunCloudEvent("http//sink1", "", 0, v1alpha1.CloudEventConditionUnknown), + tb.TaskRunCloudEvent("http//sink2", "", 0, v1alpha1.CloudEventConditionUnknown), + ), + ), + wantTaskRun: tb.TaskRun("test-taskrun-multiple-cloudeventdelivery", "foo", + tb.TaskRunSpec( + tb.TaskRunTaskRef("fakeTaskName"), + ), + tb.TaskRunStatus( + // Error message is not checked in the Diff below + tb.TaskRunCloudEvent("http//sink1", "", 1, v1alpha1.CloudEventConditionFailed), + tb.TaskRunCloudEvent("http//sink2", "", 1, v1alpha1.CloudEventConditionFailed), + ), + ), + }} + for _, tc := range tests { + t.Run(tc.name, func(t *testing.T) { + logger, _ := logging.NewLogger("", "") + unsuccessfulBehaviour := FakeClientBehaviour{ + SendSuccessfully: false, + } + err := SendCloudEvents(tc.taskRun, NewFakeClient(&unsuccessfulBehaviour), logger) + if err == nil { + t.Fatalf("Unexpected success sending cloud events: %v", err) + } + opts := GetCloudEventDeliveryCompareOptions() + if diff := cmp.Diff(tc.wantTaskRun.Status, tc.taskRun.Status, opts...); diff != "" { + t.Errorf("Wrong Cloud Events Status (-want +got) = %s", diff) + } + }) + } +} + +func TestInitializeCloudEvents(t *testing.T) { + tests := []struct { + name string + taskRun *v1alpha1.TaskRun + pipelineResources []*v1alpha1.PipelineResource + wantTaskRun *v1alpha1.TaskRun + }{{ + name: "testWithMultipleMixedResources", + taskRun: tb.TaskRun("test-taskrun-multiple-mixed-resources", "foo", + tb.TaskRunSelfLink("/task/1234"), tb.TaskRunSpec( + tb.TaskRunTaskRef("fakeTaskName"), + tb.TaskRunOutputs( + tb.TaskRunOutputsResource("ce1", tb.TaskResourceBindingRef("ce1")), + tb.TaskRunOutputsResource("git", tb.TaskResourceBindingRef("git")), + tb.TaskRunOutputsResource("ce2", tb.TaskResourceBindingRef("ce2")), + ), + ), + ), + pipelineResources: []*v1alpha1.PipelineResource{ + tb.PipelineResource("ce1", "foo", tb.PipelineResourceSpec( + v1alpha1.PipelineResourceTypeCloudEvent, + tb.PipelineResourceSpecParam("TargetURI", "http://foosink"), + )), + tb.PipelineResource("ce2", "foo", tb.PipelineResourceSpec( + v1alpha1.PipelineResourceTypeCloudEvent, + tb.PipelineResourceSpecParam("TargetURI", "http://barsink"), + )), + tb.PipelineResource("git", "foo", tb.PipelineResourceSpec( + v1alpha1.PipelineResourceTypeGit, + tb.PipelineResourceSpecParam("URL", "http://git.fake"), + tb.PipelineResourceSpecParam("Revision", "abcd"), + )), + }, + wantTaskRun: tb.TaskRun("test-taskrun-multiple-mixed-resources", "foo", + tb.TaskRunSpec( + tb.TaskRunTaskRef("fakeTaskName"), + ), + tb.TaskRunStatus( + tb.TaskRunCloudEvent("http://barsink", "", 0, v1alpha1.CloudEventConditionUnknown), + tb.TaskRunCloudEvent("http://foosink", "", 0, v1alpha1.CloudEventConditionUnknown), + ), + ), + }, { + name: "testWithNoCloudEventResources", + taskRun: tb.TaskRun("test-taskrun-no-cloudevent-resources", "foo", + tb.TaskRunSelfLink("/task/1234"), tb.TaskRunSpec( + tb.TaskRunTaskRef("fakeTaskName"), + tb.TaskRunOutputs( + tb.TaskRunOutputsResource("git", tb.TaskResourceBindingRef("git")), + ), + ), + ), + pipelineResources: []*v1alpha1.PipelineResource{ + tb.PipelineResource("git", "foo", tb.PipelineResourceSpec( + v1alpha1.PipelineResourceTypeGit, + tb.PipelineResourceSpecParam("URL", "http://git.fake"), + tb.PipelineResourceSpecParam("Revision", "abcd"), + )), + }, + wantTaskRun: tb.TaskRun("test-taskrun-no-cloudevent-resources", "foo", + tb.TaskRunSpec( + tb.TaskRunTaskRef("fakeTaskName"), + ), + tb.TaskRunStatus(), + ), + }} + for _, tc := range tests { + t.Run(tc.name, func(t *testing.T) { + InitializeCloudEvents(tc.taskRun, tc.pipelineResources) + opts := GetCloudEventDeliveryCompareOptions() + if diff := cmp.Diff(tc.wantTaskRun.Status, tc.taskRun.Status, opts...); diff != "" { + t.Errorf("Wrong Cloud Events Status (-want +got) = %s", diff) + } + }) + } +} + +// TBD: TestSendCloudEvents error tests diff --git a/pkg/reconciler/v1alpha1/taskrun/resources/cloudevent/cloudevent.go b/pkg/reconciler/v1alpha1/taskrun/resources/cloudevent/cloudevent.go index ba742ad2811..3ee0f382262 100644 --- a/pkg/reconciler/v1alpha1/taskrun/resources/cloudevent/cloudevent.go +++ b/pkg/reconciler/v1alpha1/taskrun/resources/cloudevent/cloudevent.go @@ -21,12 +21,15 @@ import ( "encoding/json" "errors" "fmt" + "strings" "time" "github.com/cloudevents/sdk-go/pkg/cloudevents" "github.com/cloudevents/sdk-go/pkg/cloudevents/client" cecontext "github.com/cloudevents/sdk-go/pkg/cloudevents/context" "github.com/cloudevents/sdk-go/pkg/cloudevents/types" + "github.com/google/go-cmp/cmp" + "github.com/google/go-cmp/cmp/cmpopts" "go.uber.org/zap" "knative.dev/eventing-contrib/pkg/kncloudevents" "knative.dev/pkg/apis" @@ -125,3 +128,21 @@ func SendTaskRunCloudEvent(sinkURI string, taskRun *v1alpha1.TaskRun, logger *za event, err = SendCloudEvent(sinkURI, eventID, eventSourceURI, data, eventType, logger, cloudEventClient) return event, err } + +// GetCloudEventDeliveryCompareOptions returns compare options to sort +// and compare a list of CloudEventDelivery +func GetCloudEventDeliveryCompareOptions() []cmp.Option { + // Setup cmp options + cloudDeliveryStateCompare := func(x, y v1alpha1.CloudEventDeliveryState) bool { + return cmp.Equal(x.Condition, y.Condition) && cmp.Equal(x.RetryCount, y.RetryCount) + } + less := func(x, y v1alpha1.CloudEventDelivery) bool { + return strings.Compare(x.Target, y.Target) < 0 || (strings.Compare(x.Target, y.Target) == 0 && x.Status.SentAt.Before(y.Status.SentAt)) + } + return []cmp.Option{ + cmpopts.SortSlices(less), + cmp.Comparer(func(x, y v1alpha1.CloudEventDelivery) bool { + return (strings.Compare(x.Target, y.Target) == 0) && cloudDeliveryStateCompare(x.Status, y.Status) + }), + } +} diff --git a/pkg/reconciler/v1alpha1/taskrun/taskrun.go b/pkg/reconciler/v1alpha1/taskrun/taskrun.go index 9d4af00300b..48ce3e92eda 100644 --- a/pkg/reconciler/v1alpha1/taskrun/taskrun.go +++ b/pkg/reconciler/v1alpha1/taskrun/taskrun.go @@ -23,6 +23,7 @@ import ( "strings" "time" + "github.com/hashicorp/go-multierror" "github.com/tektoncd/pipeline/pkg/apis/config" "github.com/tektoncd/pipeline/pkg/apis/pipeline" "github.com/tektoncd/pipeline/pkg/apis/pipeline/v1alpha1" @@ -30,6 +31,7 @@ import ( "github.com/tektoncd/pipeline/pkg/reconciler" "github.com/tektoncd/pipeline/pkg/reconciler/v1alpha1/taskrun/entrypoint" "github.com/tektoncd/pipeline/pkg/reconciler/v1alpha1/taskrun/resources" + "github.com/tektoncd/pipeline/pkg/reconciler/v1alpha1/taskrun/resources/cloudevent" "github.com/tektoncd/pipeline/pkg/reconciler/v1alpha1/taskrun/sidecars" "github.com/tektoncd/pipeline/pkg/status" "go.uber.org/zap" @@ -65,6 +67,7 @@ type Reconciler struct { taskLister listers.TaskLister clusterTaskLister listers.ClusterTaskLister resourceLister listers.PipelineResourceLister + cloudEventClient cloudevent.CEClient tracker tracker.Interface cache *entrypoint.Cache timeoutHandler *reconciler.TimeoutSet @@ -108,25 +111,42 @@ func (c *Reconciler) Reconcile(ctx context.Context, key string) error { } if tr.IsDone() { + var merr *multierror.Error + // Try to send cloud events first + cloudEventErr := cloudevent.SendCloudEvents(tr, c.cloudEventClient, c.Logger) + // Regardless of `err`, we must write back any status update that may have + // been generated by `sendCloudEvents` + updateErr := c.updateStatusLabelsAndAnnotations(tr, original) + merr = multierror.Append(cloudEventErr, updateErr) + if cloudEventErr != nil { + // Let's keep timeouts and sidecars running as long as we're trying to + // send cloud events. So we stop here an return errors encountered this far. + return merr.ErrorOrNil() + } c.timeoutHandler.Release(tr) pod, err := c.KubeClientSet.CoreV1().Pods(tr.Namespace).Get(tr.Status.PodName, metav1.GetOptions{}) if err == nil { err = sidecars.Stop(pod, c.KubeClientSet.CoreV1().Pods(tr.Namespace).Update) } else if errors.IsNotFound(err) { - return nil + return merr.ErrorOrNil() } if err != nil { c.Logger.Errorf("Error stopping sidecars for TaskRun %q: %v", name, err) + merr = multierror.Append(merr, err) } - return err + return merr.ErrorOrNil() } - // Reconcile this copy of the task run and then write back any status // updates regardless of whether the reconciliation errored out. if err := c.reconcile(ctx, tr); err != nil { c.Logger.Errorf("Reconcile error: %v", err.Error()) return err } + return c.updateStatusLabelsAndAnnotations(tr, original) +} + +func (c *Reconciler) updateStatusLabelsAndAnnotations(tr, original *v1alpha1.TaskRun) error { + var err error if equality.Semantic.DeepEqual(original.Status, tr.Status) { // If we didn't change anything then don't call updateStatus. // This is important because the copy we loaded from the informer's @@ -144,7 +164,6 @@ func (c *Reconciler) Reconcile(ctx context.Context, key string) error { return err } } - return err } @@ -253,6 +272,17 @@ func (c *Reconciler) reconcile(ctx context.Context, tr *v1alpha1.TaskRun) error return nil } + // Initialize the cloud events if at least a CloudEventResource is defined + // and they have not been initialized yet. + // FIXME(afrittoli) This resource specific logic will have to be replaced + // once we have a custom PipelineResource framework in place. + c.Logger.Infof("Cloud Events: %s", tr.Status.CloudEvents) + prs := make([]*v1alpha1.PipelineResource, 0, len(rtr.Outputs)) + for _, pr := range rtr.Outputs { + prs = append(prs, pr) + } + cloudevent.InitializeCloudEvents(tr, prs) + // Get the TaskRun's Pod if it should have one. Otherwise, create the Pod. pod, err := resources.TryGetPod(tr.Status, c.KubeClientSet.CoreV1().Pods(tr.Namespace).Get) if err != nil { @@ -293,7 +323,6 @@ func (c *Reconciler) reconcile(ctx context.Context, tr *v1alpha1.TaskRun) error } reconciler.EmitEvent(c.Recorder, before, after, tr) - c.Logger.Infof("Successfully reconciled taskrun %s/%s with status: %#v", tr.Name, tr.Namespace, after) return nil diff --git a/pkg/reconciler/v1alpha1/taskrun/taskrun_test.go b/pkg/reconciler/v1alpha1/taskrun/taskrun_test.go index 7f8aaf7943c..9fb610ead08 100644 --- a/pkg/reconciler/v1alpha1/taskrun/taskrun_test.go +++ b/pkg/reconciler/v1alpha1/taskrun/taskrun_test.go @@ -30,6 +30,7 @@ import ( "github.com/tektoncd/pipeline/pkg/apis/pipeline/v1alpha1" "github.com/tektoncd/pipeline/pkg/reconciler/v1alpha1/taskrun/entrypoint" "github.com/tektoncd/pipeline/pkg/reconciler/v1alpha1/taskrun/resources" + "github.com/tektoncd/pipeline/pkg/reconciler/v1alpha1/taskrun/resources/cloudevent" "github.com/tektoncd/pipeline/pkg/status" "github.com/tektoncd/pipeline/pkg/system" "github.com/tektoncd/pipeline/test" @@ -72,6 +73,8 @@ var ( resourceQuantityCmp = cmp.Comparer(func(x, y resource.Quantity) bool { return x.Cmp(y) == 0 }) + cloudEventTarget1 = "https://foo" + cloudEventTarget2 = "https://bar" simpleStep = tb.Step("simple-step", "foo", tb.StepCommand("/mycmd")) simpleTask = tb.Task("test-task", "foo", tb.TaskSpec(simpleStep)) @@ -134,6 +137,13 @@ var ( })), )) + twoOutputsTask = tb.Task("test-two-output-task", "foo", tb.TaskSpec( + simpleStep, tb.TaskOutputs( + tb.OutputsResource(cloudEventResource.Name, v1alpha1.PipelineResourceTypeCloudEvent), + tb.OutputsResource(anotherCloudEventResource.Name, v1alpha1.PipelineResourceTypeCloudEvent), + ), + )) + gitResource = tb.PipelineResource("git-resource", "foo", tb.PipelineResourceSpec( v1alpha1.PipelineResourceTypeGit, tb.PipelineResourceSpecParam("URL", "https://foo.git"), )) @@ -143,6 +153,12 @@ var ( imageResource = tb.PipelineResource("image-resource", "foo", tb.PipelineResourceSpec( v1alpha1.PipelineResourceTypeImage, tb.PipelineResourceSpecParam("URL", "gcr.io/kristoff/sven"), )) + cloudEventResource = tb.PipelineResource("cloud-event-resource", "foo", tb.PipelineResourceSpec( + v1alpha1.PipelineResourceTypeCloudEvent, tb.PipelineResourceSpecParam("TargetURI", cloudEventTarget1), + )) + anotherCloudEventResource = tb.PipelineResource("another-cloud-event-resource", "foo", tb.PipelineResourceSpec( + v1alpha1.PipelineResourceTypeCloudEvent, tb.PipelineResourceSpecParam("TargetURI", cloudEventTarget2), + )) toolsVolume = corev1.Volume{ Name: "tools", @@ -241,6 +257,10 @@ func getRunName(tr *v1alpha1.TaskRun) string { func getTaskRunController(t *testing.T, d test.Data) (test.TestAssets, func()) { ctx, _ := rtesting.SetupFakeContext(t) ctx, cancel := context.WithCancel(ctx) + cloudEventClientBehaviour := cloudevent.FakeClientBehaviour{ + SendSuccessfully: true, + } + ctx = cloudevent.WithClient(ctx, &cloudEventClientBehaviour) entrypointCache, _ = entrypoint.NewCache() c, _ := test.SeedTestData(t, ctx, d) configMapWatcher := configmap.NewInformedWatcher(c.Kube, system.GetNamespace()) @@ -1616,3 +1636,182 @@ func TestHandlePodCreationError(t *testing.T) { }) } } + +func TestReconcileCloudEvents(t *testing.T) { + + taskRunWithNoCEResources := tb.TaskRun("test-taskrun-no-ce-resources", "foo", + tb.TaskRunSpec( + tb.TaskRunTaskRef(simpleTask.Name, tb.TaskRefAPIVersion("a1")), + )) + taskRunWithTwoCEResourcesNoInit := tb.TaskRun("test-taskrun-two-ce-resources-no-init", "foo", + tb.TaskRunSpec( + tb.TaskRunTaskRef(twoOutputsTask.Name), + tb.TaskRunOutputs( + tb.TaskRunOutputsResource(cloudEventResource.Name), + tb.TaskRunOutputsResource(anotherCloudEventResource.Name), + ), + ), + ) + taskRunWithTwoCEResourcesInit := tb.TaskRun("test-taskrun-two-ce-resources-init", "foo", + tb.TaskRunSpec( + tb.TaskRunTaskRef(twoOutputsTask.Name), + tb.TaskRunOutputs( + tb.TaskRunOutputsResource(cloudEventResource.Name), + tb.TaskRunOutputsResource(anotherCloudEventResource.Name), + ), + ), + tb.TaskRunStatus( + tb.TaskRunCloudEvent(cloudEventTarget1, "", 0, v1alpha1.CloudEventConditionUnknown), + tb.TaskRunCloudEvent(cloudEventTarget2, "", 0, v1alpha1.CloudEventConditionUnknown), + ), + ) + taskRunWithCESucceded := tb.TaskRun("test-taskrun-ce-succeeded", "foo", + tb.TaskRunSelfLink("/task/1234"), + tb.TaskRunSpec( + tb.TaskRunTaskRef(twoOutputsTask.Name), + tb.TaskRunOutputs( + tb.TaskRunOutputsResource(cloudEventResource.Name), + tb.TaskRunOutputsResource(anotherCloudEventResource.Name), + ), + ), + tb.TaskRunStatus( + tb.StatusCondition(apis.Condition{ + Type: apis.ConditionSucceeded, + Status: corev1.ConditionTrue, + }), + tb.TaskRunCloudEvent(cloudEventTarget1, "", 0, v1alpha1.CloudEventConditionUnknown), + tb.TaskRunCloudEvent(cloudEventTarget2, "", 0, v1alpha1.CloudEventConditionUnknown), + ), + ) + taskRunWithCEFailed := tb.TaskRun("test-taskrun-ce-failed", "foo", + tb.TaskRunSelfLink("/task/1234"), + tb.TaskRunSpec( + tb.TaskRunTaskRef(twoOutputsTask.Name), + tb.TaskRunOutputs( + tb.TaskRunOutputsResource(cloudEventResource.Name), + tb.TaskRunOutputsResource(anotherCloudEventResource.Name), + ), + ), + tb.TaskRunStatus( + tb.StatusCondition(apis.Condition{ + Type: apis.ConditionSucceeded, + Status: corev1.ConditionFalse, + }), + tb.TaskRunCloudEvent(cloudEventTarget1, "", 0, v1alpha1.CloudEventConditionUnknown), + tb.TaskRunCloudEvent(cloudEventTarget2, "", 0, v1alpha1.CloudEventConditionUnknown), + ), + ) + taskRunWithCESuccededOneAttempt := tb.TaskRun("test-taskrun-ce-succeeded-one-attempt", "foo", + tb.TaskRunSelfLink("/task/1234"), + tb.TaskRunSpec( + tb.TaskRunTaskRef(twoOutputsTask.Name), + tb.TaskRunOutputs( + tb.TaskRunOutputsResource(cloudEventResource.Name), + tb.TaskRunOutputsResource(anotherCloudEventResource.Name), + ), + ), + tb.TaskRunStatus( + tb.StatusCondition(apis.Condition{ + Type: apis.ConditionSucceeded, + Status: corev1.ConditionTrue, + }), + tb.TaskRunCloudEvent(cloudEventTarget1, "", 1, v1alpha1.CloudEventConditionUnknown), + tb.TaskRunCloudEvent(cloudEventTarget2, "fakemessage", 0, v1alpha1.CloudEventConditionUnknown), + ), + ) + taskruns := []*v1alpha1.TaskRun{ + taskRunWithNoCEResources, taskRunWithTwoCEResourcesNoInit, + taskRunWithTwoCEResourcesInit, taskRunWithCESucceded, taskRunWithCEFailed, + taskRunWithCESuccededOneAttempt, + } + + d := test.Data{ + TaskRuns: taskruns, + Tasks: []*v1alpha1.Task{simpleTask, twoOutputsTask}, + ClusterTasks: []*v1alpha1.ClusterTask{}, + PipelineResources: []*v1alpha1.PipelineResource{cloudEventResource, anotherCloudEventResource}, + } + for _, tc := range []struct { + name string + taskRun *v1alpha1.TaskRun + wantCloudEvents []v1alpha1.CloudEventDelivery + }{{ + name: "no-ce-resources", + taskRun: taskRunWithNoCEResources, + wantCloudEvents: taskRunWithNoCEResources.Status.CloudEvents, + }, { + name: "ce-resources-no-init", + taskRun: taskRunWithTwoCEResourcesNoInit, + wantCloudEvents: tb.TaskRun("want", "foo", tb.TaskRunStatus( + tb.TaskRunCloudEvent(cloudEventTarget1, "", 0, v1alpha1.CloudEventConditionUnknown), + tb.TaskRunCloudEvent(cloudEventTarget2, "", 0, v1alpha1.CloudEventConditionUnknown), + )).Status.CloudEvents, + }, { + name: "ce-resources-init", + taskRun: taskRunWithTwoCEResourcesInit, + wantCloudEvents: tb.TaskRun("want2", "foo", tb.TaskRunStatus( + tb.TaskRunCloudEvent(cloudEventTarget1, "", 0, v1alpha1.CloudEventConditionUnknown), + tb.TaskRunCloudEvent(cloudEventTarget2, "", 0, v1alpha1.CloudEventConditionUnknown), + )).Status.CloudEvents, + }, { + name: "ce-resources-init-task-successful", + taskRun: taskRunWithCESucceded, + wantCloudEvents: tb.TaskRun("want3", "foo", tb.TaskRunStatus( + tb.TaskRunCloudEvent(cloudEventTarget1, "", 1, v1alpha1.CloudEventConditionSent), + tb.TaskRunCloudEvent(cloudEventTarget2, "", 1, v1alpha1.CloudEventConditionSent), + )).Status.CloudEvents, + }, { + name: "ce-resources-init-task-failed", + taskRun: taskRunWithCEFailed, + wantCloudEvents: tb.TaskRun("want4", "foo", tb.TaskRunStatus( + tb.TaskRunCloudEvent(cloudEventTarget1, "", 1, v1alpha1.CloudEventConditionSent), + tb.TaskRunCloudEvent(cloudEventTarget2, "", 1, v1alpha1.CloudEventConditionSent), + )).Status.CloudEvents, + }, { + name: "ce-resources-init-task-successful-one-attempt", + taskRun: taskRunWithCESuccededOneAttempt, + wantCloudEvents: tb.TaskRun("want5", "foo", tb.TaskRunStatus( + tb.TaskRunCloudEvent(cloudEventTarget1, "", 1, v1alpha1.CloudEventConditionUnknown), + tb.TaskRunCloudEvent(cloudEventTarget2, "fakemessage", 1, v1alpha1.CloudEventConditionSent), + )).Status.CloudEvents, + }} { + t.Run(tc.name, func(t *testing.T) { + names.TestingSeed() + testAssets, cancel := getTaskRunController(t, d) + defer cancel() + c := testAssets.Controller + clients := testAssets.Clients + + saName := tc.taskRun.Spec.ServiceAccount + if saName == "" { + saName = "default" + } + if _, err := clients.Kube.CoreV1().ServiceAccounts(tc.taskRun.Namespace).Create(&corev1.ServiceAccount{ + ObjectMeta: metav1.ObjectMeta{ + Name: saName, + Namespace: tc.taskRun.Namespace, + }, + }); err != nil { + t.Fatal(err) + } + + if err := c.Reconciler.Reconcile(context.Background(), getRunName(tc.taskRun)); err != nil { + t.Errorf("expected no error. Got error %v", err) + } + namespace, name, err := cache.SplitMetaNamespaceKey(tc.taskRun.Name) + if err != nil { + t.Errorf("Invalid resource key: %v", err) + } + + tr, err := clients.Pipeline.TektonV1alpha1().TaskRuns(namespace).Get(name, metav1.GetOptions{}) + if err != nil { + t.Fatalf("getting updated taskrun: %v", err) + } + opts := cloudevent.GetCloudEventDeliveryCompareOptions() + t.Log(tr.Status.CloudEvents) + if diff := cmp.Diff(tc.wantCloudEvents, tr.Status.CloudEvents, opts...); diff != "" { + t.Errorf("Unexpected status of cloud events (-want +got) = %s", diff) + } + }) + } +} diff --git a/test/builder/task.go b/test/builder/task.go index 6cc54e25b64..52973a5fa48 100644 --- a/test/builder/task.go +++ b/test/builder/task.go @@ -314,6 +314,24 @@ func TaskRunStartTime(startTime time.Time) TaskRunStatusOp { } } +// TaskRunCloudEvent adds an event to the TaskRunStatus. +func TaskRunCloudEvent(target, error string, retryCount int32, condition v1alpha1.CloudEventCondition) TaskRunStatusOp { + return func(s *v1alpha1.TaskRunStatus) { + if len(s.CloudEvents) == 0 { + s.CloudEvents = make([]v1alpha1.CloudEventDelivery, 0) + } + cloudEvent := v1alpha1.CloudEventDelivery{ + Target: target, + Status: v1alpha1.CloudEventDeliveryState{ + Condition: condition, + RetryCount: retryCount, + Error: error, + }, + } + s.CloudEvents = append(s.CloudEvents, cloudEvent) + } +} + // TaskRunTimeout sets the timeout duration to the TaskRunSpec. func TaskRunTimeout(d time.Duration) TaskRunSpecOp { return func(spec *v1alpha1.TaskRunSpec) { @@ -396,6 +414,13 @@ func TaskRunAnnotation(key, value string) TaskRunOp { } } +// TaskRunSelfLink adds a SelfLink +func TaskRunSelfLink(selflink string) TaskRunOp { + return func(tr *v1alpha1.TaskRun) { + tr.ObjectMeta.SelfLink = selflink + } +} + // TaskRunSpec sets the specified spec of the TaskRun. // Any number of TaskRunSpec modifier can be passed to transform it. func TaskRunSpec(ops ...TaskRunSpecOp) TaskRunOp {