我的编程空间,编程开发者的网络收藏夹
学习永远不晚

springboot2.5.6集成RabbitMq实现Topic主题模式的方法是什么

短信预约 -IT技能 免费直播动态提醒
省份

北京

  • 北京
  • 上海
  • 天津
  • 重庆
  • 河北
  • 山东
  • 辽宁
  • 黑龙江
  • 吉林
  • 甘肃
  • 青海
  • 河南
  • 江苏
  • 湖北
  • 湖南
  • 江西
  • 浙江
  • 广东
  • 云南
  • 福建
  • 海南
  • 山西
  • 四川
  • 陕西
  • 贵州
  • 安徽
  • 广西
  • 内蒙
  • 西藏
  • 新疆
  • 宁夏
  • 兵团
手机号立即预约

请填写图片验证码后获取短信验证码

看不清楚,换张图片

免费获取短信验证码

springboot2.5.6集成RabbitMq实现Topic主题模式的方法是什么

本篇内容主要讲解“springboot2.5.6集成RabbitMq实现Topic主题模式的方法是什么”,感兴趣的朋友不妨来看看。本文介绍的方法操作简单快捷,实用性强。下面就让小编来带大家学习“springboot2.5.6集成RabbitMq实现Topic主题模式的方法是什么”吧!

application.yml

server:  port: 8184spring:  application:    name: rabbitmq-demo  rabbitmq:    host: 127.0.0.1 # ip地址    port: 5672    username: admin # 连接账号    password: 123456 # 连接密码    template:      retry:        enabled: true # 开启失败重试        initial-interval: 10000ms # 第一次重试的间隔时长        max-interval: 300000ms # 最长重试间隔,超过这个间隔将不再重试        multiplier: 2 # 下次重试间隔的倍数,此处是2即下次重试间隔是上次的2倍      exchange: topic.exchange # 缺省的交换机名称,此处配置后,发送消息如果不指定交换机就会使用这个    publisher-confirm-type: correlated # 生产者确认机制,确保消息会正确发送,如果发送失败会有错误回执,从而触发重试    publisher-returns: true    listener:      type: simple      simple:        acknowledge-mode: manual        prefetch: 1 # 限制每次发送一条数据。        concurrency: 3 # 同一个队列启动几个消费者        max-concurrency: 3 # 启动消费者最大数量        # 重试策略相关配置        retry:          enabled: true # 是否支持重试          max-attempts: 5          stateless: false          multiplier: 1.0 # 时间策略乘数因子          initial-interval: 1000ms          max-interval: 10000ms        default-requeue-rejected: true

pom.xml引入依赖

<!-- rabbitmq -->        <dependency>            <groupId>org.springframework.boot</groupId>            <artifactId>spring-boot-starter-amqp</artifactId>        </dependency>

常量类创建

public class RabbitMqConstants {    public final static String TEST1_QUEUE = "test1-queue";    public final static String TEST2_QUEUE = "test2-queue";    public final static String EXCHANGE_NAME = "test.topic.exchange";        public final static String TOPIC_TEST1_ROUTINGKEY = "topic.test1.*";    public final static String TOPIC_TEST1_ROUTINGKEY_TEST = "topic.test1.test";        public final static String TOPIC_TEST2_ROUTINGKEY = "topic.test2.*";    public final static String TOPIC_TEST2_ROUTINGKEY_TEST = "topic.test2.test";}

配置Configuration

import com.example.demo.common.RabbitMqConstants;import lombok.extern.slf4j.Slf4j;import org.springframework.amqp.core.*;import org.springframework.amqp.rabbit.connection.CachingConnectionFactory;import org.springframework.amqp.rabbit.core.RabbitTemplate;import org.springframework.beans.factory.annotation.Autowired;import org.springframework.beans.factory.annotation.Qualifier;import org.springframework.beans.factory.config.ConfigurableBeanFactory;import org.springframework.context.annotation.Bean;import org.springframework.context.annotation.Configuration;import org.springframework.context.annotation.Scope;@Slf4j@Configurationpublic class RabbitMqConfig {    @Autowired    private CachingConnectionFactory connectionFactory;        @Bean(RabbitMqConstants.EXCHANGE_NAME)    public Exchange exchange(){ //durable(true) 持久化,mq重启之后交换机还在        // Topic模式        //return ExchangeBuilder.topicExchange(RabbitMqConstants.EXCHANGE_NAME).durable(true).build();        //发布订阅模式        return ExchangeBuilder.fanoutExchange(RabbitMqConstants.EXCHANGE_NAME).durable(true).build();    }        @Bean(RabbitMqConstants.TEST1_QUEUE)    public Queue esQueue() {        return new Queue(RabbitMqConstants.TEST1_QUEUE);    }        @Bean(RabbitMqConstants.TEST2_QUEUE)    public Queue gitalkQueue() {        return new Queue(RabbitMqConstants.TEST2_QUEUE);    }        @Bean    public Binding bindingEs(@Qualifier(RabbitMqConstants.TEST1_QUEUE) Queue queue,                             @Qualifier(RabbitMqConstants.EXCHANGE_NAME) Exchange exchange) {        return BindingBuilder.bind(queue).to(exchange).with(RabbitMqConstants.TOPIC_TEST1_ROUTINGKEY).noargs();    }        @Bean    public Binding bindingGitalk(@Qualifier(RabbitMqConstants.TEST2_QUEUE) Queue queue,                                 @Qualifier(RabbitMqConstants.EXCHANGE_NAME) Exchange exchange) {        return BindingBuilder.bind(queue).to(exchange).with(RabbitMqConstants.TOPIC_TEST2_ROUTINGKEY).noargs();    }        @Bean    @Scope(ConfigurableBeanFactory.SCOPE_PROTOTYPE)    public RabbitTemplate rabbitTemplate() {         RabbitTemplate template = new RabbitTemplate(connectionFactory);        return template;    }}

Rabbit工具类创建

import lombok.extern.slf4j.Slf4j;import org.springframework.amqp.core.Message;import org.springframework.amqp.rabbit.connection.CorrelationData;import org.springframework.amqp.rabbit.core.RabbitTemplate;import org.springframework.beans.factory.annotation.Autowired;import org.springframework.stereotype.Component;import java.util.UUID;@Slf4j@Componentpublic class RabbitMqUtils implements RabbitTemplate.ConfirmCallback, RabbitTemplate.ReturnCallback{    private RabbitTemplate rabbitTemplate;        @Autowired    public RabbitMqUtils(RabbitTemplate rabbitTemplate) {        this.rabbitTemplate = rabbitTemplate;        //这是是设置回调能收到发送到响应        rabbitTemplate.setConfirmCallback(this);        //如果设置备份队列则不起作用        rabbitTemplate.setMandatory(true);        rabbitTemplate.setReturnCallback(this);    }        @Override    public void confirm(CorrelationData correlationData, boolean ack, String cause) {        if(ack){            log.info("消息发送成功:correlationData({}),ack({}),cause({})",correlationData,ack,cause);        }else{            log.info("消息发送失败:correlationData({}),ack({}),cause({})",correlationData,ack,cause);        }    }        @Override    public void returnedMessage(Message message, int replyCode, String replyText, String exchange, String routingKey) {        log.info("消息丢失:exchange({}),route({}),replyCode({}),replyText({}),message:{}",exchange,routingKey,replyCode,replyText,message);    }        public void send(String queueName, Object obj){        CorrelationData correlationId = new CorrelationData(UUID.randomUUID().toString());        this.rabbitTemplate.convertAndSend(queueName, obj, correlationId);    }        public void sendByRoutingKey(String exChange, String routingKey, Object obj){        CorrelationData correlationId = new CorrelationData(UUID.randomUUID().toString());        this.rabbitTemplate.convertAndSend(exChange, routingKey, obj, correlationId);    }}

service创建

public interface TestService {    String sendTest1(String content);    String sendTest2(String content);}

impl实现

import com.example.demo.common.RabbitMqConstants;import com.example.demo.util.RabbitMqUtils;import lombok.extern.slf4j.Slf4j;import org.springframework.beans.factory.annotation.Autowired;import org.springframework.stereotype.Service;@Service@Slf4jpublic class TestServiceImpl implements TestService {    @Autowired    private RabbitMqUtils rabbitMqUtils;    @Override    public String sendTest1(String content) {        rabbitMqUtils.sendByRoutingKey(RabbitMqConstants.EXCHANGE_NAME,                RabbitMqConstants.TOPIC_TEST1_ROUTINGKEY_TEST, content);        log.info(RabbitMqConstants.TOPIC_TEST1_ROUTINGKEY_TEST+"***************发送成功*****************");        return "发送成功!";    }    @Override    public String sendTest2(String content) {        rabbitMqUtils.sendByRoutingKey(RabbitMqConstants.EXCHANGE_NAME,                RabbitMqConstants.TOPIC_TEST2_ROUTINGKEY_TEST, content);        log.info(RabbitMqConstants.TOPIC_TEST2_ROUTINGKEY_TEST+"***************发送成功*****************");        return "发送成功!";    }}

监听类

import com.example.demo.common.RabbitMqConstants;import lombok.extern.slf4j.Slf4j;import org.springframework.amqp.core.ExchangeTypes;import org.springframework.amqp.core.Message;import org.springframework.amqp.rabbit.annotation.Exchange;import org.springframework.amqp.rabbit.annotation.Queue;import org.springframework.amqp.rabbit.annotation.QueueBinding;import org.springframework.amqp.rabbit.annotation.RabbitListener;import org.springframework.stereotype.Component;import com.rabbitmq.client.Channel;@Slf4j@Componentpublic class RabbitMqListener {    @RabbitListener(queues = RabbitMqConstants.TEST1_QUEUE)    public void test1Consumer(Message message, Channel channel) {        try {            //手动确认消息已经被消费            channel.basicAck(message.getMessageProperties().getDeliveryTag(), false);            log.info("Counsoum1消费消息:" + message.toString() + "。成功!");        } catch (Exception e) {            e.printStackTrace();            log.info("Counsoum1消费消息:" + message.toString() + "。失败!");        }    }    @RabbitListener(queues = RabbitMqConstants.TEST2_QUEUE)    public void test2Consumer(Message message, Channel channel) {        try {            //手动确认消息已经被消费            channel.basicAck(message.getMessageProperties().getDeliveryTag(), false);            log.info("Counsoum2消费消息:" + message.toString() + "。成功!");        } catch (Exception e) {            e.printStackTrace();            log.info("Counsoum2消费消息:" + message.toString() + "。失败!");        }    }}

Controller测试

import com.example.demo.server.TestService;import jdk.nashorn.internal.objects.annotations.Getter;import lombok.extern.slf4j.Slf4j;import org.springframework.beans.factory.annotation.Autowired;import org.springframework.web.bind.annotation.*;import java.util.Map;@Slf4j@RestController@RequestMapping("/enterprise")public class TestController {    @Autowired    private TestService testService;    @GetMapping("/finance")    public String hello3(@RequestParam(required = false) Map<String, Object> params) {        return testService.sendTest2(params.get("entId").toString());    }        @PostMapping(value = "/finance2")    public String sendTest2(@RequestBody String content) {        return testService.sendTest2(content);    }}

到此,相信大家对“springboot2.5.6集成RabbitMq实现Topic主题模式的方法是什么”有了更深的了解,不妨来实际操作一番吧!这里是编程网网站,更多相关内容可以进入相关频道进行查询,关注我们,继续学习!

免责声明:

① 本站未注明“稿件来源”的信息均来自网络整理。其文字、图片和音视频稿件的所属权归原作者所有。本站收集整理出于非商业性的教育和科研之目的,并不意味着本站赞同其观点或证实其内容的真实性。仅作为临时的测试数据,供内部测试之用。本站并未授权任何人以任何方式主动获取本站任何信息。

② 本站未注明“稿件来源”的临时测试数据将在测试完成后最终做删除处理。有问题或投稿请发送至: 邮箱/279061341@qq.com QQ/279061341

springboot2.5.6集成RabbitMq实现Topic主题模式的方法是什么

下载Word文档到电脑,方便收藏和打印~

下载Word文档

猜你喜欢

springboot2.5.6集成RabbitMq实现Topic主题模式的方法是什么

本篇内容主要讲解“springboot2.5.6集成RabbitMq实现Topic主题模式的方法是什么”,感兴趣的朋友不妨来看看。本文介绍的方法操作简单快捷,实用性强。下面就让小编来带大家学习“springboot2.5.6集成Rabbit
2023-06-25

BUILDER模式的实现方法是什么

本篇内容主要讲解“BUILDER模式的实现方法是什么”,感兴趣的朋友不妨来看看。本文介绍的方法操作简单快捷,实用性强。下面就让小编来带大家学习“BUILDER模式的实现方法是什么”吧!效果它将构造代码和表示代码分开Builder模式将构建对
2023-06-19

tk.mybatis实现uuid主键生成的方法是什么

这篇文章主要介绍“tk.mybatis实现uuid主键生成的方法是什么”,在日常操作中,相信很多人在tk.mybatis实现uuid主键生成的方法是什么问题上存在疑惑,小编查阅了各式资料,整理出简单好用的操作方法,希望对大家解答”tk.my
2023-06-21

C++实现单例模式的方法是什么

这篇文章将为大家详细讲解有关C++实现单例模式的方法是什么,文章内容质量较高,因此小编分享给大家做个参考,希望大家阅读完这篇文章后对相关知识有一定的了解。饿汉模式类实例化就会占用内存,浪费资源,效率高,不存在线程安全问题。class Sin
2023-06-22

Java观察者模式的实现方法是什么

Java观察者模式的实现方法如下:定义观察者接口(Observer):观察者接口定义了观察者对象的更新方法,通常包括一个update()方法来接收被观察者的通知。定义被观察者接口(Subject):被观察者接口定义了被观察者的行为,包括添加
2023-10-23

PHP单例模式的原理及实现方法是什么

本篇内容介绍了“PHP单例模式的原理及实现方法是什么”的有关知识,在实际案例的操作过程中,不少人都会遇到这样的困境,接下来就让小编带领大家学习一下如何处理这些情况吧!希望大家仔细阅读,能够学有所成!单例模式Singleton Pattern
2023-07-05

PHP桥接模式的优点与实现方法是什么

这篇文章主要介绍“PHP桥接模式的优点与实现方法是什么”的相关知识,小编通过实际案例向大家展示操作过程,操作方法简单快捷,实用性强,希望这篇“PHP桥接模式的优点与实现方法是什么”文章能帮助大家解决问题。桥接模式Bridge Pattern
2023-07-05

PHP抽象工厂模式的优点与实现方法是什么

本篇内容介绍了“PHP抽象工厂模式的优点与实现方法是什么”的有关知识,在实际案例的操作过程中,不少人都会遇到这样的困境,接下来就让小编带领大家学习一下如何处理这些情况吧!希望大家仔细阅读,能够学有所成!抽象工厂模式Abstract Fact
2023-07-05

编程热搜

  • Python 学习之路 - Python
    一、安装Python34Windows在Python官网(https://www.python.org/downloads/)下载安装包并安装。Python的默认安装路径是:C:\Python34配置环境变量:【右键计算机】--》【属性】-
    Python 学习之路 - Python
  • chatgpt的中文全称是什么
    chatgpt的中文全称是生成型预训练变换模型。ChatGPT是什么ChatGPT是美国人工智能研究实验室OpenAI开发的一种全新聊天机器人模型,它能够通过学习和理解人类的语言来进行对话,还能根据聊天的上下文进行互动,并协助人类完成一系列
    chatgpt的中文全称是什么
  • C/C++中extern函数使用详解
  • C/C++可变参数的使用
    可变参数的使用方法远远不止以下几种,不过在C,C++中使用可变参数时要小心,在使用printf()等函数时传入的参数个数一定不能比前面的格式化字符串中的’%’符号个数少,否则会产生访问越界,运气不好的话还会导致程序崩溃
    C/C++可变参数的使用
  • css样式文件该放在哪里
  • php中数组下标必须是连续的吗
  • Python 3 教程
    Python 3 教程 Python 的 3.0 版本,常被称为 Python 3000,或简称 Py3k。相对于 Python 的早期版本,这是一个较大的升级。为了不带入过多的累赘,Python 3.0 在设计的时候没有考虑向下兼容。 Python
    Python 3 教程
  • Python pip包管理
    一、前言    在Python中, 安装第三方模块是通过 setuptools 这个工具完成的。 Python有两个封装了 setuptools的包管理工具: easy_install  和  pip , 目前官方推荐使用 pip。    
    Python pip包管理
  • ubuntu如何重新编译内核
  • 改善Java代码之慎用java动态编译

目录