我的编程空间,编程开发者的网络收藏夹
学习永远不晚

PySpark中RDD的数据输出详解

短信预约 -IT技能 免费直播动态提醒
省份

北京

  • 北京
  • 上海
  • 天津
  • 重庆
  • 河北
  • 山东
  • 辽宁
  • 黑龙江
  • 吉林
  • 甘肃
  • 青海
  • 河南
  • 江苏
  • 湖北
  • 湖南
  • 江西
  • 浙江
  • 广东
  • 云南
  • 福建
  • 海南
  • 山西
  • 四川
  • 陕西
  • 贵州
  • 安徽
  • 广西
  • 内蒙
  • 西藏
  • 新疆
  • 宁夏
  • 兵团
手机号立即预约

请填写图片验证码后获取短信验证码

看不清楚,换张图片

免费获取短信验证码

PySpark中RDD的数据输出详解

一. 回顾

数据输入:

  • sc.parallelize
  • sc.textFile

数据计算:

  • rdd.map
  • rdd.flatMap
  • rdd.reduceByKey
  • .…

二.输出为python对象

数据输出可用的方法是很多的,这里简单介绍常会用到的4个

  • collect:将RDD内容转换为list
  • reduce:对RDD内容进行自定义聚合
  • take:取出RDD的前N个元素组成list
  • count:统计RDD元素个数

collect算子

功能:将RDD各个分区内的数据,统一收集到Driver中,形成一个List对象
用法:
rdd.collect()
返回值是一个list

演示

from pyspark import SparkContext,SparkConf
import os
os.environ["PYSPARK_PYTHON"]="C:/Users/hawa/AppData/Local/Programs/Python/Python39/python.exe"
 
conf=SparkConf().setMaster("local[*]").setAppName("test_spark")
sc=SparkContext(conf=conf)
 
#准备一个RDD
rdd=sc.parallelize([1,2,3,4,5])
#collect算子,输出RDD为list对象
print("rdd是:",rdd)
print("rdd.collect是:",rdd.collect())

结果是

 单独输出rdd,输出的是rdd的类名而非内容

reduce算子

功能:对RDD数据集按照你传入的逻辑进行聚合

语法:

代码

 返回值等于计算函数的返回值

 演示

from pyspark import SparkContext,SparkConf
import os
os.environ["PYSPARK_PYTHON"]="C:/Users/hawa/AppData/Local/Programs/Python/Python39/python.exe"
 
conf=SparkConf().setMaster("local[*]").setAppName("test_spark")
sc=SparkContext(conf=conf)
 
#准备一个RDD
rdd=sc.parallelize([1,2,3,4,5])
#collect算子,输出RDD为list对象
print("rdd是:",rdd)
print("rdd.collect是:",rdd.collect())
print("rdd.collect的类型是:",type(rdd.collect()))
#reduce算子,对RDD进行两两聚合
num=rdd.reduce(lambda x,y:x+y)
print(num)

结果是

 take算子

功能:取RDD的前N个元素,组合成list返回给你
用法:

 

 演示

from pyspark import SparkContext,SparkConf
import os
os.environ["PYSPARK_PYTHON"]="C:/Users/hawa/AppData/Local/Programs/Python/Python39/python.exe"
 
conf=SparkConf().setMaster("local[*]").setAppName("test_spark")
sc=SparkContext(conf=conf)
 
#准备一个RDD
rdd=sc.parallelize([1,2,3,4,5])
#collect算子,输出RDD为list对象
print("rdd是:",rdd)
print("rdd.collect是:",rdd.collect())
print("rdd.collect的类型是:",type(rdd.collect()))
#reduce算子,对RDD进行两两聚合
num=rdd.reduce(lambda x,y:x+y)
print(num)
#take算子,取出RDD前n个元素,组成list返回
take_list=rdd.take(3)
print(take_list)

结果是

 count算子

功能:计算RDD有多少条数据,返回值是一个数字
用法:

 演示

from pyspark import SparkContext,SparkConf
import os
os.environ["PYSPARK_PYTHON"]="C:/Users/hawa/AppData/Local/Programs/Python/Python39/python.exe"
 
conf=SparkConf().setMaster("local[*]").setAppName("test_spark")
sc=SparkContext(conf=conf)
 
#准备一个RDD
rdd=sc.parallelize([1,2,3,4,5])
#collect算子,输出RDD为list对象
print("rdd是:",rdd)
print("rdd.collect是:",rdd.collect())
print("rdd.collect的类型是:",type(rdd.collect()))
#reduce算子,对RDD进行两两聚合
num=rdd.reduce(lambda x,y:x+y)
print(num)
#take算子,取出RDD前n个元素,组成list返回
take_list=rdd.take(3)
print(take_list)
#count算子,统计rdd中有多少条数据,返回值为数字
num_count=rdd.count()
print(num_count)
#关闭链接
sc.stop()

结果是

小结

1.Spark的编程流程就是:

  • 将数据加载为RDD(数据输入)对RDD进行计算(数据计算)
  • 将RDD转换为Python对象(数据输出)

 2.数据输出的方法

  • collect:将RDD内容转换为list
  • reduce:对RDD内容进行自定义聚合
  • take:取出RDD的前N个元素组成list
  • count:统计RDD元素个数

数据输出可用的方法是很多的,这里只是简单介绍4个

三.输出到文件中

savaAsTextFile算子

功能:将RDD的数据写入文本文件中支持本地写出, hdfs等文件系统.
代码:

 演示

 这是因为这个方法本质上依赖大数据的Hadoop框架,需要配置Hadoop 依赖.

配置Hadoop依赖

调用保存文件的算子,需要配置Hadoop依赖。

  • 下载Hadoop安装包解压到电脑任意位置
  • 在Python代码中使用os模块配置: os.environ['HADOOP_HOME']='HADOOP解压文件夹路径′。
  • 下载winutils.exe,并放入Hadoop解压文件夹的bin目录内
  • 下载hadoop.dll,并放入:C:/Windows/System32文件夹内

配置完成之后,执行下面的代码

from pyspark import SparkConf,SparkContext
import os
os.environ['PYSPARK_PYTHON']="C:/Users/hawa/AppData/Local/Programs/Python/Python39/python.exe"
os.environ['HADOOP_HOME']='D:/heima_hadoop/hadoop-3.0.0'
 
conf=SparkConf().setMaster("local[*]").setAppName("test_spark")
sc=SparkContext(conf=conf)
 
#准备rdd
rdd1=sc.parallelize([1,2,3,4,5])
rdd2=sc.parallelize([("asdf",3),("w3er_!2",5),("hello",3)])
rdd3=sc.parallelize([[1,2,3],[3,2,4],[4,3,5]])
#输出到文件中
rdd1.saveAsTextFile("D:/output1")
rdd2.saveAsTextFile("D:/output2")
rdd3.saveAsTextFile("D:/output3")

结果是

 输出的文件夹中有这么8文件,是因为RDD被默认为分成8个分区
SaveAsTextFile算子输出文件的个数是根据RDD的分区来决定的,有多少分区就会输出多少个文件,RDD在本电脑中默认是8(该电脑CPU核心数是8核)

 打开设备管理器就可以查看处理器个数,这里是有8个逻辑CPU
或者打开任务管理器就可以看到是4核8个逻辑CPU

 修改rdd分区为1个

方式1, SparkConf对象设置属性全局并行度为1:

 方式2,创建RDD的时候设置( parallelize方法传入numSlices参数为1)

from pyspark import SparkConf,SparkContext
import os
os.environ['PYSPARK_PYTHON']="C:/Users/hawa/AppData/Local/Programs/Python/Python39/python.exe"
os.environ['HADOOP_HOME']='D:/heima_hadoop/hadoop-3.0.0'
 
conf=SparkConf().setMaster("local[*]").setAppName("test_spark")
#rdd分区设置为1
conf.set("spark.default.parallelism","1")
sc=SparkContext(conf=conf)
 
#准备rdd
rdd1=sc.parallelize([1,2,3,4,5])
rdd2=sc.parallelize([("asdf",3),("w3er_!2",5),("hello",3)])
rdd3=sc.parallelize([[1,2,3],[3,2,4],[4,3,5]])
#输出到文件中
rdd1.saveAsTextFile("D:/output1")
rdd2.saveAsTextFile("D:/output2")
rdd3.saveAsTextFile("D:/output3")

结果是

 小结

1.RDD输出到文件的方法

  • rdd.saveAsTextFile(路径)
  • 输出的结果是一个文件夹
  • 有几个分区就输出多少个结果文件

2.如何修改RDD分区

  • SparkConf对象设置conf.set("spark.default.parallelism", "7")
  • 创建RDD的时候,sc.parallelize方法传入numSlices参数为1

四.练习案例

需求: 

读取文件转换成RDD,并完成:

  • 打印输出:热门搜索时间段(小时精度)Top3
  • 打印输出:热门搜索词Top3
  • 打印输出:统计黑马程序员关键字在哪个时段被搜索最多
  • 将数据转换为JSON格式,写出为文件

代码

from pyspark import SparkConf,SparkContext
import os
os.environ['PYSPARK_PYTHON']="C:/Users/hawa/AppData/Local/Programs/Python/Python39/python.exe"
os.environ['HADOOP_HOME']='D:/heima_hadoop/hadoop-3.0.0'
 
conf=SparkConf().setMaster("local[*]").setAppName("test_spark")
#rdd分区设置为1
conf.set("spark.default.parallelism","1")
sc=SparkContext(conf=conf)
 
rdd=sc.textFile("D:/search_log.txt")
#需求1 打印输出:热门搜索时间段(小时精度)Top3
# 取出全部的时间并转换为小时
# 转换为(小时,1)的二元元组
# Key分组聚合Value
# 排序(降序)
# 取前3
result1=rdd.map(lambda x:x.split("\t")).\
    map(lambda x:x[0][:2]).\
    map(lambda x:(x,1)).\
    reduceByKey(lambda x,y:x+y).\
    sortBy(lambda x:x[1],ascending=False,numPartitions=1).\
    take(3)#上面用的‘/'是换行的意思,当一行代码太长时就可以这样用
print(result1)
#需求2 打印输出:热门搜索词Top3
# 取出全部的搜索词
# (词,1)二元元组
# 分组聚合
# 排序
# Top3
result2=rdd.map(lambda x:x.split("\t")).\
    map(lambda x:x[2])\
    .map(lambda x:(x,1)).\
    reduceByKey(lambda x,y:x+y).\
    sortBy(lambda x:x[1],ascending=False,numPartitions=1).\
    take(3)
print(result2)
#需求3 打印输出:统计黑马程序员关键字在哪个时段被搜索最多
result3=rdd.map(lambda x:x.split("\t")).\
    filter((lambda x:x[2]=="黑马程序员")).\
    map(lambda x:(x[0][:2],1)).\
    reduceByKey(lambda x,y:x+y).\
    sortBy(lambda x:x[1],ascending=False,numPartitions=1).\
    take(3)
print(result3)
#需求4 将数据转换为JSON格式,写出为文件
rdd.map(lambda x:x.split("\t")).\
    map(lambda x:{"time":x[0],"id":x[1],"key":x[2],"num1":x[3],"num2":x[4],"url":x[5]})\
    .saveAsTextFile("D:/out_json")

结果是

到此这篇关于PySpark中RDD的数据输出详解的文章就介绍到这了,更多相关PySpark RDD数据输出内容请搜索编程网以前的文章或继续浏览下面的相关文章希望大家以后多多支持编程网!

免责声明:

① 本站未注明“稿件来源”的信息均来自网络整理。其文字、图片和音视频稿件的所属权归原作者所有。本站收集整理出于非商业性的教育和科研之目的,并不意味着本站赞同其观点或证实其内容的真实性。仅作为临时的测试数据,供内部测试之用。本站并未授权任何人以任何方式主动获取本站任何信息。

② 本站未注明“稿件来源”的临时测试数据将在测试完成后最终做删除处理。有问题或投稿请发送至: 邮箱/279061341@qq.com QQ/279061341

PySpark中RDD的数据输出详解

下载Word文档到电脑,方便收藏和打印~

下载Word文档

猜你喜欢

PySpark中RDD的数据输出详解

这篇文章主要介绍了PySpark中RDD的数据输出详解,需要的朋友可以参考下
2023-01-15

关于linux中系统输入输出的管理详解

系统中输入输出的管理1.理解系统的输入输出linux系统中,1表示正确输出,2表示错误输出2.管理输入输出的符号(1)输出重定向(输出到指定的位置)> ##重定向正确输出 2> ##重定向错误输出
2022-06-04

图文详解Java中的字节输入与输出流

目录字节输入流字节输入流结构图FileInputStream类构造方法:常用读取方法:字节输出流字节输出流结构图:FileOutputStream类构造方法:常用写入方法:总结字节输入流java.io.InputStream抽象类是所有字节输入流的超类,将数据
2017-02-14

sql中怎么将输出的数据换行

在SQL中,可以使用CONCAT函数将输出的数据换行。例如,可以使用以下语句将两个字段合并为一个字段,并在它们之间添加换行符:SELECT CONCAT(field1, '\n', field2) AS combine
sql中怎么将输出的数据换行
2024-04-09

php中可以输出数据类型的是

php 中输出数据类型的方式有三种:使用 gettype() 函数返回变量的数据类型。使用 var_dump() 函数提供更详细的信息,包括数据类型。使用 print_r() 函数打印数据值并包括数据类型。PHP 中输出数据类型的方式PH
php中可以输出数据类型的是
2024-04-26

JavaScript中输出数据的方式有哪些

这篇文章主要介绍“JavaScript中输出数据的方式有哪些”的相关知识,小编通过实际案例向大家展示操作过程,操作方法简单快捷,实用性强,希望这篇“JavaScript中输出数据的方式有哪些”文章能帮助大家解决问题。前言在 JavaScri
2023-06-29

C++ 函数调试详解:如何分析函数的输入和输出参数?

函数调试时,分析输入参数包括:类型匹配、范围、值和边值检查。输出参数分析包含:返回类型验证、指针有效性、引用有效性、值验证等。实战案例演示了如何测试输入和输出参数的有效性,帮助理解代码错误的定位和解决。C++ 函数调试详解:如何分析函数的输
C++ 函数调试详解:如何分析函数的输入和输出参数?
2024-05-03

MySQL中数据导入和导出的方法详解

MySQL中数据导入和导出的方法详解导入和导出数据是数据库管理中常用的操作,在MySQL中也有多种方法可以实现。本文将详细介绍几种常见的方法,并提供相应的代码示例。一、导出数据使用SELECT ... INTO OUTFILE语句导出数据在
2023-10-22

Java中常用数据类型的输入输出方法是什么

这篇文章主要介绍Java中常用数据类型的输入输出方法是什么,文中介绍的非常详细,具有一定的参考价值,感兴趣的小伙伴们一定要看完!1、Char型这里说的char型指的是只输入一个字符的情况。1.1 输入格式:import java.io.I
2023-06-22

关于C++中数据16进制输出的方法

本文主要介绍了关于C++中数据16进制输出的方法,文中通过示例代码介绍的非常详细,对大家的学习或者工作具有一定的参考学习价值,需要的朋友们下面随着小编来一起学习学习吧
2023-03-09

编程热搜

  • Python 学习之路 - Python
    一、安装Python34Windows在Python官网(https://www.python.org/downloads/)下载安装包并安装。Python的默认安装路径是:C:\Python34配置环境变量:【右键计算机】--》【属性】-
    Python 学习之路 - Python
  • chatgpt的中文全称是什么
    chatgpt的中文全称是生成型预训练变换模型。ChatGPT是什么ChatGPT是美国人工智能研究实验室OpenAI开发的一种全新聊天机器人模型,它能够通过学习和理解人类的语言来进行对话,还能根据聊天的上下文进行互动,并协助人类完成一系列
    chatgpt的中文全称是什么
  • C/C++中extern函数使用详解
  • C/C++可变参数的使用
    可变参数的使用方法远远不止以下几种,不过在C,C++中使用可变参数时要小心,在使用printf()等函数时传入的参数个数一定不能比前面的格式化字符串中的’%’符号个数少,否则会产生访问越界,运气不好的话还会导致程序崩溃
    C/C++可变参数的使用
  • css样式文件该放在哪里
  • php中数组下标必须是连续的吗
  • Python 3 教程
    Python 3 教程 Python 的 3.0 版本,常被称为 Python 3000,或简称 Py3k。相对于 Python 的早期版本,这是一个较大的升级。为了不带入过多的累赘,Python 3.0 在设计的时候没有考虑向下兼容。 Python
    Python 3 教程
  • Python pip包管理
    一、前言    在Python中, 安装第三方模块是通过 setuptools 这个工具完成的。 Python有两个封装了 setuptools的包管理工具: easy_install  和  pip , 目前官方推荐使用 pip。    
    Python pip包管理
  • ubuntu如何重新编译内核
  • 改善Java代码之慎用java动态编译

目录