数据同步 #
语法 | 描述 | 类型 |
---|---|---|
ALTER SYNCING RULE | 定义数据同步规则 | RAL |
SHOW SYNCING RULE | 查看数据同步规则 | RAL |
SHOW SYNCING LIST | 查看数据同步规则列表 | RAL |
SHOW SYNCING STATUS jobId | 查看数据同步作业状态 | RAL |
SYNC DATA tableName TO SINK | 启动数据同步作业 | RAL |
START SYNCING jobId | 开启停止的作业 | RAL |
STOP SYNCING jobId | 停止作业 | RAL |
CHECK SYNCING jobId | 检查作业数据一致性 | RAL |
START SYNCING CHECK jobId | 开启停止的数据一致性校验作业 | RAL |
STOP SYNCING CHECK jobId | 停止数据一致性校验作业 | RAL |
SHOW SYNCING CHECK STATUS jobId | 查看数据一致性校验进度 | RAL |
COMMIT SYNCING jobId | 提交作业 | RAL |
ROLLBACK SYNCING jobId | 撤销作业 | RAL |
UPDATE SYNCING jobId OFFSET POSITION | 更新数据同步进度 | RAL |
1. 定义数据同步作业规则 #
ALTER SYNCING RULE ();
因 syncing rule 具有默认值,无需创建,仅提供 ALTER 语句。
完整配置 DistSQL 示例
ALTER SYNCING RULE (
READ(
WORKER_THREAD=20,
BATCH_SIZE=1000,
SHARDING_SIZE=10000000,
RATE_LIMITER (TYPE(NAME='QPS',PROPERTIES('qps'='500')))
),
WRITE(
WORKER_THREAD=20,
BATCH_SIZE=1000,
RATE_LIMITER (TYPE(NAME='TPS',PROPERTIES('tps'='2000')))
),
STREAM_CHANNEL (TYPE(NAME='MEMORY',PROPERTIES('block-queue-size'='2000')))
);
配置说明
ALTER SYNCING RULE (
READ( -- 数据读取配置。如果不配置则部分参数默认生效。
WORKER_THREAD=20, -- 从源端摄取数据的线程池大小,该值需要大于分库数量,如果不配置则使用默认值。
BATCH_SIZE=1000, -- 一次查询操作返回的最大记录数。如果不配置则使用默认值。
RATE_LIMITER ( -- 限流算法。如果不配置则不限流。
TYPE( -- 算法类型。可选项:QPS
NAME='QPS',
PROPERTIES( -- 算法属性
'qps'='500'
)))
),
WRITE( -- 数据写入配置。如果不配置则部分参数默认生效。
WORKER_THREAD=20, -- 数据写入到目标端的线程池大小。如果不配置则使用默认值。
BATCH_SIZE=1000, -- 一次批量写入操作的最大记录数。如果不配置则使用默认值。
RATE_LIMITER ( -- 限流算法。如果不配置则不限流。
TYPE( -- 算法类型。可选项:TPS
NAME='TPS',
PROPERTIES( -- 算法属性
'tps'='2000'
)))
),
STREAM_CHANNEL ( -- 数据通道,连接生产者和消费者,用于 read 和 write 环节。如果不配置则默认使用 MEMORY 类型。
TYPE( -- 算法类型。可选项:MEMORY
NAME='MEMORY',
PROPERTIES( -- 算法属性
'block-queue-size'='2000' -- 属性:阻塞队列大小
)))
);
默认配置:
+‑‑‑‑‑‑‑‑‑‑‑‑‑‑‑‑‑‑‑‑‑‑‑‑‑‑‑‑‑‑‑‑‑‑‑‑‑‑‑‑‑‑‑‑‑‑‑‑‑‑‑‑‑‑‑‑‑‑‑‑‑‑+‑‑‑‑‑‑‑‑‑‑‑‑‑‑‑‑‑‑‑‑‑‑‑‑‑‑‑‑‑‑‑‑‑‑‑‑‑‑+‑‑‑‑‑‑‑‑‑‑‑‑‑‑‑‑‑‑‑‑‑‑‑‑‑‑‑‑‑‑‑‑‑‑‑‑‑‑‑‑‑‑‑‑‑‑‑‑‑‑‑‑‑‑+
| read | write | stream_channel |
+‑‑‑‑‑‑‑‑‑‑‑‑‑‑‑‑‑‑‑‑‑‑‑‑‑‑‑‑‑‑‑‑‑‑‑‑‑‑‑‑‑‑‑‑‑‑‑‑‑‑‑‑‑‑‑‑‑‑‑‑‑‑+‑‑‑‑‑‑‑‑‑‑‑‑‑‑‑‑‑‑‑‑‑‑‑‑‑‑‑‑‑‑‑‑‑‑‑‑‑‑+‑‑‑‑‑‑‑‑‑‑‑‑‑‑‑‑‑‑‑‑‑‑‑‑‑‑‑‑‑‑‑‑‑‑‑‑‑‑‑‑‑‑‑‑‑‑‑‑‑‑‑‑‑‑+
| {"workerThread":20,"batchSize":1000,"shardingSize":10000000} | {"workerThread":20,"batchSize":1000} | {"type":"MEMORY", "props":{"block‑queue‑size":2000}} |
+‑‑‑‑‑‑‑‑‑‑‑‑‑‑‑‑‑‑‑‑‑‑‑‑‑‑‑‑‑‑‑‑‑‑‑‑‑‑‑‑‑‑‑‑‑‑‑‑‑‑‑‑‑‑‑‑‑‑‑‑‑‑+‑‑‑‑‑‑‑‑‑‑‑‑‑‑‑‑‑‑‑‑‑‑‑‑‑‑‑‑‑‑‑‑‑‑‑‑‑‑+‑‑‑‑‑‑‑‑‑‑‑‑‑‑‑‑‑‑‑‑‑‑‑‑‑‑‑‑‑‑‑‑‑‑‑‑‑‑‑‑‑‑‑‑‑‑‑‑‑‑‑‑‑‑+
示例
ALTER SYNCING RULE (READ(RATE_LIMITER (TYPE(NAME='QPS’,PROPERTIES(‘qps'='5000’)))))
2. 查看数据同步规则 #
SHOW SYNCING RULE
参数说明
无
示例
SHOW SYNCING RULE;
输出说明
列名 | 说明 |
---|---|
read | 数据读取配置。如果不配置则部分参数默认生效 |
write | 数据写入配置。如果不配置则部分参数默认生效。 |
stream_channel | 数据通道,连接生产者和消费者,用于 read 和 write 环节。如果不配置则默认使用 MEMORY 类型。 |
3. 查看数据同步规则列表 #
SHOW SYNCING LIST;
参数说明
无
示例
mysql> SHOW SYNCING LIST;
+--------------------------------------------+---------+----------------+--------+---------------------+-----------+
| id | tables | job_item_count | active | create_time | stop_time |
+--------------------------------------------+---------+----------------+--------+---------------------+-----------+
| j5802p0000c1159b9ae7a81c76e196669420080657 | t_order | 2 | true | 2024-02-19 15:30:57 | NULL |
+--------------------------------------------+---------+----------------+--------+---------------------+-----------+
4. 查看数据同步作业状态 #
SHOW SYNCING STATUS jobId;
参数说明
参数名 | 说明 |
---|---|
jobId | 数据同步作业id |
示例
mysql> SHOW SYNCING STATUS j5802p0000c1159b9ae7a81c76e196669420080657;
+------+--------------+---------+--------------------------+--------+-------------------------+---------------+--------------------------+---------------+
| item | data_source | tables | status | active | processed_records_count | delay_seconds | incremental_idle_seconds | error_message |
+------+--------------+---------+--------------------------+--------+-------------------------+---------------+--------------------------+---------------+
| 0 | ds_0 | t_order | EXECUTE_INCREMENTAL_TASK | true | 51 | 1 | 2 | |
| 1 | ds_1 | t_order | EXECUTE_INCREMENTAL_TASK | true | 51 | 0 | 2 | |
+------+--------------+---------+--------------------------+--------+-------------------------+---------------+--------------------------+---------------+
输出说明
列名 | 说明 |
---|---|
item | 同步任务内序号 |
data_source | 数据存储单元名称 |
status | 任务状态,详细参考状态描述 |
active | 任务是否活跃 true 或 false |
processed_records_count | 处理完成的记录数 |
delay_seconds | 延迟时间(秒) |
incremental_idle_seconds | 增量同步任务不活跃时间(单位:秒) |
error_message | 错误信息 |
状态说明
取值 | 描述 |
---|---|
RUNNING | 运行中 |
PREPARING | 准备中 |
EXECUTE_INCREMENTAL_TASK | 增量同步中 |
5. 启动数据同步 #
SYNC DATA tableName TO SINK (TYPE(NAME='sinkType', PROPERTIES('url'='jdbc:mysql://localhost:3307/sharding_db?useSSL=false','username'='proxy', 'password'='Proxy@123')));
参数说明
参数名 | 说明 |
---|---|
tableName | 需要同步的表 |
sinkType | 下游类型,目前只支持 JDBC,此时 PROPERTIES 的值 url、username、password 是必填的 |
PROPERTIES 中的参数都是 key=values
格式,多个参数之间使用逗号分隔。
示例
SYNC DATA t_order,t_order_item TO SINK (TYPE(NAME='JDBC', PROPERTIES('url'='jdbc:mysql://localhost:3307/sharding_db?useSSL=false','username'='proxy', 'password'='Proxy@123', 'maxPoolSize'=20,
'source-start-position'='ds_0:mysql-bin.000002#8990668,ds_1:mysql-bin.000002#8990668')));
完整的参数说明如下:
配置名 | 说明 | 示例 |
---|---|---|
source-start-position | 指定同步开始的位置,格式为 storageUnitName:binlogFileName#position ,多个使用逗号分隔 | ‘source-start-position’=‘ds_0:mysql-bin.000002#8990668,ds_1:mysql-bin.000002#8990668’ |
column-exclude-list | 排除指定表的指定列,格式为 tableName:column1 ,多个表之间使用 | 分隔。 | ‘column-exclude-list’=’t_order:exclude1,exclude2|t_order_item:exclude1’ |
source-target-table-name-mapping | 源端和目标端的表名映射关系,两者不同的时候需要配置 | ‘source-target-table-name-mapping’=’t_order:t_order_sink,t_order_item:t_order_item_sink’ |
6. 开启已经停止的同步作业 #
START SYNCING jobId
参数说明
参数名 | 说明 |
---|---|
jobId | 作业id |
示例
START SYNCING j5802p0000c1159b9ae7a81c76e196669420080657;
7. 停止的作业 #
STOP SYNCING jobId
参数说明
参数名 | 说明 |
---|---|
jobId | 作业id |
示例
STOP SYNCING j5802p0000c1159b9ae7a81c76e196669420080657;
8. 执行数据一致性校验 #
一致性校验算法选择请参考 数据一致性校验算法。
CHECK SYNCING jobId BY TYPE (NAME='algorithmType');
参数名 | 说明 |
---|---|
jobId | 作业id |
algorithmType | 一致性校验算法类型 |
示例
指定 jobId 指定校验算法执行数据一致性校验。
CHECK SYNCING j5802p0000c1159b9ae7a81c76e196669420080657 BY TYPE (NAME='CRC32_MATCH');
CHECK SYNCING j5802p0000c1159b9ae7a81c76e196669420080657 BY TYPE (NAME='DATA_MATCH', PROPERTIES('chunk-size'='1000'));
CHECK SYNCING j5802p0000c1159b9ae7a81c76e196669420080657 BY TYPE (NAME='SphereEx:BATCH_CRC32_MATCH');
CHECK SYNCING j5802p0000c1159b9ae7a81c76e196669420080657 BY TYPE (NAME='SphereEx:DATA_DIFF', PROPERTIES('diff-storage-type'='DATABASE','chunk-size'='1000','diff-storage-unit-name'='ds_2','diff-table-name'='spex_cnschk_1','incremental-idle-seconds-threshold'='30'));
9. 开启停止的数据一致校验作业 #
START SYNCING CHECK jobId
参数名 | 说明 |
---|---|
jobId | 作业 id |
示例
指定 jobId 指定校验算法执行数据一致性校验。
START SYNCING CHECK j5802p0000c1159b9ae7a81c76e196669420080657 ;
10. 停止数据一致性校验作业 #
STOP SYNCING CHECK jobId
参数名 | 说明 |
---|---|
jobId | 作业id |
示例
指定 jobId 指定校验算法执行数据一致性校验。
STOP SYNCING CHECK j5802p0000c1159b9ae7a81c76e196669420080657 ;
11. 查看数据一致性校验进度 #
SHOW SYNCING CHECK STATUS jobId
参数名 | 说明 |
---|---|
jobId | 作业id |
示例
mysql> SHOW SYNCING CHECK STATUS j5802p0000c1159b9ae7a81c76e196669420080657;
+---------+--------+---------------------+--------+-------------------------------+-----------------------------+--------------------------+-------------------------+-------------------------+------------------+----------------------------+-----------------+---------------+
| tables | result | check_failed_tables | active | inventory_finished_percentage | inventory_remaining_seconds | incremental_idle_seconds | check_begin_time | check_end_time | duration_seconds | algorithm_type | algorithm_props | error_message |
+---------+--------+---------------------+--------+-------------------------------+-----------------------------+--------------------------+-------------------------+-------------------------+------------------+----------------------------+-----------------+---------------+
| t_order | true | | false | 100 | 0 | | 2024-02-19 16:07:40.926 | 2024-02-19 16:07:42.178 | 1 | SphereEx:BATCH_CRC32_MATCH | | |
+---------+--------+---------------------+--------+-------------------------------+-----------------------------+--------------------------+-------------------------+-------------------------+------------------+----------------------------+-----------------+---------------+
12. 提交作业 #
COMMIT SYNCING jobId;
参数说明
参数名 | 说明 |
---|---|
jobId | 作业id |
示例
COMMIT SYNCING j5802p0000c1159b9ae7a81c76e196669420080657;
13. 撤销作业 #
ROLLBACK SYNCING jobId;
参数名 | 说明 |
---|---|
jobId | 作业id |
示例
ROLLBACK SYNCING j5802p0000c1159b9ae7a81c76e196669420080657;
14. 更新数据同步进度 #
UPDATE SYNCING jobId OFFSET POSITION('storageUnitName'='position');
参数名 | 说明 |
---|---|
storageUnitName | 存储单元名称 |
position | 进度位点 |
示例
更新 MySQL 进度的格式是 file#pos,写法如下
UPDATE SYNCING j5802p0000c1159b9ae7a81c76e196669420080657 OFFSET POSITION('ds_0'='mysql-bin.000001#111','ds_1'='mysql-bin.000002#222');