我的编程空间,编程开发者的网络收藏夹
学习永远不晚

RocketMQ生产消息与消费消息超详细讲解

短信预约 -IT技能 免费直播动态提醒
省份

北京

  • 北京
  • 上海
  • 天津
  • 重庆
  • 河北
  • 山东
  • 辽宁
  • 黑龙江
  • 吉林
  • 甘肃
  • 青海
  • 河南
  • 江苏
  • 湖北
  • 湖南
  • 江西
  • 浙江
  • 广东
  • 云南
  • 福建
  • 海南
  • 山西
  • 四川
  • 陕西
  • 贵州
  • 安徽
  • 广西
  • 内蒙
  • 西藏
  • 新疆
  • 宁夏
  • 兵团
手机号立即预约

请填写图片验证码后获取短信验证码

看不清楚,换张图片

免费获取短信验证码

RocketMQ生产消息与消费消息超详细讲解

1 RocketMQ简介

RocketMQ是阿里开源的一款非常优秀中间件产品,脱胎于阿里的另一款队列技术MetaQ,后捐赠给Apache基金会作为一款孵化技术,仅仅经历了一年多的时间就成为Apache基金会的顶级项目。并且它现在已经在阿里内部被广泛的应用,并且经受住了多次双十一的这种极致场景的压力(2017年的双十一,RocketMQ流转的消息量达到了万亿级,峰值TPS达到5600万)

2 MQ的常见产品

ActiveMQ:java语言实现,万级数据吞吐量,处理速度ms级,主从架构,成熟度高

RabbitMQ :erlang语言实现,万级数据吞吐量,处理速度us级,主从架构,

RocketMQ :java语言实现,十万级数据吞吐量,处理速度ms级,分布式架构,功能强,扩展性强

kafka :scala语言实现,十万级数据吞吐量,处理速度ms级,分布式架构,功能较少,应用于大数据较多

3 环境搭建

创建maven工程

引入依赖:

 <dependency>
            <groupId>org.apache.rocketmq</groupId>
            <artifactId>rocketmq-client</artifactId>
            <version>4.5.2</version>
</dependency>

4 单生产者单消费者模式

生产者:

//生产者,产生消息
public class Producer {
    public static void main(String[] args) throws Exception{
        //1.创建一个发送消息的对象Producer
        DefaultMQProducer producer=new DefaultMQProducer("group1");
        //2.设定发送的命名服务器地址
        producer.setNamesrvAddr("192.168.23.127:9876");
        //3启动发送的服务
        producer.start();
        //4.1创建要发送的消息对象,指定topic,指定内容body
        Message msg=new Message("topic1","hello rocketmq".getBytes("UTF-8"));
        //4.2发送消息
        SendResult result = producer.send(msg);
        System.out.println("返回结果:"+result);
        //5.关闭连接
        producer.shutdown();
    }
}

消费者:

//消费者,消费消息
public class Consumer {
    public static void main(String[] args) throws Exception{
        //1.创建一个接收消息的对象Consumer
        DefaultMQPushConsumer consumer=new DefaultMQPushConsumer("group1");
        //2.设定接收的命名服务器地址
        consumer.setNamesrvAddr("192.168.23.127:9876");
        //3.设置接收消息对应的topic,对应的sub标签为任意*
        consumer.subscribe("topic1","*");
        //4.开启监听,用于接收消息
        consumer.registerMessageListener(new MessageListenerConcurrently() {
            @Override
            public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> list, ConsumeConcurrentlyContext consumeConcurrentlyContext) {
                for (MessageExt msg : list) {
                    //System.out.println("收到消息:"+msg);
                    System.out.println("消息:"+new String(msg.getBody()));
                }
                return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;// 成功处理, mq 收到这个 标记后相同的消息讲不会再次发给消费者
            }
        });
        //5.启动接收消息的服务
        consumer.start();// 开启多线程 监控消息,持续运行
        System.out.println("接收消息服务已运行");
    }
}

测试:

5 单生产者多消费者模式

5.1默认模式(负载均衡)

生产者:

//生产者,产生消息
public class Producer {
    public static void main(String[] args) throws Exception{
        //1.创建一个发送消息的对象Producer
        DefaultMQProducer producer=new DefaultMQProducer("group1");
        //2.设定发送的命名服务器地址
        producer.setNamesrvAddr("192.168.23.127:9876");
        //3启动发送的服务
        producer.start();
        for (int i = 1; i <= 10; i++) {
            Message msg = new Message("topic1",("生产者2: hello rocketmq "+i).getBytes("UTF-8"));
            SendResult result = producer.send(msg);
            System.out.println("返回结果:"+result);
        }
        //5.关闭连接
        producer.shutdown();
    }
}

消费者:

//消费者,消费消息
public class Consumer {
    public static void main(String[] args) throws Exception{
        //1.创建一个接收消息的对象Consumer
        DefaultMQPushConsumer consumer=new DefaultMQPushConsumer("group1");
        //2.设定接收的命名服务器地址
        consumer.setNamesrvAddr("192.168.23.127:9876");
        //3.设置接收消息对应的topic,对应的sub标签为任意*
        consumer.subscribe("topic1","*");
        //4.开启监听,用于接收消息
        consumer.registerMessageListener(new MessageListenerConcurrently() {
            @Override
            public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> list, ConsumeConcurrentlyContext consumeConcurrentlyContext) {
                for (MessageExt msg : list) {
                    //System.out.println("收到消息:"+msg);
                    System.out.println("消息:"+new String(msg.getBody()));
                }
                return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;// 成功处理, mq 收到这个 标记后相同的消息讲不会再次发给消费者
            }
        });
        //5.启动接收消息的服务
        consumer.start();// 开启多线程 监控消息,持续运行
        System.out.println("接收消息服务已运行");
    }
}

测试:

5.2广播模式

生产者的代码不变,消费者的代码改动如下:

		//设置当前消费者的消费模式(默认模式:负载均衡)
        consumer.setMessageModel(MessageModel.CLUSTERING);
        //设置当前消费者的消费模式为广播模式:所有客户端接收的消息是一样的
        consumer.setMessageModel(MessageModel.BROADCASTING);

具体消费者代码:

//消费者,消费消息
public class Consumer {
    public static void main(String[] args) throws Exception{
        //1.创建一个接收消息的对象Consumer
        DefaultMQPushConsumer consumer=new DefaultMQPushConsumer("group1");
        //2.设定接收的命名服务器地址
        consumer.setNamesrvAddr("192.168.23.127:9876");
        //3.设置接收消息对应的topic,对应的sub标签为任意*
        consumer.subscribe("topic1","*");
        //设置当前消费者的消费模式(默认模式:负载均衡)
        //consumer.setMessageModel(MessageModel.CLUSTERING);
        //设置当前消费者的消费模式为广播模式:所有客户端接收的消息是一样的
        consumer.setMessageModel(MessageModel.BROADCASTING);
        //4.开启监听,用于接收消息
        consumer.registerMessageListener(new MessageListenerConcurrently() {
            @Override
            public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> list, ConsumeConcurrentlyContext consumeConcurrentlyContext) {
                for (MessageExt msg : list) {
                    //System.out.println("收到消息:"+msg);
                    System.out.println("消息:"+new String(msg.getBody()));
                }
                return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;// 成功处理, mq 收到这个 标记后相同的消息讲不会再次发给消费者
            }
        });
        //5.启动接收消息的服务
        consumer.start();// 开启多线程 监控消息,持续运行
        System.out.println("接收消息服务已运行");
    }
}

测试:

广播模式的现象

如果 生产者先发送消息, 后启动消费者, 消息只能被消费一次

如果多个消费者先启动(广播模式),后发消息,才有广播的效果

结论: 必须先启动消费者再启动发送者才有广播的效果

6 多生产者多消费者模式

多生产者产生的消息可以被同一个消费者消费,也可以被多个消费者消费

运行多个生产者,在启动消费者

测试:

到此这篇关于RocketMQ生产消息与消费消息超详细讲解的文章就介绍到这了,更多相关RocketMQ生产消息与消费消息内容请搜索编程网以前的文章或继续浏览下面的相关文章希望大家以后多多支持编程网!

免责声明:

① 本站未注明“稿件来源”的信息均来自网络整理。其文字、图片和音视频稿件的所属权归原作者所有。本站收集整理出于非商业性的教育和科研之目的,并不意味着本站赞同其观点或证实其内容的真实性。仅作为临时的测试数据,供内部测试之用。本站并未授权任何人以任何方式主动获取本站任何信息。

② 本站未注明“稿件来源”的临时测试数据将在测试完成后最终做删除处理。有问题或投稿请发送至: 邮箱/279061341@qq.com QQ/279061341

RocketMQ生产消息与消费消息超详细讲解

下载Word文档到电脑,方便收藏和打印~

下载Word文档

猜你喜欢

RocketMQ生产消息与消费消息超详细讲解

这篇文章主要介绍了RocketMQ生产消息与消费消息,RocketMQ可用于以三种方式发送消息:可靠的同步、可靠的异步和单向传输。前两种消息类型是可靠的,因为无论它们是否成功发送都有响应
2022-12-27

RocketMQ延迟消息超详细讲解

延时消息是指发送到RocketMQ后不会马上被消费者拉取到,而是等待固定的时间,才能被消费者拉取到。延时消息的使用场景很多,比如电商场景下关闭超时未支付的订单,某些场景下需要在固定时间后发送提示消息
2023-02-14

详解RocketMQ消费端如何监听消息

这篇文章主要为大家介绍了RocketMQ消费端如何监听消息示例详解,有需要的朋友可以借鉴参考下,希望能够有所帮助,祝大家多多进步,早日升职加薪
2022-12-15

RocketMQ事务消息原理与使用详解

RocketMQ事务消息(TransactionalMessage)是指应用本地事务和发送消息操作可以被定义到全局事务中,要么同时成功,要么同时失败。RocketMQ的事务消息提供类似X/OpenXA的分布式事务功能,通过事务消息能达到分布式事务的最终一致
2023-02-13

Java多线程中的生产者与消费者案例讲解

这篇文章主要讲解了“Java多线程中的生产者与消费者案例讲解”,文中的讲解内容简单清晰,易于学习与理解,下面请大家跟着小编的思路慢慢深入,一起来研究和学习“Java多线程中的生产者与消费者案例讲解”吧!目录前言工具知识点设计思路具体步骤总结
2023-06-20

Docker启动RabbitMQ实现生产者与消费者的详细过程

这篇文章主要介绍了Docker启动RabbitMQ,实现生产者与消费者,通过Docker拉取镜像并启动RabbitMQ,本文结合实例代码给大家介绍的非常详细,对大家的学习或工作具有一定的参考借鉴价值,需要的朋友可以参考下
2023-02-23

Java自带消息队列Queue的使用教程详细讲解

这篇文章主要介绍了Java自带消息队列Queue的使用教程,Java中的queue类是队列数据结构管理类,在它里边的元素可以按照添加它们的相同顺序被移除,队列通常以FIFO的方式排序各个元素,感兴趣想要详细了解可以参考下文
2023-05-20

编程热搜

  • Python 学习之路 - Python
    一、安装Python34Windows在Python官网(https://www.python.org/downloads/)下载安装包并安装。Python的默认安装路径是:C:\Python34配置环境变量:【右键计算机】--》【属性】-
    Python 学习之路 - Python
  • chatgpt的中文全称是什么
    chatgpt的中文全称是生成型预训练变换模型。ChatGPT是什么ChatGPT是美国人工智能研究实验室OpenAI开发的一种全新聊天机器人模型,它能够通过学习和理解人类的语言来进行对话,还能根据聊天的上下文进行互动,并协助人类完成一系列
    chatgpt的中文全称是什么
  • C/C++中extern函数使用详解
  • C/C++可变参数的使用
    可变参数的使用方法远远不止以下几种,不过在C,C++中使用可变参数时要小心,在使用printf()等函数时传入的参数个数一定不能比前面的格式化字符串中的’%’符号个数少,否则会产生访问越界,运气不好的话还会导致程序崩溃
    C/C++可变参数的使用
  • css样式文件该放在哪里
  • php中数组下标必须是连续的吗
  • Python 3 教程
    Python 3 教程 Python 的 3.0 版本,常被称为 Python 3000,或简称 Py3k。相对于 Python 的早期版本,这是一个较大的升级。为了不带入过多的累赘,Python 3.0 在设计的时候没有考虑向下兼容。 Python
    Python 3 教程
  • Python pip包管理
    一、前言    在Python中, 安装第三方模块是通过 setuptools 这个工具完成的。 Python有两个封装了 setuptools的包管理工具: easy_install  和  pip , 目前官方推荐使用 pip。    
    Python pip包管理
  • ubuntu如何重新编译内核
  • 改善Java代码之慎用java动态编译

目录