Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

✨ Make MaxConcurrentReconciles configurable #138

Merged
merged 17 commits into from
Jun 5, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
18 changes: 0 additions & 18 deletions api/v1alpha1/ionoscloudcluster_types.go
Original file line number Diff line number Diff line change
Expand Up @@ -117,24 +117,6 @@ func (i *IonosCloudCluster) SetConditions(conditions clusterv1.Conditions) {
i.Status.Conditions = conditions
}

// SetCurrentRequestByDatacenter records the current provisioning request for the given data center.
// The per-data-center map in the status is created on first use, so callers do not
// have to initialize it themselves. An existing entry for the data center is overwritten.
func (i *IonosCloudCluster) SetCurrentRequestByDatacenter(datacenterID, method, status, requestPath string) {
	request := ProvisioningRequest{
		Method:      method,
		RequestPath: requestPath,
		State:       status,
	}

	// Writing to a nil map panics, so create the map together with its first entry.
	if i.Status.CurrentRequestByDatacenter == nil {
		i.Status.CurrentRequestByDatacenter = map[string]ProvisioningRequest{datacenterID: request}
		return
	}
	i.Status.CurrentRequestByDatacenter[datacenterID] = request
}

// DeleteCurrentRequestByDatacenter removes the provisioning request stored for the given data center.
// Deleting from a nil map or deleting a missing key is a no-op in Go, so this is safe
// to call even when no request was ever recorded.
func (i *IonosCloudCluster) DeleteCurrentRequestByDatacenter(datacenterID string) {
	requests := i.Status.CurrentRequestByDatacenter
	delete(requests, datacenterID)
}

// SetCurrentClusterRequest sets the current provisioning request for the cluster.
func (i *IonosCloudCluster) SetCurrentClusterRequest(method, status, requestPath string) {
i.Status.CurrentClusterRequest = &ProvisioningRequest{
Expand Down
7 changes: 4 additions & 3 deletions api/v1alpha1/ionoscloudcluster_types_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -148,8 +148,9 @@ var _ = Describe("IonosCloudCluster", func() {
RequestPath: "/path/to/resource",
State: "QUEUED",
}
fetched.SetCurrentRequestByDatacenter("123",
wantProvisionRequest.Method, wantProvisionRequest.State, wantProvisionRequest.RequestPath)
fetched.Status.CurrentRequestByDatacenter = map[string]ProvisioningRequest{
"123": wantProvisionRequest,
}
conditions.MarkTrue(fetched, clusterv1.ReadyCondition)

By("updating the cluster status")
Expand All @@ -163,7 +164,7 @@ var _ = Describe("IonosCloudCluster", func() {
Expect(conditions.IsTrue(fetched, clusterv1.ReadyCondition)).To(BeTrue())

By("Removing the entry from the status again")
fetched.DeleteCurrentRequestByDatacenter("123")
delete(fetched.Status.CurrentRequestByDatacenter, "123")
Expect(k8sClient.Status().Update(context.Background(), fetched)).To(Succeed())

Expect(k8sClient.Get(context.Background(), key, fetched)).To(Succeed())
Expand Down
27 changes: 18 additions & 9 deletions cmd/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -30,10 +30,11 @@ import (
clusterv1 "sigs.k8s.io/cluster-api/api/v1beta1"
"sigs.k8s.io/cluster-api/util/flags"
ctrl "sigs.k8s.io/controller-runtime"
"sigs.k8s.io/controller-runtime/pkg/controller"
"sigs.k8s.io/controller-runtime/pkg/healthz"

infrav1 "github.com/ionos-cloud/cluster-api-provider-ionoscloud/api/v1alpha1"
"github.com/ionos-cloud/cluster-api-provider-ionoscloud/internal/controller"
iccontroller "github.com/ionos-cloud/cluster-api-provider-ionoscloud/internal/controller"
)

var (
Expand All @@ -42,6 +43,9 @@ var (
healthProbeAddr string
enableLeaderElection bool
diagnosticOptions = flags.DiagnosticsOptions{}

icClusterConcurrency int
icMachineConcurrency int
)

func init() {
Expand Down Expand Up @@ -86,17 +90,18 @@ func main() {

ctx := ctrl.SetupSignalHandler()

if err = (&controller.IonosCloudClusterReconciler{
Client: mgr.GetClient(),
Scheme: mgr.GetScheme(),
}).SetupWithManager(ctx, mgr); err != nil {
if err = iccontroller.NewIonosCloudClusterReconciler(mgr).SetupWithManager(
ctx,
mgr,
controller.Options{MaxConcurrentReconciles: icClusterConcurrency},
); err != nil {
setupLog.Error(err, "unable to create controller", "controller", "IonosCloudCluster")
os.Exit(1)
}
if err = (&controller.IonosCloudMachineReconciler{
Client: mgr.GetClient(),
Scheme: mgr.GetScheme(),
}).SetupWithManager(mgr); err != nil {
if err = iccontroller.NewIonosCloudMachineReconciler(mgr).SetupWithManager(
mgr,
controller.Options{MaxConcurrentReconciles: icMachineConcurrency},
); err != nil {
setupLog.Error(err, "unable to create controller", "controller", "IonosCloudMachine")
os.Exit(1)
}
Expand Down Expand Up @@ -128,4 +133,8 @@ func initFlags() {
pflag.BoolVar(&enableLeaderElection, "leader-elect", false,
"Enable leader election for controller manager. "+
"Enabling this will ensure there is only one active controller manager.")
pflag.IntVar(&icClusterConcurrency, "ionoscloudcluster-concurrency", 1,
"Number of IonosCloudClusters to process simultaneously")
pflag.IntVar(&icMachineConcurrency, "ionoscloudmachine-concurrency", 1,
"Number of IonosCloudMachines to process simultaneously")
}
23 changes: 21 additions & 2 deletions internal/controller/ionoscloudcluster_controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -34,19 +34,32 @@ import (
ctrl "sigs.k8s.io/controller-runtime"
"sigs.k8s.io/controller-runtime/pkg/builder"
"sigs.k8s.io/controller-runtime/pkg/client"
"sigs.k8s.io/controller-runtime/pkg/controller"
"sigs.k8s.io/controller-runtime/pkg/controller/controllerutil"
"sigs.k8s.io/controller-runtime/pkg/handler"
"sigs.k8s.io/controller-runtime/pkg/reconcile"

infrav1 "github.com/ionos-cloud/cluster-api-provider-ionoscloud/api/v1alpha1"
"github.com/ionos-cloud/cluster-api-provider-ionoscloud/internal/service/cloud"
"github.com/ionos-cloud/cluster-api-provider-ionoscloud/internal/util/locker"
"github.com/ionos-cloud/cluster-api-provider-ionoscloud/scope"
)

// IonosCloudClusterReconciler reconciles an IonosCloudCluster object.
type IonosCloudClusterReconciler struct {
client.Client
Scheme *runtime.Scheme
scheme *runtime.Scheme
locker *locker.Locker
}

// NewIonosCloudClusterReconciler builds an IonosCloudClusterReconciler from the given manager.
// The client and scheme are taken from mgr; the reconciler gets its own locker
// created via locker.New.
func NewIonosCloudClusterReconciler(mgr ctrl.Manager) *IonosCloudClusterReconciler {
	return &IonosCloudClusterReconciler{
		Client: mgr.GetClient(),
		scheme: mgr.GetScheme(),
		locker: locker.New(),
	}
}

//+kubebuilder:rbac:groups=infrastructure.cluster.x-k8s.io,resources=ionoscloudclusters,verbs=get;list;watch;create;update;patch;delete
Expand Down Expand Up @@ -87,6 +100,7 @@ func (r *IonosCloudClusterReconciler) Reconcile(
Client: r.Client,
Cluster: cluster,
IonosCluster: ionosCloudCluster,
Locker: r.locker,
})
if err != nil {
return ctrl.Result{}, fmt.Errorf("unable to create scope %w", err)
Expand Down Expand Up @@ -224,8 +238,13 @@ func (*IonosCloudClusterReconciler) checkRequestStatus(
}

// SetupWithManager sets up the controller with the Manager.
func (r *IonosCloudClusterReconciler) SetupWithManager(ctx context.Context, mgr ctrl.Manager) error {
func (r *IonosCloudClusterReconciler) SetupWithManager(
ctx context.Context,
mgr ctrl.Manager,
options controller.Options,
) error {
return ctrl.NewControllerManagedBy(mgr).
WithOptions(options).
For(&infrav1.IonosCloudCluster{}).
WithEventFilter(predicates.ResourceNotPaused(ctrl.LoggerFrom(ctx))).
Watches(&clusterv1.Cluster{},
Expand Down
32 changes: 21 additions & 11 deletions internal/controller/ionoscloudmachine_controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -29,19 +29,32 @@ import (
"sigs.k8s.io/cluster-api/util/conditions"
ctrl "sigs.k8s.io/controller-runtime"
"sigs.k8s.io/controller-runtime/pkg/client"
"sigs.k8s.io/controller-runtime/pkg/controller"
"sigs.k8s.io/controller-runtime/pkg/controller/controllerutil"
"sigs.k8s.io/controller-runtime/pkg/handler"
"sigs.k8s.io/controller-runtime/pkg/reconcile"

infrav1 "github.com/ionos-cloud/cluster-api-provider-ionoscloud/api/v1alpha1"
"github.com/ionos-cloud/cluster-api-provider-ionoscloud/internal/service/cloud"
"github.com/ionos-cloud/cluster-api-provider-ionoscloud/internal/util/locker"
"github.com/ionos-cloud/cluster-api-provider-ionoscloud/scope"
)

// IonosCloudMachineReconciler reconciles an IonosCloudMachine object.
type IonosCloudMachineReconciler struct {
client.Client
Scheme *runtime.Scheme
scheme *runtime.Scheme
locker *locker.Locker
}

// NewIonosCloudMachineReconciler builds an IonosCloudMachineReconciler from the given manager.
// The client and scheme are taken from mgr; the reconciler gets its own locker
// created via locker.New.
func NewIonosCloudMachineReconciler(mgr ctrl.Manager) *IonosCloudMachineReconciler {
	return &IonosCloudMachineReconciler{
		Client: mgr.GetClient(),
		scheme: mgr.GetScheme(),
		locker: locker.New(),
	}
}

//+kubebuilder:rbac:groups=infrastructure.cluster.x-k8s.io,resources=ionoscloudmachines,verbs=get;list;watch;create;update;patch;delete
Expand Down Expand Up @@ -95,6 +108,7 @@ func (r *IonosCloudMachineReconciler) Reconcile(
Machine: machine,
ClusterScope: clusterScope,
IonosMachine: ionosCloudMachine,
Locker: r.locker,
})
if err != nil {
return ctrl.Result{}, fmt.Errorf("failed to create scope: %w", err)
Expand Down Expand Up @@ -164,7 +178,6 @@ func (r *IonosCloudMachineReconciler) reconcileNormal(
return ctrl.Result{RequeueAfter: defaultReconcileDuration}, nil
}

// TODO(piepmatz): This is not thread-safe, but needs to be. Add locking.
reconcileSequence := []serviceReconcileStep[scope.Machine]{
{"ReconcileLAN", cloudService.ReconcileLAN},
{"ReconcileServer", cloudService.ReconcileServer},
Expand All @@ -190,10 +203,6 @@ func (r *IonosCloudMachineReconciler) reconcileDelete(
) (ctrl.Result, error) {
log := ctrl.LoggerFrom(ctx)

// TODO(piepmatz): This is not thread-safe, but needs to be. Add locking.
// Moreover, should only be attempted if it's the last machine using that LAN. We should check that our machines
// at least, but need to accept that users added their own infrastructure into our LAN (in that case a LAN deletion
// attempt will be denied with HTTP 422).
requeue, err := r.checkRequestStates(ctx, machineScope, cloudService)
if err != nil {
// In case the request state cannot be determined, we want to continue with the
Expand Down Expand Up @@ -246,17 +255,16 @@ func (*IonosCloudMachineReconciler) checkRequestStates(
cloudService *cloud.Service,
) (requeue bool, retErr error) {
log := ctrl.LoggerFrom(ctx)
// check cluster wide request
ionosCluster := machineScope.ClusterScope.IonosCluster
if req, exists := ionosCluster.Status.CurrentRequestByDatacenter[machineScope.DatacenterID()]; exists {
// check cluster-wide request
if req, exists := machineScope.ClusterScope.GetCurrentRequestByDatacenter(machineScope.DatacenterID()); exists {
status, message, err := cloudService.GetRequestStatus(ctx, req.RequestPath)
if err != nil {
retErr = fmt.Errorf("could not get request status: %w", err)
} else {
requeue, retErr = withStatus(status, message, &log,
func() error {
// remove the request from the status and patch the cluster
ionosCluster.DeleteCurrentRequestByDatacenter(machineScope.DatacenterID())
machineScope.ClusterScope.DeleteCurrentRequestByDatacenter(machineScope.DatacenterID())
return machineScope.ClusterScope.PatchObject()
},
)
Expand Down Expand Up @@ -315,8 +323,9 @@ func (*IonosCloudMachineReconciler) isInfrastructureReady(ctx context.Context, m
}

// SetupWithManager sets up the controller with the Manager.
func (r *IonosCloudMachineReconciler) SetupWithManager(mgr ctrl.Manager) error {
func (r *IonosCloudMachineReconciler) SetupWithManager(mgr ctrl.Manager, options controller.Options) error {
return ctrl.NewControllerManagedBy(mgr).
WithOptions(options).
For(&infrav1.IonosCloudMachine{}).
Watches(
&clusterv1.Machine{},
Expand Down Expand Up @@ -353,6 +362,7 @@ func (r *IonosCloudMachineReconciler) getClusterScope(
Client: r.Client,
Cluster: cluster,
IonosCluster: ionosCloudCluster,
Locker: r.locker,
})
if err != nil {
return nil, fmt.Errorf("failed to create cluster scope: %w", err)
Expand Down
16 changes: 16 additions & 0 deletions internal/service/cloud/ipblock.go
Original file line number Diff line number Diff line change
Expand Up @@ -153,6 +153,12 @@ func (s *Service) ReconcileFailoverIPBlockDeletion(ctx context.Context, ms *scop
return false, nil
}

lockKey := s.failoverIPBlockLockKey(ms)
if err := ms.Locker.Lock(ctx, lockKey); err != nil {
return false, err
}
defer ms.Locker.Unlock(lockKey)

// Check if the IP block is currently in creation. We need to wait for it to be finished
// before we can trigger the deletion.
ipBlock, request, err := scopedFindResource(
Expand Down Expand Up @@ -457,6 +463,16 @@ func (*Service) failoverIPBlockName(ms *scope.Machine) string {
)
}

// failoverIPBlockLockKey returns the lock key guarding write operations on a failover IP block.
//
// Failover IPs are shared by all machines of one failover group, so concurrent machine
// reconciliations could otherwise reserve the same IP block twice. A failover group maps to the
// MachineDeployment its machines belong to, hence the key is built from the machine's namespace
// and its MachineDeployment name label. This is finer-grained than locking on the datacenter ID
// and lets distinct failover groups in the same datacenter be processed in parallel.
//
// The resulting key has the form "fo-ipb/<namespace>/<MachineDeployment name>".
func (*Service) failoverIPBlockLockKey(ms *scope.Machine) string {
	const prefix = "fo-ipb/"

	machine := ms.IonosMachine
	deployment := machine.Labels[clusterv1.MachineDeploymentNameLabel]

	return prefix + machine.Namespace + "/" + deployment
}

func ignoreErrUserSetIPNotFound(err error) error {
if errors.Is(err, errUserSetIPNotFound) {
return nil
Expand Down
4 changes: 4 additions & 0 deletions internal/service/cloud/ipblock_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -487,6 +487,10 @@ func (s *ipBlockTestSuite) buildRequest(status string, method, id string) sdk.Re
return s.buildIPBlockRequestWithName(s.service.controlPlaneEndpointIPBlockName(s.clusterScope), status, method, id)
}

// TestFailoverIPBlockLockKey pins the lock key that failoverIPBlockLockKey returns
// for the suite's machine scope.
func (s *ipBlockTestSuite) TestFailoverIPBlockLockKey() {
	got := s.service.failoverIPBlockLockKey(s.machineScope)
	s.Equal("fo-ipb/default/test-md", got)
}

// exampleIPBlock returns a new sdk.IpBlock instance for testing. The IPs need to be set.
func exampleIPBlock() *sdk.IpBlock {
return exampleIPBlockWithName(exampleIPBlockName)
Expand Down
Loading
Loading