Skip to content

Commit

Permalink
Add support for Kafka 3.8.0 (#10343)
Browse files Browse the repository at this point in the history
Signed-off-by: Jakub Scholz <www@scholzj.com>
  • Loading branch information
scholzj authored Jul 27, 2024
1 parent 363caf0 commit 9ce1d77
Show file tree
Hide file tree
Showing 53 changed files with 302 additions and 174 deletions.
2 changes: 2 additions & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,8 @@

## 0.43.0

* Add support for Apache Kafka 3.8.0.
Remove support for Apache Kafka 3.6.0, 3.6.1, and 3.6.2.
* Added alerts for Connectors/Tasks in failed state.
* Additional OAuth configuration options have been added for 'oauth' authentication on the listener and the client.
On the listener `serverBearerTokenLocation` and `userNamePrefix` have been added.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,25 +21,25 @@ public class KafkaVersionTestUtils {

private static final Set<String> SUPPORTED_VERSIONS = new KafkaVersion.Lookup(Collections.emptyMap(), Collections.emptyMap(), Collections.emptyMap(), Collections.emptyMap()).supportedVersions();

public static final String LATEST_KAFKA_VERSION = "3.7.1";
public static final String LATEST_FORMAT_VERSION = "3.7";
public static final String LATEST_PROTOCOL_VERSION = "3.7";
public static final String LATEST_METADATA_VERSION = "3.7-IV4";
public static final String LATEST_KAFKA_VERSION = "3.8.0";
public static final String LATEST_FORMAT_VERSION = "3.8";
public static final String LATEST_PROTOCOL_VERSION = "3.8";
public static final String LATEST_METADATA_VERSION = "3.8-IV0";
public static final String LATEST_ZOOKEEPER_VERSION = "3.8.4";
public static final String LATEST_CHECKSUM = "ABCD1234";
public static final String LATEST_THIRD_PARTY_VERSION = "3.7.x";
public static final String LATEST_THIRD_PARTY_VERSION = "3.8.x";
public static final String LATEST_KAFKA_IMAGE = KAFKA_IMAGE_STR + LATEST_KAFKA_VERSION;
public static final String LATEST_KAFKA_CONNECT_IMAGE = KAFKA_CONNECT_IMAGE_STR + LATEST_KAFKA_VERSION;
public static final String LATEST_KAFKA_MIRROR_MAKER_IMAGE = KAFKA_MIRROR_MAKER_IMAGE_STR + LATEST_KAFKA_VERSION;
public static final String LATEST_KAFKA_MIRROR_MAKER_2_IMAGE = KAFKA_MIRROR_MAKER_2_IMAGE_STR + LATEST_KAFKA_VERSION;

public static final String PREVIOUS_KAFKA_VERSION = "3.6.1";
public static final String PREVIOUS_FORMAT_VERSION = "3.6";
public static final String PREVIOUS_PROTOCOL_VERSION = "3.6";
public static final String PREVIOUS_METADATA_VERSION = "3.6-IV2";
public static final String PREVIOUS_ZOOKEEPER_VERSION = "3.8.3";
public static final String PREVIOUS_KAFKA_VERSION = "3.7.1";
public static final String PREVIOUS_FORMAT_VERSION = "3.7";
public static final String PREVIOUS_PROTOCOL_VERSION = "3.7";
public static final String PREVIOUS_METADATA_VERSION = "3.7-IV4";
public static final String PREVIOUS_ZOOKEEPER_VERSION = "3.8.4";
public static final String PREVIOUS_CHECKSUM = "ABCD1234";
public static final String PREVIOUS_THIRD_PARTY_VERSION = "3.6.x";
public static final String PREVIOUS_THIRD_PARTY_VERSION = "3.7.x";
public static final String PREVIOUS_KAFKA_IMAGE = KAFKA_IMAGE_STR + PREVIOUS_KAFKA_VERSION;
public static final String PREVIOUS_KAFKA_CONNECT_IMAGE = KAFKA_CONNECT_IMAGE_STR + PREVIOUS_KAFKA_VERSION;
public static final String PREVIOUS_KAFKA_MIRROR_MAKER_IMAGE = KAFKA_MIRROR_MAKER_IMAGE_STR + PREVIOUS_KAFKA_VERSION;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -10,11 +10,14 @@
import io.strimzi.test.annotations.ParallelTest;
import org.junit.jupiter.api.Assertions;

import static java.util.Collections.emptyList;
import static java.util.Collections.singletonList;
import static java.util.Collections.singletonMap;
import static org.hamcrest.CoreMatchers.is;
import static org.hamcrest.CoreMatchers.nullValue;
import static org.hamcrest.MatcherAssert.assertThat;
import static org.hamcrest.Matchers.containsString;
import static org.hamcrest.Matchers.oneOf;

@ParallelSuite
public class KafkaConfigurationTests {
Expand All @@ -33,7 +36,7 @@ public void unknownConfigIsNotAnError() {

private void assertNoError(String foo, Object value) {
KafkaConfiguration kafkaConfiguration = new KafkaConfiguration(Reconciliation.DUMMY_RECONCILIATION, singletonMap(foo, value).entrySet());
kafkaConfiguration.validate(kafkaVersion);
assertThat(kafkaConfiguration.validate(kafkaVersion), is(oneOf(emptyList(), nullValue())));
}

@ParallelTest
Expand Down Expand Up @@ -89,7 +92,7 @@ public void passwordType() {
@ParallelTest
public void invalidVersion() {
assertConfigError("inter.broker.protocol.version", "dclncswn",
"inter.broker.protocol.version has value 'dclncswn' which does not match the required pattern: \\Q0.8.0\\E(\\.[0-9]+)*|\\Q0.8.0\\E|\\Q0.8.1\\E(\\.[0-9]+)*|\\Q0.8.1\\E|\\Q0.8.2\\E(\\.[0-9]+)*|\\Q0.8.2\\E|\\Q0.9.0\\E(\\.[0-9]+)*|\\Q0.9.0\\E|\\Q0.10.0\\E(\\.[0-9]+)*|\\Q0.10.0-IV0\\E|\\Q0.10.0-IV1\\E|\\Q0.10.1\\E(\\.[0-9]+)*|\\Q0.10.1-IV0\\E|\\Q0.10.1-IV1\\E|\\Q0.10.1-IV2\\E|\\Q0.10.2\\E(\\.[0-9]+)*|\\Q0.10.2-IV0\\E|\\Q0.11.0\\E(\\.[0-9]+)*|\\Q0.11.0-IV0\\E|\\Q0.11.0-IV1\\E|\\Q0.11.0-IV2\\E|\\Q1.0\\E(\\.[0-9]+)*|\\Q1.0-IV0\\E|\\Q1.1\\E(\\.[0-9]+)*|\\Q1.1-IV0\\E|\\Q2.0\\E(\\.[0-9]+)*|\\Q2.0-IV0\\E|\\Q2.0-IV1\\E|\\Q2.1\\E(\\.[0-9]+)*|\\Q2.1-IV0\\E|\\Q2.1-IV1\\E|\\Q2.1-IV2\\E|\\Q2.2\\E(\\.[0-9]+)*|\\Q2.2-IV0\\E|\\Q2.2-IV1\\E|\\Q2.3\\E(\\.[0-9]+)*|\\Q2.3-IV0\\E|\\Q2.3-IV1\\E|\\Q2.4\\E(\\.[0-9]+)*|\\Q2.4-IV0\\E|\\Q2.4-IV1\\E|\\Q2.5\\E(\\.[0-9]+)*|\\Q2.5-IV0\\E|\\Q2.6\\E(\\.[0-9]+)*|\\Q2.6-IV0\\E|\\Q2.7\\E(\\.[0-9]+)*|\\Q2.7-IV0\\E|\\Q2.7-IV1\\E|\\Q2.7-IV2\\E|\\Q2.8\\E(\\.[0-9]+)*|\\Q2.8-IV0\\E|\\Q2.8-IV1\\E|\\Q3.0\\E(\\.[0-9]+)*|\\Q3.0-IV0\\E|\\Q3.0-IV1\\E|\\Q3.1\\E(\\.[0-9]+)*|\\Q3.1-IV0\\E|\\Q3.2\\E(\\.[0-9]+)*|\\Q3.2-IV0\\E|\\Q3.3\\E(\\.[0-9]+)*|\\Q3.3-IV0\\E|\\Q3.3-IV1\\E|\\Q3.3-IV2\\E|\\Q3.3-IV3\\E|\\Q3.4\\E(\\.[0-9]+)*|\\Q3.4-IV0\\E|\\Q3.5\\E(\\.[0-9]+)*|\\Q3.5-IV0\\E|\\Q3.5-IV1\\E|\\Q3.5-IV2\\E|\\Q3.6\\E(\\.[0-9]+)*|\\Q3.6-IV0\\E|\\Q3.6-IV1\\E|\\Q3.6-IV2\\E|\\Q3.7\\E(\\.[0-9]+)*|\\Q3.7-IV0\\E|\\Q3.7-IV1\\E|\\Q3.7-IV2\\E|\\Q3.7-IV3\\E|\\Q3.7-IV4\\E|\\Q3.8\\E(\\.[0-9]+)*|\\Q3.8-IV0\\E");
"inter.broker.protocol.version has value 'dclncswn' which does not match the required pattern: \\Q0.8.0\\E(\\.[0-9]+)*|\\Q0.8.0\\E|\\Q0.8.1\\E(\\.[0-9]+)*|\\Q0.8.1\\E|\\Q0.8.2\\E(\\.[0-9]+)*|\\Q0.8.2\\E|\\Q0.9.0\\E(\\.[0-9]+)*|\\Q0.9.0\\E|\\Q0.10.0\\E(\\.[0-9]+)*|\\Q0.10.0-IV0\\E|\\Q0.10.0-IV1\\E|\\Q0.10.1\\E(\\.[0-9]+)*|\\Q0.10.1-IV0\\E|\\Q0.10.1-IV1\\E|\\Q0.10.1-IV2\\E|\\Q0.10.2\\E(\\.[0-9]+)*|\\Q0.10.2-IV0\\E|\\Q0.11.0\\E(\\.[0-9]+)*|\\Q0.11.0-IV0\\E|\\Q0.11.0-IV1\\E|\\Q0.11.0-IV2\\E|\\Q1.0\\E(\\.[0-9]+)*|\\Q1.0-IV0\\E|\\Q1.1\\E(\\.[0-9]+)*|\\Q1.1-IV0\\E|\\Q2.0\\E(\\.[0-9]+)*|\\Q2.0-IV0\\E|\\Q2.0-IV1\\E|\\Q2.1\\E(\\.[0-9]+)*|\\Q2.1-IV0\\E|\\Q2.1-IV1\\E|\\Q2.1-IV2\\E|\\Q2.2\\E(\\.[0-9]+)*|\\Q2.2-IV0\\E|\\Q2.2-IV1\\E|\\Q2.3\\E(\\.[0-9]+)*|\\Q2.3-IV0\\E|\\Q2.3-IV1\\E|\\Q2.4\\E(\\.[0-9]+)*|\\Q2.4-IV0\\E|\\Q2.4-IV1\\E|\\Q2.5\\E(\\.[0-9]+)*|\\Q2.5-IV0\\E|\\Q2.6\\E(\\.[0-9]+)*|\\Q2.6-IV0\\E|\\Q2.7\\E(\\.[0-9]+)*|\\Q2.7-IV0\\E|\\Q2.7-IV1\\E|\\Q2.7-IV2\\E|\\Q2.8\\E(\\.[0-9]+)*|\\Q2.8-IV0\\E|\\Q2.8-IV1\\E|\\Q3.0\\E(\\.[0-9]+)*|\\Q3.0-IV0\\E|\\Q3.0-IV1\\E|\\Q3.1\\E(\\.[0-9]+)*|\\Q3.1-IV0\\E|\\Q3.2\\E(\\.[0-9]+)*|\\Q3.2-IV0\\E|\\Q3.3\\E(\\.[0-9]+)*|\\Q3.3-IV0\\E|\\Q3.3-IV1\\E|\\Q3.3-IV2\\E|\\Q3.3-IV3\\E|\\Q3.4\\E(\\.[0-9]+)*|\\Q3.4-IV0\\E|\\Q3.5\\E(\\.[0-9]+)*|\\Q3.5-IV0\\E|\\Q3.5-IV1\\E|\\Q3.5-IV2\\E|\\Q3.6\\E(\\.[0-9]+)*|\\Q3.6-IV0\\E|\\Q3.6-IV1\\E|\\Q3.6-IV2\\E|\\Q3.7\\E(\\.[0-9]+)*|\\Q3.7-IV0\\E|\\Q3.7-IV1\\E|\\Q3.7-IV2\\E|\\Q3.7-IV3\\E|\\Q3.7-IV4\\E|\\Q3.8\\E(\\.[0-9]+)*|\\Q3.8-IV0\\E|\\Q3.9\\E(\\.[0-9]+)*|\\Q3.9-IV0\\E");
}

@ParallelTest
Expand All @@ -105,4 +108,27 @@ public void unsupportedVersion() {

assertThat(exc.getMessage(), containsString("Configuration model /kafka-2.6.0-config-model.json was not found"));
}

@ParallelTest
public void testGzipCompressionLevel() {
assertNoError("compression.gzip.level", "9");
assertNoError("compression.gzip.level", "-1");
assertNoError("compression.gzip.level", "1");
assertConfigError("compression.gzip.level", "0", "compression.gzip.level has value '0' which does not match the required pattern: [1-9]{1}|-1");
assertConfigError("compression.gzip.level", "10", "compression.gzip.level has value '10' which does not match the required pattern: [1-9]{1}|-1");
}

@ParallelTest
public void testCaseSensitiveOptions() {
assertNoError("compression.type", "gzip");
assertConfigError("compression.type", "GZIP", "compression.type has value 'GZIP' which is not one of the allowed values: [uncompressed, zstd, lz4, snappy, gzip, producer]");
}

@ParallelTest
public void testCaseInsensitiveOptions() {
assertNoError("group.consumer.migration.policy", "DISABLED");
assertNoError("group.consumer.migration.policy", "downgrade");
assertNoError("group.consumer.migration.policy", "Upgrade");
assertConfigError("group.consumer.migration.policy", "wrong_option", "group.consumer.migration.policy has value 'wrong_option' which is not one of the allowed values (case-insensitive): [DISABLED, DOWNGRADE, UPGRADE, BIDIRECTIONAL]");
}
}
40 changes: 40 additions & 0 deletions config-model-generator/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -83,6 +83,26 @@
</properties>
<build>
<plugins>
<plugin>
<groupId>org.apache.maven.plugins</groupId>
<artifactId>maven-dependency-plugin</artifactId>
<version>${maven.dependency.version}</version>
<executions>
<execution>
<id>analyze</id>
<goals>
<goal>analyze-only</goal>
</goals>
<configuration>
<failOnWarning>true</failOnWarning>
<ignoredUnusedDeclaredDependencies>
<!-- Mot used directly because of differences between Kafka versions. But needed in the classpath. -->
<ignoredUnusedDeclaredDependency>org.apache.kafka:kafka-raft</ignoredUnusedDeclaredDependency>
</ignoredUnusedDeclaredDependencies>
</configuration>
</execution>
</executions>
</plugin>
<plugin>
<groupId>org.codehaus.mojo</groupId>
<artifactId>exec-maven-plugin</artifactId>
Expand Down Expand Up @@ -119,6 +139,26 @@
<id>generate-model</id>
<build>
<plugins>
<plugin>
<groupId>org.apache.maven.plugins</groupId>
<artifactId>maven-dependency-plugin</artifactId>
<version>${maven.dependency.version}</version>
<executions>
<execution>
<id>analyze</id>
<goals>
<goal>analyze-only</goal>
</goals>
<configuration>
<failOnWarning>true</failOnWarning>
<ignoredUnusedDeclaredDependencies>
<!-- Mot used directly because of differences between Kafka versions. But needed in the classpath. -->
<ignoredUnusedDeclaredDependency>org.apache.kafka:kafka-raft</ignoredUnusedDeclaredDependency>
</ignoredUnusedDeclaredDependencies>
</configuration>
</execution>
</executions>
</plugin>
<plugin>
<groupId>org.codehaus.mojo</groupId>
<artifactId>exec-maven-plugin</artifactId>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,6 @@
import kafka.server.DynamicBrokerConfig$;
import kafka.server.KafkaConfig$;
import org.apache.kafka.common.config.ConfigDef;
import org.apache.kafka.raft.RaftConfig;
import org.apache.kafka.server.common.MetadataVersion;
import org.apache.kafka.server.common.MetadataVersionValidator;

Expand All @@ -28,12 +27,14 @@
import java.lang.reflect.Method;
import java.security.AccessController;
import java.security.PrivilegedAction;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Iterator;
import java.util.LinkedHashSet;
import java.util.List;
import java.util.Map;
import java.util.Properties;
import java.util.Set;
import java.util.TreeMap;
import java.util.regex.Matcher;
import java.util.regex.Pattern;
Expand Down Expand Up @@ -76,6 +77,7 @@ private static String kafkaVersion() throws IOException {
return p.getProperty("version");
}

@SuppressWarnings({"checkstyle:CyclomaticComplexity"})
private static Map<String, ConfigModel> configs(String version) throws NoSuchMethodException, IllegalAccessException, InvocationTargetException {
ConfigDef def = brokerConfigs();
Map<String, String> dynamicUpdates = brokerDynamicUpdates();
Expand All @@ -101,9 +103,10 @@ private static Map<String, ConfigModel> configs(String version) throws NoSuchMet
descriptor.setValues(enumer(key.validator));
} else if (key.validator instanceof ConfigDef.ValidList) {
descriptor.setItems(validList(key));
} else if (key.validator instanceof ConfigDef.CaseInsensitiveValidString) {
descriptor.setCaseInsensitive(true);
descriptor.setValues(caseInsensitiveEnumer(key.validator));
} else if (key.validator instanceof MetadataVersionValidator) {
// Versions for Kafka 3.3.0 and newer are in MetadataVersion
// We extract it from there
Iterator<MetadataVersion> iterator = Arrays.stream(MetadataVersion.VERSIONS).iterator();
LinkedHashSet<String> versions = new LinkedHashSet<>();
while (iterator.hasNext()) {
Expand All @@ -113,10 +116,6 @@ private static Map<String, ConfigModel> configs(String version) throws NoSuchMet
}
descriptor.setPattern(String.join("|", versions));
} else if (key.validator != null && "class kafka.api.ApiVersionValidator$".equals(key.validator.getClass().toString())) {
// Versions for Kafka 3.2.x and older are in ApiVersions. We cannot use this class because it does not
// exist anymore in Kafka 3.3.0. So we extract the versions from Kafka 3.3.0, but we filter them to have
// the short version equal or less the version for which we generate the config. This should filter out
// the older versions and keep only the valid versions for given release.
Iterator<MetadataVersion> iterator = Arrays.stream(MetadataVersion.VERSIONS).iterator();
LinkedHashSet<String> versions = new LinkedHashSet<>();
while (iterator.hasNext()) {
Expand All @@ -131,8 +130,14 @@ private static Map<String, ConfigModel> configs(String version) throws NoSuchMet
descriptor.setPattern(".+");
} else if (key.validator instanceof ConfigDef.NonEmptyString) {
descriptor.setPattern(".+");
} else if (key.validator instanceof RaftConfig.ControllerQuorumVotersValidator) {
} else if (key.validator != null && "class org.apache.kafka.raft.RaftConfig$ControllerQuorumVotersValidator".equals(key.validator.getClass().toString())) { // we compare the class names because of changes done between Kafka version 3.7 and 3.8 => this is for Kafka 3.7 and older
continue;
} else if (key.validator != null && "class org.apache.kafka.raft.QuorumConfig$ControllerQuorumVotersValidator".equals(key.validator.getClass().toString())) { // we compare the class names because of changes done between Kafka version 3.7 and 3.8 => this is for Kafka 3.8 and newer
continue;
} else if (key.validator != null && "class org.apache.kafka.raft.QuorumConfig$ControllerQuorumBootstrapServersValidator".equals(key.validator.getClass().toString())) { // we compare the class names because of changes done between Kafka version 3.7 and 3.8 => this is for Kafka 3.8 and newer
continue;
} else if (key.validator != null && "class org.apache.kafka.common.compress.GzipCompression$LevelValidator".equals(key.validator.getClass().toString())) { // we compare the class names because of changes done between Kafka version 3.7 and 3.8 => this is for Kafka 3.8 and newer
descriptor.setPattern("[1-9]{1}|-1");
} else if (key.validator != null) {
throw new IllegalStateException("Invalid validator class " + key.validator.getClass() + " for option " + configName);
}
Expand Down Expand Up @@ -203,6 +208,15 @@ private static List<String> enumer(ConfigDef.Validator validator) {
}
}

private static List<String> caseInsensitiveEnumer(ConfigDef.Validator validator) {
try {
Field f = getField(ConfigDef.CaseInsensitiveValidString.class, "validStrings");
return new ArrayList((Set) f.get(validator));
} catch (ReflectiveOperationException e) {
throw new RuntimeException(e);
}
}

private static Field getOneOfFields(Class<?> cls, String... alternativeFields) {
for (String field : alternativeFields) {
try {
Expand Down
Loading

0 comments on commit 9ce1d77

Please sign in to comment.