Kafka
To check which librdkafka version is bundled with a particular ClickHouse release, run the following in the ClickHouse source tree:
git log -- contrib/librdkafka | git name-rev --stdin
Adjusting librdkafka settings
- To set rdkafka options, add them to the <kafka> section in config.xml or, preferably, use a separate file in config.d/:
Example:
<yandex>
<kafka>
<max_poll_interval_ms>60000</max_poll_interval_ms>
<session_timeout_ms>60000</session_timeout_ms>
<heartbeat_interval_ms>10000</heartbeat_interval_ms>
<reconnect_backoff_ms>5000</reconnect_backoff_ms>
<reconnect_backoff_max_ms>60000</reconnect_backoff_max_ms>
<request_timeout_ms>20000</request_timeout_ms>
<retry_backoff_ms>500</retry_backoff_ms>
<message_max_bytes>20971520</message_max_bytes>
<debug>all</debug><!-- enable only when investigating errors; very verbose -->
<security_protocol>SSL</security_protocol>
<ssl_ca_location>/etc/clickhouse-server/ssl/kafka-ca-qa.crt</ssl_ca_location>
<ssl_certificate_location>/etc/clickhouse-server/ssl/client_clickhouse_client.pem</ssl_certificate_location>
<ssl_key_location>/etc/clickhouse-server/ssl/client_clickhouse_client.key</ssl_key_location>
<ssl_key_password>pass</ssl_key_password>
</kafka>
</yandex>
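Settings from the <kafka> section are applied to every Kafka engine table on the server (each name is translated into the corresponding librdkafka property, e.g. max_poll_interval_ms becomes max.poll.interval.ms). A minimal sketch of a table that would pick them up - the broker, topic, group and table names below are placeholders:
-- hypothetical table; the global <kafka> settings above apply to its consumer
CREATE TABLE default.kafka_readings
(
    `ts` DateTime,
    `value` Float64
)
ENGINE = Kafka
SETTINGS kafka_broker_list = 'kafka:9092',
         kafka_topic_list = 'readings',
         kafka_group_name = 'clickhouse-readings',
         kafka_format = 'JSONEachRow';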
Authentication / connectivity
Amazon MSK
<yandex>
<kafka>
<security_protocol>sasl_ssl</security_protocol>
<sasl_username>root</sasl_username>
<sasl_password>toor</sasl_password>
</kafka>
</yandex>
SASL/SCRAM
<yandex>
<kafka>
<security_protocol>sasl_ssl</security_protocol>
<sasl_mechanism>SCRAM-SHA-512</sasl_mechanism>
<sasl_username>root</sasl_username>
<sasl_password>toor</sasl_password>
</kafka>
</yandex>
https://leftjoin.ru/all/clickhouse-as-a-consumer-to-amazon-msk/
Inline Kafka certs
To connect to some Kafka cloud services you may need to use certificates.
If needed, they can be converted to PEM format and inlined into the ClickHouse config.xml.
Example:
<kafka>
<ssl_key_pem><![CDATA[
RSA Private-Key: (3072 bit, 2 primes)
....
-----BEGIN RSA PRIVATE KEY-----
...
-----END RSA PRIVATE KEY-----
]]></ssl_key_pem>
<ssl_certificate_pem><![CDATA[
-----BEGIN CERTIFICATE-----
...
-----END CERTIFICATE-----
]]></ssl_certificate_pem>
</kafka>
See also:
https://help.aiven.io/en/articles/489572-getting-started-with-aiven-kafka
https://stackoverflow.com/questions/991758/how-to-get-pem-file-from-key-and-crt-files
Azure Event Hub
See https://github.com/ClickHouse/ClickHouse/issues/12609
Kerberos
<!-- Kerberos-aware Kafka -->
<kafka>
<security_protocol>SASL_PLAINTEXT</security_protocol>
<sasl_kerberos_keytab>/home/kafkauser/kafkauser.keytab</sasl_kerberos_keytab>
<sasl_kerberos_principal>kafkauser/kafkahost@EXAMPLE.COM</sasl_kerberos_principal>
</kafka>
Confluent Cloud
<yandex>
<kafka>
<auto_offset_reset>smallest</auto_offset_reset>
<security_protocol>SASL_SSL</security_protocol>
<ssl_endpoint_identification_algorithm>https</ssl_endpoint_identification_algorithm>
<sasl_mechanism>PLAIN</sasl_mechanism>
<sasl_username>username</sasl_username>
<sasl_password>password</sasl_password>
<ssl_ca_location>probe</ssl_ca_location>
<!--
<ssl_ca_location>/path/to/cert.pem</ssl_ca_location>
-->
</kafka>
</yandex>
https://docs.confluent.io/cloud/current/client-apps/config-client.html
How to test connection settings
Use the kafkacat utility - internally it uses the same library (librdkafka) to access Kafka as ClickHouse itself, which makes it easy to try out different settings.
kafkacat -b my_broker:9092 -C -o -10 -t my_topic \
-X security.protocol=SASL_SSL \
-X sasl.mechanisms=PLAIN \
-X sasl.username=userName \
-X sasl.password=Password
Different configurations for different tables?
Is there some more documentation on how to use this multi-configuration for Kafka?
The whole logic is here:
https://github.com/ClickHouse/ClickHouse/blob/da4856a2be035260708fe2ba3ffb9e437d9b7fef/src/Storages/Kafka/StorageKafka.cpp#L466-L475
So it loads the main config first, and after that it loads (with overrides) the configs for all topics listed in the kafka_topic_list of the table.
Also, since v21.12 there is a more straightforward way, using named collections:
https://github.com/ClickHouse/ClickHouse/pull/31691
So you can say something like
CREATE TABLE test.kafka (key UInt64, value UInt64) ENGINE = Kafka(kafka1, kafka_format='CSV');
And after that in configuration:
<clickhouse>
<named_collections>
<kafka1>
<kafka_broker_list>kafka1:19092</kafka_broker_list>
<kafka_topic_list>conf</kafka_topic_list>
<kafka_group_name>conf</kafka_group_name>
</kafka1>
</named_collections>
</clickhouse>
A more complete variant of the same approach, with the librdkafka (authentication) settings nested inside the named collection:
<yandex>
<named_collections>
<kafka_preset1>
<kafka_broker_list>...</kafka_broker_list>
<kafka_topic_list>foo.bar</kafka_topic_list>
<kafka_group_name>foo.bar.group</kafka_group_name>
<kafka>
<security_protocol>...</security_protocol>
<sasl_mechanism>...</sasl_mechanism>
<sasl_username>...</sasl_username>
<sasl_password>...</sasl_password>
<auto_offset_reset>smallest</auto_offset_reset>
<ssl_endpoint_identification_algorithm>https</ssl_endpoint_identification_algorithm>
<ssl_ca_location>probe</ssl_ca_location>
</kafka>
</kafka_preset1>
</named_collections>
</yandex>
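The named collection is then referenced from the table DDL just like kafka1 above; the table name and format here are only for illustration:
CREATE TABLE test.kafka_preset (key UInt64, value UInt64)
ENGINE = Kafka(kafka_preset1, kafka_format = 'CSV');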
The same fragment of code in newer versions:
https://github.com/ClickHouse/ClickHouse/blob/d19e24f530c30f002488bc136da78f5fb55aedab/src/Storages/Kafka/StorageKafka.cpp#L474-L496
Error handling
Pre 21.6
There are a couple of options:
- Certain formats which have a schema built into them (like JSONEachRow) can silently skip any unexpected fields after enabling the setting input_format_skip_unknown_fields.
- It's also possible to skip up to N malformed messages per block with the setting kafka_skip_broken_messages, but it does not support all possible formats.
After 21.6
It's possible to stream messages which could not be parsed. This behavior can be enabled via the setting kafka_handle_error_mode='stream', and ClickHouse will write the error and the raw Kafka message to two additional virtual columns: _error and _raw_message.
So you can create another materialized view which collects into a separate table all errors that happen while parsing, with all the important information like the offset and the content of the message.
CREATE TABLE default.kafka_engine
(
`i` Int64,
`s` String
)
ENGINE = Kafka
SETTINGS kafka_broker_list = 'kafka:9092',
kafka_topic_list = 'topic',
kafka_group_name = 'clickhouse',
kafka_format = 'JSONEachRow',
kafka_handle_error_mode='stream';
CREATE MATERIALIZED VIEW default.kafka_errors
(
`topic` String,
`partition` Int64,
`offset` Int64,
`raw` String,
`error` String
)
ENGINE = MergeTree
ORDER BY (topic, partition, offset)
SETTINGS index_granularity = 8192 AS
SELECT
_topic AS topic,
_partition AS partition,
_offset AS offset,
_raw_message AS raw,
_error AS error
FROM default.kafka_engine
WHERE length(_error) > 0
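With the error materialized view above in place, broken messages can then be inspected with a regular query, e.g.:
SELECT topic, `partition`, `offset`, error, raw
FROM default.kafka_errors
ORDER BY `offset` DESC
LIMIT 10;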

https://github.com/ClickHouse/ClickHouse/pull/20249#issuecomment-779054737
https://github.com/ClickHouse/ClickHouse/pull/21850
https://altinity.com/blog/clickhouse-kafka-engine-faq
Exactly once semantics
The EOS consumer (isolation.level=read_committed) is enabled by default since librdkafka 1.2.0, so for ClickHouse - since 20.2.
BUT: while EOS semantics guarantee that no duplicates happen on the Kafka side (i.e. even if you produce the same message a few times it will be consumed only once), ClickHouse as a Kafka client can currently guarantee only at-least-once, and in some corner cases (connection lost, etc.) you can get duplicates.
We would need something like transactions on the ClickHouse side to avoid that. Adding something like simple transactions is planned for Y2022.
block-aggregator by eBay
Block Aggregator is a data loader that subscribes to Kafka topics, aggregates the Kafka messages into blocks that follow the ClickHouse table schemas, and then inserts the blocks into ClickHouse. Block Aggregator provides an exactly-once delivery guarantee for loading data from Kafka to ClickHouse. Block Aggregator utilizes Kafka's metadata to keep track of blocks that are intended to be sent to ClickHouse, and later uses this metadata to deterministically re-produce ClickHouse blocks for retries in case of failures. The identical blocks are guaranteed to be deduplicated by ClickHouse.
eBay/block-aggregator
Kafka main parsing loop
One of the threads from the scheduled_pool (pre 20.9) / background_message_broker_schedule_pool (after 20.9) does the following in an infinite loop:
- Batch poll (time limit: kafka_poll_timeout_ms, 500 ms; messages limit: kafka_poll_max_batch_size, 65536).
- Parse messages.
- If we don't have enough data yet (rows limit: kafka_max_block_size, 1048576) and the time limit is not reached (kafka_flush_interval_ms, 7500 ms) - continue polling (go to step 1).
- Write the collected block of data to the MV.
- Do commit (commit after write = at-least-once).
On any error during that process, the Kafka client is restarted (leading to rebalancing - the consumer leaves the group and rejoins a few seconds later).

Important settings
These usually should not be adjusted:
- kafka_poll_max_batch_size = max_block_size (65536)
- kafka_poll_timeout_ms = stream_poll_timeout_ms (500 ms)
You may want to adjust those depending on your scenario:
- kafka_flush_interval_ms = stream_flush_interval_ms (7500 ms)
- kafka_max_block_size = max_insert_block_size / kafka_num_consumers (for a single consumer: 1048576)
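For example, the flush interval and block size can be overridden per table; the values, table and topic names below are purely illustrative and should be matched to your message rate:
CREATE TABLE default.kafka_big_blocks
(
    `ts` DateTime,
    `value` Float64
)
ENGINE = Kafka
SETTINGS kafka_broker_list = 'kafka:9092',
         kafka_topic_list = 'readings',
         kafka_group_name = 'clickhouse-readings',
         kafka_format = 'JSONEachRow',
         kafka_max_block_size = 524288,
         kafka_flush_interval_ms = 10000;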
See also
https://github.com/ClickHouse/ClickHouse/pull/11388
Kafka parallel consuming
For very large topics, when you need more parallelism (especially on the insert side), you may use several tables with the same pipeline (pre 20.9) or enable kafka_thread_per_consumer (after 20.9):
kafka_num_consumers = N,
kafka_thread_per_consumer = 1
Notes:
- the inserts will happen in parallel (without that setting inserts happen linearly)
- enough partitions are needed
- kafka_num_consumers is limited by the number of physical cores (half of vCPUs); kafka_disable_num_consumers_limit can be used to override the limit
- background_message_broker_schedule_pool_size is 16 by default; you may need to increase it if you use more than 16 consumers
- Increasing kafka_num_consumers while keeping kafka_thread_per_consumer=0 may improve consumption & parsing speed, but flushing & committing still happens in a single thread there (so inserts are linear)
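A sketch of a table configured for parallel consumption, assuming the topic has at least 4 partitions (all names are placeholders):
CREATE TABLE default.kafka_parallel
(
    `ts` DateTime,
    `value` Float64
)
ENGINE = Kafka
SETTINGS kafka_broker_list = 'kafka:9092',
         kafka_topic_list = 'readings',
         kafka_group_name = 'clickhouse-readings',
         kafka_format = 'JSONEachRow',
         kafka_num_consumers = 4,
         kafka_thread_per_consumer = 1;
With kafka_thread_per_consumer = 1, each of the 4 consumers polls, flushes and commits in its own thread, so the destination table receives 4 parallel insert streams.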
Rewind / fast-forward / replay
- Step 1: Detach Kafka tables in ClickHouse (see the SQL sketch below)
- Step 2: kafka-consumer-groups.sh --bootstrap-server kafka:9092 --topic topic:0,1,2 --group id1 --reset-offsets --to-latest --execute
- Step 3: Attach Kafka tables back
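The detach/attach part of steps 1 and 3 is plain DDL; kafka_table here is a placeholder for your Kafka engine table name:
-- Step 1: stop consuming so the consumer group can be manipulated
DETACH TABLE default.kafka_table;
-- (reset the offsets with kafka-consumer-groups.sh, step 2)
-- Step 3: resume consuming from the new offsets
ATTACH TABLE default.kafka_table;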
See also these configuration settings:
<kafka>
<auto_offset_reset>smallest</auto_offset_reset>
</kafka>
About Offset Consuming
When a consumer joins the consumer group, the broker checks whether it has a committed offset. If it does, the consumer starts from that last committed offset. Both the ClickHouse and librdkafka documentation state that the default value for auto_offset_reset is largest (or latest in newer Kafka versions), but it is not, if the consumer is new:
https://github.com/ClickHouse/ClickHouse/blob/f171ad93bcb903e636c9f38812b6aaf0ab045b04/src/Storages/Kafka/StorageKafka.cpp#L506
conf.set("auto.offset.reset", "earliest"); // If no offset stored for this group, read all messages from the start
If there is no offset stored, or it is out of range for that particular consumer group, the consumer will start consuming from the beginning (earliest); if there is some offset stored, it will continue from the latest committed one.
The log retention policy influences which offset values correspond to the earliest and latest configurations. Consider a scenario where a topic has a retention policy set to 1 hour. Initially, you produce 5 messages, and then, after an hour, you publish 5 more messages. In this case, the latest offset remains unchanged. However, because Kafka has removed the earlier messages, the earliest available offset will no longer be 0; instead, it will be 5.
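A concrete sketch of that scenario, assuming a fresh topic whose offsets start at 0:
t = 0h : produce 5 messages           -> offsets 0..4
t = 1h : retention removes them; produce 5 more -> offsets 5..9
earliest = 5   (offsets 0..4 are gone)
latest   = 10  (the log-end offset, i.e. where the next message will be written)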
SELECTs from engine=Kafka
Question
What will happen if we run a SELECT query on a working Kafka table with an MV attached? Would the data shown by the SELECT query appear later in the MV destination table?
Answer
- Most likely the SELECT query will show nothing.
- If you are lucky enough and something does show up, those rows won't appear in the MV destination table.
So it's not recommended to run SELECT queries on working Kafka tables.
For debugging it's possible to use another Kafka table with a different consumer_group, so it won't affect your main pipeline.
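A minimal sketch of such a debug table, assuming the main pipeline reads a topic named readings with the group clickhouse-readings (both placeholders); only the consumer group differs, so the main pipeline's offsets are untouched:
CREATE TABLE default.kafka_debug
(
    `ts` DateTime,
    `value` Float64
)
ENGINE = Kafka
SETTINGS kafka_broker_list = 'kafka:9092',
         kafka_topic_list = 'readings',
         kafka_group_name = 'clickhouse-debug',
         kafka_format = 'JSONEachRow';

SELECT * FROM default.kafka_debug LIMIT 10;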