我的编程空间,编程开发者的网络收藏夹
学习永远不晚

Springboot如何整合RocketMQ收发消息

短信预约 -IT技能 免费直播动态提醒
省份

北京

  • 北京
  • 上海
  • 天津
  • 重庆
  • 河北
  • 山东
  • 辽宁
  • 黑龙江
  • 吉林
  • 甘肃
  • 青海
  • 河南
  • 江苏
  • 湖北
  • 湖南
  • 江西
  • 浙江
  • 广东
  • 云南
  • 福建
  • 海南
  • 山西
  • 四川
  • 陕西
  • 贵州
  • 安徽
  • 广西
  • 内蒙
  • 西藏
  • 新疆
  • 宁夏
  • 兵团
手机号立即预约

请填写图片验证码后获取短信验证码

看不清楚,换张图片

免费获取短信验证码

Springboot如何整合RocketMQ收发消息

这篇文章将为大家详细讲解有关Springboot如何整合RocketMQ收发消息,小编觉得挺实用的,因此分享给大家做个参考,希望大家阅读完这篇文章后可以有所收获。

Springboot 整合 RocketMQ 收发消息

创建springboot项目

pom.xml添加rocketmq-spring-boot-starter依赖。

<dependency>    <groupId>org.apache.rocketmq</groupId>    <artifactId>rocketmq-spring-boot-starter</artifactId>    <version>2.1.0</version></dependency>

yml 配置

application.yml

rocketmq:  name-server: 192.168.64.141:9876

application-demo1.yml

使用 demo1 profile 指定生产者组组名

rocketmq:  producer:    group: producer-demo1

application-demo2.yml

使用 demo2 profile 指定生产者组组名

rocketmq:  producer:    group: producer-demo2

测试

demo 1

  • 发送普通消息

  • 发送 Spring 的通用 Message 对象

  • 发送异步消息

  • 发送顺序消息

生产者

package cn.tedu.demo2.m1;import org.apache.rocketmq.client.producer.SendCallback;import org.apache.rocketmq.client.producer.SendResult;import org.apache.rocketmq.spring.core.RocketMQTemplate;import org.springframework.beans.factory.annotation.Autowired;import org.springframework.messaging.Message;import org.springframework.messaging.support.MessageBuilder;import org.springframework.stereotype.Component;@Componentpublic class Producer {    @Autowired    private RocketMQTemplate t ;    public  void send(){        //发送同步消息        t.convertAndSend("Topic1:TagA", "Hello world! ");        //发送spring的Message        Message<String> message = MessageBuilder.withPayload("Hello Spring message! ").build();        t.send("Topic1:TagA",message);        //发送异步消息        t.asyncSend("Topic1:TagA", "hello world asyn", new SendCallback() {            @Override            public void onSuccess(SendResult sendResult) {                System.out.println("发送成功");            }            @Override            public void onException(Throwable throwable) {                System.out.println("发送失败");            }        });        //发送顺序消息        t.syncSendOrderly("Topic1", "98456237,创建", "98456237");        t.syncSendOrderly("Topic1", "98456237,支付", "98456237");        t.syncSendOrderly("Topic1", "98456237,完成", "98456237");    }}

消费者

package cn.tedu.demo2.m1;import org.apache.rocketmq.spring.annotation.RocketMQMessageListener;import org.apache.rocketmq.spring.core.RocketMQListener;import org.springframework.stereotype.Component;@Component@RocketMQMessageListener(topic = "Topic1",consumerGroup = "consumer-demo1")public class Consumer  implements RocketMQListener<String> {    @Override    public void onMessage(String s) {        System.out.println("收到"+s);    }}

主类

package cn.tedu.demo2.m1;import org.springframework.boot.SpringApplication;import org.springframework.boot.autoconfigure.SpringBootApplication;@SpringBootApplicationpublic class Main {    public static void main(String[] args) {        SpringApplication.run(Main.class, args);    }}

测试类

需要放在 test 文件夹

激活 demo1 profile  @ActiveProfiles("demo1")

package cn.tedu.demo2.m1;import org.junit.jupiter.api.Test;import org.springframework.beans.factory.annotation.Autowired;import org.springframework.boot.test.context.SpringBootTest;import org.springframework.test.context.ActiveProfiles;@SpringBootTest@ActiveProfiles("demo1")public class Test1 {    @Autowired    private  Producer producer;    @Test    public void test1(){        producer.send();        try {            Thread.sleep(5000);        } catch (InterruptedException e) {            e.printStackTrace();        }    }}

demo 2

发送事务消息

生产者

package cn.tedu.demo2.m2;import org.apache.rocketmq.spring.annotation.RocketMQTransactionListener;import org.apache.rocketmq.spring.core.RocketMQLocalTransactionListener;import org.apache.rocketmq.spring.core.RocketMQLocalTransactionState;import org.apache.rocketmq.spring.core.RocketMQTemplate;import org.springframework.beans.factory.annotation.Autowired;import org.springframework.messaging.Message;import org.springframework.messaging.support.MessageBuilder;import org.springframework.stereotype.Component;@Componentpublic class Producer {    @Autowired    private RocketMQTemplate t;    public void send(){        Message<String> message = MessageBuilder.withPayload("Hello world").build();        //一旦发送消息,则执行监听器        t.sendMessageInTransaction("Topic2",message,null);    }    @RocketMQTransactionListener    class Lis implements RocketMQLocalTransactionListener {        @Override        public RocketMQLocalTransactionState executeLocalTransaction(Message message, Object o) {            System.out.println("执行本地事务");            return RocketMQLocalTransactionState.UNKNOWN;        }        @Override        public RocketMQLocalTransactionState checkLocalTransaction(Message message) {            System.out.println("执行事务回查");            return RocketMQLocalTransactionState.COMMIT;        }    }}

消费者

package cn.tedu.demo2.m2;import org.apache.rocketmq.spring.annotation.RocketMQMessageListener;import org.apache.rocketmq.spring.core.RocketMQListener;import org.springframework.stereotype.Component;@Component@RocketMQMessageListener(topic = "Topic2",consumerGroup = "consumer-demo2")public class Consumer implements RocketMQListener<String> {    @Override    public void onMessage(String s) {        System.out.println("收到"+s);    }}

主类

package cn.tedu.demo2.m2;import org.springframework.boot.SpringApplication;import org.springframework.boot.autoconfigure.SpringBootApplication;@SpringBootApplicationpublic class Main {    public static void main(String[] args) {        SpringApplication.run(Main.class, args);    }}

测试类

package cn.tedu.demo2.m2;import org.junit.jupiter.api.Test;import org.springframework.beans.factory.annotation.Autowired;import org.springframework.boot.test.context.SpringBootTest;import org.springframework.test.context.ActiveProfiles;@SpringBootTest@ActiveProfiles("demo2")public class Test2 {    @Autowired    private  Producer producer;    @Test    public void  test1(){        producer.send();        //为了能够收到消费者消费的数据,这里通过休眠模拟等待时间        try {            Thread.sleep(30000);        } catch (InterruptedException e) {            e.printStackTrace();        }    }}

关于“Springboot如何整合RocketMQ收发消息”这篇文章就分享到这里了,希望以上内容可以对大家有一定的帮助,使各位可以学到更多知识,如果觉得文章不错,请把它分享出去让更多的人看到。

免责声明:

① 本站未注明“稿件来源”的信息均来自网络整理。其文字、图片和音视频稿件的所属权归原作者所有。本站收集整理出于非商业性的教育和科研之目的,并不意味着本站赞同其观点或证实其内容的真实性。仅作为临时的测试数据,供内部测试之用。本站并未授权任何人以任何方式主动获取本站任何信息。

② 本站未注明“稿件来源”的临时测试数据将在测试完成后最终做删除处理。有问题或投稿请发送至: 邮箱/279061341@qq.com QQ/279061341

Springboot如何整合RocketMQ收发消息

下载Word文档到电脑,方便收藏和打印~

下载Word文档

猜你喜欢

Springboot如何整合RocketMQ收发消息

这篇文章将为大家详细讲解有关Springboot如何整合RocketMQ收发消息,小编觉得挺实用的,因此分享给大家做个参考,希望大家阅读完这篇文章后可以有所收获。Springboot 整合 RocketMQ 收发消息创建springboot
2023-06-22

Springboot中RocketMQ怎么实现消息发送与接收

本文小编为大家详细介绍“Springboot中RocketMQ怎么实现消息发送与接收”,内容详细,步骤清晰,细节处理妥当,希望这篇“Springboot中RocketMQ怎么实现消息发送与接收”文章能帮助大家解决疑惑,下面跟着小编的思路慢慢
2023-07-02

spring整合JMS如何实现同步收发消息

这篇文章给大家分享的是有关spring整合JMS如何实现同步收发消息的内容。小编觉得挺实用的,因此分享给大家做个参考,一起跟随小编过来看看吧。1. 安装ActiveMQ注意:JDK版本需要1.7及以上才行到Apache官方网站下载最新的Ac
2023-05-30

如何进行SpringBoot+RabbitMQ方式收发消息

本篇文章给大家分享的是有关如何进行SpringBoot+RabbitMQ方式收发消息,小编觉得挺实用的,因此分享给大家学习,希望大家阅读完这篇文章后可以有所收获,话不多说,跟着小编一起来看看吧。本篇会和SpringBoot做整合,采用自动配
2023-06-16

SpringBoot如何实现MQTT消息发送和接收

今天小编给大家分享一下SpringBoot如何实现MQTT消息发送和接收的相关知识点,内容详细,逻辑清晰,相信大部分人都还太了解这方面的知识,所以分享这篇文章给大家参考一下,希望大家阅读完这篇文章后有所收获,下面我们一起来了解一下吧。Spr
2023-07-05

如何解决SpringBoot整合RocketMQ遇到的问题

本篇内容主要讲解“如何解决SpringBoot整合RocketMQ遇到的问题”,感兴趣的朋友不妨来看看。本文介绍的方法操作简单快捷,实用性强。下面就让小编来带大家学习“如何解决SpringBoot整合RocketMQ遇到的问题”吧!应用场景
2023-06-20

springboot整合rocketmq如何实现分布式事务

这篇文章给大家分享的是有关springboot整合rocketmq如何实现分布式事务的内容。小编觉得挺实用的,因此分享给大家做个参考,一起跟随小编过来看看吧。1 执行流程(1) 发送方向 MQ 服务端发送消息。(2) MQ Server 将
2023-06-15

springboot整合企微webhook机器人发送消息提醒

这篇文章主要为大家介绍了springboot整合企微webhook机器人发送消息提醒,有需要的朋友可以借鉴参考下,希望能够有所帮助,祝大家多多进步,早日升职加薪
2022-12-19

SpringBoot怎么整合WebSocket实现后端向前端发送消息

这篇文章主要讲解了“SpringBoot怎么整合WebSocket实现后端向前端发送消息”,文中的讲解内容简单清晰,易于学习与理解,下面请大家跟着小编的思路慢慢深入,一起来研究和学习“SpringBoot怎么整合WebSocket实现后端向
2023-07-05

SpringBoot整合Redis实现消息发布与订阅的示例代码

能实现发送与接收信息的中间介有很多,比如:RocketMQ、RabbitMQ、ActiveMQ、Kafka等,本文主要介绍了Redis的推送与订阅功能并集成SpringBoot的实现,感兴趣的可以了解一下
2022-11-13

SpringBoot整合WebSocket实现后端向前端发送消息的实例代码

WebSocket使得客户端和服务器之间的数据交换变得更加简单,允许服务端主动向客户端推送数据,下面这篇文章主要给大家介绍了关于SpringBoot整合WebSocket实现后端向前端发送消息的相关资料,需要的朋友可以参考下
2023-03-06

springboot如何整合邮件发送功能

这篇文章给大家介绍springboot如何整合邮件发送功能,内容非常详细,感兴趣的小伙伴们可以参考借鉴,希望对大家能有所帮助。 pom依赖 org.springframework.bo
2023-06-22

SpringBoot整合RedisTemplate如何实现缓存信息监控

这篇文章给大家分享的是有关SpringBoot整合RedisTemplate如何实现缓存信息监控的内容。小编觉得挺实用的,因此分享给大家做个参考,一起跟随小编过来看看吧。SpringBoot 整合 Redis 数据库实现数据缓存的本质是整合
2023-06-28

C#使用udp如何实现消息的接收和发送

这篇文章主要介绍了C#使用udp如何实现消息的接收和发送问题,具有很好的参考价值,希望对大家有所帮助。如有错误或未考虑完全的地方,望不吝赐教
2023-02-26

编程热搜

  • Python 学习之路 - Python
    一、安装Python34Windows在Python官网(https://www.python.org/downloads/)下载安装包并安装。Python的默认安装路径是:C:\Python34配置环境变量:【右键计算机】--》【属性】-
    Python 学习之路 - Python
  • chatgpt的中文全称是什么
    chatgpt的中文全称是生成型预训练变换模型。ChatGPT是什么ChatGPT是美国人工智能研究实验室OpenAI开发的一种全新聊天机器人模型,它能够通过学习和理解人类的语言来进行对话,还能根据聊天的上下文进行互动,并协助人类完成一系列
    chatgpt的中文全称是什么
  • C/C++中extern函数使用详解
  • C/C++可变参数的使用
    可变参数的使用方法远远不止以下几种,不过在C,C++中使用可变参数时要小心,在使用printf()等函数时传入的参数个数一定不能比前面的格式化字符串中的’%’符号个数少,否则会产生访问越界,运气不好的话还会导致程序崩溃
    C/C++可变参数的使用
  • css样式文件该放在哪里
  • php中数组下标必须是连续的吗
  • Python 3 教程
    Python 3 教程 Python 的 3.0 版本,常被称为 Python 3000,或简称 Py3k。相对于 Python 的早期版本,这是一个较大的升级。为了不带入过多的累赘,Python 3.0 在设计的时候没有考虑向下兼容。 Python
    Python 3 教程
  • Python pip包管理
    一、前言    在Python中, 安装第三方模块是通过 setuptools 这个工具完成的。 Python有两个封装了 setuptools的包管理工具: easy_install  和  pip , 目前官方推荐使用 pip。    
    Python pip包管理
  • ubuntu如何重新编译内核
  • 改善Java代码之慎用java动态编译

目录