第二章 Spark Core第八节 Spark-Core编程(四)Key-Value类型:17) partitionBy➢ 函数签名def partitionBy(partitioner: Partitioner): RDD[(K, V)]➢ 函数说明将数据按照指定 Partitioner 重新进行分区。Spark 默认的分区器是 HashPartitionerval rdd: RDD[(Int, String)] = sc.makeRDD(Array((1,"aaa"),(2,"bbb"),(3,"ccc")),3)val rdd2: RDD[(Int, String)] = rdd.partitionBy(new HashPartitioner(2))18) groupByKey➢ 函数签名def groupByKey(): RDD[(K, Iterable[V])]def groupByKey(numPartitions: Int): RDD[(K, Iterable[V])]def groupByKey(partitioner: Partitioner): RDD[(K, Iterable[V])]➢ 函数说明将数据源的数据根据 key 对 value 进行分组val dataRDD1 = sc.makeRDD(List(("a",1),("b",2),("c",3),("a",4)))val dataRDD2 = dataRDD1.groupByKey()val dataRDD3 = dataRDD1.groupByKey(2)val dataRDD4 = dataRDD1.groupByKey(new HashPartitioner(2))19) reduceByKey➢ 函数签名def reduceByKey(func: (V, V) => V): RDD[(K, V)]def reduceByKey(func: (V, V) => V, numPartitions: Int): RDD[(K, V)]➢ 函数说明可以将数据按照相同的 Key 对 Value 进行聚合val dataRDD1 = sc.makeRDD(List(("a",1),("b",2),("c",3),("a",4)))val dataRDD2 = dataRDD1.reduceByKey(_+_)val dataRDD3 = dataRDD1.reduceByKey(_+_, 2)reduceByKey 和 groupByKey 的区别:从 shuffle 的角度:reduceByKey 和 groupByKey 都存在 shuffle 的操作,但是 reduceByKey可以在 shuffle 前对分区内相同 key 的数据进行预聚合(combine)功能,这样会减少落盘的数据量,而 groupByKey 只是进行分组,不存在数据量减少的问题,reduceByKey 性能比较高。从功能的角度:reduceByKey 其实包含分组和聚合的功能。GroupByKey 只能分组,不能聚合,所以在分组聚合的场合下,推荐使用 reduceByKey,如果仅仅是分组而不需要聚合。那么还是只能使用 groupByKey20) aggregateByKey➢ 函数签名def aggregateByKey[U: ClassTag](zeroValue: U)(seqOp: (U, V) => U, combOp: (U, U) => U): RDD[(K, U)]➢ 函数说明将数据根据不同的规则进行分区内计算和分区间计算val dataRDD1 = sc.makeRDD(List(("a",1),("b",2),("c",3),("a",4)))val dataRDD2 = dataRDD1.aggregateByKey(0)(_+_,_+_)21) foldByKey➢ 函数签名def foldByKey(zeroValue: V)(func: (V, V) => V): RDD[(K, V)]➢ 函数说明当分区内计算规则和分区间计算规则相同时,aggregateByKey 就可以简化为 foldByKeyval dataRDD1 = sc.makeRDD(List(("a",1),("b",2),("c",3),("a",4)))val dataRDD2 = dataRDD1.foldByKey(0)(_+_)22) combineByKey➢ 函数签名def combineByKey[C]( createCombiner: V => C,//将当前值作为参数进行附加操作并返回 mergeValue: (C, V) => C,// 在分区内部进行,将新元素V合并到第一步操作得到的C中 mergeCombiners: (C, C) => C): RDD[(K, C)]//将第二步操作得到的C进行分区间计算➢ 函数说明最通用的对 key-value 型 rdd 进行聚集操作的聚集函数(aggregation function)。类似于aggregate(),combineByKey()允许用户返回值的类型与输入不一致。示例:现有数据 List(("a", 88), ("b", 95), ("a", 91), ("b", 93), ("a", 95), ("b", 98)),求每个key的总值及每个key对应键值对的个数val list: List[(String, Int)] = List(("a", 88), ("b", 95), ("a", 91), ("b", 93),("a", 95), ("b", 98))val input: RDD[(String, Int)] = sc.makeRDD(list, 2)val combineRDD: RDD[(String, (Int, Int))] = input.combineByKey( (_, 1), //a=>(a,1) (acc: (Int, Int), v) => (acc._1 + v, acc._2 + 1), //acc_1为数据源的value,acc_2为key出现的次数,二者进行分区内部的计算 (acc1: (Int, Int), acc2: (Int, Int)) => (acc1._1 + acc2._1, acc1._2 + acc2._2) //将分区内部计算的结果进行分区间的汇总计算,得到每个key的总值以及每个key出现的次数)reduceByKey、foldByKey、aggregateByKey、combineByKey 的区别:reduceByKey: 相同 key 的第一个数据不进行任何计算,分区内和分区间计算规则相同FoldByKey: 每一个key 对应的数据和初始值进行分区内计算,分区内和分区间计算规则相同AggregateByKey:每一个 key 对应的数据和初始值进行分区内计算,分区内和分区间计算规则可以不相同CombineByKey:当计算时,发现数据结构不满足要求时,可以让第一个数据转换结构。分区内和分区间计算规则不相同。23) sortByKey➢ 函数签名def sortByKey(ascending: Boolean = true, numPartitions: Int = self.partitions.length) : RDD[(K, V)]➢ 函数说明在一个(K,V)的 RDD 上调用,K 必须实现 Ordered 接口(特质),返回一个按照 key 进行排序val dataRDD1 = sc.makeRDD(List(("a",1),("b",2),("c",3)))val sortRDD1: RDD[(String, Int)] = dataRDD1.sortByKey(true)val sortRDD2: RDD[(String, Int)] = dataRDD1.sortByKey(false)24) join➢ 函数签名def join[W](other: RDD[(K, W)]): RDD[(K, (V, W))]➢ 函数说明在类型为(K,V)和(K,W)的 RDD 上调用,返回一个相同 key 对应的所有元素连接在一起的(K,(V,W))的 RDDval rdd: RDD[(Int, String)] = sc.makeRDD(Array((1, "a"), (2, "b"), (3, "c")))val rdd1: RDD[(Int, Int)] = sc.makeRDD(Array((1, 4), (2, 5), (3, 6)))rdd.join(rdd1).collect().foreach(println)25) leftOuterJoin➢ 函数签名def leftOuterJoin[W](other: RDD[(K, W)]): RDD[(K, (V, Option[W]))]➢ 函数说明类似于 SQL 语句的左外连接val dataRDD1 = sc.makeRDD(List(("a",1),("b",2),("c",4)))val dataRDD2 = sc.makeRDD(List(("a",1),("b",2),("c",3)))val rdd: RDD[(String, (Int, Option[Int]))] = dataRDD1.leftOuterJoin(dataRDD2)26) cogroup➢ 函数签名def cogroup[W](other: RDD[(K, W)]): RDD[(K, (Iterable[V], Iterable[W]))]➢ 函数说明在类型为(K,V)和(K,W)的 RDD 上调用,返回一个(K,(Iterable<V>,Iterable<W>))类型的 RDDval dataRDD1 = sc.makeRDD(List(("a",1),("a",2),("c",3)))val dataRDD2 = sc.makeRDD(List(("a",1),("c",2),("c",3)))val value: RDD[(String, (Iterable[Int], Iterable[Int]))] = dataRDD1.cogroup(dataRDD2)