KAFKA-13959: Controller should unfence Broker with busy metadata log #12274
Conversation
@jsancio This is what I found about this problem. This solution is just temporary; let's first confirm the root cause.
@dengziming, I like this idea to fix the issue, and you dug deeper than I did. Nice finding. But in this case, I think we should increase the … But let's wait for the Jenkins results to see if it really fixes the issue. (I think your investigation is correct)
@dengziming Thanks for the investigation. One idea I was considering is only letting observers fetch up to the high watermark. Then the records would be returned to brokers as they become committed. Would that address this issue (at least when controllers are not colocated with brokers)?
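A minimal sketch of that idea, with invented names (`FetchBounds`, `maxReadableOffset`); the real KafkaRaftClient read path is more involved:

```java
// Hypothetical sketch: limit what an observer may read to the committed range.
final class FetchBounds {
    static long maxReadableOffset(boolean isObserver, long logEndOffset, long highWatermark) {
        // Voters may read up to the log end; observers only up to the high watermark,
        // so uncommitted records are never handed to brokers.
        return isObserver ? Math.min(logEndOffset, highWatermark) : logEndOffset;
    }

    public static void main(String[] args) {
        // An observer with LEO=12 and HW=10 may only read up to offset 10.
        System.out.println(maxReadableOffset(true, 12L, 10L)); // prints 10
    }
}
```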
@hachikuji I'm not sure I understood correctly. The problem is that the observer HW is always less than the leader HW, because the observer needs to send one more request to update its HW, and that request will wait an extra 500ms before the response; after 500ms, the leader will append a NoOpRecord.
@showuon Neither changing …
@dengziming The difference is that advancing the high watermark would then be the trigger for fulfilling the Fetch. If the observer does not have the latest high watermark, then it means data is available.
@hachikuji I see what you mean: just change … But I also tried a similar solution in my PR that can solve the issue, which is to return directly if fetchPartition.fetchOffset() >= highWatermark.
@dengziming Yeah, I think that's the other option. If we keep track of the last sent high watermark for each replica, then we can fulfill the fetch immediately whenever there is a high watermark change to propagate. There are some tradeoffs in terms of latency and the frequency of fetches, but perhaps metadata changes are infrequent enough and it does not make a big difference. I do think we should aim for a solution which is not super sensitive to timing-related configurations.
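A rough sketch of that approach under assumed names (`HighWatermarkTracker` and `canCompleteFetch` are illustrative, not the actual KafkaRaftClient or LeaderState API):

```java
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;

// Remember the high watermark last sent to each replica, and treat a pending
// fetch as satisfiable as soon as there are new records OR a HW change to propagate.
final class HighWatermarkTracker {
    private final Map<Integer, Long> lastSentHighWatermark = new ConcurrentHashMap<>();

    boolean canCompleteFetch(int replicaId, long fetchOffset, long logEndOffset, long highWatermark) {
        long lastSent = lastSentHighWatermark.getOrDefault(replicaId, -1L);
        // Complete immediately if there is new data, or if this replica has not
        // yet been told about the current high watermark.
        return fetchOffset < logEndOffset || lastSent < highWatermark;
    }

    void onFetchResponseSent(int replicaId, long highWatermark) {
        lastSentHighWatermark.put(replicaId, highWatermark);
    }
}
```

As noted above, the trade-off is more frequent fetch completions when the HW moves often.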
Yes, I think this is the best solution under the current design, although it will break the purpose of …
@hachikuji In this PR I tried your suggestion and it does solve this problem; however, it will make the logic in RaftClient very complex, and we need to save more state in … Here is the code change: https://github.com/dengziming/kafka/tree/KAFKA-13959-2. @showuon @hachikuji @jsancio WDYT?
Hey @dengziming, I took a look at the commits in this tree. Is this the only commit, dengziming@79dc8ec? Can you maybe share a diff/compare, for example something like dengziming/kafka@30216ea...KAFKA-13959-2.
Never mind. I understand now. The broker sends the active controller the local LEO instead of the last offset applied by the broker listener. I think this will unfence the broker at startup even if the broker hasn't applied the snapshot or any of the log records, right?
@dengziming Do you have a diff for this solution? I am interested in it, as it would work in both the REMOTE and COLOCATED configurations for KRaft.
Currently, we replay the metadata records when the metadata listener gets new records. So yes, if we just return the current LEO, the records/snapshots might not have been applied yet. Sorry, it's easy to reject others' proposals but difficult to come up with another solution. If we don't have any better one, maybe we can try the originally proposed one?
And again, thanks for continuing to try to fix this difficult issue, @dengziming!
@jsancio I haven't fleshed it out; the basic idea is shown in this PR: only read up to the high watermark.
@showuon This solution can only take effect if the problem is that the heartbeat period is much bigger than the NoOpRecord interval; it may not work now, since the main problem is from …
I see. This would not work for co-located Kafka servers, right? Co-located Kafka servers are servers that run both a controller and a broker. In that case the replica will read uncommitted data, and the leader will not send a FETCH response when the HW changes.
@jsancio You are right; this is why I'm trying to find a simpler solution, for example letting the broker send the local LEO instead of the last applied offset, which is the same logic we use to expand a topic-partition's ISR. The problem is that the broker may not have applied everything up to the LEO, as you mentioned. Maybe we should allow the lastAppliedOffset to be behind by A records (or some amount of time) and still unfence the broker.
@dengziming Here are my thoughts.

Improve logic for unfencing brokers

How about the controller unfencing the broker when the broker's high-watermark has reached the broker registration record for that broker? When the broker first registers for a given incarnation, the controller writes a broker registration record. The controller can remember this offset as returned by the raft client, and unfence the broker when the broker's high-watermark is greater than this registration offset.

Propagate the HWM to the replicas as quickly as possible

I think the solution above would allow us to de-prioritize this; here are my observations anyway. Looking at the KafkaRaftClient implementation, we would have to keep an index of both the fetch offset and the last sent high-watermark for each replica. Another issue is that we changed KafkaRaftManager so that it doesn't set the replica id when it is an observer/broker. Since the HWM is not part of the Fetch request, the leader would have to keep track of this in the LeaderState. We would need to find a better solution for https://issues.apache.org/jira/browse/KAFKA-13168 or improve the FETCH request so that it includes the HWM.
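A minimal sketch of the first idea, with hypothetical names (`UnfenceCheck`, `mayUnfence`); in the PR this state ends up in ClusterControlManager:

```java
import java.util.HashMap;
import java.util.Map;

// When the controller replays a broker registration record it remembers the
// record's offset; the broker may be unfenced only once the offset it has
// caught up to (reported via heartbeats) reaches that registration offset.
final class UnfenceCheck {
    private final Map<Integer, Long> registrationOffsets = new HashMap<>();

    void onBrokerRegistered(int brokerId, long registrationRecordOffset) {
        registrationOffsets.put(brokerId, registrationRecordOffset);
    }

    boolean mayUnfence(int brokerId, long brokerCaughtUpOffset) {
        Long registeredAt = registrationOffsets.get(brokerId);
        // A broker the controller has never seen register must stay fenced.
        if (registeredAt == null) return false;
        return brokerCaughtUpOffset >= registeredAt;
    }
}
```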
@jsancio Thank you, here is my feedback.
This is a good idea; however, the offset of each registration record is soft state (not persisted to the metadata log) and would be lost on a leader change, so we should move this judgment to the broker side. To be clear: let the broker unfence itself when it sees its own registration record.
We already have a Jira for it, filed when working on KIP-836: https://issues.apache.org/jira/browse/KAFKA-13986.
@dengziming The offset is persisted along with the record. This offset is provided to the inactive controllers in handleCommit. When processing handleCommit, the inactive controllers can use the …
@jsancio I think the last question is that there may be more than one registration record for each broker after restarting it, so can we rely on the broker epoch? I need some time to check the logic before making a final decision.
Yes. I think you can rely on the broker epoch and broker id. Also, the active controller is guaranteed to have read all of the records in the log before handling RPCs like the heartbeat.
@dengziming I marked this issue as a blocker for 3.3.0. Let me know if you want to work on it. I don't mind.
@jsancio Thank you for reminding me. I'm working on this now; the solution is not difficult, but I'm still figuring out how to test it.
Hello @jsancio, I want to call for a first round of review of this PR.
Thanks for working on this issue @dengziming. It is important to get this fixed for 3.3.0.
Can you please update the description of the PR to match what is implemented here?
```diff
@@ -759,7 +759,7 @@ public void run() throws Exception {
                 int i = 1;
                 for (ApiMessageAndVersion message : result.records()) {
                     try {
-                        replay(message.message(), Optional.empty());
+                        replay(message.message(), Optional.empty(), writeOffset + i);
```
Let's write a comment explaining why the controller needs to do this. I wonder if this should be `writeOffset + result.records().size()` to make it consistent with the code in `handleCommit`. As you mentioned earlier, this is okay as long as the offset passed is equal to or larger than the actual offset. What do you think?
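A toy illustration of the two offset choices under discussion (placeholder values, not the controller code):

```java
// Option A passes each record its own offset; option B passes every record the
// batch's last offset. Both satisfy the invariant discussed above: the offset
// stored for a record is equal to or larger than the record's true offset.
final class OffsetChoices {
    public static void main(String[] args) {
        long writeOffset = 100L; // offset of the last record already in the log
        int batchSize = 3;       // stand-in for result.records().size()

        // Option A: per-record offset, using the 1-based loop index i.
        for (int i = 1; i <= batchSize; i++) {
            System.out.println("record " + i + " replayed at offset " + (writeOffset + i));
        }

        // Option B: one conservative upper bound for the whole batch.
        System.out.println("batch upper bound = " + (writeOffset + batchSize));
    }
}
```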
Thank you for pointing this out.
```diff
@@ -1305,12 +1305,11 @@ private void handleFeatureControlChange() {
         }
     }
 
     @SuppressWarnings("unchecked")
-    private void replay(ApiMessage message, Optional<OffsetAndEpoch> snapshotId) {
+    private void replay(ApiMessage message, Optional<OffsetAndEpoch> snapshotId, long offset) {
```
Let's document this `offset` argument since it is now important and is used when replaying broker registration records.
Done
```java
 * this is not necessarily the exact offset of each broker registration record
 * but should not be smaller than it.
 */
private final TimelineHashMap<Integer, Long> registerBrokerRecordOffsets;
```
Okay. Did you consider adding this offset to `BrokerRegistration`? `ClusterControlManager` already has a mapping of broker id to `BrokerRegistration` that gets snapshotted in-memory.
Currently, all fields in `BrokerRegistration` are hard state, which means they are all persisted in `RegisterBrokerRecord`, so I prefer to keep this in a separate field.
```java
throw new RuntimeException(
    String.format("Receive a heartbeat from broker %d before registration", brokerId));
```
I would argue that this means that the client/broker sent a heartbeat request without sending a register broker request first. Unfortunately, KIP-631 only documents `NOT_CONTROLLER` as a possible error code. Looking at the broker code, the broker retries all errors. I say we return an `INVALID_REQUEST` instead of `UNKNOWN_SERVER_ERROR`.

Also, by throwing a `RuntimeException`, this will force the active controller to resign; see `QuorumController#handleEventException`. We don't want to do that in this case. For example, this code can instead throw an `InvalidRequestException`, and `ControllerApis` will convert that to an `Errors` code.
Good catch. I found we have a similar check in `ClusterControlManager.checkBrokerEpoch`, where we return a `StaleBrokerEpochException` if the `BrokerRegistration` is null, so I also return a `StaleBrokerEpochException` here to make it consistent.
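A condensed sketch of the agreed-upon behavior; `StaleBrokerEpochException` is the real class from the Kafka clients jar, while the surrounding helper is invented:

```java
import org.apache.kafka.common.errors.StaleBrokerEpochException;

// Reject a heartbeat from an unregistered broker with an API-level exception that
// ControllerApis can map to an error code, instead of a RuntimeException that
// would make the active controller resign.
final class HeartbeatValidation {
    static void checkRegistered(Object registration, int brokerId) {
        if (registration == null) {
            throw new StaleBrokerEpochException(
                "Received a heartbeat from broker " + brokerId + " before registration");
        }
    }
}
```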
Thanks for your comments, @jsancio. I have updated the description and fixed the problems.
@dengziming Take a look at the test failures. All of the failures in …
Thanks @dengziming. It is looking good. It should be ready to merge once we get a green build.
```java
        method.invoke(target, record, 0L);
    } catch (NoSuchMethodException i) {
        // ignore
    } catch (InvocationTargetException i) {
```
We have duplicated this pattern 3 times. I think we can remove this duplication if we add a `try` for the entire method, e.g. starting at line 57 and ending after the `for` loop.
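One possible shape of that refactor, with invented names (`replayAll` and its signature are illustrative, not the actual method):

```java
import java.lang.reflect.InvocationTargetException;
import java.lang.reflect.Method;

// A single outer try consolidates the invocation error handling that was
// previously repeated at each call site; a missing replay() overload for a
// record type is still ignored per record.
final class ReplayHelper {
    static void replayAll(Object target, Iterable<?> records) {
        try {
            for (Object record : records) {
                Method method;
                try {
                    method = target.getClass().getMethod("replay", record.getClass(), long.class);
                } catch (NoSuchMethodException e) {
                    continue; // this target has no replay() overload for this record type
                }
                method.invoke(target, record, 0L);
            }
        } catch (IllegalAccessException | InvocationTargetException e) {
            throw new RuntimeException(e);
        }
    }
}
```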
```diff
@@ -56,7 +56,7 @@
     CompletableFuture<Long> await(T threshold, long maxWaitTimeMs);
 
     /**
-     * Complete awaiting futures whose associated values are larger than the given threshold value.
+     * Complete awaiting futures whose associated values are smaller than the given threshold value.
```
Thanks for this fix. I think the phrase "associated values" is still misleading. How about:
Complete awaiting futures whose threshold value from {@link await} is smaller than the given threshold value.
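A toy model of the semantics the corrected javadoc describes (`ToyPurgatory` is illustrative, not the real purgatory implementation):

```java
import java.util.NavigableMap;
import java.util.TreeMap;
import java.util.concurrent.CompletableFuture;

// Futures registered via await(threshold) complete once completeAll(value) is
// called with a strictly larger value, i.e. the futures with the *smaller*
// thresholds are the ones completed.
final class ToyPurgatory {
    private final TreeMap<Long, CompletableFuture<Long>> waiting = new TreeMap<>();

    CompletableFuture<Long> await(long threshold) {
        // Toy simplification: awaiters sharing a threshold share a future.
        return waiting.computeIfAbsent(threshold, t -> new CompletableFuture<>());
    }

    void completeAll(long value) {
        // Complete every future whose threshold is smaller than the given value.
        NavigableMap<Long, CompletableFuture<Long>> done = waiting.headMap(value, false);
        done.values().forEach(f -> f.complete(value));
        done.clear();
    }
}
```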
Thank you for your good suggestions, @jsancio; I have resolved them.
I love the brilliant solution! Thanks @dengziming and @jsancio!
Please remember to update the solution in JIRA, for future reference! Thank you.
Build failed with test errors. They seem unrelated:
[Build / JDK 8 and Scala 2.12 / org.apache.kafka.connect.mirror.integration.IdentityReplicationIntegrationTest.testOffsetSyncsTopicsOnTarget()](https://ci-builds.apache.org/job/Kafka/job/kafka-pr/job/PR-12274/9/testReport/junit/org.apache.kafka.connect.mirror.integration/IdentityReplicationIntegrationTest/Build___JDK_8_and_Scala_2_12___testOffsetSyncsTopicsOnTarget__/)
[Build / JDK 8 and Scala 2.12 / org.apache.kafka.connect.mirror.integration.MirrorConnectorsIntegrationBaseTest.testReplication()](https://ci-builds.apache.org/job/Kafka/job/kafka-pr/job/PR-12274/9/testReport/junit/org.apache.kafka.connect.mirror.integration/MirrorConnectorsIntegrationBaseTest/Build___JDK_8_and_Scala_2_12___testReplication__/)
[Build / JDK 8 and Scala 2.12 / kafka.server.KRaftClusterTest.testIncrementalAlterConfigs()](https://ci-builds.apache.org/job/Kafka/job/kafka-pr/job/PR-12274/9/testReport/junit/kafka.server/KRaftClusterTest/Build___JDK_8_and_Scala_2_12___testIncrementalAlterConfigs__/)
[Build / JDK 11 and Scala 2.13 / org.apache.kafka.controller.QuorumControllerTest.testNoOpRecordWriteAfterTimeout()](https://ci-builds.apache.org/job/Kafka/job/kafka-pr/job/PR-12274/9/testReport/junit/org.apache.kafka.controller/QuorumControllerTest/Build___JDK_11_and_Scala_2_13___testNoOpRecordWriteAfterTimeout__/)
KAFKA-13959: Controller should unfence Broker with busy metadata log (#12274)

The reason for KAFKA-13959 is a little complex; the two keys to this problem are:

- KafkaRaftClient.MAX_FETCH_WAIT_MS == MetadataMaxIdleIntervalMs == 500ms. We rely on the fetchPurgatory to complete a FetchRequest; in detail, if FetchRequest.fetchOffset >= log.endOffset, we will wait for 500ms before sending a FetchResponse.
- The follower needs to send one more FetchRequest to get the HW.

Here are the event sequences:

1. When starting, the leader (active controller) has LEO=m+1 (m is the offset of the last record) and leader HW=m (because we need more than half of the voters to reach m+1).
2. The follower (standby controller) and observer (broker) send FetchRequest(fetchOffset=m).
3. The fetch is handled:
   3.1. The leader receives the FetchRequest, sets leader HW=m, and waits 500ms before sending the FetchResponse.
   3.2. The leader sends FetchResponse(HW=m).
   3.3. The broker receives FetchResponse(HW=m) and sets metadataOffset=m.
4. The leader appends a NoOpRecord: LEO=m+2, leader HW=m.
5. Looping 1-4.

If we change MAX_FETCH_WAIT_MS=200 (less than half of MetadataMaxIdleIntervalMs), this problem can be solved temporarily. We plan to improve this in 2 ways: firstly, in this PR, we change the controller to unfence a broker when the broker's high-watermark has reached the broker registration record for that broker; secondly, we will propagate the HWM to the replicas as quickly as possible in KAFKA-14145.

Reviewers: Luke Chen <showuon@gmail.com>, José Armando García Sancio <jsancio@users.noreply.github.com>
More detailed description of your change
The reason for KAFKA-13959 is a little complex; the two keys to this problem are:

- KafkaRaftClient.MAX_FETCH_WAIT_MS == MetadataMaxIdleIntervalMs == 500ms. We rely on the fetchPurgatory to complete a FetchRequest; in detail, if FetchRequest.fetchOffset >= log.endOffset, we will wait for 500ms before sending a FetchResponse.
- The follower needs to send one more FetchRequest to get the HW.

Here are the event sequences:

1. When starting, the leader (active controller) has LEO=m+1 (m is the offset of the last record) and leader HW=m (because we need more than half of the voters to reach m+1).
2. The follower (standby controller) and observer (broker) send FetchRequest(fetchOffset=m).
3. The fetch is handled:
   3.1. The leader receives the FetchRequest, sets leader HW=m, and waits 500ms before sending the FetchResponse.
   3.2. The leader sends FetchResponse(HW=m).
   3.3. The broker receives FetchResponse(HW=m) and sets metadataOffset=m.
4. The leader appends a NoOpRecord: LEO=m+2, leader HW=m.
5. Looping 1-4.

If we change MAX_FETCH_WAIT_MS=200 (less than half of MetadataMaxIdleIntervalMs), this problem can be solved temporarily.

We plan to improve this problem in two ways: first, in this PR, we change the controller to unfence a broker when the broker's high-watermark has reached the broker registration record for that broker; second, we will propagate the HWM to the replicas as quickly as possible in KAFKA-14145.
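A back-of-the-envelope check of the "less than half" observation above (invented helper; the parameters stand in for the real configs):

```java
// The broker needs two fetch round trips to learn a new HW (one to get the
// records, one more to get the updated HW), and each can park in the fetch
// purgatory for up to maxFetchWaitMs. Both waits must fit inside one
// NoOpRecord interval, hence maxFetchWaitMs < metadataMaxIdleIntervalMs / 2.
final class FetchTimingCheck {
    static boolean hwCatchesUpBetweenNoOps(long maxFetchWaitMs, long metadataMaxIdleIntervalMs) {
        return 2 * maxFetchWaitMs < metadataMaxIdleIntervalMs;
    }

    public static void main(String[] args) {
        System.out.println(hwCatchesUpBetweenNoOps(500, 500)); // false: the reported bug
        System.out.println(hwCatchesUpBetweenNoOps(200, 500)); // true: the temporary workaround
    }
}
```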