Spring Boot与RabbitMQ结合实现延迟队列的示例
背景
何为延迟队列?
顾名思义,延迟队列就是进入该队列的消息会被延迟消费的队列。而一般的队列,消息一旦入队了之后就会被消费者马上消费。
场景一:在订单系统中,一个用户下单之后通常有30分钟的时间进行支付,如果30分钟之内没有支付成功,那么这个订单将进行一场处理。这是就可以使用延时队列将订单信息发送到延时队列。
场景二:用户希望通过手机远程遥控家里的智能设备在指定的时间进行工作。这时候就可以将用户指令发送到延时队列,当指令设定的时间到了再将指令推送到只能设备。
延迟队列能做什么?
延迟队列多用于需要延迟工作的场景。最常见的是以下两种场景:
延迟消费。比如:
- 用户生成订单之后,需要过一段时间校验订单的支付状态,如果订单仍未支付则需要及时地关闭订单。
- 用户注册成功之后,需要过一段时间比如一周后校验用户的使用情况,如果发现用户活跃度较低,则发送邮件或者短信来提醒用户使用。
延迟重试。比如消费者从队列里消费消息时失败了,但是想要延迟一段时间后自动重试。
如果不使用延迟队列,那么我们只能通过一个轮询扫描程序去完成。这种方案既不优雅,也不方便做成统一的服务便于开发人员使用。但是使用延迟队列的话,我们就可以轻而易举地完成。
如何实现?
别急,在下文中,我们将详细介绍如何利用Spring Boot加RabbitMQ来实现延迟队列。
本文出现的示例代码都已push到Github仓库中:https://github.com/Lovelcp/blog-demos/tree/master/spring-boot-rabbitmq-delay-queue
实现思路
在介绍具体的实现思路之前,我们先来介绍一下RabbitMQ的两个特性,一个是Time-To-Live Extensions,另一个是Dead Letter Exchanges。
Time-To-Live Extensions
RabbitMQ允许我们为消息或者队列设置TTL(time to live),也就是过期时间。TTL表明了一条消息可在队列中存活的最大时间,单位为毫秒。也就是说,当某条消息被设置了TTL或者当某条消息进入了设置了TTL的队列时,这条消息会在经过TTL秒后“死亡”,成为Dead Letter。如果既配置了消息的TTL,又配置了队列的TTL,那么较小的那个值会被取用。更多资料请查阅官方文档。
Dead Letter Exchange
刚才提到了,被设置了TTL的消息在过期后会成为Dead Letter。其实在RabbitMQ中,一共有三种消息的“死亡”形式:
- 消息被拒绝。通过调用basic.reject或者basic.nack并且设置的requeue参数为false。
- 消息因为设置了TTL而过期。
- 消息进入了一条已经达到最大长度的队列。
如果队列设置了Dead Letter Exchange(DLX),那么这些Dead Letter就会被重新publish到Dead Letter Exchange,通过Dead Letter Exchange路由到其他队列。更多资料请查阅官方文档。
流程图
聪明的你肯定已经想到了,如何将RabbitMQ的TTL和DLX特性结合在一起,实现一个延迟队列。
针对于上述的延迟队列的两个场景,我们分别有以下两种流程图:
延迟消费
延迟消费是延迟队列最为常用的使用模式。如下图所示,生产者产生的消息首先会进入缓冲队列(图中红色队列)。通过RabbitMQ提供的TTL扩展,这些消息会被设置过期时间,也就是延迟消费的时间。等消息过期之后,这些消息会通过配置好的DLX转发到实际消费队列(图中蓝色队列),以此达到延迟消费的效果。
延迟重试
延迟重试本质上也是延迟消费的一种,但是这种模式的结构与普通的延迟消费的流程图较为不同,所以单独拎出来介绍。
如下图所示,消费者发现该消息处理出现了异常,比如是因为网络波动引起的异常。那么如果不等待一段时间,直接就重试的话,很可能会导致在这期间内一直无法成功,造成一定的资源浪费。那么我们可以将其先放在缓冲队列中(图中红色队列),等消息经过一段的延迟时间后再次进入实际消费队列中(图中蓝色队列),此时由于已经过了“较长”的时间了,异常的一些波动通常已经恢复,这些消息可以被正常地消费。
代码实现
接下来我们将介绍如何在Spring Boot中实现基于RabbitMQ的延迟队列。我们假设读者已经拥有了Spring Boot与RabbitMQ的基本知识。
初始化工程
首先我们在Intellij中创建一个Spring Boot工程,并且添加spring-boot-starter-amqp扩展。
配置队列
从上述的流程图中我们可以看到,一个延迟队列的实现,需要一个缓冲队列以及一个实际的消费队列。又由于在RabbitMQ中,我们拥有两种消息过期的配置方式,所以在代码中,我们一共配置了三条队列:
- delay_queue_per_message_ttl:TTL配置在消息上的缓冲队列。
- delay_queue_per_queue_ttl:TTL配置在队列上的缓冲队列。
- delay_process_queue:实际消费队列。
我们通过Java Config的方式将上述的队列配置为Bean。由于我们添加了spring-boot-starter-amqp扩展,Spring Boot在启动时会根据我们的配置自动创建这些队列。为了方便接下来的测试,我们将delay_queue_per_message_ttl以及delay_queue_per_queue_ttl的DLX配置为同一个,且过期的消息都会通过DLX转发到delay_process_queue。
delay_queue_per_message_ttl
首先介绍delay_queue_per_message_ttl的配置代码:
@BeanQueue delayQueuePerMessageTTL() { return QueueBuilder.durable(DELAY_QUEUE_PER_MESSAGE_TTL_NAME) .withArgument("x-dead-letter-exchange", DELAY_EXCHANGE_NAME) // DLX,dead letter发送到的exchange .withArgument("x-dead-letter-routing-key", DELAY_PROCESS_QUEUE_NAME) // dead letter携带的routing key .build();}
免责声明:
① 本站未注明“稿件来源”的信息均来自网络整理。其文字、图片和音视频稿件的所属权归原作者所有。本站收集整理出于非商业性的教育和科研之目的,并不意味着本站赞同其观点或证实其内容的真实性。仅作为临时的测试数据,供内部测试之用。本站并未授权任何人以任何方式主动获取本站任何信息。
② 本站未注明“稿件来源”的临时测试数据将在测试完成后最终做删除处理。有问题或投稿请发送至: 邮箱/279061341@qq.com QQ/279061341
Spring Boot与RabbitMQ结合实现延迟队列的示例
下载Word文档到电脑,方便收藏和打印~
相关文章
- 在 Java 中怎样生成随机正方形坐标?(Java中如何生成随机正方形坐标)
- 如何在 Java 中定义 list 并实现反转?(Java定义list怎样实现反转)
- Java 中 SimpleDateFormat 如何巧妙处理闰年?(Java SimpleDateFormat如何处理闰年)
- Java 中 BeanUtils 工具类常用方法有哪些?(Java BeanUtils工具类常用方法有哪些)
- 如何在 Java 数据分析中应用 ARIMA 模型?(ARIMA模型在Java数据分析中的应用)
- 如何使用 Java 的 Scanner 读取二进制文件?(Java的Scanner如何读取二进制文件)
- 在 Java 中如何进行变量的声明?(Java中怎么声明一个变量)
- 如何将 Java 父类强制转换成子类?(java父类怎么强制转换成子类)
- 在 Java 中,add()函数的最佳实践究竟是什么?(在Java中add()函数最佳实践是什么)
- 为什么要选择 Gosling Java 而不是其他版本呢?(为什么选择Gosling Java而不是其他版本)