Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add endpoint to move all replicas from a disk to other disks of the same broker #1908

Merged
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -43,19 +43,31 @@ public class IntraBrokerDiskCapacityGoal extends AbstractGoal {
private static final Logger LOG = LoggerFactory.getLogger(IntraBrokerDiskCapacityGoal.class);
private static final int MIN_NUM_VALID_WINDOWS = 1;
private static final Resource RESOURCE = Resource.DISK;
// This is only used for the remove disks endpoint
private final boolean _shouldEmptyZeroCapacityDisks;
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Just curious. Even if we don't add this boolean, shouldn't cruise control move replicas out if you set disk capacity to 0?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think it would be more intuitive if cruise control moved all replicas if the disk capacity is set to 0. However, initial tests showed that it does not (in the case of 0 load replicas) and I added this flag to make sure I do not change how the goal worked before.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The only reason I can think of is cc tries to avoid moving replicas if possible. Since empty replicas does not affect balance, CC tires to avoid it.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Also please add a comment on this variable, something like "This is only used for removeReplicasFromBroker endpoint"

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Good note. Added the comment in the latest commit.


/**
* Constructor for Capacity Goal.
*/
public IntraBrokerDiskCapacityGoal() {
_shouldEmptyZeroCapacityDisks = false;
}

/**
* Constructor for Intra Broker Disk Capacity Goal.
*
* @param shouldEmptyZeroCapacityDisks specifies if the goal should move all replicas from disks with 0 capacity
*/
public IntraBrokerDiskCapacityGoal(boolean shouldEmptyZeroCapacityDisks) {
_shouldEmptyZeroCapacityDisks = shouldEmptyZeroCapacityDisks;
}

/**
* Package private for unit test.
*/
IntraBrokerDiskCapacityGoal(BalancingConstraint constraint) {
_balancingConstraint = constraint;
_shouldEmptyZeroCapacityDisks = false;
}

@Override
Expand Down Expand Up @@ -149,8 +161,15 @@ public ActionAcceptance actionAcceptance(BalancingAction action, ClusterModel cl
protected boolean selfSatisfied(ClusterModel clusterModel, BalancingAction action) {
Replica sourceReplica = clusterModel.broker(action.sourceBrokerId()).replica(action.topicPartition());
Disk destinationDisk = clusterModel.broker(action.destinationBrokerId()).disk(action.destinationBrokerLogdir());
return sourceReplica.load().expectedUtilizationFor(RESOURCE) > 0
&& isMovementAcceptableForCapacity(sourceReplica, destinationDisk);
boolean shouldMoveReplica;
if (_shouldEmptyZeroCapacityDisks) {
// should also move replicas with 0 expected disk utilization
shouldMoveReplica = sourceReplica.load().expectedUtilizationFor(RESOURCE) >= 0;
} else {
// In general CC tries to reduce number of replica move. Since empty replicas does not affect balance, CC tries to avoid moving them.
shouldMoveReplica = sourceReplica.load().expectedUtilizationFor(RESOURCE) > 0;
}
return shouldMoveReplica && isMovementAcceptableForCapacity(sourceReplica, destinationDisk);
}

/**
Expand Down Expand Up @@ -221,6 +240,10 @@ protected void updateGoalState(ClusterModel clusterModel, OptimizationOptions op
throw new OptimizationFailureException(String.format("[%s] Utilization (%.2f) for disk %s on broker %d is above capacity limit.",
name(), disk.utilization(), disk, broker.id()), recommendation);
}
if (_shouldEmptyZeroCapacityDisks && disk.capacity() == 0 && !disk.replicas().isEmpty()) {
throw new OptimizationFailureException(String.format("[%s] Could not move all replicas from disk %s on broker %d",
name(), disk.logDir(), broker.id()));
}
}
}
finish();
Expand All @@ -239,11 +262,15 @@ public ModelCompletenessRequirements clusterModelCompletenessRequirements() {

/**
* Check whether the combined replica utilization is above the given disk capacity limits.
* If _shouldEmptyZeroCapacityDisks is true, the disk utilization is over limit only if it is greater than 0.
*
* @param disk Disk to be checked for capacity limit violation.
* @return {@code true} if utilization is over the limit, {@code false} otherwise.
*/
private boolean isUtilizationOverLimit(Disk disk) {
if (_shouldEmptyZeroCapacityDisks && disk.capacity() == 0) {
return disk.utilization() > 0 || !disk.replicas().isEmpty();
}
return disk.utilization() > disk.capacity() * _balancingConstraint.capacityThreshold(RESOURCE);
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -4,26 +4,27 @@

package com.linkedin.kafka.cruisecontrol.config.constants;

import com.linkedin.kafka.cruisecontrol.servlet.parameters.AddBrokerParameters;
import com.linkedin.kafka.cruisecontrol.servlet.parameters.AdminParameters;
import com.linkedin.kafka.cruisecontrol.servlet.parameters.BootstrapParameters;
import com.linkedin.kafka.cruisecontrol.servlet.parameters.RemoveDisksParameters;
import com.linkedin.kafka.cruisecontrol.servlet.parameters.PauseResumeParameters;
import com.linkedin.kafka.cruisecontrol.servlet.parameters.StopProposalParameters;
import com.linkedin.kafka.cruisecontrol.servlet.parameters.TrainParameters;
import com.linkedin.kafka.cruisecontrol.servlet.parameters.ClusterLoadParameters;
import com.linkedin.kafka.cruisecontrol.servlet.parameters.CruiseControlStateParameters;
import com.linkedin.kafka.cruisecontrol.servlet.parameters.DemoteBrokerParameters;
import com.linkedin.kafka.cruisecontrol.servlet.parameters.FixOfflineReplicasParameters;
import com.linkedin.kafka.cruisecontrol.servlet.parameters.KafkaClusterStateParameters;
import com.linkedin.kafka.cruisecontrol.servlet.parameters.PartitionLoadParameters;
import com.linkedin.kafka.cruisecontrol.servlet.parameters.PauseResumeParameters;
import com.linkedin.kafka.cruisecontrol.servlet.parameters.BootstrapParameters;
import com.linkedin.kafka.cruisecontrol.servlet.parameters.ProposalsParameters;
import com.linkedin.kafka.cruisecontrol.servlet.parameters.RebalanceParameters;
import com.linkedin.kafka.cruisecontrol.servlet.parameters.RemoveBrokerParameters;
import com.linkedin.kafka.cruisecontrol.servlet.parameters.CruiseControlStateParameters;
import com.linkedin.kafka.cruisecontrol.servlet.parameters.KafkaClusterStateParameters;
import com.linkedin.kafka.cruisecontrol.servlet.parameters.UserTasksParameters;
import com.linkedin.kafka.cruisecontrol.servlet.parameters.ReviewBoardParameters;
import com.linkedin.kafka.cruisecontrol.servlet.parameters.ReviewParameters;
import com.linkedin.kafka.cruisecontrol.servlet.parameters.RightsizeParameters;
import com.linkedin.kafka.cruisecontrol.servlet.parameters.StopProposalParameters;
import com.linkedin.kafka.cruisecontrol.servlet.parameters.AddBrokerParameters;
import com.linkedin.kafka.cruisecontrol.servlet.parameters.RemoveBrokerParameters;
import com.linkedin.kafka.cruisecontrol.servlet.parameters.FixOfflineReplicasParameters;
import com.linkedin.kafka.cruisecontrol.servlet.parameters.DemoteBrokerParameters;
import com.linkedin.kafka.cruisecontrol.servlet.parameters.RebalanceParameters;
import com.linkedin.kafka.cruisecontrol.servlet.parameters.AdminParameters;
import com.linkedin.kafka.cruisecontrol.servlet.parameters.TopicConfigurationParameters;
import com.linkedin.kafka.cruisecontrol.servlet.parameters.TrainParameters;
import com.linkedin.kafka.cruisecontrol.servlet.parameters.UserTasksParameters;
import com.linkedin.kafka.cruisecontrol.servlet.parameters.RightsizeParameters;
import org.apache.kafka.common.config.ConfigDef;


Expand Down Expand Up @@ -179,6 +180,13 @@ public final class CruiseControlParametersConfig {
public static final String DEFAULT_RIGHTSIZE_PARAMETERS_CLASS = RightsizeParameters.class.getName();
public static final String RIGHTSIZE_PARAMETERS_CLASS_DOC = "The class for parameters of a provision rightsize request.";

/**
* <code>remove.disks.parameters.class</code>
*/
public static final String REMOVE_DISKS_PARAMETERS_CLASS_CONFIG = "remove.disks.parameters.class";
public static final String DEFAULT_REMOVE_DISKS_PARAMETERS_CLASS = RemoveDisksParameters.class.getName();
public static final String REMOVE_DISKS_PARAMETERS_CLASS_DOC = "The class for parameters of a disks removal request.";

private CruiseControlParametersConfig() {
}

Expand Down Expand Up @@ -293,6 +301,11 @@ public static ConfigDef define(ConfigDef configDef) {
ConfigDef.Type.CLASS,
DEFAULT_RIGHTSIZE_PARAMETERS_CLASS,
ConfigDef.Importance.MEDIUM,
RIGHTSIZE_PARAMETERS_CLASS_DOC);
RIGHTSIZE_PARAMETERS_CLASS_DOC)
.define(REMOVE_DISKS_PARAMETERS_CLASS_CONFIG,
ConfigDef.Type.CLASS,
DEFAULT_REMOVE_DISKS_PARAMETERS_CLASS,
ConfigDef.Importance.MEDIUM,
REMOVE_DISKS_PARAMETERS_CLASS_DOC);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -4,17 +4,18 @@

package com.linkedin.kafka.cruisecontrol.config.constants;

import com.linkedin.kafka.cruisecontrol.servlet.handler.async.AddBrokerRequest;
import com.linkedin.kafka.cruisecontrol.servlet.handler.async.ClusterLoadRequest;
import com.linkedin.kafka.cruisecontrol.servlet.handler.async.CruiseControlStateRequest;
import com.linkedin.kafka.cruisecontrol.servlet.handler.async.DemoteRequest;
import com.linkedin.kafka.cruisecontrol.servlet.handler.async.FixOfflineReplicasRequest;
import com.linkedin.kafka.cruisecontrol.servlet.handler.async.PartitionLoadRequest;
import com.linkedin.kafka.cruisecontrol.servlet.handler.async.ProposalsRequest;
import com.linkedin.kafka.cruisecontrol.servlet.handler.async.RebalanceRequest;
import com.linkedin.kafka.cruisecontrol.servlet.handler.async.CruiseControlStateRequest;
import com.linkedin.kafka.cruisecontrol.servlet.handler.async.TopicConfigurationRequest;
import com.linkedin.kafka.cruisecontrol.servlet.handler.async.AddBrokerRequest;
import com.linkedin.kafka.cruisecontrol.servlet.handler.async.RemoveBrokerRequest;
import com.linkedin.kafka.cruisecontrol.servlet.handler.async.RemoveDisksRequest;
import com.linkedin.kafka.cruisecontrol.servlet.handler.async.DemoteRequest;
import com.linkedin.kafka.cruisecontrol.servlet.handler.async.RebalanceRequest;
import com.linkedin.kafka.cruisecontrol.servlet.handler.async.FixOfflineReplicasRequest;
import com.linkedin.kafka.cruisecontrol.servlet.handler.sync.RightsizeRequest;
import com.linkedin.kafka.cruisecontrol.servlet.handler.async.TopicConfigurationRequest;
import com.linkedin.kafka.cruisecontrol.servlet.handler.sync.AdminRequest;
import com.linkedin.kafka.cruisecontrol.servlet.handler.sync.BootstrapRequest;
import com.linkedin.kafka.cruisecontrol.servlet.handler.sync.KafkaClusterStateRequest;
Expand Down Expand Up @@ -181,6 +182,13 @@ public final class CruiseControlRequestConfig {
public static final String DEFAULT_RIGHTSIZE_REQUEST_CLASS = RightsizeRequest.class.getName();
public static final String RIGHTSIZE_REQUEST_CLASS_DOC = "The class to handle a provision rightsize request.";

/**
* <code>remove.disks.request.class</code>
*/
public static final String REMOVE_DISKS_REQUEST_CLASS_CONFIG = "remove.disks.request.class";
public static final String DEFAULT_REMOVE_DISKS_REQUEST_CLASS = RemoveDisksRequest.class.getName();
public static final String REMOVE_DISKS_REQUEST_CLASS_DOC = "The class to handle a disks removal request.";

private CruiseControlRequestConfig() {
}

Expand Down Expand Up @@ -295,6 +303,11 @@ public static ConfigDef define(ConfigDef configDef) {
ConfigDef.Type.CLASS,
DEFAULT_RIGHTSIZE_REQUEST_CLASS,
ConfigDef.Importance.MEDIUM,
RIGHTSIZE_REQUEST_CLASS_DOC);
RIGHTSIZE_REQUEST_CLASS_DOC)
.define(REMOVE_DISKS_REQUEST_CLASS_CONFIG,
ConfigDef.Type.CLASS,
DEFAULT_REMOVE_DISKS_REQUEST_CLASS,
ConfigDef.Importance.MEDIUM,
REMOVE_DISKS_REQUEST_CLASS_DOC);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -110,6 +110,13 @@ public void setState(Disk.State newState) {
}
}

/**
* Set disk capacity to 0 to mark it for removal.
*/
public void markDiskForRemoval() {
_capacity = 0;
}

/**
* Add replica to the disk.
*
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -34,7 +34,8 @@ public enum CruiseControlEndPoint implements EndPoint {
ADMIN(CRUISE_CONTROL_ADMIN),
REVIEW(CRUISE_CONTROL_ADMIN),
TOPIC_CONFIGURATION(KAFKA_ADMIN),
RIGHTSIZE(KAFKA_ADMIN);
RIGHTSIZE(KAFKA_ADMIN),
REMOVE_DISKS(KAFKA_ADMIN);

private static final List<CruiseControlEndPoint> CACHED_VALUES = List.of(values());
private static final List<CruiseControlEndPoint> GET_ENDPOINTS = Arrays.asList(BOOTSTRAP,
Expand All @@ -57,7 +58,8 @@ public enum CruiseControlEndPoint implements EndPoint {
ADMIN,
REVIEW,
TOPIC_CONFIGURATION,
RIGHTSIZE);
RIGHTSIZE,
REMOVE_DISKS);

private final EndpointType _endpointType;

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -116,6 +116,9 @@ public final class KafkaCruiseControlServletUtils {
RequestParameterWrapper rightsize = new RequestParameterWrapper(RIGHTSIZE_PARAMETERS_CLASS_CONFIG,
RIGHTSIZE_PARAMETER_OBJECT_CONFIG,
RIGHTSIZE_REQUEST_CLASS_CONFIG);
RequestParameterWrapper removeDisks = new RequestParameterWrapper(REMOVE_DISKS_PARAMETERS_CLASS_CONFIG,
REMOVE_DISKS_PARAMETER_OBJECT_CONFIG,
REMOVE_DISKS_REQUEST_CLASS_CONFIG);

requestParameterConfigs.put(BOOTSTRAP, bootstrap);
requestParameterConfigs.put(TRAIN, train);
Expand All @@ -138,6 +141,7 @@ public final class KafkaCruiseControlServletUtils {
requestParameterConfigs.put(REVIEW_BOARD, reviewBoard);
requestParameterConfigs.put(TOPIC_CONFIGURATION, topicConfiguration);
requestParameterConfigs.put(RIGHTSIZE, rightsize);
requestParameterConfigs.put(REMOVE_DISKS, removeDisks);

REQUEST_PARAMETER_CONFIGS = Collections.unmodifiableMap(requestParameterConfigs);
}
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,46 @@
/*
* Copyright 2022 LinkedIn Corp. Licensed under the BSD 2-Clause License (the "License"). See License in the project root for license information.
*/

package com.linkedin.kafka.cruisecontrol.servlet.handler.async;

import com.linkedin.kafka.cruisecontrol.servlet.handler.async.runnable.OperationFuture;
import com.linkedin.kafka.cruisecontrol.servlet.handler.async.runnable.RemoveDisksRunnable;
import com.linkedin.kafka.cruisecontrol.servlet.parameters.RemoveDisksParameters;
import java.util.Map;

import static com.linkedin.cruisecontrol.common.utils.Utils.validateNotNull;
import static com.linkedin.kafka.cruisecontrol.servlet.parameters.ParameterUtils.REMOVE_DISKS_PARAMETER_OBJECT_CONFIG;

public class RemoveDisksRequest extends AbstractAsyncRequest {
protected RemoveDisksParameters _parameters;

public RemoveDisksRequest() {
super();
}

@Override
protected OperationFuture handle(String uuid) {
OperationFuture future = new OperationFuture("Remove disks");
pending(future.operationProgress());
_asyncKafkaCruiseControl.sessionExecutor().submit(new RemoveDisksRunnable(_asyncKafkaCruiseControl, future, _parameters, uuid));
return future;
}

@Override
public RemoveDisksParameters parameters() {
return _parameters;
}

@Override
public String name() {
return RemoveDisksRequest.class.getSimpleName();
}

@Override
public void configure(Map<String, ?> configs) {
super.configure(configs);
_parameters = (RemoveDisksParameters) validateNotNull(configs.get(REMOVE_DISKS_PARAMETER_OBJECT_CONFIG),
"Parameter configuration is missing from the request.");
}
}
Loading