Logo
数据迁移

数据迁移 #

语法描述类型
ALTER MIGRATION RULE定义数据迁移规则RAL
SHOW MIGRATION RULE查看数据迁移规则RAL
SHOW MIGRATION LIST查看数据迁移规则列表RAL
SHOW MIGRATION STATUS jobId查看数据迁移作业状态RAL
MIGRATE TABLE x INTO y启动迁移作业RAL
START MIGRATION jobId开启停止的作业RAL
STOP MIGRATION jobId停止作业RAL
CHECK MIGRATION jobId检查作业数据一致性RAL
START MIGRATION CHECK jobId开启停止的数据一致性校验作业RAL
STOP MIGRATION CHECK jobId停止数据一致性校验作业RAL
SHOW MIGRATION CHECK STATUS jobId查看数据一致性校验进度RAL
COMMIT MIGRATION jobId提交作业RAL
ROLLBACK MIGRATION jobId撤销作业。注意:该语句会 清理目标端表,请谨慎操作RAL
REGISTER MIGRATION SOURCE STORAGE UNIT注册数据迁移任务存储单元RAL
SHOW MIGRATION CHECK ALGORITHMS查看数据迁移算法RAL

1. 定义数据迁移作业 #

ALTER RESHARDING RULE ();

参数说明

ALTER RESHARDING RULE (
READ( -- 数据读取配置。如果不配置则部分参数默认生效。
  WORKER_THREAD=40, -- 从源端摄取全量数据的线程池大小。如果不配置则使用默认值。
  BATCH_SIZE=1000, -- 一次查询操作返回的最大记录数。如果不配置则使用默认值。
  SHARDING_SIZE=10000000, -- 全量数据分片大小。如果不配置则使用默认值。
  RATE_LIMITER ( -- 限流算法。如果不配置则不限流。
  TYPE( -- 算法类型。可选项:QPS
  NAME='QPS',
  PROPERTIES( -- 算法属性
  'qps'='500'
  )))
),
WRITE( -- 数据写入配置。如果不配置则部分参数默认生效。
  WORKER_THREAD=40, -- 数据写入到目标端的线程池大小。如果不配置则使用默认值。
  BATCH_SIZE=1000, -- 一次批量写入操作的最大记录数。如果不配置则使用默认值。
  RATE_LIMITER ( -- 限流算法。如果不配置则不限流。
  TYPE( -- 算法类型。可选项:TPS
  NAME='TPS',
  PROPERTIES( -- 算法属性
  'tps'='2000'
  )))
),
STREAM_CHANNEL ( -- 数据通道,连接生产者和消费者,用于 read 和 write 环节。如果不配置则默认使用 MEMORY 类型。
TYPE( -- 算法类型。可选项:MEMORY
NAME='MEMORY',
PROPERTIES( -- 算法属性
'block-queue-size'='10000' -- 属性:阻塞队列大小
)))
);

默认配置:

+‑‑‑‑‑‑‑‑‑‑‑‑‑‑‑‑‑‑‑‑‑‑‑‑‑‑‑‑‑‑‑‑‑‑‑‑‑‑‑‑‑‑‑‑‑‑‑‑‑‑‑‑‑‑‑‑‑‑‑‑‑‑+‑‑‑‑‑‑‑‑‑‑‑‑‑‑‑‑‑‑‑‑‑‑‑‑‑‑‑‑‑‑‑‑‑‑‑‑‑‑+‑‑‑‑‑‑‑‑‑‑‑‑‑‑‑‑‑‑‑‑‑‑‑‑‑‑‑‑‑‑‑‑‑‑‑‑‑‑‑‑‑‑‑‑‑‑‑‑‑‑‑‑‑‑+ 
| read | write | stream_channel | 
+‑‑‑‑‑‑‑‑‑‑‑‑‑‑‑‑‑‑‑‑‑‑‑‑‑‑‑‑‑‑‑‑‑‑‑‑‑‑‑‑‑‑‑‑‑‑‑‑‑‑‑‑‑‑‑‑‑‑‑‑‑‑+‑‑‑‑‑‑‑‑‑‑‑‑‑‑‑‑‑‑‑‑‑‑‑‑‑‑‑‑‑‑‑‑‑‑‑‑‑‑+‑‑‑‑‑‑‑‑‑‑‑‑‑‑‑‑‑‑‑‑‑‑‑‑‑‑‑‑‑‑‑‑‑‑‑‑‑‑‑‑‑‑‑‑‑‑‑‑‑‑‑‑‑‑+ 
| {"workerThread":40,"batchSize":1000,"shardingSize":10000000} | {"workerThread":40,"batchSize":1000} | {"type":"MEMORY", "props":{"block‑queue‑size":10000}} |
 +‑‑‑‑‑‑‑‑‑‑‑‑‑‑‑‑‑‑‑‑‑‑‑‑‑‑‑‑‑‑‑‑‑‑‑‑‑‑‑‑‑‑‑‑‑‑‑‑‑‑‑‑‑‑‑‑‑‑‑‑‑‑+‑‑‑‑‑‑‑‑‑‑‑‑‑‑‑‑‑‑‑‑‑‑‑‑‑‑‑‑‑‑‑‑‑‑‑‑‑‑+‑‑‑‑‑‑‑‑‑‑‑‑‑‑‑‑‑‑‑‑‑‑‑‑‑‑‑‑‑‑‑‑‑‑‑‑‑‑‑‑‑‑‑‑‑‑‑‑‑‑‑‑‑‑+

示例

ALTER MIGRATION RULE (READ(RATE_LIMITER (TYPE(NAME='QPS’,PROPERTIES(‘qps'='5000’)))))

2. 查看数据迁移规则 #

SHOW MIGRATION RULE

参数说明

示例

SHOW MIGRATION RULE;

输出说明

列名说明
read数据读取配置。如果不配置则部分参数默认生效
write数据写入配置。如果不配置则部分参数默认生效。
stream_channel数据通道,连接生产者和消费者,用于 read 和 write 环节。如果不配置则默认使用 MEMORY 类型。

3. 查看数据迁移规则列表 #

SHOW MIGRATION LIST;

参数说明

示例

+-------------------------------------+---------+----------------------+--------+---------------------+-----------+
| id                                  | tables  | sharding_total_count | active | create_time         | stop_time |
+-------------------------------------+---------+----------------------+--------+---------------------+-----------+
|j01016e501b498ed1bdb2c373a2e85e2529a6| t_order | 1                    | true   | 2022-08-22 16:37:01 | NULL      |
+-------------------------------------+---------+----------------------+--------+---------------------+-----------+

输出说明

列名说明
Id迁移任务ID
tables被迁移的表名
active任务是否活跃 true 或 false
create_time任务创建时间
stop_time任务停止时间,为 NULL 表示任务未停止

4. 查看数据迁移作业状态 #

SHOW MIGRATION STATUS jobId;

参数说明

参数名说明
jobId数据迁移作业id

示例

SHOW MIGRATION STATUS 'j01016e501b498ed1bdb2c373a2e85e2529a6';
+------+-------------+--------------------------+--------+-------------------------+-------------------------------+--------------------------+---------------+
| item | data_source | status                   | active | processed_records_count | inventory_finished_percentage | incremental_idle_seconds | error_message |
+------+-------------+--------------------------+--------+-------------------------+-------------------------------+--------------------------+---------------+
| 0    | ds_0        | EXECUTE_INCREMENTAL_TASK | true   | 6                       | 100                           | 81                       |               |
+------+-------------+--------------------------+--------+-------------------------+-------------------------------+--------------------------+---------------+

输出说明

列名说明
item迁移任务内序号
data_source数据存储单元名称
status任务状态,详细参考状态描述
active任务是否活跃 true 或 false
processed_records_count处理完成的记录数
inventory_finished_percentage全量数据处理完成进度(百分比)
incremental_idle_seconds增量同步任务不活跃时间(单位:秒)
error_message错误信息

状态描述

取值描述
PREPARING准备中
RUNNING运行中
EXECUTE_INVENTORY_TASK全量迁移中
EXECUTE_INCREMENTAL_TASK增量迁移中
FINISHED已完成(整个流程完成了,新规则已生效)
PREPARING_FAILURE准备阶段失败
EXECUTE_INVENTORY_TASK_FAILURE全量迁移阶段失败
EXECUTE_INCREMENTAL_TASK_FAILURE增量迁移阶段失败

5. 启动数据迁移 #

MIGRATE TABLE schema_name.table_name INTO schema_name.table_name;

示例

存储节点为 MySQL 启动数据迁移。

MIGRATE TABLE ds_0.t_order INTO t_order;

存储节点为MySQL或者指定目标端逻辑库:

MIGRATE TABLE ds_0.t_order INTO sharding_db.t_order;

6. 开启已经停止的作业 #

START MIGRATION jobId

参数说明

参数名说明
jobId作业id

示例

指定 jobid 启动已经停止的 job。

START MIGRATION 'j01016e501b498ed1bdb2c373a2e85e2529a6';

7. 停止的作业 #

STOP MIGRATION jobId

参数说明

参数名说明
jobId作业id

示例

指定jobId启动已经停止的job。

STOP MIGRATION 'j01016e501b498ed1bdb2c373a2e85e2529a6';

8. 执行数据一致性校验 #

CHECK MIGRATION jobId BY TYPE (NAME='CRC32_MATCH');
参数名说明
jobId作业id

示例

指定 jobId 指定校验算法执行数据一致性校验

CHECK MIGRATION 'j01016e501b498ed1bdb2c373a2e85e2529a6' BY TYPE (NAME='CRC32_MATCH');
Query OK, 0 rows affected (0.09 sec)

指定 jobId 使用默认校验算法执行数据一致性校验

CHECK MIGRATION 'j01016e501b498ed1bdb2c373a2e85e2529a6';
Query OK, 0 rows affected (0.09 sec)

9. 开启停止的数据一致校验作业 #

START MIGRATION CHECK jobId
参数名说明
jobId作业 id

示例

指定 jobId 指定校验算法执行数据一致性校验

START MIGRATION CHECK  'j01016e501b498ed1bdb2c373a2e85e2529a6' ;
Query OK, 0 rows affected (0.09 sec)

10. 停止数据一致校验作业 #

STOP MIGRATION CHECK jobId
参数名说明
jobId作业id

示例

指定 jobId 指定校验算法执行数据一致性校验

STOP MIGRATION CHECK  'j01016e501b498ed1bdb2c373a2e85e2529a6' ;
Query OK, 0 rows affected (0.09 sec)

11. 查看数据一致性校验进度 #

SHOW MIGRATION CHECK STATUS jobId
参数名说明
jobId作业id

示例

指定 jobId 指查看数据一致性校验进度

SHOW MIGRATION CHECK STATUS 'j01016e501b498ed1bdb2c373a2e85e2529a6';
+---------+--------+---------------------+-------------------+-------------------------+-------------------------+------------------+---------------+
| tables  | result | finished_percentage | remaining_seconds | check_begin_time        | check_end_time          | duration_seconds | error_message |
+---------+--------+---------------------+-------------------+-------------------------+-------------------------+------------------+---------------+
| t_order | true   | 100                 | 0                 | 2022-10-13 11:18:15.171 | 2022-10-13 11:18:15.878 | 0                |               |
+---------+--------+---------------------+-------------------+-------------------------+-------------------------+------------------+---------------+

12. 提交作业 #

COMMIT MIGRATION jobId; 

参数说明

参数名说明
jobId作业id

示例

指定 jobId 提交作业。

COMMIT MIGRATION 'j01016e501b498ed1bdb2c373a2e85e2529a6';

13. 撤销作业 #

ROLLBACK MIGRATION jobId;
参数名说明
jobId作业id

注意:该语句会 清理目标端表,请谨慎操作。

示例

指定 jobId 回滚 job。

ROLLBACK MIGRATION 'j01016e501b498ed1bdb2c373a2e85e2529a6';

14. 注册数据迁移任务存储单元 #

REGISTER MIGRATION SOURCE STORAGE UNIT 

参数说明

参考分片表部分

示例

REGISTER MIGRATION SOURCE STORAGE UNIT ds_0 (
    URL="jdbc:mysql://127.0.0.1:3306/migration_ds_0?serverTimezone=UTC&useSSL=false",
    USER="root",
    PASSWORD="root",
    PROPERTIES("minPoolSize"="1","maxPoolSize"="20","idleTimeout"="60000")
);

15. 查看数据迁移算法 #

SHOW MIGRATION CHECK ALGORITHMS;

参数说明

示例

SHOW MIGRATION CHECK ALGORITHMS;
+-------------+--------------------------------------------------------------+----------------------------+
| type        | supported_database_types                                     | description                |
+-------------+--------------------------------------------------------------+----------------------------+
| CRC32_MATCH | MySQL                                                        | Match CRC32 of records.    |
| DATA_MATCH  | SQL92,MySQL,MariaDB,PostgreSQL,openGauss,Oracle,SQLServer,H2 | Match raw data of records. |
+-------------+--------------------------------------------------------------+----------------------------+