我的编程空间,编程开发者的网络收藏夹
学习永远不晚

Spring Boot中怎么使用@KafkaListener并发批量接收消息

短信预约 -IT技能 免费直播动态提醒
省份

北京

  • 北京
  • 上海
  • 天津
  • 重庆
  • 河北
  • 山东
  • 辽宁
  • 黑龙江
  • 吉林
  • 甘肃
  • 青海
  • 河南
  • 江苏
  • 湖北
  • 湖南
  • 江西
  • 浙江
  • 广东
  • 云南
  • 福建
  • 海南
  • 山西
  • 四川
  • 陕西
  • 贵州
  • 安徽
  • 广西
  • 内蒙
  • 西藏
  • 新疆
  • 宁夏
  • 兵团
手机号立即预约

请填写图片验证码后获取短信验证码

看不清楚,换张图片

免费获取短信验证码

Spring Boot中怎么使用@KafkaListener并发批量接收消息

这篇“Spring Boot中怎么使用@KafkaListener并发批量接收消息”文章的知识点大部分人都不太理解,所以小编给大家总结了以下内容,内容详细,步骤清晰,具有一定的借鉴价值,希望大家阅读完这篇文章能有所收获,下面我们一起来看看这篇“Spring Boot中怎么使用@KafkaListener并发批量接收消息”文章吧。

###第一步,并发消费###
先看代码,重点是这我们使用的是ConcurrentKafkaListenerContainerFactory并且设置了factory.setConcurrency(4); (我的topic有4个分区,为了加快消费将并发设置为4,也就是有4个KafkaMessageListenerContainer)

    @Bean    KafkaListenerContainerFactory<ConcurrentMessageListenerContainer<String, String>> kafkaListenerContainerFactory() {        ConcurrentKafkaListenerContainerFactory<String, String> factory = new ConcurrentKafkaListenerContainerFactory<>();        factory.setConsumerFactory(consumerFactory());        factory.setConcurrency(4);        factory.setBatchListener(true);        factory.getContainerProperties().setPollTimeout(3000);        return factory;    }

注意也可以直接在application.properties中添加spring.kafka.listener.concurrency=3,然后使用@KafkaListener并发消费。

###第二步,批量消费###
然后是批量消费。重点是factory.setBatchListener(true);
以及 propsMap.put(ConsumerConfig.MAX_POLL_RECORDS_CONFIG, 50);
一个设启用批量消费,一个设置批量消费每次最多消费多少条消息记录。

重点说明一下,我们设置的ConsumerConfig.MAX_POLL_RECORDS_CONFIG是50,并不是说如果没有达到50条消息,我们就一直等待。官方的解释是"The maximum number of records returned in a single call to poll().", 也就是50表示的是一次poll最多返回的记录数。

从启动日志中可以看到还有个 max.poll.interval.ms = 300000, 也就说每间隔max.poll.interval.ms我们就调用一次poll。每次poll最多返回50条记录。

max.poll.interval.ms官方解释是"The maximum delay between invocations of poll() when using consumer group management. This places an upper bound on the amount of time that the consumer can be idle before fetching more records. If poll() is not called before expiration of this timeout, then the consumer is considered failed and the group will rebalance in order to reassign the partitions to another member. ";

    @Bean    KafkaListenerContainerFactory<ConcurrentMessageListenerContainer<String, String>> kafkaListenerContainerFactory() {        ConcurrentKafkaListenerContainerFactory<String, String> factory = new ConcurrentKafkaListenerContainerFactory<>();        factory.setConsumerFactory(consumerFactory());        factory.setConcurrency(4);        factory.setBatchListener(true);        factory.getContainerProperties().setPollTimeout(3000);        return factory;    }   @Bean    public Map<String, Object> consumerConfigs() {        Map<String, Object> propsMap = new HashMap<>();        propsMap.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, propsConfig.getBroker());        propsMap.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, propsConfig.getEnableAutoCommit());        propsMap.put(ConsumerConfig.AUTO_COMMIT_INTERVAL_MS_CONFIG, "100");        propsMap.put(ConsumerConfig.SESSION_TIMEOUT_MS_CONFIG, "15000");        propsMap.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);        propsMap.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);        propsMap.put(ConsumerConfig.GROUP_ID_CONFIG, propsConfig.getGroupId());        propsMap.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, propsConfig.getAutoOffsetReset());        propsMap.put(ConsumerConfig.MAX_POLL_RECORDS_CONFIG, 50);        return propsMap;    }

启动日志截图

Spring Boot中怎么使用@KafkaListener并发批量接收消息

关于max.poll.records和max.poll.interval.ms官方解释截图:

Spring Boot中怎么使用@KafkaListener并发批量接收消息

###第三步,分区消费###
对于只有一个分区的topic,不需要分区消费,因为没有意义。下面的例子是针对有2个分区的情况(我的完整代码中有4个listenPartitionX方法,我的topic设置了4个分区),读者可以根据自己的情况进行调整。

public class MyListener {    private static final String TPOIC = "topic02";    @KafkaListener(id = "id0", topicPartitions = { @TopicPartition(topic = TPOIC, partitions = { "0" }) })    public void listenPartition0(List<ConsumerRecord<?, ?>> records) {        log.info("Id0 Listener, Thread ID: " + Thread.currentThread().getId());        log.info("Id0 records size " +  records.size());        for (ConsumerRecord<?, ?> record : records) {            Optional<?> kafkaMessage = Optional.ofNullable(record.value());            log.info("Received: " + record);            if (kafkaMessage.isPresent()) {                Object message = record.value();                String topic = record.topic();                log.info("p0 Received message={}",  message);            }        }    }    @KafkaListener(id = "id1", topicPartitions = { @TopicPartition(topic = TPOIC, partitions = { "1" }) })    public void listenPartition1(List<ConsumerRecord<?, ?>> records) {        log.info("Id1 Listener, Thread ID: " + Thread.currentThread().getId());        log.info("Id1 records size " +  records.size());        for (ConsumerRecord<?, ?> record : records) {            Optional<?> kafkaMessage = Optional.ofNullable(record.value());            log.info("Received: " + record);            if (kafkaMessage.isPresent()) {                Object message = record.value();                String topic = record.topic();                log.info("p1 Received message={}",  message);            }        }}

以上就是关于“Spring Boot中怎么使用@KafkaListener并发批量接收消息”这篇文章的内容,相信大家都有了一定的了解,希望小编分享的内容对大家有帮助,若想了解更多相关的知识内容,请关注编程网行业资讯频道。

免责声明:

① 本站未注明“稿件来源”的信息均来自网络整理。其文字、图片和音视频稿件的所属权归原作者所有。本站收集整理出于非商业性的教育和科研之目的,并不意味着本站赞同其观点或证实其内容的真实性。仅作为临时的测试数据,供内部测试之用。本站并未授权任何人以任何方式主动获取本站任何信息。

② 本站未注明“稿件来源”的临时测试数据将在测试完成后最终做删除处理。有问题或投稿请发送至: 邮箱/279061341@qq.com QQ/279061341

Spring Boot中怎么使用@KafkaListener并发批量接收消息

下载Word文档到电脑,方便收藏和打印~

下载Word文档

猜你喜欢

Spring Boot中怎么使用@KafkaListener并发批量接收消息

这篇“Spring Boot中怎么使用@KafkaListener并发批量接收消息”文章的知识点大部分人都不太理解,所以小编给大家总结了以下内容,内容详细,步骤清晰,具有一定的借鉴价值,希望大家阅读完这篇文章能有所收获,下面我们一起来看看这
2023-07-05

如何在spring boot中使用spring-kafka实现一个接收消息功能

本篇文章为大家展示了如何在spring boot中使用spring-kafka实现一个接收消息功能,内容简明扼要并且容易理解,绝对能使你眼前一亮,通过这篇文章的详细介绍希望你能有所收获。实现方法pom.xml文件如下
2023-05-31

C#怎么使用udp实现消息的接收和发送

本篇内容主要讲解“C#怎么使用udp实现消息的接收和发送”,感兴趣的朋友不妨来看看。本文介绍的方法操作简单快捷,实用性强。下面就让小编来带大家学习“C#怎么使用udp实现消息的接收和发送”吧!使用udp实现消息的接收和发送代码比较简单,但是
2023-07-05

编程热搜

  • Python 学习之路 - Python
    一、安装Python34Windows在Python官网(https://www.python.org/downloads/)下载安装包并安装。Python的默认安装路径是:C:\Python34配置环境变量:【右键计算机】--》【属性】-
    Python 学习之路 - Python
  • chatgpt的中文全称是什么
    chatgpt的中文全称是生成型预训练变换模型。ChatGPT是什么ChatGPT是美国人工智能研究实验室OpenAI开发的一种全新聊天机器人模型,它能够通过学习和理解人类的语言来进行对话,还能根据聊天的上下文进行互动,并协助人类完成一系列
    chatgpt的中文全称是什么
  • C/C++中extern函数使用详解
  • C/C++可变参数的使用
    可变参数的使用方法远远不止以下几种,不过在C,C++中使用可变参数时要小心,在使用printf()等函数时传入的参数个数一定不能比前面的格式化字符串中的’%’符号个数少,否则会产生访问越界,运气不好的话还会导致程序崩溃
    C/C++可变参数的使用
  • css样式文件该放在哪里
  • php中数组下标必须是连续的吗
  • Python 3 教程
    Python 3 教程 Python 的 3.0 版本,常被称为 Python 3000,或简称 Py3k。相对于 Python 的早期版本,这是一个较大的升级。为了不带入过多的累赘,Python 3.0 在设计的时候没有考虑向下兼容。 Python
    Python 3 教程
  • Python pip包管理
    一、前言    在Python中, 安装第三方模块是通过 setuptools 这个工具完成的。 Python有两个封装了 setuptools的包管理工具: easy_install  和  pip , 目前官方推荐使用 pip。    
    Python pip包管理
  • ubuntu如何重新编译内核
  • 改善Java代码之慎用java动态编译

目录