Preface: today is day 19 of my Flink journey! I studied windows in Flink SQL, including tumbling, sliding, session, and cumulate windows; learned how to compute cumulative values (similar to the running play-count metric in a mid-length video creator program); and looked at multi-dimensional analysis and other hot big-data topics. I've written down many of my own thoughts and takeaways here. I hope to exchange ideas with everyone, and I hope this helps you!
Tips: "Sharing is the source of happiness 💧. My blog holds not only an ocean of knowledge 🌊 but plenty of positive energy 💪 — come share the joy with me 😊!
If you like my blog, remember to leave a like ❤️ and a follow! Your support keeps me writing!"
Windows turn a stream into a special kind of "batch". The common window types are covered below.
Before Flink 1.13, windows were expressed with the legacy group-window functions:
SELECT
  TUMBLE_START(bidtime, INTERVAL '10' MINUTE),
  TUMBLE_END(bidtime, INTERVAL '10' MINUTE),
  TUMBLE_ROWTIME(bidtime, INTERVAL '10' MINUTE),
  SUM(price)
FROM MyTable
GROUP BY TUMBLE(bidtime, INTERVAL '10' MINUTE);
Since Flink 1.13, the syntax is standardized around table-valued functions (TVFs):
SELECT window_start, window_end, window_time, SUM(price)
FROM TABLE(
  TUMBLE(TABLE MyTable, DESCRIPTOR(bidtime), INTERVAL '10' MINUTES))
GROUP BY window_start, window_end;
Syntax:
TUMBLE(TABLE data, DESCRIPTOR(timecol), size)
- data: a table name.
- timecol: a column descriptor indicating which time-attribute column of the data should be mapped to tumbling windows.
- size: a duration specifying the width of the tumbling window.
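Conceptually, TUMBLE assigns each record to exactly one window whose boundaries are aligned to the epoch: with a 10-minute size, a record at 08:07 lands in [08:00, 08:10). A minimal Java sketch of that assignment (class and method names are mine, not Flink API):

```java
// Sketch of tumbling-window assignment: each timestamp maps to exactly one
// window whose start is aligned to a multiple of the window size.
public class TumbleSketch {
    // start of the tumbling window containing `ts` (all values in ms)
    public static long windowStart(long ts, long size) {
        return ts - (ts % size);
    }

    // exclusive end of that window
    public static long windowEnd(long ts, long size) {
        return windowStart(ts, size) + size;
    }

    public static void main(String[] args) {
        long tenMin = 10 * 60 * 1000L;
        long ts = 7 * 60 * 1000L;                    // 00:07 within some hour
        System.out.println(windowStart(ts, tenMin)); // 0       (i.e. 00:00)
        System.out.println(windowEnd(ts, tenMin));   // 600000  (i.e. 00:10)
    }
}
```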
Data:
2021-04-15 08:05:00,4.00,C
2021-04-15 08:07:00,2.00,A
2021-04-15 08:09:00,5.00,D
2021-04-15 08:11:00,3.00,B
2021-04-15 08:13:00,1.00,E
2021-04-15 08:17:00,6.00,F
Requirement: a real-time dashboard needs the total GMV of every 10-minute window.
package cn.itcast.day02.Window;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.table.api.EnvironmentSettings;
import org.apache.flink.table.api.Table;
import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;
import org.apache.flink.types.Row;
/**
* @author lql
* @time 2024-03-16 17:33:47
* @description Tumbling-window (TUMBLE TVF) example
*/
public class GroupWindowsSqlTumbleExample {
public static void main(String[] args) throws Exception {
//todo 1) create the Flink stream execution environment
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
//todo 2) set the parallelism
env.setParallelism(1);
//todo 3) create the Flink table environment
EnvironmentSettings settings = EnvironmentSettings.newInstance()
.useBlinkPlanner()
.inStreamingMode()
.build();
StreamTableEnvironment tabEnv = StreamTableEnvironment.create(env, settings);
String filePath = GroupWindowsSqlTumbleExample.class.getClassLoader().getResource("bid.csv").getPath();
tabEnv.executeSql("create table Bid(" +
"bidtime TIMESTAMP(3)," +
"price DECIMAL(10, 2), " +
"item string," +
"watermark for bidtime as bidtime - interval '1' second) " +
"with("
+ "'connector' = 'filesystem',"
+ "'path' = 'file:///"+filePath+"',"
+ "'format' = 'csv'"
+ ")");
Table table = tabEnv.sqlQuery("" +
"select window_start,window_end,sum(price) as sum_price " +
" from table(" +
" tumble(table Bid, DESCRIPTOR(bidtime), interval '10' MINUTES))" +
" group by window_start,window_end");
tabEnv.toAppendStream(table, Row.class).print();
env.execute();
}
}
Result:
+I[2021-04-15T08:00, 2021-04-15T08:10, 11.00]
+I[2021-04-15T08:10, 2021-04-15T08:20, 10.00]
Syntax:
HOP(TABLE data, DESCRIPTOR(timecol), slide, size [, offset])
- data: a table name.
- timecol: a column descriptor indicating which time-attribute column of the data should be mapped to hopping windows.
- slide: a duration specifying the gap between the starts of consecutive hopping windows.
- size: a duration specifying the width of the hopping window.
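Unlike TUMBLE, a hopping window assigns each record to size/slide overlapping windows. A small sketch of which window starts a record belongs to (names are mine, not Flink API):

```java
import java.util.ArrayList;
import java.util.List;

// Sketch: a HOP window with slide s and size w assigns each record to the
// w/s overlapping windows whose ranges contain its timestamp.
public class HopSketch {
    // starts of all hopping windows that contain `ts` (all values in ms)
    public static List<Long> windowStarts(long ts, long slide, long size) {
        List<Long> starts = new ArrayList<>();
        long lastStart = ts - (ts % slide);  // latest window starting at or before ts
        for (long start = lastStart; start > ts - size; start -= slide) {
            starts.add(start);
        }
        return starts;
    }

    public static void main(String[] args) {
        // 08:07 with slide 5 min and size 10 min falls into the windows
        // starting at 08:05 and 08:00
        System.out.println(windowStarts(7 * 60_000L, 5 * 60_000L, 10 * 60_000L));
        // prints [300000, 0]
    }
}
```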
Requirement: every 5 minutes, aggregate the data of the last 10 minutes.
package cn.itcast.day02.Window;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.table.api.EnvironmentSettings;
import org.apache.flink.table.api.Table;
import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;
import org.apache.flink.types.Row;
/**
* @author lql
* @time 2024-03-16 19:28:30
* @description Sliding-window (HOP TVF) example
*/
public class GroupWindowsSqlHopExample {
public static void main(String[] args) throws Exception {
//todo 1) create the Flink stream execution environment
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
//todo 2) set the parallelism
env.setParallelism(1);
//todo 3) create the Flink table environment
EnvironmentSettings settings = EnvironmentSettings.newInstance()
.useBlinkPlanner()
.inStreamingMode()
.build();
StreamTableEnvironment tabEnv = StreamTableEnvironment.create(env, settings);
String filePath = GroupWindowsSqlHopExample.class.getClassLoader().getResource("bid.csv").getPath();
tabEnv.executeSql("create table Bid(" +
"bidtime TIMESTAMP(3)," +
"price DECIMAL(10, 2), " +
"item string," +
"watermark for bidtime as bidtime - interval '1' second) " +
"with("
+ "'connector' = 'filesystem',"
+ "'path' = 'file:///"+filePath+"',"
+ "'format' = 'csv'"
+ ")");
Table table = tabEnv.sqlQuery("" +
"select window_start,window_end,sum(price) as sum_price " +
" from table(" +
" hop(table Bid, DESCRIPTOR(bidtime), interval '5' MINUTES, interval '10' MINUTES))" +
" group by window_start,window_end");
tabEnv.toAppendStream(table, Row.class).print();
env.execute();
}
}
Result:
+I[2021-04-15T08:00, 2021-04-15T08:10, 11.00]
+I[2021-04-15T08:05, 2021-04-15T08:15, 15.00]
+I[2021-04-15T08:10, 2021-04-15T08:20, 10.00]
+I[2021-04-15T08:15, 2021-04-15T08:25, 6.00]
Flink 1.13 does not support the SESSION window TVF (support was expected in Flink 1.14), so here we use the legacy group-window syntax.
Requirement: implement a session window with a 3-minute gap: if no new data arrives within three minutes after the last row of a window, the window closes and subsequent data is assigned to the next window.
package cn.itcast.day02.Window;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;
/**
* @author lql
* @time 2024-03-16 19:37:20
* @description Session-window (legacy group window) example
*/
public class GroupWindowsSqlSessionExample {
public static void main(String[] args) {
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
env.setParallelism(1);
StreamTableEnvironment tEnv = StreamTableEnvironment.create(env);
String filePath = GroupWindowsSqlSessionExample.class.getClassLoader().getResource("bid.csv").getPath();
// the event-time column must be of TIMESTAMP type; bidtime already is, so no conversion is needed
tEnv.executeSql("create table Bid(" +
"bidtime TIMESTAMP(3)," +
"price DECIMAL(10, 2), " +
"item string," +
"watermark for bidtime as bidtime - interval '1' second) " +
"with("
+ "'connector' = 'filesystem',"
+ "'path' = 'file:///"+filePath+"',"
+ "'format' = 'csv'"
+ ")");
tEnv.sqlQuery("SELECT " +
" SESSION_START(bidtime, INTERVAL '3' minute) as wStart, " +
" SESSION_END(bidtime, INTERVAL '3' minute) as wEnd, " +
" SUM(price) sum_price " +
"FROM Bid " +
"GROUP BY SESSION(bidtime, INTERVAL '3' minute)"
)
.execute()
.print();
}
}
Result:
+----+-------------------------+-------------------------+-----------+
| op |                  wStart |                    wEnd | sum_price |
+----+-------------------------+-------------------------+-----------+
| +I | 2021-04-15 08:05:00.000 | 2021-04-15 08:16:00.000 |     15.00 |
| +I | 2021-04-15 08:17:00.000 | 2021-04-15 08:20:00.000 |      6.00 |
+----+-------------------------+-------------------------+-----------+
2 rows in set
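The session boundaries in the result above (window end = last event time + gap) can be reproduced with a small merging sketch (names are mine, not Flink API):

```java
import java.util.ArrayList;
import java.util.List;

// Sketch of session-window merging with a gap: events closer together than
// the gap are merged into one session, and the session's end is the last
// event's time plus the gap.
public class SessionSketch {
    // input: event times sorted ascending; output: one [start, end] per session
    public static List<long[]> sessions(List<Long> sortedTs, long gap) {
        List<long[]> out = new ArrayList<>();
        long start = Long.MIN_VALUE, last = Long.MIN_VALUE;
        for (long ts : sortedTs) {
            if (start == Long.MIN_VALUE) {
                start = ts;                              // first session begins
            } else if (ts - last >= gap) {
                out.add(new long[]{start, last + gap});  // close previous session
                start = ts;
            }
            last = ts;
        }
        if (start != Long.MIN_VALUE) {
            out.add(new long[]{start, last + gap});
        }
        return out;
    }

    public static void main(String[] args) {
        // bid times in minutes since midnight: 08:05 08:07 08:09 08:11 08:13 08:17
        List<long[]> s = sessions(List.of(485L, 487L, 489L, 491L, 493L, 497L), 3L);
        for (long[] w : s) System.out.println(w[0] + " -> " + w[1]);
        // prints 485 -> 496 and 497 -> 500, i.e. [08:05, 08:16] and [08:17, 08:20]
    }
}
```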
Syntax:
CUMULATE(TABLE data, DESCRIPTOR(timecol), step, size)
- TABLE: the table name.
- DESCRIPTOR: the time column used for windowing.
- step: the increment by which the window end advances on each firing.
- size: the maximum window size (an integer multiple of step).
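In other words, all the cumulative windows in one cycle share a start aligned to size, and their ends advance by step until they reach size. A sketch of which cumulative windows a record contributes to (names are mine, not Flink API):

```java
import java.util.ArrayList;
import java.util.List;

// Sketch: CUMULATE fixes one window start (aligned to the max size) and fires
// at every step boundary, so a record contributes to every cumulative window
// whose end is after its timestamp.
public class CumulateSketch {
    // [start, end] pairs of all cumulative windows containing `ts` (in ms)
    public static List<long[]> windows(long ts, long step, long size) {
        long start = ts - (ts % size);       // shared start of the whole cycle
        List<long[]> result = new ArrayList<>();
        for (long end = start + step; end <= start + size; end += step) {
            if (end > ts) {                  // record lies inside [start, end)
                result.add(new long[]{start, end});
            }
        }
        return result;
    }

    public static void main(String[] args) {
        // 08:05 with step 2 min and size 10 min contributes to the windows
        // ending at 08:06, 08:08, and 08:10
        for (long[] w : windows(5 * 60_000L, 2 * 60_000L, 10 * 60_000L)) {
            System.out.println(w[0] + " -> " + w[1]);
        }
    }
}
```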
Requirement: with a 10-minute window, emit the cumulative total every two minutes (just like the smooth cumulative play-count curve in a mid-length video creator program!).
package cn.itcast.day02.Window;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;
/**
* @author lql
* @time 2024-03-16 19:45:02
* @description Cumulate-window (CUMULATE TVF) example
*/
public class GroupWindowsSqlCumulateExample {
public static void main(String[] args) {
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
env.setParallelism(1);
StreamTableEnvironment tEnv = StreamTableEnvironment.create(env);
String filePath = GroupWindowsSqlCumulateExample.class.getClassLoader().getResource("bid.csv").getPath();
// the event-time column must be of TIMESTAMP type; bidtime already is, so no conversion is needed
tEnv.executeSql("create table Bid(" +
"bidtime TIMESTAMP(3)," +
"price DECIMAL(10, 2), " +
"item string," +
"watermark for bidtime as bidtime - interval '1' second) " +
"with("
+ "'connector' = 'filesystem',"
+ "'path' = 'file:///"+filePath+"',"
+ "'format' = 'csv'"
+ ")");
tEnv.sqlQuery("SELECT window_start, window_end, SUM(price) as sum_price\n" +
" FROM TABLE(\n" +
" CUMULATE(TABLE Bid, DESCRIPTOR(bidtime), INTERVAL '2' MINUTES, INTERVAL '10' MINUTES))\n" +
" GROUP BY window_start, window_end"
)
.execute()
.print();
}
}
Result:
+----+-------------------------+-------------------------+-----------+
| op |            window_start |              window_end | sum_price |
+----+-------------------------+-------------------------+-----------+
| +I | 2021-04-15 08:00:00.000 | 2021-04-15 08:06:00.000 |      4.00 |
| +I | 2021-04-15 08:00:00.000 | 2021-04-15 08:08:00.000 |      6.00 |
| +I | 2021-04-15 08:00:00.000 | 2021-04-15 08:10:00.000 |     11.00 |
| +I | 2021-04-15 08:10:00.000 | 2021-04-15 08:12:00.000 |      3.00 |
| +I | 2021-04-15 08:10:00.000 | 2021-04-15 08:14:00.000 |      4.00 |
| +I | 2021-04-15 08:10:00.000 | 2021-04-15 08:16:00.000 |      4.00 |
| +I | 2021-04-15 08:10:00.000 | 2021-04-15 08:18:00.000 |     10.00 |
| +I | 2021-04-15 08:10:00.000 | 2021-04-15 08:20:00.000 |     10.00 |
+----+-------------------------+-------------------------+-----------+
8 rows in set
Current approach (GROUPING SETS):
SELECT window_start, window_end, userId, category, sum(price) as sum_price
FROM TABLE(
  TUMBLE(TABLE orders, DESCRIPTOR(t), INTERVAL '5' SECONDS))
GROUP BY window_start, window_end, GROUPING SETS((userId, category), (userId), ())
Old approach (one query per grouping set, glued together with UNION ALL):
-- ()
SELECT window_start, window_end, 'NULL' as userId, 'NULL' as category, sum(price) as sum_price
FROM TABLE(
  TUMBLE(TABLE orders, DESCRIPTOR(t), INTERVAL '5' SECONDS))
GROUP BY window_start, window_end
UNION ALL
-- (userId)
SELECT window_start, window_end, userId as userId, 'NULL' as category, sum(price) as sum_price
FROM TABLE(
  TUMBLE(TABLE orders, DESCRIPTOR(t), INTERVAL '5' SECONDS))
GROUP BY window_start, window_end, userId
UNION ALL
-- (userId, category)
SELECT window_start, window_end, userId, category, sum(price) as sum_price
FROM TABLE(
  TUMBLE(TABLE orders, DESCRIPTOR(t), INTERVAL '5' SECONDS))
GROUP BY window_start, window_end, userId, category
Mnemonic for ROLLUP: read from right to left, from the full column set down to the empty set!
GROUP BY ROLLUP(a, b, c)
-- equivalent to:
GROUPING SETS((a,b,c),(a,b),(a), ())
GROUP BY ROLLUP ( a, (b, c), d )
-- equivalent to:
GROUPING SETS (
( a, b, c, d ),
( a, b, c ),
( a ),
( )
)
Mnemonic for CUBE: every combination (the power set).
GROUP BY CUBE(a, b, c)
-- equivalent to:
GROUPING SETS((a,b,c),(a,b),(a,c),(b,c),(a),(b),(c),())
GROUP BY CUBE ( (a, b), (c, d) )
-- equivalent to:
GROUPING SETS (
( a, b, c, d ),
( a, b ),
( c, d ),
( )
)
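The two expansions follow simple rules: ROLLUP yields the prefixes of the column list, while CUBE yields every subset. A small generator sketch (illustrative code, not part of any SQL engine):

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

// Sketch: ROLLUP(a, b, c) produces the prefixes of the column list;
// CUBE(a, b, c) produces every subset (the power set, 2^n sets).
public class GroupingSetsSketch {
    public static List<List<String>> rollup(List<String> cols) {
        List<List<String>> sets = new ArrayList<>();
        for (int n = cols.size(); n >= 0; n--) {
            sets.add(cols.subList(0, n));    // (a,b,c), (a,b), (a), ()
        }
        return sets;
    }

    public static List<List<String>> cube(List<String> cols) {
        List<List<String>> sets = new ArrayList<>();
        for (int mask = (1 << cols.size()) - 1; mask >= 0; mask--) {
            List<String> set = new ArrayList<>();
            for (int i = 0; i < cols.size(); i++) {
                // the i-th column is present when its bit is set in the mask
                if ((mask & (1 << (cols.size() - 1 - i))) != 0) set.add(cols.get(i));
            }
            sets.add(set);
        }
        return sets;
    }

    public static void main(String[] args) {
        System.out.println(rollup(Arrays.asList("a", "b", "c")));
        // prints [[a, b, c], [a, b], [a], []]
        System.out.println(cube(Arrays.asList("a", "b", "c")).size()); // 8
    }
}
```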
// combining plain GROUP BY columns, CUBE, and GROUPING SETS takes the cross product of the grouping sets and prepends the plain columns to every set
GROUP BY a, CUBE (b, c), GROUPING SETS ((d), (e))
-- equivalent to:
GROUP BY GROUPING SETS (
(a, b, c, d), (a, b, c, e),
(a, b, d), (a, b, e),
(a, c, d), (a, c, e),
(a, d), (a, e)
)
Background: GROUPING SETS uses NULL as a placeholder in its result, so a placeholder NULL cannot be distinguished from a genuine NULL in the data. The GROUPING() function solves this: it returns 1 when the column is rolled up (placeholder) and 0 when it carries a real group value.
Example:
SELECT window_start, window_end, userId, category,
       GROUPING(category) as categoryFlag,
       sum(price) as sum_price,
       IF(GROUPING(category) = 0, category, 'ALL') as `all`
FROM TABLE(
  TUMBLE(TABLE orders, DESCRIPTOR(t), INTERVAL '5' SECONDS))
GROUP BY window_start, window_end, GROUPING SETS((userId, category), (userId))
结果:
| window_start | window_end | userId | category | sum_price | categoryFlag | all |
|---|---|---|---|---|---|---|
| 2021-05-23 05:16:35.000 | 2021-05-23 05:16:40.000 | NULL | NULL | 10.1 | 1 | ALL |
| 2021-05-23 05:16:40.000 | 2021-05-23 05:16:45.000 | NULL | NULL | 96.6 | 1 | ALL |
| 2021-05-23 05:16:45.000 | 2021-05-23 05:16:50.000 | NULL | NULL | 15.6 | 1 | ALL |
| 2021-05-23 05:16:35.000 | 2021-05-23 05:16:40.000 | user_001 | 电脑 | 10.1 | 0 | 电脑 |
| 2021-05-23 05:16:40.000 | 2021-05-23 05:16:45.000 | user_001 | 手机 | 14.1 | 0 | 手机 |
| 2021-05-23 05:16:40.000 | 2021-05-23 05:16:45.000 | user_002 | 手机 | 82.5 | 0 | 手机 |
| 2021-05-23 05:16:45.000 | 2021-05-23 05:16:50.000 | user_001 | 电脑 | 15.6 | 0 | 电脑 |
| 2021-05-23 05:16:35.000 | 2021-05-23 05:16:40.000 | user_001 | NULL | 10.1 | 1 | ALL |
| 2021-05-23 05:16:40.000 | 2021-05-23 05:16:45.000 | user_001 | NULL | 14.1 | 1 | ALL |
| 2021-05-23 05:16:40.000 | 2021-05-23 05:16:45.000 | user_002 | NULL | 82.5 | 1 | ALL |
| 2021-05-23 05:16:45.000 | 2021-05-23 05:16:50.000 | user_001 | NULL | 15.6 | 1 | ALL |
MaxCompute also provides a parameterless GROUPING__ID function for Hive-compatible queries.
Its result packs the GROUPING bits of the grouped columns into an integer, bitmap-style.
MaxCompute matches this function's behavior in Hive 2.3.0 and later; in Hive versions below 2.3.0 its output differs, so using it is not recommended.
SELECT a, b, c, COUNT(*), GROUPING_ID
FROM VALUES (1, 2, 3) as t(a, b, c)
GROUP BY a, b, c GROUPING SETS ((a, b, c), (a));
GROUPING_ID here takes no arguments and no parentheses. In MaxCompute this form is equivalent to GROUPING_ID(a, b, c), with the arguments in the same order as the GROUP BY columns.
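The bitmap idea is straightforward: one GROUPING bit per column, packed into an integer. A sketch of how such a value can be derived from a grouping set; note that bit order conventions differ between engines (which is exactly why pre-2.3.0 Hive is incompatible), and here the leftmost column is assumed to be the most significant bit:

```java
import java.util.List;
import java.util.Set;

// Sketch: GROUPING(col) is 1 when the column is rolled up (absent from the
// grouping set) and 0 when it is grouped; GROUPING_ID packs those bits into
// an integer, leftmost column as the most significant bit (an assumption).
public class GroupingIdSketch {
    public static int grouping(Set<String> groupingSet, String col) {
        return groupingSet.contains(col) ? 0 : 1;
    }

    public static int groupingId(Set<String> groupingSet, List<String> cols) {
        int id = 0;
        for (String col : cols) {
            id = (id << 1) | grouping(groupingSet, col);
        }
        return id;
    }

    public static void main(String[] args) {
        List<String> cols = List.of("a", "b", "c");
        // grouping set (a): b and c are rolled up -> binary 011 -> 3
        System.out.println(groupingId(Set.of("a"), cols));           // 3
        // grouping set (a, b, c): nothing rolled up -> binary 000 -> 0
        System.out.println(groupingId(Set.of("a", "b", "c"), cols)); // 0
    }
}
```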
Template: compute the top 3 suppliers by sales within every 10-minute window (window Top-N).
SELECT *
FROM (
SELECT *, ROW_NUMBER() OVER (PARTITION BY window_start, window_end ORDER BY price DESC) as rownum
FROM (
SELECT window_start, window_end, supplier_id, SUM(price) as price, COUNT(*) as cnt
FROM TABLE(
TUMBLE(TABLE Bid, DESCRIPTOR(bidtime), INTERVAL '10' MINUTES))
GROUP BY window_start, window_end, supplier_id
)
) WHERE rownum <= 3;
Approach: first aggregate over 10-minute tumbling windows, grouping by window and supplier id; then rank with ROW_NUMBER() per window and keep the top three.
Row-based frames: ROWS BETWEEN (UNBOUNDED | rowCount) PRECEDING AND CURRENT ROW
Note: UNBOUNDED PRECEDING means "from the first row of the partition up to the current row"; n PRECEDING means "from n rows before the current row up to the current row".
Source data:
| itemID | itemType | onSellTime | price |
|---|---|---|---|
| ITEM001 | Electronic | 2021-05-11 10:01:00.000 | 20 |
| ITEM002 | Electronic | 2021-05-11 10:02:00.000 | 50 |
| ITEM003 | Electronic | 2021-05-11 10:03:00.000 | 30 |
| ITEM004 | Electronic | 2021-05-11 10:03:00.000 | 60 |
| ITEM005 | Electronic | 2021-05-11 10:05:00.000 | 40 |
| ITEM006 | Electronic | 2021-05-11 10:06:00.000 | 20 |
| ITEM007 | Electronic | 2021-05-11 10:07:00.000 | 70 |
| ITEM008 | Clothes | 2021-05-11 10:08:00.000 | 20 |
| ITEM009 | Clothes | 2021-05-11 10:09:00.000 | 40 |
| ITEM010 | Clothes | 2021-05-11 10:11:00.000 | 30 |
Example: partition by itemType, order by onSellTime ascending, and compute the running total from the first row to the current row.
select
itemID,
itemType,
onSellTime,
price,
sum(price) over w as sumPrice
from tmall_item
WINDOW w AS (
PARTITION BY itemType
ORDER BY onSellTime
ROWS BETWEEN UNBOUNDED preceding AND CURRENT ROW
)
Result:
| itemID | itemType | onSellTime | price | sumPrice |
|---|---|---|---|---|
| ITEM001 | Electronic | 2021-05-11 10:01:00.000 | 20.0 | 20.0 |
| ITEM002 | Electronic | 2021-05-11 10:02:00.000 | 50.0 | 70.0 |
| ITEM003 | Electronic | 2021-05-11 10:03:00.000 | 30.0 | 100.0 |
| ITEM004 | Electronic | 2021-05-11 10:03:00.000 | 60.0 | 160.0 |
| ITEM005 | Electronic | 2021-05-11 10:05:00.000 | 40.0 | 200.0 |
| ITEM006 | Electronic | 2021-05-11 10:06:00.000 | 20.0 | 220.0 |
| ITEM007 | Electronic | 2021-05-11 10:07:00.000 | 70.0 | 290.0 |
| ITEM008 | Clothes | 2021-05-11 10:08:00.000 | 20.0 | 20.0 |
| ITEM009 | Clothes | 2021-05-11 10:09:00.000 | 40.0 | 60.0 |
| ITEM010 | Clothes | 2021-05-11 10:11:00.000 | 30.0 | 90.0 |
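The sumPrice column above is simply a per-partition running total: within each itemType, the frame grows one row at a time. A minimal sketch of that semantics (illustrative names, not Flink API):

```java
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

// Sketch of ROWS BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW: for each
// partition key, keep a running sum and emit it in row order.
public class RunningSumSketch {
    record Item(String type, double price) {}

    // running sums in input (already time-ordered) order
    public static double[] runningSums(List<Item> rows) {
        Map<String, Double> acc = new LinkedHashMap<>();  // per-partition accumulator
        double[] out = new double[rows.size()];
        for (int i = 0; i < rows.size(); i++) {
            Item r = rows.get(i);
            double sum = acc.getOrDefault(r.type(), 0.0) + r.price();
            acc.put(r.type(), sum);
            out[i] = sum;
        }
        return out;
    }

    public static void main(String[] args) {
        List<Item> rows = List.of(
                new Item("Electronic", 20), new Item("Electronic", 50),
                new Item("Clothes", 20), new Item("Electronic", 30));
        for (double s : runningSums(rows)) System.out.print(s + " ");
        // prints 20.0 70.0 20.0 100.0
    }
}
```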
Time-based frames: RANGE BETWEEN (UNBOUNDED | INTERVAL 'n' unit) PRECEDING AND CURRENT ROW
Example: running total of the amount over the last two minutes.
select
itemID,
itemType,
onSellTime,
price,
sum(price) over w as sumPrice
from tmall_item
WINDOW w AS (
PARTITION BY itemType
ORDER BY onSellTime
RANGE BETWEEN INTERVAL '2' MINUTE preceding AND CURRENT ROW
)
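As a mental model, the RANGE frame above keeps, for every row, all rows of the same partition whose time lies within the preceding two minutes (inclusive of the current time). A sketch of that semantics (names are mine, not Flink API):

```java
import java.util.List;

// Sketch of RANGE BETWEEN INTERVAL '2' MINUTE PRECEDING AND CURRENT ROW:
// for each row, sum same-partition rows whose time is in [t - 2 min, t].
public class RangeFrameSketch {
    record Item(String type, long time, double price) {}

    public static double[] rangeSums(List<Item> rows, long rangeMs) {
        double[] out = new double[rows.size()];
        for (int i = 0; i < rows.size(); i++) {
            Item cur = rows.get(i);
            double sum = 0;
            for (Item r : rows) {          // O(n^2) scan, fine for a sketch
                if (r.type().equals(cur.type())
                        && r.time() >= cur.time() - rangeMs
                        && r.time() <= cur.time()) {
                    sum += r.price();
                }
            }
            out[i] = sum;
        }
        return out;
    }

    public static void main(String[] args) {
        List<Item> rows = List.of(
                new Item("E", 0, 20), new Item("E", 60_000, 50),
                new Item("E", 180_000, 30));
        for (double s : rangeSums(rows, 120_000L)) System.out.print(s + " ");
        // prints 20.0 70.0 80.0 (the last frame only reaches back to 60_000)
    }
}
```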
The Tumble class (Table API):
Example: total sales per product category every 5 seconds.
import lombok.AllArgsConstructor;
import lombok.Data;
import lombok.NoArgsConstructor;
import org.apache.flink.api.common.eventtime.WatermarkStrategy;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.table.api.Table;
import org.apache.flink.table.api.Tumble;
import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;
import java.time.Duration;
import static org.apache.flink.table.api.Expressions.$;
import static org.apache.flink.table.api.Expressions.lit;
public class GroupWindowsTableApiTumbleExample {
public static void main(String[] args) throws Exception {
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
env.setParallelism(1);
SingleOutputStreamOperator<OrderInfo> dataStream = env
.fromElements(
new OrderInfo("电脑", 1000L, 100D),
new OrderInfo("手机", 2000L, 200D),
new OrderInfo("电脑", 3000L, 300D),
new OrderInfo("手机", 4000L, 400D),
new OrderInfo("手机", 5000L, 500D),
new OrderInfo("电脑", 6000L, 600D))
.assignTimestampsAndWatermarks(
WatermarkStrategy
.<OrderInfo>forBoundedOutOfOrderness(Duration.ofSeconds(5))
.withTimestampAssigner((element, recordTimestamp) -> element.getTimestamp())
);
StreamTableEnvironment tableEnv = StreamTableEnvironment.create(env);
Table table = tableEnv
.fromDataStream(dataStream, $("category"), $("timestamp").rowtime(), $("money"));
table
.window(Tumble.over(lit(5).second())
.on($("timestamp")).as("w")) // define a tumbling window and alias it "w"
.groupBy($("category"),
$("w")) // the window must appear among the groupBy fields
.select($("category"),
$("w").start().as("window_start"),
$("w").end().as("window_end"),
$("money").sum().as("total_money"))
.execute()
.print();
env.execute();
}
@Data
@AllArgsConstructor
@NoArgsConstructor
public static class OrderInfo {
private String category;
private Long timestamp;
private Double money;
}
}
The Slide class (Table API):
Example: every 5 seconds, total sales per product category over the last 10 seconds.
// imports as in the Tumble example, with org.apache.flink.table.api.Slide instead of Tumble
public class GroupWindowsTableApiSlideExample {
public static void main(String[] args) throws Exception {
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
env.setParallelism(1);
SingleOutputStreamOperator<OrderInfo> dataStream = env
.fromElements(
new OrderInfo("电脑", 1000L, 100D),
new OrderInfo("手机", 2000L, 200D),
new OrderInfo("电脑", 3000L, 300D),
new OrderInfo("手机", 4000L, 400D),
new OrderInfo("手机", 5000L, 500D),
new OrderInfo("电脑", 6000L, 600D))
.assignTimestampsAndWatermarks(
WatermarkStrategy
.<OrderInfo>forBoundedOutOfOrderness(Duration.ofSeconds(5))
.withTimestampAssigner((element, recordTimestamp) -> element.getTimestamp())
);
StreamTableEnvironment tableEnv = StreamTableEnvironment.create(env);
Table table = tableEnv
.fromDataStream(dataStream, $("category"), $("timestamp").rowtime(), $("money"));
table
.window(Slide.over(lit(10).second())
.every(lit(5).second())
.on($("timestamp"))
.as("w")) // define a sliding window and alias it "w"
.groupBy($("category"),
$("w")) // the window must appear among the groupBy fields
.select($("category"),
$("w").start().as("window_start"),
$("w").end().as("window_end"),
$("money").sum().as("total_money"))
.execute()
.print();
env.execute();
}
@Data
@AllArgsConstructor
@NoArgsConstructor
public static class OrderInfo {
private String category;
private Long timestamp;
private Double money;
}
}
The Session class (Table API):
Example: with a 6-second gap, if no new order event arrives within 6 seconds of the previous one, the window closes and the orders inside it are aggregated.
// imports as in the Tumble example, with org.apache.flink.table.api.Session instead of Tumble
public class GroupWindowsTableApiSessionExample {
public static void main(String[] args) throws Exception {
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
env.setParallelism(1);
SingleOutputStreamOperator<OrderInfo> dataStream = env
.fromElements(
new OrderInfo("电脑", 1000L, 100D),
new OrderInfo("手机", 2000L, 200D),
new OrderInfo("电脑", 3000L, 300D),
new OrderInfo("手机", 4000L, 400D),
new OrderInfo("手机", 5000L, 500D),
new OrderInfo("电脑", 6000L, 600D))
.assignTimestampsAndWatermarks(
WatermarkStrategy
.<OrderInfo>forBoundedOutOfOrderness(Duration.ofSeconds(5))
.withTimestampAssigner((element, recordTimestamp) -> element.getTimestamp())
);
StreamTableEnvironment tableEnv = StreamTableEnvironment.create(env);
Table table = tableEnv
.fromDataStream(dataStream, $("category"), $("timestamp").rowtime(), $("money"));
table
.window(Session.withGap(lit(6).second())
.on($("timestamp"))
.as("w")) // define a session window and alias it "w"
.groupBy($("category"),
$("w")) // the window must appear among the groupBy fields
.select($("category"),
$("w").start().as("window_start"),
$("w").end().as("window_end"),
$("money").sum().as("total_money"))
.execute()
.print();
env.execute();
}
@Data
@AllArgsConstructor
@NoArgsConstructor
public static class OrderInfo {
private String category;
private Long timestamp;
private Double money;
}
}