我的编程空间,编程开发者的网络收藏夹
学习永远不晚

如何在pyspark中创建DataFrame

短信预约 -IT技能 免费直播动态提醒
省份

北京

  • 北京
  • 上海
  • 天津
  • 重庆
  • 河北
  • 山东
  • 辽宁
  • 黑龙江
  • 吉林
  • 甘肃
  • 青海
  • 河南
  • 江苏
  • 湖北
  • 湖南
  • 江西
  • 浙江
  • 广东
  • 云南
  • 福建
  • 海南
  • 山西
  • 四川
  • 陕西
  • 贵州
  • 安徽
  • 广西
  • 内蒙
  • 西藏
  • 新疆
  • 宁夏
  • 兵团
手机号立即预约

请填写图片验证码后获取短信验证码

看不清楚,换张图片

免费获取短信验证码

如何在pyspark中创建DataFrame

如何在pyspark中创建DataFrame?很多新手对此不是很清楚,为了帮助大家解决这个难题,下面小编将为大家详细讲解,有这方面需求的人可以来学习下,希望你能有所收获。

pyspark创建DataFrame

为了便于操作,使用pyspark时我们通常将数据转为DataFrame的形式来完成清洗和分析动作。

RDD和DataFrame

在上一篇pyspark基本操作有提到RDD也是spark中的操作的分布式数据对象。

这里简单看一下RDD和DataFrame的类型。

print(type(rdd))  # <class 'pyspark.rdd.RDD'>print(type(df))   # <class 'pyspark.sql.dataframe.DataFrame'>

翻阅了一下源码的定义,可以看到他们之间并没有继承关系。

class RDD(object):    """    A Resilient Distributed Dataset (RDD), the basic abstraction in Spark.    Represents an immutable, partitioned collection of elements that can be    operated on in parallel.    """
class DataFrame(object):    """A distributed collection of data grouped into named columns.    A :class:`DataFrame` is equivalent to a relational table in Spark SQL,    and can be created using various functions in :class:`SparkSession`:: ...    """

RDD是一种弹性分布式数据集,Spark中的基本抽象。表示一种不可变的、分区储存的集合,可以进行并行操作。
DataFrame是一种以列对数据进行分组表达的分布式集合, DataFrame等同于Spark SQL中的关系表。相同点是,他们都是为了支持分布式计算而设计。

但是RDD只是元素的集合,但是DataFrame以列进行分组,类似于MySQL的表或pandas中的DataFrame。

如何在pyspark中创建DataFrame

实际工作中,我们用的更多的还是DataFrame。

使用二元组创建DataFrame

尝试第一种情形发现,仅仅传入二元组,结果是没有列名称的。
于是我们尝试第二种,同时传入二元组和列名称。

a = [('Alice', 1)]output = spark.createDataFrame(a).collect()print(output)# [Row(_1='Alice', _2=1)]output = spark.createDataFrame(a, ['name', 'age']).collect()print(output)# [Row(name='Alice', age=1)]

这里collect()是按行展示数据表,也可以使用show()对数据表进行展示。

spark.createDataFrame(a).show()# +-----+---+# |   _1| _2|# +-----+---+# |Alice|  1|# +-----+---+spark.createDataFrame(a, ['name', 'age']).show()# +-----+---+# | name|age|# +-----+---+# |Alice|  1|# +-----+---+

使用键值对创建DataFrame

d = [{'name': 'Alice', 'age': 1}]output = spark.createDataFrame(d).collect()print(output)# [Row(age=1, name='Alice')]

使用rdd创建DataFrame

a = [('Alice', 1)]rdd = sc.parallelize(a)output = spark.createDataFrame(rdd).collect()print(output)output = spark.createDataFrame(rdd, ["name", "age"]).collect()print(output)# [Row(_1='Alice', _2=1)]# [Row(name='Alice', age=1)]

基于rdd和ROW创建DataFrame

from pyspark.sql import Rowa = [('Alice', 1)]rdd = sc.parallelize(a)Person = Row("name", "age")person = rdd.map(lambda r: Person(*r))output = spark.createDataFrame(person).collect()print(output)# [Row(name='Alice', age=1)]

基于rdd和StructType创建DataFrame

from pyspark.sql.types import *a = [('Alice', 1)]rdd = sc.parallelize(a)schema = StructType(    [        StructField("name", StringType(), True),        StructField("age", IntegerType(), True)    ])output = spark.createDataFrame(rdd, schema).collect()print(output)# [Row(name='Alice', age=1)]

基于pandas DataFrame创建pyspark DataFrame

df.toPandas()可以把pyspark DataFrame转换为pandas DataFrame。

df = spark.createDataFrame(rdd, ['name', 'age'])print(df)  # DataFrame[name: string, age: bigint]print(type(df.toPandas()))  # <class 'pandas.core.frame.DataFrame'># 传入pandas DataFrameoutput = spark.createDataFrame(df.toPandas()).collect()print(output)# [Row(name='Alice', age=1)]

创建有序的DataFrame

output = spark.range(1, 7, 2).collect()print(output)# [Row(id=1), Row(id=3), Row(id=5)]output = spark.range(3).collect()print(output)# [Row(id=0), Row(id=1), Row(id=2)]

通过临时表得到DataFrame

spark.registerDataFrameAsTable(df, "table1")df2 = spark.table("table1")b = df.collect() == df2.collect()print(b)# True

配置DataFrame和临时表

创建DataFrame时指定列类型

在createDataFrame中可以指定列类型,只保留满足数据类型的列,如果没有满足的列,会抛出错误。

a = [('Alice', 1)]rdd = sc.parallelize(a)# 指定类型于预期数据对应时,正常创建output = spark.createDataFrame(rdd, "a: string, b: int").collect()print(output)  # [Row(a='Alice', b=1)]rdd = rdd.map(lambda row: row[1])print(rdd)  # PythonRDD[7] at RDD at PythonRDD.scala:53# 只有int类型对应上,过滤掉其他列。output = spark.createDataFrame(rdd, "int").collect()print(output)   # [Row(value=1)]# 没有列能对应上,会抛出错误。output = spark.createDataFrame(rdd, "boolean").collect()# TypeError: field value: BooleanType can not accept object 1 in type <class 'int'>

注册DataFrame为临时表

spark.registerDataFrameAsTable(df, "table1")spark.dropTempTable("table1")

获取和修改配置

print(spark.getConf("spark.sql.shuffle.partitions"))  # 200print(spark.getConf("spark.sql.shuffle.partitions", u"10"))  # 10print(spark.setConf("spark.sql.shuffle.partitions", u"50"))  # Noneprint(spark.getConf("spark.sql.shuffle.partitions", u"10"))  # 50

注册自定义函数

spark.registerFunction("stringLengthString", lambda x: len(x))output = spark.sql("SELECT stringLengthString('test')").collect()print(output)# [Row(stringLengthString(test)='4')]spark.registerFunction("stringLengthString", lambda x: len(x), IntegerType())output = spark.sql("SELECT stringLengthString('test')").collect()print(output)# [Row(stringLengthString(test)=4)]spark.udf.register("stringLengthInt", lambda x: len(x), IntegerType())output = spark.sql("SELECT stringLengthInt('test')").collect()print(output)# [Row(stringLengthInt(test)=4)]

查看临时表列表

可以查看所有临时表名称和对象。

spark.registerDataFrameAsTable(df, "table1")print(spark.tableNames())  # ['table1']print(spark.tables())  # DataFrame[database: string, tableName: string, isTemporary: boolean]print("table1" in spark.tableNames())  # Trueprint("table1" in spark.tableNames("default"))  # Truespark.registerDataFrameAsTable(df, "table1")df2 = spark.tables()df2.filter("tableName = 'table1'").first()print(df2)  # DataFrame[database: string, tableName: string, isTemporary: boolean]

从其他数据源创建DataFrame

MySQL

前提是需要下载jar包。
Mysql-connector-java.jar

from pyspark import SparkContextfrom pyspark.sql import SQLContextimport pyspark.sql.functions as Fsc = SparkContext("local", appName="mysqltest")sqlContext = SQLContext(sc)df = sqlContext.read.format("jdbc").options(    url="jdbc:mysql://localhost:3306/mydata?user=root&password=mysql&"        "useUnicode=true&characterEncoding=utf-8&useJDBCCompliantTimezoneShift=true&"        "useLegacyDatetimeCode=false&serverTimezone=UTC ", dbtable="detail_data").load()df.show(n=5)sc.stop()

看完上述内容是否对您有帮助呢?如果还想对相关知识有进一步的了解或阅读更多相关文章,请关注编程网行业资讯频道,感谢您对编程网的支持。

免责声明:

① 本站未注明“稿件来源”的信息均来自网络整理。其文字、图片和音视频稿件的所属权归原作者所有。本站收集整理出于非商业性的教育和科研之目的,并不意味着本站赞同其观点或证实其内容的真实性。仅作为临时的测试数据,供内部测试之用。本站并未授权任何人以任何方式主动获取本站任何信息。

② 本站未注明“稿件来源”的临时测试数据将在测试完成后最终做删除处理。有问题或投稿请发送至: 邮箱/279061341@qq.com QQ/279061341

如何在pyspark中创建DataFrame

下载Word文档到电脑,方便收藏和打印~

下载Word文档

猜你喜欢

如何在pyspark中创建DataFrame

如何在pyspark中创建DataFrame?很多新手对此不是很清楚,为了帮助大家解决这个难题,下面小编将为大家详细讲解,有这方面需求的人可以来学习下,希望你能有所收获。pyspark创建DataFrame为了便于操作,使用pyspark时
2023-06-15

如何在Python中创建Dataframe

今天就跟大家聊聊有关如何在Python中创建Dataframe,可能很多人都不太了解,为了让大家更加了解,小编给大家总结了以下内容,希望大家根据这篇文章可以有所收获。Python主要用来做什么Python主要应用于:1、Web开发;2、数据
2023-06-14

如何在pandas中遍历dataframe

这篇文章将为大家详细讲解有关如何在pandas中遍历dataframe,文章内容质量较高,因此小编分享给大家做个参考,希望大家阅读完这篇文章后对相关知识有一定的了解。方法一:使用df.iterrows()获取可迭代对象, 然后使用for循环
2023-06-14

如何在Python中使用DataFrame

如何在Python中使用DataFrame?相信很多没有经验的人对此束手无策,为此本文总结了问题出现的原因和解决方法,通过这篇文章希望你能解决这个问题。1、方法说明df.values:返回一个 ndarray 类型的对象,包涵 DataFr
2023-06-15

如何在windowns中配置PySpark环境

如何在windowns中配置PySpark环境?相信很多没有经验的人对此束手无策,为此本文总结了问题出现的原因和解决方法,通过这篇文章希望你能解决这个问题。首先需要下载hadoop和spark,解压,然后设置环境变量。hadoop清华源下载
2023-06-15

如何在python中创建堆

如何在python中创建堆?相信很多没有经验的人对此束手无策,为此本文总结了问题出现的原因和解决方法,通过这篇文章希望你能解决这个问题。python是什么意思Python是一种跨平台的、具有解释性、编译性、互动性和面向对象的脚本语言,其最初
2023-06-14

如何在Pandas中创建Series

如何在Pandas中创建Series?很多新手对此不是很清楚,为了帮助大家解决这个难题,下面小编将为大家详细讲解,有这方面需求的人可以来学习下,希望你能有所收获。1、使用一个列表生成一个SeriesmyList = [a, b, c, d]
2023-06-14

如何基于其他 DataFrame 行过滤器创建列?

问题内容我有一个名为“hourly_data”的 lazyframe,其中包含一个名为“time”的每小时日期时间列。我还有一个名为“future_periods”的 dataframe,其中包含两个日期时间列,称为“start”(未来
如何基于其他 DataFrame 行过滤器创建列?
2024-02-09

如何在Django中创建APP

这篇文章给大家分享的是有关如何在Django中创建APP的内容。小编觉得挺实用的,因此分享给大家做个参考,一起跟随小编过来看看吧。1.Django中创建APP首先我们打开PyCharm,找到之前你创建的Django项目,我的项目是First
2023-06-02

如何在PostgreSQL中创建表

要在PostgreSQL中创建表,您可以使用CREATE TABLE语句。以下是一个示例:CREATE TABLE table_name (column1 datatype1,column2 datatype2,column3 da
如何在PostgreSQL中创建表
2024-04-09

如何在oracle中创建表

创建 oracle 表涉及以下步骤:使用 create table 语法指定表名、列名、数据类型、约束和默认值。表名应简洁、描述性,且不超过 30 个字符。列名应描述性,数据类型指定列中存储的数据类型。not null 约束确保列中不允许使
如何在oracle中创建表
2024-06-12

如何在SQLServer中创建视图

要在SQL Server中创建视图,可以使用以下语法:CREATE VIEW view_name ASSELECT column1, column2, ...FROM table_nameWHERE condition;其中,vie
如何在SQLServer中创建视图
2024-04-09

如何在JavaScript中创建数组

这篇文章将为大家详细讲解有关如何在JavaScript中创建数组,文章内容质量较高,因此小编分享给大家做个参考,希望大家阅读完这篇文章后对相关知识有一定的了解。javascript是一种什么语言javascript是一种动态类型、弱类型的语
2023-06-14

如何在Zabbix中创建主机

要在Zabbix中创建主机,您需要按照以下步骤操作:登录到Zabbix的Web界面。在左侧导航栏中,点击“配置”选项。在“配置”菜单下,选择“主机”选项。点击“创建主机”按钮。在弹出窗口中,输入主机的名称、可选的主机群组、可选的主机
如何在Zabbix中创建主机
2024-04-09

如何在SQLite中创建视图

要在SQLite中创建视图,可以使用CREATE VIEW语句。以下是一个示例:CREATE VIEW employee_view ASSELECT employee_id, first_name, last_name, departm
如何在SQLite中创建视图
2024-04-09

编程热搜

  • Python 学习之路 - Python
    一、安装Python34Windows在Python官网(https://www.python.org/downloads/)下载安装包并安装。Python的默认安装路径是:C:\Python34配置环境变量:【右键计算机】--》【属性】-
    Python 学习之路 - Python
  • chatgpt的中文全称是什么
    chatgpt的中文全称是生成型预训练变换模型。ChatGPT是什么ChatGPT是美国人工智能研究实验室OpenAI开发的一种全新聊天机器人模型,它能够通过学习和理解人类的语言来进行对话,还能根据聊天的上下文进行互动,并协助人类完成一系列
    chatgpt的中文全称是什么
  • C/C++中extern函数使用详解
  • C/C++可变参数的使用
    可变参数的使用方法远远不止以下几种,不过在C,C++中使用可变参数时要小心,在使用printf()等函数时传入的参数个数一定不能比前面的格式化字符串中的’%’符号个数少,否则会产生访问越界,运气不好的话还会导致程序崩溃
    C/C++可变参数的使用
  • css样式文件该放在哪里
  • php中数组下标必须是连续的吗
  • Python 3 教程
    Python 3 教程 Python 的 3.0 版本,常被称为 Python 3000,或简称 Py3k。相对于 Python 的早期版本,这是一个较大的升级。为了不带入过多的累赘,Python 3.0 在设计的时候没有考虑向下兼容。 Python
    Python 3 教程
  • Python pip包管理
    一、前言    在Python中, 安装第三方模块是通过 setuptools 这个工具完成的。 Python有两个封装了 setuptools的包管理工具: easy_install  和  pip , 目前官方推荐使用 pip。    
    Python pip包管理
  • ubuntu如何重新编译内核
  • 改善Java代码之慎用java动态编译

目录