错用HashedWheelTimer导致的OOM问题

事件中心在私有化环境下,只要server一启动过几秒就oom,查看日志是 Failed to create a thread: retVal -1073741830, errno 11

异常堆栈:

Caused by: java.lang.OutOfMemoryError: Failed to create a thread: retVal -1073741830, errno 11   
    at java.lang.Thread.startImpl(Native Method) 
    at java.lang.Thread.start(Thread.java:993) 
    at io.netty.util.HashedWheelTimer.start(HashedWheelTimer.java:366) 
    at io.netty.util.HashedWheelTimer.newTimeout(HashedWheelTimer.java:447) 
    at 业务调用代码省略

在标品环境下没有问题,在其他KA客户上也没有问题
通过对日志的分析,最终发现是事件中心的延迟消息代码存在缺陷,使用了NettyHashedWheelTimer,但是语法存在问题,理论上应该是new一个HashedWheelTimer来处理所有时间延迟,但是错用程每次new一个新的HashedWheelTimerHashedWheelTimer内部每次都会new一个新的线程来处理做调度,一个线程占用1MB,最终内存资源被耗尽。

io.netty.util.HashedWheelTimer#HashedWheelTimer源码:

public HashedWheelTimer(
    ThreadFactory threadFactory,
    long tickDuration, TimeUnit unit, int ticksPerWheel, boolean leakDetection,
    long maxPendingTimeouts, Executor taskExecutor) {

    checkNotNull(threadFactory, "threadFactory");
    checkNotNull(unit, "unit");
    checkPositive(tickDuration, "tickDuration");
    checkPositive(ticksPerWheel, "ticksPerWheel");
    this.taskExecutor = checkNotNull(taskExecutor, "taskExecutor");

    // Normalize ticksPerWheel to power of two and initialize the wheel.
    wheel = createWheel(ticksPerWheel);
    mask = wheel.length - 1;

    // Convert tickDuration to nanos.
    long duration = unit.toNanos(tickDuration);

    // Prevent overflow.
    if (duration >= Long.MAX_VALUE / wheel.length) {
        throw new IllegalArgumentException(String.format(
                "tickDuration: %d (expected: 0 < tickDuration in nanos < %d",
                tickDuration, Long.MAX_VALUE / wheel.length));
    }

    if (duration < MILLISECOND_NANOS) {
        logger.warn("Configured tickDuration {} smaller then {}, using 1ms.",
                    tickDuration, MILLISECOND_NANOS);
        this.tickDuration = MILLISECOND_NANOS;
    } else {
        this.tickDuration = duration;
    }

    // 每次都new一个线程来处理
    workerThread = threadFactory.newThread(worker);

    leak = leakDetection || !workerThread.isDaemon() ? leakDetector.track(this) : null;

    this.maxPendingTimeouts = maxPendingTimeouts;

    if (INSTANCE_COUNTER.incrementAndGet() > INSTANCE_COUNT_LIMIT &&
        WARNED_TOO_MANY_INSTANCES.compareAndSet(false, true)) {
        reportTooManyInstances();
    }
}

因为标品和其他KA使用的是阿里云RocketMQ,此客户使用的是自建的开源版RocketMQ,开源RocketMQ是没有自定义时长的延迟消息的,所以我们自己实现了一套时间轮来实现任意时长的延迟消息,当小于60s的延迟消息会丢入我们的时间轮来处理延迟投递,当时此客户的环境中有大量的60s内的延迟消息,导致一启动就会崩溃。
不过在RocketMQ5.0也支持任意时长了。