我的编程空间,编程开发者的网络收藏夹
学习永远不晚

Kafka动态增加Topic的副本

短信预约 -IT技能 免费直播动态提醒
省份

北京

  • 北京
  • 上海
  • 天津
  • 重庆
  • 河北
  • 山东
  • 辽宁
  • 黑龙江
  • 吉林
  • 甘肃
  • 青海
  • 河南
  • 江苏
  • 湖北
  • 湖南
  • 江西
  • 浙江
  • 广东
  • 云南
  • 福建
  • 海南
  • 山西
  • 四川
  • 陕西
  • 贵州
  • 安徽
  • 广西
  • 内蒙
  • 西藏
  • 新疆
  • 宁夏
  • 兵团
手机号立即预约

请填写图片验证码后获取短信验证码

看不清楚,换张图片

免费获取短信验证码

Kafka动态增加Topic的副本

由于Producer和Consumer都只会与Leader角色的分区副本相连,所以kafka需要以集群的组织形式提供主题下的消息高可用。kafka支持主备复制,所以消息具备高可用和持久性。

    一个分区可以有多个副本,这些副本保存在不同的broker上。每个分区的副本中都会有一个作为Leader。当一个broker失败时,Leader在这台broker上的分区都会变得不可用,kafka会自动移除Leader,再其他副本中选一个作为新的Leader。

在通常情况下,增加分区可以提供kafka集群的吞吐量。然而,也应该意识到集群的总分区数或是单台服务器上的分区数过多,会增加不可用及延迟的风险。

1.png

关于副本的更多信息,请参考链接:

https://blog.csdn.net/weixin_38750084/article/details/82942564

 

目前的kakfa集群有3个节点,server.properties 关于topic的配置为:

offsets.topic.replication.factor=1                                                                                                                                                                                           
transaction.state.log.replication.factor=1                                                                                                                                                                                   
transaction.state.log.min.isr=1

目前的设置为1个副本,这样不健全。如果有一台服务器挂掉了,那么就会造成数据丢失!

因此,需要将副本数改为3,也就是每台服务器都有一个副本,这样才是稳妥的!

 

kafka-topics.sh 不能用来增加副本因子replication-factor。实际应该使用kafka bin目录下面的kafka-reassign-partitions.sh

 

查看topic详情

首先查看kafka的所有topic

/kafka/bin/kafka-topics.sh --zookeeper zookeeper-1.default.svc.cluster.local:2181 --list
输出:
test
...

 

查看topic为test的详细信息

/kafka/bin/kafka-topics.sh --describe --zookeeper zookeeper-1.default.svc.cluster.local:2181 --topic test

 

输出:

Topic:test    PartitionCount:3    ReplicationFactor:1    Configs:
    Topic: test    Partition: 0    Leader: 1    Replicas: 1    Isr: 1
    Topic: test    Partition: 1    Leader: 2    Replicas: 2    Isr: 2
    Topic: test    Partition: 2    Leader: 3    Replicas: 3    Isr: 3

 

可以看到test的副本数为1

 

扩容副本

kafka-reassign-partitions.sh 执行时,依赖一个json文件。

创建 test.json

{
    "version": 1,
    "partitions": [
        {
            "topic": "test",
            "partition": 0,
            "replicas": [
                1,
                2,
            ]
        },
        {
            "topic": "test",
            "partition": 1,
            "replicas": [
                1,
                2,
            ]
        },
        {
            "topic": "test",
            "partition": 2,
            "replicas": [
                1,
                2,
            ]
        }
    ]
}

 

注意:这个json文件和上面查看的test详情,是有关联的!否则会导致执行失败

关系图

1.png

 

正式执行脚本

/kafka/bin/kafka-reassign-partitions.sh --zookeeper  zookeeper-1.default.svc.cluster.local:2181 --reassignment-json-file test.json --execute

参数解释:

--reassignment-json-file 带有分区的JSON文件
--execute 按规定启动重新分配通过---重新分配JSON文件选择权。

 

执行输出:

Current partition replica assignment

{"version":1,"partitions":[{"topic":"test","partition":2,"replicas":[1],"log_dirs":["any"]},{"topic":"test","partition":1,"replicas":[3],"log_dirs":["any"]},{"topic":"test","partition":0,"replicas":[2],"log_dirs":["any"]}]}

出现 Successfully 表示成功了!

 

再次查看topic为test的partition详情

/kafka/bin/kafka-topics.sh --describe --zookeeper zookeeper-1.default.svc.cluster.local:2181 --topic test

 

输出:

Topic:test    PartitionCount:3    ReplicationFactor:3    Configs:
    Topic: test    Partition: 0    Leader: 2    Replicas: 1,2,3    Isr: 2,3,1
    Topic: test    Partition: 1    Leader: 3    Replicas: 1,2,3    Isr: 3,1,2
    Topic: test    Partition: 2    Leader: 1    Replicas: 1,2,3    Isr: 1,3,2

 

可以发现,副本已经改为3了!

 

默认配置

在java代码或者python代码中,是直接发送生产者消息。topic的名字是动态生成的(当kafka发现topic不存在时,会自动创建),那么它的partitions和replication-factor的数量是由服务端决定的

因为kafka集群有3个节点,所有需要改成3个

offsets.topic.replication.factor=3                                                                                                                                                                                           
transaction.state.log.replication.factor=3                                                                                                                                                                                   
transaction.state.log.min.isr=3
num.partitions=1
default.replication.factor=3

 

参数解释:

offsets.topic.replication.factor 用于配置offset记录的topic的partition的副本个数
transaction.state.log.replication.factor 事务主题的复制因子 
transaction.state.log.min.isr 覆盖事务主题的min.insync.replicas配置

num.partitions 新建Topic时默认的分区数

default.replication.factor 自动创建topic时的默认副本的个数

 

注意:这些参数,设置得更高以确保高可用性!

其中 default.replication.factor 是真正决定,topi的副本数量的

 

关于kafka配置文件的更多解释,请参考链接:

https://blog.csdn.net/memoordit/article/details/78850086

 

那么默认参数,如何测试呢?

很简单,由于在应用代码,是不会主动创建topic的,由kafka集群自动创建topic。

那么由代码进行一次,生产者和消费者,就可以了!

 

Python测试

这个脚本是普通版的kafka消息测试,没有ACL配置!

 

test.py

#!/usr/bin/env python3
# coding: utf-8
import sys
import io

def setup_io():  # 设置默认屏幕输出为utf-8编码
    sys.stdout = sys.__stdout__ = io.TextIOWrapper(sys.stdout.detach(), encoding='utf-8', line_buffering=True)
    sys.stderr = sys.__stderr__ = io.TextIOWrapper(sys.stderr.detach(), encoding='utf-8', line_buffering=True)
setup_io()

import time
from kafka import KafkaProducer
from kafka import KafkaConsumer

class KafkaClient(object):
    def __init__(self, kafka_server, port, topic, content):
        self.kafka_server = kafka_server  # kafka服务器ip地址
        self.port = port  # kafka端口
        self.topic = topic  # topic名
        self.content = content # 内容

    def producer(self):
        producer = KafkaProducer(bootstrap_servers=['%s:%s' % (kafka_server, port)])
        producer.send(topic, content)
        producer.flush()  # flush确保所有meg都传送给broker
        producer.close()
        return producer

    def consumer(self):
        consumer = KafkaConsumer(topic, group_id='test_group', bootstrap_servers=['%s:%s' % (kafka_server, port)])
        # consumer.close()
        return consumer

    def main(self):
        startime = time.time()  # 开始时间

        client = KafkaClient(self.kafka_server, self.port, self.topic, self.content)  # 实例化客户端

        client.producer()  # 执行生产者
        print("已执行生产者")
        consumer = client.consumer()  # 执行消费者
        print("已执行消费者")
        print("等待结果输出...")
        flag = False
        for msg in consumer:
            # recv = "%s:%d:%d: key=%s value=%s" % (msg.topic, msg.partition, msg.offset, msg.key, msg.value)
            # 判断生产的消息和消费的消息是否一致
            print(msg.value)
            # print(self.content)
            if msg.value == self.content:
                flag = True
                break

        consumer.close()  # 关闭消费者对象
        endtime = time.time()  # 结束时间

        if flag:
            # %.2f %(xx) 表示保留小数点2位
            return "kafka验证消息成功,花费时间", '%.2f 秒' % (endtime - startime)
        else:
            return "kafka验证消息失败,花费时间", '%.2f 秒' % (endtime - startime)


if __name__ == '__main__':
    kafka_server = "kafka-1.default.svc.cluster.local"
    port = "9092"
    topic = "test_xxx"
    content = "hello honey".encode('utf-8')

    client = KafkaClient(kafka_server,port,topic,content)  # 实例化客户端
    print(client.main())

这里指定的topic为 test_xxx

执行Python脚本,然后到服务器上面,查看topic为test_xxx的详细信息

/kafka/bin/kafka-topics.sh --describe --zookeeper zookeeper-1.default.svc.cluster.local:2181 --topic test_xxx

 

输出如下:

Topic:test_xxx    PartitionCount:3    ReplicationFactor:3    Configs:
    Topic: test_xxx    Partition: 0    Leader: 2    Replicas: 1,2,3    Isr: 2,3,1
    Topic: test_xxx    Partition: 1    Leader: 3    Replicas: 1,2,3    Isr: 3,1,2
    Topic: test_xxx    Partition: 2    Leader: 1    Replicas: 1,2,3    Isr: 1,3,2

 

可以发现副本为3,说明默认配置生效了!

 


免责声明:

① 本站未注明“稿件来源”的信息均来自网络整理。其文字、图片和音视频稿件的所属权归原作者所有。本站收集整理出于非商业性的教育和科研之目的,并不意味着本站赞同其观点或证实其内容的真实性。仅作为临时的测试数据,供内部测试之用。本站并未授权任何人以任何方式主动获取本站任何信息。

② 本站未注明“稿件来源”的临时测试数据将在测试完成后最终做删除处理。有问题或投稿请发送至: 邮箱/279061341@qq.com QQ/279061341

Kafka动态增加Topic的副本

下载Word文档到电脑,方便收藏和打印~

下载Word文档

猜你喜欢

Kafka动态增加Topic的副本

由于Producer和Consumer都只会与Leader角色的分区副本相连,所以kafka需要以集群的组织形式提供主题下的消息高可用。kafka支持主备复制,所以消息具备高可用和持久性。 一个分区可以有多个副本,这些副本保存在不同的br
2023-01-31

spring-kafka使消费者动态订阅新增的topic问题

这篇文章主要介绍了spring-kafka使消费者动态订阅新增的topic问题,具有很好的参考价值,希望对大家有所帮助。如有错误或未考虑完全的地方,望不吝赐教
2022-12-27

spring kafka怎么实现消费者动态订阅新增的topic

这篇文章主要介绍了spring kafka怎么实现消费者动态订阅新增的topic的相关知识,内容详细易懂,操作简单快捷,具有一定借鉴价值,相信大家阅读完这篇spring kafka怎么实现消费者动态订阅新增的topic文章都会有所收获,下面
2023-07-04

redis动态增加节点的方法是什么

Redis动态增加节点的方法有两种,分别是使用Redis Sentinel和使用Redis Cluster。1. 使用Redis Sentinel:Redis Sentinel是Redis的高可用解决方案,它可以自动监控Redis节点的健康
2023-08-24

vue实现动态路由添加功能的简单方法(无废话版本)

ue动态路由(约定路由),听起来好像很玄乎的样子,但是你要是理解了实现思路,你会发现没有想象中的那么难,下面这篇文章主要给大家介绍了关于vue实现动态路由添加功能的简单方法,需要的朋友可以参考下
2023-02-16

SQLServer 错误 35250 到主要副本的连接处于非活动状态。 无法处理该命令。 故障 处理 修复 支持远程

详细信息 Attribute 值 产品名称 SQL Server 事件 ID 35250 事件源 MSSQLSERVER 组件 SQLEngine 符号名称 HADR_PRIMARYNOTACTIVE 消息正文 ...
SQLServer 错误 35250 到主要副本的连接处于非活动状态。  无法处理该命令。 故障 处理 修复 支持远程
2023-11-05

编程热搜

  • Python 学习之路 - Python
    一、安装Python34Windows在Python官网(https://www.python.org/downloads/)下载安装包并安装。Python的默认安装路径是:C:\Python34配置环境变量:【右键计算机】--》【属性】-
    Python 学习之路 - Python
  • chatgpt的中文全称是什么
    chatgpt的中文全称是生成型预训练变换模型。ChatGPT是什么ChatGPT是美国人工智能研究实验室OpenAI开发的一种全新聊天机器人模型,它能够通过学习和理解人类的语言来进行对话,还能根据聊天的上下文进行互动,并协助人类完成一系列
    chatgpt的中文全称是什么
  • C/C++中extern函数使用详解
  • C/C++可变参数的使用
    可变参数的使用方法远远不止以下几种,不过在C,C++中使用可变参数时要小心,在使用printf()等函数时传入的参数个数一定不能比前面的格式化字符串中的’%’符号个数少,否则会产生访问越界,运气不好的话还会导致程序崩溃
    C/C++可变参数的使用
  • css样式文件该放在哪里
  • php中数组下标必须是连续的吗
  • Python 3 教程
    Python 3 教程 Python 的 3.0 版本,常被称为 Python 3000,或简称 Py3k。相对于 Python 的早期版本,这是一个较大的升级。为了不带入过多的累赘,Python 3.0 在设计的时候没有考虑向下兼容。 Python
    Python 3 教程
  • Python pip包管理
    一、前言    在Python中, 安装第三方模块是通过 setuptools 这个工具完成的。 Python有两个封装了 setuptools的包管理工具: easy_install  和  pip , 目前官方推荐使用 pip。    
    Python pip包管理
  • ubuntu如何重新编译内核
  • 改善Java代码之慎用java动态编译

目录