From 4753adca9f925c31dce26c7617abb0f4800d780c Mon Sep 17 00:00:00 2001 From: nb-ohad Date: Wed, 26 Jun 2024 14:59:34 +0300 Subject: [PATCH] Driver Controller: Scaffolding for concurrent reconcile Signed-off-by: nb-ohad --- internal/controller/driver_controller.go | 35 ++++++++++++++++ utils/utils.go | 53 ++++++++++++++++++++++++ 2 files changed, 88 insertions(+) create mode 100644 utils/utils.go diff --git a/internal/controller/driver_controller.go b/internal/controller/driver_controller.go index ff717a45..a7aeef57 100644 --- a/internal/controller/driver_controller.go +++ b/internal/controller/driver_controller.go @@ -18,6 +18,7 @@ package controller import ( "context" + "errors" "fmt" "maps" "reflect" @@ -37,6 +38,7 @@ import ( "sigs.k8s.io/controller-runtime/pkg/reconcile" csiv1a1 "github.com/ceph/ceph-csi-operator/api/v1alpha1" + "github.com/ceph/ceph-csi-operator/utils" ) //+kubebuilder:rbac:groups=csi.ceph.io,resources=drivers,verbs=get;list;watch;create;update;patch;delete @@ -139,6 +141,23 @@ func (r *driverReconcile) reconcile() (ctrl.Result, error) { return ctrl.Result{}, err } + // Concurrently reconcile different aspects of the clusters actual state to meet + // the desired state defined on the driver object + errChan := utils.RunConcurrently( + r.upsertPluginDeamonSet, + r.upsertProvisionerDeployment, + r.upsertK8sCSIDriver, + r.upsertLivnessService, + ) + + // Check if any reconcilatin error where raised during the concurrent execution + // of the reconciliation steps. + errList := utils.ChannelToSlice(errChan) + if err := errors.Join(errList...); err != nil { + r.log.Error(err, "Reconciliation failed") + return ctrl.Result{}, err + } + return ctrl.Result{}, nil } @@ -188,6 +207,22 @@ func (r *driverReconcile) LoadAndValidateDesiredState() error { return nil } +func (r *driverReconcile) upsertPluginDeamonSet() error { + return nil +} + +func (r *driverReconcile) upsertProvisionerDeployment() error { + return nil +} + +func (r *driverReconcile) upsertK8sCSIDriver() error { + return nil +} + +func (r *driverReconcile) upsertLivnessService() error { + return nil +} + // mergeDriverSpecs will fill in any unset fields in dest with a copy of the same field in src func mergeDriverSpecs(dest, src *csiv1a1.DriverSpec) { // Create a copy of the src, making sure that any value copied into dest is a not shared diff --git a/utils/utils.go b/utils/utils.go new file mode 100644 index 00000000..0e635eb2 --- /dev/null +++ b/utils/utils.go @@ -0,0 +1,53 @@ +/* +Copyright 2024. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package utils + +import ( + "sync" +) + +func RunConcurrently(fnList ...func() error) chan error { + errors := make(chan error) + wg := sync.WaitGroup{} + + // Run all the functions concurrently + for _, fn := range fnList { + fn := fn + wg.Add(1) + go func() { + defer wg.Done() + errors <- fn() + }() + } + + // Close the output channel whenever all of the functions completed + go func() { + wg.Wait() + close(errors) + }() + + // Read from the channel and aggregate into a slice + return errors +} + +func ChannelToSlice[T any](c chan T) []T { + list := []T{} + for value := range c { + list = append(list, value) + } + return list +}