Skip to content

Commit

Permalink
Cloud Event pipeline resource
Browse files Browse the repository at this point in the history
Implements in the TaskRun controller the logic to provide
the cloud event pipeline resource.

This commits puts together the API, cloud event helper and
resource definition from four pull requests:
- #1090
- #1091
- #1092

It adds unit tests for the new code and one E2E YAML test.
The YAML test runs a simple http server that can receive the
cloudevent for test purposes.

The list of cloud events to be sent is added to the TaskRun
status and processed by the TaskRun controller once the pod
associated to the TaskRun completes its execution.
The `isDone` definition of the TaskRun is not altered, the
reconciler checks for events to be sent once the
TaskRun.isDone is true.

Retries are not implemented yet in the sense that every
scheduled event will be attempted exactly once, but it may
be that those attempts happen across different invocations
of Reconcile.

Signed-off-by: Andrea Frittoli <andrea.frittoli@uk.ibm.com>
  • Loading branch information
afrittoli authored and tekton-robot committed Aug 15, 2019
1 parent 9e964ec commit 53c936b
Show file tree
Hide file tree
Showing 11 changed files with 879 additions and 75 deletions.
63 changes: 63 additions & 0 deletions docs/resources.md
Original file line number Diff line number Diff line change
Expand Up @@ -52,6 +52,7 @@ The following `PipelineResources` are currently supported:
- [Storage Resource](#storage-resource)
- [GCS Storage Resource](#gcs-storage-resource)
- [BuildGCS Storage Resource](#buildgcs-storage-resource)
- [Cloud Event Resource](#cloud-event-resource)

### Git Resource

Expand Down Expand Up @@ -609,6 +610,68 @@ the container image
[gcr.io/cloud-builders//gcs-fetcher](https://github.com/GoogleCloudPlatform/cloud-builders/tree/master/gcs-fetcher)
does not support configuring secrets.

### Cloud Event Resource

The Cloud Event Resource represents a [cloud event](https://github.com/cloudevents/spec)
that is sent to a target `URI` upon completion of a `TaskRun`.
The Cloud Event Resource sends Tekton specific events; the body of the event includes
the entire TaskRun spec plus status; the types of events defined for now are:

- dev.tekton.event.task.unknown
- dev.tekton.event.task.successful
- dev.tekton.event.task.failed

Cloud event resources are useful to notify a third party upon the completion and
status of a `TaskRun`. In combinations with the [Tekton triggers](https://github.com/tektoncd/triggers)
project they can be used to link `Task/PipelineRuns` asynchronously.

To create a CloudEvent resource using the `PipelineResource` CRD:

```yaml
apiVersion: tekton.dev/v1alpha1
kind: PipelineResource
metadata:
name: event-to-sink
spec:
type: cloudevent
params:
- name: targetURI
value: http://sink:8080
```

The content of an event is for example:

```yaml
Context Attributes,
SpecVersion: 0.2
Type: dev.tekton.event.task.successful
Source: /apis/tekton.dev/v1alpha1/namespaces/default/taskruns/pipeline-run-api-16aa55-source-to-image-task-rpndl
ID: pipeline-run-api-16aa55-source-to-image-task-rpndl
Time: 2019-07-04T11:03:53.058694712Z
ContentType: application/json
Transport Context,
URI: /
Host: my-sink.default.my-cluster.containers.appdomain.cloud
Method: POST
Data,
{
"taskRun": {
"metadata": {...}
"spec": {
"inputs": {...}
"outputs": {...}
"serviceAccount": "default",
"taskRef": {
"name": "source-to-image",
"kind": "Task"
},
"timeout": "1h0m0s"
},
"status": {...}
}
}
```

Except as otherwise noted, the content of this page is licensed under the
[Creative Commons Attribution 4.0 License](https://creativecommons.org/licenses/by/4.0/),
and code samples are licensed under the
Expand Down
187 changes: 187 additions & 0 deletions examples/taskruns/taskrun-cloud-event.yaml
Original file line number Diff line number Diff line change
@@ -0,0 +1,187 @@
---
apiVersion: v1
kind: Service
metadata:
name: sink
namespace: default
spec:
selector:
app: cloudevent
ports:
- protocol: TCP
port: 8080
targetPort: 8080
---
apiVersion: v1
kind: Pod
metadata:
labels:
app: cloudevent
name: message-sink
namespace: default
spec:
containers:
- env:
- name: PORT
value: "8080"
name: cloudeventlistener
image: python:3-alpine
imagePullPolicy: IfNotPresent
command: ["/bin/sh"]
args:
- -ce
- |
cat <<EOF | python
from http.server import BaseHTTPRequestHandler, HTTPServer
class GetAndPostHandler(BaseHTTPRequestHandler):
def do_POST(self):
content = self.rfile.read(int(self.headers.get('Content-Length')))
with open("content.txt", mode="wb") as f:
f.write(content)
self.send_response(200)
self.send_header('Content-type', 'text/html')
self.end_headers()
self.wfile.write(b'<html><body><h1>POST!</h1></body></html>')
def do_GET(self):
with open("content.txt", mode="rb") as f:
content = f.read()
self.send_response(200 if content else 404)
self.send_header('Content-type', 'text/plain')
self.end_headers()
self.wfile.write(content)
if __name__ == "__main__":
open("content.txt", 'a').close()
httpd = HTTPServer(('', $PORT), GetAndPostHandler)
print('Starting httpd...')
httpd.serve_forever()
EOF
ports:
- containerPort: 8080
name: user-port
protocol: TCP
---
apiVersion: tekton.dev/v1alpha1
kind: PipelineResource
metadata:
name: to-message-sink
spec:
type: cloudEvent
params:
- name: targetURI
value: http://sink.default:8080
---
apiVersion: tekton.dev/v1alpha1
kind: PipelineResource
metadata:
name: fake-image
spec:
type: image
params:
- name: url
value: fake-registry/test/fake-image
---
apiVersion: tekton.dev/v1alpha1
kind: Task
metadata:
name: send-cloud-event-task
spec:
outputs:
resources:
- name: myimage
type: image
- name: notification
type: cloudEvent
steps:
- name: build-index-json
image: busybox
command:
- /bin/sh
args:
- -ce
- |
set -e
mkdir -p /builder/home/image-outputs/myimage/
cat <<EOF > /builder/home/image-outputs/myimage/index.json
{
"schemaVersion": 2,
"manifests": [
{
"mediaType": "application/vnd.oci.image.index.v1+json",
"size": 314,
"digest": "sha256:NOTAREALDIGEST"
}
]
}
EOF
---
apiVersion: tekton.dev/v1alpha1
kind: Task
metadata:
name: poll-for-content-task
spec:
steps:
- name: polling
image: python:3-alpine
imagePullPolicy: IfNotPresent
command: ["/bin/sh"]
args:
- -ce
- |
cat <<EOF | python
import http.client
import json
import sys
import time
while True:
conn = http.client.HTTPConnection("sink.default:8080")
try:
conn.request("GET", "/")
except:
# Perhaps the service is not setup yet, so service name does not
# resolve or it does not accept connections on 8080 yet
print("Not yet...")
time.sleep(10)
continue
response = conn.getresponse()
if response.status == 200:
print("Got it!")
taskrun = json.loads(response.read().decode('utf-8'))
digest = taskrun['taskRun']['status']['resourcesResult'][0]['digest']
image_name = taskrun['taskRun']['status']['resourcesResult'][0]['name']
print("Got digest %s for image %s" % (digest, image_name))
if image_name == "fake-image" and digest:
break
else:
sys.exit(1)
else:
print("Not yet...")
time.sleep(10)
---
apiVersion: tekton.dev/v1alpha1
kind: TaskRun
metadata:
name: send-cloud-event
spec:
outputs:
resources:
- name: myimage
resourceRef:
name: fake-image
- name: notification
resourceRef:
name: to-message-sink
taskRef:
name: send-cloud-event-task
---
apiVersion: tekton.dev/v1alpha1
kind: TaskRun
metadata:
name: poll-for-content-run
spec:
taskRef:
name: poll-for-content-task
20 changes: 0 additions & 20 deletions pkg/apis/pipeline/v1alpha1/taskrun_types.go
Original file line number Diff line number Diff line change
Expand Up @@ -171,26 +171,6 @@ func (tr *TaskRunStatus) SetCondition(newCond *apis.Condition) {
}
}

// InitializeCloudEvents initializes the CloudEvents part of the TaskRunStatus
// from a list of event targets
func (tr *TaskRunStatus) InitializeCloudEvents(targets []string) {
// len(nil slice) is 0
if len(targets) > 0 {
initialState := CloudEventDeliveryState{
Condition: CloudEventConditionUnknown,
RetryCount: 0,
}
events := make([]CloudEventDelivery, len(targets))
for idx, target := range targets {
events[idx] = CloudEventDelivery{
Target: target,
Status: initialState,
}
}
tr.CloudEvents = events
}
}

// StepState reports the results of running a step in the Task.
type StepState struct {
corev1.ContainerState
Expand Down
50 changes: 0 additions & 50 deletions pkg/apis/pipeline/v1alpha1/taskrun_types_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -151,53 +151,3 @@ func TestTaskRunHasStarted(t *testing.T) {
})
}
}

func TestInitializeCloudEvents(t *testing.T) {
tests := []struct {
name string
targets []string
wantCloudEvents []v1alpha1.CloudEventDelivery
}{{
name: "testWithNilTarget",
targets: nil,
wantCloudEvents: nil,
}, {
name: "testWithEmptyListTarget",
targets: make([]string, 0),
wantCloudEvents: nil,
}, {
name: "testWithTwoTargets",
targets: []string{"target1", "target2"},
wantCloudEvents: []v1alpha1.CloudEventDelivery{
{
Target: "target1",
Status: v1alpha1.CloudEventDeliveryState{
Condition: v1alpha1.CloudEventConditionUnknown,
SentAt: nil,
Error: "",
RetryCount: 0,
},
},
{
Target: "target2",
Status: v1alpha1.CloudEventDeliveryState{
Condition: v1alpha1.CloudEventConditionUnknown,
SentAt: nil,
Error: "",
RetryCount: 0,
},
},
},
}}
for _, tc := range tests {
t.Run(tc.name, func(t *testing.T) {
tr := tb.TaskRun("taskrunname", "testns", tb.TaskRunStatus())
trs := tr.Status
trs.InitializeCloudEvents(tc.targets)
gotCloudEvents := trs.CloudEvents
if diff := cmp.Diff(tc.wantCloudEvents, gotCloudEvents); diff != "" {
t.Errorf("Wrong Cloud Events (-want +got) = %s", diff)
}
})
}
}
2 changes: 2 additions & 0 deletions pkg/reconciler/v1alpha1/taskrun/controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,7 @@ import (
taskruninformer "github.com/tektoncd/pipeline/pkg/client/injection/informers/pipeline/v1alpha1/taskrun"
"github.com/tektoncd/pipeline/pkg/reconciler"
"github.com/tektoncd/pipeline/pkg/reconciler/v1alpha1/taskrun/entrypoint"
cloudeventclient "github.com/tektoncd/pipeline/pkg/reconciler/v1alpha1/taskrun/resources/cloudevent"
"k8s.io/client-go/tools/cache"
"knative.dev/pkg/configmap"
"knative.dev/pkg/controller"
Expand Down Expand Up @@ -69,6 +70,7 @@ func NewController(
clusterTaskLister: clusterTaskInformer.Lister(),
resourceLister: resourceInformer.Lister(),
timeoutHandler: timeoutHandler,
cloudEventClient: cloudeventclient.Get(ctx),
}
impl := controller.NewImpl(c, c.Logger, taskRunControllerName)

Expand Down
Loading

0 comments on commit 53c936b

Please sign in to comment.