背景
使用OpenFeign
时,通常会实现RequestInterceptor
接口来自定义FeignConfiguration
,OpenFeign
暴露了feign.RequestTemplate
信息,给到我们在发送请求前自定义参数信息的扩展点。
在分布式系统中,通常会将本服务的信息(UserInfo
、RequestId
)透传至下游服务,从而实现分布式链路追踪等功能,对于像用户信息等,在Web系统中通常使用 ThreadLocal
来存储信息,在自定义的FeignConfiguration
中获取ThreadLocal
再塞入到feign.RequestTemplate
中,实现向下游服务的传递,示例:
public class FeignConfiguration implements RequestInterceptor {
@Override
public void apply(RequestTemplate template) {
ServletRequestAttributes attributes = (ServletRequestAttributes) RequestContextHolder.getRequestAttributes();
String userId = SubjectContext.get().getUserId();
if (null != attributes) {
HttpServletRequest request = attributes.getRequest();
template.header("token", request.getHeader("TOKEN"));
template.header("userId", userId);
}
}
}
简单的Context示例:
public class SubjectContext {
protected static ThreadLocal<UserInfo> subjectContext = new ThreadLocal();
public static void remove() {
subjectContext.remove();
}
public static void set(UserInfo uerInfo) {
subjectContext.set(uerInfo);
}
public static UserInfo get() {
return (UserInfo)subjectContext.get();
}
}
出现错误
上述代码在常规情况下,是能够按照预期执行的。
但是最近项目引入了CircuitBreaker
作为服务熔断的断路器之后,上述代码在执行到SubjectContext.get()
时,会抛出空指针,拿不到用户信息。
通过分析CircuitBreaker
的源码,最终定位到代码出现在Resilience4JCircuitBreaker
内部,在Resilience4JCircuitBreaker
中有一个public <T> T run(Supplier<T> toRun, Function<Throwable, T> fallback)
方法,方法入参的toRun
就是封装过的我们定义的Feign接口,其包装过程在FeignCircuitBreakerInvocationHandler#asSupplier
代码中,如下:
private Supplier<Object> asSupplier(final Method method, final Object[] args) {
final RequestAttributes requestAttributes = RequestContextHolder.getRequestAttributes();
return () -> {
try {
RequestContextHolder.setRequestAttributes(requestAttributes);
// 执行我们的真正方法
return dispatch.get(method).invoke(args);
}
catch (RuntimeException throwable) {
throw throwable;
}
catch (Throwable throwable) {
throw new RuntimeException(throwable);
}
};
}
Spring Cloud CircuitBreaker Resilience4j 提供了两种实现:
- 使用 Semaphores 的 SemaphoreBulkhead。
- 一个 FixedThreadPoolBulkhead,它使用一个有界队列和一个固定的线程池。
默认情况下,Spring Cloud CircuitBreaker Resilience4j 使用 FixedThreadPoolBulkhead
。要修改默认行为以使用 SemaphoreBulkhead
,请将属性 spring.cloud.circuitbreaker.resilience4j.enableSemaphoreDefaultBulkhead
设为 true
。
正是由于上述原因,默认将我们的FeignConfiguration提交给了线程池,由于我们使用的是ThreadLocal
导致线程本地变量没有向子线程传递,在执行FeignConfiguration时子线程无法拿到Context
信息,最终导致程序的报错。
解决办法
通过分析源码我们发现,执行任务的线程池Resilience4JCircuitBreaker#executorService
是由外部传递过来进行初始化的,调用方在Resilience4JCircuitBreakerFactory#create(java.lang.String, java.lang.String, java.util.concurrent.ExecutorService)
在Resilience4JCircuitBreakerFactory
中发现,是由本实例在create
方法被调用时传入的本类的成员变量,即:
private ExecutorService executorService = Executors.newCachedThreadPool();
private ConcurrentHashMap<String, ExecutorService> executorServices = new ConcurrentHashMap<>();
而我们在没有定义自定义Feign Group时,默认使用的就是executorService
,在本类中有一个Resilience4JCircuitBreakerFactory#configureExecutorService
方法专门保留了外部传入自定义线程池的扩展,我们可以自己实现创建一个支持传递Context
到子线程的线程池,即可将参数向下传递,比如像这样:
@Configurable
@AllArgsConstructor
public class CircuitBreakerConfiguration implements ApplicationRunner {
private final Resilience4JCircuitBreakerFactory factory;
@Override
public void run(ApplicationArguments args) throws Exception {
ContextThreadPoolExecutor contextThreadPoolExecutor =
new ContextThreadPoolExecutor(2, 5, 1, TimeUnit.MINUTES, new ArrayBlockingQueue<>(1024));
// **change ThreadPoolExecutor**
factory.configureExecutorService(contextThreadPoolExecutor);
}
public static class ContextThreadPoolExecutor extends ThreadPoolExecutor {
public ContextThreadPoolExecutor(int corePoolSize, int maximumPoolSize, long keepAliveTime, TimeUnit unit, BlockingQueue<Runnable> workQueue) {
super(corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue);
}
public void execute(Runnable command) {
super.execute(wrap(command));
}
private static Runnable wrap(Runnable runnable) {
**SubjectContext context = SubjectContext.getContext();**
return () -> {
// 将参数向下传递
**SubjectContext.setContext(context);**
try {
runnable.run();
} finally {
**SubjectContext.clear();**
}
};
}
}
}
后记
上述的方案只解决了没有自定义Group的情况,官方在自定义Group的情况下是没有保留扩展位的,所以给官方提了一个MR并且已成功合并到主分支,如下:
Customizable groupExecutorService #180