This is the multi-page printable view of this section. Click here to print.

Return to the regular view of this page.

Useful queries

Access useful ClickHouse queries, from finding database size, missing blocks, checking table metadata in Zookeeper, and more.

1 - Check table metadata in zookeeper

Check table metadata in zookeeper.

Compare table metadata of different replicas in zookeeper

Metadata on replica is not up to date with common metadata in Zookeeper

-- Lists the 'metadata', 'columns' and 'is_active' znodes found under the table's
-- ZooKeeper path and under each of its replica paths.
-- looks_good = 0 marks a row whose value differs from the previous row with the same
-- znode name; 'is_active' rows and the first row of each name always get 1.
SELECT *, if( neighbor(name, -1) == name and name != 'is_active', neighbor(value, -1) == value , 1) as looks_good
FROM (
SELECT
    name,
    path,
    ctime,
    mtime,
    value
FROM system.zookeeper
WHERE (path IN (
    -- the table path itself plus one '<zookeeper_path>/replicas/<child name>' path per replica
    SELECT arrayJoin(groupUniqArray(if(path LIKE '%/replicas', concat(path, '/', name), path)))
    FROM system.zookeeper
    WHERE path IN (
        SELECT arrayJoin([zookeeper_path, concat(zookeeper_path, '/replicas')])
        FROM system.replicas
        WHERE table = 'test_repl' -- put the name of the table to check here
    )
)) AND (name IN ('metadata', 'columns', 'is_active'))
-- rows with the same znode name are kept adjacent ('is_active' last) because
-- neighbor() in the outer SELECT compares each row with the one before it
ORDER BY
    name = 'is_active',
    name ASC,
    path ASC
)

vs.

-- Local metadata of the same table: last metadata modification time and current DDL.
SELECT
    metadata_modification_time,
    create_table_query
FROM system.tables
WHERE name = 'test_repl'

2 - Debug hanging thing

Debug hanging / freezing things

Debug hanging / freezing things

If ClickHouse is busy with something and you don’t know what’s happening, you can easily check the stack traces of all the threads that are working

-- Groups the rows of system.stack_trace by their symbolized stack
-- (one frame per line) and shows the most frequent stacks first.
SELECT
    arrayStringConcat(
        arrayMap(addr -> demangle(addressToSymbol(addr)), trace),
        '\n'
    ) AS trace_functions,
    count()
FROM system.stack_trace
GROUP BY trace_functions
ORDER BY count() DESC
SETTINGS allow_introspection_functions = 1
FORMAT Vertical;

If you can’t start any queries, but you have access to the node, you can send a signal

# Each loop walks the task IDs listed under /proc/<pid>/task/ for the
# clickhouse-server process and sends the given signal to every one of them.
# NOTE(review): presumably the server answers by writing each thread's stack trace
# to its log -- confirm which signal your version handles before relying on it.
# older versions
for i in $(ls -1 /proc/$(pidof clickhouse-server)/task/); do kill -TSTP $i; done
# even older versions
for i in $(ls -1 /proc/$(pidof clickhouse-server)/task/); do kill -SIGPROF $i; done

3 - Handy queries for a system.query_log

Handy queries for a system.query_log.

The most cpu / write / read-intensive queries from query_log

-- Heaviest query shapes of the last hour, grouped by normalized query hash.
-- QueriesDuration is query_duration_ms converted to seconds; every *Time column is the
-- sum of the matching ProfileEvents counter converted from microseconds to seconds.
SELECT
    normalized_query_hash,
    any(query),
    count(),
    sum(query_duration_ms) / 1000 AS QueriesDuration,
    sum(ProfileEvents.Values[indexOf(ProfileEvents.Names, 'RealTimeMicroseconds')]) / 1000000 AS RealTime,
    sum(ProfileEvents.Values[indexOf(ProfileEvents.Names, 'UserTimeMicroseconds')]) / 1000000 AS UserTime,
    sum(ProfileEvents.Values[indexOf(ProfileEvents.Names, 'SystemTimeMicroseconds')]) / 1000000 AS SystemTime,
    sum(ProfileEvents.Values[indexOf(ProfileEvents.Names, 'DiskReadElapsedMicroseconds')]) / 1000000 AS DiskReadTime,
    sum(ProfileEvents.Values[indexOf(ProfileEvents.Names, 'DiskWriteElapsedMicroseconds')]) / 1000000 AS DiskWriteTime,
    sum(ProfileEvents.Values[indexOf(ProfileEvents.Names, 'NetworkSendElapsedMicroseconds')]) / 1000000 AS NetworkSendTime,
    sum(ProfileEvents.Values[indexOf(ProfileEvents.Names, 'NetworkReceiveElapsedMicroseconds')]) / 1000000 AS NetworkReceiveTime,
    sum(ProfileEvents.Values[indexOf(ProfileEvents.Names, 'ZooKeeperWaitMicroseconds')]) / 1000000 AS ZooKeeperWaitTime,
    sum(ProfileEvents.Values[indexOf(ProfileEvents.Names, 'OSIOWaitMicroseconds')]) / 1000000 AS OSIOWaitTime,
    sum(ProfileEvents.Values[indexOf(ProfileEvents.Names, 'OSCPUWaitMicroseconds')]) / 1000000 AS OSCPUWaitTime,
    sum(ProfileEvents.Values[indexOf(ProfileEvents.Names, 'OSCPUVirtualTimeMicroseconds')]) / 1000000 AS OSCPUVirtualTime,
    sum(read_rows) AS ReadRows,
    formatReadableSize(sum(read_bytes)) AS ReadBytes,
    sum(written_rows) AS WrittenRows,
    formatReadableSize(sum(written_bytes)) AS WrittenBytes,
    sum(result_rows) AS ResultRows,
    formatReadableSize(sum(result_bytes)) AS ResultBytes
FROM system.query_log
WHERE (event_time > (now() - 3600)) AND type in (2,4) -- QueryFinish, ExceptionWhileProcessing
GROUP BY normalized_query_hash
    WITH TOTALS
-- ranked by CPU user time; sort by another column above to rank by reads, writes, etc.
ORDER BY UserTime DESC
LIMIT 30
FORMAT Vertical

Find queries which were started but not finished at some moment in time

-- Queries logged today after the given moment that either have no QueryFinish
-- event or whose summed duration exceeds 100000 ms.
SELECT
    query_id,
    min(event_time) AS t,
    any(query)
FROM system.query_log
WHERE event_date = today()
  AND event_time > '2021-11-25 02:29:12'
GROUP BY query_id
HAVING countIf(type = 'QueryFinish') = 0
    OR sum(query_duration_ms) > 100000
ORDER BY t;

-- Queries whose count of type = 1 events (presumably QueryStart -- verify against the
-- query_log enum) differs from the count of all their other events in the time window.
SELECT
    query_id,
    any(query)
FROM system.query_log
WHERE event_time BETWEEN '2021-09-24 07:00:00' AND '2021-09-24 09:00:00'
GROUP BY query_id
HAVING countIf(type = 1) <> countIf(type != 1)

4 - Ingestion metrics from system.part_log

Query to gather information about ingestion rate from system.part_log.
-- Insert rate
-- Inner query: one row per (query_id, database, table, day) with statistics over the
-- NewPart events logged for it.
-- Outer query: per-table, per-day aggregates over those rows.
SELECT
    database,
    table,
    time_bucket,
    max(number_of_parts_per_insert) AS max_parts_pi,
    median(number_of_parts_per_insert) AS median_parts_pi,
    min(min_rows_per_part) AS min_rows_pp,
    max(max_rows_per_part) AS max_rows_pp,
    median(median_rows_per_part) AS median_rows_pp,
    min(rows_per_insert) AS min_rows_pi,
    median(rows_per_insert) AS median_rows_pi,
    max(rows_per_insert) AS max_rows_pi,
    sum(rows_per_insert) AS rows_inserted,
    sum(seconds_per_insert) AS parts_creation_seconds,
    count() AS inserts,
    sum(number_of_parts_per_insert) AS new_parts,
    max(last_part_pi) - min(first_part_pi) AS insert_period,
    inserts * 60 / insert_period AS inserts_per_minute
FROM
(
    SELECT
        database,
        table,
        toStartOfDay(event_time) AS time_bucket,
        count() AS number_of_parts_per_insert,
        min(rows) AS min_rows_per_part,
        max(rows) AS max_rows_per_part,
        median(rows) AS median_rows_per_part,
        sum(rows) AS rows_per_insert,
        min(size_in_bytes) AS min_bytes_per_part,
        max(size_in_bytes) AS max_bytes_per_part,
        median(size_in_bytes) AS median_bytes_per_part,
        sum(size_in_bytes) AS bytes_per_insert,
        median_bytes_per_part / median_rows_per_part AS avg_row_size,
        sum(duration_ms) / 1000 AS seconds_per_insert,
        max(event_time) AS last_part_pi,
        min(event_time) AS first_part_pi
    FROM system.part_log
    WHERE
        -- Enum8('NewPart' = 1, 'MergeParts' = 2, 'DownloadPart' = 3, 'RemovePart' = 4, 'MutatePart' = 5, 'MovePart' = 6)
        event_type = 1
        -- change if another time period is desired
        AND event_date >= today()
    GROUP BY
        query_id,
        database,
        table,
        time_bucket
)
GROUP BY
    database,
    table,
    time_bucket
ORDER BY
    time_bucket,
    database,
    table ASC

-- New parts per partition
-- Today's NewPart events per partition: how many parts were created and their average row count.
SELECT
    database,
    table,
    event_type,
    partition_id,
    count() AS c,
    round(avg(rows))
FROM system.part_log
WHERE event_date >= today()
  AND event_type = 'NewPart'
GROUP BY
    database,
    table,
    event_type,
    partition_id
ORDER BY c DESC

5 - Can detached parts be dropped?

Can detached parts be dropped?

Here is what different statuses mean:

  1. Parts are renamed to ‘ignored’ if they were found during ATTACH together with other, bigger parts that cover the same blocks of data, i.e. they were already merged into something else.
  2. parts are renamed to ‘broken’ if ClickHouse was not able to load data from the parts. There could be different reasons: some files are lost, checksums are not correct, etc.
  3. parts are renamed to ‘unexpected’ if they are present locally, but are not found in ZooKeeper, in case when an insert was not completed properly. The part is detached only if it’s old enough (5 minutes), otherwise CH registers this part in ZooKeeper as a new part.
  4. parts are renamed to ‘cloned’ if ClickHouse already had some parts on the local disk while repairing a lost replica; those pre-existing parts are renamed and put into the detached directory. Controlled by the setting detach_old_local_parts_when_cloning_replica.

‘Ignored’ parts are safe to delete. ‘Unexpected’ and ‘broken’ should be investigated, but it might not be an easy thing to do, especially for older parts. If the system.part_log table is enabled you can find some information there. Otherwise you will need to look in clickhouse-server.log for what happened when the parts were detached. If there is another way you could confirm that there is no data loss in the affected tables, you could simply delete all detached parts.

Here is a query that can help with investigations. It looks for active parts containing the same data blocks as the detached parts:

-- For every detached part, lists the active parts of the same partition whose block
-- range covers it, plus a ready-to-run statement that drops the detached part.
-- INNER JOIN is deliberate. With LEFT JOIN and the default join_use_nulls = 0, a detached
-- part with no active part in its partition gets b.min_block_number = b.max_block_number = 0,
-- so a part with block range 0..0 (e.g. all_0_0_0) would pass the range filter below and
-- be reported as covered although nothing covers it.
SELECT *,
       concat('alter table ',database,'.',table,' drop detached part ''',a.name,''' settings allow_drop_detached=1;') as drop
FROM system.detached_parts a
ALL INNER JOIN
(SELECT database, table, partition_id, name, active, min_block_number, max_block_number
   FROM system.parts WHERE active
) b
USING (database, table, partition_id)
-- the active part's block range must fully contain the detached part's block range
WHERE a.min_block_number >= b.min_block_number
  AND a.max_block_number <= b.max_block_number

6 - Database Size - Table - Column size

Database Size - Table - Column size

Tables

Table size

-- Size of every table computed from its active parts, largest (compressed) first.
-- The LIKE '%' patterns match everything; narrow them to filter by database / table.
SELECT
    database,
    table,
    formatReadableSize(sum(data_compressed_bytes) AS size) AS compressed,
    formatReadableSize(sum(data_uncompressed_bytes) AS usize) AS uncompressed,
    round(usize / size, 2) AS compr_rate,
    sum(rows) AS rows,
    count() AS part_count
FROM system.parts
WHERE active = 1
  AND database LIKE '%'
  AND table LIKE '%'
GROUP BY database, table
ORDER BY size DESC;

Table size + inner MatView (Atomic)

-- Table sizes in which a storage table named '.inner_id.<uuid>' is labelled with the
-- name of the system.tables entry that owns that uuid (the materialized view).
-- Every system.parts column is qualified with p. because system.tables is joined in and
-- also has a `database` column; an unqualified reference is ambiguous and may be
-- rejected depending on the server's query analyzer.
SELECT
    p.database,
    if(t.name = '', p.table, p.table || ' (' || t.name || ')') AS tbl,
    formatReadableSize(sum(p.data_compressed_bytes) AS size) AS compressed,
    formatReadableSize(sum(p.data_uncompressed_bytes) AS usize) AS uncompressed,
    round(usize / size, 2) AS compr_rate,
    sum(p.rows) AS rows,
    count() AS part_count
FROM system.parts AS p
LEFT JOIN system.tables AS t
    ON p.database = t.database
   AND p.table = '.inner_id.' || toString(t.uuid)
-- the LIKE '%' patterns match everything; narrow them to filter by table / database
WHERE (p.active = 1) AND (tbl LIKE '%') AND (p.database LIKE '%')
GROUP BY
    p.database,
    tbl
ORDER BY size DESC;

Column size

-- Size of every column computed from active parts, largest (compressed) first.
-- avg_row_size is the uncompressed column size divided by the summed row count.
SELECT
    database,
    table,
    column,
    formatReadableSize(sum(column_data_compressed_bytes) AS size) AS compressed,
    formatReadableSize(sum(column_data_uncompressed_bytes) AS usize) AS uncompressed,
    round(usize / size, 2) AS compr_ratio,
    sum(rows) AS rows_cnt,
    round(usize / rows_cnt, 2) AS avg_row_size
FROM system.parts_columns
WHERE active = 1
  AND database LIKE '%'
  AND table LIKE '%'
GROUP BY database, table, column
ORDER BY size DESC;

Projections

Projection size

-- Size of the active projection parts of table 'ptest', grouped by the `name` column.
SELECT
    database,
    table,
    name,
    formatReadableSize(sum(data_compressed_bytes) AS size) AS compressed,
    formatReadableSize(sum(data_uncompressed_bytes) AS usize) AS uncompressed,
    round(usize / size, 2) AS compr_rate,
    sum(rows) AS rows,
    count() AS part_count
FROM system.projection_parts
WHERE table = 'ptest'
  AND active
GROUP BY database, table, name
ORDER BY size DESC;

Projection column size

-- Per-column size inside the active projection parts of table 'ptest'.
SELECT
    database,
    table,
    column,
    formatReadableSize(sum(column_data_compressed_bytes) AS size) AS compressed,
    formatReadableSize(sum(column_data_uncompressed_bytes) AS usize) AS uncompressed,
    round(usize / size, 2) AS compr_rate
FROM system.projection_parts_columns
WHERE active = 1
  AND table LIKE 'ptest'
GROUP BY database, table, column
ORDER BY size DESC;

Understanding the columns data properties:

-- Profile of a table: total row count, then uniq / max / min / topK(5) applied to every column.
SELECT
    count(),
    * APPLY(uniq),
    * APPLY(max),
    * APPLY(min),
    * APPLY(topK(5))
FROM table_name
FORMAT Vertical;

-- You can also add * APPLY (entropy) to show the entropy (i.e. 'randomness') of each column.
-- If the table is huge, add a WHERE condition that selects a 'representative' data range,
-- for example a single month / week / day of data.

Understanding the ingest pattern:

-- Statistics over level-0 parts modified during the last 480 seconds, per table;
-- the tables with the most such parts come first.
SELECT
    database,
    table,
    median(rows),
    median(bytes_on_disk),
    sum(rows),
    max(bytes_on_disk),
    min(bytes_on_disk),
    round(quantile(0.95)(bytes_on_disk), 0),
    sum(bytes_on_disk),
    count(),
    countIf(NOT active),
    uniqExact(partition)
FROM system.parts
WHERE modification_time > (now() - 480)
  AND level = 0
GROUP BY database, table
ORDER BY count() DESC

part_log

-- Per-second rate of each part_log event type, per table, in 30-minute frames over the last 24 hours.
WITH 30 * 60 AS frame_size -- frame length in seconds
SELECT
    toStartOfInterval(event_time, toIntervalSecond(frame_size)) AS m,
    database,
    table,
    round(countIf(event_type = 'NewPart') / frame_size, 2) AS new,
    round(countIf(event_type = 'MergeParts') / frame_size, 2) AS merge,
    round(countIf(event_type = 'DownloadPart') / frame_size, 2) AS dl,
    round(countIf(event_type = 'RemovePart') / frame_size, 2) AS rm,
    round(countIf(event_type = 'MutatePart') / frame_size, 2) AS mut,
    round(countIf(event_type = 'MovePart') / frame_size, 2) AS mv
FROM system.part_log
WHERE event_time > (now() - toIntervalHour(24))
GROUP BY m, database, table
ORDER BY
    database ASC,
    table ASC,
    m ASC
    
-- NewPart events, rows and bytes per second, per table, in 30-minute frames over the last 24 hours.
WITH 30 * 60 AS frame_size -- frame length in seconds
SELECT
    toStartOfInterval(event_time, toIntervalSecond(frame_size)) AS m,
    database,
    table,
    round(countIf(event_type = 'NewPart') / frame_size, 2) AS inserts_per_sec,
    round(sumIf(rows, event_type = 'NewPart') / frame_size, 2) AS rows_per_sec,
    round(sumIf(size_in_bytes, event_type = 'NewPart') / frame_size, 2) AS bytes_per_sec
FROM system.part_log
WHERE event_time > (now() - toIntervalHour(24))
GROUP BY m, database, table
ORDER BY
    database ASC,
    table ASC,
    m ASC

Understanding the partitioning

-- Inner query: per-partition totals, every one aliased metric_*.
-- Outer query: per table, the number of partitions, 5 frequent partition values, and
-- the quantiles of every metric_* column across the table's partitions.
SELECT
    database,
    table,
    count(),
    topK(5)(partition),
    COLUMNS('metric.*') APPLY(quantiles(0.005, 0.05, 0.10, 0.25, 0.5, 0.75, 0.9, 0.95, 0.995))
FROM
(
    SELECT
        database,
        table,
        partition,
        sum(bytes_on_disk) AS metric_bytes,
        sum(data_uncompressed_bytes) AS metric_uncompressed_bytes,
        sum(rows) AS metric_rows,
        sum(primary_key_bytes_in_memory) AS metric_pk_size,
        count() AS metric_count,
        countIf(part_type = 'Wide') AS metric_wide_count,
        countIf(part_type = 'Compact') AS metric_compact_count,
        countIf(part_type = 'Memory') AS metric_memory_count
    FROM system.parts
    GROUP BY database, table, partition
)
GROUP BY database, table
FORMAT Vertical

7 - Datasets

Datasets

8 - Number of active parts in a partition

Number of active parts in a partition

Q: Why do I have several active parts in a partition? Why doesn’t ClickHouse merge them immediately?

A: CH does not merge parts by time

Merge scheduler selects parts by own algorithm based on the current node workload / number of parts / size of parts.

The CH merge scheduler balances between having a large number of parts and wasting resources on merges.

Merges are CPU/disk IO expensive. If CH merged every new part, all resources would be spent on merges and no resources would remain for queries (selects).

CH will not merge parts with a combined size greater than 100 GB.

-- The 20 partitions with the most active parts.
-- The LIKE '%' patterns match everything; narrow them to filter by table / database.
SELECT
    database,
    table,
    partition,
    sum(rows) AS rows,
    count() AS part_count
FROM system.parts
WHERE active = 1
  AND table LIKE '%'
  AND database LIKE '%'
GROUP BY database, table, partition
ORDER BY part_count DESC
LIMIT 20

9 - Parts consistency

Check if there are blocks missing

-- Reports gaps in block numbering between neighbouring active parts of one partition.
-- Each output row is a pair of parts (previous, next) where the next part's min block
-- number is not the previous part's max block number + 1.
-- NOTE(review): this appears to rely on the innermost ORDER BY min_block_number to feed
-- groupArray() in block order -- confirm that the order is preserved on your version.
SELECT
    database,
    table,
    partition_id,
    ranges.1 AS previous_part,
    ranges.2 AS next_part,
    ranges.3 AS previous_block_number,
    ranges.4 AS next_block_number,
    -- block numbers strictly between the previous part's max and the next part's min
    range(toUInt64(previous_block_number + 1), toUInt64(next_block_number)) AS missing_block_numbers
FROM
(
    WITH
        -- min block numbers with the first part dropped: the "next part" side of each pair
        arrayPopFront(groupArray(min_block_number) AS min) AS min_adj,
        -- max block numbers with the last part dropped: the "previous part" side of each pair
        arrayPopBack(groupArray(max_block_number) AS max) AS max_adj,
        -- tuples (previous name, next name, previous max, next min), kept only where
        -- next min (y) != previous max (z) + 1
        arrayFilter((x, y, z) -> (y != (z + 1)), arrayZip(arrayPopBack(groupArray(name) AS name_arr), arrayPopFront(name_arr), max_adj, min_adj), min_adj, max_adj) AS missing_ranges
    SELECT
        database,
        table,
        partition_id,
        missing_ranges
    FROM
    (
        SELECT *
        FROM system.parts
        -- the trailing "AND active" repeats the first predicate and is redundant
        WHERE active AND (table = 'query_thread_log') AND (partition_id = '202108') AND active
        ORDER BY min_block_number ASC
    )
    GROUP BY
        database,
        table,
        partition_id
)
-- one output row per gap
ARRAY JOIN missing_ranges AS ranges

┌─database─┬─table────────────┬─partition_id─┬─previous_part───────┬─next_part──────────┬─previous_block_number─┬─next_block_number─┬─missing_block_numbers─┐
│ system   │ query_thread_log │ 202108       │ 202108_864_1637_556 │ 202108_1639_1639_0 │                  1637 │              1639 │ [1638]                │
└──────────┴──────────────────┴──────────────┴─────────────────────┴────────────────────┴───────────────────────┴───────────────────┴───────────────────────┘

Find the number of blocks in a table

-- Number of block numbers covered by the active parts of one partition.
-- A part spans the block numbers min_block_number..max_block_number inclusive, i.e.
-- max - min + 1 of them; without the "+ 1" a single-block part (min = max) would
-- contribute 0 and the total would be short by one per part.
SELECT
    database,
    table,
    partition_id,
    sum(max_block_number - min_block_number + 1) AS blocks_count
FROM system.parts
WHERE active AND (table = 'query_thread_log') AND (partition_id = '202108')
GROUP BY
    database,
    table,
    partition_id

┌─database─┬─table────────────┬─partition_id─┬─blocks_count─┐
│ system   │ query_thread_log │ 202108       │         1635 │
└──────────┴──────────────────┴──────────────┴──────────────┘

Compare the list of parts in ZooKeeper with the list of parts on disk

-- Parts registered in ZooKeeper under the replica paths listed in system.replicas
-- that have no matching row in system.parts.
SELECT
    zoo.p_path AS part_zoo,
    zoo.ctime,
    zoo.mtime,
    disk.p_path AS part_disk
FROM
(
    SELECT
        concat(path, '/', name) AS p_path,
        ctime,
        mtime
    FROM system.zookeeper
    WHERE path IN (SELECT concat(replica_path, '/parts') FROM system.replicas)
) AS zoo
LEFT JOIN
(
    SELECT concat(replica_path, '/parts/', name) AS p_path
    FROM system.parts
    INNER JOIN system.replicas USING (database, table)
) AS disk ON zoo.p_path = disk.p_path
-- unmatched ZooKeeper rows carry an empty part_disk (assumes join_use_nulls = 0)
WHERE part_disk = ''
ORDER BY part_zoo;

You can clean up those orphan ZooKeeper records (execute the generated commands using delete in zkCli, or rm in zk-shell):

-- Emits one 'delete <znode path>' line for every ZooKeeper part record that has no
-- matching row in system.parts and was last modified at least one day ago.
SELECT 'delete ' || part_zoo
FROM
(
    SELECT
        zoo.p_path AS part_zoo,
        zoo.ctime,
        zoo.mtime,
        disk.p_path AS part_disk
    FROM
    (
        SELECT
            concat(path, '/', name) AS p_path,
            ctime,
            mtime
        FROM system.zookeeper
        WHERE path IN (SELECT concat(replica_path, '/parts') FROM system.replicas)
    ) AS zoo
    LEFT JOIN
    (
        SELECT concat(replica_path, '/parts/', name) AS p_path
        FROM system.parts
        INNER JOIN system.replicas USING (database, table)
    ) AS disk ON zoo.p_path = disk.p_path
    -- unmatched ZooKeeper rows carry an empty part_disk (assumes join_use_nulls = 0)
    WHERE part_disk = ''
      AND zoo.mtime <= now() - INTERVAL 1 DAY
    ORDER BY part_zoo
)
FORMAT TSVRaw;