数据迁移 #
| 语法 | 描述 | 类型 |
|---|---|---|
| ALTER MIGRATION RULE | 定义数据迁移规则 | RAL |
| SHOW MIGRATION RULE | 查看数据迁移规则 | RAL |
| SHOW MIGRATION LIST | 查看数据迁移规则列表 | RAL |
| SHOW MIGRATION STATUS jobId | 查看数据迁移作业状态 | RAL |
| MIGRATE TABLE x INTO y | 启动迁移作业 | RAL |
| START MIGRATION jobId | 开启停止的作业 | RAL |
| STOP MIGRATION jobId | 停止作业 | RAL |
| CHECK MIGRATION jobId | 检查作业数据一致性 | RAL |
| START MIGRATION CHECK jobId | 开启停止的数据一致性校验作业 | RAL |
| STOP MIGRATION CHECK jobId | 停止数据一致性校验作业 | RAL |
| SHOW MIGRATION CHECK STATUS jobId | 查看数据一致性校验进度 | RAL |
| COMMIT MIGRATION jobId | 提交作业 | RAL |
| ROLLBACK MIGRATION jobId | 撤销作业。注意:该语句会 清理目标端表,请谨慎操作 | RAL |
| REGISTER MIGRATION SOURCE STORAGE UNIT | 注册数据迁移任务存储单元 | RAL |
| SHOW MIGRATION CHECK ALGORITHMS | 查看数据迁移算法 | RAL |
1. 定义数据迁移作业 #
ALTER RESHARDING RULE ();
参数说明
ALTER RESHARDING RULE (
READ( -- 数据读取配置。如果不配置则部分参数默认生效。
WORKER_THREAD=40, -- 从源端摄取全量数据的线程池大小。如果不配置则使用默认值。
BATCH_SIZE=1000, -- 一次查询操作返回的最大记录数。如果不配置则使用默认值。
SHARDING_SIZE=10000000, -- 全量数据分片大小。如果不配置则使用默认值。
RATE_LIMITER ( -- 限流算法。如果不配置则不限流。
TYPE( -- 算法类型。可选项:QPS
NAME='QPS',
PROPERTIES( -- 算法属性
'qps'='500'
)))
),
WRITE( -- 数据写入配置。如果不配置则部分参数默认生效。
WORKER_THREAD=40, -- 数据写入到目标端的线程池大小。如果不配置则使用默认值。
BATCH_SIZE=1000, -- 一次批量写入操作的最大记录数。如果不配置则使用默认值。
RATE_LIMITER ( -- 限流算法。如果不配置则不限流。
TYPE( -- 算法类型。可选项:TPS
NAME='TPS',
PROPERTIES( -- 算法属性
'tps'='2000'
)))
),
STREAM_CHANNEL ( -- 数据通道,连接生产者和消费者,用于 read 和 write 环节。如果不配置则默认使用 MEMORY 类型。
TYPE( -- 算法类型。可选项:MEMORY
NAME='MEMORY',
PROPERTIES( -- 算法属性
'block-queue-size'='10000' -- 属性:阻塞队列大小
)))
);
默认配置:
+‑‑‑‑‑‑‑‑‑‑‑‑‑‑‑‑‑‑‑‑‑‑‑‑‑‑‑‑‑‑‑‑‑‑‑‑‑‑‑‑‑‑‑‑‑‑‑‑‑‑‑‑‑‑‑‑‑‑‑‑‑‑+‑‑‑‑‑‑‑‑‑‑‑‑‑‑‑‑‑‑‑‑‑‑‑‑‑‑‑‑‑‑‑‑‑‑‑‑‑‑+‑‑‑‑‑‑‑‑‑‑‑‑‑‑‑‑‑‑‑‑‑‑‑‑‑‑‑‑‑‑‑‑‑‑‑‑‑‑‑‑‑‑‑‑‑‑‑‑‑‑‑‑‑‑+
| read | write | stream_channel |
+‑‑‑‑‑‑‑‑‑‑‑‑‑‑‑‑‑‑‑‑‑‑‑‑‑‑‑‑‑‑‑‑‑‑‑‑‑‑‑‑‑‑‑‑‑‑‑‑‑‑‑‑‑‑‑‑‑‑‑‑‑‑+‑‑‑‑‑‑‑‑‑‑‑‑‑‑‑‑‑‑‑‑‑‑‑‑‑‑‑‑‑‑‑‑‑‑‑‑‑‑+‑‑‑‑‑‑‑‑‑‑‑‑‑‑‑‑‑‑‑‑‑‑‑‑‑‑‑‑‑‑‑‑‑‑‑‑‑‑‑‑‑‑‑‑‑‑‑‑‑‑‑‑‑‑+
| {"workerThread":40,"batchSize":1000,"shardingSize":10000000} | {"workerThread":40,"batchSize":1000} | {"type":"MEMORY", "props":{"block‑queue‑size":10000}} |
+‑‑‑‑‑‑‑‑‑‑‑‑‑‑‑‑‑‑‑‑‑‑‑‑‑‑‑‑‑‑‑‑‑‑‑‑‑‑‑‑‑‑‑‑‑‑‑‑‑‑‑‑‑‑‑‑‑‑‑‑‑‑+‑‑‑‑‑‑‑‑‑‑‑‑‑‑‑‑‑‑‑‑‑‑‑‑‑‑‑‑‑‑‑‑‑‑‑‑‑‑+‑‑‑‑‑‑‑‑‑‑‑‑‑‑‑‑‑‑‑‑‑‑‑‑‑‑‑‑‑‑‑‑‑‑‑‑‑‑‑‑‑‑‑‑‑‑‑‑‑‑‑‑‑‑+
示例
ALTER MIGRATION RULE (READ(RATE_LIMITER (TYPE(NAME='QPS’,PROPERTIES(‘qps'='5000’)))))
2. 查看数据迁移规则 #
SHOW MIGRATION RULE
参数说明
无
示例
SHOW MIGRATION RULE;
输出说明
| 列名 | 说明 |
|---|---|
| read | 数据读取配置。如果不配置则部分参数默认生效 |
| write | 数据写入配置。如果不配置则部分参数默认生效。 |
| stream_channel | 数据通道,连接生产者和消费者,用于 read 和 write 环节。如果不配置则默认使用 MEMORY 类型。 |
3. 查看数据迁移规则列表 #
SHOW MIGRATION LIST;
参数说明
无
示例
+-------------------------------------+---------+----------------------+--------+---------------------+-----------+
| id | tables | sharding_total_count | active | create_time | stop_time |
+-------------------------------------+---------+----------------------+--------+---------------------+-----------+
|j01016e501b498ed1bdb2c373a2e85e2529a6| t_order | 1 | true | 2022-08-22 16:37:01 | NULL |
+-------------------------------------+---------+----------------------+--------+---------------------+-----------+
输出说明
| 列名 | 说明 |
|---|---|
| Id | 迁移任务ID |
| tables | 被迁移的表名 |
| active | 任务是否活跃 true 或 false |
| create_time | 任务创建时间 |
| stop_time | 任务停止时间,为 NULL 表示任务未停止 |
4. 查看数据迁移作业状态 #
SHOW MIGRATION STATUS jobId;
参数说明
| 参数名 | 说明 |
|---|---|
| jobId | 数据迁移作业id |
示例
SHOW MIGRATION STATUS 'j01016e501b498ed1bdb2c373a2e85e2529a6';
+------+-------------+--------------------------+--------+-------------------------+-------------------------------+--------------------------+---------------+
| item | data_source | status | active | processed_records_count | inventory_finished_percentage | incremental_idle_seconds | error_message |
+------+-------------+--------------------------+--------+-------------------------+-------------------------------+--------------------------+---------------+
| 0 | ds_0 | EXECUTE_INCREMENTAL_TASK | true | 6 | 100 | 81 | |
+------+-------------+--------------------------+--------+-------------------------+-------------------------------+--------------------------+---------------+
输出说明
| 列名 | 说明 |
|---|---|
| item | 迁移任务内序号 |
| data_source | 数据存储单元名称 |
| status | 任务状态,详细参考状态描述 |
| active | 任务是否活跃 true 或 false |
| processed_records_count | 处理完成的记录数 |
| inventory_finished_percentage | 全量数据处理完成进度(百分比) |
| incremental_idle_seconds | 增量同步任务不活跃时间(单位:秒) |
| error_message | 错误信息 |
状态描述
| 取值 | 描述 |
|---|---|
| PREPARING | 准备中 |
| RUNNING | 运行中 |
| EXECUTE_INVENTORY_TASK | 全量迁移中 |
| EXECUTE_INCREMENTAL_TASK | 增量迁移中 |
| FINISHED | 已完成(整个流程完成了,新规则已生效) |
| PREPARING_FAILURE | 准备阶段失败 |
| EXECUTE_INVENTORY_TASK_FAILURE | 全量迁移阶段失败 |
| EXECUTE_INCREMENTAL_TASK_FAILURE | 增量迁移阶段失败 |
5. 启动数据迁移 #
MIGRATE TABLE schema_name.table_name INTO schema_name.table_name;
示例
存储节点为 MySQL 启动数据迁移。
MIGRATE TABLE ds_0.t_order INTO t_order;
存储节点为MySQL或者指定目标端逻辑库:
MIGRATE TABLE ds_0.t_order INTO sharding_db.t_order;
6. 开启已经停止的作业 #
START MIGRATION jobId
参数说明
| 参数名 | 说明 |
|---|---|
| jobId | 作业id |
示例
指定 jobid 启动已经停止的 job。
START MIGRATION 'j01016e501b498ed1bdb2c373a2e85e2529a6';
7. 停止的作业 #
STOP MIGRATION jobId
参数说明
| 参数名 | 说明 |
|---|---|
| jobId | 作业id |
示例
指定jobId启动已经停止的job。
STOP MIGRATION 'j01016e501b498ed1bdb2c373a2e85e2529a6';
8. 执行数据一致性校验 #
CHECK MIGRATION jobId BY TYPE (NAME='CRC32_MATCH');
| 参数名 | 说明 |
|---|---|
| jobId | 作业id |
示例
指定 jobId 指定校验算法执行数据一致性校验
CHECK MIGRATION 'j01016e501b498ed1bdb2c373a2e85e2529a6' BY TYPE (NAME='CRC32_MATCH');
Query OK, 0 rows affected (0.09 sec)
指定 jobId 使用默认校验算法执行数据一致性校验
CHECK MIGRATION 'j01016e501b498ed1bdb2c373a2e85e2529a6';
Query OK, 0 rows affected (0.09 sec)
9. 开启停止的数据一致校验作业 #
START MIGRATION CHECK jobId
| 参数名 | 说明 |
|---|---|
| jobId | 作业 id |
示例
指定 jobId 指定校验算法执行数据一致性校验
START MIGRATION CHECK 'j01016e501b498ed1bdb2c373a2e85e2529a6' ;
Query OK, 0 rows affected (0.09 sec)
10. 停止数据一致校验作业 #
STOP MIGRATION CHECK jobId
| 参数名 | 说明 |
|---|---|
| jobId | 作业id |
示例
指定 jobId 指定校验算法执行数据一致性校验
STOP MIGRATION CHECK 'j01016e501b498ed1bdb2c373a2e85e2529a6' ;
Query OK, 0 rows affected (0.09 sec)
11. 查看数据一致性校验进度 #
SHOW MIGRATION CHECK STATUS jobId
| 参数名 | 说明 |
|---|---|
| jobId | 作业id |
示例
指定 jobId 指查看数据一致性校验进度
SHOW MIGRATION CHECK STATUS 'j01016e501b498ed1bdb2c373a2e85e2529a6';
+---------+--------+---------------------+-------------------+-------------------------+-------------------------+------------------+---------------+
| tables | result | finished_percentage | remaining_seconds | check_begin_time | check_end_time | duration_seconds | error_message |
+---------+--------+---------------------+-------------------+-------------------------+-------------------------+------------------+---------------+
| t_order | true | 100 | 0 | 2022-10-13 11:18:15.171 | 2022-10-13 11:18:15.878 | 0 | |
+---------+--------+---------------------+-------------------+-------------------------+-------------------------+------------------+---------------+
12. 提交作业 #
COMMIT MIGRATION jobId;
参数说明
| 参数名 | 说明 |
|---|---|
| jobId | 作业id |
示例
指定 jobId 提交作业。
COMMIT MIGRATION 'j01016e501b498ed1bdb2c373a2e85e2529a6';
13. 撤销作业 #
ROLLBACK MIGRATION jobId;
| 参数名 | 说明 |
|---|---|
| jobId | 作业id |
注意:该语句会 清理目标端表,请谨慎操作。
示例
指定 jobId 回滚 job。
ROLLBACK MIGRATION 'j01016e501b498ed1bdb2c373a2e85e2529a6';
14. 注册数据迁移任务存储单元 #
REGISTER MIGRATION SOURCE STORAGE UNIT
参数说明
参考分片表部分
示例
REGISTER MIGRATION SOURCE STORAGE UNIT ds_0 (
URL="jdbc:mysql://127.0.0.1:3306/migration_ds_0?serverTimezone=UTC&useSSL=false",
USER="root",
PASSWORD="root",
PROPERTIES("minPoolSize"="1","maxPoolSize"="20","idleTimeout"="60000")
);
15. 查看数据迁移算法 #
SHOW MIGRATION CHECK ALGORITHMS;
参数说明
无
示例
SHOW MIGRATION CHECK ALGORITHMS;
+-------------+--------------------------------------------------------------+----------------------------+
| type | supported_database_types | description |
+-------------+--------------------------------------------------------------+----------------------------+
| CRC32_MATCH | MySQL | Match CRC32 of records. |
| DATA_MATCH | SQL92,MySQL,MariaDB,PostgreSQL,openGauss,Oracle,SQLServer,H2 | Match raw data of records. |
+-------------+--------------------------------------------------------------+----------------------------+