1 - CollapsingMergeTree vs ReplacingMergeTree

ReplacingMergeTree

  + very easy to use (always replace)
  + you don’t need to store the previous state of the row
  - no deletes
  - without FINAL you can always see duplicates, so you always need to ‘pay’ the FINAL performance penalty
  - only uniq()-like things can be calculated in materialized views

CollapsingMergeTree

  - more complex (accounting-like: you insert ‘rollback’ records to fix something)
  - you need to store the previous state of the row somewhere, OR extract it from the table itself (point queries are not nice for ClickHouse)
  + supports deletes
  + a properly crafted query can give correct results without FINAL (i.e. sum(amount * sign) will be correct whether or not you have duplicates)
  + you can do basic counts & sums in materialized views
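
To make the sum(amount * sign) point concrete, here is a minimal Python sketch (not ClickHouse code). The row layout (key, amount, sign) and the helper names signed_sum and collapse are made up for illustration, and collapse is only a toy stand-in for a background merge that cancels matching +1 / -1 pairs.

```python
# Toy model of CollapsingMergeTree rows: (key, amount, sign).
# sign = 1 is a "state" row, sign = -1 cancels a previously inserted state.
rows = [
    ("order-1", 100, 1),    # initial state
    ("order-1", 100, -1),   # 'rollback' of the initial state
    ("order-1", 120, 1),    # corrected state
    ("order-2", 50, 1),
]

def signed_sum(rows):
    """Rough equivalent of: SELECT key, sum(amount * sign) ... GROUP BY key."""
    totals = {}
    for key, amount, sign in rows:
        totals[key] = totals.get(key, 0) + amount * sign
    return totals

def collapse(rows):
    """Toy background merge: drop each -1 row together with its matching +1 row."""
    kept = []
    for key, amount, sign in rows:
        if sign == -1 and (key, amount, 1) in kept:
            kept.remove((key, amount, 1))   # cancel the matching state row
        else:
            kept.append((key, amount, sign))
    return kept

before_merge = signed_sum(rows)            # rows not collapsed yet
after_merge = signed_sum(collapse(rows))   # rows after the toy merge
```

Both totals are the same, which is why a query written this way does not need FINAL.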

2 - Part names & MVCC

Part names & multiversion concurrency control.

Part name format is:

<partitionid>_<min_block_number>_<max_block_number>_<level>_<data_version>

The system.parts table exposes each of these components as a separate column.

partitionid is quite simple (it just comes from your partitioning key).
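
As a rough illustration of the format above, a part name can be split into its components like this. This is a Python sketch, not the parser ClickHouse uses: the names PartName and parse_part_name are hypothetical, and it assumes the partition id itself contains no underscore.

```python
from typing import NamedTuple, Optional

class PartName(NamedTuple):
    partition_id: str
    min_block: int
    max_block: int
    level: int
    data_version: Optional[int]   # None when the name has no fifth section

def parse_part_name(name: str) -> PartName:
    # Assumption: partition_id contains no "_" (true for ids like 202203).
    fields = name.split("_")
    if len(fields) not in (4, 5):
        raise ValueError(f"unexpected part name: {name!r}")
    partition_id = fields[0]
    numbers = [int(f) for f in fields[1:]]
    min_block, max_block, level = numbers[:3]
    data_version = numbers[3] if len(numbers) == 4 else None
    return PartName(partition_id, min_block, max_block, level, data_version)

plain = parse_part_name("202203_1_3_1")
mutated = parse_part_name("202203_1_8_2_7")
```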

What are block_numbers?

DROP TABLE IF EXISTS part_names;
create table part_names (date Date, n UInt8, m UInt8) engine=MergeTree PARTITION BY toYYYYMM(date) ORDER BY n;

insert into part_names VALUES (now(), 0, 0);
select name, partition_id, min_block_number, max_block_number, level, data_version from system.parts where table = 'part_names' and active;
┌─name─────────┬─partition_id─┬─min_block_number─┬─max_block_number─┬─level─┬─data_version─┐
│ 202203_1_1_0 │ 202203       │                1 │                1 │     0 │            1 │
└──────────────┴──────────────┴──────────────────┴──────────────────┴───────┴──────────────┘

insert into part_names VALUES (now(), 0, 0);
select name, partition_id, min_block_number, max_block_number, level, data_version from system.parts where table = 'part_names' and active;
┌─name─────────┬─partition_id─┬─min_block_number─┬─max_block_number─┬─level─┬─data_version─┐
│ 202203_1_1_0 │ 202203       │                1 │                1 │     0 │            1 │
│ 202203_2_2_0 │ 202203       │                2 │                2 │     0 │            2 │
└──────────────┴──────────────┴──────────────────┴──────────────────┴───────┴──────────────┘

insert into part_names VALUES (now(), 0, 0);
select name, partition_id, min_block_number, max_block_number, level, data_version from system.parts where table = 'part_names' and active;
┌─name─────────┬─partition_id─┬─min_block_number─┬─max_block_number─┬─level─┬─data_version─┐
│ 202203_1_1_0 │ 202203       │                1 │                1 │     0 │            1 │
│ 202203_2_2_0 │ 202203       │                2 │                2 │     0 │            2 │
│ 202203_3_3_0 │ 202203       │                3 │                3 │     0 │            3 │
└──────────────┴──────────────┴──────────────────┴──────────────────┴───────┴──────────────┘

As you can see, every insert creates a part with a new, incremented block number, which is written into the part name as both <min_block_number> and <max_block_number> (the level is 0, meaning that the part has never been merged).

Block numbering is scoped to a single partition (for Replicated tables) or is global across all partitions (for plain MergeTree tables).

ClickHouse only ever merges parts with contiguous block ranges, and the name of the new part always carries the minimum and maximum block numbers of the parts it was built from.

OPTIMIZE TABLE part_names;

┌─name─────────┬─partition_id─┬─min_block_number─┬─max_block_number─┬─level─┬─data_version─┐
│ 202203_1_3_1 │ 202203       │                1 │                3 │     1 │            1 │
└──────────────┴──────────────┴──────────────────┴──────────────────┴───────┴──────────────┘

As you can see here, three parts (with block numbers 1, 2 and 3) were merged into a new part whose name carries 1_3 as the min/max block numbers. The level was incremented.

Even while the previous (merged) parts still exist on the filesystem for a while (as inactive), ClickHouse is smart enough to understand that the new part ‘covers’ the same range of blocks as the 3 parts of the previous ‘generation’.
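
The ‘covers’ idea can be sketched in a few lines of Python. This is a simplified model and not the actual ClickHouse implementation: the Part tuple and the covers function are illustrative names, and mutation versions are ignored here.

```python
from typing import NamedTuple

class Part(NamedTuple):
    partition_id: str
    min_block: int
    max_block: int
    level: int

def covers(new: Part, old: Part) -> bool:
    """True if `new` spans the whole block range of `old` in the same partition."""
    return (new.partition_id == old.partition_id
            and new.min_block <= old.min_block
            and new.max_block >= old.max_block
            and new.level >= old.level)

merged = Part("202203", 1, 3, 1)                          # 202203_1_3_1
old_parts = [Part("202203", n, n, 0) for n in (1, 2, 3)]  # 202203_1_1_0 ... 202203_3_3_0
later_insert = Part("202203", 4, 4, 0)                    # 202203_4_4_0

all_covered = all(covers(merged, p) for p in old_parts)
```

Under this model the merged part covers all three old parts, so they can safely be treated as obsolete, while a part created by a later insert is not covered.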

There may be a fifth section in the part name: the data version.

The data version is increased when a part is mutated.

Every mutation takes one block number:

insert into part_names VALUES (now(), 0, 0);
insert into part_names VALUES (now(), 0, 0);
insert into part_names VALUES (now(), 0, 0);

select name, partition_id, min_block_number, max_block_number, level, data_version from system.parts where table = 'part_names' and active;

┌─name─────────┬─partition_id─┬─min_block_number─┬─max_block_number─┬─level─┬─data_version─┐
│ 202203_1_3_1 │ 202203       │                1 │                3 │     1 │            1 │
│ 202203_4_4_0 │ 202203       │                4 │                4 │     0 │            4 │
│ 202203_5_5_0 │ 202203       │                5 │                5 │     0 │            5 │
│ 202203_6_6_0 │ 202203       │                6 │                6 │     0 │            6 │
└──────────────┴──────────────┴──────────────────┴──────────────────┴───────┴──────────────┘

alter table part_names update m=n where 1;

insert into part_names VALUES (now(), 0, 0);

select name, partition_id, min_block_number, max_block_number, level, data_version from system.parts where table = 'part_names' and active;

┌─name───────────┬─partition_id─┬─min_block_number─┬─max_block_number─┬─level─┬─data_version─┐
│ 202203_1_3_1_7 │ 202203       │                1 │                3 │     1 │            7 │
│ 202203_4_4_0_7 │ 202203       │                4 │                4 │     0 │            7 │
│ 202203_5_5_0_7 │ 202203       │                5 │                5 │     0 │            7 │
│ 202203_6_6_0_7 │ 202203       │                6 │                6 │     0 │            7 │
│ 202203_8_8_0   │ 202203       │                8 │                8 │     0 │            8 │
└────────────────┴──────────────┴──────────────────┴──────────────────┴───────┴──────────────┘

OPTIMIZE TABLE part_names;

select name, partition_id, min_block_number, max_block_number, level, data_version from system.parts where table = 'part_names' and active;
┌─name───────────┬─partition_id─┬─min_block_number─┬─max_block_number─┬─level─┬─data_version─┐
│ 202203_1_8_2_7 │ 202203       │                1 │                8 │     2 │            7 │
└────────────────┴──────────────┴──────────────────┴──────────────────┴───────┴──────────────┘

3 - How to pick an ORDER BY / PRIMARY KEY / PARTITION BY for the MergeTree-family table

How to pick an ORDER BY / PRIMARY KEY

A good ORDER BY usually has 3 to 5 columns, ordered from the lowest cardinality on the left (the columns most important for filtering) to the highest cardinality on the right (the columns less important for filtering).

A practical approach to creating a good ORDER BY for a table:

  1. Pick the columns you always use in filtering.
  2. The column that is most important for filtering and has the lowest cardinality should be the left-most. Typically it’s something like tenant_id.
  3. The next column has higher cardinality and is less important. It can be a rounded time, or site_id, source_id, group_id or something similar.
  4. Repeat step 3 once more (or a few times).
  5. If you have already added all the columns important for filtering and the key still does not address a single row, you can add more columns that help to place similar records close to each other (to improve compression).
  6. If there is a hierarchy / tree-like relation between the columns, list them from ‘root’ to ‘leaves’, for example (continent, country, cityname). This way ClickHouse can do a lookup by country / city even if the continent is not specified (it will just ‘check all continents’).
  7. Special variants of MergeTree may require a special ORDER BY to make the record unique, etc.

Some examples of a good ORDER BY:

ORDER BY (tenantid, site_id, utm_source, clientid, timestamp)
ORDER BY (site_id, toStartOfHour(timestamp), sessionid, timestamp )
PRIMARY KEY (site_id, toStartOfHour(timestamp), sessionid)

For Summing / Aggregating

All dimensions go into ORDER BY, all metrics stay outside of it.

The columns that are most important for filtering and have the lowest cardinality should be the left-most.

If the number of dimensions is high, it typically makes sense to use a prefix of ORDER BY as the PRIMARY KEY to avoid polluting the sparse index.

Examples:

ORDER BY (tenant_id, hour, country_code, team_id, group_id, source_id)
PRIMARY KEY (tenant_id, hour, country_code, team_id)

For Replacing / Collapsing

You need to keep all ‘mutable’ columns outside of ORDER BY and have some unique id (the basis for collapsing duplicates) inside it. Typically the right-most column is some row identifier, and it’s often not needed in the sparse index (so the PRIMARY KEY can be a prefix of ORDER BY). The rest of the considerations are the same.

Examples:

ORDER BY (tenantid, site_id, eventid) --  utm_source is mutable, while tenantid and site_id are not
PRIMARY KEY (tenantid, site_id) -- eventid is not used for filtering, needed only for collapsing duplicates

PARTITION BY

  • A good size for a single partition is something like 1-300 GB.
  • For Summing/Replacing a bit smaller (400 MB-40 GB).
  • It is better to avoid touching more than a few dozen partitions with a typical SELECT query.
  • A single insert should bring data to one or a few partitions.
  • The number of partitions in a table should be in the dozens or hundreds, not thousands.

You can check the size of partitions in the system.parts table.

Examples:

-- for time-series:
PARTITION BY toYYYY(timestamp)          -- long retention, not too much data
PARTITION BY toYYYYMM(timestamp)        --  
PARTITION BY toMonday(timestamp)        -- 
PARTITION BY toDate(timestamp)          --
PARTITION BY toStartOfHour(timestamp)   -- short retention, lot of data

-- for table with some incremental (non time-bounded) counter

PARTITION BY intDiv(transaction_id, 1000000)

-- for some dimension tables (always requested with WHERE userid)
PARTITION BY userid % 16

For small tables (smaller than a few gigabytes) partitioning is usually not needed at all (just skip the PARTITION BY expression when you create the table).
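
To get a feeling for the ‘dozens or hundreds, not thousands’ rule, the following Python sketch counts how many partitions each of the time-series expressions above would produce for one year of data. It is only an approximation of the ClickHouse functions with plain Python date arithmetic; the choice of 2023 and the helper name partitions_per_year are assumptions made for the example.

```python
from datetime import datetime, timedelta

# Python stand-ins for the ClickHouse partition expressions listed above.
PARTITION_EXPRESSIONS = {
    "toYYYY":        lambda ts: ts.year,
    "toYYYYMM":      lambda ts: ts.year * 100 + ts.month,
    "toMonday":      lambda ts: ts.date() - timedelta(days=ts.weekday()),
    "toDate":        lambda ts: ts.date(),
    "toStartOfHour": lambda ts: ts.replace(minute=0, second=0, microsecond=0),
}

def partitions_per_year(year):
    """Count distinct partition values for one timestamp per hour of `year`."""
    hours = 24 * (datetime(year + 1, 1, 1) - datetime(year, 1, 1)).days
    stamps = [datetime(year, 1, 1) + timedelta(hours=h) for h in range(hours)]
    return {name: len({expr(ts) for ts in stamps})
            for name, expr in PARTITION_EXPRESSIONS.items()}

counts = partitions_per_year(2023)
for name, n in counts.items():
    print(f"PARTITION BY {name}(timestamp): {n} partitions per year")
```

With a year of retention, hourly partitioning already lands in the thousands, which matches the ‘short retention’ note next to toStartOfHour.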

4 - AggregatingMergeTree

Q. What happens with columns that are neither part of the ORDER BY key nor of the AggregateFunction type?

A. It picks the first value met (similar to any).

CREATE TABLE agg_test
(
    `a` String,
    `b` UInt8,
    `c` SimpleAggregateFunction(max, UInt8)
)
ENGINE = AggregatingMergeTree
ORDER BY a;

INSERT INTO agg_test VALUES ('a', 1, 1);
INSERT INTO agg_test VALUES ('a', 2, 2);

SELECT * FROM agg_test FINAL;

┌─a─┬─b─┬─c─┐
│ a │ 1 │ 2 │
└───┴───┴───┘

INSERT INTO agg_test VALUES ('a', 3, 3);

SELECT * FROM agg_test;

┌─a─┬─b─┬─c─┐
│ a │ 1 │ 2 │
└───┴───┴───┘
┌─a─┬─b─┬─c─┐
│ a │ 3 │ 3 │
└───┴───┴───┘

OPTIMIZE TABLE agg_test FINAL;

SELECT * FROM agg_test;

┌─a─┬─b─┬─c─┐
│ a │ 1 │ 3 │
└───┴───┴───┘
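
The behavior shown above can be modeled with a short Python sketch. It is not how ClickHouse implements merges: merge_rows is a hypothetical helper that applies the aggregate function to the listed columns and keeps the first value seen for everything else.

```python
def merge_rows(rows, key, aggregates):
    """rows: dicts in insertion order; aggregates: column name -> combine function."""
    merged = {}
    for row in rows:
        k = row[key]
        if k not in merged:
            merged[k] = dict(row)      # first row for this key: keep all columns as-is
            continue
        for col, combine in aggregates.items():
            merged[k][col] = combine(merged[k][col], row[col])
        # any other column (here: b) silently keeps the first value met
    return list(merged.values())

rows = [
    {"a": "a", "b": 1, "c": 1},
    {"a": "a", "b": 2, "c": 2},
    {"a": "a", "b": 3, "c": 3},
]
result = merge_rows(rows, key="a", aggregates={"c": max})
```

The single resulting row has b = 1 (first value) and c = 3 (max), the same as the output after OPTIMIZE.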

Last non-null value for each column

CREATE TABLE test_last
(
    `col1` Int32,
    `col2` SimpleAggregateFunction(anyLast, Nullable(DateTime)),
    `col3` SimpleAggregateFunction(anyLast, Nullable(DateTime))
)
ENGINE = AggregatingMergeTree
ORDER BY col1

Ok.

0 rows in set. Elapsed: 0.003 sec.

INSERT INTO test_last (col1, col2) VALUES (1, now());

Ok.

1 rows in set. Elapsed: 0.014 sec.

INSERT INTO test_last (col1, col3) VALUES (1, now())

Ok.

1 rows in set. Elapsed: 0.006 sec.

SELECT
    col1,
    anyLast(col2),
    anyLast(col3)
FROM test_last
GROUP BY col1

┌─col1─┬───────anyLast(col2)─┬───────anyLast(col3)─┐
│    1 │ 2020-01-16 20:57:46 │ 2020-01-16 20:57:51 │
└──────┴─────────────────────┴─────────────────────┘

1 rows in set. Elapsed: 0.005 sec.

SELECT *
FROM test_last
FINAL

┌─col1─┬────────────────col2─┬────────────────col3─┐
│    1 │ 2020-01-16 20:57:46 │ 2020-01-16 20:57:51 │
└──────┴─────────────────────┴─────────────────────┘

1 rows in set. Elapsed: 0.003 sec. 
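
Why both columns end up filled can be sketched in Python: for a Nullable column, anyLast skips NULLs, so each column keeps its last non-NULL value independently of the others. The helper name any_last_non_null and the literal timestamps (copied from the output above) are for illustration only.

```python
from functools import reduce

def any_last_non_null(values):
    """Model of anyLast over a Nullable column: NULL (None) values are skipped."""
    return reduce(lambda cur, new: new if new is not None else cur, values, None)

# The two inserts from the example: each one fills only one of the two columns.
inserted = [
    {"col1": 1, "col2": "2020-01-16 20:57:46", "col3": None},
    {"col1": 1, "col2": None, "col3": "2020-01-16 20:57:51"},
]

merged = {
    "col1": 1,
    "col2": any_last_non_null(r["col2"] for r in inserted),
    "col3": any_last_non_null(r["col3"] for r in inserted),
}
```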

6 - Nulls in order by

  1. It is NOT RECOMMENDED for general use.
  2. Use at your own risk.
  3. Use the latest ClickHouse version if you need it.

CREATE TABLE x
(
    `a` Nullable(UInt32),
    `b` Nullable(UInt32),
    `cnt` UInt32
)
ENGINE = SummingMergeTree
ORDER BY (a, b)
SETTINGS allow_nullable_key = 1;

INSERT INTO x VALUES (Null,2,1), (Null,Null,1), (3, Null, 1), (4,4,1);
INSERT INTO x VALUES (Null,2,1), (Null,Null,1), (3, Null, 1), (4,4,1);

SELECT * FROM x;

┌────a─┬────b─┬─cnt─┐
│    3 │ null │   2 │
│    4 │    4 │   2 │
│ null │    2 │   2 │
│ null │ null │   2 │
└──────┴──────┴─────┘

7 - ReplacingMergeTree

Last state

CREATE TABLE repl_tbl
(
    `key` UInt32,
    `val_1` UInt32,
    `val_2` String,
    `val_3` String,
    `val_4` String,
    `val_5` UUID,
    `ts` DateTime
)
ENGINE = ReplacingMergeTree(ts)
ORDER BY key;

SYSTEM STOP MERGES repl_tbl;

INSERT INTO repl_tbl SELECT number as key, rand() as val_1, randomStringUTF8(10) as val_2, randomStringUTF8(5) as val_3, randomStringUTF8(4) as val_4, generateUUIDv4() as val_5, now() as ts FROM numbers(10000000);
INSERT INTO repl_tbl SELECT number as key, rand() as val_1, randomStringUTF8(10) as val_2, randomStringUTF8(5) as val_3, randomStringUTF8(4) as val_4, generateUUIDv4() as val_5, now() as ts FROM numbers(10000000);
INSERT INTO repl_tbl SELECT number as key, rand() as val_1, randomStringUTF8(10) as val_2, randomStringUTF8(5) as val_3, randomStringUTF8(4) as val_4, generateUUIDv4() as val_5, now() as ts FROM numbers(10000000);
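INSERT INTO repl_tbl SELECT number as key, rand() as val_1, randomStringUTF8(10) as val_2, randomStringUTF8(5) as val_3, randomStringUTF8(4) as val_4, generateUUIDv4() as val_5, now() as ts FROM numbers(10000000);
INSERT INTO repl_tbl SELECT number as key, rand() as val_1, randomStringUTF8(10) as val_2, randomStringUTF8(5) as val_3, randomStringUTF8(4) as val_4, generateUUIDv4() as val_5, now() as ts FROM numbers(10000000);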

SELECT count() FROM repl_tbl

┌──count()─┐
│ 50000000 │
└──────────┘

Single key

-- GROUP BY
SELECT key, argMax(val_1, ts) as val_1, argMax(val_2, ts) as val_2, argMax(val_3, ts) as val_3, argMax(val_4, ts) as val_4, argMax(val_5, ts) as val_5, max(ts) FROM repl_tbl WHERE key = 10 GROUP BY key;
1 rows in set. Elapsed: 0.017 sec. Processed 40.96 thousand rows, 5.24 MB (2.44 million rows/s., 312.31 MB/s.)

-- ORDER BY LIMIT BY
SELECT * FROM repl_tbl WHERE key = 10 ORDER BY ts DESC LIMIT 1 BY key ;
1 rows in set. Elapsed: 0.017 sec. Processed 40.96 thousand rows, 5.24 MB (2.39 million rows/s., 305.41 MB/s.)

-- Subquery
SELECT * FROM repl_tbl WHERE key = 10 AND ts = (SELECT max(ts) FROM repl_tbl WHERE key = 10);
1 rows in set. Elapsed: 0.019 sec. Processed 40.96 thousand rows, 1.18 MB (2.20 million rows/s., 63.47 MB/s.)

-- FINAL
SELECT * FROM repl_tbl FINAL WHERE key = 10;
1 rows in set. Elapsed: 0.021 sec. Processed 40.96 thousand rows, 5.24 MB (1.93 million rows/s., 247.63 MB/s.)

Multiple keys

-- GROUP BY
SELECT key, argMax(val_1, ts) as val_1, argMax(val_2, ts) as val_2, argMax(val_3, ts) as val_3, argMax(val_4, ts) as val_4, argMax(val_5, ts) as val_5, max(ts) FROM repl_tbl WHERE key IN (SELECT toUInt32(number) FROM numbers(1000000) WHERE number % 100) GROUP BY key FORMAT Null;
Peak memory usage (for query): 2.31 GiB.
0 rows in set. Elapsed: 3.264 sec. Processed 5.04 million rows, 645.01 MB (1.54 million rows/s., 197.60 MB/s.)

-- set optimize_aggregation_in_order=1;
Peak memory usage (for query): 1.11 GiB.
0 rows in set. Elapsed: 1.772 sec. Processed 2.74 million rows, 350.30 MB (1.54 million rows/s., 197.73 MB/s.)

-- ORDER BY LIMIT BY
SELECT * FROM repl_tbl WHERE key IN (SELECT toUInt32(number) FROM numbers(1000000) WHERE number % 100) ORDER BY ts DESC LIMIT 1 BY key FORMAT Null;
Peak memory usage (for query): 1.08 GiB.
0 rows in set. Elapsed: 2.429 sec. Processed 5.04 million rows, 645.01 MB (2.07 million rows/s., 265.58 MB/s.)

-- Subquery
SELECT * FROM repl_tbl WHERE (key, ts) IN (SELECT key, max(ts) FROM repl_tbl WHERE key IN (SELECT toUInt32(number) FROM numbers(1000000) WHERE number % 100) GROUP BY key) FORMAT Null;
Peak memory usage (for query): 432.57 MiB.
0 rows in set. Elapsed: 0.939 sec. Processed 5.04 million rows, 160.33 MB (5.36 million rows/s., 170.69 MB/s.)

-- set optimize_aggregation_in_order=1;
Peak memory usage (for query): 202.88 MiB.
0 rows in set. Elapsed: 0.824 sec. Processed 5.04 million rows, 160.33 MB (6.11 million rows/s., 194.58 MB/s.)

-- FINAL
SELECT * FROM repl_tbl FINAL WHERE key IN (SELECT toUInt32(number) FROM numbers(1000000) WHERE number % 100) FORMAT Null;
Peak memory usage (for query): 198.32 MiB.
0 rows in set. Elapsed: 1.211 sec. Processed 5.04 million rows, 645.01 MB (4.16 million rows/s., 532.57 MB/s.)

Full table

-- GROUP BY
SELECT key, argMax(val_1, ts) as val_1, argMax(val_2, ts) as val_2, argMax(val_3, ts) as val_3, argMax(val_4, ts) as val_4, argMax(val_5, ts) as val_5, max(ts) FROM repl_tbl GROUP BY key FORMAT Null;
Peak memory usage (for query): 15.02 GiB.
0 rows in set. Elapsed: 19.164 sec. Processed 50.00 million rows, 6.40 GB (2.61 million rows/s., 334.02 MB/s.)

-- set optimize_aggregation_in_order=1;
Peak memory usage (for query): 4.44 GiB.
0 rows in set. Elapsed: 9.700 sec. Processed 21.03 million rows, 2.69 GB (2.17 million rows/s., 277.50 MB/s.)

-- ORDER BY LIMIT BY
SELECT * FROM repl_tbl ORDER BY ts DESC LIMIT 1 BY key FORMAT Null;
Peak memory usage (for query): 10.46 GiB.
0 rows in set. Elapsed: 21.264 sec. Processed 50.00 million rows, 6.40 GB (2.35 million rows/s., 301.03 MB/s.)

-- Subquery
SELECT * FROM repl_tbl WHERE (key, ts) IN (SELECT key, max(ts) FROM repl_tbl GROUP BY key) FORMAT Null;
Peak memory usage (for query): 2.52 GiB.
0 rows in set. Elapsed: 6.891 sec. Processed 50.00 million rows, 1.60 GB (7.26 million rows/s., 232.22 MB/s.)

-- set optimize_aggregation_in_order=1;
Peak memory usage (for query): 1.05 GiB.
0 rows in set. Elapsed: 4.427 sec. Processed 50.00 million rows, 1.60 GB (11.29 million rows/s., 361.49 MB/s.)

-- FINAL
SELECT * FROM repl_tbl FINAL FORMAT Null;
Peak memory usage (for query): 838.75 MiB.
0 rows in set. Elapsed: 6.681 sec. Processed 50.00 million rows, 6.40 GB (7.48 million rows/s., 958.18 MB/s.)

FINAL

ClickHouse merges parts only within a single partition, so if two rows with the same replacing key land in different partitions, they will never be merged into a single row. The FINAL keyword works differently: it merges all rows across all partitions. That behavior can be changed via the do_not_merge_across_partitions_select_final setting.

https://kb.altinity.com

FINAL clause speed

CREATE TABLE repl_tbl_part
(
    `key` UInt32,
    `value` UInt32,
    `part_key` UInt32
)
ENGINE = ReplacingMergeTree
PARTITION BY part_key
ORDER BY key;

INSERT INTO repl_tbl_part SELECT
    1 AS key,
    number AS value,
    number % 2 AS part_key
FROM numbers(4)
SETTINGS optimize_on_insert = 0;

SELECT * FROM repl_tbl_part;

┌─key─┬─value─┬─part_key─┐
│   1 │     1 │        1 │
│   1 │     3 │        1 │
└─────┴───────┴──────────┘
┌─key─┬─value─┬─part_key─┐
│   1 │     0 │        0 │
│   1 │     2 │        0 │
└─────┴───────┴──────────┘

SELECT * FROM repl_tbl_part FINAL;

┌─key─┬─value─┬─part_key─┐
│   1 │     3 │        1 │
└─────┴───────┴──────────┘

SELECT * FROM repl_tbl_part FINAL SETTINGS do_not_merge_across_partitions_select_final=1;

┌─key─┬─value─┬─part_key─┐
│   1 │     3 │        1 │
└─────┴───────┴──────────┘
┌─key─┬─value─┬─part_key─┐
│   1 │     2 │        0 │
└─────┴───────┴──────────┘

OPTIMIZE TABLE repl_tbl_part FINAL;

SELECT * FROM repl_tbl_part;

┌─key─┬─value─┬─part_key─┐
│   1 │     3 │        1 │
└─────┴───────┴──────────┘
┌─key─┬─value─┬─part_key─┐
│   1 │     2 │        0 │
└─────┴───────┴──────────┘
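
The difference between the two FINAL modes can be modeled in a few lines of Python. This is only a sketch of the observable result, not of the ClickHouse merge algorithm. It assumes that, without a version column, the row inserted last wins; replacing_final and its per_partition flag are hypothetical names.

```python
def replacing_final(rows, per_partition=False):
    """rows: (key, value, part_key) in insertion order; the last row per group wins.

    per_partition=False models plain FINAL (deduplicate by key across partitions).
    per_partition=True models do_not_merge_across_partitions_select_final = 1 and
    OPTIMIZE ... FINAL, which deduplicate only inside each partition.
    """
    latest = {}
    for key, value, part_key in rows:
        group = (part_key, key) if per_partition else key
        latest[group] = (key, value, part_key)
    return sorted(latest.values())

# Same data as the INSERT above: key = 1, value = 0..3, part_key = value % 2
rows = [(1, n, n % 2) for n in range(4)]

across_partitions = replacing_final(rows)
within_partitions = replacing_final(rows, per_partition=True)
```

The first call leaves one row, the second leaves one row per partition, which mirrors the three results shown above.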

7.1 - ReplacingMergeTree does not collapse duplicates

Hi there, I have a question about replacing merge trees. I have set up a Materialized View with ReplacingMergeTree table, but even if I call optimize on it, the parts don’t get merged. I filled that table yesterday, nothing happened since then. What should I do?

Merges are eventual and may never happen. Whether they happen depends on the number of inserts that came afterwards, the number of parts in the partition, and the size of the parts. If the total size of the input parts is greater than the maximum part size, they will never be merged.

https://clickhouse.tech/docs/en/operations/settings/merge-tree-settings/#max-bytes-to-merge-at-max-space-in-pool

https://clickhouse.tech/docs/en/engines/table-engines/mergetree-family/replacingmergetree/

ReplacingMergeTree is suitable for clearing out duplicate data in the background in order to save space, but it doesn’t guarantee the absence of duplicates.

8 - Skip index

--(1) create test table
drop table if exists test;
create table test
(
    version UInt32
    ,id UInt32
    ,state UInt8
    ,INDEX state_idx (state) type set(0) GRANULARITY 1
) ENGINE ReplacingMergeTree(version)
      ORDER BY (id);

--(2) insert sample data
INSERT INTO test (version, id, state) VALUES (1,1,1);
INSERT INTO test (version, id, state) VALUES (2,1,0);
INSERT INTO test (version, id, state) VALUES (3,1,1);

--(3) check the result:
-- expected 3, 1, 1
select version, id, state from test final;
┌─version─┬─id─┬─state─┐
│       3 │  1 │     1 │
└─────────┴────┴───────┘

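-- expected an empty result, but the outdated row (version 2) is returned:
-- the skip index excludes the data where state != 0 before FINAL is applied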
select version, id, state from test final where state=0;
┌─version─┬─id─┬─state─┐
│       2 │  1 │     0 │
└─────────┴────┴───────┘
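
A small Python model shows why the outdated row shows up. It is a deliberately simplified sketch: each INSERT is treated as one part with a single granule, and the function names final and select_final_where_state are hypothetical. The point is only the order of operations: index pruning happens before FINAL picks the newest version.

```python
# One part per INSERT, rows are (version, id, state).
parts = [
    [(1, 1, 1)],
    [(2, 1, 0)],
    [(3, 1, 1)],
]

def final(rows):
    """Keep the row with the highest version for every id (ReplacingMergeTree(version))."""
    latest = {}
    for version, id_, state in rows:
        if id_ not in latest or version > latest[id_][0]:
            latest[id_] = (version, id_, state)
    return list(latest.values())

def select_final_where_state(parts, state, use_skip_index):
    if use_skip_index:
        # set(0) index: a part is read only if it contains the requested state
        parts = [p for p in parts if any(row[2] == state for row in p)]
    rows = [row for p in parts for row in p]
    return [row for row in final(rows) if row[2] == state]

with_index = select_final_where_state(parts, state=0, use_skip_index=True)
without_index = select_final_where_state(parts, state=0, use_skip_index=False)
```

With pruning, the part holding version 3 is never read, so version 2 looks like the latest state. Without pruning the result is empty, as expected.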

9 - SummingMergeTree

Nested structures

Under certain conditions it can make sense to collapse one of the dimensions into a set of arrays. This usually pays off if the dimension is not commonly used in queries. It reduces the number of rows in the aggregated table and speeds up queries that don’t care about this dimension, at the cost of slower aggregation by the collapsed dimension.

CREATE TABLE traffic
(
    `key1` UInt32,
    `key2` UInt32,
    `port` UInt16,
    `bits_in` UInt32 CODEC (T64,LZ4),
    `bits_out` UInt32 CODEC (T64,LZ4),
    `packets_in` UInt32 CODEC (T64,LZ4),
    `packets_out` UInt32 CODEC (T64,LZ4)
)
ENGINE = SummingMergeTree
ORDER BY (key1, key2, port);

INSERT INTO traffic SELECT
    number % 1000,
    intDiv(number, 10000),
    rand() % 20,
    rand() % 753,
    rand64() % 800,
    rand() % 140,
    rand64() % 231
FROM numbers(100000000);

CREATE TABLE default.traffic_map
(
    `key1` UInt32,
    `key2` UInt32,
    `bits_in` UInt32 CODEC(T64, LZ4),
    `bits_out` UInt32 CODEC(T64, LZ4),
    `packets_in` UInt32 CODEC(T64, LZ4),
    `packets_out` UInt32 CODEC(T64, LZ4),
    `portMap.port` Array(UInt16),
    `portMap.bits_in` Array(UInt32) CODEC(T64, LZ4),
    `portMap.bits_out` Array(UInt32) CODEC(T64, LZ4),
    `portMap.packets_in` Array(UInt32) CODEC(T64, LZ4),
    `portMap.packets_out` Array(UInt32) CODEC(T64, LZ4)
)
ENGINE = SummingMergeTree
ORDER BY (key1, key2);

INSERT INTO traffic_map WITH rand() % 20 AS port
SELECT
    number % 1000 AS key1,
    intDiv(number, 10000) AS key2,
    rand() % 753 AS bits_in,
    rand64() % 800 AS bits_out,
    rand() % 140 AS packets_in,
    rand64() % 231 AS packets_out,
    [port],
    [bits_in],
    [bits_out],
    [packets_in],
    [packets_out]
FROM numbers(100000000);
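
What SummingMergeTree does with the portMap.* arrays when rows with the same (key1, key2) are merged can be sketched in Python as follows. This is a simplified model with a single value array (packets_in) and invented sample numbers; sum_nested is a hypothetical helper, not a ClickHouse function.

```python
def sum_nested(rows):
    """rows: (key1, key2, ports, packets_in), where ports and packets_in are
    parallel arrays like portMap.port and portMap.packets_in."""
    merged = {}
    for key1, key2, ports, packets in rows:
        per_port = merged.setdefault((key1, key2), {})
        for port, value in zip(ports, packets):
            per_port[port] = per_port.get(port, 0) + value   # sum values by port
    # turn each per-port dict back into two parallel arrays, sorted by port
    return {k: (sorted(v), [v[p] for p in sorted(v)]) for k, v in merged.items()}

rows = [
    (1, 0, [80], [10]),
    (1, 0, [443], [5]),
    (1, 0, [80], [7]),
    (2, 0, [22], [1]),
]
collapsed = sum_nested(rows)
```

Four input rows become two: the port dimension moved from the sorting key into the arrays, which is where the row-count reduction in the traffic_map table comes from.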

┌─table───────┬─column──────────────┬─────rows─┬─compressed─┬─uncompressed─┬──ratio─┐
│ traffic     │ bits_out            │ 80252317 │ 109.09 MiB │ 306.14 MiB   │   2.81 │
│ traffic     │ bits_in             │ 80252317 │ 108.34 MiB │ 306.14 MiB   │   2.83 │
│ traffic     │ port                │ 80252317 │ 99.21 MiB  │ 153.07 MiB   │   1.54 │
│ traffic     │ packets_out         │ 80252317 │ 91.36 MiB  │ 306.14 MiB   │   3.35 │
│ traffic     │ packets_in          │ 80252317 │ 84.61 MiB  │ 306.14 MiB   │   3.62 │
│ traffic     │ key2                │ 80252317 │ 47.88 MiB  │ 306.14 MiB   │   6.39 │
│ traffic     │ key1                │ 80252317 │ 1.38 MiB   │ 306.14 MiB   │ 221.42 │
│ traffic_map │ portMap.bits_out    │ 10000000 │ 108.96 MiB │ 306.13 MiB   │   2.81 │
│ traffic_map │ portMap.bits_in     │ 10000000 │ 108.32 MiB │ 306.13 MiB   │   2.83 │
│ traffic_map │ portMap.port        │ 10000000 │ 92.00 MiB  │ 229.36 MiB   │   2.49 │
│ traffic_map │ portMap.packets_out │ 10000000 │ 90.95 MiB  │ 306.13 MiB   │   3.37 │
│ traffic_map │ portMap.packets_in  │ 10000000 │ 84.19 MiB  │ 306.13 MiB   │   3.64 │
│ traffic_map │ key2                │ 10000000 │ 23.46 MiB  │ 38.15 MiB    │   1.63 │
│ traffic_map │ bits_in             │ 10000000 │ 15.59 MiB  │ 38.15 MiB    │   2.45 │
│ traffic_map │ bits_out            │ 10000000 │ 15.59 MiB  │ 38.15 MiB    │   2.45 │
│ traffic_map │ packets_out         │ 10000000 │ 13.22 MiB  │ 38.15 MiB    │   2.89 │
│ traffic_map │ packets_in          │ 10000000 │ 12.62 MiB  │ 38.15 MiB    │   3.02 │
│ traffic_map │ key1                │ 10000000 │ 180.29 KiB │ 38.15 MiB    │ 216.66 │
└─────────────┴─────────────────────┴──────────┴────────────┴──────────────┴────────┘

-- Queries

SELECT
    key1,
    sum(packets_in),
    sum(bits_out)
FROM traffic
GROUP BY key1
FORMAT `Null`

0 rows in set. Elapsed: 0.488 sec. Processed 80.25 million rows, 963.03 MB (164.31 million rows/s., 1.97 GB/s.)

SELECT
    key1,
    sum(packets_in),
    sum(bits_out)
FROM traffic_map
GROUP BY key1
FORMAT `Null`

0 rows in set. Elapsed: 0.063 sec. Processed 10.00 million rows, 120.00 MB (159.43 million rows/s., 1.91 GB/s.)


SELECT
    key1,
    port,
    sum(packets_in),
    sum(bits_out)
FROM traffic
GROUP BY
    key1,
    port
FORMAT `Null`

0 rows in set. Elapsed: 0.668 sec. Processed 80.25 million rows, 1.12 GB (120.14 million rows/s., 1.68 GB/s.)

WITH arrayJoin(arrayZip(untuple(sumMap(portMap.port, portMap.packets_in, portMap.bits_out)))) AS tpl
SELECT
    key1,
    tpl.1 AS port,
    tpl.2 AS packets_in,
    tpl.3 AS bits_out
FROM traffic_map
GROUP BY key1
FORMAT `Null`

0 rows in set. Elapsed: 0.915 sec. Processed 10.00 million rows, 1.08 GB (10.93 million rows/s., 1.18 GB/s.)