基本架构
用户接口: shell/CLI, jdbc/odbc, webui Command Line Interface
跨语言服务 : thrift server 提供了一种能力,让用户可以使用多种不同的语言来操纵hive
Driver Driver 组件完成 HQL 查询语句从词法分析,语法分析,编译,优化,以及生成逻辑执行 计划的生成。生成的逻辑执行计划存储在 HDFS 中,并随后由 MapReduce 调用执行 Hive 的核心是驱动引擎, 驱动引擎由四部分组成:
解释器:解释器的作用是将 HiveSQL 语句转换为抽象语法树(AST)
编译器:编译器是将语法树编译为逻辑执行计划
优化器:优化器是对逻辑执行计划进行优化
执行器:执行器是调用底层的运行框架执行逻辑执行计划
元数据存储系统 : RDBMS MySQL 元数据,通俗的讲,就是存储在 Hive 中的数据的描述信息。 Hive 中的元数据通常包括:表的名字,表的列和分区及其属性,表的属性(内部表和 外部表),表的数据所在目录 Metastore 默认存在自带的 Derby 数据库中。缺点就是不适合多用户操作,并且数据存 储目录不固定。数据库跟着 Hive 走,极度不方便管理 解决方案:通常存我们自己创建的 MySQL 库(本地 或 远程) Hive 和 MySQL 之间通过 MetaStore 服务交互
执行流程:
HiveQL 通过命令行或者客户端提交,经过 Compiler 编译器,运用 MetaStore 中的元数 据进行类型检测和语法分析,生成一个逻辑方案(Logical Plan),然后通过的优化处理,产生 一个 MapReduce 任务。
数据组织
Hive 的存储结构包括数据库、表、视图、分区和表数据等。数据库,表,分区等等都对 应 HDFS 上的一个目录。表数据对应 HDFS 对应目录下的文件。
Hive 中所有的数据都存储在 HDFS 中,没有专门的数据存储格式,因为 Hive 是读模式 (Schema On Read),可支持 TextFile,SequenceFile,RCFile 或者自定义格式等
只需要在创建表的时候告诉 Hive 数据中的列分隔符和行分隔符,Hive 就可以解析数据 Hive 的默认列分隔符:控制符 Ctrl + A,\x01 Hive 的 Hive 的默认行分隔符:换行符 \n
Hive 中包含以下数据模型: database:在 HDFS 中表现为${hive.metastore.warehouse.dir}目录下一个文件夹 table:在 HDFS 中表现所属 database 目录下一个文件夹 external table:与 table 类似,不过其数据存放位置可以指定任意 HDFS 目录路径 partition:在 HDFS 中表现为 table 目录下的子目录 bucket:在 HDFS 中表现为同一个表目录或者分区目录下根据某个字段的值进行 hash 散 列之后的多个文件 view:与传统数据库类似,只读,基于基本表创建
Hive 的元数据存储在 RDBMS 中,除元数据外的其它所有数据都基于 HDFS 存储。默认情 况下,Hive 元数据保存在内嵌的 Derby 数据库中,只能允许一个会话连接,只适合简单的 测试。实际生产环境中不适用,为了支持多用户会话,则需要一个独立的元数据库,使用 MySQL 作为元数据库,Hive 内部对 MySQL 提供了很好的支持。
Hive 中的表分为内部表、外部表、分区表和 Bucket 表 外部表示hive 对存储在 HDFS 上的数据提供了一种新的抽象。而不是管理存储在 HDFS 上的数据。所以不管创建内部 表还是外部表,都可以对 hive 表的数据存储目录中的数据进行增删操作。
内部表和外部表的区别:
删除内部表,删除表元数据和数据
删除外部表,删除元数据,不删除数据
内部表和外部表的使用选择:
大多数情况,他们的区别不明显,如果数据的所有处理都在 Hive 中进行,那么倾向于 选择内部表,但是如果 Hive 和其他工具要针对相同的数据集进行处理,外部表更合适。
使用外部表访问存储在 HDFS 上的初始数据,然后通过 Hive 转换数据并存到内部表中
使用外部表的场景是针对一个数据集有多个不同的 Schema
分区表和分桶表的区别: Hive 数据表可以根据某些字段进行分区操作,细化数据管理,可以让部分查询更快。同 时表和分区也可以进一步被划分为 Buckets,分桶表的原理和 MapReduce 编程中的 HashPartitioner 的原理类似。 分区和分桶都是细化数据管理,但是分区表是手动添加区分,由于 Hive 是读模式,所 以对添加进分区的数据不做模式校验,分桶表中的数据是按照某些分桶字段进行 hash 散列 形成的多个文件,所以数据的准确性也高很多
元数据表 meta表
hive版本 VERSION表,记录hive版本
表名
说明
字段
DBS
该表存储Hive中所有数据库的基本信息
数据库ID,数据库描述,数据库HDFS路径,数据库名,数据库所有者用户名,所有者角色
DATABASE_PARAMS
该表存储数据库的相关参数,在CREATE DATABASE时候用WITH DBPROPERTIES (property_name=property_value, …)指定的参数
数据库ID,参数名,参数值
DBS和DATABASE_PARAMS这两张表通过DB_ID字段关联。
表名
说明
字段
TBLS
该表中存储Hive表、视图、索引表的基本信息
表ID,创建时间,数据库ID,上次访问时间,所有者,保留字段,序列化配置信息,表名,表类型,视图的详细HQL语句,视图的原始HQL语句
TABLE_PARAMS
该表存储表/视图的属性信息
表ID,属性名,属性值
TBL_PRIVS
该表存储表/视图的授权信息
授权ID,授权时间,授权执行用户,授权者类型,被授权用户,被授权用户类型,权限,表ID
文件存储信息相关的元数据表 由于HDFS支持的文件格式很多,而建Hive表时候也可以指定各种文件格式,Hive在将HQL解析成MapReduce时候,需要知道去哪里,使用哪种格式去读写HDFS文件,而这些信息就保存在这几张表中。
表名
说明
字段
SDS
该表保存文件存储的基本信息,如INPUT_FORMAT、OUTPUT_FORMAT、是否压缩等。TBLS表中的SD_ID与该表关联,可以获取Hive表的存储信息
SD_PARAMS
该表存储Hive存储的属性信息,在创建表时候使用STORED BY ‘storage.handler.class.name’ [WITH SERDEPROPERTIES (…)指定
SERDES
该表存储序列化使用的类信息
SERDE_PARAMS
该表存储序列化的一些属性、格式信息,比如:行、列分隔符
表字段相关的元数据表
表名
说明
字段
COLUMNS_V2
该表存储表对应的字段信息
表分区相关的元数据表
表名
说明
字段
PARTITIONS
该表存储表分区的基本信息
PARTITION_KEYS
该表存储分区的字段信息
PARTITION_KEY_VALS
该表存储分区字段值
PARTITION_PARAMS
该表存储分区的属性信息
数据类型与存储格式 基本数据类型
类型
描述
示例
boolean
true/false
TRUE
tinyint
1字节的有符号整数
-128~127 1Y
smallint
2个字节的有符号整数,-32768~32767
1S
int
4个字节的带符号整数
1
bigint
8字节带符号整数
1L
float
4字节单精度浮点数
1.0
double
8字节双精度浮点数
1.0
deicimal
任意精度的带符号小数
1.0
String
字符串,变长
“a”,’b’
varchar
变长字符串
“a”,’b’
char
固定长度字符串
“a”,’b’
binary
字节数组
无法表示
timestamp
时间戳,纳秒精度
122327493795
date
日期
‘2018-04-07’
复杂数据类型
类型
描述
示例
array
有序的的同类型的集合
array(1,2)
map
key-value,key必须为原始类型,value可以任意类型
map(‘a’,1,’b’,2)
struct
字段集合,类型可以不同
struct(‘1’,1,1.0), named_stract(‘col1’,’1’,’col2’,1,’clo3’,1.0)
存储格式 Hive会为每个创建的数据库在HDFS上创建一个目录,该数据库的表会以子目录形式存储,表中的数据会以表目录下的文件形式存储。对于default数据库,默认的缺省数据库没有自己的目录,default数据库下的表默认存放在/user/hive/warehouse目录下。
textfile textfile为默认格式,存储方式为行存储。数据不做压缩,磁盘开销大,数据解析开销大。
SequenceFile SequenceFile是Hadoop API提供的一种二进制文件支持,其具有使用方便、可分割、可压缩的特点。 SequenceFile支持三种压缩选择:NONE, RECORD, BLOCK。 Record压缩率低,一般建议使用BLOCK压缩。
RCFile 一种行列存储相结合的存储方式。
ORCFile 数据按照行分块,每个块按照列存储,其中每个块都存储有一个索引。hive给出的新格式,属于RCFILE的升级版,性能有大幅度提升,而且数据可以压缩存储,压缩快 快速列存取。
Parquet Parquet也是一种行式存储,同时具有很好的压缩性能;同时可以减少大量的表扫描和反序列化的时间。
数据格式 当数据存储在文本文件中,必须按照一定格式区别行和列,并且在Hive中指明这些区分符。Hive默认使用了几个平时很少出现的字符,这些字符一般不会作为内容出现在记录中。
Hive默认的行和列分隔符如下表所示。
分隔符
描述
\n
对于文本文件来说,每行是一条记录,所以\n 来分割记录
^A (Ctrl+A)
分割字段,也可以用\001 来表示
^B (Ctrl+B)
用于分割 Arrary 或者 Struct 中的元素,或者用于 map 中键值之间的分割,也可以用\002 分割。
^C
用于 map 中键和值自己分割,也可以用\003 表示。
DDL hive ddl
数据库DDL
创建库
查看库
删除库
切换库
表DDL 创建表:
1 2 3 4 5 6 7 8 9 CREATE [EXTERNAL] TABLE [IF NOT EXISTS] table_name [(col_name data_type [COMMENT col_comment], ...)] [COMMENT table_comment] [PARTITIONED BY (col_name data_type [COMMENT col_comment], ...)] [CLUSTERED BY (col_name, col_name, ...) [SORTED BY (col_name [ASC|DESC], ...)] INTO num_buckets BUCKETS] [ROW FORMAT row_format] [STORED AS file_format] [LOCATION hdfs_path]
CREATE TABLE 创建一个指定名字的表。如果相同名字的表已经存在,则抛出异常;用户可以用 IF NOT EXIST 选项来忽略这个异常
EXTERNAL 关键字可以让用户创建一个外部表,在建表的同时指定一个指向实际数据的路径(LOCATION)
LIKE 允许用户复制现有的表结构,但是不复制数据
COMMENT可以为表与字段增加描述
PARTITIONED BY 指定分区
ROW FORMAT DELIMITED [FIELDS TERMINATED BY char] [COLLECTION ITEMS TERMINATED BY char] MAP KEYS TERMINATED BY char] [LINES TERMINATED BY char] | SERDE serde_name [WITH SERDEPROPERTIES (property_name=property_value, property_name=property_value, …)] 用户在建表的时候可以自定义 SerDe 或者使用自带的 SerDe。如果没有指定 ROW FORMAT 或者 ROW FORMAT DELIMITED,将会使用自带的 SerDe。在建表的时候, 用户还需要为表指定列,用户在指定表的列的同时也会指定自定义的 SerDe,Hive 通过 SerDe 确定表的具体的列的数据。
STORED AS SEQUENCEFILE //序列化文件 | TEXTFILE //普通的文本文件格式 | RCFILE //行列存储相结合的文件 | INPUTFORMAT input_format_classname OUTPUTFORMAT output_format_classname //自定义文件格式 如果文件数据是纯文本,可以使用 STORED AS TEXTFILE。如果数据需要压缩,使用 STORED AS SEQUENCE 。
LOCATION指定表在HDFS的存储路径
查看表
修改表
删除表
清空表
函数&脚本 内置函数 内置函数
窗口函数 hive窗口函数 _
窗口函数是用于分析用的一类函数,要理解窗口函数要先从聚合函数说起。 大家都知道聚合函数是将某列中多行的值合并为一行,比如sum、count等。 而窗口函数则可以在本行内做运算,得到多行的结果,即每一行对应一行的值。 通用的窗口函数可以用下面的语法来概括:
Function() Over (Partition By Column1,Column2,Order By Column3)
窗口函数又分为以下三类: 聚合型窗口函数 分析型窗口函数 * 取值型窗口函数
聚合型窗口函数 聚合型即SUM(), MIN(),MAX(),AVG(),COUNT()这些常见的聚合函数。 聚合函数配合窗口函数使用可以使计算更加灵活
分析型窗口函数 分析型即RANk(),ROW_NUMBER(),DENSE_RANK()等常见的排序用的窗口函数
row_number函数:生成连续的序号(相同元素序号相同)
rank函数:如两元素排序相同则序号相同,并且会跳过下一个序号
desrank函数:如两元素排序相同则序号相同,不会跳过下一个序号
取值型窗口函数 这几个函数可以通过字面意思记得,LAG是迟滞的意思,也就是对某一列进行往后错行;LEAD是LAG的反义词,也就是对某一列进行提前几行;FIRST_VALUE是对该列到目前为止的首个值,而LAST_VALUE是到目前行为止的最后一个值。
LAG()和LEAD() 可以带3个参数,第一个是返回的值,第二个是前置或者后置的行数,第三个是默认值。
窗口大小 关键是理解 ROWS BETWEEN 含义,也叫做window子句:
PRECEDING:往前
FOLLOWING:往后
CURRENT ROW:当前行
UNBOUNDED:无边界,UNBOUNDED PRECEDING 表示从最前面的起点开始, UNBOUNDED FOLLOWING:表示到最后面的终点
–其他AVG,MIN,MAX,和SUM用法一样
自定义函数 当 Hive 提供的内置函数无法满足业务处理需要时,此时就可以考虑使用用户自定义函数。
UDF(user-defined function)作用于单个数据行,产生一个数据行作为输出。(数学函数,字 符串函数)
UDAF(用户定义聚集函数 User- Defined Aggregation Funcation):接收多个输入数据行,并产 生一个输出数据行。(count,max)
UDTF(表格生成函数 User-Defined Table Functions):接收一行输入,输出多行(explode)
1 2 3 4 5 6 7 8 9 10 11 import org.apache.hadoop.hive.ql.exec.UDF; public class ToLowerCase extends UDF{ // 必须是 public,并且 evaluate 方法可以重载 public String evaluate(String field) { String result = field.toLowerCase(); return result; } }
Hive 的 TRANSFORM 关键字提供了在 SQL 中调用自写脚本的功能。适合实现 Hive 中没有的 功能又不想写 UDF 的情况
1 2 3 4 5 6 7 8 #!/bin/python import sys import datetime for line in sys.stdin: line = line.strip() movie,rate,unixtime,userid = line.split('\t') weekday = datetime.datetime.fromtimestamp(float(unixtime)).isoweekday() print '\t'.join([movie, rate, str(weekday),userid])
1 2 3 hive>add file /home/hadoop/weekday_mapper.py; hive> insert into table lastjsontable select transform(movie,rate,unixtime,userid) using 'python weekday_mapper.py' as(movie,rate,weekday,userid) from rate;
数据倾斜 由于数据分布不均匀,造成数据大量的集中到一点,造成数据热点
hive框架的特性:
不怕数据大,怕数据倾斜
Jobs 数比较多的作业运行效率相对比较低,如子查询比较多
sum,count,max,min 等聚集函数,通常不会有数据倾斜问题
主要表现:
任务进度长时间维持在 99%或者 100%的附近,查看任务监控页面,发现只有少量 reduce 子任务未完成,因为其处理的数据量和其他的 reduce 差异过大。 单一 reduce 处理的记录数和平均记录数相差太大,通常达到好几倍之多,最长时间远大 于平均时长。
容易出现数据倾斜的情况:
group by 不和聚集函数搭配使用的时候
count(distinct),在数据量大的情况下,容易数据倾斜,因为 count(distinct)是按 group by 字段分组,按 distinct 字段排序
小表关联超大表 join
产生数据倾斜的原因
key 分布不均匀
业务数据本身的特性
建表考虑不周全
某些 HQL 语句本身就存在数据倾斜
解决办法 参数调节 hive.map.aggr=true
Map 端部分聚合,相当于Combiner
hive.groupby.skewindata=true
有数据倾斜的时候进行负载均衡,当选项设定为 true,生成的查询计划会有两个 MR Job。第一个 MR Job 中,Map 的输出结果集合会随机分布到 Reduce 中,每个 Reduce 做部分聚合操作,并输出结果,这样处理的结果是相同的 Group By Key 有可能被分发到不同的 Reduce 中,从而达到负载均衡的目的;第二个 MR Job 再根据预处理的数据结果按照 Group By Key 分布到 Reduce 中(这个过程可以保证相同的 Group By Key 被分布到同一个 Reduce 中),最后完成最终的聚合操作。
语句调节
如何Join: 关于驱动表的选取,选用join key分布最均匀的表作为驱动表 做好列裁剪和filter操作,以达到两表做join的时候,数据量相对变小的效果。
大小表Join: 使用mapjoin让小的维度表(1000条以下的记录条数)先进内存。在map端完成reduce. MAPJION会把小表全部读入内存中,在map阶段直接拿另外一个表的数据和内存中表数据做匹配,而普通的equality join则是类似于mapreduce模型中的file join,需要先分组,然后再reduce端进行连接,使用的时候需要结合着场景;由于mapjoin是在map是进行了join操作,省去了reduce的运行,效率也会高很多
大表Join大表: 把空值的key变成一个字符串加上随机数,把倾斜的数据分到不同的reduce上,由于null值关联不上,处理后并不影响最终结果。
count distinct大量相同特殊值 count distinct时,将值为空的情况单独处理,如果是计算count distinct,可以不用处理,直接过滤,在最后结果中加1。如果还有其他计算,需要进行group by,可以先将值为空的记录单独处理,再和其他计算结果进行union。
group by维度过小: 采用sum() group by的方式来替换count(distinct)完成计算。
特殊情况特殊处理: 在业务逻辑优化效果的不大情况下,有些时候是可以将倾斜的数据单独拿出来处理。最后union回去。
例子
空值产生的数据倾斜 在日志中,常会有信息丢失的问题,比如日志中的 user_id,如果取其中的 user_id 和用户表中的 user_id 相关联,就会碰到数据倾斜的问题。 解决方案 1:user_id 为空的不参与关联 解决方案 2:赋予空值新的 key 值
不同数据类型关联产生数据倾斜 用户表中 user_id 字段为 int,log 表中 user_id 为既有 string 也有 int 的类型, 当按照两个表的 user_id 进行 join 操作的时候,默认的 hash 操作会按照 int 类型的 id 进 行分配,这样就会导致所有的 string 类型的 id 就被分到同一个 reducer 当中 解决方案:把数字类型 id 转换成 string 类型的 id
大小表关联查询产生数据倾斜 使用map join解决小表关联大表造成的数据倾斜问题。这个方法使用的频率很高。
hive执行过程 概述
Hive 将 HQL 转换成一组操作符(Operator),比如 GroupByOperator, JoinOperator 等
操作符 Operator 是 Hive 的最小处理单元
每个操作符代表一个 HDFS 操作或者 MapReduce 作业
Hive 通过 ExecMapper 和 ExecReducer 执行 MapReduce 程序,执行模式有本地模式和分 布式两种模式
操作符类型
编译器的工作职责
Parser:将 HQL 语句转换成抽象语法树(AST:Abstract Syntax Tree)
Semantic Analyzer:将抽象语法树转换成查询块
Logic Plan Generator:将查询块转换成逻辑查询计划
Logic Optimizer:重写逻辑查询计划,优化逻辑执行计划。优化过程可能包括谓词下推(Predicate Push Down),分区剪裁(Partition Prunner),关联排序(Join Reorder)
Physical Plan Gernerator:将逻辑计划转化成物理计划(MapReduce Jobs)
Physical Optimizer:选择最佳的 Join 策略,优化物理执行计划。比如基于输入选择执行路径,增加备份作业等
优化器类型
上表中带①符号的,优化目的都是尽量将任务合并到一个 Job 中,以减少 Job 数量,带②的 优化目的是尽量减少 shuffle 数据量
Join Map:
以 JOIN ON 条件中的列作为 Key,如果有多个列,则 Key 是这些列的组合
以 JOIN 之后所关心的列作为 Value,当有多个列时,Value 是这些列的组合。在 Value 中还会包含表的 Tag 信息,用于标明此 Value 对应于哪个表
按照 Key 进行排序
Shuffle:
根据 Key 的值进行 Hash,并将 Key/Value 对按照 Hash 值推至不同对 Reduce 中
Reduce:
Reducer 根据 Key 值进行 Join 操作,并且通过 Tag 来识别不同的表中的数据
例子:
SELECT pv.pageid, u.age FROM page_view pv JOIN user u ON pv.userid = u.userid;
%!(EXTRA markdown.ResourceType=, string=, string=)
Group By 实例:
SELECT pageid, age, count(1) FROM pv_users GROUP BY pageid, age;
distinct 示例:
SELECT age, count(distinct pageid) FROM pv_users GROUP BY age;
按照 age 分组,然后统计每个分组里面的不重复的 pageid 有多少个
该 SQL 语句会按照 age 和 pageid 预先分组,进行 distinct 操作。然后会再按 照 age 进行分组,再进行一次 distinct 操作
优化 常用手段
好的模型设计事半功倍
解决数据倾斜问题
减少 job 数
设置合理的 MapReduce 的 task 数,能有效提升性能。(比如,10w+级别的计算,用 160个 reduce,那是相当的浪费,1 个足够)
了解数据分布,自己动手解决数据倾斜问题是个不错的选择。这是通用的算法优化,但 算法优化有时不能适应特定业务背景,开发人员了解业务,了解数据,可以通过业务逻辑精 确有效的解决数据倾斜问题
数据量较大的情况下,慎用 count(distinct),group by 容易产生倾斜问题
对小文件进行合并,是行之有效的提高调度效率的方法,假如所有的作业设置合理的文件数,对云梯的整体调度效率也会产生积极的正向影响
优化时把握整体,单个作业最优不如整体最优
排序选择
cluster by:对同一字段分桶并排序,不能和 sort by 连用。cluster by除了具有distribute by的功能外还兼具sort by的功能。但是排序只能是倒叙排序,不能指定排序规则为ASC或者DESC。
distribute by + sort by:distribute by是控制在map端如何拆分数据给reduce端的。hive会根据distribute by后面列,对应reduce的个数进行分发,默认是采用hash算法。sort by为每个reduce产生一个排序文件。在有些情况下,你需要控制某个特定行应该到哪个reducer,这通常是为了进行后续的聚集操作。distribute by刚好可以做这件事。因此,distribute by经常和sort by配合使用
sort by:sort by不是全局排序,其在数据进入reducer前完成排序,因此,如果用sort by进行排序,并且设置mapred.reduce.tasks>1,则sort by只会保证每个reducer的输出有序,并不保证全局有序。sort by不同于order by,它不受hive.mapred.mode属性的影响,sort by的数据只能保证在同一个reduce中的数据可以按指定字段排序。使用sort by你可以指定执行的reduce个数(通过set mapred.reduce.tasks=n来指定),对输出的数据再执行归并排序,即可得到全部结果。
order by:全局排序,缺陷是只能使用一个 reduce
笛卡尔积 当 Hive 设定为严格模式(hive.mapred.mode=strict)时,不允许在 HQL 语句中出现笛卡尔积, 这实际说明了 Hive 对笛卡尔积支持较弱。因为找不到 Join key,Hive 只能使用 1 个 reducer 来完成笛卡尔积。
当然也可以使用 limit 的办法来减少某个表参与 join 的数据量,但对于需要笛卡尔积语义的 需求来说,经常是一个大表和一个小表的 Join 操作,结果仍然很大(以至于无法用单机处 理),这时 MapJoin才是最好的解决办法。MapJoin,顾名思义,会在 Map 端完成 Join 操作。 这需要将 Join 操作的一个或多个表完全读入内存。
PS:MapJoin 在子查询中可能出现未知 BUG。在大表和小表做笛卡尔积时,规避笛卡尔积的 方法是,给 Join 添加一个 Join key,原理很简单:将小表扩充一列 join key,并将小表的条 目复制数倍,join key 各不相同;将大表扩充一列 join key 为随机数。
精髓就在于复制几倍,最后就有几个 reduce 来做,而且大表的数据是前面小表扩张 key 值 范围里面随机出来的,所以复制了几倍 n,就相当于这个随机范围就有多大 n,那么相应的, 大表的数据就被随机的分为了 n 份。并且最后处理所用的 reduce 数量也是 n,而且也不会 出现数据倾斜。
设置合理的 maptask 数量 Map 数过大
Map 阶段输出文件太小,产生大量小文件
初始化和创建 Map 的开销很大
Map 数太小
文件处理或查询并发度小,Job 执行时间过长
大量作业时,容易堵塞集群
在 MapReduce 的编程案例中,我们得知,一个MR Job的 MapTask 数量是由输入分片 InputSplit 决定的。而输入分片是由 FileInputFormat.getSplit()决定的。一个输入分片对应一个 MapTask, 而输入分片是由三个参数决定的:
输入分片大小的计算是这么计算出来的:
long splitSize = Math.max(minSize, Math.min(maxSize, blockSize))
默认情况下,输入分片大小和 HDFS 集群默认数据块大小一致,也就是默认一个数据块,启 用一个 MapTask 进行处理,这样做的好处是避免了服务器节点之间的数据传输,提高 job 处 理效率
两种经典的控制 MapTask 的个数方案:减少 MapTask 数或者增加 MapTask 数
减少 MapTask 数是通过合并小文件来实现,这一点主要是针对数据源
增加 MapTask 数可以通过控制上一个 job 的 reduceTask 个数
因为 Hive 语句最终要转换为一系列的 MapReduce Job 的,而每一个 MapReduce Job 是由一 系列的 MapTask 和 ReduceTask 组成的,默认情况下, MapReduce 中一个 MapTask 或者一个 ReduceTask 就会启动一个 JVM 进程,一个 Task 执行完毕后, JVM 进程就退出。这样如果任 务花费时间很短,又要多次启动 JVM 的情况下,JVM 的启动时间会变成一个比较大的消耗, 这个时候,就可以通过重用 JVM 来解决:
set mapred.job.reuse.jvm.num.tasks=5
小文件合并 件数目过多,会给 HDFS 带来压力,并且会影响处理效率,可以通过合并 Map 和 Reduce 的 结果文件来消除这样的影响:
set hive.merge.mapfiles = true ##在 map only 的任务结束时合并小文件
set hive.merge.mapredfiles = false ## true 时在 MapReduce 的任务结束时合并小文件
set hive.merge.size.per.task = 2561000 1000 ##合并文件的大小
set mapred.max.split.size=256000000; ##每个 Map 最大分割大小
set mapred.min.split.size.per.node=1; ##一个节点上 split 的最少值
set hive.input.format=org.apache.hadoop.hive.ql.io.CombineHiveInputFormat; ##执行 Map 前进行小文件合并
设置合理的 reduceTask 的数量 Hadoop MapReduce 程序中,reducer 个数的设定极大影响执行效率,这使得 Hive 怎样决定 reducer 个数成为一个关键问题。遗憾的是 Hive 的估计机制很弱,不指定 reducer 个数的情 况下,Hive 会猜测确定一个 reducer 个数,基于以下两个设定:
hive.exec.reducers.bytes.per.reducer(默认为 256000000)
hive.exec.reducers.max(默认为 1009)
mapreduce.job.reduces=-1(设置一个常量 reducetask 数量)
计算 reducer 数的公式很简单: N=min(参数 2,总输入数据量/参数 1) 通常情况下,有必要手动指定 reducer 个数。考虑到 map 阶段的输出数据量通常会比输入有 大幅减少,因此即使不设定 reducer 个数,重设参数 2 还是必要的。
依据 Hadoop 的经验,可以将参数 2 设定为 0.95*(集群中 datanode 个数)。
合并 MapReduce 操作 Multi-group by 是 Hive 的一个非常好的特性,它使得 Hive 中利用中间结果变得非常方便。 例如:
1 2 3 4 5 6 FROM (SELECT a.status, b.school, b.gender FROM status_updates a JOIN profiles b ON (a.userid = b.userid and a.ds='2009-03-20' ) ) subq1 INSERT OVERWRITE TABLE gender_summary PARTITION(ds='2009-03-20') SELECT subq1.gender, COUNT(1) GROUP BY subq1.gender INSERT OVERWRITE TABLE school_summary PARTITION(ds='2009-03-20') SELECT subq1.school, COUNT(1) GROUP BY subq1.school
上述查询语句使用了 multi-group by 特性连续 group by 了 2 次数据,使用不同的 group by key。 这一特性可以减少一次 MapReduce 操作
合理利用分桶:Bucketing 和 Sampling Bucket 是指将数据以指定列的值为 key 进行 hash,hash 到指定数目的桶中。这样就可以支 持高效采样了。如下例就是以 userid 这一列为 bucket 的依据,共设置 32 个 buckets
1 2 3 4 5 6 7 8 9 10 11 CREATE TABLE page_view(viewTime INT, userid BIGINT, page_url STRING, referrer_url STRING, ip STRING COMMENT 'IP Address of the User') COMMENT 'This is the page view table' PARTITIONED BY(dt STRING, country STRING) CLUSTERED BY(userid) SORTED BY(viewTime) INTO 32 BUCKETS ROW FORMAT DELIMITED FIELDS TERMINATED BY '1' COLLECTION ITEMS TERMINATED BY '2' MAP KEYS TERMINATED BY '3' STORED AS SEQUENCEFILE;
通常情况下,Sampling 在全体数据上进行采样,这样效率自然就低,它要去访问所有数据。 而如果一个表已经对某一列制作了 bucket,就可以采样所有桶中指定序号的某个桶,这就 减少了访问量。
如下例所示就是采样了 page_view 中 32 个桶中的第三个桶的全部数据:
SELECT * FROM page_view TABLESAMPLE(BUCKET 3 OUT OF 32);
如下例所示就是采样了 page_view 中 32 个桶中的第三个桶的一半数据:
SELECT * FROM page_view TABLESAMPLE(BUCKET 3 OUT OF 64);
合理利用分区:Partition Partition 就是分区。分区通过在创建表时启用 partitioned by 实现,用来 partition 的维度并不 是实际数据的某一列,具体分区的标志是由插入内容时给定的。当要查询某一分区的内容时 可以采用 where 语句,形似 where tablename.partition_column = a 来实现。
Join 优化 总体原则:
优先过滤后再进行 Join 操作,最大限度的减少参与 join 的数据量
小表 join 大表,最好启动 mapjoin
Join on 的条件相同的话,最好放入同一个 job,并且 join 表的排列顺序从小到大
在使用写有 Join 操作的查询语句时有一条原则:应该将条目少的表/子查询放在 Join 操作 符的左边。原因是在 Join 操作的 Reduce 阶段,位于 Join 操作符左边的表的内容会被加 载进内存,将条目少的表放在左边,可以有效减少发生 OOM 错误的几率。对于一条语句 中有多个 Join 的情况,如果 Join 的条件相同,比如查询
1 2 3 4 INSERT OVERWRITE TABLE pv_users SELECT pv.pageid, u.age FROM page_view p JOIN user u ON (pv.userid = u.userid) JOIN newuser x ON (u.userid = x.userid);
如果 Join 的 key 相同,不管有多少个表,都会则会合并为一个 Map-Reduce 任务,而不 是”n”个,在做 OUTER JOIN 的时候也是一样
在编写 Join 查询语句时,如果确定是由于 join 出现的数据倾斜,那么请做如下设置:
1 2 set hive.skewjoin.key=100000; // 这个是 join 的键对应的记录条数超过这个值则会进行 分拆,值根据具体数据量设置 set hive.optimize.skewjoin=true; // 如果是 join 过程出现倾斜应该设置为 true
Group By 优化
Map 端部分聚合 并不是所有的聚合操作都需要在 Reduce 端完成,很多聚合操作都可以先在 Map 端进 行部分聚合,最后在 Reduce 端得出最终结果。 MapReduce 的 combiner 组件参数包括:
1 2 set hive.map.aggr = true 是否在 Map 端进行聚合,默认为 True set hive.groupby.mapaggr.checkinterval = 100000 在 Map 端进行聚合操作的条目数目
使用 Group By 有数据倾斜的时候进行负载均衡 当 sql 语句使用 groupby 时数据出现倾斜时,如果该变量设置为 true,那么 Hive 会自动进行 负载均衡。策略就是把 MR 任务拆分成两个:第一个先做预汇总,第二个再做最终汇总 在 MR 的第一个阶段中,Map 的输出结果集合会缓存到 maptaks 中,每个 Reduce 做部分聚 合操作,并输出结果,这样处理的结果是相同 Group By Key 有可能被分发到不同的 Reduce 中, 从而达到负载均衡的目的;第二个阶段 再根据预处理的数据结果按照 Group By Key 分布到 Reduce 中(这个过程可以保证相同的 Group By Key 被分布到同一个 Reduce 中),最后完成 最终的聚合操作。
1 set hive.groupby.skewindata = true
合理利用文件存储格式 创建表时,尽量使用 orc、parquet 这些列式存储格式,因为列式存储的表,每一列的数据在 物理上是存储在一起的,Hive 查询时会只遍历需要列数据,大大减少处理的数据量
本地模式执行 MapReduce Hive 在集群上查询时,默认是在集群上 N 台机器上运行, 需要多个机器进行协调运行,这 个方式很好地解决了大数据量的查询问题。但是当 Hive 查询处理的数据量比较小时,其实 没有必要启动分布式模式去执行,因为以分布式方式执行就涉及到跨网络传输、多节点协调 等,并且消耗资源。这个时间可以只使用本地模式来执行 mapreduce job,只在一台机器上 执行,速度会很快。
并行化处理 一个 hive sql 语句可能会转为多个 mapreduce Job,每一个 job 就是一个 stage,这些 job 顺序 执行,这个在 cli 的运行日志中也可以看到。但是有时候这些任务之间并不是是相互依赖的, 如果集群资源允许的话,可以让多个并不相互依赖 stage 并发执行,这样就节约了时间,提 高了执行速度,但是如果集群资源匮乏时,启用并行化反倒是会导致各个 job 相互抢占资源 而导致整体执行性能的下降。启用并行化:
1 2 set hive.exec.parallel=true; set hive.exec.parallel.thread.number=8; //同一个 sql 允许并行任务的最大线程数
设置压缩存储 Hive 最终是转为 MapReduce 程序来执行的,而 MapReduce 的性能瓶颈在于网络 IO 和 磁盘 IO,要解决性能瓶颈,最主要的是减少数据量,对数据进行压缩是个好的方式。压缩 虽然是减少了数据量,但是压缩过程要消耗 CPU 的,但是在 Hadoop 中, 往往性能瓶颈不 在于 CPU,CPU 压力并不大,所以压缩充分利用了比较空闲的 CPU