Skip to content

Commit

Permalink
Merge pull request #1973 from swetharepakula/remove-eps-flag
Browse files Browse the repository at this point in the history
Only use endpoint slices to generate network endpoints
  • Loading branch information
k8s-ci-robot authored Feb 27, 2023
2 parents 4f59808 + ca87598 commit 5c07837
Show file tree
Hide file tree
Showing 17 changed files with 872 additions and 1,514 deletions.
3 changes: 0 additions & 3 deletions cmd/glbc/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -198,7 +198,6 @@ func main() {
EnableASMConfigMap: flags.F.EnableASMConfigMapBasedConfig,
ASMConfigMapNamespace: flags.F.ASMConfigMapBasedConfigNamespace,
ASMConfigMapName: flags.F.ASMConfigMapBasedConfigCMName,
EndpointSlicesEnabled: flags.F.EnableEndpointSlices,
MaxIGSize: flags.F.MaxIGSize,
EnableL4ILBDualStack: flags.F.EnableL4ILBDualStack,
EnableL4NetLBDualStack: flags.F.EnableL4NetLBDualStack,
Expand Down Expand Up @@ -323,7 +322,6 @@ func runControllers(ctx *ingctx.ControllerContext) {
ctx.ServiceInformer,
ctx.PodInformer,
ctx.NodeInformer,
ctx.EndpointInformer,
ctx.EndpointSliceInformer,
ctx.SvcNegInformer,
ctx.HasSynced,
Expand All @@ -341,7 +339,6 @@ func runControllers(ctx *ingctx.ControllerContext) {
flags.F.EnableNonGCPMode,
enableAsm,
asmServiceNEGSkipNamespaces,
flags.F.EnableEndpointSlices,
klog.TODO(), // TODO(#1761): Replace this with a top level logger configuration once one is available.
)

Expand Down
33 changes: 6 additions & 27 deletions pkg/context/context.go
Original file line number Diff line number Diff line change
Expand Up @@ -85,9 +85,7 @@ type ControllerContext struct {
FrontendConfigInformer cache.SharedIndexInformer
PodInformer cache.SharedIndexInformer
NodeInformer cache.SharedIndexInformer
EndpointInformer cache.SharedIndexInformer
EndpointSliceInformer cache.SharedIndexInformer
UseEndpointSlices bool
ConfigMapInformer cache.SharedIndexInformer
SvcNegInformer cache.SharedIndexInformer
IngClassInformer cache.SharedIndexInformer
Expand Down Expand Up @@ -124,7 +122,6 @@ type ControllerContextConfig struct {
EnableASMConfigMap bool
ASMConfigMapNamespace string
ASMConfigMapName string
EndpointSlicesEnabled bool
MaxIGSize int
EnableL4ILBDualStack bool
EnableL4NetLBDualStack bool
Expand Down Expand Up @@ -180,25 +177,18 @@ func NewControllerContext(
context.RegionalCluster = true
}

context.UseEndpointSlices = config.EndpointSlicesEnabled
// Do not trigger periodic resync on Endpoints or EndpointSlices object.
// Do not trigger periodic resync on EndpointSlices object.
// This aims improve NEG controller performance by avoiding unnecessary NEG sync that triggers for each NEG syncer.
// As periodic resync may temporary starve NEG API ratelimit quota.
if config.EndpointSlicesEnabled {
context.EndpointSliceInformer = discoveryinformer.NewEndpointSliceInformer(kubeClient, config.Namespace, 0,
cache.Indexers{cache.NamespaceIndex: cache.MetaNamespaceIndexFunc, endpointslices.EndpointSlicesByServiceIndex: endpointslices.EndpointSlicesByServiceFunc})
} else {
context.EndpointInformer = informerv1.NewEndpointsInformer(kubeClient, config.Namespace, 0, utils.NewNamespaceIndexer())
}
context.EndpointSliceInformer = discoveryinformer.NewEndpointSliceInformer(kubeClient, config.Namespace, 0,
cache.Indexers{cache.NamespaceIndex: cache.MetaNamespaceIndexFunc, endpointslices.EndpointSlicesByServiceIndex: endpointslices.EndpointSlicesByServiceFunc})

context.Translator = translator.NewTranslator(
context.ServiceInformer,
context.BackendConfigInformer,
context.NodeInformer,
context.PodInformer,
context.EndpointInformer,
context.EndpointSliceInformer,
context.UseEndpointSlices,
context.KubeClient,
)
context.InstancePool = instancegroups.NewManager(&instancegroups.ManagerConfig{
Expand Down Expand Up @@ -249,14 +239,7 @@ func (ctx *ControllerContext) HasSynced() bool {
ctx.PodInformer.HasSynced,
ctx.NodeInformer.HasSynced,
ctx.SvcNegInformer.HasSynced,
}

if ctx.EndpointInformer != nil {
funcs = append(funcs, ctx.EndpointInformer.HasSynced)
}

if ctx.EndpointSliceInformer != nil {
funcs = append(funcs, ctx.EndpointSliceInformer.HasSynced)
ctx.EndpointSliceInformer.HasSynced,
}

if ctx.FrontendConfigInformer != nil {
Expand Down Expand Up @@ -336,12 +319,8 @@ func (ctx *ControllerContext) Start(stopCh chan struct{}) {
go ctx.ServiceInformer.Run(stopCh)
go ctx.PodInformer.Run(stopCh)
go ctx.NodeInformer.Run(stopCh)
if ctx.EndpointInformer != nil {
go ctx.EndpointInformer.Run(stopCh)
}
if ctx.EndpointSliceInformer != nil {
go ctx.EndpointSliceInformer.Run(stopCh)
}
go ctx.EndpointSliceInformer.Run(stopCh)

if ctx.BackendConfigInformer != nil {
go ctx.BackendConfigInformer.Run(stopCh)
}
Expand Down
43 changes: 1 addition & 42 deletions pkg/controller/translator/translator.go
Original file line number Diff line number Diff line change
Expand Up @@ -63,18 +63,14 @@ func NewTranslator(serviceInformer cache.SharedIndexInformer,
backendConfigInformer cache.SharedIndexInformer,
nodeInformer cache.SharedIndexInformer,
podInformer cache.SharedIndexInformer,
endpointInformer cache.SharedIndexInformer,
endpointSliceInformer cache.SharedIndexInformer,
useEndpointSlices bool,
kubeClient kubernetes.Interface) *Translator {
return &Translator{
serviceInformer,
backendConfigInformer,
nodeInformer,
podInformer,
endpointInformer,
endpointSliceInformer,
useEndpointSlices,
kubeClient,
}
}
Expand All @@ -85,9 +81,7 @@ type Translator struct {
BackendConfigInformer cache.SharedIndexInformer
NodeInformer cache.SharedIndexInformer
PodInformer cache.SharedIndexInformer
EndpointInformer cache.SharedIndexInformer
EndpointSliceInformer cache.SharedIndexInformer
UseEndpointSlices bool
KubeClient kubernetes.Interface
}

Expand Down Expand Up @@ -492,11 +486,7 @@ func (t *Translator) GatherEndpointPorts(svcPorts []utils.ServicePort) []string
if p.TargetPort.Type == intstr.Int {
endpointPorts = []int{p.TargetPort.IntValue()}
} else {
if t.UseEndpointSlices {
endpointPorts = listEndpointTargetPortsFromEndpointSlices(t.EndpointSliceInformer.GetIndexer(), p.ID.Service.Namespace, p.ID.Service.Name, p.PortName)
} else {
endpointPorts = listEndpointTargetPortsFromEndpoints(t.EndpointInformer.GetIndexer(), p.ID.Service.Namespace, p.ID.Service.Name, p.PortName)
}
endpointPorts = listEndpointTargetPortsFromEndpointSlices(t.EndpointSliceInformer.GetIndexer(), p.ID.Service.Namespace, p.ID.Service.Name, p.PortName)
}
for _, ep := range endpointPorts {
portMap[int64(ep)] = true
Expand Down Expand Up @@ -609,37 +599,6 @@ func listEndpointTargetPortsFromEndpointSlices(indexer cache.Indexer, namespace,
return ret
}

// finds the actual target port behind named target port, the name of the target port is the same as service port name
func listEndpointTargetPortsFromEndpoints(indexer cache.Indexer, namespace, name, svcPortName string) []int {
ep, exists, err := indexer.Get(
&api_v1.Endpoints{
ObjectMeta: meta_v1.ObjectMeta{
Name: name,
Namespace: namespace,
},
},
)

if !exists {
klog.Errorf("Endpoint object %v/%v does not exist.", namespace, name)
return []int{}
}
if err != nil {
klog.Errorf("Failed to retrieve endpoint object %v/%v: %v", namespace, name, err)
return []int{}
}

ret := []int{}
for _, subset := range ep.(*api_v1.Endpoints).Subsets {
for _, port := range subset.Ports {
if port.Protocol == api_v1.ProtocolTCP && port.Name == svcPortName {
ret = append(ret, int(port.Port))
}
}
}
return ret
}

// orderedPods sorts a list of Pods by creation timestamp, using their names as a tie breaker.
type orderedPods []*api_v1.Pod

Expand Down
43 changes: 13 additions & 30 deletions pkg/controller/translator/translator_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -63,10 +63,10 @@ var (
)

func fakeTranslator() *Translator {
return configuredFakeTranslator(false)
return configuredFakeTranslator()
}

func configuredFakeTranslator(useEndpointSlices bool) *Translator {
func configuredFakeTranslator() *Translator {
client := fake.NewSimpleClientset()
backendConfigClient := backendconfigclient.NewSimpleClientset()
namespace := apiv1.NamespaceAll
Expand All @@ -76,22 +76,14 @@ func configuredFakeTranslator(useEndpointSlices bool) *Translator {
BackendConfigInformer := informerbackendconfig.NewBackendConfigInformer(backendConfigClient, namespace, resyncPeriod, utils.NewNamespaceIndexer())
PodInformer := informerv1.NewPodInformer(client, namespace, resyncPeriod, utils.NewNamespaceIndexer())
NodeInformer := informerv1.NewNodeInformer(client, resyncPeriod, utils.NewNamespaceIndexer())
var EndpointSliceInformer cache.SharedIndexInformer
var EndpointInformer cache.SharedIndexInformer
if useEndpointSlices {
EndpointSliceInformer = discoveryinformer.NewEndpointSliceInformer(client, namespace, 0,
cache.Indexers{cache.NamespaceIndex: cache.MetaNamespaceIndexFunc, endpointslices.EndpointSlicesByServiceIndex: endpointslices.EndpointSlicesByServiceFunc})
} else {
EndpointInformer = informerv1.NewEndpointsInformer(client, namespace, 0, utils.NewNamespaceIndexer())
}
EndpointSliceInformer := discoveryinformer.NewEndpointSliceInformer(client, namespace, 0,
cache.Indexers{cache.NamespaceIndex: cache.MetaNamespaceIndexFunc, endpointslices.EndpointSlicesByServiceIndex: endpointslices.EndpointSlicesByServiceFunc})
return NewTranslator(
ServiceInformer,
BackendConfigInformer,
NodeInformer,
PodInformer,
EndpointInformer,
EndpointSliceInformer,
useEndpointSlices,
client,
)
}
Expand Down Expand Up @@ -933,25 +925,16 @@ func TestGatherEndpointPorts(t *testing.T) {

for _, tc := range testCases {
t.Run(tc.desc, func(t *testing.T) {
for _, useSlices := range []bool{false, true} {
translator := configuredFakeTranslator(useSlices)
// Add endpoints or endpoint slices to informers.
if useSlices {
endpointSliceLister := translator.EndpointSliceInformer.GetIndexer()
for _, slice := range tc.endpointSlices {
endpointSliceLister.Add(slice)
}
} else {
endpointLister := translator.EndpointInformer.GetIndexer()
for _, ep := range tc.endpoints {
endpointLister.Add(ep)
}
}
translator := configuredFakeTranslator()
// Add endpoints or endpoint slices to informers.
endpointSliceLister := translator.EndpointSliceInformer.GetIndexer()
for _, slice := range tc.endpointSlices {
endpointSliceLister.Add(slice)
}

gotPorts := translator.GatherEndpointPorts(tc.svcPorts)
if !sets.NewString(gotPorts...).Equal(sets.NewString(tc.expectedPorts...)) {
t.Errorf("GatherEndpointPorts() = %v, expected %v (using slices: %v)", gotPorts, tc.expectedPorts, useSlices)
}
gotPorts := translator.GatherEndpointPorts(tc.svcPorts)
if !sets.NewString(gotPorts...).Equal(sets.NewString(tc.expectedPorts...)) {
t.Errorf("GatherEndpointPorts() = %v, expected %v", gotPorts, tc.expectedPorts)
}
})
}
Expand Down
1 change: 0 additions & 1 deletion pkg/flags/flags.go
Original file line number Diff line number Diff line change
Expand Up @@ -253,7 +253,6 @@ L7 load balancing. CSV values accepted. Example: -node-port-ranges=80,8080,400-5
flag.StringVar(&F.GKEClusterHash, "gke-cluster-hash", "", "The cluster hash of the GKE cluster this Ingress Controller will be interacting with")
flag.StringVar(&F.GKEClusterType, "gke-cluster-type", "ZONAL", "The cluster type of the GKE cluster this Ingress Controller will be interacting with")
flag.BoolVar(&F.EnableTrafficScaling, "enable-traffic-scaling", false, "Enable support for Service {max-rate-per-endpoint, capacity-scaler}")
flag.BoolVar(&F.EnableEndpointSlices, "enable-endpoint-slices", false, "Enable using Endpoint Slices API instead of Endpoints API")
flag.BoolVar(&F.EnablePinhole, "enable-pinhole", false, "Enable Pinhole firewall feature")
flag.BoolVar(&F.EnableL4ILBDualStack, "enable-l4ilb-dual-stack", false, "Enable Dual-Stack handling for L4 Internal Load Balancers")
flag.BoolVar(&F.EnableL4NetLBDualStack, "enable-l4netlb-dual-stack", false, "Enable Dual-Stack handling for L4 External Load Balancers")
Expand Down
48 changes: 8 additions & 40 deletions pkg/neg/controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -111,7 +111,6 @@ func NewController(
serviceInformer cache.SharedIndexInformer,
podInformer cache.SharedIndexInformer,
nodeInformer cache.SharedIndexInformer,
endpointInformer cache.SharedIndexInformer,
endpointSliceInformer cache.SharedIndexInformer,
svcNegInformer cache.SharedIndexInformer,
hasSynced func() bool,
Expand All @@ -129,7 +128,6 @@ func NewController(
enableNonGcpMode bool,
enableAsm bool,
asmServiceNEGSkipNamespaces []string,
enableEndpointSlices bool,
logger klog.Logger,
) *Controller {
logger = logger.WithName("NEGController")
Expand All @@ -153,13 +151,6 @@ func NewController(
recorder := eventBroadcaster.NewRecorder(negScheme,
apiv1.EventSource{Component: "neg-controller"})

var endpointIndexer, endpointSliceIndexer cache.Indexer
if enableEndpointSlices {
endpointSliceIndexer = endpointSliceInformer.GetIndexer()
} else {
endpointIndexer = endpointInformer.GetIndexer()
}

syncerMetrics := metrics.NewNegMetricsCollector(flags.F.NegMetricsExportInterval, logger)
manager := newSyncerManager(
namer,
Expand All @@ -170,13 +161,11 @@ func NewController(
kubeSystemUID,
podInformer.GetIndexer(),
serviceInformer.GetIndexer(),
endpointIndexer,
endpointSliceIndexer,
endpointSliceInformer.GetIndexer(),
nodeInformer.GetIndexer(),
svcNegInformer.GetIndexer(),
syncerMetrics,
enableNonGcpMode,
enableEndpointSlices,
logger)

var reflector readiness.Reflector
Expand Down Expand Up @@ -269,24 +258,13 @@ func NewController(
negController.enqueueService(cur)
},
})
if enableEndpointSlices {
endpointSliceInformer.AddEventHandler(cache.ResourceEventHandlerFuncs{
AddFunc: negController.enqueueEndpointSlice,
DeleteFunc: negController.enqueueEndpointSlice,
UpdateFunc: func(old, cur interface{}) {
negController.enqueueEndpointSlice(cur)
},
})
} else {
endpointInformer.AddEventHandler(cache.ResourceEventHandlerFuncs{
AddFunc: negController.enqueueEndpoint,
DeleteFunc: negController.enqueueEndpoint,
UpdateFunc: func(old, cur interface{}) {
negController.enqueueEndpoint(cur)
},
})
}

endpointSliceInformer.AddEventHandler(cache.ResourceEventHandlerFuncs{
AddFunc: negController.enqueueEndpointSlice,
DeleteFunc: negController.enqueueEndpointSlice,
UpdateFunc: func(old, cur interface{}) {
negController.enqueueEndpointSlice(cur)
},
})
nodeInformer.AddEventHandler(cache.ResourceEventHandlerFuncs{
AddFunc: func(obj interface{}) {
node := obj.(*apiv1.Node)
Expand Down Expand Up @@ -722,16 +700,6 @@ func (c *Controller) handleErr(err error, key interface{}) {
c.serviceQueue.AddRateLimited(key)
}

func (c *Controller) enqueueEndpoint(obj interface{}) {
key, err := cache.DeletionHandlingMetaNamespaceKeyFunc(obj)
if err != nil {
c.logger.Error(err, "Failed to generate endpoint key")
return
}
c.logger.V(3).Info("Adding Endpoints to endpointQueue for processing", "endpoints", key)
c.endpointQueue.Add(key)
}

func (c *Controller) enqueueEndpointSlice(obj interface{}) {
endpointSlice, ok := obj.(*discovery.EndpointSlice)
if !ok {
Expand Down
Loading

0 comments on commit 5c07837

Please sign in to comment.