案例用到的测试数据请参考文章:
Flink自定义Source模拟数据流
原文链接:https://blog.csdn.net/m0_52606060/article/details/135436048
在实际应用中,事件时间语义会更为常见。一般情况下,业务日志数据中都会记录数据生成的时间戳(timestamp),它就可以作为事件时间的判断基础。
在Flink中,由于处理时间比较简单,早期版本默认的时间语义是处理时间;而考虑到事件时间在实际应用中更为广泛,从Flink1.12版本开始,Flink已经将事件时间作为默认的时间语义了。
在Flink中,用来衡量事件时间进展的标记,就被称作“水位线”(Watermark)。
具体实现上,水位线可以看作一条特殊的数据记录,它是插入到数据流中的一个标记点,主要内容就是一个时间戳,用来指示当前的事件时间。而它插入流中的位置,就应该是在某个数据到来之后;这样就可以从这个数据中提取时间戳,作为当前水位线的时间戳了。
注意:Flink中窗口并不是静态准备好的,而是动态创建——当有落在这个窗口区间范围的数据达到时,才创建对应的窗口。另外,这里我们认为到达窗口结束时间时,窗口就触发计算并关闭,事实上“触发计算”和“窗口关闭”两个行为也可以分开,这部分内容我们会在后面详述。
完美的水位线是“绝对正确”的,也就是一个水位线一旦出现,就表示这个时间之前的数据已经全部到齐、之后再也不会出现了。不过如果要保证绝对正确,就必须等足够长的时间,这会带来更高的延迟。
如果我们希望处理得更快、实时性更强,那么可以将水位线延迟设得低一些。这种情况下,可能很多迟到数据会在水位线之后才到达,就会导致窗口遗漏数据,计算结果不准确。当然,如果我们对准确性完全不考虑、一味地追求处理速度,可以直接使用处理时间语义,这在理论上可以得到最低的延迟。
所以Flink中的水位线,其实是流处理中对低延迟和结果正确性的一个权衡机制,而且把控制的权力交给了程序员,我们可以在代码中定义水位线的生成策略。
在Flink的DataStream API中,有一个单独用于生成水位线的方法:.assignTimestampsAndWatermarks(),它主要用来为流中的数据分配时间戳,并生成水位线来指示事件时间。具体使用如下:
DataStreamstream = env.addSource(new ClickSource()); DataStream withTimestampsAndWatermarks = stream.assignTimestampsAndWatermarks( );
说明:WatermarkStrategy作为参数,这就是所谓的“水位线生成策略”。WatermarkStrategy是一个接口,该接口中包含了一个“时间戳分配器”TimestampAssigner和一个“水位线生成器”
WatermarkGenerator。 public interface WatermarkStrategyextends TimestampAssignerSupplier , WatermarkGeneratorSupplier { // 负责从流中数据元素的某个字段中提取时间戳,并分配给元素。时间戳的分配是生成水位线的基础。 @Override TimestampAssigner createTimestampAssigner(TimestampAssignerSupplier.Context context); // 主要负责按照既定的方式,基于时间戳生成水位线 @Override WatermarkGenerator createWatermarkGenerator(WatermarkGeneratorSupplier.Context context); }
有序流中内置水位线设置
对于有序流,主要特点就是时间戳单调增长,所以永远不会出现迟到数据的问题。这是周期性生成水位线的最简单的场景,直接调用WatermarkStrategy.forMonotonousTimestamps()方法就可以实现。
import com.zxl.bean.Orders; import com.zxl.datas.OrdersData; import org.apache.flink.api.common.eventtime.SerializableTimestampAssigner; import org.apache.flink.api.common.eventtime.WatermarkStrategy; import org.apache.flink.api.common.functions.ReduceFunction; import org.apache.flink.streaming.api.TimeCharacteristic; import org.apache.flink.streaming.api.datastream.DataStreamSource; import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator; import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment; import org.apache.flink.streaming.api.windowing.assigners.TumblingEventTimeWindows; import org.apache.flink.streaming.api.windowing.time.Time; public class WatermarkStrategyDemo { public static void main(String[] args) throws Exception { //创建Flink流处理执行环境 StreamExecutionEnvironment environment = StreamExecutionEnvironment.getExecutionEnvironment(); //设置并行度为1 environment.setParallelism(1); // TODO: 2024/1/7 定义时间语义 environment.setStreamTimeCharacteristic(TimeCharacteristic.EventTime); //调用Flink自定义Source // TODO: 2024/1/6 订单数据 DataStreamSourceordersDataStreamSource = environment.addSource(new OrdersData()); // TODO: 2024/1/11 定义Watermark策略 WatermarkStrategy ordersWatermarkStrategy = WatermarkStrategy. forMonotonousTimestamps() .withTimestampAssigner(new SerializableTimestampAssigner () { @Override public long extractTimestamp(Orders orders, long l) { System.out.println("返回时间戳"+orders.getOrder_date()+"毫秒13位的"); return orders.getOrder_date(); } }); // TODO: 2024/1/11 指定 watermark策略 SingleOutputStreamOperator watermarks = ordersDataStreamSource.assignTimestampsAndWatermarks(ordersWatermarkStrategy); // TODO: 2024/1/11 进行聚合运算 SingleOutputStreamOperator reduce = watermarks.keyBy(orders -> orders.getProduct_id()) // TODO: 2024/1/11 定义时间窗口 .window(TumblingEventTimeWindows.of(Time.seconds(5))) .reduce(new ReduceFunction () { @Override public Orders reduce(Orders orders, Orders t1) throws Exception { Orders orders1 = new Orders(t1.getOrder_id(), t1.getUser_id(), t1.getOrder_date(), t1.getOrder_amount() + orders.getOrder_amount(), t1.getProduct_id(), t1.getOrder_num()); return orders1; } }); ordersDataStreamSource.print("聚合前数据"); reduce.print("聚合数据"); environment.execute(); } }
由于乱序流中需要等待迟到数据到齐,所以必须设置一个固定量的延迟时间。这时生成水位线的时间戳,就是当前数据流中最大的时间戳减去延迟的结果,相当于把表调慢,当前时钟会滞后于数据的最大时间戳。调用WatermarkStrategy. forBoundedOutOfOrderness()方法就可以实现。这个方法需要传入一个maxOutOfOrderness参数,表示“最大乱序程度”,它表示数据流中乱序数据时间戳的最大差值;如果我们能确定乱序程度,那么设置对应时间长度的延迟,就可以等到所有的乱序数据了。
import com.zxl.bean.Orders; import com.zxl.datas.OrdersData; import javafx.scene.input.DataFormat; import org.apache.commons.lang3.time.DateFormatUtils; import org.apache.flink.api.common.eventtime.WatermarkStrategy; import org.apache.flink.api.common.functions.MapFunction; import org.apache.flink.api.common.functions.ReduceFunction; import org.apache.flink.streaming.api.TimeCharacteristic; import org.apache.flink.streaming.api.datastream.DataStreamSource; import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator; import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment; import org.apache.flink.streaming.api.functions.windowing.ProcessWindowFunction; import org.apache.flink.streaming.api.windowing.assigners.TumblingEventTimeWindows; import org.apache.flink.streaming.api.windowing.time.Time; import org.apache.flink.streaming.api.windowing.windows.TimeWindow; import org.apache.flink.util.Collector; import scala.Tuple2; import java.text.SimpleDateFormat; import java.time.Duration; import java.util.Date; public class WatermarkOutOfOrdernessDemo { public static void main(String[] args) throws Exception { //创建Flink流处理执行环境 StreamExecutionEnvironment environment = StreamExecutionEnvironment.getExecutionEnvironment(); //设置并行度为1 environment.setParallelism(1); // TODO: 2024/1/7 定义时间语义 environment.setStreamTimeCharacteristic(TimeCharacteristic.EventTime); //调用Flink自定义Source // TODO: 2024/1/6 订单数据 DataStreamSourceordersDataStreamSource = environment.addSource(new OrdersData()); // TODO: 2024/1/11 定义Watermark策略 WatermarkStrategy ordersWatermarkStrategy = WatermarkStrategy. forBoundedOutOfOrderness(Duration.ofSeconds(3)) // TODO: 2024/1/11 指定水位线时间戳也可以用 Lambda 表达式 .withTimestampAssigner((orders, l) -> orders.getOrder_date()); // TODO: 2024/1/11 指定水位线 SingleOutputStreamOperator operator = ordersDataStreamSource.assignTimestampsAndWatermarks(ordersWatermarkStrategy); // TODO: 2024/1/11 分组聚合 SingleOutputStreamOperator
周期性生成器一般是通过onEvent()观察判断输入的事件,而在onPeriodicEmit()里发出水位线。
下面是一段自定义周期性生成水位线的代码:
import com.zxl.bean.Orders; import com.zxl.datas.OrdersData; import org.apache.commons.lang3.time.DateFormatUtils; import org.apache.flink.api.common.eventtime.*; import org.apache.flink.api.common.functions.MapFunction; import org.apache.flink.streaming.api.TimeCharacteristic; import org.apache.flink.streaming.api.datastream.DataStreamSource; import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator; import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment; import org.apache.flink.streaming.api.functions.windowing.ProcessWindowFunction; import org.apache.flink.streaming.api.windowing.assigners.TumblingEventTimeWindows; import org.apache.flink.streaming.api.windowing.time.Time; import org.apache.flink.streaming.api.windowing.windows.TimeWindow; import org.apache.flink.util.Collector; import scala.Tuple2; import java.text.SimpleDateFormat; import java.util.Date; public class CustomPeriodicWatermarkExample { public static void main(String[] args) throws Exception { //创建Flink流处理执行环境 StreamExecutionEnvironment environment = StreamExecutionEnvironment.getExecutionEnvironment(); //设置并行度为1 environment.setParallelism(1); // TODO: 2024/1/7 定义时间语义 environment.setStreamTimeCharacteristic(TimeCharacteristic.EventTime); //调用Flink自定义Source // TODO: 2024/1/6 订单数据 DataStreamSourceordersDataStreamSource = environment.addSource(new OrdersData()); // TODO: 2024/1/12 设置水位线 SingleOutputStreamOperator operator = ordersDataStreamSource.assignTimestampsAndWatermarks(new CustomWatermarkStrategy()); // TODO: 2024/1/12 打印数据 ordersDataStreamSource.map(new MapFunction >() { @Override public Tuple2 map(Orders orders) throws Exception { //时间格式,HH是24小时制,hh是AM PM12小时制 SimpleDateFormat sdf=new SimpleDateFormat("yyyy-MM-dd HH:mm:ss"); //比如timestamp=1449210225945; String date_string = sdf.format(new Date(orders.getOrder_date())); return new Tuple2<>(date_string,orders.getOrder_amount()); } }).print(); // TODO: 2024/1/11 分组聚合 SingleOutputStreamOperator reduce = operator.keyBy(orders -> orders.getProduct_id()) .window(TumblingEventTimeWindows.of(Time.seconds(5))) .process(new ProcessWindowFunction () { // TODO: 2024/1/9 参数说明:分组key值,窗口计时器,按照key分组后的数据,收集器 @Override public void process(Integer integer, ProcessWindowFunction .Context context, Iterable elements, Collector collector) throws Exception { // TODO: 2024/1/9 窗口内同一个key包含的数据条数 long count = elements.spliterator().estimateSize(); // TODO: 2024/1/9 窗口的开始时间 long windowStartTs = context.window().getStart(); // TODO: 2024/1/9 窗口的结束时间 long windowEndTs = context.window().getEnd(); String windowStart = DateFormatUtils.format(windowStartTs, "yyyy-MM-dd HH:mm:ss.SSS"); String windowEnd = DateFormatUtils.format(windowEndTs, "yyyy-MM-dd HH:mm:ss.SSS"); // TODO: 2024/1/9 输出收集器 collector.collect("key=" + integer + "的窗口[" + windowStart + "," + windowEnd + ")包含" + count + "条数据===>" + elements.toString()); } }); reduce.print("聚合数据"); environment.execute(); } // TODO: 2024/1/12 自定义水位线生成器 public static class CustomWatermarkStrategy implements WatermarkStrategy { @Override public WatermarkGenerator createWatermarkGenerator(WatermarkGeneratorSupplier.Context context) { return new WatermarkGenerator () { // TODO: 2024/1/12 延迟时间 private Long delayTime = 2000L; // TODO: 2024/1/12 观察到的最大时间戳 private Long maxTs = -Long.MAX_VALUE + delayTime + 1L; @Override public void onEvent(Orders orders, long l, WatermarkOutput watermarkOutput) { // TODO: 2024/1/12 每来一条数据就调用一次 , 更新最大时间戳 maxTs = Math.max(orders.getOrder_date(),maxTs); } @Override public void onPeriodicEmit(WatermarkOutput output) { // TODO: 2024/1/12 发射水位线,默认200ms调用一次 output.emitWatermark(new Watermark(maxTs - delayTime - 1L)); } }; } @Override public TimestampAssigner createTimestampAssigner(TimestampAssignerSupplier.Context context) { // TODO: 2024/1/12 告诉程序数据源里的时间戳是哪一个字段 return new SerializableTimestampAssigner (){ @Override public long extractTimestamp(Orders orders, long l) { return orders.getOrder_date(); } }; } } }
断点式生成器会不停地检测onEvent()中的事件,当发现带有水位线信息的事件时,就立即发出水位线。我们把发射水位线的逻辑写在onEvent方法当中即可。
我们也可以在自定义的数据源中抽取事件时间,然后发送水位线。这里要注意的是,在自定义数据源中发送了水位线以后,就不能再在程序中使用assignTimestampsAndWatermarks方法来生成水位线了。在自定义数据源中生成水位线和在程序中使用assignTimestampsAndWatermarks方法生成水位线二者只能取其一。
env.fromSource( kafkaSource, WatermarkStrategy.forBoundedOutOfOrderness(Duration.ofSeconds(3)), "kafkasource" )
在流处理中,上游任务处理完水位线、时钟改变之后,要把当前的水位线再次发出,广播给所有的下游子任务。而当一个任务接收到多个上游并行任务传递来的水位线时,应该以最小的那个作为当前任务的事件时钟。
水位线在上下游任务之间的传递,非常巧妙地避免了分布式系统中没有统一时钟的问题,每个任务都以“处理完之前所有数据”为标准来确定自己的时钟。
案例:乱序流的watermark,将并行度设为2,观察现象。
在多个上游并行任务中,如果有其中一个没有数据,由于当前Task是以最小的那个作为当前任务的事件时钟,就会导致当前Task的水位线无法推进,就可能导致窗口无法触发。这时候可以设置空闲等待。
import com.zxl.bean.Orders; import org.apache.commons.lang3.time.DateFormatUtils; import org.apache.flink.api.common.eventtime.WatermarkStrategy; import org.apache.flink.api.common.functions.Partitioner; import org.apache.flink.api.java.functions.KeySelector; import org.apache.flink.streaming.api.datastream.DataStream; import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator; import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment; import org.apache.flink.streaming.api.functions.windowing.ProcessWindowFunction; import org.apache.flink.streaming.api.windowing.assigners.TumblingEventTimeWindows; import org.apache.flink.streaming.api.windowing.time.Time; import org.apache.flink.streaming.api.windowing.windows.TimeWindow; import org.apache.flink.util.Collector; import java.time.Duration; public class WatermarkOutOfOrdernessDemo { public static void main(String[] args) throws Exception { StreamExecutionEnvironment environment = StreamExecutionEnvironment.getExecutionEnvironment(); environment.setParallelism(2); DataStreamsocketTextStream = environment.socketTextStream("175.24.186.230", 9999); DataStream streamOperator = socketTextStream.map(Integer::parseInt); //自定义分区 DataStream dataStream = streamOperator.partitionCustom(new Partitioner () { @Override public int partition(Integer integer, int i) { if (integer % 2 == 0) { return 0; } else { return 1; } } }, new KeySelector () { @Override public Integer getKey(Integer integer) throws Exception { return integer; } }); WatermarkStrategy ordersWatermarkStrategy = WatermarkStrategy. forBoundedOutOfOrderness(Duration.ofSeconds(3)) .withTimestampAssigner((orders, l) -> orders.getOrder_date()); SingleOutputStreamOperator watermarks = dataStream.assignTimestampsAndWatermarks( WatermarkStrategy. forMonotonousTimestamps() .withTimestampAssigner((r, ts) -> r * 1000L) // TODO: 2024/1/12 空闲等待5s .withIdleness(Duration.ofSeconds(5))); // 分成两组: 奇数一组,偶数一组 , 开10s的事件时间滚动窗口 watermarks.keyBy(r -> r % 2) .window(TumblingEventTimeWindows.of(Time.seconds(10))) .process(new ProcessWindowFunction () { @Override public void process(Integer integer, Context context, Iterable elements, Collector out) throws Exception { long startTs = context.window().getStart(); long endTs = context.window().getEnd(); String windowStart = DateFormatUtils.format(startTs, "yyyy-MM-dd HH:mm:ss.SSS"); String windowEnd = DateFormatUtils.format(endTs, "yyyy-MM-dd HH:mm:ss.SSS"); long count = elements.spliterator().estimateSize(); out.collect("key=" + integer + "的窗口[" + windowStart + "," + windowEnd + ")包含" + count + "条数据===>" + elements.toString()); } }) .print(); environment.execute(); } }
在水印产生时,设置一个乱序容忍度,推迟系统时间的推进,保证窗口计算被延迟执行,为乱序的数据争取更多的时间进入窗口。
WatermarkStrategy.forBoundedOutOfOrderness(Duration.ofSeconds(10));
Flink的窗口,也允许迟到数据。当触发了窗口计算后,会先计算当前的结果,但是此时并不会关闭窗口。
以后每来一条迟到数据,就触发一次这条数据所在窗口计算(增量计算)。直到wartermark 超过了窗口结束时间+推迟时间,此时窗口会真正关闭。
.window(TumblingEventTimeWindows.of(Time.seconds(5))) .allowedLateness(Time.seconds(3))
注意:允许迟到只能运用在event time上
import com.zxl.bean.Orders; import com.zxl.datas.OrdersData; import org.apache.commons.lang3.time.DateFormatUtils; import org.apache.flink.api.common.eventtime.WatermarkStrategy; import org.apache.flink.api.common.typeinfo.Types; import org.apache.flink.streaming.api.TimeCharacteristic; import org.apache.flink.streaming.api.datastream.DataStreamSource; import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator; import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment; import org.apache.flink.streaming.api.functions.windowing.ProcessWindowFunction; import org.apache.flink.streaming.api.windowing.assigners.TumblingEventTimeWindows; import org.apache.flink.streaming.api.windowing.time.Time; import org.apache.flink.streaming.api.windowing.windows.TimeWindow; import org.apache.flink.util.Collector; import org.apache.flink.util.OutputTag; import java.time.Duration; public class WatermarkLateDemo { public static void main(String[] args) throws Exception { // TODO: 2024/1/15 创建Flink流处理执行环境 StreamExecutionEnvironment environment = StreamExecutionEnvironment.getExecutionEnvironment(); // TODO: 2024/1/15 设置并行度为1 environment.setParallelism(1); // TODO: 2024/1/7 定义时间语义 environment.setStreamTimeCharacteristic(TimeCharacteristic.EventTime); // TODO: 2024/1/6 订单数据 DataStreamSourceordersDataStreamSource = environment.addSource(new OrdersData()); // TODO: 2024/1/15 设置水位线 WatermarkStrategy watermarkStrategy = WatermarkStrategy // TODO: 2024/1/15 设置最大乱序程度,数据流中乱序数据时间戳的最大差值 . forBoundedOutOfOrderness(Duration.ofSeconds(3)) // TODO: 2024/1/15 指定水位线中对应的时间字段 .withTimestampAssigner((orders, l) -> orders.getOrder_date()) // TODO: 2024/1/15 设置空闲等待时间,如果某个分区中有长时间无数据产生将放弃此分区的水位线选举权力 .withIdleness(Duration.ofSeconds(3)); // TODO: 2024/1/15 添加水位线 SingleOutputStreamOperator operator = ordersDataStreamSource.assignTimestampsAndWatermarks(watermarkStrategy); // TODO: 2024/1/15 定义侧输出流的标签 OutputTag lateData = new OutputTag<>("late_data", Types.POJO(Orders.class)); // TODO: 2024/1/15 设置分区key SingleOutputStreamOperator process = operator.keyBy(orders -> orders.getProduct_id()) // TODO: 2024/1/15 定义滚动窗口时间大小 .window(TumblingEventTimeWindows.of(Time.seconds(5))) // TODO: 2024/1/15 窗口延迟关闭时间 .allowedLateness(Time.seconds(3)) // TODO: 2024/1/15 迟到数据的测输出流 .sideOutputLateData(lateData) // TODO: 2024/1/15 进行数据聚合 .process(new ProcessWindowFunction () { @Override public void process(Integer integer, ProcessWindowFunction .Context context, Iterable elements, Collector collector) throws Exception { long startTs = context.window().getStart(); long endTs = context.window().getEnd(); String windowStart = DateFormatUtils.format(startTs, "yyyy-MM-dd HH:mm:ss.SSS"); String windowEnd = DateFormatUtils.format(endTs, "yyyy-MM-dd HH:mm:ss.SSS"); long count = elements.spliterator().estimateSize(); collector.collect("key=" + integer + "的窗口[" + windowStart + "," + windowEnd + ")包含" + count + "条数据===>" + elements.toString()); } }); process.print(); process.getSideOutput(lateData).print("侧输出流"); environment.execute(); } }