时间轮的使用场景自不必多说,最近研究RocketMQ 5.0时,想简单写一个活跃下思路,遂写了下面的方案(没有参照任何代码,没有优化),主要做下记录。
package top.imyzt.learning.algorithm.timer;
import lombok.Getter;
import lombok.Setter;
import lombok.SneakyThrows;
import java.time.LocalDateTime;
import java.util.*;
import java.util.concurrent.*;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.function.Supplier;
import java.util.stream.Collectors;
import java.util.stream.IntStream;
/**
* 简易单机时间轮
* @author imyzt
* @date 2023-11-13 22:03
*/
public class SingleTimingWheel {
public static void main(String[] args) throws InterruptedException {
TimingWheel timingWheel = new TimingWheel(60);
TimingWheel timingWheel2 = new TimingWheel(12);
while (true) {
System.out.print("请输入延时周期: ");
Scanner scanner = new Scanner(System.in);
String next = scanner.next();
if ("exit".equals(next)) {
timingWheel.shutdown();
timingWheel2.shutdown();
break;
}
String[] splits = next.split(",");
for (String split : splits) {
int delayTime = Integer.parseInt(split);
System.out.println("新生产一个任务, 延迟" + delayTime + "秒后执行" + ", 当前时间: " +
LocalDateTime.now() + ", 预计执行时间: " + LocalDateTime.now().plusSeconds(delayTime)
);
Task task = new Task(() -> Thread.currentThread().getName(), delayTime);
timingWheel.addTask(task);
timingWheel2.addTask(task);
}
}
TimeUnit.SECONDS.sleep(1);
System.exit(0);
}
}
class TimingWheel {
private final ExecutorService EXECUTOR_TASK_POOL;
private final ScheduledExecutorService SCHEDULED_TASK_POOL;
/**
* 时间轮周期
*/
private final int timer;
/**
* 记录每个刻度的任务
*/
private final List<LinkedList<Task>> secondWheel;
/**
* 刻度计数器
*/
private final AtomicInteger secondAtomic;
/**
* 任务队列
*/
private final Queue<Task> taskQueue;
/**
* 运行标记
*/
private boolean flag;
public TimingWheel(int timer) {
this.timer = timer;
this.secondWheel = IntStream.range(0, timer).mapToObj(d -> new LinkedList<Task>()).collect(Collectors.toList());
this.secondAtomic = new AtomicInteger(0);
this.taskQueue = new LinkedBlockingQueue<>();
this.EXECUTOR_TASK_POOL = Executors.newSingleThreadExecutor();
this.SCHEDULED_TASK_POOL = Executors.newSingleThreadScheduledExecutor();
this.flag = true;
this.init();
}
public void addTask(Task task) {
int delayTime = task.getDelayTime();
int targetRunSecond = delayTime + secondAtomic.get();
int cycle = delayTime / timer;
int index = targetRunSecond % timer;
task.setCycle(cycle);
System.out.printf("任务id: %s, 当前刻度: %s, cycle: %s, 计划执行刻度: %s \n", task.getTaskId(), secondAtomic.get(), cycle, index);
LinkedList<Task> tasks = secondWheel.get(index);
if (tasks == null) {
tasks = new LinkedList<>();
}
tasks.add(task);
}
public void shutdown() {
EXECUTOR_TASK_POOL.shutdown();
SCHEDULED_TASK_POOL.shutdown();
this.flag = false;
System.out.println("[" + timer + "]shutdown...");
}
@SneakyThrows
private void init () {
SCHEDULED_TASK_POOL.scheduleAtFixedRate(() -> {
int second = secondAtomic.getAndAdd(1);
if (second + 1 == timer) {
secondAtomic.set(0);
}
LinkedList<Task> tasks = secondWheel.get(second);
if (tasks != null && !tasks.isEmpty()) {
Iterator<Task> iterator = tasks.iterator();
while (iterator.hasNext()) {
Task task = iterator.next();
Integer taskCycle = task.getCycle();
if (taskCycle != 0) {
task.setCycle(taskCycle - 1);
System.out.println(task.getTaskId() + "还未到时间, 当前周期" + taskCycle);
continue;
}
taskQueue.add(task);
// 从队列中剔除
iterator.remove();
}
}
}, 0, 1, TimeUnit.SECONDS);
EXECUTOR_TASK_POOL.execute(() -> {
while (flag) {
Task task = taskQueue.poll();
if (task != null) {
System.out.println(LocalDateTime.now() + ", [" + timer + "]时间轮调度任务====>" + task);
}
}
});
}
}
@Getter
class Task {
private final Integer taskId;
/**
* 执行任务
*/
private final Supplier<String> runner;
/**
* 当前第几轮
*/
@Setter
private Integer cycle;
private final Integer delayTime;
/**
* 创建时间
*/
private final LocalDateTime createdAt;
/**
* 理应执行时间
*/
private final LocalDateTime runnerTime;
public Task(Supplier<String> runner, Integer delayTime) {
this.taskId = new Random().nextInt() * 10000;
this.runner = runner;
this.delayTime = delayTime;
this.createdAt = LocalDateTime.now();
this.runnerTime = this.createdAt.plusSeconds(delayTime);
}
@Override
public String toString() {
return "Task{" +
"taskId=" + taskId +
", runner=" + runner.get() +
", cycle=" + cycle +
", delayTime=" + delayTime +
", createdAt=" + createdAt +
", runnerTime=" + runnerTime +
'}';
}
}