Transactional Outbox pattern with spring-boot

We can use Spring Integration to implement the outbox pattern. This can be done by setting up an integration flow that takes the email message as input and delivers it to a JDBC-backed output with a polling handler that will send the mail.

Project setup

  • Project: Maven
  • Language: Java
  • Spring Boot: 3.3.0
  • Packaging: Jar
  • Java: 21
  • Dependencies:
  • Spring Web
  • Spring Data JPA
  • Spring Integration
  • Docker Compose Support
  • PostgreSQL Driver
  • Flyway Migration

In the generated pom.xml, manually add the spring-integration-jdbc dependency:

  <dependency>
    <groupId>org.springframework.integration</groupId>
	<artifactId>spring-integration-jdbc</artifactId>
  </dependency>

Spring Integration setup

First, we configure Spring Integration itself by adding this configuration:

SpringIntegrationConfiguration.java

package com.wimdeblauwe.examples.transactional_outbox_spring_integration.infrastructure.integration;
 
import javax.sql.DataSource;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.integration.jdbc.store.JdbcChannelMessageStore;
import org.springframework.integration.jdbc.store.channel.PostgresChannelMessageStoreQueryProvider;
 
@Configuration
public class SpringIntegrationConfiguration {
 
  private static final String CONCURRENT_METADATA_STORE_PREFIX = "_spring_integration_";
 
  @Bean
  JdbcChannelMessageStore jdbcChannelMessageStore(
      DataSource dataSource) {
    JdbcChannelMessageStore jdbcChannelMessageStore = new JdbcChannelMessageStore(dataSource);
    jdbcChannelMessageStore.setTablePrefix(CONCURRENT_METADATA_STORE_PREFIX);
    jdbcChannelMessageStore.setChannelMessageStoreQueryProvider(
        new PostgresChannelMessageStoreQueryProvider());
    return jdbcChannelMessageStore;
  }
}

This bean will persist the objects we add to the outbox Spring Integration channel in the database.

Next, we define the integration flow for the mail:

MailConfiguration.java

package com.wimdeblauwe.examples.transactional_outbox_spring_integration.infrastructure.mail;
 
import java.time.Duration;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.integration.channel.DirectChannel;
import org.springframework.integration.channel.QueueChannel;
import org.springframework.integration.dsl.IntegrationFlow;
import org.springframework.integration.dsl.MessageChannels;
import org.springframework.integration.dsl.Pollers;
import org.springframework.integration.jdbc.store.JdbcChannelMessageStore;
 
@Configuration
public class MailConfigration {
 
  @Bean
  public DirectChannel mailInput() {
    return new DirectChannel();
  }
 
  @Bean
  public QueueChannel mailOutbox(JdbcChannelMessageStore jdbcChannelMessageStore) {
    return MessageChannels.queue(jdbcChannelMessageStore, "mail-outbox").getObject();
  }
 
  @Bean
  public IntegrationFlow mailFlow(JdbcChannelMessageStore jdbcChannelMessageStore,
      MailSender mailSender) {
    return IntegrationFlow.from(mailInput())
        .channel(mailOutbox(jdbcChannelMessageStore))
        .handle(message -> {
          MailMessage mailMessage = (MailMessage) message.getPayload();
          mailSender.sendMail(mailMessage);
        }, e -> e.poller(Pollers.fixedDelay(Duration.ofSeconds(1))
            .transactional()))
        .get();
  }
}

The configuration has 3 beans:

  • mailInput: This is the input channel that will receive the MailMessage to be sent.
  • mailOutbox: This is the channel that the message is routed to and will store the message using the JdbcChannelMessageStore that we configured in the SpringIntegrationConfiguration class.
  • mailFlow: This defines the actual flow from the mailInput to the mailOutbox and adds a handle() method that does the actual sending of the emails. It polls the mailOutput every second to see if there are mails to be sent or not. Due to the transactional() the message remains on the mailOutbox until the sending succeeds.

This configuration class uses 2 classes that have not been explained yet: MailMessage and MailSender.

The MailMessage class is a record that contains the information needed to send the email:

MailMessage.java

package com.wimdeblauwe.examples.transactional_outbox_spring_integration.infrastructure.mail;
 
import java.io.Serial;
import java.io.Serializable;
 
public record MailMessage(String subject, String body, String to) implements Serializable {
 
  @Serial
  private static final long serialVersionUID = 1L;
}

Note how we need to make the class Serializable so that Spring Integration can store it in the database.

The MailSender is an interface that can be implemented in various ways depending on how you want to send the emails:

MailSender.java

package com.wimdeblauwe.examples.transactional_outbox_spring_integration.infrastructure.mail;
 
public interface MailSender {
 
  void sendMail(MailMessage mailMessage);
}

For testing, I implemented an unreliable mail sender that logs or throws an exception randomly. In reality, you will likely use Java Mail to connect to an SMTP server, or use a service such as SendGrid or Amazon SES to send the emails.

LoggingMailSender.java

package com.wimdeblauwe.examples.transactional_outbox_spring_integration.infrastructure.mail;
 
import java.util.random.RandomGenerator;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.stereotype.Component;
 
@Component
public class LoggingMailSender implements
    MailSender {
 
  private static final Logger LOGGER = LoggerFactory.getLogger(LoggingMailSender.class);
  private final RandomGenerator randomGenerator = RandomGenerator.getDefault();
 
  @Override
  public void sendMail(MailMessage mailMessage) {
    if (randomGenerator.nextBoolean()) {
      LOGGER.info("Sending email: {}", mailMessage);
    } else {
      throw new RuntimeException("Email server down");
    }
  }
}

Sending the email from the application

In order to tap into the Spring Integration flow, we need to create a messaging gateway. This is done via an interface annotated with @MessagingGateway:

package com.wimdeblauwe.examples.transactional_outbox_spring_integration.infrastructure.mail;
 
import org.springframework.integration.annotation.Gateway;
import org.springframework.integration.annotation.MessagingGateway;
 
@MessagingGateway
public interface MailGateway {
 
  @Gateway(requestChannel = "mailInput")
  void sendMail(MailMessage mailMessage);
}

Note that the name of the requestChannel has to match with the name of the bean of our input channel in the MailConfiguration class.

We don’t need to provide an implementation. Spring Integration will implement this at runtime for us.

An example use case that uses this gateway could look like this:

package com.wimdeblauwe.examples.transactional_outbox_spring_integration.order.usecase;
 
import com.wimdeblauwe.examples.transactional_outbox_spring_integration.infrastructure.mail.MailGateway;
import com.wimdeblauwe.examples.transactional_outbox_spring_integration.infrastructure.mail.MailMessage;
import com.wimdeblauwe.examples.transactional_outbox_spring_integration.order.Order;
import com.wimdeblauwe.examples.transactional_outbox_spring_integration.order.repository.OrderRepository;
import java.math.BigDecimal;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.stereotype.Component;
import org.springframework.transaction.annotation.Transactional;
 
@Component
@Transactional
public class CompleteOrder {
 
  private static final Logger LOGGER = LoggerFactory.getLogger(CompleteOrder.class);
  private final OrderRepository orderRepository;
  private final MailGateway mailGateway;
 
  public CompleteOrder(OrderRepository orderRepository, MailGateway mailGateway) {
    this.orderRepository = orderRepository;
    this.mailGateway = mailGateway;
  }
 
  public void execute(BigDecimal amount, String email) {
    LOGGER.info("Completing order for {}", email);
    Order order = new Order();
    order.setAmount(amount);
    order.setCustomerEmail(email);
 
    LOGGER.info("Save order in database");
    orderRepository.save(order); (1)
 
    MailMessage message = new MailMessage("Order %s completed".formatted(order.getId()),
        "Your order is registered in our system and will be processed.",
        order.getCustomerEmail()); (2)
    LOGGER.info("Sending email for order");
    mailGateway.sendMail(message); (3)
  }
}
1Save the Order in the database.
2Compose the data for the email message.
3Pass the data to the MailGateway for sending out the email.

From the use case side, it seems like we synchronously send the email, but in reality, the MailMessage is stored in the same transaction as the Order and the mail itself it sent asynchronously a few moments later.

Testing time

To test that everything works, we can create a REST controller to trigger the use case:

package com.wimdeblauwe.examples.transactional_outbox_spring_integration.order.web;
 
import com.wimdeblauwe.examples.transactional_outbox_spring_integration.order.usecase.CompleteOrder;
import java.math.BigDecimal;
import org.springframework.web.bind.annotation.PostMapping;
import org.springframework.web.bind.annotation.RequestBody;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.RestController;
 
@RestController
@RequestMapping("/orders")
public class OrderRestController {
 
  private final CompleteOrder completeOrder;
 
  public OrderRestController(CompleteOrder completeOrder) {
    this.completeOrder = completeOrder;
  }
 
  @PostMapping
  public void completeOrder(@RequestBody CompleteOrderRequest request) {
    completeOrder.execute(request.amount(), request.email());
  }
 
  public record CompleteOrderRequest(BigDecimal amount, String email) {
 
  }
}

Using the HTTP client of IntelliJ or any other tool to send out a request, we can add a few orders:

POST http://localhost:8080/orders
Content-Type: application/json
 
{
  "amount": "100.0",
  "email": "test@example.com"
}

If you check the logging of the application, you will sometimes see a stack trace that the email could not be delivered, but soon after you will see a retry that most likely will succeed.

Our example here uses PostgreSQL, but if you use MySQL instead, there are a few things you need to change. Under the hood, Spring Integration uses SKIP LOCK, but MySQL does not support this.

You can do the following to make it work with MySQL:

  1. Define a TransactionInterceptor with READ_COMMITTED isolation level in SpringIntegrationConfiguration:
@Bean
public TransactionInterceptor springIntegrationTransactionInterceptor() {
  return new TransactionInterceptorBuilder()
      .isolation(Isolation.READ_COMMITTED)
      .build();
}
  1. Update the mailFlow bean to use this interceptor:
@Bean
public IntegrationFlow mailFlow(JdbcChannelMessageStore jdbcChannelMessageStore,
    MailSender mailSender,
    @Qualifier("springIntegrationTransactionInterceptor") TransactionInterceptor transactionInterceptor) {  // 1
  return IntegrationFlow.from(mailInput())
      .channel(mailOutbox(jdbcChannelMessageStore))
      .handle(message -> {
        MailMessage mailMessage = (MailMessage) message.getPayload();
        mailSender.sendMail(mailMessage);
      }, e -> e.poller(Pollers.fixedDelay(Duration.ofSeconds(1))
          .transactional(transactionInterceptor))) // 2
      .get();
}
  1. Declare the TransactionInterceptor as a parameter so Spring can inject it. We need to use the qualifier to ensure we get the one we declared in SpringIntegrationConfiguration.
  2. Use the interceptor as an argument to the transactional() method.

Spring Modulith

Spring Modulith is a new project in the Spring portfolio. It is led by Oliver Drotbohm and aims to make it easier to build modular monolith applications with Spring.

Communication between modules can be done asynchronously by using the ApplicationEventPublisher from Spring core. Spring Modulith has additional infrastructure to ensure no such event is ever lost by first storing it in the database. We can leverage this to build our outbox pattern.

Project setup

Create a Spring Boot project on start.spring.io with the following configuration:

  • Project: Maven
  • Language: Java
  • Spring Boot: 3.3.0
  • Packaging: Jar
  • Java: 21
  • Dependencies:
    • Spring Web
    • Spring Data JPA
    • Spring Modulith
    • Docker Compose Support
    • PostgreSQL Driver
    • Flyway Migration

Replace the spring-modulith-starter-jpa with spring-modulith-starter-jdbc:

pom.xml

<dependency>
    <groupId>org.springframework.modulith</groupId>
    <artifactId>spring-modulith-starter-jdbc</artifactId>
</dependency>

In this example, we will publish an OrderCompleted event from our usecase. The event itself is a simple record with a reference to the id of the order:

public record OrderCompleted(Long orderId) {
}

The use case publishes the event:

@Component
@Transactional
public class CompleteOrder {
 
  private static final Logger LOGGER = LoggerFactory.getLogger(CompleteOrder.class);
  private final OrderRepository orderRepository;
  private final ApplicationEventPublisher eventPublisher;
 
  public CompleteOrder(OrderRepository orderRepository, ApplicationEventPublisher eventPublisher) {
    this.orderRepository = orderRepository;
    this.eventPublisher = eventPublisher;
  }
 
  public void execute(BigDecimal amount, String email) {
    LOGGER.info("Completing order for {}", email);
    Order order = new Order();
    order.setAmount(amount);
    order.setCustomerEmail(email);
 
    LOGGER.info("Save order in database");
    orderRepository.save(order);
 
    eventPublisher.publishEvent(new OrderCompleted(order.getId()));  // 1
  }
}
  1. Publish the OrderCompleted event.

We can now create a Spring component that listens for the event and sends out a mail notification:

@Component
public class MailNotifier {
 
  private static final Logger LOGGER = LoggerFactory.getLogger(MailNotifier.class);
  private final MailSender mailSender;
  private final OrderRepository orderRepository;
 
  public MailNotifier(MailSender mailSender, OrderRepository orderRepository) {
    this.mailSender = mailSender;
    this.orderRepository = orderRepository;
  }
 
  @ApplicationModuleListener // 1
  public void onOrderCompleted(OrderCompleted orderCompleted) {
    Order order = orderRepository.findById(orderCompleted.orderId())
        .orElseThrow(() -> new RuntimeException("Order not found"));
 
    MailMessage message = new MailMessage("Order %s completed".formatted(order.getId()),
        "Your order is registered in our system and will be processed.",
        order.getCustomerEmail());
    LOGGER.info("Sending email for order {}", orderCompleted.orderId());
    mailSender.sendMail(message);
  }
}
  1. Mark the method as an @ApplicationModuleListener. This is an annotation provided by Spring Modulith and a combination of:
  • @Async: because we want the mail to be send asynchrounously. We don’t want the processing of the CompleteOrder use case to be affected by the email sending.
  • @Transactional: Since our listener runs in a separate thread, we should start a new transaction to get the state of the Order from the repository.
  • @TransactionalEventListener: This ensures this method is called when the transaction that contains the sending of the event is comitted. If the transaction is rolled back, our listener is not called.

We can again test this by using the IntelliJ HTTP client and notice that sometimes the mail is sent properly and sometimes it fails (since our mailsender has the ramdom failure code). If we check the database, we can see that the events are stored and marked as published or not:

idlistener_idevent_typeserialized_eventpublication_datecompletion_date
6fcaa30a-2b36-4f10-a091-4ce10ab520eaMailNotifier.onOrderCompleted(OrderCompleted)OrderCompleted{“orderId”:1}2024-06-13 05:50:43.090615 +00:002024-06-13 05:50:43.148320 +00:00
ddb661ad-d567-42a9-9f90-4a62bbffb3fcMailNotifier.onOrderCompleted(OrderCompleted)OrderCompleted{“orderId”:2}2024-06-13 05:50:57.749954 +00:00null

What is nice here is that the event is serialized to JSON, so it is readable in the database what it contains. With Spring Integration, it uses Java serialization, so there you only get a meaningless blob of bytes.

Update: You can use JSON as well with Spring Integration with some additional configuration. See Spring Integration using JSON serialization for more info.

Retry failed events

Unlike with Spring Integration, there is no automatic retry, but we can easily add it.

The first way is setting a property that will retry the events on application startup:

application.properties

spring.modulith.republish-outstanding-events-on-restart=true

If you have failed events and you restart the Spring Boot application, you will notice that things are retried. However, I wonder if this is actually useful, given that normally you don’t restart an application that much.

A better way is to query for unpublished events from time to time and re-publish them. To accomplish that, we can update our MailNotifier like this:

@Component
public class MailNotifier {
 
  private static final Logger LOGGER = LoggerFactory.getLogger(MailNotifier.class);
  private final MailSender mailSender;
  private final OrderRepository orderRepository;
  private final IncompleteEventPublications incompleteEventPublications;
 
  public MailNotifier(MailSender mailSender, OrderRepository orderRepository, IncompleteEventPublications incompleteEventPublications)  {  // 1
    this.mailSender = mailSender;
    this.orderRepository = orderRepository;
    this.incompleteEventPublications = incompleteEventPublications;
  }
 
  @Scheduled(fixedRate = 5, timeUnit = TimeUnit.SECONDS)  // 2
  public void retries() {
    this.incompleteEventPublications.resubmitIncompletePublicationsOlderThan(Duration.ofSeconds(5));  // 3
  }
 
  // ... other code below
}
1Inject the IncompleteEventPublication interface from Spring Modulith.
2Add @Scheduled with a certain polling frequency on a public method. In our example, Spring will call this method every 5 seconds.
3Republish any incomplete event that is older than 5 seconds.

With this setup, the events that failed are retried while the application is running.

Message ordering

An important difference in the Spring Integration solution vs the Spring Modulith solution is that with Spring Integration, the order is preserved and a failure of a message will prevent processing the next message. With Spring Modulith, as the application module listeners are invoked asynchronously, the retries for the individual event publications will be executed concurrently. Thus, the order at which they eventually end up in the email server cannot be guaranteed.

In our example of sending emails, there is no need to stop a next message from being sent when a previous one fails. But in other scenarios (like putting messages on Kafka for example), you probably do care about message ordering.

Running multiple instances

Another important difference is when you run multiple instances of your application.

With Spring Integration, the email is sent from one of the instances. So no double emails, and if the one that is doing the retries fails, the other will take over automatically.

With Spring Modulith, we also don’t send double emails if nothing goes wrong. But the @Scheduled annotation is done by both instances, resulting in double emails if you have 2 instances running. We can solve this by using ShedLock for example to only have a single instance doing the retries of the events.

Conclusion

Both Spring Integration and Spring Modulith can be used to build a Transactional Outbox to get more certainty that your main database action and any notification to an external system is in sync and does not get lost. However, the Spring Integration solution does seem to have some advantages over the Spring Modulith one.

See transactional-outbox-spring-integration and transactional-outbox-spring-modulith on GitHub for the full sources of these examples.

If you have any questions or remarks, feel free to post a comment at GitHub discussions.