在 Flink 中,StreamGraph 是数据流的逻辑表示,它描述了如何在 Flink 作业中执行数据流转换。StreamGraph 是 Flink 运行时生成执行计划的基础。
使用 DataStream API 开发的应用程序,首先被转换为 Transformation,再被映射为 StreamGraph;随后在客户端完成 StreamGraph 到 JobGraph 的转换,并将 JobGraph 提交到 Flink 集群。Flink 集群负责将 JobGraph 转换为 ExecutionGraph,之后进入调度执行阶段。
StreamNode 是 StreamGraph 中的节点,从 Transformation 转换而来,可以简单理解为一个 StreamNode 表示一个算子。从逻辑上来说,StreamGraph 中的 StreamNode 分为实体 StreamNode 和虚拟 StreamNode。StreamNode 可以有多个输入,也可以有多个输出。
实体的 StreamNode 会最终变成物理算子。虚拟的 StreamNode 会附着在 StreamEdge 上。
StreamEdge 是 StreamGraph 中的边,用来连接两个 StreamNode,一个 StreamNode 可以有多个出边、入边,StreamEdge 中包含了旁路输出、分区器、字段筛选输出等信息。
StreamGraph 在 FlinkClient 中生成,由 FlinkClient 在提交的时候触发 Flink 应用的 main 方法,用户编写的业务逻辑组装成 Transformation 流水线,在最后调用 StreamExecutionEnvironment.execute() 的时候开始触发 StreamGraph 构建。
StreamGraph 在 Flink 作业提交前生成,生成 StreamGraph 的入口在 StreamExecutionEnvironment 中,代码如下:
/**
 * Generates the {@link StreamGraph} for this environment, using the name returned by
 * {@code getJobName()}.
 *
 * @return the generated stream graph
 */
@Internal
public StreamGraph getStreamGraph() {
    return getStreamGraph(getJobName());
}

/**
 * Generates the {@link StreamGraph} under the given job name. Delegates to
 * {@link #getStreamGraph(String, boolean)} with {@code clearTransformations = true}.
 *
 * @param jobName name to set on the generator before the graph is built
 * @return the generated stream graph
 */
@Internal
public StreamGraph getStreamGraph(String jobName) {
    return getStreamGraph(jobName, true);
}

/**
 * Generates the {@link StreamGraph} under the given job name and optionally clears the
 * transformations registered on this environment afterwards.
 *
 * @param jobName name to set on the generator before the graph is built
 * @param clearTransformations if {@code true}, {@code transformations} is cleared once the
 *     graph has been generated
 * @return the generated stream graph
 * @throws IllegalStateException if no transformations have been registered
 */
@Internal
public StreamGraph getStreamGraph(String jobName, boolean clearTransformations) {
    final StreamGraphGenerator generator = getStreamGraphGenerator().setJobName(jobName);
    final StreamGraph generated = generator.generate();

    if (clearTransformations) {
        transformations.clear();
    }
    return generated;
}

/**
 * Creates a {@link StreamGraphGenerator} configured from this environment's current settings.
 *
 * @return a generator carrying the runtime mode, state backend, chaining flag, user artifacts,
 *     time characteristic and default buffer timeout of this environment
 * @throws IllegalStateException if no transformations have been registered
 */
private StreamGraphGenerator getStreamGraphGenerator() {
    // Guard clause: there is nothing to build a graph from.
    if (transformations.isEmpty()) {
        throw new IllegalStateException(
                "No operators defined in streaming topology. Cannot execute.");
    }

    final RuntimeExecutionMode executionMode = configuration.get(ExecutionOptions.RUNTIME_MODE);

    final StreamGraphGenerator generator =
            new StreamGraphGenerator(transformations, config, checkpointCfg, getConfiguration());

    return generator
            .setRuntimeExecutionMode(executionMode)
            .setStateBackend(defaultStateBackend)
            .setChaining(isChainingEnabled)
            .setUserArtifacts(cacheFile)
            .setTimeCharacteristic(timeCharacteristic)
            .setDefaultBufferTimeout(bufferTimeout);
}
StreamGraph 实际上是在 StreamGraphGenerator 中生成的:从 SinkTransformation(输出)向前追溯到 SourceTransformation(输入),在遍历过程中一边遍历一边构建 StreamGraph,如代码清单所示。
@Internal public class StreamGraphGenerator { private final List> transformations; private StateBackend stateBackend; private static final Map , TransformationTranslator, ? extends Transformation>> translatorMap; protected static Integer iterationIdCounter; private StreamGraph streamGraph; private Map , Collection > alreadyTransformed; public StreamGraphGenerator(List > transformations, ExecutionConfig executionConfig, CheckpointConfig checkpointConfig) { this(transformations, executionConfig, checkpointConfig, new Configuration()); } public StreamGraphGenerator(List > transformations, ExecutionConfig executionConfig, CheckpointConfig checkpointConfig, ReadableConfig configuration) { this.chaining = true; this.timeCharacteristic = DEFAULT_TIME_CHARACTERISTIC; this.jobName = "Flink Streaming Job"; this.savepointRestoreSettings = SavepointRestoreSettings.none(); this.defaultBufferTimeout = -1L; this.runtimeExecutionMode = RuntimeExecutionMode.STREAMING; this.transformations = (List)Preconditions.checkNotNull(transformations); this.executionConfig = (ExecutionConfig)Preconditions.checkNotNull(executionConfig); this.checkpointConfig = new CheckpointConfig(checkpointConfig); this.configuration = (ReadableConfig)Preconditions.checkNotNull(configuration); } public StreamGraph generate() { this.streamGraph = new StreamGraph(this.executionConfig, this.checkpointConfig, this.savepointRestoreSettings); this.shouldExecuteInBatchMode = this.shouldExecuteInBatchMode(this.runtimeExecutionMode); this.configureStreamGraph(this.streamGraph); this.alreadyTransformed = new HashMap(); Iterator var1 = this.transformations.iterator(); while(var1.hasNext()) { Transformation> transformation = (Transformation)var1.next(); this.transform(transformation); } StreamGraph builtStreamGraph = this.streamGraph; this.alreadyTransformed.clear(); this.alreadyTransformed = null; this.streamGraph = null; return builtStreamGraph; } private Collection transform(Transformation> transform) { if 
(this.alreadyTransformed.containsKey(transform)) { return (Collection)this.alreadyTransformed.get(transform); } else { LOG.debug("Transforming " + transform); if (transform.getMaxParallelism() <= 0) { int globalMaxParallelismFromConfig = this.executionConfig.getMaxParallelism(); if (globalMaxParallelismFromConfig > 0) { transform.setMaxParallelism(globalMaxParallelismFromConfig); } } transform.getOutputType(); TransformationTranslator, Transformation>> translator = (TransformationTranslator)translatorMap.get(transform.getClass()); Collection transformedIds; if (translator != null) { transformedIds = this.translate(translator, transform); } else { transformedIds = this.legacyTransform(transform); } if (!this.alreadyTransformed.containsKey(transform)) { this.alreadyTransformed.put(transform, transformedIds); } return transformedIds; } } private Collection legacyTransform(Transformation> transform) { Collection transformedIds; if (transform instanceof FeedbackTransformation) { transformedIds = this.transformFeedback((FeedbackTransformation)transform); } else { if (!(transform instanceof CoFeedbackTransformation)) { throw new IllegalStateException("Unknown transformation: " + transform); } transformedIds = this.transformCoFeedback((CoFeedbackTransformation)transform); } if (transform.getBufferTimeout() >= 0L) { this.streamGraph.setBufferTimeout(transform.getId(), transform.getBufferTimeout()); } else { this.streamGraph.setBufferTimeout(transform.getId(), this.defaultBufferTimeout); } if (transform.getUid() != null) { this.streamGraph.setTransformationUID(transform.getId(), transform.getUid()); } if (transform.getUserProvidedNodeHash() != null) { this.streamGraph.setTransformationUserHash(transform.getId(), transform.getUserProvidedNodeHash()); } if (!this.streamGraph.getExecutionConfig().hasAutoGeneratedUIDsEnabled() && transform instanceof PhysicalTransformation && transform.getUserProvidedNodeHash() == null && transform.getUid() == null) { throw new 
IllegalStateException("Auto generated UIDs have been disabled but no UID or hash has been assigned to operator " + transform.getName()); } else { if (transform.getMinResources() != null && transform.getPreferredResources() != null) { this.streamGraph.setResources(transform.getId(), transform.getMinResources(), transform.getPreferredResources()); } this.streamGraph.setManagedMemoryUseCaseWeights(transform.getId(), transform.getManagedMemoryOperatorScopeUseCaseWeights(), transform.getManagedMemorySlotScopeUseCases()); return transformedIds; } } private Collection transformFeedback(FeedbackTransformation iterate) { if (this.shouldExecuteInBatchMode) { throw new UnsupportedOperationException("Iterations are not supported in BATCH execution mode. If you want to execute such a pipeline, please set the '" + ExecutionOptions.RUNTIME_MODE.key() + "'=" + RuntimeExecutionMode.STREAMING.name()); } else if (iterate.getFeedbackEdges().size() <= 0) { throw new IllegalStateException("Iteration " + iterate + " does not have any feedback edges."); } else { List > inputs = iterate.getInputs(); Preconditions.checkState(inputs.size() == 1); Transformation> input = (Transformation)inputs.get(0); List resultIds = new ArrayList(); Collection inputIds = this.transform(input); resultIds.addAll(inputIds); if (this.alreadyTransformed.containsKey(iterate)) { return (Collection)this.alreadyTransformed.get(iterate); } else { Tuple2 itSourceAndSink = this.streamGraph.createIterationSourceAndSink(iterate.getId(), getNewIterationNodeId(), getNewIterationNodeId(), iterate.getWaitTime(), iterate.getParallelism(), iterate.getMaxParallelism(), iterate.getMinResources(), iterate.getPreferredResources()); StreamNode itSource = (StreamNode)itSourceAndSink.f0; StreamNode itSink = (StreamNode)itSourceAndSink.f1; this.streamGraph.setSerializers(itSource.getId(), (TypeSerializer)null, (TypeSerializer)null, iterate.getOutputType().createSerializer(this.executionConfig)); 
this.streamGraph.setSerializers(itSink.getId(), iterate.getOutputType().createSerializer(this.executionConfig), (TypeSerializer)null, (TypeSerializer)null); resultIds.add(itSource.getId()); this.alreadyTransformed.put(iterate, resultIds); List allFeedbackIds = new ArrayList(); Iterator var10 = iterate.getFeedbackEdges().iterator(); while(var10.hasNext()) { Transformation feedbackEdge = (Transformation)var10.next(); Collection feedbackIds = this.transform(feedbackEdge); allFeedbackIds.addAll(feedbackIds); Iterator var13 = feedbackIds.iterator(); while(var13.hasNext()) { Integer feedbackId = (Integer)var13.next(); this.streamGraph.addEdge(feedbackId, itSink.getId(), 0); } } String slotSharingGroup = this.determineSlotSharingGroup((String)null, allFeedbackIds); if (slotSharingGroup == null) { slotSharingGroup = "SlotSharingGroup-" + iterate.getId(); } itSink.setSlotSharingGroup(slotSharingGroup); itSource.setSlotSharingGroup(slotSharingGroup); return resultIds; } } } private Collection transformCoFeedback(CoFeedbackTransformation coIterate) { if (this.shouldExecuteInBatchMode) { throw new UnsupportedOperationException("Iterations are not supported in BATCH execution mode. 
If you want to execute such a pipeline, please set the '" + ExecutionOptions.RUNTIME_MODE.key() + "'=" + RuntimeExecutionMode.STREAMING.name()); } else { Tuple2 itSourceAndSink = this.streamGraph.createIterationSourceAndSink(coIterate.getId(), getNewIterationNodeId(), getNewIterationNodeId(), coIterate.getWaitTime(), coIterate.getParallelism(), coIterate.getMaxParallelism(), coIterate.getMinResources(), coIterate.getPreferredResources()); StreamNode itSource = (StreamNode)itSourceAndSink.f0; StreamNode itSink = (StreamNode)itSourceAndSink.f1; this.streamGraph.setSerializers(itSource.getId(), (TypeSerializer)null, (TypeSerializer)null, coIterate.getOutputType().createSerializer(this.executionConfig)); this.streamGraph.setSerializers(itSink.getId(), coIterate.getOutputType().createSerializer(this.executionConfig), (TypeSerializer)null, (TypeSerializer)null); Collection resultIds = Collections.singleton(itSource.getId()); this.alreadyTransformed.put(coIterate, resultIds); List allFeedbackIds = new ArrayList(); Iterator var7 = coIterate.getFeedbackEdges().iterator(); while(var7.hasNext()) { Transformation feedbackEdge = (Transformation)var7.next(); Collection feedbackIds = this.transform(feedbackEdge); allFeedbackIds.addAll(feedbackIds); Iterator var10 = feedbackIds.iterator(); while(var10.hasNext()) { Integer feedbackId = (Integer)var10.next(); this.streamGraph.addEdge(feedbackId, itSink.getId(), 0); } } String slotSharingGroup = this.determineSlotSharingGroup((String)null, allFeedbackIds); itSink.setSlotSharingGroup(slotSharingGroup); itSource.setSlotSharingGroup(slotSharingGroup); return Collections.singleton(itSource.getId()); } } private Collection translate(TransformationTranslator, Transformation>> translator, Transformation> transform) { Preconditions.checkNotNull(translator); Preconditions.checkNotNull(transform); List > allInputIds = this.getParentInputIds(transform.getInputs()); if (this.alreadyTransformed.containsKey(transform)) { return 
(Collection)this.alreadyTransformed.get(transform); } else { String slotSharingGroup = this.determineSlotSharingGroup(transform.getSlotSharingGroup(), (Collection)allInputIds.stream().flatMap(Collection::stream).collect(Collectors.toList())); Context context = new StreamGraphGenerator.ContextImpl(this, this.streamGraph, slotSharingGroup, this.configuration); return this.shouldExecuteInBatchMode ? translator.translateForBatch(transform, context) : translator.translateForStreaming(transform, context); } } private List > getParentInputIds(@Nullable Collection > parentTransformations) { List > allInputIds = new ArrayList(); if (parentTransformations == null) { return allInputIds; } else { Iterator var3 = parentTransformations.iterator(); while(var3.hasNext()) { Transformation> transformation = (Transformation)var3.next(); allInputIds.add(this.transform(transformation)); } return allInputIds; } } private String determineSlotSharingGroup(String specifiedGroup, Collection inputIds) { if (specifiedGroup != null) { return specifiedGroup; } else { String inputGroup = null; Iterator var4 = inputIds.iterator(); while(var4.hasNext()) { int id = (Integer)var4.next(); String inputGroupCandidate = this.streamGraph.getSlotSharingGroup(id); if (inputGroup == null) { inputGroup = inputGroupCandidate; } else if (!inputGroup.equals(inputGroupCandidate)) { return "default"; } } return inputGroup == null ? "default" : inputGroup; } } static { DEFAULT_TIME_CHARACTERISTIC = TimeCharacteristic.ProcessingTime; Map , TransformationTranslator, ? 
extends Transformation>> tmp = new HashMap(); tmp.put(OneInputTransformation.class, new OneInputTransformationTranslator()); tmp.put(TwoInputTransformation.class, new TwoInputTransformationTranslator()); tmp.put(MultipleInputTransformation.class, new MultiInputTransformationTranslator()); tmp.put(KeyedMultipleInputTransformation.class, new MultiInputTransformationTranslator()); tmp.put(SourceTransformation.class, new SourceTransformationTranslator()); tmp.put(SinkTransformation.class, new SinkTransformationTranslator()); tmp.put(LegacySinkTransformation.class, new LegacySinkTransformationTranslator()); tmp.put(LegacySourceTransformation.class, new LegacySourceTransformationTranslator()); tmp.put(UnionTransformation.class, new UnionTransformationTranslator()); tmp.put(PartitionTransformation.class, new PartitionTransformationTranslator()); tmp.put(SideOutputTransformation.class, new SideOutputTransformationTranslator()); tmp.put(ReduceTransformation.class, new ReduceTransformationTranslator()); tmp.put(TimestampsAndWatermarksTransformation.class, new TimestampsAndWatermarksTransformationTranslator()); tmp.put(BroadcastStateTransformation.class, new BroadcastStateTransformationTranslator()); tmp.put(KeyedBroadcastStateTransformation.class, new KeyedBroadcastStateTransformationTranslator()); translatorMap = Collections.unmodifiableMap(tmp); iterationIdCounter = 0; } }