一、Hive 小文件概述
在Hive中,所谓的小文件是指文件巨细远小于HDFS块巨细的文件,一般小于128 MB,乃至更少。这些小文件可能是Hive表的一部分,每个小文件都包含一个或几个表的记录,它们以文本格局存储。
Hive一般用于分析许多数据,但它在处理小文件方面表现欠安,Hive中存在许多小文件会引起以下问题:
-
存储空间占用过多:在Hadoop生态系统中,每个小文件都将占用必定的存储空间,并且每个小文件也需求一个块来存储。假如存在许多的小文件,将浪费许多的存储空间。
-
处理推迟:小文件数量过多,会引起许多IO操作,导致处理推迟。
-
查询功能下降:小文件用于分区和表区分,可能导致查询推迟并下降查询功能。此外,小文件还会添加元数据的数量,使得Hive在查询元数据时变得更加缓慢。
-
数据歪斜:假如数据散布不均匀,会导致一些Reduce使命处理了彻底不同的分区,这会使某些Reduce使命的运转速度与其他Reduce使命比较非常慢。
因而,为了防止这些问题,咱们需求对Hive中小文件的处理进行优化,削减小文件数量和巨细,以进步数据处理功率和准确性。
二、Hive 小文件发生的布景
Hive中小文件发生的布景主要是由于以下两个原因:
-
数据写入频率较高:假如表的写入频率较高,也就意味着会频频地添加、更新或删去记录,这可能会导致小文件的发生。由于Hive表被映射到HDFS文件,因而假如频频地写入数据,它们可能以小文件的形式存在。
-
映射表的切分限制:Hive表映射为HDFS文件时会依照数据块巨细进行切分和管理。假如表中存在小于单个数据块巨细的数据,生成的文件就会比数据块小。这可能会导致许多小文件的发生,
综上所述,Hive中小文件的存在与数据写入频率高和表映射为HDFS文件的切分方法有关。为了处理小文件问题,咱们需求了解这些布景并针对其原因来优化处理。
三、环境预备
假如现已有了环境了,能够疏忽,假如想快速布置环境进行测试能够参阅我这篇文章:经过 docker-compose 快速布置 Hive 详细教程
# 登录容器
docker exec -it hive-hiveserver2 bash
# 衔接hive
beeline -u jdbc:hive2://hive-hiveserver2:10000 -n hadoop
四、Hive 小文件管理
为了处理Hive中的小文件问题,能够采纳以下一些有用办法:
-
文件兼并:将多个小文件兼并成一个大文件,采用 Hadoop 文件兼并API能够将多个小文件兼并成一个大文件。兼并文件后,能够削减小文件数量,削减Hadoop文件管理担负,削减HDFS元数据和NameNode内存耗费。
-
紧缩文件:能够运用紧缩算法(如gzip、bzip2等)对小文件进行紧缩,这样能够削减磁盘空间和网络带宽的运用,并削减小文件损坏的可能性。
-
存储格局优化:Hive支撑多种存储格局,如ORC、Parquet、Avro等。这些格局允许将多个小文件紧缩并序列化成一个大文件,存储时占用更少的磁盘和网络带宽。存储格局优化关于处理小文件问题非常有用。
-
分区表:关于一些常变化的数据,引荐运用分区表。分区表将数据依照不同的分区值存储在不同的目录中。这削减了小文件数量并进步了查询功率。
-
废物回收:假如一个表旧数据常常更新或删去,就会发生许多无用的小文件,因而主张进行废物回收。能够定期履行HDFS文件删去指令或许设置TTL等机制,定期删去冗余数据以削减HDFS文件、元数据和NameNode内存的耗费。
经过采纳上述办法中的一种或多种,能够极大地削减Hive中小文件数量,优化Hive表的表现并进步查询功率。
1)小文件兼并(常用)
能够运用以下指令将 Hive 表中的小文件兼并为一个大文件:
set hive.merge.size.per.task=256000000;
set hive.merge.smallfiles.avgsize=16000000;
set hive.input.format=org.apache.hadoop.hive.ql.io.CombineHiveInputFormat;
set hive.merge.mapfiles=true;
# 未分区
INSERT OVERWRITE TABLE table_new SELECT * FROM table_old;
# 分区
INSERT OVERWRITE TABLE table_new SELECT column1,column2 FROM table_old where partitions;
上述代码中的参数意义如下:
-
hive.merge.size.per.task
:设置MapReduce使命处理的最大数据巨细,单位是字节,默许为256MB。 -
hive.merge.smallfiles.avgsize
:设置假如小于该平均巨细的文件需求兼并在一起,以减小小文件的数量和规划,单位是字节,默许为16MB。 -
hive.input.format
:运用 CombinHiveInputFormat 作为输入格局兼并小文件。 -
hive.merge.mapfiles
:兼并Map文件(.mapred或.mapreduce)以削减小文件的数量。
1、示例演示一(非分区表)
# 非分区表
CREATE TABLE student (
id INT,
name STRING,
age INT,
address STRING
)
ROW FORMAT DELIMITED
FIELDS TERMINATED BY ','
LINES TERMINATED BY '\n';
-- 添加数据,这儿多履行几回,会生成多个文件,便利下面文件兼并试验
INSERT INTO TABLE student VALUES (1, 'stu1', 15, 'add1'),(2, 'stu2', 16, 'add2'),(3, 'stu3', 17, 'add3');
-- 也可运用LOAD DATA LOCAL
LOAD DATA LOCAL INPATH './stu.txt' INTO TABLE student;
从上图可看到现已有许多小文件了,接下来便是进行兼并了。履行以下指令即可:
INSERT OVERWRITE TABLE student SELECT * FROM student;
现已将多个文件兼并成一个文件了,到达了小文件兼并的作用了。
2、示例演示二(分区表)
其实用的多的还是按分区进行兼并,一般表都是有分区的,按分区兼并的优点便是削减读写压力,数据量大的状况下分批兼并是非常友好的。
# 分区表
CREATE TABLE student_patitions (
id INT,
name STRING,
age INT,
address STRING
)
PARTITIONED BY (year string)
ROW FORMAT DELIMITED
FIELDS TERMINATED BY ','
LINES TERMINATED BY '\n';
-- 敞开动态分区,默许是false
set hive.exec.dynamic.partition=true;
-- 敞开允许一切分区都是动态的,否则有必要要有静态分区才能运用。
set hive.exec.dynamic.partition.mode=nostrick;
-- Hive默许状况下设置的最大动态分区创立数是100。
set hive.exec.max.dynamic.partitions=10000;
-- 添加数据,这儿多履行几回,会生成多个文件,便利下面文件兼并试验
INSERT INTO TABLE student_patitions PARTITION (year=2019) VALUES (1, 'stu1', 15, 'add1'),(2, 'stu2', 16, 'add2'),(3, 'stu3', 17, 'add3');
INSERT INTO TABLE student_patitions PARTITION (year=2023) VALUES (1, 'stu1', 15, 'add1'),(2, 'stu2', 16, 'add2'),(3, 'stu3', 17, 'add3');
-- 也可运用LOAD DATA LOCAL
LOAD DATA LOCAL INPATH './stu_pt.txt' INTO TABLE student_patitions PARTITION (year=2020);
从上图可看到现已有许多小文件了,接下来便是进行兼并了。履行以下指令即可:
-- 按分区兼并
insert overwrite table student_patitions partition(year=2019)
select id, name, age, address from student_patitions where year=2019;
-- 动态分区兼并,有些版别不支撑*,
-- *
insert overwrite table student_patitions partition(year) select * from student_patitions;
-- insert overwrite table student_patitions partition(year) select id, name, age, address from student_patitions;
-- 也能够经过load data方法
load data local inpath './stu_pt.txt' overwrite into table student_patitions partition(year=2019);
3、示例演示三(暂时表)
还有一个更靠谱的计划便是经过将现有的表数据兼并写到别的一张暂时新表,然后承认兼并无误后,将原始表和表数据删去,再将新表名改成旧表名。
示例如下:
-- 分区表
CREATE TABLE student_patitions2 (
id INT,
name STRING,
age INT,
address STRING
)
PARTITIONED BY (year string)
ROW FORMAT DELIMITED
FIELDS TERMINATED BY ','
LINES TERMINATED BY '\n';
-- 敞开动态分区,默许是false
set hive.exec.dynamic.partition=true;
-- 敞开允许一切分区都是动态的,否则有必要要有静态分区才能运用。
set hive.exec.dynamic.partition.mode=nostrick;
-- Hive默许状况下设置的最大动态分区创立数是100。
set hive.exec.max.dynamic.partitions=10000;
-- 添加数据,这儿多履行几回,会生成多个文件,便利下面文件兼并试验
INSERT INTO TABLE student_patitions2 PARTITION (year=2019) VALUES (1, 'stu1', 15, 'add1'),(2, 'stu2', 16, 'add2'),(3, 'stu3', 17, 'add3');
INSERT INTO TABLE student_patitions2 PARTITION (year=2023) VALUES (1, 'stu1', 15, 'add1'),(2, 'stu2', 16, 'add2'),(3, 'stu3', 17, 'add3');
-- 也可运用LOAD DATA LOCAL
LOAD DATA LOCAL INPATH './stu_pt.txt' INTO TABLE student_patitions2 PARTITION (year=2020);
创立暂时表并将添加兼并数据
CREATE TABLE student_patitions2_temp (
id INT,
name STRING,
age INT,
address STRING
)
PARTITIONED BY (year string)
ROW FORMAT DELIMITED
FIELDS TERMINATED BY ','
LINES TERMINATED BY '\n';
-- 按分区兼并,有些版别不支撑*
insert overwrite table student_patitions2_temp partition(year)
select * from student_patitions2;
-- insert overwrite table student_patitions2_temp partition(year) select id, name, age, address from student_patitions2;
-- 也能够经过load data方法
load data local inpath './stu_pt.txt' overwrite into table student_patitions2_temp partition(year=2019);
删去旧表,修正表表称号
# 删表,假如是外部表还是删去数据文件
DROP TABLE student_patitions2;
ALTER TABLE student_patitions2_temp RENAME TO student_patitions2;
2)文件紧缩
能够运用以下指令将表中的小文件进行紧缩:
SET hive.exec.compress.output=true;
SET mapred.output.compression.codec=org.apache.hadoop.io.compress.GzipCodec;
INSERT OVERWRITE TABLE table_new SELECT * FROM table_old;
3)存储格局优化
运用存储格局进行优化,能够将多个小文件紧缩和序列化成一个大文件。以下是运用ORC格局的完结示例:
SET hive.exec.compress.output=true;
SET orc.compress=SNAPPY;
SET hive.exec.orc.default.compress=SNAPPY;
CREATE TABLE table_new STORED AS ORC AS SELECT * FROM table_old;
上述代码中的参数意义如下:
-
hive.exec.compress.output
:指定是否敞开紧缩,假如启用则会对输出进行紧缩,以节省存储空间和网络带宽。 -
orc.compress
:设置紧缩算法,这儿运用SNAPPY
。 -
hive.exec.orc.default.compress
:设置ORC文件默许紧缩算法,这儿运用SNAPPY
。
4)分区表
能够运用以下SQL句子创立分区表:
CREATE TABLE table_new(
column1 INT,
column2 STRING
)
PARTITIONED BY (
day STRING
)
ROW FORMAT DELIMITED
STORED AS TEXTFILE;
这儿将表依照分区值进行存储,能够进步查询功率,削减小文件数量。
5)废物回收
删去HDFS中过期的小文件能够削减 HDFS 的存储开支。
能够运用如下指令进行删去操作:
hdfs dfs -rm /path/to/file-*
也能够运用 HiveQL 参数 EXPIRE 进行废物回收,以将无用的文件从HDFS中删去:
ALTER TABLE table_old DROP PARTITION (day '2016-01-01') PURGE;
上述代码中将删去旧的分区并从HDFS中永久删去不再需求的数据。
综上所述,能够经过上述方法来处理Hive中小文件问题,以进步Hive的查询功率和功能。
五、HDFS 数据平衡
1)HDFS 数据歪斜
HDFS数据歪斜是指存在一些数据块的巨细显着大于其他数据块,导致作业在运转时的处理时刻和功能严重不平衡。这一般是由于数据散布不均匀,或许使命负载不均匀导致的。Hive的MapReduce作业常常面对HDFS数据歪斜的问题,这会导致一部分Mapper处理的数据量很大,而其他Mapper却没有得到充分利用。
以下是一些缓解HDFS数据歪斜的方法:
-
增大文件块巨细:假如您的作业常常面对数据歪斜问题,能够测验增大数据块的巨细。这样能够下降Mapper需求处理的数据块数量,然后削减数据块分配不均衡的可能性。
-
数据兼并:假如您的作业中存在许多较小的文件,能够测验将它们兼并为几个较大的文件。这样能够削减地图使命的数目,并有助于均衡使命的负载。
-
数据重分区:假如在您的作业中数据散布极不均匀,能够测验运用数据重分区(例如Hive中的 CLUSTER BY 或 DISTRIBUTE BY 句子)来重新组织数据。这能够协助将相似的数据放在同一个分区中,然后削减数据歪斜的可能性。
-
动态分区:在Hive中,动态分区可用于依据数据中实践的分区键动态创立分区。它能够运用较小的数据块巨细来进步作业的并行性。动态分区还能够经过保证数据分配均衡来缓解数据歪斜的问题。
-
紧缩:运用紧缩技能能够减小数据块巨细,并削减歪斜问题的可能性。常用的紧缩格局包含Gzip、Snappy、LZO等。
HDFS数据歪斜不仅可能出现在数据块的巨细上,还可能出现在数据节点(Datanode)的负载上。假如一个Datanode存储的数据块远远多于其他Datanode,那么它处理作业时的负载将远高于其他节点,然后导致整个集群功能下降。下面是一些缓解HDFS数据节点歪斜问题的方法:
-
添加节点:能够向集群中添加更多的节点,以添加存储能力。这样能够分散节点的负载,防止单个节点负载过高。虽然这样做可能会添加集群的维护本钱,但它能够进步集群的功能和可靠性。一般添加完新节点需求做数据平衡,要不然新节点磁盘运用率远低于其它节点的磁盘。
-
均衡数据散布:您能够运用HDFS中的均衡指令(hdfs balancer)来均衡数据散布。该指令将依据需求将块移动到不同的节点,以坚持一切节点的负载相对均衡。
-
更改块巨细:当块巨细不均衡时,您能够测验依据每个节点的存储容量添加或削减块巨细,以保证每个节点的负载相对均衡。例如,假如一个节点存储许多的小文件,则能够将块巨细添加到更适合这种状况的巨细(例如512MB或1GB),以削减每个节点的块数。
-
数据迁移:假如一个节点负载过高,您能够从该节点中移动一些块到其他节点中,以减轻该节点的负载。这能够经过将块从一个节点复制到另一个节点来完结。需求留意的是,这样做可能会影响作业的功能,因而主张在维护适宜的功能的同时进行数据迁移。
需求留意的是,缓解HDFS数据节点歪斜问题需求归纳考虑多种因素,包含数据散布、集群规划、硬件配置等。依据具体状况,您能够采纳不同的办法来缓解数据节点歪斜的问题。
2)HDFS 数据平衡
HDFS提供了 hdfs balancer
指令来进行数据平衡呢。hdfs balancer指令能够让HDFS集群重新均衡散布数据块,保证HDFS集群中数据块在各个节点上均衡散布。
hdfs balancer
指令的语法如下:
hdfs balancer -help
Usage: java Balancer
[-policy <policy>] the balancing policy: datanode or blockpool
[-threshold <threshold>] Percentage of disk capacity
[-exclude [-f <hosts-file> | comma-sperated list of hosts]] Excludes the specified datanodes.
[-include [-f <hosts-file> | comma-sperated list of hosts]] Includes only the specified datanodes.
参数详解:
-
-threshold
:某datanode的运用率和整个集群运用率的百分比差值阈值,到达这个阈值就发动hdfs balancer,取值从1到100,不宜太小,由于在平衡进程中也有数据写入,太小无法到达平衡,默许值:10
-
-policy
:分为blockpool和datanode,前者是block pool等级的平衡后者是datanode等级的平衡,BlockPool 战略平衡了块池等级和 DataNode 等级的存储。BlockPool 战略仅适用于 Federated HDFS 服务 -
-exclude
:不为空,则不在这些机器上进行平衡 -
-include
:不为空,则仅在这些机器上进行平衡-idleiterations
:最大迭代次数
别的还有两个常用的参数:
-
dfs.datanode.balance.bandwidthPerSec
:HDFS做均衡时运用的最大带宽,默许为1048576,即1MB/s,对大多数千兆乃至万兆带宽的集群来说过小。不过该值能够在发动balancer脚本时再设置,能够不修正集群层面默许值。目前目前咱们产线环境设置的是50M/s~100M/s。 -
dfs.balancer.block-move.timeout
:是一个Hadoop数据平衡指令hdfs balancer的选项之一,用于设置数据块移动的最长时刻。该选项指定了块移动操作在多长时刻内有必要完结。该选项默许值为120000
毫秒(即2分钟),能够经过以下指令进行修正:
简单运用:
# 发动数据平衡,默许阈值为 10%
hdfs balancer
# 默许相差值为10% 带宽速率为10M/s,超时时刻10分钟,进程信息会直接打印在客户端 ctrl+c即可间断
hdfs balancer -Ddfs.balancer.block-move.timeout=600000
#能够手动设置相差值 一般相差值越小 需求平衡的时刻就越长,//设置为20% 这个参数本身便是百分比 不用带%
hdfs balancer -threshold 20
#假如怕影响事务能够动态设置一下带宽再履行上述指令,1M/s
hdfs dfsadmin -setBalancerBandwidth 1048576
#或许直接带参运转,带宽为1M/s
hdfs balancer -Ddfs.datanode.balance.bandwidthPerSec=1048576 -Ddfs.balancer.block-move.timeout=600000
关于 Hive 小文件管理和 HDFS 数据平衡讲解就先到这儿了,有任何疑问欢迎给我留言或私信,也可重视我的大众号【大数据与云原生技能分享】加群交流或私信交流,假如本文章对你有所帮协助,帮助一键三连(点赞、转发、分享)~