1.Spark安装
1.1 环境
- JDK1.8以上
# 软连接(解耦合) rm -rf /usr/latest #删除软链接(最后的反斜杠不能删掉) ln -s /usr/java/jdk1.8.0 /usr/java/latest#创建新的软连接 # 环境变量 1. ~/bash_profile每个用户自己专属的shell环境,只在用户登录的时候,执行一次,修改后source或重启后生效 2. ~/bashshrc每个用户自己专属的shell环境,在用户登录的时候或者执行shell脚本的时候,执行一次 3. /etc/profile所有用户共用的系统变量,修改以后 source或者重启后生效,当用户登录的时候,执行一次
- Scala环境
- 安装scala
1.2 Spark安装
- 解压
- 配置环境变量
```
1.JAVA_HOME
2.CLASSPATH # java编译文件class的目录
3.HADOOP_HOME
4.HADOOP_CLASSPATH# hadoop依赖包的路径,hadoop安装目录下的lib
5.SPARK_HOME
6.PATH #所有命令配置到系统变量
PATH=$PATH # 在保留原来的一些配置基础上增加配置;“:”是分隔符
```
- 修改spark配置文件
```
# 1.spark-env.shSPARK_MASTER_HOST:主节点ipSPARK_MASTER_PORT:主节点端口号SPARK_WORKER_CORES:从节点核数SPARK_WORKER_MEMORY:cpu分配多少内存,从节点计算所用内存大小SPARK_DIST_CLASSPATH:配置hadoop的依赖包
# 2.slaves从节点的ip地址
# 3.上传spark日志需要的jar包,3个
# 启动sbin /start-all.sh
```
2.Spark Standalone架构
2.1 架构组成
1.Driver:运行spark应用程序的驱动,调用main()函数,创建一个SparkContext对象
2.SaprkContext:SparkContext用来可spark集群进行交互
3.Executor:执行器,在worker上运行的程序,负责计算
4.Task:任务
5.DAG Scheduler:有向无环图,一个DAG中可以分解出多个任务
6.Task Scheduler:任务调度器
2.2运行流程
1.启动Spark集群,worker节点用心跳机制和master进行通信
2.启动Driver,调用main函数,并创建SparkContext对象
3.SparkContext向master申请计算资源,makster会根据worker的心跳来分配worker的资源,并启动worker的Executor进程
4.SparkContext将代码程序解析成DAG结构,并交给DAGScheduler进行角度
5.DAG会在DAG Scheduler中分解为很多的stage(阶段),每个阶段包含多个task
6.stage(多个task组成的组合)会被调度到TaskSheduler中,TaskSchedule将任务分配到worker中,并交给Executor进程进行计算
7.executor会创建一个线程池去执行Task,并将执行结果反馈都SparkContext中,直到所有的task执行完毕
8.SparkContext向mskter注销释放资源
3.Spark on yarn
3.1
3.2
3.3
4.算子
4.1 转换算子
将RDD转为另一个RDD对象,仅仅记住数据集的逻辑操作,不会真正进行计算
- map()
返回一个新的分布式数据集(RDD),由原来每个元素通过func转换而来,和python自带的map一样 - filter
返回一个新的分布式数据集(RDD),由经过func返回True的原来元素组成 过滤 - flatMap
每输入一个元素,都会英东为0到多个输出元素,func返回的是一个序列,不再是单一元素 - sample
随机抽取一个子集 - union
将两个RDD合并成一个RDD - groupByKey
读取k,v类型数据,按key进行分组 - reduceByKey
读取k,v类型数据,按key进行合并 - join
将两个数据集进行合并,将key,value中的value进行合并,key相同保留,不相同的去掉 - groupBy
根据给定的函数分组 - subtract
返回一个集合中不存在另一个的集合的数据 - intersection
返回两个RDD的交集 - cartesian
返回两个rdd自由交叉组合的结果 - combineByKey
将k,v数据形式的数据转为k,c的形式,参数是三个方法 - mapPartitions
把原来RDD分区依次通过一个高阶函数进行处理 - mapPartitionsWithIndex
得到集合和索引 - aggregateByKey
将k,v形式的数据,转为k,c的形式 - sortByKey
根据k升序排序 - sortBy
指定排序方法 - cogroup
根据key进行合并,并进行分组
4.2 执行算子
触发所有转换算子进行RDD的逻辑操作
- collect
把所有计算结果当做一个集合返回 - count
返回rdd中元素个数 - first
取出rdd中第一个元素 - take
取出前n个元素 - takeordered
取出排序以后的前n个元素 - countByKey
统计k,v格式中,k的个数 - foreach
遍历rdd中的元素,并依次传入func作为参数 - saveAsTextFile
将rdd的执行结果保存到本地文件