Logo
数据迁移

数据迁移 #

语法描述类型
ALTER MIGRATION RULE定义数据迁移规则RAL
SHOW MIGRATION RULE查看数据迁移规则RAL
SHOW MIGRATION LIST查看数据迁移规则列表RAL
SHOW MIGRATION STATUS jobId查看数据迁移作业状态RAL
MIGRATE TABLE x INTO y启动迁移作业RAL
START MIGRATION jobId开启停止的作业RAL
STOP MIGRATION jobId停止作业RAL
SHOW MIGRATION CHECK ALGORITHMS查看一致性校验算法RAL
CHECK MIGRATION jobId检查作业数据一致性RAL
START MIGRATION CHECK jobId开启停止的数据一致性校验作业RAL
STOP MIGRATION CHECK jobId停止数据一致性校验作业RAL
SHOW MIGRATION CHECK STATUS jobId查看数据一致性校验进度RAL
COMMIT MIGRATION jobId提交作业RAL
ROLLBACK MIGRATION jobId撤销作业。注意:该语句会 清理目标端表,请谨慎操作RAL
REGISTER MIGRATION SOURCE STORAGE UNIT注册数据迁移源端存储单元RAL
SHOW MIGRATION SOURCE STORAGE UNITS查询已经注册的数据迁移源端存储单元RAL
UNREGISTER MIGRATION SOURCE STORAGE UNIT移除数据迁移源端存储单元RAL
UPDATE MIGRATION jobId OFFSET POSITION更新数据迁移增量进度RAL

1. 定义数据迁移作业 #

ALTER MIGRATION RULE ();

因 migration rule 具有默认值,无需创建,仅提供 ALTER 语句。

完整配置 DistSQL 示例

ALTER MIGRATION RULE (
READ(
  WORKER_THREAD=20,
  BATCH_SIZE=1000,
  SHARDING_SIZE=10000000,
  RATE_LIMITER (TYPE(NAME='QPS',PROPERTIES('qps'='500')))
),
WRITE(
  WORKER_THREAD=20,
  BATCH_SIZE=1000,
  RATE_LIMITER (TYPE(NAME='TPS',PROPERTIES('tps'='2000')))
),
STREAM_CHANNEL (TYPE(NAME='MEMORY',PROPERTIES('block-queue-size'='2000')))
);

配置说明

ALTER MIGRATION RULE (
READ( -- 数据读取配置。如果不配置则部分参数默认生效。
  WORKER_THREAD=20, -- 从源端摄取全量数据的线程池大小。如果不配置则使用默认值。
  BATCH_SIZE=1000, -- 一次查询操作返回的最大记录数。如果不配置则使用默认值。
  SHARDING_SIZE=10000000, -- 全量数据分片大小。如果不配置则使用默认值。
  RATE_LIMITER ( -- 限流算法。如果不配置则不限流。
  TYPE( -- 算法类型。可选项:QPS
  NAME='QPS',
  PROPERTIES( -- 算法属性
  'qps'='500'
  )))
),
WRITE( -- 数据写入配置。如果不配置则部分参数默认生效。
  WORKER_THREAD=20, -- 数据写入到目标端的线程池大小。如果不配置则使用默认值。
  BATCH_SIZE=1000, -- 一次批量写入操作的最大记录数。如果不配置则使用默认值。
  RATE_LIMITER ( -- 限流算法。如果不配置则不限流。
  TYPE( -- 算法类型。可选项:TPS
  NAME='TPS',
  PROPERTIES( -- 算法属性
  'tps'='2000'
  )))
),
STREAM_CHANNEL ( -- 数据通道,连接生产者和消费者,用于 read 和 write 环节。如果不配置则默认使用 MEMORY 类型。
TYPE( -- 算法类型。可选项:MEMORY
NAME='MEMORY',
PROPERTIES( -- 算法属性
'block-queue-size'='2000' -- 属性:阻塞队列大小
)))
);

默认配置:

+‑‑‑‑‑‑‑‑‑‑‑‑‑‑‑‑‑‑‑‑‑‑‑‑‑‑‑‑‑‑‑‑‑‑‑‑‑‑‑‑‑‑‑‑‑‑‑‑‑‑‑‑‑‑‑‑‑‑‑‑‑‑+‑‑‑‑‑‑‑‑‑‑‑‑‑‑‑‑‑‑‑‑‑‑‑‑‑‑‑‑‑‑‑‑‑‑‑‑‑‑+‑‑‑‑‑‑‑‑‑‑‑‑‑‑‑‑‑‑‑‑‑‑‑‑‑‑‑‑‑‑‑‑‑‑‑‑‑‑‑‑‑‑‑‑‑‑‑‑‑‑‑‑‑‑+ 
| read | write | stream_channel | 
+‑‑‑‑‑‑‑‑‑‑‑‑‑‑‑‑‑‑‑‑‑‑‑‑‑‑‑‑‑‑‑‑‑‑‑‑‑‑‑‑‑‑‑‑‑‑‑‑‑‑‑‑‑‑‑‑‑‑‑‑‑‑+‑‑‑‑‑‑‑‑‑‑‑‑‑‑‑‑‑‑‑‑‑‑‑‑‑‑‑‑‑‑‑‑‑‑‑‑‑‑+‑‑‑‑‑‑‑‑‑‑‑‑‑‑‑‑‑‑‑‑‑‑‑‑‑‑‑‑‑‑‑‑‑‑‑‑‑‑‑‑‑‑‑‑‑‑‑‑‑‑‑‑‑‑+ 
| {"workerThread":40,"batchSize":1000,"shardingSize":10000000} | {"workerThread":40,"batchSize":1000} | {"type":"MEMORY", "props":{"block‑queue‑size":10000}} |
 +‑‑‑‑‑‑‑‑‑‑‑‑‑‑‑‑‑‑‑‑‑‑‑‑‑‑‑‑‑‑‑‑‑‑‑‑‑‑‑‑‑‑‑‑‑‑‑‑‑‑‑‑‑‑‑‑‑‑‑‑‑‑+‑‑‑‑‑‑‑‑‑‑‑‑‑‑‑‑‑‑‑‑‑‑‑‑‑‑‑‑‑‑‑‑‑‑‑‑‑‑+‑‑‑‑‑‑‑‑‑‑‑‑‑‑‑‑‑‑‑‑‑‑‑‑‑‑‑‑‑‑‑‑‑‑‑‑‑‑‑‑‑‑‑‑‑‑‑‑‑‑‑‑‑‑+

示例

ALTER MIGRATION RULE (READ(RATE_LIMITER (TYPE(NAME='QPS’,PROPERTIES(‘qps'='5000’)))))

2. 查看数据迁移规则 #

SHOW MIGRATION RULE

参数说明

示例

SHOW MIGRATION RULE;

输出说明

列名说明
read数据读取配置。如果不配置则部分参数默认生效
write数据写入配置。如果不配置则部分参数默认生效。
stream_channel数据通道,连接生产者和消费者,用于 read 和 write 环节。如果不配置则默认使用 MEMORY 类型。

3. 查看数据迁移规则列表 #

SHOW MIGRATION LIST;

参数说明

示例

+-------------------------------------+---------+----------------------+--------+---------------------+-----------+
| id                                  | tables  | sharding_total_count | active | create_time         | stop_time |
+-------------------------------------+---------+----------------------+--------+---------------------+-----------+
|j01016e501b498ed1bdb2c373a2e85e2529a6| t_order | 1                    | true   | 2022-08-22 16:37:01 | NULL      |
+-------------------------------------+---------+----------------------+--------+---------------------+-----------+

输出说明

列名说明
Id迁移任务ID
tables被迁移的表名
active任务是否活跃 true 或 false
create_time任务创建时间
stop_time任务停止时间,为 NULL 表示任务未停止

4. 查看数据迁移作业状态 #

SHOW MIGRATION STATUS jobId;

参数说明

参数名说明
jobId数据迁移作业id

示例

SHOW MIGRATION STATUS 'j01016e501b498ed1bdb2c373a2e85e2529a6';
+------+-------------+--------------------------+--------+-------------------------+-------------------------------+--------------------------+---------------+
| item | data_source | status                   | active | processed_records_count | inventory_finished_percentage | incremental_idle_seconds | error_message |
+------+-------------+--------------------------+--------+-------------------------+-------------------------------+--------------------------+---------------+
| 0    | ds_0        | EXECUTE_INCREMENTAL_TASK | true   | 6                       | 100                           | 81                       |               |
+------+-------------+--------------------------+--------+-------------------------+-------------------------------+--------------------------+---------------+

输出说明

列名说明
item迁移任务内序号
data_source数据存储单元名称
status任务状态,详细参考状态描述
active任务是否活跃 true 或 false
processed_records_count处理完成的记录数
inventory_finished_percentage全量数据处理完成进度(百分比)
incremental_idle_seconds增量同步任务不活跃时间(单位:秒)
error_message错误信息

状态说明

取值描述
PREPARING准备中
RUNNING运行中
EXECUTE_INVENTORY_TASK全量迁移中
EXECUTE_INCREMENTAL_TASK增量迁移中
FINISHED已完成(整个流程完成了,新规则已生效)
PREPARING_FAILURE准备阶段失败
EXECUTE_INVENTORY_TASK_FAILURE全量迁移阶段失败
EXECUTE_INCREMENTAL_TASK_FAILURE增量迁移阶段失败

5. 启动数据迁移 #

MIGRATE TABLE source_storage_unit.source_schema_name.source_table_name INTO target_database_name.target_table_name (PROPERTIES('column-exclude-list'='column1,column2'));

参数说明

参数名说明
source_storage_unit数据源存储单元名称。需要通过REGISTER MIGRATION SOURCE STORAGE UNIT提前注册
source_schema_name源端 schema。可选
source_table_name源端表名
target_database_name目标端逻辑库名。可选。不指定的话使用当前逻辑库名
target_table_name目标端表名
PROPERTIES数据迁移属性。可选

数据迁移可选属性:

属性名称数据类型默认值说明
column-exclude-listString需要排除的字段名。多个字段使用逗号分隔

示例

存储节点为 MySQL,迁移到当前逻辑库:

MIGRATE TABLE ds_0.t_order INTO t_order;

存储节点为 PostgreSQL,迁移到当前逻辑库:

MIGRATE TABLE ds_0.public.t_order INTO t_order;

存储节点为 MySQL,迁移到指定逻辑库:

MIGRATE TABLE ds_0.t_order INTO sharding_db.t_order;

同时迁移多张表,多张表使用逗号分隔:

MIGRATE TABLE ds_0.t_order INTO sharding_db.t_order,ds_0.t_order_item INTO sharding_db.t_order_item;

t_order 表排除部分字段,t_order_item 表迁移所有字段:

MIGRATE TABLE ds_0.t_order INTO sharding_db.t_order (PROPERTIES('column-exclude-list'='column1,column2')),ds_0.t_order_item INTO sharding_db.t_order_item;

6. 开启已经停止的作业 #

START MIGRATION jobId

参数说明

参数名说明
jobId作业id

示例

指定 jobid 启动已经停止的 job。

START MIGRATION 'j01016e501b498ed1bdb2c373a2e85e2529a6';

7. 停止的作业 #

STOP MIGRATION jobId

参数说明

参数名说明
jobId作业id

示例

指定jobId启动已经停止的job。

STOP MIGRATION 'j01016e501b498ed1bdb2c373a2e85e2529a6';

8. 查看数据一致性算法 #

一致性校验算法选择请参考 数据一致性校验算法

SHOW MIGRATION CHECK ALGORITHMS;

示例

SHOW MIGRATION CHECK ALGORITHMS;
+----------------------------+---------------------------------------------+-----------------------------------------------------------------------------------------------+----------------------------------------------+
| type                       | type_aliases                                | supported_database_types                                                                      | description                                  |
+----------------------------+---------------------------------------------+-----------------------------------------------------------------------------------------------+----------------------------------------------+
| CRC32_MATCH                |                                             | MySQL,MariaDB,H2,Aurora                                                                       | Match CRC32 of records.                      |
| DATA_MATCH                 |                                             | SQL92,MySQL,PostgreSQL,openGauss,Oracle,SQLServer,MariaDB,H2,Hive,Presto,Aurora,DM,KingbaseES | Match raw data of records.                   |
| SphereEx:BATCH_CRC32_MATCH | SphereEx:DATA_CONSISTENCY_CHECK_CRC32_MATCH | MySQL,MariaDB,H2,Aurora                                                                       | Match CRC32 of records (Commercial edition). |
| SphereEx:DATA_DIFF         |                                             | SQL92,MySQL,PostgreSQL,openGauss,Oracle,SQLServer,MariaDB,H2,Hive,Presto,Aurora,DM,KingbaseES | Diff raw data of records.                    |
+----------------------------+---------------------------------------------+-----------------------------------------------------------------------------------------------+----------------------------------------------+

输出说明

  • type : 算法名
  • type_aliases : 算法别名
  • supported_database_types : 算法支持哪些数据库
  • description : 算法说明

9. 执行数据一致性校验 #

CHECK MIGRATION jobId BY TYPE (NAME='algorithmType');
参数名说明
jobId作业id
algorithmType一致性校验算法类型

示例

指定 jobId 指定校验算法执行数据一致性校验

CHECK MIGRATION 'j01016e501b498ed1bdb2c373a2e85e2529a6' BY TYPE (NAME='CRC32_MATCH');

CHECK MIGRATION 'j01016e501b498ed1bdb2c373a2e85e2529a6' BY TYPE (NAME='DATA_MATCH', PROPERTIES('chunk-size'='1000'));

CHECK MIGRATION 'j01016e501b498ed1bdb2c373a2e85e2529a6' BY TYPE (NAME='SphereEx:BATCH_CRC32_MATCH');

CHECK MIGRATION 'j01016e501b498ed1bdb2c373a2e85e2529a6' BY TYPE (NAME='SphereEx:DATA_DIFF', PROPERTIES('diff-storage-type'='DATABASE','chunk-size'='1000','diff-storage-unit-name'='ds_2','diff-table-name'='spex_cnschk_1','incremental-idle-seconds-threshold'='30'));

10. 开启停止的数据一致校验作业 #

START MIGRATION CHECK jobId
参数名说明
jobId作业 id

示例

指定 jobId 指定校验算法执行数据一致性校验

START MIGRATION CHECK  'j01016e501b498ed1bdb2c373a2e85e2529a6' ;
Query OK, 0 rows affected (0.09 sec)

11. 停止数据一致校验作业 #

STOP MIGRATION CHECK jobId
参数名说明
jobId作业id

示例

指定 jobId 指定校验算法执行数据一致性校验

STOP MIGRATION CHECK  'j01016e501b498ed1bdb2c373a2e85e2529a6' ;
Query OK, 0 rows affected (0.09 sec)

12. 查看数据一致性校验进度 #

SHOW MIGRATION CHECK STATUS jobId
参数名说明
jobId作业id

示例

指定 jobId 指查看数据一致性校验进度

show migration check status j0102p00009e6221cef2c4467323e205a1e4d1604b;
+-------------------+--------+---------------------+---------------------+-------------------+-------------------------+----------------+------------------+---------------+
| tables            | result | check_failed_tables | inventory_finished_percentage | inventory_remaining_seconds | check_begin_time        | check_end_time | duration_seconds | error_message |
+-------------------+--------+---------------------+---------------------+-------------------+-------------------------+----------------+------------------+---------------+
| ds_0.t_order_copy |        |                     | 0                   | 0                 | 2023-07-26 18:57:37.431 |                | 0                |               |
+-------------------+--------+---------------------+---------------------+-------------------+-------------------------+----------------+------------------+---------------+

输出说明

  • tables : 表名
  • result : 校验结果
  • check_failed_tables : 校验失败的表
  • inventory_finished_percentage : 存量数据校验完成百分比。由于计算基数是历史值,并且可能是估算结果,所以完成百分比可能存在一定误差
  • inventory_remaining_seconds : 存量数据校验剩余时间(单位:秒)。由于计算基数是历史值,并且可能是估算结果,所以剩余时间可能存在一定误差
  • check_begin_time : 校验开始时间
  • check_end_time : 校验结束时间
  • duration_seconds : 校验耗时(单位:秒)
  • error_message : 错误信息

13. 提交作业 #

COMMIT MIGRATION jobId; 

参数说明

参数名说明
jobId作业id

示例

指定 jobId 提交作业。

COMMIT MIGRATION 'j01016e501b498ed1bdb2c373a2e85e2529a6';

14. 撤销作业 #

ROLLBACK MIGRATION jobId;
参数名说明
jobId作业id

注意:该语句会 清理目标端表,请谨慎操作。

示例

指定 jobId 回滚 job。

ROLLBACK MIGRATION 'j01016e501b498ed1bdb2c373a2e85e2529a6';

15. 注册数据迁移源端存储单元 #

REGISTER MIGRATION SOURCE STORAGE UNIT 

参数说明

参考存储节点部分

示例

REGISTER MIGRATION SOURCE STORAGE UNIT ds_0 (
    URL="jdbc:mysql://127.0.0.1:3306/migration_ds_0?serverTimezone=UTC&useSSL=false",
    USER="root",
    PASSWORD="root",
    PROPERTIES("minPoolSize"="1","maxPoolSize"="20","idleTimeout"="60000")
);

16. 查询已经注册的数据迁移源端存储单元 #

示例

SHOW MIGRATION SOURCE STORAGE UNITS;

17. 移除数据迁移源端存储单元 #

UNREGISTER MIGRATION SOURCE STORAGE UNIT storageUnitName;
参数名说明
storageUnitName源端存储单元。存在多个使用逗号分隔

示例

UNREGISTER MIGRATION SOURCE STORAGE UNIT ds_0;

18. 更新数据迁移增量进度 #

UPDATE MIGRATION jobId OFFSET POSITION('storageUnitName'='position');
参数名说明
storageUnitName存储单元名称
position增量进度位点

示例

更新 MySQL 进度的格式是 file#pos,写法如下

UPDATE MIGRATION j01016e501b498ed1bdb2c373a2e85e2529a6 OFFSET POSITION('ds_0'='mysql-bin.000001#111','ds_1'='mysql-bin.000002#222');