深入理解 Flink(四)Flink Time+WaterMark+Window 深入分析
作者:mmseoamin日期:2024-01-18

Flink Window 常见需求背景

需求描述

每隔 5 秒,计算最近 10 秒单词出现的次数 —— 滑动窗口

每隔 5 秒,计算最近 5 秒单词出现的次数 —— 滚动窗口

深入理解 Flink(四)Flink Time+WaterMark+Window 深入分析,在这里插入图片描述,第1张

关于 Flink time 种类 TimeCharacteristic

深入理解 Flink(四)Flink Time+WaterMark+Window 深入分析,在这里插入图片描述,第2张

  • ProcessingTime
  • IngestionTime
  • EventTime

    WindowAssigner 的子类

    • SlidingProcessingTimeWindows
    • SlidingEventTimeWindows
    • TumblingEventTimeWindows
    • TumblingProcessingTimeWindows

      使用 EventTime + WaterMark 处理乱序数据

      示意图:

      深入理解 Flink(四)Flink Time+WaterMark+Window 深入分析,在这里插入图片描述,第3张

      • 使用 onPeriodicEmit 方法发送 watermark,默认每 200ms 发一次。
      • 窗口起始时间默认按各个时区的整点时间,支持自定义 offset。

        Flink Watermark 机制定义

        有序的流的 Watermarks

        深入理解 Flink(四)Flink Time+WaterMark+Window 深入分析,在这里插入图片描述,第4张

        无序的流的 Watermarks

        深入理解 Flink(四)Flink Time+WaterMark+Window 深入分析,在这里插入图片描述,第5张

        多并行度流的 Watermarks

        深入理解 Flink(四)Flink Time+WaterMark+Window 深入分析,在这里插入图片描述,第6张

        深入理解 Flink Watermark

        Flink Window 触发的条件:

        1. watermark 时间 >= window_end_time
        2. 在 [window_start_time, window_end_time) 区间中有数据存在(注意是左闭右开的区间),而且是以 event time 来计算的

        Flink 处理太过延迟数据

        Flink 丢弃延迟太多的数据

        企业生产中一般不用。

        Flink 指定允许再次迟到的时间

        治标不治本,企业生产中一般不用。

        Flink 收集迟到的数据单独处理

        企业生产中应用较为广泛。

        Flink 多并行度 Watermark

        一个 window 可能会接受到多个 waterMark,我们以最小的为准。

        深入理解 Flink(四)Flink Time+WaterMark+Window 深入分析,在这里插入图片描述,第7张

        Flink Window 概述

        官网介绍

        https://nightlies.apache.org/flink/flink-docs-release-1.14/docs/dev/datastream/operators/windows/

        深入理解 Flink(四)Flink Time+WaterMark+Window 深入分析,在这里插入图片描述,第8张

        Flink Window 分类

        Flink 的 window 分为两种类型的 Window,分别是:Keyed Windows 和 Non-Keyed Windows,他们的使用方式不同:

        // Keyed Windows 
        stream
            .keyBy(...) <- keyed versus non-keyed windows
            .window(...) <- required: "assigner"
            [.trigger(...)] <- optional: "trigger" (else default trigger)
            [.evictor(...)] <- optional: "evictor" (else no evictor)
            [.allowedLateness(...)] <- optional: "lateness" (else zero)
            [.sideOutputLateData(...)] <- optional: "output tag" (else no side output for late data)
            .reduce/aggregate/apply() <- required: "function"
            [.getSideOutput(...)] <- optional: "output tag"
        
        // Non-Keyed Windows
        stream
            .windowAll(...) <- required: "assigner"
            [.trigger(...)] <- optional: "trigger" (else default trigger)
            [.evictor(...)] <- optional: "evictor" (else no evictor)
            [.allowedLateness(...)] <- optional: "lateness" (else zero)
            [.sideOutputLateData(...)] <- optional: "output tag" (else no side output for late data)
            .reduce/aggregate/apply() <- required: "function"
            [.getSideOutput(...)] <- optional: "output tag"
        

        Window 的生命周期

        1. 当属于某个窗口的第一个元素到达的时候,就会创建一个窗口。
        2. 当时间(event or processing time)超过 window 的结束时间戳加上用户指定的允许延迟(Allowed Lateness)时,窗口将被完全删除。
        3. 每个 Window 之上,都绑定有一个 Trigger 或者一个 Function(ProcessWindowFunction, ReduceFunction, or AggregateFunction)用来执行窗口内数据的计算。
        4. 可以给 Window 指定一个 Evictor,它能够在 after the trigger fires 以及 before and/or after the function is applied 从窗口中删除元素。

        Flink Window 类型

        Flink 流批同一前后的 Window 分类:

        深入理解 Flink(四)Flink Time+WaterMark+Window 深入分析,在这里插入图片描述,第9张

        tumblingwindows —— 滚动窗口

        深入理解 Flink(四)Flink Time+WaterMark+Window 深入分析,在这里插入图片描述,第10张

        slidingwindows —— 滑动窗口

        深入理解 Flink(四)Flink Time+WaterMark+Window 深入分析,在这里插入图片描述,第11张

        session windows —— 会话窗口

        深入理解 Flink(四)Flink Time+WaterMark+Window 深入分析,在这里插入图片描述,第12张

        global windows —— 全局窗口

        深入理解 Flink(四)Flink Time+WaterMark+Window 深入分析,在这里插入图片描述,第13张

        Flink Window 操作使用

        高级玩法:自定义 Trigger、自定义 Evictor,读者可自行搜索相关文章与代码。

        Flink Window 增量聚合

        • reduce(ReduceFunction)
        • aggregate(AggregateFunction)
        • sum()
        • min()
        • max()
        • sum()

          Flink Window 全量聚合

          • apply(WindowFunction)
          • process(ProcessWindowFunction)

            Flink Window Join

            // 在 Flink 中对两个 DataStream 做 Join
            // 1、指定两张表
            // 2、指定这两张表的链接字段
            stream.join(otherStream) // 两个流进行关联
                .where() // 选择第一个流的key作为关联字段
                .equalTo() // 选择第二个流的key作为关联字段
                .window() // 设置窗口的类型
                .apply() // 对结果做操作 process apply = foreach
            

            Tumbling Window Join

            深入理解 Flink(四)Flink Time+WaterMark+Window 深入分析,在这里插入图片描述,第14张

            Sliding Window Join

            深入理解 Flink(四)Flink Time+WaterMark+Window 深入分析,在这里插入图片描述,第15张

            Session Window Join

            深入理解 Flink(四)Flink Time+WaterMark+Window 深入分析,在这里插入图片描述,第16张

            Interval Join

            深入理解 Flink(四)Flink Time+WaterMark+Window 深入分析,在这里插入图片描述,第17张

            核心代码示例:

            DataStream orangeStream = ...;
            DataStream greenStream = ...;
            orangeStream
                .keyBy()
                .intervalJoin(greenStream.keyBy())
                .between(Time.milliseconds(-2), Time.milliseconds(1))
                .process (new ProcessJoinFunction out) {
                        out.collect(first + "," + second);
                   }
                });