我的编程空间,编程开发者的网络收藏夹
学习永远不晚

RabbitMQ使用教程

短信预约 -IT技能 免费直播动态提醒
省份

北京

  • 北京
  • 上海
  • 天津
  • 重庆
  • 河北
  • 山东
  • 辽宁
  • 黑龙江
  • 吉林
  • 甘肃
  • 青海
  • 河南
  • 江苏
  • 湖北
  • 湖南
  • 江西
  • 浙江
  • 广东
  • 云南
  • 福建
  • 海南
  • 山西
  • 四川
  • 陕西
  • 贵州
  • 安徽
  • 广西
  • 内蒙
  • 西藏
  • 新疆
  • 宁夏
  • 兵团
手机号立即预约

请填写图片验证码后获取短信验证码

看不清楚,换张图片

免费获取短信验证码

RabbitMQ使用教程

1、RabbitMq简介

RabbitMQ 是一个由 Erlang 语言开发的 AMQP 的开源实现。
AMQP :Advanced Message Queue,高级消息队列协议。它是应用层协议的一个开放标准,为面向消息的中间件设计,基于此协议的客户端与消息中间件可传递消息,并不受产品、开发语言等条件的限制。

2、队列(Queue)

队列是常用的数据结构之一,是一种特殊的线性表,特殊之处在于它只允许在表的前端(front)进行删除操作,而在表的后端(rear)进行插入操作。进行插入操作的端称为队尾,进行删除操作的端称为对头。

3、消息队列(Message Queue)

消息:计算机/应用 间传送的数据单位,可以非常简单,例如只包含文本字符串,也可以很复杂,可能包含嵌入对象。
消息队列:在消息的传输过程中保存消息的容器

消息传输时,先发送到队列,队列的主要目的是提供路由并保证消息的传递,如果发送消息时接收者不可用,消息队列会保留消息,直到可以成功的传递它。

可以把消息队列理解成快递公司,你需要寄一个物件(消息)给你的朋友,快递公司收到物件会保证物件送到你的朋友手中,可能存在多次寄送才送达成功的情况,比如第一次送过去,你朋友不在家

消息队列中间件是分布式系统中重要的组件
解决 应用耦合异步消息流量削锋 等问题
实现 高性能高可用可伸缩最终一致性

4、RabbitMQ安装及配置

因为RabbitMQ是基于Erlang语言开发的,所以我们要先安装Erlang环境
并保持版本匹配,版本匹配网址:https://www.rabbitmq.com/which-erlang.html

4.1、安装Erlang环境

Erlang下载地址:https://www.erlang.org/downloads
右键 otp_win64_20.2.exe 以管理员身份运行
在这里插入图片描述
傻瓜式安装,一路next
检查环境变量是否存在ERLANG_HOME,若不存在则进行设置
在这里插入图片描述

4.2、安装RabbitMQ

RabbitMQ下载地址:https://www.rabbitmq.com/install-windows.html
安装rabbitmq-server-3.7.4.exe ,傻瓜式安装

注意:不要安装在包含中文和空格的目录下!安装后window服务中就存在rabbitMQ了,并且是启动状态。

安装完成后,执行以下命令激活 管理界面(插件)RabbitMQ Manage Plugin

  1. 进入rabbitMQ安装目录的sbin目录

  2. 点击上方的路径框输入cmd,按下回车键
    在这里插入图片描述

  3. 输入命令点击回车

    rabbitmq-plugins enable rabbitmq_management
  4. 重启服务,双击rabbitmq-server.bat(双击后稍等片刻)
    在这里插入图片描述

  5. 浏览器输入http://127.0.0.1:15672 , 即可看到rabbitmq管理界面的登陆页
    在这里插入图片描述

  6. 输入用户名和密码登录,默认都为guest
    在这里插入图片描述

    最上侧的导航以此是:概览、连接、信道、交换器、队列、用户管理

使用压缩文件安装

  1. 解压下载的文件
  2. 安装插件
  3. 安装服务(以管理员身份打开命令窗口)
    E:\environment\rabbitmq\rabbitmq_server-3.10.9\sbin>rabbitmq-service.bat installE:\environment\Erlang_OTP\erts-13.0\bin\erlsrv: Service RabbitMQ added to system.

到此RabbitMQ的安装就算完成了,其中有几个默认值:
默认的端口号:5672
默认的用户是 guest guest
管理后台的默认端口号:15672

4.3、卸载RabbitMQ

  1. 停止rabbitMq服务(任务管理器停止服务)
  2. 控制面 => 卸载程序 => 卸载rabbitMq,erlang otp
  3. 任务管理器中,查找进程epmd.exe(详细信息)。 如果在运行,右键单击,然后“结束任务”。
  4. 删除RabbitMQ和Erlang的所有安装目录
  5. 使用Everything查找rabbitmq,删除c盘相关文件
  6. 删除文件 C:Windows文件夹下的erlang.cookie文件(如果存在),转到用户文件夹,删除文件.erlang.cookie
  7. 在用户(User)文件夹中,转到AppData,Roaming,RabbitMQ 删除文件夹

5、RabbitMQ概念

在这里插入图片描述

  1. Broker:又称server,它提供一种传输服务,它的角色就是维护一条从生产者到消费者的路线,保证数据能按照指定的方式进行传输,
  2. Exchange:消息交换机,它指定消息按什么规则,路由到哪个队列。
  3. Queue:消息的载体,每个消息都会被投到一个或多个队列。
  4. Binding:绑定,它的作用就是把exchange和queue按照路由规则绑定起来.
  5. Routing Key:路由关键字,exchange根据这个关键字进行消息投递。
  6. vhost:虚拟主机,一个broker里可以有多个vhost,用作不同用户的权限分离。
  7. Producer:消息生产者,就是投递消息的程序.
  8. Consumer:消息消费者,就是接受消息的程序.
  9. Channel:消息通道,在客户端的每个连接里,可建立多个channel。几乎所有的操作都在channel中完成

6、支持的消息类型

消息模型:https://www.rabbitmq.com/getstarted.html
RabbitMQ消息类型

6.1、简单模式 Simple

在这里插入图片描述
P(producer/ publisher):生产者,一个发送消息的用户应用程序。
C(consumer):消费者,消费和接收的意思,消费者是一个主要用来等待接收消息的用户应用程序
队列(红色区域):rabbitmq内部类似于邮箱的一个概念。虽然消息流经rabbitmq和你的应用程序,但是它们只能存储在队列中。队列只受主机的内存和磁盘限制,实质上是一个大的消息缓冲区。许多生产者可以发送消息到一个队列,许多消费者可以尝试从一个队列接收数据。

总之:生产者将消息发送到队列,消费者从队列中获取消息,队列是存储消息的缓冲区。

6.2、工作模式 Work

工作模式又称竞争消费者模式
主要思想就是避免执行资源密集型任务时,必须等待它执行完成。相反我们需要稍后完成任务,我们将任务封装为消息并将其发送到队列。 在后台运行的工作进程将获取任务并最终执行作业。当你运行许多消费者时,任务将在他们之间共享,但是一个消息只能被一个消费者获取。
在这里插入图片描述
P:生产者:任务的发布者
C1:消费者1
C2:消费者2

如何避免消息堆积?
1、采用workqueue,多个消费者监听同一队列。
2、接收到消息以后,而是通过线程池,异步消费。

6.3、发布订阅模式(广播机制)(扇型交换机)

1个生产者,多个消费者,每一个消费者都有自己的队列,生产者没有将消息直接发送到队列,而是发送到了交换机,每个队列都要绑定到交换机,生产者发送的消息,经过交换机到达队列,实现一个消息被多个消费者获取的目的( 如果没有任何队列与Exchange绑定,或者没有符合路由规则的队列,那么消息会丢失!
在这里插入图片描述
交换机(Exchange)【蓝色圆圈】:
一方面接收生产者发送的消息。另一方面知道如何处理消息,如递交给某个特别队列、递交给所有队列、还是将消息丢弃。到底如何操作,取决于交换机的类型。Exchange只负责转发消息,不具备存储消息的能力。

Exchange类型只要由以下三种:

  1. Direct Exchange(直连型交换机):
    根据消息携带的路由键将消息投递给对应队列。
    大致流程为:有一个队列绑定到一个直连交换机上,同时赋予一个路由键 key 。然后当一个消息携带着路由键 key,这个消息通过生产者发送给交换机时,交换机就会根据这个路由值 key 去寻找绑定值也是 key 的队列。

  2. Fanout Exchange(扇型交换机):
    这个交换机没有路由键概念,就算你绑了路由键也是无视的。 这个交换机在接收到消息后,会直接转发到绑定到它上面的所有队列。

  3. Topic Exchange(主题交换机):
    这个交换机其实跟直连交换机流程差不多,但是它的特点就是在它的路由键和绑定键之间是有规则的。

    (星号 * ) 用来表示一个单词 (必须出现的)
    (井号 # ) 用来表示任意数量(零个或多个)单词

    举例:
    队列Q1 绑定键为*.TT.*
    队列Q2绑定键为TT.#
    如果一条消息携带的路由键为 A.TT.B,那么队列Q1将会收到
    如果一条消息携带的路由键为TT.AA.BB,那么队列Q2将会收到

    当一个队列的绑定键为 “#” 的时候,这个队列将会无视消息的路由键,接收所有的消息。
    当 * 和 # 这两个特殊字符都未在绑定键中出现的时候,此时主题交换机就拥有的直连交换机的行为。
    所以主题交换机也就实现了扇形交换机的功能,和直连交换机的功能

除了以上常用交换机,还有Header Exchange(头交换机) ,Default Exchange(默认交换机) 和 Dead Letter Exchange (死信交换机)

6.4、 路由模式(直连交换机)

在广播模式中,生产者发布消息,所有消费者都可以获取所有消息。
在某些场景下,我们希望不同的消息被不同的队列消费。这时就要用到Direct类型的Exchange。
在Direct模型下,队列与交换机的绑定,不能是任意绑定了,而是指定一个Routing Key(路由key)
消息的发送方在向Exchange发送消息时,也必须指定消息的routing key。
在这里插入图片描述
P:生产者,向Exchange发送消息,发送消息时,会指定一个routing key。
X:Exchange(交换机),接收生产者的消息,然后把消息递交给与routing key完全匹配的队列
C1:消费者,其所在队列指定了需要routing key 为 error 的消息
C2:消费者,其所在队列指定了需要routing key 为 info、error、warning 的消息

在这种情况下,一个消息在布时指定了路由键为error将会只被c1下消耗,路由键为info和error和warning的消息都将被c2消耗,其他的消息都将被丢失。

6.5、主题模式 Topic(主题交换机)

Topic类型的Exchange与Direct相比,都是可以根据RoutingKey把消息路由到不同的队列。
只不过Topic类型Exchange可以让队列在绑定Routing key 的时候使用通配符!
在这里插入图片描述

7、消息持久化

目的是为了避免消息丢失

消息丢失:

  1. 消费者的ACK机制。可以防止消费者丢失消息。

    消费者领取消息后,还没执行操作就挂掉了呢?或者抛出了异常?消息消费失败,但是RabbitMQ无从得知,这样消息就丢失了!
    因此,RabbitMQ有一个ACK机制。当消费者获取消息后,会向RabbitMQ发送回执ACK,告知消息已经被接收。不过这种回执ACK分两种情况:
    1、自动ACK:消息一旦被接收,消费者自动发送ACK
    2、手动ACK:消息接收后,不会发送ACK,需要手动调用

  2. 如果在消费者消费之前,MQ就宕机了,消息就没了。

消息持久化,前提是:队列、Exchange都持久化
Exchange持久化
在这里插入图片描述
队列持久化
在这里插入图片描述
消息持久
在这里插入图片描述

8、代码示例

  1. 引入依赖
    org.springframework.bootspring-boot-starter-amqp2.5.13
  2. 修改配置文件
    spring:  # rabbitmq配置  rabbitmq:    host: 127.0.0.1    port: 5672    username: guest    password: guest
    配置文件详解
    spring:  rabbitmq:    host: 127.0.0.1 #ip    port: 5672      #端口    username: guest #账号    password: guest #密码    virtualHost:    #链接的虚拟主机    addresses: 127.0.0.1:5672     #多个以逗号分隔,与host功能一样。    requestedHeartbeat: 60 #指定心跳超时,单位秒,0为不指定;默认60s    publisherConfirms: true  #发布确认机制是否启用    publisherReturns: #发布返回是否启用    connectionTimeout: #链接超时。单位ms。0表示无穷大不超时    ### ssl相关    ssl:      enabled: #是否支持ssl      keyStore: #指定持有SSL certificate的key store的路径      keyStoreType: #key store类型 默认PKCS12      keyStorePassword: #指定访问key store的密码      trustStore: #指定持有SSL certificates的Trust store      trustStoreType: #默认JKS      trustStorePassword: #访问密码      algorithm: #ssl使用的算法,例如,TLSv1.1      verifyHostname: #是否开启hostname验证    ### cache相关    cache:      channel:         size: #缓存中保持的channel数量        checkoutTimeout: #当缓存数量被设置时,从缓存中获取一个channel的超时时间,单位毫秒;如果为0,则总是创建一个新channel      connection:        mode: #连接工厂缓存模式:CHANNEL 和 CONNECTION        size: #缓存的连接数,只有是CONNECTION模式时生效    ### listener    listener:       type: #两种类型,SIMPLE,DIRECT       ## simple类型       simple:         concurrency: #最小消费者数量         maxConcurrency: #最大的消费者数量         transactionSize: #指定一个事务处理的消息数量,最好是小于等于prefetch的数量         missingQueuesFatal: #是否停止容器当容器中的队列不可用         ## 与direct相同配置部分         autoStartup: #是否自动启动容器         acknowledgeMode: #表示消息确认方式,其有三种配置方式,分别是none、manual和auto;默认auto         prefetch: #指定一个请求能处理多少个消息,如果有事务的话,必须大于等于transaction数量         defaultRequeueRejected: #决定被拒绝的消息是否重新入队;默认是true(与参数acknowledge-mode有关系)         idleEventInterval: #container events发布频率,单位ms         ##重试机制         retry:            stateless: #有无状态           enabled:  #是否开启           maxAttempts: #最大重试次数,默认3           initialInterval: #重试间隔           multiplier: #对于上一次重试的乘数           maxInterval: #最大重试时间间隔       direct:         consumersPerQueue: #每个队列消费者数量         missingQueuesFatal:         #...其余配置看上方公共配置     ## template相关     template:       mandatory: #是否启用强制信息;默认false       receiveTimeout: #`receive()`接收方法超时时间       replyTimeout: #`sendAndReceive()`超时时间       exchange: #默认的交换机       routingKey: #默认的路由       defaultReceiveQueue: #默认的接收队列       ## retry重试相关       retry:          enabled: #是否开启         maxAttempts: #最大重试次数         initialInterval: #重试间隔         multiplier: #失败间隔乘数         maxInterval: #最大间隔

8.1、简单模式

8.1.1、配置类
import org.springframework.amqp.core.Queue;import org.springframework.context.annotation.Bean;import org.springframework.context.annotation.Configuration;@Configurationpublic class RabbitMqConfig {        public static final String QUEUE_NAME = "my_queue"; //队列名称        @Bean    public Queue simpleQueue() {               return new Queue(QUEUE_NAME, true, false, false, null);    }}
8.1.2、消费者
import org.springframework.amqp.rabbit.annotation.RabbitHandler;import org.springframework.amqp.rabbit.annotation.RabbitListener;import org.springframework.stereotype.Component;import java.util.Map;@Component@RabbitListener(queues = "my_queue")//监听的队列名称//@RabbitListener(queuesToDeclare = @Queue("simple_queue"))  //如果simple_queue队列不存在,则创建simple_queue队列。默认队列是持久化,非独占式的public class SimpleConsumer {//消费者如果监听到消息队列有消息传入,则会自动消费    @RabbitHandler    public void receive(Map message) {        System.out.println("简单模式 -> 消费者收到map类型消息  : " + testMessage.toString());    }     @RabbitHandler    public void receive2(String message) {        System.out.println("简单模式 -> 消费者收到string类型消息  : " + testMessage.toString());    }}

@RabbitListener 注解属性的作用:
queuesToDeclare:如果 simple_queue 队列不存在,则会自动创建simple_queue队列。默认队列是持久化,非独占式的
queues:里面的队列必须存在,否则就会报错:
@RabbitListener(queues = {“simple_queue2”}),如果队列 simple_queue2 不存在,那么启动消费者就会报错
注意:
@RabbitListener 既可以标记在类上,也可以标记在方法上
标记在类上: 需配合 @RabbitHandler 注解一起使用。当有收到消息的时候,就交给 @RabbitHandler 的方法处理,具体使用哪个方法处理,根据 MessageConverter 转换后的参数类型(入参类型进行决定)
标记在方法上: 就由指定的方法进行处理

8.1.3、生产者
@Controller@RequestMapping(value = "simple")public class SimpleProducer {    @Autowired    RabbitTemplate rabbitTemplate;  //使用RabbitTemplate,这提供了接收/发送等等方法        @RequestMapping(value = "sendMsg")    @ResponseBody    public String send(String messageId,String messageData) {        SimpleDateFormat simpleDateFormat  = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss");        String createTime = simpleDateFormat.format(new Date());        Map map=new HashMap<>();        map.put("messageId",messageId);        map.put("messageData",messageData);        map.put("createTime",createTime);        //将消息发送到队列my_queue中        rabbitTemplate.convertAndSend("my_queue", map);        //receive将接收到消息        System.out.println("rabbitMQ 简单模式消息发送成功!");        return "true";    }    }
8.1.3、消息手动确认
  1. yml文件添加配置

    listener:  simple:    concurrency: 1    max-concurrency: 1    acknowledge-mode: manual    prefetch: 1
  2. 消费者代码修改

    @Componentpublic class SimpleConsumer {    @RabbitListener(queues = "my_queue")//监听的队列名称    public void process(Message message, Channel channel) throws IOException {        String str = new String(message.getBody());        JSONObject msgData = (JSONObject) JSON.parse(str);        Object messageId = msgData.get("messageId");        if (null==messageId || messageId.toString().equals("")) {                        channel.basicNack(message.getMessageProperties().getDeliveryTag(),false,true);            System.out.println("简单模式 -> 消费者拒收消息 : " + msgData.toString());        }else {                        channel.basicAck(message.getMessageProperties().getDeliveryTag(),false);            System.out.println("简单模式 -> 消费者收到消息  : " + msgData.toString());        }    }}

    消息确认机制:
    ① 自动确认
    这也是默认的消息确认情况。 AcknowledgeMode.NONE

    RabbitMQ成功将消息发出(即将消息成功写入TCP Socket)中立即认为本次投递已经被正确处理,不管消费者端是否成功处理本次投递。
    所以这种情况如果消费端消费逻辑抛出异常,也就是消费端没有处理成功这条消息,那么就相当于丢失了消息。一般这种情况我们都是使用try catch捕捉异常后,打印日志用于追踪数据,这样找出对应数据再做后续处理。

    ③ 手动确认
    这个比较关键,也是我们配置接收消息确认机制时,多数选择的模式。
    消费者收到消息后,手动调用basic.ack/basic.nack/basic.reject后,RabbitMQ收到这些消息后,才认为本次投递成功。
    basic.ack用于肯定确认
    basic.nack用于否定确认(注意:这是AMQP 0-9-1的RabbitMQ扩展)
    basic.reject用于否定确认,但与basic.nack相比有一个限制:一次只能拒绝单条消息

    channel.basicReject(deliveryTag, true) : 拒绝消费当前消息
    第一个参数是当前消息在队列中的的索引
    第二参数传入true,就是将数据重新丢回队列里,那么下次还会消费这消息。
    第二参数传入false,就是告诉服务器,我已经知道这条消息数据了,因为一些原因拒绝它,而且服务器也把这个消息丢掉就行。 下次不想再消费这条消息了。

    使用拒绝后重新入列这个确认模式要谨慎,因为一般都是出现异常的时候,catch异常再拒绝入列,选择是否重入列。如果使用不当会导致一些每次都被你重入列的消息一直消费-入列-消费-入列这样循环,会导致消息积压。

    channel.basicNack(deliveryTag, false, true) : 否定消费确认
    第一个参数是当前消息在队列中的的索引
    第二个参数是指是否针对多条消息;如果是true,也就是说一次性针对当前通道的消息的tagID小于当前这条消息的,都拒绝确认。
    第三个参数是指是否重新入列,也就是指不确认的消息是否重新丢回到队列里面去。

    同样使用不确认后重新入列这个确认模式要谨慎,因为这里也可能因为考虑不周出现消息一直被重新丢回去的情况,导致积压。

8.1.4、消息回调

ConfirmCallback: 当消息到达交换机触发回调
ReturnsCallback:消息(带有路由键routingKey)到达交换机,与交换机的所有绑定键进行匹配,触发回调
若要使用消息回调,

  1. 修改配置
    publisher-confirm-type: correlatedpublisher-returns: true
  2. 设置mandatory
    设置rabbitTemplate的mandatory为true 或者在配置中设置 rabbitmq.template.mandatory=true

配置文件新增代码:

@Beanpublic RabbitTemplate createRabbitTemplate(ConnectionFactory connectionFactory) {     RabbitTemplate rabbitTemplate = new RabbitTemplate();     rabbitTemplate.setConnectionFactory(connectionFactory);     //设置开启Mandatory,才能触发回调函数,无论消息推送结果怎么样都强制调用回调函数     rabbitTemplate.setMandatory(true);     //确认消息发送到交换机     rabbitTemplate.setConfirmCallback(new RabbitTemplate.ConfirmCallback() {         @Override         public void confirm(CorrelationData correlationData, boolean ack, String cause) {             System.out.println("=======================> ConfirmCallback <=======================");             System.out.println("ConfirmCallback ===>"+"相关数据:"+correlationData);             System.out.println("ConfirmCallback ===>"+"确认情况:"+ack);             System.out.println("ConfirmCallback ===>"+"原因:"+cause);         }     });     //确认消息已发送到队列     rabbitTemplate.setReturnsCallback(new RabbitTemplate.ReturnsCallback() {         @Override         public void returnedMessage(ReturnedMessage returnedMessage) {             Message message = returnedMessage.getMessage();             int replyCode = returnedMessage.getReplyCode();             String replyText = returnedMessage.getReplyText();             String exchange = returnedMessage.getExchange();             String routingKey = returnedMessage.getRoutingKey();             System.out.println("=======================> ReturnsCallback <=======================");             System.out.println("ReturnCallback ===>"+"消息:"+message.toString());             System.out.println("ReturnCallback ===>"+"回应码:"+replyCode);             System.out.println("ReturnCallback ===>"+"回应信息:"+replyText);             System.out.println("ReturnCallback ===>"+"交换机:"+exchange);             System.out.println("ReturnCallback ===>"+"路由键:"+routingKey);         }     });     return rabbitTemplate; }

8.2、工作模式

运行许多消费者,任务在他们之间共享,但是一个消息只能被一个消费者获取。
设置prefetchCount值为1。 这告诉RabbitMQ一次不要向消费者发送多于一条的消息。
换句话说,不要向消费者发送新消息,直到它处理并确认了前一个消息。 相反,它会将其分派给不是忙碌的下一个消费者。

当有多个消费者时,我们的消息会被哪个消费者消费呢,我们又该如何均衡消费者消费信息的多少呢?
主要有两种模式:
轮询模式的分发:一个消费者一条,按均分配(关闭手动应答,开启自动应答)
公平分发:根据消费者的消费能力进行公平分发,处理快的处理的多,处理慢的处理的少;按劳分配(关闭自动应答,开启手动应答)

  1. yml文件添加 prefetch
    listener:  simple:     prefetch: 1
  2. 创建两个消费者(生产者,配置类不变)
    @Componentpublic class SimpleConsumer {    @RabbitListener(queues = "my_queue")//监听的队列名称    public void process(Message message, Channel channel) throws IOException, InterruptedException {        String str = new String(message.getBody());        JSONObject msgData = (JSONObject) JSON.parse(str);        Object messageId = msgData.get("messageId");        if (null==messageId || messageId.toString().equals("")) {                        channel.basicNack(message.getMessageProperties().getDeliveryTag(),false,true);            System.out.println("简单模式 -> 消费者拒收消息 : " + msgData.toString());        }else {                        Thread.sleep(2000);            channel.basicAck(message.getMessageProperties().getDeliveryTag(),false);            System.out.println("简单模式 process -> 消费者收到消息  : " + msgData.toString());        }    }    @RabbitListener(queues = "my_queue")//监听的队列名称    public void process2(Message message, Channel channel) throws IOException, InterruptedException {        String str = new String(message.getBody());        JSONObject msgData = (JSONObject) JSON.parse(str);        Object messageId = msgData.get("messageId");        if (null==messageId || messageId.toString().equals("")) {                        channel.basicNack(message.getMessageProperties().getDeliveryTag(),false,true);            System.out.println("简单模式 -> 消费者拒收消息 : " + msgData.toString());        }else {                        Thread.sleep(5000);            channel.basicAck(message.getMessageProperties().getDeliveryTag(),false);            System.out.println("简单模式 process2 -> 消费者收到消息  : " + msgData.toString());        }    }}

8.3、订阅模型-Fanout

8.3.1、配置类
import org.springframework.amqp.core.Binding;import org.springframework.amqp.core.BindingBuilder;import org.springframework.amqp.core.FanoutExchange;import org.springframework.amqp.core.Queue;import org.springframework.context.annotation.Bean;import org.springframework.context.annotation.Configuration;@Configurationpublic class FanoutRabbitMqConfig {    //队列名称常量    public static final String QUEUE_NAME1 = "fanout_queue1";    public static final String QUEUE_NAME2 = "fanout_queue2";    public static final String QUEUE_NAME3 = "fanout_queue3";    //交换机名称常量    public static final String EXCHANGE_NAME = "fanout_exchange";        @Bean    public Queue fanoutQueue1() {        return new Queue(QUEUE_NAME1, true, false, false, null);    }        @Bean    public Queue fanoutQueue2() {        return new Queue(QUEUE_NAME2, true, false, false, null);    }        @Bean    public Queue fanoutQueue3() {        return new Queue(QUEUE_NAME3, true, false, false, null);    }        @Bean    FanoutExchange fanoutExchange() {        return new FanoutExchange(EXCHANGE_NAME,true,false,null);    }        @Bean    Binding bindingExchange1() {        return BindingBuilder.bind(fanoutQueue1()).to(fanoutExchange());    }    @Bean    Binding bindingExchange2() {        return BindingBuilder.bind(fanoutQueue2()).to(fanoutExchange());    }    @Bean    Binding bindingExchange3() {        return BindingBuilder.bind(fanoutQueue3()).to(fanoutExchange());    }}
8.3.2、生产者

声明Exchange,不再声明Queue
发送消息到Exchange,不再发送到Queue

import com.alibaba.fastjson.JSONObject;import org.springframework.amqp.rabbit.core.RabbitTemplate;import org.springframework.beans.factory.annotation.Autowired;import org.springframework.stereotype.Controller;import org.springframework.web.bind.annotation.RequestMapping;import org.springframework.web.bind.annotation.ResponseBody;import java.text.SimpleDateFormat;import java.util.Date;@Controller@RequestMapping(value = "fanout")public class FanoutProducer {    @Autowired    RabbitTemplate rabbitTemplate;        @RequestMapping(value = "sendMsg")    @ResponseBody    public String send(String messageId,String messageData) {        SimpleDateFormat simpleDateFormat  = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss");        String createTime = simpleDateFormat.format(new Date());        JSONObject jsonObject = new JSONObject();        jsonObject.put("messageId",messageId);        jsonObject.put("messageData",messageData);        jsonObject.put("createTime",createTime);        //将消息发送到队列my_queue中        rabbitTemplate.convertAndSend("fanout_exchange",null, jsonObject.toString());        System.out.println("rabbitMQ 简单模式消息发送成功!");        return "true";    }}
8.3.3、消费者
import com.rabbitmq.client.Channel;import org.springframework.amqp.core.Message;import org.springframework.amqp.rabbit.annotation.RabbitListener;import org.springframework.stereotype.Component;import java.io.IOException;@Componentpublic class FanoutReceiver {    @RabbitListener(queues = "fanout_queue1")    public void receive1(String msgData, Message message, Channel channel) throws IOException {        channel.basicAck(message.getMessageProperties().getDeliveryTag(),false);        System.out.println("fanout_queue1消费者收到消息  : " +msgData);    }    @RabbitListener(queues = "fanout_queue2")    public void receive2(String msgData, Message message, Channel channel) throws IOException {        channel.basicAck(message.getMessageProperties().getDeliveryTag(),false);        System.out.println("fanout_queue2消费者收到消息  : " +msgData);    }    @RabbitListener(queues = "fanout_queue3")    public void receive3(String msgData, Message message, Channel channel) throws IOException {        channel.basicAck(message.getMessageProperties().getDeliveryTag(),false);        System.out.println("fanout_queue3消费者收到消息  : " +msgData);    }}
8.3.4、发送消息后台结果
rabbitMQ 简单模式消息发送成功!fanout_queue3消费者收到消息  : {"createTime":"2022-10-06 23:02:29","messageId":"55","messageData":"bhhh"}fanout_queue1消费者收到消息  : {"createTime":"2022-10-06 23:02:29","messageId":"55","messageData":"bhhh"}fanout_queue2消费者收到消息  : {"createTime":"2022-10-06 23:02:29","messageId":"55","messageData":"bhhh"}rabbitMQ 简单模式消息发送成功!fanout_queue2消费者收到消息  : {"createTime":"2022-10-06 23:03:03","messageId":"66","messageData":"就立刻"}fanout_queue1消费者收到消息  : {"createTime":"2022-10-06 23:03:03","messageId":"66","messageData":"就立刻"}fanout_queue3消费者收到消息  : {"createTime":"2022-10-06 23:03:03","messageId":"66","messageData":"就立刻"}

8.4、订阅模型-Direct

8.4.1、配置类
import org.springframework.amqp.core.*;import org.springframework.context.annotation.Bean;import org.springframework.context.annotation.Configuration;@Configurationpublic class DirectRabbitMqConfig {    //队列名称常量    public static final String QUEUE_NAME1 = "direct_queue1";    public static final String QUEUE_NAME2 = "direct_queue2";    public static final String QUEUE_NAME3 = "direct_queue3";    //交换机名称常量    public static final String EXCHANGE_NAME = "direct_exchange";        @Bean    public Queue directQueue1() {        return new Queue(QUEUE_NAME1, true, false, false, null);    }        @Bean    public Queue directQueue2() {        return new Queue(QUEUE_NAME2, true, false, false, null);    }        @Bean    public Queue directQueue3() {        return new Queue(QUEUE_NAME3, true, false, false, null);    }        @Bean    DirectExchange directExchange() {        return new DirectExchange(EXCHANGE_NAME,true,false,null);    }        @Bean    Binding bindingDirectExchange1() {        return BindingBuilder.bind(directQueue1()).to(directExchange()).with("info");    }    @Bean    Binding bindingDirectExchange2() {        return BindingBuilder.bind(directQueue2()).to(directExchange()).with("error");    }    @Bean    Binding bindingDirectExchange3() {        return BindingBuilder.bind(directQueue3()).to(directExchange()).with("warn");    }}
8.4.2、生产者
import com.alibaba.fastjson.JSONObject;import org.springframework.amqp.rabbit.core.RabbitTemplate;import org.springframework.beans.factory.annotation.Autowired;import org.springframework.stereotype.Controller;import org.springframework.web.bind.annotation.RequestMapping;import org.springframework.web.bind.annotation.ResponseBody;import java.text.SimpleDateFormat;import java.util.Date;@Controller@RequestMapping(value = "direct")public class DirectProducer {    @Autowired    RabbitTemplate rabbitTemplate;        @RequestMapping(value = "sendMsg")    @ResponseBody    public String send(String messageId,String messageData,String routingKey) {        SimpleDateFormat simpleDateFormat  = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss");        String createTime = simpleDateFormat.format(new Date());        JSONObject jsonObject = new JSONObject();        jsonObject.put("messageId",messageId);        jsonObject.put("messageData",messageData);        jsonObject.put("createTime",createTime);        //将消息发送到交换机        rabbitTemplate.convertAndSend("direct_exchange",routingKey, jsonObject.toString());        System.out.println("rabbitMQ 简单模式消息发送成功!");        return "true";    }}
8.4.3、消耗者
import com.rabbitmq.client.Channel;import org.springframework.amqp.core.Message;import org.springframework.amqp.rabbit.annotation.RabbitListener;import org.springframework.stereotype.Component;import java.io.IOException;@Componentpublic class DirectReceiver {    @RabbitListener(queues = "direct_queue1")    public void receive1(String msgData, Message message, Channel channel) throws IOException {        channel.basicAck(message.getMessageProperties().getDeliveryTag(),false);        System.out.println("direct_queue1消费者收到消息  : " +msgData);    }    @RabbitListener(queues = "direct_queue2")    public void receive2(String msgData, Message message, Channel channel) throws IOException {        channel.basicAck(message.getMessageProperties().getDeliveryTag(),false);        System.out.println("direct_queue2消费者收到消息  : " +msgData);    }    @RabbitListener(queues = "direct_queue3")    public void receive3(String msgData, Message message, Channel channel) throws IOException {        channel.basicAck(message.getMessageProperties().getDeliveryTag(),false);        System.out.println("direct_queue3消费者收到消息  : " +msgData);    }}
8.4.4、发送消息后台结果
消息发送成功!路由键:warndirect_queue3消费者收到消息  : {"createTime":"2022-10-07 11:31:45","messageId":"1","messageData":"sacascsac"}消息发送成功!路由键:infodirect_queue1消费者收到消息  : {"createTime":"2022-10-07 11:31:54","messageId":"1","messageData":"sacascsac"}消息发送成功!路由键:errordirect_queue2消费者收到消息  : {"createTime":"2022-10-07 11:31:59","messageId":"1","messageData":"sacascsac"}消息发送成功!路由键:error1 (消息被丢弃)

8.5、订阅模型-Topic

8.5.1、配置类
import org.springframework.amqp.core.Binding;import org.springframework.amqp.core.BindingBuilder;import org.springframework.amqp.core.Queue;import org.springframework.amqp.core.TopicExchange;import org.springframework.context.annotation.Bean;import org.springframework.context.annotation.Configuration;@Configurationpublic class TopicRabbitMqConfig {    //队列名称常量    public static final String QUEUE_NAME1 = "topic_queue1";    public static final String QUEUE_NAME2 = "topic_queue2";    public static final String QUEUE_NAME3 = "topic_queue3";    //交换机名称常量    public static final String EXCHANGE_NAME = "topic_exchange";        @Bean    public Queue topicQueue1() {        return new Queue(QUEUE_NAME1, true, false, false, null);    }        @Bean    public Queue topicQueue2() {        return new Queue(QUEUE_NAME2, true, false, false, null);    }        @Bean    public Queue topicQueue3() {        return new Queue(QUEUE_NAME3, true, false, false, null);    }        @Bean    TopicExchange topicExchange() {        return new TopicExchange(EXCHANGE_NAME,true,false,null);    }        @Bean    Binding bindingTopicExchange1() {        //消息携带的路由键是以"topic."开头,就会分发到该队列        return BindingBuilder.bind(topicQueue1()).to(topicExchange()).with("topic.#");    }    @Bean    Binding bindingTopicExchange2() {        //消息携带的路由键是包含.topic.,就会分发到该队列        return BindingBuilder.bind(topicQueue2()).to(topicExchange()).with("*.topic.*");    }    @Bean    Binding bindingTopicExchange3() {        //消息携带的路由键是以".topic"结尾,就会分发到该队列        return BindingBuilder.bind(topicQueue3()).to(topicExchange()).with("#.topic");    }}
8.5.2、生产者
import com.alibaba.fastjson.JSONObject;import org.springframework.amqp.rabbit.core.RabbitTemplate;import org.springframework.beans.factory.annotation.Autowired;import org.springframework.stereotype.Controller;import org.springframework.web.bind.annotation.RequestMapping;import org.springframework.web.bind.annotation.ResponseBody;import java.text.SimpleDateFormat;import java.util.Date;@Controller@RequestMapping(value = "topic")public class TopicProducer {    @Autowired    RabbitTemplate rabbitTemplate;        @RequestMapping(value = "sendMsg")    @ResponseBody    public String send(String messageId,String messageData,String routingKey) {        SimpleDateFormat simpleDateFormat  = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss");        String createTime = simpleDateFormat.format(new Date());        JSONObject jsonObject = new JSONObject();        jsonObject.put("messageId",messageId);        jsonObject.put("messageData",messageData);        jsonObject.put("createTime",createTime);        //将消息发送到队列my_queue中        rabbitTemplate.convertAndSend("topic_exchange",routingKey, jsonObject.toString());        System.out.println("消息发送成功!路由键:"+routingKey);        return "true";    }}
8.5.3、消费者
import com.rabbitmq.client.Channel;import org.springframework.amqp.core.Message;import org.springframework.amqp.rabbit.annotation.RabbitListener;import org.springframework.stereotype.Component;import java.io.IOException;@Componentpublic class TopicReceiver {    @RabbitListener(queues = "topic_queue1")    public void receive1(String msgData, Message message, Channel channel) throws IOException {        channel.basicAck(message.getMessageProperties().getDeliveryTag(),false);        System.out.println("topic_queue1消费者收到消息  : " +msgData);    }    @RabbitListener(queues = "topic_queue2")    public void receive2(String msgData, Message message, Channel channel) throws IOException {        channel.basicAck(message.getMessageProperties().getDeliveryTag(),false);        System.out.println("topic_queue2消费者收到消息  : " +msgData);    }    @RabbitListener(queues = "topic_queue3")    public void receive3(String msgData, Message message, Channel channel) throws IOException {        channel.basicAck(message.getMessageProperties().getDeliveryTag(),false);        System.out.println("topic_queue3消费者收到消息  : " +msgData);    }}
8.5.4、发送消息后台结果
消息发送成功!路由键:topic.onetopic_queue1消费者收到消息  : {"createTime":"2022-10-07 12:05:17","messageId":"1","messageData":"sacascsac"}消息发送成功!路由键:A.topic.Btopic_queue2消费者收到消息  : {"createTime":"2022-10-07 12:05:46","messageId":"1","messageData":"sacascsac"}消息发送成功!路由键:C.topictopic_queue3消费者收到消息  : {"createTime":"2022-10-07 12:06:02","messageId":"1","messageData":"sacascsac"}消息发送成功!路由键:SFC.topic.AFBGBtopic_queue2消费者收到消息  : {"createTime":"2022-10-07 12:06:52","messageId":"1","messageData":"sacascsac"}消息发送成功!路由键:S.D.FC.topic.A.S.D.F (消息被丢弃)

9、一个简单的消息推送到接收的流程

在这里插入图片描述
生产者产生消息,将消息推送到中间方框里面也就是rabbitMq的服务器,然后经过服务器里面的交换机、队列等各种关系将数据处理入列后,最终由右边的消费者获取对应监听的消息进行消耗处理。

来源地址:https://blog.csdn.net/weixin_45486926/article/details/127170831

免责声明:

① 本站未注明“稿件来源”的信息均来自网络整理。其文字、图片和音视频稿件的所属权归原作者所有。本站收集整理出于非商业性的教育和科研之目的,并不意味着本站赞同其观点或证实其内容的真实性。仅作为临时的测试数据,供内部测试之用。本站并未授权任何人以任何方式主动获取本站任何信息。

② 本站未注明“稿件来源”的临时测试数据将在测试完成后最终做删除处理。有问题或投稿请发送至: 邮箱/279061341@qq.com QQ/279061341

RabbitMQ使用教程

下载Word文档到电脑,方便收藏和打印~

下载Word文档

猜你喜欢

RabbitMQ使用教程

1、RabbitMq简介 RabbitMQ 是一个由 Erlang 语言开发的 AMQP 的开源实现。 AMQP :Advanced Message Queue,高级消息队列协议。它是应用层协议的一个开放标准,为面向消息的中间件设计,基于此
2023-08-18

centos8 使用yum 安装 rabbitmq的教程

进入/etc/yum.repos.d/ 文件夹 创建rabbitmq-erlang.repo 文件 内容如下[rabbitmq-erlang] name=rabbitmq-erlang baseurl=https://dl.bintray
2022-06-04

Laravel 队列入门教程 (RabbitMQ)

假设你已经学过了基础 laravel任务和队列 和本系列的第一部分, 我们学习使用不同的队列连接(除了数据库), 如何为不同的任务来使
2023-02-09

Webman使用RabbitMQ消息中间件实现系统异步解耦实战教程

AMQP 即 Advanced Message Queuing Protocol(高级消息队列协议),是一个网络协议,是应用层协议的一个开放标准,为面向消息的中间件设计。

使用 Docker 搭建 RabbitMQ

compose fileversion: "3.4"services: rabbitmq-docker: image: rabbitmq:3.8.2-management container_name: rabbitmq-docker ho
使用 Docker 搭建 RabbitMQ
2020-05-18

laravel如何使用RabbitMQ

这篇文章主要介绍“laravel如何使用RabbitMQ”,在日常操作中,相信很多人在laravel如何使用RabbitMQ问题上存在疑惑,小编查阅了各式资料,整理出简单好用的操作方法,希望对大家解答”laravel如何使用RabbitMQ
2023-06-22

RabbitMQ的使用场景

RabbitMQ的主要功能是接收、存储和转发消息,可以很好地解耦发送者和接收者之间的关系,提高系统的可靠性和可扩展性。它被广泛应用于分布式系统、微服务架构、大数据处理等领域。

GNS3使用教程

经常有人在技术交流群里面,要教程。我也看了些前辈录的视频都很不错,就是时间录的有点长,文件大。为了让新手更快的熟悉使用GNS3模拟器,学习cisco的技术,我深夜为大家奉献GNS3使用教程,本教程优点:文件小7M不到,时间是17分钟,80%
2023-01-31

loadrunner使用教程

LoadRunner是一种性能测试工具,被广泛用于测试Web、移动和企业应用程序的性能和负载。以下是一个简单的LoadRunner使用教程:1. 安装LoadRunner:从官方网站下载LoadRunner并进行安装。2. 创建新项目:打开
2023-09-14

openstack使用教程

以下是一个简单的OpenStack使用教程:1. 安装OpenStack:首先,你需要在一台服务器或多台服务器上安装OpenStack。你可以选择使用OpenStack官方提供的安装工具,如OpenStack-Ansible、TripleO
2023-10-11

Seata使用教程

文章目录 一、Seata简介1.Seata 概念介绍2.分布式事务3.Seata核心组件4.Seata 工作流程5.Seata四大模式二、Seata实战教程1.下载资源2.配置Seata-Server3.增加相关表结构4.代码配置三
2023-08-16

编程热搜

  • Python 学习之路 - Python
    一、安装Python34Windows在Python官网(https://www.python.org/downloads/)下载安装包并安装。Python的默认安装路径是:C:\Python34配置环境变量:【右键计算机】--》【属性】-
    Python 学习之路 - Python
  • chatgpt的中文全称是什么
    chatgpt的中文全称是生成型预训练变换模型。ChatGPT是什么ChatGPT是美国人工智能研究实验室OpenAI开发的一种全新聊天机器人模型,它能够通过学习和理解人类的语言来进行对话,还能根据聊天的上下文进行互动,并协助人类完成一系列
    chatgpt的中文全称是什么
  • C/C++中extern函数使用详解
  • C/C++可变参数的使用
    可变参数的使用方法远远不止以下几种,不过在C,C++中使用可变参数时要小心,在使用printf()等函数时传入的参数个数一定不能比前面的格式化字符串中的’%’符号个数少,否则会产生访问越界,运气不好的话还会导致程序崩溃
    C/C++可变参数的使用
  • css样式文件该放在哪里
  • php中数组下标必须是连续的吗
  • Python 3 教程
    Python 3 教程 Python 的 3.0 版本,常被称为 Python 3000,或简称 Py3k。相对于 Python 的早期版本,这是一个较大的升级。为了不带入过多的累赘,Python 3.0 在设计的时候没有考虑向下兼容。 Python
    Python 3 教程
  • Python pip包管理
    一、前言    在Python中, 安装第三方模块是通过 setuptools 这个工具完成的。 Python有两个封装了 setuptools的包管理工具: easy_install  和  pip , 目前官方推荐使用 pip。    
    Python pip包管理
  • ubuntu如何重新编译内核
  • 改善Java代码之慎用java动态编译

目录