相关推荐recommended
Flink State 状态管理
作者:mmseoamin日期:2024-04-27

文章目录

  • 前言
  • 一、状态分类
  • 二、keyed代码示例
    • ListState
    • MapState
    • 总结

      前言

      状态在Flink中叫做State,用来保存中间计算结果或者缓存数据。要做到比较好的状态管理,需要考虑以下几点内容:

      • 状态数据的存储和访问

        在Task内部,如何高效地保存状态数据和使用状态数据。

      • 状态数据的备份和恢复

        作业失败是无法避免的,那么就要考虑如何高效地将状态数据保存下来,避免状态备份降低集群的吞吐量,并且在Failover时恢复作业到失败前的状态。

      • 状态数据的划分和动态扩容

        作业在集群内并行执行那么就要思考对于作业的Task而言如何使用统一的方式对状态数据进行切分,在作业修改并行度导致Task数据改变的时候,如何确保正确地恢复。


        一、状态分类

        State按照是否有Key划分KeyedState和OperatorState两种。按照数据结构不同,flink定义了多种state,分别应用于不同的场景,具体实现如下:ValueState、ListState、MapState、ReducingState、AggregatingState。

        • ValueState: 保存一个可以更新和检索的值(如上所述,每个值都对应到当前的输入数据的 key,因此算子接收到的每个 key 都可能对应一个值)。 这个值可以通过 update(T) 进行更新,通过 T value() 进行检索。

        • ListState: 保存一个元素的列表。可以往这个列表中追加数据,并在当前的列表上进行检索。可以通过 add(T) 或者 addAll(List) 进行添加元素,通过 Iterable get() 获得整个列表。还可以通过 update(List) 覆盖当前的列表。

        • ReducingState: 保存一个单值,表示添加到状态的所有值的聚合。接口与 ListState 类似,但使用 add(T) 增加元素,会使用提供的 ReduceFunction 进行聚合。

        • AggregatingState: 保留一个单值,表示添加到状态的所有值的聚合。和 ReducingState 相反的是, 聚合类型可能与 添加到状态的元素的类型不同。 接口与 ListState 类似,但使用 add(IN) 添加的元素会用指定的 AggregateFunction 进行聚合。

        • MapState: 维护了一个映射列表。 你可以添加键值对到状态中,也可以获得反映当前所有映射的迭代器。使用 put(UK,UV) 或者 putAll(Map) 添加映射。 使用 get(UK) 检索特定 key。 使用 entries(),keys() 和 values() 分别检索映射、键和值的可迭代视图。你还可以通过 isEmpty() 来判断是否包含任何键值对。

          二、keyed代码示例

          更多代码示例请下载Flink State体系剖析以及案例实践

          ListState

          代码如下:

          import org.apache.flink.api.java.tuple.Tuple2;
          import org.apache.flink.streaming.api.datastream.DataStreamSource;
          import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
          /**
           * 需求:当接收到的相同 key 的元素个数等于 3个,就计算这些元素的 value 的平均值。
           * 计算keyed stream中每3个元素的 value 的平均值
           */
          public class TestKeyedStateMain {
              public static void main(String[] args) throws  Exception{
                  //获取执行环境
                  StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
                  //设置并行度
                  env.setParallelism(12);
                  //获取数据源
                  DataStreamSource> dataStreamSource =
                          env.fromElements(
                                  Tuple2.of(1L, 3L),
                                  Tuple2.of(1L, 7L),
                                  Tuple2.of(2L, 4L),
                                  Tuple2.of(1L, 5L),
                                  Tuple2.of(2L, 2L),
                                  Tuple2.of(2L, 6L));
                  /**
                   * 1L, 3L
                   * 1L, 7L
                   * 1L, 5L
                   *
                   * 1L,5.0 double
                   *
                   * 2L, 4L
                   * 2L, 2L
                   * 2L, 6L
                   *
                   * 2L,4.0 double
                   *
                   *
                   */
                  // 输出:
                  //(1,5.0)
                  //(2,4.0)
                  dataStreamSource
                          .keyBy(tuple -> tuple.f0) //分组
                          .flatMap(new CountAverageWithListState())
                          .print();
                  env.execute("TestStatefulApi");
              }
          }
          import org.apache.flink.api.common.functions.RichFlatMapFunction;
          import org.apache.flink.api.common.state.ListState;
          import org.apache.flink.api.common.state.ListStateDescriptor;
          import org.apache.flink.api.common.typeinfo.Types;
          import org.apache.flink.api.java.tuple.Tuple2;
          import org.apache.flink.configuration.Configuration;
          import org.apache.flink.shaded.guava18.com.google.common.collect.Lists;
          import org.apache.flink.util.Collector;
          import java.util.Collections;
          import java.util.List;
          /**
           *  ListState :这个状态为每一个 key 保存集合的值
           *      get() 获取状态值
           *      add() / addAll() 更新状态值,将数据放到状态中
           *      clear() 清除状态
           */
          public class CountAverageWithListState
                  extends RichFlatMapFunction, Tuple2> {
              // managed keyed state
              /**
               * ValueState : 里面只能存一条元素
               * ListState : 里面可以存很多数据
               */
              private ListState> elementsByKey;
              @Override
              public void open(Configuration parameters) throws Exception {
                  // 注册状态
                  ListStateDescriptor> descriptor =
                          new ListStateDescriptor>(
                                  "average",  // 状态的名字
                                  Types.TUPLE(Types.LONG, Types.LONG)); // 状态存储的数据类型
                  elementsByKey = getRuntimeContext().getListState(descriptor);
              }
              @Override
              public void flatMap(Tuple2 element,
                                  Collector> out) throws Exception {
                  // 拿到当前的 key 的状态值
                  Iterable> currentState = elementsByKey.get();
                  // 如果状态值还没有初始化,则初始化
                  if (currentState == null) {
                      elementsByKey.addAll(Collections.emptyList());
                  }
                  // 更新状态
                  elementsByKey.add(element);
                  // 判断,如果当前的 key 出现了 3 次,则需要计算平均值,并且输出
                  List> allElements = Lists.newArrayList(elementsByKey.get());
                  if (allElements.size() == 3) {
                      long count = 0;
                      long sum = 0;
                      for (Tuple2 ele : allElements) {
                          count++;
                          sum += ele.f1;
                      }
                      double avg = (double) sum / count;
                      out.collect(Tuple2.of(element.f0, avg));
                      // 清除状态
                      elementsByKey.clear();
                  }
              }
          }
          

          MapState

          import org.apache.flink.api.common.functions.RichFlatMapFunction;
          import org.apache.flink.api.common.state.MapState;
          import org.apache.flink.api.common.state.MapStateDescriptor;
          import org.apache.flink.api.java.tuple.Tuple2;
          import org.apache.flink.configuration.Configuration;
          import org.apache.flink.shaded.guava18.com.google.common.collect.Lists;
          import org.apache.flink.util.Collector;
          import java.util.List;
          import java.util.UUID;
          /**
           *  MapState :这个状态为每一个 key 保存一个 Map 集合
           *      put() 将对应的 key 的键值对放到状态中
           *      values() 拿到 MapState 中所有的 value
           *      clear() 清除状态
           */
          public class CountAverageWithMapState
                  extends RichFlatMapFunction, Tuple2> {
              // managed keyed state
              //1. MapState :key 是一个唯一的值,value 是接收到的相同的 key 对应的 value 的值
              /**
               * MapState:
               *      Map集合的特点,相同key,会覆盖数据。
               */
              private MapState mapState;
              @Override
              public void open(Configuration parameters) throws Exception {
                  // 注册状态
                  MapStateDescriptor descriptor =
                          new MapStateDescriptor(
                                  "average",  // 状态的名字
                                  String.class, Long.class); // 状态存储的数据类型
                  mapState = getRuntimeContext().getMapState(descriptor);
              }
              /**
               *
               * @param element
               * @param out
               * @throws Exception
               */
              @Override
              public void flatMap(Tuple2 element,
                                  Collector> out) throws Exception {
                  mapState.put(UUID.randomUUID().toString(), element.f1); //list
                  // 判断,如果当前的 key 出现了 3 次,则需要计算平均值,并且输出
                  List allElements = Lists.newArrayList(mapState.values());
                  if (allElements.size() == 3) {
                      long count = 0;
                      long sum = 0;
                      for (Long ele : allElements) {
                          count++;
                          sum += ele;
                      }
                      double avg = (double) sum / count;
                      //
                      out.collect(Tuple2.of(element.f0, avg));
                      // 清除状态
                      mapState.clear();
                  }
              }
          }
          

          总结

          1. 是否存在当前处理的 key(current key):operator state 是没有当前 key 的概念,而 keyed

            state 的数值总是与一个 current key 对应。

          2. 存储对象是否 on heap: 目前 operator state backend 仅有一种 on-heap 的实现;而 keyed state

            backend 有 on-heap 和 off-heap(RocksDB)的多种实现。

          3. 是否需要手动声明快照(snapshot)和恢复 (restore) 方法:operator state 需要手动实现

            snapshot 和 restore 方法;而 keyed state 则由 backend 自行实现,对用户透明。

          4. 数据大小:一般而言,我们认为 operator state 的数据规模是比较小的;认为 keyed state 规模是

            相对比较大的。需要注意的是,这是一个经验判断,不是一个绝对的判断区分标准。

            更多内容和代码示例请下载Flink State体系剖析以及案例实践