我的编程空间,编程开发者的网络收藏夹
学习永远不晚

PySpark数据分析基础:PySpark基础功能及DataFrame操作基础语法详解

短信预约 -IT技能 免费直播动态提醒
省份

北京

  • 北京
  • 上海
  • 天津
  • 重庆
  • 河北
  • 山东
  • 辽宁
  • 黑龙江
  • 吉林
  • 甘肃
  • 青海
  • 河南
  • 江苏
  • 湖北
  • 湖南
  • 江西
  • 浙江
  • 广东
  • 云南
  • 福建
  • 海南
  • 山西
  • 四川
  • 陕西
  • 贵州
  • 安徽
  • 广西
  • 内蒙
  • 西藏
  • 新疆
  • 宁夏
  • 兵团
手机号立即预约

请填写图片验证码后获取短信验证码

看不清楚,换张图片

免费获取短信验证码

PySpark数据分析基础:PySpark基础功能及DataFrame操作基础语法详解

目录

前言

一、PySpark基础功能

 1.Spark SQL 和DataFrame

2.Pandas API on Spark

3.Streaming

4.MLBase/MLlib

5.Spark Core

二、PySpark依赖

Dependencies

三、DataFrame

1.创建

创建不输入schema格式的DataFrame

创建带有schema的DataFrame

从Pandas DataFrame创建

通过由元组列表组成的RDD创建

2.查看

DataFrame.show()

spark.sql.repl.eagerEval.enabled

纵向显示

 查看DataFrame格式和列名

查看统计描述信息

PySpark DataFrame转换为Pandas DataFrame

 3.查询

添加新列实例:

条件查询DataFrame.filter()

 4.运算

Pandas_udf

DataFrame.mapInPandas

5.分组

 联合分组和应用函数

 6.获取数据输入/输出

CSV

 Parquet

 ORC

 四、结合Spark SQL

点关注,防走丢,如有纰漏之处,请留言指教,非常感谢


前言

要想了解PySpark能够干什么可以去看看我之前写的文章,里面很详细介绍了Spark的生态:

Spark框架深度理解一:开发缘由及优缺点

Spark框架深度理解二:生态圈

Spark框架深度理解三:运行架构、核心数据集RDD

PySpark只是通过JVM转换使得Python代码能够在Spark集群上识别运行。故Spark的绝大多数功能都可以被Python程序使用。

上篇文章:一文速学-PySpark数据分析基础:PySpark原理详解

已经把PySpark运行原理讲的很清楚了,现在我们需要了解PySpark语法基础来逐渐编写PySpark程序实现分布式数据计算。

已搭建环境:

Spark:3.3.0

Hadoop:3.3.3

Scala:2.11.12

JDK:1.8.0_201

PySpark:3.1.2


一、PySpark基础功能

PySpark是Python中Apache Spark的接口。它不仅可以使用Python API编写Spark应用程序,还提供了PySpark shell,用于在分布式环境中交互分析数据。PySpark支持Spark的大多数功能,如Spark SQL、DataFrame、Streaming、MLlib(机器学习)和Spark Core。

 1.Spark SQL 和DataFrame

Spark SQL是用于结构化数据处理的Spark模块。它提供了一种称为DataFrame的编程抽象,是由SchemaRDD发展而来。不同于SchemaRDD直接继承RDD,DataFrame自己实现了RDD的绝大多数功能。可以把Spark SQL DataFrame理解为一个分布式的Row对象的数据集合。

Spark SQL已经集成在spark-shell中,因此只要启动spark-shell就可以使用Spark SQL的Shell交互接口。如果在spark-shell中执行SQL语句,需要使用SQLContext对象来调用sql()方法。Spark SQL对数据的查询分成了两个分支:SQLContext和HiveContext,其中HiveContext继承了SQLContext,因此HiveContext除了拥有SQLContext的特性之外还拥有自身的特性。

Spark SQL允许开发人员直接处理RDD,同时也可查询例如在 Apache Hive上存在的外部数据。Spark SQL的一个重要特点是其能够统一处理关系表和RDD,使得开发人员可以轻松地使用SQL命令进行外部查询,同时进行更复杂的数据分析。

2.Pandas API on Spark

Spark上的pandas API可以扩展使用 python pandas库。

  • 轻松切换到pandas API和PySpark API上下文,无需任何开销。
  • 有一个既适用于pandas(测试,较小的数据集)又适用于Spark(分布式数据集)的代码库。
  • 熟练使用pandas的话很快上手

3.Streaming

Apache Spark中的Streaming功能运行在Spark之上,支持跨Streaming和历史数据的强大交互和分析应用程序,同时继承了Spark的易用性和容错特性。Spark Streaming是将流式计算分解成一系列短小的批处理作业。这里的批处理引擎是Spark Core,也就是把Spark Streaming的输入数据按照batch size(如1秒)分成一段一段的数据(Discretized Stream),每一段数据都转换成Spark中的RDD(Resilient Distributed Dataset),然后将Spark Streaming中对DStream的Transformation操作变为针对Spark中对RDD的Transformation操作,将RDD经过操作变成中间结果保存在内存中。

4.MLBase/MLlib

MLlib构建在Spark之上,是一个可扩展的机器学习库,它提供了一组统一的高级API,帮助用户创建和调整实用的机器学习管道。MLBase分为四部分:MLlib、MLI、ML Optimizer和MLRuntime。

  • ML Optimizer会选择它认为最适合的已经在内部实现好了的机器学习算法和相关参数,来处理用户输入的数据,并返回模型或别的帮助分析的结果;
  • MLI 是一个进行特征抽取和高级ML编程抽象的算法实现的API或平台;
  • MLlib是Spark实现一些常见的机器学习算法和实用程序,包括分类、回归、聚类、协同过滤、降维以及底层优化,该算法可以进行可扩充; MLRuntime 基于Spark计算框架,将Spark的分布式计算应用到机器学习领域。
     

5.Spark Core

Spark Core是Spark平台的底层通用执行引擎,所有其他功能都构建在其之上。它提供了RDD(弹性分布式数据集)和内存计算能力。

二、PySpark依赖

Dependencies

Package最低版本限制Note
pandas1.0.5支撑Spark SQL
Numpy1.7满足支撑MLlib基础API
pyarrow1.0.0支撑Spark SQL
Py4j0.10.9.5要求
pandas1.0.5pandas API on Spark需要
pyarrow1.0.0pandas API on Spark需要
Numpy1.14pandas API on Spark需要

请注意,PySpark需要Java 8或更高版本,并正确设置Java_HOME。如果使用JDK 11,请设置Dio.netty.tryReflectionSetAccessible=true 以获取与箭头相关的功能。

AArch64(ARM64)用户注意:PyArrow是PySpark SQL所必需的,但PyArrow 4.0.0中引入了对AArch64的PyArrow支持。如果由于PyArrow安装错误导致PyArrow安装在AArch64上失败,可以按如下方式安装PyArrow>=4.0.0:

pip install "pyarrow>=4.0.0" --prefer-binary

三、DataFrame

PySpark应用程序从初始化SparkSession开始,SparkSession是PySpark的入口点,如下所示。如果通过PySpark可执行文件在PySpark shell中运行它,shell会自动在变量spark中为用户创建会话。

from pyspark.sql import SparkSessionspark = SparkSession.builder.getOrCreate()

1.创建

PySpark DataFrame能够通过pyspark.sql.SparkSession.createDataFrame创建,通常通过传递列表(list)、元组(tuples)和字典(dictionaries)的列表和pyspark.sql.Rows,Pandas DataFrame,由此类列表组成的RDD转换。pyspark.sql.SparkSession.createDataFrame接收schema参数指定DataFrame的架构(优化可加速)。省略时,PySpark通过从数据中提取样本来推断相应的模式。

创建不输入schema格式的DataFrame

from datetime import datetime, dateimport pandas as pdfrom pyspark.sql import Rowdf = spark.createDataFrame([    Row(a=1, b=2., c='string1', d=date(2000, 1, 1), e=datetime(2000, 1, 1, 12, 0)),    Row(a=2, b=3., c='string2', d=date(2000, 2, 1), e=datetime(2000, 1, 2, 12, 0)),    Row(a=4, b=5., c='string3', d=date(2000, 3, 1), e=datetime(2000, 1, 3, 12, 0))])df
DataFrame[a: bigint, b: double, c: string, d: date, e: timestamp]

创建带有schema的DataFrame

df = spark.createDataFrame([    (1, 2., 'string1', date(2000, 1, 1), datetime(2000, 1, 1, 12, 0)),    (2, 3., 'string2', date(2000, 2, 1), datetime(2000, 1, 2, 12, 0)),    (3, 4., 'string3', date(2000, 3, 1), datetime(2000, 1, 3, 12, 0))], schema='a long, b double, c string, d date, e timestamp')df
DataFrame[a: bigint, b: double, c: string, d: date, e: timestamp]

从Pandas DataFrame创建

pandas_df = pd.DataFrame({    'a': [1, 2, 3],    'b': [2., 3., 4.],    'c': ['string1', 'string2', 'string3'],    'd': [date(2000, 1, 1), date(2000, 2, 1), date(2000, 3, 1)],    'e': [datetime(2000, 1, 1, 12, 0), datetime(2000, 1, 2, 12, 0), datetime(2000, 1, 3, 12, 0)]})df = spark.createDataFrame(pandas_df)df
DataFrame[a: bigint, b: double, c: string, d: date, e: timestamp]

通过由元组列表组成的RDD创建

rdd = spark.sparkContext.parallelize([    (1, 2., 'string1', date(2000, 1, 1), datetime(2000, 1, 1, 12, 0)),    (2, 3., 'string2', date(2000, 2, 1), datetime(2000, 1, 2, 12, 0)),    (3, 4., 'string3', date(2000, 3, 1), datetime(2000, 1, 3, 12, 0))])df = spark.createDataFrame(rdd, schema=['a', 'b', 'c', 'd', 'e'])df
DataFrame[a: bigint, b: double, c: string, d: date, e: timestamp]

 以上的DataFrame格式创建的都是一样的。

df.printSchema()
root |-- a: long (nullable = true) |-- b: double (nullable = true) |-- c: string (nullable = true) |-- d: date (nullable = true) |-- e: timestamp (nullable = true)

2.查看

DataFrame.show()

使用格式:

df.show()
df.show(1)
+---+---+-------+----------+-------------------+|  a|  b|      c|         d|                  e|+---+---+-------+----------+-------------------+|  1|2.0|string1|2000-01-01|2000-01-01 12:00:00|+---+---+-------+----------+-------------------+only showing top 1 row

spark.sql.repl.eagerEval.enabled

spark.sql.repl.eagerEval.enabled用于在notebooks(如Jupyter)中快速生成PySpark DataFrame的配置。控制行数可以使用spark.sql.repl.eagerEval.maxNumRows。

spark.conf.set('spark.sql.repl.eagerEval.enabled', True)df

 

spark.conf.set('spark.sql.repl.eagerEval.maxNumRows',1)df

 

纵向显示

行也可以垂直显示。当行太长而无法水平显示时,纵向显示就很明显。

df.show(1, vertical=True)
-RECORD 0------------------ a   | 1 b   | 2.0 c   | string1 d   | 2000-01-01 e   | 2000-01-01 12:00:00only showing top 1 row

 查看DataFrame格式和列名

df.columns
['a', 'b', 'c', 'd', 'e']
df.printSchema()
root |-- a: long (nullable = true) |-- b: double (nullable = true) |-- c: string (nullable = true) |-- d: date (nullable = true) |-- e: timestamp (nullable = true)

查看统计描述信息

df.select("a", "b", "c").describe().show()
+-------+---+---+-------+|summary|  a|  b|      c|+-------+---+---+-------+|  count|  3|  3|      3||   mean|2.0|3.0|   null|| stddev|1.0|1.0|   null||    min|  1|2.0|string1||    max|  3|4.0|string3|+-------+---+---+-------+

DataFrame.collect()将分布式数据收集到驱动程序端,作为Python中的本地数据。请注意,当数据集太大而无法容纳在驱动端时,这可能会引发内存不足错误,因为它将所有数据从执行器收集到驱动端。

df.collect()
[Row(a=1, b=2.0, c='string1', d=datetime.date(2000, 1, 1), e=datetime.datetime(2000, 1, 1, 12, 0)), Row(a=2, b=3.0, c='string2', d=datetime.date(2000, 2, 1), e=datetime.datetime(2000, 1, 2, 12, 0)), Row(a=3, b=4.0, c='string3', d=datetime.date(2000, 3, 1), e=datetime.datetime(2000, 1, 3, 12, 0))]

 为了避免引发内存不足异常可以使用DataFrame.take()或者是DataFrame.tail():

df.take(1)
[Row(a=1, b=2.0, c='string1', d=datetime.date(2000, 1, 1), e=datetime.datetime(2000, 1, 1, 12, 0))]
df.tail(1)
[Row(a=3, b=4.0, c='string3', d=datetime.date(2000, 3, 1), e=datetime.datetime(2000, 1, 3, 12, 0))]

PySpark DataFrame转换为Pandas DataFrame

 PySpark DataFrame还提供了到pandas DataFrame的转换,以利用pandas API。注意,toPandas还将所有数据收集到driver端,当数据太大而无法放入driver端时,很容易导致内存不足错误。

df.toPandas()

 

 3.查询

PySpark DataFrame是惰性计算的,仅选择一列不会触发计算,但它会返回一个列实例:

df.a
Column<'a'>

大多数按列操作都返回列:

from pyspark.sql import Columnfrom pyspark.sql.functions import uppertype(df.c) == type(upper(df.c)) == type(df.c.isNull())
True

上述生成的Column可用于从DataFrame中选择列。例如,DataFrame.select()获取返回另一个DataFrame的列实例:

df.select(df.c).show()
+-------+|      c|+-------+|string1||string2||string3|+-------+

添加新列实例:

df.withColumn('upper_c', upper(df.c)).show()
+---+---+-------+----------+-------------------+-------+|  a|  b|      c|         d|                  e|upper_c|+---+---+-------+----------+-------------------+-------+|  1|2.0|string1|2000-01-01|2000-01-01 12:00:00|STRING1||  2|3.0|string2|2000-02-01|2000-01-02 12:00:00|STRING2||  3|4.0|string3|2000-03-01|2000-01-03 12:00:00|STRING3|+---+---+-------+----------+-------------------+-------+

条件查询DataFrame.filter()

df.filter(df.a == 1).show()
+---+---+-------+----------+-------------------+|  a|  b|      c|         d|                  e|+---+---+-------+----------+-------------------+|  1|2.0|string1|2000-01-01|2000-01-01 12:00:00|+---+---+-------+----------+-------------------+

 4.运算

Pandas_udf

PySpark支持各种UDF和API,允许用户执行Python本机函数。另请参阅最新的Pandas UDF( Pandas UDFs)和Pandas Function API( Pandas Function APIs)。例如,下面的示例允许用户在Python本机函数中直接使用pandas Series中的API。

Apache Arrow in PySpark

import pandas as pdfrom pyspark.sql.functions import pandas_udf@pandas_udf('long')def pandas_plus_one(series: pd.Series) -> pd.Series:    # Simply plus one by using pandas Series.    return series + 1df.select(pandas_plus_one(df.a)).show()
+------------------+|pandas_plus_one(a)|+------------------+|                 2||                 3||                 4|+------------------+

DataFrame.mapInPandas

DataFrame.mapInPandas允许用户在pandas DataFrame中直接使用API,而不受结果长度等任何限制。

def pandas_filter_func(iterator):    for pandas_df in iterator:        yield pandas_df[pandas_df.a == 1]df.mapInPandas(pandas_filter_func, schema=df.schema).show()
+---+---+-------+----------+-------------------+|  a|  b|      c|         d|                  e|+---+---+-------+----------+-------------------+|  1|2.0|string1|2000-01-01|2000-01-01 12:00:00|+---+---+-------+----------+-------------------+

5.分组

PySpark DataFrame还提供了一种使用常见方法,即拆分-应用-合并策略来处理分组数据的方法。它根据特定条件对数据进行分组,对每个组应用一个函数,然后将它们组合回DataFrame。

df = spark.createDataFrame([    ['red', 'banana', 1, 10], ['blue', 'banana', 2, 20], ['red', 'carrot', 3, 30],    ['blue', 'grape', 4, 40], ['red', 'carrot', 5, 50], ['black', 'carrot', 6, 60],    ['red', 'banana', 7, 70], ['red', 'grape', 8, 80]], schema=['color', 'fruit', 'v1', 'v2'])df.show()
+-----+------+---+---+|color| fruit| v1| v2|+-----+------+---+---+|  red|banana|  1| 10|| blue|banana|  2| 20||  red|carrot|  3| 30|| blue| grape|  4| 40||  red|carrot|  5| 50||black|carrot|  6| 60||  red|banana|  7| 70||  red| grape|  8| 80|+-----+------+---+---+

 分组,然后将avg()函数应用于结果组。

df.groupby('color').avg().show()
+-----+-------+-------+|color|avg(v1)|avg(v2)|+-----+-------+-------+|  red|    4.8|   48.0|| blue|    3.0|   30.0||black|    6.0|   60.0|+-----+-------+-------+

还可以使用pandas API对每个组应用Python自定义函数。

def plus_mean(pandas_df):    return pandas_df.assign(v1=pandas_df.v1 - pandas_df.v1.mean())df.groupby('color').applyInPandas(plus_mean, schema=df.schema).show()
+-----+------+---+---+|color| fruit| v1| v2|+-----+------+---+---+|black|carrot|  0| 60|| blue|banana| -1| 20|| blue| grape|  1| 40||  red|banana| -3| 10||  red|carrot| -1| 30||  red|carrot|  0| 50||  red|banana|  2| 70||  red| grape|  3| 80|+-----+------+---+---+

 联合分组和应用函数

df1 = spark.createDataFrame(    [(20000101, 1, 1.0), (20000101, 2, 2.0), (20000102, 1, 3.0), (20000102, 2, 4.0)],    ('time', 'id', 'v1'))df2 = spark.createDataFrame(    [(20000101, 1, 'x'), (20000101, 2, 'y')],    ('time', 'id', 'v2'))def asof_join(l, r):    return pd.merge_asof(l, r, on='time', by='id')df1.groupby('id').cogroup(df2.groupby('id')).applyInPandas(    asof_join, schema='time int, id int, v1 double, v2 string').show()
+--------+---+---+---+|    time| id| v1| v2|+--------+---+---+---+|20000101|  1|1.0|  x||20000102|  1|3.0|  x||20000101|  2|2.0|  y||20000102|  2|4.0|  y|+--------+---+---+---+

 6.获取数据输入/输出

CSV简单易用。Parquet和ORC是高效紧凑的文件格式,读写速度更快。

PySpark中还有许多其他可用的数据源,如JDBC、text、binaryFile、Avro等。另请参阅Apache Spark文档中最新的Spark SQL、DataFrames和Datasets指南。Spark SQL, DataFrames and Datasets Guide

CSV

df.write.csv('foo.csv', header=True)spark.read.csv('foo.csv', header=True).show()

这里记录一个报错:

java.lang.UnsatisfiedLinkError:org.apache.hadoop.io.nativeio.NativeIO$Windows.access0

 将Hadoop安装目录下的 bin 文件夹中的 hadoop.dll 和 winutils.exe 这两个文件拷贝到 C:\Windows\System32 下,问题解决。

+---+---+-------+----------+--------------------+|  a|  b|      c|         d|                   e|+---+---+-------+----------+--------------------+|  1|2.0|string1|2000-01-01|2000-01-01T12:00:...||  2|3.0|string2|2000-02-01|2000-01-02T12:00:...||  3|4.0|string3|2000-03-01|2000-01-03T12:00:...|+---+---+-------+----------+--------------------+

 Parquet

df.write.parquet('bar.parquet')spark.read.parquet('bar.parquet').show()
+-----+------+---+---+|color| fruit| v1| v2|+-----+------+---+---+|black|carrot|  6| 60|| blue|banana|  2| 20|| blue| grape|  4| 40||  red|carrot|  5| 50||  red|banana|  7| 70||  red|banana|  1| 10||  red|carrot|  3| 30||  red| grape|  8| 80|+-----+------+---+---+

 ORC

df.write.orc('zoo.orc')spark.read.orc('zoo.orc').show()
+-----+------+---+---+|color| fruit| v1| v2|+-----+------+---+---+|  red|banana|  7| 70||  red| grape|  8| 80||black|carrot|  6| 60|| blue|banana|  2| 20||  red|banana|  1| 10||  red|carrot|  5| 50||  red|carrot|  3| 30|| blue| grape|  4| 40|+-----+------+---+---+

 四、结合Spark SQL

DataFrame和Spark SQL共享同一个执行引擎,因此可以无缝地互换使用。例如,可以将数据帧注册为表,并按如下方式轻松运行SQL:

df.createOrReplaceTempView("tableA")spark.sql("SELECT count(*) from tableA").show()
+--------+|count(1)|+--------+|       8|+--------+

 此外UDF也可在现成的SQL中注册和调用

@pandas_udf("integer")def add_one(s: pd.Series) -> pd.Series:    return s + 1spark.udf.register("add_one", add_one)spark.sql("SELECT add_one(v1) FROM tableA").show()

 

这些SQL表达式可以直接混合并用作PySpark列。

from pyspark.sql.functions import exprdf.selectExpr('add_one(v1)').show()df.select(expr('count(*)') > 0).show()


点关注,防走丢,如有纰漏之处,请留言指教,非常感谢

以上就是本期全部内容。我是fanstuck ,有问题大家随时留言讨论 ,我们下期见。

来源地址:https://blog.csdn.net/master_hunter/article/details/125855069

免责声明:

① 本站未注明“稿件来源”的信息均来自网络整理。其文字、图片和音视频稿件的所属权归原作者所有。本站收集整理出于非商业性的教育和科研之目的,并不意味着本站赞同其观点或证实其内容的真实性。仅作为临时的测试数据,供内部测试之用。本站并未授权任何人以任何方式主动获取本站任何信息。

② 本站未注明“稿件来源”的临时测试数据将在测试完成后最终做删除处理。有问题或投稿请发送至: 邮箱/279061341@qq.com QQ/279061341

PySpark数据分析基础:PySpark基础功能及DataFrame操作基础语法详解

下载Word文档到电脑,方便收藏和打印~

下载Word文档

猜你喜欢

django基础之数据库操作方法(详解)

Django 自称是“最适合开发有限期的完美WEB框架”。本文参考《Django web开发指南》,快速搭建一个blog 出来,在中间涉及诸多知识点,这里不会详细说明,如果你是第一次接触Django ,本文会让你在感性上对Django有个认
2022-06-04

编程热搜

  • Python 学习之路 - Python
    一、安装Python34Windows在Python官网(https://www.python.org/downloads/)下载安装包并安装。Python的默认安装路径是:C:\Python34配置环境变量:【右键计算机】--》【属性】-
    Python 学习之路 - Python
  • chatgpt的中文全称是什么
    chatgpt的中文全称是生成型预训练变换模型。ChatGPT是什么ChatGPT是美国人工智能研究实验室OpenAI开发的一种全新聊天机器人模型,它能够通过学习和理解人类的语言来进行对话,还能根据聊天的上下文进行互动,并协助人类完成一系列
    chatgpt的中文全称是什么
  • C/C++中extern函数使用详解
  • C/C++可变参数的使用
    可变参数的使用方法远远不止以下几种,不过在C,C++中使用可变参数时要小心,在使用printf()等函数时传入的参数个数一定不能比前面的格式化字符串中的’%’符号个数少,否则会产生访问越界,运气不好的话还会导致程序崩溃
    C/C++可变参数的使用
  • css样式文件该放在哪里
  • php中数组下标必须是连续的吗
  • Python 3 教程
    Python 3 教程 Python 的 3.0 版本,常被称为 Python 3000,或简称 Py3k。相对于 Python 的早期版本,这是一个较大的升级。为了不带入过多的累赘,Python 3.0 在设计的时候没有考虑向下兼容。 Python
    Python 3 教程
  • Python pip包管理
    一、前言    在Python中, 安装第三方模块是通过 setuptools 这个工具完成的。 Python有两个封装了 setuptools的包管理工具: easy_install  和  pip , 目前官方推荐使用 pip。    
    Python pip包管理
  • ubuntu如何重新编译内核
  • 改善Java代码之慎用java动态编译

目录