我的编程空间,编程开发者的网络收藏夹
学习永远不晚

关于spring boot整合kafka+注解方式

短信预约 -IT技能 免费直播动态提醒
省份

北京

  • 北京
  • 上海
  • 天津
  • 重庆
  • 河北
  • 山东
  • 辽宁
  • 黑龙江
  • 吉林
  • 甘肃
  • 青海
  • 河南
  • 江苏
  • 湖北
  • 湖南
  • 江西
  • 浙江
  • 广东
  • 云南
  • 福建
  • 海南
  • 山西
  • 四川
  • 陕西
  • 贵州
  • 安徽
  • 广西
  • 内蒙
  • 西藏
  • 新疆
  • 宁夏
  • 兵团
手机号立即预约

请填写图片验证码后获取短信验证码

看不清楚,换张图片

免费获取短信验证码

关于spring boot整合kafka+注解方式

spring boot自动配置方式整合

spring boot具有许多自动化配置,对于kafka的自动化配置当然也包含在内,基于spring boot自动配置方式整合kafka,需要做以下步骤。

引入kafka的pom依赖包


<!-- https://mvnrepository.com/artifact/org.springframework.kafka/spring-kafka -->
<dependency>
    <groupId>org.springframework.kafka</groupId>
    <artifactId>spring-kafka</artifactId>
    <version>2.2.2.RELEASE</version>
</dependency>

在配置文件中配置kafka相关属性配置,分别配置生产者和消费者的属性,在程序启动时,spring boot框架会自动读取这些配置的属性,创建相关的生产者、消费者等。下面展示一个简单的配置。


#kafka默认消费者配置
spring.kafka.consumer.bootstrap-servers=192.168.0.15:9092
spring.kafka.consumer.enable-auto-commit=false
spring.kafka.consumer.auto-offset-reset=earliest
#kafka默认生产者配置
spring.kafka.producer.bootstrap-servers=192.168.0.15:9092
spring.kafka.producer.acks=-1
spring.kafka.client-id=kafka-producer
spring.kafka.producer.batch-size=5

当然,在实际生产中的配置肯定比上面的配置复杂,需要一些定制化的操作,那么spring boot的自动化配置创建的生产者或者消费者都不能满足我们时,应该需要自定义化相关配置,这个在后续举例,这里先分析自动化配置。

在进行了如上配置之后,需要生产者时,使用方式为下代码所示。


@RunWith(SpringRunner.class)
@SpringBootTest(classes = {UserSSOApplication.class})
public class UserSSOApplicationTests {
   @Resource
    //注入kafkatemplete,这个由spring boot自动创建
   KafkaTemplate kafkaTemplate;
   @Test
   public void testKafkaSendMsg() {
       //发送消息
      kafkaTemplate.send("test", 0,12,"1222");
   }
}

消费者的使用注解方式整合,代码如下。


@Component
@Slf4j
public class KafkaMessageReceiver2 {
    //指定监听的topic,当前消费者组id
    @KafkaListener(topics = {"test"}, groupId = "receiver")
    public void registryReceiver(ConsumerRecord<Integer, String> integerStringConsumerRecords) {
        log.info(integerStringConsumerRecords.value());
    }
}

上面是最简单的配置,实现的一个简单例子,如果需要更加定制化的配置,可以参考类

org.springframework.boot.autoconfigure.kafka.KafkaProperties这里面包含了大部分需要的kafka配置。针对配置,在properties文件中添加即可。

spring boot自动配置的不足

上面是依赖spring boot自动化配置完成的整合方式,实际上所有的配置实现都是在org.springframework.boot.autoconfigure.kafka.KafkaAutoConfiguration中完成。可以看出个是依赖于@Configuration完成bean配置,这种配置方式基本能够实现大部分情况,只要熟悉org.springframework.boot.autoconfigure.kafka.KafkaProperties中的配置即可。

但是这种方式还有一个问题,就是org.springframework.boot.autoconfigure.kafka.KafkaProperties中并没有涵盖所有的org.apache.kafka.clients.producer.ProducerConfig中的配置,这就导致某些特殊配置不能依赖spring boot自动创建,需要我们手动创建Producer和comsumer。


@Configuration
@ConditionalOnClass(KafkaTemplate.class)
@EnableConfigurationProperties(KafkaProperties.class)
@Import(KafkaAnnotationDrivenConfiguration.class)
public class KafkaAutoConfiguration {
   private final KafkaProperties properties;
   private final RecordMessageConverter messageConverter;
   public KafkaAutoConfiguration(KafkaProperties properties,
         ObjectProvider<RecordMessageConverter> messageConverter) {
      this.properties = properties;
      this.messageConverter = messageConverter.getIfUnique();
   }
   @Bean
   @ConditionalOnMissingBean(KafkaTemplate.class)
   public KafkaTemplate<?, ?> kafkaTemplate(
         ProducerFactory<Object, Object> kafkaProducerFactory,
         ProducerListener<Object, Object> kafkaProducerListener) {
      KafkaTemplate<Object, Object> kafkaTemplate = new KafkaTemplate<>(
            kafkaProducerFactory);
      if (this.messageConverter != null) {
         kafkaTemplate.setMessageConverter(this.messageConverter);
      }
      kafkaTemplate.setProducerListener(kafkaProducerListener);
      kafkaTemplate.setDefaultTopic(this.properties.getTemplate().getDefaultTopic());
      return kafkaTemplate;
   }
   @Bean
   @ConditionalOnMissingBean(ConsumerFactory.class)
   public ConsumerFactory<?, ?> kafkaConsumerFactory() {
      return new DefaultKafkaConsumerFactory<>(
            this.properties.buildConsumerProperties());
   }
   @Bean
   @ConditionalOnMissingBean(ProducerFactory.class)
   public ProducerFactory<?, ?> kafkaProducerFactory() {
      DefaultKafkaProducerFactory<?, ?> factory = new DefaultKafkaProducerFactory<>(
            this.properties.buildProducerProperties());
      String transactionIdPrefix = this.properties.getProducer()
            .getTransactionIdPrefix();
      if (transactionIdPrefix != null) {
         factory.setTransactionIdPrefix(transactionIdPrefix);
      }
      return factory;
   }
  //略略略
}

spring boot下手动配置kafka

由于需要对某些特殊配置进行配置,我们可能需要手动配置kafka相关的bean,创建一个配置类如下,类似于

org.springframework.boot.autoconfigure.kafka.KafkaAutoConfiguration,这里创建了对应类型的bean之后,org.springframework.boot.autoconfigure.kafka.KafkaAutoConfiguration中的对应Bean定义将不起作用。

所有的生产者配置可以参考ProducerConfig类,所有的消费者配置可以参考ConsumerConfig类。



@Configuration
public class KafkaConfig {
    @Value("${spring.kafka.consumer.bootstrap-servers}")
    private String bootstrapServers;
    //构造消费者属性map,ConsumerConfig中的可配置属性比spring boot自动配置要多
    private Map<String, Object> consumerProperties(){
        Map<String, Object> props = new HashMap<>();
        props.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, false);
        props.put(ConsumerConfig.SESSION_TIMEOUT_MS_CONFIG, "15000");
        props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, IntegerDeserializer.class);
        props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
        props.put(ConsumerConfig.MAX_POLL_RECORDS_CONFIG, 5);
        props.put(ConsumerConfig.GROUP_ID_CONFIG, "activity-service");
        props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers);
        return props;
    }
    
    @Bean("consumerFactory")
    public DefaultKafkaConsumerFactory consumerFactory(){
        return new DefaultKafkaConsumerFactory(consumerProperties());
    }
    @Bean("listenerContainerFactory")
    //个性化定义消费者
    public ConcurrentKafkaListenerContainerFactory listenerContainerFactory(DefaultKafkaConsumerFactory consumerFactory) {
        //指定使用DefaultKafkaConsumerFactory
        ConcurrentKafkaListenerContainerFactory factory = new ConcurrentKafkaListenerContainerFactory();
        factory.setConsumerFactory(consumerFactory);
        
 //设置消费者ack模式为手动,看需求设置  
        factory.getContainerProperties().setAckMode(AbstractMessageListenerContainer.AckMode.MANUAL_IMMEDIATE);
        //设置可批量拉取消息消费,拉取数量一次3,看需求设置
        factory.setConcurrency(3);
        factory.setBatchListener(true);
        return factory;
    }
   
    //创建生产者配置map,ProducerConfig中的可配置属性比spring boot自动配置要多
    private Map<String, Object> producerProperties(){
        Map<String, Object> props = new HashMap<>();
        props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, IntegerSerializer.class);
        props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
        props.put(ProducerConfig.ACKS_CONFIG, "-1");
        props.put(ProducerConfig.BATCH_SIZE_CONFIG, 5);
        props.put(ProducerConfig.LINGER_MS_CONFIG, 500);
        props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers);
        return props;
    }
    
    @Bean("produceFactory")
    public DefaultKafkaProducerFactory produceFactory(){
        return new DefaultKafkaProducerFactory(producerProperties());
    }
    
    @Bean
    public KafkaTemplate kafkaTemplate(DefaultKafkaProducerFactory produceFactory){
        return new KafkaTemplate(produceFactory);
    }
}

生产者的使用方式,跟自动配置一样,直接注入KafkaTemplate即可。主要是消费者的使用有些不同。

批量消费消息

上面的消费者配置配置了一个bean,@Bean(“listenerContainerFactory”),这个bean可以指定为消费者,注解方式中是如下的使用方式。

containerFactory = "listenerContainerFactory"指定了使用listenerContainerFactory作为消费者。

  • 注意registryReceiver中的参数,ConsumerRecord对比之前的消费者,因为设置listenerContainerFactory是批量消费,因此ConsumerRecord是一个List,如果不是批量消费的话,相对应就是一个对象。
  • 注意第二个参数Acknowledgment,这个参数只有在设置消费者的ack应答模式为AckMode.MANUAL_IMMEDIATE才能注入,意思是需要手动ack。

@Component
@Slf4j
public class KafkaMessageReceiver {   
    
    @KafkaListener(topics = {"test"}, containerFactory = "listenerContainerFactory")
    public void registryReceiver(List<ConsumerRecord<Integer, String>> integerStringConsumerRecords, Acknowledgment acknowledgment) {
        Iterator<ConsumerRecord<Integer, String>> it = integerStringConsumerRecords.iterator();
        while (it.hasNext()){
            ConsumerRecord<Integer, String> consumerRecords = it.next();
            //dosome
             acknowledgment.acknowledge();
        }
    } 
}

如果不想要批量消费消息,那就可以另外定义一个bean类似于@Bean(“listenerContainerFactory”),如下,只要不设置批量消费即可。


@Bean("listenerContainerFactory2")
    //个性化定义消费者
    public ConcurrentKafkaListenerContainerFactory listenerContainerFactory2(DefaultKafkaConsumerFactory consumerFactory) {
        //指定使用DefaultKafkaConsumerFactory
        ConcurrentKafkaListenerContainerFactory factory = new ConcurrentKafkaListenerContainerFactory();
        factory.setConsumerFactory(consumerFactory);
        
 //设置消费者ack模式为手动,看需求设置  
        factory.getContainerProperties().setAckMode(AbstractMessageListenerContainer.AckMode.MANUAL_IMMEDIATE);
        return factory;
    }

spring boot整合kafka报错

Timeout expired while fetching topic metadata

这种报错应检查 kafka连接问题,服务是否启动,端口是否正确。

Kafka Producer error Expiring 10 record(s) for TOPIC:XXXXXX: xxx ms has passed since batch creation plus linger time

这种报错要考虑kafka和spring对应的版本问题,我的springboot 2.1.2在使用kafka_2.12-2.1.1时出现此问题,将kafka版本换为2.11-1.1.1后,问题解决。

以上为个人经验,希望能给大家一个参考,也希望大家多多支持编程网。

免责声明:

① 本站未注明“稿件来源”的信息均来自网络整理。其文字、图片和音视频稿件的所属权归原作者所有。本站收集整理出于非商业性的教育和科研之目的,并不意味着本站赞同其观点或证实其内容的真实性。仅作为临时的测试数据,供内部测试之用。本站并未授权任何人以任何方式主动获取本站任何信息。

② 本站未注明“稿件来源”的临时测试数据将在测试完成后最终做删除处理。有问题或投稿请发送至: 邮箱/279061341@qq.com QQ/279061341

关于spring boot整合kafka+注解方式

下载Word文档到电脑,方便收藏和打印~

下载Word文档

猜你喜欢

Spring Boot整合Kafka教程详解

这篇文章主要为大家介绍了Spring Boot整合Kafka教程详解,有需要的朋友可以借鉴参考下,希望能够有所帮助,祝大家多多进步,早日升职加薪
2023-03-10

Spring Boot如何利用注解方式整合MyBatis

今天小编给大家分享一下Spring Boot如何利用注解方式整合MyBatis的相关知识点,内容详细,逻辑清晰,相信大部分人都还太了解这方面的知识,所以分享这篇文章给大家参考一下,希望大家阅读完这篇文章后有所收获,下面我们一起来了解一下吧。
2023-06-30

关于Spring Boot WebSocket整合以及nginx配置详解

前言本文主要给大家介绍了关于Spring Boot WebSocket整合及nginx配置的相关内容,分享出来供大家参考学习,下面话不多说了,来一起看看详细的介绍吧。一:Spring Boot WebSocket整合创建一个maven项目,
2023-05-31

Spring基于注解整合Redis完整实例

在《Redis之——Spring整合Redis》一文中,向大家介绍了如何将spring与Redis整合起来,但不是基于注解的形式,很多同学都希望能够通过注解的形式来简单的将Spring与Redis整合起来,这样,在使用的时候,只需要在相应的
2023-05-31

spring boot如何整合redis主从sentinel方式

小编给大家分享一下spring boot如何整合redis主从sentinel方式,希望大家阅读完这篇文章之后都有所收获,下面让我们一起去探讨吧!springboot整合redis主从sentinel一主二从三sentinel配置1、mas
2023-06-29

Spring Boot怎么利用XML方式整合MyBatis

本篇内容介绍了“Spring Boot怎么利用XML方式整合MyBatis”的有关知识,在实际案例的操作过程中,不少人都会遇到这样的困境,接下来就让小编带领大家学习一下如何处理这些情况吧!希望大家仔细阅读,能够学有所成!一、前言下图是整个整
2023-06-30

详解Spring Boot集成MyBatis(注解方式)

MyBatis是支持定制化SQL、存储过程以及高级映射的优秀的持久层框架,避免了几乎所有的JDBC代码和手动设置参数以及获取结果集。spring Boot是能支持快速创建Spring应用的Java框架。本文通过一个例子来学习Spring B
2023-05-31

spring boot基于注解声明式事务配置的示例分析

小编给大家分享一下spring boot基于注解声明式事务配置的示例分析,希望大家阅读完这篇文章之后都有所收获,下面让我们一起去探讨吧!事务配置1、配置方式一1)开启spring事务管理,在spring boot启动类添加注解@Enable
2023-06-20

spring整合redis缓存实现以注解的形式使用

本篇文章给大家分享的是有关spring整合redis缓存实现以注解的形式使用,小编觉得挺实用的,因此分享给大家学习,希望大家阅读完这篇文章后可以有所收获,话不多说,跟着小编一起来看看吧。maven项目中在pom.xml中依赖2个jar包,其
2023-05-31

编程热搜

  • Python 学习之路 - Python
    一、安装Python34Windows在Python官网(https://www.python.org/downloads/)下载安装包并安装。Python的默认安装路径是:C:\Python34配置环境变量:【右键计算机】--》【属性】-
    Python 学习之路 - Python
  • chatgpt的中文全称是什么
    chatgpt的中文全称是生成型预训练变换模型。ChatGPT是什么ChatGPT是美国人工智能研究实验室OpenAI开发的一种全新聊天机器人模型,它能够通过学习和理解人类的语言来进行对话,还能根据聊天的上下文进行互动,并协助人类完成一系列
    chatgpt的中文全称是什么
  • C/C++中extern函数使用详解
  • C/C++可变参数的使用
    可变参数的使用方法远远不止以下几种,不过在C,C++中使用可变参数时要小心,在使用printf()等函数时传入的参数个数一定不能比前面的格式化字符串中的’%’符号个数少,否则会产生访问越界,运气不好的话还会导致程序崩溃
    C/C++可变参数的使用
  • css样式文件该放在哪里
  • php中数组下标必须是连续的吗
  • Python 3 教程
    Python 3 教程 Python 的 3.0 版本,常被称为 Python 3000,或简称 Py3k。相对于 Python 的早期版本,这是一个较大的升级。为了不带入过多的累赘,Python 3.0 在设计的时候没有考虑向下兼容。 Python
    Python 3 教程
  • Python pip包管理
    一、前言    在Python中, 安装第三方模块是通过 setuptools 这个工具完成的。 Python有两个封装了 setuptools的包管理工具: easy_install  和  pip , 目前官方推荐使用 pip。    
    Python pip包管理
  • ubuntu如何重新编译内核
  • 改善Java代码之慎用java动态编译

目录