Skip to content

Commit

Permalink
UPSTREAM: <carry>: Release lock on KCM and KS termination
Browse files Browse the repository at this point in the history
UPSTREAM: <carry>: Force releasing the lock on exit for KS

squash with UPSTREAM: <carry>: Release lock on KCM and KS termination

OpenShift-Rebase-Source: fc91252
  • Loading branch information
tnozicka authored and sanchezl committed Jan 5, 2023
1 parent 4a2fa27 commit f60aed3
Showing 1 changed file with 34 additions and 9 deletions.
43 changes: 34 additions & 9 deletions cmd/kube-controller-manager/app/controllermanager.go
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,7 @@ import (
"k8s.io/apimachinery/pkg/util/uuid"
"k8s.io/apimachinery/pkg/util/wait"
genericfeatures "k8s.io/apiserver/pkg/features"
"k8s.io/apiserver/pkg/server"
"k8s.io/apiserver/pkg/server/healthz"
"k8s.io/apiserver/pkg/server/mux"
utilfeature "k8s.io/apiserver/pkg/util/feature"
Expand Down Expand Up @@ -145,7 +146,9 @@ controller, and serviceaccounts controller.`,

// add feature enablement metrics
utilfeature.DefaultMutableFeatureGate.AddMetrics()
return Run(c.Complete(), wait.NeverStop)

stopCh := server.SetupSignalHandler()
return Run(c.Complete(), stopCh)
},
Args: func(cmd *cobra.Command, args []string) error {
for _, arg := range args {
Expand Down Expand Up @@ -297,10 +300,18 @@ func Run(c *config.CompletedConfig, stopCh <-chan struct{}) error {
run(ctx, startSATokenController, initializersFunc)
},
OnStoppedLeading: func() {
klog.ErrorS(nil, "leaderelection lost")
klog.FlushAndExit(klog.ExitFlushTimeout, 1)
select {
case <-stopCh:
// We were asked to terminate. Exit 0.
klog.Info("Requested to terminate. Exiting.")
os.Exit(0)
default:
// We lost the lock.
klog.ErrorS(nil, "leaderelection lost")
klog.FlushAndExit(klog.ExitFlushTimeout, 1)
}
},
})
}, stopCh)

// If Leader Migration is enabled, proceed to attempt the migration lock.
if leaderMigrator != nil {
Expand All @@ -321,10 +332,18 @@ func Run(c *config.CompletedConfig, stopCh <-chan struct{}) error {
run(ctx, nil, createInitializersFunc(leaderMigrator.FilterFunc, leadermigration.ControllerMigrated))
},
OnStoppedLeading: func() {
klog.ErrorS(nil, "migration leaderelection lost")
klog.FlushAndExit(klog.ExitFlushTimeout, 1)
select {
case <-stopCh:
// We were asked to terminate. Exit 0.
klog.Info("Requested to terminate. Exiting.")
os.Exit(0)
default:
// We lost the lock.
klog.ErrorS(nil, "migration leaderelection lost")
klog.FlushAndExit(klog.ExitFlushTimeout, 1)
}
},
})
}, stopCh)
}

<-stopCh
Expand Down Expand Up @@ -732,7 +751,7 @@ func createClientBuilders(c *config.CompletedConfig) (clientBuilder clientbuilde

// leaderElectAndRun runs the leader election, and runs the callbacks once the leader lease is acquired.
// TODO: extract this function into staging/controller-manager
func leaderElectAndRun(c *config.CompletedConfig, lockIdentity string, electionChecker *leaderelection.HealthzAdaptor, resourceLock string, leaseName string, callbacks leaderelection.LeaderCallbacks) {
func leaderElectAndRun(c *config.CompletedConfig, lockIdentity string, electionChecker *leaderelection.HealthzAdaptor, resourceLock string, leaseName string, callbacks leaderelection.LeaderCallbacks, stopCh <-chan struct{}) {
rl, err := resourcelock.NewFromKubeconfig(resourceLock,
c.ComponentConfig.Generic.LeaderElection.ResourceNamespace,
leaseName,
Expand All @@ -746,7 +765,13 @@ func leaderElectAndRun(c *config.CompletedConfig, lockIdentity string, electionC
klog.Fatalf("error creating lock: %v", err)
}

leaderelection.RunOrDie(context.TODO(), leaderelection.LeaderElectionConfig{
leCtx, cancel := context.WithCancel(context.Background())
defer cancel()
go func() {
<-stopCh
cancel()
}()
leaderelection.RunOrDie(leCtx, leaderelection.LeaderElectionConfig{
Lock: rl,
LeaseDuration: c.ComponentConfig.Generic.LeaderElection.LeaseDuration.Duration,
RenewDeadline: c.ComponentConfig.Generic.LeaderElection.RenewDeadline.Duration,
Expand Down

0 comments on commit f60aed3

Please sign in to comment.