PySpark
一、概念
Spack是什么?
Apache Spark是用于大规模数据处理的统一分析引擎,是一款分布式的计算框架,用于调度成百上千的服务器集群,计算TB、PB乃至EB级别的海量数据。
PySpark是什么?
pyspark是用spark官方开发的python第三方库,可以使用pip程序快速安装,并像其他第三方库那样使用。PySpark可以作为Python库进行数据处理,提交至Spark集群进行分布式集群计算。
二、准备工作
安装PySpark
按win+r键,输入cmd打开命令提示符程序,输入
pip install pystark
或使用国内代理镜像站(清华大学源)
pip install -i https://pypi.tuna/tsinghua.edu.cn/simple pyspark
也可以在Pycharm里直接安装
除此之外,还需要安装java,地址:https://www.oracle.com/java/technologies/downloads/
配置环境:
变量名:JAVA_HOME变量值:C:\Program Files (x86)\Java\jdk-20 // 要根据自己的实际路径配置变量名:CLASSPATH变量值:.;%JAVA_HOME%\lib\dt.jar;%JAVA_HOME%\lib\tools.jar; 变量名:Path变量值:%JAVA_HOME%\bin;%JAVA_HOME%\jre\bin;
重启Pycharm,选择Run—>Edit Configurations,添加JAVAHOME
3.测试是否安装成功
想要使用Pyspark库完成数据处理,首先需要构建一个执行环境入口对象
PySpark的执行环境入口对象是:类SparkContext的类对象
# 导包from pyspark import SparkConf, SparkContext# 创建SparkConf类对象#conf=SparkConf()#conf.setMaster("local[*]") # setMaster()指定spark的运行模式,local指以单机模式运行在本机上#conf.setAppName("test_spark_app") #指定名称#链式调用的原则是调用的方法返回值都是同一个对象conf = SparkConf().setMaster("local[*]").setAppName("test_spark_app") # 基于SparkConf类对象创建SparkContext类对象sc = SparkContext(conf=conf) #sc就是执行环境入口对象# 打印PySpark的运行版本print(sc.version)# 停止SparkContext对象的运行(停止运行PyStark程序)sc.stop()
运行代码,成功显示PySpark的运行版本,证明安装成功
三、PySpark的编程模型
SparkContext类对象,是PySpark编程中一切功能的入口
PySpark的编程,主要分为如下三大步骤:
1.数据输入
通过SparkContext类对象的成员方法完成数据的读取操作,读取后得到RDD类对象
2.数据处理计算
通过RDD类对象的成员方法,完成各种数据计算的需求
3.数据输出
将处理完成后的RDD对像调用各种成员方法完成写出文件,转换为list,tuple,dict等操作
RDD(Resilient Distributed Datasets),全称:弹性分布式数据集
RDD对象:PySpark支持多种数据的输入,在输入完成之后,都会得到一个RDD的对象。
PySpark针对数据的处理,都是以RDD对象作为载体,即:
- 数据存储在RDD中内
- 各类数据的计算方式也都是RDD的成员方法
- RDD的数据计算方法,返回值依旧是RDD对象
四、数据输入
from pyspark import SparkConf, SparkContextconf = SparkConf().setMaster("local[*]").setAppName("test_spark_app")sc = SparkContext(conf=conf)# 通过parallelize方法将Python对象加载到Spark内,成为RDD对象rdd1 = sc.parallelize([1, 2, 3, 4, 5])rdd2 = sc.parallelize((1, 2, 3, 4, 5))rdd3 = sc.parallelize("PySpark")rdd4 = sc.parallelize({1, 2, 3, 4, 5})rdd5 = sc.parallelize({"key1": "value1", "key2": "value2"})#读取文件转RDD对象rdd6 = sc.textFile("C:/test.txt")# 如果要查看RDD中有什么内容,需要用collect()方法print(rdd1.collect()) # [1, 2, 3, 4, 5]print(rdd2.collect()) # [1, 2, 3, 4, 5]print(rdd3.collect()) # ['P', 'y', 'S', 'p', 'a', 'r', 'k']print(rdd4.collect()) # [1, 2, 3, 4, 5]print(rdd5.collect()) # ['key1', 'key2']print(rdd6.collect()) # ['测试1', '', '测试2', '', '测试3']sc.stop()
五、数据处理
RDD内置丰富的成员方法(算子)
map算子
功能:map算子是将RDD的数据一条条处理(处理的逻辑基于map算子中接受的处理函数),返回新的RDD
from pyspark import SparkConf, SparkContext#Spark不能自动找到python解释器,需要指定import osos.environ['PYSPARK_PYTHON'] = 'C:/Users/tangling/venv/Scripts/python.exe'conf = SparkConf().setMaster("local[*]").setAppName("test_spark_app")sc = SparkContext(conf=conf)rdd = sc.parallelize([1, 2, 3, 4, 5])# 通过map方法将全部数据都乘10def func(data): return data * 10rdd2 = rdd.map(func)print(rdd2.collect()) #[10, 20, 30, 40, 50]# 链式调用# 给rdd中的每一个元素乘10后再加5rdd3=rdd.map(lambda x:x*10).map(lambda x:x+5)print(rdd3.collect()) #[15, 25, 35, 45, 55]sc.stop()
flatmap算子
功能:对RDD执行map操作,然后进行解除嵌套操作(与map相比之多了一层解嵌套)
解除嵌套:
list=[[1,2,3],[4,5,6],[7,8,9]] ——> list=[1,2,3,4,5,6,7,8,9]
from pyspark import SparkConf,SparkContextimport osos.environ['PYSPARK_PYTHON']='C:/Users/tangling/venv/Scripts/python.exe'conf=SparkConf().setMaster("local[*]").setAppName("test_spark")sc=SparkContext(conf=conf)rdd=sc.parallelize(["python project","hello world"])#需求:将RDD数据中的一个个单词提取出来rdd2=rdd.flatMap(lambda x:x.split(" "))print(rdd2.collect()) #['python', 'project', 'hello', 'world']sc.close()
reduceByKey算子
功能:针对KV型RDD,自动按照Key分组,然后根据你提供的聚合逻辑,完成组内数据(value)的聚合操作func:(V,V)—>V
接受2个传入参数(类型要一致),返回一个返回值,类型和传入要求一致
from pyspark import SparkConf, SparkContextimport osos.environ['PYSPARK_PYTHON'] = 'C:/Users/tangling/venv/Scripts/python.exe'conf = SparkConf().setMaster("local[*]").setAppName("test_spark")sc = SparkContext(conf=conf)rdd = sc.parallelize([('男', 99), ('男', 88), ('男', 79), ('女', 97), ('女', 89)])# 需求:求男生和女生两个组的成绩之和result = rdd.reduceByKey(lambda a, b: a + b)print(result.collect()) #[('男', 266), ('女', 186)]
案例:统计指定文件的词频
from pyspark import SparkConf, SparkContextimport osos.environ['PYSPARK_PYTHON'] = 'C:/Users/tangling/venv/Scripts/python.exe'conf = SparkConf().setMaster("local[*]").setAppName("test_spark")sc = SparkContext(conf=conf)rdd = sc.textFile('C:/test.txt')#取出全部单词words=rdd.flatMap(lambda x:x.split(" "))#将所有单词都转化成二元元组,单词为key,value设置为1word_with_one_rdd=words.map(lambda word:(word,1)) #分组求和result=word_with_one_rdd.reduceByKey(lambda a,b:a+b)print(result.collect()) #[('world', 1), ('i', 3), ('love', 2), ('yuanyuan', 1), ('pangpang', 1), ('am', 1), ('tangling', 1), ('hello', 1)]
Filter算子
功能:过滤想要的数据进行保留
func:(T)—>bool 传入一个参数进来类型随意,返回值必须是True或False,返回值是True的被留下来,False的数据被丢弃
from pyspark import SparkConf, SparkContextimport osos.environ['PYSPARK_PYTHON'] = 'C:/Users/tangling/venv/Scripts/python.exe'conf = SparkConf().setMaster("local[*]").setAppName("test_spark")sc = SparkContext(conf=conf)rdd = sc.parallelize([1, 2, 3, 4, 5])#只保留rdd中的偶数result = rdd.filter(lambda num: num % 2 == 0)print(result.collect()) #[2,4]
distinct算子
功能:对RDD数据进行去重,返回新的RDD,无需传参
rdd = sc.parallelize([1, 1,2,3,3,4])result = rdd.distinct()print(result.collect()) #[1,2,3,4]
sortBy算子
功能:对RDD数据进行排序,基于你指定的排序依据
语法:rdd.sortBy(func,ascending=False,numPartitions=1)
func:(T)—>U:告知按照RDD中的哪一个数据进行排序,比如lambda x:x[1]表示按照rdd中的第二列元素进行排序
ascending=True(升序)False(降序)
numPartitons:用多少分区排序,全局排序需要设置分区数为1
对之前案例中的结果进行排序
from pyspark import SparkConf, SparkContextimport osos.environ['PYSPARK_PYTHON'] = 'C:/Users/tangling/venv/Scripts/python.exe'conf = SparkConf().setMaster("local[*]").setAppName("test_spark")sc = SparkContext(conf=conf)rdd = sc.textFile('C:/test.txt')#取出全部单词words=rdd.flatMap(lambda x:x.split(" "))#将所有单词都转化成二元元组,单词为key,value设置为1word_with_one_rdd=words.map(lambda word:(word,1)) #分组求和result_rdd=word_with_one_rdd.reduceByKey(lambda a,b:a+b)#对结果进行排序final_rdd=result_rdd.sortBy(lambda x:x[1],ascending=False,numPartitions=1)print(final_rdd.collect()) #[('i', 3), ('love', 2), ('world', 1), ('yuanyuan', 1), ('pangpang', 1), ('am', 1), ('tangling', 1), ('hello', 1)]
综合案例:结合所学知识,完成以下需求:
需求1:城市销售额排名
需求2:全部城市有哪些商品类别在售卖
需求3:北京有哪些商品类型在售卖
{“id”:1,“timestamp”:“2019-05-08T01:03.00Z”,“category”:“平板电脑”,“areaName”:“北京”,“money”:“1450”}|{“id”:2,“timestamp”:“2019-05-08T01:01.00Z”,“category”:“手机”,“areaName”:“北京”,“money”:“1450”}|{“id”:3,“timestamp”:“2019-05-08T01:03.00Z”,“category”:“手机”,“areaName”:“北京”,“money”:“8412”}
{“id”:4,“timestamp”:“2019-05-08T05:01.00Z”,“category”:“电脑”,“areaName”:“上海”,“money”:“1513”}|{“id”:5,“timestamp”:“2019-05-08T01:03.00Z”,“category”:“家电”,“areaName”:“北京”,“money”:“1550”}|{“id”:6,“timestamp”:“2019-05-08T01:01.00Z”,“category”:“电脑”,“areaName”:“杭州”,“money”:“1550”}
{“id”:7,“timestamp”:“2019-05-08T01:03.00Z”,“category”:“电脑”,“areaName”:“北京”,“money”:“5611”}|{“id”:8,“timestamp”:“2019-05-08T03:01.00Z”,“category”:“家电”,“areaName”:“北京”,“money”:“4410”}|{“id”:9,“timestamp”:“2019-05-08T01:03.00Z”,“category”:“家具”,“areaName”:“郑州”,“money”:“1120”}
{“id”:10,“timestamp”:“2019-05-08T01:01.00Z”,“category”:“家具”,“areaName”:“北京”,“money”:“6661”}|{“id”:11,“timestamp”:“2019-05-08T05:03.00Z”,“category”:“家具”,“areaName”:“杭州”,“money”:“1230”}|{“id”:12,“timestamp”:“2019-05-08T01:01.00Z”,“category”:“书籍”,“areaName”:“北京”,“money”:“5550”}
{“id”:13,“timestamp”:“2019-05-08T01:03.00Z”,“category”:“书籍”,“areaName”:“北京”,“money”:“5550”}|{“id”:14,“timestamp”:“2019-05-08T01:01.00Z”,“category”:“电脑”,“areaName”:“北京”,“money”:“1261”}|{“id”:15,“timestamp”:“2019-05-08T03:03.00Z”,“category”:“电脑”,“areaName”:“杭州”,“money”:“6660”}
{“id”:16,“timestamp”:“2019-05-08T01:01.00Z”,“category”:“电脑”,“areaName”:“天津”,“money”:“6660”}|{“id”:17,“timestamp”:“2019-05-08T01:03.00Z”,“category”:“书籍”,“areaName”:“北京”,“money”:“9000”}|{“id”:18,“timestamp”:“2019-05-08T05:01.00Z”,“category”:“书籍”,“areaName”:“北京”,“money”:“1230”}
{“id”:19,“timestamp”:“2019-05-08T01:03.00Z”,“category”:“电脑”,“areaName”:“杭州”,“money”:“5551”}|{“id”:20,“timestamp”:“2019-05-08T01:01.00Z”,“category”:“电脑”,“areaName”:“北京”,“money”:“2450”}
{“id”:21,“timestamp”:“2019-05-08T01:03.00Z”,“category”:“食品”,“areaName”:“北京”,“money”:“5520”}|{“id”:22,“timestamp”:“2019-05-08T01:01.00Z”,“category”:“食品”,“areaName”:“北京”,“money”:“6650”}
{“id”:23,“timestamp”:“2019-05-08T01:03.00Z”,“category”:“服饰”,“areaName”:“杭州”,“money”:“1240”}|{“id”:24,“timestamp”:“2019-05-08T01:01.00Z”,“category”:“食品”,“areaName”:“天津”,“money”:“5600”}
{“id”:25,“timestamp”:“2019-05-08T01:03.00Z”,“category”:“食品”,“areaName”:“北京”,“money”:“7801”}|{“id”:26,“timestamp”:“2019-05-08T01:01.00Z”,“category”:“服饰”,“areaName”:“北京”,“money”:“9000”}
{“id”:27,“timestamp”:“2019-05-08T01:03.00Z”,“category”:“服饰”,“areaName”:“杭州”,“money”:“5600”}|{“id”:28,“timestamp”:“2019-05-08T01:01.00Z”,“category”:“食品”,“areaName”:“北京”,“money”:“8000”}|{“id”:29,“timestamp”:“2019-05-08T02:03.00Z”,“category”:“服饰”,“areaName”:“杭州”,“money”:“7000”}
from pyspark import SparkConf, SparkContextimport jsonimport osos.environ['PYSPARK_PYTHON'] = 'C:/Users/tangling/venv/Scripts/python.exe'conf = SparkConf().setMaster("local[*]").setAppName("test_spark")sc = SparkContext(conf=conf)# TODO 需求1:城市销售额排名file_rdd = sc.textFile('C:/orders.txt')# 取出一个个json字符串json_str_rdd = file_rdd.flatMap(lambda x: x.split("|"))# 将json字符串都转化成字典dict_rdd = json_str_rdd.map(lambda x: json.loads(x))#取出城市和销售额数据city=dict_rdd.map(lambda x:(x['areaName'],int(x['money'])))#按城市分组按销售额聚合city_result_rdd=city.reduceByKey(lambda a,b:a+b)#按销售额聚合结果进行排序city_result_rdd.sortBy(lambda x:x[1],ascending=False,numPartitions=1)print(f"需求1的结果{city_result_rdd.collect()}")#TODO 需求2:全部城市有哪些商品类别在售卖category_rdd=dict_rdd.map(lambda x:x['category']).distinct()print(f"需求2的结果{category_rdd.collect()}")#TODO 需求3:北京有哪些商品类型在售卖#过滤出北京的数据beijing_data_rdd=dict_rdd.filter(lambda x:x['areaName']=='北京')#取出全部商品类别result3=beijing_data_rdd.map(lambda x:x['category']).distinct()print(f"需求3的结果{result3.collect()}")
六、数据输出
数据输出的方法:
collect算子
功能:将RDD各个分区内的数据,统一收集到Driver中,形成一个List对象 即 RDD—>LIST
rdd = sc.parallelize([1, 2, 3, 4, 5])# collect算子,输出RDD为list对象rdd_list: list = rdd.collect()print(rdd_list) # [1, 2, 3, 4, 5]print(type(rdd_list)) #
reduce算子
功能:对RDD数据集按照你传入的逻辑进行聚合
语法:rdd.reduce(func)
func:(T,T)—>T 2个参数,一个返回值,返回值和参数要求类型一致
逻辑图:
rdd = sc.parallelize(range(1,10))# 将rdd的数据进行累加求和print(rdd.reduce(lambda a,b:a+b)) #45
take算子
功能:取出RDD的前n个元素,组合成list返回
rdd=sc.parallelize([3,2,1,3,4,5]).take(5)print(rdd) #[3, 2, 1, 3, 4]
count算子
功能:计算RDD有多少条数据,返回值是一个数字
rdd_count=sc.parallelize([3,2,1,3,4,5]).count()print(rdd_count)#6
saveAsTextFile算子 功能:将RDD的数据写入文本文件中,支持本地写出,hdfs等文件系统
rdd1=sc.parallelize([3,2,1,3,4,5])rdd1.saveAsTextFile("D:/output1")
修改rdd分区为1个
方法一:SparkConf对象设置属性全局并行度为1
conf = SparkConf().setMaster("local[*]").setAppName("test_spark")conf.set("spark.default.parallelism","1")sc = SparkContext(conf=conf)
方法二:在创建RDD的时候,设置parallelize方法传入numSlices参数为1
rdd1=sc.parallelize([3,2,1,3,4,5],numSlices=1)#rdd1=sc.parallelize([1,2,3,4,5],1)
来源地址:https://blog.csdn.net/weixin_46086217/article/details/129805047
免责声明:
① 本站未注明“稿件来源”的信息均来自网络整理。其文字、图片和音视频稿件的所属权归原作者所有。本站收集整理出于非商业性的教育和科研之目的,并不意味着本站赞同其观点或证实其内容的真实性。仅作为临时的测试数据,供内部测试之用。本站并未授权任何人以任何方式主动获取本站任何信息。
② 本站未注明“稿件来源”的临时测试数据将在测试完成后最终做删除处理。有问题或投稿请发送至: 邮箱/279061341@qq.com QQ/279061341