diff --git a/build/charts/antrea/templates/agent/clusterrole.yaml b/build/charts/antrea/templates/agent/clusterrole.yaml index ef7e7ea8da5..a82a865e2ec 100644 --- a/build/charts/antrea/templates/agent/clusterrole.yaml +++ b/build/charts/antrea/templates/agent/clusterrole.yaml @@ -42,13 +42,7 @@ rules: verbs: - get - watch - - list - - apiGroups: - - "" - resources: - - services/status - verbs: - - update + - list - apiGroups: - discovery.k8s.io resources: diff --git a/build/charts/antrea/templates/antctl/clusterrole.yaml b/build/charts/antrea/templates/antctl/clusterrole.yaml index 2b60e193731..4a6a7420b64 100644 --- a/build/charts/antrea/templates/antctl/clusterrole.yaml +++ b/build/charts/antrea/templates/antctl/clusterrole.yaml @@ -53,5 +53,6 @@ rules: - /ovstracing - /podinterfaces - /featuregates + - /serviceexternalip verbs: - get diff --git a/build/yamls/antrea-aks.yml b/build/yamls/antrea-aks.yml index e084345b48b..d9f6ed3fe5a 100644 --- a/build/yamls/antrea-aks.yml +++ b/build/yamls/antrea-aks.yml @@ -2538,13 +2538,7 @@ rules: verbs: - get - watch - - list - - apiGroups: - - "" - resources: - - services/status - verbs: - - update + - list - apiGroups: - discovery.k8s.io resources: @@ -2738,6 +2732,7 @@ rules: - /ovstracing - /podinterfaces - /featuregates + - /serviceexternalip verbs: - get --- diff --git a/build/yamls/antrea-eks.yml b/build/yamls/antrea-eks.yml index eecf6726355..25160d38bcc 100644 --- a/build/yamls/antrea-eks.yml +++ b/build/yamls/antrea-eks.yml @@ -2538,13 +2538,7 @@ rules: verbs: - get - watch - - list - - apiGroups: - - "" - resources: - - services/status - verbs: - - update + - list - apiGroups: - discovery.k8s.io resources: @@ -2738,6 +2732,7 @@ rules: - /ovstracing - /podinterfaces - /featuregates + - /serviceexternalip verbs: - get --- diff --git a/build/yamls/antrea-gke.yml b/build/yamls/antrea-gke.yml index f2a7182d03d..3ebd309d9ab 100644 --- a/build/yamls/antrea-gke.yml +++ b/build/yamls/antrea-gke.yml @@ -2538,13 +2538,7 @@ rules: verbs: - get - watch - - list - - apiGroups: - - "" - resources: - - services/status - verbs: - - update + - list - apiGroups: - discovery.k8s.io resources: @@ -2738,6 +2732,7 @@ rules: - /ovstracing - /podinterfaces - /featuregates + - /serviceexternalip verbs: - get --- diff --git a/build/yamls/antrea-ipsec.yml b/build/yamls/antrea-ipsec.yml index 3202ea4b687..f63d80cbf2f 100644 --- a/build/yamls/antrea-ipsec.yml +++ b/build/yamls/antrea-ipsec.yml @@ -2551,13 +2551,7 @@ rules: verbs: - get - watch - - list - - apiGroups: - - "" - resources: - - services/status - verbs: - - update + - list - apiGroups: - discovery.k8s.io resources: @@ -2751,6 +2745,7 @@ rules: - /ovstracing - /podinterfaces - /featuregates + - /serviceexternalip verbs: - get --- diff --git a/build/yamls/antrea.yml b/build/yamls/antrea.yml index 85f0de4bae3..5f7f1df6dd4 100644 --- a/build/yamls/antrea.yml +++ b/build/yamls/antrea.yml @@ -2538,13 +2538,7 @@ rules: verbs: - get - watch - - list - - apiGroups: - - "" - resources: - - services/status - verbs: - - update + - list - apiGroups: - discovery.k8s.io resources: @@ -2738,6 +2732,7 @@ rules: - /ovstracing - /podinterfaces - /featuregates + - /serviceexternalip verbs: - get --- diff --git a/cmd/antrea-agent/agent.go b/cmd/antrea-agent/agent.go index 0d3a8f8e797..68277f3dba2 100644 --- a/cmd/antrea-agent/agent.go +++ b/cmd/antrea-agent/agent.go @@ -610,6 +610,7 @@ func run(o *Options) error { apiServer, err := apiserver.New( agentQuerier, networkPolicyController, + externalIPController, o.config.APIPort, *o.config.EnablePrometheusMetrics, o.config.ClientConnection.Kubeconfig, diff --git a/pkg/agent/apiserver/apiserver.go b/pkg/agent/apiserver/apiserver.go index 763df749281..0862aeae431 100644 --- a/pkg/agent/apiserver/apiserver.go +++ b/pkg/agent/apiserver/apiserver.go @@ -40,6 +40,7 @@ import ( "antrea.io/antrea/pkg/agent/apiserver/handlers/ovsflows" "antrea.io/antrea/pkg/agent/apiserver/handlers/ovstracing" "antrea.io/antrea/pkg/agent/apiserver/handlers/podinterface" + "antrea.io/antrea/pkg/agent/apiserver/handlers/serviceexternalip" agentquerier "antrea.io/antrea/pkg/agent/querier" systeminstall "antrea.io/antrea/pkg/apis/system/install" systemv1beta1 "antrea.io/antrea/pkg/apis/system/v1beta1" @@ -72,7 +73,7 @@ func (s *agentAPIServer) Run(stopCh <-chan struct{}) error { return s.GenericAPIServer.PrepareRun().Run(stopCh) } -func installHandlers(aq agentquerier.AgentQuerier, npq querier.AgentNetworkPolicyInfoQuerier, s *genericapiserver.GenericAPIServer) { +func installHandlers(aq agentquerier.AgentQuerier, npq querier.AgentNetworkPolicyInfoQuerier, seipq querier.ServiceExternalIPStatusQuerier, s *genericapiserver.GenericAPIServer) { s.Handler.NonGoRestfulMux.HandleFunc("/loglevel", loglevel.HandleFunc()) s.Handler.NonGoRestfulMux.HandleFunc("/featuregates", featuregates.HandleFunc()) s.Handler.NonGoRestfulMux.HandleFunc("/agentinfo", agentinfo.HandleFunc(aq)) @@ -82,6 +83,7 @@ func installHandlers(aq agentquerier.AgentQuerier, npq querier.AgentNetworkPolic s.Handler.NonGoRestfulMux.HandleFunc("/addressgroups", addressgroup.HandleFunc(npq)) s.Handler.NonGoRestfulMux.HandleFunc("/ovsflows", ovsflows.HandleFunc(aq)) s.Handler.NonGoRestfulMux.HandleFunc("/ovstracing", ovstracing.HandleFunc(aq)) + s.Handler.NonGoRestfulMux.HandleFunc("/serviceexternalip", serviceexternalip.HandleFunc(seipq)) } func installAPIGroup(s *genericapiserver.GenericAPIServer, aq agentquerier.AgentQuerier, npq querier.AgentNetworkPolicyInfoQuerier, v4Enabled, v6Enabled bool) error { @@ -95,8 +97,8 @@ func installAPIGroup(s *genericapiserver.GenericAPIServer, aq agentquerier.Agent } // New creates an APIServer for running in antrea agent. -func New(aq agentquerier.AgentQuerier, npq querier.AgentNetworkPolicyInfoQuerier, bindPort int, - enableMetrics bool, kubeconfig string, cipherSuites []uint16, tlsMinVersion uint16, v4Enabled, v6Enabled bool) (*agentAPIServer, error) { +func New(aq agentquerier.AgentQuerier, npq querier.AgentNetworkPolicyInfoQuerier, seipq querier.ServiceExternalIPStatusQuerier, + bindPort int, enableMetrics bool, kubeconfig string, cipherSuites []uint16, tlsMinVersion uint16, v4Enabled, v6Enabled bool) (*agentAPIServer, error) { cfg, err := newConfig(npq, bindPort, enableMetrics, kubeconfig) if err != nil { return nil, err @@ -110,7 +112,7 @@ func New(aq agentquerier.AgentQuerier, npq querier.AgentNetworkPolicyInfoQuerier if err := installAPIGroup(s, aq, npq, v4Enabled, v6Enabled); err != nil { return nil, err } - installHandlers(aq, npq, s) + installHandlers(aq, npq, seipq, s) return &agentAPIServer{GenericAPIServer: s}, nil } diff --git a/pkg/agent/apiserver/handlers/serviceexternalip/handler.go b/pkg/agent/apiserver/handlers/serviceexternalip/handler.go new file mode 100644 index 00000000000..c397f24f505 --- /dev/null +++ b/pkg/agent/apiserver/handlers/serviceexternalip/handler.go @@ -0,0 +1,70 @@ +// Copyright 2022 Antrea Authors +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package serviceexternalip + +import ( + "encoding/json" + "net/http" + + "antrea.io/antrea/pkg/antctl/transform/common" + "antrea.io/antrea/pkg/features" + "antrea.io/antrea/pkg/querier" +) + +// HandleFunc creates a http.HandlerFunc which uses an ServiceExternalIPStatusQuerier +// to query Service external IP status. +func HandleFunc(sq querier.ServiceExternalIPStatusQuerier) http.HandlerFunc { + return func(w http.ResponseWriter, r *http.Request) { + name := r.URL.Query().Get("name") + ns := r.URL.Query().Get("namespace") + if !features.DefaultFeatureGate.Enabled(features.ServiceExternalIP) { + http.Error(w, "ServiceExternalIP is not enabled", http.StatusServiceUnavailable) + return + } + result := sq.GetServiceExternalIPStatus() + var response []Response + for _, r := range result { + if (len(name) == 0 || name == r.ServiceName) && (len(ns) == 0 || ns == r.Namespace) { + response = append(response, Response{r}) + } + } + if len(name) > 0 && len(response) == 0 { + w.WriteHeader(http.StatusNotFound) + return + } + if err := json.NewEncoder(w).Encode(response); err != nil { + http.Error(w, "Failed to encode response: "+err.Error(), http.StatusInternalServerError) + } + } +} + +// Response describes the response struct of serviceexternalip command. +type Response struct { + querier.ServiceExternalIPInfo +} + +var _ common.TableOutput = (*Response)(nil) + +func (r Response) GetTableHeader() []string { + return []string{"NAMESPACE", "NAME", "EXTERNAL-IP-POOL", "EXTERNAL-IP", "ASSIGNED-NODE"} +} + +func (r Response) GetTableRow(_ int) []string { + return []string{r.Namespace, r.ServiceName, r.ExternalIPPool, r.ExternalIP, r.AssignedNode} +} + +func (r Response) SortRows() bool { + return true +} diff --git a/pkg/agent/controller/serviceexternalip/controller.go b/pkg/agent/controller/serviceexternalip/controller.go index e458b5e98c0..d6c1b9d0d84 100644 --- a/pkg/agent/controller/serviceexternalip/controller.go +++ b/pkg/agent/controller/serviceexternalip/controller.go @@ -15,14 +15,12 @@ package serviceexternalip import ( - "context" "fmt" "sync" "time" corev1 "k8s.io/api/core/v1" "k8s.io/apimachinery/pkg/api/errors" - v1 "k8s.io/apimachinery/pkg/apis/meta/v1" apimachinerytypes "k8s.io/apimachinery/pkg/types" "k8s.io/apimachinery/pkg/util/sets" "k8s.io/apimachinery/pkg/util/wait" @@ -36,6 +34,7 @@ import ( "antrea.io/antrea/pkg/agent/ipassigner" "antrea.io/antrea/pkg/agent/memberlist" "antrea.io/antrea/pkg/agent/types" + "antrea.io/antrea/pkg/querier" ) const ( @@ -54,6 +53,7 @@ const ( type externalIPState struct { ip string + ipPool string assignedNode string } @@ -81,6 +81,8 @@ type ServiceExternalIPController struct { assignedIPsMutex sync.Mutex } +var _ querier.ServiceExternalIPStatusQuerier = (*ServiceExternalIPController)(nil) + func NewServiceExternalIPController( nodeName string, nodeTransportInterface string, @@ -302,21 +304,21 @@ func (c *ServiceExternalIPController) getServiceState(service *corev1.Service) ( return state, exist } -func (c *ServiceExternalIPController) saveServiceState(service *corev1.Service, state externalIPState) { +func (c *ServiceExternalIPController) saveServiceState(service *corev1.Service, state *externalIPState) { c.externalIPStatesMutex.Lock() defer c.externalIPStatesMutex.Unlock() name := apimachinerytypes.NamespacedName{ Namespace: service.Namespace, Name: service.Name, } - c.externalIPStates[name] = state + c.externalIPStates[name] = *state } -func (c *ServiceExternalIPController) getServiceExternalIPAndHostname(service *corev1.Service) (string, string) { +func (c *ServiceExternalIPController) getServiceExternalIP(service *corev1.Service) string { if len(service.Status.LoadBalancer.Ingress) == 0 { - return "", "" + return "" } - return service.Status.LoadBalancer.Ingress[0].IP, service.Status.LoadBalancer.Ingress[0].Hostname + return service.Status.LoadBalancer.Ingress[0].IP } func (c *ServiceExternalIPController) syncService(key apimachinerytypes.NamespacedName) error { @@ -337,9 +339,9 @@ func (c *ServiceExternalIPController) syncService(key apimachinerytypes.Namespac return c.deleteService(key) } - state, exist := c.getServiceState(service) - currentExternalIP, currentHostname := c.getServiceExternalIPAndHostname(service) - if exist && state.ip != currentExternalIP { + prevState, exist := c.getServiceState(service) + currentExternalIP := c.getServiceExternalIP(service) + if exist && prevState.ip != currentExternalIP { // External IP of the Service has changed. Delete the previous assigned IP if exists. if err := c.deleteService(key); err != nil { return err @@ -347,58 +349,41 @@ func (c *ServiceExternalIPController) syncService(key apimachinerytypes.Namespac } ipPool := service.ObjectMeta.Annotations[types.ServiceExternalIPPoolAnnotationKey] + state := &externalIPState{ + ip: currentExternalIP, + ipPool: ipPool, + } + defer c.saveServiceState(service, state) + if currentExternalIP == "" || ipPool == "" { return nil } - selectNode := true var filters []func(string) bool if service.Spec.ExternalTrafficPolicy == corev1.ServiceExternalTrafficPolicyTypeLocal { nodes, err := c.nodesHasHealthyServiceEndpoint(service) if err != nil { return err } - // Avoid unnecessary migration caused by Endpoints changes. - if currentHostname != "" && c.cluster.AliveNodes().Has(currentHostname) && nodes.Has(currentHostname) { - selectNode = false - state = externalIPState{ - ip: currentExternalIP, - assignedNode: currentHostname, - } - c.saveServiceState(service, state) - } else { - filters = append(filters, func(s string) bool { - return nodes.Has(s) - }) - } + filters = append(filters, func(s string) bool { + return nodes.Has(s) + }) } - if selectNode { - nodeName, err := c.cluster.SelectNodeForIP(currentExternalIP, ipPool, filters...) - if err != nil { - if err == memberlist.ErrNoNodeAvailable { - // No Node is available for the moment. The Service will be requeued by Endpoints, Node, or Memberlist update events. - klog.InfoS("No Node available", "ip", currentExternalIP, "ipPool", ipPool) - return nil - } - return err - } - klog.InfoS("Select Node for IP", "service", key, "nodeName", nodeName, "currentExternalIP", currentExternalIP, "ipPool", ipPool) - state = externalIPState{ - ip: currentExternalIP, - assignedNode: nodeName, + nodeName, err := c.cluster.SelectNodeForIP(currentExternalIP, ipPool, filters...) + if err != nil { + if err == memberlist.ErrNoNodeAvailable { + // No Node is available at the moment. The Service will be requeued by Endpoints, Node, or Memberlist update events. + klog.InfoS("No Node available", "ip", currentExternalIP, "ipPool", ipPool) + return nil } - c.saveServiceState(service, state) + return err } - // Update the hostname field of Service status and assign the external IP if this Node is selected. + klog.InfoS("Select Node for IP", "service", key, "nodeName", nodeName, "currentExternalIP", currentExternalIP, "ipPool", ipPool) + + state.assignedNode = nodeName + if state.assignedNode == c.nodeName { - if service.Status.LoadBalancer.Ingress[0].Hostname != state.assignedNode { - serviceToUpdate := service.DeepCopy() - serviceToUpdate.Status.LoadBalancer.Ingress[0].Hostname = state.assignedNode - if _, err = c.client.CoreV1().Services(serviceToUpdate.Namespace).UpdateStatus(context.TODO(), serviceToUpdate, v1.UpdateOptions{}); err != nil { - return err - } - } return c.assignIP(currentExternalIP, key) } return c.unassignIP(currentExternalIP, key) @@ -453,3 +438,19 @@ func (c *ServiceExternalIPController) nodesHasHealthyServiceEndpoint(service *co } return nodes, nil } + +func (c *ServiceExternalIPController) GetServiceExternalIPStatus() []querier.ServiceExternalIPInfo { + c.externalIPStatesMutex.RLock() + defer c.externalIPStatesMutex.RUnlock() + info := make([]querier.ServiceExternalIPInfo, 0, len(c.externalIPStates)) + for k, v := range c.externalIPStates { + info = append(info, querier.ServiceExternalIPInfo{ + ServiceName: k.Name, + Namespace: k.Namespace, + ExternalIP: v.ip, + ExternalIPPool: v.ipPool, + AssignedNode: v.assignedNode, + }) + } + return info +} diff --git a/pkg/agent/controller/serviceexternalip/controller_test.go b/pkg/agent/controller/serviceexternalip/controller_test.go index 5c9ff627d55..ea24b84fca2 100644 --- a/pkg/agent/controller/serviceexternalip/controller_test.go +++ b/pkg/agent/controller/serviceexternalip/controller_test.go @@ -33,6 +33,7 @@ import ( ipassignertest "antrea.io/antrea/pkg/agent/ipassigner/testing" "antrea.io/antrea/pkg/agent/memberlist" "antrea.io/antrea/pkg/agent/types" + "antrea.io/antrea/pkg/querier" ) const ( @@ -228,6 +229,7 @@ func TestCreateService(t *testing.T) { expectedExternalIPStates: map[apimachinerytypes.NamespacedName]externalIPState{ keyFor(servicePolicyCluster): { ip: fakeServiceExternalIP1, + ipPool: fakeExternalIPPoolName, assignedNode: fakeNode1, }, }, @@ -244,6 +246,7 @@ func TestCreateService(t *testing.T) { expectedExternalIPStates: map[apimachinerytypes.NamespacedName]externalIPState{ keyFor(servicePolicyCluster): { ip: fakeServiceExternalIP1, + ipPool: fakeExternalIPPoolName, assignedNode: fakeNode2, }, }, @@ -267,6 +270,7 @@ func TestCreateService(t *testing.T) { expectedExternalIPStates: map[apimachinerytypes.NamespacedName]externalIPState{ keyFor(servicePolicyLocal): { ip: fakeServiceExternalIP1, + ipPool: fakeExternalIPPoolName, assignedNode: fakeNode1, }, }, @@ -294,6 +298,7 @@ func TestCreateService(t *testing.T) { expectedExternalIPStates: map[apimachinerytypes.NamespacedName]externalIPState{ keyFor(servicePolicyLocal): { ip: fakeServiceExternalIP1, + ipPool: fakeExternalIPPoolName, assignedNode: fakeNode2, }, }, @@ -317,6 +322,7 @@ func TestCreateService(t *testing.T) { expectedExternalIPStates: map[apimachinerytypes.NamespacedName]externalIPState{ keyFor(servicePolicyLocal): { ip: fakeServiceExternalIP1, + ipPool: fakeExternalIPPoolName, assignedNode: fakeNode2, }, }, @@ -333,17 +339,22 @@ func TestCreateService(t *testing.T) { "2.3.4.6": fakeNode2, }), }, - serviceToCreate: servicePolicyLocal, - healthyNodes: []string{fakeNode1, fakeNode2}, - expectedCalls: func(mockIPAssigner *ipassignertest.MockIPAssigner) {}, - expectedExternalIPStates: map[apimachinerytypes.NamespacedName]externalIPState{}, - expectError: true, + serviceToCreate: servicePolicyLocal, + healthyNodes: []string{fakeNode1, fakeNode2}, + expectedCalls: func(mockIPAssigner *ipassignertest.MockIPAssigner) {}, + expectedExternalIPStates: map[apimachinerytypes.NamespacedName]externalIPState{ + keyFor(servicePolicyLocal): { + ip: fakeServiceExternalIP1, + ipPool: fakeExternalIPPoolName, + }, + }, + expectError: true, }, { name: "new Service created and local Node selected and IP already assigned by other Service", existingEndpoints: nil, previousExternalIPStates: map[apimachinerytypes.NamespacedName]externalIPState{ - keyFor(servicePolicyLocal): {fakeServiceExternalIP1, fakeNode1}, + keyFor(servicePolicyLocal): {fakeServiceExternalIP1, fakeExternalIPPoolName, fakeNode1}, }, serviceToCreate: servicePolicyCluster, healthyNodes: []string{fakeNode1, fakeNode2}, @@ -351,10 +362,12 @@ func TestCreateService(t *testing.T) { expectedExternalIPStates: map[apimachinerytypes.NamespacedName]externalIPState{ keyFor(servicePolicyLocal): { ip: fakeServiceExternalIP1, + ipPool: fakeExternalIPPoolName, assignedNode: fakeNode1, }, keyFor(servicePolicyCluster): { ip: fakeServiceExternalIP1, + ipPool: fakeExternalIPPoolName, assignedNode: fakeNode1, }, }, @@ -405,12 +418,6 @@ func TestUpdateService(t *testing.T) { serviceExternalTrafficPolicyClusterWithSameExternalIP.Name = "svc-same-eip" serviceExternalTrafficPolicyClusterWithSameExternalIP.Status.LoadBalancer.Ingress[0].IP = fakeServiceExternalIP2 - serviceExternalTrafficLocalWithNodeSelected := servicePolicyLocal.DeepCopy() - serviceExternalTrafficLocalWithNodeSelected.Status.LoadBalancer.Ingress[0].Hostname = fakeNode1 - - serviceExternalTrafficLocalUpdatedHostname := servicePolicyLocal.DeepCopy() - serviceExternalTrafficLocalUpdatedHostname.Status.LoadBalancer.Ingress[0].Hostname = fakeNode2 - serviceChangedType := servicePolicyCluster.DeepCopy() serviceChangedType.Spec.Type = corev1.ServiceTypeClusterIP @@ -436,10 +443,10 @@ func TestUpdateService(t *testing.T) { endpoints: nil, serviceToUpdate: serviceExternalTrafficPolicyClusterUpdatedExternalIP, previousExternalIPStates: map[apimachinerytypes.NamespacedName]externalIPState{ - keyFor(serviceExternalTrafficPolicyClusterUpdatedExternalIP): {fakeServiceExternalIP1, fakeNode1}, + keyFor(serviceExternalTrafficPolicyClusterUpdatedExternalIP): {fakeServiceExternalIP1, fakeExternalIPPoolName, fakeNode1}, }, expectedExternalIPStates: map[apimachinerytypes.NamespacedName]externalIPState{ - keyFor(serviceExternalTrafficPolicyClusterUpdatedExternalIP): {fakeServiceExternalIP2, fakeNode1}, + keyFor(serviceExternalTrafficPolicyClusterUpdatedExternalIP): {fakeServiceExternalIP2, fakeExternalIPPoolName, fakeNode1}, }, healthyNodes: []string{fakeNode1, fakeNode2}, expectedCalls: func(mockIPAssigner *ipassignertest.MockIPAssigner) { @@ -453,12 +460,12 @@ func TestUpdateService(t *testing.T) { endpoints: nil, serviceToUpdate: serviceExternalTrafficPolicyClusterUpdatedExternalIP, previousExternalIPStates: map[apimachinerytypes.NamespacedName]externalIPState{ - keyFor(serviceExternalTrafficPolicyClusterWithSameExternalIP): {fakeServiceExternalIP1, fakeNode1}, - keyFor(serviceExternalTrafficPolicyClusterUpdatedExternalIP): {fakeServiceExternalIP1, fakeNode1}, + keyFor(serviceExternalTrafficPolicyClusterWithSameExternalIP): {fakeServiceExternalIP1, fakeExternalIPPoolName, fakeNode1}, + keyFor(serviceExternalTrafficPolicyClusterUpdatedExternalIP): {fakeServiceExternalIP1, fakeExternalIPPoolName, fakeNode1}, }, expectedExternalIPStates: map[apimachinerytypes.NamespacedName]externalIPState{ - keyFor(serviceExternalTrafficPolicyClusterWithSameExternalIP): {fakeServiceExternalIP1, fakeNode1}, - keyFor(serviceExternalTrafficPolicyClusterUpdatedExternalIP): {fakeServiceExternalIP2, fakeNode1}, + keyFor(serviceExternalTrafficPolicyClusterWithSameExternalIP): {fakeServiceExternalIP1, fakeExternalIPPoolName, fakeNode1}, + keyFor(serviceExternalTrafficPolicyClusterUpdatedExternalIP): {fakeServiceExternalIP2, fakeExternalIPPoolName, fakeNode1}, }, healthyNodes: []string{fakeNode1, fakeNode2}, expectedCalls: func(mockIPAssigner *ipassignertest.MockIPAssigner) { @@ -471,10 +478,10 @@ func TestUpdateService(t *testing.T) { endpoints: nil, serviceToUpdate: serviceExternalTrafficPolicyClusterUpdatedExternalIP, previousExternalIPStates: map[apimachinerytypes.NamespacedName]externalIPState{ - keyFor(serviceExternalTrafficPolicyClusterUpdatedExternalIP): {fakeServiceExternalIP1, fakeNode1}, + keyFor(serviceExternalTrafficPolicyClusterUpdatedExternalIP): {fakeServiceExternalIP1, fakeExternalIPPoolName, fakeNode1}, }, expectedExternalIPStates: map[apimachinerytypes.NamespacedName]externalIPState{ - keyFor(serviceExternalTrafficPolicyClusterUpdatedExternalIP): {fakeServiceExternalIP2, fakeNode2}, + keyFor(serviceExternalTrafficPolicyClusterUpdatedExternalIP): {fakeServiceExternalIP2, fakeExternalIPPoolName, fakeNode2}, }, healthyNodes: []string{fakeNode1, fakeNode2}, overrideHashFn: fakeHashFn(true), @@ -488,7 +495,7 @@ func TestUpdateService(t *testing.T) { endpoints: nil, serviceToUpdate: serviceChangedType, previousExternalIPStates: map[apimachinerytypes.NamespacedName]externalIPState{ - keyFor(serviceExternalTrafficPolicyClusterUpdatedExternalIP): {fakeServiceExternalIP1, fakeNode1}, + keyFor(serviceExternalTrafficPolicyClusterUpdatedExternalIP): {fakeServiceExternalIP1, fakeExternalIPPoolName, fakeNode1}, }, expectedExternalIPStates: map[apimachinerytypes.NamespacedName]externalIPState{}, healthyNodes: []string{fakeNode1, fakeNode2}, @@ -502,7 +509,7 @@ func TestUpdateService(t *testing.T) { endpoints: nil, serviceToUpdate: serviceChangedType, previousExternalIPStates: map[apimachinerytypes.NamespacedName]externalIPState{ - keyFor(serviceExternalTrafficPolicyClusterUpdatedExternalIP): {fakeServiceExternalIP1, fakeNode1}, + keyFor(serviceExternalTrafficPolicyClusterUpdatedExternalIP): {fakeServiceExternalIP1, fakeExternalIPPoolName, fakeNode1}, }, expectedExternalIPStates: map[apimachinerytypes.NamespacedName]externalIPState{}, healthyNodes: []string{fakeNode1, fakeNode2}, @@ -523,10 +530,10 @@ func TestUpdateService(t *testing.T) { }, serviceToUpdate: serviceChangedExternalTrafficPolicy, previousExternalIPStates: map[apimachinerytypes.NamespacedName]externalIPState{ - keyFor(serviceChangedExternalTrafficPolicy): {fakeServiceExternalIP1, fakeNode1}, + keyFor(serviceChangedExternalTrafficPolicy): {fakeServiceExternalIP1, fakeExternalIPPoolName, fakeNode1}, }, expectedExternalIPStates: map[apimachinerytypes.NamespacedName]externalIPState{ - keyFor(serviceChangedExternalTrafficPolicy): {fakeServiceExternalIP1, fakeNode2}, + keyFor(serviceChangedExternalTrafficPolicy): {fakeServiceExternalIP1, fakeExternalIPPoolName, fakeNode2}, }, healthyNodes: []string{fakeNode1, fakeNode2}, expectedCalls: func(mockIPAssigner *ipassignertest.MockIPAssigner) { @@ -539,10 +546,10 @@ func TestUpdateService(t *testing.T) { endpoints: nil, serviceToUpdate: servicePolicyCluster, previousExternalIPStates: map[apimachinerytypes.NamespacedName]externalIPState{ - keyFor(servicePolicyCluster): {fakeServiceExternalIP1, fakeNode1}, + keyFor(servicePolicyCluster): {fakeServiceExternalIP1, fakeExternalIPPoolName, fakeNode1}, }, expectedExternalIPStates: map[apimachinerytypes.NamespacedName]externalIPState{ - keyFor(servicePolicyCluster): {fakeServiceExternalIP1, fakeNode2}, + keyFor(servicePolicyCluster): {fakeServiceExternalIP1, fakeExternalIPPoolName, fakeNode2}, }, healthyNodes: []string{fakeNode1, fakeNode2}, overrideHashFn: fakeHashFn(true), @@ -561,59 +568,12 @@ func TestUpdateService(t *testing.T) { "2.3.4.5": fakeNode1, }), }, - serviceToUpdate: serviceExternalTrafficLocalWithNodeSelected, - previousExternalIPStates: map[apimachinerytypes.NamespacedName]externalIPState{ - keyFor(servicePolicyLocal): {fakeServiceExternalIP1, fakeNode1}, - }, - expectedExternalIPStates: map[apimachinerytypes.NamespacedName]externalIPState{ - keyFor(servicePolicyLocal): {fakeServiceExternalIP1, fakeNode2}, - }, - healthyNodes: []string{fakeNode1, fakeNode2}, - expectedCalls: func(mockIPAssigner *ipassignertest.MockIPAssigner) { - mockIPAssigner.EXPECT().UnassignIP(fakeServiceExternalIP1) - }, - expectError: false, - }, - { - name: "should not migrate to other Nodes if selected Node still have healthy endpoints", - endpoints: []*corev1.Endpoints{ - makeEndpoints(servicePolicyLocal.Name, servicePolicyLocal.Namespace, - map[string]string{ - "2.3.4.5": fakeNode1, - "2.3.4.6": fakeNode2, - }, - nil, - ), - }, - overrideHashFn: fakeHashFn(true), - serviceToUpdate: serviceExternalTrafficLocalWithNodeSelected, - previousExternalIPStates: map[apimachinerytypes.NamespacedName]externalIPState{ - keyFor(servicePolicyLocal): {fakeServiceExternalIP1, fakeNode1}, - }, - expectedExternalIPStates: map[apimachinerytypes.NamespacedName]externalIPState{ - keyFor(servicePolicyLocal): {fakeServiceExternalIP1, fakeNode1}, - }, - healthyNodes: []string{fakeNode1, fakeNode2}, - expectedCalls: func(mockIPAssigner *ipassignertest.MockIPAssigner) {}, - expectError: false, - }, - { - name: "other Node could promote itself as the new owner", - endpoints: []*corev1.Endpoints{ - makeEndpoints(servicePolicyLocal.Name, servicePolicyLocal.Namespace, - map[string]string{ - "2.3.4.5": fakeNode1, - "2.3.4.6": fakeNode2, - }, - nil, - ), - }, - serviceToUpdate: serviceExternalTrafficLocalUpdatedHostname, + serviceToUpdate: servicePolicyLocal, previousExternalIPStates: map[apimachinerytypes.NamespacedName]externalIPState{ - keyFor(servicePolicyLocal): {fakeServiceExternalIP1, fakeNode1}, + keyFor(servicePolicyLocal): {fakeServiceExternalIP1, fakeExternalIPPoolName, fakeNode1}, }, expectedExternalIPStates: map[apimachinerytypes.NamespacedName]externalIPState{ - keyFor(servicePolicyLocal): {fakeServiceExternalIP1, fakeNode2}, + keyFor(servicePolicyLocal): {fakeServiceExternalIP1, fakeExternalIPPoolName, fakeNode2}, }, healthyNodes: []string{fakeNode1, fakeNode2}, expectedCalls: func(mockIPAssigner *ipassignertest.MockIPAssigner) { @@ -621,29 +581,6 @@ func TestUpdateService(t *testing.T) { }, expectError: false, }, - { - name: "agent restarts and should not select new Node if current selected Node still healthy and have healthy endpoints", - endpoints: []*corev1.Endpoints{ - makeEndpoints(servicePolicyLocal.Name, servicePolicyLocal.Namespace, - map[string]string{ - "2.3.4.5": fakeNode1, - "2.3.4.6": fakeNode2, - }, - nil, - ), - }, - overrideHashFn: fakeHashFn(true), - serviceToUpdate: serviceExternalTrafficLocalWithNodeSelected, - previousExternalIPStates: map[apimachinerytypes.NamespacedName]externalIPState{}, - expectedExternalIPStates: map[apimachinerytypes.NamespacedName]externalIPState{ - keyFor(servicePolicyLocal): {fakeServiceExternalIP1, fakeNode1}, - }, - healthyNodes: []string{fakeNode1, fakeNode2}, - expectedCalls: func(mockIPAssigner *ipassignertest.MockIPAssigner) { - mockIPAssigner.EXPECT().AssignIP(fakeServiceExternalIP1) - }, - expectError: false, - }, } for _, tt := range tests { t.Run(tt.name, func(t *testing.T) { @@ -763,3 +700,66 @@ func TestServiceExternalIPController_nodesHasHealthyServiceEndpoint(t *testing.T }) } } + +func TestServiceExternalIPController_GetServiceExternalIPStatus(t *testing.T) { + tests := []struct { + name string + externalIPStates map[apimachinerytypes.NamespacedName]externalIPState + expectedServiceExternalIPInfo []querier.ServiceExternalIPInfo + }{ + { + name: "no Service available should return empty slice", + externalIPStates: map[apimachinerytypes.NamespacedName]externalIPState{}, + expectedServiceExternalIPInfo: []querier.ServiceExternalIPInfo{}, + }, + { + name: "one Service processed", + externalIPStates: map[apimachinerytypes.NamespacedName]externalIPState{ + keyFor(servicePolicyCluster): {fakeServiceExternalIP1, fakeExternalIPPoolName, fakeNode1}, + }, + expectedServiceExternalIPInfo: []querier.ServiceExternalIPInfo{ + { + + ServiceName: servicePolicyCluster.Name, + Namespace: servicePolicyCluster.Namespace, + ExternalIP: fakeServiceExternalIP1, + ExternalIPPool: fakeExternalIPPoolName, + AssignedNode: fakeNode1, + }, + }, + }, + { + name: "two Services processed", + externalIPStates: map[apimachinerytypes.NamespacedName]externalIPState{ + keyFor(servicePolicyCluster): {fakeServiceExternalIP1, fakeExternalIPPoolName, fakeNode1}, + keyFor(servicePolicyLocal): {fakeServiceExternalIP2, fakeExternalIPPoolName, fakeNode2}, + }, + expectedServiceExternalIPInfo: []querier.ServiceExternalIPInfo{ + { + + ServiceName: servicePolicyCluster.Name, + Namespace: servicePolicyCluster.Namespace, + ExternalIP: fakeServiceExternalIP1, + ExternalIPPool: fakeExternalIPPoolName, + AssignedNode: fakeNode1, + }, + { + + ServiceName: servicePolicyLocal.Name, + Namespace: servicePolicyLocal.Namespace, + ExternalIP: fakeServiceExternalIP2, + ExternalIPPool: fakeExternalIPPoolName, + AssignedNode: fakeNode2, + }, + }, + }, + } + for _, tt := range tests { + t.Run(tt.name, func(t *testing.T) { + c := newFakeController(t) + c.externalIPStates = tt.externalIPStates + got := c.ServiceExternalIPController.GetServiceExternalIPStatus() + assert.ElementsMatch(t, tt.expectedServiceExternalIPInfo, got) + }) + } +} diff --git a/pkg/antctl/antctl.go b/pkg/antctl/antctl.go index 7a58b19a907..cfe9605e132 100644 --- a/pkg/antctl/antctl.go +++ b/pkg/antctl/antctl.go @@ -21,6 +21,7 @@ import ( "antrea.io/antrea/pkg/agent/apiserver/handlers/agentinfo" "antrea.io/antrea/pkg/agent/apiserver/handlers/ovsflows" "antrea.io/antrea/pkg/agent/apiserver/handlers/podinterface" + "antrea.io/antrea/pkg/agent/apiserver/handlers/serviceexternalip" "antrea.io/antrea/pkg/agent/openflow" fallbackversion "antrea.io/antrea/pkg/antctl/fallback/version" "antrea.io/antrea/pkg/antctl/raw/featuregates" @@ -509,6 +510,32 @@ var CommandList = &commandList{ }, transformedResponse: reflect.TypeOf(recordmetrics.Response{}), }, + { + use: "serviceexternalip", + short: "Print Service external IP status", + long: "Print Service external IP status. It includes the external IP, external IP pool and the assigned Node for Services with type LoadBalancer managed by Antrea", + commandGroup: get, + aliases: []string{"seip", "serviceexternalips"}, + agentEndpoint: &endpoint{ + nonResourceEndpoint: &nonResourceEndpoint{ + path: "/serviceexternalip", + params: []flagInfo{ + { + name: "name", + usage: "Name of the Service; if present, Namespace must be provided as well.", + arg: true, + }, + { + name: "namespace", + usage: "Only get the external IP status for Services in the provided Namespace.", + shorthand: "n", + }, + }, + outputType: multiple, + }, + }, + transformedResponse: reflect.TypeOf(serviceexternalip.Response{}), + }, }, rawCommands: []rawCommand{ { diff --git a/pkg/querier/querier.go b/pkg/querier/querier.go index 29b9ebb3fad..19de222def5 100644 --- a/pkg/querier/querier.go +++ b/pkg/querier/querier.go @@ -90,3 +90,19 @@ type NetworkPolicyQueryFilter struct { // The type of the original NetworkPolicy that the internal NetworkPolicy is created for.(K8sNP, CNP, ANP) SourceType cpv1beta.NetworkPolicyType } + +// ServiceExternalIPStatusQuerier queries the Service external IP status for debugging purposes. +// Ideally, every Node should have consistent results eventually. This should only be used when +// ServiceExternalIP feature is enabled. +type ServiceExternalIPStatusQuerier interface { + GetServiceExternalIPStatus() []ServiceExternalIPInfo +} + +// ServiceExternalIPInfo contains the essential information for Services with type of Loadbalancer managed by Antrea. +type ServiceExternalIPInfo struct { + ServiceName string `json:"serviceName,omitempty" antctl:"name,Name of the Service"` + Namespace string `json:"namespace,omitempty"` + ExternalIP string `json:"externalIP,omitempty"` + ExternalIPPool string `json:"externalIPPool,omitempty"` + AssignedNode string `json:"assignedNode,omitempty"` +} diff --git a/test/e2e/service_externalip_test.go b/test/e2e/service_externalip_test.go index 635c9425a68..67d964e00c3 100644 --- a/test/e2e/service_externalip_test.go +++ b/test/e2e/service_externalip_test.go @@ -16,6 +16,7 @@ package e2e import ( "context" + "encoding/json" "fmt" "net" "strconv" @@ -38,6 +39,7 @@ import ( "antrea.io/antrea/pkg/apis/crd/v1alpha2" agentconfig "antrea.io/antrea/pkg/config/agent" controllerconfig "antrea.io/antrea/pkg/config/controller" + "antrea.io/antrea/pkg/querier" ) func TestServiceExternalIP(t *testing.T) { @@ -243,18 +245,16 @@ func testServiceExternalTrafficPolicyLocal(t *testing.T, data *TestData) { require.NoError(t, err) defer data.clientset.CoreV1().Endpoints(eps.Namespace).Delete(context.TODO(), eps.Name, metav1.DeleteOptions{}) - service, err = data.waitForServiceConfigured(service, tt.expectedExternalIP, tt.expectedNodeOrigin != "", tt.expectedNodeOrigin) + service, node, err := data.waitForServiceConfigured(service, tt.expectedExternalIP, tt.expectedNodeOrigin) require.NoError(t, err) - _, node := getServiceExternalIPAndHost(service) assert.Equal(t, tt.expectedNodeOrigin, node) epsToUpdate := eps.DeepCopy() epsToUpdate.Subsets = tt.updatedEndpointSubsets _, err = data.clientset.CoreV1().Endpoints(eps.Namespace).Update(context.TODO(), epsToUpdate, metav1.UpdateOptions{}) require.NoError(t, err) - service, err = data.waitForServiceConfigured(service, tt.expectedExternalIP, tt.expectedNodeUpdated != "", tt.expectedNodeUpdated) + _, node, err = data.waitForServiceConfigured(service, tt.expectedExternalIP, tt.expectedNodeUpdated) require.NoError(t, err) - _, node = getServiceExternalIPAndHost(service) assert.Equal(t, tt.expectedNodeUpdated, node) assert.NoError(t, err) }) @@ -347,12 +347,10 @@ func testServiceWithExternalIPCRUD(t *testing.T, data *TestData) { require.NoError(t, err) defer data.clientset.CoreV1().Services(service.Namespace).Delete(context.TODO(), service.Name, metav1.DeleteOptions{}) - waitForNodeConfigured := len(tt.expectedNodes) != 0 - service, err = data.waitForServiceConfigured(service, tt.expectedExternalIP, waitForNodeConfigured, "") + service, assignedNode, err := data.waitForServiceConfigured(service, tt.expectedExternalIP, "") require.NoError(t, err) if len(tt.expectedNodes) > 0 { - _, assignedNode := getServiceExternalIPAndHost(service) assert.True(t, tt.expectedNodes.Has(assignedNode), "expected assigned Node in %s, got %s", tt.expectedNodes, assignedNode) } @@ -441,7 +439,7 @@ func testServiceUpdateExternalIP(t *testing.T, data *TestData) { require.NoError(t, err) defer data.clientset.CoreV1().Services(service.Namespace).Delete(context.TODO(), service.Name, metav1.DeleteOptions{}) - service, err = data.waitForServiceConfigured(service, tt.originalExternalIP, true, tt.originalNode) + service, _, err = data.waitForServiceConfigured(service, tt.originalExternalIP, tt.originalNode) require.NoError(t, err) toUpdate := service.DeepCopy() @@ -455,7 +453,7 @@ func testServiceUpdateExternalIP(t *testing.T, data *TestData) { }) require.NoError(t, err, "Failed to update Service") - _, err = data.waitForServiceConfigured(service, tt.newExternalIP, true, tt.newNode) + _, _, err = data.waitForServiceConfigured(service, tt.newExternalIP, tt.newNode) assert.NoError(t, err) }) } @@ -523,9 +521,8 @@ func testServiceNodeFailure(t *testing.T, data *TestData) { require.NoError(t, err) defer data.clientset.CoreV1().Services(service.Namespace).Delete(context.TODO(), service.Name, metav1.DeleteOptions{}) - service, err = data.waitForServiceConfigured(service, tt.expectedIP, true, "") + service, originalNode, err := data.waitForServiceConfigured(service, tt.expectedIP, "") assert.NoError(t, err) - _, originalNode := getServiceExternalIPAndHost(service) pauseAgent(originalNode) defer restoreAgent(originalNode) @@ -535,10 +532,17 @@ func testServiceNodeFailure(t *testing.T, data *TestData) { } else { expectedMigratedNode = nodeName(0) } - service, err = data.waitForServiceConfigured(service, tt.expectedIP, true, expectedMigratedNode) + // The Agent on the original Node is paused. Run antctl from the expected migrated Node instead. + err = wait.PollImmediate(200*time.Millisecond, 15*time.Second, func() (done bool, err error) { + assigndNode, err := data.getServiceAssignedNode(expectedMigratedNode, service) + if err != nil { + return false, nil + } + return assigndNode == expectedMigratedNode, nil + }) assert.NoError(t, err) restoreAgent(originalNode) - _, err = data.waitForServiceConfigured(service, tt.expectedIP, true, originalNode) + _, _, err = data.waitForServiceConfigured(service, tt.expectedIP, originalNode) assert.NoError(t, err) }) } @@ -608,20 +612,23 @@ func testExternalIPAccess(t *testing.T, data *TestData) { } waitExternalIPConfigured := func(service *v1.Service) (string, string, error) { var ip string - var host string + var assigndNode string err := wait.PollImmediate(200*time.Millisecond, 5*time.Second, func() (done bool, err error) { service, err = data.clientset.CoreV1().Services(service.Namespace).Get(context.TODO(), service.Name, metav1.GetOptions{}) if err != nil { return false, err } - if len(service.Status.LoadBalancer.Ingress) == 0 || service.Status.LoadBalancer.Ingress[0].IP == "" || service.Status.LoadBalancer.Ingress[0].Hostname == "" { + if len(service.Status.LoadBalancer.Ingress) == 0 || service.Status.LoadBalancer.Ingress[0].IP == "" { + return false, nil + } + assigndNode, err = data.getServiceAssignedNode("", service) + if err != nil { return false, nil } ip = service.Status.LoadBalancer.Ingress[0].IP - host = service.Status.LoadBalancer.Ingress[0].Hostname return true, nil }) - return ip, host, err + return ip, assigndNode, err } for _, et := range externalIPTestCases { t.Run(et.name, func(t *testing.T) { @@ -704,14 +711,32 @@ sleep 3600`, tt.clientName, tt.clientIP, tt.localIP, tt.clientIPMaskLen) } } -func getServiceExternalIPAndHost(service *v1.Service) (string, string) { - if service == nil || len(service.Status.LoadBalancer.Ingress) == 0 { - return "", "" +func (data *TestData) getServiceAssignedNode(node string, service *v1.Service) (string, error) { + if node == "" { + node = nodeName(0) + } + agentPodName, err := data.getAntreaPodOnNode(node) + if err != nil { + return "", err } - return service.Status.LoadBalancer.Ingress[0].IP, service.Status.LoadBalancer.Ingress[0].Hostname + cmd := []string{"antctl", "get", "serviceexternalip", service.Name, "-n", service.Namespace, "-o", "json"} + + stdout, _, err := runAntctl(agentPodName, cmd, data) + if err != nil { + return "", err + } + var serviceExternalIPInfo []querier.ServiceExternalIPInfo + if err := json.Unmarshal([]byte(stdout), &serviceExternalIPInfo); err != nil { + return "", err + } + if len(serviceExternalIPInfo) != 1 { + return "", fmt.Errorf("expected exactly one entry, got %#v", serviceExternalIPInfo) + } + return serviceExternalIPInfo[0].AssignedNode, nil } -func (data *TestData) waitForServiceConfigured(service *v1.Service, expectedExternalIP string, waitForNodeConfigured bool, expectedNodeName string) (*corev1.Service, error) { +func (data *TestData) waitForServiceConfigured(service *v1.Service, expectedExternalIP string, expectedNodeName string) (*corev1.Service, string, error) { + var assignedNode string err := wait.PollImmediate(200*time.Millisecond, 15*time.Second, func() (done bool, err error) { service, err = data.clientset.CoreV1().Services(service.Namespace).Get(context.TODO(), service.Name, metav1.GetOptions{}) if err != nil { @@ -720,19 +745,18 @@ func (data *TestData) waitForServiceConfigured(service *v1.Service, expectedExte if len(service.Status.LoadBalancer.Ingress) == 0 || service.Status.LoadBalancer.Ingress[0].IP != expectedExternalIP { return false, nil } - if waitForNodeConfigured || expectedNodeName != "" { - if service.Status.LoadBalancer.Ingress[0].Hostname == "" { - return false, nil - } + assignedNode, err = data.getServiceAssignedNode("", service) + if err != nil { + return false, nil } - if expectedNodeName != "" && service.Status.LoadBalancer.Ingress[0].Hostname != expectedNodeName { + if expectedNodeName != "" && assignedNode != expectedNodeName { return false, nil } return true, nil }) if err != nil { - return service, fmt.Errorf("wait for Service %q configured failed: %v. Expected external IP %s on Node %s, actual status %#v", - service.Name, err, expectedExternalIP, expectedNodeName, service.Status) + return service, assignedNode, fmt.Errorf("wait for Service %q configured failed: %v. Expected external IP %s on Node %s, actual status %#v, assigned Node: %s"+ + service.Name, err, expectedExternalIP, expectedNodeName, service.Status, assignedNode) } - return service, nil + return service, assignedNode, nil }