我的编程空间,编程开发者的网络收藏夹
学习永远不晚

SparkStreaming两种方式连接Flume

短信预约 信息系统项目管理师 报名、考试、查分时间动态提醒
省份

北京

  • 北京
  • 上海
  • 天津
  • 重庆
  • 河北
  • 山东
  • 辽宁
  • 黑龙江
  • 吉林
  • 甘肃
  • 青海
  • 河南
  • 江苏
  • 湖北
  • 湖南
  • 江西
  • 浙江
  • 广东
  • 云南
  • 福建
  • 海南
  • 山西
  • 四川
  • 陕西
  • 贵州
  • 安徽
  • 广西
  • 内蒙
  • 西藏
  • 新疆
  • 宁夏
  • 兵团
手机号立即预约

请填写图片验证码后获取短信验证码

看不清楚,换张图片

免费获取短信验证码

SparkStreaming两种方式连接Flume

SparkStreaming两种方式连接Flume

SparkStreaming 连接Flume的两种方式分别为:Push(推)和Pull(拉)的方式实现,以Spark Streaming的角度来看,Push方式属于推送(由Flume向Spark推送数据);而Pull属于拉取(Spark 拉取 Flume的输出数据);

 Flume向SparkStreaming推送数据没有研究明白,有大佬指点一下吗?

万分感谢!

1.Spark拉取Flume数据:

导入两个jar包到flume/lib下

 否则抛出这两个异常:

org.apache.flume.FlumeException: Unable to load sink type: org.apache.spark.streaming.flume.sink.SparkSink, class: org.apache.spark.streaming.flume.sink.SparkSink

java.lang.IllegalStateException: begin() called when transaction is OPEN!

2.编写flume 工作文件:

a1.sources = r1
a1.sinks = k1
a1.channels = c1
# source
a1.sources.r1.type=spooldir
a1.sources.r1.spoolDir=/home/zhuzhu/apps/flumeSpooding
a1.sources.r1.fileHeader=true

# Describe the sink
a1.sinks.k1.type = org.apache.spark.streaming.flume.sink.SparkSink
# 当前主机端口
a1.sinks.k1.hostname = 192.168.137.88
a1.sinks.k1.port = 9999

# Use a channel which buffers events in memory
a1.channels.c1.type = memory
a1.channels.c1.capacity = 1000
a1.channels.c1.transactionCapacity = 100

# Bind the source and sink to the channel
a1.sources.r1.channels = c1
a1.sinks.k1.channel = c1

3.编写SparkStreaming程序:

package day02

import java.net.InetSocketAddress

import org.apache.spark.storage.StorageLevel
import org.apache.spark.streaming.{Seconds, StreamingContext}
import org.apache.spark.streaming.dstream.{DStream, ReceiverInputDStream}
import org.apache.spark.streaming.flume.{FlumeUtils, SparkFlumeEvent}
import org.apache.spark.{SparkConf, SparkContext}


object StreamingFlume {

  def main(args: Array[String]): Unit = {
    //1.创建SparkConf对象
    val conf: SparkConf = new SparkConf().setMaster("local[*]").setAppName("StreamingFlume")
    //2.创建SparkContext对象
    val sc = new SparkContext(conf)
    //设置日志输出格式,只打印异常日志,在这里设置没有用
    //sc.setLogLevel("WARN")
    //3.创建StreamingContext,Seconds(5):轮询机制,多久执行一次
    val ssc = new StreamingContext(sc, Seconds(5))
    //4.定义一个flume集合,可以接受多个flume数据,多个用,隔开需要new
    val addresses = Seq(new InetSocketAddress("127.0.0.1", 5555))
    //5.获取flume中的数据,
    val stream: ReceiverInputDStream[SparkFlumeEvent] = FlumeUtils.createPollingStream(ssc, addresses, StorageLevel.MEMORY_AND_DISK_2)
    // 6.截取flume数据:{"header":xxxxx   "body":xxxxxx}
    val lineDstream: DStream[String] = stream.map(x => new String(x.event.getBody.array()))
    lineDstream.flatMap(x=>x.split(" ")).map(x=>(x,1)).reduceByKey(_+_).print()
    ssc.start()
    ssc.awaitTermination()
  }
}

 4。开启flume监控文件,开启SparkStreaming程序:

向指定目录上传文件

 

 

 

免责声明:

① 本站未注明“稿件来源”的信息均来自网络整理。其文字、图片和音视频稿件的所属权归原作者所有。本站收集整理出于非商业性的教育和科研之目的,并不意味着本站赞同其观点或证实其内容的真实性。仅作为临时的测试数据,供内部测试之用。本站并未授权任何人以任何方式主动获取本站任何信息。

② 本站未注明“稿件来源”的临时测试数据将在测试完成后最终做删除处理。有问题或投稿请发送至: 邮箱/279061341@qq.com QQ/279061341

SparkStreaming两种方式连接Flume

下载Word文档到电脑,方便收藏和打印~

下载Word文档

猜你喜欢

SparkStreaming两种方式连接Flume

SparkStreaming 连接Flume的两种方式分别为:Push(推)和Pull(拉)的方式实现,以Spark Streaming的角度来看,Push方式属于推送(由Flume向Spark推送数据);而Pull属于拉取(Spark 拉取 Flume的输出
SparkStreaming两种方式连接Flume
2020-08-06

python连接telnet和ssh的两种方式是什么

本篇内容主要讲解“python连接telnet和ssh的两种方式是什么”,感兴趣的朋友不妨来看看。本文介绍的方法操作简单快捷,实用性强。下面就让小编来带大家学习“python连接telnet和ssh的两种方式是什么”吧!Telnet 连接方
2023-06-25

Java连接服务器的两种方式SFTP和FTP

在项目开发中,一般文件存储很少再使用SFTP服务,但是也不排除合作伙伴使用SFTP来存储项目中的文件或者通过SFTP来实现文件数据的交互,这篇文章主要介绍了Java集成FTP与SFTP连接池
2023-02-25

数据库连接超时java处理的两种方式

这篇文章主要介绍了数据库连接超时java处理的两种方式,具有很好的参考价值,希望对大家有所帮助。如有错误或未考虑完全的地方,望不吝赐教
2023-05-15

MySQL数据库的两种连接方式:TCP/IP和Socket

Linux平台环境下主要有两种连接方式,一种是TCP/IP连接方式,另一种就是socket连接。在Windows平台下,有name pipe和share memory(不考虑)两种。TCP/IP连接是网络中用得最多的一种方式。环境:MySQL数据库实例IP:1
MySQL数据库的两种连接方式:TCP/IP和Socket
2017-12-06

windows8中无线网络连接的设置两种方式

适用范围:Windows 8 消费者预览版操作步骤: 方案一:1、确保电脑有无线网卡,并且无线网卡的驱动安装正确并且处于开启状态;2、无线路由器或者无线网络环境正常;3、在桌面上找到网络图标。右击选择"属性";4.“在网络和共享
2022-06-04

jdbc连接mysql的五种方式

public void testConnection1() throws SQLException {// 获取Driver的实现类对象Driver driver = new com.mysql.jdbc.Driver();// 第三方的api;String
jdbc连接mysql的五种方式
2021-03-31

iSCSI存储的3种连接方式

我们分析了iSCSI存储的系统结构,下面来看iSCSI是如何与服务器、工作站等主机设备来连接的,也就是我们如何建立一个iSCSI网络存储系统。iSCSI设备的主机接口一般默认都是IP接口,可以直接与以太网络交换机和iSCSI交换机连接,形成
2023-01-31

VMware网络连接的3种方式

1.bridged(桥接模式)在这种模式下,VMWare虚拟出来的操作系统就像是局域网中的一台独立的主机,它可以访问网内任何一台机器。在桥接模式下,你需要手工为虚拟系统配置IP地址、子网掩码,而且还要和宿主机器处于同一网段,这样虚拟系统才能
2023-01-31

Java连接服务器的两种方式SFTP和FTP有什么区别

这篇文章主要介绍了Java连接服务器的两种方式SFTP和FTP有什么区别的相关知识,内容详细易懂,操作简单快捷,具有一定借鉴价值,相信大家阅读完这篇Java连接服务器的两种方式SFTP和FTP有什么区别文章都会有所收获,下面我们一起来看看吧
2023-07-05

MySQL开放远程连接权限的两种方法

背景:要求做一个评价系统,由于之前被人做过一小部分,有现成的数据库(数据库:mysql 5.7 ,数据库软件:MySQL workbench 6.3 CE),为了省去安装数据库导出导入表的麻烦,想要直接远程连接对方数据库。方法如下:首
2022-06-30

3、vagrant的3种网络连接方式

1. 简介vagrant提供了3种网络连接方式,分别是forwarded ports,host-only networking和bridged networking2. forwarded portsconfig.vm.network "f
2023-01-31

编程热搜

目录