我的编程空间,编程开发者的网络收藏夹
学习永远不晚

微服务同时接入多个Kafka

短信预约 -IT技能 免费直播动态提醒
省份

北京

  • 北京
  • 上海
  • 天津
  • 重庆
  • 河北
  • 山东
  • 辽宁
  • 黑龙江
  • 吉林
  • 甘肃
  • 青海
  • 河南
  • 江苏
  • 湖北
  • 湖南
  • 江西
  • 浙江
  • 广东
  • 云南
  • 福建
  • 海南
  • 山西
  • 四川
  • 陕西
  • 贵州
  • 安徽
  • 广西
  • 内蒙
  • 西藏
  • 新疆
  • 宁夏
  • 兵团
手机号立即预约

请填写图片验证码后获取短信验证码

看不清楚,换张图片

免费获取短信验证码

微服务同时接入多个Kafka

最近在做微服务的迁移改造工作,其中有一个服务需要订阅多个Kafka,如果使用spring kafka自动配置的话只能配置一个Kafka,不符合需求,该文总结了如何配置多个Kafka,希望对您有帮助。

准备工作

  • 自己搭建一个Kafka
    从官方下载Kafka,选择对应Spring Boot 的版本,好在Kafka支持的版本范围比较广,当前最新版本是3.2.1,支持2.12-3.2.1 范围的版本,覆盖了Spring Boot 2.0x-Spring Boot 3.0.x
    https://kafka.apache.org/downloads
    在这里插入图片描述
  • 解压安装
    进入bin目录,执行如下命令,按照如下顺序启动
    Linux
# 配置文件选择自己对应的目录zookeeper-server-start.sh ../config/zookeeper.properties

Windows

windows/zookeeper-server-start.bat ../config/zookeeper.properties

打开另外一个终端,启动KafkaServer
Linux

kafka-server-start.sh ../config/server.properties

Windows

windows/kafka-server-start.bat ../config/server.properties

最小化配置Kafka

如下是最小化配置Kafka
pom.xml 引入依赖

<dependency><groupId>org.springframework.kafkagroupId><artifactId>spring-kafkaartifactId>dependency>

application.properties

server.port=8090spring.application.name=single-kafka-server#kafka 服务器地址spring.kafka.bootstrap-servers=localhost:9092#消费者分组,配置后,自动创建spring.kafka.consumer.group-id=default_group

KafkaProducer 生产者

@Slf4j@Component@EnableSchedulingpublic class KafkaProducer {    @Resource    private KafkaTemplate kafkaTemplate;    private void sendTest() {    //topic 会自动创建        kafkaTemplate.send("topic1", "hello kafka");    }    @Scheduled(fixedRate = 1000 * 10)    public void testKafka() {        log.info("send message...");        sendTest();    }}

KafkaConsumer 消费者

@Slf4j@Componentpublic class KafkaConsumer {    @KafkaListener(topics = {"topic1"})    public void processMessage(String spuId) {        log.warn("process spuId ={}", spuId);    }}

运行效果:
在这里插入图片描述

多Kafka配置

配置稍微复杂了一点,灵魂就是手动创建,同样引入依赖
pom.xml

<dependency><groupId>org.springframework.kafkagroupId><artifactId>spring-kafkaartifactId>dependency>

application.properties

server.port=8090spring.application.name=kafka-server#kafka1#服务器地址spring.kafka.one.bootstrap-servers=localhost:9092spring.kafka.one.consumer.group-id=default_group#kafka2spring.kafka.two.bootstrap-servers=localhost:9092spring.kafka.two.consumer.group-id=default_group2

第一个Kafka配置,需要区分各Bean的名称
KafkaOneConfig

@Configurationpublic class KafkaOneConfig {    @Value("${spring.kafka.one.bootstrap-servers}")    private String bootstrapServers;    @Value("${spring.kafka.one.consumer.group-id}")    private String groupId;    @Bean    public KafkaTemplate<String, String> kafkaOneTemplate() {        return new KafkaTemplate<>(producerFactory());    }    @Bean(name = "kafkaOneContainerFactory")    KafkaListenerContainerFactory<ConcurrentMessageListenerContainer<Integer, String>> kafkaOneContainerFactory() {        ConcurrentKafkaListenerContainerFactory<Integer, String> factory = new ConcurrentKafkaListenerContainerFactory<>();        factory.setConsumerFactory(consumerFactory());        factory.getContainerProperties().setPollTimeout(3000);        return factory;    }    private ProducerFactory<String, String> producerFactory() {        return new DefaultKafkaProducerFactory<>(producerConfigs());    }    private ConsumerFactory<Integer, String> consumerFactory() {        return new DefaultKafkaConsumerFactory<>(consumerConfigs());    }    private Map<String, Object> producerConfigs() {        Map<String, Object> props = new HashMap<>();        props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers);        props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class);        props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class);        return props;    }    private Map<String, Object> consumerConfigs() {        Map<String, Object> props = new HashMap<>();        props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers);        props.put(ConsumerConfig.GROUP_ID_CONFIG, groupId);        props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);        props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);        return props;    }}

kafkaOneTemplate 定义第一个Kafka的高级模板,用来发送消息
kafkaOneContainerFactory 消费监听容器,配置在@KafkaListener中,
producerFactory 生产者工厂
consumerFactory 消费者工厂
producerConfigs 生产者配置
consumerConfigs 消费者配置

同样创建第二个Kafka,配置含义,同第一个Kafka
KafkaTwoConfig

@Configurationpublic class KafkaTwoConfig {    @Value("${spring.kafka.two.bootstrap-servers}")    private String bootstrapServers;    @Value("${spring.kafka.two.consumer.group-id}")    private String groupId;    @Bean    public KafkaTemplate<String, String> kafkaTwoTemplate() {        return new KafkaTemplate<>(producerFactory());    }    @Bean(name = "kafkaTwoContainerFactory")    KafkaListenerContainerFactory<ConcurrentMessageListenerContainer<Integer, String>> kafkaTwoContainerFactory() {        ConcurrentKafkaListenerContainerFactory<Integer, String> factory = new ConcurrentKafkaListenerContainerFactory<>();        factory.setConsumerFactory(consumerFactory());        factory.getContainerProperties().setPollTimeout(3000);        return factory;    }    private ProducerFactory<String, String> producerFactory() {        return new DefaultKafkaProducerFactory<>(producerConfigs());    }    public ConsumerFactory<Integer, String> consumerFactory() {        return new DefaultKafkaConsumerFactory<>(consumerConfigs());    }    private Map<String, Object> producerConfigs() {        Map<String, Object> props = new HashMap<>();        props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers);        props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class);        props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class);        return props;    }    private Map<String, Object> consumerConfigs() {        Map<String, Object> props = new HashMap<>();        props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers);        props.put(ConsumerConfig.GROUP_ID_CONFIG, groupId);        props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);        props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);        return props;    }}

创建一个测试的消费者,注意配置不同的监听容器containerFactory
KafkaConsumer

@Slf4j@Componentpublic class KafkaConsumer {    @KafkaListener(topics = {"topic1"}, containerFactory = "kafkaOneContainerFactory")    public void oneProcessItemcenterSpuMessage(String spuId) {        log.warn("one process spuId ={}", spuId);    }    @KafkaListener(topics = {"topic2"},containerFactory = "kafkaTwoContainerFactory")    public void twoProcessItemcenterSpuMessage(String spuId) {        log.warn("two process spuId ={}", spuId);    }}

创建一个测试的生产者,定时往两个topic中发送消息
KafkaProducer

@Slf4j@Componentpublic class KafkaProducer {    @Resource    private KafkaTemplate kafkaOneTemplate;    @Resource    private KafkaTemplate kafkaTwoTemplate;    private void sendTest() {        kafkaOneTemplate.send("topic1", "hello kafka one");        kafkaTwoTemplate.send("topic2", "hello kafka two");    }    @Scheduled(fixedRate = 1000 * 10)    public void testKafka() {        log.info("send message...");        sendTest();    }}

最后运行效果:
在这里插入图片描述

其他kafka文章:
【从面试题看源码】-看完Kafka性能优化-让你吊打面试官

来源地址:https://blog.csdn.net/weixin_40972073/article/details/126682094

免责声明:

① 本站未注明“稿件来源”的信息均来自网络整理。其文字、图片和音视频稿件的所属权归原作者所有。本站收集整理出于非商业性的教育和科研之目的,并不意味着本站赞同其观点或证实其内容的真实性。仅作为临时的测试数据,供内部测试之用。本站并未授权任何人以任何方式主动获取本站任何信息。

② 本站未注明“稿件来源”的临时测试数据将在测试完成后最终做删除处理。有问题或投稿请发送至: 邮箱/279061341@qq.com QQ/279061341

微服务同时接入多个Kafka

下载Word文档到电脑,方便收藏和打印~

下载Word文档

猜你喜欢

微服务同时接入多个Kafka

最近在做微服务的迁移改造工作,其中有一个服务需要订阅多个Kafka,如果使用spring kafka自动配置的话只能配置一个Kafka,不符合需求,该文总结了如何配置多个Kafka,希望对您有帮助。 文章目录 准备工作最小化配
2023-08-19

微软云服务器价格多少钱一个小时

微软云服务器的价格可能因品牌、地区、型号、配置等因素而有所不同,因此无法给出确切的价格。不过,微软公布的价格通常包含服务器的软件、许可证费用等,不同品牌和配置的服务器价格也可能存在较大差异。通常,一台微软云服务器的价格在几万到数十万人民币之间。具体的价格取决于使用的硬件设备、软件费用、运维服务等多种因素。如果您需要了解关于微软云服务器的具体信息,建议您联系微软官方或者其他相关的渠道来进一步了解
2023-10-26

详解Spring中singleton bean如何同时服务多个请求

这篇文章主要介绍了详解Spring中singleton bean如何同时服务多个请求
2023-02-07

多版本共存:CentOS上同时搭建多个web服务器的指南

在CentOS上同时搭建多个web服务器需要进行以下步骤:1. 安装CentOS操作系统:首先需要在服务器上安装CentOS操作系统。可以通过光盘安装或者使用ISO镜像进行安装。2. 安装Apache服务器:Apache是最常见的web服务
2023-10-09

微软云服务器一年多少钱一个小时正常

微软云服务器的价格因不同的配置和服务而异。最低配置的云服务器通常需要几百美元,而更高级别的云服务器则需要几千美元甚至上万元。如果您选择的是高级别的云服务器,例如高端VMwareHyper-V或AWSSphinx,您可能需要考虑更多的硬件和软件要求,例如处理器、内存、存储、网络、安全等。这些硬件和软件要求的具体情况可能会
2023-10-27

npm script命令同时进行多个监听服务的方法

最近在搭建一个静态页面偏多的网站, 用vue或React有点大材小用,使用纯html / css / js 又不好用, 于是就用npm手动搭建一个简单的本地开发环境, 本地环境要实现几个基本功能在本地开启http服务 ; 且开启服务后, 会
2022-06-04

Java编程Socket如何实现多个客户端连接同一个服务端

这篇文章主要介绍Java编程Socket如何实现多个客户端连接同一个服务端,文中介绍的非常详细,具有一定的参考价值,感兴趣的小伙伴们一定要看完!Java Socket(套接字)通常也称作"套接字",用于描述IP地址和端口,是一个通信链的句柄
2023-05-30

如何使用阿里云服务器同时运行多个域名

阿里云服务器是一种灵活、可扩展的云计算服务,可以为用户提供强大的计算能力和存储资源。然而,有时候用户可能需要同时运行多个域名,这时候就需要使用阿里云服务器来实现。本文将介绍如何使用阿里云服务器同时运行多个域名的方法。详细说明:步骤一:购买阿里云服务器首先,您需要在阿里云官网上购买一台适合您需求的服务器。您可以根据自己的
如何使用阿里云服务器同时运行多个域名
2024-01-20

2G 4核阿里云服务器可以同时运行多少个应用程序?

阿里云服务器提供了多种配置,包括2G4核的配置。对于这种配置的阿里云服务器,很多人都有一个疑问,那就是这种配置的阿里云服务器可以同时运行多少个应用程序?接下来,我们将详细解答这个问题。阿里云服务器的性能取决于许多因素,包括CPU的核心数、内存大小、硬盘速度等等。对于2G4核的阿里云服务器,其主要的性能参数是CPU
2G 4核阿里云服务器可以同时运行多少个应用程序?
2023-12-10

编程热搜

  • Python 学习之路 - Python
    一、安装Python34Windows在Python官网(https://www.python.org/downloads/)下载安装包并安装。Python的默认安装路径是:C:\Python34配置环境变量:【右键计算机】--》【属性】-
    Python 学习之路 - Python
  • chatgpt的中文全称是什么
    chatgpt的中文全称是生成型预训练变换模型。ChatGPT是什么ChatGPT是美国人工智能研究实验室OpenAI开发的一种全新聊天机器人模型,它能够通过学习和理解人类的语言来进行对话,还能根据聊天的上下文进行互动,并协助人类完成一系列
    chatgpt的中文全称是什么
  • C/C++中extern函数使用详解
  • C/C++可变参数的使用
    可变参数的使用方法远远不止以下几种,不过在C,C++中使用可变参数时要小心,在使用printf()等函数时传入的参数个数一定不能比前面的格式化字符串中的’%’符号个数少,否则会产生访问越界,运气不好的话还会导致程序崩溃
    C/C++可变参数的使用
  • css样式文件该放在哪里
  • php中数组下标必须是连续的吗
  • Python 3 教程
    Python 3 教程 Python 的 3.0 版本,常被称为 Python 3000,或简称 Py3k。相对于 Python 的早期版本,这是一个较大的升级。为了不带入过多的累赘,Python 3.0 在设计的时候没有考虑向下兼容。 Python
    Python 3 教程
  • Python pip包管理
    一、前言    在Python中, 安装第三方模块是通过 setuptools 这个工具完成的。 Python有两个封装了 setuptools的包管理工具: easy_install  和  pip , 目前官方推荐使用 pip。    
    Python pip包管理
  • ubuntu如何重新编译内核
  • 改善Java代码之慎用java动态编译

目录