我的编程空间,编程开发者的网络收藏夹
学习永远不晚

SparkSQl中运行原理的示例分析

短信预约 -IT技能 免费直播动态提醒
省份

北京

  • 北京
  • 上海
  • 天津
  • 重庆
  • 河北
  • 山东
  • 辽宁
  • 黑龙江
  • 吉林
  • 甘肃
  • 青海
  • 河南
  • 江苏
  • 湖北
  • 湖南
  • 江西
  • 浙江
  • 广东
  • 云南
  • 福建
  • 海南
  • 山西
  • 四川
  • 陕西
  • 贵州
  • 安徽
  • 广西
  • 内蒙
  • 西藏
  • 新疆
  • 宁夏
  • 兵团
手机号立即预约

请填写图片验证码后获取短信验证码

看不清楚,换张图片

免费获取短信验证码

SparkSQl中运行原理的示例分析

这篇文章将为大家详细讲解有关SparkSQl中运行原理的示例分析,小编觉得挺实用的,因此分享给大家做个参考,希望大家阅读完这篇文章后可以有所收获。

一:什么是SparkSQL?

(一)SparkSQL简介

Spark SQL是Spark的一个模块,用于处理结构化的数据,它提供了一个数据抽象DataFrame(最核心的编程抽象就是DataFrame),并且SparkSQL作为分布式SQL查询引擎。
Spark SQL就是将SQL转换成一个任务,提交到集群上运行,类似于Hive的执行方式。

(二)SparkSQL运行原理

将Spark SQL转化为RDD,然后提交到集群执行。

(三)SparkSQL特点

(1)容易整合,Spark SQL已经集成在Spark中

(2)提供了统一的数据访问方式:JSON、CSV、JDBC、Parquet等都是使用统一的方式进行访问

(3)兼容 Hive

(4)标准的数据连接:JDBC、ODBC

二:DataFrame

(一)什么是DataFrame?

在Spark中,DataFrame是一种以RDD为基础的分布式数据集,类似于传统数据库中的二维表格。

DataFrame是组织成命名列的数据集。

它在概念上等同于关系数据库中的表,但在底层具有更丰富的优化。

关系型数据库中的表由表结构和数据组成,而DataFrame也类似,由schema(结构)和数据组成,其数据集是RDD。

SparkSQl中运行原理的示例分析

DataFrame可以根据很多源进行构建,包括:结构化的数据文件,hive中的表,外部的关系型数据库,以及RDD

补充:Spark中的RDD、DataFrame和DataSet讲解

(一)Spark中的模块

SparkSQl中运行原理的示例分析

上图展示了Spark的模块及各模块之间的关系:

底层是Spark-core核心模块,Spark每个模块都有一个核心抽象,Spark-core的核心抽象是RDD,

Spark SQL等都基于RDD封装了自己的抽象,在Spark SQL中是DataFrame/DataSet。

相对来说RDD是更偏底层的抽象,DataFrame/DataSet是在其上做了一层封装,做了优化,使用起来更加方便。

从功能上来说,DataFrame/DataSet能做的事情RDD都能做,RDD能做的事情DataFrame/DataSet不一定能做。

(二)RDD和DataFrame的区别

DataFrame与RDD的主要区别在于:

DataFrame

DataFrame带有schema元信息,即DataFrame所表示的二维表数据集的每一列都带有名称和类型。

SparkSQl中运行原理的示例分析

使得Spark SQL得以洞察更多的结构信息,从而对藏于DataFrame背后的数据源以及作用于DataFrame之上的变换进行了针对性的优化,最终达到大幅提升运行时效率的目标。

RDD

RDD,由于无从得知所存数据元素的具体内部结构,Spark Core只能在stage层面进行简单、通用的流水线优化。

DataFrame和RDD联系:

DataFrame底层是以RDD为基础的分布式数据集,和RDD的主要区别的是:RDD中没有schema信息,而DataFrame中数据每一行都包含schema

DataFrame = RDD[Row] + shcema

三:SparkSession

(一)SparkSession简介

SparkSession是Spark 2.0引如的新概念。SparkSession为用户提供了统一的切入点,来让用户学习spark的各项功能。

在spark的早期版本中,SparkContext是spark的主要切入点,由于RDD是主要的API,我们通过sparkcontext来创建和操作RDD。

对于每个其他的API,我们需要使用不同的context。

例如,对于Streming,我们需要使用StreamingContext;对于sql,使用sqlContext;对于Hive,使用hiveContext。

但是随着DataSet和DataFrame的API逐渐成为标准的API,就需要为他们建立接入点。所以在spark2.0中,引入SparkSession作为DataSet和DataFrame API的切入点。

SparkSession封装了SparkConf、SparkContext和SQLContext。为了向后兼容,SQLContext和HiveContext也被保存下来。

(二)SparkSession实质

SparkSession实质上是SQLContext和HiveContext的组合(未来可能还会加上StreamingContext),所以在SQLContext和HiveContext上可用的API在SparkSession上同样是可以使用的。

SparkSession内部封装了sparkContext,所以计算实际上是由sparkContext完成的。

(三)SparkSession特点

   ----为用户提供一个统一的切入点使用Spark各项功能

----允许用户通过它调用DataFrame和Dataset相关 API来编写程序

----减少了用户需要了解的一些概念,可以很容易的与Spark进行交互

----与Spark交互之时不需要显示的创建SparkConf, SparkContext以及 SQlContext,这些对象已经封闭在SparkSession中

四:通过RDD创建DataFrame

(一)通过样本类创建(反射)

case class People(val name:String,val age:Int)  //可以声明数据类型object WordCount {  def main(args:Array[String]):Unit={    val conf = new SparkConf()    //设置运行模式为本地运行,不然默认是集群模式    //conf.setMaster("local")  //默认是集群模式    //设置任务名    conf.setAppName("WordCount").setMaster("local")    conf.set("spark.default.parallelism","5")    //设置SparkContext,是SparkCore的程序入口    val sc = new SparkContext(conf)    val Sqlsc = new SQLContext(sc)  //根据SparkContext生成SQLContext        val array = Array("mark,14","kitty,23","dasi,45")    val peopleRDD = sc.parallelize(array).map(line=>{    //生成RDD      People(line.split(",")(0),line.split(",")(1).trim().toInt)    })        import Sqlsc.implicits._  //引入全部方法    //将RDD转换成DataFrame    val df = peopleRDD.toDF()      //将DataFrame转换成一个临时的视图    df.createOrReplaceTempView("people")    //使用SQL语句进行查询    Sqlsc.sql("select * from people").show()  }}

SparkSQl中运行原理的示例分析

(二)通过SparkSession创建DataFrame

object WordCount {  def main(args:Array[String]):Unit={    val conf = new SparkConf()    //设置运行模式为本地运行,不然默认是集群模式    //conf.setMaster("local")  //默认是集群模式    //设置任务名    conf.setAppName("WordCount").setMaster("local")    conf.set("spark.default.parallelism","5")    //设置SparkContext,是SparkCore的程序入口    val sc = new SparkContext(conf)    val Sqlsc = new SQLContext(sc)  //根据SparkContext生成SQLContext        val array = Array("mark,14","kitty,23","dasi,45")    //1.需要将RDD数据映射成Row,需要引入import org.apache.spark.sql.Row    val peopleRDD = sc.parallelize(array).map(line=>{    //生成RDD      val fields = line.split(",")      Row(fields(0),fields(1).trim().toInt)    })        //2.创建StructType定义结构    val st:StructType = StructType(      //字段名,字段类型,是否可以为空      List(  //传参是列表类型,或者使用StructField("name", StringType, true) :: StructField("age", IntegerType, true) :: Nil来构建列表          StructField("name",StringType,true),          StructField("age",IntegerType,true)          )    )        //3.使用SparkSession建立DataFrame    val df = Sqlsc.createDataFrame(peopleRDD,st)    //将DataFrame转换成一个临时的视图    df.createOrReplaceTempView("people")    //使用SQL语句进行查询    Sqlsc.sql("select * from people").show()  }}

SparkSQl中运行原理的示例分析

(三)通过 json 文件创建DataFrames

[{"name":"dafa","age":12},{"name":"safaw","age":17},{"name":"ge","age":34}]
def main(args:Array[String]):Unit={    val conf = new SparkConf()    //设置运行模式为本地运行,不然默认是集群模式    //conf.setMaster("local")  //默认是集群模式    //设置任务名    conf.setAppName("WordCount").setMaster("local")    //设置SparkContext,是SparkCore的程序入口    val sc = new SparkContext(conf)    val Sqlsc = new SQLContext(sc)  //根据SparkContext生成SQLContext        //通过json数据直接创建DataFrame    val df = Sqlsc.read.json("E:\\1.json")        //将DataFrame转换成一个临时的视图    df.createOrReplaceTempView("people1")    //使用SQL语句进行查询    Sqlsc.sql("select * from people1").show()  }

SparkSQl中运行原理的示例分析

五:临时视图

(一)什么是视图

视图是一个虚表,跟Mysql里的概念是一样的,视图基于实际的表而存在,其实质是一系列的查询语句

(二)类型

局部视图(Temoporary View):只在当前会话中有效,如果创建它的会话终止,则视图也会消失。

全局视图(Global Temporary View): 在全局范围内有效,不同的Session中都可以访问,生命周期是Spark的Application运行周期,全局视图会绑定到系统保留的数据库global_temp中,因此使用它的时候必须加上相应前缀。

(三)创建视图

创建局部视图:df.createOrReplaceTempView("emp")
创建全局视图:df.createOrReplaceGlobalTempView("empG")

(四)视图查询

spark.sql("select * from emp").show
spark.sql("select * from global_temp.empG").show  //查询全局视图,需要添加前缀

(五)会话周期

spark.newSession.sql("select * from emp").show -----> 报错,Table or View Not Found
spark.newSession.sql("select * from global_temp.empG").show ---->可以正常查询

六:DataFrame的read和save和savemode

(一)数据读取

val conf = new SparkConf().setAppName("TestDataFrame2").setMaster("local")    val sc = new SparkContext(conf)    val sqlContext = new SQLContext(sc)    //方式一    val df1 = sqlContext.read.json("E:\\666\\people.json")    val df2 = sqlContext.read.parquet("E:\\666\\users.parquet")    //方式二    val df3 = sqlContext.read.format("json").load("E:\\666\\people.json")    val df4 = sqlContext.read.format("parquet").load("E:\\666\\users.parquet")    //方式三,默认是parquet格式    val df5 = sqlContext.load("E:\\666\\users.parquet")   //方式四,使用MySQL进行数据源读取    val url = "jdbc:mysql://192.168.123.102:3306/hivedb"    val table = "dbs"    val properties = new Properties()    properties.setProperty("user","root")    properties.setProperty("password","root")    //需要传入Mysql的URL、表明、properties(连接数据库的用户名密码)    val df = sqlContext.read.jdbc(url,table,properties)    df.createOrReplaceTempView("dbs")    sqlContext.sql("select * from dbs").show()

使用Hive作为数据源:需要在pom.xml文件中添加依赖

<!-- https://mvnrepository.com/artifact/org.apache.spark/spark-hive -->        <dependency>            <groupId>org.apache.spark</groupId>            <artifactId>spark-hive_2.11</artifactId>            <version>2.3.0</version>        </dependency>

开发环境则把resource文件夹下添加hive-site.xml文件,集群环境把hive的配置文件要发到$SPARK_HOME/conf目录下

SparkSQl中运行原理的示例分析

<configuration>        <property>                <name>javax.jdo.option.ConnectionURL</name>                <value>jdbc:mysql://localhost:3306/hivedb?createDatabaseIfNotExist=true</value>                <description>JDBC connect string for a JDBC metastore</description>                <!-- 如果 mysql 和 hive 在同一个服务器节点,那么请更改 hadoop02 为 localhost -->        </property>        <property>                <name>javax.jdo.option.ConnectionDriverName</name>                <value>com.mysql.jdbc.Driver</value>                <description>Driver class name for a JDBC metastore</description>        </property>        <property>                <name>javax.jdo.option.ConnectionUserName</name>                <value>root</value>                <description>username to use against metastore database</description>        </property>        <property>                <name>javax.jdo.option.ConnectionPassword</name>                <value>root</value>        <description>password to use against metastore database</description>        </property>    <property>                <name>hive.metastore.warehouse.dir</name>                <value>/hive/warehouse</value>                <description>hive default warehouse, if nessecory, change it</description>        </property>  </configuration>hive-site.xml配置文件
val conf = new SparkConf().setMaster("local").setAppName(this.getClass.getSimpleName)    val sc = new SparkContext(conf)    val sqlContext = new HiveContext(sc)    sqlContext.sql("select * from myhive.student").show()

(二)数据保存

val conf = new SparkConf().setAppName("TestDataFrame2").setMaster("local")    val sc = new SparkContext(conf)    val sqlContext = new SQLContext(sc)    val df1 = sqlContext.read.json("E:\\666\\people.json")    //方式一    df1.write.json("E:\\111")    df1.write.parquet("E:\\222")    //方式二    df1.write.format("json").save("E:\\333")    df1.write.format("parquet").save("E:\\444")    //方式三    df1.write.save("E:\\555")

(三)数据保存模式

df1.write.format("parquet").mode(SaveMode.Ignore).save("E:\\444")

七:数据集DataSet

Dataset也是一个分布式数据容器,简单来说是类似二维表,Dataset里头存有schema数据结构信息和原生数据,Dataset的底层封装的是RDD,当RDD的泛型是Row类型的时候,我们也可以称它为DataFrame。即Dataset<Row> = DataFrame。DataFrame是特殊的Dataset。

Spark整合了Dataset和DataFrame,前者是有明确类型的数据集,后者是无明确类型的数据集。根据官方的文档:

Dataset是一种强类型集合,与领域对象相关,可以使用函数或者关系进行分布式的操作。
每个Dataset也有一个无类型的视图,叫做DataFrame,也就是关于Row的Dataset。
简单来说,Dataset一般都是Dataset[T]形式,这里的T是指数据的类型,如上图中的Person,而DataFrame就是一个Dataset[Row]。

Datasets是懒加载的,即只有actions被调用的时候才会触发计算。在内部,Dataset代表一个逻辑计划,用来描述产生数据需要的计算。当一个action被调用的时候,Spark的query优化器会优化这个逻辑计划并以分布式的方式在物理上进行实际的计算操作。

(一)创建和使用DataSet---使用序列

(1,"Tom")  (2,"Mary")

测试数据

(1)定义case class
             case class MyData(a:Int,b:String)
(2)使用序列创建DataSet
             val DS = Seq(MyData(1,"Tom"),MyData(2,"Mary")).toDS

(二)创建和使用DataSet---通过case class作为编码器,将DataFrame转换成DataSet

(1)定义case class
             case class Person(name:String,age:BigInt)
(2)读入JSON的数据
             val df = spark.read.json("/root/temp/people.json")
(3)将DataFrame转换成DataSet
             val PersonDS =df.as[Person]

(三)创建和使用DataSet---读取HDFS数据文件

(1)读取HDFS的文件,直接创建DataSet
             val lineDS = spark.read.text("hdfs://bigdata111:9000/input/data.txt").as[String]
(2)分词操作,查询长度大于3的单词
             val words = lineDS.flatMap(_.split(" ")).filter(_.length > 3)
             words.show
             words.collect

关于“SparkSQl中运行原理的示例分析”这篇文章就分享到这里了,希望以上内容可以对大家有一定的帮助,使各位可以学到更多知识,如果觉得文章不错,请把它分享出去让更多的人看到。

免责声明:

① 本站未注明“稿件来源”的信息均来自网络整理。其文字、图片和音视频稿件的所属权归原作者所有。本站收集整理出于非商业性的教育和科研之目的,并不意味着本站赞同其观点或证实其内容的真实性。仅作为临时的测试数据,供内部测试之用。本站并未授权任何人以任何方式主动获取本站任何信息。

② 本站未注明“稿件来源”的临时测试数据将在测试完成后最终做删除处理。有问题或投稿请发送至: 邮箱/279061341@qq.com QQ/279061341

SparkSQl中运行原理的示例分析

下载Word文档到电脑,方便收藏和打印~

下载Word文档

猜你喜欢

SparkSQl中运行原理的示例分析

这篇文章将为大家详细讲解有关SparkSQl中运行原理的示例分析,小编觉得挺实用的,因此分享给大家做个参考,希望大家阅读完这篇文章后可以有所收获。一:什么是SparkSQL?(一)SparkSQL简介Spark SQL是Spark的一个模块
2023-06-20

Flink on yarn运行原理的示例分析

小编给大家分享一下Flink on yarn运行原理的示例分析,希望大家阅读完这篇文章之后都有所收获,下面让我们一起去探讨吧!Flink运行时由两种类型的进程组成:1),JobManager也叫master协调分布式执行。他们调度任务,协调
2023-06-19

JavaScript运行的示例分析

这篇文章给大家分享的是有关JavaScript运行的示例分析的内容。小编觉得挺实用的,因此分享给大家做个参考,一起跟随小编过来看看吧。1.1 语法分析预编译之前,先通篇扫描看看有没有语法错误1.2 预编译 1.2.1 函数声明整体提升声明函
2023-06-29

java中Builder原理的示例分析

这篇文章主要为大家展示了“java中Builder原理的示例分析”,内容简而易懂,条理清晰,希望能够帮助大家解决疑惑,下面让小编带领大家一起研究并学习一下“java中Builder原理的示例分析”这篇文章吧。首先给一个简单的Builder设
2023-06-22

Java中Lock原理的示例分析

小编给大家分享一下Java中Lock原理的示例分析,相信大部分人都还不怎么了解,因此分享这篇文章给大家参考一下,希望大家阅读完这篇文章后大有收获,下面让我们一起去了解一下吧!常用的java框架有哪些1.SpringMVC,Spring We
2023-06-14

java中JVM运行时内存整理的示例分析

这篇文章给大家分享的是有关java中JVM运行时内存整理的示例分析的内容。小编觉得挺实用的,因此分享给大家做个参考,一起跟随小编过来看看吧。Java可以用来干什么Java主要应用于:1. web开发;2. Android开发;3. 客户端开
2023-06-14

kubernetes中kubelet运行机制的示例分析

这篇文章给大家分享的是有关kubernetes中kubelet运行机制的示例分析的内容。小编觉得挺实用的,因此分享给大家做个参考,一起跟随小编过来看看吧。一:简介在Kubernetes集群中,每个Node节点上都会启动一个Kubelet服务
2023-06-04

Java线程运行的示例分析

这篇文章将为大家详细讲解有关Java线程运行的示例分析,小编觉得挺实用的,因此分享给大家做个参考,希望大家阅读完这篇文章后可以有所收获。栈与栈帧JVM中由堆、栈、方法区所组成,其中栈内存就是分配给线程使用的,每个线程启动后,虚拟机都会为其分
2023-06-29

kubernetes中网络原理的示例分析

小编给大家分享一下kubernetes中网络原理的示例分析,相信大部分人都还不怎么了解,因此分享这篇文章给大家参考一下,希望大家阅读完这篇文章后大有收获,下面让我们一起去了解一下吧!一:体系结构图二:说明1.网络命名空间处于不同命名空间的网
2023-06-04

java中多态原理的示例分析

这篇文章将为大家详细讲解有关java中多态原理的示例分析,小编觉得挺实用的,因此分享给大家做个参考,希望大家阅读完这篇文章后可以有所收获。Java可以用来干什么Java主要应用于:1. web开发;2. Android开发;3. 客户端开发
2023-06-14

Golang中运行与Plan9汇编的示例分析

小编给大家分享一下Golang中运行与Plan9汇编的示例分析,相信大部分人都还不怎么了解,因此分享这篇文章给大家参考一下,希望大家阅读完这篇文章后大有收获,下面让我们一起去了解一下吧!Golang的运行环境当我们把编译后的Go代码运行起来
2023-06-16

PHP中CLI命令行运行模式的示例分析

这篇文章将为大家详细讲解有关PHP中CLI命令行运行模式的示例分析,小编觉得挺实用的,因此分享给大家做个参考,希望大家阅读完这篇文章后可以有所收获。PHP的CLI命令行运行模式浅析在做开发的时候,我们不仅仅只是做各种网站或者接口,也经常需要
2023-06-15

concurrenthashmap中size方法原理的示例分析

小编给大家分享一下concurrenthashmap中size方法原理的示例分析,希望大家阅读完这篇文章之后都有所收获,下面让我们一起去探讨吧!concurrenthashmap的size方法原理同上,这也是同一个面试的时候别人问的,我只是
2023-06-29

编程热搜

  • Python 学习之路 - Python
    一、安装Python34Windows在Python官网(https://www.python.org/downloads/)下载安装包并安装。Python的默认安装路径是:C:\Python34配置环境变量:【右键计算机】--》【属性】-
    Python 学习之路 - Python
  • chatgpt的中文全称是什么
    chatgpt的中文全称是生成型预训练变换模型。ChatGPT是什么ChatGPT是美国人工智能研究实验室OpenAI开发的一种全新聊天机器人模型,它能够通过学习和理解人类的语言来进行对话,还能根据聊天的上下文进行互动,并协助人类完成一系列
    chatgpt的中文全称是什么
  • C/C++中extern函数使用详解
  • C/C++可变参数的使用
    可变参数的使用方法远远不止以下几种,不过在C,C++中使用可变参数时要小心,在使用printf()等函数时传入的参数个数一定不能比前面的格式化字符串中的’%’符号个数少,否则会产生访问越界,运气不好的话还会导致程序崩溃
    C/C++可变参数的使用
  • css样式文件该放在哪里
  • php中数组下标必须是连续的吗
  • Python 3 教程
    Python 3 教程 Python 的 3.0 版本,常被称为 Python 3000,或简称 Py3k。相对于 Python 的早期版本,这是一个较大的升级。为了不带入过多的累赘,Python 3.0 在设计的时候没有考虑向下兼容。 Python
    Python 3 教程
  • Python pip包管理
    一、前言    在Python中, 安装第三方模块是通过 setuptools 这个工具完成的。 Python有两个封装了 setuptools的包管理工具: easy_install  和  pip , 目前官方推荐使用 pip。    
    Python pip包管理
  • ubuntu如何重新编译内核
  • 改善Java代码之慎用java动态编译

目录