在阅读VIVO的博客高性能无锁队列 Disruptor 核心原理分析及其在i主题业务中的应用的时候,了解到 Disruptor 高性能的核心原因:
空间预分配
避免伪共享
无锁
其中 伪共享 的概念之前没有了解过,故特地了解学习了下,主要涉及到一些基础的概念:
CPU的分级缓存机制
volatile的内存可见性
Java long类型/Long类型的字节大小
众所周知,CPU在读取内存中的数据时,并不是读取的直接内存,而是从L1/L2/L3缓存中读取数据,而读取缓存也并非按1字节的读取,而是按照缓存行(通常64字节)一块一块的读取,以此来提高读取效率。
周所也周知,现代计算机都是多核CPU在运行,线程都会被分配CPU来执行,所以线程内的变量数据是需要读取到CPU Cache中才能够对CPU可见的,为了解决内存在CPU1中修改后CPU2不可见(脏读)的问题,在Java中有设计变量修饰符volatile来修饰变量,以此来实现数据在多个CPU之间不会产生脏读问题,内存在任何一个CPU上发生修改后,在其他CPU上均不可用而丢弃重新从内存中获取。
当然,也正是因为以上设计,带来了一些预期外的结果(不是问题或bug),如下代码所示:
对于下面的代码:
public class FalseSharing {
public static void main(String[] args) throws InterruptedException {
int num = 100000000;
Pointer pointer1 = new Pointer();
long start = System.currentTimeMillis();
Thread t1 = new Thread(() -> {
for(int i = 0; i < num; i++){
pointer1.x++;
}
});
Thread t2 = new Thread(() -> {
for(int i = 0; i < num; i++){
pointer1.y++;
}
});
t1.start();
t2.start();
t1.join();
t2.join();
System.out.println("pointer1=" + (System.currentTimeMillis() - start));
}
}
class Pointer {
volatile long x;
volatile long y;
}
上述代码在我的电脑上执行,需要3.5秒+,多次执行亦然如此,而我们只需稍作调整,改成如下代码,则只需要0.5秒左右即可执行完毕,这是为何?
public class FalseSharing {
public static void main(String[] args) throws InterruptedException {
int num = 100000000;
Pointer2 pointer2 = new Pointer2();
long start2 = System.currentTimeMillis();
Thread t3 = new Thread(() -> {
for(int i = 0; i < num; i++){
pointer2.x++;
}
});
Thread t4 = new Thread(() -> {
for(int i = 0; i < num; i++){
pointer2.y++;
}
});
t3.start();
t4.start();
t3.join();
t4.join();
System.out.println("pointer2=" + (System.currentTimeMillis() - start2));
}
}
class Pointer2 {
volatile long x;
// long p1, p2, p3, p4, p5, p6, p7;
Long z1, z2; long z3;
volatile long y;
}
以上现象对应的机制,正是因为缓存行的设计。
在上面的代码中,包含了线程t1/t2(t3和t4等同)两组线程,分别对x和y进行累加操作,看似是两个线程(CPU在分别对两块内存中的变量执行累加),但是因为CPU是以缓存行的方式读取内存,Pointer1中的x和y在内存中时相邻的两块内存,Java的基本数据类型long类型,占用8字节,所以x和y(加起来16字节)被放到同一个缓存行了,当CPU1对x做了修改后,CPU2读取到y时发现,对应缓存行已经失效了,所以不得不重新从内存中重新读取数据,从而导致了效率降低。
而我们下面给出的代码,Pointer2的x和y变量间,被塞了两组变量:
long p1, p2, p3, p4, p5, p6, p7; --long类型,占用8个字节, 8*7=56
Long z1, z2; long z3; --包装Long类型,根据计算机不同有所不同,在我电脑上占用24字节, 24+24+8=56
两组变量任何一组都可以解决伪共享的问题,之所以塞两组变量,都是为了验证缓存行的存在和解决方案。
方案一直接使用7个基本数据类型,占用了56个字节,加上x变量自身,刚好占用一个完整的缓存行64字节,y只能在另一个缓存行了。
方案二使用了2组包装数据类型,1组基本数据类型,加起来依旧占用的是56个字节,依旧能将一个缓存行占满,解决缓存行失效问题。
通过对这块的学习,对基础知识又做了一个巩固,造火箭的知识又增加了。🤡
博文参考:
美团:高性能队列——Disruptor
VIVO:高性能无锁队列 Disruptor 核心原理分析及其在i主题业务中的应用
实操代码:FalseSharing
Read More ~
Spring Events使用和问题分析
此篇文章主要用于讲解使用Spring进行编码时,核心与非核心代码解耦合常用的观察者模式@EventListener的使用方法,以及不常用的@TransactionalEventListener的使用场景和注意事项。
常规情况
在我们常规的业务开发中,有很多场景都会使用到观察者模式来解耦合,将非核心流程剥离到主流程之外,提高代码的可读性,举例:用户注册流程,当用户注册完成之后,需要给用户发送一个注册成功的短信通知,核心流程是存储用户信息,次要流程是发送短信通知。
由于本文主要是用来介绍SpringEvent的使用,所以另外的方案就不做介绍了,接下来让我们来初始化一个项目来模拟这部分业务代码的实现:
对于这样一个流程,我们常规的设计方案有以下几种:
设计一个有界队列线程池,将发送短信流程提交给异步线程执行。
使用SpringEvent,发布用户注册成功事件,由监听器执行(也可以选择执行线程)。
使用MQ,由消费者负责执行。(最重,最稳妥的方案)
Entity
public class User {
@TableId(type = IdType.ASSIGN_ID)
private String id;
private String username;
private String address;
}
Service
public interface UserService extends IService<User> {
String addUser(String username, String address);
}
public class UserServiceImpl extends ServiceImpl<UserMapper, User> implements UserService {
private final ApplicationContext applicationContext;
@Override
@Transactional(rollbackFor = Exception.class)
public String addUser(String username, String address) {
User user = User.builder().username(username).address(address).build();
log.info("添加用户开始");
super.save(user);
log.info("添加用户成功, 发送事件开始");
applicationContext.publishEvent(new AddUserEvent(this, user));
log.info("添加用户成功, 发送事件结束");
return user.getId();
}
}
Listener
public class UserListener {
@EventListener(AddUserEvent.class)
public void addHandler(AddUserEvent event) {
User user = event.getUser();
log.info("接收到添加用户事件, 发送短信完成 Normal, user={}", user.toString());
}
}
Controller
@RequestMapping("user")
public class UserController {
private final UserService userService;
@PostMapping("/add")
public String add(String username, String address) {
return userService.addUser(username, address);
}
}
DB
ORM框架我们使用Mybatis-Plus,数据库我们使用db2作为临时数据库,目前没有数据。
2024-08-04 16:31:45.590 INFO 55648 --- [nio-8080-exec-4] t.i.l.s.d.service.impl.UserServiceImpl : 添加用户开始
Creating a new SqlSession
Registering transaction synchronization for SqlSession [org.apache.ibatis.session.defaults.DefaultSqlSession@6e3d4aa9]
JDBC Connection [HikariProxyConnection@1603671607 wrapping conn0: url=jdbc:h2:~/mydb user=ROOT] will be managed by Spring
==> Preparing: INSERT INTO user ( id, username, address ) VALUES ( ?, ?, ? )
==> Parameters: 1820014751548219393(String), yzt(String), shenzhen(String)
<== Updates: 1
Releasing transactional SqlSession [org.apache.ibatis.session.defaults.DefaultSqlSession@6e3d4aa9]
2024-08-04 16:31:45.705 INFO 55648 --- [nio-8080-exec-4] t.i.l.s.d.service.impl.UserServiceImpl : 添加用户成功, 发送事件开始
2024-08-04 16:31:45.708 INFO 55648 --- [nio-8080-exec-4] t.i.l.s.demos.listener.UserListener : 接收到添加用户事件, 发送短信完成 Normal, user=User(id=1820014751548219393, username=yzt, address=shenzhen)
2024-08-04 16:31:45.709 INFO 55648 --- [nio-8080-exec-4] t.i.l.s.d.service.impl.UserServiceImpl : 添加用户成功, 发送事件结束
Transaction synchronization committing SqlSession [org.apache.ibatis.session.defaults.DefaultSqlSession@6e3d4aa9]
Transaction synchronization deregistering SqlSession [org.apache.ibatis.session.defaults.DefaultSqlSession@6e3d4aa9]
Transaction synchronization closing SqlSession [org.apache.ibatis.session.defaults.DefaultSqlSession@6e3d4aa9]
再次查看数据
select * from user;
ID
USERNAME
ADDRESS
1820014751548219393
yzt
shenzhen
(1 row, 2 ms)
小结
为了更好的呈现执行过程,上面的代码没有添加异步,通过上面的代码日志,可以看到整个过程大概如下图所示
一般我们的业务编码都是使用@EventListener来实现,在正常情况下不会有什么问题。但是在某些特殊场景下,可能会出现预期之外的结果。
场景2,事务异常
上面我们演示的更多是业务正常的情况,但通常情况下,添加用户不仅仅只有这么简单的业务,在存储之后,可能还有其他的业务,比如为邀请人结算奖励之类(举例),那么流程长了之后总会有异常的可能,比如我们用address=shanghai来模拟业务异常:
public class UserServiceImpl extends ServiceImpl<UserMapper, User> implements UserService {
private final ApplicationContext applicationContext;
@Override
@Transactional(rollbackFor = Exception.class)
public String addUser(String username, String address) {
User user = User.builder().username(username).address(address).build();
log.info("添加用户开始");
super.save(user);
log.info("添加用户成功, 发送事件开始");
applicationContext.publishEvent(new AddUserEvent(this, user));
log.info("添加用户成功, 发送事件结束");
// 模拟异常场景
if (address.equals("shanghai")) {
throw new RuntimeException("rollback");
}
return user.getId();
}
}
再次查看日志
2024-08-04 16:47:30.838 INFO 55648 --- [nio-8080-exec-7] t.i.l.s.d.service.impl.UserServiceImpl : 添加用户开始
Creating a new SqlSession
Registering transaction synchronization for SqlSession [org.apache.ibatis.session.defaults.DefaultSqlSession@7900e41a]
JDBC Connection [HikariProxyConnection@1080868530 wrapping conn0: url=jdbc:h2:~/mydb user=ROOT] will be managed by Spring
==> Preparing: INSERT INTO user ( id, username, address ) VALUES ( ?, ?, ? )
==> Parameters: 1820018715891113985(String), yzt1(String), shanghai(String)
<== Updates: 1
Releasing transactional SqlSession [org.apache.ibatis.session.defaults.DefaultSqlSession@7900e41a]
2024-08-04 16:47:30.839 INFO 55648 --- [nio-8080-exec-7] t.i.l.s.d.service.impl.UserServiceImpl : 添加用户成功, 发送事件开始
2024-08-04 16:47:30.840 INFO 55648 --- [nio-8080-exec-7] t.i.l.s.demos.listener.UserListener : 接收到添加用户事件, 发送短信完成 Normal, user=User(id=1820018715891113985, username=yzt1, address=shanghai)
2024-08-04 16:47:30.840 INFO 55648 --- [nio-8080-exec-7] t.i.l.s.d.service.impl.UserServiceImpl : 添加用户成功, 发送事件结束
Transaction synchronization deregistering SqlSession [org.apache.ibatis.session.defaults.DefaultSqlSession@7900e41a]
Transaction synchronization closing SqlSession [org.apache.ibatis.session.defaults.DefaultSqlSession@7900e41a]
2024-08-04 16:47:30.842 ERROR 55648 --- [nio-8080-exec-7] o.a.c.c.C.[.[.[/].[dispatcherServlet] : Servlet.service() for servlet [dispatcherServlet] in context with path [] threw exception [Request processing failed; nested exception is java.lang.RuntimeException: rollback] with root cause
java.lang.RuntimeException: rollback
因为下游的异常,导致事务并没有成功提交,但是前面的事件监听器已经被执行了,此时数据库未正确写入用户,但是短信已经发送出去了,在业务上肯定是不可接受的。
当然也可以说,把注册用户成功事件放到代码的最后面,这当然也是一种方案,但始终无法做到最完美的一致性问题,因为用户最终事务提交还是有可能失败(超时、重复写入等)。
或者说,将注册成功事件放到事务注解的外部,在确保事务提交之后,再发送事件,就像之前将的Redis的分布式锁解锁一样 使用Redis实现分布式锁的坑#3-事务未提交锁就释放了,这种方案固然可以,但体现到业务代码上,就需要再另一个被Spring代理了的bean上来操作,比较麻烦。
其实Spring是为这种场景提供了解决方案的,那就是@TransactionalEventListener,通过注解的名称就可以看出来,他是为了解决事务问题来提供的注解,它的使用和@EventListener完全一致,只是多了一个参数phase,共有4个选择:
TransactionPhase.AFTER_COMMIT,事务提交之后
TransactionPhase.BEFORE_COMMIT,事务提交之前
TransactionPhase.AFTER_COMPLETION,事务完成之后
TransactionPhase.AFTER_ROLLBACK,事务回滚之后
通过名称可以很直观的看到他的作用,接下来我们添加5个事件监听器,分别来看看他们具体的执行时机:
public class UserListener {
@EventListener(AddUserEvent.class)
public void addHandler(AddUserEvent event) {
User user = event.getUser();
log.info("接收到添加用户事件, 发送短信完成 Normal, user={}", user.toString());
}
@TransactionalEventListener(value = AddUserEvent.class, phase = TransactionPhase.AFTER_COMMIT)
public void addHandlerAfterCommit(AddUserEvent event) {
User user = event.getUser();
log.info("接收到添加用户事件, 发送短信完成 AFTER_COMMIT, user={}", user.toString());
}
@TransactionalEventListener(value = AddUserEvent.class, phase = TransactionPhase.BEFORE_COMMIT)
public void addHandlerBeforeCommit(AddUserEvent event) {
User user = event.getUser();
log.info("接收到添加用户事件, 发送短信完成 BEFORE_COMMIT, user={}", user.toString());
}
@TransactionalEventListener(value = AddUserEvent.class, phase = TransactionPhase.AFTER_COMPLETION)
public void addHandlerAfterCompletion(AddUserEvent event) {
User user = event.getUser();
log.info("接收到添加用户事件, 发送短信完成 AFTER_COMPLETION, user={}", user.toString());
}
@TransactionalEventListener(value = AddUserEvent.class, phase = TransactionPhase.AFTER_ROLLBACK)
public void addHandlerAfterRollback(AddUserEvent event) {
User user = event.getUser();
log.info("添加用户事务执行失败, AFTER_ROLLBACK, user={}", user.toString());
}
}
执行一次成功的写入,看看日志。
2024-08-04 17:02:09.069 INFO 56859 --- [nio-8080-exec-4] t.i.l.s.d.service.impl.UserServiceImpl : 添加用户开始
Creating a new SqlSession
Registering transaction synchronization for SqlSession [org.apache.ibatis.session.defaults.DefaultSqlSession@3e9cf44b]
JDBC Connection [HikariProxyConnection@879009595 wrapping conn0: url=jdbc:h2:~/mydb user=ROOT] will be managed by Spring
==> Preparing: INSERT INTO user ( id, username, address ) VALUES ( ?, ?, ? )
==> Parameters: 1820022399454720001(String), yzt1(String), shenzhen(String)
<== Updates: 1
Releasing transactional SqlSession [org.apache.ibatis.session.defaults.DefaultSqlSession@3e9cf44b]
2024-08-04 17:02:09.070 INFO 56859 --- [nio-8080-exec-4] t.i.l.s.d.service.impl.UserServiceImpl : 添加用户成功, 发送事件开始
2024-08-04 17:02:09.070 INFO 56859 --- [nio-8080-exec-4] t.i.l.s.demos.listener.UserListener : 接收到添加用户事件, 发送短信完成 Normal, user=User(id=1820022399454720001, username=yzt1, address=shenzhen)
2024-08-04 17:02:09.071 INFO 56859 --- [nio-8080-exec-4] t.i.l.s.d.service.impl.UserServiceImpl : 添加用户成功, 发送事件结束
Transaction synchronization committing SqlSession [org.apache.ibatis.session.defaults.DefaultSqlSession@3e9cf44b]
2024-08-04 17:02:09.071 INFO 56859 --- [nio-8080-exec-4] t.i.l.s.demos.listener.UserListener : 接收到添加用户事件, 发送短信完成 BEFORE_COMMIT, user=User(id=1820022399454720001, username=yzt1, address=shenzhen)
Transaction synchronization deregistering SqlSession [org.apache.ibatis.session.defaults.DefaultSqlSession@3e9cf44b]
Transaction synchronization closing SqlSession [org.apache.ibatis.session.defaults.DefaultSqlSession@3e9cf44b]
2024-08-04 17:02:09.072 INFO 56859 --- [nio-8080-exec-4] t.i.l.s.demos.listener.UserListener : 接收到添加用户事件, 发送短信完成 AFTER_COMMIT, user=User(id=1820022399454720001, username=yzt1, address=shenzhen)
2024-08-04 17:02:09.072 INFO 56859 --- [nio-8080-exec-4] t.i.l.s.demos.listener.UserListener : 接收到添加用户事件, 发送短信完成 AFTER_COMPLETION, user=User(id=1820022399454720001, username=yzt1, address=shenzhen)
再执行一次失败的写入,看看日志:
2024-08-04 17:09:30.720 INFO 56859 --- [nio-8080-exec-7] t.i.l.s.d.service.impl.UserServiceImpl : 添加用户开始
Creating a new SqlSession
Registering transaction synchronization for SqlSession [org.apache.ibatis.session.defaults.DefaultSqlSession@4ef5b6da]
JDBC Connection [HikariProxyConnection@566058297 wrapping conn0: url=jdbc:h2:~/mydb user=ROOT] will be managed by Spring
==> Preparing: INSERT INTO user ( id, username, address ) VALUES ( ?, ?, ? )
==> Parameters: 1820024251877470209(String), yzt1(String), shanghai(String)
<== Updates: 1
Releasing transactional SqlSession [org.apache.ibatis.session.defaults.DefaultSqlSession@4ef5b6da]
2024-08-04 17:09:30.722 INFO 56859 --- [nio-8080-exec-7] t.i.l.s.d.service.impl.UserServiceImpl : 添加用户成功, 发送事件开始
2024-08-04 17:09:30.722 INFO 56859 --- [nio-8080-exec-7] t.i.l.s.demos.listener.UserListener : 接收到添加用户事件, 发送短信完成 Normal, user=User(id=1820024251877470209, username=yzt1, address=shanghai)
2024-08-04 17:09:30.722 INFO 56859 --- [nio-8080-exec-7] t.i.l.s.d.service.impl.UserServiceImpl : 添加用户成功, 发送事件结束
Transaction synchronization deregistering SqlSession [org.apache.ibatis.session.defaults.DefaultSqlSession@4ef5b6da]
Transaction synchronization closing SqlSession [org.apache.ibatis.session.defaults.DefaultSqlSession@4ef5b6da]
2024-08-04 17:09:30.723 INFO 56859 --- [nio-8080-exec-7] t.i.l.s.demos.listener.UserListener : 接收到添加用户事件, 发送短信完成 AFTER_COMPLETION, user=User(id=1820024251877470209, username=yzt1, address=shanghai)
2024-08-04 17:09:30.723 INFO 56859 --- [nio-8080-exec-7] t.i.l.s.demos.listener.UserListener : 添加用户事务执行失败, AFTER_ROLLBACK, user=User(id=1820024251877470209, username=yzt1, address=shanghai)
2024-08-04 17:09:30.725 ERROR 56859 --- [nio-8080-exec-7] o.a.c.c.C.[.[.[/].[dispatcherServlet] : Servlet.service() for servlet [dispatcherServlet] in context with path [] threw exception [Request processing failed; nested exception is java.lang.RuntimeException: rollback] with root cause
java.lang.RuntimeException: rollback
执行流程
通过上面的日志可以看到整个执行过程如下图所示:
由此可知,有了@TransactionalEventListener,我们不用确定事务的提交时机以及是否成功,只需要编写对应的监听器处理器,并指定执行时事务的时机即可在正确的时间点被调用,这一切都是Spring的AOP在帮我们处理。
TransactionalApplicationListenerMethodAdapter
public void onApplicationEvent(ApplicationEvent event) {
if (TransactionSynchronizationManager.isSynchronizationActive() &&
TransactionSynchronizationManager.isActualTransactionActive()) {
TransactionSynchronizationManager.registerSynchronization(
new TransactionalApplicationListenerSynchronization<>(event, this, this.callbacks));
}
else if (this.annotation.fallbackExecution()) {
if (this.annotation.phase() == TransactionPhase.AFTER_ROLLBACK && logger.isWarnEnabled()) {
logger.warn("Processing " + event + " as a fallback execution on AFTER_ROLLBACK phase");
}
processEvent(event);
}
else {
// No transactional event execution at all
if (logger.isDebugEnabled()) {
logger.debug("No transaction is active - skipping " + event);
}
}
}
如果有事务在进行中,就将其监听器处理器先放到TransactionSynchronizationManager注册一个同步队列,在事务执行到对应的阶段,再回调每个监听了对应阶段的事务处理器。
在AbstractPlatformTransactionManager#processCommit中
protected final void triggerBeforeCommit(DefaultTransactionStatus status) {
if (status.isNewSynchronization()) {
TransactionSynchronizationUtils.triggerBeforeCommit(status.isReadOnly());
}
}
protected final void triggerBeforeCompletion(DefaultTransactionStatus status) {
if (status.isNewSynchronization()) {
TransactionSynchronizationUtils.triggerBeforeCompletion();
}
}
private void triggerAfterCommit(DefaultTransactionStatus status) {
if (status.isNewSynchronization()) {
TransactionSynchronizationUtils.triggerAfterCommit();
}
}
小结
通过本篇文章,我们大致了解了@TransactionalEventListener注解的使用场景和注意事项,也了解了其大概的实现原理,其实SpringEvent的相关源码看起来非常容易,只要稍微看过Spring相关源码,并且对SpringAOP相关逻辑了解的,就可以很容易的看懂,因为他的调用过程没那么多弯弯绕绕,只要看着applicationContext.publishEvent方法一直往下盯,很快就能够看完整个的执行过程。
本篇博客对应的源码,希望能对你有所帮助:spring-boot-events
Read More ~
当Spring多层嵌套事务遇到Mybatis-Plus的saveBatch
前言
事情的起因还要从 Transaction rolled back because it has been marked as rollback-only] with root cause 说起,公司的一个业务,Kafka Consumer消费了来自上游的消息处理并写入数据,整个逻辑很复杂,由最外层的方法注解 @Transactional(rollbackFor = Exception.class) 来保证整个过程数据的一致性,但其中有一个业务不需要事务保证,所以前人写了一个 try catch 来防止子业务流异常阻断全局事务的提交。简化后大概类似于这样:
@Service
public class ServiceA {
@Resource
private ServiceB serviceB;
@Resource
private CompanyService companyService;
/**
* 模拟消费外部的消息, 调用者A
*/
@Transactional(rollbackFor = Exception.class)
public void test(String username) {
// 1. 首先保存自己的数据, 模拟业务操作
Company company = new Company();
company.setName("家里蹲");
company.setAddress("localhost");
companyService.save(company);
// 2. 然后调用Spring的代理方法B
serviceB.doBusiness(username);
}
}
@Service
public class ServiceB {
@Resource
private UserService userService;
@Transactional(rollbackFor = Exception.class)
public void doBusiness(String username) {
// 3. 这里还有一系列事务操作, 但和这次问题无关, 就不放这里干扰了
try {
User user = new User();
user.setUsername(username);
user.setAddress("shenzhen");
// 4. 罪魁祸首的代码, 此处前人为了不干扰主逻辑, catch了异常
// 为了达到"以为的"异常不干扰外部事务
userService.saveBatch(Collections.singletonList(user));
} catch (DuplicateKeyException e) {
// 5. 此处有日志 log.error("xxx业务异常", e)
e.printStackTrace();
}
}
}
上面的代码看起来平平无奇,按照“常规”理解,代码块4位置的流程异常,不会干扰代码块1、3处的事务提交,达到编写者的目的。
但如果是这么简单,我也不会有这次问题的排查过程,也不会有这篇笔记了...
现象
我们从日志监控注意到,代码块5有频繁的异常,并且代码块1、3的数据最终并未写入成功,通过MySQL追踪事务发现,1、3处的语句执行后,均被rollback了。
这就引出了前言中的问题分析来了,在没有确认问题出现在代码块4时,还在找哪些地方有异常可能造成回滚,但通过跟踪程序日志发现,1、3处被回滚的数据,基本都发生在代码块5的日志打印出来之后,并且程序中也发现了下面的异常:
org.springframework.transaction.UnexpectedRollbackException: Transaction rolled back because it has been marked as rollback-only
.......
Transaction rolled back because it has been marked as rollback-only,说明事务被标记为只能回滚,并且根据行号(上述堆栈未标注业务行号)推断出,是发生代码块2处,调用doBusiness方法发生了回滚。
分析
我们都知道,Spring本身没有事务,只有数据库有事务,Spring提供@Transactional以及事务传播机制,只是利用数据库给我们提供的 begin/commit/rollback 来结合 Spring AOP 实现的统一事务管理,通过 PlatformTransactionManager 工具类来实现对事务的管理。(这方面网上优秀的博客太多了,就不细讲了,相信每一个Spring Boy都熟知)
那么,结合对Spring事务管理器和AOP的理解,我们画一张图来理解上面这个“嵌套事务”的过程。
结合流程图来分析,按“常理”理解的话,catch 了 userService.saveBatch,内部的异常就不会造成外面的回滚,因为异常“被”吞了。
但实则不然,Spring事务管理器采用的是 标记位Savepoint 方法,只要被事务管理器感知到了异常出现,就会将当前线程的事务标记成为rollbackOnly状态,后面的事务再想提交就不行了。这也就导致了ServiceB.doBusiness方法在“想要”提交事务时,发现了事务已经被标记了,就抛出了UnexpectedRollbackException异常,该异常进一步的造成了ServiceA.test方法的代理类检测到了异常,ServiceA.test的代理类也开始处理回滚,进一步造成了整个 Kafka Consumer的事务造成了回滚。
源码
通过搜索“Transaction rolled back because it has been marked as rollback-only”异常,可以很快定位到异常流程在源码中的位置,org.springframework.transaction.support.AbstractPlatformTransactionManager#processRollback
而通过跟踪该方法的调用方,可以很快定位到org.springframework.transaction.support.AbstractPlatformTransactionManager#commit方法,该方法正印证我们上方的 ServiceB.doBusiness想要 commit的时候,defStatus.isGlobalRollbackOnly()检测到当前事务已经被标记了rollbackOnly(org.springframework.transaction.support.ResourceHolderSupport#rollbackOnly)
解决
其实该问题在分析之初,大家并没有定位到是因为userService.saveBatch()上面有Mybatis-Plus加的@Transactional注解干扰到 catch 异常这么顺利,而是都在怀疑:为什么我catch了,还把我上层的事务给回滚了!后面虽然怀疑到userService.saveBatch()的事务注解了,又觉得 try 里面的事务,不会影响到外层(没有想到标记位rollbackOnly这里),后面又想到是不是可以在ServiceB.doBusiness()上调整事务传播机制为Propagation.REQUIRES_NEW,但其实都是徒劳,因为始终都已经被Mybatis-Plus的@Transactional标记为rollbackOnly了。
解决方案:
调用无事务注解的save()方法
改用Db.saveBatch方法
改用异步线程(切换了事务管理器)处理 try 中的流程
后记
下面这篇文章,在我们排查问题的过程中,给了很大的帮助,帮我们确定了问题的方向:
告警:MyBatis-Plus中慎用@Transactional注解,坑的差点被开了...
关于是否可以取消saveBatch方法上事务注解的讨论,可以看Mybatis-Plus官方的issue区:
(3.5.7之后,取消了IService,都使用Mapper方法,彻底解决.......)
建议取消数据层的saveBatch等方法的@Transactional注解,交由业务层自行管理 #6333
saveBatch下的事务问题
我新建了一个工程,用来复现该问题:
复现定位Spring多层嵌套事务问题
Read More ~
MySQL GROUP BY 隐式排序
前言
在MySQL的老版本(MySQL5.7及以前)中, GROUP BY xxx 等同于 GROUP BY xxx ORDER BY xxx,在 GROUP BY 的同时,会自带排序的效果。包括也可以手动排序 GROUP BY xxx DESC,等同于 GROUP BY xxx ORDER BY xxx DESC。
MySQL8.0 版本删除了隐式排序,包括显示排序也报错。在升级到MySQL8.0之后,由于缺少了隐式排序,可能会造成查询结果返回的顺序不稳定,造成结果不满足预期。
问题现象
aliyun PolarDB MySQL 版 8.0.1.1.40版本,在DMS查询时,返回的顺序稳定。
在程序JDBC查询时(连接的集群地址),返回的结果多次查询顺序不一致。
由于我们使用的是PolarDB 8.0+,首先开始怀疑是不是aliyun做了什么操作,查到他们的更新日志:https://help.aliyun.com/zh/polardb/polardb-for-mysql/polardb-for-mysql-8-0-1,发现他们在8.0版本中,将隐式排序的功能又加了回来,那么更进一步证明问题出现在数据库层面了。
有了上面的分析和更新日志,我们又进行了接下来的几次测试,进一步缩小问题范围,定位问题的根本原因。
通过MySQL命令行,连接到集群地址,返回的结果多次查询顺序不一致。
通过MySQL命令行,连接到主节点地址,返回的顺序稳定。
通过MySQL命令行,连接到只读从节点地址,返回的结果多次查询顺序不一致。
通过咨询aliyun工程师,了解到DMS默认是直连主库的,结合上述三个执行结果来看,似乎问题出现在了从库,当请求从从库执行来看,顺序总是不一致。
有了上述的结论,通过进一步咨询aliyun的工程师,最终定位到问题是因为其中有台从库开启了并行查询,导致了返回顺序会存在多次查询不一致的问题。
总结
虽然最终定位到了原因是并行查询导致的问题,但是我还是认为,既然aliyun的版本公告中已经说明了加回了 GROUP BY 隐式排序功能,那么不管是并行查询还是什么原因,就算是在 MySQL Server层进行汇总排序,也需要保证最终结果和预期一致,否则会造成使用方的预期不满足。
其次,对于隐式排序肯定不能进行依赖,还是回归到程序上,不论什么情况肯定不能依赖于约定俗成的一些功能,应该强依赖自己写下的指令,比如手动指定 ORDER BY 比事后去分析还是好很多。但整个过程还是学习到不少东西,还是有一定收获的。
扩展
旧版本为什么会有排序?
因为要对数据进行分组的话,本身也会对分组前的数据进行排序,降低数据复杂度,所以原始数据本身就已经有序了。
其次,如果是 GROUP BY 的一个索引字段,那么索引字段本身是 B+ Trree,本身也是有序的,且平时 GROUP BY的字段大概率是有索引的(否则效率太低了),所以绝大部分情况下,旧版本的 GROUP BY 就为我们保留了排序。
为什么要删除隐式排序?
会造成预期外的结果,比如没有明确指定
Read More ~
SpringBoot 3.3 + Neo4j 5.1 + Java17 接入教程
官方文档: Spring Data Neo4j
基于官方文档,基本就能完成基本的接入,Neo4j的版本很乱,如果使用SpringBoot2.+的话,一定要注意Neo4j Server的版本与Driver的匹配关系,否则很容易踩坑。 由于我不是生产使用,直接用官方最新反倒是没遇到此类问题,但网上对于这个问题遇到的很多。
简单接入
写了几个单测,看了下流程,得益于SpringBoot-Data的封装,和操作MongoDB、MySQL等关系型数据库基本无异,使用过程很丝滑。
@Test
public void readAll() {
List<Person> personList = personRepository.findAll();
Assert.isTrue(!personList.isEmpty(), "isEmpty");
}
@Test
public void delById() {
personRepository.findById(71L).ifPresent(person -> {
System.out.println("before delete: " + person.getName());
});
personRepository.deleteById(71L);
Person person = personRepository.findById(71L).orElse(null);
System.out.println("after delete: " + person);
}
@Test
public void save() {
Person person = new Person();
person.setName("人参果树");
personRepository.save(person);
}
findAll
delById
删除前:
删除后:
saveOne
关系的创建
对于节点的基本查询,由于其结构类似于结构性,和结构性数据库查询无异,但是对于N4o4j特殊的关系结构,就有一些不一样了。
@Repository
public interface PersonRepository extends Neo4jRepository<Person, Long> {
Person findByName(String name);
/**
* 创建人物关系
* @param from 源
* @param relation 关系
* @param to 目标
*/
@Query("match (n:person {name: $from}),(m:person {name: $to}) " +
"create (n)-[:西游人物关系{relation:$relation}]->(m)")
void createRelation(String from, String relation, String to);
}
@SpringBootTest
public class PersonRelationShipRepositoryTest {
@Resource
private PersonRepository personRepository;
@Test
public void addRelation() {
Person personYangjian = personRepository.findByName("杨戬");
Person person2 = new Person();
person2.setName("玉鼎真人");
PersonRelationShip relationShip = new PersonRelationShip();
relationShip.setChild(person2);
relationShip.setRelation("师傅");
personYangjian.getPersonRelationShips().add(relationShip);
personRepository.save(personYangjian);
}
@Test
public void addRelation2() {
personRepository.createRelation("玉皇大帝", "妻子", "王母娘娘");
}
}
创建关系第一种方式
创建关系第二种方式
坑
坑1
spring-boot-data 3.+的很多注解都改变了,例如:
在老版本中标注节点使用的是@NodeEntity,新版本被替换为@Node(labels = "person")
在老版本中,关系的表述是在关系对象中通过@RelationshipEntity、@StartNode、@EndNode来描述,在新版这些都没有了。取而代之的是在源对象中补充属性@Relationship(type = "师傅", direction = Relationship.Direction.OUTGOING)来描述关系。
老关系:
@Data
@RelationshipEntity(type = "xxx")
public class PersonRelationShip {
@Id
@GeneratedValue
private Long id;
@StartNode
private Person parent;
@TargetNode
private Person child;
}
新关系:
@Node(labels = "person")
@Data
public class Person {
@Id
@GeneratedValue
private Long id;
@Property
private String name;
@Relationship(type = "师傅", direction = Relationship.Direction.OUTGOING)
private List<PersonRelationShip> personRelationShips;
}
@Data
@RelationshipProperties
public class PersonRelationShip {
@Id
@GeneratedValue
private Long id;
@TargetNode
private Person child;
@Property
private String relation;
}
坑2
spring-boot-data 3.+的@Query中想要获取参数,需要使用 $fieldName,而不是 {0} 的方式,如果自己写的时候一定要注意。比如:
/**
* 创建人物关系
* @param from 源
* @param relation 关系
* @param to 目标
*/
@Query("match (n:person {name: $from}),(m:person {name: $to}) " +
"create (n)-[:西游人物关系{relation:$relation}]->(m)")
void createRelation(String from, String relation, String to);
Read More ~
使用Cloudflare加速Github Pages
之前就有了解过Cloudflare,但是一直没有去实践,最近了解到它提供了免费的CDN可以加速,所以想找个教程按流程试一下,一个是确实Github Pages访问太慢了(虽说Cloudflare在国内也没有节点...但聊胜于无),另一个主要是也可以学习拓展下知识。其实使用起来整个过程还是挺简单的,基本无脑操作。
注册Cloudflare的账号,填写你的域名,交给它托管(打开官网,点击就送)
拷贝Cloudflare提供的DNS解析,替换你的域名服务商提供的解析(我使用的aliyun,在域名-管理-DNS解析)处可替换
等待几分钟,即可失效。
上面的步骤更像是把大象放进冰箱的步骤,过于简单,主要是网络上针对此流程提供的优秀博客数不胜数,没必要再产生一篇重复的内容,并且文章也主要是用于记录自己接入的过程和遇到的坑。
我看的博客是: 使用cdn加速Github Pages的访问速度
跟着这个博客基本就可以把域名完成托管,整个过程主要是遇到一个坑,在配置完成之后,发现本来通过aliyun解析的网站还能打开,托管之后反倒是打不开了,Chrome提示“重定向的次数过多”,这一步主要是因为解析配置的问题,可以登陆Cloudflare,然后再SSL/TLS菜单里调整下面的配置即可:
原本选中的是“灵活的”,调整为“严谨”即可。
Read More ~
Neo4j常用函数
字符串函数
match (n:Lianhuachi) where n.name = '去种田的向凹凸' return n.name,substring(n.name, 0,3)
AGGREGATION聚合函数
和普通SQL一致的含义
match (n:Lianhuachi) return count(n),max(n.age),min(n.age),avg(n.age),sum(n.age)
关系函数
match (x)-[n:`莲花池人物关系`]->(y) return id(n),n
备份
Neo4j 官方文档: Backup modes
➜ bin ./neo4j-admin database dump neo4j --to-path=/tmp
2024-06-26 15:24:15.414+0000 INFO [o.n.c.d.DumpCommand] Starting dump of database 'neo4j'
Done: 37 files, 2.993MiB processed.
2024-06-26 15:24:19.338+0000 INFO [o.n.c.d.DumpCommand] Dump completed successfully
➜ bin ll /tmp/ |grep "neo4j.dump"
-rw-r--r-- 1 imyzt wheel 305K Jun 26 23:24 neo4j.dump
恢复
Neo4j 官方文档: Restore a database dump
➜ bin ./neo4j-admin database load --from-path=/tmp/ neo4j --overwrite-destination=true
Done: 37 files, 2.993MiB processed.
可以看到,数据恢复了过来。
Read More ~
Neo常用命令 二
UNIQUE约束
避免重复时使用(姓名、身份证不能重复)
旧版本
create constraint on (n:Lianhuachi) assert n.name is unique
新版本
如果再执行上面的语法时报错,说明你使用的是新版本的Neo4j,变更了命令语法。
create constraint uniq_name for (n:Lianhuachi) require n.name is unique
在加了唯一索引后,可以看到,在添加重复数据时会报错:
删除唯一约束
drop constraint uniq_name
可以看到,在删除之后再进行添加,是可以添加进去的。
create (n:Lianhuachi {name: "我是野农", "field": "test"}) return n
match (n:Lianhuachi {name: "我是野农""}) return n
distinct去重
和SQL一致,对内容进行去重
match (n:Lianhuachi) return distinct(n.name)
Read More ~
Spring AI
前言
当一开始听说Spring-AI项目时是很懵的,什么?Spring开始训练模型了?不应该啊,Java还能卷模型赛道了吗?...
打开官网了解了下,才知道原来是缝合怪:
Spring AI is an application framework for AI engineering. Its goal is to apply to the AI domain Spring ecosystem design principles such as portability and modular design and promote using POJOs as the building blocks of an application to the AI domain.
官网地址:https://spring.io/projects/spring-ai
换言之就是虽然Java不能卷算法,但是我可以提供一套封装来让你们调用模型提供的API服务,众所周知,我的抽象封装能力还是很强的。你看你们现在这么多厂商提供服务,不得需要一个统一的门面来减少接入成本嘛...
官网列举了目前国外主流的平台:
Chat Models
OpenAI
Azure Open AI
Amazon Bedrock
Cohere's Command
AI21 Labs' Jurassic-2
Meta's LLama 2
Amazon's Titan
Google Vertex AI Palm
Google Gemini
HuggingFace - access thousands of models, including those from Meta such as Llama2
Ollama - run AI models on your local machine
MistralAI
Text-to-image Models
OpenAI with DALL-E
StabilityAI
Transcription (audio to text) Models
OpenAI
... 数不胜数,更何况还有很多国内大厂开源的模型。
所以Spring提供了一套统一的封装门面,其他的厂商也可以基于门面来实现自己的Client,比如Alibaba就接入了com.alibaba.cloud.ai.tongyi.chat.TongYiChatClient。
这就引申出了本博客,记录了下我接入Spring-Ai(OpenAI)、Spring-Ai-Alibaba(TongYi)的过程。
接入Spring-Ai
Spring-Ai的接入相对简单,因为最新的start.spring.io已经维护了OpenAI的依赖包,在IDEA使用Spring-Initializr即可完成初始化。
完成Gradle的初始化之后,得到了一个标准的Spring项目,只需要做一个简单的配置:
spring.application.name=spring-ai-demo
# 生成结果多样性参数,值在0~2之间,值越大越随机越小越固定,但就算为0也会有随机性
spring.ai.openai.chat.temperature=0.7
spring.ai.chat.client.enabled=true
# 如果你需要代理的话
spring.ai.openai.base-url=https://api.xty.app
# 填写自己的key
spring.ai.openai.api-key=${OPENAI_API_KEY}
# 填写你需要使用的模型(也可以使用时代码指定)
spring.ai.openai.chat.options.model=gpt-3.5-turbo
接下来只需要编写一个Java的控制器,来接收HTTP请求,就可以完成对OpenAI的对话。
/**
* @author imyzt
* @date 2024/06/19
* @description AI 入口
*/
@RestController
@RequestMapping
public class AiController {
@Resource
private ChatClient chatClient;
@GetMapping("/ai/chat")
Map<String, Object> chat(@RequestParam String question) {
ChatClient.ChatClientRequest.CallPromptResponseSpec call = chatClient.prompt(new Prompt(question)).call();
return Map.of("question", question, "answer", call.chatResponse());
}
}
整体接入还是比较简单的,但是这里踩了一个坑,不知道是我引入的版本比较新还是什么缘故,它的ChatClient Bean 竟然没有自动注册!所以我还手动注册了一个Bean,代码如下:
@Bean
public ChatClient chatClient(@Autowired OpenAiChatModel openAiChatModel) {
return ChatClient.builder(openAiChatModel).build();
}
代码
https://github.com/imyzt/learning-technology-code/tree/master/dive-in-springboot-projects/spring-ai-demo/spring-ai-demo
Spring-Ai-Alibaba
OpenAI由于API-KEY的费用蛮高,虽然完成了代码的接入,但是最终我还是没有购买它的API...所以又看了国内的厂商,目前主要是Alibaba完成了Spring-Ai的接入,整体接入其实也很简单,建议首先看一遍官网的例子,接下来可以跟着步骤走一下试试。
创建项目,引入依赖
首先完成一个普通SpringBootWeb项目创建,然后引入Alibaba-Ai的依赖
<dependencies>
<dependency>
<groupId>com.alibaba.cloud</groupId>
<artifactId>spring-cloud-starter-alibaba-ai</artifactId>
<exclusions>
<exclusion>
<artifactId>spring-ai-core</artifactId>
<groupId>org.springframework.ai</groupId>
</exclusion>
</exclusions>
</dependency>
<dependency>
<groupId>io.springboot.ai</groupId>
<artifactId>spring-ai-core</artifactId>
<version>1.0.3</version>
</dependency>
</dependencies>
<dependencyManagement>
<dependencies>
<dependency>
<groupId>com.alibaba.cloud</groupId>
<artifactId>spring-cloud-alibaba-dependencies</artifactId>
<version>2023.0.1.0</version>
<type>pom</type>
<scope>import</scope>
</dependency>
</dependencies>
</dependencyManagement>
上面的内容,我把无关紧要的依赖去除了,只保留了核心部分,其中有一个关键点是exclusions了spring-ai-core,因为Alibaba引入的版本太老了。改为自己重新引入最新的版本,其他的和官方文档无差别。
配置
spring:
application:
name: spring-ai-alibaba-demo
cloud:
ai:
tongyi:
api-key: ${TONGYI_KEY}
images:
enabled: true
chat:
enabled: true
编写控制器
@Slf4j
@RestController
@RequestMapping
public class AiController {
@Resource
private ChatClient chatClient;
@Resource
private ImageClient imageClient;
@GetMapping("/ai/chat")
public String chat(@RequestParam String question) {
ChatResponse call = chatClient.call(new Prompt(question));
return call.getResult().getOutput().getContent();
}
@GetMapping("/ai/aigc")
public String aigc(@RequestParam String question) {
ImageResponse call = imageClient.call(new ImagePrompt(question));
return call.getResult().getOutput().getUrl();
}
}
完成上面的步骤,基本就完成了接入,在postman上面输入地址,就可以进行测试了。因为通义不仅有chat,还可以文生图,所以我完成下演示:
文生文
(质量不予置评)
文生图
Spring-Ai-Alibaba还提供了一些示例,在他们的官方Github上,可以参考。
官方还提供了一个简单的HTML来进行页面展示,也可以自己跑一下看看,最终效果如下:
代码
https://github.com/imyzt/learning-technology-code/tree/master/dive-in-springboot-projects/spring-ai-alibaba-demo/spring-ai-alibaba-demo
参考博客
阿里也出手了!Spring CloudAlibaba AI问世了
AI框架之Spring AI与Spring Cloud Alibaba AI使用讲解
Read More ~
Java New Future CDS
什么是CDS?CDS即 Class-Data Sharing,类数据共享功能,该功能可以减少Java应用程序的启动时间和内存占用。
类数据共享功能有助于减少多个Java虚拟机之间的启动时间和内存占用,从JDK12开始,默认的CDS归档文件和JDK二进制文件预先打包,我是用的JDK为OpenJDK OpenJDK 64-Bit Server VM Zulu17.42+19-CA (build 17.0.7+7-LTS, mixed mode, sharing),是支持CDS的。
使用
训练应用程序
首先初始化一个标准的SpringBoot应用,使用 SpringBoot-3.3.0 + Java17
├── pom.xml
├── src
│ ├── main
│ │ ├── java
│ │ │ └── top
│ │ │ └── imyzt
│ │ │ └── learning
│ │ │ └── cds
│ │ │ └── Java12NewFuturesCdsApplication.java
@SpringBootApplication
public class Java12NewFuturesCdsApplication {
public static void main(String[] args) {
SpringApplication.run(Java12NewFuturesCdsApplication.class, args);
}
}
将其执行Maven打包成jar文件
➜ mvn package -DskipTests=true
执行训练命令
➜ cd target
➜ target ✗ java -Djarmode=tools -jar java12-new-futures-cds-0.0.1-SNAPSHOT.jar extract --destination application
➜ cd application
➜ application ✗ java -XX:ArchiveClassesAtExit=application.jsa -Dspring.context.exit=onRefresh -jar java12-new-futures-cds-0.0.1-SNAPSHOT.jar
训练完成后,application目录下,生成了一系列文件:
➜ application git:(master) ✗ tree
.
├── application.jsa
├── java12-new-futures-cds-0.0.1-SNAPSHOT.jar
└── lib
├── jackson-annotations-2.17.1.jar
├── jackson-core-2.17.1.jar
├── jackson-databind-2.17.1.jar
├── jackson-datatype-jdk8-2.17.1.jar
├── jackson-datatype-jsr310-2.17.1.jar
├── jackson-module-parameter-names-2.17.1.jar
├── jakarta.annotation-api-2.1.1.jar
├── jul-to-slf4j-2.0.13.jar
├── log4j-api-2.23.1.jar
├── log4j-to-slf4j-2.23.1.jar
├── logback-classic-1.5.6.jar
├── logback-core-1.5.6.jar
├── micrometer-commons-1.13.0.jar
├── micrometer-observation-1.13.0.jar
├── slf4j-api-2.0.13.jar
├── snakeyaml-2.2.jar
├── spring-aop-6.1.8.jar
├── spring-beans-6.1.8.jar
├── spring-boot-3.3.0.jar
├── spring-boot-autoconfigure-3.3.0.jar
├── spring-boot-jarmode-tools-3.3.0.jar
├── spring-context-6.1.8.jar
├── spring-core-6.1.8.jar
├── spring-expression-6.1.8.jar
├── spring-jcl-6.1.8.jar
├── spring-web-6.1.8.jar
├── spring-webmvc-6.1.8.jar
├── tomcat-embed-core-10.1.24.jar
├── tomcat-embed-el-10.1.24.jar
└── tomcat-embed-websocket-10.1.24.jar
1 directory, 32 files
使用训练的缓存,在启动应用程序时,补充-XX:SharedArchiveFile参数即可。
➜ application ✗ java -XX:SharedArchiveFile=application.jsa -jar java12-new-futures-cds-0.0.1-SNAPSHOT.jar
启动日志:
Started Java12NewFuturesCdsApplication in 2.262 seconds (process running for 2.805)
对比不使用CDS缓存的启动日志:
Started Java12NewFuturesCdsApplication in 4.464 seconds (process running for 5.341)
可以看出来,尽管只是一个空项目,但是相同配置情况下启动速度都有倍增。
结语
从上面的测试可以看出来,项目的启动速度是有成本的效率增长的,但同时也存在弊端,就是每次应用程序发生变更时,需要冲洗进行jsa文件的训练。
不管是native-jar还是CDS,都是Java在云原生时代解决应用启动过程慢的探索,在应用的自动扩容领域,还是有不少的应用场景。
参考
Spring Boot 3.3.0 新特性| 使用 CDS 优化启动时间
CDS即Class-Data Sharing
Read More ~