Skip to content

Commit

Permalink
Use Service ClusterIPs as MC Service endpoint
Browse files Browse the repository at this point in the history
Use Service ClusterIPs instead of Pod IP as MC Service Endpoint.
The ServiceExport controller will watch only watch ServiceExport and
Service events, wrap Services' ClusterIPs into a new Endpoint kind of
ResourceExport.

Signed-off-by: Lan Luo <luola@vmware.com>
  • Loading branch information
luolanzone committed Apr 27, 2022
1 parent 288d6ee commit be8750e
Show file tree
Hide file tree
Showing 6 changed files with 42 additions and 98 deletions.
2 changes: 1 addition & 1 deletion ci/jenkins/test-mc.sh
Original file line number Diff line number Diff line change
Expand Up @@ -230,7 +230,7 @@ function deliver_multicluster_controller {
export GOROOT=/usr/local/go
export PATH=${GOROOT}/bin:$PATH

docker images | grep 'mc-controller' | awk '{print $3}' | xargs -r docker rmi || true
docker images | grep 'mc-controller' | awk '{print $3}' | xargs -r docker rmi -f || true
export NO_PULL=1;make antrea-mc-controller

docker save "${DOCKER_REGISTRY}"/antrea/antrea-mc-controller:latest -o "${WORKDIR}"/antrea-mcs.tar
Expand Down
72 changes: 31 additions & 41 deletions multicluster/controllers/multicluster/serviceexport_controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,6 @@ package multicluster
import (
"context"
"reflect"
"sort"

corev1 "k8s.io/api/core/v1"
apiequality "k8s.io/apimachinery/pkg/api/equality"
Expand Down Expand Up @@ -75,7 +74,6 @@ type (
const (
// cached indexer
svcIndexerByType = "svc.type"
epIndexerByLabel = "ep.label"
)

type reason int
Expand All @@ -96,9 +94,7 @@ func NewServiceExportReconciler(
installedSvcs: cache.NewIndexer(svcInfoKeyFunc, cache.Indexers{
svcIndexerByType: svcIndexerByTypeFunc,
}),
installedEps: cache.NewIndexer(epInfoKeyFunc, cache.Indexers{
epIndexerByLabel: epIndexerByLabelFunc,
}),
installedEps: cache.NewIndexer(epInfoKeyFunc, cache.Indexers{}),
}
return reconciler
}
Expand All @@ -117,34 +113,17 @@ func epInfoKeyFunc(obj interface{}) (string, error) {
return common.NamespacedName(ep.namespace, ep.name), nil
}

func epIndexerByLabelFunc(obj interface{}) ([]string, error) {
var info []string
ep := obj.(*epInfo)
keys := make([]string, len(ep.labels))
i := 0
for k := range ep.labels {
keys[i] = k
i++
}
sort.Strings(keys)
for _, k := range keys {
info = append(info, k+ep.labels[k])
}
return info, nil
}

//+kubebuilder:rbac:groups=multicluster.crd.antrea.io,resources=resourceexports,verbs=get;list;watch;create;update;patch;delete
//+kubebuilder:rbac:groups=multicluster.crd.antrea.io,resources=resourceexports/status,verbs=get;update;patch
//+kubebuilder:rbac:groups=multicluster.crd.antrea.io,resources=resourceexports/finalizers,verbs=update
//+kubebuilder:rbac:groups=multicluster.x-k8s.io,resources=serviceexports,verbs=get;list;watch;create;update;patch;delete
//+kubebuilder:rbac:groups=multicluster.x-k8s.io,resources=serviceexports/status,verbs=get;update;patch
//+kubebuilder:rbac:groups="",resources=services,verbs=get;list;watch;update
//+kubebuilder:rbac:groups="",resources=endpoints,verbs=get;list;watch;update

// Reconcile is part of the main kubernetes reconciliation loop which aims to
// move the current state of the cluster closer to the desired state.
// For ServiceExport Reconcile, it watches events of ServiceExport resources,
// and also Endpoints/Services resource. It will create/update/remove ResourceExport
// and also Services resource. It will create/update/remove ResourceExport
// in a leader cluster for corresponding ServiceExport from a member cluster.
func (r *ServiceExportReconciler) Reconcile(ctx context.Context, req ctrl.Request) (ctrl.Result, error) {
klog.V(2).InfoS("Reconciling ServiceExport", "serviceexport", req.NamespacedName)
Expand Down Expand Up @@ -246,8 +225,8 @@ func (r *ServiceExportReconciler) Reconcile(ctx context.Context, req ctrl.Reques
}
}

// We also watch Service and Endpoints events via events mapping function.
// Need to check cache and compare with cache if there is any change for Service or Endpoints.
// We also watch Service events via events mapping function.
// Need to check cache and compare with cache if there is any change for Service.
var svcNoChange, epNoChange bool
svcExportNSName := common.NamespacedName(r.leaderNamespace, svcResExportName)
epExportNSName := common.NamespacedName(r.leaderNamespace, epResExportName)
Expand All @@ -267,20 +246,7 @@ func (r *ServiceExportReconciler) Reconcile(ctx context.Context, req ctrl.Reques
Namespace: req.Namespace,
},
}

err = r.Client.Get(ctx, req.NamespacedName, ep)
if err != nil {
klog.ErrorS(err, "Failed to get Endpoints", "endpoints", req.String())
if apierrors.IsNotFound(err) && epInstalled {
err = r.handleEndpointDeleteEvent(ctx, req, remoteCluster)
if err != nil {
return ctrl.Result{}, err
}
r.installedEps.Delete(epObj)
}
return ctrl.Result{}, client.IgnoreNotFound(err)
}

ep.Subsets = []corev1.EndpointSubset{getSubsets(svc)}
if epInstalled {
installedEp := epObj.(*epInfo)
if apiequality.Semantic.DeepEqual(getEndPointsPorts(ep), installedEp.ports) &&
Expand Down Expand Up @@ -449,7 +415,6 @@ func (r *ServiceExportReconciler) SetupWithManager(mgr ctrl.Manager) error {
return ctrl.NewControllerManagedBy(mgr).
For(&k8smcsv1alpha1.ServiceExport{}).
Watches(&source.Kind{Type: &corev1.Service{}}, handler.EnqueueRequestsFromMapFunc(objMapFunc)).
Watches(&source.Kind{Type: &corev1.Endpoints{}}, handler.EnqueueRequestsFromMapFunc(objMapFunc)).
WithOptions(controller.Options{
MaxConcurrentReconciles: common.DefaultWorkerCount,
}).
Expand Down Expand Up @@ -517,7 +482,6 @@ func (r *ServiceExportReconciler) endpointsHandler(
namespace: ep.Namespace,
addressIPs: getEndPointsAddress(ep),
ports: getEndPointsPorts(ep),
labels: ep.Labels,
}
r.refreshResourceExport(resName, kind, nil, ep, &re)
existResExport := &mcsv1alpha1.ResourceExport{}
Expand Down Expand Up @@ -621,3 +585,29 @@ func getResourceExportName(clusterID string, req ctrl.Request, kind string) stri
func getStringPointer(str string) *string {
return &str
}

func getSubsets(svc *corev1.Service) corev1.EndpointSubset {
var epSubset corev1.EndpointSubset
for _, ip := range svc.Spec.ClusterIPs {
epSubset.Addresses = append(epSubset.Addresses, corev1.EndpointAddress{IP: ip})
}

epSubset.Ports = portsConverter(svc.Spec.Ports)
return epSubset
}

// portsConverter will convert Service's port to EndpointPort
func portsConverter(ports []corev1.ServicePort) []corev1.EndpointPort {
if len(ports) == 0 {
return nil
}
var epPorts []corev1.EndpointPort
for _, p := range ports {
epPorts = append(epPorts, corev1.EndpointPort{
Name: p.Name,
Port: p.Port,
Protocol: p.Protocol,
})
}
return epPorts
}
Original file line number Diff line number Diff line change
Expand Up @@ -222,18 +222,9 @@ func TestServiceExportReconciler_handleServiceUpdateEvent(t *testing.T) {
ports: svcNginx.Spec.Ports,
svcType: string(svcNginx.Spec.Type),
}
epInfo := &epInfo{
name: epNginx.Name,
namespace: epNginx.Namespace,
addressIPs: getEndPointsAddress(epNginx),
ports: getEndPointsPorts(epNginx),
labels: epNginx.Labels,
}

newSvcNginx := svcNginx.DeepCopy()
newSvcNginx.Spec.Ports = []corev1.ServicePort{svcPort8080}
newEpNginx := epNginx.DeepCopy()
newEpNginx.Subsets[0].Ports = epPorts8080

re := mcsv1alpha1.ResourceExport{
ObjectMeta: metav1.ObjectMeta{
Expand All @@ -259,13 +250,12 @@ func TestServiceExportReconciler_handleServiceUpdateEvent(t *testing.T) {
existEpRe.Name = "cluster-a-default-nginx-endpoints"
existEpRe.Spec.Endpoints = &mcsv1alpha1.EndpointsExport{Subsets: epNginxSubset}

fakeClient := fake.NewClientBuilder().WithScheme(scheme).WithObjects(newSvcNginx, newEpNginx, existSvcExport).Build()
fakeClient := fake.NewClientBuilder().WithScheme(scheme).WithObjects(newSvcNginx, existSvcExport).Build()
fakeRemoteClient := fake.NewClientBuilder().WithScheme(scheme).WithObjects(existSvcRe, existEpRe).Build()

_ = commonarea.NewFakeRemoteCommonArea(scheme, &remoteMgr, fakeRemoteClient, "leader-cluster", "default")
r := NewServiceExportReconciler(fakeClient, scheme, &remoteMgr)
r.installedSvcs.Add(sinfo)
r.installedEps.Add(epInfo)
if _, err := r.Reconcile(ctx, nginxReq); err != nil {
t.Errorf("ServiceExport Reconciler should update ResourceExports but got error = %v", err)
} else {
Expand Down Expand Up @@ -296,7 +286,7 @@ func TestServiceExportReconciler_handleServiceUpdateEvent(t *testing.T) {
{
Addresses: []corev1.EndpointAddress{
{
IP: "192.168.17.11",
IP: "192.168.2.3",
},
},
Ports: epPorts8080,
Expand Down
6 changes: 2 additions & 4 deletions multicluster/test/e2e/main_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -97,18 +97,16 @@ func TestConnectivity(t *testing.T) {
}
defer teardownTest(t, data)

if testOptions.enableDataplane {
defer teardownForDataplane(t, data)
}
t.Run("testServiceExport", func(t *testing.T) {
if testOptions.enableDataplane {
defer teardownForDataplane(t, data)
initializeDataplane(t, data)
}
testServiceExport(t, data)
})

defer tearDownForPolicyTest()
t.Run("testAntreaPolicy", func(t *testing.T) {
defer tearDownForPolicyTest()
initializeForPolicyTest(t, data)
testMCAntreaPolicy(t, data)
})
Expand Down
4 changes: 3 additions & 1 deletion multicluster/test/e2e/service_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -151,7 +151,7 @@ func (data *MCTestData) verifyMCServiceACNP(t *testing.T, clientPodName, westIP
func (data *MCTestData) deployServiceExport(clusterName string) error {
var rc int
var err error
rc, _, _, err = provider.RunCommandOnNode(clusterName, fmt.Sprintf("sudo kubectl apply -f %s", serviceExportYML))
rc, _, _, err = provider.RunCommandOnNode(clusterName, fmt.Sprintf("kubectl apply -f %s", serviceExportYML))
if err != nil || rc != 0 {
return fmt.Errorf("error when deploying the ServiceExport: %v", err)
}
Expand Down Expand Up @@ -191,6 +191,7 @@ func (data *MCTestData) annotateGatewayNode(clusterName string, nodeName string)
var rc int
var err error
var stderr string
log.Infof("adding annotation for Node %s in cluster %s", nodeName, clusterName)
rc, _, stderr, err = provider.RunCommandOnNode(clusterName, fmt.Sprintf("kubectl annotate node %s multicluster.antrea.io/gateway=true", nodeName))
if err != nil || rc != 0 {
return fmt.Errorf("error when annotate the Node %s: %s, %v", nodeName, stderr, err)
Expand All @@ -201,6 +202,7 @@ func (data *MCTestData) annotateGatewayNode(clusterName string, nodeName string)
func (data *MCTestData) deleteAnnotation(clusterName string, nodeName string) error {
var rc int
var err error
log.Infof("cleaning up annotation for Node %s in cluster %s", nodeName, clusterName)
rc, _, _, err = provider.RunCommandOnNode(clusterName, fmt.Sprintf("kubectl annotate node %s multicluster.antrea.io/gateway-", nodeName))
if err != nil || rc != 0 {
return fmt.Errorf("error when cleaning up annotation of the Node: %v", err)
Expand Down
42 changes: 3 additions & 39 deletions multicluster/test/integration/serviceexport_controller_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -39,7 +39,8 @@ import (

var _ = Describe("ServiceExport controller", func() {
svcSpec := corev1.ServiceSpec{
Ports: svcPorts,
ClusterIP: "10.96.11.10",
Ports: svcPorts,
}

svc := &corev1.Service{
Expand All @@ -53,7 +54,6 @@ var _ = Describe("ServiceExport controller", func() {
Namespace: svc.Namespace,
Name: svc.Name,
}
epNamespacedName := svcNamespacedName

ep := &corev1.Endpoints{
ObjectMeta: metav1.ObjectMeta{
Expand Down Expand Up @@ -108,7 +108,7 @@ var _ = Describe("ServiceExport controller", func() {
{
Addresses: []corev1.EndpointAddress{
{
IP: "192.168.17.11",
IP: "10.96.11.10",
},
},
Ports: epPorts,
Expand Down Expand Up @@ -180,42 +180,6 @@ var _ = Describe("ServiceExport controller", func() {
Expect(*conditions[0].Message).Should(Equal("the Service does not exist"))
})

It("Should update existing ResourceExport when corresponding Endpoints has new Endpoint", func() {
By("By update an Endpoint with a new address")
latestEp := &corev1.Endpoints{}
Expect(k8sClient.Get(ctx, epNamespacedName, latestEp)).Should(Succeed())
addresses := latestEp.Subsets[0].Addresses
addresses = append(addresses, addr3)
latestEp.Subsets[0].Addresses = addresses
Expect(k8sClient.Update(ctx, latestEp)).Should(Succeed())
time.Sleep(2 * time.Second)
epResExport := &mcsv1alpha1.ResourceExport{}
expectedEpResExport.Spec.Endpoints = &mcsv1alpha1.EndpointsExport{
Subsets: []corev1.EndpointSubset{
{
Addresses: []corev1.EndpointAddress{
{
IP: "192.168.17.11",
},
{
IP: "192.168.17.13",
},
},
Ports: epPorts,
},
},
}

var err error
Eventually(func() bool {
err = k8sClient.Get(ctx, types.NamespacedName{Namespace: LeaderNamespace, Name: epResExportName}, epResExport)
return err == nil
}, timeout, interval).Should(BeTrue())
Expect(epResExport.ObjectMeta.Labels["sourceKind"]).Should(Equal("Endpoints"))
Expect(epResExport.Spec).Should(Equal(expectedEpResExport.Spec))

})

It("Should delete existing ResourceExport when existing ServiceExport is deleted", func() {
By("By remove a ServiceExport resource")
err := k8sClient.Delete(ctx, svcExport)
Expand Down

0 comments on commit be8750e

Please sign in to comment.