Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Remove disk feature #53

Merged
merged 7 commits into from
May 4, 2023
Merged
Show file tree
Hide file tree
Changes from 6 commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 2 additions & 0 deletions api/v1alpha1/common_types.go
Original file line number Diff line number Diff line change
Expand Up @@ -44,6 +44,8 @@ const (
OperationAddBroker CruiseControlTaskOperation = "add_broker"
// OperationRemoveBroker means a Cruise Control remove_broker operation
OperationRemoveBroker CruiseControlTaskOperation = "remove_broker"
// OperationRemoveDisks means a Cruise Control remove_disks operation
OperationRemoveDisks CruiseControlTaskOperation = "remove_disks"
// OperationRebalance means a Cruise Control rebalance operation
OperationRebalance CruiseControlTaskOperation = "rebalance"
// KafkaAccessTypeRead states that a user wants consume access to a topic
Expand Down
10 changes: 8 additions & 2 deletions api/v1alpha1/cruisecontroloperation_types.go
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,9 @@ const (
ErrorPolicyRetry ErrorPolicyType = "retry"
// DefaultRetryBackOffDurationSec defines the time between retries of the failed tasks.
DefaultRetryBackOffDurationSec = 30
// PauseLabel defines the label key for pausing Cruise Control operations.
PauseLabel = "pause"
True = "true"
)

//+kubebuilder:object:root=true
Expand Down Expand Up @@ -184,7 +187,7 @@ func (o *CruiseControlOperation) IsDone() bool {
}

func (o *CruiseControlOperation) IsPaused() bool {
return o.GetLabels()["pause"] == "true"
return o.GetLabels()[PauseLabel] == True
}

func (o *CruiseControlOperation) IsErrorPolicyIgnore() bool {
Expand Down Expand Up @@ -221,5 +224,8 @@ func (o *CruiseControlOperation) IsCurrentTaskFinished() bool {

func (o *CruiseControlOperation) IsCurrentTaskOperationValid() bool {
return o.CurrentTaskOperation() == OperationAddBroker ||
o.CurrentTaskOperation() == OperationRebalance || o.CurrentTaskOperation() == OperationRemoveBroker || o.CurrentTaskOperation() == OperationStopExecution
o.CurrentTaskOperation() == OperationRebalance ||
o.CurrentTaskOperation() == OperationRemoveBroker ||
o.CurrentTaskOperation() == OperationStopExecution ||
o.CurrentTaskOperation() == OperationRemoveDisks
}
28 changes: 15 additions & 13 deletions api/v1alpha1/zz_generated.deepcopy.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

54 changes: 44 additions & 10 deletions api/v1beta1/common_types.go
Original file line number Diff line number Diff line change
Expand Up @@ -66,24 +66,39 @@ type PKIBackend string
// CruiseControlVolumeState holds information about the state of volume rebalance
type CruiseControlVolumeState string

// IsRunningState returns true if CruiseControlVolumeState indicates
// that the CC operation is scheduled and in-progress
func (s CruiseControlVolumeState) IsRunningState() bool {
// IsDiskRebalanceRunning returns true if CruiseControlVolumeState indicates
// that the CC rebalance disk operation is scheduled and in-progress
func (s CruiseControlVolumeState) IsDiskRebalanceRunning() bool {
return s == GracefulDiskRebalanceRunning ||
s == GracefulDiskRebalanceCompletedWithError ||
s == GracefulDiskRebalancePaused ||
s == GracefulDiskRebalanceScheduled
}

// IsRequiredState returns true if CruiseControlVolumeState is in GracefulDiskRebalanceRequired state
// IsDiskRemovalRunning returns true if CruiseControlVolumeState indicates
// that the CC remove disks operation is scheduled and in-progress
func (s CruiseControlVolumeState) IsDiskRemovalRunning() bool {
return s == GracefulDiskRemovalRunning ||
s == GracefulDiskRemovalCompletedWithError ||
s == GracefulDiskRemovalPaused ||
s == GracefulDiskRemovalScheduled
}

// IsRequiredState returns true if CruiseControlVolumeState is in GracefulDiskRebalanceRequired state or GracefulDiskRemovalRequired state
func (s CruiseControlVolumeState) IsRequiredState() bool {
return s == GracefulDiskRebalanceRequired
return s == GracefulDiskRebalanceRequired ||
s == GracefulDiskRemovalRequired
}

// IsActive returns true if CruiseControlVolumeState is in active state
// IsDiskRebalance returns true if CruiseControlVolumeState is in disk rebalance state
// the controller needs to take care of.
func (s CruiseControlVolumeState) IsActive() bool {
return s.IsRunningState() || s == GracefulDiskRebalanceRequired
func (s CruiseControlVolumeState) IsDiskRebalance() bool {
return s.IsDiskRebalanceRunning() || s == GracefulDiskRebalanceRequired
}

// IsDiskRemoval returns true if CruiseControlVolumeState is in disk removal state
func (s CruiseControlVolumeState) IsDiskRemoval() bool {
return s.IsDiskRemovalRunning() || s == GracefulDiskRemovalRequired
}

// IsUpscale returns true if CruiseControlState in GracefulUpscale* state.
Expand Down Expand Up @@ -138,11 +153,16 @@ func (r CruiseControlState) IsSucceeded() bool {
r == GracefulUpscaleSucceeded
}

// IsSucceeded returns true if CruiseControlVolumeState is succeeded
func (r CruiseControlVolumeState) IsSucceeded() bool {
// IsDiskRebalanceSucceeded returns true if CruiseControlVolumeState is disk rebalance succeeded
func (r CruiseControlVolumeState) IsDiskRebalanceSucceeded() bool {
return r == GracefulDiskRebalanceSucceeded
}

// IsDiskRemovalSucceeded returns true if CruiseControlVolumeState is disk removal succeeded
func (r CruiseControlVolumeState) IsDiskRemovalSucceeded() bool {
return r == GracefulDiskRemovalSucceeded
}

// IsSSL determines if the receiver is using SSL
func (r SecurityProtocol) IsSSL() bool {
return r.Equal(SecurityProtocolSaslSSL) || r.Equal(SecurityProtocolSSL)
Expand Down Expand Up @@ -255,6 +275,20 @@ const (
// GracefulDownscalePaused states that the broker downscale task is completed with an error and it will not be retried, it is paused. In this case further downscale tasks can be executed
GracefulDownscalePaused CruiseControlState = "GracefulDownscalePaused"

// Disk removal cruise control states
// GracefulDiskRemovalRequired states that the broker volume needs to be removed
GracefulDiskRemovalRequired CruiseControlVolumeState = "GracefulDiskRemovalRequired"
// GracefulDiskRemovalRunning states that for the broker volume a CC disk removal is in progress
GracefulDiskRemovalRunning CruiseControlVolumeState = "GracefulDiskRemovalRunning"
// GracefulDiskRemovalSucceeded states that the for the broker volume removal has succeeded
GracefulDiskRemovalSucceeded CruiseControlVolumeState = "GracefulDiskRemovalSucceeded"
// GracefulDiskRemovalScheduled states that the broker volume removal CCOperation is created and the task is waiting for execution
GracefulDiskRemovalScheduled CruiseControlVolumeState = "GracefulDiskRemovalScheduled"
// GracefulDiskRemovalCompletedWithError states that the broker volume removal task completed with an error
GracefulDiskRemovalCompletedWithError CruiseControlVolumeState = "GracefulDiskRemovalCompletedWithError"
// GracefulDiskRemovalPaused states that the broker volume removal task is completed with an error and it will not be retried, it is paused
GracefulDiskRemovalPaused CruiseControlVolumeState = "GracefulDiskRemovalPaused"

// Disk rebalance cruise control states
// GracefulDiskRebalanceRequired states that the broker volume needs a CC disk rebalance
GracefulDiskRebalanceRequired CruiseControlVolumeState = "GracefulDiskRebalanceRequired"
Expand Down
28 changes: 15 additions & 13 deletions api/v1beta1/zz_generated.deepcopy.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

72 changes: 53 additions & 19 deletions controllers/cruisecontroloperation_controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -52,8 +52,9 @@ const (
var (
defaultRequeueIntervalInSeconds = 10
executionPriorityMap = map[banzaiv1alpha1.CruiseControlTaskOperation]int{
banzaiv1alpha1.OperationAddBroker: 2,
banzaiv1alpha1.OperationRemoveBroker: 1,
banzaiv1alpha1.OperationAddBroker: 3,
banzaiv1alpha1.OperationRemoveBroker: 2,
banzaiv1alpha1.OperationRemoveDisks: 1,
banzaiv1alpha1.OperationRebalance: 0,
}
missingCCResErr = errors.New("missing Cruise Control user task result")
Expand Down Expand Up @@ -197,7 +198,11 @@ func (r *CruiseControlOperationReconciler) Reconcile(ctx context.Context, reques
return reconciled()
}

ccOperationExecution := selectOperationForExecution(ccOperationQueueMap)
ccOperationExecution, err := r.selectOperationForExecution(ccOperationQueueMap)
if err != nil {
log.Error(err, "requeue event as selecting operation for execution failed")
return requeueAfter(defaultRequeueIntervalInSeconds)
}
// There is nothing to be executed for now, requeue
if ccOperationExecution == nil {
return requeueAfter(defaultRequeueIntervalInSeconds)
Expand Down Expand Up @@ -265,6 +270,8 @@ func (r *CruiseControlOperationReconciler) executeOperation(ctx context.Context,
cruseControlTaskResult, err = r.scaler.RemoveBrokersWithParams(ctx, ccOperationExecution.CurrentTaskParameters())
case banzaiv1alpha1.OperationRebalance:
cruseControlTaskResult, err = r.scaler.RebalanceWithParams(ctx, ccOperationExecution.CurrentTaskParameters())
case banzaiv1alpha1.OperationRemoveDisks:
cruseControlTaskResult, err = r.scaler.RemoveDisksWithParams(ctx, ccOperationExecution.CurrentTaskParameters())
case banzaiv1alpha1.OperationStopExecution:
cruseControlTaskResult, err = r.scaler.StopExecution(ctx)
default:
Expand Down Expand Up @@ -300,29 +307,56 @@ func sortOperations(ccOperations []*banzaiv1alpha1.CruiseControlOperation) map[s
return ccOperationQueueMap
}

func selectOperationForExecution(ccOperationQueueMap map[string][]*banzaiv1alpha1.CruiseControlOperation) *banzaiv1alpha1.CruiseControlOperation {
// SELECTING OPERATION FOR EXECUTION
var ccOperationExecution *banzaiv1alpha1.CruiseControlOperation
// selectOperationForExecution selects the next operation to be executed
func (r *CruiseControlOperationReconciler) selectOperationForExecution(ccOperationQueueMap map[string][]*banzaiv1alpha1.CruiseControlOperation) (*banzaiv1alpha1.CruiseControlOperation, error) {
// First prio: execute the finalize task
switch {
case len(ccOperationQueueMap[ccOperationForStopExecution]) > 0:
ccOperationExecution = ccOperationQueueMap[ccOperationForStopExecution][0]
ccOperationExecution.CurrentTask().Operation = banzaiv1alpha1.OperationStopExecution
if op := getFirstOperation(ccOperationQueueMap, ccOperationForStopExecution); op != nil {
op.CurrentTask().Operation = banzaiv1alpha1.OperationStopExecution
return op, nil
}

// Second prio: execute add_broker operation
case len(ccOperationQueueMap[ccOperationFirstExecution]) > 0 && ccOperationQueueMap[ccOperationFirstExecution][0].CurrentTaskOperation() == banzaiv1alpha1.OperationAddBroker:
ccOperationExecution = ccOperationQueueMap[ccOperationFirstExecution][0]
if op := getFirstOperation(ccOperationQueueMap, ccOperationFirstExecution); op != nil &&
op.CurrentTaskOperation() == banzaiv1alpha1.OperationAddBroker {
return op, nil
}

// Third prio: execute failed task
case len(ccOperationQueueMap[ccOperationRetryExecution]) > 0:
if op := getFirstOperation(ccOperationQueueMap, ccOperationRetryExecution); op != nil {
// If there is a failed remove_disks task and there is a rebalance_disks task in the queue, we execute the rebalance_disks task
// This could only happen if the user tried to delete a disk, and later rolled back the change
if op.CurrentTaskOperation() == banzaiv1alpha1.OperationRemoveDisks {
for _, opFirstExecution := range ccOperationQueueMap[ccOperationFirstExecution] {
if opFirstExecution.CurrentTaskOperation() == banzaiv1alpha1.OperationRebalance {
// Mark the remove disk operation as paused, so it is not retried
op.Labels[banzaiv1alpha1.PauseLabel] = True
err := r.Client.Update(context.TODO(), op)
if err != nil {
return nil, errors.WrapIfWithDetails(err, "failed to update Cruise Control operation", "name", op.Name, "namespace", op.Namespace)
}

// Execute the rebalance disks operation
return opFirstExecution, nil
}
}
}

// When the default backoff duration elapsed we retry
if ccOperationQueueMap[ccOperationRetryExecution][0].IsReadyForRetryExecution() {
ccOperationExecution = ccOperationQueueMap[ccOperationRetryExecution][0]
if op.IsReadyForRetryExecution() {
return op, nil
}
// Forth prio: execute the first element in the FirstExecutionQueue which is ordered by operation type and k8s creation timestamp
case len(ccOperationQueueMap[ccOperationFirstExecution]) > 0:
ccOperationExecution = ccOperationQueueMap[ccOperationFirstExecution][0]
}

return ccOperationExecution
// Fourth prio: execute the first element in the FirstExecutionQueue which is ordered by operation type and k8s creation timestamp
return getFirstOperation(ccOperationQueueMap, ccOperationFirstExecution), nil
}

// getFirstOperation returns the first operation in the given queue
func getFirstOperation(ccOperationQueueMap map[string][]*banzaiv1alpha1.CruiseControlOperation, key string) *banzaiv1alpha1.CruiseControlOperation {
if len(ccOperationQueueMap[key]) > 0 {
return ccOperationQueueMap[key][0]
}
return nil
}

// SetupCruiseControlWithManager registers cruise control controller to the manager
Expand Down
15 changes: 15 additions & 0 deletions controllers/cruisecontroloperation_controller_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -84,6 +84,21 @@ func TestSortOperations(t *testing.T) {
createCCRetryExecutionOperation(timeNow, "3", v1alpha1.OperationRebalance),
},
},
{
testName: "mixed with remove disks",
ccOperations: []*v1alpha1.CruiseControlOperation{
createCCRetryExecutionOperation(timeNow, "1", v1alpha1.OperationAddBroker),
createCCRetryExecutionOperation(timeNow, "4", v1alpha1.OperationRebalance),
createCCRetryExecutionOperation(timeNow.Add(2*time.Second), "3", v1alpha1.OperationRemoveDisks),
createCCRetryExecutionOperation(timeNow.Add(time.Second), "2", v1alpha1.OperationRemoveBroker),
},
expectedOutput: []*v1alpha1.CruiseControlOperation{
createCCRetryExecutionOperation(timeNow, "1", v1alpha1.OperationAddBroker),
createCCRetryExecutionOperation(timeNow.Add(time.Second), "2", v1alpha1.OperationRemoveBroker),
createCCRetryExecutionOperation(timeNow.Add(2*time.Second), "3", v1alpha1.OperationRemoveDisks),
createCCRetryExecutionOperation(timeNow, "4", v1alpha1.OperationRebalance),
},
},
}
for _, testCase := range testCases {
sortedCCOperations := sortOperations(testCase.ccOperations)
Expand Down
Loading