From c463028cc12a19154b855582dc6177af0e0a30c4 Mon Sep 17 00:00:00 2001 From: Jianjun Shen Date: Wed, 13 Apr 2022 09:06:31 -0700 Subject: [PATCH] Support multiple IPPools for secondary network IPAM (#3606) With the change, multiple IPPools can be specified in the CNI IPAM config, and then antrea-agent will try allocating one IP in each IPPool for the secondary network interface. An example CNI config: { "cniVersion": "0.3.0", "type": "macvlan", "master": "enp0s9", "mode": "bridge", "ipam": { "type": "antrea-ipam", "ippools": ["ippool-v4", "ippool-v6"] } } Signed-off-by: Jianjun Shen --- pkg/agent/cniserver/ipam/antrea_ipam.go | 66 ++++++------ .../cniserver/ipam/antrea_ipam_controller.go | 12 +-- pkg/agent/cniserver/ipam/antrea_ipam_test.go | 4 +- pkg/ipam/poolallocator/allocator.go | 100 +++++++++++------- pkg/ipam/poolallocator/allocator_test.go | 17 +-- test/e2e/secondary_network_ipam_test.go | 50 +++++++-- 6 files changed, 153 insertions(+), 96 deletions(-) diff --git a/pkg/agent/cniserver/ipam/antrea_ipam.go b/pkg/agent/cniserver/ipam/antrea_ipam.go index f41e231bccd..693552ee47d 100644 --- a/pkg/agent/cniserver/ipam/antrea_ipam.go +++ b/pkg/agent/cniserver/ipam/antrea_ipam.go @@ -23,7 +23,6 @@ import ( "github.com/containernetworking/cni/pkg/invoke" cnitypes "github.com/containernetworking/cni/pkg/types" "github.com/containernetworking/cni/pkg/types/current" - "k8s.io/apimachinery/pkg/api/errors" "k8s.io/apimachinery/pkg/util/wait" "k8s.io/klog/v2" @@ -59,7 +58,6 @@ const ( // Resource needs to be unique since it is used as identifier in Del. // Therefore Container ID is used, while Pod/Namespace are shown for visibility. 
-// TODO: Consider multi-interface case func getAllocationOwner(args *invoke.Args, k8sArgs *types.K8sArgs, reservedOwner *crdv1a2.IPAddressOwner, secondary bool) crdv1a2.IPAddressOwner { podOwner := &crdv1a2.PodOwner{ Name: string(k8sArgs.K8S_POD_NAME), @@ -182,21 +180,21 @@ func (d *AntreaIPAM) Check(args *invoke.Args, k8sArgs *types.K8sArgs, networkCon return false, nil } - found, err := allocator.HasContainer(args.ContainerID, "") + ip, err := allocator.GetContainerIP(args.ContainerID, "") if err != nil { return true, err } - if !found { + if ip == nil { return true, fmt.Errorf("no IP Address association found for container %s", string(k8sArgs.K8S_POD_NAME)) } - return true, nil } func (d *AntreaIPAM) secondaryNetworkAdd(args *invoke.Args, k8sArgs *types.K8sArgs, networkConfig *types.NetworkConfig) (*current.Result, error) { ipamConf := networkConfig.IPAM - if len(ipamConf.IPPools) == 0 { + numPools := len(ipamConf.IPPools) + if numPools == 0 { return nil, fmt.Errorf("Antrea IPPool must be specified") } @@ -205,37 +203,42 @@ func (d *AntreaIPAM) secondaryNetworkAdd(args *invoke.Args, k8sArgs *types.K8sAr return nil, err } - var err error - var allocator *poolallocator.IPPoolAllocator - for _, p := range ipamConf.IPPools { - allocator, err = d.controller.getPoolAllocatorByName(p) - if err == nil { - // Use the first IPPool that exists. - break + owner := getAllocationOwner(args, k8sArgs, nil, true) + var allocatorsToRelease []*poolallocator.IPPoolAllocator + defer func() { + for _, allocator := range allocatorsToRelease { + // Try to release the allocated IPs after an error. 
+ allocator.ReleaseContainer(owner.Pod.ContainerID, owner.Pod.IFName) } - if !errors.IsNotFound(err) { - return nil, fmt.Errorf("failed to get IPPool %s: %v", p, err) + }() + + result := ¤t.Result{} + for _, p := range ipamConf.IPPools { + allocator, err := d.controller.getPoolAllocatorByName(p) + if err != nil { + return nil, err } - klog.InfoS("IPPool not found", "pool", p) - } - if allocator == nil { - return nil, fmt.Errorf("no valid IPPool found") - } + var ip net.IP + var subnetInfo *crdv1a2.SubnetInfo + ip, subnetInfo, err = allocator.AllocateNext(crdv1a2.IPAddressPhaseAllocated, owner) + if err != nil { + return nil, err + } + if numPools > 1 { + allocatorsToRelease = append(allocatorsToRelease, allocator) + } - owner := getAllocationOwner(args, k8sArgs, nil, true) - var ip net.IP - var subnetInfo *crdv1a2.SubnetInfo - ip, subnetInfo, err = allocator.AllocateNext(crdv1a2.IPAddressPhaseAllocated, owner) - if err != nil { - return nil, err + gwIP := net.ParseIP(subnetInfo.Gateway) + ipConfig, _ := generateIPConfig(ip, int(subnetInfo.PrefixLength), gwIP) + result.IPs = append(result.IPs, ipConfig) } - // Copy routes and DNS from the input IPAM configuration. - result := ¤t.Result{Routes: ipamConf.Routes, DNS: ipamConf.DNS} - gwIP := net.ParseIP(subnetInfo.Gateway) - ipConfig, _ := generateIPConfig(ip, int(subnetInfo.PrefixLength), gwIP) - result.IPs = append(result.IPs, ipConfig) + result.Routes = ipamConf.Routes + result.DNS = ipamConf.DNS + + // No failed allocation, so do not release allocated IPs. 
+ allocatorsToRelease = nil return result, nil } @@ -270,7 +273,6 @@ func (d *AntreaIPAM) del(podOwner *crdv1a2.PodOwner) (foundAllocation bool, err for _, a := range allocators { err = a.ReleaseContainer(podOwner.ContainerID, podOwner.IFName) if err != nil { - klog.Errorf("xxx err: %v", err) return true, err } } diff --git a/pkg/agent/cniserver/ipam/antrea_ipam_controller.go b/pkg/agent/cniserver/ipam/antrea_ipam_controller.go index 563cf1f5cf0..f57a8aa435f 100644 --- a/pkg/agent/cniserver/ipam/antrea_ipam_controller.go +++ b/pkg/agent/cniserver/ipam/antrea_ipam_controller.go @@ -63,9 +63,9 @@ func podIndexFunc(obj interface{}) ([]string, error) { return nil, fmt.Errorf("obj is not IPPool: %+v", obj) } podNames := sets.NewString() - for _, IPAddress := range ipPool.Status.IPAddresses { - if IPAddress.Owner.Pod != nil { - podNames.Insert(k8s.NamespacedName(IPAddress.Owner.Pod.Namespace, IPAddress.Owner.Pod.Name)) + for _, ipAddress := range ipPool.Status.IPAddresses { + if ipAddress.Owner.Pod != nil { + podNames.Insert(k8s.NamespacedName(ipAddress.Owner.Pod.Namespace, ipAddress.Owner.Pod.Name)) } } return podNames.UnsortedList(), nil @@ -217,7 +217,7 @@ func (c *AntreaIPAMController) getPoolAllocatorByPod(namespace, podName string) break } } - if allocator == nil { + if err == nil && allocator == nil { err = fmt.Errorf("no valid IPPool found") } @@ -231,8 +231,8 @@ func (c *AntreaIPAMController) getPoolAllocatorsByOwner(podOwner *crdv1a2.PodOwn k8s.NamespacedName(podOwner.Namespace, podOwner.Name)) for _, item := range ipPools { ipPool := item.(*crdv1a2.IPPool) - for _, IPAddress := range ipPool.Status.IPAddresses { - savedPod := IPAddress.Owner.Pod + for _, ipAddress := range ipPool.Status.IPAddresses { + savedPod := ipAddress.Owner.Pod if savedPod != nil && savedPod.ContainerID == podOwner.ContainerID && savedPod.IFName == podOwner.IFName { allocator, err := poolallocator.NewIPPoolAllocator(ipPool.Name, c.crdClient, c.ipPoolLister) if err != nil { diff --git 
a/pkg/agent/cniserver/ipam/antrea_ipam_test.go b/pkg/agent/cniserver/ipam/antrea_ipam_test.go index 1f8b8cc634b..3808ceb3621 100644 --- a/pkg/agent/cniserver/ipam/antrea_ipam_test.go +++ b/pkg/agent/cniserver/ipam/antrea_ipam_test.go @@ -523,8 +523,8 @@ func TestAntreaIPAMDriver(t *testing.T) { testAdd("apple1", "10.2.2.100", "10.2.2.1", "ffffff00", false) testAdd("apple-sts-0", "10.2.2.102", "10.2.2.1", "ffffff00", true) - // Make sure repeated call for previous container results in error - testAddError("apple2") + // Make sure repeated call for previous container gets identical result + testAdd("apple2", "10.2.2.101", "10.2.2.1", "ffffff00", false) // Make sure repeated Add works for pod that was previously released testAdd("pear3", "10.2.3.199", "10.2.3.1", "ffffff00", false) diff --git a/pkg/ipam/poolallocator/allocator.go b/pkg/ipam/poolallocator/allocator.go index 7dc82a20f65..fe7b5a9a1b4 100644 --- a/pkg/ipam/poolallocator/allocator.go +++ b/pkg/ipam/poolallocator/allocator.go @@ -51,13 +51,11 @@ type IPPoolAllocator struct { // NewIPPoolAllocator creates an IPPoolAllocator based on the provided IP pool. func NewIPPoolAllocator(poolName string, client crdclientset.Interface, poolLister informers.IPPoolLister) (*IPPoolAllocator, error) { - - // Validate the pool exists - // This has an extra roundtrip cost, however this would allow fallback to - // default IPAM driver if needed + // Validate the pool exists. pool, err := poolLister.Get(poolName) if err != nil { - return nil, err + return nil, fmt.Errorf("failed to get IPPool %s: %v", poolName, err) + } allocator := &IPPoolAllocator{ @@ -252,6 +250,35 @@ func (a *IPPoolAllocator) removeIPAddressState(ipPool *v1alpha2.IPPool, ip net.I } +// getExistingAllocation looks up the existing IP allocation for a Pod network interface, and +// returns the IP address and SubnetInfo if found. 
+func (a *IPPoolAllocator) getExistingAllocation(podOwner *v1alpha2.PodOwner) (net.IP, *v1alpha2.SubnetInfo, error) { + ip, err := a.GetContainerIP(podOwner.ContainerID, podOwner.IFName) + if err != nil { + return nil, nil, err + } + if ip == nil { + return nil, nil, nil + } + + ipPool, allocators, err := a.getPoolAndInitIPAllocators() + if err != nil { + return nil, nil, err + } + + index := -1 + for i, allocator := range allocators { + if allocator.Has(ip) { + index = i + break + } + } + if index == -1 { + return nil, nil, fmt.Errorf("IP %v does not belong to IPPool %s", ip, a.ipPoolName) + } + return ip, &ipPool.Spec.IPRanges[index].SubnetInfo, nil +} + // AllocateIP allocates the specified IP. It returns error if the IP is not in the range or already // allocated, or in case CRD failed to update its state. // In case of success, IP pool CRD status is updated with allocated IP/state/resource/container. @@ -296,21 +323,22 @@ func (a *IPPoolAllocator) AllocateIP(ip net.IP, state v1alpha2.IPAddressPhase, o // AllocateNext allocates the next available IP. It returns error if pool is exausted, // or in case CRD failed to update its state. -// In case of success, IP pool CRD status is updated with allocated IP/state/resource/container. +// In case of success, IPPool CRD status is updated with allocated IP/state/resource/container. // AllocateIP returns subnet details for the requested IP, as defined in IP pool spec. func (a *IPPoolAllocator) AllocateNext(state v1alpha2.IPAddressPhase, owner v1alpha2.IPAddressOwner) (net.IP, *v1alpha2.SubnetInfo, error) { - var subnetSpec *v1alpha2.SubnetInfo - var ip net.IP - // Same resource can not ask for allocation twice without release - // This needs to be verified even at the expense of another API call - exists, err := a.HasContainer(owner.Pod.ContainerID, owner.Pod.IFName) + podOwner := owner.Pod + // Same resource can not ask for allocation twice without release. 
+ // This needs to be verified even at the expense of another API call. + ip, subnetSpec, err := a.getExistingAllocation(podOwner) if err != nil { return nil, nil, err } - - if exists { - return nil, nil, fmt.Errorf("container %s interface %s was already allocated an address from IP Pool %s", - owner.Pod.ContainerID, owner.Pod.IFName, a.ipPoolName) + if ip != nil { + // This can happen when the container requests IPs from multiple pools, and after an + // allocation failure, not all allocated IPs were successfully released, and then + // CNI ADD is retried. + klog.InfoS("Container already has an IP allocated", "container", podOwner.ContainerID, "interface", podOwner.IFName, "IPPool", a.ipPoolName) + return ip, subnetSpec, err } // Retry on CRD update conflict which is caused by multiple agents updating a pool at same time. @@ -350,26 +378,25 @@ func (a *IPPoolAllocator) AllocateNext(state v1alpha2.IPAddressPhase, owner v1al // success, IP pool status is updated with allocated IP/state/resource/container. // AllocateReservedOrNext returns subnet details for the requested IP, as defined in IP pool spec. func (a *IPPoolAllocator) AllocateReservedOrNext(state v1alpha2.IPAddressPhase, owner v1alpha2.IPAddressOwner) (net.IP, *v1alpha2.SubnetInfo, error) { - var subnetSpec *v1alpha2.SubnetInfo - var ip net.IP - ip, err := a.getReservedIP(owner) if err != nil { return nil, nil, err } if ip == nil { - // ip is not reserved, allocate next available ip + // IP is not reserved, allocate next available IP. 
return a.AllocateNext(state, owner) } - // Same resource can not ask for allocation twice without release - // This needs to be verified even at the expense of another API call - exists, err := a.HasContainer(owner.Pod.ContainerID, owner.Pod.IFName) + var prevIP net.IP + var subnetSpec *v1alpha2.SubnetInfo + podOwner := owner.Pod + prevIP, subnetSpec, err = a.getExistingAllocation(podOwner) if err != nil { return nil, nil, err } - if exists { - return nil, nil, fmt.Errorf("container %s was already allocated an address from IP Pool %s", owner.Pod.ContainerID, a.ipPoolName) + if prevIP != nil { + klog.InfoS("Container already has an IP allocated", "container", podOwner.ContainerID, "interface", podOwner.IFName, "IPPool", a.ipPoolName) + return prevIP, subnetSpec, err } // Retry on CRD update conflict which is caused by multiple agents updating a pool at same time. @@ -389,7 +416,7 @@ func (a *IPPoolAllocator) AllocateReservedOrNext(state v1alpha2.IPAddressPhase, if index == -1 { // Failed to find matching range - return fmt.Errorf("IP %v does not belong to IP pool %s", ip, a.ipPoolName) + return fmt.Errorf("IP %v does not belong to IPPool %s", ip, a.ipPoolName) } subnetSpec = &ipPool.Spec.IPRanges[index].SubnetInfo @@ -397,7 +424,7 @@ func (a *IPPoolAllocator) AllocateReservedOrNext(state v1alpha2.IPAddressPhase, }) if err != nil { - klog.ErrorS(err, "Failed to allocate IP address", "ip", ip, "ipPool", a.ipPoolName) + klog.ErrorS(err, "Failed to allocate IP address", "ip", ip, "IPPool", a.ipPoolName) } return ip, subnetSpec, err } @@ -520,7 +547,7 @@ func (a *IPPoolAllocator) ReleaseContainer(containerID, ifName string) error { return err } - // Mark allocated IPs from pool status as unavailable + // Mark the released IPs as available in the IPPool status. 
for _, ip := range ipPool.Status.IPAddresses { savedOwner := ip.Owner.Pod if savedOwner != nil && savedOwner.ContainerID == containerID && savedOwner.IFName == ifName { @@ -540,11 +567,10 @@ func (a *IPPoolAllocator) ReleaseContainer(containerID, ifName string) error { return err } -// HasResource checks whether an IP was associated with specified pod. It returns error if the resource is crd fails to be retrieved. -func (a *IPPoolAllocator) HasPod(namespace, podName string) (bool, error) { - +// hasPod checks whether an IP was associated with specified pod. It returns the error if fails to +// retrieve the IPPool CR. +func (a *IPPoolAllocator) hasPod(namespace, podName string) (bool, error) { ipPool, err := a.getPool() - if err != nil { return false, err } @@ -557,21 +583,19 @@ func (a *IPPoolAllocator) HasPod(namespace, podName string) (bool, error) { return false, nil } -// HasContainer checks whether an IP was associated with specified container. It returns error if the resource crd fails to be retrieved. -func (a *IPPoolAllocator) HasContainer(containerID, ifName string) (bool, error) { - +// GetContainerIP returns the IP allocated for the container interface if found. +func (a *IPPoolAllocator) GetContainerIP(containerID, ifName string) (net.IP, error) { ipPool, err := a.getPool() - if err != nil { - return false, err + return nil, err } for _, ip := range ipPool.Status.IPAddresses { if ip.Owner.Pod != nil && ip.Owner.Pod.ContainerID == containerID && ip.Owner.Pod.IFName == ifName { - return true, nil + return net.ParseIP(ip.IPAddress), nil } } - return false, nil + return nil, nil } // getReservedIP checks whether an IP was reserved with specified owner. It returns error if the resource crd fails to be retrieved. 
diff --git a/pkg/ipam/poolallocator/allocator_test.go b/pkg/ipam/poolallocator/allocator_test.go index 79b9a845059..62725442756 100644 --- a/pkg/ipam/poolallocator/allocator_test.go +++ b/pkg/ipam/poolallocator/allocator_test.go @@ -373,23 +373,24 @@ func TestHas(t *testing.T) { _, _, err := allocator.AllocateNext(crdv1a2.IPAddressPhaseAllocated, owner) require.NoError(t, err) err = wait.PollImmediate(100*time.Millisecond, 1*time.Second, func() (bool, error) { - has, _ := allocator.HasPod(testNamespace, "fakePod") + has, _ := allocator.hasPod(testNamespace, "fakePod") return has, nil }) require.NoError(t, err) - has, err := allocator.HasPod(testNamespace, "realPod") + has, err := allocator.hasPod(testNamespace, "realPod") require.NoError(t, err) assert.False(t, has) - has, err = allocator.HasContainer("fakeContainer", "eth1") + var ip net.IP + ip, err = allocator.GetContainerIP("fakeContainer", "eth1") require.NoError(t, err) - assert.True(t, has) - has, err = allocator.HasContainer("fakeContainer", "") + assert.NotNil(t, ip) + ip, err = allocator.GetContainerIP("fakeContainer", "") require.NoError(t, err) - assert.False(t, has) - has, err = allocator.HasContainer("realContainer", "eth1") + assert.Nil(t, ip) + ip, err = allocator.GetContainerIP("realContainer", "eth1") require.NoError(t, err) - assert.False(t, has) + assert.Nil(t, ip) } func TestAllocateReleaseStatefulSet(t *testing.T) { diff --git a/test/e2e/secondary_network_ipam_test.go b/test/e2e/secondary_network_ipam_test.go index bd8d5fc9d88..e51ad09c314 100644 --- a/test/e2e/secondary_network_ipam_test.go +++ b/test/e2e/secondary_network_ipam_test.go @@ -29,14 +29,12 @@ import ( var ( testIPPoolv4 = &crdv1alpha2.IPPool{ ObjectMeta: metav1.ObjectMeta{ - Name: "test-ippool-ipv4-1", + Name: "test-ippool-ipv4", }, Spec: crdv1alpha2.IPPoolSpec{ IPVersion: crdv1alpha2.IPv4, IPRanges: []crdv1alpha2.SubnetIPRange{{IPRange: crdv1alpha2.IPRange{ - CIDR: "", - Start: "10.123.1.101", - End: "10.123.1.200", + CIDR: 
"10.123.1.0/24", }, SubnetInfo: crdv1alpha2.SubnetInfo{ Gateway: "10.123.1.254", @@ -45,6 +43,23 @@ var ( }, } + testIPPoolv6 = &crdv1alpha2.IPPool{ + ObjectMeta: metav1.ObjectMeta{ + Name: "test-ippool-ipv6", + }, + Spec: crdv1alpha2.IPPoolSpec{ + IPVersion: crdv1alpha2.IPv6, + IPRanges: []crdv1alpha2.SubnetIPRange{{IPRange: crdv1alpha2.IPRange{ + Start: "3ffe:ffff:1:01ff::0101", + End: "3ffe:ffff:1:01ff::0200", + }, + SubnetInfo: crdv1alpha2.SubnetInfo{ + Gateway: "3ffe:ffff:1:01ff::1", + PrefixLength: 64, + }}}, + }, + } + cniCmd = "/opt/cni/bin/antrea" cniEnvs = map[string]string{ @@ -61,7 +76,7 @@ var ( "keyA": ["some more", "plugin specific", "configuration"], "ipam": { "type": "antrea", - "ippools": [ "test-ippool-ipv4-1" ], + "ippools": [ "test-ippool-ipv4", "test-ippool-ipv6" ], "routes": [ { "dst": "0.0.0.0/0" }, { "dst": "192.168.0.0/16", "gw": "10.10.5.1" }, @@ -79,8 +94,13 @@ var ( "ips": [ { "version": "4", - "address": "10.123.1.101/24", + "address": "10.123.1.1/24", "gateway": "10.123.1.254" + }, + { + "version": "6", + "address": "3ffe:ffff:1:1ff::101/64", + "gateway": "3ffe:ffff:1:1ff::1" } ], "routes": [ @@ -110,8 +130,13 @@ var ( "ips": [ { "version": "4", - "address": "10.123.1.102/24", + "address": "10.123.1.2/24", "gateway": "10.123.1.254" + }, + { + "version": "6", + "address": "3ffe:ffff:1:1ff::102/64", + "gateway": "3ffe:ffff:1:1ff::1" } ], "routes": [ @@ -205,15 +230,20 @@ func TestSecondaryNetworkIPAM(t *testing.T) { _, err = data.crdClient.CrdV1alpha2().IPPools().Create(context.TODO(), testIPPoolv4, metav1.CreateOptions{}) defer deleteIPPoolWrapper(t, data, testIPPoolv4.Name) if err != nil { - t.Fatalf("Failed to create IPPool CR: %v", err) + t.Fatalf("Failed to create v4 IPPool CR: %v", err) + } + _, err = data.crdClient.CrdV1alpha2().IPPools().Create(context.TODO(), testIPPoolv6, metav1.CreateOptions{}) + defer deleteIPPoolWrapper(t, data, testIPPoolv6.Name) + if err != nil { + t.Fatalf("Failed to create v6 IPPool CR: %v", err) } // 
DEL non-existing network. Should return no error. executeCNI(t, data, false, true, "net1", 0, "") // Allocate the first IP. executeCNI(t, data, true, false, "net1", 0, testOutput1) - // Duplicated ADD request. Should return exit code 1. - executeCNI(t, data, true, false, "net1", 1, "") + // CNI ADD retry should return the same result. + executeCNI(t, data, true, false, "net1", 0, testOutput1) // Allocate the second IP, and then DEL. executeCNI(t, data, true, true, "net2", 0, testOutput2) // The second IP should be re-allocated, as it was releaed with the previous CNI DEL.