Skip to content

Commit

Permalink
Addressed review comments.
Browse files Browse the repository at this point in the history
Added helper func to identify L4 legacy services and flag to run L4 NEG controller.
  • Loading branch information
prameshj committed Jan 16, 2020
1 parent b04d02d commit 40eec7f
Show file tree
Hide file tree
Showing 10 changed files with 72 additions and 35 deletions.
2 changes: 1 addition & 1 deletion cmd/glbc/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -219,7 +219,7 @@ func runControllers(ctx *ingctx.ControllerContext) {
fwc := firewalls.NewFirewallController(ctx, flags.F.NodePortRanges.Values())

// TODO: Refactor NEG to use cloud mocks so ctx.Cloud can be referenced within NewController.
negController := neg.NewController(negtypes.NewAdapter(ctx.Cloud), ctx, lbc.Translator, ctx.ClusterNamer, flags.F.ResyncPeriod, flags.F.NegGCPeriod, flags.F.EnableReadinessReflector, flags.F.RunIngressController)
negController := neg.NewController(negtypes.NewAdapter(ctx.Cloud), ctx, lbc.Translator, ctx.ClusterNamer, flags.F.ResyncPeriod, flags.F.NegGCPeriod, flags.F.EnableReadinessReflector, flags.F.RunIngressController, flags.F.RunL4Controller)

go negController.Run(stopCh)
klog.V(0).Infof("negController started")
Expand Down
17 changes: 14 additions & 3 deletions pkg/annotations/service.go
Original file line number Diff line number Diff line change
Expand Up @@ -163,9 +163,20 @@ func FromService(obj *v1.Service) *Service {
return &Service{obj.Annotations}
}

// ILBAnnotation returns true if L4 ILB annotation is found.
func ILBAnnotation(svc *v1.Service) bool {
return gce.GetLoadBalancerAnnotationType(svc) == gce.LBTypeInternal
// WantsL4ILB checks if the given service requires L4 ILB.
// the function returns a boolean as well as the loadbalancer type(string).
func WantsL4ILB(service *v1.Service) (bool, string) {
if service == nil {
return false, ""
}
if service.Spec.Type != v1.ServiceTypeLoadBalancer {
return false, fmt.Sprintf("Type : %s", service.Spec.Type)
}
ltype := gce.GetLoadBalancerAnnotationType(service)
if ltype == gce.LBTypeInternal {
return true, fmt.Sprintf("Type : %s, LBType : %s", service.Spec.Type, ltype)
}
return false, fmt.Sprintf("Type : %s, LBType : %s", service.Spec.Type, ltype)
}

// ApplicationProtocols returns a map of port (name or number) to the protocol
Expand Down
4 changes: 3 additions & 1 deletion pkg/flags/flags.go
Original file line number Diff line number Diff line change
Expand Up @@ -91,6 +91,7 @@ var (
EnableDeleteUnusedFrontends bool
EnableV2FrontendNamer bool
RunIngressController bool
RunL4Controller bool

LeaderElection LeaderElectionConfiguration
}{}
Expand Down Expand Up @@ -211,7 +212,8 @@ L7 load balancing. CSV values accepted. Example: -node-port-ranges=80,8080,400-5
flag.BoolVar(&F.EnableNonGCPMode, "enable-non-gcp-mode", false, "Set to true when running on a non-GCP cluster.")
flag.BoolVar(&F.EnableDeleteUnusedFrontends, "enable-delete-unused-frontends", false, "Enable deleting unused gce frontend resources.")
flag.BoolVar(&F.EnableV2FrontendNamer, "enable-v2-frontend-namer", false, "Enable v2 ingress frontend naming policy.")
flag.BoolVar(&F.RunIngressController, "run-ingress-controller", true, `Optional, whether or not to run IngressController as part of glbc. If set to false, only the L4ILB controller will be run, if the feature is enabled.`)
flag.BoolVar(&F.RunIngressController, "run-ingress-controller", true, `Optional, whether or not to run IngressController as part of glbc. If set to false, ingress resources will not be processed. Only the L4 Service controller will be run, if that flag is set to true.`)
flag.BoolVar(&F.RunL4Controller, "run-l4-controller", false, `Optional, whether or not to run L4 Service Controller as part of glbc. If set to true, services of Type:LoadBalancer with Internal annotation will be processed by this controller.`)
}

type RateLimitSpecs struct {
Expand Down
25 changes: 9 additions & 16 deletions pkg/neg/controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -48,12 +48,6 @@ import (
"k8s.io/ingress-gce/pkg/utils"
"k8s.io/ingress-gce/pkg/utils/common"
"k8s.io/klog"
"k8s.io/legacy-cloud-providers/gce"
)

var (
// Indicates whether to run NEG controller that processes L4 ILB services
runL4 bool
)

func init() {
Expand All @@ -75,7 +69,6 @@ type Controller struct {
ingressLister cache.Indexer
serviceLister cache.Indexer
client kubernetes.Interface
gceCloud *gce.Cloud
defaultBackendService utils.ServicePort
destinationRuleLister cache.Indexer
destinationRuleClient dynamic.NamespaceableResourceInterface
Expand All @@ -102,6 +95,9 @@ type Controller struct {

// collector collects NEG usage metrics
collector usage.NegMetricsCollector

// runL4 indicates whether to run NEG controller that processes L4 ILB services
runL4 bool
}

// NewController returns a network endpoint group controller.
Expand All @@ -114,6 +110,7 @@ func NewController(
gcPeriod time.Duration,
enableReadinessReflector bool,
runIngress bool,
runL4Controller bool,
) *Controller {
// init event recorder
// TODO: move event recorder initializer to main. Reuse it among controllers.
Expand All @@ -136,7 +133,6 @@ func NewController(

negController := &Controller{
client: ctx.KubeClient,
gceCloud: ctx.Cloud,
manager: manager,
resyncPeriod: resyncPeriod,
gcPeriod: gcPeriod,
Expand All @@ -153,10 +149,7 @@ func NewController(
syncTracker: utils.NewTimeTracker(),
reflector: reflector,
collector: ctx.ControllerMetrics,
}

if ctx.Cloud.AlphaFeatureGate != nil && ctx.Cloud.AlphaFeatureGate.Enabled(gce.AlphaFeatureILBSubsets) {
runL4 = true
runL4: runL4Controller,
}

if runIngress {
Expand Down Expand Up @@ -219,7 +212,7 @@ func NewController(
},
})

if runL4 {
if negController.runL4 {
ctx.NodeInformer.AddEventHandler(cache.ResourceEventHandlerFuncs{
AddFunc: func(obj interface{}) {
node := obj.(*apiv1.Node)
Expand Down Expand Up @@ -415,7 +408,7 @@ func (c *Controller) processService(key string) error {
if err := svcPortInfoMap.Merge(csmSVCPortInfoMap); err != nil {
return fmt.Errorf("failed to merge CSM service PortInfoMap: %v, error: %v", csmSVCPortInfoMap, err)
}
if runL4 {
if c.runL4 {
if err := c.mergeVmPrimaryIpNEGsPortInfo(service, types.NamespacedName{Namespace: namespace, Name: name}, svcPortInfoMap); err != nil {
return err
}
Expand Down Expand Up @@ -502,10 +495,10 @@ func (c *Controller) mergeStandaloneNEGsPortInfo(service *apiv1.Service, name ty

// mergeVmPrimaryIpNEGsPortInfo merges the PortInfo for ILB services using GCE_VM_PRIMARY_IP NEGs into portInfoMap
func (c *Controller) mergeVmPrimaryIpNEGsPortInfo(service *apiv1.Service, name types.NamespacedName, portInfoMap negtypes.PortInfoMap) error {
if !annotations.ILBAnnotation(service) {
if wantsILB, _ := annotations.WantsL4ILB(service); !wantsILB {
return nil
}
if utils.IsLegacyL4ILBService(c.gceCloud, service) {
if utils.IsLegacyL4ILBService(service) {
msg := fmt.Sprintf("Ignoring ILB Service %s, namespace %s as it contains legacy resources created by service controller", service.Name, service.Namespace)
klog.Warning(msg)
c.recorder.Eventf(service, apiv1.EventTypeWarning, "ProcessServiceFailed", msg)
Expand Down
1 change: 1 addition & 0 deletions pkg/neg/controller_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -114,6 +114,7 @@ func newTestController(kubeClient kubernetes.Interface) *Controller {
// TODO(freehan): enable readiness reflector for unit tests
false,
true,
true,
)
return controller
}
Expand Down
35 changes: 26 additions & 9 deletions pkg/neg/syncers/endpoints_calculator.go
Original file line number Diff line number Diff line change
Expand Up @@ -26,8 +26,14 @@ import (
"k8s.io/klog"
)

// LocalL4ILBEndpointGetter implements methods to calculate Network endpoints for VM_PRIMARY_IP NEGs when the service
// LocalL4ILBEndpointGetter implements the NetworkEndpointsCalculator interface.
// It exposes methods to calculate Network endpoints for VM_PRIMARY_IP NEGs when the service
// uses "ExternalTrafficPolicy: Local" mode.
// In this mode, the endpoints of the NEG are calculated by listing the nodes that host the service endpoints(pods)
// for the given service. These candidate nodes picked as is, if the count is less than the subset size limit(250).
// Otherwise, a subset of nodes is selected.
// In a cluster with nodes node1... node 50. If nodes node10 to node 45 run the pods for a given ILB service, all these
// nodes - node10, node 11 ... node45 will be part of the subset.
type LocalL4ILBEndpointsCalculator struct {
nodeLister listers.NodeLister
zoneGetter types.ZoneGetter
Expand All @@ -42,7 +48,7 @@ func NewLocalL4ILBEndpointsCalculator(nodeLister listers.NodeLister, zoneGetter
// CalculateEndpoints determines the endpoints in the NEGs based on the current service endpoints and the current NEGs.
func (l *LocalL4ILBEndpointsCalculator) CalculateEndpoints(ep *v1.Endpoints, currentMap map[string]types.NetworkEndpointSet) (map[string]types.NetworkEndpointSet, types.EndpointPodMap, error) {
// List all nodes where the service endpoints are running. Get a subset of the desired count.
nodeZoneMap := make(map[string][]*v1.Node)
zoneNodeMap := make(map[string][]*v1.Node)
nodeNames := sets.String{}
numEndpoints := 0
for _, curEp := range ep.Subsets {
Expand Down Expand Up @@ -70,18 +76,20 @@ func (l *LocalL4ILBEndpointsCalculator) CalculateEndpoints(ep *v1.Endpoints, cur
klog.Errorf("Unable to find zone for node %s, err %v, skipping", node.Name, err)
continue
}
nodeZoneMap[zone] = append(nodeZoneMap[zone], node)
zoneNodeMap[zone] = append(zoneNodeMap[zone], node)
}
}
if numEndpoints == 0 {
// TODO verify the behavior seen by a client when accessing an ILB whose NEGs have no endpoints.
return nil, nil, nil
}
// This denotes zones where the endpoint pods are running
numZones := len(nodeZoneMap)
numZones := len(zoneNodeMap)
perZoneCount := l.getPerZoneSubsetCount(numZones, numEndpoints)
// Compute the networkEndpoints, with endpointSet size in each zone being atmost `perZoneCount` in size
subsetMap, err := getSubsetPerZone(nodeZoneMap, perZoneCount, l.svcId, currentMap)
// TODO fix this logic to pick upto a total of l.SubsetSizeLimit if there are more than perZoneCount nodes in one
// zone and fewer in another.
subsetMap, err := getSubsetPerZone(zoneNodeMap, perZoneCount, l.svcId, currentMap)
return subsetMap, nil, err
}

Expand All @@ -103,13 +111,20 @@ func (l *LocalL4ILBEndpointsCalculator) getPerZoneSubsetCount(numZones, numEndpo
return numEndpoints / numZones
}

// ClusterL4ILBEndpointGetter implements methods to calculate Network endpoints for VM_PRIMARY_IP NEGs when the service
// ClusterL4ILBEndpointGetter implements the NetworkEndpointsCalculator interface.
// It exposes methods to calculate Network endpoints for VM_PRIMARY_IP NEGs when the service
// uses "ExternalTrafficPolicy: Cluster" mode This is the default mode.
// In this mode, the endpoints of the NEG are calculated by selecting nodes at random. Upto 25(subset size limit in this
// mode) are selected.
type ClusterL4ILBEndpointsCalculator struct {
nodeLister listers.NodeLister
zoneGetter types.ZoneGetter
// nodeLister is used for listing all the nodes in the cluster when calculating the subset.
nodeLister listers.NodeLister
// zoneGetter looks up the zone for a given node when calculating subsets.
zoneGetter types.ZoneGetter
// subsetSizeLimit is the max value of the subset size in this mode.
subsetSizeLimit int
svcId string
// svcId is the unique identifier for the service, that is used as a salt when hashing nodenames.
svcId string
}

func NewClusterL4ILBEndpointsCalculator(nodeLister listers.NodeLister, zoneGetter types.ZoneGetter, svcId string) *ClusterL4ILBEndpointsCalculator {
Expand Down Expand Up @@ -138,6 +153,8 @@ func (l *ClusterL4ILBEndpointsCalculator) CalculateEndpoints(ep *v1.Endpoints, c
// perZoneCount := l.getPerZoneSubsetCount(numZones, utils.NumEndpoints(ep))
perZoneCount := l.getPerZoneSubsetCount(numZones, 0)
// Compute the networkEndpoints, with endpointSet size in each zone being atmost `perZoneCount` in size
// TODO fix this logic to pick upto a total of l.SubsetSizeLimit if there are more than perZoneCount nodes in one
// zone and fewer in another.
subsetMap, err := getSubsetPerZone(nodeZoneMap, perZoneCount, l.svcId, currentMap)
return subsetMap, nil, err
}
Expand Down
5 changes: 4 additions & 1 deletion pkg/neg/types/types.go
Original file line number Diff line number Diff line change
Expand Up @@ -200,13 +200,16 @@ func (p1 PortInfoMap) Merge(p2 PortInfoMap) error {
if existingPortInfo.Subset != portInfo.Subset {
return fmt.Errorf("for service port %v, Subset name in existing map is %q, but the merge map has %q", mapKey, existingPortInfo.Subset, portInfo.Subset)
}
if existingPortInfo.RandomizeEndpoints != portInfo.RandomizeEndpoints {
return fmt.Errorf("For service port %v, Existing map has RandomizeEndpoints %v, but the merge map has %v", mapKey, existingPortInfo.RandomizeEndpoints, portInfo.RandomizeEndpoints)
}
mergedInfo.ReadinessGate = existingPortInfo.ReadinessGate
}
mergedInfo.PortTuple = portInfo.PortTuple
mergedInfo.NegName = portInfo.NegName
// Turn on the readiness gate if one of them is on
mergedInfo.ReadinessGate = mergedInfo.ReadinessGate || portInfo.ReadinessGate
mergedInfo.RandomizeEndpoints = mergedInfo.RandomizeEndpoints || portInfo.RandomizeEndpoints
mergedInfo.RandomizeEndpoints = portInfo.RandomizeEndpoints
mergedInfo.Subset = portInfo.Subset
mergedInfo.SubsetLabels = portInfo.SubsetLabels

Expand Down
11 changes: 11 additions & 0 deletions pkg/utils/namer/namer.go
Original file line number Diff line number Diff line change
Expand Up @@ -463,6 +463,17 @@ func (n *Namer) NEGWithSubset(namespace, name, subset string, port int32) string
return fmt.Sprintf("%s-%s-%s-%s-%s-%s", n.negPrefix(), truncNamespace, truncName, truncPort, truncSubset, negSuffix(n.shortUID(), namespace, name, portStr, subset))
}

// PrimaryIPNEG returns the gce neg name based on the service namespace and name
// NEG naming convention:
//
// {prefix}{version}-{clusterid}-{namespace}-{name}-{hash}
//
// Output name is at most 63 characters. NEG tries to keep as much
// information as possible.
//
// WARNING: Controllers depend on the naming pattern to get the list
// of all NEGs associated with the current cluster. Any modifications
// must be backward compatible.
func (n *Namer) PrimaryIPNEG(namespace, name string) string {
truncFields := TrimFieldsEvenly(maxNEGDescriptiveLabel, namespace, name)
truncNamespace := truncFields[0]
Expand Down
3 changes: 1 addition & 2 deletions pkg/utils/utils.go
Original file line number Diff line number Diff line change
Expand Up @@ -37,7 +37,6 @@ import (
"k8s.io/ingress-gce/pkg/utils/common"
"k8s.io/klog"
"k8s.io/kubernetes/pkg/util/node"
"k8s.io/legacy-cloud-providers/gce"
)

const (
Expand Down Expand Up @@ -431,7 +430,7 @@ func NumEndpoints(ep *api_v1.Endpoints) (result int) {
}

// IsLegacyL4ILBService returns true if the given LoadBalancer service is managed by service controller.
func IsLegacyL4ILBService(g *gce.Cloud, svc *api_v1.Service) bool {
func IsLegacyL4ILBService(svc *api_v1.Service) bool {
for _, key := range svc.ObjectMeta.Finalizers {
if key == common.FinalizerKeyL4V1 {
// service has v1 finalizer, this is handled by service controller code.
Expand Down
4 changes: 2 additions & 2 deletions pkg/utils/utils_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -772,13 +772,13 @@ func TestIsLegacyL4ILBService(t *testing.T) {
},
}
g := gce.NewFakeGCECloud(gce.DefaultTestClusterValues())
if !IsLegacyL4ILBService(g, svc) {
if !IsLegacyL4ILBService(svc) {
t.Errorf("Expected True for Legacy service %s, got False", svc.Name)
}

// Remove the finalizer and ensure the check returns False.
svc.ObjectMeta.Finalizers = nil
if IsLegacyL4ILBService(g, svc) {
if IsLegacyL4ILBService(svc) {
t.Errorf("Expected False for Legacy service %s, got True", svc.Name)
}

Expand Down

0 comments on commit 40eec7f

Please sign in to comment.