From 06bf4e99eb592902575b948c2ceedcff3aaefd30 Mon Sep 17 00:00:00 2001 From: Matthias Bastian Date: Tue, 28 May 2024 23:19:18 +0200 Subject: [PATCH 01/17] Add `locker` package --- internal/util/locker/locker.go | 123 +++++++++++++++++++++++ internal/util/locker/locker_test.go | 146 ++++++++++++++++++++++++++++ 2 files changed, 269 insertions(+) create mode 100644 internal/util/locker/locker.go create mode 100644 internal/util/locker/locker_test.go diff --git a/internal/util/locker/locker.go b/internal/util/locker/locker.go new file mode 100644 index 00000000..6ae9db76 --- /dev/null +++ b/internal/util/locker/locker.go @@ -0,0 +1,123 @@ +/* +Package locker provides a mechanism for fine-grained locking. + +In contrast to a sync.Mutex, the user must provide a key when locking and unlocking. + +If a lock with a given key does not exist when Lock() is called, one is created. +Lock references are automatically cleaned up if nothing is waiting for the lock anymore. + +Locking can be aborted on context cancellation. + +This package is inspired by https://github.com/moby/locker, but uses a buffered channel instead of a sync.Mutex to allow +for context cancellation. +Even though the standard library does not offer to cancel locking a mutex, golang.org/x/sync/semaphore does for its +Acquire() method. +*/ +package locker + +import ( + "context" + "sync" + "sync/atomic" +) + +// Locker offers thread-safe locking of entities represented by keys of type string. +type Locker struct { + mu sync.Mutex // protects the locks map + locks map[string]*lockWithCounter +} + +// New returns a new Locker. +func New() *Locker { + locker := &Locker{ + locks: make(map[string]*lockWithCounter), + } + return locker +} + +// lockWithCounter is used by Locker to represent a lock for a given key. +type lockWithCounter struct { + // ch is a buffered channel of size 1 used as a mutex. + // In contrast to a sync.Mutex, accessing the channel can be aborted upon context cancellation. + ch chan struct{} + // waiters is the number of callers waiting to acquire the lock. + // This is Int32 instead of Uint32, so we can add -1 in dec(). + waiters atomic.Int32 +} + +// inc increments the number of waiters. +func (lwc *lockWithCounter) inc() { + lwc.waiters.Add(1) +} + +// dec decrements the number of waiters. +func (lwc *lockWithCounter) dec() { + lwc.waiters.Add(-1) +} + +// count gets the current number of waiters. +func (lwc *lockWithCounter) count() int32 { + return lwc.waiters.Load() +} + +// Lock locks the given key. Returns after acquiring lock or when given context is done. +func (l *Locker) Lock(ctx context.Context, key string) error { + l.mu.Lock() + + select { + case <-ctx.Done(): + // ctx becoming done has "happened before" acquiring the lock, whether it became done before the call began or + // while we were waiting for the mutex. We prefer to fail even if we could acquire the mutex without blocking. + l.mu.Unlock() + return ctx.Err() + default: + } + + lwc, exists := l.locks[key] + if !exists { + lwc = &lockWithCounter{ + ch: make(chan struct{}, 1), + } + l.locks[key] = lwc + } + // Increment the waiters while inside the mutex to make sure that the lock isn't deleted if Lock() and Unlock() + // are called concurrently. + lwc.inc() + + l.mu.Unlock() + + // Lock the key outside the mutex, so we don't block other operations. + select { + case lwc.ch <- struct{}{}: + // Lock acquired, so we can decrement the number of waiters for this lock. 
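+ // No map cleanup is needed on this path: the caller now holds the lock, and
+ // Unlock removes the entry once no other waiters remain.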
+ lwc.dec() + return nil + case <-ctx.Done(): + // Locking aborted, so we can decrement the number of waiters for this lock. + lwc.dec() + // If there are no more waiters, we can delete the lock. + l.mu.Lock() + if lwc.count() == 0 { + delete(l.locks, key) + } + l.mu.Unlock() + + return ctx.Err() + } +} + +// Unlock unlocks the given key. It panics if the key is not locked. +func (l *Locker) Unlock(key string) { + l.mu.Lock() + defer l.mu.Unlock() + lwc, exists := l.locks[key] + if !exists { + panic("no such lock: " + key) + } + <-lwc.ch + + // If there are no more waiters, we can delete the lock. + if lwc.count() == 0 { + delete(l.locks, key) + } +} diff --git a/internal/util/locker/locker_test.go b/internal/util/locker/locker_test.go new file mode 100644 index 00000000..4be76a37 --- /dev/null +++ b/internal/util/locker/locker_test.go @@ -0,0 +1,146 @@ +package locker + +import ( + "context" + "sync" + "testing" + "time" + + "github.com/stretchr/testify/assert" + "github.com/stretchr/testify/require" +) + +func withTimeout(t *testing.T, f func()) { + t.Helper() + done := make(chan struct{}) + go func() { + f() + close(done) + }() + select { + case <-time.After(1 * time.Second): + t.Fatal("timed out") + case <-done: + } +} + +func TestNew(t *testing.T) { + locker := New() + require.NotNil(t, locker) + require.NotNil(t, locker.locks) +} + +func TestLockWithCounter(t *testing.T) { + lwc := &lockWithCounter{} + require.EqualValues(t, 0, lwc.count()) + + lwc.inc() + require.EqualValues(t, 1, lwc.count()) + + lwc.dec() + require.EqualValues(t, 0, lwc.count()) +} + +func TestLockerLock(t *testing.T) { + l := New() + require.NoError(t, l.Lock(context.Background(), "test")) + lwc := l.locks["test"] + + require.EqualValues(t, 0, lwc.count()) + + chDone := make(chan struct{}) + go func(t *testing.T) { + assert.NoError(t, l.Lock(context.Background(), "test")) + close(chDone) + }(t) + + chWaiting := make(chan struct{}) + go func() { + for range time.Tick(1 * time.Millisecond) { + if lwc.count() == 1 { + close(chWaiting) + break + } + } + }() + + withTimeout(t, func() { + <-chWaiting + }) + + select { + case <-chDone: + t.Fatal("lock should not have returned while it was still held") + default: + } + + l.Unlock("test") + + withTimeout(t, func() { + <-chDone + }) + + require.EqualValues(t, 0, lwc.count()) +} + +func TestLockerUnlock(t *testing.T) { + l := New() + + require.NoError(t, l.Lock(context.Background(), "test")) + l.Unlock("test") + + require.PanicsWithValue(t, "no such lock: test", func() { + l.Unlock("test") + }) + + withTimeout(t, func() { + require.NoError(t, l.Lock(context.Background(), "test")) + }) +} + +func TestLockerConcurrency(t *testing.T) { + l := New() + + var wg sync.WaitGroup + for i := 0; i <= 10_000; i++ { + wg.Add(1) + go func(t *testing.T) { + assert.NoError(t, l.Lock(context.Background(), "test")) + // If there is a concurrency issue, it will very likely become visible here. + l.Unlock("test") + wg.Done() + }(t) + } + + withTimeout(t, wg.Wait) + + // Since everything has unlocked the map should be empty. 
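+ // Unlock deletes a key's entry as soon as nothing holds or waits for it anymore,
+ // so no entries may be left behind once all goroutines have finished.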
+ require.Empty(t, l.locks) +} + +func TestLockerContextCanceled(t *testing.T) { + ctx, cancel := context.WithCancel(context.Background()) + cancel() + + l := New() + + withTimeout(t, func() { + err := l.Lock(ctx, "test") + require.ErrorIs(t, context.Canceled, err) + }) +} + +func TestLockerContextDeadlineExceeded(t *testing.T) { + ctx, cancel := context.WithTimeout(context.Background(), 1*time.Millisecond) + defer cancel() + + l := New() + require.NoError(t, l.Lock(ctx, "test")) + + withTimeout(t, func() { + err := l.Lock(ctx, "test") + require.ErrorIs(t, context.DeadlineExceeded, err) + }) + + require.Empty(t, l.locks) +} From 8019fc59da4078b0ce6fcf7aefff874239ceac40 Mon Sep 17 00:00:00 2001 From: Matthias Bastian Date: Tue, 28 May 2024 23:21:53 +0200 Subject: [PATCH 02/17] Lock racy IONOS Cloud operations --- internal/service/cloud/ipblock.go | 16 +++++++++++ internal/service/cloud/network.go | 43 ++++++++++++++++++++++++++++ internal/service/cloud/suite_test.go | 2 ++ scope/machine.go | 7 +++++ scope/machine_test.go | 10 +++++++ 5 files changed, 78 insertions(+) diff --git a/internal/service/cloud/ipblock.go b/internal/service/cloud/ipblock.go index 8e86b549..909eb03f 100644 --- a/internal/service/cloud/ipblock.go +++ b/internal/service/cloud/ipblock.go @@ -153,6 +153,12 @@ func (s *Service) ReconcileFailoverIPBlockDeletion(ctx context.Context, ms *scop return false, nil } + lockKey := s.failoverIPBlockLockKey(ms) + if err := ms.Locker.Lock(ctx, lockKey); err != nil { + return false, err + } + defer ms.Locker.Unlock(lockKey) + // Check if the IP block is currently in creation. We need to wait for it to be finished // before we can trigger the deletion. ipBlock, request, err := scopedFindResource( @@ -457,6 +463,16 @@ func (*Service) failoverIPBlockName(ms *scope.Machine) string { ) } +func (*Service) failoverIPBlockLockKey(ms *scope.Machine) string { + // Failover IPs are shared across machines within the same failover group. + // When reserving the corresponding IP block, we must avoid duplicate reservations caused by concurrent machine + // reconciliations. So we lock when performing write operations. + // As the failover group corresponds with the MachineDeployment the machines belong to, we use the MachineDeployment + // namespace and name as part of the key used for locking. That's more fine-grained than using the machine's + // datacenter ID and allows working on distinct failover groups within the same datacenter in parallel. + return "fo-ipb/" + ms.IonosMachine.Namespace + "/" + ms.IonosMachine.Labels[clusterv1.MachineDeploymentNameLabel] +} + func ignoreErrUserSetIPNotFound(err error) error { if errors.Is(err, errUserSetIPNotFound) { return nil diff --git a/internal/service/cloud/network.go b/internal/service/cloud/network.go index e2e25ad7..15f9a1d1 100644 --- a/internal/service/cloud/network.go +++ b/internal/service/cloud/network.go @@ -42,6 +42,13 @@ func (*Service) lanName(c *clusterv1.Cluster) string { c.Name) } +func (*Service) lanLockKey(ms *scope.Machine) string { + // LANs are shared across machines within a datacenter, so need to be locked when performing write operations during + // concurrent machine reconciliations. + // Their datacenter scope fits well to be part of the key used for locking the LAN. 
+ return "dc/" + ms.DatacenterID() + "/lan" +} + func (*Service) lanURL(datacenterID, id string) string { return path.Join("datacenters", datacenterID, "lans", id) } @@ -54,6 +61,12 @@ func (*Service) lansURL(datacenterID string) string { func (s *Service) ReconcileLAN(ctx context.Context, ms *scope.Machine) (requeue bool, err error) { log := s.logger.WithName("ReconcileLAN") + lockKey := s.lanLockKey(ms) + if err := ms.Locker.Lock(ctx, lockKey); err != nil { + return false, err + } + defer ms.Locker.Unlock(lockKey) + lan, request, err := scopedFindResource(ctx, ms, s.getLAN, s.getLatestLANCreationRequest) if err != nil { return false, err @@ -87,6 +100,12 @@ func (s *Service) ReconcileLAN(ctx context.Context, ms *scope.Machine) (requeue func (s *Service) ReconcileLANDeletion(ctx context.Context, ms *scope.Machine) (requeue bool, err error) { log := s.logger.WithName("ReconcileLANDeletion") + lockKey := s.lanLockKey(ms) + if err := ms.Locker.Lock(ctx, lockKey); err != nil { + return false, err + } + defer ms.Locker.Unlock(lockKey) + // Try to retrieve the cluster LAN or even check if it's currently still being created. lan, request, err := scopedFindResource(ctx, ms, s.getLAN, s.getLatestLANCreationRequest) if err != nil { @@ -290,6 +309,12 @@ func (s *Service) retrieveFailoverIPForMachine( // AUTO means we have to reserve an IP address. if failoverIP == infrav1.CloudResourceConfigAuto { + lockKey := s.failoverIPBlockLockKey(ms) + if err := ms.Locker.Lock(ctx, lockKey); err != nil { + return false, "", err + } + defer ms.Locker.Unlock(lockKey) + // Check if the IP block is already reserved. ipBlock, info, err := scopedFindResource( ctx, @@ -378,6 +403,12 @@ func (s *Service) swapNICInFailoverGroup( return false, err } + lockKey := s.lanLockKey(ms) + if err := ms.Locker.Lock(ctx, lockKey); err != nil { + return false, err + } + defer ms.Locker.Unlock(lockKey) + lan, failoverConfig := &sdk.Lan{}, &[]sdk.IPFailover{} if requeue, err := s.retrieveLANFailoverConfig(ctx, ms, lan, failoverConfig); err != nil || requeue { return requeue, err @@ -469,6 +500,12 @@ func (s *Service) reconcileIPFailoverGroup( // Add the NIC to the failover group of the LAN + lockKey := s.lanLockKey(ms) + if err := ms.Locker.Lock(ctx, lockKey); err != nil { + return false, err + } + defer ms.Locker.Unlock(lockKey) + lan, failoverConfig := &sdk.Lan{}, &[]sdk.IPFailover{} if requeue, err := s.retrieveLANFailoverConfig(ctx, ms, lan, failoverConfig); err != nil || requeue { return requeue, err @@ -524,6 +561,12 @@ func (s *Service) removeNICFromFailoverGroup( ) (requeue bool, err error) { log := s.logger.WithName("removeNICFromFailoverGroup") + lockKey := s.lanLockKey(ms) + if err := ms.Locker.Lock(ctx, lockKey); err != nil { + return false, err + } + defer ms.Locker.Unlock(lockKey) + lan, failoverConfig := &sdk.Lan{}, &[]sdk.IPFailover{} if requeue, err := s.retrieveLANFailoverConfig(ctx, ms, lan, failoverConfig); err != nil || requeue { return requeue, err diff --git a/internal/service/cloud/suite_test.go b/internal/service/cloud/suite_test.go index 62f427fb..c83ffc65 100644 --- a/internal/service/cloud/suite_test.go +++ b/internal/service/cloud/suite_test.go @@ -36,6 +36,7 @@ import ( infrav1 "github.com/ionos-cloud/cluster-api-provider-ionoscloud/api/v1alpha1" "github.com/ionos-cloud/cluster-api-provider-ionoscloud/internal/ionoscloud/clienttest" + "github.com/ionos-cloud/cluster-api-provider-ionoscloud/internal/util/locker" "github.com/ionos-cloud/cluster-api-provider-ionoscloud/internal/util/ptr" 
"github.com/ionos-cloud/cluster-api-provider-ionoscloud/scope" ) @@ -189,6 +190,7 @@ func (s *ServiceTestSuite) SetupTest() { Machine: s.capiMachine, ClusterScope: s.clusterScope, IonosMachine: s.infraMachine, + Locker: locker.New(), }) s.NoError(err, "failed to create machine scope") diff --git a/scope/machine.go b/scope/machine.go index 6c681295..509413ee 100644 --- a/scope/machine.go +++ b/scope/machine.go @@ -31,6 +31,7 @@ import ( "sigs.k8s.io/controller-runtime/pkg/client" infrav1 "github.com/ionos-cloud/cluster-api-provider-ionoscloud/api/v1alpha1" + "github.com/ionos-cloud/cluster-api-provider-ionoscloud/internal/util/locker" "github.com/ionos-cloud/cluster-api-provider-ionoscloud/internal/util/ptr" ) @@ -38,6 +39,7 @@ import ( type Machine struct { client client.Client patchHelper *patch.Helper + Locker *locker.Locker Machine *clusterv1.Machine IonosMachine *infrav1.IonosCloudMachine @@ -51,6 +53,7 @@ type MachineParams struct { Machine *clusterv1.Machine ClusterScope *Cluster IonosMachine *infrav1.IonosCloudMachine + Locker *locker.Locker } // NewMachine creates a new Machine using the provided params. @@ -67,6 +70,9 @@ func NewMachine(params MachineParams) (*Machine, error) { if params.ClusterScope == nil { return nil, errors.New("machine scope params need a IONOS Cloud cluster scope") } + if params.Locker == nil { + return nil, errors.New("machine scope params need a locker") + } helper, err := patch.NewHelper(params.IonosMachine, params.Client) if err != nil { @@ -75,6 +81,7 @@ func NewMachine(params MachineParams) (*Machine, error) { return &Machine{ client: params.Client, patchHelper: helper, + Locker: params.Locker, Machine: params.Machine, ClusterScope: params.ClusterScope, IonosMachine: params.IonosMachine, diff --git a/scope/machine_test.go b/scope/machine_test.go index 68a59af1..c878bd2d 100644 --- a/scope/machine_test.go +++ b/scope/machine_test.go @@ -30,6 +30,7 @@ import ( "sigs.k8s.io/controller-runtime/pkg/client/fake" infrav1 "github.com/ionos-cloud/cluster-api-provider-ionoscloud/api/v1alpha1" + "github.com/ionos-cloud/cluster-api-provider-ionoscloud/internal/util/locker" "github.com/ionos-cloud/cluster-api-provider-ionoscloud/internal/util/ptr" ) @@ -46,6 +47,7 @@ func exampleParams(t *testing.T) MachineParams { Cluster: &clusterv1.Cluster{}, }, IonosMachine: &infrav1.IonosCloudMachine{}, + Locker: locker.New(), } } @@ -88,6 +90,14 @@ func TestMachineParamsNilClusterScopeShouldFail(t *testing.T) { require.Error(t, err) } +func TestMachineParamsNilLockerShouldFail(t *testing.T) { + params := exampleParams(t) + params.Locker = nil + scope, err := NewMachine(params) + require.Nil(t, scope, "returned machine scope should be nil") + require.Error(t, err) +} + func TestMachineHasFailedFailureMessage(t *testing.T) { scope, err := NewMachine(exampleParams(t)) require.NoError(t, err) From 0042fe891ccaa793ac18927fcbb69f349d52c94f Mon Sep 17 00:00:00 2001 From: Matthias Bastian Date: Tue, 28 May 2024 23:25:01 +0200 Subject: [PATCH 03/17] Make concurrency configurable per controller Also set up locker for machines. 
--- cmd/main.go | 27 +++++++---- .../ionoscloudcluster_controller.go | 41 +++++++++++++---- .../ionoscloudmachine_controller.go | 46 ++++++++++++------- 3 files changed, 80 insertions(+), 34 deletions(-) diff --git a/cmd/main.go b/cmd/main.go index f91e2ff2..ea705f1f 100644 --- a/cmd/main.go +++ b/cmd/main.go @@ -30,10 +30,11 @@ import ( clusterv1 "sigs.k8s.io/cluster-api/api/v1beta1" "sigs.k8s.io/cluster-api/util/flags" ctrl "sigs.k8s.io/controller-runtime" + "sigs.k8s.io/controller-runtime/pkg/controller" "sigs.k8s.io/controller-runtime/pkg/healthz" infrav1 "github.com/ionos-cloud/cluster-api-provider-ionoscloud/api/v1alpha1" - "github.com/ionos-cloud/cluster-api-provider-ionoscloud/internal/controller" + iccontroller "github.com/ionos-cloud/cluster-api-provider-ionoscloud/internal/controller" ) var ( @@ -42,6 +43,9 @@ var ( healthProbeAddr string enableLeaderElection bool diagnosticOptions = flags.DiagnosticsOptions{} + + icClusterConcurrency int + icMachineConcurrency int ) func init() { @@ -86,17 +90,18 @@ func main() { ctx := ctrl.SetupSignalHandler() - if err = (&controller.IonosCloudClusterReconciler{ - Client: mgr.GetClient(), - Scheme: mgr.GetScheme(), - }).SetupWithManager(ctx, mgr); err != nil { + if err = iccontroller.RegisterIonosCloudClusterReconciler( + ctx, + mgr, + controller.Options{MaxConcurrentReconciles: icClusterConcurrency}, + ); err != nil { setupLog.Error(err, "unable to create controller", "controller", "IonosCloudCluster") os.Exit(1) } - if err = (&controller.IonosCloudMachineReconciler{ - Client: mgr.GetClient(), - Scheme: mgr.GetScheme(), - }).SetupWithManager(mgr); err != nil { + if err = iccontroller.RegisterIonosCloudMachineReconciler( + mgr, + controller.Options{MaxConcurrentReconciles: icMachineConcurrency}, + ); err != nil { setupLog.Error(err, "unable to create controller", "controller", "IonosCloudMachine") os.Exit(1) } @@ -128,4 +133,8 @@ func initFlags() { pflag.BoolVar(&enableLeaderElection, "leader-elect", false, "Enable leader election for controller manager. "+ "Enabling this will ensure there is only one active controller manager.") + pflag.IntVar(&icClusterConcurrency, "ionoscloud-cluster-concurrency", 1, + "Number of IonosCloudClusters to process simultaneously") + pflag.IntVar(&icMachineConcurrency, "ionoscloud-machine-concurrency", 1, + "Number of IonosCloudMachines to process simultaneously") } diff --git a/internal/controller/ionoscloudcluster_controller.go b/internal/controller/ionoscloudcluster_controller.go index dc59b268..70f0ef29 100644 --- a/internal/controller/ionoscloudcluster_controller.go +++ b/internal/controller/ionoscloudcluster_controller.go @@ -34,6 +34,7 @@ import ( ctrl "sigs.k8s.io/controller-runtime" "sigs.k8s.io/controller-runtime/pkg/builder" "sigs.k8s.io/controller-runtime/pkg/client" + "sigs.k8s.io/controller-runtime/pkg/controller" "sigs.k8s.io/controller-runtime/pkg/controller/controllerutil" "sigs.k8s.io/controller-runtime/pkg/handler" "sigs.k8s.io/controller-runtime/pkg/reconcile" @@ -43,10 +44,27 @@ import ( "github.com/ionos-cloud/cluster-api-provider-ionoscloud/scope" ) -// IonosCloudClusterReconciler reconciles a IonosCloudCluster object. -type IonosCloudClusterReconciler struct { +// ionosCloudClusterReconciler reconciles a IonosCloudCluster object. +type ionosCloudClusterReconciler struct { client.Client - Scheme *runtime.Scheme + scheme *runtime.Scheme +} + +// RegisterIonosCloudClusterReconciler creates an ionosCloudClusterReconciler and registers it with the manager. 
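+// It is intended to be called from main; a minimal call looks like this (variable names illustrative):
+//
+//	err := RegisterIonosCloudClusterReconciler(ctx, mgr,
+//		controller.Options{MaxConcurrentReconciles: concurrency})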
+func RegisterIonosCloudClusterReconciler( + ctx context.Context, + mgr ctrl.Manager, + options controller.Options, +) error { + return newIonosCloudClusterReconciler(mgr).setupWithManager(ctx, mgr, options) +} + +func newIonosCloudClusterReconciler(mgr ctrl.Manager) *ionosCloudClusterReconciler { + r := &ionosCloudClusterReconciler{ + Client: mgr.GetClient(), + scheme: mgr.GetScheme(), + } + return r } //+kubebuilder:rbac:groups=infrastructure.cluster.x-k8s.io,resources=ionoscloudclusters,verbs=get;list;watch;create;update;patch;delete @@ -62,7 +80,7 @@ type IonosCloudClusterReconciler struct { // // For more details, check Reconcile and its Result here: // - https://pkg.go.dev/sigs.k8s.io/controller-runtime@v0.16.0/pkg/reconcile -func (r *IonosCloudClusterReconciler) Reconcile( +func (r *ionosCloudClusterReconciler) Reconcile( ctx context.Context, ionosCloudCluster *infrav1.IonosCloudCluster, ) (_ ctrl.Result, retErr error) { @@ -116,7 +134,7 @@ func (r *IonosCloudClusterReconciler) Reconcile( return r.reconcileNormal(ctx, clusterScope, cloudService) } -func (r *IonosCloudClusterReconciler) reconcileNormal( +func (r *ionosCloudClusterReconciler) reconcileNormal( ctx context.Context, clusterScope *scope.Cluster, cloudService *cloud.Service, @@ -153,7 +171,7 @@ func (r *IonosCloudClusterReconciler) reconcileNormal( return ctrl.Result{}, nil } -func (r *IonosCloudClusterReconciler) reconcileDelete( +func (r *ionosCloudClusterReconciler) reconcileDelete( ctx context.Context, clusterScope *scope.Cluster, cloudService *cloud.Service, ) (ctrl.Result, error) { log := ctrl.LoggerFrom(ctx) @@ -202,7 +220,7 @@ func (r *IonosCloudClusterReconciler) reconcileDelete( return ctrl.Result{}, nil } -func (*IonosCloudClusterReconciler) checkRequestStatus( +func (*ionosCloudClusterReconciler) checkRequestStatus( ctx context.Context, clusterScope *scope.Cluster, cloudService *cloud.Service, ) (requeue bool, retErr error) { log := ctrl.LoggerFrom(ctx) @@ -223,9 +241,14 @@ func (*IonosCloudClusterReconciler) checkRequestStatus( return requeue, retErr } -// SetupWithManager sets up the controller with the Manager. -func (r *IonosCloudClusterReconciler) SetupWithManager(ctx context.Context, mgr ctrl.Manager) error { +// setupWithManager sets up the controller with the Manager. +func (r *ionosCloudClusterReconciler) setupWithManager( + ctx context.Context, + mgr ctrl.Manager, + options controller.Options, +) error { return ctrl.NewControllerManagedBy(mgr). + WithOptions(options). For(&infrav1.IonosCloudCluster{}). WithEventFilter(predicates.ResourceNotPaused(ctrl.LoggerFrom(ctx))). 
Watches(&clusterv1.Cluster{}, diff --git a/internal/controller/ionoscloudmachine_controller.go b/internal/controller/ionoscloudmachine_controller.go index 1b02264f..7ef7d7ca 100644 --- a/internal/controller/ionoscloudmachine_controller.go +++ b/internal/controller/ionoscloudmachine_controller.go @@ -29,19 +29,36 @@ import ( "sigs.k8s.io/cluster-api/util/conditions" ctrl "sigs.k8s.io/controller-runtime" "sigs.k8s.io/controller-runtime/pkg/client" + "sigs.k8s.io/controller-runtime/pkg/controller" "sigs.k8s.io/controller-runtime/pkg/controller/controllerutil" "sigs.k8s.io/controller-runtime/pkg/handler" "sigs.k8s.io/controller-runtime/pkg/reconcile" infrav1 "github.com/ionos-cloud/cluster-api-provider-ionoscloud/api/v1alpha1" "github.com/ionos-cloud/cluster-api-provider-ionoscloud/internal/service/cloud" + "github.com/ionos-cloud/cluster-api-provider-ionoscloud/internal/util/locker" "github.com/ionos-cloud/cluster-api-provider-ionoscloud/scope" ) -// IonosCloudMachineReconciler reconciles a IonosCloudMachine object. -type IonosCloudMachineReconciler struct { +// ionosCloudMachineReconciler reconciles a IonosCloudMachine object. +type ionosCloudMachineReconciler struct { client.Client - Scheme *runtime.Scheme + scheme *runtime.Scheme + locker *locker.Locker +} + +// RegisterIonosCloudMachineReconciler creates an ionosCloudMachineReconciler and registers it with the manager. +func RegisterIonosCloudMachineReconciler(mgr ctrl.Manager, options controller.Options) error { + return newIonosCloudMachineReconciler(mgr).setupWithManager(mgr, options) +} + +func newIonosCloudMachineReconciler(mgr ctrl.Manager) *ionosCloudMachineReconciler { + r := &ionosCloudMachineReconciler{ + Client: mgr.GetClient(), + scheme: mgr.GetScheme(), + locker: locker.New(), + } + return r } //+kubebuilder:rbac:groups=infrastructure.cluster.x-k8s.io,resources=ionoscloudmachines,verbs=get;list;watch;create;update;patch;delete @@ -52,7 +69,7 @@ type IonosCloudMachineReconciler struct { //+kubebuilder:rbac:groups="",resources=secrets,verbs=get;list;watch;update //+kubebuilder:rbac:groups="",resources=events,verbs=get;list;watch;create;update;patch -func (r *IonosCloudMachineReconciler) Reconcile( +func (r *ionosCloudMachineReconciler) Reconcile( ctx context.Context, ionosCloudMachine *infrav1.IonosCloudMachine, ) (_ ctrl.Result, retErr error) { @@ -95,6 +112,7 @@ func (r *IonosCloudMachineReconciler) Reconcile( Machine: machine, ClusterScope: clusterScope, IonosMachine: ionosCloudMachine, + Locker: r.locker, }) if err != nil { return ctrl.Result{}, fmt.Errorf("failed to create scope: %w", err) @@ -127,7 +145,7 @@ func (r *IonosCloudMachineReconciler) Reconcile( return r.reconcileNormal(ctx, cloudService, machineScope) } -func (r *IonosCloudMachineReconciler) reconcileNormal( +func (r *ionosCloudMachineReconciler) reconcileNormal( ctx context.Context, cloudService *cloud.Service, machineScope *scope.Machine, ) (ctrl.Result, error) { log := ctrl.LoggerFrom(ctx) @@ -164,7 +182,6 @@ func (r *IonosCloudMachineReconciler) reconcileNormal( return ctrl.Result{RequeueAfter: defaultReconcileDuration}, nil } - // TODO(piepmatz): This is not thread-safe, but needs to be. Add locking. 
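+ // Thread safety is now handled inside the cloud service: the LAN and failover
+ // IP block reconcile steps take keyed locks (see lanLockKey and failoverIPBlockLockKey).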
reconcileSequence := []serviceReconcileStep[scope.Machine]{ {"ReconcileLAN", cloudService.ReconcileLAN}, {"ReconcileServer", cloudService.ReconcileServer}, @@ -185,15 +202,11 @@ func (r *IonosCloudMachineReconciler) reconcileNormal( return ctrl.Result{}, nil } -func (r *IonosCloudMachineReconciler) reconcileDelete( +func (r *ionosCloudMachineReconciler) reconcileDelete( ctx context.Context, machineScope *scope.Machine, cloudService *cloud.Service, ) (ctrl.Result, error) { log := ctrl.LoggerFrom(ctx) - // TODO(piepmatz): This is not thread-safe, but needs to be. Add locking. - // Moreover, should only be attempted if it's the last machine using that LAN. We should check that our machines - // at least, but need to accept that users added their own infrastructure into our LAN (in that case a LAN deletion - // attempt will be denied with HTTP 422). requeue, err := r.checkRequestStates(ctx, machineScope, cloudService) if err != nil { // In case the request state cannot be determined, we want to continue with the @@ -240,7 +253,7 @@ func (r *IonosCloudMachineReconciler) reconcileDelete( // - Queued, Running => Requeue the current request // - Failed => Log the error and continue also apply the same logic as in Done. // - Done => Clear request from the status and continue reconciliation. -func (*IonosCloudMachineReconciler) checkRequestStates( +func (*ionosCloudMachineReconciler) checkRequestStates( ctx context.Context, machineScope *scope.Machine, cloudService *cloud.Service, @@ -284,7 +297,7 @@ func (*IonosCloudMachineReconciler) checkRequestStates( return requeue, retErr } -func (*IonosCloudMachineReconciler) isInfrastructureReady(ctx context.Context, ms *scope.Machine) bool { +func (*ionosCloudMachineReconciler) isInfrastructureReady(ctx context.Context, ms *scope.Machine) bool { log := ctrl.LoggerFrom(ctx) // Make sure the infrastructure is ready. if !ms.ClusterScope.Cluster.Status.InfrastructureReady { @@ -314,9 +327,10 @@ func (*IonosCloudMachineReconciler) isInfrastructureReady(ctx context.Context, m return true } -// SetupWithManager sets up the controller with the Manager. -func (r *IonosCloudMachineReconciler) SetupWithManager(mgr ctrl.Manager) error { +// setupWithManager sets up the controller with the Manager. +func (r *ionosCloudMachineReconciler) setupWithManager(mgr ctrl.Manager, options controller.Options) error { return ctrl.NewControllerManagedBy(mgr). + WithOptions(options). For(&infrav1.IonosCloudMachine{}). 
Watches( &clusterv1.Machine{}, @@ -325,7 +339,7 @@ func (r *IonosCloudMachineReconciler) SetupWithManager(mgr ctrl.Manager) error { Complete(reconcile.AsReconciler[*infrav1.IonosCloudMachine](r.Client, r)) } -func (r *IonosCloudMachineReconciler) getClusterScope( +func (r *ionosCloudMachineReconciler) getClusterScope( ctx context.Context, cluster *clusterv1.Cluster, ionosCloudMachine *infrav1.IonosCloudMachine, ) (*scope.Cluster, error) { var clusterScope *scope.Cluster From 0af71194a3a90056db87ceb7f587aa21aa0d6996 Mon Sep 17 00:00:00 2001 From: Matthias Bastian Date: Wed, 29 May 2024 10:26:31 +0200 Subject: [PATCH 04/17] Protect CurrentRequestByDatacenter with RWMutex --- api/v1alpha1/ionoscloudcluster_types.go | 20 +++++++++++++ api/v1alpha1/ionoscloudcluster_types_test.go | 10 ++++++- .../ionoscloudmachine_controller.go | 4 +-- internal/service/cloud/network_test.go | 29 ++++++++----------- 4 files changed, 43 insertions(+), 20 deletions(-) diff --git a/api/v1alpha1/ionoscloudcluster_types.go b/api/v1alpha1/ionoscloudcluster_types.go index 5e1b50d4..7d411f10 100644 --- a/api/v1alpha1/ionoscloudcluster_types.go +++ b/api/v1alpha1/ionoscloudcluster_types.go @@ -17,6 +17,8 @@ limitations under the License. package v1alpha1 import ( + "sync" + corev1 "k8s.io/api/core/v1" metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" clusterv1 "sigs.k8s.io/cluster-api/api/v1beta1" @@ -34,6 +36,10 @@ const ( IonosCloudClusterKind = "IonosCloudCluster" ) +// currentRequestByDatacenterMutex is used to synchronize access to the +// IonosCloudCluster.Status.CurrentRequestByDatacenter map. +var currentRequestByDatacenterMutex sync.RWMutex + // IonosCloudClusterSpec defines the desired state of IonosCloudCluster. type IonosCloudClusterSpec struct { // ControlPlaneEndpoint represents the endpoint used to communicate with the control plane. @@ -66,6 +72,7 @@ type IonosCloudClusterStatus struct { Conditions clusterv1.Conditions `json:"conditions,omitempty"` // CurrentRequestByDatacenter maps data center IDs to a pending provisioning request made during reconciliation. + // Use the provided getters and setters to access and modify this map to ensure thread safety. //+optional CurrentRequestByDatacenter map[string]ProvisioningRequest `json:"currentRequest,omitempty"` @@ -117,9 +124,20 @@ func (i *IonosCloudCluster) SetConditions(conditions clusterv1.Conditions) { i.Status.Conditions = conditions } +// GetCurrentRequestByDatacenter returns the current provisioning request for the given data center and a boolean +// indicating if it exists. +func (i *IonosCloudCluster) GetCurrentRequestByDatacenter(datacenterID string) (ProvisioningRequest, bool) { + currentRequestByDatacenterMutex.RLock() + defer currentRequestByDatacenterMutex.RUnlock() + req, ok := i.Status.CurrentRequestByDatacenter[datacenterID] + return req, ok +} + // SetCurrentRequestByDatacenter sets the current provisioning request for the given data center. // This function makes sure that the map is initialized before setting the request. func (i *IonosCloudCluster) SetCurrentRequestByDatacenter(datacenterID, method, status, requestPath string) { + currentRequestByDatacenterMutex.Lock() + defer currentRequestByDatacenterMutex.Unlock() if i.Status.CurrentRequestByDatacenter == nil { i.Status.CurrentRequestByDatacenter = map[string]ProvisioningRequest{} } @@ -132,6 +150,8 @@ func (i *IonosCloudCluster) SetCurrentRequestByDatacenter(datacenterID, method, // DeleteCurrentRequestByDatacenter deletes the current provisioning request for the given data center. 
func (i *IonosCloudCluster) DeleteCurrentRequestByDatacenter(datacenterID string) { + currentRequestByDatacenterMutex.Lock() + defer currentRequestByDatacenterMutex.Unlock() delete(i.Status.CurrentRequestByDatacenter, datacenterID) } diff --git a/api/v1alpha1/ionoscloudcluster_types_test.go b/api/v1alpha1/ionoscloudcluster_types_test.go index 43072ed4..4fce42b2 100644 --- a/api/v1alpha1/ionoscloudcluster_types_test.go +++ b/api/v1alpha1/ionoscloudcluster_types_test.go @@ -138,7 +138,9 @@ var _ = Describe("IonosCloudCluster", func() { fetched := &IonosCloudCluster{} Expect(k8sClient.Get(context.Background(), key, fetched)).To(Succeed()) Expect(fetched.Status.Ready).To(BeFalse()) + currentRequestByDatacenterMutex.RLock() Expect(fetched.Status.CurrentRequestByDatacenter).To(BeEmpty()) + currentRequestByDatacenterMutex.RUnlock() Expect(fetched.Status.Conditions).To(BeEmpty()) By("retrieving the cluster and setting the status") @@ -157,8 +159,12 @@ var _ = Describe("IonosCloudCluster", func() { Expect(k8sClient.Get(context.Background(), key, fetched)).To(Succeed()) Expect(fetched.Status.Ready).To(BeTrue()) + currentRequestByDatacenterMutex.RLock() Expect(fetched.Status.CurrentRequestByDatacenter).To(HaveLen(1)) - Expect(fetched.Status.CurrentRequestByDatacenter["123"]).To(Equal(wantProvisionRequest)) + currentRequestByDatacenterMutex.RUnlock() + gotProvisionRequest, exists := fetched.GetCurrentRequestByDatacenter("123") + Expect(exists).To(BeTrue()) + Expect(gotProvisionRequest).To(Equal(wantProvisionRequest)) Expect(fetched.Status.Conditions).To(HaveLen(1)) Expect(conditions.IsTrue(fetched, clusterv1.ReadyCondition)).To(BeTrue()) @@ -167,7 +173,9 @@ var _ = Describe("IonosCloudCluster", func() { Expect(k8sClient.Status().Update(context.Background(), fetched)).To(Succeed()) Expect(k8sClient.Get(context.Background(), key, fetched)).To(Succeed()) + currentRequestByDatacenterMutex.RLock() Expect(fetched.Status.CurrentRequestByDatacenter).To(BeEmpty()) + currentRequestByDatacenterMutex.RUnlock() }) }) }) diff --git a/internal/controller/ionoscloudmachine_controller.go b/internal/controller/ionoscloudmachine_controller.go index 7ef7d7ca..28ad220b 100644 --- a/internal/controller/ionoscloudmachine_controller.go +++ b/internal/controller/ionoscloudmachine_controller.go @@ -259,9 +259,9 @@ func (*ionosCloudMachineReconciler) checkRequestStates( cloudService *cloud.Service, ) (requeue bool, retErr error) { log := ctrl.LoggerFrom(ctx) - // check cluster wide request + // check cluster-wide request ionosCluster := machineScope.ClusterScope.IonosCluster - if req, exists := ionosCluster.Status.CurrentRequestByDatacenter[machineScope.DatacenterID()]; exists { + if req, exists := ionosCluster.GetCurrentRequestByDatacenter(machineScope.DatacenterID()); exists { status, message, err := cloudService.GetRequestStatus(ctx, req.RequestPath) if err != nil { retErr = fmt.Errorf("could not get request status: %w", err) diff --git a/internal/service/cloud/network_test.go b/internal/service/cloud/network_test.go index 1fe52035..7e1a54e0 100644 --- a/internal/service/cloud/network_test.go +++ b/internal/service/cloud/network_test.go @@ -62,9 +62,8 @@ func (s *lanSuite) TestLANURLs() { func (s *lanSuite) TestNetworkCreateLANSuccessful() { s.mockCreateLANCall().Return(exampleRequestPath, nil).Once() s.NoError(s.service.createLAN(s.ctx, s.machineScope)) - s.Contains(s.infraCluster.Status.CurrentRequestByDatacenter, s.machineScope.DatacenterID(), - "request should be stored in status") - req := 
s.infraCluster.Status.CurrentRequestByDatacenter[s.machineScope.DatacenterID()] + req, exists := s.infraCluster.GetCurrentRequestByDatacenter(s.machineScope.DatacenterID()) + s.True(exists, "request should be stored in status") s.Equal(exampleRequestPath, req.RequestPath, "request path should be stored in status") s.Equal(http.MethodPost, req.Method, "request method should be stored in status") s.Equal(sdk.RequestStatusQueued, req.State, "request status should be stored in status") @@ -73,9 +72,8 @@ func (s *lanSuite) TestNetworkCreateLANSuccessful() { func (s *lanSuite) TestNetworkDeleteLANSuccessful() { s.mockDeleteLANCall(exampleLANID).Return(exampleRequestPath, nil).Once() s.NoError(s.service.deleteLAN(s.ctx, s.machineScope, exampleLANID)) - s.Contains(s.infraCluster.Status.CurrentRequestByDatacenter, s.machineScope.DatacenterID(), - "request should be stored in status") - req := s.infraCluster.Status.CurrentRequestByDatacenter[s.machineScope.DatacenterID()] + req, exists := s.infraCluster.GetCurrentRequestByDatacenter(s.machineScope.DatacenterID()) + s.True(exists, "request should be stored in status") s.Equal(exampleRequestPath, req.RequestPath, "request path should be stored in status") s.Equal(http.MethodDelete, req.Method, "request method should be stored in status") s.Equal(sdk.RequestStatusQueued, req.State, "request status should be stored in status") @@ -105,16 +103,11 @@ func (s *lanSuite) TestNetworkGetLANErrorNotUnique() { } func (s *lanSuite) TestNetworkRemoveLANPendingRequestFromClusterSuccessful() { - s.infraCluster.Status.CurrentRequestByDatacenter = map[string]infrav1.ProvisioningRequest{ - s.machineScope.DatacenterID(): { - RequestPath: exampleRequestPath, - Method: http.MethodDelete, - State: sdk.RequestStatusQueued, - }, - } + s.infraCluster.SetCurrentRequestByDatacenter(s.machineScope.DatacenterID(), http.MethodDelete, sdk.RequestStatusQueued, + exampleRequestPath) s.NoError(s.service.removeLANPendingRequestFromCluster(s.machineScope)) - s.NotContains(s.infraCluster.Status.CurrentRequestByDatacenter, - s.machineScope.DatacenterID(), "request should be removed from status") + _, exists := s.infraCluster.GetCurrentRequestByDatacenter(s.machineScope.DatacenterID()) + s.False(exists, "request should be removed from status") } func (s *lanSuite) TestNetworkRemoveLANPendingRequestFromClusterNoRequest() { @@ -171,7 +164,8 @@ func (s *lanSuite) TestNetworkReconcileLANDeleteLANExistsNoPendingRequestsHasOth requeue, err := s.service.ReconcileLANDeletion(s.ctx, s.machineScope) s.NoError(err) s.False(requeue) - s.NotContains(s.infraCluster.Status.CurrentRequestByDatacenter, s.machineScope.DatacenterID()) + _, exists := s.infraCluster.GetCurrentRequestByDatacenter(s.machineScope.DatacenterID()) + s.False(exists) } func (s *lanSuite) TestNetworkReconcileLANDeleteNoExistingLANExistingRequestPending() { @@ -197,7 +191,8 @@ func (s *lanSuite) TestNetworkReconcileLANDeleteLANDoesNotExist() { requeue, err := s.service.ReconcileLANDeletion(s.ctx, s.machineScope) s.NoError(err) s.False(requeue) - s.NotContains(s.infraCluster.Status.CurrentRequestByDatacenter, s.machineScope.DatacenterID()) + _, exists := s.infraCluster.GetCurrentRequestByDatacenter(s.machineScope.DatacenterID()) + s.False(exists) } func (s *lanSuite) TestReconcileIPFailoverNICNotInFailoverGroup() { From 78d8a70a18d8b4eefca20305014726ea9840c199 Mon Sep 17 00:00:00 2001 From: Matthias Bastian Date: Wed, 29 May 2024 12:00:25 +0200 Subject: [PATCH 05/17] Add license headers --- internal/util/locker/locker.go | 16 
++++++++++++++++ internal/util/locker/locker_test.go | 16 ++++++++++++++++ 2 files changed, 32 insertions(+) diff --git a/internal/util/locker/locker.go b/internal/util/locker/locker.go index 6ae9db76..a96d1a29 100644 --- a/internal/util/locker/locker.go +++ b/internal/util/locker/locker.go @@ -1,3 +1,19 @@ +/* +Copyright 2024 IONOS Cloud. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + /* Package locker provides a mechanism for fine-grained locking. diff --git a/internal/util/locker/locker_test.go b/internal/util/locker/locker_test.go index 4be76a37..7bfa2fdf 100644 --- a/internal/util/locker/locker_test.go +++ b/internal/util/locker/locker_test.go @@ -1,3 +1,19 @@ +/* +Copyright 2024 IONOS Cloud. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + package locker import ( From 7594997cbcc3f0699663d49b7a045f86bf4c33e7 Mon Sep 17 00:00:00 2001 From: Matthias Bastian Date: Wed, 29 May 2024 12:01:42 +0200 Subject: [PATCH 06/17] Regenerate manifests --- .../infrastructure.cluster.x-k8s.io_ionoscloudclusters.yaml | 5 +++-- 1 file changed, 3 insertions(+), 2 deletions(-) diff --git a/config/crd/bases/infrastructure.cluster.x-k8s.io_ionoscloudclusters.yaml b/config/crd/bases/infrastructure.cluster.x-k8s.io_ionoscloudclusters.yaml index ede97de7..49ccca26 100644 --- a/config/crd/bases/infrastructure.cluster.x-k8s.io_ionoscloudclusters.yaml +++ b/config/crd/bases/infrastructure.cluster.x-k8s.io_ionoscloudclusters.yaml @@ -209,8 +209,9 @@ spec: - method - requestPath type: object - description: CurrentRequestByDatacenter maps data center IDs to a - pending provisioning request made during reconciliation. + description: |- + CurrentRequestByDatacenter maps data center IDs to a pending provisioning request made during reconciliation. + Use the provided getters and setters to access and modify this map to ensure thread safety. type: object ready: description: Ready indicates that the cluster is ready. From 035d1c2c1a5de421646b7c18b8022149b0462f7f Mon Sep 17 00:00:00 2001 From: Matthias Bastian Date: Wed, 29 May 2024 15:47:09 +0200 Subject: [PATCH 07/17] Rename new flags --- cmd/main.go | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/cmd/main.go b/cmd/main.go index ea705f1f..623cedeb 100644 --- a/cmd/main.go +++ b/cmd/main.go @@ -133,8 +133,8 @@ func initFlags() { pflag.BoolVar(&enableLeaderElection, "leader-elect", false, "Enable leader election for controller manager. 
"+ "Enabling this will ensure there is only one active controller manager.") - pflag.IntVar(&icClusterConcurrency, "ionoscloud-cluster-concurrency", 1, + pflag.IntVar(&icClusterConcurrency, "ionoscloudcluster-concurrency", 1, "Number of IonosCloudClusters to process simultaneously") - pflag.IntVar(&icMachineConcurrency, "ionoscloud-machine-concurrency", 1, + pflag.IntVar(&icMachineConcurrency, "ionoscloudmachine-concurrency", 1, "Number of IonosCloudMachines to process simultaneously") } From 84a4230486461580e7017d4693adca0501bbf37b Mon Sep 17 00:00:00 2001 From: Matthias Bastian Date: Wed, 29 May 2024 15:58:19 +0200 Subject: [PATCH 08/17] Check initial context cancellation w/o select --- internal/util/locker/locker.go | 6 ++---- 1 file changed, 2 insertions(+), 4 deletions(-) diff --git a/internal/util/locker/locker.go b/internal/util/locker/locker.go index a96d1a29..5a8dff1c 100644 --- a/internal/util/locker/locker.go +++ b/internal/util/locker/locker.go @@ -80,13 +80,11 @@ func (lwc *lockWithCounter) count() int32 { func (l *Locker) Lock(ctx context.Context, key string) error { l.mu.Lock() - select { - case <-ctx.Done(): + if err := ctx.Err(); err != nil { // ctx becoming done has "happened before" acquiring the lock, whether it became done before the call began or // while we were waiting for the mutex. We prefer to fail even if we could acquire the mutex without blocking. l.mu.Unlock() - return ctx.Err() - default: + return err } lwc, exists := l.locks[key] From 62ac69c0bd5af6c2f6f0eca0ada695e490940c11 Mon Sep 17 00:00:00 2001 From: Matthias Bastian Date: Wed, 29 May 2024 16:07:15 +0200 Subject: [PATCH 09/17] Add cluster UID to `lanLockKey()` --- internal/service/cloud/network.go | 4 +++- 1 file changed, 3 insertions(+), 1 deletion(-) diff --git a/internal/service/cloud/network.go b/internal/service/cloud/network.go index 15f9a1d1..a8625f6c 100644 --- a/internal/service/cloud/network.go +++ b/internal/service/cloud/network.go @@ -46,7 +46,9 @@ func (*Service) lanLockKey(ms *scope.Machine) string { // LANs are shared across machines within a datacenter, so need to be locked when performing write operations during // concurrent machine reconciliations. // Their datacenter scope fits well to be part of the key used for locking the LAN. - return "dc/" + ms.DatacenterID() + "/lan" + // To not interfere with other clusters having resources in the same datacenter, the cluster is also part of the + // key. + return "cluster/" + string(ms.ClusterScope.Cluster.UID) + "/dc/" + ms.DatacenterID() + "/lan" } func (*Service) lanURL(datacenterID, id string) string { From 5ba44d667171883809c14a5ca581ac807258767c Mon Sep 17 00:00:00 2001 From: Matthias Bastian Date: Wed, 29 May 2024 17:16:28 +0200 Subject: [PATCH 10/17] Lock `CurrentRequestByDatacenter` in cluster scope Keeps the `api` package clean. To scope the lock to a specific ICC, we make use of the keyed locks here again, even though we don't need their ability to react on canceled contexts. We therefore pass `context.Background` and ignore the error, as none can occur. In contrast to the previous approach, we lose the differentiation between read-only and read-write locks, but even for writing the locks are very short-lived, so it doesn't matter. 
--- api/v1alpha1/ionoscloudcluster_types.go | 38 ------------- api/v1alpha1/ionoscloudcluster_types_test.go | 15 ++---- .../ionoscloudcluster_controller.go | 3 ++ .../ionoscloudmachine_controller.go | 5 +- internal/service/cloud/network.go | 6 +-- internal/service/cloud/network_test.go | 12 ++--- internal/service/cloud/suite_test.go | 1 + internal/util/locker/locker.go | 1 + scope/cluster.go | 46 ++++++++++++++++ scope/cluster_test.go | 53 +++++++++++++------ 10 files changed, 102 insertions(+), 78 deletions(-) diff --git a/api/v1alpha1/ionoscloudcluster_types.go b/api/v1alpha1/ionoscloudcluster_types.go index 7d411f10..08183ffa 100644 --- a/api/v1alpha1/ionoscloudcluster_types.go +++ b/api/v1alpha1/ionoscloudcluster_types.go @@ -17,8 +17,6 @@ limitations under the License. package v1alpha1 import ( - "sync" - corev1 "k8s.io/api/core/v1" metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" clusterv1 "sigs.k8s.io/cluster-api/api/v1beta1" @@ -36,10 +34,6 @@ const ( IonosCloudClusterKind = "IonosCloudCluster" ) -// currentRequestByDatacenterMutex is used to synchronize access to the -// IonosCloudCluster.Status.CurrentRequestByDatacenter map. -var currentRequestByDatacenterMutex sync.RWMutex - // IonosCloudClusterSpec defines the desired state of IonosCloudCluster. type IonosCloudClusterSpec struct { // ControlPlaneEndpoint represents the endpoint used to communicate with the control plane. @@ -72,7 +66,6 @@ type IonosCloudClusterStatus struct { Conditions clusterv1.Conditions `json:"conditions,omitempty"` // CurrentRequestByDatacenter maps data center IDs to a pending provisioning request made during reconciliation. - // Use the provided getters and setters to access and modify this map to ensure thread safety. //+optional CurrentRequestByDatacenter map[string]ProvisioningRequest `json:"currentRequest,omitempty"` @@ -124,37 +117,6 @@ func (i *IonosCloudCluster) SetConditions(conditions clusterv1.Conditions) { i.Status.Conditions = conditions } -// GetCurrentRequestByDatacenter returns the current provisioning request for the given data center and a boolean -// indicating if it exists. -func (i *IonosCloudCluster) GetCurrentRequestByDatacenter(datacenterID string) (ProvisioningRequest, bool) { - currentRequestByDatacenterMutex.RLock() - defer currentRequestByDatacenterMutex.RUnlock() - req, ok := i.Status.CurrentRequestByDatacenter[datacenterID] - return req, ok -} - -// SetCurrentRequestByDatacenter sets the current provisioning request for the given data center. -// This function makes sure that the map is initialized before setting the request. -func (i *IonosCloudCluster) SetCurrentRequestByDatacenter(datacenterID, method, status, requestPath string) { - currentRequestByDatacenterMutex.Lock() - defer currentRequestByDatacenterMutex.Unlock() - if i.Status.CurrentRequestByDatacenter == nil { - i.Status.CurrentRequestByDatacenter = map[string]ProvisioningRequest{} - } - i.Status.CurrentRequestByDatacenter[datacenterID] = ProvisioningRequest{ - Method: method, - RequestPath: requestPath, - State: status, - } -} - -// DeleteCurrentRequestByDatacenter deletes the current provisioning request for the given data center. -func (i *IonosCloudCluster) DeleteCurrentRequestByDatacenter(datacenterID string) { - currentRequestByDatacenterMutex.Lock() - defer currentRequestByDatacenterMutex.Unlock() - delete(i.Status.CurrentRequestByDatacenter, datacenterID) -} - // SetCurrentClusterRequest sets the current provisioning request for the cluster. 
func (i *IonosCloudCluster) SetCurrentClusterRequest(method, status, requestPath string) { i.Status.CurrentClusterRequest = &ProvisioningRequest{ diff --git a/api/v1alpha1/ionoscloudcluster_types_test.go b/api/v1alpha1/ionoscloudcluster_types_test.go index 4fce42b2..4ddcdd0e 100644 --- a/api/v1alpha1/ionoscloudcluster_types_test.go +++ b/api/v1alpha1/ionoscloudcluster_types_test.go @@ -138,9 +138,7 @@ var _ = Describe("IonosCloudCluster", func() { fetched := &IonosCloudCluster{} Expect(k8sClient.Get(context.Background(), key, fetched)).To(Succeed()) Expect(fetched.Status.Ready).To(BeFalse()) - currentRequestByDatacenterMutex.RLock() Expect(fetched.Status.CurrentRequestByDatacenter).To(BeEmpty()) - currentRequestByDatacenterMutex.RUnlock() Expect(fetched.Status.Conditions).To(BeEmpty()) By("retrieving the cluster and setting the status") @@ -150,8 +148,7 @@ var _ = Describe("IonosCloudCluster", func() { RequestPath: "/path/to/resource", State: "QUEUED", } - fetched.SetCurrentRequestByDatacenter("123", - wantProvisionRequest.Method, wantProvisionRequest.State, wantProvisionRequest.RequestPath) + fetched.Status.CurrentRequestByDatacenter["123"] = wantProvisionRequest conditions.MarkTrue(fetched, clusterv1.ReadyCondition) By("updating the cluster status") @@ -159,23 +156,17 @@ var _ = Describe("IonosCloudCluster", func() { Expect(k8sClient.Get(context.Background(), key, fetched)).To(Succeed()) Expect(fetched.Status.Ready).To(BeTrue()) - currentRequestByDatacenterMutex.RLock() Expect(fetched.Status.CurrentRequestByDatacenter).To(HaveLen(1)) - currentRequestByDatacenterMutex.RUnlock() - gotProvisionRequest, exists := fetched.GetCurrentRequestByDatacenter("123") - Expect(exists).To(BeTrue()) - Expect(gotProvisionRequest).To(Equal(wantProvisionRequest)) + Expect(fetched.Status.CurrentRequestByDatacenter["123"]).To(Equal(wantProvisionRequest)) Expect(fetched.Status.Conditions).To(HaveLen(1)) Expect(conditions.IsTrue(fetched, clusterv1.ReadyCondition)).To(BeTrue()) By("Removing the entry from the status again") - fetched.DeleteCurrentRequestByDatacenter("123") + delete(fetched.Status.CurrentRequestByDatacenter, "123") Expect(k8sClient.Status().Update(context.Background(), fetched)).To(Succeed()) Expect(k8sClient.Get(context.Background(), key, fetched)).To(Succeed()) - currentRequestByDatacenterMutex.RLock() Expect(fetched.Status.CurrentRequestByDatacenter).To(BeEmpty()) - currentRequestByDatacenterMutex.RUnlock() }) }) }) diff --git a/internal/controller/ionoscloudcluster_controller.go b/internal/controller/ionoscloudcluster_controller.go index 70f0ef29..50dc3bd2 100644 --- a/internal/controller/ionoscloudcluster_controller.go +++ b/internal/controller/ionoscloudcluster_controller.go @@ -41,6 +41,7 @@ import ( infrav1 "github.com/ionos-cloud/cluster-api-provider-ionoscloud/api/v1alpha1" "github.com/ionos-cloud/cluster-api-provider-ionoscloud/internal/service/cloud" + "github.com/ionos-cloud/cluster-api-provider-ionoscloud/internal/util/locker" "github.com/ionos-cloud/cluster-api-provider-ionoscloud/scope" ) @@ -48,6 +49,7 @@ import ( type ionosCloudClusterReconciler struct { client.Client scheme *runtime.Scheme + locker *locker.Locker } // RegisterIonosCloudClusterReconciler creates an ionosCloudClusterReconciler and registers it with the manager. 
@@ -63,6 +65,7 @@ func newIonosCloudClusterReconciler(mgr ctrl.Manager) *ionosCloudClusterReconcil r := &ionosCloudClusterReconciler{ Client: mgr.GetClient(), scheme: mgr.GetScheme(), + locker: locker.New(), } return r } diff --git a/internal/controller/ionoscloudmachine_controller.go b/internal/controller/ionoscloudmachine_controller.go index 28ad220b..bba04dbe 100644 --- a/internal/controller/ionoscloudmachine_controller.go +++ b/internal/controller/ionoscloudmachine_controller.go @@ -260,8 +260,7 @@ func (*ionosCloudMachineReconciler) checkRequestStates( ) (requeue bool, retErr error) { log := ctrl.LoggerFrom(ctx) // check cluster-wide request - ionosCluster := machineScope.ClusterScope.IonosCluster - if req, exists := ionosCluster.GetCurrentRequestByDatacenter(machineScope.DatacenterID()); exists { + if req, exists := machineScope.ClusterScope.GetCurrentRequestByDatacenter(machineScope.DatacenterID()); exists { status, message, err := cloudService.GetRequestStatus(ctx, req.RequestPath) if err != nil { retErr = fmt.Errorf("could not get request status: %w", err) @@ -269,7 +268,7 @@ func (*ionosCloudMachineReconciler) checkRequestStates( requeue, retErr = withStatus(status, message, &log, func() error { // remove the request from the status and patch the cluster - ionosCluster.DeleteCurrentRequestByDatacenter(machineScope.DatacenterID()) + machineScope.ClusterScope.DeleteCurrentRequestByDatacenter(machineScope.DatacenterID()) return machineScope.ClusterScope.PatchObject() }, ) diff --git a/internal/service/cloud/network.go b/internal/service/cloud/network.go index a8625f6c..30653b44 100644 --- a/internal/service/cloud/network.go +++ b/internal/service/cloud/network.go @@ -191,7 +191,7 @@ func (s *Service) createLAN(ctx context.Context, ms *scope.Machine) error { return fmt.Errorf("unable to create LAN in data center %s: %w", ms.DatacenterID(), err) } - ms.ClusterScope.IonosCluster.SetCurrentRequestByDatacenter(ms.DatacenterID(), + ms.ClusterScope.SetCurrentRequestByDatacenter(ms.DatacenterID(), http.MethodPost, sdk.RequestStatusQueued, requestPath) err = ms.ClusterScope.PatchObject() @@ -212,7 +212,7 @@ func (s *Service) deleteLAN(ctx context.Context, ms *scope.Machine, lanID string return fmt.Errorf("unable to request LAN deletion in data center: %w", err) } - ms.ClusterScope.IonosCluster.SetCurrentRequestByDatacenter(ms.DatacenterID(), + ms.ClusterScope.SetCurrentRequestByDatacenter(ms.DatacenterID(), http.MethodDelete, sdk.RequestStatusQueued, requestPath) err = ms.ClusterScope.PatchObject() @@ -254,7 +254,7 @@ func (s *Service) getLatestLANPatchRequest(ctx context.Context, ms *scope.Machin } func (*Service) removeLANPendingRequestFromCluster(ms *scope.Machine) error { - ms.ClusterScope.IonosCluster.DeleteCurrentRequestByDatacenter(ms.DatacenterID()) + ms.ClusterScope.DeleteCurrentRequestByDatacenter(ms.DatacenterID()) if err := ms.ClusterScope.PatchObject(); err != nil { return fmt.Errorf("could not remove stale LAN pending request from cluster: %w", err) } diff --git a/internal/service/cloud/network_test.go b/internal/service/cloud/network_test.go index 7e1a54e0..2fbe8af4 100644 --- a/internal/service/cloud/network_test.go +++ b/internal/service/cloud/network_test.go @@ -62,7 +62,7 @@ func (s *lanSuite) TestLANURLs() { func (s *lanSuite) TestNetworkCreateLANSuccessful() { s.mockCreateLANCall().Return(exampleRequestPath, nil).Once() s.NoError(s.service.createLAN(s.ctx, s.machineScope)) - req, exists := s.infraCluster.GetCurrentRequestByDatacenter(s.machineScope.DatacenterID()) + req, 
exists := s.clusterScope.GetCurrentRequestByDatacenter(s.machineScope.DatacenterID()) s.True(exists, "request should be stored in status") s.Equal(exampleRequestPath, req.RequestPath, "request path should be stored in status") s.Equal(http.MethodPost, req.Method, "request method should be stored in status") @@ -72,7 +72,7 @@ func (s *lanSuite) TestNetworkCreateLANSuccessful() { func (s *lanSuite) TestNetworkDeleteLANSuccessful() { s.mockDeleteLANCall(exampleLANID).Return(exampleRequestPath, nil).Once() s.NoError(s.service.deleteLAN(s.ctx, s.machineScope, exampleLANID)) - req, exists := s.infraCluster.GetCurrentRequestByDatacenter(s.machineScope.DatacenterID()) + req, exists := s.clusterScope.GetCurrentRequestByDatacenter(s.machineScope.DatacenterID()) s.True(exists, "request should be stored in status") s.Equal(exampleRequestPath, req.RequestPath, "request path should be stored in status") s.Equal(http.MethodDelete, req.Method, "request method should be stored in status") @@ -103,10 +103,10 @@ func (s *lanSuite) TestNetworkGetLANErrorNotUnique() { } func (s *lanSuite) TestNetworkRemoveLANPendingRequestFromClusterSuccessful() { - s.infraCluster.SetCurrentRequestByDatacenter(s.machineScope.DatacenterID(), http.MethodDelete, sdk.RequestStatusQueued, + s.clusterScope.SetCurrentRequestByDatacenter(s.machineScope.DatacenterID(), http.MethodDelete, sdk.RequestStatusQueued, exampleRequestPath) s.NoError(s.service.removeLANPendingRequestFromCluster(s.machineScope)) - _, exists := s.infraCluster.GetCurrentRequestByDatacenter(s.machineScope.DatacenterID()) + _, exists := s.clusterScope.GetCurrentRequestByDatacenter(s.machineScope.DatacenterID()) s.False(exists, "request should be removed from status") } @@ -164,7 +164,7 @@ func (s *lanSuite) TestNetworkReconcileLANDeleteLANExistsNoPendingRequestsHasOth requeue, err := s.service.ReconcileLANDeletion(s.ctx, s.machineScope) s.NoError(err) s.False(requeue) - _, exists := s.infraCluster.GetCurrentRequestByDatacenter(s.machineScope.DatacenterID()) + _, exists := s.clusterScope.GetCurrentRequestByDatacenter(s.machineScope.DatacenterID()) s.False(exists) } @@ -191,7 +191,7 @@ func (s *lanSuite) TestNetworkReconcileLANDeleteLANDoesNotExist() { requeue, err := s.service.ReconcileLANDeletion(s.ctx, s.machineScope) s.NoError(err) s.False(requeue) - _, exists := s.infraCluster.GetCurrentRequestByDatacenter(s.machineScope.DatacenterID()) + _, exists := s.clusterScope.GetCurrentRequestByDatacenter(s.machineScope.DatacenterID()) s.False(exists) } diff --git a/internal/service/cloud/suite_test.go b/internal/service/cloud/suite_test.go index c83ffc65..5e916b3f 100644 --- a/internal/service/cloud/suite_test.go +++ b/internal/service/cloud/suite_test.go @@ -182,6 +182,7 @@ func (s *ServiceTestSuite) SetupTest() { Client: s.k8sClient, Cluster: s.capiCluster, IonosCluster: s.infraCluster, + Locker: locker.New(), }) s.NoError(err, "failed to create cluster scope") diff --git a/internal/util/locker/locker.go b/internal/util/locker/locker.go index 5a8dff1c..178b99d0 100644 --- a/internal/util/locker/locker.go +++ b/internal/util/locker/locker.go @@ -77,6 +77,7 @@ func (lwc *lockWithCounter) count() int32 { } // Lock locks the given key. Returns after acquiring lock or when given context is done. +// The only errors it can return stem from the context being done. 
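+// Callers typically pair a successful Lock with a deferred Unlock of the same key
+// (illustrative):
+//
+//	if err := l.Lock(ctx, key); err != nil {
+//		return err
+//	}
+//	defer l.Unlock(key)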
func (l *Locker) Lock(ctx context.Context, key string) error { l.mu.Lock() diff --git a/scope/cluster.go b/scope/cluster.go index 22f46f22..bffb01d4 100644 --- a/scope/cluster.go +++ b/scope/cluster.go @@ -33,6 +33,7 @@ import ( "sigs.k8s.io/controller-runtime/pkg/client" infrav1 "github.com/ionos-cloud/cluster-api-provider-ionoscloud/api/v1alpha1" + "github.com/ionos-cloud/cluster-api-provider-ionoscloud/internal/util/locker" ) // resolver is able to look up IP addresses from a given host name. @@ -47,6 +48,7 @@ type Cluster struct { client client.Client patchHelper *patch.Helper resolver resolver + Locker *locker.Locker Cluster *clusterv1.Cluster IonosCluster *infrav1.IonosCloudCluster } @@ -56,6 +58,7 @@ type ClusterParams struct { Client client.Client Cluster *clusterv1.Cluster IonosCluster *infrav1.IonosCloudCluster + Locker *locker.Locker } // NewCluster creates a new cluster scope with the supplied parameters. @@ -73,6 +76,10 @@ func NewCluster(params ClusterParams) (*Cluster, error) { return nil, errors.New("IonosCluster is required when creating a cluster scope") } + if params.Locker == nil { + return nil, errors.New("locker is required when creating a cluster scope") + } + helper, err := patch.NewHelper(params.IonosCluster, params.Client) if err != nil { return nil, fmt.Errorf("failed to init patch helper: %w", err) @@ -84,6 +91,7 @@ func NewCluster(params ClusterParams) (*Cluster, error) { IonosCluster: params.IonosCluster, patchHelper: helper, resolver: net.DefaultResolver, + Locker: params.Locker, } return clusterScope, nil @@ -153,6 +161,44 @@ func (c *Cluster) IsDeleted() bool { return !c.Cluster.DeletionTimestamp.IsZero() || !c.IonosCluster.DeletionTimestamp.IsZero() } +func (c *Cluster) currentRequestByDatacenterLockKey() string { + return string(c.Cluster.UID) + "/currentRequestByDatacenter" +} + +// GetCurrentRequestByDatacenter returns the current provisioning request for the given data center and a boolean +// indicating if it exists. +func (c *Cluster) GetCurrentRequestByDatacenter(datacenterID string) (infrav1.ProvisioningRequest, bool) { + lockKey := c.currentRequestByDatacenterLockKey() + _ = c.Locker.Lock(context.Background(), lockKey) + defer c.Locker.Unlock(lockKey) + req, ok := c.IonosCluster.Status.CurrentRequestByDatacenter[datacenterID] + return req, ok +} + +// SetCurrentRequestByDatacenter sets the current provisioning request for the given data center. +// This function makes sure that the map is initialized before setting the request. +func (c *Cluster) SetCurrentRequestByDatacenter(datacenterID, method, status, requestPath string) { + lockKey := c.currentRequestByDatacenterLockKey() + _ = c.Locker.Lock(context.Background(), lockKey) + defer c.Locker.Unlock(lockKey) + if c.IonosCluster.Status.CurrentRequestByDatacenter == nil { + c.IonosCluster.Status.CurrentRequestByDatacenter = map[string]infrav1.ProvisioningRequest{} + } + c.IonosCluster.Status.CurrentRequestByDatacenter[datacenterID] = infrav1.ProvisioningRequest{ + Method: method, + RequestPath: requestPath, + State: status, + } +} + +// DeleteCurrentRequestByDatacenter deletes the current provisioning request for the given data center. +func (c *Cluster) DeleteCurrentRequestByDatacenter(datacenterID string) { + lockKey := c.currentRequestByDatacenterLockKey() + _ = c.Locker.Lock(context.Background(), lockKey) + defer c.Locker.Unlock(lockKey) + delete(c.IonosCluster.Status.CurrentRequestByDatacenter, datacenterID) +} + // PatchObject will apply all changes from the IonosCloudCluster. 
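The accessors above only guard the in-memory status map; persisting a change still goes through PatchObject, mirroring the createLAN/deleteLAN call sites earlier in this series. A condensed sketch of that pattern, where ms is a *scope.Machine as in the network service and the request path is a placeholder:

	if _, pending := ms.ClusterScope.GetCurrentRequestByDatacenter(ms.DatacenterID()); !pending {
		ms.ClusterScope.SetCurrentRequestByDatacenter(ms.DatacenterID(),
			http.MethodPost, sdk.RequestStatusQueued, "/cloudapi/v6/requests/<id>")
		if err := ms.ClusterScope.PatchObject(); err != nil {
			return fmt.Errorf("failed to persist pending request: %w", err)
		}
	}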
// It will also make sure to patch the status subresource. func (c *Cluster) PatchObject() error { diff --git a/scope/cluster_test.go b/scope/cluster_test.go index f573d52f..8e32def1 100644 --- a/scope/cluster_test.go +++ b/scope/cluster_test.go @@ -31,6 +31,7 @@ import ( "sigs.k8s.io/controller-runtime/pkg/client/fake" infrav1 "github.com/ionos-cloud/cluster-api-provider-ionoscloud/api/v1alpha1" + "github.com/ionos-cloud/cluster-api-provider-ionoscloud/internal/util/locker" ) func TestNewClusterMissingParams(t *testing.T) { @@ -38,41 +39,60 @@ func TestNewClusterMissingParams(t *testing.T) { require.NoError(t, infrav1.AddToScheme(scheme)) cl := fake.NewClientBuilder().WithScheme(scheme).Build() + completeParams := func() ClusterParams { + return ClusterParams{ + Client: cl, + Cluster: &clusterv1.Cluster{}, + IonosCluster: &infrav1.IonosCloudCluster{}, + Locker: locker.New(), + } + } + tests := []struct { name string - params ClusterParams + params func() ClusterParams wantErr bool }{ { name: "all present", - params: ClusterParams{ - Client: cl, - Cluster: &clusterv1.Cluster{}, - IonosCluster: &infrav1.IonosCloudCluster{}, + params: func() ClusterParams { + return completeParams() }, wantErr: false, }, { name: "missing client", - params: ClusterParams{ - Cluster: &clusterv1.Cluster{}, - IonosCluster: &infrav1.IonosCloudCluster{}, + params: func() ClusterParams { + params := completeParams() + params.Client = nil + return params }, wantErr: true, }, { name: "missing cluster", - params: ClusterParams{ - Client: cl, - IonosCluster: &infrav1.IonosCloudCluster{}, + params: func() ClusterParams { + params := completeParams() + params.Cluster = nil + return params }, wantErr: true, }, { name: "missing IONOS cluster", - params: ClusterParams{ - Client: cl, - Cluster: &clusterv1.Cluster{}, + params: func() ClusterParams { + params := completeParams() + params.IonosCluster = nil + return params + }, + wantErr: true, + }, + { + name: "missing locker", + params: func() ClusterParams { + params := completeParams() + params.Locker = nil + return params }, wantErr: true, }, @@ -80,11 +100,10 @@ func TestNewClusterMissingParams(t *testing.T) { for _, test := range tests { t.Run(test.name, func(t *testing.T) { + params, err := NewCluster(test.params()) if test.wantErr { - _, err := NewCluster(test.params) require.Error(t, err) } else { - params, err := NewCluster(test.params) require.NoError(t, err) require.NotNil(t, params) require.Equal(t, net.DefaultResolver, params.resolver) @@ -220,6 +239,7 @@ func TestClusterListMachines(t *testing.T) { for _, test := range tests { t.Run(test.name, func(t *testing.T) { params := ClusterParams{ + Locker: locker.New(), Cluster: &clusterv1.Cluster{ ObjectMeta: metav1.ObjectMeta{ Name: clusterName, @@ -305,6 +325,7 @@ func TestClusterIsDeleted(t *testing.T) { Client: cl, Cluster: test.cluster, IonosCluster: test.ionosCluster, + Locker: locker.New(), } c, err := NewCluster(params) From f505139e67b25a9d7c994b458790a540269f3f51 Mon Sep 17 00:00:00 2001 From: Matthias Bastian Date: Wed, 29 May 2024 17:18:48 +0200 Subject: [PATCH 11/17] Regenerate manifests. Again. 
--- .../infrastructure.cluster.x-k8s.io_ionoscloudclusters.yaml | 5 ++--- 1 file changed, 2 insertions(+), 3 deletions(-) diff --git a/config/crd/bases/infrastructure.cluster.x-k8s.io_ionoscloudclusters.yaml b/config/crd/bases/infrastructure.cluster.x-k8s.io_ionoscloudclusters.yaml index 49ccca26..ede97de7 100644 --- a/config/crd/bases/infrastructure.cluster.x-k8s.io_ionoscloudclusters.yaml +++ b/config/crd/bases/infrastructure.cluster.x-k8s.io_ionoscloudclusters.yaml @@ -209,9 +209,8 @@ spec: - method - requestPath type: object - description: |- - CurrentRequestByDatacenter maps data center IDs to a pending provisioning request made during reconciliation. - Use the provided getters and setters to access and modify this map to ensure thread safety. + description: CurrentRequestByDatacenter maps data center IDs to a + pending provisioning request made during reconciliation. type: object ready: description: Ready indicates that the cluster is ready. From b3eab843da0e5f1260076a95102b960aacd52ed4 Mon Sep 17 00:00:00 2001 From: Matthias Bastian Date: Wed, 29 May 2024 17:34:55 +0200 Subject: [PATCH 12/17] Fix integration tests --- api/v1alpha1/ionoscloudcluster_types_test.go | 4 +++- 1 file changed, 3 insertions(+), 1 deletion(-) diff --git a/api/v1alpha1/ionoscloudcluster_types_test.go b/api/v1alpha1/ionoscloudcluster_types_test.go index 4ddcdd0e..447a6407 100644 --- a/api/v1alpha1/ionoscloudcluster_types_test.go +++ b/api/v1alpha1/ionoscloudcluster_types_test.go @@ -148,7 +148,9 @@ var _ = Describe("IonosCloudCluster", func() { RequestPath: "/path/to/resource", State: "QUEUED", } - fetched.Status.CurrentRequestByDatacenter["123"] = wantProvisionRequest + fetched.Status.CurrentRequestByDatacenter = map[string]ProvisioningRequest{ + "123": wantProvisionRequest, + } conditions.MarkTrue(fetched, clusterv1.ReadyCondition) By("updating the cluster status") From 61a5eea53a2953fa64c9224053e25ac6a24adbf3 Mon Sep 17 00:00:00 2001 From: Matthias Bastian Date: Thu, 30 May 2024 14:04:49 +0200 Subject: [PATCH 13/17] Add missing locker in cluster scope creations (If we only had unit tests for the controllers...) Locking `CurrentRequestByDatacenter` in the cluster scope is better than doing in the machine scope. I had that first, but the code looked strange and unrelated when suddenly patching the cluster after doing things in the machine scope. That's why the cluster scope contains a locker now. However, that scope is used from the machine controller. It cannot (and should not) access the cluster controller, so when building the cluster scope, the machine controller needs to pass its own locker. At the same time, the cluster controller also creates a cluster scope and is now required to pass a locker, even though it doesn't actually need one for its own work. 
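In practice each reconciler now owns one *locker.Locker for its whole lifetime and hands it to every cluster scope it builds; condensed from the diffs below, with field names as in scope.ClusterParams:

	// r.locker is created once in the reconciler constructor via locker.New()
	clusterScope, err := scope.NewCluster(scope.ClusterParams{
		Client:       r.Client,
		Cluster:      cluster,
		IonosCluster: ionosCloudCluster,
		Locker:       r.locker,
	})
	if err != nil {
		return nil, fmt.Errorf("failed to create cluster scope: %w", err)
	}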
An alternative could be having a controller package-wide locker (global variable) --- internal/controller/ionoscloudcluster_controller.go | 1 + internal/controller/ionoscloudmachine_controller.go | 1 + 2 files changed, 2 insertions(+) diff --git a/internal/controller/ionoscloudcluster_controller.go b/internal/controller/ionoscloudcluster_controller.go index 50dc3bd2..4332d854 100644 --- a/internal/controller/ionoscloudcluster_controller.go +++ b/internal/controller/ionoscloudcluster_controller.go @@ -108,6 +108,7 @@ func (r *ionosCloudClusterReconciler) Reconcile( Client: r.Client, Cluster: cluster, IonosCluster: ionosCloudCluster, + Locker: r.locker, }) if err != nil { return ctrl.Result{}, fmt.Errorf("unable to create scope %w", err) diff --git a/internal/controller/ionoscloudmachine_controller.go b/internal/controller/ionoscloudmachine_controller.go index bba04dbe..e48fce6b 100644 --- a/internal/controller/ionoscloudmachine_controller.go +++ b/internal/controller/ionoscloudmachine_controller.go @@ -366,6 +366,7 @@ func (r *ionosCloudMachineReconciler) getClusterScope( Client: r.Client, Cluster: cluster, IonosCluster: ionosCloudCluster, + Locker: r.locker, }) if err != nil { return nil, fmt.Errorf("failed to create cluster scope: %w", err) From ecc1dc46af1d472ccbf97a18421e6d59e561f5bb Mon Sep 17 00:00:00 2001 From: Matthias Bastian Date: Thu, 30 May 2024 14:15:04 +0200 Subject: [PATCH 14/17] Make reconciler types public again Same for the `SetupWithManager` methods. I'm not convinced about this, but it's in-line with other providers. --- cmd/main.go | 4 +-- .../ionoscloudcluster_controller.go | 29 +++++++------------ .../ionoscloudmachine_controller.go | 29 ++++++++----------- 3 files changed, 24 insertions(+), 38 deletions(-) diff --git a/cmd/main.go b/cmd/main.go index 623cedeb..3f1dcf51 100644 --- a/cmd/main.go +++ b/cmd/main.go @@ -90,7 +90,7 @@ func main() { ctx := ctrl.SetupSignalHandler() - if err = iccontroller.RegisterIonosCloudClusterReconciler( + if err = iccontroller.NewIonosCloudClusterReconciler(mgr).SetupWithManager( ctx, mgr, controller.Options{MaxConcurrentReconciles: icClusterConcurrency}, @@ -98,7 +98,7 @@ func main() { setupLog.Error(err, "unable to create controller", "controller", "IonosCloudCluster") os.Exit(1) } - if err = iccontroller.RegisterIonosCloudMachineReconciler( + if err = iccontroller.NewIonosCloudMachineReconciler(mgr).SetupWithManager( mgr, controller.Options{MaxConcurrentReconciles: icMachineConcurrency}, ); err != nil { diff --git a/internal/controller/ionoscloudcluster_controller.go b/internal/controller/ionoscloudcluster_controller.go index 4332d854..2aaf1008 100644 --- a/internal/controller/ionoscloudcluster_controller.go +++ b/internal/controller/ionoscloudcluster_controller.go @@ -45,24 +45,15 @@ import ( "github.com/ionos-cloud/cluster-api-provider-ionoscloud/scope" ) -// ionosCloudClusterReconciler reconciles a IonosCloudCluster object. -type ionosCloudClusterReconciler struct { +// IonosCloudClusterReconciler reconciles a IonosCloudCluster object. +type IonosCloudClusterReconciler struct { client.Client scheme *runtime.Scheme locker *locker.Locker } -// RegisterIonosCloudClusterReconciler creates an ionosCloudClusterReconciler and registers it with the manager. 
-func RegisterIonosCloudClusterReconciler( - ctx context.Context, - mgr ctrl.Manager, - options controller.Options, -) error { - return newIonosCloudClusterReconciler(mgr).setupWithManager(ctx, mgr, options) -} - -func newIonosCloudClusterReconciler(mgr ctrl.Manager) *ionosCloudClusterReconciler { - r := &ionosCloudClusterReconciler{ +func NewIonosCloudClusterReconciler(mgr ctrl.Manager) *IonosCloudClusterReconciler { + r := &IonosCloudClusterReconciler{ Client: mgr.GetClient(), scheme: mgr.GetScheme(), locker: locker.New(), @@ -83,7 +74,7 @@ func newIonosCloudClusterReconciler(mgr ctrl.Manager) *ionosCloudClusterReconcil // // For more details, check Reconcile and its Result here: // - https://pkg.go.dev/sigs.k8s.io/controller-runtime@v0.16.0/pkg/reconcile -func (r *ionosCloudClusterReconciler) Reconcile( +func (r *IonosCloudClusterReconciler) Reconcile( ctx context.Context, ionosCloudCluster *infrav1.IonosCloudCluster, ) (_ ctrl.Result, retErr error) { @@ -138,7 +129,7 @@ func (r *ionosCloudClusterReconciler) Reconcile( return r.reconcileNormal(ctx, clusterScope, cloudService) } -func (r *ionosCloudClusterReconciler) reconcileNormal( +func (r *IonosCloudClusterReconciler) reconcileNormal( ctx context.Context, clusterScope *scope.Cluster, cloudService *cloud.Service, @@ -175,7 +166,7 @@ func (r *ionosCloudClusterReconciler) reconcileNormal( return ctrl.Result{}, nil } -func (r *ionosCloudClusterReconciler) reconcileDelete( +func (r *IonosCloudClusterReconciler) reconcileDelete( ctx context.Context, clusterScope *scope.Cluster, cloudService *cloud.Service, ) (ctrl.Result, error) { log := ctrl.LoggerFrom(ctx) @@ -224,7 +215,7 @@ func (r *ionosCloudClusterReconciler) reconcileDelete( return ctrl.Result{}, nil } -func (*ionosCloudClusterReconciler) checkRequestStatus( +func (*IonosCloudClusterReconciler) checkRequestStatus( ctx context.Context, clusterScope *scope.Cluster, cloudService *cloud.Service, ) (requeue bool, retErr error) { log := ctrl.LoggerFrom(ctx) @@ -245,8 +236,8 @@ func (*ionosCloudClusterReconciler) checkRequestStatus( return requeue, retErr } -// setupWithManager sets up the controller with the Manager. -func (r *ionosCloudClusterReconciler) setupWithManager( +// SetupWithManager sets up the controller with the Manager. +func (r *IonosCloudClusterReconciler) SetupWithManager( ctx context.Context, mgr ctrl.Manager, options controller.Options, diff --git a/internal/controller/ionoscloudmachine_controller.go b/internal/controller/ionoscloudmachine_controller.go index e48fce6b..aa120d25 100644 --- a/internal/controller/ionoscloudmachine_controller.go +++ b/internal/controller/ionoscloudmachine_controller.go @@ -40,20 +40,15 @@ import ( "github.com/ionos-cloud/cluster-api-provider-ionoscloud/scope" ) -// ionosCloudMachineReconciler reconciles a IonosCloudMachine object. -type ionosCloudMachineReconciler struct { +// IonosCloudMachineReconciler reconciles a IonosCloudMachine object. +type IonosCloudMachineReconciler struct { client.Client scheme *runtime.Scheme locker *locker.Locker } -// RegisterIonosCloudMachineReconciler creates an ionosCloudMachineReconciler and registers it with the manager. 
-func RegisterIonosCloudMachineReconciler(mgr ctrl.Manager, options controller.Options) error { - return newIonosCloudMachineReconciler(mgr).setupWithManager(mgr, options) -} - -func newIonosCloudMachineReconciler(mgr ctrl.Manager) *ionosCloudMachineReconciler { - r := &ionosCloudMachineReconciler{ +func NewIonosCloudMachineReconciler(mgr ctrl.Manager) *IonosCloudMachineReconciler { + r := &IonosCloudMachineReconciler{ Client: mgr.GetClient(), scheme: mgr.GetScheme(), locker: locker.New(), @@ -69,7 +64,7 @@ func newIonosCloudMachineReconciler(mgr ctrl.Manager) *ionosCloudMachineReconcil //+kubebuilder:rbac:groups="",resources=secrets,verbs=get;list;watch;update //+kubebuilder:rbac:groups="",resources=events,verbs=get;list;watch;create;update;patch -func (r *ionosCloudMachineReconciler) Reconcile( +func (r *IonosCloudMachineReconciler) Reconcile( ctx context.Context, ionosCloudMachine *infrav1.IonosCloudMachine, ) (_ ctrl.Result, retErr error) { @@ -145,7 +140,7 @@ func (r *ionosCloudMachineReconciler) Reconcile( return r.reconcileNormal(ctx, cloudService, machineScope) } -func (r *ionosCloudMachineReconciler) reconcileNormal( +func (r *IonosCloudMachineReconciler) reconcileNormal( ctx context.Context, cloudService *cloud.Service, machineScope *scope.Machine, ) (ctrl.Result, error) { log := ctrl.LoggerFrom(ctx) @@ -202,7 +197,7 @@ func (r *ionosCloudMachineReconciler) reconcileNormal( return ctrl.Result{}, nil } -func (r *ionosCloudMachineReconciler) reconcileDelete( +func (r *IonosCloudMachineReconciler) reconcileDelete( ctx context.Context, machineScope *scope.Machine, cloudService *cloud.Service, ) (ctrl.Result, error) { log := ctrl.LoggerFrom(ctx) @@ -253,7 +248,7 @@ func (r *ionosCloudMachineReconciler) reconcileDelete( // - Queued, Running => Requeue the current request // - Failed => Log the error and continue also apply the same logic as in Done. // - Done => Clear request from the status and continue reconciliation. -func (*ionosCloudMachineReconciler) checkRequestStates( +func (*IonosCloudMachineReconciler) checkRequestStates( ctx context.Context, machineScope *scope.Machine, cloudService *cloud.Service, @@ -296,7 +291,7 @@ func (*ionosCloudMachineReconciler) checkRequestStates( return requeue, retErr } -func (*ionosCloudMachineReconciler) isInfrastructureReady(ctx context.Context, ms *scope.Machine) bool { +func (*IonosCloudMachineReconciler) isInfrastructureReady(ctx context.Context, ms *scope.Machine) bool { log := ctrl.LoggerFrom(ctx) // Make sure the infrastructure is ready. if !ms.ClusterScope.Cluster.Status.InfrastructureReady { @@ -326,8 +321,8 @@ func (*ionosCloudMachineReconciler) isInfrastructureReady(ctx context.Context, m return true } -// setupWithManager sets up the controller with the Manager. -func (r *ionosCloudMachineReconciler) setupWithManager(mgr ctrl.Manager, options controller.Options) error { +// SetupWithManager sets up the controller with the Manager. +func (r *IonosCloudMachineReconciler) SetupWithManager(mgr ctrl.Manager, options controller.Options) error { return ctrl.NewControllerManagedBy(mgr). WithOptions(options). For(&infrav1.IonosCloudMachine{}). 
@@ -338,7 +333,7 @@ func (r *ionosCloudMachineReconciler) setupWithManager(mgr ctrl.Manager, options Complete(reconcile.AsReconciler[*infrav1.IonosCloudMachine](r.Client, r)) } -func (r *ionosCloudMachineReconciler) getClusterScope( +func (r *IonosCloudMachineReconciler) getClusterScope( ctx context.Context, cluster *clusterv1.Cluster, ionosCloudMachine *infrav1.IonosCloudMachine, ) (*scope.Cluster, error) { var clusterScope *scope.Cluster From 6ee561e7442390593da2c8d01c5e863d2614c9c7 Mon Sep 17 00:00:00 2001 From: Matthias Bastian Date: Thu, 30 May 2024 14:21:41 +0200 Subject: [PATCH 15/17] Add go docs --- internal/controller/ionoscloudcluster_controller.go | 1 + internal/controller/ionoscloudmachine_controller.go | 1 + 2 files changed, 2 insertions(+) diff --git a/internal/controller/ionoscloudcluster_controller.go b/internal/controller/ionoscloudcluster_controller.go index 2aaf1008..391032ee 100644 --- a/internal/controller/ionoscloudcluster_controller.go +++ b/internal/controller/ionoscloudcluster_controller.go @@ -52,6 +52,7 @@ type IonosCloudClusterReconciler struct { locker *locker.Locker } +// NewIonosCloudClusterReconciler creates a new IonosCloudClusterReconciler. func NewIonosCloudClusterReconciler(mgr ctrl.Manager) *IonosCloudClusterReconciler { r := &IonosCloudClusterReconciler{ Client: mgr.GetClient(), diff --git a/internal/controller/ionoscloudmachine_controller.go b/internal/controller/ionoscloudmachine_controller.go index aa120d25..9a85479c 100644 --- a/internal/controller/ionoscloudmachine_controller.go +++ b/internal/controller/ionoscloudmachine_controller.go @@ -47,6 +47,7 @@ type IonosCloudMachineReconciler struct { locker *locker.Locker } +// NewIonosCloudMachineReconciler creates a new IonosCloudMachineReconciler. func NewIonosCloudMachineReconciler(mgr ctrl.Manager) *IonosCloudMachineReconciler { r := &IonosCloudMachineReconciler{ Client: mgr.GetClient(), From 9fea73c459d634b17673f4b7298774944b54dc7e Mon Sep 17 00:00:00 2001 From: Matthias Bastian Date: Tue, 4 Jun 2024 22:28:31 +0200 Subject: [PATCH 16/17] Test `CurrentRequestByDatacenter` accessors --- scope/cluster_test.go | 51 +++++++++++++++++++++++++++++++++++++++++++ 1 file changed, 51 insertions(+) diff --git a/scope/cluster_test.go b/scope/cluster_test.go index 8e32def1..0930ea5a 100644 --- a/scope/cluster_test.go +++ b/scope/cluster_test.go @@ -20,8 +20,11 @@ import ( "context" "net" "net/netip" + "strconv" + "sync" "testing" + "github.com/stretchr/testify/assert" "github.com/stretchr/testify/require" metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" "k8s.io/apimachinery/pkg/runtime" @@ -347,3 +350,51 @@ func buildMachineWithLabel(name string, labels map[string]string) *infrav1.Ionos }, } } + +func TestCurrentRequestByDatacenterAccessors(t *testing.T) { + cluster := &Cluster{ + Cluster: &clusterv1.Cluster{ + ObjectMeta: metav1.ObjectMeta{ + UID: "uid", + }, + }, + IonosCluster: &infrav1.IonosCloudCluster{}, + Locker: locker.New(), + } + + // If there is a concurrency issue, it will very likely become visible here. 
+ var wg sync.WaitGroup + for i := 0; i <= 10_000; i++ { + wg.Add(1) + go func(t *testing.T, id string) { + defer wg.Done() + + req, exists := cluster.GetCurrentRequestByDatacenter(id) + assert.False(t, exists) + assert.Zero(t, req) + + cluster.SetCurrentRequestByDatacenter(id, "method", "status", "requestPath") + + req, exists = cluster.GetCurrentRequestByDatacenter(id) + assert.True(t, exists) + assert.Equal(t, "method", req.Method) + assert.Equal(t, "status", req.State) + assert.Equal(t, "requestPath", req.RequestPath) + + cluster.DeleteCurrentRequestByDatacenter(id) + + req, exists = cluster.GetCurrentRequestByDatacenter(id) + assert.False(t, exists) + assert.Zero(t, req) + }(t, strconv.Itoa(i)) + } + + wg.Wait() + + lockKey := cluster.currentRequestByDatacenterLockKey() + require.Equal(t, "uid/currentRequestByDatacenter", lockKey) + + _ = cluster.Locker.Lock(context.Background(), lockKey) + require.Empty(t, cluster.IonosCluster.Status.CurrentRequestByDatacenter) + cluster.Locker.Unlock(lockKey) +} From b2f1d5b7977b2d04383a273c3947513a0f25c12b Mon Sep 17 00:00:00 2001 From: Matthias Bastian Date: Wed, 5 Jun 2024 08:50:54 +0200 Subject: [PATCH 17/17] Test additional lock key generators --- internal/service/cloud/ipblock_test.go | 4 ++++ internal/service/cloud/network_test.go | 4 ++++ internal/service/cloud/suite_test.go | 4 +++- 3 files changed, 11 insertions(+), 1 deletion(-) diff --git a/internal/service/cloud/ipblock_test.go b/internal/service/cloud/ipblock_test.go index 2a81525d..62070fe4 100644 --- a/internal/service/cloud/ipblock_test.go +++ b/internal/service/cloud/ipblock_test.go @@ -487,6 +487,10 @@ func (s *ipBlockTestSuite) buildRequest(status string, method, id string) sdk.Re return s.buildIPBlockRequestWithName(s.service.controlPlaneEndpointIPBlockName(s.clusterScope), status, method, id) } +func (s *ipBlockTestSuite) TestFailoverIPBlockLockKey() { + s.Equal("fo-ipb/default/test-md", s.service.failoverIPBlockLockKey(s.machineScope)) +} + // exampleIPBlock returns a new sdk.IpBlock instance for testing. The IPs need to be set. 
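The asserted key pins the format to the machine deployment's namespace and name, which lines up with the MachineDeploymentNameLabel the test suite now sets in suite_test.go below. As a purely hypothetical sketch of a helper with that shape, not the actual implementation in ipblock.go, which this excerpt does not show:

	// hypothetical sketch only: yields "fo-ipb/default/test-md" for the suite's fixtures
	func failoverIPBlockLockKeySketch(namespace, machineDeploymentName string) string {
		return "fo-ipb/" + namespace + "/" + machineDeploymentName
	}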
func exampleIPBlock() *sdk.IpBlock { return exampleIPBlockWithName(exampleIPBlockName) diff --git a/internal/service/cloud/network_test.go b/internal/service/cloud/network_test.go index 2fbe8af4..6e40afbd 100644 --- a/internal/service/cloud/network_test.go +++ b/internal/service/cloud/network_test.go @@ -50,6 +50,10 @@ func (s *lanSuite) TestNetworkLANName() { s.Equal("lan-default-test-cluster", s.service.lanName(s.clusterScope.Cluster)) } +func (s *lanSuite) TestLANLockKey() { + s.Equal("cluster/uid/dc/ccf27092-34e8-499e-a2f5-2bdee9d34a12/lan", s.service.lanLockKey(s.machineScope)) +} + func (s *lanSuite) TestLANURL() { s.Equal("datacenters/"+s.machineScope.DatacenterID()+"/lans/1", s.service.lanURL(s.machineScope.DatacenterID(), "1")) diff --git a/internal/service/cloud/suite_test.go b/internal/service/cloud/suite_test.go index 5e916b3f..908b6cd5 100644 --- a/internal/service/cloud/suite_test.go +++ b/internal/service/cloud/suite_test.go @@ -108,6 +108,7 @@ func (s *ServiceTestSuite) SetupTest() { ObjectMeta: metav1.ObjectMeta{ Namespace: metav1.NamespaceDefault, Name: "test-cluster", + UID: "uid", }, Spec: clusterv1.ClusterSpec{}, } @@ -143,7 +144,8 @@ func (s *ServiceTestSuite) SetupTest() { Namespace: metav1.NamespaceDefault, Name: "test-machine", Labels: map[string]string{ - clusterv1.ClusterNameLabel: s.capiCluster.Name, + clusterv1.ClusterNameLabel: s.capiCluster.Name, + clusterv1.MachineDeploymentNameLabel: "test-md", }, }, Spec: infrav1.IonosCloudMachineSpec{