我的编程空间,编程开发者的网络收藏夹
学习永远不晚

如何解析Flume与Kafka整合

短信预约 -IT技能 免费直播动态提醒
省份

北京

  • 北京
  • 上海
  • 天津
  • 重庆
  • 河北
  • 山东
  • 辽宁
  • 黑龙江
  • 吉林
  • 甘肃
  • 青海
  • 河南
  • 江苏
  • 湖北
  • 湖南
  • 江西
  • 浙江
  • 广东
  • 云南
  • 福建
  • 海南
  • 山西
  • 四川
  • 陕西
  • 贵州
  • 安徽
  • 广西
  • 内蒙
  • 西藏
  • 新疆
  • 宁夏
  • 兵团
手机号立即预约

请填写图片验证码后获取短信验证码

看不清楚,换张图片

免费获取短信验证码

如何解析Flume与Kafka整合

这篇文章给大家介绍如何解析Flume与Kafka整合,内容非常详细,感兴趣的小伙伴们可以参考借鉴,希望对大家能有所帮助。

Flume与Kafka整合


一、概念
1、Flume:Cloudera 开发的分布式日志收集系统,是一种分布式,可靠且可用的服务,用于高效地收集,汇总和移动大量日志数据。 它具有基于流式数据流的简单而灵活的架构。它具有可靠的可靠性机制和许多故障转移和恢复机制,具有强大的容错性和容错能力。它使用一个简单的可扩展数据模型,允许在线分析应用程序。Flume分为OG、NG版本,其中Flume OG 的最后一个发行版本 0.94.0,之后为NG版本。
 
2、Kafka:作为一个集群运行在一台或多台可以跨越多个数据中心的服务器上。在Kafka中,客户端和服务器之间的通信是通过一种简单的,高性能的,语言不可知的TCP协议完成的。协议是版本控制的,并保持与旧版本的向后兼容性。Kafka提供Java客户端,但客户端可以使用多种语言。

3、Kafka通常用于两大类应用,如下:
   A、构建可在系统或应用程序之间可靠获取数据的实时流数据管道
   B、构建实时流应用程序,用于转换或响应数据流
   C、Kafka每个记录由一个键,一个值和一个时间戳组成。

二、产述背景
    基于大数据领域实现日志数据时时采集及数据传递等需要,据此需求下试着完成flume+kafka扇入、扇出功能整合,其中扇出包括:复制流、复用流等功能性测试。后续根据实际需要,将完善kafka与spark streaming进行整合整理工作。
    注:此文档仅限于功能性测试,性能优化方面请大家根据实际情况增加。

三、部署安装
1、测试环境说明:
   操作系统:CentOS 7
   Flume版本:flume-ng-1.6.0-cdh6.7.0
   Kafka版本:kafka_2.11-0.10.0.1
   JDK版本:JDK1.8.0
   Scala版本:2.11.8
2、测试步骤:
2.1、flume部署
2.1.1、下载安装介质,并解压:

此处)折叠或打开

此处)折叠或打开

此处)折叠或打开

  1. cd /app/apache-flume-1.6.0-cdh6.7.0-bin

  2. vi netcatOrKafka-memory-logger.conf

  3.     netcatagent.sources = netcat_sources

  4.     netcatagent.channels = c1 c2

  5.     netcatagent.sinks = logger_sinks kafka_sinks

  6.     

  7.     netcatagent.sources.netcat_sources.type = netcat

  8.     netcatagent.sources.netcat_sources.bind = 0.0.0.0

  9.     netcatagent.sources.netcat_sources.port = 44444

  10.     

  11.     netcatagent.channels.c1.type = memory

  12.     netcatagent.channels.c1.capacity = 1000

  13.     netcatagent.channels.c1.transactionCapacity = 100

  14.     

  15.     netcatagent.channels.c2.type = memory

  16.     netcatagent.channels.c2.capacity = 1000

  17.     netcatagent.channels.c2.transactionCapacity = 100

  18.     

  19.     netcatagent.sinks.logger_sinks.type = logger

  20.     

  21.     netcatagent.sinks.kafka_sinks.type = org.apache.flume.sink.kafka.KafkaSink

  22.     netcatagent.sinks.kafka_sinks.topic = test

  23.     netcatagent.sinks.kafka_sinks.brokerList = 192.168.137.132:9082,192.168.137.133:9082,192.168.137.134:9082

  24.     netcatagent.sinks.kafka_sinks.requiredAcks = 0

  25.     ##netcatagent.sinks.kafka_sinks.batchSize = 20

  26.     netcatagent.sinks.kafka_sinks.producer.type=sync

  27.     netcatagent.sinks.kafka_sinks.custom.encoding=UTF-8

  28.     netcatagent.sinks.kafka_sinks.partition.key=0

  29.     netcatagent.sinks.kafka_sinks.serializer.class=kafka.serializer.StringEncoder

  30.     netcatagent.sinks.kafka_sinks.partitioner.class=org.apache.flume.plugins.SinglePartition

  31.     netcatagent.sinks.kafka_sinks.max.message.size=1000000

  32.     

  33.     netcatagent.sources.netcat_sources.selector.type = replicating

  34.     

  35.     netcatagent.sources.netcat_sources.channels = c1 c2

  36.     netcatagent.sinks.logger_sinks.channel = c1

  37.     netcatagent.sinks.kafka_sinks.channel = c2

2.4.2、启动各测试命令:
   A、启动flume的agent(于192.168.137.130):
      flume-ng agent --name netcatagent \
       --conf $FLUME_HOME/conf \
       --conf-file $FLUME_HOME/conf/netcat-memory-loggerToKafka.conf \
       -Dflume.root.logger=INFO,console
    B、启动kafka消费者(于192.168.137.132):
        kafka-console-consumer.sh \
         --zookeeper 192.168.137.132:2181,192.168.137.133:2181,192.168.137.134:2181/kafka \
          --from-beginning --topic test
     C、测试发送(于192.168.137.130与于192.168.137.132)
telnet发送结果

如何解析Flume与Kafka整合

kafka消费结果

如何解析Flume与Kafka整合

最终logger接收结果

如何解析Flume与Kafka整合

         
   至此flume+kafka扇出--复制流测试(扇入源为:netcat;输出为:kafka+Flume的Logger)测试与验证完成。
   
2.5、flume+kafka扇出--复用流测试(扇入源为:netcat;输出为:kafka+Flume的Logger)

   暂无,后续补充



四、部署安装及验证过程中出现的问题

    1、做flume+kafka扇入测试(扇入源为:netcat+kafka;输出以Flume的Logger类型输出)时,一直未收到kafka数据
        主要原因是在做kafka的配置时在配置文件(server.properties)中写成内容:
       zookeeper.connect=192.168.137.132:2181,192.168.137.133:2181,192.168.137.134:2181
       但在创建topics时,使用的是:
       kafka-topics.sh --create \
   --zookeeper 192.168.137.132:2181,192.168.137.133:2181,192.168.137.134:2181/kafka \
--replication-factor 3 --partitions 3 --topic test
 
其中在kafka的配置文件中zookeeper配置未加/kakfa,但在创建topics的时增加了/kafka
最终使用:
kafka-console-producer.sh \
--broker-list 192.168.137.132:9092,192.168.137.133:9092,192.168.137.134:9092 \
--topic test
命令检查没有topics信息才发现此问题
   
   解决办法:将两个信息同步即可
   
2、做flume+kafka扇入测试(扇入源为:netcat+kafka;输出以Flume的Logger类型输出)时,启动flume的agent时报错。
2018-03-31 10:43:31,241 (conf-file-poller-0) [ERROR - org.apache.flume.node.PollingPropertiesFileConfigurationProvider$FileWatcherRunnable.run(PollingPropertiesFileConfigurationProvider.java:142)] Failed to load configuration data. Exception follows.
org.apache.flume.FlumeException: Unable to load source type: org.apache.flume.source.kafka,KafkaSource, class: org.apache.flume.source.kafka,KafkaSource
        at org.apache.flume.source.DefaultSourceFactory.getClass(DefaultSourceFactory.java:69)
        at org.apache.flume.source.DefaultSourceFactory.create(DefaultSourceFactory.java:42)
        at org.apache.flume.node.AbstractConfigurationProvider.loadSources(AbstractConfigurationProvider.java:322)
        at org.apache.flume.node.AbstractConfigurationProvider.getConfiguration(AbstractConfigurationProvider.java:97)
        at org.apache.flume.node.PollingPropertiesFileConfigurationProvider$FileWatcherRunnable.run(PollingPropertiesFileConfigurationProvider.java:140)
        at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
        at java.util.concurrent.FutureTask.runAndReset(FutureTask.java:308)
        at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.access$301(ScheduledThreadPoolExecutor.java:180)
        at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.run(ScheduledThreadPoolExecutor.java:294)
        at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
        at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
        at java.lang.Thread.run(Thread.java:748)
Caused by: java.lang.ClassNotFoundException: org.apache.flume.source.kafka,KafkaSource
        at java.net.URLClassLoader.findClass(URLClassLoader.java:381)
        at java.lang.ClassLoader.loadClass(ClassLoader.java:424)
        at sun.misc.Launcher$AppClassLoader.loadClass(Launcher.java:335)
        at java.lang.ClassLoader.loadClass(ClassLoader.java:357)
        at java.lang.Class.forName0(Native Method)
        at java.lang.Class.forName(Class.java:264)
        at org.apache.flume.source.DefaultSourceFactory.getClass(DefaultSourceFactory.java:67)
        ... 11 more


   解决办法:官网资料存在问题,org.apache.flume.source.kafka,KafkaSource其中不应该包括逗号,改为:org.apache.flume.source.kafka.KafkaSource即可。详细官网
如何解析Flume与Kafka整合

关于如何解析Flume与Kafka整合就分享到这里了,希望以上内容可以对大家有一定的帮助,可以学到更多知识。如果觉得文章不错,可以把它分享出去让更多的人看到。

免责声明:

① 本站未注明“稿件来源”的信息均来自网络整理。其文字、图片和音视频稿件的所属权归原作者所有。本站收集整理出于非商业性的教育和科研之目的,并不意味着本站赞同其观点或证实其内容的真实性。仅作为临时的测试数据,供内部测试之用。本站并未授权任何人以任何方式主动获取本站任何信息。

② 本站未注明“稿件来源”的临时测试数据将在测试完成后最终做删除处理。有问题或投稿请发送至: 邮箱/279061341@qq.com QQ/279061341

如何解析Flume与Kafka整合

下载Word文档到电脑,方便收藏和打印~

下载Word文档

猜你喜欢

如何解析Flume与Kafka整合

这篇文章给大家介绍如何解析Flume与Kafka整合,内容非常详细,感兴趣的小伙伴们可以参考借鉴,希望对大家能有所帮助。Flume与Kafka整合一、概念1、Flume:Cloudera 开发的分布式日志收集系统,是一种分布式,可靠且可用的
2023-06-03

如何深度解析Kafka

如何深度解析Kafka,很多新手对此不是很清楚,为了帮助大家解决这个难题,下面小编将为大家详细讲解,有这方面需求的人可以来学习下,希望你能有所收获。背景介绍Kafka简介Kafka是一种分布式的,基于发布/订阅的消息系统。主要设计目标如下:
2023-06-17

redis与ssm如何整合

这篇文章主要介绍redis与ssm如何整合,文中介绍的非常详细,具有一定的参考价值,感兴趣的小伙伴们一定要看完!SSM+redis整合ssm框架之前已经搭建过了,这里不再做代码复制工作。这里主要是利用redis去做mybatis的二级缓存,
2023-05-30

如何进行storm1.1.3与kafka1.0.0整合

本篇文章给大家分享的是有关如何进行storm1.1.3与kafka1.0.0整合,小编觉得挺实用的,因此分享给大家学习,希望大家阅读完这篇文章后可以有所收获,话不多说,跟着小编一起来看看吧。package hgs.core.sk;impor
2023-06-02

如何解析SAP Data Intelligence Modeler里的Kafka Producer和Kafka Consumer

这篇文章将为大家详细讲解有关如何解析SAP Data Intelligence Modeler里的Kafka Producer和Kafka Consumer,文章内容质量较高,因此小编分享给大家做个参考,希望大家阅读完这篇文章后对相关知识有
2023-06-03

如何将spring与quartz进行整合

如何将spring与quartz进行整合?相信很多没有经验的人对此束手无策,为此本文总结了问题出现的原因和解决方法,通过这篇文章希望你能解决这个问题。第0步:在spring配置包扫描以及在 pom导入包spring.xml:pom.xml1
2023-05-31

如何使用spring boot整合kafka和延迟启动消费者

这篇文章给大家分享的是有关如何使用spring boot整合kafka和延迟启动消费者的内容。小编觉得挺实用的,因此分享给大家做个参考,一起跟随小编过来看看吧。spring boot 整合kafka,延迟启动消费者spring boot整合
2023-06-20

如何解析Kafka 消息丢失与消费精确一次性

今天就跟大家聊聊有关如何解析Kafka 消息丢失与消费精确一次性,可能很多人都不太了解,为了让大家更加了解,小编给大家总结了以下内容,希望大家根据这篇文章可以有所收获。消息丢失的场景如果Kafka Producer使用“发后即忘”的方式发送
2023-06-01

tomcat插件与Jrebel插件如何整合

这篇文章将为大家详细讲解有关tomcat插件与Jrebel插件如何整合,小编觉得挺实用的,因此分享给大家做个参考,希望大家阅读完这篇文章后可以有所收获。工欲善其事必先利其器tomcat插件是没装myeclipse插件必备的,它减少了部署工程
2023-06-17

Kafka消费与心跳机制如何理解

Kafka消费与心跳机制如何理解,相信很多没有经验的人对此束手无策,为此本文总结了问题出现的原因和解决方法,通过这篇文章希望你能解决这个问题。导读kafka是一个分布式,分区的,多副本的,多订阅者的消息发布订阅系统(分布式MQ系统),可以用
2023-06-15

如何解析Kafka 1.0.0 多消费者示例

如何解析Kafka 1.0.0 多消费者示例,相信很多没有经验的人对此束手无策,为此本文总结了问题出现的原因和解决方法,通过这篇文章希望你能解决这个问题。package kafka.demo;import java.util.HashMap
2023-06-02

SpringBoot如何整合ES解析搜索返回字段问题

这篇文章主要讲解了“SpringBoot如何整合ES解析搜索返回字段问题”,文中的讲解内容简单清晰,易于学习与理解,下面请大家跟着小编的思路慢慢深入,一起来研究和学习“SpringBoot如何整合ES解析搜索返回字段问题”吧!1. 数据构造
2023-07-06

如何解析Kafka中的时间轮问题

这期内容当中小编将会给大家带来有关如何解析Kafka中的时间轮问题,文章内容丰富且以专业的角度为大家分析和叙述,阅读完这篇文章希望大家可以有所收获。写在前面kafka是一个分布式消息中间件,其高可用高吞吐的特点是大数据领域首选的消息中间件,
2023-06-01

编程热搜

  • Python 学习之路 - Python
    一、安装Python34Windows在Python官网(https://www.python.org/downloads/)下载安装包并安装。Python的默认安装路径是:C:\Python34配置环境变量:【右键计算机】--》【属性】-
    Python 学习之路 - Python
  • chatgpt的中文全称是什么
    chatgpt的中文全称是生成型预训练变换模型。ChatGPT是什么ChatGPT是美国人工智能研究实验室OpenAI开发的一种全新聊天机器人模型,它能够通过学习和理解人类的语言来进行对话,还能根据聊天的上下文进行互动,并协助人类完成一系列
    chatgpt的中文全称是什么
  • C/C++中extern函数使用详解
  • C/C++可变参数的使用
    可变参数的使用方法远远不止以下几种,不过在C,C++中使用可变参数时要小心,在使用printf()等函数时传入的参数个数一定不能比前面的格式化字符串中的’%’符号个数少,否则会产生访问越界,运气不好的话还会导致程序崩溃
    C/C++可变参数的使用
  • css样式文件该放在哪里
  • php中数组下标必须是连续的吗
  • Python 3 教程
    Python 3 教程 Python 的 3.0 版本,常被称为 Python 3000,或简称 Py3k。相对于 Python 的早期版本,这是一个较大的升级。为了不带入过多的累赘,Python 3.0 在设计的时候没有考虑向下兼容。 Python
    Python 3 教程
  • Python pip包管理
    一、前言    在Python中, 安装第三方模块是通过 setuptools 这个工具完成的。 Python有两个封装了 setuptools的包管理工具: easy_install  和  pip , 目前官方推荐使用 pip。    
    Python pip包管理
  • ubuntu如何重新编译内核
  • 改善Java代码之慎用java动态编译

目录