Skip to content

Commit

Permalink
[#545][FOLLOWUP]feat(operator): support specifying custom affinity & …
Browse files Browse the repository at this point in the history
…tolerations (#641)

### What changes were proposed in this pull request?
Support specifying coordinator/shuffler server's affinity and tolerations fields for RSS spec.  

### Why are the changes needed?
1. Production environment deployment needs to use.
2. In order to provider flexible scheduling semantics

This is a followup of #545 

### Does this PR introduce _any_ user-facing change?
For RSS cluster admin, they can set custom affinity for shuffle servers and coordinators.

### How was this patch tested?
Added UTs and manually verified.

![image](https://user-images.githubusercontent.com/45311215/220524164-4d21758a-9f5e-4285-b455-2546058fe359.png)
  • Loading branch information
crain-cn authored Feb 22, 2023
1 parent 8b347a2 commit b3a10d3
Show file tree
Hide file tree
Showing 7 changed files with 2,329 additions and 320 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -218,6 +218,14 @@ type RSSPodSpec struct {
// Selector which must match a node's labels for the pod to be scheduled on that node.
// +optional
NodeSelector map[string]string `json:"nodeSelector,omitempty"`

// Tolerations indicates the tolerations the pods under this subset have.
// +optional
Tolerations []corev1.Toleration `json:"tolerations,omitempty"`

// Affinity is a group of affinity scheduling rules.
// +optional
Affinity *corev1.Affinity `json:"affinity,omitempty"`
}

// MainContainer stores information of the main container of coordinators or shuffle servers,
Expand Down

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

Large diffs are not rendered by default.

Original file line number Diff line number Diff line change
Expand Up @@ -178,14 +178,10 @@ func GenerateDeploy(rss *unifflev1alpha1.RemoteShuffleService, index int) *appsv
podSpec := corev1.PodSpec{
HostNetwork: *rss.Spec.Coordinator.HostNetwork,
ServiceAccountName: utils.GenerateCoordinatorName(rss),
Tolerations: []corev1.Toleration{
{
Effect: corev1.TaintEffectNoSchedule,
Key: "node-role.kubernetes.io/master",
},
},
Volumes: rss.Spec.Coordinator.Volumes,
NodeSelector: rss.Spec.Coordinator.NodeSelector,
Tolerations: rss.Spec.Coordinator.Tolerations,
Volumes: rss.Spec.Coordinator.Volumes,
NodeSelector: rss.Spec.Coordinator.NodeSelector,
Affinity: rss.Spec.Coordinator.Affinity,
}
configurationVolume := corev1.Volume{
Name: controllerconstants.ConfigurationVolumeName,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -81,6 +81,53 @@ var (
},
},
}
testTolerationName = "test-toleration"
testTolerations = []corev1.Toleration{
{
Key: testTolerationName,
Operator: corev1.TolerationOperator("In"),
Value: "",
Effect: corev1.TaintEffectNoSchedule,
},
}
testAffinity = &corev1.Affinity{
NodeAffinity: &corev1.NodeAffinity{
RequiredDuringSchedulingIgnoredDuringExecution: &corev1.NodeSelector{
NodeSelectorTerms: []corev1.NodeSelectorTerm{
{
MatchExpressions: []corev1.NodeSelectorRequirement{
{
Key: "key2",
Operator: corev1.NodeSelectorOpIn,
Values: []string{"value1", "value2"},
},
},
MatchFields: []corev1.NodeSelectorRequirement{
{
Key: "metadata.name",
Operator: corev1.NodeSelectorOpIn,
Values: []string{"host1"},
},
},
},
},
},
PreferredDuringSchedulingIgnoredDuringExecution: []corev1.PreferredSchedulingTerm{
{
Weight: 10,
Preference: corev1.NodeSelectorTerm{
MatchExpressions: []corev1.NodeSelectorRequirement{
{
Key: "foo",
Operator: corev1.NodeSelectorOpIn,
Values: []string{"bar"},
},
},
},
},
},
},
}
)

func buildRssWithLabels() *uniffleapi.RemoteShuffleService {
Expand All @@ -107,6 +154,18 @@ func withCustomVolumes(volumes []corev1.Volume) *uniffleapi.RemoteShuffleService
return rss
}

func withCustomTolerations(tolerations []corev1.Toleration) *uniffleapi.RemoteShuffleService {
rss := utils.BuildRSSWithDefaultValue()
rss.Spec.Coordinator.Tolerations = tolerations
return rss
}

func withCustomAffinity(affinity *corev1.Affinity) *uniffleapi.RemoteShuffleService {
rss := utils.BuildRSSWithDefaultValue()
rss.Spec.Coordinator.Affinity = affinity
return rss
}

func buildRssWithCustomRPCPort() *uniffleapi.RemoteShuffleService {
rss := utils.BuildRSSWithDefaultValue()
rss.Spec.Coordinator.RPCPort = pointer.Int32(testRPCPort)
Expand Down Expand Up @@ -345,6 +404,38 @@ func TestGenerateDeploy(t *testing.T) {
return false, fmt.Errorf("generated deploy should include volume: %s", testVolumeName)
},
},
{
name: "set custom tolerations",
rss: withCustomTolerations(testTolerations),
IsValidDeploy: func(deploy *appsv1.Deployment, rss *uniffleapi.RemoteShuffleService) (bool, error) {
for _, toleration := range deploy.Spec.Template.Spec.Tolerations {
if toleration.Key == testTolerationName {
expectedToleration := testTolerations[0]
equal := reflect.DeepEqual(expectedToleration, toleration)
if equal {
return true, nil
}
tolerationJSON, _ := json.Marshal(expectedToleration)
return false, fmt.Errorf("generated deploy doesn't contain expected tolerations: %s", tolerationJSON)
}
}
return false, fmt.Errorf("generated deploy should include tolerations: %s", testTolerationName)
},
},
{
name: "set custom affinity",
rss: withCustomAffinity(testAffinity),
IsValidDeploy: func(deploy *appsv1.Deployment, rss *uniffleapi.RemoteShuffleService) (bool, error) {
if deploy.Spec.Template.Spec.Affinity != nil {
deploy.Spec.Template.Spec.Affinity = rss.Spec.Coordinator.Affinity
equal := reflect.DeepEqual(deploy.Spec.Template.Spec.Affinity, testAffinity)
if equal {
return true, nil
}
}
return false, fmt.Errorf("generated deploy should include affinity: %v", testAffinity)
},
},
} {
t.Run(tt.name, func(tc *testing.T) {
deploy := GenerateDeploy(tt.rss, 0)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -85,15 +85,12 @@ func GenerateSts(rss *unifflev1alpha1.RemoteShuffleService) *appsv1.StatefulSet
SecurityContext: rss.Spec.ShuffleServer.SecurityContext,
HostNetwork: *rss.Spec.ShuffleServer.HostNetwork,
ServiceAccountName: GenerateName(rss),
Tolerations: []corev1.Toleration{
{
Effect: corev1.TaintEffectNoSchedule,
Key: "node-role.kubernetes.io/master",
},
},
Volumes: rss.Spec.ShuffleServer.Volumes,
NodeSelector: rss.Spec.ShuffleServer.NodeSelector,
Tolerations: rss.Spec.ShuffleServer.Tolerations,
Volumes: rss.Spec.ShuffleServer.Volumes,
NodeSelector: rss.Spec.ShuffleServer.NodeSelector,
Affinity: rss.Spec.ShuffleServer.Affinity,
}

configurationVolume := corev1.Volume{
Name: controllerconstants.ConfigurationVolumeName,
VolumeSource: corev1.VolumeSource{
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -81,6 +81,53 @@ var (
},
},
}
testTolerationName = "test-toleration"
testTolerations = []corev1.Toleration{
{
Key: testTolerationName,
Operator: corev1.TolerationOperator("In"),
Value: "",
Effect: corev1.TaintEffectNoSchedule,
},
}
testAffinity = &corev1.Affinity{
NodeAffinity: &corev1.NodeAffinity{
RequiredDuringSchedulingIgnoredDuringExecution: &corev1.NodeSelector{
NodeSelectorTerms: []corev1.NodeSelectorTerm{
{
MatchExpressions: []corev1.NodeSelectorRequirement{
{
Key: "key2",
Operator: corev1.NodeSelectorOpIn,
Values: []string{"value1", "value2"},
},
},
MatchFields: []corev1.NodeSelectorRequirement{
{
Key: "metadata.name",
Operator: corev1.NodeSelectorOpIn,
Values: []string{"host1"},
},
},
},
},
},
PreferredDuringSchedulingIgnoredDuringExecution: []corev1.PreferredSchedulingTerm{
{
Weight: 10,
Preference: corev1.NodeSelectorTerm{
MatchExpressions: []corev1.NodeSelectorRequirement{
{
Key: "foo",
Operator: corev1.NodeSelectorOpIn,
Values: []string{"bar"},
},
},
},
},
},
},
}
)

func buildRssWithLabels() *uniffleapi.RemoteShuffleService {
Expand Down Expand Up @@ -124,6 +171,18 @@ func buildRssWithCustomHTTPPort() *uniffleapi.RemoteShuffleService {
return rss
}

func withCustomTolerations(tolerations []corev1.Toleration) *uniffleapi.RemoteShuffleService {
rss := utils.BuildRSSWithDefaultValue()
rss.Spec.ShuffleServer.Tolerations = tolerations
return rss
}

func withCustomAffinity(affinity *corev1.Affinity) *uniffleapi.RemoteShuffleService {
rss := utils.BuildRSSWithDefaultValue()
rss.Spec.ShuffleServer.Affinity = affinity
return rss
}

func buildCommonExpectedENVs(rss *uniffleapi.RemoteShuffleService) []corev1.EnvVar {
return []corev1.EnvVar{
{
Expand Down Expand Up @@ -352,6 +411,38 @@ func TestGenerateSts(t *testing.T) {
return false, fmt.Errorf("generated sts should include volume: %s", testVolumeName)
},
},
{
name: "test custom tolerations",
rss: withCustomTolerations(testTolerations),
IsValidSts: func(sts *appsv1.StatefulSet, rss *uniffleapi.RemoteShuffleService) (valid bool, err error) {
for _, toleration := range sts.Spec.Template.Spec.Tolerations {
if toleration.Key == testTolerationName {
expectedToleration := testTolerations[0]
equal := reflect.DeepEqual(expectedToleration, toleration)
if equal {
return true, nil
}
tolerationJSON, _ := json.Marshal(expectedToleration)
return false, fmt.Errorf("generated sts doesn't contain expected toleration: %s", tolerationJSON)
}
}
return false, fmt.Errorf("generated sts should include toleration: %s", testTolerationName)
},
},
{
name: "test custom affinity",
rss: withCustomAffinity(testAffinity),
IsValidSts: func(sts *appsv1.StatefulSet, rss *uniffleapi.RemoteShuffleService) (valid bool, err error) {
if sts.Spec.Template.Spec.Affinity != nil {
sts.Spec.Template.Spec.Affinity = rss.Spec.ShuffleServer.Affinity
equal := reflect.DeepEqual(sts.Spec.Template.Spec.Affinity, testAffinity)
if equal {
return true, nil
}
}
return false, fmt.Errorf("generated sts should include affinity: %v", testAffinity)
},
},
} {
t.Run(tt.name, func(tc *testing.T) {
sts := GenerateSts(tt.rss)
Expand Down

0 comments on commit b3a10d3

Please sign in to comment.