docker 搭建 flink 并上传任务
作者:mmseoamin日期:2024-02-06

文章目录

        • 一、docker 搭建 flink
          • 1、选择合适的 flink 版本
          • 2、重新创建 JobManager、TaskManager 容器并挂载配置文件
          • 二、flink 简单示例
            • 1、创建项目架构
            • 2、批处理简单示例
            • 3、流处理简单示例
            • 4、上传 flink 集群
              • ①、UI 界面提交任务
              • ②、命令提交任务
              • 5、web-ui 提交查看撤销任务
              • 三、待解决
                一、docker 搭建 flink
                1、选择合适的 flink 版本

                docker 安装就不介绍了,去 dockerHub 搜索 flink 镜像,选择合适的版本安装 https://hub.docker.com/_/flink/tags

                使用 docker 命令 docker pull flink: 1.16.0-scala_2.12-java8拉去镜像

                docker 搭建 flink 并上传任务,在这里插入图片描述,第1张

                1.16.0-scala_2.12-java8 镜像版本说明,flink 1.16.0,flink 内置 scala 版本 2.12,Java 版本 8

                建议先简单启动 flink 容器 JobManager、TaskManager 两个容器将配置文件复制出来方便挂载

                # 创建 docker 网络,方便 JobManager 和 TaskManager 内部访问
                 docker network create flink-network
                # 创建 JobManager 
                 docker run \
                  -itd \
                  --name=jobmanager \
                  --publish 8081:8081 \
                  --network flink-network \
                  --env FLINK_PROPERTIES="jobmanager.rpc.address: jobmanager" \
                  flink:1.16.0-scala_2.12-java8 jobmanager 
                  
                # 创建 TaskManager 
                 docker run \
                  -itd \
                  --name=taskmanager \
                  --network flink-network \
                  --env FLINK_PROPERTIES="jobmanager.rpc.address: jobmanager" \
                  flink:1.16.0-scala_2.12-java8 taskmanager 
                

                启动成功

                docker 搭建 flink 并上传任务,在这里插入图片描述,第2张

                访问 8081 端口如下

                docker 搭建 flink 并上传任务,在这里插入图片描述,第3张

                copy 配置文件

                # jobmanager 容器
                 docker cp jobmanager:/opt/flink/conf ./JobManager/
                # taskmanager 容器
                docker cp taskmanager:/opt/flink/conf ./TaskManager/
                
                2、重新创建 JobManager、TaskManager 容器并挂载配置文件

                修改 JobManager/conf/flink-conf.yaml web 端口号为 18081

                docker 搭建 flink 并上传任务,在这里插入图片描述,第4张

                修改 TaskManager/conf/flink-conf.yaml 容器任务槽为 5

                docker 搭建 flink 并上传任务,在这里插入图片描述,第5张

                启动容器挂载配置文件

                # 启动 jobmanager   
                docker run -itd  -v /root/docker/flink/JobManager/conf/:/opt/flink/conf/ --name=jobmanager --publish 18081:18081 --env FLINK_PROPERTIES="jobmanager.rpc.address: jobmanager" --network flink-network flink:1.16.0-scala_2.12-java8 jobmanager
                # 启动 taskmanager   
                docker run -itd  -v /root/docker/flink/TaskManager/conf/:/opt/flink/conf/ --name=taskmanager --network flink-network --env FLINK_PROPERTIES="jobmanager.rpc.address: jobmanager"  flink:1.16.0-scala_2.12-java8 taskmanager
                

                参数解释

                • FLINK_PROPERTIES=“jobmanager.rpc.address: jobmanager” rpc 地址,必须设置,否则 jobmanager 和 taskmanager 的 rpc 地址都是随机生成,会连接不上,当然你也可以在直接修改配置文件 flink-conf.yaml

                  如下两个容器启动成功,可以看到 web 端口为 18081,taskmanager 启动一个,包含 5 个任务槽

                  docker 搭建 flink 并上传任务,在这里插入图片描述,第6张

                  二、flink 简单示例

                  官网参考地址:https://nightlies.apache.org/flink/flink-docs-release-1.16/zh/docs/dev/configuration/overview/#getting-started

                  1、创建项目架构

                  使用 maven 命令指定原型 Flink Maven Archetype 快速创建一个包含了必要依赖的 Flink 程序骨架,自定义项目 groupId、artifactId、package 等信息

                  mvn archetype:generate ^
                    -DarchetypeGroupId=org.apache.flink ^
                    -DarchetypeArtifactId=flink-quickstart-java ^
                    -DarchetypeVersion=1.16.0	^
                    -DgroupId=com.ye ^
                    -DartifactId=flink-study ^
                    -Dversion=0.1 ^
                    -Dpackage=com.ye ^
                    -DinteractiveMode=false
                  

                  下载成功打开项目目录

                  docker 搭建 flink 并上传任务,在这里插入图片描述,第7张

                  如下:注意运行需要设置启动参数,否则启动会找不到类,因为 pom.xml 文件 flink 相关包都添加了 provided 表示只用于生产环境,另一种方法就是将 provided 修改为runtime

                  docker 搭建 flink 并上传任务,在这里插入图片描述,第8张

                  流处理和批处理在 flink 低版本(貌似1.12)需要区分,目前都使用流处理写法

                  2、批处理简单示例

                  下面代码用来统计单词出现的的次数

                  public class DataBatchJob {
                      /* 下面示例统计单词出现的次数 */
                      public static void main(String[] args) throws Exception {
                          // 获取 flink 环境
                          final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
                          // 添加数据源
                          DataStreamSource streamSource = env.fromElements("hello world", "hello flink", "flink", "hello", "world");
                          // 对传入的流数据分组
                          SingleOutputStreamOperator> streamOperator = streamSource.flatMap(new FlatMapFunction>() {
                              // value 传入的数据,out
                              // Tuple2 二元组
                              // out 传出的值
                              @Override
                              public void flatMap(String value, Collector> out) throws Exception {
                                  String[] split = value.split(" ");
                                  for (String s : split) {
                                      out.collect(Tuple2.of(s, 1));
                                  }
                              }
                          });
                          // 按二元组的第 0 个位置分组
                          KeyedStream, Tuple> keyBy = streamOperator.keyBy(0);
                          // 按二元组的第 1 个位置求和
                          SingleOutputStreamOperator> sum = keyBy.sum(1);
                          sum.print();
                          env.execute("统计单词出现的次数");
                      }
                  }
                  

                  执行结果如下

                  docker 搭建 flink 并上传任务,在这里插入图片描述,第9张

                  上传 flink 集群

                  3、流处理简单示例

                  下面示例通过 socket 文本源,对输入的大于 500 和小于 500 的分别求和

                  public class DataStreamJob {
                      private static final Logger logger = LoggerFactory.getLogger(DataStreamJob.class);
                      /* 下面示例对大于 500 和小于 500 的分别求和 */
                      public static void main(String[] args) throws Exception {
                          
                          // 获取 flink 环境
                          final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
                          // 添加 socket 文本流数据源
                          //DataStreamSource streamSource = env.fromElements("200", "100", "6000", "500", "2000", "300", "1500", "900");
                          DataStreamSource streamSource = env.socketTextStream("127.0.0.1", 7777);
                          // 对大于 500 和小于 500 进行分组
                          KeyedStream stringKeyedStream = streamSource.keyBy(new KeySelector() {
                              @Override
                              public String getKey(String s) throws Exception {
                                  int i = Integer.parseInt(s);
                                  return i > 500 ? "ge" : "lt";
                              }
                          });
                          // 开 10 秒滚动窗口,每 10 秒为一批数据 【00:00:00 ~ 00:00:10)、【00:00:10 ~ 00:00:20)左闭右开区间
                          WindowedStream windowedStream = stringKeyedStream.window(TumblingProcessingTimeWindows.of(Time.seconds(10)));
                          
                          // 窗口处理函数,泛型 String, Integer, String, TimeWindow 依次对应 输入类型、输出类型、 KEY类型(即keyBy 返回的类型), 窗口
                          SingleOutputStreamOperator outputStreamOperator = windowedStream.process(new ProcessWindowFunction() {
                              /*
                              * key: 分组的 key
                              * context: 上下文信息
                              * elements: 传过来的一批数据
                              * out: 数据输出
                              * */
                              @Override
                              public void process(String key, ProcessWindowFunction.Context context, Iterable elements, Collector out) throws Exception {
                                  System.out.println(key);
                                  AtomicInteger sum = new AtomicInteger();
                                  elements.forEach(item -> sum.addAndGet(Integer.parseInt(item)));
                                  out.collect(sum.get());
                              }
                          });
                          // 输出
                          outputStreamOperator.print();
                          env.execute("分组求和");
                      }
                  }
                  

                  在 window 或 Linux 开启 Socket 文本流测试

                  docker 搭建 flink 并上传任务,在这里插入图片描述,第10张

                  4、上传 flink 集群

                  打包项目:可以在 pom.xml 修改启动类,也可以在命令启动或者 ui 界面上传设置启动类参数

                  docker 搭建 flink 并上传任务,在这里插入图片描述,第11张

                  ①、UI 界面提交任务

                  使用 ui 界面上传 jar 到 flink 集群,点击 submit 运行

                  docker 搭建 flink 并上传任务,在这里插入图片描述,第12张

                  ②、命令提交任务
                  # 如果集群( 即JobManager) 在当前服务器可以使用如下命令
                  	$ bin/flink run -Dexecution.runtime-mode=BATCH 
                  # 如果集群( 即JobManager) 不在当前服务器,在 TaskManager 服务器提交作业可以使用如下命令
                  	# -m 指定 JobManager 服务器地址
                  	# -c 指定作业入口程序
                  	# -p 指定并行度
                  	$ bin/flink run -m 192.168.1.1:8081 -c com.ye.StreamWordCount -p 2 
                  # 撤销任务	
                  	$ bin/flink cancle 
                  
                  5、web-ui 提交查看撤销任务

                  批处理运行完成docker 搭建 flink 并上传任务,在这里插入图片描述,第13张

                  流处理正在运行

                  docker 搭建 flink 并上传任务,在这里插入图片描述,第14张

                  三、待解决

                  使用 docker 启动的 flink 集群发现 UI 界面的 stdout 没有 print 输出

                  docker 搭建 flink 并上传任务,在这里插入图片描述,第15张