我的编程空间,编程开发者的网络收藏夹
学习永远不晚

如何解析SparkSQL外部数据源

短信预约 -IT技能 免费直播动态提醒
省份

北京

  • 北京
  • 上海
  • 天津
  • 重庆
  • 河北
  • 山东
  • 辽宁
  • 黑龙江
  • 吉林
  • 甘肃
  • 青海
  • 河南
  • 江苏
  • 湖北
  • 湖南
  • 江西
  • 浙江
  • 广东
  • 云南
  • 福建
  • 海南
  • 山西
  • 四川
  • 陕西
  • 贵州
  • 安徽
  • 广西
  • 内蒙
  • 西藏
  • 新疆
  • 宁夏
  • 兵团
手机号立即预约

请填写图片验证码后获取短信验证码

看不清楚,换张图片

免费获取短信验证码

如何解析SparkSQL外部数据源

这期内容当中小编将会给大家带来有关如何解析SparkSQL外部数据源,文章内容丰富且以专业的角度为大家分析和叙述,阅读完这篇文章希望大家可以有所收获。

场景介绍:

    大数据MapReduce,Hive,Spark作业,首先需要加载数据,数据的存放源可能是HDFS、HBase、S3、OSS mongoDB;数据格式也可能为json、text、csv、parquet、jdbc..或者数据格式经过压缩,不同格式文件需要不同的解析方式,

    如果需要HDFS关联MySQL数据,可以通过sqoop进行一些列转换到,如果使用External Data Source API直接加载为DF拿到数据,简单的说可以通过SparkSQL拿到外部数据源数据加载成DF。

加载方式:

build-in :内置加载外部数据如 json、text、parquet、jdbc、HDFS;

third-party:第三方加载外部数据如HBase、S3、OSS mongoDB

    第三方JAR地址:https://spark-packages.org/  

    Maven工程需要导入gav

    spark-shell:需要外部导入--package g:a:v  

    SPARK_HOME/bin/spark-shell --packages com.databricks:spark-csv_2.11:1.5.0

        优势:下载依赖包到本地

缺点:内网环境没有网络无法下载

一、外部数据源读取parquet文件:

Spark context Web UI available at http://hadoop001:4040

Spark context available as 'sc' (master = local[2], app id = local-1536244013147).

Spark session available as 'spark'.

Welcome to

      ____              __

     / __/__  ___ _____/ /__

    _\ \/ _ \/ _ `/ __/  '_/

   /___/ .__/\_,_/_/ /_/\_\   version 2.3.1

      /_/

Using Scala version 2.11.8 (Java HotSpot(TM) 64-Bit Server VM, Java 1.8.0_45)

Type in expressions to have them evaluated.

Type :help for more information.

scala> spark.read.load("file:///home/hadoop/app/spark--bin-2.6.0-cdh6.7.0/examples/class="lazy" data-src/main/resources/people.txt").show

提示错误:/people.txt is not a Parquet file

注意:spark.read.load()底层默认读取Parquet file

scala> spark.read.load("file:///home/hadoop/app/spark--bin-2.6.0-cdh6.7.0/examples/class="lazy" data-src/main/resources/users.parquet").show

18/09/06 10:32:29 WARN ObjectStore: Failed to get database global_temp, returning NoSuchObjectException

+------+--------------+----------------+                                        |  name|favorite_color|favorite_numbers|+------+--------------+----------------+|Alyssa|          null|  [3, 9, 15, 20]||   Ben|           red|              []|+------+--------------+----------------+

scala> val users = spark.read.load("file:///home/hadoop/app/spark--bin-2.6.0-cdh6.7.0/examples/class="lazy" data-src/main/resources/users.parquet")

users: org.apache.spark.sql.DataFrame = [name: string, favorite_color: string ... 1 more field]

scala> users.printSchema

root

 |-- name: string (nullable = true)

 |-- favorite_color: string (nullable = true)

 |-- favorite_numbers: array (nullable = true)

 |    |-- element: integer (containsNull = true)

scala> users.show

+------+--------------+----------------+|  name|favorite_color|favorite_numbers|+------+--------------+----------------+|Alyssa|          null|  [3, 9, 15, 20]||   Ben|           red|              []|+------+--------------+----------------+

-- 查看列,常规操作

scala> users.select("name").show

+------+|  name|+------+|Alyssa||   Ben|+------+

二、转换操作

-- 转成json格式输出

scala> users.select("name","favorite_color").write.format("json").save("file:////home/hadoop/data/parquet/")

[hadoop@hadoop001 parquet]$ cat *{"name":"Alyssa"}{"name":"Ben","favorite_color":"red"}

-- 不采取压缩

.option("compression","none")  

-- 转成text格式输出

scala> users.select("name").write.format("text").save("file:////home/hadoop/data/parquet2/")

[hadoop@hadoop001 parquet2]$ cat *

Alyssa

-- Save Modes

用法:.mode("")

1、default  -- 目标目录存在,抛出异常

2、append   -- 目标目录存在,重跑数据+1,无法保证数据幂等

3、overwrite-- 目标目录存在,覆盖原文件

4、ignore-- 忽略你的模式,目标纯在将不保存

三、spark-shell操作JDBC数据

-- 读取外部MySQL数据为DF

val jdbcDF = spark.read.format("jdbc").option("url", "jdbc:mysql://hadoop001:3306/ruozedata").option("driver","com.mysql.jdbc.Driver").option("dbtable", "city_info").option("user","root").option("password", "root").load()

-- 查看表信息

jdbcDF.show()

-- 获取本地数据 

val deptDF = spark.table("dept") 

-- join关联使用

deptDF.join(jdbcDF,deptDF.col("deptid") === jdbcDF.col("deptid"))

-- DF写入MySQL本地,数据类型有变化,重复写入需要加上.mode("overwrite")

jdbcDF.write.format("jdbc").option("url", "jdbc:mysql://hadoop001:3306/hive_data").option("driver","com.mysql.jdbc.Driver").option("dbtable", "city_info_bak").option("user","root").option("password", "root").save()

mysql> show tables

+---------------------------+| Tables_in_hive_data       |+---------------------------+| bucketing_cols            || cds                       || city_info_bak             |+---------------------------+

-- 如果想类型不发生变化指定option指定字段类型

.option("createTableColumnTypes", "name CHAR(64), comments VARCHAR(1024)")

四、spark-sql操作JDBC数据

-- SQL创建临时表视图,单session

CREATE TEMPORARY VIEW emp_sqlUSING org.apache.spark.sql.jdbcOPTIONS (  url "jdbc:mysql://hadoop001:3306/ruozedata",  dbtable "city_info",  user 'root',  password 'root')

show tbales;

INSERT INTO TABLE emp_sql

SELECT * FROM emp_sql

五、Perdicate Push Down(PPD)

               disk         network                  CPU

外部数据外(1T)------->获取本地磁盘(1T)---------->提交到集群(1T)--------->结果(1G)

               disk        network                   CPU

外部数据外(1T)------->经过列裁剪(10G)----------->提交到集群(10G)----------->传结果(1g)

               disk          CPU                 network

外部数据外(1T)------->经过列裁剪(10G)---------->进过计算(1G)----------->传输结果

六、SparkSQL外部数据源实现机制

-- 0.有效的读取外部数据源的数据的

-- 1.buildScan扫描整张表,变成一个RDD[ROW]

trait TableScan {

def buildScan(): RDD[Row]  

}

-- 2.PrunedScan获取表的裁剪列

trait PrunedScan {

def buildScan(requiredColumns: Array[String]): RDD[Row] 

-- 3.PrunedFilteredScan列裁剪,行过滤

trait PrunedFilteredScan {

def buildScan(requiredColumns: Array[String], filters: Array[Filter]): RDD[Row]

-- 4.加载外部数据源的数据,定义数据的schema信息

abstract class BaseRelation{

-- 5.Relation产生BaseRelation使用

trait RelationProvider

}

总归:

-- 查询类操作

trait PrunedScan {

  def buildScan(requiredColumns: Array[String]): RDD[Row]

}  

-- 列裁剪,行过滤

trait PrunedFilteredScan {

  def buildScan(requiredColumns: Array[String], filters: Array[Filter]): RDD[Row]

}  

-- 写入类操作

trait InsertableRelation {

  def insert(data: DataFrame, overwrite: Boolean): Unit

}

上述就是小编为大家分享的如何解析SparkSQL外部数据源了,如果刚好有类似的疑惑,不妨参照上述分析进行理解。如果想知道更多相关知识,欢迎关注编程网行业资讯频道。

免责声明:

① 本站未注明“稿件来源”的信息均来自网络整理。其文字、图片和音视频稿件的所属权归原作者所有。本站收集整理出于非商业性的教育和科研之目的,并不意味着本站赞同其观点或证实其内容的真实性。仅作为临时的测试数据,供内部测试之用。本站并未授权任何人以任何方式主动获取本站任何信息。

② 本站未注明“稿件来源”的临时测试数据将在测试完成后最终做删除处理。有问题或投稿请发送至: 邮箱/279061341@qq.com QQ/279061341

如何解析SparkSQL外部数据源

下载Word文档到电脑,方便收藏和打印~

下载Word文档

猜你喜欢

如何解析SparkSQL外部数据源

这期内容当中小编将会给大家带来有关如何解析SparkSQL外部数据源,文章内容丰富且以专业的角度为大家分析和叙述,阅读完这篇文章希望大家可以有所收获。场景介绍: 大数据MapReduce,Hive,Spark作业,首先需要加载数据,数
2023-06-02

PostgreSQL中的外部表和外部数据源如何使用

在PostgreSQL中,外部表和外部数据源可以通过使用外部数据包装器(Foreign Data Wrapper)来实现。外部数据包装器是一个用于访问外部数据源的插件,它可以让用户在数据库中创建外部表,以便直接查询外部数据源中的数据。要使
PostgreSQL中的外部表和外部数据源如何使用
2024-03-14

如何在C++中集成外部数据源以丰富分析过程?

c++++ 中集成外部数据源可大幅拓展数据分析能力。步驟包括:選擇與目標數據源兼容的連接器,根據數據源要求建立連接,並使用 sql 進行查詢。一個使用 odbc 連接器連接 mysql 的實例顯示如何提取數據結果。集成外部數據源可豐富分析過
如何在C++中集成外部数据源以丰富分析过程?
2024-05-15

vue如何弄外部数据

本篇内容主要讲解“vue如何弄外部数据”,感兴趣的朋友不妨来看看。本文介绍的方法操作简单快捷,实用性强。下面就让小编来带大家学习“vue如何弄外部数据”吧!Vue提供了两种主要方式来处理外部数据:Prop和Vuex。1. PropProp是
2023-07-06

docker连不上外部数据库如何解决

如果无法连接外部数据库,可能是由于以下几个原因:1. 网络问题:确保您的网络连接正常,尝试使用ping命令测试是否能够与外部数据库服务器通信。2. 防火墙问题:检查您的防火墙设置,确保允许从Docker容器中访问外部数据库服务器的端口。3.
2023-10-09

详解Spark Sql在UDF中如何引用外部数据

这篇文章主要为大家介绍了详解Spark Sql在UDF中如何引用外部数据示例详解,有需要的朋友可以借鉴参考下,希望能够有所帮助,祝大家多多进步,早日升职加薪
2023-02-01

Rancher Server单容器如何部署使用外部数据库

这期内容当中小编将会给大家带来有关Rancher Server单容器如何部署使用外部数据库,文章内容丰富且以专业的角度为大家分析和叙述,阅读完这篇文章希望大家可以有所收获。Rancher 除了使用内部的数据库,你可以启动一个Rancher
2023-06-19

ajax返回值给外部函数如何解决

本篇文章给大家分享的是有关ajax返回值给外部函数如何解决,小编觉得挺实用的,因此分享给大家学习,希望大家阅读完这篇文章后可以有所收获,话不多说,跟着小编一起来看看吧。如下所示:function validateUser(mya){ var
2023-06-08

如何解决PHP开发中的外部资源调用和访问

在PHP开发中,我们经常会遇到需要调用和访问外部资源的情况,比如其他网站的API接口、数据库、文件系统等。正确处理和管理这些外部资源的调用和访问是保证开发效率和系统稳定性的关键。本文将分享几种常见的解决方案,并提供具体的代码示例。使用CUR
2023-10-21

如何解决PHP开发中的外部资源访问和调用

在PHP开发中,我们经常会遇到需要访问和调用外部资源的情况,比如API接口、第三方库或者其他服务器资源。在处理这些外部资源时,我们需要考虑如何进行安全的访问和调用,同时保证性能和可靠性。本文将介绍几种常见的解决方案,并提供相应的代码示例。一
2023-10-21

如何进行O2OA的外部数据库修改配置

本篇文章给大家分享的是有关如何进行O2OA的外部数据库修改配置,小编觉得挺实用的,因此分享给大家学习,希望大家阅读完这篇文章后可以有所收获,话不多说,跟着小编一起来看看吧。一、数据库准备工作 - Mysql数据库安装下载和安装数据库的知识并
2023-06-04

java如何解析json数据

Java可以使用Json库来解析JSON数据。以下是使用Jackson库来解析JSON数据的示例代码:1. 导入Jackson库的依赖:```xmlcom.fasterxml.jackson.corejackson-databind2.12
2023-08-24

PHP如何解析JSON数据

本文小编为大家详细介绍“PHP如何解析JSON数据”,内容详细,步骤清晰,细节处理妥当,希望这篇“PHP如何解析JSON数据”文章能帮助大家解决疑惑,下面跟着小编的思路慢慢深入,一起来学习新知识吧。什么是JSON?JSON是一种标准的轻量级
2023-06-30

esp8266如何解析json数据

这篇文章主要介绍了esp8266如何解析json数据的相关知识,内容详细易懂,操作简单快捷,具有一定借鉴价值,相信大家阅读完这篇esp8266如何解析json数据文章都会有所收获,下面我们一起来看看吧。#include
2023-06-29

weblogic数据源不存在如何解决

如果WebLogic数据源不存在,您可以尝试以下解决方法:1. 检查数据源配置:在WebLogic控制台中,确保您的数据源已正确配置。检查用户名、密码、URL等配置是否正确。2. 重新启动服务器:有时,重新启动WebLogic服务器可以解决
2023-10-08

编程热搜

  • Python 学习之路 - Python
    一、安装Python34Windows在Python官网(https://www.python.org/downloads/)下载安装包并安装。Python的默认安装路径是:C:\Python34配置环境变量:【右键计算机】--》【属性】-
    Python 学习之路 - Python
  • chatgpt的中文全称是什么
    chatgpt的中文全称是生成型预训练变换模型。ChatGPT是什么ChatGPT是美国人工智能研究实验室OpenAI开发的一种全新聊天机器人模型,它能够通过学习和理解人类的语言来进行对话,还能根据聊天的上下文进行互动,并协助人类完成一系列
    chatgpt的中文全称是什么
  • C/C++中extern函数使用详解
  • C/C++可变参数的使用
    可变参数的使用方法远远不止以下几种,不过在C,C++中使用可变参数时要小心,在使用printf()等函数时传入的参数个数一定不能比前面的格式化字符串中的’%’符号个数少,否则会产生访问越界,运气不好的话还会导致程序崩溃
    C/C++可变参数的使用
  • css样式文件该放在哪里
  • php中数组下标必须是连续的吗
  • Python 3 教程
    Python 3 教程 Python 的 3.0 版本,常被称为 Python 3000,或简称 Py3k。相对于 Python 的早期版本,这是一个较大的升级。为了不带入过多的累赘,Python 3.0 在设计的时候没有考虑向下兼容。 Python
    Python 3 教程
  • Python pip包管理
    一、前言    在Python中, 安装第三方模块是通过 setuptools 这个工具完成的。 Python有两个封装了 setuptools的包管理工具: easy_install  和  pip , 目前官方推荐使用 pip。    
    Python pip包管理
  • ubuntu如何重新编译内核
  • 改善Java代码之慎用java动态编译

目录