我的编程空间,编程开发者的网络收藏夹
学习永远不晚

如何在windowns中配置PySpark环境

短信预约 -IT技能 免费直播动态提醒
省份

北京

  • 北京
  • 上海
  • 天津
  • 重庆
  • 河北
  • 山东
  • 辽宁
  • 黑龙江
  • 吉林
  • 甘肃
  • 青海
  • 河南
  • 江苏
  • 湖北
  • 湖南
  • 江西
  • 浙江
  • 广东
  • 云南
  • 福建
  • 海南
  • 山西
  • 四川
  • 陕西
  • 贵州
  • 安徽
  • 广西
  • 内蒙
  • 西藏
  • 新疆
  • 宁夏
  • 兵团
手机号立即预约

请填写图片验证码后获取短信验证码

看不清楚,换张图片

免费获取短信验证码

如何在windowns中配置PySpark环境

如何在windowns中配置PySpark环境?相信很多没有经验的人对此束手无策,为此本文总结了问题出现的原因和解决方法,通过这篇文章希望你能解决这个问题。

首先需要下载hadoop和spark,解压,然后设置环境变量。
hadoop清华源下载
spark清华源下载

HADOOP_HOME => /path/hadoopSPARK_HOME => /path/spark

安装pyspark。

pip install pyspark

基本使用

可以在shell终端,输入pyspark,有如下回显:

如何在windowns中配置PySpark环境

输入以下指令进行测试,并创建SparkContext,SparkContext是任何spark功能的入口点。

>>> from pyspark import SparkContext>>> sc = SparkContext("local", "First App")

如果以上不会报错,恭喜可以开始使用pyspark编写代码了。
不过,我这里使用IDE来编写代码,首先我们先在终端执行以下代码关闭SparkContext。

>>> sc.stop()

下面使用pycharm编写代码,如果修改了环境变量需要先重启pycharm。
在pycharm运行如下程序,程序会起本地模式的spark计算引擎,通过spark统计abc.txt文件中a和b出现行的数量,文件路径需要自己指定。

from pyspark import SparkContextsc = SparkContext("local", "First App")logFile = "abc.txt"logData = sc.textFile(logFile).cache()numAs = logData.filter(lambda s: 'a' in s).count()numBs = logData.filter(lambda s: 'b' in s).count()print("Line with a:%i,line with b:%i" % (numAs, numBs))

运行结果如下:

20/03/11 16:15:57 WARN NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable
Using Spark's default log4j profile: org/apache/spark/log4j-defaults.properties
Setting default log level to "WARN".
To adjust logging level use sc.setLogLevel(newLevel). For SparkR, use setLogLevel(newLevel).
20/03/11 16:15:58 WARN Utils: Service 'SparkUI' could not bind on port 4040. Attempting port 4041.
Line with a:3,line with b:1

这里说一下,同样的工作使用python可以做,spark也可以做,使用spark主要是为了高效的进行分布式计算。
戳pyspark教程
戳spark教程

RDD

RDD代表Resilient Distributed Dataset,它们是在多个节点上运行和操作以在集群上进行并行处理的元素,RDD是spark计算的操作对象。
一般,我们先使用数据创建RDD,然后对RDD进行操作。
对RDD操作有两种方法:
Transformation(转换) - 这些操作应用于RDD以创建新的RDD。例如filter,groupBy和map。
Action(操作) - 这些是应用于RDD的操作,它指示Spark执行计算并将结果发送回驱动程序,例如count,collect等。

创建RDD

parallelize是从列表创建RDD,先看一个例子:

from pyspark import SparkContextsc = SparkContext("local", "count app")words = sc.parallelize(    ["scala",     "java",     "hadoop",     "spark",     "akka",     "spark vs hadoop",     "pyspark",     "pyspark and spark"     ])print(words)

结果中我们得到一个对象,就是我们列表数据的RDD对象,spark之后可以对他进行操作。

ParallelCollectionRDD[0] at parallelize at PythonRDD.scala:195

Count

count方法返回RDD中的元素个数。

from pyspark import SparkContextsc = SparkContext("local", "count app")words = sc.parallelize(    ["scala",     "java",     "hadoop",     "spark",     "akka",     "spark vs hadoop",     "pyspark",     "pyspark and spark"     ])print(words)counts = words.count()print("Number of elements in RDD -> %i" % counts)

返回结果:

Number of elements in RDD -> 8

Collect

collect返回RDD中的所有元素。

from pyspark import SparkContextsc = SparkContext("local", "collect app")words = sc.parallelize(    ["scala",     "java",     "hadoop",     "spark",     "akka",     "spark vs hadoop",     "pyspark",     "pyspark and spark"     ])coll = words.collect()print("Elements in RDD -> %s" % coll)

返回结果:

Elements in RDD -> ['scala', 'java', 'hadoop', 'spark', 'akka', 'spark vs hadoop', 'pyspark', 'pyspark and spark']

foreach

每个元素会使用foreach内的函数进行处理,但是不会返回任何对象。
下面的程序中,我们定义的一个累加器accumulator,用于储存在foreach执行过程中的值。

from pyspark import SparkContextsc = SparkContext("local", "ForEach app")accum = sc.accumulator(0)data = [1, 2, 3, 4, 5]rdd = sc.parallelize(data)def increment_counter(x):    print(x)    accum.add(x) return 0s = rdd.foreach(increment_counter)print(s)  # Noneprint("Counter value: ", accum)

返回结果:

None
Counter value:  15

filter

返回一个包含元素的新RDD,满足过滤器的条件。

from pyspark import SparkContextsc = SparkContext("local", "Filter app")words = sc.parallelize(    ["scala",     "java",     "hadoop",     "spark",     "akka",     "spark vs hadoop",     "pyspark",     "pyspark and spark"])words_filter = words.filter(lambda x: 'spark' in x)filtered = words_filter.collect()print("Fitered RDD -> %s" % (filtered)) Fitered RDD -> ['spark', 'spark vs hadoop', 'pyspark', 'pyspark and spark']

也可以改写成这样:

from pyspark import SparkContextsc = SparkContext("local", "Filter app")words = sc.parallelize(    ["scala",     "java",     "hadoop",     "spark",     "akka",     "spark vs hadoop",     "pyspark",     "pyspark and spark"])def g(x):    for i in x:        if "spark" in x:            return iwords_filter = words.filter(g)filtered = words_filter.collect()print("Fitered RDD -> %s" % (filtered))

map

将函数应用于RDD中的每个元素并返回新的RDD。

from pyspark import SparkContextsc = SparkContext("local", "Map app")words = sc.parallelize(    ["scala",     "java",     "hadoop",     "spark",     "akka",     "spark vs hadoop",     "pyspark",     "pyspark and spark"])words_map = words.map(lambda x: (x, 1, "_{}".format(x)))mapping = words_map.collect()print("Key value pair -> %s" % (mapping))

返回结果:

Key value pair -> [('scala', 1, '_scala'), ('java', 1, '_java'), ('hadoop', 1, '_hadoop'), ('spark', 1, '_spark'), ('akka', 1, '_akka'), ('spark vs hadoop', 1, '_spark vs hadoop'), ('pyspark', 1, '_pyspark'), ('pyspark and spark', 1, '_pyspark and spark')]

Reduce

执行指定的可交换和关联二元操作后,然后返回RDD中的元素。

from pyspark import SparkContextfrom operator import addsc = SparkContext("local", "Reduce app")nums = sc.parallelize([1, 2, 3, 4, 5])adding = nums.reduce(add)print("Adding all the elements -> %i" % (adding))

 这里的add是python内置的函数,可以使用ide查看:

def add(a, b):    "Same as a + b."    return a + b

reduce会依次对元素相加,相加后的结果加上其他元素,最后返回结果(RDD中的元素)。

Adding all the elements -> 15

Join

返回RDD,包含两者同时匹配的键,键包含对应的所有元素。

from pyspark import SparkContextsc = SparkContext("local", "Join app")x = sc.parallelize([("spark", 1), ("hadoop", 4), ("python", 4)])y = sc.parallelize([("spark", 2), ("hadoop", 5)])print("x =>", x.collect())print("y =>", y.collect())joined = x.join(y)final = joined.collect()print( "Join RDD -> %s" % (final))

返回结果:

x => [('spark', 1), ('hadoop', 4), ('python', 4)]
y => [('spark', 2), ('hadoop', 5)]
Join RDD -> [('hadoop', (4, 5)), ('spark', (1, 2))]

看完上述内容,你们掌握如何在windowns中配置PySpark环境的方法了吗?如果还想学到更多技能或想了解更多相关内容,欢迎关注编程网行业资讯频道,感谢各位的阅读!

免责声明:

① 本站未注明“稿件来源”的信息均来自网络整理。其文字、图片和音视频稿件的所属权归原作者所有。本站收集整理出于非商业性的教育和科研之目的,并不意味着本站赞同其观点或证实其内容的真实性。仅作为临时的测试数据,供内部测试之用。本站并未授权任何人以任何方式主动获取本站任何信息。

② 本站未注明“稿件来源”的临时测试数据将在测试完成后最终做删除处理。有问题或投稿请发送至: 邮箱/279061341@qq.com QQ/279061341

如何在windowns中配置PySpark环境

下载Word文档到电脑,方便收藏和打印~

下载Word文档

猜你喜欢

如何在windowns中配置PySpark环境

如何在windowns中配置PySpark环境?相信很多没有经验的人对此束手无策,为此本文总结了问题出现的原因和解决方法,通过这篇文章希望你能解决这个问题。首先需要下载hadoop和spark,解压,然后设置环境变量。hadoop清华源下载
2023-06-15

在VScode中如何配置ROS环境

本篇内容主要讲解“在VScode中如何配置ROS环境”,感兴趣的朋友不妨来看看。本文介绍的方法操作简单快捷,实用性强。下面就让小编来带大家学习“在VScode中如何配置ROS环境”吧!准备工作首先在VScode中安装ROS和catkin_t
2023-07-02

如何在Nest.js中配置环境变量

这篇文章主要介绍了如何在Nest.js中配置环境变量,此处给大家介绍的非常详细,对大家的学习或工作具有一定的参考价值,需要的朋友可以参考下:环境变量配置简述程序在不同的环境下需要不同的环境变量,例如生产环境、测试环境以及开发环境所需要不同的
2023-06-06

在Linux中如何配置QT环境变量

这篇文章主要为大家展示了“在Linux中如何配置QT环境变量”,内容简而易懂,条理清晰,希望能够帮助大家解决疑惑,下面让小编带领大家一起研究并学习一下“在Linux中如何配置QT环境变量”这篇文章吧。Qt是一个跨平台的C++图形界面应用程序
2023-06-27

如何在PyCharm中配置使用Anaconda环境

这篇文章主要介绍了如何在PyCharm中配置使用Anaconda环境,图文讲解写的非常详细简单易懂,还不会的小伙伴快来看看吧
2023-05-13

jdk1.7如何在myEclipse环境中进行配置

这期内容当中小编将会给大家带来有关jdk1.7如何在myEclipse环境中进行配置,文章内容丰富且以专业的角度为大家分析和叙述,阅读完这篇文章希望大家可以有所收获。第一步:下载jdk1.7第二步:安装jdk1.7     将下载的压缩包进
2023-05-31

如何在CentOs环境下配置SMTP

如何在CentOs环境下配置SMTP,很多新手对此不是很清楚,为了帮助大家解决这个难题,下面小编将为大家详细讲解,有这方面需求的人可以来学习下,希望你能有所收获。 经常在一些应用场景中,需要能够使用SMTP进行邮件的发送,比如网络爬虫中,
2023-06-06

UBUNTU中如何配置LAMP环境

这篇文章主要介绍“UBUNTU中如何配置LAMP环境”,在日常操作中,相信很多人在UBUNTU中如何配置LAMP环境问题上存在疑惑,小编查阅了各式资料,整理出简单好用的操作方法,希望对大家解答”UBUNTU中如何配置LAMP环境”的疑惑有所
2023-07-04

eclipse中java环境如何配置

要在Eclipse中配置Java环境,您可以按照以下步骤进行操作:1. 下载并安装JDK(Java Development Kit):访问Oracle官方网站(https://www.oracle.com/java/technologies
2023-09-08

IDEA中如何配置Node环境

这篇文章主要介绍了IDEA中如何配置Node环境的相关知识,内容详细易懂,操作简单快捷,具有一定借鉴价值,相信大家阅读完这篇IDEA中如何配置Node环境文章都会有所收获,下面我们一起来看看吧。一、下载Node.js官网下载链接:Node.
2023-07-05

如何在 VS code中配置一个C++ 环境

本篇文章给大家分享的是有关如何在 VS code中配置一个C++ 环境,小编觉得挺实用的,因此分享给大家学习,希望大家阅读完这篇文章后可以有所收获,话不多说,跟着小编一起来看看吧。一、VS code 前言VS code作为一款当下非常受欢迎
2023-06-06

如何配置php环境

如何配置php环境?很多新手对此不是很清楚,为了帮助大家解决这个难题,下面小编将为大家详细讲解,有这方面需求的人可以来学习下,希望你能有所收获。一.nginx实现php动态解析原理nginx 是一个高性能的http服务器和反向代理服务器。即
2023-06-15

如何配置java环境

1、右键我的电脑--属性,点击高级系统设置---环境变量。在系统变量中,新建一个名为”JAVA_HOME“的系统变量,变量值为jdk的安装路径,例如我的安装在C:Program FilesJavajdk1.7.0_802、找到系统变量Path,在原变量值的后面
如何配置java环境
2014-10-24

Flex1.5环境如何配置

这篇文章主要为大家展示了“Flex1.5环境如何配置”,内容简而易懂,条理清晰,希望能够帮助大家解决疑惑,下面让小编带领大家一起研究并学习一下“Flex1.5环境如何配置”这篇文章吧。Flex1.5环境配置虽然价格高,但无可否认!它是目前同
2023-06-17

Flutter环境如何配置

这篇文章主要介绍了Flutter环境如何配置的相关知识,内容详细易懂,操作简单快捷,具有一定借鉴价值,相信大家阅读完这篇Flutter环境如何配置文章都会有所收获,下面我们一起来看看吧。当前环境win10as2022.1.1版本jdk11配
2023-07-05

编程热搜

  • Python 学习之路 - Python
    一、安装Python34Windows在Python官网(https://www.python.org/downloads/)下载安装包并安装。Python的默认安装路径是:C:\Python34配置环境变量:【右键计算机】--》【属性】-
    Python 学习之路 - Python
  • chatgpt的中文全称是什么
    chatgpt的中文全称是生成型预训练变换模型。ChatGPT是什么ChatGPT是美国人工智能研究实验室OpenAI开发的一种全新聊天机器人模型,它能够通过学习和理解人类的语言来进行对话,还能根据聊天的上下文进行互动,并协助人类完成一系列
    chatgpt的中文全称是什么
  • C/C++中extern函数使用详解
  • C/C++可变参数的使用
    可变参数的使用方法远远不止以下几种,不过在C,C++中使用可变参数时要小心,在使用printf()等函数时传入的参数个数一定不能比前面的格式化字符串中的’%’符号个数少,否则会产生访问越界,运气不好的话还会导致程序崩溃
    C/C++可变参数的使用
  • css样式文件该放在哪里
  • php中数组下标必须是连续的吗
  • Python 3 教程
    Python 3 教程 Python 的 3.0 版本,常被称为 Python 3000,或简称 Py3k。相对于 Python 的早期版本,这是一个较大的升级。为了不带入过多的累赘,Python 3.0 在设计的时候没有考虑向下兼容。 Python
    Python 3 教程
  • Python pip包管理
    一、前言    在Python中, 安装第三方模块是通过 setuptools 这个工具完成的。 Python有两个封装了 setuptools的包管理工具: easy_install  和  pip , 目前官方推荐使用 pip。    
    Python pip包管理
  • ubuntu如何重新编译内核
  • 改善Java代码之慎用java动态编译

目录