Skip to content

Commit

Permalink
Merge pull request #1707 from panslava/trim-ig-to-1000
Browse files Browse the repository at this point in the history
Truncate nodes list to maximum instance group size (1000) before adding to instance group
  • Loading branch information
k8s-ci-robot authored May 23, 2022
2 parents 9f3089f + c0d9cef commit e24e6c8
Show file tree
Hide file tree
Showing 6 changed files with 130 additions and 70 deletions.
10 changes: 9 additions & 1 deletion pkg/instances/fakes.go
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@ import (
"github.com/GoogleCloudPlatform/k8s-cloud-provider/pkg/cloud/meta"
"google.golang.org/api/compute/v1"
"k8s.io/apimachinery/pkg/util/sets"
"k8s.io/ingress-gce/pkg/flags"
"k8s.io/ingress-gce/pkg/test"
"k8s.io/ingress-gce/pkg/utils"
)
Expand Down Expand Up @@ -138,7 +139,14 @@ func (f *FakeInstanceGroups) AddInstancesToInstanceGroup(name, zone string, inst
return err
}

f.zonesToIGsToInstances[zone][ig].Insert(instanceNames...)
newValue := sets.NewString(f.zonesToIGsToInstances[zone][ig].List()...)
newValue.Insert(instanceNames...)

if len(newValue) > flags.F.MaxIgSize {
return test.FakeGoogleAPIRequestEntityTooLargeError()
}

f.zonesToIGsToInstances[zone][ig] = newValue
return nil
}

Expand Down
18 changes: 18 additions & 0 deletions pkg/instances/instances.go
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@ import (
"google.golang.org/api/compute/v1"
"k8s.io/client-go/tools/record"
"k8s.io/ingress-gce/pkg/events"
"k8s.io/ingress-gce/pkg/flags"
"k8s.io/ingress-gce/pkg/utils/namer"
"k8s.io/klog"

Expand Down Expand Up @@ -334,6 +335,23 @@ func (i *Instances) Sync(nodes []string) (err error) {
}
kubeNodes := sets.NewString(nodes...)

// Individual InstanceGroup has a limit for 1000 instances in it.
// As a result, it's not possible to add more to it.
if len(kubeNodes) > flags.F.MaxIgSize {
// List() will return a sorted list so the kubeNodesList truncation will have a stable set of nodes.
kubeNodesList := kubeNodes.List()

// Store first 10 truncated nodes for logging
truncatedNodesSample := kubeNodesList[flags.F.MaxIgSize:]
maxTruncatedNodesSampleSize := 10
if len(truncatedNodesSample) > maxTruncatedNodesSampleSize {
truncatedNodesSample = truncatedNodesSample[:maxTruncatedNodesSampleSize]
}

klog.Warningf("Total number of kubeNodes: %d, truncating to maximum Instance Group size = %d. Instance group name: %s. First %d truncated instances: %v", len(kubeNodesList), flags.F.MaxIgSize, igName, len(truncatedNodesSample), truncatedNodesSample)
kubeNodes = sets.NewString(kubeNodesList[:flags.F.MaxIgSize]...)
}

// A node deleted via kubernetes could still exist as a gce vm. We don't
// want to route requests to it. Similarly, a node added to kubernetes
// needs to get added to the instance group so we do route requests to it.
Expand Down
161 changes: 93 additions & 68 deletions pkg/instances/instances_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@ import (

"google.golang.org/api/compute/v1"
"k8s.io/apimachinery/pkg/util/sets"
"k8s.io/ingress-gce/pkg/flags"
"k8s.io/ingress-gce/pkg/test"
"k8s.io/ingress-gce/pkg/utils/namer"
)
Expand All @@ -41,80 +42,104 @@ func newNodePool(f *FakeInstanceGroups, zone string) NodePool {
}

func TestNodePoolSync(t *testing.T) {
ig := &compute.InstanceGroup{Name: defaultNamer.InstanceGroup()}
fakeIGs := NewFakeInstanceGroups(map[string]IGsToInstances{
defaultZone: {
ig: sets.NewString("n1", "n2"),
},
})
pool := newNodePool(fakeIGs, defaultZone)
pool.EnsureInstanceGroupsAndPorts(defaultNamer.InstanceGroup(), []int64{80})

// KubeNodes: n1
// GCENodes: n1, n2
// Remove n2 from the instance group.

fakeIGs.calls = []int{}
kubeNodes := sets.NewString("n1")
pool.Sync(kubeNodes.List())
instancesList, err := fakeIGs.ListInstancesInInstanceGroup(ig.Name, defaultZone, allInstances)
if err != nil {
t.Fatalf("Error while listing instances in instance group: %v", err)
}
instances, err := test.InstancesListToNameSet(instancesList)
if err != nil {
t.Fatalf("Error while getting instances in instance group. IG: %v Error: %v", ig, err)
}
if instances.Len() != kubeNodes.Len() || !kubeNodes.IsSuperset(instances) {
t.Fatalf("%v != %v", kubeNodes, instances)
}
flags.F.MaxIgSize = 1000

// KubeNodes: n1, n2
// GCENodes: n1
// Try to add n2 to the instance group.
names1001 := make([]string, flags.F.MaxIgSize+1)
for i := 1; i <= flags.F.MaxIgSize+1; i++ {
names1001[i-1] = fmt.Sprintf("n%d", i)
}

fakeIGs = NewFakeInstanceGroups(map[string]IGsToInstances{
defaultZone: {
ig: sets.NewString("n1"),
testCases := []struct {
gceNodes sets.String
kubeNodes sets.String
shouldSkipSync bool
}{
{
gceNodes: sets.NewString("n1"),
kubeNodes: sets.NewString("n1", "n2"),
},
{
gceNodes: sets.NewString("n1, n2"),
kubeNodes: sets.NewString("n1"),
},
{
gceNodes: sets.NewString("n1", "n2"),
kubeNodes: sets.NewString("n1", "n2"),
shouldSkipSync: true,
},
{
gceNodes: sets.NewString(),
kubeNodes: sets.NewString(names1001...),
},
{
gceNodes: sets.NewString("n0", "n1"),
kubeNodes: sets.NewString(names1001...),
},
})
pool = newNodePool(fakeIGs, defaultZone)
pool.EnsureInstanceGroupsAndPorts(defaultNamer.InstanceGroup(), []int64{80})

fakeIGs.calls = []int{}
kubeNodes = sets.NewString("n1", "n2")
pool.Sync(kubeNodes.List())
instancesList, err = fakeIGs.ListInstancesInInstanceGroup(ig.Name, defaultZone, allInstances)
if err != nil {
t.Fatalf("Error while listing instances in instance group: %v", err)
}
instances, err = test.InstancesListToNameSet(instancesList)
if err != nil {
t.Fatalf("Error while getting instances in instance group. IG: %v Error: %v", ig, err)
}
if instances.Len() != kubeNodes.Len() ||
!kubeNodes.IsSuperset(instances) {
t.Fatalf("%v != %v", kubeNodes, instances)
}

// KubeNodes: n1, n2
// GCENodes: n1, n2
// Do nothing.
for _, testCase := range testCases {
// create fake gce node pool with existing gceNodes
ig := &compute.InstanceGroup{Name: defaultNamer.InstanceGroup()}
fakeGCEInstanceGroups := NewFakeInstanceGroups(map[string]IGsToInstances{
defaultZone: {
ig: testCase.gceNodes,
},
})

pool := newNodePool(fakeGCEInstanceGroups, defaultZone)

igName := defaultNamer.InstanceGroup()
ports := []int64{80}
_, err := pool.EnsureInstanceGroupsAndPorts(igName, ports)
if err != nil {
t.Fatalf("pool.EnsureInstanceGroupsAndPorts(%s, %v) returned error %v, want nil", igName, ports, err)
}

fakeIGs = NewFakeInstanceGroups(map[string]IGsToInstances{
defaultZone: {
ig: sets.NewString("n1", "n2"),
},
})
pool = newNodePool(fakeIGs, defaultZone)
pool.EnsureInstanceGroupsAndPorts(defaultNamer.InstanceGroup(), []int64{80})

fakeIGs.calls = []int{}
kubeNodes = sets.NewString("n1", "n2")
pool.Sync(kubeNodes.List())
if len(fakeIGs.calls) != 0 {
t.Fatalf(
"Did not expect any calls, got %+v", fakeIGs.calls)
// run sync with expected kubeNodes
apiCallsCountBeforeSync := len(fakeGCEInstanceGroups.calls)
err = pool.Sync(testCase.kubeNodes.List())
if err != nil {
t.Fatalf("pool.Sync(%v) returned error %v, want nil", testCase.kubeNodes.List(), err)
}

// run assertions
apiCallsCountAfterSync := len(fakeGCEInstanceGroups.calls)
if testCase.shouldSkipSync && apiCallsCountBeforeSync != apiCallsCountAfterSync {
t.Errorf("Should skip sync. apiCallsCountBeforeSync = %d, apiCallsCountAfterSync = %d", apiCallsCountBeforeSync, apiCallsCountAfterSync)
}

instancesList, err := fakeGCEInstanceGroups.ListInstancesInInstanceGroup(ig.Name, defaultZone, allInstances)
if err != nil {
t.Fatalf("fakeGCEInstanceGroups.ListInstancesInInstanceGroup(%s, %s, %s) returned error %v, want nil", ig.Name, defaultZone, allInstances, err)
}
instances, err := test.InstancesListToNameSet(instancesList)
if err != nil {
t.Fatalf("test.InstancesListToNameSet(%v) returned error %v, want nil", ig, err)
}

expectedInstancesSize := testCase.kubeNodes.Len()
if testCase.kubeNodes.Len() > flags.F.MaxIgSize {
// If kubeNodes bigger than maximum instance group size, resulted instances
// should be truncated to flags.F.MaxIgSize
expectedInstancesSize = flags.F.MaxIgSize
}
if instances.Len() != expectedInstancesSize {
t.Errorf("instances.Len() = %d not equal expectedInstancesSize = %d", instances.Len(), expectedInstancesSize)
}
if !testCase.kubeNodes.IsSuperset(instances) {
t.Errorf("kubeNodes = %v is not superset of instances = %v", testCase.kubeNodes, instances)
}

// call sync one more time and check that it will be no-op and will not cause any api calls
apiCallsCountBeforeSync = len(fakeGCEInstanceGroups.calls)
err = pool.Sync(testCase.kubeNodes.List())
if err != nil {
t.Fatalf("pool.Sync(%v) returned error %v, want nil", testCase.kubeNodes.List(), err)
}
apiCallsCountAfterSync = len(fakeGCEInstanceGroups.calls)
if apiCallsCountBeforeSync != apiCallsCountAfterSync {
t.Errorf("Should skip sync if called second time with the same kubeNodes. apiCallsCountBeforeSync = %d, apiCallsCountAfterSync = %d", apiCallsCountBeforeSync, apiCallsCountAfterSync)
}
}
}

Expand Down
1 change: 0 additions & 1 deletion pkg/l4lb/l4netlbcontroller.go
Original file line number Diff line number Diff line change
Expand Up @@ -388,7 +388,6 @@ func (lc *L4NetLBController) ensureBackendLinking(port utils.ServicePort) error
}

func (lc *L4NetLBController) ensureInstanceGroups(service *v1.Service, nodeNames []string) error {
// TODO(kl52752) implement limit for 1000 nodes in instance group
// TODO(kl52752) Move instance creation and deletion logic to NodeController
// to avoid race condition between controllers
_, _, nodePorts, _ := utils.GetPortsAndProtocol(service.Spec.Ports)
Expand Down
5 changes: 5 additions & 0 deletions pkg/l4lb/l4netlbcontroller_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,7 @@ import (
"time"

"google.golang.org/api/googleapi"
"k8s.io/ingress-gce/pkg/flags"

"github.com/GoogleCloudPlatform/k8s-cloud-provider/pkg/cloud"
"github.com/GoogleCloudPlatform/k8s-cloud-provider/pkg/cloud/meta"
Expand Down Expand Up @@ -581,6 +582,8 @@ func TestInternalLoadBalancerShouldNotBeProcessByL4NetLBController(t *testing.T)
}

func TestProcessServiceCreationFailed(t *testing.T) {
flags.F.MaxIgSize = 1000

for _, param := range []struct {
addMockFunc func(*cloud.MockGCE)
expectedError string
Expand Down Expand Up @@ -670,6 +673,8 @@ func TestProcessServiceDeletionFailed(t *testing.T) {
}

func TestProcessServiceUpdate(t *testing.T) {
flags.F.MaxIgSize = 1000

for _, param := range []struct {
Update func(*v1.Service)
CheckResult func(*L4NetLBController, *v1.Service) error
Expand Down
5 changes: 5 additions & 0 deletions pkg/test/utils.go
Original file line number Diff line number Diff line change
Expand Up @@ -490,6 +490,11 @@ func FakeGoogleAPIConflictErr() *googleapi.Error {
return &googleapi.Error{Code: http.StatusConflict}
}

// FakeGoogleAPIRequestEntityTooLargeError returns a googleapi.Error carrying
// HTTP status 413 (Request Entity Too Large), for tests that simulate the
// GCE API rejecting an oversized request (e.g. an instance group over its
// maximum size).
func FakeGoogleAPIRequestEntityTooLargeError() *googleapi.Error {
	tooLargeErr := &googleapi.Error{
		Code: http.StatusRequestEntityTooLarge,
	}
	return tooLargeErr
}

func InstancesListToNameSet(instancesList []*compute.InstanceWithNamedPorts) (sets.String, error) {
instancesSet := sets.NewString()
for _, instance := range instancesList {
Expand Down

0 comments on commit e24e6c8

Please sign in to comment.