From 0fcdb3b8e397c8f27e607a814b225cc28faf81ff Mon Sep 17 00:00:00 2001 From: Alex Necula Date: Tue, 21 Mar 2023 14:31:13 +0200 Subject: [PATCH 1/7] Implement disk removal feature --- api/v1alpha1/common_types.go | 2 + api/v1alpha1/cruisecontroloperation_types.go | 10 +- api/v1alpha1/zz_generated.deepcopy.go | 28 +- api/v1beta1/common_types.go | 54 +- api/v1beta1/zz_generated.deepcopy.go | 28 +- .../cruisecontroloperation_controller.go | 72 ++- .../cruisecontroloperation_controller_test.go | 15 + controllers/cruisecontroltask_controller.go | 110 ++++- .../cruisecontroltask_controller_test.go | 116 +++++ controllers/cruisecontroltask_types.go | 26 +- controllers/kafkacluster_controller.go | 1 + .../cruisecontroloperation_controller_test.go | 89 +++- controllers/tests/mocks/scale.go | 15 + main.go | 9 - pkg/k8sutil/status.go | 53 +- pkg/resources/kafka/configmap.go | 31 +- pkg/resources/kafka/kafka.go | 79 ++- pkg/resources/kafka/kafka_test.go | 282 ++++++++++- pkg/scale/scale.go | 106 +++- pkg/scale/types.go | 1 + pkg/webhooks/kafkacluster_validator.go | 83 ---- pkg/webhooks/kafkacluster_validator_test.go | 467 ------------------ 22 files changed, 979 insertions(+), 698 deletions(-) diff --git a/api/v1alpha1/common_types.go b/api/v1alpha1/common_types.go index 0bcff342a..0a948c9a8 100644 --- a/api/v1alpha1/common_types.go +++ b/api/v1alpha1/common_types.go @@ -44,6 +44,8 @@ const ( OperationAddBroker CruiseControlTaskOperation = "add_broker" // OperationRemoveBroker means a Cruise Control remove_broker operation OperationRemoveBroker CruiseControlTaskOperation = "remove_broker" + // OperationRemoveDisks means a Cruise Control remove_disks operation + OperationRemoveDisks CruiseControlTaskOperation = "remove_disks" // OperationRebalance means a Cruise Control rebalance operation OperationRebalance CruiseControlTaskOperation = "rebalance" // KafkaAccessTypeRead states that a user wants consume access to a topic diff --git a/api/v1alpha1/cruisecontroloperation_types.go 
b/api/v1alpha1/cruisecontroloperation_types.go index 92034d528..4ffeff73c 100644 --- a/api/v1alpha1/cruisecontroloperation_types.go +++ b/api/v1alpha1/cruisecontroloperation_types.go @@ -29,6 +29,9 @@ const ( ErrorPolicyRetry ErrorPolicyType = "retry" // DefaultRetryBackOffDurationSec defines the time between retries of the failed tasks. DefaultRetryBackOffDurationSec = 30 + // PauseLabel defines the label key for pausing Cruise Control operations. + PauseLabel = "pause" + True = "true" ) //+kubebuilder:object:root=true @@ -184,7 +187,7 @@ func (o *CruiseControlOperation) IsDone() bool { } func (o *CruiseControlOperation) IsPaused() bool { - return o.GetLabels()["pause"] == "true" + return o.GetLabels()[PauseLabel] == True } func (o *CruiseControlOperation) IsErrorPolicyIgnore() bool { @@ -221,5 +224,8 @@ func (o *CruiseControlOperation) IsCurrentTaskFinished() bool { func (o *CruiseControlOperation) IsCurrentTaskOperationValid() bool { return o.CurrentTaskOperation() == OperationAddBroker || - o.CurrentTaskOperation() == OperationRebalance || o.CurrentTaskOperation() == OperationRemoveBroker || o.CurrentTaskOperation() == OperationStopExecution + o.CurrentTaskOperation() == OperationRebalance || + o.CurrentTaskOperation() == OperationRemoveBroker || + o.CurrentTaskOperation() == OperationStopExecution || + o.CurrentTaskOperation() == OperationRemoveDisks } diff --git a/api/v1alpha1/zz_generated.deepcopy.go b/api/v1alpha1/zz_generated.deepcopy.go index b7e5de78b..b77b8150f 100644 --- a/api/v1alpha1/zz_generated.deepcopy.go +++ b/api/v1alpha1/zz_generated.deepcopy.go @@ -1,19 +1,21 @@ //go:build !ignore_autogenerated // +build !ignore_autogenerated -// Copyright © 2019 Cisco Systems, Inc. and/or its affiliates -// -// Licensed under the Apache License, Version 2.0 (the "License"); -// you may not use this file except in compliance with the License. 
-// You may obtain a copy of the License at -// -// http://www.apache.org/licenses/LICENSE-2.0 -// -// Unless required by applicable law or agreed to in writing, software -// distributed under the License is distributed on an "AS IS" BASIS, -// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -// See the License for the specific language governing permissions and -// limitations under the License. +/* +Copyright 2023 Cisco Systems, Inc. and/or its affiliates + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ // Code generated by controller-gen. DO NOT EDIT. 
diff --git a/api/v1beta1/common_types.go b/api/v1beta1/common_types.go index 3f226fe4b..c919dfd45 100644 --- a/api/v1beta1/common_types.go +++ b/api/v1beta1/common_types.go @@ -66,24 +66,39 @@ type PKIBackend string // CruiseControlVolumeState holds information about the state of volume rebalance type CruiseControlVolumeState string -// IsRunningState returns true if CruiseControlVolumeState indicates -// that the CC operation is scheduled and in-progress -func (s CruiseControlVolumeState) IsRunningState() bool { +// IsDiskRebalanceRunning returns true if CruiseControlVolumeState indicates +// that the CC rebalance disk operation is scheduled and in-progress +func (s CruiseControlVolumeState) IsDiskRebalanceRunning() bool { return s == GracefulDiskRebalanceRunning || s == GracefulDiskRebalanceCompletedWithError || s == GracefulDiskRebalancePaused || s == GracefulDiskRebalanceScheduled } -// IsRequiredState returns true if CruiseControlVolumeState is in GracefulDiskRebalanceRequired state +// IsDiskRemovalRunning returns true if CruiseControlVolumeState indicates +// that the CC remove disks operation is scheduled and in-progress +func (s CruiseControlVolumeState) IsDiskRemovalRunning() bool { + return s == GracefulDiskRemovalRunning || + s == GracefulDiskRemovalCompletedWithError || + s == GracefulDiskRemovalPaused || + s == GracefulDiskRemovalScheduled +} + +// IsRequiredState returns true if CruiseControlVolumeState is in GracefulDiskRebalanceRequired state or GracefulDiskRemovalRequired state func (s CruiseControlVolumeState) IsRequiredState() bool { - return s == GracefulDiskRebalanceRequired + return s == GracefulDiskRebalanceRequired || + s == GracefulDiskRemovalRequired } -// IsActive returns true if CruiseControlVolumeState is in active state +// IsDiskRebalance returns true if CruiseControlVolumeState is in disk rebalance state // the controller needs to take care of. 
-func (s CruiseControlVolumeState) IsActive() bool { - return s.IsRunningState() || s == GracefulDiskRebalanceRequired +func (s CruiseControlVolumeState) IsDiskRebalance() bool { + return s.IsDiskRebalanceRunning() || s == GracefulDiskRebalanceRequired +} + +// IsDiskRemoval returns true if CruiseControlVolumeState is in disk removal state +func (s CruiseControlVolumeState) IsDiskRemoval() bool { + return s.IsDiskRemovalRunning() || s == GracefulDiskRemovalRequired } // IsUpscale returns true if CruiseControlState in GracefulUpscale* state. @@ -138,11 +153,16 @@ func (r CruiseControlState) IsSucceeded() bool { r == GracefulUpscaleSucceeded } -// IsSucceeded returns true if CruiseControlVolumeState is succeeded -func (r CruiseControlVolumeState) IsSucceeded() bool { +// IsDiskRebalanceSucceeded returns true if CruiseControlVolumeState is disk rebalance succeeded +func (r CruiseControlVolumeState) IsDiskRebalanceSucceeded() bool { return r == GracefulDiskRebalanceSucceeded } +// IsDiskRemovalSucceeded returns true if CruiseControlVolumeState is disk removal succeeded +func (r CruiseControlVolumeState) IsDiskRemovalSucceeded() bool { + return r == GracefulDiskRemovalSucceeded +} + // IsSSL determines if the receiver is using SSL func (r SecurityProtocol) IsSSL() bool { return r.Equal(SecurityProtocolSaslSSL) || r.Equal(SecurityProtocolSSL) @@ -255,6 +275,20 @@ const ( // GracefulDownscalePaused states that the broker downscale task is completed with an error and it will not be retried, it is paused. 
In this case further downscale tasks can be executed GracefulDownscalePaused CruiseControlState = "GracefulDownscalePaused" + // Disk removal cruise control states + // GracefulDiskRemovalRequired states that the broker volume needs to be removed + GracefulDiskRemovalRequired CruiseControlVolumeState = "GracefulDiskRemovalRequired" + // GracefulDiskRemovalRunning states that for the broker volume a CC disk removal is in progress + GracefulDiskRemovalRunning CruiseControlVolumeState = "GracefulDiskRemovalRunning" + // GracefulDiskRemovalSucceeded states that for the broker volume the removal has succeeded + GracefulDiskRemovalSucceeded CruiseControlVolumeState = "GracefulDiskRemovalSucceeded" + // GracefulDiskRemovalScheduled states that the broker volume removal CCOperation is created and the task is waiting for execution + GracefulDiskRemovalScheduled CruiseControlVolumeState = "GracefulDiskRemovalScheduled" + // GracefulDiskRemovalCompletedWithError states that the broker volume removal task completed with an error + GracefulDiskRemovalCompletedWithError CruiseControlVolumeState = "GracefulDiskRemovalCompletedWithError" + // GracefulDiskRemovalPaused states that the broker volume removal task is completed with an error and it will not be retried, it is paused + GracefulDiskRemovalPaused CruiseControlVolumeState = "GracefulDiskRemovalPaused" + // Disk rebalance cruise control states // GracefulDiskRebalanceRequired states that the broker volume needs a CC disk rebalance GracefulDiskRebalanceRequired CruiseControlVolumeState = "GracefulDiskRebalanceRequired" diff --git a/api/v1beta1/zz_generated.deepcopy.go b/api/v1beta1/zz_generated.deepcopy.go index 96fefb6ab..ccfaada52 100644 --- a/api/v1beta1/zz_generated.deepcopy.go +++ b/api/v1beta1/zz_generated.deepcopy.go @@ -1,19 +1,21 @@ //go:build !ignore_autogenerated // +build !ignore_autogenerated -// Copyright © 2019 Cisco Systems, Inc.
and/or its affiliates -// -// Licensed under the Apache License, Version 2.0 (the "License"); -// you may not use this file except in compliance with the License. -// You may obtain a copy of the License at -// -// http://www.apache.org/licenses/LICENSE-2.0 -// -// Unless required by applicable law or agreed to in writing, software -// distributed under the License is distributed on an "AS IS" BASIS, -// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -// See the License for the specific language governing permissions and -// limitations under the License. +/* +Copyright 2023 Cisco Systems, Inc. and/or its affiliates + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ // Code generated by controller-gen. DO NOT EDIT. 
diff --git a/controllers/cruisecontroloperation_controller.go b/controllers/cruisecontroloperation_controller.go index 7ffe1d67a..9292f8ba0 100644 --- a/controllers/cruisecontroloperation_controller.go +++ b/controllers/cruisecontroloperation_controller.go @@ -52,8 +52,9 @@ const ( var ( defaultRequeueIntervalInSeconds = 10 executionPriorityMap = map[banzaiv1alpha1.CruiseControlTaskOperation]int{ - banzaiv1alpha1.OperationAddBroker: 2, - banzaiv1alpha1.OperationRemoveBroker: 1, + banzaiv1alpha1.OperationAddBroker: 3, + banzaiv1alpha1.OperationRemoveBroker: 2, + banzaiv1alpha1.OperationRemoveDisks: 1, banzaiv1alpha1.OperationRebalance: 0, } missingCCResErr = errors.New("missing Cruise Control user task result") @@ -197,7 +198,11 @@ func (r *CruiseControlOperationReconciler) Reconcile(ctx context.Context, reques return reconciled() } - ccOperationExecution := selectOperationForExecution(ccOperationQueueMap) + ccOperationExecution, err := r.selectOperationForExecution(ccOperationQueueMap) + if err != nil { + log.Error(err, "requeue event as selecting operation for execution failed") + return requeueAfter(defaultRequeueIntervalInSeconds) + } // There is nothing to be executed for now, requeue if ccOperationExecution == nil { return requeueAfter(defaultRequeueIntervalInSeconds) @@ -265,6 +270,8 @@ func (r *CruiseControlOperationReconciler) executeOperation(ctx context.Context, cruseControlTaskResult, err = r.scaler.RemoveBrokersWithParams(ctx, ccOperationExecution.CurrentTaskParameters()) case banzaiv1alpha1.OperationRebalance: cruseControlTaskResult, err = r.scaler.RebalanceWithParams(ctx, ccOperationExecution.CurrentTaskParameters()) + case banzaiv1alpha1.OperationRemoveDisks: + cruseControlTaskResult, err = r.scaler.RemoveDisksWithParams(ctx, ccOperationExecution.CurrentTaskParameters()) case banzaiv1alpha1.OperationStopExecution: cruseControlTaskResult, err = r.scaler.StopExecution(ctx) default: @@ -300,29 +307,56 @@ func sortOperations(ccOperations 
[]*banzaiv1alpha1.CruiseControlOperation) map[s return ccOperationQueueMap } -func selectOperationForExecution(ccOperationQueueMap map[string][]*banzaiv1alpha1.CruiseControlOperation) *banzaiv1alpha1.CruiseControlOperation { - // SELECTING OPERATION FOR EXECUTION - var ccOperationExecution *banzaiv1alpha1.CruiseControlOperation +// selectOperationForExecution selects the next operation to be executed +func (r *CruiseControlOperationReconciler) selectOperationForExecution(ccOperationQueueMap map[string][]*banzaiv1alpha1.CruiseControlOperation) (*banzaiv1alpha1.CruiseControlOperation, error) { // First prio: execute the finalize task - switch { - case len(ccOperationQueueMap[ccOperationForStopExecution]) > 0: - ccOperationExecution = ccOperationQueueMap[ccOperationForStopExecution][0] - ccOperationExecution.CurrentTask().Operation = banzaiv1alpha1.OperationStopExecution + if op := getFirstOperation(ccOperationQueueMap, ccOperationForStopExecution); op != nil { + op.CurrentTask().Operation = banzaiv1alpha1.OperationStopExecution + return op, nil + } + // Second prio: execute add_broker operation - case len(ccOperationQueueMap[ccOperationFirstExecution]) > 0 && ccOperationQueueMap[ccOperationFirstExecution][0].CurrentTaskOperation() == banzaiv1alpha1.OperationAddBroker: - ccOperationExecution = ccOperationQueueMap[ccOperationFirstExecution][0] + if op := getFirstOperation(ccOperationQueueMap, ccOperationFirstExecution); op != nil && + op.CurrentTaskOperation() == banzaiv1alpha1.OperationAddBroker { + return op, nil + } + // Third prio: execute failed task - case len(ccOperationQueueMap[ccOperationRetryExecution]) > 0: + if op := getFirstOperation(ccOperationQueueMap, ccOperationRetryExecution); op != nil { + // If there is a failed remove_disks task and there is a rebalance_disks task in the queue, we execute the rebalance_disks task + // This could only happen if the user tried to delete a disk, and later rolled back the change + if op.CurrentTaskOperation() == 
banzaiv1alpha1.OperationRemoveDisks { + for _, opFirstExecution := range ccOperationQueueMap[ccOperationFirstExecution] { + if opFirstExecution.CurrentTaskOperation() == banzaiv1alpha1.OperationRebalance { + // Mark the remove disk operation as paused, so it is not retried + op.Labels[banzaiv1alpha1.PauseLabel] = True + err := r.Client.Update(context.TODO(), op) + if err != nil { + return nil, errors.WrapIfWithDetails(err, "failed to update Cruise Control operation", "name", op.Name, "namespace", op.Namespace) + } + + // Execute the rebalance disks operation + return opFirstExecution, nil + } + } + } + // When the default backoff duration elapsed we retry - if ccOperationQueueMap[ccOperationRetryExecution][0].IsReadyForRetryExecution() { - ccOperationExecution = ccOperationQueueMap[ccOperationRetryExecution][0] + if op.IsReadyForRetryExecution() { + return op, nil } - // Forth prio: execute the first element in the FirstExecutionQueue which is ordered by operation type and k8s creation timestamp - case len(ccOperationQueueMap[ccOperationFirstExecution]) > 0: - ccOperationExecution = ccOperationQueueMap[ccOperationFirstExecution][0] } - return ccOperationExecution + // Fourth prio: execute the first element in the FirstExecutionQueue which is ordered by operation type and k8s creation timestamp + return getFirstOperation(ccOperationQueueMap, ccOperationFirstExecution), nil +} + +// getFirstOperation returns the first operation in the given queue +func getFirstOperation(ccOperationQueueMap map[string][]*banzaiv1alpha1.CruiseControlOperation, key string) *banzaiv1alpha1.CruiseControlOperation { + if len(ccOperationQueueMap[key]) > 0 { + return ccOperationQueueMap[key][0] + } + return nil } // SetupCruiseControlWithManager registers cruise control controller to the manager diff --git a/controllers/cruisecontroloperation_controller_test.go b/controllers/cruisecontroloperation_controller_test.go index d7c0db642..1286bdf43 100644 --- 
a/controllers/cruisecontroloperation_controller_test.go +++ b/controllers/cruisecontroloperation_controller_test.go @@ -84,6 +84,21 @@ func TestSortOperations(t *testing.T) { createCCRetryExecutionOperation(timeNow, "3", v1alpha1.OperationRebalance), }, }, + { + testName: "mixed with remove disks", + ccOperations: []*v1alpha1.CruiseControlOperation{ + createCCRetryExecutionOperation(timeNow, "1", v1alpha1.OperationAddBroker), + createCCRetryExecutionOperation(timeNow, "4", v1alpha1.OperationRebalance), + createCCRetryExecutionOperation(timeNow.Add(2*time.Second), "3", v1alpha1.OperationRemoveDisks), + createCCRetryExecutionOperation(timeNow.Add(time.Second), "2", v1alpha1.OperationRemoveBroker), + }, + expectedOutput: []*v1alpha1.CruiseControlOperation{ + createCCRetryExecutionOperation(timeNow, "1", v1alpha1.OperationAddBroker), + createCCRetryExecutionOperation(timeNow.Add(time.Second), "2", v1alpha1.OperationRemoveBroker), + createCCRetryExecutionOperation(timeNow.Add(2*time.Second), "3", v1alpha1.OperationRemoveDisks), + createCCRetryExecutionOperation(timeNow, "4", v1alpha1.OperationRebalance), + }, + }, } for _, testCase := range testCases { sortedCCOperations := sortOperations(testCase.ccOperations) diff --git a/controllers/cruisecontroltask_controller.go b/controllers/cruisecontroltask_controller.go index ca823fbf7..2891bb19a 100644 --- a/controllers/cruisecontroltask_controller.go +++ b/controllers/cruisecontroltask_controller.go @@ -48,6 +48,7 @@ const ( DefaultRequeueAfterTimeInSec = 20 BrokerCapacityDisk = "DISK" BrokerCapacity = "capacity" + True = "true" ) // CruiseControlTaskReconciler reconciles a kafka cluster object @@ -62,6 +63,7 @@ type CruiseControlTaskReconciler struct { // +kubebuilder:rbac:groups=kafka.banzaicloud.io,resources=kafkaclusters/status,verbs=get;update;patch +//nolint:funlen,gocyclo func (r *CruiseControlTaskReconciler) Reconcile(ctx context.Context, request ctrl.Request) (ctrl.Result, error) { log := 
logr.FromContextOrDiscard(ctx) @@ -169,6 +171,55 @@ func (r *CruiseControlTaskReconciler) Reconcile(ctx context.Context, request ctr removeTask.SetCruiseControlOperationRef(cruiseControlOpRef) removeTask.SetStateScheduled() + case tasksAndStates.NumActiveTasksByOp(banzaiv1alpha1.OperationRemoveDisks) > 0: + brokerLogDirsToRemove := make(map[string][]string) + logDirsByBroker, err := scaler.LogDirsByBroker(ctx) + if err != nil { + return requeueWithError(log, "failed to get list of brokerIdsToLogDirs per broker from Cruise Control", err) + } + + for _, task := range tasksAndStates.GetActiveTasksByOp(banzaiv1alpha1.OperationRemoveDisks) { + if task == nil { + continue + } + + brokerID := task.BrokerID + volume := task.Volume + if _, ok := brokerLogDirsToRemove[brokerID]; !ok { + brokerLogDirsToRemove[brokerID] = []string{} + } + + found := false + if onlineDirs, ok := logDirsByBroker[brokerID][scale.LogDirStateOnline]; ok { + for _, dir := range onlineDirs { + if strings.HasPrefix(strings.TrimSpace(dir), strings.TrimSpace(volume)) { + brokerLogDirsToRemove[brokerID] = append(brokerLogDirsToRemove[brokerID], dir) + found = true + break + } + } + } + + if !found { + return requeueWithError(log, fmt.Sprintf("volume %s not found for broker %s in CC online log dirs", volume, brokerID), errors.New("log dir not found")) + } + } + + // create the cruise control operation + cruiseControlOpRef, err := r.removeDisks(ctx, instance, operationTTLSecondsAfterFinished, brokerLogDirsToRemove) + if err != nil { + return requeueWithError(log, fmt.Sprintf("creating CruiseControlOperation for disk removal has failed, brokerID and brokerIdsToLogDirs: %s", brokerLogDirsToRemove), err) + } + + for _, task := range tasksAndStates.GetActiveTasksByOp(banzaiv1alpha1.OperationRemoveDisks) { + if task == nil { + continue + } + + task.SetCruiseControlOperationRef(cruiseControlOpRef) + task.SetStateScheduled() + } + case tasksAndStates.NumActiveTasksByOp(banzaiv1alpha1.OperationRebalance) > 0: 
brokerIDs := make([]string, 0) for _, task := range tasksAndStates.GetActiveTasksByOp(banzaiv1alpha1.OperationRebalance) { @@ -280,25 +331,31 @@ func getUnavailableBrokers(ctx context.Context, scaler scale.CruiseControlScaler } func (r *CruiseControlTaskReconciler) addBrokers(ctx context.Context, kafkaCluster *banzaiv1beta1.KafkaCluster, ttlSecondsAfterFinished *int, bokerIDs []string) (corev1.LocalObjectReference, error) { - return r.createCCOperation(ctx, kafkaCluster, banzaiv1alpha1.ErrorPolicyRetry, ttlSecondsAfterFinished, banzaiv1alpha1.OperationAddBroker, bokerIDs, false) + return r.createCCOperation(ctx, kafkaCluster, banzaiv1alpha1.ErrorPolicyRetry, ttlSecondsAfterFinished, banzaiv1alpha1.OperationAddBroker, bokerIDs, false, nil) } func (r *CruiseControlTaskReconciler) removeBroker(ctx context.Context, kafkaCluster *banzaiv1beta1.KafkaCluster, ttlSecondsAfterFinished *int, brokerID string) (corev1.LocalObjectReference, error) { - return r.createCCOperation(ctx, kafkaCluster, banzaiv1alpha1.ErrorPolicyRetry, ttlSecondsAfterFinished, banzaiv1alpha1.OperationRemoveBroker, []string{brokerID}, false) + return r.createCCOperation(ctx, kafkaCluster, banzaiv1alpha1.ErrorPolicyRetry, ttlSecondsAfterFinished, banzaiv1alpha1.OperationRemoveBroker, []string{brokerID}, false, nil) +} + +func (r *CruiseControlTaskReconciler) removeDisks(ctx context.Context, kafkaCluster *banzaiv1beta1.KafkaCluster, ttlSecondsAfterFinished *int, brokerIdsToRemovedLogDirs map[string][]string) (corev1.LocalObjectReference, error) { + return r.createCCOperation(ctx, kafkaCluster, banzaiv1alpha1.ErrorPolicyRetry, ttlSecondsAfterFinished, banzaiv1alpha1.OperationRemoveDisks, nil, false, brokerIdsToRemovedLogDirs) } func (r *CruiseControlTaskReconciler) rebalanceDisks(ctx context.Context, kafkaCluster *banzaiv1beta1.KafkaCluster, ttlSecondsAfterFinished *int, bokerIDs []string, isJBOD bool) (corev1.LocalObjectReference, error) { - return r.createCCOperation(ctx, kafkaCluster, 
banzaiv1alpha1.ErrorPolicyRetry, ttlSecondsAfterFinished, banzaiv1alpha1.OperationRebalance, bokerIDs, isJBOD) + return r.createCCOperation(ctx, kafkaCluster, banzaiv1alpha1.ErrorPolicyRetry, ttlSecondsAfterFinished, banzaiv1alpha1.OperationRebalance, bokerIDs, isJBOD, nil) } +//nolint:unparam func (r *CruiseControlTaskReconciler) createCCOperation( ctx context.Context, kafkaCluster *banzaiv1beta1.KafkaCluster, errorPolicy banzaiv1alpha1.ErrorPolicyType, ttlSecondsAfterFinished *int, operationType banzaiv1alpha1.CruiseControlTaskOperation, - bokerIDs []string, + brokerIDs []string, isJBOD bool, + logDirsByBrokerID map[string][]string, ) (corev1.LocalObjectReference, error) { operation := &banzaiv1alpha1.CruiseControlOperation{ ObjectMeta: metav1.ObjectMeta{ @@ -323,20 +380,32 @@ func (r *CruiseControlTaskReconciler) createCCOperation( } operation.Status.CurrentTask = &banzaiv1alpha1.CruiseControlTask{ - Operation: operationType, - Parameters: map[string]string{ - "exclude_recently_demoted_brokers": "true", - "exclude_recently_removed_brokers": "true", - }, + Operation: operationType, + Parameters: make(map[string]string), } - if operationType == banzaiv1alpha1.OperationRebalance { - operation.Status.CurrentTask.Parameters["destination_broker_ids"] = strings.Join(bokerIDs, ",") + if operationType != banzaiv1alpha1.OperationRemoveDisks { + operation.Status.CurrentTask.Parameters[scale.ParamExcludeDemoted] = True + operation.Status.CurrentTask.Parameters[scale.ParamExcludeRemoved] = True + } + + switch { + case operationType == banzaiv1alpha1.OperationRebalance: + operation.Status.CurrentTask.Parameters[scale.ParamDestbrokerIDs] = strings.Join(brokerIDs, ",") if isJBOD { - operation.Status.CurrentTask.Parameters["rebalance_disk"] = "true" + operation.Status.CurrentTask.Parameters[scale.ParamRebalanceDisk] = True } - } else { - operation.Status.CurrentTask.Parameters["brokerid"] = strings.Join(bokerIDs, ",") + case operationType == banzaiv1alpha1.OperationRemoveDisks: 
+ pairs := make([]string, 0, len(logDirsByBrokerID)) + for brokerID, logDirs := range logDirsByBrokerID { + for _, logDir := range logDirs { + pair := fmt.Sprintf("%s-%s", brokerID, logDir) + pairs = append(pairs, pair) + } + } + operation.Status.CurrentTask.Parameters[scale.ParamBrokerIDAndLogDirs] = strings.Join(pairs, ",") + default: + operation.Status.CurrentTask.Parameters[scale.ParamBrokerID] = strings.Join(brokerIDs, ",") } if err := r.Status().Update(ctx, operation); err != nil { @@ -517,7 +586,8 @@ func getActiveTasksFromCluster(instance *banzaiv1beta1.KafkaCluster) *CruiseCont } for mountPath, volumeState := range brokerStatus.GracefulActionState.VolumeStates { - if volumeState.CruiseControlVolumeState.IsActive() { + switch { + case volumeState.CruiseControlVolumeState.IsDiskRebalance(): t := &CruiseControlTask{ BrokerID: brokerId, Volume: mountPath, @@ -526,6 +596,16 @@ func getActiveTasksFromCluster(instance *banzaiv1beta1.KafkaCluster) *CruiseCont CruiseControlOperationReference: volumeState.CruiseControlOperationReference, } tasksAndStates.Add(t) + + case volumeState.CruiseControlVolumeState.IsDiskRemoval(): + t := &CruiseControlTask{ + BrokerID: brokerId, + Volume: mountPath, + VolumeState: volumeState.CruiseControlVolumeState, + Operation: banzaiv1alpha1.OperationRemoveDisks, + CruiseControlOperationReference: volumeState.CruiseControlOperationReference, + } + tasksAndStates.Add(t) } } } diff --git a/controllers/cruisecontroltask_controller_test.go b/controllers/cruisecontroltask_controller_test.go index 5ff734e41..94a32ec44 100644 --- a/controllers/cruisecontroltask_controller_test.go +++ b/controllers/cruisecontroltask_controller_test.go @@ -15,9 +15,20 @@ package controllers import ( + "context" "testing" "github.com/stretchr/testify/assert" + "github.com/stretchr/testify/mock" + corev1 "k8s.io/api/core/v1" + metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" + "k8s.io/apimachinery/pkg/runtime" + + "github.com/banzaicloud/koperator/pkg/scale" + + 
banzaiv1alpha1 "github.com/banzaicloud/koperator/api/v1alpha1" + "github.com/banzaicloud/koperator/api/v1beta1" + "github.com/banzaicloud/koperator/pkg/resources/kafka/mocks" ) func TestBrokersJBODSelector(t *testing.T) { @@ -295,3 +306,108 @@ func TestBrokersJBODSelector(t *testing.T) { assert.ElementsMatch(t, testCase.expectedBrokersNotJBOD, brokersNotJBOD, "testName", testCase.testName) } } + +func TestCreateCCOperation(t *testing.T) { + t.Parallel() + ctx := context.Background() + ttlSecondsAfterFinished := 60 + + testCases := []struct { + operationType banzaiv1alpha1.CruiseControlTaskOperation + brokerIDs []string + isJBOD bool + brokerIdsToLogDirs map[string][]string + parameterCheck func(t *testing.T, params map[string]string) + }{ + { + operationType: banzaiv1alpha1.OperationAddBroker, + brokerIDs: []string{"1", "2", "3"}, + isJBOD: false, + brokerIdsToLogDirs: nil, + parameterCheck: func(t *testing.T, params map[string]string) { + assert.Equal(t, "1,2,3", params[scale.ParamBrokerID]) + assert.Equal(t, "true", params[scale.ParamExcludeDemoted]) + assert.Equal(t, "true", params[scale.ParamExcludeRemoved]) + }, + }, + { + operationType: banzaiv1alpha1.OperationRemoveBroker, + brokerIDs: []string{"1"}, + isJBOD: false, + brokerIdsToLogDirs: nil, + parameterCheck: func(t *testing.T, params map[string]string) { + assert.Equal(t, "1", params[scale.ParamBrokerID]) + assert.Equal(t, "true", params[scale.ParamExcludeDemoted]) + assert.Equal(t, "true", params[scale.ParamExcludeRemoved]) + }, + }, + { + operationType: banzaiv1alpha1.OperationRemoveDisks, + brokerIDs: []string{"1", "2"}, + isJBOD: false, + brokerIdsToLogDirs: map[string][]string{ + "1": {"logdir1", "logdir2"}, + "2": {"logdir1"}, + }, + parameterCheck: func(t *testing.T, params map[string]string) { + // can be in any order + expectedString1 := "1-logdir1,1-logdir2,2-logdir1" + expectedString2 := "2-logdir1,1-logdir1,1-logdir2" + assert.Contains(t, []string{expectedString1, expectedString2}, 
params[scale.ParamBrokerIDAndLogDirs]) + }, + }, + { + operationType: banzaiv1alpha1.OperationRebalance, + brokerIDs: []string{"1", "2", "3"}, + isJBOD: true, + brokerIdsToLogDirs: nil, + parameterCheck: func(t *testing.T, params map[string]string) { + assert.Equal(t, "1,2,3", params[scale.ParamDestbrokerIDs]) + assert.Equal(t, "true", params[scale.ParamRebalanceDisk]) + assert.Equal(t, "true", params[scale.ParamExcludeDemoted]) + assert.Equal(t, "true", params[scale.ParamExcludeRemoved]) + }, + }, + } + + for _, testCase := range testCases { + mockClient := new(mocks.Client) + scheme := runtime.NewScheme() + _ = v1beta1.AddToScheme(scheme) + _ = corev1.AddToScheme(scheme) + + r := CruiseControlTaskReconciler{ + Client: mockClient, + Scheme: scheme, + } + + kafkaCluster := &v1beta1.KafkaCluster{ + ObjectMeta: metav1.ObjectMeta{ + Name: "kafka", + Namespace: "kafka", + }} + + // Mock the Create call and capture the operation + var createdOperation *banzaiv1alpha1.CruiseControlOperation + mockClient.On("Create", ctx, mock.IsType(&banzaiv1alpha1.CruiseControlOperation{})).Run(func(args mock.Arguments) { + createdOperation = args.Get(1).(*banzaiv1alpha1.CruiseControlOperation) + createdOperation.ObjectMeta.Name = "generated-name" + }).Return(nil) + + // Mock the Status call + mockClient.On("Status").Return(mockClient) + + // Mock the Update call + mockClient.On("Update", ctx, mock.IsType(&banzaiv1alpha1.CruiseControlOperation{})).Run(func(args mock.Arguments) { + arg := args.Get(1).(*banzaiv1alpha1.CruiseControlOperation) + createdOperation.Status = arg.Status + }).Return(nil) + + _, err := r.createCCOperation(ctx, kafkaCluster, banzaiv1alpha1.ErrorPolicyRetry, &ttlSecondsAfterFinished, testCase.operationType, testCase.brokerIDs, testCase.isJBOD, testCase.brokerIdsToLogDirs) + assert.NoError(t, err) + + // Use the captured operation for further assertions + assert.Equal(t, testCase.operationType, createdOperation.Status.CurrentTask.Operation) + 
testCase.parameterCheck(t, createdOperation.Status.CurrentTask.Parameters) + } +} diff --git a/controllers/cruisecontroltask_types.go b/controllers/cruisecontroltask_types.go index ddbba3d08..c04f4af19 100644 --- a/controllers/cruisecontroltask_types.go +++ b/controllers/cruisecontroltask_types.go @@ -41,7 +41,7 @@ func (t *CruiseControlTask) IsRequired() bool { switch t.Operation { case koperatorv1alpha1.OperationAddBroker, koperatorv1alpha1.OperationRemoveBroker: return t.BrokerState.IsRequiredState() - case koperatorv1alpha1.OperationRebalance: + case koperatorv1alpha1.OperationRebalance, koperatorv1alpha1.OperationRemoveDisks: return t.VolumeState.IsRequiredState() } return false @@ -61,7 +61,7 @@ func (t *CruiseControlTask) Apply(instance *koperatorv1beta1.KafkaCluster) { state.GracefulActionState.CruiseControlOperationReference = t.CruiseControlOperationReference instance.Status.BrokersState[t.BrokerID] = state } - case koperatorv1alpha1.OperationRebalance: + case koperatorv1alpha1.OperationRebalance, koperatorv1alpha1.OperationRemoveDisks: if state, ok := instance.Status.BrokersState[t.BrokerID]; ok { if volState, ok := state.GracefulActionState.VolumeStates[t.Volume]; ok { volState.CruiseControlVolumeState = t.VolumeState @@ -88,10 +88,14 @@ func (t *CruiseControlTask) SetStateScheduled() { t.BrokerState = koperatorv1beta1.GracefulDownscaleScheduled case koperatorv1alpha1.OperationRebalance: t.VolumeState = koperatorv1beta1.GracefulDiskRebalanceScheduled + case koperatorv1alpha1.OperationRemoveDisks: + t.VolumeState = koperatorv1beta1.GracefulDiskRemovalScheduled } } // FromResult takes a scale.Result instance returned by scale.CruiseControlScaler and updates its own state accordingly. 
+// +//nolint:gocyclo func (t *CruiseControlTask) FromResult(operation *koperatorv1alpha1.CruiseControlOperation) { if t == nil { return @@ -135,6 +139,24 @@ func (t *CruiseControlTask) FromResult(operation *koperatorv1alpha1.CruiseContro t.BrokerState = koperatorv1beta1.GracefulDownscaleScheduled } + case koperatorv1alpha1.OperationRemoveDisks: + switch { + case operation == nil: + t.VolumeState = koperatorv1beta1.GracefulDiskRemovalSucceeded + case operation.IsErrorPolicyIgnore() && operation.CurrentTaskState() == koperatorv1beta1.CruiseControlTaskCompletedWithError: + t.VolumeState = koperatorv1beta1.GracefulDiskRemovalSucceeded + case operation.IsPaused() && operation.CurrentTaskState() == koperatorv1beta1.CruiseControlTaskCompletedWithError: + t.VolumeState = koperatorv1beta1.GracefulDiskRemovalPaused + case operation.CurrentTaskState() == koperatorv1beta1.CruiseControlTaskActive, operation.CurrentTaskState() == koperatorv1beta1.CruiseControlTaskInExecution: + t.VolumeState = koperatorv1beta1.GracefulDiskRemovalRunning + case operation.CurrentTaskState() == koperatorv1beta1.CruiseControlTaskCompleted: + t.VolumeState = koperatorv1beta1.GracefulDiskRemovalSucceeded + case operation.CurrentTaskState() == koperatorv1beta1.CruiseControlTaskCompletedWithError: + t.VolumeState = koperatorv1beta1.GracefulDiskRemovalCompletedWithError + case operation.CurrentTaskState() == "": + t.VolumeState = koperatorv1beta1.GracefulDiskRemovalScheduled + } + case koperatorv1alpha1.OperationRebalance: switch { case operation == nil: diff --git a/controllers/kafkacluster_controller.go b/controllers/kafkacluster_controller.go index 15366c667..fcdd74776 100644 --- a/controllers/kafkacluster_controller.go +++ b/controllers/kafkacluster_controller.go @@ -150,6 +150,7 @@ func (r *KafkaClusterReconciler) Reconcile(ctx context.Context, request ctrl.Req RequeueAfter: time.Duration(15) * time.Second, }, nil case errors.As(err, &errorfactory.CruiseControlTaskRunning{}): + log.Info("Cruise 
Control task running, requeuing", "error", err.Error()) return ctrl.Result{ RequeueAfter: time.Duration(20) * time.Second, }, nil diff --git a/controllers/tests/cruisecontroloperation_controller_test.go b/controllers/tests/cruisecontroloperation_controller_test.go index 09974a3fc..9f99b4994 100644 --- a/controllers/tests/cruisecontroloperation_controller_test.go +++ b/controllers/tests/cruisecontroloperation_controller_test.go @@ -44,8 +44,8 @@ var _ = Describe("CruiseControlTaskReconciler", func() { namespaceObj *corev1.Namespace kafkaClusterCRName string kafkaCluster *v1beta1.KafkaCluster - opName1 string = "operation1" - opName2 string = "operation2" + opName1 = "operation1" + opName2 = "operation2" ) BeforeEach(func() { atomic.AddUint64(&count, 1) @@ -305,6 +305,60 @@ var _ = Describe("CruiseControlTaskReconciler", func() { }, 10*time.Second, 500*time.Millisecond).Should(BeTrue()) }) }) + When("there is an errored remove_disks and a rebalance disks operation for the same broker", Serial, func() { + JustBeforeEach(func() { + cruiseControlOperationReconciler.ScaleFactory = NewMockScaleFactory(getScaleMock6()) + // Remove_disk operation - errored + operation := generateCruiseControlOperation(opName1, namespace, kafkaCluster.GetName()) + err := k8sClient.Create(context.Background(), &operation) + Expect(err).NotTo(HaveOccurred()) + + operation.Status.CurrentTask = &v1alpha1.CruiseControlTask{ + Operation: v1alpha1.OperationRemoveDisks, + Finished: &metav1.Time{Time: time.Now().Add(-time.Second*v1alpha1.DefaultRetryBackOffDurationSec - 10)}, + Parameters: map[string]string{ + scale.ParamBrokerIDAndLogDirs: "101-logdir1", + }, + } + err = k8sClient.Status().Update(context.Background(), &operation) + Expect(err).NotTo(HaveOccurred()) + // Rebalance operation + operation = generateCruiseControlOperation(opName2, namespace, kafkaCluster.GetName()) + err = k8sClient.Create(context.Background(), &operation) + Expect(err).NotTo(HaveOccurred()) + 
operation.Status.CurrentTask = &v1alpha1.CruiseControlTask{ + Operation: v1alpha1.OperationRebalance, + Parameters: map[string]string{ + scale.ParamDestbrokerIDs: "101,102", + }, + } + err = k8sClient.Status().Update(context.Background(), &operation) + Expect(err).NotTo(HaveOccurred()) + }) + It("should mark the removed disk task as paused and should execute the rebalance", func() { + Eventually(func() bool { + removeDisksOp := v1alpha1.CruiseControlOperation{} + err := k8sClient.Get(context.Background(), client.ObjectKey{ + Namespace: kafkaCluster.Namespace, + Name: opName1, + }, &removeDisksOp) + if err != nil { + return false + } + rebalanceOp := v1alpha1.CruiseControlOperation{} + err = k8sClient.Get(context.Background(), client.ObjectKey{ + Namespace: kafkaCluster.Namespace, + Name: opName2, + }, &rebalanceOp) + if err != nil { + return false + } + + return rebalanceOp.CurrentTaskState() == v1beta1.CruiseControlTaskCompleted && + removeDisksOp.GetLabels()[v1alpha1.PauseLabel] == v1alpha1.True + }, 10*time.Second, 500*time.Millisecond).Should(BeTrue()) + }) + }) }) func generateCruiseControlOperation(name, namespace, kafkaRef string) v1alpha1.CruiseControlOperation { @@ -456,6 +510,37 @@ func getScaleMock5() *mocks.MockCruiseControlScaler { return scaleMock } +func getScaleMock6() *mocks.MockCruiseControlScaler { + mockCtrl := gomock.NewController(GinkgoT()) + scaleMock := mocks.NewMockCruiseControlScaler(mockCtrl) + scaleMock.EXPECT().IsUp(gomock.Any()).Return(true).AnyTimes() + + userTaskResult := []*scale.Result{scaleResultPointer(scale.Result{ + TaskID: "12345", + StartedAt: "Sat, 27 Aug 2022 12:22:21 GMT", + State: v1beta1.CruiseControlTaskCompletedWithError, + })} + scaleMock.EXPECT().UserTasks(gomock.Any(), gomock.Any()).Return(userTaskResult, nil).AnyTimes() + scaleMock.EXPECT().Status(gomock.Any()).Return(scale.CruiseControlStatus{ + ExecutorReady: true, + MonitorReady: true, + AnalyzerReady: true, + }, nil).AnyTimes() + 
scaleMock.EXPECT().RebalanceWithParams(gomock.Any(), gomock.All()).Return(scaleResultPointer(scale.Result{ + TaskID: "12346", + StartedAt: "Sat, 27 Aug 2022 12:22:21 GMT", + State: v1beta1.CruiseControlTaskCompleted, + }), nil).Times(1) + + scaleMock.EXPECT().RemoveDisksWithParams(gomock.Any(), gomock.All()).Return(scaleResultPointer(scale.Result{ + TaskID: "12345", + StartedAt: "Sat, 27 Aug 2022 12:22:21 GMT", + State: v1beta1.CruiseControlTaskCompletedWithError, + }), nil).AnyTimes() + + return scaleMock +} + func scaleResultPointer(res scale.Result) *scale.Result { return &res } diff --git a/controllers/tests/mocks/scale.go b/controllers/tests/mocks/scale.go index 6c9259baa..947fbe1a4 100644 --- a/controllers/tests/mocks/scale.go +++ b/controllers/tests/mocks/scale.go @@ -52,6 +52,21 @@ func (m *MockCruiseControlScaler) EXPECT() *MockCruiseControlScalerMockRecorder return m.recorder } +// RemoveDisksWithParams mocks base method. +func (m *MockCruiseControlScaler) RemoveDisksWithParams(ctx context.Context, params map[string]string) (*scale.Result, error) { + m.ctrl.T.Helper() + ret := m.ctrl.Call(m, "RemoveDisksWithParams", ctx, params) + ret0, _ := ret[0].(*scale.Result) + ret1, _ := ret[1].(error) + return ret0, ret1 +} + +// RemoveDisksWithParams indicates an expected call of RemoveDisksWithParams. +func (mr *MockCruiseControlScalerMockRecorder) RemoveDisksWithParams(ctx, params interface{}) *gomock.Call { + mr.mock.ctrl.T.Helper() + return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "RemoveDisksWithParams", reflect.TypeOf((*MockCruiseControlScaler)(nil).RemoveDisksWithParams), ctx, params) +} + // AddBrokers mocks base method. 
func (m *MockCruiseControlScaler) AddBrokers(ctx context.Context, brokerIDs ...string) (*scale.Result, error) { m.ctrl.T.Helper() diff --git a/main.go b/main.go index cb015ed06..fbe4f7feb 100644 --- a/main.go +++ b/main.go @@ -217,15 +217,6 @@ func main() { } if !webhookDisabled { - err = ctrl.NewWebhookManagedBy(mgr).For(&banzaicloudv1beta1.KafkaCluster{}). - WithValidator(webhooks.KafkaClusterValidator{ - Log: mgr.GetLogger().WithName("webhooks").WithName("KafkaCluster"), - }). - Complete() - if err != nil { - setupLog.Error(err, "unable to create validating webhook", "Kind", "KafkaCluster") - os.Exit(1) - } err = ctrl.NewWebhookManagedBy(mgr).For(&banzaicloudv1alpha1.KafkaTopic{}). WithValidator(webhooks.KafkaTopicValidator{ Client: mgr.GetClient(), diff --git a/pkg/k8sutil/status.go b/pkg/k8sutil/status.go index fe6713b56..5ba4a68a8 100644 --- a/pkg/k8sutil/status.go +++ b/pkg/k8sutil/status.go @@ -179,8 +179,8 @@ func generateBrokerState(brokerIDs []string, cluster *banzaicloudv1beta1.KafkaCl cluster.Status.BrokersState = brokersState } -// DeleteStatus deletes the given broker state from the CR -func DeleteStatus(c client.Client, brokerID string, cluster *banzaicloudv1beta1.KafkaCluster, logger logr.Logger) error { +// DeleteBrokerStatus deletes the given broker state from the CR +func DeleteBrokerStatus(c client.Client, brokerID string, cluster *banzaicloudv1beta1.KafkaCluster, logger logr.Logger) error { typeMeta := cluster.TypeMeta brokerStatus := cluster.Status.BrokersState @@ -224,6 +224,55 @@ func DeleteStatus(c client.Client, brokerID string, cluster *banzaicloudv1beta1. 
return nil } +// DeleteVolumeStatus deletes the given volume state for the given broker from the CR +func DeleteVolumeStatus(c client.Client, brokerID string, mountPath string, cluster *banzaicloudv1beta1.KafkaCluster, logger logr.Logger) error { + typeMeta := cluster.TypeMeta + + brokerStatus := cluster.Status.BrokersState + + if status, ok := brokerStatus[brokerID]; ok { + delete(status.GracefulActionState.VolumeStates, mountPath) + } + + cluster.Status.BrokersState = brokerStatus + + err := c.Status().Update(context.Background(), cluster) + if apierrors.IsNotFound(err) { + err = c.Update(context.Background(), cluster) + } + if err != nil { + if !apierrors.IsConflict(err) { + return errors.WrapIff(err, "could not delete Kafka cluster broker %s volume %s state ", brokerID, mountPath) + } + err := c.Get(context.TODO(), types.NamespacedName{ + Namespace: cluster.Namespace, + Name: cluster.Name, + }, cluster) + if err != nil { + return errors.WrapIf(err, "could not get config for updating status") + } + brokerStatus = cluster.Status.BrokersState + + if status, ok := brokerStatus[brokerID]; ok { + delete(status.GracefulActionState.VolumeStates, mountPath) + } + + cluster.Status.BrokersState = brokerStatus + err = c.Status().Update(context.Background(), cluster) + if apierrors.IsNotFound(err) { + err = c.Update(context.Background(), cluster) + } + if err != nil { + return errors.WrapIff(err, "could not delete Kafka clusters broker %s volume %s state ", brokerID, mountPath) + } + } + + // update loses the typeMeta of the config that's used later when setting ownerrefs + cluster.TypeMeta = typeMeta + logger.Info(fmt.Sprintf("Kafka broker %s volume %s state deleted", brokerID, mountPath)) + return nil +} + // UpdateCRStatus updates the cluster state func UpdateCRStatus(c client.Client, cluster *banzaicloudv1beta1.KafkaCluster, state interface{}, logger logr.Logger) error { typeMeta := cluster.TypeMeta diff --git a/pkg/resources/kafka/configmap.go 
b/pkg/resources/kafka/configmap.go index 4c00eabf0..3c900e7cd 100644 --- a/pkg/resources/kafka/configmap.go +++ b/pkg/resources/kafka/configmap.go @@ -15,16 +15,12 @@ package kafka import ( - "context" "fmt" "sort" "strings" "emperror.dev/errors" "github.com/go-logr/logr" - apierrors "k8s.io/apimachinery/pkg/api/errors" - "sigs.k8s.io/controller-runtime/pkg/client" - corev1 "k8s.io/api/core/v1" v1 "k8s.io/api/core/v1" @@ -111,28 +107,11 @@ func (r *Reconciler) getConfigProperties(bConfig *v1beta1.BrokerConfig, id int32 log.Error(err, fmt.Sprintf("setting '%s' in broker configuration resulted an error", kafkautils.KafkaConfigBrokerId)) } - // This logic prevents the removal of the mountPath from the broker configmap - brokerConfigMapName := fmt.Sprintf(brokerConfigTemplate+"-%d", r.KafkaCluster.Name, id) - var brokerConfigMapOld v1.ConfigMap - err = r.Client.Get(context.Background(), client.ObjectKey{Name: brokerConfigMapName, Namespace: r.KafkaCluster.GetNamespace()}, &brokerConfigMapOld) - if err != nil && !apierrors.IsNotFound(err) { - log.Error(err, "getting broker configmap from the Kubernetes API server resulted an error") - } - - mountPathsOld, err := getMountPathsFromBrokerConfigMap(&brokerConfigMapOld) - if err != nil { - log.Error(err, "could not get mountPaths from broker configmap", v1beta1.BrokerIdLabelKey, id) - } - mountPathsNew := generateStorageConfig(bConfig.StorageConfigs) - mountPathsMerged, isMountPathRemoved := mergeMountPaths(mountPathsOld, mountPathsNew) - - if isMountPathRemoved { - log.Error(errors.New("removed storage is found in the KafkaCluster CR"), "removing storage from broker is not supported", v1beta1.BrokerIdLabelKey, id, "mountPaths", mountPathsOld, "mountPaths in kafkaCluster CR ", mountPathsNew) - } - - if len(mountPathsMerged) != 0 { - if err := config.Set(kafkautils.KafkaConfigBrokerLogDirectory, strings.Join(mountPathsMerged, ",")); err != nil { - log.Error(err, fmt.Sprintf("setting '%s' in broker configuration resulted an 
error", kafkautils.KafkaConfigBrokerLogDirectory)) + // Storage configuration + storageConf := generateStorageConfig(bConfig.StorageConfigs) + if len(storageConf) > 0 { + if err := config.Set(kafkautils.KafkaConfigBrokerLogDirectory, storageConf); err != nil { + log.Error(err, "setting log.dirs in broker configuration resulted an error") } } diff --git a/pkg/resources/kafka/kafka.go b/pkg/resources/kafka/kafka.go index ba8e8a9e2..60ffac12c 100644 --- a/pkg/resources/kafka/kafka.go +++ b/pkg/resources/kafka/kafka.go @@ -549,7 +549,7 @@ func (r *Reconciler) reconcileKafkaPodDelete(ctx context.Context, log logr.Logge log.V(1).Info("pvc for broker deleted", "pvc name", volume.PersistentVolumeClaim.ClaimName, v1beta1.BrokerIdLabelKey, broker.Labels[v1beta1.BrokerIdLabelKey]) } } - err = k8sutil.DeleteStatus(r.Client, broker.Labels[v1beta1.BrokerIdLabelKey], r.KafkaCluster, log) + err = k8sutil.DeleteBrokerStatus(r.Client, broker.Labels[v1beta1.BrokerIdLabelKey], r.KafkaCluster, log) if err != nil { return errors.WrapIfWithDetails(err, "could not delete status for broker", "id", broker.Labels[v1beta1.BrokerIdLabelKey]) } @@ -961,6 +961,7 @@ func (r *Reconciler) isPodTainted(log logr.Logger, pod *corev1.Pod) bool { func (r *Reconciler) reconcileKafkaPvc(ctx context.Context, log logr.Logger, brokersDesiredPvcs map[string][]*corev1.PersistentVolumeClaim) error { brokersVolumesState := make(map[string]map[string]v1beta1.VolumeState) var brokerIds []string + waitForDiskRemovalToFinish := false for brokerId, desiredPvcs := range brokersDesiredPvcs { desiredType := reflect.TypeOf(&corev1.PersistentVolumeClaim{}) @@ -977,6 +978,66 @@ func (r *Reconciler) reconcileKafkaPvc(ctx context.Context, log logr.Logger, bro log = log.WithValues("kind", desiredType) + err := r.Client.List(ctx, pvcList, + client.InNamespace(r.KafkaCluster.GetNamespace()), matchingLabels) + if err != nil { + return errorfactory.New(errorfactory.APIFailure{}, err, "getting resource failed", "kind", 
desiredType)
+		}
+
+		// Handle disk removal
+		if len(pvcList.Items) > len(desiredPvcs) {
+			for _, pvc := range pvcList.Items {
+				foundInDesired := false
+				existingMountPath := pvc.Annotations["mountPath"]
+
+				for _, desiredPvc := range desiredPvcs {
+					desiredMountPath := desiredPvc.Annotations["mountPath"]
+
+					if existingMountPath == desiredMountPath {
+						foundInDesired = true
+						break
+					}
+				}
+
+				if foundInDesired {
+					continue
+				}
+
+				mountPathToRemove := existingMountPath
+				if brokerState, ok := r.KafkaCluster.Status.BrokersState[brokerId]; ok {
+					volumeStateStatus, found := brokerState.GracefulActionState.VolumeStates[mountPathToRemove]
+					if !found {
+						log.Info("Disk removal was completed, waiting for Rolling Upgrade to remove PVC", "brokerId", brokerId, "mountPath", mountPathToRemove)
+						continue
+					}
+
+					// Check the volume state
+					ccVolumeState := volumeStateStatus.CruiseControlVolumeState
+					switch {
+					case ccVolumeState.IsDiskRemovalSucceeded():
+						if err := r.Client.Delete(ctx, &pvc); err != nil {
+							return errorfactory.New(errorfactory.APIFailure{}, err, "deleting resource failed", "kind", desiredType)
+						}
+						log.Info("resource deleted")
+						err = k8sutil.DeleteVolumeStatus(r.Client, brokerId, mountPathToRemove, r.KafkaCluster, log)
+						if err != nil {
+							return errors.WrapIfWithDetails(err, "could not delete volume status for broker volume", "brokerId", brokerId, "mountPath", mountPathToRemove)
+						}
+					case ccVolumeState.IsDiskRemoval():
+						log.Info("Graceful disk removal is in progress", "brokerId", brokerId, "mountPath", mountPathToRemove)
+						waitForDiskRemovalToFinish = true
+					case ccVolumeState.IsDiskRebalance():
+						log.Info("Graceful disk rebalance is in progress, waiting to mark disk for removal", "brokerId", brokerId, "mountPath", mountPathToRemove)
+						waitForDiskRemovalToFinish = true
+					default:
+						brokerVolumesState[mountPathToRemove] =
v1beta1.VolumeState{CruiseControlVolumeState: v1beta1.GracefulDiskRemovalRequired} + log.Info("Marked the volume for removal", "brokerId", brokerId, "mountPath", mountPathToRemove) + waitForDiskRemovalToFinish = true + } + } + } + } + for _, desiredPvc := range desiredPvcs { currentPvc := desiredPvc.DeepCopy() log.V(1).Info("searching with label because name is empty") @@ -1007,8 +1068,10 @@ func (r *Reconciler) reconcileKafkaPvc(ctx context.Context, log logr.Logger, bro alreadyCreated = true // Checking pvc state, if bounded, so the broker has already restarted and the CC GracefulDiskRebalance has not happened yet, // then we make it happening with status update. - if _, ok := r.KafkaCluster.Status.BrokersState[brokerId].GracefulActionState.VolumeStates[mountPath]; !ok && - currentPvc.Status.Phase == corev1.ClaimBound { + // If disk removal was set, and the disk was added back, we also need to mark the volume for rebalance + volumeState, found := r.KafkaCluster.Status.BrokersState[brokerId].GracefulActionState.VolumeStates[mountPath] + if currentPvc.Status.Phase == corev1.ClaimBound && + (!found || volumeState.CruiseControlVolumeState.IsDiskRemoval()) { brokerVolumesState[mountPath] = v1beta1.VolumeState{CruiseControlVolumeState: v1beta1.GracefulDiskRebalanceRequired} } break @@ -1063,6 +1126,10 @@ func (r *Reconciler) reconcileKafkaPvc(ctx context.Context, log logr.Logger, bro } } + if waitForDiskRemovalToFinish { + return errorfactory.New(errorfactory.CruiseControlTaskRunning{}, errors.New("Disk removal pending"), "Disk removal pending") + } + return nil } @@ -1076,10 +1143,10 @@ func GetBrokersWithPendingOrRunningCCTask(kafkaCluster *v1beta1.KafkaCluster) [] (state.GracefulActionState.CruiseControlOperationReference != nil && state.GracefulActionState.CruiseControlState.IsRunningState()) { brokerIDs = append(brokerIDs, kafkaCluster.Spec.Brokers[i].Id) } else { - // Check if the volumes are rebalancing + // Check if the volumes are rebalancing or removing for _, 
volumeState := range state.GracefulActionState.VolumeStates { - if volumeState.CruiseControlVolumeState == v1beta1.GracefulDiskRebalanceRequired || - (volumeState.CruiseControlOperationReference != nil && volumeState.CruiseControlVolumeState.IsRunningState()) { + ccVolumeState := volumeState.CruiseControlVolumeState + if ccVolumeState.IsDiskRemoval() || ccVolumeState.IsDiskRebalance() { brokerIDs = append(brokerIDs, kafkaCluster.Spec.Brokers[i].Id) } } diff --git a/pkg/resources/kafka/kafka_test.go b/pkg/resources/kafka/kafka_test.go index be15a7ebe..6f75edc59 100644 --- a/pkg/resources/kafka/kafka_test.go +++ b/pkg/resources/kafka/kafka_test.go @@ -16,15 +16,16 @@ package kafka import ( "context" + "errors" "reflect" "testing" - "errors" - "github.com/go-logr/logr" "github.com/onsi/gomega" "github.com/stretchr/testify/assert" "github.com/stretchr/testify/mock" + "sigs.k8s.io/controller-runtime/pkg/client" + logf "sigs.k8s.io/controller-runtime/pkg/log" corev1 "k8s.io/api/core/v1" metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" @@ -33,7 +34,7 @@ import ( "github.com/banzaicloud/koperator/api/v1alpha1" "github.com/banzaicloud/koperator/api/v1beta1" "github.com/banzaicloud/koperator/pkg/resources" - mocks "github.com/banzaicloud/koperator/pkg/resources/kafka/mocks" + "github.com/banzaicloud/koperator/pkg/resources/kafka/mocks" ) func TestGetBrokersWithPendingOrRunningCCTask(t *testing.T) { @@ -951,3 +952,278 @@ func TestGetServerPasswordKeysAndUsers(t *testing.T) { //nolint funlen }) } } + +func TestReconcileKafkaPvcDiskRemoval(t *testing.T) { + t.Parallel() + testCases := []struct { + testName string + brokersDesiredPvcs map[string][]*corev1.PersistentVolumeClaim + existingPvcs []*corev1.PersistentVolumeClaim + kafkaClusterSpec v1beta1.KafkaClusterSpec + kafkaClusterStatus v1beta1.KafkaClusterStatus + expectedError bool + expectedDeletePvc bool + expectedVolumeState map[string]v1beta1.CruiseControlVolumeState + }{ + { + testName: "If no disk removed, do nothing", + 
brokersDesiredPvcs: map[string][]*corev1.PersistentVolumeClaim{ + "0": { + createPvc("test-pvc-1", "0", "/path/to/mount1"), + createPvc("test-pvc-2", "0", "/path/to/mount2"), + }, + }, + existingPvcs: []*corev1.PersistentVolumeClaim{ + createPvc("test-pvc-1", "0", "/path/to/mount1"), + createPvc("test-pvc-2", "0", "/path/to/mount2"), + }, + kafkaClusterStatus: v1beta1.KafkaClusterStatus{}, + expectedError: false, + expectedDeletePvc: false, + expectedVolumeState: nil, + }, + { + testName: "If disk removed, mark it as GracefulDiskRemovalRequired and return error", + brokersDesiredPvcs: map[string][]*corev1.PersistentVolumeClaim{ + "0": { + createPvc("test-pvc-1", "0", "/path/to/mount1"), + }, + }, + existingPvcs: []*corev1.PersistentVolumeClaim{ + createPvc("test-pvc-1", "0", "/path/to/mount1"), + createPvc("test-pvc-2", "0", "/path/to/mount2"), + }, + kafkaClusterStatus: v1beta1.KafkaClusterStatus{ + BrokersState: map[string]v1beta1.BrokerState{ + "0": { + GracefulActionState: v1beta1.GracefulActionState{ + VolumeStates: map[string]v1beta1.VolumeState{ + "/path/to/mount2": { + CruiseControlVolumeState: v1beta1.GracefulDiskRebalanceSucceeded, + }, + }, + }, + }, + }, + }, + expectedError: true, + expectedDeletePvc: false, + expectedVolumeState: map[string]v1beta1.CruiseControlVolumeState{ + "/path/to/mount2": v1beta1.GracefulDiskRemovalRequired, + }, + }, + { + testName: "If disk is rebalancing, wait for it to finish", + brokersDesiredPvcs: map[string][]*corev1.PersistentVolumeClaim{ + "0": { + createPvc("test-pvc-1", "0", "/path/to/mount1"), + }, + }, + existingPvcs: []*corev1.PersistentVolumeClaim{ + createPvc("test-pvc-1", "0", "/path/to/mount1"), + createPvc("test-pvc-2", "0", "/path/to/mount2"), + }, + kafkaClusterStatus: v1beta1.KafkaClusterStatus{ + BrokersState: map[string]v1beta1.BrokerState{ + "0": { + GracefulActionState: v1beta1.GracefulActionState{ + VolumeStates: map[string]v1beta1.VolumeState{ + "/path/to/mount2": { + CruiseControlVolumeState: 
v1beta1.GracefulDiskRebalanceScheduled, + }, + }, + }, + }, + }, + }, + expectedError: true, + expectedDeletePvc: false, + expectedVolumeState: map[string]v1beta1.CruiseControlVolumeState{ + "/path/to/mount2": v1beta1.GracefulDiskRebalanceScheduled, + }, + }, + { + testName: "Wait for disk removal to finish", + brokersDesiredPvcs: map[string][]*corev1.PersistentVolumeClaim{ + "0": { + createPvc("test-pvc-1", "0", "/path/to/mount1"), + }, + }, + existingPvcs: []*corev1.PersistentVolumeClaim{ + createPvc("test-pvc-1", "0", "/path/to/mount1"), + createPvc("test-pvc-2", "0", "/path/to/mount2"), + }, + kafkaClusterStatus: v1beta1.KafkaClusterStatus{ + BrokersState: map[string]v1beta1.BrokerState{ + "0": { + GracefulActionState: v1beta1.GracefulActionState{ + VolumeStates: map[string]v1beta1.VolumeState{ + "/path/to/mount2": { + CruiseControlVolumeState: v1beta1.GracefulDiskRemovalRunning, + }, + }, + }, + }, + }, + }, + expectedError: true, + expectedDeletePvc: false, + expectedVolumeState: map[string]v1beta1.CruiseControlVolumeState{ + "/path/to/mount2": v1beta1.GracefulDiskRemovalRunning, + }, + }, + { + testName: "If disk removal successful, do not return error and delete pvc and volume state", + brokersDesiredPvcs: map[string][]*corev1.PersistentVolumeClaim{ + "0": { + createPvc("test-pvc-1", "0", "/path/to/mount1"), + }, + }, + existingPvcs: []*corev1.PersistentVolumeClaim{ + createPvc("test-pvc-1", "0", "/path/to/mount1"), + createPvc("test-pvc-2", "0", "/path/to/mount2"), + }, + kafkaClusterStatus: v1beta1.KafkaClusterStatus{ + BrokersState: map[string]v1beta1.BrokerState{ + "0": { + GracefulActionState: v1beta1.GracefulActionState{ + VolumeStates: map[string]v1beta1.VolumeState{ + "/path/to/mount2": { + CruiseControlVolumeState: v1beta1.GracefulDiskRemovalSucceeded, + }, + }, + }, + }, + }, + }, + expectedError: false, + expectedDeletePvc: true, + expectedVolumeState: nil, + }, + { + testName: "If disk removal failed, and it is readded, mark the disk as 
rebalancing", + brokersDesiredPvcs: map[string][]*corev1.PersistentVolumeClaim{ + "0": { + createPvc("test-pvc-1", "0", "/path/to/mount1"), + createPvc("test-pvc-2", "0", "/path/to/mount2"), + }, + }, + existingPvcs: []*corev1.PersistentVolumeClaim{ + createPvc("test-pvc-1", "0", "/path/to/mount1"), + createPvc("test-pvc-2", "0", "/path/to/mount2"), + }, + kafkaClusterStatus: v1beta1.KafkaClusterStatus{ + BrokersState: map[string]v1beta1.BrokerState{ + "0": { + GracefulActionState: v1beta1.GracefulActionState{ + VolumeStates: map[string]v1beta1.VolumeState{ + "/path/to/mount2": { + CruiseControlVolumeState: v1beta1.GracefulDiskRemovalCompletedWithError, + }, + }, + }, + }, + }, + }, + expectedError: false, + expectedDeletePvc: false, + expectedVolumeState: map[string]v1beta1.CruiseControlVolumeState{ + "/path/to/mount2": v1beta1.GracefulDiskRebalanceRequired, + }, + }, + } + + for _, test := range testCases { + mockClient := new(mocks.Client) + t.Run(test.testName, func(t *testing.T) { + r := Reconciler{ + Reconciler: resources.Reconciler{ + Client: mockClient, + KafkaCluster: &v1beta1.KafkaCluster{ + ObjectMeta: metav1.ObjectMeta{ + Name: "kafka", + Namespace: "kafka", + }, + }, + }, + } + + // Set up the mockClient to return the provided test.existingPvcs + mockClient.On( + "List", + context.TODO(), + mock.IsType(&corev1.PersistentVolumeClaimList{}), + client.InNamespace("kafka"), + mock.AnythingOfType("client.MatchingLabels"), + ).Run(func(args mock.Arguments) { + arg := args.Get(1).(*corev1.PersistentVolumeClaimList) + + // Convert []*corev1.PersistentVolumeClaim to []corev1.PersistentVolumeClaim + pvcItems := make([]corev1.PersistentVolumeClaim, len(test.existingPvcs)) + for i, pvc := range test.existingPvcs { + pvcItems[i] = *pvc + } + + arg.Items = pvcItems + }).Return(nil) + + // Mock the client.Delete call + mockClient.On("Delete", context.TODO(), mock.AnythingOfType("*v1.PersistentVolumeClaim")).Return(nil) + + // Mock the status update call + 
mockClient.On("Status").Return(mockClient) + mockClient.On("Update", context.TODO(), mock.AnythingOfType("*v1beta1.KafkaCluster")).Run(func(args mock.Arguments) { + arg := args.Get(1).(*v1beta1.KafkaCluster) + r.KafkaCluster.Status = arg.Status + }).Return(nil) + + // Set up the r.KafkaCluster.Status with the provided test.kafkaClusterStatus + r.KafkaCluster.Status = test.kafkaClusterStatus + + // Call the reconcileKafkaPvc function with the provided test.brokersDesiredPvcs + err := r.reconcileKafkaPvc(context.TODO(), logf.Log, test.brokersDesiredPvcs) + + // Test that the expected error is returned + if test.expectedError { + assert.NotNil(t, err, "Expected an error but got nil") + } else { + assert.Nil(t, err, "Expected no error but got an error") + } + + // Test that PVC is deleted if expected + if test.expectedDeletePvc { + mockClient.AssertCalled(t, "Delete", context.TODO(), mock.AnythingOfType("*v1.PersistentVolumeClaim")) + } else { + mockClient.AssertNotCalled(t, "Delete", context.TODO(), mock.AnythingOfType("*v1.PersistentVolumeClaim")) + } + + // Test that the expected volume state is set + brokerState := r.KafkaCluster.Status.BrokersState["0"] + if test.expectedVolumeState != nil { + for mountPath, expectedState := range test.expectedVolumeState { + actualState, exists := brokerState.GracefulActionState.VolumeStates[mountPath] + assert.True(t, exists, "Expected volume state not found for mount path %s", mountPath) + assert.Equal(t, expectedState, actualState.CruiseControlVolumeState, "Volume state mismatch for mount path %s", mountPath) + } + } + }) + } +} + +//nolint:unparam +func createPvc(name, brokerId, mountPath string) *corev1.PersistentVolumeClaim { + return &corev1.PersistentVolumeClaim{ + ObjectMeta: metav1.ObjectMeta{ + Name: name, + Labels: map[string]string{ + v1beta1.BrokerIdLabelKey: brokerId, + }, + Annotations: map[string]string{ + "mountPath": mountPath, + }, + }, + Status: corev1.PersistentVolumeClaimStatus{ + Phase: corev1.ClaimBound, 
+ }, + } +} diff --git a/pkg/scale/scale.go b/pkg/scale/scale.go index 3e213fbde..f97d6e5b3 100644 --- a/pkg/scale/scale.go +++ b/pkg/scale/scale.go @@ -35,11 +35,12 @@ import ( const ( // Constants for the Cruise Control operations parameters // Check for more details: https://github.com/linkedin/cruise-control/wiki/REST-APIs - paramBrokerID = "brokerid" - paramExcludeDemoted = "exclude_recently_demoted_brokers" - paramExcludeRemoved = "exclude_recently_removed_brokers" - paramDestbrokerIDs = "destination_broker_ids" - paramRebalanceDisk = "rebalance_disk" + ParamBrokerID = "brokerid" + ParamExcludeDemoted = "exclude_recently_demoted_brokers" + ParamExcludeRemoved = "exclude_recently_removed_brokers" + ParamDestbrokerIDs = "destination_broker_ids" + ParamRebalanceDisk = "rebalance_disk" + ParamBrokerIDAndLogDirs = "brokerid_and_logdirs" // Cruise Control API returns NullPointerException when a broker storage capacity calculations are missing // from the Cruise Control configurations nullPointerExceptionErrString = "NullPointerException" @@ -50,21 +51,25 @@ const ( var ( newCruiseControlScaler = createNewDefaultCruiseControlScaler addBrokerSupportedParams = map[string]struct{}{ - paramBrokerID: {}, - paramExcludeDemoted: {}, - paramExcludeRemoved: {}, + ParamBrokerID: {}, + ParamExcludeDemoted: {}, + ParamExcludeRemoved: {}, } removeBrokerSupportedParams = map[string]struct{}{ - paramBrokerID: {}, - paramExcludeDemoted: {}, - paramExcludeRemoved: {}, + ParamBrokerID: {}, + ParamExcludeDemoted: {}, + ParamExcludeRemoved: {}, } rebalanceSupportedParams = map[string]struct{}{ - paramDestbrokerIDs: {}, - paramRebalanceDisk: {}, - paramExcludeDemoted: {}, - paramExcludeRemoved: {}, - } + ParamDestbrokerIDs: {}, + ParamRebalanceDisk: {}, + ParamExcludeDemoted: {}, + ParamExcludeRemoved: {}, + } + // TODO use this map to validate the parameters + //removeDisksSupportedParams = map[string]struct{}{ + // ParamBrokerIDAndLogDirs: {}, + //} ) func ScaleFactoryFn() func(ctx 
context.Context, kafkaCluster *v1beta1.KafkaCluster) (CruiseControlScaler, error) { @@ -205,19 +210,19 @@ func (cc *cruiseControlScaler) AddBrokersWithParams(ctx context.Context, params for param, pvalue := range params { if _, ok := addBrokerSupportedParams[param]; ok { switch param { - case paramBrokerID: + case ParamBrokerID: ret, err := parseBrokerIDtoSlice(pvalue) if err != nil { return nil, err } addBrokerReq.BrokerIDs = ret - case paramExcludeDemoted: + case ParamExcludeDemoted: ret, err := strconv.ParseBool(pvalue) if err != nil { return nil, err } addBrokerReq.ExcludeRecentlyDemotedBrokers = ret - case paramExcludeRemoved: + case ParamExcludeRemoved: ret, err := strconv.ParseBool(pvalue) if err != nil { return nil, err @@ -282,19 +287,19 @@ func (cc *cruiseControlScaler) RemoveBrokersWithParams(ctx context.Context, para for param, pvalue := range params { if _, ok := removeBrokerSupportedParams[param]; ok { switch param { - case paramBrokerID: + case ParamBrokerID: ret, err := parseBrokerIDtoSlice(pvalue) if err != nil { return nil, err } rmBrokerReq.BrokerIDs = ret - case paramExcludeDemoted: + case ParamExcludeDemoted: ret, err := strconv.ParseBool(pvalue) if err != nil { return nil, err } rmBrokerReq.ExcludeRecentlyDemotedBrokers = ret - case paramExcludeRemoved: + case ParamExcludeRemoved: ret, err := strconv.ParseBool(pvalue) if err != nil { return nil, err @@ -464,25 +469,25 @@ func (cc *cruiseControlScaler) RebalanceWithParams(ctx context.Context, params m for param, pvalue := range params { if _, ok := rebalanceSupportedParams[param]; ok { switch param { - case paramDestbrokerIDs: + case ParamDestbrokerIDs: ret, err := parseBrokerIDtoSlice(pvalue) if err != nil { return nil, err } rebalanceReq.DestinationBrokerIDs = ret - case paramRebalanceDisk: + case ParamRebalanceDisk: ret, err := strconv.ParseBool(pvalue) if err != nil { return nil, err } rebalanceReq.RebalanceDisk = ret - case paramExcludeDemoted: + case ParamExcludeDemoted: ret, err := 
strconv.ParseBool(pvalue) if err != nil { return nil, err } rebalanceReq.ExcludeRecentlyDemotedBrokers = ret - case paramExcludeRemoved: + case ParamExcludeRemoved: ret, err := strconv.ParseBool(pvalue) if err != nil { return nil, err @@ -516,6 +521,55 @@ func (cc *cruiseControlScaler) RebalanceWithParams(ctx context.Context, params m }, nil } +func (cc *cruiseControlScaler) RemoveDisksWithParams(ctx context.Context, params map[string]string) (*Result, error) { + // TODO uncomment code below once go-cruise-control supports remove disk + /* + rmDiskReq := &api.RemoveDiskRequest{ + // TODO + } + + for param, pvalue := range params { + if _, ok := removeDisksSupportedParams[param]; ok { + switch param { + case paramBrokerIDAndLogDirs: + // TODO + default: + return nil, fmt.Errorf("unsupported %s parameter: %s, supported parameters: %s", v1alpha1.OperationRemoveDisk, param, removeDiskSupportedParams) + } + } + } + + rmDiskResp, err := cc.client.RemoveDisks(rmDiskReq) + if err != nil { + return &Result{ + TaskID: rmDiskResp.TaskID, + StartedAt: rmDiskResp.Date, + ResponseStatusCode: rmDiskResp.StatusCode, + RequestURL: rmDiskResp.RequestURL, + State: v1beta1.CruiseControlTaskCompletedWithError, + Err: err, + }, err + } + + return &Result{ + TaskID: rmDiskResp.TaskID, + StartedAt: rmDiskResp.Date, + ResponseStatusCode: rmDiskResp.StatusCode, + RequestURL: rmDiskResp.RequestURL, + Result: rmDiskResp.Result, + State: v1beta1.CruiseControlTaskActive, + }, nil + */ + + return &Result{ + State: v1beta1.CruiseControlTaskCompleted, + TaskID: "15062281-b604-4f52-b465-fc8f8ff94d09", + StartedAt: "Mon, 02 Jan 2006 15:04:05 MST", + ResponseStatusCode: 200, + RequestURL: "http://kafka-cruisecontrol-svc.kafka.svc.cluster.local:8090", + }, nil +} + func (cc *cruiseControlScaler) KafkaClusterLoad(ctx context.Context) (*api.KafkaClusterLoadResponse, error) { clusterLoadResp, err := cc.client.KafkaClusterLoad(ctx, api.KafkaClusterLoadRequestWithDefaults()) if err != nil { diff --git 
a/pkg/scale/types.go b/pkg/scale/types.go index 79b1ca188..700623a27 100644 --- a/pkg/scale/types.go +++ b/pkg/scale/types.go @@ -34,6 +34,7 @@ type CruiseControlScaler interface { RebalanceWithParams(ctx context.Context, params map[string]string) (*Result, error) StopExecution(ctx context.Context) (*Result, error) RemoveBrokers(ctx context.Context, brokerIDs ...string) (*Result, error) + RemoveDisksWithParams(ctx context.Context, params map[string]string) (*Result, error) RebalanceDisks(ctx context.Context, brokerIDs ...string) (*Result, error) BrokersWithState(ctx context.Context, states ...KafkaBrokerState) ([]string, error) KafkaClusterState(ctx context.Context) (*types.KafkaClusterState, error) diff --git a/pkg/webhooks/kafkacluster_validator.go b/pkg/webhooks/kafkacluster_validator.go index d2af7a5fa..906d0f3eb 100644 --- a/pkg/webhooks/kafkacluster_validator.go +++ b/pkg/webhooks/kafkacluster_validator.go @@ -18,8 +18,6 @@ import ( "context" "fmt" - "emperror.dev/errors" - "golang.org/x/exp/slices" apierrors "k8s.io/apimachinery/pkg/api/errors" "k8s.io/apimachinery/pkg/runtime" "k8s.io/apimachinery/pkg/util/validation/field" @@ -27,7 +25,6 @@ import ( "github.com/go-logr/logr" banzaicloudv1beta1 "github.com/banzaicloud/koperator/api/v1beta1" - "github.com/banzaicloud/koperator/pkg/util" ) type KafkaClusterValidator struct { @@ -36,19 +33,9 @@ type KafkaClusterValidator struct { func (s KafkaClusterValidator) ValidateUpdate(ctx context.Context, oldObj, newObj runtime.Object) error { var allErrs field.ErrorList - kafkaClusterOld := oldObj.(*banzaicloudv1beta1.KafkaCluster) kafkaClusterNew := newObj.(*banzaicloudv1beta1.KafkaCluster) log := s.Log.WithValues("name", kafkaClusterNew.GetName(), "namespace", kafkaClusterNew.GetNamespace()) - fieldErr, err := checkBrokerStorageRemoval(&kafkaClusterOld.Spec, &kafkaClusterNew.Spec) - if err != nil { - log.Error(err, errorDuringValidationMsg) - return apierrors.NewInternalError(errors.WithMessage(err, 
errorDuringValidationMsg)) - } - if fieldErr != nil { - allErrs = append(allErrs, fieldErr) - } - listenerErrs := checkInternalAndExternalListeners(&kafkaClusterNew.Spec) if listenerErrs != nil { allErrs = append(allErrs, listenerErrs...) @@ -88,76 +75,6 @@ func (s KafkaClusterValidator) ValidateDelete(ctx context.Context, obj runtime.O return nil } -// checkBrokerStorageRemoval checks if there is any broker storage which has been removed. If yes, admission will be rejected -func checkBrokerStorageRemoval(kafkaClusterSpecOld, kafkaClusterSpecNew *banzaicloudv1beta1.KafkaClusterSpec) (*field.Error, error) { - for j := range kafkaClusterSpecOld.Brokers { - brokerOld := &kafkaClusterSpecOld.Brokers[j] - for k := range kafkaClusterSpecNew.Brokers { - brokerNew := &kafkaClusterSpecNew.Brokers[k] - if brokerOld.Id == brokerNew.Id { - brokerConfigsOld, err := brokerOld.GetBrokerConfig(*kafkaClusterSpecOld) - if err != nil { - return nil, err - } - // checking broukerConfigGroup existence - if brokerNew.BrokerConfigGroup != "" { - if _, exists := kafkaClusterSpecNew.BrokerConfigGroups[brokerNew.BrokerConfigGroup]; !exists { - return field.Invalid(field.NewPath("spec").Child("brokers").Index(int(brokerNew.Id)).Child("brokerConfigGroup"), brokerNew.BrokerConfigGroup, unsupportedRemovingStorageMsg+", provided brokerConfigGroup not found"), nil - } - } - brokerConfigsNew, err := brokerNew.GetBrokerConfig(*kafkaClusterSpecNew) - if err != nil { - return nil, err - } - for e := range brokerConfigsOld.StorageConfigs { - storageConfigOld := &brokerConfigsOld.StorageConfigs[e] - isStorageFound := false - - for f := range brokerConfigsNew.StorageConfigs { - storageConfigNew := &brokerConfigsNew.StorageConfigs[f] - if storageConfigOld.MountPath == storageConfigNew.MountPath { - isStorageFound = true - break - } - } - if !isStorageFound { - fromConfigGroup := getMissingMounthPathLocation(storageConfigOld.MountPath, kafkaClusterSpecOld, int32(k)) - if fromConfigGroup != nil && 
*fromConfigGroup { - return field.Invalid(field.NewPath("spec").Child("brokers").Index(k).Child("brokerConfigGroup"), brokerNew.BrokerConfigGroup, fmt.Sprintf("%s, missing storageConfig mounthPath: %s", unsupportedRemovingStorageMsg, storageConfigOld.MountPath)), nil - } - return field.NotFound(field.NewPath("spec").Child("brokers").Index(k).Child("storageConfig").Index(e), storageConfigOld.MountPath+", "+unsupportedRemovingStorageMsg), nil - } - } - } - } - } - return nil, nil -} -func getMissingMounthPathLocation(mounthPath string, kafkaClusterSpec *banzaicloudv1beta1.KafkaClusterSpec, brokerId int32) (fromConfigGroup *bool) { - if brokerId < 0 || int(brokerId) >= len(kafkaClusterSpec.Brokers) { - return nil - } - - brokerConfigGroup := kafkaClusterSpec.Brokers[brokerId].BrokerConfigGroup - brokerConfigs, ok := kafkaClusterSpec.BrokerConfigGroups[brokerConfigGroup] - if !ok { - fromConfigGroup = util.BoolPointer(true) - } - idx := slices.IndexFunc(brokerConfigs.StorageConfigs, func(c banzaicloudv1beta1.StorageConfig) bool { return c.MountPath == mounthPath }) - if idx != -1 { - fromConfigGroup = util.BoolPointer(true) - } - - perBrokerConfigs := kafkaClusterSpec.Brokers[brokerId].BrokerConfig - if perBrokerConfigs != nil { - idx := slices.IndexFunc(perBrokerConfigs.StorageConfigs, func(c banzaicloudv1beta1.StorageConfig) bool { return c.MountPath == mounthPath }) - if idx != -1 { - fromConfigGroup = util.BoolPointer(false) - } - } - return fromConfigGroup -} - // checkListeners validates the spec.listenersConfig object func checkInternalAndExternalListeners(kafkaClusterSpec *banzaicloudv1beta1.KafkaClusterSpec) field.ErrorList { var allErrs field.ErrorList diff --git a/pkg/webhooks/kafkacluster_validator_test.go b/pkg/webhooks/kafkacluster_validator_test.go index 927d1f388..63393f5bb 100644 --- a/pkg/webhooks/kafkacluster_validator_test.go +++ b/pkg/webhooks/kafkacluster_validator_test.go @@ -24,473 +24,6 @@ import ( 
"k8s.io/apimachinery/pkg/util/validation/field" ) -// nolint: funlen -func TestCheckBrokerStorageRemoval(t *testing.T) { - testCases := []struct { - testName string - kafkaClusterSpecNew v1beta1.KafkaClusterSpec - kafkaClusterSpecOld v1beta1.KafkaClusterSpec - isValid bool - }{ - { - testName: "there is no storage remove", - kafkaClusterSpecNew: v1beta1.KafkaClusterSpec{ - BrokerConfigGroups: map[string]v1beta1.BrokerConfig{ - "default": { - StorageConfigs: []v1beta1.StorageConfig{ - {MountPath: "logs1"}, - {MountPath: "logs2"}, - {MountPath: "logs3"}, - }, - }, - }, - Brokers: []v1beta1.Broker{ - { - Id: 1, - BrokerConfigGroup: "default", - }, - }, - }, - kafkaClusterSpecOld: v1beta1.KafkaClusterSpec{ - BrokerConfigGroups: map[string]v1beta1.BrokerConfig{ - "default": { - StorageConfigs: []v1beta1.StorageConfig{ - {MountPath: "logs1"}, - {MountPath: "logs2"}, - {MountPath: "logs3"}, - }, - }, - }, - Brokers: []v1beta1.Broker{ - { - Id: 1, - BrokerConfigGroup: "default", - }, - }, - }, - isValid: true, - }, - { - testName: "there is no storage remove but there is broker remove", - kafkaClusterSpecNew: v1beta1.KafkaClusterSpec{ - BrokerConfigGroups: map[string]v1beta1.BrokerConfig{ - "default": { - StorageConfigs: []v1beta1.StorageConfig{ - {MountPath: "logs1"}, - {MountPath: "logs2"}, - {MountPath: "logs3"}, - }, - }, - }, - Brokers: []v1beta1.Broker{ - { - Id: 1, - BrokerConfigGroup: "default", - }, - }, - }, - kafkaClusterSpecOld: v1beta1.KafkaClusterSpec{ - BrokerConfigGroups: map[string]v1beta1.BrokerConfig{ - "default": { - StorageConfigs: []v1beta1.StorageConfig{ - {MountPath: "logs1"}, - {MountPath: "logs2"}, - {MountPath: "logs3"}, - }, - }, - }, - Brokers: []v1beta1.Broker{ - { - Id: 1, - BrokerConfigGroup: "default", - }, - { - Id: 2, - BrokerConfigGroup: "default", - }, - }, - }, - isValid: true, - }, - { - testName: "when there is storage remove but there is broker remove also", - kafkaClusterSpecNew: v1beta1.KafkaClusterSpec{ - BrokerConfigGroups: 
map[string]v1beta1.BrokerConfig{ - "default": { - StorageConfigs: []v1beta1.StorageConfig{ - {MountPath: "logs1"}, - {MountPath: "logs2"}, - {MountPath: "logs3"}, - }, - }, - }, - Brokers: []v1beta1.Broker{ - { - Id: 1, - BrokerConfigGroup: "default", - }, - }, - }, - kafkaClusterSpecOld: v1beta1.KafkaClusterSpec{ - BrokerConfigGroups: map[string]v1beta1.BrokerConfig{ - "default": { - StorageConfigs: []v1beta1.StorageConfig{ - {MountPath: "logs1"}, - {MountPath: "logs2"}, - {MountPath: "logs3"}, - }, - }, - }, - Brokers: []v1beta1.Broker{ - { - Id: 1, - BrokerConfigGroup: "default", - }, - { - Id: 2, - BrokerConfigGroup: "default", - BrokerConfig: &v1beta1.BrokerConfig{ - StorageConfigs: []v1beta1.StorageConfig{ - {MountPath: "logs4"}, - {MountPath: "logs5"}, - {MountPath: "logs6"}, - }, - }, - }, - }, - }, - isValid: true, - }, - { - testName: "when there is storage remove from another brokerConfigBroup", - kafkaClusterSpecNew: v1beta1.KafkaClusterSpec{ - BrokerConfigGroups: map[string]v1beta1.BrokerConfig{ - "default": { - StorageConfigs: []v1beta1.StorageConfig{ - {MountPath: "logs1"}, - {MountPath: "logs2"}, - {MountPath: "logs3"}, - }, - }, - "default2": { - StorageConfigs: []v1beta1.StorageConfig{ - // v1beta1.StorageConfig{MountPath: "logs1"}, - {MountPath: "logs2"}, - {MountPath: "logs3"}, - }, - }, - }, - Brokers: []v1beta1.Broker{ - { - Id: 1, - BrokerConfigGroup: "default2", - }, - }, - }, - kafkaClusterSpecOld: v1beta1.KafkaClusterSpec{ - BrokerConfigGroups: map[string]v1beta1.BrokerConfig{ - "default": { - StorageConfigs: []v1beta1.StorageConfig{ - {MountPath: "logs1"}, - {MountPath: "logs2"}, - {MountPath: "logs3"}, - }, - }, - }, - Brokers: []v1beta1.Broker{ - { - Id: 1, - BrokerConfigGroup: "default", - }, - }, - }, - isValid: false, - }, - { - testName: "when there is storage remove", - kafkaClusterSpecNew: v1beta1.KafkaClusterSpec{ - BrokerConfigGroups: map[string]v1beta1.BrokerConfig{ - "default": { - StorageConfigs: []v1beta1.StorageConfig{ - 
//v1beta1.StorageConfig{MountPath: "logs1"}, - {MountPath: "logs2"}, - {MountPath: "logs3"}, - }, - }, - }, - Brokers: []v1beta1.Broker{ - { - Id: 1, - BrokerConfigGroup: "default", - }, - }, - }, - kafkaClusterSpecOld: v1beta1.KafkaClusterSpec{ - BrokerConfigGroups: map[string]v1beta1.BrokerConfig{ - "default": { - StorageConfigs: []v1beta1.StorageConfig{ - {MountPath: "logs1"}, - {MountPath: "logs2"}, - {MountPath: "logs3"}, - }, - }, - }, - Brokers: []v1beta1.Broker{ - { - Id: 1, - BrokerConfigGroup: "default", - }, - }, - }, - isValid: false, - }, - { - testName: "when added a new one", - kafkaClusterSpecNew: v1beta1.KafkaClusterSpec{ - BrokerConfigGroups: map[string]v1beta1.BrokerConfig{ - "default": { - StorageConfigs: []v1beta1.StorageConfig{ - {MountPath: "logs1"}, - {MountPath: "logs2"}, - {MountPath: "logs3"}, - }, - }, - }, - Brokers: []v1beta1.Broker{ - { - Id: 1, - BrokerConfigGroup: "default", - }, - }, - }, - kafkaClusterSpecOld: v1beta1.KafkaClusterSpec{ - BrokerConfigGroups: map[string]v1beta1.BrokerConfig{ - "default": { - StorageConfigs: []v1beta1.StorageConfig{ - //v1beta1.StorageConfig{MountPath: "logs1"}, - {MountPath: "logs2"}, - {MountPath: "logs3"}, - }, - }, - }, - Brokers: []v1beta1.Broker{ - { - Id: 1, - BrokerConfigGroup: "default", - }, - }, - }, - isValid: true, - }, - { - testName: "when only sequence has changed", - kafkaClusterSpecNew: v1beta1.KafkaClusterSpec{ - BrokerConfigGroups: map[string]v1beta1.BrokerConfig{ - "default": { - StorageConfigs: []v1beta1.StorageConfig{ - {MountPath: "logs3"}, - {MountPath: "logs2"}, - {MountPath: "logs1"}, - }, - }, - }, - Brokers: []v1beta1.Broker{ - { - Id: 1, - BrokerConfigGroup: "default", - }, - }, - }, - kafkaClusterSpecOld: v1beta1.KafkaClusterSpec{ - BrokerConfigGroups: map[string]v1beta1.BrokerConfig{ - "default": { - StorageConfigs: []v1beta1.StorageConfig{ - {MountPath: "logs1"}, - {MountPath: "logs2"}, - {MountPath: "logs3"}, - }, - }, - }, - Brokers: []v1beta1.Broker{ - { - Id: 1, - 
BrokerConfigGroup: "default", - }, - }, - }, - isValid: true, - }, - { - testName: "when there is perBroker storageconfigs and there is no storage remove", - kafkaClusterSpecNew: v1beta1.KafkaClusterSpec{ - BrokerConfigGroups: map[string]v1beta1.BrokerConfig{ - "default": { - StorageConfigs: []v1beta1.StorageConfig{ - {MountPath: "logs1"}, - {MountPath: "logs2"}, - {MountPath: "logs3"}, - }, - }, - }, - Brokers: []v1beta1.Broker{ - { - Id: 1, - BrokerConfigGroup: "default", - BrokerConfig: &v1beta1.BrokerConfig{ - StorageConfigs: []v1beta1.StorageConfig{ - {MountPath: "logs4"}, - {MountPath: "logs5"}, - {MountPath: "logs6"}, - }, - }, - }, - }, - }, - kafkaClusterSpecOld: v1beta1.KafkaClusterSpec{ - BrokerConfigGroups: map[string]v1beta1.BrokerConfig{ - "default": { - StorageConfigs: []v1beta1.StorageConfig{ - {MountPath: "logs1"}, - {MountPath: "logs2"}, - {MountPath: "logs3"}, - }, - }, - }, - Brokers: []v1beta1.Broker{ - { - Id: 1, - BrokerConfigGroup: "default", - BrokerConfig: &v1beta1.BrokerConfig{ - StorageConfigs: []v1beta1.StorageConfig{ - {MountPath: "logs4"}, - {MountPath: "logs5"}, - {MountPath: "logs6"}, - }, - }, - }, - }, - }, - isValid: true, - }, - { - testName: "when there is perBroker config and added new and removed old", - kafkaClusterSpecNew: v1beta1.KafkaClusterSpec{ - BrokerConfigGroups: map[string]v1beta1.BrokerConfig{ - "default": { - StorageConfigs: []v1beta1.StorageConfig{ - {MountPath: "logs1"}, - {MountPath: "logs2"}, - {MountPath: "logs3"}, - }, - }, - }, - Brokers: []v1beta1.Broker{ - { - Id: 1, - BrokerConfigGroup: "default", - BrokerConfig: &v1beta1.BrokerConfig{ - StorageConfigs: []v1beta1.StorageConfig{ - {MountPath: "logs4"}, - {MountPath: "logs5"}, - {MountPath: "logs6"}, - }, - }, - }, - }, - }, - kafkaClusterSpecOld: v1beta1.KafkaClusterSpec{ - BrokerConfigGroups: map[string]v1beta1.BrokerConfig{ - "default": { - StorageConfigs: []v1beta1.StorageConfig{ - {MountPath: "logs1"}, - {MountPath: "logs2"}, - {MountPath: "logs3"}, - 
}, - }, - }, - Brokers: []v1beta1.Broker{ - { - Id: 1, - BrokerConfigGroup: "default", - BrokerConfig: &v1beta1.BrokerConfig{ - StorageConfigs: []v1beta1.StorageConfig{ - {MountPath: "logs4"}, - {MountPath: "logs5"}, - {MountPath: "logs8"}, - }, - }, - }, - }, - }, - isValid: false, - }, - { - testName: "when there is no such brokerConfigGroup", - kafkaClusterSpecNew: v1beta1.KafkaClusterSpec{ - BrokerConfigGroups: map[string]v1beta1.BrokerConfig{ - "default": { - StorageConfigs: []v1beta1.StorageConfig{ - {MountPath: "logs1"}, - {MountPath: "logs2"}, - {MountPath: "logs3"}, - }, - }, - }, - Brokers: []v1beta1.Broker{ - { - Id: 1, - BrokerConfigGroup: "notExists", - BrokerConfig: &v1beta1.BrokerConfig{ - StorageConfigs: []v1beta1.StorageConfig{ - {MountPath: "logs4"}, - {MountPath: "logs5"}, - {MountPath: "logs6"}, - }, - }, - }, - }, - }, - kafkaClusterSpecOld: v1beta1.KafkaClusterSpec{ - BrokerConfigGroups: map[string]v1beta1.BrokerConfig{ - "default": { - StorageConfigs: []v1beta1.StorageConfig{ - {MountPath: "logs1"}, - {MountPath: "logs2"}, - {MountPath: "logs3"}, - }, - }, - }, - Brokers: []v1beta1.Broker{ - { - Id: 1, - BrokerConfigGroup: "default", - BrokerConfig: &v1beta1.BrokerConfig{ - StorageConfigs: []v1beta1.StorageConfig{ - {MountPath: "logs4"}, - {MountPath: "logs5"}, - {MountPath: "logs8"}, - }, - }, - }, - }, - }, - isValid: false, - }, - } - - for _, testCase := range testCases { - res, err := checkBrokerStorageRemoval(&testCase.kafkaClusterSpecOld, &testCase.kafkaClusterSpecNew) - if err != nil { - t.Errorf("testName: %s, err should be nil, got %s", testCase.testName, err) - } - if res != nil && testCase.isValid { - t.Errorf("Message: %s, testName: %s", res.Error(), testCase.testName) - } else if res == nil && !testCase.isValid { - t.Errorf("there should be storage removal, testName: %s", testCase.testName) - } - } -} - func TestCheckUniqueListenerContainerPort(t *testing.T) { testCases := []struct { testName string From 
971cd921f802bffd117dccdc61a25743333144e2 Mon Sep 17 00:00:00 2001 From: Alex Necula Date: Wed, 19 Apr 2023 18:00:12 +0300 Subject: [PATCH 2/7] Add local replacement for the go-cruise-control lib --- go.mod | 2 +- go.sum | 6 ++---- 2 files changed, 3 insertions(+), 5 deletions(-) diff --git a/go.mod b/go.mod index e30dab480..ae1653a3e 100644 --- a/go.mod +++ b/go.mod @@ -24,7 +24,6 @@ require ( github.com/prometheus/common v0.37.0 github.com/stretchr/testify v1.8.0 go.uber.org/zap v1.23.0 - golang.org/x/exp v0.0.0-20220827204233-334a2380cb91 google.golang.org/protobuf v1.28.1 gopkg.in/inf.v0 v0.9.1 gotest.tools v2.2.0+incompatible @@ -140,6 +139,7 @@ require ( ) replace ( + github.com/banzaicloud/go-cruise-control => github.com/alex-necula/go-cruise-control v0.6.0-rc5 // TODO this will be replaced with adobe fork once it's created github.com/banzaicloud/koperator/api => ./api github.com/banzaicloud/koperator/properties => ./properties github.com/gogo/protobuf => github.com/waynz0r/protobuf v1.3.3-0.20210811122234-64636cae0910 diff --git a/go.sum b/go.sum index 0d6de2a9f..4611fcc4f 100644 --- a/go.sum +++ b/go.sum @@ -87,13 +87,13 @@ github.com/alecthomas/template v0.0.0-20190718012654-fb15b899a751/go.mod h1:LOuy github.com/alecthomas/units v0.0.0-20151022065526-2efee857e7cf/go.mod h1:ybxpYRFXyAe+OPACYpWeL0wqObRcbAqCMya13uyzqw0= github.com/alecthomas/units v0.0.0-20190717042225-c3de453c63f4/go.mod h1:ybxpYRFXyAe+OPACYpWeL0wqObRcbAqCMya13uyzqw0= github.com/alecthomas/units v0.0.0-20190924025748-f65c72e2690d/go.mod h1:rBZYJk541a8SKzHPHnH3zbiI+7dagKZ0cgpgrD7Fyho= +github.com/alex-necula/go-cruise-control v0.6.0-rc5 h1:CP+oohpCSF+ApvJt2U8LLTgmyi9dM7InDhofr6bSnus= +github.com/alex-necula/go-cruise-control v0.6.0-rc5/go.mod h1:6pfmzhD23At4/QV2capmSquv5hkyDUSDAqjNMoS2+70= github.com/antihax/optional v1.0.0/go.mod h1:uupD/76wgC+ih3iEmQUL+0Ugr19nfwCT1kdvxnR2qWY= github.com/armon/circbuf v0.0.0-20150827004946-bbbad097214e/go.mod h1:3U/XgcO3hCbHZ8TKRvWD2dDTCfh9M9ya+I9JpbB7O8o= 
github.com/armon/go-metrics v0.0.0-20180917152333-f0300d1749da/go.mod h1:Q73ZrmVTwzkszR9V5SSuryQ31EELlFMUz1kKyl939pY= github.com/armon/go-radix v0.0.0-20180808171621-7fddfc383310/go.mod h1:ufUuZ+zHj4x4TnLV4JWEpy2hxWSpsRywHrMgIH9cCH8= github.com/asaskevich/govalidator v0.0.0-20190424111038-f61b66f89f4a/go.mod h1:lB+ZfQJz7igIIfQNfa7Ml4HSf2uFQQRzpGGRXenZAgY= -github.com/banzaicloud/go-cruise-control v0.4.0 h1:+RH5D6k+SVe99Hqmbaw2LkHtS1ySRU9nprjai9Z+Ipk= -github.com/banzaicloud/go-cruise-control v0.4.0/go.mod h1:6pfmzhD23At4/QV2capmSquv5hkyDUSDAqjNMoS2+70= github.com/banzaicloud/istio-client-go v0.0.17 h1:wiplbM7FDiIHopujInAnin3zuovtVcphtKy9En39q5I= github.com/banzaicloud/istio-client-go v0.0.17/go.mod h1:rpnEYYGHzisx8nARl2d30Oq38EeCX0/PPaxMaREfE9I= github.com/banzaicloud/istio-operator/api/v2 v2.15.1 h1:BZg8COvoOJtfx/dgN7KpoOnce0LxDrElNHbvxNySs6g= @@ -711,8 +711,6 @@ golang.org/x/exp v0.0.0-20191227195350-da58074b4299/go.mod h1:2RIsYlXP63K8oxa1u0 golang.org/x/exp v0.0.0-20200119233911-0405dc783f0a/go.mod h1:2RIsYlXP63K8oxa1u096TMicItID8zy7Y6sNkU49FU4= golang.org/x/exp v0.0.0-20200207192155-f17229e696bd/go.mod h1:J/WKrq2StrnmMY6+EHIKF9dgMWnmCNThgcyBT1FY9mM= golang.org/x/exp v0.0.0-20200224162631-6cc2880d07d6/go.mod h1:3jZMyOhIsHpP37uCMkUooju7aAi5cS1Q23tOzKc+0MU= -golang.org/x/exp v0.0.0-20220827204233-334a2380cb91 h1:tnebWN09GYg9OLPss1KXj8txwZc6X6uMr6VFdcGNbHw= -golang.org/x/exp v0.0.0-20220827204233-334a2380cb91/go.mod h1:cyybsKvd6eL0RnXn6p/Grxp8F5bW7iYuBgsNCOHpMYE= golang.org/x/image v0.0.0-20190227222117-0694c2d4d067/go.mod h1:kZ7UVZpmo3dzQBMxlp+ypCbDeSB+sBbTgSJuh5dn5js= golang.org/x/image v0.0.0-20190802002840-cff245a6509b/go.mod h1:FeLwcggjj3mMvU+oOTbSwawSJRM1uh48EjtB4UJZlP0= golang.org/x/lint v0.0.0-20181026193005-c67002cb31c3/go.mod h1:UVdnD1Gm6xHRNCYTkRU2/jEulfH38KcIWyp/GAMgvoE= From b34ed065207ddcf069dca158b32238d14d7d9706 Mon Sep 17 00:00:00 2001 From: Alex Necula Date: Thu, 20 Apr 2023 08:21:07 +0300 Subject: [PATCH 3/7] Update scale package --- 
pkg/scale/scale.go | 105 +++++++++++++++++++++++++--------------- pkg/scale/scale_test.go | 70 +++++++++++++++++++++++++++ 2 files changed, 135 insertions(+), 40 deletions(-) create mode 100644 pkg/scale/scale_test.go diff --git a/pkg/scale/scale.go b/pkg/scale/scale.go index f97d6e5b3..d2d4b39b4 100644 --- a/pkg/scale/scale.go +++ b/pkg/scale/scale.go @@ -66,10 +66,9 @@ var ( ParamExcludeDemoted: {}, ParamExcludeRemoved: {}, } - // TODO use this map to validate the parameters - //removeDisksSupportedParams = map[string]struct{}{ - // ParamBrokerIDAndLogDirs: {}, - //} + removeDisksSupportedParams = map[string]struct{}{ + ParamBrokerIDAndLogDirs: {}, + } ) func ScaleFactoryFn() func(ctx context.Context, kafkaCluster *v1beta1.KafkaCluster) (CruiseControlScaler, error) { @@ -522,54 +521,80 @@ func (cc *cruiseControlScaler) RebalanceWithParams(ctx context.Context, params m } func (cc *cruiseControlScaler) RemoveDisksWithParams(ctx context.Context, params map[string]string) (*Result, error) { - // TODO uncomment code below once go-cruise-control supports remove disk - /* - rmDiskReq := &api.RemoveDiskRequest{ - // TODO - } + removeReq := &api.RemoveDisksRequest{} - for param, pvalue := range params { - if _, ok := removeDisksSupportedParams[param]; ok { - switch param { - case paramBrokerIDAndLogDirs: - // TODO - default: - return nil, fmt.Errorf("unsupported %s parameter: %s, supported parameters: %s", v1alpha1.OperationRemoveDisk, param, removeDiskSupportedParams) + for param, pvalue := range params { + if _, ok := removeDisksSupportedParams[param]; ok { + switch param { + case ParamBrokerIDAndLogDirs: + ret, err := parseBrokerIDsAndLogDirsToMap(pvalue) + if err != nil { + return nil, err } + removeReq.BrokerIDAndLogDirs = ret + default: + return nil, fmt.Errorf("unsupported %s parameter: %s, supported parameters: %s", v1alpha1.OperationRemoveDisks, param, removeDisksSupportedParams) } } + } - rmDiskResp, err := cc.client.RemoveDisks(rmDiskReq) - if err != nil { - 
return &Result{ - TaskID: rmDiskResp.TaskID, - StartedAt: rmDiskResp.Date, - ResponseStatusCode: rmDiskResp.StatusCode, - RequestURL: rmDiskResp.RequestURL, - State: v1beta1.CruiseControlTaskCompletedWithError, - Err: err, - }, err - } - + if len(removeReq.BrokerIDAndLogDirs) == 0 { return &Result{ - TaskID: rmDiskResp.TaskID, - StartedAt: rmDiskResp.Date, - ResponseStatusCode: rmDiskResp.StatusCode, - RequestURL: rmDiskResp.RequestURL, - Result: rmDiskResp.Result, - State: v1beta1.CruiseControlTaskActive, + State: v1beta1.CruiseControlTaskCompleted, }, nil - */ + } + + removeResp, err := cc.client.RemoveDisks(ctx, removeReq) + if err != nil { + return &Result{ + TaskID: removeResp.TaskID, + StartedAt: removeResp.Date, + ResponseStatusCode: removeResp.StatusCode, + RequestURL: removeResp.RequestURL, + State: v1beta1.CruiseControlTaskCompletedWithError, + Err: err, + }, err + } return &Result{ - State: v1beta1.CruiseControlTaskCompleted, - TaskID: "15062281-b604-4f52-b465-fc8f8ff94d09", - StartedAt: "Mon, 02 Jan 2006 15:04:05 MST", - ResponseStatusCode: 200, - RequestURL: "http://kafka-cruisecontrol-svc.kafka.svc.cluster.local:8090", + TaskID: removeResp.TaskID, + StartedAt: removeResp.Date, + ResponseStatusCode: removeResp.StatusCode, + RequestURL: removeResp.RequestURL, + Result: removeResp.Result, + State: v1beta1.CruiseControlTaskActive, }, nil } +func parseBrokerIDsAndLogDirsToMap(brokerIDsAndLogDirs string) (map[int32][]string, error) { + // brokerIDsAndLogDirs format: brokerID1-logDir1,brokerID2-logDir2,brokerID1-logDir3 + brokerIDLogDirMap := make(map[int32][]string) + + if len(brokerIDsAndLogDirs) == 0 { + return brokerIDLogDirMap, nil + } + + pairs := strings.Split(brokerIDsAndLogDirs, ",") + for _, pair := range pairs { + components := strings.SplitN(pair, "-", 2) + if len(components) != 2 { + return nil, errors.New("invalid format for brokerIDsAndLogDirs") + } + + brokerID, err := strconv.ParseInt(components[0], 10, 32) + if err != nil { + return nil, 
errors.New("invalid broker ID") + } + + logDir := components[1] + + // Add logDir to the corresponding brokerID's list + brokerIDLogDirMap[int32(brokerID)] = append(brokerIDLogDirMap[int32(brokerID)], logDir) + } + + return brokerIDLogDirMap, nil +} + func (cc *cruiseControlScaler) KafkaClusterLoad(ctx context.Context) (*api.KafkaClusterLoadResponse, error) { clusterLoadResp, err := cc.client.KafkaClusterLoad(ctx, api.KafkaClusterLoadRequestWithDefaults()) if err != nil { diff --git a/pkg/scale/scale_test.go b/pkg/scale/scale_test.go new file mode 100644 index 000000000..ccaf6ad0c --- /dev/null +++ b/pkg/scale/scale_test.go @@ -0,0 +1,70 @@ +// Copyright © 2023 Cisco Systems, Inc. and/or its affiliates +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. 
+ +package scale + +import ( + "testing" + + "github.com/stretchr/testify/require" +) + +func TestParseBrokerIDsAndLogDirToMap(t *testing.T) { + testCases := []struct { + testName string + brokerIDsAndLogDirs string + want map[int32][]string + wantErr bool + }{ + { + testName: "valid input", + brokerIDsAndLogDirs: "102-/kafka-logs3/kafka,101-/kafka-logs3/kafka,101-/kafka-logs2/kafka", + want: map[int32][]string{ + 101: {"/kafka-logs3/kafka", "/kafka-logs2/kafka"}, + 102: {"/kafka-logs3/kafka"}, + }, + wantErr: false, + }, + { + testName: "empty input", + brokerIDsAndLogDirs: "", + want: map[int32][]string{}, + wantErr: false, + }, + { + testName: "invalid format", + brokerIDsAndLogDirs: "1-dirA,2-dirB,1", + want: nil, + wantErr: true, + }, + { + testName: "invalid broker ID", + brokerIDsAndLogDirs: "1-dirA,abc-dirB,1-dirC", + want: nil, + wantErr: true, + }, + } + + for _, tc := range testCases { + t.Run(tc.testName, func(t *testing.T) { + got, err := parseBrokerIDsAndLogDirsToMap(tc.brokerIDsAndLogDirs) + if tc.wantErr { + require.Error(t, err) + } else { + require.NoError(t, err) + require.Equal(t, tc.want, got) + } + }) + } +} From 39d12a9eea401f43059996d36a6c390896fcec9a Mon Sep 17 00:00:00 2001 From: Alex Necula Date: Thu, 20 Apr 2023 08:32:19 +0300 Subject: [PATCH 4/7] Revert removing kafkacluster webhook --- main.go | 9 +++++++++ 1 file changed, 9 insertions(+) diff --git a/main.go b/main.go index fbe4f7feb..cb015ed06 100644 --- a/main.go +++ b/main.go @@ -217,6 +217,15 @@ func main() { } if !webhookDisabled { + err = ctrl.NewWebhookManagedBy(mgr).For(&banzaicloudv1beta1.KafkaCluster{}). + WithValidator(webhooks.KafkaClusterValidator{ + Log: mgr.GetLogger().WithName("webhooks").WithName("KafkaCluster"), + }). + Complete() + if err != nil { + setupLog.Error(err, "unable to create validating webhook", "Kind", "KafkaCluster") + os.Exit(1) + } err = ctrl.NewWebhookManagedBy(mgr).For(&banzaicloudv1alpha1.KafkaTopic{}). 
WithValidator(webhooks.KafkaTopicValidator{ Client: mgr.GetClient(), From 6df6afebaa5c4f206429d29e50385db1fd8e7652 Mon Sep 17 00:00:00 2001 From: Alex Necula Date: Thu, 20 Apr 2023 08:51:52 +0300 Subject: [PATCH 5/7] Fix creating multiple CC tasks --- pkg/resources/kafka/kafka.go | 1 + 1 file changed, 1 insertion(+) diff --git a/pkg/resources/kafka/kafka.go b/pkg/resources/kafka/kafka.go index 60ffac12c..15ffa27a5 100644 --- a/pkg/resources/kafka/kafka.go +++ b/pkg/resources/kafka/kafka.go @@ -1009,6 +1009,7 @@ func (r *Reconciler) reconcileKafkaPvc(ctx context.Context, log logr.Logger, bro if !found { // If the state is not found, it means that the disk removal was done according to the disk removal succeeded branch log.Info("Disk removal was completed, waiting for Rolling Upgrade to remove PVC", "brokerId", brokerId, "mountPath", mountPathToRemove) + continue } // Check the volume state From 439651e26ce98a70bd7693845364bd24b6000e81 Mon Sep 17 00:00:00 2001 From: Alex Necula Date: Thu, 20 Apr 2023 09:12:17 +0300 Subject: [PATCH 6/7] Fix lint --- pkg/resources/kafka/kafka.go | 1 + 1 file changed, 1 insertion(+) diff --git a/pkg/resources/kafka/kafka.go b/pkg/resources/kafka/kafka.go index 15ffa27a5..7778520ff 100644 --- a/pkg/resources/kafka/kafka.go +++ b/pkg/resources/kafka/kafka.go @@ -958,6 +958,7 @@ func (r *Reconciler) isPodTainted(log logr.Logger, pod *corev1.Pod) bool { return selector.Matches(labels.Set(pod.Labels)) } +//nolint:funlen func (r *Reconciler) reconcileKafkaPvc(ctx context.Context, log logr.Logger, brokersDesiredPvcs map[string][]*corev1.PersistentVolumeClaim) error { brokersVolumesState := make(map[string]map[string]v1beta1.VolumeState) var brokerIds []string From 53dcc48c6d861e6dc1ea59bc8a2db0dee8274d2c Mon Sep 17 00:00:00 2001 From: Alex Necula Date: Thu, 4 May 2023 14:07:23 +0300 Subject: [PATCH 7/7] Update image --- go.mod | 22 +++++++++++----------- go.sum | 20 ++++++++++++++++++++ 2 files changed, 31 insertions(+), 11 deletions(-) diff 
--git a/go.mod b/go.mod index ae1653a3e..77f02f2ec 100644 --- a/go.mod +++ b/go.mod @@ -16,10 +16,10 @@ require ( github.com/cisco-open/cluster-registry-controller/api v0.2.5 github.com/envoyproxy/go-control-plane v0.10.3 github.com/ghodss/yaml v1.0.1-0.20190212211648-25d852aebe32 - github.com/go-logr/logr v1.2.3 + github.com/go-logr/logr v1.2.4 github.com/imdario/mergo v0.3.13 - github.com/onsi/ginkgo/v2 v2.8.4 - github.com/onsi/gomega v1.27.2 + github.com/onsi/ginkgo/v2 v2.9.2 + github.com/onsi/gomega v1.27.6 github.com/pavlo-v-chernykh/keystore-go/v4 v4.4.1 github.com/prometheus/common v0.37.0 github.com/stretchr/testify v1.8.0 @@ -35,9 +35,9 @@ require ( ) require ( - github.com/go-task/slim-sprig v0.0.0-20210107165309-348f09dbbbc0 // indirect + github.com/go-task/slim-sprig v0.0.0-20230315185526-52ccab3ef572 // indirect github.com/google/pprof v0.0.0-20210720184732-4bb14d4b1be1 // indirect - golang.org/x/tools v0.6.0 // indirect + golang.org/x/tools v0.7.0 // indirect ) require ( @@ -70,7 +70,7 @@ require ( github.com/gogo/protobuf v1.3.2 // indirect github.com/golang/groupcache v0.0.0-20210331224755-41bb18bfe9da // indirect github.com/golang/mock v1.6.0 - github.com/golang/protobuf v1.5.2 // indirect + github.com/golang/protobuf v1.5.3 // indirect github.com/golang/snappy v0.0.4 // indirect github.com/google/gnostic v0.6.9 // indirect github.com/google/go-cmp v0.5.9 @@ -116,11 +116,11 @@ require ( go.uber.org/atomic v1.7.0 // indirect go.uber.org/multierr v1.6.0 // indirect golang.org/x/crypto v0.0.0-20220722155217-630584e8d5aa // indirect - golang.org/x/net v0.7.0 // indirect + golang.org/x/net v0.8.0 // indirect golang.org/x/oauth2 v0.0.0-20220223155221-ee480838109b // indirect - golang.org/x/sys v0.5.0 // indirect - golang.org/x/term v0.5.0 // indirect - golang.org/x/text v0.7.0 // indirect + golang.org/x/sys v0.6.0 // indirect + golang.org/x/term v0.6.0 // indirect + golang.org/x/text v0.8.0 // indirect golang.org/x/time v0.0.0-20220609170525-579cf78fd858 
// indirect gomodules.xyz/jsonpatch/v2 v2.2.0 // indirect google.golang.org/appengine v1.6.7 // indirect @@ -139,7 +139,7 @@ require ( ) replace ( - github.com/banzaicloud/go-cruise-control => github.com/alex-necula/go-cruise-control v0.6.0-rc5 // TODO this will be replaced with adobe fork once it's created + github.com/banzaicloud/go-cruise-control => github.com/adobe/go-cruise-control v0.6.0-adbe github.com/banzaicloud/koperator/api => ./api github.com/banzaicloud/koperator/properties => ./properties github.com/gogo/protobuf => github.com/waynz0r/protobuf v1.3.3-0.20210811122234-64636cae0910 diff --git a/go.sum b/go.sum index 4611fcc4f..44d125878 100644 --- a/go.sum +++ b/go.sum @@ -81,6 +81,8 @@ github.com/Shopify/sarama v1.36.0 h1:0OJs3eCcnezkWniVjwBbCJVaa0B1k7ImCRS3WN6NsSk github.com/Shopify/sarama v1.36.0/go.mod h1:9glG3eX83tgVYJ5aVtrjVUnEsOPqQIBGx1BWfN+X51I= github.com/Shopify/toxiproxy/v2 v2.4.0 h1:O1e4Jfvr/hefNTNu+8VtdEG5lSeamJRo4aKhMOKNM64= github.com/Shopify/toxiproxy/v2 v2.4.0/go.mod h1:3ilnjng821bkozDRxNoo64oI/DKqM+rOyJzb564+bvg= +github.com/adobe/go-cruise-control v0.6.0-adbe h1:Qr9NJ4clpbgNmZQK52b96fg+7+wnrjNRF7i7upHLUk0= +github.com/adobe/go-cruise-control v0.6.0-adbe/go.mod h1:52C8XiTZjSmFVD+y76rd2al//GTJk9mSwkcHs2LGSvA= github.com/ahmetb/gen-crd-api-reference-docs v0.3.0/go.mod h1:TdjdkYhlOifCQWPs1UdTma97kQQMozf5h26hTuG70u8= github.com/alecthomas/template v0.0.0-20160405071501-a0175ee3bccc/go.mod h1:LOuyumcjzFXgccqObfd/Ljyb9UuFJ6TxHnclSeseNhc= github.com/alecthomas/template v0.0.0-20190718012654-fb15b899a751/go.mod h1:LOuyumcjzFXgccqObfd/Ljyb9UuFJ6TxHnclSeseNhc= @@ -235,6 +237,8 @@ github.com/go-logr/logr v1.2.0/go.mod h1:jdQByPbusPIv2/zmleS9BjJVeZ6kBagPoEUsqbV github.com/go-logr/logr v1.2.2/go.mod h1:jdQByPbusPIv2/zmleS9BjJVeZ6kBagPoEUsqbVz/1A= github.com/go-logr/logr v1.2.3 h1:2DntVwHkVopvECVRSlL5PSo9eG+cAkDCuckLubN+rq0= github.com/go-logr/logr v1.2.3/go.mod h1:jdQByPbusPIv2/zmleS9BjJVeZ6kBagPoEUsqbVz/1A= +github.com/go-logr/logr v1.2.4 
h1:g01GSCwiDw2xSZfjJ2/T9M+S6pFdcNtFYsp+Y43HYDQ= +github.com/go-logr/logr v1.2.4/go.mod h1:jdQByPbusPIv2/zmleS9BjJVeZ6kBagPoEUsqbVz/1A= github.com/go-logr/zapr v0.4.0/go.mod h1:tabnROwaDl0UNxkVeFRbY8bwB37GwRv0P8lg6aAiEnk= github.com/go-logr/zapr v1.2.3 h1:a9vnzlIBPQBBkeaR9IuMUfmVOrQlkoC4YfPoFkX3T7A= github.com/go-logr/zapr v1.2.3/go.mod h1:eIauM6P8qSvTw5o2ez6UEAfGjQKrxQTl5EoK+Qa2oG4= @@ -259,6 +263,8 @@ github.com/go-openapi/swag v0.19.14/go.mod h1:QYRuS/SOXUCsnplDa677K7+DxSOj6IPNl/ github.com/go-stack/stack v1.8.0/go.mod h1:v0f6uXyyMGvRgIKkXu+yp6POWl0qKG85gN/melR3HDY= github.com/go-task/slim-sprig v0.0.0-20210107165309-348f09dbbbc0 h1:p104kn46Q8WdvHunIJ9dAyjPVtrBPhSr3KT2yUst43I= github.com/go-task/slim-sprig v0.0.0-20210107165309-348f09dbbbc0/go.mod h1:fyg7847qk6SyHyPtNmDHnmrv/HOrqktSC+C9fM+CJOE= +github.com/go-task/slim-sprig v0.0.0-20230315185526-52ccab3ef572 h1:tfuBGBXKqDEevZMzYi5KSi8KkcZtzBcTgAUUtapy0OI= +github.com/go-task/slim-sprig v0.0.0-20230315185526-52ccab3ef572/go.mod h1:9Pwr4B2jHnOSGXyyzV8ROjYa2ojvAY6HCGYYfMoC3Ls= github.com/go-test/deep v1.0.7 h1:/VSMRlnY/JSyqxQUzQLKVMAskpY/NZKFA5j2P+0pP2M= github.com/gobuffalo/flect v0.2.3/go.mod h1:vmkQwuZYhN5Pc4ljYQZzP+1sq+NEkK+lh20jmEmX3jc= github.com/godbus/dbus/v5 v5.0.4/go.mod h1:xhWf0FNVPg57R7Z0UbKHbJfkEywrmjJnf7w5xrFpKfA= @@ -517,6 +523,8 @@ github.com/onsi/ginkgo v1.16.4/go.mod h1:dX+/inL/fNMqNlz0e9LfyB9TswhZpCVdJM/Z6Vv github.com/onsi/ginkgo v1.16.5 h1:8xi0RTUf59SOSfEtZMvwTvXYMzG4gV23XVHOZiXNtnE= github.com/onsi/ginkgo/v2 v2.8.4 h1:gf5mIQ8cLFieruNLAdgijHF1PYfLphKm2dxxcUtcqK0= github.com/onsi/ginkgo/v2 v2.8.4/go.mod h1:427dEDQZkDKsBvCjc2A/ZPefhKxsTTrsQegMlayL730= +github.com/onsi/ginkgo/v2 v2.9.2 h1:BA2GMJOtfGAfagzYtrAlufIP0lq6QERkFmHLMLPwFSU= +github.com/onsi/ginkgo/v2 v2.9.2/go.mod h1:WHcJJG2dIlcCqVfBAwUCrJxSPFb6v4azBwgxeMeDuts= github.com/onsi/gomega v0.0.0-20170829124025-dcabb60a477c/go.mod h1:C1qb7wdrVGGVU+Z6iS04AVkA3Q65CEZX59MT0QO5uiA= github.com/onsi/gomega v1.7.0/go.mod 
h1:ex+gbHU/CVuBBDIJjb2X0qEXbFg53c61hWP/1CpauHY= github.com/onsi/gomega v1.7.1/go.mod h1:XdKZgCCFLUoM/7CFJVPcG8C1xQ1AJ0vpAezJrB7JYyY= @@ -524,6 +532,8 @@ github.com/onsi/gomega v1.10.1/go.mod h1:iN09h71vgCQne3DLsj+A5owkum+a2tYe+TOCB1y github.com/onsi/gomega v1.14.0/go.mod h1:cIuvLEne0aoVhAgh/O6ac0Op8WWw9H6eYCriF+tEHG0= github.com/onsi/gomega v1.27.2 h1:SKU0CXeKE/WVgIV1T61kSa3+IRE8Ekrv9rdXDwwTqnY= github.com/onsi/gomega v1.27.2/go.mod h1:5mR3phAHpkAVIDkHEUBY6HGVsU+cpcEscrGPB4oPlZI= +github.com/onsi/gomega v1.27.6 h1:ENqfyGeS5AX/rlXDd/ETokDz93u0YufY1Pgxuy/PvWE= +github.com/onsi/gomega v1.27.6/go.mod h1:PIQNjfQwkP3aQAH7lf7j87O/5FiNr+ZR8+ipb+qQlhg= github.com/pascaldekloe/goe v0.0.0-20180627143212-57f6aae5913c/go.mod h1:lzWF7FIEvWOWxwDKqyGYQf6ZUaNfKdP144TG7ZOy1lc= github.com/pavlo-v-chernykh/keystore-go/v4 v4.4.1 h1:FyBdsRqqHH4LctMLL+BL2oGO+ONcIPwn96ctofCVtNE= github.com/pavlo-v-chernykh/keystore-go/v4 v4.4.1/go.mod h1:lAVhWwbNaveeJmxrxuSTxMgKpF6DjnuVpn6T8WiBwYQ= @@ -796,6 +806,8 @@ golang.org/x/net v0.0.0-20220725212005-46097bf591d3/go.mod h1:AaygXjzTFtRAg2ttMY golang.org/x/net v0.0.0-20220809184613-07c6da5e1ced/go.mod h1:YDH+HFinaLZZlnHAfSS6ZXJJ9M9t4Dl22yv3iI2vPwk= golang.org/x/net v0.7.0 h1:rJrUqqhjsgNp7KqAIc25s9pZnjU7TUcSY7HcVZjdn1g= golang.org/x/net v0.7.0/go.mod h1:2Tu9+aMcznHK/AK1HMvgo6xiTLG5rD5rZLDS+rp2Bjs= +golang.org/x/net v0.8.0 h1:Zrh2ngAOFYneWTAIAPethzeaQLuHwhuBkuV6ZiRnUaQ= +golang.org/x/net v0.8.0/go.mod h1:QVkue5JL9kW//ek3r6jTKnTFis1tRmNAW2P1shuFdJc= golang.org/x/oauth2 v0.0.0-20180821212333-d2e6202438be/go.mod h1:N/0e6XlmueqKjAGxoOufVs8QHGRruUQn6yWY3a++T0U= golang.org/x/oauth2 v0.0.0-20190226205417-e64efc72b421/go.mod h1:gOpvHmFTYa4IltrdGE7lF6nIHvwfUNPOp7c8zoXwtLw= golang.org/x/oauth2 v0.0.0-20190604053449-0f29369cfe45/go.mod h1:gOpvHmFTYa4IltrdGE7lF6nIHvwfUNPOp7c8zoXwtLw= @@ -910,12 +922,16 @@ golang.org/x/sys v0.0.0-20220722155257-8c9f86f7a55f/go.mod h1:oPkhp1MJrh7nUepCBc golang.org/x/sys v0.0.0-20220728004956-3c1f35247d10/go.mod 
h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg= golang.org/x/sys v0.5.0 h1:MUK/U/4lj1t1oPg0HfuXDN/Z1wv31ZJ/YcPiGccS4DU= golang.org/x/sys v0.5.0/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg= +golang.org/x/sys v0.6.0 h1:MVltZSvRTcU2ljQOhs94SXPftV6DCNnZViHeQps87pQ= +golang.org/x/sys v0.6.0/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg= golang.org/x/term v0.0.0-20201117132131-f5c789dd3221/go.mod h1:Nr5EML6q2oocZ2LXRh80K7BxOlk5/8JxuGnuhpl+muw= golang.org/x/term v0.0.0-20201126162022-7de9c90e9dd1/go.mod h1:bj7SfCRtBDWHUb9snDiAeCFNEtKQo2Wmx5Cou7ajbmo= golang.org/x/term v0.0.0-20210220032956-6a3ed077a48d/go.mod h1:bj7SfCRtBDWHUb9snDiAeCFNEtKQo2Wmx5Cou7ajbmo= golang.org/x/term v0.0.0-20210927222741-03fcf44c2211/go.mod h1:jbD1KX2456YbFQfuXm/mYQcufACuNUgVhRMnK/tPxf8= golang.org/x/term v0.5.0 h1:n2a8QNdAb0sZNpU9R1ALUXBbY+w51fCQDN+7EdxNBsY= golang.org/x/term v0.5.0/go.mod h1:jMB1sMXY+tzblOD4FWmEbocvup2/aLOaQEp7JmGp78k= +golang.org/x/term v0.6.0 h1:clScbb1cHjoCkyRbWwBEUZ5H/tIFu5TAXIqaZD0Gcjw= +golang.org/x/term v0.6.0/go.mod h1:m6U89DPEgQRMq3DNkDClhWw02AUbt2daBVO4cn4Hv9U= golang.org/x/text v0.0.0-20170915032832-14c0d48ead0c/go.mod h1:NqM8EUOU14njkJ3fqMW+pc6Ldnwhi/IjpwHt7yyuwOQ= golang.org/x/text v0.3.0/go.mod h1:NqM8EUOU14njkJ3fqMW+pc6Ldnwhi/IjpwHt7yyuwOQ= golang.org/x/text v0.3.1-0.20180807135948-17ff2d5776d2/go.mod h1:NqM8EUOU14njkJ3fqMW+pc6Ldnwhi/IjpwHt7yyuwOQ= @@ -927,6 +943,8 @@ golang.org/x/text v0.3.6/go.mod h1:5Zoc/QRtKVWzQhOtBMvqHzDpF6irO9z98xDceosuGiQ= golang.org/x/text v0.3.7/go.mod h1:u+2+/6zg+i71rQMx5EYifcz6MCKuco9NR6JIITiCfzQ= golang.org/x/text v0.7.0 h1:4BRB4x83lYWy72KwLD/qYDuTu7q9PjSagHvijDw7cLo= golang.org/x/text v0.7.0/go.mod h1:mrYo+phRRbMaCq/xk9113O4dZlRixOauAjOtrjsXDZ8= +golang.org/x/text v0.8.0 h1:57P1ETyNKtuIjB4SRd15iJxuhj8Gc416Y78H3qgMh68= +golang.org/x/text v0.8.0/go.mod h1:e1OnstbJyHTd6l/uOt8jFFHp6TRDWZR/bV3emEE/zU8= golang.org/x/time v0.0.0-20180412165947-fbb02b2291d2/go.mod h1:tRJNPiyCQ0inRvYxbN9jk5I+vvW/OXSQhTDSoE431IQ= 
golang.org/x/time v0.0.0-20181108054448-85acf8d2951c/go.mod h1:tRJNPiyCQ0inRvYxbN9jk5I+vvW/OXSQhTDSoE431IQ= golang.org/x/time v0.0.0-20190308202827-9d24e82272b4/go.mod h1:tRJNPiyCQ0inRvYxbN9jk5I+vvW/OXSQhTDSoE431IQ= @@ -998,6 +1016,8 @@ golang.org/x/tools v0.1.4/go.mod h1:o0xws9oXOQQZyjljx8fwUC0k7L1pTE6eaCbjGeHmOkk= golang.org/x/tools v0.1.5/go.mod h1:o0xws9oXOQQZyjljx8fwUC0k7L1pTE6eaCbjGeHmOkk= golang.org/x/tools v0.6.0 h1:BOw41kyTf3PuCW1pVQf8+Cyg8pMlkYB1oo9iJ6D/lKM= golang.org/x/tools v0.6.0/go.mod h1:Xwgl3UAJ/d3gWutnCtw505GrjyAbvKui8lOU390QaIU= +golang.org/x/tools v0.7.0 h1:W4OVu8VVOaIO0yzWMNdepAulS7YfoS3Zabrm8DOXXU4= +golang.org/x/tools v0.7.0/go.mod h1:4pg6aUX35JBAogB10C9AtvVL+qowtN4pT3CGSQex14s= golang.org/x/xerrors v0.0.0-20190717185122-a985d3407aa7/go.mod h1:I/5z698sn9Ka8TeJc9MKroUUfqBBauWjQqLJ2OPfmY0= golang.org/x/xerrors v0.0.0-20191011141410-1b5146add898/go.mod h1:I/5z698sn9Ka8TeJc9MKroUUfqBBauWjQqLJ2OPfmY0= golang.org/x/xerrors v0.0.0-20191204190536-9bdfabe68543/go.mod h1:I/5z698sn9Ka8TeJc9MKroUUfqBBauWjQqLJ2OPfmY0=