Skip to content

Commit

Permalink
(apache#525)[operator] service creation refinement
Browse files Browse the repository at this point in the history
  • Loading branch information
advancedxy committed Feb 6, 2023
1 parent e1a1567 commit dede8d6
Show file tree
Hide file tree
Showing 7 changed files with 117 additions and 119 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -96,9 +96,11 @@ type CoordinatorConfig struct {
ExcludeNodesFilePath string `json:"excludeNodesFilePath,omitempty"`

// RPCNodePort defines rpc port of node port service used for coordinators' external access.
// +optional
RPCNodePort []int32 `json:"rpcNodePort"`

// HTTPNodePort defines http port of node port service used for coordinators' external access.
// +optional
HTTPNodePort []int32 `json:"httpNodePort"`
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -3271,9 +3271,7 @@ spec:
type: string
required:
- configDir
- httpNodePort
- image
- rpcNodePort
- xmxSize
type: object
shuffleServer:
Expand Down
11 changes: 6 additions & 5 deletions deploy/kubernetes/operator/pkg/controller/controller/rss.go
Original file line number Diff line number Diff line change
Expand Up @@ -618,16 +618,17 @@ func (r *rssController) syncShuffleServer(rss *unifflev1alpha1.RemoteShuffleServ
if rss.Status.Phase == unifflev1alpha1.RSSRunning && !*rss.Spec.ShuffleServer.Sync {
return nil
}
serviceAccount, services, statefulSet := shuffleserver.GenerateShuffleServers(rss)
// we don't need to generate svc for shuffle servers:
// shuffle servers are access directly through coordinator's shuffler assignments. service for shuffle server is
// pointless. For spark apps running in the cluster, executor containers could access shuffler server via container
// network(overlay or host network). If shuffle servers should be exposed to external, host network should be used
// and external executor should access the host node ip:port directly.
serviceAccount, statefulSet := shuffleserver.GenerateShuffleServers(rss)
if err := kubeutil.SyncServiceAccount(r.kubeClient, serviceAccount); err != nil {
klog.Errorf("sync SA (%v) for rss (%v) failed: %v",
utils.UniqueName(serviceAccount), utils.UniqueName(rss), err)
return err
}
if err := kubeutil.SyncServices(r.kubeClient, services); err != nil {
klog.Errorf("sync SVCs for rss (%v) failed: %v", utils.UniqueName(rss), err)
return err
}
if _, _, err := kubeutil.SyncStatefulSet(r.kubeClient, statefulSet, true); err != nil {
klog.Errorf("sync StatefulSet for rss (%v) failed: %v", utils.UniqueName(rss), err)
return err
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -52,13 +52,18 @@ func GenerateCoordinators(rss *unifflev1alpha1.RemoteShuffleService) (
sa := GenerateSA(rss)
cm := GenerateCM(rss)
count := *rss.Spec.Coordinator.Count
services := make([]*corev1.Service, count)
services := make([]*corev1.Service, 0)
deployments := make([]*appsv1.Deployment, count)
for i := 0; i < int(count); i++ {
svc := GenerateSvc(rss, i)
// only generate svc when nodePorts are specified
if len(rss.Spec.Coordinator.RPCNodePort) > 0 {
svc := GenerateSvc(rss, i)
services = append(services, svc)
}
headlessSvc := GenerateHeadlessSvc(rss, i)
deploy := GenerateDeploy(rss, i)
services[i] = svc
deployments[i] = deploy
services = append(services, headlessSvc)
}
return sa, cm, services, deployments
}
Expand Down Expand Up @@ -93,7 +98,43 @@ func GenerateCM(rss *unifflev1alpha1.RemoteShuffleService) *corev1.ConfigMap {
return cm
}

// GenerateSvc generates service used by specific coordinator.
// GenerateHeadlessSvc generates a headless service for corresponding coordinator.
func GenerateHeadlessSvc(rss *unifflev1alpha1.RemoteShuffleService, index int) *corev1.Service {
name := GenerateNameByIndex(rss, index)
serviceName := appendHeadless(name)

svc := &corev1.Service{
ObjectMeta: metav1.ObjectMeta{
Name: serviceName,
Namespace: rss.Namespace,
},
Spec: corev1.ServiceSpec{
ClusterIP: corev1.ClusterIPNone,
Selector: map[string]string{
"app": name,
},
Ports: []corev1.ServicePort{
{
Name: "rpc",
Protocol: corev1.ProtocolTCP,
Port: controllerconstants.ContainerCoordinatorRPCPort,
TargetPort: intstr.FromInt(int(*rss.Spec.Coordinator.RPCPort)),
},
{
Name: "http",
Protocol: corev1.ProtocolTCP,
Port: controllerconstants.ContainerCoordinatorHTTPPort,
TargetPort: intstr.FromInt(int(*rss.Spec.Coordinator.HTTPPort)),
},
},
},
}
util.AddOwnerReference(&svc.ObjectMeta, rss)
return svc
}

// GenerateSvc generates NodePort service used by specific coordinator. If no RPCNodePort/HTTPNodePort is specified,
// this function is skipped.
func GenerateSvc(rss *unifflev1alpha1.RemoteShuffleService, index int) *corev1.Service {
name := GenerateNameByIndex(rss, index)
svc := &corev1.Service{
Expand Down Expand Up @@ -223,12 +264,17 @@ func GenerateNameByIndex(rss *unifflev1alpha1.RemoteShuffleService, index int) s
return fmt.Sprintf("%v-%v-%v", constants.RSSCoordinator, rss.Name, index)
}

func appendHeadless(name string) string {
return name + "-headless"
}

// GenerateAddresses returns addresses of coordinators accessed by shuffle servers.
func GenerateAddresses(rss *unifflev1alpha1.RemoteShuffleService) string {
var names []string
for i := 0; i < int(*rss.Spec.Coordinator.Count); i++ {
current := fmt.Sprintf("%v:%v", GenerateNameByIndex(rss, i),
controllerconstants.ContainerShuffleServerRPCPort)
name := GenerateNameByIndex(rss, i)
serviceName := appendHeadless(name)
current := fmt.Sprintf("%v:%v", serviceName, controllerconstants.ContainerShuffleServerRPCPort)
names = append(names, current)
}
return strings.Join(names, ",")
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,9 +21,11 @@ import (
"fmt"
"testing"

"github.com/stretchr/testify/assert"
appsv1 "k8s.io/api/apps/v1"
v1 "k8s.io/api/core/v1"

unifflev1alpha1 "github.com/apache/incubator-uniffle/deploy/kubernetes/operator/api/uniffle/v1alpha1"
uniffleapi "github.com/apache/incubator-uniffle/deploy/kubernetes/operator/api/uniffle/v1alpha1"
"github.com/apache/incubator-uniffle/deploy/kubernetes/operator/pkg/utils"
)

Expand All @@ -36,7 +38,7 @@ var commonLabels = map[string]string{
"key3": "value3",
}

func buildRssWithLabels() *unifflev1alpha1.RemoteShuffleService {
func buildRssWithLabels() *uniffleapi.RemoteShuffleService {
rss := utils.BuildRSSWithDefaultValue()
rss.Spec.Coordinator.Labels = commonLabels
return rss
Expand All @@ -45,7 +47,7 @@ func buildRssWithLabels() *unifflev1alpha1.RemoteShuffleService {
func TestGenerateDeploy(t *testing.T) {
for _, tt := range []struct {
name string
rss *unifflev1alpha1.RemoteShuffleService
rss *uniffleapi.RemoteShuffleService
IsValidDeploy
}{
{
Expand Down Expand Up @@ -88,3 +90,49 @@ func TestGenerateDeploy(t *testing.T) {
})
}
}

func TestGenerateSvcForCoordinator(t *testing.T) {
for _, tt := range []struct {
name string
rss *uniffleapi.RemoteShuffleService
serviceCntMap map[v1.ServiceType]int
}{
{
name: "with RPCNodePort",
rss: buildRssWithLabels(),
serviceCntMap: map[v1.ServiceType]int{
"": 2, // defaults to headless service
v1.ServiceTypeNodePort: 2,
},
},
{
name: "without rpcNodePort",
rss: func() *uniffleapi.RemoteShuffleService {
withoutRPCNodePortRss := buildRssWithLabels()
withoutRPCNodePortRss.Spec.Coordinator.RPCNodePort = make([]int32, 0)
withoutRPCNodePortRss.Spec.Coordinator.HTTPNodePort = make([]int32, 0)
return withoutRPCNodePortRss
}(),
serviceCntMap: map[v1.ServiceType]int{
"": 2,
},
},
} {
t.Run(tt.name, func(t *testing.T) {
assertion := assert.New(t)
_, _, services, _ := GenerateCoordinators(tt.rss)
result := make(map[v1.ServiceType]int)
for _, service := range services {
result[service.Spec.Type]++
}
assertion.Equal(tt.serviceCntMap, result)
})
}
}

func TestGenerateAddresses(t *testing.T) {
assertion := assert.New(t)
rss := buildRssWithLabels()
quorum := GenerateAddresses(rss)
assertion.Contains(quorum, "headless")
}
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,6 @@ import (
appsv1 "k8s.io/api/apps/v1"
corev1 "k8s.io/api/core/v1"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/util/intstr"
"k8s.io/utils/pointer"

unifflev1alpha1 "github.com/apache/incubator-uniffle/deploy/kubernetes/operator/api/uniffle/v1alpha1"
Expand All @@ -38,18 +37,10 @@ import (
)

// GenerateShuffleServers generates objects related to shuffle servers.
func GenerateShuffleServers(rss *unifflev1alpha1.RemoteShuffleService) (
*corev1.ServiceAccount, []*corev1.Service, *appsv1.StatefulSet) {
func GenerateShuffleServers(rss *unifflev1alpha1.RemoteShuffleService) (*corev1.ServiceAccount, *appsv1.StatefulSet) {
sa := GenerateSA(rss)
var services []*corev1.Service
if needGenerateHeadlessSVC(rss) {
services = append(services, GenerateHeadlessSVC(rss))
}
if needGenerateNodePortSVC(rss) {
services = append(services, GenerateNodePortSVC(rss))
}
sts := GenerateSts(rss)
return sa, services, sts
return sa, sts
}

// GenerateSA generates service account of shuffle servers.
Expand All @@ -64,76 +55,6 @@ func GenerateSA(rss *unifflev1alpha1.RemoteShuffleService) *corev1.ServiceAccoun
return sa
}

// GenerateHeadlessSVC generates headless service used by shuffle servers.
func GenerateHeadlessSVC(rss *unifflev1alpha1.RemoteShuffleService) *corev1.Service {
name := generateHeadlessSVCName(rss)
svc := &corev1.Service{
ObjectMeta: metav1.ObjectMeta{
Name: name,
Namespace: rss.Namespace,
},
Spec: corev1.ServiceSpec{
ClusterIP: corev1.ClusterIPNone,
Selector: map[string]string{
"app": GenerateName(rss),
},
},
}
if rss.Spec.ShuffleServer.RPCPort != nil {
svc.Spec.Ports = append(svc.Spec.Ports, corev1.ServicePort{
Name: "rpc",
Protocol: corev1.ProtocolTCP,
Port: controllerconstants.ContainerShuffleServerRPCPort,
TargetPort: intstr.FromInt(int(*rss.Spec.ShuffleServer.RPCPort)),
})
}
if rss.Spec.ShuffleServer.HTTPPort != nil {
svc.Spec.Ports = append(svc.Spec.Ports, corev1.ServicePort{
Name: "http",
Protocol: corev1.ProtocolTCP,
Port: controllerconstants.ContainerShuffleServerHTTPPort,
TargetPort: intstr.FromInt(int(*rss.Spec.ShuffleServer.HTTPPort)),
})
}
util.AddOwnerReference(&svc.ObjectMeta, rss)
return svc
}

// GenerateNodePortSVC generates nodePort service used by shuffle servers.
func GenerateNodePortSVC(rss *unifflev1alpha1.RemoteShuffleService) *corev1.Service {
name := GenerateName(rss)
svc := &corev1.Service{
ObjectMeta: metav1.ObjectMeta{
Name: name,
Namespace: rss.Namespace,
},
Spec: corev1.ServiceSpec{
Type: corev1.ServiceTypeNodePort,
Selector: map[string]string{
"app": name,
},
},
}
if needNodePortForRPC(rss) {
svc.Spec.Ports = append(svc.Spec.Ports, corev1.ServicePort{
Protocol: corev1.ProtocolTCP,
Port: controllerconstants.ContainerShuffleServerRPCPort,
TargetPort: intstr.FromInt(int(*rss.Spec.ShuffleServer.RPCPort)),
NodePort: *rss.Spec.ShuffleServer.RPCNodePort,
})
}
if needNodePortForHTTP(rss) {
svc.Spec.Ports = append(svc.Spec.Ports, corev1.ServicePort{
Protocol: corev1.ProtocolTCP,
Port: controllerconstants.ContainerShuffleServerHTTPPort,
TargetPort: intstr.FromInt(int(*rss.Spec.ShuffleServer.HTTPPort)),
NodePort: *rss.Spec.ShuffleServer.HTTPNodePort,
})
}
util.AddOwnerReference(&svc.ObjectMeta, rss)
return svc
}

// getReplicas returns replicas of shuffle servers.
func getReplicas(rss *unifflev1alpha1.RemoteShuffleService) *int32 {
// TODO: we will support hpa for rss object,
Expand Down Expand Up @@ -332,23 +253,3 @@ func generateMainContainerENV(rss *unifflev1alpha1.RemoteShuffleService) []corev
},
}
}

// needGenerateNodePortSVC returns whether we need node port service for shuffle servers.
func needGenerateNodePortSVC(rss *unifflev1alpha1.RemoteShuffleService) bool {
return needNodePortForRPC(rss) || needNodePortForHTTP(rss)
}

// needGenerateHeadlessSVC returns whether we need headless service for shuffle servers.
func needGenerateHeadlessSVC(rss *unifflev1alpha1.RemoteShuffleService) bool {
return rss.Spec.ShuffleServer.RPCPort != nil || rss.Spec.ShuffleServer.HTTPPort != nil
}

// needNodePortForRPC returns whether we need node port service for rpc service of shuffle servers.
func needNodePortForRPC(rss *unifflev1alpha1.RemoteShuffleService) bool {
return rss.Spec.ShuffleServer.RPCPort != nil && rss.Spec.ShuffleServer.RPCNodePort != nil
}

// needNodePortForRPC returns whether we need node port service for http service of shuffle servers.
func needNodePortForHTTP(rss *unifflev1alpha1.RemoteShuffleService) bool {
return rss.Spec.ShuffleServer.HTTPPort != nil && rss.Spec.ShuffleServer.HTTPNodePort != nil
}
6 changes: 4 additions & 2 deletions deploy/kubernetes/operator/pkg/webhook/inspector/rss.go
Original file line number Diff line number Diff line change
Expand Up @@ -148,8 +148,10 @@ func generateRSSPatches(ar *admissionv1.AdmissionReview,

// validateCoordinator validates configurations for coordinators.
func validateCoordinator(coordinator *unifflev1alpha1.CoordinatorConfig) error {
if len(coordinator.RPCNodePort) != int(*coordinator.Count) ||
len(coordinator.HTTPNodePort) != int(*coordinator.Count) {
// number of RPCNodePort must equal with number of HTTPNodePort
if len(coordinator.RPCNodePort) != len(coordinator.HTTPNodePort) ||
// RPCNodePort/HTTPNodePort could be zero
(len(coordinator.HTTPNodePort) > 0 && len(coordinator.HTTPNodePort) != int(*coordinator.Count)) {
return fmt.Errorf("invalid number of http or rpc node ports (%v/%v) <> (%v)",
len(coordinator.RPCNodePort), len(coordinator.HTTPNodePort), *coordinator.Count)
}
Expand Down

0 comments on commit dede8d6

Please sign in to comment.