diff --git a/controllers/core/node_controller.go b/controllers/core/node_controller.go index c9529d1e..8a1f8b0e 100644 --- a/controllers/core/node_controller.go +++ b/controllers/core/node_controller.go @@ -41,7 +41,9 @@ import ( // on cache only a single go routine should be sufficient. Using more than // one routines to help high rate churn and larger nodes groups restarting // when the controller has to be restarted for various reasons. -const MaxNodeConcurrentReconciles = 7 +const ( + MaxNodeConcurrentReconciles = 10 +) // NodeReconciler reconciles a Node object type NodeReconciler struct { @@ -96,6 +98,9 @@ func (r *NodeReconciler) Reconcile(ctx context.Context, req ctrl.Request) (ctrl. if found { logger.V(1).Info("updating node") err = r.Manager.UpdateNode(req.Name) + + // ReconcileNode actually run a branch ENI leaking check from an independent goroutine on added nodes. + r.Manager.CheckNodeForLeakedENIs(req.Name) } else { logger.Info("adding node") err = r.Manager.AddNode(req.Name) diff --git a/controllers/core/node_controller_test.go b/controllers/core/node_controller_test.go index 48e881bd..c592dccc 100644 --- a/controllers/core/node_controller_test.go +++ b/controllers/core/node_controller_test.go @@ -16,6 +16,7 @@ package controllers import ( "context" "testing" + "time" mock_condition "github.com/aws/amazon-vpc-resource-controller-k8s/mocks/amazon-vcp-resource-controller-k8s/pkg/condition" mock_node "github.com/aws/amazon-vpc-resource-controller-k8s/mocks/amazon-vcp-resource-controller-k8s/pkg/node" @@ -83,8 +84,9 @@ func TestNodeReconciler_Reconcile_AddNode(t *testing.T) { mock := NewNodeMock(ctrl, mockNodeObj) mock.Conditions.EXPECT().GetPodDataStoreSyncStatus().Return(true) - mock.Manager.EXPECT().GetNode(mockNodeName).Return(mock.MockNode, false) + mock.Manager.EXPECT().GetNode(mockNodeName).Return(mock.MockNode, false).Times(1) mock.Manager.EXPECT().AddNode(mockNodeName).Return(nil) + mock.Manager.EXPECT().CheckNodeForLeakedENIs(mockNodeName).Times(0) res, err := mock.Reconciler.Reconcile(context.TODO(), reconcileRequest) assert.NoError(t, err) @@ -98,10 +100,12 @@ func TestNodeReconciler_Reconcile_UpdateNode(t *testing.T) { mock := NewNodeMock(ctrl, mockNodeObj) mock.Conditions.EXPECT().GetPodDataStoreSyncStatus().Return(true) - mock.Manager.EXPECT().GetNode(mockNodeName).Return(mock.MockNode, true) + mock.Manager.EXPECT().GetNode(mockNodeName).Return(mock.MockNode, true).Times(1) mock.Manager.EXPECT().UpdateNode(mockNodeName).Return(nil) + mock.Manager.EXPECT().CheckNodeForLeakedENIs(mockNodeName).Times(1) res, err := mock.Reconciler.Reconcile(context.TODO(), reconcileRequest) + time.Sleep(time.Second) assert.NoError(t, err) assert.Equal(t, res, reconcile.Result{}) } diff --git a/mocks/amazon-vcp-resource-controller-k8s/pkg/node/manager/mock_manager.go b/mocks/amazon-vcp-resource-controller-k8s/pkg/node/manager/mock_manager.go index 62ab3fa8..092caf34 100644 --- a/mocks/amazon-vcp-resource-controller-k8s/pkg/node/manager/mock_manager.go +++ b/mocks/amazon-vcp-resource-controller-k8s/pkg/node/manager/mock_manager.go @@ -61,6 +61,18 @@ func (mr *MockManagerMockRecorder) AddNode(arg0 interface{}) *gomock.Call { return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "AddNode", reflect.TypeOf((*MockManager)(nil).AddNode), arg0) } +// CheckNodeForLeakedENIs mocks base method. +func (m *MockManager) CheckNodeForLeakedENIs(arg0 string) { + m.ctrl.T.Helper() + m.ctrl.Call(m, "CheckNodeForLeakedENIs", arg0) +} + +// CheckNodeForLeakedENIs indicates an expected call of CheckNodeForLeakedENIs. +func (mr *MockManagerMockRecorder) CheckNodeForLeakedENIs(arg0 interface{}) *gomock.Call { + mr.mock.ctrl.T.Helper() + return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "CheckNodeForLeakedENIs", reflect.TypeOf((*MockManager)(nil).CheckNodeForLeakedENIs), arg0) +} + // DeleteNode mocks base method. func (m *MockManager) DeleteNode(arg0 string) error { m.ctrl.T.Helper() diff --git a/mocks/amazon-vcp-resource-controller-k8s/pkg/node/mock_node.go b/mocks/amazon-vcp-resource-controller-k8s/pkg/node/mock_node.go index f61edccd..59879491 100644 --- a/mocks/amazon-vcp-resource-controller-k8s/pkg/node/mock_node.go +++ b/mocks/amazon-vcp-resource-controller-k8s/pkg/node/mock_node.go @@ -19,6 +19,7 @@ package mock_node import ( reflect "reflect" + time "time" resource "github.com/aws/amazon-vpc-resource-controller-k8s/pkg/resource" gomock "github.com/golang/mock/gomock" @@ -61,6 +62,20 @@ func (mr *MockNodeMockRecorder) DeleteResources(arg0 interface{}) *gomock.Call { return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "DeleteResources", reflect.TypeOf((*MockNode)(nil).DeleteResources), arg0) } +// GetNextReconciliationTime mocks base method. +func (m *MockNode) GetNextReconciliationTime() time.Time { + m.ctrl.T.Helper() + ret := m.ctrl.Call(m, "GetNextReconciliationTime") + ret0, _ := ret[0].(time.Time) + return ret0 +} + +// GetNextReconciliationTime indicates an expected call of GetNextReconciliationTime. +func (mr *MockNodeMockRecorder) GetNextReconciliationTime() *gomock.Call { + mr.mock.ctrl.T.Helper() + return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "GetNextReconciliationTime", reflect.TypeOf((*MockNode)(nil).GetNextReconciliationTime)) +} + // GetNodeInstanceID mocks base method. func (m *MockNode) GetNodeInstanceID() string { m.ctrl.T.Helper() @@ -75,6 +90,20 @@ func (mr *MockNodeMockRecorder) GetNodeInstanceID() *gomock.Call { return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "GetNodeInstanceID", reflect.TypeOf((*MockNode)(nil).GetNodeInstanceID)) } +// GetReconciliationInterval mocks base method. +func (m *MockNode) GetReconciliationInterval() time.Duration { + m.ctrl.T.Helper() + ret := m.ctrl.Call(m, "GetReconciliationInterval") + ret0, _ := ret[0].(time.Duration) + return ret0 +} + +// GetReconciliationInterval indicates an expected call of GetReconciliationInterval. +func (mr *MockNodeMockRecorder) GetReconciliationInterval() *gomock.Call { + mr.mock.ctrl.T.Helper() + return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "GetReconciliationInterval", reflect.TypeOf((*MockNode)(nil).GetReconciliationInterval)) +} + // HasInstance mocks base method. func (m *MockNode) HasInstance() bool { m.ctrl.T.Helper() @@ -145,6 +174,30 @@ func (mr *MockNodeMockRecorder) IsReady() *gomock.Call { return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "IsReady", reflect.TypeOf((*MockNode)(nil).IsReady)) } +// SetNextReconciliationTime mocks base method. +func (m *MockNode) SetNextReconciliationTime(arg0 time.Time) { + m.ctrl.T.Helper() + m.ctrl.Call(m, "SetNextReconciliationTime", arg0) +} + +// SetNextReconciliationTime indicates an expected call of SetNextReconciliationTime. +func (mr *MockNodeMockRecorder) SetNextReconciliationTime(arg0 interface{}) *gomock.Call { + mr.mock.ctrl.T.Helper() + return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "SetNextReconciliationTime", reflect.TypeOf((*MockNode)(nil).SetNextReconciliationTime), arg0) +} + +// SetReconciliationInterval mocks base method. +func (m *MockNode) SetReconciliationInterval(arg0 time.Duration) { + m.ctrl.T.Helper() + m.ctrl.Call(m, "SetReconciliationInterval", arg0) +} + +// SetReconciliationInterval indicates an expected call of SetReconciliationInterval. +func (mr *MockNodeMockRecorder) SetReconciliationInterval(arg0 interface{}) *gomock.Call { + mr.mock.ctrl.T.Helper() + return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "SetReconciliationInterval", reflect.TypeOf((*MockNode)(nil).SetReconciliationInterval), arg0) +} + // UpdateCustomNetworkingSpecs mocks base method. func (m *MockNode) UpdateCustomNetworkingSpecs(arg0 string, arg1 []string) { m.ctrl.T.Helper() diff --git a/mocks/amazon-vcp-resource-controller-k8s/pkg/provider/branch/trunk/mock_trunk.go b/mocks/amazon-vcp-resource-controller-k8s/pkg/provider/branch/trunk/mock_trunk.go index dfef2d23..ac4b1c73 100644 --- a/mocks/amazon-vcp-resource-controller-k8s/pkg/provider/branch/trunk/mock_trunk.go +++ b/mocks/amazon-vcp-resource-controller-k8s/pkg/provider/branch/trunk/mock_trunk.go @@ -141,10 +141,10 @@ func (mr *MockTrunkENIMockRecorder) PushENIsToFrontOfDeleteQueue(arg0, arg1 inte } // Reconcile mocks base method. -func (m *MockTrunkENI) Reconcile(arg0 []v1.Pod) error { +func (m *MockTrunkENI) Reconcile(arg0 []v1.Pod) bool { m.ctrl.T.Helper() ret := m.ctrl.Call(m, "Reconcile", arg0) - ret0, _ := ret[0].(error) + ret0, _ := ret[0].(bool) return ret0 } diff --git a/mocks/amazon-vcp-resource-controller-k8s/pkg/provider/mock_provider.go b/mocks/amazon-vcp-resource-controller-k8s/pkg/provider/mock_provider.go index 4fe2387e..0046284b 100644 --- a/mocks/amazon-vcp-resource-controller-k8s/pkg/provider/mock_provider.go +++ b/mocks/amazon-vcp-resource-controller-k8s/pkg/provider/mock_provider.go @@ -178,6 +178,20 @@ func (mr *MockResourceProviderMockRecorder) ProcessAsyncJob(arg0 interface{}) *g return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "ProcessAsyncJob", reflect.TypeOf((*MockResourceProvider)(nil).ProcessAsyncJob), arg0) } +// ReconcileNode mocks base method. +func (m *MockResourceProvider) ReconcileNode(arg0 string) bool { + m.ctrl.T.Helper() + ret := m.ctrl.Call(m, "ReconcileNode", arg0) + ret0, _ := ret[0].(bool) + return ret0 +} + +// ReconcileNode indicates an expected call of ReconcileNode. +func (mr *MockResourceProviderMockRecorder) ReconcileNode(arg0 interface{}) *gomock.Call { + mr.mock.ctrl.T.Helper() + return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "ReconcileNode", reflect.TypeOf((*MockResourceProvider)(nil).ReconcileNode), arg0) +} + // SubmitAsyncJob mocks base method. func (m *MockResourceProvider) SubmitAsyncJob(arg0 interface{}) { m.ctrl.T.Helper() diff --git a/pkg/aws/errors/ec2_errors.go b/pkg/aws/errors/ec2_errors.go new file mode 100644 index 00000000..c226d429 --- /dev/null +++ b/pkg/aws/errors/ec2_errors.go @@ -0,0 +1,6 @@ +package errors + +const ( + NotFoundAssociationID = "InvalidAssociationID.NotFound" + NotFoundInterfaceID = "InvalidNetworkInterfaceID.NotFound" +) diff --git a/pkg/node/manager/manager.go b/pkg/node/manager/manager.go index 0516a3d1..c9b6f9c2 100644 --- a/pkg/node/manager/manager.go +++ b/pkg/node/manager/manager.go @@ -19,6 +19,7 @@ import ( "net/http" "strings" "sync" + "time" "github.com/aws/amazon-vpc-resource-controller-k8s/apis/vpcresources/v1alpha1" "github.com/aws/amazon-vpc-resource-controller-k8s/pkg/api" @@ -33,6 +34,7 @@ import ( "github.com/samber/lo" apierrors "k8s.io/apimachinery/pkg/api/errors" "k8s.io/apimachinery/pkg/types" + "k8s.io/apimachinery/pkg/util/wait" ctrl "sigs.k8s.io/controller-runtime" "sigs.k8s.io/controller-runtime/pkg/healthz" @@ -63,6 +65,7 @@ type Manager interface { AddNode(nodeName string) error UpdateNode(nodeName string) error DeleteNode(nodeName string) error + CheckNodeForLeakedENIs(nodeName string) } // AsyncOperation is operation on a node after the lock has been released. @@ -115,6 +118,37 @@ func NewNodeManager(logger logr.Logger, resourceManager resource.ResourceManager return manager, worker.StartWorkerPool(manager.performAsyncOperation) } +func (m *manager) CheckNodeForLeakedENIs(nodeName string) { + managedNode, found := m.GetNode(nodeName) + if !found { + m.Log.Info("Node manager couldn't find the node for reconciliation cleanup", "NodeName", nodeName) + return + } + + // Only start a goroutine when need to + if time.Now().After(managedNode.GetNextReconciliationTime()) { + go func() { + if resourceProvider, found := m.resourceManager.GetResourceProvider(config.ResourceNamePodENI); found { + foundLeakedENI := resourceProvider.ReconcileNode(nodeName) + if foundLeakedENI { + managedNode.SetReconciliationInterval(node.NodeInitialCleanupInterval) + } else { + interval := wait.Jitter(managedNode.GetReconciliationInterval(), 5) + if interval > node.MaxNodeReconciliationInterval { + interval = node.MaxNodeReconciliationInterval + } + managedNode.SetReconciliationInterval(interval) + } + managedNode.SetNextReconciliationTime(time.Now().Add(managedNode.GetReconciliationInterval())) + m.Log.Info("reconciled cleanup node for leaking branch interfaces", "NodeName", nodeName, "NextInterval", managedNode.GetReconciliationInterval(), "NextReconciliationTime", managedNode.GetNextReconciliationTime()) + } else { + // no SGP provider enabled + return + } + }() + } +} + // GetNode returns the node from in memory data store func (m *manager) GetNode(nodeName string) (node node.Node, found bool) { m.lock.RLock() diff --git a/pkg/node/node.go b/pkg/node/node.go index 4ab151e5..5e45f7a0 100644 --- a/pkg/node/node.go +++ b/pkg/node/node.go @@ -17,6 +17,7 @@ import ( "errors" "fmt" "sync" + "time" "github.com/aws/amazon-vpc-resource-controller-k8s/pkg/aws/ec2" "github.com/aws/amazon-vpc-resource-controller-k8s/pkg/aws/ec2/api" @@ -44,8 +45,17 @@ type node struct { k8sAPI k8s.K8sWrapper // node has reference to EC2 APIs ec2API api.EC2APIHelper + // lastReconciledTime time.Time + nextReconciliationTime time.Time + // reconciliation interval between cleanups + reconciliationInterval time.Duration } +const ( + MaxNodeReconciliationInterval = 15 * time.Minute + NodeInitialCleanupInterval = 1 * time.Minute +) + // ErrInitResources to wrap error messages for all errors encountered // during node initialization so the node can be de-registered on failure type ErrInitResources struct { @@ -69,6 +79,11 @@ type Node interface { GetNodeInstanceID() string HasInstance() bool + + GetNextReconciliationTime() time.Time + SetNextReconciliationTime(time time.Time) + GetReconciliationInterval() time.Duration + SetReconciliationInterval(time time.Duration) } // NewManagedNode returns node managed by the controller @@ -77,9 +92,10 @@ func NewManagedNode(log logr.Logger, nodeName string, instanceID string, os stri managed: true, log: log.WithName("node resource handler"). WithValues("node name", nodeName), - instance: ec2.NewEC2Instance(nodeName, instanceID, os), - k8sAPI: k8sAPI, - ec2API: ec2API, + instance: ec2.NewEC2Instance(nodeName, instanceID, os), + k8sAPI: k8sAPI, + ec2API: ec2API, + reconciliationInterval: NodeInitialCleanupInterval, } } @@ -248,3 +264,31 @@ func (n *node) IsNitroInstance() bool { isNitroInstance, err := utils.IsNitroInstance(n.instance.Type()) return err == nil && isNitroInstance } + +func (n *node) GetNextReconciliationTime() time.Time { + n.lock.RLock() + defer n.lock.RUnlock() + + return n.nextReconciliationTime +} + +func (n *node) SetNextReconciliationTime(time time.Time) { + n.lock.Lock() + defer n.lock.Unlock() + + n.nextReconciliationTime = time +} + +func (n *node) GetReconciliationInterval() time.Duration { + n.lock.RLock() + defer n.lock.RUnlock() + + return n.reconciliationInterval +} + +func (n *node) SetReconciliationInterval(time time.Duration) { + n.lock.Lock() + defer n.lock.Unlock() + + n.reconciliationInterval = time +} diff --git a/pkg/provider/branch/provider.go b/pkg/provider/branch/provider.go index d963a2fe..a7e72469 100644 --- a/pkg/provider/branch/provider.go +++ b/pkg/provider/branch/provider.go @@ -184,7 +184,6 @@ func (b *branchENIProvider) InitResource(instance ec2.EC2Instance) error { // TODO: For efficiency submit the process delete queue job only when the delete queue has items. // Submit periodic jobs for the given node name b.SubmitAsyncJob(worker.NewOnDemandProcessDeleteQueueJob(nodeName)) - b.SubmitAsyncJob(worker.NewOnDemandReconcileNodeJob(nodeName)) b.log.Info("initialized the resource provider successfully") @@ -226,8 +225,6 @@ func (b *branchENIProvider) ProcessAsyncJob(job interface{}) (ctrl.Result, error return b.DeleteBranchUsedByPods(onDemandJob.NodeName, onDemandJob.UID) case worker.OperationProcessDeleteQueue: return b.ProcessDeleteQueue(onDemandJob.NodeName) - case worker.OperationReconcileNode: - return b.ReconcileNode(onDemandJob.NodeName) case worker.OperationDeleteNode: return b.DeleteNode(onDemandJob.NodeName) } @@ -270,27 +267,26 @@ func (b *branchENIProvider) UpdateResourceCapacity(instance ec2.EC2Instance) err // ReconcileNode reconciles a nodes by getting the list of pods from K8s and comparing the result // with the internal cache. -func (b *branchENIProvider) ReconcileNode(nodeName string) (ctrl.Result, error) { +func (b *branchENIProvider) ReconcileNode(nodeName string) bool { trunkENI, isPresent := b.getTrunkFromCache(nodeName) log := b.log.WithValues("node", nodeName) if !isPresent { - log.Info("stopping the reconcile job") - return ctrl.Result{}, nil + // return true to set the node next clean up asap since we don't know why trunk is missing + log.Info("no trunk ENI is pointing to the given node", "NodeName", nodeName) + return true } podList, err := b.apiWrapper.PodAPI.ListPods(nodeName) if err != nil { + // return true to set the node next cleanup asap since the LIST call may fail for other reasons + // we should assume that there are leaked resources need to be cleaned up log.Error(err, "failed fo list pod") - return reconcileRequeueRequest, nil - } - err = trunkENI.Reconcile(podList.Items) - if err != nil { - b.log.Error(err, "failed to reconcile") - return reconcileRequeueRequest, nil + return true } + foundLeakedENI := trunkENI.Reconcile(podList.Items) - log.V(1).Info("completed reconcile job") + log.Info("completed reconcile node cleanup on branch ENIs", "NodeName", nodeName) - return reconcileRequeueRequest, nil + return foundLeakedENI } // ProcessDeleteQueue removes cooled down ENIs associated with a trunk for a given node @@ -407,7 +403,11 @@ func (b *branchENIProvider) CreateAndAnnotateResources(podNamespace string, podN func (b *branchENIProvider) DeleteBranchUsedByPods(nodeName string, UID string) (ctrl.Result, error) { trunkENI, isPresent := b.getTrunkFromCache(nodeName) if !isPresent { - return ctrl.Result{}, fmt.Errorf("failed to find trunk ENI on the node %s", nodeName) + // trunk cache is local map with lock. it shouldn't return not found error if trunk exists + // if the node's trunk is not found, we shouldn't retry + // worst case we rely on node based clean up goroutines to clean branch ENIs up + b.log.Info("failed to find trunk ENI for the node %s", nodeName) + return ctrl.Result{}, nil } trunkENI.PushBranchENIsToCoolDownQueue(UID) diff --git a/pkg/provider/branch/provider_test.go b/pkg/provider/branch/provider_test.go index bab839ca..6f6d726d 100644 --- a/pkg/provider/branch/provider_test.go +++ b/pkg/provider/branch/provider_test.go @@ -479,7 +479,7 @@ func TestBranchENIProvider_CreateAndAnnotateResources_Annotate_Error(t *testing. // TestBranchENIProvider_ReconcileNode tests that the reconcile job returns no error and returns right results (with requeue after) // when the trunk ENI is present in cache -func TestBranchENIProvider_ReconcileNode(t *testing.T) { +func TestBranchENIProvider_ReconcileNode_NoLeak(t *testing.T) { ctrl := gomock.NewController(t) defer ctrl.Finish() @@ -491,11 +491,30 @@ func TestBranchENIProvider_ReconcileNode(t *testing.T) { list := &v1.PodList{} mockPodAPI.EXPECT().ListPods(NodeName).Return(list, nil) - fakeTrunk1.EXPECT().Reconcile(list.Items) + fakeTrunk1.EXPECT().Reconcile(list.Items).Return(false) - result, err := provider.ReconcileNode(NodeName) - assert.NoError(t, err) - assert.Equal(t, reconcileRequeueRequest, result) + result := provider.ReconcileNode(NodeName) + assert.False(t, result) +} + +// TestBranchENIProvider_ReconcileNode tests that the reconcile job returns no error and returns right results (with requeue after) +// when the trunk ENI is present in cache +func TestBranchENIProvider_ReconcileNode_Leak(t *testing.T) { + ctrl := gomock.NewController(t) + defer ctrl.Finish() + + provider, mockPodAPI, _, _ := getProviderAndMocks(ctrl) + + fakeTrunk1 := mock_trunk.NewMockTrunkENI(ctrl) + provider.trunkENICache[NodeName] = fakeTrunk1 + + list := &v1.PodList{} + mockPodAPI.EXPECT().ListPods(NodeName).Return(list, nil) + + fakeTrunk1.EXPECT().Reconcile(list.Items).Return(true) + + result := provider.ReconcileNode(NodeName) + assert.True(t, result) } // TestBranchENIProvider_ReconcileNode_TrunkENIDeleted tests that the reconcile job is removed once trunk eni is removed from @@ -503,9 +522,8 @@ func TestBranchENIProvider_ReconcileNode(t *testing.T) { func TestBranchENIProvider_ReconcileNode_TrunkENIDeleted(t *testing.T) { provider := getProvider() - result, err := provider.ReconcileNode(NodeName) - assert.NoError(t, err) - assert.Equal(t, k8sCtrl.Result{}, result) + result := provider.ReconcileNode(NodeName) + assert.True(t, result) } // TestBranchENIProvider_ProcessDeleteQueue_TrunkENIDeleted tests that the requeue job is removed once the trunk eni diff --git a/pkg/provider/branch/trunk/trunk.go b/pkg/provider/branch/trunk/trunk.go index fb408f8e..7b656fa5 100644 --- a/pkg/provider/branch/trunk/trunk.go +++ b/pkg/provider/branch/trunk/trunk.go @@ -17,11 +17,13 @@ import ( "encoding/json" "fmt" "strconv" + "strings" "sync" "time" "github.com/aws/amazon-vpc-resource-controller-k8s/pkg/aws/ec2" "github.com/aws/amazon-vpc-resource-controller-k8s/pkg/aws/ec2/api" + ec2Errors "github.com/aws/amazon-vpc-resource-controller-k8s/pkg/aws/errors" "github.com/aws/amazon-vpc-resource-controller-k8s/pkg/aws/vpc" "github.com/aws/amazon-vpc-resource-controller-k8s/pkg/config" @@ -61,6 +63,20 @@ var ( }, []string{"operation"}, ) + branchENIOperationsSuccessCount = prometheus.NewCounterVec( + prometheus.CounterOpts{ + Name: "branch_eni_opeartions_success_count", + Help: "The number of branch ENI succeeded operations", + }, + []string{"operation"}, + ) + branchENIOperationsFailureCount = prometheus.NewCounterVec( + prometheus.CounterOpts{ + Name: "branch_eni_opeartions_failure_count", + Help: "The number of branch ENI failed operations", + }, + []string{"operation"}, + ) prometheusRegistered = false ) @@ -75,7 +91,7 @@ type TrunkENI interface { // DeleteCooledDownENIs deletes the interfaces that have been sitting in the queue for cool down period DeleteCooledDownENIs() // Reconcile compares the cache state with the list of pods to identify events that were missed and clean up the dangling interfaces - Reconcile(pods []v1.Pod) error + Reconcile(pods []v1.Pod) bool // PushENIsToFrontOfDeleteQueue pushes the eni network interfaces to the front of the delete queue PushENIsToFrontOfDeleteQueue(*v1.Pod, []*ENIDetails) // DeleteAllBranchENIs deletes all the branch ENI associated with the trunk and also clears the cool down queue @@ -158,6 +174,9 @@ func NewTrunkENI(logger logr.Logger, instance ec2.EC2Instance, helper api.EC2API func PrometheusRegister() { if !prometheusRegistered { metrics.Registry.MustRegister(trunkENIOperationsErrCount) + metrics.Registry.MustRegister(branchENIOperationsSuccessCount) + metrics.Registry.MustRegister(branchENIOperationsFailureCount) + prometheusRegistered = true } } @@ -277,7 +296,7 @@ func (t *trunkENI) InitTrunk(instance ec2.EC2Instance, podList []v1.Pod) error { // Reconcile reconciles the state from the API Server to the internal cache of EC2 Branch Interfaces, if the controller // missed some delete events the reconcile method will perform cleanup for the dangling interfaces -func (t *trunkENI) Reconcile(pods []v1.Pod) error { +func (t *trunkENI) Reconcile(pods []v1.Pod) bool { // Perform under lock to block new pods being added/removed concurrently t.lock.Lock() defer t.lock.Unlock() @@ -288,9 +307,12 @@ func (t *trunkENI) Reconcile(pods []v1.Pod) error { currentPodSet[string(pod.UID)] = isPresent } + leakedENIs := 0 for uid, branchENIs := range t.uidToBranchENIMap { _, exists := currentPodSet[uid] if !exists { + leakedENIs += 1 + branchENIOperationsSuccessCount.WithLabelValues("leaked_branch_enis").Inc() for _, eni := range branchENIs { // Pod could have been deleted recently, set the timestamp to current time as controller is not aware of the actual time. eni.deletionTimeStamp = time.Now() @@ -298,12 +320,12 @@ func (t *trunkENI) Reconcile(pods []v1.Pod) error { } delete(t.uidToBranchENIMap, uid) - t.log.Info("deleted pod that doesn't exist anymore", "pod uid", uid, + t.log.Info("trunk controller found leaked branch ENI. the controller pushed leaked ENI to delete queue and deleted pod that doesn't exist anymore", "pod uid", uid, "eni", branchENIs) } } - return nil + return leakedENIs > 0 } // CreateAndAssociateBranchToTrunk creates a new branch network interface and associates the branch to the trunk @@ -357,7 +379,10 @@ func (t *trunkENI) CreateAndAssociateBranchENIs(pod *v1.Pod, securityGroups []st if err != nil { err = fmt.Errorf("creating network interface, %w", err) t.freeVlanId(vlanID) + branchENIOperationsFailureCount.WithLabelValues("creating_branch_eni_failed").Inc() break + } else { + branchENIOperationsSuccessCount.WithLabelValues("created_branch_eni_succeeded").Inc() } // Branch ENI can have an IPv4 address, IPv6 address, or both @@ -391,7 +416,7 @@ func (t *trunkENI) CreateAndAssociateBranchENIs(pod *v1.Pod, securityGroups []st t.addBranchToCache(string(pod.UID), newENIs) - log.V(1).Info("successfully created branch interface/s", "interface/s", newENIs, + log.Info("successfully created branch interfaces", "interfaces", newENIs, "security group used", securityGroups) return newENIs, nil @@ -401,6 +426,7 @@ func (t *trunkENI) CreateAndAssociateBranchENIs(pod *v1.Pod, securityGroups []st // queue, this is the last API call to the the Trunk ENI before it is removed from cache func (t *trunkENI) DeleteAllBranchENIs() { // Delete all the branch used by the pod on this trunk ENI + // Since after this call, the trunk will be removed from cache. No need to clean up its branch map for _, podENIs := range t.uidToBranchENIMap { for _, eni := range podENIs { err := t.deleteENI(eni) @@ -429,8 +455,8 @@ func (t *trunkENI) PushBranchENIsToCoolDownQueue(UID string) { branchENIs, isPresent := t.uidToBranchENIMap[UID] if !isPresent { - t.log.V(1).Info("failed to find Branch ENI in cache, it could have been released if pod"+ - "succeeded/failed before being deleted", "uid", UID) + t.log.Info("couldn't find Branch ENI in cache, it could have been released if pod"+ + "succeeded/failed before being deleted", "UID", UID, "BranchENIs", branchENIs) trunkENIOperationsErrCount.WithLabelValues("get_branch_from_cache").Inc() return } @@ -442,8 +468,8 @@ func (t *trunkENI) PushBranchENIsToCoolDownQueue(UID string) { delete(t.uidToBranchENIMap, UID) - t.log.Info("moved branch network interfaces to delete queue", "interface/s", - branchENIs, "uid", UID) + t.log.Info("moved branch network interfaces to delete queue", "Interfaces", + branchENIs, "UID", UID) } func (t *trunkENI) DeleteCooledDownENIs() { @@ -477,10 +503,18 @@ func (t *trunkENI) deleteENI(eniDetail *ENIDetails) (err error) { // Delete Branch network interface first err = t.ec2ApiHelper.DeleteNetworkInterface(&eniDetail.ID) if err != nil { - trunkENIOperationsErrCount.WithLabelValues("delete_branch").Inc() - return err + branchENIOperationsFailureCount.WithLabelValues("delete_branch_error").Inc() + + if !strings.Contains(err.Error(), ec2Errors.NotFoundInterfaceID) { + t.log.Error(err, "calling EC2 delete API to delete the branch ENI failed", "BranchENI", eniDetail) + return err + } else { + t.log.Info("The branch ENI was not found by EC2. Will not call EC2 for deletion again", "BranchENI", eniDetail, "Error", err.Error()) + } } + branchENIOperationsSuccessCount.WithLabelValues("deleted_branch_succesfully").Inc() + t.log.Info("deleted eni", "eni details", eniDetail) // Free vlan id used by the branch ENI @@ -488,7 +522,7 @@ func (t *trunkENI) deleteENI(eniDetail *ENIDetails) (err error) { t.freeVlanId(eniDetail.VlanID) } - return + return nil } func (t *trunkENI) getBranchInterfaceMap(eniList []*ENIDetails) map[string]*ENIDetails { @@ -511,39 +545,6 @@ func (t *trunkENI) getBranchInterfacesUsedByPod(pod *v1.Pod) (eniDetails []*ENID return } -// GetBranchInterfacesFromEC2 returns the list of branch interfaces associated with the trunk ENI. This is not supported -// yet -func (t *trunkENI) GetBranchInterfacesFromEC2() (eniDetails []*ENIDetails, err error) { - // Get the branch associated with the trunk and store the result in the cache - associations, err := t.ec2ApiHelper.DescribeTrunkInterfaceAssociation(&t.trunkENIId) - if err != nil { - trunkENIOperationsErrCount.WithLabelValues("describe_trunk_assoc").Inc() - err = fmt.Errorf("failed to describe associations for trunk %s: %v", t.trunkENIId, err) - return - } - - // Return if no branches are associated with the trunk - if associations == nil || len(associations) == 0 { - t.log.V(1).Info("trunk has no associated branch interfaces", "trunk id", t.trunkENIId) - return - } - - // For each association build the map of branch ENIs with the interface id and the vlan id - for _, association := range associations { - eniDetail := &ENIDetails{ - ID: *association.BranchInterfaceId, - VlanID: int(*association.VlanId), - SubnetCIDR: t.instance.SubnetCidrBlock(), - SubnetV6CIDR: t.instance.SubnetV6CidrBlock(), - } - eniDetails = append(eniDetails, eniDetail) - } - - t.log.V(1).Info("loaded trunk associations", "trunk id", t.trunkENIId, "associations", eniDetails) - - return -} - // pushENIToDeleteQueue pushes an ENI to a delete queue func (t *trunkENI) pushENIToDeleteQueue(eni *ENIDetails) { t.lock.Lock() diff --git a/pkg/provider/branch/trunk/trunk_test.go b/pkg/provider/branch/trunk/trunk_test.go index f1bc1168..021dd0b0 100644 --- a/pkg/provider/branch/trunk/trunk_test.go +++ b/pkg/provider/branch/trunk/trunk_test.go @@ -359,60 +359,6 @@ func TestTrunkENI_popENIFromDeleteQueue(t *testing.T) { assert.False(t, hasENI) } -// TestTrunkENI_GetBranchInterfacesFromEC2 tests get branch interface from ec2 returns the branch interface with the -// eni id and vlan id populated -func TestTrunkENI_GetBranchInterfacesFromEC2(t *testing.T) { - ctrl := gomock.NewController(t) - defer ctrl.Finish() - - trunkENI, ec2APIHelper, mockInstance := getMockHelperInstanceAndTrunkObject(ctrl) - trunkENI.trunkENIId = trunkId - - ec2APIHelper.EXPECT().DescribeTrunkInterfaceAssociation(&trunkId).Return(trunkAssociationsBranch1And2, nil) - mockInstance.EXPECT().SubnetCidrBlock().Return(SubnetCidrBlock).Times(2) - mockInstance.EXPECT().SubnetV6CidrBlock().Return(SubnetV6CidrBlock).Times(2) - - eniDetails, err := trunkENI.GetBranchInterfacesFromEC2() - - assert.NoError(t, err) - - assert.Equal(t, EniDetails1.ID, eniDetails[0].ID) - assert.Equal(t, EniDetails1.VlanID, eniDetails[0].VlanID) - - assert.Equal(t, EniDetails2.ID, eniDetails[1].ID) - assert.Equal(t, EniDetails2.VlanID, eniDetails[1].VlanID) -} - -// TestTrunkENI_GetBranchInterfacesFromEC2_Error tests that error is returned if the operation fails -func TestTrunkENI_GetBranchInterfacesFromEC2_Error(t *testing.T) { - ctrl := gomock.NewController(t) - defer ctrl.Finish() - - trunkENI, ec2APIHelper, _ := getMockHelperInstanceAndTrunkObject(ctrl) - trunkENI.trunkENIId = trunkId - - ec2APIHelper.EXPECT().DescribeTrunkInterfaceAssociation(&trunkId).Return(nil, MockError) - - _, err := trunkENI.GetBranchInterfacesFromEC2() - assert.Error(t, MockError, err) -} - -// TestTrunkENI_GetBranchInterfacesFromEC2_NoBranch tests that error is not returned when there is no branch associated -// with the trunk -func TestTrunkENI_GetBranchInterfacesFromEC2_NoBranch(t *testing.T) { - ctrl := gomock.NewController(t) - defer ctrl.Finish() - - trunkENI, ec2APIHelper, _ := getMockHelperInstanceAndTrunkObject(ctrl) - trunkENI.trunkENIId = trunkId - - ec2APIHelper.EXPECT().DescribeTrunkInterfaceAssociation(&trunkId).Return(nil, nil) - - eniDetails, err := trunkENI.GetBranchInterfacesFromEC2() - assert.NoError(t, err) - assert.Nil(t, eniDetails) -} - // TestTrunkENI_getBranchInterfacesUsedByPod tests that branch interface are returned if present in pod annotation func TestTrunkENI_getBranchInterfacesUsedByPod(t *testing.T) { trunkENI := getMockTrunk() @@ -581,8 +527,8 @@ func TestTrunkENI_Reconcile(t *testing.T) { // Pod 1 doesn't exist anymore podList := []v1.Pod{*MockPod2} - err := trunkENI.Reconcile(podList) - assert.NoError(t, err) + leaked := trunkENI.Reconcile(podList) + assert.True(t, leaked) _, isPresent := trunkENI.uidToBranchENIMap[PodUID] assert.Equal(t, []*ENIDetails{EniDetails1, EniDetails2}, trunkENI.deleteQueue) @@ -596,8 +542,8 @@ func TestTrunkENI_Reconcile_NoStateChange(t *testing.T) { podList := []v1.Pod{*MockPod1, *MockPod2} - err := trunkENI.Reconcile(podList) - assert.NoError(t, err) + leaked := trunkENI.Reconcile(podList) + assert.False(t, leaked) _, isPresent := trunkENI.uidToBranchENIMap[PodUID] assert.Zero(t, trunkENI.deleteQueue) diff --git a/pkg/provider/ip/provider.go b/pkg/provider/ip/provider.go index ae3274f0..f3bba704 100644 --- a/pkg/provider/ip/provider.go +++ b/pkg/provider/ip/provider.go @@ -508,3 +508,8 @@ func (p *ipv4Provider) check() healthz.Checker { func (p *ipv4Provider) GetHealthChecker() healthz.Checker { return p.checker } + +// ReconcileNode implements provider.ResourceProvider. +func (*ipv4Provider) ReconcileNode(nodeName string) bool { + return false +} diff --git a/pkg/provider/prefix/provider.go b/pkg/provider/prefix/provider.go index ebeaf195..3cb22613 100644 --- a/pkg/provider/prefix/provider.go +++ b/pkg/provider/prefix/provider.go @@ -538,3 +538,8 @@ func (p *ipv4PrefixProvider) check() healthz.Checker { func (p *ipv4PrefixProvider) GetHealthChecker() healthz.Checker { return p.checker } + +// ReconcileNode implements provider.ResourceProvider. +func (*ipv4PrefixProvider) ReconcileNode(nodeName string) bool { + return false +} diff --git a/pkg/provider/provider.go b/pkg/provider/provider.go index 5a2e7fd6..c450358f 100644 --- a/pkg/provider/provider.go +++ b/pkg/provider/provider.go @@ -44,4 +44,5 @@ type ResourceProvider interface { GetHealthChecker() healthz.Checker // IntrospectSummary allows introspection of resources summary per node IntrospectSummary() interface{} + ReconcileNode(nodeName string) bool }