CDC #
CDC (Change Data Capture) is a method used to capture and track changes in a database. In the context of databases like MySQL, the binlog, or binary log, is utilized to record these changes. Think of the binlog as a message queue that sequentially stores detailed incremental change information within MySQL. By consuming the change entries from this queue, downstream systems or tools can achieve real-time data synchronization with MySQL.
In practical applications, CDC is primarily used for data synchronization, data backup, and recovery purposes. Typically, CDC is employed in scenarios with a high volume of data flow, such as large-scale data warehouses, data centers, and cloud computing environments.
Overview #
SphereEx-DBPlusEngine, the Database Enhancement Engine, serves a dual purpose: it helps transform databases from monolithic to distributed systems, and it enables encrypted storage within databases, thereby enhancing database security. The CDC (Change Data Capture) feature of SphereEx-DBPlusEngine captures data changes in distributed and encrypted-storage scenarios. In encryption scenarios, it allows encryption/decryption parameters to be configured, so you can define whether data is transmitted in plaintext or ciphertext to a specified target, catering to the presentation requirements of different target systems. In distributed scenarios, it supports transaction guarantees to maintain consistency between the source and target in distributed transactions.
Currently, the SphereEx-DBPlusEngine data migration plugin supports the following database products: MySQL, MySQL branch versions, PostgreSQL, and openGauss. Specific versions, environmental requirements, and permission requirements are detailed below.
No. | Database | Supported Versions | Environment Requirements | Permission Requirements
---|---|---|---|---
1 | MySQL | 5.1.15 ~ 8.x | `my.cnf` configuration: `log-bin=mysql-bin`, `binlog-format=row`, `binlog-row-image=full`. Confirmation commands: `show variables like '%log_bin%';` `show variables like '%binlog%';` | `GRANT REPLICATION SLAVE, REPLICATION CLIENT ON *.* TO ${username}@${host};` Confirmation command: `SHOW GRANTS FOR 'user';`
2 | Aurora MySQL | 3.02.2 | `my.cnf` configuration: `log-bin=mysql-bin`, `binlog-format=row`, `binlog-row-image=full`. Confirmation commands: `show variables like '%log_bin%';` `show variables like '%binlog%';` | `GRANT REPLICATION SLAVE, REPLICATION CLIENT ON *.* TO ${username}@${host};` Confirmation command: `SHOW GRANTS FOR 'user';`
3 | PostgreSQL | 9.4 or above | `postgresql.conf` configuration: `wal_level = logical`, `max_wal_senders = 10`, `max_replication_slots = 10`, `max_connections = 600` | `pg_hba.conf` configuration: `host replication repl_acct 0.0.0.0/0 md5`
4 | openGauss | 2.0.1 ~ 3.0.0 | `postgresql.conf` configuration: `wal_level = logical`, `max_wal_senders = 10`, `max_replication_slots = 10`, `max_connections = 600` | `pg_hba.conf` configuration: `host replication repl_acct 0.0.0.0/0 md5`
5 | MariaDB | 5.1 or above | `my.cnf` configuration: `log-bin=mysql-bin`, `binlog-format=row`, `binlog-row-image=full`. Confirmation commands: `show variables like '%log_bin%';` `show variables like '%binlog%';` | `GRANT REPLICATION SLAVE, REPLICATION CLIENT ON *.* TO ${username}@${host};` Confirmation command: `SHOW GRANTS FOR 'user';`
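For a MySQL-family source, the configuration and grant commands from the table above can be restated more readably as follows. The `${username}` and `${host}` placeholders must be replaced with the actual account and host; the grant on `*.*` follows the standard MySQL replication-privilege syntax.

```sql
-- Verify that the binlog is enabled and configured in row mode
show variables like '%log_bin%';
show variables like '%binlog%';

-- Grant the replication privileges that CDC requires
GRANT REPLICATION SLAVE, REPLICATION CLIENT ON *.* TO ${username}@${host};

-- Confirm the grants took effect
SHOW GRANTS FOR 'user';
```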
Architecture Diagram #
Core Features #
- Full data collection
- Incremental data collection
- Integration of incremental and full data
- Transaction support
- Data decryption and transmission
- Automatic failover
- Resource scheduling support
- Resume from breakpoint
Basic Concepts #
- CDC Process
CDC (Change Data Capture) is a data collection technology that can monitor data changes in SphereEx-DBPlusEngine’s storage nodes, capture data operation events, filter and extract useful information, and ultimately send these changed data to a specified target.
- Cluster
Multiple nodes grouped together to provide specific services.
- Source End
The cluster where the original data is located.
- Target End
The target storage cluster to which the original data will be migrated.
- Subscription
Configuring the target end to receive change data from the source end.
- Existing Data
Data already present in the storage nodes before configuring CDC functionality.
- Incremental Data
New data generated by the business system during the operation of CDC.
Use Cases #
CDC is mainly used for data synchronization, data backup, and recovery. Typically, CDC is applied in scenarios with a significant data flow, such as large-scale data warehouses, data centers, and cloud computing. Additionally, CDC supports writing data to Kafka, so any target end capable of consuming Kafka can be used as a target end for SphereEx-DBPlusEngine CDC.
Prerequisites #
- The SphereEx-DBPlusEngine and the target database environment should be deployed and running smoothly with proper network connectivity.
- CDC relies on governance centers like Zookeeper, which should be deployed in advance.
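As an illustrative sketch, a cluster-mode `server.yaml` fragment pointing the Proxy at a ZooKeeper governance center might look like the following; the namespace and server list are placeholder values that must match your deployment.

```yaml
mode:
  type: Cluster
  repository:
    type: ZooKeeper
    props:
      namespace: governance_ds        # placeholder namespace
      server-lists: localhost:2181    # ZooKeeper address(es), placeholder
      retryIntervalMilliseconds: 500
      maxRetries: 3
```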
Usage Restrictions #
Currently, only openGauss supports global sorting of incremental data, because other databases do not provide a globally ordered CSN (Change Sequence Number).

- Supports global sorting of incremental data in openGauss.
- Supports Kafka as a target.
- Does not support DDL changes.
- Does not support global sorting for databases other than openGauss.
- Does not support tables with foreign keys.
- If scaling operations (e.g., scaling up or down) are triggered while the CDC service is running, CDC stops collecting change data for the affected table.
Notes #
During the operation of CDC, please make sure that the database logs are not cleared.
Principle Introduction #
The CDC feature can be divided into three major components:
- CDC Server: It provides incremental data subscription services. When using a network channel, it opens a server-side listening port.
- CDC Protocol: This is required when using a network channel. The protocol is defined with protobuf3, which offers high data compression rates and fast encoding/decoding, and, being language-neutral, reduces the risk of encoding/decoding issues.
- CDC Client: This is required when using a network channel. After connecting and logging into the CDC server, it subscribes to and consumes incremental data, then writes it to the client’s own target destination. To do this, you need to reference the cdc-client Maven package provided by the system.
Deploying the CDC Server #
Currently, the CDC and Proxy are within the same process. A single Proxy instance can simultaneously provide the existing Proxy service and the new CDC service.
Configuring CDC involves adding a new property to the server.yaml file.
```yaml
cdc-server-port: 33071
```
- Explanation:
Setting this property enables the CDC service port; any Proxy instance can then provide the CDC service. If it is not configured, the CDC service is not enabled.
CDC’s state is similar to Proxy, with important data stored in the registration center (e.g., Zookeeper), and there is corresponding status within the process.
The CDC server can be considered stateless.
CDC server parameter:

- Parameter name: `cdc_decrypt_enabled`
- Meaning: whether CDC decrypts ciphertext into plaintext and pushes it to the client; supports online modification.
- Values: `true` / `false`
- Default: `false`
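As an illustrative sketch, assuming `cdc_decrypt_enabled` is set in the `props` section of `server.yaml` alongside other engine properties (the exact placement is an assumption and may differ in your deployment):

```yaml
props:
  cdc_decrypt_enabled: true   # push decrypted plaintext to CDC clients
```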
CDC Consumer Configuration Example #
Communication with CDC requires the use of both the CDC client and CDC server.
Here is an example configuration for the CDC consumer:
- First, you need to include the Maven dependency for the CDC client.
```xml
<dependency>
    <groupId>org.apache.shardingsphere</groupId>
    <artifactId>shardingsphere-data-pipeline-cdc-client</artifactId>
    <version>${project.version}</version>
</dependency>
```
- Performing CDC Data Synchronization Using CDCClient
```java
import com.google.protobuf.EmptyProto;
import com.google.protobuf.InvalidProtocolBufferException;
import com.google.protobuf.TimestampProto;
import com.google.protobuf.TypeRegistry;
import com.google.protobuf.WrappersProto;
import com.google.protobuf.util.JsonFormat;
import com.google.protobuf.util.JsonFormat.Printer;
import lombok.extern.slf4j.Slf4j;
import org.apache.shardingsphere.data.pipeline.cdc.client.CDCClient;
import org.apache.shardingsphere.data.pipeline.cdc.client.parameter.StartCDCClientParameter;
import org.apache.shardingsphere.data.pipeline.cdc.protocol.request.StreamDataRequestBody.SchemaTable;
import org.apache.shardingsphere.data.pipeline.cdc.protocol.response.DataRecordResult.Record;

import java.time.LocalDateTime;
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;

@Slf4j
public final class CDCDemo {
    
    public static void main(final String[] args) {
        StartCDCClientParameter parameter = new StartCDCClientParameter();
        // Proxy address
        parameter.setAddress("127.0.0.1");
        // CDC port; must match cdc-server-port in the configuration file
        parameter.setPort(33071);
        // Proxy username
        parameter.setUsername("root");
        // Proxy password
        parameter.setPassword("root");
        // Logical database to subscribe to
        parameter.setDatabase("sharding_db");
        // Whether to include existing (full) data
        parameter.setFull(true);
        // Tables to subscribe to
        parameter.setSchemaTables(Collections.singletonList(SchemaTable.newBuilder().setTable("t_order").build()));
        // The second constructor argument is the data-consumption logic. Here it simply
        // prints the records, but it could also write them to a database or message queue (MQ).
        Printer printer = JsonFormat.printer().omittingInsignificantWhitespace().usingTypeRegistry(TypeRegistry.newBuilder()
                .add(EmptyProto.getDescriptor().getMessageTypes())
                .add(TimestampProto.getDescriptor().getMessageTypes())
                .add(WrappersProto.getDescriptor().getMessageTypes()).build());
        CDCClient cdcClient = new CDCClient(parameter, records -> {
            List<String> list = new ArrayList<>();
            for (Record record : records) {
                try {
                    list.add(printer.print(record));
                } catch (final InvalidProtocolBufferException ex) {
                    throw new RuntimeException(ex);
                }
            }
            System.out.printf("%s , received records: %s%n", LocalDateTime.now(), list);
        });
        cdcClient.start();
    }
}
```
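The callback above prints each batch as it arrives. In practice, a consumer often buffers records and flushes them to the target in batches. The following is a minimal, framework-free sketch of that batching idea; `RecordBatcher` is a hypothetical helper name, and the CDC client itself is not required here.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.Consumer;

// Hypothetical batching helper: collects record strings and flushes them to a
// sink once the batch size is reached. In a real CDC consumer the sink would
// write to a database or message queue; here it is just a callback.
public final class RecordBatcher {
    
    private final int batchSize;
    
    private final Consumer<List<String>> sink;
    
    private final List<String> buffer = new ArrayList<>();
    
    public RecordBatcher(final int batchSize, final Consumer<List<String>> sink) {
        this.batchSize = batchSize;
        this.sink = sink;
    }
    
    // Add one record; triggers a flush when the buffer reaches batchSize.
    public void add(final String record) {
        buffer.add(record);
        if (buffer.size() >= batchSize) {
            flush();
        }
    }
    
    // Flush any remaining buffered records to the sink.
    public void flush() {
        if (!buffer.isEmpty()) {
            sink.accept(new ArrayList<>(buffer));
            buffer.clear();
        }
    }
    
    public static void main(final String[] args) {
        List<List<String>> flushed = new ArrayList<>();
        RecordBatcher batcher = new RecordBatcher(2, flushed::add);
        batcher.add("r1");
        batcher.add("r2"); // buffer reaches 2, flushes [r1, r2]
        batcher.add("r3");
        batcher.flush();   // flushes the remainder [r3]
        System.out.println(flushed); // prints [[r1, r2], [r3]]
    }
}
```

Inside the CDC callback, each printed record string would be passed to `add(...)`, with a final `flush()` on shutdown so no buffered records are lost.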
FAQ #
- After going through CDC, sharded tables remain sharded tables. CDC captures and transmits data changes only; it does not capture or transmit structural (DDL) changes.