Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Use better way to set up controller with Manager #27

Merged
merged 1 commit into from
Jul 15, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
67 changes: 47 additions & 20 deletions cmd/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,14 +20,17 @@ import (
"crypto/tls"
"flag"
"os"
"time"

// Import all Kubernetes client auth plugins (e.g. Azure, GCP, OIDC, etc.)
// to ensure that exec-entrypoint and run can make use of them.
_ "k8s.io/client-go/plugin/pkg/client/auth"

metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/runtime"
utilruntime "k8s.io/apimachinery/pkg/util/runtime"
clientgoscheme "k8s.io/client-go/kubernetes/scheme"
"k8s.io/client-go/tools/leaderelection/resourcelock"
ctrl "sigs.k8s.io/controller-runtime"
"sigs.k8s.io/controller-runtime/pkg/healthz"
"sigs.k8s.io/controller-runtime/pkg/log/zap"
Expand All @@ -36,21 +39,31 @@ import (

autoscalingv1alpha1 "github.com/aibrix/aibrix/api/autoscaling/v1alpha1"
modelv1alpha1 "github.com/aibrix/aibrix/api/model/v1alpha1"
modeladaptercontroller "github.com/aibrix/aibrix/pkg/controller/modeladapter"
autoscalingcontroller "github.com/aibrix/aibrix/pkg/controller/podautoscaler"
"github.com/aibrix/aibrix/pkg/controller"
//+kubebuilder:scaffold:imports
)

const (
defaultLeaseDuration = 15 * time.Second
defaultRenewDeadline = 10 * time.Second
defaultRetryPeriod = 2 * time.Second
defaultControllerCacheSyncTimeout = 2 * time.Minute
)

var (
scheme = runtime.NewScheme()
setupLog = ctrl.Log.WithName("setup")
scheme = runtime.NewScheme()
setupLog = ctrl.Log.WithName("setup")
restConfigQPS = flag.Int("rest-config-qps", 30, "QPS of rest config.")
restConfigBurst = flag.Int("rest-config-burst", 50, "Burst of rest config.")
)

func init() {
utilruntime.Must(clientgoscheme.AddToScheme(scheme))

utilruntime.Must(autoscalingv1alpha1.AddToScheme(scheme))
utilruntime.Must(modelv1alpha1.AddToScheme(scheme))

scheme.AddUnversionedTypes(metav1.SchemeGroupVersion, &metav1.UpdateOptions{}, &metav1.DeleteOptions{}, &metav1.CreateOptions{})
//+kubebuilder:scaffold:scheme
}

Expand All @@ -60,6 +73,11 @@ func main() {
var probeAddr string
var secureMetrics bool
var enableHTTP2 bool
var leaderElectionNamespace string
var leaseDuration time.Duration
var renewDeadLine time.Duration
var leaderElectionResourceLock string
var leaderElectionId string
flag.StringVar(&metricsAddr, "metrics-bind-address", ":8080", "The address the metric endpoint binds to.")
flag.StringVar(&probeAddr, "health-probe-bind-address", ":8081", "The address the probe endpoint binds to.")
flag.BoolVar(&enableLeaderElection, "leader-elect", false,
Expand All @@ -69,6 +87,18 @@ func main() {
"If set the metrics endpoint is served securely")
flag.BoolVar(&enableHTTP2, "enable-http2", false,
"If set, HTTP/2 will be enabled for the metrics and webhook servers")
flag.BoolVar(&enableLeaderElection, "enable-leader-election", false, "Whether you need to enable leader election.")
flag.StringVar(&leaderElectionNamespace, "leader-election-namespace", "aibrix-system",
"This determines the namespace in which the leader election configmap will be created, it will use in-cluster namespace if empty.")
flag.DurationVar(&leaseDuration, "leader-election-lease-duration", defaultLeaseDuration,
"leader-election-lease-duration is the duration that non-leader candidates will wait to force acquire leadership. This is measured against time of last observed ack. Default is 15 seconds.")
flag.DurationVar(&renewDeadLine, "leader-election-renew-deadline", defaultRenewDeadline,
"leader-election-renew-deadline is the duration that the acting controlplane will retry refreshing leadership before giving up. Default is 10 seconds.")
flag.StringVar(&leaderElectionResourceLock, "leader-election-resource-lock", resourcelock.LeasesResourceLock,
"leader-election-resource-lock determines which resource lock to use for leader election, defaults to \"leases\".")
flag.StringVar(&leaderElectionId, "leader-election-id", "aibrix-controller-manager",
"leader-election-id determines the name of the resource that leader election will use for holding the leader lock, Default is aibrix-controller-manager.")

opts := zap.Options{
Development: true,
}
Expand Down Expand Up @@ -104,10 +134,14 @@ func main() {
SecureServing: secureMetrics,
TLSOpts: tlsOpts,
},
WebhookServer: webhookServer,
HealthProbeBindAddress: probeAddr,
LeaderElection: enableLeaderElection,
LeaderElectionID: "d534ee11.aibrix.ai",
WebhookServer: webhookServer,
HealthProbeBindAddress: probeAddr,
LeaderElection: enableLeaderElection,
LeaderElectionID: leaderElectionId,
LeaderElectionNamespace: leaderElectionNamespace,
LeaderElectionResourceLock: leaderElectionResourceLock,
LeaseDuration: &leaseDuration,
RenewDeadline: &renewDeadLine,
// LeaderElectionReleaseOnCancel defines if the leader should step down voluntarily
// when the Manager ends. This requires the binary to immediately end when the
// Manager is stopped, otherwise, this setting is unsafe. Setting this significantly
Expand All @@ -125,20 +159,13 @@ func main() {
os.Exit(1)
}

if err = (&autoscalingcontroller.PodAutoscalerReconciler{
Client: mgr.GetClient(),
Scheme: mgr.GetScheme(),
}).SetupWithManager(mgr); err != nil {
setupLog.Error(err, "unable to create PodAutoscaler controller", "controller", "PodAutoscaler")
os.Exit(1)
}
if err = (&modeladaptercontroller.ModelAdapterReconciler{
Client: mgr.GetClient(),
Scheme: mgr.GetScheme(),
}).SetupWithManager(mgr); err != nil {
setupLog.Error(err, "unable to create controller", "controller", "ModelAdapter")
// Kind controller registration is encapsulated inside the pkg/controller/controller.go
// So here we can use more clean registration flow and there's no need to change logics in future.
if err = controller.SetupWithManager(mgr); err != nil {
setupLog.Error(err, "unable to setup controller")
os.Exit(1)
}

//+kubebuilder:scaffold:builder

if err := mgr.AddHealthzCheck("healthz", healthz.Ping); err != nil {
Expand Down
35 changes: 35 additions & 0 deletions pkg/controller/controller.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,35 @@
package controller

import (
"github.com/aibrix/aibrix/pkg/controller/modeladapter"
"github.com/aibrix/aibrix/pkg/controller/podautoscaler"
"k8s.io/apimachinery/pkg/api/meta"
"k8s.io/klog/v2"
"sigs.k8s.io/controller-runtime/pkg/manager"
)

// Borrowed logic from Kruise
// Original source: https://github.com/openkruise/kruise/blob/master/pkg/controller/controllers.go
// Reason: We have single controller-manager as well and use the controller-runtime libraries.
// Instead of registering every controller in the main.go, kruise's registration flow is much cleaner.

var controllerAddFuncs []func(manager.Manager) error

func init() {
controllerAddFuncs = append(controllerAddFuncs, podautoscaler.Add)
controllerAddFuncs = append(controllerAddFuncs, modeladapter.Add)
}

// SetupWithManager sets up the controller with the Manager.
func SetupWithManager(m manager.Manager) error {
for _, f := range controllerAddFuncs {
if err := f(m); err != nil {
if kindMatchErr, ok := err.(*meta.NoKindMatchError); ok {
klog.InfoS("CRD is not installed, its controller will perform noops!", "CRD", kindMatchErr.GroupKind)
continue
}
return err
}
}
return nil
}
51 changes: 42 additions & 9 deletions pkg/controller/modeladapter/modeladapter_controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,15 +18,55 @@ package modeladapter

import (
"context"
"k8s.io/klog/v2"
"sigs.k8s.io/controller-runtime/pkg/controller"
"sigs.k8s.io/controller-runtime/pkg/manager"
"sigs.k8s.io/controller-runtime/pkg/reconcile"

"k8s.io/apimachinery/pkg/runtime"
ctrl "sigs.k8s.io/controller-runtime"
"sigs.k8s.io/controller-runtime/pkg/client"
"sigs.k8s.io/controller-runtime/pkg/log"

modelv1alpha1 "github.com/aibrix/aibrix/api/model/v1alpha1"
)

// Add creates a new ModelAdapter Controller and adds it to the Manager with default RBAC.
// The Manager will set fields on the Controller and Start it when the Manager is Started.
func Add(mgr manager.Manager) error {
r, err := newReconciler(mgr)
if err != nil {
return err
}
return add(mgr, r)
}

// newReconciler returns a new reconcile.Reconciler
func newReconciler(mgr manager.Manager) (reconcile.Reconciler, error) {
reconciler := &ModelAdapterReconciler{
Client: mgr.GetClient(),
Scheme: mgr.GetScheme(),
}
return reconciler, nil
}

// add adds a new Controller to mgr with r as the reconcile.Reconciler
func add(mgr manager.Manager, r reconcile.Reconciler) error {
// Create a new controller
_, err := controller.New("model-adapter-controller", mgr, controller.Options{
Reconciler: r})
if err != nil {
return err
}
//return ctrl.NewControllerManagedBy(mgr).
// For(&modelv1alpha1.ModelAdapter{}).
// Complete(r)

klog.V(4).InfoS("Finished to add model-adapter-controller")

return nil
}

var _ reconcile.Reconciler = &ModelAdapterReconciler{}

// ModelAdapterReconciler reconciles a ModelAdapter object
type ModelAdapterReconciler struct {
client.Client
Expand All @@ -53,10 +93,3 @@ func (r *ModelAdapterReconciler) Reconcile(ctx context.Context, req ctrl.Request

return ctrl.Result{}, nil
}

// SetupWithManager sets up the controller with the Manager.
func (r *ModelAdapterReconciler) SetupWithManager(mgr ctrl.Manager) error {
return ctrl.NewControllerManagedBy(mgr).
For(&modelv1alpha1.ModelAdapter{}).
Complete(r)
}
53 changes: 43 additions & 10 deletions pkg/controller/podautoscaler/podautoscaler_controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,15 +18,55 @@ package podautoscaler

import (
"context"

"k8s.io/apimachinery/pkg/runtime"
"k8s.io/klog/v2"
ctrl "sigs.k8s.io/controller-runtime"
"sigs.k8s.io/controller-runtime/pkg/client"
"sigs.k8s.io/controller-runtime/pkg/controller"
"sigs.k8s.io/controller-runtime/pkg/log"

autoscalingv1alpha1 "github.com/aibrix/aibrix/api/autoscaling/v1alpha1"
"sigs.k8s.io/controller-runtime/pkg/manager"
"sigs.k8s.io/controller-runtime/pkg/reconcile"
)

// Add creates a new PodAutoscaler Controller and adds it to the Manager with default RBAC.
// The Manager will set fields on the Controller and Start it when the Manager is Started.
func Add(mgr manager.Manager) error {
r, err := newReconciler(mgr)
if err != nil {
return err
}
return add(mgr, r)
}

// newReconciler returns a new reconcile.Reconciler
func newReconciler(mgr manager.Manager) (reconcile.Reconciler, error) {
reconciler := &PodAutoscalerReconciler{
Client: mgr.GetClient(),
Scheme: mgr.GetScheme(),
}
return reconciler, nil
}

// add adds a new Controller to mgr with r as the reconcile.Reconciler
func add(mgr manager.Manager, r reconcile.Reconciler) error {
// Create a new controller
_, err := controller.New("pod-autoscaler-controller", mgr, controller.Options{
Reconciler: r})
if err != nil {
return err
}

// ctrl.NewControllerManagedBy(mgr).
// For(&autoscalingv1alpha1.PodAutoscaler{}).
// Complete(r)

klog.V(4).InfoS("Finished to add pod-autoscaler-controller")

return nil
}

var _ reconcile.Reconciler = &PodAutoscalerReconciler{}

// PodAutoscalerReconciler reconciles a PodAutoscaler object
type PodAutoscalerReconciler struct {
client.Client
Expand All @@ -53,10 +93,3 @@ func (r *PodAutoscalerReconciler) Reconcile(ctx context.Context, req ctrl.Reques

return ctrl.Result{}, nil
}

// SetupWithManager sets up the controller with the Manager.
func (r *PodAutoscalerReconciler) SetupWithManager(mgr ctrl.Manager) error {
return ctrl.NewControllerManagedBy(mgr).
For(&autoscalingv1alpha1.PodAutoscaler{}).
Complete(r)
}