我的编程空间,编程开发者的网络收藏夹
学习永远不晚

【Python】PySpark 数据计算 ① ( RDD#map 方法 | RDD#map 语法 | 传入普通函数 | 传入 lambda 匿名函数 | 链式调用 )

短信预约 -IT技能 免费直播动态提醒
省份

北京

  • 北京
  • 上海
  • 天津
  • 重庆
  • 河北
  • 山东
  • 辽宁
  • 黑龙江
  • 吉林
  • 甘肃
  • 青海
  • 河南
  • 江苏
  • 湖北
  • 湖南
  • 江西
  • 浙江
  • 广东
  • 云南
  • 福建
  • 海南
  • 山西
  • 四川
  • 陕西
  • 贵州
  • 安徽
  • 广西
  • 内蒙
  • 西藏
  • 新疆
  • 宁夏
  • 兵团
手机号立即预约

请填写图片验证码后获取短信验证码

看不清楚,换张图片

免费获取短信验证码

【Python】PySpark 数据计算 ① ( RDD#map 方法 | RDD#map 语法 | 传入普通函数 | 传入 lambda 匿名函数 | 链式调用 )

文章目录





一、RDD#map 方法




1、RDD#map 方法引入


在 PySpark 中 RDD 对象 提供了一种 数据计算方法 RDD#map 方法 ;

该 RDD#map 函数 可以对 RDD 数据中的每个元素应用一个函数 , 该 被应用的函数 ,

  • 可以将每个元素转换为另一种类型 ,
  • 也可以针对 RDD 数据的 原始元素进行 指定操作 ;

计算完毕后 , 会返回一个新的 RDD 对象 ;


2、RDD#map 语法


map 方法 , 又称为 map 算子 , 可以将 RDD 中的数据元素 逐个进行处理 , 处理的逻辑 需要用外部 通过 参数传入 map 函数 ;


RDD#map 语法 :

rdd.map(fun)

传入的 fun 是一个函数 , 其函数类型为 :

(T) -> U

上述 函数 类型 前面的 小括号 及其中的内容 , 表示 函数 的参数类型 ,

  • () 表示不传入参数 ;
  • (T) 表示传入 1 个参数 ;

同时 T 类型是 泛型 , 表示任意类型 , 也就是说 该函数的 参数 可以是任意类型的 ;


上述 函数 类型 右箭头 后面的 U , -> U 表示的是 函数 返回值类型 ,

  • (T) -> U 表示 参数 类型为 T , 返回值类型为 U , T 和 U 类型都是任意类型 , 可以是一个类型 , 也可以是不同的类型 ;
  • (T) -> T 函数类型中 , T 可以是任意类型 , 但是如果确定了参数 , 那么返回值必须也是相同的类型 ;

U 类型也是 泛型 , 表示任意类型 , 也就是说 该函数的 参数 可以是任意类型的 ;


3、RDD#map 用法


RDD#map 方法 , 接收一个 函数 作为参数 , 计算时 , 该 函数参数 会被应用于 RDD 数据中的每个元素 ;

下面的 代码 , 传入一个 lambda 匿名函数 , 将 RDD 对象中的元素都乘以 10 ;

# 将 RDD 对象中的元素都乘以 10rdd.map(lambda x: x * 10)  

4、代码示例 - RDD#map 数值计算 ( 传入普通函数 )


在下面的代码中 ,

首先 , 创建了一个包含整数的 RDD ,

# 创建一个包含整数的 RDDrdd = sparkContext.parallelize([1, 2, 3, 4, 5])

然后 , 使用 map() 方法将每个元素乘以 10 ;

# 为每个元素执行的函数def func(element):    return element * 10# 应用 map 操作,将每个元素乘以 10rdd2 = rdd.map(func)

最后 , 打印新的 RDD 中的内容 ;

# 打印新的 RDD 中的内容print(rdd2.collect())

代码示例 :

"""PySpark 数据处理"""# 导入 PySpark 相关包from pyspark import SparkConf, SparkContext# 为 PySpark 配置 Python 解释器import osos.environ['PYSPARK_PYTHON'] = "Y:/002_WorkSpace/PycharmProjects/pythonProject/venv/Scripts/python.exe"# 创建 SparkConf 实例对象 , 该对象用于配置 Spark 任务# setMaster("local[*]") 表示在单机模式下 本机运行# setAppName("hello_spark") 是给 Spark 程序起一个名字sparkConf = SparkConf() \    .setMaster("local[*]") \    .setAppName("hello_spark")# 创建 PySpark 执行环境 入口对象sparkContext = SparkContext(conf=sparkConf)# 打印 PySpark 版本号print("PySpark 版本号 : ", sparkContext.version)# 创建一个包含整数的 RDDrdd = sparkContext.parallelize([1, 2, 3, 4, 5])# 为每个元素执行的函数def func(element):    return element * 10# 应用 map 操作,将每个元素乘以 10rdd2 = rdd.map(func)# 打印新的 RDD 中的内容print(rdd2.collect())# 停止 PySpark 程序sparkContext.stop()

执行结果 :

Y:\002_WorkSpace\PycharmProjects\pythonProject\venv\Scripts\python.exe Y:/002_WorkSpace/PycharmProjects/HelloPython/hello.py23/07/30 21:39:59 WARN Shell: Did not find winutils.exe: java.io.FileNotFoundException: java.io.FileNotFoundException: HADOOP_HOME and hadoop.home.dir are unset. -see https://wiki.apache.org/hadoop/WindowsProblemsSetting default log level to "WARN".To adjust logging level use sc.setLogLevel(newLevel). For SparkR, use setLogLevel(newLevel).23/07/30 21:39:59 WARN NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicablePySpark 版本号 :  3.4.1[10, 20, 30, 40, 50]Process finished with exit code 0

在这里插入图片描述


5、代码示例 - RDD#map 数值计算 ( 传入 lambda 匿名函数 )


在下面的代码中 ,

首先 , 创建了一个包含整数的 RDD ,

# 创建一个包含整数的 RDDrdd = sparkContext.parallelize([1, 2, 3, 4, 5])

然后 , 使用 map() 方法将每个元素乘以 10 , 这里传入了 lambda 函数作为参数 , 该函数接受一个整数参数 element , 并返回 element * 10 ;

# 应用 map 操作,将每个元素乘以 10rdd2 = rdd.map(lambda element: element * 10)

最后 , 打印新的 RDD 中的内容 ;

# 打印新的 RDD 中的内容print(rdd2.collect())

代码示例 :

"""PySpark 数据处理"""# 导入 PySpark 相关包from pyspark import SparkConf, SparkContext# 为 PySpark 配置 Python 解释器import osos.environ['PYSPARK_PYTHON'] = "Y:/002_WorkSpace/PycharmProjects/pythonProject/venv/Scripts/python.exe"# 创建 SparkConf 实例对象 , 该对象用于配置 Spark 任务# setMaster("local[*]") 表示在单机模式下 本机运行# setAppName("hello_spark") 是给 Spark 程序起一个名字sparkConf = SparkConf() \    .setMaster("local[*]") \    .setAppName("hello_spark")# 创建 PySpark 执行环境 入口对象sparkContext = SparkContext(conf=sparkConf)# 打印 PySpark 版本号print("PySpark 版本号 : ", sparkContext.version)# 创建一个包含整数的 RDDrdd = sparkContext.parallelize([1, 2, 3, 4, 5])# 应用 map 操作,将每个元素乘以 10rdd2 = rdd.map(lambda element: element * 10)# 打印新的 RDD 中的内容print(rdd2.collect())# 停止 PySpark 程序sparkContext.stop()

执行结果 :

Y:\002_WorkSpace\PycharmProjects\pythonProject\venv\Scripts\python.exe Y:/002_WorkSpace/PycharmProjects/HelloPython/hello.py23/07/30 21:46:53 WARN Shell: Did not find winutils.exe: java.io.FileNotFoundException: java.io.FileNotFoundException: HADOOP_HOME and hadoop.home.dir are unset. -see https://wiki.apache.org/hadoop/WindowsProblemsSetting default log level to "WARN".To adjust logging level use sc.setLogLevel(newLevel). For SparkR, use setLogLevel(newLevel).23/07/30 21:46:54 WARN NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicablePySpark 版本号 :  3.4.1[10, 20, 30, 40, 50]Process finished with exit code 0

在这里插入图片描述


6、代码示例 - RDD#map 数值计算 ( 链式调用 )


在下面的代码中 , 先对 RDD 对象中的每个元素数据都乘以 10 , 然后再对计算后的数据每个元素加上 5 , 最后对最新的计算数据每个元素除以 2 , 整个过程通过函数式编程 , 链式调用完成 ;

核心代码如下 :

# 创建一个包含整数的 RDDrdd = sparkContext.parallelize([1, 2, 3, 4, 5])# 应用 map 操作,将每个元素乘以 10rdd2 = rdd.map(lambda element: element * 10)\    .map(lambda element: element + 5)\    .map(lambda element: element / 2)# 打印新的 RDD 中的内容print(rdd2.collect())

代码示例 :

"""PySpark 数据处理"""# 导入 PySpark 相关包from pyspark import SparkConf, SparkContext# 为 PySpark 配置 Python 解释器import osos.environ['PYSPARK_PYTHON'] = "Y:/002_WorkSpace/PycharmProjects/pythonProject/venv/Scripts/python.exe"# 创建 SparkConf 实例对象 , 该对象用于配置 Spark 任务# setMaster("local[*]") 表示在单机模式下 本机运行# setAppName("hello_spark") 是给 Spark 程序起一个名字sparkConf = SparkConf() \    .setMaster("local[*]") \    .setAppName("hello_spark")# 创建 PySpark 执行环境 入口对象sparkContext = SparkContext(conf=sparkConf)# 打印 PySpark 版本号print("PySpark 版本号 : ", sparkContext.version)# 创建一个包含整数的 RDDrdd = sparkContext.parallelize([1, 2, 3, 4, 5])# 应用 map 操作,将每个元素乘以 10rdd2 = rdd.map(lambda element: element * 10)\    .map(lambda element: element + 5)\    .map(lambda element: element / 2)# 打印新的 RDD 中的内容print(rdd2.collect())# 停止 PySpark 程序sparkContext.stop()

执行结果 :

Y:\002_WorkSpace\PycharmProjects\pythonProject\venv\Scripts\python.exe Y:/002_WorkSpace/PycharmProjects/HelloPython/hello.py23/07/30 21:50:29 WARN Shell: Did not find winutils.exe: java.io.FileNotFoundException: java.io.FileNotFoundException: HADOOP_HOME and hadoop.home.dir are unset. -see https://wiki.apache.org/hadoop/WindowsProblemsSetting default log level to "WARN".To adjust logging level use sc.setLogLevel(newLevel). For SparkR, use setLogLevel(newLevel).23/07/30 21:50:29 WARN NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicablePySpark 版本号 :  3.4.1[7.5, 12.5, 17.5, 22.5, 27.5]Process finished with exit code 0

在这里插入图片描述

来源地址:https://blog.csdn.net/han1202012/article/details/132011469

免责声明:

① 本站未注明“稿件来源”的信息均来自网络整理。其文字、图片和音视频稿件的所属权归原作者所有。本站收集整理出于非商业性的教育和科研之目的,并不意味着本站赞同其观点或证实其内容的真实性。仅作为临时的测试数据,供内部测试之用。本站并未授权任何人以任何方式主动获取本站任何信息。

② 本站未注明“稿件来源”的临时测试数据将在测试完成后最终做删除处理。有问题或投稿请发送至: 邮箱/279061341@qq.com QQ/279061341

【Python】PySpark 数据计算 ① ( RDD#map 方法 | RDD#map 语法 | 传入普通函数 | 传入 lambda 匿名函数 | 链式调用 )

下载Word文档到电脑,方便收藏和打印~

下载Word文档

编程热搜

  • Python 学习之路 - Python
    一、安装Python34Windows在Python官网(https://www.python.org/downloads/)下载安装包并安装。Python的默认安装路径是:C:\Python34配置环境变量:【右键计算机】--》【属性】-
    Python 学习之路 - Python
  • chatgpt的中文全称是什么
    chatgpt的中文全称是生成型预训练变换模型。ChatGPT是什么ChatGPT是美国人工智能研究实验室OpenAI开发的一种全新聊天机器人模型,它能够通过学习和理解人类的语言来进行对话,还能根据聊天的上下文进行互动,并协助人类完成一系列
    chatgpt的中文全称是什么
  • C/C++中extern函数使用详解
  • C/C++可变参数的使用
    可变参数的使用方法远远不止以下几种,不过在C,C++中使用可变参数时要小心,在使用printf()等函数时传入的参数个数一定不能比前面的格式化字符串中的’%’符号个数少,否则会产生访问越界,运气不好的话还会导致程序崩溃
    C/C++可变参数的使用
  • css样式文件该放在哪里
  • php中数组下标必须是连续的吗
  • Python 3 教程
    Python 3 教程 Python 的 3.0 版本,常被称为 Python 3000,或简称 Py3k。相对于 Python 的早期版本,这是一个较大的升级。为了不带入过多的累赘,Python 3.0 在设计的时候没有考虑向下兼容。 Python
    Python 3 教程
  • Python pip包管理
    一、前言    在Python中, 安装第三方模块是通过 setuptools 这个工具完成的。 Python有两个封装了 setuptools的包管理工具: easy_install  和  pip , 目前官方推荐使用 pip。    
    Python pip包管理
  • ubuntu如何重新编译内核
  • 改善Java代码之慎用java动态编译

目录