consumer disconnect during join -- hang #2615

Closed

keith-chew opened this issue Nov 8, 2019 · 5 comments

@keith-chew
Description

I have 2 consumer clients, and during a graceful shutdown they disconnect from the broker. The issue arises when the first client disconnects: the second client receives a heartbeat failure (the broker is rebalancing) and rejoins the group. If the second client disconnects during this rejoin, its disconnect callback never gets called, leaving the client in a hung state.

How to reproduce

I created a test harness with 2 clients that disconnect from and reconnect to the broker every 30 seconds. The issue can be reproduced within 30-60 minutes of running the harness.
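For illustration, a minimal sketch of such a harness in TypeScript with node-rdkafka (the broker address, group id, and topic below are placeholders, not the actual test values):

```ts
// Minimal sketch of the connect/disconnect cycle described above.
// Placeholder broker/group/topic values; not the original harness.
import * as Kafka from 'node-rdkafka';

function cycle(): void {
  const consumer = new Kafka.KafkaConsumer(
    { 'group.id': 'test-group', 'metadata.broker.list': 'broker01:9092' },
    {}
  );

  consumer.on('ready', () => {
    consumer.subscribe(['mytopic']);
    consumer.consume(); // flowing mode

    // After 30s, disconnect gracefully and start the next cycle.
    setTimeout(() => {
      consumer.disconnect(() => cycle());
    }, 30000);
  });

  consumer.connect();
}

cycle(); // run two instances of this process to get two group members
```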

Checklist

  • librdkafka version (release number or git tag): 1.1.0
  • Apache Kafka version: 2.12
  • librdkafka client configuration: default values
  • Operating system: rhel7
  • Provide logs (with debug=.. as necessary) from librdkafka
Nov 09 08:00:13 [1936] {"severity":7,"fac":"SEND","message":"[thrd:GroupCoordinator]: GroupCoordinator/2: Sent HeartbeatRequest (v0, 179 bytes @ 0, CorrId 30)"}
Nov 09 08:00:13 [1936] {"severity":7,"fac":"RECV","message":"[thrd:GroupCoordinator]: GroupCoordinator/2: Received HeartbeatResponse (v0, 2 bytes, CorrId 30, rtt 31.02ms)"}
Nov 09 08:00:13 [1936] {"severity":7,"fac":"REQERR","message":"[thrd:main]: GroupCoordinator/2: HeartbeatRequest failed: Broker: Group rebalance in progress: actions Permanent"}
Nov 09 08:00:13 [1936] {"severity":7,"fac":"SEND","message":"[thrd:GroupCoordinator]: GroupCoordinator/2: Sent JoinGroupRequest (v2, 310 bytes @ 0, CorrId 31)"}
Nov 09 08:00:13 [1936] {"severity":7,"fac":"RECV","message":"[thrd:broker01:9092/bootstrap]: broker01:9092/1: Received FetchResponse (v4, 105 bytes, CorrId 496, rtt 130.45ms)"}
Nov 09 08:00:13 [1936] {"severity":7,"fac":"RECV","message":"[thrd:broker02:9092/bootstrap]: broker02:9092/2: Received FetchResponse (v4, 105 bytes, CorrId 493, rtt 131.69ms)"}
Nov 09 08:00:13 [1936] {"severity":7,"fac":"RECV","message":"[thrd:GroupCoordinator]: GroupCoordinator/2: Received JoinGroupResponse (v2, 314 bytes, CorrId 31, rtt 30.07ms)"}
Nov 09 08:00:13 [1936] {"severity":7,"fac":"SEND","message":"[thrd:GroupCoordinator]: GroupCoordinator/2: Sent MetadataRequest (v2, 94 bytes @ 0, CorrId 32)"}
Nov 09 08:00:13 [1936] {"severity":7,"fac":"RECV","message":"[thrd:broker03:9092/bootstrap]: broker03:9092/3: Received FetchResponse (v4, 105 bytes, CorrId 495, rtt 131.71ms)"}
Nov 09 08:00:13 [1936] {"severity":7,"fac":"RECV","message":"[thrd:GroupCoordinator]: GroupCoordinator/2: Received MetadataResponse (v2, 748 bytes, CorrId 32, rtt 31.30ms)"}
Nov 09 08:00:13 [1936] {"severity":7,"fac":"SEND","message":"[thrd:GroupCoordinator]: GroupCoordinator/2: Sent SyncGroupRequest (v0, 364 bytes @ 0, CorrId 33)"}
Nov 09 08:00:13 [1936] called disconnect()
Nov 09 08:00:13 [1936] {"severity":7,"fac":"RECV","message":"[thrd:GroupCoordinator]: GroupCoordinator/2: Received SyncGroupResponse (v0, 101 bytes, CorrId 33, rtt 39.92ms)"}
Nov 09 08:00:13 [1936] rebalance {"message":"Local: Assign partitions","code":-175,"errno":-175,"origin":"kafka"} assignment [{"topic":"mytopic","partition":0},{"topic":"mytopic","partition":1},{"topic":"mytopic","partition":2},{"topic":"mytopic","partition":3},{"topic":"mytopic","partition":4},{"topic":"mytopic","partition":5},{"topic":"mytopic","partition":6},{"topic":"mytopic","partition":7},{"topic":"mytopic","partition":8},{"topic":"mytopic","partition":9},{"topic":"mytopic","partition":10},{"topic":"mytopic","partition":11}] 
Nov 09 08:00:13 [1936] newPartitions [0,1,2,3,4,5,6,7,8,9,10,11] oldPartitions [6,7,8,9,10,11]

From the logs of the second consumer above, it can be seen that it is rejoining on a rebalance (after the first consumer disconnected). After this point, the consumer never gets the disconnect callback and still thinks it is connected. Any call to consume messages returns "KafkaConsumer is not connected".

Note: I am using node-rdkafka, but looking at the source, it calls the consumer's close() method in librdkafka.

I can track this further if required; I just need some guidance on which part of the code to focus on.
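For what it's worth, the hang can be surfaced in a harness by racing the disconnect callback against a timer. A sketch (the helper name and timeout handling are made up for illustration, not part of node-rdkafka):

```ts
import * as Kafka from 'node-rdkafka';

// Sketch: flag the hang when the disconnect callback never fires.
// disconnectWithTimeout is a hypothetical helper, not a node-rdkafka API.
function disconnectWithTimeout(
  consumer: Kafka.KafkaConsumer,
  timeoutMs: number
): Promise<void> {
  return new Promise((resolve, reject) => {
    const timer = setTimeout(
      () => reject(new Error('disconnect callback never fired (hang)')),
      timeoutMs
    );
    consumer.disconnect(() => {
      clearTimeout(timer); // normal path: callback fired in time
      resolve();
    });
  });
}
```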

@keith-chew keith-chew changed the title disconnect during join -- hang consumer disconnect during join -- hang Nov 8, 2019
@keith-chew
Author

I am repeating the test on version 1.2.1 and it is stable so far. Looking at the changelog, I wonder if this bug was fixed here:

7af5c3d

Will report if the hang happens; otherwise I will close this issue once it has been stable after 24 hours of running.

@keith-chew
Author

Unfortunately, this condition also happens in v1.2.1. Will enable debug "protocol" and post logs here soon.

@keith-chew
Author

Logs for v1.2.1; it looks like the same problem: disconnect is called during message consumption, then a heartbeat failure causes a rejoin, and the disconnect callback never gets called:

Nov 09 14:22:36 [64816] [MAIN] called disconnect
Nov 09 14:22:36 [64816] [MAIN] {"severity":7,"fac":"RECV","message":"[thrd:broker3:9092/bootstrap]: broker3:9092/3: Received FetchResponse (v4, 1825 bytes, CorrId 717, rtt 43.74ms)"}
Nov 09 14:22:36 [64816] [MAIN] {"severity":7,"fac":"SEND","message":"[thrd:broker3:9092/bootstrap]: broker3:9092/3: Sent FetchRequest (v4, 126 bytes @ 0, CorrId 718)"}
Nov 09 14:22:36 [64816] [MAIN] {"severity":7,"fac":"RECV","message":"[thrd:broker3:9092/bootstrap]: broker3:9092/3: Received FetchResponse (v4, 2740 bytes, CorrId 718, rtt 21.79ms)"}
Nov 09 14:22:36 [64816] [MAIN] {"severity":7,"fac":"SEND","message":"[thrd:broker3:9092/bootstrap]: broker3:9092/3: Sent FetchRequest (v4, 126 bytes @ 0, CorrId 719)"}
Nov 09 14:22:36 [64816] [MAIN] {"severity":7,"fac":"RECV","message":"[thrd:broker2:9092/bootstrap]: broker2:9092/2: Received FetchResponse (v4, 1877 bytes, CorrId 708, rtt 46.29ms)"}
Nov 09 14:22:36 [64816] [MAIN] {"severity":7,"fac":"SEND","message":"[thrd:broker2:9092/bootstrap]: broker2:9092/2: Sent FetchRequest (v4, 126 bytes @ 0, CorrId 709)"}
Nov 09 14:22:36 [64816] [MAIN] {"severity":7,"fac":"RECV","message":"[thrd:broker3:9092/bootstrap]: broker3:9092/3: Received FetchResponse (v4, 1805 bytes, CorrId 719, rtt 37.81ms)"}
Nov 09 14:22:36 [64816] [MAIN] {"severity":7,"fac":"SEND","message":"[thrd:broker3:9092/bootstrap]: broker3:9092/3: Sent FetchRequest (v4, 126 bytes @ 0, CorrId 720)"}
Nov 09 14:22:36 [64816] [MAIN] {"severity":7,"fac":"RECV","message":"[thrd:broker1:9092/bootstrap]: broker1:9092/1: Received FetchResponse (v4, 7210 bytes, CorrId 1035, rtt 75.52ms)"}
Nov 09 14:22:36 [64816] [MAIN] {"severity":7,"fac":"SEND","message":"[thrd:broker1:9092/bootstrap]: broker1:9092/1: Sent FetchRequest (v4, 142 bytes @ 0, CorrId 1036)"}
Nov 09 14:22:36 [64816] [MAIN] {"severity":7,"fac":"RECV","message":"[thrd:broker1:9092/bootstrap]: broker1:9092/1: Received FetchResponse (v4, 2715 bytes, CorrId 1036, rtt 14.70ms)"}
Nov 09 14:22:36 [64816] [MAIN] {"severity":7,"fac":"SEND","message":"[thrd:broker1:9092/bootstrap]: broker1:9092/1: Sent FetchRequest (v4, 142 bytes @ 0, CorrId 1037)"}
Nov 09 14:22:36 [64816] [MAIN] {"severity":7,"fac":"RECV","message":"[thrd:broker3:9092/bootstrap]: broker3:9092/3: Received FetchResponse (v4, 4361 bytes, CorrId 720, rtt 26.87ms)"}
Nov 09 14:22:36 [64816] [MAIN] {"severity":7,"fac":"SEND","message":"[thrd:broker3:9092/bootstrap]: broker3:9092/3: Sent FetchRequest (v4, 126 bytes @ 0, CorrId 721)"}
Nov 09 14:22:36 [64816] [MAIN] {"severity":7,"fac":"RECV","message":"[thrd:broker1:9092/bootstrap]: broker1:9092/1: Received FetchResponse (v4, 3758 bytes, CorrId 1037, rtt 32.97ms)"}
Nov 09 14:22:36 [64816] [MAIN] {"severity":7,"fac":"SEND","message":"[thrd:broker1:9092/bootstrap]: broker1:9092/1: Sent FetchRequest (v4, 142 bytes @ 0, CorrId 1038)"}
Nov 09 14:22:36 [64816] [MAIN] {"severity":7,"fac":"RECV","message":"[thrd:broker2:9092/bootstrap]: broker2:9092/2: Received FetchResponse (v4, 976 bytes, CorrId 709, rtt 95.91ms)"}
Nov 09 14:22:36 [64816] [MAIN] {"severity":7,"fac":"SEND","message":"[thrd:broker2:9092/bootstrap]: broker2:9092/2: Sent FetchRequest (v4, 126 bytes @ 0, CorrId 710)"}
Nov 09 14:22:36 [64816] [MAIN] {"severity":7,"fac":"RECV","message":"[thrd:broker1:9092/bootstrap]: broker1:9092/1: Received FetchResponse (v4, 6420 bytes, CorrId 1038, rtt 39.29ms)"}
Nov 09 14:22:36 [64816] [MAIN] {"severity":7,"fac":"SEND","message":"[thrd:broker1:9092/bootstrap]: broker1:9092/1: Sent FetchRequest (v4, 142 bytes @ 0, CorrId 1039)"}
Nov 09 14:22:36 [64816] [MAIN] {"severity":7,"fac":"RECV","message":"[thrd:broker3:9092/bootstrap]: broker3:9092/3: Received FetchResponse (v4, 5177 bytes, CorrId 721, rtt 100.37ms)"}
Nov 09 14:22:36 [64816] [MAIN] {"severity":7,"fac":"SEND","message":"[thrd:broker3:9092/bootstrap]: broker3:9092/3: Sent FetchRequest (v4, 126 bytes @ 0, CorrId 722)"}
Nov 09 14:22:36 [64816] [MAIN] {"severity":7,"fac":"SEND","message":"[thrd:GroupCoordinator]: GroupCoordinator/2: Sent HeartbeatRequest (v0, 179 bytes @ 0, CorrId 40)"}
Nov 09 14:22:36 [64816] [MAIN] {"severity":7,"fac":"RECV","message":"[thrd:GroupCoordinator]: GroupCoordinator/2: Received HeartbeatResponse (v0, 2 bytes, CorrId 40, rtt 2.69ms)"}
Nov 09 14:22:36 [64816] [MAIN] {"severity":7,"fac":"REQERR","message":"[thrd:main]: GroupCoordinator/2: HeartbeatRequest failed: Broker: Group rebalance in progress: actions Permanent"}
Nov 09 14:22:36 [64816] [MAIN] {"message":"Local: Revoke partitions","code":-174,"errno":-174,"origin":"kafka"} [{"topic":"mytopic","partition":8},{"topic":"mytopic","partition":9},{"topic":"mytopic","partition":10},{"topic":"mytopic","partition":11}]
Nov 09 14:22:36 [64816] [MAIN] {"severity":7,"fac":"SEND","message":"[thrd:GroupCoordinator]: GroupCoordinator/2: Sent JoinGroupRequest (v2, 298 bytes @ 0, CorrId 41)"}
Nov 09 14:22:36 [64816] [MAIN] {"severity":7,"fac":"RECV","message":"[thrd:broker2:9092/bootstrap]: broker2:9092/2: Received FetchResponse (v4, 69 bytes, CorrId 710, rtt 109.34ms)"}
Nov 09 14:22:36 [64816] [MAIN] {"severity":7,"fac":"RECV","message":"[thrd:broker3:9092/bootstrap]: broker3:9092/3: Received FetchResponse (v4, 967 bytes, CorrId 722, rtt 51.96ms)"}
Nov 09 14:22:36 [64816] [MAIN] {"severity":7,"fac":"RECV","message":"[thrd:broker1:9092/bootstrap]: broker1:9092/1: Received FetchResponse (v4, 99 bytes, CorrId 1039, rtt 103.32ms)"}
Nov 09 14:22:37 [64816] [MAIN] {"severity":7,"fac":"RECV","message":"[thrd:GroupCoordinator]: GroupCoordinator/2: Received JoinGroupResponse (v2, 311 bytes, CorrId 41, rtt 508.54ms)"}
Nov 09 14:22:37 [64816] [MAIN] {"severity":7,"fac":"SEND","message":"[thrd:GroupCoordinator]: GroupCoordinator/2: Sent MetadataRequest (v2, 89 bytes @ 0, CorrId 42)"}
Nov 09 14:22:37 [64816] [MAIN] {"severity":7,"fac":"RECV","message":"[thrd:GroupCoordinator]: GroupCoordinator/2: Received MetadataResponse (v2, 742 bytes, CorrId 42, rtt 2.81ms)"}
Nov 09 14:22:37 [64816] [MAIN] {"severity":7,"fac":"SEND","message":"[thrd:GroupCoordinator]: GroupCoordinator/2: Sent SyncGroupRequest (v0, 359 bytes @ 0, CorrId 43)"}
Nov 09 14:22:37 [64816] [MAIN] {"severity":7,"fac":"RECV","message":"[thrd:GroupCoordinator]: GroupCoordinator/2: Received SyncGroupResponse (v0, 95 bytes, CorrId 43, rtt 51.56ms)"}
Nov 09 14:22:37 [64816] [MAIN] {"message":"Local: Assign partitions","code":-175,"errno":-175,"origin":"kafka"} [{"topic":"mytopic","partition":0},{"topic":"mytopic","partition":1},{"topic":"mytopic","partition":2},{"topic":"mytopic","partition":3},{"topic":"mytopic","partition":4},{"topic":"mytopic","partition":5},{"topic":"mytopic","partition":6},{"topic":"mytopic","partition":7},{"topic":"mytopic","partition":8},{"topic":"mytopic","partition":9},{"topic":"mytopic","partition":10},{"topic":"mytopic","partition":11}]
Nov 09 14:22:38 [64816] [MAIN] failed to consume kafka messages

@keith-chew
Author

After reading up on old issue tickets, I found issue 196 in node-rdkafka. I can confirm that without specifying rebalance_cb, the test has been running fine for the past 6 hours. So it appears that the hang issue still exists when rebalance_cb is used:
Blizzard/node-rdkafka#196
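For context, the rebalance_cb in question follows the standard explicit-assignment pattern from the node-rdkafka README; a sketch (type names assume node-rdkafka's bundled typings, and the broker/group values are placeholders):

```ts
import * as Kafka from 'node-rdkafka';

// With this callback configured the hang reproduces; omitting rebalance_cb
// leaves assignment handling to librdkafka's defaults and the harness stays up.
const consumer = new Kafka.KafkaConsumer(
  {
    'group.id': 'test-group',
    'metadata.broker.list': 'broker01:9092',
    rebalance_cb: (err: Kafka.LibrdKafkaError, assignment: Kafka.Assignment[]) => {
      if (err.code === Kafka.CODES.ERRORS.ERR__ASSIGN_PARTITIONS) {
        consumer.assign(assignment); // take ownership of the assigned partitions
      } else if (err.code === Kafka.CODES.ERRORS.ERR__REVOKE_PARTITIONS) {
        consumer.unassign();         // release all partitions on revoke
      } else {
        console.error(err);          // unexpected rebalance error
      }
    },
  },
  {}
);
```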

@edenhill
Contributor

Could you reproduce this with the upcoming v1.4.0 release (wait two weeks or grab v1.4.0-RC4 or librdkafka master) and set debug=cgrp,broker?
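In node-rdkafka terms that would look roughly like the following sketch: debug is the standard librdkafka configuration property, and the resulting log lines (the same severity/fac/message records shown above) arrive on the consumer's event.log event.

```ts
import * as Kafka from 'node-rdkafka';

// Sketch: enable the cgrp,broker debug contexts requested above.
const consumer = new Kafka.KafkaConsumer(
  {
    'group.id': 'test-group',                 // placeholder
    'metadata.broker.list': 'broker01:9092',  // placeholder
    debug: 'cgrp,broker',                     // consumer-group + broker internals
  },
  {}
);

// librdkafka debug output surfaces as 'event.log' events.
consumer.on('event.log', (log) => {
  console.log(`${log.severity} ${log.fac}: ${log.message}`);
});
```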
