
Commit

fix e2e again
A targetFilter is added to WorkloadSpread so that it can manage only a part of the Pods owned by a target workload. It also adds support for workloads without replicas.

Signed-off-by: AiRanthem <zhongtianyun.zty@alibaba-inc.com>
AiRanthem committed Nov 28, 2024
1 parent 4661b6e commit 02df344
Showing 20 changed files with 1,190 additions and 269 deletions.
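To make the new API concrete, here is a minimal sketch of constructing a WorkloadSpread that uses the targetFilter introduced in this commit. The types and field names come from the diff below; the target workload kind, labels, and replicas path are hypothetical placeholders, not anything defined by Kruise.

```go
package example

import (
	appsv1alpha1 "github.com/openkruise/kruise/apis/apps/v1alpha1"
	metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
	"k8s.io/apimachinery/pkg/util/intstr"
)

// newDemoWorkloadSpread builds a WorkloadSpread that manages only the "worker"
// Pods of a hypothetical custom workload and reads their expected count from a
// non-standard replicas field. Sketch only; names and paths are illustrative.
func newDemoWorkloadSpread() *appsv1alpha1.WorkloadSpread {
	half := intstr.FromString("50%")
	return &appsv1alpha1.WorkloadSpread{
		ObjectMeta: metav1.ObjectMeta{Name: "demo-ws", Namespace: "default"},
		Spec: appsv1alpha1.WorkloadSpreadSpec{
			TargetReference: &appsv1alpha1.TargetReference{
				APIVersion: "apps.example.com/v1", // hypothetical custom workload
				Kind:       "MyWorkload",
				Name:       "demo",
			},
			TargetFilter: &appsv1alpha1.TargetFilter{
				// Manage only the Pods that match this selector.
				Selector: &metav1.LabelSelector{
					MatchLabels: map[string]string{"component": "worker"},
				},
				// Where the expected number of those Pods is declared on the workload.
				ReplicasPathList: []string{"spec.workerReplicas"},
			},
			Subsets: []appsv1alpha1.WorkloadSpreadSubset{
				{Name: "zone-a", MaxReplicas: &half},
				{Name: "zone-b"},
			},
		},
	}
}
```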
2 changes: 2 additions & 0 deletions .github/workflows/e2e-1.18.yaml
@@ -557,6 +557,8 @@ jobs:
- name: Install Kruise
run: |
set -ex
kubectl create ns kruise-system
kubectl apply -f test/kruise-e2e-config.yaml
kubectl cluster-info
IMG=openkruise/kruise-manager:e2e-${GITHUB_RUN_ID} ./scripts/deploy_kind.sh
NODES=$(kubectl get node | wc -l)
2 changes: 2 additions & 0 deletions .github/workflows/e2e-1.24.yaml
@@ -618,6 +618,8 @@ jobs:
- name: Install Kruise
run: |
set -ex
kubectl create ns kruise-system
kubectl apply -f test/kruise-e2e-config.yaml
kubectl cluster-info
IMG=openkruise/kruise-manager:e2e-${GITHUB_RUN_ID} ./scripts/deploy_kind.sh
NODES=$(kubectl get node | wc -l)
2 changes: 2 additions & 0 deletions .github/workflows/e2e-1.26.yaml
@@ -617,6 +617,8 @@ jobs:
- name: Install Kruise
run: |
set -ex
kubectl create ns kruise-system
kubectl apply -f test/kruise-e2e-config.yaml
kubectl cluster-info
IMG=openkruise/kruise-manager:e2e-${GITHUB_RUN_ID} ./scripts/deploy_kind.sh
NODES=$(kubectl get node | wc -l)
2 changes: 2 additions & 0 deletions .github/workflows/e2e-1.28.yaml
@@ -617,6 +617,8 @@ jobs:
- name: Install Kruise
run: |
set -ex
kubectl create ns kruise-system
kubectl apply -f test/kruise-e2e-config.yaml
kubectl cluster-info
IMG=openkruise/kruise-manager:e2e-${GITHUB_RUN_ID} ./scripts/deploy_kind.sh
NODES=$(kubectl get node | wc -l)
2 changes: 2 additions & 0 deletions Makefile
@@ -168,6 +168,8 @@ kube-load-image: $(tools/kind)
# install-kruise install kruise with local build image to kube cluster.
.PHONY: install-kruise
install-kruise:
kubectl create ns kruise-system
kubectl apply -f test/kruise-e2e-config.yaml
tools/hack/install-kruise.sh $(IMG)

# run-kruise-e2e-test starts to run kruise e2e tests.
11 changes: 11 additions & 0 deletions apis/apps/v1alpha1/workloadspread_types.go
@@ -28,6 +28,11 @@ type WorkloadSpreadSpec struct {
// TargetReference is the target workload that WorkloadSpread want to control.
TargetReference *TargetReference `json:"targetRef"`

// TargetFilter allows WorkloadSpread to manage only a portion of the Pods in the TargetReference:
// by specifying the criteria for the Pods to be managed through a label selector,
// and by specifying how to obtain the total number of these selected Pods from the workload using replicasPathList.
TargetFilter *TargetFilter `json:"targetFilter,omitempty"`

// Subsets describes the pods distribution details between each of subsets.
// +patchMergeKey=name
// +patchStrategy=merge
@@ -48,6 +53,12 @@ type TargetReference struct {
Name string `json:"name"`
}

// TargetFilter is used to filter the Pods managed by WorkloadSpread from the TargetReference.
type TargetFilter struct {
Selector *metav1.LabelSelector `json:"selector,omitempty"`
ReplicasPathList []string `json:"replicasPathList,omitempty"`
}

// WorkloadSpreadScheduleStrategyType is a string enumeration type that enumerates
// all possible schedule strategies for the WorkloadSpread controller.
// +kubebuilder:validation:Enum=Adaptive;Fixed;""
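Each entry in replicasPathList is expected to name an integer field on the target workload object (for example spec.replicas, or something workload-specific). As a rough illustration of how such a dot-separated path could be resolved against a workload fetched as unstructured data, here is a sketch. It is not the helper the controller actually uses (wsutil.GetReplicasFromObject in the controller diff further down); it only shows the idea, and the dot-separated path format is an assumption.

```go
package example

import (
	"strings"

	"k8s.io/apimachinery/pkg/apis/meta/v1/unstructured"
)

// replicasAtPath resolves a dot-separated field path such as
// "spec.workerReplicas" on a workload object and returns the integer stored
// there. A missing field is treated as zero rather than an error.
func replicasAtPath(workload *unstructured.Unstructured, path string) (int32, error) {
	fields := strings.Split(path, ".")
	value, found, err := unstructured.NestedInt64(workload.Object, fields...)
	if err != nil {
		return 0, err
	}
	if !found {
		return 0, nil
	}
	return int32(value), nil
}
```

When replicasPathList has several entries, the controller sums the values it resolves (see filterWorkload below), which is what lets a single WorkloadSpread cover workloads that spread their Pod count across multiple fields.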
30 changes: 30 additions & 0 deletions apis/apps/v1alpha1/zz_generated.deepcopy.go

Some generated files are not rendered by default.

58 changes: 58 additions & 0 deletions config/crd/bases/apps.kruise.io_workloadspreads.yaml
@@ -310,6 +310,64 @@ spec:
- name
type: object
type: array
targetFilter:
description: |-
TargetFilter allows WorkloadSpread to manage only a portion of the Pods in the TargetReference:
by specifying the criteria for the Pods to be managed through a label selector,
and by specifying how to obtain the total number of these selected Pods from the workload using replicasPathList.
properties:
replicasPathList:
items:
type: string
type: array
selector:
description: |-
A label selector is a label query over a set of resources. The result of matchLabels and
matchExpressions are ANDed. An empty label selector matches all objects. A null
label selector matches no objects.
properties:
matchExpressions:
description: matchExpressions is a list of label selector
requirements. The requirements are ANDed.
items:
description: |-
A label selector requirement is a selector that contains values, a key, and an operator that
relates the key and values.
properties:
key:
description: key is the label key that the selector
applies to.
type: string
operator:
description: |-
operator represents a key's relationship to a set of values.
Valid operators are In, NotIn, Exists and DoesNotExist.
type: string
values:
description: |-
values is an array of string values. If the operator is In or NotIn,
the values array must be non-empty. If the operator is Exists or DoesNotExist,
the values array must be empty. This array is replaced during a strategic
merge patch.
items:
type: string
type: array
required:
- key
- operator
type: object
type: array
matchLabels:
additionalProperties:
type: string
description: |-
matchLabels is a map of {key,value} pairs. A single {key,value} in the matchLabels
map is equivalent to an element of matchExpressions, whose key field is "key", the
operator is "In", and the values array contains only "value". The requirements are ANDed.
type: object
type: object
x-kubernetes-map-type: atomic
type: object
targetRef:
description: TargetReference is the target workload that WorkloadSpread
want to control.
116 changes: 101 additions & 15 deletions pkg/controller/workloadspread/workloadspread_controller.go
@@ -33,7 +33,9 @@ import (
"k8s.io/apimachinery/pkg/api/errors"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/fields"
"k8s.io/apimachinery/pkg/labels"
"k8s.io/apimachinery/pkg/runtime"
"k8s.io/apimachinery/pkg/runtime/schema"
"k8s.io/apimachinery/pkg/types"
"k8s.io/apimachinery/pkg/util/intstr"
"k8s.io/client-go/tools/record"
@@ -237,7 +239,7 @@ func (r *ReconcileWorkloadSpread) Reconcile(_ context.Context, req reconcile.Req
func (r *ReconcileWorkloadSpread) getPodJob(ref *appsv1alpha1.TargetReference, namespace string) ([]*corev1.Pod, int32, error) {
ok, err := wsutil.VerifyGroupKind(ref, controllerKindJob.Kind, []string{controllerKindJob.Group})
if err != nil || !ok {
return nil, -1, err
return nil, 0, err
}

job := &batchv1.Job{}
@@ -248,13 +250,13 @@ func (r *ReconcileWorkloadSpread) getPodJob(ref *appsv1alpha1.TargetReference, n
klog.V(3).InfoS("Could not find Job", "job", klog.KRef(namespace, ref.Name))
return nil, 0, nil
}
return nil, -1, err
return nil, 0, err
}

labelSelector, err := util.ValidatedLabelSelectorAsSelector(job.Spec.Selector)
if err != nil {
klog.ErrorS(err, "Failed to get labelSelector")
return nil, -1, nil
return nil, 0, err
}

podList := &corev1.PodList{}
@@ -265,7 +267,7 @@ func (r *ReconcileWorkloadSpread) getPodJob(ref *appsv1alpha1.TargetReference, n
}
err = r.List(context.TODO(), podList, listOption)
if err != nil {
return nil, -1, err
return nil, 0, err
}

matchedPods := make([]*corev1.Pod, 0, len(podList.Items))
@@ -275,15 +277,35 @@ func (r *ReconcileWorkloadSpread) getPodJob(ref *appsv1alpha1.TargetReference, n
return matchedPods, *(job.Spec.Parallelism), nil
}

func (r *ReconcileWorkloadSpread) getReplicasPathList(ws *appsv1alpha1.WorkloadSpread) ([]string, error) {
if ws.Spec.TargetReference == nil {
return nil, nil
}
if ws.Spec.TargetFilter != nil && len(ws.Spec.TargetFilter.ReplicasPathList) > 0 {
return ws.Spec.TargetFilter.ReplicasPathList, nil
}
whiteList, err := configuration.GetWSWatchCustomWorkloadWhiteList(r.Client)
if err != nil {
return nil, err
}
gv, err := schema.ParseGroupVersion(ws.Spec.TargetReference.APIVersion)
if err != nil {
return nil, err
}
for _, wl := range whiteList.Workloads {
if wl.GroupVersion() != gv || wl.GroupVersionKind.Kind != ws.Spec.TargetReference.Kind {
continue
}
return []string{wl.ReplicasPath}, nil
}
return nil, nil
}

// getPodsForWorkloadSpread returns Pods managed by the WorkloadSpread object.
// It returns two values:
// 1. the podList for the WorkloadSpread
// 2. workloadReplicas
func (r *ReconcileWorkloadSpread) getPodsForWorkloadSpread(ws *appsv1alpha1.WorkloadSpread) ([]*corev1.Pod, int32, error) {
if ws.Spec.TargetReference == nil {
return nil, -1, nil
}

var pods []*corev1.Pod
var workloadReplicas int32
var err error
@@ -294,28 +316,92 @@ func (r *ReconcileWorkloadSpread) getPodsForWorkloadSpread(ws *appsv1alpha1.Work
pods, workloadReplicas, err = r.getPodJob(targetRef, ws.Namespace)
default:
pods, workloadReplicas, err = r.controllerFinder.GetPodsForRef(targetRef.APIVersion, targetRef.Kind, ws.Namespace, targetRef.Name, false)
if err != nil {
break
}
workloadReplicas, pods, err = r.filterWorkload(ws, pods, workloadReplicas)
}

if err != nil {
klog.ErrorS(err, "WorkloadSpread handled targetReference failed", "workloadSpread", klog.KObj(ws))
return nil, -1, err
return nil, 0, err
}

if ws.Spec.TargetFilter != nil {
var filteredPods []*corev1.Pod
selector, err := util.ValidatedLabelSelectorAsSelector(ws.Spec.TargetFilter.Selector)
if err != nil {
return nil, 0, err
}
for _, pod := range pods {
if selector.Matches(labels.Set(pod.GetLabels())) {
filteredPods = append(filteredPods, pod)
}
}
pods = filteredPods
}

return pods, workloadReplicas, err
}

func (r *ReconcileWorkloadSpread) filterWorkload(ws *appsv1alpha1.WorkloadSpread, pods []*corev1.Pod, replicas int32) (int32, []*corev1.Pod, error) {
replicasPathList, err := r.getReplicasPathList(ws)
if err != nil {
return replicas, pods, err
}
var filteredReplicas int32
if len(replicasPathList) > 0 {
// a replicas path list is configured (via targetFilter or the watch whitelist), so it overrides the workload's replicas value
targetRef := ws.Spec.TargetReference
wl, err := r.controllerFinder.GetControllerAsUnstructured(controllerfinder.ControllerReference{
APIVersion: targetRef.APIVersion,
Kind: targetRef.Kind,
Name: targetRef.Name,
}, ws.Namespace)
if err != nil {
return replicas, pods, client.IgnoreNotFound(err)
}
for _, replicasPath := range replicasPathList {
n, err := wsutil.GetReplicasFromObject(wl, replicasPath)
if err != nil {
return replicas, pods, err
}
filteredReplicas += n
}
} else {
filteredReplicas = replicas
}
var filteredPods []*corev1.Pod
if ws.Spec.TargetFilter != nil && ws.Spec.TargetFilter.Selector != nil {
for _, pod := range pods {
selected, err := wsutil.IsPodSelected(ws.Spec.TargetFilter, pod.Labels)
if err != nil {
return replicas, pods, err
}
if selected {
filteredPods = append(filteredPods, pod)
}
}
} else {
filteredPods = pods
}
return filteredReplicas, filteredPods, nil
}

// syncWorkloadSpread is the main logic of the WorkloadSpread controller. Firstly, we get the Pods of the workload managed by
// the WorkloadSpread and classify these Pods into their corresponding subsets. Secondly, we set the Pod deletion-cost annotation
// value by comparing the number of each subset's Pods with the subset's maxReplicas, and then we consider rescheduling failed Pods.
// Lastly, we update the WorkloadSpread's Status and clean up schedule-failed Pods. The controller collaborates with the webhook
// to maintain the WorkloadSpread status together: the controller is responsible for calculating the real status, while the webhook
// mainly counts missingReplicas and records Pod creation and deletion entries into the map.
func (r *ReconcileWorkloadSpread) syncWorkloadSpread(ws *appsv1alpha1.WorkloadSpread) error {
if ws.Spec.TargetReference == nil {
klog.InfoS("WorkloadSpread has no target reference")
return nil
}
pods, workloadReplicas, err := r.getPodsForWorkloadSpread(ws)
if err != nil || workloadReplicas == -1 {
if err != nil {
klog.ErrorS(err, "WorkloadSpread got matched pods failed", "workloadSpread", klog.KObj(ws))
}
if err != nil {
klog.ErrorS(err, "WorkloadSpread got matched pods failed", "workloadSpread", klog.KObj(ws))
return err
}
if len(pods) == 0 {
Expand Down Expand Up @@ -398,7 +484,7 @@ func (r *ReconcileWorkloadSpread) groupPodBySubset(ws *appsv1alpha1.WorkloadSpre
for _, subset := range ws.Spec.Subsets {
podMap[subset.Name] = []*corev1.Pod{}
subsetMissingReplicas[subset.Name], _ = intstr.GetScaledValueFromIntOrPercent(
intstr.ValueOrDefault(subset.MaxReplicas, intstr.FromInt(math.MaxInt32)), int(replicas), true)
intstr.ValueOrDefault(subset.MaxReplicas, intstr.FromInt32(math.MaxInt32)), int(replicas), true)
}

// count managed pods for each subset
@@ -649,7 +735,7 @@ func (r *ReconcileWorkloadSpread) calculateWorkloadSpreadSubsetStatus(ws *appsv1
// MaxReplicas is nil, which means there is no limit for subset replicas, using -1 to represent it.
subsetMaxReplicas = -1
} else {
subsetMaxReplicas, err = intstr.GetValueFromIntOrPercent(subset.MaxReplicas, int(workloadReplicas), true)
subsetMaxReplicas, err = intstr.GetScaledValueFromIntOrPercent(subset.MaxReplicas, int(workloadReplicas), true)
if err != nil || subsetMaxReplicas < 0 {
klog.ErrorS(err, "Failed to get maxReplicas value from subset of WorkloadSpread", "subsetName", subset.Name, "workloadSpread", klog.KObj(ws))
return nil
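Two small changes in the hunks above are easy to miss: intstr.FromInt is swapped for intstr.FromInt32, and GetValueFromIntOrPercent for GetScaledValueFromIntOrPercent. Both helpers turn a subset's maxReplicas, which may be an integer or a percentage, into an absolute Pod count relative to the (now possibly filtered) workload replica count. A quick sketch with made-up numbers:

```go
package example

import (
	"fmt"

	"k8s.io/apimachinery/pkg/util/intstr"
)

// resolveMaxReplicas shows how an int-or-percent maxReplicas is scaled against
// a workload's replica count. The numbers are illustrative only.
func resolveMaxReplicas() {
	replicas := 7 // e.g. the filtered replica count returned by filterWorkload

	percent := intstr.FromString("30%")
	absolute := intstr.FromInt32(4)

	fromPercent, _ := intstr.GetScaledValueFromIntOrPercent(&percent, replicas, true) // 30% of 7, rounded up -> 3
	fromInt, _ := intstr.GetScaledValueFromIntOrPercent(&absolute, replicas, true)    // plain integer -> 4

	fmt.Println(fromPercent, fromInt) // 3 4
}
```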