Skip to content

Commit

Permalink
Merge pull request #2523 from code-elinka/sync
Browse files Browse the repository at this point in the history
Simplify InstanceGroup.Sync()
  • Loading branch information
k8s-ci-robot authored Apr 11, 2024
2 parents ed3d148 + 98a677f commit a34a810
Show file tree
Hide file tree
Showing 2 changed files with 101 additions and 5 deletions.
13 changes: 10 additions & 3 deletions pkg/instancegroups/controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@ import (
"time"

apiv1 "k8s.io/api/core/v1"
meta_v1 "k8s.io/apimachinery/pkg/apis/meta/v1"
listers "k8s.io/client-go/listers/core/v1"
"k8s.io/client-go/tools/cache"
"k8s.io/ingress-gce/pkg/utils"
Expand Down Expand Up @@ -51,6 +52,12 @@ type ControllerConfig struct {
StopCh <-chan struct{}
}

var defaultNodeObj = &apiv1.Node{
ObjectMeta: meta_v1.ObjectMeta{
Name: "default",
},
}

// NewController returns a new node update controller.
func NewController(config *ControllerConfig, logger klog.Logger) *Controller {
logger = logger.WithName("InstanceGroupsController")
Expand All @@ -65,14 +72,14 @@ func NewController(config *ControllerConfig, logger klog.Logger) *Controller {

config.NodeInformer.AddEventHandler(cache.ResourceEventHandlerFuncs{
AddFunc: func(obj interface{}) {
c.queue.Enqueue(obj)
c.queue.Enqueue(defaultNodeObj)
},
DeleteFunc: func(obj interface{}) {
c.queue.Enqueue(obj)
c.queue.Enqueue(defaultNodeObj)
},
UpdateFunc: func(oldObj, newObj interface{}) {
if nodeStatusChanged(oldObj.(*apiv1.Node), newObj.(*apiv1.Node)) {
c.queue.Enqueue(newObj)
c.queue.Enqueue(defaultNodeObj)
}
},
})
Expand Down
93 changes: 91 additions & 2 deletions pkg/instancegroups/controller_test.go
Original file line number Diff line number Diff line change
@@ -1,11 +1,17 @@
package instancegroups

import (
"context"
"testing"
"time"

"github.com/go-logr/logr"
compute "google.golang.org/api/compute/v1"
api_v1 "k8s.io/api/core/v1"
meta_v1 "k8s.io/apimachinery/pkg/apis/meta/v1"
informerv1 "k8s.io/client-go/informers/core/v1"
"k8s.io/client-go/kubernetes/fake"
"k8s.io/ingress-gce/pkg/utils"
)

func TestNodeStatusChanged(t *testing.T) {
Expand Down Expand Up @@ -58,8 +64,7 @@ func TestNodeStatusChanged(t *testing.T) {
func testNode() *api_v1.Node {
return &api_v1.Node{
ObjectMeta: meta_v1.ObjectMeta{
Namespace: "ns",
Name: "node",
Name: "node",
Annotations: map[string]string{
"key1": "value1",
},
Expand All @@ -79,3 +84,87 @@ func testNode() *api_v1.Node {
},
}
}

func TestSync(t *testing.T) {
config := &ControllerConfig{}
resyncPeriod := 1 * time.Second
fakeKubeClient := fake.NewSimpleClientset()
informer := informerv1.NewNodeInformer(fakeKubeClient, resyncPeriod, utils.NewNamespaceIndexer())
config.NodeInformer = informer
fakeManager := &IGManagerFake{}
config.IGManager = fakeManager
config.HasSynced = func() bool {
return true
}

controller := NewController(config, logr.Logger{})

channel := make(chan struct{})
go informer.Run(channel)
go controller.Run()

var expectedSyncedNodesCounter = 0
firstNode := testNode()
secondNode := testNode()
secondNode.Name = "secondNode"

// Add two nodes
fakeKubeClient.CoreV1().Nodes().Create(context.TODO(), firstNode, meta_v1.CreateOptions{})
// wait time > resync period
time.Sleep(2 * time.Second)
fakeKubeClient.CoreV1().Nodes().Create(context.TODO(), secondNode, meta_v1.CreateOptions{})
// The counter = 1 because it synced only once (for the first Create() call)
expectedSyncedNodesCounter += 1
verifyExpectedSyncerCount(t, fakeManager.syncedNodes, expectedSyncedNodesCounter)

// Update both nodes
firstNode.Annotations["key"] = "true"
firstNode.Spec.Unschedulable = false
secondNode.Annotations["key"] = "true"
fakeKubeClient.CoreV1().Nodes().Update(context.TODO(), firstNode, meta_v1.UpdateOptions{})
fakeKubeClient.CoreV1().Nodes().Update(context.TODO(), secondNode, meta_v1.UpdateOptions{})
time.Sleep(2 * time.Second)
// nodes were updated
expectedSyncedNodesCounter += 1
verifyExpectedSyncerCount(t, fakeManager.syncedNodes, expectedSyncedNodesCounter)

// no real update
fakeKubeClient.CoreV1().Nodes().Update(context.TODO(), firstNode, meta_v1.UpdateOptions{})
// Nothing should change
time.Sleep(2 * time.Second)
verifyExpectedSyncerCount(t, fakeManager.syncedNodes, expectedSyncedNodesCounter)
}

func verifyExpectedSyncerCount(t *testing.T, syncedNodes [][]string, expectedCount int) {
if len(syncedNodes) != expectedCount {
t.Errorf("verifyExpectedSyncerCount(): synced unexpected amount of times (gotCount, expectedCount), (%d, %d)", len(syncedNodes), expectedCount)
}
}

type IGManagerFake struct {
syncedNodes [][]string
}

func (igmf *IGManagerFake) Sync(nodeNames []string) error {
igmf.syncedNodes = append(igmf.syncedNodes, nodeNames)
return nil
}

func (igmf *IGManagerFake) EnsureInstanceGroupsAndPorts(name string, ports []int64) ([]*compute.InstanceGroup, error) {
igmf.syncedNodes = append(igmf.syncedNodes, []string{name})
return []*compute.InstanceGroup{}, nil
}

func (igmf *IGManagerFake) DeleteInstanceGroup(name string) error {
igmf.syncedNodes = append(igmf.syncedNodes, []string{name})
return nil
}

func (igmf *IGManagerFake) Get(name, zone string) (*compute.InstanceGroup, error) {
ig := compute.InstanceGroup{Name: name, Zone: zone}
return &ig, nil
}

func (igmf *IGManagerFake) List() ([]string, error) {
return []string{}, nil
}

0 comments on commit a34a810

Please sign in to comment.