现象
rocketmq-common-4.9.3 版本作为客户端消费数据时,从MessageExt.getMsgId()获取消息ID时,会存在一个潜在的依赖冲突问题,最终导致方法执行失败,如下图:
从现象来看,Could not initialize class org.apache.rocketmq.common.message.MessageClientIDSetter 表示 MessageClientIDSetter类为正确被加载,通过搜索类似的资料找到了以下文章:
Client dependency conflict, cause NoClassDefFoundError. Require for shaded client
记一次RocketMQ消息消费异常
从51cto的文章来看,基本可以分析出是MessageClientIDSetter的静态代码块在执行时异常导致类加载器加载类失败。
static {
byte[] ip;
try {
// 执行异常,导致`MessageClientIDSetter`未成功被类加载器加载
ip = UtilAll.getIP();
} catch (Exception e) {
ip = createFakeIP();
}
LEN = ip.length + 2 + 4 + 4 + 2;
ByteBuffer tempBuffer = ByteBuffer.allocate(ip.length + 2 + 4);
tempBuffer.put(ip);
tempBuffer.putShort((short) UtilAll.getPid());
tempBuffer.putInt(MessageClientIDSetter.class.getClassLoader().hashCode());
FIX_STRING = UtilAll.bytes2string(tempBuffer.array()).toCharArray();
setStartTime(System.currentTimeMillis());
COUNTER = new AtomicInteger(0);
}
分析
从上面的博客中已经可以得到结论是依赖冲突造成的,但问题是部分服务会出现,大多数服务不会出现,通过往深层次分析得出结论:
rocketmq-acl rocketmq-client 均依赖于 commons-validator-1.7 版本,而项目中如果有类似于 aliyun-log-appender 老版本的依赖,会传递依赖于旧版本的 commons-validator ,如下图所示:
Maven在遇到依赖冲突时,首先会采用就近原则,上图中第一个传递依赖有5个层级,而下一个传递依赖只有4个层级,所以就近取到了错误的1.4.0版本。
其它的服务更多是单模块项目,直接依赖于RocketMQ的client代码,而出现问题的服务是一个多模块依赖关系的服务,最终依赖传递了3层,导致依赖链路变长,最终优先依赖了链路更近的由 log-appender 引入进来的 1.4.0 版本。
相关文章: Maven依赖冲突避坑指北
Read More ~
标签:#
RocketMQ
RocketMQ消息存储过程
讲完了消息的生产过程,接下来记录一下消息发送到Broker后如何存储和分发。
前言
首先看下RocketMQ Broker的消息存储目录结构,RocketMQ的消息存储在本地文件系统中,默认在当前用户主目录下的store目录中:
abort:该文件在Broker启动后自动创建,正常关闭自动消息,如果没有启动Broker的情况下看到此文件表示上次Broker是非正常关闭的
checkpoint:存储着commitlog,consumequeue,index文件的最后刷盘时间
commitlog:存放commitlog文件,消息就是存储在其中
consumerqueue:存放consumequeue文件,队列就是存放在其中
index:存放消息索引文件,支持根据key查询的依据
lock:运行期间使用到的全局资源锁
消息存储
如何保证高性能读写
传统的IO读写方式,存在多次上下文切换和多次数据的拷贝,在并发极高的MQ场景下,会严重影响读写效率。所以为了减少内核态<=>用户态切换,减少数据拷贝次数,引入了零拷贝技术。
零拷贝
零拷贝是一种思想,指的是CPU不需要先将数据从某处内存复制到另一个特定区域,实现零拷贝的方式有以下几种方式:
mmap()
sendfile()
mmap()
mmap(memory map) 是一种内存映射方法,即将一个文件或者其它对象映射到进程的地址空间,实现文件磁盘地址和进程虚拟地址空间中一段虚拟地址的一一对应关系。
对应上图中内核缓冲区->用户缓冲区->Socket缓冲区的过程直接变成了内核缓冲区-(CPU拷贝)->Socket缓冲区,减少了一次拷贝所需的时间。
但整个过程中,上下文切换依旧是4次,相比于传统IO没有提升,切换过程:
当用户发起mmap调用的时候会发生上下文切换1,进行内存映射;
然后数据被拷贝到内核缓冲区,mmap返回,发生上下文切换2;
随后用户调用write,发生上下文切换3;
将内核缓冲区的数据拷贝到Socket缓冲区,write返回,发生上下文切换4。
FileChannel fileChannel =
new RandomAccessFile("mmap_test.txt", "rw").getChannel();
MappedByteBuffer mappedByteBuffer =
fileChannel.map(FileChannel.MapMode.READ_WRITE, 0, fileChannel.size());
sendfile()
sendfile()和mmap()一样可以减少一次CPU拷贝,但它可以减少2次上下文切换,切换过程:
用户发起sendfile()调用时会发生切换1,之后数据通过DMA拷贝到内核缓冲区
之后再将内核缓冲区的数据CPU拷贝到Socket缓冲区,最后拷贝到网卡,sendfile()返回,发生切换2。
//调用transferTo方法向目标数据传输
FileChannel channel = FileChannel.open(Paths.get("./test.txt"),
StandardOpenOption.WRITE, StandardOpenOption.CREATE);
channel.transferTo(position, len, target);
commitlog(mappedFile)
目录与文件
真正存储消息,一个Broker只有一个commitlog目录,所有Topic的消息都存放于此。
mappedFile大小1G,文件名由20位十进制数组成,表示当前文件的第一条消息的起始位置偏移量。
第一个文件名一定是20位0构成,第一条消息的偏移量commitlog offset为0.
当第一个文件放满时,会自动生成第二个文件继续存放消息,假设第一个1073741820字节(1G=1073741824字节),最后只剩4字节不够存放下一条数据。则下一个文件名为00000000001073741824
消息单元
consumequeue
存储topic消息在commitlog的位置索引。
为了提高效率,会为每个topic在~/store/consumequeue中创建一个目录{TopicId}/{QueueId},consumequeue文件是commitlog的索引文件,可以根据consumequeue定位到具体的消息在commitlog中的位置。
consumequeue文件名也是由20位数字组成,表示当前文件的第一个索引条目的起始位置偏移量,与mappedFile文件不同的是,其后续的文件名也是固定的,因为consumequeue文件大小是固定不变的。
每个consumequeue文件可以包含30w个索引条目,其中包含:
消息在mappedFile文件中的偏移量commitlog offset
消息长度
消息Tag的hashcode值
这三个属性占用20个字节,所以每个文件的大小是固定的30w * 20字节
对文件的读写
消息写入
一条消息进入到Broker后经历了以下几个过程才最终被持久化:
Broker根据queueId,获取到该消息对应索引条目要在consumequeue目录中的写入偏移量,即QueueOffset
将queueId、queueOffset等数据,与消息一起封装为消息单元
将消息单元写入到commitlog
形成消息索引条目
将消息索引条目分发到相应的consumequeue
消息拉取
当Consumer来拉取消息时会经历以下几个步骤:
Consumer获取到其要消费消息所在Queue的消费偏移量offset,计算出其要消费消息的消费offset
消费offset即消费进度,consumer对某个queue的消费offset,即消费到了该queue的第几条消息
消息offset = 消费offset + 1
Consumer向Broker发送拉取请求,其中会包含其要拉取消息的Queue、消息offset及消息Tag
Broker计算在该consumequeue中的queueOffset
queueOffset = 消息offset * 20字节
从该queueOffset处开始向后查找第一个指定Tag的索引条目
解析该索引条目的前8个字节,即可定位到该消息在commitlog中的commitlog offset
从对应commitlog offset中读取消息单元,并发送给Consumer
刷盘机制
RocketMQ消息写入到Commitlog文件中时并不是直接写入到文件,而是先写到PageCache中,也就是前面IO图中的内核缓冲区,所以RocketMQ也和MySQL等类似的log刷盘机制
异步刷盘
写入到PageCache后直接返回生产者消息存储成功,另外的后台线程在将消息刷到磁盘。其提供了2套刷盘机制:
固定时间,默认每隔0.5s会刷一次【默认方式】
每存一次会通知刷盘,但不会等待结果,同时如果0.5s没有收到通知,也会主动刷盘。
同步刷盘
同步刷盘机制很好理解,即每次一定等写入成功磁盘才会返回生产者写入成功,此方式可靠性更高,但会一定程度上影响系统吞吐量。
后记
本文参考:
https://juejin.cn/post/7186880907582636069
Read More ~
Java 实现简单单机时间轮方案
时间轮的使用场景自不必多说,最近研究RocketMQ 5.0时,想简单写一个活跃下思路,遂写了下面的方案(没有参照任何代码,没有优化),主要做下记录。
package top.imyzt.learning.algorithm.timer;
import lombok.Getter;
import lombok.Setter;
import lombok.SneakyThrows;
import java.time.LocalDateTime;
import java.util.*;
import java.util.concurrent.*;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.function.Supplier;
import java.util.stream.Collectors;
import java.util.stream.IntStream;
/**
* 简易单机时间轮
* @author imyzt
* @date 2023-11-13 22:03
*/
public class SingleTimingWheel {
public static void main(String[] args) throws InterruptedException {
TimingWheel timingWheel = new TimingWheel(60);
TimingWheel timingWheel2 = new TimingWheel(12);
while (true) {
System.out.print("请输入延时周期: ");
Scanner scanner = new Scanner(System.in);
String next = scanner.next();
if ("exit".equals(next)) {
timingWheel.shutdown();
timingWheel2.shutdown();
break;
}
String[] splits = next.split(",");
for (String split : splits) {
int delayTime = Integer.parseInt(split);
System.out.println("新生产一个任务, 延迟" + delayTime + "秒后执行" + ", 当前时间: " +
LocalDateTime.now() + ", 预计执行时间: " + LocalDateTime.now().plusSeconds(delayTime)
);
Task task = new Task(() -> Thread.currentThread().getName(), delayTime);
timingWheel.addTask(task);
timingWheel2.addTask(task);
}
}
TimeUnit.SECONDS.sleep(1);
System.exit(0);
}
}
class TimingWheel {
private final ExecutorService EXECUTOR_TASK_POOL;
private final ScheduledExecutorService SCHEDULED_TASK_POOL;
/**
* 时间轮周期
*/
private final int timer;
/**
* 记录每个刻度的任务
*/
private final List<LinkedList<Task>> secondWheel;
/**
* 刻度计数器
*/
private final AtomicInteger secondAtomic;
/**
* 任务队列
*/
private final Queue<Task> taskQueue;
/**
* 运行标记
*/
private boolean flag;
public TimingWheel(int timer) {
this.timer = timer;
this.secondWheel = IntStream.range(0, timer).mapToObj(d -> new LinkedList<Task>()).collect(Collectors.toList());
this.secondAtomic = new AtomicInteger(0);
this.taskQueue = new LinkedBlockingQueue<>();
this.EXECUTOR_TASK_POOL = Executors.newSingleThreadExecutor();
this.SCHEDULED_TASK_POOL = Executors.newSingleThreadScheduledExecutor();
this.flag = true;
this.init();
}
public void addTask(Task task) {
int delayTime = task.getDelayTime();
int targetRunSecond = delayTime + secondAtomic.get();
int cycle = delayTime / timer;
int index = targetRunSecond % timer;
task.setCycle(cycle);
System.out.printf("任务id: %s, 当前刻度: %s, cycle: %s, 计划执行刻度: %s \n", task.getTaskId(), secondAtomic.get(), cycle, index);
LinkedList<Task> tasks = secondWheel.get(index);
if (tasks == null) {
tasks = new LinkedList<>();
}
tasks.add(task);
}
public void shutdown() {
EXECUTOR_TASK_POOL.shutdown();
SCHEDULED_TASK_POOL.shutdown();
this.flag = false;
System.out.println("[" + timer + "]shutdown...");
}
@SneakyThrows
private void init () {
SCHEDULED_TASK_POOL.scheduleAtFixedRate(() -> {
int second = secondAtomic.getAndAdd(1);
if (second + 1 == timer) {
secondAtomic.set(0);
}
LinkedList<Task> tasks = secondWheel.get(second);
if (tasks != null && !tasks.isEmpty()) {
Iterator<Task> iterator = tasks.iterator();
while (iterator.hasNext()) {
Task task = iterator.next();
Integer taskCycle = task.getCycle();
if (taskCycle != 0) {
task.setCycle(taskCycle - 1);
System.out.println(task.getTaskId() + "还未到时间, 当前周期" + taskCycle);
continue;
}
taskQueue.add(task);
// 从队列中剔除
iterator.remove();
}
}
}, 0, 1, TimeUnit.SECONDS);
EXECUTOR_TASK_POOL.execute(() -> {
while (flag) {
Task task = taskQueue.poll();
if (task != null) {
System.out.println(LocalDateTime.now() + ", [" + timer + "]时间轮调度任务====>" + task);
}
}
});
}
}
@Getter
class Task {
private final Integer taskId;
/**
* 执行任务
*/
private final Supplier<String> runner;
/**
* 当前第几轮
*/
@Setter
private Integer cycle;
private final Integer delayTime;
/**
* 创建时间
*/
private final LocalDateTime createdAt;
/**
* 理应执行时间
*/
private final LocalDateTime runnerTime;
public Task(Supplier<String> runner, Integer delayTime) {
this.taskId = new Random().nextInt() * 10000;
this.runner = runner;
this.delayTime = delayTime;
this.createdAt = LocalDateTime.now();
this.runnerTime = this.createdAt.plusSeconds(delayTime);
}
@Override
public String toString() {
return "Task{" +
"taskId=" + taskId +
", runner=" + runner.get() +
", cycle=" + cycle +
", delayTime=" + delayTime +
", createdAt=" + createdAt +
", runnerTime=" + runnerTime +
'}';
}
}
Read More ~
RocketMQ消息生产基本流程
RocketMQ 相较于 RabbitMQ、Kafka 能够集成两者的不少优点(原生的延迟消息、支持事务消息、能够支撑较大流量),加之Java技术架构,所以已作为目前队列选型调研时的第一选择,本文主要以图解形式讲明白RocketMQ一条消息的基本流程。
核心概念
NameServer:注册中心,用于保存Topic的路由信息、管理Broker的存活状态;NameServer一般是多节点部署的,多个NameServer之间互不通信。
Broker:用于存储消息,在启动时会向NameServer注册自己的地址信息,启动后每30s向NameServer心跳报告健康状态;Broker实例可以有多个,相同的BrokerName为一组Broker,每个Broker组只保存一部分信息。
Topic:消息的主题,一个Topic的消息可以分布在不同的Broker组下。
Queue:一个Topic可以有很多Queue,默认一个Topic在同一个Broker组下是4个,如果一个Topic在2个Broker组中,则有可能是8个Queue。一个Queue只能被一个Consumer消费,一个Consumer可以同时消费多个Queue。
Producer:消息的生产者、可以成组出现(Producer Group)。
Consumer:消息的消费者,可以成组出现(Consumer Group)。
NameServer
NameServer是Broker和Topic路由的注册中心,支持Broker的动态注册与发现。
主要包含2个功能:
Broker管理:接受Broker集群的注册信息并且保存下来作为消息路由信息的基本数据,提供心跳监测机制,检查Broker是否还存活。
路由信息管理:每个NameServer都保存着Broker集群的整个路由信息和用于客户端查询的队列信息,Producer和Consumer可以通过NameServer获得整个Broker集群的路由信息,从而进行消息的投递和消费。
注册流程:
NameServer每个节点之间是不通信的,每个Broker在启动时,会给所有NameServer注册自己的信息,每30秒心跳上报自己的信息(BrokerId、Broker地址、名称、所属集群名称等信息),NameServer收到后会更新Broker的最新存活时间。
优缺点:
优点:NameServer集群搭建简单,随意启动即可(因为互不通信)
缺点:简单增加节点无效,Broker无法感知新NameServer不会向他注册信息和心跳。
路由剔除:
NameServer定时任务每10秒扫描一次Broker列表,当Broker最新心跳时间戳距离当前时间超过120秒,则将Broker从列表中剔除。
运维方案:
当需要升级RocketMQ集群时,可以依次将每个节点对外关闭读写权限。
Producer发送到Broker会收到无权读写而切换另一个Broker投递消息。
超过120秒不心跳,NameServer会自动将此Broker下线。
Producer每30秒会从NameServer拉取Broker信息,当NameServer记录为下线后此Broker也不会再有Producer进行消息投递。然后再关闭升级此Broker即可。
路由发现:
当Topic路由信息变化时,NameServer不会推送而是等客户端每30秒拉取一次最新的路由信息。
客户端(Producer)NameServer选择策略:
首先采用随机策略, 然后采用轮询策略
生成一个随机数,从配置的NameServer集群地址中根据数量取模然后连接,如果连接失败会切换为轮询逐个连接其他节点。
扩展:zk client如何选择zk server?
经过两次Shuffle,然后选择第一台ZK Server。
将配置文件中的zk server地址进行第一次shuffle,然后随机选择一个,这个选择出的一般都是hostname,然后获取到该hostname对应的所有IP,再对这些ip进行第二次shuffle。
Broker
Broker充当着消息中转角色,负责存储消息、转发消息。
Broker在RocketMQ系统中负责接收并存储从生产者发送来的消息。同时为消费者的拉取请求做准备。
Broker同时存储着消息相关的元数据,包括消费者组消费进度偏移offset、主题、队列等。
一个Topic存储在不同的Broker中,按Queue轮询存放,一个Queue在Master/Slave中存储多份做主备集群。
Broker节点集群是主从集群,Broker集群是主备集群。
Remoting Module: 整个Broker的实体,负责处理来自clients的请求,由以下模块构成。
Client Manager:客户端管理器,负责接受、解析来自客户端(Producer、Consumer)请求,管理客户端,比如维护Consumer的Topic订阅关系。
Store Service:存储服务,处理消息存储到物理硬盘和消息查询功能。
HA Service: 高可用服务,提供Master Broker和Slave Broker之间的数据同步功能。
Index Service: 索引服务,根据特定的Message Key,对投递到Broker的消息进行索引服务,同时提供根据 Message Key 对消息进行快速查询的功能。
消息的生产和发送
通过上面的流程,我们知道了NameServer和Broker的交互流程,Producer启动时,只指向了NameServer,并不知道Broker的信息。那么一条消息从Producer生产后,是如何投递到哪个Broker的哪个Queue上面的呢?
针对这种情况,RocketMQ采用了2种Queue选择算法:
轮询算法
最小投递延迟算法
Producer可以将消息写入到某个Broker中的Queue中,过程如下:
Producer发送消息前,会向获取NameServer获取Topic的路由信息
NameServer返回该Topic的路由表和Broker列表
Producer根据代码中指定的Queue选择策略,从Queue列表中选择一个队列
Producer对消息做处理,比如超过4M会进行压缩
Producer向选择出的Queue所在的broker发送RPC请求,将消息发送到选择出的Queue中
路由表:一个Map,Key为QueueData实例列表,QueueData并不是一个queue对应一个queueData,而是一个Broker中该Topic所有的QueueData对应一个QueueData。即只要涉及到该Topic的Broker,一个Broker对应一个QueueData。
路由表的key为Topic名称,value则为所有涉及该Topic的brokerName列表。
路由表(一个Topic分散在哪些Broker上面,方便消费者和生产者连接Queue):
Map<TopicName, List<BrokerName>>
Broker列表:一个Map,Key为BrokerName,Value为BrokerData,一个Broker对应一个BrokerData实例,一套BrokerName名称相同的Master-Slave小集群对应一个BrokerData
BrokerData中包含一个Map,key为brokerId,value为该broker对应的地址,brokerId=0表示master
Broker列表(一个集群中Broker的分布,方便消费者和生产者连接Broker):
Map<BrokerName, Map<brokerId, brokerAddress>
对于无序消息的Queue选择算法:
轮询算法
默认选择方法,保证每个Queue都均匀获得消息
问题:当Broker出问题时,此Broker上的Queue投递延迟严重时,会造成producer消息堆积。
最小投递延迟算法
统计每次消息投递的延迟,统计处投递延迟最小的Queue,如果延迟相同则选用轮询算法
问题:Queue上的消息分配不均,投递延迟小的Queue会存在大量消息,导致对应Queue的消费者要处理的消息很大。
Read More ~