spark2.4.3中sparkSQL用户自定义函数该怎么理解
这期内容当中小编将会给大家带来有关spark2.4.3中sparkSQL用户自定义函数该怎么理解,文章内容丰富且以专业的角度为大家分析和叙述,阅读完这篇文章希望大家可以有所收获。
1、简介
从Spark2.0以上的版本开始,spark是使用全新的SparkSession接口代替Spark1.6中的SQLcontext和HiveContext
来实现对数据的加载、转换、处理等工作,并且实现了SQLcontext和HiveContext的所有功能。
我们在新版本中并不需要之前那么繁琐的创建很多对象,只需要创建一个SparkSession对象即可。
SparkSession支持从不同的数据源加载数据,并把数据转换成DataFrame,并支持把DataFrame转换成SQLContext自身中的表。
然后使用SQL语句来操作数据,也提供了HiveQL以及其他依赖于Hive的功能支持。
创建SparkSession
SparkSession 是 Spark SQL 的入口。
使用 Dataset 或者 Datafram 编写 Spark SQL 应用的时候,第一个要创建的对象就是 SparkSession。
Builder 是 SparkSession 的构造器。 通过 Builder, 可以添加各种配置。
Builder 的方法如下:
Method | Description |
---|---|
getOrCreate | 获取或者新建一个 sparkSession |
enableHiveSupport | 增加支持 hive Support |
appName | 设置 application 的名字 |
config | 设置各种配置 |
2、sparkSQL基本使用方法
使用的spark版本2.4.3
spark 1.x中的SQLContext在新版本中已经被废弃,改为SparkSession.builder
可以写成
val conf = new SparkConf().setAppName("helloworld").setMaster("local[*]")val spark1=SparkSession.builder().config(conf).getOrCreate()
或(sparksession构造器私有化在builder中)
val spark = SparkSession.builder .appName("my spark application") .master("local[2]") .getOrCreate()
例:
import org.apache.spark.sql.SparkSessionobject HelloWorld { def main(args: Array[String]): Unit = { val spark = SparkSession.builder .appName("my spark application") .master("local[2]") .getOrCreate() //读取数据 val df = spark.read.json("/usr/local/opt/spark-2.4.3/examples/class="lazy" data-src/main/resources/people.json") //展示所有数据 df.show() //DSL df.select("name").show() //SQL df.createTempView("people") spark.sql("select * from people where age=30").show() //关闭 spark.close() }}
输出结果 1:
//展示所有数据 df.show()
输出结果 2:
//DSL df.select("name").show()
输出结果 3:
//SQL df.createTempView("people") spark.sql("select * from people where age=30").show()
3、通过udf自定义用户函数addName (实现将字段x前拼接上name:x)
scala> spark.read.json("./examples/class="lazy" data-src/main/resources/people.json")res32: org.apache.spark.sql.DataFrame = [age: bigint, name: string]scala> res32.createOrReplaceTempView("people")scala> spark.sql("select * from people")res38: org.apache.spark.sql.DataFrame = [age: bigint, name: string]scala> spark.sql("select * from people").show+----+-------+| age| name|+----+-------+|null|Michael|| 30| Andy|| 19| Justin|+----+-------+scala> spark.udf.register("addName",(x:String)=> "name:"+x)res40: org.apache.spark.sql.expressions.UserDefinedFunction = UserDefinedFunction(<function1>,StringType,Some(List(StringType)))scala> spark.sql("select addName(name) as name from people").show+------------+| name|+------------+|name:Michael|| name:Andy|| name:Justin|+------------+
4、通过udaf自定义用户函数
package com.ny.serviceimport org.apache.spark.sql.expressions.{MutableAggregationBuffer, UserDefinedAggregateFunction}import org.apache.spark.sql.types._import org.apache.spark.sql.{Row, SparkSession}class CustomerAvg extends UserDefinedAggregateFunction { //输入的类型 override def inputSchema: StructType = StructType(StructField("salary", LongType) :: Nil) //缓存数据的类型 override def bufferSchema: StructType = { StructType(StructField("sum", LongType) :: StructField("count", LongType) :: Nil) } //返回值类型 override def dataType: DataType = LongType //幂等性 override def deterministic: Boolean = true //初始化 override def initialize(buffer: MutableAggregationBuffer): Unit = { buffer(0) = 0L buffer(1) = 0L }//更新 分区内操作 override def update(buffer: MutableAggregationBuffer, input: Row): Unit = { buffer(0)=buffer.getLong(0) +input.getLong(0) buffer(1)=buffer.getLong(1)+1L }//合并 分区与分区之间操作 override def merge(buffer1: MutableAggregationBuffer, buffer2: Row): Unit = { buffer1(0)=buffer1.getLong(0)+buffer2.getLong(0) buffer1(1)=buffer1.getLong(1)+buffer2.getLong(1) } //最终执行的方法 override def evaluate(buffer: Row): Any = { buffer.getLong(0)/buffer.getLong(1) }}object CustomerAvg{ def main(args: Array[String]): Unit = { val spark= SparkSession.builder() .appName("MyAvg") .master("local[2]") .getOrCreate() spark.udf.register("MyAvg",new CustomerAvg)//读数据 val frame = spark.read.json("/usr/local/opt/spark-2.4.3/examples/class="lazy" data-src/main/resources/peopleCP.json") frame.createTempView("peopleCP") spark.sql("select MyAvg(age) avg_age from peopleCP").show() spark.stop() }}
nancylulululu:resources nancy$ vi peopleCP.json {"name":"Michael","age":11}{"name":"Andy", "age":30}{"name":"Justin", "age":19}
返回结果
上述就是小编为大家分享的spark2.4.3中sparkSQL用户自定义函数该怎么理解了,如果刚好有类似的疑惑,不妨参照上述分析进行理解。如果想知道更多相关知识,欢迎关注编程网行业资讯频道。
免责声明:
① 本站未注明“稿件来源”的信息均来自网络整理。其文字、图片和音视频稿件的所属权归原作者所有。本站收集整理出于非商业性的教育和科研之目的,并不意味着本站赞同其观点或证实其内容的真实性。仅作为临时的测试数据,供内部测试之用。本站并未授权任何人以任何方式主动获取本站任何信息。
② 本站未注明“稿件来源”的临时测试数据将在测试完成后最终做删除处理。有问题或投稿请发送至: 邮箱/279061341@qq.com QQ/279061341