feat: enhance replay #2984

Merged 6 commits on Sep 18, 2023
43 changes: 37 additions & 6 deletions pkg/cachemanager/cachemanager.go
@@ -318,6 +318,14 @@ func (c *CacheManager) syncGVK(ctx context.Context, gvk schema.GroupVersionKind)
 func (c *CacheManager) manageCache(ctx context.Context) {
 	// relistStopChan is used to stop any list operations still in progress
 	relistStopChan := make(chan struct{})
+	// waitToCloseChan is used to wait on the relist goroutine to end
+	// when needing to create another one. This ensures that we are essentially
+	// only using a singleton routine to relist gvks.
+	waitToCloseChan := make(chan struct{})
+
+	// edge case: the 0th relist goroutine is "stopped", by definition, so we close the wait channel
+	// but it's also "running" so we don't close the kill channel in order to do so in the for loop below.
+	close(waitToCloseChan)
Member

Can you share why we are closing it right after it's created? I'm not clear on the comment.

Contributor Author

iirc, without closing the channel the first time around, the select block below will fire off an error (after 10 seconds hanging on a channel that is not signaled yet).

does that make sense? if so, how could we make the comment better?

Contributor Author

@ritazh let me know if the updated comment addresses the why of not closing the relistStopChan here. 🙏🏼


 	for {
 		select {
@@ -341,7 +349,17 @@ func (c *CacheManager) manageCache(ctx context.Context) {

 				// stop any goroutines that were relisting before
 				// as we may no longer be interested in those gvks
+				// and wait with a timeout for the child goroutine to stop.
 				close(relistStopChan)
+				select {
+				case <-waitToCloseChan:
+					// child goroutine exited gracefully
+					break
+				case <-time.After(time.Second * 10):
+					log.Error(fmt.Errorf("internal: background relist did not exit gracefully"), "possible goroutine leak")
+					// do not close waitToCloseChan as the goroutine may eventually exit and call close on the channel
Contributor

also, it will be GC'd anyway

+					break
+				}

 				// assume all gvks need to be relisted
 				// and while under lock, make a copy of
@@ -352,8 +370,12 @@ func (c *CacheManager) manageCache(ctx context.Context) {
 				// clean state
 				c.needToList = false
 				relistStopChan = make(chan struct{})
+				waitToCloseChan = make(chan struct{})

-				go c.replayGVKs(ctx, gvksToRelist, relistStopChan)
+				go func() {
+					c.replayGVKs(ctx, gvksToRelist, relistStopChan)
+					close(waitToCloseChan)
+				}()
 			}()
 		}
 	}
@@ -373,14 +395,23 @@ func (c *CacheManager) replayGVKs(ctx context.Context, gvksToRelist []schema.Gro
 			case <-stopCh:
 				return
 			default:
-				operation := func() (bool, error) {
-					if err := c.syncGVK(ctx, gvk); err != nil {
-						return false, err
+				operation := func(ctx context.Context) (bool, error) {
+					select {
+					// make sure that the stop channel hasn't closed yet in order to stop
+					// the operation in the backoff retry-er earlier so we don't sync GVKs
+					// that we may not want to sync anymore. This also ensures that we exit
+					// the func as soon as possible.
+					case <-stopCh:
+						return true, nil
+					default:
+						if err := c.syncGVK(ctx, gvk); err != nil {
+							return false, err
+						}
+						return true, nil
 					}
-					return true, nil
 				}

-				if err := wait.ExponentialBackoff(backoff, operation); err != nil {
+				if err := wait.ExponentialBackoffWithContext(ctx, backoff, operation); err != nil {
 					log.Error(err, "internal: error listings gvk cache data", "gvk", gvk)
 				} else {
 					gvksSet.Remove(gvk)