Skip to content

Commit

Permalink
Remove disks endpoint using IntraBrokerDiskCapacityGoal (#12)
Browse files Browse the repository at this point in the history
* Revert "Add endpoint to move all replicas from a disk to another disk of the same broker (#7)"

This reverts commit 8b203be.

* Added new endpoint to remove disks:
- Extend IntraBrokerDiskCapacityGoal to allow moving all replicas from a 0 capacity disk
- Added new endpoint that removes disks by setting 0 capacity on them and optimizing the IntraBrokerDiskCapacityGoal

* Additional fixes:
- refactored for better readability
- throw exception if goal could not move all replicas from 0 capacity disks

* Fix typo

* Changed class field to final
  • Loading branch information
ilievladiulian authored and amuraru committed May 14, 2023
1 parent 8b290ba commit f7a3e53
Show file tree
Hide file tree
Showing 9 changed files with 83 additions and 414 deletions.
3 changes: 0 additions & 3 deletions config/cruisecontrol.properties
Original file line number Diff line number Diff line change
Expand Up @@ -170,9 +170,6 @@ num.proposal.precompute.threads=1
# The impact of strictness on the relative balancedness score.
#goal.balancedness.strictness.weight

# the error margin between removed disk size and remaining disk size
#remove.disks.remaining.size.error.margin=0.1

# Configurations for the executor
# =======================================

Expand Down

This file was deleted.

Original file line number Diff line number Diff line change
Expand Up @@ -43,19 +43,30 @@ public class IntraBrokerDiskCapacityGoal extends AbstractGoal {
private static final Logger LOG = LoggerFactory.getLogger(IntraBrokerDiskCapacityGoal.class);
private static final int MIN_NUM_VALID_WINDOWS = 1;
private static final Resource RESOURCE = Resource.DISK;
private final boolean _shouldEmptyZeroCapacityDisks;

/**
* Constructor for Capacity Goal.
*/
public IntraBrokerDiskCapacityGoal() {
_shouldEmptyZeroCapacityDisks = false;
}

/**
* Constructor for Intra Broker Disk Capacity Goal.
*
* @param shouldEmptyZeroCapacityDisks specifies if the goal should move all replicas from disks with 0 capacity
*/
public IntraBrokerDiskCapacityGoal(boolean shouldEmptyZeroCapacityDisks) {
_shouldEmptyZeroCapacityDisks = shouldEmptyZeroCapacityDisks;
}

/**
* Package private for unit test.
*/
IntraBrokerDiskCapacityGoal(BalancingConstraint constraint) {
_balancingConstraint = constraint;
_shouldEmptyZeroCapacityDisks = false;
}

@Override
Expand Down Expand Up @@ -149,8 +160,12 @@ public ActionAcceptance actionAcceptance(BalancingAction action, ClusterModel cl
protected boolean selfSatisfied(ClusterModel clusterModel, BalancingAction action) {
Replica sourceReplica = clusterModel.broker(action.sourceBrokerId()).replica(action.topicPartition());
Disk destinationDisk = clusterModel.broker(action.destinationBrokerId()).disk(action.destinationBrokerLogdir());
return sourceReplica.load().expectedUtilizationFor(RESOURCE) > 0
&& isMovementAcceptableForCapacity(sourceReplica, destinationDisk);
boolean shouldMoveReplica = sourceReplica.load().expectedUtilizationFor(RESOURCE) > 0;
if (_shouldEmptyZeroCapacityDisks) {
// should also move replicas with 0 expected disk utilization
shouldMoveReplica = sourceReplica.load().expectedUtilizationFor(RESOURCE) >= 0;
}
return shouldMoveReplica && isMovementAcceptableForCapacity(sourceReplica, destinationDisk);
}

/**
Expand Down Expand Up @@ -194,7 +209,7 @@ public int compare(Disk disk1, Disk disk2) {
if (d == null) {
LOG.debug("Failed to move replica {} to any disk {} in broker {}", replica, candidateDisks, replica.broker());
}
if (!isUtilizationOverLimit(disk)) {
if (!isUtilizationOverLimit(disk) && !_shouldEmptyZeroCapacityDisks) {
break;
}
}
Expand All @@ -221,6 +236,10 @@ protected void updateGoalState(ClusterModel clusterModel, OptimizationOptions op
throw new OptimizationFailureException(String.format("[%s] Utilization (%.2f) for disk %s on broker %d is above capacity limit.",
name(), disk.utilization(), disk, broker.id()), recommendation);
}
if (_shouldEmptyZeroCapacityDisks && disk.capacity() == 0 && disk.replicas().size() > 0) {
throw new OptimizationFailureException(String.format("[%s] Could not move all replicas from disk %s on broker %d",
name(), disk.logDir(), broker.id()));
}
}
}
finish();
Expand All @@ -239,12 +258,17 @@ public ModelCompletenessRequirements clusterModelCompletenessRequirements() {

/**
* Check whether the combined replica utilization is above the given disk capacity limits.
* If _shouldEmptyZeroCapacityDisks is true, the disk utilization is over limit only if it is greater than 0.
*
* @param disk Disk to be checked for capacity limit violation.
* @return {@code true} if utilization is over the limit, {@code false} otherwise.
*/
private boolean isUtilizationOverLimit(Disk disk) {
return disk.utilization() > disk.capacity() * _balancingConstraint.capacityThreshold(RESOURCE);
boolean diskUtilizationValid = true;
if (_shouldEmptyZeroCapacityDisks) {
diskUtilizationValid = disk.utilization() > 0;
}
return diskUtilizationValid && disk.utilization() > disk.capacity() * _balancingConstraint.capacityThreshold(RESOURCE);
}

/**
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -460,15 +460,6 @@ public final class AnalyzerConfig {
String.format("The class implements %s interface and is used to generate replica to broker set mapping.",
ReplicaToBrokerSetMappingPolicy.class.getName());

/**
* <code>remove.disks.remaining.size.error.margin</code>
*/
public static final String REMOVE_DISKS_REMAINING_SIZE_ERROR_MARGIN = "remove.disks.remaining.size.error.margin";
public static final double DEFAULT_REMOVE_DISKS_REMAINING_SIZE_ERROR_MARGIN = 0.1;
public static final String REMOVE_DISKS_REMAINING_SIZE_ERROR_MARGIN_DOC = "The margin of error between the remaining and the "
+ "removed disk sizes. The ratio between the removed and the remaining size should be greater than this parameter. The minimum "
+ "value is 0.05 (5%).";

private AnalyzerConfig() {
}

Expand Down Expand Up @@ -709,12 +700,6 @@ public static ConfigDef define(ConfigDef configDef) {
.define(REPLICA_TO_BROKER_SET_MAPPING_POLICY_CLASS_CONFIG,
ConfigDef.Type.CLASS, DEFAULT_REPLICA_TO_BROKER_SET_MAPPING_POLICY_CLASS,
ConfigDef.Importance.LOW,
REPLICA_TO_BROKER_SET_MAPPING_POLICY_CLASS_DOC)
.define(REMOVE_DISKS_REMAINING_SIZE_ERROR_MARGIN,
ConfigDef.Type.DOUBLE,
DEFAULT_REMOVE_DISKS_REMAINING_SIZE_ERROR_MARGIN,
atLeast(0.05),
ConfigDef.Importance.LOW,
REMOVE_DISKS_REMAINING_SIZE_ERROR_MARGIN_DOC);
REPLICA_TO_BROKER_SET_MAPPING_POLICY_CLASS_DOC);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -110,6 +110,13 @@ public void setState(Disk.State newState) {
}
}

/**
* Set disk capacity to 0 to mark it for removal.
*/
public void markDiskForRemoval() {
_capacity = 0;
}

/**
* Add replica to the disk.
*
Expand Down
Loading

0 comments on commit f7a3e53

Please sign in to comment.