Learn about ClickHouse® engines, from MergeTree, Atomic Database to RocksDB.
Generally: the main engine in ClickHouse® is called MergeTree. It lets you store and process data on one server and get all the advantages of ClickHouse. Basic usage of MergeTree does not require any special configuration, and you can start using it ‘out of the box’.
But one server and one copy of data are not fault-tolerant - something can happen with the server itself, with datacenter availability, etc. So you need to have the replica(s) - i.e. server(s) with the same data and which can ‘substitute’ the original server at any moment.
To have an extra copy (replica) of your data you need to use the ReplicatedMergeTree engine. It can be used instead of the MergeTree engine, and you can always upgrade from MergeTree to ReplicatedMergeTree (and downgrade back) if you need to. To use it you need to have ZooKeeper installed and running. For tests, you can use one standalone ZooKeeper instance, but for production usage you should have a ZooKeeper ensemble of at least 3 servers.
When you use ReplicatedMergeTree, the inserted data is copied automatically to all the replicas, but every SELECT is executed on the single server you are connected to. So you can have 5 replicas of your data, but if you always connect to one replica, that traffic will not be ‘shared’ / ‘balanced’ automatically between all the replicas: one server will be loaded and the rest will generally do nothing. If you need to balance the load between multiple replicas, you can use the internal ‘load balancer’ mechanism provided by the Distributed engine of ClickHouse. As an alternative in that scenario you can work without a Distributed table, but with some external load balancer that will balance the requests between several replicas according to your specific rules or preferences, or just a cluster-aware client which will pick one of the servers at query time.
The Distributed engine does not store any data, but it can ‘point’ to the same ReplicatedMergeTree/MergeTree table on multiple servers. To use Distributed engine you need to configure <cluster> settings in your ClickHouse server config file.
So let’s say you have 3 replicas of the table my_replicated_data with the ReplicatedMergeTree engine. You can create a table with the Distributed engine called my_distributed_replicated_data which will ‘point’ to all 3 of those servers, and when you select from that my_distributed_replicated_data table the select will be forwarded to and executed on one of the replicas. So in that scenario, each replica will get 1/3 of the requests (but each request will still be fully executed on the one chosen replica).
All that is great, and will work well while one copy of your data is fitting on a single physical server, and can be processed by the resources of one server. When you have too much data to be stored/processed on one server - you need to use sharding (it’s just a way to split the data into smaller parts). Sharding is the mechanism also provided by Distributed engine.
With sharding, data is divided into parts (shards) according to some sharding key. You can just use random distribution (let’s say, throw a coin to decide on which of the servers the data should be stored), or you can use some ‘smarter’ sharding scheme, so that data connected to the same subject (let’s say, the same customer) is stored on one server, and data for another subject on another. In either case all the shards should be queried at the same time, and the ‘common’ result is calculated afterwards.
In ClickHouse each shard works independently and processes its part of the data; replication can work inside each shard. The Distributed engine is then used to query all the shards at the same time and combine the final result. So Distributed works as a load balancer inside each shard, and can combine the data coming from different shards into the ‘common’ result.
You can use Distributed table for inserts, in that case, it will pass the data to one of the shards according to the sharding key. Or you can insert to the underlying table on one of the shards bypassing the Distributed table.
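The routing by sharding key described above can be modelled in a few lines. This is a minimal sketch under assumptions, not ClickHouse code: the shard names, the weights, and the use of CRC32 as the sharding expression are all hypothetical, and the rule "remainder of the sharding value divided by the total weight selects a weight range" is a simplified model of how a Distributed table picks a shard on insert.

```python
import zlib

# Hypothetical cluster layout: shard name -> weight (names are illustrative only).
SHARD_WEIGHTS = {"shard_1": 1, "shard_2": 1, "shard_3": 2}


def sharding_value(customer_id: str) -> int:
    """Stable integer derived from the sharding key (CRC32 stands in for any hash)."""
    return zlib.crc32(customer_id.encode("utf-8"))


def pick_shard(value: int, weights: dict) -> str:
    """Map value % total_weight onto consecutive weight ranges, one range per shard."""
    remainder = value % sum(weights.values())
    upper = 0
    for shard, weight in weights.items():
        upper += weight
        if remainder < upper:
            return shard
    raise AssertionError("unreachable: remainder is always below the total weight")


def route(rows, weights):
    """Group rows by target shard, roughly as an insert through Distributed would."""
    batches = {shard: [] for shard in weights}
    for row in rows:
        batches[pick_shard(sharding_value(row["customer_id"]), weights)].append(row)
    return batches


rows = [{"customer_id": f"customer_{i % 5}", "amount": i} for i in range(20)]
batches = route(rows, SHARD_WEIGHTS)
for shard, batch in batches.items():
    print(shard, sorted({r["customer_id"] for r in batch}))
```

Because the shard depends only on the key, all rows of one customer land on the same shard, which is the ‘smarter’ scheme mentioned above; replacing `sharding_value` with a random number gives the coin-toss variant.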
Short summary
start with MergeTree
to have several copies of data use ReplicatedMergeTree
if your data is too big to fit / to be processed on one server - use sharding
to balance the load between replicas and to combine the result of selects from different shards - use a Distributed table.
P.S. Actually you can create replication without Zookeeper and ReplicatedMergeTree, just by using the Distributed table above MergeTree and internal_replication=false cluster setting, but in that case, there will be no guarantee that all the replicas will have 100% the same data, so I rather would not recommend that scenario.
In version 20.5, ClickHouse® first introduced database engine=Atomic.
Since version 20.10 it is a default database engine (before engine=Ordinary was used).
These 2 database engines differ in how they store data on the filesystem, and engine=Atomic resolves some of the issues that existed in engine=Ordinary.
engine=Atomic supports
non-blocking drop table / rename table
tables delete (&detach) async (wait for selects finish but invisible for new selects)
atomic drop table (all files / folders removed)
atomic table swap (table swap by “EXCHANGE TABLES t1 AND t2;”)
rename dictionary / rename database
unique automatic UUID paths in FS and ZK for Replicated
FAQ
Q. Data is not removed immediately
A. Use DROP TABLE t SYNC;
Or use the (user-level) parameter database_atomic_wait_for_drop_and_detach_synchronously.
It’s very important that the table will have the same UUID cluster-wide.
When the table is created using ON CLUSTER - all tables will get the same UUID automatically.
When it needs to be done manually (for example - you need to add one more replica), pick CREATE TABLE statement with UUID from one of the existing replicas.
SET show_table_uuid_in_table_create_query_if_not_nil = 1;
SHOW CREATE TABLE xxx;
/* or SELECT create_table_query FROM system.tables WHERE ... */
Q. Should I use Atomic or Ordinary for new setups?
All things inside ClickHouse itself should work smoothly with Atomic.
But some external tools - backup tools, things involving other kinds of direct manipulations with ClickHouse files & folders may have issues with Atomic.
The Ordinary layout on the filesystem is simpler, and the issues that Atomic addresses (lock-free renames and drops, atomic exchange of tables) are not so critical in most cases.
| | Ordinary | Atomic |
|---|---|---|
| filesystem layout | very simple | more complicated |
| external tool support (like clickhouse-backup) | good / mature | good / mature |
| some DDL queries (DROP / RENAME) may hang for a long time (waiting for some other things) | yes 👎 | no 👍 |
| Possibility to swap 2 tables | rename a to a_old, b to a, a_old to b; Operation is not atomic, and can break in the middle (while chances are low). | EXCHANGE TABLES t1 AND t2; Atomic, has no intermediate states. |
| uuid in zookeeper path | Not possible to use. The typical pattern is to add a version suffix to the zookeeper path when you need to create a new version of the same table. | You can use uuid in zookeeper paths. That requires some extra care when you expand the cluster, and makes zookeeper paths harder to map to the real table. But it allows any kind of manipulation on tables (rename, recreate with the same name, etc). |
| Materialized view without TO syntax (!we recommend using TO syntax always!) | .inner.mv_name The name is predictable, easy to match with the MV. | .inner_id.{uuid} The name is unpredictable, hard to match with the MV (may be problematic for MV chains and similar scenarios) |
ClickHouse® Server 22.8+ implements automatic conversion of the database engine from Ordinary to Atomic. Create an empty convert_ordinary_to_atomic file in the flags directory, and all Ordinary databases will be converted automatically on the next server start.
The conversion does not happen by itself during upgrades; you need to set the flag as explained in the server warning below:
Warnings:
* Server has databases (for example `test`) with Ordinary engine, which was deprecated. To convert this database to the new Atomic engine, create a flag /var/lib/clickhouse/flags/convert_ordinary_to_atomic and make sure that ClickHouse has write permission for it.
Example: sudo touch '/var/lib/clickhouse/flags/convert_ordinary_to_atomic' && sudo chmod 666 '/var/lib/clickhouse/flags/convert_ordinary_to_atomic'
Don’t forget to remove detached parts from all Ordinary databases, or you can get the error:
2025.01.28 11:34:57.510330 [ 7 ] {} <Error> Application: Code: 219. DB::Exception: Cannot drop: filesystem error: in remove: Directory not empty ["/var/lib/clickhouse/data/db/"]. Probably database contain some detached tables or metadata leftovers from Ordinary engine. If you want to remove all data anyway, try to attach database back and drop it again with enabled force_remove_data_recursively_
1.1.2 - How to Convert Atomic to Ordinary
The following instructions are an example on how to convert a database with the Engine type Atomic to a database with the Engine type Ordinary.
Warning
That can be used only for simple schemas. Schemas with MATERIALIZED views will require extra manipulations.
DROP DATABASE IF EXISTS atomic_db;
DROP DATABASE IF EXISTS ordinary_db;

CREATE DATABASE atomic_db engine=Atomic;
CREATE DATABASE ordinary_db engine=Ordinary;

CREATE TABLE atomic_db.x ENGINE = MergeTree ORDER BY tuple() AS system.numbers;
CREATE MATERIALIZED VIEW atomic_db.x_mv ENGINE = MergeTree ORDER BY tuple() AS SELECT * FROM atomic_db.x;
CREATE MATERIALIZED VIEW atomic_db.y_mv ENGINE = MergeTree ORDER BY tuple() AS SELECT * FROM atomic_db.x;
CREATE TABLE atomic_db.z ENGINE = MergeTree ORDER BY tuple() AS system.numbers;
CREATE MATERIALIZED VIEW atomic_db.z_mv TO atomic_db.z AS SELECT * FROM atomic_db.x;

INSERT INTO atomic_db.x SELECT * FROM numbers(100);

--- USE atomic_db;
---
--- Query id: 28af886d-a339-4e9c-979c-8bdcfb32fd95
---
--- ┌─name───────────────────────────────────────────┐
--- │ .inner_id.b7906fec-f4b2-455b-bf9b-2b18ca64842c │
--- │ .inner_id.bd32d79b-272d-4710-b5ad-bca78d09782f │
--- │ x │
--- │ x_mv │
--- │ y_mv │
--- │ z │
--- │ z_mv │
--- └────────────────────────────────────────────────┘
SELECT
    mv_storage.database,
    mv_storage.name,
    mv.database,
    mv.name
FROM system.tables AS mv_storage
LEFT JOIN system.tables AS mv ON substring(mv_storage.name, 11) = toString(mv.uuid)
WHERE mv_storage.name LIKE '.inner_id.%' AND mv_storage.database = 'atomic_db';

-- ┌─database──┬─name───────────────────────────────────────────┬─mv.database─┬─mv.name─┐
-- │ atomic_db │ .inner_id.81e1a67d-3d02-4b2a-be17-84d8626d2328 │ atomic_db │ y_mv │
-- │ atomic_db │ .inner_id.e428225c-982a-4859-919b-ba5026db101d │ atomic_db │ x_mv │
-- └───────────┴────────────────────────────────────────────────┴─────────────┴─────────┘
/* STEP 1: prepare rename statements, also to rename implicit mv storage table to explicit one */
SELECT
    if(
        t.name LIKE '.inner_id.%',
        'RENAME TABLE `' || t.database || '`.`' || t.name || '` TO `ordinary_db`.`' || mv.name || '_storage`;',
        'RENAME TABLE `' || t.database || '`.`' || t.name || '` TO `ordinary_db`.`' || t.name || '`;'
    )
FROM system.tables AS t
LEFT JOIN system.tables mv ON (substring(t.name, 11) = toString(mv.uuid) AND t.database = mv.database)
WHERE t.database = 'atomic_db' AND t.engine <> 'MaterializedView'
FORMAT TSVRaw;

-- RENAME TABLE `atomic_db`.`.inner_id.b7906fec-f4b2-455b-bf9b-2b18ca64842c` TO `ordinary_db`.`y_mv_storage`;
-- RENAME TABLE `atomic_db`.`.inner_id.bd32d79b-272d-4710-b5ad-bca78d09782f` TO `ordinary_db`.`x_mv_storage`;
-- RENAME TABLE `atomic_db`.`x` TO `ordinary_db`.`x`;
-- RENAME TABLE `atomic_db`.`z` TO `ordinary_db`.`z`;
/* STEP 2: prepare statements to reattach MV */
-- Can be done manually: pick existing MV definition (SHOW CREATE TABLE), and change it in the following way:
-- 1) add TO keyword 2) remove column names and engine settings after mv name
SELECT
    if(
        t.name LIKE '.inner_id.%',
        replaceRegexpOne(mv.create_table_query, '^CREATE MATERIALIZED VIEW ([^ ]+) .*? AS ', 'CREATE MATERIALIZED VIEW \\1 TO \\1_storage AS '),
        mv.create_table_query
    )
FROM system.tables AS mv
LEFT JOIN system.tables t ON (substring(t.name, 11) = toString(mv.uuid) AND t.database = mv.database)
WHERE mv.database = 'atomic_db' AND mv.engine = 'MaterializedView'
FORMAT TSVRaw;

-- CREATE MATERIALIZED VIEW atomic_db.x_mv TO atomic_db.x_mv_storage AS SELECT * FROM atomic_db.x
-- CREATE MATERIALIZED VIEW atomic_db.y_mv TO atomic_db.y_mv_storage AS SELECT * FROM atomic_db.x
/* STEP 3: stop inserts, fire the rename statements prepared at step 1 (hint: use clickhouse-client -mn) */
RENAME ...

/* STEP 4: ensure that only MaterializedView tables are left in the source db, and drop it. */
SELECT * FROM system.tables WHERE database = 'atomic_db' AND engine <> 'MaterializedView';
DROP DATABASE atomic_db;

/* STEP 5: rename the database back to the old name: */
DETACH DATABASE ordinary_db;

-- rename files / folders:
mv /var/lib/clickhouse/metadata/ordinary_db.sql /var/lib/clickhouse/metadata/atomic_db.sql
vi /var/lib/clickhouse/metadata/atomic_db.sql
mv /var/lib/clickhouse/metadata/ordinary_db /var/lib/clickhouse/metadata/atomic_db
mv /var/lib/clickhouse/data/ordinary_db /var/lib/clickhouse/data/atomic_db

-- attach database atomic_db;
ATTACH DATABASE atomic_db;

/* STEP 6: restore MV using statements created on STEP 2 */
1.2 - EmbeddedRocksDB & dictionary
RocksDB is faster than MergeTree on Key/Value queries because the MergeTree primary key index is sparse. It is probably possible to speed up MergeTree by reducing index_granularity.
NVMe disk is used for the tests.
The main feature of RocksDB is instant updates. You can update a row instantly (microseconds):
partition_id is quite simple (it just comes from your partitioning key).
What are block_numbers?
DROP TABLE IF EXISTS part_names;
create table part_names (date Date, n UInt8, m UInt8) engine=MergeTree PARTITION BY toYYYYMM(date) ORDER BY n;
insert into part_names VALUES (now(), 0, 0);
select name, partition_id, min_block_number, max_block_number, level, data_version from system.parts where table = 'part_names' and active;
┌─name─────────┬─partition_id─┬─min_block_number─┬─max_block_number─┬─level─┬─data_version─┐
│ 202203_1_1_0 │ 202203 │ 1 │ 1 │ 0 │ 1 │
└──────────────┴──────────────┴──────────────────┴──────────────────┴───────┴──────────────┘
insert into part_names VALUES (now(), 0, 0);
select name, partition_id, min_block_number, max_block_number, level, data_version from system.parts where table = 'part_names' and active;
┌─name─────────┬─partition_id─┬─min_block_number─┬─max_block_number─┬─level─┬─data_version─┐
│ 202203_1_1_0 │ 202203 │ 1 │ 1 │ 0 │ 1 │
│ 202203_2_2_0 │ 202203 │ 2 │ 2 │ 0 │ 2 │
└──────────────┴──────────────┴──────────────────┴──────────────────┴───────┴──────────────┘
insert into part_names VALUES (now(), 0, 0);
select name, partition_id, min_block_number, max_block_number, level, data_version from system.parts where table = 'part_names' and active;
┌─name─────────┬─partition_id─┬─min_block_number─┬─max_block_number─┬─level─┬─data_version─┐
│ 202203_1_1_0 │ 202203 │ 1 │ 1 │ 0 │ 1 │
│ 202203_2_2_0 │ 202203 │ 2 │ 2 │ 0 │ 2 │
│ 202203_3_3_0 │ 202203 │ 3 │ 3 │ 0 │ 3 │
└──────────────┴──────────────┴──────────────────┴──────────────────┴───────┴──────────────┘
As you can see, every insert creates a new incremental block_number, which is written in the part name both as <min_block_number> and <max_block_number> (and the level is 0, meaning that the part was never merged).
Block numbering works in the scope of a partition (for Replicated tables) or globally across all partitions (for plain MergeTree tables).
ClickHouse® always merges only continuous blocks, and new part names always refer to the minimum and maximum block numbers.
As you can see here, three parts (with block numbers 1, 2, 3) were merged and formed a new part with 1_3 in its name as the min/max block numbers.
The level gets incremented.
Now, even while the previous (merged) parts still exist in the filesystem for a while (as inactive), ClickHouse is smart enough to understand that the new part ‘covers’ the same range of blocks as the 3 parts of the previous ‘generation’.
There might be a fifth section in the part name: the data version.
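The naming rules above can be sketched as a small model. This is an illustration only, assuming the four-section name format `<partition_id>_<min_block>_<max_block>_<level>` (the optional data-version suffix is not handled); the `Part` class and the helper names are hypothetical and are not ClickHouse APIs.

```python
from dataclasses import dataclass


@dataclass(frozen=True)
class Part:
    partition_id: str
    min_block: int
    max_block: int
    level: int

    @property
    def name(self) -> str:
        return f"{self.partition_id}_{self.min_block}_{self.max_block}_{self.level}"


def parse_part_name(name: str) -> Part:
    """Parse '<partition_id>_<min_block>_<max_block>_<level>' (no data-version suffix)."""
    partition_id, min_block, max_block, level = name.rsplit("_", 3)
    return Part(partition_id, int(min_block), int(max_block), int(level))


def merge_parts(parts) -> Part:
    """Name the part produced by merging neighbouring parts of one partition."""
    ordered = sorted(parts, key=lambda p: p.min_block)
    if len({p.partition_id for p in ordered}) != 1:
        raise ValueError("parts must belong to the same partition")
    for left, right in zip(ordered, ordered[1:]):
        if right.min_block <= left.max_block:
            raise ValueError("block ranges must not overlap")
    return Part(
        ordered[0].partition_id,
        ordered[0].min_block,
        ordered[-1].max_block,
        max(p.level for p in ordered) + 1,
    )


def covers(outer: Part, inner: Part) -> bool:
    """True when `outer` spans the whole block range of `inner` in the same partition."""
    return (
        outer.partition_id == inner.partition_id
        and outer.min_block <= inner.min_block
        and inner.max_block <= outer.max_block
        and outer.level >= inner.level
    )


old_parts = [parse_part_name(n) for n in ("202203_1_1_0", "202203_2_2_0", "202203_3_3_0")]
merged = merge_parts(old_parts)
print(merged.name)
print(all(covers(merged, p) for p in old_parts))
```

The `covers` check is the reason inactive parts can stay on disk for a while without being read twice: any part whose block range lies inside an active part of a higher level is already represented by it.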
1.3.3 - How to pick an ORDER BY / PRIMARY KEY / PARTITION BY for the MergeTree family table
Optimizing ClickHouse® MergeTree tables
A good ORDER BY usually has 3 to 5 columns, from the lowest cardinality on the left (and the most important for filtering) to the highest cardinality (and less important for filtering).
Practical approach to create a good ORDER BY for a table:
Pick the columns you use in filtering always
The column that is the most important for filtering and has the lowest cardinality should be the left-most. Typically, it’s something like tenant_id
The next column has higher cardinality and is less important. It can sometimes be a rounded time, or site_id, or source_id, or group_id or something similar.
Repeat step 3 once again (or a few times)
If you already added all columns important for filtering and you’re still not addressing a single row with your pk - you can add more columns which can help to put similar records close to each other (to improve the compression)
If you have something like hierarchy / tree-like relations between the columns, put them in order from ‘root’ to ‘leaves’, for example (continent, country, cityname). This way ClickHouse® can do a lookup by country/city even if the continent is not specified (it will just ‘check all continents’)
special variants of MergeTree may require special ORDER BY to make the record unique etc.
For time series, it usually makes sense to put the timestamp as the last column in ORDER BY, which helps with putting the same data nearby for better locality. There are only 2 major patterns for timestamps in ORDER BY: (…, toStartOf(Day|Hour|…)(timestamp), …, timestamp) and (…, timestamp). The first one is useful when you often query a small part of a table partition (for example, the table is partitioned by month, and you read only 1-4 days 90% of the time).
There are exceptions to the “low cardinality first” rule related to compression ratio. For example, for data with a lot of repeated attributes in rows (like clickstream), ordering by session_id will benefit compression and reduce disk reads, while putting a low-cardinality column (like event type) in the first place makes compression and overall query time worse.
Some examples of good ORDER BY:
ORDER BY (tenantid, site_id, utm_source, clientid, timestamp)
ORDER BY (site_id, toStartOfHour(timestamp), sessionid, timestamp )
PRIMARY KEY (site_id, toStartOfHour(timestamp), sessionid)
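Why the most selective filtering column belongs on the left can be seen in a toy model. The sketch below is not how ClickHouse evaluates its index; it only sorts rows, cuts them into fixed-size granules, and counts how many granules contain at least one matching row, which is a lower bound on what has to be read. The granule size, the data, and the function name are assumptions made for the example.

```python
from itertools import product

GRANULE_ROWS = 100  # illustrative granule size, far below the real default


def granules_to_read(rows, order_by, predicate):
    """Sort rows by `order_by`, cut them into granules, count granules holding a match."""
    ordered = sorted(rows, key=lambda r: tuple(r[c] for c in order_by))
    granules = [ordered[i:i + GRANULE_ROWS] for i in range(0, len(ordered), GRANULE_ROWS)]
    touched = sum(1 for g in granules if any(predicate(r) for r in g))
    return touched, len(granules)


def is_tenant_2(row):
    return row["tenant_id"] == 2


# Hypothetical data: 4 tenants x 1000 events each.
rows = [{"tenant_id": t, "event_id": e} for t, e in product(range(4), range(1000))]

good = granules_to_read(rows, ("tenant_id", "event_id"), is_tenant_2)
bad = granules_to_read(rows, ("event_id", "tenant_id"), is_tenant_2)
print("ORDER BY (tenant_id, event_id): granules touched / total =", good)
print("ORDER BY (event_id, tenant_id): granules touched / total =", bad)
```

With tenant_id first, the rows of one tenant form a single contiguous range; with event_id first, they are spread over every granule, so a filter on tenant_id cannot skip anything.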
All dimensions go to ORDER BY, all metrics - outside of that.
The most important for filtering columns with the lowest cardinality should be the left-most.
If the number of dimensions is high, it typically makes sense to use a prefix of ORDER BY as a PRIMARY KEY to avoid polluting the sparse index.
Examples:
ORDER BY (tenant_id, hour, country_code, team_id, group_id, source_id)
PRIMARY KEY (tenant_id, hour, country_code, team_id)
For Replacing / Collapsing
You need to keep all ‘mutable’ columns outside of ORDER BY, and have some unique id (a base to collapse duplicates) inside.
Typically the right-most column is some row identifier. And it’s often not needed in sparse index (so PRIMARY KEY can be a prefix of ORDER BY)
The remaining considerations are the same.
Examples:
ORDER BY (tenantid, site_id, eventid) -- utm_source is mutable, while tenantid, site_id is not
PRIMARY KEY (tenantid, site_id) -- eventid is not used for filtering, needed only for collapsing duplicates
Here, for the filtering, it will use the skipping index to select the parts WHERE col1 > xxx, and the result won’t need to be reordered, because the ORDER BY in the query aligns with the ORDER BY in the table and the data is already ordered on disk. (FWIW, Alexander Zaitsev and Mikhail Filimonov wrote a great post on skipping indexes and how they work for the Altinity blog.)
executeQuery: (from [::ffff:192.168.11.171]:39428, user: admin) SELECT * FROM order_test WHERE col1 > toDateTime('2020-10-01') ORDER BY col1,col2 FORMAT Null; (stage: Complete)
ContextAccess (admin): Access granted: SELECT(col1, col2) ON tests.order_test
ContextAccess (admin): Access granted: SELECT(col1, col2) ON tests.order_test
InterpreterSelectQuery: FetchColumns -> Complete
tests.order_test (SelectExecutor): Key condition: (column 0 in [1601503201, +Inf))
tests.order_test (SelectExecutor): MinMax index condition: (column 0 in [1601503201, +Inf))
tests.order_test (SelectExecutor): Running binary search on index range for part 202010_367_545_8 (7612 marks)
tests.order_test (SelectExecutor): Running binary search on index range for part 202010_549_729_12 (37 marks)
tests.order_test (SelectExecutor): Running binary search on index range for part 202011_689_719_2 (1403 marks)
tests.order_test (SelectExecutor): Running binary search on index range for part 202012_550_730_12 (3 marks)
tests.order_test (SelectExecutor): Found (LEFT) boundary mark: 0
tests.order_test (SelectExecutor): Found (LEFT) boundary mark: 0
tests.order_test (SelectExecutor): Found (LEFT) boundary mark: 0
tests.order_test (SelectExecutor): Found (RIGHT) boundary mark: 37
tests.order_test (SelectExecutor): Found (RIGHT) boundary mark: 3
tests.order_test (SelectExecutor): Found (RIGHT) boundary mark: 1403
tests.order_test (SelectExecutor): Found continuous range in 11 steps
tests.order_test (SelectExecutor): Found continuous range in 3 steps
tests.order_test (SelectExecutor): Running binary search on index range for part 202011_728_728_0 (84 marks)
tests.order_test (SelectExecutor): Found continuous range in 21 steps
tests.order_test (SelectExecutor): Running binary search on index range for part 202011_725_725_0 (128 marks)
tests.order_test (SelectExecutor): Found (LEFT) boundary mark: 0
tests.order_test (SelectExecutor): Found (LEFT) boundary mark: 0
tests.order_test (SelectExecutor): Found (RIGHT) boundary mark: 84
tests.order_test (SelectExecutor): Running binary search on index range for part 202011_722_722_0 (128 marks)
tests.order_test (SelectExecutor): Found continuous range in 13 steps
tests.order_test (SelectExecutor): Found (RIGHT) boundary mark: 128
tests.order_test (SelectExecutor): Found continuous range in 14 steps
tests.order_test (SelectExecutor): Running binary search on index range for part 202011_370_686_19 (5993 marks)
tests.order_test (SelectExecutor): Found (LEFT) boundary mark: 0
tests.order_test (SelectExecutor): Found (RIGHT) boundary mark: 5993
tests.order_test (SelectExecutor): Found (LEFT) boundary mark: 0
tests.order_test (SelectExecutor): Found continuous range in 25 steps
tests.order_test (SelectExecutor): Found (RIGHT) boundary mark: 128
tests.order_test (SelectExecutor): Found continuous range in 14 steps
tests.order_test (SelectExecutor): Found (LEFT) boundary mark: 0
tests.order_test (SelectExecutor): Found (RIGHT) boundary mark: 7612
tests.order_test (SelectExecutor): Found continuous range in 25 steps
tests.order_test (SelectExecutor): Selected 8/9 parts by partition key, 8 parts by primary key, 15380/15380 marks by primary key, 15380 marks to read from 8 ranges
Ok.
0 rows in set. Elapsed: 0.649 sec. Processed 125.97 million rows, 629.86 MB (194.17 million rows/s., 970.84 MB/s.)
If we change the ORDER BY expression in the query, ClickHouse will need to retrieve the rows and reorder them:
As seen in the MergingSortedTransform message, the ORDER BY in the table definition is not aligned with the ORDER BY in the query, so ClickHouse has to reorder the result set.
executeQuery: (from [::ffff:192.168.11.171]:39428, user: admin) SELECT * FROM order_test WHERE col1 > toDateTime('2020-10-01') ORDER BY col2,col1 FORMAT Null; (stage: Complete)
ContextAccess (admin): Access granted: SELECT(col1, col2) ON tests.order_test
ContextAccess (admin): Access granted: SELECT(col1, col2) ON tests.order_test
InterpreterSelectQuery: FetchColumns -> Complete
tests.order_test (SelectExecutor): Key condition: (column 0 in [1601503201, +Inf))
tests.order_test (SelectExecutor): MinMax index condition: (column 0 in [1601503201, +Inf))
tests.order_test (SelectExecutor): Running binary search on index range for part 202010_367_545_8 (7612 marks)
tests.order_test (SelectExecutor): Running binary search on index range for part 202012_550_730_12 (3 marks)
tests.order_test (SelectExecutor): Found (LEFT) boundary mark: 0
tests.order_test (SelectExecutor): Running binary search on index range for part 202011_725_725_0 (128 marks)
tests.order_test (SelectExecutor): Found (RIGHT) boundary mark: 3
tests.order_test (SelectExecutor): Running binary search on index range for part 202011_689_719_2 (1403 marks)
tests.order_test (SelectExecutor): Running binary search on index range for part 202010_549_729_12 (37 marks)
tests.order_test (SelectExecutor): Running binary search on index range for part 202011_728_728_0 (84 marks)
tests.order_test (SelectExecutor): Found (LEFT) boundary mark: 0
tests.order_test (SelectExecutor): Found continuous range in 3 steps
tests.order_test (SelectExecutor): Found (LEFT) boundary mark: 0
tests.order_test (SelectExecutor): Found (LEFT) boundary mark: 0
tests.order_test (SelectExecutor): Found (LEFT) boundary mark: 0
tests.order_test (SelectExecutor): Running binary search on index range for part 202011_722_722_0 (128 marks)
tests.order_test (SelectExecutor): Found (RIGHT) boundary mark: 7612
tests.order_test (SelectExecutor): Found (RIGHT) boundary mark: 37
tests.order_test (SelectExecutor): Found (LEFT) boundary mark: 0
tests.order_test (SelectExecutor): Found continuous range in 11 steps
tests.order_test (SelectExecutor): Found (RIGHT) boundary mark: 1403
tests.order_test (SelectExecutor): Found (RIGHT) boundary mark: 84
tests.order_test (SelectExecutor): Found continuous range in 25 steps
tests.order_test (SelectExecutor): Running binary search on index range for part 202011_370_686_19 (5993 marks)
tests.order_test (SelectExecutor): Found continuous range in 21 steps
tests.order_test (SelectExecutor): Found (RIGHT) boundary mark: 128
tests.order_test (SelectExecutor): Found continuous range in 13 steps
tests.order_test (SelectExecutor): Found (LEFT) boundary mark: 0
tests.order_test (SelectExecutor): Found continuous range in 14 steps
tests.order_test (SelectExecutor): Found (RIGHT) boundary mark: 128
tests.order_test (SelectExecutor): Found (LEFT) boundary mark: 0
tests.order_test (SelectExecutor): Found continuous range in 14 steps
tests.order_test (SelectExecutor): Found (RIGHT) boundary mark: 5993
tests.order_test (SelectExecutor): Found continuous range in 25 steps
tests.order_test (SelectExecutor): Selected 8/9 parts by partition key, 8 parts by primary key, 15380/15380 marks by primary key, 15380 marks to read from 8 ranges
tests.order_test (SelectExecutor): MergingSortedTransform: Merge sorted 1947 blocks, 125972070 rows in 1.423973879 sec., 88465155.05499662 rows/sec., 423.78 MiB/sec
Ok.
0 rows in set. Elapsed: 1.424 sec. Processed 125.97 million rows, 629.86 MB (88.46 million rows/s., 442.28 MB/s.)
PARTITION BY
Things to consider:
A good size for a single partition is something like 1-300 GB.
For Summing/Replacing, a bit smaller (400 MB-40 GB).
It is better to avoid touching more than a few dozen partitions with a typical SELECT query.
A single insert should bring data to one or a few partitions.
The number of partitions in a table should be in the dozens or hundreds, not thousands.
You can check the size of partitions in the system.parts table.
Examples:
-- for time-series:
PARTITION BY toYear(timestamp) -- long retention, not too much data
PARTITION BY toYYYYMM(timestamp) --
PARTITION BY toMonday(timestamp) --
PARTITION BY toDate(timestamp) --
PARTITION BY toStartOfHour(timestamp) -- short retention, lot of data
-- for table with some incremental (non time-bounded) counter
PARTITION BY intDiv(transaction_id, 1000000)
-- for some dimension tables (always requested with WHERE userid)
PARTITION BY userid % 16
For small tables (smaller than a few gigabytes) partitioning is usually not needed at all (just skip the PARTITION BY expression when you create the table).
CREATE TABLE test_last
(
`col1` Int32,
`col2` SimpleAggregateFunction(anyLast, Nullable(DateTime)),
`col3` SimpleAggregateFunction(anyLast, Nullable(DateTime))
)
ENGINE = AggregatingMergeTree
ORDER BY col1
Ok.
0 rows in set. Elapsed: 0.003 sec.
INSERT INTO test_last (col1, col2) VALUES (1, now());
Ok.
1 rows in set. Elapsed: 0.014 sec.
INSERT INTO test_last (col1, col3) VALUES (1, now())
Ok.
1 rows in set. Elapsed: 0.006 sec.
SELECT
col1,
anyLast(col2),
anyLast(col3)
FROM test_last
GROUP BY col1
┌─col1─┬───────anyLast(col2)─┬───────anyLast(col3)─┐
│ 1 │ 2020-01-16 20:57:46 │ 2020-01-16 20:57:51 │
└──────┴─────────────────────┴─────────────────────┘
1 rows in set. Elapsed: 0.005 sec.
SELECT *
FROM test_last
FINAL
┌─col1─┬────────────────col2─┬────────────────col3─┐
│ 1 │ 2020-01-16 20:57:46 │ 2020-01-16 20:57:51 │
└──────┴─────────────────────┴─────────────────────┘
1 rows in set. Elapsed: 0.003 sec.
Merge two data streams
Q. I have 2 Kafka topics from which I am storing events into 2 different tables (A and B) having the same unique ID. I want to create a single table that combines the data in tables A and B into one table C. The problem is that data is received asynchronously and not all the data is available when a row arrives in Table A or vice-versa.
A. You can use AggregatingMergeTree with Nullable columns and the any aggregation function, or with non-Nullable columns and the max aggregation function, if that is acceptable for your data.
CREATE TABLE table_C (
id Int64,
colA SimpleAggregateFunction(any, Nullable(UInt32)),
colB SimpleAggregateFunction(max, String)
) ENGINE = AggregatingMergeTree()
ORDER BY id;
CREATE MATERIALIZED VIEW mv_A TO table_C AS
SELECT id,colA FROM Kafka_A;
CREATE MATERIALIZED VIEW mv_B TO table_C AS
SELECT id,colB FROM Kafka_B;
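How the two partial rows end up as one row in table_C can be modelled as follows. This is a simplified sketch of the merge semantics only, under the assumption that `any` over a Nullable column keeps the first non-NULL value and that `max` over a String column treats the default empty string as the smallest value; the function name and the sample data are hypothetical.

```python
def merge_into_table_c(rows):
    """Collapse rows with the same id the way the two column functions would."""
    table_c = {}
    for row in rows:
        merged = table_c.setdefault(row["id"], {"colA": None, "colB": ""})
        # any over a Nullable column: keep the first non-NULL value seen.
        if merged["colA"] is None:
            merged["colA"] = row.get("colA")
        # max over a non-Nullable String: the default '' loses to any real value.
        merged["colB"] = max(merged["colB"], row.get("colB", ""))
    return table_c


# mv_A and mv_B each insert only their own column; the other one gets its default.
from_kafka_a = [{"id": 1, "colA": 42}, {"id": 2, "colA": 7}]
from_kafka_b = [{"id": 1, "colB": "paid"}]

# Arrival order does not matter for the final state.
table_c = merge_into_table_c(from_kafka_b + from_kafka_a)
print(table_c)
```

Until a merge happens (or the query aggregates explicitly, as in the anyLast example earlier), the partial rows coexist, so queries against table_C should still group by id.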
Schema (especially compression codecs, some bad types, sorting order…)
Horizontal vs Vertical merge
Horizontal = reads all columns at once, does a merge sort, writes the new part
Vertical = first reads the columns from ORDER BY, does a merge sort, writes them to disk, remembers the permutation, then processes the rest of the columns one by one, applying the permutation.
compact vs wide parts
Other things like server load, concurrent merges…
Horizontal merge is used by default; it will use more memory if there are more than 80 columns in the table
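The vertical algorithm described above can be sketched in a few lines. This is a toy model, not the server implementation: parts are plain dicts of column lists already sorted by the key column, and the function name and data are made up for the example.

```python
import heapq


def vertical_merge(parts, key_column, other_columns):
    """Merge sorted parts column by column, reusing the row permutation of the key."""
    # Step 1: merge-sort only the key column, remembering where each row came from.
    streams = [
        [(key, part_no, row_no) for row_no, key in enumerate(part[key_column])]
        for part_no, part in enumerate(parts)
    ]
    merged = list(heapq.merge(*streams))
    permutation = [(part_no, row_no) for _, part_no, row_no in merged]
    result = {key_column: [key for key, _, _ in merged]}
    # Step 2: process the remaining columns one by one, applying the permutation.
    for column in other_columns:
        result[column] = [parts[p][column][r] for p, r in permutation]
    return result


parts = [
    {"k": [1, 3, 5], "v": ["a", "c", "e"]},
    {"k": [2, 3, 6], "v": ["b", "c2", "f"]},
]
new_part = vertical_merge(parts, "k", ["v"])
print(new_part)
```

Only one non-key column is held in memory at a time in step 2, which is why this approach is attractive for wide tables, at the cost of an extra pass to record and replay the permutation.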
OPTIMIZE TABLE example FINAL DEDUPLICATE BY expr
When using the deduplicate feature in OPTIMIZE FINAL, the question is which row will remain and won’t be deduplicated.
For SELECT operations ClickHouse® does not guarantee the order of the result set unless you specify ORDER BY. This random ordering is affected by different parameters, for example max_threads.
In a merge operation ClickHouse reads rows sequentially in storage order, which is determined by the ORDER BY specified in the CREATE TABLE statement, and only the first unique row in that order survives deduplication. So it is a bit different from how SELECT actually works. When the FINAL clause is used, ClickHouse will merge all rows across all partitions (if it is not specified, the merge operation will be done per partition), and so the first unique row of the first partition will survive deduplication. Merges are single-threaded because it is too complicated to apply merge operations in parallel, and it generally makes no sense.
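The "first unique row in storage order survives" rule can be illustrated with a short model. It is only a sketch of the behaviour described above, assuming the rows are already in storage order; the function and column names are hypothetical.

```python
def deduplicate(rows_in_storage_order, dedup_columns):
    """Keep the first row for every distinct combination of `dedup_columns`."""
    seen = set()
    survivors = []
    for row in rows_in_storage_order:
        key = tuple(row[c] for c in dedup_columns)
        if key not in seen:
            seen.add(key)
            survivors.append(row)
    return survivors


# Rows as they sit on disk, sorted by the table's ORDER BY (id here).
stored = [
    {"id": 1, "ts": 10, "payload": "first"},
    {"id": 1, "ts": 10, "payload": "second"},
    {"id": 2, "ts": 11, "payload": "only"},
]
print(deduplicate(stored, ("id", "ts")))
```

Which row counts as "first" is therefore decided by the storage order, not by insertion time or by the order a SELECT happens to return.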
ReplacingMergeTree is a powerful ClickHouse® MergeTree engine. It is one of the techniques that can be used to guarantee uniqueness or exactly-once delivery in ClickHouse.
General Operations
Engine Parameters
Engine = ReplacingMergeTree([version_column],[is_deleted_column])
ORDER BY <list_of_columns>
ORDER BY – The ORDER BY defines the columns that need to be unique at merge time. Since the merge time cannot be predicted most of the time, the FINAL keyword is required to remove duplicates.
version_column – A monotonically increasing number, which can be based on a timestamp. Used to make sure updates are applied in the right order.
UPDATE – INSERT INTO t(..., _version) values (...), insert with incremented version
DELETE – INSERT INTO t(..., _version, is_deleted) values(..., 1)
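The insert-only UPDATE / DELETE pattern above can be modelled as follows. This is a simplified sketch of the replacing logic, assuming that the row with the highest version wins per key, that ties go to the row inserted last, and that a winning row with is_deleted = 1 is hidden from the result; the function and column names are illustrative.

```python
def replacing_final(rows, key_columns):
    """Return the surviving row per key, as a query with FINAL would see it."""
    latest = {}
    for row in rows:
        key = tuple(row[c] for c in key_columns)
        kept = latest.get(key)
        # Highest version wins; on a tie the row inserted later replaces the earlier.
        if kept is None or row["_version"] >= kept["_version"]:
            latest[key] = row
    # A key whose winning row is a tombstone disappears from the result.
    return [r for r in latest.values() if not r.get("is_deleted", 0)]


inserts = [
    {"id": 1, "value": "created", "_version": 1},
    {"id": 2, "value": "created", "_version": 1},
    {"id": 1, "value": "updated", "_version": 2},                  # UPDATE of id 1
    {"id": 2, "value": "created", "_version": 2, "is_deleted": 1},  # DELETE of id 2
]
print(replacing_final(inserts, ("id",)))
```

Nothing is modified in place: every change is a new row, and the version column alone decides which one is visible after replacement.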
FINAL
ClickHouse does not guarantee that a merge will fire and replace rows using the ReplacingMergeTree logic. The FINAL keyword should be used to apply the merge at query time. It works reasonably fast when a PK filter is used, but may be slow for SELECT * type queries:
ClickHouse merges parts only in the scope of a single partition, so if two rows with the same replacing key land in different partitions, they will never be merged into a single row. The FINAL keyword works differently: it merges all rows across all partitions. That behavior can be changed via the do_not_merge_across_partitions_select_final setting.
-- GROUP BY
SELECT key, argMax(val_1, ts) as val_1, argMax(val_2, ts) as val_2, argMax(val_3, ts) as val_3, argMax(val_4, ts) as val_4, argMax(val_5, ts) as val_5, max(ts)
FROM repl_tbl
WHERE key = 10
GROUP BY key;

1 row in set. Elapsed: 0.008 sec.

-- ORDER BY LIMIT BY
SELECT * FROM repl_tbl WHERE key = 10 ORDER BY ts DESC LIMIT 1 BY key;

1 row in set. Elapsed: 0.006 sec.

-- Subquery
SELECT * FROM repl_tbl WHERE key = 10 AND ts = (SELECT max(ts) FROM repl_tbl WHERE key = 10);

1 row in set. Elapsed: 0.009 sec.

-- FINAL
SELECT * FROM repl_tbl FINAL WHERE key = 10;

1 row in set. Elapsed: 0.008 sec.
Multiple keys
-- GROUP BY
SELECT key, argMax(val_1, ts) as val_1, argMax(val_2, ts) as val_2, argMax(val_3, ts) as val_3, argMax(val_4, ts) as val_4, argMax(val_5, ts) as val_5, max(ts)
FROM repl_tbl
WHERE key IN (SELECT toUInt32(number) FROM numbers(1000000) WHERE number % 100)
GROUP BY key
FORMAT Null;

Peak memory usage (for query): 2.19 GiB.
0 rows in set. Elapsed: 1.043 sec. Processed 5.08 million rows, 524.38 MB (4.87 million rows/s., 502.64 MB/s.)

-- SET optimize_aggregation_in_order=1;
Peak memory usage (for query): 349.94 MiB.
0 rows in set. Elapsed: 0.901 sec. Processed 4.94 million rows, 506.55 MB (5.48 million rows/s., 562.17 MB/s.)

-- ORDER BY LIMIT BY
SELECT *
FROM repl_tbl
WHERE key IN (SELECT toUInt32(number) FROM numbers(1000000) WHERE number % 100)
ORDER BY ts DESC
LIMIT 1 BY key
FORMAT Null;

Peak memory usage (for query): 1.12 GiB.
0 rows in set. Elapsed: 1.171 sec. Processed 5.08 million rows, 524.38 MB (4.34 million rows/s., 447.95 MB/s.)

-- Subquery
SELECT *
FROM repl_tbl
WHERE (key, ts) IN (
    SELECT key, max(ts)
    FROM repl_tbl
    WHERE key IN (SELECT toUInt32(number) FROM numbers(1000000) WHERE number % 100)
    GROUP BY key
)
FORMAT Null;

Peak memory usage (for query): 197.30 MiB.
0 rows in set. Elapsed: 0.484 sec. Processed 8.72 million rows, 507.33 MB (18.04 million rows/s., 1.05 GB/s.)

-- SET optimize_aggregation_in_order=1;
Peak memory usage (for query): 171.93 MiB.
0 rows in set. Elapsed: 0.465 sec. Processed 8.59 million rows, 490.55 MB (18.46 million rows/s., 1.05 GB/s.)

-- FINAL
SELECT *
FROM repl_tbl FINAL
WHERE key IN (SELECT toUInt32(number) FROM numbers(1000000) WHERE number % 100)
FORMAT Null;

Peak memory usage (for query): 537.13 MiB.
0 rows in set. Elapsed: 0.357 sec. Processed 4.39 million rows, 436.28 MB (12.28 million rows/s., 1.22 GB/s.)
Full table
-- GROUP BY
SELECT key, argMax(val_1, ts) as val_1, argMax(val_2, ts) as val_2, argMax(val_3, ts) as val_3, argMax(val_4, ts) as val_4, argMax(val_5, ts) as val_5, max(ts)
FROM repl_tbl
GROUP BY key
FORMAT Null;

Peak memory usage (for query): 16.08 GiB.
0 rows in set. Elapsed: 11.600 sec. Processed 40.00 million rows, 5.12 GB (3.45 million rows/s., 441.49 MB/s.)

-- SET optimize_aggregation_in_order=1;
Peak memory usage (for query): 865.76 MiB.
0 rows in set. Elapsed: 9.677 sec. Processed 39.82 million rows, 5.10 GB (4.12 million rows/s., 526.89 MB/s.)

-- ORDER BY LIMIT BY
SELECT * FROM repl_tbl ORDER BY ts DESC LIMIT 1 BY key FORMAT Null;

Peak memory usage (for query): 8.39 GiB.
0 rows in set. Elapsed: 14.489 sec. Processed 40.00 million rows, 5.12 GB (2.76 million rows/s., 353.45 MB/s.)

-- Subquery
SELECT *
FROM repl_tbl
WHERE (key, ts) IN (SELECT key, max(ts) FROM repl_tbl GROUP BY key)
FORMAT Null;

Peak memory usage (for query): 2.40 GiB.
0 rows in set. Elapsed: 5.225 sec. Processed 79.65 million rows, 5.40 GB (15.24 million rows/s., 1.03 GB/s.)

-- SET optimize_aggregation_in_order=1;
Peak memory usage (for query): 924.39 MiB.
0 rows in set. Elapsed: 4.126 sec. Processed 79.67 million rows, 5.40 GB (19.31 million rows/s., 1.31 GB/s.)

-- FINAL
SELECT * FROM repl_tbl FINAL FORMAT Null;

Peak memory usage (for query): 834.09 MiB.
0 rows in set. Elapsed: 2.314 sec. Processed 38.80 million rows, 4.97 GB (16.77 million rows/s., 2.15 GB/s.)
1.3.8.1 - ReplacingMergeTree does not collapse duplicates
Hi there, I have a question about replacing merge trees. I have set up a Materialized View with ReplacingMergeTree table, but even if I call optimize on it, the parts don’t get merged. I filled that table yesterday, nothing happened since then. What should I do?
Merges are eventual and may never happen. It depends on the number of inserts that happened afterwards, the number of parts in the partition, and the size of the parts.
If the total size of the input parts is greater than the maximum part size, they will never be merged.
Take care with skip indexes in non-regular (Replicated)MergeTree tables over non-ORDER BY columns. ClickHouse® applies the index condition on the first step of query execution, so it’s possible to get outdated rows.
--(1) create test table
drop table if exists test;
create table test
(
    version UInt32,
    id UInt32,
    state UInt8,
    INDEX state_idx (state) type set(0) GRANULARITY 1
) ENGINE ReplacingMergeTree(version)
ORDER BY (id);

--(2) insert sample data
INSERT INTO test (version, id, state) VALUES (1,1,1);
INSERT INTO test (version, id, state) VALUES (2,1,0);
INSERT INTO test (version, id, state) VALUES (3,1,1);

--(3) check the result:
-- expected 3, 1, 1
select version, id, state from test final;
┌─version─┬─id─┬─state─┐
│       3 │  1 │     1 │
└─────────┴────┴───────┘

-- expected empty result
select version, id, state from test final where state=0;
┌─version─┬─id─┬─state─┐
│       2 │  1 │     0 │
└─────────┴────┴───────┘
1.3.10 - SummingMergeTree
Nested structures
In certain conditions it could make sense to collapse one of the dimensions into a set of arrays. It’s usually profitable to do if this dimension is not commonly used in queries. It would reduce the number of rows in the aggregated table and speed up queries which don’t care about this dimension, in exchange for slower aggregation by the collapsed dimension.
How to aggregate mutating event stream with duplicates
Challenges with mutated data
When you have an incoming event stream with duplicates, updates, and deletes, building a consistent row state inside the ClickHouse® table is a big challenge.
The UPDATE/DELETE approach in the OLTP world won’t help with OLAP databases tuned to handle big batches. UPDATE/DELETE operations in ClickHouse are executed as “mutations,” rewriting a lot of data and being relatively slow. You can’t run such operations as often as in OLTP databases. But the UPSERT operation (insert and replace) runs fast with the ReplacingMergeTree Engine. It’s even set as the default mode for INSERT without any special keyword. We can emulate UPDATE (or even DELETE) with the UPSERT operation.
There are a lot of blog posts on how to use ReplacingMergeTree Engine to handle mutated data streams. A properly designed table schema with ReplacingMergeTree Engine is a good instrument for building the DWH Dimensions table. But when maintaining metrics in Fact tables, there are several problems:
it’s not possible to use a valuable ClickHouse feature - online aggregation of incoming data by Materialized Views or Projections on top of the ReplacingMT table, because duplicates and updates will not be deduplicated by the engine during inserts, and calculated aggregates (like sum or count) will be incorrect. For significant amounts of data, this becomes critical because aggregating raw data during report queries will take too much time.
unfinished support for DELETEs. While in the newest versions of ClickHouse, it’s possible to add the is_deleted to ReplacingMergeTree parameters, the necessity of manually filtering out deleted rows after FINAL processing makes that feature less useful.
Mutated data should be localized to the same partition. If the “replacing” row is saved to a partition different from the previous one, the report query will be much slower or produce unexpected results.
-- multiple partitions problem
CREATE TABLE RMT
(
    `key` Int64,
    `someCol` String,
    `eventTime` DateTime
)
ENGINE = ReplacingMergeTree()
PARTITION BY toYYYYMM(eventTime)
ORDER BY key;

INSERT INTO RMT Values (1, 'first', '2024-04-25T10:16:21');
INSERT INTO RMT Values (1, 'second', '2024-05-02T08:36:59');

with merged as (select * from RMT FINAL)
select * from merged
where eventTime < '2024-05-01'
You will get a row with ‘first’, not an empty set, as one might expect with the FINAL processing of a whole table.
Collapsing
ClickHouse has other table engines, such as CollapsingMergeTree and VersionedCollapsingMergeTree, that can be used even better for UPSERT operation.
Both work by inserting a “rollback row” to compensate for the previous insert. The difference between CollapsingMergeTree and VersionedCollapsingMergeTree is in the algorithm of collapsing. For Cluster configurations, it’s essential to understand which row came first and who should replace whom. That is why using ReplicatedVersionedCollapsingMergeTree is mandatory for Replicated Clusters.
When dealing with such complicated data streams, three tasks need to be solved simultaneously:
remove duplicates
process updates and deletes
calculate correct aggregates
It’s essential to understand how the collapsing algorithm of VersionedCollapsingMergeTree works. Quote from the documentation:
When ClickHouse merges data parts, it deletes each pair of rows that have the same primary key and version and different Sign. The order of rows does not matter.
The version column should increase over time. You may use a natural timestamp for that. Random-generated IDs are not suitable for the version column.
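The quoted rule can be sketched as a small Python model. It is only an illustration of the pairing logic, not the engine's code; the `(key, version, sign, payload)` tuple layout and the `collapse` helper are assumptions made for this example.

```python
from collections import defaultdict


def collapse(rows):
    """rows: (key, version, sign, payload) tuples in any order.

    Rows with the same key and version but opposite signs cancel in pairs;
    whatever is left over survives the merge.
    """
    groups = defaultdict(lambda: {1: [], -1: []})
    for key, version, sign, payload in rows:
        groups[(key, version)][sign].append(payload)

    survivors = []
    for key, version in sorted(groups):
        by_sign = groups[(key, version)]
        pairs = min(len(by_sign[1]), len(by_sign[-1]))
        for sign in (1, -1):
            for payload in by_sign[sign][pairs:]:
                survivors.append((key, version, sign, payload))
    return survivors


rows = [
    (1, 1, 1, "first"),    # original state
    (1, 1, -1, "first"),   # cancel row with the same key and version
    (1, 2, 1, "second"),   # new state with a higher version
]
print(collapse(rows))                  # [(1, 2, 1, 'second')]
print(collapse(list(reversed(rows))))  # same result: row order does not matter
```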
Replace data in another partition
Let’s first fix the problem with mutated data in a different partition.
CREATE TABLE VCMT
(
    key Int64,
    someCol String,
    eventTime DateTime,
    sign Int8
)
ENGINE = VersionedCollapsingMergeTree(sign, eventTime)
PARTITION BY toYYYYMM(eventTime)
ORDER BY key;

INSERT INTO VCMT Values (1, 'first', '2024-04-25 10:16:21', 1);
INSERT INTO VCMT Values (1, 'first', '2024-04-25 10:16:21', -1), (1, 'second', '2024-05-02 08:36:59', 1);

set do_not_merge_across_partitions_select_final=1; -- for fast FINAL

select 'no rows after:';
with merged as (select * from VCMT FINAL)
select * from merged
where eventTime < '2024-05-01';
With VersionedCollapsingMergeTree, we can use more partition strategies, even with columns not tied to the row’s primary key. This could facilitate the creation of faster queries, more convenient TTLs (Time-To-Live), and backups.
Row deduplication
There are several ways to remove duplicates from the event stream. The most effective feature is block deduplication, which occurs when ClickHouse drops incoming blocks with the same checksum (or tag). However, this requires building a smart ingestor capable of saving positions in a transactional manner.
However, another method is possible: verifying whether a particular row already exists in the destination table to avoid redundant insertions. Together with block deduplication, that method also avoids using ReplacingMergeTree and FINAL during query time.
Ensuring accuracy and consistency in results requires executing this process on a single thread within one cluster node. This method is particularly suitable for less active event streams, such as those with up to 100,000 events per second. To boost performance, incoming streams should be segmented into several partitions (or ‘shards’) based on the table/event’s Primary Key, with each partition processed on a single thread.
use Null table and MatView to be able to access both the insert block and the dest table
check the existence of IDs in the destination table with a fast index scan by a primary key using the IN operator
filter existing rows from insert block by NOT IN operator
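The steps above can be sketched in Python as a plain set lookup. In ClickHouse the lookup is done inside a Materialized View with IN / NOT IN against the destination table; here `destination_ids` is a hypothetical in-memory stand-in for that table's primary key, and `filter_new_rows` is an illustrative helper, not an existing API.

```python
def filter_new_rows(insert_block, destination_ids):
    """Return only the rows of insert_block whose id is not stored yet."""
    block_ids = {row["id"] for row in insert_block}
    # Step 2: which ids of this block already exist in the destination (IN).
    existing = block_ids & destination_ids
    # Step 3: drop those rows from the insert block (NOT IN).
    return [row for row in insert_block if row["id"] not in existing]


destination_ids = {1, 2, 3}
insert_block = [
    {"id": 2, "value": "duplicate"},
    {"id": 4, "value": "new"},
    {"id": 5, "value": "new"},
]
to_insert = filter_new_rows(insert_block, destination_ids)
print([row["id"] for row in to_insert])  # [4, 5]
```

Because the check and the insert are not atomic, the model only holds when a single writer processes a given set of ids, which is why the text insists on one thread per partition of the stream.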
In most cases, the insert block does not have too many rows (like 1000-100k), so checking the destination table for their existence by scanning the Primary Key (residing in memory) won’t take much time. However, due to the high table index granularity, it can still be noticeable on high load. To enhance performance, consider reducing index granularity to 4096 (from the default 8192) or even fewer values.
Getting old row
To process updates in CollapsingMergeTree, the ’last row state’ must be known before inserting the ‘compensation row.’ Sometimes, this is possible - CDC events coming from MySQL’s binlog or Postgres’s WAL contain not only ’new’ data but also ‘old’ values. If one of the columns includes a sequence-generated version or timestamp of the row’s update time, it can be used as the row’s ‘version’ for VersionedCollapsingMergeTree. When the incoming event stream lacks old metric values and suitable version information, we can retrieve that data by examining the ClickHouse table using the same method used for row deduplication in the previous example.
I read more data from the Example2 table than from Example1. Instead of simply checking the row existence by the IN operator, a JOIN with existing rows is used to build a “compensate row.”
For UPSERT, the collapsing algorithm requires inserting two rows. So, I need to create two rows from any row that is found in the local table. It’s an essential part of the suggested approach, which allows me to produce proper rows for inserting with human-readable code and clear if() statements. That is why I execute arrayJoin while reading old data.
Don’t try to run the code above. It’s just a short explanation of the idea, lacking many needed elements.
UPSERT by Collapsing
Here is a more realistic example with more checks that can be played with:
create table Example3
(
    id Int32,
    metric1 UInt32,
    metric2 UInt32,
    _version UInt64,
    sign Int8 default 1
) engine = VersionedCollapsingMergeTree(sign, _version)
ORDER BY id;

create table Stage engine=Null as Example3;

create materialized view Example3Transform to Example3 as
with __new as ( SELECT * FROM Stage order by _version desc, sign desc limit 1 by id ),
     __old AS ( SELECT *, arrayJoin([-1,1]) AS _sign
                from ( select * FROM Example3 final
                       PREWHERE id IN (SELECT id FROM __new)
                       where sign = 1 )
              )
select id,
    if(__old._sign = -1, __old.metric1, __new.metric1) AS metric1,
    if(__old._sign = -1, __old.metric2, __new.metric2) AS metric2,
    if(__old._sign = -1, __old._version, __new._version) AS _version,
    if(__old._sign = -1, -1, 1) AS sign
from __new left join __old using id
where if(__new.sign = -1,
    __old._sign = -1,                -- insert only delete row if it's found in old data
    __new._version > __old._version  -- skip duplicates for updates
);

-- original
insert into Stage values (1,1,1,1,1), (2,2,2,1,1);
select 'step1', * from Example3;

-- no duplicates (with the same version) inserted
insert into Stage values (1,3,1,1,1), (2,3,2,1,1);
select 'step2', * from Example3;

-- delete a row with id=2. version for delete row does not have any meaning
insert into Stage values (2,2,2,0,-1);
select 'step3', * from Example3 final;

-- replace a row with id=1. row with sign=-1 not needed, but can be in the insert blocks (will be skipped)
insert into Stage values (1,1,1,0,-1), (1,3,3,2,1);
select 'step4', * from Example3 final;
Important additions:
When multiple events with the same ID and different versions are received in the one insert batch, the most recent event is applied.
“delete rows” with sign=-1 and the wrong version are not used for processing. For the Collapsing algorithm, the delete row version should match the version from the row stored in the local table, not the same version from the replacing row. That’s why I decided to skip such a “delete row” received from the incoming stream and build it from the table’s data.
using FINAL and PREWHERE (to speed up FINAL) while reading the destination table. PREWHERE filters are applied before FINAL processing, reducing the number of grouped rows.
filter to skip out-of-order events by checking the version
DELETE event processing (inside last WHERE)
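The decision logic of the Materialized View can be followed more easily in a small Python model. This is a sketch under simplifying assumptions: rows carry a single `metric1`, `live_rows` is a hypothetical dictionary standing in for "the table read with FINAL where sign = 1", and `apply_rows` is a toy replacement for the collapsing done by the engine. A missing old row is treated as version 0, mirroring the default value produced by the LEFT JOIN.

```python
def build_upsert_rows(insert_block, live_rows):
    """Return the rows that would be written for one insert block."""
    # Keep the most recent event per id
    # (ORDER BY _version DESC, sign DESC LIMIT 1 BY id).
    newest = {}
    ordered = sorted(insert_block, key=lambda r: (r["version"], r["sign"]), reverse=True)
    for row in ordered:
        newest.setdefault(row["id"], row)

    out = []
    for row_id, new in newest.items():
        old = live_rows.get(row_id)
        if new["sign"] == -1:
            # DELETE: emit a cancel row built from the stored row, if any.
            if old is not None:
                out.append({**old, "sign": -1})
        elif old is None:
            # First INSERT: the missing old version defaults to 0.
            if new["version"] > 0:
                out.append({**new, "sign": 1})
        elif new["version"] > old["version"]:
            # UPDATE: cancel the stored row, then write the new state.
            out.append({**old, "sign": -1})
            out.append({**new, "sign": 1})
        # Otherwise: duplicate or out-of-order event, skipped.
    return out


def apply_rows(live_rows, rows):
    """Toy collapsing: a cancel row removes the live row, a state row replaces it."""
    for row in rows:
        if row["sign"] == -1:
            live_rows.pop(row["id"], None)
        else:
            live_rows[row["id"]] = row


def ev(row_id, metric1, version, sign):
    return {"id": row_id, "metric1": metric1, "version": version, "sign": sign}


table = {}
written = []
steps = [
    [ev(1, 1, 1, 1), ev(2, 2, 1, 1)],   # original rows
    [ev(1, 3, 1, 1), ev(2, 3, 1, 1)],   # same version: skipped as duplicates
    [ev(2, 2, 0, -1)],                  # delete id=2
    [ev(1, 1, 0, -1), ev(1, 3, 2, 1)],  # replace id=1
]
for block in steps:
    rows = build_upsert_rows(block, table)
    written.append(len(rows))
    apply_rows(table, rows)

print(written)  # [2, 0, 1, 2]
print(table)    # {1: {'id': 1, 'metric1': 3, 'version': 2, 'sign': 1}}
```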
Speed Test
set allow_experimental_analyzer=0;

create table Example3
(
    id Int32,
    Department String,
    metric1 UInt32,
    metric2 Float32,
    _version UInt64,
    sign Int8 default 1
) engine = VersionedCollapsingMergeTree(sign, _version)
ORDER BY id
partition by (id % 20)
settings index_granularity=4096;

set do_not_merge_across_partitions_select_final=1;

-- make 100M table
INSERT INTO Example3
SELECT
    number AS id,
    ['HR', 'Finance', 'Engineering', 'Sales', 'Marketing'][rand() % 5 + 1] AS Department,
    rand() % 1000 AS metric1,
    (rand() % 10000) / 100.0 AS metric2,
    0 AS _version,
    1 AS sign
FROM numbers(1E8);

create function timeSpent as () -> date_diff('millisecond', (select ts from t1), now64(3));

-- measure plain INSERT time for 1M batch
create temporary table t1 (ts DateTime64(3)) as select now64(3);
INSERT INTO Example3
SELECT
    number AS id,
    ['HR', 'Finance', 'Engineering', 'Sales', 'Marketing'][rand() % 5 + 1] AS Department,
    rand() % 1000 AS metric1,
    (rand() % 10000) / 100.0 AS metric2,
    1 AS _version,
    1 AS sign
FROM numbers(1E6);
select '---', timeSpent(), 'INSERT';

--create table Stage engine=MergeTree order by id as Example3 ;
create table Stage engine=Null as Example3;

create materialized view Example3Transform to Example3 as
with __new as ( SELECT * FROM Stage order by _version desc, sign desc limit 1 by id ),
     __old AS ( SELECT *, arrayJoin([-1,1]) AS _sign
                from ( select * FROM Example3 final
                       PREWHERE id IN (SELECT id FROM __new)
                       where sign = 1 )
              )
select id,
    if(__old._sign = -1, __old.Department, __new.Department) AS Department,
    if(__old._sign = -1, __old.metric1, __new.metric1) AS metric1,
    if(__old._sign = -1, __old.metric2, __new.metric2) AS metric2,
    if(__old._sign = -1, __old._version, __new._version) AS _version,
    if(__old._sign = -1, -1, 1) AS sign
from __new left join __old using id
where if(__new.sign = -1,
    __old._sign = -1,                -- insert only delete row if it's found in old data
    __new._version > __old._version  -- skip duplicates for updates
);

-- calculate UPSERT time for 1M batch
drop table t1;
create temporary table t1 (ts DateTime64(3)) as select now64(3);
INSERT INTO Stage
SELECT
    (rand() % 1E6) * 100 AS id, --number AS id,
    ['HR', 'Finance', 'Engineering', 'Sales', 'Marketing'][rand() % 5 + 1] AS Department,
    rand() % 1000 AS metric1,
    (rand() % 10000) / 100.0 AS metric2,
    2 AS _version,
    1 AS sign
FROM numbers(1E6);
select '---', timeSpent(), 'UPSERT';

-- FINAL query
drop table t1;
create temporary table t1 (ts DateTime64(3)) as select now64(3);
select Department, count(), sum(metric1) from Example3 FINAL
group by Department order by Department
format Null;
select '---', timeSpent(), 'FINAL';

-- GROUP BY query
drop table t1;
create temporary table t1 (ts DateTime64(3)) as select now64(3);
select Department, sum(sign), sum(sign * metric1) from Example3
group by Department order by Department
format Null;
select '---', timeSpent(), 'GROUP BY';

optimize table Example3 final;

-- FINAL query
drop table t1;
create temporary table t1 (ts DateTime64(3)) as select now64(3);
select Department, count(), sum(metric1) from Example3 FINAL
group by Department order by Department
format Null;
select '---', timeSpent(), 'FINAL OPTIMIZED';

-- GROUP BY query
drop table t1;
create temporary table t1 (ts DateTime64(3)) as select now64(3);
select Department, sum(sign), sum(sign * metric1) from Example3
group by Department order by Department
format Null;
select '---', timeSpent(), 'GROUP BY OPTIMIZED';
You can use fiddle or clickhouse-local to run such a test:
cat test.sql | clickhouse-local -nm
Results (Mac A2 Pro), milliseconds:
--- 252 INSERT
--- 1710 UPSERT
--- 763 FINAL
--- 311 GROUP BY
--- 314 FINAL OPTIMIZED
--- 295 GROUP BY OPTIMIZED
UPSERT is six times slower than direct INSERT because it requires looking up the destination table. That is the price. It is better to use idempotent inserts with an exactly-once delivery guarantee. However, it’s not always possible.
The FINAL speed is quite good, especially if we split the table by 20 partitions, use do_not_merge_across_partitions_select_final setting, and keep most of the table’s partitions optimized (1 part per partition). But we can do it better.
Adding projections
Let’s add an aggregating projection, and also add a more useful updated_at timestamp instead of an abstract _version and replace String for Department dimension by LowCardinality(String). Let’s look at the difference in time execution.
set allow_experimental_analyzer=0;

create table Example4
(
    id Int32,
    Department LowCardinality(String),
    metric1 Int32,
    metric2 Float32,
    _version DateTime64(3) default now64(3),
    sign Int8 default 1
) engine = VersionedCollapsingMergeTree(sign, _version)
ORDER BY id
partition by (id % 20)
settings index_granularity=4096;

set do_not_merge_across_partitions_select_final=1;

-- make 100M table
INSERT INTO Example4
SELECT
    number AS id,
    ['HR', 'Finance', 'Engineering', 'Sales', 'Marketing'][rand() % 5 + 1] AS Department,
    rand() % 1000 AS metric1,
    (rand() % 10000) / 100.0 AS metric2,
    0 AS _version,
    1 AS sign
FROM numbers(1E8);

create temporary table timeMark (ts DateTime64(3));
create function timeSpent as () -> date_diff('millisecond', (select max(ts) from timeMark), now64(3));

-- measure plain INSERT time for 1M batch
insert into timeMark select now64(3);
INSERT INTO Example4 (id, Department, metric1, metric2)
SELECT
    number AS id,
    ['HR', 'Finance', 'Engineering', 'Sales', 'Marketing'][rand() % 5 + 1] AS Department,
    rand() % 1000 AS metric1,
    (rand() % 10000) / 100.0 AS metric2
FROM numbers(1E6);
select '---', timeSpent(), 'INSERT';

--create table Stage engine=MergeTree order by id as Example4 ;
create table Stage engine=Null as Example4;

create materialized view Example4Transform to Example4 as
with __new as ( SELECT * FROM Stage order by _version desc, sign desc limit 1 by id ),
     __old AS ( SELECT *, arrayJoin([-1,1]) AS _sign
                from ( select * FROM Example4 final
                       PREWHERE id IN (SELECT id FROM __new)
                       where sign = 1 )
              )
select id,
    if(__old._sign = -1, __old.Department, __new.Department) AS Department,
    if(__old._sign = -1, __old.metric1, __new.metric1) AS metric1,
    if(__old._sign = -1, __old.metric2, __new.metric2) AS metric2,
    if(__old._sign = -1, __old._version, __new._version) AS _version,
    if(__old._sign = -1, -1, 1) AS sign
from __new left join __old using id
where if(__new.sign = -1,
    __old._sign = -1,                -- insert only delete row if it's found in old data
    __new._version > __old._version  -- skip duplicates for updates
);

-- calculate UPSERT time for 1M batch
insert into timeMark select now64(3);
INSERT INTO Stage (id, Department, metric1, metric2)
SELECT
    (rand() % 1E6) * 100 AS id, --number AS id,
    ['HR', 'Finance', 'Engineering', 'Sales', 'Marketing'][rand() % 5 + 1] AS Department,
    rand() % 1000 AS metric1,
    (rand() % 10000) / 100.0 AS metric2
FROM numbers(1E6);
select '---', timeSpent(), 'UPSERT';

-- FINAL query
insert into timeMark select now64(3);
select Department, count(), sum(metric1) from Example4 FINAL
group by Department order by Department
format Null;
select '---', timeSpent(), 'FINAL';

-- GROUP BY query
insert into timeMark select now64(3);
select Department, sum(sign), sum(sign * metric1) from Example4
group by Department order by Department
format Null;
select '---', timeSpent(), 'GROUP BY';

--select '--parts1',partition, count() from system.parts where active and table='Example4' group by partition;
insert into timeMark select now64(3);
optimize table Example4 final;
select '---', timeSpent(), 'OPTIMIZE';

-- FINAL OPTIMIZED
insert into timeMark select now64(3);
select Department, count(), sum(metric1) from Example4 FINAL
group by Department order by Department
format Null;
select '---', timeSpent(), 'FINAL OPTIMIZED';

-- GROUP BY OPTIMIZED
insert into timeMark select now64(3);
select Department, sum(sign), sum(sign * metric1) from Example4
group by Department order by Department
format Null;
select '---', timeSpent(), 'GROUP BY OPTIMIZED';

-- UPSERT a little data to create more parts
INSERT INTO Stage (id, Department, metric1, metric2)
SELECT
    number AS id,
    ['HR', 'Finance', 'Engineering', 'Sales', 'Marketing'][rand() % 5 + 1] AS Department,
    rand() % 1000 AS metric1,
    (rand() % 10000) / 100.0 AS metric2
FROM numbers(1000);

--select '--parts2',partition, count() from system.parts where active and table='Example4' group by partition;
-- GROUP BY SEMI-OPTIMIZED
insert into timeMark select now64(3);
select Department, sum(sign), sum(sign * metric1) from Example4
group by Department order by Department
format Null;
select '---', timeSpent(), 'GROUP BY SEMI-OPTIMIZED';

--alter table Example4 add column Smetric1 Int32 alias metric1*sign;
alter table Example4 add projection byDep (select Department, sum(sign), sum(sign * metric1) group by Department);

-- Materialize Projection
insert into timeMark select now64(3);
alter table Example4 materialize projection byDep settings mutations_sync=1;
select '---', timeSpent(), 'Materialize Projection';

-- GROUP BY query Projected
insert into timeMark select now64(3);
select Department, sum(sign), sum(sign * metric1) from Example4
group by Department order by Department
settings force_optimize_projection=1
format Null;
select '---', timeSpent(), 'GROUP BY Projected';
Results (Mac A2 Pro), milliseconds:
--- 175 INSERT
--- 1613 UPSERT
--- 329 FINAL
--- 102 GROUP BY
--- 10498 OPTIMIZE
--- 103 FINAL OPTIMIZED
--- 90 GROUP BY OPTIMIZED
--- 94 GROUP BY SEMI-OPTIMIZED
--- 919 Materialize Projection
--- 5 GROUP BY Projected
Some thoughts:
INSERT, UPSERT, and SELECT benefit from switching the Department column to LowCardinality. Fewer reads - faster queries.
OPTIMIZE is VERY expensive
FINAL is quite fast (especially for the OPTIMIZED table). You don’t need to OPTIMIZE the table till the 1 part for partition to remove FINAL from the query. Not having too many parts already gives you a performance boost.
GROUP BY for that task is still faster
projections building requires resources. Inserts to the table with Projections will be longer. Tune the insert timeouts.
Query over projection is very fast (as it should be). However, it’s not always possible to aggregate data in such a simple way.
DELETEs inaccuracy
The typical CDC event for DWH systems besides INSERT is UPSERT—a new row replaces the old one (with suitable aggregate corrections). But DELETE events are also supported (ones with column sign=-1). The Materialized View described above will correctly process the DELETE event by inserting only 1 row with sign=-1 if a row with a particular ID already exists in the table. In such cases, VersionedCollapsingMergeTree will wipe both rows (with sign=1 & -1) during merge or final operations.
However, it can lead to incorrect duplicate processing in some rare situations. Here is the scenario:
two events happen in the source database (insert and delete) for the very same ID
only the insert event creates a duplicate (the delete event is not duplicated)
all 3 events (delete and two inserts) were processed in separate batches
ClickHouse executes the merge operation very quickly after the first INSERT and DELETE events are received, effectively removing the row with that ID from the table
the second (duplicated) insert is saved to the table because we lost the information about the first insertion
The probability of such a sequence is relatively low, especially in normal operations when the amount of DELETEs is not too significant. Processing events in big batches will reduce the probability even more.
Combine old and new
The presented technique can be used to reimplement the AggregatingMergeTree algorithm to combine old and new row data using VersionedCollapsingMergeTree.
create table Example5
(
    id Int32,
    metric1 UInt32,
    metric2 Nullable(UInt32),
    updated_at DateTime64(3) default now64(3),
    sign Int8 default 1
) engine = VersionedCollapsingMergeTree(sign, updated_at)
ORDER BY id;

create table Stage engine=Null as Example5;

create materialized view Example5Transform to Example5 as
with __new as ( SELECT * FROM Stage order by sign desc, updated_at desc limit 1 by id ),
     __old AS ( SELECT *, arrayJoin([-1,1]) AS _sign
                from ( select * FROM Example5 final
                       PREWHERE id IN (SELECT id FROM __new)
                       where sign = 1 )
              )
select id,
    if(__old._sign = -1, __old.metric1, greatest(__new.metric1, __old.metric1)) AS metric1,
    if(__old._sign = -1, __old.metric2, ifNull(__new.metric2, __old.metric2)) AS metric2,
    if(__old._sign = -1, __old.updated_at, __new.updated_at) AS updated_at,
    if(__old._sign = -1, -1, 1) AS sign
from __new left join __old using id
where if(__new.sign = -1,
    __old._sign = -1,                    -- insert only delete row if it's found in old data
    __new.updated_at > __old.updated_at  -- skip duplicates for updates
);

-- original
insert into Stage(id) values (1), (2);
select 'step0', * from Example5;

insert into Stage(id, metric1) values (1,1), (2,2);
select 'step1', * from Example5 final;

insert into Stage(id, metric2) values (1,11), (2,12);
select 'step2', * from Example5 final;
Complex Primary Key
I used a simple, compact column with Int64 type for the primary key in previous examples. It’s better to go this route with monotonically growing IDs like autoincrement ID or SnowFlakeId (based on timestamp). However, in some cases, a more complex primary key is needed. For instance, when storing data for multiple tenants (Customers, partners, etc.) in the same table. This is not a problem for the suggested technique - use all the necessary columns in all filters and JOIN operations as Tuple.
create table Example6
(
    id Int64,
    tenant_id Int32,
    metric1 UInt32,
    _version UInt64,
    sign Int8 default 1
) engine = VersionedCollapsingMergeTree(sign, _version)
ORDER BY (tenant_id, id);

create table Stage engine=Null as Example6;

create materialized view Example6Transform to Example6 as
with __new as ( SELECT * FROM Stage order by sign desc, _version desc limit 1 by tenant_id, id ),
     __old AS ( SELECT *, arrayJoin([-1,1]) AS _sign
                from ( select * FROM Example6 final
                       PREWHERE (tenant_id, id) IN (SELECT tenant_id, id FROM __new)
                       where sign = 1 )
              )
select id, tenant_id,
    if(__old._sign = -1, __old.metric1, __new.metric1) AS metric1,
    if(__old._sign = -1, __old._version, __new._version) AS _version,
    if(__old._sign = -1, -1, 1) AS sign
from __new left join __old using (tenant_id, id)
where if(__new.sign = -1,
    __old._sign = -1,                -- insert only delete row if it's found in old data
    __new._version > __old._version  -- skip duplicates for updates
);
Sharding
The suggested approach works well when inserting data in a single thread on a single replica. This is suitable for up to 1M events per second. However, for higher traffic, it’s necessary to use multiple ingesting threads across several replicas. In such cases, collisions caused by parts manipulation and replication delay can disrupt the entire Collapsing algorithm.
But inserting into different shards with a sharding key derived from ID works fine. Every shard will operate with its own non-intersecting set of IDs, and the shards don’t interfere with each other.
The same approach can be implemented when inserting in several threads into the same replica node. For big installations with high traffic and many shards and replicas, the ingesting app can split the data stream into a considerably large number of “virtual shards” (or partitions in Kafka terminology) and then map the “virtual shards” to the threads doing inserts to “physical shards.”
The incoming stream could be split into several ones by using an expression like cityHash64(id) % 50 = 0 as a sharding key. The ingesting app should calculate the shard number before sending data to internal buffers that will be flushed to INSERTs.
-- emulate insert into distributed table
INSERT INTO function remote('localhos{t,t,t}', default, Stage, id)
SELECT
    (rand() % 1E6) * 100 AS id, --number AS id,
    ['HR', 'Finance', 'Engineering', 'Sales', 'Marketing'][rand() % 5 + 1] AS Department,
    rand() % 1000 AS metric1,
    (rand() % 10000) / 100.0 AS metric2,
    2 AS _version,
    1 AS sign
FROM numbers(1000)
settings prefer_localhost_replica=0;
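On the ingesting side, the "virtual shard" routing described above could look roughly like the Python sketch below. It is an illustration only: SHA-256 is used as a stand-in for cityHash64 (which is not in the Python standard library), and `VIRTUAL_SHARDS`, `PHYSICAL_SHARDS`, and the modulo mapping between them are assumptions, not values prescribed by ClickHouse.

```python
import hashlib

VIRTUAL_SHARDS = 50   # number of virtual shards (assumption for this sketch)
PHYSICAL_SHARDS = 3   # hypothetical number of real shards / insert threads


def virtual_shard(row_id: int) -> int:
    """Stable virtual shard for an id (SHA-256 stands in for cityHash64)."""
    digest = hashlib.sha256(str(row_id).encode()).digest()
    return int.from_bytes(digest[:8], "big") % VIRTUAL_SHARDS


def physical_shard(vshard: int) -> int:
    """Map a virtual shard to the physical shard that handles its inserts."""
    return vshard % PHYSICAL_SHARDS


def route(ids):
    """Split a stream of ids into per-shard buffers, one writer per buffer."""
    buffers = {shard: [] for shard in range(PHYSICAL_SHARDS)}
    for row_id in ids:
        buffers[physical_shard(virtual_shard(row_id))].append(row_id)
    return buffers


buffers = route(range(1000))
print({shard: len(ids) for shard, ids in buffers.items()})
```

Since the shard is a pure function of the id, every event for a given id reaches the same writer, which is the property the collapsing approach depends on.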
2 - Queries & Syntax
Learn about ClickHouse® queries & syntax, including Joins & Window Functions.
┌─name───────────────────────────────┬─value────┬─changed─┬─description────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────┬─min──┬─max──┬─readonly─┬─type───┐
│ group_by_two_level_threshold │ 100000 │ 0 │ From what number of keys, a two-level aggregation starts. 0 - the threshold is not set. │ ᴺᵁᴸᴸ │ ᴺᵁᴸᴸ │ 0 │ UInt64 │
│ group_by_two_level_threshold_bytes │ 50000000 │ 0 │ From what size of the aggregation state in bytes, a two-level aggregation begins to be used. 0 - the threshold is not set. Two-level aggregation is used when at least one of the thresholds is triggered. │ ᴺᵁᴸᴸ │ ᴺᵁᴸᴸ │ 0 │ UInt64 │
└────────────────────────────────────┴──────────┴─────────┴────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────┴──────┴──────┴──────────┴────────┘
In order to parallelize the merging of hash tables, i.e. execute such a merge via multiple threads, ClickHouse uses a two-level approach:
On the first step ClickHouse creates 256 buckets for each thread (determined by one byte of the hash function).
On the second step ClickHouse can merge those 256 buckets independently by multiple threads.
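The two steps can be modelled in a few lines of Python. This is a conceptual sketch, not ClickHouse's hash table code: `zlib.crc32` is a stand-in hash, `Counter` plays the role of a per-bucket hash table computing `sum`, and the buckets are merged in a plain loop where ClickHouse would hand them to different threads.

```python
import zlib
from collections import Counter

NUM_BUCKETS = 256


def bucket_of(key) -> int:
    """Pick a bucket from one byte of the key's hash (crc32 as a stand-in)."""
    return zlib.crc32(repr(key).encode()) & 0xFF


def aggregate_thread(chunk):
    """Step 1: each thread aggregates its own rows into 256 buckets."""
    buckets = [Counter() for _ in range(NUM_BUCKETS)]
    for key, value in chunk:
        buckets[bucket_of(key)][key] += value
    return buckets


def merge_bucket(per_thread, bucket):
    """Step 2: merge one bucket number across all threads.

    A key always hashes to the same bucket, so different bucket numbers
    never share keys and can be merged independently.
    """
    total = Counter()
    for buckets in per_thread:
        total.update(buckets[bucket])
    return total


chunks = [[("a", 1), ("b", 2)], [("a", 3), ("c", 4)]]  # rows seen by two threads
per_thread = [aggregate_thread(chunk) for chunk in chunks]

result = {}
for bucket in range(NUM_BUCKETS):
    result.update(merge_bucket(per_thread, bucket))

print(sorted(result.items()))  # [('a', 4), ('b', 2), ('c', 4)]
```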
It utilizes a two-level group by and dumps those buckets on disk. And at the last stage ClickHouse will read those buckets from disk one by one and merge them.
So you should have enough RAM to hold one bucket (1/256 of whole GROUP BY size).
Usually it works slower than a regular GROUP BY, because ClickHouse needs to read and process data in a specific ORDER, which makes it much more complicated to parallelize reading and aggregating.
But it uses much less memory, because ClickHouse can stream the result set and there is no need to keep it in memory.
Last item cache
ClickHouse saves the value of the previous hash calculation, just in case the next value is the same.
All queries and datasets are unique, so in different situations different hacks could work better or worse.
PreFilter values before GROUP BY
SELECT
    user_id,
    sum(clicks)
FROM sessions
WHERE created_at > '2021-11-01 00:00:00'
GROUP BY user_id
HAVING (argMax(clicks, created_at) = 16) AND (argMax(platform, created_at) = 'Rat')
FORMAT `Null`
<Debug> MemoryTracker: Peak memory usage (for query): 18.36 GiB.
SELECT
    user_id,
    sum(clicks)
FROM sessions
WHERE user_id IN (
    SELECT user_id
    FROM sessions
    WHERE (platform = 'Rat') AND (clicks = 16) AND (created_at > '2021-11-01 00:00:00') -- So we will select user_id which could potentially match our HAVING clause in OUTER query.
) AND (created_at > '2021-11-01 00:00:00')
GROUP BY user_id
HAVING (argMax(clicks, created_at) = 16) AND (argMax(platform, created_at) = 'Rat')
FORMAT `Null`
<Debug> MemoryTracker: Peak memory usage (for query): 4.43 GiB.
Use Fixed-width data types instead of String
For example, you have strings whose values have a special form like this:
'ABX 1412312312313'
You can just remove the first 4 characters and convert the rest to UInt64:
toUInt64(substr('ABX 1412312312313',5))
This packs 17 bytes into 8, a size reduction of more than 2 times!
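The size arithmetic can be checked with a short Python sketch. It only mirrors the idea of `toUInt64(substr(..., 5))`; the variable names are illustrative.

```python
import struct

raw = "ABX 1412312312313"

# Drop the 4-character prefix and parse the rest as an integer,
# mirroring toUInt64(substr(raw, 5)) (substr is 1-based in ClickHouse).
numeric = int(raw[4:])

as_string = raw.encode("ascii")          # variable-width character data
as_uint64 = struct.pack("<Q", numeric)   # fixed-width 8-byte unsigned integer

print(len(as_string), len(as_uint64))
```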
Because each thread uses an independent hash table, lowering the number of threads also reduces the number of hash tables and lowers memory usage, at the cost of slower query execution.
Since 22.4 ClickHouse can predict when it makes sense to initialize aggregation as two-level from the start, instead of rehashing on the fly.
This can improve query time.
https://github.com/ClickHouse/ClickHouse/pull/33439
GROUP BY in external memory
Slow!
Use hash function for GROUP BY keys
GROUP BY cityHash64('xxxx')
This can lead to incorrect results, because a hash function is not a one-to-one mapping.
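To see why grouping by a hash can merge distinct keys, here is a small Python sketch. It deliberately uses a narrow 8-bit hash (built from `zlib.crc32`, an assumption made only for this demo) so that collisions are guaranteed; with a 64-bit hash such as cityHash64 collisions are far rarer, but the mechanism is the same.

```python
from collections import Counter
import zlib


def tiny_hash(key: str) -> int:
    # Deliberately narrow (8-bit) hash so collisions are easy to observe.
    return zlib.crc32(key.encode()) & 0xFF


keys = [f"user_{i}" for i in range(1000)]

exact_groups = Counter(keys)                         # GROUP BY key
hashed_groups = Counter(tiny_hash(k) for k in keys)  # GROUP BY hash(key)

# Fewer groups after hashing means different keys were merged together.
print(len(exact_groups), len(hashed_groups))
```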
An approach that allows you to redefine partitioning without table creation
In that example, partitioning is being calculated via MATERIALIZED column expression toDate(toStartOfInterval(ts, toIntervalT(...))), but partition id also can be generated on application side and inserted to ClickHouse® as is.
SELECT number
FROM numbers_mt(100000000)
GROUP BY number
FORMAT `Null`
MemoryTracker: Peak memory usage (for query): 4.04 GiB.
0 rows in set. Elapsed: 8.212 sec. Processed 100.00 million rows, 800.00 MB (12.18 million rows/s., 97.42 MB/s.)
SELECT number
FROM numbers_mt(100000000)
GROUP BY number
SETTINGS max_threads = 1
FORMAT `Null`
MemoryTracker: Peak memory usage (for query): 6.00 GiB.
0 rows in set. Elapsed: 19.206 sec. Processed 100.03 million rows, 800.21 MB (5.21 million rows/s., 41.66 MB/s.)
SELECT number
FROM numbers_mt(100000000)
GROUP BY number
LIMIT 1000
FORMAT `Null`
MemoryTracker: Peak memory usage (for query): 4.05 GiB.
0 rows in set. Elapsed: 4.852 sec. Processed 100.00 million rows, 800.00 MB (20.61 million rows/s., 164.88 MB/s.)
This query is faster than the first one, because ClickHouse® doesn't need to merge states for all keys, only for the first 1000 (based on LIMIT).
SELECT number % 1000 AS key
FROM numbers_mt(1000000000)
GROUP BY key
LIMIT 1000
FORMAT `Null`
MemoryTracker: Peak memory usage (for query): 3.15 MiB.
0 rows in set. Elapsed: 0.770 sec. Processed 1.00 billion rows, 8.00 GB (1.30 billion rows/s., 10.40 GB/s.)
SELECT number % 1000 AS key
FROM numbers_mt(1000000000)
GROUP BY key
LIMIT 1001
FORMAT `Null`
MemoryTracker: Peak memory usage (for query): 3.77 MiB.
0 rows in set. Elapsed: 0.770 sec. Processed 1.00 billion rows, 8.00 GB (1.30 billion rows/s., 10.40 GB/s.)
Multi threaded
Will return result only after completion of aggregation
When we try to cast 64.32 to Decimal128(2), the resulting value is 64.31.
When the query parser sees a number with a decimal separator, it interprets it as a Float64 literal (where 64.32 has no exact representation, so you actually get something like 64.319999999999999999), and later that Float is cast to Decimal by removing the extra precision.
The workaround is very simple: wrap the number in quotes (it will then be treated as a string literal by the query parser and converted to Decimal directly), or use PostgreSQL-style casting syntax (the :: operator).
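The same effect can be reproduced outside ClickHouse. The Python sketch below is an analogy only: it uses the standard `decimal` module to show the exact value stored in a Float64 for 64.32, and what truncating it to two decimal places gives, compared with parsing the digits from a string.

```python
from decimal import Decimal, ROUND_DOWN

two_places = Decimal("0.01")

# Float64 path: the literal is first stored as the nearest binary double.
via_float = Decimal(64.32)  # exact value of that double, slightly below 64.32
truncated = via_float.quantize(two_places, rounding=ROUND_DOWN)

# String path: the digits are parsed as a decimal number directly.
via_string = Decimal("64.32").quantize(two_places, rounding=ROUND_DOWN)

print(via_float)
print(truncated, via_string)
```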
One more approach to hide (delete) rows in ClickHouse®
No row policy
CREATE TABLE test_delete
(
    tenant Int64,
    key Int64,
    ts DateTime,
    value_a String
)
ENGINE = MergeTree
PARTITION BY toYYYYMM(ts)
ORDER BY (tenant, key, ts);
INSERT INTO test_delete
SELECT
    number % 5,
    number,
    toDateTime('2020-01-01') + number / 10,
    concat('some_looong_string', toString(number)),
FROM numbers(1e8);
INSERT INTO test_delete -- multiple small tenants
SELECT
    number % 5000,
    number,
    toDateTime('2020-01-01') + number / 10,
    concat('some_looong_string', toString(number)),
FROM numbers(1e8);
expression: CREATE ROW POLICY pol1 ON test_delete USING tenant not in (1,2,3) TO all;
table subq: CREATE ROW POLICY pol1 ON test_delete USING tenant not in deleted_tenants TO all;
ext. dict. NOT dictHas : CREATE ROW POLICY pol1 ON test_delete USING NOT dictHas('deleted_tenants_dict', tenant) TO all;
ext. dict. dictHas :
Q    no policy        expression       table subq       ext. dict. NOT   ext. dict.       engine=Set
Q1   0.285 / 200.00m  0.333 / 140.08m  0.329 / 140.08m  0.388 / 200.00m  0.399 / 200.00m  0.322 / 200.00m
Q2   0.265 / 20.23m   0.287 / 20.23m   0.287 / 20.23m   0.291 / 20.23m   0.284 / 20.23m   0.275 / 20.23m
Q3   0.062 / 20.23m   0.080 / 20.23m   0.080 / 20.23m   0.084 / 20.23m   0.080 / 20.23m   0.084 / 20.23m
Q4   0.009 / 212.99t  0.011 / 212.99t  0.010 / 213.00t  0.010 / 212.99t  0.010 / 212.99t  0.010 / 212.99t
Q5   0.008 / 180.22t  0.008 / 180.23t  0.046 / 20.22m   0.034 / 20.22m   0.030 / 20.22m
Expression in row policy seems to be fastest way (Q1, Q5).
2.8 - Why is simple `SELECT count()` Slow in ClickHouse®?
ClickHouse is a columnar database that provides excellent performance for analytical queries. However, in some cases, a simple count query can be slow. In this article, we’ll explore the reasons why this can happen and how to optimize the query.
Three Strategies for Counting Rows in ClickHouse
There are three ways to count rows in a table in ClickHouse:
optimize_trivial_count_query: This strategy extracts the number of rows from the table metadata. It’s the fastest and most efficient way to count rows, but it only works for simple count queries.
allow_experimental_projection_optimization: This strategy uses a virtual projection called _minmax_count_projection to count rows. It’s faster than scanning the table but slower than the trivial count query.
Scanning the smallest column in the table and reading rows from that. This is the slowest strategy and is only used when the other two strategies can’t be used.
Why Does ClickHouse Sometimes Choose the Slowest Counting Strategy?
In some cases, ClickHouse may choose the slowest counting strategy even when there are faster options available. Here are some possible reasons why this can happen:
Row policies are used on the table: If row policies are used, ClickHouse needs to filter rows to give the proper count. You can check if row policies are used by selecting from system.row_policies.
Experimental light-weight delete feature was used on the table: If the experimental light-weight delete feature was used, ClickHouse may use the slowest counting strategy. You can check this by looking into parts_columns for the column named _row_exists. To do this, run the following query:
Some other features, like allow_experimental_query_deduplication or empty_result_for_aggregation_by_empty_set, are used.
2.9 - Collecting query execution flamegraphs using system.trace_log
ClickHouse® has embedded functionality to analyze the details of query performance: the system.trace_log table.
By default it collects information only about queries that run longer than 1 second (and collects stack traces every second).
You can adjust that per query using the settings query_profiler_real_time_period_ns and query_profiler_cpu_time_period_ns.
Both work very similarly (at the desired interval they dump the stack traces of all the threads which execute the query).
real timer - allows you to ‘see’ the situations when the CPU was not working much, but time was spent, for example, on IO.
cpu timer - allows you to see the ‘hot’ points in calculations more accurately (it skips the IO time).
Trying to collect stack traces with a frequency higher than a few kHz is usually not possible.
To check where most of the RAM is used, you can collect stack traces during memory allocations / deallocations by using the setting memory_profiler_sample_probability.
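The two profiler period settings are expressed in nanoseconds, so it can help to convert between a period and a sampling frequency. A minimal Python sketch of that arithmetic (the helper names are hypothetical):

```python
NS_PER_SECOND = 1_000_000_000


def samples_per_second(period_ns: int) -> float:
    # A period of N nanoseconds means one stack sample every N ns.
    return NS_PER_SECOND / period_ns


def period_ns_for(frequency_hz: float) -> int:
    # Inverse conversion: value to put into query_profiler_*_time_period_ns.
    return round(NS_PER_SECOND / frequency_hz)


print(samples_per_second(1_000_000))  # period of 1000000 ns
print(samples_per_second(2_000_000))  # period of 2000000 ns
print(period_ns_for(1000))            # period needed for 1 kHz sampling
```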
clickhouse-speedscope
# install
wget https://github.com/laplab/clickhouse-speedscope/archive/refs/heads/master.tar.gz -O clickhouse-speedscope.tar.gz
tar -xvzf clickhouse-speedscope.tar.gz
cd clickhouse-speedscope-master/
pip3 install -r requirements.txt
For debugging particular query:
clickhouse-client
SET query_profiler_cpu_time_period_ns=1000000; -- 1000 times per 'cpu' sec
-- or SET query_profiler_real_time_period_ns=2000000; -- 500 times per 'real' sec.
-- or SET memory_profiler_sample_probability=0.1; -- to debug the memory allocations
SELECT ... <your select>
SYSTEM FLUSH LOGS;
-- get the query_id from the clickhouse-client output or from system.query_log (also pay attention on query_id vs initial_query_id for distributed queries).
Now let’s process that:
python3 main.py & # start the proxy in background
python3 main.py --query-id 908952ee-71a8-48a4-84d5-f4db92d45a5d # process the stacktraces
fg # get the proxy from background
Ctrl + C # stop it.
git clone https://github.com/brendangregg/FlameGraph /opt/flamegraph
clickhouse-client -q "SELECT arrayStringConcat(arrayReverse(arrayMap(x -> concat( addressToLine(x), '#', demangle(addressToSymbol(x)) ), trace)), ';') AS stack, count() AS samples FROM system.trace_log WHERE event_time >= subtractMinutes(now(),10) GROUP BY trace FORMAT TabSeparated" | /opt/flamegraph/flamegraph.pl > flamegraph.svg
clickhouse-client -q "SELECT arrayStringConcat((arrayMap(x -> concat(splitByChar('/', addressToLine(x))[-1], '#', demangle(addressToSymbol(x)) ), trace)), ';') AS stack, sum(abs(size)) AS samples FROM system.trace_log where trace_type = 'Memory' and event_date = today() group by trace order by samples desc FORMAT TabSeparated" | /opt/flamegraph/flamegraph.pl > allocs.svg
clickhouse-client -q "SELECT arrayStringConcat(arrayReverse(arrayMap(x -> concat(splitByChar('/', addressToLine(x))[-1], '#', demangle(addressToSymbol(x)) ), trace)), ';') AS stack, count() AS samples FROM system.trace_log where trace_type = 'Memory' group by trace FORMAT TabSeparated SETTINGS allow_introspection_functions=1" | /opt/flamegraph/flamegraph.pl > ~/mem1.svg
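The queries above produce the "folded stacks" text that flamegraph.pl consumes: one line per distinct stack, with frames joined by `;` and followed by a sample count. A rough Python equivalent of that folding step, assuming each trace is a list of frame names ordered leaf-first (the frame names in the usage note are made up):

```python
from collections import Counter


def fold_stacks(traces):
    # Each trace is assumed to be ordered leaf-first; flamegraph.pl expects
    # root-first stacks, hence the reversal (arrayReverse in the SQL).
    folded = Counter()
    for frames in traces:
        folded[";".join(reversed(frames))] += 1
    return folded


def to_flamegraph_input(folded):
    # One "stack count" line per distinct stack.
    return "\n".join(f"{stack} {count}" for stack, count in sorted(folded.items()))
```

For example, `to_flamegraph_input(fold_stacks([["read", "execute", "main"], ["sort", "execute", "main"]]))` yields two lines, each starting with `main;execute;`.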
2.10 - Using array functions to mimic window-functions alike behavior
There are cases where you may need to mimic window functions using arrays in ClickHouse. This could be for optimization purposes, to better manage memory, or to enable on-disk spilling, especially if you’re working with an older version of ClickHouse that doesn’t natively support window functions.
Here’s an example demonstrating how to mimic a window function like runningDifference() using arrays:
Step 1: Create Sample Data
We’ll start by creating a test table with some sample data:
This table contains IDs, timestamps (ts), and values (val), where each id appears multiple times with different timestamps.
Step 2: Running Difference Example
If you try using runningDifference directly, it works block by block, which can be problematic when the data needs to be ordered or when group changes occur.
The output may look inconsistent because runningDifference requires ordered data within blocks.
Step 3: Using Arrays for Grouping and Calculation
Instead of using runningDifference, we can utilize arrays to group data, sort it, and apply similar logic more efficiently.
Grouping Data into Arrays -
You can group multiple columns into arrays by using the groupArray function. For example, to collect several columns as arrays of tuples, you can use the following query:
Applying Calculations with Arrays -
Once the data is sorted, you can apply array functions like arrayMap and arrayDifference to calculate differences between values in the arrays:
This allows you to manipulate and analyze data within arrays effectively, using powerful functions such as arrayMap, arrayDifference, and arrayEnumerate.
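The group, sort, then difference pattern described in Step 3 can be modelled in plain Python. This is only a sketch of the logic, not ClickHouse code; `array_difference` follows the convention of arrayDifference (first element is 0), and the row layout `(id, ts, val)` matches the sample table described above.

```python
from collections import defaultdict


def array_difference(values):
    # Same convention as arrayDifference: the first element is 0,
    # every next element is the difference from its predecessor.
    if not values:
        return []
    return [0] + [b - a for a, b in zip(values, values[1:])]


def running_difference_per_id(rows):
    # rows: iterable of (id, ts, val) in arbitrary order.
    groups = defaultdict(list)
    for id_, ts, val in rows:
        groups[id_].append((ts, val))  # like groupArray((ts, val))
    result = {}
    for id_, pairs in groups.items():
        pairs.sort()                   # like arraySort by ts
        result[id_] = array_difference([val for _, val in pairs])
    return result
```

Sorting inside each group is what makes the result independent of block boundaries and input order.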
2.11 - -State & -Merge combinators
The -State combinator in ClickHouse® does not store additional information about the -If combinator, which means that aggregate functions with and without -If have the same serialized data structure. This can be verified through various examples, as demonstrated below.
Example 1: maxIfState and maxState
In this example, we use the maxIfState and maxState functions on a dataset of numbers, serialize the result, and merge it using the maxMerge function.
$ clickhouse-local --query "SELECT maxIfState(number,number % 2) as x, maxState(number) as y FROM numbers(10) FORMAT RowBinary" | clickhouse-local --input-format RowBinary --structure="x AggregateFunction(max,UInt64), y AggregateFunction(max,UInt64)" --query "SELECT maxMerge(x), maxMerge(y) FROM table"
9	9
$ clickhouse-local --query "SELECT maxIfState(number,number % 2) as x, maxState(number) as y FROM numbers(11) FORMAT RowBinary" | clickhouse-local --input-format RowBinary --structure="x AggregateFunction(max,UInt64), y AggregateFunction(max,UInt64)" --query "SELECT maxMerge(x), maxMerge(y) FROM table"
9	10
In both cases, the -State combinator results in identical serialized data footprints, regardless of the conditions in the -If variant. The maxMerge function merges the state without concern for the original -If condition.
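A toy Python model may make the point clearer: the state of max is just "the current maximum", and the -If variant only filters rows before they reach that same state, so the merge step cannot tell the two apart. The function names below are illustrative, not ClickHouse APIs.

```python
def max_state(values):
    # The state of max() is just the current maximum (None when empty).
    state = None
    for v in values:
        if state is None or v > state:
            state = v
    return state


def max_if_state(values, condition):
    # -If only filters rows before they reach the very same state.
    return max_state(v for v in values if condition(v))


def max_merge(states):
    # Merging does not know or care whether a state came from an -If variant.
    return max_state(s for s in states if s is not None)


x = max_if_state(range(11), lambda n: n % 2)  # odd numbers only
y = max_state(range(11))
print(max_merge([x]), max_merge([y]))
```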
Example 2: quantilesTDigestIfState
Here, we use the quantilesTDigestIfState function to demonstrate that functions like quantile-based and sequence matching functions follow the same principle regarding serialized data consistency.
$ clickhouse-local --query "SELECT quantilesTDigestIfState(0.1,0.9)(number,number % 2) FROM numbers(1000000) FORMAT RowBinary" | clickhouse-local --input-format RowBinary --structure="x AggregateFunction(quantileTDigestWeighted(0.5),UInt64,UInt8)" --query "SELECT quantileTDigestWeightedMerge(0.4)(x) FROM table"
400000
$ clickhouse-local --query "SELECT quantilesTDigestIfState(0.1,0.9)(number,number % 2) FROM numbers(1000000) FORMAT RowBinary" | clickhouse-local --input-format RowBinary --structure="x AggregateFunction(quantilesTDigestWeighted(0.5),UInt64,UInt8)" --query "SELECT quantilesTDigestWeightedMerge(0.4,0.8)(x) FROM table"
[400000,800000]
Example 3: Quantile Functions with -Merge
This example shows how the quantileState and quantileMerge functions work together to calculate a specific quantile.
Example 4: sequenceMatch and sequenceCount Functions with -Merge
Finally, we demonstrate the behavior of sequenceMatchState and sequenceMatchMerge, as well as sequenceCountState and sequenceCountMerge, in ClickHouse.
ClickHouse’s -State combinator stores serialized data in a consistent manner, irrespective of conditions used with -If. The same applies to a wide range of functions, including quantile and sequence-based functions. This behavior ensures that functions like maxMerge, quantileMerge, sequenceMatchMerge, and sequenceCountMerge work seamlessly, even across varied inputs.
2.12 - ALTER MODIFY COLUMN is stuck, the column is inaccessible.
Problem
You’ve created a table in ClickHouse with the following structure:
The failure occurred because the Enum8 type only allows for predefined values. Since ‘key_c’ wasn’t included in the definition, the mutation failed and left the table in an inconsistent state.
Solution
Identify and Terminate the Stuck Mutation
First, you need to locate the mutation that’s stuck in an incomplete state.
Apply the Correct Column Modification
Now that the column is accessible, you can safely reapply the ALTER query, but this time include all the required enum values:
Monitor Progress
You can monitor the progress of the column modification using the system.mutations or system.parts_columns tables to ensure everything proceeds as expected:
To make ClickHouse® more compatible with ANSI SQL standards (at the expense of some performance), you can adjust several settings. These configurations will bring ClickHouse closer to ANSI SQL behavior but may introduce a slowdown in query performance:
join_use_nulls=1
Introduced in: early versions
Ensures that JOIN operations return NULL for non-matching rows, aligning with standard SQL behavior.
cast_keep_nullable=1
Introduced in: v20.5
Preserves the NULL flag when casting between data types, which is typical in ANSI SQL.
union_default_mode='DISTINCT'
Introduced in: v21.1
Makes the UNION operation default to UNION DISTINCT, which removes duplicate rows, following ANSI SQL behavior.
allow_experimental_window_functions=1
Introduced in: v21.3
Enables support for window functions, which are a standard feature in ANSI SQL.
prefer_column_name_to_alias=1
Introduced in: v21.4
This setting resolves ambiguities by preferring column names over aliases, following ANSI SQL conventions.
group_by_use_nulls=1
Introduced in: v22.7
Makes GROUP BY modifiers such as ROLLUP, CUBE, and GROUPING SETS fill the non-grouped key columns with NULL instead of default values, consistent with ANSI SQL behavior.
By enabling these settings, ClickHouse becomes more ANSI SQL-compliant, although this may come with a trade-off in terms of performance. Each of these options can be enabled as needed, based on the specific SQL compatibility requirements of your application.
2.14 - Async INSERTs
Comprehensive guide to ClickHouse Async INSERTs - configuration, best practices, and monitoring
Overview
Async INSERTs is a ClickHouse® feature that enables automatic server-side batching of data. While we generally recommend batching at the application/ingestor level for better control and decoupling, async inserts are valuable when you have hundreds or thousands of clients performing small inserts and client-side batching is not feasible.
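The buffer is flushed when any of the following conditions is met:
Buffer reaches the size limit (async_insert_max_data_size)
Time threshold elapses (async_insert_busy_timeout_ms)
Maximum number of queries accumulate (async_insert_max_query_number)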
Critical Configuration Settings
Core Settings
-- Enable async inserts (0=disabled, 1=enabled)
SET async_insert = 1;
-- Wait behavior (STRONGLY RECOMMENDED: use 1)
-- 0 = fire-and-forget mode (risky - no error feedback)
-- 1 = wait for data to be written to storage
SET wait_for_async_insert = 1;
-- Buffer flush conditions
SET async_insert_max_data_size = 1000000; -- 1MB default
SET async_insert_busy_timeout_ms = 1000; -- 1 second
SET async_insert_max_query_number = 100; -- max queries before flush
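How the three flush conditions interact can be illustrated with a toy buffer in Python. This is a simplified model under stated assumptions, not ClickHouse's implementation: the class name and methods are hypothetical, the clock is passed in explicitly, and the adaptive timeout is ignored.

```python
class AsyncInsertBuffer:
    """Toy model of server-side batching with three flush thresholds."""

    def __init__(self, max_data_size=1_000_000, busy_timeout_ms=1000, max_query_number=100):
        self.max_data_size = max_data_size        # async_insert_max_data_size
        self.busy_timeout_ms = busy_timeout_ms    # async_insert_busy_timeout_ms
        self.max_query_number = max_query_number  # async_insert_max_query_number
        self._reset()

    def _reset(self):
        self.queries = []
        self.size = 0
        self.first_insert_ms = None

    def add(self, payload: bytes, now_ms: int):
        # Buffer one INSERT and return the flushed batch, if any.
        if self.first_insert_ms is None:
            self.first_insert_ms = now_ms
        self.queries.append(payload)
        self.size += len(payload)
        return self.flush_if_due(now_ms)

    def flush_if_due(self, now_ms: int):
        if not self.queries:
            return None
        due = (
            self.size >= self.max_data_size
            or len(self.queries) >= self.max_query_number
            or now_ms - self.first_insert_ms >= self.busy_timeout_ms
        )
        if not due:
            return None
        batch = self.queries
        self._reset()
        return batch
```

Whichever threshold is reached first triggers the flush, and the whole batch is returned as one unit (one part, in ClickHouse terms).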
Adaptive Timeout (Since 24.3)
-- Adaptive timeout automatically adjusts flush timing based on server load
-- Default: 1 (enabled) - OVERRIDES manual timeout settings
-- Set to 0 for deterministic behavior with manual settings
SET async_insert_use_adaptive_busy_timeout = 0;
Important Behavioral Notes
What Works and What Doesn’t
✅ Works with Async Inserts:
Direct INSERT with VALUES
INSERT with FORMAT (JSONEachRow, CSV, etc.)
Native protocol inserts (since 22.x)
❌ Does NOT Work:
INSERT .. SELECT statements - Other strategies are needed for managing performance and load. Do not use async_insert.
Data Safety Considerations
ALWAYS use wait_for_async_insert = 1 in production!
Risks with wait_for_async_insert = 0:
Silent data loss on errors (read-only table, disk full, too many parts)
Data loss on sudden restart (no fsync by default)
Data not immediately queryable after acknowledgment
No error feedback to client
Deduplication Behavior
Sync inserts: Automatic deduplication enabled by default
Async inserts: Deduplication disabled by default
Enable with async_insert_deduplicate = 1 (since 22.x)
Warning: Don’t use with deduplicate_blocks_in_dependent_materialized_views = 1
features / improvements
Async insert dedup: Support block deduplication for asynchronous inserts. Before this change, async inserts did not support deduplication, because multiple small inserts coexisted in one inserted batch:
Added system table asynchronous_insert_log. It contains information about asynchronous inserts (including results of queries in fire-and-forget mode, i.e. with wait_for_async_insert=0) for better introspection #42040
Support async inserts in clickhouse-client for queries with inlined data (Native protocol):
Fixed bug which could lead to deadlock while using asynchronous inserts #43233.
Fix crash when async inserts with deduplication are used for ReplicatedMergeTree tables using a nondefault merging algorithm #51676
Async inserts not working with log_comment setting #48430
Fix misbehaviour with async inserts with deduplication #50663
Reject Insert if async_insert=1 and deduplicate_blocks_in_dependent_materialized_views=1 #60888
Disable async_insert_use_adaptive_busy_timeout correctly with compatibility settings #61486
observability / introspection
In 22.x versions, it is not possible to relate the part_log/query_id column with the asynchronous_insert_log/query_id column. We need to use query_log/query_id:
asynchronous_insert_log shows the query_id and flush_query_id of each async insert. The query_id from asynchronous_insert_log appears in system.query_log as type = 'QueryStart', but the same query_id does not appear in the query_id column of system.part_log. This is because the query_id column in part_log identifies the INSERT query that created a data part, and it seems to be populated for sync INSERTs but not for async inserts.
So in the asynchronous_inserts table you can check the current batch that has not been flushed yet. In asynchronous_insert_log you can find a log of all the flushed async inserts.
This has been improved in ClickHouse 23.7: flush queries for async inserts (the queries that do the final push of data) are now logged in system.query_log, where they appear as query_kind = 'AsyncInsertFlush' #51160
Versions
23.8 is a good version to start using async inserts because of the improvements and bugfixes.
24.3: the new adaptive timeout mechanism has been added, so ClickHouse will throttle the inserts based on the server load. #58486
This new feature is enabled by default and will OVERRIDE the current async insert settings, so it is better to disable it if your async insert settings are working. Here’s how to do it in a clickhouse-client session: SET async_insert_use_adaptive_busy_timeout = 0; You can also add it as a setting on the INSERT or as a profile setting.
Generate test data in Native and TSV format (100 million rows)
Text formats and the Native format require different sets of settings; here I want to find / demonstrate the mandatory minimum of settings for any case.
clickhouse-client -q \
'SELECT toInt64(number) A, toString(number) S FROM numbers(100000000) FORMAT Native' > t.native
clickhouse-client -q \
'SELECT toInt64(number) A, toString(number) S FROM numbers(100000000) FORMAT TSV' > t.tsv
Insert with default settings (not atomic)
DROP TABLE IF EXISTS trg;
CREATE TABLE trg(A Int64, S String) Engine=MergeTree ORDER BY A;
-- Load data in Native format
clickhouse-client -q 'INSERT INTO trg FORMAT Native' <t.native
-- Check how many parts are created
SELECT
    count(),
    min(rows),
    max(rows),
    sum(rows)
FROM system.parts
WHERE (level = 0) AND (table = 'trg');
┌─count()─┬─min(rows)─┬─max(rows)─┬─sum(rows)─┐
│      90 │    890935 │   1113585 │ 100000000 │
└─────────┴───────────┴───────────┴───────────┘
-- 90 parts were created - not atomic
DROP TABLE IF EXISTS trg;
CREATE TABLE trg(A Int64, S String) Engine=MergeTree ORDER BY A;
-- Load data in TSV format
clickhouse-client -q 'INSERT INTO trg FORMAT TSV' <t.tsv
-- Check how many parts are created
SELECT
    count(),
    min(rows),
    max(rows),
    sum(rows)
FROM system.parts
WHERE (level = 0) AND (table = 'trg');
┌─count()─┬─min(rows)─┬─max(rows)─┬─sum(rows)─┐
│      85 │    898207 │   1449610 │ 100000000 │
└─────────┴───────────┴───────────┴───────────┘
-- 85 parts were created - not atomic
Insert with adjusted settings (atomic)
An atomic insert uses more memory because it needs to hold all 100 million rows in memory.
DROP TABLE IF EXISTS trg;
CREATE TABLE trg(A Int64, S String) Engine=MergeTree ORDER BY A;
clickhouse-client --input_format_parallel_parsing=0 \
                  --min_insert_block_size_bytes=0 \
                  --min_insert_block_size_rows=1000000000 \
                  -q 'INSERT INTO trg FORMAT Native' <t.native
-- Check that only one part is created
SELECT
    count(),
    min(rows),
    max(rows),
    sum(rows)
FROM system.parts
WHERE (level = 0) AND (table = 'trg');
┌─count()─┬─min(rows)─┬─max(rows)─┬─sum(rows)─┐
│       1 │ 100000000 │ 100000000 │ 100000000 │
└─────────┴───────────┴───────────┴───────────┘
-- 1 part, success.
DROP TABLE IF EXISTS trg;
CREATE TABLE trg(A Int64, S String) Engine=MergeTree ORDER BY A;
-- Load data in TSV format
clickhouse-client --input_format_parallel_parsing=0 \
                  --min_insert_block_size_bytes=0 \
                  --min_insert_block_size_rows=1000000000 \
                  -q 'INSERT INTO trg FORMAT TSV' <t.tsv
-- Check that only one part is created
SELECT
    count(),
    min(rows),
    max(rows),
    sum(rows)
FROM system.parts
WHERE (level = 0) AND (table = 'trg');
┌─count()─┬─min(rows)─┬─max(rows)─┬─sum(rows)─┐
│       1 │ 100000000 │ 100000000 │ 100000000 │
└─────────┴───────────┴───────────┴───────────┘
-- 1 part, success.
2.16 - ClickHouse® Projections
Using this ClickHouse feature to optimize queries
Projections in ClickHouse act as inner tables within a main table, functioning as a mechanism to optimize queries by using these inner tables when only specific columns are needed. Essentially, a projection is similar to a Materialized View with an AggregatingMergeTree engine, designed to be automatically populated with relevant data.
However, too many projections can lead to excess storage, much like overusing Materialized Views. Projections share the same lifecycle as the main table, meaning they are automatically backfilled and don’t require query rewrites, which is particularly advantageous when integrating with BI tools.
Projection parts are stored within the main table parts, and their merges occur simultaneously as the main table merges, ensuring data consistency without additional maintenance.
Compared to a separate table+MV setup:
A separate table gives you more freedom (like partitioning, granularity, etc), but projections - more consistency (parts managed as a whole)
Projections do not support many features (like indexes and FINAL). That becomes better with recent versions, but still a drawback
The design approach for projections is the same as for indexes. Create a table and give it to users. If you encounter a slower query, add a projection for that particular query (or set of similar queries). You can create 10+ projections per table, materialize, drop, etc - the very same as indexes. You exchange query speed for disk space/IO and CPU needed to build and rebuild projections on merges.
Links
Amos Bird - kuaishou.com - Projections in ClickHouse. slides. video
A query analyzer should have a reason for using a projection and should not have any limitation to do so.
the query should use ONLY the columns defined in the projection.
There should be a lot of data to read from the main table (gigabytes)
for ORDER BY projection WHERE statement referring to a column should be in the query
FINAL queries do not work with projections.
tables with DELETEd rows do not work with projections. This is because rows in a projection may be affected by a DELETE operation. But there is a MergeTree setting lightweight_mutation_projection_mode to change the behavior (Since 24.7)
Projection is used only if it is cheaper to read from it than from the table (expected amount of rows and GBs read is smaller)
Projection should be materialized. Verify that all parts have the needed projection by comparing system.parts and system.projection_parts (see query below)
a bug in a ClickHouse version. Look at the changelog and search for projection.
If there are many projections per table, the analyzer can select any of them. If you think that another one is better, use the settings preferred_optimize_projection_name or force_optimize_projection_name
If expressions are used instead of plain column names, the query should use the exact expression as defined in the projection with the same functions and modifiers. Use column aliases to make the query the very same as in the projection definition:
SELECT
p.database AS base_database,
p.table AS base_table,
p.name AS base_part_name, -- Name of the part in the base table
p.has_lightweight_delete,
pp.active
FROM system.parts AS p -- Alias for the base table's parts
LEFT JOIN system.projection_parts AS pp -- Alias for the projection's parts
ON p.database = pp.database AND p.table = pp.table
AND p.name = pp.parent_name
AND pp.name = 'projection'
WHERE
p.database = 'database'
AND p.table = 'table'
AND p.active -- Consider only active parts of the base table
-- and not pp.active -- see only missed in the list
ORDER BY p.database, p.table, p.name;
Recalculate on Merge
What happens in the case of non-trivial background merges in ReplacingMergeTree, AggregatingMergeTree and similar, and OPTIMIZE table DEDUPLICATE queries?
Before version 24.8, projections became out of sync with the main data.
Somewhat later (before 25.3) the ignore option was introduced for the deduplicate_merge_projection_mode table setting. It can be helpful for cases when SummingMergeTree is used with Projections and no DELETE operation in any flavor (Replacing/Collapsing/DELETE/ALTER DELETE) is executed over the table.
However, projection usage is still disabled for FINAL queries. So, you have to use OPTIMIZE FINAL or SELECT … GROUP BY instead of FINAL to fight duplicates between parts.
CREATE TABLE users (uid Int16, name String, version Int16,
projection xx (
select name,uid,version order by name
)
) ENGINE=ReplacingMergeTree order by uid
settings deduplicate_merge_projection_mode='rebuild'
;
INSERT INTO users
SELECT
number AS uid,
concat('User_', toString(uid)) AS name,
1 AS version
FROM numbers(100000);
INSERT INTO users
SELECT
number AS uid,
concat('User_', toString(uid)) AS name,
2 AS version
FROM numbers(100000);
SELECT 'duplicate',name,uid,version FROM users
where name ='User_98304'
settings force_optimize_projection=1 ;
SELECT 'dedup by group by/limit 1 by',name,uid,version FROM users
where name ='User_98304'
order by version DESC
limit 1 by uid
settings force_optimize_projection=1
;
optimize table users final ;
SELECT 'dedup after optimize',name,uid,version FROM users
where name ='User_98304'
settings force_optimize_projection=1 ;
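The query-time deduplication used above (ORDER BY version DESC LIMIT 1 BY uid) keeps, for every uid, the row with the highest version. A minimal Python sketch of that logic, with a hypothetical row layout `(uid, name, version)`:

```python
def latest_per_uid(rows):
    # Keep the highest version per uid,
    # like ORDER BY version DESC LIMIT 1 BY uid.
    best = {}
    for uid, name, version in rows:
        if uid not in best or version > best[uid][2]:
            best[uid] = (uid, name, version)
    return sorted(best.values())


rows = [
    (98304, "User_98304", 1),
    (98304, "User_98304", 2),
    (7, "User_7", 1),
]
print(latest_per_uid(rows))
```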
SELECT
database,
table,
name,
formatReadableSize(sum(data_compressed_bytes) AS size) AS compressed,
formatReadableSize(sum(data_uncompressed_bytes) AS usize) AS uncompressed,
round(usize / size, 2) AS compr_rate,
sum(rows) AS rows,
count() AS part_count
FROM system.projection_parts
WHERE active
GROUP BY
database,
table,
name
ORDER BY size DESC;
How to receive a list of tables with projections?
select database, table from system.tables
where create_table_query ilike '%projection%'
and database <> 'system'
Examples
Aggregating ClickHouse projections
create table z(Browser String, Country UInt8, F Float64)
Engine=MergeTree
order by Browser;
insert into z
select toString(number%9999), number%33, 1 from numbers(100000000);
--Q1)
select sum(F), Browser from z group by Browser format Null;
Elapsed: 0.205 sec. Processed 100.00 million rows
--Q2)
select sum(F), Browser, Country from z group by Browser, Country format Null;
Elapsed: 0.381 sec. Processed 100.00 million rows
--Q3)
select sum(F), count(), Browser, Country from z group by Browser, Country format Null;
Elapsed: 0.398 sec. Processed 100.00 million rows
alter table z add projection pp
    (select Browser, Country, count(), sum(F)
     group by Browser, Country);
alter table z materialize projection pp;
---- 0 = don't use proj, 1 = use projection
set allow_experimental_projection_optimization=1;
--Q1)
select sum(F), Browser from z group by Browser format Null;
Elapsed: 0.003 sec. Processed 22.43 thousand rows
--Q2)
select sum(F), Browser, Country from z group by Browser, Country format Null;
Elapsed: 0.004 sec. Processed 22.43 thousand rows
--Q3)
select sum(F), count(), Browser, Country from z group by Browser, Country format Null;
Elapsed: 0.005 sec. Processed 22.43 thousand rows
Emulation of an inverted index using orderby projection
You can create an orderby projection and include all columns of a table, but if a table is very wide it will double the amount of stored data. This example demonstrates a trick: we create an orderby projection that includes only the primary key columns and the target column, sorted by the target column. This allows using a subquery to find the primary key values and after that to query the table using the primary key.
CREATE TABLE test_a
(
    `src` String,
    `dst` String,
    `other_cols` String,
    PROJECTION p1
    (
        SELECT
            src,
            dst
        ORDER BY dst
    )
)
ENGINE = MergeTree
ORDER BY src;
insert into test_a select number, -number, 'other_col ' || toString(number) from numbers(1e8);
select * from test_a where src = '42';
┌─src─┬─dst─┬─other_cols───┐
│ 42  │ -42 │ other_col 42 │
└─────┴─────┴──────────────┘
1 row in set. Elapsed: 0.005 sec. Processed 16.38 thousand rows, 988.49 KB (3.14 million rows/s., 189.43 MB/s.)
select * from test_a where dst = '-42';
┌─src─┬─dst─┬─other_cols───┐
│ 42  │ -42 │ other_col 42 │
└─────┴─────┴──────────────┘
1 row in set. Elapsed: 0.625 sec. Processed 100.00 million rows, 1.79 GB (160.05 million rows/s., 2.86 GB/s.)
-- optimization using projection
select * from test_a where src in (select src from test_a where dst = '-42') and dst = '-42';
┌─src─┬─dst─┬─other_cols───┐
│ 42  │ -42 │ other_col 42 │
└─────┴─────┴──────────────┘
1 row in set. Elapsed: 0.013 sec. Processed 32.77 thousand rows, 660.75 KB (2.54 million rows/s., 51.26 MB/s.)
Elapsed: 0.625 sec. Processed 100.00 million rows – not optimized
VS
Elapsed: 0.013 sec. Processed 32.77 thousand rows – optimized
This article provides an overview of the different methods to handle row deletion in ClickHouse, using tombstone columns and ALTER UPDATE or DELETE. The goal is to highlight the performance impacts of different techniques and storage settings, including a scenario using S3 for remote storage.
Creating a Test Table
We will start by creating a simple MergeTree table with a tombstone column (is_active) to track active rows:
INSERT INTO test_delete (key, ts, value_a, value_b, value_c)
SELECT
    number,
    1,
    concat('some_looong_string', toString(number)),
    concat('another_long_str', toString(number)),
    concat('string', toString(number))
FROM numbers(10000000);

INSERT INTO test_delete (key, ts, value_a, value_b, value_c)
VALUES (400000, 2, 'totally different string', 'another totally different string', 'last string');
Hard Deletion Using ALTER DELETE
If you need to completely remove a row from the table, you can use ALTER DELETE:
ALTER TABLE test_delete DELETE WHERE (key = 400000) AND (ts = 1);
Ok.
0 rows in set. Elapsed: 1.101 sec.
-- 20 times slower!!!
However, this operation is significantly slower compared to the ALTER UPDATE approach. For example:
ALTER DELETE: Takes around 1.1 seconds
ALTER UPDATE: Only 0.05 seconds
The reason for this difference is that DELETE modifies the physical data structure, while UPDATE merely changes a column value.
SELECT * FROM test_delete WHERE key = 400000;
┌────key─┬─ts─┬─value_a──────────────────┬─value_b──────────────────────────┬─value_c─────┬─is_active─┐
│ 400000 │  2 │ totally different string │ another totally different string │ last string │         1 │
└────────┴────┴──────────────────────────┴──────────────────────────────────┴─────────────┴───────────┘

-- For ReplacingMergeTree -> https://clickhouse.com/docs/en/engines/table-engines/mergetree-family/replacingmergetree
OPTIMIZE TABLE test_delete FINAL;
Ok.
0 rows in set. Elapsed: 2.230 sec.
-- 40 times slower!!!

SELECT * FROM test_delete WHERE key = 400000
┌────key─┬─ts─┬─value_a──────────────────┬─value_b──────────────────────────┬─value_c─────┬─is_active─┐
│ 400000 │  2 │ totally different string │ another totally different string │ last string │         1 │
└────────┴────┴──────────────────────────┴──────────────────────────────────┴─────────────┴───────────┘
Soft Deletion (via ALTER UPDATE): A quicker approach that does not involve physical data deletion but rather updates the tombstone column.
Hard Deletion (via ALTER DELETE): Can take significantly longer, especially with large datasets stored in remote storage like S3.
Optimizing for Faster Deletion with S3 Storage
If using S3 for storage, the DELETE operation becomes even slower due to the overhead of handling remote data. Here’s an example with a table using S3-backed storage:
CREATE TABLE test_delete
(
    `key` UInt32,
    `value_a` String,
    `value_b` String,
    `value_c` String,
    `is_deleted` UInt8 DEFAULT 0
)
ENGINE = MergeTree
ORDER BY key
SETTINGS storage_policy = 's3tiered';

INSERT INTO test_delete (key, value_a, value_b, value_c)
SELECT
    number,
    concat('some_looong_string', toString(number)),
    concat('another_long_str', toString(number)),
    concat('really long string', toString(arrayMap(i -> cityHash64(i * number), range(50))))
FROM numbers(10000000);

OPTIMIZE TABLE test_delete FINAL;

ALTER TABLE test_delete MOVE PARTITION tuple() TO DISK 's3disk';

SELECT count() FROM test_delete;
┌──count()─┐
│ 10000000 │
└──────────┘
1 row in set. Elapsed: 0.002 sec.
DELETE Using ALTER UPDATE and Row Policy
You can also control visibility at the query level using row policies. For example, to only show rows where is_active = 1:
To delete a row using ALTER UPDATE:
CREATE ROW POLICY pol1 ON test_delete USING is_active = 1 TO all;

SELECT count() FROM test_delete;
-- select count() became much slower, it reads data now, not metadata
┌──count()─┐
│ 10000000 │
└──────────┘
1 row in set. Elapsed: 0.314 sec. Processed 10.00 million rows, 10.00 MB (31.84 million rows/s., 31.84 MB/s.)

ALTER TABLE test_delete UPDATE is_active = 0 WHERE (key = 400000) settings mutations_sync = 2;
0 rows in set. Elapsed: 1.256 sec.

SELECT count() FROM test_delete;
┌─count()─┐
│ 9999999 │
└─────────┘
This impacts the performance of queries like SELECT count(), as ClickHouse now needs to scan data instead of reading metadata.
This operation is faster, with an elapsed time of around 1.28 seconds in this case:
The choice between ALTER UPDATE and ALTER DELETE depends on your use case. For soft deletes, updating a tombstone column is significantly faster and easier to manage. However, if you need to physically remove rows, be mindful of the performance costs, especially with remote storage like S3.
Since 25.6 - final supports skip indexes (use_skip_indexes_if_final=1 by default)
Since 25.12 - apply_prewhere_after_final and apply_row_policy_after_final settings for correct PREWHERE/row policy handling with FINAL
Since 26.2 - enable_automatic_decision_for_merging_across_partitions_for_final=1 by default (auto-enables cross-partition optimization when safe)
Partitioning
Proper partition design could speed up FINAL processing.
For example, if you have a table with Daily partitioning, you can:
After the day ends, plus some time interval during which updates can still arrive, run OPTIMIZE TABLE xxx PARTITION 'prev_day' FINAL
or add the table SETTINGS min_age_to_force_merge_seconds=86400,min_age_to_force_merge_on_partition_only=1
In that case, the extra cost of using FINAL with do_not_merge_across_partitions_select_final will be small or even zero.
Example:
DROP TABLE IF EXISTS repl_tbl;

CREATE TABLE repl_tbl
(
    `key` UInt32,
    `val_1` UInt32,
    `val_2` String,
    `val_3` String,
    `val_4` String,
    `val_5` UUID,
    `ts` DateTime
)
ENGINE = ReplacingMergeTree(ts)
PARTITION BY toDate(ts)
ORDER BY key;

INSERT INTO repl_tbl SELECT number as key, rand() as val_1, randomStringUTF8(10) as val_2, randomStringUTF8(5) as val_3, randomStringUTF8(4) as val_4, generateUUIDv4() as val_5, '2020-01-01 00:00:00' as ts FROM numbers(10000000);
OPTIMIZE TABLE repl_tbl PARTITION ID '20200101' FINAL;
INSERT INTO repl_tbl SELECT number as key, rand() as val_1, randomStringUTF8(10) as val_2, randomStringUTF8(5) as val_3, randomStringUTF8(4) as val_4, generateUUIDv4() as val_5, '2020-01-02 00:00:00' as ts FROM numbers(10000000);
OPTIMIZE TABLE repl_tbl PARTITION ID '20200102' FINAL;
INSERT INTO repl_tbl SELECT number as key, rand() as val_1, randomStringUTF8(10) as val_2, randomStringUTF8(5) as val_3, randomStringUTF8(4) as val_4, generateUUIDv4() as val_5, '2020-01-03 00:00:00' as ts FROM numbers(10000000);
OPTIMIZE TABLE repl_tbl PARTITION ID '20200103' FINAL;
INSERT INTO repl_tbl SELECT number as key, rand() as val_1, randomStringUTF8(10) as val_2, randomStringUTF8(5) as val_3, randomStringUTF8(4) as val_4, generateUUIDv4() as val_5, '2020-01-04 00:00:00' as ts FROM numbers(10000000);
OPTIMIZE TABLE repl_tbl PARTITION ID '20200104' FINAL;

SYSTEM STOP MERGES repl_tbl;
INSERT INTO repl_tbl SELECT number as key, rand() as val_1, randomStringUTF8(10) as val_2, randomStringUTF8(5) as val_3, randomStringUTF8(4) as val_4, generateUUIDv4() as val_5, '2020-01-05 00:00:00' as ts FROM numbers(10000000);

SELECT count() FROM repl_tbl WHERE NOT ignore(*)
┌──count()─┐
│ 50000000 │
└──────────┘
1 rows in set. Elapsed: 1.504 sec. Processed 50.00 million rows, 6.40 GB (33.24 million rows/s., 4.26 GB/s.)

SELECT count() FROM repl_tbl FINAL WHERE NOT ignore(*)
┌──count()─┐
│ 10000000 │
└──────────┘
1 rows in set. Elapsed: 3.314 sec. Processed 50.00 million rows, 6.40 GB (15.09 million rows/s., 1.93 GB/s.)
/* more than 2 times slower, and will get worse once you have more data */

set do_not_merge_across_partitions_select_final = 1;

SELECT count() FROM repl_tbl FINAL WHERE NOT ignore(*)
┌──count()─┐
│ 50000000 │
└──────────┘
1 rows in set. Elapsed: 1.850 sec. Processed 50.00 million rows, 6.40 GB (27.03 million rows/s., 3.46 GB/s.)
/* only 0.35 sec slower, and while partitions have about the same size that extra cost will be about constant */
Since 26.2, enable_automatic_decision_for_merging_across_partitions_for_final=1 (default) auto-enables this when partition key columns are included in PRIMARY KEY
Light ORDER BY
All columns specified in ORDER BY will be read during FINAL processing, creating additional disk load. Use fewer columns and lighter column types to create faster queries.
Example: UUID vs UInt64
CREATE TABLE uuid_table (id UUID, value UInt64) ENGINE = ReplacingMergeTree() ORDER BY id;
CREATE TABLE uint64_table (id UInt64,value UInt64) ENGINE = ReplacingMergeTree() ORDER BY id;
INSERT INTO uuid_table SELECT generateUUIDv4(), number FROM numbers(5E7);
INSERT INTO uint64_table SELECT number, number FROM numbers(5E7);
SELECT sum(value) FROM uuid_table FINAL format JSON;
SELECT sum(value) FROM uint64_table FINAL format JSON;
When enable_vertical_final=1 (default since 24.1), ClickHouse uses a different deduplication strategy:
Marks duplicate rows as deleted instead of merging them immediately
Filters deleted rows in a later processing step
Reads different columns from different parts in parallel
This improves performance for queries that read only a subset of columns, as non-ORDER BY columns can be read independently from different parts.
PREWHERE and Row Policies with FINAL (25.12+)
By default, PREWHERE and row policies are applied before FINAL deduplication. This can cause incorrect results when:
PREWHERE references columns that differ across duplicate rows
Row policies should filter based on the “winning” row values after deduplication
Use these settings when needed:
apply_prewhere_after_final=1 - Apply PREWHERE after deduplication
apply_row_policy_after_final=1 - Apply row policies after deduplication
Example problem: if you have ReplacingMergeTree with a deleted column and PREWHERE filters on it, without apply_prewhere_after_final=1 you may get wrong results because PREWHERE sees rows before FINAL picks the winner.
FINAL with skip indexes:
Both use_skip_indexes_if_final and use_skip_indexes_if_final_exact_mode are enabled by default since 25.6
The main purpose of JOIN table engine is to avoid building the right table for joining on each query execution. So it’s usually used when you have a high amount of fast queries which share the same right table for joining.
Updates
It’s possible to update rows with setting join_any_take_last_row enabled.
If we look at our query, we only care whether a sale belongs to a customer named James or Lisa, and we don't care about the rest of the cases. We can use that.
Usually, ClickHouse is able to push down conditions, but not in this case, where the conditions themselves are part of a function expression, so you can help manually in such cases.
Reduce attribute columns (push expression before JOIN step)
Our row from the right table consists of 2 fields: customer_sk and c_first_name.
First one is needed to JOIN by it, so it’s not much we can do here, but we can transform a bit of the second column.
Again, let’s look in how we use this column in main query:
We calculate 2 simple conditions(which don’t have any dependency on data from the left table) and nothing more.
It does mean that we can move this calculation to the right table, it will make 3 improvements!
Right table will be smaller -> smaller RAM usage -> better cache hits
We will calculate our conditions over a smaller data set. In the right table we have only 10 million rows and after joining because of the left table we have 2 billion rows -> 200 times improvement!
Our resulting table after JOIN will not have an expensive String column, only 1 byte UInt8 instead -> less copy of data in memory.
Let’s do it:
There are several ways to rewrite that query; let’s not bother with the simple ones and go straight to the most optimized:
Put our 2 conditions in hand-made bitmask:
In order to do that we will take our conditions and multiply them by
As you can see, if you do it in that way, your conditions will not interfere with each other!
But we need to be careful with the wideness of the resulting numeric type.
Let’s write our calculations in type notation:
UInt8 + UInt8*2 -> UInt8 + UInt16 -> UInt32
But we actually do not use more than first 2 bits, so we need to cast this expression back to UInt8
Last thing to do is use the bitTest function in order to get the result of our condition by its position.
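The bitmask idea can be modelled outside the database. The following Python sketch is only an illustration, not the query from this article: it assumes the two conditions are `first_name = 'James'` and `first_name = 'Lisa'`, and that the multipliers are the powers of two 1 and 2, as the type notation `UInt8 + UInt8*2` suggests. The function names `pack_conditions` and `bit_test` are hypothetical; `bit_test` only mimics what the bitTest function does.

```python
# Illustrative model of the hand-made bitmask; this is not ClickHouse code.

def pack_conditions(first_name: str) -> int:
    """Pack two independent boolean conditions into one small integer."""
    is_james = int(first_name == "James")  # stored in bit 0 (multiplier 1)
    is_lisa = int(first_name == "Lisa")    # stored in bit 1 (multiplier 2)
    mask = is_james + is_lisa * 2
    return mask & 0xFF  # mimics casting the widened result back to UInt8


def bit_test(mask: int, position: int) -> bool:
    """Return True when the bit at `position` is set (analogue of bitTest)."""
    return (mask >> position) & 1 == 1


if __name__ == "__main__":
    for name in ("James", "Lisa", "Robert"):
        mask = pack_conditions(name)
        print(name, mask, bit_test(mask, 0), bit_test(mask, 1))
```

Because each condition owns its own bit, reading one condition back never depends on the value of the other.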
But can we make something with our JOIN key column?
Its type is Nullable(UInt64)
Let’s check if we really need the 0…18446744073709551615 range for our customer id; there are certainly far fewer people on earth than this number. The same goes for the Nullable trait: we don’t care about Nulls in customer_id
SELECT max(c_customer_sk) FROM customer
For sure, we don’t need that wide type.
Let’s remove the Nullable trait and cast the column to UInt32, which is half the byte size of UInt64.
Another 10% perf improvement from using UInt32 key instead of Nullable(Int64)
Looks pretty neat, we almost got 10 times improvement over our initial query.
Can we do better?
Probably, but it does mean that we need to get rid of JOIN.
Use IN clause instead of JOIN
Although all DBMSs support a roughly similar feature set, the performance of a given feature differs between databases:
A small example: for PostgreSQL, it is recommended to replace big IN clauses with JOINs, because IN clauses perform badly.
But for ClickHouse it’s the opposite: IN works faster than JOIN, because it only checks key existence in a HashSet and doesn’t need to extract any data from the right table.
But first is a short introduction. What the hell is a Dictionary with a FLAT layout?
Basically, it’s just a set of arrays, one per attribute, where the position of a value in the attribute array is the dictionary key.
This certainly puts a heavy limitation on what the dictionary key can be, but it gives really good advantages:
It’s really small memory usage (good cache hit rate) & really fast key lookups (no complex hash calculation)
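To make the layout concrete, here is a toy Python model of a single-attribute FLAT dictionary. It is a sketch of the idea only, not how ClickHouse implements it; the class name `FlatDictionary` and its methods are hypothetical.

```python
# Toy model of a FLAT-layout dictionary: one plain list per attribute,
# where the list index is the dictionary key. Not ClickHouse internals.

class FlatDictionary:
    def __init__(self, default: str = ""):
        self.default = default
        self.values: list[str] = []

    def put(self, key: int, value: str) -> None:
        # Gaps between keys still occupy slots, so sparse keys waste memory.
        if key >= len(self.values):
            self.values.extend([self.default] * (key + 1 - len(self.values)))
        self.values[key] = value

    def get(self, key: int) -> str:
        # A lookup is a bounds check plus an index; no hash is computed.
        if 0 <= key < len(self.values):
            return self.values[key]
        return self.default


if __name__ == "__main__":
    names = FlatDictionary()
    names.put(1, "James")
    names.put(4, "Lisa")
    print(names.get(1), names.get(4), repr(names.get(3)), len(names.values))
```

Storing keys 1 and 4 allocates five slots, which shows why the layout only pays off when keys are dense.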
So, if it’s that great what are the caveats?
First one is that your keys should be ideally autoincremental (with small number of gaps)
Second, let’s look at this simple query and write down all the calculations:
SELECTsumIf(ss_sales_price,dictGet(...)='James')
Dictionary call (2 billion times)
String equality check (2 billion times)
Although it’s really efficient in terms of dictGet call and memory usage by Dictionary, it still materializes the String column (memcpy) and we pay a penalty of execution condition on top of such a string column for each row.
But what if we could first calculate our required condition and create such a “Dictionary” ad hoc in query time?
And we can actually do that!
But let’s repeat our analysis again:
As we can see, that Array approach doesn’t even notice that we increased the amount of conditions by 2 times.
2.25 - JSONExtract to parse many attributes at a time
Don’t use several JSONExtract calls to parse a big JSON. It’s very inefficient, slow, and consumes CPU. Try to use one JSONExtract to parse the String into Tuples and then get the needed elements:
WITH JSONExtract(json, 'Tuple(name String, id String, resources Nested(description String, format String, tracking_summary Tuple(total UInt32, recent UInt32)), extras Nested(key String, value String))') AS parsed_json
SELECT
    tupleElement(parsed_json, 'name') AS name,
    tupleElement(parsed_json, 'id') AS id,
    tupleElement(tupleElement(parsed_json, 'resources'), 'description') AS `resources.description`,
    tupleElement(tupleElement(parsed_json, 'resources'), 'format') AS `resources.format`,
    tupleElement(tupleElement(tupleElement(parsed_json, 'resources'), 'tracking_summary'), 'total') AS `resources.tracking_summary.total`,
    tupleElement(tupleElement(tupleElement(parsed_json, 'resources'), 'tracking_summary'), 'recent') AS `resources.tracking_summary.recent`
FROM url('https://raw.githubusercontent.com/jsonlines/guide/master/datagov100.json', 'JSONAsString', 'json String')
However, such parsing requires static schema - all keys should be presented in every row, or you will get an empty structure. More dynamic parsing requires several JSONExtract invocations, but still - try not to scan the same data several times:
For very subnested dynamic JSON files, if you don’t need all the keys, you could parse sublevels specifically. Still this will require several JSONExtract calls but each call will have less data to parse so complexity will be reduced for each pass: O(log n)
CREATE TABLE better_parsing (json String) ENGINE = Memory;

INSERT INTO better_parsing FORMAT JSONAsString {"timestamp":"2024-06-12T14:30:00.001Z","functionality":"DOCUMENT","flowId":"210abdee-6de5-474a-83da-748def0facc1","step":"BEGIN","env":"dev","successful":true,"data":{"action":"initiate_view","stats":{"total":1,"success":1,"failed":0},"client_ip":"192.168.1.100","client_port":"8080"}}

WITH parsed_content AS
(
    SELECT
        JSONExtractKeysAndValues(json, 'String') AS 1st_level_arr,
        mapFromArrays(1st_level_arr.1, 1st_level_arr.2) AS 1st_level_map,
        JSONExtractKeysAndValues(1st_level_map['data'], 'String') AS 2nd_level_arr,
        mapFromArrays(2nd_level_arr.1, 2nd_level_arr.2) AS 2nd_level_map,
        JSONExtractKeysAndValues(2nd_level_map['stats'], 'String') AS 3rd_level_arr,
        mapFromArrays(3rd_level_arr.1, 3rd_level_arr.2) AS 3rd_level_map
    FROM json_tests.better_parsing
)
SELECT
    1st_level_map['timestamp'] AS timestamp,
    2nd_level_map['action'] AS action,
    3rd_level_map['total'] AS total,
    3rd_level_map['nokey'] AS no_key_empty
FROM parsed_content
/*
┌─timestamp────────────────┬─action────────┬─total─┬─no_key_empty─┐
1. │ 2024-06-12T14:30:00.001Z │ initiate_view │ 1 │ │
└──────────────────────────┴───────────────┴───────┴──────────────┘
1 row in set. Elapsed: 0.003 sec.
*/
2.26 - KILL QUERY
Unfortunately not all queries can be killed.
KILL QUERY only sets a flag that must be checked by the query.
The query pipeline checks this flag before switching to the next block. If the pipeline is stuck somewhere in the middle of a block, the query cannot be killed.
If a query does not stop, the only way to get rid of it is to restart ClickHouse®.
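The flag-based behaviour described above is a form of cooperative cancellation. The Python sketch below models it in a simplified way and is not ClickHouse code: `run_query`, `blocks_with_kill`, and the use of `threading.Event` as the kill flag are assumptions made for illustration.

```python
# Simplified model of cooperative cancellation: the "query" notices the kill
# flag only between blocks, never in the middle of processing one.
import threading


def run_query(blocks, kill_flag: threading.Event):
    """Process blocks one by one, checking the kill flag between blocks."""
    processed = []
    for block in blocks:
        if kill_flag.is_set():  # the only point where cancellation is noticed
            return processed, "cancelled"
        processed.append(sum(block))  # stand-in for real work on one block
    return processed, "finished"


def blocks_with_kill(kill_flag: threading.Event):
    """Yield three blocks; the kill request arrives while block two is produced."""
    yield [1, 2]
    kill_flag.set()
    yield [3, 4]
    yield [5, 6]


if __name__ == "__main__":
    flag = threading.Event()
    print(run_query(blocks_with_kill(flag), flag))
```

If the work on a single block never returns, the check is never reached, which matches the case where a stuck query cannot be killed.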
Q. We are trying to abort running queries when they are being replaced with a new one. We are setting the same query id for this. In some cases this error happens:
Query with id = e213cc8c-3077-4a6c-bc78-e8463adad35d is already running and can’t be stopped
The query is still being killed but the new one is not being executed. Do you know anything about this and if there is a fix or workaround for it?
I guess you use replace_running_query + replace_running_query_max_wait_ms.
Unfortunately it’s not always possible to kill a query at an arbitrary moment.
Kill doesn’t send any signals, it just sets a flag, which is checked (synchronously) at certain moments of query execution, mostly after finishing one block and before starting another.
On certain stages (executing scalar sub-query) the query can not be killed at all. This is a known issue and requires an architectural change to fix it.
I see. Is there a workaround?
This is our use case:
A user requests an analytics report backed by a query that takes several settings, then makes changes to the report (e.g. to filters, metrics, dimensions…). Since the user changed what he is looking for, the results of the initial query are never used, and we would like to cancel it when starting the new query.
How to know if ALTER TABLE … DELETE/UPDATE mutation ON CLUSTER was finished successfully on all the nodes?
A. Mutation status in system.mutations is local to each replica, so use

SELECT hostname(), * FROM clusterAllReplicas('your_cluster_name', system.mutations);
-- you can also add WHERE conditions to that query if needed.

Look at the is_done and latest_fail_reason columns.
Are mutations being run in parallel or they are sequential in ClickHouse® (in scope of one table)
ClickHouse runs mutations sequentially, but it can combine several mutations into a single one and apply all of them in one merge.
Sometimes this leads to problems, when the combined expression that ClickHouse needs to execute becomes really big (for example, if ClickHouse combined thousands of mutations into one).
Because ClickHouse stores data in independent parts, it is able to run mutation merges for each part independently and in parallel.
This can also lead to high resource utilization, especially memory usage, if you use x IN (SELECT ... FROM big_table) statements in a mutation, because each merge will build and keep in memory its own HashSet. You can avoid this problem by using the Dictionary approach for such mutations.
Parallelism of mutations is controlled by these settings:
SELECT * FROM system.merge_tree_settings WHERE name LIKE '%mutation%'
┌─name───────────────────────────────────────────────┬─value─┬─changed─┬─description──────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────┬─type───┐
│ max_replicated_mutations_in_queue                  │     8 │       0 │ How many tasks of mutating parts are allowed simultaneously in ReplicatedMergeTree queue.                                                                                    │ UInt64 │
│ number_of_free_entries_in_pool_to_execute_mutation │    20 │       0 │ When there is less than specified number of free entries in pool, do not execute part mutations. This is to leave free threads for regular merges and avoid "Too many parts" │ UInt64 │
└────────────────────────────────────────────────────┴───────┴─────────┴──────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────┴────────┘
2.30 - OPTIMIZE vs OPTIMIZE FINAL
OPTIMIZE TABLE xyz – this initiates an unscheduled merge.
Example
You have 40 parts in 3 partitions. This unscheduled merge picks one partition (e.g. February), selects 3 small parts in it, and merges them into a single part. You end up with 38 parts.
OPTIMIZE TABLE xyz FINAL – initiates a cycle of unscheduled merges.
ClickHouse® merges parts in this table until only 1 part remains in each partition (if the system has enough free disk space). As a result, you get 3 parts, 1 part per partition. In this case, ClickHouse rewrites parts even if they are already merged into a single part. This creates a huge CPU / disk load if the table (XYZ) is huge: ClickHouse reads / decompresses / merges / compresses / writes all data in the table.
If this table is 1 TB in size, it could take around 3 hours to complete.
So we don’t recommend running OPTIMIZE TABLE xyz FINAL against tables with more than 10 million rows.
2.31 - Parameterized views
ClickHouse® versions 23.1+ (23.1.6.42, 23.2.5.46, 23.3.1.2823) have built-in support for parameterized views:
A server restart is required for the default value to be applied
$ systemctl restart clickhouse-server
Now you can set settings as any other settings, and query them using getSetting() function.
SET my2_category = 'hot deals';

SELECT getSetting('my2_category');
┌─getSetting('my2_category')─┐
│ hot deals                  │
└────────────────────────────┘

-- you can query ClickHouse settings as well
SELECT getSetting('max_threads')
┌─getSetting('max_threads')─┐
│                         8 │
└───────────────────────────┘
SELECT suppkey, brand, category, quantity
FROM sales_w
ARRAY JOIN
    [AA, AB, AC, AD] AS quantity,
    splitByString(', ', 'AA, AB, AC, AD') AS category
ORDER BY suppkey ASC
┌─suppkey─┬─brand───┬─category─┬─quantity─┐
│       1 │ BRAND_A │ AA       │     1500 │
│       1 │ BRAND_A │ AB       │     4200 │
│       1 │ BRAND_A │ AC       │     1600 │
│       1 │ BRAND_A │ AD       │     9800 │
│       2 │ BRAND_B │ AA       │     6200 │
│       2 │ BRAND_B │ AB       │     1300 │
│       2 │ BRAND_B │ AC       │     5800 │
│       2 │ BRAND_B │ AD       │     3100 │
│       3 │ BRAND_C │ AA       │     5000 │
│       3 │ BRAND_C │ AB       │     8900 │
│       3 │ BRAND_C │ AC       │     6900 │
│       3 │ BRAND_C │ AD       │     3400 │
└─────────┴─────────┴──────────┴──────────┘

SELECT suppkey, brand, tpl.1 AS category, tpl.2 AS quantity
FROM sales_w
ARRAY JOIN tupleToNameValuePairs(CAST((AA, AB, AC, AD), 'Tuple(AA UInt32, AB UInt32, AC UInt32, AD UInt32)')) AS tpl
ORDER BY suppkey ASC
┌─suppkey─┬─brand───┬─category─┬─quantity─┐
│       1 │ BRAND_A │ AA       │     1500 │
│       1 │ BRAND_A │ AB       │     4200 │
│       1 │ BRAND_A │ AC       │     1600 │
│       1 │ BRAND_A │ AD       │     9800 │
│       2 │ BRAND_B │ AA       │     6200 │
│       2 │ BRAND_B │ AB       │     1300 │
│       2 │ BRAND_B │ AC       │     5800 │
│       2 │ BRAND_B │ AD       │     3100 │
│       3 │ BRAND_C │ AA       │     5000 │
│       3 │ BRAND_C │ AB       │     8900 │
│       3 │ BRAND_C │ AC       │     6900 │
│       3 │ BRAND_C │ AD       │     3400 │
└─────────┴─────────┴──────────┴──────────┘
2.34 - Possible deadlock avoided. Client should retry
In ClickHouse® version 19.14 a serious issue was found: a race condition that can lead to server deadlock. The reason for that was quite fundamental, and a temporary workaround for that was added (“possible deadlock avoided”).
Those locks are one of the fundamental things that the core team was actively working on in 2020.
In 20.3 some of the locks leading to that situation were removed as a part of huge refactoring.
In 20.4 more locks were removed, the check was made configurable (see lock_acquire_timeout ) so you can say how long to wait before returning that exception
In 20.5 heuristics of that check (“possible deadlock avoided”) was improved.
In 20.6 all table-level locks which were possible to remove were removed, so alters are totally lock-free.
20.10 enables database=Atomic by default which allows running even DROP commands without locks.
Typically the issue happened when running concurrent selects on system.parts / system.columns / system.tables with simultaneous table manipulations (some kind of ALTER / TRUNCATE / DROP).
If that exception happens often in your use-case:
use recent ClickHouse versions
ensure you use the Atomic engine for the database (not Ordinary); this can be checked in system.databases
Sometimes you can work around the issue by finding the queries that use that table concurrently (especially queries to system.tables / system.parts and other system tables) and killing them (or avoiding them).
The execution pipeline is embedded in the partition reading code.
So that works this way:
1. ClickHouse® does partition pruning based on WHERE conditions.
2. For every partition, it picks column ranges (aka ‘marks’ / ‘granules’) based on primary key conditions.
3. Here the sampling logic is applied:
a) in case of SAMPLE k (k in 0..1 range) it adds the condition WHERE sample_key < k * max_int_of_sample_key_type
b) in case of SAMPLE k OFFSET m it adds the condition WHERE sample_key BETWEEN m * max_int_of_sample_key_type AND (m + k) * max_int_of_sample_key_type
c) in case of SAMPLE N (N>1) it first estimates how many rows are inside the ranges it needs to read and, based on that, converts it to the 3a case (it calculates k from the number of rows in the ranges and the desired number of rows)
4. The other conditions are applied to the data returned by those steps (so the number of rows can decrease further here).
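The arithmetic behind the sampling conditions can be sketched in Python. This is an illustration under assumptions, not ClickHouse's actual code: the helper names are hypothetical, the sample key is assumed to be an unsigned integer of `key_bits` bits, and the exact rounding and inclusive/exclusive bounds used by the server may differ.

```python
# Illustrative arithmetic for turning SAMPLE clauses into sample_key ranges.

def sample_bounds(k: float, offset: float = 0.0, key_bits: int = 32):
    """Return (lower, upper) sample_key bounds for SAMPLE k OFFSET offset."""
    if not 0 < k <= 1:
        raise ValueError("k must be in (0, 1]")
    if not 0 <= offset < 1:
        raise ValueError("offset must be in [0, 1)")
    max_key = 2 ** key_bits - 1  # max_int_of_sample_key_type
    lower = int(offset * max_key)
    upper = int(min(offset + k, 1.0) * max_key)
    return lower, upper


def relative_ratio(desired_rows: int, estimated_rows: int) -> float:
    """Convert SAMPLE N (absolute row count) into a ratio k, capped at 1."""
    return min(1.0, desired_rows / estimated_rows)


if __name__ == "__main__":
    print(sample_bounds(0.5))                    # SAMPLE 0.5
    print(sample_bounds(0.25, offset=0.5))       # SAMPLE 0.25 OFFSET 0.5
    print(relative_ratio(1_000_000, 10_000_000)) # SAMPLE 1000000 over ~10M rows
```

The sketch also shows why the sample key should cover the whole domain of its type: if all key values sit in a narrow band, a range such as the lower half of the domain selects either everything or nothing.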
Uniformly distributed in the domain of its data type:
Bad: Timestamp;
Good: intHash32(UserID);
Cheap to calculate:
Bad: cityHash64(URL);
Good: intHash32(UserID);
Not after high granular fields in primary key:
Bad: ORDER BY (Timestamp, sample_key);
Good: ORDER BY (CounterID, Date, sample_key).
Sampling is:
Deterministic
Works in a consistent way for different tables.
Allows reading less amount of data from disk.
SAMPLE key, bonus
SAMPLE 1/10
Select data for 1/10 of all possible sample keys;
SAMPLE 1000000
Select from about (not less than) 1 000 000 rows on each shard;
You can use _sample_factor virtual column to determine the relative sample factor;
SAMPLE 1/10 OFFSET 1/10
Select second 1/10 of all possible sample keys;
SET max_parallel_replicas = 3
Select from multiple replicas of each shard in parallel;
SAMPLE emulation via WHERE condition
Sometimes, it’s easier to emulate sampling via conditions in WHERE clause instead of using SAMPLE key.
SELECT count() FROM table WHERE ... AND cityHash64(some_high_card_key) % 10 = 0; -- Deterministic
SELECT count() FROM table WHERE ... AND rand() % 10 = 0; -- Non-deterministic
ClickHouse will read more data from disk compared to an example with a good SAMPLE key, but this approach is more universal and can be used if you can’t change the table ORDER BY key.
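The difference between the two WHERE-based variants can be shown with a small Python model. It is a sketch only: `zlib.crc32` stands in for cityHash64 (which is not in the Python standard library), and the function names are hypothetical.

```python
# Model of WHERE-based sampling: hash-based selection is repeatable,
# random selection is not. zlib.crc32 is a stand-in for cityHash64.
import random
import zlib


def keep_deterministic(key: str, buckets: int = 10) -> bool:
    """Keep a row when the hash of its key falls into bucket 0."""
    return zlib.crc32(key.encode("utf-8")) % buckets == 0


def keep_random(buckets: int = 10, rng=random) -> bool:
    """Keep a row with probability 1/buckets, independent of its key."""
    return rng.randrange(buckets) == 0


if __name__ == "__main__":
    keys = [f"user-{i}" for i in range(1000)]
    first = [k for k in keys if keep_deterministic(k)]
    second = [k for k in keys if keep_deterministic(k)]
    print(len(first), first == second)  # the same rows are chosen on every run
    print(sum(keep_random() for _ in keys))  # varies from run to run
```

With the hash-based filter, every row of one key is either kept or dropped together, so repeated queries see the same subset.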
The following example demonstrates how sampling can be set up correctly, and, as a comparison, an example of it being set up incorrectly.
Sampling requires a SAMPLE BY expression. This ensures that the values of the sampled column fit within a specified range, which ensures the requirement of low cardinality. In this example, I cannot use transaction_id because I cannot ensure that the min value of transaction_id = 0 and the max value = MAX_UINT64. Instead, I used cityHash64(transaction_id) to expand the range between the minimum and maximum values.
For example if all values of transaction_id are from 0 to 10000 sampling will be inefficient. But cityHash64(transaction_id) expands the range from 0 to 18446744073709551615:
If I used transaction_id without knowing that they matched the allowable ranges, the results of sampled queries would be skewed. For example, when using sample 0.5, ClickHouse requests where sample_col >= 0 and sample_col <= MAX_UINT64/2.
Also you can include multiple columns into a hash function of the sampling expression to improve randomness of the distribution cityHash64(transaction_id, banner_id).
I reduced the granularity of the timestamp column to one hour with toStartOfHour(toDateTime(timestamp)) , otherwise sampling will not work.
Verifying Sampling Works
The following shows that sampling works with the table and parameters described above. Notice the Elapsed time when invoking sampling:
-- Q1. No where filters.
-- The query is 10 times faster with SAMPLE 0.01
select banner_id, sum(value), count(value), max(value) from table_one group by banner_id format Null;
0 rows in set. Elapsed: 11.490 sec. Processed 10.00 billion rows, 60.00 GB (870.30 million rows/s., 5.22 GB/s.)

select banner_id, sum(value), count(value), max(value) from table_one SAMPLE 0.01 group by banner_id format Null;
0 rows in set. Elapsed: 1.316 sec. Processed 452.67 million rows, 6.34 GB (343.85 million rows/s., 4.81 GB/s.)

-- Q2. Filter by the first column in index (banner_id = 42)
-- The query is about 2.5 times faster with SAMPLE 0.01
-- reads about 15 times fewer rows: 10.30 million rows VS 696.32 thousand rows
select banner_id, sum(value), count(value), max(value) from table_one WHERE banner_id = 42 group by banner_id format Null;
0 rows in set. Elapsed: 0.020 sec. Processed 10.30 million rows, 61.78 MB (514.37 million rows/s., 3.09 GB/s.)

select banner_id, sum(value), count(value), max(value) from table_one SAMPLE 0.01 WHERE banner_id = 42 group by banner_id format Null;
0 rows in set. Elapsed: 0.008 sec. Processed 696.32 thousand rows, 9.75 MB (92.49 million rows/s., 1.29 GB/s.)

-- Q3. No filters
-- The query is 10 times faster with SAMPLE 0.01
-- reads 20 times less rows.
select banner_id, toStartOfHour(toDateTime(timestamp)) hr, sum(value), count(value), max(value) from table_one group by banner_id, hr format Null;
0 rows in set. Elapsed: 36.660 sec. Processed 10.00 billion rows, 140.00 GB (272.77 million rows/s., 3.82 GB/s.)

select banner_id, toStartOfHour(toDateTime(timestamp)) hr, sum(value), count(value), max(value) from table_one SAMPLE 0.01 group by banner_id, hr format Null;
0 rows in set. Elapsed: 3.741 sec. Processed 452.67 million rows, 9.96 GB (121.00 million rows/s., 2.66 GB/s.)

-- Q4. Filter by not indexed column
-- The query is 6 times faster with SAMPLE 0.01
-- reads 20 times less rows.
select count() from table_one where value = 666 format Null;
1 rows in set. Elapsed: 6.056 sec. Processed 10.00 billion rows, 40.00 GB (1.65 billion rows/s., 6.61 GB/s.)

select count() from table_one SAMPLE 0.01 where value = 666 format Null;
1 rows in set. Elapsed: 1.214 sec. Processed 452.67 million rows, 5.43 GB (372.88 million rows/s., 4.47 GB/s.)
This is the same as our other table, BUT the granularity of the timestamp column is not reduced.
Verifying Sampling Does Not Work
The following tests show that sampling is not working because the granularity of the timestamp column was not reduced. The Elapsed time is longer when sampling is used.
-- Q1. No where filters.
-- The query is 2 times SLOWER!!! with SAMPLE 0.01
-- Because it needs to read excessive column with sampling data!
select banner_id, sum(value), count(value), max(value) from table_one group by banner_id format Null;
0 rows in set. Elapsed: 11.196 sec. Processed 10.00 billion rows, 60.00 GB (893.15 million rows/s., 5.36 GB/s.)

select banner_id, sum(value), count(value), max(value) from table_one SAMPLE 0.01 group by banner_id format Null;
0 rows in set. Elapsed: 24.378 sec. Processed 10.00 billion rows, 140.00 GB (410.21 million rows/s., 5.74 GB/s.)

-- Q2. Filter by the first column in index (banner_id = 42)
-- The query is SLOWER with SAMPLE 0.01
select banner_id, sum(value), count(value), max(value) from table_one WHERE banner_id = 42 group by banner_id format Null;
0 rows in set. Elapsed: 0.022 sec. Processed 10.27 million rows, 61.64 MB (459.28 million rows/s., 2.76 GB/s.)

select banner_id, sum(value), count(value), max(value) from table_one SAMPLE 0.01 WHERE banner_id = 42 group by banner_id format Null;
0 rows in set. Elapsed: 0.037 sec. Processed 10.27 million rows, 143.82 MB (275.16 million rows/s., 3.85 GB/s.)

-- Q3. No filters
-- The query is SLOWER with SAMPLE 0.01
select banner_id, toStartOfHour(toDateTime(timestamp)) hr, sum(value), count(value), max(value) from table_one group by banner_id, hr format Null;
0 rows in set. Elapsed: 21.663 sec. Processed 10.00 billion rows, 140.00 GB (461.62 million rows/s., 6.46 GB/s.)

select banner_id, toStartOfHour(toDateTime(timestamp)) hr, sum(value), count(value), max(value) from table_one SAMPLE 0.01 group by banner_id, hr format Null;
0 rows in set. Elapsed: 26.697 sec. Processed 10.00 billion rows, 220.00 GB (374.57 million rows/s., 8.24 GB/s.)

-- Q4. Filter by not indexed column
-- The query is SLOWER with SAMPLE 0.01
select count() from table_one where value = 666 format Null;
0 rows in set. Elapsed: 7.679 sec. Processed 10.00 billion rows, 40.00 GB (1.30 billion rows/s., 5.21 GB/s.)

select count() from table_one SAMPLE 0.01 where value = 666 format Null;
0 rows in set. Elapsed: 21.668 sec. Processed 10.00 billion rows, 120.00 GB (461.51 million rows/s., 5.54 GB/s.)
2.38 - Simple aggregate functions & combinators
Simple aggregate functions & combinators
Q. What is SimpleAggregateFunction? Are there advantages to use it instead of AggregateFunction in AggregatingMergeTree?
The ClickHouse® SimpleAggregateFunction can be used for those aggregations where the function state is exactly the same as the resulting function value. A typical example is the max function: it only requires storing a single value, which is already the maximum, and no extra steps are needed to get the final value. In contrast, avg needs to store two numbers, sum & count, which must be divided to get the final value of the aggregation (done by the -Merge step at the very end).
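The difference between the two kinds of state can be sketched outside ClickHouse. The Python below is only an illustration of the idea and not ClickHouse internals; the names merge_max, merge_avg and finalize_avg are hypothetical.

```python
# Illustration only: aggregate "states" modeled as plain Python values.

def merge_max(state_a, state_b):
    # For max, the state is already the final value.
    return max(state_a, state_b)

def merge_avg(state_a, state_b):
    # For avg, the state is a (sum, count) pair.
    return (state_a[0] + state_b[0], state_a[1] + state_b[1])

def finalize_avg(state):
    # Extra step needed to turn the state into the final value.
    total, count = state
    return total / count

max_state = merge_max(7, 3)              # partial results from two parts
avg_state = merge_avg((10, 4), (20, 1))  # (sum, count) from two parts

print(max_state)                # 7
print(avg_state)                # (30, 5)
print(finalize_avg(avg_state))  # 6.0
```

The max state can be read directly at any time, while the avg state is meaningless until it is finalized.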
Comparison of SimpleAggregateFunction and AggregateFunction:

inserting
  SimpleAggregateFunction: accepts the value of underlying type OR a value of corresponding SimpleAggregateFunction type
    CREATE TABLE saf_test ( x SimpleAggregateFunction(max, UInt64) ) ENGINE=AggregatingMergeTree ORDER BY tuple();
    INSERT INTO saf_test VALUES (1);
    INSERT INTO saf_test SELECT max(number) FROM numbers(10);
    INSERT INTO saf_test SELECT maxSimpleState(number) FROM numbers(20);
  AggregateFunction: ONLY accepts the state of same aggregate function calculated using -State combinator

storing
  SimpleAggregateFunction: internally stores just a value of underlying type
  AggregateFunction: function-specific state

storage usage
  SimpleAggregateFunction: typically is much better due to better compression/codecs
  AggregateFunction: in very rare cases it can be more optimal than raw values; adaptive granularity doesn't work for large states

reading raw value per row
  SimpleAggregateFunction: you can access it directly
  AggregateFunction: you need to use finalizeAggregation function

using aggregated value
  SimpleAggregateFunction: just select max(x) from test;
  AggregateFunction: you need to use -Merge combinator: select maxMerge(x) from test;

memory usage
  SimpleAggregateFunction: typically less memory needed (in some corner cases even 10 times)
  AggregateFunction: typically uses more memory, as every state can be quite complex
Q. How does the result of the maxSimpleState combinator differ from plain max?
They produce the same result, but the types differ (the first has the SimpleAggregateFunction datatype). Both can be pushed to SimpleAggregateFunction or to the underlying type, so they are interchangeable.
Info
-SimpleState is useful for implicit Materialized View creation, like
CREATE MATERIALIZED VIEW mv
ENGINE = AggregatingMergeTree
ORDER BY date
AS SELECT
    date,
    sumSimpleState(1) AS cnt,
    sumSimpleState(revenue) AS rev
FROM table
GROUP BY date
Q. Can I use -If combinator with SimpleAggregateFunction?
Something like SimpleAggregateFunction(maxIf, UInt64, UInt8) is NOT possible. But it is 100% OK to push maxIf (or maxSimpleStateIf) into SimpleAggregateFunction(max, UInt64).
There is one problem with that approach:
-SimpleStateIf would produce 0 as the result in case of no match, and that can mess up the state of some aggregate functions. It wouldn't affect functions like max/argMax/sum, but it could affect functions like min/argMin/any/anyLast.
One workaround is to set the result to some big number in case of no match, a number bigger than any possible value, so it is safe to use. But this works only for min/argMin.
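The effect of the no-match default can be sketched with plain numbers. This Python is only an illustration under stated assumptions: values are unsigned, the no-match default is 0 as described above, and the sentinel (largest UInt64) is one hypothetical choice of "big number".

```python
# Illustration only: partial results of min/max over two parts, where the
# -If condition matched nothing in the second part.

NO_MATCH_DEFAULT = 0      # value produced on no-match, per the text above
BIG_SENTINEL = 2**64 - 1  # assumed workaround value: largest UInt64

part_1 = [7, 9, 12]       # values that matched the condition
part_2 = []               # nothing matched in this part

def partial(func, values, no_match):
    # Partial aggregate for one part, falling back to the no-match value.
    return func(values) if values else no_match

broken_min = min(partial(min, part_1, NO_MATCH_DEFAULT),
                 partial(min, part_2, NO_MATCH_DEFAULT))
fixed_min = min(partial(min, part_1, BIG_SENTINEL),
                partial(min, part_2, BIG_SENTINEL))
safe_max = max(partial(max, part_1, NO_MATCH_DEFAULT),
               partial(max, part_2, NO_MATCH_DEFAULT))

print(broken_min)  # 0  (the default leaked into the result)
print(fixed_min)   # 7
print(safe_max)    # 12 (max is not affected by a 0 default)
```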
WITH
    minIfState(number, number > 5) AS state_1,
    minSimpleStateIf(number, number > 5) AS state_2
SELECT
    byteSize(state_1),
    toTypeName(state_1),
    byteSize(state_2),
    toTypeName(state_2)
FROM numbers(10)
FORMAT Vertical

-- For UInt64
Row 1:
──────
byteSize(state_1):   24
toTypeName(state_1): AggregateFunction(minIf, UInt64, UInt8)
byteSize(state_2):   8
toTypeName(state_2): SimpleAggregateFunction(min, UInt64)

-- For UInt32
──────
byteSize(state_1):   16
byteSize(state_2):   4

-- For UInt16
──────
byteSize(state_1):   12
byteSize(state_2):   2

-- For UInt8
──────
byteSize(state_1):   10
byteSize(state_2):   1
ClickHouse® provides a type of index that in specific circumstances can significantly improve query speed. These structures are labeled “skip” indexes because they enable ClickHouse to skip reading significant chunks of data that are guaranteed to have no matching values.
2.39.1 - Example: minmax
Example: minmax
Use cases
Strong correlation between a column from the table ORDER BY / PARTITION BY key and another column that is regularly used in WHERE conditions.
A good example is an incremental ID that increases with time.
Multiple Date/DateTime columns can be used in WHERE conditions.
This usually happens when you have separate Date and DateTime columns, and one column is used in the PARTITION BY expression while a different one is used in the WHERE condition. Another possible scenario is when you have multiple DateTime columns with nearly the same date or even time.
As you can see, ClickHouse read 110.00 million rows and the query took 0.505 sec.
Let’s add an index
alter table bftest add index ix1(x) TYPE bloom_filter GRANULARITY 3;

-- GRANULARITY 3 means how many table granules will be in the one index granule
-- In our case 1 granule of skip index allows to check and skip 3*8192 rows.
-- Every dataset is unique sometimes GRANULARITY 1 is better, sometimes
-- GRANULARITY 10.
-- Need to test on the real data.

optimize table bftest final;

-- I need to optimize my table because an index is created for only
-- new parts (inserted or merged)
-- optimize table final re-writes all parts, but with an index.
-- probably in your production you don't need to optimize
-- because your data is rotated frequently.
-- optimize is a heavy operation, better never run optimize table final in a
-- production.
create table bftest (k Int64, x Int64) Engine=MergeTree order by k;

-- data in the x column is correlated with the primary key
insert into bftest select number, number * 2 from numbers(100000000);

alter table bftest add index ix1(x) TYPE minmax GRANULARITY 1;
alter table bftest materialize index ix1;

select count() from bftest where x = 42;
1 rows in set. Elapsed: 0.004 sec. Processed 8.19 thousand rows
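Why only about 8 thousand rows are processed can be sketched as follows. This Python is a simplified model of a minmax skip index, not ClickHouse code: it assumes a granule of 8192 rows, GRANULARITY 1, and a much smaller row count than the example above so that it runs quickly.

```python
GRANULE_ROWS = 8192  # rows per table granule, as in the example above
N = 100_000          # assumption: far fewer rows than the 100M in the example

# x is correlated with the sort key k, as in the example (x = k * 2).
rows = [(k, k * 2) for k in range(N)]

# Build the "index": one (min, max) pair of x per granule.
granules = [rows[i:i + GRANULE_ROWS] for i in range(0, N, GRANULE_ROWS)]
index = [(min(x for _, x in g), max(x for _, x in g)) for g in granules]

def count_where_x_equals(target):
    rows_read = 0
    matches = 0
    for (lo, hi), granule in zip(index, granules):
        if not lo <= target <= hi:
            continue  # the whole granule is skipped without reading it
        rows_read += len(granule)
        matches += sum(1 for _, x in granule if x == target)
    return matches, rows_read

print(count_where_x_equals(42))  # (1, 8192)
```

Because x grows together with k, the min/max ranges of the granules do not overlap, so a point lookup touches a single granule. With uncorrelated data (as in the bloom_filter example) almost every granule would contain the target in its range and nothing would be skipped.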
projection
create table bftest (k Int64, x Int64, S String) Engine=MergeTree order by k;
insert into bftest select number, rand64()%565656, '' from numbers(10000000);
insert into bftest select number, rand64()%565656, '' from numbers(100000000);
alter table bftest add projection p1 (select k, x order by x);
alter table bftest materialize projection p1 settings mutations_sync=1;
set allow_experimental_projection_optimization=1;

-- projection
select count() from bftest where x = 42;
1 rows in set. Elapsed: 0.002 sec. Processed 24.58 thousand rows

-- no projection
select * from bftest where x = 42 format Null;
0 rows in set. Elapsed: 0.432 sec. Processed 110.00 million rows

-- projection
select * from bftest where k in (select k from bftest where x = 42) format Null;
0 rows in set. Elapsed: 0.316 sec. Processed 1.50 million rows
2.40 - Time zones
Time zones
Important things to know:
DateTime inside ClickHouse® is always a UNIX timestamp, i.e. the number of seconds since 1970-01-01 00:00:00 GMT.
Conversion from that UNIX timestamp to a human-readable form and back can happen on the client (for native clients) or on the server (for HTTP clients, and for some types of queries, like toString(ts)).
Depending on where that conversion happens, the rules of different timezones may be applied.
You can check the server timezone using SELECT timezone().
clickhouse-client also tries to use the server timezone by default (see also the --use_client_time_zone flag).
If you want, you can store the timezone name inside the data type; in that case the timestamp <-> human-readable time rules of that timezone will be applied.
ClickHouse uses system timezone info from the tzdata package if it exists, and uses its own built-in tzdata if it is missing in the system.
cd /usr/share/zoneinfo/Canada
ln -s ../America/Halifax A
TZ=Canada/A clickhouse-local -q 'select timezone()'
Canada/A
When conversion using different rules happens
SELECT timezone()

┌─timezone()─┐
│ UTC        │
└────────────┘

create table t_with_dt_utc (ts DateTime64(3,'Europe/Moscow')) engine=Log;

create table x (ts String) engine=Null;

create materialized view x_mv to t_with_dt_utc as select parseDateTime64BestEffort(ts) as ts from x;

$ echo '2021-07-15T05:04:23.733' | clickhouse-client -q 'insert into t_with_dt_utc format CSV'
-- here the client checks the type of the columns, sees that it's 'Europe/Moscow' and converts according to Moscow rules

$ echo '2021-07-15T05:04:23.733' | clickhouse-client -q 'insert into x format CSV'
-- here the client checks the type of the columns (it is String), and passes the string value to the server.
-- parseDateTime64BestEffort(ts) uses the server default timezone (UTC in my case), and converts the value using UTC rules.
-- and the result is 2 different timestamps (when selecting from the table, both are shown in the 'desired' timezone forced by the column type, i.e. Moscow):

SELECT * FROM t_with_dt_utc

┌──────────────────────ts─┐
│ 2021-07-15 05:04:23.733 │
│ 2021-07-15 08:04:23.733 │
└─────────────────────────┘
Best practice here: use UTC timezone everywhere, OR use the same default timezone for ClickHouse server as used by your data
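The two different timestamps can be reproduced outside ClickHouse. The Python below is only a sketch of the arithmetic: it models Europe/Moscow as a fixed UTC+3 offset (an assumption that holds for 2021) rather than using real tzdata.

```python
from datetime import datetime, timedelta, timezone

UTC = timezone.utc
# Assumption: Europe/Moscow modeled as a fixed UTC+3 offset (valid for 2021).
MSK = timezone(timedelta(hours=3))

text = "2021-07-15T05:04:23.733"
naive = datetime.fromisoformat(text)

# Same string, interpreted under two different sets of rules.
parsed_as_moscow = naive.replace(tzinfo=MSK)  # conversion by column type
parsed_as_utc = naive.replace(tzinfo=UTC)     # conversion by server default

def show_in_moscow(dt):
    # Render with millisecond precision in the column's timezone.
    return dt.astimezone(MSK).strftime("%Y-%m-%d %H:%M:%S.%f")[:-3]

print(show_in_moscow(parsed_as_moscow))  # 2021-07-15 05:04:23.733
print(show_in_moscow(parsed_as_utc))     # 2021-07-15 08:04:23.733
print(parsed_as_utc - parsed_as_moscow)  # 3:00:00
```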
2.41 - Time-series alignment with interpolation
Time-series alignment with interpolation
This article demonstrates how to perform time-series data alignment with interpolation using window functions in ClickHouse. The goal is to align two different time-series (A and B) on the same timestamp axis and fill the missing values using linear interpolation.
Step-by-Step Implementation
We begin by creating a table with test data that simulates two time-series (A and B) with randomly distributed timestamps and values. Then, we apply interpolation to fill missing values for each time-series based on the surrounding data points.
1. Drop Existing Table (if it exists)
DROP TABLE IF EXISTS test_ts_interpolation;
This ensures that any previous versions of the table are removed.
2. Generate Test Data
In this step, we generate random time-series data with timestamps and values for series A and B. The values are calculated differently for each series:
CREATE TABLE test_ts_interpolation
ENGINE = Log AS
SELECT
    ((number * 100) + 50) - (rand() % 100) AS timestamp, -- random timestamp generation
    transform(rand() % 2, [0, 1], ['A', 'B'], '') AS ts, -- randomly assign series 'A' or 'B'
    if(ts = 'A', timestamp * 10, timestamp * 100) AS value -- different value generation for each series
FROM numbers(1000000);
Here, the timestamp is generated randomly and assigned to either series A or B using the transform() function. The value is calculated based on the series type (A or B), with different multipliers for each.
3. Preview the Generated Data
After generating the data, you can inspect it by running a simple SELECT query:
SELECT * FROM test_ts_interpolation;
This will show the randomly generated timestamps, series (A or B), and their corresponding values.
4. Perform Interpolation with Window Functions
To align the time-series and interpolate missing values, we use window functions in the following query:
SELECT
    timestamp,
    if(
        ts = 'A',
        toFloat64(value), -- If the current series is 'A', keep the original value
        prev_a.2 + (timestamp - prev_a.1) * (next_a.2 - prev_a.2) / (next_a.1 - prev_a.1) -- Interpolate for 'A'
    ) as a_value,
    if(
        ts = 'B',
        toFloat64(value), -- If the current series is 'B', keep the original value
        prev_b.2 + (timestamp - prev_b.1) * (next_b.2 - prev_b.2) / (next_b.1 - prev_b.1) -- Interpolate for 'B'
    ) as b_value
FROM
(
    SELECT
        timestamp,
        ts,
        value,
        -- Find the previous and next values for series 'A'
        anyLastIf((timestamp, value), ts = 'A') OVER (ORDER BY timestamp ROWS BETWEEN UNBOUNDED PRECEDING AND 1 PRECEDING) AS prev_a,
        anyLastIf((timestamp, value), ts = 'A') OVER (ORDER BY timestamp DESC ROWS BETWEEN UNBOUNDED PRECEDING AND 1 PRECEDING) AS next_a,
        -- Find the previous and next values for series 'B'
        anyLastIf((timestamp, value), ts = 'B') OVER (ORDER BY timestamp ROWS BETWEEN UNBOUNDED PRECEDING AND 1 PRECEDING) AS prev_b,
        anyLastIf((timestamp, value), ts = 'B') OVER (ORDER BY timestamp DESC ROWS BETWEEN UNBOUNDED PRECEDING AND 1 PRECEDING) AS next_b
    FROM test_ts_interpolation
)
Explanation:
Timestamp Alignment:
We align the timestamps of both series (A and B) and handle missing data points.
Interpolation Logic:
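For each timestamp, if the current row does not belong to series A, we calculate the interpolated A value from the previous (prev_a) and next (next_a) known points using the linear interpolation formula a_value = prev_a.2 + (timestamp - prev_a.1) * (next_a.2 - prev_a.2) / (next_a.1 - prev_a.1), where .1 is the timestamp and .2 is the value of a point.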
Similarly, for the B series, interpolation is calculated between the previous (prev_b) and next (next_b) known values.
Window Functions:
anyLastIf() is used to fetch the previous or next values for series A and B based on the timestamps.
We use window functions to efficiently calculate these values over the ordered sequence of timestamps.
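The same alignment logic can be sketched in a few lines of Python. This is only an illustration of the technique on a tiny hand-made dataset, not a translation of the query: the helper names are hypothetical, timestamps are assumed to be distinct, and rows without a previous or next point get None, which is a choice of this sketch.

```python
def interpolate(prev_point, next_point, timestamp):
    # Linear interpolation between two (timestamp, value) points.
    (t0, v0), (t1, v1) = prev_point, next_point
    return v0 + (timestamp - t0) * (v1 - v0) / (t1 - t0)

def align(points):
    """points: list of (timestamp, series, value) -> rows of (timestamp, a_value, b_value)."""
    points = sorted(points)
    rows = []
    for i, (t, series, value) in enumerate(points):
        row = [t]
        for name in ("A", "B"):
            if series == name:
                row.append(float(value))  # keep the original value
                continue
            # Nearest known point of this series before and after the current row.
            prev_p = next(((pt, pv) for pt, ps, pv in reversed(points[:i]) if ps == name), None)
            next_p = next(((nt, nv) for nt, ns, nv in points[i + 1:] if ns == name), None)
            if prev_p is None or next_p is None:
                row.append(None)          # nothing to interpolate between
            else:
                row.append(interpolate(prev_p, next_p, t))
        rows.append(tuple(row))
    return rows

# Series A has value = timestamp * 10, series B has value = timestamp * 100.
data = [(0, "A", 0), (4, "B", 400), (10, "A", 100), (20, "B", 2000)]
for row in align(data):
    print(row)
# (0, 0.0, None)
# (4, 40.0, 400.0)
# (10, 100.0, 1000.0)
# (20, None, 2000.0)
```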
By using window functions and interpolation, we can align time-series data with irregular timestamps and fill in missing values based on nearby data points. This technique is useful in scenarios where data is recorded at different times or irregular intervals across multiple series.
2.42 - Top N & Remain
Top N & Remain
When working with large datasets, you may often need to compute the sum of values for the top N groups and aggregate the remainder separately. This article demonstrates several methods to achieve that in ClickHouse.
Dataset Setup
We’ll start by creating a table top_with_rest and inserting data for demonstration purposes:
This method uses ROW_NUMBER() to segregate the top N from the rest.
Method 5: Using WITH TOTALS
This method includes totals for all groups, and you calculate the remainder on the application side.
SELECT
k,
sum(number) AS res
FROM top_with_rest
GROUP BY k
WITH TOTALS
ORDER BY res DESC
LIMIT 10
┌─k───┬───res─┐
│ 999 │ 99945 │
│ 998 │ 99845 │
│ 997 │ 99745 │
│ 996 │ 99645 │
│ 995 │ 99545 │
│ 994 │ 99445 │
│ 993 │ 99345 │
│ 992 │ 99245 │
│ 991 │ 99145 │
│ 990 │ 99045 │
└─────┴───────┘
Totals:
┌─k─┬──────res─┐
│ │ 49995000 │
└───┴──────────┘
You would subtract the sum of the top rows from the totals in your application.
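A minimal sketch of that application-side step, using the numbers from the output above (the variable names are illustrative):

```python
# Top 10 rows and the totals row, copied from the query output above.
top_rows = [
    (999, 99945), (998, 99845), (997, 99745), (996, 99645), (995, 99545),
    (994, 99445), (993, 99345), (992, 99245), (991, 99145), (990, 99045),
]
totals = 49995000

top_sum = sum(res for _, res in top_rows)
remainder = totals - top_sum  # sum over all groups outside the top 10

print(top_sum)    # 994950
print(remainder)  # 49000050
```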
These methods offer different approaches for handling the Top N rows and aggregating the remainder in ClickHouse. Depending on your requirements (UNION ALL, arrays, window functions, or totals), each method provides flexibility for efficient querying.
2.43 - Troubleshooting
Tips for ClickHouse® troubleshooting
Query Execution Logging
When troubleshooting query execution in ClickHouse®, one of the most useful tools is logging the query execution details. This can be controlled using the session-level setting send_logs_level. Here are the different log levels you can use:
Possible values: 'trace', 'debug', 'information', 'warning', 'error', 'fatal', 'none'
This can be used with clickhouse-client in both interactive and non-interactive mode.
The logs provide detailed information about query execution, making it easier to identify issues or bottlenecks. You can use the following command to run a query with logging enabled:
ClickHouse supports exporting query performance data in a format compatible with speedscope.app. This can help you visualize performance bottlenecks within queries. Example query to generate a flamegraph:
https://www.speedscope.app/
WITH
    '95578e1c-1e93-463c-916c-a1a8cdd08198' AS query,
    min(min) AS start_value,
    max(max) AS end_value,
    groupUniqArrayArrayArray(trace_arr) AS uniq_frames,
    arrayMap((x, a, b) -> ('sampled', b, 'none', start_value, end_value, arrayMap(s -> reverse(arrayMap(y -> toUInt32(indexOf(uniq_frames, y) - 1), s)), x), a), groupArray(trace_arr), groupArray(weights), groupArray(trace_type)) AS samples
SELECT
    concat('clickhouse-server@', version()) AS exporter,
    'https://www.speedscope.app/file-format-schema.json' AS `$schema`,
    concat('ClickHouse query id: ', query) AS name,
    CAST(samples, 'Array(Tuple(type String, name String, unit String, startValue UInt64, endValue UInt64, samples Array(Array(UInt32)), weights Array(UInt32)))') AS profiles,
    CAST(tuple(arrayMap(x -> (demangle(addressToSymbol(x)), addressToLine(x)), uniq_frames)), 'Tuple(frames Array(Tuple(name String, line String)))') AS shared
FROM
(
    SELECT
        min(min_ns) AS min,
        trace_type,
        max(max_ns) AS max,
        groupArray(trace) AS trace_arr,
        groupArray(cnt) AS weights
    FROM
    (
        SELECT
            min(timestamp_ns) AS min_ns,
            max(timestamp_ns) AS max_ns,
            trace,
            trace_type,
            count() AS cnt
        FROM system.trace_log
        WHERE query_id = query
        GROUP BY
            trace_type,
            trace
    )
    GROUP BY trace_type
)
SETTINGS allow_introspection_functions = 1, output_format_json_named_tuples_as_objects = 1
FORMAT JSONEachRow
And a query to generate traces per thread:
WITH
    '8e7e0616-cfaf-43af-a139-d938ced7655a' AS query,
    min(min) AS start_value,
    max(max) AS end_value,
    groupUniqArrayArrayArray(trace_arr) AS uniq_frames,
    arrayMap((x, a, b, c, d) -> ('sampled', concat(b, ' - thread ', c.1, ' - traces ', c.2), 'nanoseconds', d.1 - start_value, d.2 - start_value, arrayMap(s -> reverse(arrayMap(y -> toUInt32(indexOf(uniq_frames, y) - 1), s)), x), a), groupArray(trace_arr), groupArray(weights), groupArray(trace_type), groupArray((thread_id, total)), groupArray((min, max))) AS samples
SELECT
    concat('clickhouse-server@', version()) AS exporter,
    'https://www.speedscope.app/file-format-schema.json' AS `$schema`,
    concat('ClickHouse query id: ', query) AS name,
    CAST(samples, 'Array(Tuple(type String, name String, unit String, startValue UInt64, endValue UInt64, samples Array(Array(UInt32)), weights Array(UInt32)))') AS profiles,
    CAST(tuple(arrayMap(x -> (demangle(addressToSymbol(x)), addressToLine(x)), uniq_frames)), 'Tuple(frames Array(Tuple(name String, line String)))') AS shared
FROM
(
    SELECT
        min(min_ns) AS min,
        trace_type,
        thread_id,
        max(max_ns) AS max,
        groupArray(trace) AS trace_arr,
        groupArray(cnt) AS weights,
        sum(cnt) as total
    FROM
    (
        SELECT
            min(timestamp_ns) AS min_ns,
            max(timestamp_ns) AS max_ns,
            trace,
            trace_type,
            thread_id,
            sum(if(trace_type IN ('Memory', 'MemoryPeak', 'MemorySample'), size, 1)) AS cnt
        FROM system.trace_log
        WHERE query_id = query
        GROUP BY
            trace_type,
            trace,
            thread_id
    )
    GROUP BY
        trace_type,
        thread_id
    ORDER BY
        trace_type ASC,
        total DESC
)
SETTINGS allow_introspection_functions = 1, output_format_json_named_tuples_as_objects = 1, output_format_json_quote_64bit_integers = 1
FORMAT JSONEachRow
By enabling detailed logging and tracing, you can effectively diagnose issues and optimize query performance in ClickHouse.
Update table metadata: schema .sql & metadata in ZK.
It's usually a cheap and fast command, and any new INSERT after the schema change will calculate TTL according to the new rule.
ALTER TABLE tbl MATERIALIZE TTL
Recalculate TTL for already existing parts.
It can be a heavy operation, because ClickHouse® will read column data, recalculate TTL and apply the TTL expression.
You can disable this step completely by using the materialize_ttl_after_modify user session setting (by default it's 1, so materialization is enabled).
If you disable materialization of TTL, all old parts will still be processed according to the OLD TTL rules.
MATERIALIZE TTL:
Recalculate TTL (fairly cheap, it reads only the columns that participate in TTL)
Apply TTL (rewrite of table data for all columns)
You can also disable only the apply TTL substep via the materialize_ttl_recalculate_only merge_tree setting (by default it's 0, so ClickHouse will apply the TTL expression).
The idea of materialize_ttl_after_modify = 0 and materialize_ttl_recalculate_only = 1 is to use ALTER TABLE tbl MATERIALIZE TTL IN PARTITION xxx; ALTER TABLE tbl MATERIALIZE TTL IN PARTITION yyy; and materialize TTL gently, or to drop/move partitions manually until the old data without TTL (or with the old TTL) is processed.
MATERIALIZE TTL done via Mutation:
ClickHouse creates new parts via hardlinks and writes a new ttl.txt file
ClickHouse removes old (inactive) parts after the remove time (default is 8 minutes)
20220401 ttl: 20220601 disk: s3
20220416 ttl: 20220616 disk: s3
20220501 ttl: 20220631 disk: s3 (ClickHouse will not move this part to local disk, because there is no TTL rule for that)
20220502 ttl: 20220701 disk: local
20220516 ttl: 20220716 disk: local
20220601 ttl: 20220731 disk: local
Decrease of TTL
TTL timestamp + INTERVAL 30 DAY MOVE TO DISK s3 -> TTL timestamp + INTERVAL 14 DAY MOVE TO DISK s3
Table parts:
20220401 ttl: 20220401 disk: s3
20220416 ttl: 20220516 disk: s3
20220501 ttl: 20220531 disk: s3
20220502 ttl: 20220601 disk: local
20220516 ttl: 20220616 disk: local
20220601 ttl: 20220631 disk: local
Table parts after the new TTL is materialized:
20220401 ttl: 20220415 disk: s3
20220416 ttl: 20220501 disk: s3
20220501 ttl: 20220515 disk: s3
20220502 ttl: 20220517 disk: local (ClickHouse will move this part to disk s3 in background according to TTL rule)
20220516 ttl: 20220601 disk: local (ClickHouse will move this part to disk s3 in background according to TTL rule)
20220601 ttl: 20220616 disk: local
Possible TTL Rules
TTL:
DELETE (With enabled `ttl_only_drop_parts`, it's cheap operation, ClickHouse will drop the whole part)
MOVE
GROUP BY
WHERE
RECOMPRESS
merge_with_ttl_timeout │ 14400 │ 0 │ Minimal time in seconds, when merge with delete TTL can be repeated.
merge_with_recompression_ttl_timeout │ 14400 │ 0 │ Minimal time in seconds, when merge with recompression TTL can be repeated.
max_replicated_merges_with_ttl_in_queue │ 1 │ 0 │ How many tasks of merging parts with TTL are allowed simultaneously in ReplicatedMergeTree queue.
max_number_of_merges_with_ttl_in_pool │ 2 │ 0 │ When there is more than specified number of merges with TTL entries in pool, do not assign new merge with TTL. This is to leave free threads for regular merges and avoid "Too many parts"
ttl_only_drop_parts │ 0 │ 0 │ Only drop altogether the expired parts and not partially prune them.
Session settings:
materialize_ttl_after_modify │ 1 │ 0 │ Apply TTL for old data, after ALTER MODIFY TTL query
2.44.2 - What are my TTL settings?
What are my TTL settings?
Using SHOW CREATE TABLE
If you just want to see the current TTL settings on a table, you can look at the schema definition.
SHOW CREATE TABLE events2_local
FORMAT Vertical
Query id: eba671e5-6b8c-4a81-a4d8-3e21e39fb76b
Row 1:
──────
statement: CREATE TABLE default.events2_local
(
`EventDate` DateTime,
`EventID` UInt32,
`Value` String
)
ENGINE = ReplicatedMergeTree('/clickhouse/{cluster}/tables/{shard}/default/events2_local', '{replica}')
PARTITION BY toYYYYMM(EventDate)
ORDER BY (EventID, EventDate)
TTL EventDate + toIntervalMonth(1)
SETTINGS index_granularity = 8192
This works even when there’s no data in the table. It does not tell you when the TTLs expire or anything specific to data in one or more of the table parts.
Using system.parts
If you want to see the actual TTL values for specific data, run a query on system.parts.
There are columns listing all currently applicable TTL limits for each part.
(It does not work if the table is empty because there aren’t any parts yet.)
During TTL merges ClickHouse® re-calculates values of columns in the SET section.
GROUP BY section should be a prefix of a table’s PRIMARY KEY (the same as ORDER BY, if no separate PRIMARY KEY defined).
-- stop merges to demonstrate data before / after
-- a rolling up
SYSTEM STOP TTL MERGES test_ttl_group_by;
SYSTEM STOP MERGES test_ttl_group_by;

INSERT INTO test_ttl_group_by (key, ts, value)
SELECT number % 5, now() + number, 1
FROM numbers(100);

INSERT INTO test_ttl_group_by (key, ts, value)
SELECT number % 5, now() - interval 60 day + number, 2
FROM numbers(100);

SELECT toYYYYMM(ts) AS m, count(), sum(value), min(min_value), max(max_value)
FROM test_ttl_group_by
GROUP BY m;

┌──────m─┬─count()─┬─sum(value)─┬─min(min_value)─┬─max(max_value)─┐
│ 202102 │     100 │        200 │              2 │              2 │
│ 202104 │     100 │        100 │              1 │              1 │
└────────┴─────────┴────────────┴────────────────┴────────────────┘

SYSTEM START TTL MERGES test_ttl_group_by;
SYSTEM START MERGES test_ttl_group_by;
OPTIMIZE TABLE test_ttl_group_by FINAL;

SELECT toYYYYMM(ts) AS m, count(), sum(value), min(min_value), max(max_value)
FROM test_ttl_group_by
GROUP BY m;

┌──────m─┬─count()─┬─sum(value)─┬─min(min_value)─┬─max(max_value)─┐
│ 202102 │       5 │        200 │              2 │              2 │
│ 202104 │     100 │        100 │              1 │              1 │
└────────┴─────────┴────────────┴────────────────┴────────────────┘
As you can see 100 rows were rolled up into 5 rows (key has 5 values) for rows older than 30 days.
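The roll-up can be modeled in a few lines of Python. This is a simplified sketch of the effect of a TTL ... GROUP BY key, toStartOfDay(ts) rule with a 30-day interval, not of how merges are implemented; the fixed NOW value is a hypothetical stand-in for now(), chosen so that each batch of rows stays within one day.

```python
from datetime import datetime, timedelta

NOW = datetime(2021, 4, 10, 12, 0, 0)  # hypothetical stand-in for now()
TTL = timedelta(days=30)

# (key, ts, value): 100 fresh rows and 100 rows that are 60 days old.
rows = [(n % 5, NOW + timedelta(seconds=n), 1) for n in range(100)]
rows += [(n % 5, NOW - timedelta(days=60) + timedelta(seconds=n), 2) for n in range(100)]

def roll_up(rows, now):
    kept = []    # rows whose TTL has not expired stay as they are
    groups = {}  # (key, day) -> (sum(value), min_value, max_value)
    for key, ts, value in rows:
        if ts + TTL > now:
            kept.append((key, ts, value, value, value))
            continue
        day = ts.replace(hour=0, minute=0, second=0, microsecond=0)
        total, lo, hi = groups.get((key, day), (0, value, value))
        groups[(key, day)] = (total + value, min(lo, value), max(hi, value))
    rolled = [(key, day, total, lo, hi) for (key, day), (total, lo, hi) in groups.items()]
    return kept, rolled

kept, rolled = roll_up(rows, NOW)
print(len(kept), len(rolled))      # 100 5
print(sum(r[2] for r in rolled))   # 200
```

The expired rows collapse to one row per (key, day) while the sum of value is preserved, which matches the before/after counts in the output above.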
Example with SummingMergeTree table
CREATE TABLE test_ttl_group_by
(
    `key1` UInt32,
    `key2` UInt32,
    `ts` DateTime,
    `value` UInt32,
    `min_value` SimpleAggregateFunction(min, UInt32) DEFAULT value,
    `max_value` SimpleAggregateFunction(max, UInt32) DEFAULT value
)
ENGINE = SummingMergeTree
PARTITION BY toYYYYMM(ts)
PRIMARY KEY (key1, key2, toStartOfDay(ts))
ORDER BY (key1, key2, toStartOfDay(ts), ts)
TTL ts + interval 30 day
    GROUP BY key1, key2, toStartOfDay(ts)
    SET value = sum(value),
        min_value = min(min_value),
        max_value = max(max_value),
        ts = min(toStartOfDay(ts));

-- stop merges to demonstrate data before / after
-- a rolling up
SYSTEM STOP TTL MERGES test_ttl_group_by;
SYSTEM STOP MERGES test_ttl_group_by;

INSERT INTO test_ttl_group_by (key1, key2, ts, value)
SELECT 1, 1, toStartOfMinute(now() + number*60), 1
FROM numbers(100);

INSERT INTO test_ttl_group_by (key1, key2, ts, value)
SELECT 1, 1, toStartOfMinute(now() + number*60), 1
FROM numbers(100);

INSERT INTO test_ttl_group_by (key1, key2, ts, value)
SELECT 1, 1, toStartOfMinute(now() + number*60 - toIntervalDay(60)), 2
FROM numbers(100);

INSERT INTO test_ttl_group_by (key1, key2, ts, value)
SELECT 1, 1, toStartOfMinute(now() + number*60 - toIntervalDay(60)), 2
FROM numbers(100);

SELECT toYYYYMM(ts) AS m, count(), sum(value), min(min_value), max(max_value)
FROM test_ttl_group_by
GROUP BY m;

┌──────m─┬─count()─┬─sum(value)─┬─min(min_value)─┬─max(max_value)─┐
│ 202102 │     200 │        400 │              2 │              2 │
│ 202104 │     200 │        200 │              1 │              1 │
└────────┴─────────┴────────────┴────────────────┴────────────────┘

SYSTEM START TTL MERGES test_ttl_group_by;
SYSTEM START MERGES test_ttl_group_by;
OPTIMIZE TABLE test_ttl_group_by FINAL;

SELECT toYYYYMM(ts) AS m, count(), sum(value), min(min_value), max(max_value)
FROM test_ttl_group_by
GROUP BY m;

┌──────m─┬─count()─┬─sum(value)─┬─min(min_value)─┬─max(max_value)─┐
│ 202102 │       1 │        400 │              2 │              2 │
│ 202104 │     100 │        200 │              1 │              1 │
└────────┴─────────┴────────────┴────────────────┴────────────────┘
During merges ClickHouse re-calculates the ts column as min(toStartOfDay(ts)). This is possible only for the last column of the SummingMergeTree ORDER BY section, ORDER BY (key1, key2, toStartOfDay(ts), ts); otherwise it would break the order of rows in the table.
Example with AggregatingMergeTree table
CREATE TABLE test_ttl_group_by_agg
(
    `key1` UInt32,
    `key2` UInt32,
    `ts` DateTime,
    `counter` AggregateFunction(count, UInt32)
)
ENGINE = AggregatingMergeTree
PARTITION BY toYYYYMM(ts)
PRIMARY KEY (key1, key2, toStartOfDay(ts))
ORDER BY (key1, key2, toStartOfDay(ts), ts)
TTL ts + interval 30 day
    GROUP BY key1, key2, toStartOfDay(ts)
    SET counter = countMergeState(counter),
        ts = min(toStartOfDay(ts));

CREATE TABLE test_ttl_group_by_raw
(
    `key1` UInt32,
    `key2` UInt32,
    `ts` DateTime
)
ENGINE = Null;

CREATE MATERIALIZED VIEW test_ttl_group_by_mv TO test_ttl_group_by_agg
AS SELECT
    `key1`,
    `key2`,
    `ts`,
    countState() as counter
FROM test_ttl_group_by_raw
GROUP BY key1, key2, ts;

-- stop merges to demonstrate data before / after
-- a rolling up
SYSTEM STOP TTL MERGES test_ttl_group_by_agg;
SYSTEM STOP MERGES test_ttl_group_by_agg;

INSERT INTO test_ttl_group_by_raw (key1, key2, ts)
SELECT 1, 1, toStartOfMinute(now() + number*60)
FROM numbers(100);

INSERT INTO test_ttl_group_by_raw (key1, key2, ts)
SELECT 1, 1, toStartOfMinute(now() + number*60)
FROM numbers(100);

INSERT INTO test_ttl_group_by_raw (key1, key2, ts)
SELECT 1, 1, toStartOfMinute(now() + number*60 - toIntervalDay(60))
FROM numbers(100);

INSERT INTO test_ttl_group_by_raw (key1, key2, ts)
SELECT 1, 1, toStartOfMinute(now() + number*60 - toIntervalDay(60))
FROM numbers(100);

SELECT toYYYYMM(ts) AS m, count(), countMerge(counter)
FROM test_ttl_group_by_agg
GROUP BY m;

┌──────m─┬─count()─┬─countMerge(counter)─┐
│ 202307 │     200 │                 200 │
│ 202309 │     200 │                 200 │
└────────┴─────────┴─────────────────────┘

SYSTEM START TTL MERGES test_ttl_group_by_agg;
SYSTEM START MERGES test_ttl_group_by_agg;
OPTIMIZE TABLE test_ttl_group_by_agg FINAL;

SELECT toYYYYMM(ts) AS m, count(), countMerge(counter)
FROM test_ttl_group_by_agg
GROUP BY m;

┌──────m─┬─count()─┬─countMerge(counter)─┐
│ 202307 │       1 │                 200 │
│ 202309 │     100 │                 200 │
└────────┴─────────┴─────────────────────┘
CREATE TABLE test_ttl_group_by
(
    `key` UInt32,
    `ts` DateTime,
    `value` UInt32,
    `min_value` UInt32 DEFAULT value,
    `max_value` UInt32 DEFAULT value
)
ENGINE = MergeTree
PARTITION BY toYYYYMM(ts)
ORDER BY (key, toStartOfDay(ts))
TTL ts + interval 180 day,
    ts + interval 30 day
    GROUP BY key, toStartOfDay(ts)
    SET value = sum(value),
        min_value = min(min_value),
        max_value = max(max_value),
        ts = min(toStartOfDay(ts));

-- stop merges to demonstrate data before / after
-- a rolling up
SYSTEM STOP TTL MERGES test_ttl_group_by;
SYSTEM STOP MERGES test_ttl_group_by;

INSERT INTO test_ttl_group_by (key, ts, value)
SELECT number % 5, now() + number, 1
FROM numbers(100);

INSERT INTO test_ttl_group_by (key, ts, value)
SELECT number % 5, now() - interval 60 day + number, 2
FROM numbers(100);

INSERT INTO test_ttl_group_by (key, ts, value)
SELECT number % 5, now() - interval 200 day + number, 3
FROM numbers(100);

SELECT toYYYYMM(ts) AS m, count(), sum(value), min(min_value), max(max_value)
FROM test_ttl_group_by
GROUP BY m;

┌──────m─┬─count()─┬─sum(value)─┬─min(min_value)─┬─max(max_value)─┐
│ 202101 │     100 │        300 │              3 │              3 │
│ 202106 │     100 │        200 │              2 │              2 │
│ 202108 │     100 │        100 │              1 │              1 │
└────────┴─────────┴────────────┴────────────────┴────────────────┘

SYSTEM START TTL MERGES test_ttl_group_by;
SYSTEM START MERGES test_ttl_group_by;
OPTIMIZE TABLE test_ttl_group_by FINAL;

┌──────m─┬─count()─┬─sum(value)─┬─min(min_value)─┬─max(max_value)─┐
│ 202106 │       5 │        200 │              2 │              2 │
│ 202108 │     100 │        100 │              1 │              1 │
└────────┴─────────┴────────────┴────────────────┴────────────────┘
All columns have implicit default compression from the server config, except event_time. That's why the compression for this column needs to be changed to Default, otherwise it won't be recompressed.
In case of a Replicated installation, the Dictionary should be created on all nodes, and source tables should use the ReplicatedMergeTree engine and be replicated across all nodes.
Info
Starting from 20.4, ClickHouse® forbids any potentially non-deterministic mutations by default.
This behavior is controlled by the setting allow_nondeterministic_mutations. You can append it to a query like this: ALTER TABLE xxx UPDATE ... WHERE ... SETTINGS allow_nondeterministic_mutations = 1;
For ON CLUSTER queries, you would need to put this setting in the default profile and restart the ClickHouse servers.
2.46 - Values mapping
Values mapping
SELECT count()
FROM numbers_mt(1000000000)
WHERE NOT ignore(transform(number % 3, [0, 1, 2, 3], ['aa', 'ab', 'ad', 'af'], 'a0'))

1 rows in set. Elapsed: 4.668 sec. Processed 1.00 billion rows, 8.00 GB (214.21 million rows/s., 1.71 GB/s.)

SELECT count()
FROM numbers_mt(1000000000)
WHERE NOT ignore(multiIf((number % 3) = 0, 'aa', (number % 3) = 1, 'ab', (number % 3) = 2, 'ad', (number % 3) = 3, 'af', 'a0'))

1 rows in set. Elapsed: 7.333 sec. Processed 1.00 billion rows, 8.00 GB (136.37 million rows/s., 1.09 GB/s.)

SELECT count()
FROM numbers_mt(1000000000)
WHERE NOT ignore(CAST(number % 3 AS Enum('aa' = 0, 'ab' = 1, 'ad' = 2, 'af' = 3)))

1 rows in set. Elapsed: 1.152 sec. Processed 1.00 billion rows, 8.00 GB (867.79 million rows/s., 6.94 GB/s.)
3.1 - How to encode/decode quantileTDigest states from/to list of centroids
A way to export or import quantileTDigest states from/into ClickHouse®
quantileTDigestState
quantileTDigestState is stored in two parts: a count of centroids in LEB128 format + list of centroids without a delimiter. Each centroid is represented as two Float32 values: Mean & Count.
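The layout described above can be sketched as an encoder/decoder pair in Python. This is a minimal illustration based only on that description; it assumes unsigned LEB128 for the count and little-endian Float32 values, and the function names are hypothetical.

```python
import struct

def encode_leb128(n):
    # Unsigned LEB128: 7 bits per byte, high bit set on all but the last byte.
    out = bytearray()
    while True:
        byte = n & 0x7F
        n >>= 7
        if n:
            out.append(byte | 0x80)
        else:
            out.append(byte)
            return bytes(out)

def decode_leb128(data, offset=0):
    result = 0
    shift = 0
    while True:
        byte = data[offset]
        offset += 1
        result |= (byte & 0x7F) << shift
        if not byte & 0x80:
            return result, offset
        shift += 7

def encode_state(centroids):
    # centroids: list of (mean, count) pairs.
    out = bytearray(encode_leb128(len(centroids)))
    for mean, count in centroids:
        out += struct.pack("<ff", mean, count)  # assumption: little-endian Float32
    return bytes(out)

def decode_state(data):
    n, offset = decode_leb128(data)
    return [struct.unpack_from("<ff", data, offset + 8 * i) for i in range(n)]

centroids = [(1.5, 2.0), (10.0, 1.0)]
state = encode_state(centroids)
print(len(state))           # 17 (1 byte for the count + 2 * 8 bytes)
print(decode_state(state))  # [(1.5, 2.0), (10.0, 1.0)]
```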
3.5 - arrayMap, arrayJoin or ARRAY JOIN memory usage
Why do arrayMap, arrayFilter, and arrayJoin use so much memory?
arrayMap-like functions memory usage calculation.
In order to calculate arrayMap or similar array* functions ClickHouse® temporarily does arrayJoin-like operation, which in certain conditions can lead to huge memory usage for big arrays.
We can roughly estimate memory usage by multiplying the size of columns participating in the lambda function by the size of the unnested array.
And total memory usage will be 55 values (5(array size)*2(array count)*5(row count) + 5(unnested array size)), which is 5.5 times more than initial array size.
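The arithmetic of that estimate can be written out as a small Python helper. This is only a sketch of the formula quoted above; the function name is hypothetical, and it assumes the "initial array size" is the two arrays of five values each.

```python
def estimate_values(array_size, array_count, row_count):
    # Values held while the lambda is evaluated, following the formula above.
    unnested_size = array_size  # size of the unnested array
    return array_size * array_count * row_count + unnested_size

initial = 5 * 2                    # two arrays of five values each (assumption)
total = estimate_values(5, 2, 5)

print(total)            # 55
print(total / initial)  # 5.5
```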
WITH CAST(NULL, 'Nullable(UInt8)') AS column
SELECT
    column,
    assumeNotNull(column + 999) AS x;

┌─column─┬─x─┐
│ null   │ 0 │
└────────┴───┘

WITH CAST(NULL, 'Nullable(UInt8)') AS column
SELECT
    column,
    assumeNotNull(materialize(column) + 999) AS x;

┌─column─┬───x─┐
│ null   │ 999 │
└────────┴─────┘

CREATE TABLE test_null
(
    `key` UInt32,
    `value` Nullable(String)
)
ENGINE = MergeTree
ORDER BY key;

INSERT INTO test_null SELECT
    number,
    concat('value ', toString(number))
FROM numbers(4);

SELECT *
FROM test_null;

┌─key─┬─value───┐
│   0 │ value 0 │
│   1 │ value 1 │
│   2 │ value 2 │
│   3 │ value 3 │
└─────┴─────────┘

ALTER TABLE test_null
    UPDATE value = NULL WHERE key = 3;

SELECT *
FROM test_null;

┌─key─┬─value───┐
│   0 │ value 0 │
│   1 │ value 1 │
│   2 │ value 2 │
│   3 │ null    │
└─────┴─────────┘

SELECT
    key,
    assumeNotNull(value)
FROM test_null;

┌─key─┬─assumeNotNull(value)─┐
│   0 │ value 0              │
│   1 │ value 1              │
│   2 │ value 2              │
│   3 │ value 3              │
└─────┴──────────────────────┘

WITH CAST(NULL, 'Nullable(Enum8(\'a\' = 1, \'b\' = 0))') AS test
SELECT assumeNotNull(test)

┌─assumeNotNull(test)─┐
│ b                   │
└─────────────────────┘

WITH CAST(NULL, 'Nullable(Enum8(\'a\' = 1))') AS test
SELECT assumeNotNull(test)

Error on processing query 'with CAST(null, 'Nullable(Enum8(\'a\'=1))') as test
select assumeNotNull(test); ;':
Code: 36, e.displayText() = DB::Exception: Unexpected value 0 in enum, Stack trace (when copying this message, always include the lines below):
Info
Null values in ClickHouse® are stored in a separate dictionary that records whether each value is Null. For faster dispatch of functions, there is no check for Null values during function execution, so functions like plus can modify the internal column value (which holds a default value). Under normal conditions this is not a problem, because on a read ClickHouse first checks the Null dictionary and returns the value from the column itself only for non-Nulls. The assumeNotNull function simply ignores this Null dictionary, so it returns only the column values, and in certain cases this can produce unexpected results.
If Null values are possible, it is better to use the ifNull function instead.
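The behaviour described above can be modelled with a toy column that keeps values and Null flags separately. This is only an illustrative sketch: the class and helper names are hypothetical and do not correspond to ClickHouse internals beyond the idea of a separate Null structure.

```python
class NullableColumn:
    """Toy model: nested values plus a parallel list of 'is Null' flags."""

    def __init__(self, values, null_mask):
        self.values = list(values)
        self.null_mask = list(null_mask)


def plus(col, n):
    # No per-row Null check: the stored default under a Null is modified too.
    return NullableColumn([v + n for v in col.values], col.null_mask)


def read(col):
    # A normal read consults the Null flags first.
    return [None if is_null else v for v, is_null in zip(col.values, col.null_mask)]


def assume_not_null(col):
    # Ignores the Null flags and exposes whatever the nested column holds.
    return list(col.values)


def if_null(col, fallback):
    # Safer alternative: replaces Nulls with an explicit fallback.
    return [fallback if is_null else v for v, is_null in zip(col.values, col.null_mask)]


col = NullableColumn([0], [True])   # a single Null row with default value 0
shifted = plus(col, 999)
print(read(shifted), assume_not_null(shifted), if_null(shifted, 0))
```

In this model the row is still Null after `plus`, but `assume_not_null` exposes the modified default, which mirrors the `materialize(column) + 999` example.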
Because encryption and decryption can be expensive due to re-initialization of keys and IV, it usually makes sense to use those functions over literal values instead of table columns.
3.8 - sequenceMatch
Question
I expect the sequence here to only match once as a is only directly after a once - but it matches with gaps. Why is that?
Learn how you can integrate cloud services, BI tools, kafka, MySQL, Spark, MindsDB, and more with ClickHouse®
4.1 - Altinity Cloud Access Management
Enabling access_management for Altinity.Cloud databases.
Organizations that want to enable administrative users in their Altinity.Cloud ClickHouse® servers can do so by enabling access_management manually. This allows for administrative users to be created on the specific ClickHouse Cluster.
WARNING
Modifying the ClickHouse cluster settings manually can lead to the cluster not loading or other issues. Change settings only with full consultation with an Altinity.Cloud support team member, and be ready to remove settings if they cause any disruption of service.
To add the access_management setting to an Altinity.Cloud ClickHouse Cluster:
Log into your Altinity.Cloud account.
For the cluster to modify, select Configure -> Settings.
Cluster setting configure
From the Settings page, select +ADD SETTING.
Add cluster setting
Set the following options:
Setting Type: Select users.d file.
Filename: access_management.xml
Contents: Enter the following to give the clickhouse_operator user, which controls the cluster through the clickhouse-operator, the ability to set administrative options:
access_management=1 means that the users admin and clickhouse_operator are able to create users and grant them privileges using SQL.
Select OK. The cluster will restart, and users can now be created in the cluster that can be granted administrative access.
If you are running ClickHouse 21.9 or above, you can enable storing access management in ZooKeeper. In this case it will be automatically propagated to the cluster. This requires yet another configuration file:
The clickhouse-driver is a Python library used for interacting with ClickHouse. Here’s a summary of its features:
Connectivity: clickhouse-driver allows Python applications to connect to ClickHouse servers over the native TCP interface (ports 9000/9440) and also over the HTTP interface, although the latter is experimental.
SQL Queries: It enables executing SQL queries against ClickHouse databases from Python scripts, including data manipulation (insertion, deletion, updating) and data retrieval (select queries).
Query Parameters: Supports parameterized queries, which helps in preventing SQL injection attacks and allows for more efficient execution of repeated queries with different parameter values.
Connection Pooling: Provides support for connection pooling, which helps manage connections efficiently, especially in high-concurrency applications, by reusing existing connections instead of creating new ones for each query.
Data Types: Handles conversion between Python data types and ClickHouse data types, ensuring compatibility and consistency when passing data between Python and ClickHouse.
Error Handling: Offers comprehensive error handling mechanisms, including exceptions and error codes, to facilitate graceful error recovery and handling in Python applications.
Asynchronous Support: Supports asynchronous execution of queries using asyncio, allowing for non-blocking query execution in asynchronous Python applications.
Customization: Provides options for customizing connection settings, query execution behavior, and other parameters to suit specific application requirements and performance considerations.
Compatibility: Works with various versions of ClickHouse, ensuring compatibility and support for different ClickHouse features and functionalities.
Documentation and Community: Offers comprehensive documentation and active community support, including examples, tutorials, and forums, to assist developers in effectively using the library and addressing any issues or questions they may have.
This was the first Python driver for ClickHouse. It has a mature codebase. By default, ClickHouse drivers use synchronous code. There is a wrapper that makes the code asynchronous: https://github.com/long2ice/asynch
Here you can get a basic working example from Altinity repo for ingestion/selection using clickhouse-driver:
The ClickHouse Connect Python driver is the official Python library supported by ClickHouse, Inc. Here’s a summary of its key features:
Connectivity: allows Python applications to connect to ClickHouse servers over HTTP Interface (8123/8443 ports).
Compatibility: The driver is compatible with Python 3.x versions, ensuring that it can be used with modern Python applications without compatibility issues.
Performance: The driver is optimized for performance, allowing for efficient communication with ClickHouse databases to execute queries and retrieve results quickly, which is crucial for applications requiring low latency and high throughput.
Query Execution: Developers can use the driver to execute SQL queries against ClickHouse databases, including SELECT, INSERT, UPDATE, DELETE, and other SQL operations, enabling them to perform various data manipulation tasks from Python applications.
Parameterized Queries: The driver supports parameterized queries, allowing developers to safely pass parameters to SQL queries to prevent SQL injection attacks and improve query performance by reusing query execution plans.
Data Type Conversion: The driver automatically handles data type conversion between Python data types and ClickHouse data types, ensuring seamless integration between Python applications and ClickHouse databases without manual data type conversion.
Error Handling: The driver provides robust error handling mechanisms, including exceptions and error codes, to help developers handle errors gracefully and take appropriate actions based on the type of error encountered during query execution.
Limited Asynchronous Support: Some implementations of the driver offer asynchronous support, allowing developers to execute queries asynchronously to improve concurrency and scalability in asynchronous Python applications using asynchronous I/O frameworks like asyncio.
Configuration Options: The driver offers various configuration options, such as connection parameters, authentication methods, and connection pooling settings, allowing developers to customize the driver’s behavior to suit their specific requirements and environment.
Documentation and Community: Offers comprehensive documentation and active community support, including examples, tutorials, and forums, to assist developers in effectively using the library and addressing any issues or questions they may have. https://clickhouse.com/docs/en/integrations/language-clients/python/intro/
If you want to specify a session_id per query you should be able to use the setting dictionary to pass a session_id for each query (note that ClickHouse will automatically generate a session_id if none is provided).
SETTINGS = {"session_id": "dagster-batch" + "-" + f"{time.time()}"}
client.query("INSERT INTO table ....", settings=SETTINGS)
import clickhouse_connect
import asyncio
from clickhouse_connect.driver.httputil import get_pool_manager


async def main():
    client = await clickhouse_connect.get_async_client(host='localhost', port=8123, pool_mgr=get_pool_manager())
    for i in range(100):
        result = await client.query("SELECT name FROM system.databases")
        print(result.result_rows)


asyncio.run(main())
clickhouse-connect code is synchronous by default, and running synchronous functions in an async application is a workaround that might not be as efficient as using a library or wrapper designed for asynchronous operation from the ground up. You can use the current wrapper, or take another approach with asyncio and concurrent.futures, using ThreadPoolExecutor or ProcessPoolExecutor. The Python GIL serializes threads but not processes, so if you need performance and can accept processes instead of threads (not much different for medium workloads), use ProcessPoolExecutor.
import asyncio
from concurrent.futures import ProcessPoolExecutor
import clickhouse_connect


# Function to execute a query using clickhouse-connect synchronously
def execute_query_sync(query):
    client = clickhouse_connect.get_client()  # Adjust connection params as needed
    result = client.query(query)
    return result


# Asynchronous wrapper function to run the synchronous function in a process pool
async def execute_query_async(query):
    loop = asyncio.get_running_loop()
    # Use ProcessPoolExecutor to execute the synchronous function
    with ProcessPoolExecutor() as pool:
        result = await loop.run_in_executor(pool, execute_query_sync, query)
        return result


async def main():
    query = "SELECT * FROM your_table LIMIT 10"  # Example query
    result = await execute_query_async(query)
    print(result)


# Run the async main function
if __name__ == '__main__':
    asyncio.run(main())
So, to use an asynchronous approach, it is recommended to use a connection pool and an asyncio wrapper that hides the complexity of using ThreadPoolExecutor/ProcessPoolExecutor.
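One possible shape for such a wrapper is sketched below. It keeps a single shared ThreadPoolExecutor instead of creating a pool per query. `run_query_sync` is a hypothetical stand-in for a blocking driver call (for example a `client.query(...)` call), and `AsyncQueryRunner` is an illustrative name, not part of clickhouse-connect.

```python
import asyncio
from concurrent.futures import ThreadPoolExecutor


def run_query_sync(query):
    """Hypothetical stand-in for a blocking driver call."""
    return f"result of: {query}"


class AsyncQueryRunner:
    """Hides the executor plumbing behind an awaitable query() method."""

    def __init__(self, max_workers=4):
        # One pool shared by all queries, created once.
        self._pool = ThreadPoolExecutor(max_workers=max_workers)

    async def query(self, sql):
        loop = asyncio.get_running_loop()
        return await loop.run_in_executor(self._pool, run_query_sync, sql)

    def close(self):
        self._pool.shutdown(wait=True)


async def main():
    runner = AsyncQueryRunner()
    try:
        # gather() preserves the order of the submitted queries.
        return await asyncio.gather(*(runner.query(f"SELECT {i}") for i in range(3)))
    finally:
        runner.close()


if __name__ == "__main__":
    print(asyncio.run(main()))
```

Swapping ThreadPoolExecutor for ProcessPoolExecutor keeps the same structure, but the function and its results then have to be picklable.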
It reads the MySQL binlog directly and transforms queries into something ClickHouse® can support. It supports updates and deletes (implemented under the hood via something like ReplacingMergeTree with enforced FINAL and a ‘deleted’ flag). Its status is ’experimental’: there are quite a lot of known limitations and issues, but some people use it. The original author moved to another project, and the main team does not have many resources to improve it for now (there are more important things in the backlog).
The replication happens on the mysql database level.
Replication using debezium + Kafka (+ Altinity Sink Connector for ClickHouse)
Debezium can read the binlog and transform it to Kafka messages.
You can later capture the stream of messages on the ClickHouse side and process it as you like.
Please remember that currently the Kafka engine supports only at-least-once delivery guarantees.
It’s used by several companies and is quite nice and flexible, but the initial setup may require some effort.
Altinity Sink Connector for ClickHouse
Can handle transformation of debezium messages (with support for DELETEs and UPDATEs) and exactly-once delivery for you.
That was done a long time ago at Altinity for one use case, and it seems it was never used outside of that.
It’s a Python application with a lot of switches which can copy a schema or read the binlog from MySQL and put it into ClickHouse.
Not supported currently. But it’s just Python, so it can probably be adjusted to different needs.
Accessing MySQL data via integration engines from inside ClickHouse.
Download the latest release. On a 64-bit system you usually need both the 32-bit and 64-bit drivers.
Install (usually you will need the ANSI driver, but it is better to install both versions, see below).
Configure ClickHouse DSN.
Note that the installed driver is linked against MDAC (the default for Windows); some non-Windows-native applications (cygwin / msys64 based) may require a driver linked against unixODBC. See the Build section below.
Add ClickHouse DSN configuration into ~/.odbc.ini file. (sample)
Note that the installed driver is linked against iODBC (the default for Mac); some Homebrew applications (like python) may require the unixODBC driver to work properly. In that case see the Build section below.
Linux
DEB/RPM packaging is not provided yet, please build & install the driver from sources.
Add ClickHouse DSN configuration into ~/.odbc.ini file. (sample)
On Windows you can create/edit DSN using GUI tool through Control Panel.
The list of DSN parameters recognized by the driver is as follows:
| Parameter | Default value | Description |
| --- | --- | --- |
| Url | empty | URL that points to a running ClickHouse instance, may include username, password, port, database, etc. |
| Proto | deduced from Url, or from Port and SSLMode: https if 443 or 8443 or SSLMode is not empty, http otherwise | Protocol, one of: http, https |
| Server or Host | deduced from Url | IP or hostname of a server with a running ClickHouse instance on it |
| Port | deduced from Url, or from Proto: 8443 if https, 8123 otherwise | Port on which the ClickHouse instance is listening |
| Path | /query | Path portion of the URL |
| UID or Username | default | User name |
| PWD or Password | empty | Password |
| Database | default | Database name to connect to |
| Timeout | 30 | Connection timeout |
| SSLMode | empty | Certificate verification method (used by TLS/SSL connections, ignored in Windows), one of: allow, prefer, require; use allow to enable the SSL_VERIFY_PEER TLS/SSL certificate verification mode, SSL_VERIFY_PEER \| SSL_VERIFY_FAIL_IF_NO_PEER_CERT is used otherwise |
| PrivateKeyFile | empty | Path to private key file (used by TLS/SSL connections), can be empty if no private key file is used |
| CertificateFile | empty | Path to certificate file (used by TLS/SSL connections, ignored in Windows), if the private key and the certificate are stored in the same file, this can be empty if PrivateKeyFile is specified |
| CALocation | empty | Path to the file or directory containing the CA/root certificates (used by TLS/SSL connections, ignored in Windows) |
| DriverLog | on if CMAKE_BUILD_TYPE is Debug, off otherwise | Enable or disable the extended driver logging |
| DriverLogFile | \temp\clickhouse-odbc-driver.log on Windows, /tmp/clickhouse-odbc-driver.log otherwise | Path to the extended driver log file (used when DriverLog is on) |
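The defaulting rules for Proto and Port listed above can be expressed as a small function. This is a sketch of the documented rules only, not the driver's actual code; the function name is hypothetical, and the precedence of explicit values over values taken from Url is an assumption.

```python
from urllib.parse import urlsplit


def resolve_proto_and_port(url="", proto="", port=None, sslmode=""):
    """Apply the documented defaults for the Proto and Port DSN parameters."""
    if url:
        parts = urlsplit(url)
        # Assumption: explicitly given Proto/Port win over the ones in Url.
        proto = proto or parts.scheme
        port = port or parts.port
    if not proto:
        # https if the port is 443 or 8443, or if SSLMode is not empty.
        proto = "https" if port in (443, 8443) or sslmode else "http"
    if port is None:
        # 8443 for https, 8123 otherwise.
        port = 8443 if proto == "https" else 8123
    return proto, port


print(resolve_proto_and_port())                   # nothing set
print(resolve_proto_and_port(sslmode="require"))  # SSLMode alone implies https
print(resolve_proto_and_port(url="https://ch.example.com:9999/query"))
```

The host name `ch.example.com` is a placeholder used only for illustration.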
Troubleshooting & bug reporting
If some software doesn’t work properly with this driver but works well with other drivers, we would appreciate it if you could collect debug info.
To debug issues with the driver, first things that need to be done are:
enabling driver manager tracing. Links may contain some irrelevant vendor-specific details.
enabling driver logging, see DriverLog and DriverLogFile DSN parameters above
making sure that the application is allowed to create and write these driver log and driver manager trace files
following the steps that lead to the issue.
Collected log files will help to diagnose & solve the issue.
Driver Managers
Note that ODBC drivers are not used directly by a user; applications access them through an ODBC driver manager. The user therefore has to install the driver for the same architecture (32- or 64-bit) as the application that is going to access the driver. Moreover, both the driver and the application must be compiled for (and actually use at run time) the same ODBC driver manager implementation (we call them “ODBC providers” here). There are three supported ODBC providers:
ODBC driver manager associated with MDAC (Microsoft Data Access Components, sometimes referenced as WDAC, Windows Data Access Components) - the standard ODBC provider of Windows
UnixODBC - the most common ODBC provider in Unix-like systems. Theoretically, could be used in Cygwin or MSYS/MinGW environments in Windows too.
iODBC - less common ODBC provider, mainly used in Unix-like systems, however, it is the standard ODBC provider in macOS. Theoretically, could be used in Cygwin or MSYS/MinGW environments in Windows too.
If you don’t see a package that matches your platforms, or the version of your system is significantly different than those of the available packages, or maybe you want to try a bleeding edge version of the code that hasn’t been released yet, you can always build the driver manually from sources.
Note that it is always better to install the driver from the corresponding native package (.msi, etc., which you can also easily create if you are building from sources) than to use binaries that were manually copied to some folder.
Building from sources
The general requirements for building the driver from sources are as follows:
CMake 3.12 and later
C++17 and C11 capable compiler toolchain:
Clang 4 and later
GCC 7 and later
Xcode 10 and later
Microsoft Visual Studio 2017 and later
ODBC Driver manager (MDAC / unixodbc / iODBC)
SSL library (openssl)
Generic build scenario:
git clone --recursive git@github.com:ClickHouse/clickhouse-odbc.git
cd clickhouse-odbc
mkdir build
cd build
cmake -DCMAKE_BUILD_TYPE=RelWithDebInfo ..
cmake --build . --config RelWithDebInfo
Additional requirements exist for each platform, which also depend on whether packaging and/or testing is performed.
Linux/macOS
Execute the following in the terminal to install needed dependencies:
# on Red Hat/CentOS (tested on CentOS 7)
sudo yum groupinstall "Development Tools"
sudo yum install centos-release-scl
sudo yum install devtoolset-8
sudo yum install git cmake openssl-devel unixODBC-devel # You may use libiodbc-devel INSTEAD of unixODBC-devel
scl enable devtoolset-8 -- bash # Enable Software collections for that terminal session, to use newer versions of compilers

# on Ubuntu (tested on Ubuntu 18.10, for older versions you may need to install newer c++ compiler and cmake versions)
sudo apt install build-essential git cmake libpoco-dev libssl-dev unixodbc-dev # You may use libiodbc-devel INSTEAD of unixODBC-devel

# MacOS:
# You will need Xcode 10 or later and Command Line Tools to be installed, as well as [Homebrew](https://brew.sh/).
brew install git cmake make poco openssl libiodbc # You may use unixodbc INSTEAD of libiodbc
Note: on Linux you usually use the unixODBC driver manager, and on Mac iODBC.
In some (rare) cases you may need to use another driver manager; please do so only
if you clearly understand the differences. The driver should be used with the driver
manager it was linked against.
Enter the cloned source tree, create a temporary build folder, and generate a Makefile for the project in it:
cd clickhouse-odbc
mkdir build
cd build
# Configuration options for the project can be specified in the next command in a form of '-Dopt=val'
# For MacOS: you may also add '-G Xcode' to the next command, in order to use Xcode as a build system or IDE, and generate the solution and project files instead of Makefile.
cmake -DCMAKE_BUILD_TYPE=RelWithDebInfo ..
…and, optionally, run tests (note, that for non-unit tests, preconfigured driver and DSN entries must exist, that point to the binaries generated in this build folder):
cmake --build . --config RelWithDebInfo --target test
For MacOS: if you configured the project with ‘-G Xcode’ initially, open the IDE and build all, package, and test targets manually from there
cmake --open .
Windows
CMake bundled with the recent versions of Visual Studio can be used.
An SDK required for building the ODBC driver is included in Windows SDK, which in its turn is also bundled with Visual Studio.
You will need to install WiX toolset to be able to generate .msi packages. You can download and install it from the WiX toolset home page.
All of the following commands have to be issued in Visual Studio Command Prompt:
use x86 Native Tools Command Prompt for VS 2019 or equivalent for 32-bit builds
use x64 Native Tools Command Prompt for VS 2019 or equivalent for 64-bit builds
Enter the cloned source tree, create a temporary build folder, and generate the solution and project files in it:
cd clickhouse-odbc
mkdir build
cd build
# Configuration options for the project can be specified in the next command in a form of '-Dopt=val'
# Use the following command for 32-bit build only.
cmake -A Win32 -DCMAKE_BUILD_TYPE=RelWithDebInfo ..
# Use the following command for 64-bit build only.
cmake -A x64 -DCMAKE_BUILD_TYPE=RelWithDebInfo ..
…and, optionally, run tests (note, that for non-unit tests, preconfigured driver and DSN entries must exist, that point to the binaries generated in this build folder):
cmake --build . --config RelWithDebInfo --target test
…or open the IDE and build all, package, and test targets manually from there:
cmake --open .
cmake options
The list of configuration options recognized during the CMake generation step is as follows:
| Option | Default value | Description |
| --- | --- | --- |
| CMAKE_BUILD_TYPE | RelWithDebInfo | Build type, one of: Debug, Release, RelWithDebInfo |
| CH_ODBC_ENABLE_SSL | ON | Enable TLS/SSL (required for utilizing https:// interface, etc.) |
| CH_ODBC_ENABLE_INSTALL | ON | Enable install targets (required for packaging) |
| CH_ODBC_ENABLE_TESTING | inherits value of BUILD_TESTING | Enable test targets |
| CH_ODBC_PREFER_BUNDLED_THIRD_PARTIES | ON | Prefer bundled over system variants of third party libraries |
| CH_ODBC_PREFER_BUNDLED_POCO | inherits value of CH_ODBC_PREFER_BUNDLED_THIRD_PARTIES | Prefer bundled over system variants of Poco library |
| CH_ODBC_PREFER_BUNDLED_SSL | inherits value of CH_ODBC_PREFER_BUNDLED_POCO | Prefer bundled over system variants of TLS/SSL library |
| CH_ODBC_PREFER_BUNDLED_GOOGLETEST | inherits value of CH_ODBC_PREFER_BUNDLED_THIRD_PARTIES | Prefer bundled over system variants of Google Test library |
| CH_ODBC_PREFER_BUNDLED_NANODBC | inherits value of CH_ODBC_PREFER_BUNDLED_THIRD_PARTIES | Prefer bundled over system variants of nanodbc library |
| CH_ODBC_RUNTIME_LINK_STATIC | OFF | Link with compiler and language runtime statically |
| CH_ODBC_THIRD_PARTY_LINK_STATIC | ON | Link with third party libraries statically |
| CH_ODBC_DEFAULT_DSN_ANSI | ClickHouse DSN (ANSI) | Default ANSI DSN name |
| CH_ODBC_DEFAULT_DSN_UNICODE | ClickHouse DSN (Unicode) | Default Unicode DSN name |
| TEST_DSN | inherits value of CH_ODBC_DEFAULT_DSN_ANSI | ANSI DSN name to use in tests |
| TEST_DSN_W | inherits value of CH_ODBC_DEFAULT_DSN_UNICODE | Unicode DSN name to use in tests |
Packaging / redistributing the driver
You can just copy the library to another computer. In that case you need to:
install run-time dependencies on the target computer
Windows:
MDAC driver manager (preinstalled on all modern Windows systems)
C++ Redistributable for Visual Studio 2017 or same for 2019, etc.
register the driver so that the corresponding ODBC provider is able to locate it.
All this involves modifying dedicated registry keys in the case of MDAC, or editing the odbcinst.ini (for driver registration) and odbc.ini (for DSN definition) files for UnixODBC or iODBC, directly or indirectly.
This will be done automatically using some default values if you are installing the driver using native installers.
Otherwise, if you are configuring manually, or need to modify the default configuration created by the installer, please see the exact locations of files (or registry keys) that need to be modified.
4.5 - ClickHouse® + Spark
jdbc
The trivial & natural way to talk to ClickHouse from Spark is using jdbc. There are 2 jdbc drivers:
ClickHouse can produce / consume data from/to Kafka to exchange data with Spark.
via hdfs
You can load data into hadoop/hdfs using a sequence of statements like INSERT INTO FUNCTION hdfs(...) SELECT ... FROM clickhouse_table,
later process the data from hdfs with Spark, and do the same in the reverse direction.
via s3
Similar to above but using s3.
via shell calls
You can call other commands from Spark. Those commands can be clickhouse-client and/or clickhouse-local.
do you really need Spark? :)
In many cases you can do everything inside ClickHouse without Spark help :)
Arrays, higher-order functions, machine learning, and integration with a lot of different things, including the possibility to run external code using executable dictionaries or UDFs.
More info + some unordered links (mostly in Chinese / Russian)
HDFS+ClickHouse+Spark: A lightweight big data analysis system from 0 to 1. (Chinese: HDFS+ClickHouse+Spark:从0到1实现一款轻量级大数据分析系统) https://juejin.cn/post/6850418114962653198
Article is based on feedback provided by one of Altinity clients.
CatBoost:
It uses gradient boosting - a hard to use technique which can outperform neural networks. Gradient boosting is powerful but it’s easy to shoot yourself in the foot using it.
The documentation on how to use it is quite lacking. The only good source of information on how to properly configure a model to yield good results is this video: https://www.youtube.com/watch?v=usdEWSDisS0. We had to dig around GitHub issues to find out how to make it work with ClickHouse®.
CatBoost is fast. Other libraries will take ~5X to ~10X as long to do what CatBoost does.
CatBoost will do preprocessing out of the box (fills nulls, apply standard scaling, encodes strings as numbers).
CatBoost has all functions you’d need (metrics, plotters, feature importance)
It makes sense to split what CatBoost does into 2 parts:
preprocessing (fills nulls, apply standard scaling, encodes strings as numbers)
number crunching (convert preprocessed numbers to another number - ex: revenue of impression)
Compared to Fast.ai, CatBoost pre-processing is as simple to use and produces results that can be as good as Fast.ai.
The number crunching part of Fast.ai is no-config. For CatBoost you need to configure it, a lot.
CatBoost won’t simplify or hide any complexity of the process. So you need to know data science terms and what it does (ex: if your model is underfitting you can use a smaller l2_reg parameter in the model constructor).
In the end both Fast.ai and CatBoost can yield comparable results.
Regarding deploying models, CatBoost is really good. The model runs fast, it has a simple binary format which can be loaded in ClickHouse, C, or Python and it will encapsulate pre-processing with the binary file. Deploying Fast.ai models at scale/speed is impossible out of the box (we have our custom solution to do it which is not simple).
TLDR: CatBoost is fast, produces awesome models, is super easy to deploy and it’s easy to use/train (after becoming familiar with it despite the bad documentation & if you know data science terms).
Regarding MindsDB
The project seems to be a good idea but it’s too young. I was using the GUI version and I’ve encountered some bugs, and none of those bugs have a good error message.
It won’t show data in preview.
The “download” button won’t work.
It’s trying to create and drop tables in ClickHouse without me asking it to.
Other than bugs:
It will only use 1 core to do everything (training, analysis, download).
Analysis will only run with a very small subset of data, if I use something like 1M rows it never finishes.
Training a model on 100k rows took 25 minutes - (CatBoost takes 90s to train with 1M rows)
The model trained on MindsDB is way worse. It had r-squared of 0.46 (CatBoost=0.58)
To me it seems that they are a plugin which connects ClickHouse to MySQL to run the model in Pytorch.
It’s too complex and hard to debug and understand. The resulting model is not good enough.
TLDR: Easy to use (if bugs are ignored), too slow to train & produces a bad model.
4.8 - Google S3 (GCS)
GCS with the table function - seems to work correctly for simple scenarios.
This bucket must be set as part of the default project for the account. This configuration can be found in settings -> interoperability.
Generate an HMAC key for the account. This can be done in settings -> interoperability, in the section for user account access keys.
In ClickHouse®, replace the S3 bucket endpoint with the GCS bucket endpoint. This must be done with the path-style GCS endpoint: https://storage.googleapis.com/BUCKET_NAME/OBJECT_NAME.
Replace the aws access key id and aws secret access key with the corresponding parts of the HMAC key.
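As a rough illustration of the last two steps, the sketch below builds the path-style GCS URL and places the HMAC key parts where the AWS credentials would normally go in an s3() table function call. The helper names, the bucket, the object path and the credentials are all hypothetical placeholders, and the string formatting does no escaping, so it is not meant for untrusted input.

```python
def gcs_path_style_url(bucket_name, object_name):
    """Path-style GCS endpoint: https://storage.googleapis.com/BUCKET_NAME/OBJECT_NAME."""
    return f"https://storage.googleapis.com/{bucket_name}/{object_name}"


def s3_select_sql(url, hmac_access_id, hmac_secret, data_format):
    """SELECT through the s3() table function, using the HMAC key as credentials."""
    return (
        f"SELECT * FROM s3('{url}', "
        f"'{hmac_access_id}', '{hmac_secret}', '{data_format}')"
    )


url = gcs_path_style_url("my-bucket", "exports/data.csv")
print(s3_select_sql(url, "GOOG1EXAMPLEID", "example-secret", "CSVWithNames"))
```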
4.9 - Kafka engine
librdkafka changelog
This changelog tracks the librdkafka version bundled with ClickHouse and notable related fixes.
<yandex>
    <kafka>
        <security_protocol>sasl_ssl</security_protocol>
        <!-- Depending on your broker config you may need to uncomment below sasl_mechanism -->
        <!-- <sasl_mechanism>SCRAM-SHA-512</sasl_mechanism> -->
        <sasl_username>root</sasl_username>
        <sasl_password>toor</sasl_password>
    </kafka>
</yandex>
<yandex>
    <kafka>
        <security_protocol>sasl_ssl</security_protocol>
        <sasl_mechanism>SCRAM-SHA-512</sasl_mechanism>
        <sasl_username>root</sasl_username>
        <sasl_password>toor</sasl_password>
        <!-- fullchain cert here -->
        <ssl_ca_location>/path/to/cert/fullchain.pem</ssl_ca_location>
    </kafka>
</yandex>
Inline Kafka certs
To connect to some Kafka cloud services you may need to use certificates.
If needed they can be converted to pem format and inlined into ClickHouse® config.xml
Example:
<yandex>
    <kafka>
        <auto_offset_reset>smallest</auto_offset_reset>
        <security_protocol>SASL_SSL</security_protocol>
        <!-- older broker versions may need this below, for newer versions ignore -->
        <!-- <ssl_endpoint_identification_algorithm>https</ssl_endpoint_identification_algorithm> -->
        <sasl_mechanism>PLAIN</sasl_mechanism>
        <sasl_username>username</sasl_username>
        <sasl_password>password</sasl_password>
        <!-- Same as above here ignore if newer broker version -->
        <!-- <ssl_ca_location>probe</ssl_ca_location> -->
    </kafka>
</yandex>
Some random example using SSL certificates to authenticate:
<yandex>
    <kafka>
        <max_poll_interval_ms>60000</max_poll_interval_ms>
        <session_timeout_ms>60000</session_timeout_ms>
        <heartbeat_interval_ms>10000</heartbeat_interval_ms>
        <reconnect_backoff_ms>5000</reconnect_backoff_ms>
        <reconnect_backoff_max_ms>60000</reconnect_backoff_max_ms>
        <request_timeout_ms>20000</request_timeout_ms>
        <retry_backoff_ms>500</retry_backoff_ms>
        <message_max_bytes>20971520</message_max_bytes>
        <debug>all</debug><!-- only to get the errors -->
        <security_protocol>SSL</security_protocol>
        <ssl_ca_location>/etc/clickhouse-server/ssl/kafka-ca-qa.crt</ssl_ca_location>
        <ssl_certificate_location>/etc/clickhouse-server/ssl/client_clickhouse_client.pem</ssl_certificate_location>
        <ssl_key_location>/etc/clickhouse-server/ssl/client_clickhouse_client.key</ssl_key_location>
        <ssl_key_password>pass</ssl_key_password>
    </kafka>
</yandex>
Authentication / connectivity
Sometimes the consumer group needs to be explicitly allowed in the broker UI config.
Use general Kafka/librdkafka settings from this page first, then apply provider-specific options from Config by provider.
If neither the row limit (kafka_max_block_size, 1048576) nor the time limit (kafka_flush_interval_ms, 7500 ms) has been reached, continue polling (go to step 1).
Write the collected block of data to the MV.
Commit (commit after write = at-least-once).
On any error during that process, the Kafka client is restarted (leading to rebalancing: it leaves the group and rejoins within a few seconds).
kafka_commit_every_batch = 1 changes the loop logic described above: each consumed batch is committed to Kafka first, and only after that is the block of rows sent to the Materialized Views. This resembles an at-most-once delivery mode: it prevents duplicates but allows data loss in case of failures.
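The effect of the commit order can be seen in a toy simulation. The sketch below is not how the Kafka engine is implemented: `FakeTopic` and `run` are hypothetical helpers, the flush interval is ignored, and a failure is injected between the two steps (write and commit) of one block to show what a restart from the committed offset leads to.

```python
class FakeTopic:
    """In-memory stand-in for a single Kafka partition (hypothetical helper)."""

    def __init__(self, messages):
        self.messages = list(messages)
        self.position = 0   # next message to poll
        self.committed = 0  # offset stored on the "broker"

    def poll(self, max_rows):
        batch = self.messages[self.position:self.position + max_rows]
        self.position += len(batch)
        return batch

    def commit(self):
        self.committed = self.position

    def restart(self):
        # After an error the client resumes from the committed offset.
        self.position = self.committed


def run(messages, block_size, commit_before_write, crash_on_block=None):
    topic, destination, block_no = FakeTopic(messages), [], 0
    while topic.position < len(topic.messages):
        block = topic.poll(block_size)
        block_no += 1
        steps = [topic.commit, lambda: destination.extend(block)]
        if not commit_before_write:
            steps.reverse()  # default order: write to MV first, then commit
        steps[0]()
        if block_no == crash_on_block:
            topic.restart()  # simulated failure between the two steps
            continue
        steps[1]()
    return destination


print(run(range(1, 7), 2, commit_before_write=False, crash_on_block=2))
print(run(range(1, 7), 2, commit_before_write=True, crash_on_block=2))
```

With write-then-commit the failed block is delivered twice (at-least-once); with commit-then-write it is skipped (at-most-once).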
4.9.1.5 - SELECTs from engine=Kafka
Question
What will happen if we run a SELECT query on a working Kafka table with an MV attached? Would the data shown in the SELECT query appear later in the MV destination table?
Answer
Most likely SELECT query would show nothing.
If you are lucky enough and something does show up, those rows won’t appear in the MV destination table.
So it’s not recommended to run SELECT queries on working Kafka tables.
For debugging, it’s possible to use another Kafka table with a different consumer_group, so it won’t affect your main pipeline.
4.9.2 - Consumption Patterns
Message consumption models, replay patterns, and delivery semantics.
4.9.2.1 - Exactly once semantics
EOS consumer (isolation.level=read_committed) is enabled by default since librdkafka 1.2.0, so for ClickHouse® - since 20.2
BUT: while EOS semantics guarantee that no duplicates will happen on the Kafka side (i.e. even if you produce the same messages a few times, they will be consumed once), ClickHouse as a Kafka client can currently guarantee only at-least-once delivery. In some corner cases (lost connection, etc.) you can get duplicates.
We need to have something like transactions on ClickHouse side to be able to avoid that. Adding something like simple transactions is in plans for Y2022.
block-aggregator by eBay
Block Aggregator is a data loader that subscribes to Kafka topics, aggregates the Kafka messages into blocks that follow the ClickHouse’s table schemas, and then inserts the blocks into ClickHouse. Block Aggregator provides exactly-once delivery guarantee to load data from Kafka to ClickHouse. Block Aggregator utilizes Kafka’s metadata to keep track of blocks that are intended to send to ClickHouse, and later uses this metadata information to deterministically re-produce ClickHouse blocks for re-tries in case of failures. The identical blocks are guaranteed to be deduplicated by ClickHouse.
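The retry idea described above can be sketched in a few lines of Python. This is an illustrative model only, not Block Aggregator's actual code: build_block, block_id and DedupTable are hypothetical names, the recorded offset ranges stand in for the metadata kept in Kafka, and the hash-based check stands in for ClickHouse deduplicating identical blocks.

```python
import hashlib


def build_block(messages, start, end):
    """Rebuild a block deterministically from a recorded offset range [start, end)."""
    return tuple(messages[start:end])


def block_id(block):
    """Identical blocks always get the same identifier."""
    return hashlib.sha256(repr(block).encode()).hexdigest()


class DedupTable:
    """Hypothetical stand-in for a table that drops blocks it has already seen."""

    def __init__(self):
        self.rows = []
        self._seen = set()

    def insert(self, block):
        bid = block_id(block)
        if bid in self._seen:
            return False  # duplicate block, ignored
        self._seen.add(bid)
        self.rows.extend(block)
        return True


messages = ["a", "b", "c", "d"]
recorded_ranges = [(0, 2), (2, 4)]  # assumed metadata describing each block

table = DedupTable()
first = table.insert(build_block(messages, *recorded_ranges[0]))
# Pretend the acknowledgement was lost: the same range is rebuilt and retried.
retry = table.insert(build_block(messages, *recorded_ranges[0]))
second = table.insert(build_block(messages, *recorded_ranges[1]))
```

Because the retried block is rebuilt from the same offset range, it is byte-for-byte identical to the first attempt and is dropped instead of being inserted twice.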
For very large topics when you need more parallelism (especially on the insert side) you may use several tables with the same pipeline (pre ClickHouse® 20.9) or enable kafka_thread_per_consumer (after 20.9).
kafka_num_consumers=N,kafka_thread_per_consumer=1
Notes:
the inserts will happen in parallel (without that setting inserts happen linearly)
enough partitions are needed.
kafka_num_consumers is limited by number of physical cores (half of vCPUs). kafka_disable_num_consumers_limit can be used to override the limit.
background_message_broker_schedule_pool_size is 16 by default; you may need to increase it if you use more than 16 consumers.
Increasing kafka_num_consumers while keeping kafka_thread_per_consumer=0 may improve consumption and parsing speed, but flushing and committing are still done by a single thread (so inserts are linear).
4.9.2.3 - Multiple MVs attached to Kafka table
How Multiple MVs attached to Kafka table consume and how they are affected by kafka_num_consumers/kafka_thread_per_consumer
A Kafka consumer is a thread inside the Kafka engine table. It is visible to Kafka monitoring tools such as kafka-consumer-groups and, in ClickHouse, in the system.kafka_consumers table.
Having multiple consumers increases ingestion parallelism and can significantly speed up event processing. However, it comes with a trade-off: it is a CPU-intensive task, especially under high event load and/or complicated parsing of incoming data. Therefore, it is crucial to create only as many consumers as you really need and to ensure you have enough CPU cores to handle them. We don't recommend creating too many Kafka engine tables per server, because the excessive parallelism of the ingestion process can lead to uncontrolled CPU usage in situations like a bulk data upload or catching up on a large Kafka lag.
kafka_thread_per_consumer meaning
Consider a basic pipeline depicted as a Kafka table with 2 MVs attached. The Kafka broker has 2 topics and 4 partitions.
kafka_thread_per_consumer = 0
The Kafka engine table will act as 2 consumers, but with only 1 insert thread for both of them. Note that the topic needs at least as many partitions as consumers. For this scenario, we use these settings:
kafka_num_consumers=2,kafka_thread_per_consumer=0
The same Kafka engine will create 2 streams, 1 for each consumer, and join them in a union stream. It will use 1 thread for inserting (thread [ 2385 ] in the log line below).
This is how we can see it in the logs:
2022.11.09 17:49:34.282077 [ 2385 ] {} <Debug> StorageKafka (kafka_table): Started streaming to 2 attached views
How ClickHouse® calculates the number of threads depending on the thread_per_consumer setting:
With this approach, even if the number of consumers increased, the Kafka engine will still use only 1 thread to flush. The consuming/processing rate will probably increase a bit, but not linearly. For example, 5 consumers will not consume 5 times faster. Also, a good property of this approach is the linearization of INSERTS, which means that the order of the inserts is preserved and sequential. This option is good for small/medium Kafka topics.
kafka_thread_per_consumer = 1
The Kafka engine table will act as 2 consumers with 1 thread per consumer. For this scenario, we use these settings:
kafka_num_consumers=2,kafka_thread_per_consumer=1
With this approach, the number of consumers remains the same, but each consumer will use their own insert/flush thread, and the consuming/processing rate should increase.
Background Pool
In ClickHouse there is a special thread pool for background processes such as streaming engines. Its size is controlled by the background_message_broker_schedule_pool_size setting and is 16 by default. If you exceed this limit across all tables on the server, you will likely encounter continuous Kafka rebalances, which slow down processing considerably. On a server with many CPU cores, you can raise the limit to a higher value, such as 20 or even 40. For example, background_message_broker_schedule_pool_size = 20 allows you to create 5 Kafka engine tables with 4 consumers each, where every consumer has its own insert thread. This option is good for large Kafka topics with millions of messages per second.
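The arithmetic behind that example can be checked with a small Python sketch. It assumes, as described in this section, that a Kafka table needs one pool thread per consumer when kafka_thread_per_consumer is enabled and a single thread otherwise; threads_for_table and pool_demand are hypothetical helper names, not ClickHouse functions.

```python
def threads_for_table(num_consumers, thread_per_consumer):
    """Pool threads one Kafka table is assumed to need."""
    if thread_per_consumer and num_consumers > 0:
        return num_consumers  # one insert/flush thread per consumer
    return 1                  # all consumers share a single flush thread


def pool_demand(tables):
    """Total demand across all (num_consumers, thread_per_consumer) pairs."""
    return sum(threads_for_table(n, per_consumer) for n, per_consumer in tables)


# 5 Kafka tables, each with 4 consumers and kafka_thread_per_consumer = 1
tables = [(4, True)] * 5
demand = pool_demand(tables)

fits_default_pool = demand <= 16  # default background_message_broker_schedule_pool_size
fits_pool_of_20 = demand <= 20
```

Here the demand is 20 threads, which exceeds the default pool of 16 but fits once the setting is raised to 20.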
Multiple Materialized Views
Attaching multiple Materialized Views (MVs) to a Kafka Engine table can be used when you need to apply different transformations to the same topic and store the resulting data in different tables.
(This approach also applies to the other streaming engines: RabbitMQ, S3Queue, etc.)
All streaming engines begin processing data (reading from the source and producing insert blocks) only after at least one Materialized View is attached to the engine. Multiple Materialized Views can be connected to distribute data across various tables with different transformations. But how does it work when the server starts?
Once the first Materialized View (MV) is loaded, started, and attached to the Kafka/S3Queue table, data consumption begins immediately: data is read from the source, pushed to the destination, and the pointers advance to the next position. However, any other MVs that haven't started yet will miss the data consumed by the first MV, leading to some data loss.
This issue worsens with asynchronous table loading. Tables are only loaded upon first access, and the loading process takes time. When multiple MVs direct the data stream to different tables, some tables might be ready sooner than others. As soon as the first table becomes ready, data consumption starts, and any tables still loading will miss the data consumed during that interval, resulting in further data loss for those tables.
This means that in a design with multiple MVs, async_load_databases should be switched off in the server configuration.
You also have to prevent consumption from starting until all MVs are loaded and started. To do that, add an intermediate Null table to the pipeline: the Kafka table passes each block through a single MV to the Null table first, and the remaining MVs then read from the Null table and apply their own transformations into their destination tables.
When a consumer joins the consumer group, the broker checks whether the group has a committed offset. If it does, the consumer resumes from that last committed offset. Both the ClickHouse and librdkafka documentation state that the default value for auto_offset_reset is largest (or latest in newer Kafka versions), but that is not what happens when the consumer group is new:
conf.set("auto.offset.reset", "earliest"); // If no offset stored for this group, read all messages from the start
If no offset is stored for that particular consumer group, or the stored offset is out of range, the consumer starts consuming from the beginning (earliest). If a valid offset is stored, the consumer continues from it.
The log retention policy influences which offset values correspond to the earliest and latest configurations. Consider a scenario where a topic has a retention policy set to 1 hour. Initially, you produce 5 messages, and then, after an hour, you publish 5 more messages. In this case, the latest offset will remain unchanged from the previous example. However, due to Kafka removing the earlier messages, the earliest available offset will not be 0; instead, it will be 5.
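The retention scenario and the offset-selection rule can be put together in a short Python model. This is a simplified sketch rather than broker or librdkafka code: PartitionLog and starting_offset are hypothetical names, and "latest" is modeled as the next offset to be written.

```python
from dataclasses import dataclass


@dataclass
class PartitionLog:
    """Hypothetical model of one partition: offsets in [log_start, log_end) are retained."""
    log_start: int = 0
    log_end: int = 0

    def produce(self, count):
        self.log_end += count

    def expire_all(self):
        # Retention removes every message written so far.
        self.log_start = self.log_end


def starting_offset(committed, log, auto_offset_reset="earliest"):
    """Use the committed offset if it is still valid, otherwise apply the reset policy."""
    if committed is not None and log.log_start <= committed <= log.log_end:
        return committed
    return log.log_start if auto_offset_reset == "earliest" else log.log_end


log = PartitionLog()
log.produce(5)    # offsets 0..4
log.expire_all()  # an hour passes, retention drops them
log.produce(5)    # offsets 5..9

new_group = starting_offset(None, log)      # no stored offset: earliest
stale_group = starting_offset(2, log)       # stored offset fell out of range
active_group = starting_offset(7, log)      # stored offset is still valid
```

In this model a new consumer group starts at offset 5, not 0, because the first five messages are no longer retained.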
4.9.3 - Schema and Formats
Schema inference and format-specific integration details.
4.9.3.1 - Inferring Schema from AvroConfluent Messages in Kafka for ClickHouse®
Learn how to define Kafka table structures in ClickHouse® by using Avro’s schema registry & sample message.
To consume messages from Kafka within ClickHouse®, you need to define the ENGINE=Kafka table structure with all the column names and types.
This task can be particularly challenging when dealing with complex Avro messages, as manually determining the exact schema for
ClickHouse is both tricky and time-consuming. This complexity is particularly frustrating in the case of Avro formats,
where the column names and their types are already clearly defined in the schema registry.
Although ClickHouse supports schema inference for files, it does not natively support this for Kafka streams.
Here’s a workaround to infer the schema using AvroConfluent messages:
Step 1: Capture and Store a Raw Kafka Message
First, create a table in ClickHouse to consume a raw message from Kafka and store it as a file:
CREATE TABLE test_kafka (raw String) ENGINE = Kafka
SETTINGS kafka_broker_list = 'localhost:29092',
         kafka_topic_list = 'movies-raw',
         kafka_format = 'RawBLOB', -- Don't try to parse the message, return it 'as is'
         kafka_group_name = 'tmp_test'; -- Using some dummy consumer group here.

INSERT INTO FUNCTION file('./avro_raw_sample.avro', 'RawBLOB')
SELECT * FROM test_kafka LIMIT 1
SETTINGS max_block_size = 1, stream_like_engine_allow_direct_select = 1;

DROP TABLE test_kafka;
Step 2: Infer Schema Using the Stored File
Using the stored raw message, let ClickHouse infer the schema based on the AvroConfluent format and a specified schema registry URL:
This approach reduces manual schema definition efforts and enhances data integration workflows by utilizing the schema inference capabilities of ClickHouse for AvroConfluent messages.
Appendix
Avro is a binary serialization format used within Apache Kafka for efficiently serializing data with a compact binary format. It relies on schemas, which define the structure of the serialized data, to ensure robust data compatibility and type safety.
Schema Registry is a service that provides a centralized repository for Avro schemas. It helps manage and enforce schemas across applications, ensuring that the data exchanged between producers and consumers adheres to a predefined format, and facilitates schema evolution in a safe manner.
In ClickHouse, the Avro format is used for data that contains the schema embedded directly within the file or message. This means the structure of the data is defined and included with the data itself, allowing for self-describing messages. However, embedding the schema within every message is not optimal for streaming large volumes of data, as it increases the workload and network overhead. Repeatedly passing the same schema with each message can be inefficient, particularly in high-throughput environments.
On the other hand, the AvroConfluent format in ClickHouse is specifically designed to work with the Confluent Schema Registry. This format expects the schema to be managed externally in a schema registry rather than being embedded within each message. It retrieves schema information from the Schema Registry, which allows for centralized schema management and versioning, facilitating easier schema evolution and enforcement across different applications using Kafka.
4.9.4 - Operations and Troubleshooting
Runtime tuning, resource settings, and error diagnostics.
4.9.4.1 - Setting the background message broker schedule pool size
Guide to managing the background_message_broker_schedule_pool_size setting for Kafka, RabbitMQ, and NATS table engines in your database.
Overview
When using Kafka, RabbitMQ, or NATS table engines in ClickHouse®, you may encounter issues related to a saturated background thread pool. One common symptom is a warning similar to the following:
2025.03.14 08:44:26.725868 [ 344 ] {} <Warning> StorageKafka (events_kafka): [rdk:MAXPOLL] [thrd:main]: Application maximum poll interval (60000ms) exceeded by 159ms (adjust max.poll.interval.ms for long-running message processing): leaving group
This warning typically appears not because ClickHouse fails to poll, but because there are no available threads in the background pool to handle the polling in time. In rare cases, the same error might also be caused by long flushing operations to Materialized Views (MVs), especially if their logic is complex or chained.
To resolve this, you should monitor and, if needed, increase the value of the background_message_broker_schedule_pool_size setting.
Step 1: Check Thread Pool Utilization
Run the following SQL query to inspect the current status of your background message broker thread pool:
If free_threads is close to zero or negative, your thread pool is saturated and its size should be increased.
Step 2: Estimate Required Pool Size
To estimate a reasonable value for background_message_broker_schedule_pool_size, run the following query:
WITH
    toUInt32OrDefault(extract(engine_full, 'kafka_num_consumers\s*=\s*(\d+)')) as kafka_num_consumers,
    extract(engine_full, 'kafka_thread_per_consumer\s*=\s*(\d+|\'true\')') not in ('', '0') as kafka_thread_per_consumer,
    multiIf(
        engine = 'Kafka', if(kafka_thread_per_consumer AND kafka_num_consumers > 0, kafka_num_consumers, 1),
        engine = 'RabbitMQ', 3,
        engine = 'NATS', 3,
        0 /* should not happen */
    ) as threads_needed
SELECT ceil(sum(threads_needed) * 1.25)
FROM system.tables
WHERE engine in ('Kafka', 'RabbitMQ', 'NATS')
This will return an estimate that includes a 25% buffer to accommodate spikes in load.
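To make the logic of the estimate easier to follow, here is the same calculation as a Python sketch that can be run offline. It mirrors the query above under the assumption that you have exported (engine, engine_full) pairs from system.tables; threads_needed and estimate_pool_size are hypothetical helper names, and the sample engine_full strings are made up for illustration.

```python
import math
import re

NUM_CONSUMERS_RE = re.compile(r"kafka_num_consumers\s*=\s*(\d+)")
THREAD_PER_CONSUMER_RE = re.compile(r"kafka_thread_per_consumer\s*=\s*(\d+|'true')")


def threads_needed(engine, engine_full):
    """Pool threads one table is assumed to need, following the query above."""
    if engine in ("RabbitMQ", "NATS"):
        return 3
    if engine != "Kafka":
        return 0  # should not happen
    num_match = NUM_CONSUMERS_RE.search(engine_full)
    num_consumers = int(num_match.group(1)) if num_match else 0
    tpc_match = THREAD_PER_CONSUMER_RE.search(engine_full)
    thread_per_consumer = tpc_match is not None and tpc_match.group(1) != "0"
    if thread_per_consumer and num_consumers > 0:
        return num_consumers
    return 1


def estimate_pool_size(tables):
    """Sum the per-table demand and add a 25% buffer, rounded up."""
    return math.ceil(sum(threads_needed(e, f) for e, f in tables) * 1.25)


# Made-up sample rows standing in for system.tables output
sample_tables = [
    ("Kafka", "Kafka SETTINGS kafka_num_consumers = 4, kafka_thread_per_consumer = 1"),
    ("Kafka", "Kafka SETTINGS kafka_num_consumers = 4"),
    ("RabbitMQ", "RabbitMQ SETTINGS rabbitmq_format = 'JSONEachRow'"),
]
estimate = estimate_pool_size(sample_tables)
```

For the sample rows the per-table demand is 4, 1 and 3 threads, so the estimate with the buffer is 10.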
Step 3: Apply the New Setting
Create or update the following configuration file:
After applying the configuration, restart ClickHouse to apply the changes:
sudo systemctl restart clickhouse-server
Summary
A saturated background message broker thread pool can lead to missed Kafka polls and consumer group dropouts. Monitoring your metrics and adjusting background_message_broker_schedule_pool_size accordingly ensures stable operation of Kafka, RabbitMQ, and NATS integrations.
If the problem persists even after increasing the pool size, consider investigating slow MV chains or flushing logic as a potential bottleneck.
4.9.4.2 - Error handling
Pre 21.6
There are a couple of options:
Certain formats that have a schema built into them (like JSONEachRow) can silently skip any unexpected fields once the input_format_skip_unknown_fields setting is enabled.
It is also possible to skip up to N malformed messages per block with the kafka_skip_broken_messages setting, but it does not support all formats either.
After 21.6
It is possible to stream messages that could not be parsed. This behavior is enabled with the setting kafka_handle_error_mode='stream'; ClickHouse® will then write the error and the raw Kafka message to two new virtual columns: _error and _raw_message.
So you can create another Materialized View that collects all parsing errors into a separate table, together with important information such as the offset and content of the message.
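The effect of the stream mode can be modeled with a short Python sketch. It is an illustration of the idea only, not of ClickHouse internals: parse_stream_mode is a hypothetical function, JSON parsing stands in for any input format, and the _offset field stands in for the message offset mentioned above.

```python
import json


def parse_stream_mode(raw_messages):
    """Parse each message; on failure keep the error and raw payload instead of raising."""
    rows = []
    for offset, raw in enumerate(raw_messages):
        try:
            value = json.loads(raw)
            rows.append({"value": value, "_error": "", "_raw_message": "", "_offset": offset})
        except json.JSONDecodeError as exc:
            rows.append({"value": None, "_error": str(exc), "_raw_message": raw, "_offset": offset})
    return rows


messages = ['{"id": 1}', 'not json', '{"id": 2}']
rows = parse_stream_mode(messages)

# One consumer of the stream keeps the good rows, another collects the errors,
# much like a main MV and a separate error-collecting MV.
good_rows = [r["value"] for r in rows if not r["_error"]]
error_rows = [r for r in rows if r["_error"]]
```

The broken message does not stop the stream: it ends up in error_rows with its original payload and position, while the valid messages continue to the main destination.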
Allow to save unparsed records and errors in RabbitMQ, NATS and FileLog engines. Add virtual columns _error and _raw_message (for NATS and RabbitMQ), _raw_record (for FileLog) that are filled when ClickHouse fails to parse a new record.
The behaviour is controlled by the storage settings nats_handle_error_mode for NATS, rabbitmq_handle_error_mode for RabbitMQ, and handle_error_mode for FileLog, similar to kafka_handle_error_mode.
If it’s set to default, an exception will be thrown when ClickHouse fails to parse a record; if it’s set to stream, the error and raw record will be saved into virtual columns.
Closes #36035 and #55477.
CREATE TABLE IF NOT EXISTS rabbitmq.broker_errors_queue
(
    exchange_name String,
    channel_id String,
    delivery_tag UInt64,
    redelivered UInt8,
    message_id String,
    timestamp UInt64
)
engine = RabbitMQ
SETTINGS
    rabbitmq_host_port = 'localhost:5672',
    rabbitmq_exchange_name = 'exchange-test', -- required parameter even though this is done via the rabbitmq config
    rabbitmq_queue_consume = true,
    rabbitmq_queue_base = 'test-errors',
    rabbitmq_format = 'JSONEachRow',