0%

Clickhouse介绍

特点

  • 真正的列式数据库管理系统
    除了数据本身外不存在其他额外的数据,避免了在值旁边存储它们的长度«number»

  • 完备的DBMS功能
    允许在运行时创建表和数据库、加载数据和运行查询,而无需重新配置或重启服务。

  • 数据压缩
    除了在磁盘空间和CPU消耗之间进行不同权衡的高效通用压缩编解码器之外,ClickHouse还提供针对特定类型数据的专用编解码器,这使得ClickHouse能够与更小的数据库(如时间序列数据库)竞争并超越它们。

  • 数据的磁盘存储
    ClickHouse被设计用于工作在传统磁盘上的系统,它提供每GB更低的存储成本,但如果可以使用SSD和内存,它也会合理的利用这些资源。

  • 多核心并行处理
    ClickHouse会使用服务器上一切可用的资源,从而以最自然的方式并行处理大型查询。

  • 多服务器分布式处理
    在ClickHouse中,数据可以保存在不同的shard上,每一个shard都由一组用于容错的replica组成,查询可以并行地在所有shard上进行处理。这些对用户来说是透明的

  • 支持SQL
    ClickHouse支持一种基于SQL的声明式查询语言,它在许多情况下与ANSI SQL标准相同。
    支持的查询GROUP BY, ORDER BY, FROM, JOIN, IN以及非相关子查询。
    相关(依赖性)子查询和窗口函数暂不受支持,但将来会被实现。

  • 向量引擎
    为了高效的使用CPU,数据不仅仅按列存储,同时还按向量(列的一部分)进行处理,这样可以更加高效地使用CPU。

  • 实时的数据更新
    ClickHouse支持在表中定义主键。为了使查询能够快速在主键中进行范围查找,数据总是以增量的方式有序的存储在MergeTree中。因此,数据可以持续不断地高效的写入到表中,并且写入的过程中不会存在任何加锁的行为。

  • 索引
    按照主键对数据进行排序,这将帮助ClickHouse在几十毫秒以内完成对数据特定值或范围的查找。

  • 支持近似计算
    ClickHouse提供各种各样在允许牺牲数据精度的情况下对查询进行加速的方法:

    1. 用于近似计算的各类聚合函数,如:distinct values, medians, quantiles
    2. 基于数据的部分样本进行近似查询。这时,仅会从磁盘检索少部分比例的数据。
    3. 不使用全部的聚合条件,通过随机选择有限个数据聚合条件进行聚合。这在数据聚合条件满足某些分布条件下,在提供相当准确的聚合结果的同时降低了计算资源的使用。
  • Adaptive Join Algorithm
    ClickHouse支持自定义JOIN多个表,它更倾向于散列连接算法,如果有多个大表,则使用合并-连接算法

  • 支持数据复制和数据完整性
    ClickHouse使用异步的多主复制技术。当数据被写入任何一个可用副本后,系统会在后台将数据分发给其他副本,以保证系统在不同副本上保持相同的数据。在大多数情况下ClickHouse能在故障后自动恢复,在一些少数的复杂情况下需要手动恢复。
    更多信息,参见 数据复制。

  • 表引擎
    MergeTree
    MergeTree这个名词是在我们耳熟能详的LSM Tree之上做减法而来——去掉了MemTable和Log

    • 适用于高负载,最强大的表引擎
    • 同构后续的后台数据处理,快速插入数据,然后应用规则在后台合并这些部分
    • 支持数据复制、分区、辅助数据跳过索引以及其他引擎不支持的其他功能

    Log

    • 轻量级引擎,功能最少。当您需要快速编写许多小表(最多约100万行)并在以后作为一个整体读取它们时,它们是最有效的。

    Integration Engines

    • 与其他数据存储和处理系统通信的引擎。

    Special Engines(特殊引擎,不知道如何分类,ClickHouse特有的)

  • 高阶函数
    Retention 留存分析
    windowFunnel 漏斗查询
    sequenceCount 关键链路分析

集群架构

ClickHouse 不同于 Elasticsearch、HDFS 这类主从架构的分布式系统,它采用多主(无中心)架构,集群中的每个节点角色对等,客户端访问任意一个节点都能得到相同的效果。

ClickHouse 借助分片将数据进行横向切分,而分片依赖集群,每个集群由 1 到多个分片组成,每个分片对应了 CH 的 1 个服务节点;分片数量的上限取决与节点数量(1 个分片只能对应 1 个服务节点)。

但是 ClickHouse 并不像其他分布式系统那样,拥有高度自动化的分片功能;CH 提供了本地表与分布式表的概念;一张本地表等同于一个数据分片。而分布式表是张逻辑表,本身不存储任何数据,它是本地表的访问代理,其作用类似分库中间件。借助分布式表,能够代理访问多个数据分片,从而实现分布式查询。当然,也可以在应用层实现数据分发。

ClickHouse 同时支持数据副本,其副本概念与 Elasticsearch 类似,但在 CH 中分片其实是一种逻辑概念,其物理承载是由副本承担的。

ClickHouse 的数据副本一般通过 ReplicatedMergeTree 复制表系列引擎实现,副本之间借助 ZooKeeper 实现数据的一致性。此外也可通过分布式表负责同时进行分片和副本的数据写入工作。

263edcd9-aad5-4eca-8e42-47ae63552133.png

存储组织结构

245f590d-0568-40a1-b279-045fabff5e21.png

分区目录

分区目录命名

多个分区目录组成一个分区,目录命名规则如下:

PartitionId_MinBlockNum_MaxBlockNum_Level

PartitionID:分区id,例如20210301。
MinBlockNum:最小分区块编号,自增类型,从1开始向上递增。每产生一个新的目录分区就向上递增一个数字。
MaxBlockNum:最大分区块编号,新创建的分区MinBlockNum等于MaxBlockNum的编号。
Level:合并的层级,被合并的次数。合并次数越多,层级值越大。

69e1520c-0368-47a1-9b4a-4e61807ad754.png

分区目录合并

c36bff66-8fbe-4b44-afe5-970e0533000a.png

数据

目录内文件

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
*── 200002_1_1_0
* *── Birthday.bin
* *── Birthday.mrk2
* *── checksums.txt
* *── columns.txt
* *── count.txt
* *── Id.bin
* *── Id.mrk2
* *── minmax_Birthday.idx
* *── Name.bin
* *── Name.mrk2
* *── partition.dat
* *── primary.idx
*── detached
*── format_version.txt
文件名 描述 作用
primary.idx 索引文件 用于存放稀疏索引
[Column].mrk2 标记文件 保存了bin文件中数据的偏移信息,用于建立primary.idx和[Column].bin文件之间的映射
[Column].bin 数据文件 存储数据,默认使用lz4压缩存储
partition.dat 分区文件 用于保存分区表达式生成的值
minmax_[Column].idx minmax索引 用于记录当前分区下分区字段的最小最大值
columns.txt 列信息文件 用于保存表的列字段信息
count.txt 计数文件 用于记录当前分区目录下数据的总行数
checksums.txt 校验文件 存放以上各个文件的size以及哈希值,用于快速校验文件的完整性

primary.idx结构

0cd44d48-2323-4aa9-9020-b22c51f58395.png

.mrk2

54e7d7ec-10af-4e22-9f93-dac77d4b9342.png

  1. Offset in compressed file,8 Bytes,代表该标记指向的压缩数据块在bin文件中的偏移量。
  2. Offset in decompressed block,8 Bytes,代表该标记指向的数据在解压数据块中的偏移量。
  3. Rows count,8 Bytes,行数,通常情况下其等于index_granularity。
    86c3005c-373c-4ca9-91ee-9f030707baf3.png

.bin

409a3098-cfe5-4da7-b410-7912e7532d91.png

  1. Checksum,16 Bytes,用于对后面的数据进行校验。
  2. Compression algorithm,1 Byte,默认是LZ4,编号为0x82。
  3. Compressed size,4 Bytes,其值等于Compression algorithm + Compressed size + Decompressed size + Compressed data的长度
  4. Decompressed size,4 Bytes,数据解压缩后的长度。
  5. Compressed data,压缩数据,长度为Compressed size - 9。

索引过程

12b5387a-cb0a-4955-882a-8c8fcb846f18.png
691d5682-433f-42f3-aac8-cb70d4c78a1e.png

二级索引(跳树索引)

一级索引决定了数据排布的方式,但是当满足最左前缀原则时,数据查询需要扫描全部文件,这时候可以借助二级索引进行加速。

  1. minmax
    minmax顾名思义就是和分区目录下的 minmax_{column_name}.idx 文件类似,只不过不再是只有一个min/max值,例如上面的minmax跳数索引 a,表示每隔 3 * index_granularity 的区间就会记录一次 u64 * i32 和 s 的最大最小值。当我们通过s查询数据时,可以先基于minmax值判断,从而跳过大多数不需要扫描的索引粒度。
  2. set(max_rows)
    保存指定表达式的去重值,尤其是对于那些重复性很高的列,例如性别、年龄段等,max_rows 参数表示在一个索引粒度内,最多记录不超过 max_rows 行,即不多于 max_rows 个去重值,max_rows=0 表示不限制。
  3. ngrambf_v1(n, size_of_bloom_filter_in_bytes, number_of_hash_functions, random_seed)
    存储一个包含数据块中所有 n元短语(ngram) 的布隆过滤器(Bloom filter)。对String, FixedString 和 Map类型数据有效,可用于优化 EQUALS, LIKE 和 IN表达式。

MergeTree

建表语句

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
CREATE TABLE [IF NOT EXISTS] [db.]table_name [ON CLUSTER cluster]
(
name1 [type1] [DEFAULT|MATERIALIZED|ALIAS expr1] [TTL expr1],
name2 [type2] [DEFAULT|MATERIALIZED|ALIAS expr2] [TTL expr2],
...
INDEX index_name1 expr1 TYPE type1(...) GRANULARITY value1,
INDEX index_name2 expr2 TYPE type2(...) GRANULARITY value2,
...
PROJECTION projection_name_1 (SELECT <COLUMN LIST EXPR> [GROUP BY] [ORDER BY]),
PROJECTION projection_name_2 (SELECT <COLUMN LIST EXPR> [GROUP BY] [ORDER BY])
) ENGINE = MergeTree()
ORDER BY expr,column1,cloumn2
[PARTITION BY expr]
[PRIMARY KEY expr]
[SAMPLE BY expr]
[TTL expr
[DELETE|TO DISK 'xxx'|TO VOLUME 'xxx' [, ...] ]
[WHERE conditions]
[GROUP BY key_expr [SET v1 = aggr_func(v1) [, v2 = aggr_func(v2) ...]] ] ]
[SETTINGS name=value, ...]

[必填选项]
ENGINE:引擎名字,MergeTree引擎无参数。
ORDER BY:排序键,可以由一列或多列组成,决定了数据以何种方式进行排序,例如ORDER BY(CounterID, EventDate)。如果没有显示指定PRIMARY KEY,那么将使用ORDER BY作为PRIMARY KEY。通常只指定ORDER BY即可。

[选填选项]
PARTITION BY:分区键,指明表中的数据以何种规则进行分区。分区是在一个表中通过指定的规则划分而成的逻辑数据集。分区可以按任意标准进行,如按月、按日或按事件类型。为了减少需要操作的数据,每个分区都是分开存储的。
PRIMARY KEY:主键,设置后会按照主键生成一级索引(primary.idx),数据会依据索引的设置进行排序,从而加速查询性能。默认情况下,PRIMARY KEY与ORDER BY设置相同,所以通常情况下直接使用ORDER BY设置来替代主键设置。
SAMPLE BY:数据采样设置,如果显示配置了该选项,那么主键配置中也应该包括此配置。例如 ORDER BY CounterID / EventDate / intHash32(UserID)、SAMPLE BY intHash32(UserID)。
TTL:数据存活时间,可以为某一字段列或者一整张表设置TTL,设置中必须包含Date或DateTime字段类型。如果设置在列上,那么会删除字段中过期的数据。如果设置的是表级的TTL,那么会删除表中过期的数据。如果设置了两种类型,那么按先到期的为准。例如,TTL createtime + INTERVAL 1 DAY,即一天后过期。使用场景包括定期删除数据,或者定期将数据进行归档。
index_granularity:索引间隔粒度。MergeTree索引为稀疏索引,每index_granularity个数据产生一条索引。index_granularity默认设置为8092。
enable_mixed_granularity_parts:是否启动index_granularity_bytes来控制索引粒度大小。
index_granularity_bytes:索引粒度,以字节为单位,默认10Mb。
merge_max_block_size:数据块合并最大记录个数,默认8192。
merge_with_ttl_timeout:合并频率最小时间间隔,默认1天。

ReplacingMergeTree

该引擎适合于经常要根据’主键’进行数据更新的数据(upsert),主键加引号是因为,其实是根据order by定义的字段而不是根据primary key的字段去重的.
更新是非实时的,只有在分区目录合并或者手动触发时会去重

SummingMergeTree,AggregatingMergeTree

预聚合表引擎,此引擎适合于要查询聚合结果而不关心明细数据的场景,比如查询的是每个人月的销量综合,而不是每一单的实际销量.
SummingMergeTree只能计算sum,AggregatingMergeTree是升级版,支持多种聚合算法。但是比较复杂,其插入数据必须要以 insert into select的方式不能是insert into values的形式,通常作为雾化表使用。
例子:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
CREATE TABLE mydatabase.amTest
(
`id` Int8,
`name` String,
`code` AggregateFunction(uniq, String),
`date` Date,
`score` AggregateFunction(sum, Int16),
`score2` AggregateFunction(avg, Int16)
)
ENGINE = AggregatingMergeTree
PARTITION BY date
ORDER BY (id, name);

CREATE MATERIALIZED VIEW amTest
ENGINE = AggregatingMergeTree
PARTITION BY date
primary key(id,name)
ORDER BY (id, name,sex)
AS SELECT
id,name,date,uniqState(code),sumState(score) as score,avgState(score) as score2
FROM xxx GROUP BY id,name;

CollapsingMergeTree,VersionedCollapsingMergeTree

这种引擎的适应场景在于有些时候需要删除数据或者更新数据。很少使用。

分布式

ClickHouse学习系列之四【副本&分片部署说明】

集群配置

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
 <remote_servers>
<ck_cluster>
<shard>
<weight>1</weight>
<internal_replication>true</internal_replication>
<replica>
<host>10.161.4.40</host>
<port>9000</port>
<user>default</user>
<password>clickhouse9z</password>
</replica>
</shard>
<shard>
<weight>1</weight>
<internal_replication>true</internal_replication>
<replica>
<host>10.161.4.41</host>
<port>9000</port>
<user>default</user>
<password>clickhouse9z</password>
</replica>
</shard>
<shard>
<weight>1</weight>
<internal_replication>true</internal_replication>
<replica>
<host>10.161.4.42</host>
<port>9000</port>
<user>default</user>
<password>clickhouse9z</password>
</replica>
</shard>
</ck_cluster>

</remote_servers>

ReplicatedMergeTree

1
2
3
4
5
6
7
8
9
10
11
12
13
CREATE TABLE IF NOT EXISTS data.event_local ON CLUSTER ck_cluster(
`event` String,
`user_id` Int64,
`distinct_id` String,
`date` Date,
`time` DateTime,
`timezone_offset` Float64,
`logdate` Date
)
ENGINE = ReplicatedMergeTree('/clickhouse/tables/{shard}/event','{replica}')
PARTITION BY logdate
ORDER BY (event,user_id,receive_time)
sample by intHash32(user_id);

我们在创建表的时候指定了ReplicatedMergeTree(xxxx),里面传递了两个参数,我们对这两个参数一一描述

  • /clickhouse/tables/ 这一部分指定的是在ZK上创建的路径地址,可随意变换只要记得即可
  • {shard} 指的是分片的标志,同一个分片内的所有机器应该保持相同。建议使用使用的是集群名+分片名的配置也就是{layer}-{shard}
  • {replica} 参数建议在macros配置成机器的hostname,因为每台机器的hostname都是不一样的,因此就能确保每个表的识别符都是唯一的了

当使用ON CLUSTER创建表时,{shard}和{replica}clickhouse会自动填充,而且在集群内任意一台机器执行即可,不需要在所有机器执行

Distributed

分布式引擎本身不存储数据, 但可以在多个服务器上进行分布式查询。 读是自动并行的。读取时,远程服务器表的索引(如果有的话)会被使用。

例:event_local

1
2
3
CREATE TABLE IF NOT EXISTS data.event ON CLUSTER ck_cluster
AS data.event_local
ENGINE = Distributed(ck_cluster, data, event_local,int64hash(user_id));

GLOBAL & LOCAL

使用distributed表时,查询会下推到各个分片执行,多个结果再UNION到一起。如果在查询中使用子查询,通常要使用GLOBAL参数(GLOBAL IN/GLOBAL JOIN)来保证子查询数据的汇总。
ClickHouse分布式IN & JOIN 查询的避坑指南

#高阶函数
ClickHouse 高阶函数

数据导入

Datax

{
    "job":
    {
        "setting":
        {
            "speed":
            {
                "channel":10
            },
            "errorLimit":
            {
                "record": 0,
                "percentage": 0.02
            }
        },
        "content":
        [
            {
                "reader":
                {
                    "name": "hdfsreader",
                    "parameter":
                    {
                        "path": "/user/hive/warehouse/data.db/event/logdate=20220518",
                        "defaultFS": "hdfs://x.x.x.x:xxxx",
                        "column":
                        [
                           {"index": 0, "type": "string"}, {"index": 1, "type": "long"}, {"index": 2, "type": "string"}, {"index": 3, "type": "date"}, {"index": 4, "type": "date"} {"value": "20220518", "type": "string"}
                        ],
                        "fileType": "orc",
                        "encoding": "UTF-8",
                        "fieldDelimiter": ","
                    }
                },
                "writer":
                {
                    "name": "clickhousewriter",
                    "parameter":
                    {
                        "username": "default",
                        "password": "clickhouse9z",
                        "column":
                        [
                            "event",
                            "user_id",
                            "distinct_id",
                            "date",
                            "logdate"
                        ],
                        "connection":
                        [
                            {
                                "jdbcUrl": "jdbc:clickhouse://x.x.x.x:xxxx/data",
                                "table":
                                [
                                    "event"
                                ]
                            }
                        ]
                    }
                }
            }
        ]
    }
}