Unable to repoll failed message from JDBC when aggregate is used in the integration flow - Stack Overflow

My current workflow has 2 integration flows-First Integration Flow- Polls files from SFTP -> Message

My current workflow has 2 integration flows-

  1. First Integration Flow- Polls files from SFTP -> Message Handler -> sends messages to JdbcChannelMessageStore and saves those messages in Jdbc under Group1.

  2. Second Integration Flow - polls messages from Group1 and passes the messages to aggregate. The aggregate uses TimeoutCountSequenceSizeReleaseStrategy to release messages as a group based on threshold/timeout provided to release strategy. The aggregate then has output processor that processes the aggregated messages(Group of messages).

If there are 2 messages that are in JDBC under group1 and when second integration flow polls these 2 messages, and if some exception occurs in output processor of aggregate, the repoll happens but only second message is being repolled but not the first message and first message is being removed from the jdbc.

First Integration Flow-

.handle("returns messages as File Info Objects")
.channel(c -> c.queue(jdbcChannelMessageStore, "Group1"))

Second Integration flow-

return IntegrationFlow.from(() -> (Message) jdbcChannelMessageStore.pollMessageFromGroup("Group1"), e -> e.poller(Pollers.fixedDelay(180000).transactional()))
.aggregate(a -> a.correlationStrategy(m -> 1).releaseStrategy(new TimeoutCountSequenceSizeReleaseStrategy(10, 60000)).expireGroupsUponCompletion(true).outputprocessor("Processes the aggregated messages")

I want to make sure that if something fails in my second integration flow, the poller should be able to repoll all the messages that have failed. Lets say, the poller polled 10 messages from the jdbc under group1 and if something fails in aggregate/outputprocessor then all those 10 messages should be repolled from the jdbc. If the second integration flow successfully processess these 10 messages then they can be removed from Jdbc.

My current workflow has 2 integration flows-

  1. First Integration Flow- Polls files from SFTP -> Message Handler -> sends messages to JdbcChannelMessageStore and saves those messages in Jdbc under Group1.

  2. Second Integration Flow - polls messages from Group1 and passes the messages to aggregate. The aggregate uses TimeoutCountSequenceSizeReleaseStrategy to release messages as a group based on threshold/timeout provided to release strategy. The aggregate then has output processor that processes the aggregated messages(Group of messages).

If there are 2 messages that are in JDBC under group1 and when second integration flow polls these 2 messages, and if some exception occurs in output processor of aggregate, the repoll happens but only second message is being repolled but not the first message and first message is being removed from the jdbc.

First Integration Flow-

.handle("returns messages as File Info Objects")
.channel(c -> c.queue(jdbcChannelMessageStore, "Group1"))

Second Integration flow-

return IntegrationFlow.from(() -> (Message) jdbcChannelMessageStore.pollMessageFromGroup("Group1"), e -> e.poller(Pollers.fixedDelay(180000).transactional()))
.aggregate(a -> a.correlationStrategy(m -> 1).releaseStrategy(new TimeoutCountSequenceSizeReleaseStrategy(10, 60000)).expireGroupsUponCompletion(true).outputprocessor("Processes the aggregated messages")

I want to make sure that if something fails in my second integration flow, the poller should be able to repoll all the messages that have failed. Lets say, the poller polled 10 messages from the jdbc under group1 and if something fails in aggregate/outputprocessor then all those 10 messages should be repolled from the jdbc. If the second integration flow successfully processess these 10 messages then they can be removed from Jdbc.

Share Improve this question asked Nov 20, 2024 at 6:09 MuskMusk 11 bronze badge
Add a comment  | 

1 Answer 1

Reset to default 0

That's not how messaging works. The unit of work in messaging is exactly a message. So, when you poll for one message, it must not effect the rest of messages. And the first message has been processed already: pulled from the queue and placed into an aggregator. So, we really have done already with that messages.

It sounds more like you would like to have a unit of work as a batch of messages. That's exactly what the aggregator is there for. So, you might need to think to place a persistent queue after an aggregator. This way you still would deal with one message as a unit of work, in messaging requirements, but the payload of that message would your whole batch.

There is no reason to do jdbcChannelMessageStore.pollMessageFromGroup() manually. That is exactly what is that QueueChannel there for. So, you need to extract your c.queue(jdbcChannelMessageStore, "Group1") into a bean and use that bean from the second flow. However you might rethink your solution where you would place messages from the first flow directly to the aggregator. That's exactly where grouping/batching happens.

发布者:admin,转转请注明出处:http://www.yc00.com/questions/1742378270a4432626.html

相关推荐

发表回复

评论列表(0条)

  • 暂无评论

联系我们

400-800-8888

在线咨询: QQ交谈

邮件:admin@example.com

工作时间:周一至周五,9:30-18:30,节假日休息

关注微信