我的编程空间,编程开发者的网络收藏夹
学习永远不晚

怎么理解spark的自定义分区和排序及spark与jdbc

短信预约 -IT技能 免费直播动态提醒
省份

北京

  • 北京
  • 上海
  • 天津
  • 重庆
  • 河北
  • 山东
  • 辽宁
  • 黑龙江
  • 吉林
  • 甘肃
  • 青海
  • 河南
  • 江苏
  • 湖北
  • 湖南
  • 江西
  • 浙江
  • 广东
  • 云南
  • 福建
  • 海南
  • 山西
  • 四川
  • 陕西
  • 贵州
  • 安徽
  • 广西
  • 内蒙
  • 西藏
  • 新疆
  • 宁夏
  • 兵团
手机号立即预约

请填写图片验证码后获取短信验证码

看不清楚,换张图片

免费获取短信验证码

怎么理解spark的自定义分区和排序及spark与jdbc

这篇文章将为大家详细讲解有关怎么理解spark的自定义分区和排序及spark与jdbc,文章内容质量较高,因此小编分享给大家做个参考,希望大家阅读完这篇文章后对相关知识有一定的了解。

//自定义分区import org.apache.spark.SparkConfimport org.apache.spark.SparkConfimport org.apache.spark.SparkContextimport org.apache.spark.Partitionerobject PrimitivePartitionTest {  def main(args: Array[String]): Unit = {    val conf = new SparkConf    conf.setMaster("local[2]").setAppName("Partitioner")    val context = new SparkContext(conf)    val rdd = context.parallelize(List(("hgs",2),("wd",44),("cm",99),("zz",100),("xzhh",67)), 2)    //实例化类,并设置分区类    val partitioner = new CustomPartitioner(2)    val rdd1 = rdd.partitionBy(partitioner)    rdd1.saveAsTextFile("c:\\partitioner")    context.stop()      }}//自定义分区类继承spark的Partitionerclass CustomPartitioner(val partitions:Int ) extends Partitioner{         def numPartitions: Int= this.partitions       def getPartition(key: Any): Int={      if(key.toString().length()<=2)        0      else        1          }}
//自定义排序package hgs.spark.othertestimport org.apache.spark.SparkConfimport org.apache.spark.SparkContextimport scala.math.Ordered//自定义排序第一种实现方式,通过继承orderedclass Student(val name:String,var age:Int) extends Ordered[Student] with Serializable{  def compare(that: Student): Int={    return this.age-that.age  }}class Boy(val name:String,var age:Int) extends  Serializable{  }//第二种方式通过实现隐式转换实现object MyPredef{  implicit def toOrderBoy = new Ordering[Boy]{   def compare(x: Boy, y: Boy): Int={     x.age - y.age   }  }}//引入隐式转换import MyPredef._object CutstomOrder {   def main(args: Array[String]): Unit = {     val conf = new SparkConf()     conf.setMaster("local[2]").setAppName("CutstomOrder")     val context = new SparkContext(conf)     val rdd = context.parallelize(List(("hgs",2),("wd",44),("cm",99),("zz",100),("xzhh",67)), 2)     //下面的第二个参数false为降序排列     //val rdd_sorted = rdd.sortBy(f=>new Student(f._1,f._2), false, 1)     val rdd_sorted = rdd.sortBy(f=>new Boy(f._1,f._2), false, 1)     rdd_sorted.saveAsTextFile("d:\\ordered")     context.stop()   } }
//JDBCimport org.apache.spark.SparkConfimport org.apache.spark.SparkContextimport org.apache.spark.rdd.JdbcRDDimport java.sql.Connectionimport java.sql.DriverManagerimport java.sql.ResultSetimport scala.collection.mutable.ListBufferobject DataFromJdbcToSpark {  def main(args: Array[String]): Unit = {     val conf = new SparkConf()    conf.setMaster("local[2]").setAppName("BroadCastTest")    val context = new SparkContext(conf)    val sql = "select name,age from test where id>=? and id <=?"    var list = new ListBuffer[(String,Int)]()    //第七个参数是一个自定义的函数,spark会调用该函数,完成自定义的逻辑,y的数据类型是ResultSet,该函数不可以想自己定义的数组添加数据,    //应为应用的函数会将结果保存在JdbcRDD中    val jdbcRDD = new JdbcRDD(context,getConnection,sql,1,8,2,y=>{    (y.getString(1),y.getInt(2))           })          println(jdbcRDD.collect().toBuffer)     context.stop()      }      def getConnection():Connection={    Class.forName("com.mysql.jdbc.Driver")    val  conn = DriverManager.getConnection("jdbc:mysql://192.168.6.133:3306/hgs","root","123456");    conn  }}//----------------------------------------------------------------------package hgs.spark.othertestimport java.sql.Connectionimport java.sql.DriverManagerimport org.apache.commons.dbutils.QueryRunnerimport org.apache.spark.SparkConfimport org.apache.spark.SparkContext//将spark计算后的结果录入数据库object DataFromSparktoJdbc {    def main(args: Array[String]): Unit = {        val conf = new SparkConf    conf.setMaster("local[2]").setAppName("DataFromSparktoJdbc")    val context = new SparkContext(conf)    val addressrdd= context.textFile("d:\\words")    val words = addressrdd.flatMap(_.split(" ")).map(x=>(x,1)).reduceByKey(_+_)    //println(words.partitions.size)    var p:Int =0    words.foreachPartition(iter=>{      //每个分区一个链接      val qr = new QueryRunner()      val conn = getConnection      println(conn)      val sql = s"insert into words values(?,?)"      //可以修改为批量插入效率更高      while(iter.hasNext){        val tpm = iter.next()          val obj1 :Object = tpm._1        val obj2 :Object = new Integer(tpm._2)        //obj1+conn.toString()可以看到数据库的插入数据作用有三个不同的链接        qr.update(conn, sql,obj1+conn.toString(),obj2)      }      //println(conn)      //println(p)      conn.close()          })    words.saveAsTextFile("d:\\wordresult")  }  def getConnection():Connection={    Class.forName("com.mysql.jdbc.Driver")    val  conn = DriverManager.getConnection("jdbc:mysql://192.168.6.133:3306/hgs","root","123456");    conn  }  }
//广播变量package hgs.spark.othertestimport org.apache.spark.SparkConfimport org.apache.spark.SparkContextobject BroadCastTest{  def main(args: Array[String]): Unit = {    val conf = new SparkConf()    conf.setMaster("local[2]").setAppName("BroadCastTest")    val context = new SparkContext(conf)    val addressrdd= context.textFile("d:\\address")    val splitaddrdd =     addressrdd.map(x=>{            val cs = x.split(",")      (cs(0),cs(1))    }).collect().toMap    //广播变量,数据被缓存在每个节点,减少了节点之间的数据传送,可以有效的增加效率,广播出去的可以是任意的数据类型    val maprdd = context.broadcast(splitaddrdd)    val namerdd = context.textFile("d:\\name")        val result = namerdd.map(x=>{      //该出使用了广播的出去的数组      maprdd.value.getOrElse(x, "UnKnown")          })    println(result.collect().toBuffer)    context.stop()  }}
其他一些知识点1.spark 广播变量 rdd.brodcastz(rdd),广播变量的用处是将数据汇聚传输到各个excutor上面,这样在做数据处理的时候减少了数据的传输2.wordcount程序context.textFile(args(0),1).flatMap(_.split(" ")).map((_,1)).reduceByKey(_+_) wordcount程序代码,一个wordcount会产生5个RDDsc.textFile() 会产生两个RDD 1.HadoopRDD-> MapPartitionsRDD   flatMap() 会产生MapPartitionsRDD   map 会产生MapPartitionsRDD   reduceByKey 产生ShuuledRDD   saveAsTextFile   3.缓存数据到内存 rdd.cache   清理缓存 rdd.unpersist(true),rdd.persist存储及级别 cache方法调用的是persist方法4.spark 远程debug,需要设置sparkcontext.setMaster("spark://xx.xx.xx.xx:7077").setJar("d:/jars/xx.jar")

关于怎么理解spark的自定义分区和排序及spark与jdbc就分享到这里了,希望以上内容可以对大家有一定的帮助,可以学到更多知识。如果觉得文章不错,可以把它分享出去让更多的人看到。

免责声明:

① 本站未注明“稿件来源”的信息均来自网络整理。其文字、图片和音视频稿件的所属权归原作者所有。本站收集整理出于非商业性的教育和科研之目的,并不意味着本站赞同其观点或证实其内容的真实性。仅作为临时的测试数据,供内部测试之用。本站并未授权任何人以任何方式主动获取本站任何信息。

② 本站未注明“稿件来源”的临时测试数据将在测试完成后最终做删除处理。有问题或投稿请发送至: 邮箱/279061341@qq.com QQ/279061341

怎么理解spark的自定义分区和排序及spark与jdbc

下载Word文档到电脑,方便收藏和打印~

下载Word文档

猜你喜欢

怎么理解spark的自定义分区和排序及spark与jdbc

这篇文章将为大家详细讲解有关怎么理解spark的自定义分区和排序及spark与jdbc,文章内容质量较高,因此小编分享给大家做个参考,希望大家阅读完这篇文章后对相关知识有一定的了解。//自定义分区import org.apache.spar
2023-06-02

编程热搜

  • Python 学习之路 - Python
    一、安装Python34Windows在Python官网(https://www.python.org/downloads/)下载安装包并安装。Python的默认安装路径是:C:\Python34配置环境变量:【右键计算机】--》【属性】-
    Python 学习之路 - Python
  • chatgpt的中文全称是什么
    chatgpt的中文全称是生成型预训练变换模型。ChatGPT是什么ChatGPT是美国人工智能研究实验室OpenAI开发的一种全新聊天机器人模型,它能够通过学习和理解人类的语言来进行对话,还能根据聊天的上下文进行互动,并协助人类完成一系列
    chatgpt的中文全称是什么
  • C/C++中extern函数使用详解
  • C/C++可变参数的使用
    可变参数的使用方法远远不止以下几种,不过在C,C++中使用可变参数时要小心,在使用printf()等函数时传入的参数个数一定不能比前面的格式化字符串中的’%’符号个数少,否则会产生访问越界,运气不好的话还会导致程序崩溃
    C/C++可变参数的使用
  • css样式文件该放在哪里
  • php中数组下标必须是连续的吗
  • Python 3 教程
    Python 3 教程 Python 的 3.0 版本,常被称为 Python 3000,或简称 Py3k。相对于 Python 的早期版本,这是一个较大的升级。为了不带入过多的累赘,Python 3.0 在设计的时候没有考虑向下兼容。 Python
    Python 3 教程
  • Python pip包管理
    一、前言    在Python中, 安装第三方模块是通过 setuptools 这个工具完成的。 Python有两个封装了 setuptools的包管理工具: easy_install  和  pip , 目前官方推荐使用 pip。    
    Python pip包管理
  • ubuntu如何重新编译内核
  • 改善Java代码之慎用java动态编译

目录