Logo
CDC

CDC #

CDC(Change Data Capture)增量数据捕捉。数据库日志(以 MySQL 为例) binlog 是 MySQL 记录变更数据的"二进制日志",它可以看做是一个消息队列,队列中按顺序保存了 MySQL 中详细的增量变更信息,通过消费队列中的变更条目,下游系统或工具实现了与 MySQL 的实时数据同步。 在实际应用中,CDC主要用于数据同步,数据备份和恢复等方面。通常情况下,CDC都应用于数据流转量较大的场景中,如大规模数据仓库、数据中心、云计算等领域。

概述 #

SphereEx-DBPlusEngine 数据库增强引擎既可以帮助数据库实现单体到分布式转换,又可以实现数据在数据库中的加密存储,进而帮助数据库提升安全性。SphereEx-DBPlusEngine 的 CDC 功能,实现对分布式和加密存储等场景下的数据变化的捕捉。加密场景设置加解密参数,定义传输明文或密文到指定目标,满足不同目标端对数据展现形式的需求。分布式场景支持事务保障源端和目标端分布式事务的一致性。 目前 SphereEx-DBPlusEngine 数据迁移插件所支持的数据库产品为 MySQL、MySQL分支版本、PostgreSQL 和 openGauss,具体版本、环境要求和权限要求如下。

数据库
版本支持
环境要求
权限要求
1MySQL5.1.15 ~ 8.xmy.cnf 配置
log-bin=mysql-bin
binlog-format=row
binlog-row-image=full
–确认命令如下
show variables like ‘%log_bin%’;
show variables like ‘%binlog%’;
GRANT REPLICATION SLAVE, REPLICATION CLIENT ON . TO ${username}@${host}
–确认命令如下
SHOW GRANTS FOR ‘user’;
2Aurora MySQL3.02.2my.cnf 配置
log-bin=mysql-bin
binlog-format=row
binlog-row-image=full
–确认命令如下
show variables like ‘%log_bin%’;
show variables like ‘%binlog%’;
GRANT REPLICATION SLAVE, REPLICATION CLIENT ON . TO ${username}@${host}
–确认命令如下
SHOW GRANTS FOR ‘user’;
3PostgreSQL9.4 或以上版本postgresql.conf 配置:
wal_level = logical
max_wal_senders = 10
max_replication_slots = 10
max_connections = 600
pg_hba.conf 配置:
host replication repl_acct 0.0.0.0/0 md5
4openGauss2.0.1 ~ 3.0.0postgresql.conf 配置:
wal_level = logical
max_wal_senders = 10
max_replication_slots = 10
max_connections = 600
pg_hba.conf 配置:
host replication repl_acct 0.0.0.0/0 md5
5MariaDB5.1 及以上my.cnf 配置
log-bin=mysql-bin
binlog-format=row
binlog-row-image=full
–确认命令如下
show variables like ‘%log_bin%’;
show variables like ‘%binlog%’;
GRANT REPLICATION SLAVE, REPLICATION CLIENT ON . TO ${username}@${host}
–确认命令如下
SHOW GRANTS FOR ‘user’;

核心功能 #

  • 全量数据采集
  • 增量数据采集
  • 增量数据和全量数据的衔接
  • 支持事务
  • 数据解密传输
  • 故障自动切换
  • 支持资源调度
  • 断点续传

基本概念 #

  • CDC 进程

CDC(Change Data Capture)是一种数据采集技术,CDC 可以监控 SphereEx-DBPlusEngine 的存储节点中的数据变化,捕捉到数据操作事件,过滤并提取有用信息,最终将这些变化数据发送到指定的目标上。

  • 集群

为了提供特定服务而集合在一起的多个节点。

  • 源端

原始数据所在的集群。

  • 目标端

原始数据将要迁移的目标存储集群。

  • 订阅

目标端配置接收源端的变化数据。

  • 存量数据

配置 CDC 功能前,存储节点中已有的数据。

  • 增量数据

CDC 运行过程中,业务系统所产生的新数据。

适用场景 #

CDC主要用于数据同步、数据备份和恢复等方面。通常情况下,CDC都应用于数据流转量较大的场景中,如大规模数据仓库、数据中心、云计算等。同时 CDC 支持将数据写入 Kafka,所有可以消费 Kafka 的目标端都可以作为 SphereEx-DBPlusEngine CDC 的目标端。

使用前提 #

  • 已完成 SphereEx-DBPlusEngine 和目标端数据库环境部署,网络通畅且服务正常。

  • CDC 需要依赖治理中心(如 Zookeeper),需提前完成部署。

使用限制 #

目前只有 openGauss 的增量数据支持全局排序。由于其它数据库没有提供全局有序的 CSN,还无法做到。

  • 支持 openGauss 全局排序。
  • 支持 Kafka
  • 不支持 DDL 变更。
  • 不支持除 openGauss 以外数据库的全局排序。
  • 不支持存在外键的表
  • 不支持在进行 CDC 服务期间触发了扩缩容,CDC 将停止对该表的变化数据的采集。

注意事项 #

在 CDC 运行期间,请务必确保数据库日志不能被清除。

部署 CDC 服务端 #

目前 CDC 和 Proxy 在相同进程内。一个 Proxy 实例可以同时提供原有的 Proxy 服务以及新的 CDC 服务。

修改 global.yaml 配置 #

新增属性:

cdc-server-port: 33071
cdc-decrypt-enabled: false
  • 属性 cdc-server-port

配置了这个属性代表需要开启 CDC 服务端口,在所有 proxy 实例,也就是任意一个 proxy 实例都可以提供 CDC 服务。不配置则不开启。

CDC 的状态和 Proxy 类似,重要的数据都在注册中心(比如 ZK),进程里面有对应的状态。

CDC 服务端可以认为是无状态的。

  • 属性 cdc-decrypt-enabled

参数含义: CDC 是否开启将密文解析为明文推送到客户端,支持在线修改

参数值: true/false

默认值:false

修改 CDC 配置(可选) #

因 streaming rule 具有默认值,无需创建,仅提供 ALTER 语句。

完整配置 DistSQL 示例:

ALTER STREAMING RULE (
READ(
  WORKER_THREAD=20,
  BATCH_SIZE=1000,
  SHARDING_SIZE=10000000,
  RATE_LIMITER (TYPE(NAME='QPS',PROPERTIES('qps'='500')))
),
WRITE(
  WORKER_THREAD=20,
  BATCH_SIZE=1000,
  RATE_LIMITER (TYPE(NAME='TPS',PROPERTIES('tps'='2000')))
),
STREAM_CHANNEL (TYPE(NAME='MEMORY',PROPERTIES('block-queue-size'='2000')))
);

配置项说明:

ALTER STREAMING RULE (
READ( -- 数据读取配置。如果不配置则部分参数默认生效。
  WORKER_THREAD=20, -- 影响全量、增量任务,从源端摄取数据的线程池大小,不配置则使用默认值,需要确保该值不低于分库的数量。
  BATCH_SIZE=1000, -- 影响全量、增量任务,一次查询操作返回的最大记录数。如果一个事务中的数据量大于该值,增量情况下可能超过设定的值。
  SHARDING_SIZE=10000000, -- 影响全量任务,存量数据分片大小。如果不配置则使用默认值。
  RATE_LIMITER ( -- 影响全量、增量任务,限流算法。如果不配置则不限流。
  TYPE( -- 算法类型。可选项:QPS
  NAME='QPS',
  PROPERTIES( -- 算法属性
  'qps'='500'
  )))
),
WRITE( -- 数据写入配置。如果不配置则部分参数默认生效。
  WORKER_THREAD=20, -- 影响全量、增量任务,数据写入到目标端的线程池大小。如果不配置则使用默认值。
  BATCH_SIZE=1000, -- 影响全量、增量任务,存量任务一次批量写入操作的最大记录数。如果不配置则使用默认值。如果一个事务中的数据量大于该值,增量情况下可能超过设定的值。
  RATE_LIMITER ( -- 限流算法。如果不配置则不限流。
  TYPE( -- 算法类型。可选项:TPS
  NAME='TPS',
  PROPERTIES( -- 算法属性
  'tps'='2000'
  )))
),
STREAM_CHANNEL ( -- 数据通道,连接生产者和消费者,用于 read 和 write 环节。如果不配置则默认使用 MEMORY 类型。
TYPE( -- 算法类型。可选项:MEMORY
NAME='MEMORY',
PROPERTIES( -- 算法属性
'block-queue-size'='2000' -- 属性:阻塞队列大小
)))
);

输出到网络 #

原理介绍 #

CDC架构图

主要分为 3 大组成部分:

  • CDC 服务端:提供增量数据订阅服务。使用网络通道的情况下,会开启服务端监听端口。
  • CDC 协议:使用网络通道的情况下需要。协议使用 protobuf3 定义,数据压缩率和编解码性能较高,不容易出现数据编解码问题。
  • CDC 客户端:使用网络通道的情况下需要。连接登录 CDC 服务端之后,订阅并消费增量数据,写入客户自己的目标端。需要引用系统提供的 cdc-client Maven 包。

CDC 消费端使用示例 #

需要使用 CDC 客户端和 CDC 服务端进行通信

如下为 CDC 消费端的配置示例

  1. 首先需要引入 CDC 客户端的 maven 依赖
<dependency>
    <groupId>org.apache.shardingsphere</groupId>
    <artifactId>shardingsphere-data-pipeline-cdc-client</artifactId>
    <version>${project.version}</version>
</dependency>
  1. 基于 CDCClient 进行 CDC 数据同步
import com.google.protobuf.EmptyProto;
import com.google.protobuf.InvalidProtocolBufferException;
import com.google.protobuf.TimestampProto;
import com.google.protobuf.TypeRegistry;
import com.google.protobuf.WrappersProto;
import com.google.protobuf.util.JsonFormat;
import com.google.protobuf.util.JsonFormat.Printer;
import lombok.extern.slf4j.Slf4j;
import org.apache.shardingsphere.data.pipeline.cdc.client.CDCClient;
import org.apache.shardingsphere.data.pipeline.cdc.client.parameter.StartCDCClientParameter;
import org.apache.shardingsphere.data.pipeline.cdc.protocol.request.StreamDataRequestBody.SchemaTable;
import org.apache.shardingsphere.data.pipeline.cdc.protocol.response.DataRecordResult.Record;

import java.time.LocalDateTime;
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;

@Slf4j
public final class CDCDemo {
    
    public static void main(final String[] args) {
        StartCDCClientParameter parameter = new StartCDCClientParameter();
        // Proxy的地址
        parameter.setAddress("127.0.0.1");
        // CDC 端口,和配置文件中一致
        parameter.setPort(33071);
        // Proxy的用户名
        parameter.setUsername("root");
        // Proxy的密码
        parameter.setPassword("root");
        // 订阅的逻辑库
        parameter.setDatabase("sharding_db");
        // 是否开启全量
        parameter.setFull(true);
        // 需要订阅的表
        parameter.setSchemaTables(Collections.singletonList(SchemaTable.newBuilder().setTable("t_order").build()));
        // 第二个参数是数据消费的逻辑,这里是简单打印下,也可以写到数据库或MQ
        Printer printer = JsonFormat.printer().omittingInsignificantWhitespace().usingTypeRegistry(TypeRegistry.newBuilder().add(EmptyProto.getDescriptor().getMessageTypes()).add(TimestampProto.getDescriptor().getMessageTypes()).add(WrappersProto.getDescriptor().getMessageTypes()).build());
        CDCClient cdcClient = new CDCClient(parameter, records -> {
            List<String> list = new ArrayList<>();
            for (Record record : records) {
                String print;
                try {
                    print = printer.print(record);
                } catch (final InvalidProtocolBufferException ex) {
                    throw new RuntimeException(ex);
                }
                list.add(print);
            }
            System.out.printf("%s , received records: %s%n", LocalDateTime.now(), list);
        });
        cdcClient.start();
    }
}

输出到 Kafka #

原理介绍 #

CDC原理图

使用示例 #

CREATE DATABASE sharding_db;

USE sharding_db

REGISTER STORAGE UNIT ds_0 ( URL='jdbc:mysql://127.0.0.1:3306/ds_0?verifyServerCertificate=false&rewriteBatchedStatements=true&allowMultiQueries=true&useServerPrepStmts=true&useSSL=true&requireSSL=true&enabledTLSProtocols=TLSv1.2,TLSv1.3&allowPublicKeyRetrieval=true&characterEncoding=utf-8&useLocalSessionState=true', USER='root', PASSWORD='root');

-- 创建逻辑表规则

开启 CDC:

STREAM full DATA FROM sharding_db * TO SINK (TYPE(NAME='kafka', PROPERTIES('bootstrap.servers'='PLAINTEXT://localhost:9092','topic-route-strategy'='PER_DATABASE','topic-route-prefix'='')));

PROPERTIES 里面是给 Kafka 使用的配置,会直接透传给 KafkaProducer,除了topic-route-strategytopic-route-prefix,这两个是 CDC 自己的配置。

属性说明:

  • topic-route-strategy:topic 路由策略,可选项:1)PER_DATABASE,每个逻辑库一个 topic,2)PER_TABLE,每个逻辑表一个 topic。
  • topic-route-prefix:topic 名称前缀。默认为空。

常用 DistSQL:

SHOW STREAMING LIST;

SHOW STREAMING STATUS {jobId};

FAQ #

  1. 分片表经过 CDC 之后是分片表还是单表

单表,CDC 仅是数据捕获和传输,不会捕获和传输结构的变更。