目 录CONTENT

文章目录

RocketMQ—存储、检索、刷盘、过期删除机制

路口、下车
2026-01-20 / 0 评论 / 0 点赞 / 18 阅读 / 0 字
温馨提示:
本文最后更新于2026-01-20,若内容或图片失效,请留言反馈。 部分素材来自网络,若不小心影响到您的利益,请联系我们删除。

存储

Broker存储目录结构

  • Commitlog:这是一个目录,其中包含具体的commitlog文件。文件名长度为20个字符,文件名由该文件保存消息的最大物理offset值在高位补0组成。每个文件大小一般是1GB,可以通过mapedFileSizeCommitLog进行配置。
  • consumequeue:这是一个目录,包含该 Broker 上所有的 Topic 对应的消费队列文件信息。消费队列文件的格式
  • 为“./consumequeue/Topic名字/queue id/具体消费队列文件”。每个消费队列其实是commitlog的一个索引,提供给消费者做拉取消息、更新位点使用。
  • Index:这是一个目录,全部的文件都是按照消息key创建的Hash索引。文件名是用创建时的时间戳命名的。
  • Config:这是一个目录,保存了当前Broker中全部的Topic、订阅关系和消费进度。这些数据Broker会定时从内存持久化到磁盘,以便宕机后恢复。
  • abort:Broker 是否异常关闭的标志。正常关闭时该文件会被删除,异常关闭时则不会。当Broker重新启动时,根据是否异常宕机决定是否需要重新构建Index索引等操作。
  • checkpoint:Broker最近一次正常运行时的状态,比如最后一次正常刷盘的时间、最后一次正确索引的时间等。

QQ_1768868206976.png

Tips:如果没有在配置文件中显式指定存储位置,这些文件默认存放在Linux的"/root/store"目录下

QQ_1768868273970.png

消息的构成

QQ_1768868339352.png

CommitLog的作用

QQ_1768868377787.png
CommitLog目录下有多个CommitLog文件。其实CommitLog只有一个文件,为了方便保存和读写被切分为多个子文件,所有的子文件通过其保存的第一个和最后一个消息的物理位点进行连接。

QQ_1768868423754.png
Broker按照时间和物理的offset顺序写CommitLog文件,每次写的时候需要加锁,每个 CommitLog 子文件的大小默认是1GB(1024×1024×1024B),可以通过mapedFileSizeCommitLog进行配置。当一个CommitLog写满后,创建一个新的CommitLog,继续上一个ComiitLog的Offset写操作,直到写满换成下一个文件。所有CommitLog子文件之间的Offset是连续的,所以最后一个CommitLog总是被写
入的。

索引机制

利用MessageID提取消息

MessageID就是用broker+offset生成的(这里MsgId指的是服务端的),所以很容易就找到对应的commitLog文件来读取消息。

基于ConsumeQueue实现Tag查询

QQ_1768868629755.png
处理过程
第一步,消费者要获取tag=20220101的消息,首先通过执行"20220101".hashcode=99得到Hash值;
第二步,在ConsumeQueue文件中查找hash(tag)=99的offset数据;
第三步,根据物理位置Offset到对应的CommitLog文件中提取消息,因为可能会出现Hash碰撞,所以再次对这些命中数据以字符串匹配方式筛选"20220101"的消息;
第四步,将“20220101”消息提取,封装为Message对象返回。

QQ_1768868767420.png

基于IndexFile实现Key查询

QQ_1768868796469.png
每个Index File文件包含文件头、Hash槽位、索引数据。每个文件的Hash槽位个数、索引数据个数都是固定的。Hash 槽位可以通过Broker 启动参数 maxHashSlotNum进行配置,默认值为500万;索引数据可以通过Broker启动参数maxIndexNum进行配置,默认值为500万×4=2000万,一个Index File约为400MB。

QQ_1768868850080.png

每个Index File文件包含文件头、Hash槽位、索引数据。每个文件的Hash槽位个数、索引数据个数都是固定的。

基于Slot+链表的查询过程

QQ_1768868890007.png
Index File的索引设计在一定程度上参考了Java中的HashMap设计,只是当Index File遇到Hash碰撞时只会用链表,而Java 8中在一定情况下链表会转化为红黑树。
参考文档:https://www.cnblogs.com/xjwhaha/p/15772846.html

QQ_1768869151647.png

Broker过期文件删除机制

由于RocketMQ操作CommitLog、ConsumeQueue文件是基于内存映射机制并在启动的时候回加载CommitLog、ConsumeQueue目录下的所有文件,为了避免内存与磁盘的浪费,不可能将消息永久存储在消息服务器上,所以要引入一种机制来删除已过期的文件。RocketMQ顺序写CommitLog、ConsumeQueue文件,所有写操作全部落在最后一个CommitLog或者ConsumerQueue文件上,之前的文件在下一个文件创建后将不会再被更新。
RocketMQ清除过期文件的方法时:如果当前文件在在一定时间间隔内没有再次被消费,则认为是过期文件,可以被删除,RocketMQ不会关注这个文件上的消息是否全部被消费。默认每个文件的过期时间为72小时通过在Broker配置文件中设置fileReservedTime来改变过期时间,单位为小时。

默认消息清理的时机

Broker每10秒检查一次文件,扫描是否存在CommitLog满足删除条件

删除文件的条件

  • 第一,默认凌晨4点,删除72小时内没有产生消费的CommitLog文件;
  • 第二,磁盘使用空间超过85%;
  • 第三,手动执行删除。

删除CommitLog的执行过程

QQ_1768869343018.png

  • 第一步:克隆全部的 CommitLog 文件。CommitLog 文件可能随时有数据写入,为了不影响正常写入,所以克隆一份来操作。
  • 第二步:检查每一个CommitLog文件是否过期,如果已过期则立即进行删除。在删除前会做一系列检查:检查文件被引用的次数、清理映射的所有内存数据对象、释放内存。清理完成后,删除物理文件。
  • 第三步:上一步的删除有可能失败,比如有线程引用该过期文件,内存映射清理失败等,都可能导致删除失败。如果文件已经关闭,删除前检查没有通过,则可以通过第二次删除来处理,如果删除失败,间隔120秒后再次尝试执行重试删除直到文件删除。

ConsumeQueue、IndexFile文件的删除过程

Consume Queue和Index File都是索引文件,在CommitLog文件被删除后,对应的索引文件其实没有存在的意义,并且占用磁盘空间,所以这些文件应该被删除。
RocketMQ的删除策略是定时检查,满足删除条件时会删除过期或者无意义的文件。
执行过程如下

  • CommitLog.getMinOffset获取当前日志最小偏移量,当前提交日志的最小偏移量对比上一次查询的值发生了变化,也就是说必然是最少一个提交日志文件被删除,那么相应的在消费队列中的过期数据也可以被删除,就执行后面的流程。反之,则意味着不需要执行任何操作,结束方法即可。

QQ_1768869567633.png

  • 遍历其中每一个ConsumeQueue对象,删除过期的消费队列文件以及更新消费队列的最小偏移量;
  • 如果有删除了ConsumeQueue文件,休眠一小段时间,继续对下一个ConsumeQueue检查执行删除操作。

IndexFile索引文件与ConsumeQueue删除逻辑相同,不做赘述。

同步刷盘与异步刷盘

RocketMQ的消息是存储到磁盘上的,这样既能保证断电后恢复,又可以让存储的消息量超出内存的限制。

QQ_1768869692680.png

RocketMQ为了提高性能,会尽可能地保证磁盘的顺序写。消息在通过Producer写入RocketMQ的时候,有两种写磁盘方式:

同步刷盘

在返回应用写成功状态前,消息已经被写入磁盘。具体流程是,消息写入内存的PAGECACHE后,立刻通知刷盘线程刷盘,然后等待刷盘完成,刷盘线程执行完成后唤醒等待的线程,给应用返回消息写成功的状态。

  • 优点:可以保持MQ的消息状态和生产者/消费者的消息状态一致
  • 缺点:性能比异步的低

异步刷盘

异步刷盘方式:在返回写成功状态时,消息可能只是被写入了内存的PAGECACHE,写操作的返回快,吞吐量大;当内存里的消息量积累到一定程度时,统一触发写磁盘操作,快速写入

  • 优点:性能高
  • 缺点:Master宕机,磁盘损坏的情况下,会丢失少量的消息, 导致MQ的消息状态和生产者/消费者的消息状态不一致

配置方式

single-master.conf

#同步刷盘
flushDiskType=SYNC_FLUSH
或者
#异步刷盘
flushDiskType=ASYNC_FLUSH
0

评论区