我的编程空间,编程开发者的网络收藏夹
学习永远不晚

java分布式流式处理组件Producer分区的作用是什么

短信预约 -IT技能 免费直播动态提醒
省份

北京

  • 北京
  • 上海
  • 天津
  • 重庆
  • 河北
  • 山东
  • 辽宁
  • 黑龙江
  • 吉林
  • 甘肃
  • 青海
  • 河南
  • 江苏
  • 湖北
  • 湖南
  • 江西
  • 浙江
  • 广东
  • 云南
  • 福建
  • 海南
  • 山西
  • 四川
  • 陕西
  • 贵州
  • 安徽
  • 广西
  • 内蒙
  • 西藏
  • 新疆
  • 宁夏
  • 兵团
手机号立即预约

请填写图片验证码后获取短信验证码

看不清楚,换张图片

免费获取短信验证码

java分布式流式处理组件Producer分区的作用是什么

这篇文章主要讲解了“java分布式流式处理组件Producer分区的作用是什么”,文中的讲解内容简单清晰,易于学习与理解,下面请大家跟着小编的思路慢慢深入,一起来研究和学习“java分布式流式处理组件Producer分区的作用是什么”吧!

为什么需要分区

分区的作用

  • 合理的使用存储资源:把海量的数据按照分区切割成一小块的数据存储在多台Broker上。此时能够保证每台服务器存储资源能够被充分利用到。而且小块数据在寻址时间上更有优势~

如果将全部的数据存储在一台机器上,那么要对当前数据做副本的时候,由于服务器资源配置不同,就有可能会出现副本数据存放失败,从而增加数据丢失的可能性。

同时,如果单个文件过大,副本放置时间、内容检索时间都会极大的延长,从而导致Kafka性能降低。

  • 负载均衡: 数据生产或消费期间,生产者已分区的单位发送数据,消费者分区的单位进行消费。 期间,各分区生产和消费数据互不影响,这样能够达到合理控制分区任务的程度,提高任务的并行度。从而达到负载均衡的效果。

刚才我们提到:生产者已分区为单位向Broker发送数据。那么问题来了:

  • 生产者是怎么知道该向哪个分区发送数据呢?

这就是我们接下来要研究的分区策略。

分区策略

private int partition(ProducerRecord<K, V> record, byte[] serializedKey, byte[] serializedValue, Cluster cluster) {    // 如果在消息中指定了分区    if (record.partition() != null)        return record.partition();    if (partitioner != null) {        // 分区器通过计算得到分区        int customPartition = partitioner.partition(            record.topic(), record.key(), serializedKey, record.value(), serializedValue, cluster);        if (customPartition < 0) {            throw new IllegalArgumentException(String.format(                "The partitioner generated an invalid partition number: %d. Partition number should always be non-negative.", customPartition));        }        return customPartition;    }    // 通过序列化key计算分区    if (serializedKey != null && !partitionerIgnoreKeys) {        // hash the keyBytes to choose a partition        return BuiltInPartitioner.partitionForKey(serializedKey, cluster.partitionsForTopic(record.topic()).size());    } else {        // 返回-1        return RecordMetadata.UNKNOWN_PARTITION;    }}

下面的代码可以说是整个分区器的核心部分,可以通过以下的步骤进行说明:

  • 如果在生产消息的时候,已经指定了需要发送的分区位置,那么就会直接使用已经指定的份具体的位置,这样子还节省了也不计算的时间

  • 如果在生产者配置Properties中指定了分区策略类,那么消息生产就会通过已经指定的分区策略类进行分区计算

  • 否则就会以serializedKey作为参数,通过hash取模的方式计算。如果serializedKey == null,那么就会采用粘性分区的逻辑。 这在Kafka中属于默认分区器。

  • 如果以上情况都没有包含,那么他就会直接返回-1。相当于ack=0的情况。

在Kafka中分区策略我们是可以自定义的。当然Kafka也为我们内置了三种分区策略类。 接下来我们挑个重点来介绍,来给我们自定义分区器做一个铺垫~

java分布式流式处理组件Producer分区的作用是什么

我们已经看到,DefaultPartitionerUniformStickyPartitioner已经被标注为过期类,当然也并不妨碍我们来了解一下。

DefaultPartitioner

在当前版本中,如果没有对partitioner.class进行配置,此时的分区策略就会采用当前类作为默认分区策略类。

而以下是DefaultPartitioner策略类的核心实现方式,并且标记部分的代码实现其实就是UniformStickyPartitioner的计算逻辑

public int partition(String topic, Object key, byte[] keyBytes, Object value, byte[] valueBytes, Cluster cluster, int numPartitions) {    if (keyBytes == null) {        // 就是这段属于UniformStickyPartitioner的实现逻辑        return stickyPartitionCache.partition(topic, cluster);    }    return BuiltInPartitioner.partitionForKey(keyBytes, numPartitions);}

还有一段代码让我们来一起看看

public static int partitionForKey(final byte[] serializedKey, final int numPartitions) {    return Utils.toPositive(Utils.murmur2(serializedKey)) % numPartitions;}

这段代码不管有多复杂,调用方法有多少,但最终我们是能够发现:

  • 它的本质其实是在对序列化Key做哈希计算,然后通过hash值和分区数做取模运算,然后得到结果分区位置

这是一种比较重要的计算方式,但却不是唯一的方式

java分布式流式处理组件Producer分区的作用是什么

---这是分割线---

接下来继续,我们看看如果无法对序列化Key计算,会是怎么样的计算逻辑?

我们先开始来看一下,是在哪个地方得到的serializedKey,并且什么情况下serializedKey会是NULL

看看下面的这个代码眼熟不?

// 生产者生产消息对象ProducerRecord<String, String> record = new ProducerRecord<>(        "newTopic001",        "data from " + KafkaQuickProducer.class.getName());

java分布式流式处理组件Producer分区的作用是什么

// KafkaProducer#doSend()// line994serializedKey = keySerializer.serialize(record.topic(), record.headers(), record.key());
public class StringSerializer implements Serializer<String> {    // 省略。。。    @Override    public byte[] serialize(String topic, String data) {        if (data == null) {            return null;        } else {            return data.getBytes(encoding);        }    }}

从上面的代码来看,基本上能够实锤了:

  • 当在生成ProducerRecord对象的时候,如果没有对消息设置key参数,此时序列化之后的key就是个null

  • 那么当序列化之后的Key为NULL之后,此时分区计算逻辑就会改变。

此时相当于我们已经进入到UniformStickyPartitioner的计算逻辑, 当然了在我们使用的3.3版本中当前类也已经被标注为过期

根据前面的说法,粘性分区主要解决了消息无Key的分区计算逻辑,那么粘性分区并不是说每次都使用同一个分区

它是通过一个大的Batch为单位,尽量将batch内的消息固定在同一个分区内,这样在很大程度上能够保证:

  • 防止消息无规律的分散在不同的分区内,降低分区倾斜

  • 同时不需要每次进行分区计算,也降低了Producer的延迟

而实现方式是采用ConcurrentMap来进行缓存,感兴趣的大家可以看看StickyPartitionCache的源码

而当Batch内消息满足发送条件被发送出去之后,才会开始再次计算下一个分区,为此在KafkaProducer中还专门调用了新的方法

partitioner.onNewBatch(topic, cluster, prevPartition);
public void onNewBatch(String topic, Cluster cluster, int prevPartition) {    stickyPartitionCache.nextPartition(topic, cluster, prevPartition);}

java分布式流式处理组件Producer分区的作用是什么

RoundRobinPartitioner

这是在当前版本中唯一没有被标注的类,未来说不定会成为默认分区策略类,我们不看,就瞄一眼

private int nextValue(String topic) {    AtomicInteger counter = topicCounterMap.computeIfAbsent(topic, k -> new AtomicInteger(0));    return counter.getAndIncrement();}public int partition(String topic, Object key, byte[] keyBytes, Object value, byte[] valueBytes, Cluster cluster) {    List<PartitionInfo> partitions = cluster.partitionsForTopic(topic);    int numPartitions = partitions.size();    int nextValue = nextValue(topic);    List<PartitionInfo> availablePartitions = cluster.availablePartitionsForTopic(topic);    if (!availablePartitions.isEmpty()) {        int part = Utils.toPositive(nextValue) % availablePartitions.size();        return availablePartitions.get(part).partition();    } else {        // no partitions are available, give a non-available partition        return Utils.toPositive(nextValue) % numPartitions;    }}

这个类的解释,嗯。。你们看那个合适吧~

java分布式流式处理组件Producer分区的作用是什么

其实这个逻辑非常简单:

  • 通过AtomicInteger.getAndIncrement()的方式将每次写入平均分配到不同的分区中

  • 不同与其他分区策略类,它不关心Key是否为NULL

我们先来做个小实验吧: 将分区策略类修改为RoundRobinPartitioner,也方便后续自定义分区器的配置操作

config.setProperty(        ProducerConfig.PARTITIONER_CLASS_CONFIG,         "org.apache.kafka.clients.producer.RoundRobinPartitioner");

就这样就能实现,看结果验证~

java分布式流式处理组件Producer分区的作用是什么

中间穿插了一点小知识,那么接下来就会进入到我们最后一个环节:尝试自定义分区器

自定义分区器

前面我们也提到过,相信大家没有忘记partitioner.class这个配置

那么接下来就进入到重头戏:自定义分区器实战编码环节。

public class CustomPartitioner implements Partitioner {    @Override    public void configure(Map<String, ?> configs) {        // nothing    }    @Override    public int partition(String topic, Object key, byte[] keyBytes, Object value, byte[] valueBytes, Cluster cluster) {        // 如果keyBytes == null        // 直接去0号位置        if (null == keyBytes) {            return 0;        }        // 已默认分区策略实现        int numPartitions = cluster.partitionsForTopic(topic).size();        return BuiltInPartitioner.partitionForKey(keyBytes, numPartitions);    }    @Override    public void close() {        // nothing    }}

我们就先做的简单一点,主要是想让大家明白自定义分区器的实现:

  • 如果没有给定指定key,那么就默认全部去0号分区

  • 否则就通过key做取模计算

当自定义分区器实现完成之后,接下来我们就需要通过发送者进行验证。当然了,主要还是通过partitioner.class进行修改

// 给出关键代码,其他的都是一样的。就不赘述了~~~config.setProperty(ProducerConfig.PARTITIONER_CLASS_CONFIG, "top.zopx.kafka.partitioner.CustomPartitioner");

通过执行之后,我们来看看它的运行效果是否满足我们的预期

java分布式流式处理组件Producer分区的作用是什么

另一种运行结果与默认分区器有Key的情况类似,这里就不再重复贴图

感谢各位的阅读,以上就是“java分布式流式处理组件Producer分区的作用是什么”的内容了,经过本文的学习后,相信大家对java分布式流式处理组件Producer分区的作用是什么这一问题有了更深刻的体会,具体使用情况还需要大家实践验证。这里是编程网,小编将为大家推送更多相关知识点的文章,欢迎关注!

免责声明:

① 本站未注明“稿件来源”的信息均来自网络整理。其文字、图片和音视频稿件的所属权归原作者所有。本站收集整理出于非商业性的教育和科研之目的,并不意味着本站赞同其观点或证实其内容的真实性。仅作为临时的测试数据,供内部测试之用。本站并未授权任何人以任何方式主动获取本站任何信息。

② 本站未注明“稿件来源”的临时测试数据将在测试完成后最终做删除处理。有问题或投稿请发送至: 邮箱/279061341@qq.com QQ/279061341

java分布式流式处理组件Producer分区的作用是什么

下载Word文档到电脑,方便收藏和打印~

下载Word文档

猜你喜欢

java分布式流式处理组件Producer分区的作用是什么

这篇文章主要讲解了“java分布式流式处理组件Producer分区的作用是什么”,文中的讲解内容简单清晰,易于学习与理解,下面请大家跟着小编的思路慢慢深入,一起来研究和学习“java分布式流式处理组件Producer分区的作用是什么”吧!为
2023-07-05

java分布式流式处理组件Producer分区理论

这篇文章主要为大家介绍了java分布式流式处理组件Producer分区理论详解,有需要的朋友可以借鉴参考下,希望能够有所帮助,祝大家多多进步,早日升职加薪
2023-03-07

java分布式流处理组件Producer怎么使用

这篇文章主要讲解了“java分布式流处理组件Producer怎么使用”,文中的讲解内容简单清晰,易于学习与理解,下面请大家跟着小编的思路慢慢深入,一起来研究和学习“java分布式流处理组件Producer怎么使用”吧!基于Java的API首
2023-07-05

spring分布式调度处理的方法是什么

Spring分布式调度处理可以通过以下几种方式实现:1. 使用Spring Cloud Task:Spring Cloud Task是一个用于构建独立的任务和微服务的框架。它提供了任务的调度、执行和监控功能,可以在分布式环境下部署和执行任务
2023-10-23

分布式文件系统FastDFS的原理是什么

今天就跟大家聊聊有关分布式文件系统FastDFS的原理是什么,可能很多人都不太了解,为了让大家更加了解,小编给大家总结了以下内容,希望大家根据这篇文章可以有所收获。什么是FastDFS?FastDFS是一个开源的轻量级分布式文件系统。它解决
2023-06-16

python爬虫中分布式爬虫的作用是什么

这篇文章给大家分享的是有关python爬虫中分布式爬虫的作用是什么的内容。小编觉得挺实用的,因此分享给大家做个参考,一起跟随小编过来看看吧。随着大数据时代的来临,大数据也吸引了越来越多的关注。网络爬虫是一种高效的信息抓取工具,它集成了搜索引
2023-06-15

编程热搜

  • Python 学习之路 - Python
    一、安装Python34Windows在Python官网(https://www.python.org/downloads/)下载安装包并安装。Python的默认安装路径是:C:\Python34配置环境变量:【右键计算机】--》【属性】-
    Python 学习之路 - Python
  • chatgpt的中文全称是什么
    chatgpt的中文全称是生成型预训练变换模型。ChatGPT是什么ChatGPT是美国人工智能研究实验室OpenAI开发的一种全新聊天机器人模型,它能够通过学习和理解人类的语言来进行对话,还能根据聊天的上下文进行互动,并协助人类完成一系列
    chatgpt的中文全称是什么
  • C/C++中extern函数使用详解
  • C/C++可变参数的使用
    可变参数的使用方法远远不止以下几种,不过在C,C++中使用可变参数时要小心,在使用printf()等函数时传入的参数个数一定不能比前面的格式化字符串中的’%’符号个数少,否则会产生访问越界,运气不好的话还会导致程序崩溃
    C/C++可变参数的使用
  • css样式文件该放在哪里
  • php中数组下标必须是连续的吗
  • Python 3 教程
    Python 3 教程 Python 的 3.0 版本,常被称为 Python 3000,或简称 Py3k。相对于 Python 的早期版本,这是一个较大的升级。为了不带入过多的累赘,Python 3.0 在设计的时候没有考虑向下兼容。 Python
    Python 3 教程
  • Python pip包管理
    一、前言    在Python中, 安装第三方模块是通过 setuptools 这个工具完成的。 Python有两个封装了 setuptools的包管理工具: easy_install  和  pip , 目前官方推荐使用 pip。    
    Python pip包管理
  • ubuntu如何重新编译内核
  • 改善Java代码之慎用java动态编译

目录