This is the multi-page printable view of this section. Click here to print.

Return to the regular view of this page.

Kafka

Kafka
git log -- contrib/librdkafka | git name-rev --stdin
ClickHouse versionlibrdkafka version
21.10+ (#27883)1.6.1 + snappy fixes + boring ssl + illumos_build fixes + edenhill#3279 fix
21.6+ (#23874)1.6.1 + snappy fixes + boring ssl + illumos_build fixes
21.1+ (#18671)1.6.0-RC3 + snappy fixes + boring ssl
20.13+ (#18053)1.5.0 + msan fixes + snappy fixes + boring ssl
20.7+ (#12991)1.5.0 + msan fixes
20.5+ (#11256)1.4.2
20.2+ (#9000)1.3.0
19.11+ (#5872)1.1.0
19.5+ (#4799)1.0.0
19.1+ (#4025)1.0.0-RC5
v1.1.54382+ (#2276)0.11.4

1 - Inferring Schema from AvroConfluent Messages in Kafka for ClickHouse

Learn how to define Kafka table structures in ClickHouse by using Avro’s schema registry & sample message.

Inferring Schema from AvroConfluent Messages in Kafka for ClickHouse

To consume messages from Kafka within ClickHouse, you need to define the ENGINE=Kafka table structure with all the column names and types. This task can be particularly challenging when dealing with complex Avro messages, as manually determining the exact schema for ClickHouse is both tricky and time-consuming. This complexity is particularly frustrating in the case of Avro formats, where the column names and their types are already clearly defined in the schema registry.

Although ClickHouse supports schema inference for files, it does not natively support this for Kafka streams.

Here’s a workaround to infer the schema using AvroConfluent messages:

Step 1: Capture and Store a Raw Kafka Message

First, create a table in ClickHouse to consume a raw message from Kafka and store it as a file:

CREATE TABLE test_kafka (raw String) ENGINE = Kafka 
SETTINGS kafka_broker_list = 'localhost:29092', 
         kafka_topic_list = 'movies-raw', 
         kafka_format = 'RawBLOB', -- Don't try to parse the message, return it 'as is'
         kafka_group_name = 'tmp_test'; -- Using some dummy consumer group here.

INSERT INTO FUNCTION file('./avro_raw_sample.avro', 'RawBLOB') 
SELECT * FROM test_kafka LIMIT 1 
SETTINGS max_block_size=1, stream_like_engine_allow_direct_select=1;

DROP TABLE test_kafka;

Step 2: Infer Schema Using the Stored File

Using the stored raw message, let ClickHouse infer the schema based on the AvroConfluent format and a specified schema registry URL:

CREATE TEMPORARY TABLE test AS 
SELECT * FROM file('./avro_raw_sample.avro', 'AvroConfluent') 
SETTINGS format_avro_schema_registry_url='http://localhost:8085';

SHOW CREATE TEMPORARY TABLE test\G;

The output from the SHOW CREATE command will display the inferred schema, for example:

Row 1:
──────
statement: CREATE TEMPORARY TABLE test
(
    `movie_id` Int64,
    `title` String,
    `release_year` Int64
)
ENGINE = Memory

Step 3: Create the Kafka Table with the Inferred Schema

Now, use the inferred schema to create the Kafka table:

CREATE TABLE movies_kafka
(
    `movie_id` Int64,
    `title` String,
    `release_year` Int64
)
ENGINE = Kafka
SETTINGS kafka_broker_list = 'localhost:29092',
         kafka_topic_list = 'movies-raw',
         kafka_format = 'AvroConfluent',
         kafka_group_name = 'movies',
         kafka_schema_registry_url = 'http://localhost:8085';

This approach reduces manual schema definition efforts and enhances data integration workflows by utilizing the schema inference capabilities of ClickHouse for AvroConfluent messages.

Appendix

Avro is a binary serialization format used within Apache Kafka for efficiently serializing data with a compact binary format. It relies on schemas, which define the structure of the serialized data, to ensure robust data compatibility and type safety.

Schema Registry is a service that provides a centralized repository for Avro schemas. It helps manage and enforce schemas across applications, ensuring that the data exchanged between producers and consumers adheres to a predefined format, and facilitates schema evolution in a safe manner.

In ClickHouse, the Avro format is used for data that contains the schema embedded directly within the file or message. This means the structure of the data is defined and included with the data itself, allowing for self-describing messages. However, embedding the schema within every message is not optimal for streaming large volumes of data, as it increases the workload and network overhead. Repeatedly passing the same schema with each message can be inefficient, particularly in high-throughput environments.

On the other hand, the AvroConfluent format in ClickHouse is specifically designed to work with the Confluent Schema Registry. This format expects the schema to be managed externally in a schema registry rather than being embedded within each message. It retrieves schema information from the Schema Registry, which allows for centralized schema management and versioning, facilitating easier schema evolution and enforcement across different applications using Kafka.

2 - Setting the background message broker schedule pool size

Guide to managing the background_message_broker_schedule_pool_size setting for Kafka, RabbitMQ, and NATS table engines in your database.

Setting the background message broker schedule pool size

When using Kafka, RabbitMQ, or NATS table engines, you might encounter issues caused by the oversaturation of the thread pool responsible for background jobs. Monitoring and adjusting the background_message_broker_schedule_pool_size setting can help alleviate these problems.

Identifying Thread Pool Saturation

To check the status of your thread pool, run the following SQL query:

SELECT
    (
        SELECT value
        FROM system.metrics
        WHERE metric = 'BackgroundMessageBrokerSchedulePoolTask'
    ) AS tasks,
    (
        SELECT value
        FROM system.metrics
        WHERE metric = 'BackgroundMessageBrokerSchedulePoolSize'
    ) AS pool_size,
    pool_size - tasks AS free_threads

If you have metric_log enabled, you can use this query to monitor the minimum number of free threads available throughout the day:

SELECT min(CurrentMetric_BackgroundMessageBrokerSchedulePoolSize - CurrentMetric_BackgroundMessageBrokerSchedulePoolTask) AS min_free_threads
FROM system.metric_log
WHERE event_date = today()

Interpreting Results

If the number of free threads is zero or very close to zero, you might experience issues with your Kafka, RabbitMQ, or NATS engines. In such cases, you should increase the background_message_broker_schedule_pool_size setting.

Adjusting the Thread Pool Size

To fix the problem, increase the background_message_broker_schedule_pool_size setting in your config.xml. For older ClickHouse versions, you may need to adjust this setting in both the default profile in users.xml and config.xml.

Estimating the Required Pool Size

To estimate the appropriate value for background_message_broker_schedule_pool_size, use the following query:

WITH
    toUInt32OrDefault(extract(engine_full, 'kafka_num_consumers\s*=\s*(\d+)')) as kafka_num_consumers,
    extract(engine_full, 'kafka_thread_per_consumer\s*=\s*(\d+|\'true\')') not in ('', '0') as kafka_thread_per_consumer,
    multiIf(
        engine = 'Kafka',  
        if(kafka_thread_per_consumer AND kafka_num_consumers > 0, kafka_num_consumers, 1),
        engine = 'RabbitMQ',
        3,
        engine = 'NATS',
        3,
        0 /* should not happen */
    ) as threads_needed
SELECT 
   ceil(sum(threads_needed) * 1.25)
FROM 
    system.tables
WHERE 
    engine in ('Kafka', 'RabbitMQ', 'NATS')
;

This query helps you determine the necessary pool size based on the number of consumers and threads per consumer for Kafka, and a fixed number for RabbitMQ and NATS.

By following these guidelines, you can ensure your background message broker thread pool is appropriately sized, preventing performance issues and maintaining the efficiency of your Kafka, RabbitMQ, or NATS engines.

3 - Adjusting librdkafka settings

Adjusting librdkafka settings

Some random example:

<yandex>
    <kafka>
        <max_poll_interval_ms>60000</max_poll_interval_ms>
        <session_timeout_ms>60000</session_timeout_ms>
        <heartbeat_interval_ms>10000</heartbeat_interval_ms>
        <reconnect_backoff_ms>5000</reconnect_backoff_ms>
        <reconnect_backoff_max_ms>60000</reconnect_backoff_max_ms>
        <request_timeout_ms>20000</request_timeout_ms>
        <retry_backoff_ms>500</retry_backoff_ms>
        <message_max_bytes>20971520</message_max_bytes>
        <debug>all</debug><!-- only to get the errors -->
        <security_protocol>SSL</security_protocol>
        <ssl_ca_location>/etc/clickhouse-server/ssl/kafka-ca-qa.crt</ssl_ca_location>
        <ssl_certificate_location>/etc/clickhouse-server/ssl/client_clickhouse_client.pem</ssl_certificate_location>
        <ssl_key_location>/etc/clickhouse-server/ssl/client_clickhouse_client.key</ssl_key_location>
        <ssl_key_password>pass</ssl_key_password>
    </kafka>
</yandex>

Authentication / connectivity

Sometimes the consumer group needs to be explicitly allowed in the broker UI config.

Amazon MSK | SASL/SCRAM

<yandex>
  <kafka>
    <security_protocol>sasl_ssl</security_protocol>
    <!-- Depending on your broker config you may need to uncomment below sasl_mechanism -->
    <!-- <sasl_mechanism>SCRAM-SHA-512</sasl_mechanism> -->
    <sasl_username>root</sasl_username>
    <sasl_password>toor</sasl_password>
  </kafka>
</yandex>

https://leftjoin.ru/all/clickhouse-as-a-consumer-to-amazon-msk/

on-prem / self-hosted Kafka broker

<yandex>
  <kafka>
    <security_protocol>sasl_ssl</security_protocol>
    <sasl_mechanism>SCRAM-SHA-512</sasl_mechanism>
    <sasl_username>root</sasl_username>
    <sasl_password>toor</sasl_password>
    <!-- fullchain cert here -->
    <ssl_ca_location>/path/to/cert/fullchain.pem</ssl_ca_location>   
  </kafka>
</yandex>

Inline Kafka certs

To connect to some Kafka cloud services you may need to use certificates.

If needed they can be converted to pem format and inlined into ClickHouse config.xml Example:

<kafka>
<ssl_key_pem><![CDATA[
  RSA Private-Key: (3072 bit, 2 primes)
    ....
-----BEGIN RSA PRIVATE KEY-----
...
-----END RSA PRIVATE KEY-----
]]></ssl_key_pem>
<ssl_certificate_pem><![CDATA[
-----BEGIN CERTIFICATE-----
...
-----END CERTIFICATE-----
]]></ssl_certificate_pem>
</kafka>

See xml

https://help.aiven.io/en/articles/489572-getting-started-with-aiven-kafka

https://stackoverflow.com/questions/991758/how-to-get-pem-file-from-key-and-crt-files

Azure Event Hub

See https://github.com/ClickHouse/ClickHouse/issues/12609

Kerberos

  <!-- Kerberos-aware Kafka -->
  <kafka>
    <security_protocol>SASL_PLAINTEXT</security_protocol>
    <sasl_kerberos_keytab>/home/kafkauser/kafkauser.keytab</sasl_kerberos_keytab>
    <sasl_kerberos_principal>kafkauser/kafkahost@EXAMPLE.COM</sasl_kerberos_principal>
  </kafka>

Confluent Cloud

<yandex>
  <kafka>
    <auto_offset_reset>smallest</auto_offset_reset>
    <security_protocol>SASL_SSL</security_protocol>
    <!-- older broker versions may need this below, for newer versions ignore -->
    <!-- <ssl_endpoint_identification_algorithm>https</ssl_endpoint_identification_algorithm> -->
    <sasl_mechanism>PLAIN</sasl_mechanism>
    <sasl_username>username</sasl_username>
    <sasl_password>password</sasl_password>
    <!-- Same as above here ignore if newer broker version -->
    <!-- <ssl_ca_location>probe</ssl_ca_location> -->
  </kafka>
</yandex>

https://docs.confluent.io/cloud/current/client-apps/config-client.html

How to test connection settings

Use kafkacat utility - it internally uses same library to access Kafla as clickhouse itself and allows easily to test different settings.

kafkacat -b my_broker:9092 -C -o -10 -t my_topic \
   -X security.protocol=SASL_SSL  \
   -X sasl.mechanisms=PLAIN \
   -X sasl.username=uerName \
   -X sasl.password=Password

Different configurations for different tables?

Is there some more documentation how to use this multiconfiguration for Kafka ?

The whole logic is here: https://github.com/ClickHouse/ClickHouse/blob/da4856a2be035260708fe2ba3ffb9e437d9b7fef/src/Storages/Kafka/StorageKafka.cpp#L466-L475

So it load the main config first, after that it load (with overwrites) the configs for all topics, listed in kafka_topic_list of the table.

Also since v21.12 it’s possible to use more straght-forward way using named_collections: https://github.com/ClickHouse/ClickHouse/pull/31691

So you can say something like

CREATE TABLE test.kafka (key UInt64, value UInt64) ENGINE = Kafka(kafka1, kafka_format='CSV');

And after that in configuration:

<clickhouse>
 <named_collections>
  <kafka1>
   <kafka_broker_list>kafka1:19092</kafka_broker_list>
   <kafka_topic_list>conf</kafka_topic_list>
   <kafka_group_name>conf</kafka_group_name>
  </kafka1>
 </named_collections>
</clickhouse>


<yandex>
    <named_collections>
        <kafka_preset1>
            <kafka_broker_list>...</kafka_broker_list>
            <kafka_topic_list>foo.bar</kafka_topic_list>
            <kafka_group_name>foo.bar.group</kafka_group_name>
            <kafka>
                <security_protocol>...</security_protocol>
                <sasl_mechanism>...</sasl_mechanism>
                <sasl_username>...</sasl_username>
                <sasl_password>...</sasl_password>
                <auto_offset_reset>smallest</auto_offset_reset>
                <ssl_endpoint_identification_algorithm>https</ssl_endpoint_identification_algorithm>
                <ssl_ca_location>probe</ssl_ca_location>
            </kafka>
        </kafka_preset1>
    </named_collections>
</yandex>

The same fragment of code in newer versions: https://github.com/ClickHouse/ClickHouse/blob/d19e24f530c30f002488bc136da78f5fb55aedab/src/Storages/Kafka/StorageKafka.cpp#L474-L496

4 - Error handling

Error handling

Pre 21.6

There are couple options:

Certain formats which has schema in built in them (like JSONEachRow) could silently skip any unexpected fields after enabling setting input_format_skip_unknown_fields

It’s also possible to skip up to N malformed messages for each block, with used setting kafka_skip_broken_messages but it’s also does not support all possible formats.

After 21.6

It’s possible to stream messages which could not be parsed, this behavior could be enabled via setting: kafka_handle_error_mode='stream' and clickhouse wil write error and message from Kafka itself to two new virtual columns: _error, _raw_message.

So you can create another Materialized View which would collect to a separate table all errors happening while parsing with all important information like offset and content of message.

CREATE TABLE default.kafka_engine
(
    `i` Int64,
    `s` String
)
ENGINE = Kafka
SETTINGS kafka_broker_list = 'kafka:9092'
kafka_topic_list = 'topic',
kafka_group_name = 'clickhouse',
kafka_format = 'JSONEachRow',
kafka_handle_error_mode='stream';

CREATE MATERIALIZED VIEW default.kafka_errors
(
    `topic` String,
    `partition` Int64,
    `offset` Int64,
    `raw` String,
    `error` String
)
ENGINE = MergeTree
ORDER BY (topic, partition, offset)
SETTINGS index_granularity = 8192 AS
SELECT
    _topic AS topic,
    _partition AS partition,
    _offset AS offset,
    _raw_message AS raw,
    _error AS error
FROM default.kafka_engine
WHERE length(_error) > 0

Table connections

https://github.com/ClickHouse/ClickHouse/pull/20249

https://github.com/ClickHouse/ClickHouse/pull/21850

https://altinity.com/blog/clickhouse-kafka-engine-faq

5 - Exactly once semantics

Exactly once semantics

EOS consumer (isolation.level=read_committed) is enabled by default since librdkafka 1.2.0, so for ClickHouse - since 20.2

See:

BUT: while EOS semantics will guarantee you that no duplicates will happen on the Kafka side (i.e. even if you produce the same messages few times it will be consumed once), but ClickHouse as a Kafka client can currently guarantee only at-least-once. And in some corner cases (connection lost etc) you can get duplicates.

We need to have something like transactions on ClickHouse side to be able to avoid that. Adding something like simple transactions is in plans for Y2022.

block-aggregator by eBay

Block Aggregator is a data loader that subscribes to Kafka topics, aggregates the Kafka messages into blocks that follow the Clickhouse’s table schemas, and then inserts the blocks into ClickHouse. Block Aggregator provides exactly-once delivery guarantee to load data from Kafka to ClickHouse. Block Aggregator utilizes Kafka’s metadata to keep track of blocks that are intended to send to ClickHouse, and later uses this metadata information to deterministically re-produce ClickHouse blocks for re-tries in case of failures. The identical blocks are guaranteed to be deduplicated by ClickHouse.

eBay/block-aggregator

6 - Kafka main parsing loop

Kafka main parsing loop

One of the threads from scheduled_pool (pre 20.9) / background_message_broker_schedule_pool (after 20.9) do that in infinite loop:

  1. Batch poll (time limit: kafka_poll_timeout_ms 500ms, messages limit: kafka_poll_max_batch_size 65536)
  2. Parse messages.
  3. If we don’t have enough data (rows limit: kafka_max_block_size 1048576) or time limit reached (kafka_flush_interval_ms 7500ms) - continue polling (goto p.1)
  4. Write a collected block of data to MV
  5. Do commit (commit after write = at-least-once).

On any error, during that process, Kafka client is restarted (leading to rebalancing - leave the group and get back in few seconds).

Kafka batching

Important settings

These usually should not be adjusted:

  • kafka_poll_max_batch_size = max_block_size (65536)
  • kafka_poll_timeout_ms = stream_poll_timeout_ms (500ms)

You may want to adjust those depending on your scenario:

  • kafka_flush_interval_ms = stream_poll_timeout_ms (7500ms)
  • kafka_max_block_size = max_insert_block_size / kafka_num_consumers (for the single consumer: 1048576)

See also

https://github.com/ClickHouse/ClickHouse/pull/11388

Disable at-least-once delivery

kafka_commit_every_batch = 1 will change the loop logic mentioned above. Consumed batch commited to the Kafka and the block of rows send to Materialized Views only after that. It could be resembled as at-most-once delivery mode as prevent duplicate creation but allow loss of data in case of failures.

7 - Kafka parallel consuming

Kafka parallel consuming

For very large topics when you need more parallelism (especially on the insert side) you may use several tables with the same pipeline (pre 20.9) or enable kafka_thread_per_consumer (after 20.9).

kafka_num_consumers = N,
kafka_thread_per_consumer=1

Notes:

  • the inserts will happen in parallel (without that setting inserts happen linearly)
  • enough partitions are needed.
  • kafka_num_consumers is limited by number of physical cores (half of vCPUs). kafka_disable_num_consumers_limit can be used to override the limit.
  • background_message_broker_schedule_pool_size is 16 by default, you may need to increase if using more than 16 consumers

Before increasing kafka_num_consumers with keeping kafka_thread_per_consumer=0 may improve consumption & parsing speed, but flushing & committing still happens by a single thread there (so inserts are linear).

8 - Multiple MVs attached to Kafka table

How Multiple MVs attached to Kafka table consume and how they are affected by kafka_num_consumers/kafka_thread_per_consumer

So the basic pipeline depicted is a Kafka table with 2 MVs attached. The Kafka broker has 2 topics and 4 partitions.

kafka_thread_per_consumer = 0

Kafka engine table will act as 2 consumers but only 1 thread for both consumers. For this scenario we use these settings:

kafka_num_consumers = 2
kafka_thread_per_consumer = 0

The same Kafka engine will create 2 streams, 1 for each consumer and will join them in a union stream. And it will use 1 thread [ 2385 ] This is how we can see it in the logs:

2022.11.09 17:49:34.282077 [ 2385 ] {} <Debug> StorageKafka (kafka_table): Started streaming to 2 attached views
  • How ClickHouse calculates the number of threads depending on the thread_per_consumer setting:

      auto stream_count = thread_per_consumer ? 1 : num_created_consumers;
          sources.reserve(stream_count);
          pipes.reserve(stream_count);
          for (size_t i = 0; i < stream_count; ++i)
          {
             ......
          }
    

Details:

https://github.com/ClickHouse/ClickHouse/blob/1b49463bd297ade7472abffbc931c4bb9bf213d0/src/Storages/Kafka/StorageKafka.cpp#L834

Also a detailed graph of the pipeline:

thread_per_consumer0

With this approach if the number of consumers are increased, still Kafka engine will use only 1 thread to flush. The consuming/processing rate will probably be increased but not linearly, for example 5 consumers will not consume 5 times faster. Also a good property of this approach is the linearization of INSERTS, which means that the order of the inserts is preserved and it is sequential. This option is good for small/medium kafka topics.

kafka_thread_per_consumer = 1

Kafka engine table will act as 2 consumers and 1 thread per consumers For this scenario we use these settings:

kafka_num_consumers = 2
kafka_thread_per_consumer = 1

Here the pipeline works like this:

thread_per_consumer1

With this approach the number of consumers are increased and each consumer will use a thread and so the consuming/processing rate. In this scenario it is important to remark that topic needs to have as many partitions as consumers (threads) to achieve the maximum performance. Also if the number of consumers(threads) needs to be raised to more than 16 you need to change the background pool of threads setting background_message_broker_schedule_pool_size to a higher value than 16 (which is the default). This option is good for large kafka topics with millions of messages per second.

9 - Rewind / fast-forward / replay

Rewind / fast-forward / replay
  • Step 1: Detach Kafka tables in ClickHouse
    DETACH TABLE db.kafka_table_name ON CLUSTER '{cluster}';
    
  • Step 2: kafka-consumer-groups.sh --bootstrap-server kafka:9092 --topic topic:0,1,2 --group id1 --reset-offsets --to-latest --execute
  • Step 3: Attach Kafka tables back
    ATTACH TABLE db.kafka_table_name ON CLUSTER '{cluster}';
    

See also these configuration settings:

<kafka>
  <auto_offset_reset>smallest</auto_offset_reset>
</kafka>

About Offset Consuming

When a consumer joins the consumer group, the broker will check if it has a commited offset. If that is the case, then it will start from the latest offset. Both ClickHouse and librdKafka documentation state that the default value for auto_offset_reset is largest (or latest in new Kafka versions) but it is not, if the consumer is new:

https://github.com/ClickHouse/ClickHouse/blob/f171ad93bcb903e636c9f38812b6aaf0ab045b04/src/Storages/Kafka/StorageKafka.cpp#L506

 conf.set("auto.offset.reset", "earliest");     // If no offset stored for this group, read all messages from the start

If there is no offset stored or it is out of range, for that particular consumer group, the consumer will start consuming from the beginning (earliest), and if there is some offset stored then it should use the latest. The log retention policy influences which offset values correspond to the earliest and latest configurations. Consider a scenario where a topic has a retention policy set to 1 hour. Initially, you produce 5 messages, and then, after an hour, you publish 5 more messages. In this case, the latest offset will remain unchanged from the previous example. However, due to Kafka removing the earlier messages, the earliest available offset will not be 0; instead, it will be 5.

10 - SELECTs from engine=Kafka

SELECTs from engine=Kafka

Question

What will happen, if we would run SELECT query from working Kafka table with MV attached? Would data showed in SELECT query appear later in MV destination table?

Answer

  1. Most likely SELECT query would show nothing.
  2. If you lucky enough and something would show up, those rows wouldn’t appear in MV destination table.

So it’s not recommended to run SELECT queries on working Kafka tables.

In case of debug it’s possible to use another Kafka table with different consumer_group, so it wouldn’t affect your main pipeline.