Transactional support in PostgresSubscribableChannel #8582

Closed
igorlovich opened this issue Mar 24, 2023 · 4 comments

Comments

@igorlovich
Contributor

igorlovich commented Mar 24, 2023

Expected Behavior

Opening an issue as discussed on Stack Overflow ("Spring Integration PostgresSubscribableChannel is not transactional").

The expectation is that if an exception is thrown downstream, the message is returned to the underlying message store.

Current Behavior

When the current QueueChannel + transactional polling flow is replaced with a PostgresSubscribableChannel, the behavior of the flow changes: messages are now lost if an exception is thrown downstream.
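A minimal Java sketch of the difference, assuming an in-memory deque as a stand-in for the JDBC message table (InMemoryStore and LossDemo are hypothetical names, not Spring Integration API): once a message has been polled, a downstream exception drops it unless the poll is rolled back.

```java
import java.util.ArrayDeque;
import java.util.Deque;
import java.util.function.Consumer;

// Hypothetical in-memory stand-in for the channel's message table.
class InMemoryStore {
    final Deque<String> rows = new ArrayDeque<>();

    // Mimics pollMessageFromGroup(): removes and returns the oldest row, or null if empty.
    String poll() {
        return rows.pollFirst();
    }

    // Mimics a rollback: the polled row becomes visible again at the head.
    void restore(String message) {
        rows.addFirst(message);
    }
}

class LossDemo {
    // Polls one message and hands it to the handler. If rollbackOnError is true,
    // the message is restored to the store when the handler throws.
    static void pollAndDispatch(InMemoryStore store, Consumer<String> handler, boolean rollbackOnError) {
        String message = store.poll();
        if (message == null) {
            return;
        }
        try {
            handler.accept(message);
        } catch (RuntimeException ex) {
            if (rollbackOnError) {
                store.restore(message);
            }
            // Without a rollback the message is simply gone at this point.
        }
    }
}
```

With rollbackOnError set to false the store ends up empty after a failing handler (the current behavior); with true the message is back at the head of the store (the expected behavior).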

Context

I have made the following modifications as a POC to get the desired behavior. This changes the semantics somewhat: previously, each message (for a subscriber) was dispatched on a separate thread, whereas now all the messages for a subscriber are emitted sequentially on the same thread.

PostgresChannelMessageTableSubscriber

  1. Add another ExecutorService for notification dispatch, initialized in the start method with notificationThreadPoolSize (defaults to 1):
if (subscriptionExecutorService == null) {
    CustomizableThreadFactory threadFactory =
            new CustomizableThreadFactory("postgres-channel-dispatch-");
    threadFactory.setDaemon(true);
    // Pass the factory so that the pool actually uses the named daemon threads.
    subscriptionExecutorService = Executors.newFixedThreadPool(notificationThreadPoolSize, threadFactory);
}
  2. Make the subscriber notifications asynchronous:
subscriptions.forEach(it -> subscriptionExecutorService.submit(it::notifyUpdate));
  3. Add a shutDownAndAwaitTermination method, as per the ExecutorService javadoc, with a 5-second timeout for graceful termination. Use this method to shut down the subscriptionExecutorService in the stop method.
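Steps 1-3 can be sketched with plain java.util.concurrent types. This is an illustration rather than the actual PostgresChannelMessageTableSubscriber change: the Subscription interface and the NotificationDispatcher class are hypothetical, and a JDK ThreadFactory lambda stands in for Spring's CustomizableThreadFactory. The shutdown follows the two-phase pattern from the ExecutorService javadoc, using the 5-second timeout.

```java
import java.util.List;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.ThreadFactory;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;

// Hypothetical stand-in for a subscribed channel (e.g. PostgresSubscribableChannel.notifyUpdate()).
interface Subscription {
    void notifyUpdate();
}

// Hypothetical sketch of steps 1-3; not the actual PostgresChannelMessageTableSubscriber code.
class NotificationDispatcher {
    private final ExecutorService subscriptionExecutorService;

    NotificationDispatcher(int notificationThreadPoolSize) {
        AtomicInteger threadCount = new AtomicInteger();
        // Step 1: named daemon threads, similar to what CustomizableThreadFactory provides.
        ThreadFactory threadFactory = runnable -> {
            Thread thread = new Thread(runnable, "postgres-channel-dispatch-" + threadCount.incrementAndGet());
            thread.setDaemon(true);
            return thread;
        };
        this.subscriptionExecutorService =
                Executors.newFixedThreadPool(notificationThreadPoolSize, threadFactory);
    }

    // Step 2: called on the thread that receives notifications; it only submits tasks and returns.
    void dispatchNotification(List<? extends Subscription> subscriptions) {
        subscriptions.forEach(it -> subscriptionExecutorService.submit(it::notifyUpdate));
    }

    // Step 3: two-phase shutdown from the ExecutorService javadoc, with a 5-second grace period.
    // Returns true if the pool terminated.
    boolean shutDownAndAwaitTermination() {
        subscriptionExecutorService.shutdown(); // stop accepting new tasks; queued ones still run
        try {
            if (subscriptionExecutorService.awaitTermination(5, TimeUnit.SECONDS)) {
                return true;
            }
            subscriptionExecutorService.shutdownNow(); // interrupt tasks that are still running
            return subscriptionExecutorService.awaitTermination(5, TimeUnit.SECONDS);
        } catch (InterruptedException ex) {
            subscriptionExecutorService.shutdownNow();
            Thread.currentThread().interrupt(); // preserve the interrupt status
            return false;
        }
    }
}
```

The ThreadFactory has to be passed to newFixedThreadPool; otherwise the pool uses the default factory, which creates non-daemon threads with generic names.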

PostgresSubscribableChannel

  1. Inject a PlatformTransactionManager in the constructor and create a TransactionTemplate
    this.transactionTemplate = new TransactionTemplate(platformTransactionManager);
  2. Remove the Executor from the UnicastDispatcher (and remove the method that allows setting the Executor).
  3. Add an AtomicBoolean field errorOnNotify
  4. Modify the notifyUpdate method
@Override
public void notifyUpdate() {
    try {
        transactionTemplate.executeWithoutResult(it -> {
            Message<?> message;
            while ((message = this.jdbcChannelMessageStore.pollMessageFromGroup(this.groupId)) != null) {
                this.dispatcher.dispatch(message);
            }
        });
    } catch (Exception e) {
        logger.error(e, "Exception during message dispatch");
        errorOnNotify.set(true);
    }
}
  5. Add a @Scheduled method to retry whenever there are errors (to be replaced by a TaskScheduler with a customizable PeriodicTrigger):
@Scheduled(fixedDelay = 10000)
public void retryOnErrors() {
    if (errorOnNotify.compareAndSet(true, false)) {
        notifyUpdate();
    }
}
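The interplay of steps 3 to 5 (the errorOnNotify flag and the periodic retry) can be modeled without Spring. In this hypothetical sketch, a Runnable stands in for the transactional poll-and-dispatch loop, and retryOnErrors() is called directly instead of by @Scheduled.

```java
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;

// Hypothetical model: a failed drain sets a flag, and the periodic retry
// re-runs the drain only if that flag was set.
class FlaggedRetryChannel {
    private final AtomicBoolean errorOnNotify = new AtomicBoolean();
    private final Runnable drainStore; // stands in for the transactional poll-and-dispatch loop
    final AtomicInteger drainAttempts = new AtomicInteger();

    FlaggedRetryChannel(Runnable drainStore) {
        this.drainStore = drainStore;
    }

    void notifyUpdate() {
        drainAttempts.incrementAndGet();
        try {
            drainStore.run();
        } catch (RuntimeException ex) {
            // In the POC the transaction has rolled back here, so the messages are still stored.
            errorOnNotify.set(true);
        }
    }

    // In the POC this runs every 10 seconds via @Scheduled(fixedDelay = 10000).
    void retryOnErrors() {
        // Check and clear in one atomic step, so overlapping callers cannot both retry one failure.
        if (errorOnNotify.compareAndSet(true, false)) {
            notifyUpdate();
        }
    }
}
```

Because a successful retry leaves the flag cleared, later scheduler ticks are no-ops until the next failure.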
igorlovich added the status: waiting-for-triage and type: enhancement labels Mar 24, 2023
@artembilan
Member

Right. That was my big concern: whether we have to keep an async hand-off for target subscribers.
So, yeah, I'm probably OK with the subscriptions.forEach(it -> subscriptionExecutorService.submit(it::notifyUpdate));, but I believe we can reuse the same Executor in that PostgresChannelMessageTableSubscriber.
There is no need for a new one.

I don't think the transaction has to be mandatory, just as a poller against a QueueChannel is not transactional by default.
So, a setter would be better.
I'd prefer to deal with a RetryTemplate instead of @Scheduled.

There is also a flaw in your executeWithoutResult callback: you wrap the whole loop in a transaction.
However, messages must not affect each other, so it is better to have a transaction per message.
With your current solution, we may deliver 9 messages from the table correctly but fail on the last one.
Then the entire polling transaction is rolled back, causing the already-processed messages to be redelivered in the next transaction.
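To illustrate the transaction-boundary problem, the following sketch drains ten messages where the handler fails on the last one. The names are hypothetical, an in-memory deque models the message table, and re-inserting polled rows models a rollback. Stopping at the first failed message in the per-message variant is an assumption of this sketch, made so that a failing message is not re-polled in a tight loop.

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.Deque;
import java.util.List;
import java.util.function.Consumer;

// Hypothetical in-memory model: polling removes a row from the table, and a
// rollback is emulated by putting back the rows polled inside that transaction.
class TransactionBoundaryDemo {

    // POC shape: one transaction around the whole loop.
    static void drainInOneTransaction(Deque<String> table, Consumer<String> handler) {
        List<String> polledInTx = new ArrayList<>();
        try {
            String message;
            while ((message = table.pollFirst()) != null) {
                polledInTx.add(message);
                handler.accept(message);
            }
        } catch (RuntimeException ex) {
            // Rollback undoes every poll in this transaction, including already-processed messages.
            for (int i = polledInTx.size() - 1; i >= 0; i--) {
                table.addFirst(polledInTx.get(i));
            }
        }
    }

    // Suggested shape: one transaction per message.
    static void drainPerMessage(Deque<String> table, Consumer<String> handler) {
        String message;
        while ((message = table.pollFirst()) != null) {
            try {
                handler.accept(message);
            } catch (RuntimeException ex) {
                // Rollback undoes only this poll; stop and leave the rest for a later attempt.
                table.addFirst(message);
                return;
            }
        }
    }

    static Deque<String> tenMessages() {
        Deque<String> table = new ArrayDeque<>();
        for (int i = 1; i <= 10; i++) {
            table.addLast("m" + i);
        }
        return table;
    }
}
```

With one transaction around the loop, all ten rows are back in the table after the failure, so m1 to m9 would be delivered again on the next attempt; with a transaction per message, only m10 is rolled back.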

Am I missing anything?

artembilan added the status: waiting-for-reporter label and removed the status: waiting-for-triage label Mar 24, 2023
@igorlovich
Contributor Author

The reason for a separate ExecutorService is to avoid starving the thread that receives notifications. If you think that is not a problem, a single ExecutorService is simpler.

The @Scheduled annotation was just for the POC; I was thinking of implementing the retries with a ThreadPoolTaskScheduler and a periodic task (with a customizable trigger). The advantage over the RetryTemplate is that this also addresses the problem of missed notifications from PostgreSQL. On a channel without much activity, a missed NOTIFY leaves a message sitting in the store until the next notification event. A process that runs periodically puts an upper bound on latency.
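The periodic-task idea can be sketched with a JDK ScheduledExecutorService in place of ThreadPoolTaskScheduler (an assumption made to keep the example dependency-free; PeriodicSweeper is a hypothetical name). The sweep drains the store whether or not a notification arrived, which is what bounds the latency after a missed NOTIFY.

```java
import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import java.util.function.Consumer;

// Hypothetical sketch: a fixed-delay sweep drains the store even if a notification was missed,
// so a message waits roughly one period (plus drain time) at most.
class PeriodicSweeper {
    private final ConcurrentLinkedQueue<String> store;
    private final Consumer<String> dispatcher;
    private final ScheduledExecutorService scheduler;

    PeriodicSweeper(ConcurrentLinkedQueue<String> store, Consumer<String> dispatcher) {
        this.store = store;
        this.dispatcher = dispatcher;
        this.scheduler = Executors.newSingleThreadScheduledExecutor(runnable -> {
            Thread thread = new Thread(runnable, "postgres-channel-sweep");
            thread.setDaemon(true);
            return thread;
        });
    }

    // Drains whatever is currently stored; a notification handler could call this as well.
    void drain() {
        String message;
        while ((message = store.poll()) != null) {
            dispatcher.accept(message);
        }
    }

    // Fixed delay between the end of one run and the start of the next, analogous to @Scheduled(fixedDelay).
    void start(long periodMillis) {
        scheduler.scheduleWithFixedDelay(() -> {
            try {
                drain();
            } catch (RuntimeException ex) {
                // An exception escaping a periodic task would cancel all its later runs.
            }
        }, periodMillis, periodMillis, TimeUnit.MILLISECONDS);
    }

    void stop() {
        scheduler.shutdownNow();
    }
}
```

The try/catch inside the scheduled task matters: per the ScheduledExecutorService contract, an exception thrown by a periodic task suppresses all of its subsequent executions.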

You're right about the transaction boundaries, they should be per message and not per loop.

@artembilan
Member

If we add a scheduled task to this channel implementation, then it is not going to be much different from a QueueChannel and a polling consumer. That is exactly why I was a bit skeptical about implementing this Postgres feature originally.
We probably can come back to an executor in the channel, but apply it before the dispatcher: directly in the channel. That way there is no thread starvation in the publisher, and the message-fetching task can be transactional and retryable. But again: I won't accept a scheduler solution, since we already have that with a QueueChannel.
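A rough sketch of the executor-in-the-channel suggestion, assuming plain JDK types (ExecutorBackedChannel is hypothetical, and no transaction or retry is shown): notifyUpdate() only hands the fetch task to the channel's own executor, so the notifying thread returns immediately, and the fetch task is the unit that a transaction and a retry could wrap.

```java
import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.function.Consumer;

// Hypothetical sketch: the hand-off happens in the channel, before the dispatcher.
class ExecutorBackedChannel {
    private final ConcurrentLinkedQueue<String> store;
    private final Consumer<String> dispatcher;
    private final ExecutorService executor;

    ExecutorBackedChannel(ConcurrentLinkedQueue<String> store, Consumer<String> dispatcher) {
        this.store = store;
        this.dispatcher = dispatcher;
        this.executor = Executors.newSingleThreadExecutor(runnable -> {
            Thread thread = new Thread(runnable, "channel-fetch");
            thread.setDaemon(true);
            return thread;
        });
    }

    // Called on the notifying thread; returns immediately, so a slow subscriber cannot block it.
    void notifyUpdate() {
        executor.execute(this::fetchAndDispatch);
    }

    // Runs on the channel's executor; a per-message transaction and retry would wrap the body.
    private void fetchAndDispatch() {
        String message;
        while ((message = store.poll()) != null) {
            dispatcher.accept(message);
        }
    }

    void stop() {
        executor.shutdownNow();
    }
}
```

If notifyUpdate() dispatched synchronously, a subscriber blocked on a downstream resource would also block the thread that delivers notifications.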

igorlovich pushed a commit to igorlovich/spring-integration that referenced this issue Mar 28, 2023
@igorlovich
Contributor Author

@artembilan, could you take a look at the PR, please?

I have added another (local) latch in the start method, as I was running into a race condition in tests between the subscriber becoming ready to receive notifications and notifications being emitted.
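The race and its fix can be modeled with a CountDownLatch that is local to start(). In this hypothetical sketch, a volatile flag stands in for the listener having executed LISTEN, and emit() drops payloads that arrive before that point, mimicking how PostgreSQL does not deliver a NOTIFY to a session that is not yet listening.

```java
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;

// Hypothetical model of a LISTEN/NOTIFY subscriber whose start() waits until it is listening.
class ReadyAwareSubscriber {
    private final BlockingQueue<String> notifications = new LinkedBlockingQueue<>();
    final AtomicInteger received = new AtomicInteger();
    private volatile boolean listening;
    private Thread listener;

    // Returns true if the listener thread signalled readiness within the timeout.
    boolean start(long timeoutMillis) {
        CountDownLatch ready = new CountDownLatch(1); // local to start(), as described in the comment
        listener = new Thread(() -> {
            listening = true; // stands in for executing LISTEN on the connection
            ready.countDown();
            try {
                while (true) {
                    notifications.take();
                    received.incrementAndGet();
                }
            } catch (InterruptedException ex) {
                Thread.currentThread().interrupt(); // stop() interrupts the receive loop
            }
        }, "postgres-listener");
        listener.setDaemon(true);
        listener.start();
        try {
            return ready.await(timeoutMillis, TimeUnit.MILLISECONDS);
        } catch (InterruptedException ex) {
            Thread.currentThread().interrupt();
            return false;
        }
    }

    // A notification sent before the listener is listening is never delivered to it.
    void emit(String payload) {
        if (listening) {
            notifications.add(payload);
        }
    }

    void stop() {
        listener.interrupt();
    }
}
```

Without the await in start(), a test that emits right after start() could run before the listener thread sets the flag, and the notification would be silently dropped.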

igorlovich added a commit to igorlovich/spring-integration that referenced this issue Mar 29, 2023
artembilan pushed a commit that referenced this issue Mar 29, 2023
Fixes #8582

* Introduce a `PostgresSubscribableChannel.setTransactionManager()`
to wrap a message polling and dispatching operation in a transaction
* In addition, add `RetryTemplate` support around transaction attempts

**Cherry-pick to `6.0.x`**
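The shape of the final change (each polling and dispatching attempt in its own transaction, retried on failure) can be sketched without Spring. Here a bounded loop stands in for RetryTemplate and restoring the polled row stands in for a transaction rollback; RetryingTransactionalPoller and its methods are hypothetical and do not reflect the actual PostgresSubscribableChannel API beyond what the commit message states.

```java
import java.util.Deque;
import java.util.function.Consumer;

// Hypothetical stdlib stand-in for RetryTemplate + TransactionTemplate around one poll.
class RetryingTransactionalPoller {
    private final Deque<String> table;
    private final Consumer<String> dispatcher;
    private final int maxAttempts;

    RetryingTransactionalPoller(Deque<String> table, Consumer<String> dispatcher, int maxAttempts) {
        if (maxAttempts < 1) {
            throw new IllegalArgumentException("maxAttempts must be at least 1");
        }
        this.table = table;
        this.dispatcher = dispatcher;
        this.maxAttempts = maxAttempts;
    }

    // Returns true if a message was dispatched, false if the table was empty.
    // Rethrows the last failure once all attempts are exhausted; the message stays in the table.
    boolean pollAndDispatchOne() {
        RuntimeException last = null;
        for (int attempt = 1; attempt <= maxAttempts; attempt++) {
            String message = table.pollFirst(); // "begin transaction" and poll
            if (message == null) {
                return false;
            }
            try {
                dispatcher.accept(message);
                return true; // "commit"
            } catch (RuntimeException ex) {
                table.addFirst(message); // "rollback": the row is visible again
                last = ex;
            }
        }
        throw last;
    }
}
```

Each attempt covers a single message, so a transient failure is retried without re-delivering messages that were already committed.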
artembilan added this to the 6.1.0-RC1 milestone Mar 29, 2023
artembilan added the in: jdbc and backport 6.0.x labels and removed the status: waiting-for-reporter label Mar 29, 2023