Skip to content

Commit

Permalink
Populate the *Run.Status.Provenance.ConfigSource field
Browse files Browse the repository at this point in the history
Prior, remote `ResolutionRequest` CRD supports **recording** the source information
in its Status that identifies where the remote resource came from.
Similarly, `TaskRun/PipelineRun` CRD supports **receiving** the source information
via a field in its status from remote ResolutionRequest. Specifically,
this field named `ConfigSource` is a subfield of the `Provenance` field in status.

In this PR, we are trying to pass the data from `ResolutionRequest` to pipeline
reconciler so that the data can be captured in TaskRun/PipelineRun's `Status.Provenance.ConfigSource`.
The implication of this change is that many functions' interface has been
changed because we are passing this extra source data alongside the remote resoure.

Note:
- The `provenance` field in Run.status is behind a feature flag named `enable-provenance-in-status`
, which was introduced in tektoncd#5670. The field will be populated iff
the flag is set to `true`.
- If a pipeline yaml is from remote place A, and the individual tasks
are from other remote places, pipelinerun status will only record the source
for the pipeline, and the individual taskrun status will record the
source for the corresponding task.

Related PRs:
- tektoncd#5580
- tektoncd#5551
- tektoncd#5670

Signed-off-by: Chuang Wang <chuangw@google.com>
  • Loading branch information
chuangw6 committed Oct 24, 2022
1 parent a132c7a commit 70b071e
Show file tree
Hide file tree
Showing 20 changed files with 403 additions and 152 deletions.
15 changes: 13 additions & 2 deletions pkg/reconciler/pipelinerun/pipelinerun.go
Original file line number Diff line number Diff line change
Expand Up @@ -509,7 +509,7 @@ func (c *Reconciler) reconcile(ctx context.Context, pr *v1beta1.PipelineRun, get
if len(pipelineSpec.Finally) > 0 {
tasks = append(tasks, pipelineSpec.Finally...)
}
pipelineRunState, err := c.resolvePipelineState(ctx, tasks, pipelineMeta, pr, providedResources)
pipelineRunState, err := c.resolvePipelineState(ctx, tasks, pipelineMeta.ObjectMeta, pr, providedResources)
switch {
case errors.Is(err, remote.ErrorRequestInProgress):
message := fmt.Sprintf("PipelineRun %s/%s awaiting remote resource", pr.Namespace, pr.Name)
Expand Down Expand Up @@ -1253,10 +1253,21 @@ func (c *Reconciler) updateLabelsAndAnnotations(ctx context.Context, pr *v1beta1
return newPr, nil
}

func storePipelineSpecAndMergeMeta(pr *v1beta1.PipelineRun, ps *v1beta1.PipelineSpec, meta *metav1.ObjectMeta) error {
func storePipelineSpecAndMergeMeta(pr *v1beta1.PipelineRun, ps *v1beta1.PipelineSpec, meta *tresources.ResolvedObjectMeta) error {
// Only store the PipelineSpec once, if it has never been set before.
if pr.Status.PipelineSpec == nil {
pr.Status.PipelineSpec = ps
if meta == nil {
return nil
}

// Propogate ConfigSource from remote resolution to PipelineRun Status
if meta.ConfigSource != nil {
if pr.Status.Provenance == nil {
pr.Status.Provenance = &v1beta1.Provenance{}
}
pr.Status.Provenance.ConfigSource = meta.ConfigSource
}

// Propagate labels from Pipeline to PipelineRun. PipelineRun labels take precedences over Pipeline.
pr.ObjectMeta.Labels = kmap.Union(meta.Labels, pr.ObjectMeta.Labels)
Expand Down
29 changes: 25 additions & 4 deletions pkg/reconciler/pipelinerun/pipelinerun_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -39,6 +39,7 @@ import (
resourcev1alpha1 "github.com/tektoncd/pipeline/pkg/apis/resource/v1alpha1"
"github.com/tektoncd/pipeline/pkg/reconciler/events/cloudevent"
"github.com/tektoncd/pipeline/pkg/reconciler/pipelinerun/resources"
tresources "github.com/tektoncd/pipeline/pkg/reconciler/taskrun/resources"
ttesting "github.com/tektoncd/pipeline/pkg/reconciler/testing"
"github.com/tektoncd/pipeline/pkg/reconciler/volumeclaim"
resolutioncommon "github.com/tektoncd/pipeline/pkg/resolution/common"
Expand Down Expand Up @@ -5535,7 +5536,7 @@ status:
}
}

func Test_storePipelineSpec(t *testing.T) {
func Test_storePipelineSpecAndConfigSource(t *testing.T) {
pr := parse.MustParsePipelineRun(t, `
metadata:
name: test-pipeline-run-success
Expand All @@ -5552,20 +5553,38 @@ metadata:
want.Status = v1beta1.PipelineRunStatus{
PipelineRunStatusFields: v1beta1.PipelineRunStatusFields{
PipelineSpec: ps.DeepCopy(),
Provenance: &v1beta1.Provenance{
ConfigSource: &v1beta1.ConfigSource{
URI: "abc.com",
Digest: map[string]string{
"sha1": "a123",
},
EntryPoint: "foo/bar",
},
},
},
}
want.ObjectMeta.Labels["tekton.dev/pipeline"] = pr.ObjectMeta.Name

// The first time we set it, it should get copied.
if err := storePipelineSpecAndMergeMeta(pr, &ps, &pr.ObjectMeta); err != nil {
if err := storePipelineSpecAndMergeMeta(pr, &ps, &tresources.ResolvedObjectMeta{
ObjectMeta: &pr.ObjectMeta,
ConfigSource: &v1beta1.ConfigSource{
URI: "abc.com",
Digest: map[string]string{
"sha1": "a123",
},
EntryPoint: "foo/bar",
},
}); err != nil {
t.Errorf("storePipelineSpec() error = %v", err)
}
if d := cmp.Diff(pr, want); d != "" {
t.Fatalf(diff.PrintWantGot(d))
}

// The next time, it should not get overwritten
if err := storePipelineSpecAndMergeMeta(pr, &ps1, &metav1.ObjectMeta{}); err != nil {
if err := storePipelineSpecAndMergeMeta(pr, &ps1, &tresources.ResolvedObjectMeta{}); err != nil {
t.Errorf("storePipelineSpec() error = %v", err)
}
if d := cmp.Diff(pr, want); d != "" {
Expand All @@ -5585,7 +5604,9 @@ func Test_storePipelineSpec_metadata(t *testing.T) {
ObjectMeta: metav1.ObjectMeta{Name: "foo", Labels: pipelinerunlabels, Annotations: pipelinerunannotations},
}
meta := metav1.ObjectMeta{Name: "bar", Labels: pipelinelabels, Annotations: pipelineannotations}
if err := storePipelineSpecAndMergeMeta(pr, &v1beta1.PipelineSpec{}, &meta); err != nil {
if err := storePipelineSpecAndMergeMeta(pr, &v1beta1.PipelineSpec{}, &tresources.ResolvedObjectMeta{
ObjectMeta: &meta,
}); err != nil {
t.Errorf("storePipelineSpecAndMergeMeta error = %v", err)
}
if d := cmp.Diff(pr.ObjectMeta.Labels, wantedlabels); d != "" {
Expand Down
23 changes: 16 additions & 7 deletions pkg/reconciler/pipelinerun/pipelinespec/pipelinespec.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,32 +22,37 @@ import (
"fmt"

"github.com/tektoncd/pipeline/pkg/apis/pipeline/v1beta1"
tresources "github.com/tektoncd/pipeline/pkg/reconciler/taskrun/resources"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
)

// GetPipeline is a function used to retrieve Pipelines.
type GetPipeline func(context.Context, string) (v1beta1.PipelineObject, error)
type GetPipeline func(context.Context, string) (v1beta1.PipelineObject, *v1beta1.ConfigSource, error)

// GetPipelineData will retrieve the Pipeline metadata and Spec associated with the
// provided PipelineRun. This can come from a reference Pipeline or from the PipelineRun's
// metadata and embedded PipelineSpec.
func GetPipelineData(ctx context.Context, pipelineRun *v1beta1.PipelineRun, getPipeline GetPipeline) (*metav1.ObjectMeta, *v1beta1.PipelineSpec, error) {
func GetPipelineData(ctx context.Context, pipelineRun *v1beta1.PipelineRun, getPipeline GetPipeline) (*tresources.ResolvedObjectMeta, *v1beta1.PipelineSpec, error) {
pipelineMeta := metav1.ObjectMeta{}
var configSource *v1beta1.ConfigSource
pipelineSpec := v1beta1.PipelineSpec{}
switch {
case pipelineRun.Spec.PipelineRef != nil && pipelineRun.Spec.PipelineRef.Name != "":
// Get related pipeline for pipelinerun
t, err := getPipeline(ctx, pipelineRun.Spec.PipelineRef.Name)
p, source, err := getPipeline(ctx, pipelineRun.Spec.PipelineRef.Name)
if err != nil {
return nil, nil, fmt.Errorf("error when listing pipelines for pipelineRun %s: %w", pipelineRun.Name, err)
}
pipelineMeta = t.PipelineMetadata()
pipelineSpec = t.PipelineSpec()
pipelineMeta = p.PipelineMetadata()
pipelineSpec = p.PipelineSpec()
configSource = source
case pipelineRun.Spec.PipelineSpec != nil:
pipelineMeta = pipelineRun.ObjectMeta
pipelineSpec = *pipelineRun.Spec.PipelineSpec
// TODO (@chuangw6): if we want to set source for embedded pipeline, set it here.
// https://github.com/tektoncd/pipeline/issues/5522
case pipelineRun.Spec.PipelineRef != nil && pipelineRun.Spec.PipelineRef.Resolver != "":
pipeline, err := getPipeline(ctx, "")
pipeline, source, err := getPipeline(ctx, "")
switch {
case err != nil:
return nil, nil, err
Expand All @@ -57,10 +62,14 @@ func GetPipelineData(ctx context.Context, pipelineRun *v1beta1.PipelineRun, getP
pipelineMeta = pipeline.PipelineMetadata()
pipelineSpec = pipeline.PipelineSpec()
}
configSource = source
default:
return nil, nil, fmt.Errorf("pipelineRun %s not providing PipelineRef or PipelineSpec", pipelineRun.Name)
}

pipelineSpec.SetDefaults(ctx)
return &pipelineMeta, &pipelineSpec, nil
return &tresources.ResolvedObjectMeta{
ObjectMeta: &pipelineMeta,
ConfigSource: configSource,
}, &pipelineSpec, nil
}
60 changes: 41 additions & 19 deletions pkg/reconciler/pipelinerun/pipelinespec/pipelinespec_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -51,20 +51,26 @@ func TestGetPipelineSpec_Ref(t *testing.T) {
},
},
}
gt := func(ctx context.Context, n string) (v1beta1.PipelineObject, error) { return pipeline, nil }
pipelineMeta, pipelineSpec, err := GetPipelineData(context.Background(), pr, gt)
gt := func(ctx context.Context, n string) (v1beta1.PipelineObject, *v1beta1.ConfigSource, error) {
return pipeline, nil, nil
}
resolvedObjectMeta, pipelineSpec, err := GetPipelineData(context.Background(), pr, gt)

if err != nil {
t.Fatalf("Did not expect error getting pipeline spec but got: %s", err)
}

if pipelineMeta.Name != "orchestrate" {
t.Errorf("Expected pipeline name to be `orchestrate` but was %q", pipelineMeta.Name)
if resolvedObjectMeta.Name != "orchestrate" {
t.Errorf("Expected pipeline name to be `orchestrate` but was %q", resolvedObjectMeta.Name)
}

if len(pipelineSpec.Tasks) != 1 || pipelineSpec.Tasks[0].Name != "mytask" {
t.Errorf("Pipeline Spec not resolved as expected, expected referenced Pipeline spec but got: %v", pipelineSpec)
}

if resolvedObjectMeta.ConfigSource != nil {
t.Errorf("Expected resolved configsource is nil, but got %v", resolvedObjectMeta.ConfigSource)
}
}

func TestGetPipelineSpec_Embedded(t *testing.T) {
Expand All @@ -83,22 +89,26 @@ func TestGetPipelineSpec_Embedded(t *testing.T) {
},
},
}
gt := func(ctx context.Context, n string) (v1beta1.PipelineObject, error) {
return nil, errors.New("shouldn't be called")
gt := func(ctx context.Context, n string) (v1beta1.PipelineObject, *v1beta1.ConfigSource, error) {
return nil, nil, errors.New("shouldn't be called")
}
pipelineMeta, pipelineSpec, err := GetPipelineData(context.Background(), pr, gt)
resolvedObjectMeta, pipelineSpec, err := GetPipelineData(context.Background(), pr, gt)

if err != nil {
t.Fatalf("Did not expect error getting pipeline spec but got: %s", err)
}

if pipelineMeta.Name != "mypipelinerun" {
t.Errorf("Expected pipeline name for embedded pipeline to default to name of pipeline run but was %q", pipelineMeta.Name)
if resolvedObjectMeta.Name != "mypipelinerun" {
t.Errorf("Expected pipeline name for embedded pipeline to default to name of pipeline run but was %q", resolvedObjectMeta.Name)
}

if len(pipelineSpec.Tasks) != 1 || pipelineSpec.Tasks[0].Name != "mytask" {
t.Errorf("Pipeline Spec not resolved as expected, expected embedded Pipeline spec but got: %v", pipelineSpec)
}

if resolvedObjectMeta.ConfigSource != nil {
t.Errorf("Expected resolved configsource is nil, but got %v", resolvedObjectMeta.ConfigSource)
}
}

func TestGetPipelineSpec_Invalid(t *testing.T) {
Expand All @@ -107,8 +117,8 @@ func TestGetPipelineSpec_Invalid(t *testing.T) {
Name: "mypipelinerun",
},
}
gt := func(ctx context.Context, n string) (v1beta1.PipelineObject, error) {
return nil, errors.New("shouldn't be called")
gt := func(ctx context.Context, n string) (v1beta1.PipelineObject, *v1beta1.ConfigSource, error) {
return nil, nil, errors.New("shouldn't be called")
}
_, _, err := GetPipelineData(context.Background(), tr, gt)
if err == nil {
Expand Down Expand Up @@ -148,11 +158,19 @@ func TestGetPipelineData_ResolutionSuccess(t *testing.T) {
},
}},
}
getPipeline := func(ctx context.Context, n string) (v1beta1.PipelineObject, error) {
expectedConfigSource := &v1beta1.ConfigSource{
URI: "abc.com",
Digest: map[string]string{
"sha1": "a123",
},
EntryPoint: "foo/bar",
}

getPipeline := func(ctx context.Context, n string) (v1beta1.PipelineObject, *v1beta1.ConfigSource, error) {
return &v1beta1.Pipeline{
ObjectMeta: *sourceMeta.DeepCopy(),
Spec: *sourceSpec.DeepCopy(),
}, nil
}, expectedConfigSource.DeepCopy(), nil
}
ctx := context.Background()
resolvedMeta, resolvedSpec, err := GetPipelineData(ctx, pr, getPipeline)
Expand All @@ -165,6 +183,10 @@ func TestGetPipelineData_ResolutionSuccess(t *testing.T) {
if d := cmp.Diff(sourceSpec, *resolvedSpec); d != "" {
t.Errorf(diff.PrintWantGot(d))
}

if d := cmp.Diff(expectedConfigSource, resolvedMeta.ConfigSource); d != "" {
t.Errorf("configsource did not match: %s", diff.PrintWantGot(d))
}
}

func TestGetPipelineSpec_Error(t *testing.T) {
Expand All @@ -178,8 +200,8 @@ func TestGetPipelineSpec_Error(t *testing.T) {
},
},
}
gt := func(ctx context.Context, n string) (v1beta1.PipelineObject, error) {
return nil, errors.New("something went wrong")
gt := func(ctx context.Context, n string) (v1beta1.PipelineObject, *v1beta1.ConfigSource, error) {
return nil, nil, errors.New("something went wrong")
}
_, _, err := GetPipelineData(context.Background(), tr, gt)
if err == nil {
Expand All @@ -200,8 +222,8 @@ func TestGetPipelineData_ResolutionError(t *testing.T) {
},
},
}
getPipeline := func(ctx context.Context, n string) (v1beta1.PipelineObject, error) {
return nil, errors.New("something went wrong")
getPipeline := func(ctx context.Context, n string) (v1beta1.PipelineObject, *v1beta1.ConfigSource, error) {
return nil, nil, errors.New("something went wrong")
}
ctx := context.Background()
_, _, err := GetPipelineData(ctx, pr, getPipeline)
Expand All @@ -223,8 +245,8 @@ func TestGetPipelineData_ResolvedNilPipeline(t *testing.T) {
},
},
}
getPipeline := func(ctx context.Context, n string) (v1beta1.PipelineObject, error) {
return nil, nil
getPipeline := func(ctx context.Context, n string) (v1beta1.PipelineObject, *v1beta1.ConfigSource, error) {
return nil, nil, nil
}
ctx := context.Background()
_, _, err := GetPipelineData(ctx, pr, getPipeline)
Expand Down
38 changes: 24 additions & 14 deletions pkg/reconciler/pipelinerun/resources/pipelineref.go
Original file line number Diff line number Diff line change
Expand Up @@ -42,36 +42,42 @@ func GetPipelineFunc(ctx context.Context, k8s kubernetes.Interface, tekton clien
cfg := config.FromContextOrDefaults(ctx)
pr := pipelineRun.Spec.PipelineRef
namespace := pipelineRun.Namespace
// if the spec is already in the status, do not try to fetch it again, just use it as source of truth
// if the spec is already in the status, do not try to fetch it again, just use it as source of truth.
// Same for the Source field in the Status.Provenance.
if pipelineRun.Status.PipelineSpec != nil {
return func(_ context.Context, name string) (v1beta1.PipelineObject, error) {
return func(_ context.Context, name string) (v1beta1.PipelineObject, *v1beta1.ConfigSource, error) {
var configSource *v1beta1.ConfigSource
if pipelineRun.Status.Provenance != nil {
configSource = pipelineRun.Status.Provenance.ConfigSource
}
return &v1beta1.Pipeline{
ObjectMeta: metav1.ObjectMeta{
Name: name,
Namespace: namespace,
},
Spec: *pipelineRun.Status.PipelineSpec,
}, nil
}, configSource, nil
}, nil
}

switch {
case cfg.FeatureFlags.EnableTektonOCIBundles && pr != nil && pr.Bundle != "":
// Return an inline function that implements GetTask by calling Resolver.Get with the specified task type and
// casting it to a PipelineObject.
return func(ctx context.Context, name string) (v1beta1.PipelineObject, error) {
return func(ctx context.Context, name string) (v1beta1.PipelineObject, *v1beta1.ConfigSource, error) {
// If there is a bundle url at all, construct an OCI resolver to fetch the pipeline.
kc, err := k8schain.New(ctx, k8s, k8schain.Options{
Namespace: namespace,
ServiceAccountName: pipelineRun.Spec.ServiceAccountName,
})
if err != nil {
return nil, fmt.Errorf("failed to get keychain: %w", err)
return nil, nil, fmt.Errorf("failed to get keychain: %w", err)
}
resolver := oci.NewResolver(pr.Bundle, kc)
return resolvePipeline(ctx, resolver, name)
}, nil
case pr != nil && pr.Resolver != "" && requester != nil:
return func(ctx context.Context, name string) (v1beta1.PipelineObject, error) {
return func(ctx context.Context, name string) (v1beta1.PipelineObject, *v1beta1.ConfigSource, error) {
stringReplacements, arrayReplacements, objectReplacements := paramsFromPipelineRun(ctx, pipelineRun)
for k, v := range getContextReplacements("", pipelineRun) {
stringReplacements[k] = v
Expand All @@ -98,28 +104,32 @@ type LocalPipelineRefResolver struct {

// GetPipeline will resolve a Pipeline from the local cluster using a versioned Tekton client. It will
// return an error if it can't find an appropriate Pipeline for any reason.
func (l *LocalPipelineRefResolver) GetPipeline(ctx context.Context, name string) (v1beta1.PipelineObject, error) {
// TODO (@chuangw6): if we want to set source for in-cluster pipeline, set it here.
// https://github.com/tektoncd/pipeline/issues/5522
func (l *LocalPipelineRefResolver) GetPipeline(ctx context.Context, name string) (v1beta1.PipelineObject, *v1beta1.ConfigSource, error) {
// If we are going to resolve this reference locally, we need a namespace scope.
if l.Namespace == "" {
return nil, fmt.Errorf("Must specify namespace to resolve reference to pipeline %s", name)
return nil, nil, fmt.Errorf("Must specify namespace to resolve reference to pipeline %s", name)
}
return l.Tektonclient.TektonV1beta1().Pipelines(l.Namespace).Get(ctx, name, metav1.GetOptions{})
pipeline, err := l.Tektonclient.TektonV1beta1().Pipelines(l.Namespace).Get(ctx, name, metav1.GetOptions{})
return pipeline, nil, err
}

// resolvePipeline accepts an impl of remote.Resolver and attempts to
// fetch a pipeline with given name. An error is returned if the
// resolution doesn't work or the returned data isn't a valid
// v1beta1.PipelineObject.
func resolvePipeline(ctx context.Context, resolver remote.Resolver, name string) (v1beta1.PipelineObject, error) {
obj, err := resolver.Get(ctx, "pipeline", name)
func resolvePipeline(ctx context.Context, resolver remote.Resolver, name string) (v1beta1.PipelineObject, *v1beta1.ConfigSource, error) {
obj, source, err := resolver.Get(ctx, "pipeline", name)
if err != nil {
return nil, err
return nil, nil, err
}
pipelineObj, err := readRuntimeObjectAsPipeline(ctx, obj)
if err != nil {
return nil, fmt.Errorf("failed to convert obj %s into Pipeline", obj.GetObjectKind().GroupVersionKind().String())
return nil, nil, fmt.Errorf("failed to convert obj %s into Pipeline", obj.GetObjectKind().GroupVersionKind().String())
}
return pipelineObj, nil

return pipelineObj, source, nil
}

// readRuntimeObjectAsPipeline tries to convert a generic runtime.Object
Expand Down
Loading

0 comments on commit 70b071e

Please sign in to comment.