Skip to content

Commit 98d03d8

Browse files
committed
KAFKA-14154; Return NOT_CONTROLLER from AlterPartition if leader is ahead of controller (#12506)
It is possible for the leader to send an `AlterPartition` request to a zombie controller which includes either a partition or leader epoch which is larger than what is found in the controller context. Prior to apache/kafka#12032, the controller handled this in the following way: 1. If the `LeaderAndIsr` state exactly matches the current state on the controller excluding the partition epoch, then the `AlterPartition` request is considered successful and no error is returned. The risk with this handling is that this may cause the leader to incorrectly assume that the state had been successfully updated. Since the controller's state is stale, there is no way to know what the latest ISR state is. 2. Otherwise, the controller will attempt to update the state in zookeeper with the leader/partition epochs from the `AlterPartition` request. This operation would fail if the controller's epoch was not still current in Zookeeper and the result would be a `NOT_CONTROLLER` error. Following apache/kafka#12032, the controller's validation is stricter. If the partition epoch is larger than expected, then the controller will return `INVALID_UPDATE_VERSION` without attempting the operation. Similarly, if the leader epoch is larger than expected, the controller will return `FENCED_LEADER_EPOCH`. The problem with this new handling is that the leader treats the errors from the controller as authoritative. For example, if it sees the `FENCED_LEADER_EPOCH` error, then it will not retry the request and will simply wait until the next leader epoch arrives. The ISR state gets suck in a pending state, which can lead to persistent URPs until the leader epoch gets bumped. In this patch, we want to fix the issues with this handling, but we don't want to restore the buggy idempotent check. The approach is straightforward. If the controller sees a partition/leader epoch which is larger than what it has in the controller context, then it assumes that has become a zombie and returns `NOT_CONTROLLER` to the leader. This will cause the leader to attempt to reset the controller from its local metadata cache and retry the `AlterPartition` request. Reviewers: David Jacot <djacot@confluent.io>, José Armando García Sancio <jsancio@users.noreply.github.com>
1 parent 3a784ac commit 98d03d8

File tree

45 files changed

+1139
-315
lines changed

Some content is hidden

Large Commits have some content hidden by default. Use the searchbox below for content that may be hidden.

45 files changed

+1139
-315
lines changed

Jenkinsfile

+1-1
Original file line numberDiff line numberDiff line change
@@ -19,7 +19,7 @@
1919

2020
def doValidation() {
2121
sh """
22-
./gradlew -PscalaVersion=$SCALA_VERSION clean compileJava compileScala compileTestJava compileTestScala \
22+
./retry_zinc ./gradlew -PscalaVersion=$SCALA_VERSION clean compileJava compileScala compileTestJava compileTestScala \
2323
spotlessScalaCheck checkstyleMain checkstyleTest spotbugsMain rat \
2424
--profile --no-daemon --continue -PxmlSpotBugsReport=true
2525
"""

build.gradle

+6-6
Original file line numberDiff line numberDiff line change
@@ -34,11 +34,11 @@ plugins {
3434
id 'com.github.ben-manes.versions' version '0.42.0'
3535
id 'idea'
3636
id 'java-library'
37-
id 'org.owasp.dependencycheck' version '7.0.3'
38-
id 'org.nosphere.apache.rat' version "0.7.0"
37+
id 'org.owasp.dependencycheck' version '7.1.1'
38+
id 'org.nosphere.apache.rat' version "0.7.1"
3939

40-
id "com.github.spotbugs" version '5.0.6' apply false
41-
id 'org.gradle.test-retry' version '1.3.1' apply false
40+
id "com.github.spotbugs" version '5.0.9' apply false
41+
id 'org.gradle.test-retry' version '1.4.0' apply false
4242
id 'org.scoverage' version '7.0.0' apply false
4343
id 'com.github.johnrengelman.shadow' version '7.1.2' apply false
4444
id "io.swagger.core.v3.swagger-gradle-plugin" version "2.2.0"
@@ -408,7 +408,7 @@ subprojects {
408408
"**/ConnectorsResourceTest.*", "**/DistributedHerderTest.*", "**/FileOffsetBakingStoreTest.*",
409409
"**/ErrorHandlingTaskTest.*", "**/KafkaConfigBackingStoreTest.*", "**/KafkaOffsetBackingStoreTest.*",
410410
"**/KafkaBasedLogTest.*", "**/OffsetStorageWriterTest.*", "**/StandaloneHerderTest.*",
411-
"**/SourceTaskOffsetCommitterTest.*", "**/WorkerGroupMemberTest.*",
411+
"**/SourceTaskOffsetCommitterTest.*",
412412
"**/WorkerTest.*", "**/WorkerSinkTaskTest.*", "**/WorkerSinkTaskThreadedTest.*",
413413
"**/WorkerSourceTaskTest.*", "**/AbstractWorkerSourceTaskTest.*", "**/ExactlyOnceWorkerSourceTaskTest.*",
414414
"**/WorkerTaskTest.*",
@@ -1349,7 +1349,7 @@ project(':clients') {
13491349
}
13501350
test {
13511351
java {
1352-
srcDirs = ["src/generated/java", "src/generated-test/java", "src/test/java"]
1352+
srcDirs = ["src/generated-test/java", "src/test/java"]
13531353
}
13541354
}
13551355
}

clients/src/main/java/org/apache/kafka/common/metrics/stats/Rate.java

+4-1
Original file line numberDiff line numberDiff line change
@@ -91,7 +91,10 @@ public long windowSize(MetricConfig config, long now) {
9191
if (numFullWindows < minFullWindows)
9292
totalElapsedTimeMs += (minFullWindows - numFullWindows) * config.timeWindowMs();
9393

94-
return totalElapsedTimeMs;
94+
// If window size is being calculated at the exact beginning of the window with no prior samples, the window size
95+
// will result in a value of 0. Calculation of rate over a window is size 0 is undefined, hence, we assume the
96+
// minimum window size to be at least 1ms.
97+
return Math.max(totalElapsedTimeMs, 1);
9598
}
9699

97100
@Override

clients/src/test/java/org/apache/kafka/common/metrics/MetricsTest.java

+4-3
Original file line numberDiff line numberDiff line change
@@ -45,6 +45,7 @@
4545

4646
import org.apache.kafka.common.Metric;
4747
import org.apache.kafka.common.MetricName;
48+
import org.apache.kafka.common.metrics.internals.MetricsUtils;
4849
import org.apache.kafka.common.metrics.stats.Avg;
4950
import org.apache.kafka.common.metrics.stats.CumulativeSum;
5051
import org.apache.kafka.common.metrics.stats.Max;
@@ -607,15 +608,15 @@ public void testRateWindowing() throws Exception {
607608
// Sleep for half the window.
608609
time.sleep(cfg.timeWindowMs() / 2);
609610

610-
// prior to any time passing
611-
double elapsedSecs = (cfg.timeWindowMs() * (cfg.samples() - 1) + cfg.timeWindowMs() / 2) / 1000.0;
611+
// prior to any time passing, elapsedSecs = sampleWindowSize * (total samples - half of final sample)
612+
double elapsedSecs = MetricsUtils.convert(cfg.timeWindowMs(), TimeUnit.SECONDS) * (cfg.samples() - 0.5);
612613

613614
KafkaMetric rateMetric = metrics.metrics().get(rateMetricName);
614615
KafkaMetric countRateMetric = metrics.metrics().get(countRateMetricName);
615616
assertEquals(sum / elapsedSecs, (Double) rateMetric.metricValue(), EPS, "Rate(0...2) = 2.666");
616617
assertEquals(count / elapsedSecs, (Double) countRateMetric.metricValue(), EPS, "Count rate(0...2) = 0.02666");
617618
assertEquals(elapsedSecs,
618-
((Rate) rateMetric.measurable()).windowSize(cfg, time.milliseconds()) / 1000, EPS, "Elapsed Time = 75 seconds");
619+
MetricsUtils.convert(((Rate) rateMetric.measurable()).windowSize(cfg, time.milliseconds()), TimeUnit.SECONDS), EPS, "Elapsed Time = 75 seconds");
619620
assertEquals(sum, (Double) totalMetric.metricValue(), EPS);
620621
assertEquals(count, (Double) countTotalMetric.metricValue(), EPS);
621622

Original file line numberDiff line numberDiff line change
@@ -0,0 +1,67 @@
1+
/*
2+
* Licensed to the Apache Software Foundation (ASF) under one or more
3+
* contributor license agreements. See the NOTICE file distributed with
4+
* this work for additional information regarding copyright ownership.
5+
* The ASF licenses this file to You under the Apache License, Version 2.0
6+
* (the "License"); you may not use this file except in compliance with
7+
* the License. You may obtain a copy of the License at
8+
*
9+
* http://www.apache.org/licenses/LICENSE-2.0
10+
*
11+
* Unless required by applicable law or agreed to in writing, software
12+
* distributed under the License is distributed on an "AS IS" BASIS,
13+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14+
* See the License for the specific language governing permissions and
15+
* limitations under the License.
16+
*/
17+
package org.apache.kafka.common.metrics.stats;
18+
19+
import org.apache.kafka.common.metrics.MetricConfig;
20+
import org.apache.kafka.common.metrics.internals.MetricsUtils;
21+
import org.apache.kafka.common.utils.MockTime;
22+
import org.apache.kafka.common.utils.Time;
23+
import org.junit.jupiter.api.BeforeEach;
24+
import org.junit.jupiter.params.ParameterizedTest;
25+
import org.junit.jupiter.params.provider.CsvSource;
26+
27+
import java.util.concurrent.TimeUnit;
28+
29+
import static org.junit.jupiter.api.Assertions.assertEquals;
30+
import static org.junit.jupiter.api.Assertions.assertFalse;
31+
32+
public class RateTest {
33+
private static final double EPS = 0.000001;
34+
private Rate r;
35+
private Time timeClock;
36+
37+
@BeforeEach
38+
public void setup() {
39+
r = new Rate();
40+
timeClock = new MockTime();
41+
}
42+
43+
// Tests the scenario where the recording and measurement is done before the window for first sample finishes
44+
// with no prior samples retained.
45+
@ParameterizedTest
46+
@CsvSource({"1,1", "1,11", "11,1", "11,11"})
47+
public void testRateWithNoPriorAvailableSamples(int numSample, int sampleWindowSizeSec) {
48+
final MetricConfig config = new MetricConfig().samples(numSample).timeWindow(sampleWindowSizeSec, TimeUnit.SECONDS);
49+
final double sampleValue = 50.0;
50+
// record at beginning of the window
51+
r.record(config, sampleValue, timeClock.milliseconds());
52+
// forward time till almost the end of window
53+
final long measurementTime = TimeUnit.SECONDS.toMillis(sampleWindowSizeSec) - 1;
54+
timeClock.sleep(measurementTime);
55+
// calculate rate at almost the end of window
56+
final double observedRate = r.measure(config, timeClock.milliseconds());
57+
assertFalse(Double.isNaN(observedRate));
58+
59+
// In a scenario where sufficient number of samples is not available yet, the rate calculation algorithm assumes
60+
// presence of N-1 (where N = numSample) prior samples with sample values of 0. Hence, the window size for rate
61+
// calculation accounts for N-1 prior samples
62+
final int dummyPriorSamplesAssumedByAlgorithm = numSample - 1;
63+
final double windowSize = MetricsUtils.convert(measurementTime, TimeUnit.SECONDS) + (dummyPriorSamplesAssumedByAlgorithm * sampleWindowSizeSec);
64+
double expectedRatePerSec = sampleValue / windowSize;
65+
assertEquals(expectedRatePerSec, observedRate, EPS);
66+
}
67+
}

connect/runtime/src/test/java/org/apache/kafka/connect/runtime/distributed/WorkerGroupMemberTest.java

+11-22
Original file line numberDiff line numberDiff line change
@@ -25,16 +25,12 @@
2525
import org.apache.kafka.connect.runtime.MockConnectMetrics;
2626
import org.apache.kafka.connect.runtime.WorkerConfig;
2727
import org.apache.kafka.connect.storage.ConfigBackingStore;
28-
import org.apache.kafka.connect.storage.StatusBackingStore;
2928
import org.apache.kafka.connect.util.ConnectUtils;
30-
import org.easymock.EasyMock;
3129
import org.junit.Test;
3230
import org.junit.runner.RunWith;
33-
import org.powermock.api.easymock.PowerMock;
34-
import org.powermock.api.easymock.annotation.Mock;
35-
import org.powermock.core.classloader.annotations.PowerMockIgnore;
36-
import org.powermock.core.classloader.annotations.PrepareForTest;
37-
import org.powermock.modules.junit4.PowerMockRunner;
31+
import org.mockito.Mock;
32+
import org.mockito.MockedStatic;
33+
import org.mockito.junit.MockitoJUnitRunner;
3834

3935
import javax.management.MBeanServer;
4036
import javax.management.ObjectName;
@@ -45,15 +41,13 @@
4541
import static org.junit.Assert.assertEquals;
4642
import static org.junit.Assert.assertNotNull;
4743
import static org.junit.Assert.assertTrue;
44+
import static org.mockito.ArgumentMatchers.any;
45+
import static org.mockito.Mockito.mockStatic;
4846

49-
@RunWith(PowerMockRunner.class)
50-
@PrepareForTest({ConnectUtils.class})
51-
@PowerMockIgnore({"javax.management.*", "javax.crypto.*"})
47+
@RunWith(MockitoJUnitRunner.StrictStubs.class)
5248
public class WorkerGroupMemberTest {
5349
@Mock
5450
private ConfigBackingStore configBackingStore;
55-
@Mock
56-
private StatusBackingStore statusBackingStore;
5751

5852
@Test
5953
public void testMetrics() throws Exception {
@@ -72,10 +66,11 @@ public void testMetrics() throws Exception {
7266

7367
LogContext logContext = new LogContext("[Worker clientId=client-1 + groupId= group-1]");
7468

75-
expectClusterId();
76-
77-
member = new WorkerGroupMember(config, "", configBackingStore,
78-
null, Time.SYSTEM, "client-1", logContext);
69+
try (MockedStatic<ConnectUtils> utilities = mockStatic(ConnectUtils.class)) {
70+
utilities.when(() -> ConnectUtils.lookupKafkaClusterId(any())).thenReturn("cluster-1");
71+
member = new WorkerGroupMember(config, "", configBackingStore, null, Time.SYSTEM, "client-1", logContext);
72+
utilities.verify(() -> ConnectUtils.lookupKafkaClusterId(any()));
73+
}
7974

8075
boolean entered = false;
8176
for (MetricsReporter reporter : member.metrics().reporters()) {
@@ -94,10 +89,4 @@ public void testMetrics() throws Exception {
9489
//verify metric exists with correct prefix
9590
assertNotNull(server.getObjectInstance(new ObjectName("kafka.connect:type=grp1,client-id=client-1")));
9691
}
97-
private void expectClusterId() {
98-
PowerMock.mockStaticPartial(ConnectUtils.class, "lookupKafkaClusterId");
99-
EasyMock.expect(ConnectUtils.lookupKafkaClusterId(EasyMock.anyObject())).andReturn("cluster-1").anyTimes();
100-
PowerMock.replay(ConnectUtils.class);
101-
}
102-
10392
}

core/src/main/scala/kafka/cluster/Partition.scala

+7-2
Original file line numberDiff line numberDiff line change
@@ -26,7 +26,7 @@ import kafka.log._
2626
import kafka.metrics.KafkaMetricsGroup
2727
import kafka.server._
2828
import kafka.server.checkpoints.OffsetCheckpoints
29-
import kafka.server.metadata.KRaftMetadataCache
29+
import kafka.server.metadata.{KRaftMetadataCache, ZkMetadataCache}
3030
import kafka.utils.CoreUtils.{inReadLock, inWriteLock}
3131
import kafka.utils._
3232
import kafka.zookeeper.ZooKeeperClientException
@@ -881,11 +881,16 @@ class Partition(val topicPartition: TopicPartition,
881881
private def isReplicaIsrEligible(followerReplicaId: Int): Boolean = {
882882
metadataCache match {
883883
// In KRaft mode, only replicas which are not fenced nor in controlled shutdown are
884-
// allowed to join the ISR. This does not apply to ZK mode.
884+
// allowed to join the ISR.
885885
case kRaftMetadataCache: KRaftMetadataCache =>
886886
!kRaftMetadataCache.isBrokerFenced(followerReplicaId) &&
887887
!kRaftMetadataCache.isBrokerShuttingDown(followerReplicaId)
888888

889+
// In ZK mode, we just ensure the broker is alive. Although we do not check for shutting down brokers here,
890+
// the controller will block them from being added to ISR.
891+
case zkMetadataCache: ZkMetadataCache =>
892+
zkMetadataCache.hasAliveBroker(followerReplicaId)
893+
889894
case _ => true
890895
}
891896
}

core/src/main/scala/kafka/controller/KafkaController.scala

+25-2
Original file line numberDiff line numberDiff line change
@@ -2336,7 +2336,14 @@ class KafkaController(val config: KafkaConfig,
23362336
controllerContext.partitionLeadershipInfo(tp) match {
23372337
case Some(leaderIsrAndControllerEpoch) =>
23382338
val currentLeaderAndIsr = leaderIsrAndControllerEpoch.leaderAndIsr
2339-
if (newLeaderAndIsr.leaderEpoch != currentLeaderAndIsr.leaderEpoch) {
2339+
if (newLeaderAndIsr.partitionEpoch > currentLeaderAndIsr.partitionEpoch
2340+
|| newLeaderAndIsr.leaderEpoch > currentLeaderAndIsr.leaderEpoch) {
2341+
// If the partition leader has a higher partition/leader epoch, then it is likely
2342+
// that this node is no longer the active controller. We return NOT_CONTROLLER in
2343+
// this case to give the leader an opportunity to find the new controller.
2344+
partitionResponses(tp) = Left(Errors.NOT_CONTROLLER)
2345+
None
2346+
} else if (newLeaderAndIsr.leaderEpoch != currentLeaderAndIsr.leaderEpoch) {
23402347
partitionResponses(tp) = Left(Errors.FENCED_LEADER_EPOCH)
23412348
None
23422349
} else if (newLeaderAndIsr.equalsAllowStalePartitionEpoch(currentLeaderAndIsr)) {
@@ -2364,7 +2371,23 @@ class KafkaController(val config: KafkaConfig,
23642371
)
23652372
None
23662373
} else {
2367-
Some(tp -> newLeaderAndIsr)
2374+
// Pull out replicas being added to ISR and verify they are all online.
2375+
// If a replica is not online, reject the update as specified in KIP-841.
2376+
val ineligibleReplicas = newLeaderAndIsr.isr.toSet -- controllerContext.liveBrokerIds
2377+
if (ineligibleReplicas.nonEmpty) {
2378+
info(s"Rejecting AlterPartition request from node $brokerId for $tp because " +
2379+
s"it specified ineligible replicas $ineligibleReplicas in the new ISR ${newLeaderAndIsr.isr}."
2380+
)
2381+
2382+
if (alterPartitionRequestVersion > 1) {
2383+
partitionResponses(tp) = Left(Errors.INELIGIBLE_REPLICA)
2384+
} else {
2385+
partitionResponses(tp) = Left(Errors.OPERATION_NOT_ATTEMPTED)
2386+
}
2387+
None
2388+
} else {
2389+
Some(tp -> newLeaderAndIsr)
2390+
}
23682391
}
23692392

23702393
case None =>

core/src/main/scala/kafka/raft/RaftManager.scala

+1-7
Original file line numberDiff line numberDiff line change
@@ -24,7 +24,6 @@ import java.util.concurrent.CompletableFuture
2424
import kafka.log.UnifiedLog
2525
import kafka.raft.KafkaRaftManager.RaftIoThread
2626
import kafka.server.{KafkaConfig, MetaProperties}
27-
import kafka.server.KafkaRaftServer.ControllerRole
2827
import kafka.utils.timer.SystemTimer
2928
import kafka.utils.{KafkaScheduler, Logging, ShutdownableThread}
3029
import org.apache.kafka.clients.{ApiVersions, ManualMetadataUpdater, NetworkClient}
@@ -181,12 +180,7 @@ class KafkaRaftManager[T](
181180
val expirationTimer = new SystemTimer("raft-expiration-executor")
182181
val expirationService = new TimingWheelExpirationService(expirationTimer)
183182
val quorumStateStore = new FileBasedStateStore(new File(dataDir, "quorum-state"))
184-
185-
val nodeId = if (config.processRoles.contains(ControllerRole)) {
186-
OptionalInt.of(config.nodeId)
187-
} else {
188-
OptionalInt.empty()
189-
}
183+
val nodeId = OptionalInt.of(config.nodeId)
190184

191185
val client = new KafkaRaftClient(
192186
recordSerde,

core/src/main/scala/kafka/server/BrokerServer.scala

+11-4
Original file line numberDiff line numberDiff line change
@@ -50,6 +50,7 @@ import org.apache.kafka.raft.RaftConfig.AddressSpec
5050
import org.apache.kafka.raft.{RaftClient, RaftConfig}
5151
import org.apache.kafka.server.authorizer.Authorizer
5252
import org.apache.kafka.server.common.ApiMessageAndVersion
53+
import org.apache.kafka.server.fault.FaultHandler
5354
import org.apache.kafka.server.metrics.KafkaYammerMetrics
5455
import org.apache.kafka.snapshot.SnapshotWriter
5556

@@ -76,9 +77,13 @@ class BrokerServer(
7677
val raftManager: RaftManager[ApiMessageAndVersion],
7778
val time: Time,
7879
val metrics: Metrics,
80+
val brokerMetrics: BrokerServerMetrics,
7981
val threadNamePrefix: Option[String],
8082
val initialOfflineDirs: Seq[String],
81-
val controllerQuorumVotersFuture: CompletableFuture[util.Map[Integer, AddressSpec]]
83+
val controllerQuorumVotersFuture: CompletableFuture[util.Map[Integer, AddressSpec]],
84+
val fatalFaultHandler: FaultHandler,
85+
val metadataLoadingFaultHandler: FaultHandler,
86+
val metadataPublishingFaultHandler: FaultHandler
8287
) extends KafkaBroker {
8388

8489
override def brokerState: BrokerState = Option(lifecycleManager).
@@ -315,8 +320,8 @@ class BrokerServer(
315320
threadNamePrefix,
316321
config.metadataSnapshotMaxNewRecordBytes,
317322
metadataSnapshotter,
318-
BrokerServerMetrics(metrics)
319-
)
323+
brokerMetrics,
324+
metadataLoadingFaultHandler)
320325

321326
val networkListeners = new ListenerCollection()
322327
config.effectiveAdvertisedListeners.foreach { ep =>
@@ -432,7 +437,9 @@ class BrokerServer(
432437
transactionCoordinator,
433438
clientQuotaMetadataManager,
434439
dynamicConfigHandlers.toMap,
435-
authorizer)
440+
authorizer,
441+
fatalFaultHandler,
442+
metadataPublishingFaultHandler)
436443

437444
// Tell the metadata listener to start publishing its output, and wait for the first
438445
// publish operation to complete. This first operation will initialize logManager,

core/src/main/scala/kafka/server/ControllerServer.scala

+3-2
Original file line numberDiff line numberDiff line change
@@ -37,7 +37,7 @@ import org.apache.kafka.common.security.scram.internals.ScramMechanism
3737
import org.apache.kafka.common.security.token.delegation.internals.DelegationTokenCache
3838
import org.apache.kafka.common.utils.{LogContext, Time}
3939
import org.apache.kafka.common.{ClusterResource, Endpoint}
40-
import org.apache.kafka.controller.{BootstrapMetadata, Controller, QuorumController, QuorumControllerMetrics, QuorumFeatures}
40+
import org.apache.kafka.controller.{BootstrapMetadata, Controller, ControllerMetrics, QuorumController, QuorumFeatures}
4141
import org.apache.kafka.metadata.KafkaConfigSchema
4242
import org.apache.kafka.raft.RaftConfig
4343
import org.apache.kafka.raft.RaftConfig.AddressSpec
@@ -61,6 +61,7 @@ class ControllerServer(
6161
val raftManager: RaftManager[ApiMessageAndVersion],
6262
val time: Time,
6363
val metrics: Metrics,
64+
val controllerMetrics: ControllerMetrics,
6465
val threadNamePrefix: Option[String],
6566
val controllerQuorumVotersFuture: CompletableFuture[util.Map[Integer, AddressSpec]],
6667
val configSchema: KafkaConfigSchema,
@@ -201,7 +202,7 @@ class ControllerServer(
201202
setSnapshotMaxNewRecordBytes(config.metadataSnapshotMaxNewRecordBytes).
202203
setLeaderImbalanceCheckIntervalNs(leaderImbalanceCheckIntervalNs).
203204
setMaxIdleIntervalNs(maxIdleIntervalNs).
204-
setMetrics(new QuorumControllerMetrics(KafkaYammerMetrics.defaultRegistry(), time)).
205+
setMetrics(controllerMetrics).
205206
setCreateTopicPolicy(createTopicPolicy.asJava).
206207
setAlterConfigPolicy(alterConfigPolicy.asJava).
207208
setConfigurationValidator(new ControllerConfigurationValidator()).

0 commit comments

Comments
 (0)