特点
真正的列式数据库管理系统 除了数据本身外不存在其他额外的数据,避免了在值旁边存储它们的长度«number»
完备的DBMS功能 允许在运行时创建表和数据库、加载数据和运行查询,而无需重新配置或重启服务。
数据压缩 除了在磁盘空间和CPU消耗之间进行不同权衡的高效通用压缩编解码器之外,ClickHouse还提供针对特定类型数据的专用编解码器,这使得ClickHouse能够与更小的数据库(如时间序列数据库)竞争并超越它们。
数据的磁盘存储 ClickHouse被设计用于工作在传统磁盘上的系统,它提供每GB更低的存储成本,但如果可以使用SSD和内存,它也会合理的利用这些资源。
多核心并行处理 ClickHouse会使用服务器上一切可用的资源,从而以最自然的方式并行处理大型查询。
多服务器分布式处理 在ClickHouse中,数据可以保存在不同的shard上,每一个shard都由一组用于容错的replica组成,查询可以并行地在所有shard上进行处理。这些对用户来说是透明的
支持SQL ClickHouse支持一种基于SQL的声明式查询语言 ,它在许多情况下与ANSI SQL标准 相同。 支持的查询GROUP BY, ORDER BY, FROM, JOIN, IN以及非相关子查询。 相关(依赖性)子查询和窗口函数暂不受支持,但将来会被实现。
向量引擎 为了高效的使用CPU,数据不仅仅按列存储,同时还按向量(列的一部分)进行处理,这样可以更加高效地使用CPU。
实时的数据更新 ClickHouse支持在表中定义主键。为了使查询能够快速在主键中进行范围查找,数据总是以增量的方式有序的存储在MergeTree中。因此,数据可以持续不断地高效的写入到表中,并且写入的过程中不会存在任何加锁的行为。
索引 按照主键对数据进行排序,这将帮助ClickHouse在几十毫秒以内完成对数据特定值或范围的查找。
支持近似计算 ClickHouse提供各种各样在允许牺牲数据精度的情况下对查询进行加速的方法:
用于近似计算的各类聚合函数,如:distinct values, medians, quantiles
基于数据的部分样本进行近似查询。这时,仅会从磁盘检索少部分比例的数据。
不使用全部的聚合条件,通过随机选择有限个数据聚合条件进行聚合。这在数据聚合条件满足某些分布条件下,在提供相当准确的聚合结果的同时降低了计算资源的使用。
Adaptive Join Algorithm ClickHouse支持自定义JOIN多个表,它更倾向于散列连接算法,如果有多个大表,则使用合并-连接算法
支持数据复制和数据完整性 ClickHouse使用异步的多主复制技术。当数据被写入任何一个可用副本后,系统会在后台将数据分发给其他副本,以保证系统在不同副本上保持相同的数据。在大多数情况下ClickHouse能在故障后自动恢复,在一些少数的复杂情况下需要手动恢复。 更多信息,参见 数据复制。
表引擎 MergeTree MergeTree这个名词是在我们耳熟能详的LSM Tree之上做减法而来——去掉了MemTable和Log
适用于高负载,最强大的表引擎
同构后续的后台数据处理,快速插入数据,然后应用规则在后台合并这些部分
支持数据复制、分区、辅助数据跳过索引以及其他引擎不支持的其他功能
Log
轻量级引擎,功能最少。当您需要快速编写许多小表(最多约100万行)并在以后作为一个整体读取它们时,它们是最有效的。
Integration Engines
Special Engines(特殊引擎,不知道如何分类,ClickHouse特有的)
高阶函数 Retention 留存分析 windowFunnel 漏斗查询 sequenceCount 关键链路分析
集群架构 ClickHouse 不同于 Elasticsearch、HDFS 这类主从架构的分布式系统,它采用多主(无中心)架构,集群中的每个节点角色对等,客户端访问任意一个节点都能得到相同的效果。
ClickHouse 借助分片将数据进行横向切分,而分片依赖集群,每个集群由 1 到多个分片组成,每个分片对应了 CH 的 1 个服务节点;分片数量的上限取决与节点数量(1 个分片只能对应 1 个服务节点 )。
但是 ClickHouse 并不像其他分布式系统那样,拥有高度自动化的分片功能;CH 提供了本地表与分布式表的概念;一张本地表等同于一个数据分片。而分布式表是张逻辑表,本身不存储任何数据,它是本地表的访问代理,其作用类似分库中间件。借助分布式表,能够代理访问多个数据分片,从而实现分布式查询。当然,也可以在应用层实现数据分发。
ClickHouse 同时支持数据副本,其副本概念与 Elasticsearch 类似,但在 CH 中分片其实是一种逻辑概念,其物理承载是由副本承担的。
ClickHouse 的数据副本一般通过 ReplicatedMergeTree 复制表系列引擎实现,副本之间借助 ZooKeeper 实现数据的一致性。此外也可通过分布式表负责同时进行分片和副本的数据写入工作。
存储组织结构
分区目录 分区目录命名 多个分区目录组成一个分区,目录命名规则如下:
PartitionId_MinBlockNum_MaxBlockNum_Level
PartitionID:分区id,例如20210301。 MinBlockNum:最小分区块编号,自增类型,从1开始向上递增。每产生一个新的目录分区就向上递增一个数字。 MaxBlockNum:最大分区块编号,新创建的分区MinBlockNum等于MaxBlockNum的编号。 Level:合并的层级,被合并的次数。合并次数越多,层级值越大。
分区目录合并
数据 目录内文件 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 *── 200002 _1_1_0 * *── Birthday.bin * *── Birthday.mrk2 * *── checksums.txt * *── columns .txt * *── count.txt * *── Id.bin * *── Id.mrk2 * *── minmax_Birthday.idx * *── Name.bin * *── Name.mrk2 * *── partition.dat * *── primary.idx *── detached *── format_version.txt
文件名
描述
作用
primary.idx
索引文件
用于存放稀疏索引
[Column].mrk2
标记文件
保存了bin文件中数据的偏移信息,用于建立primary.idx和[Column].bin文件之间的映射
[Column].bin
数据文件
存储数据,默认使用lz4压缩存储
partition.dat
分区文件
用于保存分区表达式生成的值
minmax_[Column].idx
minmax索引
用于记录当前分区下分区字段的最小最大值
columns.txt
列信息文件
用于保存表的列字段信息
count.txt
计数文件
用于记录当前分区目录下数据的总行数
checksums.txt
校验文件
存放以上各个文件的size以及哈希值,用于快速校验文件的完整性
primary.idx结构 0cd44d48-2323-4aa9-9020-b22c51f58395.png
.mrk2 54e7d7ec-10af-4e22-9f93-dac77d4b9342.png
Offset in compressed file,8 Bytes,代表该标记指向的压缩数据块在bin文件中的偏移量。
Offset in decompressed block,8 Bytes,代表该标记指向的数据在解压数据块中的偏移量。
Rows count,8 Bytes,行数,通常情况下其等于index_granularity。86c3005c-373c-4ca9-91ee-9f030707baf3.png
.bin 409a3098-cfe5-4da7-b410-7912e7532d91.png
Checksum ,16 Bytes,用于对后面的数据进行校验。
Compression algorithm ,1 Byte,默认是LZ4,编号为0x82。
Compressed size ,4 Bytes,其值等于Compression algorithm + Compressed size + Decompressed size + Compressed data的长度
Decompressed size ,4 Bytes,数据解压缩后的长度。
Compressed data ,压缩数据,长度为Compressed size - 9。
索引过程 12b5387a-cb0a-4955-882a-8c8fcb846f18.png 691d5682-433f-42f3-aac8-cb70d4c78a1e.png
二级索引(跳树索引) 一级索引决定了数据排布的方式,但是当满足最左前缀原则时,数据查询需要扫描全部文件,这时候可以借助二级索引进行加速。
minmax minmax顾名思义就是和分区目录下的 minmax_{column_name}.idx 文件类似,只不过不再是只有一个min/max值,例如上面的minmax跳数索引 a,表示每隔 3 * index_granularity 的区间就会记录一次 u64 * i32 和 s 的最大最小值。当我们通过s查询数据时,可以先基于minmax值判断,从而跳过大多数不需要扫描的索引粒度。
set(max_rows) 保存指定表达式的去重值,尤其是对于那些重复性很高的列,例如性别、年龄段等,max_rows 参数表示在一个索引粒度内,最多记录不超过 max_rows 行,即不多于 max_rows 个去重值,max_rows=0 表示不限制。
ngrambf_v1(n, size_of_bloom_filter_in_bytes, number_of_hash_functions, random_seed) 存储一个包含数据块中所有 n元短语(ngram) 的布隆过滤器(Bloom filter)。对String, FixedString 和 Map类型数据有效,可用于优化 EQUALS, LIKE 和 IN表达式。
MergeTree 建表语句 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 CREATE TABLE [IF NOT EXISTS ] [db.]table_name [ON CLUSTER cluster]( name1 [type1] [DEFAULT | MATERIALIZED| ALIAS expr1] [TTL expr1], name2 [type2] [DEFAULT | MATERIALIZED| ALIAS expr2] [TTL expr2], ... INDEX index_name1 expr1 TYPE type1(...) GRANULARITY value1, INDEX index_name2 expr2 TYPE type2(...) GRANULARITY value2, ... PROJECTION projection_name_1 (SELECT < COLUMN LIST EXPR> [GROUP BY ] [ORDER BY ]), PROJECTION projection_name_2 (SELECT < COLUMN LIST EXPR> [GROUP BY ] [ORDER BY ]) ) ENGINE = MergeTree() ORDER BY expr,column1,cloumn2[PARTITION BY expr] [PRIMARY KEY expr] [SAMPLE BY expr] [TTL expr [DELETE | TO DISK 'xxx' | TO VOLUME 'xxx' [, ...] ] [WHERE conditions] [GROUP BY key_expr [SET v1 = aggr_func(v1) [, v2 = aggr_func(v2) ...]] ] ] [SETTINGS name= value , ...]
[必填选项] ENGINE:引擎名字,MergeTree引擎无参数。 ORDER BY:排序键,可以由一列或多列组成,决定了数据以何种方式进行排序,例如ORDER BY(CounterID, EventDate)。如果没有显示指定PRIMARY KEY,那么将使用ORDER BY作为PRIMARY KEY。通常只指定ORDER BY即可。
[选填选项] PARTITION BY:分区键,指明表中的数据以何种规则进行分区。分区是在一个表中通过指定的规则划分而成的逻辑数据集。分区可以按任意标准进行,如按月、按日或按事件类型。为了减少需要操作的数据,每个分区都是分开存储的。 PRIMARY KEY:主键,设置后会按照主键生成一级索引(primary.idx),数据会依据索引的设置进行排序,从而加速查询性能。默认情况下,PRIMARY KEY与ORDER BY设置相同,所以通常情况下直接使用ORDER BY设置来替代主键设置。 SAMPLE BY:数据采样设置,如果显示配置了该选项,那么主键配置中也应该包括此配置。例如 ORDER BY CounterID / EventDate / intHash32(UserID)、SAMPLE BY intHash32(UserID)。 TTL:数据存活时间,可以为某一字段列或者一整张表设置TTL,设置中必须包含Date或DateTime字段类型。如果设置在列上,那么会删除字段中过期的数据。如果设置的是表级的TTL,那么会删除表中过期的数据。如果设置了两种类型,那么按先到期的为准。例如,TTL createtime + INTERVAL 1 DAY,即一天后过期。使用场景包括定期删除数据,或者定期将数据进行归档。 index_granularity:索引间隔粒度。MergeTree索引为稀疏索引,每index_granularity个数据产生一条索引。index_granularity默认设置为8092。 enable_mixed_granularity_parts:是否启动index_granularity_bytes来控制索引粒度大小。 index_granularity_bytes:索引粒度,以字节为单位,默认10Mb。 merge_max_block_size:数据块合并最大记录个数,默认8192。 merge_with_ttl_timeout:合并频率最小时间间隔,默认1天。
ReplacingMergeTree 该引擎适合于经常要根据’主键’进行数据更新的数据(upsert),主键加引号是因为,其实是根据order by定义的字段而不是根据primary key
的字段去重的. 更新是非实时的,只有在分区目录合并或者手动触发时会去重
SummingMergeTree,AggregatingMergeTree 预聚合表引擎,此引擎适合于要查询聚合结果而不关心明细数据的场景,比如查询的是每个人月的销量综合,而不是每一单的实际销量. SummingMergeTree只能计算sum,AggregatingMergeTree是升级版,支持多种聚合算法。但是比较复杂,其插入数据必须要以 insert into select的方式不能是insert into values的形式,通常作为雾化表使用。 例子:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 CREATE TABLE mydatabase.amTest( `id` Int8, `name` String, `code` AggregateFunction(uniq, String), `date ` Date , `score` AggregateFunction(sum, Int16), `score2` AggregateFunction(avg, Int16) ) ENGINE = AggregatingMergeTree PARTITION BY date ORDER BY (id, name);CREATE MATERIALIZED VIEW amTestENGINE = AggregatingMergeTree PARTITION BY date primary key(id,name)ORDER BY (id, name,sex)AS SELECT id,name,date ,uniqState(code),sumState(score) as score,avgState(score) as score2 FROM xxx GROUP BY id,name;
CollapsingMergeTree,VersionedCollapsingMergeTree 这种引擎的适应场景在于有些时候需要删除数据或者更新数据。很少使用。
分布式 ClickHouse学习系列之四【副本&分片部署说明】
集群配置 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 <remote_servers> <ck_cluster> <shard> <weight>1</weight> <internal_replication>true</internal_replication> <replica> <host>10.161.4.40</host> <port>9000</port> <user>default</user> <password>clickhouse9z</password> </replica> </shard> <shard> <weight>1</weight> <internal_replication>true</internal_replication> <replica> <host>10.161.4.41</host> <port>9000</port> <user>default</user> <password>clickhouse9z</password> </replica> </shard> <shard> <weight>1</weight> <internal_replication>true</internal_replication> <replica> <host>10.161.4.42</host> <port>9000</port> <user>default</user> <password>clickhouse9z</password> </replica> </shard> </ck_cluster> </remote_servers>
ReplicatedMergeTree 1 2 3 4 5 6 7 8 9 10 11 12 13 CREATE TABLE IF NOT EXISTS data.event_local ON CLUSTER ck_cluster( `event` String, `user_id` Int64, `distinct_id` String, `date ` Date , `time ` DateTime, `timezone_offset` Float64, `logdate` Date ) ENGINE = ReplicatedMergeTree('/clickhouse/tables/{shard}/event' ,'{replica}' ) PARTITION BY logdateORDER BY (event,user_id,receive_time)sample by intHash32(user_id);
我们在创建表的时候指定了ReplicatedMergeTree(xxxx)
,里面传递了两个参数,我们对这两个参数一一描述
/clickhouse/tables/
这一部分指定的是在ZK上创建的路径地址,可随意变换只要记得即可
{shard}
指的是分片的标志,同一个分片内的所有机器应该保持相同。建议使用使用的是集群名+分片名的配置也就是{layer}-{shard}
{replica}
参数建议在macros
配置成机器的hostname,因为每台机器的hostname都是不一样的,因此就能确保每个表的识别符都是唯一的了
当使用ON CLUSTER创建表时,{shard}和{replica}clickhouse会自动填充,而且在集群内任意一台机器执行即可,不需要在所有机器执行
Distributed 分布式引擎本身不存储数据 , 但可以在多个服务器上进行分布式查询。 读是自动并行的。读取时,远程服务器表的索引(如果有的话)会被使用。
例:event_local
1 2 3 CREATE TABLE IF NOT EXISTS data.event ON CLUSTER ck_clusterAS data.event_localENGINE = Distributed(ck_cluster, data, event_local,int64hash(user_id));
GLOBAL & LOCAL 使用distributed表时,查询会下推到各个分片执行,多个结果再UNION到一起。如果在查询中使用子查询,通常要使用GLOBAL参数(GLOBAL IN/GLOBAL JOIN)来保证子查询数据的汇总。ClickHouse分布式IN & JOIN 查询的避坑指南
#高阶函数ClickHouse 高阶函数
数据导入 Datax
{
"job":
{
"setting":
{
"speed":
{
"channel":10
},
"errorLimit":
{
"record": 0,
"percentage": 0.02
}
},
"content":
[
{
"reader":
{
"name": "hdfsreader",
"parameter":
{
"path": "/user/hive/warehouse/data.db/event/logdate=20220518",
"defaultFS": "hdfs://x.x.x.x:xxxx",
"column":
[
{"index": 0, "type": "string"}, {"index": 1, "type": "long"}, {"index": 2, "type": "string"}, {"index": 3, "type": "date"}, {"index": 4, "type": "date"} {"value": "20220518", "type": "string"}
],
"fileType": "orc",
"encoding": "UTF-8",
"fieldDelimiter": ","
}
},
"writer":
{
"name": "clickhousewriter",
"parameter":
{
"username": "default",
"password": "clickhouse9z",
"column":
[
"event",
"user_id",
"distinct_id",
"date",
"logdate"
],
"connection":
[
{
"jdbcUrl": "jdbc:clickhouse://x.x.x.x:xxxx/data",
"table":
[
"event"
]
}
]
}
}
}
]
}
}