我的编程空间,编程开发者的网络收藏夹
学习永远不晚

springkafka框架中@KafkaListener注解解读和使用案例

短信预约 -IT技能 免费直播动态提醒
省份

北京

  • 北京
  • 上海
  • 天津
  • 重庆
  • 河北
  • 山东
  • 辽宁
  • 黑龙江
  • 吉林
  • 甘肃
  • 青海
  • 河南
  • 江苏
  • 湖北
  • 湖南
  • 江西
  • 浙江
  • 广东
  • 云南
  • 福建
  • 海南
  • 山西
  • 四川
  • 陕西
  • 贵州
  • 安徽
  • 广西
  • 内蒙
  • 西藏
  • 新疆
  • 宁夏
  • 兵团
手机号立即预约

请填写图片验证码后获取短信验证码

看不清楚,换张图片

免费获取短信验证码

springkafka框架中@KafkaListener注解解读和使用案例

简介

Kafka 目前主要作为一个分布式的发布订阅式的消息系统使用,也是目前最流行的消息队列系统之一。因此,也越来越多的框架对 kafka 做了集成,比如本文将要说到的 spring-kafka。

Kafka 既然作为一个消息发布订阅系统,就包括消息生成者和消息消费者。本文主要讲述的 spring-kafka 框架的 kafkaListener 注解的深入解读和使用案例。

解读

源码解读

@Target({ ElementType.TYPE, ElementType.METHOD, ElementType.ANNOTATION_TYPE })

@Retention(RetentionPolicy.RUNTIME)

@MessageMapping

@Documented

@Repeatable(KafkaListeners.class)

public @interface KafkaListener {
   

   String id() default "";
   

   String containerFactory() default "";
   

   String[] topics() default {};
   

   String topicPattern() default "";
   

   TopicPartition[] topicPartitions() default {};
   

   String containerGroup() default "";
   

   String errorHandler() default "";
   

   String groupId() default "";
   

   boolean idIsGroup() default true;
   

   String clientIdPrefix() default "";
   

   String beanRef() default "__listener";
}

使用案例

ConsumerRecord 类消费

使用 ConsumerRecord 类接收有一定的好处,ConsumerRecord 类里面包含分区信息、消息头、消息体等内容,如果业务需要获取这些参数时,使用 ConsumerRecord 会是个不错的选择。如果使用具体的类型接收消息体则更加方便,比如说用 String 类型去接收消息体。

这里我们编写一个 Listener 方法,监听 "topic1"Topic,并把 ConsumerRecord 里面所包含的内容打印到控制台中:

@Component

public class Listener {
    private static final Logger log = LoggerFactory.getLogger(Listener.class);
    @KafkaListener(id = "consumer", topics = "topic1")

    public void consumerListener(ConsumerRecord record) {

        log.info("topic.quick.consumer receive : " + record.toString());

    }
}

批量消费

批量消费在现实业务场景中是很有实用性的。因为批量消费可以增大 kafka 消费吞吐量, 提高性能。

批量消费实现步骤:

1、重新创建一份新的消费者配置,配置为一次拉取 10 条消息

2、创建一个监听容器工厂,命名为:batchContainerFactory,设置其为批量消费并设置并发量为 5,这个并发量根据分区数决定,必须小于等于分区数,否则会有线程一直处于空闲状态。

3、创建一个分区数为 8 的 Topic。

4、创建监听方法,设置消费 id 为 “batchConsumer”,clientID 前缀为“batch”,监听“batch”,使用“batchContainerFactory” 工厂创建该监听容器。

@Component

public class BatchListener {
    private static final Logger log= LoggerFactory.getLogger(BatchListener.class);
    private Map consumerProps() {

        Map props = new HashMap<>();

        props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");

        props.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, true);

        props.put(ConsumerConfig.AUTO_COMMIT_INTERVAL_MS_CONFIG, "1000");

        props.put(ConsumerConfig.SESSION_TIMEOUT_MS_CONFIG, "15000");

        //一次拉取消息数量

        props.put(ConsumerConfig.MAX_POLL_RECORDS_CONFIG, "10");

        props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG,

                NumberDeserializers.IntegerDeserializer.class);

        props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG,

                StringDeserializer.class);

        return props;

    }
    @Bean("batchContainerFactory")

    public ConcurrentKafkaListenerContainerFactory listenerContainer() {

        ConcurrentKafkaListenerContainerFactory container

                = new ConcurrentKafkaListenerContainerFactory();

        container.setConsumerFactory(new DefaultKafkaConsumerFactory(consumerProps()));

        //设置并发量,小于或等于Topic的分区数

        container.setConcurrency(5);

        //必须 设置为批量监听

        container.setBatchListener(true);

        return container;

    }
    @Bean

    public NewTopic batchTopic() {

        return new NewTopic("topic.batch", 8, (short) 1);

    }
    @KafkaListener(id = "batchConsumer",clientIdPrefix = "batch"

            ,topics = {"topic.batch"},containerFactory = "batchContainerFactory")

    public void batchListener(List data) {

        log.info("topic.batch  receive : ");

        for (String s : data) {

            log.info(  s);

        }

    }

}

监听 Topic 中指定的分区

使用 @KafkaListener 注解的 topicPartitions 属性监听不同的 partition 分区。

@TopicPartition:topic-- 需要监听的 Topic 的名称,partitions – 需要监听 Topic 的分区 id。

partitionOffsets – 可以设置从某个偏移量开始监听,@PartitionOffset:partition – 分区 Id,非数组,initialOffset – 初始偏移量。

@Bean

public NewTopic batchWithPartitionTopic() {

    return new NewTopic("topic.batch.partition", 8, (short) 1);

}
@KafkaListener(id = "batchWithPartition",clientIdPrefix = "bwp",containerFactory = "batchContainerFactory",

        topicPartitions = {

                @TopicPartition(topic = "topic.batch.partition",partitions = {"1","3"}),

                @TopicPartition(topic = "topic.batch.partition",partitions = {"0","4"},

                        partitionOffsets = @PartitionOffset(partition = "2",initialOffset = "100"))

        }

)

public void batchListenerWithPartition(List data) {

    log.info("topic.batch.partition  receive : ");

    for (String s : data) {

        log.info(s);

    }

}

注解方式获取消息头及消息体

当你接收的消息包含请求头,以及你监听方法需要获取该消息非常多的字段时可以通过这种方式。。这里使用的是默认的监听容器工厂创建的,如果你想使用批量消费,把对应的类型改为 List 即可,比如 List data , List key。

@Payload:获取的是消息的消息体,也就是发送内容

@Header(KafkaHeaders.RECEIVED_MESSAGE_KEY):获取发送消息的 key

@Header(KafkaHeaders.RECEIVED_PARTITION_ID):获取当前消息是从哪个分区中监听到的

@Header(KafkaHeaders.RECEIVED_TOPIC):获取监听的 TopicName

@Header(KafkaHeaders.RECEIVED_TIMESTAMP):获取时间戳

@KafkaListener(id = "params", topics = "topic.params")

public void otherListener(@Payload String data,

                         @Header(KafkaHeaders.RECEIVED_MESSAGE_KEY) Integer key,

                         @Header(KafkaHeaders.RECEIVED_PARTITION_ID) int partition,

                         @Header(KafkaHeaders.RECEIVED_TOPIC) String topic,

                         @Header(KafkaHeaders.RECEIVED_TIMESTAMP) long ts) {

    log.info("topic.params receive : \n"+

            "data : "+data+"\n"+

            "key : "+key+"\n"+

            "partitionId : "+partition+"\n"+

            "topic : "+topic+"\n"+

            "timestamp : "+ts+"\n"

    );

}

使用 Ack 机制确认消费

Kafka 是通过最新保存偏移量进行消息消费的,而且确认消费的消息并不会立刻删除,所以我们可以重复的消费未被删除的数据,当第一条消息未被确认,而第二条消息被确认的时候,Kafka 会保存第二条消息的偏移量,也就是说第一条消息再也不会被监听器所获取,除非是根据第一条消息的偏移量手动获取。Kafka 的 ack 机制可以有效的确保消费不被丢失。因为自动提交是在 kafka 拉取到数据之后就直接提交,这样很容易丢失数据,尤其是在需要事物控制的时候。

使用 Kafka 的 Ack 机制比较简单,只需简单的三步即可:

  • 设置 ENABLE_AUTO_COMMIT_CONFIG=false,禁止自动提交
  • 设置 AckMode=MANUAL_IMMEDIATE
  • 监听方法加入 Acknowledgment ack 参数

4.使用 Consumer.seek 方法,可以指定到某个偏移量的位置

@Component

public class AckListener {

    private static final Logger log = LoggerFactory.getLogger(AckListener.class);
    private Map consumerProps() {

        Map props = new HashMap<>();

        props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");

        props.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, false);

        props.put(ConsumerConfig.AUTO_COMMIT_INTERVAL_MS_CONFIG, "1000");

        props.put(ConsumerConfig.SESSION_TIMEOUT_MS_CONFIG, "15000");

        props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, IntegerDeserializer.class);

        props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);

        return props;

    }
    @Bean("ackContainerFactory")

    public ConcurrentKafkaListenerContainerFactory ackContainerFactory() {

        ConcurrentKafkaListenerContainerFactory factory = new ConcurrentKafkaListenerContainerFactory();

        factory.setConsumerFactory(new DefaultKafkaConsumerFactory(consumerProps()));

        factory.getContainerProperties().setAckMode(AbstractMessageListenerContainer.AckMode.MANUAL_IMMEDIATE);

        factory.setConsumerFactory(new DefaultKafkaConsumerFactory(consumerProps()));

        return factory;

    }
    @KafkaListener(id = "ack", topics = "topic.ack", containerFactory = "ackContainerFactory")

    public void ackListener(ConsumerRecord record, Acknowledgment ack) {

        log.info("topic.quick.ack receive : " + record.value());

        ack.acknowledge();

    }

}

解决重复消费

上一节中使用 ack 手动提交偏移量时,假如 consumer 挂了重启,那它将从 committed offset 位置开始重新消费,而不是 consume offset 位置。这也就意味着有可能重复消费。

在 0.9 客户端中,有 3 种 ack 策略:

策略 1: 自动的,周期性的 ack。

策略 2:consumer.commitSync(),调用 commitSync,手动同步 ack。每处理完 1 条消息,commitSync 1 次。

策略 3:consumer. commitASync(),手动异步 ack。、

那么使用策略 2,提交每处理完 1 条消息,就发送一次 commitSync。那这样是不是就可以解决 “重复消费” 了呢?如下代码:

while (true) {

        List buffer = new ArrayList<>();

        ConsumerRecords records = consumer.poll(100);

        for (ConsumerRecord record : records) {

            buffer.add(record);

        }

        insertIntoDb(buffer);    //消除处理,存到db

        consumer.commitSync();   //同步发送ack

        buffer.clear();

    }

}

答案是否定的!因为上面的 insertIntoDb 和 commitSync 做不到原子操作:如果在数据处理完成,commitSync 的时候挂了,服务器再次重启,消息仍然会重复消费。

     那么如何解决重复消费的问题呢?答案是自己保存 committed offset,而不是依赖 kafka 的集群保存 committed offset,把消息的处理和保存 offset 做成一个原子操作,并且对消息加入唯一 id, 进行判重。

依照官方文档, 要自己保存偏移量, 需要:

  • enable.auto.commit=false, 禁用自动 ack。
  • 每次取到消息,把对应的 offset 存下来。
  • 下次重启,通过 consumer.seek 函数,定位到自己保存的 offset,从那开始消费。
  • 更进一步处理可以对消息加入唯一 id, 进行判重。

到此这篇关于kafka @KafkaListener 注解解读的文章就介绍到这了,更多相关@KafkaListener 注解内容请搜索编程网以前的文章或继续浏览下面的相关文章希望大家以后多多支持编程网!

免责声明:

① 本站未注明“稿件来源”的信息均来自网络整理。其文字、图片和音视频稿件的所属权归原作者所有。本站收集整理出于非商业性的教育和科研之目的,并不意味着本站赞同其观点或证实其内容的真实性。仅作为临时的测试数据,供内部测试之用。本站并未授权任何人以任何方式主动获取本站任何信息。

② 本站未注明“稿件来源”的临时测试数据将在测试完成后最终做删除处理。有问题或投稿请发送至: 邮箱/279061341@qq.com QQ/279061341

springkafka框架中@KafkaListener注解解读和使用案例

下载Word文档到电脑,方便收藏和打印~

下载Word文档

猜你喜欢

springkafka框架中@KafkaListener注解解读和使用案例

Kafka目前主要作为一个分布式的发布订阅式的消息系统使用,也是目前最流行的消息队列系统之一,这篇文章主要介绍了kafka@KafkaListener注解解读,需要的朋友可以参考下
2023-02-20

spring kafka框架中@KafkaListener注解怎么使用

这篇文章主要介绍“spring kafka框架中@KafkaListener注解怎么使用”,在日常操作中,相信很多人在spring kafka框架中@KafkaListener注解怎么使用问题上存在疑惑,小编查阅了各式资料,整理出简单好用的
2023-07-05

Lombok中@Builder和@SuperBuilder注解的用法案例

@Builder 是 lombok 中的注解,可以使用builder()构造的Person.PersonBuilder对象进行链式调用,给所有属性依次赋值,这篇文章主要介绍了Lombok中@Builder和@SuperBuilder注解的用法,需要的朋友可以参考下
2023-01-11

举例讲解Android应用开发中OTTO框架的基本使用

OTTO是一个EventBus类型的事件传输总线,它可以提供“存储转发”的功能,让你APP中各个组件的交流更加便利,让你的程序分层更加清晰。 使用场景 OTTO基于Observer设计模式。它有发布者,订阅者这两个主要对象。OTTO的最佳实
2022-06-06

Retrofit网络请求框架之注解解析和动态代理方法怎么使用

本篇内容介绍了“Retrofit网络请求框架之注解解析和动态代理方法怎么使用”的有关知识,在实际案例的操作过程中,不少人都会遇到这样的困境,接下来就让小编带领大家学习一下如何处理这些情况吧!希望大家仔细阅读,能够学有所成!Retrofit是
2023-07-05

解析sessionstorage的用途和在网页交互中的使用案例

sessionStorage的作用及其在网页交互中的应用案例解析随着互联网的发展,网页交互对于用户体验的重要性越来越被重视。为了实现更好的网页交互效果,开发人员需要使用一些技术手段来存储和管理用户的数据。sessionStorage就是其
解析sessionstorage的用途和在网页交互中的使用案例
2024-01-15

如何使用Java注解和反射实现Junit4中的用例调用

这篇文章主要讲解了“如何使用Java注解和反射实现Junit4中的用例调用”,文中的讲解内容简单清晰,易于学习与理解,下面请大家跟着小编的思路慢慢深入,一起来研究和学习“如何使用Java注解和反射实现Junit4中的用例调用”吧!实例需求需
2023-06-15

详解shell脚本中的case条件语句介绍和使用案例

#前言:这篇我们接着写shell的另外一个条件语句case,上篇讲解了if条件语句。case条件语句我们常用于实现系统服务启动脚本等场景,case条件语句也相当于if条件语句多分支结构,多个选择,case看起来更规范和易读 #case条件语
2022-06-04

编程热搜

  • Python 学习之路 - Python
    一、安装Python34Windows在Python官网(https://www.python.org/downloads/)下载安装包并安装。Python的默认安装路径是:C:\Python34配置环境变量:【右键计算机】--》【属性】-
    Python 学习之路 - Python
  • chatgpt的中文全称是什么
    chatgpt的中文全称是生成型预训练变换模型。ChatGPT是什么ChatGPT是美国人工智能研究实验室OpenAI开发的一种全新聊天机器人模型,它能够通过学习和理解人类的语言来进行对话,还能根据聊天的上下文进行互动,并协助人类完成一系列
    chatgpt的中文全称是什么
  • C/C++中extern函数使用详解
  • C/C++可变参数的使用
    可变参数的使用方法远远不止以下几种,不过在C,C++中使用可变参数时要小心,在使用printf()等函数时传入的参数个数一定不能比前面的格式化字符串中的’%’符号个数少,否则会产生访问越界,运气不好的话还会导致程序崩溃
    C/C++可变参数的使用
  • css样式文件该放在哪里
  • php中数组下标必须是连续的吗
  • Python 3 教程
    Python 3 教程 Python 的 3.0 版本,常被称为 Python 3000,或简称 Py3k。相对于 Python 的早期版本,这是一个较大的升级。为了不带入过多的累赘,Python 3.0 在设计的时候没有考虑向下兼容。 Python
    Python 3 教程
  • Python pip包管理
    一、前言    在Python中, 安装第三方模块是通过 setuptools 这个工具完成的。 Python有两个封装了 setuptools的包管理工具: easy_install  和  pip , 目前官方推荐使用 pip。    
    Python pip包管理
  • ubuntu如何重新编译内核
  • 改善Java代码之慎用java动态编译

目录