Skip to content

Commit

Permalink
isolate node cleanup from worker queue
Browse files Browse the repository at this point in the history
  • Loading branch information
haouc committed Oct 13, 2023
1 parent 83d3279 commit 35ebac1
Show file tree
Hide file tree
Showing 16 changed files with 282 additions and 134 deletions.
7 changes: 6 additions & 1 deletion controllers/core/node_controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -41,7 +41,9 @@ import (
// on cache only a single go routine should be sufficient. Using more than
// one routines to help high rate churn and larger nodes groups restarting
// when the controller has to be restarted for various reasons.
const MaxNodeConcurrentReconciles = 7
const (
MaxNodeConcurrentReconciles = 10
)

// NodeReconciler reconciles a Node object
type NodeReconciler struct {
Expand Down Expand Up @@ -96,6 +98,9 @@ func (r *NodeReconciler) Reconcile(ctx context.Context, req ctrl.Request) (ctrl.
if found {
logger.V(1).Info("updating node")
err = r.Manager.UpdateNode(req.Name)

// ReconcileNode actually run a branch ENI leaking check from an independent goroutine on added nodes.
r.Manager.CheckNodeForLeakedENIs(req.Name)
} else {
logger.Info("adding node")
err = r.Manager.AddNode(req.Name)
Expand Down
8 changes: 6 additions & 2 deletions controllers/core/node_controller_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@ package controllers
import (
"context"
"testing"
"time"

mock_condition "github.com/aws/amazon-vpc-resource-controller-k8s/mocks/amazon-vcp-resource-controller-k8s/pkg/condition"
mock_node "github.com/aws/amazon-vpc-resource-controller-k8s/mocks/amazon-vcp-resource-controller-k8s/pkg/node"
Expand Down Expand Up @@ -83,8 +84,9 @@ func TestNodeReconciler_Reconcile_AddNode(t *testing.T) {
mock := NewNodeMock(ctrl, mockNodeObj)

mock.Conditions.EXPECT().GetPodDataStoreSyncStatus().Return(true)
mock.Manager.EXPECT().GetNode(mockNodeName).Return(mock.MockNode, false)
mock.Manager.EXPECT().GetNode(mockNodeName).Return(mock.MockNode, false).Times(1)
mock.Manager.EXPECT().AddNode(mockNodeName).Return(nil)
mock.Manager.EXPECT().CheckNodeForLeakedENIs(mockNodeName).Times(0)

res, err := mock.Reconciler.Reconcile(context.TODO(), reconcileRequest)
assert.NoError(t, err)
Expand All @@ -98,10 +100,12 @@ func TestNodeReconciler_Reconcile_UpdateNode(t *testing.T) {
mock := NewNodeMock(ctrl, mockNodeObj)

mock.Conditions.EXPECT().GetPodDataStoreSyncStatus().Return(true)
mock.Manager.EXPECT().GetNode(mockNodeName).Return(mock.MockNode, true)
mock.Manager.EXPECT().GetNode(mockNodeName).Return(mock.MockNode, true).Times(1)
mock.Manager.EXPECT().UpdateNode(mockNodeName).Return(nil)
mock.Manager.EXPECT().CheckNodeForLeakedENIs(mockNodeName).Times(1)

res, err := mock.Reconciler.Reconcile(context.TODO(), reconcileRequest)
time.Sleep(time.Second)
assert.NoError(t, err)
assert.Equal(t, res, reconcile.Result{})
}
Expand Down

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

53 changes: 53 additions & 0 deletions mocks/amazon-vcp-resource-controller-k8s/pkg/node/mock_node.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

6 changes: 6 additions & 0 deletions pkg/aws/errors/ec2_errors.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,6 @@
package errors

const (
NotFoundAssociationID = "InvalidAssociationID.NotFound"
NotFoundInterfaceID = "InvalidNetworkInterfaceID.NotFound"
)
34 changes: 34 additions & 0 deletions pkg/node/manager/manager.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@ import (
"net/http"
"strings"
"sync"
"time"

"github.com/aws/amazon-vpc-resource-controller-k8s/apis/vpcresources/v1alpha1"
"github.com/aws/amazon-vpc-resource-controller-k8s/pkg/api"
Expand All @@ -33,6 +34,7 @@ import (
"github.com/samber/lo"
apierrors "k8s.io/apimachinery/pkg/api/errors"
"k8s.io/apimachinery/pkg/types"
"k8s.io/apimachinery/pkg/util/wait"
ctrl "sigs.k8s.io/controller-runtime"
"sigs.k8s.io/controller-runtime/pkg/healthz"

Expand Down Expand Up @@ -63,6 +65,7 @@ type Manager interface {
AddNode(nodeName string) error
UpdateNode(nodeName string) error
DeleteNode(nodeName string) error
CheckNodeForLeakedENIs(nodeName string)
}

// AsyncOperation is operation on a node after the lock has been released.
Expand Down Expand Up @@ -115,6 +118,37 @@ func NewNodeManager(logger logr.Logger, resourceManager resource.ResourceManager
return manager, worker.StartWorkerPool(manager.performAsyncOperation)
}

func (m *manager) CheckNodeForLeakedENIs(nodeName string) {
managedNode, found := m.GetNode(nodeName)
if !found {
m.Log.Info("Node manager couldn't find the node for reconciliation cleanup", "NodeName", nodeName)
return
}

// Only start a goroutine when need to
if time.Now().After(managedNode.GetNextReconciliationTime()) {
go func() {
if resourceProvider, found := m.resourceManager.GetResourceProvider(config.ResourceNamePodENI); found {
foundLeakedENI := resourceProvider.ReconcileNode(nodeName)
if foundLeakedENI {
managedNode.SetReconciliationInterval(node.NodeInitialCleanupInterval)
} else {
interval := wait.Jitter(managedNode.GetReconciliationInterval(), 5)
if interval > node.MaxNodeReconciliationInterval {
interval = node.MaxNodeReconciliationInterval
}
managedNode.SetReconciliationInterval(interval)
}
managedNode.SetNextReconciliationTime(time.Now().Add(managedNode.GetReconciliationInterval()))
m.Log.Info("reconciled cleanup node for leaking branch interfaces", "NodeName", nodeName, "NextInterval", managedNode.GetReconciliationInterval(), "NextReconciliationTime", managedNode.GetNextReconciliationTime())
} else {
// no SGP provider enabled
return
}
}()
}
}

// GetNode returns the node from in memory data store
func (m *manager) GetNode(nodeName string) (node node.Node, found bool) {
m.lock.RLock()
Expand Down
50 changes: 47 additions & 3 deletions pkg/node/node.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@ import (
"errors"
"fmt"
"sync"
"time"

"github.com/aws/amazon-vpc-resource-controller-k8s/pkg/aws/ec2"
"github.com/aws/amazon-vpc-resource-controller-k8s/pkg/aws/ec2/api"
Expand Down Expand Up @@ -44,8 +45,17 @@ type node struct {
k8sAPI k8s.K8sWrapper
// node has reference to EC2 APIs
ec2API api.EC2APIHelper
// lastReconciledTime time.Time
nextReconciliationTime time.Time
// reconciliation interval between cleanups
reconciliationInterval time.Duration
}

const (
MaxNodeReconciliationInterval = 15 * time.Minute
NodeInitialCleanupInterval = 1 * time.Minute
)

// ErrInitResources to wrap error messages for all errors encountered
// during node initialization so the node can be de-registered on failure
type ErrInitResources struct {
Expand All @@ -69,6 +79,11 @@ type Node interface {

GetNodeInstanceID() string
HasInstance() bool

GetNextReconciliationTime() time.Time
SetNextReconciliationTime(time time.Time)
GetReconciliationInterval() time.Duration
SetReconciliationInterval(time time.Duration)
}

// NewManagedNode returns node managed by the controller
Expand All @@ -77,9 +92,10 @@ func NewManagedNode(log logr.Logger, nodeName string, instanceID string, os stri
managed: true,
log: log.WithName("node resource handler").
WithValues("node name", nodeName),
instance: ec2.NewEC2Instance(nodeName, instanceID, os),
k8sAPI: k8sAPI,
ec2API: ec2API,
instance: ec2.NewEC2Instance(nodeName, instanceID, os),
k8sAPI: k8sAPI,
ec2API: ec2API,
reconciliationInterval: NodeInitialCleanupInterval,
}
}

Expand Down Expand Up @@ -248,3 +264,31 @@ func (n *node) IsNitroInstance() bool {
isNitroInstance, err := utils.IsNitroInstance(n.instance.Type())
return err == nil && isNitroInstance
}

func (n *node) GetNextReconciliationTime() time.Time {
n.lock.RLock()
defer n.lock.RUnlock()

return n.nextReconciliationTime
}

func (n *node) SetNextReconciliationTime(time time.Time) {
n.lock.Lock()
defer n.lock.Unlock()

n.nextReconciliationTime = time
}

func (n *node) GetReconciliationInterval() time.Duration {
n.lock.RLock()
defer n.lock.RUnlock()

return n.reconciliationInterval
}

func (n *node) SetReconciliationInterval(time time.Duration) {
n.lock.Lock()
defer n.lock.Unlock()

n.reconciliationInterval = time
}
Loading

0 comments on commit 35ebac1

Please sign in to comment.