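Preface: today is day 19 of my Flink journey! I studied windows in Flink SQL: tumbling, sliding (hop), session, and cumulative windows. Along the way I learned how to compute running totals (similar to the cumulative play-count requirement in a mid-length video creator program) and how to do multi-dimensional analysis, both common topics in big data. I wrote down a lot of my own understanding and ideas, and I hope they are useful to you and start some discussion!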
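Windows turn a stream into a special kind of "batch" processing over finite slices. The commonly used window types follow.

Tumbling window (TUMBLE):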
Before Flink 1.13, window aggregation was written with special group window functions (Group Window Aggregation):

```sql
SELECT
  TUMBLE_START(bidtime, INTERVAL '10' MINUTE),
  TUMBLE_END(bidtime, INTERVAL '10' MINUTE),
  TUMBLE_ROWTIME(bidtime, INTERVAL '10' MINUTE),
  SUM(price)
FROM MyTable
GROUP BY TUMBLE(bidtime, INTERVAL '10' MINUTE);
```

Since Flink 1.13, the syntax is standardized with Window Table-Valued Functions (Window TVF):

```sql
SELECT window_start, window_end, window_time, SUM(price)
FROM TABLE(
  TUMBLE(TABLE MyTable, DESCRIPTOR(bidtime), INTERVAL '10' MINUTES))
GROUP BY window_start, window_end;
```
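TUMBLE syntax:

```sql
TUMBLE(TABLE data, DESCRIPTOR(timecol), size)
```

- data: a table name.
- timecol: a column descriptor indicating which time attribute column of the data is mapped to tumbling windows.
- size: a duration specifying the width of each tumbling window.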
Data (`bid.csv`, columns bidtime, price, item):

```
2021-04-15 08:05:00,4.00,C
2021-04-15 08:07:00,2.00,A
2021-04-15 08:09:00,5.00,D
2021-04-15 08:11:00,3.00,B
2021-04-15 08:13:00,1.00,E
2021-04-15 08:17:00,6.00,F
```
Requirement: a real-time dashboard needs the total GMV for every 10 minutes.
```java
package cn.itcast.day02.Window;

import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.table.api.EnvironmentSettings;
import org.apache.flink.table.api.Table;
import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;
import org.apache.flink.types.Row;

/**
 * @author lql
 * @time 2024-03-16 17:33:47
 * @description Tumbling window with the Window TVF syntax
 */
public class GroupWindowsSqlTumbleExample {
    public static void main(String[] args) throws Exception {
        // 1) Create the Flink stream execution environment
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        // 2) Set the parallelism
        env.setParallelism(1);
        // 3) Create the table environment (Blink planner, streaming mode; Flink 1.13 API)
        EnvironmentSettings settings = EnvironmentSettings.newInstance()
                .useBlinkPlanner()
                .inStreamingMode()
                .build();
        StreamTableEnvironment tabEnv = StreamTableEnvironment.create(env, settings);

        String filePath = GroupWindowsSqlTumbleExample.class.getClassLoader().getResource("bid.csv").getPath();
        // bidtime is the event-time attribute; the watermark tolerates 1 second of out-of-orderness
        tabEnv.executeSql("create table Bid(" +
                "bidtime TIMESTAMP(3)," +
                "price DECIMAL(10, 2), " +
                "item string," +
                "watermark for bidtime as bidtime - interval '1' second) " +
                "with(" +
                "'connector' = 'filesystem'," +
                "'path' = 'file:///" + filePath + "'," +
                "'format' = 'csv'" +
                ")");

        // 10-minute tumbling windows, summing price per window
        Table table = tabEnv.sqlQuery("" +
                "select window_start,window_end,sum(price) as sum_price " +
                " from table(" +
                " tumble(table Bid, DESCRIPTOR(bidtime), interval '10' MINUTES))" +
                " group by window_start,window_end");

        tabEnv.toAppendStream(table, Row.class).print();
        env.execute();
    }
}
```
Result:
```
+I[2021-04-15T08:00, 2021-04-15T08:10, 11.00]
+I[2021-04-15T08:10, 2021-04-15T08:20, 10.00]
```
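To see where the two sums come from, here is a minimal plain-Java sketch (no Flink dependency; class and method names are hypothetical) of the tumbling assignment rule: a row at time ts falls into exactly one window, the one starting at ts - (ts mod size). It assumes a zero window offset and encodes bidtime as minutes since midnight purely for readability. It illustrates the semantics only and is not Flink's implementation.

```java
import java.math.BigDecimal;
import java.util.Map;
import java.util.TreeMap;

public class TumbleSketch {
    // bid.csv rows: bidtime as minute-of-day (08:05 = 485) and price
    static final int[] MINUTES = {485, 487, 489, 491, 493, 497};
    static final String[] PRICES = {"4.00", "2.00", "5.00", "3.00", "1.00", "6.00"};

    // Returns window start (minute-of-day) -> SUM(price)
    static Map<Integer, BigDecimal> sumPerWindow(int sizeMinutes) {
        Map<Integer, BigDecimal> sums = new TreeMap<>();
        for (int i = 0; i < MINUTES.length; i++) {
            // each row belongs to exactly one window [start, start + size)
            int start = MINUTES[i] - MINUTES[i] % sizeMinutes;
            sums.merge(start, new BigDecimal(PRICES[i]), BigDecimal::add);
        }
        return sums;
    }

    static String hhmm(int minuteOfDay) {
        return String.format("%02d:%02d", minuteOfDay / 60, minuteOfDay % 60);
    }

    public static void main(String[] args) {
        int size = 10;
        sumPerWindow(size).forEach((start, sum) ->
                System.out.println(hhmm(start) + " - " + hhmm(start + size) + " : " + sum));
    }
}
```

Running `main` prints `08:00 - 08:10 : 11.00` and `08:10 - 08:20 : 10.00`, matching the Flink output above.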
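HOP (sliding window) syntax:

```sql
HOP(TABLE data, DESCRIPTOR(timecol), slide, size [, offset ])
```

- data: a table name.
- timecol: a column descriptor indicating which time attribute column of the data is mapped to hopping windows.
- slide: a duration specifying the interval between the starts of consecutive hopping windows.
- size: a duration specifying the width of each hopping window.
- offset: optional, not used in this example.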
Requirement: every 5 minutes, compute the total over the last 10 minutes.
```java
package cn.itcast.day02.Window;

import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.table.api.EnvironmentSettings;
import org.apache.flink.table.api.Table;
import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;
import org.apache.flink.types.Row;

/**
 * @author lql
 * @time 2024-03-16 19:28:30
 * @description Hopping (sliding) window with the Window TVF syntax
 */
public class GroupWindowsSqlHopExample {
    public static void main(String[] args) throws Exception {
        // 1) Create the Flink stream execution environment
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        // 2) Set the parallelism
        env.setParallelism(1);
        // 3) Create the table environment (Blink planner, streaming mode; Flink 1.13 API)
        EnvironmentSettings settings = EnvironmentSettings.newInstance()
                .useBlinkPlanner()
                .inStreamingMode()
                .build();
        StreamTableEnvironment tabEnv = StreamTableEnvironment.create(env, settings);

        String filePath = GroupWindowsSqlHopExample.class.getClassLoader().getResource("bid.csv").getPath();
        tabEnv.executeSql("create table Bid(" +
                "bidtime TIMESTAMP(3)," +
                "price DECIMAL(10, 2), " +
                "item string," +
                "watermark for bidtime as bidtime - interval '1' second) " +
                "with(" +
                "'connector' = 'filesystem'," +
                "'path' = 'file:///" + filePath + "'," +
                "'format' = 'csv'" +
                ")");

        // slide = 5 minutes, size = 10 minutes: every row lands in two overlapping windows
        Table table = tabEnv.sqlQuery("" +
                "select window_start,window_end,sum(price) as sum_price " +
                " from table(" +
                " hop(table Bid, DESCRIPTOR(bidtime), interval '5' MINUTES, interval '10' MINUTES))" +
                " group by window_start,window_end");

        tabEnv.toAppendStream(table, Row.class).print();
        env.execute();
    }
}
```
Result:
```
+I[2021-04-15T08:00, 2021-04-15T08:10, 11.00]
+I[2021-04-15T08:05, 2021-04-15T08:15, 15.00]
+I[2021-04-15T08:10, 2021-04-15T08:20, 10.00]
+I[2021-04-15T08:15, 2021-04-15T08:25, 6.00]
```
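A plain-Java sketch of the hopping assignment rule (no Flink dependency; names are hypothetical) shows why each row is counted twice: starting from the latest window start not after ts, step back by `slide` while the window [start, start + size) still contains ts. Times are minutes since midnight and a zero offset is assumed; this illustrates the semantics only.

```java
import java.math.BigDecimal;
import java.util.Map;
import java.util.TreeMap;

public class HopSketch {
    static final int[] MINUTES = {485, 487, 489, 491, 493, 497}; // 08:05 ... 08:17
    static final String[] PRICES = {"4.00", "2.00", "5.00", "3.00", "1.00", "6.00"};

    // window start -> SUM(price); a row belongs to every window [s, s + size) that contains it
    static Map<Integer, BigDecimal> sumPerWindow(int slide, int size) {
        Map<Integer, BigDecimal> sums = new TreeMap<>();
        for (int i = 0; i < MINUTES.length; i++) {
            int ts = MINUTES[i];
            int lastStart = ts - ts % slide; // latest window start not after ts
            for (int s = lastStart; s > ts - size; s -= slide) {
                sums.merge(s, new BigDecimal(PRICES[i]), BigDecimal::add);
            }
        }
        return sums;
    }

    public static void main(String[] args) {
        sumPerWindow(5, 10).forEach((start, sum) ->
                System.out.printf("%02d:%02d + 10 min : %s%n", start / 60, start % 60, sum));
    }
}
```

The four windows starting at 08:00, 08:05, 08:10 and 08:15 sum to 11.00, 15.00, 10.00 and 6.00, as in the Flink result. No 07:55 window appears because its end, 08:05, is exclusive.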
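Session window (SESSION): Flink 1.13 does not support a session window TVF (support was expected in a later release, anticipated at the time for Flink 1.14), so the legacy group window syntax is used here.

Requirement: using the legacy syntax, define a session gap of 3 minutes. If no new row arrives within 3 minutes after the last row of a window, that window closes, and later rows go into the next window.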
```java
package cn.itcast.day02.Window;

import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;

/**
 * @author lql
 * @time 2024-03-16 19:37:20
 * @description Session window with the legacy group window syntax
 */
public class GroupWindowsSqlSessionExample {
    public static void main(String[] args) {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.setParallelism(1);
        StreamTableEnvironment tEnv = StreamTableEnvironment.create(env);

        String filePath = GroupWindowsSqlSessionExample.class.getClassLoader().getResource("bid.csv").getPath();
        // The event-time column must be of TIMESTAMP type; bidtime is already TIMESTAMP(3)
        tEnv.executeSql("create table Bid(" +
                "bidtime TIMESTAMP(3)," +
                "price DECIMAL(10, 2), " +
                "item string," +
                "watermark for bidtime as bidtime - interval '1' second) " +
                "with(" +
                "'connector' = 'filesystem'," +
                "'path' = 'file:///" + filePath + "'," +
                "'format' = 'csv'" +
                ")");

        tEnv.sqlQuery("SELECT " +
                        " SESSION_START(bidtime, INTERVAL '3' minute) as wStart, " +
                        " SESSION_END(bidtime, INTERVAL '3' minute) as wEnd, " +
                        " SUM(price) sum_price " +
                        "FROM Bid " +
                        "GROUP BY SESSION(bidtime, INTERVAL '3' minute)")
                .execute() // submits the job itself, so no env.execute() is needed
                .print();
    }
}
```
Result:
```
+----+-------------------------+-------------------------+-----------+
| op |                  wStart |                    wEnd | sum_price |
+----+-------------------------+-------------------------+-----------+
| +I | 2021-04-15 08:05:00.000 | 2021-04-15 08:16:00.000 |     15.00 |
| +I | 2021-04-15 08:17:00.000 | 2021-04-15 08:20:00.000 |      6.00 |
+----+-------------------------+-------------------------+-----------+
2 rows in set
```
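The session rule can be sketched in plain Java (no Flink dependency; names are hypothetical). The sketch assumes rows arrive sorted by time, as in `bid.csv`. A session stays open until "last row + gap", and a row that arrives later than that starts a new session. Treating a row exactly `gap` after the previous one as part of the same session is an assumption about the boundary, which this sample data does not exercise.

```java
import java.math.BigDecimal;
import java.util.ArrayList;
import java.util.List;

public class SessionSketch {
    static final int[] MINUTES = {485, 487, 489, 491, 493, 497}; // 08:05 ... 08:17, sorted
    static final String[] PRICES = {"4.00", "2.00", "5.00", "3.00", "1.00", "6.00"};

    // One result row: [start, end) and SUM(price)
    static final class Session {
        final int start;
        int end;
        BigDecimal sum;

        Session(int start, int end, BigDecimal sum) {
            this.start = start;
            this.end = end;
            this.sum = sum;
        }
    }

    static List<Session> sessions(int gap) {
        List<Session> out = new ArrayList<>();
        Session cur = null;
        for (int i = 0; i < MINUTES.length; i++) {
            int ts = MINUTES[i];
            BigDecimal price = new BigDecimal(PRICES[i]);
            if (cur == null || ts > cur.end) {
                // silence longer than the gap: the previous session is closed, open a new one
                cur = new Session(ts, ts + gap, price);
                out.add(cur);
            } else {
                // still inside the session: push its end to (this row + gap)
                cur.end = ts + gap;
                cur.sum = cur.sum.add(price);
            }
        }
        return out;
    }

    public static void main(String[] args) {
        for (Session s : sessions(3)) {
            System.out.printf("%02d:%02d - %02d:%02d : %s%n",
                    s.start / 60, s.start % 60, s.end / 60, s.end % 60, s.sum);
        }
    }
}
```

The 4-minute jump from 08:13 to 08:17 is the only gap larger than 3 minutes, so the sketch produces 08:05 - 08:16 (15.00) and 08:17 - 08:20 (6.00), like the query above.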
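CUMULATE (cumulative window) syntax:

```sql
CUMULATE(TABLE data, DESCRIPTOR(timecol), step, size)
```

- TABLE: the table name.
- DESCRIPTOR: the time column of the table used for windowing.
- step: the length by which the window end grows, i.e. the slice length the largest window is divided into.
- size: the width of the largest window; it must be an integral multiple of step.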
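Requirement: with a 10-minute window, output the running total every 2 minutes (exactly the shape of a cumulative play-count curve in a mid-length video creator program).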
```java
package cn.itcast.day02.Window;

import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;

/**
 * @author lql
 * @time 2024-03-16 19:45:02
 * @description Cumulative window with the Window TVF syntax
 */
public class GroupWindowsSqlCumulateExample {
    public static void main(String[] args) {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.setParallelism(1);
        StreamTableEnvironment tEnv = StreamTableEnvironment.create(env);

        String filePath = GroupWindowsSqlCumulateExample.class.getClassLoader().getResource("bid.csv").getPath();
        // The event-time column must be of TIMESTAMP type; bidtime is already TIMESTAMP(3)
        tEnv.executeSql("create table Bid(" +
                "bidtime TIMESTAMP(3)," +
                "price DECIMAL(10, 2), " +
                "item string," +
                "watermark for bidtime as bidtime - interval '1' second) " +
                "with(" +
                "'connector' = 'filesystem'," +
                "'path' = 'file:///" + filePath + "'," +
                "'format' = 'csv'" +
                ")");

        // step = 2 minutes, max size = 10 minutes
        tEnv.sqlQuery("SELECT window_start, window_end, SUM(price) as sum_price\n" +
                        " FROM TABLE(\n" +
                        " CUMULATE(TABLE Bid, DESCRIPTOR(bidtime), INTERVAL '2' MINUTES, INTERVAL '10' MINUTES))\n" +
                        " GROUP BY window_start, window_end")
                .execute() // submits the job itself, so no env.execute() is needed
                .print();
    }
}
```
Result:
```
+----+-------------------------+-------------------------+-----------+
| op |            window_start |              window_end | sum_price |
+----+-------------------------+-------------------------+-----------+
| +I | 2021-04-15 08:00:00.000 | 2021-04-15 08:06:00.000 |      4.00 |
| +I | 2021-04-15 08:00:00.000 | 2021-04-15 08:08:00.000 |      6.00 |
| +I | 2021-04-15 08:00:00.000 | 2021-04-15 08:10:00.000 |     11.00 |
| +I | 2021-04-15 08:10:00.000 | 2021-04-15 08:12:00.000 |      3.00 |
| +I | 2021-04-15 08:10:00.000 | 2021-04-15 08:14:00.000 |      4.00 |
| +I | 2021-04-15 08:10:00.000 | 2021-04-15 08:16:00.000 |      4.00 |
| +I | 2021-04-15 08:10:00.000 | 2021-04-15 08:18:00.000 |     10.00 |
| +I | 2021-04-15 08:10:00.000 | 2021-04-15 08:20:00.000 |     10.00 |
+----+-------------------------+-------------------------+-----------+
8 rows in set
```
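Here is a plain-Java sketch of cumulating windows (no Flink dependency; names are hypothetical). All windows of one cycle share the start of the enclosing `size` window, and their ends grow by `step`. A row is counted in every window whose end is after it. Like the Flink output, windows that receive no rows (08:00 - 08:02 and 08:00 - 08:04) produce nothing. Times are minutes since midnight.

```java
import java.math.BigDecimal;
import java.util.Map;
import java.util.TreeMap;

public class CumulateSketch {
    // bid.csv rows as minute-of-day (08:05 = 485) and price
    static final int[] MINUTES = {485, 487, 489, 491, 493, 497};
    static final String[] PRICES = {"4.00", "2.00", "5.00", "3.00", "1.00", "6.00"};

    // key "start-end" in minute-of-day -> SUM(price); windows without rows do not appear
    static Map<String, BigDecimal> sumPerWindow(int step, int size) {
        Map<String, BigDecimal> sums = new TreeMap<>();
        for (int i = 0; i < MINUTES.length; i++) {
            int ts = MINUTES[i];
            int start = ts - ts % size; // all cumulating windows of one cycle share this start
            // the end grows by `step` up to `size`; the row counts in every window ending after it
            for (int end = start + step; end <= start + size; end += step) {
                if (end > ts) {
                    sums.merge(start + "-" + end, new BigDecimal(PRICES[i]), BigDecimal::add);
                }
            }
        }
        return sums;
    }

    public static void main(String[] args) {
        sumPerWindow(2, 10).forEach((window, sum) -> System.out.println(window + " : " + sum));
    }
}
```

The sketch yields the same eight (window, total) pairs as the table above. For example, key `490-498` (08:10 - 08:18) sums to 10.00.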
Multi-dimensional analysis with GROUPING SETS. Now, a single query:

```sql
SELECT window_start, window_end, userId, category, sum(price) as sum_price
FROM TABLE(
  TUMBLE(TABLE orders, DESCRIPTOR(t), INTERVAL '5' SECONDS))
GROUP BY window_start, window_end, GROUPING SETS((userId, category), (userId), ())
```
Previously, the same result had to be written as a UNION ALL of three GROUP BY queries. A typed NULL stands in for each column that a branch does not group by:

```sql
-- grouping set ()
SELECT window_start, window_end, CAST(NULL AS STRING) as userId, CAST(NULL AS STRING) as category, sum(price) as sum_price
FROM TABLE(
  TUMBLE(TABLE orders, DESCRIPTOR(t), INTERVAL '5' SECONDS))
GROUP BY window_start, window_end
UNION ALL
-- grouping set (userId)
SELECT window_start, window_end, userId, CAST(NULL AS STRING) as category, sum(price) as sum_price
FROM TABLE(
  TUMBLE(TABLE orders, DESCRIPTOR(t), INTERVAL '5' SECONDS))
GROUP BY window_start, window_end, userId
UNION ALL
-- grouping set (userId, category)
SELECT window_start, window_end, userId, category, sum(price) as sum_price
FROM TABLE(
  TUMBLE(TABLE orders, DESCRIPTOR(t), INTERVAL '5' SECONDS))
GROUP BY window_start, window_end, userId, category
```
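The equivalence can be sketched in plain Java (no Flink dependency). The sketch runs one GROUP BY per grouping set over the rows of a single window, turns the dropped columns into NULL placeholders, and concatenates the results like UNION ALL. The rows, the ASCII category name "phone" and all identifiers are hypothetical.

```java
import java.math.BigDecimal;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

public class GroupingSetsSketch {
    // Hypothetical rows of ONE window: userId, category, price
    static final String[][] ROWS = {
            {"user_001", "phone", "14.1"},
            {"user_002", "phone", "50.0"},
            {"user_002", "phone", "32.5"},
    };

    static boolean contains(int[] set, int column) {
        for (int c : set) {
            if (c == column) return true;
        }
        return false;
    }

    // Each grouping set lists the kept column indexes (0 = userId, 1 = category)
    static List<String> aggregate(int[][] groupingSets) {
        List<String> result = new ArrayList<>();
        for (int[] set : groupingSets) {
            // one GROUP BY per grouping set; columns outside the set become NULL placeholders
            Map<List<String>, BigDecimal> groups = new LinkedHashMap<>();
            for (String[] row : ROWS) {
                List<String> key = Arrays.asList(
                        contains(set, 0) ? row[0] : null,
                        contains(set, 1) ? row[1] : null);
                groups.merge(key, new BigDecimal(row[2]), BigDecimal::add);
            }
            // UNION ALL: append this branch's rows to the combined result
            groups.forEach((k, sum) -> result.add(k.get(0) + "," + k.get(1) + "," + sum));
        }
        return result;
    }

    public static void main(String[] args) {
        // GROUPING SETS ((userId, category), (userId), ())
        aggregate(new int[][]{{0, 1}, {0}, {}}).forEach(System.out::println);
    }
}
```

The output contains five rows, ending with the grand total `null,null,96.6` from the empty grouping set.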
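ROLLUP mnemonic: drop columns one at a time from right to left, going from the full set down to the empty set!

```sql
GROUP BY ROLLUP(a, b, c)
-- is equivalent to:
GROUPING SETS((a,b,c), (a,b), (a), ())

GROUP BY ROLLUP(a, (b, c), d)
-- is equivalent to:
GROUPING SETS((a, b, c, d), (a, b, c), (a), ())
```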
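The right-to-left rule can be written out in plain Java (a hypothetical helper with no SQL engine involved). Each ROLLUP element is itself a list, so a composite element such as `(b, c)` is kept or dropped as a unit.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

public class RollupSketch {
    // ROLLUP(e1, ..., en) -> (e1..en), (e1..en-1), ..., (e1), ()
    static List<List<String>> rollup(List<List<String>> elements) {
        List<List<String>> sets = new ArrayList<>();
        for (int n = elements.size(); n >= 0; n--) { // drop elements from the right
            List<String> set = new ArrayList<>();
            for (List<String> element : elements.subList(0, n)) {
                set.addAll(element); // a composite element contributes all its columns
            }
            sets.add(set);
        }
        return sets;
    }

    public static void main(String[] args) {
        System.out.println(rollup(Arrays.asList(Arrays.asList("a"), Arrays.asList("b"), Arrays.asList("c"))));
        System.out.println(rollup(Arrays.asList(Arrays.asList("a"), Arrays.asList("b", "c"), Arrays.asList("d"))));
    }
}
```

This prints `[[a, b, c], [a, b], [a], []]` and `[[a, b, c, d], [a, b, c], [a], []]`, the two expansions listed above.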
CUBE mnemonic: every combination.

```sql
GROUP BY CUBE(a, b, c)
-- is equivalent to:
GROUPING SETS((a,b,c), (a,b), (a,c), (b,c), (a), (b), (c), ())

GROUP BY CUBE((a, b), (c, d))
-- is equivalent to:
GROUPING SETS((a, b, c, d), (a, b), (c, d), ())

-- Mixing a plain column, CUBE and GROUPING SETS: the grouping sets of the items are combined as a cross product
GROUP BY a, CUBE(b, c), GROUPING SETS((d), (e))
-- is equivalent to:
GROUP BY GROUPING SETS(
  (a, b, c, d), (a, b, c, e),
  (a, b, d), (a, b, e),
  (a, c, d), (a, c, e),
  (a, d), (a, e)
)
```
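Both rules, "every subset" for CUBE and "cross product" for several GROUP BY items, fit in a short plain-Java sketch. The helpers are hypothetical. Subsets are listed from largest to smallest to match the ordering used above. That ordering is only for readability, since grouping sets have no inherent order.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Comparator;
import java.util.List;

public class CubeSketch {
    // CUBE: every subset of the elements, larger subsets first
    static List<List<String>> cube(List<List<String>> elements) {
        int n = elements.size();
        List<Integer> masks = new ArrayList<>();
        for (int m = (1 << n) - 1; m >= 0; m--) masks.add(m);
        // stable sort keeps descending-mask order within each subset size
        masks.sort(Comparator.comparingInt(m -> -Integer.bitCount(m)));
        List<List<String>> sets = new ArrayList<>();
        for (int m : masks) {
            List<String> set = new ArrayList<>();
            for (int i = 0; i < n; i++) {
                // element i is encoded in bit (n - 1 - i), so the first element is the highest bit
                if ((m & (1 << (n - 1 - i))) != 0) set.addAll(elements.get(i));
            }
            sets.add(set);
        }
        return sets;
    }

    // Several grouping items in one GROUP BY: cross product of their grouping sets
    static List<List<String>> cross(List<List<String>> left, List<List<String>> right) {
        List<List<String>> out = new ArrayList<>();
        for (List<String> l : left) {
            for (List<String> r : right) {
                List<String> set = new ArrayList<>(l);
                set.addAll(r);
                out.add(set);
            }
        }
        return out;
    }

    static List<String> cols(String... names) {
        return Arrays.asList(names);
    }

    public static void main(String[] args) {
        System.out.println(cube(Arrays.asList(cols("a"), cols("b"), cols("c"))));
        // GROUP BY a, CUBE(b, c), GROUPING SETS ((d), (e))
        List<List<String>> a = Arrays.asList(cols("a"));
        System.out.println(cross(cross(a, cube(Arrays.asList(cols("b"), cols("c")))),
                Arrays.asList(cols("d"), cols("e"))));
    }
}
```

The second line printed is the same eight grouping sets, (a, b, c, d) through (a, e), as in the combined example above.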
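GROUPING function. Background: GROUPING SETS results use NULL as a placeholder, so a placeholder NULL cannot be told apart from a genuine NULL in the data. GROUPING(col) solves this: it returns 1 when col is only a placeholder in that row (col is not in the row's grouping set) and 0 otherwise.

Example: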
```sql
SELECT
  window_start,
  window_end,
  userId,
  category,
  sum(price) as sum_price,
  GROUPING(category) as categoryFlag,
  IF(GROUPING(category) = 0, category, 'ALL') as `all`
FROM TABLE(
  TUMBLE(TABLE orders, DESCRIPTOR(t), INTERVAL '5' SECONDS))
GROUP BY window_start, window_end, GROUPING SETS((userId, category), (userId), ())
```
Result:
window_start | window_end | userId | category | sum_price | categoryFlag | all |
---|---|---|---|---|---|---|
2021-05-23 05:16:35.000 | 2021-05-23 05:16:40.000 | NULL | NULL | 10.1 | 1 | ALL |
2021-05-23 05:16:40.000 | 2021-05-23 05:16:45.000 | NULL | NULL | 96.6 | 1 | ALL |
2021-05-23 05:16:45.000 | 2021-05-23 05:16:50.000 | NULL | NULL | 15.6 | 1 | ALL |
2021-05-23 05:16:35.000 | 2021-05-23 05:16:40.000 | user_001 | 电脑 | 10.1 | 0 | 电脑 |
2021-05-23 05:16:40.000 | 2021-05-23 05:16:45.000 | user_001 | 手机 | 14.1 | 0 | 手机 |
2021-05-23 05:16:40.000 | 2021-05-23 05:16:45.000 | user_002 | 手机 | 82.5 | 0 | 手机 |
2021-05-23 05:16:45.000 | 2021-05-23 05:16:50.000 | user_001 | 电脑 | 15.6 | 0 | 电脑 |
2021-05-23 05:16:35.000 | 2021-05-23 05:16:40.000 | user_001 | NULL | 10.1 | 1 | ALL |
2021-05-23 05:16:40.000 | 2021-05-23 05:16:45.000 | user_001 | NULL | 14.1 | 1 | ALL |
2021-05-23 05:16:40.000 | 2021-05-23 05:16:45.000 | user_002 | NULL | 82.5 | 1 | ALL |
2021-05-23 05:16:45.000 | 2021-05-23 05:16:50.000 | user_001 | NULL | 15.6 | 1 | ALL |
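Aside: MaxCompute also provides a parameterless GROUPING__ID function (note the double underscore) for compatibility with Hive queries. It packs the GROUPING results of the columns into an integer as a bitmap. MaxCompute is compatible with Hive 2.3.0 and later for this function, but Hive versions below 2.3.0 produce different output, so using it is not recommended.

```sql
SELECT a, b, c, COUNT(*), GROUPING__ID
FROM VALUES (1,2,3) AS t(a,b,c)
GROUP BY a, b, c GROUPING SETS ((a,b,c), (a));
```

GROUPING__ID takes neither arguments nor parentheses. In MaxCompute it is equivalent to GROUPING_ID(a,b,c), with the arguments in the same order as in GROUP BY.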
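Here is a plain-Java sketch of how the bitmap is built from GROUPING results (helper names are hypothetical). It assumes the standard-SQL bit order used by Hive 2.3.0+ and MaxCompute: the first argument is the most significant bit. As noted above, older Hive versions behave differently.

```java
import java.util.Arrays;
import java.util.List;

public class GroupingIdSketch {
    // GROUPING(col) = 1 when col is not in the row's grouping set (its NULL is a placeholder), else 0
    static int grouping(List<String> groupingSet, String col) {
        return groupingSet.contains(col) ? 0 : 1;
    }

    // GROUPING_ID(c1, ..., cn): GROUPING bits packed into an int, first argument = highest bit (assumed)
    static int groupingId(List<String> groupingSet, String... cols) {
        int id = 0;
        for (String col : cols) {
            id = (id << 1) | grouping(groupingSet, col);
        }
        return id;
    }

    public static void main(String[] args) {
        // GROUP BY a, b, c GROUPING SETS ((a, b, c), (a))
        for (List<String> set : Arrays.asList(Arrays.asList("a", "b", "c"), Arrays.asList("a"))) {
            System.out.println(set + " -> GROUPING_ID(a, b, c) = " + groupingId(set, "a", "b", "c"));
        }
    }
}
```

Under this assumption the (a, b, c) rows get 0 and the (a) rows get binary 011 = 3.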
Window Top-N template: find the top 3 suppliers by sales within every 10-minute window.
```sql
SELECT *
FROM (
  SELECT *, ROW_NUMBER() OVER (PARTITION BY window_start, window_end ORDER BY price DESC) as rownum
  FROM (
    SELECT window_start, window_end, supplier_id, SUM(price) as price, COUNT(*) as cnt
    FROM TABLE(
      TUMBLE(TABLE Bid, DESCRIPTOR(bidtime), INTERVAL '10' MINUTES))
    GROUP BY window_start, window_end, supplier_id
  )
) WHERE rownum <= 3;
```
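Approach: first aggregate over 10-minute tumbling windows, grouping by the window and supplier_id to sum the price. Then rank suppliers inside each window with ROW_NUMBER() and keep the top 3. (The template assumes a Bid table with a supplier_id column, which the bid.csv table used earlier does not have.)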
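The two steps can be mirrored in plain Java to check the logic. The bids, supplier ids and class name are hypothetical, and times are minutes since midnight. Step 1 is the tumbling GROUP BY, and step 2 plays the role of ROW_NUMBER() ... WHERE rownum <= n. The data is chosen without ties, because ROW_NUMBER() does not define an order between equal prices.

```java
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;
import java.util.stream.Collectors;

public class WindowTopNSketch {
    // Hypothetical bids: bidtime as minute-of-day (480 = 08:00), supplier_id, price
    static final int[] MINUTES = {481, 483, 485, 487, 489, 492, 495, 498};
    static final String[] SUPPLIERS = {"s1", "s2", "s1", "s3", "s4", "s2", "s4", "s2"};
    static final int[] PRICES = {4, 5, 2, 3, 1, 3, 2, 4};

    static Map<Integer, List<String>> topN(int sizeMinutes, int n) {
        // Step 1: tumbling window + GROUP BY window, supplier_id -> SUM(price)
        Map<Integer, Map<String, Integer>> perWindow = new TreeMap<>();
        for (int i = 0; i < MINUTES.length; i++) {
            int start = MINUTES[i] - MINUTES[i] % sizeMinutes;
            perWindow.computeIfAbsent(start, k -> new HashMap<>())
                    .merge(SUPPLIERS[i], PRICES[i], Integer::sum);
        }
        // Step 2: ROW_NUMBER() OVER (PARTITION BY window ORDER BY price DESC), keep rownum <= n
        Map<Integer, List<String>> result = new TreeMap<>();
        perWindow.forEach((start, sums) -> result.put(start,
                sums.entrySet().stream()
                        .sorted(Map.Entry.<String, Integer>comparingByValue().reversed())
                        .limit(n)
                        .map(Map.Entry::getKey)
                        .collect(Collectors.toList())));
        return result;
    }

    public static void main(String[] args) {
        topN(10, 3).forEach((start, top) -> System.out.println(start + " -> " + top));
    }
}
```

For this data, the 08:00 window ranks s1 (6), s2 (5), s3 (3) and drops s4. The 08:10 window has only s2 and s4.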
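Over windows. Row-based frame: ROWS BETWEEN (UNBOUNDED | rowCount) PRECEDING AND CURRENT ROW

Note: UNBOUNDED covers every row from the start of the partition up to the current row; rowCount covers only the previous rowCount rows plus the current row.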
Data source (table tmall_item):
itemID | itemType | onSellTime | price |
---|---|---|---|
ITEM001 | Electronic | 2021-05-11 10:01:00.000 | 20 |
ITEM002 | Electronic | 2021-05-11 10:02:00.000 | 50 |
ITEM003 | Electronic | 2021-05-11 10:03:00.000 | 30 |
ITEM004 | Electronic | 2021-05-11 10:03:00.000 | 60 |
ITEM005 | Electronic | 2021-05-11 10:05:00.000 | 40 |
ITEM006 | Electronic | 2021-05-11 10:06:00.000 | 20 |
ITEM007 | Electronic | 2021-05-11 10:07:00.000 | 70 |
ITEM008 | Clothes | 2021-05-11 10:08:00.000 | 20 |
ITEM009 | Clothes | 2021-05-11 10:09:00.000 | 40 |
ITEM010 | Clothes | 2021-05-11 10:11:00.000 | 30 |
Example: partition by itemType, order by onSellTime ascending, and compute the running total from the first row up to the current one.
```sql
select itemID, itemType, onSellTime, price,
       sum(price) over w as sumPrice
from tmall_item
WINDOW w AS (
  PARTITION BY itemType
  ORDER BY onSellTime
  ROWS BETWEEN UNBOUNDED preceding AND CURRENT ROW
)
```
Result:
itemID | itemType | onSellTime | price | sumPrice |
---|---|---|---|---|
ITEM001 | Electronic | 2021-05-11 10:01:00.000 | 20.0 | 20.0 |
ITEM002 | Electronic | 2021-05-11 10:02:00.000 | 50.0 | 70.0 |
ITEM003 | Electronic | 2021-05-11 10:03:00.000 | 30.0 | 100.0 |
ITEM004 | Electronic | 2021-05-11 10:03:00.000 | 60.0 | 160.0 |
ITEM005 | Electronic | 2021-05-11 10:05:00.000 | 40.0 | 200.0 |
ITEM006 | Electronic | 2021-05-11 10:06:00.000 | 20.0 | 220.0 |
ITEM007 | Electronic | 2021-05-11 10:07:00.000 | 70.0 | 290.0 |
ITEM008 | Clothes | 2021-05-11 10:08:00.000 | 20.0 | 20.0 |
ITEM009 | Clothes | 2021-05-11 10:09:00.000 | 40.0 | 60.0 |
ITEM010 | Clothes | 2021-05-11 10:11:00.000 | 30.0 | 90.0 |
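A plain-Java sketch of the ROWS frame reproduces the sumPrice column (no Flink dependency; class and method names are hypothetical). It assumes rows are already in onSellTime order within each partition, as in the table, and uses integer prices for brevity. Passing a non-negative `precedingRows` switches from UNBOUNDED PRECEDING to "n PRECEDING".

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

public class RowsOverSketch {
    // tmall_item rows in onSellTime order: itemType and price
    static final String[] TYPES = {"Electronic", "Electronic", "Electronic", "Electronic", "Electronic",
            "Electronic", "Electronic", "Clothes", "Clothes", "Clothes"};
    static final int[] PRICES = {20, 50, 30, 60, 40, 20, 70, 20, 40, 30};

    // precedingRows < 0 means UNBOUNDED PRECEDING, otherwise "precedingRows PRECEDING"
    static int[] sumOver(int precedingRows) {
        Map<String, List<Integer>> partitions = new HashMap<>(); // rows seen so far per itemType
        int[] out = new int[PRICES.length];
        for (int i = 0; i < PRICES.length; i++) {
            List<Integer> part = partitions.computeIfAbsent(TYPES[i], k -> new ArrayList<>());
            part.add(PRICES[i]); // CURRENT ROW joins its partition
            int from = precedingRows < 0 ? 0 : Math.max(0, part.size() - 1 - precedingRows);
            int sum = 0;
            for (int price : part.subList(from, part.size())) {
                sum += price;
            }
            out[i] = sum;
        }
        return out;
    }

    public static void main(String[] args) {
        System.out.println(Arrays.toString(sumOver(-1)));
    }
}
```

`sumOver(-1)` returns 20, 70, 100, 160, 200, 220, 290, 20, 60, 90, the sumPrice column above. `sumOver(1)` would instead sum only the previous row and the current one.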
Time-based frame: RANGE BETWEEN (UNBOUNDED | timeInterval) PRECEDING AND CURRENT ROW

Example: a real-time total over the last two minutes, per itemType.
```sql
select itemID, itemType, onSellTime, price,
       sum(price) over w as sumPrice
from tmall_item
WINDOW w AS (
  PARTITION BY itemType
  ORDER BY onSellTime
  RANGE BETWEEN INTERVAL '2' MINUTE preceding AND CURRENT ROW
)
```
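The post gives no output for this query, so here is a plain-Java sketch of what a RANGE frame means. It assumes standard SQL semantics: the frame is [t - 2 min, t] inclusive, and rows with the same timestamp (peers) are all included. The class name and the minutes-after-10:00 encoding are hypothetical. The printed values are what the sketch computes, not captured Flink output.

```java
import java.util.Arrays;

public class RangeOverSketch {
    static final String[] TYPES = {"Electronic", "Electronic", "Electronic", "Electronic", "Electronic",
            "Electronic", "Electronic", "Clothes", "Clothes", "Clothes"};
    // onSellTime as minutes after 10:00 (ITEM003 and ITEM004 share 10:03)
    static final int[] MINUTES = {1, 2, 3, 3, 5, 6, 7, 8, 9, 11};
    static final int[] PRICES = {20, 50, 30, 60, 40, 20, 70, 20, 40, 30};

    // RANGE BETWEEN INTERVAL 'rangeMinutes' MINUTE PRECEDING AND CURRENT ROW
    static int[] sumOver(int rangeMinutes) {
        int[] out = new int[PRICES.length];
        for (int i = 0; i < PRICES.length; i++) {
            for (int j = 0; j < PRICES.length; j++) {
                boolean samePartition = TYPES[j].equals(TYPES[i]);
                // frame = [t - range, t]; rows sharing the current timestamp (peers) are included
                boolean inFrame = MINUTES[j] >= MINUTES[i] - rangeMinutes && MINUTES[j] <= MINUTES[i];
                if (samePartition && inFrame) {
                    out[i] += PRICES[j];
                }
            }
        }
        return out;
    }

    public static void main(String[] args) {
        System.out.println(Arrays.toString(sumOver(2)));
    }
}
```

The main difference from ROWS shows at 10:03. ITEM003 and ITEM004 are peers, so both get 160, whereas the ROWS frame gives ITEM003 only 100.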
Table API group windows. The Tumble class:

Example: every 5 seconds, compute the total sales of each product category.
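```java
import lombok.AllArgsConstructor;
import lombok.Data;
import lombok.NoArgsConstructor;
import org.apache.flink.api.common.eventtime.WatermarkStrategy;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.table.api.Table;
import org.apache.flink.table.api.Tumble;
import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;

import java.time.Duration;

import static org.apache.flink.table.api.Expressions.$;
import static org.apache.flink.table.api.Expressions.lit;

public class GroupWindowsTableApiTumbleExample {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.setParallelism(1);

        SingleOutputStreamOperator<OrderInfo> dataStream = env
                .fromElements(
                        new OrderInfo("电脑", 1000L, 100D),
                        new OrderInfo("手机", 2000L, 200D),
                        new OrderInfo("电脑", 3000L, 300D),
                        new OrderInfo("手机", 4000L, 400D),
                        new OrderInfo("手机", 5000L, 500D),
                        new OrderInfo("电脑", 6000L, 600D))
                .assignTimestampsAndWatermarks(
                        WatermarkStrategy.<OrderInfo>forBoundedOutOfOrderness(Duration.ofSeconds(5))
                                .withTimestampAssigner((element, recordTimestamp) -> element.getTimestamp()));

        StreamTableEnvironment tableEnv = StreamTableEnvironment.create(env);
        // "timestamp" becomes a rowtime attribute carrying the stream's event time
        Table table = tableEnv.fromDataStream(dataStream, $("category"), $("timestamp").rowtime(), $("money"));

        table
                .window(Tumble.over(lit(5).second())
                        .on($("timestamp"))
                        .as("w"))                    // define a tumbling window and alias it "w"
                .groupBy($("category"), $("w"))   // the window alias must appear in groupBy
                .select($("category"),
                        $("w").start().as("window_start"),
                        $("w").end().as("window_end"),
                        $("money").sum().as("total_money"))
                .execute()                         // submits the job itself
                .print();
        // no env.execute() here: execute() above already runs the pipeline
    }

    @Data
    @AllArgsConstructor
    @NoArgsConstructor
    public static class OrderInfo {
        private String category;
        private Long timestamp;
        private Double money;
    }
}
```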
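The post does not show this job's output. A plain-Java sketch of the keyed tumbling rule gives the totals to expect. It uses the same six orders, with categories transliterated to ASCII (computer = 电脑, phone = 手机), and keys each result by category and window start in milliseconds. It ignores watermarks, which for this bounded input should only affect when results are emitted. That is an assumption, and all names are hypothetical.

```java
import java.util.Map;
import java.util.TreeMap;

public class KeyedTumbleSketch {
    // The six OrderInfo rows; computer = 电脑, phone = 手机
    static final String[] CATEGORY = {"computer", "phone", "computer", "phone", "phone", "computer"};
    static final long[] TS = {1000L, 2000L, 3000L, 4000L, 5000L, 6000L};
    static final double[] MONEY = {100, 200, 300, 400, 500, 600};

    // key "category@windowStartMs" -> total_money
    static Map<String, Double> tumble(long sizeMs) {
        Map<String, Double> out = new TreeMap<>();
        for (int i = 0; i < TS.length; i++) {
            long start = TS[i] - TS[i] % sizeMs; // window [start, start + size)
            out.merge(CATEGORY[i] + "@" + start, MONEY[i], Double::sum);
        }
        return out;
    }

    public static void main(String[] args) {
        tumble(5000L).forEach((key, total) -> System.out.println(key + " -> " + total));
    }
}
```

For the first 5 seconds the sketch gives 400.0 for 电脑 and 600.0 for 手机. The window starting at 5 s holds 600.0 and 500.0 respectively.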
The Slide class:

Example: every 5 seconds, compute each category's total sales over the past 10 seconds.
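```java
import lombok.AllArgsConstructor;
import lombok.Data;
import lombok.NoArgsConstructor;
import org.apache.flink.api.common.eventtime.WatermarkStrategy;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.table.api.Slide;
import org.apache.flink.table.api.Table;
import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;

import java.time.Duration;

import static org.apache.flink.table.api.Expressions.$;
import static org.apache.flink.table.api.Expressions.lit;

public class GroupWindowsTableApiSlideExample {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.setParallelism(1);

        SingleOutputStreamOperator<OrderInfo> dataStream = env
                .fromElements(
                        new OrderInfo("电脑", 1000L, 100D),
                        new OrderInfo("手机", 2000L, 200D),
                        new OrderInfo("电脑", 3000L, 300D),
                        new OrderInfo("手机", 4000L, 400D),
                        new OrderInfo("手机", 5000L, 500D),
                        new OrderInfo("电脑", 6000L, 600D))
                .assignTimestampsAndWatermarks(
                        WatermarkStrategy.<OrderInfo>forBoundedOutOfOrderness(Duration.ofSeconds(5))
                                .withTimestampAssigner((element, recordTimestamp) -> element.getTimestamp()));

        StreamTableEnvironment tableEnv = StreamTableEnvironment.create(env);
        Table table = tableEnv.fromDataStream(dataStream, $("category"), $("timestamp").rowtime(), $("money"));

        table
                .window(Slide.over(lit(10).second())   // window size: 10 seconds
                        .every(lit(5).second())          // slide: a new window every 5 seconds
                        .on($("timestamp"))
                        .as("w"))                        // define a sliding window and alias it "w"
                .groupBy($("category"), $("w"))       // the window alias must appear in groupBy
                .select($("category"),
                        $("w").start().as("window_start"),
                        $("w").end().as("window_end"),
                        $("money").sum().as("total_money"))
                .execute()                             // submits the job itself
                .print();
        // no env.execute() here: execute() above already runs the pipeline
    }

    @Data
    @AllArgsConstructor
    @NoArgsConstructor
    public static class OrderInfo {
        private String category;
        private Long timestamp;
        private Double money;
    }
}
```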
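A keyed plain-Java sketch of the sliding rule shows which windows each order lands in (names hypothetical; computer = 电脑, phone = 手机). Each row goes into size / slide = 2 windows. Under this assignment rule, the orders in the first 5 seconds also fall into a window that starts at -5 s, before the epoch. Flink's own windows may be labeled differently, so treat that detail as an expectation to check against the real output.

```java
import java.util.Map;
import java.util.TreeMap;

public class KeyedSlideSketch {
    static final String[] CATEGORY = {"computer", "phone", "computer", "phone", "phone", "computer"};
    static final long[] TS = {1000L, 2000L, 3000L, 4000L, 5000L, 6000L};
    static final double[] MONEY = {100, 200, 300, 400, 500, 600};

    // key "category@windowStartMs" -> total_money
    static Map<String, Double> slide(long sizeMs, long slideMs) {
        Map<String, Double> out = new TreeMap<>();
        for (int i = 0; i < TS.length; i++) {
            long lastStart = TS[i] - Math.floorMod(TS[i], slideMs); // latest window start <= ts
            // walk back by `slide` while [start, start + size) still contains ts
            for (long start = lastStart; start > TS[i] - sizeMs; start -= slideMs) {
                out.merge(CATEGORY[i] + "@" + start, MONEY[i], Double::sum);
            }
        }
        return out;
    }

    public static void main(String[] args) {
        slide(10000L, 5000L).forEach((key, total) -> System.out.println(key + " -> " + total));
    }
}
```

The middle window [0 s, 10 s) sees every order, so it totals 1000.0 for 电脑 and 1100.0 for 手机.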
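The Session class:

Example: per category, if no new order arrives for more than 6 seconds after the previous one, the window closes and the orders inside it are aggregated.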
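```java
import lombok.AllArgsConstructor;
import lombok.Data;
import lombok.NoArgsConstructor;
import org.apache.flink.api.common.eventtime.WatermarkStrategy;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.table.api.Session;
import org.apache.flink.table.api.Table;
import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;

import java.time.Duration;

import static org.apache.flink.table.api.Expressions.$;
import static org.apache.flink.table.api.Expressions.lit;

public class GroupWindowsTableApiSessionExample {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.setParallelism(1);

        SingleOutputStreamOperator<OrderInfo> dataStream = env
                .fromElements(
                        new OrderInfo("电脑", 1000L, 100D),
                        new OrderInfo("手机", 2000L, 200D),
                        new OrderInfo("电脑", 3000L, 300D),
                        new OrderInfo("手机", 4000L, 400D),
                        new OrderInfo("手机", 5000L, 500D),
                        new OrderInfo("电脑", 6000L, 600D))
                .assignTimestampsAndWatermarks(
                        WatermarkStrategy.<OrderInfo>forBoundedOutOfOrderness(Duration.ofSeconds(5))
                                .withTimestampAssigner((element, recordTimestamp) -> element.getTimestamp()));

        StreamTableEnvironment tableEnv = StreamTableEnvironment.create(env);
        Table table = tableEnv.fromDataStream(dataStream, $("category"), $("timestamp").rowtime(), $("money"));

        table
                .window(Session.withGap(lit(6).second())  // session gap: 6 seconds
                        .on($("timestamp"))
                        .as("w"))                           // define a session window and alias it "w"
                .groupBy($("category"), $("w"))          // the window alias must appear in groupBy
                .select($("category"),
                        $("w").start().as("window_start"),
                        $("w").end().as("window_end"),
                        $("money").sum().as("total_money"))
                .execute()                                // submits the job itself
                .print();
        // no env.execute() here: execute() above already runs the pipeline
    }

    @Data
    @AllArgsConstructor
    @NoArgsConstructor
    public static class OrderInfo {
        private String category;
        private Long timestamp;
        private Double money;
    }
}
```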
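The session gap can also be traced per category in plain Java (names hypothetical; computer = 电脑, phone = 手机). The sketch assumes rows are in timestamp order and non-negative, which lets -1 serve as a "no open session" marker. A session ends when the next order of the same category comes more than 6 seconds after the previous one. The session's end is the last order's time plus the gap.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.TreeSet;

public class KeyedSessionSketch {
    static final String[] CATEGORY = {"computer", "phone", "computer", "phone", "phone", "computer"};
    static final long[] TS = {1000L, 2000L, 3000L, 4000L, 5000L, 6000L};
    static final double[] MONEY = {100, 200, 300, 400, 500, 600};

    static List<String> sessions(long gapMs) {
        List<String> out = new ArrayList<>();
        for (String cat : new TreeSet<>(Arrays.asList(CATEGORY))) {
            long start = -1, end = -1; // -1 = no open session (timestamps assumed non-negative)
            double sum = 0;
            for (int i = 0; i < TS.length; i++) { // rows are already in timestamp order
                if (!CATEGORY[i].equals(cat)) continue;
                if (start >= 0 && TS[i] > end) { // silence longer than the gap: emit the session
                    out.add(cat + "[" + start + "," + end + ")=" + sum);
                    start = -1;
                }
                if (start < 0) {
                    start = TS[i];
                    sum = 0;
                }
                end = TS[i] + gapMs; // the session stays open until last order + gap
                sum += MONEY[i];
            }
            if (start >= 0) out.add(cat + "[" + start + "," + end + ")=" + sum);
        }
        return out;
    }

    public static void main(String[] args) {
        sessions(6000L).forEach(System.out::println);
    }
}
```

With this sample data no category ever goes quiet for 6 seconds, so each category forms a single session. 电脑 covers 1 s to 12 s with 1000.0, and 手机 covers 2 s to 11 s with 1100.0.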