Spring Integration - Can not store SQS message in Redis #3625

Closed
DPetrunov opened this issue Sep 7, 2021 · 3 comments
Labels
status: invalid Not reproducable or not relevant to the current state of the project

Comments

@DPetrunov

DPetrunov commented Sep 7, 2021

In what version(s) of Spring Integration are you seeing this issue?

springBootVersion=2.5.3
springCloudVersion=2020.0.3
springCloudAWSVersion=2.2.6.RELEASE
springSessionCoreVersion=2.5.1
springIntegrationVersion=5.2.2

Describe the bug
I'm not sure if this is a bug for the Spring Cloud team or the Spring Integration team but here goes...

I'm reading off of an SQS queue and as soon as I get a message I send it to a Spring Integration Channel which is backed by a Redis Channel Message Store. Ever since upgrading to Spring Integration 5.2.2, I have been getting the error shown below.

It seems like the problem is the Message Header serializer.

This is the config I've tried:

    @Bean("messageTemplate")
    public RedisTemplate<String, Message<?>> redisMessageTemplate() {

        final RedisTemplate<String, Message<?>> template = new RedisTemplate<>();

        template.setKeySerializer(new StringRedisSerializer());
        template.setHashKeySerializer(new StringRedisSerializer());
        template.setConnectionFactory(jedisConnectionFactory());

        ObjectMapper mapper = JacksonJsonUtils.messagingAwareMapper();
        mapper.disable(SerializationFeature.FAIL_ON_EMPTY_BEANS);

        GenericJackson2JsonRedisSerializer serializer = new GenericJackson2JsonRedisSerializer(mapper);

        template.setValueSerializer(serializer);

        return template;
    }

which only seems to affect the value serializer, but not the message header serializer. Is there a way to disable the FAIL_ON_EMPTY_BEANS property in the MessageHeadersJacksonSerializer's object mapper?

2021-09-07 13:42:57.562 ERROR 6 --- [nerContainer-64] o.s.c.a.m.listener.QueueMessageHandler   : An exception occurred while invoking the handler method
org.springframework.messaging.MessageDeliveryException: failed to send Message to channel 'orderCreatedChannel'; nested exception is java.lang.IllegalArgumentException: If relying on the default RedisSerializer (JdkSerializationRedisSerializer) the Object must be Serializable. Either make it Serializable or provide your own implementation of RedisSerializer via 'setValueSerializer(..)'
	at org.springframework.integration.support.utils.IntegrationUtils.wrapInDeliveryExceptionIfNecessary(IntegrationUtils.java:167)
	at org.springframework.integration.channel.AbstractMessageChannel.send(AbstractMessageChannel.java:339)
	at org.springframework.integration.channel.AbstractMessageChannel.send(AbstractMessageChannel.java:272)
	at org.springframework.messaging.core.GenericMessagingTemplate.doSend(GenericMessagingTemplate.java:187)
	at org.springframework.messaging.core.GenericMessagingTemplate.doSend(GenericMessagingTemplate.java:166)
	at org.springframework.messaging.core.GenericMessagingTemplate.doSend(GenericMessagingTemplate.java:47)
	at org.springframework.messaging.core.AbstractMessageSendingTemplate.send(AbstractMessageSendingTemplate.java:109)
	at org.springframework.integration.router.AbstractMessageRouter.doSend(AbstractMessageRouter.java:213)
	at org.springframework.integration.router.AbstractMessageRouter.handleMessageInternal(AbstractMessageRouter.java:195)
	at org.springframework.integration.handler.AbstractMessageHandler.handleMessage(AbstractMessageHandler.java:56)
	at com.urbanise.asyncapi.integration.handlers.plaza.PlazaMessageHandler.handleMessageInternal(PlazaMessageHandler.java:22)
	at org.springframework.integration.handler.AbstractMessageHandler.handleMessage(AbstractMessageHandler.java:56)
	at org.springframework.integration.dispatcher.AbstractDispatcher.tryOptimizedDispatch(AbstractDispatcher.java:115)
	at org.springframework.integration.dispatcher.UnicastingDispatcher.doDispatch(UnicastingDispatcher.java:133)
	at org.springframework.integration.dispatcher.UnicastingDispatcher.dispatch(UnicastingDispatcher.java:106)
	at org.springframework.integration.channel.AbstractSubscribableChannel.doSend(AbstractSubscribableChannel.java:72)
	at org.springframework.integration.channel.AbstractMessageChannel.send(AbstractMessageChannel.java:317)
	at org.springframework.integration.channel.AbstractMessageChannel.send(AbstractMessageChannel.java:272)
	at org.springframework.messaging.core.GenericMessagingTemplate.doSend(GenericMessagingTemplate.java:187)
	at org.springframework.messaging.core.GenericMessagingTemplate.doSend(GenericMessagingTemplate.java:166)
	at org.springframework.messaging.core.GenericMessagingTemplate.doSend(GenericMessagingTemplate.java:47)
	at org.springframework.messaging.core.AbstractMessageSendingTemplate.send(AbstractMessageSendingTemplate.java:109)
	at org.springframework.integration.handler.AbstractMessageProducingHandler.sendOutput(AbstractMessageProducingHandler.java:454)
	at org.springframework.integration.handler.AbstractMessageProducingHandler.doProduceOutput(AbstractMessageProducingHandler.java:324)
	at org.springframework.integration.handler.AbstractMessageProducingHandler.produceOutput(AbstractMessageProducingHandler.java:267)
	at org.springframework.integration.handler.AbstractMessageProducingHandler.sendOutputs(AbstractMessageProducingHandler.java:231)
	at org.springframework.integration.handler.AbstractReplyProducingMessageHandler.handleMessageInternal(AbstractReplyProducingMessageHandler.java:142)
	at org.springframework.integration.handler.AbstractMessageHandler.handleMessage(AbstractMessageHandler.java:56)
	at org.springframework.integration.dispatcher.AbstractDispatcher.tryOptimizedDispatch(AbstractDispatcher.java:115)
	at org.springframework.integration.dispatcher.UnicastingDispatcher.doDispatch(UnicastingDispatcher.java:133)
	at org.springframework.integration.dispatcher.UnicastingDispatcher.dispatch(UnicastingDispatcher.java:106)
	at org.springframework.integration.channel.AbstractSubscribableChannel.doSend(AbstractSubscribableChannel.java:72)
	at org.springframework.integration.channel.AbstractMessageChannel.send(AbstractMessageChannel.java:317)
	at org.springframework.integration.channel.AbstractMessageChannel.send(AbstractMessageChannel.java:272)
	at com.urbanise.asyncapi.sqs.PlazaInboundSQSListener.listenInbound(PlazaInboundSQSListener.java:46)
	at jdk.internal.reflect.GeneratedMethodAccessor561.invoke(Unknown Source)
	at java.base/jdk.internal.reflect.DelegatingMethodAccessorImpl.invoke(Unknown Source)
	at java.base/java.lang.reflect.Method.invoke(Unknown Source)
	at org.springframework.messaging.handler.invocation.InvocableHandlerMethod.doInvoke(InvocableHandlerMethod.java:171)
	at org.springframework.messaging.handler.invocation.InvocableHandlerMethod.invoke(InvocableHandlerMethod.java:120)
	at org.springframework.messaging.handler.invocation.AbstractMethodMessageHandler.handleMatch(AbstractMethodMessageHandler.java:565)
	at org.springframework.messaging.handler.invocation.AbstractMethodMessageHandler.handleMessageInternal(AbstractMethodMessageHandler.java:520)
	at org.springframework.messaging.handler.invocation.AbstractMethodMessageHandler.handleMessage(AbstractMethodMessageHandler.java:454)
	at org.springframework.cloud.aws.messaging.listener.SimpleMessageListenerContainer.executeMessage(SimpleMessageListenerContainer.java:228)
	at org.springframework.cloud.aws.messaging.listener.SimpleMessageListenerContainer$MessageExecutor.run(SimpleMessageListenerContainer.java:418)
	at org.springframework.cloud.aws.messaging.listener.SimpleMessageListenerContainer$SignalExecutingRunnable.run(SimpleMessageListenerContainer.java:310)
	at java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(Unknown Source)
	at java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(Unknown Source)
	at java.base/java.lang.Thread.run(Unknown Source)
Caused by: java.lang.IllegalArgumentException: If relying on the default RedisSerializer (JdkSerializationRedisSerializer) the Object must be Serializable. Either make it Serializable or provide your own implementation of RedisSerializer via 'setValueSerializer(..)'
	at org.springframework.integration.redis.store.RedisMessageStore.rethrowAsIllegalArgumentException(RedisMessageStore.java:188)
	at org.springframework.integration.redis.store.RedisMessageStore.doStoreIfAbsent(RedisMessageStore.java:128)
	at org.springframework.integration.store.AbstractKeyValueMessageStore.doAddMessage(AbstractKeyValueMessageStore.java:146)
	at org.springframework.integration.store.AbstractKeyValueMessageStore.addMessagesToGroup(AbstractKeyValueMessageStore.java:214)
	at org.springframework.integration.store.AbstractMessageGroupStore.addMessageToGroup(AbstractMessageGroupStore.java:189)
	at org.springframework.integration.store.MessageGroupQueue.doOffer(MessageGroupQueue.java:371)
	at org.springframework.integration.store.MessageGroupQueue.put(MessageGroupQueue.java:308)
	at org.springframework.integration.store.MessageGroupQueue.put(MessageGroupQueue.java:51)
	at org.springframework.integration.channel.QueueChannel.doSend(QueueChannel.java:120)
	at org.springframework.integration.channel.AbstractMessageChannel.send(AbstractMessageChannel.java:317)
	... 47 common frames omitted
Caused by: org.springframework.data.redis.serializer.SerializationException: Could not write JSON: No serializer found for class org.springframework.cloud.aws.messaging.listener.QueueMessageVisibility and no properties discovered to create BeanSerializer (to avoid exception, disable SerializationFeature.FAIL_ON_EMPTY_BEANS) (through reference chain: org.springframework.integration.store.MessageHolder["message"]->org.springframework.messaging.support.GenericMessage["headers"]->java.util.HashMap["Visibility"]); nested exception is com.fasterxml.jackson.databind.exc.InvalidDefinitionException: No serializer found for class org.springframework.cloud.aws.messaging.listener.QueueMessageVisibility and no properties discovered to create BeanSerializer (to avoid exception, disable SerializationFeature.FAIL_ON_EMPTY_BEANS) (through reference chain: org.springframework.integration.store.MessageHolder["message"]->org.springframework.messaging.support.GenericMessage["headers"]->java.util.HashMap["Visibility"])
	at org.springframework.data.redis.serializer.GenericJackson2JsonRedisSerializer.serialize(GenericJackson2JsonRedisSerializer.java:120)
	at org.springframework.data.redis.core.AbstractOperations.rawValue(AbstractOperations.java:127)
	at org.springframework.data.redis.core.DefaultValueOperations.setIfAbsent(DefaultValueOperations.java:295)
	at org.springframework.data.redis.core.DefaultBoundValueOperations.setIfAbsent(DefaultBoundValueOperations.java:149)
	at org.springframework.integration.redis.store.RedisMessageStore.doStoreIfAbsent(RedisMessageStore.java:121)
	... 55 common frames omitted
Caused by: com.fasterxml.jackson.databind.exc.InvalidDefinitionException: No serializer found for class org.springframework.cloud.aws.messaging.listener.QueueMessageVisibility and no properties discovered to create BeanSerializer (to avoid exception, disable SerializationFeature.FAIL_ON_EMPTY_BEANS) (through reference chain: org.springframework.integration.store.MessageHolder["message"]->org.springframework.messaging.support.GenericMessage["headers"]->java.util.HashMap["Visibility"])
	at com.fasterxml.jackson.databind.exc.InvalidDefinitionException.from(InvalidDefinitionException.java:77)
	at com.fasterxml.jackson.databind.SerializerProvider.reportBadDefinition(SerializerProvider.java:1276)
	at com.fasterxml.jackson.databind.DatabindContext.reportBadDefinition(DatabindContext.java:400)
	at com.fasterxml.jackson.databind.ser.impl.UnknownSerializer.failForEmpty(UnknownSerializer.java:71)
	at com.fasterxml.jackson.databind.ser.impl.UnknownSerializer.serializeWithType(UnknownSerializer.java:45)
	at com.fasterxml.jackson.databind.ser.std.MapSerializer.serializeTypedFields(MapSerializer.java:1027)
	at com.fasterxml.jackson.databind.ser.std.MapSerializer.serializeFields(MapSerializer.java:779)
	at com.fasterxml.jackson.databind.ser.std.MapSerializer.serializeWithoutTypeInfo(MapSerializer.java:764)
	at com.fasterxml.jackson.databind.ser.std.MapSerializer.serializeWithType(MapSerializer.java:733)
	at com.fasterxml.jackson.databind.ser.std.MapSerializer.serializeWithType(MapSerializer.java:35)
	at com.fasterxml.jackson.databind.ser.impl.TypeWrappedSerializer.serialize(TypeWrappedSerializer.java:32)
	at com.fasterxml.jackson.databind.ser.DefaultSerializerProvider._serialize(DefaultSerializerProvider.java:480)
	at com.fasterxml.jackson.databind.ser.DefaultSerializerProvider.serializeValue(DefaultSerializerProvider.java:319)
	at com.fasterxml.jackson.databind.ObjectMapper.writeValue(ObjectMapper.java:3126)
	at com.fasterxml.jackson.core.base.GeneratorBase.writeObject(GeneratorBase.java:388)
	at org.springframework.integration.support.json.MessageHeadersJacksonSerializer.serialize(MessageHeadersJacksonSerializer.java:56)
	at org.springframework.integration.support.json.MessageHeadersJacksonSerializer.serializeWithType(MessageHeadersJacksonSerializer.java:51)
	at org.springframework.integration.support.json.MessageHeadersJacksonSerializer.serializeWithType(MessageHeadersJacksonSerializer.java:40)
	at com.fasterxml.jackson.databind.ser.BeanPropertyWriter.serializeAsField(BeanPropertyWriter.java:730)
	at com.fasterxml.jackson.databind.ser.std.BeanSerializerBase.serializeFields(BeanSerializerBase.java:770)
	at com.fasterxml.jackson.databind.ser.std.BeanSerializerBase.serializeWithType(BeanSerializerBase.java:655)
	at com.fasterxml.jackson.databind.ser.BeanPropertyWriter.serializeAsField(BeanPropertyWriter.java:730)
	at com.fasterxml.jackson.databind.ser.std.BeanSerializerBase.serializeFields(BeanSerializerBase.java:770)
	at com.fasterxml.jackson.databind.ser.std.BeanSerializerBase.serializeWithType(BeanSerializerBase.java:655)
	at com.fasterxml.jackson.databind.ser.impl.TypeWrappedSerializer.serialize(TypeWrappedSerializer.java:32)
	at com.fasterxml.jackson.databind.ser.DefaultSerializerProvider._serialize(DefaultSerializerProvider.java:480)
	at com.fasterxml.jackson.databind.ser.DefaultSerializerProvider.serializeValue(DefaultSerializerProvider.java:319)
	at com.fasterxml.jackson.databind.ObjectMapper._writeValueAndClose(ObjectMapper.java:4487)
	at com.fasterxml.jackson.databind.ObjectMapper.writeValueAsBytes(ObjectMapper.java:3765)
	at org.springframework.data.redis.serializer.GenericJackson2JsonRedisSerializer.serialize(GenericJackson2JsonRedisSerializer.java:118)
	... 59 common frames omitted
@DPetrunov DPetrunov added status: waiting-for-triage The issue need to be evaluated and its future decided type: bug labels Sep 7, 2021
@artembilan
Member

I'm reading off of an SQS queue and as soon as I get a message I send it to a Spring Integration Channel which is backed by a Redis Channel Message Store.

That's strange: what is the point of reading a message from SQS if you are going to store it in Redis?
You might revise your logic toward something more robust, without Redis in between.

Anyway, it looks like QueueMessageVisibility is not intended for serialization at all.
You may consider excluding such a header before storing the message in Redis.
See the Message Filter endpoint for that purpose: https://docs.spring.io/spring-integration/docs/current/reference/html/message-routing.html#filter.
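
One way to picture the header exclusion suggested above is a small helper that copies the headers and leaves out the offending entry. The sketch below uses only the JDK and a plain `Map`; `HeaderSanitizer` and `without` are hypothetical names, not Spring Integration API, and the header name `Visibility` is taken from the reference chain in the stack trace. In a real flow the same effect would more likely come from rebuilding the message without that header (for instance with `MessageBuilder.fromMessage(...).removeHeader(...)`) before it reaches the Redis-backed channel.

```java
import java.util.Arrays;
import java.util.HashSet;
import java.util.LinkedHashMap;
import java.util.Map;
import java.util.Set;

// Hypothetical helper for illustration only; not part of Spring Integration.
final class HeaderSanitizer {

    private HeaderSanitizer() {
    }

    // Returns a new map holding every header except the named ones.
    // The input map is not modified and insertion order is preserved.
    static Map<String, Object> without(Map<String, ?> headers, String... namesToDrop) {
        Set<String> drop = new HashSet<>(Arrays.asList(namesToDrop));
        Map<String, Object> copy = new LinkedHashMap<>();
        for (Map.Entry<String, ?> entry : headers.entrySet()) {
            if (!drop.contains(entry.getKey())) {
                copy.put(entry.getKey(), entry.getValue());
            }
        }
        return copy;
    }
}
```

Calling `HeaderSanitizer.without(headers, "Visibility")` yields a copy whose remaining entries can be handed to the JSON serializer, while the original header map stays untouched.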

I don't treat this as a bug, since it is not the responsibility of RedisMessageStore (or even its serializer) to deal with non-serializable objects.

The MessageHeadersJacksonSerializer is a Jackson StdSerializer; it has nothing to do with a top-level RedisSerializer.
I have no idea why your mapper.disable(SerializationFeature.FAIL_ON_EMPTY_BEANS); is not propagated down to Jackson's internals...

at com.fasterxml.jackson.databind.ObjectMapper.writeValue(ObjectMapper.java:3126)

That frame does tell us that the provided serializer config is in use. Therefore it is beyond my knowledge why we still end up at com.fasterxml.jackson.databind.ser.impl.UnknownSerializer.failForEmpty(UnknownSerializer.java:71).

@artembilan artembilan added status: waiting-for-reporter Needs a feedback from the reporter and removed status: waiting-for-triage The issue need to be evaluated and its future decided labels Sep 7, 2021
@artembilan
Member

I am probably missing something from your explanation.
Here is a simple unit test to play with:

	@Test
	public void testMessagingAwareMapperForSerializationFeature() throws JsonProcessingException {
		ObjectMapper mapper = JacksonJsonUtils.messagingAwareMapper();
//		mapper.disable(SerializationFeature.FAIL_ON_EMPTY_BEANS);

		String result =
				mapper.writeValueAsString(
						MessageBuilder.withPayload("test")
								.setHeader("nonSerializableHeader",
										new QueueMessageVisibility(mock(AmazonSQSAsync.class), "test", "test"))
								.build());

		System.out.println(result);
	}

With that line commented out, I get this error, which looks exactly like what you show:

com.fasterxml.jackson.databind.exc.InvalidDefinitionException: No serializer found for class io.awspring.cloud.messaging.listener.QueueMessageVisibility and no properties discovered to create BeanSerializer (to avoid exception, disable SerializationFeature.FAIL_ON_EMPTY_BEANS) (through reference chain: org.springframework.messaging.support.GenericMessage["headers"]->java.util.HashMap["nonSerializableHeader"])

	at com.fasterxml.jackson.databind.exc.InvalidDefinitionException.from(InvalidDefinitionException.java:77)
	at com.fasterxml.jackson.databind.SerializerProvider.reportBadDefinition(SerializerProvider.java:1276)
	at com.fasterxml.jackson.databind.DatabindContext.reportBadDefinition(DatabindContext.java:400)
	at com.fasterxml.jackson.databind.ser.impl.UnknownSerializer.failForEmpty(UnknownSerializer.java:71)
	at com.fasterxml.jackson.databind.ser.impl.UnknownSerializer.serializeWithType(UnknownSerializer.java:45)
	at com.fasterxml.jackson.databind.ser.std.MapSerializer.serializeTypedFields(MapSerializer.java:1027)
	at com.fasterxml.jackson.databind.ser.std.MapSerializer.serializeFields(MapSerializer.java:779)
	at com.fasterxml.jackson.databind.ser.std.MapSerializer.serializeWithoutTypeInfo(MapSerializer.java:764)
	at com.fasterxml.jackson.databind.ser.std.MapSerializer.serializeWithType(MapSerializer.java:733)
	at com.fasterxml.jackson.databind.ser.std.MapSerializer.serializeWithType(MapSerializer.java:35)

If I uncomment that line, the result is completely different and the test passes:

{"@class":"org.springframework.messaging.support.GenericMessage","payload":"test","headers":{"@class":"java.util.HashMap","id":["java.util.UUID","dcdfb210-3f64-3488-9baa-f440dd3c0fad"],"nonSerializableHeader":{"@class":"io.awspring.cloud.messaging.listener.QueueMessageVisibility"},"timestamp":["java.lang.Long",1631136265324]}}

So, it does work as expected, although we see that QueueMessageVisibility is serialized to only its class name, nothing more.
However, this still tells me that your error is somehow not related to the config you showed before.

Would you mind revising your solution so we can confirm that we are really on the same page?

@artembilan
Member

Closed as Invalid without reporter response.

@artembilan artembilan closed this as not planned Won't fix, can't repro, duplicate, stale Sep 8, 2022
@artembilan artembilan added status: invalid Not reproducable or not relevant to the current state of the project and removed status: waiting-for-reporter Needs a feedback from the reporter labels Sep 8, 2022