Skip to content

Commit

Permalink
Merge pull request #466 from abursavich/dontpanic
Browse files Browse the repository at this point in the history
🐛 Fix runnable race in manager
  • Loading branch information
k8s-ci-robot authored Jun 6, 2019
2 parents cd4692d + 96826f6 commit be010e1
Show file tree
Hide file tree
Showing 2 changed files with 32 additions and 26 deletions.
42 changes: 24 additions & 18 deletions pkg/manager/internal.go
Original file line number Diff line number Diff line change
Expand Up @@ -288,19 +288,7 @@ func (cm *controllerManager) startNonLeaderElectionRunnables() {
cm.mu.Lock()
defer cm.mu.Unlock()

// Start the Cache. Allow the function to start the cache to be mocked out for testing
if cm.startCache == nil {
cm.startCache = cm.cache.Start
}
go func() {
if err := cm.startCache(cm.internalStop); err != nil {
cm.errChan <- err
}
}()

// Wait for the caches to sync.
// TODO(community): Check the return value and write a test
cm.cache.WaitForCacheSync(cm.internalStop)
cm.waitForCache()

// Start the non-leaderelection Runnables after the cache has synced
for _, c := range cm.nonLeaderElectionRunnables {
Expand All @@ -311,14 +299,13 @@ func (cm *controllerManager) startNonLeaderElectionRunnables() {
cm.errChan <- ctrl.Start(cm.internalStop)
}()
}

cm.started = true
}

func (cm *controllerManager) startLeaderElectionRunnables() {
// Wait for the caches to sync.
// TODO(community): Check the return value and write a test
cm.cache.WaitForCacheSync(cm.internalStop)
cm.mu.Lock()
defer cm.mu.Unlock()

cm.waitForCache()

// Start the leader election Runnables after the cache has synced
for _, c := range cm.leaderElectionRunnables {
Expand All @@ -329,7 +316,26 @@ func (cm *controllerManager) startLeaderElectionRunnables() {
cm.errChan <- ctrl.Start(cm.internalStop)
}()
}
}

func (cm *controllerManager) waitForCache() {
if cm.started {
return
}

// Start the Cache. Allow the function to start the cache to be mocked out for testing
if cm.startCache == nil {
cm.startCache = cm.cache.Start
}
go func() {
if err := cm.startCache(cm.internalStop); err != nil {
cm.errChan <- err
}
}()

// Wait for the caches to sync.
// TODO(community): Check the return value and write a test
cm.cache.WaitForCacheSync(cm.internalStop)
cm.started = true
}

Expand Down
16 changes: 8 additions & 8 deletions pkg/manager/manager_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -206,15 +206,15 @@ var _ = Describe("manger.Manager", func() {
Expect(err).NotTo(HaveOccurred())
c1 := make(chan struct{})
Expect(m.Add(RunnableFunc(func(s <-chan struct{}) error {
defer close(c1)
defer GinkgoRecover()
close(c1)
return nil
}))).To(Succeed())

c2 := make(chan struct{})
Expect(m.Add(RunnableFunc(func(s <-chan struct{}) error {
defer close(c2)
defer GinkgoRecover()
close(c2)
return nil
}))).To(Succeed())

Expand Down Expand Up @@ -257,21 +257,21 @@ var _ = Describe("manger.Manager", func() {
c1 := make(chan struct{})
Expect(m.Add(RunnableFunc(func(s <-chan struct{}) error {
defer GinkgoRecover()
defer close(c1)
close(c1)
return nil
}))).To(Succeed())

c2 := make(chan struct{})
Expect(m.Add(RunnableFunc(func(s <-chan struct{}) error {
defer GinkgoRecover()
defer close(c2)
close(c2)
return fmt.Errorf("expected error")
}))).To(Succeed())

c3 := make(chan struct{})
Expect(m.Add(RunnableFunc(func(s <-chan struct{}) error {
defer GinkgoRecover()
defer close(c3)
close(c3)
return nil
}))).To(Succeed())

Expand Down Expand Up @@ -441,8 +441,8 @@ var _ = Describe("manger.Manager", func() {
// Add one component before starting
c1 := make(chan struct{})
Expect(m.Add(RunnableFunc(func(s <-chan struct{}) error {
defer close(c1)
defer GinkgoRecover()
close(c1)
return nil
}))).To(Succeed())

Expand All @@ -457,8 +457,8 @@ var _ = Describe("manger.Manager", func() {
// Add another component after starting
c2 := make(chan struct{})
Expect(m.Add(RunnableFunc(func(s <-chan struct{}) error {
defer close(c2)
defer GinkgoRecover()
close(c2)
return nil
}))).To(Succeed())
<-c1
Expand All @@ -483,8 +483,8 @@ var _ = Describe("manger.Manager", func() {

c1 := make(chan struct{})
Expect(m.Add(RunnableFunc(func(s <-chan struct{}) error {
defer close(c1)
defer GinkgoRecover()
close(c1)
return nil
}))).To(Succeed())
<-c1
Expand Down

0 comments on commit be010e1

Please sign in to comment.