Skip to content

Commit

Permalink
Antrea IPAM for secondary networks managed by Multus
Browse files Browse the repository at this point in the history
Extend Antrea IPAM and Antrea CNI plugin to support IPAM for Pod
secondary networks. A CNI call that specifies a non-Antrea CNI type and
Antrea IPAM type is identified as an IPAM request for a secondary
network. The IPAM configuration of the CNI call should specify the
Antrea IPPool(s) to allocate IPs for the secondary network.
Additionally, Routes and DNS parameters are supported in the IPAM
configuration.

This implementation is for secondary network managed by Multus, not
Antrea native secondary network support. Only a single IPv4 IPPool is
supported as of now.

An example Multus NetworkAttachmentDefinition with Antrea IPAM:

kind: NetworkAttachmentDefinition
metadata:
  name: macvlan-network1
spec:
  config: '{
      "cniVersion": "0.3.0",
      "type": "macvlan",
      "master": "enp0s9",
      "mode": "bridge",
      "ipam": {
        "type": "antrea-ipam",
        "ippool": "macvlan-subnet1"
        "routes": [
          { "dst": "0.0.0.0/0" },
          { "dst": "192.168.0.0/16", "gw": "10.10.5.1" },
          { "dst": "3ffe:ffff:0:01ff::1/64" }
        ],
        "dns": {
          "nameservers" : ["8.8.8.8"],
          "domain": "example.com",
          "search": [ "example.com" ]
        }
      }
    }'

Signed-off-by: Jianjun Shen <shenj@vmware.com>
  • Loading branch information
jianjuns committed Apr 7, 2022
1 parent 03b3f2b commit 9ba0b1f
Show file tree
Hide file tree
Showing 32 changed files with 1,045 additions and 398 deletions.
2 changes: 2 additions & 0 deletions build/yamls/antrea-aks.yml
Original file line number Diff line number Diff line change
Expand Up @@ -1278,6 +1278,8 @@ spec:
properties:
containerID:
type: string
ifName:
type: string
name:
type: string
namespace:
Expand Down
2 changes: 2 additions & 0 deletions build/yamls/antrea-eks.yml
Original file line number Diff line number Diff line change
Expand Up @@ -1278,6 +1278,8 @@ spec:
properties:
containerID:
type: string
ifName:
type: string
name:
type: string
namespace:
Expand Down
2 changes: 2 additions & 0 deletions build/yamls/antrea-gke.yml
Original file line number Diff line number Diff line change
Expand Up @@ -1278,6 +1278,8 @@ spec:
properties:
containerID:
type: string
ifName:
type: string
name:
type: string
namespace:
Expand Down
2 changes: 2 additions & 0 deletions build/yamls/antrea-ipsec.yml
Original file line number Diff line number Diff line change
Expand Up @@ -1278,6 +1278,8 @@ spec:
properties:
containerID:
type: string
ifName:
type: string
name:
type: string
namespace:
Expand Down
2 changes: 2 additions & 0 deletions build/yamls/antrea.yml
Original file line number Diff line number Diff line change
Expand Up @@ -1278,6 +1278,8 @@ spec:
properties:
containerID:
type: string
ifName:
type: string
name:
type: string
namespace:
Expand Down
2 changes: 2 additions & 0 deletions build/yamls/base/crds.yml
Original file line number Diff line number Diff line change
Expand Up @@ -292,6 +292,8 @@ spec:
type: string
containerID:
type: string
ifName:
type: string
type: object
statefulSet:
properties:
Expand Down
20 changes: 9 additions & 11 deletions cmd/antrea-agent/agent.go
Original file line number Diff line number Diff line change
Expand Up @@ -116,7 +116,8 @@ func run(o *Options) error {
defer ovsdbConnection.Close()

egressEnabled := features.DefaultFeatureGate.Enabled(features.Egress)
enableBridgingMode := features.DefaultFeatureGate.Enabled(features.AntreaIPAM) && o.config.EnableBridgingMode
enableAntreaIPAM := features.DefaultFeatureGate.Enabled(features.AntreaIPAM)
enableBridgingMode := enableAntreaIPAM && o.config.EnableBridgingMode
// Bridging mode will connect the uplink interface to the OVS bridge.
connectUplinkToBridge := enableBridgingMode

Expand Down Expand Up @@ -362,9 +363,10 @@ func run(o *Options) error {
o.config.HostProcPathPrefix,
nodeConfig,
k8sClient,
isChaining,
enableBridgingMode, // activate AntreaIPAM in CNIServer when bridging mode is enabled
routeClient,
isChaining,
enableBridgingMode,
enableAntreaIPAM,
networkReadyCh)

var cniPodInfoStore cnipodcache.CNIPodInfoStore
Expand Down Expand Up @@ -493,15 +495,11 @@ func run(o *Options) error {
go nplController.Run(stopCh)
}

// Now Antrea IPAM is used only by bridging mode, so we initialize AntreaIPAMController only
// when the bridging mode is enabled.
if enableBridgingMode {
// Antrea IPAM is needed by bridging mode and secondary network IPAM.
if enableAntreaIPAM {
ipamController, err := ipam.InitializeAntreaIPAMController(
k8sClient,
crdClient,
informerFactory,
localPodInformer,
crdInformerFactory)
crdClient, informerFactory, crdInformerFactory,
localPodInformer, enableBridgingMode)
if err != nil {
return fmt.Errorf("failed to start Antrea IPAM agent: %v", err)
}
Expand Down
167 changes: 128 additions & 39 deletions pkg/agent/cniserver/ipam/antrea_ipam.go
Original file line number Diff line number Diff line change
Expand Up @@ -23,16 +23,17 @@ import (
"github.com/containernetworking/cni/pkg/invoke"
cnitypes "github.com/containernetworking/cni/pkg/types"
"github.com/containernetworking/cni/pkg/types/current"
"k8s.io/apimachinery/pkg/api/errors"
"k8s.io/apimachinery/pkg/util/wait"
"k8s.io/klog/v2"

argtypes "antrea.io/antrea/pkg/agent/cniserver/types"
"antrea.io/antrea/pkg/agent/cniserver/types"
crdv1a2 "antrea.io/antrea/pkg/apis/crd/v1alpha2"
"antrea.io/antrea/pkg/ipam/poolallocator"
)

const (
AntreaIPAMType = "antrea-ipam"
AntreaIPAMType = "antrea"
)

// Antrea IPAM driver would allocate IP addresses according to object IPAM annotation,
Expand All @@ -59,12 +60,17 @@ const (
// Resource needs to be unique since it is used as identifier in Del.
// Therefore Container ID is used, while Pod/Namespace are shown for visibility.
// TODO: Consider multi-interface case
func getAllocationOwner(args *invoke.Args, k8sArgs *argtypes.K8sArgs, reservedOwner *crdv1a2.IPAddressOwner) crdv1a2.IPAddressOwner {
func getAllocationOwner(args *invoke.Args, k8sArgs *types.K8sArgs, reservedOwner *crdv1a2.IPAddressOwner, secondary bool) crdv1a2.IPAddressOwner {
podOwner := &crdv1a2.PodOwner{
Name: string(k8sArgs.K8S_POD_NAME),
Namespace: string(k8sArgs.K8S_POD_NAMESPACE),
ContainerID: args.ContainerID,
}
if secondary {
// Add interface name for secondary network to uniquely identify
// the secondary network interface.
podOwner.IFName = args.IfName
}
if reservedOwner != nil {
owner := *reservedOwner
owner.Pod = podOwner
Expand Down Expand Up @@ -115,7 +121,7 @@ func (d *AntreaIPAM) setController(controller *AntreaIPAMController) {

// Add allocates next available IP address from associated IP Pool
// Allocated IP and associated resource are stored in IP Pool status
func (d *AntreaIPAM) Add(args *invoke.Args, k8sArgs *argtypes.K8sArgs, networkConfig []byte) (bool, *IPAMResult, error) {
func (d *AntreaIPAM) Add(args *invoke.Args, k8sArgs *types.K8sArgs, networkConfig []byte) (bool, *IPAMResult, error) {
mine, allocator, ips, reservedOwner, err := d.owns(k8sArgs)
if err != nil {
return true, nil, err
Expand All @@ -125,9 +131,9 @@ func (d *AntreaIPAM) Add(args *invoke.Args, k8sArgs *argtypes.K8sArgs, networkCo
return false, nil, nil
}

owner := getAllocationOwner(args, k8sArgs, reservedOwner)
owner := getAllocationOwner(args, k8sArgs, reservedOwner, false)
var ip net.IP
var subnetInfo crdv1a2.SubnetInfo
var subnetInfo *crdv1a2.SubnetInfo
if reservedOwner != nil {
ip, subnetInfo, err = allocator.AllocateReservedOrNext(crdv1a2.IPAddressPhaseAllocated, owner)
} else if len(ips) == 0 {
Expand All @@ -142,7 +148,7 @@ func (d *AntreaIPAM) Add(args *invoke.Args, k8sArgs *argtypes.K8sArgs, networkCo

klog.V(4).InfoS("IP allocation successful", "IP", ip.String(), "Pod", string(k8sArgs.K8S_POD_NAME))

result := IPAMResult{Result: current.Result{CNIVersion: current.ImplementedSpecVersion}, VLANID: subnetInfo.VLAN & 0xfff}
result := IPAMResult{Result: current.Result{CNIVersion: current.ImplementedSpecVersion}, VLANID: subnetInfo.VLAN}
gwIP := net.ParseIP(subnetInfo.Gateway)

ipConfig, defaultRoute := generateIPConfig(ip, int(subnetInfo.PrefixLength), gwIP)
Expand All @@ -153,23 +159,20 @@ func (d *AntreaIPAM) Add(args *invoke.Args, k8sArgs *argtypes.K8sArgs, networkCo
}

// Del deletes IP associated with resource from IP Pool status
func (d *AntreaIPAM) Del(args *invoke.Args, k8sArgs *argtypes.K8sArgs, networkConfig []byte) (bool, error) {
mine, allocator, _, _, err := d.owns(k8sArgs)
if mine == mineFalse || mine == mineUnknown {
// pass this request to next driver
return false, nil
}
func (d *AntreaIPAM) Del(args *invoke.Args, k8sArgs *types.K8sArgs, networkConfig []byte) (bool, error) {
owner := getAllocationOwner(args, k8sArgs, nil, false)
foundAllocation, err := d.del(owner.Pod)
if err != nil {
// Let the invoker retry at error.
return true, err
}

owner := getAllocationOwner(args, k8sArgs, nil)
err = allocator.ReleaseContainerIfPresent(owner.Pod.ContainerID)
return true, err
// If no allocation found, pass CNI DEL to the next driver.
return foundAllocation, nil
}

// Check verifues IP associated with resource is tracked in IP Pool status
func (d *AntreaIPAM) Check(args *invoke.Args, k8sArgs *argtypes.K8sArgs, networkConfig []byte) (bool, error) {
func (d *AntreaIPAM) Check(args *invoke.Args, k8sArgs *types.K8sArgs, networkConfig []byte) (bool, error) {
mine, allocator, _, _, err := d.owns(k8sArgs)
if err != nil {
return true, err
Expand All @@ -179,8 +182,7 @@ func (d *AntreaIPAM) Check(args *invoke.Args, k8sArgs *argtypes.K8sArgs, network
return false, nil
}

owner := getAllocationOwner(args, k8sArgs, nil)
found, err := allocator.HasContainer(owner.Pod.ContainerID)
found, err := allocator.HasContainer(args.ContainerID, "")
if err != nil {
return true, err
}
Expand All @@ -192,6 +194,89 @@ func (d *AntreaIPAM) Check(args *invoke.Args, k8sArgs *argtypes.K8sArgs, network
return true, nil
}

func (d *AntreaIPAM) secondaryNetworkAdd(args *invoke.Args, k8sArgs *types.K8sArgs, networkConfig *types.NetworkConfig) (*current.Result, error) {
ipamConf := networkConfig.IPAM
if len(ipamConf.IPPools) == 0 {
return nil, fmt.Errorf("Antrea IPPool must be specified")
}

if err := d.waitForControllerReady(); err != nil {
// Return error to let the invoker retry.
return nil, err
}

var err error
var allocator *poolallocator.IPPoolAllocator
for _, p := range ipamConf.IPPools {
allocator, err = d.controller.getPoolAllocatorByName(p)
if err == nil {
// Use the first IPPool that exists.
break
}
if !errors.IsNotFound(err) {
return nil, fmt.Errorf("failed to get IPPool %s: %v", p, err)
}
klog.InfoS("IPPool not found", "pool", p)

}
if allocator == nil {
return nil, fmt.Errorf("no valid IPPool found")
}

owner := getAllocationOwner(args, k8sArgs, nil, true)
var ip net.IP
var subnetInfo *crdv1a2.SubnetInfo
ip, subnetInfo, err = allocator.AllocateNext(crdv1a2.IPAddressPhaseAllocated, owner)
if err != nil {
return nil, err
}

// Copy routes and DNS from the input IPAM configuration.
result := &current.Result{Routes: ipamConf.Routes, DNS: ipamConf.DNS}
gwIP := net.ParseIP(subnetInfo.Gateway)
ipConfig, _ := generateIPConfig(ip, int(subnetInfo.PrefixLength), gwIP)
result.IPs = append(result.IPs, ipConfig)
return result, nil
}

func (d *AntreaIPAM) secondaryNetworkDel(args *invoke.Args, k8sArgs *types.K8sArgs, networkConfig *types.NetworkConfig) error {
owner := getAllocationOwner(args, k8sArgs, nil, true)
_, err := d.del(owner.Pod)
return err
}

func (d *AntreaIPAM) secondaryNetworkCheck(args *invoke.Args, k8sArgs *types.K8sArgs, networkConfig *types.NetworkConfig) error {
return fmt.Errorf("CNI CHECK is not implemented for secondary network")
}

func (d *AntreaIPAM) del(podOwner *crdv1a2.PodOwner) (foundAllocation bool, err error) {
if err := d.waitForControllerReady(); err != nil {
// Return error to let the invoker retry.
return false, err
}
// The Pod resource might have been removed; and for a secondary we
// would rely on the passed IPPool for CNI DEL. So, search IPPools with
// the matched PodOwner.
allocators, err := d.controller.getPoolAllocatorsByOwner(podOwner)
if err != nil {
return false, err
}

if len(allocators) == 0 {
return false, nil
}
// Multiple allocators can be returned if the network interface has IPs
// allocated from more than one IPPools.
for _, a := range allocators {
err = a.ReleaseContainer(podOwner.ContainerID, podOwner.IFName)
if err != nil {
klog.Errorf("xxx err: %v", err)
return true, err
}
}
return true, nil
}

// owns checks whether this driver owns coming IPAM request. This decision is based on
// Antrea IPAM annotation for the resource (only Namespace annotation is supported as
// of today). If annotation is not present, or annotated IP Pool not found, the driver
Expand All @@ -203,18 +288,10 @@ func (d *AntreaIPAM) Check(args *invoke.Args, k8sArgs *argtypes.K8sArgs, network
// mineTrue + timeout error
// mineTrue + IPPoolNotFound error
// mineTrue + nil error
func (d *AntreaIPAM) owns(k8sArgs *argtypes.K8sArgs) (mineType, *poolallocator.IPPoolAllocator, []net.IP, *crdv1a2.IPAddressOwner, error) {
func (d *AntreaIPAM) owns(k8sArgs *types.K8sArgs) (mineType, *poolallocator.IPPoolAllocator, []net.IP, *crdv1a2.IPAddressOwner, error) {
// Wait controller ready to avoid inappropriate behavior on CNI request
if err := wait.PollImmediate(500*time.Millisecond, 5*time.Second, func() (bool, error) {
d.controllerMutex.RLock()
defer d.controllerMutex.RUnlock()
if d.controller == nil {
klog.Warningf("Antrea IPAM driver is not ready.")
return false, nil
}
return true, nil
}); err != nil {
// return mineTrue to make this request failed and kubelet will retry
if err := d.waitForControllerReady(); err != nil {
// Return mineTrue to make this request failed and kubelet will retry.
return mineTrue, nil, nil, nil, err
}

Expand All @@ -227,17 +304,29 @@ func (d *AntreaIPAM) owns(k8sArgs *argtypes.K8sArgs) (mineType, *poolallocator.I
return d.controller.getPoolAllocatorByPod(namespace, podName)
}

func init() {
// Antrea driver must come first
// NOTE: this is global variable that requires follow-up setup post agent Init
antreaIPAMDriver = &AntreaIPAM{}
func (d *AntreaIPAM) waitForControllerReady() error {
err := wait.PollImmediate(500*time.Millisecond, 5*time.Second, func() (bool, error) {
d.controllerMutex.RLock()
defer d.controllerMutex.RUnlock()
if d.controller == nil {
klog.Warningf("Antrea IPAM driver is not ready.")
return false, nil
}
return true, nil
})

if err := RegisterIPAMDriver(AntreaIPAMType, antreaIPAMDriver); err != nil {
klog.Errorf("Failed to register IPAM plugin on type %s", AntreaIPAMType)
if err != nil {
return fmt.Errorf("Antrea IPAM driver not ready: %v", err)
}
return nil
}

func init() {
// Antrea driver must come first.
// NOTE: this is global variable that requires follow-up setup post agent initialization.
antreaIPAMDriver = &AntreaIPAM{}
RegisterIPAMDriver(AntreaIPAMType, antreaIPAMDriver)

// Host local plugin is fallback driver
if err := RegisterIPAMDriver(AntreaIPAMType, &IPAMDelegator{pluginType: ipamHostLocal}); err != nil {
klog.Errorf("Failed to register IPAM plugin on type %s", ipamHostLocal)
}
RegisterIPAMDriver(AntreaIPAMType, &IPAMDelegator{pluginType: ipamHostLocal})
}
Loading

0 comments on commit 9ba0b1f

Please sign in to comment.