我的编程空间,编程开发者的网络收藏夹
学习永远不晚

RabbitMQ死信队列

短信预约 -IT技能 免费直播动态提醒
省份

北京

  • 北京
  • 上海
  • 天津
  • 重庆
  • 河北
  • 山东
  • 辽宁
  • 黑龙江
  • 吉林
  • 甘肃
  • 青海
  • 河南
  • 江苏
  • 湖北
  • 湖南
  • 江西
  • 浙江
  • 广东
  • 云南
  • 福建
  • 海南
  • 山西
  • 四川
  • 陕西
  • 贵州
  • 安徽
  • 广西
  • 内蒙
  • 西藏
  • 新疆
  • 宁夏
  • 兵团
手机号立即预约

请填写图片验证码后获取短信验证码

看不清楚,换张图片

免费获取短信验证码

RabbitMQ死信队列

目录

一、概念

二、出现死信的原因

三、实战

(一)代码架构图

(二)消息被拒

(三)消息TTL过期

(四)队列达到最大长度


先从概念解释上搞清楚这个定义,死信,顾名思义就是无法被消费的消息,字面意思可以这样理解,一般来说,producer 将消息投递到 broker 或者直接到 queue 里了, consumer queue 取出消息进行消费,但某些时候由于特定的原因导致 queue 中的某些消息无法被消费 ,这样的消息如果没有后续的处理,就变成了死信,有死信自然就有了死信队列。 应用场景 : 为了保证订单业务的消息数据不丢失,需要使用到 RabbitMQ 的死信队列机制,当消息消费发生异常时,将消息投入死信队列中. 还有比如说 : 用户在商城下单成功并点击去支付后在指定时间未支付时自动失效
  • 消息 TTL 过期
  • 队列达到最大长度(队列满了,无法再添加数据到 mq )
  • 消息被拒绝(basic.reject basic.nack)并且 requeue=false.

(一)代码架构图

(二)消息被拒

生产者

public class Producer {    public static final String NORMAL_EXCHANGE = "normal_exchange";    public static void main(String[] args) throws IOException {        Channel channel = RabbitMqUtils.getChannel();        for (int i = 0; i < 10; i++) {            String message = "info" + i;            channel.basicPublish(NORMAL_EXCHANGE, "zhangsan", null, message.getBytes());        }    }}
C1 消费者代码( 启动之后关闭该消费者 模拟其接收不到消息 )
public class Consumer01 {    public static final String NORMAL_QUEUE = "normal_queue";    public static final String NORMAL_EXCHANGE = "normal_exchange";    public static final String DEAD_QUEUE = "dead_queue";    public static final String DEAD_EXCHANGE = "dead_exchange";    public static void main(String[] args) throws IOException {        Channel channel = RabbitMqUtils.getChannel();        // 声明普通和死信交换机        channel.exchangeDeclare(NORMAL_EXCHANGE, BuiltinExchangeType.DIRECT);        channel.exchangeDeclare(DEAD_EXCHANGE, BuiltinExchangeType.DIRECT);        // 声明死信队列        channel.queueDeclare(DEAD_QUEUE, false, false, false, null);        // 死信的绑定        channel.queueBind(DEAD_QUEUE, DEAD_EXCHANGE, "lisi");        Map arguments = new HashMap<>();        // 普通队列设置对应的交换机        arguments.put("x-dead-letter-exchange", DEAD_EXCHANGE);        // 设置死信队列的RouteKey        arguments.put("x-dead-letter-routing-key", "lisi");        // 设置队列最大长度        arguments.put("x-max-length", 6);        // 声明普通队列        channel.queueDeclare(NORMAL_QUEUE, false, false, false, arguments);        // 普通的绑定        channel.queueBind(NORMAL_QUEUE, NORMAL_EXCHANGE, "zhangsan");        DeliverCallback deliverCallback = (consumerTag, message) -> {            String msg = new String(message.getBody());            if (msg.equals("info5")) {                System.out.println("Consumer01接收到消息" + message + "并拒绝签收该消息");                channel.basicReject(message.getEnvelope().getDeliveryTag(), false);            } else {                System.out.println("consumer01接收到消息:" + msg);                channel.basicAck(message.getEnvelope().getDeliveryTag(), false);            }        };        channel.basicConsume(NORMAL_QUEUE, false, deliverCallback, consumerTag -> {        });    }}

C2 消费者代码不变 启动消费者 1 然后再启动消费者 2

 

(三)消息TTL过期

生产者代码
public class Producer {    public static final String NORMAL_EXCHANGE = "normal_exchange";    public static void main(String[] args) throws IOException {        Channel channel = RabbitMqUtils.getChannel();        AMQP.BasicProperties properties = new AMQP.BasicProperties()                .builder().expiration("10000").build();        for (int i = 0; i < 10; i++) {            String message = "info" + i;            channel.basicPublish(NORMAL_EXCHANGE, "zhangsan", properties, message.getBytes());        }    }}
消费者 C1 代码 ( 启动之后关闭该消费者 模拟其接收不到消息 )
public class Consumer01 {    public static final String NORMAL_QUEUE = "normal_queue";    public static final String NORMAL_EXCHANGE = "normal_exchange";    public static final String DEAD_QUEUE = "dead_queue";    public static final String DEAD_EXCHANGE = "dead_exchange";    public static void main(String[] args) throws IOException {        Channel channel = RabbitMqUtils.getChannel();        // 声明普通和死信交换机        channel.exchangeDeclare(NORMAL_EXCHANGE, BuiltinExchangeType.DIRECT);        channel.exchangeDeclare(DEAD_EXCHANGE, BuiltinExchangeType.DIRECT);        // 声明死信队列        channel.queueDeclare(DEAD_QUEUE, false, false, false, null);        // 死信的绑定        channel.queueBind(DEAD_QUEUE, DEAD_EXCHANGE, "lisi");        Map arguments = new HashMap<>();        // 普通队列设置对应的交换机        arguments.put("x-dead-letter-exchange", DEAD_EXCHANGE);        // 设置死信队列的RouteKey        arguments.put("x-dead-letter-routing-key", "lisi");        // 声明普通队列        channel.queueDeclare(NORMAL_QUEUE, false, false, false, arguments);        // 普通的绑定        channel.queueBind(NORMAL_QUEUE, NORMAL_EXCHANGE, "zhangsan");        DeliverCallback deliverCallback = (consumerTag, message) -> {            String msg = new String(message.getBody());            System.out.println("consumer01接收到消息:" + msg);        };        channel.basicConsume(NORMAL_QUEUE, false, deliverCallback, consumerTag -> {        });    }}

消费者 C2 代码(以上步骤完成后 启动 C2 消费者 它消费死信队列里面的消息)

public class Consumer02 {    public static final String DEAD_QUEUE = "dead_queue";    public static void main(String[] args) throws IOException {        Channel channel = RabbitMqUtils.getChannel();        System.out.println("等待接收死信队列消息.....");        DeliverCallback deliverCallback = (consumerTag, message) -> {            System.out.println("Consumer02 接收死信队列的消息:" + new String(message.getBody()));        };        channel.basicConsume(DEAD_QUEUE, true, deliverCallback, consumerTag -> {        });    }}

(四)队列达到最大长度

我们在声明普通队列时添加一个参数x-max-length即可

生产者

public class Producer {    public static final String NORMAL_EXCHANGE = "normal_exchange";    public static void main(String[] args) throws IOException {        Channel channel = RabbitMqUtils.getChannel();        for (int i = 0; i < 10; i++) {            String message = "info" + i;            channel.basicPublish(NORMAL_EXCHANGE, "zhangsan", null, message.getBytes());        }    }}
C1 消费者修改以下代码 ( 启动之后关闭该消费者 模拟其接收不到消息 ),这里设置了队列最多容纳6条消息,此时由于生产者发送了10条消息,所以有4条会进入死信队列。
public class Consumer01 {    public static final String NORMAL_QUEUE = "normal_queue";    public static final String NORMAL_EXCHANGE = "normal_exchange";    public static final String DEAD_QUEUE = "dead_queue";    public static final String DEAD_EXCHANGE = "dead_exchange";    public static void main(String[] args) throws IOException {        Channel channel = RabbitMqUtils.getChannel();        // 声明普通和死信交换机        channel.exchangeDeclare(NORMAL_EXCHANGE, BuiltinExchangeType.DIRECT);        channel.exchangeDeclare(DEAD_EXCHANGE, BuiltinExchangeType.DIRECT);        // 声明死信队列        channel.queueDeclare(DEAD_QUEUE, false, false, false, null);        // 死信的绑定        channel.queueBind(DEAD_QUEUE, DEAD_EXCHANGE, "lisi");        Map arguments = new HashMap<>();        // 普通队列设置对应的交换机        arguments.put("x-dead-letter-exchange", DEAD_EXCHANGE);        // 设置死信队列的RouteKey        arguments.put("x-dead-letter-routing-key", "lisi");        // 设置队列最大长度        arguments.put("x-max-length", 6);        // 声明普通队列        channel.queueDeclare(NORMAL_QUEUE, false, false, false, arguments);        // 普通的绑定        channel.queueBind(NORMAL_QUEUE, NORMAL_EXCHANGE, "zhangsan");        DeliverCallback deliverCallback = (consumerTag, message) -> {            String msg = new String(message.getBody());            System.out.println("consumer01接收到消息:" + msg);        };        channel.basicConsume(NORMAL_QUEUE, false, deliverCallback, consumerTag -> {        });    }}
注意此时需要把原先队列删除 因为参数改变了
C2 消费者代码不变 ( 启动 C2 消费者)

 

总结

死信队列可能出现的三种情况为:

①消息被拒(消费者方拒绝签收该消息)

②消息设置的TTL过期(生产者方设置的过期时间)

③消息投放的队列内消息已经满了,放不进入时消息会进入死信队列

来源地址:https://blog.csdn.net/m0_62946761/article/details/129240859

免责声明:

① 本站未注明“稿件来源”的信息均来自网络整理。其文字、图片和音视频稿件的所属权归原作者所有。本站收集整理出于非商业性的教育和科研之目的,并不意味着本站赞同其观点或证实其内容的真实性。仅作为临时的测试数据,供内部测试之用。本站并未授权任何人以任何方式主动获取本站任何信息。

② 本站未注明“稿件来源”的临时测试数据将在测试完成后最终做删除处理。有问题或投稿请发送至: 邮箱/279061341@qq.com QQ/279061341

RabbitMQ死信队列

下载Word文档到电脑,方便收藏和打印~

下载Word文档

猜你喜欢

RabbitMQ:死信队列

✨ RabbitMQ:死信队列 1.死信队列1.1死信队列基本介绍1.2消息成为死信的三种情况1.3死信队列结构图1.4死信的处理方式 2.TTL消息过期时间2.1基本介绍2.2生产者2.3消费者12.4消费者22.
2023-08-16

聊聊 RabbitMQ 中的死信队列

RabbitMQ的死信队列功能提供了一个强大而灵活的工具来处理无法被正常消费的消息。通过合理配置和使用死信队列,你可以增强消息处理系统的健壮性和可靠性,同时提高系统的可扩展性和可维护性。

RabbitMQ中死信队列和延迟队列如何使用

这篇文章主要讲解了“RabbitMQ中死信队列和延迟队列如何使用”,文中的讲解内容简单清晰,易于学习与理解,下面请大家跟着小编的思路慢慢深入,一起来研究和学习“RabbitMQ中死信队列和延迟队列如何使用”吧!死信队列简介DLX,全称为De
2023-06-30

.NETCore中RabbitMQ使用死信队列的实现

本文主要介绍了.NETCore中RabbitMQ使用死信队列的实现,文中通过示例代码介绍的非常详细,对大家的学习或者工作具有一定的参考学习价值,需要的朋友们下面随着小编来一起学习学习吧
2023-05-14

SpringBoot怎么整合RabbitMQ处理死信队列和延迟队列

今天小编给大家分享一下SpringBoot怎么整合RabbitMQ处理死信队列和延迟队列的相关知识点,内容详细,逻辑清晰,相信大部分人都还太了解这方面的知识,所以分享这篇文章给大家参考一下,希望大家阅读完这篇文章后有所收获,下面我们一起来了
2023-06-30

Springboot实现Rabbitmq死信队列以及延迟队列的优化

由于特定原因导致队列中的消息不能被消费,这样的消息如果没有后续处理就可以放入死信队列中,例如一个订单如果超时未被支付从而自动失效,就将这个订单放到死信队列中。

深入浅出RabbitMQ:顺序消费、死信队列和延时队列

RabbitMQ 是一个功能强大的消息中间件,它在许多互联网应用中扮演了关键角色,比如华为摄像机 SDK 的监控图像数据上报,大部分电商系统的异步消费等等。​

.NET Core中RabbitMQ使用死信队列的实现

.NETCore中RabbitMQ死信队列用于存储因处理失败而被拒绝的消息。可通过设置x-dead-letter-exchange和x-dead-letter-routing-key属性或使用DeadLetterAdvancedConsumer特性实现。死信策略定义了消息被拒绝后的处理方式,包括拒绝退避、死信重路由或丢弃。最佳实践包括定义明确的死信策略、定期监视队列、使用发布者重试机制、定期清除队列以避免增长过大。
.NET Core中RabbitMQ使用死信队列的实现
2024-04-02

​​​​​​​Golang实现RabbitMQ中死信队列几种情况

本文主要介绍了​​​​​​​Golang实现RabbitMQ中死信队列几种情况,文中通过示例代码介绍的非常详细,对大家的学习或者工作具有一定的参考学习价值,需要的朋友们下面随着小编来一起学习学习吧
2023-03-01

小白到高手:轻松学会RabbitMQ延迟队列、重试队列和死信队列

假设我们有一个电商项目,其中涉及到订单的处理。在订单支付后,我们需要发送订单消息到 RabbitMQ 进行异步处理。为了处理可能出现的处理失败情况,我们可以使用延迟队列、重试队列和死信队列来保证订单消息的可靠处理。

.NET Core中RabbitMQ使用死信队列如何实现

本篇内容介绍了“.NET Core中RabbitMQ使用死信队列如何实现”的有关知识,在实际案例的操作过程中,不少人都会遇到这样的困境,接下来就让小编带领大家学习一下如何处理这些情况吧!希望大家仔细阅读,能够学有所成!在.NET Core中
2023-07-05

​​​​​​​Golang实现RabbitMQ中死信队列的情况有哪些

这篇文章主要讲解了“Golang实现RabbitMQ中死信队列的情况有哪些”,文中的讲解内容简单清晰,易于学习与理解,下面请大家跟着小编的思路慢慢深入,一起来研究和学习“Golang实现RabbitMQ中死信队列的情况有哪些”吧!1、造成死
2023-07-05

RabbitMQ队列中间件消息持久化 确认机制 死信队列原理

这篇文章主要介绍了消息队列中间件之RabbitMQ消息的持久化、确认机制、死信队列原理详解,有需要的朋友可以借鉴参考下,希望能够有所帮助,祝大家多多进步,早日升职加薪
2023-05-19

编程热搜

  • Python 学习之路 - Python
    一、安装Python34Windows在Python官网(https://www.python.org/downloads/)下载安装包并安装。Python的默认安装路径是:C:\Python34配置环境变量:【右键计算机】--》【属性】-
    Python 学习之路 - Python
  • chatgpt的中文全称是什么
    chatgpt的中文全称是生成型预训练变换模型。ChatGPT是什么ChatGPT是美国人工智能研究实验室OpenAI开发的一种全新聊天机器人模型,它能够通过学习和理解人类的语言来进行对话,还能根据聊天的上下文进行互动,并协助人类完成一系列
    chatgpt的中文全称是什么
  • C/C++中extern函数使用详解
  • C/C++可变参数的使用
    可变参数的使用方法远远不止以下几种,不过在C,C++中使用可变参数时要小心,在使用printf()等函数时传入的参数个数一定不能比前面的格式化字符串中的’%’符号个数少,否则会产生访问越界,运气不好的话还会导致程序崩溃
    C/C++可变参数的使用
  • css样式文件该放在哪里
  • php中数组下标必须是连续的吗
  • Python 3 教程
    Python 3 教程 Python 的 3.0 版本,常被称为 Python 3000,或简称 Py3k。相对于 Python 的早期版本,这是一个较大的升级。为了不带入过多的累赘,Python 3.0 在设计的时候没有考虑向下兼容。 Python
    Python 3 教程
  • Python pip包管理
    一、前言    在Python中, 安装第三方模块是通过 setuptools 这个工具完成的。 Python有两个封装了 setuptools的包管理工具: easy_install  和  pip , 目前官方推荐使用 pip。    
    Python pip包管理
  • ubuntu如何重新编译内核
  • 改善Java代码之慎用java动态编译

目录