diff --git a/examples/priorityqueue/main.go b/examples/priorityqueue/main.go index 2b09432f22..8dacdcc9a3 100644 --- a/examples/priorityqueue/main.go +++ b/examples/priorityqueue/main.go @@ -22,6 +22,7 @@ import ( "os" "time" + "go.uber.org/zap/zapcore" corev1 "k8s.io/api/core/v1" "k8s.io/utils/ptr" "sigs.k8s.io/controller-runtime/pkg/builder" @@ -45,7 +46,9 @@ func main() { } func run() error { - log.SetLogger(zap.New()) + log.SetLogger(zap.New(func(o *zap.Options) { + o.Level = zapcore.Level(-5) + })) // Setup a Manager mgr, err := manager.New(kubeconfig.GetConfigOrDie(), manager.Options{ diff --git a/pkg/controller/controller.go b/pkg/controller/controller.go index b7d7286033..5c5b249ef5 100644 --- a/pkg/controller/controller.go +++ b/pkg/controller/controller.go @@ -202,6 +202,7 @@ func NewTypedUnmanaged[request comparable](name string, mgr manager.Manager, opt options.NewQueue = func(controllerName string, rateLimiter workqueue.TypedRateLimiter[request]) workqueue.TypedRateLimitingInterface[request] { if ptr.Deref(mgr.GetControllerOptions().UsePriorityQueue, false) { return priorityqueue.New(controllerName, func(o *priorityqueue.Opts[request]) { + o.Log = mgr.GetLogger().WithValues("controller", controllerName) o.RateLimiter = rateLimiter }) } diff --git a/pkg/controller/priorityqueue/priorityqueue.go b/pkg/controller/priorityqueue/priorityqueue.go index 2b3a8904d7..2240e115e5 100644 --- a/pkg/controller/priorityqueue/priorityqueue.go +++ b/pkg/controller/priorityqueue/priorityqueue.go @@ -5,6 +5,7 @@ import ( "sync/atomic" "time" + "github.com/go-logr/logr" "github.com/google/btree" "k8s.io/apimachinery/pkg/util/sets" "k8s.io/client-go/util/workqueue" @@ -36,6 +37,7 @@ type Opts[T comparable] struct { // limiter with an initial delay of five milliseconds and a max delay of 1000 seconds. RateLimiter workqueue.TypedRateLimiter[T] MetricProvider workqueue.MetricsProvider + Log logr.Logger } // Opt allows to configure a PriorityQueue. @@ -57,6 +59,7 @@ func New[T comparable](name string, o ...Opt[T]) PriorityQueue[T] { } pq := &priorityqueue[T]{ + log: opts.Log, items: map[T]*item[T]{}, queue: btree.NewG(32, less[T]), becameReady: sets.Set[T]{}, @@ -75,6 +78,7 @@ func New[T comparable](name string, o ...Opt[T]) PriorityQueue[T] { } go pq.spin() + go pq.logState() if _, ok := pq.metrics.(noMetrics[T]); !ok { go pq.updateUnfinishedWorkLoop() } @@ -83,6 +87,7 @@ func New[T comparable](name string, o ...Opt[T]) PriorityQueue[T] { } type priorityqueue[T comparable] struct { + log logr.Logger // lock has to be acquired for any access any of items, queue, addedCounter // or becameReady lock sync.Mutex @@ -141,14 +146,14 @@ func (w *priorityqueue[T]) AddWithOpts(o AddOpts, items ...T) { } if _, ok := w.items[key]; !ok { item := &item[T]{ - key: key, - addedCounter: w.addedCounter, - priority: o.Priority, - readyAt: readyAt, + Key: key, + AddedCounter: w.addedCounter, + Priority: o.Priority, + ReadyAt: readyAt, } w.items[key] = item w.queue.ReplaceOrInsert(item) - if item.readyAt == nil { + if item.ReadyAt == nil { w.metrics.add(key) } w.addedCounter++ @@ -158,15 +163,15 @@ func (w *priorityqueue[T]) AddWithOpts(o AddOpts, items ...T) { // The b-tree de-duplicates based on ordering and any change here // will affect the order - Just delete and re-add. item, _ := w.queue.Delete(w.items[key]) - if o.Priority > item.priority { - item.priority = o.Priority + if o.Priority > item.Priority { + item.Priority = o.Priority } - if item.readyAt != nil && (readyAt == nil || readyAt.Before(*item.readyAt)) { + if item.ReadyAt != nil && (readyAt == nil || readyAt.Before(*item.ReadyAt)) { if readyAt == nil { w.metrics.add(key) } - item.readyAt = readyAt + item.ReadyAt = readyAt } w.queue.ReplaceOrInsert(item) @@ -210,14 +215,14 @@ func (w *priorityqueue[T]) spin() { // track what we want to delete and do it after we are done ascending. var toDelete []*item[T] w.queue.Ascend(func(item *item[T]) bool { - if item.readyAt != nil { - if readyAt := item.readyAt.Sub(w.now()); readyAt > 0 { + if item.ReadyAt != nil { + if readyAt := item.ReadyAt.Sub(w.now()); readyAt > 0 { nextReady = w.tick(readyAt) return false } - if !w.becameReady.Has(item.key) { - w.metrics.add(item.key) - w.becameReady.Insert(item.key) + if !w.becameReady.Has(item.Key) { + w.metrics.add(item.Key) + w.becameReady.Insert(item.Key) } } @@ -228,16 +233,16 @@ func (w *priorityqueue[T]) spin() { } // Item is locked, we can not hand it out - if w.locked.Has(item.key) { + if w.locked.Has(item.Key) { return true } - w.metrics.get(item.key) - w.locked.Insert(item.key) + w.metrics.get(item.Key) + w.locked.Insert(item.Key) w.waiters.Add(-1) - delete(w.items, item.key) + delete(w.items, item.Key) toDelete = append(toDelete, item) - w.becameReady.Delete(item.key) + w.becameReady.Delete(item.Key) w.get <- *item return true @@ -268,7 +273,7 @@ func (w *priorityqueue[T]) GetWithPriority() (_ T, priority int, shutdown bool) w.notifyItemOrWaiterAdded() item := <-w.get - return item.key, item.priority, w.shutdown.Load() + return item.Key, item.Priority, w.shutdown.Load() } func (w *priorityqueue[T]) Get() (item T, shutdown bool) { @@ -316,7 +321,7 @@ func (w *priorityqueue[T]) Len() int { var result int w.queue.Ascend(func(item *item[T]) bool { - if item.readyAt == nil || item.readyAt.Compare(w.now()) <= 0 { + if item.ReadyAt == nil || item.ReadyAt.Compare(w.now()) <= 0 { result++ return true } @@ -326,36 +331,64 @@ func (w *priorityqueue[T]) Len() int { return result } +func (w *priorityqueue[T]) logState() { + t := time.Tick(10 * time.Second) + for { + select { + case <-w.done: + return + case <-t: + } + + // Log level may change at runtime, so keep the + // loop going even if a given level is currently + // not enabled. + if !w.log.V(5).Enabled() { + continue + } + w.lock.Lock() + items := make([]*item[T], 0, len(w.items)) + w.queue.Ascend(func(item *item[T]) bool { + items = append(items, item) + return true + }) + w.lock.Unlock() + + w.log.V(5).Info("workqueue_items", "items", items) + } +} + func less[T comparable](a, b *item[T]) bool { - if a.readyAt == nil && b.readyAt != nil { + if a.ReadyAt == nil && b.ReadyAt != nil { return true } - if b.readyAt == nil && a.readyAt != nil { + if b.ReadyAt == nil && a.ReadyAt != nil { return false } - if a.readyAt != nil && b.readyAt != nil && !a.readyAt.Equal(*b.readyAt) { - return a.readyAt.Before(*b.readyAt) + if a.ReadyAt != nil && b.ReadyAt != nil && !a.ReadyAt.Equal(*b.ReadyAt) { + return a.ReadyAt.Before(*b.ReadyAt) } - if a.priority != b.priority { - return a.priority > b.priority + if a.Priority != b.Priority { + return a.Priority > b.Priority } - return a.addedCounter < b.addedCounter + return a.AddedCounter < b.AddedCounter } type item[T comparable] struct { - key T - addedCounter uint64 - priority int - readyAt *time.Time + Key T `json:"key"` + AddedCounter uint64 `json:"addedCounter"` + Priority int `json:"priority"` + ReadyAt *time.Time `json:"readyAt,omitempty"` } func (w *priorityqueue[T]) updateUnfinishedWorkLoop() { - t := time.NewTicker(500 * time.Millisecond) // borrowed from workqueue: https://github.com/kubernetes/kubernetes/blob/67a807bf142c7a2a5ecfdb2a5d24b4cdea4cc79c/staging/src/k8s.io/client-go/util/workqueue/queue.go#L182 - defer t.Stop() - for range t.C { - if w.shutdown.Load() { + t := time.Tick(500 * time.Millisecond) // borrowed from workqueue: https://github.com/kubernetes/kubernetes/blob/67a807bf142c7a2a5ecfdb2a5d24b4cdea4cc79c/staging/src/k8s.io/client-go/util/workqueue/queue.go#L182 + for { + select { + case <-w.done: return + case <-t: } w.metrics.updateUnfinishedWork() }