Flink supports quite a few ways of joining data, mainly the following three kinds:

A finer-grained breakdown of the Joins supported by Flink SQL:
Definition of Regular Join (supported in both Batch and Streaming mode): a Regular Join is essentially the same as a regular join in offline Hive SQL; two streams are correlated on a join condition and the matched results are emitted.
Use cases: joins are used extensively when building a data warehouse. An offline warehouse can hardly do without them, and the same holds for a real-time warehouse, for example joining logs with dimension data to build wide tables, or joining logs by ID to compute CTR.
Regular Join comes in the Inner, Left, Right, and Full variants (with L denoting a record from the left stream and R a record from the right stream); the examples below cover Inner, Left, and Full, with a short Right Join sketch after the Full Join example.
Example: join the impression (show) log with the click log to keep only the records that have both an impression and a click.
```sql
-- Impression (show) log
CREATE TABLE show_log_table (
    log_id BIGINT,
    show_params STRING
) WITH (
    'connector' = 'datagen',
    'rows-per-second' = '2',
    'fields.show_params.length' = '1',
    'fields.log_id.min' = '1',
    'fields.log_id.max' = '100'
);

-- Click log
CREATE TABLE click_log_table (
    log_id BIGINT,
    click_params STRING
) WITH (
    'connector' = 'datagen',
    'rows-per-second' = '2',
    'fields.click_params.length' = '1',
    'fields.log_id.min' = '1',
    'fields.log_id.max' = '10'
);

CREATE TABLE sink_table (
    s_id BIGINT,
    s_params STRING,
    c_id BIGINT,
    c_params STRING
) WITH (
    'connector' = 'print'
);

-- INNER JOIN of the two streams on log_id
INSERT INTO sink_table
SELECT
    show_log_table.log_id AS s_id,
    show_log_table.show_params AS s_params,
    click_log_table.log_id AS c_id,
    click_log_table.click_params AS c_params
FROM show_log_table
INNER JOIN click_log_table ON show_log_table.log_id = click_log_table.log_id;
```
The output is as follows:

```
+I[5, d, 5, f]
+I[5, d, 5, 8]
+I[5, d, 5, 2]
+I[3, 4, 3, 0]
+I[3, 4, 3, 3]
...
```
Left Join example: every impression is emitted, and impressions without a matching click are padded with null click fields.

```sql
CREATE TABLE show_log_table (
    log_id BIGINT,
    show_params STRING
) WITH (
    'connector' = 'datagen',
    'rows-per-second' = '1',
    'fields.show_params.length' = '3',
    'fields.log_id.min' = '1',
    'fields.log_id.max' = '10'
);

CREATE TABLE click_log_table (
    log_id BIGINT,
    click_params STRING
) WITH (
    'connector' = 'datagen',
    'rows-per-second' = '1',
    'fields.click_params.length' = '3',
    'fields.log_id.min' = '1',
    'fields.log_id.max' = '10'
);

CREATE TABLE sink_table (
    s_id BIGINT,
    s_params STRING,
    c_id BIGINT,
    c_params STRING
) WITH (
    'connector' = 'print'
);

INSERT INTO sink_table
SELECT
    show_log_table.log_id AS s_id,
    show_log_table.show_params AS s_params,
    click_log_table.log_id AS c_id,
    click_log_table.click_params AS c_params
FROM show_log_table
LEFT JOIN click_log_table ON show_log_table.log_id = click_log_table.log_id;
```
The output is as follows:

```
+I[5, f3c, 5, c05]
+I[5, 6e2, 5, 1f6]
+I[5, 86b, 5, 1f6]
+I[5, f3c, 5, 1f6]
-D[3, 4ab, null, null]
-D[3, 6f2, null, null]
+I[3, 4ab, 3, 765]
+I[3, 6f2, 3, 765]
+I[2, 3c4, null, null]
+I[3, 4ab, 3, a8b]
+I[3, 6f2, 3, a8b]
+I[2, c03, null, null]
...
```

Note the -D retract messages: the rows for log_id 3 are first emitted with null click fields, and once a matching click arrives those rows are retracted and re-emitted as joined rows. This is the retract stream referred to later in the Interval Join section.
Full Join example: unmatched records from either side are emitted with nulls on the other side.

```sql
CREATE TABLE show_log_table (
    log_id BIGINT,
    show_params STRING
) WITH (
    'connector' = 'datagen',
    'rows-per-second' = '2',
    'fields.show_params.length' = '1',
    'fields.log_id.min' = '1',
    'fields.log_id.max' = '10'
);

CREATE TABLE click_log_table (
    log_id BIGINT,
    click_params STRING
) WITH (
    'connector' = 'datagen',
    'rows-per-second' = '2',
    'fields.click_params.length' = '1',
    'fields.log_id.min' = '1',
    'fields.log_id.max' = '10'
);

CREATE TABLE sink_table (
    s_id BIGINT,
    s_params STRING,
    c_id BIGINT,
    c_params STRING
) WITH (
    'connector' = 'print'
);

INSERT INTO sink_table
SELECT
    show_log_table.log_id AS s_id,
    show_log_table.show_params AS s_params,
    click_log_table.log_id AS c_id,
    click_log_table.click_params AS c_params
FROM show_log_table
FULL JOIN click_log_table ON show_log_table.log_id = click_log_table.log_id;
```
The output is as follows:

```
+I[null, null, 7, 6]
+I[6, 5, null, null]
-D[1, c, null, null]
+I[1, c, 1, 2]
+I[3, 1, null, null]
+I[null, null, 7, d]
+I[10, 0, null, null]
+I[null, null, 2, 6]
-D[null, null, 7, 6]
-D[null, null, 7, d]
...
```
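The examples above cover the Inner, Left, and Full variants. For completeness, a Right Join has exactly the same shape; the sketch below reuses the show_log_table, click_log_table, and sink_table definitions from the Full Join example above.

```sql
-- Right Join sketch: every click is emitted; clicks without a matching
-- impression are padded with nulls on the impression side.
INSERT INTO sink_table
SELECT
    show_log_table.log_id AS s_id,
    show_log_table.show_params AS s_params,
    click_log_table.log_id AS c_id,
    click_log_table.click_params AS c_params
FROM show_log_table
RIGHT JOIN click_log_table ON show_log_table.log_id = click_log_table.log_id;
```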
Notes on Regular Join:
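In streaming mode a Regular Join keeps the state of both input streams indefinitely, so that a late-arriving record on either side can still be matched; in practice this state is usually bounded with a TTL. A minimal sketch, assuming the Flink SQL client and the standard table.exec.state.ttl option (the 24-hour value is only an illustration):

```sql
-- Bound how long Regular Join state is kept. Without a TTL the state of both
-- input streams grows without limit. Expiring state trades completeness for
-- bounded size: matches that arrive after the TTL has passed will be missed.
SET 'table.exec.state.ttl' = '24 h';
```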
Definition of Interval Join (supported in both Batch and Streaming mode): there is no Interval Join in the offline world. An Interval Join lets one stream join the other stream's data that falls within a time interval before or after its own rows.
Use cases: why do we need Interval Join when we already have Regular Join? As the examples above showed, a Regular Join produces a retract stream, but in a real-time warehouse the sink is usually a message queue such as Kafka, followed by engines such as ClickHouse, and those engines cannot handle retract streams. So, in the author's view, the point of Interval Join is to get rid of the retract stream.
Interval Join comes in the following variants (with L denoting a record from the left stream and R a record from the right stream): Inner, Left, Right, and Full Interval Join.
The difference between Inner Interval Join and the three Outer Interval Joins is that, as time passes and data expires out of the join interval, the Outer variants also emit the records that never found a match (padded with nulls on the other side).
Example: the same scenario as before, joining the impression log with the click log to keep records that have both an impression and a click, with the condition that the click happens within 4 hours after the impression.
```sql
CREATE TABLE show_log_table (
    log_id BIGINT,
    show_params STRING,
    row_time AS CAST(CURRENT_TIMESTAMP AS TIMESTAMP(3)),
    WATERMARK FOR row_time AS row_time
) WITH (
    'connector' = 'datagen',
    'rows-per-second' = '1',
    'fields.show_params.length' = '1',
    'fields.log_id.min' = '1',
    'fields.log_id.max' = '10'
);

CREATE TABLE click_log_table (
    log_id BIGINT,
    click_params STRING,
    row_time AS CAST(CURRENT_TIMESTAMP AS TIMESTAMP(3)),
    WATERMARK FOR row_time AS row_time
) WITH (
    'connector' = 'datagen',
    'rows-per-second' = '1',
    'fields.click_params.length' = '1',
    'fields.log_id.min' = '1',
    'fields.log_id.max' = '10'
);

CREATE TABLE sink_table (
    s_id BIGINT,
    s_params STRING,
    c_id BIGINT,
    c_params STRING
) WITH (
    'connector' = 'print'
);

-- Inner Interval Join: an impression is joined only with clicks that happen
-- within the 4 hours after it (bounds inclusive).
INSERT INTO sink_table
SELECT
    show_log_table.log_id AS s_id,
    show_log_table.show_params AS s_params,
    click_log_table.log_id AS c_id,
    click_log_table.click_params AS c_params
FROM show_log_table
INNER JOIN click_log_table
    ON show_log_table.log_id = click_log_table.log_id
    AND show_log_table.row_time BETWEEN click_log_table.row_time - INTERVAL '4' HOUR AND click_log_table.row_time;
```
The output is as follows:

```
6> +I[2, a, 2, 6]
6> +I[2, 6, 2, 6]
2> +I[4, 1, 4, 5]
2> +I[10, 8, 10, d]
2> +I[10, 7, 10, d]
2> +I[10, d, 10, d]
2> +I[5, b, 5, d]
6> +I[1, a, 1, 7]
```
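For reference, the BETWEEN bound used above is inclusive on both ends and can equivalently be written as two explicit comparisons. The query below is only a rewrite of the example above against the same tables, not a different join:

```sql
-- Same Inner Interval Join as above, with the interval spelled out as
-- two comparison predicates instead of BETWEEN.
INSERT INTO sink_table
SELECT
    show_log_table.log_id AS s_id,
    show_log_table.show_params AS s_params,
    click_log_table.log_id AS c_id,
    click_log_table.click_params AS c_params
FROM show_log_table
INNER JOIN click_log_table
    ON show_log_table.log_id = click_log_table.log_id
    AND show_log_table.row_time >= click_log_table.row_time - INTERVAL '4' HOUR
    AND show_log_table.row_time <= click_log_table.row_time;
```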
Left Interval Join example: the two streams are joined when their event times are within 5 seconds of each other, and impressions with no click in that window are emitted with null click fields.

```sql
CREATE TABLE show_log (
    log_id BIGINT,
    show_params STRING,
    row_time AS CAST(CURRENT_TIMESTAMP AS TIMESTAMP(3)),
    WATERMARK FOR row_time AS row_time
) WITH (
    'connector' = 'datagen',
    'rows-per-second' = '1',
    'fields.show_params.length' = '1',
    'fields.log_id.min' = '1',
    'fields.log_id.max' = '10'
);

CREATE TABLE click_log (
    log_id BIGINT,
    click_params STRING,
    row_time AS CAST(CURRENT_TIMESTAMP AS TIMESTAMP(3)),
    WATERMARK FOR row_time AS row_time
) WITH (
    'connector' = 'datagen',
    'rows-per-second' = '1',
    'fields.click_params.length' = '1',
    'fields.log_id.min' = '1',
    'fields.log_id.max' = '10'
);

CREATE TABLE sink_table (
    s_id BIGINT,
    s_params STRING,
    c_id BIGINT,
    c_params STRING
) WITH (
    'connector' = 'print'
);

INSERT INTO sink_table
SELECT
    show_log.log_id AS s_id,
    show_log.show_params AS s_params,
    click_log.log_id AS c_id,
    click_log.click_params AS c_params
FROM show_log
LEFT JOIN click_log
    ON show_log.log_id = click_log.log_id
    AND show_log.row_time BETWEEN click_log.row_time - INTERVAL '5' SECOND AND click_log.row_time + INTERVAL '5' SECOND;
```
The output is as follows:

```
+I[6, e, 6, 7]
+I[11, d, null, null]
+I[7, b, null, null]
+I[8, 0, 8, 3]
+I[13, 6, null, null]
```
Full Interval Join example: here the log_id ranges of the two streams only partially overlap (5 to 15 versus 1 to 10), so unmatched records show up on both sides of the output.

```sql
CREATE TABLE show_log (
    log_id BIGINT,
    show_params STRING,
    row_time AS CAST(CURRENT_TIMESTAMP AS TIMESTAMP(3)),
    WATERMARK FOR row_time AS row_time
) WITH (
    'connector' = 'datagen',
    'rows-per-second' = '1',
    'fields.show_params.length' = '1',
    'fields.log_id.min' = '5',
    'fields.log_id.max' = '15'
);

CREATE TABLE click_log (
    log_id BIGINT,
    click_params STRING,
    row_time AS CAST(CURRENT_TIMESTAMP AS TIMESTAMP(3)),
    WATERMARK FOR row_time AS row_time
) WITH (
    'connector' = 'datagen',
    'rows-per-second' = '1',
    'fields.click_params.length' = '1',
    'fields.log_id.min' = '1',
    'fields.log_id.max' = '10'
);

CREATE TABLE sink_table (
    s_id BIGINT,
    s_params STRING,
    c_id BIGINT,
    c_params STRING
) WITH (
    'connector' = 'print'
);

INSERT INTO sink_table
SELECT
    show_log.log_id AS s_id,
    show_log.show_params AS s_params,
    click_log.log_id AS c_id,
    click_log.click_params AS c_params
FROM show_log
FULL JOIN click_log
    ON show_log.log_id = click_log.log_id
    AND show_log.row_time BETWEEN click_log.row_time - INTERVAL '5' SECOND AND click_log.row_time + INTERVAL '5' SECOND;
```
The output is as follows:

```
+I[6, 1, null, null]
+I[7, 3, 7, 8]
+I[null, null, 6, 6]
+I[null, null, 4, d]
+I[8, d, null, null]
+I[null, null, 3, b]
```
Notes on Interval Join:
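One note that follows directly from the examples: an Interval Join needs a time attribute on both inputs, and its state is cleaned up automatically once the watermark moves past the join interval, so it does not accumulate state the way a Regular Join does. A minimal sketch of declaring an event-time attribute with a 10-second out-of-orderness allowance, in the same style as the tables above (the table name here is illustrative):

```sql
CREATE TABLE show_log_with_watermark (
    log_id BIGINT,
    show_params STRING,
    -- Event-time attribute; still derived from the wall clock as in the
    -- examples above, but in practice it would come from a field in the log.
    row_time AS CAST(CURRENT_TIMESTAMP AS TIMESTAMP(3)),
    -- Tolerate up to 10 seconds of out-of-order data before the watermark advances.
    WATERMARK FOR row_time AS row_time - INTERVAL '10' SECOND
) WITH (
    'connector' = 'datagen',
    'rows-per-second' = '1',
    'fields.show_params.length' = '1',
    'fields.log_id.min' = '1',
    'fields.log_id.max' = '10'
);
```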