Originally posted by linus-learns April 27, 2023
Hi,
I'm running a setup with @KafkaListener-annotated consumers using a ConcurrentKafkaListenerContainerFactory configured with asyncAcks and AckMode.MANUAL. I have tried to implement a CommonErrorHandler for my setup with isAckAfterHandle set to true and seeksAfterHandling set to false, but the bookkeeping of the deferred acks does not work, because the ack issued after the error handler has been invoked and the broken record handled does not go through the ackInOrder method. This causes the consumer to hang if a poison pill appears in the middle of the batch and higher offset commits are waiting in the ListenerConsumer.
It appears that the ack for the poison pill is never committed until the consumer gets the shutdown signal, as a rebalance causes the broken record to be fetched by the new partition owner, but a restart resolves the issue.
Is this a bug, or should I use a different way of handling my errors given my setup?
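To make the reported behaviour easier to follow, here is a minimal toy model of in-order commit bookkeeping for out-of-order acks. It is only a sketch and not Spring Kafka code: the class `DeferredAckSketch` and its methods are hypothetical names, and the bypassing ack is simplified to a no-op on the bookkeeping. It assumes a single partition with offsets 0 to 4 and a poison pill at offset 2.

```java
import java.util.TreeSet;

/** Toy model of deferred-ack bookkeeping; hypothetical, not the Spring Kafka implementation. */
public class DeferredAckSketch {

    /** Next offset that may be committed for the (single) partition. */
    private long nextToCommit;

    /** Acked offsets that must wait because a lower offset has not been acked yet. */
    private final TreeSet<Long> deferred = new TreeSet<>();

    public DeferredAckSketch(long firstOffset) {
        this.nextToCommit = firstOffset;
    }

    /** Ack routed through the bookkeeping: advances over every contiguous acked offset. */
    public void ackInOrder(long offset) {
        deferred.add(offset);
        while (deferred.remove(nextToCommit)) {
            nextToCommit++;
        }
    }

    /** Ack that skips the bookkeeping (assumed simplification of the reported behaviour). */
    public void ackBypassingBookkeeping(long offset) {
        // Intentionally leaves 'deferred' and 'nextToCommit' untouched, so the gap stays open.
    }

    public long nextToCommit() {
        return nextToCommit;
    }

    public int deferredCount() {
        return deferred.size();
    }

    public static void main(String[] args) {
        DeferredAckSketch tracker = new DeferredAckSketch(0);

        // The listener acks the healthy records asynchronously, in arbitrary order.
        tracker.ackInOrder(3);
        tracker.ackInOrder(0);
        tracker.ackInOrder(4);
        tracker.ackInOrder(1);

        // The error handler deals with the poison pill, but its ack skips the bookkeeping.
        tracker.ackBypassingBookkeeping(2);
        System.out.println("after bypass: next=" + tracker.nextToCommit()
                + ", deferred=" + tracker.deferredCount());

        // Routing the same ack through the bookkeeping releases the waiting offsets.
        tracker.ackInOrder(2);
        System.out.println("after in-order ack: next=" + tracker.nextToCommit()
                + ", deferred=" + tracker.deferredCount());
    }
}
```

In this model the acks for offsets 3 and 4 stay deferred for as long as offset 2 is missing from the bookkeeping, which matches the hang described above; once the ack for offset 2 takes the in-order path, all pending offsets are released.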
Resolves spring-projects#2677
When an error handler handles an error (and `ackAfterHandle` is true),
the ack bypassed the out of order commit logic, causing the consumer
to be paused indefinitely, due to the missing ack.
**cherry-pick to 2.9.x**
Resolves #2677
When an error handler handles an error (and `ackAfterHandle` is true),
the ack bypassed the out of order commit logic, causing the consumer
to be paused indefinitely, due to the missing ack.
**cherry-pick to 2.9.x**
# Conflicts:
# spring-kafka/src/main/java/org/springframework/kafka/listener/KafkaMessageListenerContainer.java
garyrussell changed the title from "CommonErrorHandler for asyncAcks and AckMode.MANUAL" to "Fix CommonErrorHandler for asyncAcks and AckMode.MANUAL" on May 15, 2023
Discussed in #2668