姓名: 总分:
Hadoop、Hive、HBase、数据集成、Scala阶段测试
一、选择题(共20道,每道0.5分)
1、下面哪个程序负责HDFS数据存储( C )
A. NameNode B. Jobtracher
C. DataNode D. SecondaryNameNode
2、下列哪个属性是hdfs-site.xml中的配置( C ) B
A. fs.defaultFS B. dfs.replication
C. yarn.resourcemanager.address D. mapreduce.framework.name
解析:
fs.defaultFS:用于指定HDFS(Hadoop Distributed File System)的默认文件系统URI。它是Java代码访问HDFS时使用的路径。
dfs.replication:该配置项用于设置HDFS中数据块的副本数。
yarn.resourcemanager.address:用于指定 YARN ResourceManager 的 RPC 服务器地址和端口号。ResourceManager 是 YARN 架构中的核心组件之一,负责接收客户端提交的作业(如 MapReduce 任务、Spark 任务等),并为这些作业分配资源(如内存、CPU)以在集群中的 NodeManager 上执行。
mapreduce.framework.name:该配置项用于指定MapReduce作业的运行框架。在Hadoop 2.x及更高版本中,MapReduce作业通常运行在YARN上,因此该配置项的值通常为
yarn
。它告诉MapReduce框架应该使用YARN来管理其作业的执行。
3、Hadoop-3.x集群中的HDFS的默认的数据块的大小是( D )
A. 256M B.32M
C.64M D.128M
4、Hadoop-3.x集群中的HDFS的默认的副本块的个数是( C )
A. 1 B. 2
C. 3 D. 4
5、请问以下哪个命令组成是错误的( C )
A.bin/hadoop fs -cat /data/c.txt B. sbin/hdfs dfsadmin -report
C. bin/hdfs namenode -format D.sbin/stop-dfs.sh
解析:
使用
hadoop fs
命令来访问HDFS文件系统,-cat
选项用于查看指定文件(这里是/data/c.txt
)的内容。用于请求HDFS报告其状态,包括健康报告、数据节点报告等。
在Hadoop中,
hdfs namenode
命令应该位于sbin
目录下,而不是bin
目录。这个命令用于格式化HDFS的NameNode,通常是在HDFS首次设置或需要重置HDFS元数据时使用的。用于停止HDFS守护进程(NameNode和DataNode)。
sbin
目录下包含了Hadoop的管理脚本,如启动和停止服务的脚本。
6、以下与HDFS类似的框架是( C )
A. NTFS B. FAT32
C. GFS D.EXT3
解析:
HDFS(Hadoop Distributed File System):Hadoop分布式文件系统,是一个高度容错性的系统,设计用来部署在低廉的硬件上,并能提供高吞吐量的数据访问,适合大规模数据集上的应用。
GFS(Google File System):Google文件系统,是一个可扩展的分布式文件系统,同样用于大型的、分布式的、对大量数据进行访问的应用。
NTFS(New Technology File System):是Windows操作系统的一个文件系统,NTFS主要用于个人计算机和小型服务器的数据存储和管理。
FAT32(File Allocation Table 32):是另一种文件系统,主要用于小型的存储设备,如U盘和早期的硬盘驱动器。
EXT3(Third extended filesystem):是Linux系统的一种常见的文件系统,主要用于Linux操作系统的数据存储和管理。
7、HBase启动不需要哪个进程( D )
A. HMaster B. HRegionServer
C. QuorumPeerMain D. NodeManager
A. HMaster:这是HBase的主节点进程,负责协调集群中的所有RegionServer并处理元数据操作。因此,HBase启动时需要HMaster进程。
B. HRegionServer:这是负责处理数据的读写请求以及Region的负载均衡的进程。HBase启动时需要RegionServer进程来提供数据服务。
C. QuorumPeerMain:这是ZooKeeper服务器的主类。如果HBase配置为使用ZooKeeper进行协调和管理,那么ZooKeeper(包括QuorumPeerMain进程)将是HBase启动过程中的一部分。虽然它不是HBase本身的进程,但HBase的运行依赖于ZooKeeper,因此在某种程度上可以认为HBase启动时需要ZooKeeper(包括QuorumPeerMain)的支持。
D. NodeManager:这是YARN(Yet Another Resource Negotiator)框架中的一部分,用于管理集群中的节点资源,如CPU、内存等。YARN是Hadoop生态系统中的一个资源管理和任务调度的框架,与HBase的核心功能不直接相关。HBase并不依赖于YARN或NodeManager来运行其基本的数据存储和查询功能。因此,HBase启动时不需要NodeManager进程。
8、下列哪个是纯离线数据采集工具( B )
A. FlinkX B. Sqoop
C. Flume D. Canal
FlinkX是一款基于Flink的分布式离线/实时数据同步插件,可实现多种异构数据源高效的数据同步。
Sqoop是一个纯离线的数据采集工具,主要用于将关系型数据库(如MySQL)中的数据导入到Hadoop的HDFS中,或者从HDFS导出到关系型数据库中。高度依赖MapReduce和YARN
Flume是一个分布式、高可靠性和高可用性的海量日志收集系统,主要用于实时采集日志数据。
Canal是一个基于MySQL数据库增量日志解析的实时数据同步工具,它主要用于提供增量数据订阅和消费。
9、Map的输出结果首先被写入( A )
A. 内存 B. 缓存
C. 磁盘 D. 以上都正确
Map的输出结果首先被写入的是一个内存缓冲区,这个缓冲区可以看作是内存的一部分。
10、MapReduce与HBase的关系,哪些描述是正确的?( B )
A. 两者不可或缺,MapReduce是HBase可以正常运行的保证
B. 两者不是强关联关系,没有MapReduce,HBase可以正常运行
C. MapReduce不可以直接访问HBase
D. 它们之间没有任何关系
A. 两者不可或缺,MapReduce是HBase可以正常运行的保证
- 这个描述并不准确。虽然MapReduce和HBase都是Hadoop生态系统中的重要组件,但它们各自承担着不同的职责。HBase是一个开源的、分布式的非关系型数据库,而MapReduce是一种用于大规模数据处理的编程模型。HBase的正常运行并不直接依赖于MapReduce,它可以独立运行,并通过其他方式(如HBase自带的计算框架Coprocessor)进行数据处理。
B. 两者不是强关联关系,没有MapReduce,HBase可以正常运行
- 这个描述是正确的。HBase和MapReduce是两个独立的项目,它们之间的关系是相互依赖但非强制的。HBase可以独立于MapReduce运行,并通过其他机制(如HBase Shell、HBase API等)进行数据的读写和管理。同时,MapReduce也可以处理非HBase来源的数据,如HDFS上的文件等。
C. MapReduce不可以直接访问HBase
- 这个描述不完全准确。实际上,MapReduce可以通过HBase提供的HBaseTableInputFormat类来直接访问HBase中的数据。这个类允许MapReduce任务将HBase表作为输入源,从而可以对HBase中的数据进行读取和处理。
D. 它们之间没有任何关系
- 这个描述显然是错误的。HBase和MapReduce都是Hadoop生态系统中的关键组件,它们之间存在着紧密的联系和交互。HBase可以作为MapReduce任务的输入或输出源,而MapReduce则可以对HBase中的数据进行高效的并行处理。
11.下列哪个不是Spark的执行模式? ( C )
A. Local B. YARN
C. Mesos D. HDFS
A. Local:这是Spark的一个执行模式,其中Spark应用程序在单个JVM进程中运行,通常用于开发、测试和调试目的。在这个模式下,Spark不需要启动集群,而是在本地机器上运行,非常适合小规模数据处理和快速原型开发。因此,A选项是Spark的一个执行模式。
B. YARN:YARN(Yet Another Resource Negotiator)是Apache Hadoop的资源管理器,用于在Hadoop集群上管理资源和调度任务。Spark可以在YARN上运行,将YARN作为集群管理器,以管理集群资源和调度Spark任务。YARN模式包括yarn-client和yarn-cluster两种运行模式,分别适用于不同的场景。因此,B选项也是Spark的一个执行模式。
C. Mesos:虽然Mesos是一个开源的集群管理器,用于管理跨多种框架(包括Spark)的集群资源,但它本身并不是Spark的一种执行模式。相反,Spark可以在Mesos上运行,利用Mesos提供的资源管理和调度功能。然而,在描述Spark的执行模式时,我们通常不会说“Mesos是Spark的一个执行模式”,而是说“Spark可以在Mesos上运行”。因此,C选项不是Spark的直接执行模式。
D. HDFS:HDFS(Hadoop Distributed File System)是Apache Hadoop的分布式文件系统,用于存储大数据集。它并不是Spark的执行模式,而是Spark可以访问的一种数据存储系统。Spark可以从HDFS中读取数据,进行处理,并将结果写回到HDFS中。然而,HDFS与Spark的执行模式是两个不同的概念。因此,D选项同样不是Spark的执行模式。
12.在Spark中,什么机制用于加速迭代计算? ( B )
A. Checkpointing B. Caching
C. Broadcasting D. Partitioning
解析:
Caching(缓存)
缓存机制是Spark中用于优化迭代计算的重要手段。Spark允许用户将RDD(弹性分布式数据集)或DataFrame等数据集缓存到内存中,以便在后续的计算中重用。
Checkpointing(检查点)
虽然Checkpointing也是Spark中的一种容错机制,但它主要用于在系统故障或节点故障时恢复数据,而不是直接用于加速迭代计算。
Broadcasting(广播变量)
Broadcasting是Spark中用于优化数据传输的一种机制,它允许将大变量(如模型参数、大配置对象等)从Driver端广播到所有Executor端,以减少每个Task的数据传输量。
Partitioning(分区)
Partitioning是Spark中对数据进行分布式存储和计算的基础。通过合理的分区策略,可以将大数据集分割成多个小数据集,并分布到不同的节点上进行并行处理。然而,分区本身并不直接加速迭代计算,而是为并行计算提供了基础。
-
下列哪个函数属于转换操作(Transformation)而不是行动操作(Action)?( C )
A. count() B. collect()
C. filter() D. saveAsTextFile()
-
在Spark中,
persist
和cache
方法有何区别? ( B )
A) cache
是persist
的一个别名,二者完全相同。
B) persist
提供多种存储级别,而cache
总是使用默认的存储级别。
C) cache
用于DataFrame,而persist
用于RDD。
D) persist
用于数据的持久化,而cache
用于数据的临时存储。
- 在Spark中,什么是“窄依赖”(Narrow Dependency)与“宽依赖”(Wide Dependency)?它们如何影响数据的并行处理? ?(A )
A) 窄依赖表示每个父RDD分区映射到子RDD的一个分区,而宽依赖涉及多个父分区到一个子分区,导致shuffle。
B) 宽依赖意味着数据不需要重分布,而窄依赖则需要shuffle。
C) 窄依赖和宽依赖都涉及到数据的shuffle,只是程度不同。
D) 窄依赖与宽依赖仅在DataFrame中存在,对RDD没有意义。
- Spark的
Job
和Stage
在执行过程中如何划分? ( D )
A) Job
由一系列Stage
组成,每个Stage
对应于一个shuffle操作。
B) Job
和Stage
是同义词,没有区别。
C) Stage
由一系列Job
组成,用于并行执行不同的任务。
D) Job
是由用户提交的任务,Stage
是DAGScheduler为优化执行计划而创建的最小执行单元。
解析:
- Job的定义
- Job是Spark中由用户提交的任务,通常是由一个Action操作(如
collect
、count
、save
、reduce
等)触发的。每个Action操作都会生成一个Job。
- Stage的划分
- Stage是Spark中Job处理过程要分为的几个阶段。DAGScheduler(有向无环图调度器)会根据RDD之间的依赖关系(特别是宽依赖,如shuffle操作)将Job划分为多个Stage。
- 划分Stage的依据是RDD之间的宽窄依赖。宽依赖(如
groupByKey
、reduceByKey
、join
等)会导致shuffle操作,从而需要在不同节点间重新分配数据。每当遇到宽依赖时,DAGScheduler就会切分出一个新的Stage。- Stage的数量取决于程序中宽依赖(即shuffle操作)的数量。每个Stage包含一组可以并行执行的任务(Tasks)。
- Task的定义
- Task是Spark中任务运行的最小单位,最终是以Task为单位运行在Executor中的。一个Stage会包含多个Task,这些Task的数量通常取决于Stage中最后一个RDD的分区数。
- Task的内容与Stage相同,但当分区数量为n时,会有n个相同效果的Task被分发到执行程序中执行。
- 在Spark中,
mapPartitions
与map
操作有何区别,以及在什么情况下使用mapPartitions
更合适? ( B )
A) mapPartitions
和map
都是对每个元素进行操作,没有区别。
B) mapPartitions
可以访问整个分区的数据,适用于需要对分区内的数据进行全局操作的场景。
C) map
操作可以改变分区的数量,而mapPartitions
不能。
D) mapPartitions
是map
的别名,用于提高代码可读性。
- Spark的
Kryo
序列化库如何帮助提高性能? ( B )
A) Kryo增加了序列化的复杂度,但提高了数据的完整性。
B) Kryo序列化库提供了一种更紧凑、更快的序列化方式,减少了网络传输和磁盘I/O的开销。
C) Kryo只用于Spark的内部通信,对外部数据无影响。
D) Kryo序列化库是默认的序列化方式,不需要配置。
- 在Spark中,
SparkSession
与SparkContext
的关系是什么?为何推荐使用SparkSession
? ( A )
A) SparkSession
是SparkContext
的封装,提供了更高级的功能,如SQL查询和数据源管理,SparkSession
简化了API,提高了易用性。
B) SparkSession
和SparkContext
可以互换使用,没有推荐使用的原因。
C) SparkContext
是SparkSession
的前身,SparkSession
仅用于Spark SQL。
D) SparkSession
用于管理执行器,SparkContext
用于管理Driver程序。
- Spark的
Broadcast Join
与Shuffle Hash Join
有何区别?在何种情况下应优先考虑使用Broadcast Join
? ( D )A
A) Broadcast Join
将较小的表广播到每个节点,减少shuffle成本;Shuffle Hash Join
需要更多网络传输,适用于大表间的连接。
B) Broadcast Join
和Shuffle Hash Join
没有区别,只是名称不同。
C) Shuffle Hash Join
总是优于Broadcast Join
,因为它更通用。
D) Broadcast Join
用于小数据集,Shuffle Hash Join
用于大数据集,但具体选择与数据大小无关。
解析:
Broadcast Join
和Shuffle Hash Join
是Spark SQL中处理连接(Join)操作的两种不同策略。Broadcast Join
适用于连接操作中的一个小表,它会将这个小表广播到每个节点上,从而避免了大表的shuffle操作,减少了网络传输成本。而Shuffle Hash Join
则适用于大数据集之间的连接,它需要对数据进行shuffle操作来确保连接的正确性。因此,在连接操作中的一个小表时,应优先考虑使用Broadcast Join
。注意,实际选择哪种连接策略还取决于其他因素,如数据分布、集群配置等,但数据大小是一个重要的考虑因素。
二、填空题(共20分,每空0.5分)
1、启动hdfs的shell脚本是:( sh xxx.sh )start-dfs.sh
解析:
启动hdfs的shell脚本是:
start-dfs.sh
。这个脚本用于启动Hadoop分布式文件系统(HDFS)的所有守护进程,包括NameNode和DataNode等。
2、Block是HDFS的基本存储单元,默认大小是( 128 ) MB
3、MapReduce默认输入的格式化类:( InputFormat )TextInputFormat
解析:
MapReduce默认输入的格式化类是
TextInputFormat
。这是MapReduce的默认输入格式,它读取文件的行作为输入,其中行的字节偏移量作为键(Key),行的内容作为值(Value)。
4、Hadoop三大组件:( HDFS )、( MapReduce )、( Yarn )
5、Hiveserver2默认的端口:( 10000 )
解析:
Hiveserver2默认的端口是10000。Hiveserver2是Hive的一个服务组件,它允许用户通过JDBC或ODBC等协议远程连接到Hive并执行SQL查询。
6、HBase的RowKey设计三大原则:( 唯一性 )、( 散列性 )、( 长度适中 )
解析:
- 唯一性:确保RowKey的唯一性,以便能够唯一标识表中的每一行数据。
- 散列性:设计RowKey时应该考虑其散列性,避免大量数据集中在少数几个Region上,从而导致热点问题。
- 长度适中:RowKey的长度应该适中,不宜过长也不宜过短。过长的RowKey会占用较多的存储空间,而过短的RowKey则可能增加数据倾斜的风险。
7、在Spark中,一个( RDD )由一系列的( 分区 )组成,每个Stage由一组( RDD )构成,而Stage之间的依赖关系通常由( 行动算子 )操作触发。
8、hive中数据文件默认存储格式是( txt )TextFile
解析:
hive中数据文件默认存储格式是TextFile。TextFile是一种简单的文本格式,数据以行为单位进行存储,每行数据之间通过换行符分隔。
9、spark core中缓存的实现方式有几种( 使用cache存储到内存中 )( 使用checkpoint存储到磁盘中 )
10、hive中sql转变成mr经过4个器,分别是解析器,( 编译器 )、( 优化器 )、( 执行器 )
三、判断题(共10道,每道1分)
1、Block Size是不可以修改的( F )
2、如果NameNode意外终止,SecondaryNameNode会接替它使集群继续工作( F )
3、MapReduce 切片的大小等于 block的大小( F )
4、在HBase中由HMaster负责用户的IO请求( F )
5、MapReduce中map任务的数量可以自己指定( T )
6、DataX只能用于离线数据采集( T )
7、Flume运行时需要依赖MapReduce( T )F
解析:
Apache Flume是一个分布式、可靠且可用的服务,用于高效地收集、聚合和移动大量日志数据。它并不依赖MapReduce来运行,而是可以独立于Hadoop生态系统运行。
8、MapReduce中环形缓冲区默认大小为128M( F )
9、Spark的SparkContext
和SparkSession
可以同时存在于同一个应用中,SparkContext
提供了更多低级的API,而SparkSession
则提供了高层的API,包括SQL和数据源支持。( T )
10、Spark的map
操作是懒惰求值的,只有在触发行动操作时才会执行计算。( T )
四、简答题(共5道,每道4分)
1、用自己的语言描述SecondaryNameNode的作用。
SecondaryNameNode作为NameNode的秘书,帮助NameNode处理事务。
SecondaryNameNode是用来帮助NameNode完成元数据信息合并,从角色上看属于NameNode的“秘书”
1.定期合并FsImage和Edits文件
-
提供HDFS元数据的冷备份
-
监控HDFS状态
-
提升HDFS的可靠性和性能
2、用自己的语言描述spark的数据倾斜优化方式。
1.使用Hive ETL预处理数据如果导致数据倾斜的是Hive表。如果该Hive表中的数据本身很不均匀(比如某个 key对应了100万数据,其他key才对应了10条数据)。此时可以评估一下,是否可以通过Hive来进行数据预处理(即通过Hive ETL预先对 数据按照key进行聚合,或者是预先和其他表进行join),然后在Spark作业中针对的数据源就不是 原来的Hive表了,而是预处理后的Hive表。此时由于数据已经预先进行过聚合或join操作了,那么 在Spark作业中也就不需要使用原先的shuffle类算子执行这类操作了。 2.过滤少数导致倾斜的key如果发现导致倾斜的key就少数几个,而且对计算本身的影响并不大。如果我们判断那少数几个数据量特别多的key,对作业的执行和计算结果不是特别 重要的话,那么干脆就直接过滤掉那少数几个key。比如,在Spark SQL中可以使用where子句过滤 掉这些key或者在Spark Core中对RDD执行filter算子过滤掉这些key。如果需要每次作业执行时, 动态判定哪些key的数据量最多然后再进行过滤,那么可以使用sample算子对RDD进行采样,然后 计算出每个key的数量,取数据量最多的key过滤掉即可。 3.提高shuffle操作的并行度在对RDD执行shuffle算子时,给shuffle算子传入一个参数,比如 reduceByKey(1000),该参数就设置了这个shuffle算子执行时shuffle read task的数量。对于 Spark SQL中的shuffle类语句,比如group by、join等,需要设置一个参数,即 spark.sql.shuffle.partitions,该参数代表了shuffle read task的并行度,该值默认是200,对于很 多场景来说都有点过小。增加shuffle read task的数量,可以让原本分配给一个task的多个key分配给多个 task,从而让每个task处理比原来更少的数据。4.双重聚合对RDD执行reduceByKey等聚合类shuffle算子或者在Spark SQL中使用group by 语句进行分组聚合时,比较适用这种方案。这个方案的核心实现思路就是进行两阶段聚合。第一次是局部聚合,先给每个key 都打上一个随机数,比如10以内的随机数,此时原先一样的key就变成不一样的了,比如(hello, 1) (hello, 1) (hello, 1) (hello, 1),就会变成(1_hello, 1) (1_hello, 1) (2_hello, 1) (2_hello, 1)。接着 对打上随机数后的数据,执行reduceByKey等聚合操作,进行局部聚合,那么局部聚合结果,就会 变成了(1_hello, 2) (2_hello, 2)。然后将各个key的前缀给去掉,就会变成(hello,2)(hello,2),再次 进行全局聚合操作,就可以得到最终结果了,比如(hello, 4)。 5.将reduce join转为map join在对RDD使用join类操作,或者是在Spark SQL中使用join语句时,而且join操作中 的一个RDD或表的数据量比较小(比如几百M或者一两G),比较适用此方案。 不使用join算子进行连接操作,而使用Broadcast变量与map类算子实现join操作, 进而完全规避掉shuffle类的操作,彻底避免数据倾斜的发生和出现。将较小RDD中的数据直接通过 collect算子拉取到Driver端的内存中来,然后对其创建一个Broadcast变量;接着对另外一个RDD 执行map类算子,在算子函数内,从Broadcast变量中获取较小RDD的全量数据,与当前RDD的每 一条数据按照连接key进行比对,如果连接key相同的话,那么就将两个RDD的数据用你需要的方式 连接起来。 6.采样倾斜key并分拆join操作两个RDD/Hive表进行join的时候,如果数据量都比较大,无法采用“解决方案五 ”,那么此时可以看一下两个RDD/Hive表中的key分布情况。如果出现数据倾斜,是因为其中某一 个RDD/Hive表中的少数几个key的数据量过大,而另一个RDD/Hive表中的所有key都分布比较均 匀,那么采用这个解决方案是比较合适的。思路:对包含少数几个数据量过大的key的那个RDD,通过sample算子采样出一份样本来,然后统计一下每个 key的数量,计算出来数据量最大的是哪 几个key。 然后将这几个key对应的数据从原来的RDD中拆分出来,形成一个单独的RDD,并给每个key都打上n以 内的随机数作为前缀,而不会导致倾斜的大部分key形成另外一个RDD。 接着将需要join的另一个RDD,也过滤出来那几个倾斜key对应的数据并形成一个单独的RDD,将每条数 据膨胀成n条数据,这n条数据都按顺序附加一个0~n的前缀,不会导致倾斜的大部分key也形成另外一个 RDD。 再将附加了随机前缀的独立RDD与另一个膨胀n倍的独立RDD进行join,此时就可以将原先相同的key打 散成n份,分散到多个task中去进行join了。 而另外两个普通的RDD就照常join即可。 最后将两次join的结果使用union算子合并起来即可,就是最终的join结果。 7.使用随机前缀和扩容RDD进行join如果在进行join操作时,RDD中有大量的key导致数据倾斜,那么进行分拆key也没 什么意义。该方案的实现思路基本和“解决方案六”类似,首先查看RDD/Hive表中的数据分布情况,找到那个造成 数据倾斜的RDD/Hive表,比如有多个key都对应了超过1万条数据。 然后将该RDD的每条数据都打上一个n以内的随机前缀。 同时对另外一个正常的RDD进行扩容,将每条数据都扩容成n条数据,扩容出来的每条数据都依次打上一 个0~n的前缀。 最后将两个处理后的RDD进行join即可。
3、用自己的语言描述诉MapReduce流程。
一、输入分片(Input Splitting)
- 过程描述:在MapReduce作业开始前,输入文件(或文件夹)首先被划分为多个InputSplit(输入分片)。默认情况下,每个HDFS的block(数据块)对应一个InputSplit。这样做的目的是将大文件分割成多个小块,以便并行处理。
- 分片大小:分片的大小通常与HDFS的数据块大小相同(默认是128MB),但也可以根据作业需求进行调整。分片时不考虑数据集整体,而是逐个针对每一个文件单独切片。
二、Map阶段
- InputFormat类:使用InputFormat类的子类(如TextInputFormat)把输入文件(夹)划分为InputSplit。
- RecordReader类:每个InputSplit通过RecordReader类被解析成一个个<key,value>键值对。在TextInputFormat中,默认是每行的起始偏移量作为key,每行的内容作为value。
- Mapper类:框架调用Mapper类中的map函数,输入是<k1,v1>键值对,输出是<k2,v2>键值对。程序员可以覆盖map函数,实现自己的逻辑。
三、Combiner阶段(可选)
- 过程描述:Combiner是一个本地化的reduce操作,发生在map端。它的主要作用是减少map端的输出,从而减少shuffle过程中网络传输的数据量,提高作业的执行效率。
- 注意:Combiner的输出是Reducer的输入,因此它绝不能改变最终的计算结果。Combiner适合于等幂操作,如累加、最大值等。
四、Shuffle阶段
- 分区:在map函数处理完数据后,输出的<k2,v2>键值对会根据key进行分区,不同的分区由不同的reduce task处理。分区操作通常通过哈希函数实现。
- 排序与合并:在写入环形缓冲区之前,数据会先进行排序(默认采用快速排序算法)。当缓冲区满(默认为80%容量)时,数据会溢出到磁盘文件中,并在溢出前完成排序。多个溢出文件在最终输出前会进行归并排序,合并成一个大的有序文件。
- 数据传输:map任务完成后,reduce任务会启动数据copy线程,通过HTTP方式请求map任务所在的NodeManager以获取输出文件。
五、Reduce阶段
- 数据合并:Reduce任务将接收到的所有map任务的输出数据(已分区且区内有序)进行合并,相同key的value值会被放到同一个集合中。
- Reducer类:框架调用Reducer类中的reduce函数,对合并后的数据进行处理,最终输出结果。
- OutputFormat类:使用OutputFormat类的子类(如TextOutputFormat)将最终结果输出到文件或数据库等存储介质中。
4、谈谈Hive的优化。
1.本地模式运行,当处理一些小任务时可以选择本地模式运行,这样会使得任务执行的速度会很快。
2.JVM模式,在处理一些需要很多资源的任务时,可以先申请一部分的资源,等运行结束后再将资源释放。
3.严格模式,启动严格模式,禁止分区表的全表扫描,查询数据时必须加limit,禁止笛卡尔积。
4.hive join的数据倾斜问题,当小表join小表时,不用去管它;当小表join大表时,小表放在join的左边;当大表join大表时,应当考虑是否会出现某个reduce数据量过大的情况。空key过滤:当有大量数据同时放入一个reduce时,应当观察该rowkey,一般来说该rowkey对应的数据都是异常数据,需要使用sql语句对其进行过滤。空key转换:当有大量的数据都对应一个空的rowkey时,需要给这些数据随机分配一个rowkey,使它们均匀的分布到一些reduce中。
5.自定义map和reduce的数量,一般不去修改它。
5、用自己的语言描述诉spark的资源调度和任务调度流程。
spark的资源调度:driver向resourcemanager申请资源,resourcemanager选择一个空闲的子节点开启applicationmaster任务,applicationmaster向resourcemanager提交申请资源开启executor的任务。applicationmaster选择一个空闲子节点开启executor,
开启完毕后applicationmaster将executor开启的消息发送给driver,让driver发送执行任务。
spark的任务调度流程:driver端,遇到action算子触发任务执行,将任务提交到有向无环图,DAGscheduler中,根据RDD的血缘关系划分划分stage,将RDD中的分区封装成taskset任务,发送到TASKscheduler。TASKscheduler取出taskset任务,根据RDD 的提供最优的任务执行计划,只移动计算不移动数据,开始对执行任务。
spark的资源调度:
1、Driver提交作业命令
2、向ResourceMananger申请资源
3、ResourceMananger检查一些权限、资源空间,在一个相对空闲的子节点上开启一个ApplicationMaster的进程
4、ApplicationMaster向ResourceMananger申请资源,启动Executor
5、ResourceMananger启动Executor
6、Executor反向注册给Driver,告诉Driver资源申请成功,可以发送任务
spark的任务调度流程:
7、Driver遇到一个行动算子,触发整个作业调度
8、先将整个作业交给DAG有向无环图
DAG Scheduler
9、根据RDD之间的血缘关系,找出宽窄依赖(有没有shuffle的产生)
10、通过宽窄依赖划分stage阶段
11、根据stage阶段,将stage中的task任务封装成一个taskSet对象
12、发送给后续的 Task Scheduler
Task Scheduler
13、从DAG Scheduler发送过来的taskSet中取出task任务
14、根据RDD五大特性的最后一大特性,只移动计算不移动数据,将task任务发送到对应的Executor的线程池中执行
五、代码题(50分)
1、spark sql数据分析以及可视化(30分)
疫情期间各类政府媒体及社交网站,均发布了相关疫情每日统计数据,下面基于数据仓库工具Hive请你统计分析相关疫情数据。
提示:
(数据字段为:日期date、省份province、城市city、新增确诊confirm、新增出院heal、新增死亡dead、消息来源source)
部分数据截图:
题目:
请基于covid19.csv数据,将数据导入到Hive中,使用spark on hive读取数据使用纯SQL完成下列统计分析
请自行在Hive按照数据结构创建对应的表并加载数据
请给出代码语句及结果截图
-
1、统计湖北省每月新增出院病例总数最多的前3个城市(8分)
输出:[月份,城市,每月新增出院病例总数,排名]
create table bigdata30_test3.covid (
dates String,
province String,
city String,
confirm Int,
heal Int,
dead Int,
source String
)ROW FORMAT DELIMITED FIELDS TERMINATED BY ','
LOCATION '/bigdata30/data/';select
tt1.month,
tt1.city,
tt1.counts as counts,
tt1.rank
from
(select
t1.month,
t1.city,
t1.counts as counts,
-- row_number:数据相同也不会导致排名重复
row_number() over(partition by t1.month order by t1.counts desc) rank
from
(select
province,
city,
-- 只取出月份
subString(dates,0,2) as month,
count(heal) as counts
from bigdata30_test3.covid
where province = "湖北" and city != "境外输入-英国"
-- 只取出月份用来分组
group by city,province,subString(dates,0,2)) t1) tt1
-- 取每个排序的前三名
WHERE tt1.rank <= 3;结果:
1月 武汉市 24 1
1月 荆门市 10 2
1月 荆州市 10 3
2月 武汉市 62 1
2月 黄石市 54 2
2月 黄冈市 46 3
3月 武汉市 41 1
3月 鄂州市 32 2
3月 孝感市 28 3
4月 武汉市 29 1
4月 荆门市 1 2
4月 襄阳市 1 3
-
2、统计安徽省每月新增确诊人数同比
同比 = (当月指标 - 上月指标)/ 上月指标 (6分)
输出:[月份,每月新增确诊人数,上月新增确诊人数,同比]
将该需求结果写入到mysql中,使用fileBI作图,柱状图 (4分)
-- 纯sql DBeaver中执行 select t1.month, t1.counts, LAG(counts,1,-1) over(order by t1.month) as last_counts, casewhen LAG(counts,1,-1) over(order by t1.month) < 0 then '没有上一个月的数据'else round(((t1.counts - LAG(counts,1,-1) over(order by t1.month)) / LAG(counts,1,-1) over(order by t1.month)),2) end as rate from (select subString(dates,0,2) as month, province, count(confirm) as counts from bigdata30_test3.covid where province = "安徽" group by province,subString(dates,0,2)) t1;-- MySQL建表 create table dataMysql ( months varchar(30), counts varchar(30), last_counts varchar(30), rate varchar(30) )
// 为了将数据写入到MySQL,使用sparksqlpackage com.shujia.DSLexamimport org.apache.spark.rdd.RDD import org.apache.spark.sql.{DataFrame, Row, SparkSession}import java.sql.{Connection, DriverManager, PreparedStatement}object Exam2_1 {def main(args: Array[String]): Unit = {val sparkSession: SparkSession = SparkSession.builder().master("local").appName("考试大题一")//参数设置的优先级:代码优先级 > 命令优先级 > 配置文件优先级.config("spark.sql.shuffle.partitions", "1").enableHiveSupport() // 开启hive的配置.getOrCreate()sparkSession.sql("use bigdata30_test3")//truncate = false 时,完整地显示某列值,不进行任何截断。val dataDF: DataFrame = sparkSession.sql("""|select|t1.month,|t1.counts,|LAG(counts,1,-1) over(order by t1.month) as last_counts,|case| when LAG(counts,1,-1) over(order by t1.month) < 0 then 0| else round(((t1.counts - LAG(counts,1,-1) over(order by t1.month)) / LAG(counts,1,-1) over(order by t1.month)),2)|end as rate|from|(select|subString(dates,0,2) as month,|province,|count(confirm) as counts|from|bigdata30_test3.covid|where province = "安徽"|group by province,subString(dates,0,2)) t1|""".stripMargin)dataDF.foreach((rdd: Row) => {//注册驱动Class.forName("com.mysql.jdbc.Driver")//创建数据库连接对象val conn: Connection = DriverManager.getConnection("jdbc:mysql://master:3306/exam?useUnicode=true&characterEncoding=UTF-8&useSSL=false","root","123456")//创建预编译对象val statement: PreparedStatement = conn.prepareStatement("insert into dataMysql values(?,?,?,?)")statement.setString(1, rdd.getAs[String]("month"))statement.setLong(2, rdd.getAs[Long]("counts"))statement.setLong(3, rdd.getAs[Long]("last_counts"))statement.setDouble(4, rdd.getAs[Double]("rate"))// 执行sql语句statement.executeUpdate()statement.close()conn.close()})} }
fineBI作图
-
3、统计安徽省各城市连续新增确诊人数、连续新增确诊开始日期、连续新增确诊结束日期及连续新增确诊天数(12分)
输出:[城市,连续新增确诊人数,连续新增确诊开始日期,连续新增确诊结束日期,连续新增确诊天数]
select DISTINCT
tt1.city,
max(tt1.confirm) over(partition by tt1.city,tt1.flag,SPLIT(tt1.new_day, '-')[0]) - min(tt1.confirm) over(partition by tt1.city,tt1.flag,SPLIT(tt1.new_day, '-')[0]) as add_confirm,
min(tt1.new_day) over(partition by tt1.city,tt1.flag,SPLIT(tt1.new_day, '-')[0]) as start_date,
max(tt1.new_day) over(partition by tt1.city,tt1.flag,SPLIT(tt1.new_day, '-')[0]) as end_date,
count(1) over(partition by tt1.city,tt1.flag,SPLIT(tt1.new_day, '-')[0]) as counts
from
(select
*,
CAST(SPLIT(t1.new_day, '-')[1] AS INT) AS day_of_month,
(CAST(SPLIT(t1.new_day, '-')[1] AS INT) - t1.rank) as flag
from
(select * ,
from_unixtime(unix_timestamp(dates,'MM月dd日'),'MM-dd') as new_day,
row_number() over(partition by city order by dates) rank
from bigdata30_test3.covid
where province = "安徽" and source = "安徽卫健委" and heal IS NOT NULL and confirm IS NOT NULL and dead IS NOT NULL) t1) tt1-- 连续新增确诊人数应该是,求出的数据应该是本组的最后一条的confirm减去本组第一天的confirm,而不是下面的一组中的最大的confirm减去最小的confirm ??? 该如何求解
max(tt1.confirm) over(partition by tt1.city,tt1.flag,SPLIT(tt1.new_day, '-')[0]) -min(tt1.confirm) over(partition by tt1.city,tt1.flag,SPLIT(tt1.new_day, '-')[0]) as add_confirm,-- 解决方案:FIRST_VALUE()和LAST_VALUE()函数分别获取了每个分组的第一天和最后一天的确诊人数-- 为了避免下述中出现的分组全出现在结果中的问题,使用FIRST_VALUE()和LAST_VALUE()函数时,
-- 最好指定ROWS BETWEEN UNBOUNDED PRECEDING AND UNBOUNDED FOLLOWING
SELECT DISTINCTtt1.city,LAST_VALUE(tt1.confirm) OVER (PARTITION BY tt1.city, tt1.flag, SPLIT(tt1.new_day, '-')[0] ORDER BY tt1.new_day ROWS BETWEEN UNBOUNDED PRECEDING AND UNBOUNDED FOLLOWING) -FIRST_VALUE(tt1.confirm) OVER (PARTITION BY tt1.city, tt1.flag, SPLIT(tt1.new_day, '-')[0] ORDER BY tt1.new_day ROWS BETWEEN UNBOUNDED PRECEDING AND UNBOUNDED FOLLOWING) AS add_confirm,MIN(tt1.new_day) OVER (PARTITION BY tt1.city, tt1.flag, SPLIT(tt1.new_day, '-')[0]) AS start_date,MAX(tt1.new_day) OVER (PARTITION BY tt1.city, tt1.flag, SPLIT(tt1.new_day, '-')[0]) AS end_date,COUNT(1) OVER (PARTITION BY tt1.city, tt1.flag, SPLIT(tt1.new_day, '-')[0]) AS counts
FROM(SELECT*,CAST(SPLIT(t1.new_day, '-')[1] AS INT) AS day_of_month,(CAST(SPLIT(t1.new_day, '-')[1] AS INT) - t1.rank) AS flagFROM(SELECT *,FROM_UNIXTIME(UNIX_TIMESTAMP(dates, 'MM月dd日'), 'MM-dd') AS new_day,ROW_NUMBER() OVER (PARTITION BY city ORDER BY dates) AS rankFROM bigdata30_test3.covidWHERE province = '安徽' AND source = '安徽卫健委' AND heal IS NOT NULL AND confirm IS NOT NULL AND dead IS NOT NULL) t1) tt1;-- 注:
在SQL中,ROWS BETWEEN UNBOUNDED PRECEDING AND UNBOUNDED FOLLOWING是窗口函数(如FIRST_VALUE(), LAST_VALUE(), ROW_NUMBER(), SUM(), AVG()等)的一个子句,用于指定窗口的范围。
UNBOUNDED PRECEDING表示窗口的起始点是分区中的第一行。
UNBOUNDED FOLLOWING表示窗口的结束点是分区中的最后一行。对于FIRST_VALUE()和LAST_VALUE()这样的函数,它们通常需要一个明确的窗口定义来确定“第一”和“最后”是基于什么范围来计算的。如果不提供ROWS BETWEEN子句,某些数据库系统可能会报错,因为它们不知道应该基于哪些行来计算这些值。-- 不加也可正常执行,但是不能加上ORDER BY tt1.new_day,否则会出现整个分组都出现在最终的结果里
形如:
|合肥市| 0| 01-28| 01-30| 3|
|合肥市| 10| 01-28| 01-30| 3|
|合肥市| 7| 01-28| 01-30| 3|
SELECT DISTINCTtt1.city,LAST_VALUE(tt1.confirm) OVER (PARTITION BY tt1.city, tt1.flag, SPLIT(tt1.new_day, '-')[0]) -FIRST_VALUE(tt1.confirm) OVER (PARTITION BY tt1.city, tt1.flag, SPLIT(tt1.new_day, '-')[0]) AS add_confirm,MIN(tt1.new_day) OVER (PARTITION BY tt1.city, tt1.flag, SPLIT(tt1.new_day, '-')[0]) AS start_date,MAX(tt1.new_day) OVER (PARTITION BY tt1.city, tt1.flag, SPLIT(tt1.new_day, '-')[0]) AS end_date,COUNT(1) OVER (PARTITION BY tt1.city, tt1.flag, SPLIT(tt1.new_day, '-')[0]) AS counts
FROM(SELECT*,CAST(SPLIT(t1.new_day, '-')[1] AS INT) AS day_of_month,(CAST(SPLIT(t1.new_day, '-')[1] AS INT) - t1.rank) AS flagFROM(SELECT *,FROM_UNIXTIME(UNIX_TIMESTAMP(dates, 'MM月dd日'), 'MM-dd') AS new_day,ROW_NUMBER() OVER (PARTITION BY city ORDER BY dates) AS rankFROM bigdata30_test3.covidWHERE province = '安徽' AND source = '安徽卫健委' AND heal IS NOT NULL AND confirm IS NOT NULL AND dead IS NOT NULL) t1) tt1;
没指定ROWS BETWEEN UNBOUNDED PRECEDING AND UNBOUNDED FOLLOWING,又加上了order by dates: 会出现下面这种情况
2、spark DSL数据分析(20分)
现有三份数据,结构如下:
-
live_types 直播间信息表
结构:live_id live_type
直播间id 直播间类型
-
live_events 用户访问直播间记录表
结构:user_id live_id start_time end_time
用户id 直播间id 开始时间 结束时间
-
user_info 用户信息表
结构:user_id user_name
用户id 用户名
题目:
请给出结果截图及Scala代码
1、统计每位用户观看不同类型直播的次数(6分)
输出:[用户id,用户名,直播间类型,次数]
import org.apache.spark.sql.expressions.Window
import org.apache.spark.sql.{DataFrame, SparkSession}object Exam2 {def main(args: Array[String]): Unit = {val sparkSession: SparkSession = SparkSession.builder().master("local").appName("社保练习").getOrCreate()import org.apache.spark.sql.functions._import sparkSession.implicits._// user_id live_id start_time end_time// 用户id 直播间id 开始时间 结束时间val live_events: DataFrame = sparkSession.read.format("csv").option("sep", "\t").schema("user_id INT,live_id INT,start_time timestamp,end_time timestamp").load("spark/data_exam/live_events.txt")
// live_events.show()//live_id live_type// 直播间id 直播间类型val live_types: DataFrame = sparkSession.read.format("csv").option("sep", "\t").schema("live_id INT,live_type String").load("spark/data_exam/live_types.txt")
// live_types.show()//user_id user_name// 用户id 用户名val user_info: DataFrame = sparkSession.read.format("csv").option("sep", "\t").schema("user_id INT,user_name String").load("spark/data_exam/user_info.txt")
// user_info.show()/*** 1、统计每位用户观看不同类型直播的次数(6分)* > 输出:[用户id,用户名,直播间类型,次数]*/live_events.withColumn("count",count(expr("1")) over Window.partitionBy($"user_id",$"live_id"))// TODO 设置表与表之间进行左连接.join(live_types, live_events("live_id") === live_types("live_id"), "left").join(user_info, live_events("user_id") === user_info("user_id"), "left")/*** 两张表中的字段名相同,要注明字段所属表* 否则会报 Reference 'user_id' is ambiguous, could be: user_id, user_id.*/.select(live_events("user_id"),$"user_name",$"live_type",$"count").distinct().show()+-------+---------+---------+-----+
|user_id|user_name|live_type|count|
+-------+---------+---------+-----+
| 106| Lucy| music| 1|
| 100| LiHua| game| 1|
| 102| Tom| food| 1|
| 104| Bush| game| 1|
| 105| Jam| game| 1|
| 102| Tom| music| 1|
| 100| LiHua| food| 2|
| 101| Bob| food| 1|
| 101| Bob| game| 1|
| 102| Tom| game| 1|
| 104| Bush| food| 1|
+-------+---------+---------+-----+
2、统计每位用户累计观看直播时长,按时长降序排列(6分)
输出:[用户id,用户名,累计时长]
/*** 2、统计每位用户累计观看直播时长,按时长降序排列(6分)* > 输出:[用户id,用户名,累计时长]*/// 100 1 2021-12-01 19:00:00 2021-12-01 19:28:00 start_time timestamp,end_timelive_events.withColumn("times",(unix_timestamp($"end_time","yyyy-MM-dd HH:mm:ss") - unix_timestamp($"start_time","yyyy-MM-dd HH:mm:ss"))).withColumn("all_times", sum($"times") over Window.partitionBy($"user_id")).join(user_info,"user_id")// TODO 时间戳 / 60 ,在最后查询时,可以将秒转换成分钟.select($"user_id",$"user_name",$"all_times" / 60).distinct().orderBy($"all_times".desc).show()+-------+---------+----------------+
|user_id|user_name|(all_times / 60)|
+-------+---------+----------------+
| 104| Bush| 178.0|
| 101| Bob| 163.0|
| 102| Tom| 140.0|
| 106| Lucy| 129.0|
| 100| LiHua| 110.0|
| 105| Jam| 8.0|
+-------+---------+----------------+
3、统计不同类型直播用户累计观看时长降序排名(8分)
输出:[直播间id,直播间类型,用户id,用户名,累计时长,排名]
/*** 3、统计不同类型直播用户累计观看时长降序排名(8分)* > 输出:[直播间id,直播间类型,用户id,用户名,累计时长,排名]*/live_events//TODO 在开始得出时间戳的时候就将其转换成以分钟为单位.withColumn("times", (unix_timestamp($"end_time", "yyyy-MM-dd HH:mm:ss") - unix_timestamp($"start_time", "yyyy-MM-dd HH:mm:ss")) / 60).withColumn("all_times", sum($"times") over Window.partitionBy($"user_id")).join(user_info, "user_id").join(live_types, "live_id").withColumn("rank", row_number() over Window.partitionBy($"live_type").orderBy($"all_times".desc)).select($"live_id",$"live_type",$"user_id",$"user_name",$"all_times",$"rank").show()+-------+---------+-------+---------+---------+----+
|live_id|live_type|user_id|user_name|all_times|rank|
+-------+---------+-------+---------+---------+----+
| 1| food| 104| Bush| 178.0| 1|
| 1| food| 101| Bob| 163.0| 2|
| 1| food| 102| Tom| 140.0| 3|
| 1| food| 100| LiHua| 110.0| 4|
| 1| food| 100| LiHua| 110.0| 5|
| 3| music| 102| Tom| 140.0| 1|
| 3| music| 106| Lucy| 129.0| 2|
| 2| game| 104| Bush| 178.0| 1|
| 2| game| 101| Bob| 163.0| 2|
| 2| game| 102| Tom| 140.0| 3|
| 2| game| 100| LiHua| 110.0| 4|
| 2| game| 105| Jam| 8.0| 5|
+-------+---------+-------+---------+---------+----+