我的编程空间,编程开发者的网络收藏夹
学习永远不晚

RocketMQ Broker消息如何刷盘源码解析

短信预约 -IT技能 免费直播动态提醒
省份

北京

  • 北京
  • 上海
  • 天津
  • 重庆
  • 河北
  • 山东
  • 辽宁
  • 黑龙江
  • 吉林
  • 甘肃
  • 青海
  • 河南
  • 江苏
  • 湖北
  • 湖南
  • 江西
  • 浙江
  • 广东
  • 云南
  • 福建
  • 海南
  • 山西
  • 四川
  • 陕西
  • 贵州
  • 安徽
  • 广西
  • 内蒙
  • 西藏
  • 新疆
  • 宁夏
  • 兵团
手机号立即预约

请填写图片验证码后获取短信验证码

看不清楚,换张图片

免费获取短信验证码

RocketMQ Broker消息如何刷盘源码解析

前言

我们在学习RocketMQ的时候,我们知道RocketMQ的刷盘策略有两个刷盘策略

  • 同步刷盘

同步刷盘即Broker消息已经被持久化到硬盘后才会向客户端返回成功。同步刷盘的优点是能保证消息不丢失,但是这是以牺牲写入性能为代价的。

  • 异步刷盘

异步刷盘是指Broker将信息存储到pagecache后就立即向客户端返回成功,然后会有一个异步线程定时将内存中的数据写入磁盘,默认时间间隔为500ms。

Broker中的刷盘策略是通过Broker配置文件中flushDiskType进行配置,可以配置ASYNC_FLUSH(异步刷盘)和SYNC_FLUSH(同步刷盘),默认配置是ASYNC_FLUSH

Broker的刷盘采用基于JDK NIO技术,消息首先会存储到内存中,然后再根据不同的刷盘策略在不同时间刷盘,如果有不了解的小伙伴可以参考这篇文章《【NIO实战】深入理解FileChannel》

刷盘相关类介绍

CommitLog中的内部类FlushCommitLogService及其子类CommitRealTimeService、GroupCommitService、FlushRealTimeService分别是用于不同场景下用于刷盘的刷盘行为,他们会单独或者配合起来使用。具体类图如下所示。

如果是同步刷盘会使用GroupCommitService。如果是异步刷盘,并且关闭了堆外缓存(TransientStorePool),则采用FlushRealTimeService刷盘。如果是异步刷盘,并且开启了堆外缓存,则会使用FlushRealTimeService与CommitRealTimeService配合刷盘。

默认的输盘策略是异步关闭堆外缓存,因此默认是采用FlushRealTimeService进行刷盘

Broker刷盘源码分析

消息刷盘相关逻辑都是围绕在CommitLog,因此要想知道消息时如何刷盘的关键是研究CommitLog

CommitLog构造&属性赋值

CommitLog中与刷盘相关的属性有flushCommitLogService、commitLogService。如果是同步刷盘则在构造函数中会给flushCommitLogService赋值GroupCommitService,如果是异步刷盘则给flushCommitLogService赋值FlushRealTimeService。commitLogService的值是CommitRealTimeService,从上面我们可以很明显的看出它只有在异步且开启TransientStorePoolEnabled时才会被使用。

public class CommitLog {
  // 如果是同步刷盘,则是GroupCommitService。如果是异步刷盘则是FlushRealTimeService
  // 默认是异步刷盘,因此是CommitLog$FlushRealTimeService
  private final FlushCommitLogService flushCommitLogService;
  // 开启TransientStorePoolEnable时使用CommitRealTimeService
  private final FlushCommitLogService commitLogService;
	// 构造函数
  public CommitLog(final DefaultMessageStore defaultMessageStore) {
      // 默认是异步刷盘,因此这里是false
      if (FlushDiskType.SYNC_FLUSH == defaultMessageStore.getMessageStoreConfig().getFlushDiskType()) {
          this.flushCommitLogService = new GroupCommitService();
      } else {
          this.flushCommitLogService = new FlushRealTimeService();
      }
      this.commitLogService = new CommitRealTimeService();
      // 消息回调
      this.appendMessageCallback = new DefaultAppendMessageCallback();
      flushDiskWatcher = new FlushDiskWatcher();
  }
}

TransientStorePoolEnabled介绍

transientStorePoolEnabled配置的默认值为false,开启transientStorePoolEnabled需要手动开启。如果开启transientStorePoolEnabled会开启堆外内存存储池,Broker在启动时会申请5个与CommitLog大小(1GB)相同的堆外内存交给TransientStorePool,创建MappedFile时会向TransientStorePool“借”一个堆外内存ByteBuffer,保存消息时会先将消息保存到堆外内存ByteBuffer中,然后在commit到MappedFile的FileChannel,最后再flush到硬盘中。TransientStorePool属性和一些核心方法源码如下,堆外内存ByteBuffer都是由它来管理。

// org.apache.rocketmq.store.TransientStorePool
public class TransientStorePool {
    // 存储池大小,默认是5
    private final int poolSize;
    // CommitLog MappedFile文件大小,默认1GB
    private final int fileSize;
    // 默认存5个ByteBuffer
    private final Deque<ByteBuffer> availableBuffers;
    // 消息存储配置
    private final MessageStoreConfig storeConfig;
		// TransientStorePool初始化
    public void init() {
        // 默认是5
        for (int i = 0; i < poolSize; i++) {
            // 分配1GB的直接内存
            ByteBuffer byteBuffer = ByteBuffer.allocateDirect(fileSize);
            final long address = ((DirectBuffer) byteBuffer).address();
            Pointer pointer = new Pointer(address);
            LibC.INSTANCE.mlock(pointer, new NativeLong(fileSize));
            // 生成的缓存保存到队列中
            availableBuffers.offer(byteBuffer);
        }
    }
    // 归还缓冲
    public void returnBuffer(ByteBuffer byteBuffer) {
        // 修改position和limit,"清空"缓冲
        byteBuffer.position(0);
        byteBuffer.limit(fileSize);
      	// 缓冲入队
        this.availableBuffers.offerFirst(byteBuffer);
    }
    // 向TransientStorePool借缓冲
    public ByteBuffer borrowBuffer() {
      	// 缓冲出队
        ByteBuffer buffer = availableBuffers.pollFirst();
        return buffer;
    }
}

消息保存源码分析

前面文章《【RocketMQ | 源码分析】Broker是如何保存消息的? 》我们虽然介绍了消息的保存过程,但是开启或者关闭TransientStorePoolEnabled时,消息保存的细节是不同的,我们再打开消息保存MappedFile的源码如下,下面代码中如果writeBuffer不空,则会将消息先追加到writeBuffer,否者直接写入到MappedFile的内存映射文件中。

// org.apache.rocketmq.store.MappedFile#appendMessagesInner
public AppendMessageResult appendMessagesInner(final MessageExt messageExt, final AppendMessageCallback cb,
        PutMessageContext putMessageContext) {
    // 如果写文件位置小于文件size
    if (currentPos < this.fileSize) {
        // 如果writeBuffer不空,则获取writeBuffer的浅拷贝,否则获取MappedFile的内存映射(MappedByteBuffer)的浅拷贝
        ByteBuffer byteBuffer = writeBuffer != null ? writeBuffer.slice() : this.mappedByteBuffer.slice();
        byteBuffer.position(currentPos);
        AppendMessageResult result;
        // 如果是单条消息
        if (messageExt instanceof MessageExtBrokerInner) {
            result = cb.doAppend(this.getFileFromOffset(), byteBuffer, this.fileSize - currentPos,
                    (MessageExtBrokerInner) messageExt, putMessageContext);
        } // ...如果是批量消息
        return result;
    }
}

那么什么情况下MappedFile中的writeBuffer为空,什么情况下writeBuffer不为空呢?我们可以先来了解MappedFile是如何创建的,MappedFile是由AllocateMappedFileService创建的,具体源码如下,如果开启了TransientStorePoolEnabled,则在创建MappedFile时会向TransientStorePool“借”一个ByteBuffer,如果没有开启TransientStorePoolEnabled,MappedFile中的writeBuffer是空,在保存数据时会将数据直接保存到MappedFile的直接内存映射(MappedByteBuffer)中。

private boolean mmapOperation() {
  // ...
  if (messageStore.getMessageStoreConfig().isTransientStorePoolEnable()) {
      try {
          mappedFile = ServiceLoader.load(MappedFile.class).iterator().next();
        	// 初始化mappedFile会向TransientStorePool"借"一个writeBuffer
          mappedFile.init(req.getFilePath(), req.getFileSize(), messageStore.getTransientStorePool());
      } catch (RuntimeException e) {
          mappedFile = new MappedFile(req.getFilePath(), req.getFileSize(), messageStore.getTransientStorePool());
      }
  } else {
    	// 创建MappedFile,没有writeBuffer
      mappedFile = new MappedFile(req.getFilePath(), req.getFileSize());
  }
  // ...
}

由上可知,消息保存如下图所示

消息刷盘入口方法源码分析

消息保存和刷盘的入口方法CommitLog#asyncPutMessage,消息保存到mappedFile的缓存后,最后会调用submitFlushRequest方法提交刷盘请求,Broker会根据刷盘策略进行刷盘。

public CompletableFuture<PutMessageResult> asyncPutMessage(final MessageExtBrokerInner msg) {
    //... 保存消息
    result = mappedFile.appendMessage(msg, this.appendMessageCallback, putMessageContext);
    // ...
    // 提交刷盘请求
    CompletableFuture<PutMessageStatus> flushResultFuture = submitFlushRequest(result, msg);
    // 提交复制请求
    CompletableFuture<PutMessageStatus> replicaResultFuture = submitReplicaRequest(result, msg);
    // 合并提交刷盘请求和提交复制请求结果
    return flushResultFuture.thenCombine(replicaResultFuture, (flushStatus, replicaStatus) -> {
        if (flushStatus != PutMessageStatus.PUT_OK) {
            putMessageResult.setPutMessageStatus(flushStatus);
        }
        if (replicaStatus != PutMessageStatus.PUT_OK) {
            putMessageResult.setPutMessageStatus(replicaStatus);
        }
        return putMessageResult;
    });
}

提交了刷盘请求后,根据刷盘策略,是否开启堆外缓存,推送消息中是否要等待消息保存有如下四种刷盘方式

  • 异步刷盘(关闭TransientStorePoolEnabled)

异步刷盘(关闭TransientStorePoolEnabled)是默认的刷盘方案,这个刷盘方案先会**异步唤醒(wakeup)**FlushRealTimeService,然后直接返回消息保存成功。由于关闭了TransientStorePoolEnabled,消息是保存到MappedFile中的内存映射文件MappedByteBuffer,FlushRealTimeService将定时MappedByteBuffer刷到磁盘。

  • 异步刷盘(开启TransientStorePoolEnabled)

异步刷盘(开启TransientStorePoolEnabled)会先**异步唤醒(wakeup)**CommitRealTimeService,然后直接返回消息保存成功。由于开启了TransientStorePoolEnabled,消息会保存到MappedFile中的内存映射文件ByteBuffer,CommitRealTimeService定时将ByteBuffer中的数据刷到FileChannel中。

  • 同步刷盘(等待消息保存)

同步刷盘(等待消息保存)会先创建一个刷盘请求(GroupCommitRequest),然后向GroupCommitService提交刷盘请求,最后等待刷盘结果并返回

  • 同步刷盘(不等待消息保存)

同步刷盘(不等待消息保存)也是通过GroupCommitService刷盘,与等待消息保存不同的是不等待的方式异步唤醒(wakeup)GroupCommitService后,直接返回消息保存成功。

四种刷盘方式源码如下所示

public CompletableFuture<PutMessageStatus> submitFlushRequest(AppendMessageResult result, MessageExt messageExt) {
    // 同步刷盘
    if (FlushDiskType.SYNC_FLUSH == this.defaultMessageStore.getMessageStoreConfig().getFlushDiskType()) {
        // 获取同步刷盘Service
        final GroupCommitService service = (GroupCommitService) this.flushCommitLogService;
        if (messageExt.isWaitStoreMsgOK()) {
            // 创建GroupCommitRequest 刷盘偏移量nextOffset = 当前写入偏移量 + 当前消息写入大小
            GroupCommitRequest request = new GroupCommitRequest(result.getWroteOffset() + result.getWroteBytes(),
                    this.defaultMessageStore.getMessageStoreConfig().getSyncFlushTimeout());
            // 向刷盘监视器(flushDistWatch)提交刷盘请求
            flushDiskWatcher.add(request);
            // 提交刷盘请求,并且唤醒同步刷盘线程
            service.putRequest(request);
            return request.future();
        } else {
            // 同步刷盘,但是不需要等待刷盘结果,那么唤醒同步刷盘线程
            service.wakeup();
            return CompletableFuture.completedFuture(PutMessageStatus.PUT_OK);
        }
    }
    // 异步刷盘
    else {
        // 是否启动了堆外缓存
        if (!this.defaultMessageStore.getMessageStoreConfig().isTransientStorePoolEnable()) {
            // 如果没有启动堆外缓存,则唤醒异步刷盘服务 flushRealTimeService
            flushCommitLogService.wakeup();
        } else  {
            // 如果启动了堆外缓存,则唤醒异步转存服务CommitRealTimeService
            commitLogService.wakeup();
        }
        return CompletableFuture.completedFuture(PutMessageStatus.PUT_OK);
    }
}

将上面四种场景及调用关系如下图所示

总结

本篇文章介绍了TransientStorePool机制以及开启和管理队消息保存的影响,我们还介绍了RocketMQ中四种刷盘策略

  • 同步刷盘-等待消息保存到磁盘
  • 同步刷盘-不等待消息保存到磁盘上
  • 异步刷盘-开启堆外缓存
  • 异步刷盘-不开启堆外缓存

以上就是RocketMQ Broker消息如何刷盘源码解析的详细内容,更多关于RocketMQ Broker消息刷盘的资料请关注编程网其它相关文章!

免责声明:

① 本站未注明“稿件来源”的信息均来自网络整理。其文字、图片和音视频稿件的所属权归原作者所有。本站收集整理出于非商业性的教育和科研之目的,并不意味着本站赞同其观点或证实其内容的真实性。仅作为临时的测试数据,供内部测试之用。本站并未授权任何人以任何方式主动获取本站任何信息。

② 本站未注明“稿件来源”的临时测试数据将在测试完成后最终做删除处理。有问题或投稿请发送至: 邮箱/279061341@qq.com QQ/279061341

RocketMQ Broker消息如何刷盘源码解析

下载Word文档到电脑,方便收藏和打印~

下载Word文档

猜你喜欢

RocketMQ Broker消息如何刷盘源码解析

这篇文章主要为大家介绍了RocketMQ Broker消息如何刷盘源码解析,有需要的朋友可以借鉴参考下,希望能够有所帮助,祝大家多多进步,早日升职加薪
2023-05-19

RocketMQ 源码分析Broker消息刷盘服务

这篇文章主要为大家介绍了RocketMQ 源码分析Broker消息刷盘服务示例详解,有需要的朋友可以借鉴参考下,希望能够有所帮助,祝大家多多进步,早日升职加薪
2023-05-19

RocketMQ Broker如何保存消息源码解析

这篇文章主要为大家介绍了RocketMQ源码分析Broker如何保存消息详解,有需要的朋友可以借鉴参考下,希望能够有所帮助,祝大家多多进步,早日升职加薪
2023-05-19

RocketMQ源码分析之Broker过期消息清理机制

这篇文章主要为大家介绍了RocketMQ源码分析之Broker过期消息清理机制示例详解,有需要的朋友可以借鉴参考下,希望能够有所帮助,祝大家多多进步,早日升职加薪
2023-05-19

Android下拉刷新完全解析,教你如何一分钟实现下拉刷新功能(附源码)

最近项目中需要用到ListView下拉刷新的功能,一开始想图省事,在网上直接找一个现成的,可是尝试了网上多个版本的下拉刷新之后发现效果都不怎么理想。有些是因为功能不完整或有Bug,有些是因为使用起来太复杂,十全十美的还真没找到。因此我也是放
2022-06-06

编程热搜

  • Python 学习之路 - Python
    一、安装Python34Windows在Python官网(https://www.python.org/downloads/)下载安装包并安装。Python的默认安装路径是:C:\Python34配置环境变量:【右键计算机】--》【属性】-
    Python 学习之路 - Python
  • chatgpt的中文全称是什么
    chatgpt的中文全称是生成型预训练变换模型。ChatGPT是什么ChatGPT是美国人工智能研究实验室OpenAI开发的一种全新聊天机器人模型,它能够通过学习和理解人类的语言来进行对话,还能根据聊天的上下文进行互动,并协助人类完成一系列
    chatgpt的中文全称是什么
  • C/C++中extern函数使用详解
  • C/C++可变参数的使用
    可变参数的使用方法远远不止以下几种,不过在C,C++中使用可变参数时要小心,在使用printf()等函数时传入的参数个数一定不能比前面的格式化字符串中的’%’符号个数少,否则会产生访问越界,运气不好的话还会导致程序崩溃
    C/C++可变参数的使用
  • css样式文件该放在哪里
  • php中数组下标必须是连续的吗
  • Python 3 教程
    Python 3 教程 Python 的 3.0 版本,常被称为 Python 3000,或简称 Py3k。相对于 Python 的早期版本,这是一个较大的升级。为了不带入过多的累赘,Python 3.0 在设计的时候没有考虑向下兼容。 Python
    Python 3 教程
  • Python pip包管理
    一、前言    在Python中, 安装第三方模块是通过 setuptools 这个工具完成的。 Python有两个封装了 setuptools的包管理工具: easy_install  和  pip , 目前官方推荐使用 pip。    
    Python pip包管理
  • ubuntu如何重新编译内核
  • 改善Java代码之慎用java动态编译

目录