Skip to content

Commit

Permalink
Fixed over utilisation formula
Browse files Browse the repository at this point in the history
Added unit tests for intra-broker disk capacity goal
  • Loading branch information
ilievladiulian committed Aug 22, 2023
1 parent ab739e1 commit b1df366
Show file tree
Hide file tree
Showing 2 changed files with 137 additions and 4 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -264,11 +264,10 @@ public ModelCompletenessRequirements clusterModelCompletenessRequirements() {
* @return {@code true} if utilization is over the limit, {@code false} otherwise.
*/
private boolean isUtilizationOverLimit(Disk disk) {
boolean diskUtilizationValid = true;
if (_shouldEmptyZeroCapacityDisks) {
diskUtilizationValid = disk.utilization() > 0;
if (_shouldEmptyZeroCapacityDisks && disk.capacity() == 0) {
return disk.utilization() > 0 || disk.replicas().size() > 0;
}
return diskUtilizationValid && disk.utilization() > disk.capacity() * _balancingConstraint.capacityThreshold(RESOURCE);
return disk.utilization() > disk.capacity() * _balancingConstraint.capacityThreshold(RESOURCE);
}

/**
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,134 @@
/*
* Copyright 2023 LinkedIn Corp. Licensed under the BSD 2-Clause License (the "License"). See License in the project root for license information.
*/

package com.linkedin.kafka.cruisecontrol.analyzer;

import com.codahale.metrics.MetricRegistry;
import com.linkedin.kafka.cruisecontrol.KafkaCruiseControlUnitTestUtils;
import com.linkedin.kafka.cruisecontrol.analyzer.goals.Goal;
import com.linkedin.kafka.cruisecontrol.analyzer.goals.IntraBrokerDiskCapacityGoal;
import com.linkedin.kafka.cruisecontrol.async.progress.OperationProgress;
import com.linkedin.kafka.cruisecontrol.common.Resource;
import com.linkedin.kafka.cruisecontrol.common.TestConstants;
import com.linkedin.kafka.cruisecontrol.config.BrokerCapacityInfo;
import com.linkedin.kafka.cruisecontrol.config.KafkaCruiseControlConfig;
import com.linkedin.kafka.cruisecontrol.exception.KafkaCruiseControlException;
import com.linkedin.kafka.cruisecontrol.exception.OptimizationFailureException;
import com.linkedin.kafka.cruisecontrol.executor.ExecutionProposal;
import com.linkedin.kafka.cruisecontrol.executor.Executor;
import com.linkedin.kafka.cruisecontrol.model.ClusterModel;
import com.linkedin.kafka.cruisecontrol.monitor.ModelGeneration;
import org.apache.kafka.clients.admin.AdminClient;
import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.common.utils.SystemTime;
import org.easymock.EasyMock;
import org.junit.Test;
import org.junit.runner.RunWith;
import org.junit.runners.Parameterized;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collection;
import java.util.Collections;
import java.util.List;
import java.util.Map;
import java.util.Properties;
import java.util.Set;

import static com.linkedin.kafka.cruisecontrol.KafkaCruiseControlUnitTestUtils.getAggregatedMetricValues;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertThrows;

@RunWith(Parameterized.class)
public class RemoveDisksTest {
private final Goal _goalToTest;
private final ClusterModel _clusterModel;
private final boolean _expectedToOptimize;
private final KafkaCruiseControlConfig _kafkaCruiseControlConfig;

public RemoveDisksTest(Goal goal, ClusterModel clusterModel, KafkaCruiseControlConfig config, boolean expectedToOptimize) {
_goalToTest = goal;
_clusterModel = clusterModel;
_expectedToOptimize = expectedToOptimize;
_kafkaCruiseControlConfig = config;
}

/**
* Populate parameters for the {@link OptimizationVerifier}.
*
* @return Parameters for the {@link OptimizationVerifier}.
*/
@Parameterized.Parameters(name = "{0}")
public static Collection<Object[]> buildParameters() {
List<Object[]> parameters = new ArrayList<>();

final Properties props = KafkaCruiseControlUnitTestUtils.getKafkaCruiseControlProperties();
final KafkaCruiseControlConfig kafkaCruiseControlConfig = new KafkaCruiseControlConfig(props);
final BalancingConstraint balancingConstraint = new BalancingConstraint(kafkaCruiseControlConfig);

// running 3 tests: one for medium utilization, one for 0 utilization, and one for max utilization.
// max utilization should not be moved by the intra-broker disk capacity goal.
List<Double> replicaLoadList = Arrays.asList(
TestConstants.LARGE_BROKER_CAPACITY / 4,
0.0,
TestConstants.LARGE_BROKER_CAPACITY / 2 * balancingConstraint.capacityThreshold(Resource.DISK)
);
for (Double replicaLoad : replicaLoadList) {
final Map<Integer, String> brokerToRack = Map.of(0, "A::0",
1, "B::0",
2, "C::1");
final ClusterModel cluster = new ClusterModel(new ModelGeneration(0, 0L), 1.0);
BrokerCapacityInfo commonBrokerCapacityInfo = new BrokerCapacityInfo(TestConstants.BROKER_CAPACITY, TestConstants.DISK_CAPACITY);
brokerToRack.values().stream().distinct().forEach(cluster::createRack);
brokerToRack.forEach((broker, rack) -> cluster.createBroker(
rack, Integer.toString(broker), broker,
commonBrokerCapacityInfo,
commonBrokerCapacityInfo.diskCapacityByLogDir() != null
));

// populate the cluster with a set of replicas
TopicPartition tp = new TopicPartition("topic", 0);
cluster.createReplica(brokerToRack.get(0), 0, tp, 0, true, false, TestConstants.LOGDIR0, false);

// create snapshots and push them to the cluster.
List<Long> windows = Collections.singletonList(1L);
cluster.setReplicaLoad(brokerToRack.get(0), 0, tp, getAggregatedMetricValues(40.0, 100.0, 130.0, replicaLoad), windows);

parameters.add(new Object[]{
new IntraBrokerDiskCapacityGoal(true),
cluster,
kafkaCruiseControlConfig,
replicaLoad != TestConstants.LARGE_BROKER_CAPACITY / 2 * balancingConstraint.capacityThreshold(Resource.DISK)
});
}

return parameters;
}

@Test
public void testRemoveDisks() throws KafkaCruiseControlException {
// mark disk 0 of broker 0 for removal
_clusterModel.broker(0).disk(TestConstants.LOGDIR0).markDiskForRemoval();

List<Goal> goalsByPriority = Collections.singletonList(_goalToTest);
_goalToTest.configure(_kafkaCruiseControlConfig.mergedConfigValues());
GoalOptimizer goalOptimizer = new GoalOptimizer(_kafkaCruiseControlConfig,
null,
new SystemTime(),
new MetricRegistry(),
EasyMock.mock(Executor.class),
EasyMock.mock(AdminClient.class));

if (_expectedToOptimize) {
final Set<ExecutionProposal> proposals =
goalOptimizer.optimizations(_clusterModel, goalsByPriority, new OperationProgress()).goalProposals();
assertEquals(1, proposals.size());
assertEquals(1, proposals.iterator().next().replicasToMoveBetweenDisksByBroker().size());
assertEquals(0, proposals.iterator().next().replicasToMoveBetweenDisksByBroker().get(0).brokerId().intValue());
assertEquals(TestConstants.LOGDIR1, proposals.iterator().next().replicasToMoveBetweenDisksByBroker().get(0).logdir());
} else {
assertThrows(OptimizationFailureException.class,
() -> goalOptimizer.optimizations(_clusterModel, goalsByPriority, new OperationProgress()));
}
}
}

0 comments on commit b1df366

Please sign in to comment.