Skip to content

Commit

Permalink
Add support for Kafka 3.8.0
Browse files Browse the repository at this point in the history
Signed-off-by: Jakub Scholz <www@scholzj.com>
  • Loading branch information
scholzj committed Jul 15, 2024
1 parent 2e0be0b commit e5e7c89
Show file tree
Hide file tree
Showing 57 changed files with 256 additions and 170 deletions.
2 changes: 2 additions & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,8 @@

## 0.43.0

* Add support for Apache Kafka 3.8.0.
Remove support for Apache Kafka 3.6.0, 3.6.1, and 3.6.2.
* Added alerts for Connectors/Tasks in failed state.

## 0.42.0
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,25 +21,25 @@ public class KafkaVersionTestUtils {

private static final Set<String> SUPPORTED_VERSIONS = new KafkaVersion.Lookup(Collections.emptyMap(), Collections.emptyMap(), Collections.emptyMap(), Collections.emptyMap()).supportedVersions();

public static final String LATEST_KAFKA_VERSION = "3.7.1";
public static final String LATEST_FORMAT_VERSION = "3.7";
public static final String LATEST_PROTOCOL_VERSION = "3.7";
public static final String LATEST_METADATA_VERSION = "3.7-IV4";
public static final String LATEST_KAFKA_VERSION = "3.8.0";
public static final String LATEST_FORMAT_VERSION = "3.8";
public static final String LATEST_PROTOCOL_VERSION = "3.8";
public static final String LATEST_METADATA_VERSION = "3.8-IV0";
public static final String LATEST_ZOOKEEPER_VERSION = "3.8.4";
public static final String LATEST_CHECKSUM = "ABCD1234";
public static final String LATEST_THIRD_PARTY_VERSION = "3.7.x";
public static final String LATEST_THIRD_PARTY_VERSION = "3.8.x";
public static final String LATEST_KAFKA_IMAGE = KAFKA_IMAGE_STR + LATEST_KAFKA_VERSION;
public static final String LATEST_KAFKA_CONNECT_IMAGE = KAFKA_CONNECT_IMAGE_STR + LATEST_KAFKA_VERSION;
public static final String LATEST_KAFKA_MIRROR_MAKER_IMAGE = KAFKA_MIRROR_MAKER_IMAGE_STR + LATEST_KAFKA_VERSION;
public static final String LATEST_KAFKA_MIRROR_MAKER_2_IMAGE = KAFKA_MIRROR_MAKER_2_IMAGE_STR + LATEST_KAFKA_VERSION;

public static final String PREVIOUS_KAFKA_VERSION = "3.6.1";
public static final String PREVIOUS_FORMAT_VERSION = "3.6";
public static final String PREVIOUS_PROTOCOL_VERSION = "3.6";
public static final String PREVIOUS_METADATA_VERSION = "3.6-IV2";
public static final String PREVIOUS_ZOOKEEPER_VERSION = "3.8.3";
public static final String PREVIOUS_KAFKA_VERSION = "3.7.1";
public static final String PREVIOUS_FORMAT_VERSION = "3.7";
public static final String PREVIOUS_PROTOCOL_VERSION = "3.7";
public static final String PREVIOUS_METADATA_VERSION = "3.7-IV4";
public static final String PREVIOUS_ZOOKEEPER_VERSION = "3.8.4";
public static final String PREVIOUS_CHECKSUM = "ABCD1234";
public static final String PREVIOUS_THIRD_PARTY_VERSION = "3.6.x";
public static final String PREVIOUS_THIRD_PARTY_VERSION = "3.7.x";
public static final String PREVIOUS_KAFKA_IMAGE = KAFKA_IMAGE_STR + PREVIOUS_KAFKA_VERSION;
public static final String PREVIOUS_KAFKA_CONNECT_IMAGE = KAFKA_CONNECT_IMAGE_STR + PREVIOUS_KAFKA_VERSION;
public static final String PREVIOUS_KAFKA_MIRROR_MAKER_IMAGE = KAFKA_MIRROR_MAKER_IMAGE_STR + PREVIOUS_KAFKA_VERSION;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -89,7 +89,7 @@ public void passwordType() {
@ParallelTest
public void invalidVersion() {
assertConfigError("inter.broker.protocol.version", "dclncswn",
"inter.broker.protocol.version has value 'dclncswn' which does not match the required pattern: \\Q0.8.0\\E(\\.[0-9]+)*|\\Q0.8.0\\E|\\Q0.8.1\\E(\\.[0-9]+)*|\\Q0.8.1\\E|\\Q0.8.2\\E(\\.[0-9]+)*|\\Q0.8.2\\E|\\Q0.9.0\\E(\\.[0-9]+)*|\\Q0.9.0\\E|\\Q0.10.0\\E(\\.[0-9]+)*|\\Q0.10.0-IV0\\E|\\Q0.10.0-IV1\\E|\\Q0.10.1\\E(\\.[0-9]+)*|\\Q0.10.1-IV0\\E|\\Q0.10.1-IV1\\E|\\Q0.10.1-IV2\\E|\\Q0.10.2\\E(\\.[0-9]+)*|\\Q0.10.2-IV0\\E|\\Q0.11.0\\E(\\.[0-9]+)*|\\Q0.11.0-IV0\\E|\\Q0.11.0-IV1\\E|\\Q0.11.0-IV2\\E|\\Q1.0\\E(\\.[0-9]+)*|\\Q1.0-IV0\\E|\\Q1.1\\E(\\.[0-9]+)*|\\Q1.1-IV0\\E|\\Q2.0\\E(\\.[0-9]+)*|\\Q2.0-IV0\\E|\\Q2.0-IV1\\E|\\Q2.1\\E(\\.[0-9]+)*|\\Q2.1-IV0\\E|\\Q2.1-IV1\\E|\\Q2.1-IV2\\E|\\Q2.2\\E(\\.[0-9]+)*|\\Q2.2-IV0\\E|\\Q2.2-IV1\\E|\\Q2.3\\E(\\.[0-9]+)*|\\Q2.3-IV0\\E|\\Q2.3-IV1\\E|\\Q2.4\\E(\\.[0-9]+)*|\\Q2.4-IV0\\E|\\Q2.4-IV1\\E|\\Q2.5\\E(\\.[0-9]+)*|\\Q2.5-IV0\\E|\\Q2.6\\E(\\.[0-9]+)*|\\Q2.6-IV0\\E|\\Q2.7\\E(\\.[0-9]+)*|\\Q2.7-IV0\\E|\\Q2.7-IV1\\E|\\Q2.7-IV2\\E|\\Q2.8\\E(\\.[0-9]+)*|\\Q2.8-IV0\\E|\\Q2.8-IV1\\E|\\Q3.0\\E(\\.[0-9]+)*|\\Q3.0-IV0\\E|\\Q3.0-IV1\\E|\\Q3.1\\E(\\.[0-9]+)*|\\Q3.1-IV0\\E|\\Q3.2\\E(\\.[0-9]+)*|\\Q3.2-IV0\\E|\\Q3.3\\E(\\.[0-9]+)*|\\Q3.3-IV0\\E|\\Q3.3-IV1\\E|\\Q3.3-IV2\\E|\\Q3.3-IV3\\E|\\Q3.4\\E(\\.[0-9]+)*|\\Q3.4-IV0\\E|\\Q3.5\\E(\\.[0-9]+)*|\\Q3.5-IV0\\E|\\Q3.5-IV1\\E|\\Q3.5-IV2\\E|\\Q3.6\\E(\\.[0-9]+)*|\\Q3.6-IV0\\E|\\Q3.6-IV1\\E|\\Q3.6-IV2\\E|\\Q3.7\\E(\\.[0-9]+)*|\\Q3.7-IV0\\E|\\Q3.7-IV1\\E|\\Q3.7-IV2\\E|\\Q3.7-IV3\\E|\\Q3.7-IV4\\E|\\Q3.8\\E(\\.[0-9]+)*|\\Q3.8-IV0\\E");
"inter.broker.protocol.version has value 'dclncswn' which does not match the required pattern: \\Q0.8.0\\E(\\.[0-9]+)*|\\Q0.8.0\\E|\\Q0.8.1\\E(\\.[0-9]+)*|\\Q0.8.1\\E|\\Q0.8.2\\E(\\.[0-9]+)*|\\Q0.8.2\\E|\\Q0.9.0\\E(\\.[0-9]+)*|\\Q0.9.0\\E|\\Q0.10.0\\E(\\.[0-9]+)*|\\Q0.10.0-IV0\\E|\\Q0.10.0-IV1\\E|\\Q0.10.1\\E(\\.[0-9]+)*|\\Q0.10.1-IV0\\E|\\Q0.10.1-IV1\\E|\\Q0.10.1-IV2\\E|\\Q0.10.2\\E(\\.[0-9]+)*|\\Q0.10.2-IV0\\E|\\Q0.11.0\\E(\\.[0-9]+)*|\\Q0.11.0-IV0\\E|\\Q0.11.0-IV1\\E|\\Q0.11.0-IV2\\E|\\Q1.0\\E(\\.[0-9]+)*|\\Q1.0-IV0\\E|\\Q1.1\\E(\\.[0-9]+)*|\\Q1.1-IV0\\E|\\Q2.0\\E(\\.[0-9]+)*|\\Q2.0-IV0\\E|\\Q2.0-IV1\\E|\\Q2.1\\E(\\.[0-9]+)*|\\Q2.1-IV0\\E|\\Q2.1-IV1\\E|\\Q2.1-IV2\\E|\\Q2.2\\E(\\.[0-9]+)*|\\Q2.2-IV0\\E|\\Q2.2-IV1\\E|\\Q2.3\\E(\\.[0-9]+)*|\\Q2.3-IV0\\E|\\Q2.3-IV1\\E|\\Q2.4\\E(\\.[0-9]+)*|\\Q2.4-IV0\\E|\\Q2.4-IV1\\E|\\Q2.5\\E(\\.[0-9]+)*|\\Q2.5-IV0\\E|\\Q2.6\\E(\\.[0-9]+)*|\\Q2.6-IV0\\E|\\Q2.7\\E(\\.[0-9]+)*|\\Q2.7-IV0\\E|\\Q2.7-IV1\\E|\\Q2.7-IV2\\E|\\Q2.8\\E(\\.[0-9]+)*|\\Q2.8-IV0\\E|\\Q2.8-IV1\\E|\\Q3.0\\E(\\.[0-9]+)*|\\Q3.0-IV0\\E|\\Q3.0-IV1\\E|\\Q3.1\\E(\\.[0-9]+)*|\\Q3.1-IV0\\E|\\Q3.2\\E(\\.[0-9]+)*|\\Q3.2-IV0\\E|\\Q3.3\\E(\\.[0-9]+)*|\\Q3.3-IV0\\E|\\Q3.3-IV1\\E|\\Q3.3-IV2\\E|\\Q3.3-IV3\\E|\\Q3.4\\E(\\.[0-9]+)*|\\Q3.4-IV0\\E|\\Q3.5\\E(\\.[0-9]+)*|\\Q3.5-IV0\\E|\\Q3.5-IV1\\E|\\Q3.5-IV2\\E|\\Q3.6\\E(\\.[0-9]+)*|\\Q3.6-IV0\\E|\\Q3.6-IV1\\E|\\Q3.6-IV2\\E|\\Q3.7\\E(\\.[0-9]+)*|\\Q3.7-IV0\\E|\\Q3.7-IV1\\E|\\Q3.7-IV2\\E|\\Q3.7-IV3\\E|\\Q3.7-IV4\\E|\\Q3.8\\E(\\.[0-9]+)*|\\Q3.8-IV0\\E|\\Q3.9\\E(\\.[0-9]+)*|\\Q3.9-IV0\\E");
}

@ParallelTest
Expand All @@ -105,4 +105,13 @@ public void unsupportedVersion() {

assertThat(exc.getMessage(), containsString("Configuration model /kafka-2.6.0-config-model.json was not found"));
}

@ParallelTest
public void gzipCompressionLevel() {
assertNoError("compression.gzip.level", "9");
assertNoError("compression.gzip.level", "-1");
assertNoError("compression.gzip.level", "1");
assertConfigError("compression.gzip.level", "0", "compression.gzip.level has value '0' which does not match the required pattern: [1-9]{1}|-1");
assertConfigError("compression.gzip.level", "10", "compression.gzip.level has value '10' which does not match the required pattern: [1-9]{1}|-1");
}
}
40 changes: 40 additions & 0 deletions config-model-generator/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -83,6 +83,26 @@
</properties>
<build>
<plugins>
<plugin>
<groupId>org.apache.maven.plugins</groupId>
<artifactId>maven-dependency-plugin</artifactId>
<version>${maven.dependency.version}</version>
<executions>
<execution>
<id>analyze</id>
<goals>
<goal>analyze-only</goal>
</goals>
<configuration>
<failOnWarning>true</failOnWarning>
<ignoredUnusedDeclaredDependencies>
<!-- Mot used directly because of differences between Kafka versions. But needed in the classpath. -->
<ignoredUnusedDeclaredDependency>org.apache.kafka:kafka-raft</ignoredUnusedDeclaredDependency>
</ignoredUnusedDeclaredDependencies>
</configuration>
</execution>
</executions>
</plugin>
<plugin>
<groupId>org.codehaus.mojo</groupId>
<artifactId>exec-maven-plugin</artifactId>
Expand Down Expand Up @@ -119,6 +139,26 @@
<id>generate-model</id>
<build>
<plugins>
<plugin>
<groupId>org.apache.maven.plugins</groupId>
<artifactId>maven-dependency-plugin</artifactId>
<version>${maven.dependency.version}</version>
<executions>
<execution>
<id>analyze</id>
<goals>
<goal>analyze-only</goal>
</goals>
<configuration>
<failOnWarning>true</failOnWarning>
<ignoredUnusedDeclaredDependencies>
<!-- Mot used directly because of differences between Kafka versions. But needed in the classpath. -->
<ignoredUnusedDeclaredDependency>org.apache.kafka:kafka-raft</ignoredUnusedDeclaredDependency>
</ignoredUnusedDeclaredDependencies>
</configuration>
</execution>
</executions>
</plugin>
<plugin>
<groupId>org.codehaus.mojo</groupId>
<artifactId>exec-maven-plugin</artifactId>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,6 @@
import kafka.server.DynamicBrokerConfig$;
import kafka.server.KafkaConfig$;
import org.apache.kafka.common.config.ConfigDef;
import org.apache.kafka.raft.RaftConfig;
import org.apache.kafka.server.common.MetadataVersion;
import org.apache.kafka.server.common.MetadataVersionValidator;

Expand All @@ -28,12 +27,14 @@
import java.lang.reflect.Method;
import java.security.AccessController;
import java.security.PrivilegedAction;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Iterator;
import java.util.LinkedHashSet;
import java.util.List;
import java.util.Map;
import java.util.Properties;
import java.util.Set;
import java.util.TreeMap;
import java.util.regex.Matcher;
import java.util.regex.Pattern;
Expand Down Expand Up @@ -76,6 +77,7 @@ private static String kafkaVersion() throws IOException {
return p.getProperty("version");
}

@SuppressWarnings({"checkstyle:CyclomaticComplexity"})
private static Map<String, ConfigModel> configs(String version) throws NoSuchMethodException, IllegalAccessException, InvocationTargetException {
ConfigDef def = brokerConfigs();
Map<String, String> dynamicUpdates = brokerDynamicUpdates();
Expand All @@ -102,8 +104,6 @@ private static Map<String, ConfigModel> configs(String version) throws NoSuchMet
} else if (key.validator instanceof ConfigDef.ValidList) {
descriptor.setItems(validList(key));
} else if (key.validator instanceof MetadataVersionValidator) {
// Versions for Kafka 3.3.0 and newer are in MetadataVersion
// We extract it from there
Iterator<MetadataVersion> iterator = Arrays.stream(MetadataVersion.VERSIONS).iterator();
LinkedHashSet<String> versions = new LinkedHashSet<>();
while (iterator.hasNext()) {
Expand All @@ -113,10 +113,6 @@ private static Map<String, ConfigModel> configs(String version) throws NoSuchMet
}
descriptor.setPattern(String.join("|", versions));
} else if (key.validator != null && "class kafka.api.ApiVersionValidator$".equals(key.validator.getClass().toString())) {
// Versions for Kafka 3.2.x and older are in ApiVersions. We cannot use this class because it does not
// exist anymore in Kafka 3.3.0. So we extract the versions from Kafka 3.3.0, but we filter them to have
// the short version equal or less the version for which we generate the config. This should filter out
// the older versions and keep only the valid versions for given release.
Iterator<MetadataVersion> iterator = Arrays.stream(MetadataVersion.VERSIONS).iterator();
LinkedHashSet<String> versions = new LinkedHashSet<>();
while (iterator.hasNext()) {
Expand All @@ -131,8 +127,16 @@ private static Map<String, ConfigModel> configs(String version) throws NoSuchMet
descriptor.setPattern(".+");
} else if (key.validator instanceof ConfigDef.NonEmptyString) {
descriptor.setPattern(".+");
} else if (key.validator instanceof RaftConfig.ControllerQuorumVotersValidator) {
} else if (key.validator instanceof ConfigDef.CaseInsensitiveValidString) {
descriptor.setValues(caseInsensitiveEnumer(key.validator));
} else if (key.validator != null && "class org.apache.kafka.raft.RaftConfig$ControllerQuorumVotersValidator".equals(key.validator.getClass().toString())) { // we compare the class names because of changes done between Kafka version 3.7 and 3.8 => this is for Kafka 3.7 and older
continue;
} else if (key.validator != null && "class org.apache.kafka.raft.QuorumConfig$ControllerQuorumVotersValidator".equals(key.validator.getClass().toString())) { // we compare the class names because of changes done between Kafka version 3.7 and 3.8 => this is for Kafka 3.8 and newer
continue;
} else if (key.validator != null && "class org.apache.kafka.raft.QuorumConfig$ControllerQuorumBootstrapServersValidator".equals(key.validator.getClass().toString())) { // we compare the class names because of changes done between Kafka version 3.7 and 3.8 => this is for Kafka 3.8 and newer
continue;
} else if (key.validator != null && "class org.apache.kafka.common.compress.GzipCompression$LevelValidator".equals(key.validator.getClass().toString())) { // we compare the class names because of changes done between Kafka version 3.7 and 3.8 => this is for Kafka 3.8 and newer
descriptor.setPattern("[1-9]{1}|-1");
} else if (key.validator != null) {
throw new IllegalStateException("Invalid validator class " + key.validator.getClass() + " for option " + configName);
}
Expand Down Expand Up @@ -203,6 +207,16 @@ private static List<String> enumer(ConfigDef.Validator validator) {
}
}

private static List<String> caseInsensitiveEnumer(ConfigDef.Validator validator) {
// TODO: Deal with the case-insensitive part
try {
Field f = getField(ConfigDef.CaseInsensitiveValidString.class, "validStrings");
return new ArrayList((Set) f.get(validator));
} catch (ReflectiveOperationException e) {
throw new RuntimeException(e);
}
}

private static Field getOneOfFields(Class<?> cls, String... alternativeFields) {
for (String field : alternativeFields) {
try {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -292,6 +292,13 @@ && i > getMaximum().intValue()) {
}
errors.add(maximumErrorMsg(configName, value));
}
if (getPattern() != null
&& !value.matches(getPattern())) {
if (errors.isEmpty()) {
errors = new ArrayList<>(1);
}
errors.add(configName + " has value '" + value + "' which does not match the required pattern: " + getPattern());
}
} catch (NumberFormatException e) {
errors = singletonList(numFormatMsg(configName, value, "an int"));
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,7 @@
<properties>
<strimzi-oauth.version>0.15.0</strimzi-oauth.version>
<jayway-json-path.version>2.9.0</jayway-json-path.version>
<cruise-control.version>2.5.137</cruise-control.version>
<cruise-control.version>2.5.138</cruise-control.version>
<opa-authorizer.version>1.5.1</opa-authorizer.version>
<kafka-quotas-plugin.version>0.3.1</kafka-quotas-plugin.version>
<kafka-mirror-maker-2-extensions.version>1.2.0</kafka-mirror-maker-2-extensions.version>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,7 @@
<properties>
<strimzi-oauth.version>0.15.0</strimzi-oauth.version>
<jayway-json-path.version>2.9.0</jayway-json-path.version>
<cruise-control.version>2.5.137</cruise-control.version>
<cruise-control.version>2.5.138</cruise-control.version>
<opa-authorizer.version>1.5.1</opa-authorizer.version>
<kafka-quotas-plugin.version>0.3.1</kafka-quotas-plugin.version>
<kafka-mirror-maker-2-extensions.version>1.2.0</kafka-mirror-maker-2-extensions.version>
Expand Down
Loading

0 comments on commit e5e7c89

Please sign in to comment.