Skip to content

Commit

Permalink
Remove disk feature (#53)
Browse files Browse the repository at this point in the history
* Implement disk removal feature

---------

Co-authored-by: Alex Necula <anecula@adobe.com>
  • Loading branch information
amuraru and Alex Necula committed Jun 10, 2023
1 parent 1390424 commit 7570e22
Show file tree
Hide file tree
Showing 22 changed files with 1,055 additions and 665 deletions.
2 changes: 2 additions & 0 deletions api/v1alpha1/common_types.go
Original file line number Diff line number Diff line change
Expand Up @@ -44,6 +44,8 @@ const (
OperationAddBroker CruiseControlTaskOperation = "add_broker"
// OperationRemoveBroker means a Cruise Control remove_broker operation
OperationRemoveBroker CruiseControlTaskOperation = "remove_broker"
// OperationRemoveDisks means a Cruise Control remove_disks operation
OperationRemoveDisks CruiseControlTaskOperation = "remove_disks"
// OperationRebalance means a Cruise Control rebalance operation
OperationRebalance CruiseControlTaskOperation = "rebalance"
// OperationStatus means a Cruise Control status operation
Expand Down
10 changes: 8 additions & 2 deletions api/v1alpha1/cruisecontroloperation_types.go
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,9 @@ const (
ErrorPolicyRetry ErrorPolicyType = "retry"
// DefaultRetryBackOffDurationSec defines the time between retries of the failed tasks.
DefaultRetryBackOffDurationSec = 30
// PauseLabel defines the label key for pausing Cruise Control operations.
PauseLabel = "pause"
True = "true"
)

//+kubebuilder:object:root=true
Expand Down Expand Up @@ -184,7 +187,7 @@ func (o *CruiseControlOperation) IsDone() bool {
}

// IsPaused reports whether the operation's pause label (key "pause") is set
// to the string "true", as returned by GetLabels.
func (o *CruiseControlOperation) IsPaused() bool {
return o.GetLabels()["pause"] == "true"
return o.GetLabels()[PauseLabel] == True
}

func (o *CruiseControlOperation) IsErrorPolicyIgnore() bool {
Expand Down Expand Up @@ -221,5 +224,8 @@ func (o *CruiseControlOperation) IsCurrentTaskFinished() bool {

// IsCurrentTaskOperationValid reports whether the current task's operation is
// one of the operations accepted here: OperationAddBroker, OperationRebalance,
// OperationRemoveBroker, OperationStopExecution or OperationRemoveDisks.
func (o *CruiseControlOperation) IsCurrentTaskOperationValid() bool {
return o.CurrentTaskOperation() == OperationAddBroker ||
o.CurrentTaskOperation() == OperationRebalance || o.CurrentTaskOperation() == OperationRemoveBroker || o.CurrentTaskOperation() == OperationStopExecution
o.CurrentTaskOperation() == OperationRebalance ||
o.CurrentTaskOperation() == OperationRemoveBroker ||
o.CurrentTaskOperation() == OperationStopExecution ||
o.CurrentTaskOperation() == OperationRemoveDisks
}
54 changes: 44 additions & 10 deletions api/v1beta1/common_types.go
Original file line number Diff line number Diff line change
Expand Up @@ -66,24 +66,39 @@ type PKIBackend string
// CruiseControlVolumeState holds information about the state of volume rebalance
type CruiseControlVolumeState string

// IsRunningState returns true if CruiseControlVolumeState indicates
// that the CC operation is scheduled and in-progress
func (s CruiseControlVolumeState) IsRunningState() bool {
// IsDiskRebalanceRunning returns true if CruiseControlVolumeState indicates
// that the CC rebalance disk operation is scheduled and in-progress
// (GracefulDiskRebalance Running, CompletedWithError, Paused or Scheduled).
func (s CruiseControlVolumeState) IsDiskRebalanceRunning() bool {
return s == GracefulDiskRebalanceRunning ||
s == GracefulDiskRebalanceCompletedWithError ||
s == GracefulDiskRebalancePaused ||
s == GracefulDiskRebalanceScheduled
}

// IsRequiredState returns true if CruiseControlVolumeState is in GracefulDiskRebalanceRequired state
// IsDiskRemovalRunning reports whether the volume state is one of the
// disk-removal states treated as scheduled or in progress: Running,
// CompletedWithError, Paused or Scheduled.
func (s CruiseControlVolumeState) IsDiskRemovalRunning() bool {
	switch s {
	case GracefulDiskRemovalRunning,
		GracefulDiskRemovalCompletedWithError,
		GracefulDiskRemovalPaused,
		GracefulDiskRemovalScheduled:
		return true
	default:
		return false
	}
}

// IsRequiredState returns true if CruiseControlVolumeState is in GracefulDiskRebalanceRequired state or GracefulDiskRemovalRequired state
// Neither of these values is matched by IsDiskRebalanceRunning or IsDiskRemovalRunning.
func (s CruiseControlVolumeState) IsRequiredState() bool {
return s == GracefulDiskRebalanceRequired
return s == GracefulDiskRebalanceRequired ||
s == GracefulDiskRemovalRequired
}

// IsActive returns true if CruiseControlVolumeState is in active state
// IsDiskRebalance returns true if CruiseControlVolumeState is in disk rebalance state
// the controller needs to take care of.
// That is: GracefulDiskRebalanceRequired, or any state matched by the running check.
func (s CruiseControlVolumeState) IsActive() bool {
return s.IsRunningState() || s == GracefulDiskRebalanceRequired
func (s CruiseControlVolumeState) IsDiskRebalance() bool {
return s.IsDiskRebalanceRunning() || s == GracefulDiskRebalanceRequired
}

// IsDiskRemoval reports whether the volume state belongs to the disk removal
// flow: either removal is still required, or the state is one matched by
// IsDiskRemovalRunning.
func (s CruiseControlVolumeState) IsDiskRemoval() bool {
	if s == GracefulDiskRemovalRequired {
		return true
	}
	return s.IsDiskRemovalRunning()
}

// IsUpscale returns true if CruiseControlState in GracefulUpscale* state.
Expand Down Expand Up @@ -138,11 +153,16 @@ func (r CruiseControlState) IsSucceeded() bool {
r == GracefulUpscaleSucceeded
}

// IsSucceeded returns true if CruiseControlVolumeState is succeeded
func (r CruiseControlVolumeState) IsSucceeded() bool {
// IsDiskRebalanceSucceeded returns true if CruiseControlVolumeState is disk rebalance succeeded
// (i.e. the state equals GracefulDiskRebalanceSucceeded).
func (r CruiseControlVolumeState) IsDiskRebalanceSucceeded() bool {
return r == GracefulDiskRebalanceSucceeded
}

// IsDiskRemovalSucceeded reports whether the volume state equals
// GracefulDiskRemovalSucceeded.
func (r CruiseControlVolumeState) IsDiskRemovalSucceeded() bool {
	switch r {
	case GracefulDiskRemovalSucceeded:
		return true
	default:
		return false
	}
}

// IsSSL determines if the receiver is using SSL
func (r SecurityProtocol) IsSSL() bool {
return r.Equal(SecurityProtocolSaslSSL) || r.Equal(SecurityProtocolSSL)
Expand Down Expand Up @@ -255,6 +275,20 @@ const (
// GracefulDownscalePaused states that the broker downscale task is completed with an error and it will not be retried, it is paused. In this case further downscale tasks can be executed
GracefulDownscalePaused CruiseControlState = "GracefulDownscalePaused"

// Disk removal cruise control states
// GracefulDiskRemovalRequired states that the broker volume needs to be removed
GracefulDiskRemovalRequired CruiseControlVolumeState = "GracefulDiskRemovalRequired"
// GracefulDiskRemovalRunning states that for the broker volume a CC disk removal is in progress
GracefulDiskRemovalRunning CruiseControlVolumeState = "GracefulDiskRemovalRunning"
// GracefulDiskRemovalSucceeded states that the removal of the broker volume has succeeded
GracefulDiskRemovalSucceeded CruiseControlVolumeState = "GracefulDiskRemovalSucceeded"
// GracefulDiskRemovalScheduled states that the broker volume removal CCOperation is created and the task is waiting for execution
GracefulDiskRemovalScheduled CruiseControlVolumeState = "GracefulDiskRemovalScheduled"
// GracefulDiskRemovalCompletedWithError states that the broker volume removal task completed with an error
GracefulDiskRemovalCompletedWithError CruiseControlVolumeState = "GracefulDiskRemovalCompletedWithError"
// GracefulDiskRemovalPaused states that the broker volume removal task is completed with an error and it will not be retried, it is paused
GracefulDiskRemovalPaused CruiseControlVolumeState = "GracefulDiskRemovalPaused"

// Disk rebalance cruise control states
// GracefulDiskRebalanceRequired states that the broker volume needs a CC disk rebalance
GracefulDiskRebalanceRequired CruiseControlVolumeState = "GracefulDiskRebalanceRequired"
Expand Down
72 changes: 53 additions & 19 deletions controllers/cruisecontroloperation_controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -52,8 +52,9 @@ const (
var (
defaultRequeueIntervalInSeconds = 10
executionPriorityMap = map[banzaiv1alpha1.CruiseControlTaskOperation]int{
banzaiv1alpha1.OperationAddBroker: 2,
banzaiv1alpha1.OperationRemoveBroker: 1,
banzaiv1alpha1.OperationAddBroker: 3,
banzaiv1alpha1.OperationRemoveBroker: 2,
banzaiv1alpha1.OperationRemoveDisks: 1,
banzaiv1alpha1.OperationRebalance: 0,
}
missingCCResErr = errors.New("missing Cruise Control user task result")
Expand Down Expand Up @@ -197,7 +198,11 @@ func (r *CruiseControlOperationReconciler) Reconcile(ctx context.Context, reques
return reconciled()
}

ccOperationExecution := selectOperationForExecution(ccOperationQueueMap)
ccOperationExecution, err := r.selectOperationForExecution(ccOperationQueueMap)
if err != nil {
log.Error(err, "requeue event as selecting operation for execution failed")
return requeueAfter(defaultRequeueIntervalInSeconds)
}
// There is nothing to be executed for now, requeue
if ccOperationExecution == nil {
return requeueAfter(defaultRequeueIntervalInSeconds)
Expand Down Expand Up @@ -265,6 +270,8 @@ func (r *CruiseControlOperationReconciler) executeOperation(ctx context.Context,
cruseControlTaskResult, err = r.scaler.RemoveBrokersWithParams(ctx, ccOperationExecution.CurrentTaskParameters())
case banzaiv1alpha1.OperationRebalance:
cruseControlTaskResult, err = r.scaler.RebalanceWithParams(ctx, ccOperationExecution.CurrentTaskParameters())
case banzaiv1alpha1.OperationRemoveDisks:
cruseControlTaskResult, err = r.scaler.RemoveDisksWithParams(ctx, ccOperationExecution.CurrentTaskParameters())
case banzaiv1alpha1.OperationStopExecution:
cruseControlTaskResult, err = r.scaler.StopExecution(ctx)
default:
Expand Down Expand Up @@ -300,29 +307,56 @@ func sortOperations(ccOperations []*banzaiv1alpha1.CruiseControlOperation) map[s
return ccOperationQueueMap
}

func selectOperationForExecution(ccOperationQueueMap map[string][]*banzaiv1alpha1.CruiseControlOperation) *banzaiv1alpha1.CruiseControlOperation {
// SELECTING OPERATION FOR EXECUTION
var ccOperationExecution *banzaiv1alpha1.CruiseControlOperation
// selectOperationForExecution selects the next operation to be executed
// from the per-queue operation map. Candidates are considered in priority
// order: the stop-execution queue, an add_broker at the head of the
// first-execution queue, the retry queue, and finally the head of the
// first-execution queue. The returned operation is nil when no queue yields
// a candidate. A non-nil error is returned only when updating a failed
// remove_disks operation (to mark it paused) fails.
func (r *CruiseControlOperationReconciler) selectOperationForExecution(ccOperationQueueMap map[string][]*banzaiv1alpha1.CruiseControlOperation) (*banzaiv1alpha1.CruiseControlOperation, error) {
// First prio: execute the finalize task
switch {
case len(ccOperationQueueMap[ccOperationForStopExecution]) > 0:
ccOperationExecution = ccOperationQueueMap[ccOperationForStopExecution][0]
ccOperationExecution.CurrentTask().Operation = banzaiv1alpha1.OperationStopExecution
if op := getFirstOperation(ccOperationQueueMap, ccOperationForStopExecution); op != nil {
op.CurrentTask().Operation = banzaiv1alpha1.OperationStopExecution
return op, nil
}

// Second prio: execute add_broker operation
case len(ccOperationQueueMap[ccOperationFirstExecution]) > 0 && ccOperationQueueMap[ccOperationFirstExecution][0].CurrentTaskOperation() == banzaiv1alpha1.OperationAddBroker:
ccOperationExecution = ccOperationQueueMap[ccOperationFirstExecution][0]
if op := getFirstOperation(ccOperationQueueMap, ccOperationFirstExecution); op != nil &&
op.CurrentTaskOperation() == banzaiv1alpha1.OperationAddBroker {
return op, nil
}

// Third prio: execute failed task
case len(ccOperationQueueMap[ccOperationRetryExecution]) > 0:
if op := getFirstOperation(ccOperationQueueMap, ccOperationRetryExecution); op != nil {
// If there is a failed remove_disks task and there is a rebalance_disks task in the queue, we execute the rebalance_disks task
// This could only happen if the user tried to delete a disk, and later rolled back the change
if op.CurrentTaskOperation() == banzaiv1alpha1.OperationRemoveDisks {
for _, opFirstExecution := range ccOperationQueueMap[ccOperationFirstExecution] {
if opFirstExecution.CurrentTaskOperation() == banzaiv1alpha1.OperationRebalance {
// Mark the remove disk operation as paused, so it is not retried
// NOTE(review): assigning into op.Labels panics if the map is nil — confirm every
// queued operation carries labels, or initialise the map before writing.
// NOTE(review): True is unqualified here; presumably banzaiv1alpha1.True is meant
// unless this package declares its own True — confirm.
op.Labels[banzaiv1alpha1.PauseLabel] = True
// NOTE(review): context.TODO() is used because no ctx is passed in; consider
// threading the reconcile context through so the update honours cancellation.
err := r.Client.Update(context.TODO(), op)
if err != nil {
return nil, errors.WrapIfWithDetails(err, "failed to update Cruise Control operation", "name", op.Name, "namespace", op.Namespace)
}

// Execute the rebalance disks operation
return opFirstExecution, nil
}
}
}

// When the default backoff duration elapsed we retry
if ccOperationQueueMap[ccOperationRetryExecution][0].IsReadyForRetryExecution() {
ccOperationExecution = ccOperationQueueMap[ccOperationRetryExecution][0]
if op.IsReadyForRetryExecution() {
return op, nil
}
// Forth prio: execute the first element in the FirstExecutionQueue which is ordered by operation type and k8s creation timestamp
case len(ccOperationQueueMap[ccOperationFirstExecution]) > 0:
ccOperationExecution = ccOperationQueueMap[ccOperationFirstExecution][0]
}

return ccOperationExecution
// Fourth prio: execute the first element in the FirstExecutionQueue which is ordered by operation type and k8s creation timestamp
return getFirstOperation(ccOperationQueueMap, ccOperationFirstExecution), nil
}

// getFirstOperation looks up the queue stored under key and returns its head
// element, or nil when that queue is missing or empty.
func getFirstOperation(ccOperationQueueMap map[string][]*banzaiv1alpha1.CruiseControlOperation, key string) *banzaiv1alpha1.CruiseControlOperation {
	queue := ccOperationQueueMap[key]
	if len(queue) == 0 {
		return nil
	}
	return queue[0]
}

// SetupCruiseControlWithManager registers cruise control controller to the manager
Expand Down
15 changes: 15 additions & 0 deletions controllers/cruisecontroloperation_controller_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -84,6 +84,21 @@ func TestSortOperations(t *testing.T) {
createCCRetryExecutionOperation(timeNow, "3", v1alpha1.OperationRebalance),
},
},
{
testName: "mixed with remove disks",
ccOperations: []*v1alpha1.CruiseControlOperation{
createCCRetryExecutionOperation(timeNow, "1", v1alpha1.OperationAddBroker),
createCCRetryExecutionOperation(timeNow, "4", v1alpha1.OperationRebalance),
createCCRetryExecutionOperation(timeNow.Add(2*time.Second), "3", v1alpha1.OperationRemoveDisks),
createCCRetryExecutionOperation(timeNow.Add(time.Second), "2", v1alpha1.OperationRemoveBroker),
},
expectedOutput: []*v1alpha1.CruiseControlOperation{
createCCRetryExecutionOperation(timeNow, "1", v1alpha1.OperationAddBroker),
createCCRetryExecutionOperation(timeNow.Add(time.Second), "2", v1alpha1.OperationRemoveBroker),
createCCRetryExecutionOperation(timeNow.Add(2*time.Second), "3", v1alpha1.OperationRemoveDisks),
createCCRetryExecutionOperation(timeNow, "4", v1alpha1.OperationRebalance),
},
},
}
for _, testCase := range testCases {
sortedCCOperations := sortOperations(testCase.ccOperations)
Expand Down
Loading

0 comments on commit 7570e22

Please sign in to comment.