我的编程空间,编程开发者的网络收藏夹
学习永远不晚

RocketMQ消息存储文件的加载与恢复机制源码分析

短信预约 -IT技能 免费直播动态提醒
省份

北京

  • 北京
  • 上海
  • 天津
  • 重庆
  • 河北
  • 山东
  • 辽宁
  • 黑龙江
  • 吉林
  • 甘肃
  • 青海
  • 河南
  • 江苏
  • 湖北
  • 湖南
  • 江西
  • 浙江
  • 广东
  • 云南
  • 福建
  • 海南
  • 山西
  • 四川
  • 陕西
  • 贵州
  • 安徽
  • 广西
  • 内蒙
  • 西藏
  • 新疆
  • 宁夏
  • 兵团
手机号立即预约

请填写图片验证码后获取短信验证码

看不清楚,换张图片

免费获取短信验证码

RocketMQ消息存储文件的加载与恢复机制源码分析

前言

前面文章我们介绍了Broker是如何将消息全量存储到CommitLog文件中,并异步生成dispatchRequest任务更新ConsumeQueue,IndexFile的过程以及ConsumeQueue和IndexFile的文件结构。由于是异步转发消息,就可能出现消息成功存储到CommitLog文件,转发请求任务执行失败,Broker宕机了,此时CommitLog和Index消息并未处理完,导致CommitLog与ConsumeQueue和IndexFile文件中的数据不一致。如果由一部分消息在CommitLog中存在,在ConsumeQueue中不存在,那么这部分消息Consumer将永远无法消费到了,那么Broker是如何保证数据一致性的呢?

StoreCheckPoint介绍

StoreCheckPoint的作用是记录CommitLog,ConsumeQueue和IndexFile的刷盘点,当Broker异常结束时会根据StoreCheckPoint的数据恢复,StoreCheckPoint属性如下

public class StoreCheckpoint {
    // commitLog最后一条信息的刷盘时间戳
    private volatile long physicMsgTimestamp = 0;
    // consumeQueue最后一个存储单元刷盘时间戳
    private volatile long logicsMsgTimestamp = 0;
    // 最近一个已经写完IndexFile的最后一条记录刷盘时间戳
    private volatile long indexMsgTimestamp = 0;
}

StoreCheckPoint文件的存储位置是${user.home}/store/checkpoint,文件的固定长度为4K,但StoreCheckPoint只占用了前24个字节,存储格式如下图所示

StoreCheckPoint时间戳更新时机

physicMsgTimestamp

FlushRealTimeService刷盘时更新

// org.apache.rocketmq.store.CommitLog.FlushRealTimeService#run
public void run() {
  // ...
  // 更新CommitLog刷盘时间戳
  if (storeTimestamp > 0) { 					       CommitLog.this.defaultMessageStore.getStoreCheckpoint().setPhysicMsgTimestamp(storeTimestamp);
  }
}

GroupCommitService刷盘时更新

// org.apache.rocketmq.store.CommitLog.GroupCommitService#doCommit
private void doCommit() {
  // ...
  // 更新CommitLog刷盘时间戳
  if (storeTimestamp > 0) {
CommitLog.this.defaultMessageStore.getStoreCheckpoint().setPhysicMsgTimestamp(storeTimestamp);
  }
}

logicsMsgTimestamp

ConsumeQueue保存消息存储单元时更新

// org.apache.rocketmq.store.ConsumeQueue#putMessagePositionInfoWrapper
public void putMessagePositionInfoWrapper(DispatchRequest request, boolean multiQueue) {
  // ...
  // 如果consumeQueue保存成功,则更新ConsumeQueue存储点信息
  if (result) {
this.defaultMessageStore.getStoreCheckpoint().setLogicsMsgTimestamp(request.getStoreTimestamp());
  }
}

ConsumeQueue刷盘时更新并触发StoreCheckPoint刷盘

// org.apache.rocketmq.store.DefaultMessageStore.FlushConsumeQueueService#doFlush
private void doFlush(int retryTimes) {
  // ...
  // 更新ConsumeQueue存储时间戳,并刷盘
  if (0 == flushConsumeQueueLeastPages) {
    if (logicsMsgTimestamp > 0) {
  DefaultMessageStore.this.getStoreCheckpoint().setLogicsMsgTimestamp(logicsMsgTimestamp);
    }
    // 更新存储点
    DefaultMessageStore.this.getStoreCheckpoint().flush();
  }
}

indexMsgTimestamp

// org.apache.rocketmq.store.index.IndexService#getAndCreateLastIndexFile
public IndexFile getAndCreateLastIndexFile() {
  // 获取最新IndexFile,如果IndexFile已经满了,需要创建一个新的IndexFile
  if (indexFile == null) {
          indexFile =
              new IndexFile(fileName, this.hashSlotNum, this.indexNum, lastUpdateEndPhyOffset,
                  lastUpdateIndexTimestamp);
			// 如果创建新的IndexFile成功,原IndexFile刷盘
      if (indexFile != null) {
          final IndexFile flushThisFile = prevIndexFile;
          Thread flushThread = new Thread(new Runnable() {
              @Override
              public void run() {
                	// indexFile刷盘
                  IndexService.this.flush(flushThisFile);
              }
          }, "FlushIndexFileThread");
          flushThread.setDaemon(true);
          flushThread.start();
      }
  }
  return indexFile;
}
// org.apache.rocketmq.store.index.IndexService#flush
public void flush(final IndexFile f) {
    if (null == f)
        return;
    long indexMsgTimestamp = 0;
    if (f.isWriteFull()) {
        indexMsgTimestamp = f.getEndTimestamp();
    }
    f.flush();
    if (indexMsgTimestamp > 0) {
        // 更新checkPoint的indexMsgTimestamp并触发刷盘
        this.defaultMessageStore.getStoreCheckpoint().setIndexMsgTimestamp(indexMsgTimestamp);
        this.defaultMessageStore.getStoreCheckpoint().flush();
    }
}
  • 保存消息Index,获取最新的IndexFile如果满了,则会创建一个新的IndexFile,并且更新IndexMsgTimestamp并触发StoreCheckPoint刷盘

StoreCheckPoint刷盘源码

StoreCheckPoint刷盘源码如下所示,就是将CommitLog,ConsumeQueue和IndexFile刷盘时间戳持久化到硬盘上,由上面源码可知它的刷盘触发时机

  • ConsumeQueue刷盘时触发
  • 创建新IndexFile文件时触发

StoreCheckPoint刷盘源码如下

// org.apache.rocketmq.store.StoreCheckpoint#flush
public void flush() {
    this.mappedByteBuffer.putLong(0, this.physicMsgTimestamp);
    this.mappedByteBuffer.putLong(8, this.logicsMsgTimestamp);
    this.mappedByteBuffer.putLong(16, this.indexMsgTimestamp);
    this.mappedByteBuffer.force();
}

消息加载源码分析

在BrokerController启动时会调用DefaultMessageStore#load加载存储文件加载和恢复过程主要分为下面几步

  • 判断Broker上次是否正常退出。这个判断逻辑是根据${user.home}/store/abort是否存在。如果文件存在,说明上次是异常退出,如果文件不存在,则说明是正常退出。
  • 加载CommitLog
  • 加载ConsumeQueue
  • 加载StoreCheckPoint
  • 加载IndexFile
  • 恢复ConsumeQueue与IndexFile
  • 加载延迟队列服务
// org.apache.rocketmq.store.DefaultMessageStore#load
public boolean load() {
    boolean result = true;
    try {
      	// 1. Broker上次是否正常退出	
        boolean lastExitOK = !this.isTempFileExist();
        log.info("last shutdown {}", lastExitOK ? "normally" : "abnormally");
        // 2. 加载commitLog
        result = result && this.commitLog.load();
				// 3. 加载consumeQueue
        result = result && this.loadConsumeQueue();
        if (result) {
            // 4. 加载StoreCheckPoint
            this.storeCheckpoint =
                new StoreCheckpoint(StorePathConfigHelper.getStoreCheckpoint(this.messageStoreConfig.getStorePathRootDir()));
            // 5. 加载IndexFile
            this.indexService.load(lastExitOK);
            // 6. 恢复ConsumeQueue与IndexFile
            this.recover(lastExitOK);
						// 7. 延迟队列服务加载
            if (null != scheduleMessageService) {
                result =  this.scheduleMessageService.load();
            }
        }
    } 
    return result;
}

CommitLog加载

前面文章介绍过,CommitLog文件的存储目录是${user.home}/store/commitlog/,并且CommitLog文件的底层是MappedFile,由MappedFileQueue管理。

CommitLog文件的加载其实调用的是MappedFileQueue#load方法,代码如下所示,load()中首先加载CommitLog文件目录下的所有文件,并调用doLoad()方法加载CommitLog。

// org.apache.rocketmq.store.MappedFileQueue#load
public boolean load() {
    File dir = new File(this.storePath);
    File[] ls = dir.listFiles();
    if (ls != null) {
        return doLoad(Arrays.asList(ls));
    }
    return true;
}

MappedFile的加载过程如下所示,核心逻辑主要分为下面三步

  • 按照文件名称将文件排序,排序好的文件就会按照消息保存的先后顺序存放在列表中
  • 校验文件大小与mappedFile是否一致,如果commitLog文件大小与mappedFileSize不一致,则说明配置被改了,或者CommitLog文件被修改
  • 创建mappedFile,并且设置wrotePosition,flushedPosition,committedPosition为mappedFileSize
public boolean doLoad(List<File> files) {
    // 按照文件名称排序
    files.sort(Comparator.comparing(File::getName));
    for (File file : files) {
      	// 如果commitLog文件大小与mappedFileSize不一致,则说明配置被改了,或者CommitLog文件被修改
        if (file.length() != this.mappedFileSize) {
            return false;
        }
        try {
          	// 创建MappedFile
            MappedFile mappedFile = new MappedFile(file.getPath(), mappedFileSize);
            mappedFile.setWrotePosition(this.mappedFileSize);
            mappedFile.setFlushedPosition(this.mappedFileSize);
            mappedFile.setCommittedPosition(this.mappedFileSize);
            this.mappedFiles.add(mappedFile);
        } 
    }
    return true;
}

看到这里肯定会有疑问,加载后的MappedFile的wrotePosition,flushedPosition和committedPosition的值都为mappedFileSize,如果最后一个MappedFile没有使用完,Broker启动后还会从最后一个MappedFile开始写么?我们可以在后面消息文件恢复源码分析找到答案。

ConsumeQueue加载

从前面文章我们知道,ConsumeQueue文件底层其实也是MappedFile,因此ConsumeQueue文件的加载与CommitLog加载差别不大。ConsumeQueue加载逻辑为

  • 获取ConsumeQueue目录下存储的所有Topic目录,遍历Topic目录
  • 遍历每个Topic目录下的所有queueId目录,逐个加载ququeId中的所有MappedFile
// org.apache.rocketmq.store.DefaultMessageStore#loadConsumeQueue
private boolean loadConsumeQueue() {
  // 获取consumeQueue目录
  File dirLogic = new File(StorePathConfigHelper.getStorePathConsumeQueue(this.messageStoreConfig.getStorePathRootDir()));
  // topic文件夹数组
  File[] fileTopicList = dirLogic.listFiles();
  if (fileTopicList != null) {
      // 遍历topic	
      for (File fileTopic : fileTopicList) {
          // 获取topic名称
          String topic = fileTopic.getName();
          // 获取queueId文件夹数组
          File[] fileQueueIdList = fileTopic.listFiles();
          // 遍历queueId
          if (fileQueueIdList != null) {
              for (File fileQueueId : fileQueueIdList) {
                  int queueId;
                  // 文件夹名称就是queueId
                  queueId = Integer.parseInt(fileQueueId.getName());
                  // 构建consumeQueue
                  ConsumeQueue logic = new ConsumeQueue();
                  this.putConsumeQueue(topic, queueId, logic);
                  // ConsumeQueue加载
                  if (!logic.load()) {
                      return false;
                  }
              }
          }
      }
  }
  return true;
}

IndexFile加载

IndexFile文件加载过程调用的是IndexService#load,首先获取${user.home}/store/index目录下的所有文件,遍历所有文件,如果IndexFile最后存储时间大于StoreCheckPoint中indexMsgTimestamp,则会先删除IndexFile

// org.apache.rocketmq.store.index.IndexService#load
public boolean load(final boolean lastExitOK) {
    // indexFile文件目录
    File dir = new File(this.storePath);
    // indexFile文件列表
    File[] files = dir.listFiles();
    if (files != null) {
        // 文件排序
        Arrays.sort(files);
        for (File file : files) {
            try {
                IndexFile f = new IndexFile(file.getPath(), this.hashSlotNum, this.indexNum, 0, 0);
                f.load();
                if (!lastExitOK) {
                    // 文件最后存储时间戳大于刷盘点,则摧毁indexFile,重建
                    if (f.getEndTimestamp() > this.defaultMessageStore.getStoreCheckpoint()
                        .getIndexMsgTimestamp()) {
                        f.destroy(0);
                        continue;
                    }
                }
                this.indexFileList.add(f);
            } 
        }
    }
    return true;
}

ConsumeQueue与IndexFile恢复

如果是正常退出,数据都已经正常刷盘,前面我们说到CommitLog在加载时的wrotePosition,flushedPosition,committedPosition都设置为mappedFileSize,

因此即使是正常退出,也会调用CommitLog#recoverNormally找到最后一条消息的位置,更新这三个属性。

// org.apache.rocketmq.store.DefaultMessageStore#recover
private void recover(final boolean lastExitOK) {
    // consumeQueue中最大物理偏移量
    long maxPhyOffsetOfConsumeQueue = this.recoverConsumeQueue();
    if (lastExitOK) {
        // 正常退出文件恢复
        this.commitLog.recoverNormally(maxPhyOffsetOfConsumeQueue);
    } else {
        // 异常退出文件恢复
        this.commitLog.recoverAbnormally(maxPhyOffsetOfConsumeQueue);
    }
    // 恢复topicQueueTable
    this.recoverTopicQueueTable();
}

正常恢复的源码如下,由于Broker是正常关闭,因此CommitLog,ConsumeQueue与IndexFile都已经正确刷盘,并且三者的消息是一致的。正常恢复的主要目的是找到找到最后一条消息的偏移量,然后更新CommitLog的MappedFileQueue中的刷盘点(flushWhere)和提交点(committedWhere),

  • 从最后3个mappedFile开始恢复,如果mappedFile总数不足3个,则从第0个mappedFile开始恢复
  • 逐个遍历mappedFile,找到每个MappedFile的最后一条消息的偏移量,并将其更新到CommitLog中MappedFileQueue的刷盘点和提交点中
  • 清除ConsumeQueue冗余数据
    public void recoverNormally(long maxPhyOffsetOfConsumeQueue) {
        // 确认消息是否完整,默认是true
        boolean checkCRCOnRecover = this.defaultMessageStore.getMessageStoreConfig().isCheckCRCOnRecover();
        final List<MappedFile> mappedFiles = this.mappedFileQueue.getMappedFiles();
        if (!mappedFiles.isEmpty()) {
            // 默认从最后3个mappedFile开始恢复
            int index = mappedFiles.size() - 3;
            // 如果commitLog不足三个,则从第一个文件开始恢复
            if (index < 0)
                index = 0;
            MappedFile mappedFile = mappedFiles.get(index);
            ByteBuffer byteBuffer = mappedFile.sliceByteBuffer();
            // 最后一个MappedFile的文件起始偏移量
            long processOffset = mappedFile.getFileFromOffset();
            // mappedFileOffset偏移量
            long mappedFileOffset = 0;
            // 遍历CommitLog文件
            while (true) {
                // 校验消息完整性
                DispatchRequest dispatchRequest = this.checkMessageAndReturnSize(byteBuffer, checkCRCOnRecover);
                // 获取消息size
                int size = dispatchRequest.getMsgSize();
                // 返回结果为true并且消息size>0,说明消息是完整的
                if (dispatchRequest.isSuccess() && size > 0) {
                    mappedFileOffset += size;
                }
            }
            // 最大物理偏移量
            processOffset += mappedFileOffset;
            // 更新flushedWhere和committedPosition指针
            this.mappedFileQueue.setFlushedWhere(processOffset);
            this.mappedFileQueue.setCommittedWhere(processOffset);
            this.mappedFileQueue.truncateDirtyFiles(processOffset);
            // 清除ConsumeQueue冗余数据
            if (maxPhyOffsetOfConsumeQueue >= processOffset) {
                this.defaultMessageStore.truncateDirtyLogicFiles(processOffset);
            }
        } 
    }

异常恢复源码如下,由于上次Broker没有正常关闭,因此由可能存在CommitLog、ConsumeQueue与IndexFile不一致的情况,因此在异常恢复时可能需要恢复ConsumeQueue和IndexFile,异常恢复核心逻辑主要包括

  • 倒序查CommitLog的mappedFile文件,找到第一条消息存储的时间戳比StoreCheckPoint里的physicMsgTimestamp,logicsMsgTimestamp和indexMsgTimestamp三者都小的最大MappedFile,该mappedFile至少有一部分消息是被正常转发,正常存储,正常刷盘的
  • 从该mappedFile开始逐条转发消息,重新恢复ConsumeQueue和IndexFile
  • 当遍历到最后一条消息,将其偏移量更新到CommitLog中MappedFileQueue的刷盘点和提交点中
  • 清除ConsumeQueue冗余数据
// org.apache.rocketmq.store.CommitLog#recoverAbnormally
public void recoverAbnormally(long maxPhyOffsetOfConsumeQueue) {
    // 是否CRC校验
    boolean checkCRCOnRecover = this.defaultMessageStore.getMessageStoreConfig().isCheckCRCOnRecover();
    final List<MappedFile> mappedFiles = this.mappedFileQueue.getMappedFiles();
    if (!mappedFiles.isEmpty()) {
        // 最后一个mappedFile的index
        int index = mappedFiles.size() - 1;
        MappedFile mappedFile = null;
        // 倒序遍历mappedFile数组,
        for (; index >= 0; index--) {
            mappedFile = mappedFiles.get(index);
            // 1. 如果第一条消息的时间戳小于存储点时间戳
            if (this.isMappedFileMatchedRecover(mappedFile)) {
                break;
            }
        }
        long processOffset = mappedFile.getFileFromOffset();
        long mappedFileOffset = 0;
        while (true) {
            DispatchRequest dispatchRequest = this.checkMessageAndReturnSize(byteBuffer, checkCRCOnRecover);
            int size = dispatchRequest.getMsgSize();
            if (dispatchRequest.isSuccess()) {
                if (size > 0) {
                    mappedFileOffset += size;
                    // 2. 转发消息
                    if (this.defaultMessageStore.getMessageStoreConfig().isDuplicationEnable()) {
                        if (dispatchRequest.getCommitLogOffset() < this.defaultMessageStore.getConfirmOffset()) {
                            this.defaultMessageStore.doDispatch(dispatchRequest);
                        }
                    } else {
                        this.defaultMessageStore.doDispatch(dispatchRequest);
                    }
                }
        }
				// 3. 更新MappedFileQueue中的刷盘位置和提交位置
        processOffset += mappedFileOffset;
        this.mappedFileQueue.setFlushedWhere(processOffset);
        this.mappedFileQueue.setCommittedWhere(processOffset);
        this.mappedFileQueue.truncateDirtyFiles(processOffset);
        // 清除ConsumeQueue中的冗余数据
        if (maxPhyOffsetOfConsumeQueue >= processOffset) {
            this.defaultMessageStore.truncateDirtyLogicFiles(processOffset);
        }
    }
}

总结

Broker启动时会分别加载CommitLog、ConsumeQueue与IndexFile。加载完成后,如果Broker上次是正常退出,只需要找到CommitLog的最后一条消息,并更新刷盘点和提交点。如果Broker上次是异常退出,就有可能出现ConsumeQueue、IndexFile与CommitLog不一致的情况,需要根据StoreCheckPoint存储的时间戳从CommitLog找到消息,逐条恢复ConsumeQueue与IndexFile。

以上就是RocketMQ | 源码分析】消息存储文件的加载与恢复机制的详细内容,更多关于RocketMQ 消息存储文件加载恢复的资料请关注编程网其它相关文章!

免责声明:

① 本站未注明“稿件来源”的信息均来自网络整理。其文字、图片和音视频稿件的所属权归原作者所有。本站收集整理出于非商业性的教育和科研之目的,并不意味着本站赞同其观点或证实其内容的真实性。仅作为临时的测试数据,供内部测试之用。本站并未授权任何人以任何方式主动获取本站任何信息。

② 本站未注明“稿件来源”的临时测试数据将在测试完成后最终做删除处理。有问题或投稿请发送至: 邮箱/279061341@qq.com QQ/279061341

RocketMQ消息存储文件的加载与恢复机制源码分析

下载Word文档到电脑,方便收藏和打印~

下载Word文档

猜你喜欢

RocketMQ消息存储文件的加载与恢复机制源码分析

这篇文章主要介绍了RocketMQ源码分析之消息存储文件的加载与恢复机制详解,有需要的朋友可以借鉴参考下,希望能够有所帮助,祝大家多多进步,早日升职加薪
2023-05-19

RocketMQ消息存储文件的加载与恢复机制是什么

RocketMQ的消息存储文件加载与恢复机制主要包括两个方面:文件的加载和文件的恢复。文件加载:RocketMQ使用MmappedFile来加载消息存储文件。MmappedFile是一种内存映射文件,通过将文件映射到内存中,可以提高文件的读
RocketMQ消息存储文件的加载与恢复机制是什么
2024-04-09

编程热搜

  • Python 学习之路 - Python
    一、安装Python34Windows在Python官网(https://www.python.org/downloads/)下载安装包并安装。Python的默认安装路径是:C:\Python34配置环境变量:【右键计算机】--》【属性】-
    Python 学习之路 - Python
  • chatgpt的中文全称是什么
    chatgpt的中文全称是生成型预训练变换模型。ChatGPT是什么ChatGPT是美国人工智能研究实验室OpenAI开发的一种全新聊天机器人模型,它能够通过学习和理解人类的语言来进行对话,还能根据聊天的上下文进行互动,并协助人类完成一系列
    chatgpt的中文全称是什么
  • C/C++中extern函数使用详解
  • C/C++可变参数的使用
    可变参数的使用方法远远不止以下几种,不过在C,C++中使用可变参数时要小心,在使用printf()等函数时传入的参数个数一定不能比前面的格式化字符串中的’%’符号个数少,否则会产生访问越界,运气不好的话还会导致程序崩溃
    C/C++可变参数的使用
  • css样式文件该放在哪里
  • php中数组下标必须是连续的吗
  • Python 3 教程
    Python 3 教程 Python 的 3.0 版本,常被称为 Python 3000,或简称 Py3k。相对于 Python 的早期版本,这是一个较大的升级。为了不带入过多的累赘,Python 3.0 在设计的时候没有考虑向下兼容。 Python
    Python 3 教程
  • Python pip包管理
    一、前言    在Python中, 安装第三方模块是通过 setuptools 这个工具完成的。 Python有两个封装了 setuptools的包管理工具: easy_install  和  pip , 目前官方推荐使用 pip。    
    Python pip包管理
  • ubuntu如何重新编译内核
  • 改善Java代码之慎用java动态编译

目录