1 - CollapsingMergeTree vs ReplacingMergeTree


ReplacingMergeTree:

  + very easy to use (always replace)
  + you don't need to store the previous state of the row
  - no deletes
  - without FINAL you can always see duplicates, so you always have to 'pay' the FINAL performance penalty
  - only uniq()-alike things can be calculated in materialized views

CollapsingMergeTree:

  - more complex (accounting-alike: put 'rollback' records to fix something)
  - you need to store (somewhere) the previous state of the row, OR extract it from the table itself (point queries are not nice for ClickHouse)
  + supports deletes
  + a properly crafted query can give correct results without FINAL (i.e. sum(amount * sign) will be correct whether you have duplicates or not)
  + you can do basic counts & sums in materialized views
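
To illustrate the last two points, here is a minimal sketch (table and column names are illustrative) of how a CollapsingMergeTree query stays correct without FINAL:

CREATE TABLE balance_changes
(
    account_id UInt64,
    amount     Int64,
    sign       Int8
)
ENGINE = CollapsingMergeTree(sign)
ORDER BY account_id;

-- the 'rollback' row (sign = -1) cancels the old state, the new row (sign = 1) replaces it
INSERT INTO balance_changes VALUES (1, 100, 1);
INSERT INTO balance_changes VALUES (1, 100, -1), (1, 150, 1);

-- correct whether the rows are already collapsed or not, no FINAL needed
SELECT account_id, sum(amount * sign) AS balance
FROM balance_changes
GROUP BY account_id
HAVING sum(sign) > 0;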

2 - Part names & MVCC

Part names & multiversion concurrency control.


Part name format is:

<partition_id>_<min_block_number>_<max_block_number>_<level>_<data_version>

system.parts contains all the information parsed.

partition_id is quite simple (it just comes from your partitioning key).

What are block_numbers?

DROP TABLE IF EXISTS part_names;
create table part_names (date Date, n UInt8, m UInt8) engine=MergeTree PARTITION BY toYYYYMM(date) ORDER BY n;

insert into part_names VALUES (now(), 0, 0);
select name, partition_id, min_block_number, max_block_number, level, data_version from system.parts where table = 'part_names' and active;
┌─name─────────┬─partition_id─┬─min_block_number─┬─max_block_number─┬─level─┬─data_version─┐
│ 202203_1_1_0 │ 202203       │                1 │                1 │     0 │            1 │
└──────────────┴──────────────┴──────────────────┴──────────────────┴───────┴──────────────┘

insert into part_names VALUES (now(), 0, 0);
select name, partition_id, min_block_number, max_block_number, level, data_version from system.parts where table = 'part_names' and active;
┌─name─────────┬─partition_id─┬─min_block_number─┬─max_block_number─┬─level─┬─data_version─┐
│ 202203_1_1_0 │ 202203       │                1 │                1 │     0 │            1 │
│ 202203_2_2_0 │ 202203       │                2 │                2 │     0 │            2 │
└──────────────┴──────────────┴──────────────────┴──────────────────┴───────┴──────────────┘

insert into part_names VALUES (now(), 0, 0);
select name, partition_id, min_block_number, max_block_number, level, data_version from system.parts where table = 'part_names' and active;
┌─name─────────┬─partition_id─┬─min_block_number─┬─max_block_number─┬─level─┬─data_version─┐
│ 202203_1_1_0 │ 202203       │                1 │                1 │     0 │            1 │
│ 202203_2_2_0 │ 202203       │                2 │                2 │     0 │            2 │
│ 202203_3_3_0 │ 202203       │                3 │                3 │     0 │            3 │
└──────────────┴──────────────┴──────────────────┴──────────────────┴───────┴──────────────┘

As you can see, every insert creates a new incremental block number, which is written in the part name both as <min_block_number> and <max_block_number> (and the level is 0, meaning the part was never merged).

Block numbering works in the scope of a partition (for Replicated tables) or globally across all partitions (for plain MergeTree tables).

ClickHouse always merges only contiguous blocks, and new part names always refer to the minimum and maximum block numbers of the merged range.

OPTIMIZE TABLE part_names;

┌─name─────────┬─partition_id─┬─min_block_number─┬─max_block_number─┬─level─┬─data_version─┐
│ 202203_1_3_1 │ 202203       │                1 │                3 │     1 │            1 │
└──────────────┴──────────────┴──────────────────┴──────────────────┴───────┴──────────────┘

As you can see here, three parts (with block numbers 1, 2, 3) were merged and formed a new part named 1_3 after its min/max block numbers. The level gets incremented.

Now, even though the previous (merged) parts still exist on the filesystem for a while (as inactive), ClickHouse is smart enough to understand that the new part 'covers' the same range of blocks as the 3 parts of the previous 'generation'.
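
A quick way to observe this (a sketch against the same test table): the inactive parts of the previous 'generation' remain visible in system.parts until they are removed from disk.

select name, active from system.parts where table = 'part_names' order by name;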

There might be a fifth section in the part name, data version.

Data version gets increased when a part mutates.

Every mutation takes one block number:

insert into part_names VALUES (now(), 0, 0);
insert into part_names VALUES (now(), 0, 0);
insert into part_names VALUES (now(), 0, 0);

select name, partition_id, min_block_number, max_block_number, level, data_version from system.parts where table = 'part_names' and active;

┌─name─────────┬─partition_id─┬─min_block_number─┬─max_block_number─┬─level─┬─data_version─┐
│ 202203_1_3_1 │ 202203       │                1 │                3 │     1 │            1 │
│ 202203_4_4_0 │ 202203       │                4 │                4 │     0 │            4 │
│ 202203_5_5_0 │ 202203       │                5 │                5 │     0 │            5 │
│ 202203_6_6_0 │ 202203       │                6 │                6 │     0 │            6 │
└──────────────┴──────────────┴──────────────────┴──────────────────┴───────┴──────────────┘

insert into part_names VALUES (now(), 0, 0);

alter table part_names update m=n where 1;

select name, partition_id, min_block_number, max_block_number, level, data_version from system.parts where table = 'part_names' and active;

┌─name───────────┬─partition_id─┬─min_block_number─┬─max_block_number─┬─level─┬─data_version─┐
│ 202203_1_3_1_7 │ 202203       │                1 │                3 │     1 │            7 │
│ 202203_4_4_0_7 │ 202203       │                4 │                4 │     0 │            7 │
│ 202203_5_5_0_7 │ 202203       │                5 │                5 │     0 │            7 │
│ 202203_6_6_0_7 │ 202203       │                6 │                6 │     0 │            7 │
│ 202203_8_8_0   │ 202203       │                8 │                8 │     0 │            8 │
└────────────────┴──────────────┴──────────────────┴──────────────────┴───────┴──────────────┘

OPTIMIZE TABLE part_names;

select name, partition_id, min_block_number, max_block_number, level, data_version from system.parts where table = 'part_names' and active;
┌─name───────────┬─partition_id─┬─min_block_number─┬─max_block_number─┬─level─┬─data_version─┐
│ 202203_1_8_2_7 │ 202203       │                1 │                8 │     2 │            7 │
└────────────────┴──────────────┴──────────────────┴──────────────────┴───────┴──────────────┘

3 - How to pick an ORDER BY / PRIMARY KEY / PARTITION BY for the MergeTree-family table


A good ORDER BY usually has 3 to 5 columns, from the lowest-cardinality one on the left (and the most important for filtering) to the highest-cardinality one (and the least important for filtering).

A practical approach to create a good ORDER BY for a table:

  1. Pick the columns you always use for filtering.
  2. The column most important for filtering and with the lowest cardinality should be the left-most. Typically it's something like tenant_id.
  3. The next column is higher-cardinality and less important for filtering. It can be a rounded time sometimes, or site_id, or source_id, or group_id, or something similar.
  4. Repeat step 3 once again (or a few times).
  5. If you have already added all the columns important for filtering and your primary key still does not address a single row, you can add more columns which help to put similar records close to each other (to improve compression).
  6. If you have something like a hierarchy / tree-like relation between the columns, order them from 'root' to 'leaves', for example (continent, country, cityname). This way ClickHouse can do a lookup by country / city even if the continent is not specified (it will just 'check all continents'). Special variants of MergeTree may require a special ORDER BY to make the record unique, etc.
  7. For time series it usually makes sense to put the timestamp as the latest column in ORDER BY; it helps to put the same data nearby for better locality. There are only 2 major patterns for timestamps in ORDER BY: (…, toStartOf(Day|Hour|…)(timestamp), …, timestamp) and (…, timestamp). The first one is useful when you often query a small part of a table partition (e.g. the table is partitioned by month and you read only 1-4 days 90% of the time).

Some examples of a good ORDER BY:

ORDER BY (tenantid, site_id, utm_source, clientid, timestamp)
ORDER BY (site_id, toStartOfHour(timestamp), sessionid, timestamp )
PRIMARY KEY (site_id, toStartOfHour(timestamp), sessionid)

For Summing / Aggregating

All dimensions go into ORDER BY, all metrics stay outside of it.

The columns most important for filtering and with the lowest cardinality should be the left-most.

If the number of dimensions is high, it typically makes sense to use a prefix of the ORDER BY as the PRIMARY KEY to avoid polluting the sparse index.

Examples:

ORDER BY (tenant_id, hour, country_code, team_id, group_id, source_id)
PRIMARY KEY (tenant_id, hour, country_code, team_id)

For Replacing / Collapsing

You need to keep all 'mutable' columns outside of the ORDER BY, and have some unique id (a basis to collapse duplicates) inside. Typically the right-most column is some row identifier, and it's often not needed in the sparse index (so the PRIMARY KEY can be a prefix of the ORDER BY). The rest of the considerations are the same.

Examples:

ORDER BY (tenantid, site_id, eventid) --  utm_source is mutable, while tenantid, site_id is not
PRIMARY KEY (tenantid, site_id) -- eventid is not used for filtering, needed only for collapsing duplicates

ORDER BY example

-- col1: high cardinality
-- col2: low cardinality

CREATE TABLE tests.order_test
(    
     `col1` DateTime,    
     `col2` UInt8
)
ENGINE = MergeTree
PARTITION BY toYYYYMM(col1)
ORDER BY (col1, col2)
--
SELECT count() FROM order_test
┌───count()─┐
│ 126371225 │
└───────────┘

So let's put the highest-cardinality column to the left and the lowest-cardinality one to the right in the ORDER BY definition. This will have an impact on queries like:

SELECT * FROM order_test
WHERE col1 > toDateTime('2020-10-01')
ORDER BY col1, col2
FORMAT `Null`

Here, for the filtering, ClickHouse will use the primary (sparse) index to select the parts where col1 > xxx, and the result won't need to be re-sorted, because the ORDER BY in the query aligns with the ORDER BY in the table and the data is already ordered on disk.

executeQuery: (from [::ffff:192.168.11.171]:39428, user: admin) SELECT * FROM order_test WHERE col1 > toDateTime('2020-10-01') ORDER BY col1,col2 FORMAT Null; (stage: Complete)
ContextAccess (admin): Access granted: SELECT(col1, col2) ON tests.order_test
ContextAccess (admin): Access granted: SELECT(col1, col2) ON tests.order_test
InterpreterSelectQuery: FetchColumns -> Complete
tests.order_test (SelectExecutor): Key condition: (column 0 in [1601503201, +Inf))
tests.order_test (SelectExecutor): MinMax index condition: (column 0 in [1601503201, +Inf))
tests.order_test (SelectExecutor): Running binary search on index range for part 202010_367_545_8 (7612 marks)
tests.order_test (SelectExecutor): Running binary search on index range for part 202010_549_729_12 (37 marks)
tests.order_test (SelectExecutor): Running binary search on index range for part 202011_689_719_2 (1403 marks)
tests.order_test (SelectExecutor): Running binary search on index range for part 202012_550_730_12 (3 marks)
tests.order_test (SelectExecutor): Found (LEFT) boundary mark: 0
tests.order_test (SelectExecutor): Found (LEFT) boundary mark: 0
tests.order_test (SelectExecutor): Found (LEFT) boundary mark: 0
tests.order_test (SelectExecutor): Found (RIGHT) boundary mark: 37
tests.order_test (SelectExecutor): Found (RIGHT) boundary mark: 3
tests.order_test (SelectExecutor): Found (RIGHT) boundary mark: 1403
tests.order_test (SelectExecutor): Found continuous range in 11 steps
tests.order_test (SelectExecutor): Found continuous range in 3 steps
tests.order_test (SelectExecutor): Running binary search on index range for part 202011_728_728_0 (84 marks)
tests.order_test (SelectExecutor): Found continuous range in 21 steps
tests.order_test (SelectExecutor): Running binary search on index range for part 202011_725_725_0 (128 marks)
tests.order_test (SelectExecutor): Found (LEFT) boundary mark: 0
tests.order_test (SelectExecutor): Found (LEFT) boundary mark: 0
tests.order_test (SelectExecutor): Found (RIGHT) boundary mark: 84
tests.order_test (SelectExecutor): Running binary search on index range for part 202011_722_722_0 (128 marks)
tests.order_test (SelectExecutor): Found continuous range in 13 steps
tests.order_test (SelectExecutor): Found (RIGHT) boundary mark: 128
tests.order_test (SelectExecutor): Found continuous range in 14 steps
tests.order_test (SelectExecutor): Running binary search on index range for part 202011_370_686_19 (5993 marks)
tests.order_test (SelectExecutor): Found (LEFT) boundary mark: 0
tests.order_test (SelectExecutor): Found (RIGHT) boundary mark: 5993
tests.order_test (SelectExecutor): Found (LEFT) boundary mark: 0
tests.order_test (SelectExecutor): Found continuous range in 25 steps
tests.order_test (SelectExecutor): Found (RIGHT) boundary mark: 128
tests.order_test (SelectExecutor): Found continuous range in 14 steps
tests.order_test (SelectExecutor): Found (LEFT) boundary mark: 0
tests.order_test (SelectExecutor): Found (RIGHT) boundary mark: 7612
tests.order_test (SelectExecutor): Found continuous range in 25 steps
tests.order_test (SelectExecutor): Selected 8/9 parts by partition key, 8 parts by primary key, 15380/15380 marks by primary key, 15380 marks to read from 8 ranges
Ok.

0 rows in set. Elapsed: 0.649 sec. Processed 125.97 million rows, 629.86 MB (194.17 million rows/s., 970.84 MB/s.)

If we change the ORDER BY expression in the query, ClickHouse will need to retrieve the rows and re-sort them:

SELECT * FROM order_test
WHERE col1 > toDateTime('2020-10-01')
ORDER BY col2, col1
FORMAT `Null`

As seen in the MergingSortedTransform message, the ORDER BY in the table definition is not aligned with the ORDER BY in the query, so ClickHouse has to re-sort the result set.

executeQuery: (from [::ffff:192.168.11.171]:39428, user: admin) SELECT * FROM order_test WHERE col1 > toDateTime('2020-10-01') ORDER BY col2,col1 FORMAT Null; (stage: Complete)
ContextAccess (admin): Access granted: SELECT(col1, col2) ON tests.order_test
ContextAccess (admin): Access granted: SELECT(col1, col2) ON tests.order_test
InterpreterSelectQuery: FetchColumns -> Complete
tests.order_test (SelectExecutor): Key condition: (column 0 in [1601503201, +Inf))
tests.order_test (SelectExecutor): MinMax index condition: (column 0 in [1601503201, +Inf))
tests.order_test (SelectExecutor): Running binary search on index range for part 202010_367_545_8 (7612 marks)
tests.order_test (SelectExecutor): Running binary search on index range for part 202012_550_730_12 (3 marks)
tests.order_test (SelectExecutor): Found (LEFT) boundary mark: 0
tests.order_test (SelectExecutor): Running binary search on index range for part 202011_725_725_0 (128 marks)
tests.order_test (SelectExecutor): Found (RIGHT) boundary mark: 3
tests.order_test (SelectExecutor): Running binary search on index range for part 202011_689_719_2 (1403 marks)
tests.order_test (SelectExecutor): Running binary search on index range for part 202010_549_729_12 (37 marks)
tests.order_test (SelectExecutor): Running binary search on index range for part 202011_728_728_0 (84 marks)
tests.order_test (SelectExecutor): Found (LEFT) boundary mark: 0
tests.order_test (SelectExecutor): Found continuous range in 3 steps
tests.order_test (SelectExecutor): Found (LEFT) boundary mark: 0
tests.order_test (SelectExecutor): Found (LEFT) boundary mark: 0
tests.order_test (SelectExecutor): Found (LEFT) boundary mark: 0
tests.order_test (SelectExecutor): Running binary search on index range for part 202011_722_722_0 (128 marks)
tests.order_test (SelectExecutor): Found (RIGHT) boundary mark: 7612
tests.order_test (SelectExecutor): Found (RIGHT) boundary mark: 37
tests.order_test (SelectExecutor): Found (LEFT) boundary mark: 0
tests.order_test (SelectExecutor): Found continuous range in 11 steps
tests.order_test (SelectExecutor): Found (RIGHT) boundary mark: 1403
tests.order_test (SelectExecutor): Found (RIGHT) boundary mark: 84
tests.order_test (SelectExecutor): Found continuous range in 25 steps
tests.order_test (SelectExecutor): Running binary search on index range for part 202011_370_686_19 (5993 marks)
tests.order_test (SelectExecutor): Found continuous range in 21 steps
tests.order_test (SelectExecutor): Found (RIGHT) boundary mark: 128
tests.order_test (SelectExecutor): Found continuous range in 13 steps
tests.order_test (SelectExecutor): Found (LEFT) boundary mark: 0
tests.order_test (SelectExecutor): Found continuous range in 14 steps
tests.order_test (SelectExecutor): Found (RIGHT) boundary mark: 128
tests.order_test (SelectExecutor): Found (LEFT) boundary mark: 0
tests.order_test (SelectExecutor): Found continuous range in 14 steps
tests.order_test (SelectExecutor): Found (RIGHT) boundary mark: 5993
tests.order_test (SelectExecutor): Found continuous range in 25 steps
tests.order_test (SelectExecutor): Selected 8/9 parts by partition key, 8 parts by primary key, 15380/15380 marks by primary key, 15380 marks to read from 8 ranges
tests.order_test (SelectExecutor): MergingSortedTransform: Merge sorted 1947 blocks, 125972070 rows in 1.423973879 sec., 88465155.05499662 rows/sec., 423.78 MiB/sec
Ok.

0 rows in set. Elapsed: 1.424 sec. Processed 125.97 million rows, 629.86 MB (88.46 million rows/s., 442.28 MB/s.)

PARTITION BY

  • A good size for a single partition is something like 1-300 GB.
  • For Summing/Replacing tables a bit smaller (400 MB - 40 GB).
  • It's better to avoid touching more than a few dozen partitions with a typical SELECT query.
  • A single insert should bring data to one or just a few partitions.
  • The number of partitions in a table should be dozens or hundreds, not thousands.

You can check the size of partitions in the system.parts table.
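
For example, a sketch of such a check (the table name is illustrative):

SELECT partition, formatReadableSize(sum(bytes_on_disk)) AS size, count() AS parts
FROM system.parts
WHERE active AND table = 'my_table'
GROUP BY partition
ORDER BY partition;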

Examples:

-- for time-series:
PARTITION BY toYear(timestamp)          -- long retention, not too much data
PARTITION BY toYYYYMM(timestamp)        --  
PARTITION BY toMonday(timestamp)        -- 
PARTITION BY toDate(timestamp)          --
PARTITION BY toStartOfHour(timestamp)   -- short retention, lot of data

-- for table with some incremental (non time-bounded) counter

PARTITION BY intDiv(transaction_id, 1000000)

-- for some dimension tables (always requested with WHERE userid)
PARTITION BY userid % 16

For small tables (smaller than a few gigabytes) partitioning is usually not needed at all (just skip the PARTITION BY expression when you create the table).

See also

How to change ORDER BY

ClickHouse Anti-Patterns. Learning from Users’ Mistakes

A short talk by Mikhail Filimonov

https://youtu.be/DP7l6Swkskw?t=3777

4 - AggregatingMergeTree


Q. What happens with columns which are neither part of the ORDER BY key nor of an AggregateFunction type?

A. It picks the first value encountered (similar to any).

CREATE TABLE agg_test
(
    `a` String,
    `b` UInt8,
    `c` SimpleAggregateFunction(max, UInt8)
)
ENGINE = AggregatingMergeTree
ORDER BY a;

INSERT INTO agg_test VALUES ('a', 1, 1);
INSERT INTO agg_test VALUES ('a', 2, 2);

SELECT * FROM agg_test FINAL;

┌─a─┬─b─┬─c─┐
│ a │ 1 │ 2 │
└───┴───┴───┘

INSERT INTO agg_test VALUES ('a', 3, 3);

SELECT * FROM agg_test;

┌─a─┬─b─┬─c─┐
│ a │ 1 │ 2 │
└───┴───┴───┘
┌─a─┬─b─┬─c─┐
│ a │ 3 │ 3 │
└───┴───┴───┘

OPTIMIZE TABLE agg_test FINAL;

SELECT * FROM agg_test;

┌─a─┬─b─┬─c─┐
│ a │ 1 │ 3 │
└───┴───┴───┘

Last non-null value for each column

CREATE TABLE test_last
(
    `col1` Int32,
    `col2` SimpleAggregateFunction(anyLast, Nullable(DateTime)),
    `col3` SimpleAggregateFunction(anyLast, Nullable(DateTime))
)
ENGINE = AggregatingMergeTree
ORDER BY col1

Ok.

0 rows in set. Elapsed: 0.003 sec.

INSERT INTO test_last (col1, col2) VALUES (1, now());

Ok.

1 rows in set. Elapsed: 0.014 sec.

INSERT INTO test_last (col1, col3) VALUES (1, now())

Ok.

1 rows in set. Elapsed: 0.006 sec.

SELECT
    col1,
    anyLast(col2),
    anyLast(col3)
FROM test_last
GROUP BY col1

┌─col1─┬───────anyLast(col2)─┬───────anyLast(col3)─┐
│    1 │ 2020-01-16 20:57:46 │ 2020-01-16 20:57:51 │
└──────┴─────────────────────┴─────────────────────┘

1 rows in set. Elapsed: 0.005 sec.

SELECT *
FROM test_last
FINAL

┌─col1─┬────────────────col2─┬────────────────col3─┐
│    1 │ 2020-01-16 20:57:46 │ 2020-01-16 20:57:51 │
└──────┴─────────────────────┴─────────────────────┘

1 rows in set. Elapsed: 0.003 sec.

Merge two data streams

Q. I have 2 Kafka topics from which I am storing events into 2 different tables (A and B) sharing the same unique ID. I want to create a single table C that combines the data from tables A and B. The problem is that the data arrives asynchronously, so not all of the data is available when a row arrives in table A, or vice-versa.

A. You can use AggregatingMergeTree with Nullable columns and the any aggregation function, or with non-Nullable columns and the max aggregation function, if that is acceptable for your data.

CREATE TABLE table_C (
    id      Int64,
    colA    SimpleAggregateFunction(any, Nullable(UInt32)),
    colB    SimpleAggregateFunction(max, String)
) ENGINE = AggregatingMergeTree()
ORDER BY id;

CREATE MATERIALIZED VIEW mv_A TO table_C AS
SELECT id,colA FROM Kafka_A;

CREATE MATERIALIZED VIEW mv_B TO table_C AS
SELECT id,colB FROM Kafka_B;

6 - Merge performance and OPTIMIZE FINAL

Merge performance and OPTIMIZE FINAL DEDUPLICATE BY expr

Merge Performance

Main things affecting the merge speed are:

  • Schema (especially compression codecs, some bad types, sorting order…)
  • Horizontal vs Vertical merge
    • Horizontal = read all columns at once, do a merge sort, write the new part
    • Vertical = first read the columns from the ORDER BY, do a merge sort, write them to disk, remember the permutation, then process the rest of the columns one by one, applying that permutation
  • Compact vs Wide parts
  • Other things like server load, concurrent merges…
SELECT name, value
FROM system.merge_tree_settings
WHERE name LIKE '%vert%';

 enable_vertical_merge_algorithm                   1      
 vertical_merge_algorithm_min_rows_to_activate     131072
 vertical_merge_algorithm_min_columns_to_activate  11
  • Vertical merge will be used if the part has at least 131072 rows and the table has at least 11 columns.
-- Disable Vertical Merges
ALTER TABLE test MODIFY SETTING enable_vertical_merge_algorithm = 0
  • Horizontal merge is used by default; it will use more memory if there are more than 80 columns in the table.
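If part_log is enabled, a sketch like the following can show which algorithm recent merges actually used (assuming the merge_algorithm column is present in your ClickHouse version):

SELECT table, merge_algorithm, count() AS merges
FROM system.part_log
WHERE event_type = 'MergeParts'
GROUP BY table, merge_algorithm
ORDER BY table;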

OPTIMIZE TABLE example FINAL DEDUPLICATE BY expr

When using the deduplicate feature of OPTIMIZE FINAL, the question is: which row will remain and which ones will be deduplicated away?

For SELECT operations ClickHouse does not guarantee the order of the result set unless you specify ORDER BY. This 'random' ordering is affected by different parameters, for example max_threads.

In a merge operation ClickHouse reads rows sequentially in storage order, which is determined by the ORDER BY specified in the CREATE TABLE statement, and only the first unique row in that order survives deduplication. So it is a bit different from how SELECT actually works. When the FINAL clause is used, ClickHouse merges all rows across all partitions (if it is not specified, the merge operation is done per partition), so the first unique row of the first partition survives deduplication. Merges are single-threaded, because it is too complicated to apply merge operations in parallel, and it generally makes no sense.
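
A minimal sketch of the syntax (table and column names are illustrative):

-- keep one row per (user_id, event_date); the surviving row is the first one in storage order
OPTIMIZE TABLE example FINAL DEDUPLICATE BY user_id, event_date;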

7 - Nulls in order by

  1. It is NOT RECOMMENDED for general use.
  2. Use at your own risk.
  3. Use the latest ClickHouse version if you need this.
CREATE TABLE x
(
    `a` Nullable(UInt32),
    `b` Nullable(UInt32),
    `cnt` UInt32
)
ENGINE = SummingMergeTree
ORDER BY (a, b)
SETTINGS allow_nullable_key = 1;
INSERT INTO x VALUES (Null,2,1), (Null,Null,1), (3, Null, 1), (4,4,1);
INSERT INTO x VALUES (Null,2,1), (Null,Null,1), (3, Null, 1), (4,4,1);
SELECT * FROM x;
┌────a─┬────b─┬─cnt─┐
│    3 │ null │   2 │
│    4 │    4 │   2 │
│ null │    2 │   2 │
│ null │ null │   2 │
└──────┴──────┴─────┘

8 - ReplacingMergeTree


ReplacingMergeTree is a powerful ClickHouse MergeTree engine. It is one of the techniques that can be used to guarantee uniqueness or exactly-once delivery in ClickHouse.

General Operations

Engine Parameters

Engine = ReplacingMergeTree([version_column],[is_deleted_column])
ORDER BY <list_of_columns>
  • ORDER BY – defines the columns that need to be unique at merge time. Since merge time usually cannot be controlled, the FINAL keyword is required to remove duplicates.
  • version_column – a monotonically increasing number, which can be based on a timestamp. Used to make sure updates are applied in the right order.
  • is_deleted_column (23.2+, see https://github.com/ClickHouse/ClickHouse/pull/41005) – the column used to mark rows as deleted.

DML operations

  • CREATE – INSERT INTO t values(..)
  • READ – SELECT FROM t final
  • UPDATE – INSERT INTO t(..., _version) values (...), insert with incremented version
  • DELETE – INSERT INTO t(..., _version, is_deleted) values(..., 1)
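
A minimal sketch of those operations, assuming a table like ReplacingMergeTree(_version, is_deleted) ORDER BY id (column names are illustrative):

-- UPDATE: insert the new state with a higher version
INSERT INTO t (id, value, _version, is_deleted) VALUES (42, 'new value', 2, 0);

-- DELETE: insert a row with the deletion flag set (and a higher version)
INSERT INTO t (id, value, _version, is_deleted) VALUES (42, '', 3, 1);

-- READ: the current state, excluding deleted rows
SELECT * FROM t FINAL WHERE is_deleted = 0;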

FINAL

ClickHouse does not guarantee that a merge will fire and replace rows using the ReplacingMergeTree logic. The FINAL keyword should be used to apply the merge at query time. It works reasonably fast when a PK filter is used, but may be slow for SELECT * types of queries:

For reference: since 23.2, the profile-level setting final = 1 can apply FINAL automatically, see https://github.com/ClickHouse/ClickHouse/pull/40945

ClickHouse merges parts only within the scope of a single partition, so if two rows with the same replacing key land in different partitions, they are never merged into a single row. The FINAL keyword works differently: it merges rows across all partitions. That behavior can be changed via the do_not_merge_across_partitions_select_final setting.

CREATE TABLE repl_tbl_part
(
    `key` UInt32,
    `value` UInt32,
    `part_key` UInt32
)
ENGINE = ReplacingMergeTree
PARTITION BY part_key
ORDER BY key;

INSERT INTO repl_tbl_part SELECT
    1 AS key,
    number AS value,
    number % 2 AS part_key
FROM numbers(4)
SETTINGS optimize_on_insert = 0;

SELECT * FROM repl_tbl_part;

┌─key─┬─value─┬─part_key─┐
│   1 │     1 │        1 │
│   1 │     3 │        1 │
└─────┴───────┴──────────┘
┌─key─┬─value─┬─part_key─┐
│   1 │     0 │        0 │
│   1 │     2 │        0 │
└─────┴───────┴──────────┘

SELECT * FROM repl_tbl_part FINAL;

┌─key─┬─value─┬─part_key─┐
│   1 │     3 │        1 │
└─────┴───────┴──────────┘

SELECT * FROM repl_tbl_part FINAL SETTINGS do_not_merge_across_partitions_select_final=1;

┌─key─┬─value─┬─part_key─┐
│   1 │     3 │        1 │
└─────┴───────┴──────────┘
┌─key─┬─value─┬─part_key─┐
│   1 │     2 │        0 │
└─────┴───────┴──────────┘

OPTIMIZE TABLE repl_tbl_part FINAL;

SELECT * FROM repl_tbl_part;

┌─key─┬─value─┬─part_key─┐
│   1 │     3 │        1 │
└─────┴───────┴──────────┘
┌─key─┬─value─┬─part_key─┐
│   1 │     2 │        0 │
└─────┴───────┴──────────┘

Deleting the data

  • Delete in partition: ALTER TABLE t DELETE WHERE ... in PARTITION 'partition' – slow and asynchronous, rebuilds the partition
  • Filter is_deleted in queries: SELECT ... WHERE is_deleted = 0
  • Before 23.2, use ROW POLICY to apply a filter automatically: CREATE ROW POLICY delete_masking on t using is_deleted = 0 for ALL;
  • 23.2+ ReplacingMergeTree(version, is_deleted) ORDER BY .. SETTINGS clean_deleted_rows='Always' (see https://github.com/ClickHouse/ClickHouse/pull/41005)

Other options:

  • Partition operations: ALTER TABLE t DROP PARTITION 'partition' – locks the table, drops full partition only
  • Lightweight delete: DELETE FROM t WHERE ... – experimental

Use cases

Last state

Tested on ClickHouse version 23.6. FINAL is good in all cases.

CREATE TABLE repl_tbl
(
    `key` UInt32,
    `val_1` UInt32,
    `val_2` String,
    `val_3` String,
    `val_4` String,
    `val_5` UUID,
    `ts` DateTime
)
ENGINE = ReplacingMergeTree(ts)
ORDER BY key

SYSTEM STOP MERGES repl_tbl;

INSERT INTO repl_tbl SELECT number as key, rand() as val_1, randomStringUTF8(10) as val_2, randomStringUTF8(5) as val_3, randomStringUTF8(4) as val_4, generateUUIDv4() as val_5, now() as ts FROM numbers(10000000);
INSERT INTO repl_tbl SELECT number as key, rand() as val_1, randomStringUTF8(10) as val_2, randomStringUTF8(5) as val_3, randomStringUTF8(4) as val_4, generateUUIDv4() as val_5, now() as ts FROM numbers(10000000);
INSERT INTO repl_tbl SELECT number as key, rand() as val_1, randomStringUTF8(10) as val_2, randomStringUTF8(5) as val_3, randomStringUTF8(4) as val_4, generateUUIDv4() as val_5, now() as ts FROM numbers(10000000);
INSERT INTO repl_tbl SELECT number as key, rand() as val_1, randomStringUTF8(10) as val_2, randomStringUTF8(5) as val_3, randomStringUTF8(4) as val_4, generateUUIDv4() as val_5, now() as ts FROM numbers(10000000);

SELECT count() FROM repl_tbl

┌──count()─┐
│ 40000000 │
└──────────┘

Single key

-- GROUP BY
SELECT key, argMax(val_1, ts) as val_1, argMax(val_2, ts) as val_2, argMax(val_3, ts) as val_3, argMax(val_4, ts) as val_4, argMax(val_5, ts) as val_5, max(ts) FROM repl_tbl WHERE key = 10 GROUP BY key;
1 row in set. Elapsed: 0.008 sec.

-- ORDER BY LIMIT BY
SELECT * FROM repl_tbl WHERE key = 10 ORDER BY ts DESC LIMIT 1 BY key ;
1 row in set. Elapsed: 0.006 sec.

-- Subquery
SELECT * FROM repl_tbl WHERE key = 10 AND ts = (SELECT max(ts) FROM repl_tbl WHERE key = 10);
1 row in set. Elapsed: 0.009 sec.

-- FINAL
SELECT * FROM repl_tbl FINAL WHERE key = 10;
1 row in set. Elapsed: 0.008 sec.

Multiple keys

-- GROUP BY
SELECT key, argMax(val_1, ts) as val_1, argMax(val_2, ts) as val_2, argMax(val_3, ts) as val_3, argMax(val_4, ts) as val_4, argMax(val_5, ts) as val_5, max(ts) FROM repl_tbl WHERE key IN (SELECT toUInt32(number) FROM numbers(1000000) WHERE number % 100) GROUP BY key FORMAT Null;
Peak memory usage (for query): 2.19 GiB.
0 rows in set. Elapsed: 1.043 sec. Processed 5.08 million rows, 524.38 MB (4.87 million rows/s., 502.64 MB/s.)

-- SET optimize_aggregation_in_order=1;
Peak memory usage (for query): 349.94 MiB.
0 rows in set. Elapsed: 0.901 sec. Processed 4.94 million rows, 506.55 MB (5.48 million rows/s., 562.17 MB/s.)

-- ORDER BY LIMIT BY
SELECT * FROM repl_tbl WHERE key IN (SELECT toUInt32(number) FROM numbers(1000000) WHERE number % 100) ORDER BY ts DESC LIMIT 1 BY key FORMAT Null;
Peak memory usage (for query): 1.12 GiB.
0 rows in set. Elapsed: 1.171 sec. Processed 5.08 million rows, 524.38 MB (4.34 million rows/s., 447.95 MB/s.)

-- Subquery
SELECT * FROM repl_tbl WHERE (key, ts) IN (SELECT key, max(ts) FROM repl_tbl WHERE key IN (SELECT toUInt32(number) FROM numbers(1000000) WHERE number % 100) GROUP BY key) FORMAT Null;
Peak memory usage (for query): 197.30 MiB.
0 rows in set. Elapsed: 0.484 sec. Processed 8.72 million rows, 507.33 MB (18.04 million rows/s., 1.05 GB/s.)

-- SET optimize_aggregation_in_order=1;
Peak memory usage (for query): 171.93 MiB.
0 rows in set. Elapsed: 0.465 sec. Processed 8.59 million rows, 490.55 MB (18.46 million rows/s., 1.05 GB/s.)

-- FINAL
SELECT * FROM repl_tbl FINAL WHERE key IN (SELECT toUInt32(number) FROM numbers(1000000) WHERE number % 100) FORMAT Null;
Peak memory usage (for query): 537.13 MiB.
0 rows in set. Elapsed: 0.357 sec. Processed 4.39 million rows, 436.28 MB (12.28 million rows/s., 1.22 GB/s.)

Full table

-- GROUP BY
SELECT key, argMax(val_1, ts) as val_1, argMax(val_2, ts) as val_2, argMax(val_3, ts) as val_3, argMax(val_4, ts) as val_4, argMax(val_5, ts) as val_5, max(ts) FROM repl_tbl GROUP BY key FORMAT Null;
Peak memory usage (for query): 16.08 GiB.
0 rows in set. Elapsed: 11.600 sec. Processed 40.00 million rows, 5.12 GB (3.45 million rows/s., 441.49 MB/s.)

-- SET optimize_aggregation_in_order=1;
Peak memory usage (for query): 865.76 MiB.
0 rows in set. Elapsed: 9.677 sec. Processed 39.82 million rows, 5.10 GB (4.12 million rows/s., 526.89 MB/s.)

-- ORDER BY LIMIT BY
SELECT * FROM repl_tbl ORDER BY ts DESC LIMIT 1 BY key FORMAT Null;
Peak memory usage (for query): 8.39 GiB.
0 rows in set. Elapsed: 14.489 sec. Processed 40.00 million rows, 5.12 GB (2.76 million rows/s., 353.45 MB/s.)

-- Subquery
SELECT * FROM repl_tbl WHERE (key, ts) IN (SELECT key, max(ts) FROM repl_tbl GROUP BY key) FORMAT Null;
Peak memory usage (for query): 2.40 GiB.
0 rows in set. Elapsed: 5.225 sec. Processed 79.65 million rows, 5.40 GB (15.24 million rows/s., 1.03 GB/s.)

-- SET optimize_aggregation_in_order=1;
Peak memory usage (for query): 924.39 MiB.
0 rows in set. Elapsed: 4.126 sec. Processed 79.67 million rows, 5.40 GB (19.31 million rows/s., 1.31 GB/s.)

-- FINAL
SELECT * FROM repl_tbl FINAL FORMAT Null;
Peak memory usage (for query): 834.09 MiB.
0 rows in set. Elapsed: 2.314 sec. Processed 38.80 million rows, 4.97 GB (16.77 million rows/s., 2.15 GB/s.)

8.1 - ReplacingMergeTree does not collapse duplicates


Hi there, I have a question about replacing merge trees. I have set up a Materialized View with ReplacingMergeTree table, but even if I call optimize on it, the parts don’t get merged. I filled that table yesterday, nothing happened since then. What should I do?

Merges are eventual and may never happen. It depends on the number of inserts that happened afterwards, the number of parts in the partition, and the size of the parts. If the total size of the input parts is greater than the maximum part size, they will never be merged.

https://clickhouse.tech/docs/en/operations/settings/merge-tree-settings/#max-bytes-to-merge-at-max-space-in-pool
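
A quick sketch to check the current limit on your server:

SELECT name, value
FROM system.merge_tree_settings
WHERE name = 'max_bytes_to_merge_at_max_space_in_pool';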

https://clickhouse.tech/docs/en/engines/table-engines/mergetree-family/replacingmergetree/

ReplacingMergeTree is suitable for clearing out duplicate data in the background in order to save space, but it doesn't guarantee the absence of duplicates.

9 - Skip index

--(1) create test table
drop table if exists test;
create table test
(
    version UInt32
    ,id UInt32
    ,state UInt8
    ,INDEX state_idx (state) type set(0) GRANULARITY 1
) ENGINE ReplacingMergeTree(version)
      ORDER BY (id);

--(2) insert sample data
INSERT INTO test (version, id, state) VALUES (1,1,1);
INSERT INTO test (version, id, state) VALUES (2,1,0);
INSERT INTO test (version, id, state) VALUES (3,1,1);

--(3) check the result:
-- expected 3, 1, 1
select version, id, state from test final;
┌─version─┬─id─┬─state─┐
│       3 │  1 │     1 │
└─────────┴────┴───────┘

-- expected empty result
select version, id, state from test final where state=0;
┌─version─┬─id─┬─state─┐
│       2 │  1 │     0 │
└─────────┴────┴───────┘
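
The unexpected row appears because the skip index selects granules before FINAL collapses duplicates, so the parts holding the newer versions (state=1) are pruned and the stale state=0 row is returned. A possible mitigation, assuming your ClickHouse version supports the use_skip_indexes_if_final setting, is a sketch like:

select version, id, state from test final where state=0
settings use_skip_indexes_if_final = 0;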

10 - SummingMergeTree


Nested structures

In certain conditions it can make sense to collapse one of the dimensions into a set of arrays. It's usually profitable when this dimension is not commonly used in queries: it reduces the number of rows in the aggregated table and speeds up queries that don't care about this dimension, in exchange for worse aggregation performance by the collapsed dimension.

CREATE TABLE traffic
(
    `key1` UInt32,
    `key2` UInt32,
    `port` UInt16,
    `bits_in` UInt32 CODEC (T64,LZ4),
    `bits_out` UInt32 CODEC (T64,LZ4),
    `packets_in` UInt32 CODEC (T64,LZ4),
    `packets_out` UInt32 CODEC (T64,LZ4)
)
ENGINE = SummingMergeTree
ORDER BY (key1, key2, port);

INSERT INTO traffic SELECT
    number % 1000,
    intDiv(number, 10000),
    rand() % 20,
    rand() % 753,
    rand64() % 800,
    rand() % 140,
    rand64() % 231
FROM numbers(100000000);

CREATE TABLE default.traffic_map
(
    `key1` UInt32,
    `key2` UInt32,
    `bits_in` UInt32 CODEC(T64, LZ4),
    `bits_out` UInt32 CODEC(T64, LZ4),
    `packets_in` UInt32 CODEC(T64, LZ4),
    `packets_out` UInt32 CODEC(T64, LZ4),
    `portMap.port` Array(UInt16),
    `portMap.bits_in` Array(UInt32) CODEC(T64, LZ4),
    `portMap.bits_out` Array(UInt32) CODEC(T64, LZ4),
    `portMap.packets_in` Array(UInt32) CODEC(T64, LZ4),
    `portMap.packets_out` Array(UInt32) CODEC(T64, LZ4)
)
ENGINE = SummingMergeTree
ORDER BY (key1, key2);

INSERT INTO traffic_map WITH rand() % 20 AS port
SELECT
    number % 1000 AS key1,
    intDiv(number, 10000) AS key2,
    rand() % 753 AS bits_in,
    rand64() % 800 AS bits_out,
    rand() % 140 AS packets_in,
    rand64() % 231 AS packets_out,
    [port],
    [bits_in],
    [bits_out],
    [packets_in],
    [packets_out]
FROM numbers(100000000);

┌─table───────┬─column──────────────┬─────rows─┬─compressed─┬─uncompressed─┬──ratio─┐
│ traffic     │ bits_out            │ 80252317 │ 109.09 MiB │ 306.14 MiB   │   2.81 │
│ traffic     │ bits_in             │ 80252317 │ 108.34 MiB │ 306.14 MiB   │   2.83 │
│ traffic     │ port                │ 80252317 │ 99.21 MiB  │ 153.07 MiB   │   1.54 │
│ traffic     │ packets_out         │ 80252317 │ 91.36 MiB  │ 306.14 MiB   │   3.35 │
│ traffic     │ packets_in          │ 80252317 │ 84.61 MiB  │ 306.14 MiB   │   3.62 │
│ traffic     │ key2                │ 80252317 │ 47.88 MiB  │ 306.14 MiB   │   6.39 │
│ traffic     │ key1                │ 80252317 │ 1.38 MiB   │ 306.14 MiB   │ 221.42 │
│ traffic_map │ portMap.bits_out    │ 10000000 │ 108.96 MiB │ 306.13 MiB   │   2.81 │
│ traffic_map │ portMap.bits_in     │ 10000000 │ 108.32 MiB │ 306.13 MiB   │   2.83 │
│ traffic_map │ portMap.port        │ 10000000 │ 92.00 MiB  │ 229.36 MiB   │   2.49 │
│ traffic_map │ portMap.packets_out │ 10000000 │ 90.95 MiB  │ 306.13 MiB   │   3.37 │
│ traffic_map │ portMap.packets_in  │ 10000000 │ 84.19 MiB  │ 306.13 MiB   │   3.64 │
│ traffic_map │ key2                │ 10000000 │ 23.46 MiB  │ 38.15 MiB    │   1.63 │
│ traffic_map │ bits_in             │ 10000000 │ 15.59 MiB  │ 38.15 MiB    │   2.45 │
│ traffic_map │ bits_out            │ 10000000 │ 15.59 MiB  │ 38.15 MiB    │   2.45 │
│ traffic_map │ packets_out         │ 10000000 │ 13.22 MiB  │ 38.15 MiB    │   2.89 │
│ traffic_map │ packets_in          │ 10000000 │ 12.62 MiB  │ 38.15 MiB    │   3.02 │
│ traffic_map │ key1                │ 10000000 │ 180.29 KiB │ 38.15 MiB    │ 216.66 │
└─────────────┴─────────────────────┴──────────┴────────────┴──────────────┴────────┘

-- Queries

SELECT
    key1,
    sum(packets_in),
    sum(bits_out)
FROM traffic
GROUP BY key1
FORMAT `Null`

0 rows in set. Elapsed: 0.488 sec. Processed 80.25 million rows, 963.03 MB (164.31 million rows/s., 1.97 GB/s.)

SELECT
    key1,
    sum(packets_in),
    sum(bits_out)
FROM traffic_map
GROUP BY key1
FORMAT `Null`

0 rows in set. Elapsed: 0.063 sec. Processed 10.00 million rows, 120.00 MB (159.43 million rows/s., 1.91 GB/s.)


SELECT
    key1,
    port,
    sum(packets_in),
    sum(bits_out)
FROM traffic
GROUP BY
    key1,
    port
FORMAT `Null`

0 rows in set. Elapsed: 0.668 sec. Processed 80.25 million rows, 1.12 GB (120.14 million rows/s., 1.68 GB/s.)

WITH arrayJoin(arrayZip(untuple(sumMap(portMap.port, portMap.packets_in, portMap.bits_out)))) AS tpl
SELECT
    key1,
    tpl.1 AS port,
    tpl.2 AS packets_in,
    tpl.3 AS bits_out
FROM traffic_map
GROUP BY key1
FORMAT `Null`

0 rows in set. Elapsed: 0.915 sec. Processed 10.00 million rows, 1.08 GB (10.93 million rows/s., 1.18 GB/s.)

11 - VersionedCollapsingMergeTree


When you have an incoming event stream with duplicates and updates, you face a big challenge building a consistent row state inside a ClickHouse table.

ReplacingMergeTree is a great engine for that, and there are a lot of blog posts on how to apply it for that particular purpose. But there is a serious problem: you can't use another very important feature - aggregating rows with Materialized Views or Projections on top of the ReplacingMergeTree table - because duplicates and updates will not be deduplicated, and calculated aggregates (like sum or count) will be incorrect. For big amounts of data this becomes critical, because aggregating raw data during report queries takes too much time.

Another drawback of ReplacingMergeTree is the unfinished support for DELETEs. While in the newest versions of ClickHouse it's possible to add is_deleted to the ReplacingMergeTree parameters, the necessity of manually filtering out deleted rows even after FINAL processing makes it less useful.

ClickHouse has other table engines that can be used quite well for dealing with UPDATEs and DELETEs - CollapsingMergeTree and VersionedCollapsingMergeTree.

Both of them use the concept of inserting a "rollback row" to compensate for the previous insert. The difference between CollapsingMergeTree and VersionedCollapsingMergeTree is in the collapsing algorithm. For cluster configurations it's very important to understand which row came first and which should replace which. That is why using ReplicatedVersionedCollapsingMergeTree is mandatory for Replicated clusters.

When dealing with such complicated data streams, 3 tasks need to be solved simultaneously:

  • remove duplicates
  • process updates and deletes
  • calculate correct aggregates

Let's explain in several steps how to do it in ClickHouse with some tricky SQL code.

Row deduplication

There are a lot of ways to remove duplicates from an event stream. The most effective is block deduplication, when ClickHouse drops inserted blocks with the same checksum (or tag). But it requires building a smart ingest procedure.
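
For example, a sketch of token-based block deduplication (the table name and token are illustrative; for a plain MergeTree table the non_replicated_deduplication_window merge-tree setting has to be enabled):

INSERT INTO events SETTINGS insert_deduplication_token = 'batch-2024-03-03-001' VALUES (1, 'click');

-- re-sending the same block with the same token is silently dropped
INSERT INTO events SETTINGS insert_deduplication_token = 'batch-2024-03-03-001' VALUES (1, 'click');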

But it's possible to use another method - checking that a particular row is already present in the destination table and not inserting it again. To get reliable results, such a process should be executed in 1 thread on 1 cluster node. That is only possible for not-too-active event streams (like 100k rows/sec). For heavier streams some sort of partitioning is needed, inserting data with different PKs to different shards or replicas, or even on the same node.

An example of row deduplication:

create table Example1 (id Int64, metric UInt64) 
engine = MergeTree order by id;

create table Example1Null engine = Null as Example1;

create materialized view __Example1 to Example1 as
select * from Example1Null 
where id not in (
   select id from Example1 where id in (
      select id from Example1Null
   )
);

Here is the trick:

  • use a Null table and a MatView to be able to access both the insert block and the dest table
  • check the existence of ids in the dest table with a fast index scan by primary key using the IN operator
  • filter existing rows out of the insert block with the NOT IN operator

An insert block in most cases does not contain too many rows (like 1000-100k), so checking the dest table for their existence by scanning the Primary Key (residing in memory) won't take much time, but due to a high table index granularity it can still be noticeable under high load. If possible, it's better to reduce the index granularity at least to 4096 (from the default 8192).
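
A hypothetical usage sketch of the tables above (values are illustrative): rows whose id already exists in Example1 are silently dropped.

insert into Example1Null values (1, 10), (2, 20);
insert into Example1Null values (2, 21), (3, 30);  -- id=2 already exists, so it is filtered out

select * from Example1 order by id;  -- one row per id: (1,10), (2,20), (3,30)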

Last row state

To process updates in CollapsingMergeTree, we need to know the "last row state" in order to insert the "compensation row". Sometimes it's available - CDC events coming from MySQL's binlog or Postgres's WAL contain not only the "new" data but also the "old" values. If one of the columns contains the timestamp of the row's update time, it can be used as the row's "version". But in most cases the incoming event stream does not have the old metric values or suitable version information. In this case we can get that data by looking into the ClickHouse table, the same way as we did for row deduplication in the previous example.

create table Example2 (id Int64, metric UInt64, sign Int8) 
engine = CollapsingMergeTree(sign) order by id;

create table Example2Null engine = Null as Example2;

create materialized view __Example2 to Example2 as
with _old as (
   select *, arrayJoin([-1,1]) as _sign 
   from Example2 where id in (select id from Example2Null)
   )
select id,
       if(_old._sign=-1, _old.metric, _new.metric) as metric
from Example2Null as _new
join _old using id;

Here I read more data from the Example2 table compared to Example1. Instead of simply checking row existence with the IN operator, a JOIN with the existing rows is used for building the "compensation row".

The trick with arrayJoin is needed to insert two rows, as required for a CollapsingMergeTree table.

Don't try to run the code above. It's just a short explanation of the idea, lacking many needed elements.

Replace by collapsing

Here is a more realistic example that can be played with:

create table Example3 
(
    id              Int32,   
    metric1         UInt32,
    metric2         UInt32,
    _version        UInt64,
    sign            Int8 default 1
) engine = VersionedCollapsingMergeTree(sign, _version)
ORDER BY id
;
create table Stage engine=Null as Example3 ;

create materialized view Example3Transform to Example3 as
with __new as ( SELECT * FROM Stage order by sign desc, _version desc limit 1 by id ),
     __old AS ( SELECT *, arrayJoin([-1,1]) AS _sign from
                 ( select * FROM Example3 final
                   PREWHERE id IN (SELECT id FROM __new)
                   where sign = 1
                 )
    )
select id,
    if(__old._sign = -1, __old.metric1, __new.metric1)   AS metric1,
    if(__old._sign = -1, __old.metric2, __new.metric2)   AS metric2,
    if(__old._sign = -1, __old._version, __new._version) AS _version,
    if(__old._sign = -1, -1, 1)                          AS sign
from __new left join __old
using id
where if(__new.sign=-1,
  __old._sign = -1,                -- insert only delete row if it's found in old data
  __new._version > __old._version  -- skip duplicates for updates
);

-- original
insert into Stage values (1,1,1,1,1), (2,2,2,1,1);
select 'step1',* from Example3 ;

-- no duplicates (with the same version) inserted
insert into Stage values (1,3,1,1,1),(2,3,2,1,1);
select 'step2',* from Example3 ;

-- delete a row with id=2. version for delete row does not have any meaning
insert into Stage values (2,2,2,0,-1);
select 'step3',* from Example3 final;

-- replace a row with id=1. row with sign=-1 not needed, but can be in the insert blocks (will be skipped)
insert into Stage values (1,1,1,0,-1),(1,3,3,2,1);
select 'step4',* from Example3 final;

Output:

step1	1	1	1	1	1
step1	2	2	2	1	1
step2	1	1	1	1	1
step2	2	2	2	1	1
step3	1	1	1	1	1
step4	1	3	3	2	1

Important additions:

  • filtering the insert block to get only 1 (the latest) row if many rows with the same id are inserted
  • using FINAL and PREWHERE (to speed up FINAL) while reading the main (dest) table
  • a filter to skip out-of-order events by checking the version
  • DELETE event processing (inside the last WHERE)

Adding projections

Let's finally add an aggregating projection, together with a more useful updated_at timestamp instead of the abstract _version.

https://fiddle.clickhouse.com/3140d341-ccc5-4f57-8fbf-55dbf4883a21

create table Example4 
(
    id              Int32,   
    metric1         UInt32,
    Smetric1        alias metric1*sign,
    metric2         UInt32,
    dim1            LowCardinality(String),
    updated_at      DateTime64(3) default now64(3),
    sign            Int8 default 1,
-- incoming event stream is deduplicated so I can do stream aggregation
    PROJECTION byDim1  (
        select dim1, sum(metric1*sign) group by dim1
    )
) engine = VersionedCollapsingMergeTree(sign, updated_at)
ORDER BY id
;
create table Stage engine=Null as Example4 ;

create materialized view Example4Transform to Example4 as
with __new as ( SELECT * FROM Stage order by sign desc, updated_at desc limit 1 by id ),
     __old AS ( SELECT *, arrayJoin([-1,1]) AS _sign from
                 ( select * FROM Example4 final
                   PREWHERE id IN (SELECT id FROM __new)
                   where sign = 1
                 )
    )
select id,
    if(__old._sign = -1, __old.metric1, __new.metric1)   AS metric1,
    if(__old._sign = -1, __old.metric2, __new.metric2) AS metric2,
    if(__old._sign = -1, __old.dim1, __new.dim1) AS dim1,
    if(__old._sign = -1, __old.updated_at, __new.updated_at) AS updated_at,
    if(__old._sign = -1, -1, 1)                          AS sign
from __new left join __old using id
where if(__new.sign=-1,
  __old._sign = -1,                -- insert only delete row if it's found in old data
  __new.updated_at > __old.updated_at  -- skip duplicates for updates
);

-- original
insert into Stage(id,metric1,metric2,dim1) values (1,1,1,'d'), (2,2,2,'d');
select 'step1',* from Example4 ;
select 'proj1',dim1, sum(Smetric1) from Example4 group by dim1;

-- delete a row with id=2
insert into Stage(id,metric1,metric2,sign) values (2,2,2,-1);
select 'step2',* from Example4 final;
select 'proj2',dim1, sum(Smetric1) from Example4 group by dim1;

-- replace a row with id=1. row with sign=-1 not needed, but can be in the insert blocks (will be skipped)
insert into Stage(id,metric1,metric2,dim1,sign) values (1,1,1,'',-1),(1,3,3,'d',1);
select 'step3',* from Example4 final;
select 'proj3',dim1, sum(Smetric1) from Example4 group by dim1;

Output:

step1	1	1	1	d	2024-03-03 15:58:23.232	1
step1	2	2	2	d	2024-03-03 15:58:23.232	1
proj1	d	3
step2	1	1	1	d	2024-03-03 15:58:23.232	1
proj2	d	1
step3	1	3	3	d	2024-03-03 15:58:23.292	1
proj3	d	3

Combine old and new

As a bonus, I will use the presented technique to reimplement the AggregatingMergeTree algorithm, combining the old row with the new row in a VersionedCollapsingMergeTree.

https://fiddle.clickhouse.com/e1d7e04c-f1d6-4a25-9aac-1fe2b543c693

create table Example5 
(
    id              Int32,   
    metric1         UInt32,
    metric2         Nullable(UInt32),
    updated_at      DateTime64(3) default now64(3),
    sign            Int8 default 1
) engine = VersionedCollapsingMergeTree(sign, updated_at)
ORDER BY id
;
create table Stage engine=Null as Example5 ;
  
create materialized view Example5Transform to Example5 as
with __new as ( SELECT * FROM Stage order by sign desc, updated_at desc limit 1 by id ),
     __old AS ( SELECT *, arrayJoin([-1,1]) AS _sign from
                 ( select * FROM Example5 final
                   PREWHERE id IN (SELECT id FROM __new)
                   where sign = 1
                 )
    )
select id,
    if(__old._sign = -1, __old.metric1, greatest(__new.metric1, __old.metric1)) AS metric1,    
    if(__old._sign = -1, __old.metric2, ifNull(__new.metric2, __old.metric2)) AS metric2,
    if(__old._sign = -1, __old.updated_at, __new.updated_at) AS updated_at,
    if(__old._sign = -1, -1, 1)                          AS sign
from __new left join __old using id
where if(__new.sign=-1,
  __old._sign = -1,                -- insert only delete row if it's found in old data
  __new.updated_at > __old.updated_at  -- skip duplicates for updates
);

-- original
insert into Stage(id) values (1), (2);
select 'step0',* from Example5 ;

insert into Stage(id,metric1) values (1,1), (2,2);
select 'step1',* from Example5 final;

insert into Stage(id,metric2) values (1,11), (2,12);
select 'step2',* from Example5 final ;

Output:

step0	1	0	\N	2024-03-03 15:48:21.588	1
step0	2	0	\N	2024-03-03 15:48:21.588	1
step1	1	1	\N	2024-03-03 15:48:21.599	1
step1	2	2	\N	2024-03-03 15:48:21.599	1
step2	1	1	11	2024-03-03 15:48:21.612	1
step2	2	2	12	2024-03-03 15:48:21.612	1

Complex Primary Key

In the examples above I use a very simple and compact Int64 column for the PK. When possible, it's better to go that way. A Snowflake ID is the best variant and can easily be created during INSERT from a DateTime and a hash of one or several important columns. But sometimes a more complicated PK is needed, for example when storing data for multiple Tenants (Customers, Partners, etc.) in the same table. That's not a problem for the suggested technique - just use all the needed columns in all filter and JOIN operations.

create table Example1 
(
    id              Int64,  
    tenant_id       Int32, 
    metric1         UInt32,
    _version        UInt64,
    sign            Int8 default 1
) engine = VersionedCollapsingMergeTree(sign, _version)
ORDER BY (tenant_id,id)
;
create table Stage engine=Null as Example1 ;

create materialized view Example1Transform to Example1 as
with __new as ( SELECT * FROM Stage order by sign desc, _version desc limit 1 by tenant_id,id ),
     __old AS ( SELECT *, arrayJoin([-1,1]) AS _sign from
                 ( select * FROM Example1 final
                   PREWHERE (tenant_id,id) IN (SELECT tenant_id,id FROM __new)
                   where sign = 1
                 )
    )
select id,tenant_id,
    if(__old._sign = -1, __old.metric1, __new.metric1)   AS metric1,
    if(__old._sign = -1, __old._version, __new._version) AS _version,
    if(__old._sign = -1, -1, 1)                          AS sign
from __new left join __old
using (tenant_id,id)
where if(__new.sign=-1,
  __old._sign = -1,                -- insert only delete row if it's found in old data
  __new._version > __old._version  -- skip duplicates for updates
);