Commit

Address vagababov comments
JRBANCEL committed Jul 15, 2019
1 parent a7baff7 commit e3906a2
Showing 4 changed files with 46 additions and 26 deletions.
5 changes: 2 additions & 3 deletions pkg/apis/networking/v1alpha1/ingress_lifecycle.go
@@ -74,9 +74,8 @@ func (is *IngressStatus) MarkLoadBalancerReady(lbs []LoadBalancerIngressStatus,
 // MarkLoadBalancerPending marks the "IngressConditionLoadBalancerReady" condition to unknown to
 // reflect that the load balancer is not ready yet.
 func (is *IngressStatus) MarkLoadBalancerPending() {
-    reason := "Uninitialized"
-    msg := "Waiting for VirtualService to be ready"
-    ingressCondSet.Manage(is).MarkUnknown(IngressConditionLoadBalancerReady, reason, msg)
+    ingressCondSet.Manage(is).MarkUnknown(IngressConditionLoadBalancerReady, "Uninitialized",
+        "Waiting for VirtualService to be ready")
 }
 
 // IsReady looks at the conditions and if the Status has a condition
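Note: a minimal sketch of how the condition set above surfaces to a caller. It assumes the usual InitializeConditions/GetCondition helpers that these lifecycle files typically define; every name outside this diff is unverified.

package main

import (
	"fmt"

	"github.com/knative/serving/pkg/apis/networking/v1alpha1"
)

func main() {
	is := &v1alpha1.IngressStatus{}
	is.InitializeConditions() // assumed helper, defined alongside this file
	is.MarkLoadBalancerPending()

	// The LoadBalancerReady condition is now Unknown with the
	// "Uninitialized" reason set in the diff above.
	cond := is.GetCondition(v1alpha1.IngressConditionLoadBalancerReady) // assumed helper
	fmt.Println(cond.Status, cond.Reason)
}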
3 changes: 2 additions & 1 deletion pkg/reconciler/clusteringress/clusteringress.go
@@ -115,7 +115,8 @@ func (c *Reconciler) Init(ctx context.Context, cmw configmap.Watcher, impl *cont
     resyncIngressOnVirtualServiceReady := func(vs *v1alpha3.VirtualService) {
         impl.EnqueueLabelOfClusterScopedResource(networking.ClusterIngressLabelKey)(vs)
     }
-    c.BaseIngressReconciler.StatusManager = ing.NewStatusManager(gatewayInformer.Lister(), podInformer.Lister(), network.NewAutoTransport, resyncIngressOnVirtualServiceReady)
+    c.BaseIngressReconciler.StatusManager = ing.NewStatusManager(c.Logger.Named("status-manager"), gatewayInformer.Lister(),
+        podInformer.Lister(), network.NewAutoTransport, resyncIngressOnVirtualServiceReady)
     c.BaseIngressReconciler.StatusManager.Start(ctx.Done())
 }
 
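Note: c.Logger.Named(...) is zap's child-logger API; the segment is appended to the parent logger's name (joined with dots), so log lines from the StatusManager are attributable. A quick illustration with a bare zap logger:

package main

import "go.uber.org/zap"

func main() {
	logger, _ := zap.NewDevelopment()
	sugar := logger.Sugar()

	// Named returns a child logger carrying the given name segment.
	sm := sugar.Named("status-manager")
	sm.Infof("probing %d pods", 3) // entry is tagged with the logger name "status-manager"
}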
3 changes: 2 additions & 1 deletion pkg/reconciler/ingress/resources/virtual_service.go
@@ -33,6 +33,7 @@ import (
     "github.com/knative/serving/pkg/network"
     "github.com/knative/serving/pkg/reconciler/ingress/resources/names"
     "github.com/knative/serving/pkg/resources"
+    "github.com/pkg/errors"
     istiov1alpha1 "knative.dev/pkg/apis/istio/common/v1alpha1"
     "knative.dev/pkg/apis/istio/v1alpha3"
     "knative.dev/pkg/kmeta"
@@ -130,7 +131,7 @@ func MakeVirtualServices(ia v1alpha1.IngressAccessor, gateways map[v1alpha1.Ingr
     for _, vs := range vss {
         hash, err := computeVirtualServiceHash(vs)
         if err != nil {
-            return nil, fmt.Errorf("failed to compute VirtualService hash: %v", err)
+            return nil, errors.Wrapf(err, "failed to compute the hash of %s/%s", vs.Namespace, vs.Name)
         }
         // As per RFC 1035, a DNS label must be under 63 8-bit octets
         host := fmt.Sprintf("%x", hash[:31]) + ProbeHostSuffix
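Note: errors.Wrapf (github.com/pkg/errors) keeps the original error reachable via errors.Cause, which fmt.Errorf with %v flattens into a plain string; the new message also names the offending VirtualService. Incidentally, hash[:31] is 31 bytes, which %x renders as 62 hex characters, keeping the probe label within the 63-octet limit. A standalone sketch of the wrapping behavior:

package main

import (
	"fmt"

	"github.com/pkg/errors"
)

func main() {
	base := errors.New("hash failure") // stand-in for the real error
	err := errors.Wrapf(base, "failed to compute the hash of %s/%s", "default", "route")

	fmt.Println(err)                       // failed to compute the hash of default/route: hash failure
	fmt.Println(errors.Cause(err) == base) // true: the original error is preserved
}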
61 changes: 40 additions & 21 deletions pkg/reconciler/ingress/status.go
@@ -2,13 +2,17 @@ package ingress
 
 import (
     "context"
+    "errors"
     "fmt"
     "net/http"
+    "reflect"
     "strings"
     "sync"
     "sync/atomic"
     "time"
 
+    "go.uber.org/zap"
+
     "k8s.io/apimachinery/pkg/util/wait"
 
     "github.com/knative/serving/pkg/network/prober"
@@ -24,15 +28,21 @@ import (
 )
 
 const (
-    probeConcurrency = 15              // defines how many probing calls can be issued simultaneously
-    stateExpiration  = 5 * time.Minute // defines how long after being last accessed a state expires
+    // probeConcurrency defines how many probing calls can be issued simultaneously
+    probeConcurrency = 15
+    // stateExpiration defines how long after being last accessed a state expires
+    stateExpiration = 5 * time.Minute
+    // cleanupPeriod defines how often states are cleaned up
+    cleanupPeriod = 1 * time.Minute
 )
 
 type probingState struct {
-    probeHost      string // the value of the HTTP 'Host' header sent when probing
+    // probeHost is the value of the HTTP 'Host' header sent when probing
+    probeHost      string
     virtualService *v1alpha3.VirtualService
 
-    pendingCount int32 // the number of pod that haven't been successfully probed yet (0 means complete success)
+    // pendingCount is the number of pods that haven't been successfully probed yet
+    pendingCount int32
     lastAccessed time.Time
 
     context context.Context
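Note: pendingCount is decremented atomically as individual pod probes succeed (that code is outside this hunk), and IsReady below treats zero as "every pod answered". A self-contained sketch of the counting pattern, with all names hypothetical:

package main

import (
	"fmt"
	"sync/atomic"
)

type probingState struct{ pendingCount int32 }

func main() {
	st := &probingState{pendingCount: 3} // e.g. three pods left to probe
	for i := 0; i < 3; i++ {
		// Each successful probe decrements the counter; hitting zero
		// means the VirtualService has been probed on every pod.
		if atomic.AddInt32(&st.pendingCount, -1) == 0 {
			fmt.Println("ready")
		}
	}
}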
@@ -46,6 +56,8 @@ type workItem struct {
 
 // StatusManager provides a way to check if a VirtualService is ready
 type StatusManager struct {
+    logger *zap.SugaredLogger
+
     // mu guards probingStates
     mu            sync.Mutex
     probingStates map[string]*probingState
@@ -61,11 +73,13 @@ type StatusManager struct {
 
 // NewStatusManager creates a new instance of StatusManager
 func NewStatusManager(
+    logger *zap.SugaredLogger,
     gatewayLister istiolisters.GatewayLister,
     podLister corev1listers.PodLister,
     transportFactory prober.TransportFactory,
     readyCallback func(*v1alpha3.VirtualService)) *StatusManager {
     return &StatusManager{
+        logger:        logger,
         probingStates: make(map[string]*probingState),
         workQueue: workqueue.NewNamedRateLimitingQueue(
             workqueue.DefaultControllerRateLimiter(),
@@ -96,22 +110,28 @@ func (m *StatusManager) IsReady(vs *v1alpha3.VirtualService) (bool, error) {
         }
     }
     if probeHost == "" {
-        return false, fmt.Errorf("only a VirtualService with a probe can be probed. Hosts: %v", vs.Spec.Hosts)
+        m.logger.Errorf("the provided VirtualService doesn't contain a probe host. Hosts: %v", vs.Spec.Hosts)
+        return false, errors.New("only a VirtualService with a probe host can be probed")
     }
 
-    m.mu.Lock()
-    if state, ok := m.probingStates[key]; ok {
-        if state.probeHost == probeHost {
-            m.mu.Unlock()
-            state.lastAccessed = time.Now()
-            return atomic.LoadInt32(&state.pendingCount) == 0, nil
-        }
+    ready, ok := func() (bool, bool) {
+        m.mu.Lock()
+        defer m.mu.Unlock()
+        if state, ok := m.probingStates[key]; ok {
+            if state.probeHost == probeHost {
+                state.lastAccessed = time.Now()
+                return atomic.LoadInt32(&state.pendingCount) == 0, true
+            }
 
-        // Cancel the polling for the outdated version
-        state.cancel()
-        delete(m.probingStates, key)
+            // Cancel the polling for the outdated version
+            state.cancel()
+            delete(m.probingStates, key)
+        }
+        return false, false
+    }()
+    if ok {
+        return ready, nil
     }
-    m.mu.Unlock()
 
     podIPs, err := m.listVirtualServicePodIPs(vs)
     if err != nil {
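Note: the refactor wraps the map access in an immediately-invoked closure so a single defer releases the mutex on every return path, instead of pairing Lock/Unlock by hand across branches as the old code did. The pattern in isolation:

package main

import (
	"fmt"
	"sync"
)

func main() {
	var mu sync.Mutex
	cache := map[string]int{"a": 1}

	// The closure scopes the critical section: defer runs as soon as it
	// returns, so every early return releases mu exactly once.
	v, ok := func() (int, bool) {
		mu.Lock()
		defer mu.Unlock()
		v, ok := cache["a"]
		return v, ok
	}()
	fmt.Println(v, ok)
}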
@@ -156,7 +176,7 @@ func (m *StatusManager) Start(done <-chan struct{}) {
 
     // Cleanup the states periodically
     go func() {
-        wait.Until(m.expireOldStates, time.Minute, done)
+        wait.Until(m.expireOldStates, cleanupPeriod, done)
     }()
 
     // Stop processing the queue when cancelled
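Note: wait.Until (k8s.io/apimachinery/pkg/util/wait) calls the function immediately and then once per period until the stop channel closes; the change only swaps the bare time.Minute for the named cleanupPeriod constant. In isolation:

package main

import (
	"fmt"
	"time"

	"k8s.io/apimachinery/pkg/util/wait"
)

const cleanupPeriod = 1 * time.Minute

func main() {
	stop := make(chan struct{})
	// Runs the cleanup function now, then every cleanupPeriod until stop closes.
	go wait.Until(func() { fmt.Println("cleanup tick") }, cleanupPeriod, stop)
	time.Sleep(50 * time.Millisecond)
	close(stop)
}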
@@ -188,17 +208,16 @@ func (m *StatusManager) processWorkItem() bool {
 
     defer m.workQueue.Done(obj)
 
-    // Discard the item if it is not of the expected type
+    // Crash if the item is not of the expected type
     item, ok := obj.(*workItem)
     if !ok {
-        m.workQueue.Forget(obj)
-        return true
+        m.logger.Fatalf("unexpected work item type: want: %s, got: %s\n", "*workItem", reflect.TypeOf(obj).Name())
     }
 
     ok, err := prober.Do(
         item.context,
         m.transportFactory(),
-        fmt.Sprintf("http://%s", item.podIP),
+        fmt.Sprintf("http://%s/", item.podIP),
         prober.WithHost(item.probeHost))
 
     // In case of cancellation, drop the work item
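Note: the type assertion on obj can only fail on a programming error, hence the switch from silently discarding the item (Forget + return) to logger.Fatalf. For context, a standalone sketch of the rate-limited workqueue cycle that processWorkItem follows, using the client-go API:

package main

import (
	"fmt"

	"k8s.io/client-go/util/workqueue"
)

func main() {
	q := workqueue.NewNamedRateLimitingQueue(
		workqueue.DefaultControllerRateLimiter(), "ProbingQueue")
	q.Add("some-work-item")

	// Get blocks until an item is available (or the queue shuts down);
	// Done must always be called so the queue stops tracking the item.
	obj, shutdown := q.Get()
	if shutdown {
		return
	}
	defer q.Done(obj)
	fmt.Println("processing", obj)
}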
