Skip to content

Commit

Permalink
KAFKA-18365 Remove zookeeper.connect in Test (#18353)
Browse files Browse the repository at this point in the history
Reviewers: TaiJuWu <tjwu1217@gmail.com>, PoAn Yang <payang@apache.org>, Chia-Ping Tsai <chia7712@gmail.com>
  • Loading branch information
m1a2st authored Jan 4, 2025
1 parent 409a43e commit a628d9b
Show file tree
Hide file tree
Showing 4 changed files with 35 additions and 35 deletions.
33 changes: 15 additions & 18 deletions core/src/test/scala/unit/kafka/KafkaConfigTest.scala
Original file line number Diff line number Diff line change
Expand Up @@ -29,7 +29,7 @@ import org.apache.kafka.common.internals.FatalExitError
import org.apache.kafka.common.utils.Exit
import org.apache.kafka.network.SocketServerConfigs
import org.apache.kafka.raft.QuorumConfig
import org.apache.kafka.server.config.{KRaftConfigs, ReplicationConfigs, ZkConfigs}
import org.apache.kafka.server.config.{KRaftConfigs, ReplicationConfigs}
import org.junit.jupiter.api.{AfterEach, BeforeEach, Test}
import org.junit.jupiter.api.Assertions._

Expand All @@ -47,20 +47,20 @@ class KafkaConfigTest {

// We should load configuration file without any arguments
val config1 = KafkaConfig.fromProps(Kafka.getPropsFromArgs(Array(propertiesFile)))
assertEquals(1, config1.brokerId)
assertEquals(1, config1.nodeId)

// We should be able to override given property on command line
val config2 = KafkaConfig.fromProps(Kafka.getPropsFromArgs(Array(propertiesFile, "--override", "broker.id=2")))
assertEquals(2, config2.brokerId)
val config2 = KafkaConfig.fromProps(Kafka.getPropsFromArgs(Array(propertiesFile, "--override", "node.id=2")))
assertEquals(2, config2.nodeId)

// We should be also able to set completely new property
val config3 = KafkaConfig.fromProps(Kafka.getPropsFromArgs(Array(propertiesFile, "--override", "log.cleanup.policy=compact")))
assertEquals(1, config3.brokerId)
assertEquals(1, config3.nodeId)
assertEquals(util.Arrays.asList("compact"), config3.logCleanupPolicy)

// We should be also able to set several properties
val config4 = KafkaConfig.fromProps(Kafka.getPropsFromArgs(Array(propertiesFile, "--override", "log.cleanup.policy=compact,delete", "--override", "broker.id=2")))
assertEquals(2, config4.brokerId)
val config4 = KafkaConfig.fromProps(Kafka.getPropsFromArgs(Array(propertiesFile, "--override", "log.cleanup.policy=compact,delete", "--override", "node.id=2")))
assertEquals(2, config4.nodeId)
assertEquals(util.Arrays.asList("compact","delete"), config4.logCleanupPolicy)
}

Expand Down Expand Up @@ -155,16 +155,6 @@ class KafkaConfigTest {
|must contain the set of bootstrap controllers or controller.quorum.voters must contain a
|parseable set of controllers.""".stripMargin.replace("\n", " ")
)

// Ensure that if neither process.roles nor controller.quorum.voters is populated, then an exception is thrown if zookeeper.connect is not defined
propertiesFile.setProperty(KRaftConfigs.PROCESS_ROLES_CONFIG, "")
assertBadConfigContainingMessage(propertiesFile,
"Missing required configuration `zookeeper.connect` which has no default value.")

// Ensure that no exception is thrown once zookeeper.connect is defined (and we clear controller.listener.names)
propertiesFile.setProperty(ZkConfigs.ZK_CONNECT_CONFIG, "localhost:2181")
propertiesFile.setProperty(KRaftConfigs.CONTROLLER_LISTENER_NAMES_CONFIG, "")
KafkaConfig.fromProps(propertiesFile)
}

private def setListenerProps(props: Properties): Unit = {
Expand Down Expand Up @@ -244,7 +234,14 @@ class KafkaConfigTest {
}

def prepareDefaultConfig(): String = {
prepareConfig(Array("broker.id=1", "zookeeper.connect=somewhere"))
prepareConfig(Array(
"node.id=1",
"process.roles=controller",
"controller.listener.names=CONTROLLER",
"controller.quorum.voters=1@localhost:9093,2@localhost:9093",
"listeners=CONTROLLER://:9093",
"advertised.listeners=CONTROLLER://127.0.0.1:9093"
))
}

def prepareConfig(lines : Array[String]): String = {
Expand Down
8 changes: 7 additions & 1 deletion core/src/test/scala/unit/kafka/log/UnifiedLogTest.scala
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,7 @@ import org.apache.kafka.common.record._
import org.apache.kafka.common.requests.{ListOffsetsRequest, ListOffsetsResponse}
import org.apache.kafka.common.utils.{BufferSupplier, Time, Utils}
import org.apache.kafka.coordinator.transaction.TransactionLogConfig
import org.apache.kafka.server.config.KRaftConfigs
import org.apache.kafka.server.log.remote.metadata.storage.TopicBasedRemoteLogMetadataManagerConfig
import org.apache.kafka.server.log.remote.storage.{NoOpRemoteLogMetadataManager, NoOpRemoteStorageManager, RemoteLogManagerConfig}
import org.apache.kafka.server.metrics.KafkaYammerMetrics
Expand Down Expand Up @@ -2270,7 +2271,12 @@ class UnifiedLogTest {

private def createKafkaConfigWithRLM: KafkaConfig = {
val props = new Properties()
props.put("zookeeper.connect", "test")
props.setProperty(KRaftConfigs.PROCESS_ROLES_CONFIG, "controller")
props.setProperty(KRaftConfigs.NODE_ID_CONFIG, "0")
props.setProperty(KRaftConfigs.CONTROLLER_LISTENER_NAMES_CONFIG, "CONTROLLER")
props.setProperty("controller.quorum.bootstrap.servers", "localhost:9093")
props.setProperty("listeners", "CONTROLLER://:9093")
props.setProperty("advertised.listeners", "CONTROLLER://127.0.0.1:9093")
props.put(RemoteLogManagerConfig.REMOTE_LOG_STORAGE_SYSTEM_ENABLE_PROP, "true")
props.put(RemoteLogManagerConfig.REMOTE_STORAGE_MANAGER_CLASS_NAME_PROP, classOf[NoOpRemoteStorageManager].getName)
props.put(RemoteLogManagerConfig.REMOTE_LOG_METADATA_MANAGER_CLASS_NAME_PROP, classOf[NoOpRemoteLogMetadataManager].getName)
Expand Down
16 changes: 13 additions & 3 deletions core/src/test/scala/unit/kafka/server/ReplicaManagerTest.scala
Original file line number Diff line number Diff line change
Expand Up @@ -58,7 +58,7 @@ import org.apache.kafka.metadata.LeaderConstants.NO_LEADER
import org.apache.kafka.metadata.{LeaderAndIsr, LeaderRecoveryState}
import org.apache.kafka.metadata.properties.{MetaProperties, MetaPropertiesEnsemble, MetaPropertiesVersion, PropertiesUtils}
import org.apache.kafka.server.common.{DirectoryEventHandler, KRaftVersion, MetadataVersion, OffsetAndEpoch, RequestLocal, StopPartition}
import org.apache.kafka.server.config.{ReplicationConfigs, ServerLogConfigs}
import org.apache.kafka.server.config.{KRaftConfigs, ReplicationConfigs, ServerLogConfigs}
import org.apache.kafka.server.log.remote.storage._
import org.apache.kafka.server.metrics.{KafkaMetricsGroup, KafkaYammerMetrics}
import org.apache.kafka.server.network.BrokerEndPoint
Expand Down Expand Up @@ -4101,7 +4101,12 @@ class ReplicaManagerTest {
val tidp0 = new TopicIdPartition(topicId, tp0)

val props = new Properties()
props.put("zookeeper.connect", "test")
props.setProperty(KRaftConfigs.PROCESS_ROLES_CONFIG, "controller")
props.setProperty(KRaftConfigs.NODE_ID_CONFIG, "0")
props.setProperty(KRaftConfigs.CONTROLLER_LISTENER_NAMES_CONFIG, "CONTROLLER")
props.setProperty("controller.quorum.bootstrap.servers", "localhost:9093")
props.setProperty("listeners", "CONTROLLER://:9093")
props.setProperty("advertised.listeners", "CONTROLLER://127.0.0.1:9093")
props.put(RemoteLogManagerConfig.REMOTE_LOG_STORAGE_SYSTEM_ENABLE_PROP, true.toString)
props.put(RemoteLogManagerConfig.REMOTE_STORAGE_MANAGER_CLASS_NAME_PROP, classOf[NoOpRemoteStorageManager].getName)
props.put(RemoteLogManagerConfig.REMOTE_LOG_METADATA_MANAGER_CLASS_NAME_PROP, classOf[NoOpRemoteLogMetadataManager].getName)
Expand Down Expand Up @@ -4209,7 +4214,12 @@ class ReplicaManagerTest {
val tidp0 = new TopicIdPartition(topicId, tp0)

val props = new Properties()
props.put("zookeeper.connect", "test")
props.setProperty("process.roles", "controller")
props.setProperty("node.id", "0")
props.setProperty("controller.listener.names", "CONTROLLER")
props.setProperty("controller.quorum.bootstrap.servers", "localhost:9093")
props.setProperty("listeners", "CONTROLLER://:9093")
props.setProperty("advertised.listeners", "CONTROLLER://127.0.0.1:9093")
props.put(RemoteLogManagerConfig.REMOTE_LOG_STORAGE_SYSTEM_ENABLE_PROP, true.toString)
props.put(RemoteLogManagerConfig.REMOTE_STORAGE_MANAGER_CLASS_NAME_PROP, classOf[NoOpRemoteStorageManager].getName)
props.put(RemoteLogManagerConfig.REMOTE_LOG_METADATA_MANAGER_CLASS_NAME_PROP, classOf[NoOpRemoteLogMetadataManager].getName)
Expand Down
13 changes: 0 additions & 13 deletions core/src/test/scala/unit/kafka/tools/StorageToolTest.scala
Original file line number Diff line number Diff line change
Expand Up @@ -294,19 +294,6 @@ Found problem:
"Failed to find content in output: " + stream.toString())
}

@Test
def testFormatFailsInZkMode(): Unit = {
val availableDirs = Seq(TestUtils.tempDir())
val properties = new Properties()
properties.setProperty("log.dirs", availableDirs.mkString(","))
properties.setProperty("zookeeper.connect", "localhost:2181")
val stream = new ByteArrayOutputStream()
assertEquals("The kafka configuration file appears to be for a legacy cluster. " +
"Formatting is only supported for clusters in KRaft mode.",
assertThrows(classOf[TerseFailure],
() => runFormatCommand(stream, properties)).getMessage)
}

@Test
def testFormatWithReleaseVersion(): Unit = {
val availableDirs = Seq(TestUtils.tempDir())
Expand Down

0 comments on commit a628d9b

Please sign in to comment.