我的编程空间,编程开发者的网络收藏夹
学习永远不晚

Java Spring boot 整合RabbitMQ(二):工作队列(Work queues)-B2B2C小程序电子商务

短信预约 -IT技能 免费直播动态提醒
省份

北京

  • 北京
  • 上海
  • 天津
  • 重庆
  • 河北
  • 山东
  • 辽宁
  • 黑龙江
  • 吉林
  • 甘肃
  • 青海
  • 河南
  • 江苏
  • 湖北
  • 湖南
  • 江西
  • 浙江
  • 广东
  • 云南
  • 福建
  • 海南
  • 山西
  • 四川
  • 陕西
  • 贵州
  • 安徽
  • 广西
  • 内蒙
  • 西藏
  • 新疆
  • 宁夏
  • 兵团
手机号立即预约

请填写图片验证码后获取短信验证码

看不清楚,换张图片

免费获取短信验证码

Java Spring boot 整合RabbitMQ(二):工作队列(Work queues)-B2B2C小程序电子商务

现在,我们将发送一些字符串,把这些字符串当作复杂的任务。我们并没有一个真实的复杂任务,类似于图片大小被调整或 pdf 文件被渲染,所以我们通过 sleep () 方法来模拟这种情况。我们在字符串中加上点号(.)来表示任务的复杂程度,一个点(.)将会耗时 1 秒钟。比如 “Hello…” 就会耗时 3 秒钟。

如果您尚未设置项目,请参阅第一个教程中的设置。我们将遵循与第一个教程相同的模式:创建一个包(tut2)并创建 Tut2Config、Tut2Receiver 和 Tut2Sender。

代码整合

首先创建一个新的包(tut2),我们将在这里放置我们的三个类。在配置类 Tut2Config 中,我们设置了两个配置文件 ——tut2 和 work-queues。我们利用 Spring 来将队列 Queue 暴露为一个 bean。我们配置消费者,并定义两个 bean 以对应于上图中的工作进程 receiver1 和 receiver2。

配置类

@Profile({"tut2", "work-queues"})@Configurationpublic class Tut2Config {    @Bean    public Queue queue() {        return new Queue("work-queues");    }      @Profile ("receiver")    private static class ReceiverConfig {        @Bean        public Tut2Receiver receiver1() {            return new Tut2Receiver(1);        }        @Bean        public Tut2Receiver receiver2() {            return new Tut2Receiver(2);        }    }    @Profile("sender")     @Bean   public Tut2Sender sender() {        return new Tut2Sender();    }}

生产者
我们简单修改一下生产者的代码,以添加点号(.)的方式来人为的增加该任务的时长,字符串中的每个点号(.)都会增加 1s 的耗时。

public class Tut2Sender { @Autowired private AmqpTemplate template; @Autowired    private Queue queue;    int dots = 0;    int count = 0;   @Scheduled(fixedDelay = 1000, initialDelay = 500)    public void send(){        StringBuilder builder = new StringBuilder("Hello");        if (dots++ == 3) {            dots = 1;        }        for (int i = 0; i < dots; i++) {            builder.append('.');        }        builder.append(Integer.toString(++count));        String message = builder.toString();        template.convertAndSend(queue.getName(), message);        System.out.println(" [x] Sent '" + message + "'");    }}

消费者
我们的消费者 Tut2Receiver 通过 doWork () 方法模拟了一个耗时的虚假任务,它需要为消息体中每一个点号(.)模拟 1 秒钟的操作。并且我们为消费者增加了一个实例编号,以知道是哪个实例消费了消息和处理的时长。

@RebbitListener(queues = "work-queues")public class Tut2Receiver {    private int instance;    public Tut2Receiver(int instance) {        this.instance = instance;    }    @RabbitHandler    public void receive(String in) throws InterruptedException {        StopWatch watch = new StopWatch();        watch.start();        System.out.println("instance " + this.instance +                " [x] Received '" + in + "'");        doWork(in);        watch.stop();        System.out.println("instance " + this.instance +                " [x] Done in " + watch.getTotalTimeSeconds() + "s");    }    private void doWork(String in) throws InterruptedException {        for (char ch : in.toCharArray()) {            if (ch == '.') {                Thread.sleep(1000);            }        }    }}

运行

maven 编译

mvn clean package -Dmaven.test.skip=true

运行

java -jar target/rabbitmq-tutorial-0.0.1-SNAPSHOT.jar --spring.profiles.active=tut2,sender --tutorial.client.duration=60000

java -jar target/rabbitmq-tutorial-0.0.1-SNAPSHOT.jar --spring.profiles.active=tut2,receiver --tutorial.client.duration=60000

输出

// Sender

Ready … running for 10000ms

[x] Sent ‘Hello.1’

[x] Sent ‘Hello…2’

[x] Sent ‘Hello…3’

[x] Sent ‘Hello.4’

[x] Sent ‘Hello…5’

[x] Sent ‘Hello…6’

[x] Sent ‘Hello.7’

[x] Sent ‘Hello…8’

[x] Sent ‘Hello…9’

// Receiver

Ready … running for 10000ms

instance 1 [x] Received ‘Hello.1’

instance 2 [x] Received ‘Hello…2’

instance 1 [x] Done in 1.005s

instance 1 [x] Received ‘Hello…3’

instance 2 [x] Done in 2.007s

instance 2 [x] Received ‘Hello.4’

instance 2 [x] Done in 1.005s

instance 1 [x] Done in 3.01s

instance 1 [x] Received ‘Hello…5’

instance 2 [x] Received ‘Hello…6’

instance 1 [x] Done in 2.006s

instance 1 [x] Received ‘Hello.7’

instance 1 [x] Done in 1.002s

instance 1 [x] Received ‘Hello…9’

instance 2 [x] Done in 3.01s

instance 2 [x] Received ‘Hello…8’

prefetch

从消费者这端的输出可以看出来,instance 1 得到的任务编号始终是奇数(Hello.1,Hello…3,Hello…5,Hello.7),而 instance 2 得到的任务编号始终是偶数。了解springcloud架构可以加求求:三五三六二四七二五九

如果感觉这次的输出只是巧合,可以多试几次或通过 --tutorial.client.duration= 调整时长得到更多的输出,而结果肯定都是一样的。

这里设计的问题就是之前在基础概念里讲到的调度策略的问题了。要实现公平调度(Fair dispatch)就是设置 prefetch 的值,实现方式有两种。

全局设置

在 application.yml 中设置 spring.rabbitmq.listener.simple.prefetch=1 即可,这会影响到本 Spring Boot 应用中所有使用默认 SimpleRabbitListenerContainerFactory 的消费者。

网上很多人说改配置 pring.rabbitmq.listener.prefetc,实测已经无效,应该是版本的问题。我所使用的版本(RabbitMQ:3.7.4,Spring Boot: 2.0.1.RELEASE),除了 spring.rabbitmq.listener.simple.prefetch,还有一个 spring.rabbitmq.listener.direct.prefetch 可以配置。

改了配置后再运行,可以看到 instance 1 可以获取到”Hello…6”、”Hello…12” 了。

Ready … running for 60000ms

instance 1 [x] Received ‘Hello.1’

instance 2 [x] Received ‘Hello…2’

instance 1 [x] Done in 1.004s

instance 1 [x] Received ‘Hello…3’

instance 2 [x] Done in 2.008s

instance 2 [x] Received ‘Hello.4’

instance 2 [x] Done in 1.004s

instance 2 [x] Received ‘Hello…5’

instance 1 [x] Done in 3.012s

instance 1 [x] Received ‘Hello…6’

instance 2 [x] Done in 2.007s

instance 2 [x] Received ‘Hello.7’

instance 2 [x] Done in 1.004s

instance 2 [x] Received ‘Hello…8’

instance 1 [x] Done in 3.011s

instance 1 [x] Received ‘Hello…9’

instance 2 [x] Done in 2.007s

instance 2 [x] Received ‘Hello.10’

instance 2 [x] Done in 1.006s

instance 2 [x] Received ‘Hello…11’

instance 1 [x] Done in 3.01s

instance 1 [x] Received ‘Hello…12’

特定消费者

上边是改了全局的消费者,如果只针对特定的消费者的话,又怎么处理呢?

我们可以通过自定义 RabbitListenerContainerFactory 来实现。

@Beanpublic RabbitListenerContainerFactory<SimpleMessageListenerContainer> prefetchOneRabbitListenerContainerFactory(ConnectionFactory rabbitConnectionFactory) {    SimpleRabbitListenerContainerFactory factory = new SimpleRabbitListenerContainerFactory();    factory.setConnectionFactory(rabbitConnectionFactory);    factory.setPrefetchCount(1);    return factory;}

然后在特定的消费者上指定 containerFactory

@RebbitListener(queues = "hello", containerFactory = "prefetchTenRabbitListenerContainerFactory")public void receive(String in) {    System.out.println(" [x] Received '" + in + "'")}

免责声明:

① 本站未注明“稿件来源”的信息均来自网络整理。其文字、图片和音视频稿件的所属权归原作者所有。本站收集整理出于非商业性的教育和科研之目的,并不意味着本站赞同其观点或证实其内容的真实性。仅作为临时的测试数据,供内部测试之用。本站并未授权任何人以任何方式主动获取本站任何信息。

② 本站未注明“稿件来源”的临时测试数据将在测试完成后最终做删除处理。有问题或投稿请发送至: 邮箱/279061341@qq.com QQ/279061341

Java Spring boot 整合RabbitMQ(二):工作队列(Work queues)-B2B2C小程序电子商务

下载Word文档到电脑,方便收藏和打印~

下载Word文档

猜你喜欢

Java Spring boot 整合RabbitMQ(二):工作队列(Work queues)-B2B2C小程序电子商务

现在,我们将发送一些字符串,把这些字符串当作复杂的任务。我们并没有一个真实的复杂任务,类似于图片大小被调整或 pdf 文件被渲染,所以我们通过 sleep () 方法来模拟这种情况。我们在字符串中加上点号(.)来表示任务的复杂程度,一个点(
2023-06-05

Java Spring Boot 整合RabbitMQ(一):Hello World -B2B2C小程序电子商务

Spring Boot 整合环境:RabbitMQ:3.7.4Spring Boot:2.0.1.RELEASE因为有 Starter POMs,在 Spring Boot 中整合 RabbitMQ 是一件非常容易的事,其中的 AMQP 模
2023-06-05

Java Spring boot 整合RabbitMQ(五):主题(Topics)-B2B2C小程序电子商务

在我们的日志系统中,我们不只希望订阅基于严重程度的日志,同时还希望订阅基于发送来源的日志。Unix 工具 syslog 就是同时基于严重程度 -severity (info/warn/crit…) 和 设备 -facility (auth/
2023-06-05

Java Spring boot整合RabbitMQ如何实现B2B2C小程序电子商务

小编给大家分享一下Java Spring boot整合RabbitMQ如何实现B2B2C小程序电子商务,希望大家阅读完这篇文章之后都有所收获,下面让我们一起去探讨吧!之前我们发送和接收到队列中的消息,现在是时候在 RabbitMQ 中引入完
2023-06-05

编程热搜

  • Python 学习之路 - Python
    一、安装Python34Windows在Python官网(https://www.python.org/downloads/)下载安装包并安装。Python的默认安装路径是:C:\Python34配置环境变量:【右键计算机】--》【属性】-
    Python 学习之路 - Python
  • chatgpt的中文全称是什么
    chatgpt的中文全称是生成型预训练变换模型。ChatGPT是什么ChatGPT是美国人工智能研究实验室OpenAI开发的一种全新聊天机器人模型,它能够通过学习和理解人类的语言来进行对话,还能根据聊天的上下文进行互动,并协助人类完成一系列
    chatgpt的中文全称是什么
  • C/C++中extern函数使用详解
  • C/C++可变参数的使用
    可变参数的使用方法远远不止以下几种,不过在C,C++中使用可变参数时要小心,在使用printf()等函数时传入的参数个数一定不能比前面的格式化字符串中的’%’符号个数少,否则会产生访问越界,运气不好的话还会导致程序崩溃
    C/C++可变参数的使用
  • css样式文件该放在哪里
  • php中数组下标必须是连续的吗
  • Python 3 教程
    Python 3 教程 Python 的 3.0 版本,常被称为 Python 3000,或简称 Py3k。相对于 Python 的早期版本,这是一个较大的升级。为了不带入过多的累赘,Python 3.0 在设计的时候没有考虑向下兼容。 Python
    Python 3 教程
  • Python pip包管理
    一、前言    在Python中, 安装第三方模块是通过 setuptools 这个工具完成的。 Python有两个封装了 setuptools的包管理工具: easy_install  和  pip , 目前官方推荐使用 pip。    
    Python pip包管理
  • ubuntu如何重新编译内核
  • 改善Java代码之慎用java动态编译

目录