Error handling
Pre 21.6
There are a couple of options:
Certain formats that carry their schema inside each message (like JSONEachRow) can silently skip any unexpected fields once the setting input_format_skip_unknown_fields is enabled.
It’s also possible to skip up to N malformed messages per block with the setting kafka_skip_broken_messages, but this option does not support all formats either.
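As a minimal sketch of the second option, the table definition below sets kafka_skip_broken_messages on a Kafka engine table. The table name, the group name and the value 10 are assumptions chosen for illustration; the broker, topic and columns reuse the placeholder values from the example later on this page.

```sql
-- Hypothetical table and group names; adjust to your environment.
CREATE TABLE default.kafka_engine_tolerant
(
    `i` Int64,
    `s` String
)
ENGINE = Kafka
SETTINGS kafka_broker_list = 'kafka:9092',
         kafka_topic_list = 'topic',
         kafka_group_name = 'clickhouse_tolerant',
         kafka_format = 'JSONEachRow',
         -- Tolerate up to 10 unparsable messages per block (illustrative value).
         kafka_skip_broken_messages = 10;
```

Skipped messages are not stored anywhere, so this approach suits cases where losing a few malformed messages is acceptable.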
After 21.6
It’s possible to stream messages that could not be parsed. This behavior is enabled with the setting kafka_handle_error_mode='stream'. ClickHouse® will then write the error text and the raw message from Kafka to two new virtual columns: _error and _raw_message.
So you can create another Materialized View that collects all parsing errors into a separate table, together with the important details such as the offset and the content of the message.
CREATE TABLE default.kafka_engine
(
    `i` Int64,
    `s` String
)
ENGINE = Kafka
SETTINGS kafka_broker_list = 'kafka:9092',
         kafka_topic_list = 'topic',
         kafka_group_name = 'clickhouse',
         kafka_format = 'JSONEachRow',
         kafka_handle_error_mode = 'stream';
CREATE MATERIALIZED VIEW default.kafka_errors
(
    `topic` String,
    `partition` Int64,
    `offset` Int64,
    `raw` String,
    `error` String
)
ENGINE = MergeTree
ORDER BY (topic, partition, offset)
SETTINGS index_granularity = 8192 AS
SELECT
    _topic AS topic,
    _partition AS partition,
    _offset AS offset,
    _raw_message AS raw,
    _error AS error
FROM default.kafka_engine
WHERE length(_error) > 0
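With kafka_handle_error_mode='stream', unparsable messages still reach every Materialized View attached to the Kafka table, so the view that stores the regular data will probably want the opposite filter. The sketch below assumes a target table default.kafka_data and a view default.kafka_data_mv; both names are hypothetical and do not come from the example above.

```sql
-- Hypothetical target table for successfully parsed rows.
CREATE TABLE default.kafka_data
(
    `i` Int64,
    `s` String
)
ENGINE = MergeTree
ORDER BY i;

-- Hypothetical view: forwards only rows whose message parsed without an error.
CREATE MATERIALIZED VIEW default.kafka_data_mv TO default.kafka_data AS
SELECT
    i,
    s
FROM default.kafka_engine
WHERE length(_error) = 0;
```

Keeping the two filters complementary means each consumed message ends up in exactly one place: the data table or the errors table.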