Hive 数据倾斜优化
在使用 Hive 进行大数据处理时,数据倾斜是一个常见的问题。本文将详细介绍数据倾斜的概念、表现、常见场景及其解决方案。
1. 什么是数据倾斜?
数据倾斜是指由于数据分布不均匀,导致大量数据集中到某个节点或任务中,造成处理延迟和性能瓶颈。
2. 数据倾斜的表现
- 作业进度长时间维持在接近完成状态(99%或100%)。
- 查看任务监控页面时,发现少量 reduce 任务未完成,因为其处理的数据量远超其他任务。
3. 容易产生数据倾斜的场景
3.1 Join 操作:
- 小表与大表 join 时,key 分布不均。
- 大表与大表 join 时,分桶字段存在大量空值。
3.2 Group By 操作不和聚集函数搭配使用的时候:
-
原因:
- 当某些 key 的值在数据集中频繁出现时,相关的数据将集中到一个或少数的 Reducer 上进行处理。
- 这些 Reducer 处理的数据量过大,导致运行时间长。
-
表现:
- 某个 Reducer 的任务处理时间明显长于其他 Reducer。
- 资源分配不均匀,影响整体作业效率。
方法
- 调整数据分布:通过添加随机数等方法,重新分配数据,减少单个 key 负载。
- 增加 Reducer 数量:合理增加 Reducer 来分散压力。
3.3 Count Distinct 操作:
-
原因:
- 需要对唯一值进行计算,因为 count(distinct)是按 group by字段分组,按 distinct字段排序。
- 如果某个字段的值分布不均匀,某些值过于集中,会导致相关 Reducer 负载过重。
-
表现:
- 处理时间长,可能导致内存溢出。
- 某些任务比其他任务需要更多的时间来完成。
解决方法
- 近似计算:使用
approx_distinct
或其他近似方法减少计算复杂度。 - 预聚合:在进行去重前,先对数据进行预处理,减少数据量。
4. 数据倾斜的原因
-
Key 分布不均匀:
- 在分组(Group By)或连接(Join)中,某些 key 的数据远多于其他 key。
-
业务数据特性:
- 某些特定值(如默认值、异常值)出现频率过高。
-
建表时考虑不周:
- 未合理设计表的分区或分桶策略。
-
SQL 语句特性:
- 特定 SQL 语句在逻辑上引起数据集中。
5. 解决数据倾斜的常用方案
1. JOIN优化
(1)空值产生的数据倾斜
- 场景说明:日志中的 user_id 丢失,导致与用户表关联时出现倾斜。
- 解决方案:
-
方案1:不参与关联
SELECT * FROM log a JOIN user b ON a.user_id IS NOT NULL AND a.user_id = b.user_id UNION ALL SELECT * FROM log c WHERE c.user_id IS NULL;
-
方案2:赋予空值新 key 值
SELECT * FROM log a LEFT OUTER JOIN user b ON CASE WHEN a.user_id IS NULL THEN CONCAT('hive', RAND()) ELSE a.user_id END = b.user_id;
-
总结:方案2效率更高,通过随机字符串分散空值数据。
-
(2)不同数据类型关联产生的数据倾斜
-
场景说明:user 表中的 user_id 为 int,log 表中为 string。
-
解决方案:统一数据类型
SELECT * FROM user a LEFT OUTER JOIN log b ON b.user_id = CAST(a.user_id AS STRING);
(3)大小表关联查询产生的数据倾斜
-
场景说明:使用 map join 解决小表关联大表的倾斜问题。
-
解决方案:
使用 map join 在内存中处理小表,避免 reduce 阶段:
SELECT /*+ MAPJOIN(b) */ a.id, b.name FROM large_table a JOIN small_table b ON a.id = b.id;
- Hive 中自动开启 map join 优化:
SET hive.auto.convert.join=true; SET hive.mapjoin.smalltable.filesize=25000000;
- Hive 中自动开启 map join 优化:
-
大表关联:将大表切分成小表,再分别进行 map join。
-
小表不大不小:
如果小表较大,无法直接用 map join,则采用如下策略:
SELECT /*+ MAPJOIN(x) */ * FROM log a LEFT OUTER JOIN (SELECT /*+ MAPJOIN(c) */ d.*FROM (SELECT DISTINCT user_id FROM log) cJOIN users d ON c.user_id = d.user_id ) x ON a.user_id = x.user_id;
- 总结:根据具体场景选择适合的优化策略。
2. Map 阶段优化
-
使用 Combiner:
- 在 Map 阶段聚合中间结果,减少传输数据量。
-
MapJoin 优化:
- 对小表进行 MapJoin,在 Map 阶段完成连接。
SELECT /*+ MAPJOIN(small_table) */ ... FROM large_table JOIN small_table ON ...
3. 增加 Reducer 个数
- 根据数据量合理调整 Reducer 的数量,以分散负载。
SET mapreduce.job.reduces = <num>;
4. 优化 Count Distinct
- 减少使用使用
Count Distinct
次数或使用approx_distinct
等近似计算方法。
6. 其他
数据扩散
- 定义:数据扩散指的是在执行某些操作(如连接操作)时,数据量显著增加。例如,当两个表进行连接时,结果集的大小远远超过原始表的大小。
- 影响:会导致资源消耗增加,处理时间变长,甚至可能导致内存溢出。
- 解决方法:
- 优化连接条件,确保只连接必需的数据。
- 使用过滤条件提前减少数据量。
数据漂移
- 定义:数据漂移通常指的是数据在不同时间段内的分布或特征发生了变化。例如,由于时间延迟,当天的数据可能在第二天被处理。
- 影响:数据分析结果可能不准确,影响实时性。
- 解决方法:
- 设计合理的时间窗口,确保数据在合适的时间范围内被处理。
- 定期检查和调整数据处理策略以适应数据特征的变化。