Support multiple IPPools for secondary network IPAM (#3606)
With this change, multiple IPPools can be specified in the CNI IPAM
config, and antrea-agent will then try to allocate one IP from each
IPPool for the secondary network interface.

An example CNI config:
  {
      "cniVersion": "0.3.0",
      "type": "macvlan",
      "master": "enp0s9",
      "mode": "bridge",
      "ipam": {
          "type": "antrea-ipam",
          "ippool": ["ippool-v4", "ippool-v6"]
      }
  }
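
Each name in the "ippool" list refers to an Antrea IPPool CR. A sketch of
what ippool-v4 might look like, assuming the crd.antrea.io/v1alpha2 schema
this change programs against (spec.ipRanges and the gateway/prefixLength
subnet fields appear in the diff below; ipVersion, start, end, and all
values are illustrative assumptions, not part of this commit):

  apiVersion: "crd.antrea.io/v1alpha2"
  kind: IPPool
  metadata:
    name: ippool-v4
  spec:
    ipVersion: 4
    ipRanges:
    - start: "10.2.0.12"
      end: "10.2.0.20"
      gateway: "10.2.0.1"
      prefixLength: 24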

Signed-off-by: Jianjun Shen <shenj@vmware.com>
jianjuns authored Apr 13, 2022
1 parent ecece42 commit c463028
Showing 6 changed files with 153 additions and 96 deletions.
pkg/agent/cniserver/ipam/antrea_ipam.go (34 additions, 32 deletions)
@@ -23,7 +23,6 @@ import (
     "github.com/containernetworking/cni/pkg/invoke"
     cnitypes "github.com/containernetworking/cni/pkg/types"
     "github.com/containernetworking/cni/pkg/types/current"
-    "k8s.io/apimachinery/pkg/api/errors"
     "k8s.io/apimachinery/pkg/util/wait"
     "k8s.io/klog/v2"

@@ -59,7 +58,6 @@ const (
 
 // Resource needs to be unique since it is used as identifier in Del.
 // Therefore Container ID is used, while Pod/Namespace are shown for visibility.
-// TODO: Consider multi-interface case
 func getAllocationOwner(args *invoke.Args, k8sArgs *types.K8sArgs, reservedOwner *crdv1a2.IPAddressOwner, secondary bool) crdv1a2.IPAddressOwner {
     podOwner := &crdv1a2.PodOwner{
         Name: string(k8sArgs.K8S_POD_NAME),
@@ -182,21 +180,21 @@ func (d *AntreaIPAM) Check(args *invoke.Args, k8sArgs *types.K8sArgs, networkCon
         return false, nil
     }
 
-    found, err := allocator.HasContainer(args.ContainerID, "")
+    ip, err := allocator.GetContainerIP(args.ContainerID, "")
     if err != nil {
         return true, err
     }
 
-    if !found {
+    if ip == nil {
         return true, fmt.Errorf("no IP Address association found for container %s", string(k8sArgs.K8S_POD_NAME))
     }
 
     return true, nil
 }
 
 func (d *AntreaIPAM) secondaryNetworkAdd(args *invoke.Args, k8sArgs *types.K8sArgs, networkConfig *types.NetworkConfig) (*current.Result, error) {
     ipamConf := networkConfig.IPAM
-    if len(ipamConf.IPPools) == 0 {
+    numPools := len(ipamConf.IPPools)
+    if numPools == 0 {
         return nil, fmt.Errorf("Antrea IPPool must be specified")
     }
 
@@ -205,37 +203,42 @@ func (d *AntreaIPAM) secondaryNetworkAdd(args *invoke.Args, k8sArgs *types.K8sAr
         return nil, err
     }
 
-    var err error
-    var allocator *poolallocator.IPPoolAllocator
-    for _, p := range ipamConf.IPPools {
-        allocator, err = d.controller.getPoolAllocatorByName(p)
-        if err == nil {
-            // Use the first IPPool that exists.
-            break
-        }
-        if !errors.IsNotFound(err) {
-            return nil, fmt.Errorf("failed to get IPPool %s: %v", p, err)
-        }
-        klog.InfoS("IPPool not found", "pool", p)
-    }
-    if allocator == nil {
-        return nil, fmt.Errorf("no valid IPPool found")
-    }
-
-    owner := getAllocationOwner(args, k8sArgs, nil, true)
-    var ip net.IP
-    var subnetInfo *crdv1a2.SubnetInfo
-    ip, subnetInfo, err = allocator.AllocateNext(crdv1a2.IPAddressPhaseAllocated, owner)
-    if err != nil {
-        return nil, err
-    }
-
-    result := &current.Result{Routes: ipamConf.Routes, DNS: ipamConf.DNS}
-    gwIP := net.ParseIP(subnetInfo.Gateway)
-    ipConfig, _ := generateIPConfig(ip, int(subnetInfo.PrefixLength), gwIP)
-    result.IPs = append(result.IPs, ipConfig)
-
+    owner := getAllocationOwner(args, k8sArgs, nil, true)
+    var allocatorsToRelease []*poolallocator.IPPoolAllocator
+    defer func() {
+        for _, allocator := range allocatorsToRelease {
+            // Try to release the allocated IPs after an error.
+            allocator.ReleaseContainer(owner.Pod.ContainerID, owner.Pod.IFName)
+        }
+    }()
+
+    result := &current.Result{}
+    for _, p := range ipamConf.IPPools {
+        allocator, err := d.controller.getPoolAllocatorByName(p)
+        if err != nil {
+            return nil, err
+        }
+        var ip net.IP
+        var subnetInfo *crdv1a2.SubnetInfo
+        ip, subnetInfo, err = allocator.AllocateNext(crdv1a2.IPAddressPhaseAllocated, owner)
+        if err != nil {
+            return nil, err
+        }
+        if numPools > 1 {
+            allocatorsToRelease = append(allocatorsToRelease, allocator)
+        }
+
+        gwIP := net.ParseIP(subnetInfo.Gateway)
+        ipConfig, _ := generateIPConfig(ip, int(subnetInfo.PrefixLength), gwIP)
+        result.IPs = append(result.IPs, ipConfig)
+    }
+
+    // Copy routes and DNS from the input IPAM configuration.
+    result.Routes = ipamConf.Routes
+    result.DNS = ipamConf.DNS
+
+    // No failed allocation, so do not release allocated IPs.
+    allocatorsToRelease = nil
     return result, nil
 }
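
A note on the rollback pattern in secondaryNetworkAdd above: each successful
allocation is recorded in allocatorsToRelease, the deferred function releases
all recorded allocations if a later pool fails, and setting the slice to nil
on success disarms the rollback. A minimal self-contained sketch of the same
idiom (the allocate helper and pool names are hypothetical, not Antrea APIs):

  package main

  import "fmt"

  // allocate stands in for a per-pool allocation that can fail.
  func allocate(pool string) (string, error) {
      if pool == "exhausted" {
          return "", fmt.Errorf("no IPs left in pool %s", pool)
      }
      return "ip-from-" + pool, nil
  }

  // allocateAll allocates one IP per pool, rolling back on partial failure.
  func allocateAll(pools []string) ([]string, error) {
      var toRelease []string
      defer func() {
          // Runs on every return; only rolls back while toRelease is non-nil.
          for _, ip := range toRelease {
              fmt.Println("rolling back", ip)
          }
      }()

      var ips []string
      for _, p := range pools {
          ip, err := allocate(p)
          if err != nil {
              return nil, err // deferred rollback releases earlier allocations
          }
          toRelease = append(toRelease, ip)
          ips = append(ips, ip)
      }

      toRelease = nil // success: disarm the deferred rollback
      return ips, nil
  }

  func main() {
      fmt.Println(allocateAll([]string{"v4", "v6"}))        // both succeed
      fmt.Println(allocateAll([]string{"v4", "exhausted"})) // v4 rolled back
  }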

@@ -270,7 +273,6 @@ func (d *AntreaIPAM) del(podOwner *crdv1a2.PodOwner) (foundAllocation bool, err
     for _, a := range allocators {
         err = a.ReleaseContainer(podOwner.ContainerID, podOwner.IFName)
         if err != nil {
-            klog.Errorf("xxx err: %v", err)
             return true, err
         }
     }
pkg/agent/cniserver/ipam/antrea_ipam_controller.go (6 additions, 6 deletions)
@@ -63,9 +63,9 @@ func podIndexFunc(obj interface{}) ([]string, error) {
         return nil, fmt.Errorf("obj is not IPPool: %+v", obj)
     }
     podNames := sets.NewString()
-    for _, IPAddress := range ipPool.Status.IPAddresses {
-        if IPAddress.Owner.Pod != nil {
-            podNames.Insert(k8s.NamespacedName(IPAddress.Owner.Pod.Namespace, IPAddress.Owner.Pod.Name))
+    for _, ipAddress := range ipPool.Status.IPAddresses {
+        if ipAddress.Owner.Pod != nil {
+            podNames.Insert(k8s.NamespacedName(ipAddress.Owner.Pod.Namespace, ipAddress.Owner.Pod.Name))
         }
     }
     return podNames.UnsortedList(), nil
@@ -217,7 +217,7 @@ func (c *AntreaIPAMController) getPoolAllocatorByPod(namespace, podName string)
             break
         }
     }
-    if allocator == nil {
+    if err == nil && allocator == nil {
         err = fmt.Errorf("no valid IPPool found")
     }
 
@@ -231,8 +231,8 @@ func (c *AntreaIPAMController) getPoolAllocatorsByOwner(podOwner *crdv1a2.PodOwn
         k8s.NamespacedName(podOwner.Namespace, podOwner.Name))
     for _, item := range ipPools {
         ipPool := item.(*crdv1a2.IPPool)
-        for _, IPAddress := range ipPool.Status.IPAddresses {
-            savedPod := IPAddress.Owner.Pod
+        for _, ipAddress := range ipPool.Status.IPAddresses {
+            savedPod := ipAddress.Owner.Pod
             if savedPod != nil && savedPod.ContainerID == podOwner.ContainerID && savedPod.IFName == podOwner.IFName {
                 allocator, err := poolallocator.NewIPPoolAllocator(ipPool.Name, c.crdClient, c.ipPoolLister)
                 if err != nil {
pkg/agent/cniserver/ipam/antrea_ipam_test.go (2 additions, 2 deletions)
@@ -523,8 +523,8 @@ func TestAntreaIPAMDriver(t *testing.T) {
     testAdd("apple1", "10.2.2.100", "10.2.2.1", "ffffff00", false)
     testAdd("apple-sts-0", "10.2.2.102", "10.2.2.1", "ffffff00", true)
 
-    // Make sure repeated call for previous container results in error
-    testAddError("apple2")
+    // Make sure repeated call for previous container gets identical result
+    testAdd("apple2", "10.2.2.101", "10.2.2.1", "ffffff00", false)
 
     // Make sure repeated Add works for pod that was previously released
     testAdd("pear3", "10.2.3.199", "10.2.3.1", "ffffff00", false)
pkg/ipam/poolallocator/allocator.go (62 additions, 38 deletions)
@@ -51,13 +51,11 @@ type IPPoolAllocator struct {
 
 // NewIPPoolAllocator creates an IPPoolAllocator based on the provided IP pool.
 func NewIPPoolAllocator(poolName string, client crdclientset.Interface, poolLister informers.IPPoolLister) (*IPPoolAllocator, error) {
-
-    // Validate the pool exists
-    // This has an extra roundtrip cost, however this would allow fallback to
-    // default IPAM driver if needed
+    // Validate the pool exists.
     pool, err := poolLister.Get(poolName)
     if err != nil {
-        return nil, err
+        return nil, fmt.Errorf("failed to get IPPool %s: %v", poolName, err)
     }
 
     allocator := &IPPoolAllocator{
@@ -252,6 +250,35 @@ func (a *IPPoolAllocator) removeIPAddressState(ipPool *v1alpha2.IPPool, ip net.I
 
 }
 
+// getExistingAllocation looks up the existing IP allocation for a Pod network interface, and
+// returns the IP address and SubnetInfo if found.
+func (a *IPPoolAllocator) getExistingAllocation(podOwner *v1alpha2.PodOwner) (net.IP, *v1alpha2.SubnetInfo, error) {
+    ip, err := a.GetContainerIP(podOwner.ContainerID, podOwner.IFName)
+    if err != nil {
+        return nil, nil, err
+    }
+    if ip == nil {
+        return nil, nil, nil
+    }
+
+    ipPool, allocators, err := a.getPoolAndInitIPAllocators()
+    if err != nil {
+        return nil, nil, err
+    }
+
+    index := -1
+    for i, allocator := range allocators {
+        if allocator.Has(ip) {
+            index = i
+            break
+        }
+    }
+    if index == -1 {
+        return nil, nil, fmt.Errorf("IP %v does not belong to IPPool %s", ip, a.ipPoolName)
+    }
+    return ip, &ipPool.Spec.IPRanges[index].SubnetInfo, nil
+}
+
 // AllocateIP allocates the specified IP. It returns error if the IP is not in the range or already
 // allocated, or in case CRD failed to update its state.
 // In case of success, IP pool CRD status is updated with allocated IP/state/resource/container.
@@ -296,21 +323,22 @@ func (a *IPPoolAllocator) AllocateIP(ip net.IP, state v1alpha2.IPAddressPhase, o
 
 // AllocateNext allocates the next available IP. It returns error if pool is exausted,
 // or in case CRD failed to update its state.
-// In case of success, IP pool CRD status is updated with allocated IP/state/resource/container.
+// In case of success, IPPool CRD status is updated with allocated IP/state/resource/container.
 // AllocateIP returns subnet details for the requested IP, as defined in IP pool spec.
 func (a *IPPoolAllocator) AllocateNext(state v1alpha2.IPAddressPhase, owner v1alpha2.IPAddressOwner) (net.IP, *v1alpha2.SubnetInfo, error) {
-    var subnetSpec *v1alpha2.SubnetInfo
-    var ip net.IP
-    // Same resource can not ask for allocation twice without release
-    // This needs to be verified even at the expense of another API call
-    exists, err := a.HasContainer(owner.Pod.ContainerID, owner.Pod.IFName)
+    podOwner := owner.Pod
+    // Same resource can not ask for allocation twice without release.
+    // This needs to be verified even at the expense of another API call.
+    ip, subnetSpec, err := a.getExistingAllocation(podOwner)
     if err != nil {
         return nil, nil, err
     }
-
-    if exists {
-        return nil, nil, fmt.Errorf("container %s interface %s was already allocated an address from IP Pool %s",
-            owner.Pod.ContainerID, owner.Pod.IFName, a.ipPoolName)
+    if ip != nil {
+        // This can happen when the container requests IPs from multiple pools, and after an
+        // allocation failure, not all allocated IPs were successfully released, and then
+        // CNI ADD is retried.
+        klog.InfoS("Container already has an IP allocated", "container", podOwner.ContainerID, "interface", podOwner.IFName, "IPPool", a.ipPoolName)
+        return ip, subnetSpec, err
     }
 
     // Retry on CRD update conflict which is caused by multiple agents updating a pool at same time.
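
The "Retry on CRD update conflict" comments refer to optimistic concurrency on
the IPPool CR: when two agents update the same pool's status, one write fails
with a 409 Conflict and the allocation must be recomputed against a fresh copy
of the object. A sketch of that idiom with client-go's retry helper (that the
allocator uses retry.RetryOnConflict specifically is an assumption; the
conflict below is simulated):

  package main

  import (
      "fmt"

      apierrors "k8s.io/apimachinery/pkg/api/errors"
      "k8s.io/apimachinery/pkg/runtime/schema"
      "k8s.io/client-go/util/retry"
  )

  func main() {
      attempts := 0
      err := retry.RetryOnConflict(retry.DefaultRetry, func() error {
          attempts++
          // A real caller would re-read the IPPool, recompute the allocation,
          // and attempt the status update here. Simulate a concurrent writer
          // winning the first two attempts.
          if attempts < 3 {
              return apierrors.NewConflict(
                  schema.GroupResource{Group: "crd.antrea.io", Resource: "ippools"},
                  "ippool-v4", fmt.Errorf("the object has been modified"))
          }
          return nil // update accepted
      })
      fmt.Printf("succeeded after %d attempts, err=%v\n", attempts, err)
  }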
@@ -350,26 +378,25 @@ func (a *IPPoolAllocator) AllocateNext(state v1alpha2.IPAddressPhase, owner v1al
 // success, IP pool status is updated with allocated IP/state/resource/container.
 // AllocateReservedOrNext returns subnet details for the requested IP, as defined in IP pool spec.
 func (a *IPPoolAllocator) AllocateReservedOrNext(state v1alpha2.IPAddressPhase, owner v1alpha2.IPAddressOwner) (net.IP, *v1alpha2.SubnetInfo, error) {
-    var subnetSpec *v1alpha2.SubnetInfo
-    var ip net.IP
-
     ip, err := a.getReservedIP(owner)
     if err != nil {
         return nil, nil, err
     }
     if ip == nil {
-        // ip is not reserved, allocate next available ip
+        // IP is not reserved, allocate next available IP.
         return a.AllocateNext(state, owner)
     }
 
-    // Same resource can not ask for allocation twice without release
-    // This needs to be verified even at the expense of another API call
-    exists, err := a.HasContainer(owner.Pod.ContainerID, owner.Pod.IFName)
+    var prevIP net.IP
+    var subnetSpec *v1alpha2.SubnetInfo
+    podOwner := owner.Pod
+    prevIP, subnetSpec, err = a.getExistingAllocation(podOwner)
     if err != nil {
         return nil, nil, err
     }
-    if exists {
-        return nil, nil, fmt.Errorf("container %s was already allocated an address from IP Pool %s", owner.Pod.ContainerID, a.ipPoolName)
+    if prevIP != nil {
+        klog.InfoS("Container already has an IP allocated", "container", podOwner.ContainerID, "interface", podOwner.IFName, "IPPool", a.ipPoolName)
+        return prevIP, subnetSpec, err
     }
 
     // Retry on CRD update conflict which is caused by multiple agents updating a pool at same time.
@@ -389,15 +416,15 @@ func (a *IPPoolAllocator) AllocateReservedOrNext(state v1alpha2.IPAddressPhase,
 
         if index == -1 {
             // Failed to find matching range
-            return fmt.Errorf("IP %v does not belong to IP pool %s", ip, a.ipPoolName)
+            return fmt.Errorf("IP %v does not belong to IPPool %s", ip, a.ipPoolName)
         }
 
         subnetSpec = &ipPool.Spec.IPRanges[index].SubnetInfo
         return a.updateIPAddressState(ipPool, ip, state, owner)
     })
 
     if err != nil {
-        klog.ErrorS(err, "Failed to allocate IP address", "ip", ip, "ipPool", a.ipPoolName)
+        klog.ErrorS(err, "Failed to allocate IP address", "ip", ip, "IPPool", a.ipPoolName)
     }
     return ip, subnetSpec, err
 }
@@ -520,7 +547,7 @@ func (a *IPPoolAllocator) ReleaseContainer(containerID, ifName string) error {
         return err
     }
 
-    // Mark allocated IPs from pool status as unavailable
+    // Mark the released IPs as available in the IPPool status.
     for _, ip := range ipPool.Status.IPAddresses {
         savedOwner := ip.Owner.Pod
         if savedOwner != nil && savedOwner.ContainerID == containerID && savedOwner.IFName == ifName {
@@ -540,11 +567,10 @@ func (a *IPPoolAllocator) ReleaseContainer(containerID, ifName string) error {
     return err
 }
 
-// HasResource checks whether an IP was associated with specified pod. It returns error if the resource is crd fails to be retrieved.
-func (a *IPPoolAllocator) HasPod(namespace, podName string) (bool, error) {
-
+// hasPod checks whether an IP was associated with specified pod. It returns the error if fails to
+// retrieve the IPPool CR.
+func (a *IPPoolAllocator) hasPod(namespace, podName string) (bool, error) {
     ipPool, err := a.getPool()
-
     if err != nil {
         return false, err
     }
@@ -557,21 +583,19 @@ func (a *IPPoolAllocator) HasPod(namespace, podName string) (bool, error) {
     return false, nil
 }
 
-// HasContainer checks whether an IP was associated with specified container. It returns error if the resource crd fails to be retrieved.
-func (a *IPPoolAllocator) HasContainer(containerID, ifName string) (bool, error) {
-
+// GetContainerIP returns the IP allocated for the container interface if found.
+func (a *IPPoolAllocator) GetContainerIP(containerID, ifName string) (net.IP, error) {
     ipPool, err := a.getPool()
-
     if err != nil {
-        return false, err
+        return nil, err
     }
 
     for _, ip := range ipPool.Status.IPAddresses {
         if ip.Owner.Pod != nil && ip.Owner.Pod.ContainerID == containerID && ip.Owner.Pod.IFName == ifName {
-            return true, nil
+            return net.ParseIP(ip.IPAddress), nil
         }
     }
-    return false, nil
+    return nil, nil
 }
 
 // getReservedIP checks whether an IP was reserved with specified owner. It returns error if the resource crd fails to be retrieved.
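
One subtlety of the new GetContainerIP contract above: a (nil, nil) return
means "no allocation found", but net.ParseIP also returns nil for a malformed
address, so a corrupt IPAddress value in the pool status would look the same
as a missing allocation to the caller. For illustration:

  package main

  import (
      "fmt"
      "net"
  )

  func main() {
      fmt.Println(net.ParseIP("10.2.2.100")) // 10.2.2.100
      fmt.Println(net.ParseIP("not-an-ip"))  // <nil>, same as "not found"
  }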
pkg/ipam/poolallocator/allocator_test.go (9 additions, 8 deletions)
@@ -373,23 +373,24 @@ func TestHas(t *testing.T) {
     _, _, err := allocator.AllocateNext(crdv1a2.IPAddressPhaseAllocated, owner)
     require.NoError(t, err)
     err = wait.PollImmediate(100*time.Millisecond, 1*time.Second, func() (bool, error) {
-        has, _ := allocator.HasPod(testNamespace, "fakePod")
+        has, _ := allocator.hasPod(testNamespace, "fakePod")
         return has, nil
     })
     require.NoError(t, err)
 
-    has, err := allocator.HasPod(testNamespace, "realPod")
+    has, err := allocator.hasPod(testNamespace, "realPod")
     require.NoError(t, err)
     assert.False(t, has)
-    has, err = allocator.HasContainer("fakeContainer", "eth1")
+    var ip net.IP
+    ip, err = allocator.GetContainerIP("fakeContainer", "eth1")
     require.NoError(t, err)
-    assert.True(t, has)
-    has, err = allocator.HasContainer("fakeContainer", "")
+    assert.NotNil(t, ip)
+    ip, err = allocator.GetContainerIP("fakeContainer", "")
     require.NoError(t, err)
-    assert.False(t, has)
-    has, err = allocator.HasContainer("realContainer", "eth1")
+    assert.Nil(t, ip)
+    ip, err = allocator.GetContainerIP("realContainer", "eth1")
     require.NoError(t, err)
-    assert.False(t, has)
+    assert.Nil(t, ip)
 }
 
 func TestAllocateReleaseStatefulSet(t *testing.T) {