此篇文章主要用于讲解使用Spring进行编码时,核心与非核心代码解耦合常用的观察者模式@EventListener
的使用方法,以及不常用的@TransactionalEventListener
的使用场景和注意事项。
常规情况
在我们常规的业务开发中,有很多场景都会使用到观察者模式来解耦合,将非核心流程剥离到主流程之外,提高代码的可读性,举例:用户注册流程,当用户注册完成之后,需要给用户发送一个注册成功的短信通知,核心流程是存储用户信息,次要流程是发送短信通知。
由于本文主要是用来介绍SpringEvent的使用,所以另外的方案就不做介绍了,接下来让我们来初始化一个项目来模拟这部分业务代码的实现:
对于这样一个流程,我们常规的设计方案有以下几种:
- 设计一个有界队列线程池,将发送短信流程提交给异步线程执行。
- 使用SpringEvent,发布用户注册成功事件,由监听器执行(也可以选择执行线程)。
- 使用MQ,由消费者负责执行。(最重,最稳妥的方案)
Entity
public class User {
@TableId(type = IdType.ASSIGN_ID)
private String id;
private String username;
private String address;
}
Service
public interface UserService extends IService<User> {
String addUser(String username, String address);
}
public class UserServiceImpl extends ServiceImpl<UserMapper, User> implements UserService {
private final ApplicationContext applicationContext;
@Override
@Transactional(rollbackFor = Exception.class)
public String addUser(String username, String address) {
User user = User.builder().username(username).address(address).build();
log.info("添加用户开始");
super.save(user);
log.info("添加用户成功, 发送事件开始");
applicationContext.publishEvent(new AddUserEvent(this, user));
log.info("添加用户成功, 发送事件结束");
return user.getId();
}
}
Listener
public class UserListener {
@EventListener(AddUserEvent.class)
public void addHandler(AddUserEvent event) {
User user = event.getUser();
log.info("接收到添加用户事件, 发送短信完成 Normal, user={}", user.toString());
}
}
Controller
@RequestMapping("user")
public class UserController {
private final UserService userService;
@PostMapping("/add")
public String add(String username, String address) {
return userService.addUser(username, address);
}
}
DB
ORM框架我们使用Mybatis-Plus,数据库我们使用db2作为临时数据库,目前没有数据。
2024-08-04 16:31:45.590 INFO 55648 --- [nio-8080-exec-4] t.i.l.s.d.service.impl.UserServiceImpl : 添加用户开始
Creating a new SqlSession
Registering transaction synchronization for SqlSession [org.apache.ibatis.session.defaults.DefaultSqlSession@6e3d4aa9]
JDBC Connection [HikariProxyConnection@1603671607 wrapping conn0: url=jdbc:h2:~/mydb user=ROOT] will be managed by Spring
==> Preparing: INSERT INTO user ( id, username, address ) VALUES ( ?, ?, ? )
==> Parameters: 1820014751548219393(String), yzt(String), shenzhen(String)
<== Updates: 1
Releasing transactional SqlSession [org.apache.ibatis.session.defaults.DefaultSqlSession@6e3d4aa9]
2024-08-04 16:31:45.705 INFO 55648 --- [nio-8080-exec-4] t.i.l.s.d.service.impl.UserServiceImpl : 添加用户成功, 发送事件开始
2024-08-04 16:31:45.708 INFO 55648 --- [nio-8080-exec-4] t.i.l.s.demos.listener.UserListener : 接收到添加用户事件, 发送短信完成 Normal, user=User(id=1820014751548219393, username=yzt, address=shenzhen)
2024-08-04 16:31:45.709 INFO 55648 --- [nio-8080-exec-4] t.i.l.s.d.service.impl.UserServiceImpl : 添加用户成功, 发送事件结束
Transaction synchronization committing SqlSession [org.apache.ibatis.session.defaults.DefaultSqlSession@6e3d4aa9]
Transaction synchronization deregistering SqlSession [org.apache.ibatis.session.defaults.DefaultSqlSession@6e3d4aa9]
Transaction synchronization closing SqlSession [org.apache.ibatis.session.defaults.DefaultSqlSession@6e3d4aa9]
再次查看数据
select * from user;
ID | USERNAME | ADDRESS |
---|---|---|
1820014751548219393 | yzt | shenzhen |
(1 row, 2 ms)
小结
为了更好的呈现执行过程,上面的代码没有添加异步,通过上面的代码日志,可以看到整个过程大概如下图所示
一般我们的业务编码都是使用@EventListener
来实现,在正常情况下不会有什么问题。但是在某些特殊场景下,可能会出现预期之外的结果。
场景2,事务异常
上面我们演示的更多是业务正常的情况,但通常情况下,添加用户不仅仅只有这么简单的业务,在存储之后,可能还有其他的业务,比如为邀请人结算奖励之类(举例),那么流程长了之后总会有异常的可能,比如我们用address=shanghai
来模拟业务异常:
public class UserServiceImpl extends ServiceImpl<UserMapper, User> implements UserService {
private final ApplicationContext applicationContext;
@Override
@Transactional(rollbackFor = Exception.class)
public String addUser(String username, String address) {
User user = User.builder().username(username).address(address).build();
log.info("添加用户开始");
super.save(user);
log.info("添加用户成功, 发送事件开始");
applicationContext.publishEvent(new AddUserEvent(this, user));
log.info("添加用户成功, 发送事件结束");
// 模拟异常场景
if (address.equals("shanghai")) {
throw new RuntimeException("rollback");
}
return user.getId();
}
}
再次查看日志
2024-08-04 16:47:30.838 INFO 55648 --- [nio-8080-exec-7] t.i.l.s.d.service.impl.UserServiceImpl : 添加用户开始
Creating a new SqlSession
Registering transaction synchronization for SqlSession [org.apache.ibatis.session.defaults.DefaultSqlSession@7900e41a]
JDBC Connection [HikariProxyConnection@1080868530 wrapping conn0: url=jdbc:h2:~/mydb user=ROOT] will be managed by Spring
==> Preparing: INSERT INTO user ( id, username, address ) VALUES ( ?, ?, ? )
==> Parameters: 1820018715891113985(String), yzt1(String), shanghai(String)
<== Updates: 1
Releasing transactional SqlSession [org.apache.ibatis.session.defaults.DefaultSqlSession@7900e41a]
2024-08-04 16:47:30.839 INFO 55648 --- [nio-8080-exec-7] t.i.l.s.d.service.impl.UserServiceImpl : 添加用户成功, 发送事件开始
2024-08-04 16:47:30.840 INFO 55648 --- [nio-8080-exec-7] t.i.l.s.demos.listener.UserListener : 接收到添加用户事件, 发送短信完成 Normal, user=User(id=1820018715891113985, username=yzt1, address=shanghai)
2024-08-04 16:47:30.840 INFO 55648 --- [nio-8080-exec-7] t.i.l.s.d.service.impl.UserServiceImpl : 添加用户成功, 发送事件结束
Transaction synchronization deregistering SqlSession [org.apache.ibatis.session.defaults.DefaultSqlSession@7900e41a]
Transaction synchronization closing SqlSession [org.apache.ibatis.session.defaults.DefaultSqlSession@7900e41a]
2024-08-04 16:47:30.842 ERROR 55648 --- [nio-8080-exec-7] o.a.c.c.C.[.[.[/].[dispatcherServlet] : Servlet.service() for servlet [dispatcherServlet] in context with path [] threw exception [Request processing failed; nested exception is java.lang.RuntimeException: rollback] with root cause
java.lang.RuntimeException: rollback
因为下游的异常,导致事务并没有成功提交,但是前面的事件监听器已经被执行了,此时数据库未正确写入用户,但是短信已经发送出去了,在业务上肯定是不可接受的。
当然也可以说,把注册用户成功事件放到代码的最后面,这当然也是一种方案,但始终无法做到最完美的一致性问题,因为用户最终事务提交还是有可能失败(超时、重复写入等)。
或者说,将注册成功事件放到事务注解的外部,在确保事务提交之后,再发送事件,就像之前将的Redis的分布式锁解锁一样 使用Redis实现分布式锁的坑#3-事务未提交锁就释放了,这种方案固然可以,但体现到业务代码上,就需要再另一个被Spring代理了的bean上来操作,比较麻烦。
其实Spring是为这种场景提供了解决方案的,那就是@TransactionalEventListener
,通过注解的名称就可以看出来,他是为了解决事务问题来提供的注解,它的使用和@EventListener
完全一致,只是多了一个参数phase
,共有4个选择:
- TransactionPhase.AFTER_COMMIT,事务提交之后
- TransactionPhase.BEFORE_COMMIT,事务提交之前
- TransactionPhase.AFTER_COMPLETION,事务完成之后
- TransactionPhase.AFTER_ROLLBACK,事务回滚之后
通过名称可以很直观的看到他的作用,接下来我们添加5个事件监听器,分别来看看他们具体的执行时机:
public class UserListener {
@EventListener(AddUserEvent.class)
public void addHandler(AddUserEvent event) {
User user = event.getUser();
log.info("接收到添加用户事件, 发送短信完成 Normal, user={}", user.toString());
}
@TransactionalEventListener(value = AddUserEvent.class, phase = TransactionPhase.AFTER_COMMIT)
public void addHandlerAfterCommit(AddUserEvent event) {
User user = event.getUser();
log.info("接收到添加用户事件, 发送短信完成 AFTER_COMMIT, user={}", user.toString());
}
@TransactionalEventListener(value = AddUserEvent.class, phase = TransactionPhase.BEFORE_COMMIT)
public void addHandlerBeforeCommit(AddUserEvent event) {
User user = event.getUser();
log.info("接收到添加用户事件, 发送短信完成 BEFORE_COMMIT, user={}", user.toString());
}
@TransactionalEventListener(value = AddUserEvent.class, phase = TransactionPhase.AFTER_COMPLETION)
public void addHandlerAfterCompletion(AddUserEvent event) {
User user = event.getUser();
log.info("接收到添加用户事件, 发送短信完成 AFTER_COMPLETION, user={}", user.toString());
}
@TransactionalEventListener(value = AddUserEvent.class, phase = TransactionPhase.AFTER_ROLLBACK)
public void addHandlerAfterRollback(AddUserEvent event) {
User user = event.getUser();
log.info("添加用户事务执行失败, AFTER_ROLLBACK, user={}", user.toString());
}
}
执行一次成功的写入,看看日志。
2024-08-04 17:02:09.069 INFO 56859 --- [nio-8080-exec-4] t.i.l.s.d.service.impl.UserServiceImpl : 添加用户开始
Creating a new SqlSession
Registering transaction synchronization for SqlSession [org.apache.ibatis.session.defaults.DefaultSqlSession@3e9cf44b]
JDBC Connection [HikariProxyConnection@879009595 wrapping conn0: url=jdbc:h2:~/mydb user=ROOT] will be managed by Spring
==> Preparing: INSERT INTO user ( id, username, address ) VALUES ( ?, ?, ? )
==> Parameters: 1820022399454720001(String), yzt1(String), shenzhen(String)
<== Updates: 1
Releasing transactional SqlSession [org.apache.ibatis.session.defaults.DefaultSqlSession@3e9cf44b]
2024-08-04 17:02:09.070 INFO 56859 --- [nio-8080-exec-4] t.i.l.s.d.service.impl.UserServiceImpl : 添加用户成功, 发送事件开始
2024-08-04 17:02:09.070 INFO 56859 --- [nio-8080-exec-4] t.i.l.s.demos.listener.UserListener : 接收到添加用户事件, 发送短信完成 Normal, user=User(id=1820022399454720001, username=yzt1, address=shenzhen)
2024-08-04 17:02:09.071 INFO 56859 --- [nio-8080-exec-4] t.i.l.s.d.service.impl.UserServiceImpl : 添加用户成功, 发送事件结束
Transaction synchronization committing SqlSession [org.apache.ibatis.session.defaults.DefaultSqlSession@3e9cf44b]
2024-08-04 17:02:09.071 INFO 56859 --- [nio-8080-exec-4] t.i.l.s.demos.listener.UserListener : 接收到添加用户事件, 发送短信完成 BEFORE_COMMIT, user=User(id=1820022399454720001, username=yzt1, address=shenzhen)
Transaction synchronization deregistering SqlSession [org.apache.ibatis.session.defaults.DefaultSqlSession@3e9cf44b]
Transaction synchronization closing SqlSession [org.apache.ibatis.session.defaults.DefaultSqlSession@3e9cf44b]
2024-08-04 17:02:09.072 INFO 56859 --- [nio-8080-exec-4] t.i.l.s.demos.listener.UserListener : 接收到添加用户事件, 发送短信完成 AFTER_COMMIT, user=User(id=1820022399454720001, username=yzt1, address=shenzhen)
2024-08-04 17:02:09.072 INFO 56859 --- [nio-8080-exec-4] t.i.l.s.demos.listener.UserListener : 接收到添加用户事件, 发送短信完成 AFTER_COMPLETION, user=User(id=1820022399454720001, username=yzt1, address=shenzhen)
再执行一次失败的写入,看看日志:
2024-08-04 17:09:30.720 INFO 56859 --- [nio-8080-exec-7] t.i.l.s.d.service.impl.UserServiceImpl : 添加用户开始
Creating a new SqlSession
Registering transaction synchronization for SqlSession [org.apache.ibatis.session.defaults.DefaultSqlSession@4ef5b6da]
JDBC Connection [HikariProxyConnection@566058297 wrapping conn0: url=jdbc:h2:~/mydb user=ROOT] will be managed by Spring
==> Preparing: INSERT INTO user ( id, username, address ) VALUES ( ?, ?, ? )
==> Parameters: 1820024251877470209(String), yzt1(String), shanghai(String)
<== Updates: 1
Releasing transactional SqlSession [org.apache.ibatis.session.defaults.DefaultSqlSession@4ef5b6da]
2024-08-04 17:09:30.722 INFO 56859 --- [nio-8080-exec-7] t.i.l.s.d.service.impl.UserServiceImpl : 添加用户成功, 发送事件开始
2024-08-04 17:09:30.722 INFO 56859 --- [nio-8080-exec-7] t.i.l.s.demos.listener.UserListener : 接收到添加用户事件, 发送短信完成 Normal, user=User(id=1820024251877470209, username=yzt1, address=shanghai)
2024-08-04 17:09:30.722 INFO 56859 --- [nio-8080-exec-7] t.i.l.s.d.service.impl.UserServiceImpl : 添加用户成功, 发送事件结束
Transaction synchronization deregistering SqlSession [org.apache.ibatis.session.defaults.DefaultSqlSession@4ef5b6da]
Transaction synchronization closing SqlSession [org.apache.ibatis.session.defaults.DefaultSqlSession@4ef5b6da]
2024-08-04 17:09:30.723 INFO 56859 --- [nio-8080-exec-7] t.i.l.s.demos.listener.UserListener : 接收到添加用户事件, 发送短信完成 AFTER_COMPLETION, user=User(id=1820024251877470209, username=yzt1, address=shanghai)
2024-08-04 17:09:30.723 INFO 56859 --- [nio-8080-exec-7] t.i.l.s.demos.listener.UserListener : 添加用户事务执行失败, AFTER_ROLLBACK, user=User(id=1820024251877470209, username=yzt1, address=shanghai)
2024-08-04 17:09:30.725 ERROR 56859 --- [nio-8080-exec-7] o.a.c.c.C.[.[.[/].[dispatcherServlet] : Servlet.service() for servlet [dispatcherServlet] in context with path [] threw exception [Request processing failed; nested exception is java.lang.RuntimeException: rollback] with root cause
java.lang.RuntimeException: rollback
执行流程
通过上面的日志可以看到整个执行过程如下图所示:
由此可知,有了@TransactionalEventListener
,我们不用确定事务的提交时机以及是否成功,只需要编写对应的监听器处理器,并指定执行时事务的时机即可在正确的时间点被调用,这一切都是Spring的AOP在帮我们处理。
TransactionalApplicationListenerMethodAdapter
public void onApplicationEvent(ApplicationEvent event) {
if (TransactionSynchronizationManager.isSynchronizationActive() &&
TransactionSynchronizationManager.isActualTransactionActive()) {
TransactionSynchronizationManager.registerSynchronization(
new TransactionalApplicationListenerSynchronization<>(event, this, this.callbacks));
}
else if (this.annotation.fallbackExecution()) {
if (this.annotation.phase() == TransactionPhase.AFTER_ROLLBACK && logger.isWarnEnabled()) {
logger.warn("Processing " + event + " as a fallback execution on AFTER_ROLLBACK phase");
}
processEvent(event);
}
else {
// No transactional event execution at all
if (logger.isDebugEnabled()) {
logger.debug("No transaction is active - skipping " + event);
}
}
}
如果有事务在进行中,就将其监听器处理器先放到TransactionSynchronizationManager
注册一个同步队列,在事务执行到对应的阶段,再回调每个监听了对应阶段的事务处理器。
在AbstractPlatformTransactionManager#processCommit
中
protected final void triggerBeforeCommit(DefaultTransactionStatus status) {
if (status.isNewSynchronization()) {
TransactionSynchronizationUtils.triggerBeforeCommit(status.isReadOnly());
}
}
protected final void triggerBeforeCompletion(DefaultTransactionStatus status) {
if (status.isNewSynchronization()) {
TransactionSynchronizationUtils.triggerBeforeCompletion();
}
}
private void triggerAfterCommit(DefaultTransactionStatus status) {
if (status.isNewSynchronization()) {
TransactionSynchronizationUtils.triggerAfterCommit();
}
}
小结
通过本篇文章,我们大致了解了@TransactionalEventListener
注解的使用场景和注意事项,也了解了其大概的实现原理,其实SpringEvent的相关源码看起来非常容易,只要稍微看过Spring相关源码,并且对SpringAOP相关逻辑了解的,就可以很容易的看懂,因为他的调用过程没那么多弯弯绕绕,只要看着applicationContext.publishEvent
方法一直往下盯,很快就能够看完整个的执行过程。
本篇博客对应的源码,希望能对你有所帮助:spring-boot-events