Skip to content

Commit

Permalink
Delete secondaryNetwork OVS ports correctly after an Agent restart
Browse files Browse the repository at this point in the history
Signed-off-by: KMAnju-2021 <km074btcse18@igdtuw.ac.in>
  • Loading branch information
KMAnju-2021 committed Feb 10, 2025
1 parent 5ee28ec commit d49d06a
Show file tree
Hide file tree
Showing 5 changed files with 404 additions and 14 deletions.
25 changes: 13 additions & 12 deletions cmd/antrea-agent/agent.go
Original file line number Diff line number Diff line change
Expand Up @@ -646,6 +646,19 @@ func run(o *Options) error {
}
}

// Secondary network controller should be created before CNIServer.Run() to make sure no Pod CNI updates will be missed.
var secondaryNetworkController *secondarynetwork.Controller
if features.DefaultFeatureGate.Enabled(features.SecondaryNetwork) {
secondaryNetworkController, err = secondarynetwork.NewController(
o.config.ClientConnection, o.config.KubeAPIServerOverride,
k8sClient, localPodInformer.Get(),
podUpdateChannel, ifaceStore,
&o.config.SecondaryNetwork, ovsdbConnection)
if err != nil {
return fmt.Errorf("failed to create secondary network controller: %w", err)
}
}

var traceflowController *traceflow.Controller
if features.DefaultFeatureGate.Enabled(features.Traceflow) {
traceflowController = traceflow.NewTraceflowController(
Expand Down Expand Up @@ -760,18 +773,6 @@ func run(o *Options) error {
go ipamController.Run(stopCh)
}

var secondaryNetworkController *secondarynetwork.Controller
if features.DefaultFeatureGate.Enabled(features.SecondaryNetwork) {
secondaryNetworkController, err = secondarynetwork.NewController(
o.config.ClientConnection, o.config.KubeAPIServerOverride,
k8sClient, localPodInformer.Get(),
podUpdateChannel,
&o.config.SecondaryNetwork, ovsdbConnection)
if err != nil {
return fmt.Errorf("failed to create secondary network controller: %w", err)
}
}

var bgpController *bgp.Controller
if features.DefaultFeatureGate.Enabled(features.BGPPolicy) {
bgpPolicyInformer := crdInformerFactory.Crd().V1alpha1().BGPPolicies()
Expand Down
4 changes: 3 additions & 1 deletion pkg/agent/secondarynetwork/init.go
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@ import (
componentbaseconfig "k8s.io/component-base/config"
"k8s.io/klog/v2"

"antrea.io/antrea/pkg/agent/interfacestore"
"antrea.io/antrea/pkg/agent/secondarynetwork/podwatch"
agentconfig "antrea.io/antrea/pkg/config/agent"
"antrea.io/antrea/pkg/ovs/ovsconfig"
Expand All @@ -47,6 +48,7 @@ func NewController(
k8sClient clientset.Interface,
podInformer cache.SharedIndexInformer,
podUpdateSubscriber channel.Subscriber,
pIfaceStore interfacestore.InterfaceStore,
secNetConfig *agentconfig.SecondaryNetworkConfig, ovsdb *ovsdb.OVSDB,
) (*Controller, error) {
ovsBridgeClient, err := createOVSBridge(secNetConfig.OVSBridges, ovsdb)
Expand All @@ -65,7 +67,7 @@ func NewController(
// k8s.v1.cni.cncf.io/networks Annotation defined.
podWatchController, err := podwatch.NewPodController(
k8sClient, netAttachDefClient, podInformer,
podUpdateSubscriber, ovsBridgeClient)
podUpdateSubscriber, pIfaceStore, ovsBridgeClient)
if err != nil {
return nil, err
}
Expand Down
92 changes: 92 additions & 0 deletions pkg/agent/secondarynetwork/podwatch/controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -102,6 +102,7 @@ func NewPodController(
netAttachDefClient netdefclient.K8sCniCncfIoV1Interface,
podInformer cache.SharedIndexInformer,
podUpdateSubscriber channel.Subscriber,
pIfaceStore interfacestore.InterfaceStore,
ovsBridgeClient ovsconfig.OVSBridgeClient,
) (*PodController, error) {
ifaceStore := interfacestore.NewInterfaceStore()
Expand Down Expand Up @@ -133,6 +134,15 @@ func NewPodController(
},
resyncPeriod,
)

if err := pc.initializeSecondaryInterfaceStore(); err != nil {
return nil, fmt.Errorf("failed to initialize secondary interface store: %v", err)
}

if err := pc.reconcileSecondaryInterfaces(pIfaceStore); err != nil {
return nil, fmt.Errorf("failed to restore CNI cache and reconcile secondary interfaces: %v", err)
}

// podUpdateSubscriber can be nil with test code.
if podUpdateSubscriber != nil {
// Subscribe Pod CNI add/del events.
Expand Down Expand Up @@ -502,3 +512,85 @@ func checkForPodSecondaryNetworkAttachement(pod *corev1.Pod) (string, bool) {
return netObj, false
}
}

// initializeSecondaryInterfaceStore rebuilds the secondary interface store from the
// ports that currently exist on the secondary OVS bridge, so that interfaces
// configured before an agent restart are known to the controller again.
//
// Only ports whose external IDs mark them as Antrea container interfaces are
// restored. Ports without an interface type, or with any other type, are logged
// and skipped. When no OVS bridge client is configured the function is a no-op.
//
// It returns an error only when listing the bridge ports fails.
func (pc *PodController) initializeSecondaryInterfaceStore() error {
	if pc.ovsBridgeClient == nil {
		return nil
	}

	ovsPorts, err := pc.ovsBridgeClient.GetPortList()
	if err != nil {
		// Wrap with %w so callers can still inspect the underlying error.
		return fmt.Errorf("failed to list OVS ports for the secondary bridge: %w", err)
	}

	ifaceList := make([]*interfacestore.InterfaceConfig, 0, len(ovsPorts))
	for index := range ovsPorts {
		// Take the address of the slice element instead of copying the port data.
		port := &ovsPorts[index]

		interfaceType, ok := port.ExternalIDs[interfacestore.AntreaInterfaceTypeKey]
		if !ok {
			klog.InfoS("Interface type is not set for the secondary bridge", "interfaceName", port.Name)
			continue
		}
		if interfaceType != interfacestore.AntreaContainer {
			klog.InfoS("Unknown Antrea interface type for the secondary bridge", "type", interfaceType)
			continue
		}

		// Build the OVS port config only for ports that are actually restored.
		ovsPort := &interfacestore.OVSPortConfig{
			PortUUID: port.UUID,
			OFPort:   port.OFPort,
		}
		if intf := cniserver.ParseOVSPortInterfaceConfig(port, ovsPort); intf != nil {
			ifaceList = append(ifaceList, intf)
		}
	}

	pc.interfaceStore.Initialize(ifaceList)
	klog.InfoS("Successfully initialized the secondary bridge interface store")

	return nil
}

// reconcileSecondaryInterfaces re-synchronizes secondary network state with the
// primary interface store after an agent restart. It does two things:
//
//  1. It stores a cniCache entry (Pod key -> container ID) for every container
//     interface found in the primary store.
//  2. It removes every container interface in the secondary store whose
//     container ID is unknown to the primary store, or whose OFPort is -1.
//
// A nil pIfaceStore skips reconciliation entirely. A failure while removing
// stale interfaces is only logged; the function currently always returns nil.
func (pc *PodController) reconcileSecondaryInterfaces(pIfaceStore interfacestore.InterfaceStore) error {
	if pIfaceStore == nil {
		klog.InfoS("Primary interfaceStore is nil, skipping reconciliation for Secondary Network")
		return nil
	}

	// Step 1: restore the CNI cache from the Pods known to the primary store.
	for _, primaryIface := range pIfaceStore.GetInterfacesByType(interfacestore.ContainerInterface) {
		cfg := primaryIface.ContainerInterfaceConfig
		info := &podCNIInfo{containerID: cfg.ContainerID}
		pc.cniCache.Store(podKeyGet(cfg.PodName, cfg.PodNamespace), info)
	}

	// Step 2: collect the secondary interfaces that no longer have a matching
	// container in the primary store, or that carry an OFPort of -1.
	var staleInterfaces []*interfacestore.InterfaceConfig
	for _, secondaryIface := range pc.interfaceStore.GetInterfacesByType(interfacestore.ContainerInterface) {
		if _, known := pIfaceStore.GetContainerInterface(secondaryIface.ContainerID); known && secondaryIface.OFPort != -1 {
			continue
		}
		staleInterfaces = append(staleInterfaces, secondaryIface)
	}
	if len(staleInterfaces) == 0 {
		return nil
	}

	// Removal is best-effort: an error is reported in the log but not returned.
	if err := pc.removeInterfaces(staleInterfaces); err != nil {
		klog.ErrorS(err, "Failed to remove stale secondary interfaces", "staleInterfaces", staleInterfaces)
	}
	return nil
}
146 changes: 145 additions & 1 deletion pkg/agent/secondarynetwork/podwatch/controller_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,7 @@ import (
"time"

current "github.com/containernetworking/cni/pkg/types/100"
"github.com/google/uuid"
netdefv1 "github.com/k8snetworkplumbingwg/network-attachment-definition-client/pkg/apis/k8s.cni.cncf.io/v1"
netdefclientfake "github.com/k8snetworkplumbingwg/network-attachment-definition-client/pkg/client/clientset/versioned/fake"
"github.com/stretchr/testify/assert"
Expand All @@ -43,12 +44,15 @@ import (
"k8s.io/client-go/kubernetes/fake"
"k8s.io/client-go/util/workqueue"

"antrea.io/antrea/pkg/agent/cniserver"
"antrea.io/antrea/pkg/agent/cniserver/ipam"
cnitypes "antrea.io/antrea/pkg/agent/cniserver/types"
"antrea.io/antrea/pkg/agent/interfacestore"
podwatchtesting "antrea.io/antrea/pkg/agent/secondarynetwork/podwatch/testing"
"antrea.io/antrea/pkg/agent/types"
crdv1beta1 "antrea.io/antrea/pkg/apis/crd/v1beta1"
"antrea.io/antrea/pkg/ovs/ovsconfig"
ovsconfigtest "antrea.io/antrea/pkg/ovs/ovsconfig/testing"
)

const (
Expand Down Expand Up @@ -205,7 +209,7 @@ func TestPodControllerRun(t *testing.T) {
client,
netdefclient,
informerFactory.Core().V1().Pods().Informer(),
nil, nil)
nil, nil, nil)
podController.interfaceConfigurator = interfaceConfigurator
podController.ipamAllocator = mockIPAM
cniCache := &podController.cniCache
Expand Down Expand Up @@ -936,3 +940,143 @@ func testPodControllerStart(ctrl *gomock.Controller) (
informerFactory.WaitForCacheSync(stopCh)
return podController, mockIPAM, interfaceConfigurator
}

// convertExternalIDMap returns a new map[string]string holding the entries of in.
// Every value must be a string: the type assertion is unchecked, so any other
// value type makes the function panic. A nil or empty input yields an empty,
// non-nil map.
func convertExternalIDMap(in map[string]interface{}) map[string]string {
	converted := make(map[string]string, len(in))
	for key := range in {
		converted[key] = in[key].(string)
	}
	return converted
}

// createTestInterfaces builds the OVS port and interface fixtures shared by the
// secondary interface store tests.
//
// It returns:
//   - a map from the names "uuid1".."uuid4" to the generated IDs. IDs 1-3 are
//     used both as the port UUID and as the container ID of ports p1-p3; "uuid4"
//     is the port UUID of the port named "unknownIface";
//   - four OVS ports: p1 (OFPort 11), p2 (OFPort 12), p3 (OFPort -1), and
//     "unknownIface", whose external IDs contain only an unrelated key;
//   - the interface configs parsed from the first three ports, in that order.
func createTestInterfaces() (map[string]string, []ovsconfig.OVSPortData, []*interfacestore.InterfaceConfig) {
	uuid1 := uuid.New().String()
	uuid2 := uuid.New().String()
	uuid3 := uuid.New().String()
	// The unknown port gets its own UUID so that no two ports share one.
	uuid4 := uuid.New().String()

	p1MAC, p1IP := "11:22:33:44:55:66", "192.168.1.10"
	p2MAC, p2IP := "11:22:33:44:55:77", "192.168.1.11"

	// The MAC literals above are well-formed, so the parse errors are ignored.
	p1NetMAC, _ := net.ParseMAC(p1MAC)
	p1NetIP := net.ParseIP(p1IP)
	p2NetMAC, _ := net.ParseMAC(p2MAC)
	p2NetIP := net.ParseIP(p2IP)

	ovsPort1 := ovsconfig.OVSPortData{
		UUID: uuid1, Name: "p1", OFPort: 11,
		ExternalIDs: convertExternalIDMap(cniserver.BuildOVSPortExternalIDs(
			interfacestore.NewContainerInterface("p1", uuid1, "pod1", "ns1", "eth0", p1NetMAC, []net.IP{p1NetIP}, 100)))}

	ovsPort2 := ovsconfig.OVSPortData{
		UUID: uuid2, Name: "p2", OFPort: 12,
		ExternalIDs: convertExternalIDMap(cniserver.BuildOVSPortExternalIDs(
			interfacestore.NewContainerInterface("p2", uuid2, "pod2", "ns2", "eth0", p2NetMAC, []net.IP{p2NetIP}, 100)))}

	ovsPort3 := ovsconfig.OVSPortData{
		UUID: uuid3, Name: "p3", OFPort: -1,
		ExternalIDs: convertExternalIDMap(cniserver.BuildOVSPortExternalIDs(
			interfacestore.NewContainerInterface("p3", uuid3, "pod3", "ns3", "eth0", p2NetMAC, []net.IP{p2NetIP}, 100)))}

	ovsPort4 := ovsconfig.OVSPortData{
		UUID:   uuid4,
		Name:   "unknownIface",
		OFPort: 20,
		ExternalIDs: map[string]string{
			"unknownKey": "unknownValue"}}

	// Interface configurations
	iface1 := cniserver.ParseOVSPortInterfaceConfig(&ovsPort1, &interfacestore.OVSPortConfig{PortUUID: ovsPort1.UUID, OFPort: ovsPort1.OFPort})
	iface2 := cniserver.ParseOVSPortInterfaceConfig(&ovsPort2, &interfacestore.OVSPortConfig{PortUUID: ovsPort2.UUID, OFPort: ovsPort2.OFPort})
	iface3 := cniserver.ParseOVSPortInterfaceConfig(&ovsPort3, &interfacestore.OVSPortConfig{PortUUID: ovsPort3.UUID, OFPort: ovsPort3.OFPort})

	ids := map[string]string{"uuid1": uuid1, "uuid2": uuid2, "uuid3": uuid3, "uuid4": uuid4}
	ports := []ovsconfig.OVSPortData{ovsPort1, ovsPort2, ovsPort3, ovsPort4}
	ifaces := []*interfacestore.InterfaceConfig{iface1, iface2, iface3}
	return ids, ports, ifaces
}

// setupMockController creates a gomock controller together with mock OVS bridge
// client, interface configurator and IPAM allocator, and a PodController wired to
// those mocks with an empty interface store and an empty CNI cache. The returned
// PodController is built directly as a struct literal rather than through
// NewPodController.
func setupMockController(t *testing.T) (*gomock.Controller, *ovsconfigtest.MockOVSBridgeClient, *podwatchtesting.MockInterfaceConfigurator, *podwatchtesting.MockIPAMAllocator, *PodController) {
	mockCtrl := gomock.NewController(t)

	bridgeClient := ovsconfigtest.NewMockOVSBridgeClient(mockCtrl)
	configurator := podwatchtesting.NewMockInterfaceConfigurator(mockCtrl)
	allocator := podwatchtesting.NewMockIPAMAllocator(mockCtrl)

	podController := &PodController{
		ovsBridgeClient:       bridgeClient,
		interfaceStore:        interfacestore.NewInterfaceStore(),
		cniCache:              sync.Map{},
		interfaceConfigurator: configurator,
		ipamAllocator:         allocator,
	}
	return mockCtrl, bridgeClient, configurator, allocator, podController
}

func TestInitializeSecondaryInterfaceStore(t *testing.T) {
controller := gomock.NewController(t)
defer controller.Finish()

// Test Case 1: OVSBridgeClient is nil
store := interfacestore.NewInterfaceStore()
pc := &PodController{
ovsBridgeClient: nil,
interfaceStore: store,
}
err := pc.initializeSecondaryInterfaceStore()
assert.NoError(t, err, "No error when OVSBridgeClient is nil")

// Test Case 2: OVSBridgeClient returns an error
ctrl, mockOVSBridgeClient, _, _, pc := setupMockController(t)
defer ctrl.Finish()

mockOVSBridgeClient.EXPECT().GetPortList().Return(nil, ovsconfig.NewTransactionError(fmt.Errorf("Failed to list OVS ports"), true))
err = pc.initializeSecondaryInterfaceStore()
assert.Error(t, err, "Failed to list OVS ports")

// Test Case 3: OVSBridgeClient returns valid ports
uuids, ovsPorts, _ := createTestInterfaces()
mockOVSBridgeClient.EXPECT().GetPortList().Return(ovsPorts, nil)

err = pc.initializeSecondaryInterfaceStore()
assert.NoError(t, err, "OVS ports list successfully")

// Validate stored interfaces
assert.Equal(t, 3, pc.interfaceStore.Len(), "Only valid interfaces should be stored")
_, found1 := pc.interfaceStore.GetContainerInterface(uuids["uuid1"])
assert.True(t, found1, "Interface 1 should be stored")
_, found2 := pc.interfaceStore.GetContainerInterface(uuids["uuid2"])
assert.True(t, found2, "Interface 2 should be stored")
_, found3 := pc.interfaceStore.GetContainerInterface(uuids["uuid4"])
assert.False(t, found3, "Unknown interface type should not be stored")
}

// TestReconcileSecondaryInterfaces checks that reconciliation restores the CNI
// cache from the primary interface store and removes exactly the secondary
// interface that has no counterpart in the primary store.
func TestReconcileSecondaryInterfaces(t *testing.T) {
	ctrl, _, interfaceConfigurator, mockIPAM, pc := setupMockController(t)
	defer ctrl.Finish()
	primaryStore := interfacestore.NewInterfaceStore()

	_, _, ifaces := createTestInterfaces()

	// Add interfaces to primary store
	primaryStore.AddInterface(ifaces[0])
	primaryStore.AddInterface(ifaces[1])

	// Add interfaces to controller secondaryInterfaceStore. ifaces[2] is absent
	// from the primary store, so it is the one expected to be treated as stale.
	pc.interfaceStore.AddInterface(ifaces[0])
	pc.interfaceStore.AddInterface(ifaces[1])
	pc.interfaceStore.AddInterface(ifaces[2])

	// Pin the removal to the stale interface so that deleting any other
	// interface fails the test.
	interfaceConfigurator.EXPECT().DeleteVLANSecondaryInterface(ifaces[2]).Return(nil).Times(1)
	mockIPAM.EXPECT().SecondaryNetworkRelease(gomock.Any()).Return(nil).Times(1)

	err := pc.reconcileSecondaryInterfaces(primaryStore)
	assert.NoError(t, err)

	// Check CNI Cache: only Pods known to the primary store are restored.
	_, foundPod1 := pc.cniCache.Load("ns1/pod1")
	_, foundPod2 := pc.cniCache.Load("ns2/pod2")
	assert.True(t, foundPod1, "CNI Cache should contain ns1/pod1")
	assert.True(t, foundPod2, "CNI Cache should contain ns2/pod2")

	_, foundPod3 := pc.cniCache.Load("ns3/pod3")
	assert.False(t, foundPod3, "CNI Cache should not contain ns3/pod3, which is absent from the primary store")
}
Loading

0 comments on commit d49d06a

Please sign in to comment.