Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add endpoint to move all replicas from a disk to another disk of the same broker #7

Merged
merged 19 commits into from
Aug 24, 2022
Merged
Show file tree
Hide file tree
Changes from 4 commits
Commits
Show all changes
19 commits
Select commit Hold shift + click to select a range
e7a178a
Add endpoint to move replicas from a specified disk to another disk o…
ilievladiulian Jul 13, 2022
5ab953e
Remove unnecessary set of disk state to demoted
ilievladiulian Jul 14, 2022
0ca209c
Fix checkstyle errors
ilievladiulian Jul 14, 2022
564924e
Fix spotbugs warnings
ilievladiulian Jul 14, 2022
3b83621
Add endpoint open api spec
ilievladiulian Jul 14, 2022
00a0ba1
Fix copyright notice header year
ilievladiulian Jul 14, 2022
5700fb3
Removed unnecessary constructor
ilievladiulian Jul 22, 2022
cbce917
Validate requested log dirs exist
ilievladiulian Jul 27, 2022
cbe7f46
Refactor optimization function to improve readability
ilievladiulian Jul 27, 2022
ce764e3
Added configuration in cruisecontrol.properties for a margin of error…
ilievladiulian Aug 1, 2022
dea3b21
Added unit test for the disk removal goal
ilievladiulian Aug 1, 2022
6233c50
Added example for logdir param in api spec
ilievladiulian Aug 1, 2022
e5599e4
Changed example for remove disks logdirs to a list instead of single …
ilievladiulian Aug 1, 2022
41f5187
Check for disk utilization instead of disk capacity when validating r…
ilievladiulian Aug 2, 2022
247ecdd
Refactored to improve readability
ilievladiulian Aug 2, 2022
02748c9
Added test for high disk utilization
ilievladiulian Aug 2, 2022
3c4a92f
Added more tests for disk removal goal
ilievladiulian Aug 2, 2022
11ae051
Compacted similar functionality tests and added new test for the roun…
ilievladiulian Aug 4, 2022
9f1c9b0
Fix api spec endpoint summary
ilievladiulian Aug 23, 2022
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
@@ -0,0 +1,140 @@
/*
* Copyright 2018 LinkedIn Corp. Licensed under the BSD 2-Clause License (the "License"). See License in the project root for license information.
*/

package com.linkedin.kafka.cruisecontrol.analyzer.goals;

import com.linkedin.kafka.cruisecontrol.analyzer.OptimizationOptions;
import com.linkedin.kafka.cruisecontrol.analyzer.ActionAcceptance;
import com.linkedin.kafka.cruisecontrol.analyzer.BalancingAction;
import com.linkedin.kafka.cruisecontrol.analyzer.ProvisionResponse;
import com.linkedin.kafka.cruisecontrol.analyzer.ProvisionStatus;
import com.linkedin.kafka.cruisecontrol.model.Broker;
import com.linkedin.kafka.cruisecontrol.model.ClusterModel;
import com.linkedin.kafka.cruisecontrol.model.ClusterModelStats;
import com.linkedin.kafka.cruisecontrol.model.Disk;
import com.linkedin.kafka.cruisecontrol.model.Replica;
import com.linkedin.kafka.cruisecontrol.monitor.ModelCompletenessRequirements;
import java.util.Set;
import java.util.Map;
import java.util.Collections;
import java.util.List;
import java.util.ArrayList;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import static com.linkedin.kafka.cruisecontrol.analyzer.ActionAcceptance.ACCEPT;
import static com.linkedin.kafka.cruisecontrol.analyzer.goals.GoalUtils.MIN_NUM_VALID_WINDOWS_FOR_SELF_HEALING;


/**
* Soft goal to move the replicas to a different log dir.
*/
public class DiskRemovalGoal implements Goal {
private static final Logger LOG = LoggerFactory.getLogger(DiskRemovalGoal.class);
private final ProvisionResponse _provisionResponse;

protected final Map<Integer, Set<String>> _brokerIdAndLogdirs;

  /**
   * No-argument constructor: delegates to the map-based constructor with an empty selection,
   * so {@code optimize} has no disks to process.
   */
  public DiskRemovalGoal() {
    this(Collections.emptyMap());
  }

  /**
   * Creates the goal for a specific disk-removal request.
   *
   * @param brokerIdAndLogdirs Broker id mapped to the set of log directories (disks) on that
   *                           broker whose replicas should be moved to the broker's other disks.
   *                           NOTE(review): stored without a defensive copy — the caller and this
   *                           goal share the same map instance.
   */
  public DiskRemovalGoal(Map<Integer, Set<String>> brokerIdAndLogdirs) {
    // Provision status is fixed to UNDECIDED; this goal does not drive provisioning decisions.
    _provisionResponse = new ProvisionResponse(ProvisionStatus.UNDECIDED);
    _brokerIdAndLogdirs = brokerIdAndLogdirs;
  }

private void sanityCheckOptimizationOptions(OptimizationOptions optimizationOptions) {
if (optimizationOptions.isTriggeredByGoalViolation()) {
throw new IllegalArgumentException(String.format("%s goal does not support use by goal violation detector.", name()));
}
}

@Override
public boolean optimize(ClusterModel clusterModel, Set<Goal> optimizedGoals, OptimizationOptions optimizationOptions) {
sanityCheckOptimizationOptions(optimizationOptions);

for (Map.Entry<Integer, Set<String>> brokerIdLogDirs : _brokerIdAndLogdirs.entrySet()) {
Integer brokerId = brokerIdLogDirs.getKey();
Set<String> logDirs = brokerIdLogDirs.getValue();

Broker currentBroker = clusterModel.broker(brokerId);
List<Disk> remainingDisks = new ArrayList<>();
currentBroker.disks().stream().filter(disk -> !logDirs.contains(disk.logDir())).forEach(remainingDisks::add);

int step = 0;
int size = remainingDisks.size();
while (!logDirs.isEmpty()) {
String logDirToRemove = (String) logDirs.toArray()[0];

Set<Replica> replicasToMove = currentBroker.disk(logDirToRemove).replicas();
while (!replicasToMove.isEmpty()) {
Replica replica = (Replica) replicasToMove.toArray()[0];
clusterModel.relocateReplica(replica.topicPartition(), brokerId, remainingDisks.get(step % size).logDir());
}

logDirs.remove(logDirToRemove);
step++;
}
}

return true;
}

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Suggested change
@Override
public boolean optimize(ClusterModel clusterModel, Set<Goal> optimizedGoals, OptimizationOptions optimizationOptions) {
sanityCheckOptimizationOptions(optimizationOptions);
for (Map.Entry<Integer, Set<String>> brokerIdLogDirs : _brokerIdAndLogdirs.entrySet()) {
Integer brokerId = brokerIdLogDirs.getKey();
Set<String> logDirs = brokerIdLogDirs.getValue();
Broker currentBroker = clusterModel.broker(brokerId);
List<Disk> remainingDisks = new ArrayList<>();
currentBroker.disks().stream().filter(disk -> !logDirs.contains(disk.logDir())).forEach(remainingDisks::add);
int step = 0;
int size = remainingDisks.size();
while (!logDirs.isEmpty()) {
String logDirToRemove = (String) logDirs.toArray()[0];
Set<Replica> replicasToMove = currentBroker.disk(logDirToRemove).replicas();
while (!replicasToMove.isEmpty()) {
Replica replica = (Replica) replicasToMove.toArray()[0];
clusterModel.relocateReplica(replica.topicPartition(), brokerId, remainingDisks.get(step % size).logDir());
}
logDirs.remove(logDirToRemove);
step++;
}
}
return true;
}
@Override
public boolean optimize(ClusterModel clusterModel, Set<Goal> optimizedGoals, OptimizationOptions optimizationOptions) {
sanityCheckOptimizationOptions(optimizationOptions);
for (Map.Entry<Integer, Set<String>> brokerIdLogDirs : _brokerIdAndLogdirs.entrySet()) {
relocateReplicas(clusterModel, brokerIdLogDirs);
}
return true;
}
private void relocateReplicas(ClusterModel clusterModel, Entry<Integer, Set<String>> brokerIdLogDirs) {
Integer brokerId = brokerIdLogDirs.getKey();
Set<String> logDirsToRemove = brokerIdLogDirs.getValue();
Broker currentBroker = clusterModel.broker(brokerId);
List<Disk> remainingDisks = currentBroker.disks().stream()
.filter(currentDisk -> !logDirsToRemove.contains(currentDisk.logDir()))
.collect(Collectors.toList());
int logDirToRemoveCount = 0;
int remainingDisksNumber = remainingDisks.size();
for (String logDirToRemove : logDirsToRemove) {
for (Replica replicaToMove : currentBroker.disk(logDirToRemove).replicas()) {
clusterModel.relocateReplica(replicaToMove.topicPartition(), brokerId, remainingDisks.get(logDirToRemoveCount % remainingDisksNumber).logDir());
}
logDirToRemoveCount++;
}
}

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Would the above suggestion work? What is different:

  • the use of while (!logDirs.isEmpty()) and (String) logDirs.toArray()[0] to iterate through a set looked very odd (same with the nested replicasToMove loop)
  • some variable names could be more expressive (arguable, subjective), to make it easier to be read (e.g. logDirs, step or size)
  • we reduce the complexity (a little) with all the nested loops, by extracting a method

It is optional, you decide.

Copy link
Author

@ilievladiulian ilievladiulian Aug 1, 2022

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Refactored the code to improve readability. As a note, there are 2 nested loops there. The outside loop is refactored to use a for each iterator, however the inside loop calls a function which changes its iterator. This is why it is treated as a loop over a queue, so that it can change during the loop.


  /**
   * Always accepts the proposed balancing action: this goal imposes no constraints on actions
   * proposed by other goals.
   */
  @Override
  public ActionAcceptance actionAcceptance(BalancingAction action, ClusterModel clusterModel) {
    return ACCEPT;
  }

@Override
public ClusterModelStatsComparator clusterModelStatsComparator() {
return new ClusterModelStatsComparator() {
@Override
public int compare(ClusterModelStats stats1, ClusterModelStats stats2) {
return 0;
}

@Override
public String explainLastComparison() {
return String.format("Comparison for the %s is irrelevant.", name());
}
};
}

@Override
public ModelCompletenessRequirements clusterModelCompletenessRequirements() {
return new ModelCompletenessRequirements(MIN_NUM_VALID_WINDOWS_FOR_SELF_HEALING, 0, true);
}

@Override
public String name() {
return DiskRemovalGoal.class.getSimpleName();
}

  @Override
  public void finish() {
    // Intentionally a no-op: this goal has nothing to finalize after an optimization run.
  }

  /**
   * @return {@code false} — this is a soft goal.
   */
  @Override
  public boolean isHardGoal() {
    return false;
  }

  @Override
  public ProvisionStatus provisionStatus() {
    // Provision status computation is not relevant to this goal; it always reports the fixed
    // UNDECIDED response created in the constructor.
    return provisionResponse().status();
  }

  /**
   * @return The provision response of this goal (set once, to UNDECIDED, in the constructor).
   */
  @Override
  public ProvisionResponse provisionResponse() {
    return _provisionResponse;
  }

  @Override
  public void configure(Map<String, ?> configs) {
    // Intentionally a no-op: this goal receives its inputs through the constructor rather than
    // from the configuration map.
  }
}
Original file line number Diff line number Diff line change
Expand Up @@ -4,26 +4,27 @@

package com.linkedin.kafka.cruisecontrol.config.constants;

import com.linkedin.kafka.cruisecontrol.servlet.parameters.AddBrokerParameters;
import com.linkedin.kafka.cruisecontrol.servlet.parameters.AdminParameters;
import com.linkedin.kafka.cruisecontrol.servlet.parameters.BootstrapParameters;
import com.linkedin.kafka.cruisecontrol.servlet.parameters.RemoveDisksParameters;
import com.linkedin.kafka.cruisecontrol.servlet.parameters.PauseResumeParameters;
import com.linkedin.kafka.cruisecontrol.servlet.parameters.StopProposalParameters;
import com.linkedin.kafka.cruisecontrol.servlet.parameters.TrainParameters;
import com.linkedin.kafka.cruisecontrol.servlet.parameters.ClusterLoadParameters;
import com.linkedin.kafka.cruisecontrol.servlet.parameters.CruiseControlStateParameters;
import com.linkedin.kafka.cruisecontrol.servlet.parameters.DemoteBrokerParameters;
import com.linkedin.kafka.cruisecontrol.servlet.parameters.FixOfflineReplicasParameters;
import com.linkedin.kafka.cruisecontrol.servlet.parameters.KafkaClusterStateParameters;
import com.linkedin.kafka.cruisecontrol.servlet.parameters.PartitionLoadParameters;
import com.linkedin.kafka.cruisecontrol.servlet.parameters.PauseResumeParameters;
import com.linkedin.kafka.cruisecontrol.servlet.parameters.BootstrapParameters;
import com.linkedin.kafka.cruisecontrol.servlet.parameters.ProposalsParameters;
import com.linkedin.kafka.cruisecontrol.servlet.parameters.RebalanceParameters;
import com.linkedin.kafka.cruisecontrol.servlet.parameters.RemoveBrokerParameters;
import com.linkedin.kafka.cruisecontrol.servlet.parameters.CruiseControlStateParameters;
import com.linkedin.kafka.cruisecontrol.servlet.parameters.KafkaClusterStateParameters;
import com.linkedin.kafka.cruisecontrol.servlet.parameters.UserTasksParameters;
import com.linkedin.kafka.cruisecontrol.servlet.parameters.ReviewBoardParameters;
import com.linkedin.kafka.cruisecontrol.servlet.parameters.ReviewParameters;
import com.linkedin.kafka.cruisecontrol.servlet.parameters.RightsizeParameters;
import com.linkedin.kafka.cruisecontrol.servlet.parameters.StopProposalParameters;
import com.linkedin.kafka.cruisecontrol.servlet.parameters.AddBrokerParameters;
import com.linkedin.kafka.cruisecontrol.servlet.parameters.RemoveBrokerParameters;
import com.linkedin.kafka.cruisecontrol.servlet.parameters.FixOfflineReplicasParameters;
import com.linkedin.kafka.cruisecontrol.servlet.parameters.DemoteBrokerParameters;
import com.linkedin.kafka.cruisecontrol.servlet.parameters.RebalanceParameters;
import com.linkedin.kafka.cruisecontrol.servlet.parameters.AdminParameters;
import com.linkedin.kafka.cruisecontrol.servlet.parameters.TopicConfigurationParameters;
import com.linkedin.kafka.cruisecontrol.servlet.parameters.TrainParameters;
import com.linkedin.kafka.cruisecontrol.servlet.parameters.UserTasksParameters;
import com.linkedin.kafka.cruisecontrol.servlet.parameters.RightsizeParameters;
import org.apache.kafka.common.config.ConfigDef;


Expand Down Expand Up @@ -179,6 +180,13 @@ public final class CruiseControlParametersConfig {
public static final String DEFAULT_RIGHTSIZE_PARAMETERS_CLASS = RightsizeParameters.class.getName();
public static final String RIGHTSIZE_PARAMETERS_CLASS_DOC = "The class for parameters of a provision rightsize request.";

/**
* <code>remove.disks.parameters.class</code>
*/
public static final String REMOVE_DISKS_PARAMETERS_CLASS_CONFIG = "remove.disks.parameters.class";
public static final String DEFAULT_REMOVE_DISKS_PARAMETERS_CLASS = RemoveDisksParameters.class.getName();
public static final String REMOVE_DISKS_PARAMETERS_CLASS_DOC = "The class for parameters of a disks removal request.";

private CruiseControlParametersConfig() {
}

Expand Down Expand Up @@ -293,6 +301,11 @@ public static ConfigDef define(ConfigDef configDef) {
ConfigDef.Type.CLASS,
DEFAULT_RIGHTSIZE_PARAMETERS_CLASS,
ConfigDef.Importance.MEDIUM,
RIGHTSIZE_PARAMETERS_CLASS_DOC);
RIGHTSIZE_PARAMETERS_CLASS_DOC)
.define(REMOVE_DISKS_PARAMETERS_CLASS_CONFIG,
ConfigDef.Type.CLASS,
DEFAULT_REMOVE_DISKS_PARAMETERS_CLASS,
ConfigDef.Importance.MEDIUM,
REMOVE_DISKS_PARAMETERS_CLASS_DOC);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -4,17 +4,18 @@

package com.linkedin.kafka.cruisecontrol.config.constants;

import com.linkedin.kafka.cruisecontrol.servlet.handler.async.AddBrokerRequest;
import com.linkedin.kafka.cruisecontrol.servlet.handler.async.ClusterLoadRequest;
import com.linkedin.kafka.cruisecontrol.servlet.handler.async.CruiseControlStateRequest;
import com.linkedin.kafka.cruisecontrol.servlet.handler.async.DemoteRequest;
import com.linkedin.kafka.cruisecontrol.servlet.handler.async.FixOfflineReplicasRequest;
import com.linkedin.kafka.cruisecontrol.servlet.handler.async.PartitionLoadRequest;
import com.linkedin.kafka.cruisecontrol.servlet.handler.async.ProposalsRequest;
import com.linkedin.kafka.cruisecontrol.servlet.handler.async.RebalanceRequest;
import com.linkedin.kafka.cruisecontrol.servlet.handler.async.CruiseControlStateRequest;
import com.linkedin.kafka.cruisecontrol.servlet.handler.async.TopicConfigurationRequest;
import com.linkedin.kafka.cruisecontrol.servlet.handler.async.AddBrokerRequest;
import com.linkedin.kafka.cruisecontrol.servlet.handler.async.RemoveBrokerRequest;
import com.linkedin.kafka.cruisecontrol.servlet.handler.async.RemoveDisksRequest;
import com.linkedin.kafka.cruisecontrol.servlet.handler.async.DemoteRequest;
import com.linkedin.kafka.cruisecontrol.servlet.handler.async.RebalanceRequest;
import com.linkedin.kafka.cruisecontrol.servlet.handler.async.FixOfflineReplicasRequest;
import com.linkedin.kafka.cruisecontrol.servlet.handler.sync.RightsizeRequest;
import com.linkedin.kafka.cruisecontrol.servlet.handler.async.TopicConfigurationRequest;
import com.linkedin.kafka.cruisecontrol.servlet.handler.sync.AdminRequest;
import com.linkedin.kafka.cruisecontrol.servlet.handler.sync.BootstrapRequest;
import com.linkedin.kafka.cruisecontrol.servlet.handler.sync.KafkaClusterStateRequest;
Expand Down Expand Up @@ -181,6 +182,13 @@ public final class CruiseControlRequestConfig {
public static final String DEFAULT_RIGHTSIZE_REQUEST_CLASS = RightsizeRequest.class.getName();
public static final String RIGHTSIZE_REQUEST_CLASS_DOC = "The class to handle a provision rightsize request.";

/**
* <code>remove.disks.request.class</code>
*/
public static final String REMOVE_DISKS_REQUEST_CLASS_CONFIG = "remove.disks.request.class";
public static final String DEFAULT_REMOVE_DISKS_REQUEST_CLASS = RemoveDisksRequest.class.getName();
public static final String REMOVE_DISKS_REQUEST_CLASS_DOC = "The class to handle a disks removal request.";

private CruiseControlRequestConfig() {
}

Expand Down Expand Up @@ -295,6 +303,11 @@ public static ConfigDef define(ConfigDef configDef) {
ConfigDef.Type.CLASS,
DEFAULT_RIGHTSIZE_REQUEST_CLASS,
ConfigDef.Importance.MEDIUM,
RIGHTSIZE_REQUEST_CLASS_DOC);
RIGHTSIZE_REQUEST_CLASS_DOC)
.define(REMOVE_DISKS_REQUEST_CLASS_CONFIG,
ConfigDef.Type.CLASS,
DEFAULT_REMOVE_DISKS_REQUEST_CLASS,
ConfigDef.Importance.MEDIUM,
REMOVE_DISKS_REQUEST_CLASS_DOC);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -34,7 +34,8 @@ public enum CruiseControlEndPoint implements EndPoint {
ADMIN(CRUISE_CONTROL_ADMIN),
REVIEW(CRUISE_CONTROL_ADMIN),
TOPIC_CONFIGURATION(KAFKA_ADMIN),
RIGHTSIZE(KAFKA_ADMIN);
RIGHTSIZE(KAFKA_ADMIN),
REMOVE_DISKS(KAFKA_ADMIN);

private static final List<CruiseControlEndPoint> CACHED_VALUES = List.of(values());
private static final List<CruiseControlEndPoint> GET_ENDPOINTS = Arrays.asList(BOOTSTRAP,
Expand All @@ -57,7 +58,8 @@ public enum CruiseControlEndPoint implements EndPoint {
ADMIN,
REVIEW,
TOPIC_CONFIGURATION,
RIGHTSIZE);
RIGHTSIZE,
REMOVE_DISKS);

private final EndpointType _endpointType;

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -114,6 +114,9 @@ public final class KafkaCruiseControlServletUtils {
RequestParameterWrapper rightsize = new RequestParameterWrapper(RIGHTSIZE_PARAMETERS_CLASS_CONFIG,
RIGHTSIZE_PARAMETER_OBJECT_CONFIG,
RIGHTSIZE_REQUEST_CLASS_CONFIG);
RequestParameterWrapper removeDisks = new RequestParameterWrapper(REMOVE_DISKS_PARAMETERS_CLASS_CONFIG,
REMOVE_DISKS_PARAMETER_OBJECT_CONFIG,
REMOVE_DISKS_REQUEST_CLASS_CONFIG);

requestParameterConfigs.put(BOOTSTRAP, bootstrap);
requestParameterConfigs.put(TRAIN, train);
Expand All @@ -136,6 +139,7 @@ public final class KafkaCruiseControlServletUtils {
requestParameterConfigs.put(REVIEW_BOARD, reviewBoard);
requestParameterConfigs.put(TOPIC_CONFIGURATION, topicConfiguration);
requestParameterConfigs.put(RIGHTSIZE, rightsize);
requestParameterConfigs.put(REMOVE_DISKS, removeDisks);

REQUEST_PARAMETER_CONFIGS = Collections.unmodifiableMap(requestParameterConfigs);
}
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,46 @@
/*
* Copyright 2018 LinkedIn Corp. Licensed under the BSD 2-Clause License (the "License"). See License in the project root for license information.
Copy link

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Suggested change
* Copyright 2018 LinkedIn Corp. Licensed under the BSD 2-Clause License (the "License"). See License in the project root for license information.
* Copyright 2022 LinkedIn Corp. Licensed under the BSD 2-Clause License (the "License"). See License in the project root for license information.

*/

package com.linkedin.kafka.cruisecontrol.servlet.handler.async;

import com.linkedin.kafka.cruisecontrol.servlet.handler.async.runnable.OperationFuture;
import com.linkedin.kafka.cruisecontrol.servlet.handler.async.runnable.RemoveDisksRunnable;
import com.linkedin.kafka.cruisecontrol.servlet.parameters.RemoveDisksParameters;
import java.util.Map;

import static com.linkedin.cruisecontrol.common.utils.Utils.validateNotNull;
import static com.linkedin.kafka.cruisecontrol.servlet.parameters.ParameterUtils.REMOVE_DISKS_PARAMETER_OBJECT_CONFIG;

/**
 * Handles the asynchronous "remove disks" request, which moves all replicas off the requested
 * log directories of a broker onto that broker's remaining disks.
 */
public class RemoveDisksRequest extends AbstractAsyncRequest {
  protected RemoveDisksParameters _parameters;

  // NOTE(review): the explicit no-arg constructor containing only super() was redundant —
  // the implicit default constructor is identical — and has been removed.

  /**
   * Schedules the remove-disks operation for execution and returns its future.
   *
   * @param uuid The unique id of this request.
   * @return The future tracking the scheduled operation.
   */
  @Override
  protected OperationFuture handle(String uuid) {
    OperationFuture future = new OperationFuture("Remove disks");
    // Register the operation's progress as pending before it is submitted to the executor.
    pending(future.operationProgress());
    _asyncKafkaCruiseControl.sessionExecutor().submit(new RemoveDisksRunnable(_asyncKafkaCruiseControl, future, _parameters, uuid));
    return future;
  }

  @Override
  public RemoveDisksParameters parameters() {
    return _parameters;
  }

  @Override
  public String name() {
    return RemoveDisksRequest.class.getSimpleName();
  }

  @Override
  public void configure(Map<String, ?> configs) {
    super.configure(configs);
    // The parameter object is injected via the config map by the servlet layer; fail fast if absent.
    _parameters = (RemoveDisksParameters) validateNotNull(configs.get(REMOVE_DISKS_PARAMETER_OBJECT_CONFIG),
        "Parameter configuration is missing from the request.");
  }
}
Loading