Logo
数据同步

数据同步 #

数据同步功能旨在帮助用户实现两个数据源之间的数据实时同步。数据同步功能可应用于异地多活、数据灾备、数据仓库等多种业务场景。

概述 #

SphereEx-DBPlusEngine 提供的数据同步方案可以将 Proxy 的数据同步到下游,下游可以是一个同一个 Proxy 集群,也可以是另外的集群,也可以同步到其他的物理库中。

同步的过程中支持 DDL 语句的同步,也支持源端目标端的数据比对。

目前 SphereEx-DBPlusEngine 数据同步插件所支持的数据库产品为 MySQL,具体版本、环境要求和权限要求如下。

数据库
版本支持
环境要求
权限要求
1MySQL5.1.15 ~ 8.xmy.cnf 配置
[mysqld]
server-id=1
log-bin=mysql-bin
binlog-format=row
binlog-row-image=full
–确认命令如下
show variables like ‘%log_bin%’;
show variables like ‘%binlog%’;
GRANT REPLICATION SLAVE, REPLICATION CLIENT ON . TO ${username}@${host}
–确认命令如下
SHOW GRANTS FOR ‘user’;

在作业执行过程中,SphereEx-DBPlusEngine 能够根据集群资源情况及进行合理地调度,尽可能提高整体资源利用率,避免资源争夺,从而保作业执行效率。同时也支持任务的自动故障转移,给长时间运行的任务提供必要保障。

适用场景 #

源端 -> 目标端增量同步数据校验
1MySQL -> MySQL支持支持

使用前提 #

  • 已完成 SphereEx-DBPlusEngine 和数据库环境部署,网络通畅且服务正常。
  • 作业流程中需要依赖 Zookeeper,需提前完成部署。
  • 在数据同步阶段,请务必确保延迟时间段内的数据库日志不能被清除。

使用限制 #

支持项 #

  • 支持启动同步的时候指定 binlog 位点。
  • 支持源端和目标端表名不一致。
  • 支持排除指定的字段同步。
  • 支持断点续传。

不支持项 #

  • 不支持同步过程中源端表结构变更。
  • 同步过程中不允许动态修改同步列表。
  • 源端表结构变更不会自动同步。

注意事项 #

因为源端表结构可以和目标端不一样,所以数据同步需要提前创建目标端的表。

原理介绍 #

数据同步当前只支持增量同步,不同的数据库使用的技术细节不同,但总体上均为基于复制协议或 WAL 日志实现的变更数据捕获功能。

MySQL 通过订阅并解析 binlog,binlog 中记录了数据库的变更信息,包括 DDL 和 DML 语句。

使用指南 #

  1. 优化数据同步配置(可选)

SphereEx-DBPlusEngine 提供了数据同步的优化配置,可对数据抽取及数据写入的线程进行控制。在资源充沛的情况下,可最大限度使用资源,高效完成作业。同时,为了尽量减少对业务系统的影响,也可对流量进行限制。

具体可参考Syncing 作业定义

  1. 创建数据同步作业

通过 DistSQL 创建数据同步作业,可对数据同步作业进行查询、中断、重启和删除。

  1. 数据一致性校验(可选)

为了确保数据的一致性,可通过数据一致性校验功能进行确认,该过程通常在窗口时间进行。同一时间只允许同时运行一个数据一致性校验作业。用户可以对一致性校验进行查询、中断、重启和删除。

操作指南 #

数据同步的关键操作步骤如下所示。

  1. 初始化源端表的配置,比如配置分片规则,初始化数据等

  2. 目标端建表

  3. 优化数据同步任务配置(可选)

  4. 启动数据同步

  5. 源端进行数据写入

  6. 确认同步状态

  7. 执行数据一致性校验(可选)

完整流程示例 #

本示例为通过 SphereEx-DBPlusEngine 将 source 逻辑库下的数据同步到另一个 target 逻辑库下,源端和目标端均为分片表,且源端和目标端的表名不同。

具体操作命令请参考数据同步 DistSQL

前提条件 #

  1. 在 MySQL 中创建好数据库。

示例:

DROP DATABASE IF EXISTS source_ds_0;
CREATE DATABASE source_ds_0 DEFAULT CHARSET utf8;
    
DROP DATABASE IF EXISTS source_ds_1;
CREATE DATABASE source_ds_1 DEFAULT CHARSET utf8;
    
DROP DATABASE IF EXISTS target_ds;
CREATE DATABASE target_ds DEFAULT CHARSET utf8;
  1. 本地启动 SphereEx-DBPlusEngine,这里使用默认端口 3307。

操作步骤 #

  1. 准备源端环境

在 proxy 中创建源端数据库,注册数据源 source_ds_0,source_ds_1,创建分片规则,然后建表。

CREATE DATABASE source;

USE source

REGISTER STORAGE UNIT source_ds_0 (
    URL="jdbc:mysql://127.0.0.1:3306/source_ds_0?useServerPrepStmts=true&useSSL=false&characterEncoding=utf-8",
    USER="root",
    PASSWORD="root"
), source_ds_1 (
    URL="jdbc:mysql://127.0.0.1:3306/source_ds_1?useServerPrepStmts=true&useSSL=false&characterEncoding=utf-8",
    USER="root",
    PASSWORD="root"
);
    
CREATE SHARDING TABLE RULE t_order(
STORAGE_UNITS(source_ds_0,source_ds_1),
SHARDING_COLUMN=order_id,
TYPE(NAME="hash_mod",PROPERTIES("sharding-count"="4")),
KEY_GENERATE_STRATEGY(COLUMN=order_id,TYPE(NAME="snowflake"))
);
       
CREATE TABLE t_order (order_id INT NOT NULL, user_id INT NOT NULL, status VARCHAR(45) NULL, PRIMARY KEY (order_id));
  1. 准备目标端环境

在 proxy 中创建目标端数据库,注册数据源 target_ds,创建分片规则,然后建表。

CREATE DATABASE target;

USE target

REGISTER STORAGE UNIT target_ds (
    URL="jdbc:mysql://127.0.0.1:3306/target_ds?useServerPrepStmts=true&useSSL=false&characterEncoding=utf-8",
    USER="root",
    PASSWORD="root"
);
    
CREATE SHARDING TABLE RULE t_order_sink(
STORAGE_UNITS(target_ds),
SHARDING_COLUMN=order_id,
TYPE(NAME="hash_mod",PROPERTIES("sharding-count"="4")),
KEY_GENERATE_STRATEGY(COLUMN=order_id,TYPE(NAME="snowflake"))
);
       
CREATE TABLE t_order_sink (order_id INT NOT NULL, user_id INT NOT NULL, status VARCHAR(45) NULL, PRIMARY KEY (order_id));
  1. 启动数据同步

启动数据同步之前,可结合资源情况对同步的规则进行调整。

在 proxy 源端逻辑库下执行同步的 DistSQL,示例:

USE source;

SYNC DATA t_order TO SINK (TYPE(NAME='JDBC', PROPERTIES('url'='jdbc:mysql://localhost:3307/target?useSSL=false',
     'username'='root', 'password'='root', 
     'source-target-table-name-mapping'='t_order:t_order_sink')));

表名之间用逗号分割,如果要同步数据库下的所有表,可以用 * 代替表名。

PROPERTIES 配置项中的 url 填写 proxy 的 JDBC URL,source-target-table-name-mapping 是源端和目标端表名的映射关系。

同步任务开始后,可通过 SHOW 命令查看任务列表。

mysql> show syncing list;
+--------------------------------------------+---------+----------------+--------+---------------------+-----------+
| id                                         | tables  | job_item_count | active | create_time         | stop_time |
+--------------------------------------------+---------+----------------+--------+---------------------+-----------+
| j5802p0000e7520a2801da1f1b5a50cc359cc4dd4a | t_order | 2              | true   | 2024-02-20 10:10:23 |           |
+--------------------------------------------+---------+----------------+--------+---------------------+-----------+
1 row in set (0.02 sec)

根据列表中任务 id 的值,进一步查看该任务的状态。

mysql> show syncing status j5802p0000e7520a2801da1f1b5a50cc359cc4dd4a;
+------+-------------+---------+--------------------------+--------+-------------------------+------------------------------------+---------------+--------------------------+---------------+
| item | data_source | tables  | status                   | active | processed_records_count | incremental_local_last_commit_time | delay_seconds | incremental_idle_seconds | error_message |
+------+-------------+---------+--------------------------+--------+-------------------------+------------------------------------+---------------+--------------------------+---------------+
| 0    | source_ds_0 | t_order | EXECUTE_INCREMENTAL_TASK | true   | 0                       |                                    |               |                          |               |
| 1    | source_ds_1 | t_order | EXECUTE_INCREMENTAL_TASK | true   | 0                       |                                    |               |                          |               |
+------+-------------+---------+--------------------------+--------+-------------------------+------------------------------------+---------------+--------------------------+---------------+
2 rows in set (0.05 sec)
  1. 源端进行数据写入

随机进行增删改的操作,示例:

USE source

INSERT INTO t_order (order_id, user_id, status) VALUES (1, 1, 'active'), (2, 2, 'active2'), (3, 3, 'active3'), (4, 4, 'active4');
UPDATE t_order SET status = 'inactive' WHERE order_id = 1;
INSERT INTO t_order (order_id, user_id, status) VALUES (5, 5, 'active5');
DELETE FROM t_order WHERE order_id = 2;

然后再查看同步任务的状态。

mysql> SHOW SYNCING STATUS j5802p0000e7520a2801da1f1b5a50cc359cc4dd4a;
+------+-------------+---------+--------------------------+--------+-------------------------+------------------------------------+---------------+--------------------------+---------------+
| item | data_source | tables  | status                   | active | processed_records_count | incremental_local_last_commit_time | delay_seconds | incremental_idle_seconds | error_message |
+------+-------------+---------+--------------------------+--------+-------------------------+------------------------------------+---------------+--------------------------+---------------+
| 0    | source_ds_0 | t_order | EXECUTE_INCREMENTAL_TASK | true   | 2                       | 2024-02-20 10:15:01.123            | 0             | 7                        |               |
| 1    | source_ds_1 | t_order | EXECUTE_INCREMENTAL_TASK | true   | 3                       | 2024-02-20 10:15:01.156            | 0             | 7                        |               |
+------+-------------+---------+--------------------------+--------+-------------------------+------------------------------------+---------------+--------------------------+---------------+
2 rows in set (0.08 sec)
  1. 数据一致性校验(可选)

SphereEx-DBPlusEngine 可对源端和目标端的数据一致性进行校验,也包括两端记录数比对。

mysql> CHECK SYNCING j5802p0000e7520a2801da1f1b5a50cc359cc4dd4a BY TYPE (NAME='CRC32_MATCH');
Query OK, 0 rows affected (0.08 sec)

算法描述可参考数据一致性校验算法

通过 DistSQL 查看校验结果。

mysql> SHOW SYNCING CHECK STATUS j5802p0000e7520a2801da1f1b5a50cc359cc4dd4a;
+---------+--------+---------------------+----------+--------+-------------------------------+-----------------------------+--------------------------+-------------------------+-------------------------+------------------+----------------+-----------------+---------------+
| tables  | result | check_failed_tables | status   | active | inventory_finished_percentage | inventory_remaining_seconds | incremental_idle_seconds | check_begin_time        | check_end_time          | duration_seconds | algorithm_type | algorithm_props | error_message |
+---------+--------+---------------------+----------+--------+-------------------------------+-----------------------------+--------------------------+-------------------------+-------------------------+------------------+----------------+-----------------+---------------+
| t_order | true   |                     | FINISHED | false  | 100                           | 0                           |                          | 2024-02-20 10:16:46.861 | 2024-02-20 10:16:51.218 | 4                | CRC32_MATCH    |                 |               |
+---------+--------+---------------------+----------+--------+-------------------------------+-----------------------------+--------------------------+-------------------------+-------------------------+------------------+----------------+-----------------+---------------+
1 row in set (0.05 sec)
  1. 为进一步验证数据同步的情况,可在 proxy 中查询两者的数据
  • source
mysql> USE source;

mysql> select * from t_order order by order_id;
+----------+---------+----------+
| order_id | user_id | status   |
+----------+---------+----------+
|        1 |       1 | inactive |
|        3 |       3 | active3  |
|        4 |       4 | active4  |
|        5 |       5 | active5  |
+----------+---------+----------+
4 rows in set (0.20 sec)
  • target
mysql> USE target;

mysql> select * from t_order_sink order by order_id;
+----------+---------+----------+
| order_id | user_id | status   |
+----------+---------+----------+
|        1 |       1 | inactive |
|        3 |       3 | active3  |
|        4 |       4 | active4  |
|        5 |       5 | active5  |
+----------+---------+----------+
4 rows in set (0.03 sec)