Understanding Spark custom partitioning and sorting, and Spark with JDBC
This article explains in detail how to understand Spark's custom partitioning and custom sorting, and how Spark works with JDBC. The material is fairly thorough, so it is shared here as a reference; hopefully you will have a solid grasp of the topic after reading it.
// Custom partitioner
import org.apache.spark.SparkConf
import org.apache.spark.SparkContext
import org.apache.spark.Partitioner

object PrimitivePartitionTest {
  def main(args: Array[String]): Unit = {
    val conf = new SparkConf
    conf.setMaster("local[2]").setAppName("Partitioner")
    val context = new SparkContext(conf)
    val rdd = context.parallelize(List(("hgs",2),("wd",44),("cm",99),("zz",100),("xzhh",67)), 2)
    // Instantiate the custom partitioner and repartition the RDD with it
    val partitioner = new CustomPartitioner(2)
    val rdd1 = rdd.partitionBy(partitioner)
    rdd1.saveAsTextFile("c:\\partitioner")
    context.stop()
  }
}

// A custom partitioner extends Spark's Partitioner
class CustomPartitioner(val partitions: Int) extends Partitioner {
  def numPartitions: Int = this.partitions
  // Keys whose string form is at most two characters long go to partition 0, all others to partition 1
  def getPartition(key: Any): Int = {
    if (key.toString().length() <= 2) 0 else 1
  }
}
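To confirm where each key ends up, a quick check can be run against the partitioned RDD. The following is a minimal sketch of such a verification step (the object name PartitionInspect and the use of println instead of saveAsTextFile are illustrative assumptions); it reuses the CustomPartitioner class defined above and tags every record with its partition index via mapPartitionsWithIndex.

import org.apache.spark.SparkConf
import org.apache.spark.SparkContext

object PartitionInspect {
  def main(args: Array[String]): Unit = {
    val context = new SparkContext(new SparkConf().setMaster("local[2]").setAppName("PartitionInspect"))
    val rdd = context.parallelize(List(("hgs",2),("wd",44),("cm",99),("zz",100),("xzhh",67)), 2)
    val partitioned = rdd.partitionBy(new CustomPartitioner(2))
    // mapPartitionsWithIndex tags every record with the index of the partition it lives in
    partitioned.mapPartitionsWithIndex((idx, iter) => iter.map(kv => (idx, kv)))
      .collect()
      .foreach(println)   // expected: keys of length <= 2 in partition 0, the rest in partition 1
    context.stop()
  }
}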
// Custom sorting
package hgs.spark.othertest

import org.apache.spark.SparkConf
import org.apache.spark.SparkContext
import scala.math.Ordered

// First approach to custom sorting: extend Ordered
class Student(val name: String, var age: Int) extends Ordered[Student] with Serializable {
  def compare(that: Student): Int = {
    this.age - that.age
  }
}

class Boy(val name: String, var age: Int) extends Serializable {
}

// Second approach: provide an implicit Ordering
object MyPredef {
  implicit def toOrderBoy = new Ordering[Boy] {
    def compare(x: Boy, y: Boy): Int = {
      x.age - y.age
    }
  }
}

// Bring the implicit Ordering into scope
import MyPredef._

object CutstomOrder {
  def main(args: Array[String]): Unit = {
    val conf = new SparkConf()
    conf.setMaster("local[2]").setAppName("CutstomOrder")
    val context = new SparkContext(conf)
    val rdd = context.parallelize(List(("hgs",2),("wd",44),("cm",99),("zz",100),("xzhh",67)), 2)
    // The second argument (false) sorts in descending order; the third is the number of output partitions
    //val rdd_sorted = rdd.sortBy(f => new Student(f._1, f._2), false, 1)
    val rdd_sorted = rdd.sortBy(f => new Boy(f._1, f._2), false, 1)
    rdd_sorted.saveAsTextFile("d:\\ordered")
    context.stop()
  }
}
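The commented-out line above uses the first approach. For completeness, here is a minimal sketch of that variant (the object name OrderedSortSketch is illustrative, and the sketch assumes it sits alongside the Student class above). Because Student extends Ordered[Student], sortBy can derive the required Ordering on its own, so the MyPredef implicit is not needed here.

import org.apache.spark.SparkConf
import org.apache.spark.SparkContext

object OrderedSortSketch {
  def main(args: Array[String]): Unit = {
    val context = new SparkContext(new SparkConf().setMaster("local[2]").setAppName("OrderedSortSketch"))
    val rdd = context.parallelize(List(("hgs",2),("wd",44),("cm",99),("zz",100),("xzhh",67)), 2)
    // false = descending order, 1 = single output partition (same arguments as in the code above)
    val sorted = rdd.sortBy(f => new Student(f._1, f._2), false, 1)
    println(sorted.collect().toBuffer)
    context.stop()
  }
}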
// JDBC: reading from a database into Spark with JdbcRDD
import org.apache.spark.SparkConf
import org.apache.spark.SparkContext
import org.apache.spark.rdd.JdbcRDD
import java.sql.Connection
import java.sql.DriverManager
import java.sql.ResultSet
import scala.collection.mutable.ListBuffer

object DataFromJdbcToSpark {
  def main(args: Array[String]): Unit = {
    val conf = new SparkConf()
    conf.setMaster("local[2]").setAppName("DataFromJdbcToSpark")
    val context = new SparkContext(conf)
    val sql = "select name,age from test where id>=? and id <=?"
    var list = new ListBuffer[(String,Int)]()
    // The seventh argument is a user-defined function that Spark invokes for each row; its parameter y is a ResultSet.
    // Do not append rows to a collection of your own (such as the ListBuffer above) inside this function,
    // because the mapped results are stored in the JdbcRDD itself.
    val jdbcRDD = new JdbcRDD(context, getConnection, sql, 1, 8, 2, y => {
      (y.getString(1), y.getInt(2))
    })
    println(jdbcRDD.collect().toBuffer)
    context.stop()
  }

  def getConnection(): Connection = {
    Class.forName("com.mysql.jdbc.Driver")
    val conn = DriverManager.getConnection("jdbc:mysql://192.168.6.133:3306/hgs", "root", "123456")
    conn
  }
}

//----------------------------------------------------------------------
package hgs.spark.othertest

import java.sql.Connection
import java.sql.DriverManager
import org.apache.commons.dbutils.QueryRunner
import org.apache.spark.SparkConf
import org.apache.spark.SparkContext

// Writing the results of a Spark computation into the database
object DataFromSparktoJdbc {
  def main(args: Array[String]): Unit = {
    val conf = new SparkConf
    conf.setMaster("local[2]").setAppName("DataFromSparktoJdbc")
    val context = new SparkContext(conf)
    val addressrdd = context.textFile("d:\\words")
    val words = addressrdd.flatMap(_.split(" ")).map(x => (x, 1)).reduceByKey(_ + _)
    //println(words.partitions.size)
    var p: Int = 0
    words.foreachPartition(iter => {
      // One connection per partition
      val qr = new QueryRunner()
      val conn = getConnection
      println(conn)
      val sql = s"insert into words values(?,?)"
      // This loop issues one insert per row; switching to a batch insert is more efficient (see the sketch after this block)
      while (iter.hasNext) {
        val tpm = iter.next()
        val obj1: Object = tpm._1
        val obj2: Object = new Integer(tpm._2)
        // Appending conn.toString() to obj1 makes it visible in the table that the inserts came from three different connections
        qr.update(conn, sql, obj1 + conn.toString(), obj2)
      }
      //println(conn)
      //println(p)
      conn.close()
    })
    words.saveAsTextFile("d:\\wordresult")
  }

  def getConnection(): Connection = {
    Class.forName("com.mysql.jdbc.Driver")
    val conn = DriverManager.getConnection("jdbc:mysql://192.168.6.133:3306/hgs", "root", "123456")
    conn
  }
}
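The comment above notes that the per-row qr.update call could be replaced with a batch insert. The following is a minimal sketch of that variant under the same assumptions as the code above (the `words` RDD, the `getConnection` helper, and the two-column `words` table); it collects each partition's rows into one parameter matrix and issues a single QueryRunner.batch call per partition.

// Batch-insert variant of the foreachPartition body above (a sketch, not a tested drop-in replacement)
words.foreachPartition(iter => {
  val qr = new QueryRunner()
  val conn = getConnection
  val sql = "insert into words values(?,?)"
  // Build one Object[][] parameter matrix for the whole partition
  val params: Array[Array[AnyRef]] = iter.map { case (word, count) =>
    Array[AnyRef](word, Integer.valueOf(count))
  }.toArray
  if (params.nonEmpty) {
    // One batched statement per partition instead of one statement per row
    qr.batch(conn, sql, params)
  }
  conn.close()
})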
// Broadcast variables
package hgs.spark.othertest

import org.apache.spark.SparkConf
import org.apache.spark.SparkContext

object BroadCastTest {
  def main(args: Array[String]): Unit = {
    val conf = new SparkConf()
    conf.setMaster("local[2]").setAppName("BroadCastTest")
    val context = new SparkContext(conf)
    val addressrdd = context.textFile("d:\\address")
    val splitaddrdd = addressrdd.map(x => {
      val cs = x.split(",")
      (cs(0), cs(1))
    }).collect().toMap
    // A broadcast variable is cached on every node, which cuts down data transfer between nodes and
    // improves efficiency; any data type can be broadcast
    val maprdd = context.broadcast(splitaddrdd)
    val namerdd = context.textFile("d:\\name")
    val result = namerdd.map(x => {
      // Here the broadcast map is looked up on the executor side
      maprdd.value.getOrElse(x, "UnKnown")
    })
    println(result.collect().toBuffer)
    context.stop()
  }
}
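When a broadcast value is no longer needed, it can be released explicitly. A minimal sketch continuing from the `maprdd` broadcast above (whether and when to do this is situational; the calls shown are the standard Broadcast methods):

maprdd.unpersist()   // drop the cached copies on the executors; Spark can rebroadcast the value if it is used again
// maprdd.destroy()  // irreversibly release all resources once the broadcast will never be used again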
A few other points:

1. Broadcast variables are created with context.broadcast(data), as in the example above. Their purpose is to ship a dataset to every executor once, so that less data has to be transferred while the job is processing.
2. Word count: context.textFile(args(0),1).flatMap(_.split(" ")).map((_,1)).reduceByKey(_+_). A word count job produces 5 RDDs: sc.textFile() produces two RDDs (HadoopRDD -> MapPartitionsRDD), flatMap() produces a MapPartitionsRDD, map produces a MapPartitionsRDD, reduceByKey produces a ShuffledRDD, and saveAsTextFile then writes the result out.
3. Caching data in memory: rdd.cache; clearing the cache: rdd.unpersist(true). rdd.persist takes a storage level, and cache simply calls persist with the default level (see the sketch after this list).
4. Spark remote debugging: configure the driver with conf.setMaster("spark://xx.xx.xx.xx:7077").setJars(Seq("d:/jars/xx.jar")) so the locally launched job submits to the remote cluster together with your jar.
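To make point 3 concrete, here is a minimal sketch of caching with an explicit storage level and then clearing the cache. It assumes a SparkContext named `context` as in the earlier examples; the RDD name `counts` is illustrative, and note that a storage level cannot be changed once it has been assigned to an RDD.

import org.apache.spark.storage.StorageLevel

val counts = context.textFile("d:\\words").flatMap(_.split(" ")).map((_, 1)).reduceByKey(_ + _)
counts.persist(StorageLevel.MEMORY_AND_DISK)  // cache() is shorthand for persist(StorageLevel.MEMORY_ONLY)
counts.count()                                // the first action actually materialises the cached blocks
counts.unpersist(true)                        // blocking = true: wait until the blocks have been removed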
That covers how to understand Spark's custom partitioning and sorting and how Spark works with JDBC. Hopefully the material above is helpful and teaches you something new. If you found the article useful, feel free to share it so more people can see it.