Skip to content

Commit

Permalink
Driver Controller: Scaffolding for concurrent reconcile
Browse files Browse the repository at this point in the history
Signed-off-by: nb-ohad <mitrani.ohad@gmail.com>
  • Loading branch information
nb-ohad committed Jul 6, 2024
1 parent ea1d401 commit 4753adc
Show file tree
Hide file tree
Showing 2 changed files with 88 additions and 0 deletions.
35 changes: 35 additions & 0 deletions internal/controller/driver_controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@ package controller

import (
"context"
"errors"
"fmt"
"maps"
"reflect"
Expand All @@ -37,6 +38,7 @@ import (
"sigs.k8s.io/controller-runtime/pkg/reconcile"

csiv1a1 "github.com/ceph/ceph-csi-operator/api/v1alpha1"
"github.com/ceph/ceph-csi-operator/utils"
)

//+kubebuilder:rbac:groups=csi.ceph.io,resources=drivers,verbs=get;list;watch;create;update;patch;delete
Expand Down Expand Up @@ -139,6 +141,23 @@ func (r *driverReconcile) reconcile() (ctrl.Result, error) {
return ctrl.Result{}, err
}

// Concurrently reconcile different aspects of the clusters actual state to meet
// the desired state defined on the driver object
errChan := utils.RunConcurrently(
r.upsertPluginDeamonSet,
r.upsertProvisionerDeployment,
r.upsertK8sCSIDriver,
r.upsertLivnessService,
)

// Check if any reconcilatin error where raised during the concurrent execution
// of the reconciliation steps.
errList := utils.ChannelToSlice(errChan)
if err := errors.Join(errList...); err != nil {
r.log.Error(err, "Reconciliation failed")
return ctrl.Result{}, err
}

return ctrl.Result{}, nil
}

Expand Down Expand Up @@ -188,6 +207,22 @@ func (r *driverReconcile) LoadAndValidateDesiredState() error {
return nil
}

func (r *driverReconcile) upsertPluginDeamonSet() error {
return nil
}

func (r *driverReconcile) upsertProvisionerDeployment() error {
return nil
}

func (r *driverReconcile) upsertK8sCSIDriver() error {
return nil
}

func (r *driverReconcile) upsertLivnessService() error {
return nil
}

// mergeDriverSpecs will fill in any unset fields in dest with a copy of the same field in src
func mergeDriverSpecs(dest, src *csiv1a1.DriverSpec) {
// Create a copy of the src, making sure that any value copied into dest is a not shared
Expand Down
53 changes: 53 additions & 0 deletions utils/utils.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,53 @@
/*
Copyright 2024.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/

package utils

import (
"sync"
)

func RunConcurrently(fnList ...func() error) chan error {
errors := make(chan error)
wg := sync.WaitGroup{}

// Run all the functions concurrently
for _, fn := range fnList {
fn := fn
wg.Add(1)
go func() {
defer wg.Done()
errors <- fn()
}()
}

// Close the output channel whenever all of the functions completed
go func() {
wg.Wait()
close(errors)
}()

// Read from the channel and aggregate into a slice
return errors
}

func ChannelToSlice[T any](c chan T) []T {
list := []T{}
for value := range c {
list = append(list, value)
}
return list
}

0 comments on commit 4753adc

Please sign in to comment.