数据迁移 #
| 语法 | 描述 | 类型 |
|---|---|---|
| ALTER MIGRATION RULE | 定义数据迁移规则 | RAL |
| SHOW MIGRATION RULE | 查看数据迁移规则 | RAL |
| SHOW MIGRATION LIST | 查看数据迁移规则列表 | RAL |
| SHOW MIGRATION STATUS jobId | 查看数据迁移作业状态 | RAL |
| MIGRATE TABLE x INTO y | 启动迁移作业 | RAL |
| START MIGRATION jobId | 开启停止的作业 | RAL |
| STOP MIGRATION jobId | 停止作业 | RAL |
| SHOW MIGRATION CHECK ALGORITHMS | 查看一致性校验算法 | RAL |
| CHECK MIGRATION jobId | 检查作业数据一致性 | RAL |
| START MIGRATION CHECK jobId | 开启停止的数据一致性校验作业 | RAL |
| STOP MIGRATION CHECK jobId | 停止数据一致性校验作业 | RAL |
| DROP MIGRATION CHECK jobId | 删除数据一致性校验作业 | RAL |
| SHOW MIGRATION CHECK STATUS jobId | 查看数据一致性校验进度 | RAL |
| COMMIT MIGRATION jobId | 提交作业 | RAL |
| ROLLBACK MIGRATION jobId | 撤销作业。注意:该语句会 清理目标端表,请谨慎操作 | RAL |
| REGISTER MIGRATION SOURCE STORAGE UNIT | 注册数据迁移源端存储单元 | RAL |
| SHOW MIGRATION SOURCE STORAGE UNITS | 查询已经注册的数据迁移源端存储单元 | RAL |
| UNREGISTER MIGRATION SOURCE STORAGE UNIT | 移除数据迁移源端存储单元 | RAL |
| UPDATE MIGRATION jobId OFFSET POSITION | 更新数据迁移增量进度 | RAL |
定义数据迁移作业 #
ALTER MIGRATION RULE ();
因 migration rule 具有默认值,无需创建,仅提供 ALTER 语句。
完整配置 DistSQL 示例
ALTER MIGRATION RULE (
READ(
WORKER_THREAD=20,
BATCH_SIZE=1000,
SHARDING_SIZE=10000000,
RATE_LIMITER (TYPE(NAME='QPS',PROPERTIES('qps'='500')))
),
WRITE(
WORKER_THREAD=20,
BATCH_SIZE=1000,
RATE_LIMITER (TYPE(NAME='TPS',PROPERTIES('tps'='2000')))
),
STREAM_CHANNEL (TYPE(NAME='MEMORY',PROPERTIES('block-queue-size'='2000')))
);
配置说明
ALTER MIGRATION RULE (
READ( -- 数据读取配置。如果不配置则部分参数默认生效。
WORKER_THREAD=20, -- 从源端摄取全量数据的线程池大小。如果不配置则使用默认值。
BATCH_SIZE=1000, -- 一次查询操作返回的最大记录数。如果不配置则使用默认值。
SHARDING_SIZE=10000000, -- 全量数据分片大小。如果不配置则使用默认值。
RATE_LIMITER ( -- 限流算法。如果不配置则不限流。
TYPE( -- 算法类型。可选项:QPS
NAME='QPS',
PROPERTIES( -- 算法属性
'qps'='500'
)))
),
WRITE( -- 数据写入配置。如果不配置则部分参数默认生效。
WORKER_THREAD=20, -- 数据写入到目标端的线程池大小。如果不配置则使用默认值。
BATCH_SIZE=1000, -- 一次批量写入操作的最大记录数。如果不配置则使用默认值。
RATE_LIMITER ( -- 限流算法。如果不配置则不限流。
TYPE( -- 算法类型。可选项:TPS
NAME='TPS',
PROPERTIES( -- 算法属性
'tps'='2000'
)))
),
STREAM_CHANNEL ( -- 数据通道,连接生产者和消费者,用于 read 和 write 环节。如果不配置则默认使用 MEMORY 类型。
TYPE( -- 算法类型。可选项:MEMORY
NAME='MEMORY',
PROPERTIES( -- 算法属性
'block-queue-size'='2000' -- 属性:阻塞队列大小
)))
);
默认配置:
+‑‑‑‑‑‑‑‑‑‑‑‑‑‑‑‑‑‑‑‑‑‑‑‑‑‑‑‑‑‑‑‑‑‑‑‑‑‑‑‑‑‑‑‑‑‑‑‑‑‑‑‑‑‑‑‑‑‑‑‑‑‑+‑‑‑‑‑‑‑‑‑‑‑‑‑‑‑‑‑‑‑‑‑‑‑‑‑‑‑‑‑‑‑‑‑‑‑‑‑‑+‑‑‑‑‑‑‑‑‑‑‑‑‑‑‑‑‑‑‑‑‑‑‑‑‑‑‑‑‑‑‑‑‑‑‑‑‑‑‑‑‑‑‑‑‑‑‑‑‑‑‑‑‑‑+
| read | write | stream_channel |
+‑‑‑‑‑‑‑‑‑‑‑‑‑‑‑‑‑‑‑‑‑‑‑‑‑‑‑‑‑‑‑‑‑‑‑‑‑‑‑‑‑‑‑‑‑‑‑‑‑‑‑‑‑‑‑‑‑‑‑‑‑‑+‑‑‑‑‑‑‑‑‑‑‑‑‑‑‑‑‑‑‑‑‑‑‑‑‑‑‑‑‑‑‑‑‑‑‑‑‑‑+‑‑‑‑‑‑‑‑‑‑‑‑‑‑‑‑‑‑‑‑‑‑‑‑‑‑‑‑‑‑‑‑‑‑‑‑‑‑‑‑‑‑‑‑‑‑‑‑‑‑‑‑‑‑+
| {"workerThread":40,"batchSize":1000,"shardingSize":10000000} | {"workerThread":40,"batchSize":1000} | {"type":"MEMORY", "props":{"block‑queue‑size":10000}} |
+‑‑‑‑‑‑‑‑‑‑‑‑‑‑‑‑‑‑‑‑‑‑‑‑‑‑‑‑‑‑‑‑‑‑‑‑‑‑‑‑‑‑‑‑‑‑‑‑‑‑‑‑‑‑‑‑‑‑‑‑‑‑+‑‑‑‑‑‑‑‑‑‑‑‑‑‑‑‑‑‑‑‑‑‑‑‑‑‑‑‑‑‑‑‑‑‑‑‑‑‑+‑‑‑‑‑‑‑‑‑‑‑‑‑‑‑‑‑‑‑‑‑‑‑‑‑‑‑‑‑‑‑‑‑‑‑‑‑‑‑‑‑‑‑‑‑‑‑‑‑‑‑‑‑‑+
示例
ALTER MIGRATION RULE (READ(RATE_LIMITER (TYPE(NAME='QPS’,PROPERTIES(‘qps'='5000’)))))
查看数据迁移规则 #
SHOW MIGRATION RULE
参数说明
无
示例
SHOW MIGRATION RULE;
输出说明
| 列名 | 说明 |
|---|---|
| read | 数据读取配置。如果不配置则部分参数默认生效 |
| write | 数据写入配置。如果不配置则部分参数默认生效。 |
| stream_channel | 数据通道,连接生产者和消费者,用于 read 和 write 环节。如果不配置则默认使用 MEMORY 类型。 |
查看数据迁移规则列表 #
SHOW MIGRATION LIST;
参数说明
无
示例
+-------------------------------------+---------+----------------------+--------+---------------------+-----------+
| id | tables | sharding_total_count | active | create_time | stop_time |
+-------------------------------------+---------+----------------------+--------+---------------------+-----------+
|j01016e501b498ed1bdb2c373a2e85e2529a6| t_order | 1 | true | 2022-08-22 16:37:01 | NULL |
+-------------------------------------+---------+----------------------+--------+---------------------+-----------+
输出说明
| 列名 | 说明 |
|---|---|
| Id | 迁移任务ID |
| tables | 被迁移的表名 |
| active | 任务是否活跃 true 或 false |
| create_time | 任务创建时间 |
| stop_time | 任务停止时间,为 NULL 表示任务未停止 |
查看数据迁移作业状态 #
SHOW MIGRATION STATUS jobId;
参数说明
| 参数名 | 说明 |
|---|---|
| jobId | 数据迁移作业id |
示例
SHOW MIGRATION STATUS 'j01016e501b498ed1bdb2c373a2e85e2529a6';
+------+-------------+--------------------------+--------+-------------------------+-------------------------------+--------------------------+---------------+
| item | data_source | status | active | processed_records_count | inventory_finished_percentage | incremental_idle_seconds | error_message |
+------+-------------+--------------------------+--------+-------------------------+-------------------------------+--------------------------+---------------+
| 0 | ds_0 | EXECUTE_INCREMENTAL_TASK | true | 6 | 100 | 81 | |
+------+-------------+--------------------------+--------+-------------------------+-------------------------------+--------------------------+---------------+
输出说明
| 列名 | 说明 |
|---|---|
| item | 迁移任务内序号 |
| data_source | 数据存储单元名称 |
| status | 任务状态,详细参考状态描述 |
| active | 任务是否活跃 true 或 false |
| processed_records_count | 处理完成的记录数 |
| inventory_finished_percentage | 全量数据处理完成进度(百分比) |
| incremental_idle_seconds | 增量同步任务不活跃时间(单位:秒) |
| error_message | 错误信息 |
状态说明
| 取值 | 描述 |
|---|---|
| PREPARING | 准备中 |
| RUNNING | 运行中 |
| EXECUTE_INVENTORY_TASK | 全量迁移中 |
| EXECUTE_INCREMENTAL_TASK | 增量迁移中 |
| FINISHED | 已完成(整个流程完成了,新规则已生效) |
| PREPARING_FAILURE | 准备阶段失败 |
| EXECUTE_INVENTORY_TASK_FAILURE | 全量迁移阶段失败 |
| EXECUTE_INCREMENTAL_TASK_FAILURE | 增量迁移阶段失败 |
启动数据迁移 #
MIGRATE TABLE source_storage_unit.source_schema_name.source_table_name INTO target_database_name.target_table_name (PROPERTIES('column-exclude-list'='column1,column2'));
参数说明
| 参数名 | 说明 |
|---|---|
| source_storage_unit | 数据源存储单元名称。需要通过REGISTER MIGRATION SOURCE STORAGE UNIT提前注册 |
| source_schema_name | 源端 schema。可选 |
| source_table_name | 源端表名 |
| target_database_name | 目标端逻辑库名。可选。不指定的话使用当前逻辑库名 |
| target_table_name | 目标端表名 |
| PROPERTIES | 数据迁移属性。可选 |
数据迁移可选属性:
| 属性名称 | 数据类型 | 默认值 | 说明 |
|---|---|---|---|
| column-exclude-list | String | 需要排除的字段名。多个字段使用逗号分隔 |
示例
存储节点为 MySQL,迁移到当前逻辑库:
MIGRATE TABLE ds_0.t_order INTO t_order;
存储节点为 PostgreSQL,迁移到当前逻辑库:
MIGRATE TABLE ds_0.public.t_order INTO t_order;
存储节点为 MySQL,迁移到指定逻辑库:
MIGRATE TABLE ds_0.t_order INTO sharding_db.t_order;
同时迁移多张表,多张表使用逗号分隔:
MIGRATE TABLE ds_0.t_order INTO sharding_db.t_order,ds_0.t_order_item INTO sharding_db.t_order_item;
t_order 表排除部分字段,t_order_item 表迁移所有字段:
MIGRATE TABLE ds_0.t_order INTO sharding_db.t_order (PROPERTIES('column-exclude-list'='column1,column2')),ds_0.t_order_item INTO sharding_db.t_order_item;
开启已经停止的作业 #
START MIGRATION jobId
参数说明
| 参数名 | 说明 |
|---|---|
| jobId | 作业id |
示例
指定 jobid 启动已经停止的 job。
START MIGRATION 'j01016e501b498ed1bdb2c373a2e85e2529a6';
停止的作业 #
STOP MIGRATION jobId
参数说明
| 参数名 | 说明 |
|---|---|
| jobId | 作业id |
示例
指定jobId启动已经停止的job。
STOP MIGRATION 'j01016e501b498ed1bdb2c373a2e85e2529a6';
查看数据一致性算法 #
一致性校验算法选择请参考 数据一致性校验算法。
SHOW MIGRATION CHECK ALGORITHMS;
示例
SHOW MIGRATION CHECK ALGORITHMS;
+----------------------------+---------------------------------------------+-----------------------------------------------------------------------------------------------+----------------------------------------------+
| type | type_aliases | supported_database_types | description |
+----------------------------+---------------------------------------------+-----------------------------------------------------------------------------------------------+----------------------------------------------+
| CRC32_MATCH | | MySQL,MariaDB,H2,Aurora | Match CRC32 of records. |
| DATA_MATCH | | SQL92,MySQL,PostgreSQL,openGauss,Oracle,SQLServer,MariaDB,H2,Hive,Presto,Aurora,DM,KingbaseES | Match raw data of records. |
| SphereEx:BATCH_CRC32_MATCH | SphereEx:DATA_CONSISTENCY_CHECK_CRC32_MATCH | MySQL,MariaDB,H2,Aurora | Match CRC32 of records (Commercial edition). |
| SphereEx:DATA_DIFF | | SQL92,MySQL,PostgreSQL,openGauss,Oracle,SQLServer,MariaDB,H2,Hive,Presto,Aurora,DM,KingbaseES | Diff raw data of records. |
+----------------------------+---------------------------------------------+-----------------------------------------------------------------------------------------------+----------------------------------------------+
输出说明
- type : 算法名
- type_aliases : 算法别名
- supported_database_types : 算法支持哪些数据库
- description : 算法说明
执行数据一致性校验 #
CHECK MIGRATION jobId BY TYPE (NAME='algorithmType');
| 参数名 | 说明 |
|---|---|
| jobId | 作业id |
| algorithmType | 一致性校验算法类型 |
示例
指定 jobId 指定校验算法执行数据一致性校验
CHECK MIGRATION 'j01016e501b498ed1bdb2c373a2e85e2529a6' BY TYPE (NAME='CRC32_MATCH');
CHECK MIGRATION 'j01016e501b498ed1bdb2c373a2e85e2529a6' BY TYPE (NAME='DATA_MATCH', PROPERTIES('chunk-size'='1000'));
CHECK MIGRATION 'j01016e501b498ed1bdb2c373a2e85e2529a6' BY TYPE (NAME='SphereEx:BATCH_CRC32_MATCH');
CHECK MIGRATION 'j01016e501b498ed1bdb2c373a2e85e2529a6' BY TYPE (NAME='SphereEx:DATA_DIFF', PROPERTIES('diff-storage-type'='DATABASE','chunk-size'='1000','diff-storage-unit-name'='ds_2','diff-table-name'='spex_cnschk_1','incremental-idle-seconds-threshold'='30'));
开启停止的数据一致校验作业 #
START MIGRATION CHECK jobId
| 参数名 | 说明 |
|---|---|
| jobId | 作业 id |
示例
指定 jobId 指定校验算法执行数据一致性校验
START MIGRATION CHECK 'j01016e501b498ed1bdb2c373a2e85e2529a6' ;
Query OK, 0 rows affected (0.09 sec)
停止数据一致校验作业 #
STOP MIGRATION CHECK jobId
| 参数名 | 说明 |
|---|---|
| jobId | 作业id |
示例
指定 jobId 指定校验算法执行数据一致性校验
STOP MIGRATION CHECK 'j01016e501b498ed1bdb2c373a2e85e2529a6' ;
Query OK, 0 rows affected (0.09 sec)
删除数据一致校验作业 #
DROP MIGRATION CHECK jobId
| 参数名 | 说明 |
|---|---|
| jobId | 作业id |
示例
指定 jobId 指定校验算法执行数据一致性校验
DROP MIGRATION CHECK 'j01016e501b498ed1bdb2c373a2e85e2529a6' ;
Query OK, 0 rows affected (0.09 sec)
查看数据一致性校验进度 #
SHOW MIGRATION CHECK STATUS jobId
| 参数名 | 说明 |
|---|---|
| jobId | 作业id |
示例
指定 jobId 指查看数据一致性校验进度
show migration check status j0102p00009e6221cef2c4467323e205a1e4d1604b;
+-------------------+--------+---------------------+---------------------+-------------------+-------------------------+----------------+------------------+---------------+
| tables | result | check_failed_tables | inventory_finished_percentage | inventory_remaining_seconds | check_begin_time | check_end_time | duration_seconds | error_message |
+-------------------+--------+---------------------+---------------------+-------------------+-------------------------+----------------+------------------+---------------+
| ds_0.t_order_copy | | | 0 | 0 | 2023-07-26 18:57:37.431 | | 0 | |
+-------------------+--------+---------------------+---------------------+-------------------+-------------------------+----------------+------------------+---------------+
输出说明
- tables : 表名
- result : 校验结果
- check_failed_tables : 校验失败的表
- inventory_finished_percentage : 存量数据校验完成百分比。由于计算基数是历史值,并且可能是估算结果,所以完成百分比可能存在一定误差
- inventory_remaining_seconds : 存量数据校验剩余时间(单位:秒)。由于计算基数是历史值,并且可能是估算结果,所以剩余时间可能存在一定误差
- check_begin_time : 校验开始时间
- check_end_time : 校验结束时间
- duration_seconds : 校验耗时(单位:秒)
- error_message : 错误信息
提交作业 #
COMMIT MIGRATION jobId;
参数说明
| 参数名 | 说明 |
|---|---|
| jobId | 作业id |
示例
指定 jobId 提交作业。
COMMIT MIGRATION 'j01016e501b498ed1bdb2c373a2e85e2529a6';
撤销作业 #
ROLLBACK MIGRATION jobId;
| 参数名 | 说明 |
|---|---|
| jobId | 作业id |
注意:该语句会 清理目标端表,请谨慎操作。
示例
指定 jobId 回滚 job。
ROLLBACK MIGRATION 'j01016e501b498ed1bdb2c373a2e85e2529a6';
注册数据迁移源端存储单元 #
REGISTER MIGRATION SOURCE STORAGE UNIT
参数说明
参考存储节点部分
示例
REGISTER MIGRATION SOURCE STORAGE UNIT ds_0 (
URL="jdbc:mysql://127.0.0.1:3306/migration_ds_0?serverTimezone=UTC&useSSL=false",
USER="root",
PASSWORD="root",
PROPERTIES("minPoolSize"="1","maxPoolSize"="20","idleTimeout"="60000")
);
查询已经注册的数据迁移源端存储单元 #
示例
SHOW MIGRATION SOURCE STORAGE UNITS;
移除数据迁移源端存储单元 #
UNREGISTER MIGRATION SOURCE STORAGE UNIT storageUnitName;
| 参数名 | 说明 |
|---|---|
| storageUnitName | 源端存储单元。存在多个使用逗号分隔 |
示例
UNREGISTER MIGRATION SOURCE STORAGE UNIT ds_0;
更新数据迁移增量进度 #
UPDATE MIGRATION jobId OFFSET POSITION('storageUnitName'='position');
| 参数名 | 说明 |
|---|---|
| storageUnitName | 存储单元名称 |
| position | 增量进度位点 |
示例
更新 MySQL 进度的格式是 file#pos,写法如下
UPDATE MIGRATION j01016e501b498ed1bdb2c373a2e85e2529a6 OFFSET POSITION('ds_0'='mysql-bin.000001#111','ds_1'='mysql-bin.000002#222');