# 各类数据库 CDC 配置指南

## MySQL
MySQL CDC 连接器允许从 MySQL 数据库读取快照数据和增量数据。本文描述了如何准备好 MySQL 侧相关功能。

### 支持的数据库
| Connector                                                    | Database                                                     | Driver              |
| ------------------------------------------------------------ | ------------------------------------------------------------ | ------------------- |
| [mysql-cdc](https://ververica.github.io/flink-cdc-connectors/master/content/connectors/mysql-cdc(ZH).html#) | [MySQL](https://dev.mysql.com/doc): 5.6, 5.7, 8.0.x<br />[RDS MySQL](https://www.aliyun.com/product/rds/mysql): 5.6, 5.7, 8.0.x<br />[PolarDB MySQL](https://www.aliyun.com/product/polardb): 5.6, 5.7, 8.0.x<br />[Aurora MySQL](https://aws.amazon.com/cn/rds/aurora): 5.6, 5.7, 8.0.x<br />[MariaDB](https://mariadb.org/): 10.x<br />[PolarDB X](https://github.com/ApsaraDB/galaxysql): 2.0.1 | JDBC Driver: 8.0.27 |

### 创建用户
CDC connector需要使用 MySQL 用户连接抽取数据。该用户需要拥有所有需要进行 CDC 抽取的数据库的权限。

1. 创建 MySQL 用户:

   ```SQL
   mysql> CREATE USER 'user'@'localhost' IDENTIFIED BY 'password';
   ```

2. 用户赋权:

   ```SQL
   mysql> GRANT SELECT, RELOAD, SHOW DATABASES, REPLICATION SLAVE, REPLICATION CLIENT ON *.* TO 'user' IDENTIFIED BY 'password';
   ```

3. 提交权限

   ```SQL
   mysql> FLUSH PRIVILEGES;
   ```

权限说明:

| 权限关键字                 | 说明                                                         |
| :------------------------- | :----------------------------------------------------------- |
| `SELECT`                   | Enables the connector to select rows from tables in databases. This is used only when performing a snapshot. |
| `RELOAD`                   | Enables the connector the use of the `FLUSH` statement to clear or reload internal caches, flush tables, or acquire locks. This is used only when performing a snapshot. |
| `SHOW DATABASES`           | Enables the connector to see database names by issuing the `SHOW DATABASE` statement. This is used only when performing a snapshot. |
| `REPLICATION SLAVE`        | Enables the connector to connect to and read the MySQL server binlog. |
| `REPLICATION CLIENT`       | Enables the connector the use of the following statements:`SHOW MASTER STATUS``SHOW SLAVE STATUS``SHOW BINARY LOGS`The connector always requires this. |
| `ON`                       | Identifies the database to which the permissions apply.      |
| `TO 'user'`                | Specifies the user to grant the permissions to.              |
| `IDENTIFIED BY 'password'` | Specifies the user’s MySQL password.                         |

### 打开 binlog

1. 检查 `log-bin` 是否已经打开

   ```SQL
   mysql> SELECT variable_value as "BINARY LOGGING STATUS (log-bin) ::"
   FROM information_schema.global_variables WHERE variable_name='log_bin';
   ```

2. 如果是是 `OFF`, 修改 MySQL 配置文件如下配置项:

   ```properties
   server-id         = 223344
   log_bin           = mysql-bin
   binlog_format     = ROW
   binlog_row_image  = FULL
   expire_logs_days  = 10
   ```

3. 再次检查 binlog 是否打开

   ```SQL
   mysql> SELECT variable_value as "BINARY LOGGING STATUS (log-bin) ::"
   FROM information_schema.global_variables WHERE variable_name='log_bin';
   ```

MySQL 配置项说明

| 配置项             | 说明                                                         |
| :----------------- | :----------------------------------------------------------- |
| `server-id`        | The value for the `server-id` must be unique for each server and replication client in the MySQL cluster. During MySQL connector set up, Debezium assigns a unique server ID to the connector. |
| `log_bin`          | The value of `log_bin` is the base name of the sequence of binlog files. |
| `binlog_format`    | The `binlog-format` must be set to `ROW` or `row`.           |
| `binlog_row_image` | The `binlog_row_image` must be set to `FULL` or `full`.      |
| `expire_logs_days` | This is the number of days for automatic binlog file removal. The default is `0`, which means no automatic removal. Set the value to match the needs of your environment. See [MySQL purges binlog files](https://debezium.io/documentation/reference/1.6/connectors/mysql.html#mysql-purges-binlog-files-used-by-debezium). |

## Oracle
Oracle CDC 连接器允许从 Oracle 数据库读取快照数据和增量数据。本文描述了如何准备好 Oracle 侧相关功能。

### 支持的数据库
| Connector                                                    | Database           | Driver                  |
| ------------------------------------------------------------ | ------------------ | ----------------------- |
| [oracle-cdc](https://ververica.github.io/flink-cdc-connectors/master/content/connectors/oracle-cdc.html#) | Oracle: 11, 12, 19 | Oracle Driver: 19.3.0.0 |

### Oracle 准备
首先需要打开 Oracle 数据库的 log archiving 功能,并且准备一个 Oracle 用户,其需要具有对于所有需要进行 CDC 抽取的数据库合适的权限。

1. 打开 log archiving  
   (1.1). 以DBA身份连接Oracle

   ```
   ORACLE_SID=SID
   export ORACLE_SID
   sqlplus /nolog
     CONNECT sys/password AS SYSDBA
   ```

   (1.2). 打开 log archiving

   ```
   alter system set db_recovery_file_dest_size = 10G;
   alter system set db_recovery_file_dest = '/opt/oracle/oradata/recovery_area' scope=spfile;
   shutdown immediate;
   startup mount;
   alter database archivelog;
   alter database open;
   ```

   **注意:**
    - 打开 log archiving 需要重启 Oracle 数据库
    - archived logs 可能会占用大量磁盘空间,建议定期清理过期数据

   (1.3). 检查 log archiving 状态

   ```
   -- Should now "Database log mode: Archive Mode"
   archive log list;
   ```

   **注意:**

   需要在待抽取的表或数据库上开启 supplemental logging,这是为了能抓取 CDC 数据每行的*before* state 。如下说明了如何配置打开该功能。

   ```
   -- Enable supplemental logging for a specific table:
   ALTER TABLE inventory.customers ADD SUPPLEMENTAL LOG DATA (ALL) COLUMNS;
   ```

   ```
   -- Enable supplemental logging for database
   ALTER DATABASE ADD SUPPLEMENTAL LOG DATA;
   ```

2. 创建有权限的 Oracle 用户

   (2.1). 创建 Tablespace

   ```
   sqlplus sys/password@host:port/SID AS SYSDBA;
     CREATE TABLESPACE logminer_tbs DATAFILE '/opt/oracle/oradata/SID/logminer_tbs.dbf' SIZE 25M REUSE AUTOEXTEND ON MAXSIZE UNLIMITED;
     exit;
   ```

   (2.2). 创建用户并赋权

   ```
   sqlplus sys/password@host:port/SID AS SYSDBA;
     CREATE USER flinkuser IDENTIFIED BY flinkpw DEFAULT TABLESPACE LOGMINER_TBS QUOTA UNLIMITED ON LOGMINER_TBS;
     GRANT CREATE SESSION TO flinkuser;
     GRANT SET CONTAINER TO flinkuser;
     GRANT SELECT ON V_$DATABASE to flinkuser;
     GRANT FLASHBACK ANY TABLE TO flinkuser;
     GRANT SELECT ANY TABLE TO flinkuser;
     GRANT SELECT_CATALOG_ROLE TO flinkuser;
     GRANT EXECUTE_CATALOG_ROLE TO flinkuser;
     GRANT SELECT ANY TRANSACTION TO flinkuser;
     GRANT LOGMINING TO flinkuser;
   
     GRANT CREATE TABLE TO flinkuser;
     GRANT LOCK ANY TABLE TO flinkuser;
     GRANT ALTER ANY TABLE TO flinkuser;
     GRANT CREATE SEQUENCE TO flinkuser;
   
     GRANT EXECUTE ON DBMS_LOGMNR TO flinkuser;
     GRANT EXECUTE ON DBMS_LOGMNR_D TO flinkuser;
   
     GRANT SELECT ON V_$LOG TO flinkuser;
     GRANT SELECT ON V_$LOG_HISTORY TO flinkuser;
     GRANT SELECT ON V_$LOGMNR_LOGS TO flinkuser;
     GRANT SELECT ON V_$LOGMNR_CONTENTS TO flinkuser;
     GRANT SELECT ON V_$LOGMNR_PARAMETERS TO flinkuser;
     GRANT SELECT ON V_$LOGFILE TO flinkuser;
     GRANT SELECT ON V_$ARCHIVED_LOG TO flinkuser;
     GRANT SELECT ON V_$ARCHIVE_DEST_STATUS TO flinkuser;
     exit;
   ```

### 对于 CDB database

整体来说和以上非 CDB 数据库操作相似,但是具体命令不尽相同。

1. 打开 log archiving

   ```
   ORACLE_SID=ORCLCDB
   export ORACLE_SID
   sqlplus /nolog
     CONNECT sys/password AS SYSDBA
     alter system set db_recovery_file_dest_size = 10G;
     -- should exist
     alter system set db_recovery_file_dest = '/opt/oracle/oradata/recovery_area' scope=spfile;
     shutdown immediate
     startup mount
     alter database archivelog;
     alter database open;
     -- Should show "Database log mode: Archive Mode"
     archive log list
     exit;
   ```

   **Note:** 这里也需要打开 supplemental logging:

   ```
   -- Enable supplemental logging for a specific table:
   ALTER TABLE inventory.customers ADD SUPPLEMENTAL LOG DATA (ALL) COLUMNS;
   -- Enable supplemental logging for database
   ALTER DATABASE ADD SUPPLEMENTAL LOG DATA;
   ```

2. 创建有权限的 Oracle 用户

   ```
   sqlplus sys/password@//localhost:1521/ORCLCDB as sysdba
     CREATE TABLESPACE logminer_tbs DATAFILE '/opt/oracle/oradata/ORCLCDB/logminer_tbs.dbf' SIZE 25M REUSE AUTOEXTEND ON MAXSIZE UNLIMITED;
     exit
   ```

   ```
   sqlplus sys/password@//localhost:1521/ORCLPDB1 as sysdba
     CREATE TABLESPACE logminer_tbs DATAFILE '/opt/oracle/oradata/ORCLCDB/ORCLPDB1/logminer_tbs.dbf' SIZE 25M REUSE AUTOEXTEND ON MAXSIZE UNLIMITED;
     exit
   ```

   ```
   sqlplus sys/password@//localhost:1521/ORCLCDB as sysdba
     CREATE USER flinkuser IDENTIFIED BY flinkpw DEFAULT TABLESPACE logminer_tbs QUOTA UNLIMITED ON logminer_tbs CONTAINER=ALL;
     GRANT CREATE SESSION TO flinkuser CONTAINER=ALL;
     GRANT SET CONTAINER TO flinkuser CONTAINER=ALL;
     GRANT SELECT ON V_$DATABASE to flinkuser CONTAINER=ALL;
     GRANT FLASHBACK ANY TABLE TO flinkuser CONTAINER=ALL;
     GRANT SELECT ANY TABLE TO flinkuser CONTAINER=ALL;
     GRANT SELECT_CATALOG_ROLE TO flinkuser CONTAINER=ALL;
     GRANT EXECUTE_CATALOG_ROLE TO flinkuser CONTAINER=ALL;
     GRANT SELECT ANY TRANSACTION TO flinkuser CONTAINER=ALL;
     GRANT LOGMINING TO flinkuser CONTAINER=ALL;
     GRANT CREATE TABLE TO flinkuser CONTAINER=ALL;
     -- need not to execute if set scan.incremental.snapshot.enabled=true(default)
     GRANT LOCK ANY TABLE TO flinkuser CONTAINER=ALL;
     GRANT CREATE SEQUENCE TO flinkuser CONTAINER=ALL;
   
     GRANT EXECUTE ON DBMS_LOGMNR TO flinkuser CONTAINER=ALL;
     GRANT EXECUTE ON DBMS_LOGMNR_D TO flinkuser CONTAINER=ALL;
   
     GRANT SELECT ON V_$LOG TO flinkuser CONTAINER=ALL;
     GRANT SELECT ON V_$LOG_HISTORY TO flinkuser CONTAINER=ALL;
     GRANT SELECT ON V_$LOGMNR_LOGS TO flinkuser CONTAINER=ALL;
     GRANT SELECT ON V_$LOGMNR_CONTENTS TO flinkuser CONTAINER=ALL;
     GRANT SELECT ON V_$LOGMNR_PARAMETERS TO flinkuser CONTAINER=ALL;
     GRANT SELECT ON V_$LOGFILE TO flinkuser CONTAINER=ALL;
     GRANT SELECT ON V_$ARCHIVED_LOG TO flinkuser CONTAINER=ALL;
     GRANT SELECT ON V_$ARCHIVE_DEST_STATUS TO flinkuser CONTAINER=ALL;
     exit
   ```

更多信息可以参考: [Setting up Oracle](https://debezium.io/documentation/reference/1.6/connectors/oracle.html#setting-up-oracle)

## DB2
DB2 CDC 连接器允许从 DB2 数据库读取快照数据和增量数据。本文描述了如何准备好 DB2 侧相关功能。

### 支持的数据库

| Connector                                                    | Database                                      | Driver               |
| ------------------------------------------------------------ | --------------------------------------------- | -------------------- |
| [Db2-cdc](https://ververica.github.io/flink-cdc-connectors/master/content/connectors/db2-cdc.html) | [Db2](https://www.ibm.com/products/db2): 11.5 | Db2 Driver: 11.5.0.0 |

### DB2 准备
添加 debezium management udf 到 DB2, 设置 asn agent,添加所需的表
1. 登录 DB2 服务器,以要链接的 DB 用户,将 debezium 提供的 mangement udf包(asncdctools)传到 DB2 机器上, 放到目录 $HOME/asncdctools
2. 使用如下命编译 debezium 提供的 udf
   ```
   cd $HOME/asncdctools
   /opt/ibm/db2/V11.5/samples/c/bldrtn asncdc
   ```
3. 确保 JDBC 可以读取 DB2 元数据目录
   ```
   cd $HOME/sqllib/bnd
   db2 bind db2schema.bnd blocking all grant public sqlerror continue
   ```
4. 连接到数据库安装 Debezium 管理 UDF。假设您以 db2instl 用户身份登录,因此应在该 db2inst1 用户上安装 UDF。注意替换 DB_NAME
   ```
   db2 connect to DB_NAME
   ```
5. 复制 Debezium mangement UDF 并为其设置权限
   ```
   cp $HOME/asncdctools/asncdc $HOME/sqllib/function
   chmod 777 $HOME/sqllib/function
   ```
6. 启用启动和停止 ASN 捕获代理 的 Debezium UDF
   ```
   db2 -tvmf $HOME/asncdctools/asncdc_UDF.sql
   ```
7. 创建 ASN 控制表
   ```
   db2 -tvmf $HOME/asncdctools/asncdctables.sql
   ```
8. 启用将表添加到捕获模式并从捕获模式中删除表的 Debezium UDF
   ```
   db2 -tvmf $HOME/asncdctools/asncdcaddremove.sql
   ```
9. 进入 db2 sql client,注意替换 DB_NAME,启动 ASN 代理
   ```
   db2
   connect to DB_NAME

   VALUES ASNCDC.ASNCDCSERVICES('start','asncdc')
   ```
10. 将表置于捕获模式。为要捕获的每个表调用以下语句。替换 MYSCHEMA 为包含要进入捕获模式的表的模式的名称。同样,替换 MYTABLE 为要进入捕获模式的表的名称
   ```
   CALL ASNCDC.ADDTABLE('test', 'test_mq')
   ```
11. 重新初始化 ASN 服务
   ```
   VALUES ASNCDC.ASNCDCSERVICES('reinit','asncdc')

   重新初始化后看见类似如下字样
   2022-12-08-11.46.53.420967 ASN0600I  "AsnCcmd" : "" : "Initial" : Program "capcmd 11.4.0 (Build 11.5.0.0 Level s1906101300, PTF DYN1906101300AMD64)" is starting.
   ```

更多信息可以参考: [Setting up DB2](https://debezium.io/documentation/reference/1.6/connectors/db2.html#setting-up-db2)