RocketMQ 相较于 RabbitMQ、Kafka 能够集成两者的不少优点(原生的延迟消息、支持事务消息、能够支撑较大流量),加之Java技术架构,所以已作为目前队列选型调研时的第一选择,本文主要以图解形式讲明白RocketMQ一条消息的基本流程。
核心概念
- NameServer:注册中心,用于保存Topic的路由信息、管理Broker的存活状态;NameServer一般是多节点部署的,多个NameServer之间互不通信。
- Broker:用于存储消息,在启动时会向NameServer注册自己的地址信息,启动后每30s向NameServer心跳报告健康状态;Broker实例可以有多个,相同的BrokerName为一组Broker,每个Broker组只保存一部分信息。
- Topic:消息的主题,一个Topic的消息可以分布在不同的Broker组下。
- Queue:一个Topic可以有很多Queue,默认一个Topic在同一个Broker组下是4个,如果一个Topic在2个Broker组中,则有可能是8个Queue。一个Queue只能被一个Consumer消费,一个Consumer可以同时消费多个Queue。
- Producer:消息的生产者、可以成组出现(Producer Group)。
- Consumer:消息的消费者,可以成组出现(Consumer Group)。
NameServer
NameServer是Broker和Topic路由的注册中心,支持Broker的动态注册与发现。
主要包含2个功能:
-
Broker管理:接受Broker集群的注册信息并且保存下来作为消息路由信息的基本数据,提供心跳监测机制,检查Broker是否还存活。
-
路由信息管理:每个NameServer都保存着Broker集群的整个路由信息和用于客户端查询的队列信息,Producer和Consumer可以通过NameServer获得整个Broker集群的路由信息,从而进行消息的投递和消费。
注册流程:
NameServer每个节点之间是不通信的,每个Broker在启动时,会给所有NameServer注册自己的信息,每30秒心跳
上报自己的信息(BrokerId、Broker地址、名称、所属集群名称等信息),NameServer收到后会更新Broker的最新存活时间。
优缺点:
优点:NameServer集群搭建简单,随意启动即可(因为互不通信)
缺点:简单增加节点无效,Broker无法感知新NameServer不会向他注册信息和心跳。
路由剔除:
NameServer定时任务每10秒
扫描一次Broker列表,当Broker最新心跳时间戳距离当前时间超过120秒
,则将Broker从列表中剔除。
运维方案:
当需要升级RocketMQ集群时,可以依次将每个节点对外关闭读写权限。
- Producer发送到Broker会收到无权读写而切换另一个Broker投递消息。
- 超过120秒不心跳,NameServer会自动将此Broker下线。
- Producer每30秒会从NameServer拉取Broker信息,当NameServer记录为下线后此Broker也不会再有Producer进行消息投递。然后再关闭升级此Broker即可。
路由发现:
当Topic路由信息变化时,NameServer不会推送而是等客户端每30秒拉取一次最新的路由信息。
客户端(Producer)NameServer选择策略:
首先采用
随机策略
, 然后采用轮询策略
生成一个随机数,从配置的NameServer集群地址中根据数量取模然后连接,如果连接失败会切换为轮询逐个连接其他节点。
扩展:zk client如何选择zk server?
经过两次Shuffle,然后选择第一台ZK Server。
将配置文件中的zk server地址进行第一次shuffle,然后随机选择一个,这个选择出的一般都是hostname,然后获取到该hostname对应的所有IP,再对这些ip进行第二次shuffle。
Broker
Broker充当着消息中转角色,负责存储消息、转发消息。
Broker在RocketMQ系统中负责接收并存储从生产者发送来的消息。同时为消费者的拉取请求做准备。
Broker同时存储着消息相关的元数据,包括消费者组消费进度偏移offset、主题、队列等。
一个Topic存储在不同的Broker中,按Queue轮询存放,一个Queue在Master/Slave中存储多份做主备集群。
Broker节点集群是主从集群,Broker集群是主备集群。
- Remoting Module: 整个Broker的实体,负责处理来自clients的请求,由以下模块构成。
- Client Manager:客户端管理器,负责接受、解析来自客户端(Producer、Consumer)请求,管理客户端,比如维护Consumer的Topic订阅关系。
- Store Service:存储服务,处理
消息存储到物理硬盘
和消息查询
功能。 - HA Service: 高可用服务,提供
Master Broker
和Slave Broker
之间的数据同步功能。 - Index Service: 索引服务,根据特定的
Message Key
,对投递到Broker的消息进行索引服务,同时提供根据Message Key
对消息进行快速查询的功能。
消息的生产和发送
通过上面的流程,我们知道了NameServer和Broker的交互流程,Producer启动时,只指向了NameServer,并不知道Broker的信息。那么一条消息从Producer生产后,是如何投递到哪个Broker的哪个Queue上面的呢?
针对这种情况,RocketMQ采用了2种Queue选择算法:
- 轮询算法
- 最小投递延迟算法
Producer可以将消息写入到某个Broker中的Queue中,过程如下:
- Producer发送消息前,会向获取NameServer获取Topic的路由信息
- NameServer返回该Topic的
路由表
和Broker列表
- Producer根据代码中指定的Queue选择策略,从Queue列表中选择一个队列
- Producer对消息做处理,比如超过4M会进行压缩
- Producer向选择出的Queue所在的broker发送RPC请求,将消息发送到选择出的Queue中
路由表:一个Map,Key为QueueData实例列表,QueueData并不是一个queue对应一个queueData,而是一个Broker中该Topic所有的QueueData对应一个QueueData。即只要涉及到该Topic的Broker,一个Broker对应一个QueueData。
路由表的key为Topic名称,value则为所有涉及该Topic的brokerName列表。路由表(一个Topic分散在哪些Broker上面,方便消费者和生产者连接Queue):
Map<TopicName, List<BrokerName>>
Broker列表:一个Map,Key为BrokerName,Value为BrokerData,一个Broker对应一个BrokerData实例,一套BrokerName名称相同的Master-Slave小集群对应一个BrokerData
BrokerData中包含一个Map,key为brokerId,value为该broker对应的地址,brokerId=0表示master
Broker列表(一个集群中Broker的分布,方便消费者和生产者连接Broker):
Map<BrokerName, Map<brokerId, brokerAddress>
对于无序消息的Queue选择算法:
- 轮询算法
默认选择方法,保证每个Queue都均匀获得消息
问题:当Broker出问题时,此Broker上的Queue投递延迟严重时,会造成producer消息堆积。
- 最小投递延迟算法
统计每次消息投递的延迟,统计处投递延迟最小的Queue,如果延迟相同则选用轮询算法
问题:Queue上的消息分配不均,投递延迟小的Queue会存在大量消息,导致对应Queue的消费者要处理的消息很大。