此篇文章主要用于讲解使用Spring进行编码时,核心与非核心代码解耦合常用的观察者模式@EventListener的使用方法,以及不常用的@TransactionalEventListener的使用场景和注意事项。

常规情况

在我们常规的业务开发中,有很多场景都会使用到观察者模式来解耦合,将非核心流程剥离到主流程之外,提高代码的可读性,举例:用户注册流程,当用户注册完成之后,需要给用户发送一个注册成功的短信通知,核心流程是存储用户信息,次要流程是发送短信通知。

由于本文主要是用来介绍SpringEvent的使用,所以另外的方案就不做介绍了,接下来让我们来初始化一个项目来模拟这部分业务代码的实现:

对于这样一个流程,我们常规的设计方案有以下几种:

  1. 设计一个有界队列线程池,将发送短信流程提交给异步线程执行。
  2. 使用SpringEvent,发布用户注册成功事件,由监听器执行(也可以选择执行线程)。
  3. 使用MQ,由消费者负责执行。(最重,最稳妥的方案)

Entity

public class User {
    @TableId(type = IdType.ASSIGN_ID)
    private String id;
    private String username;
    private String address;
}

Service

public interface UserService extends IService<User> {
    String addUser(String username, String address);
}

public class UserServiceImpl extends ServiceImpl<UserMapper, User> implements UserService {

    private final ApplicationContext applicationContext;
    @Override
    @Transactional(rollbackFor = Exception.class)
    public String addUser(String username, String address) {
        User user = User.builder().username(username).address(address).build();
        log.info("添加用户开始");
        super.save(user);
        log.info("添加用户成功, 发送事件开始");
        applicationContext.publishEvent(new AddUserEvent(this, user));
        log.info("添加用户成功, 发送事件结束");
        return user.getId();
    }
}

Listener

public class UserListener {
    @EventListener(AddUserEvent.class)
    public void addHandler(AddUserEvent event) {
        User user = event.getUser();
        log.info("接收到添加用户事件, 发送短信完成 Normal, user={}", user.toString());
    }
}

Controller

@RequestMapping("user")
public class UserController {
    private final UserService userService;
    @PostMapping("/add")
    public String add(String username, String address) {
        return userService.addUser(username, address);
    }
}

DB

ORM框架我们使用Mybatis-Plus,数据库我们使用db2作为临时数据库,目前没有数据。

2024-08-04 16:31:45.590  INFO 55648 --- [nio-8080-exec-4] t.i.l.s.d.service.impl.UserServiceImpl   : 添加用户开始
Creating a new SqlSession
Registering transaction synchronization for SqlSession [org.apache.ibatis.session.defaults.DefaultSqlSession@6e3d4aa9]
JDBC Connection [HikariProxyConnection@1603671607 wrapping conn0: url=jdbc:h2:~/mydb user=ROOT] will be managed by Spring
==>  Preparing: INSERT INTO user ( id, username, address ) VALUES ( ?, ?, ? )
==> Parameters: 1820014751548219393(String), yzt(String), shenzhen(String)
<==    Updates: 1
Releasing transactional SqlSession [org.apache.ibatis.session.defaults.DefaultSqlSession@6e3d4aa9]
2024-08-04 16:31:45.705  INFO 55648 --- [nio-8080-exec-4] t.i.l.s.d.service.impl.UserServiceImpl   : 添加用户成功, 发送事件开始
2024-08-04 16:31:45.708  INFO 55648 --- [nio-8080-exec-4] t.i.l.s.demos.listener.UserListener      : 接收到添加用户事件, 发送短信完成 Normal, user=User(id=1820014751548219393, username=yzt, address=shenzhen)
2024-08-04 16:31:45.709  INFO 55648 --- [nio-8080-exec-4] t.i.l.s.d.service.impl.UserServiceImpl   : 添加用户成功, 发送事件结束
Transaction synchronization committing SqlSession [org.apache.ibatis.session.defaults.DefaultSqlSession@6e3d4aa9]
Transaction synchronization deregistering SqlSession [org.apache.ibatis.session.defaults.DefaultSqlSession@6e3d4aa9]
Transaction synchronization closing SqlSession [org.apache.ibatis.session.defaults.DefaultSqlSession@6e3d4aa9]

再次查看数据

select * from user;

ID USERNAME ADDRESS
1820014751548219393 yzt shenzhen

(1 row, 2 ms)

小结

为了更好的呈现执行过程,上面的代码没有添加异步,通过上面的代码日志,可以看到整个过程大概如下图所示

一般我们的业务编码都是使用@EventListener来实现,在正常情况下不会有什么问题。但是在某些特殊场景下,可能会出现预期之外的结果。

场景2,事务异常

上面我们演示的更多是业务正常的情况,但通常情况下,添加用户不仅仅只有这么简单的业务,在存储之后,可能还有其他的业务,比如为邀请人结算奖励之类(举例),那么流程长了之后总会有异常的可能,比如我们用address=shanghai来模拟业务异常:

public class UserServiceImpl extends ServiceImpl<UserMapper, User> implements UserService {
    private final ApplicationContext applicationContext;
    @Override
    @Transactional(rollbackFor = Exception.class)
    public String addUser(String username, String address) {
        User user = User.builder().username(username).address(address).build();
        log.info("添加用户开始");
        super.save(user);
        log.info("添加用户成功, 发送事件开始");
        applicationContext.publishEvent(new AddUserEvent(this, user));
        log.info("添加用户成功, 发送事件结束");
        
        // 模拟异常场景
        if (address.equals("shanghai")) {
            throw new RuntimeException("rollback");
        }
        return user.getId();
    }
}

再次查看日志

2024-08-04 16:47:30.838  INFO 55648 --- [nio-8080-exec-7] t.i.l.s.d.service.impl.UserServiceImpl   : 添加用户开始
Creating a new SqlSession
Registering transaction synchronization for SqlSession [org.apache.ibatis.session.defaults.DefaultSqlSession@7900e41a]
JDBC Connection [HikariProxyConnection@1080868530 wrapping conn0: url=jdbc:h2:~/mydb user=ROOT] will be managed by Spring
==>  Preparing: INSERT INTO user ( id, username, address ) VALUES ( ?, ?, ? )
==> Parameters: 1820018715891113985(String), yzt1(String), shanghai(String)
<==    Updates: 1
Releasing transactional SqlSession [org.apache.ibatis.session.defaults.DefaultSqlSession@7900e41a]
2024-08-04 16:47:30.839  INFO 55648 --- [nio-8080-exec-7] t.i.l.s.d.service.impl.UserServiceImpl   : 添加用户成功, 发送事件开始
2024-08-04 16:47:30.840  INFO 55648 --- [nio-8080-exec-7] t.i.l.s.demos.listener.UserListener      : 接收到添加用户事件, 发送短信完成 Normal, user=User(id=1820018715891113985, username=yzt1, address=shanghai)
2024-08-04 16:47:30.840  INFO 55648 --- [nio-8080-exec-7] t.i.l.s.d.service.impl.UserServiceImpl   : 添加用户成功, 发送事件结束
Transaction synchronization deregistering SqlSession [org.apache.ibatis.session.defaults.DefaultSqlSession@7900e41a]
Transaction synchronization closing SqlSession [org.apache.ibatis.session.defaults.DefaultSqlSession@7900e41a]
2024-08-04 16:47:30.842 ERROR 55648 --- [nio-8080-exec-7] o.a.c.c.C.[.[.[/].[dispatcherServlet]    : Servlet.service() for servlet [dispatcherServlet] in context with path [] threw exception [Request processing failed; nested exception is java.lang.RuntimeException: rollback] with root cause

java.lang.RuntimeException: rollback

因为下游的异常,导致事务并没有成功提交,但是前面的事件监听器已经被执行了,此时数据库未正确写入用户,但是短信已经发送出去了,在业务上肯定是不可接受的。
当然也可以说,把注册用户成功事件放到代码的最后面,这当然也是一种方案,但始终无法做到最完美的一致性问题,因为用户最终事务提交还是有可能失败(超时、重复写入等)。
或者说,将注册成功事件放到事务注解的外部,在确保事务提交之后,再发送事件,就像之前将的Redis的分布式锁解锁一样 使用Redis实现分布式锁的坑#3-事务未提交锁就释放了,这种方案固然可以,但体现到业务代码上,就需要再另一个被Spring代理了的bean上来操作,比较麻烦。

其实Spring是为这种场景提供了解决方案的,那就是@TransactionalEventListener,通过注解的名称就可以看出来,他是为了解决事务问题来提供的注解,它的使用和@EventListener完全一致,只是多了一个参数phase,共有4个选择:

  • TransactionPhase.AFTER_COMMIT,事务提交之后
  • TransactionPhase.BEFORE_COMMIT,事务提交之前
  • TransactionPhase.AFTER_COMPLETION,事务完成之后
  • TransactionPhase.AFTER_ROLLBACK,事务回滚之后
    通过名称可以很直观的看到他的作用,接下来我们添加5个事件监听器,分别来看看他们具体的执行时机:
public class UserListener {

    @EventListener(AddUserEvent.class)
    public void addHandler(AddUserEvent event) {
        User user = event.getUser();
        log.info("接收到添加用户事件, 发送短信完成 Normal, user={}", user.toString());
    }

    @TransactionalEventListener(value = AddUserEvent.class, phase = TransactionPhase.AFTER_COMMIT)
    public void addHandlerAfterCommit(AddUserEvent event) {
        User user = event.getUser();
        log.info("接收到添加用户事件, 发送短信完成 AFTER_COMMIT, user={}", user.toString());
    }

    @TransactionalEventListener(value = AddUserEvent.class, phase = TransactionPhase.BEFORE_COMMIT)
    public void addHandlerBeforeCommit(AddUserEvent event) {
        User user = event.getUser();
        log.info("接收到添加用户事件, 发送短信完成 BEFORE_COMMIT, user={}", user.toString());
    }

    @TransactionalEventListener(value = AddUserEvent.class, phase = TransactionPhase.AFTER_COMPLETION)
    public void addHandlerAfterCompletion(AddUserEvent event) {
        User user = event.getUser();
        log.info("接收到添加用户事件, 发送短信完成 AFTER_COMPLETION, user={}", user.toString());
    }

    @TransactionalEventListener(value = AddUserEvent.class, phase = TransactionPhase.AFTER_ROLLBACK)
    public void addHandlerAfterRollback(AddUserEvent event) {
        User user = event.getUser();
        log.info("添加用户事务执行失败, AFTER_ROLLBACK, user={}", user.toString());
    }
}

执行一次成功的写入,看看日志。

2024-08-04 17:02:09.069  INFO 56859 --- [nio-8080-exec-4] t.i.l.s.d.service.impl.UserServiceImpl   : 添加用户开始
Creating a new SqlSession
Registering transaction synchronization for SqlSession [org.apache.ibatis.session.defaults.DefaultSqlSession@3e9cf44b]
JDBC Connection [HikariProxyConnection@879009595 wrapping conn0: url=jdbc:h2:~/mydb user=ROOT] will be managed by Spring
==>  Preparing: INSERT INTO user ( id, username, address ) VALUES ( ?, ?, ? )
==> Parameters: 1820022399454720001(String), yzt1(String), shenzhen(String)
<==    Updates: 1
Releasing transactional SqlSession [org.apache.ibatis.session.defaults.DefaultSqlSession@3e9cf44b]
2024-08-04 17:02:09.070  INFO 56859 --- [nio-8080-exec-4] t.i.l.s.d.service.impl.UserServiceImpl   : 添加用户成功, 发送事件开始
2024-08-04 17:02:09.070  INFO 56859 --- [nio-8080-exec-4] t.i.l.s.demos.listener.UserListener      : 接收到添加用户事件, 发送短信完成 Normal, user=User(id=1820022399454720001, username=yzt1, address=shenzhen)
2024-08-04 17:02:09.071  INFO 56859 --- [nio-8080-exec-4] t.i.l.s.d.service.impl.UserServiceImpl   : 添加用户成功, 发送事件结束
Transaction synchronization committing SqlSession [org.apache.ibatis.session.defaults.DefaultSqlSession@3e9cf44b]
2024-08-04 17:02:09.071  INFO 56859 --- [nio-8080-exec-4] t.i.l.s.demos.listener.UserListener      : 接收到添加用户事件, 发送短信完成 BEFORE_COMMIT, user=User(id=1820022399454720001, username=yzt1, address=shenzhen)
Transaction synchronization deregistering SqlSession [org.apache.ibatis.session.defaults.DefaultSqlSession@3e9cf44b]
Transaction synchronization closing SqlSession [org.apache.ibatis.session.defaults.DefaultSqlSession@3e9cf44b]
2024-08-04 17:02:09.072  INFO 56859 --- [nio-8080-exec-4] t.i.l.s.demos.listener.UserListener      : 接收到添加用户事件, 发送短信完成 AFTER_COMMIT, user=User(id=1820022399454720001, username=yzt1, address=shenzhen)
2024-08-04 17:02:09.072  INFO 56859 --- [nio-8080-exec-4] t.i.l.s.demos.listener.UserListener      : 接收到添加用户事件, 发送短信完成 AFTER_COMPLETION, user=User(id=1820022399454720001, username=yzt1, address=shenzhen)

再执行一次失败的写入,看看日志:

2024-08-04 17:09:30.720  INFO 56859 --- [nio-8080-exec-7] t.i.l.s.d.service.impl.UserServiceImpl   : 添加用户开始
Creating a new SqlSession
Registering transaction synchronization for SqlSession [org.apache.ibatis.session.defaults.DefaultSqlSession@4ef5b6da]
JDBC Connection [HikariProxyConnection@566058297 wrapping conn0: url=jdbc:h2:~/mydb user=ROOT] will be managed by Spring
==>  Preparing: INSERT INTO user ( id, username, address ) VALUES ( ?, ?, ? )
==> Parameters: 1820024251877470209(String), yzt1(String), shanghai(String)
<==    Updates: 1
Releasing transactional SqlSession [org.apache.ibatis.session.defaults.DefaultSqlSession@4ef5b6da]
2024-08-04 17:09:30.722  INFO 56859 --- [nio-8080-exec-7] t.i.l.s.d.service.impl.UserServiceImpl   : 添加用户成功, 发送事件开始
2024-08-04 17:09:30.722  INFO 56859 --- [nio-8080-exec-7] t.i.l.s.demos.listener.UserListener      : 接收到添加用户事件, 发送短信完成 Normal, user=User(id=1820024251877470209, username=yzt1, address=shanghai)
2024-08-04 17:09:30.722  INFO 56859 --- [nio-8080-exec-7] t.i.l.s.d.service.impl.UserServiceImpl   : 添加用户成功, 发送事件结束
Transaction synchronization deregistering SqlSession [org.apache.ibatis.session.defaults.DefaultSqlSession@4ef5b6da]
Transaction synchronization closing SqlSession [org.apache.ibatis.session.defaults.DefaultSqlSession@4ef5b6da]
2024-08-04 17:09:30.723  INFO 56859 --- [nio-8080-exec-7] t.i.l.s.demos.listener.UserListener      : 接收到添加用户事件, 发送短信完成 AFTER_COMPLETION, user=User(id=1820024251877470209, username=yzt1, address=shanghai)
2024-08-04 17:09:30.723  INFO 56859 --- [nio-8080-exec-7] t.i.l.s.demos.listener.UserListener      : 添加用户事务执行失败, AFTER_ROLLBACK, user=User(id=1820024251877470209, username=yzt1, address=shanghai)
2024-08-04 17:09:30.725 ERROR 56859 --- [nio-8080-exec-7] o.a.c.c.C.[.[.[/].[dispatcherServlet]    : Servlet.service() for servlet [dispatcherServlet] in context with path [] threw exception [Request processing failed; nested exception is java.lang.RuntimeException: rollback] with root cause

java.lang.RuntimeException: rollback

执行流程

通过上面的日志可以看到整个执行过程如下图所示:

由此可知,有了@TransactionalEventListener,我们不用确定事务的提交时机以及是否成功,只需要编写对应的监听器处理器,并指定执行时事务的时机即可在正确的时间点被调用,这一切都是Spring的AOP在帮我们处理。

TransactionalApplicationListenerMethodAdapter

public void onApplicationEvent(ApplicationEvent event) {
    if (TransactionSynchronizationManager.isSynchronizationActive() &&
            TransactionSynchronizationManager.isActualTransactionActive()) {
        TransactionSynchronizationManager.registerSynchronization(
                new TransactionalApplicationListenerSynchronization<>(event, this, this.callbacks));
    }
    else if (this.annotation.fallbackExecution()) {
        if (this.annotation.phase() == TransactionPhase.AFTER_ROLLBACK && logger.isWarnEnabled()) {
            logger.warn("Processing " + event + " as a fallback execution on AFTER_ROLLBACK phase");
        }
        processEvent(event);
    }
    else {
        // No transactional event execution at all
        if (logger.isDebugEnabled()) {
            logger.debug("No transaction is active - skipping " + event);
        }
    }
}

如果有事务在进行中,就将其监听器处理器先放到TransactionSynchronizationManager注册一个同步队列,在事务执行到对应的阶段,再回调每个监听了对应阶段的事务处理器。

AbstractPlatformTransactionManager#processCommit

protected final void triggerBeforeCommit(DefaultTransactionStatus status) {
    if (status.isNewSynchronization()) {
        TransactionSynchronizationUtils.triggerBeforeCommit(status.isReadOnly());
    }
}
protected final void triggerBeforeCompletion(DefaultTransactionStatus status) {
    if (status.isNewSynchronization()) {
        TransactionSynchronizationUtils.triggerBeforeCompletion();
    }
}
private void triggerAfterCommit(DefaultTransactionStatus status) {
    if (status.isNewSynchronization()) {
        TransactionSynchronizationUtils.triggerAfterCommit();
    }
}

小结

通过本篇文章,我们大致了解了@TransactionalEventListener注解的使用场景和注意事项,也了解了其大概的实现原理,其实SpringEvent的相关源码看起来非常容易,只要稍微看过Spring相关源码,并且对SpringAOP相关逻辑了解的,就可以很容易的看懂,因为他的调用过程没那么多弯弯绕绕,只要看着applicationContext.publishEvent方法一直往下盯,很快就能够看完整个的执行过程。

本篇博客对应的源码,希望能对你有所帮助:spring-boot-events