我的编程空间,编程开发者的网络收藏夹
学习永远不晚

Kafka连接器建立数据管道

短信预约 信息系统项目管理师 报名、考试、查分时间动态提醒
省份

北京

  • 北京
  • 上海
  • 天津
  • 重庆
  • 河北
  • 山东
  • 辽宁
  • 黑龙江
  • 吉林
  • 甘肃
  • 青海
  • 河南
  • 江苏
  • 湖北
  • 湖南
  • 江西
  • 浙江
  • 广东
  • 云南
  • 福建
  • 海南
  • 山西
  • 四川
  • 陕西
  • 贵州
  • 安徽
  • 广西
  • 内蒙
  • 西藏
  • 新疆
  • 宁夏
  • 兵团
手机号立即预约

请填写图片验证码后获取短信验证码

看不清楚,换张图片

免费获取短信验证码

Kafka连接器建立数据管道

Kafka连接器建立数据管道

1.概述

最近,有同学留言咨询Kafka连接器的相关内容,今天笔者给大家分享一下Kafka连接器建立数据管道的相关内容。

2.内容

Kafka连接器是一种用于Kafka系统和其他系统之间进行功能扩展、数据传输的工具。通过Kafka连接器能够简单、快速的将大量数据集移入到Kafka系统,或者从Kafka系统中移出,例如Kafka连接器可以低延时的将数据库或者应用服务器中的指标数据收集到Kafka系统主题中。
另外,Kafka连接器可以通过作业导出的方式,将Kafka系统主题传输到二次存储和查询系统中,或者传输到批处理系统中进行离线分析。

2.1 使用场景

Kafka连接器通常用来构建数据管道,一般来说有两种使用场景。
1. 开始和结束的端点
第一种,将Kafka系统作为数据管道的开始和结束的端点。例如,将Kafka系统主题中的数据移出到HBase数据库,或者把Oracle数据库中的数据移入到Kafka系统。

 

 

 

2. 数据传输的中间介质
第二种,把Kafka系统作为一个中间传输介质。例如,为了把海量日志数据存储到ElasticSearch中,可以先把这些日志数据传输到Kafka系统中,然后再从Kafka系统中将这些数据移出到ElasticSearch进行存储。

ElasticSearch是一个基于Lucene(Lucene是一款高性能、可扩展的信息检索工具库)实现的存储介质。它提供了一个分布式多用户能力的全文搜索引擎,基RESTful(一种软件架构风格、设计风格,但是并非标准,只是提供了一组设计原则和约束条件)接口实现。

 

 

 

Kafka连接器的存在,给数据管道带来很重要的价值。例如,Kafka连接器可以作为数据管道各个数据阶段的缓冲区,有效的将消费者实例和生产者实例进行解耦。
Kafka系统解除耦合的能力、系统的安全性、数据处理的效率等方面均表现不俗,因而使用Kafka连接器来构建数据管道是一个最佳的选举。

2.2 特性和优势

Kafka连接器包含一些重要的特性,并且给数据管道提供了一个成熟稳定的框架。同时,Kafka连接器还提供了一些简单易用的工具库,大大降低的开发人员的研发成本。
1. 特性
Kafka连接器具体包含的特性如下。

  • 通用框架:Kafka连接器制定了一种标准,用来约束Kafka系统与其他系统集成,简化了Kafka连接器的开发、部署和管理;
  • 单机模式和分布式模式:Kafka连接器支持两种模式,既能扩展到支持大型集群的服务管理,也可以缩小到开发、测试等小规模的集群;
  • REST接口:使用REST API来提交请求并管理Kafka集群;
  • 自动管理偏移量:通过连接器的少量信息,Kafka连接器可以自动管理偏移量;
  • 分布式和可扩展:Kafka连接器是建立在现有的组管理协议上,通过添加更多的连接器实例来水平扩展,实现分布式服务;
  • 数据流和批量集成:利用Kafka系统已有的能力,Kafka连接器是桥接数据流和批处理系统的一种理想的解决方案。

2. 优势
在Kafka连接器中有两个核心的概念,它们分别是Source和Sink。其中Source负责将数据导入到Kafka系统,而Sink则负责将数据从Kafka系统中进行导出。
Source和Sink在实现数据导入和导出的过程被称之连接器,即Source连接器和Sink连接器。这两种连接器提供了对业务层面数据读取和写入的抽象接口,简化了生命周期的管理工作。
在处理数据时,Source连接器和Sink连接器会初始化各自的任务,并将数据结构进行标准化的封装。在实际应用场景中,不同的业务中的数据格式是不一样的,因此,Kafka连接器通过注册数据结构,来解决数据格式验证和兼容性问题。
当数据源发生变化时,Kafka连接器会生成新的数据结构,通过不同的处理策略来完成对数据格式的兼容。

2.3 核心概念

在Kafka连接器中存在几个核心的概念,它们分别是连接器实例(Connectors)、任务数(Tasks)、事件线程数(Workers)、转换器(Converters)。
1. 连机器实例
在Kafka连接器中,连接器实例决定了消息数据的流向,即消息数据从何处复制,以及将复制的消息数据写入到何处。
一个连接器实例负责Kafka系统与其他系统之间的逻辑处理,连接器实例通常以JAR包的形式存在,通过实现Kafka系统应用接口来完成。
2. 任务数
在分布式模式下,每一个连接器实例可以将一个作业切分成多个任务(Task),然后再将任务分发到各个事件线程(Worker)中去执行。任务不会保存当前的状态信息,通常由特定的Kafka主题来保存,例如指定具体属性offset.storage.topic和status.storage.topic的值来保存。
在分布式模式中,会存在任务均衡的概念。当一个连接器实例首次提交到Kafka集群,所有的事件线程都会做一个任务均衡的操作,来保证每一个事件线程都运行差不多数量的任务,避免所有任务集中到某一个事件线程。
3. 事件线程
在Kafka系统中,连接器实例和任务数都是逻辑层面的,需要有具体的线程来执行。在Kafka连接器中,事件线程就是用来执行具体的任务,事件线程包含两种,分别是单机模式和分布式模式。
4. 转换器
转换器会将字节数据转换成Kafka连接器内部的格式,同时,也能将Kafka连接器内部存储的数据格式转换成字节数据。

3.操作连接器

连接器作为Kafka的一部分,随着Kafka系统一起发布,所以无需独立安装。在大数据应用场景下,建议在每台物理机上安装一个Kafka。根据实际需求,可以在一部分物理机上启动Kafka实例(即代理节点Broker),在另一部分物理机上启动连接器。

在Kafka系统中,Kafka连接器最终是以一个常驻进程的形式运行在后台服务中,它提供了一个用来管理连接器实例的REST API。默认情况下,服务端口地址是8083。

提示:
Representational State Transfer,简称REST,即表现层状态转移。REST是所有Web应用程序都应用遵守的一种规范。符合REST设计规范的应用接口,即REST API。

在Kafka连接器中,REST API支持获取、写入、创建等接口,具体内容如下图所示:

 

 在Kafka系统中,Kafka连接器目前支持两种运行模式,它们分别是单机模式和分布式模式。

3.1 单击模式导入

在单机模式下,所有的事件线程都在一个单进程中运行。单机模式使用起来更加简单,特别是在开发和定位分析问题的时候,使用单机模式会比较适合。
(1)编辑单机模式配置文件。
在单机模式下,主题的偏移量是存储在/tmp/connect.offsets目录下,在$KAFKA_HOME/config目录下有一个connect-standalone.properties文件,通过设置offset.storage.file.filename属性值来改变存储路径。
每次Kafka连接器启动时,通过加载$KAFKA_HOME/config/connect-file-source.properties配置文件中的name属性来获取主题的偏移量,然后执行后续的读写操作。

# 设置连接器名称
name=local-file-source
# 指定连接器类
connector.class=FileStreamSource
# 设置最大任务数
tasks.max=1
# 指定读取的文件
file=/tmp/test.txt
# 指定主题名
topic=connect_test

(2)在即将读取的文件中,添加数据,具体操作命令如下。
# 新建一个test.txt文件并添加数据
[hadoop@dn1 ~]$ vi /tmp/test.txt

# 添加内容如下
kafka
hadoop
kafka-connect

# 保存并退出

在使用Kafka文件连接器时,连接器实例会监听配置的数据文件,如果文件中有数据更新,例如:追加新的消息数据。连接器实例会及时处理新增的消息数据。
(3)启动Kafka连接器单机模式的命令与启动Kafka代理节点类似,具体操作命令如下。

# 启动一个单机模式的连接器
[hadoop@dn1 bin]$ ./connect-standalone.sh ../config/connect-standalone.properties
../config/connect-file-source.properties

(4)使用Kafka系统命令查看导入到主题(connect_test)中的数据,具体操作命令如下。

# 使用Kafka命令查看
[hadoop@dn1 bin]$ ./kafka-console-consumer.sh --zookeeper dn1:2181 --topic connect_test
--from-beginning

3.2 分布式模式导入

在分布式模式中,Kafka连接器会自动均衡每个事件线程所处理的任务数。允许用户动态的增加或者减少,在执行任务、修改配置、以及提交偏移量时能够得到容错保障。
在分布式模式中,Kafka连接器会在主题中存储偏移量、配置、以及任务状态。建议手动创建存储偏移量的主题,可以按需设置主题分区数和副本数。
需要注意的是,除了配置一些通用的属性之外,还需要配置以下几个重要的属性。

  • group.id(默认值connect-cluster):连接器组唯一名称,切记不能和消费者组名称冲突;
  • config.storage.topic(默认值connect-configs):用来存储连接器实例和任务配置,需要注意的是,该主题应该以单分区多副本的形式存在,建议手动创建,如果自动创建可能会存在多个分区;
  • offset.storage.topic(默认值connect-offsets):用来存储偏移量,该主题应该以多分区多副本的形式存在;
  • status.storage.topic(默认值connect-status):用来存储任务状态,该主题建议以多分区多副本的形式存在。

在分布式模式中,Kafka连接器配置文件不能使用命令行,需要使用REST API来执行创建、修改和销毁Kafka连接器操作。
(1)编辑分布式模式配置文件(connect-distributed.properties)

# 设置Kafka集群地址
bootstrap.servers=dn1:9092,dn2:9092,dn3:9092
# 设置连接器唯一组名称
group.id=connect-cluster
# 指定键值对JSON转换器类
key.converter=org.apache.kafka.connect.json.JsonConverter
value.converter=org.apache.kafka.connect.json.JsonConverter
# 启用键值对转换器
key.converter.schemas.enable=true
value.converter.schemas.enable=true
# 设置内部键值对转换器, 例如偏移量、配置等
internal.key.converter=org.apache.kafka.connect.json.JsonConverter
internal.value.converter=org.apache.kafka.connect.json.JsonConverter
internal.key.converter.schemas.enable=false
internal.value.converter.schemas.enable=false
# 设置偏移量存储主题
offset.storage.topic=connect_offsets
# 设置配置存储主题
config.storage.topic=connect_configs
# 设置任务状态存储主题
status.storage.topic=connect_status
# 设置偏移量持久化时间间隔
offset.flush.interval.ms=10000

(2)创建偏移量、配置、以及任务状态主题,具体操作命令如下。

# 创建配置主题
kafka-topics.sh --create --zookeeper dn1:2181 --replication-factor 3 --partitions 1
 --topic connect_configs
# 创建偏移量主题
kafka-topics.sh --create --zookeeper dn1:2181 --replication-factor 3 --partitions 6
--topic connect_offsets
# 创建任务状态主题
kafka-topics.sh --create --zookeeper dn1:2181 --replication-factor 3 --partitions 6
 --topic connect_status

(3)启动分布式模式连接器,具体操作命令如下。

# 启动分布式模式连接器
[hadoop@dn1 bin]$ ./connect-distributed.sh ../config/connect-distributed.properties

(4)执行REST API命令查看当前Kafka连接器的版本号,具体操作命令如下。

# 查看连接器版本号
[hadoop@dn1 ~]$ curl http://dn1:8083/

(5)查看当前已安装的连接器插件,通过浏览器访问http://dn1:8083/connector-plugins地址来查看

 

 

(6)创建一个新的连接器实例,具体操作命令如下。

# 创建一个新的连接器实例
[hadoop@dn1 ~]$ curl "http://dn1:8083/connectors" -X POST -i –H
 "Content-Type:application/json" -d "{"name":"distributed-console-source","config":
{"connector.class":"org.apache.kafka.connect.file.FileStreamSourceConnector",
"tasks.max":"1","topic":"distributed_connect_test",
"file":"/tmp/distributed_test.txt"}}"

然后在浏览器访问http://dn1:8083/connectors地址查看当前成功创建的连接器实例名称,如下图所示:

 

 

(7)查看使用分布式模式导入到主题(distributed_connect_test)中的数据,具体操作命令如下。

# 在文件/tmp/distributed_test.txt中添加消息数据
[hadoop@dn1 ~]$ vi /tmp/distributed_test.txt

# 添加如下内容(这条注释不要写入到distributed_test.txt文件中)
distributed_kafka
kafka_connection
kafka
hadoop

# 然后保存并退出(这条注释不要写入到distributed_test.txt文件中)

# 使用Kafka系统命令,查看主题distributed_connect_test中的数据
[hadoop@dn1 ~]$ kafka-console-consumer.sh --zookeeper dn1:2181 –topic
 distributed_connect_test --from-beginning

4.总结

Kafka 连接器可以从DB存储或应用程序服务器收集数据到Topic,使数据可用于低延迟的流处理。导出作业可以将数据从Topic传输到二次存储和查询系统,或者传递到批处理系统以便进行离线分析。

5.结束语

这篇博客就和大家分享到这里,如果大家在研究学习的过程当中有什么问题,可以加群进行讨论或发送邮件给我,我会尽我所能为您解答,与君共勉!

另外,博主出书了《Kafka并不难学》和《Hadoop大数据挖掘从入门到进阶实战》,喜欢的朋友或同学, 可以在公告栏那里点击购买链接购买博主的书进行学习,在此感谢大家的支持。关注下面公众号,根据提示,可免费获取书籍的教学视频。

TRANSLATE with x English
Arabic Hebrew Polish
Bulgarian Hindi Portuguese
Catalan Hmong Daw Romanian
Chinese Simplified Hungarian Russian
Chinese Traditional Indonesian Slovak
Czech Italian Slovenian
Danish Japanese Spanish
Dutch Klingon Swedish
English Korean Thai
Estonian Latvian Turkish
Finnish Lithuanian Ukrainian
French Malay Urdu
German Maltese Vietnamese
Greek Norwegian Welsh
Haitian Creole Persian  
  TRANSLATE with COPY THE URL BELOW Back EMBED THE SNIPPET BELOW IN YOUR SITE
" readonly="readonly"> Enable collaborative features and customize widget: Bing Webmaster Portal Back 联系方式:
邮箱:smartloli.org@gmail.com
Twitter:https://twitter.com/smartloli
QQ群(Hadoop - 交流社区1):424769183
QQ群(Kafka并不难学): 825943084
温馨提示:请大家加群的时候写上加群理由(姓名+公司/学校),方便管理员审核,谢谢!

热爱生活,享受编程,与君共勉!


公众号:


作者:哥不是小萝莉 [关于我][犒赏]

出处:http://www.cnblogs.com/smartloli/

转载请注明出处,谢谢合作!

免责声明:

① 本站未注明“稿件来源”的信息均来自网络整理。其文字、图片和音视频稿件的所属权归原作者所有。本站收集整理出于非商业性的教育和科研之目的,并不意味着本站赞同其观点或证实其内容的真实性。仅作为临时的测试数据,供内部测试之用。本站并未授权任何人以任何方式主动获取本站任何信息。

② 本站未注明“稿件来源”的临时测试数据将在测试完成后最终做删除处理。有问题或投稿请发送至: 邮箱/279061341@qq.com QQ/279061341

Kafka连接器建立数据管道

下载Word文档到电脑,方便收藏和打印~

下载Word文档

猜你喜欢

Kafka连接器建立数据管道

1.概述最近,有同学留言咨询Kafka连接器的相关内容,今天笔者给大家分享一下Kafka连接器建立数据管道的相关内容。2.内容Kafka连接器是一种用于Kafka系统和其他系统之间进行功能扩展、数据传输的工具。通过Kafka连接器能够简单、快速的将大量数据集移
Kafka连接器建立数据管道
2014-11-30

jdbc怎么建立数据库连接

JDBC(Java Database Connectivity)可以通过以下步骤建立数据库连接:1. 导入JDBC相关的库文件。 在Java项目中,需要导入JDBC相关的库文件,一般通过引入JDBC驱动程序来实现。不同数据库对应的JDBC
jdbc怎么建立数据库连接
2024-02-29

jdbc如何建立数据库连接

使用JDBC建立数据库连接的步骤如下:1. 加载数据库驱动程序:首先需要加载特定数据库的驱动程序,例如MySQL的驱动程序是com.mysql.jdbc.Driver。可以通过Class.forName()方法来加载驱动程序。2. 建立数据
2023-10-25

php如何建立数据库连接

在 PHP 中,可以使用 mysqli 或 PDO 扩展来建立数据库连接。使用 mysqli 扩展建立数据库连接的步骤如下:1. 使用 `mysqli_connect()` 函数来连接数据库。该函数接受四个参数,分别是数据库主机名、用户名、
2023-09-05

mysql怎么建立连接数据库

要建立 mysql 数据库连接,请按照以下步骤操作: 1. 安装 mysql 2. 启动 mysql 服务 3. 使用 mysql 客户程序连接到 mysql 服务,提供用户名、密码、主机名或 ip 地址、端口号 4. 使用 "use [数
mysql怎么建立连接数据库
2024-08-05

windows云服务器怎么建立数据库连接文件管理

准备硬件环境在开始建立连接文件之前,需要准备硬件环境。您需要购买适当的服务器硬件,如CPU、内存、磁盘和网络适配器等。如果您不确定自己的需求,可以联系云服务提供商的技术支持。安装云服务提供商提供的云服务器安装云服务提供商的云服务器操作系统和应用程序。这可以帮助您更好地管理您的云服务器,并使其具有可用性和可扩展性。您可以
windows云服务器怎么建立数据库连接文件管理
2023-10-28

access数据库怎么建立和连接

要建立和连接Access数据库,您可以按照以下步骤操作:1. 打开Microsoft Access软件。2. 在主界面上,点击“新建”按钮,选择“空白数据库”选项。3. 在弹出的对话框中,选择保存数据库的位置和命名,然后点击“创建”按钮。4
2023-09-08

JDBC建立数据库连接的代码

本文实例为大家分享了JDBC建立数据库连接的具体代码,供大家参考,具体内容如下import java.sql.DriverManager;import java.sql.ResultSet;import java.sql.SQLExcept
2023-05-30

windows云服务器怎么建立数据库连接

在WindowsServer2008或之后版本中,您可以使用以下步骤建立数据库连接:打开您的Windows服务(例如:Apache、Tomcat或MySQL服务器),并在其中打开"数据库"和"连接"选项卡。选择您想要连接的数据库,并选择"连接方式"(例如"本地连接"、"远程连接"或"RDP协议")。在"连接设置"中,选择"启用RDP协议",以便您可
2023-10-25

云服务器的数据库怎么建立连接

注册云服务器:在注册账号时,用户需要选择云服务器的类型,例如MySQL、MongoDB等。注册成功后,你可以在云服务器中选择要连接的数据库。安装云服务器:在安装云服务器之前,用户需要选择要连接的数据库,例如MongoDB。然后,你可以下载并安装数据库驱动程序,这可以帮助你轻松地在云服务器中连接到所需的数据库。配置云服务
云服务器的数据库怎么建立连接
2023-10-28

windows云服务器怎么建立数据库连接文件管理权限

Windows云服务器可以使用本地账户来连接和管理数据库。以下是建立数据库连接文件管理权限的步骤:在Windows控制面板中找到“管理工具”按钮并点击它。在打开的“管理工具”对话框中,选择“文件和打印机”选项。在弹出的对话框中,输入“连接到我的服务器/计算机”,并点击“下一步”。在“连接到我的服务器/计算机”下方,选择“我是一个本地连接使用者&rdqu
2023-10-26

PHP怎么建立和关闭数据库连接

本篇内容主要讲解“PHP怎么建立和关闭数据库连接”,感兴趣的朋友不妨来看看。本文介绍的方法操作简单快捷,实用性强。下面就让小编来带大家学习“PHP怎么建立和关闭数据库连接”吧!PHP建立和关闭数据库连接之mysql_connect()res
2023-06-17

怎么建立云服务器连接数据库服务器端

建立云服务器到数据库服务器连接指南了解如何在云服务器上建立与数据库服务器的连接,让应用程序访问数据。本指南涵盖数据库选择、服务器创建、安全组配置、客户端安装、连接建立和验证,以及最佳实践,如加密、权限最小化和监控。遵循这些步骤,确保安全可靠的连接,满足应用程序需求,保障数据安全和业务连续性。
怎么建立云服务器连接数据库服务器端
2024-04-12

windows云服务器怎么建立数据库连接方式

Windows云服务器是一种提供了在互联网上存储和共享文件的服务器。您可以使用Windows提供的DirectAccess工具来创建一个WordPress客户端。以下是创建一个数据库连接的步骤:打开Windows操作系统,进入控制面板,找到WordPress或类似的软件。右键单击“WordPress”或类似的软件,然后选择“创建WordPress连接”。在创建连接的对话框中,输入“登录名
2023-10-26

怎么建立云服务器连接数据库服务器端

要建立云服务器与数据库服务器的连接,需要按照以下步骤操作:1. 确保云服务器和数据库服务器都正常运行,并且网络通畅。2. 在云服务器上安装数据库客户端软件,例如MySQL客户端。3. 打开数据库客户端软件,输入数据库服务器的IP地址、端口号
2023-09-22

windows云服务器怎么建立数据库连接文件

要在WindowsServer2016上建立一个数据库连接文件,可以按照以下步骤操作:打开“控制面板”,选择“管理工具”。打开“连接”。在“连接”窗格中,选择“本地连接”。点击右侧的“新建连接”。在弹出的“创建新端口”对话框中选择要用于连接的数据库服务器的端口号。点击“下一步”。输入数据库服务器的IP地址和端口(如果
2023-10-26

PHP PDO 与 MySQL:建立强大的数据库连接

使用 PHP PDO 与 MySQL 建立数据库连接对于任何 web 应用程序来说都是至关重要的。本文将深入探讨 PDO 的优点,并提供逐步的代码演示,指导您建立一个强大的数据库连接。
PHP PDO 与 MySQL:建立强大的数据库连接
2024-02-17

windows云服务器怎么建立数据库连接文件夹

在WindowsServer2008或以前版本中,你可以使用WindowsAPI来创建一个名为"database.configuration"的文件,该文件位于"varlibmiddleware.so"组中,其中包含以下信息:database_name:该数据库的名称database_path:数据库的根目录connection_type:连接类型以下是具体的设置信息:datab
2023-10-26

热门标签

编程热搜

编程资源站

目录