diff --git a/pkg/apis/networking/v1alpha1/ingress_lifecycle.go b/pkg/apis/networking/v1alpha1/ingress_lifecycle.go
index 109dbd70e447..22a2255c2f29 100644
--- a/pkg/apis/networking/v1alpha1/ingress_lifecycle.go
+++ b/pkg/apis/networking/v1alpha1/ingress_lifecycle.go
@@ -74,9 +74,8 @@ func (is *IngressStatus) MarkLoadBalancerReady(lbs []LoadBalancerIngressStatus,
 // MarkLoadBalancerPending marks the "IngressConditionLoadBalancerReady" condition to unknown to
 // reflect that the load balancer is not ready yet.
 func (is *IngressStatus) MarkLoadBalancerPending() {
-	reason := "Uninitialized"
-	msg := "Waiting for VirtualService to be ready"
-	ingressCondSet.Manage(is).MarkUnknown(IngressConditionLoadBalancerReady, reason, msg)
+	ingressCondSet.Manage(is).MarkUnknown(IngressConditionLoadBalancerReady, "Uninitialized",
+		"Waiting for VirtualService to be ready")
 }
 
 // IsReady looks at the conditions and if the Status has a condition
diff --git a/pkg/reconciler/clusteringress/clusteringress.go b/pkg/reconciler/clusteringress/clusteringress.go
index ec06e4a69d81..4dc8cf4e06fd 100644
--- a/pkg/reconciler/clusteringress/clusteringress.go
+++ b/pkg/reconciler/clusteringress/clusteringress.go
@@ -115,7 +115,8 @@ func (c *Reconciler) Init(ctx context.Context, cmw configmap.Watcher, impl *cont
 	resyncIngressOnVirtualServiceReady := func(vs *v1alpha3.VirtualService) {
 		impl.EnqueueLabelOfClusterScopedResource(networking.ClusterIngressLabelKey)(vs)
 	}
-	c.BaseIngressReconciler.StatusManager = ing.NewStatusManager(gatewayInformer.Lister(), podInformer.Lister(), network.NewAutoTransport, resyncIngressOnVirtualServiceReady)
+	c.BaseIngressReconciler.StatusManager = ing.NewStatusManager(c.Logger.Named("status-manager"), gatewayInformer.Lister(),
+		podInformer.Lister(), network.NewAutoTransport, resyncIngressOnVirtualServiceReady)
 	c.BaseIngressReconciler.StatusManager.Start(ctx.Done())
 }
 
diff --git a/pkg/reconciler/ingress/resources/virtual_service.go b/pkg/reconciler/ingress/resources/virtual_service.go
index d9d4320f68f2..02381a3ce5bc 100644
--- a/pkg/reconciler/ingress/resources/virtual_service.go
+++ b/pkg/reconciler/ingress/resources/virtual_service.go
@@ -33,6 +33,7 @@ import (
 	"github.com/knative/serving/pkg/network"
 	"github.com/knative/serving/pkg/reconciler/ingress/resources/names"
 	"github.com/knative/serving/pkg/resources"
+	"github.com/pkg/errors"
 	istiov1alpha1 "knative.dev/pkg/apis/istio/common/v1alpha1"
 	"knative.dev/pkg/apis/istio/v1alpha3"
 	"knative.dev/pkg/kmeta"
@@ -130,7 +131,7 @@ func MakeVirtualServices(ia v1alpha1.IngressAccessor, gateways map[v1alpha1.Ingr
 	for _, vs := range vss {
 		hash, err := computeVirtualServiceHash(vs)
 		if err != nil {
-			return nil, fmt.Errorf("failed to compute VirtualService hash: %v", err)
+			return nil, errors.Wrapf(err, "failed to compute the hash of %s/%s", vs.Namespace, vs.Name)
 		}
 		// As per RFC1153, a DNS label must be under 63 8-bit octets
 		host := fmt.Sprintf("%x", hash[:31]) + ProbeHostSuffix
diff --git a/pkg/reconciler/ingress/status.go b/pkg/reconciler/ingress/status.go
index 1dd7648f304d..e775b2de9633 100644
--- a/pkg/reconciler/ingress/status.go
+++ b/pkg/reconciler/ingress/status.go
@@ -2,13 +2,17 @@ package ingress
 
 import (
 	"context"
+	"errors"
 	"fmt"
 	"net/http"
+	"reflect"
 	"strings"
 	"sync"
 	"sync/atomic"
 	"time"
 
+	"go.uber.org/zap"
+
 	"k8s.io/apimachinery/pkg/util/wait"
 
 	"github.com/knative/serving/pkg/network/prober"
@@ -24,15 +28,21 @@ import (
 )
 
 const (
-	probeConcurrency = 15              // defines how many probing calls can be issued simultaneously
-	stateExpiration  = 5 * time.Minute // defines how long after being last accessed a state expires
+	// probeConcurrency defines how many probing calls can be issued simultaneously
+	probeConcurrency = 15
+	// stateExpiration defines how long after being last accessed a state expires
+	stateExpiration = 5 * time.Minute
+	// cleanupPeriod defines how often states are cleaned up
+	cleanupPeriod = 1 * time.Minute
 )
 
 type probingState struct {
+	// probeHost is the value of the HTTP 'Host' header sent when probing
+	probeHost      string
 	virtualService *v1alpha3.VirtualService
-	probeHost      string // the value of the HTTP 'Host' header sent when probing
-	pendingCount   int32  // the number of pod that haven't been successfully probed yet (0 means complete success)
+	// pendingCount is the number of pods that haven't been successfully probed yet
+	pendingCount int32
 
 	lastAccessed time.Time
 
 	context context.Context
@@ -46,6 +56,8 @@ type workItem struct {
 
 // StatusManager provides a way to check if a VirtualService is ready
 type StatusManager struct {
+	logger *zap.SugaredLogger
+
 	// mu guards probingStates
 	mu            sync.Mutex
 	probingStates map[string]*probingState
@@ -61,11 +73,13 @@ type StatusManager struct {
 
 // NewStatusManager creates a new instance of StatusManager
 func NewStatusManager(
+	logger *zap.SugaredLogger,
 	gatewayLister istiolisters.GatewayLister,
 	podLister corev1listers.PodLister,
 	transportFactory prober.TransportFactory,
 	readyCallback func(*v1alpha3.VirtualService)) *StatusManager {
 	return &StatusManager{
+		logger:        logger,
 		probingStates: make(map[string]*probingState),
 		workQueue: workqueue.NewNamedRateLimitingQueue(
 			workqueue.DefaultControllerRateLimiter(),
@@ -96,22 +110,28 @@ func (m *StatusManager) IsReady(vs *v1alpha3.VirtualService) (bool, error) {
 		}
 	}
 	if probeHost == "" {
-		return false, fmt.Errorf("only a VirtualService with a probe can be probed. Hosts: %v", vs.Spec.Hosts)
+		m.logger.Errorf("the provided VirtualService doesn't contain a probe host. Hosts: %v", vs.Spec.Hosts)
+		return false, errors.New("only a VirtualService with a probe host can be probed")
 	}
 
-	m.mu.Lock()
-	if state, ok := m.probingStates[key]; ok {
-		if state.probeHost == probeHost {
-			m.mu.Unlock()
-			state.lastAccessed = time.Now()
-			return atomic.LoadInt32(&state.pendingCount) == 0, nil
-		}
+	ready, ok := func() (bool, bool) {
+		m.mu.Lock()
+		defer m.mu.Unlock()
+		if state, ok := m.probingStates[key]; ok {
+			if state.probeHost == probeHost {
+				state.lastAccessed = time.Now()
+				return atomic.LoadInt32(&state.pendingCount) == 0, true
+			}
 
-		// Cancel the polling for the outdated version
-		state.cancel()
-		delete(m.probingStates, key)
+			// Cancel the polling for the outdated version
+			state.cancel()
+			delete(m.probingStates, key)
+		}
+		return false, false
+	}()
+	if ok {
+		return ready, nil
 	}
-	m.mu.Unlock()
 
 	podIPs, err := m.listVirtualServicePodIPs(vs)
 	if err != nil {
@@ -156,7 +176,7 @@ func (m *StatusManager) Start(done <-chan struct{}) {
 
 	// Cleanup the states periodically
 	go func() {
-		wait.Until(m.expireOldStates, time.Minute, done)
+		wait.Until(m.expireOldStates, cleanupPeriod, done)
 	}()
 
 	// Stop processing the queue when cancelled
@@ -188,17 +208,16 @@ func (m *StatusManager) processWorkItem() bool {
 
 	defer m.workQueue.Done(obj)
 
-	// Discard the item if it is not of the expected type
+	// Crash if the item is not of the expected type
 	item, ok := obj.(*workItem)
 	if !ok {
-		m.workQueue.Forget(obj)
-		return true
+		m.logger.Fatalf("unexpected work item type: want: %s, got: %s\n", "*workItem", reflect.TypeOf(obj).Name())
 	}
 
 	ok, err := prober.Do(
 		item.context,
 		m.transportFactory(),
-		fmt.Sprintf("http://%s", item.podIP),
+		fmt.Sprintf("http://%s/", item.podIP),
 		prober.WithHost(item.probeHost))
 
 	// In case of cancellation, drop the work item
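
For reviewers, a minimal, self-contained sketch of the locking pattern the refactored `IsReady` now uses: the mutex-guarded section is wrapped in an immediately invoked closure so `defer` releases the lock on every return path, instead of pairing `Lock`/`Unlock` by hand. The `cache`/`Touch` names below are illustrative stand-ins, not identifiers from this PR.

```go
package main

import (
	"fmt"
	"sync"
	"time"
)

// cache is a stand-in for a mutex-guarded map such as probingStates.
type cache struct {
	mu      sync.Mutex
	entries map[string]time.Time
}

// Touch updates the access time of key if present. The locked section is an
// immediately invoked closure, so the deferred Unlock runs on every return.
func (c *cache) Touch(key string) bool {
	return func() bool {
		c.mu.Lock()
		defer c.mu.Unlock()
		if _, ok := c.entries[key]; ok {
			c.entries[key] = time.Now()
			return true
		}
		return false
	}()
}

func main() {
	c := &cache{entries: map[string]time.Time{"a": time.Now()}}
	fmt.Println(c.Touch("a")) // true
	fmt.Println(c.Touch("b")) // false
}
```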
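Similarly, a small sketch of what the switch from `fmt.Errorf` to `errors.Wrapf` in `virtual_service.go` buys: the wrapped error adds the namespace/name context while keeping the original cause retrievable via `errors.Cause`. `computeHash` and the `default/my-route` values are hypothetical, used only for illustration.

```go
package main

import (
	"fmt"

	"github.com/pkg/errors"
)

// computeHash is a hypothetical stand-in for computeVirtualServiceHash.
func computeHash() error {
	return errors.New("marshaling failed")
}

func main() {
	if err := computeHash(); err != nil {
		// Wrapf prefixes the message and records the underlying cause.
		wrapped := errors.Wrapf(err, "failed to compute the hash of %s/%s", "default", "my-route")
		fmt.Println(wrapped)               // failed to compute the hash of default/my-route: marshaling failed
		fmt.Println(errors.Cause(wrapped)) // marshaling failed
	}
}
```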