Skip to content

Commit

Permalink
Propagate labels from Pipeline/Task to PipelineRun/TaskRun
Browse files Browse the repository at this point in the history
With this change, labels are propagated from Pipeline and Task to
PipelineRun and TaskRun, respectively, giving us full label propagation
from Pipeline to PipelineRun to TaskRun to Pod and Task to TaskRun to
Pod.

This commit also adds a label whose key is pipeline.knative.dev/task
to all TaskRuns that refer to a Task with a TaskRef (the label is not
added to TaskRuns using an embedded TaskSpec) that contains the name of
the Task.

In addition, this commit introduces a builder for Pod values to increase
the readability of the taskrun reconciliation tests.

Fixes tektoncd#501
  • Loading branch information
dwnusbaum committed Feb 18, 2019
1 parent 4fd0f55 commit 755862b
Show file tree
Hide file tree
Showing 13 changed files with 759 additions and 1,062 deletions.
1 change: 1 addition & 0 deletions pkg/apis/pipeline/register.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@ package pipeline
// GroupName is the Kubernetes resource group name for Pipeline types.
const (
GroupName = "pipeline.knative.dev"
TaskLabelKey = "/task"
TaskRunLabelKey = "/taskRun"
PipelineLabelKey = "/pipeline"
PipelineRunLabelKey = "/pipelineRun"
Expand Down
30 changes: 21 additions & 9 deletions pkg/reconciler/v1alpha1/pipelinerun/pipelinerun.go
Original file line number Diff line number Diff line change
Expand Up @@ -171,15 +171,16 @@ func (c *Reconciler) Reconcile(ctx context.Context, key string) error {
return err
}

// Reconcile this copy of the task run and then write back any status
// Reconcile this copy of the task run and then write back any status or label
// updates regardless of whether the reconciliation errored out.
err = c.reconcile(ctx, pr)
if equality.Semantic.DeepEqual(original.Status, pr.Status) {
// If we didn't change anything then don't call updateStatus.
if equality.Semantic.DeepEqual(original.Status, pr.Status) &&
reflect.DeepEqual(original.ObjectMeta.Labels, pr.ObjectMeta.Labels) {
// If we didn't change anything then don't call updateStatusAndLabels.
// This is important because the copy we loaded from the informer's
// cache may be stale and we don't want to overwrite a prior update
// to status with this stale state.
} else if _, err := c.updateStatus(pr); err != nil {
} else if _, err := c.updateStatusAndLabels(pr); err != nil {
c.Logger.Warn("Failed to update PipelineRun status", zap.Error(err))
c.Recorder.Event(pr, corev1.EventTypeWarning, eventReasonFailed, "PipelineRun failed to update")
return err
Expand Down Expand Up @@ -225,6 +226,15 @@ func (c *Reconciler) reconcile(ctx context.Context, pr *v1alpha1.PipelineRun) er
// Apply parameter templating from the PipelineRun
p = resources.ApplyParameters(p, pr)

// Propagate labels from Pipeline to PipelineRun.
if pr.ObjectMeta.Labels == nil {
pr.ObjectMeta.Labels = make(map[string]string, len(p.ObjectMeta.Labels)+1)
}
for key, value := range p.ObjectMeta.Labels {
pr.ObjectMeta.Labels[key] = value
}
pr.ObjectMeta.Labels[pipeline.GroupName+pipeline.PipelineLabelKey] = p.Name

pipelineState, err := resources.ResolvePipelineRun(
*pr,
func(name string) (v1alpha1.TaskInterface, error) {
Expand Down Expand Up @@ -360,11 +370,11 @@ func (c *Reconciler) createTaskRun(logger *zap.SugaredLogger, rprt *resources.Re
taskRunTimeout = nil
}

labels := make(map[string]string, len(pr.ObjectMeta.Labels)+2)
// Propagate labels from PipelineRun to TaskRun.
labels := make(map[string]string, len(pr.ObjectMeta.Labels)+1)
for key, val := range pr.ObjectMeta.Labels {
labels[key] = val
}
labels[pipeline.GroupName+pipeline.PipelineLabelKey] = pr.Spec.PipelineRef.Name
labels[pipeline.GroupName+pipeline.PipelineRunLabelKey] = pr.Name

tr := &v1alpha1.TaskRun{
Expand Down Expand Up @@ -392,14 +402,16 @@ func (c *Reconciler) createTaskRun(logger *zap.SugaredLogger, rprt *resources.Re
return c.PipelineClientSet.PipelineV1alpha1().TaskRuns(pr.Namespace).Create(tr)
}

func (c *Reconciler) updateStatus(pr *v1alpha1.PipelineRun) (*v1alpha1.PipelineRun, error) {
func (c *Reconciler) updateStatusAndLabels(pr *v1alpha1.PipelineRun) (*v1alpha1.PipelineRun, error) {
newPr, err := c.pipelineRunLister.PipelineRuns(pr.Namespace).Get(pr.Name)
if err != nil {
return nil, fmt.Errorf("Error getting PipelineRun %s when updating status: %s", pr.Name, err)
}
if !reflect.DeepEqual(pr.Status, newPr.Status) {
if !reflect.DeepEqual(pr.Status, newPr.Status) ||
!reflect.DeepEqual(pr.ObjectMeta.Labels, newPr.ObjectMeta.Labels) {
newPr.Status = pr.Status
return c.PipelineClientSet.PipelineV1alpha1().PipelineRuns(pr.Namespace).UpdateStatus(newPr)
newPr.ObjectMeta.Labels = pr.ObjectMeta.Labels
return c.PipelineClientSet.PipelineV1alpha1().PipelineRuns(pr.Namespace).Update(newPr)
}
return newPr, nil
}
Expand Down
28 changes: 15 additions & 13 deletions pkg/reconciler/v1alpha1/taskrun/resources/taskspec.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@ import (
"fmt"

"github.com/knative/build-pipeline/pkg/apis/pipeline/v1alpha1"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
)

// GetTask is a function used to retrieve Tasks.
Expand All @@ -28,24 +29,25 @@ type GetTask func(string) (v1alpha1.TaskInterface, error)
// GetClusterTask is a function that will retrieve the Task from name and namespace.
type GetClusterTask func(name string) (v1alpha1.TaskInterface, error)

// GetTaskSpec will retrieve the Task Spec associated with the provieded TaskRun. This can come from a
// reference Task or from an embedded Task spec.
func GetTaskSpec(taskRunSpec *v1alpha1.TaskRunSpec, taskRunName string, getTask GetTask) (*v1alpha1.TaskSpec, string, error) {
// GetTaskData will retrieve the Task metadata and Spec associated with the
// provided TaskRun. This can come from a reference Task or from the TaskRun's
// metadata and embedded TaskSpec.
func GetTaskData(taskRun *v1alpha1.TaskRun, getTask GetTask) (*metav1.ObjectMeta, *v1alpha1.TaskSpec, error) {
taskMeta := metav1.ObjectMeta{}
taskSpec := v1alpha1.TaskSpec{}
taskName := ""
if taskRunSpec.TaskRef != nil && taskRunSpec.TaskRef.Name != "" {
if taskRun.Spec.TaskRef != nil && taskRun.Spec.TaskRef.Name != "" {
// Get related task for taskrun
t, err := getTask(taskRunSpec.TaskRef.Name)
t, err := getTask(taskRun.Spec.TaskRef.Name)
if err != nil {
return nil, taskName, fmt.Errorf("error when listing tasks for taskRun %s %v", taskRunName, err)
return nil, nil, fmt.Errorf("error when listing tasks for taskRun %s %v", taskRun.Name, err)
}
taskMeta = t.TaskMetadata()
taskSpec = t.TaskSpec()
taskName = t.TaskMetadata().Name
} else if taskRunSpec.TaskSpec != nil {
taskSpec = *taskRunSpec.TaskSpec
taskName = taskRunName
} else if taskRun.Spec.TaskSpec != nil {
taskMeta = taskRun.ObjectMeta
taskSpec = *taskRun.Spec.TaskSpec
} else {
return &taskSpec, taskName, fmt.Errorf("TaskRun %s not providing TaskRef or TaskSpec", taskRunName)
return nil, nil, fmt.Errorf("TaskRun %s not providing TaskRef or TaskSpec", taskRun.Name)
}
return &taskSpec, taskName, nil
return &taskMeta, &taskSpec, nil
}
62 changes: 41 additions & 21 deletions pkg/reconciler/v1alpha1/taskrun/resources/taskspec_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -36,20 +36,25 @@ func TestGetTaskSpec_Ref(t *testing.T) {
}},
},
}
spec := &v1alpha1.TaskRunSpec{
TaskRef: &v1alpha1.TaskRef{
Name: "orchestrate",
tr := &v1alpha1.TaskRun{
ObjectMeta: metav1.ObjectMeta{
Name: "mytaskrun",
},
Spec: v1alpha1.TaskRunSpec{
TaskRef: &v1alpha1.TaskRef{
Name: "orchestrate",
},
},
}
gt := func(n string) (v1alpha1.TaskInterface, error) { return task, nil }
taskSpec, name, err := GetTaskSpec(spec, "mytaskrun", gt)
taskMeta, taskSpec, err := GetTaskData(tr, gt)

if err != nil {
t.Fatalf("Did not expect error getting task spec but got: %s", err)
}

if name != "orchestrate" {
t.Errorf("Expected task name to be `orchestrate` but was %q", name)
if taskMeta.Name != "orchestrate" {
t.Errorf("Expected task name to be `orchestrate` but was %q", taskMeta.Name)
}

if len(taskSpec.Steps) != 1 || taskSpec.Steps[0].Name != "step1" {
Expand All @@ -58,21 +63,27 @@ func TestGetTaskSpec_Ref(t *testing.T) {
}

func TestGetTaskSpec_Embedded(t *testing.T) {
spec := &v1alpha1.TaskRunSpec{
TaskSpec: &v1alpha1.TaskSpec{
Steps: []corev1.Container{{
Name: "step1",
}},
}}
tr := &v1alpha1.TaskRun{
ObjectMeta: metav1.ObjectMeta{
Name: "mytaskrun",
},
Spec: v1alpha1.TaskRunSpec{
TaskSpec: &v1alpha1.TaskSpec{
Steps: []corev1.Container{{
Name: "step1",
}},
},
},
}
gt := func(n string) (v1alpha1.TaskInterface, error) { return nil, fmt.Errorf("shouldn't be called") }
taskSpec, name, err := GetTaskSpec(spec, "mytaskrun", gt)
taskMeta, taskSpec, err := GetTaskData(tr, gt)

if err != nil {
t.Fatalf("Did not expect error getting task spec but got: %s", err)
}

if name != "mytaskrun" {
t.Errorf("Expected task name for embedded task to default to name of task run but was %q", name)
if taskMeta.Name != "mytaskrun" {
t.Errorf("Expected task name for embedded task to default to name of task run but was %q", taskMeta.Name)
}

if len(taskSpec.Steps) != 1 || taskSpec.Steps[0].Name != "step1" {
Expand All @@ -81,22 +92,31 @@ func TestGetTaskSpec_Embedded(t *testing.T) {
}

func TestGetTaskSpec_Invalid(t *testing.T) {
spec := &v1alpha1.TaskRunSpec{}
tr := &v1alpha1.TaskRun{
ObjectMeta: metav1.ObjectMeta{
Name: "mytaskrun",
},
}
gt := func(n string) (v1alpha1.TaskInterface, error) { return nil, fmt.Errorf("shouldn't be called") }
_, _, err := GetTaskSpec(spec, "mytaskrun", gt)
_, _, err := GetTaskData(tr, gt)
if err == nil {
t.Fatalf("Expected error resolving spec with no embedded or referenced task spec but didn't get error")
}
}

func TestGetTaskSpec_Error(t *testing.T) {
spec := &v1alpha1.TaskRunSpec{
TaskRef: &v1alpha1.TaskRef{
Name: "orchestrate",
tr := &v1alpha1.TaskRun{
ObjectMeta: metav1.ObjectMeta{
Name: "mytaskrun",
},
Spec: v1alpha1.TaskRunSpec{
TaskRef: &v1alpha1.TaskRef{
Name: "orchestrate",
},
},
}
gt := func(n string) (v1alpha1.TaskInterface, error) { return nil, fmt.Errorf("something went wrong") }
_, _, err := GetTaskSpec(spec, "mytaskrun", gt)
_, _, err := GetTaskData(tr, gt)
if err == nil {
t.Fatalf("Expected error when unable to find referenced Task but got none")
}
Expand Down
36 changes: 25 additions & 11 deletions pkg/reconciler/v1alpha1/taskrun/taskrun.go
Original file line number Diff line number Diff line change
Expand Up @@ -175,12 +175,13 @@ func (c *Reconciler) Reconcile(ctx context.Context, key string) error {
c.Logger.Errorf("Reconcile error: %v", err.Error())
return err
}
if equality.Semantic.DeepEqual(original.Status, tr.Status) {
// If we didn't change anything then don't call updateStatus.
if equality.Semantic.DeepEqual(original.Status, tr.Status) &&
reflect.DeepEqual(original.ObjectMeta.Labels, tr.ObjectMeta.Labels) {
// If we didn't change anything then don't call updateStatusAndLabels.
// This is important because the copy we loaded from the informer's
// cache may be stale and we don't want to overwrite a prior update
// to status with this stale state.
} else if _, err := c.updateStatus(tr); err != nil {
} else if _, err := c.updateStatusAndLabels(tr); err != nil {
c.Logger.Warn("Failed to update taskRun status", zap.Error(err))
return err
}
Expand Down Expand Up @@ -220,7 +221,7 @@ func (c *Reconciler) reconcile(ctx context.Context, tr *v1alpha1.TaskRun) error
}

getTaskFunc := c.getTaskFunc(tr)
spec, taskName, err := resources.GetTaskSpec(&tr.Spec, tr.Name, getTaskFunc)
taskMeta, taskSpec, err := resources.GetTaskData(tr, getTaskFunc)
if err != nil {
c.Logger.Errorf("Failed to determine Task spec to use for taskrun %s: %v", tr.Name, err)
tr.Status.SetCondition(&duckv1alpha1.Condition{
Expand All @@ -232,15 +233,26 @@ func (c *Reconciler) reconcile(ctx context.Context, tr *v1alpha1.TaskRun) error
return nil
}

// Propagate labels from Task to TaskRun.
if tr.ObjectMeta.Labels == nil {
tr.ObjectMeta.Labels = make(map[string]string, len(taskMeta.Labels)+1)
}
for key, value := range taskMeta.Labels {
tr.ObjectMeta.Labels[key] = value
}
if tr.Spec.TaskRef != nil {
tr.ObjectMeta.Labels[pipeline.GroupName+pipeline.TaskLabelKey] = taskMeta.Name
}

// Check if the TaskRun has timed out; if it is, this will set its status
// accordingly.
if timedOut, err := c.checkTimeout(tr, spec, c.KubeClientSet.CoreV1().Pods(tr.Namespace).Delete); err != nil {
if timedOut, err := c.checkTimeout(tr, taskSpec, c.KubeClientSet.CoreV1().Pods(tr.Namespace).Delete); err != nil {
return err
} else if timedOut {
return nil
}

rtr, err := resources.ResolveTaskResources(spec, taskName, tr.Spec.Inputs.Resources, tr.Spec.Outputs.Resources, c.resourceLister.PipelineResources(tr.Namespace).Get)
rtr, err := resources.ResolveTaskResources(taskSpec, taskMeta.Name, tr.Spec.Inputs.Resources, tr.Spec.Outputs.Resources, c.resourceLister.PipelineResources(tr.Namespace).Get)
if err != nil {
c.Logger.Errorf("Failed to resolve references for taskrun %s: %v", tr.Name, err)
tr.Status.SetCondition(&duckv1alpha1.Condition{
Expand Down Expand Up @@ -341,14 +353,16 @@ func updateStatusFromBuildStatus(taskRun *v1alpha1.TaskRun, buildStatus buildv1a
}
}

func (c *Reconciler) updateStatus(taskrun *v1alpha1.TaskRun) (*v1alpha1.TaskRun, error) {
func (c *Reconciler) updateStatusAndLabels(taskrun *v1alpha1.TaskRun) (*v1alpha1.TaskRun, error) {
newtaskrun, err := c.taskRunLister.TaskRuns(taskrun.Namespace).Get(taskrun.Name)
if err != nil {
return nil, err
return nil, fmt.Errorf("Error getting TaskRun %s when updating status: %s", taskrun.Name, err)
}
if !reflect.DeepEqual(taskrun.Status, newtaskrun.Status) {
if !reflect.DeepEqual(taskrun.Status, newtaskrun.Status) ||
!reflect.DeepEqual(taskrun.ObjectMeta.Labels, newtaskrun.ObjectMeta.Labels) {
newtaskrun.Status = taskrun.Status
return c.PipelineClientSet.PipelineV1alpha1().TaskRuns(taskrun.Namespace).UpdateStatus(newtaskrun)
newtaskrun.ObjectMeta.Labels = newtaskrun.ObjectMeta.Labels
return c.PipelineClientSet.PipelineV1alpha1().TaskRuns(taskrun.Namespace).Update(newtaskrun)
}
return newtaskrun, nil
}
Expand Down Expand Up @@ -477,7 +491,7 @@ func createRedirectedBuild(ctx context.Context, bs *buildv1alpha1.BuildSpec, tr
return b, nil
}

// makeLabels constructs the labels we will apply to TaskRun resources.
// makeLabels constructs the labels we will propagate from TaskRuns to Pods.
func makeLabels(s *v1alpha1.TaskRun) map[string]string {
labels := make(map[string]string, len(s.ObjectMeta.Labels)+1)
for k, v := range s.ObjectMeta.Labels {
Expand Down
Loading

0 comments on commit 755862b

Please sign in to comment.