目录
- 算子
- 转换算子示例
- map
- flatMap
- ReduceByKey
- filter
- distinct
- glom
- groupBy
- groupByKey
- SortBy
- sortByKey
- union
- 交集intersection和差集subtract
- join
- partitionBy
- mapPartition
- sample
- 行动算子示例
- ForeachPartition
- Foreach
- SaveAsTextFile
- CountByKey
- Reduce
- fold
- first、take、count
- top、takeOrderd
- takeSample
算子
描述
分布式集合RDD对象的方法被称为算子
类别 | 定义 | 特点 | 作用 |
---|---|---|---|
转换算子 | 返回值仍然是一个 RDD 的算子 | 懒加载,遇到 Action 算子前不执行 | 构建执行计划 |
(例如:map , filter , flatMap , reduceByKey ) | - 不会立即触发计算 - 形成 RDD 的 lineage | - 描述数据如何从一个 RDD 转换为另一个 RDD - 延迟计算 | |
行动算子 | 返回值不是 RDD 的算子 | 立即执行 | 让由转换算子构建好的执行计划开启工作 |
(例如:collect , count , reduce , take ) | - 触发计算 - 执行并返回结果到驱动程序或外部 | - 执行计算并返回结果 - 触发整个 RDD lineage 的计算 |
转换算子是对RDD进行一些格式等方面的操作,使得RDD满足行动算子的要求,行动算子是对RDD的计算操作
转换算子示例
map
将RDD中的数据一条一条处理,处理逻辑基于map算子接收到的函数,返回值为处理后的RDD
package com.wunaiieq.RDD
import org.apache.spark.rdd.RDD
import org.apache.spark.{SparkConf, SparkContext}
object TransformationMap {def main(args: Array[String]): Unit = {val conf: SparkConf = new SparkConf().setMaster("local[*]").setAppName("map")val sparkContext = new SparkContext(conf)//1.创建rdd对象val rdd: RDD[Int] = sparkContext.parallelize(List(1, 2, 3, 4))//2.定义函数,作为map算子的传入函数def funcA(x:Int):Int={if((x%2!=0)){return x*2}else{return x}}//3.map中使用funcAval rdd1: RDD[Int] = rdd.map(funcA)println(rdd1.collect().mkString(","))//4.匿名函数的用法def funcB(x:Int): Int ={return x*2}val rdd2: RDD[Int] = rdd.map(funcB)println(rdd2.collect().mkString(","))//关闭scsparkContext.stop()}
}
flatMap
将集合中的每一个元素按照空格进行拆分,拆分后的内容逐一作为一个元素组成一个新的RDD
package com.wunaiieq.RDD//1.导入spark下的SparkConf, SparkContext
import org.apache.spark.rdd.RDD
import org.apache.spark.{SparkConf, SparkContext}object TransformationFlatMap {def main(args: Array[String]): Unit = {//2.构建SparkConf对象,并设置本地运行和程序的名称val conf = new SparkConf().setMaster("local[*]").setAppName("flagMap")//3.通过SparkConf对象构建SparkContext对象val sparkContext = new SparkContext(conf)//4.创建一个RDD对象val rdd1: RDD[String] = sparkContext.parallelize(List("a b c", "a a d", "b c d"))//5.将集合中的每一个元素按照空格进行拆分,拆分后的内容逐一作为一个元素组成一个新的RDDval rdd2: RDD[String] = rdd1.flatMap(line=>line.split(" "))print(rdd2.collect().mkString(","))sparkContext.stop()}
}
ReduceByKey
仅适用于Key-Value类型的RDD,自动按照key进行分组,然后根据提供的函数func的逻辑进行聚合操作,完成组内所有value的聚合操作。
package com.wunaiieq.RDDimport org.apache.spark.rdd.RDD
import org.apache.spark.{SparkConf, SparkContext}object RddReduceByKey {def main(args: Array[String]): Unit = {val conf: SparkConf = new SparkConf().setMaster("local[*]").setAppName("reduceByKey")val sparkContext = new SparkContext(conf)//1.创建一个Key-Value类型的RDD对象val rdd: RDD[(String, Int)] = sparkContext.parallelize(List(("a", 1), ("a", 2), ("b", 1), ("a", 3), ("b", 1), ("a", 4)))//2.将相同的key的value做聚合加操作val rdd1: RDD[(String, Int)] = rdd.reduceByKey((x, y) => x + y)println(rdd1.collect().mkString(","))sparkContext.stop()}
}
filter
设置过滤函数,将RDD种符合条件的元素抽取并组成新的RDD(1234,抽取偶数,结果为24)
package com.wunaiieq.RDD//1.导入类
import org.apache.spark.rdd.RDD
import org.apache.spark.{SparkConf, SparkContext}
object RddFilter {def main(args: Array[String]): Unit = {//2.构建SparkConf对象,并设置本地运行和程序名称val conf: SparkConf = new SparkConf().setMaster("local[*]").setAppName("filter")//3.使用conf对象构建SparkContext对象val sparkContext = new SparkContext(conf)//5.创建Rddval rdd: RDD[Int] = sparkContext.parallelize(List(1, 2, 3, 4, 5, 8))//6.调用fiter算子,过滤掉奇数(x%2==0 false)//只保留偶数(x%2==0 true)val resultRdd: RDD[Int] = rdd.filter(x => x % 2 == 0)//7.输出println(resultRdd.collect().mkString(","))//4.关闭sparkContext对象sparkContext.stop()}
}
distinct
过滤重复值,重复值保留一个
package com.wunaiieq.RDD
//1.导入类
import org.apache.spark.rdd.RDD
import org.apache.spark.{SparkConf, SparkContext}
object RddDistinct {def main(args: Array[String]): Unit = {//2.构建SparkConf对象,并设置本地运行和程序名称val conf: SparkConf = new SparkConf().setMaster("local[*]").setAppName("filter")//3.使用conf对象构建SparkContet对象val sparkContext = new SparkContext(conf)//5.创建Rddval rdd: RDD[Int] = sparkContext.parallelize(List(1, 2, 3, 2, 5, 2))//6.去重后输出 : 1,2,3,5println(rdd.distinct().collect().mkString(","))//7.key-value型的rdd,如何认定为重复元素?只有key和value的值都相同时认定为重复元素//结果(y,1),(x,1),(x,2)val rdd1: RDD[(Char, Int)] = sparkContext.parallelize(List(('x', 1), ('x', 2), ('x', 1), ('y', 1)))println(rdd1.distinct().collect().mkString(","))//4.关闭sc对象sparkContext.stop()}
}
glom
给RDD增加一个关于分区的嵌套,显示分区效果,对于结果没有太多影响
package com.wunaiieq.RDD//1.导入类
import org.apache.spark.rdd.RDD
import org.apache.spark.{SparkConf, SparkContext}object RddGlom {def main(args: Array[String]): Unit = {//2.构建SparkConf对象,并设置本地运行和程序名称val conf: SparkConf = new SparkConf().setMaster("local[*]").setAppName("glom")//3.使用conf对象构建SparkContet对象val sparkContext = new SparkContext(conf)//5.创建Rddval rdd1: RDD[Int] = sparkContext.parallelize(Array(1, 2, 3, 4, 5, 6), 2)//6.rdd1.glom()将rdd的数据加上一个嵌套,这个嵌套是按照分区进行的// rdd1: RDD[Int] 和 rdd2: RDD[Array[Int]] 类型不同val rdd2: RDD[Array[Int]] = rdd1.glom()//7.rddc1: Array[Int]和rddc2: Array[Array[Int]]类型也不一样val rddc1: Array[Int] = rdd1.collect()val rddc2: Array[Array[Int]] = rdd2.collect()println(rddc1.mkString(","))//println(rddc2.mkString(","))rddc2.foreach(arr=>println(arr.mkString(",")))//4.关闭sparkContext对象sparkContext.stop()}
}
groupBy
将输入的RDD进行分组,传入函数为分组要求,比如根据列表种第一个元素,使得所有相同的元素为一组
package com.wunaiieq.RDD//1.导入类
import org.apache.spark.rdd.RDD
import org.apache.spark.{SparkConf, SparkContext}object RddGroupBy {def main(args: Array[String]): Unit = {//2.构建SparkConf对象,并设置本地运行和程序名称val conf: SparkConf = new SparkConf().setMaster("local[*]").setAppName("groupBy")//3.使用conf对象构建SparkContet对象val sparkContext = new SparkContext(conf)//5.创建Rddval rdd: RDD[(Char, Int)] = sparkContext.parallelize(Array(('a', 1), ('c', 1), ('a', 2), ('b', 1), ('b', 2), ('a', 3), ('a', 4)))//6.通过groupBy算子对rdd对象中的数据进行分组//groupBy插入的函数的用意是指定按照谁进行分组val gbRdd: RDD[(Char, Iterable[(Char, Int)])] = rdd.groupBy(tupEle => tupEle._1)//收集到Driver端val result1: Array[(Char, Iterable[(Char, Int)])] = gbRdd.collect()//(a,CompactBuffer((a,1), (a,2), (a,3), (a,4))),(b,CompactBuffer((b,1), (b,2)))println(result1.mkString(","))//7.使用map转换算子//(a,List((a,1), (a,2), (a,3), (a,4))),(b,List((b,1), (b,2)))val result2: Array[(Char, List[(Char, Int)])] = gbRdd.map(tup => (tup._1, tup._2.toList)).collect()println(result2.mkString(","))//4.关闭sparkContext对象sparkContext.stop()}
}
groupByKey
针对kv型的RDD进行分组
package com.wunaiieq.RDD//1.导入类
import org.apache.spark.rdd.RDD
import org.apache.spark.{SparkConf, SparkContext}object RddGroupByKey {def main(args: Array[String]): Unit = {//2.构建SparkConf对象,并设置本地运行和程序名称val conf: SparkConf = new SparkConf().setMaster("local[*]").setAppName("groupByKey")//3.使用conf对象构建SparkContet对象val sparkContext = new SparkContext(conf)//5.创建Key-Value型Rddval rdd: RDD[(Char, Int)] = sparkContext.parallelize(Array(('a', 1), ('c', 1),('c', 3),('a', 2), ('b', 1), ('b', 2), ('a', 3), ('a', 4)))//6.按照key进行分组,分组后的结果是有二元组组成的RDDval gbkRdd: RDD[(Char, Iterable[Int])] = rdd.groupByKey()val result1: Array[(Char, Iterable[Int])] = gbkRdd.collect()//(a,CompactBuffer(1, 2, 3, 4)),(b,CompactBuffer(1, 2))println(result1.mkString(","))//7.使用map转换算子,对value数据转为Listval result2: Array[(Char, List[Int])] = gbkRdd.map(tup => (tup._1, tup._2.toList)).collect()//(a,List(1, 2, 3, 4)),(b,List(1, 2))println(result2.mkString(","))//4.关闭sparkContext对象sparkContext.stop()}
}
SortBy
排序算子,对rdd按照元祖的第n个值进行排序
package com.wunaiieq.RDD//1.导入类
import org.apache.spark.rdd.RDD
import org.apache.spark.{SparkConf, SparkContext}object RddSortBy {def main(args: Array[String]): Unit = {//2.构建SparkConf对象,并设置本地运行和程序名称val conf: SparkConf = new SparkConf().setMaster("local[*]").setAppName("groupByKey")//3.使用conf对象构建SparkContet对象val sparkContext = new SparkContext(conf)//5.创建Key-Value型Rddval rdd: RDD[(Char, Int)] = sparkContext.parallelize(List(('w', 2), ('h', 5), ('k', 9), ('m', 3),('a', 7),('p', 4), ('q', 1), ('n', 8), ('y', 6)))//6.使用sortBy对rdd按照元祖的第二个值进行排序/*f: (T) => K,指定按照第几个元素记进行排序ascending: Boolean = true,true表示升序,false表示降序,默认就是true升序numPartitions: Int = this.partitions.length排序的分区数,默认为rdd的分区数*/val result1: Array[Array[(Char, Int)]] = rdd.sortBy(tup => tup._2,ascending = true, numPartitions = 3).glom().collect()result1.foreach(arr=>println(arr.mkString(",")))//7.全局有序,排序后的分区数设置为1val result2: Array[(Char, Int)] = rdd.sortBy(tup => tup._2,ascending = true, numPartitions = 1).collect()println(result2.mkString(","))//8按照元祖的第一个元素进行降序排序val result3: Array[(Char, Int)] = rdd.sortBy(tup => tup._1,ascending = false, numPartitions = 1).collect()println(result3.mkString(","))//4.关闭sparkContext对象sparkContext.stop()}
}
sortByKey
默认按照key的升序进行排序,生成的RDD对象分区数和原RDD相同
package com.wunaiieq.RDD//1.导入类
import org.apache.spark.rdd.RDD
import org.apache.spark.{SparkConf, SparkContext}object RddSortByKey {def main(args: Array[String]): Unit = {//2.构建SparkConf对象,并设置本地运行和程序名称val conf: SparkConf = new SparkConf().setMaster("local[1]").setAppName("sortByKey")//3.使用conf对象构建SparkContet对象val sparkContext = new SparkContext(conf)//5.创建Key-Value型Rddval rdd: RDD[(Char, Int)] = sparkContext.parallelize(List(('w', 2), ('h', 5), ('k', 9), ('m', 3),('a', 7),('p', 4), ('q', 1), ('n', 8), ('y', 6)))println("rdd partition num:"+rdd.getNumPartitions)//6.默认按照key的升序排列,生成的RDD对象分区数与原RDD相同。val rdd1: RDD[(Char, Int)] = rdd.sortByKey()println("rdd1 partition num:"+rdd1.getNumPartitions)val result1: Array[(Char, Int)] = rdd1.collect()println(result1.mkString(","))//7.按照key的降序排列,默认生成的RDD对象分区数与原RDD相同。val rdd2: RDD[(Char, Int)] = rdd.sortByKey(ascending = false)println("rdd2 partition num:"+rdd2.getNumPartitions)val result2: Array[(Char, Int)] = rdd2.collect()println(result2.mkString(","))//8.按照key的降序排列,设定生成的RDD对象分区数为4。val rdd3: RDD[(Char, Int)] = rdd.sortByKey(ascending = false, numPartitions = 4)println("rdd3 partition num:"+rdd3.getNumPartitions)val result3: Array[Array[(Char, Int)]] = rdd3.glom().collect()result3.foreach(arr=>println(arr.mkString(",")))//9.按照key的升序排序,生成的RDD的分区数设置为1,就实现了全局有序val result4: Array[(Char, Int)] = rdd.sortByKey(ascending = true, numPartitions = 1).collect()println(result4.mkString(","))//4.关闭sc对象sparkContext.stop()}
}
union
将两个同类型RDD进行合并,合并后的顺序为调用union算子的rdd在前,作为参数的rdd在后,rdd内部顺序不变
package com.wunaiieq.RDD//1.导入类
import org.apache.spark.rdd.RDD
import org.apache.spark.{SparkConf, SparkContext}object RddUnion {def main(args: Array[String]): Unit = {//2.构建SparkConf对象,并设置本地运行和程序名称val conf: SparkConf = new SparkConf().setMaster("local[1]").setAppName("union")//3.使用conf对象构建SparkContet对象val sparkContext = new SparkContext(conf)//5.创建3个RDD对象val rdd1: RDD[Int] = sparkContext.parallelize(Array(1, 2, 3, 4))val rdd2: RDD[Int] = sparkContext.parallelize(Array(3, 4, 5, 6))val rdd3: RDD[String] = sparkContext.parallelize(Array("x", "y"))//6.union算子将两个rdd对象元素做并集计算,并生成一个新的rdd对象val result21: RDD[Int] = rdd2.union(rdd1)val result12: RDD[Int] = rdd1.union(rdd2)//7.union算子不会去重,无方向性println(result21.collect().mkString(","))println(result12.collect().mkString(","))//8.使用union做并集计算的两个rdd的数据类型要一致,否则出错//rdd1.union(rdd3)//4.关闭sc对象sparkContext.stop()}
}
交集intersection和差集subtract
交集:将两个RDD相同元素获取后组成新的RDD
差集:一个RDD去除和另一个RDD相同元素的剩余元素,组成的新的RDD
package com.wunaiieq.RDD//1.导入类
import org.apache.spark.rdd.RDD
import org.apache.spark.{SparkConf, SparkContext}object RddIntersectionSubtract {def main(args: Array[String]): Unit = {//2.构建SparkConf对象,并设置本地运行和程序名称val conf: SparkConf = new SparkConf().setMaster("local[1]").setAppName("交集和差集")//3.使用conf对象构建SparkContet对象val sparkContext = new SparkContext(conf)//5.创建5个RDD对象val rdd1: RDD[Int] = sparkContext.parallelize(Array(1, 2, 3, 4))val rdd2: RDD[Int] = sparkContext.parallelize(Array(3, 4, 5, 6))val rdd3: RDD[(Char, Int)] = sparkContext.parallelize(List(('x', 6), ('y', 1)))val rdd4: RDD[(Char, Int)] = sparkContext.parallelize(List(('x', 6), ('z', 3)))val rdd5: RDD[(Char, Int)] = sparkContext.parallelize(List(('x', 5), ('z', 3)))//6.交集运算,将两个rdd中的相同元素获取后组成一个新的rdd,无方向性println(rdd1.intersection(rdd2).collect().mkString(","))println(rdd2.intersection(rdd1).collect().mkString(","))val rdd34: RDD[(Char, Int)] = rdd3.intersection(rdd4)val rdd43: RDD[(Char, Int)] = rdd4.intersection(rdd3)println(rdd34.collect().mkString(","))println(rdd43.collect().mkString(","))//如果是二元组,key和value都相同,才认定为同一个元素println(rdd4.intersection(rdd5).collect().mkString(","))//7.差集,有方向性的println(rdd1.subtract(rdd2).collect().mkString(","))println(rdd2.subtract(rdd1).collect().mkString(","))println(rdd3.subtract(rdd4).collect().mkString(","))println(rdd4.subtract(rdd3).collect().mkString(","))//扩展:只比较keyprintln("-------------------")println(rdd4.subtract(rdd5).collect().mkString(","))//由于只比较key,所以rdd4和rdd5的key均为x,z因此差集为空println("=="+rdd4.subtractByKey(rdd5).collect().mkString(","))//4.关闭sc对象sparkContext.stop()}
}
join
只能使用kv类型的RDD,对相同的key进行匹配连接
package com.wunaiieq.RDD//1.导入类
import org.apache.spark.rdd.RDD
import org.apache.spark.{SparkConf, SparkContext}object RddJoin {def main(args: Array[String]): Unit = {//2.构建SparkConf对象,并设置本地运行和程序名称val conf: SparkConf = new SparkConf().setMaster("local[1]").setAppName("关联查询")//3.使用conf对象构建SparkContet对象val sc = new SparkContext(conf)//5.创建两个rddval rdd1: RDD[(Int, String)] = sc.makeRDD(List((101, "A"), (102, "B"), (103, "C")))val rdd2: RDD[(Int, String)] = sc.makeRDD(List((101, "a"), (102, "b"), (104, "d"),(101, "aa")))//6.通过join关联算子 rdd1.join(rdd2) 关联条件是key,只保留关联上的数据val rdd1j2: RDD[(Int, (String, String))] = rdd1.join(rdd2)//结果:(101,(A,a)),(101,(A,aa)),(102,(B,b))println(rdd1j2.collect().mkString(","))//7.左外关联(101,(A,Some(a))),(101,(A,Some(aa))),(102,(B,Some(b))),(103,(C,None))val rdd1loj2: RDD[(Int, (String, Option[String]))] = rdd1.leftOuterJoin(rdd2)println(rdd1loj2.collect().mkString(","))//8.右外关联(101,(Some(A),a)),(101,(Some(A),aa)),(104,(None,d)),(102,(Some(B),b))val rdd1roj2: RDD[(Int, (Option[String], String))] = rdd1.rightOuterJoin(rdd2)println(rdd1roj2.collect().mkString(","))//(101,(a,Some(A))),(101,(aa,Some(A))),(104,(d,None)),(102,(b,Some(B)))println(rdd2.leftOuterJoin(rdd1).collect().mkString(","))//4.关闭sc对象sc.stop()}
}
partitionBy
对RDD进行重新分区
package com.wunaiieq.RDD//1.导入类
import org.apache.spark.rdd.RDD
import org.apache.spark.{Partitioner, SparkConf, SparkContext}object RddPartitionBy {def main(args: Array[String]): Unit = {//2.构建SparkConf对象,并设置本地运行和程序名称val conf: SparkConf = new SparkConf().setMaster("local[1]").setAppName("partitionBy")//3.使用conf对象构建SparkContet对象val sc = new SparkContext(conf)//5.创建1个RDD对象val rdd: RDD[(String, Int)] = sc.parallelize(List(("andy", 1), ("jack", 1),("hello", 1), ("lucy", 1), ("tom", 1), ("su", 1)))println("重新自定义分区前:"+rdd.getNumPartitions)//6.调用partitionBy()对rdd对象进行重新分区val rdd1: RDD[(String, Int)] = rdd.partitionBy(new Partitioner {override def numPartitions = 3override def getPartition(key: Any): Int = {//获取key的首字母val firstChar: Char = key.toString.charAt(0)//['a','i] => 0if (firstChar >= 'a' && firstChar <= 'i') {return 0} else if (firstChar >= 'j' && firstChar <= 'q') {return 1} else {return 2}}})val result: Array[Array[(String, Int)]] = rdd1.glom().collect()//输出result.foreach(arr=>println(arr.mkString(",")))//4.关闭sc对象sc.stop()}
}
mapPartition
类似于map,作用是对每个分区进行操作
算子 | 数据处理方式 | 性能特性 | 适用场景 |
---|---|---|---|
map | 对RDD中的每一个元素进行操作 | 每个元素独立处理,通常性能稳定 | 需要对RDD中的每个元素执行简单转换或计算的场景 |
接受一个函数,应用于RDD的每个元素 | 内存使用较为安全,不易导致内存溢出问题 | 不依赖分区元数据或复杂计算的场景 | |
mapPartition | 对RDD中的每一个分区的迭代器进行操作 | 可能具有更好的性能,因为减少了函数调用的开销 | 需要对RDD中的每个分区执行复杂计算或转换的场景 |
接受一个函数,应用于RDD的每个分区 | 内存使用需谨慎,可能因处理大量数据而导致内存溢出 | 依赖分区元数据或需要减少资源创建次数的场景 |
package com.wunaiieq.RDD//1.导入类
import org.apache.spark.rdd.RDD
import org.apache.spark.{SparkConf, SparkContext}import scala.collection.mutable.ListBufferobject RddMapPartition {def main(args: Array[String]): Unit = {//2.构建SparkConf对象,并设置本地运行和程序名称val conf: SparkConf = new SparkConf().setMaster("local[1]").setAppName("mapPartition")//3.使用conf对象构建SparkContet对象val sc = new SparkContext(conf)//5.创建RDD对象val rdd: RDD[String] = sc.parallelize(List("a", "b", "c", "d", "e", "f"), 3)//6.自定义一个处理函数def process(datas: Iterator[String]): Iterator[String] = {println("创建数据库连接...")val result = ListBuffer[String]()//遍历datasfor(ele<-datas){result.append(ele)}println("批量插入数据:"+result)println("关闭数据库连接...")//返回result.iterator}//7.调用mapPartitions操作val resultRdd: RDD[String] = rdd.mapPartitions(process)resultRdd.count()//4.关闭sc对象sc.stop()}
}
sample
根据指定规则从RDD中抽取数据
参数名称 | 类型 | 描述 |
---|---|---|
withReplacement | Boolean | 是否允许放回抽样。true 表示允许元素被重复抽取,false 表示不允许。 |
fraction | Double | 抽取比例,表示希望从原始RDD中抽取的样本比例(0到1之间)。若withReplacement 为true ,则此值可理解为每个元素被期望抽取的次数。 |
seed | Long | 随机种子。若提供,则每次运行抽样操作会得到相同的结果;若不提供(或设置为None),则每次结果可能不同。 |
package com.wunaiieq.RDD//1.导入类
import org.apache.spark.rdd.RDD
import org.apache.spark.{SparkConf, SparkContext}
import scala.collection.mutable.ListBuffer
object RddSample {def main(args: Array[String]): Unit = {//2.构建SparkConf对象,并设置本地运行和程序名称val conf: SparkConf = new SparkConf().setMaster("local[1]").setAppName("sample")//3.使用conf对象构建SparkContet对象val sc = new SparkContext(conf)//5.创建RDD对象val rdd: RDD[Int] = sc.makeRDD(List(1, 2, 3, 4, 5, 6, 7, 9, 0), 1)val rdd2: RDD[Int] = rdd.sample(true, 2)println("2"+rdd2.collect().mkString(","))//当种子相同(前两个参数的值也相同)时,多次抽取的结果相同val rdd3: RDD[Int] = rdd.sample(false, 0.3, 6)val rdd4: RDD[Int] = rdd.sample(false, 0.3, 6)println(rdd3.collect().mkString(","))println(rdd4.collect().mkString(","))val rdd5: RDD[Int] = rdd.sample(true, 2, 3)val rdd6: RDD[Int] = rdd.sample(true, 2, 3)println(rdd5.collect().mkString(","))println(rdd6.collect().mkString(","))//4.关闭sc对象sc.stop()}
}
行动算子示例
ForeachPartition
在进行每个分区操作前后增加一些额外的操作
package com.wunaiieq.RDD.action//1.导入类
import org.apache.spark.rdd.RDD
import org.apache.spark.{SparkConf, SparkContext}
import scala.collection.mutable.ListBuffer
object RddForeachPartition {def main(args: Array[String]): Unit = {//2.构建SparkConf对象,并设置本地运行和程序名称val conf: SparkConf = new SparkConf().setMaster("local[1]").setAppName("ForeachPartition")//3.使用conf对象构建SparkContet对象val sc = new SparkContext(conf)//5.创建RDD对象val rdd: RDD[String] = sc.parallelize(List("a", "b", "c", "e", "f", "d"), 3)//6.自定义处理函数def process(datas: Iterator[String]): Unit = {println("操作一-开始一个分区...")val result = ListBuffer[String]()for(data<-datas){result.append(data)}println("当前分区的数据:"+result)println("操作二-结束一个分区...")}//7.调用foreachPartition算子,参数为自定义函数rdd.foreachPartition(process)//4.关闭sc对象sc.stop()}
}
Foreach
对每个元素提供操作逻辑,无返回值
package com.wunaiieq.RDD.action//1.导入类
import org.apache.spark.rdd.RDD
import org.apache.spark.{SparkConf, SparkContext}import scala.collection.mutable.ListBuffer
object RddForeach {def main(args: Array[String]): Unit = {//2.构建SparkConf对象,并设置本地运行和程序名称val conf: SparkConf = new SparkConf().setMaster("local[1]").setAppName("foreach")//3.使用conf对象构建SparkContet对象val sc = new SparkContext(conf)//5.创建RDD对象val rdd: RDD[String] = sc.parallelize(List("a", "b", "c", "e", "f", "d"), 3)//6.自定义函数def process(ele:String): Unit ={println("开启process..")println("当前数据"+ele)println("结束process..")}//7.调用foreach算子,参数为自定义函数 f: T => Unitrdd.foreach(process)println("------------------")//8.可以为foreach算子指定print\println函数//rdd.foreach(print)rdd.foreach(println)//4.关闭sc对象sc.stop()}
}
SaveAsTextFile
将RDD数据写入到txt文本文件中,支持本地文件、hdfs等,RDD的每个分区均会产生一个结果文件
package com.wunaiieq.RDD.action//1.导入类
import org.apache.spark.rdd.RDD
import org.apache.spark.{SparkConf, SparkContext}object RddSaveAsTextFile {def main(args: Array[String]): Unit = {//2.构建SparkConf对象,并设置本地运行和程序名称val conf: SparkConf = new SparkConf().setMaster("local[1]").setAppName("SaveAsTextFile")//3.使用conf对象构建SparkContet对象val sc = new SparkContext(conf)//5.创建RDD对象val rdd: RDD[String] = sc.parallelize(List("a", "b", "c", "e", "f", "d"), 3)//6.将rdd中的数据保持到本地磁盘rdd.saveAsTextFile("data/output/file")
// 7.将数据写入到HDFS文件系统中 node1上namenode需要是active
// rdd.saveAsTextFile("hdfs://node1:9820/spark/output/file")
// 8.为了避免active namenode放生变化,所以最好使用mycluster
// UnknownHostException: mycluster 需要将虚拟机中的core-site.xml和hdfs-site.xml文件下载到本项目的resources目录下
// rdd.saveAsTextFile("hdfs://mycluster/spark/output/file")//4.关闭sc对象sc.stop()}
}
CountByKey
统计kv型RDD中,key出现的次数
转换算子reduceByKey是对RDD做聚合操作,针对相同key的value还会进行相加
package com.wunaiieq.RDD.action//1.导入类
import org.apache.spark.rdd.RDD
import org.apache.spark.{SparkConf, SparkContext}
object RddCountByKey {def main(args: Array[String]): Unit = {//2.构建SparkConf对象,并设置本地运行和程序名称val conf: SparkConf = new SparkConf().setMaster("local[1]").setAppName("foreach")//3.使用conf对象构建SparkContet对象val sc = new SparkContext(conf)//5.创建KeyValue型RDD对象val rdd: RDD[(String, Int)] = sc.parallelize(List(("a", 1), ("b", 1), ("a", 2), ("b", 2), ("a", 3), ("a", 4)))//6.复习转换算子reduceByKey:将相同key的value值做聚合操作val rdd2: RDD[(String, Int)] = rdd.reduceByKey(_ + _)println(rdd2.collect().mkString(","))//7.行动算子countByKey 表示统计key出现次数val result: collection.Map[String, Long] = rdd.countByKey()result.foreach(println)//4.关闭sc对象sc.stop()}
}
Reduce
对传入的RDD做聚合操作(一般为相加)
package com.wunaiieq.RDD.action//1.导入类
import org.apache.spark.rdd.RDD
import org.apache.spark.{SparkConf, SparkContext}
object RddReduce {def main(args: Array[String]): Unit = {//2.构建SparkConf对象,并设置本地运行和程序名称val conf: SparkConf = new SparkConf().setMaster("local[1]").setAppName("reduce")//3.使用conf对象构建SparkContet对象val sc = new SparkContext(conf)//5.创建RDD对象val rdd: RDD[Int] = sc.parallelize(List(1, 2, 3, 4))//6.将rdd中所有元素做聚合加操作val sum: Int = rdd.reduce(_ + _)println(sum)//4.关闭sc对象sc.stop()}
}
fold
此方法也是聚合操作,不过会携带一个初始值,在每个分区进行聚合操作时,会带上初始值一起进行
分区1:10(初始值)+1+2+3(123为分区内数据)=16
分区2:10+4+5+6=25
分区3:10+7+8+9=34
分区聚合:10(初始值)+16(分区1)+25(分区2)+34(分区3)=85
package com.wunaiieq.RDD.action//1.导入类
import org.apache.spark.rdd.RDD
import org.apache.spark.{SparkConf, SparkContext}object RddFold {def main(args: Array[String]): Unit = {//2.构建SparkConf对象,并设置本地运行和程序名称val conf: SparkConf = new SparkConf().setMaster("local[1]").setAppName("fold")//3.使用conf对象构建SparkContet对象val sc = new SparkContext(conf)//5.创建RDD对象//val rdd: RDD[Int] = sc.parallelize(List(1, 2, 3, 4, 5, 6, 7, 8, 9), 1)val rdd: RDD[Int] = sc.parallelize(List(1, 2, 3, 4, 5, 6, 7, 8, 9), 3)//6.执行带有初始值的fold聚合操作//10表示初始值,_+_表示做聚合加操作//RDD对象中所有元素的和+(partitionNum+1)*10val result: Int = rdd.fold(10)(_ + _)//7.注意:初始值的类型要和RDD中数据的类型一致//rdd.fold(10.5)(_+_)//错误的println(result)//4.关闭sc对象sc.stop()}
}
first、take、count
first:取出RDD第一个元素
take:取出RDD的前n个元素
count:统计RDD的元素数量
RDD的分区是只存储位置的不同,因此对于结果的顺序是没影响的,但是不同分区的元素数量会影响take的性能
比如第一个分区上1个元素,第二个2个,第三个3个,现在执行take(5)它需要先从第一个分区上读取,然后第二个,第三个,影响效率
package com.wunaiieq.RDD.action//1.导入类
import org.apache.spark.rdd.RDD
import org.apache.spark.{SparkConf, SparkContext}object RddFirstTakeCount {def main(args: Array[String]): Unit = {//2.构建SparkConf对象,并设置本地运行和程序名称val conf: SparkConf = new SparkConf().setMaster("local[1]").setAppName("FirstTakeCount")//3.使用conf对象构建SparkContet对象val sc = new SparkContext(conf)//5.创建RDD对象,这里是否分区没区别val rdd: RDD[Int] = sc.parallelize(List(5, 2, 1, 4, 3, 8, 6, 1))//6.获取第一个元素val first: Int = rdd.first()println("first:"+first)//7.获取rdd前三个元素val threeEle: Array[Int] = rdd.take(3)println(threeEle.mkString(","))//8.获取rdd中元素的总数val eleNum: Long = rdd.count()println(eleNum)//4.关闭sc对象sc.stop()}
}
top、takeOrderd
top算子:
top(n)取出RDD降序排序后的topN
top(n)(Ordering.Int.reverse)取出RDD升序后的topN
takeOrdered算子:
takeOrdered(n)取出RDD升序排序后的topN
takeOrdered(n)(Ordering.Int.reverse)取出RDD降序排序后的topN
简单来讲就是top和takeOrdered用于获取排序的最开始n位,默认排序位降序、升序,同时可以增加其他自定义的函数
package com.wunaiieq.RDD.action//1.导入类
import org.apache.spark.rdd.RDD
import org.apache.spark.{SparkConf, SparkContext}object RddTopTakeOrdered {def main(args: Array[String]): Unit = {//2.构建SparkConf对象,并设置本地运行和程序名称val conf: SparkConf = new SparkConf().setMaster("local[1]").setAppName("TakeOrdered")//3.使用conf对象构建SparkContet对象val sc = new SparkContext(conf)//5.创建RDD对象val rdd: RDD[Int] = sc.parallelize(List(5, 2, 1, 4, 3, 8, 6, 1))//6.获取默认降序的top3val top3: Array[Int] = rdd.top(3)println("top3:"+top3.mkString(","))//7.获取默认升序的top3val takeOr3: Array[Int] = rdd.takeOrdered(3)println("takeOr3:"+takeOr3.mkString(","))//8.获取升序的top3val top3r: Array[Int] = rdd.top(3)(Ordering.Int.reverse)println("top3r:"+top3r.mkString(","))//9.获取降序的top3val takeOr3r: Array[Int] = rdd.takeOrdered(3)(Ordering.Int.reverse)println("takeOr3r:"+takeOr3r.mkString(","))//4.关闭sc对象sc.stop()}
}
takeSample
//withReplacement:true可以重复,false:不能重复。数据位置是否重复不是值
//num:抽样的元素个数
//seed:随机种子,数值型的;RDD元素相同的情况下,相同的种子抽样出的元素也相同
def takeSample(withReplacement: Boolean,num: Int,seed: Long = Utils.random.nextLong): Array[T]
)
方法 | 使用场景 | 返回类型 | 采样方式 | 参数设置 |
---|---|---|---|---|
takeSample 行动算子 | 从 RDD 中随机抽取指定数量的样本 | 数组 | 按设定的采样个数 | withReplacement (是否允许重复采样,布尔值)num (要抽取的样本数量,整数)seed (随机算法的种子值,可选) |
sample 转换算子 | 从 RDD 中随机抽取按比例分布的样本 | 新的 RDD | 按指定的比例 | withReplacement (是否允许重复采样,布尔值)fraction (要抽取的样本比例,0 到 1 之间的浮点数)seed (随机算法的种子值,可选) |
package com.wunaiieq.RDD.action//1.导入类
import org.apache.spark.rdd.RDD
import org.apache.spark.{SparkConf, SparkContext}object RddTakeSample {def main(args: Array[String]): Unit = {//2.构建SparkConf对象,并设置本地运行和程序名称val conf: SparkConf = new SparkConf().setMaster("local[1]").setAppName("TakeSample")//3.使用conf对象构建SparkContet对象val sc = new SparkContext(conf)//5.创建RDD对象val rdd: RDD[Int] = sc.parallelize(List(5, 2, 1, 4, 3, 8, 6, 1))//6.第一个参数为true表示被抽出的元素返回,所以抽样的元素个数可以超过rdd.count() 元素个数val result: Array[Int] = rdd.takeSample(true, 10, 2)println(result.mkString(","))//7.第一个参数为false表示被抽取出的数据不放回,如果数量>rdd.count()println(rdd.takeSample(false,10,2).mkString(","))//8.三个参数的值都一样时,多次抽取结果println(rdd.takeSample(false,5,3).mkString(","))println(rdd.takeSample(false,5,3).mkString(","))//4.关闭sc对象sc.stop()}
}