我的编程空间,编程开发者的网络收藏夹
学习永远不晚

SpringbootRabbitmq消息防丢失实践

短信预约 -IT技能 免费直播动态提醒
省份

北京

  • 北京
  • 上海
  • 天津
  • 重庆
  • 河北
  • 山东
  • 辽宁
  • 黑龙江
  • 吉林
  • 甘肃
  • 青海
  • 河南
  • 江苏
  • 湖北
  • 湖南
  • 江西
  • 浙江
  • 广东
  • 云南
  • 福建
  • 海南
  • 山西
  • 四川
  • 陕西
  • 贵州
  • 安徽
  • 广西
  • 内蒙
  • 西藏
  • 新疆
  • 宁夏
  • 兵团
手机号立即预约

请填写图片验证码后获取短信验证码

看不清楚,换张图片

免费获取短信验证码

SpringbootRabbitmq消息防丢失实践

前言

之前看很多网上大佬的防丢失的文章,文章中理论知识偏多,所以自己想着实践一下,实践过程中也踩了一些坑,因此写出了这篇文章。如果文章有误人子弟的地方,望在评论区指出。

导致消息出现丢失的原因

  • 发送时失败,指发送端发送完消息准备到达消息队列的过程中,因网络波动、消息队列服务宕机等,消息队列服务无法接收消息,所以导致了丢失。
  • 到达时宕机,消息队列服务接收到消息之后,如果没有开启持久化,消息会存储在内存中(当然内存吃紧的话,也会转入磁盘,缓解内存),如果这个时候服务挂了,那么内存中的消息就会丢失。
  • 发送到消费端失败,消费端接收到了消息的时候,消费端服务挂了,而rabbitmq默认自动ack,也就是说rabbitmq发送到消费端,一旦认定了消费端接收了,无论有无消费成功,rabbitmq都认为是发送成功。

下面我们以这三种情况进行实践。

环境

jdk1.8
Spring boot 2.3.7.RELEASE
Spring-boot-starter-amqp 2.3.7.RELEASE
Rabbitmq 3.7.7

准备工作

我事先准备了好了交换机以及队列:

  • 交换机:message.log.test.exchangemessage.log.test2.exchange
  • 队列:message.loss.test.queue

其中message.loss.test.queuemessage.log.test.exchange是绑定关系,而message.log.test2.exchange没有绑定队列

1.发送时失败

发送时失败,rabbitmq有两种情况是属于发送时失败。

  • 消息未到rabbitmq的交换机(exchange)
  • 消息到达了rabbitmq的交换机(exchange),但是没有到达队列(queue)

第一种的解决方式是使用confirm机制。第二种解决方式则是使用return机制

使用confirm机制

模拟场景

confirm机制是当发送端的消息没有到达rabbitmq的交换机(exchange)时,会触发confirm方法,告诉发送端该消息没有到达rabbitmq,需要做业务处理。
这里我们发送消息到rabbitmq不存在的交换机上,就可以模拟上述场景。

实现RabbitTemplate.ConfirmCallback接口


@Component
public class ConfirmCallBack implements RabbitTemplate.ConfirmCallback {

    @Resource
    private RabbitTemplate rabbitTemplate;

    @PostConstruct
    public void init(){

        rabbitTemplate.setConfirmCallback(this);
    }

    
    @Override
    public void confirm(CorrelationData correlationData, boolean ack, String cause) {

        //第一个坑,如果发送端发送消息时没有对correlationData进行处理,conirm方法接收到的对象都会是null
        //当接收失败并且correlationData对象为null,证明目前已经无法追溯回业务,可以做业务日志处理
        if(!ack&&correlationData==null){
            System.out.println(cause);
            //日志处理。。。

            return;
        }
        //如果接收失败
        if(!ack){
            System.out.println("消息Id:"+correlationData.getId());
            Message message=correlationData.getReturnedMessage();
            System.out.println("消息体:"+new String(message.getBody()));
            //这里可以持久化业务消息体到数据库,然后定时去进行补偿处理或者重试等等
            return;
        }

        //处理完成

    }
}

发送端代码


@PostMapping("push")
public boolean push(){

    TestMessage testMessage=new TestMessage();
    testMessage.setName("mq名称");
    testMessage.setBusinessId("业务Id");

    //定义CorrelationData对象以及消息属性。不然comfirm方法无论失败还是成功,CorrelationData参数永远是null
    CorrelationData correlationData = new CorrelationData(UUID.randomUUID().toString());
    //传递业务数据
    correlationData.setReturnedMessage(new Message(JSONObject.toJSON(testMessage).toString().getBytes(StandardCharsets.UTF_8),new MessageProperties()));

  //发送消息(这里发送给了message.log.test.exchange11交换机,但实际rabbitmq并不存在)template.convertAndSend("message.log.test.exchange11","message_loss_test",testMessage,correlationData);

    return true;
}

这里是我踩的第一个坑,如果发送端不定义correlationData,那么confirm接收到的correlationData对象参数 都会是null

实现效果

使用return机制

模拟场景

当消息到达了rabbitmq的交换机的时候,但是又没有到达队列,那么就会触发return方法。
下面我们定义一个没有绑定队列的交换机,然后发送消息到交换机,就可以模拟上述场景

实现RabbitTemplate.ReturnCallback


@Component
public class ReturnCallBack implements RabbitTemplate.ReturnCallback {
    @Resource
    private RabbitTemplate rabbitTemplate;
    @PostConstruct
    public void init() {
        rabbitTemplate.setReturnCallback(this);
    }
    
    @Override
    public void returnedMessage(Message message, int replyCode, String replyText, String exchange, String routingKey) {
        System.out.println("消息标识:" + message.getMessageProperties().getDeliveryTag());
        String messageBody = null;
        try {
            messageBody = new String(message.getBody(), "UTF-8");
        } catch (UnsupportedEncodingException e) {
            e.printStackTrace();
        }
        System.out.println("消息:" + messageBody);
        System.out.println(replyCode);
        System.out.println(replyText);
        System.out.println(exchange);
        System.out.println(routingKey);

    }
}

发送端代码


@PostMapping("push2")
public boolean push2(){

    TestMessage testMessage=new TestMessage();
    testMessage.setName("mq名称2");
    testMessage.setBusinessId("业务Id");

    template.convertAndSend("message.log.test2.exchange","message_loss_test",JSONObject.toJSON(testMessage).toString());

    return true;
}

这里需注意消息体需要JSON序列化,不然returnedMessage方法接收的消息body会是乱码

实现效果

rabbitmq服务挂了,造成内存的消息丢失。

这个开启rabbitmq的持久化机制就好了,开启之后消息到达rabbitmq服务,会实时转入磁盘。这里怎么设置就不多说了,网上挺多文章可以解答。

不过即使开启了还是会有一种情况会造成消息丢失,那就是消息即将要持久化到磁盘的那一刻,服务挂了,就会造成丢失,不过这种情况我也不知道怎么模拟,所以就暂不实践了。

发送到消费端消费失败

上面提到默认情况下rabbitmq使用的是自动ack的方式,我们将它改成手动ack的方式,就可以解决这个问题。

修改application.yml配置文件

rabbitmq:
 listener:
  simple:
    #开启手动确认
    acknowledge-mode: manual
    #开启失败后的重试机制
    retry:
      enabled: true
      #最多重试3次
      max-attempts: 3

下面我们试一下几种消费端消费不成功的场景

消费了,但是忘记做手动确认ack的操作代码。

@Component
public class TestConsumer {

    
    @RabbitListener(queues = {"message.loss.test.queue"})
    public void test(TestMessage testmessage, Message message, Channel channel) throws IOException {
        System.out.println("消费testmessage消息:"+testmessage.getName());
//        channel.basicAck(message.getMessageProperties().getDeliveryTag(),false);

    }
}

效果

效果流程:

  • 第一次用Postman请求之后,控制台显示了消息被消费的信号。
  • 然后去查看rabbitmq后台管理刚刚被消费的消息以及变为Unacked
  • 停止程序后(关闭消费端),过一阵子,后台管理显示消息变回了Ready,也就是说重新回到了队列。
  • 重新启动程序(开启消费段),消息被重新消费。

总而言之,如果消费端没有做手动确认的操作,那么在消费端还没关闭之前,消息会变成Unacked,不会再次被消费,但一旦消费端关闭了,消息会重新回到队列,让消费端消费。

消费过程中,触发了未知异常,代码没有try catch


@RabbitListener(queues = {"message.loss.test.queue"})
public void test(TestMessage testmessage, Message message, Channel channel) throws IOException {
    System.out.println("消费testmessage消息:"+testmessage.getName());
    //故意触发异常
    if(!StringUtils.isEmpty(testmessage.getName())){

        throw new RuntimeException("11211");
    }
    channel.basicAck(message.getMessageProperties().getDeliveryTag(),false);
}

效果1

上面的效果图显示,我在触发了异常之后,消息重试了三次,也就是我在application.yml 配置的重试三次

如果我去掉重试机制会是什么效果。

效果2

 效果和忘记做ack操作的效果一样,消息没有ack后,消息会变成Unacked状态,消费端关闭后消息会重新回到队列,然后重新链接的时候,就会再消费一次。

总结

到此这篇关于Spring boot Rabbitmq消息防丢失实践的文章就介绍到这了,更多相关Spring boot Rabbitmq 内容请搜索编程网以前的文章或继续浏览下面的相关文章希望大家以后多多支持编程网!

免责声明:

① 本站未注明“稿件来源”的信息均来自网络整理。其文字、图片和音视频稿件的所属权归原作者所有。本站收集整理出于非商业性的教育和科研之目的,并不意味着本站赞同其观点或证实其内容的真实性。仅作为临时的测试数据,供内部测试之用。本站并未授权任何人以任何方式主动获取本站任何信息。

② 本站未注明“稿件来源”的临时测试数据将在测试完成后最终做删除处理。有问题或投稿请发送至: 邮箱/279061341@qq.com QQ/279061341

SpringbootRabbitmq消息防丢失实践

下载Word文档到电脑,方便收藏和打印~

下载Word文档

猜你喜欢

RabbitMQ消息丢失怎么防止

这篇文章主要介绍“RabbitMQ消息丢失怎么防止”,在日常操作中,相信很多人在RabbitMQ消息丢失怎么防止问题上存在疑惑,小编查阅了各式资料,整理出简单好用的操作方法,希望对大家解答”RabbitMQ消息丢失怎么防止”的疑惑有所帮助!
2023-07-05

RabbitMq消息防丢失功能实现方式讲解

这篇文章主要介绍了RabbitMq消息防丢失功能实现,RabbitMQ中,消息丢失可以简单的分为两种:客户端丢失和服务端丢失。针对这两种消息丢失,RabbitMQ都给出了相应的解决方案
2023-01-28

在redis中防止消息丢失的机制

在项目中,由于网络问题,我们很难保证生产者发送的消息能100%到达消息队列服务器,也就是说有消息丢失的可能性,因此,生产者就必须具有消息丢失检测和重发机制,这篇文章主要介绍了如何在redis中防止消息丢失,需要的朋友可以参考下
2023-02-08

一文看懂RabbitMQ消息丢失如何防止

这篇文章主要介绍了RabbitMQ消息丢失的场景,以及如何保证信息不丢失,看完这篇文章一定可以帮助你RabbitMQ有更深的理解,需要的朋友可以参考下
2023-03-24

mq消息丢失如何处理

当MQ消息丢失时,可以考虑以下处理方法:消息确认机制:在发送消息时,可以要求消息接收方发送一个确认消息来确认已经成功接收到消息。如果发送方在一定时间内没有收到确认消息,就可以认为消息丢失,并进行相应的处理。消息重发机制:当检测到消息丢失时,
2023-10-26

mq怎么避免消息重复和消息丢失

要避免消息重复和消息丢失,可以采取以下措施:1. 使用独立的消息唯一标识符:在每条消息中添加一个唯一的标识符,可以是消息的ID或者其他唯一标识符,确保消息在系统中唯一。2. 实现幂等性:对于接收到的重复消息,可以在接收端进行幂等性处理,即重
2023-10-20

kafka消息不丢失是如何实现的

Kafka通过以下几个方面来确保消息不丢失:持久化存储:Kafka使用持久化日志文件来存储消息,即将消息写入到硬盘上的文件中。这样即使发生硬件故障,消息仍然可以从磁盘中恢复。复制机制:Kafka使用复制机制来提供高可用性和故障容忍性。每个主
2023-10-20

rabbitmq如何保证消息不丢失

RabbitMQ 提供了多种方式来保证消息不丢失:1. 持久化消息:RabbitMQ 默认将消息存储在内存中,当服务器重启或断电时,消息会丢失。为了避免这种情况,可以将消息持久化到磁盘上。在发送消息时,设置消息的 delivery mode
2023-10-09

redis消息队列丢失如何解决

要解决Redis消息队列丢失的问题,可以考虑以下几点:1. 持久化存储:Redis提供了持久化存储的功能,可以将消息在内存中的数据持久化到硬盘上,以防止数据丢失。可以使用Redis的RDB快照或者AOF日志来实现持久化存储。2. 设置合适的
2023-09-04

mq消息丢失补偿机制是什么

MQ消息丢失补偿机制是一种在消息中间件(MQ)中,当消息发送或消费遇到异常情况导致消息丢失时,能够进行补偿和保证消息不丢失的一种机制。常见的MQ消息丢失补偿机制有以下几种:1. 重试机制:当消息发送或消费失败时,自动进行重试操作,多次尝试发
2023-10-20

Linux推送服务如何避免消息丢失

要避免消息丢失,Linux推送服务可以采取以下措施:使用可靠的消息队列:推送服务可以使用可靠的消息队列来存储消息,并确保消息在发送给订阅者之前被正确处理。一些流行的消息队列系统包括RabbitMQ、Kafka和ActiveMQ。实时监控和告
Linux推送服务如何避免消息丢失
2024-08-23

如何解析Kafka 消息丢失与消费精确一次性

今天就跟大家聊聊有关如何解析Kafka 消息丢失与消费精确一次性,可能很多人都不太了解,为了让大家更加了解,小编给大家总结了以下内容,希望大家根据这篇文章可以有所收获。消息丢失的场景如果Kafka Producer使用“发后即忘”的方式发送
2023-06-01

mq保证消息不丢失的方法是什么

MQ(消息队列)保证消息不丢失的方法主要有以下几种:1. 持久化机制:在发送消息时,将消息持久化到磁盘上,即使在消息队列崩溃或重启后,消息仍然可靠地保存在磁盘上。可以通过设置消息的持久化标志来实现。2. 消息确认机制:在消息消费者接收到消息
2023-10-12

预防数据丢失:操作系统备份策略的最佳实践

数据丢失是IT专业人员的普遍难题,备份对于确保企业能够从任何丢失或损坏的数据中恢复至关重要。本文探讨了操作系统备份策略的最佳实践,包括评估风险、选择适当的备份方法、定期进行备份测试和验证、制定数据恢复计划等。
预防数据丢失:操作系统备份策略的最佳实践
2024-02-03

redisstream实现消息队列的实践

本文主要介绍了redisstream实现消息队列的实践,文中通过示例代码介绍的非常详细,对大家的学习或者工作具有一定的参考学习价值,需要的朋友们下面随着小编来一起学习学习吧
2022-11-13

编程热搜

  • Python 学习之路 - Python
    一、安装Python34Windows在Python官网(https://www.python.org/downloads/)下载安装包并安装。Python的默认安装路径是:C:\Python34配置环境变量:【右键计算机】--》【属性】-
    Python 学习之路 - Python
  • chatgpt的中文全称是什么
    chatgpt的中文全称是生成型预训练变换模型。ChatGPT是什么ChatGPT是美国人工智能研究实验室OpenAI开发的一种全新聊天机器人模型,它能够通过学习和理解人类的语言来进行对话,还能根据聊天的上下文进行互动,并协助人类完成一系列
    chatgpt的中文全称是什么
  • C/C++中extern函数使用详解
  • C/C++可变参数的使用
    可变参数的使用方法远远不止以下几种,不过在C,C++中使用可变参数时要小心,在使用printf()等函数时传入的参数个数一定不能比前面的格式化字符串中的’%’符号个数少,否则会产生访问越界,运气不好的话还会导致程序崩溃
    C/C++可变参数的使用
  • css样式文件该放在哪里
  • php中数组下标必须是连续的吗
  • Python 3 教程
    Python 3 教程 Python 的 3.0 版本,常被称为 Python 3000,或简称 Py3k。相对于 Python 的早期版本,这是一个较大的升级。为了不带入过多的累赘,Python 3.0 在设计的时候没有考虑向下兼容。 Python
    Python 3 教程
  • Python pip包管理
    一、前言    在Python中, 安装第三方模块是通过 setuptools 这个工具完成的。 Python有两个封装了 setuptools的包管理工具: easy_install  和  pip , 目前官方推荐使用 pip。    
    Python pip包管理
  • ubuntu如何重新编译内核
  • 改善Java代码之慎用java动态编译

目录