我的编程空间,编程开发者的网络收藏夹
学习永远不晚

spring kafka @KafkaListener详解与使用过程

短信预约 -IT技能 免费直播动态提醒
省份

北京

  • 北京
  • 上海
  • 天津
  • 重庆
  • 河北
  • 山东
  • 辽宁
  • 黑龙江
  • 吉林
  • 甘肃
  • 青海
  • 河南
  • 江苏
  • 湖北
  • 湖南
  • 江西
  • 浙江
  • 广东
  • 云南
  • 福建
  • 海南
  • 山西
  • 四川
  • 陕西
  • 贵州
  • 安徽
  • 广西
  • 内蒙
  • 西藏
  • 新疆
  • 宁夏
  • 兵团
手机号立即预约

请填写图片验证码后获取短信验证码

看不清楚,换张图片

免费获取短信验证码

spring kafka @KafkaListener详解与使用过程

说明

  • 从2.2.4版开始,您可以直接在注释上指定Kafka使用者属性,这些属性将覆盖在使用者工厂中配置的具有相同名称的所有属性。您不能通过这种方式指定group.id和client.id属性。他们将被忽略;
  • 可以使用#{…​}或属性占位符(${…​})在SpEL上配置注释上的大多数属性。

比如:

   @KafkaListener(id = "consumer-id",topics = "SHI_TOPIC1",concurrency = "${listen.concurrency:3}",
            clientIdPrefix = "myClientId")

属性concurrency将会从容器中获取listen.concurrency的值,如果不存在就默认用3

@KafkaListener详解

id 监听器的id

①. 消费者线程命名规则

填写:

2020-11-19 14:24:15 c.d.b.k.KafkaListeners 120 [INFO] 线程:Thread[consumer-id5-1-C-1,5,main]-groupId:BASE-DEMO consumer-id5 消费

没有填写ID:

2020-11-19 10:41:26 c.d.b.k.KafkaListeners 137 [INFO] 线程:Thread[org.springframework.kafka.KafkaListenerEndpointContainer#0-0-C-1,5,main] consumer-id7

②.在相同容器中的监听器ID不能重复

否则会报错

Caused by: java.lang.IllegalStateException: Another endpoint is already registered with id

③.会覆盖消费者工厂的消费组GroupId

假如配置文件属性配置了消费组kafka.consumer.group-id=BASE-DEMO
正常情况它是该容器中的默认消费组
但是如果设置了 @KafkaListener(id = "consumer-id7", topics = {"SHI_TOPIC3"})
那么当前消费者的消费组就是consumer-id7 ;

当然如果你不想要他作为groupId的话 可以设置属性idIsGroup = false;那么还是会使用默认的GroupId;

④. 如果配置了属性groupId,则其优先级最高

 @KafkaListener(id = "consumer-id5",idIsGroup = false,topics = "SHI_TOPIC3",groupId = "groupId-test")

例如上面代码中最终这个消费者的消费组GroupId是 “groupId-test”

该id属性(如果存在)将用作Kafka消费者group.id属性,并覆盖消费者工厂中的已配置属性(如果存在)您还可以groupId显式设置或将其设置idIsGroup为false,以恢复使用使用者工厂的先前行为group.id。

groupId 消费组名

指定该消费组的消费组名; 关于消费组名的配置可以看看上面的 id 监听器的id

如何获取消费者 group.id

在监听器中调用KafkaUtils.getConsumerGroupId()可以获得当前的groupId; 可以在日志中打印出来; 可以知道是哪个客户端消费的;

topics 指定要监听哪些topic(与topicPattern、topicPartitions 三选一)

可以同时监听多个
topics = {"SHI_TOPIC3","SHI_TOPIC4"}

topicPattern 匹配Topic进行监听(与topics、topicPartitions 三选一) topicPartitions 显式分区分配

可以为监听器配置明确的主题和分区(以及可选的初始偏移量)

@KafkaListener(id = "thing2", topicPartitions =
        { @TopicPartition(topic = "topic1", partitions = { "0", "1" }),
          @TopicPartition(topic = "topic2", partitions = "0",
             partitionOffsets = @PartitionOffset(partition = "1", initialOffset = "100"))
        })
public void listen(ConsumerRecord<?, ?> record) {
    ...
}

上面例子意思是 监听topic1的0,1分区;监听topic2的第0分区,并且第1分区从offset为100的开始消费;

errorHandler 异常处理

实现KafkaListenerErrorHandler; 然后做一些异常处理;

@Component
public class KafkaDefaultListenerErrorHandler implements KafkaListenerErrorHandler {
    @Override
    public Object handleError(Message<?> message, ListenerExecutionFailedException exception) {
        return null;
    }

    @Override
    public Object handleError(Message<?> message, ListenerExecutionFailedException exception, Consumer<?, ?> consumer) {
    	//do someting
        return null;
    }
}

调用的时候 填写beanName;例如errorHandler="kafkaDefaultListenerErrorHandler"

containerFactory 监听器工厂

指定生成监听器的工厂类;

例如我写一个 批量消费的工厂类

    
    @Bean
    public KafkaListenerContainerFactory<ConcurrentMessageListenerContainer<Integer, String>> batchFactory() {
        ConcurrentKafkaListenerContainerFactory<Integer, String> factory =
                new ConcurrentKafkaListenerContainerFactory<>();
        factory.setConsumerFactory(kafkaConsumerFactory());
        //设置为批量消费,每个批次数量在Kafka配置参数中设置ConsumerConfig.MAX_POLL_RECORDS_CONFIG
        factory.setBatchListener(true);
        return factory;
    }

使用containerFactory = "batchFactory"

clientIdPrefix 客户端前缀

会覆盖消费者工厂的kafka.consumer.client-id属性; 最为前缀后面接 -n n是数字

concurrency并发数

会覆盖消费者工厂中的concurrency ,这里的并发数就是多线程消费; 比如说单机情况下,你设置了3; 相当于就是启动了3个客户端来分配消费分区;分布式情况 总线程数=concurrency*机器数量; 并不是设置越多越好,具体如何设置请看Java concurrency之集合

    
    @Bean
    public KafkaListenerContainerFactory<ConcurrentMessageListenerContainer<Integer, String>> concurrencyFactory() {
        ConcurrentKafkaListenerContainerFactory<Integer, String> factory =
                new ConcurrentKafkaListenerContainerFactory<>();
        factory.setConsumerFactory(kafkaConsumerFactory());
        factory.setConcurrency(6);
        return factory;
    }
    @KafkaListener(id = "consumer-id5",idIsGroup = false,topics = "SHI_TOPIC3", containerFactory = "concurrencyFactory",concurrency = "1)

虽然使用的工厂是concurrencyFactory(concurrency配置了6); 但是他最终生成的监听器数量 是1;

properties 配置其他属性

kafka中的属性看org.apache.kafka.clients.consumer.ConsumerConfig ;
同名的都可以修改掉;

用法

    @KafkaListener(id = "consumer-id5",idIsGroup = false,topics = "SHI_TOPIC3", containerFactory = "concurrencyFactory",concurrency = "1"
            , clientIdPrefix = "myClientId5",groupId = "groupId-test",
            properties = {
                    "enable.auto.commit:false","max.poll.interval.ms:6000" },errorHandler="kafkaDefaultListenerErrorHandler")

@KafkaListener使用

KafkaListenerEndpointRegistry

    @Autowired
    private KafkaListenerEndpointRegistry registry;
       //.... 获取所有注册的监听器
        registry.getAllListenerContainers();

设置入参验证器

当您将Spring Boot与验证启动器一起使用时,将LocalValidatorFactoryBean自动配置:如下

@Configuration
@EnableKafka
public class Config implements KafkaListenerConfigurer {

    @Autowired
    private LocalValidatorFactoryBean validator;
    ...

    @Override
    public void configureKafkaListeners(KafkaListenerEndpointRegistrar registrar) {
      registrar.setValidator(this.validator);
    }
}

使用

@KafkaListener(id="validated", topics = "annotated35", errorHandler = "validationErrorHandler",
      containerFactory = "kafkaJsonListenerContainerFactory")
public void validatedListener(@Payload @Valid ValidatedClass val) {
    ...
}

@Bean
public KafkaListenerErrorHandler validationErrorHandler() {
    return (m, e) -> {
        ...
    };
}

spring-kafka官方文档

扩展:Spring for Apache Kafka @KafkaListener使用及注意事项

官方文档:   https://docs.spring.io/spring-kafka/reference/html/

 @KafkaListener

The @KafkaListener annotation is used to designate a bean method as a listener for a listener container. The bean is wrapped in a MessagingMessageListenerAdapter configured with various features, such as converters to convert the data, if necessary, to match the method parameters.

If, say, six TopicPartition instances are provided and the concurrency is 3; each container gets two partitions. For five TopicPartition instances, two containers get two partitions, and the third gets one. If the concurrency is greater than the number of TopicPartitions, the concurrency is adjusted down such that each container gets one partition.

You can now configure a KafkaListenerErrorHandler to handle exceptions. See Handling Exceptions for more information.

By default, the @KafkaListener id property is now used as the group.id property, overriding the property configured in the consumer factory (if present). Further, you can explicitly configure the groupId on the annotation. Previously, you would have needed a separate container factory (and consumer factory) to use different group.id values for listeners. To restore the previous behavior of using the factory configured group.id, set the idIsGroup property on the annotation to false.

示例:

   demo类:

public class Listener {

    @KafkaListener(id = "foo", topics = "myTopic", clientIdPrefix = "myClientId")
    public void listen(String data) {
        ...
    }

}</code>

配置类及注解:
@Configuration
@EnableKafka
public class KafkaConfig {

    @Bean
    KafkaListenerContainerFactory<ConcurrentMessageListenerContainer<Integer, String>>
                        kafkaListenerContainerFactory() {
        ConcurrentKafkaListenerContainerFactory<Integer, String> factory =
                                new ConcurrentKafkaListenerContainerFactory<>();
        factory.setConsumerFactory(consumerFactory());
        factory.setConcurrency(3);
        factory.getContainerProperties().setPollTimeout(3000);
        return factory;
    }

    @Bean
    public ConsumerFactory<Integer, String> consumerFactory() {
        return new DefaultKafkaConsumerFactory<>(consumerConfigs());
    }

    @Bean
    public Map<String, Object> consumerConfigs() {
        Map<String, Object> props = new HashMap<>();
        props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, embeddedKafka.getBrokersAsString());
        ...
        return props;
    }
}

到此这篇关于spring-kafka @KafkaListener详解与使用的文章就介绍到这了,更多相关spring-kafka使用内容请搜索编程网以前的文章或继续浏览下面的相关文章希望大家以后多多支持编程网!

免责声明:

① 本站未注明“稿件来源”的信息均来自网络整理。其文字、图片和音视频稿件的所属权归原作者所有。本站收集整理出于非商业性的教育和科研之目的,并不意味着本站赞同其观点或证实其内容的真实性。仅作为临时的测试数据,供内部测试之用。本站并未授权任何人以任何方式主动获取本站任何信息。

② 本站未注明“稿件来源”的临时测试数据将在测试完成后最终做删除处理。有问题或投稿请发送至: 邮箱/279061341@qq.com QQ/279061341

spring kafka @KafkaListener详解与使用过程

下载Word文档到电脑,方便收藏和打印~

下载Word文档

猜你喜欢

spring kafka @KafkaListener详解与使用过程

这篇文章主要介绍了spring-kafka @KafkaListener详解与使用,本文结合实例代码给大家介绍的非常详细,对大家的学习或工作具有一定的参考借鉴价值,需要的朋友可以参考下
2023-02-20

spring kafka框架中@KafkaListener注解怎么使用

这篇文章主要介绍“spring kafka框架中@KafkaListener注解怎么使用”,在日常操作中,相信很多人在spring kafka框架中@KafkaListener注解怎么使用问题上存在疑惑,小编查阅了各式资料,整理出简单好用的
2023-07-05

ReactContext详解使用过程

在Reactor中提供了Context来替代ThreadLocal,可以实现一个跨线程的共享变量的透明方式。本文主要为大家介绍了Context的用法的用法,感兴趣的可以了解一下
2023-03-06

Spring Integration概述与怎么使用详解

公司项目需要用到springintegration,而网上关于springintegration的有价值的参考资料比较少,下面这篇文章主要给大家介绍了关于Spring Integration概述与怎么使用的相关资料,需要的朋友可以参考下
2023-02-24

详解Spring的@Value作用与使用场景

这篇文章主要介绍了详解Spring的@Value作用与使用场景,Spring为大家提供许多开箱即用的功能,@Value就是一个极其常用的功能,它能将配置信息注入到bean中去,需要的朋友可以参考下
2023-05-19

SpringBoot使用Spark过程详解

这篇文章主要介绍SpringBoot使用Spark的方法的相关知识,小编通过实际案例向大家展示操作过程,操作方法简单快捷,实用性强,希望能帮助大家解决问题
2023-02-15

一文详解Spring AOP的配置与使用

面向切面编程(俗称AOP)提供了一种面向对象编程(俗称OOP)的补充,面向对象编程最核心的单元是类(class),然而面向切面编程最核心的单元是切面(Aspects)。本文就来和大家聊聊AOP的配置与使用,感兴趣的可以了解一下
2022-11-13

详解redis与spring的整合(使用缓存)

1、实现目标通过redis缓存数据。(目的不是加快查询的速度,而是减少数据库的负担)  2、所需jar包注意:jdies和commons-pool两个jar的版本是有对应关系的,注意引入jar包是要配对使用,否则将会报错。因为commons
2023-05-31

详解使用Spring Boot开发Restful程序

一、简介Spring Boot是由Pivotal团队提供的全新框架,其设计目的是用来简化新spring应用的初始搭建以及开发过程。该框架使用了特定的方式来进行配置,从而使开发人员不再需要定义样板化的配置。通过这种方式,Boot致力于在蓬勃发
2023-05-31

spring的异步执行使用与源码详解

这篇文章主要介绍了spring的异步执行使用与源码详解,Spring中通过在方法上设置@Async注解,可使得方法被异步调用,需要的朋友可以参考下
2023-05-20

编程热搜

  • Python 学习之路 - Python
    一、安装Python34Windows在Python官网(https://www.python.org/downloads/)下载安装包并安装。Python的默认安装路径是:C:\Python34配置环境变量:【右键计算机】--》【属性】-
    Python 学习之路 - Python
  • chatgpt的中文全称是什么
    chatgpt的中文全称是生成型预训练变换模型。ChatGPT是什么ChatGPT是美国人工智能研究实验室OpenAI开发的一种全新聊天机器人模型,它能够通过学习和理解人类的语言来进行对话,还能根据聊天的上下文进行互动,并协助人类完成一系列
    chatgpt的中文全称是什么
  • C/C++中extern函数使用详解
  • C/C++可变参数的使用
    可变参数的使用方法远远不止以下几种,不过在C,C++中使用可变参数时要小心,在使用printf()等函数时传入的参数个数一定不能比前面的格式化字符串中的’%’符号个数少,否则会产生访问越界,运气不好的话还会导致程序崩溃
    C/C++可变参数的使用
  • css样式文件该放在哪里
  • php中数组下标必须是连续的吗
  • Python 3 教程
    Python 3 教程 Python 的 3.0 版本,常被称为 Python 3000,或简称 Py3k。相对于 Python 的早期版本,这是一个较大的升级。为了不带入过多的累赘,Python 3.0 在设计的时候没有考虑向下兼容。 Python
    Python 3 教程
  • Python pip包管理
    一、前言    在Python中, 安装第三方模块是通过 setuptools 这个工具完成的。 Python有两个封装了 setuptools的包管理工具: easy_install  和  pip , 目前官方推荐使用 pip。    
    Python pip包管理
  • ubuntu如何重新编译内核
  • 改善Java代码之慎用java动态编译

目录