我的编程空间,编程开发者的网络收藏夹
学习永远不晚

.NETCore基于RabbitMQ实现延时队列的两方法

短信预约 -IT技能 免费直播动态提醒
省份

北京

  • 北京
  • 上海
  • 天津
  • 重庆
  • 河北
  • 山东
  • 辽宁
  • 黑龙江
  • 吉林
  • 甘肃
  • 青海
  • 河南
  • 江苏
  • 湖北
  • 湖南
  • 江西
  • 浙江
  • 广东
  • 云南
  • 福建
  • 海南
  • 山西
  • 四川
  • 陕西
  • 贵州
  • 安徽
  • 广西
  • 内蒙
  • 西藏
  • 新疆
  • 宁夏
  • 兵团
手机号立即预约

请填写图片验证码后获取短信验证码

看不清楚,换张图片

免费获取短信验证码

.NETCore基于RabbitMQ实现延时队列的两方法

前言

此文章用来记录自己学习延时队列过程的文章,并用.NET这两种方式实现了简单的Demo。

延时队列的应用场景 应用下单后,30分钟没有支付的话,则自动取消订单活动开始前30分钟,提醒参赛者参加活动。活动结束后,30分钟后提醒未进行评价的参赛人员进行评价…

上述的场景都可以使用延时队列进行对应的处理。

上面的场景虽说可以通过定时器也可以处理,但有点浪费资源, 而上述的场景时间是不定的,例如有两个活动需要提醒参赛者参加,一个是7点开始 ,另一个是8点开始,那么触发处理的一个是6点半,一个是7点半。

实现延时队列的两种方式

使用Rabbitmq实现延时队列可以让消息持久化,也支持分布式

 缺点
第一种第一种方式的缺陷以及解决方案
第二种这个插件的当前设计并不真正适合具有大量延迟消息(例如成百上千或数百万)的场景。详情信息

利用rabbitmq死信队列x-dead-letter-exchange和x-dead-letter-routing-key

实现需要创建两对交换机和队列,其中需要对其中一对的队列进行设置x-dead-letter-exchange和x-dead-letter-routing-key属性,属性指定转发到另一对的交换机,

随后实现流程图如下:

.NETCore实现方式

项目:.NET Core 控制台项目

install-package RabbitMQ.Client

生产者代码:

            ConnectionFactory connectionFactory = new ConnectionFactory
            {
                UserName = "guest",
                Password = "guest",
                HostName = "127.0.0.1"
            };

            //创建连接
            var connection = connectionFactory.CreateConnection();

            //创建通道
            var channl = connection.CreateModel();

           //指定队列的x-dead-letter-exchange和x-dead-letter-routing-key
            Dictionary<string, object> queueArgs = new Dictionary<string, object>()
            {
                { "x-dead-letter-exchange","exchange.business.test" },
                {"x-dead-letter-routing-key","businessRoutingkey" }
            };

            //延时的交换机和队列绑定
            channl.ExchangeDeclare("exchange.business.dlx", "direct", true, false, null);
            channl.QueueDeclare("queue.business.dlx", true, false, false, queueArgs);
            channl.QueueBind("queue.business.dlx", "exchange.business.dlx", "");

            //业务的交换机和队列绑定
            channl.ExchangeDeclare("exchange.business.test", "direct", true, false, null);
            channl.QueueDeclare("queue.business.test", true, false, false, null);
            channl.QueueBind("queue.business.test", "exchange.business.test", "businessRoutingkey", null);

            Console.WriteLine("生产者开始发送消息");
            while (true)
            {

                string message = Console.ReadLine();
                var body = Encoding.UTF8.GetBytes(message);
                var properties = channl.CreateBasicProperties();
                properties.Persistent = true;
                properties.Expiration = "5000";
                //发送一条延时5秒的消息
                channl.BasicPublish("exchange.business.dlx", "", properties, body);

            }

消费者

            ConnectionFactory connectionFactory = new ConnectionFactory
            {
                UserName = "guest",
                Password = "guest",
                HostName = "127.0.0.1"
            };

            //创建连接
            var connection = connectionFactory.CreateConnection();

            var channel = connection.CreateModel();

            EventingBasicConsumer consumer = new EventingBasicConsumer(channel);

            //给消费时添加一个委托
            consumer.Received += (obj, ea) =>
            {
                var message = Encoding.UTF8.GetString(ea.Body.ToArray());
                //打印消费的消息
                Console.WriteLine(message);
                channel.BasicAck(ea.DeliveryTag, false);
            };

            //消费queue.business.test队列的消息
            channel.BasicConsume("queue.business.test", false, consumer);

            Console.ReadKey();
            channel.Dispose();
            connection.Close();

实现效果:

rabbitmq通过安装插件的形式实现(推荐)

使用rabbitmq_delayed_message_exchange 插件提供的x-delayed-message类型的交换机

下载插件的地址:https://www.rabbitmq.com/community-plugins.html
选中rabbitmq_delayed_message_exchange插件

该插件使用只需要声明交换机的时候,指定x-delayed-message类型,然后添加x-delayed-type参数即可

.NET Core 实现

生产者

            ConnectionFactory connectionFactory = new ConnectionFactory()
            {
                UserName = "guest",
                Password = "guest",
                HostName = "127.0.0.1"
            };

            var connection = connectionFactory.CreateConnection();

            var channel = connection.CreateModel();

            Dictionary<string, object> exchangeArgs = new Dictionary<string, object>()
            {
                {"x-delayed-type","direct" }
            };

            //指定x-delayed-message 类型的交换机,并且添加x-delayed-type属性
            channel.ExchangeDeclare("plug.delay.exchange", "x-delayed-message", true, false, exchangeArgs);

            channel.QueueDeclare("plug.delay.queue", true, false, false, null);

            channel.QueueBind("plug.delay.queue", "plug.delay.exchange", "plugdelay");

            var properties = channel.CreateBasicProperties();
            Console.WriteLine("生产者开始发送消息");
            Dictionary<string, object> headers = new Dictionary<string, object>()
            {
                {"x-delay","5000" }
            };
            properties.Persistent = true;
            properties.Headers = headers;
            while (true)
            {

                string message = Console.ReadLine();
                var body = Encoding.UTF8.GetBytes(message);
                channel.BasicPublish("plug.delay.exchange", "plugdelay", properties, body);

            }

消费者:

            ConnectionFactory connectionFactory = new ConnectionFactory
            {
                UserName = "guest",
                Password = "guest",
                HostName = "127.0.0.1"
            };

            //创建连接
            var connection = connectionFactory.CreateConnection();

            var channel = connection.CreateModel();

            EventingBasicConsumer consumer = new EventingBasicConsumer(channel);

            consumer.Received += (obj, ea) =>
            {
                var message = Encoding.UTF8.GetString(ea.Body.ToArray());
                Console.WriteLine(message);
                channel.BasicAck(ea.DeliveryTag, false);
            };

            channel.BasicConsume("plug.delay.queue", false, consumer);

            Console.ReadKey();
            channel.Dispose();
            connection.Close();

实现效果:

第一种方式的缺陷以及解决方案

如果存在A、B消息进入了队列中,A在前,B在后,如果B消息的过期时间比A的过期时间要早,消费的时候,并不会先消费B,再消费A,而是B会等A先消费,即使A要晚过期

举例

生产者代码修改成如下:

            ConnectionFactory connectionFactory = new ConnectionFactory
            {
                UserName = "guest",
                Password = "guest",
                HostName = "127.0.0.1"
            };

            //创建连接
            var connection = connectionFactory.CreateConnection();

            //创建通道
            var channl = connection.CreateModel();

            Dictionary<string, object> queueArgs = new Dictionary<string, object>()
            {
                { "x-dead-letter-exchange","exchange.business.test" },
                {"x-dead-letter-routing-key","businessRoutingkey" }
            };

            //延时的交换机和队列绑定
            channl.ExchangeDeclare("exchange.business.dlx", "direct", true, false, null);
            channl.QueueDeclare("queue.business.dlx", true, false, false, queueArgs);
            channl.QueueBind("queue.business.dlx", "exchange.business.dlx", "");

            //业务的交换机和队列绑定
            channl.ExchangeDeclare("exchange.business.test", "direct", true, false, null);
            channl.QueueDeclare("queue.business.test", true, false, false, null);
            channl.QueueBind("queue.business.test", "exchange.business.test", "businessRoutingkey", null);

            string message1 = "Hello Word!1";
            string message2 = "Hello Word!2";
            var body1 = Encoding.UTF8.GetBytes(message1);
            var body2 = Encoding.UTF8.GetBytes(message2);
            var properties = channl.CreateBasicProperties();
            properties.Persistent = true;
            //先发送过期时间5秒的消息
            properties.Expiration = "5000";
            channl.BasicPublish("exchange.business.dlx", "", properties, body2);

            //再发送过期时间3秒的消息
            properties.Expiration = "3000";
            channl.BasicPublish("exchange.business.dlx", "", properties, body1);

结果:

这里先发了延时20秒的A消息,然后又发了延时10秒的B消息,但是最终结果并不是先消费了B消息,而是等A消息过期后,立刻再去消费B。

这个会影响什么业务呢?好比两个C、D活动,C活动开始时间是7点,D活动开始时间是5点,那么D活动提醒需要等到C活动提醒后,才会立刻提醒,这明显不符合我们的业务需求。

解决方案 每个活动都是单独的创建自己的交换机和队列使用第二种实现方式,即使用插件的形式。

第一种不太现实,因为如果活动多的话,则会创建很多的队列,而且只会使用一次。

业务上还是推荐使用插件的实现方式。

第二种方式的效果

github地址:

https://github.com/MDZZ3/RabbitmqDelay

到此这篇关于.NETCore基于RabbitMQ实现延时队列的两方法的文章就介绍到这了,更多相关.NETCore RabbitMQ 内容请搜索编程网以前的文章或继续浏览下面的相关文章希望大家以后多多支持编程网!

免责声明:

① 本站未注明“稿件来源”的信息均来自网络整理。其文字、图片和音视频稿件的所属权归原作者所有。本站收集整理出于非商业性的教育和科研之目的,并不意味着本站赞同其观点或证实其内容的真实性。仅作为临时的测试数据,供内部测试之用。本站并未授权任何人以任何方式主动获取本站任何信息。

② 本站未注明“稿件来源”的临时测试数据将在测试完成后最终做删除处理。有问题或投稿请发送至: 邮箱/279061341@qq.com QQ/279061341

.NETCore基于RabbitMQ实现延时队列的两方法

下载Word文档到电脑,方便收藏和打印~

下载Word文档

猜你喜欢

RabbitMQ实现延迟队列的两种方式分别是什么

这期内容当中小编将会给大家带来有关RabbitMQ实现延迟队列的两种方式分别是什么,文章内容丰富且以专业的角度为大家分析和叙述,阅读完这篇文章希望大家可以有所收获。定时任务各种各样,常见的定时任务例如日志备份,我们可能在每天凌晨 3 点去备
2023-06-22

基于Redis实现延时队列的优化方案小结

目录一、延时队列的应用二、延时队列的实编程客栈现三、总结一、延时队列的应用近期在开发部门的新项目,其中有个关键功能就是智能推送,即根据用户行为在特定的时间点向用户推送相应的提醒消息,比如以下业务场景:在用户点击充值项后,半小时内未充值,
2022-07-05

Golang实现基于Redis的可靠延迟队列

目录前言原理详解pending2ReadyScriptready2UnackScriptunack2RetryScriptackconsume前言在之前探讨延时队列的文章中我们提到了 redisson delayqueue 使用 redi
2022-06-22

如何利用rabbitMq的死信队列实现延时消息

这篇文章主要介绍了如何利用rabbitMq的死信队列实现延时消息问题,具有很好的参考价值,希望对大家有所帮助。如有错误或未考虑完全的地方,望不吝赐教
2023-01-28

基于Java数组实现循环队列的两种方法小结

用java实现循环队列的方法:1、添加一个属性size用来记录眼下的元素个数。目的是当head=rear的时候。通过size=0还是size=数组长度。来区分队列为空,或者队列已满。2、数组中仅仅存储数组大小-1个元素,保证rear转一圈之
2023-05-30

百行代码实现基于Redis的可靠延迟队列

目录原理详解pending2ReadyScriptready2UnackScriptunack2RetryScriptackconsume在之前探讨延时队列的文章中我们提到了 redisson delayqueue 使用 redis 的有序
2022-06-23

Redis实现延迟队列的方法是什么

这篇文章主要介绍“Redis实现延迟队列的方法是什么”,在日常操作中,相信很多人在Redis实现延迟队列的方法是什么问题上存在疑惑,小编查阅了各式资料,整理出简单好用的操作方法,希望对大家解答”Redis实现延迟队列的方法是什么”的疑惑有所
2023-07-05

Java实现异步延迟队列的方法详解

目前系统中有很多需要用到延时处理的功能,本文就为大家介绍了Java实现异步延迟队列的方法,文中的示例代码讲解详细,需要的可以参考一下
2023-03-22

Redis优雅地实现延迟队列的方法分享

目录前言使用依赖配置配置文件demo代码执行效果原理分析队列创建生产者消费者整个流程总结思考前言工作中常常会遇到这样的场景,如订单到期未支付取消,到期自动续费等,我们发现延迟队列非常适合在这样的场景中使用。常见的延迟队列的优秀实现有rab
2023-02-26

基于Canal以及消息队列实现MySQL的Binlog近实时同步

基于Canal以及消息队列实现MySQL的Binlog近实时同步 1.canal的应用场景 目前普遍基于日志增量订阅和消费的业务,主要包括 基于数据库增量日志解析,提供增量数据订阅和消费数据库镜像数据库实时备份索引构建和实时维护(拆分异构索
2023-08-21

Python实现基本数据结构中队列的操作方法示例

本文实例讲述了Python实现基本数据结构中队列的操作方法。分享给大家供大家参考,具体如下:#! /usr/bin/env python #coding=utf-8 class Queue(object):def __init__(self
2022-06-04

编程热搜

  • Python 学习之路 - Python
    一、安装Python34Windows在Python官网(https://www.python.org/downloads/)下载安装包并安装。Python的默认安装路径是:C:\Python34配置环境变量:【右键计算机】--》【属性】-
    Python 学习之路 - Python
  • chatgpt的中文全称是什么
    chatgpt的中文全称是生成型预训练变换模型。ChatGPT是什么ChatGPT是美国人工智能研究实验室OpenAI开发的一种全新聊天机器人模型,它能够通过学习和理解人类的语言来进行对话,还能根据聊天的上下文进行互动,并协助人类完成一系列
    chatgpt的中文全称是什么
  • C/C++中extern函数使用详解
  • C/C++可变参数的使用
    可变参数的使用方法远远不止以下几种,不过在C,C++中使用可变参数时要小心,在使用printf()等函数时传入的参数个数一定不能比前面的格式化字符串中的’%’符号个数少,否则会产生访问越界,运气不好的话还会导致程序崩溃
    C/C++可变参数的使用
  • css样式文件该放在哪里
  • php中数组下标必须是连续的吗
  • Python 3 教程
    Python 3 教程 Python 的 3.0 版本,常被称为 Python 3000,或简称 Py3k。相对于 Python 的早期版本,这是一个较大的升级。为了不带入过多的累赘,Python 3.0 在设计的时候没有考虑向下兼容。 Python
    Python 3 教程
  • Python pip包管理
    一、前言    在Python中, 安装第三方模块是通过 setuptools 这个工具完成的。 Python有两个封装了 setuptools的包管理工具: easy_install  和  pip , 目前官方推荐使用 pip。    
    Python pip包管理
  • ubuntu如何重新编译内核
  • 改善Java代码之慎用java动态编译

目录