事件中心在私有化环境下,只要server一启动过几秒就oom,查看日志是 Failed to create a thread: retVal -1073741830, errno 11。
异常堆栈:
Caused by: java.lang.OutOfMemoryError: Failed to create a thread: retVal -1073741830, errno 11
at java.lang.Thread.startImpl(Native Method)
at java.lang.Thread.start(Thread.java:993)
at io.netty.util.HashedWheelTimer.start(HashedWheelTimer.java:366)
at io.netty.util.HashedWheelTimer.newTimeout(HashedWheelTimer.java:447)
at 业务调用代码省略
在标品环境下没有问题,在其他KA客户上也没有问题
通过对日志的分析,最终发现是事件中心的延迟消息代码存在缺陷,使用了Netty的HashedWheelTimer,但是语法存在问题,理论上应该是new一个HashedWheelTimer来处理所有时间延迟,但是错用程每次new一个新的HashedWheelTimer,HashedWheelTimer内部每次都会new一个新的线程来处理做调度,一个线程占用1MB,最终内存资源被耗尽。
io.netty.util.HashedWheelTimer#HashedWheelTimer源码:
public HashedWheelTimer(
ThreadFactory threadFactory,
long tickDuration, TimeUnit unit, int ticksPerWheel, boolean leakDetection,
long maxPendingTimeouts, Executor taskExecutor) {
checkNotNull(threadFactory, "threadFactory");
checkNotNull(unit, "unit");
checkPositive(tickDuration, "tickDuration");
checkPositive(ticksPerWheel, "ticksPerWheel");
this.taskExecutor = checkNotNull(taskExecutor, "taskExecutor");
// Normalize ticksPerWheel to power of two and initialize the wheel.
wheel = createWheel(ticksPerWheel);
mask = wheel.length - 1;
// Convert tickDuration to nanos.
long duration = unit.toNanos(tickDuration);
// Prevent overflow.
if (duration >= Long.MAX_VALUE / wheel.length) {
throw new IllegalArgumentException(String.format(
"tickDuration: %d (expected: 0 < tickDuration in nanos < %d",
tickDuration, Long.MAX_VALUE / wheel.length));
}
if (duration < MILLISECOND_NANOS) {
logger.warn("Configured tickDuration {} smaller then {}, using 1ms.",
tickDuration, MILLISECOND_NANOS);
this.tickDuration = MILLISECOND_NANOS;
} else {
this.tickDuration = duration;
}
// 每次都new一个线程来处理
workerThread = threadFactory.newThread(worker);
leak = leakDetection || !workerThread.isDaemon() ? leakDetector.track(this) : null;
this.maxPendingTimeouts = maxPendingTimeouts;
if (INSTANCE_COUNTER.incrementAndGet() > INSTANCE_COUNT_LIMIT &&
WARNED_TOO_MANY_INSTANCES.compareAndSet(false, true)) {
reportTooManyInstances();
}
}
因为标品和其他KA使用的是阿里云RocketMQ,此客户使用的是自建的开源版RocketMQ,开源RocketMQ是没有自定义时长的延迟消息的,所以我们自己实现了一套时间轮来实现任意时长的延迟消息,当小于60s的延迟消息会丢入我们的时间轮来处理延迟投递,当时此客户的环境中有大量的60s内的延迟消息,导致一启动就会崩溃。
不过在RocketMQ5.0也支持任意时长了。
Read More ~
标签:#
RabbitMQ
ThreadPoolExecutor “非常用” 方法
平时在使用线程池时,更多关注到的是coreSize、maxSize、blockQueue、RejectedExecutionHandler这些参数,但在线程池监控领域,还需要关注到其他的一些方法。在此处做统一记录和备忘:
public static void main(String[] args) {
ThreadPoolExecutor threadPoolExecutor = new ThreadPoolExecutor(2, 5, 1, TimeUnit.MINUTES, new ArrayBlockingQueue<>(1024), new ThreadPoolExecutor.CallerRunsPolicy());
// 启动所有核心线程(预热)
threadPoolExecutor.prestartAllCoreThreads();
// 启动一个核心线程
threadPoolExecutor.prestartCoreThread();
// 默认情况下构造器中的keepAliveTime指定的是非核心线程的空闲时间, 通过如下方法, 可以允许核心线程超时
threadPoolExecutor.allowCoreThreadTimeOut(true);
// ⭐️ 动态线程池必备方法
// 启动后, 设置核心线程数量
threadPoolExecutor.setCorePoolSize(3);
// 启动后, 设置最大线程数量
threadPoolExecutor.setMaximumPoolSize(10);
// 已执行完的任务总数
threadPoolExecutor.getTaskCount();
// 获取工作队列剩余数量
threadPoolExecutor.getQueue().remainingCapacity();
}
后记
通过上面的代码可知,在运行过程中我们也是可以操作coreSize和maxSize的。那么如何才能实现对Queue的大小进行控制呢?目前开源届常用的是采取RabbitMQ中的VariableLinkedBlockingQueue来实现。
Read More ~