ClickHouse 数据导入实战:从MySQL到ClickHouse的迁移方案
关键词:ClickHouse、MySQL、数据迁移、ETL、数据仓库、高性能分析、大数据
摘要:本文详细介绍了如何将数据从MySQL迁移到ClickHouse的完整方案。我们将从基础概念讲起,逐步深入探讨多种迁移方法,包括直接导出导入、使用ETL工具、实时同步等方案。文章包含详细的代码示例和性能优化建议,帮助读者在实际项目中实现高效、可靠的数据迁移。
背景介绍
目的和范围
本文旨在为需要将数据从MySQL迁移到ClickHouse的开发者和数据分析师提供一套完整的解决方案。我们将覆盖从基础概念到高级技巧的全方位内容,包括但不限于:
ClickHouse与MySQL的核心差异数据迁移前的准备工作多种迁移方案的技术实现性能优化和常见问题解决
预期读者
数据库管理员(DBA)数据分析师后端开发工程师数据仓库架构师任何对大数据处理感兴趣的技术人员
文档结构概述
文章将从基础概念讲起,逐步深入到具体实现方案,最后讨论优化技巧和未来趋势。每个部分都配有详细的代码示例和示意图,确保读者能够轻松理解和实践。
术语表
核心术语定义
ClickHouse:由Yandex开发的开源列式数据库管理系统,专为在线分析处理(OLAP)设计MySQL:流行的开源关系型数据库管理系统(RDBMS),广泛用于在线事务处理(OLTP)ETL:Extract-Transform-Load的缩写,指数据抽取、转换和加载的过程物化视图:预先计算并存储的查询结果,用于加速特定查询
相关概念解释
OLTP vs OLAP:OLTP(在线事务处理)系统优化了大量小事务的处理,而OLAP(在线分析处理)系统优化了复杂分析查询列式存储 vs 行式存储:列式存储将同一列的数据连续存储,适合分析查询;行式存储将同一行的数据连续存储,适合事务处理
缩略词列表
OLTP:Online Transaction ProcessingOLAP:Online Analytical ProcessingETL:Extract-Transform-LoadCSV:Comma-Separated ValuesJDBC:Java Database Connectivity
核心概念与联系
故事引入
想象你经营着一家快速发展的电商公司。最初,你的MySQL数据库完美处理了用户订单、产品目录和客户信息。但随着业务增长,你需要分析一年内哪些产品组合最受欢迎,或者预测季节性销售趋势。这时,MySQL开始显得力不从心——复杂的分析查询需要几分钟甚至几小时才能完成。
这时,你听说了ClickHouse——一个专为分析查询设计的数据库,能在秒级完成MySQL需要几分钟才能完成的查询。但如何将现有数据从MySQL迁移到ClickHouse呢?这就是我们今天要解决的冒险任务!
核心概念解释
ClickHouse:数据分析的超级引擎
ClickHouse就像一个专门为分析任务设计的超级计算机。不同于MySQL(像一位细心的图书管理员,擅长快速找到并更新单本书籍),ClickHouse更像是一位统计专家,能快速分析整个图书馆的借阅模式、热门书籍趋势等。
它的秘密武器是:
列式存储:数据按列而不是按行存储,分析时只需读取相关列向量化执行:一次处理一批数据而不是单条记录数据压缩:相似的数据值能高效压缩,节省空间
MySQL到ClickHouse迁移的挑战
把数据从MySQL搬到ClickHouse,就像把家具从公寓搬到别墅:
结构差异:公寓(MySQL)的布局和别墅(ClickHouse)不同,需要重新安排数据类型映射:有些MySQL数据类型在ClickHouse中没有直接对应数据量大:可能需要分批搬运,避免”堵车”实时同步:如何在搬运过程中保持数据更新?
核心概念之间的关系
MySQL和ClickHouse的分工协作
想象MySQL是商店的收银台,ClickHouse是后台的分析部门:
MySQL:快速记录每一笔交易(写优化)ClickHouse:分析所有交易数据,发现趋势(读优化)
它们可以共存于一个系统,各司其职:
MySQL处理日常事务(下单、支付)定期将数据同步到ClickHouse进行分析ClickHouse的分析结果可以反馈给MySQL改进业务
迁移方案的选择
根据数据量和实时性需求,我们可以选择不同的”搬运方式”:
小数据量、一次性迁移 → 直接导出导入 (搬家公司的整车运输)
中等数据量、定期更新 → ETL工具 (定时的货运列车)
大数据量、实时同步 → CDC(变更数据捕获) (实时传送带)
核心概念原理和架构的文本示意图
典型的MySQL到ClickHouse迁移架构:
[MySQL] → [Extract] → [Transform] → [Load] → [ClickHouse]
│ │
├─ CSV/JSON 导出 ├─ 直接插入
├─ JDBC 连接 ├─ 批量导入
└─ Binlog 监听 └─ 物化视图
Mermaid 流程图
核心算法原理 & 具体操作步骤
方案一:直接导出导入(适合小数据量一次性迁移)
原理
这是最简单的迁移方法,分为三个步骤:
从MySQL导出数据到中间文件(CSV/JSON)传输文件到ClickHouse服务器使用ClickHouse的本地文件导入功能加载数据
Python实现示例
import pymysql
import subprocess
from clickhouse_driver import Client
# 1. 从MySQL导出数据到CSV
def export_mysql_to_csv():
conn = pymysql.connect(host='mysql_host', user='user',
password='password', db='database')
cursor = conn.cursor()
# 执行查询并写入CSV
cursor.execute("SELECT * FROM orders")
with open('/tmp/orders.csv', 'w') as f:
for row in cursor:
f.write(','.join(str(col) for col in row) + '
')
cursor.close()
conn.close()
# 2. 传输文件到ClickHouse服务器 (假设使用scp)
def transfer_file():
subprocess.run([
'scp', '/tmp/orders.csv',
'clickhouse_user@clickhouse_host:/tmp/'
])
# 3. 导入数据到ClickHouse
def import_to_clickhouse():
ch_client = Client(host='clickhouse_host')
# 确保表结构已创建
ch_client.execute('''
CREATE TABLE IF NOT EXISTS orders (
id UInt32,
user_id UInt32,
product_id UInt32,
amount Float32,
order_date Date
) ENGINE = MergeTree()
ORDER BY (order_date, id)
''')
# 使用文件导入
ch_client.execute(
"INSERT INTO orders FORMAT CSV",
open('/tmp/orders.csv', 'r')
)
if __name__ == '__main__':
export_mysql_to_csv()
transfer_file()
import_to_clickhouse()
方案二:使用ETL工具(适合中等数据量定期同步)
原理
ETL工具提供了更强大的数据处理能力:
抽取(Extract):从MySQL读取数据转换(Transform):数据清洗、类型转换、聚合等加载(Load):将处理后的数据写入ClickHouse
我们使用Apache Airflow作为ETL调度工具。
Airflow DAG示例
from airflow import DAG
from airflow.operators.python import PythonOperator
from datetime import datetime, timedelta
from clickhouse_driver import Client
import pymysql
default_args = {
'owner': 'admin',
'depends_on_past': False,
'start_date': datetime(2023, 1, 1),
'retries': 3,
'retry_delay': timedelta(minutes=5)
}
dag = DAG(
'mysql_to_clickhouse',
default_args=default_args,
schedule_interval='@daily'
)
def extract_mysql_data(**context):
conn = pymysql.connect(host='mysql_host', user='user',
password='password', db='database')
cursor = conn.cursor(pymysql.cursors.DictCursor)
# 获取执行日期(Airflow提供的逻辑日期)
exec_date = context['execution_date'].strftime('%Y-%m-%d')
# 查询当天的数据
cursor.execute(f'''
SELECT * FROM orders
WHERE order_date = '{exec_date}'
''')
# 将数据推送到XCom(Airflow的任务间通信机制)
return cursor.fetchall()
def transform_data(**context):
# 从XCom获取数据
ti = context['ti']
raw_data = ti.xcom_pull(task_ids='extract_mysql_data')
# 简单转换:将MySQL的DECIMAL转为Float
transformed = []
for row in raw_data:
new_row = row.copy()
if 'amount' in new_row:
new_row['amount'] = float(new_row['amount'])
transformed.append(new_row)
return transformed
def load_to_clickhouse(**context):
ti = context['ti']
data = ti.xcom_pull(task_ids='transform_data')
if not data:
return
ch_client = Client(host='clickhouse_host')
# 准备批量插入
columns = list(data[0].keys())
values = [tuple(row.values()) for row in data]
ch_client.execute(
f"INSERT INTO orders ({','.join(columns)}) VALUES",
values
)
extract_task = PythonOperator(
task_id='extract_mysql_data',
python_callable=extract_mysql_data,
provide_context=True,
dag=dag
)
transform_task = PythonOperator(
task_id='transform_data',
python_callable=transform_data,
provide_context=True,
dag=dag
)
load_task = PythonOperator(
task_id='load_to_clickhouse',
python_callable=load_to_clickhouse,
provide_context=True,
dag=dag
)
extract_task >> transform_task >> load_task
方案三:实时同步(使用CDC技术)
原理
变更数据捕获(CDC)技术可以实时捕捉MySQL的变更并应用到ClickHouse:
监听MySQL的binlog(二进制日志)解析binlog事件(插入/更新/删除)将变更应用到ClickHouse
使用Debezium作为CDC工具,Kafka作为消息队列。
架构示意图
[MySQL] → [Debezium] → [Kafka] → [ClickHouse Sink Connector] → [ClickHouse]
实现步骤
配置MySQL启用binlog
# 在my.cnf中
[mysqld]
server-id = 1
log_bin = mysql-bin
binlog_format = ROW
binlog_row_image = FULL
启动Debezium MySQL连接器
curl -i -X POST -H "Accept:application/json"
-H "Content-Type:application/json"
http://debezium-server:8083/connectors/
-d '{
"name": "mysql-connector",
"config": {
"connector.class": "io.debezium.connector.mysql.MySqlConnector",
"database.hostname": "mysql_host",
"database.port": "3306",
"database.user": "debezium",
"database.password": "password",
"database.server.id": "184054",
"database.server.name": "mysql",
"database.include.list": "your_database",
"database.history.kafka.bootstrap.servers": "kafka:9092",
"database.history.kafka.topic": "schema-changes.your_database",
"include.schema.changes": "true"
}
}'
配置ClickHouse Kafka引擎表
CREATE TABLE kafka_orders (
id UInt32,
user_id UInt32,
product_id UInt32,
amount Float32,
order_date Date,
_op String
) ENGINE = Kafka()
SETTINGS
kafka_broker_list = 'kafka:9092',
kafka_topic_list = 'mysql.your_database.orders',
kafka_group_name = 'clickhouse_consumer_group',
kafka_format = 'JSONEachRow';
创建物化视图处理数据
CREATE TABLE orders (
id UInt32,
user_id UInt32,
product_id UInt32,
amount Float32,
order_date Date
) ENGINE = ReplacingMergeTree()
ORDER BY (id);
CREATE MATERIALIZED VIEW orders_consumer TO orders AS
SELECT
id, user_id, product_id, amount, order_date
FROM kafka_orders
WHERE _op = 'r' OR _op = 'c'; -- 只处理读取和创建操作
数学模型和公式 & 详细讲解
数据迁移性能模型
迁移性能主要受以下因素影响:
数据提取速度 VextractV_{extract}Vextract网络传输速度 VtransferV_{transfer}Vtransfer数据加载速度 VloadV_{load}Vload
总迁移时间 TtotalT_{total}Ttotal 可以表示为:
其中:
DDD 是数据总量ToverheadT_{overhead}Toverhead 是各种开销时间(连接建立、转换等)
批量插入优化
ClickHouse的插入性能与批量大小密切相关。最佳批量大小可以通过以下公式估算:
其中:
CsetupC_{setup}Csetup 是每次插入的固定开销(约10-100ms)Cper_rowC_{per\_row}Cper_row 是每行的处理开销(约0.01-0.1ms)
对于典型值:
但在实践中,由于网络延迟等因素,通常使用更大的批量(1000-10000行)效果更好。
数据压缩率预测
ClickHouse的压缩率取决于数据特征。对于列XXX,压缩率RRR可近似为:
其中:
H(X)H(X)H(X) 是列的熵entropy(X) ext{entropy}(X)entropy(X) 是列的信息熵
对于低基数列(如性别、布尔值),压缩率可能达到10:1以上;而对于高熵数据(如随机数),压缩率可能接近1:1。
项目实战:代码实际案例和详细解释说明
开发环境搭建
1. 准备测试数据
在MySQL中创建测试表并生成数据:
-- MySQL
CREATE DATABASE ecommerce;
USE ecommerce;
CREATE TABLE orders (
id INT AUTO_INCREMENT PRIMARY KEY,
user_id INT NOT NULL,
product_id INT NOT NULL,
amount DECIMAL(10,2) NOT NULL,
order_date DATE NOT NULL,
INDEX idx_date (order_date)
);
-- 生成100万测试数据
INSERT INTO orders (user_id, product_id, amount, order_date)
SELECT
FLOOR(RAND() * 1000) + 1,
FLOOR(RAND() * 100) + 1,
ROUND(RAND() * 100, 2),
DATE('2020-01-01') + INTERVAL FLOOR(RAND() * 1095) DAY
FROM
(SELECT 1 UNION SELECT 2 UNION SELECT 3 UNION SELECT 4) t1,
(SELECT 1 UNION SELECT 2 UNION SELECT 3 UNION SELECT 4) t2,
(SELECT 1 UNION SELECT 2 UNION SELECT 3 UNION SELECT 4) t3,
(SELECT 1 UNION SELECT 2 UNION SELECT 3 UNION SELECT 4) t4,
(SELECT 1 UNION SELECT 2 UNION SELECT 3 UNION SELECT 4) t5,
(SELECT 1 UNION SELECT 2 UNION SELECT 3 UNION SELECT 4) t6;
2. ClickHouse表设计
在ClickHouse中创建优化过的表结构:
-- ClickHouse
CREATE DATABASE ecommerce;
-- 主表
CREATE TABLE ecommerce.orders (
id UInt32,
user_id UInt32,
product_id UInt32,
amount Float32,
order_date Date
) ENGINE = MergeTree()
PARTITION BY toYYYYMM(order_date)
ORDER BY (order_date, id)
SETTINGS index_granularity = 8192;
-- 预聚合物化视图
CREATE MATERIALIZED VIEW ecommerce.orders_daily_mv
ENGINE = SummingMergeTree()
PARTITION BY toYYYYMM(order_date)
ORDER BY (order_date, product_id)
POPULATE AS
SELECT
order_date,
product_id,
count() AS order_count,
sum(amount) AS total_amount,
uniq(user_id) AS unique_users
FROM ecommerce.orders
GROUP BY order_date, product_id;
源代码详细实现和代码解读
高效批量迁移脚本
import pymysql
from clickhouse_driver import Client
from datetime import datetime
import time
class MySQLToClickHouseMigrator:
def __init__(self, mysql_config, ch_config):
self.mysql_config = mysql_config
self.ch_config = ch_config
self.batch_size = 10000
self.max_retries = 3
def get_mysql_connection(self):
return pymysql.connect(**self.mysql_config)
def get_clickhouse_client(self):
return Client(**self.ch_config)
def estimate_row_count(self, table):
with self.get_mysql_connection() as conn:
with conn.cursor() as cursor:
cursor.execute(f"SELECT COUNT(*) FROM {table}")
return cursor.fetchone()[0]
def migrate_table(self, table, where_clause=None):
total_rows = self.estimate_row_count(table)
print(f"Total rows to migrate: {total_rows}")
ch_client = self.get_clickhouse_client()
processed = 0
start_time = datetime.now()
query = f"SELECT * FROM {table}"
if where_clause:
query += f" WHERE {where_clause}"
with self.get_mysql_connection() as conn:
# 使用SS游标减少内存使用
conn.cursorclass = pymysql.cursors.SSCursor
with conn.cursor() as cursor:
cursor.execute(query)
batch = []
for row in cursor:
batch.append(row)
if len(batch) >= self.batch_size:
self._insert_batch(ch_client, table, batch)
processed += len(batch)
self._log_progress(processed, total_rows, start_time)
batch = []
if batch: # 处理剩余数据
self._insert_batch(ch_client, table, batch)
processed += len(batch)
duration = (datetime.now() - start_time).total_seconds()
print(f"
Migration completed. Total time: {duration:.2f} seconds")
print(f"Average speed: {processed/duration:.2f} rows/second")
def _insert_batch(self, ch_client, table, batch, retry=0):
try:
ch_client.execute(
f"INSERT INTO {table} VALUES",
batch
)
except Exception as e:
if retry < self.max_retries:
print(f"Retry {retry+1} for batch of size {len(batch)}")
time.sleep(2 ** retry) # 指数退避
self._insert_batch(ch_client, table, batch, retry+1)
else:
print(f"Failed to insert batch after {self.max_retries} retries")
raise
def _log_progress(self, processed, total, start_time):
elapsed = (datetime.now() - start_time).total_seconds()
percent = 100 * processed / total
speed = processed / elapsed if elapsed > 0 else 0
print(
f"
Processed: {processed:,}/{total:,} ({percent:.1f}%) | "
f"Speed: {speed:,.0f} rows/sec | "
f"Elapsed: {elapsed:.0f}s",
end="", flush=True
)
# 使用示例
if __name__ == "__main__":
migrator = MySQLToClickHouseMigrator(
mysql_config={
'host': 'localhost',
'user': 'root',
'password': 'mysql_password',
'db': 'ecommerce'
},
ch_config={
'host': 'localhost',
'user': 'default',
'password': 'clickhouse_password',
'database': 'ecommerce'
}
)
# 按日期分区迁移
migrator.migrate_table('orders', 'order_date >= "2022-01-01"')
代码解读与分析
连接管理:
使用上下文管理器(语句)确保数据库连接正确关闭为MySQL使用SS游标(SSCursor)流式获取数据,避免内存溢出
with
批量处理:
可配置的批量大小(),平衡内存使用和插入性能实现指数退避重试机制,提高可靠性
batch_size
进度监控:
实时显示迁移进度、速度和预计剩余时间支持条件迁移(),便于增量同步
where_clause
性能优化:
ClickHouse表使用分区和排序键优化查询性能预聚合物化视图加速常见分析查询
错误处理:
自动重试失败的批量插入清晰的错误报告和日志记录
实际应用场景
电商数据分析
将订单数据从MySQL迁移到ClickHouse后,可以:
实时分析销售趋势识别热门产品组合计算用户生命周期价值检测异常交易模式
-- 分析季度销售趋势
SELECT
toQuarter(order_date) AS quarter,
sum(amount) AS total_sales,
count() AS order_count,
total_sales / order_count AS avg_order_value
FROM ecommerce.orders
GROUP BY quarter
ORDER BY quarter;
用户行为分析
-- 计算用户留存率
WITH
first_orders AS (
SELECT user_id, min(order_date) AS first_date
FROM ecommerce.orders
GROUP BY user_id
),
cohort_size AS (
SELECT
toYYYYMM(first_date) AS cohort_month,
count() AS users
FROM first_orders
GROUP BY cohort_month
),
retained_users AS (
SELECT
toYYYYMM(fo.first_date) AS cohort_month,
toYYYYMM(o.order_date) AS order_month,
uniq(o.user_id) AS retained_count
FROM first_orders fo
JOIN ecommerce.orders o ON fo.user_id = o.user_id
WHERE o.order_date >= fo.first_date
GROUP BY cohort_month, order_month
)
SELECT
r.cohort_month,
c.users AS cohort_size,
r.order_month,
r.retained_count,
r.retained_count / c.users AS retention_rate
FROM retained_users r
JOIN cohort_size c ON r.cohort_month = c.cohort_month
ORDER BY r.cohort_month, r.order_month;
实时监控仪表板
将ClickHouse与Grafana集成,创建实时业务监控:
-- 最近7天销售指标
SELECT
toStartOfDay(order_date) AS day,
sum(amount) AS daily_sales,
uniq(user_id) AS daily_customers,
daily_sales / daily_customers AS arpu
FROM ecommerce.orders
WHERE order_date >= now() - INTERVAL 7 DAY
GROUP BY day
ORDER BY day;
工具和资源推荐
迁移工具
ClickHouse官方MySQL引擎:
CREATE TABLE mysql_orders (
id UInt32,
user_id UInt32,
product_id UInt32,
amount Float32,
order_date Date
) ENGINE = MySQL('mysql_host:3306', 'ecommerce', 'orders', 'user', 'password');
-- 然后可以复制数据
INSERT INTO orders SELECT * FROM mysql_orders;
Altinity ClickHouse MySQL迁移工具:
开源工具,专门为MySQL到ClickHouse迁移优化支持自动模式转换和并行迁移GitHub: https://github.com/Altinity/clickhouse-mysql-data-reader
Airbyte:
开源数据集成平台提供可视化界面配置MySQL到ClickHouse的管道官网: https://airbyte.com/
监控和优化工具
ClickHouse监控:
Prometheus + Grafana监控ClickHouse性能官方仪表板模板: https://grafana.com/grafana/dashboards?search=clickhouse
数据验证工具:
使用验证MySQL和ClickHouse数据一致性GitHub: https://github.com/maximdanilchenko/clickhouse-datacompare
clickhouse-datacompare
学习资源
官方文档:
ClickHouse官方文档: https://clickhouse.com/docs/en/MySQL到ClickHouse迁移指南: https://clickhouse.com/docs/en/integrations/mysql
性能优化指南:
Altinity知识库: https://altinity.com/knowledge-base/ClickHouse性能调优: https://clickhouse.com/docs/en/optimize/
未来发展趋势与挑战
发展趋势
更紧密的MySQL集成:
ClickHouse正在改进MySQL兼容性,包括SQL语法和协议支持计划支持MySQL作为外部字典的数据源
实时能力增强:
更好的Kafka集成和流处理能力物化视图的实时更新改进
云原生支持:
与Kubernetes的深度集成云服务商托管的ClickHouse服务
技术挑战
模式变更处理:
MySQL表结构变更如何同步到ClickHouse需要开发自动化工具检测和处理DDL变更
数据一致性保证:
确保迁移过程中不丢失数据处理网络分区等异常情况
大规模迁移性能:
TB级数据的快速迁移方案最小化对生产MySQL的影响
复杂数据类型支持:
MySQL的JSON类型与ClickHouse的兼容处理空间数据等特殊类型的转换
总结:学到了什么?
核心概念回顾
ClickHouse的优势:
列式存储和向量化执行带来极佳的分析性能丰富的数据处理引擎(MergeTree、Kafka、MySQL等)
迁移方案选择:
小数据量:直接导出导入中等数据量:ETL工具处理大数据量+实时需求:CDC变更捕获
性能优化关键:
批量操作优于单行操作合理设计表引擎和排序键利用物化视图预聚合数据
概念关系回顾
MySQL和ClickHouse的协作:
MySQL作为”记录系统”处理事务ClickHouse作为”分析系统”提供洞察
迁移工具链:
抽取工具获取MySQL数据转换层处理数据类型差异加载策略影响最终性能
思考题:动动小脑筋
思考题一:
假设你有一个1TB的MySQL订单表,需要迁移到ClickHouse,但只有4小时的维护窗口。你会选择哪种迁移方案?需要考虑哪些因素?
思考题二:
如何设计一个系统,在MySQL数据迁移到ClickHouse后,持续验证两端数据的一致性?有哪些技术可以实现这一点?
思考题三:
ClickHouse的物化视图与MySQL的视图有什么本质区别?为什么在分析场景下ClickHouse的物化视图更有优势?
附录:常见问题与解答
Q1: 迁移过程中MySQL和ClickHouse的表结构需要完全一致吗?
A: 不需要完全一致。ClickHouse可以只包含需要的列,且数据类型可以不同(但需兼容)。迁移工具通常支持列映射和类型转换。
Q2: 如何处理MySQL的NULL值与ClickHouse的默认值?
A: ClickHouse没有真正的NULL,但有Nullable类型。可以在迁移时将MySQL的NULL转为:
特定默认值(如0、空字符串)使用Nullable类型显式标记
Q3: 迁移过程中如何最小化对生产MySQL的影响?
A: 建议:
在从库上执行迁移使用低优先级查询分批迁移,避免长时间锁表避开业务高峰期
Q4: ClickHouse缺少某些MySQL的功能(如事务、UPDATE)怎么办?
A: 这是设计差异。解决方案:
关键事务仍在MySQL处理ClickHouse主要存储历史数据和分析结果频繁更新的数据可以使用ReplacingMergeTree引擎
扩展阅读 & 参考资料
官方文档:
ClickHouse MySQL引擎: https://clickhouse.com/docs/en/engines/table-engines/integrations/mysql数据迁移最佳实践: https://clickhouse.com/docs/en/integrations/data-ingestion
技术博客:
Altinity的MySQL到ClickHouse迁移指南: https://altinity.com/blog/migrating-data-from-mysql-to-clickhouseClickHouse实时数据管道: https://clickhouse.com/blog/real-time-data-pipelines-with-clickhouse
书籍:
《ClickHouse实战》(Aleksandra Donko, 2022)《高性能ClickHouse》(Yegor Andreenko, 2021)
视频教程:
ClickHouse官方YouTube频道: https://www.youtube.com/c/ClickHouseDB数据迁移实战演示: https://youtu.be/7n5m8u6s_qc