Skip to content

Commit

Permalink
Add a cloud event helper
Browse files Browse the repository at this point in the history
Add a cloud event helper that is going to be used by the
CloudEventPipelineResource to send cloud events.
This includes an initial list of the type of events that can be
generated by Tekton.

Signed-off-by: Andrea Frittoli <andrea.frittoli@uk.ibm.com>
  • Loading branch information
afrittoli committed Jul 16, 2019
1 parent c4858b6 commit b5d8cc0
Show file tree
Hide file tree
Showing 75 changed files with 7,323 additions and 0 deletions.
33 changes: 33 additions & 0 deletions Gopkg.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

113 changes: 113 additions & 0 deletions pkg/reconciler/v1alpha1/taskrun/resources/cloudevent/cloudevent.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,113 @@
/*
Copyright 2019 The Tekton Authors
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/

package cloudevent

import (
"context"
"encoding/json"
"errors"
"fmt"
"time"

"github.com/cloudevents/sdk-go/pkg/cloudevents"
"github.com/cloudevents/sdk-go/pkg/cloudevents/client"
cecontext "github.com/cloudevents/sdk-go/pkg/cloudevents/context"
"github.com/cloudevents/sdk-go/pkg/cloudevents/types"
"github.com/knative/eventing-sources/pkg/kncloudevents"
"github.com/knative/pkg/apis"
"go.uber.org/zap"

"github.com/tektoncd/pipeline/pkg/apis/pipeline/v1alpha1"
)

// TektonEventType holds the types of cloud events sent by Tekton
type TektonEventType string

const (
// TektonTaskRunUnknown is sent for TaskRuns with "ConditionSucceeded" "Unknown"
TektonTaskRunUnknown TektonEventType = "dev.tekton.event.task.unknown"
// TektonTaskRunSuccessful is sent for TaskRuns with "ConditionSucceeded" "True"
TektonTaskRunSuccessful TektonEventType = "dev.tekton.event.task.successful"
// TektonTaskRunFailed is sent for TaskRuns with "ConditionSucceeded" "False"
TektonTaskRunFailed TektonEventType = "dev.tekton.event.task.failed"
)

// CEClient matches the `Client` interface from github.com/cloudevents/sdk-go/pkg/cloudevents
type CEClient client.Client

// SendCloudEvent sends a Cloud Event to the specified SinkURI
func SendCloudEvent(sinkURI, eventID, eventSourceURI string, data []byte, eventType TektonEventType, logger *zap.SugaredLogger, cloudEventClient CEClient) (cloudevents.Event, error) {
var event cloudevents.Event

cloudEventSource := types.ParseURLRef(eventSourceURI)
if cloudEventSource == nil {
logger.Errorf("Invalid eventSourceURI: %s", eventSourceURI)
return event, fmt.Errorf("Invalid eventSourceURI: %s", eventSourceURI)
}

event = cloudevents.Event{
Context: cloudevents.EventContextV02{
ID: eventID,
Type: string(eventType),
Source: *cloudEventSource,
Time: &types.Timestamp{Time: time.Now()},
Extensions: nil,
}.AsV02(),
Data: data,
}
ctxt := cecontext.WithTarget(context.TODO(), sinkURI)
_, err := cloudEventClient.Send(ctxt, event)
if err != nil {
logger.Errorf("Error sending the cloud-event: %s", err)
return event, err
}
return event, nil
}

// SendTaskRunCloudEvent sends a cloud event for a TaskRun
func SendTaskRunCloudEvent(sinkURI string, taskRun *v1alpha1.TaskRun, logger *zap.SugaredLogger, cloudEventClient CEClient) (cloudevents.Event, error) {
var event cloudevents.Event
var err error
// Check if a client was provided, if not build one on the fly
if cloudEventClient == nil {
cloudEventClient, err = kncloudevents.NewDefaultClient()
if err != nil {
logger.Errorf("Error creating the cloud-event client: %s", err)
return event, err
}
}
// Check if the TaskRun is defined
if taskRun == nil {
return event, errors.New("Cannot send an event for an empty TaskRun")
}
eventID := taskRun.ObjectMeta.Name
taskRunStatus := taskRun.Status.GetCondition(apis.ConditionSucceeded)
var eventType TektonEventType
if taskRunStatus.IsUnknown() {
eventType = TektonTaskRunUnknown
} else if taskRunStatus.IsFalse() {
eventType = TektonTaskRunFailed
} else if taskRunStatus.IsTrue() {
eventType = TektonTaskRunSuccessful
} else {
return event, fmt.Errorf("Unknown condition for in TaskRun.Status %s", taskRunStatus.Status)
}
eventSourceURI := taskRun.ObjectMeta.SelfLink
data, _ := json.Marshal(taskRun)
event, err = SendCloudEvent(sinkURI, eventID, eventSourceURI, data, eventType, logger, cloudEventClient)
return event, err
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,208 @@
/*
Copyright 2019 The Knative Authors
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/

package cloudevent

import (
"encoding/json"
"fmt"
"regexp"
"testing"

"github.com/google/go-cmp/cmp"
"github.com/knative/eventing-sources/pkg/kncloudevents"
"github.com/knative/pkg/apis"
duckv1beta1 "github.com/knative/pkg/apis/duck/v1beta1"
"github.com/tektoncd/pipeline/pkg/apis/pipeline/v1alpha1"
"github.com/tektoncd/pipeline/pkg/logging"
"github.com/tektoncd/pipeline/test/names"
corev1 "k8s.io/api/core/v1"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
)

const (
defaultSinkURI = "http://sink"
invalidSinkURI = "invalid_URI"
defaultEventID = "event1234"
defaultEventSourceURI = "/taskrun/1234"
invalidEventSourceURI = "htt%23p://_invalid_URI##"
defaultEventType = TektonTaskRunUnknown
taskRunName = "faketaskrunname"
invalidConditionSuccessStatus = "foobar"
)

var (
defaultRawData = []byte(`{"metadata": {"name":"faketaskrun"}}`)
nilEventType TektonEventType
defaultCloudEventClient, _ = kncloudevents.NewDefaultClient()
happyClientBehaviour = FakeClientBehaviour{SendSuccessfully: true}
failingClientBehaviour = FakeClientBehaviour{SendSuccessfully: false}
)

func TestSendCloudEvent(t *testing.T) {
for _, c := range []struct {
desc string
sinkURI string
eventSourceURI string
cloudEventClient CEClient
wantErr bool
errRegexp string
}{{
desc: "send a cloud event when all inputs are valid",
sinkURI: defaultSinkURI,
eventSourceURI: defaultEventSourceURI,
cloudEventClient: NewFakeClient(&happyClientBehaviour),
wantErr: false,
errRegexp: "",
}, {
desc: "send a cloud event with invalid sink URI",
sinkURI: invalidSinkURI,
eventSourceURI: defaultEventSourceURI,
cloudEventClient: defaultCloudEventClient,
wantErr: true,
errRegexp: fmt.Sprintf("%s: unsupported protocol scheme", invalidSinkURI),
}, {
desc: "send a cloud event, fail to send",
sinkURI: defaultSinkURI,
eventSourceURI: defaultEventSourceURI,
cloudEventClient: NewFakeClient(&failingClientBehaviour),
wantErr: true,
errRegexp: fmt.Sprintf("%s had to fail", defaultEventID),
}, {
desc: "send a cloud event with invalid source URI",
sinkURI: defaultSinkURI,
eventSourceURI: invalidEventSourceURI,
cloudEventClient: defaultCloudEventClient,
wantErr: true,
errRegexp: fmt.Sprintf("Invalid eventSourceURI: %s", invalidEventSourceURI),
}} {
t.Run(c.desc, func(t *testing.T) {
logger, _ := logging.NewLogger("", "")
names.TestingSeed()
_, err := SendCloudEvent(c.sinkURI, defaultEventID, c.eventSourceURI, defaultRawData, defaultEventType, logger, c.cloudEventClient)
if c.wantErr != (err != nil) {
if c.wantErr {
t.Fatalf("I expected an error but I got nil")
} else {
t.Fatalf("I did not expect an error but I got %s", err)
}
} else {
if c.wantErr {
match, _ := regexp.Match(c.errRegexp, []byte(err.Error()))
if !match {
t.Fatalf("I expected an error like %s, but I got %s", c.errRegexp, err)
}
}
}
})
}
}

func getTaskRunByCondition(status corev1.ConditionStatus) *v1alpha1.TaskRun {
return &v1alpha1.TaskRun{
ObjectMeta: metav1.ObjectMeta{
Name: taskRunName,
Namespace: "marshmallow",
SelfLink: defaultEventSourceURI,
},
Spec: v1alpha1.TaskRunSpec{},
Status: v1alpha1.TaskRunStatus{
Status: duckv1beta1.Status{
Conditions: []apis.Condition{{
Type: apis.ConditionSucceeded,
Status: status,
}},
},
},
}
}

func TestSendTaskRunCloudEvent(t *testing.T) {
for _, c := range []struct {
desc string
taskRun *v1alpha1.TaskRun
wantEventType TektonEventType
wantErr bool
errRegexp string
}{{
desc: "send a cloud event with a nil taskrun",
taskRun: nil,
wantEventType: nilEventType,
wantErr: true,
errRegexp: "Cannot send an event for an empty TaskRun",
}, {
desc: "send a cloud event with unknown status taskrun",
taskRun: getTaskRunByCondition(corev1.ConditionUnknown),
wantEventType: TektonTaskRunUnknown,
wantErr: false,
errRegexp: "",
}, {
desc: "send a cloud event with successful status taskrun",
taskRun: getTaskRunByCondition(corev1.ConditionTrue),
wantEventType: TektonTaskRunSuccessful,
wantErr: false,
errRegexp: "",
}, {
desc: "send a cloud event with unknown status taskrun",
taskRun: getTaskRunByCondition(corev1.ConditionFalse),
wantEventType: TektonTaskRunFailed,
wantErr: false,
errRegexp: "",
}, {
desc: "send a cloud event with invalid status taskrun",
taskRun: getTaskRunByCondition(invalidConditionSuccessStatus),
wantEventType: nilEventType,
wantErr: true,
errRegexp: fmt.Sprintf("Unknown condition for in TaskRun.Status %s", invalidConditionSuccessStatus),
}} {
t.Run(c.desc, func(t *testing.T) {
logger, _ := logging.NewLogger("", "")
names.TestingSeed()
event, err := SendTaskRunCloudEvent(defaultSinkURI, c.taskRun, logger, NewFakeClient(&happyClientBehaviour))
if c.wantErr != (err != nil) {
if c.wantErr {
t.Fatalf("I expected an error but I got nil")
} else {
t.Fatalf("I did not expect an error but I got %s", err)
}
} else {
if c.wantErr {
match, _ := regexp.Match(c.errRegexp, []byte(err.Error()))
if !match {
t.Fatalf("I expected an error like %s, but I got %s", c.errRegexp, err)
}
} else {
wantEventID := taskRunName
if diff := cmp.Diff(wantEventID, event.Context.GetID()); diff != "" {
t.Errorf("Wrong Event ID (-want +got) = %s", diff)
}
gotEventType := event.Context.GetType()
if diff := cmp.Diff(string(c.wantEventType), gotEventType); diff != "" {
t.Errorf("Wrong Event Type (-want +got) = %s", diff)
}
wantData, _ := json.Marshal(c.taskRun)
gotData, err := event.DataBytes()
if err != nil {
t.Fatalf("Could not get data from event %v: %v", event, err)
}
if diff := cmp.Diff(wantData, gotData); diff != "" {
t.Errorf("Wrong Event data (-want +got) = %s", diff)
}
}
}
})
}
}
Loading

0 comments on commit b5d8cc0

Please sign in to comment.