我的编程空间,编程开发者的网络收藏夹
学习永远不晚

RocketMQ Broker如何保存消息源码解析

短信预约 -IT技能 免费直播动态提醒
省份

北京

  • 北京
  • 上海
  • 天津
  • 重庆
  • 河北
  • 山东
  • 辽宁
  • 黑龙江
  • 吉林
  • 甘肃
  • 青海
  • 河南
  • 江苏
  • 湖北
  • 湖南
  • 江西
  • 浙江
  • 广东
  • 云南
  • 福建
  • 海南
  • 山西
  • 四川
  • 陕西
  • 贵州
  • 安徽
  • 广西
  • 内蒙
  • 西藏
  • 新疆
  • 宁夏
  • 兵团
手机号立即预约

请填写图片验证码后获取短信验证码

看不清楚,换张图片

免费获取短信验证码

RocketMQ Broker如何保存消息源码解析

前言

前面我们介绍了RocketMQ是如何接收消息的,下面我们来介绍Broker是如何保存消息的。

消息存储格式总览

Broker消息存储主要包括CommitLog,ConsumerQueue和Index三个部分。

  • CommitLog

CommitLog主要用于消息存储,所有topic的消息按顺序都存储在CommitLog中。

  • ConsumerQueue

ConsumerQueue对应消费队列,消息存储到CommitLog后,会异步转发到ConsumerQueue文件中

  • Index

消息索引,只要存储消息key与offset的关系

CommitLog介绍

CommitLog是消息和消息数据存储的主体,CommitLog存储的文件目录在${user.home}/store/commitlog中,它其实是一个目录,消息并不是直接存储在CommitLog中,而是存储在由20位数字构成的文件中。

MappedFile详解

commitlog文件夹中文件单元是MappedFile,我们可以把MappedFile理解成一个文件管理的工具,如果需要将数据存储到磁盘,或者快速查找数据,都可以通过MappedFile。

每个MappedFile文件大小默认是1GB,文件名是由20位数字构成,文件名其实是MappedFile的起始偏移量,如果偏移量不足20位,则将偏移量的左边补0。上图中MappedFile的文件名是00000000000000000000,它代表的是CommitLog中的第一个文件,由于每个MappedFile文件大小是1GB,因此第二个文件的偏移量为1024*1024*1024(1GB),计算后的结果为1073741824,因此第二个文件的文件名为00000000001073741824,可依此类推其他文件的文件名。

消息存储格式介绍

消息在commitLog中存储的格式如下所示

  • totalSize

消息总长度,4字节

  • magicCode

魔数,4字节,固定值十六进制是0xdaa320a7,10进制是-875286124

  • bodyCRC

消息体crc校验码,4字节

  • queueId

消息队列id,4字节

  • flag

消息标记,RocketMQ不做处理,默认4字节

  • queueOffset

消息在ConsumeQueue文件中的物理偏移量,默认8字节

  • physicalOffset

消息在CommitLog文件中的物理偏移量,默认8字节

  • sysFlag

消息系统标记,例如是否压缩、是否是事务消息等,4字节

  • bornTimestamp

消息生产者调用消息API的时间戳,8字节

  • bornHost

BORNHOST 消息生产者IP和端口号,8字节

  • storeTimestamp

消息存储时间戳,8字节

  • storeHostAddress

STOREHOSTADDRESS 消息存储Broker的IP和端口号,8字节

  • reconsumeTimes

消息重试次数 4字节

  • Prepared Transaction Offset

事务消息偏移量,8字节

  • bodyLength

消息体长度,4字节

  • body

消息体内容,它是变长的,长度为bodyLength中存储的值

  • TopicLength

topicLength表示topic占用的长度,topicLength占用1字节,也就是255,也就是说topic长度最长不能超过255字节

  • Topic

topic是消息主题名称,topic是变长的,实际占用topicLength字节

  • PropertiesLength

propertiesLength表示properties占用的长度,propertiesLength占用2字节,也就是说properties长度最长不超过65536字节

  • Properties

properties是消息属性,properties是变长的,实际占用propertiesLength字节

DefaultMessageStore介绍

Broker保存消息是通过消息存储默认实现类org.apache.rocketmq.store.DefaultMessageStore执行的,它是Broker存储模块中最最最重要的一个类,提供了很多存储文件的API。DefaultMessageStore中和消息存储相关的属性如下所示,

// 消息存储配置
private final MessageStoreConfig messageStoreConfig;
// CommitLog文件的存储实现类
private final CommitLog commitLog;
// 消息队列存储缓存表,key是topic
private final ConcurrentMap<String, ConcurrentMap<Integer, ConsumeQueue>> consumeQueueTable;
// MappedFile分配服务
private final AllocateMappedFileService allocateMappedFileService;
// 直接内存暂存池
private final TransientStorePool transientStorePool;
// broker状态管理器
private final BrokerStatsManager brokerStatsManager;
// 锁文件
// 目录: ${user.home}/store/lock
private RandomAccessFile lockFile;

消息存储源码分析

发送消息存储流程

发送消息存储的入口函数是DefaultMessageStore#asyncPutMessage,它主要分为下面三步

  • 存储状态校验
  • 校验消息存储服务是否关闭,当前Broker是否是从节点,queue是否可写
  • 消息校验
  • 校验topic名称长度是否超过了127字节和property长度是否超过了32767
  • 将消息保存到commitLog
// org.apache.rocketmq.store.DefaultMessageStore#asyncPutMessage
public CompletableFuture<PutMessageResult> asyncPutMessage(MessageExtBrokerInner msg) {
    // 1. 存储状态校验
    PutMessageStatus checkStoreStatus = this.checkStoreStatus();
    if (checkStoreStatus != PutMessageStatus.PUT_OK) {
        return CompletableFuture.completedFuture(new PutMessageResult(checkStoreStatus, null));
    }
    // 2. 校验topic名称和property长度
    PutMessageStatus msgCheckStatus = this.checkMessage(msg);
    if (msgCheckStatus == PutMessageStatus.MESSAGE_ILLEGAL) {
        return CompletableFuture.completedFuture(new PutMessageResult(msgCheckStatus, null));
    }
    // ...
    long beginTime = this.getSystemClock().now();
    // 3. 保存到commitLog
    CompletableFuture<PutMessageResult> putResultFuture = this.commitLog.asyncPutMessage(msg);
    //...
    return putResultFuture;
}

CommitLog#asyncPutMessage保存消息

CommitLog#asyncPutMessage保存消息可以分为三个阶段

  • 消息预处理阶段
  • 消息保存阶段
  • 消息保存结果处理阶段

消息预处理阶段

消息预处理阶段可以分为下面三个步骤

  • 设置消息存储时间戳和消息体CSC32信息
  • 如果是延迟消息,则设置延迟信息

如果是非事务消息或者是提交的事务消息,并且设置了消息的延迟级别,说明当前消息是延迟消息,Broker在处理延迟消息时会将消息投递到名为SCHEDULE_TOPIC_XXXX的Topic。在消息预处理的阶段,会先将当前消息的topic设置为SCHEDULE_TOPIC_XXXX,queueId设置为延迟级别-1,并且将原来的Topic和queueId设置到消息的REAL_TOPICREAL_QID属性中。

  • 设置ip及构建存储消息上下文
// org.apache.rocketmq.store.CommitLog#asyncPutMessage
public CompletableFuture<PutMessageResult> asyncPutMessage(final MessageExtBrokerInner msg) {
    // 1. 设置消息存储时间戳和消息体CSC32信息
    msg.setStoreTimestamp(System.currentTimeMillis());     // 设置消息存储时间
    msg.setBodyCRC(UtilAll.crc32(msg.getBody()));					 // 设置消息体CRC32校验值
    // 2. 如果是非事务消息,或者是事务提交消息,判断是否是是否是延迟消息,如果是延迟消息则设置延迟相关信息
    if (tranType == MessageSysFlag.TRANSACTION_NOT_TYPE
            || tranType == MessageSysFlag.TRANSACTION_COMMIT_TYPE) {
        // Delay Delivery
        // 如果延迟级别>0,说明是延迟消息
        if (msg.getDelayTimeLevel() > 0) {
            // 如果大于最大的延迟级别,则取最大的延迟级别
            if (msg.getDelayTimeLevel() > this.defaultMessageStore.getScheduleMessageService().getMaxDelayLevel()) {
                msg.setDelayTimeLevel(this.defaultMessageStore.getScheduleMessageService().getMaxDelayLevel());
            }
            // 消息topic改成延迟消息topic(SCHEDULE_TOPIC_XXXX)
            topic = TopicValidator.RMQ_SYS_SCHEDULE_TOPIC;
            // 延迟topic的queueId:延迟级别-1
            int queueId = ScheduleMessageService.delayLevel2QueueId(msg.getDelayTimeLevel());
            // 消息属性中设置真实的QueueId
            MessageAccessor.putProperty(msg, MessageConst.PROPERTY_REAL_TOPIC, msg.getTopic());
            MessageAccessor.putProperty(msg, MessageConst.PROPERTY_REAL_QUEUE_ID, String.valueOf(msg.getQueueId()));
            msg.setPropertiesString(MessageDecoder.messageProperties2String(msg.getProperties()));
            // 把SCHEDULE_TOPIC_XXXX设置为当前消息的topic,消息先投递到这个队列中
            msg.setTopic(topic);
            msg.setQueueId(queueId);
        }
    }
  	// 3. 设置ip并构建存储消息上下文信息
    msg.setBornHostV6Flag(); // 如果producer的ip是IpV6,则设置生产者IpV6 flag
    msg.setStoreHostAddressV6Flag(); // 如果如果broker的ip是IpV6,则设置BrokerIpV6 flag
    PutMessageThreadLocal putMessageThreadLocal = this.putMessageThreadLocal.get();
    // 构建存消息上下文
    PutMessageContext putMessageContext = new PutMessageContext(generateKey(putMessageThreadLocal.getKeyBuilder(), msg));
  	// ... 省略部分代码
}

消息保存阶段

消息保存阶段可以分为如下步骤

  • 获取消息保存锁
  • 获取最新的mappedFile

获取MappedFile调用的是MappedFileQueue中的方法,获取最新的MappedFile

  • 如果最新的mappedFile为空或者已经满了,则创建新的MappedFile
  • 将消息保存的mappedFile中
  • 处理消息保存结果
  • 释放消息保存锁
// org.apache.rocketmq.store.CommitLog#asyncPutMessage
public CompletableFuture<PutMessageResult> asyncPutMessage(final MessageExtBrokerInner msg) {
    // ... 省略部分代码
  	// 1. 消息保存锁,默认是ReentrantLock互斥锁
    putMessageLock.lock(); 
    try {
        // 2. 获取最新的mappedFile
        MappedFile mappedFile = this.mappedFileQueue.getLastMappedFile();
        // 3. 如果获取到的mappedFile是null说明之前没有存储消息
        // 如果mappedFile满了,说明需要创建一个新的MappedFile
        if (null == mappedFile || mappedFile.isFull()) {
            mappedFile = this.mappedFileQueue.getLastMappedFile(0); 
        }
				// 如果创建mappedFile失败,则返回异常信息
        if (null == mappedFile) {
            // 创建mappedFile失败
            return CompletableFuture.completedFuture(new PutMessageResult(PutMessageStatus.CREATE_MAPEDFILE_FAILED, null));
        }
        // 4. 将消息保存的mappedFile中
        result = mappedFile.appendMessage(msg, this.appendMessageCallback, putMessageContext);
        // 5. 处理消息保存结果
      	switch (result.getStatus()) {
            case PUT_OK:
                break;
            // mappedFile满了,重新创建mappedFile后再写入消息
            case END_OF_FILE:
                unlockMappedFile = mappedFile;
                // 创建一个新的文件,然后重新写入
                mappedFile = this.mappedFileQueue.getLastMappedFile(0);
								//...
     						// 写消息
                result = mappedFile.appendMessage(msg, this.appendMessageCallback, putMessageContext);
                break;
            // ...
        }
    } finally {
      	// 6. 释放锁
        putMessageLock.unlock();
    }
		// ... 省略部分代码
}

上面第4步MappedFile#appendMessage逻辑主要有三步

  • 获取当前写文件位置

如果写指针小于文件大小,则对消息进行追加处理

  • 获取写缓冲

  • 调用AppendMessageCallback的doAppend将消息写到内存缓冲中

回调函数doAppend方法分为单条处理逻辑和批量消息处理逻辑,下面仅展示了单条消息处理逻辑

  • 消息保存完成后会更新当前写文件的位置和消息保存时间戳
public AppendMessageResult appendMessagesInner(final MessageExt messageExt, final AppendMessageCallback cb,
        PutMessageContext putMessageContext) {
    // 获取当前写文件位置
    int currentPos = this.wrotePosition.get();
    // 如果写文件位置小于文件size
    if (currentPos < this.fileSize) {
        ByteBuffer byteBuffer = writeBuffer != null ? writeBuffer.slice() : this.mappedByteBuffer.slice();
        byteBuffer.position(currentPos);
        AppendMessageResult result;
        // 如果是单条消息
        if (messageExt instanceof MessageExtBrokerInner) {
            result = cb.doAppend(this.getFileFromOffset(), byteBuffer, this.fileSize - currentPos,(MessageExtBrokerInner) messageExt, putMessageContext);
        } 
        //...
        // 更新当前写文件位置和消息保存时间戳
        this.wrotePosition.addAndGet(result.getWroteBytes());
        this.storeTimestamp = result.getStoreTimestamp();
        return result;
    }
}

上面保存消息回调函数中的doAppend实际调用的是CommitLog中内部类DefaultAppendMessageCallback的doAppend方法,这里大致可以分为下面几个步骤

  • 获取消息物理偏移量,并且创建消息id生成器,从topicQueueTable中获取Queue的最大相对便宜量。

消息id的格式如下所示,它由ip,端口和消息偏移量公共构成,长度是16字节,为了保证消息的可读性,返回给应用程序的Id转成了字符串。

消息id这么设计的原因是可以根据消息id快速找到broker的IP,端口,以及消息在的物理偏移量,通过它可以快速找到消息

  • 如果消息长度加上消息结束符(8字节)大于maxBlank,则表示该mappedFile已经没有足够的空间保存该消息了,那么就会将消息结束符写入缓冲中,并返回END_OF_FILE,mappedFile消息结束符如下所示

  • 如果空间足够,将queue的相对偏移量,物理偏移量,sysflag,消息创建时间,消息创建ip,消息保存时间及消息体等按照上面消息格式保存到缓冲中。
  • 创建AppendMessageResult对象并返回,它包括消息追加状态、消息写入物理偏移量、消息写入长度、消息ID生成器、消息开始追加的时间戳、消息队列偏移量、消息开始写入的时间戳等属性。
// org.apache.rocketmq.store.CommitLog.DefaultAppendMessageCallback#doAppend
public AppendMessageResult doAppend(final long fileFromOffset, final ByteBuffer byteBuffer, final int maxBlank,final MessageExtBrokerInner msgInner, PutMessageContext putMessageContext) {
    // 1. 物理offset,文件起始offset+写offset
    long wroteOffset = fileFromOffset + byteBuffer.position();
    // 创建消息id supplier
    Supplier<String> msgIdSupplier = () -> {
        int sysflag = msgInner.getSysFlag();
        int msgIdLen = (sysflag & MessageSysFlag.STOREHOSTADDRESS_V6_FLAG) == 0 ? 4 + 4 + 8 : 16 + 4 + 8;
        ByteBuffer msgIdBuffer = ByteBuffer.allocate(msgIdLen);
        MessageExt.socketAddress2ByteBuffer(msgInner.getStoreHost(), msgIdBuffer);
        msgIdBuffer.clear();//because socketAddress2ByteBuffer flip the buffer
        msgIdBuffer.putLong(msgIdLen - 8, wroteOffset);
        return UtilAll.bytes2string(msgIdBuffer.array());
    };
    // topic-ququeId
    String key = putMessageContext.getTopicQueueTableKey();
    // 获取消息queue offset
    Long queueOffset = CommitLog.this.topicQueueTable.get(key);
    // 如果queueOffset是null,则将其置0
    if (null == queueOffset) {
        queueOffset = 0L;
        CommitLog.this.topicQueueTable.put(key, queueOffset);
    }
    // 获取写缓冲
    ByteBuffer preEncodeBuffer = msgInner.getEncodedBuff();
    final int msgLen = preEncodeBuffer.getInt(0);
    // 2. 判断空间是否足够,如果剩余空间不足,则保存TOTAL+MAGICCODE之后,返回BLANK_MAGIC_CODE
    if ((msgLen + END_FILE_MIN_BLANK_LENGTH) > maxBlank) {
        this.msgStoreItemMemory.clear();
        // 1 TOTALSIZE 写消息总长度
        this.msgStoreItemMemory.putInt(maxBlank);
        // 2 MAGICCODE 写魔数
        this.msgStoreItemMemory.putInt(CommitLog.BLANK_MAGIC_CODE);
        byteBuffer.put(this.msgStoreItemMemory.array(), 0, 8);
        return new AppendMessageResult();
    }
    int pos = 4 + 4 + 4 + 4 + 4;
    // set队列的offset,
    preEncodeBuffer.putLong(pos, queueOffset);
    pos += 8;
    // 设置物理offset: 文件起始offset+当前文件写消息的offset
    preEncodeBuffer.putLong(pos, fileFromOffset + byteBuffer.position());
    int ipLen = (msgInner.getSysFlag() & MessageSysFlag.BORNHOST_V6_FLAG) == 0 ? 4 + 4 : 16 + 4;
    // set 8 SYSFLAG, 9 BORNTIMESTAMP, 10 BORNHOST, 11 STORETIMESTAMP
    pos += 8 + 4 + 8 + ipLen;
    // 设置存储消息ip地址
    preEncodeBuffer.putLong(pos, msgInner.getStoreTimestamp());
    // 写消息到队列缓冲
    byteBuffer.put(preEncodeBuffer);
    msgInner.setEncodedBuff(null);
  	// 4. 返回消息保存结果
    AppendMessageResult result = new AppendMessageResult(AppendMessageStatus.PUT_OK, wroteOffset, msgLen, msgIdSupplier,
        msgInner.getStoreTimestamp(), queueOffset, CommitLog.this.defaultMessageStore.now() - beginTimeMills);
    return result;
}

消息保存结果处理阶段

消息保存结果处理阶段主要包括下面三个

  • 提交刷盘请求

如果是同步刷盘,则会创建刷盘请求并返回CompleteFuture,如果是异步刷盘,则会唤醒刷盘服务,然后返回消息保存成功的CompleteFuture

  • 提交消息复制请求

如果是同步复制,则创建消息同步请求然后返回CompleteFuture,如果是异步复制则直接放回消息保存成功的CompleteFuture

  • 合并提交刷盘请求和提交消息复制请求

CompleteFuture#thenCombine是将两个CompleteFuture(提交刷盘请求,提交消息复制请求)组合起来,等提交刷盘请求和提交消息复制请求都执行完了之后再执行后续任务

public CompletableFuture<PutMessageResult> asyncPutMessage(final MessageExtBrokerInner msg) {
		// ... 省略部分代码
    PutMessageResult putMessageResult = new PutMessageResult(PutMessageStatus.PUT_OK, result);
    // 1. 提交刷盘请求
    CompletableFuture<PutMessageStatus> flushResultFuture = submitFlushRequest(result, msg);
    // 2. 提交复制请求
    CompletableFuture<PutMessageStatus> replicaResultFuture = submitReplicaRequest(result, msg);
    // 3. 合并提交刷盘请求和提交复制请求结果
    return flushResultFuture.thenCombine(replicaResultFuture, (flushStatus, replicaStatus) -> {
        if (flushStatus != PutMessageStatus.PUT_OK) {
            putMessageResult.setPutMessageStatus(flushStatus);
        }
        if (replicaStatus != PutMessageStatus.PUT_OK) {
            putMessageResult.setPutMessageStatus(replicaStatus);
        }
        return putMessageResult;
    });
}

总结

消息保存到commitLog实际上是保存到byteBuffer中,消息是在回调结果时根据配置决定同步/异步刷盘以及同步/异步同步到从节点。消息在这个阶段也并不会将消息分发到comsumeQueue以及Index中。

以上就是RocketMQ | 源码分析】Broker是如何保存消息的?的详细内容,更多关于RocketMQ Broker保存消息的资料请关注编程网其它相关文章!

免责声明:

① 本站未注明“稿件来源”的信息均来自网络整理。其文字、图片和音视频稿件的所属权归原作者所有。本站收集整理出于非商业性的教育和科研之目的,并不意味着本站赞同其观点或证实其内容的真实性。仅作为临时的测试数据,供内部测试之用。本站并未授权任何人以任何方式主动获取本站任何信息。

② 本站未注明“稿件来源”的临时测试数据将在测试完成后最终做删除处理。有问题或投稿请发送至: 邮箱/279061341@qq.com QQ/279061341

RocketMQ Broker如何保存消息源码解析

下载Word文档到电脑,方便收藏和打印~

下载Word文档

猜你喜欢

RocketMQ Broker如何保存消息源码解析

这篇文章主要为大家介绍了RocketMQ源码分析Broker如何保存消息详解,有需要的朋友可以借鉴参考下,希望能够有所帮助,祝大家多多进步,早日升职加薪
2023-05-19

RocketMQ Broker消息如何刷盘源码解析

这篇文章主要为大家介绍了RocketMQ Broker消息如何刷盘源码解析,有需要的朋友可以借鉴参考下,希望能够有所帮助,祝大家多多进步,早日升职加薪
2023-05-19

RocketMQ 源码分析Broker消息刷盘服务

这篇文章主要为大家介绍了RocketMQ 源码分析Broker消息刷盘服务示例详解,有需要的朋友可以借鉴参考下,希望能够有所帮助,祝大家多多进步,早日升职加薪
2023-05-19

RocketMQ源码分析之Broker过期消息清理机制

这篇文章主要为大家介绍了RocketMQ源码分析之Broker过期消息清理机制示例详解,有需要的朋友可以借鉴参考下,希望能够有所帮助,祝大家多多进步,早日升职加薪
2023-05-19

RocketMQ消息存储文件的加载与恢复机制源码分析

这篇文章主要介绍了RocketMQ源码分析之消息存储文件的加载与恢复机制详解,有需要的朋友可以借鉴参考下,希望能够有所帮助,祝大家多多进步,早日升职加薪
2023-05-19

编程热搜

  • Python 学习之路 - Python
    一、安装Python34Windows在Python官网(https://www.python.org/downloads/)下载安装包并安装。Python的默认安装路径是:C:\Python34配置环境变量:【右键计算机】--》【属性】-
    Python 学习之路 - Python
  • chatgpt的中文全称是什么
    chatgpt的中文全称是生成型预训练变换模型。ChatGPT是什么ChatGPT是美国人工智能研究实验室OpenAI开发的一种全新聊天机器人模型,它能够通过学习和理解人类的语言来进行对话,还能根据聊天的上下文进行互动,并协助人类完成一系列
    chatgpt的中文全称是什么
  • C/C++中extern函数使用详解
  • C/C++可变参数的使用
    可变参数的使用方法远远不止以下几种,不过在C,C++中使用可变参数时要小心,在使用printf()等函数时传入的参数个数一定不能比前面的格式化字符串中的’%’符号个数少,否则会产生访问越界,运气不好的话还会导致程序崩溃
    C/C++可变参数的使用
  • css样式文件该放在哪里
  • php中数组下标必须是连续的吗
  • Python 3 教程
    Python 3 教程 Python 的 3.0 版本,常被称为 Python 3000,或简称 Py3k。相对于 Python 的早期版本,这是一个较大的升级。为了不带入过多的累赘,Python 3.0 在设计的时候没有考虑向下兼容。 Python
    Python 3 教程
  • Python pip包管理
    一、前言    在Python中, 安装第三方模块是通过 setuptools 这个工具完成的。 Python有两个封装了 setuptools的包管理工具: easy_install  和  pip , 目前官方推荐使用 pip。    
    Python pip包管理
  • ubuntu如何重新编译内核
  • 改善Java代码之慎用java动态编译

目录