Flink CDC主要关注于从源数据库(如MySQL、PostgreSQL等)捕获数据变更,并将这些变更实时地提供给Flink作业进行处理。Flink CDC的核心优势在于其实时性和一致性。通过捕获数据库的增量变动记录,Flink CDC能够实时地将这些变更数据同步到Flink流处理作业中,从而实现低延迟的数据处理和分析。同时,Flink CDC还保证了数据的一致性,确保在数据处理过程中数据的准确性和完整性。
为了实现这一功能,Flink社区开发了flink-cdc-connectors组件。这是一个可以直接从MySQL、PostgreSQL等数据库读取全量数据和增量变更数据的source组件。通过配置相应的连接器和参数,Flink作业可以连接到源数据库,并实时捕获和处理数据变更。
在使用CDC之前务必要开启MySQL的binlog。
修改my.cnf文件,增加:
# Binlog settings required by Flink CDC (restart mysqld after editing my.cnf).
# NOTE(review): on MySQL 8.x, expire_logs_days is deprecated in favor of
# binlog_expire_logs_seconds — confirm against your server version.
server_id=1
log_bin=mysql-bin
binlog_format=ROW
expire_logs_days=15
binlog_do_db=testdb
<!-- Flink core / streaming / Table API plus the Ververica MySQL CDC connector.
     The XML tags were stripped in the original paste; reconstructed below. -->
<dependency>
    <groupId>org.apache.flink</groupId>
    <artifactId>flink-java</artifactId>
    <version>${flink.version}</version>
</dependency>
<dependency>
    <groupId>org.apache.flink</groupId>
    <artifactId>flink-streaming-java</artifactId>
    <version>${flink.version}</version>
</dependency>
<dependency>
    <groupId>org.apache.flink</groupId>
    <artifactId>flink-table-api-java</artifactId>
    <version>${flink.version}</version>
</dependency>
<dependency>
    <groupId>com.ververica</groupId>
    <artifactId>flink-connector-mysql-cdc</artifactId>
    <version>2.4.0</version>
</dependency>
创建表
-- Demo table captured by Flink CDC; user_id is the primary key.
CREATE TABLE `userdemo` (
    `user_id`   VARCHAR(50) NOT NULL COLLATE 'utf8mb4_general_ci',
    `user_name` VARCHAR(50) NULL DEFAULT NULL COLLATE 'utf8mb4_general_ci',
    `age`       INT(11) NULL DEFAULT '0',
    PRIMARY KEY (`user_id`) USING BTREE
) COLLATE='utf8mb4_general_ci' ENGINE=InnoDB;
import com.ververica.cdc.connectors.mysql.MySqlSource; import com.ververica.cdc.connectors.mysql.table.StartupOptions; import com.ververica.cdc.debezium.DebeziumSourceFunction; import com.ververica.cdc.debezium.JsonDebeziumDeserializationSchema; import org.apache.flink.streaming.api.datastream.DataStreamSource; import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment; /** * @Date: 2024/3/12 10:03 * @Description DataStream API CDC **/ public class FlinkMysqlCdc { public static void main(String[] args) throws Exception { StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(); env.setParallelism(1); DebeziumSourceFunctionsourceFunction = MySqlSource. builder() .hostname("10.168.192.70") .port(3306) .username("root") .password("XXXXX") .databaseList("testdb") // 这里一定要是db.table的形式 .tableList("testdb.userdemo") .serverTimeZone("GMT+8") // .deserializer(new StringDebeziumDeserializationSchema()) .deserializer(new JsonDebeziumDeserializationSchema()) .startupOptions(StartupOptions.initial()) .build(); DataStreamSource dataStreamSource = env.addSource(sourceFunction); dataStreamSource.print(); env.execute("FlinkDSCDC"); } }
运行程序输出内容如下：
{"before":null,"after":{"user_id":"001","user_name":"sdaf","age":23},"source":{"version":"1.9.7.Final","connector":"mysql","name":"mysql_binlog_source","ts_ms":1710228835000,"snapshot":"true","db":"testdb","sequence":null,"table":"userdemo","server_id":0,"gtid":null,"file":"xxx.000002","pos":154,"row":0,"thread":null,"query":null},"op":"r","ts_ms":1710228834716,"transaction":null} {"before":null,"after":{"user_id":"002","user_name":"DSF","age":35},"source":{"version":"1.9.7.Final","connector":"mysql","name":"mysql_binlog_source","ts_ms":1710228835000,"snapshot":"last","db":"testdb","sequence":null,"table":"userdemo","server_id":0,"gtid":null,"file":"xxx.000002","pos":154,"row":0,"thread":null,"query":null},"op":"r","ts_ms":1710228834720,"transaction":null}
插入数据
-- Inserting a row produces a change event with "op":"c" (create) in the CDC stream.
INSERT INTO userdemo (user_id,user_name,age) VALUES('004','wangwu',26);
{"before":null,"after":{"user_id":"004","user_name":"wangwu","age":26},"source":{"version":"1.9.7.Final","connector":"mysql","name":"mysql_binlog_source","ts_ms":1710235648000,"snapshot":"false","db":"testdb","sequence":null,"table":"userdemo","server_id":1,"gtid":null,"file":"xxx.000002","pos":649,"row":0,"thread":7,"query":null},"op":"c","ts_ms":1710235647380,"transaction":null}
修改userdemo数据
-- Updating a row produces a change event with "op":"u" carrying both the
-- "before" and "after" images. Added the missing terminating semicolon for
-- consistency with the INSERT example above.
UPDATE userdemo SET user_name='zhangsan' WHERE user_id='001';
运行结果如下:
{"before":{"user_id":"001","user_name":"sdaf","age":23},"after":{"user_id":"001","user_name":"zhangsan","age":23},"source":{"version":"1.9.7.Final","connector":"mysql","name":"mysql_binlog_source","ts_ms":1710235526000,"snapshot":"false","db":"testdb","sequence":null,"table":"userdemo","server_id":1,"gtid":null,"file":"xxx.000002","pos":352,"row":0,"thread":7,"query":null},"op":"u","ts_ms":1710235525246,"transaction":null}
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment; import org.apache.flink.table.api.bridge.java.StreamTableEnvironment; import org.apache.flink.table.api.*; public class MyFlinkCDCJob { public static void main(String[] args) throws Exception { StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(); StreamTableEnvironment tableEnv = StreamTableEnvironment.create(env); // 配置 MySQL CDC 源 String sourceDDL = "CREATE TABLE my_table (" + " id INT NOT NULL," + " name STRING," + " age INT," + " PRIMARY KEY (id) NOT ENFORCED" + ") WITH (" + " 'connector' = 'mysql-cdc'," + " 'hostname' = 'your_mysql_hostname'," + " 'port' = '3306'," + " 'username' = 'your_username'," + " 'password' = 'your_password'," + " 'database-name' = 'your_database_name'," + " 'table-name' = 'your_table_name'" + ")"; tableEnv.executeSql(sourceDDL); // 定义 Flink 作业逻辑 Table result = tableEnv.sqlQuery("SELECT * FROM my_table"); tableEnv.toRetractStream(result, Row.class).print(); // 执行作业 env.execute("My Flink CDC Job"); } }
在上面的代码中,我们创建了一个名为 my_table 的表,该表通过 MySQL CDC 连接器连接到 MySQL 数据库。然后,我们执行一个 SQL 查询来选择这个表中的所有数据,并将结果打印到控制台。
请注意,你需要替换示例代码中的 'your_mysql_hostname', 'your_username', 'your_password', 'your_database_name', 和 'your_table_name' 为你的实际 MySQL 数据库信息。