标签:# 问题解决

当Spring多层嵌套事务遇到Mybatis-Plus的saveBatch

前言 事情的起因还要从 Transaction rolled back because it has been marked as rollback-only] with root cause 说起,公司的一个业务,Kafka Consumer消费了来自上游的消息处理并写入数据,整个逻辑很复杂,由最外层的方法注解 @Transactional(rollbackFor = Exception.class) 来保证整个过程数据的一致性,但其中有一个业务不需要事务保证,所以前人写了一个 try catch 来防止子业务流异常阻断全局事务的提交。简化后大概类似于这样: @Service public class ServiceA { @Resource private ServiceB serviceB; @Resource private CompanyService companyService; /** * 模拟消费外部的消息, 调用者A */ @Transactional(rollbackFor = Exception.class) public void test(String username) { // 1. 首先保存自己的数据, 模拟业务操作 Company company = new Company(); company.setName("家里蹲"); company.setAddress("localhost"); companyService.save(company); // 2. 然后调用Spring的代理方法B serviceB.doBusiness(username); } } @Service public class ServiceB { @Resource private UserService userService; @Transactional(rollbackFor = Exception.class) public void doBusiness(String username) { // 3. 这里还有一系列事务操作, 但和这次问题无关, 就不放这里干扰了 try { User user = new User(); user.setUsername(username); user.setAddress("shenzhen"); // 4. 罪魁祸首的代码, 此处前人为了不干扰主逻辑, catch了异常 // 为了达到"以为的"异常不干扰外部事务 userService.saveBatch(Collections.singletonList(user)); } catch (DuplicateKeyException e) { // 5. 此处有日志 log.error("xxx业务异常", e) e.printStackTrace(); } } } 上面的代码看起来平平无奇,按照“常规”理解,代码块4位置的流程异常,不会干扰代码块1、3处的事务提交,达到编写者的目的。 但如果是这么简单,我也不会有这次问题的排查过程,也不会有这篇笔记了... 现象 我们从日志监控注意到,代码块5有频繁的异常,并且代码块1、3的数据最终并未写入成功,通过MySQL追踪事务发现,1、3处的语句执行后,均被rollback了。 这就引出了前言中的问题分析来了,在没有确认问题出现在代码块4时,还在找哪些地方有异常可能造成回滚,但通过跟踪程序日志发现,1、3处被回滚的数据,基本都发生在代码块5的日志打印出来之后,并且程序中也发现了下面的异常: org.springframework.transaction.UnexpectedRollbackException: Transaction rolled back because it has been marked as rollback-only ....... Transaction rolled back because it has been marked as rollback-only,说明事务被标记为只能回滚,并且根据行号(上述堆栈未标注业务行号)推断出,是发生代码块2处,调用doBusiness方法发生了回滚。 分析 我们都知道,Spring本身没有事务,只有数据库有事务,Spring提供@Transactional以及事务传播机制,只是利用数据库给我们提供的 begin/commit/rollback 来结合 Spring AOP 实现的统一事务管理,通过 PlatformTransactionManager 工具类来实现对事务的管理。(这方面网上优秀的博客太多了,就不细讲了,相信每一个Spring Boy都熟知) 那么,结合对Spring事务管理器和AOP的理解,我们画一张图来理解上面这个“嵌套事务”的过程。 结合流程图来分析,按“常理”理解的话,catch 了 userService.saveBatch,内部的异常就不会造成外面的回滚,因为异常“被”吞了。 但实则不然,Spring事务管理器采用的是 标记位Savepoint 方法,只要被事务管理器感知到了异常出现,就会将当前线程的事务标记成为rollbackOnly状态,后面的事务再想提交就不行了。这也就导致了ServiceB.doBusiness方法在“想要”提交事务时,发现了事务已经被标记了,就抛出了UnexpectedRollbackException异常,该异常进一步的造成了ServiceA.test方法的代理类检测到了异常,ServiceA.test的代理类也开始处理回滚,进一步造成了整个 Kafka Consumer的事务造成了回滚。 源码 通过搜索“Transaction rolled back because it has been marked as rollback-only”异常,可以很快定位到异常流程在源码中的位置,org.springframework.transaction.support.AbstractPlatformTransactionManager#processRollback 而通过跟踪该方法的调用方,可以很快定位到org.springframework.transaction.support.AbstractPlatformTransactionManager#commit方法,该方法正印证我们上方的 ServiceB.doBusiness想要 commit的时候,defStatus.isGlobalRollbackOnly()检测到当前事务已经被标记了rollbackOnly(org.springframework.transaction.support.ResourceHolderSupport#rollbackOnly) 解决 其实该问题在分析之初,大家并没有定位到是因为userService.saveBatch()上面有Mybatis-Plus加的@Transactional注解干扰到 catch 异常这么顺利,而是都在怀疑:为什么我catch了,还把我上层的事务给回滚了!后面虽然怀疑到userService.saveBatch()的事务注解了,又觉得 try 里面的事务,不会影响到外层(没有想到标记位rollbackOnly这里),后面又想到是不是可以在ServiceB.doBusiness()上调整事务传播机制为Propagation.REQUIRES_NEW,但其实都是徒劳,因为始终都已经被Mybatis-Plus的@Transactional标记为rollbackOnly了。 解决方案: 调用无事务注解的save()方法 改用Db.saveBatch方法 改用异步线程(切换了事务管理器)处理 try 中的流程 后记 下面这篇文章,在我们排查问题的过程中,给了很大的帮助,帮我们确定了问题的方向: 告警:MyBatis-Plus中慎用@Transactional注解,坑的差点被开了... 关于是否可以取消saveBatch方法上事务注解的讨论,可以看Mybatis-Plus官方的issue区: (3.5.7之后,取消了IService,都使用Mapper方法,彻底解决.......) 建议取消数据层的saveBatch等方法的@Transactional注解,交由业务层自行管理 #6333 saveBatch下的事务问题 我新建了一个工程,用来复现该问题: 复现定位Spring多层嵌套事务问题
Read More ~

业务逻辑编排错误 & TTL浅拷贝导致参数丢失问题

前言 在DDD项目中,为了方便参数的传递,通常会使用ThreadLocal来保存一个对象来实现对参数的跨方法传递,避免通过形参的形式传递。在内部项目中,有一个项目使用的是 alibaba开源的 transmittable-thread-local来存储参数,新建了一个上下文对象(AbilityContext.java),使用HashMap来临时存储和获取参数。 AbilityContext 示例 public class AbilityContext { private static final ThreadLocal<Map<String, Object>> CONTEXT = new TransmittableThreadLocal<>(); private AbilityContext() { } /** * 初始化上下文 */ public static void initContext() { Map<String, Object> con = CONTEXT.get(); if (con == null) { CONTEXT.set(new HashMap<>(8)); } else { CONTEXT.get().clear(); } } /** * 清除上下文 */ public static void clearContext() { CONTEXT.remove(); } public static Map<String, Object> getInnerMap() { return CONTEXT.get(); } /** * 获取上下文内容 */ public static <T> T getValue(String key) { Map<String, Object> con = CONTEXT.get(); if (con == null) { return null; } return (T) con.get(key); } /** * 设置上下文参数 */ public static void putValue(String key, Object value) { Map<String, Object> con = CONTEXT.get(); if (con == null) { CONTEXT.set(new HashMap<>(8)); con = CONTEXT.get(); } con.put(key, value); } } 项目情况介绍 通常来说,DDD项目的基本流程是由interface->application,中间封装一层来集中处理上下文的初始化和清空动作,如下图: 在正常情况下,上述流程可以正确的完成参数的写入和获取,但是,在项目运行过程中遇到了一个bug,正常写入参数后,偶现性(低频)获取值为NULL,导致程序出错,示例代码如下(隐去业务代码,重新写的伪代码): 其中demo()方法为当时复现的方法 demo2()为伪代码,是业务代码中调用了另一个application,假设其逻辑和demo()方法一致的业务代码。 @Slf4j public class AlibabaTtlWrongUsageExampleApplication { public static void main(String[] args) { demo(i); } private static void demo(int idx) { // 初始化 AbilityContext.initContext(); // 赋业务值 AbilityContext.putValue("main", "mainValue"); // 这里简化了代码,实际上经过了很多层业务代码调用后才出现了此方法 ThreadUtil.execute(() -> { execute->demo2(); }); // do something // 主线程再次获取业务值(偶现为null) String value = AbilityContext.getValue("main"); if (Objects.isNull(value)) { log.warn("lastGetNullValue, idx={}", idx); } } } 上述代码运行设置了一个key=main,值为mainValue。在下方AbilityContext.getValue("main")偶现获取==NULL。 展开分析 当时在分析的开始有推测是业务代码中参数被重新赋值为NULL,但通过对后续业务代码逐行查看,并没有找到重新赋值的逻辑。 在深入业务代码分析的过程中,发现主流程中有一个异步方法调用(ThreadUtil.execute()),再次调用了另一个领域服务(这是不符合DDD规范的!),而领域服务的入口都会AbilityContext.initContext()的逻辑,通过这个线索 ,继续展开了深入分析。 编码者的初衷可能是想到异步线程已经脱离了当前线程,再次调用 initContext()方法是初始化了一个新的对象上下文,但是由于项目使用的是 alibaba TTL,能够实现跨线程的传递,所以在子线程中依旧能拿到父线程的HashMap。并且TTL默认是使用的浅拷贝对象。由于initContext()中,调用了HashMap.clear()方法,相当于将父线程的HashMap给清空了!。 通过比对父子线程的hashCode值确定为同一对象 // 主线程获取hashCode final int hashCode = AbilityContext.getInnerMap().hashCode(); ThreadUtil.execute(() -> { // 子线程对比hashCode log.info("{}, ThreadUtil hashCode={}", idx, AbilityContext.getInnerMap().hashCode() == hashCode); // 子线程再次初始化(错误的根源) AbilityContext.initContext(); // do something }); 14:42:28.198 [pool-1-thread-26] INFO top.imyzt.learning.caseanalysis.ttl.AlibabaTtlWrongUsageExampleApplication -- 25, ThreadUtil hashCode=true 持续分析 有了上述的线索,基本把问题原因找到了,但是为什么是偶现的呢? 因为使用了异步线程,而线程的调度由操作系统的线程调度算法来决定,并不是一定保证顺序的,所以只要当操作系统优先调度异步线程,那么HashMap就被清空了,如果主线程优先往下走,那么就能够获取到完整的HashMap。 后记 至此,问题分析就告一段落了,整个过程中涉及到 TTL值的父子线程传递、对象浅拷贝、线程的调度,还涉及到了DDD的不规范逻辑编排,整个分析下来花费了一上午的时间,收获还是很大的。 transmittable-thread-local TransmittableThreadLocal的传递只有浅拷贝吗? 线程的优先级 我将源代码上传了GitHub,如果你想在本地调试运行上述案例,可以下载到本地调试,有问题可以评论区沟通。
Read More ~

使用Redis实现分布式锁的坑

分布式锁的关键在于对单一资源的竞争。获得资源的实例将继续执行,其余实例要么退出(互斥锁),要么等待(阻塞锁)。 实现分布式锁的方案有很多,既可以直接使用MySQL作为分布式锁(例如xxl-job),也可以利用ZooKeeper、Redis等。 在基于Spring Cloud的业务系统中,一般都会引入Redis作为分布式缓存中间件,因此更多的人会选择使用Redis来实现分布式锁。本文将介绍使用Redis作为分布式锁时常见的问题和解决方法。 1. 没有使用原子操作指令 错误写法 Boolean tryLock = stringRedisTemplate.opsForValue().setIfAbsent(lockKey, lockValue); stringRedisTemplate.expire(lockKey, Duration.ofSeconds(expireTime)); if (!tryLock) { return; } 上述操作通常出现在新手阶段,在写入锁对象时,没有考虑到原子性问题。在Redis中有提供SET NX PX指令,支持在设置锁的同时指定过期时间,并且支持原子性判断key是否已存在。 NX 和 PX 是 Redis 命令中用于设置 key 的两个选项。 NX: 当指定 NX 选项时,只有在 key 不存在的情况下才会设置 key 的值。如果 key 已经存在,则不进行任何操作。 PX: PX 选项用于设置 key 的过期时间(以毫秒为单位)。例如,PX 10000 表示在 10 秒后将 key 设置为过期状态。 正确写法: Boolean tryLock = stringRedisTemplate.opsForValue().setIfAbsent(lockKey, lockValue, expireTime, TimeUnit.MILLISECONDS); 2. 释放了别人的锁 错误写法 try { Boolean tryLock = stringRedisTemplate.opsForValue().setIfAbsent(lockKey, lockValue, expireTime, TimeUnit.MILLISECONDS); if (!tryLock) { return; } // do something } finally { stringRedisTemplate.delete(lockKey); } 在加锁的过程中,没有设定唯一值作为Value存储到Redis中,在释放时,不判断直接对锁进行释放。其二,将获取锁的代码放在了try代码块中。 在上述代码中存在两个问题: 不该执行到finlly代码块:A请求获得了锁正在执行业务代码,而B请求没有获得锁,但是因为获取锁的代码在try代码块中,导致finally一定会执行,B请求就会将A请求的锁释放,而如果A请求依旧未执行完毕,此时C请求过来时,则C请求错误的拿到了锁。 不该删除别人的锁:在删除锁时,应该判断自己是否是上锁人,由于多次执行Redis指令不具备原子性,所以一般是交由LUA脚本来实现的。 if redis.call('get', KEYS[1]) == ARGV[1] then return redis.call('del', KEYS[1]) else return 0 end 正确写法 提前将LUA脚本载入到Redis服务端 script = new DefaultRedisScript<>(); script.setResultType(Long.class); script.setScriptSource(new ResourceScriptSource(new ClassPathResource("release_lock.lua"))); 获取和释放锁示例 Boolean tryLock = stringRedisTemplate.opsForValue().setIfAbsent(lockKey, lockValue, expireTime, TimeUnit.MILLISECONDS); if (!tryLock) { return; } try { // do something } finally { ArrayList<String> keys = new ArrayList<>(); keys.add(context.getLockKey()); stringRedisTemplate.execute(this.script, keys, context.getLockValue()); } 3. 事务未提交锁就释放了 错误代码 /** * 事务内获取分布式锁 */ @Transactional(rollbackFor = Exception.class) public void saveUserWithDistributedLock(String name) { String lockKey = "lock_key:" + name; RedisLock.LockContext lockContext = redisLock.tryLock(lockKey, 10000L); if (!lockContext.getTryLock()) { // printLog("没拿到锁"); return; } printLog("拿到锁了" + lockKey); try { this.save(name); } finally { redisLock.release(lockContext); printLog("释放锁了"); } } MySQL常规情况下是RR的隔离级别,只有等到事务提交数据才对其他事务可见,存在**“读视图”,在上述的代码中,A请求拿到了锁执行了业务代码,执行到redisLock.release时将锁释放了,但Spring的@Transactional依赖的是AOP,其需要等到方法执行完毕才会提交事务,在这个临界点,B请求可以正常拿到锁,但是A请求的事务还未提交,B请求的读视图**中还未查询到A请求提交的数据,最终造成了数据的不一致性。 正确代码 正确的情况是在另一个方法中获取到锁之后,再调用包含事务的业务代码。此时需要注意SpringAOP在本方法内代理失效的问题,通常需要新建一个Service来处理。 业务代码执行超过锁过期时间 错误代码 // Domain-Service public void save(String name) { String lockKey = "lock_key:" + name; RedisLock.LockContext lockContext = redisLock.tryLock(lockKey, 10000L); if (!lockContext.getTryLock()) { printLog("没拿到锁"); return; } printLog("拿到锁了" + lockKey); try { userService.save(name); } finally { redisLock.release(lockContext); printLog("释放锁了"); } } // UserService @Transactional(rollbackFor = Exception.class) public void save(String name) { List<User> users = userRepository.findUsersByName(name); if (CollUtil.isNotEmpty(users)) { printLog("已经写入, 不再写入" + users); return; } // 业务保存模拟执行很慢 TimeUnit.SECONDS.sleep(70); } 上述代码中,锁对象只有10s的时间,但是业务代码执行却需要70s,A请求虽然拿到了锁,此时后续10秒其他请求均无法获取锁,但是从第11秒开始的请求将可以拿到锁,而此时A请求还未执行完毕,此时开始出现错误的获取锁,最终造成数据的不一致。 正确写法 参考Redisson的WatchDog机制,另外开辟线程每隔 10s 就给还未执行完毕的 Key 自动续期 30s,保证业务代码能够安全的执行完毕再自行释放锁对象。 示例代码: // watch dog Executors.newScheduledThreadPool(1).scheduleAtFixedRate(() -> { if (!LOCK_CONTEXTS.isEmpty()) { for (LockContext lockContext : LOCK_CONTEXTS) { // 如果执行线程还未释放锁, 续期30s(模拟Redisson) stringRedisTemplate.expire(lockContext.getLockKey(), Duration.ofSeconds(30)); Long expire = stringRedisTemplate.getExpire(lockContext.getLockKey()); log.info("WatchDog, expire 30s, lockKey={}, ttl={}", lockContext.getLockKey(), expire); } } }, 0, // 10秒检测一次 10, TimeUnit.SECONDS); 后记 分布式锁的错误还有很多,本篇主要是自己在工作过程中遇到的一些坑,着重介绍新手阶段在编写分布式锁时遇到的比较基础的问题,后面有空再进行其他场景的逐个介绍。 本文参考:聊聊redis分布式锁的8大坑 本文代码:redis-lua-distributed-lock
Read More ~

错用HashedWheelTimer导致的OOM问题

事件中心在私有化环境下,只要server一启动过几秒就oom,查看日志是 Failed to create a thread: retVal -1073741830, errno 11。 异常堆栈: Caused by: java.lang.OutOfMemoryError: Failed to create a thread: retVal -1073741830, errno 11 at java.lang.Thread.startImpl(Native Method) at java.lang.Thread.start(Thread.java:993) at io.netty.util.HashedWheelTimer.start(HashedWheelTimer.java:366) at io.netty.util.HashedWheelTimer.newTimeout(HashedWheelTimer.java:447) at 业务调用代码省略 在标品环境下没有问题,在其他KA客户上也没有问题 通过对日志的分析,最终发现是事件中心的延迟消息代码存在缺陷,使用了Netty的HashedWheelTimer,但是语法存在问题,理论上应该是new一个HashedWheelTimer来处理所有时间延迟,但是错用程每次new一个新的HashedWheelTimer,HashedWheelTimer内部每次都会new一个新的线程来处理做调度,一个线程占用1MB,最终内存资源被耗尽。 io.netty.util.HashedWheelTimer#HashedWheelTimer源码: public HashedWheelTimer( ThreadFactory threadFactory, long tickDuration, TimeUnit unit, int ticksPerWheel, boolean leakDetection, long maxPendingTimeouts, Executor taskExecutor) { checkNotNull(threadFactory, "threadFactory"); checkNotNull(unit, "unit"); checkPositive(tickDuration, "tickDuration"); checkPositive(ticksPerWheel, "ticksPerWheel"); this.taskExecutor = checkNotNull(taskExecutor, "taskExecutor"); // Normalize ticksPerWheel to power of two and initialize the wheel. wheel = createWheel(ticksPerWheel); mask = wheel.length - 1; // Convert tickDuration to nanos. long duration = unit.toNanos(tickDuration); // Prevent overflow. if (duration >= Long.MAX_VALUE / wheel.length) { throw new IllegalArgumentException(String.format( "tickDuration: %d (expected: 0 < tickDuration in nanos < %d", tickDuration, Long.MAX_VALUE / wheel.length)); } if (duration < MILLISECOND_NANOS) { logger.warn("Configured tickDuration {} smaller then {}, using 1ms.", tickDuration, MILLISECOND_NANOS); this.tickDuration = MILLISECOND_NANOS; } else { this.tickDuration = duration; } // 每次都new一个线程来处理 workerThread = threadFactory.newThread(worker); leak = leakDetection || !workerThread.isDaemon() ? leakDetector.track(this) : null; this.maxPendingTimeouts = maxPendingTimeouts; if (INSTANCE_COUNTER.incrementAndGet() > INSTANCE_COUNT_LIMIT && WARNED_TOO_MANY_INSTANCES.compareAndSet(false, true)) { reportTooManyInstances(); } } 因为标品和其他KA使用的是阿里云RocketMQ,此客户使用的是自建的开源版RocketMQ,开源RocketMQ是没有自定义时长的延迟消息的,所以我们自己实现了一套时间轮来实现任意时长的延迟消息,当小于60s的延迟消息会丢入我们的时间轮来处理延迟投递,当时此客户的环境中有大量的60s内的延迟消息,导致一启动就会崩溃。 不过在RocketMQ5.0也支持任意时长了。
Read More ~

CircuitBreak导致ThreadLocal参数丢失问题

背景 使用OpenFeign时,通常会实现RequestInterceptor接口来自定义FeignConfiguration,OpenFeign暴露了feign.RequestTemplate信息,给到我们在发送请求前自定义参数信息的扩展点。 在分布式系统中,通常会将本服务的信息(UserInfo、RequestId)透传至下游服务,从而实现分布式链路追踪等功能,对于像用户信息等,在Web系统中通常使用 ThreadLocal 来存储信息,在自定义的FeignConfiguration中获取ThreadLocal再塞入到feign.RequestTemplate中,实现向下游服务的传递,示例: public class FeignConfiguration implements RequestInterceptor { @Override public void apply(RequestTemplate template) { ServletRequestAttributes attributes = (ServletRequestAttributes) RequestContextHolder.getRequestAttributes(); String userId = SubjectContext.get().getUserId(); if (null != attributes) { HttpServletRequest request = attributes.getRequest(); template.header("token", request.getHeader("TOKEN")); template.header("userId", userId); } } } 简单的Context示例: public class SubjectContext { protected static ThreadLocal<UserInfo> subjectContext = new ThreadLocal(); public static void remove() { subjectContext.remove(); } public static void set(UserInfo uerInfo) { subjectContext.set(uerInfo); } public static UserInfo get() { return (UserInfo)subjectContext.get(); } } 出现错误 上述代码在常规情况下,是能够按照预期执行的。 但是最近项目引入了CircuitBreaker作为服务熔断的断路器之后,上述代码在执行到SubjectContext.get()时,会抛出空指针,拿不到用户信息。 通过分析CircuitBreaker的源码,最终定位到代码出现在Resilience4JCircuitBreaker内部,在Resilience4JCircuitBreaker中有一个public <T> T run(Supplier<T> toRun, Function<Throwable, T> fallback)方法,方法入参的toRun就是封装过的我们定义的Feign接口,其包装过程在FeignCircuitBreakerInvocationHandler#asSupplier代码中,如下: private Supplier<Object> asSupplier(final Method method, final Object[] args) { final RequestAttributes requestAttributes = RequestContextHolder.getRequestAttributes(); return () -> { try { RequestContextHolder.setRequestAttributes(requestAttributes); // 执行我们的真正方法 return dispatch.get(method).invoke(args); } catch (RuntimeException throwable) { throw throwable; } catch (Throwable throwable) { throw new RuntimeException(throwable); } }; } Spring Cloud CircuitBreaker Resilience4j 提供了两种实现: 使用 Semaphores 的 SemaphoreBulkhead。 一个 FixedThreadPoolBulkhead,它使用一个有界队列和一个固定的线程池。 默认情况下,Spring Cloud CircuitBreaker Resilience4j 使用 FixedThreadPoolBulkhead。要修改默认行为以使用 SemaphoreBulkhead,请将属性 spring.cloud.circuitbreaker.resilience4j.enableSemaphoreDefaultBulkhead 设为 true。 正是由于上述原因,默认将我们的FeignConfiguration提交给了线程池,由于我们使用的是ThreadLocal导致线程本地变量没有向子线程传递,在执行FeignConfiguration时子线程无法拿到Context信息,最终导致程序的报错。 解决办法 通过分析源码我们发现,执行任务的线程池Resilience4JCircuitBreaker#executorService是由外部传递过来进行初始化的,调用方在Resilience4JCircuitBreakerFactory#create(java.lang.String, java.lang.String, java.util.concurrent.ExecutorService) 在Resilience4JCircuitBreakerFactory中发现,是由本实例在create方法被调用时传入的本类的成员变量,即: private ExecutorService executorService = Executors.newCachedThreadPool(); private ConcurrentHashMap<String, ExecutorService> executorServices = new ConcurrentHashMap<>(); 而我们在没有定义自定义Feign Group时,默认使用的就是executorService,在本类中有一个Resilience4JCircuitBreakerFactory#configureExecutorService方法专门保留了外部传入自定义线程池的扩展,我们可以自己实现创建一个支持传递Context到子线程的线程池,即可将参数向下传递,比如像这样: @Configurable @AllArgsConstructor public class CircuitBreakerConfiguration implements ApplicationRunner { private final Resilience4JCircuitBreakerFactory factory; @Override public void run(ApplicationArguments args) throws Exception { ContextThreadPoolExecutor contextThreadPoolExecutor = new ContextThreadPoolExecutor(2, 5, 1, TimeUnit.MINUTES, new ArrayBlockingQueue<>(1024)); // **change ThreadPoolExecutor** factory.configureExecutorService(contextThreadPoolExecutor); } public static class ContextThreadPoolExecutor extends ThreadPoolExecutor { public ContextThreadPoolExecutor(int corePoolSize, int maximumPoolSize, long keepAliveTime, TimeUnit unit, BlockingQueue<Runnable> workQueue) { super(corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue); } public void execute(Runnable command) { super.execute(wrap(command)); } private static Runnable wrap(Runnable runnable) { **SubjectContext context = SubjectContext.getContext();** return () -> { // 将参数向下传递 **SubjectContext.setContext(context);** try { runnable.run(); } finally { **SubjectContext.clear();** } }; } } } 后记 上述的方案只解决了没有自定义Group的情况,官方在自定义Group的情况下是没有保留扩展位的,所以给官方提了一个MR并且已成功合并到主分支,如下: Customizable groupExecutorService #180
Read More ~

RocketMQ Client 启动异常解决

现象 rocketmq-common-4.9.3 版本作为客户端消费数据时,从MessageExt.getMsgId()获取消息ID时,会存在一个潜在的依赖冲突问题,最终导致方法执行失败,如下图: 从现象来看,Could not initialize class org.apache.rocketmq.common.message.MessageClientIDSetter 表示 MessageClientIDSetter类为正确被加载,通过搜索类似的资料找到了以下文章: Client dependency conflict, cause NoClassDefFoundError. Require for shaded client 记一次RocketMQ消息消费异常 从51cto的文章来看,基本可以分析出是MessageClientIDSetter的静态代码块在执行时异常导致类加载器加载类失败。 static { byte[] ip; try { // 执行异常,导致`MessageClientIDSetter`未成功被类加载器加载 ip = UtilAll.getIP(); } catch (Exception e) { ip = createFakeIP(); } LEN = ip.length + 2 + 4 + 4 + 2; ByteBuffer tempBuffer = ByteBuffer.allocate(ip.length + 2 + 4); tempBuffer.put(ip); tempBuffer.putShort((short) UtilAll.getPid()); tempBuffer.putInt(MessageClientIDSetter.class.getClassLoader().hashCode()); FIX_STRING = UtilAll.bytes2string(tempBuffer.array()).toCharArray(); setStartTime(System.currentTimeMillis()); COUNTER = new AtomicInteger(0); } 分析 从上面的博客中已经可以得到结论是依赖冲突造成的,但问题是部分服务会出现,大多数服务不会出现,通过往深层次分析得出结论: rocketmq-acl rocketmq-client 均依赖于 commons-validator-1.7 版本,而项目中如果有类似于 aliyun-log-appender 老版本的依赖,会传递依赖于旧版本的 commons-validator ,如下图所示: Maven在遇到依赖冲突时,首先会采用就近原则,上图中第一个传递依赖有5个层级,而下一个传递依赖只有4个层级,所以就近取到了错误的1.4.0版本。 其它的服务更多是单模块项目,直接依赖于RocketMQ的client代码,而出现问题的服务是一个多模块依赖关系的服务,最终依赖传递了3层,导致依赖链路变长,最终优先依赖了链路更近的由 log-appender 引入进来的 1.4.0 版本。 相关文章: Maven依赖冲突避坑指北
Read More ~