Skip to content

Commit

Permalink
Udpate some unit and integration tests to use KRaft (#10394)
Browse files Browse the repository at this point in the history
Signed-off-by: Jakub Scholz <www@scholzj.com>
  • Loading branch information
scholzj authored Jul 29, 2024
1 parent 9166997 commit a6531bc
Show file tree
Hide file tree
Showing 7 changed files with 440 additions and 88 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -61,15 +61,30 @@ public static String kafkaComponentName(String clusterName) {
}

/**
* Returns the name of the Kafka {@code Pod} for a {@code Kafka} cluster of the given name.
* @param clusterName The {@code metadata.name} of the {@code Kafka} resource.
* @param podNum The number of the Kafka pod
* Returns the name of the Kafka {@code Pod} for a {@code Kafka} cluster not using {@code KafkaNodePool} resources.
*
* @param clusterName The {@code metadata.name} of the {@code Kafka} resource.
* @param podNum The ordinal number of the Kafka pod
*
* @return The name of the corresponding Kafka {@code Pod}.
*/
public static String kafkaPodName(String clusterName, int podNum) {
return kafkaComponentName(clusterName) + "-" + podNum;
}

/**
* Returns the name of the Kafka {@code Pod} for a {@code Kafka} cluster using {@code KafkaNodePool} resources.
*
* @param clusterName The {@code metadata.name} of the {@code Kafka} resource
* @param nodePoolName The {@code metadata.name} of the {@code KafkaNodePool} resource
* @param podNum The ordinal number of the Kafka pod
*
* @return The name of the corresponding Kafka {@code Pod}.
*/
public static String kafkaPodName(String clusterName, String nodePoolName, int podNum) {
return clusterName + "-" + nodePoolName + "-" + podNum;
}

/**
* Returns the name of the internal bootstrap {@code Service} for a {@code Kafka} cluster of the given name.
* @param clusterName The {@code metadata.name} of the {@code Kafka} resource.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,9 @@
import io.strimzi.api.kafka.model.kafka.SingleVolumeStorage;
import io.strimzi.api.kafka.model.kafka.listener.GenericKafkaListenerBuilder;
import io.strimzi.api.kafka.model.kafka.listener.KafkaListenerType;
import io.strimzi.api.kafka.model.nodepool.KafkaNodePool;
import io.strimzi.api.kafka.model.nodepool.KafkaNodePoolBuilder;
import io.strimzi.api.kafka.model.nodepool.ProcessRoles;
import io.strimzi.operator.cluster.ClusterOperatorConfig;
import io.strimzi.operator.cluster.KafkaVersionTestUtils;
import io.strimzi.operator.cluster.PlatformFeaturesAvailability;
Expand Down Expand Up @@ -49,6 +52,7 @@
import java.util.HashSet;
import java.util.List;
import java.util.Locale;
import java.util.Map;
import java.util.Set;
import java.util.stream.Collectors;

Expand All @@ -57,7 +61,8 @@

@ExtendWith(VertxExtension.class)
public class JbodStorageMockTest {
private static final String NAME = "my-kafka";
private static final String NAME = "my-cluster";
private static final String NODE_POOL_NAME = "mixed";
private static final KafkaVersion.Lookup VERSIONS = KafkaVersionTestUtils.getKafkaVersionLookup();

private static Vertx vertx;
Expand All @@ -66,7 +71,7 @@ public class JbodStorageMockTest {
private static MockKube3 mockKube;

private String namespace = "test-jbod-storage";
private Kafka kafka;
private KafkaNodePool kafkaNodePool;
private KafkaAssemblyOperator operator;
private StrimziPodSetController podSetController;

Expand Down Expand Up @@ -116,41 +121,47 @@ public void beforeEach(TestInfo testInfo) {
.withDeleteClaim(false)
.withSize("100Gi").build());

this.kafka = new KafkaBuilder()
Kafka kafka = new KafkaBuilder()
.withNewMetadata()
.withNamespace(namespace)
.withName(NAME)
.withAnnotations(Map.of(Annotations.ANNO_STRIMZI_IO_NODE_POOLS, "enabled", Annotations.ANNO_STRIMZI_IO_KRAFT, "enabled"))
.endMetadata()
.withNewSpec()
.withNewKafka()
.withReplicas(3)
.withListeners(new GenericKafkaListenerBuilder()
.withName("plain")
.withPort(9092)
.withType(KafkaListenerType.INTERNAL)
.withTls(false)
.build())
.withNewJbodStorage()
.withVolumes(volumes)
.endJbodStorage()
.endKafka()
.withNewZookeeper()
.withReplicas(1)
.withNewEphemeralStorage()
.endEphemeralStorage()
.endZookeeper()
.endSpec()
.build();

Crds.kafkaOperation(client).inNamespace(namespace).resource(kafka).create();

this.kafkaNodePool = new KafkaNodePoolBuilder()
.withNewMetadata()
.withName(NODE_POOL_NAME)
.withNamespace(namespace)
.withLabels(Map.of(Labels.STRIMZI_CLUSTER_LABEL, NAME))
.withGeneration(1L)
.endMetadata()
.withNewSpec()
.withReplicas(3)
.withNewJbodStorage()
.withVolumes(volumes)
.endJbodStorage()
.withRoles(ProcessRoles.CONTROLLER, ProcessRoles.BROKER)
.endSpec()
.build();
Crds.kafkaNodePoolOperation(client).inNamespace(namespace).resource(kafkaNodePool).create();

PlatformFeaturesAvailability pfa = new PlatformFeaturesAvailability(false, KubernetesVersion.MINIMAL_SUPPORTED_VERSION);
// creating the Kafka operator
ResourceOperatorSupplier ros =
new ResourceOperatorSupplier(JbodStorageMockTest.vertx, client,
ResourceUtils.zookeeperLeaderFinder(JbodStorageMockTest.vertx, client),
ResourceUtils.adminClientProvider(), ResourceUtils.zookeeperScalerProvider(), ResourceUtils.kafkaAgentClientProvider(),
ResourceUtils.metricsProvider(), ResourceUtils.zooKeeperAdminProvider(), pfa, 60_000L);
new ResourceOperatorSupplier(vertx, client, null, ResourceUtils.adminClientProvider(), null,
ResourceUtils.kafkaAgentClientProvider(), ResourceUtils.metricsProvider(), null, pfa, 60_000L);

podSetController = new StrimziPodSetController(namespace, Labels.EMPTY, ros.kafkaOperator, ros.connectOperator, ros.mirrorMaker2Operator, ros.strimziPodSetOperator, ros.podOperations, ros.metricsProvider, Integer.parseInt(ClusterOperatorConfig.POD_SET_CONTROLLER_WORK_QUEUE_SIZE.defaultValue()));
podSetController.start();
Expand All @@ -172,11 +183,10 @@ public void testJbodStorageCreatesPVCsMatchingKafkaVolumes(VertxTestContext cont
.onComplete(context.succeeding(v -> context.verify(() -> {
List<PersistentVolumeClaim> pvcs = getPvcs();

for (int i = 0; i < this.kafka.getSpec().getKafka().getReplicas(); i++) {
for (int i = 0; i < this.kafkaNodePool.getSpec().getReplicas(); i++) {
for (SingleVolumeStorage volume : this.volumes) {
if (volume instanceof PersistentClaimStorage) {

String expectedPvcName = VolumeUtils.createVolumePrefix(volume.getId(), true) + "-" + KafkaResources.kafkaPodName(NAME, i);
String expectedPvcName = VolumeUtils.createVolumePrefix(volume.getId(), true) + "-" + KafkaResources.kafkaPodName(NAME, NODE_POOL_NAME, i);
List<PersistentVolumeClaim> matchingPvcs = pvcs.stream()
.filter(pvc -> pvc.getMetadata().getName().equals(expectedPvcName))
.collect(Collectors.toList());
Expand Down Expand Up @@ -208,16 +218,14 @@ public void testReconcileWithNewVolumeAddedToJbodStorage(VertxTestContext contex
.withDeleteClaim(false)
.withSize("100Gi").build());

Kafka kafkaWithNewJbodVolume = new KafkaBuilder(kafka)
KafkaNodePool kafkaNodePoolWithNewJbodVolume = new KafkaNodePoolBuilder(kafkaNodePool)
.editSpec()
.editKafka()
.withStorage(new JbodStorageBuilder().withVolumes(volumes).build())
.endKafka()
.withStorage(new JbodStorageBuilder().withVolumes(volumes).build())
.endSpec()
.build();

Set<String> expectedPvcs = expectedPvcs(kafka);
Set<String> expectedPvcsWithNewJbodStorageVolume = expectedPvcs(kafkaWithNewJbodVolume);
Set<String> expectedPvcs = expectedPvcs(kafkaNodePool);
Set<String> expectedPvcsWithNewJbodStorageVolume = expectedPvcs(kafkaNodePoolWithNewJbodVolume);

// reconcile for kafka cluster creation
operator.reconcile(new Reconciliation("test-trigger", Kafka.RESOURCE_KIND, namespace, NAME))
Expand All @@ -227,7 +235,7 @@ public void testReconcileWithNewVolumeAddedToJbodStorage(VertxTestContext contex
assertThat(pvcsNames, is(expectedPvcs));
})))
.compose(v -> {
Crds.kafkaOperation(client).inNamespace(namespace).withName(NAME).patch(kafkaWithNewJbodVolume);
Crds.kafkaNodePoolOperation(client).inNamespace(namespace).withName(NODE_POOL_NAME).patch(kafkaNodePoolWithNewJbodVolume);
// reconcile kafka cluster with new Jbod storage
return operator.reconcile(new Reconciliation("test-trigger", Kafka.RESOURCE_KIND, namespace, NAME));
})
Expand All @@ -248,16 +256,14 @@ public void testReconcileWithVolumeRemovedFromJbodStorage(VertxTestContext conte
// remove a volume from the Jbod Storage
volumes.remove(0);

Kafka kafkaWithRemovedJbodVolume = new KafkaBuilder(this.kafka)
KafkaNodePool kafkaNodePoolWithNewJbodVolume = new KafkaNodePoolBuilder(kafkaNodePool)
.editSpec()
.editKafka()
.withStorage(new JbodStorageBuilder().withVolumes(volumes).build())
.endKafka()
.withStorage(new JbodStorageBuilder().withVolumes(volumes).build())
.endSpec()
.build();

Set<String> expectedPvcs = expectedPvcs(kafka);
Set<String> expectedPvcsWithRemovedJbodStorageVolume = expectedPvcs(kafkaWithRemovedJbodVolume);
Set<String> expectedPvcs = expectedPvcs(kafkaNodePool);
Set<String> expectedPvcsWithRemovedJbodStorageVolume = expectedPvcs(kafkaNodePoolWithNewJbodVolume);

// reconcile for kafka cluster creation
operator.reconcile(new Reconciliation("test-trigger", Kafka.RESOURCE_KIND, namespace, NAME))
Expand All @@ -267,7 +273,7 @@ public void testReconcileWithVolumeRemovedFromJbodStorage(VertxTestContext conte
assertThat(pvcsNames, is(expectedPvcs));
})))
.compose(v -> {
Crds.kafkaOperation(client).inNamespace(namespace).withName(NAME).patch(kafkaWithRemovedJbodVolume);
Crds.kafkaNodePoolOperation(client).inNamespace(namespace).withName(NODE_POOL_NAME).patch(kafkaNodePoolWithNewJbodVolume);
// reconcile kafka cluster with a Jbod storage volume removed
return operator.reconcile(new Reconciliation("test-trigger2", Kafka.RESOURCE_KIND, namespace, NAME));
})
Expand All @@ -286,16 +292,14 @@ public void testReconcileWithUpdateVolumeIdJbod(VertxTestContext context) {
// trying to update id for a volume from in the JBOD storage
volumes.get(0).setId(3);

Kafka kafkaWithUpdatedJbodVolume = new KafkaBuilder(this.kafka)
KafkaNodePool kafkaNodePoolWithNewJbodVolume = new KafkaNodePoolBuilder(kafkaNodePool)
.editSpec()
.editKafka()
.withStorage(new JbodStorageBuilder().withVolumes(volumes).build())
.endKafka()
.withStorage(new JbodStorageBuilder().withVolumes(volumes).build())
.endSpec()
.build();

Set<String> expectedPvcs = expectedPvcs(kafka);
Set<String> expectedPvcsWithUpdatedJbodStorageVolume = expectedPvcs(kafkaWithUpdatedJbodVolume);
Set<String> expectedPvcs = expectedPvcs(kafkaNodePool);
Set<String> expectedPvcsWithUpdatedJbodStorageVolume = expectedPvcs(kafkaNodePoolWithNewJbodVolume);

// reconcile for kafka cluster creation
operator.reconcile(new Reconciliation("test-trigger", Kafka.RESOURCE_KIND, namespace, NAME))
Expand All @@ -305,7 +309,7 @@ public void testReconcileWithUpdateVolumeIdJbod(VertxTestContext context) {
assertThat(pvcsNames, is(expectedPvcs));
})))
.compose(v -> {
Crds.kafkaOperation(client).inNamespace(namespace).withName(NAME).patch(kafkaWithUpdatedJbodVolume);
Crds.kafkaNodePoolOperation(client).inNamespace(namespace).withName(NODE_POOL_NAME).patch(kafkaNodePoolWithNewJbodVolume);
// reconcile kafka cluster with a Jbod storage volume removed
return operator.reconcile(new Reconciliation("test-trigger2", Kafka.RESOURCE_KIND, namespace, NAME));
})
Expand All @@ -317,22 +321,20 @@ public void testReconcileWithUpdateVolumeIdJbod(VertxTestContext context) {
})));
}

private Set<String> expectedPvcs(Kafka kafka) {
private Set<String> expectedPvcs(KafkaNodePool nodePool) {
Set<String> expectedPvcs = new HashSet<>();
for (int i = 0; i < kafka.getSpec().getKafka().getReplicas(); i++) {
for (SingleVolumeStorage volume : ((JbodStorage) kafka.getSpec().getKafka().getStorage()).getVolumes()) {
for (int i = 0; i < nodePool.getSpec().getReplicas(); i++) {
for (SingleVolumeStorage volume : ((JbodStorage) nodePool.getSpec().getStorage()).getVolumes()) {
if (volume instanceof PersistentClaimStorage) {
expectedPvcs.add(VolumeUtils.DATA_VOLUME_NAME + "-" + volume.getId() + "-"
+ KafkaResources.kafkaPodName(NAME, i));
expectedPvcs.add(VolumeUtils.DATA_VOLUME_NAME + "-" + volume.getId() + "-" + KafkaResources.kafkaPodName(NAME, NODE_POOL_NAME, i));
}
}
}
return expectedPvcs;
}

private List<PersistentVolumeClaim> getPvcs() {
String kafkaStsName = KafkaResources.kafkaComponentName(JbodStorageMockTest.NAME);
Labels pvcSelector = Labels.forStrimziCluster(JbodStorageMockTest.NAME).withStrimziKind(Kafka.RESOURCE_KIND).withStrimziName(kafkaStsName);
Labels pvcSelector = Labels.forStrimziCluster(NAME).withStrimziKind(Kafka.RESOURCE_KIND).withStrimziName(KafkaResources.kafkaComponentName(NAME));
return client.persistentVolumeClaims()
.inNamespace(namespace)
.withLabels(pvcSelector.toMap())
Expand Down
Loading

0 comments on commit a6531bc

Please sign in to comment.