在私有化部署环境下,事件中心的 server 每次启动后几秒钟就会 OOM,日志中的报错为 `Failed to create a thread: retVal -1073741830, errno 11`(errno 11 即 EAGAIN,表示资源不足、无法再创建新线程)。
异常堆栈:
Caused by: java.lang.OutOfMemoryError: Failed to create a thread: retVal -1073741830, errno 11
at java.lang.Thread.startImpl(Native Method)
at java.lang.Thread.start(Thread.java:993)
at io.netty.util.HashedWheelTimer.start(HashedWheelTimer.java:366)
at io.netty.util.HashedWheelTimer.newTimeout(HashedWheelTimer.java:447)
at 业务调用代码省略
标品环境以及其他 KA 客户的环境中均未出现该问题。
通过分析日志,最终定位到事件中心的延迟消息代码存在缺陷:它使用了 Netty 的 HashedWheelTimer,但用法有误。正确的做法是全局只创建一个 HashedWheelTimer 实例来处理所有延迟任务,而代码却错误地在每次投递时都 new 一个新的 HashedWheelTimer。每个 HashedWheelTimer 内部都会创建一个独立的 worker 线程负责调度(在首次调用 newTimeout 时启动,若实例未被 stop 则一直存活)。每个线程默认约占用 1MB 的栈空间,线程越积越多,最终耗尽内存资源,导致无法再创建新线程。
`io.netty.util.HashedWheelTimer` 构造方法源码(节选):
/**
 * Creates a new timer (excerpt of Netty's io.netty.util.HashedWheelTimer constructor).
 *
 * <p>Every call creates a brand-new worker {@link Thread} from {@code threadFactory}; the stack
 * trace above shows that thread being started from HashedWheelTimer.start() on the first
 * newTimeout(). Constructing one timer per delayed message therefore creates one thread per message.
 *
 * @param threadFactory      factory used to create this timer's worker thread (must be non-null)
 * @param tickDuration       duration between ticks, in {@code unit}; must be positive
 * @param unit               time unit of {@code tickDuration} (must be non-null)
 * @param ticksPerWheel      number of wheel slots; must be positive, normalized by createWheel
 * @param leakDetection      if true, this instance is always registered with the leak detector
 * @param maxPendingTimeouts stored as-is in {@code this.maxPendingTimeouts}
 * @param taskExecutor       stored in {@code this.taskExecutor} (presumably runs expired
 *                           timeouts -- confirm against the full Netty source)
 * @throws IllegalArgumentException if {@code tickDuration} converted to nanoseconds is too large
 *                                  for the wheel size (see the overflow check below)
 */
public HashedWheelTimer(
ThreadFactory threadFactory,
long tickDuration, TimeUnit unit, int ticksPerWheel, boolean leakDetection,
long maxPendingTimeouts, Executor taskExecutor) {
checkNotNull(threadFactory, "threadFactory");
checkNotNull(unit, "unit");
checkPositive(tickDuration, "tickDuration");
checkPositive(ticksPerWheel, "ticksPerWheel");
this.taskExecutor = checkNotNull(taskExecutor, "taskExecutor");
// Normalize ticksPerWheel to power of two and initialize the wheel.
wheel = createWheel(ticksPerWheel);
mask = wheel.length - 1;
// Convert tickDuration to nanos.
long duration = unit.toNanos(tickDuration);
// Prevent overflow.
if (duration >= Long.MAX_VALUE / wheel.length) {
throw new IllegalArgumentException(String.format(
"tickDuration: %d (expected: 0 < tickDuration in nanos < %d",
tickDuration, Long.MAX_VALUE / wheel.length));
}
// Ticks shorter than 1 ms are clamped to 1 ms (with a warning).
if (duration < MILLISECOND_NANOS) {
logger.warn("Configured tickDuration {} smaller then {}, using 1ms.",
tickDuration, MILLISECOND_NANOS);
this.tickDuration = MILLISECOND_NANOS;
} else {
this.tickDuration = duration;
}
// A new worker thread is created on EVERY construction -- this is why creating one
// HashedWheelTimer per delayed message leaks one thread per message.
workerThread = threadFactory.newThread(worker);
// Track this instance for leaks when explicitly requested, or when the worker thread is
// non-daemon (such a thread would keep the JVM alive if the timer is never stopped).
leak = leakDetection || !workerThread.isDaemon() ? leakDetector.track(this) : null;
this.maxPendingTimeouts = maxPendingTimeouts;
// Report "too many instances" once per JVM when the live-instance counter exceeds the limit.
if (INSTANCE_COUNTER.incrementAndGet() > INSTANCE_COUNT_LIMIT &&
WARNED_TOO_MANY_INSTANCES.compareAndSet(false, true)) {
reportTooManyInstances();
}
}
根本原因在于:标品和其他 KA 客户使用的是阿里云 RocketMQ,而该客户使用的是自建的开源版 RocketMQ。开源版 RocketMQ(5.0 之前)只支持固定的若干个延迟级别,不支持自定义时长的延迟消息,所以我们自己实现了一套基于时间轮的任意时长延迟消息方案:小于 60s 的延迟消息会被丢入我们的时间轮中延迟投递。而该客户的环境中恰好有大量 60s 以内的延迟消息,导致服务一启动就崩溃。
不过,从 RocketMQ 5.0 开始,开源版也已支持任意时长的定时/延迟消息。
Read More ~
标签:#
TimingWheel
Java 实现简单单机时间轮方案
时间轮的使用场景无需多言。最近在研究 RocketMQ 5.0 时,想自己动手写一个来理清思路,于是写了下面的方案(未参考任何现有代码,也未做优化),主要用作记录。
package top.imyzt.learning.algorithm.timer;
import lombok.Getter;
import lombok.Setter;
import lombok.SneakyThrows;
import java.time.LocalDateTime;
import java.util.*;
import java.util.concurrent.*;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.function.Supplier;
import java.util.stream.Collectors;
import java.util.stream.IntStream;
/**
* 简易单机时间轮
* @author imyzt
* @date 2023-11-13 22:03
*/
public class SingleTimingWheel {

    /**
     * Interactive demo: reads comma-separated delays (in seconds) from stdin and schedules one
     * task per delay on each of two wheels of different sizes (60 and 12 slots).
     * Typing {@code exit} (or closing stdin) shuts both wheels down and terminates the JVM.
     * Entries that are not non-negative integers are reported and skipped.
     */
    public static void main(String[] args) throws InterruptedException {
        TimingWheel timingWheel = new TimingWheel(60);
        TimingWheel timingWheel2 = new TimingWheel(12);
        // Create the Scanner once: it buffers stdin, so a fresh Scanner per iteration can
        // silently discard input that the previous instance had already read ahead.
        Scanner scanner = new Scanner(System.in);
        while (true) {
            System.out.print("请输入延时周期: ");
            // Treat end of input like "exit" instead of dying with NoSuchElementException.
            String next = scanner.hasNext() ? scanner.next() : "exit";
            if ("exit".equals(next)) {
                timingWheel.shutdown();
                timingWheel2.shutdown();
                break;
            }
            for (String split : next.split(",")) {
                int delayTime;
                try {
                    delayTime = Integer.parseInt(split.trim());
                } catch (NumberFormatException e) {
                    // Mark as invalid; reported below together with negative values.
                    delayTime = -1;
                }
                if (delayTime < 0) {
                    // A negative delay would produce a negative slot index inside the wheel.
                    System.out.println("无效的延时周期: " + split);
                    continue;
                }
                LocalDateTime now = LocalDateTime.now();
                System.out.println("新生产一个任务, 延迟" + delayTime + "秒后执行" + ", 当前时间: " +
                        now + ", 预计执行时间: " + now.plusSeconds(delayTime)
                );
                // Each wheel stores and counts down Task.cycle in place, so the two wheels must not
                // share one Task instance: the second addTask would overwrite the first wheel's
                // cycle and both tickers would then decrement the same field.
                timingWheel.addTask(new Task(() -> Thread.currentThread().getName(), delayTime));
                timingWheel2.addTask(new Task(() -> Thread.currentThread().getName(), delayTime));
            }
        }
        // Give the executors a moment to wind down before forcing the JVM to exit.
        TimeUnit.SECONDS.sleep(1);
        System.exit(0);
    }
}
/**
 * Single-node timing wheel with one-second ticks.
 *
 * <p>A scheduler thread advances the wheel once per second. Tasks in the current slot with no
 * remaining revolutions are handed to a consumer thread, which prints them. Each instance owns
 * these two threads until {@link #shutdown()} is called. {@link #addTask(Task)} may be called
 * from any thread: every slot list is guarded by its own monitor.
 */
class TimingWheel {

    /** How long the consumer blocks waiting for a due task before re-checking the run flag. */
    private static final long POLL_TIMEOUT_MILLIS = 100;

    private final ExecutorService EXECUTOR_TASK_POOL;
    private final ScheduledExecutorService SCHEDULED_TASK_POOL;
    /**
     * Number of slots (seconds) in one revolution of the wheel.
     */
    private final int timer;
    /**
     * Pending tasks per slot. Each list is accessed only while holding that list's monitor,
     * because the caller of addTask and the ticker thread mutate it concurrently.
     */
    private final List<LinkedList<Task>> secondWheel;
    /**
     * Index of the next slot the ticker will process, always in [0, timer).
     */
    private final AtomicInteger secondAtomic;
    /**
     * Due tasks, handed from the ticker thread to the consumer thread.
     */
    private final BlockingQueue<Task> taskQueue;
    /**
     * Run flag. It is volatile so the consumer thread observes shutdown().
     */
    private volatile boolean flag;

    /**
     * Creates the wheel and immediately starts its ticker and consumer threads.
     *
     * @param timer number of one-second slots per revolution
     * @throws IllegalArgumentException if {@code timer} is not positive
     */
    public TimingWheel(int timer) {
        if (timer <= 0) {
            // timer is used as a divisor and modulus below.
            throw new IllegalArgumentException("timer must be positive: " + timer);
        }
        this.timer = timer;
        this.secondWheel = IntStream.range(0, timer).mapToObj(d -> new LinkedList<Task>()).collect(Collectors.toList());
        this.secondAtomic = new AtomicInteger(0);
        this.taskQueue = new LinkedBlockingQueue<>();
        this.EXECUTOR_TASK_POOL = Executors.newSingleThreadExecutor();
        this.SCHEDULED_TASK_POOL = Executors.newSingleThreadScheduledExecutor();
        this.flag = true;
        this.init();
    }

    /**
     * Schedules {@code task} to fire roughly {@code task.getDelayTime()} seconds from now.
     *
     * @param task task to schedule; its {@code cycle} is overwritten, so a Task instance must not
     *             be added to more than one wheel
     * @throws IllegalArgumentException if the task's delay is negative
     */
    public void addTask(Task task) {
        int delayTime = task.getDelayTime();
        if (delayTime < 0) {
            throw new IllegalArgumentException("delayTime must not be negative: " + delayTime);
        }
        // Read the cursor once, so that the logged slot and the computed slot agree.
        int current = secondAtomic.get();
        // Full revolutions to skip before the task becomes due.
        int cycle = delayTime / timer;
        // Reduce first so the sum cannot overflow for very large delays.
        int index = (current + delayTime % timer) % timer;
        task.setCycle(cycle);
        System.out.printf("任务id: %s, 当前刻度: %s, cycle: %s, 计划执行刻度: %s \n", task.getTaskId(), current, cycle, index);
        LinkedList<Task> tasks = secondWheel.get(index);
        synchronized (tasks) {
            tasks.add(task);
        }
    }

    /**
     * Stops the ticker and consumer threads. Tasks that have not fired yet are dropped.
     */
    public void shutdown() {
        this.flag = false;
        EXECUTOR_TASK_POOL.shutdown();
        SCHEDULED_TASK_POOL.shutdown();
        System.out.println("[" + timer + "]shutdown...");
    }

    /** Starts the once-per-second ticker and the consumer loop. */
    private void init() {
        SCHEDULED_TASK_POOL.scheduleAtFixedRate(this::tick, 0, 1, TimeUnit.SECONDS);
        EXECUTOR_TASK_POOL.execute(this::consume);
    }

    /**
     * Processes the current slot and advances the cursor. Due tasks (cycle == 0) are moved to the
     * consumer queue; the other tasks have their remaining revolutions decremented.
     */
    private void tick() {
        // Atomically take the current slot and wrap the cursor back to 0 after the last slot.
        int second = secondAtomic.getAndUpdate(s -> (s + 1) % timer);
        LinkedList<Task> tasks = secondWheel.get(second);
        synchronized (tasks) {
            Iterator<Task> iterator = tasks.iterator();
            while (iterator.hasNext()) {
                Task task = iterator.next();
                int taskCycle = task.getCycle();
                if (taskCycle != 0) {
                    task.setCycle(taskCycle - 1);
                    System.out.println(task.getTaskId() + "还未到时间, 当前周期" + taskCycle);
                    continue;
                }
                taskQueue.add(task);
                // Remove the task from its slot once it has been handed off.
                iterator.remove();
            }
        }
    }

    /**
     * Consumer loop. It blocks briefly for due tasks instead of busy-spinning, and exits once the
     * run flag is cleared or the thread is interrupted.
     */
    private void consume() {
        try {
            while (flag) {
                Task task = taskQueue.poll(POLL_TIMEOUT_MILLIS, TimeUnit.MILLISECONDS);
                if (task != null) {
                    System.out.println(LocalDateTime.now() + ", [" + timer + "]时间轮调度任务====>" + task);
                }
            }
        } catch (InterruptedException e) {
            // Executor is shutting down; preserve the interrupt status and exit.
            Thread.currentThread().interrupt();
        }
    }
}
/**
 * A delayed unit of work scheduled on a TimingWheel.
 *
 * <p>A Task carries per-wheel mutable state ({@code cycle}), so an instance should be scheduled
 * on one wheel only.
 */
@Getter
class Task {

    /**
     * Source of task ids. A counter gives unique ids; the previous
     * {@code new Random().nextInt() * 10000} overflowed int and could collide.
     */
    private static final AtomicInteger ID_SEQUENCE = new AtomicInteger();

    /**
     * Id that is unique within this JVM, used to correlate log lines.
     */
    private final Integer taskId;
    /**
     * The work to execute. It is invoked by toString().
     */
    private final Supplier<String> runner;
    /**
     * Remaining full wheel revolutions before the task is due. It is written by the thread calling
     * addTask and then counted down by the wheel's ticker thread, so it is volatile for visibility.
     */
    @Setter
    private volatile Integer cycle;
    /**
     * Requested delay in seconds.
     */
    private final Integer delayTime;
    /**
     * Creation time.
     */
    private final LocalDateTime createdAt;
    /**
     * Intended execution time: createdAt plus delayTime seconds.
     */
    private final LocalDateTime runnerTime;

    /**
     * @param runner    work to run when the task fires (must be non-null)
     * @param delayTime delay in seconds (must be non-null)
     * @throws NullPointerException if either argument is null
     */
    public Task(Supplier<String> runner, Integer delayTime) {
        this.taskId = ID_SEQUENCE.incrementAndGet();
        this.runner = Objects.requireNonNull(runner, "runner");
        this.delayTime = Objects.requireNonNull(delayTime, "delayTime");
        this.createdAt = LocalDateTime.now();
        this.runnerTime = this.createdAt.plusSeconds(delayTime);
    }

    @Override
    public String toString() {
        return "Task{" +
                "taskId=" + taskId +
                ", runner=" + runner.get() +
                ", cycle=" + cycle +
                ", delayTime=" + delayTime +
                ", createdAt=" + createdAt +
                ", runnerTime=" + runnerTime +
                '}';
    }
}
Read More ~