您的位置:首页 > 汽车 > 时评 > 大数据存储技术笔记

大数据存储技术笔记

2024/12/22 20:39:12 来源:https://blog.csdn.net/yesyesyes_yes/article/details/139736161  浏览:    关键词:大数据存储技术笔记

目录

大数据的特性

HDFS 读流程的基本步骤

HDFS 写流程的基本步骤

MapReduce的执行过程

MapReduce 中 combiner 作用

hadoop 调度器及其工作方法

Hive 中内部表与外部表区别(创建删除角度)

Hadoop 的 2 个主要组件及其功能

Hadoop MapReduce 的工作流程

正常工作的 hadoop 集群中 Hadoop 分别启动哪些进程及其作用

Hadoop 中的 NameNode 和 DataNode 分别承担的角色

Hadoop 集群中的 NameNode 如何保证单点故障问题

Hadoop 中的 Block 是什么及其作用

Hadoop 在大数据处理中的优势

FsImage 镜像文件和 EditLog 日志文件 HDFS 中提供的 Secondary NameNode 节点的职责

Hadoop 中的分区器(Partitioner)的作用

YARN 提交作业的流程

Mapper+Reducer+Master代码


大数据的特性

全样而非抽样、效率而非精确 、相关而非因果

HDFS(Hadoop Distributed File System)的读流程主要涉及客户端(Client)、NameNode 和 DataNode 之间的交互。

HDFS 读流程的基本步骤

文件切分(对于上传操作): 当文件上传到 HDFS 时,客户端(Client)首先将文件切分成一个个的数据块(Block),然后进行上传。

与 NameNode 交互: 客户端向 NameNode 发送请求,获取要读取文件的位置信息。这个位置信息包括了组成该文件的所有数据块(Block)以及它们所在的 DataNode 的地址。

与 DataNode 交互: 一旦客户端获得了文件的数据块及其所在的 DataNode 的位置信息,它就会与这些 DataNode 进行交互,以读取数据。 客户端可能会并行地从多个 DataNode 读取数据块,以提高读取效率。

数据读取: 客户端从 DataNode 读取数据块,并在本地重新组装这些数据块,以还原原始文件。

关闭连接: 当文件读取完成后,客户端关闭与 DataNode 和 NameNode 的连接。

HDFS(Hadoop Distributed File System)的写流程涉及客户端(Client)、NameNode 和 DataNode 之间的交互。

HDFS 写流程的基本步骤

客户端发起请求: 客户端(Client)首先向 HDFS 的 NameNode 发送写入文件的请求。NameNode 是 HDFS 的 主节点,负责管理文件系统的命名空间和元数据信息。

NameNode 响应请求: NameNode 接收到客户端的请求后,会检查自身是否正常运行,判断要创建的文件是否存在, 以及客户端是否具有创建文件的权限。如果以上检查都通过,NameNode 会在 HDFS 文件系 统中创建一个空文件,并将这一操作记录在 edits.log 文件中。如果检查中有任何一项未通过, NameNode 会向客户端抛出异常,文件创建失败。

文件切分: 客户端将待写入的文件切分成固定大小的数据块(通常为 128MB)。每个数据块都会被分配一个唯一的块标识符。

数据块副本选择: 在写入数据块之前,客户端需要选择数据块的副本位置。这通常是基于 HDFS 的副本放置策略,旨在减少数据传输的开销和延迟。

数据块写入: 客户端将数据块分别发送给副本位置所在的 DataNode。DataNode 接收到数据块后,会将数据块暂存到本地磁盘上的临时文件中。

数据块复制: 一旦数据块被写入到一个 DataNode 的临时文件中,该 DataNode 会将其复制到其他副本位置所在的 DataNode 上。

副本确认: 当所有副本都完成数据写入后,DataNode 会向客户端发送副本确认信息。客户端收到所有副本的确认信息后,将告知 NameNode 数据块的写入完成。

MapReduce的执行过程

MapReduce 中 combiner 作用

Hadoop 中的 Combiner 是 MapReduce 作业中的一个可选组件,用于在 Map 阶段对输出数据进行部分聚合。

Combiner 可以减少 Map 和 Reduce 之间传输的数据量,从而降低网络带宽的消耗。

同时,Combiner 还可以减少 Reduce 阶段的计算量,提高作业的执行效率。

hadoop 调度器及其工作方法

Fifo schedular:默认,先进先出的原则。

Capacity schedular:计算能力调度器,选择占用最小,优先级高的先执行,以此类推。

Fair schedule:公平调度,所有的 job 具有相同的资源。

Hive 中内部表与外部表区别(创建删除角度)

创建表阶段: 外部表创建表的时候,不会移动数到数据仓库目录中(/user/hive/warehouse),只会记录表数 据存放的路径,内部表会把数据复制或剪切到表的目录下。

删除表阶段: 外部表在删除表的时候只会删除表的元数据信息不会删除表数据,内部表删除时会将元数据 信息和表数据同时删除

Hadoop 的 2 个主要组件及其功能

Hadoop 主要由两个核心组件组成:Hadoop Distributed File System (HDFS) 和 MapReduce。

HDFS:Hadoop 的分布式文件系统,用于存储和管理大数据集。它采用主/从架构,包括一个 NameNode(主节点)和多个 DataNode(从节点)。NameNode 管理文件系统的元数据, 而 DataNode 存储实际的数据块。

MapReduce:Hadoop 的编程框架,用于处理存储在 HDFS 中的大数据。它将复杂的数据处 理任务分解为两个主要阶段:Map 阶段和 Reduce 阶段。Map 阶段将数据划分为多个键值对,Reduce 阶段则对这些键值对进行聚合和输出。

Hadoop MapReduce 的工作流程

输入阶段:MapReduce 作业从 HDFS 或其他输入源读取数据。

Map 阶段:Map 任务将输入数据划分为多个键值对,并对其进行处理。处理结果作为中间 键值对输出到本地磁盘。

Shuffle 阶段:MapReduce 框架将 Map 任务输出的中间键值对按照键进行排序和分组,并将 相同键的值传递给 Reduce 任务。

Reduce 阶段:Reduce 任务接收 Shuffle 阶段传递过来的键值对,对它们进行聚合处理,并将 结果输出到 HDFS 或其他输出源。

输出阶段:MapReduce 作业将 Reduce 任务输出的结果存储到 HDFS 或其他输出目标。

正常工作的 hadoop 集群中 Hadoop 分别启动哪些进程及其作用

Namenode:管理集群,并记录 datanode 文件信息。

Secondnamenode:可以做冷备,对一定范围内数据做快照性备份。

Datanode:存储数据 。

ResourceManager:负责集群中所有资源的统一管理和分配,它接收来自各个节点 (nodemanager)的资源汇报信息,并把这些信息按照一定的策略分配给各个应用程序(实际上是 applicationmanager)

NodeManager:是 yarn 中每个节点上的代理,它管理 hadoop 集群中单个计算节点,包括与 resoucemanager 保持通信,监督 container 的生命周期管理,监控每个 container 的资源 使用(内存、cpu 等)情况,追踪节点健康状况,管理日志和不同应用程序用到的附属服务。

Hadoop 中的 NameNode 和 DataNode 分别承担的角色

NameNode:NameNode 是 HDFS 中的主节点,负责维护文件系统的元数据,如文件目录结 构、文件块与 DataNode 的映射关系等。它不接受客户端的读写请求,而是将请求转发给 DataNode,并协调数据的读写操作。

DataNode:DataNode 是 HDFS 中的从节点,负责存储实际的数据块。客户端通过与 NameNode 通信获取数据块的存储位置,然后直接与 DataNode 进行数据的读写操作。DataNode 还会定 期向 NameNode 发送心跳信息,以报告自身的状态和数据块的完整性。

Hadoop 集群中的 NameNode 如何保证单点故障问题

Hadoop 集群中的 NameNode 单点故障问题可以通过设置 NameNode 的高可用性(HA)来解 决。

HA 配置使用两个 NameNode 实例,一个处于活动状态(Active),另一个处于备用状态 (Standby)。

当活动 NameNode 出现故障时,备用 NameNode 会自动接管其工作,从而确保集群的连续 性和可用性。

此外,还可以使用 Zookeeper 等分布式协调服务来监控和管理 NameNode 的状态和切换过程。

Hadoop 中的 Block 是什么及其作用

Hadoop 中的 Block 是 HDFS 中数据存储的基本单位,通常大小为 128MB(可配置)。HDFS 将数据划分为多个 Block,并将它们分散存储在集群中的多个 DataNode 上。

这种设计提高了数据的可靠性和可扩展性,因为每个 Block 都有多个副本存储在不同的节点上,并且可以通过增加节点来扩展存储能力。

Hadoop 在大数据处理中的优势

可扩展性:Hadoop 能够处理 PB 级甚至更大的数据集,通过增加节点可以轻松地扩展集群的处理能力。

容错性:Hadoop 采用分布式存储和计算的方式,将数据分散存储在多个节点上,并通过数据冗余和故障恢复机制保证数据的高可用性。

灵活性:Hadoop 支持多种编程语言和工具进行数据处理,如 Java、Python 等,同时提供了丰富的 API 和生态系统供开发者使用。

成本效益:Hadoop 运行在普通硬件上,相比传统的高性能计算集群具有更低的成本。同时, Hadoop 的开源特性使得用户可以免费获取和使用它。

FsImage 镜像文件和 EditLog 日志文件 HDFS 中提供的 Secondary NameNode 节点的职责

FsImage 镜像文件用于存储整个文件系统命名空间的信息,EditLog 日志文件用于持久化记录文件系统元数据发生的变化。

Secondary NameNode 节点主要是周期性的把 NameNode 中的 EditLog 日志文件合并到 FsImage 镜像文件中,从而减小 EditLog 日志文件的大小,缩短集群重启时间,并且也保证 了 HDFS 系统的完整性。

Hadoop 中的分区器(Partitioner)的作用

在 Hadoop 的 MapReduce 作业中,分区器用于确定 Map 阶段输出的键值对应该发送到哪个 Reducer 进行处理。

分区器根据键的哈希值或其他逻辑将数据划分为不同的分区,并将每个分区的数据发送到对应的 Reducer。

这种设计可以确保具有相同键的数据被发送到同一个 Reducer 进行处理,从而便于在 Reduce 阶段进行聚合操作。

YARN(Yet Another Resource Negotiator)是 Hadoop 的一个资源管理系统,它负责集群中的 资源管理和作业调度。

YARN 提交作业的流程

作业提交: Client(客户端)调用 job.waitForCompletion 方法,向整个集群提交 MapReduce 作业。 Client 向 ResourceManager(RM)申请一个作业 ID。 RM 给 Client 返回该 job 的资源提交路径和作业 ID。 Client 提交 jar 包、切片信息和配置文件到指定的资源提交路径。 Client 提交完资源后,向 RM 申请运行 MRAppMaster(MapReduce 应用程序主)。

作业初始化: 当 RM 收到 Client 的请求后,将该 job 添加到容量调度器中。 某一个空闲的 NodeManager(NM)领取到该 Job。 该 NM 创建 Container,并产生 MRAppMaster。 下载 Client 提交的资源到本地。

资源分配: MRAppMaster 与 RM 的应用程序管理器(Application Manager)进行交互,协商以获取运行 作业所需的资源。 RM 的调度器根据集群的资源情况和配置的调度策略(如容量调度、公平调度等),将资源 分配给 MRAppMaster。

任务执行: MRAppMaster 将获取到的资源分配给各个 NM。 NM 上的 Executor(执行器)根据分配到的资源和作业的具体任务,开始执行 Map Task 或 Reduce Task。

作业监控和完成: RM 的应用程序管理器负责监控 MRAppMaster 的运行状态,并在必要时进行重启。 当所有任务执行完毕后,MRAppMaster 通知 RM 作业已完成。 Client 通过 RM 获取作业完成状态,并获取结果。

Mapper+Reducer+Master词频统计代码

Mapper

package edu.mapper;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;
import java.io.IOException;
public class WordCountMapper extends Mapper<LongWritable, Text, Text, IntWritable> {@Overrideprotected void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException {//获取读取到的每一行数据String line = value.toString();//通过空格进行分割String[] words = line.split(" ");//将初步处理的结果进行输出for (String word: words) {context.write(new Text(word), new IntWritable(1));}}
}

Reducer

package edu.reducer;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Reducer;
import java.io.IOException;
public class WordCountReducer extends Reducer<Text, IntWritable, Text,IntWritable> {protected void reduce(Text key, Iterable<IntWritable> values, Context context) throws IOException, InterruptedException {Integer count = 0;for(IntWritable value: values){//将每一个分组中的数据进行累加计数count += value.get();}//将最终结果进行输出context.write(key, new IntWritable(count));}
}

Master

package edu;
import edu.mapper.WordCountMapper;
import edu.reducer.WordCountReducer;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import java.io.IOException;public class WordCountMaster {public static void main(String[] args) throws IOException, ClassNotFoundException, InterruptedException {//初始化配置Configuration conf = new Configuration();conf.set("fs.defaultFS", "hdfs://virtual-machine-name:9000");//初始化job参数,指定job名称Job job = Job.getInstance(conf,"WordCount");//设置运行job的类job.setJarByClass(WordCountMaster.class);//设置Mapper类job.setMapperClass(WordCountMapper.class);//设置Reducer类job.setReducerClass(WordCountReducer.class);//设置Map的输出数据类型job.setMapOutputKeyClass(Text.class);job.setMapOutputValueClass(IntWritable.class);//设置Reducer的输出数据类型job.setOutputKeyClass(Text.class);job.setOutputValueClass(IntWritable.class);//设置输入的路径//FileInputFormat.setInputPaths(job, new Path(Hadoop_Url+"input/wordCount"));//设置输出的路径//FileOutputFormat.setOutputPath(job, new Path(Hadoop_Url+"output/wordCount/1"));//指定该mapreduce程序数据的输入和输出路径Path inputPath = new Path("/wordcount/input");Path outputPath = new Path("/wordcount/output");FileSystem fs = FileSystem.get(conf);if(fs.exists(outputPath)){fs.delete(outputPath, true);}FileInputFormat.setInputPaths(job, inputPath);FileOutputFormat.setOutputPath(job, outputPath);//提交jobboolean result = job.waitForCompletion(true);//执行成功后进行后续操作if (result) {System.out.println("Congratulations!");}}
}

计算第四列每个元素出现的个数

a,b,c,d

b,b,f,e

a,a,c,f

mapper

import java.io.IOException;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;public class LogMapper extends Mapper<LongWritable, Text, Text, LongWritable> {@Overrideprotected void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException {String line = value.toString();String[] columns = line.split(",");if (columns.length == 4) {context.write(new Text(columns[3]), new LongWritable(1));}}
}

reducer

import java.io.IOException;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Reducer;public class LogReducer extends Reducer<Text, LongWritable, Text, LongWritable> {@Overrideprotected void reduce(Text key, Iterable<LongWritable> values, Context context) throws IOException, InterruptedException {long count = 0;for (LongWritable value : values) {count += value.get();}context.write(key, new LongWritable(count));}
}

main

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;public class LogAnalysis {public static void main(String[] args) throws Exception {Configuration conf = new Configuration();Job job = Job.getInstance(conf, "Log Analysis");job.setJarByClass(LogAnalysis.class);job.setMapperClass(LogMapper.class);job.setReducerClass(LogReducer.class);job.setOutputKeyClass(Text.class);job.setOutputValueClass(LongWritable.class);FileInputFormat.addInputPath(job, new Path(args[0]));FileOutputFormat.setOutputPath(job, new Path(args[1]));System.exit(job.waitForCompletion(true) ? 0 : 1);}
}

版权声明:

本网仅为发布的内容提供存储空间,不对发表、转载的内容提供任何形式的保证。凡本网注明“来源:XXX网络”的作品,均转载自其它媒体,著作权归作者所有,商业转载请联系作者获得授权,非商业转载请注明出处。

我们尊重并感谢每一位作者,均已注明文章来源和作者。如因作品内容、版权或其它问题,请及时与我们联系,联系邮箱:809451989@qq.com,投稿邮箱:809451989@qq.com