Skip to content


enable readiness reflector for Standalone NEG
Browse files Browse the repository at this point in the history
  • Loading branch information
freehan committed Nov 12, 2019
1 parent 011cb28 commit 8fb6d48
Show file tree
Hide file tree
Showing 6 changed files with 344 additions and 134 deletions.
2 changes: 1 addition & 1 deletion pkg/neg/controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -416,7 +416,7 @@ func (c *Controller) mergeStandaloneNEGsPortInfo(service *apiv1.Service, name ty
return err

if err := portInfoMap.Merge(negtypes.NewPortInfoMap(name.Namespace, name.Name, exposedNegSvcPort, c.namer, false)); err != nil {
if err := portInfoMap.Merge(negtypes.NewPortInfoMap(name.Namespace, name.Name, exposedNegSvcPort, c.namer /*readinessGate*/, true)); err != nil {
return fmt.Errorf("failed to merge service ports exposed as standalone NEGs (%v) into ingress referenced service ports (%v): %v", exposedNegSvcPort, portInfoMap, err)
Expand Down
145 changes: 102 additions & 43 deletions pkg/neg/readiness/poller.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,14 +18,15 @@ package readiness

import (
utilerrors ""
negtypes ""

Expand All @@ -48,10 +49,11 @@ func (n negMeta) String() string {

// podStatusPatcher interface allows patching pod status
type podStatusPatcher interface {
// syncPod patches the neg condition in the pod status to be True.
// key is the key to the pod. It is the namespaced name in the format of "namespace/name"
// negName is the name of the NEG resource
syncPod(key, negName string) error
// syncPod syncs the NEG readiness gate condition of the given pod.
// podKey is the key to the pod. It is the namespaced name in the format of "namespace/name"
// neg is the key of the NEG resource
// backendService is the key of the BackendService resource.
syncPod(podKey string, neg, backendService *meta.Key) error

// pollTarget is the target for polling
Expand Down Expand Up @@ -135,69 +137,126 @@ func (p *poller) Poll(key negMeta) (retry bool, err error) {
defer p.unMarkPolling(key)

// TODO(freehan): refactor errList from pkg/neg/syncers to be reused here
var errList []error
klog.V(2).Infof("polling NEG %q in zone %q", key.Name, key.Zone)
// TODO(freehan): filter the NEs that are in interest once the API supports it
res, err := p.negCloud.ListNetworkEndpoints(key.Name, key.Zone /*showHealthStatus*/, true, key.SyncerKey.GetAPIVersion())
if err != nil {
return true, err

// Traverse the response and check if the endpoints in interest are HEALTHY
func() {
defer p.lock.Unlock()
var healthyCount int
for _, r := range res {
healthy, err := p.processHealthStatus(key, r)
if healthy && err == nil {
return p.processHealthStatus(key, res)

// processHealthStatus updates Pod readiness gates based on the input health status response.
// We update the pod (using the patcher) when:
// 1. the endpoint is considered healthy by the health check of at least one backend service, or
// 2. the NEG is not associated with any health checks.
// It returns true if retry is needed.
func (p *poller) processHealthStatus(key negMeta, healthStatuses []*composite.NetworkEndpointWithHealthStatus) (bool, error) {
defer p.lock.Unlock()
klog.V(4).Infof("processHealthStatus(%q, %+v)", key.String(), healthStatuses)

var (
errList []error
// healthChecked indicates whether at least one of the endpoints in the response has a health status.
// If a NEG is attached to a Backend Service with a health check, all endpoints
// in the NEG will be health checked. However, for new endpoints, it may take a while for the
// health check to start. The assumption is that if at least one of the endpoints in a NEG has a
// health status, all the endpoints in the NEG should have a health status eventually.
healthChecked bool
// patchCount is the number of pods that got patched
patchCount int
unhealthyPods []types.NamespacedName

for _, healthStatus := range healthStatuses {
if healthStatus == nil {
klog.Warningf("healthStatus is nil from response %+v", healthStatuses)

if healthStatus.NetworkEndpoint == nil {
klog.Warningf("Health status has nil associated network endpoint: %v", healthStatus)

healthChecked = healthChecked || hasHealthStatus(healthStatus)

ne := negtypes.NetworkEndpoint{
IP: healthStatus.NetworkEndpoint.IpAddress,
Port: strconv.FormatInt(healthStatus.NetworkEndpoint.Port, 10),
Node: healthStatus.NetworkEndpoint.Instance,

podName, ok := p.getPod(key, ne)
if !ok {
// The pod is not in interest. Skip

bsKey := getHealthyBackendService(healthStatus)
if bsKey == nil {
unhealthyPods = append(unhealthyPods, podName)
} else {
err := p.patcher.syncPod(keyFunc(podName.Namespace, podName.Name), meta.ZonalKey(key.Name, key.Zone), bsKey)
if err != nil {
errList = append(errList, err)
} else {
if healthyCount != len(p.pollMap[key].endpointMap) {
retry = true
return retry, utilerrors.NewAggregate(errList)

// processHealthStatus evaluates the health status of the input network endpoint.
// Assumes p.lock is held when calling this method.
func (p *poller) processHealthStatus(key negMeta, healthStatus *composite.NetworkEndpointWithHealthStatus) (healthy bool, err error) {
ne := negtypes.NetworkEndpoint{
IP: healthStatus.NetworkEndpoint.IpAddress,
Port: strconv.FormatInt(healthStatus.NetworkEndpoint.Port, 10),
Node: healthStatus.NetworkEndpoint.Instance,
podName, ok := p.getPod(key, ne)
if !ok {
return false, nil

// If the NEG is not health checked, signal the patcher to mark the unhealthy pods as Ready.
// This is most likely because no health check is configured for the NEG, and hence none of the
// endpoints in the NEG has a health status.
if !healthChecked {
for _, podName := range unhealthyPods {
err := p.patcher.syncPod(keyFunc(podName.Namespace, podName.Name), meta.ZonalKey(key.Name, key.Zone), nil)
if err != nil {
errList = append(errList, err)
} else {

// If we didn't patch all of the endpoints, we must keep polling for health status
return patchCount < len(p.pollMap[key].endpointMap), utilerrors.NewAggregate(errList)

// getHealthyBackendService returns the key of the first backend service for which the endpoint is considered healthy.
func getHealthyBackendService(healthStatus *composite.NetworkEndpointWithHealthStatus) *meta.Key {
for _, hs := range healthStatus.Healths {
if hs == nil {
if hs.BackendService == nil {
klog.Warningf("Backend service is nil in health status of network endpoint %v: %v", ne, hs)
klog.Errorf("Backend service is nil in health status of network endpoint %v", healthStatus)

// This assumes the ingress backend service uses the NEG naming scheme. Hence the backend service share the same name as NEG.
if strings.Contains(hs.BackendService.BackendService, key.Name) {
if hs.HealthState == healthyState {
healthy = true
err := p.patcher.syncPod(keyFunc(podName.Namespace, podName.Name), key.Name)
return healthy, err
if hs.HealthState == healthyState {
id, err := cloud.ParseResourceURL(hs.BackendService.BackendService)
if err != nil {
klog.Errorf("Failed to parse backend service reference from a Network Endpoint health status %v: %v", healthStatus, err)
if id != nil {
return id.Key

return false, nil
return nil

// hasHealthStatus returns true if there is at least 1 health status associated with the endpoint.
func hasHealthStatus(healthStatus *composite.NetworkEndpointWithHealthStatus) bool {
return healthStatus != nil && len(healthStatus.Healths) > 0

// getPod returns the namespaced name of the pod that corresponds to an endpoint and whether the pod is registered
Expand Down

0 comments on commit 8fb6d48

Please sign in to comment.