我的编程空间,编程开发者的网络收藏夹
学习永远不晚

java分布式流处理组件Producer怎么使用

短信预约 -IT技能 免费直播动态提醒
省份

北京

  • 北京
  • 上海
  • 天津
  • 重庆
  • 河北
  • 山东
  • 辽宁
  • 黑龙江
  • 吉林
  • 甘肃
  • 青海
  • 河南
  • 江苏
  • 湖北
  • 湖南
  • 江西
  • 浙江
  • 广东
  • 云南
  • 福建
  • 海南
  • 山西
  • 四川
  • 陕西
  • 贵州
  • 安徽
  • 广西
  • 内蒙
  • 西藏
  • 新疆
  • 宁夏
  • 兵团
手机号立即预约

请填写图片验证码后获取短信验证码

看不清楚,换张图片

免费获取短信验证码

java分布式流处理组件Producer怎么使用

这篇文章主要讲解了“java分布式流处理组件Producer怎么使用”,文中的讲解内容简单清晰,易于学习与理解,下面请大家跟着小编的思路慢慢深入,一起来研究和学习“java分布式流处理组件Producer怎么使用”吧!

    基于Java的API

    首先, 在了解生产者发送消息的原理之前,我们应该先学会如何去发送消息。

    Kafka为我们提供了很多项目可以操作的API客户端,包括:

    • C/C++

    • GO

    • Python

    • ...

    通过官网查看API菜单,官方文档上也是Java的版本。我们根据提示一步步操作即可~

    先新建maven项目,并且引入对应的****kafka-clients依赖

    建议:Kafka-clients依赖版本,最好和安装的kafka版本一致

    <dependency>    <groupId>org.apache.kafka</groupId>    <artifactId>kafka-clients</artifactId>    <version>3.3.1</version></dependency>

    同步发送

    Kafka生产者主要靠KafkaProducer来进行操作。点击到对应的文档页面,我们可以看到关于KafkaProducer<K,V> 的详细信息。

    一个好的组件是非常贴心的, 甚至我们都不用去网上搜任何相关的资料,只需要通过查看对应的注释就可以知道这个东西该怎么用。

    Properties config = new Properties();// --bootstrap-serverconfig.setProperty(  ProducerConfig.BOOTSTRAP_SERVERS_CONFIG,   "master:9092,node01:9092,node02:9092");// key 序列化器config.setProperty(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringSerializer");// value 序列化器config.setProperty(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringSerializer");try(Producer<String, String> producer = new KafkaProducer<>(config)) {    ProducerRecord<String, String> record = new ProducerRecord<>(            "newTopic001",            "key01",            "data from " + KafkaQuickProducer.class.getName()    );     RecordMetadata recordMetadata = producer.send(record).get();    System.out.println(            MessageFormat.format("{0}\t{1}\t{2}\t{3}",                     recordMetadata.topic(),                     recordMetadata.partition(),                    recordMetadata.offset(),                     recordMetadata.timestamp()            )    );} catch (Exception e) {    e.printStackTrace();}

    以上代码就是同步发送的过程,这已经是在开发过程中需要配置的最小单元,而其他关于生产者的配置,我们可以通过ProducerConfig来进行查看

    ** 与命令行上的参数,基本上是一模一样的**

    而关于序列化器的问题,我们在下面原理的部分说明

    异步发送

    我们在调用同步send的时候,发现有两个参数的方法, 而这个方法实现的就是****异步发送

    Future<RecordMetadata> send(ProducerRecord<K, V> record, Callback callback);

    异步发送会将发送结果以事件驱动的形式传递,那么这里,我们就需要注意一点:

    • 程序调用完成之后,不能让他立即执行,否则我们无法查看到具体的发送结果

    接下来我们看具体的程序实现。理论上:我们只需要改最后发送的部分

    Properties config = new Properties();// --bootstrap-serverconfig.setProperty(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "master:9092,node01:9092,node02:9092");// key 序列化器config.setProperty(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringSerializer");// value 序列化器config.setProperty(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringSerializer");try(Producer<String, String> producer = new KafkaProducer<>(config)) {    ProducerRecord<String, String> record = new ProducerRecord<>(            "newTopic001",            "key01",            "data from " + KafkaQuickProducer.class.getName()    );    async(producer, record);} catch (Exception e) {    e.printStackTrace();}// 异步发送private static void async(Producer<String, String> producer, ProducerRecord<String, String> record) {    producer.send(record, (recordMetadata, exception) -> {        if (null != exception) {            exception.printStackTrace();            return;        }        System.out.println(                MessageFormat.format("{0}\t{1}\t{2}\t{3}",                        recordMetadata.topic(),                        recordMetadata.partition(),                        recordMetadata.offset(),                        recordMetadata.timestamp()                )        );    });    try {        // 将程序进行阻塞,防止由于消息发送成功之后进程停止而无法接收到事件反馈        System.in.read();    } catch (IOException e) {        throw new RuntimeException(e);    }}

    这属于整个生产者发送消息方式的最小单元,本文属于Producer入门阶段。

    在ProducerConfig中还包含了非常多的配置项,更多的配置信息我们会在优化章节中说明。

    原理

    java分布式流处理组件Producer怎么使用

    在第一部分,我们已经了解到,关于生产者最基本的使用方式,到这里,其实我想跟大家聊一聊:

    • 生产者在发送消息的时候中间到底经历了什么?

    大家应该已经看到上面的那张原理图,我们可以从中找出答案!

    主线程

    **这里我们分为两个线程块来说明, 第一部分是Main主线程, 也就是生产者在调用****send()**方法时所在的线程

    在这里,我们可以看到:

    • 外部数据首先被封装为ProducerRecord**,然后调用**send()**方法。

    • 在send()过程中,经过拦截器、序列化器、分区器等处理之后进入到RecordAccumulator中。

    接下来我们仔细聊一聊拦截器、序列化器、分区器的作用

    拦截器

    拦截器很类似于我们在SpringMVC中Interceptor的功能,而且在Producer中我们是可以自定义拦截器的。

    我们可以在发送之前对数据进行拦截处理,比如说:统计生产者发送数据的总量等等。

    当然目前来讲,我们如果不开发Kafka监控平台的话,这里拦截器的用处并不大。我们忽略不计即可

    后续如果有机会的话,我们可以专门写篇文章,用来介绍如何开发一个拦截器

    序列化器

    而序列化器,主要对两个部分的数据进行处理:

    • Key

    • Value

    byte[] serializedKey   = serializedKey = keySerializer.serialize(record.topic(), record.headers(), record.key());byte[] serializedValue  = valueSerializer.serialize(record.topic(), record.headers(), record.value());

    从本质上来讲,外部数据属于属于对象,而对象不能直接通过网络进行传输。 所以我们就需要一个序列化器,将它转换成字节数组,进而进行传输

    java分布式流处理组件Producer怎么使用

    Kafka本身为我们提供了很多可用的序列化器,不过我们能用到最多的还是StringSerializer。

    在生产端将消息进行序列话,那么在消费端必然会进行反序列化操作

    分区器

    我们知道Kafka是以Topic为消息发送的主体,不过由于Topic是一个虚拟的概念, 所以我们没有办法在实际中查看到关于Topic的相关信息。 但是前面我们也说过, 当前Topic下的消息数据都是通过Partition进行存储的。

    发送出去的消息需要存储在哪个分区中就是通过分区器来进行指定的,在我们没有指定分区策略的情况下,生产者会通过默认的分区策略指定当前消息应该存储在哪个分区下

    java分布式流处理组件Producer怎么使用

    分区的内容还是比较多的,我们会在下一节做详细的说明

    RecordAccumulator

    此时,在主线程的区域中,当消息进入到默认大小为32m的记录缓冲区时, 本区的工作就到此结束。

    缓冲区中有多个双端队列,分别对应Topic不同的分区。每一个分区就会创建一个双端队列。

    此时的消息将会被按照批次的方式存放在队列中, 默认一批为16k大小。当缓冲区达到指定条件之后,****sender线程将会被唤醒,Sender程序将会冲队列中不断拉出消息进行下一步的发送

    Sender线程

    影响Sender线程唤醒的条件

    想要唤醒Sender线程有两个因素,但不是说这两个条件都必须满足,他们是或的关系。

    batch.size是一个条件,这也是后期针对生产者优化的主要参数之一。

    当发送消息之后,生产者会将消息进行整合。将其按照一批一批的方式发送给Broker,从而减少网络间的传输请求次数。默认情况下为16k。

    而如果一批数据的大小累计达到了设置的batch.size之后,sender才会做发送数据的操作

    这是第一个限制

    下面再来介绍一个非常强势的参数:liner.ms。生产者优化的主要参数之二。

    这么说吧,如果你设置的liner.ms=0,表示不延迟直接发送。那么batch.size就不会生效了

    而liner.ms=0属于默认配置

    如果数据一直没有达到设置的batch.size大小,数据也不能不发对吧。所以Kafka也就为我们提供了这样的参数:

    • 当sender等待liner.ms设置的时间之后【单位ms】,不管数据如何都会将消息进行发送

    • 如未设置当前参数,表示没有延迟,直接发送

    下面举个小例子

    config.setProperty(ProducerConfig.LINGER_MS_CONFIG, "5000");

    java分布式流处理组件Producer怎么使用

    开始发送

    RecordAccumulator内存储的数据拉取出来之后,开始将其创建为一个个的Request请求。这里需要注意的是:

    • NetworkClient并非一股脑的将全部可发送数据进行传输请求

    正相反,为了能够保证不同分区所对应DQueue的数据进入到对应的Broker所在的分区内,Kafka将按照<BrokerId, Request>的形式对请求进行传输。如果传输到达Broker之后没有acks应答,那么当前节点下最多能够保存5个未响应的请求。

    ACKS

    这里简单聊一下它的应答方式。在ProducerConfig.ACKS_DOC下我们也可以看到相关的说明:

    • acks=0: 生产者不会等待Broker的应答,直接表示消息已经发送成功。而消息有没有真正达到Broker,不关心。

    当然了,这种方式在性能上来讲是最好的,适合一些数据不重要的场景

    • acks=1: 生产者将消息发送到Broker之后,由Leader在本地将消息进行存储之后,返回发送成功的应答。

    如果Follower还没有同步到消息,Leader就已经挂了。那么此时就会出现消息丢失的情况

    • acks=all:生产者将消息发送到Broker之后,由Leader在本地将消息进行存储,并且Follower同步完消息之后才会返回发送成功的应答。

    这种方式是最能保证数据安全的情况,但是性能也是最低的~

    最后:

    • 当Broker返回成功应答之后,RecordAccumulator中的数据将会被清理

    • 如果失败,可以尝试重试等操作

    感谢各位的阅读,以上就是“java分布式流处理组件Producer怎么使用”的内容了,经过本文的学习后,相信大家对java分布式流处理组件Producer怎么使用这一问题有了更深刻的体会,具体使用情况还需要大家实践验证。这里是编程网,小编将为大家推送更多相关知识点的文章,欢迎关注!

    免责声明:

    ① 本站未注明“稿件来源”的信息均来自网络整理。其文字、图片和音视频稿件的所属权归原作者所有。本站收集整理出于非商业性的教育和科研之目的,并不意味着本站赞同其观点或证实其内容的真实性。仅作为临时的测试数据,供内部测试之用。本站并未授权任何人以任何方式主动获取本站任何信息。

    ② 本站未注明“稿件来源”的临时测试数据将在测试完成后最终做删除处理。有问题或投稿请发送至: 邮箱/279061341@qq.com QQ/279061341

    java分布式流处理组件Producer怎么使用

    下载Word文档到电脑,方便收藏和打印~

    下载Word文档

    猜你喜欢

    java分布式流处理组件Producer怎么使用

    这篇文章主要讲解了“java分布式流处理组件Producer怎么使用”,文中的讲解内容简单清晰,易于学习与理解,下面请大家跟着小编的思路慢慢深入,一起来研究和学习“java分布式流处理组件Producer怎么使用”吧!基于Java的API首
    2023-07-05

    java分布式流式处理组件Producer分区理论

    这篇文章主要为大家介绍了java分布式流式处理组件Producer分区理论详解,有需要的朋友可以借鉴参考下,希望能够有所帮助,祝大家多多进步,早日升职加薪
    2023-03-07

    java分布式流式处理组件Producer分区的作用是什么

    这篇文章主要讲解了“java分布式流式处理组件Producer分区的作用是什么”,文中的讲解内容简单清晰,易于学习与理解,下面请大家跟着小编的思路慢慢深入,一起来研究和学习“java分布式流式处理组件Producer分区的作用是什么”吧!为
    2023-07-05

    redisson分布式限流RRateLimiter怎么使用

    今天小编给大家分享一下redisson分布式限流RRateLimiter怎么使用的相关知识点,内容详细,逻辑清晰,相信大部分人都还太了解这方面的知识,所以分享这篇文章给大家参考一下,希望大家阅读完这篇文章后有所收获,下面我们一起来了解一下吧
    2023-07-04

    Java GUI流式布局管理器FlowLayout怎么用

    本文小编为大家详细介绍“Java GUI流式布局管理器FlowLayout怎么用”,内容详细,步骤清晰,细节处理妥当,希望这篇“Java GUI流式布局管理器FlowLayout怎么用”文章能帮助大家解决疑惑,下面跟着小编的思路慢慢深入,一
    2023-06-30

    java怎么使用redis实现分布式锁

    在Java中使用Redis实现分布式锁可以通过以下步骤:1. 引入Redis相关的依赖,例如Jedis或Lettuce。2. 创建一个Redis连接池或连接工厂,用于获取Redis连接。3. 使用Redis连接实例,调用setnx命令(或相
    2023-10-09

    java怎么使用redis实现分布式锁

    使用Redis实现Java分布式锁使用RedisSETNX和EXPIRE命令,可以实现分布式锁,协调对共享资源的访问。获取锁时,尝试设置唯一键,并设置过期时间。释放锁时,删除键。使用Redis的分布式特性和命令的易用性,可以简单且健壮地实现分布式锁。但需要注意键的唯一性、过期时间和竞争条件等注意事项。
    java怎么使用redis实现分布式锁
    2024-04-11

    Java中怎么使用Redis实现分布式锁

    这篇“Java中怎么使用Redis实现分布式锁”文章的知识点大部分人都不太理解,所以小编给大家总结了以下内容,内容详细,步骤清晰,具有一定的借鉴价值,希望大家阅读完这篇文章能有所收获,下面我们一起来看看这篇“Java中怎么使用Redis实现
    2023-05-25

    怎么在Java中使用redis实现分布式锁

    本篇文章给大家分享的是有关怎么在Java中使用redis实现分布式锁,小编觉得挺实用的,因此分享给大家学习,希望大家阅读完这篇文章后可以有所收获,话不多说,跟着小编一起来看看吧。原理剖析上述三种分布式锁都是通过各自为依据对各个请求进行上锁,
    2023-06-15

    怎么使用代理ip进行分布式爬虫

    本篇内容主要讲解“怎么使用代理ip进行分布式爬虫”,感兴趣的朋友不妨来看看。本文介绍的方法操作简单快捷,实用性强。下面就让小编来带大家学习“怎么使用代理ip进行分布式爬虫”吧!用过优质的代理ip之后,还能不能不用担心担心?这件事不会那么简单
    2023-06-25

    编程热搜

    • Python 学习之路 - Python
      一、安装Python34Windows在Python官网(https://www.python.org/downloads/)下载安装包并安装。Python的默认安装路径是:C:\Python34配置环境变量:【右键计算机】--》【属性】-
      Python 学习之路 - Python
    • chatgpt的中文全称是什么
      chatgpt的中文全称是生成型预训练变换模型。ChatGPT是什么ChatGPT是美国人工智能研究实验室OpenAI开发的一种全新聊天机器人模型,它能够通过学习和理解人类的语言来进行对话,还能根据聊天的上下文进行互动,并协助人类完成一系列
      chatgpt的中文全称是什么
    • C/C++中extern函数使用详解
    • C/C++可变参数的使用
      可变参数的使用方法远远不止以下几种,不过在C,C++中使用可变参数时要小心,在使用printf()等函数时传入的参数个数一定不能比前面的格式化字符串中的’%’符号个数少,否则会产生访问越界,运气不好的话还会导致程序崩溃
      C/C++可变参数的使用
    • css样式文件该放在哪里
    • php中数组下标必须是连续的吗
    • Python 3 教程
      Python 3 教程 Python 的 3.0 版本,常被称为 Python 3000,或简称 Py3k。相对于 Python 的早期版本,这是一个较大的升级。为了不带入过多的累赘,Python 3.0 在设计的时候没有考虑向下兼容。 Python
      Python 3 教程
    • Python pip包管理
      一、前言    在Python中, 安装第三方模块是通过 setuptools 这个工具完成的。 Python有两个封装了 setuptools的包管理工具: easy_install  和  pip , 目前官方推荐使用 pip。    
      Python pip包管理
    • ubuntu如何重新编译内核
    • 改善Java代码之慎用java动态编译

    目录