Skip to content

Commit

Permalink
Merge pull request #424 from mengqiy/nonleaderelection
Browse files Browse the repository at this point in the history
support HA (non leader election) components
  • Loading branch information
k8s-ci-robot authored May 16, 2019
2 parents f39791e + ea5a354 commit fe0f6dd
Show file tree
Hide file tree
Showing 5 changed files with 72 additions and 29 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -14,10 +14,25 @@ See the License for the specific language governing permissions and
limitations under the License.
*/

// +kubebuilder:object:generate=true
// +groupName=chaosapps.metamagical.io
package pkg

import (
"k8s.io/apimachinery/pkg/runtime/schema"
logf "sigs.k8s.io/controller-runtime/pkg/runtime/log"
"sigs.k8s.io/controller-runtime/pkg/scheme"
)

var log = logf.Log.WithName("chaospod-resource")
var (
log = logf.Log.WithName("chaospod-resource")

// SchemeGroupVersion is group version used to register these objects
SchemeGroupVersion = schema.GroupVersion{Group: "chaosapps.metamagical.io", Version: "v1"}

// SchemeBuilder is used to add go types to the GroupVersionKind scheme
SchemeBuilder = &scheme.Builder{GroupVersion: SchemeGroupVersion}

// AddToScheme is required by pkg/client/...
AddToScheme = SchemeBuilder.AddToScheme
)
18 changes: 2 additions & 16 deletions examples/crd/pkg/resource.go
Original file line number Diff line number Diff line change
Expand Up @@ -23,8 +23,6 @@ import (
corev1 "k8s.io/api/core/v1"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/runtime"
"k8s.io/apimachinery/pkg/runtime/schema"
"sigs.k8s.io/controller-runtime/pkg/scheme"
"sigs.k8s.io/controller-runtime/pkg/webhook"
)

Expand All @@ -41,12 +39,11 @@ type ChaosPodStatus struct {
LastRun metav1.Time `json:"lastRun,omitempty"`
}

// +k8s:deepcopy-gen:interfaces=k8s.io/apimachinery/pkg/runtime.Object
// +kubebuilder:object:root=true

// ChaosPod is the Schema for the randomjobs API
// +kubebuilder:printcolumn:name="next stop",type="string",JSONPath=".spec.nextStop",format="date"
// +kubebuilder:printcolumn:name="last run",type="string",JSONPath=".status.lastRun",format="date"
// +k8s:openapi-gen=true
type ChaosPod struct {
metav1.TypeMeta `json:",inline"`
metav1.ObjectMeta `json:"metadata,omitempty"`
Expand All @@ -55,7 +52,7 @@ type ChaosPod struct {
Status ChaosPodStatus `json:"status,omitempty"`
}

// +k8s:deepcopy-gen:interfaces=k8s.io/apimachinery/pkg/runtime.Object
// +kubebuilder:object:root=true

// ChaosPodList contains a list of ChaosPod
type ChaosPodList struct {
Expand Down Expand Up @@ -106,14 +103,3 @@ func (c *ChaosPod) Default() {
func init() {
SchemeBuilder.Register(&ChaosPod{}, &ChaosPodList{})
}

var (
// SchemeGroupVersion is group version used to register these objects
SchemeGroupVersion = schema.GroupVersion{Group: "chaosapps.metamagical.io", Version: "v1"}

// SchemeBuilder is used to add go types to the GroupVersionKind scheme
SchemeBuilder = &scheme.Builder{GroupVersion: SchemeGroupVersion}

// AddToScheme is required by pkg/client/...
AddToScheme = SchemeBuilder.AddToScheme
)
49 changes: 39 additions & 10 deletions pkg/manager/internal.go
Original file line number Diff line number Diff line change
Expand Up @@ -58,8 +58,12 @@ type controllerManager struct {
// to scheme.scheme.
scheme *runtime.Scheme

// runnables is the set of Controllers that the controllerManager injects deps into and Starts.
runnables []Runnable
// leaderElectionRunnables is the set of Controllers that the controllerManager injects deps into and Starts.
// These Runnables are managed by lead election.
leaderElectionRunnables []Runnable
// nonLeaderElectionRunnables is the set of webhook servers that the controllerManager injects deps into and Starts.
// These Runnables will not be blocked by lead election.
nonLeaderElectionRunnables []Runnable

cache cache.Cache

Expand Down Expand Up @@ -121,7 +125,7 @@ type controllerManager struct {
retryPeriod time.Duration
}

// Add sets dependencies on i, and adds it to the list of runnables to start.
// Add sets dependencies on i, and adds it to the list of Runnables to start.
func (cm *controllerManager) Add(r Runnable) error {
cm.mu.Lock()
defer cm.mu.Unlock()
Expand All @@ -131,8 +135,13 @@ func (cm *controllerManager) Add(r Runnable) error {
return err
}

// Add the runnable to the list
cm.runnables = append(cm.runnables, r)
// Add the runnable to the leader election or the non-leaderelection list
if leRunnable, ok := r.(LeaderElectionRunnable); ok && !leRunnable.NeedLeaderElection() {
cm.nonLeaderElectionRunnables = append(cm.nonLeaderElectionRunnables, r)
} else {
cm.leaderElectionRunnables = append(cm.leaderElectionRunnables, r)
}

if cm.started {
// If already started, start the controller
go func() {
Expand Down Expand Up @@ -254,13 +263,15 @@ func (cm *controllerManager) Start(stop <-chan struct{}) error {
go cm.serveMetrics(cm.internalStop)
}

go cm.startNonLeaderElectionRunnables()

if cm.resourceLock != nil {
err := cm.startLeaderElection()
if err != nil {
return err
}
} else {
go cm.start()
go cm.startLeaderElectionRunnables()
}

select {
Expand All @@ -273,7 +284,7 @@ func (cm *controllerManager) Start(stop <-chan struct{}) error {
}
}

func (cm *controllerManager) start() {
func (cm *controllerManager) startNonLeaderElectionRunnables() {
cm.mu.Lock()
defer cm.mu.Unlock()

Expand All @@ -291,8 +302,26 @@ func (cm *controllerManager) start() {
// TODO(community): Check the return value and write a test
cm.cache.WaitForCacheSync(cm.internalStop)

// Start the runnables after the cache has synced
for _, c := range cm.runnables {
// Start the non-leaderelection Runnables after the cache has synced
for _, c := range cm.nonLeaderElectionRunnables {
// Controllers block, but we want to return an error if any have an error starting.
// Write any Start errors to a channel so we can return them
ctrl := c
go func() {
cm.errChan <- ctrl.Start(cm.internalStop)
}()
}

cm.started = true
}

func (cm *controllerManager) startLeaderElectionRunnables() {
// Wait for the caches to sync.
// TODO(community): Check the return value and write a test
cm.cache.WaitForCacheSync(cm.internalStop)

// Start the leader election Runnables after the cache has synced
for _, c := range cm.leaderElectionRunnables {
// Controllers block, but we want to return an error if any have an error starting.
// Write any Start errors to a channel so we can return them
ctrl := c
Expand All @@ -312,7 +341,7 @@ func (cm *controllerManager) startLeaderElection() (err error) {
RetryPeriod: cm.retryPeriod,
Callbacks: leaderelection.LeaderCallbacks{
OnStartedLeading: func(_ context.Context) {
cm.start()
cm.startLeaderElectionRunnables()
},
OnStoppedLeading: func() {
// Most implementations of leader election log.Fatal() here.
Expand Down
13 changes: 11 additions & 2 deletions pkg/manager/manager.go
Original file line number Diff line number Diff line change
Expand Up @@ -42,9 +42,11 @@ import (
// Manager initializes shared dependencies such as Caches and Clients, and provides them to Runnables.
// A Manager is required to create Controllers.
type Manager interface {
// Add will set reqeusted dependencies on the component, and cause the component to be
// Add will set requested dependencies on the component, and cause the component to be
// started when Start is called. Add will inject any dependencies for which the argument
// implements the inject interface - e.g. inject.Client
// implements the inject interface - e.g. inject.Client.
// Depending on if a Runnable implements LeaderElectionRunnable interface, a Runnable can be run in either
// non-leaderelection mode (always running) or leader election mode (managed by leader election if enabled).
Add(Runnable) error

// SetFields will set any dependencies on an object for which the object has implemented the inject
Expand Down Expand Up @@ -183,6 +185,13 @@ func (r RunnableFunc) Start(s <-chan struct{}) error {
return r(s)
}

// LeaderElectionRunnable knows if a Runnable needs to be run in the leader election mode.
type LeaderElectionRunnable interface {
// NeedLeaderElection returns true if the Runnable needs to be run in the leader election mode.
// e.g. controllers need to be run in leader election mode, while webhook server doesn't.
NeedLeaderElection() bool
}

// New returns a new Manager for creating Controllers.
func New(config *rest.Config, options Options) (Manager, error) {
// Initialize a rest.config if none was specified
Expand Down
4 changes: 4 additions & 0 deletions pkg/manager/manager_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -284,6 +284,10 @@ var _ = Describe("manger.Manager", func() {
<-c2
<-c3
})

It("should return an error if any non-leaderelection Components fail to Start", func() {
// TODO(mengqiy): implement this after resolving https://github.com/kubernetes-sigs/controller-runtime/issues/429
})
}

Context("with defaults", func() {
Expand Down

0 comments on commit fe0f6dd

Please sign in to comment.