我的编程空间,编程开发者的网络收藏夹
学习永远不晚

SpringCloudStream高级特性使用详解

短信预约 -IT技能 免费直播动态提醒
省份

北京

  • 北京
  • 上海
  • 天津
  • 重庆
  • 河北
  • 山东
  • 辽宁
  • 黑龙江
  • 吉林
  • 甘肃
  • 青海
  • 河南
  • 江苏
  • 湖北
  • 湖南
  • 江西
  • 浙江
  • 广东
  • 云南
  • 福建
  • 海南
  • 山西
  • 四川
  • 陕西
  • 贵州
  • 安徽
  • 广西
  • 内蒙
  • 西藏
  • 新疆
  • 宁夏
  • 兵团
手机号立即预约

请填写图片验证码后获取短信验证码

看不清楚,换张图片

免费获取短信验证码

SpringCloudStream高级特性使用详解

重试

Consumer端可以配置重试次数,当消息消费失败的时候会进行重试。

底层使用Spring Retry去重试,重试次数可自定义配置。

# 默认重试次数为3,配置大于1时才会生效
spring.cloud.stream.bindings.<channelName>.consumer.maxAttempte=3 

消息发送失败的处理

Producer发送消息出错的情况下,可以配置错误处理,将错误信息发送给对应ID的MessageChannel

  • 消息发送失败的场景下,会将消息发送到一个MessageChannel。这个MessageChannel会取ApplicationContext中name为topic.errorstopic就是配置的destination)的Bean。
  • 如果找不到就会自动构建一个PublishSubscribeChannel
  • 然后使用BridgeHandler订阅这个MessageChannel,同时再设置ApplicationContext中name为errorChannelPublishSubscribeChannel消息通道为BridgeHandleroutputChannel
    public static final String ERROR_CHANNEL_BEAN_NAME = "errorChannel"
    private SubscribableChannel registerErrorInfrastructure(
            ProducerDestination destination) {
        // destination.getName() + ".errors"
        String errorChannelName = errorsBaseName(destination);
        SubscribableChannel errorChannel;
        if (getApplicationContext().containsBean(errorChannelName)) {
            Object errorChannelObject = getApplicationContext().getBean(errorChannelName);
            if (!(errorChannelObject instanceof SubscribableChannel)) {
                throw new IllegalStateException("Error channel '" + errorChannelName
                        + "' must be a SubscribableChannel");
            }
            errorChannel = (SubscribableChannel) errorChannelObject;
        }
        else {
            errorChannel = new PublishSubscribeChannel();
            ((GenericApplicationContext) getApplicationContext()).registerBean(
                    errorChannelName, SubscribableChannel.class, () -> errorChannel);
        }
        MessageChannel defaultErrorChannel = null;
        if (getApplicationContext()
                .containsBean(IntegrationContextUtils.ERROR_CHANNEL_BEAN_NAME)) {
            defaultErrorChannel = getApplicationContext().getBean(
                    IntegrationContextUtils.ERROR_CHANNEL_BEAN_NAME,
                    MessageChannel.class);
        }
        if (defaultErrorChannel != null) {
            BridgeHandler errorBridge = new BridgeHandler();
            errorBridge.setOutputChannel(defaultErrorChannel);
            errorChannel.subscribe(errorBridge);
            String errorBridgeHandlerName = getErrorBridgeName(destination);
            ((GenericApplicationContext) getApplicationContext()).registerBean(
                    errorBridgeHandlerName, BridgeHandler.class, () -> errorBridge);
        }
        return errorChannel;
    }
  • 示例代码
spring.cloud.stream.bindings.output.destination=test-output
# 消息发送失败的处理逻辑默认是关闭的
spring.cloud.stream.bindings.output.producer.errorChannelEnabled=true
    @Bean("test-output.errors")
    MessageChannel testOutputErrorChannel() {
        return new PublishSubscribeChannel();
    }
    @Service
    class ErrorProduceService {
        @ServiceActivator(inputChannel = "test-output.errors")
        public void receiveProduceError(Message receiveMsg) {
            System.out.println("receive error msg: " + receiveMsg);
        }
    }

消费错误处理

Consumer消费消息出错的情况下,可以配置错误处理,将错误信息发给对应ID的MessageChannel

消息错误处理与生产错误处理大致相同。错误的MessageChannel对应的name为topic.group.errors,还会加上多个MessageHandler订阅的一些判断,使用ErrorMessageStrategy创建错误消息等内容。

  • 示例代码
spring.cloud.stream.bindings.input.destination=test-input
spring.cloud.stream.bindings.input.group=test-input-group
@StreamListener(Sink.INPUT)
public void receive(String receiveMsg) {
    throw new RuntimeException("Oops");
}
@ServiceActivator(inputChannel = "test-input.test-input-group.errors")
public void receiveConsumeError(Message receiveMsg) {
    System.out.println("receive error msg: " + receiveMsg);
}

建议直接使用topic.group.errors这个消息通道,并设置发送到单播模式的DirectChannel消息通道中(使用@ServiceActivator注解接收会直接构成DirectChannel),这样会确保只会被唯一的一个订阅了topic.group.errorsMessageHandler处理,否则可能会被多个MessageHandler处理,导致出现一些意想不到的结果。

自定义MessageHandler类型

默认情况下,Output Binding对应的MessageChannelInput Binding对应的SubscribeChannel会被构造成DirectChannel

SCS提供了BindingTargetFactory接口进行扩展,比如可以扩展构造PublishSubscribeChannel这种广播类型的MessageChannel

BindingTargetFactory接口只有两个实现类

  • SubscribableChannelBindingTargetFactory:针对Input BindingOutput Binding都会构造成DirectWithAttributesChannel类型的MessageChannel(一种带有HashMap属性的DirectChannel)。
  • MessageSourceBindingTargetFactory:不支持Output BindingInput Binding会构造成DefaultPollableMessageSourceDefaultPollableMessageSource内部维护着MessageSource属性,该属性用于拉取消息。

Endpoint端点

SCS提供了BindingsEndpoint,可以获取Binding信息或对Binding生命周期进行修改,比如startstoppauseresume

BindingsEndpoint的ID是bindings,对外暴露了一下3个操作:

  • 修改Binding状态,可以改成STARTEDSTOPPEDPAUSEDRESUMED,对应Binding接口的4个操作。
  • 查询单个Binding的状态信息。
  • 查询所有Binding的状态信息。
@Endpoint(id = "bindings")
public class BindingsEndpoint {
  ...
  @WriteOperation
    public void changeState(@Selector String name, State state) {
        Binding<?> binding = BindingsEndpoint.this.locateBinding(name);
        if (binding != null) {
            switch (state) {
            case STARTED:
                binding.start();
                break;
            case STOPPED:
                binding.stop();
                break;
            case PAUSED:
                binding.pause();
                break;
            case RESUMED:
                binding.resume();
                break;
            default:
                break;
            }
        }
    }
    @ReadOperation
    public List<?> queryStates() {
        List<Binding<?>> bindings = new ArrayList<>(gatherInputBindings());
        bindings.addAll(gatherOutputBindings());
        return this.objectMapper.convertValue(bindings, List.class);
    }
    @ReadOperation
    public Binding<?> queryState(@Selector String name) {
        Assert.notNull(name, "'name' must not be null");
        return this.locateBinding(name);
    }
  ...
}

Metrics指标

该功能自动与micrometer集成进行Metrics统计,可以通过前缀spring.cloud.stream.metrics进行相关配置,配置项spring.cloud.stream.bindings.applicationMetrics.destination会构造MetersPublisherBinding,将相关的metrics发送到MQ中。

Serverless

默认与Spring Cloud Function集成。

可以使用Function处理消息。配置文件需要加上function配置。

spring.cloud.stream.function.definition=uppercase | addprefix

  @Bean
  public Function<String, String> uppercase() {
      return x -> x.toUpperCase();
  }
  @Bean
  public Function<String, String> addprefix() {
      return x -> "prefix-" + x;
  }

Partition统一

SCS统一Partition相关的设置,可以屏蔽不同MQ Partition的设置。

Producer Binding提供的ProducerProperties提供了一些Partition相关的配置:

  • partitionKeyExpression:partition key提取表达式。
  • partitionKeyExtractorName:是一个实现PartitionKeyExtractorStrategy接口的Bean name。PartitionKeyExtractorStrategy是一个根据Message获取partition key的接口。如果两者都配置,优先级高于partitionKeyExtractorName
  • partitionSelectorName:是一个实现PartitionSelectorStrategy接口的Bean name。PartitionSelectorStrategy是一个根据partition key决定选择哪个partition 的接口。
  • partitionSelectorExpression:partition 选择表达式,会根据表达式和partition key得到最终的partition。如果两者都配置,优先partitionSelectorExpression表达式解析partition。
  • partitionCount:partition 个数。该属性不一定会生效,Kafka Binder 和RocketMQ Binder会使用topic上的partition 个数覆盖该属性。
public final class PartitioningInterceptor implements ChannelInterceptor {
      ...
      @Override
      public Message<?> preSend(Message<?> message, MessageChannel channel) {
      if (!message.getHeaders().containsKey(BinderHeaders.PARTITION_OVERRIDE)) {
        int partition = this.partitionHandler.determinePartition(message);
        return MessageConverterConfigurer.this.messageBuilderFactory
          .fromMessage(message)
          .setHeader(BinderHeaders.PARTITION_HEADER, partition).build();
      }
      else {
        return MessageConverterConfigurer.this.messageBuilderFactory
          .fromMessage(message)
          .setHeader(BinderHeaders.PARTITION_HEADER,
                     message.getHeaders()
                     .get(BinderHeaders.PARTITION_OVERRIDE))
          .removeHeader(BinderHeaders.PARTITION_OVERRIDE).build();
      }
    }
}
public class PartitionHandler {
      ...
      public int determinePartition(Message<?> message) {
        Object key = extractKey(message);
        int partition;
        if (this.producerProperties.getPartitionSelectorExpression() != null) {
            partition = this.producerProperties.getPartitionSelectorExpression()
                    .getValue(this.evaluationContext, key, Integer.class);
        }
        else {
            partition = this.partitionSelectorStrategy.selectPartition(key,
                    this.partitionCount);
        }
        // protection in case a user selector returns a negative.
        return Math.abs(partition % this.partitionCount);
    }
    private Object extractKey(Message<?> message) {
        Object key = invokeKeyExtractor(message);
        if (key == null && this.producerProperties.getPartitionKeyExpression() != null) {
            key = this.producerProperties.getPartitionKeyExpression()
                    .getValue(this.evaluationContext, message);
        }
        Assert.notNull(key, "Partition key cannot be null");
        return key;
    }
      ...
}

Polling Consumer

实现MessageSource进行polling操作的Consumer

普通的Pub/Sub模式需要定义SubscribeableChannel类型的返回值,Polling Consumer需要定义PollableMessageSource类型的返回值。

public interface PollableSink {
    
    String INPUT = "input";
    
    @Input(Sink.INPUT)
    PollableMessageSource input();
}

支持多个Binder同时使用

支持多个Binder同时使用,在配置Binding的时候需要指定对应的Binder

配置全局默认的Binderspring.cloud.stream.default-binder=rocketmq

配置各个Binder内部的配置信息:

spring.cloud.stream.binders.rocketmq.environment.<xx>=xx

spring.cloud.stream.binders.rocketmq.type=rocketmq

配置Binding对应的Binder

spring.cloud.stream.bindings.<channelName>.binder=kafka

spring.cloud.stream.bindings.<channelName>.binder=rocketmq

spring.cloud.stream.bindings.<channelName>.binder=rabbit

建立事件机制

比如,新建BindingCreateEvent事件,用户的应用就可以监听该事件在创建Input BindingOutput Binding 时做业务相关的处理。

以上就是Spring Cloud Stream 高级特性使用详解的详细内容,更多关于Spring Cloud Stream 高级特性的资料请关注编程网其它相关文章!

免责声明:

① 本站未注明“稿件来源”的信息均来自网络整理。其文字、图片和音视频稿件的所属权归原作者所有。本站收集整理出于非商业性的教育和科研之目的,并不意味着本站赞同其观点或证实其内容的真实性。仅作为临时的测试数据,供内部测试之用。本站并未授权任何人以任何方式主动获取本站任何信息。

② 本站未注明“稿件来源”的临时测试数据将在测试完成后最终做删除处理。有问题或投稿请发送至: 邮箱/279061341@qq.com QQ/279061341

SpringCloudStream高级特性使用详解

下载Word文档到电脑,方便收藏和打印~

下载Word文档

猜你喜欢

JavaScript高级 ES7-ES13 新特性详解

这篇文章主要介绍了JavaScript高级 ES7-ES13 新特性详解,本文结合实例代码给大家讲解的非常详细,对大家的学习或工作具有一定的参考借鉴价值,需要的朋友可以参考下
2023-02-06

详解高性能mysql之MySQL高级特性总结

MySQL是一款广泛使用的关系型数据库管理系统,具有高性能和高可靠性的特点。在高性能MySQL中,有一些高级特性可以帮助提升数据库的性能和可靠性。下面是对这些高级特性的详细解释。1. 主从复制(Master-Slave Replicatio
2023-09-22

PHP高级特性如何使用

这篇文章主要介绍“PHP高级特性如何使用”,在日常操作中,相信很多人在PHP高级特性如何使用问题上存在疑惑,小编查阅了各式资料,整理出简单好用的操作方法,希望对大家解答”PHP高级特性如何使用”的疑惑有所帮助!接下来,请跟着小编一起来学习吧
2023-06-22

React高级特性Context万字详细解读

React的context就是一个全局变量,可以从根组件跨级别在React的组件中传递。Reactcontext的API有两个版本,React16.x之前的是老版本的context,之后的是新版本的context
2022-11-16

JavaRabbitMQ高级特性详细分析

为了保证消息的可靠性传输,包括投递消息的生产方能投递成功,和消息消费的消费方正确消费,RabbitMQ提供了两个确认机制,由于消息按照流通的顺序从左到右,因此为保证可靠性,MQ必须对Producer进行确认,Consumer必须对MQ进行确认
2022-11-13

PHP面向对象编程:高级特性详解

php 的 oop 高级特性包括:接口:定义方法,确保不同类具有相似行为。多态性:子类对象实现父类方法,提供灵活性。命名空间:组织代码,避免命名冲突。特性:复用代码,无需继承即可添加方法和属性。魔法方法:允许对象与语言特定方式交互。PHP
PHP面向对象编程:高级特性详解
2024-05-10

Go高级特性之并发处理http详解

本文深入讲解Go语言高级特性之并发处理HTTP,介绍了Goroutine、channel、Context、Middleware、服务发现和负载均衡等关键特性。通过具体的实现步骤和示例代码,阐述了如何使用Go编写并发高效的HTTP服务。文中还提供了最佳实践,指导开发者编写健壮可靠的HTTP服务。
Go高级特性之并发处理http详解
2024-04-02

PHP高级特性:使用Redis实现高速缓存

在 php 中使用 redis 实现高速缓存,可显著提升应用程序性能和可扩展性。首先安装 redis,其次使用 predis 库连接到 redis。可通过 set 方法设置缓存,使用 get 方法获取缓存。实战案例演示如何设置和获取缓存项,
PHP高级特性:使用Redis实现高速缓存
2024-05-15

Vue高级特性概念原理详细分析

这篇文章主要介绍了Vue高级特性概念原理,文中通过示例代码介绍的非常详细,对大家的学习或者工作具有一定的参考学习价值,需要的朋友们下面随着小编来一起学习吧
2023-03-03

PHP高级特性:使用Traits巧妙地复用代码

php 中的 traits 是一种特性,它允许代码复用,无需继承。主要优势包括:代码复用:在不同类之间共享代码,减少重复。灵活性:可以随时添加到类中,而无需重写或扩展类。避免多重继承:提供代码复用的替代方法,消除多重继承的复杂性和风险。PH
PHP高级特性:使用Traits巧妙地复用代码
2024-05-15

PHP高级特性:揭秘注解的力量

php注释中引入了注解,使注释更强大、更有用。注解语法以@符号后跟注解名称作为前缀,主要类型包括:类型注解:指定变量和函数参数的数据类型,有助于验证程序的正确性。类型提示:与类型注解类似,但不是php语法的一部分,用于ide和静态分析工具。
PHP高级特性:揭秘注解的力量
2024-05-14

MySQL高级特性——数据表分区的概念及机制详解

目录分区机制SELECT 查询INSERT 操作DELETE 操作UPDATE 操作分区的类型MySQL 的分区的实现方式是对数据表进行一层包装,这意味着索引实际是基于每个分区定义的,而不是整张表。这个特性和 Oracle 是不同的,在 O
2022-05-16

全面详解Maven打包及其相关插件和高级特性

这篇文章主要为大家介绍了Maven打包及其相关插件和高级特性的全面详解,有需要的朋友可以借鉴参考下,希望能够有所帮助,祝大家多多进步,早日升职加薪
2023-05-19

编程热搜

  • Python 学习之路 - Python
    一、安装Python34Windows在Python官网(https://www.python.org/downloads/)下载安装包并安装。Python的默认安装路径是:C:\Python34配置环境变量:【右键计算机】--》【属性】-
    Python 学习之路 - Python
  • chatgpt的中文全称是什么
    chatgpt的中文全称是生成型预训练变换模型。ChatGPT是什么ChatGPT是美国人工智能研究实验室OpenAI开发的一种全新聊天机器人模型,它能够通过学习和理解人类的语言来进行对话,还能根据聊天的上下文进行互动,并协助人类完成一系列
    chatgpt的中文全称是什么
  • C/C++中extern函数使用详解
  • C/C++可变参数的使用
    可变参数的使用方法远远不止以下几种,不过在C,C++中使用可变参数时要小心,在使用printf()等函数时传入的参数个数一定不能比前面的格式化字符串中的’%’符号个数少,否则会产生访问越界,运气不好的话还会导致程序崩溃
    C/C++可变参数的使用
  • css样式文件该放在哪里
  • php中数组下标必须是连续的吗
  • Python 3 教程
    Python 3 教程 Python 的 3.0 版本,常被称为 Python 3000,或简称 Py3k。相对于 Python 的早期版本,这是一个较大的升级。为了不带入过多的累赘,Python 3.0 在设计的时候没有考虑向下兼容。 Python
    Python 3 教程
  • Python pip包管理
    一、前言    在Python中, 安装第三方模块是通过 setuptools 这个工具完成的。 Python有两个封装了 setuptools的包管理工具: easy_install  和  pip , 目前官方推荐使用 pip。    
    Python pip包管理
  • ubuntu如何重新编译内核
  • 改善Java代码之慎用java动态编译

目录