Transactional support in PostgresSubscribableChannel #8582
Comments
Right. That was my big concern if we have to keep an async hand-off for target subscribers. I don't think that transaction has to be mandatory: like the same poller against There is also a flaw in your Am I missing anything?
The reason for a separate ExecutorService is to avoid starving the thread that receives notifications. If you think that is not a problem, a single ExecutorService is simpler. The `@Scheduled` method was only for the POC; I was thinking of implementing the retries with a ThreadPoolTaskScheduler and a periodic task (the trigger can be customizable). The advantage over the RetryTemplate is that this also addresses the problem of missed notifications from PostgreSQL. On a channel that does not have a lot of activity, a missed notify results in a message sitting in the store until the next notification event. A process that runs periodically puts an upper limit on that latency. You're right about the transaction boundaries: they should be per message and not per loop.
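The periodic sweep with per-message boundaries described above could look roughly like the following. This is a minimal plain-JDK sketch, not Spring Integration code: `PeriodicSweepSketch`, its in-memory queue, and the `sweep`/`start`/`stop` methods are hypothetical stand-ins for the channel, the JDBC message store, and a `ThreadPoolTaskScheduler` task, and "rollback" is emulated by putting the message back.

```java
import java.util.Queue;
import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import java.util.function.Consumer;

// Hypothetical model of a channel that sweeps its store on a fixed period.
class PeriodicSweepSketch {

    // Stand-in for the message store (assumption: a real store would be a database table).
    private final Queue<String> store = new ConcurrentLinkedQueue<>();
    private final Consumer<String> subscriber;
    private final ScheduledExecutorService scheduler =
            Executors.newSingleThreadScheduledExecutor();

    PeriodicSweepSketch(Consumer<String> subscriber) {
        this.subscriber = subscriber;
    }

    void send(String message) {
        store.add(message);
    }

    int pending() {
        return store.size();
    }

    // One pass over the messages present when the sweep starts.
    // Each message is its own unit of work: a failure puts only that
    // message back and does not affect the others in the same pass.
    int sweep() {
        int delivered = 0;
        int toVisit = store.size();
        for (int i = 0; i < toVisit; i++) {
            String message = store.poll();
            if (message == null) {
                break;
            }
            try {
                subscriber.accept(message);
                delivered++;
            } catch (RuntimeException ex) {
                store.add(message); // emulated rollback; retried on the next sweep
            }
        }
        return delivered;
    }

    // The period is the upper bound on how long a missed notification can delay a message.
    void start(long periodMillis) {
        scheduler.scheduleWithFixedDelay(this::sweep, periodMillis, periodMillis,
                TimeUnit.MILLISECONDS);
    }

    void stop() {
        scheduler.shutdownNow();
    }
}
```

Note that this sketch re-queues a failed message at the tail, so ordering is not preserved across failures; a real transactional rollback would leave the row where it was.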
If we add a scheduled task to this channel implementation, then it is not going to be much different from a
@artembilan could you take a look at the PR, please? I have added another (local) latch in the start method, as I was running into a race condition in tests between the subscriber being ready to receive notifications and notifications being emitted.
Fixes #8582

* Introduce a `PostgresSubscribableChannel.setTransactionManager()` to wrap a message polling and dispatching operation into a transaction
* In addition, add `RetryTemplate` support around transaction attempts

**Cherry-pick to `6.0.x`**
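To illustrate what "retry around transaction attempts" means in principle, here is a small plain-JDK sketch. It does not use the real `RetryTemplate` API: `RetryAroundTransactionSketch` and `withRetry` are hypothetical names, and the `Supplier` stands in for one poll-and-dispatch attempt that either commits (returns) or rolls back (throws).

```java
import java.util.function.Supplier;

// Hypothetical model of bounded retries wrapped around a single transactional attempt.
class RetryAroundTransactionSketch {

    // Runs the attempt up to maxAttempts times and returns the first successful result.
    // If every attempt fails, the last failure is rethrown to the caller.
    static <T> T withRetry(int maxAttempts, Supplier<T> transactionalAttempt) {
        if (maxAttempts < 1) {
            throw new IllegalArgumentException("maxAttempts must be at least 1");
        }
        RuntimeException last = null;
        for (int attempt = 1; attempt <= maxAttempts; attempt++) {
            try {
                // Assumption: each call opens, commits, or rolls back its own transaction.
                return transactionalAttempt.get();
            } catch (RuntimeException ex) {
                last = ex;
            }
        }
        throw last;
    }
}
```

Because each attempt is assumed to be its own transaction, a failed attempt leaves the message in the store and the next attempt can pick it up again.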
Expected Behavior
Opening an issue as discussed on Stack Overflow: "Spring Integration PostgresSubscribableChannel is not transactional".
The expectation is that if there is an exception downstream, the message returns to the underlying message store.
Current Behavior
When replacing the current QueueChannel + transactional polling flow with a PostgresSubscribableChannel, the behavior of the flow changes: messages are now lost if there is an exception downstream.
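The difference between the expected and the current behavior can be modeled in a few lines. This is a plain-JDK sketch under stated assumptions, not the channel's implementation: `RollbackSketch` and its in-memory deque are hypothetical stand-ins for the message store, and putting the message back on failure emulates a transaction rollback.

```java
import java.util.ArrayDeque;
import java.util.Deque;
import java.util.function.Consumer;

// Hypothetical in-memory stand-in for a store-backed channel.
class RollbackSketch {

    private final Deque<String> store = new ArrayDeque<>();

    void send(String message) {
        store.addLast(message);
    }

    int size() {
        return store.size();
    }

    // Expected behavior: poll and dispatch form one unit of work.
    // If the subscriber throws, the message goes back to the head of the store.
    boolean dispatchTransactional(Consumer<String> subscriber) {
        String message = store.pollFirst();
        if (message == null) {
            return false;
        }
        try {
            subscriber.accept(message);
            return true; // "commit": the message stays removed
        } catch (RuntimeException ex) {
            store.addFirst(message); // "rollback": the message is available again
            return false;
        }
    }

    // Reported behavior: the message is removed before dispatch,
    // so a downstream exception loses it.
    boolean dispatchNonTransactional(Consumer<String> subscriber) {
        String message = store.pollFirst();
        if (message == null) {
            return false;
        }
        try {
            subscriber.accept(message);
            return true;
        } catch (RuntimeException ex) {
            return false; // nothing puts the message back
        }
    }
}
```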
Context
I have made the following modifications as a POC to get the desired behavior. This does change the semantics somewhat: previously each message (for a subscriber) was dispatched on a separate thread, and now all the messages (for a subscriber) are emitted sequentially on the same thread.
* `PostgresChannelMessageTableSubscriber`
  * `notificationThreadPoolSize` (defaults to 1)
  * `subscriptionExecutorService` in the `stop` method
* `PostgresSubscribableChannel`
  * `this.transactionTemplate = new TransactionTemplate(platformTransactionManager);`
  * `errorOnNotify`
  * `notifyUpdate` method
  * `@Scheduled` method to retry whenever there are errors (to be replaced by a `TaskScheduler` with a customizable `PeriodicTrigger`)