Add endpoint to move all replicas from a disk to another disk of the same broker #7

Merged: 19 commits, Aug 24, 2022
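
For context, a minimal sketch of how a client might call the new endpoint once deployed. The path and the brokerid_and_logdirs query parameter are assumptions based on Cruise Control's endpoint conventions, not taken from this diff; the OpenAPI spec added in commit 3b83621 is the authoritative reference.

import java.net.URI;
import java.net.http.HttpClient;
import java.net.http.HttpRequest;
import java.net.http.HttpResponse;

public class RemoveDisksCall {
    public static void main(String[] args) throws Exception {
        // Hypothetical request: move every replica off /kafka/disk1 on broker 1 to the
        // broker's remaining disks. Host, port, path and parameter name are assumptions.
        String url = "http://localhost:9090/kafkacruisecontrol/remove_disks"
                + "?brokerid_and_logdirs=1-/kafka/disk1";
        HttpRequest request = HttpRequest.newBuilder(URI.create(url))
                .POST(HttpRequest.BodyPublishers.noBody())
                .build();
        HttpResponse<String> response = HttpClient.newHttpClient()
                .send(request, HttpResponse.BodyHandlers.ofString());
        System.out.println(response.statusCode() + "\n" + response.body());
    }
}

The request class is registered among the asynchronous handlers (see RemoveDisksRequest in the config changes below), so the response reflects an asynchronous task, as with the other rebalance-style endpoints.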
Commits
e7a178a
Add endpoint to move replicas from a specified disk to another disk o…
ilievladiulian Jul 13, 2022
5ab953e
Remove unnecessary set of disk state to demoted
ilievladiulian Jul 14, 2022
0ca209c
Fix checkstyle errors
ilievladiulian Jul 14, 2022
564924e
Fix spotbugs warnings
ilievladiulian Jul 14, 2022
3b83621
Add endpoint open api spec
ilievladiulian Jul 14, 2022
00a0ba1
Fix copyright notice header year
ilievladiulian Jul 14, 2022
5700fb3
Removed unnecessary constructor
ilievladiulian Jul 22, 2022
cbce917
Validate requested log dirs exist
ilievladiulian Jul 27, 2022
cbe7f46
Refactor optimization function to improve readability
ilievladiulian Jul 27, 2022
ce764e3
Added configuration in cruisecontrol.properties for a margin of error…
ilievladiulian Aug 1, 2022
dea3b21
Added unit test for the disk removal goal
ilievladiulian Aug 1, 2022
6233c50
Added example for logdir param in api spec
ilievladiulian Aug 1, 2022
e5599e4
Changed example for remove disks logdirs to a list instead of single …
ilievladiulian Aug 1, 2022
41f5187
Check for disk utilization instead of disk capacity when validating r…
ilievladiulian Aug 2, 2022
247ecdd
Refactored to improve readability
ilievladiulian Aug 2, 2022
02748c9
Added test for high disk utilization
ilievladiulian Aug 2, 2022
3c4a92f
Added more tests for disk removal goal
ilievladiulian Aug 2, 2022
11ae051
Compacted similar functionality tests and added new test for the roun…
ilievladiulian Aug 4, 2022
9f1c9b0
Fix api spec endpoint summary
ilievladiulian Aug 23, 2022
3 changes: 3 additions & 0 deletions config/cruisecontrol.properties
@@ -170,6 +170,9 @@ num.proposal.precompute.threads=1
# The impact of strictness on the relative balancedness score.
#goal.balancedness.strictness.weight

# The error margin between the removed disk size and the remaining disk size
#remove.disks.remaining.size.error.margin=0.1

# Configurations for the executor
# =======================================

DiskRemovalGoal.java (new file)
@@ -0,0 +1,200 @@
/*
* Copyright 2022 LinkedIn Corp. Licensed under the BSD 2-Clause License (the "License"). See License in the project root for license information.
*/

package com.linkedin.kafka.cruisecontrol.analyzer.goals;

import com.linkedin.kafka.cruisecontrol.analyzer.OptimizationOptions;
import com.linkedin.kafka.cruisecontrol.analyzer.ActionAcceptance;
import com.linkedin.kafka.cruisecontrol.analyzer.BalancingAction;
import com.linkedin.kafka.cruisecontrol.analyzer.ProvisionResponse;
import com.linkedin.kafka.cruisecontrol.analyzer.ProvisionStatus;
import com.linkedin.kafka.cruisecontrol.common.Resource;
import com.linkedin.kafka.cruisecontrol.model.Broker;
import com.linkedin.kafka.cruisecontrol.model.ClusterModel;
import com.linkedin.kafka.cruisecontrol.model.ClusterModelStats;
import com.linkedin.kafka.cruisecontrol.model.Disk;
import com.linkedin.kafka.cruisecontrol.model.Replica;
import com.linkedin.kafka.cruisecontrol.monitor.ModelCompletenessRequirements;
import java.util.Set;
import java.util.Map;
import java.util.List;
import java.util.ArrayList;
import java.util.Comparator;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import static com.linkedin.kafka.cruisecontrol.analyzer.ActionAcceptance.ACCEPT;
import static com.linkedin.kafka.cruisecontrol.analyzer.goals.GoalUtils.MIN_NUM_VALID_WINDOWS_FOR_SELF_HEALING;


/**
 * Soft goal that moves the replicas from the removed log dirs to the remaining log dirs of the same broker.
 */
public class DiskRemovalGoal implements Goal {
private static final Logger LOG = LoggerFactory.getLogger(DiskRemovalGoal.class);
private static final double EPSILON = 0.0001;

private final ProvisionResponse _provisionResponse;
protected final Map<Integer, Set<String>> _brokerIdAndLogdirs;
protected final double _errorMargin;

public DiskRemovalGoal(Map<Integer, Set<String>> brokerIdAndLogdirs, double errorMargin) {
_provisionResponse = new ProvisionResponse(ProvisionStatus.UNDECIDED);
_brokerIdAndLogdirs = brokerIdAndLogdirs;
_errorMargin = errorMargin;
}

private void sanityCheckOptimizationOptions(OptimizationOptions optimizationOptions) {
if (optimizationOptions.isTriggeredByGoalViolation()) {
throw new IllegalArgumentException(String.format("%s goal does not support use by goal violation detector.", name()));
}
}

@Override
public boolean optimize(ClusterModel clusterModel, Set<Goal> optimizedGoals, OptimizationOptions optimizationOptions) {
sanityCheckOptimizationOptions(optimizationOptions);

for (Map.Entry<Integer, Set<String>> brokerIdLogDirs : _brokerIdAndLogdirs.entrySet()) {
Integer brokerId = brokerIdLogDirs.getKey();
Set<String> logDirsToRemove = brokerIdLogDirs.getValue();
relocateBrokerLogDirs(clusterModel, brokerId, logDirsToRemove);
}

return true;
}

/**
* This method relocates the replicas on the provided log dirs to other log dirs of the same broker.
*
* @param clusterModel the cluster model
* @param brokerId the id of the broker where the movement will take place
* @param logDirsToRemove the set of log dirs to be removed from the broker
*/
private void relocateBrokerLogDirs(ClusterModel clusterModel, Integer brokerId, Set<String> logDirsToRemove) {
Broker currentBroker = clusterModel.broker(brokerId);
List<Disk> remainingDisks = new ArrayList<>();
currentBroker.disks().stream().filter(disk -> !logDirsToRemove.contains(disk.logDir())).forEach(remainingDisks::add);
remainingDisks.sort(Comparator.comparing(Disk::logDir));
List<Replica> replicasToMove = getReplicasToMoveAsListSortedBySizeDesc(currentBroker, logDirsToRemove);

int usedDiskIdx = -1;
for (Replica replicaToMove : replicasToMove) {
usedDiskIdx = relocateReplicaIfPossible(clusterModel, brokerId, remainingDisks, replicaToMove, usedDiskIdx);
}
}

/**
* This method provides the list of replicas to be moved, sorted in descending order by their expected disk utilization.
*
* @param broker the broker where the replicas are
* @param logDirs the log dirs where the replicas are
* @return the sorted list of replicas to be moved
*/
private List<Replica> getReplicasToMoveAsListSortedBySizeDesc(Broker broker, Set<String> logDirs) {
List<Replica> replicasToMove = new ArrayList<>();
for (String logDir : logDirs) {
Set<Replica> logDirReplicas = broker.disk(logDir).replicas();
replicasToMove.addAll(logDirReplicas);
}

replicasToMove.sort(Comparator.comparingDouble(o -> ((Replica) o).load().expectedUtilizationFor(Resource.DISK)).reversed());
return replicasToMove;
}

/**
* This method relocates the given replica to one of the candidate disks in a round-robin manner if there is enough space.
*
* @param clusterModel the cluster model
* @param brokerId the broker id where the replica movement occurs
* @param remainingDisks the candidate disks on which to move the replica
* @param replica the replica to move
* @param usedDiskIdx the index of the last disk used to relocate replicas
* @return the index of the disk the replica was relocated to, or the previous index if the replica could not be moved
*/
private int relocateReplicaIfPossible(ClusterModel clusterModel, Integer brokerId, List<Disk> remainingDisks, Replica replica, int usedDiskIdx) {
int remainingDisksNumber = remainingDisks.size();
int diskIndex = (usedDiskIdx + 1) % remainingDisksNumber;
for (int i = 0; i < remainingDisksNumber; i++) {
Disk destinationDisk = remainingDisks.get(diskIndex);
if (isEnoughSpace(destinationDisk, replica)) {
clusterModel.relocateReplica(replica.topicPartition(), brokerId, destinationDisk.logDir());
return diskIndex;
}
diskIndex = (diskIndex + 1) % remainingDisksNumber;
}
LOG.info("Could not move replica {} to any of the remaining disks.", replica);
return usedDiskIdx;
}

/**
* This method checks whether the destination disk would still have at least the configured error margin of free
* space after the replica is moved onto it.
*
* @param disk the disk on which the replica can be moved
* @param replica the replica to move
* @return boolean which reflects if there is enough disk space to move the replica
*/
private boolean isEnoughSpace(Disk disk, Replica replica) {
double futureUsage = disk.utilization() + replica.load().expectedUtilizationFor(Resource.DISK);
double remainingSpacePercentage = (1 - (futureUsage / disk.capacity()));
return remainingSpacePercentage > _errorMargin
|| (remainingSpacePercentage > 0 && Math.abs(remainingSpacePercentage - _errorMargin) < EPSILON);
}

@Override
public ActionAcceptance actionAcceptance(BalancingAction action, ClusterModel clusterModel) {
return ACCEPT;
}

@Override
public ClusterModelStatsComparator clusterModelStatsComparator() {
return new ClusterModelStatsComparator() {
@Override
public int compare(ClusterModelStats stats1, ClusterModelStats stats2) {
return 0;
}

@Override
public String explainLastComparison() {
return String.format("Comparison for the %s is irrelevant.", name());
}
};
}

@Override
public ModelCompletenessRequirements clusterModelCompletenessRequirements() {
return new ModelCompletenessRequirements(MIN_NUM_VALID_WINDOWS_FOR_SELF_HEALING, 0, true);
}

@Override
public String name() {
return DiskRemovalGoal.class.getSimpleName();
}

@Override
public void finish() {

}

@Override
public boolean isHardGoal() {
return false;
}

@Override
public ProvisionStatus provisionStatus() {
// Provision status computation is not relevant to this goal.
return provisionResponse().status();
}

@Override
public ProvisionResponse provisionResponse() {
return _provisionResponse;
}

@Override
public void configure(Map<String, ?> configs) {

}
}
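
To make the placement logic above easier to follow, here is a self-contained sketch, in plain Java rather than the Cruise Control model API, of the same round-robin strategy and the remaining-space margin check from isEnoughSpace. The FakeDisk class, capacities, utilizations, and replica sizes are invented for illustration only.

import java.util.ArrayList;
import java.util.List;

public class RoundRobinPlacementSketch {

    // Minimal stand-in for the Cruise Control Disk model class; values are made up.
    static final class FakeDisk {
        final String logDir;
        final double capacity;
        double utilization;

        FakeDisk(String logDir, double capacity, double utilization) {
            this.logDir = logDir;
            this.capacity = capacity;
            this.utilization = utilization;
        }
    }

    static final double ERROR_MARGIN = 0.1;   // mirrors remove.disks.remaining.size.error.margin
    static final double EPSILON = 0.0001;

    // Same acceptance rule as DiskRemovalGoal#isEnoughSpace: after the move, the free
    // fraction of the destination disk must stay at or above the configured margin.
    static boolean isEnoughSpace(FakeDisk disk, double replicaSize) {
        double futureUsage = disk.utilization + replicaSize;
        double remainingSpacePercentage = 1 - (futureUsage / disk.capacity);
        return remainingSpacePercentage > ERROR_MARGIN
                || (remainingSpacePercentage > 0 && Math.abs(remainingSpacePercentage - ERROR_MARGIN) < EPSILON);
    }

    public static void main(String[] args) {
        List<FakeDisk> remainingDisks = new ArrayList<>(List.of(
                new FakeDisk("/kafka/disk2", 100.0, 40.0),
                new FakeDisk("/kafka/disk3", 100.0, 70.0)));
        double[] replicaSizes = {30.0, 20.0, 10.0};  // already sorted descending, as in the goal

        int usedDiskIdx = -1;
        for (double size : replicaSizes) {
            int diskIndex = (usedDiskIdx + 1) % remainingDisks.size();
            boolean placed = false;
            for (int i = 0; i < remainingDisks.size(); i++) {
                FakeDisk target = remainingDisks.get(diskIndex);
                if (isEnoughSpace(target, size)) {
                    target.utilization += size;
                    System.out.printf("replica of size %.0f -> %s%n", size, target.logDir);
                    usedDiskIdx = diskIndex;
                    placed = true;
                    break;
                }
                diskIndex = (diskIndex + 1) % remainingDisks.size();
            }
            if (!placed) {
                System.out.printf("replica of size %.0f could not be placed%n", size);
            }
        }
    }
}

With a 0.1 margin, the 20-unit replica still lands on /kafka/disk3 because the free fraction ends up exactly at the margin, which the EPSILON comparison accepts; the 10-unit replica then continues the round-robin on /kafka/disk2.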
AnalyzerConfig.java
@@ -406,6 +406,15 @@ public final class AnalyzerConfig {
+ "Users can run goal optimizations in fast mode by setting the fast_mode parameter to true in relevant endpoints. "
+ "This mode intends to provide a more predictable runtime for goal optimizations.";

/**
* <code>remove.disks.remaining.size.error.margin</code>
*/
public static final String REMOVE_DISKS_REMAINING_SIZE_ERROR_MARGIN = "remove.disks.remaining.size.error.margin";
public static final double DEFAULT_REMOVE_DISKS_REMAINING_SIZE_ERROR_MARGIN = 0.1;
public static final String REMOVE_DISKS_REMAINING_SIZE_ERROR_MARGIN_DOC = "The margin of error between the remaining and the "
+ "removed disk sizes. The ratio between the removed and the remaining size should be greater than this parameter. The minimum "
+ "value is 0.05 (5%).";

private AnalyzerConfig() {
}

@@ -623,6 +632,12 @@ public static ConfigDef define(ConfigDef configDef) {
DEFAULT_FAST_MODE_PER_BROKER_MOVE_TIMEOUT_MS,
atLeast(1),
ConfigDef.Importance.LOW,
FAST_MODE_PER_BROKER_MOVE_TIMEOUT_MS_DOC);
FAST_MODE_PER_BROKER_MOVE_TIMEOUT_MS_DOC)
.define(REMOVE_DISKS_REMAINING_SIZE_ERROR_MARGIN,
ConfigDef.Type.DOUBLE,
DEFAULT_REMOVE_DISKS_REMAINING_SIZE_ERROR_MARGIN,
atLeast(0.05),
ConfigDef.Importance.LOW,
REMOVE_DISKS_REMAINING_SIZE_ERROR_MARGIN_DOC);
}
}
CruiseControlParametersConfig.java
@@ -4,26 +4,27 @@

package com.linkedin.kafka.cruisecontrol.config.constants;

import com.linkedin.kafka.cruisecontrol.servlet.parameters.AddBrokerParameters;
import com.linkedin.kafka.cruisecontrol.servlet.parameters.AdminParameters;
import com.linkedin.kafka.cruisecontrol.servlet.parameters.BootstrapParameters;
import com.linkedin.kafka.cruisecontrol.servlet.parameters.RemoveDisksParameters;
import com.linkedin.kafka.cruisecontrol.servlet.parameters.PauseResumeParameters;
import com.linkedin.kafka.cruisecontrol.servlet.parameters.StopProposalParameters;
import com.linkedin.kafka.cruisecontrol.servlet.parameters.TrainParameters;
import com.linkedin.kafka.cruisecontrol.servlet.parameters.ClusterLoadParameters;
import com.linkedin.kafka.cruisecontrol.servlet.parameters.CruiseControlStateParameters;
import com.linkedin.kafka.cruisecontrol.servlet.parameters.DemoteBrokerParameters;
import com.linkedin.kafka.cruisecontrol.servlet.parameters.FixOfflineReplicasParameters;
import com.linkedin.kafka.cruisecontrol.servlet.parameters.KafkaClusterStateParameters;
import com.linkedin.kafka.cruisecontrol.servlet.parameters.PartitionLoadParameters;
import com.linkedin.kafka.cruisecontrol.servlet.parameters.PauseResumeParameters;
import com.linkedin.kafka.cruisecontrol.servlet.parameters.BootstrapParameters;
import com.linkedin.kafka.cruisecontrol.servlet.parameters.ProposalsParameters;
import com.linkedin.kafka.cruisecontrol.servlet.parameters.RebalanceParameters;
import com.linkedin.kafka.cruisecontrol.servlet.parameters.RemoveBrokerParameters;
import com.linkedin.kafka.cruisecontrol.servlet.parameters.CruiseControlStateParameters;
import com.linkedin.kafka.cruisecontrol.servlet.parameters.KafkaClusterStateParameters;
import com.linkedin.kafka.cruisecontrol.servlet.parameters.UserTasksParameters;
import com.linkedin.kafka.cruisecontrol.servlet.parameters.ReviewBoardParameters;
import com.linkedin.kafka.cruisecontrol.servlet.parameters.ReviewParameters;
import com.linkedin.kafka.cruisecontrol.servlet.parameters.RightsizeParameters;
import com.linkedin.kafka.cruisecontrol.servlet.parameters.StopProposalParameters;
import com.linkedin.kafka.cruisecontrol.servlet.parameters.AddBrokerParameters;
import com.linkedin.kafka.cruisecontrol.servlet.parameters.RemoveBrokerParameters;
import com.linkedin.kafka.cruisecontrol.servlet.parameters.FixOfflineReplicasParameters;
import com.linkedin.kafka.cruisecontrol.servlet.parameters.DemoteBrokerParameters;
import com.linkedin.kafka.cruisecontrol.servlet.parameters.RebalanceParameters;
import com.linkedin.kafka.cruisecontrol.servlet.parameters.AdminParameters;
import com.linkedin.kafka.cruisecontrol.servlet.parameters.TopicConfigurationParameters;
import com.linkedin.kafka.cruisecontrol.servlet.parameters.TrainParameters;
import com.linkedin.kafka.cruisecontrol.servlet.parameters.UserTasksParameters;
import com.linkedin.kafka.cruisecontrol.servlet.parameters.RightsizeParameters;
import org.apache.kafka.common.config.ConfigDef;


@@ -179,6 +180,13 @@ public final class CruiseControlParametersConfig {
public static final String DEFAULT_RIGHTSIZE_PARAMETERS_CLASS = RightsizeParameters.class.getName();
public static final String RIGHTSIZE_PARAMETERS_CLASS_DOC = "The class for parameters of a provision rightsize request.";

/**
* <code>remove.disks.parameters.class</code>
*/
public static final String REMOVE_DISKS_PARAMETERS_CLASS_CONFIG = "remove.disks.parameters.class";
public static final String DEFAULT_REMOVE_DISKS_PARAMETERS_CLASS = RemoveDisksParameters.class.getName();
public static final String REMOVE_DISKS_PARAMETERS_CLASS_DOC = "The class for parameters of a disks removal request.";

private CruiseControlParametersConfig() {
}

@@ -293,6 +301,11 @@ public static ConfigDef define(ConfigDef configDef) {
ConfigDef.Type.CLASS,
DEFAULT_RIGHTSIZE_PARAMETERS_CLASS,
ConfigDef.Importance.MEDIUM,
RIGHTSIZE_PARAMETERS_CLASS_DOC);
RIGHTSIZE_PARAMETERS_CLASS_DOC)
.define(REMOVE_DISKS_PARAMETERS_CLASS_CONFIG,
ConfigDef.Type.CLASS,
DEFAULT_REMOVE_DISKS_PARAMETERS_CLASS,
ConfigDef.Importance.MEDIUM,
REMOVE_DISKS_PARAMETERS_CLASS_DOC);
}
}
CruiseControlRequestConfig.java
@@ -4,17 +4,18 @@

package com.linkedin.kafka.cruisecontrol.config.constants;

import com.linkedin.kafka.cruisecontrol.servlet.handler.async.AddBrokerRequest;
import com.linkedin.kafka.cruisecontrol.servlet.handler.async.ClusterLoadRequest;
import com.linkedin.kafka.cruisecontrol.servlet.handler.async.CruiseControlStateRequest;
import com.linkedin.kafka.cruisecontrol.servlet.handler.async.DemoteRequest;
import com.linkedin.kafka.cruisecontrol.servlet.handler.async.FixOfflineReplicasRequest;
import com.linkedin.kafka.cruisecontrol.servlet.handler.async.PartitionLoadRequest;
import com.linkedin.kafka.cruisecontrol.servlet.handler.async.ProposalsRequest;
import com.linkedin.kafka.cruisecontrol.servlet.handler.async.RebalanceRequest;
import com.linkedin.kafka.cruisecontrol.servlet.handler.async.CruiseControlStateRequest;
import com.linkedin.kafka.cruisecontrol.servlet.handler.async.TopicConfigurationRequest;
import com.linkedin.kafka.cruisecontrol.servlet.handler.async.AddBrokerRequest;
import com.linkedin.kafka.cruisecontrol.servlet.handler.async.RemoveBrokerRequest;
import com.linkedin.kafka.cruisecontrol.servlet.handler.async.RemoveDisksRequest;
import com.linkedin.kafka.cruisecontrol.servlet.handler.async.DemoteRequest;
import com.linkedin.kafka.cruisecontrol.servlet.handler.async.RebalanceRequest;
import com.linkedin.kafka.cruisecontrol.servlet.handler.async.FixOfflineReplicasRequest;
import com.linkedin.kafka.cruisecontrol.servlet.handler.sync.RightsizeRequest;
import com.linkedin.kafka.cruisecontrol.servlet.handler.async.TopicConfigurationRequest;
import com.linkedin.kafka.cruisecontrol.servlet.handler.sync.AdminRequest;
import com.linkedin.kafka.cruisecontrol.servlet.handler.sync.BootstrapRequest;
import com.linkedin.kafka.cruisecontrol.servlet.handler.sync.KafkaClusterStateRequest;
@@ -181,6 +182,13 @@ public final class CruiseControlRequestConfig {
public static final String DEFAULT_RIGHTSIZE_REQUEST_CLASS = RightsizeRequest.class.getName();
public static final String RIGHTSIZE_REQUEST_CLASS_DOC = "The class to handle a provision rightsize request.";

/**
* <code>remove.disks.request.class</code>
*/
public static final String REMOVE_DISKS_REQUEST_CLASS_CONFIG = "remove.disks.request.class";
public static final String DEFAULT_REMOVE_DISKS_REQUEST_CLASS = RemoveDisksRequest.class.getName();
public static final String REMOVE_DISKS_REQUEST_CLASS_DOC = "The class to handle a disks removal request.";

private CruiseControlRequestConfig() {
}

@@ -295,6 +303,11 @@ public static ConfigDef define(ConfigDef configDef) {
ConfigDef.Type.CLASS,
DEFAULT_RIGHTSIZE_REQUEST_CLASS,
ConfigDef.Importance.MEDIUM,
RIGHTSIZE_REQUEST_CLASS_DOC);
RIGHTSIZE_REQUEST_CLASS_DOC)
.define(REMOVE_DISKS_REQUEST_CLASS_CONFIG,
ConfigDef.Type.CLASS,
DEFAULT_REMOVE_DISKS_REQUEST_CLASS,
ConfigDef.Importance.MEDIUM,
REMOVE_DISKS_REQUEST_CLASS_DOC);
}
}
CruiseControlEndPoint.java
@@ -34,7 +34,8 @@ public enum CruiseControlEndPoint implements EndPoint {
ADMIN(CRUISE_CONTROL_ADMIN),
REVIEW(CRUISE_CONTROL_ADMIN),
TOPIC_CONFIGURATION(KAFKA_ADMIN),
RIGHTSIZE(KAFKA_ADMIN);
RIGHTSIZE(KAFKA_ADMIN),
REMOVE_DISKS(KAFKA_ADMIN);

private static final List<CruiseControlEndPoint> CACHED_VALUES = List.of(values());
private static final List<CruiseControlEndPoint> GET_ENDPOINTS = Arrays.asList(BOOTSTRAP,
@@ -57,7 +58,8 @@ public enum CruiseControlEndPoint implements EndPoint {
ADMIN,
REVIEW,
TOPIC_CONFIGURATION,
RIGHTSIZE);
RIGHTSIZE,
REMOVE_DISKS);

private final EndpointType _endpointType;
