0%

主板+cpu:
一般来说选购17cm*17cm的ITX主板,加低功耗cpu。
1.华擎J3455集成主板,主板和CPU一体的,被动散热,不需要风扇。只要没有解码视频的需求,这款绝对够用了,但是这款现在货很少,不是很好买,我当时520块钱买的
2.N5095集成主板,J4125集成主板。和J3455类似,但是在nas的使用量上没有j3455高
3.也可以自己买ITX主板+低压CPU+ITX散热器
4.也可以买ATX主板+低压CPU

散热器:
背动散热的集成主板不需要散热器
主动散热购买ITX散热器就行,比如 大镰刀(SCYTHE)S950MCPU散热器/超频三刀锋散热器

机箱:
1.比较推荐去闲鱼淘一个4盘位的附带电源的蜗牛星际机箱,不到200可以买到
2.也可以直接买ITX机箱/ATX机箱,好处是空间比较大安装方便,坏处是机箱很少有超过2个HDD盘位的,扩展性比较差

电源:
1.可以在机箱方案1中和机箱一起淘一个
2.自己买的话也别挑太差的,不然容易损坏硬盘

硬盘:
系统盘:用.m2的固态硬盘,这个拼多多上随便买个128G或者256G的就行,100元左右
数据盘:买监控盘或者绿盘以上的硬盘,太差的损坏率太高。比如希捷的酷狼,记得要买同一型号的同一大小的硬盘。

内存:看主板支持的内存型号购买,8G*2一般来说够了,推荐金百达品牌,频率不用太高,200出头就能买到

SATA扩展槽:主板没有很多SATA插槽的话就要买SATA PCI扩展槽,几十块钱

特点

  • 真正的列式数据库管理系统
    除了数据本身外不存在其他额外的数据,避免了在值旁边存储它们的长度«number»

  • 完备的DBMS功能
    允许在运行时创建表和数据库、加载数据和运行查询,而无需重新配置或重启服务。

  • 数据压缩
    除了在磁盘空间和CPU消耗之间进行不同权衡的高效通用压缩编解码器之外,ClickHouse还提供针对特定类型数据的专用编解码器,这使得ClickHouse能够与更小的数据库(如时间序列数据库)竞争并超越它们。

  • 数据的磁盘存储
    ClickHouse被设计用于工作在传统磁盘上的系统,它提供每GB更低的存储成本,但如果可以使用SSD和内存,它也会合理的利用这些资源。

  • 多核心并行处理
    ClickHouse会使用服务器上一切可用的资源,从而以最自然的方式并行处理大型查询。

  • 多服务器分布式处理
    在ClickHouse中,数据可以保存在不同的shard上,每一个shard都由一组用于容错的replica组成,查询可以并行地在所有shard上进行处理。这些对用户来说是透明的

  • 支持SQL
    ClickHouse支持一种基于SQL的声明式查询语言,它在许多情况下与ANSI SQL标准相同。
    支持的查询GROUP BY, ORDER BY, FROM, JOIN, IN以及非相关子查询。
    相关(依赖性)子查询和窗口函数暂不受支持,但将来会被实现。

  • 向量引擎
    为了高效的使用CPU,数据不仅仅按列存储,同时还按向量(列的一部分)进行处理,这样可以更加高效地使用CPU。

  • 实时的数据更新
    ClickHouse支持在表中定义主键。为了使查询能够快速在主键中进行范围查找,数据总是以增量的方式有序的存储在MergeTree中。因此,数据可以持续不断地高效的写入到表中,并且写入的过程中不会存在任何加锁的行为。

  • 索引
    按照主键对数据进行排序,这将帮助ClickHouse在几十毫秒以内完成对数据特定值或范围的查找。

  • 支持近似计算
    ClickHouse提供各种各样在允许牺牲数据精度的情况下对查询进行加速的方法:

    1. 用于近似计算的各类聚合函数,如:distinct values, medians, quantiles
    2. 基于数据的部分样本进行近似查询。这时,仅会从磁盘检索少部分比例的数据。
    3. 不使用全部的聚合条件,通过随机选择有限个数据聚合条件进行聚合。这在数据聚合条件满足某些分布条件下,在提供相当准确的聚合结果的同时降低了计算资源的使用。
  • Adaptive Join Algorithm
    ClickHouse支持自定义JOIN多个表,它更倾向于散列连接算法,如果有多个大表,则使用合并-连接算法

  • 支持数据复制和数据完整性
    ClickHouse使用异步的多主复制技术。当数据被写入任何一个可用副本后,系统会在后台将数据分发给其他副本,以保证系统在不同副本上保持相同的数据。在大多数情况下ClickHouse能在故障后自动恢复,在一些少数的复杂情况下需要手动恢复。
    更多信息,参见 数据复制。

  • 表引擎
    MergeTree
    MergeTree这个名词是在我们耳熟能详的LSM Tree之上做减法而来——去掉了MemTable和Log

    • 适用于高负载,最强大的表引擎
    • 同构后续的后台数据处理,快速插入数据,然后应用规则在后台合并这些部分
    • 支持数据复制、分区、辅助数据跳过索引以及其他引擎不支持的其他功能

    Log

    • 轻量级引擎,功能最少。当您需要快速编写许多小表(最多约100万行)并在以后作为一个整体读取它们时,它们是最有效的。

    Integration Engines

    • 与其他数据存储和处理系统通信的引擎。

    Special Engines(特殊引擎,不知道如何分类,ClickHouse特有的)

  • 高阶函数
    Retention 留存分析
    windowFunnel 漏斗查询
    sequenceCount 关键链路分析

集群架构

ClickHouse 不同于 Elasticsearch、HDFS 这类主从架构的分布式系统,它采用多主(无中心)架构,集群中的每个节点角色对等,客户端访问任意一个节点都能得到相同的效果。

ClickHouse 借助分片将数据进行横向切分,而分片依赖集群,每个集群由 1 到多个分片组成,每个分片对应了 CH 的 1 个服务节点;分片数量的上限取决与节点数量(1 个分片只能对应 1 个服务节点)。

但是 ClickHouse 并不像其他分布式系统那样,拥有高度自动化的分片功能;CH 提供了本地表与分布式表的概念;一张本地表等同于一个数据分片。而分布式表是张逻辑表,本身不存储任何数据,它是本地表的访问代理,其作用类似分库中间件。借助分布式表,能够代理访问多个数据分片,从而实现分布式查询。当然,也可以在应用层实现数据分发。

ClickHouse 同时支持数据副本,其副本概念与 Elasticsearch 类似,但在 CH 中分片其实是一种逻辑概念,其物理承载是由副本承担的。

ClickHouse 的数据副本一般通过 ReplicatedMergeTree 复制表系列引擎实现,副本之间借助 ZooKeeper 实现数据的一致性。此外也可通过分布式表负责同时进行分片和副本的数据写入工作。

263edcd9-aad5-4eca-8e42-47ae63552133.png

存储组织结构

245f590d-0568-40a1-b279-045fabff5e21.png

分区目录

分区目录命名

多个分区目录组成一个分区,目录命名规则如下:

PartitionId_MinBlockNum_MaxBlockNum_Level

PartitionID:分区id,例如20210301。
MinBlockNum:最小分区块编号,自增类型,从1开始向上递增。每产生一个新的目录分区就向上递增一个数字。
MaxBlockNum:最大分区块编号,新创建的分区MinBlockNum等于MaxBlockNum的编号。
Level:合并的层级,被合并的次数。合并次数越多,层级值越大。

69e1520c-0368-47a1-9b4a-4e61807ad754.png

分区目录合并

c36bff66-8fbe-4b44-afe5-970e0533000a.png

数据

目录内文件

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
*── 200002_1_1_0
* *── Birthday.bin
* *── Birthday.mrk2
* *── checksums.txt
* *── columns.txt
* *── count.txt
* *── Id.bin
* *── Id.mrk2
* *── minmax_Birthday.idx
* *── Name.bin
* *── Name.mrk2
* *── partition.dat
* *── primary.idx
*── detached
*── format_version.txt
文件名 描述 作用
primary.idx 索引文件 用于存放稀疏索引
[Column].mrk2 标记文件 保存了bin文件中数据的偏移信息,用于建立primary.idx和[Column].bin文件之间的映射
[Column].bin 数据文件 存储数据,默认使用lz4压缩存储
partition.dat 分区文件 用于保存分区表达式生成的值
minmax_[Column].idx minmax索引 用于记录当前分区下分区字段的最小最大值
columns.txt 列信息文件 用于保存表的列字段信息
count.txt 计数文件 用于记录当前分区目录下数据的总行数
checksums.txt 校验文件 存放以上各个文件的size以及哈希值,用于快速校验文件的完整性

primary.idx结构

0cd44d48-2323-4aa9-9020-b22c51f58395.png

.mrk2

54e7d7ec-10af-4e22-9f93-dac77d4b9342.png

  1. Offset in compressed file,8 Bytes,代表该标记指向的压缩数据块在bin文件中的偏移量。
  2. Offset in decompressed block,8 Bytes,代表该标记指向的数据在解压数据块中的偏移量。
  3. Rows count,8 Bytes,行数,通常情况下其等于index_granularity。
    86c3005c-373c-4ca9-91ee-9f030707baf3.png

.bin

409a3098-cfe5-4da7-b410-7912e7532d91.png

  1. Checksum,16 Bytes,用于对后面的数据进行校验。
  2. Compression algorithm,1 Byte,默认是LZ4,编号为0x82。
  3. Compressed size,4 Bytes,其值等于Compression algorithm + Compressed size + Decompressed size + Compressed data的长度
  4. Decompressed size,4 Bytes,数据解压缩后的长度。
  5. Compressed data,压缩数据,长度为Compressed size - 9。

索引过程

12b5387a-cb0a-4955-882a-8c8fcb846f18.png
691d5682-433f-42f3-aac8-cb70d4c78a1e.png

二级索引(跳树索引)

一级索引决定了数据排布的方式,但是当满足最左前缀原则时,数据查询需要扫描全部文件,这时候可以借助二级索引进行加速。

  1. minmax
    minmax顾名思义就是和分区目录下的 minmax_{column_name}.idx 文件类似,只不过不再是只有一个min/max值,例如上面的minmax跳数索引 a,表示每隔 3 * index_granularity 的区间就会记录一次 u64 * i32 和 s 的最大最小值。当我们通过s查询数据时,可以先基于minmax值判断,从而跳过大多数不需要扫描的索引粒度。
  2. set(max_rows)
    保存指定表达式的去重值,尤其是对于那些重复性很高的列,例如性别、年龄段等,max_rows 参数表示在一个索引粒度内,最多记录不超过 max_rows 行,即不多于 max_rows 个去重值,max_rows=0 表示不限制。
  3. ngrambf_v1(n, size_of_bloom_filter_in_bytes, number_of_hash_functions, random_seed)
    存储一个包含数据块中所有 n元短语(ngram) 的布隆过滤器(Bloom filter)。对String, FixedString 和 Map类型数据有效,可用于优化 EQUALS, LIKE 和 IN表达式。

MergeTree

建表语句

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
CREATE TABLE [IF NOT EXISTS] [db.]table_name [ON CLUSTER cluster]
(
name1 [type1] [DEFAULT|MATERIALIZED|ALIAS expr1] [TTL expr1],
name2 [type2] [DEFAULT|MATERIALIZED|ALIAS expr2] [TTL expr2],
...
INDEX index_name1 expr1 TYPE type1(...) GRANULARITY value1,
INDEX index_name2 expr2 TYPE type2(...) GRANULARITY value2,
...
PROJECTION projection_name_1 (SELECT <COLUMN LIST EXPR> [GROUP BY] [ORDER BY]),
PROJECTION projection_name_2 (SELECT <COLUMN LIST EXPR> [GROUP BY] [ORDER BY])
) ENGINE = MergeTree()
ORDER BY expr,column1,cloumn2
[PARTITION BY expr]
[PRIMARY KEY expr]
[SAMPLE BY expr]
[TTL expr
[DELETE|TO DISK 'xxx'|TO VOLUME 'xxx' [, ...] ]
[WHERE conditions]
[GROUP BY key_expr [SET v1 = aggr_func(v1) [, v2 = aggr_func(v2) ...]] ] ]
[SETTINGS name=value, ...]

[必填选项]
ENGINE:引擎名字,MergeTree引擎无参数。
ORDER BY:排序键,可以由一列或多列组成,决定了数据以何种方式进行排序,例如ORDER BY(CounterID, EventDate)。如果没有显示指定PRIMARY KEY,那么将使用ORDER BY作为PRIMARY KEY。通常只指定ORDER BY即可。

[选填选项]
PARTITION BY:分区键,指明表中的数据以何种规则进行分区。分区是在一个表中通过指定的规则划分而成的逻辑数据集。分区可以按任意标准进行,如按月、按日或按事件类型。为了减少需要操作的数据,每个分区都是分开存储的。
PRIMARY KEY:主键,设置后会按照主键生成一级索引(primary.idx),数据会依据索引的设置进行排序,从而加速查询性能。默认情况下,PRIMARY KEY与ORDER BY设置相同,所以通常情况下直接使用ORDER BY设置来替代主键设置。
SAMPLE BY:数据采样设置,如果显示配置了该选项,那么主键配置中也应该包括此配置。例如 ORDER BY CounterID / EventDate / intHash32(UserID)、SAMPLE BY intHash32(UserID)。
TTL:数据存活时间,可以为某一字段列或者一整张表设置TTL,设置中必须包含Date或DateTime字段类型。如果设置在列上,那么会删除字段中过期的数据。如果设置的是表级的TTL,那么会删除表中过期的数据。如果设置了两种类型,那么按先到期的为准。例如,TTL createtime + INTERVAL 1 DAY,即一天后过期。使用场景包括定期删除数据,或者定期将数据进行归档。
index_granularity:索引间隔粒度。MergeTree索引为稀疏索引,每index_granularity个数据产生一条索引。index_granularity默认设置为8092。
enable_mixed_granularity_parts:是否启动index_granularity_bytes来控制索引粒度大小。
index_granularity_bytes:索引粒度,以字节为单位,默认10Mb。
merge_max_block_size:数据块合并最大记录个数,默认8192。
merge_with_ttl_timeout:合并频率最小时间间隔,默认1天。

ReplacingMergeTree

该引擎适合于经常要根据’主键’进行数据更新的数据(upsert),主键加引号是因为,其实是根据order by定义的字段而不是根据primary key的字段去重的.
更新是非实时的,只有在分区目录合并或者手动触发时会去重

SummingMergeTree,AggregatingMergeTree

预聚合表引擎,此引擎适合于要查询聚合结果而不关心明细数据的场景,比如查询的是每个人月的销量综合,而不是每一单的实际销量.
SummingMergeTree只能计算sum,AggregatingMergeTree是升级版,支持多种聚合算法。但是比较复杂,其插入数据必须要以 insert into select的方式不能是insert into values的形式,通常作为雾化表使用。
例子:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
CREATE TABLE mydatabase.amTest
(
`id` Int8,
`name` String,
`code` AggregateFunction(uniq, String),
`date` Date,
`score` AggregateFunction(sum, Int16),
`score2` AggregateFunction(avg, Int16)
)
ENGINE = AggregatingMergeTree
PARTITION BY date
ORDER BY (id, name);

CREATE MATERIALIZED VIEW amTest
ENGINE = AggregatingMergeTree
PARTITION BY date
primary key(id,name)
ORDER BY (id, name,sex)
AS SELECT
id,name,date,uniqState(code),sumState(score) as score,avgState(score) as score2
FROM xxx GROUP BY id,name;

CollapsingMergeTree,VersionedCollapsingMergeTree

这种引擎的适应场景在于有些时候需要删除数据或者更新数据。很少使用。

分布式

ClickHouse学习系列之四【副本&分片部署说明】

集群配置

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
 <remote_servers>
<ck_cluster>
<shard>
<weight>1</weight>
<internal_replication>true</internal_replication>
<replica>
<host>10.161.4.40</host>
<port>9000</port>
<user>default</user>
<password>clickhouse9z</password>
</replica>
</shard>
<shard>
<weight>1</weight>
<internal_replication>true</internal_replication>
<replica>
<host>10.161.4.41</host>
<port>9000</port>
<user>default</user>
<password>clickhouse9z</password>
</replica>
</shard>
<shard>
<weight>1</weight>
<internal_replication>true</internal_replication>
<replica>
<host>10.161.4.42</host>
<port>9000</port>
<user>default</user>
<password>clickhouse9z</password>
</replica>
</shard>
</ck_cluster>

</remote_servers>

ReplicatedMergeTree

1
2
3
4
5
6
7
8
9
10
11
12
13
CREATE TABLE IF NOT EXISTS data.event_local ON CLUSTER ck_cluster(
`event` String,
`user_id` Int64,
`distinct_id` String,
`date` Date,
`time` DateTime,
`timezone_offset` Float64,
`logdate` Date
)
ENGINE = ReplicatedMergeTree('/clickhouse/tables/{shard}/event','{replica}')
PARTITION BY logdate
ORDER BY (event,user_id,receive_time)
sample by intHash32(user_id);

我们在创建表的时候指定了ReplicatedMergeTree(xxxx),里面传递了两个参数,我们对这两个参数一一描述

  • /clickhouse/tables/ 这一部分指定的是在ZK上创建的路径地址,可随意变换只要记得即可
  • {shard} 指的是分片的标志,同一个分片内的所有机器应该保持相同。建议使用使用的是集群名+分片名的配置也就是{layer}-{shard}
  • {replica} 参数建议在macros配置成机器的hostname,因为每台机器的hostname都是不一样的,因此就能确保每个表的识别符都是唯一的了

当使用ON CLUSTER创建表时,{shard}和{replica}clickhouse会自动填充,而且在集群内任意一台机器执行即可,不需要在所有机器执行

Distributed

分布式引擎本身不存储数据, 但可以在多个服务器上进行分布式查询。 读是自动并行的。读取时,远程服务器表的索引(如果有的话)会被使用。

例:event_local

1
2
3
CREATE TABLE IF NOT EXISTS data.event ON CLUSTER ck_cluster
AS data.event_local
ENGINE = Distributed(ck_cluster, data, event_local,int64hash(user_id));

GLOBAL & LOCAL

使用distributed表时,查询会下推到各个分片执行,多个结果再UNION到一起。如果在查询中使用子查询,通常要使用GLOBAL参数(GLOBAL IN/GLOBAL JOIN)来保证子查询数据的汇总。
ClickHouse分布式IN & JOIN 查询的避坑指南

#高阶函数
ClickHouse 高阶函数

数据导入

Datax

{
    "job":
    {
        "setting":
        {
            "speed":
            {
                "channel":10
            },
            "errorLimit":
            {
                "record": 0,
                "percentage": 0.02
            }
        },
        "content":
        [
            {
                "reader":
                {
                    "name": "hdfsreader",
                    "parameter":
                    {
                        "path": "/user/hive/warehouse/data.db/event/logdate=20220518",
                        "defaultFS": "hdfs://x.x.x.x:xxxx",
                        "column":
                        [
                           {"index": 0, "type": "string"}, {"index": 1, "type": "long"}, {"index": 2, "type": "string"}, {"index": 3, "type": "date"}, {"index": 4, "type": "date"} {"value": "20220518", "type": "string"}
                        ],
                        "fileType": "orc",
                        "encoding": "UTF-8",
                        "fieldDelimiter": ","
                    }
                },
                "writer":
                {
                    "name": "clickhousewriter",
                    "parameter":
                    {
                        "username": "default",
                        "password": "clickhouse9z",
                        "column":
                        [
                            "event",
                            "user_id",
                            "distinct_id",
                            "date",
                            "logdate"
                        ],
                        "connection":
                        [
                            {
                                "jdbcUrl": "jdbc:clickhouse://x.x.x.x:xxxx/data",
                                "table":
                                [
                                    "event"
                                ]
                            }
                        ]
                    }
                }
            }
        ]
    }
}

Airflow

介绍

airflow是一款开源的,分布式任务调度框架,它将一个具有上下级依赖关系的工作流,组装成一个有向无环图。

特点:

  • 分布式任务调度:允许一个工作流的task在多台worker上同时执行
  • 可构建任务依赖:以有向无环图的方式构建任务依赖关系
  • task原子性:工作流上每个task都是原子可重试的,一个工作流某个环节的task失败可自动或手动进行重试,不必从头开始任务

架构

35a160b63e7389fe12f451e299ab0c00.jpg

webserver : 提供web端服务,以及会定时生成子进程去扫描对应的目录下的dags,并更新数据库

scheduler : 任务调度服务,根据dags生成任务,并提交到消息中间件队列中 (redis或rabbitMq)

celery worker : 分布在不同的机器上,作为任务真正的的执行节点。通过监听消息中间件: redis或rabbitMq 领取任务

flower : 监控worker进程的存活性,启动或关闭worker进程,查看运行的task

celery

d4e3cc47e43e1073f8d347eede9d297a.png

[1] Web server –> Workers : 获取任务执行日志。

[2] Web server –> DAG files : 展示DAG结构。

[3] Web server –> Database :获取任务状态。

[4] Workers –> DAG files :展示DAG结构和执行任务。

[5] Workers –> Database : 获取和存储连接配置信息、变量XCOM。

[6] Workers –> Celery’s result backend : 存储任务执行信息。

[7] Workers –> Celery’s broker :存储执行的命令。

[8] Scheduler –> Database : 存储DAG运行信息和相关的任务。

[9] Scheduler –> DAG files : 展示DAG的结构和执行任务。

[10] Scheduler –> Celery’s result backend : 获取已经执行完的任务信息。

[11] Scheduler –> Celery’s broker : 把执行的命令发送给Celery’s broker。

角色

Task

就是任务,有异步任务和定时任务

Broker

中间人,接收生产者发来的消息即Task,将任务存入队列。任务的消费者是Worker。

Celery本身不提供队列服务,推荐用Redis或RabbitMQ实现队列服务。

Worker

执行任务的单元,它实时监控消息队列,如果有任务就获取任务并执行它。

Beat

定时任务调度器,根据配置定时将任务发送给Broker。

Backend

用于存储任务的执行结果。

3cc47a82b844cca11c599250b4b491ae.png

基本架构

85da297f86b3880a9192397b08ae953d.png

  1. 用户接口: shell/CLI, jdbc/odbc, webui Command Line Interface
  2. 跨语言服务 : thrift server 提供了一种能力,让用户可以使用多种不同的语言来操纵hive
  3. Driver
    Driver 组件完成 HQL 查询语句从词法分析,语法分析,编译,优化,以及生成逻辑执行 计划的生成。生成的逻辑执行计划存储在 HDFS 中,并随后由 MapReduce 调用执行
    Hive 的核心是驱动引擎, 驱动引擎由四部分组成:
    1. 解释器:解释器的作用是将 HiveSQL 语句转换为抽象语法树(AST)
    2. 编译器:编译器是将语法树编译为逻辑执行计划
    3. 优化器:优化器是对逻辑执行计划进行优化
    4. 执行器:执行器是调用底层的运行框架执行逻辑执行计划
  4. 元数据存储系统 : RDBMS MySQL
    元数据,通俗的讲,就是存储在 Hive 中的数据的描述信息。
    Hive 中的元数据通常包括:表的名字,表的列和分区及其属性,表的属性(内部表和 外部表),表的数据所在目录
    Metastore 默认存在自带的 Derby 数据库中。缺点就是不适合多用户操作,并且数据存 储目录不固定。数据库跟着 Hive 走,极度不方便管理
    解决方案:通常存我们自己创建的 MySQL 库(本地 或 远程)
    Hive 和 MySQL 之间通过 MetaStore 服务交互

执行流程:

HiveQL 通过命令行或者客户端提交,经过 Compiler 编译器,运用 MetaStore 中的元数 据进行类型检测和语法分析,生成一个逻辑方案(Logical Plan),然后通过的优化处理,产生 一个 MapReduce 任务。

数据组织

  1. Hive 的存储结构包括数据库、表、视图、分区和表数据等。数据库,表,分区等等都对 应 HDFS 上的一个目录。表数据对应 HDFS 对应目录下的文件。
  2. Hive 中所有的数据都存储在 HDFS 中,没有专门的数据存储格式,因为 Hive 是读模式 (Schema On Read),可支持 TextFile,SequenceFile,RCFile 或者自定义格式等
  3. 只需要在创建表的时候告诉 Hive 数据中的列分隔符和行分隔符,Hive 就可以解析数据
      Hive 的默认列分隔符:控制符 Ctrl + A,\x01 Hive 的
      Hive 的默认行分隔符:换行符 \n
  4. Hive 中包含以下数据模型:
    database:在 HDFS 中表现为${hive.metastore.warehouse.dir}目录下一个文件夹
    table:在 HDFS 中表现所属 database 目录下一个文件夹
    external table:与 table 类似,不过其数据存放位置可以指定任意 HDFS 目录路径
    partition:在 HDFS 中表现为 table 目录下的子目录
    bucket:在 HDFS 中表现为同一个表目录或者分区目录下根据某个字段的值进行 hash 散 列之后的多个文件
    view:与传统数据库类似,只读,基于基本表创建
  5. Hive 的元数据存储在 RDBMS 中,除元数据外的其它所有数据都基于 HDFS 存储。默认情 况下,Hive 元数据保存在内嵌的 Derby 数据库中,只能允许一个会话连接,只适合简单的 测试。实际生产环境中不适用,为了支持多用户会话,则需要一个独立的元数据库,使用 MySQL 作为元数据库,Hive 内部对 MySQL 提供了很好的支持。
  6. Hive 中的表分为内部表、外部表、分区表和 Bucket 表
    外部表示hive 对存储在 HDFS 上的数据提供了一种新的抽象。而不是管理存储在 HDFS 上的数据。所以不管创建内部 表还是外部表,都可以对 hive 表的数据存储目录中的数据进行增删操作。
    • 内部表和外部表的区别:
      • 删除内部表,删除表元数据和数据
      • 删除外部表,删除元数据,不删除数据
    • 内部表和外部表的使用选择:
      • 大多数情况,他们的区别不明显,如果数据的所有处理都在 Hive 中进行,那么倾向于 选择内部表,但是如果 Hive 和其他工具要针对相同的数据集进行处理,外部表更合适。
      • 使用外部表访问存储在 HDFS 上的初始数据,然后通过 Hive 转换数据并存到内部表中
      • 使用外部表的场景是针对一个数据集有多个不同的 Schema
    • 分区表和分桶表的区别:
      Hive 数据表可以根据某些字段进行分区操作,细化数据管理,可以让部分查询更快。同 时表和分区也可以进一步被划分为 Buckets,分桶表的原理和 MapReduce 编程中的 HashPartitioner 的原理类似。
      分区和分桶都是细化数据管理,但是分区表是手动添加区分,由于 Hive 是读模式,所 以对添加进分区的数据不做模式校验,分桶表中的数据是按照某些分桶字段进行 hash 散列 形成的多个文件,所以数据的准确性也高很多

元数据表

meta表

hive版本

VERSION表,记录hive版本

数据库相关meta表

表名 说明 字段
DBS 该表存储Hive中所有数据库的基本信息 数据库ID,数据库描述,数据库HDFS路径,数据库名,数据库所有者用户名,所有者角色
DATABASE_PARAMS 该表存储数据库的相关参数,在CREATE DATABASE时候用WITH DBPROPERTIES (property_name=property_value, …)指定的参数 数据库ID,参数名,参数值

DBS和DATABASE_PARAMS这两张表通过DB_ID字段关联。

表相关meta表

表名 说明 字段
TBLS 该表中存储Hive表、视图、索引表的基本信息 表ID,创建时间,数据库ID,上次访问时间,所有者,保留字段,序列化配置信息,表名,表类型,视图的详细HQL语句,视图的原始HQL语句
TABLE_PARAMS 该表存储表/视图的属性信息 表ID,属性名,属性值
TBL_PRIVS 该表存储表/视图的授权信息 授权ID,授权时间,授权执行用户,授权者类型,被授权用户,被授权用户类型,权限,表ID

文件存储信息相关的元数据表

由于HDFS支持的文件格式很多,而建Hive表时候也可以指定各种文件格式,Hive在将HQL解析成MapReduce时候,需要知道去哪里,使用哪种格式去读写HDFS文件,而这些信息就保存在这几张表中。

表名 说明 字段
SDS 该表保存文件存储的基本信息,如INPUT_FORMAT、OUTPUT_FORMAT、是否压缩等。TBLS表中的SD_ID与该表关联,可以获取Hive表的存储信息
SD_PARAMS 该表存储Hive存储的属性信息,在创建表时候使用STORED BY ‘storage.handler.class.name’ [WITH SERDEPROPERTIES (…)指定
SERDES 该表存储序列化使用的类信息
SERDE_PARAMS 该表存储序列化的一些属性、格式信息,比如:行、列分隔符

表字段相关的元数据表

表名 说明 字段
COLUMNS_V2 该表存储表对应的字段信息

表分区相关的元数据表

表名 说明 字段
PARTITIONS 该表存储表分区的基本信息
PARTITION_KEYS 该表存储分区的字段信息
PARTITION_KEY_VALS 该表存储分区字段值
PARTITION_PARAMS 该表存储分区的属性信息

数据类型与存储格式

基本数据类型

类型 描述 示例
boolean true/false TRUE
tinyint 1字节的有符号整数 -128~127 1Y
smallint 2个字节的有符号整数,-32768~32767 1S
int 4个字节的带符号整数 1
bigint 8字节带符号整数 1L
float 4字节单精度浮点数 1.0
double 8字节双精度浮点数 1.0
deicimal 任意精度的带符号小数 1.0
String 字符串,变长 “a”,’b’
varchar 变长字符串 “a”,’b’
char 固定长度字符串 “a”,’b’
binary 字节数组 无法表示
timestamp 时间戳,纳秒精度 122327493795
date 日期 ‘2018-04-07’

复杂数据类型

类型 描述 示例
array 有序的的同类型的集合 array(1,2)
map key-value,key必须为原始类型,value可以任意类型 map(‘a’,1,’b’,2)
struct 字段集合,类型可以不同 struct(‘1’,1,1.0), named_stract(‘col1’,’1’,’col2’,1,’clo3’,1.0)

存储格式

Hive会为每个创建的数据库在HDFS上创建一个目录,该数据库的表会以子目录形式存储,表中的数据会以表目录下的文件形式存储。对于default数据库,默认的缺省数据库没有自己的目录,default数据库下的表默认存放在/user/hive/warehouse目录下。

  1. textfile
    textfile为默认格式,存储方式为行存储。数据不做压缩,磁盘开销大,数据解析开销大。
  2. SequenceFile
    SequenceFile是Hadoop API提供的一种二进制文件支持,其具有使用方便、可分割、可压缩的特点。
    SequenceFile支持三种压缩选择:NONE, RECORD, BLOCK。 Record压缩率低,一般建议使用BLOCK压缩。
  3. RCFile
    一种行列存储相结合的存储方式。
  4. ORCFile
    数据按照行分块,每个块按照列存储,其中每个块都存储有一个索引。hive给出的新格式,属于RCFILE的升级版,性能有大幅度提升,而且数据可以压缩存储,压缩快 快速列存取。
  5. Parquet
    Parquet也是一种行式存储,同时具有很好的压缩性能;同时可以减少大量的表扫描和反序列化的时间。

数据格式

当数据存储在文本文件中,必须按照一定格式区别行和列,并且在Hive中指明这些区分符。Hive默认使用了几个平时很少出现的字符,这些字符一般不会作为内容出现在记录中。

Hive默认的行和列分隔符如下表所示。

分隔符 描述
\n 对于文本文件来说,每行是一条记录,所以\n 来分割记录
^A (Ctrl+A) 分割字段,也可以用\001 来表示
^B (Ctrl+B) 用于分割 Arrary 或者 Struct 中的元素,或者用于 map 中键值之间的分割,也可以用\002 分割。
^C 用于 map 中键和值自己分割,也可以用\003 表示。

DDL

hive ddl

数据库DDL

  1. 创建库
  2. 查看库
  3. 删除库
  4. 切换库

表DDL

创建表:

1
2
3
4
5
6
7
8
9
CREATE [EXTERNAL] TABLE [IF NOT EXISTS] table_name
  [(col_name data_type [COMMENT col_comment], ...)]
  [COMMENT table_comment]
  [PARTITIONED BY (col_name data_type [COMMENT col_comment], ...)]
  [CLUSTERED BY (col_name, col_name, ...)
    [SORTED BY (col_name [ASC|DESC], ...)] INTO num_buckets BUCKETS]
  [ROW FORMAT row_format]
  [STORED AS file_format]
  [LOCATION hdfs_path]
  • CREATE TABLE 创建一个指定名字的表。如果相同名字的表已经存在,则抛出异常;用户可以用 IF NOT EXIST 选项来忽略这个异常
  • EXTERNAL 关键字可以让用户创建一个外部表,在建表的同时指定一个指向实际数据的路径(LOCATION)
  • LIKE 允许用户复制现有的表结构,但是不复制数据
  • COMMENT可以为表与字段增加描述
  • PARTITIONED BY 指定分区
  • ROW FORMAT
      DELIMITED [FIELDS TERMINATED BY char] [COLLECTION ITEMS TERMINATED BY char]
        MAP KEYS TERMINATED BY char] [LINES TERMINATED BY char]
        | SERDE serde_name [WITH SERDEPROPERTIES
        (property_name=property_value, property_name=property_value, …)]
      用户在建表的时候可以自定义 SerDe 或者使用自带的 SerDe。如果没有指定 ROW FORMAT 或者 ROW FORMAT DELIMITED,将会使用自带的 SerDe。在建表的时候,
    用户还需要为表指定列,用户在指定表的列的同时也会指定自定义的 SerDe,Hive 通过 SerDe 确定表的具体的列的数据。
  • STORED AS
      SEQUENCEFILE //序列化文件
      | TEXTFILE //普通的文本文件格式
      | RCFILE  //行列存储相结合的文件
      | INPUTFORMAT input_format_classname OUTPUTFORMAT output_format_classname //自定义文件格式
      如果文件数据是纯文本,可以使用 STORED AS TEXTFILE。如果数据需要压缩,使用 STORED AS SEQUENCE 。
  • LOCATION指定表在HDFS的存储路径

查看表

修改表

删除表

清空表

函数&脚本

内置函数

内置函数

窗口函数

hive窗口函数_

窗口函数是用于分析用的一类函数,要理解窗口函数要先从聚合函数说起。 大家都知道聚合函数是将某列中多行的值合并为一行,比如sum、count等。 而窗口函数则可以在本行内做运算,得到多行的结果,即每一行对应一行的值。 通用的窗口函数可以用下面的语法来概括:

Function() Over (Partition By Column1,Column2,Order By Column3)

窗口函数又分为以下三类: 聚合型窗口函数 分析型窗口函数 * 取值型窗口函数

聚合型窗口函数

聚合型即SUM(), MIN(),MAX(),AVG(),COUNT()这些常见的聚合函数。 聚合函数配合窗口函数使用可以使计算更加灵活

分析型窗口函数

分析型即RANk(),ROW_NUMBER(),DENSE_RANK()等常见的排序用的窗口函数

row_number函数:生成连续的序号(相同元素序号相同)

rank函数:如两元素排序相同则序号相同,并且会跳过下一个序号

desrank函数:如两元素排序相同则序号相同,不会跳过下一个序号

取值型窗口函数

这几个函数可以通过字面意思记得,LAG是迟滞的意思,也就是对某一列进行往后错行;LEAD是LAG的反义词,也就是对某一列进行提前几行;FIRST_VALUE是对该列到目前为止的首个值,而LAST_VALUE是到目前行为止的最后一个值。

LAG()和LEAD() 可以带3个参数,第一个是返回的值,第二个是前置或者后置的行数,第三个是默认值。

窗口大小

关键是理解 ROWS BETWEEN 含义,也叫做window子句:

PRECEDING:往前

FOLLOWING:往后

CURRENT ROW:当前行

UNBOUNDED:无边界,UNBOUNDED PRECEDING 表示从最前面的起点开始, UNBOUNDED FOLLOWING:表示到最后面的终点

–其他AVG,MIN,MAX,和SUM用法一样

自定义函数

当 Hive 提供的内置函数无法满足业务处理需要时,此时就可以考虑使用用户自定义函数。

  • UDF(user-defined function)作用于单个数据行,产生一个数据行作为输出。(数学函数,字 符串函数)
  • UDAF(用户定义聚集函数 User- Defined Aggregation Funcation):接收多个输入数据行,并产 生一个输出数据行。(count,max)
  • UDTF(表格生成函数 User-Defined Table Functions):接收一行输入,输出多行(explode)
1
2
3
4
5
6
7
8
9
10
11
import org.apache.hadoop.hive.ql.exec.UDF;

public class ToLowerCase extends UDF{

// 必须是 public,并且 evaluate 方法可以重载
public String evaluate(String field) {
String result = field.toLowerCase();
return result;
}

}

Transform

Hive 的 TRANSFORM 关键字提供了在 SQL 中调用自写脚本的功能。适合实现 Hive 中没有的 功能又不想写 UDF 的情况

1
2
3
4
5
6
7
8
#!/bin/python
import sys
import datetime
for line in sys.stdin:
line = line.strip()
movie,rate,unixtime,userid = line.split('\t')
weekday = datetime.datetime.fromtimestamp(float(unixtime)).isoweekday()
print '\t'.join([movie, rate, str(weekday),userid])
1
2
3
hive>add file /home/hadoop/weekday_mapper.py;
hive> insert into table lastjsontable select transform(movie,rate,unixtime,userid)
using 'python weekday_mapper.py' as(movie,rate,weekday,userid) from rate;

数据倾斜

由于数据分布不均匀,造成数据大量的集中到一点,造成数据热点

hive框架的特性:

  • 不怕数据大,怕数据倾斜
  • Jobs 数比较多的作业运行效率相对比较低,如子查询比较多
  • sum,count,max,min 等聚集函数,通常不会有数据倾斜问题

主要表现:

任务进度长时间维持在 99%或者 100%的附近,查看任务监控页面,发现只有少量 reduce 子任务未完成,因为其处理的数据量和其他的 reduce 差异过大。 单一 reduce 处理的记录数和平均记录数相差太大,通常达到好几倍之多,最长时间远大 于平均时长。

容易出现数据倾斜的情况:

aeaecc7eb67ab1eac021091ee6874318.png

  1. group by 不和聚集函数搭配使用的时候
  2. count(distinct),在数据量大的情况下,容易数据倾斜,因为 count(distinct)是按 group by 字段分组,按 distinct 字段排序
  3. 小表关联超大表 join

产生数据倾斜的原因

  1. key 分布不均匀
  2. 业务数据本身的特性
  3. 建表考虑不周全
  4. 某些 HQL 语句本身就存在数据倾斜

解决办法

参数调节

hive.map.aggr=true

Map 端部分聚合,相当于Combiner

hive.groupby.skewindata=true

有数据倾斜的时候进行负载均衡,当选项设定为 true,生成的查询计划会有两个 MR Job。第一个 MR Job 中,Map 的输出结果集合会随机分布到 Reduce 中,每个 Reduce 做部分聚合操作,并输出结果,这样处理的结果是相同的 Group By Key 有可能被分发到不同的 Reduce 中,从而达到负载均衡的目的;第二个 MR Job 再根据预处理的数据结果按照 Group By Key 分布到 Reduce 中(这个过程可以保证相同的 Group By Key 被分布到同一个 Reduce 中),最后完成最终的聚合操作。

语句调节

  • 如何Join:
    关于驱动表的选取,选用join key分布最均匀的表作为驱动表
    做好列裁剪和filter操作,以达到两表做join的时候,数据量相对变小的效果。
  • 大小表Join:
    使用mapjoin让小的维度表(1000条以下的记录条数)先进内存。在map端完成reduce.
    MAPJION会把小表全部读入内存中,在map阶段直接拿另外一个表的数据和内存中表数据做匹配,而普通的equality join则是类似于mapreduce模型中的file join,需要先分组,然后再reduce端进行连接,使用的时候需要结合着场景;由于mapjoin是在map是进行了join操作,省去了reduce的运行,效率也会高很多
  • 大表Join大表:
    把空值的key变成一个字符串加上随机数,把倾斜的数据分到不同的reduce上,由于null值关联不上,处理后并不影响最终结果。
  • count distinct大量相同特殊值
    count distinct时,将值为空的情况单独处理,如果是计算count distinct,可以不用处理,直接过滤,在最后结果中加1。如果还有其他计算,需要进行group by,可以先将值为空的记录单独处理,再和其他计算结果进行union。
  • group by维度过小:
    采用sum() group by的方式来替换count(distinct)完成计算。
  • 特殊情况特殊处理:
    在业务逻辑优化效果的不大情况下,有些时候是可以将倾斜的数据单独拿出来处理。最后union回去。

例子

  1. 空值产生的数据倾斜
    在日志中,常会有信息丢失的问题,比如日志中的 user_id,如果取其中的 user_id 和用户表中的 user_id 相关联,就会碰到数据倾斜的问题。
    解决方案 1:user_id 为空的不参与关联
    解决方案 2:赋予空值新的 key 值
  2. 不同数据类型关联产生数据倾斜
    用户表中 user_id 字段为 int,log 表中 user_id 为既有 string 也有 int 的类型, 当按照两个表的 user_id 进行 join 操作的时候,默认的 hash 操作会按照 int 类型的 id 进 行分配,这样就会导致所有的 string 类型的 id 就被分到同一个 reducer 当中
    解决方案:把数字类型 id 转换成 string 类型的 id
  3. 大小表关联查询产生数据倾斜
    使用map join解决小表关联大表造成的数据倾斜问题。这个方法使用的频率很高。

hive执行过程

概述

  1. Hive 将 HQL 转换成一组操作符(Operator),比如 GroupByOperator, JoinOperator 等
  2. 操作符 Operator 是 Hive 的最小处理单元
  3. 每个操作符代表一个 HDFS 操作或者 MapReduce 作业
  4. Hive 通过 ExecMapper 和 ExecReducer 执行 MapReduce 程序,执行模式有本地模式和分 布式两种模式

操作符类型

f3c5848ccaddac8a43b1f286665feaca.png

编译器的工作职责

  1. Parser:将 HQL 语句转换成抽象语法树(AST:Abstract Syntax Tree)
  2. Semantic Analyzer:将抽象语法树转换成查询块
  3. Logic Plan Generator:将查询块转换成逻辑查询计划
  4. Logic Optimizer:重写逻辑查询计划,优化逻辑执行计划。优化过程可能包括谓词下推(Predicate Push Down),分区剪裁(Partition Prunner),关联排序(Join Reorder)
  5. Physical Plan Gernerator:将逻辑计划转化成物理计划(MapReduce Jobs)
  6. Physical Optimizer:选择最佳的 Join 策略,优化物理执行计划。比如基于输入选择执行路径,增加备份作业等

优化器类型

e76307484976aefd449c340576495570.png

上表中带①符号的,优化目的都是尽量将任务合并到一个 Job 中,以减少 Job 数量,带②的 优化目的是尽量减少 shuffle 数据量

Join

Map:

  1. 以 JOIN ON 条件中的列作为 Key,如果有多个列,则 Key 是这些列的组合
  2. 以 JOIN 之后所关心的列作为 Value,当有多个列时,Value 是这些列的组合。在 Value 中还会包含表的 Tag 信息,用于标明此 Value 对应于哪个表
  3. 按照 Key 进行排序

Shuffle:

根据 Key 的值进行 Hash,并将 Key/Value 对按照 Hash 值推至不同对 Reduce 中

Reduce:

Reducer 根据 Key 值进行 Join 操作,并且通过 Tag 来识别不同的表中的数据

例子:

SELECT pv.pageid, u.age FROM page_view pv JOIN user u ON pv.userid = u.userid;

%!(EXTRA markdown.ResourceType=, string=, string=)

Group By

实例:

SELECT pageid, age, count(1) FROM pv_users GROUP BY pageid, age;

1cdd2c2d832d98763803d325a69b4859.png

distinct

示例:

SELECT age, count(distinct pageid) FROM pv_users GROUP BY age;

按照 age 分组,然后统计每个分组里面的不重复的 pageid 有多少个

68292b6da4fce41f92317a8ab3794443.png

该 SQL 语句会按照 age 和 pageid 预先分组,进行 distinct 操作。然后会再按 照 age 进行分组,再进行一次 distinct 操作

优化

常用手段

  1. 好的模型设计事半功倍
  2. 解决数据倾斜问题
  3. 减少 job 数
  4. 设置合理的 MapReduce 的 task 数,能有效提升性能。(比如,10w+级别的计算,用 160个 reduce,那是相当的浪费,1 个足够)
  5. 了解数据分布,自己动手解决数据倾斜问题是个不错的选择。这是通用的算法优化,但 算法优化有时不能适应特定业务背景,开发人员了解业务,了解数据,可以通过业务逻辑精 确有效的解决数据倾斜问题
  6. 数据量较大的情况下,慎用 count(distinct),group by 容易产生倾斜问题
  7. 对小文件进行合并,是行之有效的提高调度效率的方法,假如所有的作业设置合理的文件数,对云梯的整体调度效率也会产生积极的正向影响
  8. 优化时把握整体,单个作业最优不如整体最优

排序选择

  • cluster by:对同一字段分桶并排序,不能和 sort by 连用。cluster by除了具有distribute by的功能外还兼具sort by的功能。但是排序只能是倒叙排序,不能指定排序规则为ASC或者DESC。
  • distribute by + sort by:distribute by是控制在map端如何拆分数据给reduce端的。hive会根据distribute by后面列,对应reduce的个数进行分发,默认是采用hash算法。sort by为每个reduce产生一个排序文件。在有些情况下,你需要控制某个特定行应该到哪个reducer,这通常是为了进行后续的聚集操作。distribute by刚好可以做这件事。因此,distribute by经常和sort by配合使用
  • sort by:sort by不是全局排序,其在数据进入reducer前完成排序,因此,如果用sort by进行排序,并且设置mapred.reduce.tasks>1,则sort by只会保证每个reducer的输出有序,并不保证全局有序。sort by不同于order by,它不受hive.mapred.mode属性的影响,sort by的数据只能保证在同一个reduce中的数据可以按指定字段排序。使用sort by你可以指定执行的reduce个数(通过set mapred.reduce.tasks=n来指定),对输出的数据再执行归并排序,即可得到全部结果。
  • order by:全局排序,缺陷是只能使用一个 reduce

笛卡尔积

当 Hive 设定为严格模式(hive.mapred.mode=strict)时,不允许在 HQL 语句中出现笛卡尔积, 这实际说明了 Hive 对笛卡尔积支持较弱。因为找不到 Join key,Hive 只能使用 1 个 reducer 来完成笛卡尔积。

当然也可以使用 limit 的办法来减少某个表参与 join 的数据量,但对于需要笛卡尔积语义的 需求来说,经常是一个大表和一个小表的 Join 操作,结果仍然很大(以至于无法用单机处 理),这时 MapJoin才是最好的解决办法。MapJoin,顾名思义,会在 Map 端完成 Join 操作。 这需要将 Join 操作的一个或多个表完全读入内存。

PS:MapJoin 在子查询中可能出现未知 BUG。在大表和小表做笛卡尔积时,规避笛卡尔积的 方法是,给 Join 添加一个 Join key,原理很简单:将小表扩充一列 join key,并将小表的条 目复制数倍,join key 各不相同;将大表扩充一列 join key 为随机数。

精髓就在于复制几倍,最后就有几个 reduce 来做,而且大表的数据是前面小表扩张 key 值 范围里面随机出来的,所以复制了几倍 n,就相当于这个随机范围就有多大 n,那么相应的, 大表的数据就被随机的分为了 n 份。并且最后处理所用的 reduce 数量也是 n,而且也不会 出现数据倾斜。

设置合理的 maptask 数量

Map 数过大

  • Map 阶段输出文件太小,产生大量小文件
  • 初始化和创建 Map 的开销很大

Map 数太小

  • 文件处理或查询并发度小,Job 执行时间过长
  • 大量作业时,容易堵塞集群

在 MapReduce 的编程案例中,我们得知,一个MR Job的 MapTask 数量是由输入分片 InputSplit 决定的。而输入分片是由 FileInputFormat.getSplit()决定的。一个输入分片对应一个 MapTask, 而输入分片是由三个参数决定的:

e8e26931026e26c5a8c775b0be3273ea.png

输入分片大小的计算是这么计算出来的:

long splitSize = Math.max(minSize, Math.min(maxSize, blockSize))

默认情况下,输入分片大小和 HDFS 集群默认数据块大小一致,也就是默认一个数据块,启 用一个 MapTask 进行处理,这样做的好处是避免了服务器节点之间的数据传输,提高 job 处 理效率

两种经典的控制 MapTask 的个数方案:减少 MapTask 数或者增加 MapTask 数

  1. 减少 MapTask 数是通过合并小文件来实现,这一点主要是针对数据源
  2. 增加 MapTask 数可以通过控制上一个 job 的 reduceTask 个数

因为 Hive 语句最终要转换为一系列的 MapReduce Job 的,而每一个 MapReduce Job 是由一 系列的 MapTask 和 ReduceTask 组成的,默认情况下, MapReduce 中一个 MapTask 或者一个 ReduceTask 就会启动一个 JVM 进程,一个 Task 执行完毕后, JVM 进程就退出。这样如果任 务花费时间很短,又要多次启动 JVM 的情况下,JVM 的启动时间会变成一个比较大的消耗, 这个时候,就可以通过重用 JVM 来解决:

set mapred.job.reuse.jvm.num.tasks=5

小文件合并

件数目过多,会给 HDFS 带来压力,并且会影响处理效率,可以通过合并 Map 和 Reduce 的 结果文件来消除这样的影响:

set hive.merge.mapfiles = true ##在 map only 的任务结束时合并小文件

set hive.merge.mapredfiles = false ## true 时在 MapReduce 的任务结束时合并小文件

set hive.merge.size.per.task = 25610001000 ##合并文件的大小

set mapred.max.split.size=256000000; ##每个 Map 最大分割大小

set mapred.min.split.size.per.node=1; ##一个节点上 split 的最少值

set hive.input.format=org.apache.hadoop.hive.ql.io.CombineHiveInputFormat; ##执行 Map 前进行小文件合并

设置合理的 reduceTask 的数量

Hadoop MapReduce 程序中,reducer 个数的设定极大影响执行效率,这使得 Hive 怎样决定 reducer 个数成为一个关键问题。遗憾的是 Hive 的估计机制很弱,不指定 reducer 个数的情 况下,Hive 会猜测确定一个 reducer 个数,基于以下两个设定:

  1. hive.exec.reducers.bytes.per.reducer(默认为 256000000)
  2. hive.exec.reducers.max(默认为 1009)
  3. mapreduce.job.reduces=-1(设置一个常量 reducetask 数量)

计算 reducer 数的公式很简单: N=min(参数 2,总输入数据量/参数 1) 通常情况下,有必要手动指定 reducer 个数。考虑到 map 阶段的输出数据量通常会比输入有 大幅减少,因此即使不设定 reducer 个数,重设参数 2 还是必要的。

依据 Hadoop 的经验,可以将参数 2 设定为 0.95*(集群中 datanode 个数)。

合并 MapReduce 操作

Multi-group by 是 Hive 的一个非常好的特性,它使得 Hive 中利用中间结果变得非常方便。 例如:

1
2
3
4
5
6
FROM (SELECT a.status, b.school, b.gender FROM status_updates a JOIN profiles b ON (a.userid =
b.userid and a.ds='2009-03-20' ) ) subq1
INSERT OVERWRITE TABLE gender_summary PARTITION(ds='2009-03-20')
SELECT subq1.gender, COUNT(1) GROUP BY subq1.gender
INSERT OVERWRITE TABLE school_summary PARTITION(ds='2009-03-20')
SELECT subq1.school, COUNT(1) GROUP BY subq1.school

上述查询语句使用了 multi-group by 特性连续 group by 了 2 次数据,使用不同的 group by key。 这一特性可以减少一次 MapReduce 操作

合理利用分桶:Bucketing 和 Sampling

Bucket 是指将数据以指定列的值为 key 进行 hash,hash 到指定数目的桶中。这样就可以支 持高效采样了。如下例就是以 userid 这一列为 bucket 的依据,共设置 32 个 buckets

1
2
3
4
5
6
7
8
9
10
11
CREATE TABLE page_view(viewTime INT, userid BIGINT,
page_url STRING, referrer_url STRING,
ip STRING COMMENT 'IP Address of the User')
COMMENT 'This is the page view table'
PARTITIONED BY(dt STRING, country STRING)
CLUSTERED BY(userid) SORTED BY(viewTime) INTO 32 BUCKETS
ROW FORMAT DELIMITED
FIELDS TERMINATED BY '1'
COLLECTION ITEMS TERMINATED BY '2'
MAP KEYS TERMINATED BY '3'
STORED AS SEQUENCEFILE;

通常情况下,Sampling 在全体数据上进行采样,这样效率自然就低,它要去访问所有数据。 而如果一个表已经对某一列制作了 bucket,就可以采样所有桶中指定序号的某个桶,这就 减少了访问量。

如下例所示就是采样了 page_view 中 32 个桶中的第三个桶的全部数据:

SELECT * FROM page_view TABLESAMPLE(BUCKET 3 OUT OF 32);

如下例所示就是采样了 page_view 中 32 个桶中的第三个桶的一半数据:

SELECT * FROM page_view TABLESAMPLE(BUCKET 3 OUT OF 64);

合理利用分区:Partition

Partition 就是分区。分区通过在创建表时启用 partitioned by 实现,用来 partition 的维度并不 是实际数据的某一列,具体分区的标志是由插入内容时给定的。当要查询某一分区的内容时 可以采用 where 语句,形似 where tablename.partition_column = a 来实现。

Join 优化

总体原则:

  1. 优先过滤后再进行 Join 操作,最大限度的减少参与 join 的数据量
  2. 小表 join 大表,最好启动 mapjoin
  3. Join on 的条件相同的话,最好放入同一个 job,并且 join 表的排列顺序从小到大

在使用写有 Join 操作的查询语句时有一条原则:应该将条目少的表/子查询放在 Join 操作 符的左边。原因是在 Join 操作的 Reduce 阶段,位于 Join 操作符左边的表的内容会被加 载进内存,将条目少的表放在左边,可以有效减少发生 OOM 错误的几率。对于一条语句 中有多个 Join 的情况,如果 Join 的条件相同,比如查询

1
2
3
4
INSERT OVERWRITE TABLE pv_users
SELECT pv.pageid, u.age FROM page_view p
JOIN user u ON (pv.userid = u.userid)
JOIN newuser x ON (u.userid = x.userid);

如果 Join 的 key 相同,不管有多少个表,都会则会合并为一个 Map-Reduce 任务,而不 是”n”个,在做 OUTER JOIN 的时候也是一样

在编写 Join 查询语句时,如果确定是由于 join 出现的数据倾斜,那么请做如下设置:

1
2
set hive.skewjoin.key=100000; // 这个是 join 的键对应的记录条数超过这个值则会进行 分拆,值根据具体数据量设置
set hive.optimize.skewjoin=true; // 如果是 join 过程出现倾斜应该设置为 true

Group By 优化

  1. Map 端部分聚合
    并不是所有的聚合操作都需要在 Reduce 端完成,很多聚合操作都可以先在 Map 端进 行部分聚合,最后在 Reduce 端得出最终结果。
    MapReduce 的 combiner 组件参数包括:
1
2
set hive.map.aggr = true 是否在 Map 端进行聚合,默认为 True
set hive.groupby.mapaggr.checkinterval = 100000 在 Map 端进行聚合操作的条目数目
  1. 使用 Group By 有数据倾斜的时候进行负载均衡
    当 sql 语句使用 groupby 时数据出现倾斜时,如果该变量设置为 true,那么 Hive 会自动进行 负载均衡。策略就是把 MR 任务拆分成两个:第一个先做预汇总,第二个再做最终汇总
    在 MR 的第一个阶段中,Map 的输出结果集合会缓存到 maptaks 中,每个 Reduce 做部分聚 合操作,并输出结果,这样处理的结果是相同 Group By Key 有可能被分发到不同的 Reduce 中, 从而达到负载均衡的目的;第二个阶段 再根据预处理的数据结果按照 Group By Key 分布到 Reduce 中(这个过程可以保证相同的 Group By Key 被分布到同一个 Reduce 中),最后完成 最终的聚合操作。
1
set hive.groupby.skewindata = true

合理利用文件存储格式

创建表时,尽量使用 orc、parquet 这些列式存储格式,因为列式存储的表,每一列的数据在 物理上是存储在一起的,Hive 查询时会只遍历需要列数据,大大减少处理的数据量

本地模式执行 MapReduce

Hive 在集群上查询时,默认是在集群上 N 台机器上运行, 需要多个机器进行协调运行,这 个方式很好地解决了大数据量的查询问题。但是当 Hive 查询处理的数据量比较小时,其实 没有必要启动分布式模式去执行,因为以分布式方式执行就涉及到跨网络传输、多节点协调 等,并且消耗资源。这个时间可以只使用本地模式来执行 mapreduce job,只在一台机器上 执行,速度会很快。

并行化处理

一个 hive sql 语句可能会转为多个 mapreduce Job,每一个 job 就是一个 stage,这些 job 顺序 执行,这个在 cli 的运行日志中也可以看到。但是有时候这些任务之间并不是是相互依赖的, 如果集群资源允许的话,可以让多个并不相互依赖 stage 并发执行,这样就节约了时间,提 高了执行速度,但是如果集群资源匮乏时,启用并行化反倒是会导致各个 job 相互抢占资源 而导致整体执行性能的下降。启用并行化:

1
2
set hive.exec.parallel=true;
set hive.exec.parallel.thread.number=8; //同一个 sql 允许并行任务的最大线程数

设置压缩存储

Hive 最终是转为 MapReduce 程序来执行的,而 MapReduce 的性能瓶颈在于网络 IO 和 磁盘 IO,要解决性能瓶颈,最主要的是减少数据量,对数据进行压缩是个好的方式。压缩 虽然是减少了数据量,但是压缩过程要消耗 CPU 的,但是在 Hadoop 中, 往往性能瓶颈不 在于 CPU,CPU 压力并不大,所以压缩充分利用了比较空闲的 CPU

Kudu

基本架构

c91fddee385746f24c5b5d32d2e14b9e.png

  • Table(表):一张table是数据存储在kudu的位置。Table具有schema和全局有序的primary key(主键)。Table被分为很多段,也就是tablets.
  • Tablet (段):一个tablet是一张table连续的segment,与其他数据存储引擎或关系型数据的partition相似。Tablet存在副本机制,其中一个副本为leader tablet。任何副本都可以对读取进行服务,并且写入时需要在所有副本对应的tablet server之间达成一致性。
  • Tablet server:存储tablet和为tablet向client提供服务。对于给定的tablet,一个tablet server充当leader,其他tablet server充当该tablet的follower副本。只有leader服务写请求,leader与follower为每个服务提供读请求。
  • Master:主要用来管理元数据(元数据存储在只有一个tablet的catalog table中),即tablet与表的基本信息,监听tserver的状态
  • Catalog Table: 元数据表,用来存储table(schema、locations、states)与tablet(现有的tablet列表,每个tablet及其副本所处tserver,tablet当前状态以及开始和结束键)的信息。

存储结构

总的来说是LSM tree的结构

fad480e985bf9c221ac9abc779350957.png

  • 一个Table包含多个Tablet,其中Tablet的数量是根据hash或者range进行设置
  • 一个Tablet中包含MetaData信息和多个RowSet信息
  • 一个Rowset中包含一个MemRowSet与0个或多个DiskRowset,其中MemRowSet存储insert的数据,一旦MemRowSet写满会flush到磁盘生成一个或多个DiskRowSet,此时MemRowSet清空。MemRowSet默认写满1G或者120s flush一次
    (注意:memRowSet是行式存储,DiskRowSet是列式存储,MemRowSet基于primary key有序)。每隔tablet中会定期对一些diskrowset做compaction操作,目的是对多个diskRowSet进行重新排序,以此来使其更有序并减少diskRowSet的数量,同时在compaction的过程中慧慧resolve掉deltaStores当中的delete记录
  • 一个DiskRowSet包含baseData与DeltaStores两部分,其中baseData存储的数据看起来不可改变,DeltaStores中存储的是改变的数据
  • DeltaStores包含一个DeltaMemStores和多个DeltaFile,其中DeltaMemStores放在内存,用来存储update与delete数据,一旦DeltaMemStores写满,会flush成DeltaFile。
    当DeltaFile过多会影响查询性能,所以KUDU每隔一段时间会执行compaction操作,将其合并到baseData中,主要是resolve掉update数据。

写入

9d5f855fc12c783a5ee8f31a59d851e3.png

读取

2dbec1416df945ef084f33f5c913fb11.png

  1. 客户端master请求查询表指定数据
  2. master对请求进行校验,校验表是否存在,schema中是否存在指定查询的字段,主键是否存在
  3. master通过查询catalog Table返回表,将tablet对应的tserver信息、tserver状态等元数据信息返回给client
  4. client与tserver建立连接,通过metaData找到primary key对应的RowSet。
  5. 首先加载RowSet内存中MemRowSet与DeltMemStore中的数据
  6. 然后加载磁盘中的数据,也就是DiskRowSet中的BaseData与DeltFile中的数据
  7. 返回数据给Client
  8. 继续4-7步骤,直到拿到所有数据返回给client

插入

929bd87c11a7b1e1ceae82ecdbaf21a8.png

  1. client向master请求预写表的元数据信息
  2. master会进行一定的校验,表是否存在,字段是否存在等
  3. 如果master校验通过,则返回表的分区、tablet与其对应的tserver给client;如果校验失败则报错给client。
  4. client根据master返回的元数据信息,将请求发送给tablet对应的tserver.
  5. tserver首先会查询内存中MemRowSet与DeltMemStore中是否存在与待插入数据主键相同的数据,如果存在则报错
  6. tserver会讲写请求预写到WAL日志,用来server宕机后的恢复操作
  7. 将数据写入内存中的MemRowSet中,一旦MemRowSet的大小达到1G或120s后,MemRowSet会flush成一个或DiskRowSet,用来将数据持久化
  8. 返回client数据处理完毕

更新

35b115bef593e1394f30442dfc6cf6ff.png

  1. client向master请求预更新表的元数据,首先master会校验表是否存在,字段是否存在,如果校验通过则会返回给client表的分区. tablet. tablet所在tserver信息
  2. client向tserver发起更新请求
  3. 将更新操作预写如WAL日志,用来在server宕机后的数据恢复
  4. 根据tserver中待更新的数据所处位置的不同,有不同的处理方式:
    如果数据在内存中,则从MemRowSet中找到数据所处的行,然后在改行的mutation链表中写入更新信息,在MemRowSet flush的时候,将更新合并到baseData中
    如果数据在DiskRowSet中,则将更新信息写入DeltMemStore中,DeltMemStore达到一定大小后会flush成DeltFile。
  5. 更新完毕后返回消息给client。

使用场景

  • 流式实时计算场景
    流式计算场景通常有持续不断地大量写入,与此同时这些数据还要支持近乎实时的读、写以及更新操作。Kudu的设计能够很好的处理此场景。
  • 时间序列存储引擎(TSDB)
    Kudu的hash分片设计能够很好地避免TSDB类请求的局部热点问题。同时高效的Scan性能让Kudu能够比Hbase更好的支持查询操作。
  • 机器学习&数据挖掘
    机器学习和数据挖掘的中间结果往往需要高吞吐量的批量写入和读取,同时会有少量的随机读写操作。Kudu的设计可以很好地满足这些中间结果的存储需求。
  • 与历史遗产数据共存
    在工业界实际生产环境中,往往有大量的历史遗产数据。Impala可以同时支持HDFS、Kudu等多个底层存储引擎,这个特性使得在使用的Kudu的同时,不必把所有的数据都迁移到Kudu。
  • Kudu+Impala为实时数据仓库存储提供了良好的解决方案。这套架构在支持随机读写的同时还能保持良好的Scan性能,同时其对Spark等流式计算框架有官方的客户端支持。这些特性意味着数据可以从Spark实时计算中实时的写入Kudu,上层的Impala提供BI分析SQL查询,对于数据挖掘和算法等需求可以在Spark迭代计算框架上直接操作Kudu底层数据。