Skip to content

Commit

Permalink
Add the ability to set watch timeouts.
Browse files Browse the repository at this point in the history
Allow setting shorter watch timeout for the firt time alone.

What this PR does / why we need it:
We have seen in some evnironments (where requests are tunnelled) setting up the (first) watch is blocking if there are no resources available to watch. This causes the apply loop to be blocked when setting up watches before apply.
  • Loading branch information
barney-s committed Jun 12, 2024
1 parent b758057 commit 4a54ca1
Showing 1 changed file with 49 additions and 3 deletions.
52 changes: 49 additions & 3 deletions pkg/patterns/declarative/pkg/watch/dynamic.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@ import (
"context"
"fmt"
"sync"
"sync/atomic"
"time"

"k8s.io/apimachinery/pkg/api/meta"
Expand All @@ -35,8 +36,19 @@ import (
"sigs.k8s.io/controller-runtime/pkg/log"
)

// WatchDelay is the time between a Watch being dropped and attempting to resume it
const WatchDelay = 30 * time.Second
var (
// WatchActivityTimeout sets a timeout for a Watch activity under normal operation
WatchActivityTimeout = 300 * time.Second
// WatchActivityFirstTimeout sets a timeout for Watch activity in an Apply path
// We expect the author to set this to a lower value in environments where it makes sense.
// func (r *Reconciler) SetupWithManager(mgr ctrl.Manager) error { ... watch.WatchActivityFirstTimeout = 10 * time.Second ... }
WatchActivityFirstTimeout = 300 * time.Second
)

const (
// WatchDelay is the time between a Watch being dropped and attempting to resume it
WatchDelay = 30 * time.Second
)

// NewDynamicWatch constructs a watcher for unstructured objects.
// Deprecated: avoid using directly; will move to internal in future.
Expand Down Expand Up @@ -138,13 +150,46 @@ type clientObject struct {
//
// [1] https://github.com/kubernetes/kubernetes/issues/54878#issuecomment-357575276
func (w *dynamicKindWatch) watchUntilClosed(ctx context.Context, eventTarget metav1.ObjectMeta, watchStarted *sync.WaitGroup) {
var sawActivity atomic.Bool

log := log.FromContext(ctx)

options := w.FilterOptions
// Though we don't use the resource version, we allow bookmarks to help keep TCP connections healthy.
options.AllowWatchBookmarks = true

events, err := w.resource.Watch(context.TODO(), options)
activityTimeout := WatchActivityTimeout
if watchStarted != nil {
activityTimeout = WatchActivityFirstTimeout
}
ctx, cancel := context.WithCancel(ctx)
defer cancel()
// Check for events periodically
ticker := time.NewTicker(activityTimeout)
defer ticker.Stop()
sawActivity.Store(false)

go func() {
for {
select {
case <-ctx.Done():
return
case <-ticker.C:
if !sawActivity.Load() {
log.WithValues("kind", w.GVK.String()).WithValues("namespace", w.FilterNamespace).WithValues("labels", options.LabelSelector).Info("no activity seen for a while, cancelling watch")
cancel()
return
}
sawActivity.Store(false)
}
}
}()

events, err := w.resource.Watch(ctx, options)
// If the Watch() call doesnt return, this would not be set to true thereby causing the timer to cancle the watch() context
// We have seen cases where a proxy in between causes the first watch to hang if there were no matching objects to return
sawActivity.Store(true)

if watchStarted != nil {
watchStarted.Done()
}
Expand All @@ -159,6 +204,7 @@ func (w *dynamicKindWatch) watchUntilClosed(ctx context.Context, eventTarget met
defer events.Stop()

for clientEvent := range events.ResultChan() {
sawActivity.Store(true)
switch clientEvent.Type {
case watch.Bookmark:
// not an object change, we ignore it
Expand Down

0 comments on commit 4a54ca1

Please sign in to comment.