弹性伸缩 #
| 语法 | 描述 | 类型 |
|---|---|---|
| ALTER RESHARDING RULE | 定义扩缩容规则 | RAL |
| RESHARD TABLE tableName by rule | 启动扩缩容 | RDL |
| SHOW RESHARDING RULE | 查看扩缩容规则 | RAL |
| SHOW RESHARDING LIST | 查看扩缩容作业列表 | RAL |
| SHOW RESHARDING STATUS jobId | 查看扩缩容作业状态 | RAL |
| START RESHARDING jobId | 启动已经停止的扩缩容作业 | RAL |
| STOP RESHARDING jobId | 停止扩缩容作业 | RAL |
| SHOW RESHARDING CHECK ALGORITHMS | 查看一致性校验算法 | RAL |
| CHECK RESHARDING jobId | 执行扩缩容数据一致性校验 | RAL |
| START RESHARDING CHECK jobId | 启动停止的扩缩容数据一致性校验作业 | RAL |
| STOP RESHARDING CHECK jobId | 停止的扩缩容数据一致性校验作业 | RAL |
| DROP RESHARDING CHECK jobId | 删除一致性校验作业 | RAL |
| COMMIT RESHARDING jobId | 提交扩缩容作业 | RAL |
| FORCE COMMIT RESHARDING jobId | 强制提交扩缩容缩作业 | RAL |
| ROLLBACK RESHARDING jobId | 撤销扩缩容作业 | RAL |
| APPLY RESHARDING jobId | 应用扩缩容新配置(切换元数据) | RAL |
| FORCE APPLY RESHARDING jobId | 强制应用扩缩容新配置(切换元数据) | RAL |
| STOP RESHARDING SOURCE WRITING jobId | 停写 | RAL |
| RESTORE RESHARDING SOURCE WRITING jobId | 恢复停写 | RAL |
| AUTO RESHARD TABLE tableName BY rule | 扩缩容自动模式 | RDL |
| RESHARD SINGLE TABLE tableName TO BROADCAST | 单表扩容成广播表 | RDL |
| AUTO RESHARD SINGLE TABLE tableName TO BROADCAST | 单表扩容成广播表自动模式 | RDL |
| UPDATE RESHARDING jobId OFFSET POSITION | 更新扩缩容增量进度 | RAL |
| PRECHECK RESHARD TABLE tableName BY rule | 重分布预检查 | RAL |
| PRECHECK RESHARD SINGLE TABLE tableName TO BROADCAST | 单表转广播表预检查 | RAL |
| PRECHECK REGISTER STORAGE UNIT storageUnitName rule | 注册存储节点时广播表预检查 | RAL |
| PRECHECK [PARTIAL] RESHARD TABLE tableName ADD STORAGE UNITS | 分片表部分重分布预检查, 增加存储节点 | RAL |
| PRECHECK [PARTIAL] RESHARD TABLE tableName DROP STORAGE UNITS | 分片表部分重分布预检查, 减少存储节点 | RAL |
| PRECHECK [PARTIAL] RESHARD TABLE tableName REPLACE STORAGE UNITS | 分片表,替换存储节点预检查 | RAL |
1. 定义扩缩容规则 #
ALTER RESHARDING RULE
因扩缩容 rule 具有默认值,无需创建,仅提供 ALTER 语句。
完整配置 DistSQL 示例
ALTER RESHARDING RULE (
READ(
WORKER_THREAD=20,
BATCH_SIZE=1000,
SHARDING_SIZE=10000000,
RATE_LIMITER (TYPE(NAME='QPS',PROPERTIES('qps'='500')))
),
WRITE(
WORKER_THREAD=20,
BATCH_SIZE=1000,
RATE_LIMITER (TYPE(NAME='TPS',PROPERTIES('tps'='2000')))
),
STREAM_CHANNEL (TYPE(NAME='MEMORY',PROPERTIES('block-queue-size'='2000')))
);
配置说明
ALTER RESHARDING RULE (
READ( -- 数据读取配置。如果不配置则部分参数默认生效。
WORKER_THREAD=20, -- 从源端摄取全量数据的线程池大小。如果不配置则使用默认值。
BATCH_SIZE=1000, -- 一次查询操作返回的最大记录数。如果不配置则使用默认值。
SHARDING_SIZE=10000000, -- 全量数据分片大小。如果不配置则使用默认值。
RATE_LIMITER ( -- 限流算法。如果不配置则不限流。
TYPE( -- 算法类型。可选项:QPS
NAME='QPS',
PROPERTIES( -- 算法属性
'qps'='500'
)))
),
WRITE( -- 数据写入配置。如果不配置则部分参数默认生效。
WORKER_THREAD=20, -- 数据写入到目标端的线程池大小。如果不配置则使用默认值。
BATCH_SIZE=1000, -- 一次批量写入操作的最大记录数。如果不配置则使用默认值。
RATE_LIMITER ( -- 限流算法。如果不配置则不限流。
TYPE( -- 算法类型。可选项:TPS
NAME='TPS',
PROPERTIES( -- 算法属性
'tps'='2000'
)))
),
STREAM_CHANNEL ( -- 数据通道,连接生产者和消费者,用于 read 和 write 环节。如果不配置则默认使用 MEMORY 类型。
TYPE( -- 算法类型。可选项:MEMORY
NAME='MEMORY',
PROPERTIES( -- 算法属性
'block-queue-size'='2000' -- 属性:阻塞队列大小
)))
);
2. 启动扩缩容 #
RESHARD TABLE tableName BY rule
参数说明
| 参数名 | 说明 |
|---|---|
| tableName | 执行扩缩容的表 |
示例
RESHARD TABLE t_order BY(
STORAGE_UNITS(ds_0, ds_1, ds_2),
SHARDING_COLUMN=order_id,
TYPE(NAME="hash_mod",PROPERTIES("sharding-count"="6")),
KEY_GENERATE_STRATEGY(COLUMN=order_id,TYPE(NAME="snowflake"))
);
3. 查看扩缩容规则 #
SHOW RESHARDING LIST;
参数说明
无
示例
SHOW RESHARDING LIST;
+---------------------------------------+---------+----------------------+--------+---------------------+-----------+
| id | tables | sharding_total_count | active | create_time | stop_time |
+---------------------------------------+---------+----------------------+--------+---------------------+-----------+
| j51017f973ac82cb1edea4f5238a258c25e89 | t_order | 2 | true | 2022-10-25 10:10:58 | NULL |
+---------------------------------------+---------+----------------------+--------+---------------------+-----------+
4. 查看扩缩容作业列表 #
SHOW RESHARDING LIST;
参数说明
无
示例
SHOW RESHARDING LIST;
+---------------------------------------+---------+----------------------+--------+---------------------+-----------+
| id | tables | sharding_total_count | active | create_time | stop_time |
+---------------------------------------+---------+----------------------+--------+---------------------+-----------+
| j51017f973ac82cb1edea4f5238a258c25e89 | t_order | 2 | true | 2022-10-25 10:10:58 | NULL |
+---------------------------------------+---------+----------------------+--------+---------------------+-----------+
5. 查看扩缩容作业状态 #
SHOW RESHARDING STATUS jobId;
参数说明
| 参数名 | 说明 |
|---|---|
| jobid | 作业id |
示例
SHOW RESHARDING STATUS 'j51017f973ac82cb1edea4f5238a258c25e89';
+------+-------------+--------------------------+--------+-------------------------+-------------------------------+-------------------+--------------------------+---------------+
| item | data_source | status | active | processed_records_count | inventory_finished_percentage | remaining_seconds | incremental_idle_seconds | error_message |
+------+-------------+--------------------------+--------+-------------------------+-------------------------------+-------------------+--------------------------+---------------+
| 0 | ds_0 | EXECUTE_INCREMENTAL_TASK | true | 3 | 100 | 0 | 92 | |
| 1 | ds_1 | EXECUTE_INCREMENTAL_TASK | true | 3 | 100 | 0 | 92 | |
+------+-------------+--------------------------+--------+-------------------------+-------------------------------+-------------------+--------------------------+---------------+
6. 启动已经停止的扩缩容作业 #
START RESHARDING jobId
参数说明
| 参数名 | 说明 |
|---|---|
| jobid | 作业id |
示例
START RESHARDING 'j51017f973ac82cb1edea4f5238a258c25e89';
7. 停止扩缩容作业 #
STOP RESHARDING jobId
参数说明
| 参数名 | 说明 |
|---|---|
| jobid | 作业id |
示例
STOP RESHARDING 'j51017f973ac82cb1edea4f5238a258c25e89';
8. 查看数据一致性校验算法 #
一致性校验算法选择请参考 数据一致性校验算法。
SHOW RESHARDING CHECK ALGORITHMS
示例
SHOW RESHARDING CHECK ALGORITHMS;
+----------------------------+---------------------------------------------+-----------------------------------------------------------------------------------------------+----------------------------------------------+
| type | type_aliases | supported_database_types | description |
+----------------------------+---------------------------------------------+-----------------------------------------------------------------------------------------------+----------------------------------------------+
| CRC32_MATCH | | MySQL,MariaDB,H2,Aurora | Match CRC32 of records. |
| DATA_MATCH | | SQL92,MySQL,PostgreSQL,openGauss,Oracle,SQLServer,MariaDB,H2,Hive,Presto,Aurora,DM,KingbaseES | Match raw data of records. |
| SphereEx:BATCH_CRC32_MATCH | SphereEx:DATA_CONSISTENCY_CHECK_CRC32_MATCH | MySQL,MariaDB,H2,Aurora | Match CRC32 of records (Commercial edition). |
| SphereEx:DATA_DIFF | | SQL92,MySQL,PostgreSQL,openGauss,Oracle,SQLServer,MariaDB,H2,Hive,Presto,Aurora,DM,KingbaseES | Diff raw data of records. |
+----------------------------+---------------------------------------------+-----------------------------------------------------------------------------------------------+----------------------------------------------+
输出说明
- type : 算法名
- type_aliases : 算法别名
- supported_database_types : 算法支持哪些数据库
- description : 算法说明
9. 执行扩缩容数据一致性 #
CHECK RESHARDING jobId BY TYPE (NAME='algorithmType');
| 参数名 | 说明 |
|---|---|
| jobId | 作业id |
| algorithmType | 一致性校验算法类型 |
示例
指定 jobId 指定校验算法执行数据一致性校验
CHECK RESHARDING 'j01016e501b498ed1bdb2c373a2e85e2529a6' BY TYPE (NAME='CRC32_MATCH');
CHECK RESHARDING 'j01016e501b498ed1bdb2c373a2e85e2529a6' BY TYPE (NAME='DATA_MATCH', PROPERTIES('chunk-size'='1000'));
CHECK RESHARDING 'j01016e501b498ed1bdb2c373a2e85e2529a6' BY TYPE (NAME='SphereEx:BATCH_CRC32_MATCH');
CHECK RESHARDING 'j01016e501b498ed1bdb2c373a2e85e2529a6' BY TYPE (NAME='SphereEx:DATA_DIFF', PROPERTIES('diff-storage-type'='DATABASE','chunk-size'='1000','diff-storage-unit-name'='ds_2','diff-table-name'='spex_cnschk_1','incremental-idle-seconds-threshold'='30'));
10. 启动停止的扩缩容数据一致性检测作业 #
START RESHARDING CHECK jobId
参数说明
| 参数名 | 说明 |
|---|---|
| jobid | 作业id |
示例
START RESHARDING CHECK 'j51017f973ac82cb1edea4f5238a258c25e89';
11. 停止扩缩容数据一致性检测作业 #
STOP RESHARDING CHECK jobId
参数说明
| 参数名 | 说明 |
|---|---|
| jobid | 作业id |
示例
STOP RESHARDING CHECK 'j51017f973ac82cb1edea4f5238a258c25e89';
12. 删除扩缩容数据一致性检测作业 #
DROP RESHARDING CHECK jobId
参数说明
| 参数名 | 说明 |
|---|---|
| jobid | 作业id |
示例
DROP RESHARDING CHECK 'j51017f973ac82cb1edea4f5238a258c25e89';
13. 提交扩缩容作业 #
COMMIT RESHARDING jobId
参数说明
| 参数名 | 说明 |
|---|---|
| jobId | 作业id |
示例
COMMIT RESHARDING 'j51017f973ac82cb1edea4f5238a258c25e89';
14. 强制提交扩缩容作业 #
FORCE COMMIT RESHARDING jobId
参数说明
| 参数名 | 说明 |
|---|---|
| jobid | 作业id |
示例
FORCE COMMIT RESHARDING 'j51017f973ac82cb1edea4f5238a258c25e89';
15. 撤销扩缩容作业 #
ROLLBACK RESHARDING jobId
参数说明
| 参数名 | 说明 |
|---|---|
| jobid | 作业id |
示例
ROLLBACK RESHARDING 'j51017f973ac82cb1edea4f5238a258c25e89';
16. 应用扩缩容新配置(切换元数据) #
APPLY RESHARDING jobId
参数说明
| 参数名 | 说明 |
|---|---|
| jobid | 作业id |
示例
APPLY RESHARDING 'j51017f973ac82cb1edea4f5238a258c25e89';
17. 强制应用扩缩容新配置(切换元数据) #
FORCE APPLY RESHARDING jobId
参数说明
| 参数名 | 说明 |
|---|---|
| jobid | 作业id |
示例
FORCE APPLY RESHARDING 'j51017f973ac82cb1edea4f5238a258c25e89';
18. 停写 #
STOP RESHARDING SOURCE WRITING jobId
参数说明
| 参数名 | 说明 |
|---|---|
| jobid | 作业id |
示例
STOP RESHARDING SOURCE WRITING 'j51017f973ac82cb1edea4f5238a258c25e89';
19. 恢复停写 #
RESTORE RESHARDING SOURCE WRITING jobId
参数说明
| 参数名 | 说明 |
|---|---|
| jobid | 作业id |
示例
RESTORE RESHARDING SOURCE WRITING 'j51017f973ac82cb1edea4f5238a258c25e89';
20. 扩缩容自动模式 #
AUTO RESHARD TABLE tableName BY rule
参数说明
| 参数名 | 说明 |
|---|---|
| tableName | 执行扩缩容的表 |
示例
AUTO RESHARD TABLE t_order BY(
STORAGE_UNITS(ds_0, ds_1, ds_2),
SHARDING_COLUMN=order_id,
TYPE(NAME="hash_mod",PROPERTIES("sharding-count"="6")),
KEY_GENERATE_STRATEGY(COLUMN=order_id,TYPE(NAME="snowflake"))
);
21. 单表扩容成广播表 #
RESHARD SINGLE TABLE tableName TO BROADCAST
参数说明
| 参数名 | 说明 |
|---|---|
| tableName | 执行扩缩容的单表 |
示例
RESHARD SINGLE TABLE t_address TO BROADCAST
22. 单表扩容成广播表自动模式 #
AUTO RESHARD SINGLE TABLE tableName TO BROADCAST
参数说明
| 参数名 | 说明 |
|---|---|
| tableName | 执行扩缩容的单表 |
示例
AUTO RESHARD SINGLE TABLE t_address TO BROADCAST
23. 更新扩缩容增量进度 #
UPDATE RESHARDING jobId OFFSET POSITION('storageUnitName'='position');
| 参数名 | 说明 |
|---|---|
| storageUnitName | 存储单元名称 |
| position | 增量进度位点 |
示例
更新 MySQL 进度的格式是 file#pos,写法如下
UPDATE RESHARDING j51017f973ac82cb1edea4f5238a258c25e89 OFFSET POSITION('ds_0'='mysql-bin.000001#111','ds_1'='mysql-bin.000002#222');
24. 重分布预检查 #
PRECHECK RESHARD TABLE tableName BY rule
| 参数名 | 说明 |
|---|---|
| tableName | 表名称 |
示例
PRECHECK RESHARD TABLE t_order BY (
STORAGE_UNITS(ds_1, ds_2),
SHARDING_COLUMN=order_id,
TYPE(NAME="hash_mod",PROPERTIES("sharding-count"="6")),
KEY_GENERATE_STRATEGY(COLUMN=order_id,TYPE(NAME="snowflake"))
);
mysql> PRECHECK RESHARD TABLE t_order BY (
-> DATANODES("ds_${0..1}.t_single_${0..1}"),
-> DATABASE_STRATEGY(TYPE="standard",SHARDING_COLUMN=id,SHARDING_ALGORITHM(TYPE(NAME="inline",PROPERTIES("algorithm-expression"="ds_${id % 2}")))),
-> TABLE_STRATEGY(TYPE="standard",SHARDING_COLUMN=id,SHARDING_ALGORITHM(TYPE(NAME="inline",PROPERTIES("algorithm-expression"="t_single_${id % 2}")))),
-> KEY_GENERATE_STRATEGY(COLUMN=order_id,TYPE(NAME="snowflake"))
-> );
+-----------------------------+--------+--------------------------------------------------------------------------------------------------------------------------+
| item | status | reason |
+-----------------------------+--------+--------------------------------------------------------------------------------------------------------------------------+
| Connection | true | |
| Binlog | true | |
| Replication | true | |
| Table name length | true | |
| Primary key or unique key | false | No primary key and unique key: ds_0.t_order |
| Table structure | true | |
| Privileges | true | |
| TimeZone | true | |
| Character set and collation | false | Sample storage unit: ds_1, sample character set: utf8mb3, sample collation: utf8mb3_unicode_ci, diff storage units: ds_0 |
+-----------------------------+--------+--------------------------------------------------------------------------------------------------------------------------+
9 rows in set (0.08 sec)
25. 单表转广播表预检查 #
PRECHECK RESHARD SINGLE TABLE tableName TO BROADCAST
| 参数名 | 说明 |
|---|---|
| tableName | 表名称 |
示例
PRECHECK RESHARD SINGLE TABLE t_single TO BROADCAST;
mysql> PRECHECK RESHARD SINGLE TABLE t_single TO BROADCAST;
+-----------------------------+--------+--------------------------------------------------------------------------------------------------------------------------+
| item | status | reason |
+-----------------------------+--------+--------------------------------------------------------------------------------------------------------------------------+
| Connection | true | |
| Binlog | false | Not enabled binlog: ds_1 |
| Replication | true | |
| Table name length | true | |
| Primary key or unique key | false | No primary key and unique key: ds_1.t_single |
| Table structure | true | |
| Privileges | true | |
| TimeZone | true | |
| Character set and collation | false | Sample storage unit: ds_1, sample character set: utf8mb3, sample collation: utf8mb3_unicode_ci, diff storage units: ds_0 |
+-----------------------------+--------+--------------------------------------------------------------------------------------------------------------------------+
9 rows in set (0.09 sec)
26. 注册存储节点时广播表预检查 #
PRECHECK REGISTER STORAGE UNIT storageUnitName rule
| 参数名 | 说明 |
|---|---|
| storageUnitName | 存储节点名称 |
示例
PRECHECK REGISTER STORAGE UNIT ds_3 (
URL='jdbc:mysql://127.0.0.1:3306/db3?useSSL=false',
USER='root',
PASSWORD='root',
PROPERTIES('minPoolSize'='1', 'connectionTimeoutMilliseconds'='30000', 'maxLifetimeMilliseconds'='1800000', 'idleTimeoutMilliseconds'='60000', 'maxPoolSize'='50')
);
mysql> PRECHECK REGISTER STORAGE UNIT ds_3 (
-> URL = "jdbc:mysql://127.0.0.1:3306/demo_ds?serverTimezone=UTC&useSSL=false&allowPublicKeyRetrieval=true",
-> USER = "root",
-> PASSWORD = "123456",
-> PROPERTIES("maximumPoolSize" = 10, "minimumIdle"=5, "idleTimeout" = "30000")
-> )\G
*************************** 1. row ***************************
name: ds_3
result: false
connection: success
privilege: success
broadcast: fail
message: Broadcast table `t_address` check failed: [Binlog] Not enabled binlog: ds_1;[Primary key or unique key] No primary key and unique key: ds_1.t_address, ds_0.t_address;[Character set and collation] Sample storage unit: ds_3, sample character set: utf8mb4, sample collation: utf8mb4_unicode_ci, diff storage units: ds_1
1 row in set (0.07 sec)
27. 分片表部分重分布预检查, 增加存储节点 #
PRECHECK [PARTIAL] RESHARD TABLE tableName ADD STORAGE UNITS
| 参数名 | 说明 |
|---|---|
| tableName | 表名称 |
示例
PRECHECK PARTIAL RESHARD TABLE t_order ADD STORAGE UNITS (ds_2,ds_3);
28. 分片表部分重分布预检查, 减少存储节点 #
PRECHECK [PARTIAL] RESHARD TABLE tableName DROP STORAGE UNITS
| 参数名 | 说明 |
|---|---|
| tableName | 表名称 |
示例
PRECHECK PARTIAL RESHARD TABLE t_order DROP STORAGE UNITS (ds_2, ds_3);
28. 分片表替换存储节点预检查 #
PRECHECK [PARTIAL] RESHARD TABLE tableName REPLACE STORAGE UNITS
| 参数名 | 说明 |
|---|---|
| tableName | 表名称 |
示例
PRECHECK PARTIAL RESHARD TABLE t_order REPLACE STORAGE UNITS (ds_1 TO ds_11, ds_2 TO ds_12);