我的编程空间,编程开发者的网络收藏夹
学习永远不晚

RocketMQ消息过滤与查询的实现

短信预约 -IT技能 免费直播动态提醒
省份

北京

  • 北京
  • 上海
  • 天津
  • 重庆
  • 河北
  • 山东
  • 辽宁
  • 黑龙江
  • 吉林
  • 甘肃
  • 青海
  • 河南
  • 江苏
  • 湖北
  • 湖南
  • 江西
  • 浙江
  • 广东
  • 云南
  • 福建
  • 海南
  • 山西
  • 四川
  • 陕西
  • 贵州
  • 安徽
  • 广西
  • 内蒙
  • 西藏
  • 新疆
  • 宁夏
  • 兵团
手机号立即预约

请填写图片验证码后获取短信验证码

看不清楚,换张图片

免费获取短信验证码

RocketMQ消息过滤与查询的实现

消息过滤

RocketMQ分布式消息队列的消息过滤方式有别于其它MQ中间件,是在Consumer端订阅消息时再做消息过滤的。

RocketMQ这么做是还是在于其Producer端写入消息和Consomer端订阅消息采用分离存储的机制来实现的,Consumer端订阅消息是需要通过ConsumeQueue这个消息消费的逻辑队列拿到一个索引,然后再从CommitLog里面读取真正的消息实体内容,所以说到底也是还绕不开其存储结构。

其ConsumeQueue的存储结构如下,可以看到其中有8个字节存储的Message Tag的哈希值,基于Tag的消息过滤正式基于这个字段值的。

主要支持如下2种的过滤方式

(1) Tag过滤方式:

Consumer端在订阅消息时除了指定Topic还可以指定TAG,如果一个消息有多个TAG,可以用||分隔。

其中,Consumer端会将这个订阅请求构建成一个 SubscriptionData,发送一个Pull消息的请求给Broker端。Broker端从RocketMQ的文件存储层—Store读取数据之前,会用这些数据先构建一个MessageFilter,然后传给Store。

Store从 ConsumeQueue读取到一条记录后,会用它记录的消息tag hash值去做过滤,由于在服务端只是根据hashcode进行判断,无法精确对tag原始字符串进行过滤,故在消息消费端拉取到消息后,还需要对消息的原始tag字符串进行比对,如果不同,则丢弃该消息,不进行消息消费。

(2) SQL92的过滤方式:

这种方式的大致做法和上面的Tag过滤方式一样,只是在Store层的具体过滤过程不太一样,真正的 SQL expression 的构建和执行由rocketmq-filter模块负责的。

每次过滤都去执行SQL表达式会影响效率,所以RocketMQ使用了BloomFilter避免了每次都去执行。SQL92的表达式上下文为消息的属性。

消息查询

RocketMQ支持按照下面两种维度(“按照Message Id查询消息”、“按照Message Key查询消息”)进行消息查询。

按照MessageId查询消息

RocketMQ中的MessageId的长度总共有16字节,其中包含了消息存储主机地址(IP地址和端口),消息Commit Log offset。

“按照MessageId查询消息”在RocketMQ中具体做法是:Client端从MessageId中解析出Broker的地址(IP地址和端口)和Commit Log的偏移地址后封装成一个RPC请求后通过Remoting通信层发送(业务请求码:VIEW_MESSAGE_BY_ID)。

Broker端走的是QueryMessageProcessor,读取消息的过程用其中的 commitLog offset 和 size 去 commitLog 中找到真正的记录并解析成一个完整的消息返回。

按照Message Key查询消息

“按照Message Key查询消息”,主要是基于RocketMQ的IndexFile索引文件来实现的。RocketMQ的索引文件逻辑结构,类似JDK中HashMap的实现。

索引文件的具体结构如下:

IndexFile索引文件为用户提供通过“按照Message Key查询消息”的消息索引查询服务,IndexFile文件的存储位置是:$HOME\store\index\${fileName},文件名fileName是以创建时的时间戳命名的,文件大小是固定的,等于40+500W\*4+2000W\*20= 420000040个字节大小。

如果消息的properties中设置了UNIQ_KEY这个属性,就用 topic + “#” + UNIQ_KEY的value作为 key 来做写入操作。

如果消息设置了KEYS属性(多个KEY以空格分隔),也会用 topic + “#” + KEY 来做索引。

其中的索引数据包含了Key Hash/CommitLog Offset/Timestamp/NextIndex offset 这四个字段,一共20 Byte。

NextIndex offset 即前面读出来的 slotValue,如果有 hash冲突,就可以用这个字段将所有冲突的索引用链表的方式串起来了。

Timestamp记录的是消息storeTimestamp之间的差,并不是一个绝对的时间。整个Index File的结构如图,40 Byte 的Header用于保存一些总的统计信息,4\*500W的 Slot Table并不保存真正的索引数据,而是保存每个槽位对应的单向链表的头。

20\*2000W 是真正的索引数据,即一个 Index File 可以保存 2000W个索引。

“按照Message Key查询消息”的方式,RocketMQ的具体做法是,主要通过Broker端的QueryMessageProcessor业务处理器来查询,读取消息的过程就是用topic和key找到IndexFile索引文件中的一条记录,根据其中的commitLog offset从CommitLog文件中读取消息的实体内容。

以上为个人经验,希望能给大家一个参考,也希望大家多多支持编程网。

免责声明:

① 本站未注明“稿件来源”的信息均来自网络整理。其文字、图片和音视频稿件的所属权归原作者所有。本站收集整理出于非商业性的教育和科研之目的,并不意味着本站赞同其观点或证实其内容的真实性。仅作为临时的测试数据,供内部测试之用。本站并未授权任何人以任何方式主动获取本站任何信息。

② 本站未注明“稿件来源”的临时测试数据将在测试完成后最终做删除处理。有问题或投稿请发送至: 邮箱/279061341@qq.com QQ/279061341

RocketMQ消息过滤与查询的实现

下载Word文档到电脑,方便收藏和打印~

下载Word文档

猜你喜欢

队列在PHP与MySQL中的消息过滤和消息路由的实现方法

随着互联网的快速发展,消息队列(Message Queue)作为一种重要的通信机制,在Web开发中扮演着至关重要的角色。消息队列可以用于实现解耦、削峰填谷、异步处理等功能。本文将介绍在PHP与MySQL中如何实现消息过滤和消息路由,并提供具
2023-10-21

SQL汇总统计与GROUPBY过滤查询实现

这篇文章主要介绍了SQL汇总统计与GROUPBY过滤查询实现,GROUPBY实质是先排序后分组,遵照索引建的最佳左前缀。当无法使用索引时,增大max_length_for_sort_data和sort_buffer参数的值
2023-01-05

SQL汇总统计与GROUP BY过滤查询实现

目录1、汇总统计2、GROUT BY3、如何对分组统计的结果进行过滤4、如何对分组统计的结果进行排序5、介绍SELECT语句中各个子句的书写顺序6、上方用到的表1、汇总统计介绍几个聚集函数有多少名学生SELECT COUNT(*) FR
2023-01-05

Springboot中RocketMQ怎么实现消息发送与接收

本文小编为大家详细介绍“Springboot中RocketMQ怎么实现消息发送与接收”,内容详细,步骤清晰,细节处理妥当,希望这篇“Springboot中RocketMQ怎么实现消息发送与接收”文章能帮助大家解决疑惑,下面跟着小编的思路慢慢
2023-07-02

MongoDB Driver 中通过过滤器实现文档查询

一、举例:在物流币点击记录里面查询出 同一IP,同一条线路,同一页面,四小时内点击次数    思路:1、根据当前查询的时间将时间回退到四小时前,然后将四小时前的时间转换为时间戳,然后组合其它条件查询 出大于对应时间戳的所有记录数。 string TheTi

	MongoDB Driver  中通过过滤器实现文档查询
2017-11-06

如何实现一个Laravel查询过滤器

今天小编给大家分享一下如何实现一个Laravel查询过滤器的相关知识点,内容详细,逻辑清晰,相信大部分人都还太了解这方面的知识,所以分享这篇文章给大家参考一下,希望大家阅读完这篇文章后有所收获,下面我们一起来了解一下吧。上下文在撰写本文时,
2023-07-05

队列技术在PHP与MySQL中的消息过滤和消息订阅的应用

随着互联网的快速发展,消息处理已成为各种应用程序中至关重要的一环。在PHP与MySQL的数据库环境下,队列技术的应用可以满足消息的高效处理需求。本文将介绍队列技术在PHP与MySQL中消息过滤和消息订阅的应用,并给出具体的代码示例。一、消息
2023-10-21

队列技术在PHP与MySQL中的消息过滤和消息中间件的应用

引言:随着互联网的高速发展,我们的应用程序不再仅仅是简单的网页,而是涉及到大量的异步任务和消息传递。为了使我们的应用程序更加健壮和高效,队列技术成为了一个必不可少的工具。本文将介绍队列技术在PHP与MySQL中的消息过滤和消息中间件的应用,
2023-10-21

J-Hi查询过滤器的实现原理是什么

本篇文章给大家分享的是有关J-Hi查询过滤器的实现原理是什么,小编觉得挺实用的,因此分享给大家学习,希望大家阅读完这篇文章后可以有所收获,话不多说,跟着小编一起来看看吧。J-Hi设计自己的查询过滤器而没有直接采用Hibernate的Crit
2023-06-17

在Storm中如何实现消息过滤和路由功能

在Storm中,可以通过定义Bolt来实现消息过滤和路由功能。具体步骤如下:创建一个过滤器Bolt来处理消息过滤功能。在这个Bolt中,可以根据消息的内容或者特定的条件来判断是否需要处理该消息。如果需要处理,则可以继续传递消息;如果不需要处
在Storm中如何实现消息过滤和路由功能
2024-03-11

RocketMQ Broker实现高可用高并发的消息中转服务

RocketMQ消息代理(Broker)是一种高可用、高并发的消息中转服务,能够接收并存储生产者发送的消息,并将消息发送给消费者。它具有多种消息存储模式和消息传递模式,支持水平扩展和故障转移等特性,可以为分布式应用提供可靠的消息传递服务
2023-05-17

Xunsearch搜索的查询优化与结果过滤(如何优化Xunsearch的查询结果?)

Xunsearch查询优化与结果过滤本文详解Xunsearch搜索引擎的查询优化和结果过滤技巧,包括:查询优化:优化词库、查询语法、排序、缓存结果过滤:关键词高亮、结果分组、过滤条件、结果分页、结果排序、结果摘要、结果去重优化技巧:索引优化、查询并发控制、日志分析、集群部署通过优化查询和过滤,可提升Xunsearch查询性能和结果质量,为用户提供高效、精准的搜索体验。
Xunsearch搜索的查询优化与结果过滤(如何优化Xunsearch的查询结果?)
2024-04-02

编程热搜

  • Python 学习之路 - Python
    一、安装Python34Windows在Python官网(https://www.python.org/downloads/)下载安装包并安装。Python的默认安装路径是:C:\Python34配置环境变量:【右键计算机】--》【属性】-
    Python 学习之路 - Python
  • chatgpt的中文全称是什么
    chatgpt的中文全称是生成型预训练变换模型。ChatGPT是什么ChatGPT是美国人工智能研究实验室OpenAI开发的一种全新聊天机器人模型,它能够通过学习和理解人类的语言来进行对话,还能根据聊天的上下文进行互动,并协助人类完成一系列
    chatgpt的中文全称是什么
  • C/C++中extern函数使用详解
  • C/C++可变参数的使用
    可变参数的使用方法远远不止以下几种,不过在C,C++中使用可变参数时要小心,在使用printf()等函数时传入的参数个数一定不能比前面的格式化字符串中的’%’符号个数少,否则会产生访问越界,运气不好的话还会导致程序崩溃
    C/C++可变参数的使用
  • css样式文件该放在哪里
  • php中数组下标必须是连续的吗
  • Python 3 教程
    Python 3 教程 Python 的 3.0 版本,常被称为 Python 3000,或简称 Py3k。相对于 Python 的早期版本,这是一个较大的升级。为了不带入过多的累赘,Python 3.0 在设计的时候没有考虑向下兼容。 Python
    Python 3 教程
  • Python pip包管理
    一、前言    在Python中, 安装第三方模块是通过 setuptools 这个工具完成的。 Python有两个封装了 setuptools的包管理工具: easy_install  和  pip , 目前官方推荐使用 pip。    
    Python pip包管理
  • ubuntu如何重新编译内核
  • 改善Java代码之慎用java动态编译

目录