我的编程空间,编程开发者的网络收藏夹
学习永远不晚

怎样给Kafka新增分区

短信预约 -IT技能 免费直播动态提醒
省份

北京

  • 北京
  • 上海
  • 天津
  • 重庆
  • 河北
  • 山东
  • 辽宁
  • 黑龙江
  • 吉林
  • 甘肃
  • 青海
  • 河南
  • 江苏
  • 湖北
  • 湖南
  • 江西
  • 浙江
  • 广东
  • 云南
  • 福建
  • 海南
  • 山西
  • 四川
  • 陕西
  • 贵州
  • 安徽
  • 广西
  • 内蒙
  • 西藏
  • 新疆
  • 宁夏
  • 兵团
手机号立即预约

请填写图片验证码后获取短信验证码

看不清楚,换张图片

免费获取短信验证码

怎样给Kafka新增分区

给Kafka新增分区

数据量猛增的时候,需要给 kafka 的 topic 新增分区,增大处理的数据量,可以通过以下步骤

1、修改 topic 的分区

kafka-topics --zookeeper hadoop004:2181 --alter --topic flink-test-04 --partitions 3

2、迁移数据

生成迁移计划,手动新建一个 json 文件

{
"topics": [
{"topic": "flink-test-03"}
],
"version": 1
}

生成迁移计划

kafka-reassign-partitions --zookeeper hadoop004:2181 --topics-to-move-json-file topic.json --broker-list “120,121,122” --generate

Current partition replica assignment:

{"version":1,"partitions":[{"topic":"flink-test-02","partition":5,"replicas":[120]},{"topic":"flink-test-02","partition":0,"replicas":[121]},{"topic":"flink-test-02","partition":2,"replicas":[120]},{"topic":"flink-test-02","partition":1,"replicas":[122]},{"topic":"flink-test-02","partition":4,"replicas":[122]},{"topic":"flink-test-02","partition":3,"replicas":[121]}]}

新建一个文件reassignment.json,保存上边这些信息

3、迁移

kafka-reassign-partitions --zookeeper hadoop004:2181 --reassignment-json-file reassignment.json --execute

4、验证

kafka-reassign-partitions --zookeeper hadoop004:2181 --reassignment-json-file reassignment.json --verify

Kafka分区原理机制

分区结构

kafka的消息总共是三层结构

Topic(第一层结构,表示一个主题)-> Partition(分区,每个消息可以有多个分区) -> 消息实例(具体的消息文本等等,一个消息实例只可能在一个分区里面,不会出现在多个分区中)


在这里插入图片描述

分区优点

分区其实是一个负载均衡的思想。如此设计能使每一个分区独自处理单独的读写请求,提高吞吐量。

分区策略

  • 轮询策略Round-robin(未指定key新版本默认策略)
  • 随机策略Randomness(老版本默认策略)
  • 消息键排序策略Key-ordering(指定了key,则使用该策略)
  • 根据地理位置进行分区
  • 自定义分区 需要在生产者端实现org.apache.kafka.clients.producer.Partitioner接口,并配置一下实现类的全限定名

根据分区策略实现消息的顺序消费

可以只设置一个分区,这样子消息都是放在一个partition,肯定是先进先出进行消费,然而这种场景无法利用kafka多分区的高吞吐量以及负载均衡的优势。

将需要顺序消费的消息设置key,这个时候根据默认的分区策略,kafka会将所有的相同的key放在一个partition上面,这样既可以使用kafka的partition又可以实现顺序消费。

默认分区策略源码


public class DefaultPartitioner implements Partitioner {
    private final ConcurrentMap<String, AtomicInteger> topicCounterMap = new ConcurrentHashMap<>();
    public void configure(Map<String, ?> configs) {}
    
    public int partition(String topic, Object key, byte[] keyBytes, Object value, byte[] valueBytes, Cluster cluster) {
        List<PartitionInfo> partitions = cluster.partitionsForTopic(topic);
        int numPartitions = partitions.size();
        if (keyBytes == null) {
            int nextValue = nextValue(topic);
            List<PartitionInfo> availablePartitions = cluster.availablePartitionsForTopic(topic);
            if (availablePartitions.size() > 0) {
                int part = Utils.toPositive(nextValue) % availablePartitions.size();
                return availablePartitions.get(part).partition();
            } else {
                // no partitions are available, give a non-available partition
                return Utils.toPositive(nextValue) % numPartitions;
            }
        } else {
            // hash the keyBytes to choose a partition
            return Utils.toPositive(Utils.murmur2(keyBytes)) % numPartitions;
        }
    }
    private int nextValue(String topic) {
        AtomicInteger counter = topicCounterMap.get(topic);
        if (null == counter) {
            counter = new AtomicInteger(ThreadLocalRandom.current().nextInt());
            AtomicInteger currentCounter = topicCounterMap.putIfAbsent(topic, counter);
            if (currentCounter != null) {
                counter = currentCounter;
            }
        }
        return counter.getAndIncrement();
    }
    public void close() {}
}

从类注释当中已经很明显的看出来分区逻辑

3. 如果指定了分区,则使用指定分区

4. 如果没有指定分区,但是有key,则使用hash过的key放置消息

5. 如果没有指定分区,也没有key,则使用轮询

总结

以上为个人经验,希望能给大家一个参考,也希望大家多多支持编程网。

免责声明:

① 本站未注明“稿件来源”的信息均来自网络整理。其文字、图片和音视频稿件的所属权归原作者所有。本站收集整理出于非商业性的教育和科研之目的,并不意味着本站赞同其观点或证实其内容的真实性。仅作为临时的测试数据,供内部测试之用。本站并未授权任何人以任何方式主动获取本站任何信息。

② 本站未注明“稿件来源”的临时测试数据将在测试完成后最终做删除处理。有问题或投稿请发送至: 邮箱/279061341@qq.com QQ/279061341

怎样给Kafka新增分区

下载Word文档到电脑,方便收藏和打印~

下载Word文档

猜你喜欢

怎样给Kafka新增分区

这篇文章主要介绍了怎样给Kafka新增分区问题,具有很好的参考价值,希望对大家有所帮助。如有错误或未考虑完全的地方,望不吝赐教
2022-12-27

SQLserver怎么给表新增字段

要给表新增字段,可以使用 ALTER TABLE 语句。例如,要在名为 table_name 的表中新增一个名为 column_name 的字段,可以使用以下语法:ALTER TABLE table_nameADD column_na
2023-10-26

win11怎么给硬盘分区

这篇文章主要介绍win11怎么给硬盘分区,文中介绍的非常详细,具有一定的参考价值,感兴趣的小伙伴们一定要看完!其实win11的磁盘分区跟win10系统的操作差不多,具体的步骤如下:1、右击“此电脑”,选择“管理”,然后在“计算机管理”窗口左
2023-06-27

win10怎么给电脑分区

在Windows 10中,可以通过以下步骤给电脑分区:1. 打开“磁盘管理”。右键点击“开始”菜单,选择“磁盘管理”选项。2. 在磁盘管理窗口中,你会看到所有的磁盘和分区。找到你想要分区的磁盘,确保它有足够的可分配空间。3. 右键点击该磁盘
2023-08-25

win10怎么给磁盘分区

在Windows 10上给磁盘分区,可以按照以下步骤操作:1. 按下Win + X键,选择“磁盘管理”来打开磁盘管理工具。2. 在磁盘管理中,可以看到计算机上的所有磁盘和分区。选择需要进行分区的磁盘。3. 右键点击该磁盘,并选择“收缩卷”来
2023-09-08

win10怎么给硬盘分区

要给硬盘分区,您可以按照以下步骤操作:1. 打开“控制面板”,通过开始菜单或搜索栏找到它。2. 在控制面板中,选择“管理工具”。3. 在管理工具中,选择“计算机管理”。4. 在计算机管理窗口中,选择“磁盘管理”。5. 在磁盘管理中,您将看到
2023-09-12

win7怎么给电脑分区

要给Windows 7电脑分区,可以按照以下步骤操作:1. 打开“计算机”或“我的电脑”,找到要分区的硬盘。2. 右键单击该硬盘,选择“管理”。3. 在“磁盘管理”窗口中,找到要分区的硬盘,右键单击它。4. 在弹出的菜单中,选择“压缩卷”或
2023-08-25

怎样给centos系统扩展磁盘分区的实现方法

问题/故障/场景/需求eve-ng的虚拟机ova的硬盘只有38G,需要增加空间,在vmware直接扩展硬盘后,重启系统,使用fdisk -l可以看到硬盘扩大了,但文件系统并没有扩大,还需要将这些新增的空间扩展分配
2022-06-04

mysql怎么给表添加分区

要给表添加分区,可以使用ALTER TABLE语句来实现。以下是一个示例:ALTER TABLE table_name PARTITION BY RANGE (column_name) (PARTITION p0 VALUES LESS
mysql怎么给表添加分区
2024-04-09

kafka中的消息分区分配算法怎么用

本文小编为大家详细介绍“kafka中的消息分区分配算法怎么用”,内容详细,步骤清晰,细节处理妥当,希望这篇“kafka中的消息分区分配算法怎么用”文章能帮助大家解决疑惑,下面跟着小编的思路慢慢深入,一起来学习新知识吧。背景kafka有分区机
2023-06-30

怎么增大swap交换分区

本篇内容介绍了“怎么增大swap交换分区”的有关知识,在实际案例的操作过程中,不少人都会遇到这样的困境,接下来就让小编带领大家学习一下如何处理这些情况吧!希望大家仔细阅读,能够学有所成!# dd if=/dev/zero of=/.swap
2023-06-10

怎样扩展swap分区

怎样扩展swap分区,针对这个问题,这篇文章详细介绍了相对应的分析和解答,希望可以帮助更多想解决这个问题的小伙伴找到更简单易行的方法。1.mkswap /devices (格式化成swap格式) 2.swapon /swap (激活/swa
2023-06-06

spring kafka怎么实现消费者动态订阅新增的topic

这篇文章主要介绍了spring kafka怎么实现消费者动态订阅新增的topic的相关知识,内容详细易懂,操作简单快捷,具有一定借鉴价值,相信大家阅读完这篇spring kafka怎么实现消费者动态订阅新增的topic文章都会有所收获,下面
2023-07-04

php怎么给数组增加值后返回最新key

步骤:1、利用array_splice()在数组中插入新元素,语法“array_splice(原数组,插入位置,0,新元素)”;2、利用array_diff()比对原数组和增值后的新数组,语法“array_diff(增值后的新数组,原数组)”,会返回一个包含不同元素的差集数组;3、利用array_keys()获取差集数组的全部键名即可,语法“array_keys(差集数组)”。本教程操作环境:win
2022-08-08

怎么给ubuntu重新分配空间

要重新分配Ubuntu的空间,你可以按照以下步骤操作:1. 首先,备份你的重要数据。在进行分区调整之前,一定要备份所有重要的文件,以防止数据丢失。2. 使用GParted分区编辑器。GParted是一个图形界面的分区编辑器,可以帮助你重新分
2023-09-05

怎么给Mac移动硬盘分区

这篇文章给大家分享的是有关怎么给Mac移动硬盘分区的内容。小编觉得挺实用的,因此分享给大家做个参考,一起跟随小编过来看看吧。Mac移动硬盘分区步骤:一、安装插件首先,我们需要安装NTFS For Mac软件,这款软件是专门为解决Mac电脑不
2023-06-27

编程热搜

  • Python 学习之路 - Python
    一、安装Python34Windows在Python官网(https://www.python.org/downloads/)下载安装包并安装。Python的默认安装路径是:C:\Python34配置环境变量:【右键计算机】--》【属性】-
    Python 学习之路 - Python
  • chatgpt的中文全称是什么
    chatgpt的中文全称是生成型预训练变换模型。ChatGPT是什么ChatGPT是美国人工智能研究实验室OpenAI开发的一种全新聊天机器人模型,它能够通过学习和理解人类的语言来进行对话,还能根据聊天的上下文进行互动,并协助人类完成一系列
    chatgpt的中文全称是什么
  • C/C++中extern函数使用详解
  • C/C++可变参数的使用
    可变参数的使用方法远远不止以下几种,不过在C,C++中使用可变参数时要小心,在使用printf()等函数时传入的参数个数一定不能比前面的格式化字符串中的’%’符号个数少,否则会产生访问越界,运气不好的话还会导致程序崩溃
    C/C++可变参数的使用
  • css样式文件该放在哪里
  • php中数组下标必须是连续的吗
  • Python 3 教程
    Python 3 教程 Python 的 3.0 版本,常被称为 Python 3000,或简称 Py3k。相对于 Python 的早期版本,这是一个较大的升级。为了不带入过多的累赘,Python 3.0 在设计的时候没有考虑向下兼容。 Python
    Python 3 教程
  • Python pip包管理
    一、前言    在Python中, 安装第三方模块是通过 setuptools 这个工具完成的。 Python有两个封装了 setuptools的包管理工具: easy_install  和  pip , 目前官方推荐使用 pip。    
    Python pip包管理
  • ubuntu如何重新编译内核
  • 改善Java代码之慎用java动态编译

目录