相关推荐recommended
Flink实战之 MySQL CDC
作者:mmseoamin日期:2024-04-29

文章目录

  • 前言
  • MySQL 启用binlog
  • 添加maven依赖
  • 使用DataStream API java代码读取CDC数据流
  • 使用Flink SQL读取CDC数据

    前言

    Flink CDC主要关注于从源数据库(如MySQL、PostgreSQL等)捕获数据变更,并将这些变更实时地提供给Flink作业进行处理。Flink CDC的核心优势在于其实时性和一致性。通过捕获数据库的增量变动记录,Flink CDC能够实时地将这些变更数据同步到Flink流处理作业中,从而实现低延迟的数据处理和分析。同时,Flink CDC还保证了数据的一致性,确保在数据处理过程中数据的准确性和完整性。

    为了实现这一功能,Flink社区开发了flink-cdc-connectors组件。这是一个可以直接从MySQL、PostgreSQL等数据库读取全量数据和增量变更数据的source组件。通过配置相应的连接器和参数,Flink作业可以连接到源数据库,并实时捕获和处理数据变更。


    MySQL 启用binlog

    在使用CDC之前务必要开启MySQL的binlog。

    修改my.cnf文件,增加:

    # Binary-log settings to add to my.cnf before using CDC.
    # Numeric ID of this MySQL server.
    server_id=1
    # Turn on binary logging; binlog files use the base name "mysql-bin".
    log_bin=mysql-bin
    # Row-based binlog format.
    binlog_format=ROW
    # Number of days binlog files are kept before they expire.
    expire_logs_days=15
    # Only write changes of the testdb database to the binlog.
    binlog_do_db=testdb
    

    添加maven依赖

            
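            <dependency>
                <groupId>org.apache.flink</groupId>
                <artifactId>flink-java</artifactId>
                <version>${flink.version}</version>
            </dependency>
            <dependency>
                <groupId>org.apache.flink</groupId>
                <artifactId>flink-streaming-java</artifactId>
                <version>${flink.version}</version>
            </dependency>
            <dependency>
                <groupId>org.apache.flink</groupId>
                <artifactId>flink-table-api-java</artifactId>
                <version>${flink.version}</version>
            </dependency>
            <dependency>
                <groupId>com.ververica</groupId>
                <artifactId>flink-connector-mysql-cdc</artifactId>
                <version>2.4.0</version>
            </dependency>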
            
    

    使用DataStream API java代码读取CDC数据流

    创建表

    -- Demo table read by the CDC examples; user_id is the primary key.
    CREATE TABLE `userdemo` (
        `user_id`   VARCHAR(50) COLLATE 'utf8mb4_general_ci' NOT NULL,
        `user_name` VARCHAR(50) COLLATE 'utf8mb4_general_ci' NULL DEFAULT NULL,
        `age`       INT(11) NULL DEFAULT '0',
        PRIMARY KEY (`user_id`) USING BTREE
    ) ENGINE=InnoDB COLLATE='utf8mb4_general_ci';
    
    import com.ververica.cdc.connectors.mysql.MySqlSource;
    import com.ververica.cdc.connectors.mysql.table.StartupOptions;
    import com.ververica.cdc.debezium.DebeziumSourceFunction;
    import com.ververica.cdc.debezium.JsonDebeziumDeserializationSchema;
    import org.apache.flink.streaming.api.datastream.DataStreamSource;
    import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
    /**
     * DataStream API example that reads MySQL change data through the Flink CDC connector.
     *
     * <p>Builds a Debezium-backed source for the {@code testdb.userdemo} table, deserializes
     * every change event to a JSON string and prints it to stdout.
     *
     * <p>Created 2024/3/12 10:03.
     */
    public class FlinkMysqlCdc {
        /**
         * Builds the CDC source, attaches a print sink and runs the job.
         *
         * @param args command-line arguments (unused)
         * @throws Exception if the Flink job fails to execute
         */
        public static void main(String[] args) throws Exception {
            StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
            env.setParallelism(1);

            // The explicit <String> type witness is required: without it the builder is
            // inferred as Builder<Object> and the String-producing JSON deserializer below
            // does not type-check.
            DebeziumSourceFunction<String> sourceFunction = MySqlSource.<String>builder()
                    .hostname("10.168.192.70")
                    .port(3306)
                    .username("root")
                    .password("XXXXX")
                    .databaseList("testdb")
                    // Table names must be given in db.table form.
                    .tableList("testdb.userdemo")
                    .serverTimeZone("GMT+8")
                    // Alternative deserializer: new StringDebeziumDeserializationSchema()
                    .deserializer(new JsonDebeziumDeserializationSchema())
                    .startupOptions(StartupOptions.initial())
                    .build();

            DataStreamSource<String> dataStreamSource = env.addSource(sourceFunction);
            dataStreamSource.print();
            env.execute("FlinkDSCDC");
        }
    }
    

    运行程序输出内容如下:

    {"before":null,"after":{"user_id":"001","user_name":"sdaf","age":23},"source":{"version":"1.9.7.Final","connector":"mysql","name":"mysql_binlog_source","ts_ms":1710228835000,"snapshot":"true","db":"testdb","sequence":null,"table":"userdemo","server_id":0,"gtid":null,"file":"xxx.000002","pos":154,"row":0,"thread":null,"query":null},"op":"r","ts_ms":1710228834716,"transaction":null}
    {"before":null,"after":{"user_id":"002","user_name":"DSF","age":35},"source":{"version":"1.9.7.Final","connector":"mysql","name":"mysql_binlog_source","ts_ms":1710228835000,"snapshot":"last","db":"testdb","sequence":null,"table":"userdemo","server_id":0,"gtid":null,"file":"xxx.000002","pos":154,"row":0,"thread":null,"query":null},"op":"r","ts_ms":1710228834720,"transaction":null}
    

    插入数据

    INSERT INTO  userdemo (user_id,user_name,age) VALUES('004','wangwu',26);
    
    {"before":null,"after":{"user_id":"004","user_name":"wangwu","age":26},"source":{"version":"1.9.7.Final","connector":"mysql","name":"mysql_binlog_source","ts_ms":1710235648000,"snapshot":"false","db":"testdb","sequence":null,"table":"userdemo","server_id":1,"gtid":null,"file":"xxx.000002","pos":649,"row":0,"thread":7,"query":null},"op":"c","ts_ms":1710235647380,"transaction":null}
    

    修改userdemo数据

    -- Change the user_name of the row whose user_id is '001'.
    UPDATE userdemo SET user_name='zhangsan' WHERE user_id='001';
    

    运行结果如下:

    {"before":{"user_id":"001","user_name":"sdaf","age":23},"after":{"user_id":"001","user_name":"zhangsan","age":23},"source":{"version":"1.9.7.Final","connector":"mysql","name":"mysql_binlog_source","ts_ms":1710235526000,"snapshot":"false","db":"testdb","sequence":null,"table":"userdemo","server_id":1,"gtid":null,"file":"xxx.000002","pos":352,"row":0,"thread":7,"query":null},"op":"u","ts_ms":1710235525246,"transaction":null}
    

    使用Flink SQL读取CDC数据流

    import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;  
    import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;  
    import org.apache.flink.table.api.*;  
      
    /**
     * Flink SQL example that reads MySQL change data through the {@code mysql-cdc} connector.
     *
     * <p>Registers a CDC-backed table via DDL, selects every row from it and prints the
     * resulting changelog to stdout.
     */
    public class MyFlinkCDCJob {
        /**
         * Registers the CDC source table, runs {@code SELECT *} on it and prints the changes.
         *
         * @param args command-line arguments (unused)
         * @throws Exception if the Flink job fails to execute
         */
        public static void main(String[] args) throws Exception {
            StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
            // NOTE(review): the connector's documented example enables checkpointing for the
            // mysql-cdc source; confirm the interval suits your deployment.
            env.enableCheckpointing(3000);
            StreamTableEnvironment tableEnv = StreamTableEnvironment.create(env);

            // Declare the MySQL CDC source table; replace the placeholder connection values.
            String sourceDDL = "CREATE TABLE my_table ("
                    + " id INT NOT NULL,"
                    + " name STRING,"
                    + " age INT,"
                    + " PRIMARY KEY (id) NOT ENFORCED"
                    + ") WITH ("
                    + " 'connector' = 'mysql-cdc',"
                    + " 'hostname' = 'your_mysql_hostname',"
                    + " 'port' = '3306',"
                    + " 'username' = 'your_username',"
                    + " 'password' = 'your_password',"
                    + " 'database-name' = 'your_database_name',"
                    + " 'table-name' = 'your_table_name'"
                    + ")";
            tableEnv.executeSql(sourceDDL);

            // Job logic: read every row of the CDC table.
            Table result = tableEnv.sqlQuery("SELECT * FROM my_table");
            // toChangelogStream replaces the deprecated toRetractStream(result, Row.class);
            // it needs no Row class argument, and the original never imported Row.
            tableEnv.toChangelogStream(result).print();

            // Submit the job.
            env.execute("My Flink CDC Job");
        }
    }
    

    在上面的代码中,我们创建了一个名为 my_table 的表,该表通过 MySQL CDC 连接器连接到 MySQL 数据库。然后,我们执行一个 SQL 查询来选择这个表中的所有数据,并将结果打印到控制台。

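    请注意,你需要将示例代码中的 'your_mysql_hostname'、'your_username'、'your_password'、'your_database_name' 和 'your_table_name' 替换为你的实际 MySQL 数据库信息。此外,运行该 Flink SQL 示例还需要在上文依赖之外引入 flink-table-api-java-bridge 和 flink-table-planner 等 Table API 运行依赖(具体 artifactId 视 Flink 版本而定)。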