Skip to content

Commit

Permalink
[TEP-0135] Coschedule per (isolate) PipelineRun
Browse files Browse the repository at this point in the history
Part of [tektoncd#6740]. [TEP-0135][tep-0135] introduces a feature that allows a cluster operator to ensure
that all of a PipelineRun's pods are scheduled to the same node.

This commit consumes the functions added in [tektoncd#6819] to implement end to end support of `Coschedule:PipelineRuns` where all the `PipelineRun pods` are scheduled to the same node,
and the `Coschedule:isolate-pipelinerun` coschedule modes where only 1 PipelineRun is allowed to run in a node at the same time.

/kind feature

[tektoncd#6819]: tektoncd#6819
[tektoncd#6740]: tektoncd#6740
[tep-0135]: https://github.com/tektoncd/community/blob/main/teps/0135-coscheduling-pipelinerun-pods.md
  • Loading branch information
QuanZhang-William committed Jul 21, 2023
1 parent 84b8621 commit e0d1168
Show file tree
Hide file tree
Showing 7 changed files with 481 additions and 144 deletions.
68 changes: 40 additions & 28 deletions pkg/reconciler/pipelinerun/affinity_assistant.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,12 +19,14 @@ package pipelinerun
import (
"context"
"crypto/sha256"
"errors"
"fmt"

"github.com/tektoncd/pipeline/pkg/apis/config"
"github.com/tektoncd/pipeline/pkg/apis/pipeline"
"github.com/tektoncd/pipeline/pkg/apis/pipeline/pod"
v1 "github.com/tektoncd/pipeline/pkg/apis/pipeline/v1"
"github.com/tektoncd/pipeline/pkg/internal/affinityassistant"
aa "github.com/tektoncd/pipeline/pkg/internal/affinityassistant"
"github.com/tektoncd/pipeline/pkg/reconciler/volumeclaim"
"github.com/tektoncd/pipeline/pkg/workspace"
Expand All @@ -40,21 +42,25 @@ import (
)

const (
// ReasonCouldntCreateOrUpdateAffinityAssistantStatefulSetPerWorkspace indicates that a PipelineRun uses workspaces with PersistentVolumeClaim
// ReasonCouldntCreateOrUpdateAffinityAssistantStatefulSet indicates that a PipelineRun uses workspaces with PersistentVolumeClaim
// as a volume source and expect an Assistant StatefulSet in AffinityAssistantPerWorkspace behavior, but couldn't create a StatefulSet.
ReasonCouldntCreateOrUpdateAffinityAssistantStatefulSetPerWorkspace = "ReasonCouldntCreateOrUpdateAffinityAssistantStatefulSetPerWorkspace"
ReasonCouldntCreateOrUpdateAffinityAssistantStatefulSet = "ReasonCouldntCreateOrUpdateAffinityAssistantStatefulSet"

featureFlagDisableAffinityAssistantKey = "disable-affinity-assistant"
)

var (
ErrPvcCreationFailed = errors.New("PVC creation error")
ErrAffinityAssistantCreationFailed = errors.New("Affinity Assistant creation error")
)

// createOrUpdateAffinityAssistantsAndPVCs creates Affinity Assistant StatefulSets and PVCs based on AffinityAssistantBehavior.
// This is done to achieve Node Affinity for taskruns in a pipelinerun, and make it possible for the taskruns to execute parallel while sharing volume.
// If the AffinityAssistantBehavior is AffinityAssistantPerWorkspace, it creates an Affinity Assistant for
// every taskrun in the pipelinerun that use the same PVC based volume.
// If the AffinityAssistantBehavior is AffinityAssistantPerPipelineRun or AffinityAssistantPerPipelineRunWithIsolation,
// it creates one Affinity Assistant for the pipelinerun.
func (c *Reconciler) createOrUpdateAffinityAssistantsAndPVCs(ctx context.Context, pr *v1.PipelineRun, aaBehavior aa.AffinityAssistantBehavior) error {
var errs []error
var unschedulableNodes sets.Set[string] = nil

var claimTemplates []corev1.PersistentVolumeClaim
Expand Down Expand Up @@ -82,38 +88,39 @@ func (c *Reconciler) createOrUpdateAffinityAssistantsAndPVCs(ctx context.Context
// affinity assistant so that the OwnerReference of the PVCs are the pipelineruns, which is used to achieve PVC auto deletion at PipelineRun deletion time
if (aaBehavior == aa.AffinityAssistantPerWorkspace || aaBehavior == aa.AffinityAssistantDisabled) && pr.HasVolumeClaimTemplate() {
if err := c.pvcHandler.CreatePVCsForWorkspaces(ctx, pr.Spec.Workspaces, *kmeta.NewControllerRef(pr), pr.Namespace); err != nil {
return fmt.Errorf("failed to create PVC for PipelineRun %s: %w", pr.Name, err)
return fmt.Errorf("%w: %v", ErrPvcCreationFailed, err) //nolint:errorlint
}
}
switch aaBehavior {
case aa.AffinityAssistantPerWorkspace:
for claim, workspaceName := range claimToWorkspace {
aaName := GetAffinityAssistantName(workspaceName, pr.Name)
err := c.createOrUpdateAffinityAssistant(ctx, aaName, pr, nil, []corev1.PersistentVolumeClaimVolumeSource{*claim}, unschedulableNodes)
errs = append(errs, err...)
if err := c.createOrUpdateAffinityAssistant(ctx, aaName, pr, nil, []corev1.PersistentVolumeClaimVolumeSource{*claim}, unschedulableNodes); err != nil {
return fmt.Errorf("%w: %v", ErrAffinityAssistantCreationFailed, err)
}
}
for claimTemplate, workspaceName := range claimTemplatesToWorkspace {
aaName := GetAffinityAssistantName(workspaceName, pr.Name)
// To support PVC auto deletion at pipelinerun deletion time, the OwnerReference of the PVCs should be set to the owning pipelinerun.
// In AffinityAssistantPerWorkspace mode, the reconciler has created PVCs (owned by pipelinerun) from pipelinerun's VolumeClaimTemplate at this point,
// so the VolumeClaimTemplates are pass in as PVCs when creating affinity assistant StatefulSet for volume scheduling.
// If passed in as VolumeClaimTemplates, the PVCs are owned by Affinity Assistant StatefulSet instead of the pipelinerun.
err := c.createOrUpdateAffinityAssistant(ctx, aaName, pr, nil, []corev1.PersistentVolumeClaimVolumeSource{{ClaimName: claimTemplate.Name}}, unschedulableNodes)
errs = append(errs, err...)
if err := c.createOrUpdateAffinityAssistant(ctx, aaName, pr, nil, []corev1.PersistentVolumeClaimVolumeSource{{ClaimName: claimTemplate.Name}}, unschedulableNodes); err != nil {
return fmt.Errorf("%w: %v", ErrAffinityAssistantCreationFailed, err)
}
}
case aa.AffinityAssistantPerPipelineRun, aa.AffinityAssistantPerPipelineRunWithIsolation:
if claims != nil || claimTemplates != nil {
aaName := GetAffinityAssistantName("", pr.Name)
// In AffinityAssistantPerPipelineRun or AffinityAssistantPerPipelineRunWithIsolation modes, the PVCs are created via StatefulSet for volume scheduling.
// PVCs from pipelinerun's VolumeClaimTemplate are enforced to be deleted at pipelinerun completion time,
// so we don't need to worry the OwnerReference of the PVCs
err := c.createOrUpdateAffinityAssistant(ctx, aaName, pr, claimTemplates, claims, unschedulableNodes)
errs = append(errs, err...)
aaName := GetAffinityAssistantName("", pr.Name)
// In AffinityAssistantPerPipelineRun or AffinityAssistantPerPipelineRunWithIsolation modes, the PVCs are created via StatefulSet for volume scheduling.
// PVCs from pipelinerun's VolumeClaimTemplate are enforced to be deleted at pipelinerun completion time,
// so we don't need to worry the OwnerReference of the PVCs
if err := c.createOrUpdateAffinityAssistant(ctx, aaName, pr, claimTemplates, claims, unschedulableNodes); err != nil {
return fmt.Errorf("%w: %v", ErrAffinityAssistantCreationFailed, err)
}
case aa.AffinityAssistantDisabled:
}

return errorutils.NewAggregate(errs)
return nil
}

// createOrUpdateAffinityAssistant creates an Affinity Assistant Statefulset with the provided affinityAssistantName and pipelinerun information.
Expand Down Expand Up @@ -225,13 +232,29 @@ func (c *Reconciler) cleanupAffinityAssistantsAndPVCs(ctx context.Context, pr *v
// getPersistentVolumeClaimNameWithAffinityAssistant returns the PersistentVolumeClaim name that is
// created by the Affinity Assistant StatefulSet VolumeClaimTemplate when Affinity Assistant is enabled.
// The PVCs created by StatefulSet VolumeClaimTemplates follow the format `<pvcName>-<affinityAssistantName>-0`
// TODO(#6740)(WIP): use this function when adding end-to-end support for AffinityAssistantPerPipelineRun mode
func getPersistentVolumeClaimNameWithAffinityAssistant(pipelineWorkspaceName, prName string, wb v1.WorkspaceBinding, owner metav1.OwnerReference) string {
pvcName := volumeclaim.GetPVCNameWithoutAffinityAssistant(wb.VolumeClaimTemplate.Name, wb, owner)
affinityAssistantName := GetAffinityAssistantName(pipelineWorkspaceName, prName)
return fmt.Sprintf("%s-%s-0", pvcName, affinityAssistantName)
}

// getAffinityAssistantAnnotationVal generates and returns the value for `pipeline.tekton.dev/affinity-assistant` annotation
// based on aaBehavior, pipelinePVCWorkspaceName and prName
func getAffinityAssistantAnnotationVal(aaBehavior affinityassistant.AffinityAssistantBehavior, pipelinePVCWorkspaceName string, prName string) string {
switch aaBehavior {
case affinityassistant.AffinityAssistantPerWorkspace:
if pipelinePVCWorkspaceName != "" {
return GetAffinityAssistantName(pipelinePVCWorkspaceName, prName)
}
case affinityassistant.AffinityAssistantPerPipelineRun, affinityassistant.AffinityAssistantPerPipelineRunWithIsolation:
return GetAffinityAssistantName("", prName)

case affinityassistant.AffinityAssistantDisabled:
}

return ""
}

// GetAffinityAssistantName returns the Affinity Assistant name based on pipelineWorkspaceName and pipelineRunName
func GetAffinityAssistantName(pipelineWorkspaceName string, pipelineRunName string) string {
hashBytes := sha256.Sum256([]byte(pipelineWorkspaceName + pipelineRunName))
Expand Down Expand Up @@ -348,17 +371,6 @@ func affinityAssistantStatefulSet(aaBehavior aa.AffinityAssistantBehavior, name
}
}

// isAffinityAssistantDisabled returns a bool indicating whether an Affinity Assistant should
// be created for each PipelineRun that use workspaces with PersistentVolumeClaims
// as volume source. The default behaviour is to enable the Affinity Assistant to
// provide Node Affinity for TaskRuns that share a PVC workspace.
//
// TODO(#6740)(WIP): replace this function with GetAffinityAssistantBehavior
func (c *Reconciler) isAffinityAssistantDisabled(ctx context.Context) bool {
cfg := config.FromContextOrDefaults(ctx)
return cfg.FeatureFlags.DisableAffinityAssistant
}

// getAssistantAffinityMergedWithPodTemplateAffinity return the affinity that merged with PipelineRun PodTemplate affinity.
func getAssistantAffinityMergedWithPodTemplateAffinity(pr *v1.PipelineRun, aaBehavior aa.AffinityAssistantBehavior) *corev1.Affinity {
affinityAssistantsAffinity := &corev1.Affinity{}
Expand Down
182 changes: 131 additions & 51 deletions pkg/reconciler/pipelinerun/affinity_assistant_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@ package pipelinerun
import (
"context"
"errors"
"fmt"
"testing"

"github.com/google/go-cmp/cmp"
Expand Down Expand Up @@ -159,14 +160,20 @@ func TestCreateAndDeleteOfAffinityAssistantPerPipelineRun(t *testing.T) {
}, {
name: "other Workspace type",
pr: testPRWithEmptyDir,
expectStatefulSetSpec: nil,
expectStatefulSetSpec: &appsv1.StatefulSetSpec{},
}}

for _, tc := range tests {
t.Run(tc.name, func(t *testing.T) {
ctx := context.Background()
configMap := map[string]string{
"disable-affinity-assistant": "true",
"coschedule": "pipelineruns",
}
kubeClientSet := fakek8s.NewSimpleClientset()
ctx := cfgtesting.SetFeatureFlags(context.Background(), t, configMap)
c := Reconciler{
KubeClientSet: fakek8s.NewSimpleClientset(),
KubeClientSet: kubeClientSet,
pvcHandler: volumeclaim.NewPVCHandler(kubeClientSet, zap.NewExample().Sugar()),
}

err := c.createOrUpdateAffinityAssistantsAndPVCs(ctx, tc.pr, aa.AffinityAssistantPerPipelineRun)
Expand All @@ -178,7 +185,15 @@ func TestCreateAndDeleteOfAffinityAssistantPerPipelineRun(t *testing.T) {
expectAAName := GetAffinityAssistantName("", tc.pr.Name)
validateStatefulSetSpec(t, ctx, c, expectAAName, tc.expectStatefulSetSpec)

// TODO(#6740)(WIP): test cleanupAffinityAssistantsAndPVCs for coscheduling-pipelinerun mode when fully implemented
// clean up Affinity Assistant
c.cleanupAffinityAssistantsAndPVCs(ctx, tc.pr)
if err != nil {
t.Errorf("unexpected error from cleanupAffinityAssistants: %v", err)
}
_, err = c.KubeClientSet.AppsV1().StatefulSets(tc.pr.Namespace).Get(ctx, expectAAName, metav1.GetOptions{})
if !apierrors.IsNotFound(err) {
t.Errorf("expected a NotFound response, got: %v", err)
}
})
}
}
Expand Down Expand Up @@ -314,6 +329,76 @@ func TestCreateAndDeleteOfAffinityAssistantPerWorkspaceOrDisabled(t *testing.T)
}
}

func TestCreateOrUpdateAffinityAssistantsAndPVCs_Failure(t *testing.T) {
testCases := []struct {
name, failureType string
aaBehavior aa.AffinityAssistantBehavior
expectedErr error
}{{
name: "affinity assistant creation failed - per workspace",
failureType: "statefulset",
aaBehavior: aa.AffinityAssistantPerWorkspace,
expectedErr: fmt.Errorf("%w: [failed to create StatefulSet affinity-assistant-4cf1a1c468: error creating statefulsets]", ErrAffinityAssistantCreationFailed),
}, {
name: "affinity assistant creation failed - per pipelinerun",
failureType: "statefulset",
aaBehavior: aa.AffinityAssistantPerPipelineRun,
expectedErr: fmt.Errorf("%w: [failed to create StatefulSet affinity-assistant-426b306c50: error creating statefulsets]", ErrAffinityAssistantCreationFailed),
}, {
name: "pvc creation failed - per workspace",
failureType: "pvc",
aaBehavior: aa.AffinityAssistantPerWorkspace,
expectedErr: fmt.Errorf("%w: failed to create PVC pvc-b9eea16dce: error creating persistentvolumeclaims", ErrPvcCreationFailed),
}, {
name: "pvc creation failed - disabled",
failureType: "pvc",
aaBehavior: aa.AffinityAssistantDisabled,
expectedErr: fmt.Errorf("%w: failed to create PVC pvc-b9eea16dce: error creating persistentvolumeclaims", ErrPvcCreationFailed),
}}

for _, tc := range testCases {
ctx := context.Background()
kubeClientSet := fakek8s.NewSimpleClientset()
c := Reconciler{
KubeClientSet: kubeClientSet,
pvcHandler: volumeclaim.NewPVCHandler(kubeClientSet, zap.NewExample().Sugar()),
}

switch tc.failureType {
case "pvc":
c.KubeClientSet.CoreV1().(*fake.FakeCoreV1).PrependReactor("create", "persistentvolumeclaims",
func(action testing2.Action) (handled bool, ret runtime.Object, err error) {
return true, &corev1.PersistentVolumeClaim{}, errors.New("error creating persistentvolumeclaims")
})
case "statefulset":
c.KubeClientSet.CoreV1().(*fake.FakeCoreV1).PrependReactor("create", "statefulsets",
func(action testing2.Action) (handled bool, ret runtime.Object, err error) {
return true, &appsv1.StatefulSet{}, errors.New("error creating statefulsets")
})
}

err := c.createOrUpdateAffinityAssistantsAndPVCs(ctx, testPRWithVolumeClaimTemplate, tc.aaBehavior)

if err == nil {
t.Errorf("expect error from createOrUpdateAffinityAssistantsAndPVCs but got nil")
}

switch tc.failureType {
case "pvc":
if !errors.Is(err, ErrPvcCreationFailed) {
t.Errorf("expected err type mismatching, expecting %v but got: %v", ErrPvcCreationFailed, err)
}
case "statefulset":
if !errors.Is(err, ErrAffinityAssistantCreationFailed) {
t.Errorf("expected err type mismatching, expecting %v but got: %v", ErrAffinityAssistantCreationFailed, err)
}
}
if d := cmp.Diff(tc.expectedErr.Error(), err.Error()); d != "" {
t.Errorf("expected err mismatching: %v", diff.PrintWantGot(d))
}
}
}

// TestCreateAffinityAssistantWhenNodeIsCordoned tests an existing Affinity Assistant can identify the node failure and
// can migrate the affinity assistant pod to a healthy node so that the existing pipelineRun runs to competition
func TestCreateOrUpdateAffinityAssistantWhenNodeIsCordoned(t *testing.T) {
Expand Down Expand Up @@ -809,53 +894,6 @@ func TestThatCleanupIsAvoidedIfAssistantIsDisabled(t *testing.T) {
}
}

func TestDisableAffinityAssistant(t *testing.T) {
for _, tc := range []struct {
description string
configMap *corev1.ConfigMap
expected bool
}{{
description: "Default behaviour: A missing disable-affinity-assistant flag should result in false",
configMap: &corev1.ConfigMap{
ObjectMeta: metav1.ObjectMeta{Name: config.GetFeatureFlagsConfigName(), Namespace: system.Namespace()},
Data: map[string]string{},
},
expected: false,
}, {
description: "Setting disable-affinity-assistant to false should result in false",
configMap: &corev1.ConfigMap{
ObjectMeta: metav1.ObjectMeta{Name: config.GetFeatureFlagsConfigName(), Namespace: system.Namespace()},
Data: map[string]string{
featureFlagDisableAffinityAssistantKey: "false",
},
},
expected: false,
}, {
description: "Setting disable-affinity-assistant to true should result in true",
configMap: &corev1.ConfigMap{
ObjectMeta: metav1.ObjectMeta{Name: config.GetFeatureFlagsConfigName(), Namespace: system.Namespace()},
Data: map[string]string{
featureFlagDisableAffinityAssistantKey: "true",
},
},
expected: true,
}} {
t.Run(tc.description, func(t *testing.T) {
c := Reconciler{
KubeClientSet: fakek8s.NewSimpleClientset(
tc.configMap,
),
Images: pipeline.Images{},
}
store := config.NewStore(logtesting.TestLogger(t))
store.OnConfigChanged(tc.configMap)
if result := c.isAffinityAssistantDisabled(store.ToContext(context.Background())); result != tc.expected {
t.Errorf("Expected %t Received %t", tc.expected, result)
}
})
}
}

func TestGetAssistantAffinityMergedWithPodTemplateAffinity(t *testing.T) {
labelSelector := &metav1.LabelSelector{
MatchLabels: map[string]string{
Expand Down Expand Up @@ -1030,6 +1068,48 @@ spec:
}
}

func TestGetAffinityAssistantAnnotationVal(t *testing.T) {
tcs := []struct {
name string
aaBehavior aa.AffinityAssistantBehavior
wsName, prName, expectAffinityAssistantAnnotationVal string
}{{
name: "per workspace",
aaBehavior: aa.AffinityAssistantPerWorkspace,
wsName: "my-ws",
prName: "my-pipelinerun",
expectAffinityAssistantAnnotationVal: "affinity-assistant-315f58d30d",
}, {
name: "per workspace - empty pipeline workspace name",
aaBehavior: aa.AffinityAssistantPerWorkspace,
prName: "my-pipelinerun",
}, {
name: "per pipelinerun",
aaBehavior: aa.AffinityAssistantPerPipelineRun,
wsName: "my-ws",
prName: "my-pipelinerun",
expectAffinityAssistantAnnotationVal: "affinity-assistant-0b79942a50",
}, {
name: "isolate pipelinerun",
aaBehavior: aa.AffinityAssistantPerPipelineRunWithIsolation,
wsName: "my-ws",
prName: "my-pipelinerun",
expectAffinityAssistantAnnotationVal: "affinity-assistant-0b79942a50",
}, {
name: "disabled",
aaBehavior: aa.AffinityAssistantDisabled,
wsName: "my-ws",
prName: "my-pipelinerun",
}}

for _, tc := range tcs {
aaAnnotationVal := getAffinityAssistantAnnotationVal(tc.aaBehavior, tc.wsName, tc.prName)
if diff := cmp.Diff(tc.expectAffinityAssistantAnnotationVal, aaAnnotationVal); diff != "" {
t.Errorf("Affinity Assistant Annotation Val mismatch: %v", diff)
}
}
}

type Data struct {
StatefulSets []*appsv1.StatefulSet
Nodes []*corev1.Node
Expand Down
Loading

0 comments on commit e0d1168

Please sign in to comment.