Add endpoint to move all replicas from a disk to other disks of the same broker #1908
Conversation
I think this may work, but the "DiskRemovalGoal" as a goal is sort of suspicious.
Could you briefly explain why the implementation is different from the removeBroker endpoint? E.g. remove broker is not a goal, and simply marks the broker as dead. If we did the same thing for the removeDisk endpoint, would it work?
items:
  type: string
required: true
example: 101-/tmp/kafka-logs-1,101-/tmp/kafka-logs-2
How did you handle the case where the broker/logDir doesn't exist?
I see it's in checkCanRemoveDisks in RemoveDisksRunnable. Since this is a Goal, what if a user directly calls it from the rebalance endpoint?
Hi @CCisGG!
- The remove broker endpoint marks the broker as dead; however, we do not need to mark the whole broker as dead, only some of its disks. I saw that there is an enum for disk states which contains the DEAD value, but when the cluster model is generated at the start of the runnable (clusterModel = _kafkaCruiseControl.clusterModel(...)), the broker disk configuration is always read from the capacity configuration file. So even if a disk is marked as dead during the remove disks call, a subsequent rebalance call will rebuild the cluster model from the capacity file and the disk will have a healthy state again. Please correct me if I am wrong. I looked into this when trying to find a way to stop further rebalances from using the removed disks (until they are explicitly removed from the Kafka broker configuration), but changing how the cluster model is read or how the disks are marked as dead seemed a bit too complex and could potentially affect other functionality.
- Indeed, the broker/logDir combination is validated in RemoveDisksRunnable.checkCanRemoveDisks. This goal should not be executed outside of a remove disks call. To prevent it from being called from the rebalance endpoint, DiskRemovalGoal.optimize first calls DiskRemovalGoal.sanityCheckOptimizationOptions, which throws an exception if the optimization was triggered by a goal violation. If this is not the right way to prevent the call from the rebalance endpoint, please tell me.
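For illustration, a minimal sketch of that guard; method shapes are assumed rather than copied from the PR:

public void optimize(ClusterModel clusterModel, Set<Goal> optimizedGoals, OptimizationOptions optimizationOptions) {
  // Refuse to run unless invoked by the remove-disks endpoint; a run triggered by
  // the anomaly detector arrives here flagged as a goal-violation trigger.
  sanityCheckOptimizationOptions(optimizationOptions);
  // ... generate the disk-removal proposals ...
}

private void sanityCheckOptimizationOptions(OptimizationOptions optimizationOptions) {
  if (optimizationOptions.isTriggeredByGoalViolation()) {
    throw new IllegalArgumentException(String.format("%s does not support goal violation detection.", name()));
  }
}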
Made a second pass.
I still feel it is weird to make this a goal. I'll think more about it.
With this implementation, I think you at least need to add protection in KafkaCruiseControlConfig.sanityCheckGoalNames to prevent this goal from being triggered by anything other than the endpoint.
Also, I think you need to rebase onto the latest 2.5 branch, as it has some changes to the servlet.
  validParameterNames.add(STOP_ONGOING_EXECUTION_PARAM);
  CASE_INSENSITIVE_PARAMETER_NAMES = Collections.unmodifiableSortedSet(validParameterNames);
}
protected boolean _dryRun;
Are they protected for testing purposes?
No, I just followed how the other parameter classes were implemented. Changed them to private.
operationId: removeDisks
summary: Move all replicas from the specified disk to other disks of the same broker.
parameters:
  - name: dryrun
Looks like "skip_hard_goal_check" is missing here?
Removed unnecessary parameter.
import java.util.stream.Collectors;

import static com.linkedin.kafka.cruisecontrol.config.constants.AnalyzerConfig.REMOVE_DISKS_REMAINING_SIZE_ERROR_MARGIN;
import static com.linkedin.kafka.cruisecontrol.servlet.handler.async.runnable.RunnableUtils.*;
Avoid star imports.
👍
protected void init() {
  _kafkaCruiseControl.sanityCheckDryRun(_dryRun, _stopOngoingExecution);
  _goalsByPriority = new ArrayList<>(1);
  _goalsByPriority.add(new DiskRemovalGoal(_brokerIdAndLogdirs, _errorMargin));
I wonder: if you add the goal with the lowest priority like this, will the optimization fail because it can violate the previously optimized goals?
It is the only goal added, so it also has the highest priority.
if (_stopOngoingExecution) {
  maybeStopOngoingExecutionToModifyAndWait(_kafkaCruiseControl, _operationProgress);
}
_combinedCompletenessRequirements = _goalsByPriority.get(0).clusterModelCompletenessRequirements();
This line seems to be copied from DemoteBrokerRunnable. Could you explain what it means?
This combinedCompletenessRequirements is required when building the cluster model, which the runnable uses to set the capacity of the specified disks to 0 (in the new implementation). The requirements can be obtained from each goal instance, so here we get them from the IntraBrokerDiskCapacityGoal.
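In sketch form (the exact clusterModel(...) signature is assumed):

// The goal knows how complete the load data must be for its decisions to be
// trustworthy, so the runnable asks the goal instead of hardcoding requirements.
ModelCompletenessRequirements requirements = _goalsByPriority.get(0).clusterModelCompletenessRequirements();
ClusterModel clusterModel = _kafkaCruiseControl.clusterModel(requirements, _operationProgress);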
result.goalProposals(),
Collections.emptySet(),
isKafkaAssignerMode(_goals),
0,
I would suggest making the constant values static variables. That makes review easier.
👍
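For illustration, one hedged way to do that (the constant name is hypothetical, not the actual commit):

// Name the magic number passed to executeProposals(...) so reviewers can see its intent.
private static final int ZERO_CONCURRENT_PARTITION_MOVEMENTS = 0;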
if (!brokerLogDirs.containsAll(logDirs)) {
  throw new IllegalArgumentException(String.format("Invalid log dirs provided for broker %d.", brokerId));
}
if (broker.disks().size() < logDirs.size()) {
Can this ever be true, given the check on line 123?
Good catch. Given the containsAll check above, logDirs can never be larger than the broker's set of disks, so the branch was unreachable. Removed the if branch.
private void checkCanRemoveDisks(Map<Integer, Set<String>> brokerIdAndLogdirs, ClusterModel clusterModel) {
  for (Map.Entry<Integer, Set<String>> entry : brokerIdAndLogdirs.entrySet()) {
    Integer brokerId = entry.getKey();
    Set<String> logDirs = entry.getValue();
Maybe change the name to something like "logDirsToRemove"
👍
    currentUsage += disk.utilization();
  }
}
double futureUsage = removedUsage + currentUsage;
Looks like you don't need to track removedUsage and currentUsage separately? The only variable actually used is futureUsage, i.e. the total usage.
Good catch. Now using only futureUsage.
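Putting this thread together, a hedged sketch of the simplified check (helper names such as logDirsToRemove and the exact margin semantics are assumptions, not the merged code):

double futureUsage = 0.0;
double remainingCapacity = 0.0;
for (Disk disk : broker.disks()) {
  // Load on every disk must end up on the remaining disks, so count it all.
  futureUsage += disk.utilization();
  if (!logDirsToRemove.contains(disk.logDir())) {
    remainingCapacity += disk.capacity();
  }
}
// Require headroom controlled by REMOVE_DISKS_REMAINING_SIZE_ERROR_MARGIN.
if (futureUsage > remainingCapacity * (1 - _errorMargin)) {
  throw new IllegalArgumentException(String.format("Not enough capacity left on broker %d to remove the requested disks.", brokerId));
}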
This is a valid concern actually. Do you have a solution so far?
Reading the source code, it looks like inter-broker replica movement moves the replica to the logDir with the same name on the destination broker, e.g. brokerA.logDir1 -> brokerB.logDir1. It doesn't care whether the logDir has been removed or not: cruise-control/cruise-control/src/main/java/com/linkedin/kafka/cruisecontrol/model/Broker.java Line 365 in 29f9185
I think you probably need to come up with a solution to address this issue.

Hey @ilievladiulian, eager to see this enhancement move forward. Is this still being worked on?
We need to move kafka data to a pvc with a different class (higher IOPS) and this would really help us with that and not commit seppuku :)
  }
  diskIndex = (diskIndex + 1) % remainingDisksNumber;
}
LOG.info("Could not move replica {} to any of the remaining disks.", replica);
It seems that a replica that couldn't be moved to any of the remaining disks is treated as a success? Shouldn't we return a failure in this case?
Addressed in commit ca18da1.
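The shape of that fix, sketched here for reference (commit ca18da1 is authoritative; variable names are assumed):

if (!replicaMoved) {
  // Surface the failure so the user task ends as CompletedWithError instead of
  // silently logging and reporting success.
  throw new OptimizationFailureException(String.format("Could not move replica %s to any of the remaining disks.", replica));
}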
Force-pushed from 6f7aea3 to 2ea6352.
Hi @CCisGG! I pushed a new implementation for the remove disks endpoint. Instead of creating a new goal for disk removal, the endpoint reuses the IntraBrokerDiskCapacityGoal, which uses intra-broker replica movement to keep disk usage under a certain threshold.
I did not look in more detail into preventing CC from using the deleted disks in further rebalances, as there is still a possibility for those disks to be used by Kafka when creating new topics, and that cannot be prevented.
The endpoint only uses intra-broker replica movement to move replicas from the deleted disks. It is not influenced by inter-broker replica movements.

@CCisGG As stated in a previous comment, even if we prevent CC from using those disks, Kafka can still use them and that can't be prevented. If you think it still has value to prevent CC from using them, let me know and I will look into it with your suggestions in mind.

@alex-necula Added an exception if the goal could not move all replicas from the deleted disks, so that the task ends with the CompletedWithError status.

Hi @CCisGG! Did you get a chance to look at the latest changes?

Hi folks, I'll take a pass next week.
@@ -43,19 +43,30 @@ public class IntraBrokerDiskCapacityGoal extends AbstractGoal {
  private static final Logger LOG = LoggerFactory.getLogger(IntraBrokerDiskCapacityGoal.class);
  private static final int MIN_NUM_VALID_WINDOWS = 1;
  private static final Resource RESOURCE = Resource.DISK;
+ private final boolean _shouldEmptyZeroCapacityDisks;
Just curious. Even if we don't add this boolean, shouldn't cruise control move replicas out if you set disk capacity to 0?
I think it would be more intuitive if cruise control moved all replicas when the disk capacity is set to 0. However, initial tests showed that it does not (in the case of zero-load replicas), so I added this flag to make sure I did not change how the goal worked before.
The only reason I can think of is that CC tries to avoid moving replicas if possible. Since empty replicas do not affect balance, CC tries to avoid moving them.
 *
 * @param disk Disk to be checked for capacity limit violation.
 * @return {@code true} if utilization is over the limit, {@code false} otherwise.
 */
private boolean isUtilizationOverLimit(Disk disk) {
-  return disk.utilization() > disk.capacity() * _balancingConstraint.capacityThreshold(RESOURCE);
+  boolean diskUtilizationValid = true;
I see, so you do not want to trigger the movement if the disk utilization is already 0. I'm curious why we add the logic here. Say a disk's utilization is already 0: what happens if this method returns true?
If the disk utilisation is 0 and the capacity is 0, the disk would not be considered over-utilised, although there may still be empty replicas on it. This broke the loop at line 213. I removed the 0-utilisation case and added an extra check on the flag in the if statement at line 212, under the assumption that if there is a replica on the disk, the utilisation will be greater than 0.
However, while writing the tests I saw that this assumption was wrong: there can be 0 utilisation on a disk that holds empty replicas, at least in the test environment. I fixed the formula in the latest commit.
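A hedged reconstruction of the fixed check (the actual commit may differ in detail):

private boolean isUtilizationOverLimit(Disk disk) {
  if (disk.capacity() == 0) {
    // A disk being emptied stays "over limit" while any replica remains on it,
    // even a replica whose expected utilization is 0.
    return !disk.replicas().isEmpty();
  }
  return disk.utilization() > disk.capacity() * _balancingConstraint.capacityThreshold(RESOURCE);
}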
Overall I think this new implementation is smart and should work. Left some minor comments. Also, please add the necessary unit tests as part of this PR. Thanks for the contribution!
Is this still being worked on, @ilievladiulian? This is increasingly common for us: disk limitations in cloud providers force us to rotate disks for faster storage tiers. This would be a big win for us!
- Extend IntraBrokerDiskCapacityGoal to allow moving all replicas from a 0 capacity disk
- Added new endpoint that removes disks by setting 0 capacity on them and optimizing the IntraBrokerDiskCapacityGoal
- Refactored for better readability
- Throw exception if goal could not move all replicas from 0 capacity disks
- Fixed typo in comment
- Changed class field to be final
- Added unit tests for intra-broker disk capacity goal
Force-pushed from bb38e7f to b1df366.
Hi @CCisGG! Thank you for the review. I added unit tests in the latest commit and answered the comments.
Hi @CCisGG! Did you get a chance to look at the latest changes?
Left more comments.
@@ -43,19 +43,30 @@ public class IntraBrokerDiskCapacityGoal extends AbstractGoal {
  private static final Logger LOG = LoggerFactory.getLogger(IntraBrokerDiskCapacityGoal.class);
  private static final int MIN_NUM_VALID_WINDOWS = 1;
  private static final Resource RESOURCE = Resource.DISK;
+ private final boolean _shouldEmptyZeroCapacityDisks;
Also please add a comment on this variable, something like "This is only used for removeReplicasFromBroker endpoint"
Good note. Added the comment in the latest commit.
@@ -194,7 +209,7 @@ public int compare(Disk disk1, Disk disk2) {
  if (d == null) {
    LOG.debug("Failed to move replica {} to any disk {} in broker {}", replica, candidateDisks, replica.broker());
  }
- if (!isUtilizationOverLimit(disk)) {
+ if (!isUtilizationOverLimit(disk) && !_shouldEmptyZeroCapacityDisks) {
Why do we need to add an extra check here? If you set the capacity to 0, I think isUtilizationOverLimit always returns true until all replicas are moved out. I feel this extra condition is not necessary, but let me know what you think.
OK, after rethinking it, this seems to just match the change in L163.
Good catch. After fixing the isUtilizationOverLimit method, this is no longer needed. Removed it in the latest commit.
@@ -149,8 +160,12 @@ public ActionAcceptance actionAcceptance(BalancingAction action, ClusterModel cl
  protected boolean selfSatisfied(ClusterModel clusterModel, BalancingAction action) {
    Replica sourceReplica = clusterModel.broker(action.sourceBrokerId()).replica(action.topicPartition());
    Disk destinationDisk = clusterModel.broker(action.destinationBrokerId()).disk(action.destinationBrokerLogdir());
-   return sourceReplica.load().expectedUtilizationFor(RESOURCE) > 0
-       && isMovementAcceptableForCapacity(sourceReplica, destinationDisk);
+   boolean shouldMoveReplica = sourceReplica.load().expectedUtilizationFor(RESOURCE) > 0;
I'd recommend changing this to:
boolean shouldMoveReplica;
if (_shouldEmptyZeroCapacityDisks) {
// should also move replicas with 0 expected disk utilization
shouldMoveReplica = sourceReplica.load().expectedUtilizationFor(RESOURCE) >= 0;
} else {
// In general CC tries to reduce the number of replica moves. Since empty replicas do not affect balance, CC tries to avoid moving them.
shouldMoveReplica = sourceReplica.load().expectedUtilizationFor(RESOURCE) > 0;
}
Good note. It looks clearer this way. Refactored in the latest commit.
_goalsByPriority = new ArrayList<>(1);
Goal intraBrokerDiskCapacityGoal = new IntraBrokerDiskCapacityGoal(true);
intraBrokerDiskCapacityGoal.configure(_kafkaCruiseControl.config().mergedConfigValues());
_goalsByPriority.add(intraBrokerDiskCapacityGoal);
I think we can do something like this instead:
_goalsByPriority = Collections.singletonList(intraBrokerDiskCapacityGoal);
Good note. One observation: in GoalBasedOperationRunnable, after the goal is completed, the _goalsByPriority list is cleared. This does not work with Collections.singletonList, as it provides an immutable list. In the latest commit I used Collections.singletonList, but wrapped it in an ArrayList constructor call so the list would be mutable.
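That is, roughly:

// Collections.singletonList is immutable, and GoalBasedOperationRunnable clears
// _goalsByPriority when it finishes, so wrap it in a mutable ArrayList.
_goalsByPriority = new ArrayList<>(Collections.singletonList(intraBrokerDiskCapacityGoal));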
Hi @CCisGG! Addressed your comments in the latest commit. I also added some logging in the
Looks good. Thanks for the great work! Could you please run some ad-hoc tests on state/cluster_state and on the new endpoint that you just added?
Feel free to let me know if the tests run well and I can merge the PR.
Hi @CCisGG! Thank you for taking the time to review this PR. The disk removal request returned: [output attached]. After the operation ended, the cluster state was: [output attached]. Running the disk removal again, but for the second disk, moves all replicas to the first disk. Again: [output attached].
We are all waiting for this feature. Is there a workaround that could help us move data between disks and remove a disk safely?
@sappusaketh Releasing 2.5.127 with this change.
@ilievladiulian The CI workflow fails. Could you take a look at the issue? https://app.circleci.com/pipelines/github/linkedin/cruise-control/2259/workflows/ce2fdd75-615f-4784-9cb9-7b6d47fdb1d2/jobs/6294
It seems like there is a compile issue.
Hi @ilievladiulian, any updates on this one? It's blocking the PR/release pipeline.
Hi @CCisGG! Sorry for the delay. From what I can see, there was a change in the API for
Hi @ilievladiulian, I was curious and wanted to know, is there some reason we removed the
This PR resolves #1907.
This PR introduces a new endpoint that moves replicas from a specified disk to other disks of the same broker. This allows the removal of the disk without loss of data.
The replicas are moved in a round-robin manner to the remaining disks, from the largest to the smallest, while checking the following constraint: the combined usage of the removed and remaining disks must stay below the capacity of the remaining disks, with configurable headroom given by REMOVE_DISKS_REMAINING_SIZE_ERROR_MARGIN.
What is new
The new endpoint is available as follows:
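A hedged invocation example, assembled from the OpenAPI snippet earlier in the thread (the parameter name and path prefix are inferred and may differ per deployment):

POST /kafkacruisecontrol/remove_disks?brokerid_and_logdirs=101-/tmp/kafka-logs-1,101-/tmp/kafka-logs-2&dryrun=true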
What is missing
After all replicas are moved from the specified disk, the disk may still be used by CC during rebalances and Kafka can still use it when creating topics.