相关推荐recommended
Flink:流式 Join 类型分类 盘点 (一)
作者:mmseoamin日期:2024-03-20
Flink:流式 Join 类型分类 盘点 (一),《大数据平台架构与原型实现:数据中台建设实战》,第1张博主历时三年精心创作的《大数据平台架构与原型实现:数据中台建设实战》一书现已由知名IT图书品牌电子工业出版社博文视点出版发行,点击《重磅推荐:建大数据平台太难了!给我发个工程原型吧!》了解图书详情,京东购书链接:https://item.jd.com/12677623.html,扫描左侧二维码进入京东手机购书页面。

文章目录

  • 1. Regular Join(常规 Join):
  • 2. Interval Join(时间区间 Join)
  • 3. Temporal Join (版本表 Join)
    • 3.1. 基于事件时间的 Temporal Join
    • 3.2. 基于处理时间的 Temporal Join

      在Flink中,实现流之间连接的操作可以分为两类。第一类是基于原生State状态存储的Connect算子操作,这种方式可以实现低延迟的数据连接和转换;第二类则是基于窗口的JOIN操作,这种方式又可以细分为window join和interval join两种,通过对数据进行时间窗口和滑动窗口的划分,实现不同粒度的数据关联和计算。

      1. Regular Join(常规 Join):

      从 SQL 上看,它只是一条普通的 SQL,和批处理的 SQL 无异:

      SELECT * FROM Orders
      INNER JOIN Product
      ON Orders.productId = Product.id
      

      在为了维持常规 Join 结果的准确性,不难判断的是:Flink 需要将 Join 输入的两边数据永远保持在状态中,所以,计算查询结果所需的状态可能会无限增长,对于长时间运行的大数据量的流来说,这种 Join 的代价是负担不起的。当然,我们可以通过配置状态的 TTL 来缓解这一问题,但这可能会导致结果不准确。总得来说就是:在流上,常规 Join 是可用的,但要慎用。

      Regular Join 又会细分为 INNER Equi-JOIN 和 OUTER Equi-JOIN,具体参考文档,此处不再赘述

      2. Interval Join(时间区间 Join)

      Regular Join 的条件太宽松,导致 Join 成本巨大,Interval Join 会添加一个时间范围限制,让流上仅处于指定时间区间内的数据参与 Join。

      SELECT *
      FROM Orders o, Shipments s
      WHERE o.id = s.order_id
      AND o.order_time BETWEEN s.ship_time - INTERVAL '4' HOUR AND s.ship_time
      

      上述 SQL 是一个典型的 Interval Join,它试图关联订单和它的发货记录以便获得更多信息,如果业务上保证:下单之后 4 小时以内即可发货,那上述 SQL 就能保证 order 和它的 shipment 可以关联上。我们看到,从 SQL 上来说,Interval Join 区别于 Regular Join 的地方就是:它在 Regular Join 的基础上又追加了时间范围条件,这就大大地减轻了维持 Join 状态数据的负担。以下是一些典型的 Interval Join 条件:

      • ltime = rtime
      • ltime >= rtime AND ltime < rtime + INTERVAL '10' MINUTE
      • ltime BETWEEN rtime - INTERVAL '10' SECOND AND rtime + INTERVAL '5' SECOND

        3. Temporal Join (版本表 Join)

        Temporal Join 并没有对应一个精准的中文称为,但别简单地把它称为 Temporal Table Join,因为它 Join 的是 Temporal Table (时态表)中的版本表,如果要精准描述的话,应该说是:版本表 Join。

        要了解 Temporal Join 必须得先了解很么是 Temporal Table (时态表),对此,请参考 《关于 动态表 / 时态表 / 版本表 概念的澄清》一文,本文就不再解释了。Temporal Join 就是 join 了一张版本表,那这到底有何不同呢?我们知道,既然版本表中一条记录在不同时刻可能会有不同的值(版本),那这就会引申出一个问题:当我们 join 一张版本表时,应该 join 一条记录的哪个版本呢?如果没有特别配置,那么默认行为自然是应该 join 当前的最新值(版本),那有没有需要 join 过去某个时间点的值(版本)的场景呢?有!并且有很多!官方文档给出的就是一个典型的例子:对于一张订单,我们总是应该参照下单时的汇率表去转换为一种统一货币的总价,这就需要订单表去 Join 汇率表在下单时刻的那个版本值!

        3.1. 基于事件时间的 Temporal Join

        为了便于描述,我们按官方文档的介绍,让版本表作为被关联表,用”右表“指代,把主动需要关联的表称为”左表“。既然 Temporal Join 关联的右表是版本表,则关联的一方,也就是版本表必然已经定义了事件时间属性,如果关联的另一张表,也就是”左表“,也定义了事件时间属性(通过 Wartermark),且在 Join 时通过 FOR SYSTEM_TIME AS OF 关键字指定了左表上的这个事件时间属性,那么,这就是一个”基于事件时间的 Temporal Join“,以下是一个示例:

        -- 左表:orders, 注意 orders 表也定义了事件时间列:order_time
        CREATE TABLE orders (
            order_id    STRING,
            price       DECIMAL(32,2),
            currency    STRING,
            order_time  TIMESTAMP(3),
            WATERMARK FOR order_time AS order_time - INTERVAL '15' SECOND
        ) WITH (/* ... */);
        -- 右表:currency_rates 是一张版本表,因为它定义了主键和事件时间
        -- 这种表的数据通常来自 CDC 数据,也就是 数据库的 changelog
        -- 显然,这里使用的是存放在kafka中的debezium-json格式的 changelog 数据 
        CREATE TABLE currency_rates (
            currency STRING,
            conversion_rate DECIMAL(32, 2),
            update_time TIMESTAMP(3) METADATA FROM `values.source.timestamp` VIRTUAL,
            WATERMARK FOR update_time AS update_time - INTERVAL '15' SECOND,
            PRIMARY KEY(currency) NOT ENFORCED
        ) WITH (
            'connector' = 'kafka',
            'value.format' = 'debezium-json',
           /* ... */
        );
        SELECT 
             order_id,
             price,
             orders.currency,
             conversion_rate,
             order_time
        FROM orders
        -- 关键字:FOR SYSTEM_TIME AS OF 用于指定左表中的一个时间类型的字段,Flink会根据这个时间和
        -- 版本表上的指定的事件时间字段(即 currency_rates.update_time)进行比对,找到对应版本的记录
        -- 与之进行关联。这里“对应版本”的逻辑应该是:在order_time这个时刻,currency_rates 所对应着的
        -- 当时版本的记录值
        LEFT JOIN currency_rates FOR SYSTEM_TIME AS OF orders.order_time
        ON orders.currency = currency_rates.currency;
        

        在上面这个基于事件时间的 Temporal Join 中,最核心的一个逻辑在于:在按汇率计算一个 order 的总价时,到底是读的 currency_rates 版本表中的哪一个版本值?我们假设: 一个 order 的 order_time 是 13:42,currency_rates 对应货币的汇率在 13:40 和 13:45各有一个版本(假设每5分钟更新一次),分别是 6.88 和 6.89,若当前时间 是13:46 分,则这个 order join 的哪一个汇率呢?显然是 6.88。

        从 基于事件时间的 Temporal Join 的行为特征上不难看出:对于正在实时 Join 的两个流来说,如我们需要一张表总是 Join 其记录所代表的事件在发生的当时另一张表上当时的数据,此时就应该使用 “基于事件时间的 Temporal Join”,简单总结一下的话可以说是:当时对当时,这应该符合大多数流式的 Join 需求。

        3.2. 基于处理时间的 Temporal Join

        基于处理时间的 Temporal Join常常用在使用外部系统来丰富流的数据,典型的例子是:维表 Join。

        从 基于处理时间的 Temporal Join 的行为特征上不难看出:对于正在实时 Join 的两个流来说,如我们需要一张表总是 Join 另一张表上当前的最新数据,此时就应该使用 “基于处理时间的 Temporal Join”,简单总结一下的话可以说是:当时对现在,维表 Join 通常是此类情形的典型代表(通常维表的变化是很缓慢的)