Exception when trying to send a message to EventHub that was received as part of a batch #26213

Closed
3 tasks
avpines opened this issue Dec 30, 2021 · 4 comments · Fixed by #26291
Labels
azure-spring All azure-spring related issues Client This issue points to a problem in the data-plane of the library. customer-reported Issues that are reported by GitHub users external to the Azure organization. issue-addressed Workflow: The Azure SDK team believes it to be addressed and ready to close. question The issue doesn't require a change to the product in order to be resolved. Most issues start as that


avpines commented Dec 30, 2021

Describe the bug
I have a simple Spring Cloud Stream function that receives a batch and attempts to send it to the next broker.

  @Bean
  Function<List<SimpleEvent>, List<SimpleEvent>> transform() {
    return events -> events;
  }

When increasing the batch size, I'm receiving the following error:

2021-12-30 08:33:29.272 ERROR 52999 --- [ition-pump-21-2] c.a.m.eventhubs.PartitionPumpManager     : Error in event processing callback
2021-12-30 08:33:29.275 ERROR 52999 --- [ition-pump-21-2] reactor.core.scheduler.Schedulers        : Scheduler worker in group main failed with an uncaught exception

com.azure.messaging.eventhubs.implementation.PartitionProcessorException: Error in event processing callback
	at com.azure.messaging.eventhubs.PartitionPumpManager.processEvents(PartitionPumpManager.java:322) ~[azure-messaging-eventhubs-5.10.3.jar:5.10.3]
	at com.azure.messaging.eventhubs.PartitionPumpManager.lambda$startPartitionPump$2(PartitionPumpManager.java:235) ~[azure-messaging-eventhubs-5.10.3.jar:5.10.3]
	at reactor.core.publisher.LambdaSubscriber.onNext(LambdaSubscriber.java:160) ~[reactor-core-3.4.12.jar:3.4.12]
	at reactor.core.publisher.FluxPublishOn$PublishOnSubscriber.runAsync(FluxPublishOn.java:440) ~[reactor-core-3.4.12.jar:3.4.12]
	at reactor.core.publisher.FluxPublishOn$PublishOnSubscriber.run(FluxPublishOn.java:527) ~[reactor-core-3.4.12.jar:3.4.12]
	at reactor.core.scheduler.WorkerTask.call(WorkerTask.java:84) ~[reactor-core-3.4.12.jar:3.4.12]
	at reactor.core.scheduler.WorkerTask.call(WorkerTask.java:37) ~[reactor-core-3.4.12.jar:3.4.12]
	at java.base/java.util.concurrent.FutureTask.run(FutureTask.java:264) ~[na:na]
	at java.base/java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.run(ScheduledThreadPoolExecutor.java:304) ~[na:na]
	at java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1128) ~[na:na]
	at java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:628) ~[na:na]
	at java.base/java.lang.Thread.run(Thread.java:834) ~[na:na]
Caused by: org.springframework.messaging.MessageHandlingException: error occurred in message handler [com.azure.spring.integration.handler.DefaultMessageHandler@19c3e94b]; nested exception is java.lang.IllegalArgumentException: Partition key '[1141721982, 272811229, 1192168138, 188728110, 1267345383, 891811572, 1030040218, 1323489010, 1670508649, 200424939, 55583285, 416098991, 748384464, 1353334283, 1991638060, 906871757, 1482912043, 734267693, 1510402728, 1700376981, 662910500, 41762433, 2081852485, 2036969893, 1148467063, 1021186209, 76884744, 1872104433, 1058978944, 1622071530, 52936468, 1907283028, 1000737878, 1714011771, 1660979284, 245534648, 1951992137, 1634734677, 806778705, 1954106688, 1936642666, 1786826001, 1212384973, 442811150, 737031376, 605072605, 1204234139, 327781196, 1075207579, 340529247, 1655202220, 1374698916, 1605807322, 998488439, 1684423693, 1552583765, 683079331, 1236453630, 1430153106, 640778528, 1361990084, 85445280, 1906984615, 1455812745, 1881683799, 615496362, 1909534724, 890969115, 91453427, 1683504276, 1887971212, 288419651, 808164948, 871827378, 712063512, 516137872, 1935422505, 64197088, 24844386, 860516711, 1542540496, 236875908, 315489088, 1506085356, 1917549066, 2010901992, 1753125054, 449272744, 1448256307, 443027845, 1333932949, 1931279284, 1899982026, 572506175, 878535779, 1398514720, 2102685402, 1670179990, 2047312282, 973212085]' exceeds the maximum allowed length: '128'.
	at org.springframework.integration.support.utils.IntegrationUtils.wrapInHandlingExceptionIfNecessary(IntegrationUtils.java:191) ~[spring-integration-core-5.5.6.jar:5.5.6]
	at org.springframework.integration.handler.AbstractMessageHandler.handleMessage(AbstractMessageHandler.java:65) ~[spring-integration-core-5.5.6.jar:5.5.6]
	at org.springframework.cloud.stream.binder.AbstractMessageChannelBinder$SendingHandler.handleMessageInternal(AbstractMessageChannelBinder.java:1074) ~[spring-cloud-stream-3.2.1.jar:3.2.1]
	at org.springframework.integration.handler.AbstractMessageHandler.handleMessage(AbstractMessageHandler.java:56) ~[spring-integration-core-5.5.6.jar:5.5.6]
	at org.springframework.integration.dispatcher.AbstractDispatcher.tryOptimizedDispatch(AbstractDispatcher.java:115) ~[spring-integration-core-5.5.6.jar:5.5.6]
	at org.springframework.integration.dispatcher.UnicastingDispatcher.doDispatch(UnicastingDispatcher.java:133) ~[spring-integration-core-5.5.6.jar:5.5.6]
	at org.springframework.integration.dispatcher.UnicastingDispatcher.dispatch(UnicastingDispatcher.java:106) ~[spring-integration-core-5.5.6.jar:5.5.6]
	at org.springframework.integration.channel.AbstractSubscribableChannel.doSend(AbstractSubscribableChannel.java:72) ~[spring-integration-core-5.5.6.jar:5.5.6]
	at org.springframework.integration.channel.AbstractMessageChannel.send(AbstractMessageChannel.java:317) ~[spring-integration-core-5.5.6.jar:5.5.6]
	at org.springframework.integration.channel.AbstractMessageChannel.send(AbstractMessageChannel.java:272) ~[spring-integration-core-5.5.6.jar:5.5.6]
	at org.springframework.messaging.core.GenericMessagingTemplate.doSend(GenericMessagingTemplate.java:187) ~[spring-messaging-5.3.13.jar:5.3.13]
	at org.springframework.messaging.core.GenericMessagingTemplate.doSend(GenericMessagingTemplate.java:166) ~[spring-messaging-5.3.13.jar:5.3.13]
	at org.springframework.messaging.core.GenericMessagingTemplate.doSend(GenericMessagingTemplate.java:47) ~[spring-messaging-5.3.13.jar:5.3.13]
	at org.springframework.messaging.core.AbstractDestinationResolvingMessagingTemplate.send(AbstractDestinationResolvingMessagingTemplate.java:72) ~[spring-messaging-5.3.13.jar:5.3.13]
	at org.springframework.cloud.stream.function.FunctionConfiguration$FunctionToDestinationBinder$1.doSendMessage(FunctionConfiguration.java:614) ~[spring-cloud-stream-3.2.1.jar:3.2.1]
	at org.springframework.cloud.stream.function.FunctionConfiguration$FunctionToDestinationBinder$1.handleMessageInternal(FunctionConfiguration.java:597) ~[spring-cloud-stream-3.2.1.jar:3.2.1]
	at org.springframework.integration.handler.AbstractMessageHandler.handleMessage(AbstractMessageHandler.java:56) ~[spring-integration-core-5.5.6.jar:5.5.6]
	at org.springframework.integration.dispatcher.AbstractDispatcher.tryOptimizedDispatch(AbstractDispatcher.java:115) ~[spring-integration-core-5.5.6.jar:5.5.6]
	at org.springframework.integration.dispatcher.UnicastingDispatcher.doDispatch(UnicastingDispatcher.java:133) ~[spring-integration-core-5.5.6.jar:5.5.6]
	at org.springframework.integration.dispatcher.UnicastingDispatcher.dispatch(UnicastingDispatcher.java:106) ~[spring-integration-core-5.5.6.jar:5.5.6]
	at org.springframework.integration.channel.AbstractSubscribableChannel.doSend(AbstractSubscribableChannel.java:72) ~[spring-integration-core-5.5.6.jar:5.5.6]
	at org.springframework.integration.channel.AbstractMessageChannel.send(AbstractMessageChannel.java:317) ~[spring-integration-core-5.5.6.jar:5.5.6]
	at org.springframework.integration.channel.AbstractMessageChannel.send(AbstractMessageChannel.java:272) ~[spring-integration-core-5.5.6.jar:5.5.6]
	at org.springframework.messaging.core.GenericMessagingTemplate.doSend(GenericMessagingTemplate.java:187) ~[spring-messaging-5.3.13.jar:5.3.13]
	at org.springframework.messaging.core.GenericMessagingTemplate.doSend(GenericMessagingTemplate.java:166) ~[spring-messaging-5.3.13.jar:5.3.13]
	at org.springframework.messaging.core.GenericMessagingTemplate.doSend(GenericMessagingTemplate.java:47) ~[spring-messaging-5.3.13.jar:5.3.13]
	at org.springframework.messaging.core.AbstractMessageSendingTemplate.send(AbstractMessageSendingTemplate.java:109) ~[spring-messaging-5.3.13.jar:5.3.13]
	at org.springframework.integration.endpoint.MessageProducerSupport.sendMessage(MessageProducerSupport.java:208) ~[spring-integration-core-5.5.6.jar:5.5.6]
	at com.azure.spring.integration.eventhubs.inbound.EventHubsInboundChannelAdapter.access$600(EventHubsInboundChannelAdapter.java:45) ~[spring-integration-azure-eventhubs-4.0.0-beta.3.jar:4.0.0-beta.3]
	at com.azure.spring.integration.eventhubs.inbound.EventHubsInboundChannelAdapter$IntegrationBatchEventProcessingListener.onEventBatch(EventHubsInboundChannelAdapter.java:272) ~[spring-integration-azure-eventhubs-4.0.0-beta.3.jar:4.0.0-beta.3]
	at com.azure.messaging.eventhubs.EventProcessorClientBuilder$1.processEventBatch(EventProcessorClientBuilder.java:652) ~[azure-messaging-eventhubs-5.10.3.jar:5.10.3]
	at com.azure.messaging.eventhubs.PartitionPumpManager.processEvents(PartitionPumpManager.java:306) ~[azure-messaging-eventhubs-5.10.3.jar:5.10.3]
	... 11 common frames omitted
Caused by: java.lang.IllegalArgumentException: Partition key '[1141721982, 272811229, 1192168138, 188728110, 1267345383, 891811572, 1030040218, 1323489010, 1670508649, 200424939, 55583285, 416098991, 748384464, 1353334283, 1991638060, 906871757, 1482912043, 734267693, 1510402728, 1700376981, 662910500, 41762433, 2081852485, 2036969893, 1148467063, 1021186209, 76884744, 1872104433, 1058978944, 1622071530, 52936468, 1907283028, 1000737878, 1714011771, 1660979284, 245534648, 1951992137, 1634734677, 806778705, 1954106688, 1936642666, 1786826001, 1212384973, 442811150, 737031376, 605072605, 1204234139, 327781196, 1075207579, 340529247, 1655202220, 1374698916, 1605807322, 998488439, 1684423693, 1552583765, 683079331, 1236453630, 1430153106, 640778528, 1361990084, 85445280, 1906984615, 1455812745, 1881683799, 615496362, 1909534724, 890969115, 91453427, 1683504276, 1887971212, 288419651, 808164948, 871827378, 712063512, 516137872, 1935422505, 64197088, 24844386, 860516711, 1542540496, 236875908, 315489088, 1506085356, 1917549066, 2010901992, 1753125054, 449272744, 1448256307, 443027845, 1333932949, 1931279284, 1899982026, 572506175, 878535779, 1398514720, 2102685402, 1670179990, 2047312282, 973212085]' exceeds the maximum allowed length: '128'.
	at com.azure.messaging.eventhubs.EventHubProducerAsyncClient.createBatch(EventHubProducerAsyncClient.java:310) ~[azure-messaging-eventhubs-5.10.3.jar:5.10.3]
	at com.azure.spring.eventhubs.core.EventHubsTemplate.doSend(EventHubsTemplate.java:61) ~[spring-messaging-azure-eventhubs-4.0.0-beta.3.jar:4.0.0-beta.3]
	at com.azure.spring.eventhubs.core.EventHubsTemplate.sendAsync(EventHubsTemplate.java:48) ~[spring-messaging-azure-eventhubs-4.0.0-beta.3.jar:4.0.0-beta.3]
	at com.azure.spring.eventhubs.core.EventHubsTemplate.sendAsync(EventHubsTemplate.java:53) ~[spring-messaging-azure-eventhubs-4.0.0-beta.3.jar:4.0.0-beta.3]
	at com.azure.spring.integration.handler.DefaultMessageHandler.handleMessageInternal(DefaultMessageHandler.java:78) ~[spring-integration-azure-core-4.0.0-beta.3.jar:4.0.0-beta.3]
	at org.springframework.integration.handler.AbstractMessageHandler.handleMessage(AbstractMessageHandler.java:56) ~[spring-integration-core-5.5.6.jar:5.5.6]
	... 41 common frames omitted

My application.yaml configuration:

spring:
  application:
    name: eventhub-batch-transformer
  cloud:
    function:
      definition: transform
    stream:
      eventhubs:
        bindings:
          transform-in-0:
            consumer:
              track-last-enqueued-event-properties: true
              batch:
                max-size: 100
              checkpoint:
                mode: BATCH
      binders:
        my-default-binder:
          type: eventhubs
          default-candidate: true
          environment:
            spring:
              cloud:
                azure:
                  eventhubs:
                    connection-string: "***"
                    processor:
                      checkpoint-store:
                        client:
                          maximum-connection-pool-size: 500
                        create-container-if-not-exists: true
                        container-name: "***"
                        account-name: "***"
                        account-key: "***"
      bindings:
        transform-in-0:
          destination: af-in-32
          group: $Default
          consumer:
            batch-mode: true
        transform-out-0:
          destination: af-in

Setup (please complete the following information):

  • Library/Libraries: 4.0.0-beta
  • Java version: 11

If you suspect a dependency version mismatch (e.g. you see NoClassDefFoundError, NoSuchMethodError or similar), please check out the Troubleshoot dependency version conflict article first. If it doesn't provide a solution for the problem, please provide:

  • verbose dependency tree (mvn dependency:tree -Dverbose)
  • exception message, full stack trace, and any available logs

Information Checklist
Kindly make sure that you have added all of the following information above and checked off the required fields; otherwise we will treat the issue as an incomplete report.

  • Bug Description Added
  • Repro Steps Added
  • Setup information Added
@ghost ghost added needs-triage Workflow: This is a new issue that needs to be triaged to the appropriate team. customer-reported Issues that are reported by GitHub users external to the Azure organization. question The issue doesn't require a change to the product in order to be resolved. Most issues start as that labels Dec 30, 2021
@saragluna saragluna added azure-spring All azure-spring related issues Client This issue points to a problem in the data-plane of the library. labels Dec 30, 2021
@ghost ghost removed the needs-triage Workflow: This is a new issue that needs to be triaged to the appropriate team. label Dec 30, 2021
@saragluna saragluna added this to the Spring Cloud Azure 4.0 GA milestone Dec 30, 2021
yiliuTo (Member) commented Jan 5, 2022

Hi @avpines, thanks for reporting this. While transforming messages, Spring Cloud Stream preserves the original message headers, which causes this issue. We therefore recommend using a ChannelInterceptor to remove the partition key header before sending messages.

For example, based on your configuration, you could add a ChannelInterceptor for the channel transform-out-0 like below:

@Service
@GlobalChannelInterceptor(patterns={"transform-out-0"})
public class OutputChannelInterceptor implements ChannelInterceptor {
    @Override
    public Message<?> preSend(Message<?> message, MessageChannel channel) {
        MessageHeaderAccessor mutableAccessor = MessageHeaderAccessor.getMutableAccessor(message);
        mutableAccessor.removeHeader(EventHubsHeaders.PARTITION_KEY);
        return new GenericMessage<>(message.getPayload(), mutableAccessor.getMessageHeaders());
    }
}
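
The effect of the interceptor above can be illustrated without Spring. The following is only a minimal sketch using a plain map in place of message headers; the class name and the header name `partition_key_header` are hypothetical placeholders and are not the actual value of EventHubsHeaders.PARTITION_KEY. It shows the same idea: copy the headers, drop the partition key entry, and pass on the copy.

```java
import java.util.Collections;
import java.util.HashMap;
import java.util.Map;

final class HeaderStripSketch {
    // Hypothetical header name; stands in for EventHubsHeaders.PARTITION_KEY.
    static final String PARTITION_KEY_HEADER = "partition_key_header";

    // Returns a read-only copy of the headers without the partition key entry.
    // The input map is left untouched, mirroring how the interceptor builds
    // a new message instead of mutating the incoming one.
    static Map<String, Object> withoutPartitionKey(Map<String, Object> headers) {
        Map<String, Object> copy = new HashMap<>(headers);
        copy.remove(PARTITION_KEY_HEADER);
        return Collections.unmodifiableMap(copy);
    }

    public static void main(String[] args) {
        Map<String, Object> headers = new HashMap<>();
        headers.put(PARTITION_KEY_HEADER, "[1141721982, 272811229]");
        headers.put("contentType", "application/json");
        // Only the non-partition-key headers remain in the printed key set.
        System.out.println(withoutPartitionKey(headers).keySet());
    }
}
```

All other headers are kept as they are, so only the partition key entry is withheld from the outbound message.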

yiliuTo (Member) commented Jan 11, 2022

Hi @avpines, we fixed this issue in #26291 by excluding some unnecessary message headers when sending events to Event Hubs, including headers converted from batch messages. The fix will be included in our next release.
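
To illustrate why a header converted from a batch message can trip the 128-character limit quoted in the exception, here is a rough sketch. It assumes, as the error message suggests but the thread does not state outright, that the batch header holds one partition key per event and that the whole list is rendered as a single string before validation. The class and method names are hypothetical and not part of the library.

```java
import java.util.Collections;
import java.util.List;

final class PartitionKeyLengthSketch {
    // Limit quoted in the exception message in this issue.
    static final int MAX_PARTITION_KEY_LENGTH = 128;

    // Assumption: the list of per-event keys is turned into one string,
    // e.g. "[1141721982, 272811229]", and used as the partition key.
    static String asSingleKey(List<Integer> perEventKeys) {
        return String.valueOf(perEventKeys);
    }

    // True when a batch of the given size yields a key longer than the limit.
    static boolean exceedsLimit(int batchSize) {
        List<Integer> keys = Collections.nCopies(batchSize, 1141721982);
        return asSingleKey(keys).length() > MAX_PARTITION_KEY_LENGTH;
    }

    public static void main(String[] args) {
        for (int size : new int[] {1, 10, 11, 100}) {
            System.out.println(size + " events -> exceeds limit: " + exceedsLimit(size));
        }
    }
}
```

Under these assumptions each 10-digit key contributes 12 characters including separators, so a batch of 10 events stays within the limit while 11 or more exceeds it. That would be consistent with the error appearing only after the batch size was increased.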

@yiliuTo yiliuTo added the issue-addressed Workflow: The Azure SDK team believes it to be addressed and ready to close. label Jan 11, 2022

ghost commented Jan 11, 2022

Hi @avpines. Thank you for opening this issue and giving us the opportunity to assist. We believe that this has been addressed. If you feel that further discussion is needed, please add a comment with the text “/unresolve” to remove the “issue-addressed” label and continue the conversation.


ghost commented Jan 28, 2022

Hi @avpines, since you haven’t asked that we “/unresolve” the issue, we’ll close this out. If you believe further discussion is needed, please add a comment “/unresolve” to reopen the issue.

@ghost ghost closed this as completed Jan 28, 2022
@saragluna saragluna moved this to Todo in Spring Cloud Azure Apr 6, 2022
@chenrujun chenrujun moved this from Todo to Done in Spring Cloud Azure Apr 6, 2022
@github-actions github-actions bot locked and limited conversation to collaborators Apr 11, 2023