Skip to content

Commit

Permalink
Merge pull request #3088 from k8s-infra-cherrypick-robot/cherry-pick-…
Browse files Browse the repository at this point in the history
…3075-to-release-0.20

[release-0.20] 🌱 Add debug logging for the state of the priority queue
  • Loading branch information
k8s-ci-robot authored Jan 23, 2025
2 parents f33705e + 99a4044 commit 791b6c9
Show file tree
Hide file tree
Showing 3 changed files with 74 additions and 37 deletions.
5 changes: 4 additions & 1 deletion examples/priorityqueue/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@ import (
"os"
"time"

"go.uber.org/zap/zapcore"
corev1 "k8s.io/api/core/v1"
"k8s.io/utils/ptr"
"sigs.k8s.io/controller-runtime/pkg/builder"
Expand All @@ -45,7 +46,9 @@ func main() {
}

func run() error {
log.SetLogger(zap.New())
log.SetLogger(zap.New(func(o *zap.Options) {
o.Level = zapcore.Level(-5)
}))

// Setup a Manager
mgr, err := manager.New(kubeconfig.GetConfigOrDie(), manager.Options{
Expand Down
1 change: 1 addition & 0 deletions pkg/controller/controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -202,6 +202,7 @@ func NewTypedUnmanaged[request comparable](name string, mgr manager.Manager, opt
options.NewQueue = func(controllerName string, rateLimiter workqueue.TypedRateLimiter[request]) workqueue.TypedRateLimitingInterface[request] {
if ptr.Deref(mgr.GetControllerOptions().UsePriorityQueue, false) {
return priorityqueue.New(controllerName, func(o *priorityqueue.Opts[request]) {
o.Log = mgr.GetLogger().WithValues("controller", controllerName)
o.RateLimiter = rateLimiter
})
}
Expand Down
105 changes: 69 additions & 36 deletions pkg/controller/priorityqueue/priorityqueue.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@ import (
"sync/atomic"
"time"

"github.com/go-logr/logr"
"github.com/google/btree"
"k8s.io/apimachinery/pkg/util/sets"
"k8s.io/client-go/util/workqueue"
Expand Down Expand Up @@ -36,6 +37,7 @@ type Opts[T comparable] struct {
// limiter with an initial delay of five milliseconds and a max delay of 1000 seconds.
RateLimiter workqueue.TypedRateLimiter[T]
MetricProvider workqueue.MetricsProvider
Log logr.Logger
}

// Opt allows to configure a PriorityQueue.
Expand All @@ -57,6 +59,7 @@ func New[T comparable](name string, o ...Opt[T]) PriorityQueue[T] {
}

pq := &priorityqueue[T]{
log: opts.Log,
items: map[T]*item[T]{},
queue: btree.NewG(32, less[T]),
becameReady: sets.Set[T]{},
Expand All @@ -75,6 +78,7 @@ func New[T comparable](name string, o ...Opt[T]) PriorityQueue[T] {
}

go pq.spin()
go pq.logState()
if _, ok := pq.metrics.(noMetrics[T]); !ok {
go pq.updateUnfinishedWorkLoop()
}
Expand All @@ -83,6 +87,7 @@ func New[T comparable](name string, o ...Opt[T]) PriorityQueue[T] {
}

type priorityqueue[T comparable] struct {
log logr.Logger
// lock has to be acquired for any access any of items, queue, addedCounter
// or becameReady
lock sync.Mutex
Expand Down Expand Up @@ -141,14 +146,14 @@ func (w *priorityqueue[T]) AddWithOpts(o AddOpts, items ...T) {
}
if _, ok := w.items[key]; !ok {
item := &item[T]{
key: key,
addedCounter: w.addedCounter,
priority: o.Priority,
readyAt: readyAt,
Key: key,
AddedCounter: w.addedCounter,
Priority: o.Priority,
ReadyAt: readyAt,
}
w.items[key] = item
w.queue.ReplaceOrInsert(item)
if item.readyAt == nil {
if item.ReadyAt == nil {
w.metrics.add(key)
}
w.addedCounter++
Expand All @@ -158,15 +163,15 @@ func (w *priorityqueue[T]) AddWithOpts(o AddOpts, items ...T) {
// The b-tree de-duplicates based on ordering and any change here
// will affect the order - Just delete and re-add.
item, _ := w.queue.Delete(w.items[key])
if o.Priority > item.priority {
item.priority = o.Priority
if o.Priority > item.Priority {
item.Priority = o.Priority
}

if item.readyAt != nil && (readyAt == nil || readyAt.Before(*item.readyAt)) {
if item.ReadyAt != nil && (readyAt == nil || readyAt.Before(*item.ReadyAt)) {
if readyAt == nil {
w.metrics.add(key)
}
item.readyAt = readyAt
item.ReadyAt = readyAt
}

w.queue.ReplaceOrInsert(item)
Expand Down Expand Up @@ -210,14 +215,14 @@ func (w *priorityqueue[T]) spin() {
// track what we want to delete and do it after we are done ascending.
var toDelete []*item[T]
w.queue.Ascend(func(item *item[T]) bool {
if item.readyAt != nil {
if readyAt := item.readyAt.Sub(w.now()); readyAt > 0 {
if item.ReadyAt != nil {
if readyAt := item.ReadyAt.Sub(w.now()); readyAt > 0 {
nextReady = w.tick(readyAt)
return false
}
if !w.becameReady.Has(item.key) {
w.metrics.add(item.key)
w.becameReady.Insert(item.key)
if !w.becameReady.Has(item.Key) {
w.metrics.add(item.Key)
w.becameReady.Insert(item.Key)
}
}

Expand All @@ -228,16 +233,16 @@ func (w *priorityqueue[T]) spin() {
}

// Item is locked, we can not hand it out
if w.locked.Has(item.key) {
if w.locked.Has(item.Key) {
return true
}

w.metrics.get(item.key)
w.locked.Insert(item.key)
w.metrics.get(item.Key)
w.locked.Insert(item.Key)
w.waiters.Add(-1)
delete(w.items, item.key)
delete(w.items, item.Key)
toDelete = append(toDelete, item)
w.becameReady.Delete(item.key)
w.becameReady.Delete(item.Key)
w.get <- *item

return true
Expand Down Expand Up @@ -268,7 +273,7 @@ func (w *priorityqueue[T]) GetWithPriority() (_ T, priority int, shutdown bool)
w.notifyItemOrWaiterAdded()
item := <-w.get

return item.key, item.priority, w.shutdown.Load()
return item.Key, item.Priority, w.shutdown.Load()
}

func (w *priorityqueue[T]) Get() (item T, shutdown bool) {
Expand Down Expand Up @@ -316,7 +321,7 @@ func (w *priorityqueue[T]) Len() int {

var result int
w.queue.Ascend(func(item *item[T]) bool {
if item.readyAt == nil || item.readyAt.Compare(w.now()) <= 0 {
if item.ReadyAt == nil || item.ReadyAt.Compare(w.now()) <= 0 {
result++
return true
}
Expand All @@ -326,36 +331,64 @@ func (w *priorityqueue[T]) Len() int {
return result
}

func (w *priorityqueue[T]) logState() {
t := time.Tick(10 * time.Second)
for {
select {
case <-w.done:
return
case <-t:
}

// Log level may change at runtime, so keep the
// loop going even if a given level is currently
// not enabled.
if !w.log.V(5).Enabled() {
continue
}
w.lock.Lock()
items := make([]*item[T], 0, len(w.items))
w.queue.Ascend(func(item *item[T]) bool {
items = append(items, item)
return true
})
w.lock.Unlock()

w.log.V(5).Info("workqueue_items", "items", items)
}
}

func less[T comparable](a, b *item[T]) bool {
if a.readyAt == nil && b.readyAt != nil {
if a.ReadyAt == nil && b.ReadyAt != nil {
return true
}
if b.readyAt == nil && a.readyAt != nil {
if b.ReadyAt == nil && a.ReadyAt != nil {
return false
}
if a.readyAt != nil && b.readyAt != nil && !a.readyAt.Equal(*b.readyAt) {
return a.readyAt.Before(*b.readyAt)
if a.ReadyAt != nil && b.ReadyAt != nil && !a.ReadyAt.Equal(*b.ReadyAt) {
return a.ReadyAt.Before(*b.ReadyAt)
}
if a.priority != b.priority {
return a.priority > b.priority
if a.Priority != b.Priority {
return a.Priority > b.Priority
}

return a.addedCounter < b.addedCounter
return a.AddedCounter < b.AddedCounter
}

type item[T comparable] struct {
key T
addedCounter uint64
priority int
readyAt *time.Time
Key T `json:"key"`
AddedCounter uint64 `json:"addedCounter"`
Priority int `json:"priority"`
ReadyAt *time.Time `json:"readyAt,omitempty"`
}

func (w *priorityqueue[T]) updateUnfinishedWorkLoop() {
t := time.NewTicker(500 * time.Millisecond) // borrowed from workqueue: https://github.com/kubernetes/kubernetes/blob/67a807bf142c7a2a5ecfdb2a5d24b4cdea4cc79c/staging/src/k8s.io/client-go/util/workqueue/queue.go#L182
defer t.Stop()
for range t.C {
if w.shutdown.Load() {
t := time.Tick(500 * time.Millisecond) // borrowed from workqueue: https://github.com/kubernetes/kubernetes/blob/67a807bf142c7a2a5ecfdb2a5d24b4cdea4cc79c/staging/src/k8s.io/client-go/util/workqueue/queue.go#L182
for {
select {
case <-w.done:
return
case <-t:
}
w.metrics.updateUnfinishedWork()
}
Expand Down

0 comments on commit 791b6c9

Please sign in to comment.