# Hive 优化

# 一、启用压缩

压缩可以使磁盘上存储的数据量变小,例如,文本文件格式能够压缩40%甚至更高比例,这样可以通过降低I/O来提高查询速度。除非产生的数据用于外部系统,或者存在格式兼容性问题,建议总是启用压缩。压缩与解压缩会消耗CPU资源,但Hive产生的MadReduce作业往往是I/O密集型的,因此CPU开销通常不是问题。

# 1.1 压缩格式

MR支持的压缩编码

压缩格式 算法 文件扩展名 是否可切分
DEFLATE DEFLATE .deflate
Gzip DEFLATE .gz
bzip2 bzip2 .bz2
LZO LZO .lzo
Snappy Snappy .snappy

为了支持多种压缩/解压缩算法,Hadoop引入了编码/解码器,如下表所示:

压缩格式 对应的编码/解码器
DEFLATE org.apache.hadoop.io.compress.DefaultCodec
Gzip org.apache.hadoop.io.compress.GzipCodec
bzip2 org.apache.hadoop.io.compress.BZip2Codec
LZO com.hadoop.compression.lzo.LzopCodec
Snappy org.apache.hadoop.io.compress.SnappyCodec

查看群集开启了哪些压缩

hadoop checknative
1

压缩性能的比较

压缩算法 原始文件大小 压缩文件大小 压缩速度 解压速度
gzip 8.3GB 1.8GB 17.5MB/s 58MB/s
Gzip 8.3GB 1.1GB 2.4MB/s 9.5MB/s
bzip2 8.3GB 2.9GB 49.3MB/s 74.6MB/s

WARNING

注意输出压缩并不适用所有文件格式,比如ORC就不需要以上设置。具体可以看我做的sequence 对比 orc测试。

# 1.2 压缩参数配置

要在 Hadoop 中启用压缩,可以配置如下参数(mapred-site.xml 文件中):

参数 默认值 阶段 建议
io.compression.codecs(在core-site.xml中配置) org.apache.hadoop.io.compress.DefaultCodec,org.apache.hadoop.io.compress.GzipCodec,org.apache.hadoop.io.compress.BZip2Codec,org.apache.hadoop.io.compress.Lz4Codec 输入压缩 Hadoop使用文件扩展名判断是否支持某种编解码器
mapreduce.map.output.compress false mapper输出 这个参数设为true启用压缩
mapreduce.map.output.compress.codec org.apache.hadoop.io.compress.DefaultCodec mapper输出 使用 LZO、LZ4或snappy编解码器在此阶段压缩数据
mapreduce.output.fileoutputformat.compress false reducer输出 这个参数设为true启用压缩
mapreduce.output.fileoutputformat.compress.codec org.apache.hadoop.io.compress.DefaultCodec reducer输出 使用标准工具或者编解码器,如gzip和bzip2
mapreduce.output.fileoutputformat.compress.type RECORD reducer输出 SequenceFile输出使用的压缩类型:NONE和BLOCK

# 1.3 开启 Map 输出阶段压缩(MR 引擎)

开启 map 输出阶段压缩可以减少 job 中 map 和 Reduce task 间数据传输量。具体配置如下:

  1. 开启 hive 中间传输数据压缩功能
hive (default)>set hive.exec.compress.intermediate=true;
1
  1. 开启 mapreduce 中 map 输出压缩功能
hive (default)>set mapreduce.map.output.compress=true;
1
  1. 设置 mapreduce 中 map 输出数据的压缩方式
hive (default)>set mapreduce.map.output.compress.codec=org.apache.hadoop.io.compress.SnappyCodec
1

# 1.4 开启 Reduce 输出阶段压缩

当 Hive 将输出写入到表中时,输出内容同样可以进行压缩。属性 hive.exec.compress.output 控制着这个功能。用户可能需要保持默认设置文件中的默认值 false, 这样默认的输出就是非压缩的纯文本文件了。用户可以通过在查询语句或执行脚本中设置这 个值为 true,来开启输出结果压缩功能。

  1. 开启 hive 最终输出数据压缩功能
hive (default)>set hive.exec.compress.output=true;
1
  1. 开启 mapreduce 最终输出数据压缩
hive (default)>set mapreduce.output.fileoutputformat.compress=true;
1
  1. 设置 mapreduce 最终数据输出压缩方式
hive (default)> set mapreduce.output.fileoutputformat.compress.codec =org.apache.hadoop.io.compress.SnappyCodec;
1
  1. 设置 mapreduce 最终数据输出压缩为块压缩
set mapreduce.output.fileoutputformat.compress.type=BLOCK;
1
  1. 测试一下输出结果是否是压缩文件
hive (default)> insert overwrite local directory '/opt/module/data/distribute-result' select * from emp distribute by deptno sort by empno desc;
1

# 二、存储格式

# 2.1 支持的存储格式

Hive 会在 HDFS 为每个数据库上创建一个目录,数据库中的表是该目录的子目录,表中的数据会以文件的形式存储在对应的表目录下。Hive 支持以下几种文件存储格式:

格式 说明
TextFile 默认格式,数据不做压缩,磁盘开销大,数据解析开销大。可结合 Gzip、Bzip2 使用,但使用 Gzip 这种方式,hive 不会对数据进行切分,从而无法对数据进行并行操作。
SequenceFile SequenceFile 是 Hadoop API 提供的一种二进制文件,它将数据以<key,value>的形式序列化到文件中。这种二进制文件内部使用 Hadoop 的标准的 Writable 接口实现序列化和反序列化。它与 Hadoop API 中的 MapFile 是互相兼容的。Hive 中的 SequenceFile 继承自 Hadoop API 的 SequenceFile,不过它的 key 为空,使用 value 存放实际的值,这样是为了避免 MR 在运行 map 阶段进行额外的排序操作。
RCFile RCFile 文件格式是 FaceBook 开源的一种 Hive 的文件存储格式,首先将表分为几个行组,对每个行组内的数据按列存储,每一列的数据都是分开存储。
ORC Files ORC 是在一定程度上扩展了 RCFile,是对 RCFile 的优化。
Avro Files Avro 是一个数据序列化系统,设计用于支持大批量数据交换的应用。它的主要特点有:支持二进制序列化方式,可以便捷,快速地处理大量数据;动态语言友好,Avro 提供的机制使动态语言可以方便地处理 Avro 数据。
Parquet Parquet 是基于 Dremel 的数据模型和算法实现的,面向分析型业务的列式存储格式。它通过按列进行高效压缩和特殊的编码技术,从而在降低存储空间的同时提高了 IO 效率。

以上压缩格式中 ORC 和 Parquet 的综合性能突出,使用较为广泛,推荐使用这两种格式。

# 2.2 列式存储和行式存储

  1. 行存储的特点

查询满足条件的一整行数据的时候,列存储则需要去每个聚集的字段找到对应的每个列的值,行存储只需要找到其中一个值,其余的值都在相邻地方,所以此时行存储查询的速度 更快。

  1. 列存储的特点

因为每个字段的数据聚集存储,在查询只需要少数几个字段的时候,能大大减少读取的数据量;每个字段的数据类型一定是相同的,列式存储可以针对性的设计更好的设计压缩算法。

# 2.3 Orc 格式

Orc (Optimized Row Columnar)是 Hive 0.11 版里引入的新的存储格式。 如下图所示可以看到每个 Orc 文件由 1 个或多个 stripe 组成,每个 stripe 一般为 HDFS 的块大小,每一个 stripe 包含多条记录,这些记录按照列进行独立存储,对应到 Parquet 中的 row group 的概念。每个 Stripe 里有三部分组成,分别是 Index Data,Row Data,Stripe Footer:

  1. Index Data:一个轻量级的 index,默认是每隔 1W 行做一个索引。这里做的索引应该 只是记录某行的各字段在 Row Data 中的 offset。
  2. Row Data:存的是具体的数据,先取部分行,然后对这些行按列进行存储。对每个 列进行了编码,分成多个 Stream 来存储。
  3. Stripe Footer:存的是各个 Stream 的类型,长度等信息。

每个文件有一个 File Footer,这里面存的是每个 Stripe 的行数,每个 Column 的数据类 型信息等;每个文件的尾部是一个 PostScript,这里面记录了整个文件的压缩类型以及 FileFooter 的长度信息等。在读取文件时,会 seek 到文件尾部读 PostScript,从里面解析到 File Footer长度,再读FileFooter,从里面解析到各个Stripe信息,再读各个Stripe,即从后往前读。

更多查看ORC官方文档 (opens new window)

# 2.4 Parquet 格式

Parquet 文件是以二进制方式存储的,所以是不可以直接读取的,文件中包括该文件的 数据和元数据,因此 Parquet 格式文件是自解析的。

  1. 行组(Row Group):每一个行组包含一定的行数,在一个 HDFS 文件中至少存储一 个行组,类似于 orc 的 stripe 的概念。
  2. 列块(Column Chunk):在一个行组中每一列保存在一个列块中,行组中的所有列连 续的存储在这个行组文件中。一个列块中的值都是相同类型的,不同的列块可能使用不同的 算法进行压缩。
  3. 页(Page):每一个列块划分为多个页,一个页是最小的编码的单位,在同一个列块 的不同页可能使用不同的编码方式。

通常情况下,在存储 Parquet 数据的时候会按照 Block 大小设置行组的大小,由于一般 情况下每一个 Mapper 任务处理数据的最小单位是一个 Block,这样可以把每一个行组由一 个 Mapper 任务处理,增大任务执行并行度。Parquet 文件的格式。

上图展示了一个 Parquet 文件的内容,一个文件中可以存储多个行组,文件的首位都是该文件的 Magic Code,用于校验它是否是一个 Parquet 文件,Footer length 记录了文件元数据的大小,通过该值和文件长度可以计算出元数据的偏移量,文件的元数据中包括每一个行组的元数据信息和该文件存储数据的 Schema 信息。除了文件中每一个行组的元数据,每一 页的开始都会存储该页的元数据,在 Parquet 中,有三种类型的页:数据页、字典页和索引 页。数据页用于存储当前行组中该列的值,字典页存储该列值的编码字典,每一个列块中最 多包含一个字典页,索引页用来存储当前行组下该列的索引,目前 Parquet 中还不支持索引页。

# 2.5 指定存储格式

通常在创建表的时候使用 STORED AS 参数指定:

CREATE TABLE page_view(viewTime INT, userid BIGINT)
 ROW FORMAT DELIMITED
   FIELDS TERMINATED BY '\001'
   COLLECTION ITEMS TERMINATED BY '\002'
   MAP KEYS TERMINATED BY '\003'
 STORED AS SEQUENCEFILE;
1
2
3
4
5
6

各个存储文件类型指定方式如下:

  • STORED AS TEXTFILE
  • STORED AS SEQUENCEFILE
  • STORED AS ORC
  • STORED AS PARQUET
  • STORED AS AVRO
  • STORED AS RCFILE

# 2.6 Sequence 对比 ORC

群集磁盘资源紧张,需要对表数据进行压缩,将从SequenceORC中选其一作为以后仓库表文件的存储格式。本文就SnappyZLIB(Gzip)压缩级别进行表大小以及查询效率的对比

  1. 压缩参数

Sequence

-- Snappy
set hive.exec.compress.output=true;
set mapred.output.compress=true;
set mapred.output.compression.codec=org.apache.hadoop.io.compress.SnappyCodec;
set mapred.output.compression.type=BLOCK;

-- Gzip
set hive.exec.compress.output=true;
set mapred.output.compress=true;
set mapred.output.compression.codec=org.apache.hadoop.io.compress.GzipCodec;
set mapred.output.compression.type=BLOCK;
1
2
3
4
5
6
7
8
9
10
11

ORC

-- Snappy
set hive.exec.orc.default.compress=SNAPPY;

-- ZLIB
set hive.exec.orc.default.compress=ZLIB;
1
2
3
4
5
  1. 大小对比

订单主表

格式 文件大小 占用大小 压缩比(倍)
text 19.8G 59.5G
seq_snappy 6.1G 18.4G 3.24
orc_snappy 4.2G 12.5G 4.71
seq_gzip 3.4G 10.3G 5.82
orc_zlib 3.0G 9.0G 6.6

订单明细表

格式 文件大小 占用大小 压缩比(倍)
text 18.8G 56.5G
seq_snappy 4.2G 12.5G 4.38
orc_snappy 2.6G 7.8G 7.23
seq_gzip 2.3G 6.9G 8.17
orc_zlib 1.7G 5.0G 11.05
  1. SQL查询效率

简单Count

格式 第一次 第二次 第三次
seq_snappy 23.16 21.813 23.031
orc_snappy 22.154 19.911 20.582
seq_gzip 33.362 26.446 27.537
orc_zlib 21.197 19.23 18.499

过滤并分组

格式 第一次 第二次 第三次
seq_snappy 27.436 26.453 27.796
orc_snappy 21.568 25.971 29.075
seq_gzip 35.16 78.649 38.198
orc_zlib 20.273 21.491 19.539

联查询Count

主表与明细表通过订单ID关联

格式 第一次 第二次 第三次
seq_snappy 64.336 68.128 63.755
orc_snappy 71.017 100.511 65.912
seq_gzip 98.201 79.246 79.736
orc_zlib 99.996 69.23 74.677

实际查询

主表与明细表通过订单ID关联,分组,单表select * , max ,sum等聚合

格式 第一次 第二次 第三次 第四次 第五次 第六次
seq_snappy 580.938 1065.992 787.999 1365.383 622.456 605.303
orc_snappy 696.887 1333.952 1034.791 806.678 789.647 794.424
seq_gzip 806.734 1448.114 1237.069 901.431 901.75 854
orc_zlib 1983 1983 1294.967 906.825 926.922 975.837

第六次细节

类型 Job Stage Map Reduce Elapsed Vcore Map(Seconds) Vcore Reduce(Seconds) Vcore Total(Sedonds) -(orc-seq)/seq
seq_snap job_1583725051064_59160 Stage-1 57 11 5mins,44sec 1694222 2924159 4618381
orc_snap job_1583725051064_59190 Stage-1 49 7 8mins,45sec 1494752 2895072 4389824 4.95%
seq_snap job_1583725051064_59187 Stage-2 38 10 4mins,8sec 1193042 1820443 3013485
orc_snap job_1583725051064_59198 Stage-2 39 10 4mins,12sec 1197185 1941388 3138573 -4.15%
seq_gzip job_1583725051064_59028 Stage-1 49 6 9mins,39sec 1667028 2851133 4518161
orc_zlib job_1583725051064_59046 Stage-1 48 5 11mins,24sec 1477584 2937803 4415387 2.27%
seq_gzip job_1583725051064_59043 Stage-2 42 10 4mins,19sec 1213325 2140155 3353480
orc_zlib job_1583725051064_59065 Stage-2 40 10 4mins,37sec 1206666 2188277 3394943 -1.24%
类型 Elapsed Vcore Total(Sedonds) -(orc-seq)/seq
seq_snap 9mins,52sec 7631866
orc_snap 12mins,57sec 7528397 1.36%
seq_gzip 13mins,58sec 7871641
orc_zlib 16mins 7810330 0.78%
  1. 结论
  • 以数据压缩后的大小来看,ORC完胜。
  • SQL查询方面,因为orc的文件更小,所以map和reduce数相对较小,导致运算时间增长(20%左右),但总体CPU资源耗用还是要比Seq格式少1%左右。

# 三、优化连接

可以通过配置Map连接和倾斜连接的相关属性提升连接查询的性能。一般处理大量小表关联导致运行缓慢

# 3.1 自动Map连接(跨服务器广播小表的Mapjoin)

当连接一个大表和一个小表时,自动Map连接是一个非常有用的特性。如果启用了该特性,小表将保存在每个节点的本地缓存中,并在Map阶段与大表进行连接。开启自动Map连接提供了两个好处。首先,将小表装进缓存将节省每个数据节点上的读取时间。其次,它避免了Hive查询中的倾斜连接,因为每个数据块的连接操作已经在Map阶段完成了。设置下面的属性启用自动Map连接属性。

  1. 设置自动选择 Mapjoin
set hive.auto.convert.join = true; 默认为 true
1
  1. 大表小表的阈值设置(默认 25M 以下认为是小表):
set hive.mapjoin.smalltable.filesize = 25000000;
1

# 桶Map连接

如果连接中使用的表是按特定列分桶的,可以开启桶Map连接提升性能。

<property>
    <name>hive.optimize.bucketmapjoin</name>
    <value>true</value>
</property>
<property>
    <name>hive.optimize.bucketmapjoin.sortedmerge</name>
    <value>true</value>
</property>
1
2
3
4
5
6
7
8

说明:

  • hive.optimize.bucketmapjoin:是否尝试桶Map连接。
  • hive.optimize.bucketmapjoin.sortedmerge:是否尝试在Map连接中使用归并排序。

# 四、Group by 优化

默认情况下,Map 阶段同一 Key 数据分发给一个 reduce,当一个 key 数据过大时就倾斜了。并不是所有的聚合操作都需要在 Reduce 端完成,很多聚合操作都可以先在 Map 端进行 部分聚合,最后在 Reduce 端得出最终结果。

  1. 是否在 Map 端进行聚合,默认为 True
set hive.map.aggr = true
1
  1. 在 Map 端进行聚合操作的条目数目
set hive.groupby.mapaggr.checkinterval = 100000
1
  1. 有数据倾斜的时候进行负载均衡(默认是 false)
set hive.groupby.skewindata = true
1

当选项设定为 true,生成的查询计划会有两个 MR Job。第一个 MR Job 中,Map 的输出 结果会随机分布到 Reduce 中,每个 Reduce 做部分聚合操作,并输出结果,这样处理的结果 是相同的 Group By Key 有可能被分发到不同的 Reduce 中,从而达到负载均衡的目的;第二 个 MR Job 再根据预处理的数据结果按照 Group By Key 分布到 Reduce 中(这个过程可以保证 相同的 Group By Key 被分布到同一个 Reduce 中),最后完成最终的聚合操作。

# 五、 Count(Distinct) 去重统计

数据量小的时候无所谓,数据量大的情况下,由于 COUNT DISTINCT 操作需要用一个 Reduce Task 来完成,这一个 Reduce 需要处理的数据量太大,就会导致整个 Job 很难完成, 一般 COUNT DISTINCT 使用先 GROUP BY 再 COUNT 的方式替换,但是需要注意 group by 造成 的数据倾斜问题.

# 六、合理设置 Map 及 Reduce 数

  1. 通常情况下,作业会通过 input 的目录产生一个或者多个 map 任务。 主要的决定因素有:input 的文件总个数,input 的文件大小,集群设置的文件块大小。
  2. 是不是 map 数越多越好? 答案是否定的。如果一个任务有很多小文件(远远小于块大小 128m),则每个小文件也会被当做一个块,用一个 map 任务来完成,而一个 map 任务启动和初始化的时间远远大 于逻辑处理的时间,就会造成很大的资源浪费。而且,同时可执行的 map 数是受限的。
  3. 是不是保证每个 map 处理接近 128m 的文件块,就高枕无忧了? 答案也是不一定。比如有一个 127m 的文件,正常会用一个 map 去完成,但这个文件只 有一个或者两个小字段,却有几千万的记录,如果 map 处理的逻辑比较复杂,用一个 map 任务去做,肯定也比较耗时。

针对上面的问题 2 和 3,我们需要采取两种方式来解决:即减少 map 数和增加 map 数;

# 6.1 复杂文件增加 Map 数

当 input 的文件都很大,任务逻辑复杂,map 执行非常慢的时候,可以考虑增加 Map 数, 来使得每个 map 处理的数据量减少,从而提高任务的执行效率。 增加 map 的方法为:根据computeSliteSize(Math.max(minSize,Math.min(maxSize,blocksize)))=blocksize=128M 公式, 调整 maxSize 最大值。让 maxSize 最大值低于 blocksize 就可以增加 map 的个数。

-- 实例
hive (default)> select count(*) from emp;
Hadoop job information for Stage-1: number of mappers: 1; number of
reducers: 1
-- 设置最大切片值为 100 个字节
hive (default)> set mapreduce.input.fileinputformat.split.maxsize=100;
hive (default)> select count(*) from emp;
Hadoop job information for Stage-1: number of mappers: 6; number of
reducers: 1
1
2
3
4
5
6
7
8
9

# 6.2 小文件进行合并

  1. 在 map 执行前合并小文件,减少 map 数:CombineHiveInputFormat 具有对小文件进行合并的功能(系统默认的格式)。HiveInputFormat 没有对小文件合并功能。
set hive.input.format=org.apache.hadoop.hive.ql.io.CombineHiveInputFormat
1
  1. 在 Map-Reduce 的任务结束时合并小文件的设置:
-- 在 map-only 任务结束时合并小文件,默认 true
SET hive.merge.mapfiles = true;
-- 在 map-reduce 任务结束时合并小文件,默认 false
SET hive.merge.mapredfiles = true;
-- 合并文件的大小,默认 256M
SET hive.merge.size.per.task = 268435456;
-- 当输出文件的平均大小小于该值时,启动一个独立的 map-reduce 任务进行文件 merge
SET hive.merge.smallfiles.avgsize = 16777216;
1
2
3
4
5
6
7
8

# 6.3 合理设置 Reduce 数

  1. 调整 reduce 个数方法一
-- 每个 Reduce 处理的数据量默认是 256MB
hive.exec.reducers.bytes.per.reducer=256000000
-- 每个任务最大的 reduce 数,默认为 1009
hive.exec.reducers.max=1009
-- 计算 reducer 数的公式
N=min(参数 2,总输入数据量/参数 1)
1
2
3
4
5
6
  1. 调整 reduce 个数方法二
-- 在 hadoop 的 mapred-default.xml 文件中修改设置每个 job 的 Reduce 个数
set mapreduce.job.reduces = 15;
1
2
  1. reduce 个数并不是越多越好
    (1)过多的启动和初始化 reduce 也会消耗时间和资源; (2)另外,有多少个 reduce,就会有多少个输出文件,如果生成了很多个小文件,那 么如果这些小文件作为下一个任务的输入,则也会出现小文件过多的问题; 在设置 reduce 个数的时候也需要考虑这两个原则:处理大数据量利用合适的 reduce 数; 使单个 reduce 任务处理数据量大小要合适;

# 七、启用并行执行

Hive 会将一个查询转化成一个或者多个阶段。这样的阶段可以是 MapReduce 阶段、抽 样阶段、合并阶段、limit 阶段。或者 Hive 执行过程中可能需要的其他阶段。默认情况下, Hive 一次只会执行一个阶段。不过,某个特定的 job 可能包含众多的阶段,而这些阶段可能 并非完全互相依赖的,也就是说有些阶段是可以并行执行的,这样可能使得整个 job 的执行 时间缩短。不过,如果有更多的阶段可以并行执行,那么 job 可能就越快完成。通过设置参数 hive.exec.parallel 值为 true,就可以开启并发执行。不过,在共享集群中, 需要注意下,如果 job 中并行阶段增多,那么集群利用率就会增加。

set hive.exec.parallel=true; //打开任务并行执行
set hive.exec.parallel.thread.number=16; //同一个 sql 允许最大并行度,默认为8
1
2

当然,得是在系统资源比较空闲的时候才有优势,否则,没资源,并行也起不来。

# 八、启用MapReduce严格模式

Hive提供了一个严格模式,可以防止用户执行那些可能产生负面影响的查询。通过设置下面的属性启用MapReduce严格模式。

<property>
    <name>hive.mapred.mode</name>
    <value>strict</value>
</property>
1
2
3
4

严格模式禁止3种类型的查询。

  • 对于分区表,where子句中不包含分区字段过滤条件的查询语句不允许执行。
  • 对于使用了order by子句的查询,要求必须使用limit子句,否则不允许执行。
  • 限制笛卡尔积查询。

# 九、启用向量化

向量化特性在Hive 0.13.1版本中被首次引入。通过查询执行向量化,使Hive从单行处理数据改为批量处理方式,具体来说是一次处理1024行而不是原来的每次只处理一行,这大大提升了指令流水线和缓存的利用率,从而提高了表扫描、聚合、过滤和连接等操作的性能。可以设置下面的属性启用查询执行向量化。

<property>
    <name>hive.vectorized.execution.enabled</name>
    <value>true</value>
</property>
<property>
    <name>hive.vectorized.execution.reduce.enabled</name>
    <value>true</value>
</property>
<property>
    <name>hive.vectorized.execution.reduce.groupby.enabled</name>
    <value>true</value>
</property>
1
2
3
4
5
6
7
8
9
10
11
12

说明:

  • hive.vectorized.execution.enabled:如果该标志设置为true,则开启查询执行的向量模式,默认值为false。
  • hive.vectorized.execution.reduce.enabled:如果该标志设置为true,则开启查询执行reduce端的向量模式,默认值为true。
  • hive.vectorized.execution.reduce.groupby.enabled:如果该标志设置为true,则开启查询执行reduce端group by操作的向量

# 十、内存溢出相关设置

set hive.exec.parallel=true;
set mapred.max.split.size=128000000;
set mapreduce.map.memory.mb=6144;
set mapreduce.map.java.opts=-Xmx6144m;
set hive.exec.reducers.bytes.per.reducer=536870912;
set mapreduce.reduce.memory.mb=8192;
set mapreduce.reduce.java.opts=-Xmx8192m;
1
2
3
4
5
6
7

# 十一、避免使用order by全局排序

Hive中使用order by子句实现全局排序。order by只用一个Reducer产生结果,对于大数据集,这种做法效率很低。如果不需要全局有序,则可以使用sort by子句,该子句为每个reducer生成一个排好序的文件。如果需要控制一个特定数据行流向哪个reducer,可以使用distribute by子句,例如:

select id, name, salary, dept from employee
distribute by dept sort by id asc, name desc;
1
2

属于一个dept的数据会分配到同一个reducer进行处理,同一个dept的所有记录按照id、name列排序。最终的结果集是全局有序的。

我们也可以使用CLUSTER BY它具备DISTRIBUTE BY 和 SORT BY的功能,但是只能正序排序。无法指定倒序。算是上面查询的精简版。

SELECT s.emp_id,s.emp_name,s.emp_mobile,s.city
FROM employee3 s
CLUSTER BY s.city;
1
2
3

# 十二、优化limit操作

默认时limit操作仍然会执行整个查询,然后返回限定的行数。在有些情况下这种处理方式很浪费,因此可以通过设置下面的属性避免此行为。

<property>
    <name>hive.limit.optimize.enable</name>
    <value>true</value>
</property>
<property>
    <name>hive.limit.row.max.size</name>
    <value>100000</value>
</property>
<property>
    <name>hive.limit.optimize.limit.file</name>
    <value>10</value>
</property>
<property>
    <name>hive.limit.optimize.fetch.max</name>
    <value>50000</value>
</property>
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16

说明:

  • hive.limit.optimize.enable:是否启用limit优化。当使用limit语句时,对源数据进行抽样。
  • hive.limit.row.max.size:在使用limit做数据的子集查询时保证的最小行数据量。
  • hive.limit.optimize.limit.file:在使用limit做数据子集查询时,采样的最大文件数。
  • hive.limit.optimize.fetch.max:使用简单limit数据抽样时,允许的最大行数。

# 十三、分区表

分区表实际上就是对应一个 HDFS 文件系统上的独立的文件夹,该文件夹下是该分区所 有的数据文件。 Hive 中的分区就是分目录,把一个大的数据集根据业务需要分割成小的数据集。 在查询时通过 WHERE 子句中的表达式选择查询所需要的指定的分区,这样的查询效率 会提高很多,所以我们需要把常常用在 WHERE 语句中的字段指定为表的分区字段。

# 13.1 创建分区表

在 Hive 中可以使用 PARTITIONED BY 子句创建分区表。表可以包含一个或多个分区列,程序会为分区列中的每个不同值组合创建单独的数据目录。下面的我们创建一张雇员表作为测试:

 CREATE EXTERNAL TABLE emp_partition(
    empno INT,
    ename STRING,
    job STRING,
    mgr INT,
    hiredate TIMESTAMP,
    sal DECIMAL(7,2),
    comm DECIMAL(7,2)
    )
    PARTITIONED BY (deptno INT)   -- 按照部门编号进行分区
    ROW FORMAT DELIMITED FIELDS TERMINATED BY "\t"
    LOCATION '/hive/emp_partition';
1
2
3
4
5
6
7
8
9
10
11
12

# 13.2 指定分区导入

LOAD DATA LOCAL INPATH 'input/hive/partitions/file1'
INTO TABLE logs
PARTITION (dt='2001-01-01', country='GB');
1
2
3

# 13.3 显示分区信息

show partitions logs;
1

# 13.4 修复分区表

msck repair table order_created_partition;
1

# 13.5 添加分区

ALTER TABLE page_views ADD IF NOT EXISTS
PARTITION (dt=2013-09-09, applicationtype=‘iPhone’) LOCATION ‘/somewhere/on/hdfs/data/iphone/currentPARTITION (dt=2013-09-08, applicationtype=‘iPhone’) LOCATION ‘/somewhere/on/hdfs/data/prev1/iphone’;
-- 增加多个分区
alter table dept_partition add partition(day='20200405') partition(day='20200406');
1
2
3
4
5

# 13.6 删除分区

ALTER TABLE log_messages DROP IF EXISTS PARTITION(year=2015,month=1,day=2);
-- 同时删除多个分区
alter table dept_partition drop partition (day='20200404'), partition(day='20200405');
1
2
3

# 13.7 查看分区表有多少分区

show partitions dept_partition;
1

# 13.8 分区限制查询

ALTER TABLE log_messages
PARTITION(year=2015,month=1,day=1)
ENABLE OFFLINE;
1
2
3

# 13.9 防止删除分区

ALTER TABLE log_messages
PARTITION(year=2015,month=1,day-1)
ENABLE NO_DROP;
1
2
3

# 13.10 修改某分区的文件格式

ALTER TABLE XXX PARTITION (EVENT_MONTH='2014-06') SET FILEFORMAT TEXTFILE;
1

# 13.11 动态分区

关系型数据库中,对分区表 Insert 数据时候,数据库自动会根据分区字段的值,将数据 插入到相应的分区中,Hive 中也提供了类似的机制,即动态分区(Dynamic Partition),只不过, 使用 Hive 的动态分区,需要进行相应的配置。

  1. 开启动态分区参数设置
-- 开启动态分区功能(默认 true,开启)
set hive.exec.dynamic.partition=true;
-- 设置为非严格模式(动态分区的模式,默认 strict,表示必须指定至少一个分区为 静态分区,nonstrict 模式表示允许所有的分区字段都可以使用动态分区。)
set hive.exec.dynamic.partition.mode=nonstrict;
-- 在所有执行 MR 的节点上,最大一共可以创建多少个动态分区。默认 1000
set hive.exec.max.dynamic.partitions=1000;
-- 在每个执行 MR 的节点上,最大可以创建多少个动态分区。
-- 该参数需要根据实际的数据来设定。比如:源数据中包含了一年的数据,即 day 字段有 365 个值,那么该参数就需要设置成大于 365,如果使用默认值 100,则会报错。
set hive.exec.max.dynamic.partitions.pernode=100
-- 整个 MR Job 中,最大可以创建多少个 HDFS 文件。默认 100000
set hive.exec.max.created.files=100000
-- 当有空分区生成时,是否抛出异常。一般不需要设置。默认 false
set hive.error.on.empty.partition=false
-- 开启该参数的话,分区列会全局排序,使得reduce端每个分区只有一个文件写入,降低reduce的内存压力。
set hive.optimize.sort.dynamic.partition=true;
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
  1. 案例实操
-- 创建目标分区表
hive (default)> create table dept_partition_dy(id int, name string)
partitioned by (loc int) row format delimited fields terminated by '\t';
-- 设置动态分区
hive (default)> set hive.exec.dynamic.partition.mode = nonstrict;
hive (default)> insert into table dept_partition_dy partition(loc) select
deptno, dname, loc from dept;
-- 查看目标分区表的分区情况
hive (default)> show partitions dept_partition;
1
2
3
4
5
6
7
8
9

# 十四、分桶表

分区提供一个隔离数据和优化查询的便利方式。不过,并非所有的数据集都可形成合理 的分区。对于一张表或者分区,Hive 可以进一步组织成桶,也就是更为细粒度的数据范围 划分。 分桶是将数据集分解成更容易管理的若干部分的另一个技术。分区针对的是数据的存储 路径,分桶针对的是数据文件。

# 14.1 创建分桶表

在 Hive 中,我们可以通过 CLUSTERED BY 指定分桶列,并通过 SORTED BY 指定桶中数据的排序参考列。下面为分桶表建表语句示例:

  CREATE EXTERNAL TABLE emp_bucket(
    empno INT,
    ename STRING,
    job STRING,
    mgr INT,
    hiredate TIMESTAMP,
    sal DECIMAL(7,2),
    comm DECIMAL(7,2),
    deptno INT)
    CLUSTERED BY(empno) SORTED BY(empno ASC) INTO 4 BUCKETS  --按照员工编号散列到四个 bucket 中
    ROW FORMAT DELIMITED FIELDS TERMINATED BY "\t"
    LOCATION '/hive/emp_bucket';
1
2
3
4
5
6
7
8
9
10
11
12

# 14.2 加载数据到分桶表

这里直接使用 Load 语句向分桶表加载数据,数据时可以加载成功的,但是数据并不会分桶。

load data inpath '/read_state.csv' into table input_table;
1

这是由于分桶的实质是对指定字段做了 hash 散列然后存放到对应文件中,这意味着向分桶表中插入数据是必然要通过 MapReduce,且 Reducer 的数量必须等于分桶的数量。由于以上原因,分桶表的数据通常只能使用 CTAS(CREATE TABLE AS SELECT) 方式插入,因为 CTAS 操作会触发 MapReduce。加载数据步骤如下:

# 1. 设置强制分桶

set hive.enforce.bucketing = true; --Hive 2.x 不需要这一步
1

在 Hive 0.x and 1.x 版本,必须使用设置 hive.enforce.bucketing = true,表示强制分桶,允许程序根据表结构自动选择正确数量的 Reducer 和 cluster by column 来进行分桶。 属性hive.enforce.bucketing = true与分区中的hive.exec.dynamic.partition = true属性类似,当我们启用该属性时,动态桶倒入会开启。

# 2. CTAS导入数据

INSERT INTO TABLE emp_bucket SELECT *  FROM emp;  --这里的 emp 表就是一张普通的雇员表
1

可以从执行日志看到 CTAS 触发 MapReduce 操作,且 Reducer 数量和建表时候指定 bucket 数量一致:

# 14.3 查询样本数据

1.返回1/4的桶的数据

SELECT * FROM bucketed_users TABLESAMPLE(BUCKET 1 OUT OF 4 ON id);

2.返回1/2的桶的数据

SELECT * FROM bucketed_users TABLESAMPLE(BUCKET 1 OUT OF 2 ON id);

3.使用rand() 函数对于没有划分成桶的表进行取样,即使只需要读取很小一部分样本,也要扫描整个输入数据集:

SELECT * FROM users TABLESAMPLE(BUCKET 1 OUT OF 4 ON rand());

# 14.4 指定桶大小

set mapred.reduce.tasks=64;
INSERT OVERWRITE TABLE t1
SELECT a,b,c FROM t2 CLUSTER BY b;
1
2
3

# 十六、CBO 优化

join 的时候表的顺序的关系:前面的表都会被加载到内存中。后面的表进行磁盘扫描

select a.*, b.*, c.* from a join b on a.id = b.id join c on a.id = c.id;

Hive 自 0.14.0 开始,加入了一项 "Cost based Optimizer" 来对 HQL 执行计划进行优化, 这个功能通过 "hive.cbo.enable" 来开启。在 Hive 1.1.0 之后,这个 feature 是默认开启的, 它可以 自动优化 HQL 中多个 Join 的顺序,并选择合适的 Join 算法。 CBO,成本优化器,代价最小的执行计划就是最好的执行计划。传统的数据库,成本优 化器做出最优化的执行计划是依据统计信息来计算的。 Hive 的成本优化器也一样,Hive 在提供最终执行前,优化每个查询的执行逻辑和物理 执行计划。这些优化工作是交给底层来完成的。根据查询成本执行进一步的优化,从而产生 潜在的不同决策:如何排序连接,执行哪种类型的连接,并度等等。 要使用基于成本的优化(也称为 CBO),请在查询开始设置以下参数:

set hive.cbo.enable=true;
set hive.compute.query.using.stats=true;
set hive.stats.fetch.column.stats=true;
set hive.stats.fetch.partition.stats=true;
1
2
3
4

# 十七 谓词下推

将 SQL 语句中的 where 谓词逻辑都尽可能提前执行,减少下游处理的数据量。对应逻 辑优化器是 PredicatePushDown,配置项为 hive.optimize.ppd,默认为 true。

-- 打开谓词下推优化属性
hive (default)> set hive.optimize.ppd = true; #谓词下推,默认是 true
-- 查看先关联两张表,再用 where 条件过滤的执行计划
hive (default)> explain select o.id from bigtable b join bigtable o on o.id = b.id where o.id <= 10;
--  查看子查询后,再关联表的执行计划
hive (default)> explain select b.id from bigtable b join (select id from bigtable where id <= 10) o on b.id = o.id;
1
2
3
4
5
6

# 十八 大表、大表 SMB(Sort Merge Bucket Join) Join

  1. 创建大表
create table bigtable2(
  id bigint,
   t bigint,
   uid string,
   keyword string,
   url_rank int,
   click_num int,
   click_url string)
row format delimited fields terminated by '\t';
load data local inpath '/opt/module/data/bigtable' into table bigtable2;
1
2
3
4
5
6
7
8
9
10
  1. 测试大表直接 JOIN
insert overwrite table jointable
select b.id, b.t, b.uid, b.keyword, b.url_rank, b.click_num, b.click_url
from bigtable a
join bigtable2 b
on a.id = b.id;
1
2
3
4
5

测试结果 Time taken: 72.289 seconds

  1. 创建分通表 1
create table bigtable_buck1(
   id bigint,
   t bigint,
   uid string,
   keyword string,
   url_rank int,
   click_num int,
   click_url string)
clustered by(id)
sorted by(id)
into 6 buckets
row format delimited fields terminated by '\t';
load data local inpath '/opt/module/data/bigtable' into table
bigtable_buck1;
1
2
3
4
5
6
7
8
9
10
11
12
13
14
  1. 创建分通表 2 分桶数和第一张表的分桶数为倍数关系
 create table bigtable_buck2(
   id bigint,
   t bigint,
   uid string,
   keyword string,
   url_rank int,
   click_num int,
   click_url string)
clustered by(id)
sorted by(id)
into 6 buckets
row format delimited fields terminated by '\t';
load data local inpath '/opt/module/data/
1
2
3
4
5
6
7
8
9
10
11
12
13
  1. 设置参数
set hive.optimize.bucketmapjoin = true;
set hive.optimize.bucketmapjoin.sortedmerge = true;
set hive.input.format=org.apache.hadoop.hive.ql.io.BucketizedHiveInputFormat;
1
2
3
  1. 测试 Time taken: 34.685 seconds
insert overwrite table jointable
select b.id, b.t, b.uid, b.keyword, b.url_rank, b.click_num, b.click_url
from bigtable_buck1 s
join bigtable_buck2 b
on b.id = s.id;
1
2
3
4
5

# 十五、常见错误及解决方案

# 15.1 JVM 堆内存溢出

描述:java.lang.OutOfMemoryError: Java heap space

解决:在 yarn-site.xml 中调整如下代码

<property>
    <name>yarn.scheduler.maximum-allocation-mb</name>
    <value>2048</value>
</property>
<property>
    <name>yarn.scheduler.minimum-allocation-mb</name>
    <value>2048</value>
</property>
<property>
    <name>yarn.nodemanager.vmem-pmem-ratio</name>
    <value>2.1</value>
</property>
<property>
    <name>mapred.child.java.opts</name>
    <value>-Xmx1024m</value>
</property>
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16

# 15.2 orc格式存储的job无故报错

set hive.vectorized.execution.enabled=false;
set hive.auto.convert.join=false;
1
2
更新时间: 1/20/2022, 6:52:58 PM