状态在Flink中叫做State,用来保存中间计算结果或者缓存数据。要做到比较好的状态管理,需要考虑以下几点内容:
在Task内部,如何高效地保存状态数据和使用状态数据。
作业失败是无法避免的,那么就要考虑如何高效地将状态数据保存下来,避免状态备份降低集群的吞吐量,并且在Failover时恢复作业到失败前的状态。
作业在集群内并行执行那么就要思考对于作业的Task而言如何使用统一的方式对状态数据进行切分,在作业修改并行度导致Task数据改变的时候,如何确保正确地恢复。
State按照是否有Key划分KeyedState和OperatorState两种。按照数据结构不同,flink定义了多种state,分别应用于不同的场景,具体实现如下:ValueState、ListState、MapState、ReducingState、AggregatingState。
ValueState: 保存一个可以更新和检索的值(如上所述,每个值都对应到当前的输入数据的 key,因此算子接收到的每个 key 都可能对应一个值)。 这个值可以通过 update(T) 进行更新,通过 T value() 进行检索。
ListState: 保存一个元素的列表。可以往这个列表中追加数据,并在当前的列表上进行检索。可以通过 add(T) 或者 addAll(List) 进行添加元素,通过 Iterable get() 获得整个列表。还可以通过 update(List) 覆盖当前的列表。
ReducingState: 保存一个单值,表示添加到状态的所有值的聚合。接口与 ListState 类似,但使用 add(T) 增加元素,会使用提供的 ReduceFunction 进行聚合。
AggregatingState
MapState
更多代码示例请下载Flink State体系剖析以及案例实践
代码如下:
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
/**
* 需求:当接收到的相同 key 的元素个数等于 3个,就计算这些元素的 value 的平均值。
* 计算keyed stream中每3个元素的 value 的平均值
*/
public class TestKeyedStateMain {
public static void main(String[] args) throws Exception{
//获取执行环境
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
//设置并行度
env.setParallelism(12);
//获取数据源
DataStreamSource> dataStreamSource =
env.fromElements(
Tuple2.of(1L, 3L),
Tuple2.of(1L, 7L),
Tuple2.of(2L, 4L),
Tuple2.of(1L, 5L),
Tuple2.of(2L, 2L),
Tuple2.of(2L, 6L));
/**
* 1L, 3L
* 1L, 7L
* 1L, 5L
*
* 1L,5.0 double
*
* 2L, 4L
* 2L, 2L
* 2L, 6L
*
* 2L,4.0 double
*
*
*/
// 输出:
//(1,5.0)
//(2,4.0)
dataStreamSource
.keyBy(tuple -> tuple.f0) //分组
.flatMap(new CountAverageWithListState())
.print();
env.execute("TestStatefulApi");
}
}
import org.apache.flink.api.common.functions.RichFlatMapFunction;
import org.apache.flink.api.common.state.ListState;
import org.apache.flink.api.common.state.ListStateDescriptor;
import org.apache.flink.api.common.typeinfo.Types;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.shaded.guava18.com.google.common.collect.Lists;
import org.apache.flink.util.Collector;
import java.util.Collections;
import java.util.List;
/**
* ListState :这个状态为每一个 key 保存集合的值
* get() 获取状态值
* add() / addAll() 更新状态值,将数据放到状态中
* clear() 清除状态
*/
public class CountAverageWithListState
extends RichFlatMapFunction, Tuple2> {
// managed keyed state
/**
* ValueState : 里面只能存一条元素
* ListState : 里面可以存很多数据
*/
private ListState> elementsByKey;
@Override
public void open(Configuration parameters) throws Exception {
// 注册状态
ListStateDescriptor> descriptor =
new ListStateDescriptor>(
"average", // 状态的名字
Types.TUPLE(Types.LONG, Types.LONG)); // 状态存储的数据类型
elementsByKey = getRuntimeContext().getListState(descriptor);
}
@Override
public void flatMap(Tuple2 element,
Collector> out) throws Exception {
// 拿到当前的 key 的状态值
Iterable> currentState = elementsByKey.get();
// 如果状态值还没有初始化,则初始化
if (currentState == null) {
elementsByKey.addAll(Collections.emptyList());
}
// 更新状态
elementsByKey.add(element);
// 判断,如果当前的 key 出现了 3 次,则需要计算平均值,并且输出
List> allElements = Lists.newArrayList(elementsByKey.get());
if (allElements.size() == 3) {
long count = 0;
long sum = 0;
for (Tuple2 ele : allElements) {
count++;
sum += ele.f1;
}
double avg = (double) sum / count;
out.collect(Tuple2.of(element.f0, avg));
// 清除状态
elementsByKey.clear();
}
}
}
import org.apache.flink.api.common.functions.RichFlatMapFunction; import org.apache.flink.api.common.state.MapState; import org.apache.flink.api.common.state.MapStateDescriptor; import org.apache.flink.api.java.tuple.Tuple2; import org.apache.flink.configuration.Configuration; import org.apache.flink.shaded.guava18.com.google.common.collect.Lists; import org.apache.flink.util.Collector; import java.util.List; import java.util.UUID; /** * MapState:这个状态为每一个 key 保存一个 Map 集合 * put() 将对应的 key 的键值对放到状态中 * values() 拿到 MapState 中所有的 value * clear() 清除状态 */ public class CountAverageWithMapState extends RichFlatMapFunction , Tuple2 > { // managed keyed state //1. MapState :key 是一个唯一的值,value 是接收到的相同的 key 对应的 value 的值 /** * MapState: * Map集合的特点,相同key,会覆盖数据。 */ private MapState mapState; @Override public void open(Configuration parameters) throws Exception { // 注册状态 MapStateDescriptor descriptor = new MapStateDescriptor ( "average", // 状态的名字 String.class, Long.class); // 状态存储的数据类型 mapState = getRuntimeContext().getMapState(descriptor); } /** * * @param element * @param out * @throws Exception */ @Override public void flatMap(Tuple2 element, Collector > out) throws Exception { mapState.put(UUID.randomUUID().toString(), element.f1); //list // 判断,如果当前的 key 出现了 3 次,则需要计算平均值,并且输出 List allElements = Lists.newArrayList(mapState.values()); if (allElements.size() == 3) { long count = 0; long sum = 0; for (Long ele : allElements) { count++; sum += ele; } double avg = (double) sum / count; // out.collect(Tuple2.of(element.f0, avg)); // 清除状态 mapState.clear(); } } }
state 的数值总是与一个 current key 对应。
backend 有 on-heap 和 off-heap(RocksDB)的多种实现。
snapshot 和 restore 方法;而 keyed state 则由 backend 自行实现,对用户透明。
相对比较大的。需要注意的是,这是一个经验判断,不是一个绝对的判断区分标准。
更多内容和代码示例请下载Flink State体系剖析以及案例实践