Spark SQL数据加载和保存的实例分析
今天就跟大家聊聊有关Spark SQL数据加载和保存的实例分析,可能很多人都不太了解,为了让大家更加了解,小编给大家总结了以下内容,希望大家根据这篇文章可以有所收获。
一、前置知识详解 Spark SQL重要是操作DataFrame,DataFrame本身提供了save和load的操作, Load:可以创建DataFrame, Save:把DataFrame中的数据保存到文件或者说与具体的格式来指明我们要读取的文件的类型以及与具体的格式来指出我们要输出的文件是什么类型。
二、Spark SQL读写数据代码实战
import org.apache.spark.SparkConf;import org.apache.spark.api.java.JavaRDD;import org.apache.spark.api.java.JavaSparkContext;import org.apache.spark.api.java.function.Function;import org.apache.spark.sql.*;import org.apache.spark.sql.types.DataTypes;import org.apache.spark.sql.types.StructField;import org.apache.spark.sql.types.StructType;import java.util.ArrayList;import java.util.List;public class SparkSQLLoadSaveOps { public static void main(String[] args) { SparkConf conf = new SparkConf().setMaster("local").setAppName("SparkSQLLoadSaveOps"); JavaSparkContext sc = new JavaSparkContext(conf); SQLContext = new SQLContext(sc); DataFrame peopleDF = sqlContext.read().format("json").load("E:\\Spark\\Sparkinstanll_package\\Big_Data_Software\\spark-1.6.0-bin-hadoop2.6\\examples\\class="lazy" data-src\\main\\resources\\people.json"); //通过mode来指定输出文件的是append。创建新文件来追加文件 peopleDF.select("name").write().mode(SaveMode.Append).save("E:\\personNames"); }}
读取过程源码分析如下: 1. read方法返回DataFrameReader,用于读取数据。
@Experimental//创建DataFrameReader实例,获得了DataFrameReader引用def read: DataFrameReader = new DataFrameReader(this)
2. 然后再调用DataFrameReader类中的format,指出读取文件的格式。
def format(source: String): DataFrameReader = { this.source = source this}
3. 通过DtaFrameReader中load方法通过路径把传入过来的输入变成DataFrame。
// TODO: Remove this one in Spark 2.0.def load(path: String): DataFrame = { option("path", path).load()}
至此,数据的读取工作就完成了,下面就对DataFrame进行操作。 下面就是写操作!!!
1. 调用DataFrame中select函数进行对列筛选
@scala.annotation.varargsdef select(col: String, cols: String*): DataFrame = select((col +: cols).map(Column(_)) : _*)
2. 然后通过write将结果写入到外部存储系统中。
@Experimentaldef write: DataFrameWriter = new DataFrameWriter(this)
3. 在保持文件的时候mode指定追加文件的方式
def mode(saveMode: SaveMode): DataFrameWriter = { this.mode = saveMode this}
4. 最后,save()方法触发action,将文件输出到指定文件中。
def save(path: String): Unit = { this.extraOptions += ("path" -> path) save()}
三、Spark SQL读写整个流程图如下
四、对于流程中部分函数源码详解
DataFrameReader.Load()
1. Load()返回DataFrame类型的数据集合,使用的数据是从默认的路径读取。
@deprecated("Use read.load(path). This will be removed in Spark 2.0.", "1.4.0")def load(path: String): DataFrame = {//此时的read就是DataFrameReader read.load(path)}
2. 追踪load源码进去,源码如下:在DataFrameReader中的方法。Load()通过路径把输入传进来变成一个DataFrame。
// TODO: Remove this one in Spark 2.0.def load(path: String): DataFrame = { option("path", path).load()}
3. 追踪load源码如下:
def load(): DataFrame = {//对传入的Source进行解析 val resolved = ResolvedDataSource( sqlContext, userSpecifiedSchema = userSpecifiedSchema, partitionColumns = Array.empty[String], provider = source, options = extraOptions.toMap) DataFrame(sqlContext, LogicalRelation(resolved.relation))}
DataFrameReader.format()
1. Format:具体指定文件格式,这就获得一个巨大的启示是:如果是Json文件格式可以保持为Parquet等此类操作。 Spark SQL在读取文件的时候可以指定读取文件的类型。例如,Json,Parquet.
def format(source: String): DataFrameReader = { this.source = source //FileType this}
DataFrame.write()
1. 创建DataFrameWriter实例
@Experimentaldef write: DataFrameWriter = new DataFrameWriter(this)1
2. 追踪DataFrameWriter源码如下:以DataFrame的方式向外部存储系统中写入数据。
@Experimentalfinal class DataFrameWriter private[sql](df: DataFrame) {
DataFrameWriter.mode()
1. Overwrite是覆盖,之前写的数据全都被覆盖了。 Append:是追加,对于普通文件是在一个文件中进行追加,但是对于parquet格式的文件则创建新的文件进行追加。
def mode(saveMode: SaveMode): DataFrameWriter = { this.mode = saveMode this}
2. 通过模式匹配接收外部参数
def mode(saveMode: String): DataFrameWriter = { this.mode = saveMode.toLowerCase match { case "overwrite" => SaveMode.Overwrite case "append" => SaveMode.Append case "ignore" => SaveMode.Ignore case "error" | "default" => SaveMode.ErrorIfExists case _ => throw new IllegalArgumentException(s"Unknown save mode: $saveMode. " + "Accepted modes are 'overwrite', 'append', 'ignore', 'error'.") } this}
DataFrameWriter.save()
1. save将结果保存传入的路径。
def save(path: String): Unit = { this.extraOptions += ("path" -> path) save()}
2. 追踪save方法。
def save(): Unit = { ResolvedDataSource( df.sqlContext, source, partitioningColumns.map(_.toArray).getOrElse(Array.empty[String]), mode, extraOptions.toMap, df)}
3. 其中source是SQLConf的defaultDataSourceNameprivate var source: String = df.sqlContext.conf.defaultDataSourceName其中DEFAULT_DATA_SOURCE_NAME默认参数是parquet。
// This is used to set the default data sourceval DEFAULT_DATA_SOURCE_NAME = stringConf("spark.sql.sources.default", defaultValue = Some("org.apache.spark.sql.parquet"), doc = "The default data source to use in input/output.")
DataFrame.scala中部分函数详解:
1. toDF函数是将RDD转换成DataFrame
// This is declared with parentheses to prevent the Scala compiler from treating// `rdd.toDF("1")` as invoking this toDF and then apply on the returned DataFrame.def toDF(): DataFrame = this
2. show()方法:将结果显示出来
// scalastyle:off printlndef show(numRows: Int, truncate: Boolean): Unit = println(showString(numRows, truncate))// scalastyle:on println
追踪showString源码如下:showString中触发action收集数据。
private[sql] def showString(_numRows: Int, truncate: Boolean = true): String = { val numRows = _numRows.max(0) val sb = new StringBuilder val takeResult = take(numRows + 1) val hasMoreData = takeResult.length > numRows val data = takeResult.take(numRows) val numCols = schema.fieldNames.length
看完上述内容,你们对Spark SQL数据加载和保存的实例分析有进一步的了解吗?如果还想了解更多知识或者相关内容,请关注亿速云行业资讯频道,感谢大家的支持。
免责声明:
① 本站未注明“稿件来源”的信息均来自网络整理。其文字、图片和音视频稿件的所属权归原作者所有。本站收集整理出于非商业性的教育和科研之目的,并不意味着本站赞同其观点或证实其内容的真实性。仅作为临时的测试数据,供内部测试之用。本站并未授权任何人以任何方式主动获取本站任何信息。
② 本站未注明“稿件来源”的临时测试数据将在测试完成后最终做删除处理。有问题或投稿请发送至: 邮箱/279061341@qq.com QQ/279061341