
Data Sharding #

The traditional approach of storing all data centrally on a single node struggles to meet the demands of massive data in terms of performance, availability, and operation and maintenance cost.

In terms of performance, relational databases mostly use B+ tree indexes. When the data volume exceeds a threshold, a deeper index increases the number of disk I/O accesses and degrades query performance. At the same time, highly concurrent requests make the centralized database the biggest bottleneck of the system.

In terms of availability, stateless services can be scaled out at low cost, which inevitably pushes the ultimate load of the system onto the database. A single data node, or a simple primary-secondary architecture, finds it increasingly difficult to bear this load, so database availability becomes the key to the availability of the whole system.

In terms of operation and maintenance cost, when the data in a database instance exceeds a threshold, the DevOps pressure on the DBA increases. The time cost of data backup and recovery grows with the size of the data and becomes harder and harder to control.

As traditional relational databases fail to meet the needs of Internet-scale scenarios, more and more attempts have been made to store data in natively distributed NoSQL stores. However, NoSQL's incompatibility with SQL and its immature ecosystem have prevented it from displacing relational databases, which remain dominant.

Overview #

The SphereEx-DBPlusEngine sharding plugin minimizes the impact of table sharding, allowing users to work with a horizontally sharded database cluster as if it were a single database. It currently provides sharding solutions for a variety of databases.

Basic Concept #

  • Table

    Tables are a key concept for transparent data sharding, and SphereEx-DBPlusEngine adapts to the data sharding needs of different scenarios by providing a variety of table types.

  • Logic Table

    The logical name shared by a group of horizontally split databases (tables) with the same structure; it is the logical identifier of the table in SQL. Example: order data is split into 10 tables according to the last digit of the primary key, t_order_0 to t_order_9, and their logical table name is t_order.

  • Table Relation

    Refers to a group of sharded tables with consistent sharding rules. When performing a multi-table association query across bound tables, the association must use the sharding key; otherwise, Cartesian-product or cross-database associations will occur and degrade query performance. For example, if the t_order table and t_order_item table are both sharded by order_id and joined on order_id, the two tables are bound to each other. Multi-table association queries between bound tables do not produce a Cartesian product, so association queries are much more efficient (see the routing sketch after this list).

  • Broadcast table

    Refers to a table that exists in all sharded data sources and whose structure and data are identical in every database. It is suitable for scenarios where the data volume is small and the table needs to be joined with tables holding massive data, e.g., dictionary tables.

  • Single Table

    Refers to a table that exists only once across all sharded data sources. Suitable for tables with a small amount of data that do not need to be sharded.

  • Distributed Transactions

    • XA
    • BASE
  • Distributed Primary Keys

    • SNOWFLAKE
    • UUID
    • NanoID
  • SQL Parse

    It is divided into lexical parsing and syntactic parsing. The lexical parser first breaks SQL into non-divisible words. The syntax parser is then used to understand SQL and finally refine the parsing context. The parsing context includes tables, selection items, sorting items, grouping items, aggregation functions, paging information, query conditions, and markers for placeholders that may need to be modified.

  • SQL Route

    Matches the sharding strategy configured by the user against the parsing context and generates routing paths. Currently, sharding routing and broadcast routing are supported.

  • SQL Rewrite

    Rewrites SQL to statements that can be executed correctly in the real database. SQL rewriting is divided into correctness rewriting and optimization rewriting.

  • SQL Execution

    Asynchronous execution via multi-threaded executor.

  • Results merge

    Combine multiple execution result sets for export through a unified JDBC interface. Result merging includes stream merging, in-memory merging, and append merging using the decorator pattern.

  • Query Optimization

    Supported by the Federation execution engine, it optimizes complex queries such as correlation queries and subqueries. It also supports distributed queries across multiple database instances, using relational algebra internally to optimize the query plan and query the results through the optimal plan.
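
To make the table relation (binding table) concept above concrete, here is a hedged routing sketch, assuming t_order and t_order_item are each split into two real tables by order_id (the actual routed SQL depends on the configured data nodes):

-- Logical SQL
SELECT i.* FROM t_order o JOIN t_order_item i ON o.order_id = i.order_id WHERE o.order_id IN (10, 11);

-- Without a table relation (binding) rule: Cartesian routing, four real statements
SELECT i.* FROM t_order_0 o JOIN t_order_item_0 i ON o.order_id = i.order_id WHERE o.order_id IN (10, 11);
SELECT i.* FROM t_order_0 o JOIN t_order_item_1 i ON o.order_id = i.order_id WHERE o.order_id IN (10, 11);
SELECT i.* FROM t_order_1 o JOIN t_order_item_0 i ON o.order_id = i.order_id WHERE o.order_id IN (10, 11);
SELECT i.* FROM t_order_1 o JOIN t_order_item_1 i ON o.order_id = i.order_id WHERE o.order_id IN (10, 11);

-- With the binding rule: only shards with matching suffixes are joined, two real statements
SELECT i.* FROM t_order_0 o JOIN t_order_item_0 i ON o.order_id = i.order_id WHERE o.order_id IN (10, 11);
SELECT i.* FROM t_order_1 o JOIN t_order_item_1 i ON o.order_id = i.order_id WHERE o.order_id IN (10, 11);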

Applicable Scenarios #

Highly concurrent OLTP scenarios with massive amounts of data #

Since most relational databases use B+ tree indexes, when the data volume exceeds a threshold the increased index depth also increases the number of disk I/O accesses, which degrades query performance. With SphereEx-DBPlusEngine data sharding, data stored in a single database can be spread across multiple databases or tables according to a business dimension, improving performance and meeting the requirements of highly concurrent OLTP scenarios.

Massive Data Real-time Analytics OLAP Scenarios #

In a traditional database architecture, users who want to perform data analysis first need to synchronize the data to a data platform with ETL tools and then analyze it there, which significantly reduces the timeliness of the analysis. SphereEx-DBPlusEngine provides a static entry point and heterogeneous language support, independent of application deployment, for OLAP scenarios that require real-time analytics.

Prerequisites for Use #

  • SphereEx-DBPlusEngine and database cluster are installed on the server and the service is running properly.
  • See “Restrictions on Use” below for details.

Restrictions on Use #

Commonly used SQL routed to a single data node is fully compatible. SQL routed to multiple data nodes involves more complex scenarios and is divided into three cases: stable support, experimental support (not recommended for use), and not supported.

| Category | Description |
| --- | --- |
| Stable Support | Regular queries, subqueries, paging queries, operator expressions containing the shard key |
| Experimental Support | Subqueries where the subquery and the outer query do not both specify the shard key or the shard key values are inconsistent; cross-database association queries |
| Not Supported | CASE WHEN; some paging queries for Oracle and SQL Server |

Stable Support #

Full support for DML, DDL, DCL, TCL, and common DAL statements. Supports complex queries such as paging, de-duplication, sorting, grouping, aggregation, and table association. Supports SCHEMA DDL and DML statements for the PostgreSQL and openGauss databases. Refer to the compatibility list for details.

General Query

  • SELECT Main Statement
SELECT select_expr [, select_expr ...] FROM table_reference [, table_reference ...]
[WHERE predicates]
[GROUP BY {col_name | position} [ASC | DESC], ...]
[ORDER BY {col_name | position} [ASC | DESC], ...]
[LIMIT {[offset,] row_count | row_count OFFSET offset}]
  • select_expr
* | 
[DISTINCT] COLUMN_NAME [AS] [alias] | 
(MAX | MIN | SUM | AVG)(COLUMN_NAME | alias) [AS] [alias] | 
COUNT(* | COLUMN_NAME | alias) [AS] [alias]
  • table_reference
tbl_name [[AS] alias] [index_hint_list]
| table_reference ([INNER] | {LEFT|RIGHT} [OUTER]) JOIN table_factor [JOIN ON conditional_expr | USING (column_list)]

Sub-query

Stable support is provided by the kernel when both the subquery and the outer query specify the shard key and the shard key values are consistent.

e.g.

SELECT * FROM (SELECT * FROM t_order WHERE order_id = 1) o WHERE o.order_id = 1;

Paging subqueries also receive stable support from the kernel.

e.g.

SELECT * FROM (SELECT row_.*, rownum rownum_ FROM (SELECT * FROM t_order) row_ WHERE rownum <= ?) WHERE rownum > ?;

Paging Query

Paging queries are fully supported for MySQL, PostgreSQL, and openGauss.

MySQL, PostgreSQL and openGauss all support LIMIT paging without subqueries.

SELECT * FROM t_order o ORDER BY id LIMIT ? OFFSET ?

Operator expressions containing the shard key

When the shard key is contained in an operator expression (for example, inside a function call), the value used for sharding cannot be extracted from the SQL literals, and full routing will result. For example, assuming create_time is the shard key:

SELECT * FROM t_order WHERE to_date(create_time, 'yyyy-mm-dd') = '2019-01-01';
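
Where the sharding algorithm allows it, such a query can often be rewritten so that the shard key is compared directly against literals and the value can be extracted for routing; a hedged sketch, assuming a range-capable sharding algorithm on create_time:

-- Full routing: the shard value is hidden inside a function over the shard key
SELECT * FROM t_order WHERE to_date(create_time, 'yyyy-mm-dd') = '2019-01-01';

-- Routable rewrite: the shard key appears as a plain comparison, so the value can be extracted
SELECT * FROM t_order WHERE create_time BETWEEN '2019-01-01 00:00:00' AND '2019-01-01 23:59:59';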

Experimental Support #

Experimental support refers specifically to support provided by the Federation execution engine. The engine is under rapid development and is largely usable, but as an experimental product it still needs extensive optimization.

Sub-query

The Federation execution engine provides support when the subquery and the outer query do not both specify the shard key, or when the shard key values are inconsistent.

e.g.

SELECT * FROM (SELECT * FROM t_order) o;

SELECT * FROM (SELECT * FROM t_order) o WHERE o.order_id = 1;

SELECT * FROM (SELECT * FROM t_order WHERE order_id = 1) o;

SELECT * FROM (SELECT * FROM t_order WHERE order_id = 1) o WHERE o.order_id = 2;

Cross-database Association Query

The Federation execution engine provides support when multiple tables in an association query are distributed across different database instances. Assuming that t_order and t_order_item are sharded tables with multiple data nodes and no binding table rules configured, and t_user and t_user_role are single tables distributed across different database instances, the Federation execution engine can support the following common association queries:

SELECT * FROM t_order o INNER JOIN t_order_item i ON o.order_id = i.order_id WHERE o.order_id = 1;

SELECT * FROM t_order o INNER JOIN t_user u ON o.user_id = u.user_id WHERE o.user_id = 1;

SELECT * FROM t_order o LEFT JOIN t_user_role r ON o.user_id = r.user_id WHERE o.user_id = 1;

SELECT * FROM t_order_item i LEFT JOIN t_user u ON i.user_id = u.user_id WHERE i.user_id = 1;

SELECT * FROM t_order_item i RIGHT JOIN t_user_role r ON i.user_id = r.user_id WHERE i.user_id = 1;

SELECT * FROM t_user u RIGHT JOIN t_user_role r ON u.user_id = r.user_id WHERE u.user_id = 1;

Not Supported #

Unsupported SQL:

  • CASE WHEN containing subqueries
  • Logical table names used in CASE WHEN (use table aliases instead)
  • INSERT INTO tbl_name (col1, col2, …) SELECT * FROM tbl_name WHERE col3 = ? (the SELECT clause supports neither * nor the built-in distributed primary key generator)
  • REPLACE INTO tbl_name (col1, col2, …) SELECT * FROM tbl_name WHERE col3 = ? (the SELECT clause supports neither * nor the built-in distributed primary key generator)
  • SELECT MAX(tbl_name.col1) FROM tbl_name (when the queried column appears inside a function expression, the table name cannot be used before the column; use a table alias instead, as shown below)
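
For the last restriction, a minimal illustration of the supported form, using a table alias instead of the logical table name in front of the aggregated column:

-- Not supported: logical table name qualifies the column inside the aggregate function
SELECT MAX(t_order.order_id) FROM t_order;

-- Supported: qualify the column with a table alias instead
SELECT MAX(o.order_id) FROM t_order o;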

CASE WHEN

The following CASE WHEN statements are not supported:

  • CASE WHEN contains subqueries
  • Use of logical table names in CASE WHEN (use table aliases instead)

Note #

  1. Sharded table join problem: small tables involved in multi-table associations can be configured as broadcast tables, which improves join efficiency (see the statement sketch after this list).
  2. Distributed primary key problem: in sharding scenarios, auto-increment fields cannot guarantee global uniqueness, so a key generator that produces globally unique IDs is needed, such as Snowflake, UUID, or NanoID.
  3. Distributed transaction problem: SphereEx-DBPlusEngine provides three distributed transaction modes, LOCAL, XA, and BASE; GTM is not implemented for the time being.
  4. The real tables, sharding columns, and distributed sequences configured in the sharding rules must have the same case as the corresponding columns in the database.
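
For note 1, a small dimension table can be declared as a broadcast table so that joins against it stay local to each shard. A minimal sketch, assuming the Apache ShardingSphere-style DistSQL statement is available in your version and t_dict is a hypothetical dictionary table:

CREATE BROADCAST TABLE RULE t_dict;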

Principle Introduction #

The SphereEx-DBPlusEngine kernel processing flow includes the standard SQL Parser and SQL Binder modules, which identify specific SQL features; based on their results, SQL execution is dispatched to either the Simple Push Down Engine or the SQL Federation Engine.

For a detailed description of the principles, please refer to Kernel Principles - Data Fragmentation.

User Guide #

  1. Sharding Design: Sharding Key

    Before you can start sharding, you first need to determine the sharding key. Many examples use auto-increment IDs or timestamp fields for sharding, but these are not absolute; a better approach is to choose the key in the context of the business. Consider the following strategies for selection.

    • Statistically analyze the SQL executed in the system and select the most frequently used or most important field of the table to be sharded as the sharding key. OLAP-style queries can be excluded from this analysis.

    • If the business is very complex and the data can be divided along multiple dimensions, consider splitting the table by several appropriate dimension fields. If necessary, split by business logic rather than by simple field division.

    • If multi-dimensional splitting is involved, some data redundancy needs to be maintained. Generally, the primary dimension can be written by the program, while the secondary dimension is written asynchronously to ensure that both are ultimately consistent.

    • If association queries are involved, they can be supported by the sharding strategy; for example, consider ER (binding) tables, broadcast tables, and similar companion strategies. If the queries are very complex, offloading them to an OLAP system can also be considered; for the data synchronization strategy, a combination of push and pull can be used.

    • The field finally chosen for splitting should be stable and unchanging, to avoid cross-shard data movement.

  2. Sharding Design: Sharding Strategy

For the splitting algorithm, the common choices are LIST, RANGE, HASH, etc., and the choice can be made according to the characteristics of each algorithm: if the data is distributed uniformly, HASH can be used, while RANGE suits hot and cold data. These can also be combined with specialized designs, such as a secondary mapping to solve horizontal scaling, or feature-encoded fields to support splitting by multiple features.

|  | Hash | List | Range |
| --- | --- | --- | --- |
| Advantages | Even data distribution and access | Shard membership can be identified manually | Shard membership can be identified manually; range-query friendly; relatively friendly to shrinking/expansion |
| Disadvantages | Disorderly; range queries must cross shards; distribution cannot be identified manually; unfriendly to shrinking/expansion | Tends to be unevenly distributed | Uneven data distribution/data access |

The most common ones are HASH and RANGE, which are described below.

  • HASH

Although there are many table sharding schemes, hash sharding is the most popular and widely used. Random sharding is not truly random; it follows certain rules and is usually implemented with HASH modulo, so it is sometimes called discrete sharding. Data under random sharding is relatively uniform and less prone to hotspots and concurrent-access bottlenecks. However, it is less convenient when data migration is involved later, a problem that can be largely avoided by using a consistent HASH algorithm. In addition, discrete sharding is prone to complications with cross-shard queries.

  • RANGE

Splitting data by range is one of the simplest sharding schemes and can be used flexibly in combination with other schemes. When a range lookup uses the partitioning field, the RANGE strategy can quickly locate the data for efficient querying and, in most cases, avoids cross-shard lookups. Later expansion is also easier: nodes can simply be added without migrating data from other shards. However, this distribution method is prone to data hotspot problems.
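
A hedged sketch of how the two strategies above can be expressed with built-in algorithms (table and column names are illustrative; property names follow the built-in MOD and BOUNDARY_RANGE algorithms and may vary by version):

-- HASH-style rule: rows are spread over 4 shards by modulo of order_id
CREATE SHARDING TABLE RULE t_order_hash (
    STORAGE_UNITS(ds_0, ds_1),
    SHARDING_COLUMN=order_id,
    TYPE(NAME="MOD", PROPERTIES("sharding-count"="4"))
);

-- RANGE-style rule: shard boundaries at 1,000,000 / 2,000,000 / 3,000,000
CREATE SHARDING TABLE RULE t_order_range (
    STORAGE_UNITS(ds_0, ds_1),
    SHARDING_COLUMN=order_id,
    TYPE(NAME="BOUNDARY_RANGE", PROPERTIES("sharding-ranges"="1000000,2000000,3000000"))
);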

  3. Sharding Design: Shard Quantity Planning

Because business characteristics differ, there is no universal limit on the size of a single shard; it needs to be determined together with test results so that both efficiency and reserved-headroom requirements are met.

  4. Sharding Design: Distributed ID

In a split-database, split-table environment, data is distributed across different shards, and primary keys can no longer be generated directly by the database auto-increment feature; otherwise primary keys of the data tables on different shards would collide. Instead, use a global key generator to produce globally unique IDs, such as Snowflake, UUID, or NanoID (see the sketch below).
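
A hedged sketch of declaring the key generator together with the sharding rule in DistSQL, assuming the KEY_GENERATE_STRATEGY clause is available in your version (it mirrors the keyGenerators/snowflake YAML configuration shown later in this document):

CREATE SHARDING TABLE RULE t_order (
    STORAGE_UNITS(ds_0, ds_1),
    SHARDING_COLUMN=order_id,
    TYPE(NAME="hash_mod", PROPERTIES("sharding-count"="4")),
    KEY_GENERATE_STRATEGY(COLUMN=order_id, TYPE(NAME="snowflake"))
);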

  5. For online business, confirm whether there are unsupported statements and adapt them accordingly

After data is split, some SQL limitations are inevitable compared with a traditional centralized database. The business side needs to sort out the SQL features it uses, compare them with the usage restrictions described above, and adapt the statements accordingly.

  6. Migration and expansion for live business

For new projects, sharding can be used directly after configuring the sharding rules. For online business, to migrate data from a centralized database to a distributed cluster, refer to the data migration plug-in; for subsequent expansion, refer to the elastic scaling plug-in.

Sharding Algorithm #

The sharding algorithm splits the data and supports =, >=, <=, >, <, BETWEEN, and IN. A sharding algorithm can be implemented by developers, or you can use SphereEx-DBPlusEngine's built-in sharding algorithm syntactic sugar, which is very flexible.

Automated Sharding Algorithm #

Sharding algorithm syntactic sugar that conveniently hosts all data nodes, so users do not have to concern themselves with the physical distribution of real tables. It includes implementations of common sharding algorithms such as modulo, hash, range, and time.
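
For instance, a volume-based automatic rule only needs the overall value range and the volume of a single shard; the engine derives the number of real tables and manages their distribution itself. A hedged sketch (names and property keys follow the built-in VOLUME_RANGE algorithm and may vary by version):

CREATE SHARDING TABLE RULE t_order_volume (
    STORAGE_UNITS(ds_0, ds_1),
    SHARDING_COLUMN=order_id,
    TYPE(NAME="VOLUME_RANGE", PROPERTIES("range-lower"="0", "range-upper"="4000000", "sharding-volume"="1000000"))
);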

Custom Sharding Algorithm #

Provides an interface for application developers to implement their own sharding algorithms that are closely tied to the business, and allows users to manage the physical distribution of real tables themselves. Custom sharding algorithms are subdivided into:

  • The standard sharding algorithm is used for scenarios where a single column is the shard key and sharding is performed with =, IN, BETWEEN AND, >, <, >=, <=.
  • The complex (composite) sharding algorithm is used for scenarios where multiple columns are used as shard keys; the logic spanning multiple shard keys is more complex, and the application developer needs to handle that complexity.
  • The Hint sharding algorithm is used for scenarios where sharding is driven by Hint values rather than by the SQL itself.

The SphereEx-DBPlusEngine sharding plug-in provides a variety of built-in sharding algorithms. They are divided by type into automatic sharding algorithms and custom sharding algorithms, and can meet the needs of most business scenarios.

| No. | Category | Sharding Algorithm | Keyword |
| --- | --- | --- | --- |
| 1 | Auto Sharding Algorithms | Modulo Sharding Algorithm | MOD |
| 2 | Auto Sharding Algorithms | Hash Modulo Sharding Algorithm | HASH_MOD |
| 3 | Auto Sharding Algorithms | Volume Based Range Sharding Algorithm | VOLUME_RANGE |
| 4 | Auto Sharding Algorithms | Boundary Based Range Sharding Algorithm | BOUNDARY_RANGE |
| 5 | Auto Sharding Algorithms | Auto Interval Sharding Algorithm | AUTO_INTERVAL |
| 6 | Custom Sharding Algorithms | Inline Sharding Algorithm | INLINE |
| 7 | Custom Sharding Algorithms | Interval Sharding Algorithm | INTERVAL |
| 8 | Custom Sharding Algorithms | Complex Sharding Algorithm | COMPLEX_INLINE |
| 9 | Custom Sharding Algorithms | Hint Sharding Algorithm | HINT_INLINE |

Inline Expression #

In data sharding rule configuration, as the number of data nodes grows, a large amount of repeated configuration makes the configuration hard to maintain. Inline expressions can effectively simplify the data node configuration workload.

For common sharding algorithms, implementing them in Java code does not help manage the configuration in one place. Writing the sharding algorithm as an inline expression lets it be stored together with the rule configuration, which is easier to browse and maintain.

Syntax Description #

Inline expressions are intuitive to use: just use ${ expression } or $->{ expression } in the configuration to mark an inline expression. Inline expressions are currently supported in two parts of the configuration: data nodes and sharding algorithms. The content of an inline expression is Groovy code, so all operations supported by Groovy are supported by inline expressions. For example:

${begin..end} indicates a range interval

${[unit1, unit2, unit_x]} means an enumeration value

If there are multiple ${ expression } or $->{ expression } expressions in an inline expression, the final result of the whole expression will be a Cartesian combination based on the result of each sub-expression.

For example, the following inline expressions:

${['online', 'offline']}_table${1..3}

The final resolution will be:

online_table1, online_table2, online_table3, offline_table1, offline_table2, offline_table3

Configuration Note #

For uniformly distributed data nodes, if the data structure is as follows:

db0
  ├── t_order0
  └── t_order1
db1
  ├── t_order0
  └── t_order1

Using an inline expression, this can be simplified to

db${0..1}.t_order${0..1}

or

db$->{0..1}.t_order$->{0..1}

For a custom data node, if the data structure is as follows:

db0
  ├── t_order0
  └── t_order1
db1
  ├── t_order2
  ├── t_order3
  └── t_order4

Using an inline expression, this can be simplified to

db0.t_order${0..1},db1.t_order${2..4}

or

db0.t_order$->{0..1},db1.t_order$->{2..4}

Data nodes with prefixes can also be configured flexibly with inline expressions. If the data structure is as follows:

db0
  ├── t_order_00
  ├── t_order_01
  ├── t_order_02
  ├── t_order_03
  ├── t_order_04
  ├── t_order_05
  ├── t_order_06
  ├── t_order_07
  ├── t_order_08
  ├── t_order_09
  ├── t_order_10
  ├── t_order_11
  ├── t_order_12
  ├── t_order_13
  ├── t_order_14
  ├── t_order_15
  ├── t_order_16
  ├── t_order_17
  ├── t_order_18
  ├── t_order_19
  └── t_order_20
db1
  ├── t_order_00
  ├── t_order_01
  ├── t_order_02
  ├── t_order_03
  ├── t_order_04
  ├── t_order_05
  ├── t_order_06
  ├── t_order_07
  ├── t_order_08
  ├── t_order_09
  ├── t_order_10
  ├── t_order_11
  ├── t_order_12
  ├── t_order_13
  ├── t_order_14
  ├── t_order_15
  ├── t_order_16
  ├── t_order_17
  ├── t_order_18
  ├── t_order_19
  └── t_order_20

Separate configurations can be used: first configure the data nodes whose table suffix carries the leading zero, then configure those without it, and the Cartesian product of the inline expressions combines them automatically. Using inline expressions, the example above can be simplified as:

db${0..1}.t_order_0${0..9}, db${0..1}.t_order_${10..20}

or

db$->{0..1}.t_order_0$->{0..9}, db$->{0..1}.t_order_$->{10..20}

For SQL that shards with = and IN on a single shard key, inline expressions can be used instead of coded configuration.

The expression inside the inline expression is essentially a piece of Groovy code that returns the corresponding real data source or real table name based on the way the sharding key is calculated.

For example, if there are 10 data sources, an id whose last digit is 0 is routed to the data source with suffix 0, an id ending in 1 to the data source with suffix 1, and so on. The inline expression representing this sharding algorithm is:

ds${id % 10}

or

ds$->{id % 10}

Sharding Strategy #

A sharding strategy consists of a sharding key and a sharding algorithm; the algorithm is extracted separately because of its independence. What can actually be used for sharding operations is the combination of sharding key and sharding algorithm, that is, the sharding strategy.
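
Putting the two together, a hedged DistSQL sketch of a standard strategy (sharding key plus an INLINE algorithm) at both the database and table level, in the same style as the automatic-sharding examples later in this document; table and column names are illustrative:

CREATE SHARDING TABLE RULE t_order (
    DATANODES("ds_${0..1}.t_order_${0..1}"),
    DATABASE_STRATEGY(TYPE="standard", SHARDING_COLUMN=user_id, SHARDING_ALGORITHM(TYPE(NAME="inline", PROPERTIES("algorithm-expression"="ds_${user_id % 2}")))),
    TABLE_STRATEGY(TYPE="standard", SHARDING_COLUMN=order_id, SHARDING_ALGORITHM(TYPE(NAME="inline", PROPERTIES("algorithm-expression"="t_order_${order_id % 2}"))))
);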

Forced Sharding Route #

For scenarios where the sharding value is determined not by the SQL itself but by other external conditions, a SQL Hint can be used to inject the sharding value, e.g., sharding the database by the employee's login primary key when that field does not exist in the table. SQL Hint supports both a Java API and SQL annotations (to be implemented).

  • Implementation Motivation:

Extracting shard key columns and values by parsing SQL statements and sharding accordingly is SphereEx-DBPlusEngine's zero-intrusion approach to SQL. If the SQL statement contains no sharding condition, sharding cannot be performed and full routing is required. In some application scenarios, the sharding condition exists not in the SQL but in external business logic, so a way to specify sharding results externally is needed; in SphereEx-DBPlusEngine this is called Hint.

  • Implementation Mechanism:

SphereEx-DBPlusEngine uses ThreadLocal to manage sharding values. A sharding condition can be added to the HintManager programmatically, and it takes effect only within the current thread. Besides using forced sharding routes programmatically, SphereEx-DBPlusEngine can also reference Hint through special annotations in SQL, allowing developers to use this functionality in a more transparent way. SQL that specifies a forced sharding route ignores the original sharding logic and routes directly to the specified real data node.
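
As an illustration only: Apache ShardingSphere also exposes the hint mechanism through DistSQL RAL statements, and if those statements are available in the deployed SphereEx-DBPlusEngine version, they can be used from a proxy connection instead of the Java HintManager API (this is an assumption, not a confirmed feature of this product):

-- Hint values are scoped to the current connection
ADD SHARDING HINT DATABASE_VALUE t_order = 100;
ADD SHARDING HINT TABLE_VALUE t_order = 100;
SHOW SHARDING HINT STATUS;
SELECT * FROM t_order;   -- routed by the hint, not by conditions in the SQL
CLEAR SHARDING HINT;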

Operation Guide #

DistSQL #

  1. Prepare the environment to complete SphereEx-DBPlusEngine and database deployment.
  2. Create logical libraries and register storage (database) resources.
  3. Create sharding rules and configure data sources, sharding algorithms and the number of shards.
  4. Create a sharding table with the same name as the sharding rule.
  5. Insert test data, confirm the execution path, and complete the configuration.

Console #

  1. Prepare the environment to complete the SphereEx-Console, SphereEx-DBPlusEngine and database deployment.
  2. Log in to SphereEx-Console and click the “New Table” button on the “Cluster” - “Objects” page.
  3. On the New Table page, fill out the table creation statement and click the Add Shard Plugin button below.
  4. On the Shard Plugin page, configure the sharding information.
  5. Confirm the table distribution and complete the configuration.

Configuration Example #

The following configuration examples do not involve existing (stock) data. For splitting stock data, refer to the data migration plug-in.

Configuring Sharding with DistSQL #

Environment Description #

| No. | Instance | IP Address | Service Port | Host Name | Note |
| --- | --- | --- | --- | --- | --- |
| 1 | DBPlusEngine 1.2.0 | 192.168.xx.102 | 3307 | dbplusengine | |
| 2 | MySQL 8.0.28 | 192.168.xx.105 | 3306 | mysql_0 | |
| 3 | MySQL 8.0.28 | 192.168.xx.106 | 3306 | mysql_1 | |

Topology diagram #

Topology diagram

Configuration Process #

Please create mysql_0 and mysql_1 databases in two MySQL data sources in advance.

  1. Create a logical database named testdb in SphereEx-DBPlusEngine.
mysql> CREATE DATABASE testdb;
Query OK, 0 rows affected (0.15 sec)

mysql> SHOW DATABASES;
+--------------------+
| schema_name        |
+--------------------+
| mysql              |
| information_schema |
| performance_schema |
| sys                |
| testdb             |
+--------------------+
5 rows in set (0.00 sec)
  2. Register the prepared data sources into SphereEx-DBPlusEngine.
mysql> USE testdb;
Database changed

mysql> REGISTER STORAGE UNIT ds_0 (
    URL="jdbc:mysql://192.168.xx.105:3306/mysql_0?serverTimezone=UTC&useSSL=false",
    USER="test",
    PASSWORD="Test@123"
), ds_1 (
    URL="jdbc:mysql://192.168.xx.106:3306/mysql_1?serverTimezone=UTC&useSSL=false",
    USER="test",
    PASSWORD="Test@123"
);
Query OK, 0 rows affected (0.60 sec)

mysql> SHOW STORAGE UNITS\G
*************************** 1. row ***************************
                           name: ds_1
                           type: MySQL
                           host: 192.168.xx.106
                           port: 3306
                             db: mysql_1
connection_timeout_milliseconds: 30000
      idle_timeout_milliseconds: 60000
      max_lifetime_milliseconds: 2100000
                  max_pool_size: 50
                  min_pool_size: 1
                      read_only: false
               other_attributes: {"dataSourceProperties":{"cacheServerConfiguration":"true","elideSetAutoCommits":"true","useServerPrepStmts":"true","cachePrepStmts":"true","rewriteBatchedStatements":"true","cacheResultSetMetadata":"false","useLocalSessionState":"true","maintainTimeStats":"false","prepStmtCacheSize":"200000","tinyInt1isBit":"false","prepStmtCacheSqlLimit":"2048","netTimeoutForStreamingResults":"0","zeroDateTimeBehavior":"round"},"healthCheckProperties":{},"initializationFailTimeout":1,"validationTimeout":5000,"leakDetectionThreshold":0,"poolName":"HikariPool-83","registerMbeans":false,"allowPoolSuspension":false,"autoCommit":true,"isolateInternalQueries":false}
*************************** 2. row ***************************
                           name: ds_0
                           type: MySQL
                           host: 192.168.xx.105
                           port: 3306
                             db: mysql_0
connection_timeout_milliseconds: 30000
      idle_timeout_milliseconds: 60000
      max_lifetime_milliseconds: 2100000
                  max_pool_size: 50
                  min_pool_size: 1
                      read_only: false
               other_attributes: {"dataSourceProperties":{"cacheServerConfiguration":"true","elideSetAutoCommits":"true","useServerPrepStmts":"true","cachePrepStmts":"true","rewriteBatchedStatements":"true","cacheResultSetMetadata":"false","useLocalSessionState":"true","maintainTimeStats":"false","prepStmtCacheSize":"200000","tinyInt1isBit":"false","prepStmtCacheSqlLimit":"2048","netTimeoutForStreamingResults":"0","zeroDateTimeBehavior":"round"},"healthCheckProperties":{},"initializationFailTimeout":1,"validationTimeout":5000,"leakDetectionThreshold":0,"poolName":"HikariPool-84","registerMbeans":false,"allowPoolSuspension":false,"autoCommit":true,"isolateInternalQueries":false}
2 rows in set (0.00 sec)
  3. Create a sharding rule and confirm the result. The name of the sharding rule must match the business table name.
mysql> CREATE SHARDING TABLE RULE t_user(
 STORAGE_UNITS(ds_0,ds_1),
 SHARDING_COLUMN=user_id,
 TYPE(NAME="hash_mod",PROPERTIES("sharding-count"="4"))
);
Query OK, 0 rows affected (0.40 sec)

mysql> SHOW SHARDING TABLE RULE t_user\G
*************************** 1. row ***************************
                            table: t_user
                actual_data_nodes:
              actual_data_sources: ds_0,ds_1
           database_strategy_type:
         database_sharding_column:
 database_sharding_algorithm_type:
database_sharding_algorithm_props:
              table_strategy_type: STANDARD
            table_sharding_column: user_id
    table_sharding_algorithm_type: hash_mod
   table_sharding_algorithm_props: sharding-count=4
              key_generate_column:
               key_generator_type:
              key_generator_props:
1 row in set (0.00 sec)

With the above configuration, the t_user table is divided into 4 shardings, which are stored in 2 data sources. The user_id is the sharding field and the sharding algorithm used is hash_mod.

  4. Create the t_user table and insert several rows.
mysql> CREATE TABLE `t_user` (
 `user_id` int NOT NULL,
 `order_id` int NOT NULL,
 `status` varchar(45) DEFAULT NULL,
 PRIMARY KEY (`user_id`)
);
Query OK, 0 rows affected (0.45 sec)

mysql> INSERT INTO t_user VALUES
(1,1,'active'),
(2,2,'active'),
(3,3,'active'),
(4,4,'active');
Query OK, 4 rows affected (0.20 sec)

mysql> SELECT * FROM t_user ORDER BY user_id;
+---------+----------+--------+
| user_id | order_id | status |
+---------+----------+--------+
|       1 |        1 | active |
|       2 |        2 | active |
|       3 |        3 | active |
|       4 |        4 | active |
+---------+----------+--------+
4 rows in set (0.06 sec)
  5. Use the PREVIEW command to view the SQL execution path.
mysql> PREVIEW SELECT * FROM t_user;
+------------------+---------------------------------------------------------+
| data_source_name | actual_sql                                              |
+------------------+---------------------------------------------------------+
| ds_0             | SELECT * FROM t_user_0 UNION ALL SELECT * FROM t_user_2 |
| ds_1             | SELECT * FROM t_user_1 UNION ALL SELECT * FROM t_user_3 |
+------------------+---------------------------------------------------------+
2 rows in set (0.05 sec)
  6. Finally, confirm the shard distribution in the physical databases.
--ds_0
mysql> SHOW DATABASES;
+--------------------+
| Database           |
+--------------------+
| information_schema |
| mysql              |
| mysql_0            |
| performance_schema |
| sys                |
+--------------------+
5 rows in set (0.00 sec)

mysql> USE mysql_0;

mysql> SHOW TABLES;
+-------------------+
| Tables_in_mysql_0 |
+-------------------+
| t_user_0          |
| t_user_2          |
+-------------------+
2 rows in set (0.00 sec)

mysql> SELECT * FROM t_user_0;
+---------+----------+--------+
| user_id | order_id | status |
+---------+----------+--------+
|       4 |        4 | active |
+---------+----------+--------+
1 row in set (0.00 sec)

mysql> SELECT * FROM t_user_2;
+---------+----------+--------+
| user_id | order_id | status |
+---------+----------+--------+
|       2 |        2 | active |
+---------+----------+--------+
1 row in set (0.00 sec)

--ds_1
mysql> SHOW DATABASES;
+--------------------+
| Database           |
+--------------------+
| information_schema |
| mysql              |
| mysql_1            |
| performance_schema |
| sys                |
+--------------------+
5 rows in set (0.00 sec)

mysql> USE mysql_1;

mysql> SHOW TABLES;
+-------------------+
| Tables_in_mysql_1 |
+-------------------+
| t_user_1          |
| t_user_3          |
+-------------------+
2 rows in set (0.00 sec)

mysql> SELECT * FROM t_user_1;
+---------+----------+--------+
| user_id | order_id | status |
+---------+----------+--------+
|       1 |        1 | active |
+---------+----------+--------+
1 row in set (0.00 sec)

mysql> SELECT * FROM t_user_3;
+---------+----------+--------+
| user_id | order_id | status |
+---------+----------+--------+
|       3 |        3 | active |
+---------+----------+--------+
1 row in set (0.00 sec)

As the output shows, the records of t_user are stored in 4 shard tables in 2 databases.

Using Console to configure sharding #

Environment Description #

| No. | Instance | IP Address | Service Port | Host Name | Note |
| --- | --- | --- | --- | --- | --- |
| 1 | DBPlusEngine 1.2.0 | 10.9.122.80 | 3307 | dbplusengine | |
| 2 | Console 1.1.0 | 10.9.122.81 | 8089 | console | |
| 3 | MySQL 8.0.28 | 10.9.122.82 | 13306 | test_2 | |
| 4 | MySQL 8.0.28 | 10.9.122.82 | 13306 | test_3 | |

Topology diagram #

Topology diagram

Configuration Process #

  1. Log in to SphereEx-Console and click the “New Table” button on the “Cluster” - “Objects” page.

Console1

  2. On the New Table page, fill in the table creation statement and click the "Add Shard Plugin" button below.

Console2

  3. On the Shard Plugin page, configure the sharding information.

On this page, you can select the specified table type, storage node, shard key, shard algorithm, and shard quantity, and then click the OK button.

Console3

  4. Confirm the table distribution.

At this point, the page will display a "Created successfully" message. You can confirm the sharding information of the t_user shard table by clicking the "View Table Distribution" link in the object list.

Console4

As shown in the diagram below, the t_user table has 4 shards, t_user_0 to t_user_3, which are stored in the test_2 and test_3 storage nodes.

Console5

The demonstration of creating a shard table via SphereEx-Console ends here.

Use custom sharding #

Custom extensions are implemented by configuring the sharding strategy type and the algorithm class name. CLASS_BASED also allows additional custom properties to be passed into the algorithm class; they can be retrieved through an instance of java.util.Properties named props. See org.apache.shardingsphere.example.extension.sharding.algortihm.classbased.fixture.ClassBasedStandardShardingAlgorithmFixture in the ShardingSphere repository.

Type: CLASS_BASED

Configurable Attributes:

| No. | Attribute Name | Data Type | Description |
| --- | --- | --- | --- |
| 1 | strategy | String | Sharding strategy type, supporting STANDARD, COMPLEX or HINT (case-insensitive) |
| 2 | algorithmClassName | String | Fully qualified class name of the sharding algorithm |

Configuration Example #

When using data sharding, just configure the corresponding data sharding algorithm under the shardingAlgorithms property.

rules:
- !SHARDING
  tables:
    t_order: 
      actualDataNodes: ds_${0..1}.t_order_${0..1}
      tableStrategy: 
        standard:
          shardingColumn: order_id
          shardingAlgorithmName: t-order-inline
      keyGenerateStrategy:
        column: order_id
        keyGeneratorName: snowflake
    t_order_item:
      actualDataNodes: ds_${0..1}.t_order_item_${0..1}
      tableStrategy:
        standard:
          shardingColumn: order_id
          shardingAlgorithmName: t_order-item-inline
      keyGenerateStrategy:
        column: order_item_id
        keyGeneratorName: snowflake
    t_account:
      actualDataNodes: ds_${0..1}.t_account_${0..1}
      tableStrategy:
        standard:
          shardingAlgorithmName: t-account-inline
      keyGenerateStrategy:
        column: account_id
        keyGeneratorName: snowflake
  defaultShardingColumn: account_id
  bindingTables:
    - t_order,t_order_item
  broadcastTables:
    - t_address
  defaultDatabaseStrategy:
    standard:
      shardingColumn: user_id
      shardingAlgorithmName: database-inline
  defaultTableStrategy:
    none:
  
  shardingAlgorithms:
    database-inline:
      type: INLINE
      props:
        algorithm-expression: ds_${user_id % 2}
    t-order-inline:
      type: INLINE
      props:
        algorithm-expression: t_order_${order_id % 2}
    t_order-item-inline:
      type: INLINE
      props:
        algorithm-expression: t_order_item_${order_id % 2}
    t-account-inline:
      type: INLINE
      props:
        algorithm-expression: t_account_${account_id % 2}
  keyGenerators:
    snowflake:
      type: SNOWFLAKE

Automatic Sharding Configuration #

SphereEx-DBPlusEngine also supports automated sharding management: when the upper time limit of a range algorithm is approached, new shards are created automatically, improving the user experience without any data movement. Currently the AUTO_INTERVAL and INTERVAL algorithms support this kind of non-data-movement automatic shard expansion. Below is a simple example of automatic sharding; for specific DistSQL usage, please refer to Sharded Tables.

  • Standard Sharding Rule

    CREATE SHARDING TABLE RULE t_order_interval (
        DATANODES("${['ds_0','ds_1']}.t_order_interval_${['2023_06_01','2023_06_02','2023_06_03','2023_06_04','2023_06_05','2023_06_06','2023_06_07','2023_06_08','2023_06_09','2023_06_10','2023_06_11','2023_06_12','2023_06_13','2023_06_14','2023_06_15','2023_06_16','2023_06_17','2023_06_18','2023_06_19','2023_06_20','2023_06_21','2023_06_22','2023_06_23','2023_06_24','2023_06_25']}"),
        DATABASE_STRATEGY(TYPE="standard",SHARDING_COLUMN=user_id,SHARDING_ALGORITHM(TYPE(NAME="inline",PROPERTIES("algorithm-expression"="ds_${user_id % 2}")))),
        TABLE_STRATEGY(TYPE="standard",SHARDING_COLUMN=creation_date,SHARDING_ALGORITHM(TYPE(NAME="INTERVAL",PROPERTIES("datetime-pattern"="yyyy-MM-dd HH:mm:ss",
        "datetime-lower"="2023-06-01 00:00:00","datetime-upper"="2023-06-25 23:59:59","sharding-suffix-pattern"="yyyy_MM_dd","datetime-interval-amount"="1", "datetime-interval-unit"="DAYS")))),
        AUTO_RESHARDING_STRATEGY(
            MATCHING_ALGORITHM(TYPE(NAME="INTERVAL_UPPER_TIME_HIGH_WATERLINE",PROPERTIES("remaining-seconds-until-upper-time"=864000))),
            ACTION_ALGORITHM(TYPE(NAME="SCALE_SHARDING",PROPERTIES("operation-type"="ADD","amount"=20)))
      )
    );
    

    INTERVAL_UPPER_TIME_HIGH_WATERLINE: in the interval algorithm, automatic resharding is triggered when the current time approaches the configured upper time.

    remaining-seconds-until-upper-time: the number of seconds remaining before the upper time at which resharding is triggered. In the example above, it is 864,000 seconds (10 days).

    operation-type: ADD in this example; other options include subtract, multiply, and divide.

    amount: used together with operation-type (e.g., how much to add or multiply by). In the example above, 20 shards are added.

  • Automatic Sharding Rule

    CREATE SHARDING TABLE RULE t_order_auto_interval (
        STORAGE_UNITS(ds_0,ds_1),
        SHARDING_COLUMN=creation_date,TYPE(NAME="AUTO_INTERVAL",PROPERTIES("datetime-lower"="2023-06-01 00:00:00", "datetime-upper"="2023-06-25 23:59:59", "sharding-seconds"="86400")),
        AUTO_RESHARDING_STRATEGY(
            MATCHING_ALGORITHM(TYPE(NAME="INTERVAL_UPPER_TIME_HIGH_WATERLINE",PROPERTIES("remaining-seconds-until-upper-time"=864000))),
            ACTION_ALGORITHM(TYPE(NAME="SCALE_SHARDING",PROPERTIES("operation-type"="ADD","amount"=30)))
        )
    );  
    

FAQ #

  1. Can DDL be done by SphereEx-DBPlusEngine after using the sharding plugin?

    Yes, it can. With SphereEx-DBPlusEngine, you can maintain the whole distributed architecture as if it were a single database, without executing statements on each underlying database one by one.

  2. Is there a solution to migrate the MySQL database currently in use to a distributed cluster?

    Yes, there is. The SphereEx-DBPlusEngine data migration and elastic scaling plugins provide one-stop data migration from a centralized to a distributed architecture, and can also expand storage node capacity according to resource usage.