Flink是新一代流批统一的计算引擎,它需要从不同的第三方存储引擎中把数据读过来,进行处理,然后再写出到另外的存储引擎中。Connector的作用就相当于一个连接器,连接Flink计算引擎跟外界存储系统。Flink里有以下几种方式,当然也不限于这几种方式可以跟外界进行数据交换:
【1】Flink里面预定义了一些source和sink;
【2】Flink内部也提供了一些Boundled connectors;
【3】可以使用第三方Apache Bahir项目中提供的连接器;
【4】是通过异步IO方式;
Flink里预定义了一部分source和sink。在这里分了几类。
如果要从文本文件中读取数据,可以直接使用:
env.readTextFile(path)
就可以以文本的形式读取该文件中的内容。当然也可以使用:根据指定的fileInputFormat格式读取文件中的内容。
env.readFile(fileInputFormat, path)
如果数据在Flink内进行了一系列的计算,想把结果写出到文件里,也可以直接使用内部预定义的一些sink,比如将结果已文本或csv格式写出到文件中,可以使用DataStream的writeAsText(path)和DataSet的writeAsCsv(path)。
提供 Socket的host name及port,可以直接用StreamExecutionEnvironment预定的接口socketTextStream创建基于Socket的source,从该 socket中以文本的形式读取数据。当然如果想把结果写出到另外一个Socket,也可以直接调用DataStream writeToSocket。
//从 socket 中读取数据流 env.socketTextStream("localhost",777); //输出至 socket resultDataStream.writeToSocket("hadoop1",6666,new SimpleStringSchema())
可以直接基于内存中的集合或者迭代器,调用StreamExecutionEnvironment fromCollection、fromElements构建相应的source。结果数据也可以直接print、printToError的方式写出到标准输出或标准错误。详细也可以参考Flink源码中提供的一些相对应的Examples来查看异常预定义 source和sink的使用方法,例如WordCount、SocketWindowWordCount。
//从Java.util.Collection集合中读取数据作为数据源 ArrayListlist = new ArrayList<>(5); list.add("flink"); list.add("scala"); list.add("spark"); list.add("hadoop"); list.add("hive"); env.fromCollection(list).print(); //从Java.util.Collection集合中读取数据作为数据源 env.fromElements("flink", "scala", "spark", "hadoop", "hive").print();
Flink里已经提供了一些绑定的Connector,例如kafka source和sink,Es sink等。读写kafka、es、rabbitMQ时可以直接使用相应 connector的api即可。
虽然该部分是Flink项目源代码里的一部分,但是真正意义上不算作Flink引擎相关逻辑,并且该部分没有打包在二进制的发布包里面。所以在提交Job时候需要注意,job代码jar包中一定要将相应的connetor相关类打包进去,否则在提交作业时就会失败,提示找不到相应的类,或初始化某些类异常。
Apache Bahir最初是从Apache Spark中独立出来项目提供,以提供不限于Spark相关的扩展 / 插件、连接器和其他可插入组件的实现。通过提供多样化的流连接器streaming connectors和SQL数据源扩展分析平台的覆盖面。如有需要写到flume、redis的需求的话,可以使用该项目提供的connector。
流计算中经常需要与外部存储系统交互,比如需要关联MySQL中的某个表。一般来说,如果用同步I/O的方式,会造成系统中出现大的等待时间,影响吞吐和延迟。为了解决这个问题,异步I/O可以并发处理多个请求,提高吞吐,减少延迟。Async的原理可参考官方文档
上一篇:RabbitMQ顺序消费