我的编程空间,编程开发者的网络收藏夹
学习永远不晚

RDD的详解、创建及其操作

短信预约 信息系统项目管理师 报名、考试、查分时间动态提醒
省份

北京

  • 北京
  • 上海
  • 天津
  • 重庆
  • 河北
  • 山东
  • 辽宁
  • 黑龙江
  • 吉林
  • 甘肃
  • 青海
  • 河南
  • 江苏
  • 湖北
  • 湖南
  • 江西
  • 浙江
  • 广东
  • 云南
  • 福建
  • 海南
  • 山西
  • 四川
  • 陕西
  • 贵州
  • 安徽
  • 广西
  • 内蒙
  • 西藏
  • 新疆
  • 宁夏
  • 兵团
手机号立即预约

请填写图片验证码后获取短信验证码

看不清楚,换张图片

免费获取短信验证码

RDD的详解、创建及其操作

RDD的详解、创建及其操作

RDD的详解


RDD:弹性分布式数据集,是Spark中最基本的数据抽象,用来表示分布式集合,支持分布式操作!

RDD的创建

RDD中的数据可以来源于2个地方:本地集合或外部数据源

RDD操作

分类

转换算子

Map

import org.apache.spark.rdd.RDD
import org.apache.spark.{SparkConf, SparkContext}

object Demo03Map {
  def main(args: Array[String]): Unit = {
    val conf: SparkConf = new SparkConf()
    conf.setAppName("Demo03Map").setMaster("local")
    val sc: SparkContext = new SparkContext(conf)
    //读取文件数据
    val linesRDD: RDD[String] = sc.textFile("spark/data/words.txt")
    //对数据进行扁平化处理
    val flatRDD: RDD[String] = linesRDD.flatMap(_.split(","))


    //按照单词分组
    val groupRDD: RDD[(String, Iterable[String])] = flatRDD.groupBy(w => w)
    //聚合
    val wordsRDD: RDD[String] = groupRDD.map(kv => {
      val key: String = kv._1
      val words: Iterable[String] = kv._2
      key + "," + words.size
    })


    //分组+聚合
    val mapRDD1: RDD[(String, Int)] = flatRDD.map((_, 1))
    val words1: RDD[(String, Int)] = mapRDD1.reduceByKey(_ + _)

    ////分组+聚合
    val mapRDD2: RDD[(String, Int)] = flatRDD.map((_, 1))
    val words2: RDD[(String, Iterable[Int])] = mapRDD2.groupByKey()
    val wordSum: RDD[(String, Int)] = words2.mapValues(_.size)
    wordSum.foreach(println)

    //输出
    wordsRDD.foreach(println)
    words1.foreach(println)
  }
}

flatMap(数据扁平化处理)

import org.apache.spark.rdd.RDD
import org.apache.spark.{SparkConf, SparkContext}

object Demo04FlatMap {
  def main(args: Array[String]): Unit = {
    val conf: SparkConf = new SparkConf().setAppName("Demo04FlatMap").setMaster("local")
    val sc: SparkContext = new SparkContext(conf)

    val linesRDD: RDD[String] = sc.parallelize(List("java,scala,python", "map,java,scala"))
    //扁平化处理
    val flatRDD: RDD[String] = linesRDD.flatMap(_.split(","))
    flatRDD.foreach(println)
  }
}

Mappartitions

map和mapPartitions区别

1)map:每次处理一条数据
2)mapPartitions:每次处理一个分区数据

import org.apache.spark.rdd.RDD
import org.apache.spark.{SparkConf, SparkContext}

object Demo05MapPartition {
  def main(args: Array[String]): Unit = {
    val conf: SparkConf = new SparkConf().setAppName("Demo05MapPartition").setMaster("local")
    val sc: SparkContext = new SparkContext(conf)
    val stuRDD: RDD[String] = sc.textFile("spark/data/words.txt",3)
    stuRDD.mapPartitions(rdd => {
      println("map partition")
      // 按分区去处理数据
      rdd.map(line => line.split(",")(1))
    }).foreach(println)
  }
}

fliter 过滤

import org.apache.spark.rdd.RDD
import org.apache.spark.{SparkConf, SparkContext}

object Demo06Filter {
  def main(args: Array[String]): Unit = {
    val conf: SparkConf = new SparkConf().setAppName("Demo05MapPartition").setMaster("local")
    val sc: SparkContext = new SparkContext(conf)
    val linesRDD: RDD[Int] = sc.parallelize(List(1, 2, 3, 4, 5))
    //过滤,转换算子
    linesRDD.filter(kv => {
      kv % 2 == 1
    }).foreach(println)
  }
}

sample 取样

import org.apache.spark.{SparkConf, SparkContext}
import org.apache.spark.rdd.RDD

object Demo07Sample {
  def main(args: Array[String]): Unit = {
    val conf: SparkConf = new SparkConf().setAppName("Demo05MapPartition").setMaster("local")
    val sc: SparkContext = new SparkContext(conf)
    
    val stuRDD: RDD[String] = sc.textFile("spark/data/students.txt",3)
    stuRDD.sample(withReplacement = true,0.1).foreach(println)
  }
}

union 将相同结结构的数据连接到一起

import org.apache.spark.rdd.RDD
import org.apache.spark.{SparkConf, SparkContext}

object Demo08Union {
  def main(args: Array[String]): Unit = {
    val conf: SparkConf = new SparkConf().setAppName("Demo05MapPartition").setMaster("local")
    val sc: SparkContext = new SparkContext(conf)

    
    val lineRDD1: RDD[String] = sc.parallelize(List("java,scala", "data,python"))
    val lineRDD2: RDD[String] = sc.parallelize(List("spark,scala", "java,python"))
    println(lineRDD1.getNumPartitions)
    val unionRDD: RDD[String] = lineRDD1.union(lineRDD2)
    println(unionRDD.getNumPartitions)
    unionRDD.foreach(println)
  }
}

mappatitionWIthindex

    //mapPartitionsWithIndex也是一个转换算子
    // 会在处理每一个分区的时候获得一个index
    //可以选择的执行的分区
    stuRDD.mapPartitionsWithIndex((index, rdd) => {
      println("当前遍历的分区:" + index)
      // 按分区去处理数据
      rdd.map(line => line.split(",")(1))
    }).foreach(println)

join 将数据按照相同key进行关联(数据必须是(K,V))

import java.io

import org.apache.spark.rdd.RDD
import org.apache.spark.{SparkConf, SparkContext}

object Demo09Join {
  def main(args: Array[String]): Unit = {
    val conf: SparkConf = new SparkConf().setAppName("Demo05MapPartition").setMaster("local")
    val sc: SparkContext = new SparkContext(conf)
    // 构建K-V格式的RDD
    val tuple2RDD1: RDD[(String, String)] = sc.parallelize(List(("001", "张三"), "002" -> "小红", "003" -> "小明"))
    val tuple2RDD2: RDD[(String, Int)] = sc.parallelize(List(("001", 20), "002" -> 22, "003" -> 21))
    val tuple2RDD3: RDD[(String, String)] = sc.parallelize(List(("001", "男"), "002" -> "女"))
    //将文件进行join
    val joinRDD: RDD[(String, (String, Int))] = tuple2RDD1.join(tuple2RDD2)
    joinRDD.map(kv => {
      val i: String = kv._1
      val j: String = kv._2._1
      val k: Int = kv._2._2
      i + "," + j + "," + k
    }).foreach(println)

    //第二种方式
    joinRDD.map {
      case (id: String, (name: String, age: Int)) => id + "*" + name + "*" + age
    }.foreach(println)

    val leftJoinRDD: RDD[(String, (String, Option[String]))] = tuple2RDD1.leftOuterJoin(tuple2RDD3)
    leftJoinRDD.map {
          //存在关联
      case (id: String, (name: String, Some(gender))) => 
        id + "*" + name + "*" + gender
        //不存在关联
      case (id: String, (name: String, None)) =>
        id + "*" + name + "*" + "_"
    }
  }
}

groupByKey 将kv格式的数据进行key的聚合

import org.apache.spark.rdd.RDD
import org.apache.spark.{SparkConf, SparkContext}

object Demo10GroupByKey {
  def main(args: Array[String]): Unit = {
    val conf: SparkConf = new SparkConf().setAppName("Demo10GroupByKey").setMaster("local")
    val sc: SparkContext = new SparkContext(conf)
    

    // 统计班级人数
    val linesRDD: RDD[String] = sc.textFile("spark/data/students.txt")
    linesRDD.groupBy(word => word.split(",")(4))
      .map(kv => {
        val key = kv._1
        val wordsCnt = kv._2.size
        key + "," + wordsCnt
      }).foreach(println)

    val linesMap: RDD[(String, String)] = linesRDD.map(lines => (lines.split(",")(4), lines))
    //按照key进行分组
    linesMap.groupByKey()
      .map(lines=>{
        val key = lines._1
        val wordsCnt: Int = lines._2.size
        key+","+wordsCnt
      }).foreach(println)

  }
}

ReduceByKey
reduceByKey 需要接收一个聚合函数
首先会对数据按key分组 然后在组内进行聚合(一般是加和,也可以是Max、Min之类的操作)
相当于 MR 中的combiner
可以在Map端进行预聚合,减少shuffle过程需要传输的数据量,以此提高效率
相对于groupByKey来说,效率更高,但功能更弱
幂等操作
y = f(x) = f(y) = f(f(x))
reducebyKey与groupbykey的区别
reduceByKey:具有预聚合操作
groupByKey:没有预聚合
在不影响业务逻辑的前提下,优先采用reduceByKey。

import org.apache.spark.rdd.RDD
import org.apache.spark.{SparkConf, SparkContext}

object Demo11ReduceByKey {
  def main(args: Array[String]): Unit = {
    val conf: SparkConf = new SparkConf().setAppName("Demo11ReduceByKey").setMaster("local")
    val sc: SparkContext = new SparkContext(conf)
    val linesRDD: RDD[String] = sc.textFile("spark/data/students.txt")
    //统计班级人数
    linesRDD.map(lines => (lines.split(",")(4), lines))
      .groupByKey()
      .map(kv => {
        val key = kv._1
        val cnt = kv._2.size
        key + "" + cnt
      }).foreach(println)


    //ReduceByKey
    
    linesRDD.map(lines=>(lines.split(",")(4),1))
      .reduceByKey(_+_)
      .foreach(println)
  }
}

sort 排序,默认升序

import org.apache.spark.{SparkConf, SparkContext}
import org.apache.spark.rdd.RDD

object Demo12Sort {
  def main(args: Array[String]): Unit = {
    val conf: SparkConf = new SparkConf().setAppName("Demo12Sort").setMaster("local")
    val sc: SparkContext = new SparkContext(conf)
    val linesRDD: RDD[String] = sc.textFile("spark/data/students.txt")

    
    linesRDD.sortBy(lines => lines.split(",")(2), ascending = false) //按照年纪降序
      .take(10) //转换算子打印十行
      .foreach(println)

    val mapRDD: RDD[(String, String)] = linesRDD.map(l => (l.split(",")(2), l))
    mapRDD.sortByKey(ascending = false)
      .take(10)
      .foreach(println)
  }
}

Mapvalue

import org.apache.spark.rdd.RDD
import org.apache.spark.{SparkConf, SparkContext}

object Demo13MapValue {
  def main(args: Array[String]): Unit = {
    
    val conf: SparkConf = new SparkConf().setAppName("Demo13MapValue").setMaster("local")
    val sc: SparkContext = new SparkContext(conf)
    val linesRDD: RDD[(String, Int)] = sc.parallelize(List(("zs", 10), ("zzw", 34), ("lm", 18)))
    linesRDD.mapValues(lines=>lines*2)
      .foreach(println)
  }

行为算子

import org.apache.spark.rdd.RDD
import org.apache.spark.{SparkConf, SparkContext}

object Demo14Action {
  def main(args: Array[String]): Unit = {
    val conf: SparkConf = new SparkConf().setAppName("****").setMaster("local")
    val sc: SparkContext = new SparkContext(conf)
    val linesRDD: RDD[String] = sc.textFile("spark/data/students.txt")
    linesRDD.take(10)// take 取出前n条数据 相当于limit
      .foreach(println)  //这里的foreach不是行为算子,是take里面的方法

    // count
    // 返回RDD的数据量的多少
    println(linesRDD.count())

    // collect
    // 将RDD转换为Scala中的Array
    // 注意数据量的大小 容易OOM
    val collectRDD: Array[String] = linesRDD.collect()
    collectRDD.take(10)
      .foreach(println)

    // reduce 全局聚合
    // select sum(age) from student group by 1
    val i = linesRDD.map(lines => lines.split(",")(2).toInt)
      .reduce(_ + _)
    println(i)

    //save
    linesRDD.sample(withReplacement = false,0.2)
      .saveAsTextFile("spark/data/save")

  }
}

免责声明:

① 本站未注明“稿件来源”的信息均来自网络整理。其文字、图片和音视频稿件的所属权归原作者所有。本站收集整理出于非商业性的教育和科研之目的,并不意味着本站赞同其观点或证实其内容的真实性。仅作为临时的测试数据,供内部测试之用。本站并未授权任何人以任何方式主动获取本站任何信息。

② 本站未注明“稿件来源”的临时测试数据将在测试完成后最终做删除处理。有问题或投稿请发送至: 邮箱/279061341@qq.com QQ/279061341

RDD的详解、创建及其操作

下载Word文档到电脑,方便收藏和打印~

下载Word文档

猜你喜欢

RDD的详解、创建及其操作

RDD的详解RDD:弹性分布式数据集,是Spark中最基本的数据抽象,用来表示分布式集合,支持分布式操作!RDD的创建RDD中的数据可以来源于2个地方:本地集合或外部数据源RDD操作分类转换算子Mapimport org.apache.spark.rdd.RD
RDD的详解、创建及其操作
2020-12-27

oracle临时表空间的作用与创建及相关操作详解

目录1.1 临时表空间作用1.2 临时表空间和临时表空间组1.3 临时表空间操作(1) 查看表空间(2) 查看表空间详细信息(3) 查看除临时表空间外 表空编程间对应的数据文件(4) 查看临时表空间对应的数据文件(5) 查看临时表空间组信息
2022-07-25

SQL Server数据库创建表及其约束条件的操作方法

这篇文章主要介绍了SQL Server 创建表及其约束条件,本文给大家介绍的非常详细,对大家的学习或工作具有一定的参考借鉴价值,需要的朋友可以参考下
2022-11-16

GO文件创建及读写操作的方法

本文小编为大家详细介绍“GO文件创建及读写操作的方法”,内容详细,步骤清晰,细节处理妥当,希望这篇“GO文件创建及读写操作的方法”文章能帮助大家解决疑惑,下面跟着小编的思路慢慢深入,一起来学习新知识吧。三种文件操作比较ioutilbufio
2023-06-30

详解Mysql 游标的用法及其作用

[mysql游标的用法及作用] 例子: 当前有三张表A、B、C其中A和B是一对多关系,B和C是一对多关系,现在需要将B中A表的主键存到C中; 常规思路就是将B中查询出来然后通过一个update语句来更新C表就可以了,但是B表中有2000多条
2022-05-24

Shell中数组以及其相关操作的详细实例

Shell中数据类型不多,比如说字符串,数字类型,数组。数组是其中比较重要的一种,其重要应用场景,可以求数组长度,元素长度,遍历其元素,元素切片,替换,删除等操作,使用非常方便。 Shell中的数组不像java/C,只能是一维数组,没有二维
2022-06-04

Kotlin协程操作之创建启动挂起恢复详解

本文的定位是协程的创建、启动、挂起、恢复,也会示例一些简单的使用,这里不对suspend讲解,,也不对协程的高级用法做阐述(热数据通道Channel、冷数据流Flow...),本文主要讲协程稍微深入的全面知识
2022-11-13

详解Java如何利用位操作符创建位掩码

在本文中,我们来看看如何使用位操作符实现低级别的位掩码。我们将看到我们如何将一个单一的int变量作为一个单独的数据容器,感兴趣的可以跟随小编一起学习一下
2022-11-13

编程热搜

目录