我的编程空间,编程开发者的网络收藏夹
学习永远不晚

Spring Boot与RabbitMQ结合实现延迟队列的示例

短信预约 -IT技能 免费直播动态提醒
省份

北京

  • 北京
  • 上海
  • 天津
  • 重庆
  • 河北
  • 山东
  • 辽宁
  • 黑龙江
  • 吉林
  • 甘肃
  • 青海
  • 河南
  • 江苏
  • 湖北
  • 湖南
  • 江西
  • 浙江
  • 广东
  • 云南
  • 福建
  • 海南
  • 山西
  • 四川
  • 陕西
  • 贵州
  • 安徽
  • 广西
  • 内蒙
  • 西藏
  • 新疆
  • 宁夏
  • 兵团
手机号立即预约

请填写图片验证码后获取短信验证码

看不清楚,换张图片

免费获取短信验证码

Spring Boot与RabbitMQ结合实现延迟队列的示例

背景

何为延迟队列?

顾名思义,延迟队列就是进入该队列的消息会被延迟消费的队列。而一般的队列,消息一旦入队了之后就会被消费者马上消费。

场景一:在订单系统中,一个用户下单之后通常有30分钟的时间进行支付,如果30分钟之内没有支付成功,那么这个订单将进行一场处理。这是就可以使用延时队列将订单信息发送到延时队列。

场景二:用户希望通过手机远程遥控家里的智能设备在指定的时间进行工作。这时候就可以将用户指令发送到延时队列,当指令设定的时间到了再将指令推送到只能设备。

延迟队列能做什么?

延迟队列多用于需要延迟工作的场景。最常见的是以下两种场景:

延迟消费。比如:

  1. 用户生成订单之后,需要过一段时间校验订单的支付状态,如果订单仍未支付则需要及时地关闭订单。
  2. 用户注册成功之后,需要过一段时间比如一周后校验用户的使用情况,如果发现用户活跃度较低,则发送邮件或者短信来提醒用户使用。

延迟重试。比如消费者从队列里消费消息时失败了,但是想要延迟一段时间后自动重试。

如果不使用延迟队列,那么我们只能通过一个轮询扫描程序去完成。这种方案既不优雅,也不方便做成统一的服务便于开发人员使用。但是使用延迟队列的话,我们就可以轻而易举地完成。

如何实现?

别急,在下文中,我们将详细介绍如何利用Spring Boot加RabbitMQ来实现延迟队列。

本文出现的示例代码都已push到Github仓库中:https://github.com/Lovelcp/blog-demos/tree/master/spring-boot-rabbitmq-delay-queue

实现思路

在介绍具体的实现思路之前,我们先来介绍一下RabbitMQ的两个特性,一个是Time-To-Live Extensions,另一个是Dead Letter Exchanges。

Time-To-Live Extensions

RabbitMQ允许我们为消息或者队列设置TTL(time to live),也就是过期时间。TTL表明了一条消息可在队列中存活的最大时间,单位为毫秒。也就是说,当某条消息被设置了TTL或者当某条消息进入了设置了TTL的队列时,这条消息会在经过TTL秒后“死亡”,成为Dead Letter。如果既配置了消息的TTL,又配置了队列的TTL,那么较小的那个值会被取用。更多资料请查阅官方文档。

Dead Letter Exchange

刚才提到了,被设置了TTL的消息在过期后会成为Dead Letter。其实在RabbitMQ中,一共有三种消息的“死亡”形式:

  1. 消息被拒绝。通过调用basic.reject或者basic.nack并且设置的requeue参数为false。
  2. 消息因为设置了TTL而过期。
  3. 消息进入了一条已经达到最大长度的队列。

如果队列设置了Dead Letter Exchange(DLX),那么这些Dead Letter就会被重新publish到Dead Letter Exchange,通过Dead Letter Exchange路由到其他队列。更多资料请查阅官方文档。

流程图

聪明的你肯定已经想到了,如何将RabbitMQ的TTL和DLX特性结合在一起,实现一个延迟队列。

针对于上述的延迟队列的两个场景,我们分别有以下两种流程图:

延迟消费

延迟消费是延迟队列最为常用的使用模式。如下图所示,生产者产生的消息首先会进入缓冲队列(图中红色队列)。通过RabbitMQ提供的TTL扩展,这些消息会被设置过期时间,也就是延迟消费的时间。等消息过期之后,这些消息会通过配置好的DLX转发到实际消费队列(图中蓝色队列),以此达到延迟消费的效果。

 Spring Boot与RabbitMQ结合实现延迟队列的示例

延迟重试

延迟重试本质上也是延迟消费的一种,但是这种模式的结构与普通的延迟消费的流程图较为不同,所以单独拎出来介绍。

如下图所示,消费者发现该消息处理出现了异常,比如是因为网络波动引起的异常。那么如果不等待一段时间,直接就重试的话,很可能会导致在这期间内一直无法成功,造成一定的资源浪费。那么我们可以将其先放在缓冲队列中(图中红色队列),等消息经过一段的延迟时间后再次进入实际消费队列中(图中蓝色队列),此时由于已经过了“较长”的时间了,异常的一些波动通常已经恢复,这些消息可以被正常地消费。

Spring Boot与RabbitMQ结合实现延迟队列的示例  

代码实现

接下来我们将介绍如何在Spring Boot中实现基于RabbitMQ的延迟队列。我们假设读者已经拥有了Spring Boot与RabbitMQ的基本知识。

初始化工程

首先我们在Intellij中创建一个Spring Boot工程,并且添加spring-boot-starter-amqp扩展。

配置队列

从上述的流程图中我们可以看到,一个延迟队列的实现,需要一个缓冲队列以及一个实际的消费队列。又由于在RabbitMQ中,我们拥有两种消息过期的配置方式,所以在代码中,我们一共配置了三条队列:

  1. delay_queue_per_message_ttl:TTL配置在消息上的缓冲队列。
  2. delay_queue_per_queue_ttl:TTL配置在队列上的缓冲队列。
  3. delay_process_queue:实际消费队列。

我们通过Java Config的方式将上述的队列配置为Bean。由于我们添加了spring-boot-starter-amqp扩展,Spring Boot在启动时会根据我们的配置自动创建这些队列。为了方便接下来的测试,我们将delay_queue_per_message_ttl以及delay_queue_per_queue_ttl的DLX配置为同一个,且过期的消息都会通过DLX转发到delay_process_queue。

delay_queue_per_message_ttl

首先介绍delay_queue_per_message_ttl的配置代码:

@BeanQueue delayQueuePerMessageTTL() {  return QueueBuilder.durable(DELAY_QUEUE_PER_MESSAGE_TTL_NAME)            .withArgument("x-dead-letter-exchange", DELAY_EXCHANGE_NAME) // DLX,dead letter发送到的exchange            .withArgument("x-dead-letter-routing-key", DELAY_PROCESS_QUEUE_NAME) // dead letter携带的routing key            .build();}

免责声明:

① 本站未注明“稿件来源”的信息均来自网络整理。其文字、图片和音视频稿件的所属权归原作者所有。本站收集整理出于非商业性的教育和科研之目的,并不意味着本站赞同其观点或证实其内容的真实性。仅作为临时的测试数据,供内部测试之用。本站并未授权任何人以任何方式主动获取本站任何信息。

② 本站未注明“稿件来源”的临时测试数据将在测试完成后最终做删除处理。有问题或投稿请发送至: 邮箱/279061341@qq.com QQ/279061341

Spring Boot与RabbitMQ结合实现延迟队列的示例

下载Word文档到电脑,方便收藏和打印~

下载Word文档

猜你喜欢

Spring Boot与RabbitMQ结合实现延迟队列的示例

背景何为延迟队列?顾名思义,延迟队列就是进入该队列的消息会被延迟消费的队列。而一般的队列,消息一旦入队了之后就会被消费者马上消费。场景一:在订单系统中,一个用户下单之后通常有30分钟的时间进行支付,如果30分钟之内没有支付成功,那么这个订单
2023-05-30

SpringBoot整合RabbitMQ实现延迟队列的示例详解

这篇文章主要为大家详细介绍了SpringBoot如何整合RabbitMQ实现延迟队列,文中的示例代码讲解详细,具有一定的学习价值,感兴趣的可以了解一下
2023-05-16

SpringBoot实现redis延迟队列的示例代码

本篇文章介绍了SpringBoot实现Redis延迟队列的示例代码,采用了zset有序集合和list类型。入队任务时设置时间戳score,时间戳到达时任务从zset弹出执行,同时从list中移除。定时任务定时执行任务和移除过期任务,保证队列的正常运作。需要注意redisTemplate的连接池配置、定时任务执行间隔、过期任务清理策略等细节。
SpringBoot实现redis延迟队列的示例代码
2024-04-02

消息队列 RabbitMQ 与 Spring 整合使用的实例代码

一、什么是 RabbitMQRabbitMQ 是实现 AMQP(高级消息队列协议)的消息中间件的一种,最初起源于金融系统,用于在分布式系统中存储转发消息,在易用性、扩展性、高可用性等方面表现不俗。消息中间件主要用于组件之间的解耦,消息的发送
2023-05-31

编程热搜

  • Python 学习之路 - Python
    一、安装Python34Windows在Python官网(https://www.python.org/downloads/)下载安装包并安装。Python的默认安装路径是:C:\Python34配置环境变量:【右键计算机】--》【属性】-
    Python 学习之路 - Python
  • chatgpt的中文全称是什么
    chatgpt的中文全称是生成型预训练变换模型。ChatGPT是什么ChatGPT是美国人工智能研究实验室OpenAI开发的一种全新聊天机器人模型,它能够通过学习和理解人类的语言来进行对话,还能根据聊天的上下文进行互动,并协助人类完成一系列
    chatgpt的中文全称是什么
  • C/C++中extern函数使用详解
  • C/C++可变参数的使用
    可变参数的使用方法远远不止以下几种,不过在C,C++中使用可变参数时要小心,在使用printf()等函数时传入的参数个数一定不能比前面的格式化字符串中的’%’符号个数少,否则会产生访问越界,运气不好的话还会导致程序崩溃
    C/C++可变参数的使用
  • css样式文件该放在哪里
  • php中数组下标必须是连续的吗
  • Python 3 教程
    Python 3 教程 Python 的 3.0 版本,常被称为 Python 3000,或简称 Py3k。相对于 Python 的早期版本,这是一个较大的升级。为了不带入过多的累赘,Python 3.0 在设计的时候没有考虑向下兼容。 Python
    Python 3 教程
  • Python pip包管理
    一、前言    在Python中, 安装第三方模块是通过 setuptools 这个工具完成的。 Python有两个封装了 setuptools的包管理工具: easy_install  和  pip , 目前官方推荐使用 pip。    
    Python pip包管理
  • ubuntu如何重新编译内核
  • 改善Java代码之慎用java动态编译

目录