MINOR: Remove RaftManager.maybeDeleteMetadataLogDir and AutoTopicCreationManagerTest.scala (#17365)

Remove RaftManager.maybeDeleteMetadataLogDir, since it was only used during the ZK-to-KRaft migration and that migration code has already been removed.

Similarly, remove RaftManagerTest.testKRaftBrokerDoesNotDeleteMetadataLog, which tested that function.

Remove AutoTopicCreationManagerTest, since it tests the ZK-mode-only AutoTopicCreationManager.

Reviewers: Mickael Maison <mickael.maison@gmail.com>, Chia-Ping Tsai <chia7712@gmail.com>
cmccabe authored Jan 7, 2025
1 parent d874aa4 commit d8236be
Showing 3 changed files with 0 additions and 471 deletions.
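
For context, here is a condensed, hypothetical sketch of what the removed RaftManager.maybeDeleteMetadataLogDir did. The parameters (processRoles, metadataLogDir) and the plain-NIO recursive delete are stand-ins for the KafkaConfig fields and the Utils.delete call the real method used; the full removed code appears in the diff below.

import java.nio.file.{Files, Path}
import java.util.Comparator

// Hypothetical stand-in for the removed helper; not the actual Kafka implementation.
object MetadataLogCleanupSketch {
  def maybeDeleteMetadataLogDir(processRoles: Set[String], metadataLogDir: Path): Unit = {
    if (processRoles.nonEmpty) {
      // KRaft nodes declare process roles; their metadata log must never be deleted.
      throw new RuntimeException("Not deleting metadata log dir since this node is in KRaft mode.")
    } else {
      // ZK broker in pre-migration or hybrid mode: wipe the metadata log recursively (KAFKA-16463).
      // The real code also took a file lock on the metadata log directory when it was separate
      // from the data log dirs, and released it afterwards.
      try {
        Files.walk(metadataLogDir)
          .sorted(Comparator.reverseOrder[Path]()) // delete children before their parent directories
          .forEach(p => Files.delete(p))
      } catch {
        case t: Throwable => throw new RuntimeException("Failed to delete metadata log", t)
      }
    }
  }
}

Note that the removed guard made calling the helper on a KRaft node a hard error rather than a silent no-op; as its code comment says, the same constraint was enforced in KafkaServer, and repeating it here guarded against future callers.
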
34 changes: 0 additions & 34 deletions core/src/main/scala/kafka/raft/RaftManager.scala
@@ -34,7 +34,6 @@ import org.apache.kafka.common.KafkaException
import org.apache.kafka.common.Node
import org.apache.kafka.common.TopicPartition
import org.apache.kafka.common.Uuid
import org.apache.kafka.common.internals.Topic
import org.apache.kafka.common.metrics.Metrics
import org.apache.kafka.common.network.{ChannelBuilders, ListenerName, NetworkReceive, Selectable, Selector}
import org.apache.kafka.common.protocol.ApiMessage
@@ -84,39 +83,6 @@ object KafkaRaftManager {
.map(Paths.get(_).toAbsolutePath)
.contains(Paths.get(config.metadataLogDir).toAbsolutePath)
}

/**
* Obtain the file lock and delete the metadata log directory completely.
*
* This is only used by ZK brokers that are in pre-migration or hybrid mode of the ZK to KRaft migration.
* The rationale for deleting the metadata log in these cases is that it is safe to do on brokers and it
* makes recovery from a failed migration much easier. See KAFKA-16463.
*
* @param config The broker config
*/
def maybeDeleteMetadataLogDir(config: KafkaConfig): Unit = {
// These constraints are enforced in KafkaServer, but repeating them here to guard against future callers
if (config.processRoles.nonEmpty) {
throw new RuntimeException("Not deleting metadata log dir since this node is in KRaft mode.")
} else {
val metadataDir = new File(config.metadataLogDir)
val logDirName = UnifiedLog.logDirName(Topic.CLUSTER_METADATA_TOPIC_PARTITION)
val metadataPartitionDir = KafkaRaftManager.createLogDirectory(metadataDir, logDirName)
val deletionLock = if (hasDifferentLogDir(config)) {
Some(KafkaRaftManager.lockDataDir(metadataDir))
} else {
None
}

try {
Utils.delete(metadataPartitionDir)
} catch {
case t: Throwable => throw new RuntimeException("Failed to delete metadata log", t)
} finally {
deletionLock.foreach(_.destroy())
}
}
}
}

trait RaftManager[T] {
19 changes: 0 additions & 19 deletions core/src/test/scala/unit/kafka/raft/RaftManagerTest.scala
@@ -222,25 +222,6 @@ class RaftManagerTest {
}
}

@Test
def testKRaftBrokerDoesNotDeleteMetadataLog(): Unit = {
val logDirs = Seq(TestUtils.tempDir().toPath)
val metadataLogDir = Some(TestUtils.tempDir().toPath)
val nodeId = 1
val config = createConfig(
Set(ProcessRole.BrokerRole),
nodeId,
logDirs,
metadataLogDir
)
createMetadataLog(config)

assertThrows(classOf[RuntimeException], () => KafkaRaftManager.maybeDeleteMetadataLogDir(config),
"Should not have deleted metadata log")
assertLogDirsExist(logDirs, metadataLogDir, expectMetadataLog = true)

}

private def fileLocked(path: Path): Boolean = {
Using.resource(FileChannel.open(path, StandardOpenOption.CREATE, StandardOpenOption.WRITE)) { channel =>
try {
[Diff for the third file, AutoTopicCreationManagerTest.scala (0 additions, 418 deletions), did not load.]
