我的编程空间,编程开发者的网络收藏夹
学习永远不晚

SpringBoot整合RabbitMQ实战教程附死信交换机

短信预约 -IT技能 免费直播动态提醒
省份

北京

  • 北京
  • 上海
  • 天津
  • 重庆
  • 河北
  • 山东
  • 辽宁
  • 黑龙江
  • 吉林
  • 甘肃
  • 青海
  • 河南
  • 江苏
  • 湖北
  • 湖南
  • 江西
  • 浙江
  • 广东
  • 云南
  • 福建
  • 海南
  • 山西
  • 四川
  • 陕西
  • 贵州
  • 安徽
  • 广西
  • 内蒙
  • 西藏
  • 新疆
  • 宁夏
  • 兵团
手机号立即预约

请填写图片验证码后获取短信验证码

看不清楚,换张图片

免费获取短信验证码

SpringBoot整合RabbitMQ实战教程附死信交换机

前言

使用springboot,实现以下功能,有两个队列1、2,往里面发送消息,如果处理失败发生异常,可以重试3次,重试3次均失败,那么就将消息发送到死信队列进行统一处理,例如记录数据库、报警等
完整demo项目代码https://gitee.com/daenmax/rabbit-mq-demo

环境

Windows10,IDEA,otp_win64_25.0,rabbitmq-server-3.10.4
1.双击C:\Program Files\RabbitMQ Server\rabbitmq_server-3.10.4\sbin\rabbitmq-server.bat启动MQ服务
2.然后访问http://localhost:15672/,默认账号密码均为guest,
3.手动添加一个虚拟主机为admin_host,手动创建一个用户账号密码均为admin

pom.xml

        <!-- RabbitMQ -->
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-amqp</artifactId>
            <version>2.7.0</version>
        </dependency>

配置

spring:
  rabbitmq:
    host: 127.0.0.1
    port: 5672
    username: admin
    password: admin
    virtual-host: admin_host
    publisher-confirm-type: correlated
    publisher-returns: true
    listener:
      simple:
        acknowledge-mode: manual
        retry:
          enabled: true    #开启失败重试
          max-attempts: 3    #最大重试次数
          initial-interval: 1000  #重试间隔时间 毫秒

配置文件

RabbitConfig

package com.example.rabitmqdemo.mydemo.config;

import lombok.extern.slf4j.Slf4j;
import org.springframework.amqp.core.*;
import org.springframework.beans.factory.annotation.Qualifier;
import org.springframework.context.annotation.Bean;
import org.springframework.stereotype.Component;

import java.util.HashMap;
import java.util.Map;



@Slf4j
@Component
public class RabbitConfig {
    //业务交换机
    public static final String EXCHANGE_PHCP = "phcp";
    //业务队列1
    public static final String QUEUE_COMPANY = "company";
    //业务队列1的key
    public static final String ROUTINGKEY_COMPANY = "companyKey";
    //业务队列2
    public static final String QUEUE_PROJECT = "project";
    //业务队列2的key
    public static final String ROUTINGKEY_PROJECT = "projectKey";

    //死信交换机
    public static final String EXCHANGE_PHCP_DEAD = "phcp_dead";
    //死信队列1
    public static final String QUEUE_COMPANY_DEAD = "company_dead";
    //死信队列2
    public static final String QUEUE_PROJECT_DEAD = "project_dead";
    //死信队列1的key
    public static final String ROUTINGKEY_COMPANY_DEAD = "companyKey_dead";
    //死信队列2的key
    public static final String ROUTINGKEY_PROJECT_DEAD = "projectKey_dead";


//    
//    @Bean
//    public RabbitListenerContainerFactory<?> rabbitListenerContainerFactory(ConnectionFactory connectionFactory) {
//        SimpleRabbitListenerContainerFactory factory = new SimpleRabbitListenerContainerFactory();
//        factory.setConnectionFactory(connectionFactory);
//        factory.setMessageConverter(new Jackson2JsonMessageConverter());
//        factory.setAcknowledgeMode(AcknowledgeMode.MANUAL);
//        return factory;
//    }

    
    @Bean("exchangePhcp")
    public DirectExchange exchangePhcp() {
        return new DirectExchange(EXCHANGE_PHCP);
    }

     * 声明死信交换机
    @Bean("exchangePhcpDead")
    public DirectExchange exchangePhcpDead() {
        return new DirectExchange(EXCHANGE_PHCP_DEAD);

     * 声明业务队列1
     *
     * @return
    @Bean("queueCompany")
    public Queue queueCompany() {
        Map<String,Object> arguments = new HashMap<>(2);
        arguments.put("x-dead-letter-exchange",EXCHANGE_PHCP_DEAD);
        //绑定该队列到死信交换机的队列1
        arguments.put("x-dead-letter-routing-key",ROUTINGKEY_COMPANY_DEAD);
        return QueueBuilder.durable(QUEUE_COMPANY).withArguments(arguments).build();
     * 声明业务队列2
    @Bean("queueProject")
    public Queue queueProject() {
        //绑定该队列到死信交换机的队列2
        arguments.put("x-dead-letter-routing-key",ROUTINGKEY_PROJECT_DEAD);
        return QueueBuilder.durable(QUEUE_PROJECT).withArguments(arguments).build();

     * 声明死信队列1
    @Bean("queueCompanyDead")
    public Queue queueCompanyDead() {
        return new Queue(QUEUE_COMPANY_DEAD);
     * 声明死信队列2
    @Bean("queueProjectDead")
    public Queue queueProjectDead() {
        return new Queue(QUEUE_PROJECT_DEAD);

     * 绑定业务队列1和业务交换机
     * @param queue
     * @param directExchange
    @Bean
    public Binding bindingQueueCompany(@Qualifier("queueCompany") Queue queue, @Qualifier("exchangePhcp") DirectExchange directExchange) {
        return BindingBuilder.bind(queue).to(directExchange).with(RabbitConfig.ROUTINGKEY_COMPANY);

     * 绑定业务队列2和业务交换机
    public Binding bindingQueueProject(@Qualifier("queueProject") Queue queue, @Qualifier("exchangePhcp") DirectExchange directExchange) {
        return BindingBuilder.bind(queue).to(directExchange).with(RabbitConfig.ROUTINGKEY_PROJECT);

     * 绑定死信队列1和死信交换机
    public Binding bindingQueueCompanyDead(@Qualifier("queueCompanyDead") Queue queue, @Qualifier("exchangePhcpDead") DirectExchange directExchange) {
        return BindingBuilder.bind(queue).to(directExchange).with(RabbitConfig.ROUTINGKEY_COMPANY_DEAD);

     * 绑定死信队列2和死信交换机
    public Binding bindingQueueProjectDead(@Qualifier("queueProjectDead") Queue queue, @Qualifier("exchangePhcpDead") DirectExchange directExchange) {
        return BindingBuilder.bind(queue).to(directExchange).with(RabbitConfig.ROUTINGKEY_PROJECT_DEAD);
}

生产者

RabbltProducer

package com.example.rabitmqdemo.mydemo.producer;
import com.example.rabitmqdemo.mydemo.config.RabbitConfig;
import lombok.extern.slf4j.Slf4j;
import org.springframework.amqp.core.*;
import org.springframework.amqp.rabbit.connection.CorrelationData;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.stereotype.Component;
import javax.annotation.PostConstruct;
import javax.annotation.Resource;
import java.nio.charset.StandardCharsets;
import java.util.UUID;
@Component
@Slf4j
public class RabbltProducer implements RabbitTemplate.ConfirmCallback, RabbitTemplate.ReturnsCallback{
    @Resource
    private RabbitTemplate rabbitTemplate;
    
    @PostConstruct
    public void init() {
        rabbitTemplate.setConfirmCallback(this);
        rabbitTemplate.setReturnsCallback(this);
        rabbitTemplate.setMandatory(true);
    }
    
    @Override
    public void confirm(CorrelationData correlationData, boolean ack, String cause) {
        if (ack) {
            System.out.println("消息发送成功" + correlationData);
        } else {
            System.out.println("消息发送失败:" + cause);
        }
    }
    
    @Override
    public void returnedMessage(ReturnedMessage returnedMessage) {
        String str = new String(returnedMessage.getMessage().getBody());
        System.out.println("消息发送失败:" + str);
    }
    
    public void sendCompany(String str){
        MessageProperties messageProperties = new MessageProperties();
        messageProperties.setContentType("application/json");
        Message message = new Message(str.getBytes(StandardCharsets.UTF_8),messageProperties);
        CorrelationData correlationData = new CorrelationData(UUID.randomUUID().toString());
        this.rabbitTemplate.convertAndSend(RabbitConfig.EXCHANGE_PHCP,RabbitConfig.ROUTINGKEY_COMPANY,message,correlationData);
        //也可以用下面的方式
        //CorrelationData correlationData = new CorrelationData(UUID.randomUUID().toString());
        //this.rabbitTemplate.convertAndSend(RabbitConfig.EXCHANGE_PHCP,RabbitConfig.ROUTINGKEY_COMPANY,str,correlationData);
    }
    
    public void sendProject(String str){
        MessageProperties messageProperties = new MessageProperties();
        messageProperties.setContentType("application/json");
        Message message = new Message(str.getBytes(StandardCharsets.UTF_8),messageProperties);
        CorrelationData correlationData = new CorrelationData(UUID.randomUUID().toString());
        this.rabbitTemplate.convertAndSend(RabbitConfig.EXCHANGE_PHCP,RabbitConfig.ROUTINGKEY_PROJECT,message,correlationData);
        //也可以用下面的方式
        //CorrelationData correlationData = new CorrelationData(UUID.randomUUID().toString());
        //this.rabbitTemplate.convertAndSend(RabbitConfig.EXCHANGE_PHCP,RabbitConfig.ROUTINGKEY_PROJECT,str,correlationData);
    }
}

业务消费者

RabbitConsumer

package com.example.rabitmqdemo.mydemo.consumer;
import com.rabbitmq.client.Channel;
import lombok.extern.slf4j.Slf4j;
import org.springframework.amqp.core.Message;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.stereotype.Component;
import java.io.IOException;

@Component
@Slf4j
public class RabbitConsumer {
    
    @RabbitListener(queues = "company")
    public void company(Message message, Channel channel) throws IOException {
        try{
            System.out.println("次数" + message.getMessageProperties().getDeliveryTag());
            channel.basicQos(1);
            Thread.sleep(2000);
            String s = new String(message.getBody());
            log.info("处理消息"+s);
            //下面两行是尝试手动抛出异常,用来测试重试次数和发送到死信交换机
            //String str = null;
            //str.split("1");
            //处理成功,确认应答
            channel.basicAck(message.getMessageProperties().getDeliveryTag(), false);
        }catch (Exception e){
            log.error("处理消息时发生异常:"+e.getMessage());
            Boolean redelivered = message.getMessageProperties().getRedelivered();
            if(redelivered){
                log.error("异常重试次数已到达设置次数,将发送到死信交换机");
                channel.basicReject(message.getMessageProperties().getDeliveryTag(),false);
            }else {
                log.error("消息即将返回队列处理重试");
                channel.basicNack(message.getMessageProperties().getDeliveryTag(), false, true);
            }
        }
    }
    
    @RabbitListener(queues = "project")
    public void project(Message message, Channel channel) throws IOException {
        try{
            System.out.println("次数" + message.getMessageProperties().getDeliveryTag());
            channel.basicQos(1);
            Thread.sleep(2000);
            String s = new String(message.getBody());
            log.info("处理消息"+s);
            //下面两行是尝试手动抛出异常,用来测试重试次数和发送到死信交换机
            //String str = null;
            //str.split("1");
            //处理成功,确认应答
            channel.basicAck(message.getMessageProperties().getDeliveryTag(), false);
        }catch (Exception e){
            log.error("处理消息时发生异常:"+e.getMessage());
            Boolean redelivered = message.getMessageProperties().getRedelivered();
            if(redelivered){
                log.error("异常重试次数已到达设置次数,将发送到死信交换机");
                channel.basicReject(message.getMessageProperties().getDeliveryTag(),false);
            }else {
                log.error("消息即将返回队列处理重试");
                channel.basicNack(message.getMessageProperties().getDeliveryTag(), false, true);
            }
        }
    }
}

死信消费者

RabbitConsumer

package com.example.rabitmqdemo.mydemo.consumer;
import com.rabbitmq.client.Channel;
import lombok.extern.slf4j.Slf4j;
import org.springframework.amqp.core.Message;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.stereotype.Component;
import java.io.IOException;

@Component
@Slf4j
public class RabbitConsumerDead {
    
    @RabbitListener(queues = "company_dead")
    public void company_dead(Message message, Channel channel) throws IOException {
        try{
            channel.basicQos(1);
            String s = new String(message.getBody());
            log.info("处理死信"+s);
            //在此处记录到数据库、报警之类的操作
            channel.basicAck(message.getMessageProperties().getDeliveryTag(), false);
        }catch (Exception e){
            log.error("接收异常:"+e.getMessage());
        }
    }
    
    @RabbitListener(queues = "project_dead")
    public void project_dead(Message message, Channel channel) throws IOException {
        try{
            channel.basicQos(1);
            String s = new String(message.getBody());
            log.info("处理死信"+s);
            //在此处记录到数据库、报警之类的操作
            channel.basicAck(message.getMessageProperties().getDeliveryTag(), false);
        }catch (Exception e){
            log.error("接收异常:"+e.getMessage());
        }
    }
}

测试

MqController

package com.example.rabitmqdemo.mydemo.controller;
import com.example.rabitmqdemo.mydemo.producer.RabbltProducer;
import lombok.extern.slf4j.Slf4j;
import org.springframework.web.bind.annotation.RequestBody;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.RestController;
import javax.annotation.Resource;
@RequestMapping("/def")
@RestController
@Slf4j
public class MsgController {
    @Resource
    private RabbltProducer rabbltProducer;
    
    @RequestMapping("/handleCompany")
    public void handleCompany(@RequestBody String jsonStr){
        rabbltProducer.sendCompany(jsonStr);
    }
}

到此这篇关于SpringBoot整合RabbitMQ实战附加死信交换机的文章就介绍到这了,更多相关SpringBoot整合RabbitMQ死信交换机内容请搜索编程网以前的文章或继续浏览下面的相关文章希望大家以后多多支持编程网!

免责声明:

① 本站未注明“稿件来源”的信息均来自网络整理。其文字、图片和音视频稿件的所属权归原作者所有。本站收集整理出于非商业性的教育和科研之目的,并不意味着本站赞同其观点或证实其内容的真实性。仅作为临时的测试数据,供内部测试之用。本站并未授权任何人以任何方式主动获取本站任何信息。

② 本站未注明“稿件来源”的临时测试数据将在测试完成后最终做删除处理。有问题或投稿请发送至: 邮箱/279061341@qq.com QQ/279061341

SpringBoot整合RabbitMQ实战教程附死信交换机

下载Word文档到电脑,方便收藏和打印~

下载Word文档

猜你喜欢

SpringBoot如何整合RabbitMQ实现死信交换机

本篇内容介绍了“SpringBoot如何整合RabbitMQ实现死信交换机”的有关知识,在实际案例的操作过程中,不少人都会遇到这样的困境,接下来就让小编带领大家学习一下如何处理这些情况吧!希望大家仔细阅读,能够学有所成!环境Windows1
2023-07-02

编程热搜

  • Python 学习之路 - Python
    一、安装Python34Windows在Python官网(https://www.python.org/downloads/)下载安装包并安装。Python的默认安装路径是:C:\Python34配置环境变量:【右键计算机】--》【属性】-
    Python 学习之路 - Python
  • chatgpt的中文全称是什么
    chatgpt的中文全称是生成型预训练变换模型。ChatGPT是什么ChatGPT是美国人工智能研究实验室OpenAI开发的一种全新聊天机器人模型,它能够通过学习和理解人类的语言来进行对话,还能根据聊天的上下文进行互动,并协助人类完成一系列
    chatgpt的中文全称是什么
  • C/C++中extern函数使用详解
  • C/C++可变参数的使用
    可变参数的使用方法远远不止以下几种,不过在C,C++中使用可变参数时要小心,在使用printf()等函数时传入的参数个数一定不能比前面的格式化字符串中的’%’符号个数少,否则会产生访问越界,运气不好的话还会导致程序崩溃
    C/C++可变参数的使用
  • css样式文件该放在哪里
  • php中数组下标必须是连续的吗
  • Python 3 教程
    Python 3 教程 Python 的 3.0 版本,常被称为 Python 3000,或简称 Py3k。相对于 Python 的早期版本,这是一个较大的升级。为了不带入过多的累赘,Python 3.0 在设计的时候没有考虑向下兼容。 Python
    Python 3 教程
  • Python pip包管理
    一、前言    在Python中, 安装第三方模块是通过 setuptools 这个工具完成的。 Python有两个封装了 setuptools的包管理工具: easy_install  和  pip , 目前官方推荐使用 pip。    
    Python pip包管理
  • ubuntu如何重新编译内核
  • 改善Java代码之慎用java动态编译

目录