您的位置:首页 > 汽车 > 新车 > Hadoop、Hive、HBase、数据集成、Scala阶段测试

Hadoop、Hive、HBase、数据集成、Scala阶段测试

2025/1/5 21:36:19 来源:https://blog.csdn.net/m0_58050808/article/details/140673437  浏览:    关键词:Hadoop、Hive、HBase、数据集成、Scala阶段测试
			姓名:											总分:

Hadoop、Hive、HBase、数据集成、Scala阶段测试

一、选择题(共20道,每道0.5分)

1、下面哪个程序负责HDFS数据存储( C )

A. NameNode B. Jobtracher
C. DataNode D. SecondaryNameNode

2、下列哪个属性是hdfs-site.xml中的配置( C ) B
A. fs.defaultFS B. dfs.replication
C. yarn.resourcemanager.address D. mapreduce.framework.name

解析:

fs.defaultFS:用于指定HDFS(Hadoop Distributed File System)的默认文件系统URI。它是Java代码访问HDFS时使用的路径。

dfs.replication:该配置项用于设置HDFS中数据块的副本数。

yarn.resourcemanager.address:用于指定 YARN ResourceManager 的 RPC 服务器地址和端口号。ResourceManager 是 YARN 架构中的核心组件之一,负责接收客户端提交的作业(如 MapReduce 任务、Spark 任务等),并为这些作业分配资源(如内存、CPU)以在集群中的 NodeManager 上执行。

mapreduce.framework.name:该配置项用于指定MapReduce作业的运行框架。在Hadoop 2.x及更高版本中,MapReduce作业通常运行在YARN上,因此该配置项的值通常为yarn。它告诉MapReduce框架应该使用YARN来管理其作业的执行。

3、Hadoop-3.x集群中的HDFS的默认的数据块的大小是( D )
A. 256M B.32M
C.64M D.128M
4、Hadoop-3.x集群中的HDFS的默认的副本块的个数是( C )
A. 1 B. 2
C. 3 D. 4

5、请问以下哪个命令组成是错误的( C )
A.bin/hadoop fs -cat /data/c.txt B. sbin/hdfs dfsadmin -report
C. bin/hdfs namenode -format D.sbin/stop-dfs.sh

解析:

使用hadoop fs命令来访问HDFS文件系统,-cat选项用于查看指定文件(这里是/data/c.txt)的内容。

用于请求HDFS报告其状态,包括健康报告、数据节点报告等。

在Hadoop中,hdfs namenode命令应该位于sbin目录下,而不是bin目录。这个命令用于格式化HDFS的NameNode,通常是在HDFS首次设置或需要重置HDFS元数据时使用的。

用于停止HDFS守护进程(NameNode和DataNode)。sbin目录下包含了Hadoop的管理脚本,如启动和停止服务的脚本。

6、以下与HDFS类似的框架是( C )
A. NTFS B. FAT32
C. GFS D.EXT3

解析:

HDFS(Hadoop Distributed File System):Hadoop分布式文件系统,是一个高度容错性的系统,设计用来部署在低廉的硬件上,并能提供高吞吐量的数据访问,适合大规模数据集上的应用。

GFS(Google File System):Google文件系统,是一个可扩展的分布式文件系统,同样用于大型的、分布式的、对大量数据进行访问的应用。

NTFS(New Technology File System):是Windows操作系统的一个文件系统,NTFS主要用于个人计算机和小型服务器的数据存储和管理。

FAT32(File Allocation Table 32):是另一种文件系统,主要用于小型的存储设备,如U盘和早期的硬盘驱动器。

EXT3(Third extended filesystem):是Linux系统的一种常见的文件系统,主要用于Linux操作系统的数据存储和管理。

7、HBase启动不需要哪个进程( D )

A. HMaster B. HRegionServer
C. QuorumPeerMain D. NodeManager

A. HMaster:这是HBase的主节点进程,负责协调集群中的所有RegionServer并处理元数据操作。因此,HBase启动时需要HMaster进程。

B. HRegionServer:这是负责处理数据的读写请求以及Region的负载均衡的进程。HBase启动时需要RegionServer进程来提供数据服务。

C. QuorumPeerMain:这是ZooKeeper服务器的主类。如果HBase配置为使用ZooKeeper进行协调和管理,那么ZooKeeper(包括QuorumPeerMain进程)将是HBase启动过程中的一部分。虽然它不是HBase本身的进程,但HBase的运行依赖于ZooKeeper,因此在某种程度上可以认为HBase启动时需要ZooKeeper(包括QuorumPeerMain)的支持。

D. NodeManager:这是YARN(Yet Another Resource Negotiator)框架中的一部分,用于管理集群中的节点资源,如CPU、内存等。YARN是Hadoop生态系统中的一个资源管理和任务调度的框架,与HBase的核心功能不直接相关。HBase并不依赖于YARN或NodeManager来运行其基本的数据存储和查询功能。因此,HBase启动时不需要NodeManager进程。

8、下列哪个是纯离线数据采集工具( B )

A. FlinkX B. Sqoop
C. Flume D. Canal

FlinkX是一款基于Flink的分布式离线/实时数据同步插件,可实现多种异构数据源高效的数据同步。

Sqoop是一个纯离线的数据采集工具,主要用于将关系型数据库(如MySQL)中的数据导入到Hadoop的HDFS中,或者从HDFS导出到关系型数据库中。高度依赖MapReduce和YARN

Flume是一个分布式、高可靠性和高可用性的海量日志收集系统,主要用于实时采集日志数据。

Canal是一个基于MySQL数据库增量日志解析的实时数据同步工具,它主要用于提供增量数据订阅和消费。

9、Map的输出结果首先被写入( A )
A. 内存 B. 缓存
C. 磁盘 D. 以上都正确

Map的输出结果首先被写入的是一个内存缓冲区,这个缓冲区可以看作是内存的一部分。

10、MapReduce与HBase的关系,哪些描述是正确的?( B )

A. 两者不可或缺,MapReduce是HBase可以正常运行的保证

B. 两者不是强关联关系,没有MapReduce,HBase可以正常运行

C. MapReduce不可以直接访问HBase

D. 它们之间没有任何关系

A. 两者不可或缺,MapReduce是HBase可以正常运行的保证

  • 这个描述并不准确。虽然MapReduce和HBase都是Hadoop生态系统中的重要组件,但它们各自承担着不同的职责。HBase是一个开源的、分布式的非关系型数据库,而MapReduce是一种用于大规模数据处理的编程模型。HBase的正常运行并不直接依赖于MapReduce,它可以独立运行,并通过其他方式(如HBase自带的计算框架Coprocessor)进行数据处理。

B. 两者不是强关联关系,没有MapReduce,HBase可以正常运行

  • 这个描述是正确的。HBase和MapReduce是两个独立的项目,它们之间的关系是相互依赖但非强制的。HBase可以独立于MapReduce运行,并通过其他机制(如HBase Shell、HBase API等)进行数据的读写和管理。同时,MapReduce也可以处理非HBase来源的数据,如HDFS上的文件等。

C. MapReduce不可以直接访问HBase

  • 这个描述不完全准确。实际上,MapReduce可以通过HBase提供的HBaseTableInputFormat类来直接访问HBase中的数据。这个类允许MapReduce任务将HBase表作为输入源,从而可以对HBase中的数据进行读取和处理。

D. 它们之间没有任何关系

  • 这个描述显然是错误的。HBase和MapReduce都是Hadoop生态系统中的关键组件,它们之间存在着紧密的联系和交互。HBase可以作为MapReduce任务的输入或输出源,而MapReduce则可以对HBase中的数据进行高效的并行处理。

11.下列哪个不是Spark的执行模式? ( C )

A. Local B. YARN

C. Mesos D. HDFS

A. Local:这是Spark的一个执行模式,其中Spark应用程序在单个JVM进程中运行,通常用于开发、测试和调试目的。在这个模式下,Spark不需要启动集群,而是在本地机器上运行,非常适合小规模数据处理和快速原型开发。因此,A选项是Spark的一个执行模式。

B. YARN:YARN(Yet Another Resource Negotiator)是Apache Hadoop的资源管理器,用于在Hadoop集群上管理资源和调度任务。Spark可以在YARN上运行,将YARN作为集群管理器,以管理集群资源和调度Spark任务。YARN模式包括yarn-client和yarn-cluster两种运行模式,分别适用于不同的场景。因此,B选项也是Spark的一个执行模式。

C. Mesos:虽然Mesos是一个开源的集群管理器,用于管理跨多种框架(包括Spark)的集群资源,但它本身并不是Spark的一种执行模式。相反,Spark可以在Mesos上运行,利用Mesos提供的资源管理和调度功能。然而,在描述Spark的执行模式时,我们通常不会说“Mesos是Spark的一个执行模式”,而是说“Spark可以在Mesos上运行”。因此,C选项不是Spark的直接执行模式。

D. HDFS:HDFS(Hadoop Distributed File System)是Apache Hadoop的分布式文件系统,用于存储大数据集。它并不是Spark的执行模式,而是Spark可以访问的一种数据存储系统。Spark可以从HDFS中读取数据,进行处理,并将结果写回到HDFS中。然而,HDFS与Spark的执行模式是两个不同的概念。因此,D选项同样不是Spark的执行模式。

12.在Spark中,什么机制用于加速迭代计算? ( B )

A. Checkpointing B. Caching

C. Broadcasting D. Partitioning

解析:

Caching(缓存)

缓存机制是Spark中用于优化迭代计算的重要手段。Spark允许用户将RDD(弹性分布式数据集)或DataFrame等数据集缓存到内存中,以便在后续的计算中重用。

Checkpointing(检查点)

虽然Checkpointing也是Spark中的一种容错机制,但它主要用于在系统故障或节点故障时恢复数据,而不是直接用于加速迭代计算。

Broadcasting(广播变量)

Broadcasting是Spark中用于优化数据传输的一种机制,它允许将大变量(如模型参数、大配置对象等)从Driver端广播到所有Executor端,以减少每个Task的数据传输量。

Partitioning(分区)

Partitioning是Spark中对数据进行分布式存储和计算的基础。通过合理的分区策略,可以将大数据集分割成多个小数据集,并分布到不同的节点上进行并行处理。然而,分区本身并不直接加速迭代计算,而是为并行计算提供了基础。

  1. 下列哪个函数属于转换操作(Transformation)而不是行动操作(Action)?( C )

    A. count() B. collect()

    C. filter() D. saveAsTextFile()

  2. 在Spark中,persistcache方法有何区别? ( B )

A) cachepersist的一个别名,二者完全相同。

B) persist提供多种存储级别,而cache总是使用默认的存储级别。

C) cache用于DataFrame,而persist用于RDD。

D) persist用于数据的持久化,而cache用于数据的临时存储。

  1. 在Spark中,什么是“窄依赖”(Narrow Dependency)与“宽依赖”(Wide Dependency)?它们如何影响数据的并行处理? ?(A )

A) 窄依赖表示每个父RDD分区映射到子RDD的一个分区,而宽依赖涉及多个父分区到一个子分区,导致shuffle。

B) 宽依赖意味着数据不需要重分布,而窄依赖则需要shuffle。

C) 窄依赖和宽依赖都涉及到数据的shuffle,只是程度不同。

D) 窄依赖与宽依赖仅在DataFrame中存在,对RDD没有意义。

  1. Spark的JobStage在执行过程中如何划分? ( D )

A) Job由一系列Stage组成,每个Stage对应于一个shuffle操作。

B) JobStage是同义词,没有区别。

C) Stage由一系列Job组成,用于并行执行不同的任务。

D) Job是由用户提交的任务,Stage是DAGScheduler为优化执行计划而创建的最小执行单元。

解析:

  1. Job的定义
  • Job是Spark中由用户提交的任务,通常是由一个Action操作(如collectcountsavereduce等)触发的。每个Action操作都会生成一个Job。
  1. Stage的划分
  • Stage是Spark中Job处理过程要分为的几个阶段。DAGScheduler(有向无环图调度器)会根据RDD之间的依赖关系(特别是宽依赖,如shuffle操作)将Job划分为多个Stage。
  • 划分Stage的依据是RDD之间的宽窄依赖。宽依赖(如groupByKeyreduceByKeyjoin等)会导致shuffle操作,从而需要在不同节点间重新分配数据。每当遇到宽依赖时,DAGScheduler就会切分出一个新的Stage。
  • Stage的数量取决于程序中宽依赖(即shuffle操作)的数量。每个Stage包含一组可以并行执行的任务(Tasks)。
  1. Task的定义
  • Task是Spark中任务运行的最小单位,最终是以Task为单位运行在Executor中的。一个Stage会包含多个Task,这些Task的数量通常取决于Stage中最后一个RDD的分区数。
  • Task的内容与Stage相同,但当分区数量为n时,会有n个相同效果的Task被分发到执行程序中执行。
  1. 在Spark中,mapPartitionsmap操作有何区别,以及在什么情况下使用mapPartitions更合适? ( B )

A) mapPartitionsmap都是对每个元素进行操作,没有区别。

B) mapPartitions可以访问整个分区的数据,适用于需要对分区内的数据进行全局操作的场景。

C) map操作可以改变分区的数量,而mapPartitions不能。

D) mapPartitionsmap的别名,用于提高代码可读性。

  1. Spark的Kryo序列化库如何帮助提高性能? ( B )

A) Kryo增加了序列化的复杂度,但提高了数据的完整性。

B) Kryo序列化库提供了一种更紧凑、更快的序列化方式,减少了网络传输和磁盘I/O的开销。

C) Kryo只用于Spark的内部通信,对外部数据无影响。

D) Kryo序列化库是默认的序列化方式,不需要配置。

  1. 在Spark中,SparkSessionSparkContext的关系是什么?为何推荐使用SparkSession? ( A )

A) SparkSessionSparkContext的封装,提供了更高级的功能,如SQL查询和数据源管理,SparkSession简化了API,提高了易用性。

B) SparkSessionSparkContext可以互换使用,没有推荐使用的原因。

C) SparkContextSparkSession的前身,SparkSession仅用于Spark SQL。

D) SparkSession用于管理执行器,SparkContext用于管理Driver程序。

  1. Spark的Broadcast JoinShuffle Hash Join有何区别?在何种情况下应优先考虑使用Broadcast Join? ( D )A

A) Broadcast Join将较小的表广播到每个节点,减少shuffle成本;Shuffle Hash Join需要更多网络传输,适用于大表间的连接。

B) Broadcast JoinShuffle Hash Join没有区别,只是名称不同。

C) Shuffle Hash Join总是优于Broadcast Join,因为它更通用。

D) Broadcast Join用于小数据集,Shuffle Hash Join用于大数据集,但具体选择与数据大小无关。

解析:

Broadcast JoinShuffle Hash Join是Spark SQL中处理连接(Join)操作的两种不同策略。Broadcast Join适用于连接操作中的一个小表,它会将这个小表广播到每个节点上,从而避免了大表的shuffle操作,减少了网络传输成本。而Shuffle Hash Join则适用于大数据集之间的连接,它需要对数据进行shuffle操作来确保连接的正确性。因此,在连接操作中的一个小表时,应优先考虑使用Broadcast Join。注意,实际选择哪种连接策略还取决于其他因素,如数据分布、集群配置等,但数据大小是一个重要的考虑因素。

二、填空题(共20分,每空0.5分)

1、启动hdfs的shell脚本是:( sh xxx.sh )start-dfs.sh

解析:

启动hdfs的shell脚本是:start-dfs.sh。这个脚本用于启动Hadoop分布式文件系统(HDFS)的所有守护进程,包括NameNode和DataNode等。

2、Block是HDFS的基本存储单元,默认大小是( 128 ) MB

3、MapReduce默认输入的格式化类:( InputFormat )TextInputFormat

解析:

MapReduce默认输入的格式化类是TextInputFormat。这是MapReduce的默认输入格式,它读取文件的行作为输入,其中行的字节偏移量作为键(Key),行的内容作为值(Value)。

4、Hadoop三大组件:( HDFS )、( MapReduce )、( Yarn )

5、Hiveserver2默认的端口:( 10000 )

解析:

Hiveserver2默认的端口是10000。Hiveserver2是Hive的一个服务组件,它允许用户通过JDBC或ODBC等协议远程连接到Hive并执行SQL查询。

6、HBase的RowKey设计三大原则:( 唯一性 )、( 散列性 )、( 长度适中 )

解析:

  • 唯一性:确保RowKey的唯一性,以便能够唯一标识表中的每一行数据。
  • 散列性:设计RowKey时应该考虑其散列性,避免大量数据集中在少数几个Region上,从而导致热点问题。
  • 长度适中:RowKey的长度应该适中,不宜过长也不宜过短。过长的RowKey会占用较多的存储空间,而过短的RowKey则可能增加数据倾斜的风险。

7、在Spark中,一个( RDD )由一系列的( 分区 )组成,每个Stage由一组( RDD )构成,而Stage之间的依赖关系通常由( 行动算子 )操作触发。

8、hive中数据文件默认存储格式是( txt )TextFile

解析:

hive中数据文件默认存储格式是TextFile。TextFile是一种简单的文本格式,数据以行为单位进行存储,每行数据之间通过换行符分隔。

9、spark core中缓存的实现方式有几种( 使用cache存储到内存中 )( 使用checkpoint存储到磁盘中 )

10、hive中sql转变成mr经过4个器,分别是解析器,( 编译器 )、( 优化器 )、( 执行器 )

三、判断题(共10道,每道1分)

1、Block Size是不可以修改的( F )

2、如果NameNode意外终止,SecondaryNameNode会接替它使集群继续工作( F )

3、MapReduce 切片的大小等于 block的大小( F )

4、在HBase中由HMaster负责用户的IO请求( F )

5、MapReduce中map任务的数量可以自己指定( T )

6、DataX只能用于离线数据采集( T )

7、Flume运行时需要依赖MapReduce( T )F

解析:

Apache Flume是一个分布式、可靠且可用的服务,用于高效地收集、聚合和移动大量日志数据。它并不依赖MapReduce来运行,而是可以独立于Hadoop生态系统运行。

8、MapReduce中环形缓冲区默认大小为128M( F )

9、Spark的SparkContextSparkSession可以同时存在于同一个应用中,SparkContext提供了更多低级的API,而SparkSession则提供了高层的API,包括SQL和数据源支持。( T )

10、Spark的map操作是懒惰求值的,只有在触发行动操作时才会执行计算。( T )

四、简答题(共5道,每道4分)

1、用自己的语言描述SecondaryNameNode的作用。

SecondaryNameNode作为NameNode的秘书,帮助NameNode处理事务。

SecondaryNameNode是用来帮助NameNode完成元数据信息合并,从角色上看属于NameNode的“秘书”

1.定期合并FsImage和Edits文件

  1. 提供HDFS元数据的冷备份

  2. 监控HDFS状态

  3. 提升HDFS的可靠性和性能

2、用自己的语言描述spark的数据倾斜优化方式。

1.使用Hive ETL预处理数据如果导致数据倾斜的是Hive表。如果该Hive表中的数据本身很不均匀(比如某个 key对应了100万数据,其他key才对应了10条数据)。此时可以评估一下,是否可以通过Hive来进行数据预处理(即通过Hive ETL预先对 数据按照key进行聚合,或者是预先和其他表进行join),然后在Spark作业中针对的数据源就不是 原来的Hive表了,而是预处理后的Hive表。此时由于数据已经预先进行过聚合或join操作了,那么 在Spark作业中也就不需要使用原先的shuffle类算子执行这类操作了。 2.过滤少数导致倾斜的key如果发现导致倾斜的key就少数几个,而且对计算本身的影响并不大。如果我们判断那少数几个数据量特别多的key,对作业的执行和计算结果不是特别 重要的话,那么干脆就直接过滤掉那少数几个key。比如,在Spark SQL中可以使用where子句过滤 掉这些key或者在Spark Core中对RDD执行filter算子过滤掉这些key。如果需要每次作业执行时, 动态判定哪些key的数据量最多然后再进行过滤,那么可以使用sample算子对RDD进行采样,然后 计算出每个key的数量,取数据量最多的key过滤掉即可。 3.提高shuffle操作的并行度在对RDD执行shuffle算子时,给shuffle算子传入一个参数,比如 reduceByKey(1000),该参数就设置了这个shuffle算子执行时shuffle read task的数量。对于 Spark SQL中的shuffle类语句,比如group by、join等,需要设置一个参数,即 spark.sql.shuffle.partitions,该参数代表了shuffle read task的并行度,该值默认是200,对于很 多场景来说都有点过小。增加shuffle read task的数量,可以让原本分配给一个task的多个key分配给多个 task,从而让每个task处理比原来更少的数据。4.双重聚合对RDD执行reduceByKey等聚合类shuffle算子或者在Spark SQL中使用group by 语句进行分组聚合时,比较适用这种方案。这个方案的核心实现思路就是进行两阶段聚合。第一次是局部聚合,先给每个key 都打上一个随机数,比如10以内的随机数,此时原先一样的key就变成不一样的了,比如(hello, 1) (hello, 1) (hello, 1) (hello, 1),就会变成(1_hello, 1) (1_hello, 1) (2_hello, 1) (2_hello, 1)。接着 对打上随机数后的数据,执行reduceByKey等聚合操作,进行局部聚合,那么局部聚合结果,就会 变成了(1_hello, 2) (2_hello, 2)。然后将各个key的前缀给去掉,就会变成(hello,2)(hello,2),再次 进行全局聚合操作,就可以得到最终结果了,比如(hello, 4)。 5.将reduce join转为map join在对RDD使用join类操作,或者是在Spark SQL中使用join语句时,而且join操作中 的一个RDD或表的数据量比较小(比如几百M或者一两G),比较适用此方案。 不使用join算子进行连接操作,而使用Broadcast变量与map类算子实现join操作, 进而完全规避掉shuffle类的操作,彻底避免数据倾斜的发生和出现。将较小RDD中的数据直接通过 collect算子拉取到Driver端的内存中来,然后对其创建一个Broadcast变量;接着对另外一个RDD 执行map类算子,在算子函数内,从Broadcast变量中获取较小RDD的全量数据,与当前RDD的每 一条数据按照连接key进行比对,如果连接key相同的话,那么就将两个RDD的数据用你需要的方式 连接起来。 6.采样倾斜key并分拆join操作两个RDD/Hive表进行join的时候,如果数据量都比较大,无法采用“解决方案五 ”,那么此时可以看一下两个RDD/Hive表中的key分布情况。如果出现数据倾斜,是因为其中某一 个RDD/Hive表中的少数几个key的数据量过大,而另一个RDD/Hive表中的所有key都分布比较均 匀,那么采用这个解决方案是比较合适的。思路:对包含少数几个数据量过大的key的那个RDD,通过sample算子采样出一份样本来,然后统计一下每个 key的数量,计算出来数据量最大的是哪 几个key。 然后将这几个key对应的数据从原来的RDD中拆分出来,形成一个单独的RDD,并给每个key都打上n以 内的随机数作为前缀,而不会导致倾斜的大部分key形成另外一个RDD。 接着将需要join的另一个RDD,也过滤出来那几个倾斜key对应的数据并形成一个单独的RDD,将每条数 据膨胀成n条数据,这n条数据都按顺序附加一个0~n的前缀,不会导致倾斜的大部分key也形成另外一个 RDD。 再将附加了随机前缀的独立RDD与另一个膨胀n倍的独立RDD进行join,此时就可以将原先相同的key打 散成n份,分散到多个task中去进行join了。 而另外两个普通的RDD就照常join即可。 最后将两次join的结果使用union算子合并起来即可,就是最终的join结果。 7.使用随机前缀和扩容RDD进行join如果在进行join操作时,RDD中有大量的key导致数据倾斜,那么进行分拆key也没 什么意义。该方案的实现思路基本和“解决方案六”类似,首先查看RDD/Hive表中的数据分布情况,找到那个造成 数据倾斜的RDD/Hive表,比如有多个key都对应了超过1万条数据。 然后将该RDD的每条数据都打上一个n以内的随机前缀。 同时对另外一个正常的RDD进行扩容,将每条数据都扩容成n条数据,扩容出来的每条数据都依次打上一 个0~n的前缀。 最后将两个处理后的RDD进行join即可。

3、用自己的语言描述诉MapReduce流程。

一、输入分片(Input Splitting)

  • 过程描述:在MapReduce作业开始前,输入文件(或文件夹)首先被划分为多个InputSplit(输入分片)。默认情况下,每个HDFS的block(数据块)对应一个InputSplit。这样做的目的是将大文件分割成多个小块,以便并行处理。
  • 分片大小:分片的大小通常与HDFS的数据块大小相同(默认是128MB),但也可以根据作业需求进行调整。分片时不考虑数据集整体,而是逐个针对每一个文件单独切片。

二、Map阶段

  • InputFormat类:使用InputFormat类的子类(如TextInputFormat)把输入文件(夹)划分为InputSplit。
  • RecordReader类:每个InputSplit通过RecordReader类被解析成一个个<key,value>键值对。在TextInputFormat中,默认是每行的起始偏移量作为key,每行的内容作为value。
  • Mapper类:框架调用Mapper类中的map函数,输入是<k1,v1>键值对,输出是<k2,v2>键值对。程序员可以覆盖map函数,实现自己的逻辑。

三、Combiner阶段(可选)

  • 过程描述:Combiner是一个本地化的reduce操作,发生在map端。它的主要作用是减少map端的输出,从而减少shuffle过程中网络传输的数据量,提高作业的执行效率。
  • 注意:Combiner的输出是Reducer的输入,因此它绝不能改变最终的计算结果。Combiner适合于等幂操作,如累加、最大值等。

四、Shuffle阶段

  • 分区:在map函数处理完数据后,输出的<k2,v2>键值对会根据key进行分区,不同的分区由不同的reduce task处理。分区操作通常通过哈希函数实现。
  • 排序与合并:在写入环形缓冲区之前,数据会先进行排序(默认采用快速排序算法)。当缓冲区满(默认为80%容量)时,数据会溢出到磁盘文件中,并在溢出前完成排序。多个溢出文件在最终输出前会进行归并排序,合并成一个大的有序文件。
  • 数据传输:map任务完成后,reduce任务会启动数据copy线程,通过HTTP方式请求map任务所在的NodeManager以获取输出文件。

五、Reduce阶段

  • 数据合并:Reduce任务将接收到的所有map任务的输出数据(已分区且区内有序)进行合并,相同key的value值会被放到同一个集合中。
  • Reducer类:框架调用Reducer类中的reduce函数,对合并后的数据进行处理,最终输出结果。
  • OutputFormat类:使用OutputFormat类的子类(如TextOutputFormat)将最终结果输出到文件或数据库等存储介质中。

4、谈谈Hive的优化。

1.本地模式运行,当处理一些小任务时可以选择本地模式运行,这样会使得任务执行的速度会很快。

2.JVM模式,在处理一些需要很多资源的任务时,可以先申请一部分的资源,等运行结束后再将资源释放。

3.严格模式,启动严格模式,禁止分区表的全表扫描,查询数据时必须加limit,禁止笛卡尔积。

4.hive join的数据倾斜问题,当小表join小表时,不用去管它;当小表join大表时,小表放在join的左边;当大表join大表时,应当考虑是否会出现某个reduce数据量过大的情况。空key过滤:当有大量数据同时放入一个reduce时,应当观察该rowkey,一般来说该rowkey对应的数据都是异常数据,需要使用sql语句对其进行过滤。空key转换:当有大量的数据都对应一个空的rowkey时,需要给这些数据随机分配一个rowkey,使它们均匀的分布到一些reduce中。

5.自定义map和reduce的数量,一般不去修改它。

5、用自己的语言描述诉spark的资源调度和任务调度流程。

spark的资源调度:driver向resourcemanager申请资源,resourcemanager选择一个空闲的子节点开启applicationmaster任务,applicationmaster向resourcemanager提交申请资源开启executor的任务。applicationmaster选择一个空闲子节点开启executor,

开启完毕后applicationmaster将executor开启的消息发送给driver,让driver发送执行任务。

spark的任务调度流程:driver端,遇到action算子触发任务执行,将任务提交到有向无环图,DAGscheduler中,根据RDD的血缘关系划分划分stage,将RDD中的分区封装成taskset任务,发送到TASKscheduler。TASKscheduler取出taskset任务,根据RDD 的提供最优的任务执行计划,只移动计算不移动数据,开始对执行任务。

spark的资源调度:

1、Driver提交作业命令

2、向ResourceMananger申请资源

3、ResourceMananger检查一些权限、资源空间,在一个相对空闲的子节点上开启一个ApplicationMaster的进程

4、ApplicationMaster向ResourceMananger申请资源,启动Executor

5、ResourceMananger启动Executor

6、Executor反向注册给Driver,告诉Driver资源申请成功,可以发送任务

spark的任务调度流程:

7、Driver遇到一个行动算子,触发整个作业调度

8、先将整个作业交给DAG有向无环图

DAG Scheduler

9、根据RDD之间的血缘关系,找出宽窄依赖(有没有shuffle的产生)

10、通过宽窄依赖划分stage阶段

11、根据stage阶段,将stage中的task任务封装成一个taskSet对象

12、发送给后续的 Task Scheduler

Task Scheduler

13、从DAG Scheduler发送过来的taskSet中取出task任务

14、根据RDD五大特性的最后一大特性,只移动计算不移动数据,将task任务发送到对应的Executor的线程池中执行

五、代码题(50分)

1、spark sql数据分析以及可视化(30分)

疫情期间各类政府媒体及社交网站,均发布了相关疫情每日统计数据,下面基于数据仓库工具Hive请你统计分析相关疫情数据。

提示:

数据字段为:日期date、省份province、城市city、新增确诊confirm、新增出院heal、新增死亡dead、消息来源source)

部分数据截图:
在这里插入图片描述

题目:

请基于covid19.csv数据,将数据导入到Hive中,使用spark on hive读取数据使用纯SQL完成下列统计分析

请自行在Hive按照数据结构创建对应的表并加载数据

请给出代码语句及结果截图

  • 1、统计湖北省每月新增出院病例总数最多的前3个城市(8分)

    输出:[月份,城市,每月新增出院病例总数,排名]

create table bigdata30_test3.covid (
dates String,
province  String,
city String,
confirm Int,
heal Int,
dead Int,
source String
)ROW FORMAT DELIMITED FIELDS TERMINATED BY ','
LOCATION '/bigdata30/data/';select
tt1.month,
tt1.city,
tt1.counts as counts,
tt1.rank
from
(select 
t1.month,
t1.city,
t1.counts as counts,
-- row_number:数据相同也不会导致排名重复
row_number() over(partition by t1.month order by t1.counts desc) rank
from
(select 
province,
city,
-- 只取出月份
subString(dates,0,2) as month,
count(heal) as counts
from bigdata30_test3.covid
where province = "湖北" and city != "境外输入-英国"
-- 只取出月份用来分组
group by city,province,subString(dates,0,2)) t1) tt1
-- 取每个排序的前三名
WHERE tt1.rank <= 3;结果:
1月	武汉市	24	1
1月	荆门市	10	2
1月	荆州市	10	3
2月	武汉市	62	1
2月	黄石市	54	2
2月	黄冈市	46	3
3月	武汉市	41	1
3月	鄂州市	32	2
3月	孝感市	28	3
4月	武汉市	29	1
4月	荆门市	1	2
4月	襄阳市	1	3
  • 2、统计安徽省每月新增确诊人数同比

    同比 = (当月指标 - 上月指标)/ 上月指标 (6分)

    输出:[月份,每月新增确诊人数,上月新增确诊人数,同比]

    将该需求结果写入到mysql中,使用fileBI作图,柱状图 (4分)

    -- 纯sql DBeaver中执行
    select 
    t1.month,
    t1.counts,
    LAG(counts,1,-1) over(order by t1.month) as last_counts,
    casewhen LAG(counts,1,-1) over(order by t1.month) < 0 then '没有上一个月的数据'else round(((t1.counts - LAG(counts,1,-1) over(order by t1.month)) / LAG(counts,1,-1) over(order by t1.month)),2)
    end as rate
    from
    (select 
    subString(dates,0,2) as month,
    province,
    count(confirm) as counts
    from 
    bigdata30_test3.covid
    where province = "安徽"
    group by province,subString(dates,0,2)) t1;-- MySQL建表
    create table dataMysql (
    months varchar(30),
    counts  varchar(30),
    last_counts varchar(30),
    rate varchar(30)
    )
    
    // 为了将数据写入到MySQL,使用sparksqlpackage com.shujia.DSLexamimport org.apache.spark.rdd.RDD
    import org.apache.spark.sql.{DataFrame, Row, SparkSession}import java.sql.{Connection, DriverManager, PreparedStatement}object Exam2_1 {def main(args: Array[String]): Unit = {val sparkSession: SparkSession = SparkSession.builder().master("local").appName("考试大题一")//参数设置的优先级:代码优先级 > 命令优先级 > 配置文件优先级.config("spark.sql.shuffle.partitions", "1").enableHiveSupport() // 开启hive的配置.getOrCreate()sparkSession.sql("use bigdata30_test3")//truncate = false 时,完整地显示某列值,不进行任何截断。val dataDF: DataFrame = sparkSession.sql("""|select|t1.month,|t1.counts,|LAG(counts,1,-1) over(order by t1.month) as last_counts,|case|    when LAG(counts,1,-1) over(order by t1.month) < 0 then 0|    else round(((t1.counts - LAG(counts,1,-1) over(order by t1.month)) / LAG(counts,1,-1) over(order by t1.month)),2)|end as rate|from|(select|subString(dates,0,2) as month,|province,|count(confirm) as counts|from|bigdata30_test3.covid|where province = "安徽"|group by province,subString(dates,0,2)) t1|""".stripMargin)dataDF.foreach((rdd: Row) => {//注册驱动Class.forName("com.mysql.jdbc.Driver")//创建数据库连接对象val conn: Connection = DriverManager.getConnection("jdbc:mysql://master:3306/exam?useUnicode=true&characterEncoding=UTF-8&useSSL=false","root","123456")//创建预编译对象val statement: PreparedStatement = conn.prepareStatement("insert into dataMysql values(?,?,?,?)")statement.setString(1, rdd.getAs[String]("month"))statement.setLong(2, rdd.getAs[Long]("counts"))statement.setLong(3, rdd.getAs[Long]("last_counts"))statement.setDouble(4, rdd.getAs[Double]("rate"))//          执行sql语句statement.executeUpdate()statement.close()conn.close()})}
    }

    在这里插入图片描述

fineBI作图
在这里插入图片描述

在这里插入图片描述
在这里插入图片描述

在这里插入图片描述

  • 3、统计安徽省各城市连续新增确诊人数、连续新增确诊开始日期、连续新增确诊结束日期及连续新增确诊天数(12分)

    输出:[城市,连续新增确诊人数,连续新增确诊开始日期,连续新增确诊结束日期,连续新增确诊天数]

select DISTINCT 
tt1.city,
max(tt1.confirm) over(partition by tt1.city,tt1.flag,SPLIT(tt1.new_day, '-')[0]) - min(tt1.confirm) over(partition by tt1.city,tt1.flag,SPLIT(tt1.new_day, '-')[0]) as add_confirm,
min(tt1.new_day) over(partition by tt1.city,tt1.flag,SPLIT(tt1.new_day, '-')[0]) as start_date,
max(tt1.new_day) over(partition by tt1.city,tt1.flag,SPLIT(tt1.new_day, '-')[0]) as end_date,
count(1) over(partition by tt1.city,tt1.flag,SPLIT(tt1.new_day, '-')[0]) as counts
from
(select
*,
CAST(SPLIT(t1.new_day, '-')[1] AS INT) AS day_of_month,
(CAST(SPLIT(t1.new_day, '-')[1] AS INT) - t1.rank) as flag
from
(select * ,
from_unixtime(unix_timestamp(dates,'MM月dd日'),'MM-dd') as new_day,
row_number() over(partition by city order by dates) rank
from bigdata30_test3.covid
where province = "安徽" and source = "安徽卫健委" and heal IS NOT NULL and confirm IS NOT NULL and dead IS NOT NULL) t1) tt1-- 连续新增确诊人数应该是,求出的数据应该是本组的最后一条的confirm减去本组第一天的confirm,而不是下面的一组中的最大的confirm减去最小的confirm ???  该如何求解
max(tt1.confirm) over(partition by tt1.city,tt1.flag,SPLIT(tt1.new_day, '-')[0]) -min(tt1.confirm) over(partition by tt1.city,tt1.flag,SPLIT(tt1.new_day, '-')[0]) as add_confirm,-- 解决方案:FIRST_VALUE()和LAST_VALUE()函数分别获取了每个分组的第一天和最后一天的确诊人数-- 为了避免下述中出现的分组全出现在结果中的问题,使用FIRST_VALUE()和LAST_VALUE()函数时,
-- 最好指定ROWS BETWEEN UNBOUNDED PRECEDING AND UNBOUNDED FOLLOWING
SELECT DISTINCTtt1.city,LAST_VALUE(tt1.confirm) OVER (PARTITION BY tt1.city, tt1.flag, SPLIT(tt1.new_day, '-')[0] ORDER BY tt1.new_day ROWS BETWEEN UNBOUNDED PRECEDING AND UNBOUNDED FOLLOWING) -FIRST_VALUE(tt1.confirm) OVER (PARTITION BY tt1.city, tt1.flag, SPLIT(tt1.new_day, '-')[0] ORDER BY tt1.new_day ROWS BETWEEN UNBOUNDED PRECEDING AND UNBOUNDED FOLLOWING) AS add_confirm,MIN(tt1.new_day) OVER (PARTITION BY tt1.city, tt1.flag, SPLIT(tt1.new_day, '-')[0]) AS start_date,MAX(tt1.new_day) OVER (PARTITION BY tt1.city, tt1.flag, SPLIT(tt1.new_day, '-')[0]) AS end_date,COUNT(1) OVER (PARTITION BY tt1.city, tt1.flag, SPLIT(tt1.new_day, '-')[0]) AS counts
FROM(SELECT*,CAST(SPLIT(t1.new_day, '-')[1] AS INT) AS day_of_month,(CAST(SPLIT(t1.new_day, '-')[1] AS INT) - t1.rank) AS flagFROM(SELECT *,FROM_UNIXTIME(UNIX_TIMESTAMP(dates, 'MM月dd日'), 'MM-dd') AS new_day,ROW_NUMBER() OVER (PARTITION BY city ORDER BY dates) AS rankFROM bigdata30_test3.covidWHERE province = '安徽' AND source = '安徽卫健委' AND heal IS NOT NULL AND confirm IS NOT NULL AND dead IS NOT NULL) t1) tt1;-- 注:SQL中,ROWS BETWEEN UNBOUNDED PRECEDING AND UNBOUNDED FOLLOWING是窗口函数(如FIRST_VALUE(), LAST_VALUE(), ROW_NUMBER(), SUM(), AVG()等)的一个子句,用于指定窗口的范围。
UNBOUNDED PRECEDING表示窗口的起始点是分区中的第一行。
UNBOUNDED FOLLOWING表示窗口的结束点是分区中的最后一行。对于FIRST_VALUE()和LAST_VALUE()这样的函数,它们通常需要一个明确的窗口定义来确定“第一”和“最后”是基于什么范围来计算的。如果不提供ROWS BETWEEN子句,某些数据库系统可能会报错,因为它们不知道应该基于哪些行来计算这些值。-- 不加也可正常执行,但是不能加上ORDER BY tt1.new_day,否则会出现整个分组都出现在最终的结果里
形如:
|合肥市|          0|     01-28|   01-30|     3|
|合肥市|         10|     01-28|   01-30|     3|
|合肥市|          7|     01-28|   01-30|     3| 
SELECT DISTINCTtt1.city,LAST_VALUE(tt1.confirm) OVER (PARTITION BY tt1.city, tt1.flag, SPLIT(tt1.new_day, '-')[0]) -FIRST_VALUE(tt1.confirm) OVER (PARTITION BY tt1.city, tt1.flag, SPLIT(tt1.new_day, '-')[0]) AS add_confirm,MIN(tt1.new_day) OVER (PARTITION BY tt1.city, tt1.flag, SPLIT(tt1.new_day, '-')[0]) AS start_date,MAX(tt1.new_day) OVER (PARTITION BY tt1.city, tt1.flag, SPLIT(tt1.new_day, '-')[0]) AS end_date,COUNT(1) OVER (PARTITION BY tt1.city, tt1.flag, SPLIT(tt1.new_day, '-')[0]) AS counts
FROM(SELECT*,CAST(SPLIT(t1.new_day, '-')[1] AS INT) AS day_of_month,(CAST(SPLIT(t1.new_day, '-')[1] AS INT) - t1.rank) AS flagFROM(SELECT *,FROM_UNIXTIME(UNIX_TIMESTAMP(dates, 'MM月dd日'), 'MM-dd') AS new_day,ROW_NUMBER() OVER (PARTITION BY city ORDER BY dates) AS rankFROM bigdata30_test3.covidWHERE province = '安徽' AND source = '安徽卫健委' AND heal IS NOT NULL AND confirm IS NOT NULL AND dead IS NOT NULL) t1) tt1;

没指定ROWS BETWEEN UNBOUNDED PRECEDING AND UNBOUNDED FOLLOWING,又加上了order by dates: 会出现下面这种情况

在这里插入图片描述

2、spark DSL数据分析(20分)

现有三份数据,结构如下:

  • live_types 直播间信息表

    结构:live_id live_type

    ​ 直播间id 直播间类型

  • live_events 用户访问直播间记录表

    结构:user_id live_id start_time end_time

    ​ 用户id 直播间id 开始时间 结束时间

  • user_info 用户信息表

    结构:user_id user_name

    ​ 用户id 用户名

题目:

请给出结果截图及Scala代码

1、统计每位用户观看不同类型直播的次数(6分)

输出:[用户id,用户名,直播间类型,次数]

import org.apache.spark.sql.expressions.Window
import org.apache.spark.sql.{DataFrame, SparkSession}object Exam2 {def main(args: Array[String]): Unit = {val sparkSession: SparkSession = SparkSession.builder().master("local").appName("社保练习").getOrCreate()import org.apache.spark.sql.functions._import sparkSession.implicits._// user_id live_id  start_time end_time//       用户id  直播间id      开始时间  结束时间val live_events: DataFrame = sparkSession.read.format("csv").option("sep", "\t").schema("user_id INT,live_id INT,start_time timestamp,end_time timestamp").load("spark/data_exam/live_events.txt")
//    live_events.show()//live_id  live_type//   直播间id 直播间类型val live_types: DataFrame = sparkSession.read.format("csv").option("sep", "\t").schema("live_id INT,live_type String").load("spark/data_exam/live_types.txt")
//    live_types.show()//user_id  user_name//      用户id     用户名val user_info: DataFrame = sparkSession.read.format("csv").option("sep", "\t").schema("user_id INT,user_name String").load("spark/data_exam/user_info.txt")
//    user_info.show()/*** 1、统计每位用户观看不同类型直播的次数(6分)* > 输出:[用户id,用户名,直播间类型,次数]*/live_events.withColumn("count",count(expr("1")) over Window.partitionBy($"user_id",$"live_id"))// TODO 设置表与表之间进行左连接.join(live_types, live_events("live_id") === live_types("live_id"), "left").join(user_info, live_events("user_id") === user_info("user_id"), "left")/*** 两张表中的字段名相同,要注明字段所属表* 否则会报 Reference 'user_id' is ambiguous, could be: user_id, user_id.*/.select(live_events("user_id"),$"user_name",$"live_type",$"count").distinct().show()+-------+---------+---------+-----+
|user_id|user_name|live_type|count|
+-------+---------+---------+-----+
|    106|     Lucy|    music|    1|
|    100|    LiHua|     game|    1|
|    102|      Tom|     food|    1|
|    104|     Bush|     game|    1|
|    105|      Jam|     game|    1|
|    102|      Tom|    music|    1|
|    100|    LiHua|     food|    2|
|    101|      Bob|     food|    1|
|    101|      Bob|     game|    1|
|    102|      Tom|     game|    1|
|    104|     Bush|     food|    1|
+-------+---------+---------+-----+

2、统计每位用户累计观看直播时长,按时长降序排列(6分)

输出:[用户id,用户名,累计时长]

    /*** 2、统计每位用户累计观看直播时长,按时长降序排列(6分)* > 输出:[用户id,用户名,累计时长]*/// 100	1	2021-12-01 19:00:00	2021-12-01 19:28:00 start_time timestamp,end_timelive_events.withColumn("times",(unix_timestamp($"end_time","yyyy-MM-dd HH:mm:ss") - unix_timestamp($"start_time","yyyy-MM-dd HH:mm:ss"))).withColumn("all_times", sum($"times") over Window.partitionBy($"user_id")).join(user_info,"user_id")// TODO 时间戳 / 60 ,在最后查询时,可以将秒转换成分钟.select($"user_id",$"user_name",$"all_times" / 60).distinct().orderBy($"all_times".desc).show()+-------+---------+----------------+
|user_id|user_name|(all_times / 60)|
+-------+---------+----------------+
|    104|     Bush|           178.0|
|    101|      Bob|           163.0|
|    102|      Tom|           140.0|
|    106|     Lucy|           129.0|
|    100|    LiHua|           110.0|
|    105|      Jam|             8.0|
+-------+---------+----------------+

3、统计不同类型直播用户累计观看时长降序排名(8分)

输出:[直播间id,直播间类型,用户id,用户名,累计时长,排名]

    /*** 3、统计不同类型直播用户累计观看时长降序排名(8分)* > 输出:[直播间id,直播间类型,用户id,用户名,累计时长,排名]*/live_events//TODO 在开始得出时间戳的时候就将其转换成以分钟为单位.withColumn("times", (unix_timestamp($"end_time", "yyyy-MM-dd HH:mm:ss") - unix_timestamp($"start_time", "yyyy-MM-dd HH:mm:ss")) / 60).withColumn("all_times", sum($"times") over Window.partitionBy($"user_id")).join(user_info, "user_id").join(live_types, "live_id").withColumn("rank", row_number() over Window.partitionBy($"live_type").orderBy($"all_times".desc)).select($"live_id",$"live_type",$"user_id",$"user_name",$"all_times",$"rank").show()+-------+---------+-------+---------+---------+----+
|live_id|live_type|user_id|user_name|all_times|rank|
+-------+---------+-------+---------+---------+----+
|      1|     food|    104|     Bush|    178.0|   1|
|      1|     food|    101|      Bob|    163.0|   2|
|      1|     food|    102|      Tom|    140.0|   3|
|      1|     food|    100|    LiHua|    110.0|   4|
|      1|     food|    100|    LiHua|    110.0|   5|
|      3|    music|    102|      Tom|    140.0|   1|
|      3|    music|    106|     Lucy|    129.0|   2|
|      2|     game|    104|     Bush|    178.0|   1|
|      2|     game|    101|      Bob|    163.0|   2|
|      2|     game|    102|      Tom|    140.0|   3|
|      2|     game|    100|    LiHua|    110.0|   4|
|      2|     game|    105|      Jam|      8.0|   5|
+-------+---------+-------+---------+---------+----+

版权声明:

本网仅为发布的内容提供存储空间,不对发表、转载的内容提供任何形式的保证。凡本网注明“来源:XXX网络”的作品,均转载自其它媒体,著作权归作者所有,商业转载请联系作者获得授权,非商业转载请注明出处。

我们尊重并感谢每一位作者,均已注明文章来源和作者。如因作品内容、版权或其它问题,请及时与我们联系,联系邮箱:809451989@qq.com,投稿邮箱:809451989@qq.com