TEP-0135: implement per-pipelinerun coscheduling
Part of [tektoncd#6740][tektoncd#6740]. [TEP-0135][tep-0135] introduces a feature that allows a cluster operator
to ensure that all of a PipelineRun's pods are scheduled to the same node.

This commit implements the `pipelineruns` scheduling mode, where all the pods of a `PipelineRun` are scheduled to the same node.
It renames the current `createOrUpdateAffinityAssistants` function to `createOrUpdateAffinityAssistantsPerWorkspace`, and adds a new
function, `createOrUpdateAffinityAssistantsPerPipelineRun`, for the `pipelineruns` scheduling mode (with some refactoring).

The behavior of the existing `createOrUpdateAffinityAssistants` function is unchanged.
The `createOrUpdateAffinityAssistantsPerPipelineRun` function is implemented but not yet called;
it will be wired in by follow-up PRs.

/kind feature

[tektoncd#6740]: tektoncd#6740
[tep-0135]: https://github.com/tektoncd/community/blob/main/teps/0135-coscheduling-pipelinerun-pods.md
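A minimal sketch of how a follow-up might wire the new helper into the reconciler (not part of this commit). It reuses the imports already present in `affinity_assistant.go` (`aa` for `pkg/internal/affinityassistant`, `v1` for the Pipeline v1 API) and assumes a `GetAffinityAssistantBehavior` helper that maps the operator-facing configuration to an `aa.AffinityAssitantBehavior`; the real name and signature may differ in the follow-up PRs:

```go
// Hypothetical follow-up wiring, for illustration only: resolve the configured
// Affinity Assistant behavior and delegate to the helper added in this commit.
func (c *Reconciler) ensureAffinityAssistants(ctx context.Context, pr *v1.PipelineRun) error {
	// Assumed helper in pkg/internal/affinityassistant; the real name/signature may differ.
	aaBehavior, err := aa.GetAffinityAssistantBehavior(ctx)
	if err != nil {
		return err
	}
	if aaBehavior == aa.AffinityAssistantDisabled {
		// Nothing to coschedule: TaskRun pods are scheduled independently.
		return nil
	}
	// AffinityAssistantPerWorkspace creates one StatefulSet per PVC-backed workspace;
	// AffinityAssistantPerPipelineRun (and ...WithIsolation) creates a single StatefulSet
	// for the whole PipelineRun, so all of its pods land on the same node.
	return c.createOrUpdateAffinityAssistantsPerAABehavior(ctx, pr, aaBehavior)
}
```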
QuanZhang-William authored and tekton-robot committed Jun 29, 2023
1 parent 6ca7f3b commit d2cb90d
Showing 3 changed files with 291 additions and 150 deletions.
159 changes: 100 additions & 59 deletions pkg/reconciler/pipelinerun/affinity_assistant.go
@@ -25,6 +25,7 @@ import (
"github.com/tektoncd/pipeline/pkg/apis/pipeline"
"github.com/tektoncd/pipeline/pkg/apis/pipeline/pod"
v1 "github.com/tektoncd/pipeline/pkg/apis/pipeline/v1"
aa "github.com/tektoncd/pipeline/pkg/internal/affinityassistant"
"github.com/tektoncd/pipeline/pkg/reconciler/volumeclaim"
"github.com/tektoncd/pipeline/pkg/workspace"
appsv1 "k8s.io/api/apps/v1"
@@ -39,92 +40,132 @@ import (
 )

 const (
-	// ReasonCouldntCreateOrUpdateAffinityAssistantStatefulSet indicates that a PipelineRun uses workspaces with PersistentVolumeClaim
+	// ReasonCouldntCreateOrUpdateAffinityAssistantStatefulSetPerWorkspace indicates that a PipelineRun uses workspaces with PersistentVolumeClaim
 	// as a volume source and expect an Assistant StatefulSet, but couldn't create a StatefulSet.
-	ReasonCouldntCreateOrUpdateAffinityAssistantStatefulSet = "CouldntCreateOrUpdateAffinityAssistantstatefulSet"
+	ReasonCouldntCreateOrUpdateAffinityAssistantStatefulSetPerWorkspace = "ReasonCouldntCreateOrUpdateAffinityAssistantStatefulSetPerWorkspace"

 	featureFlagDisableAffinityAssistantKey = "disable-affinity-assistant"
 )

-// createOrUpdateAffinityAssistants creates an Affinity Assistant StatefulSet for every workspace in the PipelineRun that
-// use a PersistentVolumeClaim volume. This is done to achieve Node Affinity for all TaskRuns that
-// share the workspace volume and make it possible for the tasks to execute parallel while sharing volume.
-func (c *Reconciler) createOrUpdateAffinityAssistants(ctx context.Context, wb []v1.WorkspaceBinding, pr *v1.PipelineRun, namespace string) error {
-	logger := logging.FromContext(ctx)
-	cfg := config.FromContextOrDefaults(ctx)
-
+// createOrUpdateAffinityAssistantsPerAABehavior creates Affinity Assistant StatefulSets based on AffinityAssistantBehavior.
+// This is done to achieve Node Affinity for taskruns in a pipelinerun, and make it possible for the taskruns to execute parallel while sharing volume.
+// If the AffinityAssitantBehavior is AffinityAssistantPerWorkspace, it creates an Affinity Assistant for
+// every taskrun in the pipelinerun that use the same PVC based volume.
+// If the AffinityAssitantBehavior is AffinityAssistantPerPipelineRun or AffinityAssistantPerPipelineRunWithIsolation,
+// it creates one Affinity Assistant for the pipelinerun.
+// Other AffinityAssitantBehaviors are invalid.
+func (c *Reconciler) createOrUpdateAffinityAssistantsPerAABehavior(ctx context.Context, pr *v1.PipelineRun, aaBehavior aa.AffinityAssitantBehavior) error {
 	var errs []error
 	var unschedulableNodes sets.Set[string] = nil
-	for _, w := range wb {
+
+	var claimTemplates []corev1.PersistentVolumeClaim
+	var claims []corev1.PersistentVolumeClaimVolumeSource
+	claimToWorkspace := map[*corev1.PersistentVolumeClaimVolumeSource]string{}
+	claimTemplatesToWorkspace := map[*corev1.PersistentVolumeClaim]string{}
+
+	for _, w := range pr.Spec.Workspaces {
 		if w.PersistentVolumeClaim == nil && w.VolumeClaimTemplate == nil {
 			continue
 		}

-		var claimTemplates []corev1.PersistentVolumeClaim
-		var claims []corev1.PersistentVolumeClaimVolumeSource
 		if w.PersistentVolumeClaim != nil {
-			claims = append(claims, *w.PersistentVolumeClaim.DeepCopy())
+			claim := w.PersistentVolumeClaim.DeepCopy()
+			claims = append(claims, *claim)
+			claimToWorkspace[claim] = w.Name
 		} else if w.VolumeClaimTemplate != nil {
 			claimTemplate := w.VolumeClaimTemplate.DeepCopy()
 			// PVCs Will be created by Affinity Assistant StatefulSet and will follow the naming format `<claimTemplate.Name>-<AffinityAssistantName>-0`
 			claimTemplate.Name = volumeclaim.GetPVCNameWithoutAffinityAssistant(w.VolumeClaimTemplate.Name, w, *kmeta.NewControllerRef(pr))
 			claimTemplates = append(claimTemplates, *claimTemplate)
+			claimTemplatesToWorkspace[claimTemplate] = w.Name
 		}
+	}

+	switch aaBehavior {
+	case aa.AffinityAssistantPerWorkspace:
+		for claim, workspaceName := range claimToWorkspace {
+			aaName := getAffinityAssistantName(workspaceName, pr.Name)
+			err := c.createOrUpdateAffinityAssistant(ctx, aaName, pr, nil, []corev1.PersistentVolumeClaimVolumeSource{*claim}, unschedulableNodes)
+			errs = append(errs, err...)
+		}
+		for claimTemplate, workspaceName := range claimTemplatesToWorkspace {
+			aaName := getAffinityAssistantName(workspaceName, pr.Name)
+			err := c.createOrUpdateAffinityAssistant(ctx, aaName, pr, []corev1.PersistentVolumeClaim{*claimTemplate}, nil, unschedulableNodes)
+			errs = append(errs, err...)
+		}
+	case aa.AffinityAssistantPerPipelineRun, aa.AffinityAssistantPerPipelineRunWithIsolation:
+		if claims != nil || claimTemplates != nil {
+			aaName := getAffinityAssistantName("", pr.Name)
+			err := c.createOrUpdateAffinityAssistant(ctx, aaName, pr, claimTemplates, claims, unschedulableNodes)
+			errs = append(errs, err...)
+		}
+	case aa.AffinityAssistantDisabled:
+		return fmt.Errorf("unexpected Affinity Assistant behavior %v", aa.AffinityAssistantDisabled)
+	}
+
+	return errorutils.NewAggregate(errs)
+}

+// createOrUpdateAffinityAssistant creates an Affinity Assistant Statefulset with the provided affinityAssistantName and pipelinerun information.
+// The VolumeClaimTemplates and Volumes of StatefulSet reference the resolved claimTemplates and claims respectively.
+// It maintains a set of unschedulableNodes to detect and recreate Affinity Assistant in case of the node is cordoned to avoid pipelinerun deadlock.
+func (c *Reconciler) createOrUpdateAffinityAssistant(ctx context.Context, affinityAssistantName string, pr *v1.PipelineRun, claimTemplates []corev1.PersistentVolumeClaim, claims []corev1.PersistentVolumeClaimVolumeSource, unschedulableNodes sets.Set[string]) []error {
+	logger := logging.FromContext(ctx)
+	cfg := config.FromContextOrDefaults(ctx)

-		affinityAssistantName := getAffinityAssistantName(w.Name, pr.Name)
-		a, err := c.KubeClientSet.AppsV1().StatefulSets(namespace).Get(ctx, affinityAssistantName, metav1.GetOptions{})
-		switch {
-		// check whether the affinity assistant (StatefulSet) exists or not, create one if it does not exist
-		case apierrors.IsNotFound(err):
-			affinityAssistantStatefulSet := affinityAssistantStatefulSet(affinityAssistantName, pr, claimTemplates, claims, c.Images.NopImage, cfg.Defaults.DefaultAAPodTemplate)
-			_, err := c.KubeClientSet.AppsV1().StatefulSets(namespace).Create(ctx, affinityAssistantStatefulSet, metav1.CreateOptions{})
-			if err != nil {
-				errs = append(errs, fmt.Errorf("failed to create StatefulSet %s: %w", affinityAssistantName, err))
-			}
-			if err == nil {
-				logger.Infof("Created StatefulSet %s in namespace %s", affinityAssistantName, namespace)
-			}
-		// check whether the affinity assistant (StatefulSet) exists and the affinity assistant pod is created
-		// this check requires the StatefulSet to have the readyReplicas set to 1 to allow for any delay between the StatefulSet creation
-		// and the necessary pod creation, the delay can be caused by any dependency on PVCs and PVs creation
-		// this case addresses issues specified in https://github.com/tektoncd/pipeline/issues/6586
-		case err == nil && a != nil && a.Status.ReadyReplicas == 1:
-			if unschedulableNodes == nil {
-				ns, err := c.KubeClientSet.CoreV1().Nodes().List(ctx, metav1.ListOptions{
-					FieldSelector: "spec.unschedulable=true",
-				})
-				if err != nil {
-					errs = append(errs, fmt.Errorf("could not get the list of nodes, err: %w", err))
-				}
-				unschedulableNodes = sets.Set[string]{}
-				// maintain the list of nodes which are unschedulable
-				for _, n := range ns.Items {
-					unschedulableNodes.Insert(n.Name)
-				}
-			}
-			if unschedulableNodes.Len() > 0 {
-				// get the pod created for a given StatefulSet, pod is assigned ordinal of 0 with the replicas set to 1
-				p, err := c.KubeClientSet.CoreV1().Pods(pr.Namespace).Get(ctx, a.Name+"-0", metav1.GetOptions{})
-				// ignore instead of failing if the affinity assistant pod was not found
-				if err != nil && !apierrors.IsNotFound(err) {
-					errs = append(errs, fmt.Errorf("could not get the affinity assistant pod for StatefulSet %s: %w", a.Name, err))
-				}
-				// check the node which hosts the affinity assistant pod if it is unschedulable or cordoned
-				if p != nil && unschedulableNodes.Has(p.Spec.NodeName) {
-					// if the node is unschedulable, delete the affinity assistant pod such that a StatefulSet can recreate the same pod on a different node
-					err = c.KubeClientSet.CoreV1().Pods(p.Namespace).Delete(ctx, p.Name, metav1.DeleteOptions{})
-					if err != nil {
-						errs = append(errs, fmt.Errorf("error deleting affinity assistant pod %s in ns %s: %w", p.Name, p.Namespace, err))
-					}
-				}
-			}
-		case err != nil:
-			errs = append(errs, fmt.Errorf("failed to retrieve StatefulSet %s: %w", affinityAssistantName, err))
-		}
-	}
-	return errorutils.NewAggregate(errs)
+	var errs []error
+	a, err := c.KubeClientSet.AppsV1().StatefulSets(pr.Namespace).Get(ctx, affinityAssistantName, metav1.GetOptions{})
+	switch {
+	// check whether the affinity assistant (StatefulSet) exists or not, create one if it does not exist
+	case apierrors.IsNotFound(err):
+		affinityAssistantStatefulSet := affinityAssistantStatefulSet(affinityAssistantName, pr, claimTemplates, claims, c.Images.NopImage, cfg.Defaults.DefaultAAPodTemplate)
+		_, err := c.KubeClientSet.AppsV1().StatefulSets(pr.Namespace).Create(ctx, affinityAssistantStatefulSet, metav1.CreateOptions{})
+		if err != nil {
+			errs = append(errs, fmt.Errorf("failed to create StatefulSet %s: %w", affinityAssistantName, err))
+		}
+		if err == nil {
+			logger.Infof("Created StatefulSet %s in namespace %s", affinityAssistantName, pr.Namespace)
+		}
+	// check whether the affinity assistant (StatefulSet) exists and the affinity assistant pod is created
+	// this check requires the StatefulSet to have the readyReplicas set to 1 to allow for any delay between the StatefulSet creation
+	// and the necessary pod creation, the delay can be caused by any dependency on PVCs and PVs creation
+	// this case addresses issues specified in https://github.com/tektoncd/pipeline/issues/6586
+	case err == nil && a != nil && a.Status.ReadyReplicas == 1:
+		if unschedulableNodes == nil {
+			ns, err := c.KubeClientSet.CoreV1().Nodes().List(ctx, metav1.ListOptions{
+				FieldSelector: "spec.unschedulable=true",
+			})
+			if err != nil {
+				errs = append(errs, fmt.Errorf("could not get the list of nodes, err: %w", err))
+			}
+			unschedulableNodes = sets.Set[string]{}
+			// maintain the list of nodes which are unschedulable
+			for _, n := range ns.Items {
+				unschedulableNodes.Insert(n.Name)
+			}
+		}
+		if unschedulableNodes.Len() > 0 {
+			// get the pod created for a given StatefulSet, pod is assigned ordinal of 0 with the replicas set to 1
+			p, err := c.KubeClientSet.CoreV1().Pods(pr.Namespace).Get(ctx, a.Name+"-0", metav1.GetOptions{})
+			// ignore instead of failing if the affinity assistant pod was not found
+			if err != nil && !apierrors.IsNotFound(err) {
+				errs = append(errs, fmt.Errorf("could not get the affinity assistant pod for StatefulSet %s: %w", a.Name, err))
+			}
+			// check the node which hosts the affinity assistant pod if it is unschedulable or cordoned
+			if p != nil && unschedulableNodes.Has(p.Spec.NodeName) {
+				// if the node is unschedulable, delete the affinity assistant pod such that a StatefulSet can recreate the same pod on a different node
+				err = c.KubeClientSet.CoreV1().Pods(p.Namespace).Delete(ctx, p.Name, metav1.DeleteOptions{})
+				if err != nil {
+					errs = append(errs, fmt.Errorf("error deleting affinity assistant pod %s in ns %s: %w", p.Name, p.Namespace, err))
+				}
+			}
+		}
+	case err != nil:
+		errs = append(errs, fmt.Errorf("failed to retrieve StatefulSet %s: %w", affinityAssistantName, err))
+	}
+
+	return errs
 }

+// TODO(#6740)(WIP) implement cleanupAffinityAssistants for AffinityAssistantPerPipelineRun and AffinityAssistantPerPipelineRunWithIsolation affinity assistant modes
 func (c *Reconciler) cleanupAffinityAssistants(ctx context.Context, pr *v1.PipelineRun) error {
 	// omit cleanup if the feature is disabled
 	if c.isAffinityAssistantDisabled(ctx) {