Flink StreamGraph生成过程
作者:mmseoamin日期:2024-03-20

文章目录

    • 概要
    • SteramGraph 核心对象
    • SteramGraph 生成过程

      概要

      在 Flink 中,StreamGraph 是数据流的逻辑表示,它描述了如何在 Flink 作业中执行数据流转换。StreamGraph 是 Flink 运行时生成执行计划的基础。

      Flink StreamGraph生成过程,在这里插入图片描述,第1张

      使用DataStream API开发的应用程序,首先被转换为 Transformation,再被映射为StreamGraph,在客户端进行StreamGraph、JobGraph的转换,提交JobGraph到Flink集群后,Flink集群负责将JobGraph转换为ExecutionGraph,之后进入调度执行阶段。

      SteramGraph 核心对象

      • StreamNode

        StreamNode 是 StremGraph 中的节点 ,从 Transformation 转换而来,可以简单理解为一个 StreamNode 表示一个算子,从逻辑上来说,SteramNode 在 StreamGraph 中存在实体和虚拟的 StreamNode。StremNode 可以有多个输入,也可以有多个输出。

        实体的 StreamNode 会最终变成物理算子。虚拟的 StreamNode 会附着在 StreamEdge 上。

      • StreamEdge

        StreamEdge 是 StreamGraph 中的边,用来连接两个 StreamNode,一个 StreamNode 可以有多个出边、入边,StreamEdge 中包含了旁路输出、分区器、字段筛选输出等信息。

        SteramGraph 生成过程

        StreamGraph 在 FlinkClient 中生成,由 FlinkClient 在提交的时候触发 Flink 应用的 main 方法,用户编写的业务逻辑组装成 Transformation 流水线,在最后调用 StreamExecutionEnvironment.execute() 的时候开始触发 StreamGraph 构建。

        StreamGraph在Flink的作业提交前生成,生成StreamGraph的入口在StreamExecutionEnvironment中

            @Internal
            public StreamGraph getStreamGraph() {
                return this.getStreamGraph(this.getJobName());
            }
            @Internal
            public StreamGraph getStreamGraph(String jobName) {
                return this.getStreamGraph(jobName, true);
            }
            @Internal
            public StreamGraph getStreamGraph(String jobName, boolean clearTransformations) {
                StreamGraph streamGraph = this.getStreamGraphGenerator().setJobName(jobName).generate();
                if (clearTransformations) {
                    this.transformations.clear();
                }
                return streamGraph;
            }
            private StreamGraphGenerator getStreamGraphGenerator() {
                if (this.transformations.size() <= 0) {
                    throw new IllegalStateException("No operators defined in streaming topology. Cannot execute.");
                } else {
                    RuntimeExecutionMode executionMode = (RuntimeExecutionMode)this.configuration.get(ExecutionOptions.RUNTIME_MODE);
                    return (new StreamGraphGenerator(this.transformations, this.config, this.checkpointCfg, this.getConfiguration())).setRuntimeExecutionMode(executionMode).setStateBackend(this.defaultStateBackend).setChaining(this.isChainingEnabled).setUserArtifacts(this.cacheFile).setTimeCharacteristic(this.timeCharacteristic).setDefaultBufferTimeout(this.bufferTimeout);
                }
            }
        

        StreamGraph实际上是在StreamGraphGenerator中生成的,从SinkTransformation(输出向前追溯到SourceTransformation)。在遍历过程中一边遍历一遍构建StreamGraph,如代码清单所示

        @Internal
        public class StreamGraphGenerator {
            private final List> transformations;
            private StateBackend stateBackend;
            private static final Map, TransformationTranslator> translatorMap;
            protected static Integer iterationIdCounter;
            private StreamGraph streamGraph;
            private Map, Collection> alreadyTransformed;
            public StreamGraphGenerator(List> transformations, ExecutionConfig executionConfig, CheckpointConfig checkpointConfig) {
                this(transformations, executionConfig, checkpointConfig, new Configuration());
            }
            public StreamGraphGenerator(List> transformations, ExecutionConfig executionConfig, CheckpointConfig checkpointConfig, ReadableConfig configuration) {
                this.chaining = true;
                this.timeCharacteristic = DEFAULT_TIME_CHARACTERISTIC;
                this.jobName = "Flink Streaming Job";
                this.savepointRestoreSettings = SavepointRestoreSettings.none();
                this.defaultBufferTimeout = -1L;
                this.runtimeExecutionMode = RuntimeExecutionMode.STREAMING;
                this.transformations = (List)Preconditions.checkNotNull(transformations);
                this.executionConfig = (ExecutionConfig)Preconditions.checkNotNull(executionConfig);
                this.checkpointConfig = new CheckpointConfig(checkpointConfig);
                this.configuration = (ReadableConfig)Preconditions.checkNotNull(configuration);
            }
            public StreamGraph generate() {
                this.streamGraph = new StreamGraph(this.executionConfig, this.checkpointConfig, this.savepointRestoreSettings);
                this.shouldExecuteInBatchMode = this.shouldExecuteInBatchMode(this.runtimeExecutionMode);
                this.configureStreamGraph(this.streamGraph);
                this.alreadyTransformed = new HashMap();
                Iterator var1 = this.transformations.iterator();
                while(var1.hasNext()) {
                    Transformation transformation = (Transformation)var1.next();
                    this.transform(transformation);
                }
                StreamGraph builtStreamGraph = this.streamGraph;
                this.alreadyTransformed.clear();
                this.alreadyTransformed = null;
                this.streamGraph = null;
                return builtStreamGraph;
            }
         
            private Collection transform(Transformation transform) {
                if (this.alreadyTransformed.containsKey(transform)) {
                    return (Collection)this.alreadyTransformed.get(transform);
                } else {
                    LOG.debug("Transforming " + transform);
                    if (transform.getMaxParallelism() <= 0) {
                        int globalMaxParallelismFromConfig = this.executionConfig.getMaxParallelism();
                        if (globalMaxParallelismFromConfig > 0) {
                            transform.setMaxParallelism(globalMaxParallelismFromConfig);
                        }
                    }
                    transform.getOutputType();
                    TransformationTranslator> translator = (TransformationTranslator)translatorMap.get(transform.getClass());
                    Collection transformedIds;
                    if (translator != null) {
                        transformedIds = this.translate(translator, transform);
                    } else {
                        transformedIds = this.legacyTransform(transform);
                    }
                    if (!this.alreadyTransformed.containsKey(transform)) {
                        this.alreadyTransformed.put(transform, transformedIds);
                    }
                    return transformedIds;
                }
            }
            private Collection legacyTransform(Transformation transform) {
                Collection transformedIds;
                if (transform instanceof FeedbackTransformation) {
                    transformedIds = this.transformFeedback((FeedbackTransformation)transform);
                } else {
                    if (!(transform instanceof CoFeedbackTransformation)) {
                        throw new IllegalStateException("Unknown transformation: " + transform);
                    }
                    transformedIds = this.transformCoFeedback((CoFeedbackTransformation)transform);
                }
                if (transform.getBufferTimeout() >= 0L) {
                    this.streamGraph.setBufferTimeout(transform.getId(), transform.getBufferTimeout());
                } else {
                    this.streamGraph.setBufferTimeout(transform.getId(), this.defaultBufferTimeout);
                }
                if (transform.getUid() != null) {
                    this.streamGraph.setTransformationUID(transform.getId(), transform.getUid());
                }
                if (transform.getUserProvidedNodeHash() != null) {
                    this.streamGraph.setTransformationUserHash(transform.getId(), transform.getUserProvidedNodeHash());
                }
                if (!this.streamGraph.getExecutionConfig().hasAutoGeneratedUIDsEnabled() && transform instanceof PhysicalTransformation && transform.getUserProvidedNodeHash() == null && transform.getUid() == null) {
                    throw new IllegalStateException("Auto generated UIDs have been disabled but no UID or hash has been assigned to operator " + transform.getName());
                } else {
                    if (transform.getMinResources() != null && transform.getPreferredResources() != null) {
                        this.streamGraph.setResources(transform.getId(), transform.getMinResources(), transform.getPreferredResources());
                    }
                    this.streamGraph.setManagedMemoryUseCaseWeights(transform.getId(), transform.getManagedMemoryOperatorScopeUseCaseWeights(), transform.getManagedMemorySlotScopeUseCases());
                    return transformedIds;
                }
            }
            private  Collection transformFeedback(FeedbackTransformation iterate) {
                if (this.shouldExecuteInBatchMode) {
                    throw new UnsupportedOperationException("Iterations are not supported in BATCH execution mode. If you want to execute such a pipeline, please set the '" + ExecutionOptions.RUNTIME_MODE.key() + "'=" + RuntimeExecutionMode.STREAMING.name());
                } else if (iterate.getFeedbackEdges().size() <= 0) {
                    throw new IllegalStateException("Iteration " + iterate + " does not have any feedback edges.");
                } else {
                    List> inputs = iterate.getInputs();
                    Preconditions.checkState(inputs.size() == 1);
                    Transformation input = (Transformation)inputs.get(0);
                    List resultIds = new ArrayList();
                    Collection inputIds = this.transform(input);
                    resultIds.addAll(inputIds);
                    if (this.alreadyTransformed.containsKey(iterate)) {
                        return (Collection)this.alreadyTransformed.get(iterate);
                    } else {
                        Tuple2 itSourceAndSink = this.streamGraph.createIterationSourceAndSink(iterate.getId(), getNewIterationNodeId(), getNewIterationNodeId(), iterate.getWaitTime(), iterate.getParallelism(), iterate.getMaxParallelism(), iterate.getMinResources(), iterate.getPreferredResources());
                        StreamNode itSource = (StreamNode)itSourceAndSink.f0;
                        StreamNode itSink = (StreamNode)itSourceAndSink.f1;
                        this.streamGraph.setSerializers(itSource.getId(), (TypeSerializer)null, (TypeSerializer)null, iterate.getOutputType().createSerializer(this.executionConfig));
                        this.streamGraph.setSerializers(itSink.getId(), iterate.getOutputType().createSerializer(this.executionConfig), (TypeSerializer)null, (TypeSerializer)null);
                        resultIds.add(itSource.getId());
                        this.alreadyTransformed.put(iterate, resultIds);
                        List allFeedbackIds = new ArrayList();
                        Iterator var10 = iterate.getFeedbackEdges().iterator();
                        while(var10.hasNext()) {
                            Transformation feedbackEdge = (Transformation)var10.next();
                            Collection feedbackIds = this.transform(feedbackEdge);
                            allFeedbackIds.addAll(feedbackIds);
                            Iterator var13 = feedbackIds.iterator();
                            while(var13.hasNext()) {
                                Integer feedbackId = (Integer)var13.next();
                                this.streamGraph.addEdge(feedbackId, itSink.getId(), 0);
                            }
                        }
                        String slotSharingGroup = this.determineSlotSharingGroup((String)null, allFeedbackIds);
                        if (slotSharingGroup == null) {
                            slotSharingGroup = "SlotSharingGroup-" + iterate.getId();
                        }
                        itSink.setSlotSharingGroup(slotSharingGroup);
                        itSource.setSlotSharingGroup(slotSharingGroup);
                        return resultIds;
                    }
                }
            }
            private  Collection transformCoFeedback(CoFeedbackTransformation coIterate) {
                if (this.shouldExecuteInBatchMode) {
                    throw new UnsupportedOperationException("Iterations are not supported in BATCH execution mode. If you want to execute such a pipeline, please set the '" + ExecutionOptions.RUNTIME_MODE.key() + "'=" + RuntimeExecutionMode.STREAMING.name());
                } else {
                    Tuple2 itSourceAndSink = this.streamGraph.createIterationSourceAndSink(coIterate.getId(), getNewIterationNodeId(), getNewIterationNodeId(), coIterate.getWaitTime(), coIterate.getParallelism(), coIterate.getMaxParallelism(), coIterate.getMinResources(), coIterate.getPreferredResources());
                    StreamNode itSource = (StreamNode)itSourceAndSink.f0;
                    StreamNode itSink = (StreamNode)itSourceAndSink.f1;
                    this.streamGraph.setSerializers(itSource.getId(), (TypeSerializer)null, (TypeSerializer)null, coIterate.getOutputType().createSerializer(this.executionConfig));
                    this.streamGraph.setSerializers(itSink.getId(), coIterate.getOutputType().createSerializer(this.executionConfig), (TypeSerializer)null, (TypeSerializer)null);
                    Collection resultIds = Collections.singleton(itSource.getId());
                    this.alreadyTransformed.put(coIterate, resultIds);
                    List allFeedbackIds = new ArrayList();
                    Iterator var7 = coIterate.getFeedbackEdges().iterator();
                    while(var7.hasNext()) {
                        Transformation feedbackEdge = (Transformation)var7.next();
                        Collection feedbackIds = this.transform(feedbackEdge);
                        allFeedbackIds.addAll(feedbackIds);
                        Iterator var10 = feedbackIds.iterator();
                        while(var10.hasNext()) {
                            Integer feedbackId = (Integer)var10.next();
                            this.streamGraph.addEdge(feedbackId, itSink.getId(), 0);
                        }
                    }
                    String slotSharingGroup = this.determineSlotSharingGroup((String)null, allFeedbackIds);
                    itSink.setSlotSharingGroup(slotSharingGroup);
                    itSource.setSlotSharingGroup(slotSharingGroup);
                    return Collections.singleton(itSource.getId());
                }
            }
            private Collection translate(TransformationTranslator> translator, Transformation transform) {
                Preconditions.checkNotNull(translator);
                Preconditions.checkNotNull(transform);
                List> allInputIds = this.getParentInputIds(transform.getInputs());
                if (this.alreadyTransformed.containsKey(transform)) {
                    return (Collection)this.alreadyTransformed.get(transform);
                } else {
                    String slotSharingGroup = this.determineSlotSharingGroup(transform.getSlotSharingGroup(), (Collection)allInputIds.stream().flatMap(Collection::stream).collect(Collectors.toList()));
                    Context context = new StreamGraphGenerator.ContextImpl(this, this.streamGraph, slotSharingGroup, this.configuration);
                    return this.shouldExecuteInBatchMode ? translator.translateForBatch(transform, context) : translator.translateForStreaming(transform, context);
                }
            }
            private List> getParentInputIds(@Nullable Collection> parentTransformations) {
                List> allInputIds = new ArrayList();
                if (parentTransformations == null) {
                    return allInputIds;
                } else {
                    Iterator var3 = parentTransformations.iterator();
                    while(var3.hasNext()) {
                        Transformation transformation = (Transformation)var3.next();
                        allInputIds.add(this.transform(transformation));
                    }
                    return allInputIds;
                }
            }
            private String determineSlotSharingGroup(String specifiedGroup, Collection inputIds) {
                if (specifiedGroup != null) {
                    return specifiedGroup;
                } else {
                    String inputGroup = null;
                    Iterator var4 = inputIds.iterator();
                    while(var4.hasNext()) {
                        int id = (Integer)var4.next();
                        String inputGroupCandidate = this.streamGraph.getSlotSharingGroup(id);
                        if (inputGroup == null) {
                            inputGroup = inputGroupCandidate;
                        } else if (!inputGroup.equals(inputGroupCandidate)) {
                            return "default";
                        }
                    }
                    return inputGroup == null ? "default" : inputGroup;
                }
            }
            static {
                DEFAULT_TIME_CHARACTERISTIC = TimeCharacteristic.ProcessingTime;
                Map, TransformationTranslator> tmp = new HashMap();
                tmp.put(OneInputTransformation.class, new OneInputTransformationTranslator());
                tmp.put(TwoInputTransformation.class, new TwoInputTransformationTranslator());
                tmp.put(MultipleInputTransformation.class, new MultiInputTransformationTranslator());
                tmp.put(KeyedMultipleInputTransformation.class, new MultiInputTransformationTranslator());
                tmp.put(SourceTransformation.class, new SourceTransformationTranslator());
                tmp.put(SinkTransformation.class, new SinkTransformationTranslator());
                tmp.put(LegacySinkTransformation.class, new LegacySinkTransformationTranslator());
                tmp.put(LegacySourceTransformation.class, new LegacySourceTransformationTranslator());
                tmp.put(UnionTransformation.class, new UnionTransformationTranslator());
                tmp.put(PartitionTransformation.class, new PartitionTransformationTranslator());
                tmp.put(SideOutputTransformation.class, new SideOutputTransformationTranslator());
                tmp.put(ReduceTransformation.class, new ReduceTransformationTranslator());
                tmp.put(TimestampsAndWatermarksTransformation.class, new TimestampsAndWatermarksTransformationTranslator());
                tmp.put(BroadcastStateTransformation.class, new BroadcastStateTransformationTranslator());
                tmp.put(KeyedBroadcastStateTransformation.class, new KeyedBroadcastStateTransformationTranslator());
                translatorMap = Collections.unmodifiableMap(tmp);
                iterationIdCounter = 0;
            } 
        }