Logo
数据同步

数据同步 #

语法描述类型
ALTER SYNCING RULE定义数据同步规则RAL
SHOW SYNCING RULE查看数据同步规则RAL
SHOW SYNCING LIST查看数据同步规则列表RAL
SHOW SYNCING STATUS jobId查看数据同步作业状态RAL
SYNC DATA tableName TO SINK启动数据同步作业RAL
START SYNCING jobId开启停止的作业RAL
STOP SYNCING jobId停止作业RAL
CHECK SYNCING jobId检查作业数据一致性RAL
START SYNCING CHECK jobId开启停止的数据一致性校验作业RAL
STOP SYNCING CHECK jobId停止数据一致性校验作业RAL
SHOW SYNCING CHECK STATUS jobId查看数据一致性校验进度RAL
COMMIT SYNCING jobId提交作业RAL
ROLLBACK SYNCING jobId撤销作业RAL
UPDATE SYNCING jobId OFFSET POSITION更新数据同步进度RAL

1. 定义数据同步作业规则 #

ALTER SYNCING RULE ();

因 syncing rule 具有默认值,无需创建,仅提供 ALTER 语句。

完整配置 DistSQL 示例

ALTER SYNCING RULE (
READ(
  WORKER_THREAD=20,
  BATCH_SIZE=1000,
  SHARDING_SIZE=10000000,
  RATE_LIMITER (TYPE(NAME='QPS',PROPERTIES('qps'='500')))
),
WRITE(
  WORKER_THREAD=20,
  BATCH_SIZE=1000,
  RATE_LIMITER (TYPE(NAME='TPS',PROPERTIES('tps'='2000')))
),
STREAM_CHANNEL (TYPE(NAME='MEMORY',PROPERTIES('block-queue-size'='2000')))
);

配置说明

ALTER SYNCING RULE (
READ( -- 数据读取配置。如果不配置则部分参数默认生效。
  WORKER_THREAD=20, -- 从源端摄取数据的线程池大小,该值需要大于分库数量,如果不配置则使用默认值。
  BATCH_SIZE=1000, -- 一次查询操作返回的最大记录数。如果不配置则使用默认值。
  RATE_LIMITER ( -- 限流算法。如果不配置则不限流。
  TYPE( -- 算法类型。可选项:QPS
  NAME='QPS',
  PROPERTIES( -- 算法属性
  'qps'='500'
  )))
),
WRITE( -- 数据写入配置。如果不配置则部分参数默认生效。
  WORKER_THREAD=20, -- 数据写入到目标端的线程池大小。如果不配置则使用默认值。
  BATCH_SIZE=1000, -- 一次批量写入操作的最大记录数。如果不配置则使用默认值。
  RATE_LIMITER ( -- 限流算法。如果不配置则不限流。
  TYPE( -- 算法类型。可选项:TPS
  NAME='TPS',
  PROPERTIES( -- 算法属性
  'tps'='2000'
  )))
),
STREAM_CHANNEL ( -- 数据通道,连接生产者和消费者,用于 read 和 write 环节。如果不配置则默认使用 MEMORY 类型。
TYPE( -- 算法类型。可选项:MEMORY
NAME='MEMORY',
PROPERTIES( -- 算法属性
'block-queue-size'='2000' -- 属性:阻塞队列大小
)))
);

默认配置:

+‑‑‑‑‑‑‑‑‑‑‑‑‑‑‑‑‑‑‑‑‑‑‑‑‑‑‑‑‑‑‑‑‑‑‑‑‑‑‑‑‑‑‑‑‑‑‑‑‑‑‑‑‑‑‑‑‑‑‑‑‑‑+‑‑‑‑‑‑‑‑‑‑‑‑‑‑‑‑‑‑‑‑‑‑‑‑‑‑‑‑‑‑‑‑‑‑‑‑‑‑+‑‑‑‑‑‑‑‑‑‑‑‑‑‑‑‑‑‑‑‑‑‑‑‑‑‑‑‑‑‑‑‑‑‑‑‑‑‑‑‑‑‑‑‑‑‑‑‑‑‑‑‑‑‑+ 
| read | write | stream_channel | 
+‑‑‑‑‑‑‑‑‑‑‑‑‑‑‑‑‑‑‑‑‑‑‑‑‑‑‑‑‑‑‑‑‑‑‑‑‑‑‑‑‑‑‑‑‑‑‑‑‑‑‑‑‑‑‑‑‑‑‑‑‑‑+‑‑‑‑‑‑‑‑‑‑‑‑‑‑‑‑‑‑‑‑‑‑‑‑‑‑‑‑‑‑‑‑‑‑‑‑‑‑+‑‑‑‑‑‑‑‑‑‑‑‑‑‑‑‑‑‑‑‑‑‑‑‑‑‑‑‑‑‑‑‑‑‑‑‑‑‑‑‑‑‑‑‑‑‑‑‑‑‑‑‑‑‑+ 
| {"workerThread":20,"batchSize":1000,"shardingSize":10000000} | {"workerThread":20,"batchSize":1000} | {"type":"MEMORY", "props":{"block‑queue‑size":2000}} |
 +‑‑‑‑‑‑‑‑‑‑‑‑‑‑‑‑‑‑‑‑‑‑‑‑‑‑‑‑‑‑‑‑‑‑‑‑‑‑‑‑‑‑‑‑‑‑‑‑‑‑‑‑‑‑‑‑‑‑‑‑‑‑+‑‑‑‑‑‑‑‑‑‑‑‑‑‑‑‑‑‑‑‑‑‑‑‑‑‑‑‑‑‑‑‑‑‑‑‑‑‑+‑‑‑‑‑‑‑‑‑‑‑‑‑‑‑‑‑‑‑‑‑‑‑‑‑‑‑‑‑‑‑‑‑‑‑‑‑‑‑‑‑‑‑‑‑‑‑‑‑‑‑‑‑‑+

示例

ALTER SYNCING RULE (READ(RATE_LIMITER (TYPE(NAME='QPS’,PROPERTIES(‘qps'='5000’)))))

2. 查看数据同步规则 #

SHOW SYNCING RULE

参数说明

示例

SHOW SYNCING RULE;

输出说明

列名说明
read数据读取配置。如果不配置则部分参数默认生效
write数据写入配置。如果不配置则部分参数默认生效。
stream_channel数据通道,连接生产者和消费者,用于 read 和 write 环节。如果不配置则默认使用 MEMORY 类型。

3. 查看数据同步规则列表 #

SHOW SYNCING LIST;

参数说明

示例

mysql> SHOW SYNCING LIST;
+--------------------------------------------+---------+----------------+--------+---------------------+-----------+
| id                                         | tables  | job_item_count | active | create_time         | stop_time |
+--------------------------------------------+---------+----------------+--------+---------------------+-----------+
| j5802p0000c1159b9ae7a81c76e196669420080657 | t_order | 2              | true   | 2024-02-19 15:30:57 | NULL      |
+--------------------------------------------+---------+----------------+--------+---------------------+-----------+

4. 查看数据同步作业状态 #

SHOW SYNCING STATUS jobId;

参数说明

参数名说明
jobId数据同步作业id

示例

mysql> SHOW SYNCING STATUS j5802p0000c1159b9ae7a81c76e196669420080657;
+------+--------------+---------+--------------------------+--------+-------------------------+---------------+--------------------------+---------------+
| item | data_source  | tables  | status                   | active | processed_records_count | delay_seconds | incremental_idle_seconds | error_message |
+------+--------------+---------+--------------------------+--------+-------------------------+---------------+--------------------------+---------------+
| 0    |     ds_0     | t_order | EXECUTE_INCREMENTAL_TASK | true   | 51                      | 1             | 2                        |               |
| 1    |     ds_1     | t_order | EXECUTE_INCREMENTAL_TASK | true   | 51                      | 0             | 2                        |               |
+------+--------------+---------+--------------------------+--------+-------------------------+---------------+--------------------------+---------------+

输出说明

列名说明
item同步任务内序号
data_source数据存储单元名称
status任务状态,详细参考状态描述
active任务是否活跃 true 或 false
processed_records_count处理完成的记录数
delay_seconds延迟时间(秒)
incremental_idle_seconds增量同步任务不活跃时间(单位:秒)
error_message错误信息

状态说明

取值描述
RUNNING运行中
PREPARING准备中
EXECUTE_INCREMENTAL_TASK增量同步中

5. 启动数据同步 #

SYNC DATA tableName TO SINK (TYPE(NAME='sinkType', PROPERTIES('url'='jdbc:mysql://localhost:3307/sharding_db?useSSL=false','username'='proxy', 'password'='Proxy@123')));

参数说明

参数名说明
tableName需要同步的表
sinkType下游类型,目前只支持 JDBC,此时 PROPERTIES 的值 url、username、password 是必填的

PROPERTIES 中的参数都是 key=values 格式,多个参数之间使用逗号分隔。

示例

SYNC DATA t_order,t_order_item TO SINK (TYPE(NAME='JDBC', PROPERTIES('url'='jdbc:mysql://localhost:3307/sharding_db?useSSL=false','username'='proxy', 'password'='Proxy@123', 'maxPoolSize'=20,
'source-start-position'='ds_0:mysql-bin.000002#8990668,ds_1:mysql-bin.000002#8990668')));

完整的参数说明如下:

配置名说明示例
source-start-position指定同步开始的位置,格式为 storageUnitName:binlogFileName#position,多个使用逗号分隔‘source-start-position’=‘ds_0:mysql-bin.000002#8990668,ds_1:mysql-bin.000002#8990668’
column-exclude-list排除指定表的指定列,格式为 tableName:column1,多个表之间使用 | 分隔。‘column-exclude-list’=’t_order:exclude1,exclude2|t_order_item:exclude1’
source-target-table-name-mapping源端和目标端的表名映射关系,两者不同的时候需要配置‘source-target-table-name-mapping’=’t_order:t_order_sink,t_order_item:t_order_item_sink’

6. 开启已经停止的同步作业 #

START SYNCING jobId

参数说明

参数名说明
jobId作业id

示例

START SYNCING j5802p0000c1159b9ae7a81c76e196669420080657;

7. 停止的作业 #

STOP SYNCING jobId

参数说明

参数名说明
jobId作业id

示例

STOP SYNCING j5802p0000c1159b9ae7a81c76e196669420080657;

8. 执行数据一致性校验 #

一致性校验算法选择请参考 数据一致性校验算法

CHECK SYNCING jobId BY TYPE (NAME='algorithmType');
参数名说明
jobId作业id
algorithmType一致性校验算法类型

示例

指定 jobId 指定校验算法执行数据一致性校验。

CHECK SYNCING j5802p0000c1159b9ae7a81c76e196669420080657 BY TYPE (NAME='CRC32_MATCH');

CHECK SYNCING j5802p0000c1159b9ae7a81c76e196669420080657 BY TYPE (NAME='DATA_MATCH', PROPERTIES('chunk-size'='1000'));

CHECK SYNCING j5802p0000c1159b9ae7a81c76e196669420080657 BY TYPE (NAME='SphereEx:BATCH_CRC32_MATCH');

CHECK SYNCING j5802p0000c1159b9ae7a81c76e196669420080657 BY TYPE (NAME='SphereEx:DATA_DIFF', PROPERTIES('diff-storage-type'='DATABASE','chunk-size'='1000','diff-storage-unit-name'='ds_2','diff-table-name'='spex_cnschk_1','incremental-idle-seconds-threshold'='30'));

9. 开启停止的数据一致校验作业 #

START SYNCING CHECK jobId
参数名说明
jobId作业 id

示例

指定 jobId 指定校验算法执行数据一致性校验。

START SYNCING CHECK j5802p0000c1159b9ae7a81c76e196669420080657 ;

10. 停止数据一致性校验作业 #

STOP SYNCING CHECK jobId
参数名说明
jobId作业id

示例

指定 jobId 指定校验算法执行数据一致性校验。

STOP SYNCING CHECK j5802p0000c1159b9ae7a81c76e196669420080657 ;

11. 查看数据一致性校验进度 #

SHOW SYNCING CHECK STATUS jobId
参数名说明
jobId作业id

示例

mysql> SHOW SYNCING CHECK STATUS j5802p0000c1159b9ae7a81c76e196669420080657;
+---------+--------+---------------------+--------+-------------------------------+-----------------------------+--------------------------+-------------------------+-------------------------+------------------+----------------------------+-----------------+---------------+
| tables  | result | check_failed_tables | active | inventory_finished_percentage | inventory_remaining_seconds | incremental_idle_seconds | check_begin_time        | check_end_time          | duration_seconds | algorithm_type             | algorithm_props | error_message |
+---------+--------+---------------------+--------+-------------------------------+-----------------------------+--------------------------+-------------------------+-------------------------+------------------+----------------------------+-----------------+---------------+
| t_order | true   |                     | false  | 100                           | 0                           |                          | 2024-02-19 16:07:40.926 | 2024-02-19 16:07:42.178 | 1                | SphereEx:BATCH_CRC32_MATCH |                 |               |
+---------+--------+---------------------+--------+-------------------------------+-----------------------------+--------------------------+-------------------------+-------------------------+------------------+----------------------------+-----------------+---------------+

12. 提交作业 #

COMMIT SYNCING jobId; 

参数说明

参数名说明
jobId作业id

示例

COMMIT SYNCING j5802p0000c1159b9ae7a81c76e196669420080657;

13. 撤销作业 #

ROLLBACK SYNCING jobId;
参数名说明
jobId作业id

示例

ROLLBACK SYNCING j5802p0000c1159b9ae7a81c76e196669420080657;

14. 更新数据同步进度 #

UPDATE SYNCING jobId OFFSET POSITION('storageUnitName'='position');
参数名说明
storageUnitName存储单元名称
position进度位点

示例

更新 MySQL 进度的格式是 file#pos,写法如下

UPDATE SYNCING j5802p0000c1159b9ae7a81c76e196669420080657 OFFSET POSITION('ds_0'='mysql-bin.000001#111','ds_1'='mysql-bin.000002#222');