我的编程空间,编程开发者的网络收藏夹
学习永远不晚

SpringcloudStream消息驱动工具使用介绍

短信预约 -IT技能 免费直播动态提醒
省份

北京

  • 北京
  • 上海
  • 天津
  • 重庆
  • 河北
  • 山东
  • 辽宁
  • 黑龙江
  • 吉林
  • 甘肃
  • 青海
  • 河南
  • 江苏
  • 湖北
  • 湖南
  • 江西
  • 浙江
  • 广东
  • 云南
  • 福建
  • 海南
  • 山西
  • 四川
  • 陕西
  • 贵州
  • 安徽
  • 广西
  • 内蒙
  • 西藏
  • 新疆
  • 宁夏
  • 兵团
手机号立即预约

请填写图片验证码后获取短信验证码

看不清楚,换张图片

免费获取短信验证码

SpringcloudStream消息驱动工具使用介绍

springcloud Stream

什么是springcloud Stream

  现在市面上有很多的消息中间件,每一个公司使用的都有所不同,为了减少学习的成本,springcloud Stream可以让我们不再关注消息中间件MQ的具体细节,我们只需要通过适配绑定的方式即可实现不同MQ之间的切换,但是遗憾的是springcloud Stream目前只支持RabbitMQ和Kafka。

  SpringCloud Stream是一个构建消息驱动微服务的框架,应用程序通过inputs或者 outputs来与SpringCloud Stream中的binder进行交互,我们可以通过配置来binding ,而 SpringCloud Stream 的binder负责与中间件交互,所以我们只需要搞清楚如何与Stream交互就可以很方便的使用消息驱动了!

什么是Binder

  Binder是SpringCloud Stream的一个抽象概念,是应用与消息中间件之间的粘合剂,通过定义绑定器Binder作为中间层,实现了应用程序与消息中间件细节之间的隔离,可以动态的改变消息的destinations(对应于 Kafka的topic,RabbitMQ的exchanges),这些都可以通过外部配置项来做到,甚至可以任意的改变中间件的类型但是不需要修改一行代码

为什么使用Stream

  比方说我们用到了RabbitMQ和Kafka,由于这两个消息中间件的架构上的不同像RabbitMQ有exchange,kafka有Topic和Partitions分区,这些中间件的差异性导致我们实际项目开发给我们造成了一定的困扰,我们如果用了两个消息队列的其中一种,后面的业务需求,我想往另外一种消息队列进行迁移;这时候无疑就是一个灾难性的,一大堆东西都要重新推倒重新做,因为它跟我们的系统耦合了,这衬候springcloud Stream给我们提供了一种解耦合的方式。

Stream使用案例

前置知识

Stream处理消息的架构

  Source、Sink: 简单的可理解为参照对象是Spring Cloud Stream自身,从Stream发布消息就是输出,接受消息就是输入。Channel: 通道,是队列Queue的一种抽象,在消息通讯系统中就是实现存储和转发的媒介。Binder: 消息的生产者和消费者中间层,实现了应用程序与消息中间件细节之间的隔离

  通过以上两张图片可知,消息的处理流向是:消息生产者处理完业务逻辑之后消息到达source中,接着前往Channel通道进行排队,然后通过binder绑定器将消息数据发送到底层mq,然后又通过binder绑定器接收到底层mq发送来的消息数据,接着前往Channel通道进行排队,由Sink接收到消息数据,消息消费者拿到消息数据执行相应的业务逻辑

Stream常用注解

消息生产者8801模块搭建

  第一步: 创建一个maven模块,引入相关依赖,最主要的就是stream整合rabbitmq的依赖

<!--stream的rabbitmq依赖-->
<dependency>
    <groupId>org.springframework.cloud</groupId>
    <artifactId>spring-cloud-starter-stream-rabbit</artifactId>
</dependency>

  第二步: 配置文件的编写

server:
  port: 8801

spring:
  application:
    name: cloud-stream-provider
  cloud:
    stream:
      binders: # 在此处配置要绑定的rabbitmq的服务信息;
        defaultRabbit: # 表示定义的名称,用于于binding整合
          type: rabbit # 消息组件类型
          environment: # 设置rabbitmq的相关的环境配置
            spring:
              rabbitmq:
                host: localhost
                port: 5672
                username: guest
                password: guest
      bindings: # 服务的整合处理
        output: # 这个名字是一个通道的名称
          destination: studyExchange # 表示要使用的Exchange名称定义
          content-type: application/json # 设置消息类型,本次为json,文本则设置“text/plain”
          binder: defaultRabbit  # 设置要绑定的消息服务的具体设置

eureka:
  client: # 客户端进行Eureka注册的配置
    service-url:
      defaultZone: http://localhost:7001/eureka

  第三步: 主程序类

@SpringBootApplication
public class CloudStreamRabbitmqProvider8801Application {
    public static void main(String[] args) {
        SpringApplication.run(CloudStreamRabbitmqProvider8801Application.class, args);
        System.out.println("启动成功");
    }
}

  第四步: 业务层service代码编写,注意:这里实现类注入的对象由之前的dao层对象换成了channel通道对象,详细的发送由实现类的第12完成

public interface IMessageProviderService {
    
    String send();
}
@EnableBinding(Source.class)
public class MessageProviderServiceImpl implements IMessageProviderService {
    
    @Resource
    private MessageChannel output;
    @Override
    public String send() {
        String serial = UUID.randomUUID().toString();
        output.send(MessageBuilder.withPayload(serial).build());

        System.out.println("*****serial: " + serial);
        return serial;
    }
}

  第五步: controller接口

@RestController
public class SendMessageController {
    @Resource
    private IMessageProviderService messageProviderService;
    @GetMapping(value = "/sendMessage")
    public String sendMessage() {
        return messageProviderService.send();
    }
}

消息消费者8802模块搭建

  第一步: 创建一个maven模块,引入相关依赖,最主要的就是stream整合rabbitmq的依赖

<!--stream的rabbitmq依赖-->
<dependency>
    <groupId>org.springframework.cloud</groupId>
    <artifactId>spring-cloud-starter-stream-rabbit</artifactId>
</dependency>

  第二步: 配置文件的编写,与生产者的区别就在于bindings下的是input而不是output

server:
  port: 8802

spring:
  application:
    name: cloud-stream-consumer
  cloud:
    stream:
      binders: # 在此处配置要绑定的rabbitmq的服务信息;
        defaultRabbit: # 表示定义的名称,用于于binding整合
          type: rabbit # 消息组件类型
          environment: # 设置rabbitmq的相关的环境配置
            spring:
              rabbitmq:
                host: localhost
                port: 5672
                username: guest
                password: guest
      bindings: # 服务的整合处理
        input: # 这个名字是一个通道的名称
          destination: studyExchange # 表示要使用的Exchange名称定义
          content-type: application/json # 设置消息类型,本次为json,文本则设置“text/plain”
          binder: defaultRabbit  # 设置要绑定的消息服务的具体设置
eureka:
  client: # 客户端进行Eureka注册的配置
    service-url:
      defaultZone: http://localhost:7001/eureka

  第三步: 主程序类

@SpringBootApplication
public class CloudStreamRabbitmqConsumer8802Application {
    public static void main(String[] args) {
        SpringApplication.run(CloudStreamRabbitmqConsumer8802Application.class, args);
        System.out.println("启动成功");
    }
}

  第四步: controller接口,使用url请求生产者8801,即可在消费者8802端接收到8801发送的消息

@Component
@EnableBinding(Sink.class)
public class ReceiveMessageListener {
    @Value("${server.port}")
    private String serverPort;
    @StreamListener(Sink.INPUT)
    public void input(Message<String> message) {
        System.out.println("消费者1号 ----> port:" + serverPort + "\t从8801接受到的消息是:" + message.getPayload());
    }
}

  两个模块搭建完成进行测试,首先启动注册中心7001,然后分别启动消息生产者8801和消息消费者8802,通过url请求访问8001的发送消息请求,会向指定管道中发送一条消息,如果此时这个管道中有消费者即可接收到这条消息。而如何指定消息的管道归属呢,就是通过配置文件中的indings.input.destination来指定,命名相同的服务就会处在同一条管道中

Stream带来的问题

重复消费问题

  按照之前的使用,会带来重复消费问题: 也就是说一个通道上有不止一个消息消费者,stream上默认每一个消费者都属于不同的组,这样的话就会导致这个消息被多个组的消费者重复消费

  知道了问题出现的原因就很容易解决了,只要我们自定义配置分组,将这些消费者都分配到同一个组中就能避免重复消费的问题出现了(同一个组间的消费者是竞争关系,不管组间有多少的消费者都只会消费一次)

自定义分组

  只需要在配置文件修改一处配置即可实现自定义组名并且自定义分组,组名相同的服务会被分配到同一组,通道内的消息数据会被该组中的所有消费者轮询消费

持久化问题

  上面自定义分组使用的group配置除了可以自定义分组和分组名之外,还可以实现消息的持久化,也就是说使用group配置自定义分组和分组名的消息消费者,就算在消息生产者发送消息的时候挂掉了,等这个消费者重启之后依然是能够消费之前发送的消息

这里一个生产者和两个消费者存在以下十三种情况(生产者发送四次消息):

1、都使用group分组的两个不同组成员,在生产者生产的时候

  • 都没挂(各消费四次)
  • 挂了其中一个(各消费四次)
  • 都挂了(各消费四次)

2、都使用group分组的两个同组成员,在生产者生产的时候

  • 都没挂(各消费两次)
  • 挂了其中一个(没挂的把四次消费完)
  • 都挂了(各消费两次)

3、其中一个使用group分组的两个成员,在生产者生产的时候

  • 都挂了(都不消费)
  • group的挂了(各消费四次)
  • 没group的挂了(没挂的消费四次,挂的由于没有持久化所以不消费)
  • 都没挂(各消费四次)

4、都不使用group分组的两个成员,在生产者生产的时候

  • 都挂了(都不消费)
  • 挂了其中一个(没挂的消费四次,挂的由于没有持久化所以不消费)
  • 都没挂(各消费四次)

  总之一句话,通道里的消息会持久化给使用group配置的消息消费者(每一组都有一份),就算发送消息的时候这些消费者挂了,如果同组的消费者有没挂的就会把这些消息竞争消费完;如果同组没有消费者,等他重启之后还是会消费这些消息

到此这篇关于Springcloud Stream消息驱动工具使用介绍的文章就介绍到这了,更多相关Springcloud Stream内容请搜索编程网以前的文章或继续浏览下面的相关文章希望大家以后多多支持编程网!

免责声明:

① 本站未注明“稿件来源”的信息均来自网络整理。其文字、图片和音视频稿件的所属权归原作者所有。本站收集整理出于非商业性的教育和科研之目的,并不意味着本站赞同其观点或证实其内容的真实性。仅作为临时的测试数据,供内部测试之用。本站并未授权任何人以任何方式主动获取本站任何信息。

② 本站未注明“稿件来源”的临时测试数据将在测试完成后最终做删除处理。有问题或投稿请发送至: 邮箱/279061341@qq.com QQ/279061341

SpringcloudStream消息驱动工具使用介绍

下载Word文档到电脑,方便收藏和打印~

下载Word文档

猜你喜欢

BeanUtils工具类的介绍和使用

BeanUtils是Apache Commons BeanUtils库中的一个工具类,用于简化JavaBean之间的属性复制。它提供了一组静态方法,可以实现源对象的属性值复制到目标对象中,而不需要手动编写大量的复制代码。使用BeanUtil
2023-09-21

PostgreSQL工具pgAdmin的介绍及使用

目录1. pgAdmin的介绍2. VQhKYwpgAdmin的使用1. pgAdmin的介绍pgAdmin 4是一款专门针对PostgreSQL数据库编程的客户端管理软件,该版本在pgAdmin 3的基础上做了较大的架构变化,由之前py
2022-07-14

JDK14性能管理工具:jstat使用介绍

今天我们的系列文章要介绍的是这四个工具:Jstat(sun.tools.jstat) 它的全称是Java Virtual Machine Statistics Monitoring Tool,是用来监控JVM状态的工具。jstack(sun
2023-06-03

如何分析工具perf的介绍与使用

这期内容当中小编将会给大家带来有关如何分析工具perf的介绍与使用,文章内容丰富且以专业的角度为大家分析和叙述,阅读完这篇文章希望大家可以有所收获。测试环境:Ubuntu16.04 + Kernel:4.4.0-31apt-get inst
2023-06-05

SpringCloudNetfilxRibbon负载均衡工具使用方法介绍

Ribbon是Netflix的组件之一,负责注册中心的负载均衡,有助于控制HTTP和TCP客户端行为。SpringCloudNetflixRibbon一般配合Ribbon进行使用,利用在Eureka中读取的服务信息,在调用服务节点时合理进行负载
2022-12-12

如何用好PS中的钢笔工具 Photoshop钢笔工具的使用介绍

很多学习初学potoshop的朋友都对【钢笔工具】又爱又恨。它的功能十分强大,甚至达到了BT的地步。它不仅可以用来抠图,还可以描绘出幻化多端的各种线条,美轮美奂;更厉害的是在ps中设计出来的线条还可以应用到3dmax等其它软件中使用。但很多
2022-06-04

windows8消费者预览版中高级启动使用介绍

适用范围: Windows 8 消费者预览版 操作步骤: 1、系统下按“Windows+C”键打开应用Charm菜单 ,选择“设置”。 参见下图2、在右侧弹出菜单中选择“更多电脑设
2022-06-04

win7系统右击桌面快捷小工具使用介绍

在Win7系统中,右击桌面可以访问快捷小工具,这些小工具可以帮助你快速访问一些常用的功能或者进行一些操作。以下是几个常用的快捷小工具及其使用介绍:1. 个性化:通过该小工具,你可以快速打开个性化设置,以便更改桌面背景、屏幕保护程序、窗口颜色
2023-09-02

AI 代码生成工具 Cursor 安装和使用介绍大全

1: 概述 嘿,小伙伴们,听说你们要了解一款基于 AI 技术的代码生成工具 Cursor,让我们来一起看看如何安装和使用吧! Cursor 是一款基于 AI 技术的代码生成工具,它可以帮助开发人员自动生成代码,提高开发效率。下面是安装和使用
2023-08-18

将URL地址都变成ooooooooo的神奇小工具使用介绍

这篇文章主要为大家介绍了将URL地址都变成ooooooooo的神奇小工具使用介绍,有需要的朋友可以借鉴参考下,希望能够有所帮助,祝大家多多进步,早日升职加薪
2023-05-17

C/C++程序链接与反汇编工具objdump的使用介绍

这篇文章主要介绍了C/C++程序链接与反汇编工具objdump的使用,程序构建过程的第二个阶段就是链接,链接过程输入的是目标文件的集合。每个目标文件可以被看作单个源代码文件的二进制存储版本
2023-02-03

编程热搜

  • Python 学习之路 - Python
    一、安装Python34Windows在Python官网(https://www.python.org/downloads/)下载安装包并安装。Python的默认安装路径是:C:\Python34配置环境变量:【右键计算机】--》【属性】-
    Python 学习之路 - Python
  • chatgpt的中文全称是什么
    chatgpt的中文全称是生成型预训练变换模型。ChatGPT是什么ChatGPT是美国人工智能研究实验室OpenAI开发的一种全新聊天机器人模型,它能够通过学习和理解人类的语言来进行对话,还能根据聊天的上下文进行互动,并协助人类完成一系列
    chatgpt的中文全称是什么
  • C/C++中extern函数使用详解
  • C/C++可变参数的使用
    可变参数的使用方法远远不止以下几种,不过在C,C++中使用可变参数时要小心,在使用printf()等函数时传入的参数个数一定不能比前面的格式化字符串中的’%’符号个数少,否则会产生访问越界,运气不好的话还会导致程序崩溃
    C/C++可变参数的使用
  • css样式文件该放在哪里
  • php中数组下标必须是连续的吗
  • Python 3 教程
    Python 3 教程 Python 的 3.0 版本,常被称为 Python 3000,或简称 Py3k。相对于 Python 的早期版本,这是一个较大的升级。为了不带入过多的累赘,Python 3.0 在设计的时候没有考虑向下兼容。 Python
    Python 3 教程
  • Python pip包管理
    一、前言    在Python中, 安装第三方模块是通过 setuptools 这个工具完成的。 Python有两个封装了 setuptools的包管理工具: easy_install  和  pip , 目前官方推荐使用 pip。    
    Python pip包管理
  • ubuntu如何重新编译内核
  • 改善Java代码之慎用java动态编译

目录