我的编程空间,编程开发者的网络收藏夹
学习永远不晚

SpringBoot2实现MessageQueue消息队列

短信预约 -IT技能 免费直播动态提醒
省份

北京

  • 北京
  • 上海
  • 天津
  • 重庆
  • 河北
  • 山东
  • 辽宁
  • 黑龙江
  • 吉林
  • 甘肃
  • 青海
  • 河南
  • 江苏
  • 湖北
  • 湖南
  • 江西
  • 浙江
  • 广东
  • 云南
  • 福建
  • 海南
  • 山西
  • 四川
  • 陕西
  • 贵州
  • 安徽
  • 广西
  • 内蒙
  • 西藏
  • 新疆
  • 宁夏
  • 兵团
手机号立即预约

请填写图片验证码后获取短信验证码

看不清楚,换张图片

免费获取短信验证码

SpringBoot2实现MessageQueue消息队列

什么是消息队列

通常说的消息队列,简称MQ(Message Queue),指的就是消息中间件。简单理解为一个使用队列来通信的组件,本质上就是个转发器,包含发消息,存消息,消费消息的过程。

一、异步与同步

1.1 同步通讯与异步通讯

  • 同步通讯:时效性强。比如视频电话,实时传到对方,同时对方出回应。
  • 异步通讯:比如网络聊天,非实时反馈的,不会立即得到结果,可以之后再回复。

1.2 同步调用的问题

微服务基于Feign的调用就是同步的方式。

以购物场景为例

但是如果要加业务就需要为支付服务加业务,改动其代码,耦合度高。

同时,同步调用,要等待服务结束后,在进行下一个服务。支付总耗时,是支付服务依次调用服务耗时的时间和,耗时过长。

此外,如果仓储服务挂掉了,支付服务就会被卡在那里。当过多的支付服务都卡在那里,于是资源耗尽,支付服务也挂掉了。

问题:

  • 耦合度高
  • 性能下降
  • 资源浪费
  • 级联失败

1.3 异步调用方案

异步调用常见的就是事件驱动模式

当支付服务告知了Broker后,就可以继续自己的事情了,而不需要等待。

优势:

  • 代码解耦合:不需要改动支付服务,只需要让服务订阅或者取消订阅Broker即可。
  • 耗时减少了:只计算支付服务和通知Broker的时间。
  • 不存在级联失败的问题,仓储服务挂了不再影响支付服务。
  • 流量消峰:当流量过大时,请求排在Broker中,服务能做几个就做几个,做不了的就排着。

缺点:

  • Broker挂了也会出问题,依赖于Broker的可靠性,安全性,吞吐能力
  • 架构复杂了,业务没有明显的流程线,不好追踪管理

二、MQ消息队列

在上述的结构中,就是Broker。

常用的MQ有几种实现。

RabbitMQActiveMQRocketMQKafaka
公司/社区RabbitApache阿里Apache
开发语言ErlangJavaJavaScala&Java
协议支持AMQP、XMPP、SMTP、STOMPOpenWire、STOMP、REST、XMPP、AMQP自定义协议自定义协议
可用性一般
单机吞吐量一般非常高
消息延迟微秒级毫秒级毫秒级毫秒以内
消息可靠性一般一般
  • 一般中小型公司,用的就是RabbitMQ。
  • 如果大型企业,做深度定制,可以用RocketMQ
  • Kafaka则是用于大量数据情况下的处理,但安全可靠性相对较差。
  • ActiveMQ是很早的消息队列,如今几乎没有维护。

2.1 单机部署MQ

通过docker部署最简单,

docker pull rabbitmq:3-management

也可以用命令安装,这里直接用容器了。

启动信息如下

docker run -e RABBITMQ_DEFAULT_USER=yjx23332 -e RABBITMQ_DEFAULT_PASS=123456 -v mq-plugins:/plugins --name mq --hostname mq1 -p 15672:15672 -p 5672:5672 -d rabbitmq:3-management

-e 为设置环境变量
两个端口,15672是管理平台端口,5672是发送消息的端口。

记得开放对应端口。如果是腾讯云或者阿里云,也要在购买的服务器管理页面打开放行端口。

firewall-cmd --zone=public --add-port=15672/tcp --permanent
firewall-cmd --zone=public --add-port=5672/tcp --permanent
firewall-cmd --reload
查询端口是否开放

firewall-cmd --query-port=15672/tcp 查看某个端口firewall-cmd --zone=public --list-ports 查看所有

登陆成功后,即可进入以下界面。

我们可在这里为添加用户和角色

virtualhosts虚拟主机:对不用户进行隔离,避免相互影响。
此处可以添加

点击用户,可以配置其虚拟主机权限等。

此处设置交换机

2.2 结构和概念

使用消息队列中消息的对象。我们称之为消费者。

在一个virtualhost下:

2.3 常见的消息模型

基本消息队列BasicQueue:最简单的实现

工作消息队列WorkQueue:在工作者之间分配任务

发布订阅带有交换机,分为:

Fanout Exchange:广播,发布订阅(publish/subscribe):一次性向读个消费者发送消息。

Direct Exchange:路由(Routing):有选择的接收消息

Topic Exchange:主题 (Topics):根据主题接收消息。

请求回复模型(RPC):收到请求然后答复。

发布者确认模式(Publisher Confirms):会让发布者知道发送是否成功。

三、SpringAMQP

3.1 用非自动装配的方式使用消息队列

需要在项目中引入AMQP,记得加入父类spring-boot-starter-parent

		<dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-amqp</artifactId>
        </dependency>

引入Junit方便测试

		<dependency>
            <groupId>junit</groupId>
            <artifactId>junit</artifactId>
            <scope>test</scope>
        </dependency>

测试一下MQ

我们创建如下两个子项目。

为test写一个测试用例

package com.yjx23332.mq.helloworld;

import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;
import org.junit.Test;

import java.io.IOException;
import java.util.concurrent.TimeoutException;



public class PublisherTest {
    @Test
    public void testSendMessage() throws IOException, TimeoutException{
        //1.建立连接
        ConnectionFactory factory = new ConnectionFactory();
        //1.2.设置连接参数
        factory.setHost("IP");//MQ地址设置
        factory.setPort(5672);//端口设置
        factory.setVirtualHost("/");//设置虚拟主机
        factory.setUsername("账号");
        factory.setPassword("密码");
        //1.2 建立连接
        Connection connection = factory.newConnection();

        //2.创建通道
        Channel channel = connection.createChannel();

        //3.创建消息队列
        String queueName = "simple.queue";
        channel.queueDeclare(queueName,false,false,false,null);

        //4.发送消息
        String message = "hello,rabbitmq!";
        channel.basicPublish("",queueName,null,message.getBytes());
        System.out.println("已发送消息:【"+message+"】");

        //5.关闭通道
        if(channel != null){
            channel.close();
        }
        if(connection != null){
            connection.close();
        }
    }
}

我们在发送消息前打上断点,用junit运行,就可以看到连接创建和通道创建.。

因为发完就不管了,因此必须打断点,才看得到连接和通道。

完成后可以看到队列中

接下来我们处理消息,基本一样,只需要修改几个部分。

注意我们没有关闭连接,因为在业务中,要一直处理。

package com.yjx23332.mq.helloworld;

import com.rabbitmq.client.*;
import org.junit.Test;

import java.io.IOException;
import java.util.concurrent.TimeoutException;


public class ConsumerTest {
    @Test
    public static void main(String[] args) throws IOException, TimeoutException{
        //1.建立连接
        ConnectionFactory factory = new ConnectionFactory();
        //1.2.设置连接参数
        factory.setHost("101.43.65.53");//MQ地址设置
        factory.setPort(5672);//端口设置
        factory.setVirtualHost("/");//设置虚拟主机
        factory.setUsername("yjx23332");
        factory.setPassword("123456");
        //1.2 建立连接
        Connection connection = factory.newConnection();

        //2.创建通道
        Channel channel = connection.createChannel();

        //3.创建消息队列
        //为什么这里也要创建?避免消费者先执行,还没有队列。同时,相同的队列创建重复执行没有影响。
        String queueName = "simple.queue";
        channel.queueDeclare(queueName,false,false,false,null);

        //4.处理消息
        String message = "hello,rabbitmq!";
        //DefaultConsumer 是回调函数,一旦有消息,异步处理
        channel.basicConsume(queueName,true,new DefaultConsumer(channel){
            @Override
            public void handleDelivery(String consumerTag, Envelope envelope,AMQP.BasicProperties properties,byte[] body)throws  IOException{
                System.out.println("接收到消息:【"+ new String (body)+"】");
            }
        });
        System.out.println("####################等待接收消息##################");
//
//        //5.关闭通道
//        if(channel != null){
//            channel.close();
//        }
//        if(connection != null){
//            connection.close();
//        }
    }
}

结果如下

可以看到,因为回调的原因,后面的输出先执行

队列中消息处理完毕

3.2 SpringAMQP介绍

AMQP:Advanced Message Queuing Protocol:高级消息队列协议。于应用程序之间传递业务消息的开放标准。

Spring AMQP:基于AMQP协议的一套API规范,提供模板来发送和接收消息。其中Spring-amqp是基础抽象,Spring-rabbit是底层的默认实现。可参考Spring AMQP官网。

3.3 基础消息队列功能使用

导入依赖

		<dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-amqp</artifactId>
        </dependency>

为了方便测试,我们引入SpringBoot单元测试

	<dependency>
  		<groupId>org.springframework.boot</groupId>
  		<artifactId>spring-boot-starter-test</artifactId>
  		<scope>test</scope>
	</dependency>
	<dependency>
        <groupId>junit</groupId>
        <artifactId>junit</artifactId>
    	<scope>test</scope>
    </dependency>

然后准备一个yml文件,配置和之前用代码写得相似。

spring:
  rabbitmq:
    host: 
    port: 5672
    virtual-host: /
    username: 
    password: 

我们直接走单元测试,这里就不创建一个队列了,直接放消息。

package com.yjx23332.mq.helloworld;


import org.junit.Test;
import org.junit.runner.RunWith;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.test.context.SpringBootTest;
import org.springframework.test.context.junit4.SpringRunner;

import java.io.IOException;
import java.util.concurrent.TimeoutException;


@SpringBootTest
@RunWith(SpringRunner.class)
public class PublisherTest {
    @Autowired
    private RabbitTemplate rabbitTemplate;
    @Test
    public void testSimpleQueue() throws IOException, TimeoutException{
        rabbitTemplate.convertAndSend("simple.queue","hello,spring amqp");
    }
}

接下来为消费者建一个监听器(记得配置yml文件)

package com.yjx23332.mq.listener;

import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.stereotype.Component;

@Component
public class SpringRabbitListener {
    @RabbitListener(queues = "simple.queue")
    public void listenSimpleQueueMessage(String msg) throws InterruptedException{
        System.out.println("spring 消费者接收到消息:【"+msg+"】");
    }
}

消息一旦消费,就会被移除,Rabbit MQ不存在回溯功能。

3.4 工作队列的配置

一个队列绑定多个消费者。

我们准备发送50条消息

package com.yjx23332.mq.helloworld;


import org.junit.Test;
import org.junit.runner.RunWith;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.test.context.SpringBootTest;
import org.springframework.test.context.junit4.SpringRunner;



@SpringBootTest
@RunWith(SpringRunner.class)
public class PublisherTest {
    @Autowired
    private RabbitTemplate rabbitTemplate;
    @Test
    public void testSimpleQueue() throws InterruptedException{
        for(int i = 0;i < 50;i++){
            rabbitTemplate.convertAndSend("simple.queue","hello,spring amqp___" + i);
            Thread.sleep(20);
        }
    }
}

修改消费者

package com.yjx23332.mq.listener;

import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.stereotype.Component;

import java.time.LocalTime;

@Component
public class SpringRabbitListener {
    @RabbitListener(queues = "simple.queue")
    public void listenSimpleQueueMessage(String msg) throws InterruptedException{
        System.out.println("spring 消费者接收到消息:【"+msg+"】" + LocalTime.now());
        Thread.sleep(20);
    }
    @RabbitListener(queues = "simple.queue")
    public void listenSimpleQueueMessage2(String msg) throws InterruptedException{
        System.err.println("spring 消费者接收到消息:【"+msg+"】"+ LocalTime.now());
        Thread.sleep(200);
    }
}

从结果会发现处理总时长超过了1秒达到了5秒,查看输出会发现消息被平均分配给了两个。一个处理偶数,一个处理奇数。但由于处理速度不同,因此处理总时长超过了1秒。

这里是因为消费预取导致的,在执行前会提前把消息从队列拿出,然后各自处理。

但我们希望的是,做的快的多做,做的慢的少做。

因此我们可以修改yml文件:

spring:
  rabbitmq:
    host: 
    port: 5672
    virtual-host: /
    username: 
    password: 
    listener:
      simple:
        prefetch:  1 # 每次只能获取几条消息,执行完了再取下一条,默认是无限

重启后再次执行就会发现正常了。

3.5 发布与订阅模式

我们需要将同一消息发送给多个消费者。需要加入交换机来实现。注意,交换机只负责消费路由,但不存储消息,丢失一概不负责。

3.5.1 SpringAMQP交换机类

3.5.2 Fanout Exchange

我们在consumer服务中声明Exchange、Queue、Binding.

package com.yjx23332.mq.confg;

import org.springframework.amqp.core.Binding;
import org.springframework.amqp.core.BindingBuilder;
import org.springframework.amqp.core.FanoutExchange;
import org.springframework.amqp.core.Queue;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;

@Configuration
public class FanoutConfig {
    //声明FanoutExchange交换机
    @Bean
    public FanoutExchange fanoutExchange(){
        return new FanoutExchange("yjx23332.fanout");
    }
    //声明一个队列
    @Bean
    public Queue fanoutQueue1(){
        return new Queue("fanout.queue1");
    }
    //绑定队列和交换机
    @Bean
    public Binding bindingQueue1(Queue fanoutQueue1,FanoutExchange fanoutExchange){
        return BindingBuilder.bind(fanoutQueue1).to(fanoutExchange);
    }
    //声明第二个队列
    @Bean
    public Queue fanoutQueue2(){
        return new Queue("fanout.queue2");
    }
    //绑定第二个队列和交换机
    @Bean
    public Binding bindingQueue2(Queue fanoutQueue2,FanoutExchange fanoutExchange){
        return BindingBuilder.bind(fanoutQueue2).to(fanoutExchange);
    }
}

运行后,会看到:

修改监听器

package com.yjx23332.mq.listener;

import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.stereotype.Component;


@Component
public class MQlistener {
    @RabbitListener(queues = "fanout.queue1")
    public void listenFanoutQueue1(String msg){
        System.out.println("spring 消费者接收q1到消息:【"+msg+"】");
    }
    @RabbitListener(queues = "fanout.queue2")
    public void listenFanoutQueue2(String msg){
        System.err.println("spring 消费者接收到q2消息:【"+msg+"】");
    }
}

我们再修改publisher的测试代码

package com.yjx23332.mq;

import org.junit.Test;
import org.junit.runner.RunWith;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.test.context.SpringBootTest;
import org.springframework.test.context.junit4.SpringRunner;

@SpringBootTest
@RunWith(SpringRunner.class)
public class test {
    @Autowired
    private RabbitTemplate rabbitTemplate;
    @Test
    public void testFanoutExchange(){
        String exchangeName = "yjx23332.fanout";
        String message = "hello world!";
        rabbitTemplate.convertAndSend(exchangeName,"",message);
    }
}

启动

3.5.3 DirectExchange

将接收到的消息根据规则路由到指定的Queue,因此称为路由模式(routes)。

  • 每一个Queue都与Exchange设置一个BindingKey
  • 发布者发送消息时,指定消息的RoutingKey
  • Exchange将消息路由到BindingKey与消息RoutingKey一致的队列
  • 一个队列可以指定多个BindingKey,且队列之间的BindingKey可以重复

由于基于Config创建队列交换机的方式很麻烦,我们用新的方式声明交换机、队列。

删除上一节我们在config中的声明代码。

然后在listener中进行

package com.yjx23332.mq.listener;

import org.springframework.amqp.core.ExchangeTypes;
import org.springframework.amqp.rabbit.annotation.Exchange;
import org.springframework.amqp.rabbit.annotation.Queue;
import org.springframework.amqp.rabbit.annotation.QueueBinding;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.stereotype.Component;


@Component
public class MQlistener {
    @RabbitListener(bindings = @QueueBinding(
            value = @Queue(name = "direct.queue1"),
            exchange = @Exchange(value = "yjx23332.direct",type = ExchangeTypes.DIRECT),
            key = {"red","blue"} //bindingkey
    ))
    public void listenDirectQueue1(String msg){
        System.out.println("spring 消费者接收q1到消息:【"+msg+"】");
    }
    @RabbitListener(bindings = @QueueBinding(
            value = @Queue(name = "direct.queue2"),
            exchange = @Exchange(value = "yjx23332.direct" , type = ExchangeTypes.DIRECT),
            key = {"red","yellow"}
    ))
    public void listenDirectQueue2(String msg) {
        System.err.println("spring 消费者接收到q2消息:【"+msg+"】");
    }
}

运行后,我们可以看到

接下来,我们修改Test代码

package com.yjx23332.mq;

import org.junit.Test;
import org.junit.runner.RunWith;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.test.context.SpringBootTest;
import org.springframework.test.context.junit4.SpringRunner;

@SpringBootTest
@RunWith(SpringRunner.class)
public class test {
    @Autowired
    private RabbitTemplate rabbitTemplate;
    @Test
    public void testDirectExchange(){
        String exchangeName = "yjx23332.direct";
        rabbitTemplate.convertAndSend(exchangeName,"red","hello red");
        rabbitTemplate.convertAndSend(exchangeName,"blue","hello blue");
        rabbitTemplate.convertAndSend(exchangeName,"yellow","hello yellow");
    }
}

3.5.4 TopicExchange

与DirectExchange类似,但是它的routingKey必须是多个单词表,并用’.'分割。
当队列与交换机绑定时,可以使用通配符。避免当bindkey过多导致的麻烦。

#:代表0个或多个单词
*:代指一个单词

比如

China.news
Japan.news
就可以用 #.news
同理
China.weather
China.news
就可以用 China.#

我们沿用上一节的代码,做一点修改即可

package com.yjx23332.mq.listener;

import org.springframework.amqp.core.ExchangeTypes;
import org.springframework.amqp.rabbit.annotation.Exchange;
import org.springframework.amqp.rabbit.annotation.Queue;
import org.springframework.amqp.rabbit.annotation.QueueBinding;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.stereotype.Component;


@Component
public class MQlistener {
    @RabbitListener(bindings = @QueueBinding(
            value = @Queue(name = "topic.queue"),
            exchange = @Exchange(value = "yjx23332.topic",type = ExchangeTypes.TOPIC),
            key = {"China.#"} //bindingkey
    ))
    public void listenTopicQueue1(String msg){
        System.out.println("spring 消费者接收q1到消息:【"+msg+"】");
    }
    @RabbitListener(bindings = @QueueBinding(
            value = @Queue(name = "topic.queue2"),
            exchange = @Exchange(value = "yjx23332.topic",type = ExchangeTypes.TOPIC),
            key = {"#.news"}
    ))
    public void listenTopicQueue2(String msg) {
        System.err.println("spring 消费者接收到q2消息:【"+msg+"】");
    }
}

重启后,可看到

修改Test代码

package com.yjx23332.mq;

import org.junit.Test;
import org.junit.runner.RunWith;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.test.context.SpringBootTest;
import org.springframework.test.context.junit4.SpringRunner;

@SpringBootTest
@RunWith(SpringRunner.class)
public class test {
    @Autowired
    private RabbitTemplate rabbitTemplate;
    @Test
    public void testDirectExchange(){
        String exchangeName = "yjx23332.topic";
        rabbitTemplate.convertAndSend(exchangeName,"China.news","江苏地表最高温度将达到72摄氏度");
        rabbitTemplate.convertAndSend(exchangeName,"China.weather","未来温度仍将升高");
        rabbitTemplate.convertAndSend(exchangeName,"Japan.news","安培中枪");
    }
}

3.6 消息转换器

在发送中,我们接收消息的类型是Object。SpringAMQP会帮我们序列化后变为字节发送。
用默认JDK的序列化ObjectOutputStream是没有问题的,但是中间过程是乱码,我们这里改用JSON方式的序列化,这样在消息队列中查看也是正常的。

默认JDK的消息信息:

接下来我们配置消息转换。

我们先在消费者声明一个queue,并设置处理方式

package com.yjx23332.mq.listener;


import org.springframework.amqp.rabbit.annotation.Queue;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.stereotype.Component;


@Component
public class MQlistener {
    @RabbitListener(queuesToDeclare = @Queue("object.queue"))
    public void listenObjectQueue(String msg){
        System.out.println("spring 消费者接收到Object消息:【"+msg+"】");
    }
}

我们为发送类引入依赖并编写配置

		<dependency>
            <groupId>com.fasterxml.jackson.dataformat</groupId>
            <artifactId>jackson-dataformat-yaml</artifactId>
        </dependency>

覆盖默认的消息转换。

package com.yjx23332.mq.config;


import org.springframework.amqp.support.converter.Jackson2JsonMessageConverter;
import org.springframework.amqp.support.converter.MessageConverter;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;

@Configuration
public class MessageConverterConfig {
    @Bean
    public MessageConverter jsonMessageConverter(){
        return new Jackson2JsonMessageConverter();
    }
}

随后修改Test

package com.yjx23332.mq;

import org.junit.Test;
import org.junit.runner.RunWith;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.test.context.SpringBootTest;
import org.springframework.test.context.junit4.SpringRunner;

import java.util.HashMap;
import java.util.Map;

@SpringBootTest
@RunWith(SpringRunner.class)
public class test {
    @Autowired
    private RabbitTemplate rabbitTemplate;
    @Test
    public void testObjectQueue(){
        String queueName = "object.queue";
        Map<String,Object> msg = new HashMap<>();
        msg.put("name","yjx23332");
        msg.put("age",21);
        rabbitTemplate.convertAndSend(queueName,msg);
    }
}

结果如下:

这时消息不再是乱码

我们在为消费者配置转换,并修改监听器。当然,如果我们在两边都不配置消息转换器,这里结果是一样的。

package com.yjx23332.mq.listener;


import org.springframework.amqp.rabbit.annotation.Queue;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.stereotype.Component;

import java.util.Map;


@Component
public class MQlistener {
    @RabbitListener(queuesToDeclare = @Queue("object.queue"))
    public void listenObjectQueue(Map<String,Object> msg){
        System.out.println("spring 消费者接收到Object消息:【 name = "+msg.get("name")+",age = "+msg.get("age")+"】");
    }

}

结果如下

参考文献

[1]Spring AMQP官网
[2]黑马程序员Java微服务
[3]RabbitMQ官方文档

到此这篇关于 SpringBoot2实现MessageQueue消息队列的文章就介绍到这了,更多相关 SpringBoot2 MessageQueue消息队列内容请搜索编程网以前的文章或继续浏览下面的相关文章希望大家以后多多支持编程网!

免责声明:

① 本站未注明“稿件来源”的信息均来自网络整理。其文字、图片和音视频稿件的所属权归原作者所有。本站收集整理出于非商业性的教育和科研之目的,并不意味着本站赞同其观点或证实其内容的真实性。仅作为临时的测试数据,供内部测试之用。本站并未授权任何人以任何方式主动获取本站任何信息。

② 本站未注明“稿件来源”的临时测试数据将在测试完成后最终做删除处理。有问题或投稿请发送至: 邮箱/279061341@qq.com QQ/279061341

SpringBoot2实现MessageQueue消息队列

下载Word文档到电脑,方便收藏和打印~

下载Word文档

猜你喜欢

SpringBoot2实现MessageQueue消息队列

本文主要介绍了SpringBoot2实现MessageQueue消息队列,文中通过示例代码介绍的非常详细,对大家的学习或者工作具有一定的参考学习价值,需要的朋友们下面随着小编来一起学习学习吧
2023-05-17

SpringBoot2如何整合Redis哨兵集群 实现消息队列场景

这篇文章主要介绍了SpringBoot2如何整合Redis哨兵集群 实现消息队列场景,具有一定借鉴价值,感兴趣的朋友可以参考下,希望大家阅读完这篇文章之后大有收获,下面让小编带着大家一起了解一下。一、Redis集群简介1、RedisClus
2023-06-02

MSMQ消息队列怎么实现

这篇文章主要介绍了MSMQ消息队列怎么实现的相关知识,内容详细易懂,操作简单快捷,具有一定借鉴价值,相信大家阅读完这篇MSMQ消息队列怎么实现文章都会有所收获,下面我们一起来看看吧。一、引言Windows Communication Fou
2023-06-29

go怎么实现消息队列

在Go语言中,你可以使用第三方库来实现消息队列。目前比较常用的消息队列库有:1. RabbitMQ:RabbitMQ是一个开源的消息队列系统,使用AMQP协议来实现消息的传递。你可以使用Go语言的第三方库`github.com/stread
2023-08-31

golang怎么实现消息队列

在Golang中实现消息队列可以使用Golang的内置库和第三方库来实现。以下是两种常见的实现方式:使用Golang内置的channel:Golang中的channel是用于在goroutine之间进行通信的一种机制,可以用来实现简单的消息
2023-10-25

redis如何实现消息队列

Redis可以实现消息队列的功能,常用的实现方式是使用Redis的List数据结构来存储消息队列中的消息。具体实现步骤如下:将消息添加到队列中:使用Redis的LPUSH命令将消息添加到队列的头部(即左侧),使用RPUSH命令将消息添加到
redis如何实现消息队列
2024-04-22

redis怎么实现消息队列

Redis可以通过以下几种方式实现消息队列:1. List数据结构:使用Redis的List数据结构实现简单的消息队列。生产者将消息推入List的尾部,消费者从List的头部获取消息。可以使用`LPUSH`将消息推入队列,使用`BRPOP`
2023-09-14

redisstream实现消息队列的实践

本文主要介绍了redisstream实现消息队列的实践,文中通过示例代码介绍的非常详细,对大家的学习或者工作具有一定的参考学习价值,需要的朋友们下面随着小编来一起学习学习吧
2022-11-13

php redis消息队列怎么实现

PHPRedis消息队列实现本文介绍了如何使用PHP和Redis实现消息队列,包括安装Redis、建立PHP连接、创建和订阅队列、消费消息、使用发布和订阅模式、管道以及监控队列。还提供了有关错误处理和最佳实践的建议。
php redis消息队列怎么实现
2024-04-11

PHP怎么实现RabbitMQ消息列队

这篇“PHP怎么实现RabbitMQ消息列队”文章的知识点大部分人都不太理解,所以小编给大家总结了以下内容,内容详细,步骤清晰,具有一定的借鉴价值,希望大家阅读完这篇文章能有所收获,下面我们一起来看看这篇“PHP怎么实现RabbitMQ消息
2023-06-30

redis stream 实现消息队列的实践

目录Redis 实现消息对列4中方法发布订阅list 队列zset 队列Stream 队列基本命令xadd 生产消息读取消息xgroup 消费者组xreadgroup 消费消息Pending 等待列表消息确认消息转移信息监控SpringBo
2022-08-10

RabbitMQ 消息队列

RabbitMQ是一个在AMQP基础上完整的,可复用的企业消息系统。他遵循Mozilla Public License开源协议。MQ全称为Message Queue, 消息队列(MQ)是一种应用程序对应用程序的通信方法。应用程序通过读写出入
2023-01-31

Python消息队列

消息中间件 --->就是消息队列异步方式:不需要立马得到结果,需要排队同步方式:需要实时获得数据,坚决不能排队例子:#多进程模块multiprocessingfrom multiprocessing import Processfrom m
2023-01-31

RabbitMQ消息队列

一、简介  RabbitMQ是一个在AMQP基础上完整的、可复用的企业消息系统,遵循Mozilla Public License开源协议。MQ全称Message Queue(消息队列),它是一种应用程序对应用程序的通信方式。应用程序通过读写
2023-01-31

golang怎么实现消息队列功能

Golang可以使用第三方库来实现消息队列功能,常用的库有NSQ、RabbitMQ和Apache Kafka等。下面以NSQ为例,介绍如何使用Golang实现消息队列功能。1. 首先,安装NSQ并启动NSQ服务。可以从https://nsq
2023-10-20

编程热搜

  • Python 学习之路 - Python
    一、安装Python34Windows在Python官网(https://www.python.org/downloads/)下载安装包并安装。Python的默认安装路径是:C:\Python34配置环境变量:【右键计算机】--》【属性】-
    Python 学习之路 - Python
  • chatgpt的中文全称是什么
    chatgpt的中文全称是生成型预训练变换模型。ChatGPT是什么ChatGPT是美国人工智能研究实验室OpenAI开发的一种全新聊天机器人模型,它能够通过学习和理解人类的语言来进行对话,还能根据聊天的上下文进行互动,并协助人类完成一系列
    chatgpt的中文全称是什么
  • C/C++中extern函数使用详解
  • C/C++可变参数的使用
    可变参数的使用方法远远不止以下几种,不过在C,C++中使用可变参数时要小心,在使用printf()等函数时传入的参数个数一定不能比前面的格式化字符串中的’%’符号个数少,否则会产生访问越界,运气不好的话还会导致程序崩溃
    C/C++可变参数的使用
  • css样式文件该放在哪里
  • php中数组下标必须是连续的吗
  • Python 3 教程
    Python 3 教程 Python 的 3.0 版本,常被称为 Python 3000,或简称 Py3k。相对于 Python 的早期版本,这是一个较大的升级。为了不带入过多的累赘,Python 3.0 在设计的时候没有考虑向下兼容。 Python
    Python 3 教程
  • Python pip包管理
    一、前言    在Python中, 安装第三方模块是通过 setuptools 这个工具完成的。 Python有两个封装了 setuptools的包管理工具: easy_install  和  pip , 目前官方推荐使用 pip。    
    Python pip包管理
  • ubuntu如何重新编译内核
  • 改善Java代码之慎用java动态编译

目录