Skip to content

Commit

Permalink
spring-projectsGH-8582:Transactional support in PostgresSubscribableC…
Browse files Browse the repository at this point in the history
…hannel
  • Loading branch information
igorlovich committed Mar 28, 2023
1 parent 6d7ee46 commit 6ad6722
Show file tree
Hide file tree
Showing 3 changed files with 240 additions and 42 deletions.
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
/*
* Copyright 2022 the original author or authors.
* Copyright 2022-2023 the original author or authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
Expand All @@ -26,6 +26,7 @@
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.RejectedExecutionException;
import java.util.concurrent.TimeUnit;

import org.postgresql.PGNotification;
Expand Down Expand Up @@ -59,6 +60,7 @@
*
* @author Rafael Winterhalter
* @author Artem Bilan
* @author Igor Lovich
*
* @since 6.0
*/
Expand All @@ -77,6 +79,8 @@ public final class PostgresChannelMessageTableSubscriber implements SmartLifecyc

private CountDownLatch latch = new CountDownLatch(0);

private boolean userProvidedExecutor = false;

private Future<?> future = CompletableFuture.completedFuture(null);

@Nullable
Expand Down Expand Up @@ -143,12 +147,17 @@ public synchronized void start() {
ExecutorService executorToUse = this.executor;
if (executorToUse == null) {
CustomizableThreadFactory threadFactory =
new CustomizableThreadFactory("postgres-channel-message-table-subscriber-");
new CustomizableThreadFactory("postgres-channel-notifications-");
threadFactory.setDaemon(true);
executorToUse = Executors.newSingleThreadExecutor(threadFactory);
executorToUse = Executors.newFixedThreadPool(2, threadFactory);
this.executor = executorToUse;
}
else {
this.userProvidedExecutor = true;
}
this.latch = new CountDownLatch(1);

CountDownLatch startingLatch = new CountDownLatch(1);
this.future = executorToUse.submit(() -> {
try {
while (isActive()) {
Expand All @@ -166,11 +175,13 @@ public synchronized void start() {
}
throw ex;
}
this.subscriptionsMap.values()
.forEach(subscriptions -> subscriptions.forEach(Subscription::notifyUpdate));
this.subscriptionsMap.values().forEach(this::notifyAll);

try {
this.connection = conn;
while (isActive()) {
startingLatch.countDown();

PGNotification[] notifications = conn.getNotifications(0);
// Unfortunately, there is no good way of interrupting a notification
// poll but by closing its connection.
Expand All @@ -184,9 +195,7 @@ public synchronized void start() {
if (subscriptions == null) {
continue;
}
for (Subscription subscription : subscriptions) {
subscription.notifyUpdate();
}
notifyAll(subscriptions);
}
}
}
Expand All @@ -208,6 +217,29 @@ public synchronized void start() {
this.latch.countDown();
}
});

try {
if (!startingLatch.await(5, TimeUnit.SECONDS)) {
throw new IllegalStateException("Failed to start "
+ PostgresChannelMessageTableSubscriber.class.getName());
}
}
catch (InterruptedException e) {
Thread.currentThread().interrupt();
throw new IllegalStateException("Failed to start "
+ PostgresChannelMessageTableSubscriber.class.getName(), e);
}
}

private void notifyAll(Set<Subscription> subscriptions) {
subscriptions.forEach(it -> {
try {
this.executor.submit(it::notifyUpdate);
}
catch (RejectedExecutionException e) {
LOGGER.warn(e, "Executor rejected submission of notification task");
}
});
}

private boolean isActive() {
Expand All @@ -232,6 +264,11 @@ public synchronized void stop() {
catch (SQLException ignored) {
}
}

if (!this.userProvidedExecutor) {
shutdownAndAwaitTermination(this.executor);
}

try {
if (!this.latch.await(5, TimeUnit.SECONDS)) {
throw new IllegalStateException("Failed to stop "
Expand All @@ -242,6 +279,35 @@ public synchronized void stop() {
}
}


/**
* Gracefully shutdown an executor service. Taken from @see ExecutorService javadoc
*
* @param pool The pool to shut down
*/
private void shutdownAndAwaitTermination(@Nullable ExecutorService pool) {
if (pool == null) {
return;
}
pool.shutdown(); // Disable new tasks from being submitted
try {
// Wait a while for existing tasks to terminate
if (!pool.awaitTermination(2, TimeUnit.SECONDS)) {
pool.shutdownNow(); // Cancel currently executing tasks
// Wait a while for tasks to respond to being cancelled
if (!pool.awaitTermination(2, TimeUnit.SECONDS)) {
LOGGER.warn("Unable to shutdown the executor service");
}
}
}
catch (InterruptedException ie) {
// (Re-)Cancel if current thread also interrupted
pool.shutdownNow();
// Preserve interrupt status
Thread.currentThread().interrupt();
}
}

@Override
public boolean isRunning() {
return this.latch.getCount() > 0;
Expand Down
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
/*
* Copyright 2022 the original author or authors.
* Copyright 2022-2023 the original author or authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
Expand All @@ -16,15 +16,17 @@

package org.springframework.integration.jdbc.channel;

import java.util.concurrent.Executor;
import java.util.Optional;

import org.springframework.core.task.SimpleAsyncTaskExecutor;
import org.springframework.integration.channel.AbstractSubscribableChannel;
import org.springframework.integration.dispatcher.MessageDispatcher;
import org.springframework.integration.dispatcher.UnicastingDispatcher;
import org.springframework.integration.jdbc.store.JdbcChannelMessageStore;
import org.springframework.messaging.Message;
import org.springframework.messaging.MessageHandler;
import org.springframework.retry.support.RetryTemplate;
import org.springframework.transaction.PlatformTransactionManager;
import org.springframework.transaction.support.TransactionTemplate;
import org.springframework.util.Assert;

/**
Expand All @@ -39,6 +41,7 @@
*
* @author Rafael Winterhalter
* @author Artem Bilan
* @author Igor Lovich
*
* @since 6.0
*/
Expand All @@ -47,11 +50,15 @@ public class PostgresSubscribableChannel extends AbstractSubscribableChannel

private final JdbcChannelMessageStore jdbcChannelMessageStore;

private TransactionTemplate transactionTemplate;

private final Object groupId;

private final PostgresChannelMessageTableSubscriber messageTableSubscriber;

private UnicastingDispatcher dispatcher = new UnicastingDispatcher(new SimpleAsyncTaskExecutor());
private final UnicastingDispatcher dispatcher = new UnicastingDispatcher();

private RetryTemplate retryTemplate = RetryTemplate.builder().maxAttempts(1).build();

/**
* Create a subscribable channel for a Postgres database.
Expand All @@ -70,12 +77,22 @@ public PostgresSubscribableChannel(JdbcChannelMessageStore jdbcChannelMessageSto
}

/**
* Set the executor to use for dispatching newly received messages.
* @param executor The executor to use.
* Sets the transaction manager to use for message processing. Each message will be processed in a
* separate transaction
* @param transactionManager The transaction manager to use
*/
public void setDispatcherExecutor(Executor executor) {
Assert.notNull(executor, "An executor must be provided.");
this.dispatcher = new UnicastingDispatcher(executor);
public void setTransactionManager(PlatformTransactionManager transactionManager) {
Assert.notNull(transactionManager, "A platform transaction manager must be provided.");
this.transactionTemplate = new TransactionTemplate(transactionManager);
}

/**
* Sets retry template to use for retries in case of exception in downstream processing
* @param retryTemplate The retry template to use
*/
public void setRetryTemplate(RetryTemplate retryTemplate) {
Assert.notNull(retryTemplate, "A retry template must be provided.");
this.retryTemplate = retryTemplate;
}

@Override
Expand Down Expand Up @@ -110,10 +127,29 @@ protected boolean doSend(Message<?> message, long timeout) {

@Override
public void notifyUpdate() {
Message<?> message;
while ((message = this.jdbcChannelMessageStore.pollMessageFromGroup(this.groupId)) != null) {
this.dispatcher.dispatch(message);
}
Optional<Message<?>> dispatchedMessage;

do {
if (this.transactionTemplate != null) {
dispatchedMessage = this.retryTemplate.execute(context ->
this.transactionTemplate.execute(status ->
Optional.ofNullable(this.jdbcChannelMessageStore.pollMessageFromGroup(this.groupId))
.map(this::dispatch)
)
);
}
else {
dispatchedMessage = Optional.ofNullable(this.jdbcChannelMessageStore.pollMessageFromGroup(this.groupId))
.map(message ->
this.retryTemplate.execute(context -> dispatch(message))
);
}
} while (dispatchedMessage.isPresent());
}

private Message<?> dispatch(Message<?> message) {
this.dispatcher.dispatch(message);
return message;
}

@Override
Expand Down
Loading

0 comments on commit 6ad6722

Please sign in to comment.