我的编程空间,编程开发者的网络收藏夹
学习永远不晚

PySpark

短信预约 -IT技能 免费直播动态提醒
省份

北京

  • 北京
  • 上海
  • 天津
  • 重庆
  • 河北
  • 山东
  • 辽宁
  • 黑龙江
  • 吉林
  • 甘肃
  • 青海
  • 河南
  • 江苏
  • 湖北
  • 湖南
  • 江西
  • 浙江
  • 广东
  • 云南
  • 福建
  • 海南
  • 山西
  • 四川
  • 陕西
  • 贵州
  • 安徽
  • 广西
  • 内蒙
  • 西藏
  • 新疆
  • 宁夏
  • 兵团
手机号立即预约

请填写图片验证码后获取短信验证码

看不清楚,换张图片

免费获取短信验证码

PySpark

一、概念

Spack是什么?
Apache Spark是用于大规模数据处理的统一分析引擎,是一款分布式的计算框架,用于调度成百上千的服务器集群,计算TB、PB乃至EB级别的海量数据。

PySpark是什么?
pyspark是用spark官方开发的python第三方库,可以使用pip程序快速安装,并像其他第三方库那样使用。PySpark可以作为Python库进行数据处理,提交至Spark集群进行分布式集群计算。

二、准备工作

安装PySpark
按win+r键,输入cmd打开命令提示符程序,输入
pip install pystark
或使用国内代理镜像站(清华大学源)
pip install -i https://pypi.tuna/tsinghua.edu.cn/simple pyspark
也可以在Pycharm里直接安装
在这里插入图片描述

除此之外,还需要安装java,地址:https://www.oracle.com/java/technologies/downloads/
配置环境:

变量名:JAVA_HOME变量值:C:\Program Files (x86)\Java\jdk-20     // 要根据自己的实际路径配置变量名:CLASSPATH变量值:.;%JAVA_HOME%\lib\dt.jar;%JAVA_HOME%\lib\tools.jar;     变量名:Path变量值:%JAVA_HOME%\bin;%JAVA_HOME%\jre\bin;

重启Pycharm,选择Run—>Edit Configurations,添加JAVAHOME
在这里插入图片描述
3.测试是否安装成功

想要使用Pyspark库完成数据处理,首先需要构建一个执行环境入口对象
PySpark的执行环境入口对象是:类SparkContext的类对象

# 导包from pyspark import SparkConf, SparkContext# 创建SparkConf类对象#conf=SparkConf()#conf.setMaster("local[*]")  # setMaster()指定spark的运行模式,local指以单机模式运行在本机上#conf.setAppName("test_spark_app") #指定名称#链式调用的原则是调用的方法返回值都是同一个对象conf = SparkConf().setMaster("local[*]").setAppName("test_spark_app")  # 基于SparkConf类对象创建SparkContext类对象sc = SparkContext(conf=conf)  #sc就是执行环境入口对象# 打印PySpark的运行版本print(sc.version)# 停止SparkContext对象的运行(停止运行PyStark程序)sc.stop()

运行代码,成功显示PySpark的运行版本,证明安装成功
在这里插入图片描述

三、PySpark的编程模型

SparkContext类对象,是PySpark编程中一切功能的入口

PySpark的编程,主要分为如下三大步骤:
1.数据输入
通过SparkContext类对象的成员方法完成数据的读取操作,读取后得到RDD类对象
2.数据处理计算
通过RDD类对象的成员方法,完成各种数据计算的需求
3.数据输出
将处理完成后的RDD对像调用各种成员方法完成写出文件,转换为list,tuple,dict等操作

RDD(Resilient Distributed Datasets),全称:弹性分布式数据集
RDD对象:PySpark支持多种数据的输入,在输入完成之后,都会得到一个RDD的对象。
PySpark针对数据的处理,都是以RDD对象作为载体,即:

  • 数据存储在RDD中内
  • 各类数据的计算方式也都是RDD的成员方法
  • RDD的数据计算方法,返回值依旧是RDD对象

四、数据输入

from pyspark import SparkConf, SparkContextconf = SparkConf().setMaster("local[*]").setAppName("test_spark_app")sc = SparkContext(conf=conf)# 通过parallelize方法将Python对象加载到Spark内,成为RDD对象rdd1 = sc.parallelize([1, 2, 3, 4, 5])rdd2 = sc.parallelize((1, 2, 3, 4, 5))rdd3 = sc.parallelize("PySpark")rdd4 = sc.parallelize({1, 2, 3, 4, 5})rdd5 = sc.parallelize({"key1": "value1", "key2": "value2"})#读取文件转RDD对象rdd6 = sc.textFile("C:/test.txt")# 如果要查看RDD中有什么内容,需要用collect()方法print(rdd1.collect())  # [1, 2, 3, 4, 5]print(rdd2.collect())  # [1, 2, 3, 4, 5]print(rdd3.collect())  # ['P', 'y', 'S', 'p', 'a', 'r', 'k']print(rdd4.collect())  # [1, 2, 3, 4, 5]print(rdd5.collect())  # ['key1', 'key2']print(rdd6.collect())  # ['测试1', '', '测试2', '', '测试3']sc.stop()

五、数据处理

RDD内置丰富的成员方法(算子)

map算子
功能:map算子是将RDD的数据一条条处理(处理的逻辑基于map算子中接受的处理函数),返回新的RDD
在这里插入图片描述

from pyspark import SparkConf, SparkContext#Spark不能自动找到python解释器,需要指定import osos.environ['PYSPARK_PYTHON'] = 'C:/Users/tangling/venv/Scripts/python.exe'conf = SparkConf().setMaster("local[*]").setAppName("test_spark_app")sc = SparkContext(conf=conf)rdd = sc.parallelize([1, 2, 3, 4, 5])# 通过map方法将全部数据都乘10def func(data):    return data * 10rdd2 = rdd.map(func)print(rdd2.collect())    #[10, 20, 30, 40, 50]# 链式调用# 给rdd中的每一个元素乘10后再加5rdd3=rdd.map(lambda x:x*10).map(lambda x:x+5)print(rdd3.collect()) #[15, 25, 35, 45, 55]sc.stop()

flatmap算子
功能:对RDD执行map操作,然后进行解除嵌套操作(与map相比之多了一层解嵌套)
解除嵌套:
list=[[1,2,3],[4,5,6],[7,8,9]] ——> list=[1,2,3,4,5,6,7,8,9]

from pyspark import SparkConf,SparkContextimport osos.environ['PYSPARK_PYTHON']='C:/Users/tangling/venv/Scripts/python.exe'conf=SparkConf().setMaster("local[*]").setAppName("test_spark")sc=SparkContext(conf=conf)rdd=sc.parallelize(["python project","hello world"])#需求:将RDD数据中的一个个单词提取出来rdd2=rdd.flatMap(lambda x:x.split(" "))print(rdd2.collect())  #['python', 'project', 'hello', 'world']sc.close()

reduceByKey算子
功能:针对KV型RDD,自动按照Key分组,然后根据你提供的聚合逻辑,完成组内数据(value)的聚合操作

func:(V,V)—>V
接受2个传入参数(类型要一致),返回一个返回值,类型和传入要求一致

from pyspark import SparkConf, SparkContextimport osos.environ['PYSPARK_PYTHON'] = 'C:/Users/tangling/venv/Scripts/python.exe'conf = SparkConf().setMaster("local[*]").setAppName("test_spark")sc = SparkContext(conf=conf)rdd = sc.parallelize([('男', 99), ('男', 88), ('男', 79), ('女', 97), ('女', 89)])# 需求:求男生和女生两个组的成绩之和result = rdd.reduceByKey(lambda a, b: a + b)print(result.collect())   #[('男', 266), ('女', 186)]

案例:统计指定文件的词频

from pyspark import SparkConf, SparkContextimport osos.environ['PYSPARK_PYTHON'] = 'C:/Users/tangling/venv/Scripts/python.exe'conf = SparkConf().setMaster("local[*]").setAppName("test_spark")sc = SparkContext(conf=conf)rdd = sc.textFile('C:/test.txt')#取出全部单词words=rdd.flatMap(lambda x:x.split(" "))#将所有单词都转化成二元元组,单词为key,value设置为1word_with_one_rdd=words.map(lambda word:(word,1)) #分组求和result=word_with_one_rdd.reduceByKey(lambda a,b:a+b)print(result.collect())  #[('world', 1), ('i', 3), ('love', 2), ('yuanyuan', 1), ('pangpang', 1), ('am', 1), ('tangling', 1), ('hello', 1)]

Filter算子
功能:过滤想要的数据进行保留
func:(T)—>bool 传入一个参数进来类型随意,返回值必须是True或False,返回值是True的被留下来,False的数据被丢弃

from pyspark import SparkConf, SparkContextimport osos.environ['PYSPARK_PYTHON'] = 'C:/Users/tangling/venv/Scripts/python.exe'conf = SparkConf().setMaster("local[*]").setAppName("test_spark")sc = SparkContext(conf=conf)rdd = sc.parallelize([1, 2, 3, 4, 5])#只保留rdd中的偶数result = rdd.filter(lambda num: num % 2 == 0)print(result.collect())    #[2,4]

distinct算子
功能:对RDD数据进行去重,返回新的RDD,无需传参

rdd = sc.parallelize([1, 1,2,3,3,4])result = rdd.distinct()print(result.collect())   #[1,2,3,4]

sortBy算子
功能:对RDD数据进行排序,基于你指定的排序依据
语法:rdd.sortBy(func,ascending=False,numPartitions=1)
func:(T)—>U:告知按照RDD中的哪一个数据进行排序,比如lambda x:x[1]表示按照rdd中的第二列元素进行排序
ascending=True(升序)False(降序)
numPartitons:用多少分区排序,全局排序需要设置分区数为1

对之前案例中的结果进行排序

from pyspark import SparkConf, SparkContextimport osos.environ['PYSPARK_PYTHON'] = 'C:/Users/tangling/venv/Scripts/python.exe'conf = SparkConf().setMaster("local[*]").setAppName("test_spark")sc = SparkContext(conf=conf)rdd = sc.textFile('C:/test.txt')#取出全部单词words=rdd.flatMap(lambda x:x.split(" "))#将所有单词都转化成二元元组,单词为key,value设置为1word_with_one_rdd=words.map(lambda word:(word,1)) #分组求和result_rdd=word_with_one_rdd.reduceByKey(lambda a,b:a+b)#对结果进行排序final_rdd=result_rdd.sortBy(lambda x:x[1],ascending=False,numPartitions=1)print(final_rdd.collect()) #[('i', 3), ('love', 2), ('world', 1), ('yuanyuan', 1), ('pangpang', 1), ('am', 1), ('tangling', 1), ('hello', 1)]

综合案例:结合所学知识,完成以下需求:
需求1:城市销售额排名
需求2:全部城市有哪些商品类别在售卖
需求3:北京有哪些商品类型在售卖

{“id”:1,“timestamp”:“2019-05-08T01:03.00Z”,“category”:“平板电脑”,“areaName”:“北京”,“money”:“1450”}|{“id”:2,“timestamp”:“2019-05-08T01:01.00Z”,“category”:“手机”,“areaName”:“北京”,“money”:“1450”}|{“id”:3,“timestamp”:“2019-05-08T01:03.00Z”,“category”:“手机”,“areaName”:“北京”,“money”:“8412”}
{“id”:4,“timestamp”:“2019-05-08T05:01.00Z”,“category”:“电脑”,“areaName”:“上海”,“money”:“1513”}|{“id”:5,“timestamp”:“2019-05-08T01:03.00Z”,“category”:“家电”,“areaName”:“北京”,“money”:“1550”}|{“id”:6,“timestamp”:“2019-05-08T01:01.00Z”,“category”:“电脑”,“areaName”:“杭州”,“money”:“1550”}
{“id”:7,“timestamp”:“2019-05-08T01:03.00Z”,“category”:“电脑”,“areaName”:“北京”,“money”:“5611”}|{“id”:8,“timestamp”:“2019-05-08T03:01.00Z”,“category”:“家电”,“areaName”:“北京”,“money”:“4410”}|{“id”:9,“timestamp”:“2019-05-08T01:03.00Z”,“category”:“家具”,“areaName”:“郑州”,“money”:“1120”}
{“id”:10,“timestamp”:“2019-05-08T01:01.00Z”,“category”:“家具”,“areaName”:“北京”,“money”:“6661”}|{“id”:11,“timestamp”:“2019-05-08T05:03.00Z”,“category”:“家具”,“areaName”:“杭州”,“money”:“1230”}|{“id”:12,“timestamp”:“2019-05-08T01:01.00Z”,“category”:“书籍”,“areaName”:“北京”,“money”:“5550”}
{“id”:13,“timestamp”:“2019-05-08T01:03.00Z”,“category”:“书籍”,“areaName”:“北京”,“money”:“5550”}|{“id”:14,“timestamp”:“2019-05-08T01:01.00Z”,“category”:“电脑”,“areaName”:“北京”,“money”:“1261”}|{“id”:15,“timestamp”:“2019-05-08T03:03.00Z”,“category”:“电脑”,“areaName”:“杭州”,“money”:“6660”}
{“id”:16,“timestamp”:“2019-05-08T01:01.00Z”,“category”:“电脑”,“areaName”:“天津”,“money”:“6660”}|{“id”:17,“timestamp”:“2019-05-08T01:03.00Z”,“category”:“书籍”,“areaName”:“北京”,“money”:“9000”}|{“id”:18,“timestamp”:“2019-05-08T05:01.00Z”,“category”:“书籍”,“areaName”:“北京”,“money”:“1230”}
{“id”:19,“timestamp”:“2019-05-08T01:03.00Z”,“category”:“电脑”,“areaName”:“杭州”,“money”:“5551”}|{“id”:20,“timestamp”:“2019-05-08T01:01.00Z”,“category”:“电脑”,“areaName”:“北京”,“money”:“2450”}
{“id”:21,“timestamp”:“2019-05-08T01:03.00Z”,“category”:“食品”,“areaName”:“北京”,“money”:“5520”}|{“id”:22,“timestamp”:“2019-05-08T01:01.00Z”,“category”:“食品”,“areaName”:“北京”,“money”:“6650”}
{“id”:23,“timestamp”:“2019-05-08T01:03.00Z”,“category”:“服饰”,“areaName”:“杭州”,“money”:“1240”}|{“id”:24,“timestamp”:“2019-05-08T01:01.00Z”,“category”:“食品”,“areaName”:“天津”,“money”:“5600”}
{“id”:25,“timestamp”:“2019-05-08T01:03.00Z”,“category”:“食品”,“areaName”:“北京”,“money”:“7801”}|{“id”:26,“timestamp”:“2019-05-08T01:01.00Z”,“category”:“服饰”,“areaName”:“北京”,“money”:“9000”}
{“id”:27,“timestamp”:“2019-05-08T01:03.00Z”,“category”:“服饰”,“areaName”:“杭州”,“money”:“5600”}|{“id”:28,“timestamp”:“2019-05-08T01:01.00Z”,“category”:“食品”,“areaName”:“北京”,“money”:“8000”}|{“id”:29,“timestamp”:“2019-05-08T02:03.00Z”,“category”:“服饰”,“areaName”:“杭州”,“money”:“7000”}

from pyspark import SparkConf, SparkContextimport jsonimport osos.environ['PYSPARK_PYTHON'] = 'C:/Users/tangling/venv/Scripts/python.exe'conf = SparkConf().setMaster("local[*]").setAppName("test_spark")sc = SparkContext(conf=conf)# TODO 需求1:城市销售额排名file_rdd = sc.textFile('C:/orders.txt')# 取出一个个json字符串json_str_rdd = file_rdd.flatMap(lambda x: x.split("|"))# 将json字符串都转化成字典dict_rdd = json_str_rdd.map(lambda x: json.loads(x))#取出城市和销售额数据city=dict_rdd.map(lambda x:(x['areaName'],int(x['money'])))#按城市分组按销售额聚合city_result_rdd=city.reduceByKey(lambda a,b:a+b)#按销售额聚合结果进行排序city_result_rdd.sortBy(lambda x:x[1],ascending=False,numPartitions=1)print(f"需求1的结果{city_result_rdd.collect()}")#TODO  需求2:全部城市有哪些商品类别在售卖category_rdd=dict_rdd.map(lambda x:x['category']).distinct()print(f"需求2的结果{category_rdd.collect()}")#TODO   需求3:北京有哪些商品类型在售卖#过滤出北京的数据beijing_data_rdd=dict_rdd.filter(lambda x:x['areaName']=='北京')#取出全部商品类别result3=beijing_data_rdd.map(lambda x:x['category']).distinct()print(f"需求3的结果{result3.collect()}")

六、数据输出

数据输出的方法:

collect算子
功能:将RDD各个分区内的数据,统一收集到Driver中,形成一个List对象 即 RDD—>LIST

rdd = sc.parallelize([1, 2, 3, 4, 5])# collect算子,输出RDD为list对象rdd_list: list = rdd.collect()print(rdd_list)  # [1, 2, 3, 4, 5]print(type(rdd_list))  #

reduce算子
功能:对RDD数据集按照你传入的逻辑进行聚合
语法:rdd.reduce(func)
func:(T,T)—>T 2个参数,一个返回值,返回值和参数要求类型一致
逻辑图:在这里插入图片描述

rdd = sc.parallelize(range(1,10))# 将rdd的数据进行累加求和print(rdd.reduce(lambda a,b:a+b))    #45

take算子
功能:取出RDD的前n个元素,组合成list返回

rdd=sc.parallelize([3,2,1,3,4,5]).take(5)print(rdd)    #[3, 2, 1, 3, 4]

count算子
功能:计算RDD有多少条数据,返回值是一个数字

rdd_count=sc.parallelize([3,2,1,3,4,5]).count()print(rdd_count)#6

saveAsTextFile算子 功能:将RDD的数据写入文本文件中,支持本地写出,hdfs等文件系统

rdd1=sc.parallelize([3,2,1,3,4,5])rdd1.saveAsTextFile("D:/output1")

修改rdd分区为1个
方法一:SparkConf对象设置属性全局并行度为1

conf = SparkConf().setMaster("local[*]").setAppName("test_spark")conf.set("spark.default.parallelism","1")sc = SparkContext(conf=conf)

方法二:在创建RDD的时候,设置parallelize方法传入numSlices参数为1

rdd1=sc.parallelize([3,2,1,3,4,5],numSlices=1)#rdd1=sc.parallelize([1,2,3,4,5],1)

来源地址:https://blog.csdn.net/weixin_46086217/article/details/129805047

免责声明:

① 本站未注明“稿件来源”的信息均来自网络整理。其文字、图片和音视频稿件的所属权归原作者所有。本站收集整理出于非商业性的教育和科研之目的,并不意味着本站赞同其观点或证实其内容的真实性。仅作为临时的测试数据,供内部测试之用。本站并未授权任何人以任何方式主动获取本站任何信息。

② 本站未注明“稿件来源”的临时测试数据将在测试完成后最终做删除处理。有问题或投稿请发送至: 邮箱/279061341@qq.com QQ/279061341

PySpark

下载Word文档到电脑,方便收藏和打印~

下载Word文档

猜你喜欢

2023-08-31

pyspark修改python版本

ubuntu自带的python 版本是2.7,我们要把pyspark默认改成anaconda python 3.6down votYou can specify the version of Python for the driver by
2023-01-31

python实例pyspark以及pyt

%pyspark#查询认证用户import sys#import MySQLdbimport mysql.connectorimport pandas as pdimport datetimeimport timeoptmap = {   
2023-01-31

Pyspark如何读取parquet数据

这期内容当中小编将会给大家带来有关Pyspark如何读取parquet数据,文章内容丰富且以专业的角度为大家分析和叙述,阅读完这篇文章希望大家可以有所收获。可以跳过不符合条件的数据,只读取需要的数据,降低IO数据量;压缩编码可以降低磁盘存储
2023-06-03

PySpark和RDD对象最新详解

Spark是一款分布式的计算框架,用于调度成百上千的服务器集群,计算TB、PB乃至EB级别的海量数据,PySpark是由Spark官方开发的Python语言第三方库,本文重点介绍PySpark和RDD对象,感兴趣的朋友一起看看吧
2023-01-11

如何在pyspark中创建DataFrame

如何在pyspark中创建DataFrame?很多新手对此不是很清楚,为了帮助大家解决这个难题,下面小编将为大家详细讲解,有这方面需求的人可以来学习下,希望你能有所收获。pyspark创建DataFrame为了便于操作,使用pyspark时
2023-06-15

PySpark中RDD的数据输出详解

这篇文章主要介绍了PySpark中RDD的数据输出详解,需要的朋友可以参考下
2023-01-15

如何在windowns中配置PySpark环境

如何在windowns中配置PySpark环境?相信很多没有经验的人对此束手无策,为此本文总结了问题出现的原因和解决方法,通过这篇文章希望你能解决这个问题。首先需要下载hadoop和spark,解压,然后设置环境变量。hadoop清华源下载
2023-06-15

Linux下远程连接Jupyter+pyspark部署教程

博主最近试在服务器上进行spark编程,因此,在开始编程作业之前,要先搭建一个便利的编程环境,这样才能做到舒心地开发。本文主要有以下内容: 1、python多版本管理利器-pythonbrew 2、Jupyter notebooks 安装
2022-06-04

pyspark dataframe列的合并与拆分实例

这篇文章主要介绍了pyspark dataframe列的合并与拆分实例,具有很好的参考价值,希望对大家有所帮助。如有错误或未考虑完全的地方,望不吝赐教
2023-03-23

pyspark dataframe列的合并与拆分方法是什么

这篇文章主要介绍了pyspark dataframe列的合并与拆分方法是什么的相关知识,内容详细易懂,操作简单快捷,具有一定借鉴价值,相信大家阅读完这篇pyspark dataframe列的合并与拆分方法是什么文章都会有所收获,下面我们一起
2023-07-05

编程热搜

  • Python 学习之路 - Python
    一、安装Python34Windows在Python官网(https://www.python.org/downloads/)下载安装包并安装。Python的默认安装路径是:C:\Python34配置环境变量:【右键计算机】--》【属性】-
    Python 学习之路 - Python
  • chatgpt的中文全称是什么
    chatgpt的中文全称是生成型预训练变换模型。ChatGPT是什么ChatGPT是美国人工智能研究实验室OpenAI开发的一种全新聊天机器人模型,它能够通过学习和理解人类的语言来进行对话,还能根据聊天的上下文进行互动,并协助人类完成一系列
    chatgpt的中文全称是什么
  • C/C++中extern函数使用详解
  • C/C++可变参数的使用
    可变参数的使用方法远远不止以下几种,不过在C,C++中使用可变参数时要小心,在使用printf()等函数时传入的参数个数一定不能比前面的格式化字符串中的’%’符号个数少,否则会产生访问越界,运气不好的话还会导致程序崩溃
    C/C++可变参数的使用
  • css样式文件该放在哪里
  • php中数组下标必须是连续的吗
  • Python 3 教程
    Python 3 教程 Python 的 3.0 版本,常被称为 Python 3000,或简称 Py3k。相对于 Python 的早期版本,这是一个较大的升级。为了不带入过多的累赘,Python 3.0 在设计的时候没有考虑向下兼容。 Python
    Python 3 教程
  • Python pip包管理
    一、前言    在Python中, 安装第三方模块是通过 setuptools 这个工具完成的。 Python有两个封装了 setuptools的包管理工具: easy_install  和  pip , 目前官方推荐使用 pip。    
    Python pip包管理
  • ubuntu如何重新编译内核
  • 改善Java代码之慎用java动态编译

目录