数据迁移 #
语法 | 描述 | 类型 |
---|---|---|
ALTER MIGRATION RULE | 定义数据迁移规则 | RAL |
SHOW MIGRATION RULE | 查看数据迁移规则 | RAL |
SHOW MIGRATION LIST | 查看数据迁移规则列表 | RAL |
SHOW MIGRATION STATUS jobId | 查看数据迁移作业状态 | RAL |
MIGRATE TABLE x INTO y | 启动迁移作业 | RAL |
START MIGRATION jobId | 开启停止的作业 | RAL |
STOP MIGRATION jobId | 停止作业 | RAL |
SHOW MIGRATION CHECK ALGORITHMS | 查看一致性校验算法 | RAL |
CHECK MIGRATION jobId | 检查作业数据一致性 | RAL |
START MIGRATION CHECK jobId | 开启停止的数据一致性校验作业 | RAL |
STOP MIGRATION CHECK jobId | 停止数据一致性校验作业 | RAL |
SHOW MIGRATION CHECK STATUS jobId | 查看数据一致性校验进度 | RAL |
COMMIT MIGRATION jobId | 提交作业 | RAL |
ROLLBACK MIGRATION jobId | 撤销作业。注意:该语句会 清理目标端表,请谨慎操作 | RAL |
REGISTER MIGRATION SOURCE STORAGE UNIT | 注册数据迁移源端存储单元 | RAL |
SHOW MIGRATION SOURCE STORAGE UNITS | 查询已经注册的数据迁移源端存储单元 | RAL |
UNREGISTER MIGRATION SOURCE STORAGE UNIT | 移除数据迁移源端存储单元 | RAL |
UPDATE MIGRATION jobId OFFSET POSITION | 更新数据迁移增量进度 | RAL |
1. 定义数据迁移作业 #
ALTER MIGRATION RULE ();
因 migration rule 具有默认值,无需创建,仅提供 ALTER 语句。
完整配置 DistSQL 示例
ALTER MIGRATION RULE (
READ(
WORKER_THREAD=20,
BATCH_SIZE=1000,
SHARDING_SIZE=10000000,
RATE_LIMITER (TYPE(NAME='QPS',PROPERTIES('qps'='500')))
),
WRITE(
WORKER_THREAD=20,
BATCH_SIZE=1000,
RATE_LIMITER (TYPE(NAME='TPS',PROPERTIES('tps'='2000')))
),
STREAM_CHANNEL (TYPE(NAME='MEMORY',PROPERTIES('block-queue-size'='2000')))
);
配置说明
ALTER MIGRATION RULE (
READ( -- 数据读取配置。如果不配置则部分参数默认生效。
WORKER_THREAD=20, -- 从源端摄取全量数据的线程池大小。如果不配置则使用默认值。
BATCH_SIZE=1000, -- 一次查询操作返回的最大记录数。如果不配置则使用默认值。
SHARDING_SIZE=10000000, -- 全量数据分片大小。如果不配置则使用默认值。
RATE_LIMITER ( -- 限流算法。如果不配置则不限流。
TYPE( -- 算法类型。可选项:QPS
NAME='QPS',
PROPERTIES( -- 算法属性
'qps'='500'
)))
),
WRITE( -- 数据写入配置。如果不配置则部分参数默认生效。
WORKER_THREAD=20, -- 数据写入到目标端的线程池大小。如果不配置则使用默认值。
BATCH_SIZE=1000, -- 一次批量写入操作的最大记录数。如果不配置则使用默认值。
RATE_LIMITER ( -- 限流算法。如果不配置则不限流。
TYPE( -- 算法类型。可选项:TPS
NAME='TPS',
PROPERTIES( -- 算法属性
'tps'='2000'
)))
),
STREAM_CHANNEL ( -- 数据通道,连接生产者和消费者,用于 read 和 write 环节。如果不配置则默认使用 MEMORY 类型。
TYPE( -- 算法类型。可选项:MEMORY
NAME='MEMORY',
PROPERTIES( -- 算法属性
'block-queue-size'='2000' -- 属性:阻塞队列大小
)))
);
默认配置:
+‑‑‑‑‑‑‑‑‑‑‑‑‑‑‑‑‑‑‑‑‑‑‑‑‑‑‑‑‑‑‑‑‑‑‑‑‑‑‑‑‑‑‑‑‑‑‑‑‑‑‑‑‑‑‑‑‑‑‑‑‑‑+‑‑‑‑‑‑‑‑‑‑‑‑‑‑‑‑‑‑‑‑‑‑‑‑‑‑‑‑‑‑‑‑‑‑‑‑‑‑+‑‑‑‑‑‑‑‑‑‑‑‑‑‑‑‑‑‑‑‑‑‑‑‑‑‑‑‑‑‑‑‑‑‑‑‑‑‑‑‑‑‑‑‑‑‑‑‑‑‑‑‑‑‑+
| read | write | stream_channel |
+‑‑‑‑‑‑‑‑‑‑‑‑‑‑‑‑‑‑‑‑‑‑‑‑‑‑‑‑‑‑‑‑‑‑‑‑‑‑‑‑‑‑‑‑‑‑‑‑‑‑‑‑‑‑‑‑‑‑‑‑‑‑+‑‑‑‑‑‑‑‑‑‑‑‑‑‑‑‑‑‑‑‑‑‑‑‑‑‑‑‑‑‑‑‑‑‑‑‑‑‑+‑‑‑‑‑‑‑‑‑‑‑‑‑‑‑‑‑‑‑‑‑‑‑‑‑‑‑‑‑‑‑‑‑‑‑‑‑‑‑‑‑‑‑‑‑‑‑‑‑‑‑‑‑‑+
| {"workerThread":40,"batchSize":1000,"shardingSize":10000000} | {"workerThread":40,"batchSize":1000} | {"type":"MEMORY", "props":{"block‑queue‑size":10000}} |
+‑‑‑‑‑‑‑‑‑‑‑‑‑‑‑‑‑‑‑‑‑‑‑‑‑‑‑‑‑‑‑‑‑‑‑‑‑‑‑‑‑‑‑‑‑‑‑‑‑‑‑‑‑‑‑‑‑‑‑‑‑‑+‑‑‑‑‑‑‑‑‑‑‑‑‑‑‑‑‑‑‑‑‑‑‑‑‑‑‑‑‑‑‑‑‑‑‑‑‑‑+‑‑‑‑‑‑‑‑‑‑‑‑‑‑‑‑‑‑‑‑‑‑‑‑‑‑‑‑‑‑‑‑‑‑‑‑‑‑‑‑‑‑‑‑‑‑‑‑‑‑‑‑‑‑+
示例
ALTER MIGRATION RULE (READ(RATE_LIMITER (TYPE(NAME='QPS’,PROPERTIES(‘qps'='5000’)))))
2. 查看数据迁移规则 #
SHOW MIGRATION RULE
参数说明
无
示例
SHOW MIGRATION RULE;
输出说明
列名 | 说明 |
---|---|
read | 数据读取配置。如果不配置则部分参数默认生效 |
write | 数据写入配置。如果不配置则部分参数默认生效。 |
stream_channel | 数据通道,连接生产者和消费者,用于 read 和 write 环节。如果不配置则默认使用 MEMORY 类型。 |
3. 查看数据迁移规则列表 #
SHOW MIGRATION LIST;
参数说明
无
示例
+-------------------------------------+---------+----------------------+--------+---------------------+-----------+
| id | tables | sharding_total_count | active | create_time | stop_time |
+-------------------------------------+---------+----------------------+--------+---------------------+-----------+
|j01016e501b498ed1bdb2c373a2e85e2529a6| t_order | 1 | true | 2022-08-22 16:37:01 | NULL |
+-------------------------------------+---------+----------------------+--------+---------------------+-----------+
输出说明
列名 | 说明 |
---|---|
Id | 迁移任务ID |
tables | 被迁移的表名 |
active | 任务是否活跃 true 或 false |
create_time | 任务创建时间 |
stop_time | 任务停止时间,为 NULL 表示任务未停止 |
4. 查看数据迁移作业状态 #
SHOW MIGRATION STATUS jobId;
参数说明
参数名 | 说明 |
---|---|
jobId | 数据迁移作业id |
示例
SHOW MIGRATION STATUS 'j01016e501b498ed1bdb2c373a2e85e2529a6';
+------+-------------+--------------------------+--------+-------------------------+-------------------------------+--------------------------+---------------+
| item | data_source | status | active | processed_records_count | inventory_finished_percentage | incremental_idle_seconds | error_message |
+------+-------------+--------------------------+--------+-------------------------+-------------------------------+--------------------------+---------------+
| 0 | ds_0 | EXECUTE_INCREMENTAL_TASK | true | 6 | 100 | 81 | |
+------+-------------+--------------------------+--------+-------------------------+-------------------------------+--------------------------+---------------+
输出说明
列名 | 说明 |
---|---|
item | 迁移任务内序号 |
data_source | 数据存储单元名称 |
status | 任务状态,详细参考状态描述 |
active | 任务是否活跃 true 或 false |
processed_records_count | 处理完成的记录数 |
inventory_finished_percentage | 全量数据处理完成进度(百分比) |
incremental_idle_seconds | 增量同步任务不活跃时间(单位:秒) |
error_message | 错误信息 |
状态说明
取值 | 描述 |
---|---|
PREPARING | 准备中 |
RUNNING | 运行中 |
EXECUTE_INVENTORY_TASK | 全量迁移中 |
EXECUTE_INCREMENTAL_TASK | 增量迁移中 |
FINISHED | 已完成(整个流程完成了,新规则已生效) |
PREPARING_FAILURE | 准备阶段失败 |
EXECUTE_INVENTORY_TASK_FAILURE | 全量迁移阶段失败 |
EXECUTE_INCREMENTAL_TASK_FAILURE | 增量迁移阶段失败 |
5. 启动数据迁移 #
MIGRATE TABLE source_storage_unit.source_schema_name.source_table_name INTO target_database_name.target_table_name (PROPERTIES('column-exclude-list'='column1,column2'));
参数说明
参数名 | 说明 |
---|---|
source_storage_unit | 数据源存储单元名称。需要通过REGISTER MIGRATION SOURCE STORAGE UNIT 提前注册 |
source_schema_name | 源端 schema。可选 |
source_table_name | 源端表名 |
target_database_name | 目标端逻辑库名。可选。不指定的话使用当前逻辑库名 |
target_table_name | 目标端表名 |
PROPERTIES | 数据迁移属性。可选 |
数据迁移可选属性:
属性名称 | 数据类型 | 默认值 | 说明 |
---|---|---|---|
column-exclude-list | String | 需要排除的字段名。多个字段使用逗号分隔 |
示例
存储节点为 MySQL,迁移到当前逻辑库:
MIGRATE TABLE ds_0.t_order INTO t_order;
存储节点为 PostgreSQL,迁移到当前逻辑库:
MIGRATE TABLE ds_0.public.t_order INTO t_order;
存储节点为 MySQL,迁移到指定逻辑库:
MIGRATE TABLE ds_0.t_order INTO sharding_db.t_order;
同时迁移多张表,多张表使用逗号分隔:
MIGRATE TABLE ds_0.t_order INTO sharding_db.t_order,ds_0.t_order_item INTO sharding_db.t_order_item;
t_order 表排除部分字段,t_order_item 表迁移所有字段:
MIGRATE TABLE ds_0.t_order INTO sharding_db.t_order (PROPERTIES('column-exclude-list'='column1,column2')),ds_0.t_order_item INTO sharding_db.t_order_item;
6. 开启已经停止的作业 #
START MIGRATION jobId
参数说明
参数名 | 说明 |
---|---|
jobId | 作业id |
示例
指定 jobid 启动已经停止的 job。
START MIGRATION 'j01016e501b498ed1bdb2c373a2e85e2529a6';
7. 停止的作业 #
STOP MIGRATION jobId
参数说明
参数名 | 说明 |
---|---|
jobId | 作业id |
示例
指定jobId启动已经停止的job。
STOP MIGRATION 'j01016e501b498ed1bdb2c373a2e85e2529a6';
8. 查看数据一致性算法 #
一致性校验算法选择请参考 数据一致性校验算法。
SHOW MIGRATION CHECK ALGORITHMS;
示例
SHOW MIGRATION CHECK ALGORITHMS;
+----------------------------+---------------------------------------------+-----------------------------------------------------------------------------------------------+----------------------------------------------+
| type | type_aliases | supported_database_types | description |
+----------------------------+---------------------------------------------+-----------------------------------------------------------------------------------------------+----------------------------------------------+
| CRC32_MATCH | | MySQL,MariaDB,H2,Aurora | Match CRC32 of records. |
| DATA_MATCH | | SQL92,MySQL,PostgreSQL,openGauss,Oracle,SQLServer,MariaDB,H2,Hive,Presto,Aurora,DM,KingbaseES | Match raw data of records. |
| SphereEx:BATCH_CRC32_MATCH | SphereEx:DATA_CONSISTENCY_CHECK_CRC32_MATCH | MySQL,MariaDB,H2,Aurora | Match CRC32 of records (Commercial edition). |
| SphereEx:DATA_DIFF | | SQL92,MySQL,PostgreSQL,openGauss,Oracle,SQLServer,MariaDB,H2,Hive,Presto,Aurora,DM,KingbaseES | Diff raw data of records. |
+----------------------------+---------------------------------------------+-----------------------------------------------------------------------------------------------+----------------------------------------------+
输出说明
- type : 算法名
- type_aliases : 算法别名
- supported_database_types : 算法支持哪些数据库
- description : 算法说明
9. 执行数据一致性校验 #
CHECK MIGRATION jobId BY TYPE (NAME='algorithmType');
参数名 | 说明 |
---|---|
jobId | 作业id |
algorithmType | 一致性校验算法类型 |
示例
指定 jobId 指定校验算法执行数据一致性校验
CHECK MIGRATION 'j01016e501b498ed1bdb2c373a2e85e2529a6' BY TYPE (NAME='CRC32_MATCH');
CHECK MIGRATION 'j01016e501b498ed1bdb2c373a2e85e2529a6' BY TYPE (NAME='DATA_MATCH', PROPERTIES('chunk-size'='1000'));
CHECK MIGRATION 'j01016e501b498ed1bdb2c373a2e85e2529a6' BY TYPE (NAME='SphereEx:BATCH_CRC32_MATCH');
CHECK MIGRATION 'j01016e501b498ed1bdb2c373a2e85e2529a6' BY TYPE (NAME='SphereEx:DATA_DIFF', PROPERTIES('diff-storage-type'='DATABASE','chunk-size'='1000','diff-storage-unit-name'='ds_2','diff-table-name'='spex_cnschk_1','incremental-idle-seconds-threshold'='30'));
10. 开启停止的数据一致校验作业 #
START MIGRATION CHECK jobId
参数名 | 说明 |
---|---|
jobId | 作业 id |
示例
指定 jobId 指定校验算法执行数据一致性校验
START MIGRATION CHECK 'j01016e501b498ed1bdb2c373a2e85e2529a6' ;
Query OK, 0 rows affected (0.09 sec)
11. 停止数据一致校验作业 #
STOP MIGRATION CHECK jobId
参数名 | 说明 |
---|---|
jobId | 作业id |
示例
指定 jobId 指定校验算法执行数据一致性校验
STOP MIGRATION CHECK 'j01016e501b498ed1bdb2c373a2e85e2529a6' ;
Query OK, 0 rows affected (0.09 sec)
12. 查看数据一致性校验进度 #
SHOW MIGRATION CHECK STATUS jobId
参数名 | 说明 |
---|---|
jobId | 作业id |
示例
指定 jobId 指查看数据一致性校验进度
show migration check status j0102p00009e6221cef2c4467323e205a1e4d1604b;
+-------------------+--------+---------------------+---------------------+-------------------+-------------------------+----------------+------------------+---------------+
| tables | result | check_failed_tables | inventory_finished_percentage | inventory_remaining_seconds | check_begin_time | check_end_time | duration_seconds | error_message |
+-------------------+--------+---------------------+---------------------+-------------------+-------------------------+----------------+------------------+---------------+
| ds_0.t_order_copy | | | 0 | 0 | 2023-07-26 18:57:37.431 | | 0 | |
+-------------------+--------+---------------------+---------------------+-------------------+-------------------------+----------------+------------------+---------------+
输出说明
- tables : 表名
- result : 校验结果
- check_failed_tables : 校验失败的表
- inventory_finished_percentage : 存量数据校验完成百分比。由于计算基数是历史值,并且可能是估算结果,所以完成百分比可能存在一定误差
- inventory_remaining_seconds : 存量数据校验剩余时间(单位:秒)。由于计算基数是历史值,并且可能是估算结果,所以剩余时间可能存在一定误差
- check_begin_time : 校验开始时间
- check_end_time : 校验结束时间
- duration_seconds : 校验耗时(单位:秒)
- error_message : 错误信息
13. 提交作业 #
COMMIT MIGRATION jobId;
参数说明
参数名 | 说明 |
---|---|
jobId | 作业id |
示例
指定 jobId 提交作业。
COMMIT MIGRATION 'j01016e501b498ed1bdb2c373a2e85e2529a6';
14. 撤销作业 #
ROLLBACK MIGRATION jobId;
参数名 | 说明 |
---|---|
jobId | 作业id |
注意:该语句会 清理目标端表,请谨慎操作。
示例
指定 jobId 回滚 job。
ROLLBACK MIGRATION 'j01016e501b498ed1bdb2c373a2e85e2529a6';
15. 注册数据迁移源端存储单元 #
REGISTER MIGRATION SOURCE STORAGE UNIT
参数说明
参考存储节点部分
示例
REGISTER MIGRATION SOURCE STORAGE UNIT ds_0 (
URL="jdbc:mysql://127.0.0.1:3306/migration_ds_0?serverTimezone=UTC&useSSL=false",
USER="root",
PASSWORD="root",
PROPERTIES("minPoolSize"="1","maxPoolSize"="20","idleTimeout"="60000")
);
16. 查询已经注册的数据迁移源端存储单元 #
示例
SHOW MIGRATION SOURCE STORAGE UNITS;
17. 移除数据迁移源端存储单元 #
UNREGISTER MIGRATION SOURCE STORAGE UNIT storageUnitName;
参数名 | 说明 |
---|---|
storageUnitName | 源端存储单元。存在多个使用逗号分隔 |
示例
UNREGISTER MIGRATION SOURCE STORAGE UNIT ds_0;
18. 更新数据迁移增量进度 #
UPDATE MIGRATION jobId OFFSET POSITION('storageUnitName'='position');
参数名 | 说明 |
---|---|
storageUnitName | 存储单元名称 |
position | 增量进度位点 |
示例
更新 MySQL 进度的格式是 file#pos,写法如下
UPDATE MIGRATION j01016e501b498ed1bdb2c373a2e85e2529a6 OFFSET POSITION('ds_0'='mysql-bin.000001#111','ds_1'='mysql-bin.000002#222');