一,算子的分类
整个DRR算子分为两大类:
Transformation(转换算子):
返回值:是一个新的DRR
特点:转换算子只是定义数据的处理规则,并不会立刻执行,是lazy(惰性)的。需要有Action算子触发。
Action算子(动作算子):
返回值:要么没有返回值None,或者返回非DRR类型的数据
特点:动作算子都是立即执行。执行时,会将它上游的其他算子一同执行触发
二,DRR的转换算子
2.1单值类型算子
-
map算子:
-
格式:rdd.map(fn)
-
说明: 主要根据传入的函数,对数据进行一对一的转换操作,传入一行,返回一行
-
需求: 数字加一后返回
init_add=sc.parallelize([1,2,3,4,5,6,7,8,9,10])print(init_add.map(lambda x: x + 1).collect())
-
flatMap算子:
-
格式:rdd.flatMap(fn)
-
说明:在map算子的基础上,加入一个压扁的操作, 主要适用于一行中包含多个内容的操作,实现一转多的操作
-
需求: 将姓名拆分,把每个姓名都放到同一个数据集中
init_rdd1 = sc.parallelize(['张三 李四 王五', '赵六 周日'])print(init_rdd1.flatMap(lambda x: x.split(' ')).collect())
-
groupBy 算子:
-
格式: groupBy(fn)
-
说明: 根据用户传入的自定义函数,对数据进行分组操作
-
注意: mapValues(list)可以以列表形式展示数据
-
需求: 将数据分成奇数和偶数
init_rdd3 = sc.parallelize([0, 1, 2, 3, 4, 5, 6, 7, 8, 9])print(init_rdd3.groupBy(lambda num: "偶数" if num / 2 == 0 else "奇数").mapValues(list).collect())
-
filter算子:
-
格式:filter(fn)
-
说明:根据用户传入的自定义函数对数据进行过滤操作。自定义函数的返回值类型是bool类型。True表示满足过滤条件,会将数据保留下来;False会将数据丢弃掉
-
过滤掉数值<=3的数据
init_rdd4 = sc.parallelize([0, 1, 2, 3, 4, 5, 6, 7, 8, 9])print(init_rdd4.filter(lambda num: num > 3).collect())
2.2双值类型算子
-
union(并集) 和intersection(交集)
-
格式: rdd1.union(rdd2) rdd1.intersection(rdd2)
-
注意: 并集的结果可以使用distinct()函数去重
-
print('----------------union(并集) 和intersection(交集)-------------------')rdd1 = sc.parallelize([3, 3, 2, 6, 8, 0])rdd2 = sc.parallelize([3, 2, 1, 5, 7])#union取并集不会对重复出现的数据去重print(rdd1.union(rdd2).collect())#对并集的结果进行去重print(rdd1.union(rdd2).distinct().collect())#交集print(rdd1.intersection(rdd2).collect())