diff --git a/cmd/antrea-agent/agent.go b/cmd/antrea-agent/agent.go index d775d05a4c0..72fb11a08c8 100644 --- a/cmd/antrea-agent/agent.go +++ b/cmd/antrea-agent/agent.go @@ -35,6 +35,8 @@ import ( "antrea.io/antrea/pkg/agent/flowexporter" "antrea.io/antrea/pkg/agent/flowexporter/exporter" "antrea.io/antrea/pkg/agent/interfacestore" + "antrea.io/antrea/pkg/agent/ipassigner" + "antrea.io/antrea/pkg/agent/memberlist" "antrea.io/antrea/pkg/agent/metrics" npl "antrea.io/antrea/pkg/agent/nodeportlocal" "antrea.io/antrea/pkg/agent/openflow" @@ -45,6 +47,7 @@ import ( "antrea.io/antrea/pkg/agent/stats" "antrea.io/antrea/pkg/agent/types" crdinformers "antrea.io/antrea/pkg/client/informers/externalversions" + "antrea.io/antrea/pkg/controller/externalippool" "antrea.io/antrea/pkg/features" "antrea.io/antrea/pkg/log" "antrea.io/antrea/pkg/monitor" @@ -291,10 +294,25 @@ func run(o *Options) error { } else { return fmt.Errorf("invalid Node Transport IPAddr in Node config: %v", nodeConfig) } + + var externalIPPoolController *externalippool.ExternalIPPoolController + var memberlistCluster *memberlist.Cluster + var localIPDetector ipassigner.LocalIPDetector + if features.DefaultFeatureGate.Enabled(features.Egress) { + externalIPPoolController = externalippool.NewExternalIPPoolController( + crdClient, externalIPPoolInformer, + ) + localIPDetector = ipassigner.NewLocalIPDetector() + memberlistCluster, err = memberlist.NewCluster(o.config.ClusterMembershipPort, + nodeConfig.Name, nodeInformer, externalIPPoolInformer, + ) + if err != nil { + return fmt.Errorf("error creating new MemberList cluster: %v", err) + } egressController, err = egress.NewEgressController( ofClient, antreaClientProvider, crdClient, ifaceStore, routeClient, nodeConfig.Name, nodeTransportIP, - o.config.ClusterMembershipPort, egressInformer, nodeInformer, externalIPPoolInformer, + memberlistCluster, egressInformer, nodeInformer, localIPDetector, ) if err != nil { return fmt.Errorf("error creating new Egress controller: %v", err) @@ -426,6 +444,9 @@ func run(o *Options) error { go networkPolicyController.Run(stopCh) if features.DefaultFeatureGate.Enabled(features.Egress) { + go externalIPPoolController.Run(stopCh) + go localIPDetector.Run(stopCh) + go memberlistCluster.Run(stopCh) go egressController.Run(stopCh) } diff --git a/cmd/antrea-controller/controller.go b/cmd/antrea-controller/controller.go index a2ce52e3a13..4eb32e3b91a 100644 --- a/cmd/antrea-controller/controller.go +++ b/cmd/antrea-controller/controller.go @@ -42,6 +42,7 @@ import ( "antrea.io/antrea/pkg/controller/crdmirroring/crdhandler" "antrea.io/antrea/pkg/controller/egress" egressstore "antrea.io/antrea/pkg/controller/egress/store" + "antrea.io/antrea/pkg/controller/externalippool" "antrea.io/antrea/pkg/controller/grouping" "antrea.io/antrea/pkg/controller/metrics" "antrea.io/antrea/pkg/controller/networkpolicy" @@ -232,8 +233,12 @@ func run(o *Options) error { controllerMonitor := monitor.NewControllerMonitor(crdClient, legacyCRDClient, nodeInformer, controllerQuerier) var egressController *egress.EgressController + var externalIPPoolController *externalippool.ExternalIPPoolController if features.DefaultFeatureGate.Enabled(features.Egress) { - egressController = egress.NewEgressController(crdClient, groupEntityIndex, egressInformer, externalIPPoolInformer, egressGroupStore) + externalIPPoolController = externalippool.NewExternalIPPoolController( + crdClient, externalIPPoolInformer, + ) + egressController = egress.NewEgressController(crdClient, groupEntityIndex, egressInformer, externalIPPoolController, egressGroupStore) } var traceflowController *traceflow.Controller @@ -368,7 +373,9 @@ func run(o *Options) error { go eeMirroringController.Run(stopCh) } } + if features.DefaultFeatureGate.Enabled(features.Egress) { + go externalIPPoolController.Run(stopCh) go egressController.Run(stopCh) } diff --git a/hack/update-codegen-dockerized.sh b/hack/update-codegen-dockerized.sh index e349a3a3297..362d0d7ec0c 100755 --- a/hack/update-codegen-dockerized.sh +++ b/hack/update-codegen-dockerized.sh @@ -164,7 +164,7 @@ MOCKGEN_TARGETS=( "pkg/agent/proxy Proxier testing" "pkg/agent/querier AgentQuerier testing" "pkg/agent/route Interface testing" - "pkg/agent/controller/egress/ipassigner IPAssigner testing" + "pkg/agent/ipassigner IPAssigner testing" "pkg/antctl AntctlClient ." "pkg/controller/networkpolicy EndpointQuerier testing" "pkg/controller/querier ControllerQuerier testing" diff --git a/pkg/agent/controller/egress/egress_controller.go b/pkg/agent/controller/egress/egress_controller.go index 37be9269f15..29f317030a2 100644 --- a/pkg/agent/controller/egress/egress_controller.go +++ b/pkg/agent/controller/egress/egress_controller.go @@ -37,8 +37,8 @@ import ( "k8s.io/klog/v2" "antrea.io/antrea/pkg/agent" - "antrea.io/antrea/pkg/agent/controller/egress/ipassigner" "antrea.io/antrea/pkg/agent/interfacestore" + "antrea.io/antrea/pkg/agent/ipassigner" "antrea.io/antrea/pkg/agent/memberlist" "antrea.io/antrea/pkg/agent/openflow" "antrea.io/antrea/pkg/agent/route" @@ -121,7 +121,7 @@ type EgressController struct { queue workqueue.RateLimitingInterface // Use an interface for IP detector to enable testing. - localIPDetector LocalIPDetector + localIPDetector ipassigner.LocalIPDetector ifaceStore interfacestore.InterfaceStore nodeName string idAllocator *idAllocator @@ -152,12 +152,11 @@ func NewEgressController( routeClient route.Interface, nodeName string, nodeTransportIP net.IP, - clusterPort int, + cluster *memberlist.Cluster, egressInformer crdinformers.EgressInformer, nodeInformer coreinformers.NodeInformer, - externalIPPoolInformer crdinformers.ExternalIPPoolInformer, + localIPDetector ipassigner.LocalIPDetector, ) (*EgressController, error) { - localIPDetector := NewLocalIPDetector() c := &EgressController{ ofClient: ofClient, routeClient: routeClient, @@ -175,6 +174,7 @@ func NewEgressController( egressBindings: map[string]*egressBinding{}, localIPDetector: localIPDetector, idAllocator: newIDAllocator(minEgressMark, maxEgressMark), + cluster: cluster, } ipAssigner, err := ipassigner.NewIPAssigner(nodeTransportIP, egressDummyDevice) if err != nil { @@ -182,12 +182,6 @@ func NewEgressController( } c.ipAssigner = ipAssigner - cluster, err := memberlist.NewCluster(clusterPort, nodeName, nodeInformer, externalIPPoolInformer) - if err != nil { - return nil, fmt.Errorf("initializing memberlist cluster failed: %v", err) - } - c.cluster = cluster - c.egressInformer.AddIndexers(cache.Indexers{egressIPIndex: func(obj interface{}) ([]string, error) { egress, ok := obj.(*crdv1a2.Egress) if !ok { @@ -214,7 +208,7 @@ func NewEgressController( }, resyncPeriod, ) - localIPDetector.AddEventHandler(c.onLocalIPUpdate) + c.localIPDetector.AddEventHandler(c.onLocalIPUpdate) c.cluster.AddClusterEventHandler(c.enqueueEgressesByExternalIPPool) return c, nil } @@ -304,8 +298,6 @@ func (c *EgressController) Run(stopCh <-chan struct{}) { c.removeStaleEgressIPs() - go c.cluster.Run(stopCh) - go wait.NonSlidingUntil(c.watchEgressGroup, 5*time.Second, stopCh) for i := 0; i < defaultWorkers; i++ { @@ -618,7 +610,7 @@ func (c *EgressController) syncEgress(egressName string) error { eState = c.newEgressState(egressName, egress.Spec.EgressIP) } - localNodeSelected, err := c.cluster.ShouldSelectEgress(egress) + localNodeSelected, err := c.cluster.ShouldSelectIP(egress.Spec.EgressIP, egress.Spec.ExternalIPPool) if err != nil { return err } diff --git a/pkg/agent/controller/egress/egress_controller_test.go b/pkg/agent/controller/egress/egress_controller_test.go index 31e54b7cf97..5d43a42275e 100644 --- a/pkg/agent/controller/egress/egress_controller_test.go +++ b/pkg/agent/controller/egress/egress_controller_test.go @@ -33,8 +33,9 @@ import ( k8stesting "k8s.io/client-go/testing" "k8s.io/client-go/util/workqueue" - ipassignertest "antrea.io/antrea/pkg/agent/controller/egress/ipassigner/testing" "antrea.io/antrea/pkg/agent/interfacestore" + "antrea.io/antrea/pkg/agent/ipassigner" + ipassignertest "antrea.io/antrea/pkg/agent/ipassigner/testing" openflowtest "antrea.io/antrea/pkg/agent/openflow/testing" routetest "antrea.io/antrea/pkg/agent/route/testing" "antrea.io/antrea/pkg/agent/util" @@ -65,7 +66,7 @@ func (d *fakeLocalIPDetector) Run(stopCh <-chan struct{}) { <-stopCh } -func (d *fakeLocalIPDetector) AddEventHandler(handler eventHandler) { +func (d *fakeLocalIPDetector) AddEventHandler(handler ipassigner.LocalIPEventHandler) { return } @@ -73,7 +74,7 @@ func (d *fakeLocalIPDetector) HasSynced() bool { return true } -var _ LocalIPDetector = &fakeLocalIPDetector{} +var _ ipassigner.LocalIPDetector = &fakeLocalIPDetector{} type antreaClientGetter struct { clientset versioned.Interface diff --git a/pkg/agent/controller/egress/ipassigner/ip_assigner.go b/pkg/agent/ipassigner/ip_assigner.go similarity index 100% rename from pkg/agent/controller/egress/ipassigner/ip_assigner.go rename to pkg/agent/ipassigner/ip_assigner.go diff --git a/pkg/agent/controller/egress/ipassigner/ip_assigner_linux.go b/pkg/agent/ipassigner/ip_assigner_linux.go similarity index 100% rename from pkg/agent/controller/egress/ipassigner/ip_assigner_linux.go rename to pkg/agent/ipassigner/ip_assigner_linux.go diff --git a/pkg/agent/controller/egress/ipassigner/ip_assigner_windows.go b/pkg/agent/ipassigner/ip_assigner_windows.go similarity index 100% rename from pkg/agent/controller/egress/ipassigner/ip_assigner_windows.go rename to pkg/agent/ipassigner/ip_assigner_windows.go diff --git a/pkg/agent/controller/egress/local_ip_detector.go b/pkg/agent/ipassigner/local_ip_detector.go similarity index 89% rename from pkg/agent/controller/egress/local_ip_detector.go rename to pkg/agent/ipassigner/local_ip_detector.go index 108ed138358..df6c9b80ba2 100644 --- a/pkg/agent/controller/egress/local_ip_detector.go +++ b/pkg/agent/ipassigner/local_ip_detector.go @@ -12,9 +12,9 @@ // See the License for the specific language governing permissions and // limitations under the License. -package egress +package ipassigner -type eventHandler func(ip string, added bool) +type LocalIPEventHandler func(ip string, added bool) type LocalIPDetector interface { IsLocalIP(ip string) bool @@ -24,7 +24,7 @@ type LocalIPDetector interface { // AddEventHandler registers an eventHandler of IP address update. It's not thread-safe and should be called before // starting the detector. - AddEventHandler(handler eventHandler) + AddEventHandler(handler LocalIPEventHandler) // HasSynced returns true if the cache has been initialized with the full lists of IP addresses. HasSynced() bool diff --git a/pkg/agent/controller/egress/local_ip_detector_linux.go b/pkg/agent/ipassigner/local_ip_detector_linux.go similarity index 97% rename from pkg/agent/controller/egress/local_ip_detector_linux.go rename to pkg/agent/ipassigner/local_ip_detector_linux.go index a26336d91d0..c4e41dcf68f 100644 --- a/pkg/agent/controller/egress/local_ip_detector_linux.go +++ b/pkg/agent/ipassigner/local_ip_detector_linux.go @@ -12,7 +12,7 @@ // See the License for the specific language governing permissions and // limitations under the License. -package egress +package ipassigner import ( "sync" @@ -31,7 +31,7 @@ type localIPDetector struct { mutex sync.RWMutex localIPs sets.String cacheSynced bool - eventHandlers []eventHandler + eventHandlers []LocalIPEventHandler } func NewLocalIPDetector() *localIPDetector { @@ -51,7 +51,7 @@ func (d *localIPDetector) HasSynced() bool { return d.cacheSynced } -func (d *localIPDetector) AddEventHandler(handler eventHandler) { +func (d *localIPDetector) AddEventHandler(handler LocalIPEventHandler) { d.eventHandlers = append(d.eventHandlers, handler) } diff --git a/pkg/agent/controller/egress/local_ip_detector_windows.go b/pkg/agent/ipassigner/local_ip_detector_windows.go similarity index 91% rename from pkg/agent/controller/egress/local_ip_detector_windows.go rename to pkg/agent/ipassigner/local_ip_detector_windows.go index 99a21931c20..5aaf32166ca 100644 --- a/pkg/agent/controller/egress/local_ip_detector_windows.go +++ b/pkg/agent/ipassigner/local_ip_detector_windows.go @@ -12,7 +12,7 @@ // See the License for the specific language governing permissions and // limitations under the License. -package egress +package ipassigner // Not implemented yet. The feature gate verification will protect this from being run. type localIPDetector struct{} @@ -25,7 +25,7 @@ func (d *localIPDetector) Run(stopCh <-chan struct{}) { return } -func (d *localIPDetector) AddEventHandler(handler eventHandler) { +func (d *localIPDetector) AddEventHandler(handler LocalIPEventHandler) { return } diff --git a/pkg/agent/controller/egress/ipassigner/testing/mock_ipassigner.go b/pkg/agent/ipassigner/testing/mock_ipassigner.go similarity index 96% rename from pkg/agent/controller/egress/ipassigner/testing/mock_ipassigner.go rename to pkg/agent/ipassigner/testing/mock_ipassigner.go index 0569cb0f84e..4baecb05cee 100644 --- a/pkg/agent/controller/egress/ipassigner/testing/mock_ipassigner.go +++ b/pkg/agent/ipassigner/testing/mock_ipassigner.go @@ -14,7 +14,7 @@ // // Code generated by MockGen. DO NOT EDIT. -// Source: antrea.io/antrea/pkg/agent/controller/egress/ipassigner (interfaces: IPAssigner) +// Source: antrea.io/antrea/pkg/agent/ipassigner (interfaces: IPAssigner) // Package testing is a generated GoMock package. package testing diff --git a/pkg/agent/memberlist/cluster.go b/pkg/agent/memberlist/cluster.go index 5e54eba05ce..ab6c9b5c5d7 100644 --- a/pkg/agent/memberlist/cluster.go +++ b/pkg/agent/memberlist/cluster.go @@ -458,22 +458,25 @@ func (c *Cluster) aliveNodes() sets.String { return nodes } -// ShouldSelectEgress returns true if the local Node selected as the owner Node of the Egress, -// the local Node in the cluster holds the same consistent hash ring for each ExternalIPPool, -// consistentHash.Get gets the closest item (Node name) in the hash to the provided key(egressIP), -// if the name of the local Node is equal to the name of the selected Node, returns true. -func (c *Cluster) ShouldSelectEgress(egress *v1alpha2.Egress) (bool, error) { - eipName := egress.Spec.ExternalIPPool - if eipName == "" || egress.Spec.EgressIP == "" { +// ShouldSelectIP returns true if the local Node selected as the owner Node of the IP in the specific +// ExternalIPPool. The local Node in the cluster holds the same consistent hash ring for each ExternalIPPool, +// consistentHash.Get gets the closest item (Node name) in the hash to the provided key (IP), if the name of +// the local Node is equal to the name of the selected Node, returns true. +func (c *Cluster) ShouldSelectIP(ip, externalIPPool string) (bool, error) { + if externalIPPool == "" || ip == "" { return false, nil } c.consistentHashRWMutex.RLock() defer c.consistentHashRWMutex.RUnlock() - consistentHash, ok := c.consistentHashMap[eipName] + consistentHash, ok := c.consistentHashMap[externalIPPool] if !ok { - return false, fmt.Errorf("local Node consistentHashMap has not synced, ExternalIPPool %s", eipName) + return false, fmt.Errorf("local Node consistentHashMap has not synced, ExternalIPPool %s", externalIPPool) } - return consistentHash.Get(egress.Spec.EgressIP) == c.nodeName, nil + node := consistentHash.Get(ip) + if node == "" { + klog.Warningf("No valid Node chosen for IP %s in externalIPPool %s", ip, externalIPPool) + } + return node == c.nodeName, nil } func (c *Cluster) notify(objName string) { diff --git a/pkg/agent/memberlist/cluster_test.go b/pkg/agent/memberlist/cluster_test.go index e8207946445..ce8e58dffd4 100644 --- a/pkg/agent/memberlist/cluster_test.go +++ b/pkg/agent/memberlist/cluster_test.go @@ -53,7 +53,6 @@ func newFakeCluster(nodeConfig *config.NodeConfig, stopCh <-chan struct{}, i int informerFactory := informers.NewSharedInformerFactory(clientset, 0) nodeInformer := informerFactory.Core().V1().Nodes() - crdClient := fakeversioned.NewSimpleClientset([]runtime.Object{}...) crdInformerFactory := crdinformers.NewSharedInformerFactory(crdClient, 0) ipPoolInformer := crdInformerFactory.Crd().V1alpha2().ExternalIPPools() @@ -159,7 +158,7 @@ func TestCluster_Run(t *testing.T) { })) tCase.egress.Spec.ExternalIPPool = eip.Name - res, err := fakeCluster.cluster.ShouldSelectEgress(tCase.egress) + res, err := fakeCluster.cluster.ShouldSelectIP(tCase.egress.Spec.EgressIP, eip.Name) // Cluster should hold the same consistent hash ring for each ExternalIPPool. assert.NoError(t, err) allMembers, err := fakeCluster.cluster.allClusterMembers() @@ -259,7 +258,7 @@ func TestCluster_RunClusterEvents(t *testing.T) { t.Run(tCase.name, func(t *testing.T) { localNode.Labels = tCase.newNodeLabels updateNode(localNode) - res, err := fakeCluster.cluster.ShouldSelectEgress(tCase.egress) + res, err := fakeCluster.cluster.ShouldSelectIP(tCase.egress.Spec.EgressIP, tCase.egress.Spec.ExternalIPPool) assert.NoError(t, err) assert.Equal(t, tCase.expectEgressSelectResult, res, "select Node for Egress result not match") }) @@ -305,7 +304,7 @@ func TestCluster_RunClusterEvents(t *testing.T) { newEIP, _ := fakeCluster.cluster.externalIPPoolLister.Get(fakeEIP1.Name) return reflect.DeepEqual(fakeEIP1, newEIP), nil })) - res, err := fakeCluster.cluster.ShouldSelectEgress(fakeEgress1) + res, err := fakeCluster.cluster.ShouldSelectIP(fakeEgress1.Spec.EgressIP, fakeEgress1.Spec.ExternalIPPool) assert.NoError(t, err) assert.Equal(t, tCase.expectEgressSelectResult, res, "select Node for Egress result not match") }) @@ -332,7 +331,7 @@ func TestCluster_RunClusterEvents(t *testing.T) { return reflect.DeepEqual(newEIP, fakeEIP2), nil })) assertEgressSelectResult := func(egress *crdv1a2.Egress, expectedRes bool, hasSyncedErr bool) { - res, err := fakeCluster.cluster.ShouldSelectEgress(egress) + res, err := fakeCluster.cluster.ShouldSelectIP(egress.Spec.EgressIP, egress.Spec.ExternalIPPool) if !hasSyncedErr { assert.NoError(t, err) } @@ -466,7 +465,7 @@ func TestCluster_ConsistentHashDistribute(t *testing.T) { ObjectMeta: metav1.ObjectMeta{Name: "fakeEgress"}, Spec: crdv1a2.EgressSpec{ExternalIPPool: fakeEIPName, EgressIP: fmt.Sprintf("10.1.1.%d", i)}, } - selected, err := fakeCluster.ShouldSelectEgress(fakeEgress) + selected, err := fakeCluster.ShouldSelectIP(fakeEgress.Spec.EgressIP, fakeEgress.Spec.ExternalIPPool) assert.NoError(t, err) if selected { selectedNodes = append(selectedNodes, i) @@ -535,7 +534,7 @@ func TestCluster_ShouldSelectEgress(t *testing.T) { for i := 0; i < tCase.nodeNum; i++ { node := fmt.Sprintf("node-%d", i) fakeCluster.nodeName = node - selected, err := fakeCluster.ShouldSelectEgress(fakeEgress) + selected, err := fakeCluster.ShouldSelectIP(fakeEgress.Spec.EgressIP, fakeEgress.Spec.ExternalIPPool) assert.NoError(t, err) assert.Equal(t, node == tCase.expectedNode, selected, "Selected Node for Egress not match") } @@ -574,7 +573,7 @@ func BenchmarkCluster_ShouldSelect(b *testing.B) { b.Run(fmt.Sprintf("%s-nodeSelectedForEgress", bc.name), func(b *testing.B) { b.ResetTimer() for i := 0; i < b.N; i++ { - fakeCluster.ShouldSelectEgress(fakeEgress) + fakeCluster.ShouldSelectIP(fakeEgress.Spec.EgressIP, fakeEgress.Spec.ExternalIPPool) } }) } diff --git a/pkg/apiserver/apiserver.go b/pkg/apiserver/apiserver.go index b49ed894adc..777a7b0a32f 100644 --- a/pkg/apiserver/apiserver.go +++ b/pkg/apiserver/apiserver.go @@ -54,6 +54,7 @@ import ( "antrea.io/antrea/pkg/apiserver/registry/system/supportbundle" "antrea.io/antrea/pkg/apiserver/storage" "antrea.io/antrea/pkg/controller/egress" + "antrea.io/antrea/pkg/controller/externalippool" "antrea.io/antrea/pkg/controller/ipam" controllernetworkpolicy "antrea.io/antrea/pkg/controller/networkpolicy" "antrea.io/antrea/pkg/controller/querier" @@ -101,6 +102,7 @@ type ExtraConfig struct { endpointQuerier controllernetworkpolicy.EndpointQuerier networkPolicyController *controllernetworkpolicy.NetworkPolicyController egressController *egress.EgressController + externalIPPoolController *externalippool.ExternalIPPoolController caCertController *certificate.CACertController statsAggregator *stats.Aggregator networkPolicyStatusController *controllernetworkpolicy.StatusController @@ -308,8 +310,8 @@ func installHandlers(c *ExtraConfig, s *genericapiserver.GenericAPIServer) { } if features.DefaultFeatureGate.Enabled(features.Egress) { - s.Handler.NonGoRestfulMux.HandleFunc("/validate/externalippool", webhook.HandlerForValidateFunc(c.egressController.ValidateExternalIPPool)) s.Handler.NonGoRestfulMux.HandleFunc("/validate/egress", webhook.HandlerForValidateFunc(c.egressController.ValidateEgress)) + s.Handler.NonGoRestfulMux.HandleFunc("/validate/externalippool", webhook.HandlerForValidateFunc(c.externalIPPoolController.ValidateExternalIPPool)) } if features.DefaultFeatureGate.Enabled(features.AntreaIPAM) { diff --git a/pkg/controller/egress/controller.go b/pkg/controller/egress/controller.go index 2f96f0dc9d6..d7df808c0a2 100644 --- a/pkg/controller/egress/controller.go +++ b/pkg/controller/egress/controller.go @@ -17,24 +17,21 @@ package egress import ( "context" "encoding/json" - "errors" "fmt" "net" "reflect" "sync" "time" - apierrors "k8s.io/apimachinery/pkg/api/errors" + v1 "k8s.io/api/core/v1" metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" "k8s.io/apimachinery/pkg/labels" "k8s.io/apimachinery/pkg/types" "k8s.io/apimachinery/pkg/util/sets" "k8s.io/apimachinery/pkg/util/wait" "k8s.io/client-go/tools/cache" - "k8s.io/client-go/util/retry" "k8s.io/client-go/util/workqueue" "k8s.io/klog/v2" - utilnet "k8s.io/utils/net" "antrea.io/antrea/pkg/apis/controlplane" egressv1alpha2 "antrea.io/antrea/pkg/apis/crd/v1alpha2" @@ -42,11 +39,9 @@ import ( clientset "antrea.io/antrea/pkg/client/clientset/versioned" egressinformers "antrea.io/antrea/pkg/client/informers/externalversions/crd/v1alpha2" egresslisters "antrea.io/antrea/pkg/client/listers/crd/v1alpha2" + "antrea.io/antrea/pkg/controller/externalippool" "antrea.io/antrea/pkg/controller/grouping" - "antrea.io/antrea/pkg/controller/metrics" antreatypes "antrea.io/antrea/pkg/controller/types" - "antrea.io/antrea/pkg/ipam/ipallocator" - iputil "antrea.io/antrea/pkg/util/ip" ) const ( @@ -64,10 +59,6 @@ const ( externalIPPoolIndex = "externalIPPool" ) -var ( - externalIPPoolNotFound = errors.New("ExternalIPPool not found") -) - // ipAllocation contains the IP and the IP Pool which allocates it. type ipAllocation struct { ip net.IP @@ -76,12 +67,9 @@ type ipAllocation struct { // EgressController is responsible for synchronizing the EgressGroups selected by Egresses. type EgressController struct { - crdClient clientset.Interface - externalIPPoolLister egresslisters.ExternalIPPoolLister - externalIPPoolListerSynced cache.InformerSynced - // ipAllocatorMap is a map from ExternalIPPool name to MultiIPAllocator. - ipAllocatorMap map[string]ipallocator.MultiIPAllocator - ipAllocatorMutex sync.RWMutex + crdClient clientset.Interface + + externalIPAllocator externalippool.ExternalIPAllocator // ipAllocationMap is a map from Egress name to ipAllocation, which is used to check whether the Egress's IP has // changed and to release the IP after the Egress is removed. @@ -97,8 +85,6 @@ type EgressController struct { egressGroupStore storage.Interface // queue maintains the EgressGroup objects that need to be synced. queue workqueue.RateLimitingInterface - // poolQueue maintains the ExternalIPPool objects that need to be synced. - poolQueue workqueue.RateLimitingInterface // groupingInterface knows Pods that a given group selects. groupingInterface grouping.Interface // Added as a member to the struct to allow injection for testing. @@ -109,23 +95,20 @@ type EgressController struct { func NewEgressController(crdClient clientset.Interface, groupingInterface grouping.Interface, egressInformer egressinformers.EgressInformer, - externalIPPoolInformer egressinformers.ExternalIPPoolInformer, + externalIPAllocator externalippool.ExternalIPAllocator, egressGroupStore storage.Interface) *EgressController { c := &EgressController{ - crdClient: crdClient, - egressInformer: egressInformer, - egressLister: egressInformer.Lister(), - egressListerSynced: egressInformer.Informer().HasSynced, - egressIndexer: egressInformer.Informer().GetIndexer(), - externalIPPoolLister: externalIPPoolInformer.Lister(), - externalIPPoolListerSynced: externalIPPoolInformer.Informer().HasSynced, - egressGroupStore: egressGroupStore, - queue: workqueue.NewNamedRateLimitingQueue(workqueue.NewItemExponentialFailureRateLimiter(minRetryDelay, maxRetryDelay), "egress"), - poolQueue: workqueue.NewNamedRateLimitingQueue(workqueue.NewItemExponentialFailureRateLimiter(minRetryDelay, maxRetryDelay), "externalIPPool"), - groupingInterface: groupingInterface, - groupingInterfaceSynced: groupingInterface.HasSynced, - ipAllocatorMap: map[string]ipallocator.MultiIPAllocator{}, - ipAllocationMap: map[string]*ipAllocation{}, + crdClient: crdClient, + egressInformer: egressInformer, + egressLister: egressInformer.Lister(), + egressListerSynced: egressInformer.Informer().HasSynced, + egressIndexer: egressInformer.Informer().GetIndexer(), + egressGroupStore: egressGroupStore, + queue: workqueue.NewNamedRateLimitingQueue(workqueue.NewItemExponentialFailureRateLimiter(minRetryDelay, maxRetryDelay), "egress"), + groupingInterface: groupingInterface, + groupingInterfaceSynced: groupingInterface.HasSynced, + ipAllocationMap: map[string]*ipAllocation{}, + externalIPAllocator: externalIPAllocator, } // Add handlers for Group events and Egress events. c.groupingInterface.AddEventHandler(egressGroupType, c.enqueueEgressGroup) @@ -148,143 +131,55 @@ func NewEgressController(crdClient clientset.Interface, } return []string{egress.Spec.ExternalIPPool}, nil }}) - externalIPPoolInformer.Informer().AddEventHandlerWithResyncPeriod( - cache.ResourceEventHandlerFuncs{ - AddFunc: c.addExternalIPPool, - UpdateFunc: c.updateExternalIPPool, - DeleteFunc: c.deleteExternalIPPool, - }, - resyncPeriod, - ) + c.externalIPAllocator.AddEventHandler(func(ipPool string) { + c.enqueueEgresses(ipPool) + }) return c } // Run begins watching and syncing of the EgressController. func (c *EgressController) Run(stopCh <-chan struct{}) { defer c.queue.ShutDown() - defer c.poolQueue.ShutDown() klog.Infof("Starting %s", controllerName) defer klog.Infof("Shutting down %s", controllerName) - cacheSyncs := []cache.InformerSynced{c.egressListerSynced, c.externalIPPoolListerSynced, c.groupingInterfaceSynced} + cacheSyncs := []cache.InformerSynced{c.egressListerSynced, c.groupingInterfaceSynced, c.externalIPAllocator.HasSynced} if !cache.WaitForNamedCacheSync(controllerName, stopCh, cacheSyncs...) { return } - - // Initialize the ipAllocatorMap and ipAllocationMap with the existing ExternalIPPools and Egresses. - ipPools, _ := c.externalIPPoolLister.List(labels.Everything()) - for _, ipPool := range ipPools { - c.createOrUpdateIPAllocator(ipPool) - } egresses, _ := c.egressLister.List(labels.Everything()) - for _, egress := range egresses { - c.updateIPAllocation(egress) - } - + c.restoreIPAllocations(egresses) for i := 0; i < defaultWorkers; i++ { go wait.Until(c.egressGroupWorker, time.Second, stopCh) - go wait.Until(c.externalIPPoolWorker, time.Second, stopCh) } <-stopCh } -// updateIPAllocation sets the EgressIP of an Egress as allocated in the specified ExternalIPPool and records the -// allocation in ipAllocationMap. -func (c *EgressController) updateIPAllocation(egress *egressv1alpha2.Egress) { - // Ignore Egress that is not associated to ExternalIPPool or doesn't have EgressIP assigned. - if egress.Spec.ExternalIPPool == "" || egress.Spec.EgressIP == "" { - return - } - ipAllocator, exists := c.getIPAllocator(egress.Spec.ExternalIPPool) - if !exists { - klog.ErrorS(externalIPPoolNotFound, "Failed to allocate EgressIP", "egress", egress.Name, "ip", egress.Spec.EgressIP, "pool", egress.Spec.ExternalIPPool) - return - } - ip := net.ParseIP(egress.Spec.EgressIP) - err := ipAllocator.AllocateIP(ip) - if err != nil { - klog.ErrorS(err, "Failed to allocate EgressIP", "egress", egress.Name, "ip", egress.Spec.EgressIP, "pool", egress.Spec.ExternalIPPool) - return - } - // Record the valid IP allocation. - c.setIPAllocation(egress.Name, ip, egress.Spec.ExternalIPPool) - klog.InfoS("Allocated EgressIP", "egress", egress.Name, "ip", egress.Spec.EgressIP, "pool", egress.Spec.ExternalIPPool) -} - -// createOrUpdateIPAllocator creates or updates the IP allocator based on the provided ExternalIPPool. -// Currently it's assumed that only new ranges will be added and existing ranges should not be deleted. -// TODO: Use validation webhook to ensure it. -func (c *EgressController) createOrUpdateIPAllocator(ipPool *egressv1alpha2.ExternalIPPool) bool { - changed := false - c.ipAllocatorMutex.Lock() - defer c.ipAllocatorMutex.Unlock() - - existingIPRanges := sets.NewString() - multiIPAllocator, exists := c.ipAllocatorMap[ipPool.Name] - if !exists { - multiIPAllocator = ipallocator.MultiIPAllocator{} - changed = true - } else { - existingIPRanges.Insert(multiIPAllocator.Names()...) - } - - for _, ipRange := range ipPool.Spec.IPRanges { - ipAllocator, err := func() (*ipallocator.SingleIPAllocator, error) { - if ipRange.CIDR != "" { - _, ipNet, err := net.ParseCIDR(ipRange.CIDR) - if err != nil { - return nil, err - } - // Must use normalized IPNet string to check if the IP range exists. Otherwise non-strict CIDR like - // 192.168.0.1/24 will be considered new even if it doesn't change. - // Validating or normalizing the input CIDR should be a better solution but the externalIPPools that - // have been created will still have this issue, so we just normalize the CIDR when using it. - if existingIPRanges.Has(ipNet.String()) { - return nil, nil - } - // Don't use the IPv4 network's broadcast address. - var reservedIPs []net.IP - if utilnet.IsIPv4CIDR(ipNet) { - reservedIPs = append(reservedIPs, iputil.GetLocalBroadcastIP(ipNet)) - } - return ipallocator.NewCIDRAllocator(ipNet, reservedIPs) - } else { - if existingIPRanges.Has(fmt.Sprintf("%s-%s", ipRange.Start, ipRange.End)) { - return nil, nil - } - return ipallocator.NewIPRangeAllocator(net.ParseIP(ipRange.Start), net.ParseIP(ipRange.End)) - } - }() - if err != nil { - klog.ErrorS(err, "Failed to create IPAllocator", "ipRange", ipRange) +// restoreIPAllocations restores the existing EgressIPs of Egresses and records the successful ones in ipAllocationMap. +func (c *EgressController) restoreIPAllocations(egresses []*egressv1alpha2.Egress) { + var previousIPAllocations []externalippool.IPAllocation + for _, egress := range egresses { + // Ignore Egress that is not associated to ExternalIPPool or doesn't have EgressIP assigned. + if egress.Spec.ExternalIPPool == "" || egress.Spec.EgressIP == "" { continue } - // The IP range already exists in multiIPAllocator. - if ipAllocator == nil { - continue + ip := net.ParseIP(egress.Spec.EgressIP) + allocation := externalippool.IPAllocation{ + ObjectReference: v1.ObjectReference{ + Name: egress.Name, + Kind: egress.Kind, + }, + IPPoolName: egress.Spec.ExternalIPPool, + IP: ip, } - multiIPAllocator = append(multiIPAllocator, ipAllocator) - changed = true + previousIPAllocations = append(previousIPAllocations, allocation) + } + succeededAllocations := c.externalIPAllocator.RestoreIPAllocations(previousIPAllocations) + for _, alloc := range succeededAllocations { + c.setIPAllocation(alloc.ObjectReference.Name, alloc.IP, alloc.IPPoolName) + klog.InfoS("Restored EgressIP", "egress", alloc.ObjectReference.Name, "ip", alloc.IP, "pool", alloc.IPPoolName) } - c.ipAllocatorMap[ipPool.Name] = multiIPAllocator - c.poolQueue.Add(ipPool.Name) - return changed -} - -// deleteIPAllocator deletes the IP allocator of the given IP pool. -func (c *EgressController) deleteIPAllocator(ipPoolName string) { - c.ipAllocatorMutex.Lock() - defer c.ipAllocatorMutex.Unlock() - delete(c.ipAllocatorMap, ipPoolName) -} - -// getIPAllocator gets the IP allocator of the given IP pool. -func (c *EgressController) getIPAllocator(ipPoolName string) (ipallocator.MultiIPAllocator, bool) { - c.ipAllocatorMutex.RLock() - defer c.ipAllocatorMutex.RUnlock() - ipAllocator, exists := c.ipAllocatorMap[ipPoolName] - return ipAllocator, exists } func (c *EgressController) egressGroupWorker() { @@ -337,57 +232,18 @@ func (c *EgressController) setIPAllocation(egressName string, ip net.IP, poolNam } } -func (c *EgressController) updateExternalIPPoolStatus(poolName string) error { - eip, err := c.externalIPPoolLister.Get(poolName) - if err != nil { - if apierrors.IsNotFound(err) { - return nil - } - return err - } - - ipAllocator, exists := c.getIPAllocator(eip.Name) - if !exists { - return externalIPPoolNotFound - } - total, used := ipAllocator.Total(), ipAllocator.Used() - toUpdate := eip.DeepCopy() - var getErr error - if err := retry.RetryOnConflict(retry.DefaultRetry, func() error { - actualStatus := eip.Status - usage := egressv1alpha2.ExternalIPPoolUsage{Total: total, Used: used} - if actualStatus.Usage == usage { - return nil - } - klog.V(2).InfoS("Updating ExternalIPPool status", "ExternalIPPool", poolName, "usage", usage) - toUpdate.Status.Usage = usage - if _, updateErr := c.crdClient.CrdV1alpha2().ExternalIPPools().UpdateStatus(context.TODO(), toUpdate, metav1.UpdateOptions{}); updateErr != nil && apierrors.IsConflict(updateErr) { - toUpdate, getErr = c.crdClient.CrdV1alpha2().ExternalIPPools().Get(context.TODO(), poolName, metav1.GetOptions{}) - if getErr != nil { - return getErr - } - return updateErr - } - return nil - }); err != nil { - return fmt.Errorf("updating ExternalIPPool %s status error: %v", poolName, err) - } - klog.V(2).InfoS("Updated ExternalIPPool status", "ExternalIPPool", poolName) - metrics.AntreaExternalIPPoolStatusUpdates.Inc() - return nil -} - // syncEgressIP is responsible for releasing stale EgressIP and allocating new EgressIP for an Egress if applicable. func (c *EgressController) syncEgressIP(egress *egressv1alpha2.Egress) (net.IP, error) { prevIP, prevIPPool, exists := c.getIPAllocation(egress.Name) if exists { - _, ipAllocatorExists := c.getIPAllocator(prevIPPool) // The EgressIP and the ExternalIPPool don't change, do nothing. - if prevIP.String() == egress.Spec.EgressIP && prevIPPool == egress.Spec.ExternalIPPool && ipAllocatorExists { + if prevIP.String() == egress.Spec.EgressIP && prevIPPool == egress.Spec.ExternalIPPool && c.externalIPAllocator.IPPoolExists(egress.Spec.ExternalIPPool) { return prevIP, nil } // Either EgressIP or ExternalIPPool changes, release the previous one first. - c.releaseEgressIP(egress.Name, prevIP, prevIPPool) + if err := c.releaseEgressIP(egress.Name, prevIP, prevIPPool); err != nil { + return nil, err + } } // Skip allocating EgressIP if ExternalIPPool is not specified and return whatever user specifies. @@ -395,15 +251,14 @@ func (c *EgressController) syncEgressIP(egress *egressv1alpha2.Egress) (net.IP, return net.ParseIP(egress.Spec.EgressIP), nil } - ipAllocator, exists := c.getIPAllocator(egress.Spec.ExternalIPPool) - if !exists { + if !c.externalIPAllocator.IPPoolExists(egress.Spec.ExternalIPPool) { // The IP pool has been deleted, reclaim the IP from the Egress API. if egress.Spec.EgressIP != "" { if err := c.updateEgressIP(egress, ""); err != nil { return nil, err } } - return nil, externalIPPoolNotFound + return nil, fmt.Errorf("ExternalIPPool %s not exists", egress.Spec.ExternalIPPool) } var ip net.IP @@ -412,22 +267,24 @@ func (c *EgressController) syncEgressIP(egress *egressv1alpha2.Egress) (net.IP, // TODO: Use validation webhook to ensure the requested IP matches the pool. if egress.Spec.EgressIP != "" { ip = net.ParseIP(egress.Spec.EgressIP) - if err := ipAllocator.AllocateIP(ip); err != nil { + if err := c.externalIPAllocator.UpdateIPAllocation(egress.Spec.ExternalIPPool, ip); err != nil { return nil, fmt.Errorf("error when allocating IP %v for Egress %s from ExternalIPPool %s: %v", ip, egress.Name, egress.Spec.ExternalIPPool, err) } } else { var err error // User doesn't specify the Egress IP, allocate one. - if ip, err = ipAllocator.AllocateNext(); err != nil { + if ip, err = c.externalIPAllocator.AllocateIPFromPool(egress.Spec.ExternalIPPool); err != nil { return nil, err } if err = c.updateEgressIP(egress, ip.String()); err != nil { - ipAllocator.Release(ip) + if rerr := c.externalIPAllocator.ReleaseIP(egress.Spec.ExternalIPPool, ip); rerr != nil && + rerr != externalippool.ErrExternalIPPoolNotFound { + klog.ErrorS(rerr, "Failed to release IP", "ip", ip, "pool", egress.Spec.ExternalIPPool) + } return nil, err } } c.setIPAllocation(egress.Name, ip, egress.Spec.ExternalIPPool) - c.poolQueue.Add(egress.Spec.ExternalIPPool) klog.InfoS("Allocated EgressIP", "egress", egress.Name, "ip", ip, "pool", egress.Spec.ExternalIPPool) return ip, nil } @@ -451,19 +308,20 @@ func (c *EgressController) updateEgressIP(egress *egressv1alpha2.Egress, ip stri } // releaseEgressIP removes the Egress's ipAllocation in the cache and releases the IP to the pool. -func (c *EgressController) releaseEgressIP(egressName string, egressIP net.IP, poolName string) { - c.deleteIPAllocation(egressName) - allocator, exists := c.getIPAllocator(poolName) - if !exists { - klog.ErrorS(externalIPPoolNotFound, "Failed to release EgressIP", "egress", egressName, "ip", egressIP, "pool", poolName) - return - } - if err := allocator.Release(egressIP); err != nil { - klog.ErrorS(err, "Failed to release EgressIP", "egress", egressName, "ip", egressIP, "pool", poolName) - return +func (c *EgressController) releaseEgressIP(egressName string, egressIP net.IP, poolName string) error { + if err := c.externalIPAllocator.ReleaseIP(poolName, egressIP); err != nil { + if err == externalippool.ErrExternalIPPoolNotFound { + // Ignore the error since the external IP Pool could be deleted. + klog.Warningf("Failed to release IP %s because IP Pool %s does not exist", egressIP, poolName) + } else { + klog.ErrorS(err, "Failed to release IP", "ip", egressIP, "pool", poolName) + return err + } + } else { + klog.InfoS("Released EgressIP", "egress", egressName, "ip", egressIP, "pool", poolName) } - c.poolQueue.Add(poolName) - klog.InfoS("Released EgressIP", "egress", egressName, "ip", egressIP, "pool", poolName) + c.deleteIPAllocation(egressName) + return nil } func (c *EgressController) syncEgress(key string) error { @@ -535,31 +393,6 @@ func (c *EgressController) syncEgress(key string) error { return nil } -func (c *EgressController) externalIPPoolWorker() { - for c.processNextExternalIPPoolWorkItem() { - } -} - -func (c *EgressController) processNextExternalIPPoolWorkItem() bool { - key, quit := c.poolQueue.Get() - if quit { - return false - } - defer c.poolQueue.Done(key) - - err := c.updateExternalIPPoolStatus(key.(string)) - if err != nil { - // Put the item back in the workqueue to handle any transient errors. - c.poolQueue.AddRateLimited(key) - klog.ErrorS(err, "Failed to sync ExternalIPPool status", "ExternalIPPool", key) - return true - } - // If no error occurs we Forget this item so it does not get queued again until - // another change happens. - c.poolQueue.Forget(key) - return true -} - func (c *EgressController) enqueueEgressGroup(key string) { klog.V(4).Infof("Adding new key %s to EgressGroup queue", key) c.queue.Add(key) @@ -607,15 +440,6 @@ func (c *EgressController) deleteEgress(obj interface{}) { c.queue.Add(egress.Name) } -// addExternalIPPool processes ExternalIPPool ADD events. It creates an IPAllocator for the pool and triggers -// reconciliation of Egresses that refer to the pool. -func (c *EgressController) addExternalIPPool(obj interface{}) { - pool := obj.(*egressv1alpha2.ExternalIPPool) - klog.InfoS("Processing ExternalIPPool ADD event", "pool", pool.Name, "ipRanges", pool.Spec.IPRanges) - c.createOrUpdateIPAllocator(pool) - c.enqueueEgresses(pool.Name) -} - // enqueueEgresses enqueues all Egresses that refer to the provided ExternalIPPool. func (c *EgressController) enqueueEgresses(poolName string) { objects, _ := c.egressIndexer.ByIndex(externalIPPoolIndex, poolName) @@ -624,23 +448,3 @@ func (c *EgressController) enqueueEgresses(poolName string) { c.queue.Add(egress.Name) } } - -// updateExternalIPPool processes ExternalIPPool UPDATE events. It updates the IPAllocator for the pool and triggers -// reconciliation of Egresses that refer to the pool if the IPAllocator changes. -func (c *EgressController) updateExternalIPPool(_, cur interface{}) { - pool := cur.(*egressv1alpha2.ExternalIPPool) - klog.InfoS("Processing ExternalIPPool UPDATE event", "pool", pool.Name, "ipRanges", pool.Spec.IPRanges) - if c.createOrUpdateIPAllocator(pool) { - c.enqueueEgresses(pool.Name) - } -} - -// deleteExternalIPPool processes ExternalIPPool DELETE events. It deletes the IPAllocator for the pool and triggers -// reconciliation of Egresses that refer to the pool. -func (c *EgressController) deleteExternalIPPool(obj interface{}) { - pool := obj.(*egressv1alpha2.ExternalIPPool) - klog.InfoS("Processing ExternalIPPool DELETE event", "pool", pool.Name, "ipRanges", pool.Spec.IPRanges) - c.deleteIPAllocator(pool.Name) - // Enqueue the Egresses to reclaim the IPs allocated from the pool. - c.enqueueEgresses(pool.Name) -} diff --git a/pkg/controller/egress/controller_test.go b/pkg/controller/egress/controller_test.go index 578babe1d63..025e3af3758 100644 --- a/pkg/controller/egress/controller_test.go +++ b/pkg/controller/egress/controller_test.go @@ -32,6 +32,7 @@ import ( "k8s.io/client-go/informers" "k8s.io/client-go/kubernetes" "k8s.io/client-go/kubernetes/fake" + "k8s.io/client-go/tools/cache" "antrea.io/antrea/pkg/apis/controlplane" "antrea.io/antrea/pkg/apis/crd/v1alpha2" @@ -40,6 +41,7 @@ import ( fakeversioned "antrea.io/antrea/pkg/client/clientset/versioned/fake" crdinformers "antrea.io/antrea/pkg/client/informers/externalversions" "antrea.io/antrea/pkg/controller/egress/store" + "antrea.io/antrea/pkg/controller/externalippool" "antrea.io/antrea/pkg/controller/grouping" ) @@ -123,11 +125,12 @@ func newPod(namespace, name string, labels map[string]string, nodeName string, i type egressController struct { *EgressController - client kubernetes.Interface - crdClient versioned.Interface - informerFactory informers.SharedInformerFactory - crdInformerFactory crdinformers.SharedInformerFactory - groupingController *grouping.GroupEntityController + client kubernetes.Interface + crdClient versioned.Interface + informerFactory informers.SharedInformerFactory + crdInformerFactory crdinformers.SharedInformerFactory + groupingController *grouping.GroupEntityController + externalIPAllocator *externalippool.ExternalIPPoolController } // objects is an initial set of K8s objects that is exposed through the client. @@ -136,15 +139,15 @@ func newController(objects, crdObjects []runtime.Object) *egressController { crdClient := fakeversioned.NewSimpleClientset(crdObjects...) informerFactory := informers.NewSharedInformerFactory(client, resyncPeriod) crdInformerFactory := crdinformers.NewSharedInformerFactory(crdClient, resyncPeriod) + externalIPAllocator := externalippool.NewExternalIPPoolController(crdClient, crdInformerFactory.Crd().V1alpha2().ExternalIPPools()) egressGroupStore := store.NewEgressGroupStore() egressInformer := crdInformerFactory.Crd().V1alpha2().Egresses() - externalIPPoolInformer := crdInformerFactory.Crd().V1alpha2().ExternalIPPools() groupEntityIndex := grouping.NewGroupEntityIndex() groupingController := grouping.NewGroupEntityController(groupEntityIndex, informerFactory.Core().V1().Pods(), informerFactory.Core().V1().Namespaces(), crdInformerFactory.Crd().V1alpha2().ExternalEntities()) - controller := NewEgressController(crdClient, groupEntityIndex, egressInformer, externalIPPoolInformer, egressGroupStore) + controller := NewEgressController(crdClient, groupEntityIndex, egressInformer, externalIPAllocator, egressGroupStore) return &egressController{ controller, client, @@ -152,6 +155,7 @@ func newController(objects, crdObjects []runtime.Object) *egressController { informerFactory, crdInformerFactory, groupingController, + externalIPAllocator, } } @@ -304,6 +308,8 @@ func TestAddEgress(t *testing.T) { controller.crdInformerFactory.Start(stopCh) controller.informerFactory.WaitForCacheSync(stopCh) controller.crdInformerFactory.WaitForCacheSync(stopCh) + go controller.externalIPAllocator.Run(stopCh) + require.True(t, cache.WaitForCacheSync(stopCh, controller.externalIPAllocator.HasSynced)) go controller.groupingInterface.Run(stopCh) go controller.groupingController.Run(stopCh) go controller.Run(stopCh) @@ -341,11 +347,7 @@ func TestAddEgress(t *testing.T) { require.NoError(t, err) assert.Equal(t, tt.expectedEgressIP, gotEgress.Spec.EgressIP) if gotEgress.Spec.ExternalIPPool != "" && gotEgress.Spec.EgressIP != "" { - poolName := gotEgress.Spec.ExternalIPPool - eip, err := controller.crdClient.CrdV1alpha2().ExternalIPPools().Get(context.TODO(), poolName, metav1.GetOptions{}) - require.NoError(t, err) - usage := eip.Status.Usage - assert.Equal(t, 1, usage.Used, "Expected one used IP in EgressIPPool Status") + checkExternalIPPoolUsed(t, controller, gotEgress.Spec.ExternalIPPool, 1) } }) } @@ -359,6 +361,8 @@ func TestUpdateEgress(t *testing.T) { controller.crdInformerFactory.Start(stopCh) controller.informerFactory.WaitForCacheSync(stopCh) controller.crdInformerFactory.WaitForCacheSync(stopCh) + go controller.externalIPAllocator.Run(stopCh) + require.True(t, cache.WaitForCacheSync(stopCh, controller.externalIPAllocator.HasSynced)) go controller.groupingInterface.Run(stopCh) go controller.groupingController.Run(stopCh) go controller.Run(stopCh) @@ -467,8 +471,6 @@ func TestUpdateEgress(t *testing.T) { return !exists, nil }) assert.NoError(t, err, "IP allocation was not deleted after the ExternalIPPool was deleted") - _, exists := controller.getIPAllocator(eipFoo2.Name) - assert.False(t, exists, "IP allocator was not deleted after the ExternalIPPool was deleted") assert.Equal(t, "", gotEgressIP(), "EgressIP was not deleted after the ExternalIPPool was deleted") // Recreate the ExternalIPPool. An EgressIP should be allocated. @@ -690,10 +692,9 @@ func TestSyncEgressIP(t *testing.T) { controller.crdInformerFactory.Start(stopCh) controller.informerFactory.WaitForCacheSync(stopCh) controller.crdInformerFactory.WaitForCacheSync(stopCh) - controller.createOrUpdateIPAllocator(tt.existingExternalIPPool) - for _, egress := range tt.existingEgresses { - controller.updateIPAllocation(egress) - } + go controller.externalIPAllocator.Run(stopCh) + require.True(t, cache.WaitForCacheSync(stopCh, controller.externalIPAllocator.HasSynced)) + controller.restoreIPAllocations(tt.existingEgresses) gotEgressIP, err := controller.syncEgressIP(tt.inputEgress) if tt.expectErr { assert.Error(t, err) @@ -706,60 +707,15 @@ func TestSyncEgressIP(t *testing.T) { } } -func TestCreateOrUpdateIPAllocator(t *testing.T) { - stopCh := make(chan struct{}) - defer close(stopCh) - controller := newController(nil, nil) - controller.informerFactory.Start(stopCh) - controller.crdInformerFactory.Start(stopCh) - controller.informerFactory.WaitForCacheSync(stopCh) - controller.crdInformerFactory.WaitForCacheSync(stopCh) - - ipPool := newExternalIPPool("ipPoolA", "1.1.1.0/30", "", "") - changed := controller.createOrUpdateIPAllocator(ipPool) - assert.True(t, changed) - allocator, exists := controller.getIPAllocator(ipPool.Name) - require.True(t, exists) - assert.Equal(t, 1, len(allocator)) - assert.Equal(t, 2, allocator.Total()) - - // Append a non-strict CIDR, it should handle it correctly. - ipPool.Spec.IPRanges = append(ipPool.Spec.IPRanges, corev1a2.IPRange{CIDR: "1.1.2.1/30"}) - changed = controller.createOrUpdateIPAllocator(ipPool) - assert.True(t, changed) - allocator, exists = controller.getIPAllocator(ipPool.Name) - require.True(t, exists) - assert.Equal(t, 2, len(allocator)) - assert.Equal(t, 4, allocator.Total()) - - ipPool.Spec.IPRanges = append(ipPool.Spec.IPRanges, corev1a2.IPRange{Start: "1.1.3.1", End: "1.1.3.10"}) - changed = controller.createOrUpdateIPAllocator(ipPool) - assert.True(t, changed) - allocator, exists = controller.getIPAllocator(ipPool.Name) - require.True(t, exists) - assert.Equal(t, 3, len(allocator)) - assert.Equal(t, 14, allocator.Total()) - - // IPv6 CIDR shouldn't exclude broadcast address, so total should be increased by 15. - ipPool.Spec.IPRanges = append(ipPool.Spec.IPRanges, corev1a2.IPRange{CIDR: "2021:3::aaa1/124"}) - changed = controller.createOrUpdateIPAllocator(ipPool) - assert.True(t, changed) - allocator, exists = controller.getIPAllocator(ipPool.Name) - require.True(t, exists) - assert.Equal(t, 4, len(allocator)) - assert.Equal(t, 29, allocator.Total()) - - // When there is no change, the method should do nothing and the return value should be false. - changed = controller.createOrUpdateIPAllocator(ipPool) - assert.False(t, changed) - allocator, exists = controller.getIPAllocator(ipPool.Name) - require.True(t, exists) - assert.Equal(t, 4, len(allocator)) - assert.Equal(t, 29, allocator.Total()) -} - func checkExternalIPPoolUsed(t *testing.T, controller *egressController, poolName string, used int) { - ipAllocator, exists := controller.getIPAllocator(poolName) + exists := controller.externalIPAllocator.IPPoolExists(poolName) require.True(t, exists) - assert.Equal(t, used, ipAllocator.Used()) + err := wait.PollImmediate(50*time.Millisecond, 2*time.Second, func() (found bool, err error) { + eip, err := controller.crdClient.CrdV1alpha2().ExternalIPPools().Get(context.TODO(), poolName, metav1.GetOptions{}) + if err != nil { + return false, err + } + return eip.Status.Usage.Used == used, nil + }) + assert.NoError(t, err) } diff --git a/pkg/controller/egress/validate.go b/pkg/controller/egress/validate.go index d77129202f7..5769dd985bc 100644 --- a/pkg/controller/egress/validate.go +++ b/pkg/controller/egress/validate.go @@ -21,64 +21,11 @@ import ( admv1 "k8s.io/api/admission/v1" metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" - "k8s.io/apimachinery/pkg/util/sets" "k8s.io/klog/v2" crdv1alpha2 "antrea.io/antrea/pkg/apis/crd/v1alpha2" ) -func (c *EgressController) ValidateExternalIPPool(review *admv1.AdmissionReview) *admv1.AdmissionResponse { - var result *metav1.Status - var msg string - allowed := true - - klog.V(2).Info("Validating ExternalIPPool", "request", review.Request) - var newObj, oldObj crdv1alpha2.ExternalIPPool - if review.Request.Object.Raw != nil { - if err := json.Unmarshal(review.Request.Object.Raw, &newObj); err != nil { - klog.ErrorS(err, "Error de-serializing current ExternalIPPool") - return newAdmissionResponseForErr(err) - } - } - if review.Request.OldObject.Raw != nil { - if err := json.Unmarshal(review.Request.OldObject.Raw, &oldObj); err != nil { - klog.ErrorS(err, "Error de-serializing old ExternalIPPool") - return newAdmissionResponseForErr(err) - } - } - - switch review.Request.Operation { - case admv1.Create: - // This shouldn't happen with the webhook configuration we include in the Antrea YAML manifests. - klog.V(2).Info("Validating CREATE request for ExternalIPPool") - // Always allow CREATE request. - case admv1.Update: - klog.V(2).Info("Validating UPDATE request for ExternalIPPool") - - oldIPRangeSet := getIPRangeSet(oldObj.Spec.IPRanges) - newIPRangeSet := getIPRangeSet(newObj.Spec.IPRanges) - deletedIPRanges := oldIPRangeSet.Difference(newIPRangeSet) - if deletedIPRanges.Len() > 0 { - allowed = false - msg = fmt.Sprintf("existing IPRanges %v cannot be deleted", deletedIPRanges.List()) - } - case admv1.Delete: - // This shouldn't happen with the webhook configuration we include in the Antrea YAML manifests. - klog.V(2).Info("Validating DELETE request for ExternalIPPool") - // Always allow DELETE request. - } - - if msg != "" { - result = &metav1.Status{ - Message: msg, - } - } - return &admv1.AdmissionResponse{ - Allowed: allowed, - Result: result, - } -} - func (c *EgressController) ValidateEgress(review *admv1.AdmissionReview) *admv1.AdmissionResponse { var result *metav1.Status var msg string @@ -112,12 +59,10 @@ func (c *EgressController) ValidateEgress(review *admv1.AdmissionReview) *admv1. if ip == nil { return false, fmt.Sprintf("IP %s is not valid", newEgress.Spec.EgressIP) } - ipAllocator, exists := c.getIPAllocator(newEgress.Spec.ExternalIPPool) - // The ExternalIPPool doesn't exist, cannot determine whether the IP is in the pool. - if !exists { + if !c.externalIPAllocator.IPPoolExists(newEgress.Spec.ExternalIPPool) { return false, fmt.Sprintf("ExternalIPPool %s does not exist", newEgress.Spec.ExternalIPPool) } - if !ipAllocator.Has(ip) { + if !c.externalIPAllocator.IPPoolHasIP(newEgress.Spec.ExternalIPPool, ip) { return false, fmt.Sprintf("IP %s is not within the IP range", newEgress.Spec.EgressIP) } return true, "" @@ -147,18 +92,6 @@ func (c *EgressController) ValidateEgress(review *admv1.AdmissionReview) *admv1. } } -func getIPRangeSet(ipRanges []crdv1alpha2.IPRange) sets.String { - set := sets.NewString() - for _, ipRange := range ipRanges { - ipRangeStr := ipRange.CIDR - if ipRangeStr == "" { - ipRangeStr = fmt.Sprintf("%s-%s", ipRange.Start, ipRange.End) - } - set.Insert(ipRangeStr) - } - return set -} - func newAdmissionResponseForErr(err error) *admv1.AdmissionResponse { return &admv1.AdmissionResponse{ Result: &metav1.Status{ diff --git a/pkg/controller/egress/validate_test.go b/pkg/controller/egress/validate_test.go index 3b8008ad352..e5496d679f5 100644 --- a/pkg/controller/egress/validate_test.go +++ b/pkg/controller/egress/validate_test.go @@ -19,9 +19,11 @@ import ( "testing" "github.com/stretchr/testify/assert" + "github.com/stretchr/testify/require" admv1 "k8s.io/api/admission/v1" metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" "k8s.io/apimachinery/pkg/runtime" + "k8s.io/client-go/tools/cache" crdv1alpha2 "antrea.io/antrea/pkg/apis/crd/v1alpha2" ) @@ -31,68 +33,6 @@ func marshal(object runtime.Object) []byte { return raw } -func TestEgressControllerValidateExternalIPPool(t *testing.T) { - tests := []struct { - name string - request *admv1.AdmissionRequest - expectedResponse *admv1.AdmissionResponse - }{ - { - name: "CREATE operation should be allowed", - request: &admv1.AdmissionRequest{ - Name: "foo", - Operation: "CREATE", - Object: runtime.RawExtension{Raw: marshal(newExternalIPPool("foo", "10.10.10.0/24", "", ""))}, - }, - expectedResponse: &admv1.AdmissionResponse{Allowed: true}, - }, - { - name: "Deleting IPRange should not be allowed", - request: &admv1.AdmissionRequest{ - Name: "foo", - Operation: "UPDATE", - OldObject: runtime.RawExtension{Raw: marshal(newExternalIPPool("foo", "10.10.10.0/24", "10.10.20.1", "10.10.20.2"))}, - Object: runtime.RawExtension{Raw: marshal(newExternalIPPool("foo", "10.10.10.0/24", "", ""))}, - }, - expectedResponse: &admv1.AdmissionResponse{ - Allowed: false, - Result: &metav1.Status{ - Message: "existing IPRanges [10.10.20.1-10.10.20.2] cannot be deleted", - }, - }, - }, - { - name: "Adding IPRange should be allowed", - request: &admv1.AdmissionRequest{ - Name: "foo", - Operation: "UPDATE", - OldObject: runtime.RawExtension{Raw: marshal(newExternalIPPool("foo", "10.10.10.0/24", "", ""))}, - Object: runtime.RawExtension{Raw: marshal(newExternalIPPool("foo", "10.10.10.0/24", "10.10.20.1", "10.10.20.2"))}, - }, - expectedResponse: &admv1.AdmissionResponse{Allowed: true}, - }, - { - name: "DELETE operation should be allowed", - request: &admv1.AdmissionRequest{ - Name: "foo", - Operation: "DELETE", - Object: runtime.RawExtension{Raw: marshal(newExternalIPPool("foo", "10.10.10.0/24", "", ""))}, - }, - expectedResponse: &admv1.AdmissionResponse{Allowed: true}, - }, - } - for _, tt := range tests { - t.Run(tt.name, func(t *testing.T) { - c := newController(nil, nil) - review := &admv1.AdmissionReview{ - Request: tt.request, - } - gotResponse := c.ValidateExternalIPPool(review) - assert.Equal(t, tt.expectedResponse, gotResponse) - }) - } -} - func TestEgressControllerValidateEgress(t *testing.T) { tests := []struct { name string @@ -192,14 +132,24 @@ func TestEgressControllerValidateEgress(t *testing.T) { } for _, tt := range tests { t.Run(tt.name, func(t *testing.T) { - c := newController(nil, nil) + stopCh := make(chan struct{}) + defer close(stopCh) + var objs []runtime.Object if tt.existingExternalIPPool != nil { - c.createOrUpdateIPAllocator(tt.existingExternalIPPool) + objs = append(objs, tt.existingExternalIPPool) } + controller := newController(nil, objs) + controller.informerFactory.Start(stopCh) + controller.crdInformerFactory.Start(stopCh) + controller.informerFactory.WaitForCacheSync(stopCh) + controller.crdInformerFactory.WaitForCacheSync(stopCh) + go controller.externalIPAllocator.Run(stopCh) + require.True(t, cache.WaitForCacheSync(stopCh, controller.externalIPAllocator.HasSynced)) + controller.externalIPAllocator.RestoreIPAllocations(nil) review := &admv1.AdmissionReview{ Request: tt.request, } - gotResponse := c.ValidateEgress(review) + gotResponse := controller.ValidateEgress(review) assert.Equal(t, tt.expectedResponse, gotResponse) }) } diff --git a/pkg/controller/externalippool/controller.go b/pkg/controller/externalippool/controller.go new file mode 100644 index 00000000000..6941ab1e16c --- /dev/null +++ b/pkg/controller/externalippool/controller.go @@ -0,0 +1,447 @@ +// Copyright 2021 Antrea Authors +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package externalippool + +import ( + "context" + "errors" + "fmt" + "net" + "sync" + "sync/atomic" + "time" + + corev1 "k8s.io/api/core/v1" + apierrors "k8s.io/apimachinery/pkg/api/errors" + metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" + "k8s.io/apimachinery/pkg/labels" + "k8s.io/apimachinery/pkg/util/sets" + "k8s.io/apimachinery/pkg/util/wait" + "k8s.io/client-go/tools/cache" + "k8s.io/client-go/util/retry" + "k8s.io/client-go/util/workqueue" + "k8s.io/klog/v2" + utilnet "k8s.io/utils/net" + + antreacrds "antrea.io/antrea/pkg/apis/crd/v1alpha2" + clientset "antrea.io/antrea/pkg/client/clientset/versioned" + antreainformers "antrea.io/antrea/pkg/client/informers/externalversions/crd/v1alpha2" + antrealisters "antrea.io/antrea/pkg/client/listers/crd/v1alpha2" + "antrea.io/antrea/pkg/controller/metrics" + "antrea.io/antrea/pkg/ipam/ipallocator" + iputil "antrea.io/antrea/pkg/util/ip" +) + +const ( + controllerName = "ExternalIPPoolController" + // Set resyncPeriod to 0 to disable resyncing. + resyncPeriod time.Duration = 0 + // How long to wait before retrying the processing of an ExternalIPPool change. + minRetryDelay = 5 * time.Second + maxRetryDelay = 300 * time.Second + // Default number of workers processing an ExternalIPPool change. + defaultWorkers = 4 +) + +var ( + ErrExternalIPPoolNotFound = errors.New("ExternalIPPool not found") +) + +// IPAllocation contains the IP and the IP Pool which allocates it. +type IPAllocation struct { + // ObjectReference is useful to track the owner of this IP allocation. + ObjectReference corev1.ObjectReference + // IPPoolName is the name of the IP pool. + IPPoolName string + // IP is the allocated IP. + IP net.IP +} + +// ExternalIPPoolEventHandler defines a consumer to subscribe for external ExternalIPPool events. +type ExternalIPPoolEventHandler func(externalIPPool string) + +type ExternalIPAllocator interface { + // AddEventHandler adds a consumer for ExternalIPPool events. It will block other consumers from allocating new IPs by + // AllocateIPFromPool() or AllocateIP() until it calls RestoreIPAllocations(). + AddEventHandler(handler ExternalIPPoolEventHandler) + // RestoreIPAllocations is used to restore the previous allocated IPs after controller restarts. It will return the + // succeeded IP Allocations. + RestoreIPAllocations(allocations []IPAllocation) []IPAllocation + // AllocateIPFromPool allocates an IP from the given IP pool. + AllocateIPFromPool(externalIPPool string) (net.IP, error) + // AllocateIP allocates an IP from an available IP pool. + AllocateIP() (string, net.IP, error) + // IPPoolExists checks whether the IP pool exists. + IPPoolExists(externalIPPool string) bool + // IPPoolHasIP checks whether the IP pool contains the given IP. + IPPoolHasIP(externalIPPool string, ip net.IP) bool + // LocateIP finds which IP Pool contains the given IP. + LocateIP(ip net.IP) (string, error) + // UpdateIPAllocation marks the IP in the specified ExternalIPPool as occupied. + UpdateIPAllocation(externalIPPool string, ip net.IP) error + // ReleaseIP releases the IP to the IP pool. + ReleaseIP(externalIPPool string, ip net.IP) error + // HasSynced indicates ExternalIPAllocator has finished syncing all ExternalIPPool resources. + HasSynced() bool +} + +var _ ExternalIPAllocator = (*ExternalIPPoolController)(nil) + +// ExternalIPPoolController is responsible for synchronizing the ExternalIPPool resources. +type ExternalIPPoolController struct { + crdClient clientset.Interface + externalIPPoolLister antrealisters.ExternalIPPoolLister + externalIPPoolListerSynced cache.InformerSynced + + // ipAllocatorMap is a map from ExternalIPPool name to MultiIPAllocator. + ipAllocatorMap map[string]ipallocator.MultiIPAllocator + ipAllocatorMutex sync.RWMutex + + // ipAllocatorInitialized stores a boolean value, which tracks if the ipAllocatorMap has been initialized + // with the full list of ExternalIPPool. + ipAllocatorInitialized *atomic.Value + + // handlers is an array of handlers will be notified when ExternalIPPool updates. + handlers []ExternalIPPoolEventHandler + handlersWaitGroup sync.WaitGroup + + // queue maintains the ExternalIPPool objects that need to be synced. + queue workqueue.RateLimitingInterface +} + +// NewExternalIPPoolController returns a new *ExternalIPPoolController. +func NewExternalIPPoolController(crdClient clientset.Interface, externalIPPoolInformer antreainformers.ExternalIPPoolInformer) *ExternalIPPoolController { + c := &ExternalIPPoolController{ + crdClient: crdClient, + externalIPPoolLister: externalIPPoolInformer.Lister(), + externalIPPoolListerSynced: externalIPPoolInformer.Informer().HasSynced, + queue: workqueue.NewNamedRateLimitingQueue(workqueue.NewItemExponentialFailureRateLimiter(minRetryDelay, maxRetryDelay), "externalIPPool"), + ipAllocatorInitialized: &atomic.Value{}, + ipAllocatorMap: make(map[string]ipallocator.MultiIPAllocator), + } + externalIPPoolInformer.Informer().AddEventHandlerWithResyncPeriod( + cache.ResourceEventHandlerFuncs{ + AddFunc: c.addExternalIPPool, + UpdateFunc: c.updateExternalIPPool, + DeleteFunc: c.deleteExternalIPPool, + }, + resyncPeriod, + ) + c.ipAllocatorInitialized.Store(false) + return c +} + +func (c *ExternalIPPoolController) HasSynced() bool { + return c.ipAllocatorInitialized.Load().(bool) +} + +func (c *ExternalIPPoolController) AddEventHandler(handler ExternalIPPoolEventHandler) { + c.handlers = append(c.handlers, handler) + c.handlersWaitGroup.Add(1) +} + +func (c *ExternalIPPoolController) RestoreIPAllocations(allocations []IPAllocation) []IPAllocation { + var succeeded []IPAllocation + for _, allocation := range allocations { + if err := c.UpdateIPAllocation(allocation.IPPoolName, allocation.IP); err != nil { + klog.ErrorS(err, "Failed to restore IP allocation", "ip", allocation.IP, "ipPool", allocation.IPPoolName) + } else { + succeeded = append(succeeded, allocation) + } + } + c.handlersWaitGroup.Done() + return succeeded +} + +// Run begins watching and syncing of the ExternalIPPoolController. +func (c *ExternalIPPoolController) Run(stopCh <-chan struct{}) { + defer c.queue.ShutDown() + + klog.Infof("Starting %s", controllerName) + defer klog.Infof("Shutting down %s", controllerName) + + cacheSyncs := []cache.InformerSynced{c.externalIPPoolListerSynced} + if !cache.WaitForNamedCacheSync(controllerName, stopCh, cacheSyncs...) { + return + } + + // Initialize the ipAllocatorMap with the existing ExternalIPPools. + ipPools, _ := c.externalIPPoolLister.List(labels.Everything()) + for _, ipPool := range ipPools { + c.createOrUpdateIPAllocator(ipPool) + } + + c.ipAllocatorInitialized.Store(true) + + for i := 0; i < defaultWorkers; i++ { + go wait.Until(c.worker, time.Second, stopCh) + } + <-stopCh +} + +// createOrUpdateIPAllocator creates or updates the IP allocator based on the provided ExternalIPPool. +// Currently it's assumed that only new ranges will be added and existing ranges should not be deleted. +// TODO: Use validation webhook to ensure it. +func (c *ExternalIPPoolController) createOrUpdateIPAllocator(ipPool *antreacrds.ExternalIPPool) bool { + changed := false + c.ipAllocatorMutex.Lock() + defer c.ipAllocatorMutex.Unlock() + + existingIPRanges := sets.NewString() + multiIPAllocator, exists := c.ipAllocatorMap[ipPool.Name] + if !exists { + multiIPAllocator = ipallocator.MultiIPAllocator{} + changed = true + } else { + existingIPRanges.Insert(multiIPAllocator.Names()...) + } + + for _, ipRange := range ipPool.Spec.IPRanges { + ipAllocator, err := func() (*ipallocator.SingleIPAllocator, error) { + if ipRange.CIDR != "" { + _, ipNet, err := net.ParseCIDR(ipRange.CIDR) + if err != nil { + return nil, err + } + // Must use normalized IPNet string to check if the IP range exists. Otherwise non-strict CIDR like + // 192.168.0.1/24 will be considered new even if it doesn't change. + // Validating or normalizing the input CIDR should be a better solution but the externalIPPools that + // have been created will still have this issue, so we just normalize the CIDR when using it. + if existingIPRanges.Has(ipNet.String()) { + return nil, nil + } + // Don't use the IPv4 network's broadcast address. + var reservedIPs []net.IP + if utilnet.IsIPv4CIDR(ipNet) { + reservedIPs = append(reservedIPs, iputil.GetLocalBroadcastIP(ipNet)) + } + return ipallocator.NewCIDRAllocator(ipNet, reservedIPs) + } else { + if existingIPRanges.Has(fmt.Sprintf("%s-%s", ipRange.Start, ipRange.End)) { + return nil, nil + } + return ipallocator.NewIPRangeAllocator(net.ParseIP(ipRange.Start), net.ParseIP(ipRange.End)) + } + }() + if err != nil { + klog.ErrorS(err, "Failed to create IPAllocator", "ipRange", ipRange) + continue + } + // The IP range already exists in multiIPAllocator. + if ipAllocator == nil { + continue + } + multiIPAllocator = append(multiIPAllocator, ipAllocator) + changed = true + } + c.ipAllocatorMap[ipPool.Name] = multiIPAllocator + c.queue.Add(ipPool.Name) + return changed +} + +// deleteIPAllocator deletes the IP allocator of the given IP pool. +func (c *ExternalIPPoolController) deleteIPAllocator(poolName string) { + c.ipAllocatorMutex.Lock() + defer c.ipAllocatorMutex.Unlock() + delete(c.ipAllocatorMap, poolName) +} + +// getIPAllocator gets the IP allocator of the given IP pool. +func (c *ExternalIPPoolController) getIPAllocator(poolName string) (ipallocator.MultiIPAllocator, bool) { + c.ipAllocatorMutex.RLock() + defer c.ipAllocatorMutex.RUnlock() + ipAllocator, exists := c.ipAllocatorMap[poolName] + return ipAllocator, exists +} + +// AllocateIPFromPool allocates an IP from the the given IP pool. +func (c *ExternalIPPoolController) AllocateIPFromPool(ipPoolName string) (net.IP, error) { + c.handlersWaitGroup.Wait() + ipAllocator, exists := c.getIPAllocator(ipPoolName) + if !exists { + return nil, ErrExternalIPPoolNotFound + } + ip, err := ipAllocator.AllocateNext() + if err != nil { + return ip, err + } + c.queue.Add(ipPoolName) + return ip, nil +} + +// AllocateIP allocate an IP from an available IP pool. +func (c *ExternalIPPoolController) AllocateIP() (string, net.IP, error) { + c.handlersWaitGroup.Wait() + c.ipAllocatorMutex.RLock() + defer c.ipAllocatorMutex.RUnlock() + for pool, allocator := range c.ipAllocatorMap { + ip, err := allocator.AllocateNext() + if err == nil { + c.queue.Add(pool) + return pool, ip, nil + } + } + return "", nil, errors.New("no ExternalIPPool available") +} + +// UpdateIPAllocation sets the IP in the specified ExternalIPPool. +func (c *ExternalIPPoolController) UpdateIPAllocation(poolName string, ip net.IP) error { + ipAllocator, exists := c.getIPAllocator(poolName) + if !exists { + return ErrExternalIPPoolNotFound + } + err := ipAllocator.AllocateIP(ip) + if err != nil { + return err + } + c.queue.Add(poolName) + return nil +} + +func (c *ExternalIPPoolController) updateExternalIPPoolStatus(poolName string) error { + eip, err := c.externalIPPoolLister.Get(poolName) + if err != nil { + if apierrors.IsNotFound(err) { + return nil + } + return err + } + ipAllocator, exists := c.getIPAllocator(eip.Name) + if !exists { + return ErrExternalIPPoolNotFound + } + total, used := ipAllocator.Total(), ipAllocator.Used() + toUpdate := eip.DeepCopy() + var getErr error + if err := retry.RetryOnConflict(retry.DefaultRetry, func() error { + actualStatus := eip.Status + usage := antreacrds.ExternalIPPoolUsage{Total: total, Used: used} + if actualStatus.Usage == usage { + return nil + } + klog.V(2).InfoS("Updating ExternalIPPool status", "ExternalIPPool", poolName, "usage", usage) + toUpdate.Status.Usage = usage + if _, updateErr := c.crdClient.CrdV1alpha2().ExternalIPPools().UpdateStatus(context.TODO(), toUpdate, metav1.UpdateOptions{}); updateErr != nil && apierrors.IsConflict(updateErr) { + toUpdate, getErr = c.crdClient.CrdV1alpha2().ExternalIPPools().Get(context.TODO(), poolName, metav1.GetOptions{}) + if getErr != nil { + return getErr + } + return updateErr + } + return nil + }); err != nil { + return fmt.Errorf("updating ExternalIPPool %s status error: %v", poolName, err) + } + klog.V(2).InfoS("Updated ExternalIPPool status", "ExternalIPPool", poolName) + metrics.AntreaExternalIPPoolStatusUpdates.Inc() + return nil +} + +// ReleaseIP releases the IP to the pool. +func (c *ExternalIPPoolController) ReleaseIP(poolName string, ip net.IP) error { + allocator, exists := c.getIPAllocator(poolName) + if !exists { + return ErrExternalIPPoolNotFound + } + if err := allocator.Release(ip); err != nil { + return err + } + c.queue.Add(poolName) + return nil +} + +func (c *ExternalIPPoolController) IPPoolHasIP(poolName string, ip net.IP) bool { + allocator, exists := c.getIPAllocator(poolName) + if !exists { + return false + } + return allocator.Has(ip) +} + +func (c *ExternalIPPoolController) IPPoolExists(pool string) bool { + _, exists := c.getIPAllocator(pool) + return exists +} + +func (c *ExternalIPPoolController) LocateIP(ip net.IP) (string, error) { + c.ipAllocatorMutex.RLock() + defer c.ipAllocatorMutex.RUnlock() + for pool, allocator := range c.ipAllocatorMap { + if allocator.Has(ip) { + return pool, nil + } + } + return "", ErrExternalIPPoolNotFound +} + +func (c *ExternalIPPoolController) worker() { + for c.processNextWorkItem() { + } +} + +func (c *ExternalIPPoolController) processNextWorkItem() bool { + key, quit := c.queue.Get() + if quit { + return false + } + defer c.queue.Done(key) + + err := c.updateExternalIPPoolStatus(key.(string)) + if err != nil { + // Put the item back in the workqueue to handle any transient errors. + c.queue.AddRateLimited(key) + klog.ErrorS(err, "Failed to sync ExternalIPPool status", "ExternalIPPool", key) + return true + } + // If no error occurs we Forget this item so it does not get queued again until + // another change happens. + c.queue.Forget(key) + return true +} + +// addExternalIPPool processes ExternalIPPool ADD events. It creates an IPAllocator for the pool and triggers +// reconciliation of consumers that refer to the pool. +func (c *ExternalIPPoolController) addExternalIPPool(obj interface{}) { + pool := obj.(*antreacrds.ExternalIPPool) + klog.InfoS("Processing ExternalIPPool ADD event", "pool", pool.Name, "ipRanges", pool.Spec.IPRanges) + c.createOrUpdateIPAllocator(pool) + for _, h := range c.handlers { + h(pool.Name) + } +} + +// updateExternalIPPool processes ExternalIPPool UPDATE events. It updates the IPAllocator for the pool and triggers +// reconciliation of consumers that refer to the pool if the IPAllocator changes. +func (c *ExternalIPPoolController) updateExternalIPPool(_, cur interface{}) { + pool := cur.(*antreacrds.ExternalIPPool) + klog.InfoS("Processing ExternalIPPool UPDATE event", "pool", pool.Name, "ipRanges", pool.Spec.IPRanges) + if c.createOrUpdateIPAllocator(pool) { + for _, h := range c.handlers { + h(pool.Name) + } + } +} + +// deleteExternalIPPool processes ExternalIPPool DELETE events. It deletes the IPAllocator for the pool and triggers +// reconciliation of all consumers that refer to the pool. +func (c *ExternalIPPoolController) deleteExternalIPPool(obj interface{}) { + pool := obj.(*antreacrds.ExternalIPPool) + klog.InfoS("Processing ExternalIPPool DELETE event", "pool", pool.Name, "ipRanges", pool.Spec.IPRanges) + c.deleteIPAllocator(pool.Name) + // Call consumers to reclaim the IPs allocated from the pool. + for _, h := range c.handlers { + h(pool.Name) + } +} diff --git a/pkg/controller/externalippool/controller_test.go b/pkg/controller/externalippool/controller_test.go new file mode 100644 index 00000000000..f74b284b796 --- /dev/null +++ b/pkg/controller/externalippool/controller_test.go @@ -0,0 +1,791 @@ +// Copyright 2021 Antrea Authors +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package externalippool + +import ( + "context" + "net" + "sort" + "testing" + "time" + + "github.com/stretchr/testify/assert" + "github.com/stretchr/testify/require" + v1 "k8s.io/api/core/v1" + metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" + "k8s.io/apimachinery/pkg/runtime" + "k8s.io/apimachinery/pkg/util/wait" + "k8s.io/client-go/tools/cache" + + antreacrds "antrea.io/antrea/pkg/apis/crd/v1alpha2" + "antrea.io/antrea/pkg/client/clientset/versioned" + fakeversioned "antrea.io/antrea/pkg/client/clientset/versioned/fake" + crdinformers "antrea.io/antrea/pkg/client/informers/externalversions" +) + +func newExternalIPPool(name, cidr, start, end string) *antreacrds.ExternalIPPool { + pool := &antreacrds.ExternalIPPool{ + ObjectMeta: metav1.ObjectMeta{ + Name: name, + }, + } + if len(cidr) > 0 { + pool.Spec.IPRanges = append(pool.Spec.IPRanges, antreacrds.IPRange{CIDR: cidr}) + } + if len(start) > 0 && len(end) > 0 { + pool.Spec.IPRanges = append(pool.Spec.IPRanges, antreacrds.IPRange{Start: start, End: end}) + } + return pool +} + +type controller struct { + *ExternalIPPoolController + crdClient versioned.Interface + crdInformerFactory crdinformers.SharedInformerFactory +} + +// objects is an initial set of K8s objects that is exposed through the client. +func newController(crdObjects []runtime.Object) *controller { + crdClient := fakeversioned.NewSimpleClientset(crdObjects...) + crdInformerFactory := crdinformers.NewSharedInformerFactory(crdClient, resyncPeriod) + externalIPPoolController := NewExternalIPPoolController(crdClient, crdInformerFactory.Crd().V1alpha2().ExternalIPPools()) + return &controller{ + externalIPPoolController, + crdClient, + crdInformerFactory, + } +} + +func TestAllocateIP(t *testing.T) { + tests := []struct { + name string + ipPools []*antreacrds.ExternalIPPool + allocatedIP []struct { + ip string + pool string + } + allocateFrom string + expectedIP string + expectError bool + expectedIPPoolStatus []antreacrds.ExternalIPPoolUsage + }{ + { + name: "allocate from proper IP pool", + ipPools: []*antreacrds.ExternalIPPool{ + newExternalIPPool("eip1", "", "10.10.10.2", "10.10.10.3"), + }, + allocatedIP: nil, + allocateFrom: "eip1", + expectedIP: "10.10.10.2", + expectError: false, + expectedIPPoolStatus: []antreacrds.ExternalIPPoolUsage{ + {Total: 2, Used: 1}, + }, + }, + { + name: "allocate from exhausted IP pool", + ipPools: []*antreacrds.ExternalIPPool{ + newExternalIPPool("eip1", "", "10.10.10.2", "10.10.10.3"), + }, + allocatedIP: []struct { + ip string + pool string + }{ + {"10.10.10.2", "eip1"}, + {"10.10.10.3", "eip1"}, + }, + allocateFrom: "eip1", + expectedIP: "", + expectError: true, + expectedIPPoolStatus: []antreacrds.ExternalIPPoolUsage{ + {Total: 2, Used: 2}, + }, + }, + { + name: "allocate from non existing IP pool", + ipPools: []*antreacrds.ExternalIPPool{ + newExternalIPPool("eip1", "", "10.10.10.2", "10.10.10.3"), + }, + allocatedIP: nil, + allocateFrom: "eip2", + expectedIP: "", + expectError: true, + expectedIPPoolStatus: []antreacrds.ExternalIPPoolUsage{ + {Total: 2, Used: 0}, + }, + }, + } + for _, tt := range tests { + t.Run(tt.name, func(t *testing.T) { + stopCh := make(chan struct{}) + defer close(stopCh) + var fakeCRDObjects []runtime.Object + for _, p := range tt.ipPools { + fakeCRDObjects = append(fakeCRDObjects, p) + } + controller := newController(fakeCRDObjects) + controller.crdInformerFactory.Start(stopCh) + controller.crdInformerFactory.WaitForCacheSync(stopCh) + go controller.Run(stopCh) + require.True(t, cache.WaitForCacheSync(stopCh, controller.HasSynced)) + for _, alloc := range tt.allocatedIP { + require.NoError(t, controller.UpdateIPAllocation(alloc.pool, net.ParseIP(alloc.ip))) + } + ipGot, err := controller.AllocateIPFromPool(tt.allocateFrom) + assert.Equal(t, tt.expectError, err != nil) + assert.Equal(t, net.ParseIP(tt.expectedIP), ipGot) + for idx, pool := range tt.ipPools { + checkExternalIPPoolStatus(t, controller, pool.Name, tt.expectedIPPoolStatus[idx]) + } + }) + } +} + +func TestAllocateIPFromPool(t *testing.T) { + tests := []struct { + name string + ipPools []*antreacrds.ExternalIPPool + allocatedIP []struct { + ip string + pool string + } + expectedIPPool string + expectedIP string + expectError bool + expectedIPPoolStatus []antreacrds.ExternalIPPoolUsage + }{ + { + name: "allocate from proper IP pool", + ipPools: []*antreacrds.ExternalIPPool{ + newExternalIPPool("eip1", "", "10.10.10.2", "10.10.10.3"), + }, + allocatedIP: nil, + expectedIPPool: "eip1", + expectedIP: "10.10.10.2", + expectError: false, + expectedIPPoolStatus: []antreacrds.ExternalIPPoolUsage{ + {Total: 2, Used: 1}, + }, + }, + { + name: "allocate from IP pools and one is full", + ipPools: []*antreacrds.ExternalIPPool{ + newExternalIPPool("eip1", "", "10.10.10.2", "10.10.10.3"), + newExternalIPPool("eip2", "", "10.10.11.2", "10.10.11.3"), + }, + allocatedIP: []struct { + ip string + pool string + }{ + {"10.10.10.2", "eip1"}, + {"10.10.10.3", "eip1"}, + }, + expectedIPPool: "eip2", + expectedIP: "10.10.11.2", + expectError: false, + expectedIPPoolStatus: []antreacrds.ExternalIPPoolUsage{ + {Total: 2, Used: 2}, + {Total: 2, Used: 1}, + }, + }, + { + name: "allocate from IP pools and all are full", + ipPools: []*antreacrds.ExternalIPPool{ + newExternalIPPool("eip1", "", "10.10.10.2", "10.10.10.3"), + newExternalIPPool("eip2", "", "10.10.11.2", "10.10.11.3"), + }, + allocatedIP: []struct { + ip string + pool string + }{ + {"10.10.10.2", "eip1"}, + {"10.10.10.3", "eip1"}, + {"10.10.11.2", "eip2"}, + {"10.10.11.3", "eip2"}, + }, + expectedIPPool: "", + expectedIP: "", + expectError: true, + expectedIPPoolStatus: []antreacrds.ExternalIPPoolUsage{ + {Total: 2, Used: 2}, + {Total: 2, Used: 2}, + }, + }, + } + for _, tt := range tests { + t.Run(tt.name, func(t *testing.T) { + stopCh := make(chan struct{}) + defer close(stopCh) + var fakeCRDObjects []runtime.Object + for _, p := range tt.ipPools { + fakeCRDObjects = append(fakeCRDObjects, p) + } + controller := newController(fakeCRDObjects) + controller.crdInformerFactory.Start(stopCh) + controller.crdInformerFactory.WaitForCacheSync(stopCh) + go controller.Run(stopCh) + require.True(t, cache.WaitForCacheSync(stopCh, controller.HasSynced)) + for _, alloc := range tt.allocatedIP { + require.NoError(t, controller.UpdateIPAllocation(alloc.pool, net.ParseIP(alloc.ip))) + } + ipPoolGot, ipGot, err := controller.AllocateIP() + assert.Equal(t, tt.expectError, err != nil) + assert.Equal(t, net.ParseIP(tt.expectedIP), ipGot) + assert.Equal(t, tt.expectedIPPool, ipPoolGot) + for idx, pool := range tt.ipPools { + checkExternalIPPoolStatus(t, controller, pool.Name, tt.expectedIPPoolStatus[idx]) + } + }) + } +} + +func TestReleaseIP(t *testing.T) { + tests := []struct { + name string + ipPools []*antreacrds.ExternalIPPool + allocatedIP []struct { + ip string + pool string + } + ipPoolToRelease string + ipToRelease string + expectError bool + expectedIPPoolStatus []antreacrds.ExternalIPPoolUsage + }{ + { + name: "release IP to pool", + ipPools: []*antreacrds.ExternalIPPool{ + newExternalIPPool("eip1", "", "10.10.10.2", "10.10.10.3"), + }, + allocatedIP: []struct { + ip string + pool string + }{ + {"10.10.10.2", "eip1"}, + {"10.10.10.3", "eip1"}, + }, + ipPoolToRelease: "eip1", + ipToRelease: "10.10.10.2", + expectError: false, + expectedIPPoolStatus: []antreacrds.ExternalIPPoolUsage{ + {Total: 2, Used: 1}, + }, + }, + { + name: "release unknown IP to pool", + ipPools: []*antreacrds.ExternalIPPool{ + newExternalIPPool("eip1", "", "10.10.10.2", "10.10.10.3"), + }, + allocatedIP: []struct { + ip string + pool string + }{ + {"10.10.10.2", "eip1"}, + {"10.10.10.3", "eip1"}, + }, + ipPoolToRelease: "eip1", + ipToRelease: "10.10.11.2", + expectError: true, + expectedIPPoolStatus: []antreacrds.ExternalIPPoolUsage{ + {Total: 2, Used: 2}, + }, + }, + } + for _, tt := range tests { + t.Run(tt.name, func(t *testing.T) { + stopCh := make(chan struct{}) + defer close(stopCh) + var fakeCRDObjects []runtime.Object + for _, p := range tt.ipPools { + fakeCRDObjects = append(fakeCRDObjects, p) + } + controller := newController(fakeCRDObjects) + controller.crdInformerFactory.Start(stopCh) + controller.crdInformerFactory.WaitForCacheSync(stopCh) + go controller.Run(stopCh) + require.True(t, cache.WaitForCacheSync(stopCh, controller.HasSynced)) + for _, alloc := range tt.allocatedIP { + require.NoError(t, controller.UpdateIPAllocation(alloc.pool, net.ParseIP(alloc.ip))) + } + err := controller.ReleaseIP(tt.ipPoolToRelease, net.ParseIP(tt.ipToRelease)) + assert.Equal(t, tt.expectError, err != nil) + for idx, pool := range tt.ipPools { + checkExternalIPPoolStatus(t, controller, pool.Name, tt.expectedIPPoolStatus[idx]) + } + }) + } +} + +func TestCreateOrUpdateIPAllocator(t *testing.T) { + stopCh := make(chan struct{}) + defer close(stopCh) + controller := newController(nil) + controller.crdInformerFactory.Start(stopCh) + controller.crdInformerFactory.WaitForCacheSync(stopCh) + + ipPool := newExternalIPPool("ipPoolA", "1.1.1.0/30", "", "") + changed := controller.createOrUpdateIPAllocator(ipPool) + assert.True(t, changed) + allocator, exists := controller.getIPAllocator(ipPool.Name) + require.True(t, exists) + assert.Equal(t, 1, len(allocator)) + assert.Equal(t, 2, allocator.Total()) + + // Append a non-strict CIDR, it should handle it correctly. + ipPool.Spec.IPRanges = append(ipPool.Spec.IPRanges, antreacrds.IPRange{CIDR: "1.1.2.1/30"}) + changed = controller.createOrUpdateIPAllocator(ipPool) + assert.True(t, changed) + allocator, exists = controller.getIPAllocator(ipPool.Name) + require.True(t, exists) + assert.Equal(t, 2, len(allocator)) + assert.Equal(t, 4, allocator.Total()) + + ipPool.Spec.IPRanges = append(ipPool.Spec.IPRanges, antreacrds.IPRange{Start: "1.1.3.1", End: "1.1.3.10"}) + changed = controller.createOrUpdateIPAllocator(ipPool) + assert.True(t, changed) + allocator, exists = controller.getIPAllocator(ipPool.Name) + require.True(t, exists) + assert.Equal(t, 3, len(allocator)) + assert.Equal(t, 14, allocator.Total()) + + // IPv6 CIDR shouldn't exclude broadcast address, so total should be increased by 15. + ipPool.Spec.IPRanges = append(ipPool.Spec.IPRanges, antreacrds.IPRange{CIDR: "2021:3::aaa1/124"}) + changed = controller.createOrUpdateIPAllocator(ipPool) + assert.True(t, changed) + allocator, exists = controller.getIPAllocator(ipPool.Name) + require.True(t, exists) + assert.Equal(t, 4, len(allocator)) + assert.Equal(t, 29, allocator.Total()) + + // When there is no change, the method should do nothing and the return value should be false. + changed = controller.createOrUpdateIPAllocator(ipPool) + assert.False(t, changed) + allocator, exists = controller.getIPAllocator(ipPool.Name) + require.True(t, exists) + assert.Equal(t, 4, len(allocator)) + assert.Equal(t, 29, allocator.Total()) +} + +func TestIPPoolEvents(t *testing.T) { + stopCh := make(chan struct{}) + defer close(stopCh) + context, cancel := context.WithCancel(context.Background()) + defer cancel() + controller := newController(nil) + consumerCh := make(chan string) + controller.AddEventHandler( + func(ippool string) { + consumerCh <- ippool + }) + controller.crdInformerFactory.Start(stopCh) + controller.crdInformerFactory.WaitForCacheSync(stopCh) + go controller.Run(stopCh) + require.True(t, cache.WaitForCacheSync(stopCh, controller.HasSynced)) + // ADD event + eip, err := controller.crdClient.CrdV1alpha2().ExternalIPPools().Create(context, + newExternalIPPool("eip1", "", "10.10.10.2", "10.10.10.3"), + metav1.CreateOptions{}, + ) + require.NoError(t, err) + assert.Equal(t, "eip1", <-consumerCh) + // UPDATE event + eip.Spec.IPRanges[0].End = "10.10.10.4" + eip, err = controller.crdClient.CrdV1alpha2().ExternalIPPools().Update(context, + eip, + metav1.UpdateOptions{}, + ) + require.NoError(t, err) + assert.Equal(t, "eip1", <-consumerCh) + // DELETE event + err = controller.crdClient.CrdV1alpha2().ExternalIPPools().Delete(context, + eip.Name, + metav1.DeleteOptions{}, + ) + require.NoError(t, err) + assert.Equal(t, "eip1", <-consumerCh) +} + +func TestConsumersRestoreIPAllocation(t *testing.T) { + stopCh := make(chan struct{}) + defer close(stopCh) + eip := newExternalIPPool("eip1", "", "10.10.10.2", "10.10.10.10") + controller := newController([]runtime.Object{eip}) + controller.AddEventHandler(func(ippool string) {}) + controller.AddEventHandler(func(ippool string) {}) + controller.crdInformerFactory.Start(stopCh) + controller.crdInformerFactory.WaitForCacheSync(stopCh) + go controller.Run(stopCh) + require.True(t, cache.WaitForCacheSync(stopCh, controller.HasSynced)) + allocatedIPCh := make(chan string) + go func() { + allocatedIPs := []IPAllocation{ + { + IPPoolName: "eip1", + IP: net.ParseIP("10.10.10.2"), + }, + } + restored := controller.RestoreIPAllocations(allocatedIPs) + assert.Equal(t, allocatedIPs, restored) + ip, err := controller.AllocateIPFromPool("eip1") + assert.NoError(t, err) + allocatedIPCh <- ip.String() + }() + go func() { + allocatedIPs := []IPAllocation{ + { + IPPoolName: "eip1", + IP: net.ParseIP("10.10.10.3"), + }, + } + restored := controller.RestoreIPAllocations(allocatedIPs) + assert.Equal(t, allocatedIPs, restored) + ip, err := controller.AllocateIPFromPool("eip1") + assert.NoError(t, err) + allocatedIPCh <- ip.String() + }() + var allocated [2]string + for idx := 0; idx < len(allocated); idx++ { + allocated[idx] = <-allocatedIPCh + } + sort.Strings(allocated[:]) + assert.Equal(t, "10.10.10.4", allocated[0]) + assert.Equal(t, "10.10.10.5", allocated[1]) +} + +func TestIPPoolExists(t *testing.T) { + tests := []struct { + name string + ipPools []*antreacrds.ExternalIPPool + ipPoolToCheck string + expectedExists bool + }{ + { + name: "check for existing IPPool", + ipPools: []*antreacrds.ExternalIPPool{ + newExternalIPPool("eip1", "", "10.10.10.2", "10.10.10.3"), + }, + ipPoolToCheck: "eip1", + expectedExists: true, + }, + { + name: "check for non-existing IPPool", + ipPools: []*antreacrds.ExternalIPPool{ + newExternalIPPool("eip1", "", "10.10.10.2", "10.10.10.3"), + }, + ipPoolToCheck: "eip2", + expectedExists: false, + }, + } + for _, tt := range tests { + t.Run(tt.name, func(t *testing.T) { + stopCh := make(chan struct{}) + defer close(stopCh) + var fakeCRDObjects []runtime.Object + for _, p := range tt.ipPools { + fakeCRDObjects = append(fakeCRDObjects, p) + } + controller := newController(fakeCRDObjects) + controller.crdInformerFactory.Start(stopCh) + controller.crdInformerFactory.WaitForCacheSync(stopCh) + go controller.Run(stopCh) + require.True(t, cache.WaitForCacheSync(stopCh, controller.HasSynced)) + exists := controller.IPPoolExists(tt.ipPoolToCheck) + assert.Equal(t, tt.expectedExists, exists) + }) + } +} + +func TestIPPoolHasIP(t *testing.T) { + tests := []struct { + name string + ipPools []*antreacrds.ExternalIPPool + ipPoolToCheck string + ipToCheck net.IP + expectedExists bool + }{ + { + name: "check for existing IP in IPPool", + ipPools: []*antreacrds.ExternalIPPool{ + newExternalIPPool("eip1", "", "10.10.10.2", "10.10.10.3"), + }, + ipPoolToCheck: "eip1", + ipToCheck: net.ParseIP("10.10.10.2"), + expectedExists: true, + }, + { + name: "check for non-existing IP in IPPool", + ipPools: []*antreacrds.ExternalIPPool{ + newExternalIPPool("eip1", "", "10.10.10.2", "10.10.10.3"), + }, + ipPoolToCheck: "eip1", + ipToCheck: net.ParseIP("10.10.10.1"), + expectedExists: false, + }, + } + for _, tt := range tests { + t.Run(tt.name, func(t *testing.T) { + stopCh := make(chan struct{}) + defer close(stopCh) + var fakeCRDObjects []runtime.Object + for _, p := range tt.ipPools { + fakeCRDObjects = append(fakeCRDObjects, p) + } + controller := newController(fakeCRDObjects) + controller.crdInformerFactory.Start(stopCh) + controller.crdInformerFactory.WaitForCacheSync(stopCh) + go controller.Run(stopCh) + require.True(t, cache.WaitForCacheSync(stopCh, controller.HasSynced)) + exists := controller.IPPoolHasIP(tt.ipPoolToCheck, tt.ipToCheck) + assert.Equal(t, tt.expectedExists, exists) + }) + } +} + +func TestLocateIP(t *testing.T) { + tests := []struct { + name string + ipPools []*antreacrds.ExternalIPPool + ipToCheck net.IP + expectedIPPool string + expectedError bool + }{ + { + name: "check for known IP 1", + ipPools: []*antreacrds.ExternalIPPool{ + newExternalIPPool("eip1", "", "10.10.10.2", "10.10.10.3"), + newExternalIPPool("eip2", "", "10.10.11.2", "10.10.11.3"), + }, + ipToCheck: net.ParseIP("10.10.10.2"), + expectedIPPool: "eip1", + expectedError: false, + }, + { + name: "check for known IP 2", + ipPools: []*antreacrds.ExternalIPPool{ + newExternalIPPool("eip1", "", "10.10.10.2", "10.10.10.3"), + newExternalIPPool("eip2", "", "10.10.11.2", "10.10.11.3"), + }, + ipToCheck: net.ParseIP("10.10.11.2"), + expectedIPPool: "eip2", + expectedError: false, + }, + { + name: "check for unknown IP", + ipPools: []*antreacrds.ExternalIPPool{ + newExternalIPPool("eip1", "", "10.10.10.2", "10.10.10.3"), + newExternalIPPool("eip2", "", "10.10.11.2", "10.10.11.3"), + }, + ipToCheck: net.ParseIP("10.10.13.1"), + expectedIPPool: "", + expectedError: true, + }, + } + for _, tt := range tests { + t.Run(tt.name, func(t *testing.T) { + stopCh := make(chan struct{}) + defer close(stopCh) + var fakeCRDObjects []runtime.Object + for _, p := range tt.ipPools { + fakeCRDObjects = append(fakeCRDObjects, p) + } + controller := newController(fakeCRDObjects) + controller.crdInformerFactory.Start(stopCh) + controller.crdInformerFactory.WaitForCacheSync(stopCh) + go controller.Run(stopCh) + require.True(t, cache.WaitForCacheSync(stopCh, controller.HasSynced)) + pool, err := controller.LocateIP(tt.ipToCheck) + assert.Equal(t, tt.expectedIPPool, pool) + assert.Equal(t, tt.expectedError, err != nil) + }) + } +} + +func checkExternalIPPoolStatus(t *testing.T, controller *controller, poolName string, expectedStatus antreacrds.ExternalIPPoolUsage) { + exists := controller.IPPoolExists(poolName) + require.True(t, exists) + err := wait.PollImmediate(50*time.Millisecond, 2*time.Second, func() (found bool, err error) { + eip, err := controller.crdClient.CrdV1alpha2().ExternalIPPools().Get(context.TODO(), poolName, metav1.GetOptions{}) + if err != nil { + return false, err + } + return eip.Status.Usage == expectedStatus, nil + }) + assert.NoError(t, err) +} + +func TestExternalIPPoolController_RestoreIPAllocations(t *testing.T) { + tests := []struct { + name string + ipPools []*antreacrds.ExternalIPPool + allocations []IPAllocation + allocationsToRestore []IPAllocation + expectedSucceeded []IPAllocation + expectedIPPoolStatus []antreacrds.ExternalIPPoolUsage + }{ + { + name: "restore all IP successfully", + ipPools: []*antreacrds.ExternalIPPool{ + newExternalIPPool("eip1", "", "10.10.10.2", "10.10.10.3"), + newExternalIPPool("eip2", "", "10.10.11.2", "10.10.11.3"), + }, + allocations: nil, + allocationsToRestore: []IPAllocation{ + { + v1.ObjectReference{ + Name: "egress-1", + }, + "eip1", + net.ParseIP("10.10.10.2"), + }, + { + v1.ObjectReference{ + Name: "egress-2", + }, + "eip2", + net.ParseIP("10.10.11.2"), + }, + }, + expectedSucceeded: []IPAllocation{ + { + v1.ObjectReference{ + Name: "egress-1", + }, + "eip1", + net.ParseIP("10.10.10.2"), + }, + { + v1.ObjectReference{ + Name: "egress-2", + }, + "eip2", + net.ParseIP("10.10.11.2"), + }, + }, + expectedIPPoolStatus: []antreacrds.ExternalIPPoolUsage{ + {Total: 2, Used: 1}, + {Total: 2, Used: 1}, + }, + }, + { + name: "restore IP conflict", + ipPools: []*antreacrds.ExternalIPPool{ + newExternalIPPool("eip1", "", "10.10.10.2", "10.10.10.3"), + newExternalIPPool("eip2", "", "10.10.11.2", "10.10.11.3"), + }, + allocations: []IPAllocation{ + { + v1.ObjectReference{ + Name: "other-service-1", + }, + "eip1", + net.ParseIP("10.10.10.2"), + }, + }, + allocationsToRestore: []IPAllocation{ + { + v1.ObjectReference{ + Name: "egress-1", + }, + "eip1", + net.ParseIP("10.10.10.2"), + }, + { + v1.ObjectReference{ + Name: "egress-2", + }, + "eip2", + net.ParseIP("10.10.11.2"), + }, + }, + expectedSucceeded: []IPAllocation{ + { + v1.ObjectReference{ + Name: "egress-2", + }, + "eip2", + net.ParseIP("10.10.11.2"), + }, + }, + expectedIPPoolStatus: []antreacrds.ExternalIPPoolUsage{ + {Total: 2, Used: 1}, + {Total: 2, Used: 1}, + }, + }, + { + name: "restore IP from unknown IP pool", + ipPools: []*antreacrds.ExternalIPPool{ + newExternalIPPool("eip1", "", "10.10.10.2", "10.10.10.3"), + newExternalIPPool("eip2", "", "10.10.11.2", "10.10.11.3"), + }, + allocations: nil, + allocationsToRestore: []IPAllocation{ + { + v1.ObjectReference{ + Name: "egress-1", + }, + "eip2", + net.ParseIP("10.10.11.2"), + }, + { + v1.ObjectReference{ + Name: "egress-2", + }, + "eip3", + net.ParseIP("10.10.12.2"), + }, + }, + expectedSucceeded: []IPAllocation{ + { + v1.ObjectReference{ + Name: "egress-1", + }, + "eip2", + net.ParseIP("10.10.11.2"), + }, + }, + expectedIPPoolStatus: []antreacrds.ExternalIPPoolUsage{ + {Total: 2, Used: 0}, + {Total: 2, Used: 1}, + }, + }, + } + for _, tt := range tests { + t.Run(tt.name, func(t *testing.T) { + stopCh := make(chan struct{}) + defer close(stopCh) + var fakeCRDObjects []runtime.Object + for _, p := range tt.ipPools { + fakeCRDObjects = append(fakeCRDObjects, p) + } + controller := newController(fakeCRDObjects) + controller.AddEventHandler( + func(ippool string) { + }) + controller.crdInformerFactory.Start(stopCh) + controller.crdInformerFactory.WaitForCacheSync(stopCh) + go controller.Run(stopCh) + require.True(t, cache.WaitForCacheSync(stopCh, controller.HasSynced)) + for _, alloc := range tt.allocations { + err := controller.UpdateIPAllocation(alloc.IPPoolName, alloc.IP) + require.NoError(t, err) + } + succeeded := controller.RestoreIPAllocations(tt.allocationsToRestore) + assert.Equal(t, tt.expectedSucceeded, succeeded) + for idx, pool := range tt.ipPools { + checkExternalIPPoolStatus(t, controller, pool.Name, tt.expectedIPPoolStatus[idx]) + } + }) + } +} diff --git a/pkg/controller/externalippool/validate.go b/pkg/controller/externalippool/validate.go new file mode 100644 index 00000000000..bb104fbb8fe --- /dev/null +++ b/pkg/controller/externalippool/validate.go @@ -0,0 +1,98 @@ +// Copyright 2021 Antrea Authors +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package externalippool + +import ( + "encoding/json" + "fmt" + + admv1 "k8s.io/api/admission/v1" + metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" + "k8s.io/apimachinery/pkg/util/sets" + "k8s.io/klog/v2" + + crdv1alpha2 "antrea.io/antrea/pkg/apis/crd/v1alpha2" +) + +func (c *ExternalIPPoolController) ValidateExternalIPPool(review *admv1.AdmissionReview) *admv1.AdmissionResponse { + var result *metav1.Status + var msg string + allowed := true + + klog.V(2).Info("Validating ExternalIPPool", "request", review.Request) + var newObj, oldObj crdv1alpha2.ExternalIPPool + if review.Request.Object.Raw != nil { + if err := json.Unmarshal(review.Request.Object.Raw, &newObj); err != nil { + klog.ErrorS(err, "Error de-serializing current ExternalIPPool") + return newAdmissionResponseForErr(err) + } + } + if review.Request.OldObject.Raw != nil { + if err := json.Unmarshal(review.Request.OldObject.Raw, &oldObj); err != nil { + klog.ErrorS(err, "Error de-serializing old ExternalIPPool") + return newAdmissionResponseForErr(err) + } + } + + switch review.Request.Operation { + case admv1.Create: + // This shouldn't happen with the webhook configuration we include in the Antrea YAML manifests. + klog.V(2).Info("Validating CREATE request for ExternalIPPool") + // Always allow CREATE request. + case admv1.Update: + klog.V(2).Info("Validating UPDATE request for ExternalIPPool") + + oldIPRangeSet := getIPRangeSet(oldObj.Spec.IPRanges) + newIPRangeSet := getIPRangeSet(newObj.Spec.IPRanges) + deletedIPRanges := oldIPRangeSet.Difference(newIPRangeSet) + if deletedIPRanges.Len() > 0 { + allowed = false + msg = fmt.Sprintf("existing IPRanges %v cannot be deleted", deletedIPRanges.List()) + } + case admv1.Delete: + // This shouldn't happen with the webhook configuration we include in the Antrea YAML manifests. + klog.V(2).Info("Validating DELETE request for ExternalIPPool") + // Always allow DELETE request. + } + + if msg != "" { + result = &metav1.Status{ + Message: msg, + } + } + return &admv1.AdmissionResponse{ + Allowed: allowed, + Result: result, + } +} +func getIPRangeSet(ipRanges []crdv1alpha2.IPRange) sets.String { + set := sets.NewString() + for _, ipRange := range ipRanges { + ipRangeStr := ipRange.CIDR + if ipRangeStr == "" { + ipRangeStr = fmt.Sprintf("%s-%s", ipRange.Start, ipRange.End) + } + set.Insert(ipRangeStr) + } + return set +} + +func newAdmissionResponseForErr(err error) *admv1.AdmissionResponse { + return &admv1.AdmissionResponse{ + Result: &metav1.Status{ + Message: err.Error(), + }, + } +} diff --git a/pkg/controller/externalippool/validate_test.go b/pkg/controller/externalippool/validate_test.go new file mode 100644 index 00000000000..fd45823cb14 --- /dev/null +++ b/pkg/controller/externalippool/validate_test.go @@ -0,0 +1,100 @@ +// Copyright 2021 Antrea Authors +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package externalippool + +import ( + "encoding/json" + "testing" + + "github.com/stretchr/testify/assert" + "github.com/stretchr/testify/require" + admv1 "k8s.io/api/admission/v1" + metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" + "k8s.io/apimachinery/pkg/runtime" + "k8s.io/client-go/tools/cache" +) + +func marshal(object runtime.Object) []byte { + raw, _ := json.Marshal(object) + return raw +} + +func TestControllerValidateExternalIPPool(t *testing.T) { + tests := []struct { + name string + request *admv1.AdmissionRequest + expectedResponse *admv1.AdmissionResponse + }{ + { + name: "CREATE operation should be allowed", + request: &admv1.AdmissionRequest{ + Name: "foo", + Operation: "CREATE", + Object: runtime.RawExtension{Raw: marshal(newExternalIPPool("foo", "10.10.10.0/24", "", ""))}, + }, + expectedResponse: &admv1.AdmissionResponse{Allowed: true}, + }, + { + name: "Deleting IPRange should not be allowed", + request: &admv1.AdmissionRequest{ + Name: "foo", + Operation: "UPDATE", + OldObject: runtime.RawExtension{Raw: marshal(newExternalIPPool("foo", "10.10.10.0/24", "10.10.20.1", "10.10.20.2"))}, + Object: runtime.RawExtension{Raw: marshal(newExternalIPPool("foo", "10.10.10.0/24", "", ""))}, + }, + expectedResponse: &admv1.AdmissionResponse{ + Allowed: false, + Result: &metav1.Status{ + Message: "existing IPRanges [10.10.20.1-10.10.20.2] cannot be deleted", + }, + }, + }, + { + name: "Adding IPRange should be allowed", + request: &admv1.AdmissionRequest{ + Name: "foo", + Operation: "UPDATE", + OldObject: runtime.RawExtension{Raw: marshal(newExternalIPPool("foo", "10.10.10.0/24", "", ""))}, + Object: runtime.RawExtension{Raw: marshal(newExternalIPPool("foo", "10.10.10.0/24", "10.10.20.1", "10.10.20.2"))}, + }, + expectedResponse: &admv1.AdmissionResponse{Allowed: true}, + }, + { + name: "DELETE operation should be allowed", + request: &admv1.AdmissionRequest{ + Name: "foo", + Operation: "DELETE", + Object: runtime.RawExtension{Raw: marshal(newExternalIPPool("foo", "10.10.10.0/24", "", ""))}, + }, + expectedResponse: &admv1.AdmissionResponse{Allowed: true}, + }, + } + for _, tt := range tests { + t.Run(tt.name, func(t *testing.T) { + c := newController(nil) + stopCh := make(chan struct{}) + defer close(stopCh) + c.crdInformerFactory.Start(stopCh) + c.crdInformerFactory.WaitForCacheSync(stopCh) + go c.Run(stopCh) + require.True(t, cache.WaitForCacheSync(stopCh, c.HasSynced)) + review := &admv1.AdmissionReview{ + Request: tt.request, + } + gotResponse := c.ValidateExternalIPPool(review) + assert.Equal(t, tt.expectedResponse, gotResponse) + }) + } +} diff --git a/test/integration/agent/ip_assigner_linux_test.go b/test/integration/agent/ip_assigner_linux_test.go index bf31901409f..9d10b090e9a 100644 --- a/test/integration/agent/ip_assigner_linux_test.go +++ b/test/integration/agent/ip_assigner_linux_test.go @@ -24,7 +24,7 @@ import ( "github.com/vishvananda/netlink" "k8s.io/apimachinery/pkg/util/sets" - "antrea.io/antrea/pkg/agent/controller/egress/ipassigner" + "antrea.io/antrea/pkg/agent/ipassigner" ) const dummyDeviceName = "antrea-dummy0"