RabbitMQ
RabbitMQ engine in ClickHouse® 24.3+
Settings
Basic RabbitMQ settings and use cases: https://clickhouse.com/docs/en/engines/table-engines/integrations/rabbitmq
Latest improvements/fixes
(v23.10+)
- Allow to save unparsed records and errors in RabbitMQ:
NATS and FileLog engines. Add virtual columns
_error
and _raw_message
(for NATS and RabbitMQ), _raw_record
(for FileLog) that are filled when ClickHouse fails to parse new record.
The behaviour is controlled under storage settings nats_handle_error_mode
for NATS, rabbitmq_handle_error_mode
for RabbitMQ, handle_error_mode
for FileLog similar to kafka_handle_error_mode
.
If it’s set to default
, en exception will be thrown when ClickHouse fails to parse a record, if it’s set to stream
, error and raw record will be saved into virtual columns.
Closes #36035 and #55477
(v24+)
1 - RabbitMQ Error handling
Error handling for RabbitMQ table engine
Same approach as in Kafka but virtual columns are different. Check https://clickhouse.com/docs/en/engines/table-engines/integrations/rabbitmq#virtual-columns
CREATE TABLE IF NOT EXISTS rabbitmq.broker_errors_queue
(
exchange_name String,
channel_id String,
delivery_tag UInt64,
redelivered UInt8,
message_id String,
timestamp UInt64
)
engine = RabbitMQ
SETTINGS
rabbitmq_host_port = 'localhost:5672',
rabbitmq_exchange_name = 'exchange-test', -- required parameter even though this is done via the rabbitmq config
rabbitmq_queue_consume = true,
rabbitmq_queue_base = 'test-errors',
rabbitmq_format = 'JSONEachRow',
rabbitmq_username = 'guest',
rabbitmq_password = 'guest',
rabbitmq_handle_error_mode = 'stream';
CREATE MATERIALIZED VIEW IF NOT EXISTS rabbitmq.broker_errors_mv
(
exchange_name String,
channel_id String,
delivery_tag UInt64,
redelivered UInt8,
message_id String,
timestamp UInt64
raw_message String,
error String
)
ENGINE = MergeTree
ORDER BY (error)
SETTINGS index_granularity = 8192 AS
SELECT
_exchange_name AS exchange_name,
_channel_id AS channel_id,
_delivery_tag AS delivery_tag,
_redelivered AS redelivered,
_message_id AS message_id,
_timestamp AS timestamp,
_raw_message AS raw_message,
_error AS error
FROM rabbitmq.broker_errors_queue
WHERE length(_error) > 0