diff --git a/build/charts/antrea/crds/ippool.yaml b/build/charts/antrea/crds/ippool.yaml index 41ba5d53092..ae0d4b8f995 100644 --- a/build/charts/antrea/crds/ippool.yaml +++ b/build/charts/antrea/crds/ippool.yaml @@ -100,6 +100,13 @@ spec: type: string type: object type: array + usage: + properties: + used: + type: integer + total: + type: integer + type: object type: object subresources: status: {} diff --git a/build/yamls/antrea-aks.yml b/build/yamls/antrea-aks.yml index 03e3874d5a4..ef141cb8daa 100644 --- a/build/yamls/antrea-aks.yml +++ b/build/yamls/antrea-aks.yml @@ -1370,6 +1370,13 @@ spec: type: string type: object type: array + usage: + properties: + used: + type: integer + total: + type: integer + type: object type: object subresources: status: {} diff --git a/build/yamls/antrea-crds.yml b/build/yamls/antrea-crds.yml index 64f7a1a2b62..7629b750118 100644 --- a/build/yamls/antrea-crds.yml +++ b/build/yamls/antrea-crds.yml @@ -1355,6 +1355,13 @@ spec: type: string type: object type: array + usage: + properties: + used: + type: integer + total: + type: integer + type: object type: object subresources: status: {} diff --git a/build/yamls/antrea-eks.yml b/build/yamls/antrea-eks.yml index 5c9fd89a175..9228ff96849 100644 --- a/build/yamls/antrea-eks.yml +++ b/build/yamls/antrea-eks.yml @@ -1370,6 +1370,13 @@ spec: type: string type: object type: array + usage: + properties: + used: + type: integer + total: + type: integer + type: object type: object subresources: status: {} diff --git a/build/yamls/antrea-gke.yml b/build/yamls/antrea-gke.yml index 7822ecf0c82..0c2c7922f26 100644 --- a/build/yamls/antrea-gke.yml +++ b/build/yamls/antrea-gke.yml @@ -1370,6 +1370,13 @@ spec: type: string type: object type: array + usage: + properties: + used: + type: integer + total: + type: integer + type: object type: object subresources: status: {} diff --git a/build/yamls/antrea-ipsec.yml b/build/yamls/antrea-ipsec.yml index 04fd7cbf74a..6e87d4b5174 100644 --- a/build/yamls/antrea-ipsec.yml +++ b/build/yamls/antrea-ipsec.yml @@ -1370,6 +1370,13 @@ spec: type: string type: object type: array + usage: + properties: + used: + type: integer + total: + type: integer + type: object type: object subresources: status: {} diff --git a/build/yamls/antrea.yml b/build/yamls/antrea.yml index ba216d1e056..ea0fb5e39de 100644 --- a/build/yamls/antrea.yml +++ b/build/yamls/antrea.yml @@ -1370,6 +1370,13 @@ spec: type: string type: object type: array + usage: + properties: + used: + type: integer + total: + type: integer + type: object type: object subresources: status: {} diff --git a/pkg/apis/crd/v1alpha2/types.go b/pkg/apis/crd/v1alpha2/types.go index 15d18e2b926..39232ea1a81 100644 --- a/pkg/apis/crd/v1alpha2/types.go +++ b/pkg/apis/crd/v1alpha2/types.go @@ -266,14 +266,7 @@ type IPRange struct { } type ExternalIPPoolStatus struct { - Usage ExternalIPPoolUsage `json:"usage,omitempty"` -} - -type ExternalIPPoolUsage struct { - // Total number of IPs. - Total int `json:"total"` - // Number of allocated IPs. - Used int `json:"used"` + Usage IPPoolUsage `json:"usage,omitempty"` } // +k8s:deepcopy-gen:interfaces=k8s.io/apimachinery/pkg/runtime.Object @@ -337,9 +330,15 @@ type SubnetIPRange struct { type IPPoolStatus struct { IPAddresses []IPAddressState `json:"ipAddresses,omitempty"` - // TODO: add usage statistics + Usage IPPoolUsage `json:"usage,omitempty"` } +type IPPoolUsage struct { + // Total number of IPs. + Total int `json:"total"` + // Number of allocated IPs. + Used int `json:"used"` +} type IPAddressPhase string const ( diff --git a/pkg/apis/crd/v1alpha2/zz_generated.deepcopy.go b/pkg/apis/crd/v1alpha2/zz_generated.deepcopy.go index ab1449891e8..9ee9725dd92 100644 --- a/pkg/apis/crd/v1alpha2/zz_generated.deepcopy.go +++ b/pkg/apis/crd/v1alpha2/zz_generated.deepcopy.go @@ -449,22 +449,6 @@ func (in *ExternalIPPoolStatus) DeepCopy() *ExternalIPPoolStatus { return out } -// DeepCopyInto is an autogenerated deepcopy function, copying the receiver, writing into out. in must be non-nil. -func (in *ExternalIPPoolUsage) DeepCopyInto(out *ExternalIPPoolUsage) { - *out = *in - return -} - -// DeepCopy is an autogenerated deepcopy function, copying the receiver, creating a new ExternalIPPoolUsage. -func (in *ExternalIPPoolUsage) DeepCopy() *ExternalIPPoolUsage { - if in == nil { - return nil - } - out := new(ExternalIPPoolUsage) - in.DeepCopyInto(out) - return out -} - // DeepCopyInto is an autogenerated deepcopy function, copying the receiver, writing into out. in must be non-nil. func (in *GRETunnel) DeepCopyInto(out *GRETunnel) { *out = *in @@ -712,6 +696,7 @@ func (in *IPPoolStatus) DeepCopyInto(out *IPPoolStatus) { (*in)[i].DeepCopyInto(&(*out)[i]) } } + out.Usage = in.Usage return } @@ -725,6 +710,22 @@ func (in *IPPoolStatus) DeepCopy() *IPPoolStatus { return out } +// DeepCopyInto is an autogenerated deepcopy function, copying the receiver, writing into out. in must be non-nil. +func (in *IPPoolUsage) DeepCopyInto(out *IPPoolUsage) { + *out = *in + return +} + +// DeepCopy is an autogenerated deepcopy function, copying the receiver, creating a new IPPoolUsage. +func (in *IPPoolUsage) DeepCopy() *IPPoolUsage { + if in == nil { + return nil + } + out := new(IPPoolUsage) + in.DeepCopyInto(out) + return out +} + // DeepCopyInto is an autogenerated deepcopy function, copying the receiver, writing into out. in must be non-nil. func (in *IPRange) DeepCopyInto(out *IPRange) { *out = *in diff --git a/pkg/controller/externalippool/controller.go b/pkg/controller/externalippool/controller.go index 0fd219c6c79..326c9e53ed4 100644 --- a/pkg/controller/externalippool/controller.go +++ b/pkg/controller/externalippool/controller.go @@ -308,7 +308,7 @@ func (c *ExternalIPPoolController) updateExternalIPPoolStatus(poolName string) e var getErr error if err := retry.RetryOnConflict(retry.DefaultRetry, func() error { actualStatus := eip.Status - usage := antreacrds.ExternalIPPoolUsage{Total: total, Used: used} + usage := antreacrds.IPPoolUsage{Total: total, Used: used} if actualStatus.Usage == usage { return nil } diff --git a/pkg/controller/externalippool/controller_test.go b/pkg/controller/externalippool/controller_test.go index 9d7abf8d029..9e4e40e3c0f 100644 --- a/pkg/controller/externalippool/controller_test.go +++ b/pkg/controller/externalippool/controller_test.go @@ -79,7 +79,7 @@ func TestAllocateIPFromPool(t *testing.T) { allocateFrom string expectedIP string expectError bool - expectedIPPoolStatus []antreacrds.ExternalIPPoolUsage + expectedIPPoolStatus []antreacrds.IPPoolUsage }{ { name: "allocate from proper IP pool", @@ -90,7 +90,7 @@ func TestAllocateIPFromPool(t *testing.T) { allocateFrom: "eip1", expectedIP: "10.10.10.2", expectError: false, - expectedIPPoolStatus: []antreacrds.ExternalIPPoolUsage{ + expectedIPPoolStatus: []antreacrds.IPPoolUsage{ {Total: 2, Used: 1}, }, }, @@ -109,7 +109,7 @@ func TestAllocateIPFromPool(t *testing.T) { allocateFrom: "eip1", expectedIP: "", expectError: true, - expectedIPPoolStatus: []antreacrds.ExternalIPPoolUsage{ + expectedIPPoolStatus: []antreacrds.IPPoolUsage{ {Total: 2, Used: 2}, }, }, @@ -122,7 +122,7 @@ func TestAllocateIPFromPool(t *testing.T) { allocateFrom: "eip2", expectedIP: "", expectError: true, - expectedIPPoolStatus: []antreacrds.ExternalIPPoolUsage{ + expectedIPPoolStatus: []antreacrds.IPPoolUsage{ {Total: 2, Used: 0}, }, }, @@ -164,7 +164,7 @@ func TestReleaseIP(t *testing.T) { ipPoolToRelease string ipToRelease string expectError bool - expectedIPPoolStatus []antreacrds.ExternalIPPoolUsage + expectedIPPoolStatus []antreacrds.IPPoolUsage }{ { name: "release IP to pool", @@ -181,7 +181,7 @@ func TestReleaseIP(t *testing.T) { ipPoolToRelease: "eip1", ipToRelease: "10.10.10.2", expectError: false, - expectedIPPoolStatus: []antreacrds.ExternalIPPoolUsage{ + expectedIPPoolStatus: []antreacrds.IPPoolUsage{ {Total: 2, Used: 1}, }, }, @@ -200,7 +200,7 @@ func TestReleaseIP(t *testing.T) { ipPoolToRelease: "eip1", ipToRelease: "10.10.11.2", expectError: true, - expectedIPPoolStatus: []antreacrds.ExternalIPPoolUsage{ + expectedIPPoolStatus: []antreacrds.IPPoolUsage{ {Total: 2, Used: 2}, }, }, @@ -455,7 +455,7 @@ func TestIPPoolHasIP(t *testing.T) { } } -func checkExternalIPPoolStatus(t *testing.T, controller *controller, poolName string, expectedStatus antreacrds.ExternalIPPoolUsage) { +func checkExternalIPPoolStatus(t *testing.T, controller *controller, poolName string, expectedStatus antreacrds.IPPoolUsage) { exists := controller.IPPoolExists(poolName) require.True(t, exists) err := wait.PollImmediate(50*time.Millisecond, 2*time.Second, func() (found bool, err error) { @@ -475,7 +475,7 @@ func TestExternalIPPoolController_RestoreIPAllocations(t *testing.T) { allocations []IPAllocation allocationsToRestore []IPAllocation expectedSucceeded []IPAllocation - expectedIPPoolStatus []antreacrds.ExternalIPPoolUsage + expectedIPPoolStatus []antreacrds.IPPoolUsage }{ { name: "restore all IP successfully", @@ -516,7 +516,7 @@ func TestExternalIPPoolController_RestoreIPAllocations(t *testing.T) { net.ParseIP("10.10.11.2"), }, }, - expectedIPPoolStatus: []antreacrds.ExternalIPPoolUsage{ + expectedIPPoolStatus: []antreacrds.IPPoolUsage{ {Total: 2, Used: 1}, {Total: 2, Used: 1}, }, @@ -561,7 +561,7 @@ func TestExternalIPPoolController_RestoreIPAllocations(t *testing.T) { net.ParseIP("10.10.11.2"), }, }, - expectedIPPoolStatus: []antreacrds.ExternalIPPoolUsage{ + expectedIPPoolStatus: []antreacrds.IPPoolUsage{ {Total: 2, Used: 1}, {Total: 2, Used: 1}, }, @@ -598,7 +598,7 @@ func TestExternalIPPoolController_RestoreIPAllocations(t *testing.T) { net.ParseIP("10.10.11.2"), }, }, - expectedIPPoolStatus: []antreacrds.ExternalIPPoolUsage{ + expectedIPPoolStatus: []antreacrds.IPPoolUsage{ {Total: 2, Used: 0}, {Total: 2, Used: 1}, }, diff --git a/pkg/controller/ipam/antrea_ipam_controller.go b/pkg/controller/ipam/antrea_ipam_controller.go index 14b4b597905..43416360989 100644 --- a/pkg/controller/ipam/antrea_ipam_controller.go +++ b/pkg/controller/ipam/antrea_ipam_controller.go @@ -56,6 +56,9 @@ const ( maxRetryDelay = 300 * time.Second garbageCollectionInterval = 10 * time.Minute + + // Default number of workers processing an IPPool change. + defaultWorkers = 4 ) // AntreaIPAMController is responsible for: @@ -84,6 +87,9 @@ type AntreaIPAMController struct { ipPoolInformer crdinformers.IPPoolInformer ipPoolLister crdlisters.IPPoolLister ipPoolListerSynced cache.InformerSynced + + // statusQueue maintains the IPPool objects that need to be synced. + statusQueue workqueue.RateLimitingInterface } func statefulSetIndexFunc(obj interface{}) ([]string, error) { @@ -124,6 +130,7 @@ func NewAntreaIPAMController(crdClient versioned.Interface, ipPoolInformer: ipPoolInformer, ipPoolLister: ipPoolInformer.Lister(), ipPoolListerSynced: ipPoolInformer.Informer().HasSynced, + statusQueue: workqueue.NewNamedRateLimitingQueue(workqueue.NewItemExponentialFailureRateLimiter(minRetryDelay, maxRetryDelay), "IPPoolStatus"), } // Add handlers for Stateful Set events. @@ -362,14 +369,93 @@ func (c *AntreaIPAMController) processNextStatefulSetWorkItem() bool { return true } +func (c *AntreaIPAMController) updateIPPoolCounters(poolName string) error { + ipPool, err := c.ipPoolLister.Get(poolName) + if err != nil { + if errors.IsNotFound(err) { + return nil + } + return fmt.Errorf("failed to retrieve IPPool %s, error: %v", poolName, err) + } + + allocator, err := poolallocator.NewIPPoolAllocator(ipPool.Name, c.crdClient, c.ipPoolLister) + + if err != nil { + return fmt.Errorf("failed to initialize allocator for IPPool %s, error: %v", poolName, err) + } + + // Total is fetched from allocator as here are trapped changes to CRD, e.g addition of new IPRange + total := allocator.Total() + + // Used is gathered from IP allocation status within the CRD - as it can be set by each one of the agents + used := len(ipPool.Status.IPAddresses) + + // If update has no effect, exit + if ipPool.Status.Usage.Used == used && ipPool.Status.Usage.Total == total { + return nil + } + + ipPoolToUpdate := ipPool.DeepCopy() + ipPoolToUpdate.Status.Usage.Used = used + ipPoolToUpdate.Status.Usage.Total = total + + _, err = c.crdClient.CrdV1alpha2().IPPools().UpdateStatus(context.TODO(), ipPoolToUpdate, metav1.UpdateOptions{}) + if err != nil { + return fmt.Errorf("failed to update IPPool %s counters, error: %v", poolName, err) + } + + return nil +} + +func (c *AntreaIPAMController) createHandler(obj interface{}) { + ipPool := obj.(*crdv1a2.IPPool) + c.statusQueue.Add(ipPool.Name) +} + +func (c *AntreaIPAMController) updateHandler(oldObj, newObj interface{}) { + ipPool := newObj.(*crdv1a2.IPPool) + c.statusQueue.Add(ipPool.Name) +} + +func (c *AntreaIPAMController) processNextWorkItem() bool { + key, quit := c.statusQueue.Get() + if quit { + return false + } + defer c.statusQueue.Done(key) + + err := c.updateIPPoolCounters(key.(string)) + if err != nil { + // Put the item back in the workqueue to handle any transient errors. + c.statusQueue.AddRateLimited(key) + klog.ErrorS(err, "Failed to sync IPPool status", "IPPool", key) + return true + } + // If no error occurs we Forget this item so it does not get queued again until + // another change happens. + c.statusQueue.Forget(key) + return true +} + +func (c *AntreaIPAMController) worker() { + for c.processNextWorkItem() { + } +} + // Run begins watching and syncing of a AntreaIPAMController. func (c *AntreaIPAMController) Run(stopCh <-chan struct{}) { defer c.statefulSetQueue.ShutDown() + defer c.statusQueue.ShutDown() klog.InfoS("Starting", "controller", controllerName) defer klog.InfoS("Shutting down", "controller", controllerName) + c.ipPoolInformer.Informer().AddEventHandler(cache.ResourceEventHandlerFuncs{ + AddFunc: c.createHandler, + UpdateFunc: c.updateHandler, + }) + cacheSyncs := []cache.InformerSynced{c.namespaceListerSynced, c.podInformerSynced, c.statefulSetListerSynced, c.ipPoolListerSynced} if !cache.WaitForNamedCacheSync(controllerName, stopCh, cacheSyncs...) { return @@ -380,5 +466,9 @@ func (c *AntreaIPAMController) Run(stopCh <-chan struct{}) { go wait.Until(c.statefulSetWorker, time.Second, stopCh) + for i := 0; i < defaultWorkers; i++ { + go wait.Until(c.worker, time.Second, stopCh) + } + <-stopCh } diff --git a/pkg/ipam/ipallocator/allocator.go b/pkg/ipam/ipallocator/allocator.go index 71798394f8d..b02fb4928dd 100644 --- a/pkg/ipam/ipallocator/allocator.go +++ b/pkg/ipam/ipallocator/allocator.go @@ -247,6 +247,11 @@ func (a *SingleIPAllocator) Free() int { return a.max - a.count - len(a.reservedIPs) } +// Total returns the number total of IPs within the pool. +func (a *SingleIPAllocator) Total() int { + return a.max - len(a.reservedIPs) +} + // Has returns whether the provided IP is in the range or not. func (a *SingleIPAllocator) Has(ip net.IP) bool { offset := a.getOffset(ip) @@ -322,7 +327,7 @@ func (ma MultiIPAllocator) Free() int { func (ma MultiIPAllocator) Total() int { total := 0 for _, a := range ma { - total += a.max - len(a.reservedIPs) + total += a.Total() } return total } diff --git a/pkg/ipam/poolallocator/allocator.go b/pkg/ipam/poolallocator/allocator.go index fe7b5a9a1b4..e1e09965583 100644 --- a/pkg/ipam/poolallocator/allocator.go +++ b/pkg/ipam/poolallocator/allocator.go @@ -145,6 +145,7 @@ func (a *IPPoolAllocator) appendPoolUsage(ipPool *v1alpha2.IPPool, ip net.IP, st } newPool.Status.IPAddresses = append(newPool.Status.IPAddresses, usageEntry) + a.updateUsage(newPool) _, err := a.crdClient.CrdV1alpha2().IPPools().UpdateStatus(context.TODO(), newPool, metav1.UpdateOptions{}) if err != nil { klog.Warningf("IP Pool %s update with status %+v failed: %+v", newPool.Name, newPool.Status, err) @@ -239,6 +240,7 @@ func (a *IPPoolAllocator) removeIPAddressState(ipPool *v1alpha2.IPPool, ip net.I } newPool.Status.IPAddresses = newList + a.updateUsage(newPool) _, err := a.crdClient.CrdV1alpha2().IPPools().UpdateStatus(context.TODO(), newPool, metav1.UpdateOptions{}) if err != nil { @@ -622,3 +624,8 @@ func (a IPPoolAllocator) Total() int { } return allocators.Total() } + +func (a *IPPoolAllocator) updateUsage(ipPool *v1alpha2.IPPool) { + ipPool.Status.Usage.Total = a.Total() + ipPool.Status.Usage.Used = len(ipPool.Status.IPAddresses) +} diff --git a/pkg/ipam/poolallocator/allocator_test.go b/pkg/ipam/poolallocator/allocator_test.go index 62725442756..6900ff8aa1c 100644 --- a/pkg/ipam/poolallocator/allocator_test.go +++ b/pkg/ipam/poolallocator/allocator_test.go @@ -113,6 +113,7 @@ func TestAllocateIP(t *testing.T) { allocator := newTestIPPoolAllocator(&pool, stopCh) require.NotNil(t, allocator) + assert.Equal(t, 21, allocator.Total()) // Allocate specific IP from the range returnInfo, err := allocator.AllocateIP(net.ParseIP("10.2.2.101"), crdv1a2.IPAddressPhaseAllocated, fakePodOwner) @@ -135,7 +136,7 @@ func TestAllocateNext(t *testing.T) { stopCh := make(chan struct{}) defer close(stopCh) - poolName := uuid.New().String() + poolName := "fakePool" ipRange := crdv1a2.IPRange{ Start: "10.2.2.100", End: "10.2.2.120", @@ -154,6 +155,7 @@ func TestAllocateNext(t *testing.T) { allocator := newTestIPPoolAllocator(&pool, stopCh) require.NotNil(t, allocator) + assert.Equal(t, 21, allocator.Total()) validateAllocationSequence(t, allocator, subnetInfo, []string{"10.2.2.100", "10.2.2.101"}) } @@ -185,6 +187,7 @@ func TestAllocateNextMultiRange(t *testing.T) { allocator := newTestIPPoolAllocator(&pool, stopCh) require.NotNil(t, allocator) + assert.Equal(t, 16, allocator.Total()) // Allocate the 2 available IPs from first range then switch to second range validateAllocationSequence(t, allocator, subnetInfo, []string{"10.2.2.100", "10.2.2.101", "10.2.2.2", "10.2.2.3"}) @@ -220,6 +223,7 @@ func TestAllocateNextMultiRangeExausted(t *testing.T) { allocator := newTestIPPoolAllocator(&pool, stopCh) require.NotNil(t, allocator) + assert.Equal(t, 3, allocator.Total()) // Allocate all available IPs validateAllocationSequence(t, allocator, subnetInfo, []string{"10.2.2.100", "10.2.2.101", "10.2.2.200"}) @@ -324,6 +328,7 @@ func TestReleaseResource(t *testing.T) { allocator := newTestIPPoolAllocator(&pool, stopCh) require.NotNil(t, allocator) + assert.Equal(t, 15, allocator.Total()) // Allocate the single available IPs from first range then 3 IPs from second range validateAllocationSequence(t, allocator, subnetInfo, []string{"2001::1000", "2001::2", "2001::3", "2001::4", "2001::5"})