diff --git a/pkg/operator/operator.go b/pkg/operator/operator.go
new file mode 100644
index 0000000000..56311c5ced
--- /dev/null
+++ b/pkg/operator/operator.go
@@ -0,0 +1,233 @@
+package operator
+
+import (
+	"fmt"
+	"time"
+
+	"github.com/golang/glog"
+	cvoclientset "github.com/openshift/cluster-version-operator/pkg/generated/clientset/versioned"
+
+	"k8s.io/api/core/v1"
+	apiextclientset "k8s.io/apiextensions-apiserver/pkg/client/clientset/clientset"
+	apiextinformersv1beta1 "k8s.io/apiextensions-apiserver/pkg/client/informers/externalversions/apiextensions/v1beta1"
+	apiextlistersv1beta1 "k8s.io/apiextensions-apiserver/pkg/client/listers/apiextensions/v1beta1"
+	utilruntime "k8s.io/apimachinery/pkg/util/runtime"
+	"k8s.io/apimachinery/pkg/util/wait"
+	appsinformersv1 "k8s.io/client-go/informers/apps/v1"
+	coreinformersv1 "k8s.io/client-go/informers/core/v1"
+	rbacinformersv1 "k8s.io/client-go/informers/rbac/v1"
+	"k8s.io/client-go/kubernetes"
+	//"k8s.io/client-go/kubernetes/scheme"
+	coreclientsetv1 "k8s.io/client-go/kubernetes/typed/core/v1"
+	appslisterv1 "k8s.io/client-go/listers/apps/v1"
+	"k8s.io/client-go/tools/cache"
+	"k8s.io/client-go/tools/record"
+	"k8s.io/client-go/util/workqueue"
+
+	//"github.com/openshift/machine-config-operator/pkg/generated/clientset/versioned/scheme"
+	"github.com/openshift/machine-api-operator/pkg/render"
+
+	apiregistrationclientset "k8s.io/kube-aggregator/pkg/client/clientset_generated/clientset"
+	"sigs.k8s.io/cluster-api/pkg/client/clientset_generated/clientset"
+	"sigs.k8s.io/cluster-api/pkg/client/clientset_generated/clientset/scheme"
+	clusterapiinformersv1alpha1 "sigs.k8s.io/cluster-api/pkg/client/informers_generated/externalversions/cluster/v1alpha1"
+	clusterapilisterv1alpha1 "sigs.k8s.io/cluster-api/pkg/client/listers_generated/cluster/v1alpha1"
+)
+
+const (
+	// maxRetries is the number of times a work item will be retried before it is dropped out of the queue.
+	// With the current rate-limiter in use (5ms*2^(maxRetries-1)) the following numbers represent the times
+	// a work item is going to be requeued:
+	//
+	// 5ms, 10ms, 20ms, 40ms, 80ms, 160ms, 320ms, 640ms, 1.3s, 2.6s, 5.1s, 10.2s, 20.4s, 41s, 82s
+	maxRetries      = 15
+	providerAWS     = "aws"
+	providerLibvirt = "libvirt"
+)
+
+// Operator defines the machine API operator.
+type Operator struct {
+	namespace, name string
+
+	imagesFile string
+	config     string
+
+	clusterAPIClient      clientset.Interface
+	kubeClient            kubernetes.Interface
+	apiExtClient          apiextclientset.Interface
+	apiregistrationClient apiregistrationclientset.Interface
+	cvoClient             cvoclientset.Interface
+	eventRecorder         record.EventRecorder
+
+	syncHandler func(ic string) error
+
+	crdLister        apiextlistersv1beta1.CustomResourceDefinitionLister
+	machineSetLister clusterapilisterv1alpha1.MachineSetLister
+	deployLister     appslisterv1.DeploymentLister
+
+	crdListerSynced       cache.InformerSynced
+	machineSetSynced      cache.InformerSynced
+	deployListerSynced    cache.InformerSynced
+	daemonsetListerSynced cache.InformerSynced
+
+	// queue only ever has one item, but it has nice error handling backoff/retry semantics
+	queue workqueue.RateLimitingInterface
+}
+
+// New returns a new machine API operator.
+func New(
+	namespace, name string,
+	imagesFile string,
+
+	config string,
+
+	machineSetInformer clusterapiinformersv1alpha1.MachineSetInformer,
+	configMapInformer coreinformersv1.ConfigMapInformer,
+	serviceAccountInformer coreinformersv1.ServiceAccountInformer,
+	crdInformer apiextinformersv1beta1.CustomResourceDefinitionInformer,
+	deployInformer appsinformersv1.DeploymentInformer,
+	clusterRoleInformer rbacinformersv1.ClusterRoleInformer,
+	clusterRoleBindingInformer rbacinformersv1.ClusterRoleBindingInformer,
+
+	kubeClient kubernetes.Interface,
+	apiExtClient apiextclientset.Interface,
+	apiregistrationClient apiregistrationclientset.Interface,
+	cvoClient cvoclientset.Interface,
+	clusterAPIClient clientset.Interface,
+) *Operator {
+	eventBroadcaster := record.NewBroadcaster()
+	eventBroadcaster.StartLogging(glog.Infof)
+	eventBroadcaster.StartRecordingToSink(&coreclientsetv1.EventSinkImpl{Interface: kubeClient.CoreV1().Events("")})
+
+	optr := &Operator{
+		namespace:             namespace,
+		name:                  name,
+		imagesFile:            imagesFile,
+		clusterAPIClient:      clusterAPIClient,
+		kubeClient:            kubeClient,
+		apiExtClient:          apiExtClient,
+		apiregistrationClient: apiregistrationClient,
+		cvoClient:             cvoClient,
+		eventRecorder:         eventBroadcaster.NewRecorder(scheme.Scheme, v1.EventSource{Component: "machineapioperator"}),
+		queue:                 workqueue.NewNamedRateLimitingQueue(workqueue.DefaultControllerRateLimiter(), "machineapioperator"),
+	}
+
+	machineSetInformer.Informer().AddEventHandler(optr.eventHandler())
+	configMapInformer.Informer().AddEventHandler(optr.eventHandler())
+	serviceAccountInformer.Informer().AddEventHandler(optr.eventHandler())
+	crdInformer.Informer().AddEventHandler(optr.eventHandler())
+	deployInformer.Informer().AddEventHandler(optr.eventHandler())
+	clusterRoleInformer.Informer().AddEventHandler(optr.eventHandler())
+	clusterRoleBindingInformer.Informer().AddEventHandler(optr.eventHandler())
+
+	optr.config = config
+	optr.syncHandler = optr.sync
+
+	optr.crdLister = crdInformer.Lister()
+	optr.crdListerSynced = crdInformer.Informer().HasSynced
+	optr.machineSetLister = machineSetInformer.Lister()
+	optr.machineSetSynced = machineSetInformer.Informer().HasSynced
+	optr.deployLister = deployInformer.Lister()
+	optr.deployListerSynced = deployInformer.Informer().HasSynced
+
+	return optr
+}
+
+// Run runs the machine API operator.
+func (optr *Operator) Run(workers int, stopCh <-chan struct{}) {
+	defer utilruntime.HandleCrash()
+	defer optr.queue.ShutDown()
+
+	glog.Info("Starting MachineAPIOperator")
+	defer glog.Info("Shutting down MachineAPIOperator")
+
+	if !cache.WaitForCacheSync(stopCh,
+		optr.deployListerSynced) {
+		glog.Error("failed to sync caches")
+		return
+	}
+	glog.Info("Synced up caches")
+	for i := 0; i < workers; i++ {
+		go wait.Until(optr.worker, time.Second, stopCh)
+	}
+
+	<-stopCh
+}
+
+func (optr *Operator) eventHandler() cache.ResourceEventHandler {
+	workQueueKey := fmt.Sprintf("%s/%s", optr.namespace, optr.name)
+	return cache.ResourceEventHandlerFuncs{
+		AddFunc:    func(obj interface{}) { optr.queue.Add(workQueueKey) },
+		UpdateFunc: func(old, new interface{}) { optr.queue.Add(workQueueKey) },
+		DeleteFunc: func(obj interface{}) { optr.queue.Add(workQueueKey) },
+	}
+}
+
+func (optr *Operator) worker() {
+	for optr.processNextWorkItem() {
+	}
+}
+
+func (optr *Operator) processNextWorkItem() bool {
+	key, quit := optr.queue.Get()
+	if quit {
+		return false
+	}
+	defer optr.queue.Done(key)
+
+	err := optr.syncHandler(key.(string))
+	optr.handleErr(err, key)
+
+	return true
+}
+
+func (optr *Operator) handleErr(err error, key interface{}) {
+	if err == nil {
+		//TODO: set operator Done.
+
+		optr.queue.Forget(key)
+		return
+	}
+
+	//TODO: set operator degraded.
+
+	if optr.queue.NumRequeues(key) < maxRetries {
+		glog.V(2).Infof("Error syncing operator %v: %v", key, err)
+		optr.queue.AddRateLimited(key)
+		return
+	}
+
+	utilruntime.HandleError(err)
+	glog.V(2).Infof("Dropping operator %q out of the queue: %v", key, err)
+	optr.queue.Forget(key)
+}
+
+func (optr *Operator) sync(key string) error {
+	startTime := time.Now()
+	glog.V(4).Infof("Started syncing operator %q (%v)", key, startTime)
+	defer func() {
+		glog.V(4).Infof("Finished syncing operator %q (%v)", key, time.Since(startTime))
+	}()
+
+	if err := optr.syncCustomResourceDefinitions(); err != nil {
+		return err
+	}
+	// TODO(alberto) operatorConfig as CRD?
+	operatorConfig, err := render.Config(optr.config)
+	if err != nil {
+		return err
+	}
+	err = optr.syncClusterAPIServer(*operatorConfig)
+	if err != nil {
+		glog.Errorf("Failed to sync cluster API server: %v", err)
+		return err
+	}
+	glog.Info("Synced up cluster API server")
+	err = optr.syncClusterAPIController(*operatorConfig)
+	if err != nil {
+		glog.Errorf("Failed to sync cluster API controller: %v", err)
+		return err
+	}
+	glog.Info("Synced up cluster API controller")
+	return optr.syncAll(*operatorConfig)
+}
diff --git a/pkg/operator/status.go b/pkg/operator/status.go
new file mode 100644
index 0000000000..cfcc1d7fbe
--- /dev/null
+++ b/pkg/operator/status.go
@@ -0,0 +1,61 @@
+package operator
+
+import (
+	"fmt"
+
+	"github.com/openshift/cluster-version-operator/lib/resourceapply"
+	"github.com/openshift/cluster-version-operator/pkg/apis/operatorstatus.openshift.io/v1"
+	"github.com/openshift/machine-api-operator/pkg/version"
+	metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
+	"k8s.io/apimachinery/pkg/runtime"
+)
+
+// syncStatus applies the new condition to the operator's OperatorStatus object.
+func (optr *Operator) syncStatus(cond v1.OperatorStatusCondition) error {
+	if cond.Type == v1.OperatorStatusConditionTypeDegraded {
+		return fmt.Errorf("invalid cond %s", cond.Type)
+	}
+
+	// TODO(yifan): Fill in the Extension field for the status
+	// to report the status of all the managed components.
+	status := &v1.OperatorStatus{
+		ObjectMeta: metav1.ObjectMeta{
+			Namespace: optr.namespace,
+			Name:      optr.name,
+		},
+		Condition:  cond,
+		Version:    version.Raw,
+		LastUpdate: metav1.Now(),
+	}
+	_, _, err := resourceapply.ApplyOperatorStatus(optr.cvoClient.Operatorstatus(), status)
+	return err
+}
+
+// syncDegradedStatus updates the OperatorStatus to Degraded.
+// If ierr is nil, it returns nil.
+// If ierr is not nil, it updates the OperatorStatus to Degraded and returns ierr.
+func (optr *Operator) syncDegradedStatus(ierr error) error {
+	if ierr == nil {
+		return nil
+	}
+	cond := v1.OperatorStatusCondition{
+		Type:    v1.OperatorStatusConditionTypeDegraded,
+		Message: fmt.Sprintf("error syncing: %v", ierr),
+	}
+
+	status := &v1.OperatorStatus{
+		ObjectMeta: metav1.ObjectMeta{
+			Namespace: optr.namespace,
+			Name:      optr.name,
+		},
+		Condition:  cond,
+		Version:    version.Raw,
+		LastUpdate: metav1.Now(),
+		Extension:  runtime.RawExtension{},
+	}
+	_, _, err := resourceapply.ApplyOperatorStatus(optr.cvoClient.Operatorstatus(), status)
+	if err != nil {
+		return err
+	}
+	return ierr
+}
diff --git a/pkg/operator/sync.go b/pkg/operator/sync.go
new file mode 100644
index 0000000000..c248aa3a3f
--- /dev/null
+++ b/pkg/operator/sync.go
@@ -0,0 +1,282 @@
+package operator
+
+import (
+	"fmt"
+	"time"
+
+	"github.com/golang/glog"
+	"github.com/openshift/cluster-version-operator/pkg/apis/operatorstatus.openshift.io/v1"
+	appsv1 "k8s.io/api/apps/v1"
+	apiextv1beta1 "k8s.io/apiextensions-apiserver/pkg/apis/apiextensions/v1beta1"
+	apierrors "k8s.io/apimachinery/pkg/api/errors"
+	metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
+	utilerrors "k8s.io/apimachinery/pkg/util/errors"
+	"k8s.io/apimachinery/pkg/util/wait"
+
+	"github.com/openshift/machine-api-operator/lib/resourceapply"
+	"github.com/openshift/machine-api-operator/lib/resourceread"
+	"github.com/openshift/machine-api-operator/pkg/render"
+)
+
+type syncFunc func(config render.OperatorConfig) error
+
+func (optr *Operator) syncAll(rconfig render.OperatorConfig) error {
+	// syncFuncs is the list of sync functions that are executed in order.
+	// Any error marks the sync as failed, but execution continues with the next syncFunc.
+	syncFuncs := []syncFunc{
+		// TODO(alberto): implement this once https://github.com/kubernetes-sigs/cluster-api/pull/488/files gets in
+		//optr.syncMachineClasses,
+		optr.syncMachineSets,
+		optr.syncCluster,
+	}
+
+	//if err := optr.syncStatus(v1.OperatorStatusCondition{
+	//	Type:    v1.OperatorStatusConditionTypeWorking,
+	//	Message: "Running sync functions",
+	//}); err != nil {
+	//	return fmt.Errorf("error syncing status: %v", err)
+	//}
+
+	var errs []error
+	for _, f := range syncFuncs {
+		errs = append(errs, f(rconfig))
+	}
+
+	agg := utilerrors.NewAggregate(errs)
+	if agg != nil {
+		errs = append(errs, optr.syncDegradedStatus(agg))
+		agg = utilerrors.NewAggregate(errs)
+		return fmt.Errorf("error syncing: %v", agg.Error())
+	}
+
+	return optr.syncStatus(v1.OperatorStatusCondition{
+		Type:    v1.OperatorStatusConditionTypeDone,
+		Message: "Done running sync functions",
+	})
+}
+
+func (optr *Operator) syncCustomResourceDefinitions() error {
+	// TODO(alberto): implement this once https://github.com/kubernetes-sigs/cluster-api/pull/494 gets in
+	//crds := []string{
+	//	"manifests/machine.crd.yaml",
+	//	"manifests/machineSet.crd.yaml",
+	//	"manifests/cluster.crd.yaml",
+	//}
+	//
+	//for _, crd := range crds {
+	//	crdBytes, err := assets.Asset(crd)
+	//	if err != nil {
+	//		return fmt.Errorf("error getting asset %s: %v", crd, err)
+	//	}
+	//	c := resourceread.ReadCustomResourceDefinitionV1Beta1OrDie(crdBytes)
+	//	_, updated, err := resourceapply.ApplyCustomResourceDefinition(optr.apiExtClient.ApiextensionsV1beta1(), c)
+	//	if err != nil {
+	//		return err
+	//	}
+	//	if updated {
+	//		if err := optr.waitForCustomResourceDefinition(c); err != nil {
+	//			return err
+	//		}
+	//	}
+	//}
+
+	return nil
+}
+
+func (optr *Operator) syncMachineSets(config render.OperatorConfig) error {
+	var machineSets []string
+	switch provider := config.Provider; provider {
+	case providerAWS:
+		machineSets = []string{
+			"machines/aws/worker.machineset.yaml",
+		}
+	case providerLibvirt:
+		machineSets = []string{
+			"machines/libvirt/worker.machineset.yaml",
+		}
+	}
+	for _, machineSet := range machineSets {
+		machineSetBytes, err := render.PopulateTemplate(&config, machineSet)
+		if err != nil {
+			return err
+		}
+		p := resourceread.ReadMachineSetV1alphaOrDie(machineSetBytes)
+		_, _, err = resourceapply.ApplyMachineSet(optr.clusterAPIClient, p)
+		if err != nil {
+			return err
+		}
+	}
+
+	return nil
+}
+
+func (optr *Operator) syncCluster(config render.OperatorConfig) error {
+	var clusters []string
+	switch provider := config.Provider; provider {
+	case providerAWS:
+		clusters = []string{
+			"machines/aws/cluster.yaml",
+		}
+	case providerLibvirt:
+		clusters = []string{
+			"machines/libvirt/cluster.yaml",
+		}
+	}
+	for _, cluster := range clusters {
+		clusterBytes, err := render.PopulateTemplate(&config, cluster)
+		if err != nil {
+			return err
+		}
+		p := resourceread.ReadClusterV1alphaOrDie(clusterBytes)
+		_, _, err = resourceapply.ApplyCluster(optr.clusterAPIClient, p)
+		if err != nil {
+			return err
+		}
+	}
+
+	return nil
+}
+
+func (optr *Operator) syncClusterAPIServer(config render.OperatorConfig) error {
+	crbBytes, err := render.PopulateTemplate(&config, "manifests/clusterapi-apiserver-cluster-role-binding.yaml")
+	if err != nil {
+		return err
+	}
+	crb := resourceread.ReadClusterRoleBindingV1OrDie(crbBytes)
+	_, _, err = resourceapply.ApplyClusterRoleBinding(optr.kubeClient.RbacV1(), crb)
+	if err != nil {
+		return err
+	}
+
+	rbBytes, err := render.PopulateTemplate(&config, "manifests/clusterapi-apiserver-role-binding.yaml")
+	if err != nil {
+		return err
+	}
+	rb := resourceread.ReadRoleBindingV1OrDie(rbBytes)
+	_, _, err = resourceapply.ApplyRoleBinding(optr.kubeClient.RbacV1(), rb)
+	if err != nil {
+		return err
+	}
+
+	svcBytes, err := render.PopulateTemplate(&config, "manifests/clusterapi-apiserver-svc.yaml")
+	if err != nil {
+		return err
+	}
+	svc := resourceread.ReadServiceV1OrDie(svcBytes)
+	_, _, err = resourceapply.ApplyService(optr.kubeClient.CoreV1(), svc)
+	if err != nil {
+		return err
+	}
+
+	apiServiceBytes, err := render.PopulateTemplate(&config, "manifests/clusterapi-apiservice.yaml")
+	if err != nil {
+		return err
+	}
+	apiService := resourceread.ReadAPIServiceDefinitionV1Beta1OrDie(apiServiceBytes)
+	_, _, err = resourceapply.ApplyAPIServiceDefinition(optr.apiregistrationClient, apiService)
+	if err != nil {
+		return err
+	}
+
+	controllerBytes, err := render.PopulateTemplate(&config, "manifests/clusterapi-apiserver.yaml")
+	if err != nil {
+		return err
+	}
+	controller := resourceread.ReadDeploymentV1OrDie(controllerBytes)
+	_, updated, err := resourceapply.ApplyDeployment(optr.kubeClient.AppsV1(), controller)
+	if err != nil {
+		return err
+	}
+	if updated {
+		return optr.waitForDeploymentRollout(controller)
+	}
+	return nil
+}
+
+func (optr *Operator) syncClusterAPIController(config render.OperatorConfig) error {
+	crBytes, err := render.PopulateTemplate(&config, "manifests/clusterapi-controller-cluster-role.yaml")
+	if err != nil {
+		return err
+	}
+	cr := resourceread.ReadClusterRoleV1OrDie(crBytes)
+	_, _, err = resourceapply.ApplyClusterRole(optr.kubeClient.RbacV1(), cr)
+	if err != nil {
+		return err
+	}
+	crbBytes, err := render.PopulateTemplate(&config, "manifests/clusterapi-controller-cluster-role-binding.yaml")
+	if err != nil {
+		return err
+	}
+	crb := resourceread.ReadClusterRoleBindingV1OrDie(crbBytes)
+	_, _, err = resourceapply.ApplyClusterRoleBinding(optr.kubeClient.RbacV1(), crb)
+	if err != nil {
+		return err
+	}
+	controllerBytes, err := render.PopulateTemplate(&config, "manifests/clusterapi-controller.yaml")
+	if err != nil {
+		return err
+	}
+	controller := resourceread.ReadDeploymentV1OrDie(controllerBytes)
+	_, updated, err := resourceapply.ApplyDeployment(optr.kubeClient.AppsV1(), controller)
+	if err != nil {
+		return err
+	}
+	if updated {
+		return optr.waitForDeploymentRollout(controller)
+	}
+	return nil
+}
+
+const (
+	deploymentRolloutPollInterval = time.Second
+	deploymentRolloutTimeout      = 5 * time.Minute
+
+	customResourceReadyInterval = time.Second
+	customResourceReadyTimeout  = 5 * time.Minute
+)
+
+func (optr *Operator) waitForCustomResourceDefinition(resource *apiextv1beta1.CustomResourceDefinition) error {
+	return wait.Poll(customResourceReadyInterval, customResourceReadyTimeout, func() (bool, error) {
+		crd, err := optr.crdLister.Get(resource.Name)
+		if err != nil {
+			glog.Errorf("error getting CustomResourceDefinition %s: %v", resource.Name, err)
+			return false, nil
+		}
+
+		for _, condition := range crd.Status.Conditions {
+			if condition.Type == apiextv1beta1.Established && condition.Status == apiextv1beta1.ConditionTrue {
+				return true, nil
+			}
+		}
+		glog.V(4).Infof("CustomResourceDefinition %s is not ready. conditions: %v", crd.Name, crd.Status.Conditions)
+		return false, nil
+	})
+}
conditions: %v", crd.Name, crd.Status.Conditions) + return false, nil + }) +} + +func (optr *Operator) waitForDeploymentRollout(resource *appsv1.Deployment) error { + return wait.Poll(deploymentRolloutPollInterval, deploymentRolloutTimeout, func() (bool, error) { + // TODO(vikas): When using deployLister, an issue is happening related to the apiVersion of cluster-api objects. + // This will be debugged later on to find out the root cause. For now, working aound is to use kubeClient.AppsV1 + // d, err := optr.deployLister.Deployments(resource.Namespace).Get(resource.Name) + d, err := optr.kubeClient.AppsV1().Deployments(resource.Namespace).Get(resource.Name, metav1.GetOptions{}) + if apierrors.IsNotFound(err) { + return false, nil + } + if err != nil { + // Do not return error here, as we could be updating the API Server itself, in which case we + // want to continue waiting. + glog.Errorf("error getting Deployment %s during rollout: %v", resource.Name, err) + return false, nil + } + + if d.DeletionTimestamp != nil { + return false, fmt.Errorf("Deployment %s is being deleted", resource.Name) + } + + if d.Generation <= d.Status.ObservedGeneration && d.Status.UpdatedReplicas == d.Status.Replicas && d.Status.UnavailableReplicas == 0 { + return true, nil + } + glog.V(4).Infof("Deployment %s is not ready. status: (replicas: %d, updated: %d, ready: %d, unavailable: %d)", d.Name, d.Status.Replicas, d.Status.UpdatedReplicas, d.Status.ReadyReplicas, d.Status.UnavailableReplicas) + return false, nil + }) +}