Skip to content

Commit

Permalink
Adding cninode leak metrics and cninode finalizer handler. (aws#476)
Browse files Browse the repository at this point in the history
* add finalizer handler in v1.4



* fix an err variable



* adding logs for mismatched CNINode



* add metrics for mismatches

Co-authored-by: Hao Zhou <zhuhz@amazon.com>
  • Loading branch information
yash97 and haouc authored Sep 20, 2024
1 parent 42a911f commit de7470a
Show file tree
Hide file tree
Showing 7 changed files with 166 additions and 9 deletions.
60 changes: 52 additions & 8 deletions controllers/core/node_controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@ package controllers

import (
"context"
"fmt"
"net/http"
"time"

Expand All @@ -24,6 +25,8 @@ import (
"github.com/aws/amazon-vpc-resource-controller-k8s/pkg/k8s"
"github.com/aws/amazon-vpc-resource-controller-k8s/pkg/node/manager"
"github.com/google/uuid"
"github.com/prometheus/client_golang/prometheus"
"sigs.k8s.io/controller-runtime/pkg/metrics"

"github.com/go-logr/logr"
corev1 "k8s.io/api/core/v1"
Expand All @@ -33,16 +36,29 @@ import (
ctrl "sigs.k8s.io/controller-runtime"
"sigs.k8s.io/controller-runtime/pkg/client"
"sigs.k8s.io/controller-runtime/pkg/controller"
"sigs.k8s.io/controller-runtime/pkg/controller/controllerutil"
"sigs.k8s.io/controller-runtime/pkg/healthz"
)

// leakedCNINodeResourceCount counts the CNINode objects that the node reconciler
// found still carrying NodeTerminationFinalizer after their Node could no longer
// be found.
var leakedCNINodeResourceCount = prometheus.NewCounter(prometheus.CounterOpts{
	Name: "orphaned_cninode_objects",
	Help: "The number of leaked cninode resources",
})

// prometheusRegistered records whether prometheusRegister has already registered
// this package's metrics, so that registration is attempted only once.
var prometheusRegistered bool

// Constants used by the node controller.
const (
// MaxNodeConcurrentReconciles is the number of goroutines that can invoke
// Reconcile in parallel. Because the Node reconciler only performs local
// operations on the cache, a single goroutine should be sufficient; more
// than one is used to help with high churn rates and with large node groups
// being reconciled again when the controller has to be restarted.
MaxNodeConcurrentReconciles = 10
// NodeTerminationFinalizer is the finalizer that the Node reconciler looks
// for on a CNINode, and removes from it, when the Node named in the
// reconcile request can no longer be found.
NodeTerminationFinalizer = "networking.k8s.aws/resource-cleanup"
)

// NodeReconciler reconciles a Node object
Expand Down Expand Up @@ -73,27 +89,45 @@ func (r *NodeReconciler) Reconcile(ctx context.Context, req ctrl.Request) (ctrl.
}

node := &corev1.Node{}
var err error

logger := r.Log.WithValues("node", req.NamespacedName)

if err := r.Client.Get(ctx, req.NamespacedName, node); err != nil {
if errors.IsNotFound(err) {
r.Log.V(1).Info("the requested node couldn't be found by k8s client", "Node", req.NamespacedName)
if nodeErr := r.Client.Get(ctx, req.NamespacedName, node); nodeErr != nil {
if errors.IsNotFound(nodeErr) {
// clean up CNINode finalizer
cniNode := &v1alpha1.CNINode{}
if cninodeErr := r.Client.Get(ctx, req.NamespacedName, cniNode); cninodeErr == nil {
if yes := controllerutil.ContainsFinalizer(cniNode, NodeTerminationFinalizer); yes {
updated := cniNode.DeepCopy()
if yes = controllerutil.RemoveFinalizer(updated, NodeTerminationFinalizer); yes {
if err := r.Client.Patch(ctx, updated, client.MergeFrom(cniNode)); err != nil {
return ctrl.Result{}, err
}
r.Log.Info("removed leaked CNINode resource's finalizer", "cninode", cniNode.Name)
}
leakedCNINodeResourceCount.Inc()
}
} else if !errors.IsNotFound(cninodeErr) {
return ctrl.Result{}, fmt.Errorf("failed getting CNINode %s from cached client, %w", cniNode.Name, cninodeErr)
}

// clean up local cached nodes
_, found := r.Manager.GetNode(req.Name)
if found {
err := r.Manager.DeleteNode(req.Name)
if err != nil {
cacheErr := r.Manager.DeleteNode(req.Name)
if cacheErr != nil {
// The request is not retryable so not returning the error
logger.Error(err, "failed to delete node from manager")
logger.Error(cacheErr, "failed to delete node from manager")
return ctrl.Result{}, nil
}
logger.V(1).Info("deleted the node from manager")
}
}
return ctrl.Result{}, client.IgnoreNotFound(err)
return ctrl.Result{}, client.IgnoreNotFound(nodeErr)
}

var err error

_, found := r.Manager.GetNode(req.Name)
if found {
logger.V(1).Info("updating node")
Expand All @@ -115,6 +149,8 @@ func (r *NodeReconciler) SetupWithManager(mgr ctrl.Manager, healthzHandler *rcHe
map[string]healthz.Checker{"health-node-controller": r.Check()},
)

prometheusRegister()

return ctrl.NewControllerManagedBy(mgr).
For(&corev1.Node{}).
WithOptions(controller.Options{MaxConcurrentReconciles: MaxNodeConcurrentReconciles}).
Expand Down Expand Up @@ -152,3 +188,11 @@ func (r *NodeReconciler) Check() healthz.Checker {
return err
}
}

// prometheusRegister registers the node controller's metrics with the
// controller-runtime metrics registry. The prometheusRegistered flag makes
// repeated calls a no-op after the first successful registration.
func prometheusRegister() {
	if prometheusRegistered {
		return
	}

	metrics.Registry.MustRegister(leakedCNINodeResourceCount)
	prometheusRegistered = true
}
41 changes: 41 additions & 0 deletions controllers/core/node_controller_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,18 +18,21 @@ import (
"testing"
"time"

"github.com/aws/amazon-vpc-resource-controller-k8s/apis/vpcresources/v1alpha1"
mock_condition "github.com/aws/amazon-vpc-resource-controller-k8s/mocks/amazon-vcp-resource-controller-k8s/pkg/condition"
mock_node "github.com/aws/amazon-vpc-resource-controller-k8s/mocks/amazon-vcp-resource-controller-k8s/pkg/node"
mock_manager "github.com/aws/amazon-vpc-resource-controller-k8s/mocks/amazon-vcp-resource-controller-k8s/pkg/node/manager"

"github.com/golang/mock/gomock"
"github.com/stretchr/testify/assert"
corev1 "k8s.io/api/core/v1"
"k8s.io/apimachinery/pkg/api/errors"
v1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/runtime"
"k8s.io/apimachinery/pkg/types"
"sigs.k8s.io/controller-runtime/pkg/client"
fakeClient "sigs.k8s.io/controller-runtime/pkg/client/fake"
"sigs.k8s.io/controller-runtime/pkg/controller/controllerutil"
"sigs.k8s.io/controller-runtime/pkg/log/zap"
"sigs.k8s.io/controller-runtime/pkg/reconcile"
)
Expand Down Expand Up @@ -61,6 +64,7 @@ func NewNodeMock(ctrl *gomock.Controller, mockObjects ...client.Object) NodeMock

scheme := runtime.NewScheme()
_ = corev1.AddToScheme(scheme)
_ = v1alpha1.AddToScheme(scheme)
client := fakeClient.NewClientBuilder().WithScheme(scheme).WithObjects(mockObjects...).Build()

return NodeMock{
Expand Down Expand Up @@ -139,6 +143,43 @@ func TestNodeReconciler_Reconcile_DeleteNonExistentNode(t *testing.T) {
assert.Equal(t, res, reconcile.Result{})
}

// TestNodeReconciler_Reconcile_DeleteNonExistentNodesCNINode verifies that
// reconciling a request whose Node does not exist removes
// NodeTerminationFinalizer from the CNINode seeded under mockNodeName.
func TestNodeReconciler_Reconcile_DeleteNonExistentNodesCNINode(t *testing.T) {
	ctrl := gomock.NewController(t)
	defer ctrl.Finish()

	mock := NewNodeMock(ctrl)
	cniNode := &v1alpha1.CNINode{
		ObjectMeta: v1.ObjectMeta{
			Name:       mockNodeName,
			Finalizers: []string{NodeTerminationFinalizer},
		},
	}
	// Swap in a fake client that is seeded with the CNINode only; no Node object exists.
	mock.Reconciler.Client = fakeClient.NewClientBuilder().WithScheme(mock.Reconciler.Scheme).WithObjects(cniNode).Build()

	mock.Conditions.EXPECT().GetPodDataStoreSyncStatus().Return(true)
	mock.Manager.EXPECT().GetNode(mockNodeName).Return(mock.MockNode, false)

	key := types.NamespacedName{Name: cniNode.Name}

	// Precondition: the CNINode carries the finalizer before reconciling.
	original := &v1alpha1.CNINode{}
	err := mock.Reconciler.Client.Get(context.TODO(), key, original)
	assert.NoError(t, err)
	assert.True(t, controllerutil.ContainsFinalizer(original, NodeTerminationFinalizer), "the CNINode has finalizer")

	res, err := mock.Reconciler.Reconcile(context.TODO(), reconcileRequest)
	assert.NoError(t, err)
	// testify expects (expected, actual); this order keeps failure output labelled correctly.
	assert.Equal(t, reconcile.Result{}, res)

	// The Node must still be absent after reconciling.
	node := &corev1.Node{}
	err = mock.Reconciler.Client.Get(context.TODO(), key, node)
	assert.Error(t, err, "the node should not exist")
	assert.True(t, errors.IsNotFound(err))

	// The CNINode is still retrievable, but without the finalizer.
	updated := &v1alpha1.CNINode{}
	err = mock.Reconciler.Client.Get(context.TODO(), key, updated)
	assert.NoError(t, err)
	assert.Equal(t, mockNodeName, updated.Name, "the CNINode should still exist after its finalizer is removed")
	assert.False(t, controllerutil.ContainsFinalizer(updated, NodeTerminationFinalizer), "CNINode finalizer should be removed when the node is gone")
}

func TestNodeReconciler_Reconcile_DeleteNonExistentUnmanagedNode(t *testing.T) {
ctrl := gomock.NewController(t)
defer ctrl.Finish()
Expand Down

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

8 changes: 8 additions & 0 deletions pkg/aws/ec2/instance.go
Original file line number Diff line number Diff line change
Expand Up @@ -79,6 +79,7 @@ type EC2Instance interface {
PrimaryNetworkInterfaceID() string
CurrentInstanceSecurityGroups() []string
SetNewCustomNetworkingSpec(subnetID string, securityGroup []string)
GetCustomNetworkingSpec() (subnetID string, securityGroup []string)
UpdateCurrentSubnetAndCidrBlock(helper api.EC2APIHelper) error
}

Expand Down Expand Up @@ -311,3 +312,10 @@ func (i *ec2Instance) updateCurrentSubnetAndCidrBlock(ec2APIHelper api.EC2APIHel

return nil
}

// GetCustomNetworkingSpec returns the subnet ID and the security groups stored
// as the instance's new custom networking spec.
//
// The security groups are returned as a copy. The fields are read under the
// instance's read lock, and a caller such as InitTrunk sorts the returned slice
// in place; handing out the internal slice would let that sort reorder (and
// race on) the instance's own state after the lock has been released.
// When no security groups are stored the returned slice is nil.
func (i *ec2Instance) GetCustomNetworkingSpec() (subnetID string, securityGroup []string) {
	i.lock.RLock()
	defer i.lock.RUnlock()

	// append onto a nil slice allocates a fresh backing array (or yields nil when empty).
	return i.newCustomNetworkingSubnetID, append([]string(nil), i.newCustomNetworkingSecurityGroups...)
}
2 changes: 1 addition & 1 deletion pkg/node/manager/manager.go
Original file line number Diff line number Diff line change
Expand Up @@ -228,7 +228,7 @@ func (m *manager) CreateCNINodeIfNotExisting(node *v1.Node) error {
}
return err
} else {
m.Log.V(1).Info("The CNINode is already existing", "CNINode", cniNode)
m.Log.Info("The CNINode is already existing", "cninode", cniNode.Name, "features", cniNode.Spec.Features)
return nil
}
}
Expand Down
47 changes: 47 additions & 0 deletions pkg/provider/branch/trunk/trunk.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@ package trunk
import (
"encoding/json"
"fmt"
"slices"
"strconv"
"strings"
"sync"
Expand All @@ -27,6 +28,7 @@ import (
"github.com/aws/amazon-vpc-resource-controller-k8s/pkg/aws/vpc"
"github.com/aws/amazon-vpc-resource-controller-k8s/pkg/config"
"github.com/aws/amazon-vpc-resource-controller-k8s/pkg/provider/branch/cooldown"
"github.com/samber/lo"

"github.com/aws/aws-sdk-go/aws"
awsEC2 "github.com/aws/aws-sdk-go/service/ec2"
Expand Down Expand Up @@ -62,6 +64,13 @@ var (
},
[]string{"operation"},
)
unreconciledTrunkENICount = prometheus.NewCounterVec(
prometheus.CounterOpts{
Name: "unreconciled_trunk_network_interfaces",
Help: "The number of unreconciled trunk network interfaces",
},
[]string{"attribute"},
)
branchENIOperationsSuccessCount = prometheus.NewCounterVec(
prometheus.CounterOpts{
Name: "branch_eni_opeartions_success_count",
Expand Down Expand Up @@ -173,6 +182,7 @@ func NewTrunkENI(logger logr.Logger, instance ec2.EC2Instance, helper api.EC2API
func PrometheusRegister() {
if !prometheusRegistered {
metrics.Registry.MustRegister(trunkENIOperationsErrCount)
metrics.Registry.MustRegister(unreconciledTrunkENICount)
metrics.Registry.MustRegister(branchENIOperationsSuccessCount)
metrics.Registry.MustRegister(branchENIOperationsFailureCount)

Expand All @@ -192,6 +202,7 @@ func (t *trunkENI) InitTrunk(instance ec2.EC2Instance, podList []v1.Pod) error {
return err
}

var trunk awsEC2.InstanceNetworkInterface
// Get trunk network interface
for _, nwInterface := range nwInterfaces {
// It's possible to get an empty network interface response if the instance is being deleted.
Expand All @@ -206,6 +217,7 @@ func (t *trunkENI) InitTrunk(instance ec2.EC2Instance, podList []v1.Pod) error {
} else {
return fmt.Errorf("failed to verify network interface status attached for %v", *nwInterface.NetworkInterfaceId)
}
trunk = *nwInterface
}
}

Expand All @@ -231,6 +243,41 @@ func (t *trunkENI) InitTrunk(instance ec2.EC2Instance, podList []v1.Pod) error {
return nil
}

// The node already has a trunk; check whether its security groups and subnet match the expected custom networking spec.
expectedSubnetID, expectedSecurityGroups := t.instance.GetCustomNetworkingSpec()
if len(expectedSecurityGroups) > 0 || expectedSubnetID != "" {
slices.Sort(expectedSecurityGroups)
trunkSGs := lo.Map(trunk.Groups, func(g *awsEC2.GroupIdentifier, _ int) string {
return lo.FromPtr(g.GroupId)
})
slices.Sort(trunkSGs)

mismatchedSubnets := expectedSubnetID != lo.FromPtr(trunk.SubnetId)
mismatchedSGs := !slices.Equal(expectedSecurityGroups, trunkSGs)

extraSGsInTrunk, missingSGsInTrunk := lo.Difference(trunkSGs, expectedSecurityGroups)
t.log.Info("Observed trunk ENI config",
"instanceID", t.instance.InstanceID(),
"trunkENIID", lo.FromPtr(trunk.NetworkInterfaceId),
"configuredTrunkSGs", trunkSGs,
"configuredTrunkSubnet", lo.FromPtr(trunk.SubnetId),
"desiredTrunkSGs", expectedSecurityGroups,
"desiredTrunkSubnet", expectedSubnetID,
"mismatchedSGs", mismatchedSGs,
"mismatchedSubnets", mismatchedSubnets,
"missingSGs", missingSGsInTrunk,
"extraSGs", extraSGsInTrunk,
)

if mismatchedSGs {
unreconciledTrunkENICount.WithLabelValues("security_groups").Inc()
}

if mismatchedSubnets {
unreconciledTrunkENICount.WithLabelValues("subnet").Inc()
}
}

// Get the list of branch ENIs
branchInterfaces, err := t.ec2ApiHelper.GetBranchNetworkInterface(&t.trunkENIId, aws.String(t.instance.SubnetID()))
if err != nil {
Expand Down
2 changes: 2 additions & 0 deletions pkg/provider/branch/trunk/trunk_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -645,6 +645,7 @@ func TestTrunkENI_InitTrunk(t *testing.T) {
name: "TrunkExists_WithBranches, verifies no error when trunk exists with branches",
prepare: func(f *fields) {
f.mockInstance.EXPECT().InstanceID().Return(InstanceId)
f.mockInstance.EXPECT().GetCustomNetworkingSpec().Return("", []string{})
f.mockEC2APIHelper.EXPECT().GetInstanceNetworkInterface(&InstanceId).Return(instanceNwInterfaces, nil)
f.mockEC2APIHelper.EXPECT().WaitForNetworkInterfaceStatusChange(&trunkId, awsEc2.AttachmentStatusAttached).Return(nil)
f.mockInstance.EXPECT().SubnetID().Return(SubnetId)
Expand Down Expand Up @@ -674,6 +675,7 @@ func TestTrunkENI_InitTrunk(t *testing.T) {
name: "TrunkExists_DanglingENIs, verifies ENIs are pushed to delete queue if no pod exists",
prepare: func(f *fields) {
f.mockInstance.EXPECT().InstanceID().Return(InstanceId)
f.mockInstance.EXPECT().GetCustomNetworkingSpec().Return("", []string{})
f.mockEC2APIHelper.EXPECT().GetInstanceNetworkInterface(&InstanceId).Return(instanceNwInterfaces, nil)
f.mockEC2APIHelper.EXPECT().WaitForNetworkInterfaceStatusChange(&trunkId, awsEc2.AttachmentStatusAttached).Return(nil)
f.mockInstance.EXPECT().SubnetID().Return(SubnetId)
Expand Down

0 comments on commit de7470a

Please sign in to comment.