Flink CDC project page
Flink official website
CDC stands for Change Data Capture. Concretely, it is the process of identifying and capturing changes made to data in a database (inserts, updates, and deletes of rows or tables, as well as schema changes), recording those changes completely and in the order they occur, and delivering that ordered change stream in real time to downstream processes or systems over an intermediate transport (message middleware, TCP, and so on).
CDC Connectors for Apache Flink is a set of source connectors for Apache Flink that ingest changes from different databases using change data capture (CDC). The connectors integrate Debezium as the engine that captures data changes, so they can take full advantage of Debezium's capabilities. See the Debezium documentation for more on what Debezium is.
```java
import org.apache.flink.api.common.eventtime.WatermarkStrategy;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import com.ververica.cdc.debezium.JsonDebeziumDeserializationSchema;
import com.ververica.cdc.connectors.mysql.source.MySqlSource;

public class MySqlBinlogSourceExample {
    public static void main(String[] args) throws Exception {
        MySqlSource<String> mySqlSource = MySqlSource.<String>builder()
                .hostname("yourHostname")
                .port(yourPort)
                .databaseList("yourDatabaseName")            // set captured database
                .tableList("yourDatabaseName.yourTableName") // set captured table
                .username("yourUsername")
                .password("yourPassword")
                .deserializer(new JsonDebeziumDeserializationSchema()) // converts SourceRecord to JSON String
                .build();

        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

        // enable checkpoint
        env.enableCheckpointing(3000);

        env.fromSource(mySqlSource, WatermarkStrategy.noWatermarks(), "MySQL Source")
                // set 4 parallel source tasks
                .setParallelism(4)
                .print().setParallelism(1); // use parallelism 1 for sink to keep message ordering

        env.execute("Print MySQL Snapshot + Binlog");
    }
}
```
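Since the example uses JsonDebeziumDeserializationSchema, each printed record is a Debezium change-event envelope serialized as a JSON string. An abbreviated sketch of what an UPDATE looks like (table and column names are hypothetical, and most `source` metadata fields are omitted):

```json
{
  "before": { "id": 1, "name": "Alice" },
  "after":  { "id": 1, "name": "Bob" },
  "source": {
    "db": "yourDatabaseName",
    "table": "yourTableName",
    "file": "mysql-bin.000003",
    "pos": 4544
  },
  "op": "u",
  "ts_ms": 1697011200000
}
```

The before/after row images and the `source.file`/`source.pos` fields are exactly what the custom deserializer later in this article extracts.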
pom.xml properties and dependencies:

```xml
<properties>
    <java.version>1.8</java.version>
    <project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
    <project.reporting.outputEncoding>UTF-8</project.reporting.outputEncoding>
    <spring-boot.version>2.3.12.RELEASE</spring-boot.version>
    <log4j2.version>2.17.0</log4j2.version>
    <mybatis-plus.version>3.4.1</mybatis-plus.version>
    <!-- one further property (value 4.1.0) appeared here; its name did not survive extraction -->
    <flink.version>1.16.0</flink.version>
</properties>

<dependencies>
    <!-- web starter, with the default logging starter excluded in favor of log4j2 -->
    <dependency>
        <groupId>org.springframework.boot</groupId>
        <artifactId>spring-boot-starter-web</artifactId>
        <exclusions>
            <exclusion>
                <groupId>org.springframework.boot</groupId>
                <artifactId>spring-boot-starter-logging</artifactId>
            </exclusion>
        </exclusions>
    </dependency>
    <dependency>
        <groupId>org.apache.logging.log4j</groupId>
        <artifactId>log4j-core</artifactId>
        <version>${log4j2.version}</version>
    </dependency>
    <dependency>
        <groupId>org.apache.logging.log4j</groupId>
        <artifactId>log4j-api</artifactId>
        <version>${log4j2.version}</version>
    </dependency>
    <dependency>
        <groupId>org.apache.logging.log4j</groupId>
        <artifactId>log4j-slf4j-impl</artifactId>
        <version>${log4j2.version}</version>
    </dependency>
    <dependency>
        <groupId>org.apache.logging.log4j</groupId>
        <artifactId>log4j-1.2-api</artifactId>
        <version>${log4j2.version}</version>
    </dependency>
    <dependency>
        <groupId>org.springframework.boot</groupId>
        <artifactId>spring-boot-starter-log4j2</artifactId>
    </dependency>
    <dependency>
        <groupId>com.github.pagehelper</groupId>
        <artifactId>pagehelper-spring-boot-starter</artifactId>
        <version>1.4.1</version>
    </dependency>
    <dependency>
        <groupId>mysql</groupId>
        <artifactId>mysql-connector-java</artifactId>
        <version>8.0.29</version>
    </dependency>
    <dependency>
        <groupId>com.alibaba</groupId>
        <artifactId>druid-spring-boot-starter</artifactId>
        <version>1.2.4</version>
    </dependency>
    <dependency>
        <groupId>com.alibaba</groupId>
        <artifactId>easyexcel</artifactId>
        <version>3.1.1</version>
    </dependency>
    <dependency>
        <groupId>org.apache.shardingsphere</groupId>
        <artifactId>sharding-jdbc-spring-boot-starter</artifactId>
        <version>4.1.1</version>
    </dependency>
    <dependency>
        <groupId>com.baomidou</groupId>
        <artifactId>mybatis-plus-boot-starter</artifactId>
        <version>${mybatis-plus.version}</version>
    </dependency>
    <dependency>
        <groupId>com.baomidou</groupId>
        <artifactId>mybatis-plus-generator</artifactId>
        <version>${mybatis-plus.version}</version>
    </dependency>
    <dependency>
        <groupId>org.apache.velocity</groupId>
        <artifactId>velocity-engine-core</artifactId>
        <version>2.3</version>
    </dependency>
    <dependency>
        <groupId>com.baomidou</groupId>
        <artifactId>dynamic-datasource-spring-boot-starter</artifactId>
        <version>3.6.1</version>
    </dependency>
    <dependency>
        <groupId>org.springframework.boot</groupId>
        <artifactId>spring-boot-starter-validation</artifactId>
    </dependency>
    <dependency>
        <groupId>org.apache.httpcomponents</groupId>
        <artifactId>httpclient</artifactId>
        <version>4.5.14</version>
    </dependency>
    <dependency>
        <groupId>com.alibaba</groupId>
        <artifactId>fastjson</artifactId>
        <version>1.2.76</version>
    </dependency>
    <dependency>
        <groupId>com.aliyun.oss</groupId>
        <artifactId>aliyun-sdk-oss</artifactId>
        <version>3.15.0</version>
    </dependency>
    <dependency>
        <groupId>com.xuxueli</groupId>
        <artifactId>xxl-job-core</artifactId>
        <version>2.4.0</version>
    </dependency>
    <!-- Flink and the MySQL CDC connector -->
    <dependency>
        <groupId>org.apache.flink</groupId>
        <artifactId>flink-streaming-java</artifactId>
        <version>${flink.version}</version>
    </dependency>
    <dependency>
        <groupId>org.apache.flink</groupId>
        <artifactId>flink-clients</artifactId>
        <version>${flink.version}</version>
    </dependency>
    <dependency>
        <groupId>com.ververica</groupId>
        <artifactId>flink-connector-mysql-cdc</artifactId>
        <version>2.3.0</version>
        <exclusions>
            <exclusion>
                <groupId>mysql</groupId>
                <artifactId>mysql-connector-java</artifactId>
            </exclusion>
        </exclusions>
    </dependency>
    <dependency>
        <groupId>org.apache.flink</groupId>
        <artifactId>flink-table-api-java</artifactId>
        <version>1.12.1</version>
    </dependency>
</dependencies>
```
The dependencies that actually matter for CDC are the MySQL driver, the CDC connector (which excludes its own transitive driver in favor of the explicit one), and the Table API module:

```xml
<dependency>
    <groupId>mysql</groupId>
    <artifactId>mysql-connector-java</artifactId>
    <version>8.0.29</version>
</dependency>
<dependency>
    <groupId>com.ververica</groupId>
    <artifactId>flink-connector-mysql-cdc</artifactId>
    <version>2.3.0</version>
    <exclusions>
        <exclusion>
            <groupId>mysql</groupId>
            <artifactId>mysql-connector-java</artifactId>
        </exclusion>
    </exclusions>
</dependency>
<dependency>
    <groupId>org.apache.flink</groupId>
    <artifactId>flink-table-api-java</artifactId>
    <version>1.12.1</version>
</dependency>
```
If the connector version is too low, you will hit: Caused by: java.sql.SQLSyntaxErrorException: Access denied; you need (at least one of) the RELOAD privilege(s) for this operation.
To resolve this, you can proceed as follows:
Make sure the database user you connect with has the RELOAD privilege. Log in to the MySQL database and grant it; for example, to grant RELOAD to the user your_user:
```sql
GRANT RELOAD ON *.* TO 'your_user'@'localhost';
```
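To check that the grant took effect, you can inspect the user's privileges. Note that a MySQL CDC user typically also needs SELECT, REPLICATION SLAVE, and REPLICATION CLIENT to read the binlog, and the host part ('localhost' here) must match how the Flink job actually connects:

```sql
SHOW GRANTS FOR 'your_user'@'localhost';
```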
How Flink officially solves the problem above
The MySQL CDC source uses an incremental snapshot algorithm that avoids database locks, so the RELOAD privilege is not required.
Starting with Flink 1.12, Flink introduced integration and support for MySQL CDC. From this version on, the flink-connector-mysql-cdc module implements Change Data Capture against MySQL.
In Flink 1.12, the MySQL CDC source captures data changes using the incremental snapshot algorithm and does not need the RELOAD privilege. This approach avoids database locks and provides low-latency change capture.
Enable it in code by setting scan.incremental.snapshot.enabled; see the full code example below for details.
```java
Configuration config = new Configuration();
// enable incremental snapshot
config.setBoolean("scan.incremental.snapshot.enabled", true);
env.configure(config);
```
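For comparison, the same switch is exposed as a connector option when the source is declared through the Flink SQL / Table API (the pom above already pulls in flink-table-api-java). A sketch with a hypothetical orders table; in flink-connector-mysql-cdc 2.x the option defaults to true:

```sql
-- Hypothetical table; only the WITH options matter here.
CREATE TABLE orders_cdc (
  id BIGINT,
  order_no STRING,
  PRIMARY KEY (id) NOT ENFORCED
) WITH (
  'connector' = 'mysql-cdc',
  'hostname' = 'yourHostname',
  'port' = '3306',
  'username' = 'yourUsername',
  'password' = 'yourPassword',
  'database-name' = 'yourDatabaseName',
  'table-name' = 'yourTableName',
  'scan.incremental.snapshot.enabled' = 'true'
);
```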
Create a MysqlEventListener class that implements ApplicationRunner, so the MySQL listener starts when the application boots (it must be registered as a Spring bean, e.g. with @Component, for run() to be invoked on startup):
```java
import com.ververica.cdc.connectors.mysql.source.MySqlSource;
import com.ververica.cdc.connectors.mysql.table.StartupOptions;
import org.apache.flink.api.common.eventtime.WatermarkStrategy;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.springframework.boot.ApplicationArguments;
import org.springframework.boot.ApplicationRunner;
import org.springframework.stereotype.Component;

/**
 * MySQL change listener: starts consuming the binlog when the application boots.
 * Date: 2023/10/11
 */
@Component
public class MysqlEventListener implements ApplicationRunner {

    @Override
    public void run(ApplicationArguments args) throws Exception {
        MySqlSource<DataChangeInfo> mySqlSource = MySqlSource.<DataChangeInfo>builder()
                .hostname("yourHostname")
                .port(3306)
                .databaseList("yourDatabaseName")            // database(s) to capture; to sync a whole database, set tableList to ".*"
                .tableList("yourDatabaseName.yourTableName") // table(s) to capture, as database.table
                .username("yourUsername")
                .password("yourPassword")
                .deserializer(new MysqlDeserialization())    // convert SourceRecord into our own DataChangeInfo object
                /*
                 * initial:   snapshot first, then incremental reads (full import followed by change capture)
                 * latest:    incremental reads only (history is not read)
                 * timestamp: read changes whose timestamp is >= the given timestamp
                 */
                .startupOptions(StartupOptions.latest())
                .build();

        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        Configuration config = new Configuration();
        // enable incremental snapshot
        config.setBoolean("scan.incremental.snapshot.enabled", true);
        env.configure(config);
        env.setParallelism(1);

        DataStreamSource<DataChangeInfo> streamSource = env
                .fromSource(mySqlSource, WatermarkStrategy.noWatermarks(), "MySQL Source")
                .setParallelism(1);
        streamSource.addSink(new DataChangeSink());

        env.execute("mysql-stream-cdc");
    }
}
```
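One caveat: env.execute() blocks until the Flink job terminates, so run() never returns and Spring Boot startup hangs on this runner. If that matters for your deployment, a minimal non-blocking variant is to submit the job with executeAsync() (available since Flink 1.11):

```java
// Inside run(), replace the final blocking env.execute(...) with an async submit.
// JobClient is org.apache.flink.core.execution.JobClient.
org.apache.flink.core.execution.JobClient jobClient = env.executeAsync("mysql-stream-cdc");
// executeAsync() returns as soon as the job is submitted; the handle can be
// kept to inspect or stop the job later, e.g. jobClient.getJobStatus() or jobClient.cancel().
```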
```java
import com.alibaba.fastjson.JSONObject;
import com.ververica.cdc.debezium.DebeziumDeserializationSchema;
import io.debezium.data.Envelope;
import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.util.Collector;
import org.apache.kafka.connect.data.Field;
import org.apache.kafka.connect.data.Schema;
import org.apache.kafka.connect.data.Struct;
import org.apache.kafka.connect.source.SourceRecord;

import java.util.List;
import java.util.Optional;

/**
 * Custom MySQL deserializer: maps Debezium SourceRecords to DataChangeInfo.
 * Date: 2023/10/11
 */
public class MysqlDeserialization implements DebeziumDeserializationSchema<DataChangeInfo> {

    public static final String TS_MS = "ts_ms";
    public static final String BIN_FILE = "file";
    public static final String POS = "pos";
    public static final String CREATE = "CREATE";
    public static final String BEFORE = "before";
    public static final String AFTER = "after";
    public static final String SOURCE = "source";
    public static final String UPDATE = "UPDATE";

    /**
     * Deserialize a SourceRecord into our own DataChangeInfo object.
     */
    @Override
    public void deserialize(SourceRecord sourceRecord, Collector<DataChangeInfo> collector) throws Exception {
        // the topic has the form <topic-prefix>.<database>.<table>
        String topic = sourceRecord.topic();
        String[] fields = topic.split("\\.");
        String database = fields[1];
        String tableName = fields[2];

        Struct struct = (Struct) sourceRecord.value();
        final Struct source = struct.getStruct(SOURCE);

        DataChangeInfo dataChangeInfo = new DataChangeInfo();
        dataChangeInfo.setBeforeData(getJsonObject(struct, BEFORE).toJSONString());
        dataChangeInfo.setAfterData(getJsonObject(struct, AFTER).toJSONString());

        // operation type: CREATE / UPDATE / DELETE
        Envelope.Operation operation = Envelope.operationFor(sourceRecord);
        String type = operation.toString().toUpperCase();
        int eventType = CREATE.equals(type) ? 1 : UPDATE.equals(type) ? 2 : 3;
        dataChangeInfo.setEventType(eventType);

        dataChangeInfo.setFileName(Optional.ofNullable(source.get(BIN_FILE)).map(Object::toString).orElse(""));
        dataChangeInfo.setFilePos(Optional.ofNullable(source.get(POS)).map(x -> Integer.parseInt(x.toString())).orElse(0));
        dataChangeInfo.setDatabase(database);
        dataChangeInfo.setTableName(tableName);
        dataChangeInfo.setChangeTime(Optional.ofNullable(struct.get(TS_MS)).map(x -> Long.parseLong(x.toString())).orElseGet(System::currentTimeMillis));

        // emit the record downstream
        collector.collect(dataChangeInfo);
    }

    /**
     * Extract the before- or after-image of the row from the record value.
     */
    private JSONObject getJsonObject(Struct value, String fieldElement) {
        Struct element = value.getStruct(fieldElement);
        JSONObject jsonObject = new JSONObject();
        if (element != null) {
            Schema afterSchema = element.schema();
            List<Field> fieldList = afterSchema.fields();
            for (Field field : fieldList) {
                Object afterValue = element.get(field);
                jsonObject.put(field.name(), afterValue);
            }
        }
        return jsonObject;
    }

    @Override
    public TypeInformation<DataChangeInfo> getProducedType() {
        return TypeInformation.of(DataChangeInfo.class);
    }
}
```
```java
import lombok.Data;

/**
 * Data-change record.
 * Date: 2023/10/11
 */
@Data
public class DataChangeInfo {

    /** row image before the change (JSON) */
    private String beforeData;

    /** row image after the change (JSON) */
    private String afterData;

    /** change type: 1 = insert, 2 = update, 3 = delete */
    private Integer eventType;

    /** binlog file name */
    private String fileName;

    /** current read position within the binlog */
    private Integer filePos;

    /** database name */
    private String database;

    /** table name */
    private String tableName;

    /** change timestamp */
    private Long changeTime;
}
```
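To make the mapping concrete, this is roughly the DataChangeInfo the deserializer above would emit for a hypothetical UPDATE test.user SET name = 'Bob' WHERE id = 1 (all values are illustrative, not captured output):

```java
// Illustrative only: not real captured output.
DataChangeInfo info = new DataChangeInfo();
info.setBeforeData("{\"id\":1,\"name\":\"Alice\"}"); // row image before the update
info.setAfterData("{\"id\":1,\"name\":\"Bob\"}");    // row image after the update
info.setEventType(2);                  // 1 = insert, 2 = update, 3 = delete
info.setFileName("mysql-bin.000003");  // binlog file the event came from
info.setFilePos(4544);                 // position within that binlog file
info.setDatabase("test");
info.setTableName("user");
info.setChangeTime(1697011200000L);    // Debezium ts_ms, epoch milliseconds
```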
```java
import lombok.extern.slf4j.Slf4j;
import org.apache.flink.streaming.api.functions.sink.SinkFunction;

/**
 * Sink that handles the change records.
 * Date: 2023/10/11
 */
@Slf4j
public class DataChangeSink implements SinkFunction<DataChangeInfo> {

    @Override
    public void invoke(DataChangeInfo value, Context context) throws Exception {
        log.info("received change record: {}", value);
        // business code goes here
    }
}
```
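Where the business-code comment sits, a typical next step is to branch on eventType and parse the row images with fastjson (already on the classpath via the pom above). A minimal sketch; the downstream actions are placeholders:

```java
// Sketch of the "business code" section of DataChangeSink.invoke().
// Assumes: import com.alibaba.fastjson.JSONObject;
JSONObject after = JSONObject.parseObject(value.getAfterData());
switch (value.getEventType()) {
    case 1:
        // insert: e.g. push the new row ("after" image) to a cache or search index
        break;
    case 2:
        // update: e.g. diff the before/after images and update downstream state
        JSONObject before = JSONObject.parseObject(value.getBeforeData());
        break;
    case 3:
        // delete: e.g. evict the row downstream (the "after" image is empty here)
        break;
    default:
        log.warn("unknown eventType: {}", value.getEventType());
}
```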
The application starts successfully.
Before the change:
After the change (click Save):
The listener output in the service logs: