clickhouse-copier
The description of the utility and its parameters, as well as examples of the config files that you need to create for the copier, are in the official repo for the ClickHouse® copier utility.
The steps to run a task:
- Create a config file for clickhouse-copier (zookeeper.xml)
- Create a config file for the task (task1.xml)
- Create the task in ZooKeeper and start an instance of clickhouse-copier
clickhouse-copier --daemon --base-dir=/opt/clickhouse-copier --config=/opt/clickhouse-copier/zookeeper.xml --task-path=/clickhouse/copier/task1 --task-file=/opt/clickhouse-copier/task1.xml
If the node in ZooKeeper already exists and you want to change it, you need to add the task-upload-force parameter:
clickhouse-copier --daemon --base-dir=/opt/clickhouse-copier --config=/opt/clickhouse-copier/zookeeper.xml --task-path=/clickhouse/copier/task1 --task-file=/opt/clickhouse-copier/task1.xml --task-upload-force=1
If you want to run another instance of clickhouse-copier for the same task, you need to copy the config file (zookeeper.xml) to another server, and run this command:
clickhouse-copier --daemon --base-dir=/opt/clickhouse-copier --config=/opt/clickhouse-copier/zookeeper.xml --task-path=/clickhouse/copier/task1
The number of simultaneously running instances is controlled by the max_workers parameter in your task configuration file. If you run more workers, the superfluous workers will sleep and log messages like this:
<Debug> ClusterCopier: Too many workers (1, maximum 1). Postpone processing
clickhouse-copier 20.3 and earlier
clickhouse-copier was created to move data between clusters. It runs simple INSERT…SELECT queries and can copy data between tables with different engine parameters and between clusters with a different number of shards.
In the task configuration file you need to describe the layout of the source and the target cluster, and list the tables that you need to copy. You can copy whole tables or specific partitions.
clickhouse-copier uses temporary distributed tables to select from the source cluster and insert into the target cluster.
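Conceptually, the copy boils down to statements like the following (a simplified sketch: the actual temporary table names are generated by clickhouse-copier, and the cluster, database, column and partition values here are placeholders):
-- temporary Distributed table pointing at the source cluster
CREATE TABLE default.copier_read AS source_db.source_table
ENGINE = Distributed(source_cluster, source_db, source_table);

-- temporary Distributed table pointing at the target cluster
CREATE TABLE default.copier_write AS target_db.target_table
ENGINE = Distributed(target_cluster, target_db, target_table, rand());

-- the copy itself is a plain INSERT ... SELECT, one partition at a time
INSERT INTO default.copier_write
SELECT * FROM default.copier_read
WHERE toYYYYMM(timestamp) = 201909;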
The process is as follows:
- Process the configuration files.
- Discover the list of partitions if not provided in the config.
- Copy partitions one by one.
  - Drop the partition from the target table if it's not empty.
  - Copy data from source shards one by one.
    - Check if there is data for the partition on a source shard.
    - Check the status of the task in ZooKeeper.
    - Create target tables on all shards of the target cluster.
    - Insert the partition of data into the target table.
    - Mark the partition as completed in ZooKeeper.
If there are several workers running simultaneously, they will assign themselves to different source shards.
If a worker was interrupted, another worker can be started to continue the task. The next worker will drop incomplete partitions and resume the copying.
Configuring the engine of the target table
clickhouse-copier uses the engine from the task configuration file for these purposes:
- to create target tables if they don’t exist.
- PARTITION BY: to SELECT a partition of data from the source table, to DROP existing partitions from target tables.
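For instance, with PARTITION BY toYYYYMM(timestamp) in the configured engine, the per-partition statements look roughly like this (a sketch; the database, table, column and partition values are placeholders):
-- drop a non-empty partition from the target table before re-copying it
ALTER TABLE target_db.target_table DROP PARTITION 201909;

-- select the same partition of data from the source table
SELECT * FROM source_db.source_table WHERE toYYYYMM(timestamp) = 201909;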
clickhouse-copier does not support the old MergeTree format. However, you can create the target tables manually and specify the engine in the task configuration file in the new format so that clickhouse-copier can parse it for its SELECT queries.
How to monitor the status of running tasks
clickhouse-copier uses ZooKeeper to keep track of the progress and to communicate between workers. Here is a list of queries that you can use to see what's happening.
--task-path /clickhouse/copier/task1
-- The task config
select * from system.zookeeper
where path='<task-path>'
name | ctime | mtime
----------------------------+---------------------+--------------------
description | 2019-10-18 15:40:00 | 2020-09-11 16:01:14
task_active_workers_version | 2019-10-18 16:00:09 | 2020-09-11 16:07:08
tables | 2019-10-18 16:00:25 | 2019-10-18 16:00:25
task_active_workers | 2019-10-18 16:00:09 | 2019-10-18 16:00:09
-- Running workers
select * from system.zookeeper
where path='<task-path>/task_active_workers'
-- The list of processed tables
select * from system.zookeeper
where path='<task-path>/tables'
-- The list of processed partitions
select * from system.zookeeper
where path='<task-path>/tables/<table>'
name | ctime
-------+--------------------
201909 | 2019-10-18 18:24:18
-- The status of a partition
select * from system.zookeeper
where path='<task-path>/tables/<table>/<partition>'
name | ctime
-------------------------+--------------------
shards | 2019-10-18 18:24:18
partition_active_workers | 2019-10-18 18:24:18
-- The status of source shards
select * from system.zookeeper
where path='<task-path>/tables/<table>/<partition>/shards'
name | ctime | mtime
-----+---------------------+--------------------
1 | 2019-10-18 22:37:48 | 2019-10-18 22:49:29
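The queries above list child node names; to inspect the payload stored in a node (for example the task description), select the value column explicitly:
select name, value from system.zookeeper
where path='<task-path>' and name='description'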
clickhouse-copier 20.4 - 21.6
clickhouse-copier was created to move data between clusters. It runs simple INSERT…SELECT queries and can copy data between tables with different engine parameters and between clusters with a different number of shards.
In the task configuration file you need to describe the layout of the source and the target cluster, and list the tables that you need to copy. You can copy whole tables or specific partitions.
clickhouse-copier uses temporary distributed tables to select from the source cluster and insert into the target cluster.
The behavior of clickhouse-copier was changed in 20.4:
- Now clickhouse-copier inserts data into intermediate tables, and after the insert finishes successfully clickhouse-copier attaches the completed partition into the target table. This allows for incremental data copying, because the data in the target table is intact during the process. Important note: ATTACH PARTITION respects the max_partition_size_to_drop limit. Make sure the max_partition_size_to_drop limit is big enough (or set to zero) in the destination cluster. If clickhouse-copier is unable to attach a partition because of the limit, it will proceed to the next partition, and it will drop the intermediate table when the task is finished (if the intermediate table is less than the max_table_size_to_drop limit). Another important note: ATTACH PARTITION is replicated. The attached partition will need to be downloaded by the other replicas. This can create significant network traffic between ClickHouse nodes. If an attach takes a long time, clickhouse-copier will log a timeout and will proceed to the next step.
- Now clickhouse-copier splits the source data into chunks and copies them one by one. This is useful for big source tables, when inserting one partition of data can take hours. If there is an error during the insert, clickhouse-copier has to drop the whole partition and start again. The number_of_splits parameter lets you split your data into chunks so that in case of an exception clickhouse-copier has to re-insert only one chunk of the data.
- Now clickhouse-copier runs OPTIMIZE target_table PARTITION ... DEDUPLICATE for non-Replicated MergeTree tables. Important note: This is a very strange feature that can do more harm than good. We recommend disabling it by configuring the engine of the target table as Replicated in the task configuration file, and creating the target tables manually if they are not supposed to be replicated. Intermediate tables are always created as plain MergeTree.
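To illustrate, the per-partition statements on the target side look roughly like this (a sketch with placeholder names; the actual intermediate table names are generated by clickhouse-copier):
-- attach a completed piece from the intermediate table into the target table
ALTER TABLE target_db.target_table ATTACH PARTITION 202103 FROM target_db.target_table_piece_0;

-- only for non-Replicated target tables (see the note above)
OPTIMIZE TABLE target_db.target_table PARTITION 202103 DEDUPLICATE;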
The process is as follows:
- Process the configuration files.
- Discover the list of partitions if not provided in the config.
- Copy partitions one by one. (The metadata in ZooKeeper suggests the order described here.)
  - Copy chunks of data one by one.
    - Copy data from source shards one by one.
      - Create intermediate tables on all shards of the target cluster.
      - Check the status of the chunk in ZooKeeper.
      - Drop the partition from the intermediate table if the previous attempt was interrupted.
      - Insert the chunk of data into the intermediate tables.
      - Mark the shard as completed in ZooKeeper.
  - Attach the chunks of the completed partition into the target table one by one.
    - Attach a chunk into the target table.
    - non-Replicated: Run OPTIMIZE target_table DEDUPLICATE for the partition on the target table.
- Drop intermediate tables (may not succeed if the tables are bigger than max_table_size_to_drop).
If there are several workers running simultaneously, they will assign themselves to different source shards.
If a worker was interrupted, another worker can be started to continue the task. The next worker will drop incomplete partitions and resume the copying.
Configuring the engine of the target table
clickhouse-copier uses the engine from the task configuration file for these purposes:
- to create target and intermediate tables if they don’t exist.
- PARTITION BY: to SELECT a partition of data from the source table, to ATTACH partitions into target tables, to DROP incomplete partitions from intermediate tables, to OPTIMIZE partitions after they are attached to the target.
- ORDER BY: to SELECT a chunk of data from the source table.
Here is an example of the SELECT condition that clickhouse-copier uses to get the sixth of ten chunks of data:
WHERE (<the PARTITION BY clause> = (<a value of the PARTITION BY expression> AS partition_key))
AND (cityHash64(<the ORDER BY clause>) % 10 = 6 )
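Using the engine from the Kubernetes example later on this page (PARTITION BY toYYYYMM(timestamp), ORDER BY (channel_id, product_id)) purely as an illustration, with number_of_splits = 10 this template expands to something like:
WHERE (toYYYYMM(timestamp) = (202103 AS partition_key))
AND (cityHash64(channel_id, product_id) % 10 = 6)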
clickhouse-copier does not support the old MergeTree format. However, you can create the intermediate tables manually with the same engine as the target tables (otherwise ATTACH will not work), and specify the engine in the task configuration file in the new format so that clickhouse-copier can parse it for SELECT, ATTACH PARTITION and DROP PARTITION queries.
Important note: always configure the engine as Replicated to disable OPTIMIZE … DEDUPLICATE (unless you know why you need clickhouse-copier to run OPTIMIZE … DEDUPLICATE).
The default value for number_of_splits is 10. You can change this parameter in the table section of the task configuration file. We recommend setting it to 1 for smaller tables.
<cluster_push>target_cluster</cluster_push>
<database_push>target_database</database_push>
<table_push>target_table</table_push>
<number_of_splits>1</number_of_splits>
<engine>Engine=Replicated...</engine>
How to monitor the status of running tasks
clickhouse-copier uses ZooKeeper to keep track of the progress and to communicate between workers. Here is a list of queries that you can use to see what's happening.
--task-path=/clickhouse/copier/task1
-- The task config
select * from system.zookeeper
where path='<task-path>'
name | ctime | mtime
----------------------------+---------------------+--------------------
description | 2021-03-22 13:15:48 | 2021-03-22 13:25:28
status | 2021-03-22 13:15:48 | 2021-03-22 13:25:28
task_active_workers_version | 2021-03-22 13:15:48 | 2021-03-22 20:32:09
tables | 2021-03-22 13:16:47 | 2021-03-22 13:16:47
task_active_workers | 2021-03-22 13:15:48 | 2021-03-22 13:15:48
-- Status
select * from system.zookeeper
where path='<task-path>/status'
-- Running workers
select * from system.zookeeper
where path='<task-path>/task_active_workers'
-- The list of processed tables
select * from system.zookeeper
where path='<task-path>/tables'
-- The list of processed partitions
select * from system.zookeeper
where path='<task-path>/tables/<table>'
name | ctime
-------+--------------------
202103 | 2021-03-22 13:16:47
202102 | 2021-03-22 13:18:31
202101 | 2021-03-22 13:27:36
202012 | 2021-03-22 14:05:08
-- The status of a partition
select * from system.zookeeper
where path='<task-path>/tables/<table>/<partition>'
name | ctime
---------------+--------------------
piece_0 | 2021-03-22 13:18:31
attach_is_done | 2021-03-22 14:05:05
-- The status of a piece
select * from system.zookeeper
where path='<task-path>/tables/<table>/<partition>/piece_N'
name | ctime
-------------------------------+--------------------
shards | 2021-03-22 13:18:31
is_dirty | 2021-03-22 13:26:51
partition_piece_active_workers | 2021-03-22 13:26:54
clean_start | 2021-03-22 13:26:54
-- The status of source shards
select * from system.zookeeper
where path='<task-path>/tables/<table>/<partition>/piece_N/shards'
name | ctime | mtime
-----+---------------------+--------------------
1 | 2021-03-22 13:26:54 | 2021-03-22 14:05:05
Kubernetes job for clickhouse-copier
clickhouse-copier deployment in kubernetes
clickhouse-copier can be deployed in a kubernetes environment to automate some simple backups or copy fresh data between clusters.
Deployment
Using a kubernetes job is recommended, but a simple pod can be used if you only want to execute the copy once. Just edit the yaml files to fit your needs.
1) Create the PVC:
First create a namespace in which all the pods and resources are going to be deployed:
kubectl create namespace clickhouse-copier
Then create the PVC using the gp2-encrypted storageClass, or any other storageClass from your provider:
---
apiVersion: v1
kind: PersistentVolumeClaim
metadata:
  name: copier-logs
  namespace: clickhouse-copier
spec:
  storageClassName: gp2-encrypted
  accessModes:
    - ReadWriteOnce
  resources:
    requests:
      storage: 100Mi
and deploy:
kubectl -n clickhouse-copier create -f ./kubernetes/copier-pvc.yaml
2) Create the configmap:
The configmap has both files, zookeeper.xml and task01.xml, with the ZooKeeper node listing and the parameters for the task respectively.
---
apiVersion: v1
kind: ConfigMap
metadata:
  name: copier-config
  namespace: clickhouse-copier
data:
  task01.xml: |
    <clickhouse>
        <logger>
            <console>true</console>
            <log remove="remove"/>
            <errorlog remove="remove"/>
            <level>trace</level>
        </logger>
        <remote_servers>
            <all-replicated>
                <shard>
                    <replica>
                        <host>clickhouse01.svc.cluster.local</host>
                        <port>9000</port>
                        <user>chcopier</user>
                        <password>pass</password>
                    </replica>
                    <replica>
                        <host>clickhouse02.svc.cluster.local</host>
                        <port>9000</port>
                        <user>chcopier</user>
                        <password>pass</password>
                    </replica>
                </shard>
            </all-replicated>
            <all-sharded>
                <!-- <secret></secret> -->
                <shard>
                    <replica>
                        <host>clickhouse03.svc.cluster.local</host>
                        <port>9000</port>
                        <user>chcopier</user>
                        <password>pass</password>
                    </replica>
                </shard>
                <shard>
                    <replica>
                        <host>clickhouse03.svc.cluster.local</host>
                        <port>9000</port>
                        <user>chcopier</user>
                        <password>pass</password>
                    </replica>
                </shard>
            </all-sharded>
        </remote_servers>
        <max_workers>1</max_workers>
        <settings_pull>
            <readonly>1</readonly>
        </settings_pull>
        <settings_push>
            <readonly>0</readonly>
        </settings_push>
        <settings>
            <connect_timeout>3</connect_timeout>
            <insert_distributed_sync>1</insert_distributed_sync>
        </settings>
        <tables>
            <table_sales>
                <cluster_pull>all-replicated</cluster_pull>
                <database_pull>default</database_pull>
                <table_pull>fact_sales_event</table_pull>
                <cluster_push>all-sharded</cluster_push>
                <database_push>default</database_push>
                <table_push>fact_sales_event</table_push>
                <engine>
                    Engine=ReplicatedMergeTree('/clickhouse/{cluster}/tables/{shard}/fact_sales_event', '{replica}')
                    PARTITION BY toYYYYMM(timestamp)
                    ORDER BY (channel_id, product_id)
                    SETTINGS index_granularity = 8192
                </engine>
                <sharding_key>rand()</sharding_key>
            </table_sales>
        </tables>
    </clickhouse>
  zookeeper.xml: |
    <clickhouse>
        <logger>
            <level>trace</level>
            <size>100M</size>
            <count>3</count>
        </logger>
        <zookeeper>
            <node>
                <host>zookeeper1.svc.cluster.local</host>
                <port>2181</port>
            </node>
            <node>
                <host>zookeeper2.svc.cluster.local</host>
                <port>2181</port>
            </node>
            <node>
                <host>zookeeper3.svc.cluster.local</host>
                <port>2181</port>
            </node>
        </zookeeper>
    </clickhouse>
and deploy:
kubectl -n clickhouse-copier create -f ./kubernetes/copier-configmap.yaml
The task01.xml file has many parameters to take into account, which are explained in the repo for clickhouse-copier. Note that the ZooKeeper nodes and the ClickHouse® servers must be addressed by FQDNs that are valid inside the cluster. As the deployment creates a new namespace, it is recommended to use a FQDN linked to a service, for example zookeeper01.svc.cluster.local. This file should be adapted to the topologies of both clusters and to the needs of the user.
The zookeeper.xml file is pretty straightforward, with a simple 3-node ensemble configuration.
3) Create the job:
Basically, the job will download the official ClickHouse image and create a pod with 2 containers:
- clickhouse-copier: runs the clickhouse-copier utility.
- sidecar-logger: reads the logs of the clickhouse-copier container across different runs (this part can be improved):
---
apiVersion: batch/v1
kind: Job
metadata:
  name: clickhouse-copier-test
  namespace: clickhouse-copier
spec:
  # only for kubernetes 1.23
  # ttlSecondsAfterFinished: 86400
  template:
    spec:
      containers:
        - name: clickhouse-copier
          image: clickhouse/clickhouse-server:21.8
          command:
            - clickhouse-copier
            - --task-upload-force=1
            - --config-file=$(CH_COPIER_CONFIG)
            - --task-path=$(CH_COPIER_TASKPATH)
            - --task-file=$(CH_COPIER_TASKFILE)
            - --base-dir=$(CH_COPIER_BASEDIR)
          env:
            - name: CH_COPIER_CONFIG
              value: "/var/lib/clickhouse/tmp/zookeeper.xml"
            - name: CH_COPIER_TASKPATH
              value: "/clickhouse/copier/tasks/task01"
            - name: CH_COPIER_TASKFILE
              value: "/var/lib/clickhouse/tmp/task01.xml"
            - name: CH_COPIER_BASEDIR
              value: "/var/lib/clickhouse/tmp"
          resources:
            limits:
              cpu: "1"
              memory: 2048Mi
          volumeMounts:
            - name: copier-config
              mountPath: /var/lib/clickhouse/tmp/zookeeper.xml
              subPath: zookeeper.xml
            - name: copier-config
              mountPath: /var/lib/clickhouse/tmp/task01.xml
              subPath: task01.xml
            - name: copier-logs
              mountPath: /var/lib/clickhouse/tmp
        - name: sidecar-logger
          image: busybox:1.35
          command: ['/bin/sh', '-c', 'tail -n 1000 -f /tmp/copier-logs/clickhouse-copier*/*.log']
          resources:
            limits:
              cpu: "1"
              memory: 512Mi
          volumeMounts:
            - name: copier-logs
              mountPath: /tmp/copier-logs
      volumes:
        - name: copier-config
          configMap:
            name: copier-config
            items:
              - key: zookeeper.xml
                path: zookeeper.xml
              - key: task01.xml
                path: task01.xml
        - name: copier-logs
          persistentVolumeClaim:
            claimName: copier-logs
      restartPolicy: Never
  backoffLimit: 3
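and deploy (the manifest filename here assumes the same ./kubernetes/ naming pattern used above):
kubectl -n clickhouse-copier create -f ./kubernetes/copier-job.yaml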
Then watch the progress by checking the logs of the sidecar container:
kubectl -n clickhouse-copier logs <podname> sidecar-logger