Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

fix nil stop value for source.Channel #154

Closed
wants to merge 2 commits into from
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
75 changes: 39 additions & 36 deletions pkg/manager/internal.go
Original file line number Diff line number Diff line change
Expand Up @@ -75,6 +75,9 @@ type controllerManager struct {
errChan chan error
stop <-chan struct{}

// stopper is the write side of the stop channel. They should have the same value.
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

nit: maybe change to (for a bit more clarity -- it took me a moment to parse what "they" was):

stopper is the write side of the stop channel. It and stop should have the same value.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Suggested change
// stopper is the write side of the stop channel. They should have the same value.
// stopper is the write side of the stop channel (used to close `stop`). It and `stop` should have the same value.

stopper chan<- struct{}

startCache func(stop <-chan struct{}) error
}

Expand Down Expand Up @@ -158,9 +161,13 @@ func (cm *controllerManager) GetRESTMapper() meta.RESTMapper {
}

func (cm *controllerManager) Start(stop <-chan struct{}) error {
defer close(cm.stopper)

if cm.resourceLock == nil {
go cm.start(stop)
go cm.start()
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

nit: this doesn't block. Does it really need a goroutine?

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Suggested change
go cm.start()
cm.start()

select {
// Only this function should receive from stop, and everything else
// should receive from cm.stop.
case <-stop:
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

nit: this flow gets a bit confusing to read with cm.stop vs stop. Maybe stop to externalStop or rename cm.stop to cm.internalStop.

// we are done
return nil
Expand All @@ -178,7 +185,13 @@ func (cm *controllerManager) Start(stop <-chan struct{}) error {
RenewDeadline: 10 * time.Second,
RetryPeriod: 2 * time.Second,
Callbacks: leaderelection.LeaderCallbacks{
OnStartedLeading: cm.start,
// This type changes in k8s 1.12 to func(context.Context)
// Ignore the passed-in stop channel from leaderelection. The next
// thing it does anyway after closing its stop channel is call
// OnStoppedLeading.
OnStartedLeading: func(_ <-chan struct{}) {
cm.start()
},
OnStoppedLeading: func() {
// Most implementations of leader election log.Fatal() here.
// Since Start is wrapped in log.Fatal when called, we can just return
Expand All @@ -203,43 +216,33 @@ func (cm *controllerManager) Start(stop <-chan struct{}) error {
}
}

func (cm *controllerManager) start(stop <-chan struct{}) {
func() {
cm.mu.Lock()
defer cm.mu.Unlock()

cm.stop = stop

// Start the Cache. Allow the function to start the cache to be mocked out for testing
if cm.startCache == nil {
cm.startCache = cm.cache.Start
}
go func() {
if err := cm.startCache(stop); err != nil {
cm.errChan <- err
}
}()
func (cm *controllerManager) start() {
cm.mu.Lock()
defer cm.mu.Unlock()

// Wait for the caches to sync.
// TODO(community): Check the return value and write a test
cm.cache.WaitForCacheSync(stop)

// Start the runnables after the cache has synced
for _, c := range cm.runnables {
// Controllers block, but we want to return an error if any have an error starting.
// Write any Start errors to a channel so we can return them
ctrl := c
go func() {
cm.errChan <- ctrl.Start(stop)
}()
// Start the Cache. Allow the function to start the cache to be mocked out for testing
if cm.startCache == nil {
cm.startCache = cm.cache.Start
}
go func() {
if err := cm.startCache(cm.stop); err != nil {
cm.errChan <- err
}

cm.started = true
}()

select {
case <-stop:
// We are done
return
// Wait for the caches to sync.
// TODO(community): Check the return value and write a test
cm.cache.WaitForCacheSync(cm.stop)

// Start the runnables after the cache has synced
for _, c := range cm.runnables {
// Controllers block, but we want to return an error if any have an error starting.
// Write any Start errors to a channel so we can return them
ctrl := c
go func() {
cm.errChan <- ctrl.Start(cm.stop)
}()
}

cm.started = true
}
4 changes: 4 additions & 0 deletions pkg/manager/manager.go
Original file line number Diff line number Diff line change
Expand Up @@ -180,6 +180,8 @@ func New(config *rest.Config, options Options) (Manager, error) {
return nil, err
}

stop := make(chan struct{})

return &controllerManager{
config: config,
scheme: options.Scheme,
Expand All @@ -191,6 +193,8 @@ func New(config *rest.Config, options Options) (Manager, error) {
recorderProvider: recorderProvider,
resourceLock: resourceLock,
mapper: mapper,
stop: stop,
stopper: stop,
}, nil
}

Expand Down
3 changes: 1 addition & 2 deletions pkg/manager/manager_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -327,8 +327,7 @@ var _ = Describe("manger.Manager", func() {
},
stop: func(stop <-chan struct{}) error {
defer GinkgoRecover()
// Manager stop chan has not been initialized.
Expect(stop).To(BeNil())
Expect(stop).NotTo(BeNil())
return nil
},
f: func(f inject.Func) error {
Expand Down