Logo
CDC

CDC #

CDC (Change Data Capture) is a method used to capture and track changes in a database. In the context of databases like MySQL, the binlog, or binary log, is utilized to record these changes. Think of the binlog as a message queue that sequentially stores detailed incremental change information within MySQL. By consuming the change entries from this queue, downstream systems or tools can achieve real-time data synchronization with MySQL.

In practical applications, CDC is primarily used for data synchronization, data backup, and recovery purposes. Typically, CDC is employed in scenarios with a high volume of data flow, such as large-scale data warehouses, data centers, and cloud computing environments.

Overview #

SphereEx-DBPlusEngine, the Database Enhancement Engine, serves a dual purpose. It aids in the transformation of databases from monolithic to distributed systems and enables encrypted storage within databases, thereby enhancing database security. The CDC (Change Data Capture) feature of SphereEx-DBPlusEngine allows for the capture of data changes in scenarios involving distributed and encrypted storage. In encryption scenarios, it enables the configuration of encryption/decryption parameters, defines whether data should be transmitted in plaintext or ciphertext to a specified target, catering to the presentation requirements of different target systems. In distributed scenarios, it supports transaction assurance to maintain consistency between the source and target in distributed transactions.

Currently, the SphereEx-DBPlusEngine data migration plugin supports the following database products: MySQL, MySQL branch versions, PostgreSQL, and openGauss. Specific versions, environmental requirements, and permission requirements are detailed below.

Database Supported
Version Support
Environment Requirements
Permission Requirements
1MySQL5.1.15 ~ 8.xmy.cnf Configuration
log-bin=mysql-bin
binlog-format=row
binlog-row-image=full
–Confirmation commands are as follows
show variables like ‘%log_bin%’;
show variables like ‘%binlog%’;
GRANT REPLICATION SLAVE, REPLICATION CLIENT ON . TO ${username}@${host}
–Confirmation commands are as follows
SHOW GRANTS FOR ‘user’;
2Aurora MySQL3.02.2my.cnf Configuration
log-bin=mysql-bin
binlog-format=row
binlog-row-image=full
–Confirmation commands are as follows
show variables like ‘%log_bin%’;
show variables like ‘%binlog%’;
GRANT REPLICATION SLAVE, REPLICATION CLIENT ON . TO ${username}@${host}
–Confirmation commands are as follows
SHOW GRANTS FOR ‘user’;
3PostgreSQL9.4 or abovepostgresql.conf Configuration:
wal_level = logical
max_wal_senders = 10
max_replication_slots = 10
max_connections = 600
pg_hba.conf Configuration:
host replication repl_acct 0.0.0.0/0 md5
4openGauss2.0.1 ~ 3.0.0postgresql.conf Configuration:
wal_level = logical
max_wal_senders = 10
max_replication_slots = 10
max_connections = 600
pg_hba.conf Configuration:
host replication repl_acct 0.0.0.0/0 md5
5MariaDB5.1 及以上my.cnf Configuration
log-bin=mysql-bin
binlog-format=row
binlog-row-image=full
–Confirmation commands are as follows
show variables like ‘%log_bin%’;
show variables like ‘%binlog%’;
GRANT REPLICATION SLAVE, REPLICATION CLIENT ON . TO ${username}@${host}
–Confirmation commands are as follows
SHOW GRANTS FOR ‘user’;

Architecture Diagram #

CDC原理图

The core features are: #

  • Full data collection
  • Incremental data collection
  • Integration of incremental and full data
  • Transaction support
  • Data decryption and transmission
  • Automatic failover
  • Resource scheduling support
  • Resume from breakpoint

Basic Concepts #

  • CDC Process

CDC (Change Data Capture) is a data collection technology that can monitor data changes in SphereEx-DBPlusEngine’s storage nodes, capture data operation events, filter and extract useful information, and ultimately send these changed data to a specified target.

  • Cluster Multiple nodes grouped together to provide specific services.

  • Source End The cluster where the original data is located.

  • Target End The target storage cluster to which the original data will be migrated.

  • Subscription Configuring the target end to receive change data from the source end.

  • Existing Data Data already present in the storage nodes before configuring CDC functionality.

  • Incremental Data New data generated by the business system during the operation of CDC.

Use Cases #

CDC is mainly used for data synchronization, data backup, and recovery. Typically, CDC is applied in scenarios with a significant data flow, such as large-scale data warehouses, data centers, and cloud computing. Additionally, CDC supports writing data to Kafka, so any target end capable of consuming Kafka can be used as a target end for SphereEx-DBPlusEngine CDC.

Prerequisites #

  • The SphereEx-DBPlusEngine and the target database environment should be deployed and running smoothly with proper network connectivity.
  • CDC relies on governance centers like Zookeeper, which should be deployed in advance.

Usage Restrictions #

Currently, only openGauss supports global sorting for incremental data. This is because other databases do not provide a globally ordered CSN (Change Sequence Number), making it impossible to achieve.

Supports global sorting in openGauss. Supports Kafka. Does not support DDL changes. Does not support global sorting for databases other than openGauss. Does not support tables with foreign keys. If scaling operations (e.g., scaling up or down) are triggered while the CDC service is running, CDC will stop collecting change data for that table.

Notes #

During the operation of CDC, please make sure that the database logs are not cleared.

Principle Introduction #

CDC架构图

It can be mainly divided into three major components:

  • CDC Server: It provides incremental data subscription services. When using a network channel, it opens a server-side listening port.
  • CDC Protocol: This is required when using a network channel. The protocol is defined using protobuf3, which offers high data compression rates and encoding/decoding performance, making it less prone to data encoding/decoding issues.
  • CDC Client: This is required when using a network channel. After connecting and logging into the CDC server, it subscribes to and consumes incremental data, then writes it to the client’s own target destination. To do this, you need to reference the cdc-client Maven package provided by the system.

Deploying the CDC Server #

Currently, the CDC and Proxy are within the same process. A single Proxy instance can simultaneously provide the existing Proxy service and the new CDC service.

Configuring CDC involves adding a new property to the server.yaml file.

cdc-server-port: 33071
  • Explanation:

Configuring this property means that you need to enable the CDC service port, and any Proxy instance can provide the CDC service. If not configured, it will not be enabled.

CDC’s state is similar to Proxy, with important data stored in the registration center (e.g., Zookeeper), and there is corresponding status within the process.

The CDC server can be considered stateless.

CDC Server Parameter Parameter Name: cdc_decrypt_enabled

Parameter Meaning: Indicates whether CDC should enable the decryption of ciphertext into plaintext and push it to the client, with support for online modification.

Parameter Value: true/false

Default Value: false

CDC Consumer Configuration Example #

Communication with CDC requires the use of both the CDC client and CDC server.

Here is an example configuration for the CDC consumer:

  1. First, you need to include the Maven dependency for the CDC client.
<dependency>
    <groupId>org.apache.shardingsphere</groupId>
    <artifactId>shardingsphere-data-pipeline-cdc-client</artifactId>
    <version>${project.version}</version>
</dependency>
  1. Performing CDC Data Synchronization Using CDCClient
import com.google.protobuf.EmptyProto;
import com.google.protobuf.InvalidProtocolBufferException;
import com.google.protobuf.TimestampProto;
import com.google.protobuf.TypeRegistry;
import com.google.protobuf.WrappersProto;
import com.google.protobuf.util.JsonFormat;
import com.google.protobuf.util.JsonFormat.Printer;
import lombok.extern.slf4j.Slf4j;
import org.apache.shardingsphere.data.pipeline.cdc.client.CDCClient;
import org.apache.shardingsphere.data.pipeline.cdc.client.parameter.StartCDCClientParameter;
import org.apache.shardingsphere.data.pipeline.cdc.protocol.request.StreamDataRequestBody.SchemaTable;
import org.apache.shardingsphere.data.pipeline.cdc.protocol.response.DataRecordResult.Record;

import java.time.LocalDateTime;
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;

@Slf4j
public final class CDCDemo {
    
    public static void main(final String[] args) {
        StartCDCClientParameter parameter = new StartCDCClientParameter();
        // Proxy address
        parameter.setAddress("127.0.0.1");
        // CDC port, and it should match the configuration file
        parameter.setPort(33071);
        // Proxy username
        parameter.setUsername("root");
        // Proxy password
        parameter.setPassword("root");
        // Subscribed Logical Database
        parameter.setDatabase("sharding_db");
        // Whether to Enable Full
        parameter.setFull(true);
        // Tables to Subscribe to
        parameter.setSchemaTables(Collections.singletonList(SchemaTable.newBuilder().setTable("t_order").build()));
        // The second parameter is the logic for consuming data, here it is simply printing, but it can also be written to a database or message queue (MQ)
        Printer printer = JsonFormat.printer().omittingInsignificantWhitespace().usingTypeRegistry(TypeRegistry.newBuilder().add(EmptyProto.getDescriptor().getMessageTypes()).add(TimestampProto.getDescriptor().getMessageTypes()).add(WrappersProto.getDescriptor().getMessageTypes()).build());
        CDCClient cdcClient = new CDCClient(parameter, records -> {
            List<String> list = new ArrayList<>();
            for (Record record : records) {
                String print;
                try {
                    print = printer.print(record);
                } catch (final InvalidProtocolBufferException ex) {
                    throw new RuntimeException(ex);
                }
                list.add(print);
            }
            System.out.printf("%s , received records: %s%n", LocalDateTime.now(), list);
        });
        cdcClient.start();
    }
}

FAQ #

  1. After going through CDC, sharded tables remain sharded tables. CDC is primarily for capturing and transmitting data changes and does not capture or transmit structural changes.