Add endpoint to move all replicas from a disk to another disk of the same broker #7
Conversation
@@ -0,0 +1,46 @@
/*
 * Copyright 2018 LinkedIn Corp. Licensed under the BSD 2-Clause License (the "License"). See License in the project root for license information.
Suggested change:
- * Copyright 2018 LinkedIn Corp. Licensed under the BSD 2-Clause License (the "License"). See License in the project root for license information.
+ * Copyright 2022 LinkedIn Corp. Licensed under the BSD 2-Clause License (the "License"). See License in the project root for license information.
@ilievladiulian Please also add the new endpoint to https://github.com/adobe/cruise-control/tree/master/cruise-control/src/yaml
Fixed unit test
default: true
- name: goals
  in: query
  description: List of goals used to generate proposal, the default goals will be used if this parameter is not specified.
How are the goals in this endpoint being used? In my mind, moving data intra-broker wouldn't need inter-broker goals, right?
These parameters were required by the OpenAPI spec test because the RemoveDisksParameters class extended GoalBasedOptimizationParameters, but they were not used in the actual endpoint. Found a way to remove them and have them ignored by the tests.
additionalProperties:
  type: array
  items:
    type: string
example?
Added an example in the latest commit (ex: 1-/tmp/kafka-logs-1,1-/tmp/kafka-logs-2)
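For reference, a minimal sketch of parsing that parameter format into the Map&lt;Integer, Set&lt;String&gt;&gt; the runnable uses (the class and method names here are hypothetical, not the PR's actual parsing code):

```java
import java.util.HashMap;
import java.util.HashSet;
import java.util.Map;
import java.util.Set;

// Hypothetical parser for a value like "1-/tmp/kafka-logs-1,1-/tmp/kafka-logs-2".
public final class BrokerIdLogdirsParser {
  public static Map<Integer, Set<String>> parse(String value) {
    Map<Integer, Set<String>> result = new HashMap<>();
    for (String entry : value.split(",")) {
      // Split on the first '-' only: broker ids contain no '-',
      // but log dir paths such as /tmp/kafka-logs-1 do.
      int sep = entry.indexOf('-');
      int brokerId = Integer.parseInt(entry.substring(0, sep));
      String logDir = entry.substring(sep + 1);
      result.computeIfAbsent(brokerId, k -> new HashSet<>()).add(logDir);
    }
    return result;
  }
}
```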
type: string
example: "Balance disk utilization across all brokers in the cluster."
- name: brokerid_and_logdirs
  in: query
is this required?
Yes, the parameter is required. Updated spec to reflect that.
protected final Map<Integer, Set<String>> _brokerIdAndLogdirs;

/**
 * Constructor to be used for creating a runnable for self-healing.
stale Javadoc?
Yes, it was a Javadoc for a constructor which is not used, just a copy-paste leftover from another runnable.
String anomalyId,
Supplier<String> reasonSupplier,
boolean stopOngoingExecution) {
super(kafkaCruiseControl, new OperationFuture("Broker Removal for Self-Healing"), selfHealingGoals, allowCapacityEstimation,
The string "Broker Removal for Self-Healing" reads confusingly here. Can you expand on that?
The code is from a constructor which is not used (a copy-paste leftover). Removed it in the latest commits.
@ilievladiulian can you please also add unit and integration tests for this new feature?
Nice!
Left some questions, most importantly about URL length limitations and disk capacity validations.
type: string
example: "Balance disk utilization across all brokers in the cluster."
- name: brokerid_and_logdirs
  in: query
A disk might have a lot of log dirs. If we want to remove the disk, and the disk has 1000 replicas (which is not much), each log dir having 50 chars, will we hit any URL limits with >50k chars (excluding protocol, host and other query params) in the URL string?
Ideally you need a way to specify which logdirs to remove, as Kafka does not have a notion of disks but of logdirs, so the number of replicas might not make sense to talk about here.
In terms of the number of logdirs, they are usually a small number (as they are logical paths on a disk), e.g. kafka-logs1 or kafka-logs2.
You could hit the limit if you want to remove a lot of them at the same time though.
@ecojan thanks for clarifying! Thought we need to pass the path down to the partition dir, e.g. /kafka-logs1/kafka/aep_pipeline_test_topic_1-0, not the disk path, e.g. /kafka-logs1. I think there is no danger of URL length overflow then.
As @ecojan mentioned, the parameter is not exactly a logdir, but a disk path to the logdirs. The endpoint moves all replicas from that path to another path configured on the same broker.
@Override
public boolean optimize(ClusterModel clusterModel, Set<Goal> optimizedGoals, OptimizationOptions optimizationOptions) {
  sanityCheckOptimizationOptions(optimizationOptions);

  for (Map.Entry<Integer, Set<String>> brokerIdLogDirs : _brokerIdAndLogdirs.entrySet()) {
    Integer brokerId = brokerIdLogDirs.getKey();
    Set<String> logDirs = brokerIdLogDirs.getValue();

    Broker currentBroker = clusterModel.broker(brokerId);
    List<Disk> remainingDisks = new ArrayList<>();
    currentBroker.disks().stream().filter(disk -> !logDirs.contains(disk.logDir())).forEach(remainingDisks::add);

    int step = 0;
    int size = remainingDisks.size();
    while (!logDirs.isEmpty()) {
      String logDirToRemove = (String) logDirs.toArray()[0];

      Set<Replica> replicasToMove = currentBroker.disk(logDirToRemove).replicas();
      while (!replicasToMove.isEmpty()) {
        Replica replica = (Replica) replicasToMove.toArray()[0];
        clusterModel.relocateReplica(replica.topicPartition(), brokerId, remainingDisks.get(step % size).logDir());
      }

      logDirs.remove(logDirToRemove);
      step++;
    }
  }

  return true;
}
Suggested change:

@Override
public boolean optimize(ClusterModel clusterModel, Set<Goal> optimizedGoals, OptimizationOptions optimizationOptions) {
  sanityCheckOptimizationOptions(optimizationOptions);
  for (Map.Entry<Integer, Set<String>> brokerIdLogDirs : _brokerIdAndLogdirs.entrySet()) {
    relocateReplicas(clusterModel, brokerIdLogDirs);
  }
  return true;
}

private void relocateReplicas(ClusterModel clusterModel, Entry<Integer, Set<String>> brokerIdLogDirs) {
  Integer brokerId = brokerIdLogDirs.getKey();
  Set<String> logDirsToRemove = brokerIdLogDirs.getValue();
  Broker currentBroker = clusterModel.broker(brokerId);
  List<Disk> remainingDisks = currentBroker.disks().stream()
      .filter(currentDisk -> !logDirsToRemove.contains(currentDisk.logDir()))
      .collect(Collectors.toList());
  int logDirToRemoveCount = 0;
  int remainingDisksNumber = remainingDisks.size();
  for (String logDirToRemove : logDirsToRemove) {
    for (Replica replicaToMove : currentBroker.disk(logDirToRemove).replicas()) {
      clusterModel.relocateReplica(replicaToMove.topicPartition(), brokerId,
          remainingDisks.get(logDirToRemoveCount % remainingDisksNumber).logDir());
    }
    logDirToRemoveCount++;
  }
}
Would the above suggestion work? What is different:
- the use of while (!logDirs.isEmpty()) and (String) logDirs.toArray()[0] to iterate through a set looked very odd (same with the nested replicasToMove loop)
- some variable names could be more expressive (arguably subjective), to make it easier to read (e.g. logDirs, step or size)
- we reduce the complexity (a little) of all the nested loops by extracting a method

It is optional, you decide.
Refactored the code to improve readability. As a note, there are 2 nested loops there. The outer loop was refactored to use a for-each iterator; the inner loop, however, calls a function that modifies the collection it iterates over. This is why it is treated as a loop over a queue, so that the collection can change during iteration.
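To illustrate the point about the inner loop, a self-contained sketch (not Cruise Control code): a for-each over a HashSet typically throws ConcurrentModificationException when the loop body removes elements from it, so the set is drained queue-style instead.

```java
import java.util.HashSet;
import java.util.Set;

public final class DrainExample {
  public static void main(String[] args) {
    Set<String> replicas = new HashSet<>(Set.of("topic-0", "topic-1", "topic-2"));

    // Would fail: removing from the set inside a for-each typically
    // throws ConcurrentModificationException (fail-fast iterator).
    // for (String r : replicas) { replicas.remove(r); }

    // Queue-style draining re-reads the set on every pass, so it tolerates
    // mutation; in the PR, clusterModel.relocateReplica(...) removes the
    // replica from the source disk's replica set as a side effect.
    while (!replicas.isEmpty()) {
      String r = replicas.iterator().next();
      replicas.remove(r);
      System.out.println("moved " + r);
    }
  }
}
```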
Integer brokerId = entry.getKey();
Set<String> logDirs = entry.getValue();
Broker broker = clusterModel.broker(brokerId);
if (broker.disks().size() < logDirs.size()) {
Should we validate against erroneous log dir names too, not only size?
Added validation against configured log dir names as well.
Nice, I would also differentiate the error messages for when the broker dir names are an issue versus when the broker dir number is an issue. Right now the error messages are the same.
Changed log message to differentiate between issues.
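A sketch of the two differentiated checks discussed here (the method and message wording are illustrative, not the PR's exact code), using the model classes seen in the snippets above:

```java
import java.util.Map;
import java.util.Set;
import java.util.stream.Collectors;

// Illustrative validation: distinguish "too many dirs requested" from
// "dir name not configured on the broker".
private void sanityCheckBrokerLogDirs(ClusterModel clusterModel, Map<Integer, Set<String>> brokerIdAndLogdirs) {
  for (Map.Entry<Integer, Set<String>> entry : brokerIdAndLogdirs.entrySet()) {
    Broker broker = clusterModel.broker(entry.getKey());
    Set<String> logDirs = entry.getValue();
    if (broker.disks().size() <= logDirs.size()) {
      // At least one disk must remain to receive the moved replicas.
      throw new IllegalArgumentException(String.format(
          "Cannot remove %d of broker %d's %d disks.",
          logDirs.size(), entry.getKey(), broker.disks().size()));
    }
    Set<String> configured = broker.disks().stream().map(Disk::logDir).collect(Collectors.toSet());
    for (String logDir : logDirs) {
      if (!configured.contains(logDir)) {
        throw new IllegalArgumentException(String.format(
            "Log dir %s is not configured on broker %d.", logDir, entry.getKey()));
      }
    }
  }
}
```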
    remainingCapacity += disk.capacity();
  }
}
if (removedCapacity > remainingCapacity) {
Should we also add a margin of error here? Edge case: if the capacities are equal, it would still be an issue, as it would result in full disks. Maybe 5-10%? cc @amuraru
I agree here as well, we should have a configurable margin here.
For background info, if a Kafka disk ends up getting filled to 100%, it will only cause issues (such as broker unavailability).
Added an error margin check there, which is configurable.
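The check then becomes roughly the following (a sketch; the variable names mirror the snippet above, and the margin value and config wiring are illustrative):

```java
// Illustrative margin check: require headroom on the remaining disks
// instead of allowing them to be filled to exactly 100%.
static void validateRemainingCapacity(double removedCapacity, double remainingCapacity, double errorMargin) {
  // e.g. errorMargin = 0.1 requires the remaining disks to fit the
  // moved data with 10% to spare.
  if (removedCapacity * (1 + errorMargin) > remainingCapacity) {
    throw new IllegalArgumentException(String.format(
        "Moved capacity %.0f does not fit into remaining capacity %.0f with a %.0f%% error margin.",
        removedCapacity, remainingCapacity, errorMargin * 100));
  }
}
```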
Cleaned up available endpoint params
… between removed and remaining disk sizes
Solid work @ilievladiulian
@ilievladiulian one thing that's not very clear to me is whether the removed/decommissioned disks are kept as such (disabled) for a well defined period. Can you confirm whether this functionality is encoded in this PR?
Looks good! Great job @ilievladiulian
Co-authored-by: Adrian Muraru <adi.muraru@gmail.com>
@amuraru the disks are not marked as disabled after this operation. I identified no trivial way to do this, and after discussing internally we agreed to use this functionality as it is right now, with the following workflow:
As a note, even if we were able to mark the disks as disabled so that no rebalance runs on them, the disks can still be used by Kafka if a new topic is created.
…same broker (#7) Add endpoint to move replicas from a specified disk to other disks of the same broker
* Revert "Add endpoint to move all replicas from a disk to another disk of the same broker (#7)". This reverts commit 8b203be.
* Added new endpoint to remove disks:
  - Extend IntraBrokerDiskCapacityGoal to allow moving all replicas from a 0 capacity disk
  - Added new endpoint that removes disks by setting 0 capacity on them and optimizing the IntraBrokerDiskCapacityGoal
* Additional fixes:
  - refactored for better readability
  - throw exception if goal could not move all replicas from 0 capacity disks
* Fix typo
* Changed class field to final
This PR introduces a new endpoint that moves replicas from a specified disk to another disk of the same broker. This allows the disk to be removed without losing data.
What is new
The new endpoint is available as follows:
POST /remove_disks?brokerid_and_logdirs=[brokerid1-logdir1,brokerid2-logdir2...]
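For example, a minimal Java 11 HttpClient call to the endpoint (the host, port, and /kafkacruisecontrol base path are assumptions about the deployment; the log dir values reuse the example from the review thread above):

```java
import java.net.URI;
import java.net.http.HttpClient;
import java.net.http.HttpRequest;
import java.net.http.HttpResponse;

public class RemoveDisksExample {
  public static void main(String[] args) throws Exception {
    // Move everything off broker 1's /tmp/kafka-logs-1 and /tmp/kafka-logs-2.
    String params = "brokerid_and_logdirs=1-/tmp/kafka-logs-1,1-/tmp/kafka-logs-2";
    HttpRequest request = HttpRequest.newBuilder()
        .uri(URI.create("http://localhost:9090/kafkacruisecontrol/remove_disks?" + params))
        .POST(HttpRequest.BodyPublishers.noBody())
        .build();
    HttpResponse<String> response = HttpClient.newHttpClient()
        .send(request, HttpResponse.BodyHandlers.ofString());
    System.out.println(response.statusCode() + ": " + response.body());
  }
}
```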
What is missing
After all replicas are moved from the specified disk, the disk may still be used by CC during rebalances, and Kafka can still use it when creating topics.