我的编程空间,编程开发者的网络收藏夹
学习永远不晚

spring boot集成rabbitmq的示例分析

短信预约 -IT技能 免费直播动态提醒
省份

北京

  • 北京
  • 上海
  • 天津
  • 重庆
  • 河北
  • 山东
  • 辽宁
  • 黑龙江
  • 吉林
  • 甘肃
  • 青海
  • 河南
  • 江苏
  • 湖北
  • 湖南
  • 江西
  • 浙江
  • 广东
  • 云南
  • 福建
  • 海南
  • 山西
  • 四川
  • 陕西
  • 贵州
  • 安徽
  • 广西
  • 内蒙
  • 西藏
  • 新疆
  • 宁夏
  • 兵团
手机号立即预约

请填写图片验证码后获取短信验证码

看不清楚,换张图片

免费获取短信验证码

spring boot集成rabbitmq的示例分析

这篇文章主要为大家展示了“spring boot集成rabbitmq的示例分析”,内容简而易懂,条理清晰,希望能够帮助大家解决疑惑,下面让小编带领大家一起研究并学习一下“spring boot集成rabbitmq的示例分析”这篇文章吧。

一、RabbitMQ的介绍  

RabbitMQ是消息中间件的一种,消息中间件即分布式系统中完成消息的发送和接收的基础软件.这些软件有很多,包括ActiveMQ(apache公司的),RocketMQ(阿里巴巴公司的,现已经转让给apache).

消息中间件的工作过程可以用生产者消费者模型来表示.即,生产者不断的向消息队列发送信息,而消费者从消息队列中消费信息.具体过程如下:

spring boot集成rabbitmq的示例分析

从上图可看出,对于消息队列来说,生产者,消息队列,消费者是最重要的三个概念,生产者发消息到消息队列中去,消费者监听指定的消息队列,并且当消息队列收到消息之后,接收消息队列传来的消息,并且给予相应的处理.消息队列常用于分布式系统之间互相信息的传递.

对于RabbitMQ来说,除了这三个基本模块以外,还添加了一个模块,即交换机(Exchange).它使得生产者和消息队列之间产生了隔离,生产者将消息发送给交换机,而交换机则根据调度策略把相应的消息转发给对应的消息队列.那么RabitMQ的工作流程如下所示:

spring boot集成rabbitmq的示例分析

紧接着说一下交换机.交换机的主要作用是接收相应的消息并且绑定到指定的队列.交换机有四种类型,分别为Direct,topic,headers,Fanout.

Direct是RabbitMQ默认的交换机模式,也是最简单的模式.即创建消息队列的时候,指定一个BindingKey.当发送者发送消息的时候,指定对应的Key.当Key和消息队列的BindingKey一致的时候,消息将会被发送到该消息队列中.

topic转发信息主要是依据通配符,队列和交换机的绑定主要是依据一种模式(通配符+字符串),而当发送消息的时候,只有指定的Key和该模式相匹配的时候,消息才会被发送到该消息队列中.

headers也是根据一个规则进行匹配,在消息队列和交换机绑定的时候会指定一组键值对规则,而发送消息的时候也会指定一组键值对规则,当两组键值对规则相匹配的时候,消息会被发送到匹配的消息队列中.

Fanout是路由广播的形式,将会把消息发给绑定它的全部队列,即便设置了key,也会被忽略. 

概念:

  • 生产者 消息的产生方,负责将消息推送到消息队列

  • 消费者 消息的最终接受方,负责监听队列中的对应消息,消费消息

  • 队列 消息的寄存器,负责存放生产者发送的消息

  • 交换机 负责根据一定规则分发生产者产生的消息

  • 绑定 完成交换机和队列之间的绑定

模式:

direct

直连模式,用于实例间的任务分发

topic

话题模式,通过可配置的规则分发给绑定在该exchange上的队列

headers

适用规则复杂的分发,用headers里的参数表达规则

fanout

分发给所有绑定到该exchange上的队列,忽略routing key

安装

单机版安装很简单,大概步骤如下:

# 安装erlang包 yum install erlang# 安装socat yum install socat# 安装rabbit  rpm -ivh rabbitmq-server-3.6.6-1.el6.noarch.rpm # 启动服务 rabbitmq-server start# 增加管理控制功能 rabbitmq-plugins enable rabbitmq_management# 增加用户: sudo rabbitmqctl add_user root password rabbitmqctl set_user_tags root administrator  rabbitmqctl set_permissions -p / root '.*' '.*' '.*'

集群安装,可参考这篇文章:

     rabbitmq集群安装

以上就是rabbitmq的介绍,下面开始本文的正文:spring boot 集成rabbitmq ,本人在学习rabbitmq时发现网上很少有系统性介绍springboot和rabbitmq如何集成的,其他人总结的都片段化,所以结合个人调研过程,整理此篇文章。

二、springboot配置

废话少说直接上代码:

配置参数

application.yml:

spring: rabbitmq: addresses: 192.168.1.1:5672 username: username password: password publisher-confirms: true virtual-host: /

java config读取参数

@Configuration@ConfigurationProperties(prefix = "spring.rabbitmq")public class RabbitMqConfig { @Value("${spring.rabbitmq.addresses}") private String addresses; @Value("${spring.rabbitmq.username}") private String username; @Value("${spring.rabbitmq.password}") private String password; @Value("${spring.rabbitmq.publisher-confirms}") private Boolean publisherConfirms; @Value("${spring.rabbitmq.virtual-host}") private String virtualHost; // 构建mq实例工厂 @Bean public ConnectionFactory connectionFactory(){ CachingConnectionFactory connectionFactory = new CachingConnectionFactory(); connectionFactory.setAddresses(addresses); connectionFactory.setUsername(username); connectionFactory.setPassword(password); connectionFactory.setPublisherConfirms(publisherConfirms); connectionFactory.setVirtualHost(virtualHost); return connectionFactory; } @Bean public RabbitAdmin rabbitAdmin(ConnectionFactory connectionFactory){ return new RabbitAdmin(connectionFactory); } @Bean @Scope(ConfigurableBeanFactory.SCOPE_PROTOTYPE) public RabbitTemplate rabbitTemplate(){ RabbitTemplate template = new RabbitTemplate(connectionFactory()); return template; }}

三、rabbitmq生产者配置

主要配置了直连和话题模式,其中话题模式设置两个队列(queueTopicTest1、queueTopicTest2),此两个队列在和交换机绑定时分别设置不同的routingkey(.TEST.以及lazy.#)来验证匹配模式。

@Configuration@AutoConfigureAfter(RabbitMqConfig.class)public class RabbitMqExchangeConfig {  private static final Logger logger = LoggerFactory.getLogger(RabbitMqExchangeConfig.class);  @Bean TopicExchange contractTopicExchangeDurable(RabbitAdmin rabbitAdmin){ TopicExchange contractTopicExchange = new TopicExchange(RabbitMqEnum.Exchange.CONTRACT_TOPIC.getCode()); rabbitAdmin.declareExchange(contractTopicExchange); logger.debug("完成主题型交换机bean实例化"); return contractTopicExchange; }  @Bean DirectExchange contractDirectExchange(RabbitAdmin rabbitAdmin) { DirectExchange contractDirectExchange = new DirectExchange(RabbitMqEnum.Exchange.CONTRACT_DIRECT.getCode()); rabbitAdmin.declareExchange(contractDirectExchange); logger.debug("完成直连型交换机bean实例化"); return contractDirectExchange; } //在此可以定义队列 @Bean Queue queueTest(RabbitAdmin rabbitAdmin){ Queue queue = new Queue(RabbitMqEnum.QueueName.TESTQUEUE.getCode()); rabbitAdmin.declareQueue(queue); logger.debug("测试队列实例化完成"); return queue; } //topic 1 @Bean Queue queueTopicTest1(RabbitAdmin rabbitAdmin){ Queue queue = new Queue(RabbitMqEnum.QueueName.TOPICTEST1.getCode()); rabbitAdmin.declareQueue(queue); logger.debug("话题测试队列1实例化完成"); return queue; } //topic 2 @Bean Queue queueTopicTest2(RabbitAdmin rabbitAdmin){ Queue queue = new Queue(RabbitMqEnum.QueueName.TOPICTEST2.getCode()); rabbitAdmin.declareQueue(queue); logger.debug("话题测试队列2实例化完成"); return queue; } //在此处完成队列和交换机绑定 @Bean Binding bindingQueueTest(Queue queueTest,DirectExchange exchange,RabbitAdmin rabbitAdmin){ Binding binding = BindingBuilder.bind(queueTest).to(exchange).with(RabbitMqEnum.QueueEnum.TESTQUEUE.getCode()); rabbitAdmin.declareBinding(binding); logger.debug("测试队列与直连型交换机绑定完成"); return binding; } //topic binding1 @Bean Binding bindingQueueTopicTest1(Queue queueTopicTest1,TopicExchange exchange,RabbitAdmin rabbitAdmin){ Binding binding = BindingBuilder.bind(queueTopicTest1).to(exchange).with(RabbitMqEnum.QueueEnum.TESTTOPICQUEUE1.getCode()); rabbitAdmin.declareBinding(binding); logger.debug("测试队列与话题交换机1绑定完成"); return binding; } //topic binding2 @Bean Binding bindingQueueTopicTest2(Queue queueTopicTest2,TopicExchange exchange,RabbitAdmin rabbitAdmin){ Binding binding = BindingBuilder.bind(queueTopicTest2).to(exchange).with(RabbitMqEnum.QueueEnum.TESTTOPICQUEUE2.getCode()); rabbitAdmin.declareBinding(binding); logger.debug("测试队列与话题交换机2绑定完成"); return binding; }}

在这里用到枚举类:RabbitMqEnum

public class RabbitMqEnum {  public enum Exchange { CONTRACT_FANOUT("CONTRACT_FANOUT", "消息分发"), CONTRACT_TOPIC("CONTRACT_TOPIC", "消息订阅"), CONTRACT_DIRECT("CONTRACT_DIRECT", "点对点"); private String code; private String name; Exchange(String code, String name) { this.code = code; this.name = name; } public String getCode() { return code; } public String getName() { return name; } }  public enum QueueName { TESTQUEUE("TESTQUEUE", "测试队列"), TOPICTEST1("TOPICTEST1", "topic测试队列"), TOPICTEST2("TOPICTEST2", "topic测试队列"); private String code; private String name; QueueName(String code, String name) { this.code = code; this.name = name; } public String getCode() { return code; } public String getName() { return name; } }  public enum QueueEnum { TESTQUEUE("TESTQUEUE1", "测试队列key"), TESTTOPICQUEUE1("*.TEST.*", "topic测试队列key"), TESTTOPICQUEUE2("lazy.#", "topic测试队列key"); private String code; private String name; QueueEnum(String code, String name) { this.code = code; this.name = name; } public String getCode() { return code; } public String getName() { return name; } }}

以上完成消息生产者的定义,下面封装调用接口

测试时直接调用此工具类,testUser类需自己实现

rabbitMqSender.sendRabbitmqDirect("TESTQUEUE1",testUser);rabbitMqSender.sendRabbitmqTopic("lazy.1.2",testUser);rabbitMqSender.sendRabbitmqTopic("lazy.TEST.2",testUser);
@Componentpublic class RabbitMqSender implements RabbitTemplate.ConfirmCallback{  private static final Logger logger = LoggerFactory.getLogger(RabbitMqSender.class); private RabbitTemplate rabbitTemplate; @Autowired public RabbitMqSender(RabbitTemplate rabbitTemplate) { this.rabbitTemplate = rabbitTemplate; this.rabbitTemplate.setConfirmCallback(this); } @Override public void confirm(CorrelationData correlationData, boolean b, String s) { logger.info("confirm: " + correlationData.getId()); }  public void sendRabbitmqDirect(String routeKey,Object obj) { CorrelationData correlationData = new CorrelationData(UUID.randomUUID().toString()); logger.info("send: " + correlationData.getId()); this.rabbitTemplate.convertAndSend(RabbitMqEnum.Exchange.CONTRACT_DIRECT.getCode(), routeKey , obj, correlationData); }  public void sendRabbitmqTopic(String routeKey,Object obj) { CorrelationData correlationData = new CorrelationData(UUID.randomUUID().toString()); logger.info("send: " + correlationData.getId()); this.rabbitTemplate.convertAndSend(RabbitMqEnum.Exchange.CONTRACT_TOPIC.getCode(), routeKey , obj, correlationData); }}

四、rabbitmq消费者配置

springboot注解方式监听队列,无法手动指定回调,所以采用了实现ChannelAwareMessageListener接口,重写onMessage来进行手动回调,详见以下代码,详细介绍可以在spring的官网上找amqp相关章节阅读

直连消费者

通过设置TestUser的name来测试回调,分别发两条消息,一条UserName为1,一条为2,查看控制台中队列中消息是否被消费

@Configuration@AutoConfigureAfter(RabbitMqConfig.class)public class ExampleAmqpConfiguration { @Bean("testQueueContainer") public MessageListenerContainer messageListenerContainer(ConnectionFactory connectionFactory) { SimpleMessageListenerContainer container = new SimpleMessageListenerContainer(); container.setConnectionFactory(connectionFactory); container.setQueueNames("TESTQUEUE"); container.setMessageListener(exampleListener()); container.setAcknowledgeMode(AcknowledgeMode.MANUAL); return container; } @Bean("testQueueListener") public ChannelAwareMessageListener exampleListener() { return new ChannelAwareMessageListener() { @Override public void onMessage(Message message, Channel channel) throws Exception { TestUser testUser = (TestUser) SerializeUtil.unserialize(message.getBody()); //通过设置TestUser的name来测试回调,分别发两条消息,一条UserName为1,一条为2,查看控制台中队列中消息是否被消费 if ("2".equals(testUser.getUserName())){  System.out.println(testUser.toString());  channel.basicAck(message.getMessageProperties().getDeliveryTag(),false); } if ("1".equals(testUser.getUserName())){  System.out.println(testUser.toString());  channel.basicNack(message.getMessageProperties().getDeliveryTag(),false,true); } } }; }}

topic消费者1

@Configuration@AutoConfigureAfter(RabbitMqConfig.class)public class TopicAmqpConfiguration { @Bean("topicTest1Container") public MessageListenerContainer messageListenerContainer(ConnectionFactory connectionFactory) { SimpleMessageListenerContainer container = new SimpleMessageListenerContainer(); container.setConnectionFactory(connectionFactory); container.setQueueNames("TOPICTEST1"); container.setMessageListener(exampleListener1()); container.setAcknowledgeMode(AcknowledgeMode.MANUAL); return container; } @Bean("topicTest1Listener") public ChannelAwareMessageListener exampleListener1(){ return new ChannelAwareMessageListener() { @Override public void onMessage(Message message, Channel channel) throws Exception { TestUser testUser = (TestUser) SerializeUtil.unserialize(message.getBody()); System.out.println("TOPICTEST1:"+testUser.toString()); channel.basicAck(message.getMessageProperties().getDeliveryTag(),false); } }; }}

topic消费者2

@Configuration@AutoConfigureAfter(RabbitMqConfig.class)public class TopicAmqpConfiguration2 { @Bean("topicTest2Container") public MessageListenerContainer messageListenerContainer(ConnectionFactory connectionFactory) { SimpleMessageListenerContainer container = new SimpleMessageListenerContainer(); container.setConnectionFactory(connectionFactory); container.setQueueNames("TOPICTEST2"); container.setMessageListener(exampleListener()); container.setAcknowledgeMode(AcknowledgeMode.MANUAL); return container; } @Bean("topicTest2Listener") public ChannelAwareMessageListener exampleListener() { return new ChannelAwareMessageListener() { @Override public void

以上是“spring boot集成rabbitmq的示例分析”这篇文章的所有内容,感谢各位的阅读!相信大家都有了一定的了解,希望分享的内容对大家有所帮助,如果还想学习更多知识,欢迎关注编程网行业资讯频道!

免责声明:

① 本站未注明“稿件来源”的信息均来自网络整理。其文字、图片和音视频稿件的所属权归原作者所有。本站收集整理出于非商业性的教育和科研之目的,并不意味着本站赞同其观点或证实其内容的真实性。仅作为临时的测试数据,供内部测试之用。本站并未授权任何人以任何方式主动获取本站任何信息。

② 本站未注明“稿件来源”的临时测试数据将在测试完成后最终做删除处理。有问题或投稿请发送至: 邮箱/279061341@qq.com QQ/279061341

spring boot集成rabbitmq的示例分析

下载Word文档到电脑,方便收藏和打印~

下载Word文档

猜你喜欢

spring boot集成rabbitmq的示例分析

这篇文章主要为大家展示了“spring boot集成rabbitmq的示例分析”,内容简而易懂,条理清晰,希望能够帮助大家解决疑惑,下面让小编带领大家一起研究并学习一下“spring boot集成rabbitmq的示例分析”这篇文章吧。一、
2023-05-30

spring boot集成shiro的示例分析

小编给大家分享一下spring boot集成shiro的示例分析,相信大部分人都还不怎么了解,因此分享这篇文章给大家参考一下,希望大家阅读完这篇文章后大有收获,下面让我们一起去了解一下吧!spring boot提供了一个自带的认证框架,同时
2023-05-30

Spring-Boot集成Solr客户端的示例分析

这篇文章主要为大家展示了“Spring-Boot集成Solr客户端的示例分析”,内容简而易懂,条理清晰,希望能够帮助大家解决疑惑,下面让小编带领大家一起研究并学习一下“Spring-Boot集成Solr客户端的示例分析”这篇文章吧。Solr
2023-05-30

Spring Boot Starter的示例分析

这篇文章给大家分享的是有关Spring Boot Starter的示例分析的内容。小编觉得挺实用的,因此分享给大家做个参考,一起跟随小编过来看看吧。Spring Boot 简介Spring框架功能很强大,但是就算是一个很简单的项目,我们也要
2023-05-31

spring集成httpclient配置的示例分析

这篇文章主要介绍了spring集成httpclient配置的示例分析,具有一定借鉴价值,感兴趣的朋友可以参考下,希望大家阅读完这篇文章之后大有收获,下面让小编带着大家一起了解一下。一、简介HttpClient是Apache Jakarta
2023-06-20

RabbitMQ集群架构的示例分析

这篇文章主要介绍了RabbitMQ集群架构的示例分析,具有一定借鉴价值,感兴趣的朋友可以参考下,希望大家阅读完这篇文章之后大有收获,下面让小编带着大家一起了解一下。一、为什么使用集群?内建集群作为RabbitMQ最优秀的功能之一,它的作用有
2023-06-05

Java中Spring Boot的示例分析

这篇文章主要介绍Java中Spring Boot的示例分析,文中介绍的非常详细,具有一定的参考价值,感兴趣的小伙伴们一定要看完!1. 什么是 Spring Boot?Spring Boot 是为 Spring 服务的,是用来简化新 Spri
2023-06-20

spring boot actuator监控的示例分析

这篇文章主要介绍了spring boot actuator监控的示例分析,具有一定借鉴价值,感兴趣的朋友可以参考下,希望大家阅读完这篇文章之后大有收获,下面让小编带着大家一起了解一下。spring boot actuator介绍Spring
2023-06-25

spring boot整合JMS的示例分析

这篇文章将为大家详细讲解有关spring boot整合JMS的示例分析,小编觉得挺实用的,因此分享给大家做个参考,希望大家阅读完这篇文章后可以有所收获。一、安装ActiveMQ具体的安装步骤,请参考我的另一篇文章:https://www.j
2023-05-30

Spring Boot应用开发的示例分析

这篇文章主要介绍了Spring Boot应用开发的示例分析,具有一定借鉴价值,感兴趣的朋友可以参考下,希望大家阅读完这篇文章之后大有收获,下面让小编带着大家一起了解一下。Spring Boot是由Pivotal团队提供的全新Spring开发
2023-06-20

Spring Boot配置方式的示例分析

这篇文章给大家分享的是有关Spring Boot配置方式的示例分析的内容。小编觉得挺实用的,因此分享给大家做个参考,一起跟随小编过来看看吧。Spring Boot 允许通过外部配置让你在不同的环境使用同一应用程序的代码,简单说就是可以通过配
2023-05-30

Spring Boot发送邮件的示例分析

这篇文章将为大家详细讲解有关Spring Boot发送邮件的示例分析,小编觉得挺实用的,因此分享给大家做个参考,希望大家阅读完这篇文章后可以有所收获。Spring框架使用JavaMailSender接口为发送邮件提供了一个简单的抽象,并且S
2023-05-30

Spring boot外部配置的示例分析

这篇文章给大家分享的是有关Spring boot外部配置的示例分析的内容。小编觉得挺实用的,因此分享给大家做个参考,一起跟随小编过来看看吧。前言在项目中为了灵活配置,我们常采用配置文件,常见的配置文件就比如xml和properties,sp
2023-05-30

Spring Boot日志控制的示例分析

这篇文章将为大家详细讲解有关Spring Boot日志控制的示例分析,小编觉得挺实用的,因此分享给大家做个参考,希望大家阅读完这篇文章后可以有所收获。Spring Boot对日志的处理,和我们平时的日志处理完全一致,通过logback.xm
2023-05-30

Spring Boot对jdbc支持的示例分析

这篇文章将为大家详细讲解有关Spring Boot对jdbc支持的示例分析,小编觉得挺实用的,因此分享给大家做个参考,希望大家阅读完这篇文章后可以有所收获。项目结构pom.xmlpom.xml:
2023-06-20

Spring Web MVC和Hibernate集成配置的示例分析

这篇文章主要介绍Spring Web MVC和Hibernate集成配置的示例分析,文中介绍的非常详细,具有一定的参考价值,感兴趣的小伙伴们一定要看完!添加项目依赖首先我们需要一个Java Web项目,最好使用Maven或Gradle构建工
2023-05-30

编程热搜

  • Python 学习之路 - Python
    一、安装Python34Windows在Python官网(https://www.python.org/downloads/)下载安装包并安装。Python的默认安装路径是:C:\Python34配置环境变量:【右键计算机】--》【属性】-
    Python 学习之路 - Python
  • chatgpt的中文全称是什么
    chatgpt的中文全称是生成型预训练变换模型。ChatGPT是什么ChatGPT是美国人工智能研究实验室OpenAI开发的一种全新聊天机器人模型,它能够通过学习和理解人类的语言来进行对话,还能根据聊天的上下文进行互动,并协助人类完成一系列
    chatgpt的中文全称是什么
  • C/C++中extern函数使用详解
  • C/C++可变参数的使用
    可变参数的使用方法远远不止以下几种,不过在C,C++中使用可变参数时要小心,在使用printf()等函数时传入的参数个数一定不能比前面的格式化字符串中的’%’符号个数少,否则会产生访问越界,运气不好的话还会导致程序崩溃
    C/C++可变参数的使用
  • css样式文件该放在哪里
  • php中数组下标必须是连续的吗
  • Python 3 教程
    Python 3 教程 Python 的 3.0 版本,常被称为 Python 3000,或简称 Py3k。相对于 Python 的早期版本,这是一个较大的升级。为了不带入过多的累赘,Python 3.0 在设计的时候没有考虑向下兼容。 Python
    Python 3 教程
  • Python pip包管理
    一、前言    在Python中, 安装第三方模块是通过 setuptools 这个工具完成的。 Python有两个封装了 setuptools的包管理工具: easy_install  和  pip , 目前官方推荐使用 pip。    
    Python pip包管理
  • ubuntu如何重新编译内核
  • 改善Java代码之慎用java动态编译

目录