Flink CDC 是Apache Flink ®的一组源连接器,使用变更数据捕获 (CDC) 从不同数据库中获取变更。Apache Flink 的 CDC Connectors集成 Debezium 作为捕获数据更改的引擎。所以它可以充分发挥 Debezium 的能力。
连接器 | 数据库 | 驱动 |
---|---|---|
mongodb-cdc | MongoDB: 3.6, 4.x, 5.0 | MongoDB Driver: 4.3.4 |
mysql-cdc | MySQL: 5.6, 5.7, 8.0.x、RDS MySQL: 5.6, 5.7, 8.0.x、PolarDB MySQL: 5.6, 5.7, 8.0.x、Aurora MySQL: 5.6, 5.7, 8.0.x、MariaDB: 10.x、PolarDB X: 2.0.1 | JDBC Driver: 8.0.28 |
oceanbase-cdc | OceanBase CE: 3.1.x, 4.x、OceanBase EE: 2.x, 3.x, 4.x | OceanBase Driver: 2.4.x |
oracle-cdc | Oracle: 11, 12, 19, 21 | Oracle Driver: 19.3.0.0 |
postgres-cdc | PostgreSQL: 9.6, 10, 11, 12, 13, 14 | JDBC Driver: 42.5.1 |
sqlserver-cdc | Sqlserver: 2012, 2014, 2016, 2017, 2019 | JDBC Driver: 9.4.1.jre8 |
tidb-cdc | TiDB: 5.1.x, 5.2.x, 5.3.x, 5.4.x, 6.0.0 | JDBC Driver: 8.0.27 |
db2-cdc | Db2: 11.5 | Db2 Driver: 11.5.0.0 |
vitess-cdc | Vitess: 8.0.x, 9.0.x | MySql JDBC Driver: 8.0.26 |
下表显示了 Flink CDC Connectors 与 Flink ®的版本对应关系:
Flink CDC版本_ | Flink 版本_ |
---|---|
1.0.0 | 1.11.* |
1.1.0 | 1.11.* |
1.2.0 | 1.12.* |
1.3.0 | 1.12.* |
1.4.0 | 1.13.* |
2.0.* | 1.13.* |
2.1.* | 1.13.* |
2.2.* | 1.13.*、1.14.* |
2.3.* | 1.13.*、1.14.*、1.15.*、1.16.0 |
2.4.* | 1.13.*、1.14*、1.15.*、1.16.*、1.17.0 |
支持读取数据库快照,即使出现故障也能继续读取binlog,并进行Exactly-once处理。
DataStream API 的 CDC 连接器,用户可以在单个作业中使用多个数据库和表的更改,而无需部署 Debezium 和 Kafka。
Table/SQL API 的 CDC 连接器,用户可以使用 SQL DDL 创建 CDC 源来监视单个表上的更改。
我们需要几个步骤来使用提供的连接器设置 Flink 集群。
首先我们安装了 1.17+ 版本的 Flink 集群(java 8+)。
注意: 如果需要安装Flink请查看笔者对应的博客 flink高可用集群搭建(Standalone模式)
本文用到的jar包flink-connector-jdbc-3.1.1-1.17.jar和flink-sql-connector-mysql-cdc-2.2.1.jar
下载 连接器 SQL jar (或自行构建)。
将下载的jar包放在FLINK_HOME/lib/.
重启Flink集群。
注意:目前2.4以上版本需要进行自行编译构建。本文笔者自行进行构建上传的
本教程将展示如何使用 Flink CDC 快速构建 MySQL的流式 ETL。
假设我们将产品数据存储在MySQL中,同步到另外一个MySQL中
在下面的章节中,我们将介绍如何使用 Flink Mysql CDC 来实现它。本教程中的所有练习均在 Flink SQL CLI 中进行,整个过程使用标准 SQL 语法,无需任何 Java/Scala 代码,也无需安装 IDE。
架构概述如下:
需要准备安装好的MySQL数据库,具体MySQL数据怎么安装请查看笔者的博客Ubuntu数据库安装(mysql)
注意: 如果是其他操作系统请查看其他博客对应的数据库安装教程
使用以下命令启动 Flink SQL CLI:
./bin/sql-client.sh
我们应该看到 CLI 客户端的欢迎屏幕。
首先,每 3 秒启用一次检查点
-- Flink SQL Flink SQL> SET execution.checkpointing.interval = 3s;
编辑源数据库Flink Sql代码,如下所示:
CREATE TABLE products ( id INT NOT NULL, name STRING, description STRING, PRIMARY KEY(id) NOT ENFORCED ) WITH ( 'connector' = 'mysql-cdc', #引入的CDC jar包驱动,没有引入会报错提示需要引入 'hostname' = '192.168.50.163',#源数据库连接host地址,可以根据自己的具体设置,此处为笔者本机的 'port' = '3306', #源数据库端口 'username' = 'root',#源数据库账号 'password' = '*****',#源数据库密码 'database-name' = 'mydb',#源数据库 'table-name' = 'products'#源数据库表 );
在Flink SQL 执行以下语句创建从相应数据库表捕获更改数据的表
-- Flink SQL Flink SQL> CREATE TABLE products ( id INT, name STRING, description STRING, PRIMARY KEY (id) NOT ENFORCED ) WITH ( 'connector' = 'mysql-cdc', 'hostname' = '192.168.50.163', 'port' = '3306', 'username' = 'root', 'password' = '****', 'database-name' = 'mydb', 'table-name' = 'products' );
编辑目标数据库Flink Sql代码,如下所示:
CREATE TABLE product ( id INT, name STRING, description STRING, PRIMARY KEY (id) NOT ENFORCED ) WITH ( #引入的jdbc jar包驱动,没有引入会报错提示需要引入 flink-connector-jdbc 'connector' = 'jdbc', #目标数据库连接url地址,可以根据自己的具体设置,此处为笔者本机的。部分高版本的MySQL需要添加useSSL=false&allowPublicKeyRetrieval=true&serverTimezone=UTC 'url' = 'jdbc:mysql://192.168.50.163:3306/mydb1?useSSL=false&allowPublicKeyRetrieval=true&serverTimezone=UTC', #需要访问的数据库驱动 'driver' = 'com.mysql.cj.jdbc.Driver', #目标数据库账号 'username' = 'root', #目标据库密码 'password' = '***', #目标数据库表 'table-name' = 'product' );
在Flink SQL 执行以下语句创建捕获更改数据的表与目标数据库表的映射关系
-- Flink SQL Flink SQL> CREATE TABLE product ( id INT, name STRING, description STRING, PRIMARY KEY (id) NOT ENFORCED ) WITH ( 'connector' = 'jdbc', 'url' = 'jdbc:mysql://192.168.50.163:3306/mydb1?useSSL=false&allowPublicKeyRetrieval=true&serverTimezone=UTC', 'driver' = 'com.mysql.cj.jdbc.Driver', 'username' = 'root', 'password' = 'root', 'table-name' = 'product' );
使用Flink SQL将表product与 表查询products表写入目标MySQL。
-- Flink SQL Flink SQL> insert into product select * from products;
具体操作步骤如下所示:
这是源数据库,操作添加数据,如下图所示:
目标数据库同步操作如下图
红框勾选为运行的同步任务
至此Flink CDC MySQL同步MySQL第一节讲解完毕,后面会更新其复杂操作