一、RDD的介绍(了解)
RDD:resilient distributed dataset(弹性分布式数据集合 ) spark的计算核心,spark采用rdd管理数据
-
RDD
-
RDD是spark的一种数据模型(规定数据的存储结构和计算方法)
-
python中的数据模型
-
list [] 可以重复存储数据 append
-
set{} 不允许重复存储
-
dict {k:v} get(key)
-
-
RDD的模型可以对内存数进行共享管理
-
-
分布式
-
数据可以在多台服务器上同时计算执行
-
-
弹性
-
可以根据计算的需求将数据进行分区拆分,本质就是将数据分成多份
-
二、RDD的特点(了解)
-
分区
-
可以将计算的海量数据分成多份,需要分成多少可分区可以通过方法指定
-
每个分区都可以对应一个task线程执行计算
-
-
只读
-
rdd中的数据不能直接修改,需要通过方法计算后得到一个新的rdd
-
rdd本身存储的数只能读取
-
-
依赖
-
rdd之间是有依赖关系的
-
新的rdd是通过旧的rdd计算得到
-
-
缓存
-
可以将计算的中结果缓存起来,如果后续计算错误时,可以从缓存位置重新计算
-
将数据存储在内存或本地磁盘
-
作用是容错
-
缓存在执行计算任务程序结束后会释放删除
-
-
checkpoint
-
作用和缓存一样
-
checkpoint可以将数据存储在分布式存储系统中,比如hdfs
-
三、创建RDD数据(掌握)
将需要计算的数据转为rdd的数据,就可以利用spark的内存计算方法进行分布式计算操作,这些计算方法就是有rdd提供的
rdd数据的转化方法是有sparkcontext提供的,所以需要先生成sparkcontext,sparkcontext中还包含资源申请和任务划分功能
SparkContext称为Spark的入口类
3-1 Python数据转化为rdd
# 导入sparkcontext from pyspark import SparkContext # 创建SparkContext对象 sc = SparkContext() # 将Python数据转为rdd # data_int = 10 # 数值类型不能转化rdd # 能for循环遍历的数据都能转为rdd data_str = 'abc' data_list = [1, 2, 3, 4] data_dict = {'a': 1, 'b': 2} data_set = {1, 2, 3, 4} data_tuple = (1, 2, 3, 4) rdd = sc.parallelize(data_tuple) # rdd的计算 # rdd的数据输出展示 # 获取所有rdd数据 res = rdd.collect() print(res)
3-2 文件数据(hdfs)转化为rdd
# 将读取的hdfs文件数据转为rdd from pyspark import SparkContext # 生成SparkContext类对象 sc = SparkContext() # 读取文件数据转为rdd rdd1 = sc.textFile('hdfs://node1:8020/data') rdd2 = sc.textFile('/data/words.txt') # 查看数据 res = rdd1.collect() print(res) res = rdd2.collect() print(res)
3-3 rdd的分区
-
python数据转发的分区数指定
# RDD分区使用 # 导入sparkcontext from pyspark import SparkContext # 创建SparkContext对象 sc = SparkContext() # 创建生成rdd是可以指定分区数 # Python数据转为rdd指定 # numSlices 可以指定分区数 rdd_py = sc.parallelize([1,2,3,4,5,6],numSlices=10) # rdd计算 # 查看rdd分区数据 res1 = rdd_py.glom().collect() print(res1)
-
读取的文件数据进行分区数指定
# RDD分区使用 # 导入sparkcontext from pyspark import SparkContext # 创建SparkContext对象 sc = SparkContext() # 创建生成rdd是可以指定分区数 # file文件读取数据指定分区数据 # minPartitions 指定分区 # 文件大小/分区数 = 值 -----余数 # 余数/值 * 100%=百分比 百分比大于10% 会多创建一个分区 rdd_file = sc.textFile('hdfs://node1:8020/data',minPartitions=1) # 在spark并行度部分会讲解如何根据资源设置分区数 # rdd计算 # 查看rdd分区数据 res2 = rdd_file.glom().collect() print(res2)
3-4 小文件数据读取
300M 3个块 对应三个分区
在一个目录下,有多个文件,如果文件的大小不够一个块的大小,一个文件就对应一个分区,文件超过一个块,那就一个block(128M)块对应一个分区。
目录下都是小文件,那么读取目录下的文件数据,会对应很多个分区
一个分区对应一个task线程,当小文件过多时,会占用大量的线程,造成资源浪费
使用wholeTextFiles方法可以解决
该方法会现将读取到的数据合并在一起,然后重新进行分区
# 导入sparkcontext from pyspark import SparkContext # 创建SparkContext对象 sc = SparkContext(master='yarn') # rdd = sc.textFile('hdfs://node1:8020/data') # rdd计算 # wholeTextFiles 会合并小文件数据 # minPartitions 指定分区数 rdd_mini = sc.wholeTextFiles('hdfs://node1:8020/data',minPartitions=1) # 展示数据 # res1 = rdd.glom().collect() # print(res1) res2 = rdd_mini.glom().collect() print(res2)
java.lang.NoSuchMethodError
java包类的冲突
node1操作 同步到node2和node3
四、常用RDD算子(掌握)
将数据转化为rdd之后,就需要进行rdd的计算了,rdd提供了计算方法
rdd的方法又称为rdd算子
4-1 算子(方法)介绍
rdd中封装了各种算子方便进行计算,主要分为两类
-
transformation
-
转化算子 对rdd数据进行转化计算得到
新的rdd
,定义了一个线程任务
-
-
action
-
执行算子 触发计算任务,让计算任务进行执行,得到结果
-
触发线程执行的
-
rdd的转化算子大部分都是从rdd中读取元素数据(rdd中每条数据),具体计算需要开发人员编写函数传递到rdd算子中
rdd的执行算子则大部分是用来获取数据 collect方法就是触发算子
4-2 常用transformation算子(掌握)
-
map
-
rdd.map(lambda 参数:参数计算)
-
参数接受每个元素数据
-
# 转化算子map的使用 from pyspark import SparkContext # 创建SparkContext对象 sc = SparkContext() # 生成rdd data = [1, 2, 3, 4] rdd = sc.parallelize(data) # 对rdd进行计算 # 转化算子map使用 # 将处理数据函数当成参数传递给map # 定义函数只需要一个接受参数 def func(x):"""数据计算逻辑函数:param x: 接收每一个rdd的元素数据:return:"""return x + 1 def func2(x):"""数据计算逻辑函数:param x: 接收每一个rdd的元素数据:return:"""return str(x) # 转化算子执行后会返回新的rdd rdd_map = rdd.map(func) rdd_map2 = rdd.map(func2) rdd_map3 = rdd_map2.map(lambda x: [x]) # 对rdd数据结果展示 # 使用rdd的触发算子,collect获取是所有的rdd元素数据 res = rdd_map.collect() print(res) res2 = rdd_map2.collect() print(res2) res3 = rdd_map3.collect() print(res3)
-
flatMap
-
处理的是二维嵌套列表数据 [[1,2,3],[4,5,6],[7,8,9]] [1,2,3,4]
-
rdd.flatMap(lambda 参数:[参数计算])
-
from pyspark import SparkContext # 创建SparkContext对象 sc = SparkContext() # 生成rdd data = [[1, 2], [3, 4]] data2 = ['a,b,c','d,f,g'] # 将数据转为['a','b','c','d','f','g'] rdd = sc.parallelize(data) rdd2 = sc.parallelize(data2) # rdd计算 # flatMap算子使用 将rdd元素中的列表数依次遍历取出对应的值放入新的rdd [1,2,3,4] # 传递一个函数,函数接受一个参数 rdd_flatMap = rdd.flatMap(lambda x: x) rdd_map = rdd2.map(lambda x:x.split(',')) rdd_flatMap2 = rdd_map.flatMap(lambda x:x) # 输出展示数据 # 使用执行算子 res = rdd_flatMap.collect() print(res) res2 = rdd_map.collect() print(res2) res3 = rdd_flatMap2.collect() print(res3)
-
fliter
-
rdd.filter(lambda 参数:参数条件过滤)
-
条件过滤的书写和Python中if判断一样
-
# 3、过滤算子 rdd7 = sc.parallelize([1, 2, 3, 4]) rdd8 = sc.parallelize(['a', 'b', 'c', 'a']) # filter算子,可以接受rdd中每个元素数据,然后传递给函数进行过滤 # lambda需要有一个接收值x,x接收到每个元素数据后,如何进行过滤需要写判断逻辑 # 判断条件的书写逻辑和if的判断逻辑一样 filter_rdd = rdd7.filter(lambda x: x > 2) filter_rdd2 = rdd8.filter(lambda x: x == 'a')
-
distinct 去重
-
不需要lambda rdd.distinct
-
# 4、去重 # distinct 会对rdd中的重复数据进行去重,去重后会返回一个新的rdd distinct_rdd = rdd8.distinct()
-
groupBy 分组
-
rdd.groupBy(lambda 参数:根据参数编写分组条件)
-
mapValues(list)
-
# 5、对数据进行分组 # groupBy是分组算子,会读取rdd中每个元素数据,传递给函数使用 # lambda需要一个接收值x,接收groupBy传递的元素数据,然后指定分组规则 # hash(x) % 2 对x中的元素数据进行hash取余,将数据分成两组,余数相同的数据会放在一起 # groupBy返回一个新的rdd,rdd的结构形式是 [(key,value),(k,v)] groupBy_rdd = rdd8.groupBy(lambda x: hash(x) % 2)# 6、对kv形式的数据进行取值处理 # mapValues,可以获取kv中的value值部分传递给函数进行使用 # mapValues返回一个新的rdd数据 mapValues_rdd = groupBy_rdd.mapValues(lambda x: list(x))
-
k-v数据 [(k,v),(k1,v1)]
-
groupByKey()
-
rdd.groupByKey()
-
-
reduceByKey()
-
rdd.reduceByKey(lambda 参数1,参数2:对两个参数计算)
-
-
sortByKey()
-
rdd.sortByKey()
-
-
# 6-2 对kv形式的数据进行分组,系统key值得数据会放在一起 rdd9 = sc.parallelize([('a', 1), ('b', 2), ('c', 3), ('a', 1), ('b', 2)]) # 不需要传递处理函数,返回一个新的rdd groupByKey_rdd = rdd9.groupByKey() mapValues_rdd2 = groupByKey_rdd.mapValues(lambda x: list(x))# 6-3 对kv形式的数据先进行分组,在进行聚合计算 # reduceByKey会将相同key的数据放在一起,然后对每个key中对应的valeu进行累加计算 # reduceByKey会将分组后的数据,按照key值传递个函数进行计算 # lambda需要接受两个参数,后面编写累加计算 x=0 y=3 x+y=3 x=3,y=0 res=x+y=3 reduceByKey_rdd = rdd9.reduceByKey(lambda x, y: x + y)# 6-4 对kv形式的数据先进行排序 # 不需要指定函数,按照key排序,默认升序 sortByKey_rdd= rdd9.sortByKey() sortByKey_rdd2= rdd9.sortByKey(ascending=False)
-
sortBy() 排序
-
rdd.sortBy(lambda x:x,ascending=False)
-
# 排序算子 # sortBy 可以指定按照哪个数据进行排序 # sortBy会将rdd中的元素数据传递给函数使用 # lambda 需要一个接受值x,接受rdd中每个元素 # 如果元素是kv类型可以通过下标方式指定按照那种排序 x[0] 代表key x[1] 代表value值 # 默认升序 sortBy_rdd = rdd9.sortBy(lambda x: x[1]) # 降序 sortBy_rdd2 = rdd9.sortBy(lambda x: x[1],ascending=False)
4-3 常用action算子(掌握)
-
collect() 取出rdd中所有值
-
rdd.collect()
-
-
reduce() 非k-v类型数据累加 [1,2,3,4,6]
-
rdd.reduce(lambda 参数1,参数2:两个参数计算)
-
-
count() 统计rdd元素个数
-
rdd.count()
-
-
take() 取出指定数量值
-
rdd.take(数量)
-
# 执行算子的使用 from pyspark import SparkContextsc = SparkContext()# python转为rdd rdd = sc.parallelize([1, 2, 3, 4, 5, 6])# transformation算的转化计算 # map,flatMap,fliter等# 触发计算 # action算子计算完成返回的是计算结果,不在是rdd了,不能在进行rdd操作了 # collect方法,触发计算获取是所有计算结果 res = rdd.collect() print(res) # reduce方法,传递一个计算逻辑,对元素数据进行累加计算 # 可以不需要转化算直接累加计算,但是不能处理kv形式数据 res = rdd.reduce(lambda x,y:x+y) # x初始为0 y一次获取元素数据 x=0,y=1 x= x+y=1 # x=1,y=2 x+y=3 # x=3,y=3 x+y=6 print(res)# count 获取rdd元素个数 res = rdd.count() print(res)# take取指定数量的元素数据 res=rdd.take(3) print(res)
4-4 词频统计案例(掌握)
# 词频统计 # 导入sparkcontext from pyspark import SparkContext# 创建SparkContext对象 sc = SparkContext()# 将hdfs的文件数据读取后转为rdd # 第一个参数 指定读取的文件路径 # rdd = sc.textFile('hdfs://node1:8020/data') # 简写 有的会读取错误,当错误是就写完整 # rdd = sc.textFile('/data') # 读取某单独文件 rdd = sc.textFile('hdfs://node1:8020/data/words.txt') # rdd计算 # 对读取到的rdd中的每行数据,先进行切割获取每个单词的数据 # rdd_map = rdd.map(lambda x: x.split(',')) rdd_flatMap= rdd.flatMap(lambda x: x.split(','))# 将单词数据转化为k-v结构数据 [(k,v),(k1,v1)] 给每个单词的value一个初始值1 rdd_map_kv = rdd_flatMap.map(lambda x:(x,1))# 对kv数据进行聚合计算 hive:[1,1] 求和 求平均数 求最大值 求最小值 rdd_reduceByKey = rdd_map_kv.reduceByKey(lambda x,y:x+y) # 现将相同key值的数据放在一起,然后对相同key值内的进行累加# 展示数据 res = rdd.collect() print(res)# res2 = rdd_map.collect() # print(res2)res3 = rdd_flatMap.collect() print(res3)res4 = rdd_map_kv.collect() print(res4)res5 = rdd_reduceByKey.collect() print(res5) # [('hadoop',1),('flink',1),('spark',2),('hive',2)]
4-5 其他高级算子
-
多个rdd的方法
-
union 合并两个rdd 不去重
-
join k-v类型数据 通过key进行关联
-
# 多个rdd操作 from pyspark import SparkContextsc = SparkContext()rdd1 = sc.parallelize([1,2,3,4]) rdd2 = sc.parallelize([5,6,7,4])rdd_kv1 = sc.parallelize([('a',1),('b',2),('c',3)]) rdd_kv2 = sc.parallelize([('c',4),('d',5),('e',6)])# rdd之间的合并 # rdd1和并rdd2,合并后会返回先的rdd union_rdd = rdd1.union(rdd2) # rdd3和并rdd4,合并后会返回先的rdd union_kv_rdd = rdd_kv1.union(rdd_kv2)# kv形式rdd进行join关联 通过key关联 # 内关联 相同key的数据会保留下来 join_rdd = rdd_kv1.join(rdd_kv2) # 左关联 左边rdd的数据会被保留下来,如果右边rdd有对应的key值数据会显示,没有对应key值会显示为空 leftOuterJoin_rdd = rdd_kv1.leftOuterJoin(rdd_kv2) # 右关联 右边rdd的数据会被保留下来,如果左边rdd有对应的key值数据会显示,没有对应key值会显示为空 rightOuterJoin_rdd = rdd_kv1.rightOuterJoin(rdd_kv2)# 查看结果 res = union_rdd.collect() print(f'union合并结果:{res}') res2 = union_kv_rdd.collect() print(f'kv_union合并结果:{res2}') res3 = join_rdd.collect() print(f'join内关联结果:{res3}') res4 = leftOuterJoin_rdd.collect() print(f'左关联结果:{res4}') res5 = rightOuterJoin_rdd.collect() print(f'右关联结果:{res5}')
-
重分区
# 1、导入sparkcontext类 from pyspark import SparkContext# 2、初始化SparkContext类型 # 没有指定master参数,默认使用本机资源 sc = SparkContext()# 3、将数据转为rdd数据 # 转化Python数据 data_list = [1, 2, 3, 4, 5, 6]rdd = sc.parallelize(data_list,numSlices=10)map_rdd = rdd.map(lambda x:x+1)# 修改rdd分区信息 # 指定修改的分区数 返回得到新的rdd # repartition 在进行使用时更多进行的是减少分区数 repartition_rdd = map_rdd.repartition(4)repartition_rdd2 = map_rdd.repartition(3)# 查看结果信息 # glom() 查看当前rdd的分区信息 res = map_rdd.glom().collect() print(res) res2 = repartition_rdd.glom().collect() print(res2)res3 = repartition_rdd2.glom().collect() print(res3)
-
数据保存
# 1、导入sparkcontext类 from pyspark import SparkContext# 2、初始化SparkContext类型 # 没有指定master参数,默认使用本机资源 sc = SparkContext()# 3、将数据转为rdd数据 # 转化Python数据 data_list = [1, 2, 3, 4, 5, 6]rdd = sc.parallelize(data_list,numSlices=3)rdd.saveAsTextFile('/itcast111')