Flink从入门到实践(一):Flink入门、Flink部署
Flink从入门到实践(二):Flink DataStream API
Flink从入门到实践(三):数据实时采集 - Flink MySQL CDC
https://nightlies.apache.org/flink/flink-docs-release-1.18/docs/dev/datastream/overview/
/** * 1、获取Flink执行的环境 * getExecutionEnvironment() 这是我们用的最多的一种 * createLocalEnvironment() 这种仅限于本地开发使用 * createRemoteEnvironment(String host, int port, String... jarFiles); 知道就行,开发不用 * * * getExecutionEnvironment 传入一个 new Configuration(),本质上是一个HashMap */ // StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(new Configuration()); final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(); env.enableCheckpointing(3000); // 3秒检查一次,提高应用程序的容错性和数据一致性。 DataStreamtext = env.readTextFile("file:///path/to/file");
通常来说使用getExecutionEnvironment()就可以了,会自动选择你当前的运行环境。
我们可以使用env.addSource(sourceFunction)来添加数据来源,实际有许多内置的Source,也可以定义自己的Source。
如果想要自定义数据来源,比如说(该方式在1.18已过时,推荐使用Source接口):
实现SourceFunction接口来实现单并行度的数据来源;
实现ParallelSourceFunction接口来实现多并行度的数据来源;
实现RichParallelSourceFunction接口来实现更高级的多并行度的数据来源。
内置的数据来源(本质上也是使用env.addSource(sourceFunction)来已经预实现了):
env.readTextFile(path):逐行读取文本文件,即符合TextInputFormat规范的文件,并将其作为字符串返回。
readFile(fileInputFormat, path):按照指定的文件输入格式读取(一次)文件。
readFile(fileInputFormat, path, watchType, interval, pathFilter, typeInfo):更加复杂的文件处理。
socketTextStream():从Socket读取。元素可以用分隔符分隔。
fromCollection(Collection)、fromCollection(Iterator, Class)、fromElements(T ...)、fromParallelCollection(SplittableIterator, Class)、generateSequence(from, to):从集合读取。
addSource(new FlinkKafkaConsumer<>(...)):从kafka读取。
// 实体类 public class Access { private long time; private String domain; private double traffic; } public class Student { private int id; private String name; private int age; }
// 工具类 需要引入mysql-connector-java包 import java.sql.Connection; import java.sql.DriverManager; public class MySQLUtils { public static Connection getConnection() throws Exception { Class.forName("com.mysql.jdbc.Driver"); return DriverManager.getConnection("jdbc:mysql://localhost:3306/flink", "root", "123"); } public static void close(AutoCloseable closeable) { if(null != closeable) { try { closeable.close(); // null.close } catch (Exception e) { e.printStackTrace(); } finally { closeable = null; } } } }
// 自定义source /** * 自定义数据源 * 并行度为1 */ public class AccessSource implements SourceFunction{ volatile boolean isRunning = true; /** * 造数据是自定义数据源的使用方式之一 * @param ctx * @throws Exception */ @Override public void run(SourceContext ctx) throws Exception { Random random = new Random(); String[] domains = {"test1.com","test2.com","test3.com"}; while (isRunning) { long time = System.currentTimeMillis(); ctx.collect(new Access(time, domains[random.nextInt(domains.length)], random.nextInt(1000) + 1000)); Thread.sleep(2000); } } @Override public void cancel() { isRunning = false; } } /** * 自定义数据源 * 多并行度 */ public class AccessSourceV2 implements ParallelSourceFunction { volatile boolean isRunning = true; /** * 造数据是自定义数据源的使用方式之一 * @param ctx * @throws Exception */ @Override public void run(SourceContext ctx) throws Exception { Random random = new Random(); String[] domains = {"test1.com","test2.com","test3.com"}; while (isRunning) { long time = System.currentTimeMillis(); ctx.collect(new Access(time, domains[random.nextInt(domains.length)], random.nextInt(1000) + 1000)); Thread.sleep(5000); } } @Override public void cancel() { isRunning = false; } } /** * RichSourceFunction: Rich + SourceFunction * Rich: 包含了生命周期方法 open close * SourceFunction:单 * * 自定义二次开发:按照框架(Flink/Spark/....)所提供的接口,去实现自己的业务逻辑即可 * 自定义Source * 自定义Sink * * * 扩展:对于Spark SQL的外部数据源熟悉吗? 按照Spark所提供的接口,自己实现业务逻辑 * */ public class MySQLSource extends RichSourceFunction { Connection connection; PreparedStatement pstmt; /** * 初始化操作,建立connection */ @Override public void open(Configuration parameters) throws Exception { connection = MySQLUtils.getConnection(); pstmt = connection.prepareStatement("select * from student"); } /** * 释放资源,关闭connection */ @Override public void close() throws Exception { MySQLUtils.close(pstmt); MySQLUtils.close(connection); } /** * 业务逻辑:就是把表中的数据读取出来 ==> Student */ @Override public void run(SourceContext ctx) throws Exception { ResultSet rs = pstmt.executeQuery(); while (rs.next()) { int id = rs.getInt("id"); String name = rs.getString("name"); int age = rs.getInt("age"); Student student = new Student(id, name, age); ctx.collect(student); } } @Override public void cancel() { } }
/** * Flink中datasource的使用 */ public class FlinkDataSourceApp { public static void main(String[] args) throws Exception { StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(); /** * 使用内置的dataSource */ // DataStreamSourcesource = env.readFile(new TextInputFormat(null), "data/wc.data"); // // 这个readTextFile方法底层其实调用的就是readFile // DataStreamSource source = env.readTextFile("data/wc.txt"); // System.out.println(source.getParallelism()); // 8 // // SingleOutputStreamOperator mapStream = source.map(String::toUpperCase); // System.out.println(mapStream.getParallelism()); // mapStream.print(); // // DataStreamSource source = env.fromParallelCollection(new NumberSequenceIterator(1, 10), Long.TYPE); // System.out.println(source.getParallelism());// 8 // SingleOutputStreamOperator map = source.map(x -> x + 1); // map.print(); // // DataStreamSource source = env.addSource(new AccessSourceV2()).setParallelism(3); // 对于ParallelSourceFunction是可以根据具体情况来设定并行度的 // System.out.println(source.getParallelism()); // source.print(); /** * 使用自定义数据源 */ // env.addSource(new AccessSource()).print(); // env.addSource(new AccessSourceV2()).setParallelism(3).print(); // 多并行度的可以自行设置并行度 /** * 使用Flink自定义MySQL的数据源,进而读取MySQL里面的数据 * 该方式已过时 …… flink更新太快了 */ env.addSource(new MySQLSource()).print(); /** * 单并行度:fromElements fromCollection socketTextStream * 多并行度:readTextFile fromParallelCollection generateSequence readFile * 自定义: */ env.execute("作业名字"); } }
暂无
官方文档:
https://nightlies.apache.org/flink/flink-docs-release-1.18/docs/dev/datastream/operators/overview/
运算符将一个或多个数据流转换为新的数据流。程序可以将多种转换组合成复杂的数据流拓扑。
把DataStream转换成新的DataStream。
// 将读取的文件,按照,分割,然后每一行数据组成一个Access对象 DataStreamdataStream = env.readTextFile("data/access.log"); SingleOutputStreamOperator mapStream = dataStream.map(new MapFunction () { @Override public Access map(String value) throws Exception { String[] splits = value.split(","); Access access = new Access(); access.setTime(Long.parseLong(splits[0].trim())); access.setDomain(splits[1].trim()); access.setTraffic(Double.parseDouble(splits[2].trim())); return access; } }); mapStream.print();
把DataStream转换成新的DataStream。
计算每个元素的布尔函数,并保留函数返回true的元素。
也即:过滤出满足条件的元素。
// 过滤出不为0的元素 dataStream.filter(new FilterFunction() { @Override public boolean filter(Integer value) throws Exception { return value != 0; } });
把DataStream转换成新的DataStream。
可以是一对一、一对多、一对0 一个元素进来,可以出去0、1、多个元素。
dataStream.flatMap(new FlatMapFunction() { @Override public void flatMap(String value, Collector out) throws Exception { for(String word: value.split(" ")){ // 把每一个元素按空格分割 out.collect(word); // 收集每一个 分割后的 元素 } } });
把DataStream转换为KeyedStream 。
在逻辑上将流划分为不相交的分区。具有相同关键字的所有记录都被分配到同一个分区。
在内部,keyBy()是通过散列分区实现的。
(类似Map - Reduce思想)
注意!如果是根据一个对象分组,要重写 hashCode()方法,否则会使用默认的Object.hashCode()。
// 根据value的某个属性分组,相当于mysql的group by // 通常分组之后,就要求和、求一些统计数据了 dataStream.keyBy(value -> value.getSomeKey()); dataStream.keyBy(value -> value.f0); dataStream .keyBy(value -> value.getSomeKey()) .sum("field") // 根据字段求和还可以求最大值最小值等 .print();
把KeyedStream 转换为 DataStream。
将当前元素与最后一个减少的值合并,并发出新值。
keyedStream.reduce(new ReduceFunction() { @Override public Integer reduce(Integer value1, Integer value2) throws Exception { return value1 + value2; } });
把多个DataStream合并为一个DataStream。
两个或多个数据流的联合,创建一个包含所有流中所有元素的新流。注意:如果您将数据流与其自身联合,您将在结果流中两次获得每个元素。
/** * union:合并多个流 * 数据类型问题:union的多个流中数据类型是需要相同的 * 数据类型相同的多流操作 */ DataStreamSourcestream1 = env.fromElements(1, 2, 3); DataStreamSource stream2 = env.fromElements(11, 12, 13); DataStreamSource stream3 = env.fromElements("A", "B", "C"); stream1.union(stream2).map(x -> "PK_" + x).print(); stream1.union(stream1).print(); stream1.union(stream1, stream2).print();
把两个DataStream 合并为 ConnectedStream。
DataStreamsomeStream = //... DataStream otherStream = //... ConnectedStreams connectedStreams = someStream.connect(otherStream);
/** * connect: 数据类型可以不同 * 两个流的操作 * 只是形式的连接 */ ConnectedStreamsconnectedStreams = stream1.connect(stream3); connectedStreams.map(new CoMapFunction () { // 共享状态 String prefix = "common_"; // 对第一个流的操作 @Override public String map1(Integer value) throws Exception { return prefix + value*10; } // 对第二个流的操作 @Override public String map2(String value) throws Exception { return prefix + value.toLowerCase(); } }).print();
将ConnectedStream 转换为 DataStream。
类似于连接数据流上的map和flatMap。
connectedStreams.map(new CoMapFunction() { @Override public Boolean map1(Integer value) { return true; } @Override public Boolean map2(String value) { return false; } }); connectedStreams.flatMap(new CoFlatMapFunction () { @Override public void flatMap1(Integer value, Collector out) { out.collect(value.toString()); } @Override public void flatMap2(String value, Collector out) { for (String word: value.split(" ")) { out.collect(word); } } });
import org.apache.flink.api.common.functions.MapFunction; import org.apache.flink.api.common.functions.Partitioner; import org.apache.flink.api.common.functions.RichMapFunction; import org.apache.flink.streaming.api.datastream.DataStreamSource; import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment; public class PartitionTest2 { public static void main(String[] args) throws Exception { StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(); env.setParallelism(3); // 一般情况下,并行度跟分区相同,相同分区在同一个线程中执行 DataStreamSourcesourcePartition = env.readTextFile("data/access.log"); sourcePartition // 读取数据转成Access对象 .map(new RichMapFunction () { @Override public Access map(String value) throws Exception { String[] splits = value.split(","); Access access = new Access(); access.setTime(Long.parseLong(splits[0].trim())); access.setDomain(splits[1].trim()); access.setTraffic(Double.parseDouble(splits[2].trim())); return access; } }) // 按照指定字段进行分区 .partitionCustom(new Partitioner () { @Override public int partition(String key, int numPartitions) { System.out.println(numPartitions); if("test1.com".equals(key)) { return 0; } else if("test2.com".equals(key)) { return 1; } else { return 2; } } }, x -> x.getDomain()) // 下面的这段map方法目的是验证:相同的域名是否真的在同一个分区内,看threadid是否相同即可 .map(new MapFunction () { @Override public Access map(Access value) throws Exception { System.out.println("current thread id is " + Thread.currentThread().getId() + ", value is:" + value); return value; } }).print(); env.execute("作业名字"); } }
https://nightlies.apache.org/flink/flink-docs-release-1.18/docs/dev/datastream/side_output/
/** * 分流操作:把一个分拆分成多个流 * * split 在老的flink版本中是有的,但是新的版本中已经没有这个api * * 那就说明新版本肯定提供了更好用的使用方式:side output */ DataStreamSourcesource = env.readTextFile("data/access.log"); SingleOutputStreamOperator stream = source.map(new AccessConvertFunction()); // 很low的写法 // SingleOutputStreamOperator pk1Stream = stream.filter(x -> "test1.com".equals(x.getDomain())); // SingleOutputStreamOperator pk2Stream = stream.filter(x -> "test1.com".equals(x.getDomain())); // pk1Stream.print("域名是pk1.com的流"); // pk2Stream.print("域名是pk2.com的流"); // 定义两个Tag OutputTag test1OutputTag = new OutputTag ("test1"){}; OutputTag test2OutputTag = new OutputTag ("test2"){}; SingleOutputStreamOperator processStream = stream.process(new ProcessFunction () { @Override public void processElement(Access value, Context ctx, Collector out) throws Exception { if ("test1.com".equals(value.getDomain())) { ctx.output(test1OutputTag, value); // pk1.com的走pk1的OutputTag } else if ("test2.com".equals(value.getDomain())) { ctx.output(test2OutputTag, value); // pk2.com的走pk2的OutputTag } else { out.collect(value); // pk3.com的走主流 } } }); processStream.print("主流:"); processStream.getSideOutput(test1OutputTag).print("test1的:"); processStream.getSideOutput(test2OutputTag).print("test2的:"); env.execute("作业名字");
https://nightlies.apache.org/flink/flink-docs-release-1.18/docs/dev/datastream/overview/#data-sinks
https://nightlies.apache.org/flink/flink-docs-release-1.18/docs/connectors/datastream/overview/
数据接收器消费数据流,并将它们转发到文件、Socket、外部系统或打印它们。
Flink自带多种内置输出格式:
writeAsText() / TextOutputFormat:将元素作为字符串逐行写入。字符串是通过调用每个元素的toString()方法获得的。
writeAsCsv(...) / CsvOutputFormat:将元组写入逗号分隔值文件。行和字段分隔符是可配置的。每个字段的值来自对象的toString()方法。
print() / printToErr():打印标准输出/标准错误流中每个元素的toString()值。可选地,可以提供一个前缀(msg),将其附加到输出的前面。这有助于区分不同的打印调用。如果并行度大于1,输出还将加上产生输出的任务的标识符。
writeUsingOutputFormat() / FileOutputFormat:自定义文件输出的方法和基类。支持自定义对象到字节的转换。
writeToSocket:根据SerializationSchema将元素写入Socket。
addSink:调用自定义接收器函数。Flink与其他系统(如Apache Kafka)的连接器捆绑在一起,这些连接器被实现为sink函数。
stream.print(); /* >7> Access{time=202810110120, domain='test1.com', traffic=2000.0} 1> Access{time=202810110120, domain='test2.com', traffic=4000.0} 11> Access{time=202810110120, domain='test1.com', traffic=5000.0} 4> Access{time=202810110120, domain='test3.com', traffic=1000.0} 9> Access{time=202810110120, domain='test2.com', traffic=6000.0} 线程号 + 数据.toString() 如果这样: stream.print().setParallelism(1); 并行度设置为1,那么前面就不会输出数字 这样打印红色: stream.printToErr(); */
源码:
stream.addSink(new RichSinkFunction() { int subTaskId; // num> @Override public void open(Configuration parameters) throws Exception { super.open(parameters); subTaskId = getRuntimeContext().getIndexOfThisSubtask(); } @Override public void invoke(Access value, SinkFunction.Context context) throws Exception { System.out.println(subTaskId + 1 + "> " + value); // 最终执行的方法,输出到终端 } });
// 已过时 // 写入到文件,每一个并行度,会生成一个文件。并行度为1会生成test一个文件 stream.writeAsText("out/test", FileSystem.WriteMode.OVERWRITE).setParallelism(1); // 也已经过时了,推荐使用 org.apache.flink.connector.file.sink.FileSink,需要额外引入包 StreamingFileSinkfileSink = StreamingFileSink .forRowFormat(new Path("out"), new SimpleStringEncoder()) .withRollingPolicy(DefaultRollingPolicy.builder() // 构建文本滚动生成的策略 .withRolloverInterval(Duration.ofMinutes(15)) // 按时间间隔滚动 .withInactivityInterval(Duration.ofSeconds(5)) // 按不活跃滚动 .withMaxPartSize(MemorySize.ofMebiBytes(1)) // 按大小滚动 .build()) .build(); // 数据类型需要前后对应 stream.map(Access::toString).addSink(fileSink);
JdbcSink.sink提供至少一次保证。然而有效的是,通过创建upsert SQL语句或幂等SQL更新可以实现“恰好一次”。
org.apache.flink flink-connector-jdbc 3.1.1-1.17
// 写入到mysql // 需要使用upsert语句 SinkFunctionjdbcSink = JdbcSink.sink( // sql "insert into access (id, name) values (?, ?) on duplicate key update name=VALUES(name)", // sql的参数 (JdbcStatementBuilder ) (preparedStatement, access) -> { preparedStatement.setInt(1, (int)access.getTraffic()); preparedStatement.setString(2, access.getDomain()); }, // 执行参数 JdbcExecutionOptions.builder() .withBatchSize(5) .withBatchIntervalMs(200) .withMaxRetries(5) // 重试 .build(), // jdbc连接信息 new JdbcConnectionOptions.JdbcConnectionOptionsBuilder() .withUrl("jdbc:mysql://192.168.56.10:3306/testdb") .withDriverName("com.mysql.jdbc.Driver") .withUsername("root") .withPassword("root") .build() ); stream.addSink(jdbcSink);
自1.13起, Flink JDBC sink支持恰好一次模式。该实现依赖于XA标准的JDBC驱动程序支持。如果数据库也支持XA,则大多数驱动程序都支持XA(因此驱动程序通常是相同的)。
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(); env .fromElements(...) .addSink(JdbcSink.exactlyOnceSink( "insert into books (id, title, author, price, qty) values (?,?,?,?,?)", (ps, t) -> { ps.setInt(1, t.id); ps.setString(2, t.title); ps.setString(3, t.author); ps.setDouble(4, t.price); ps.setInt(5, t.qty); }, JdbcExecutionOptions.builder() .withMaxRetries(0) .build(), JdbcExactlyOnceOptions.defaults(), () -> { // create a driver-specific XA DataSource // The following example is for derby EmbeddedXADataSource ds = new EmbeddedXADataSource(); ds.setDatabaseName("my_db"); return ds; }); env.execute();
https://bahir.apache.org/docs/flink/current/flink-streaming-redis/
public static class RedisExampleMapper implements RedisMapper>{ @Override public RedisCommandDescription getCommandDescription() { return new RedisCommandDescription(RedisCommand.HSET, "HASH_NAME"); } @Override public String getKeyFromData(Tuple2 data) { return data.f0; } @Override public String getValueFromData(Tuple2 data) { return data.f1; } } FlinkJedisPoolConfig conf = new FlinkJedisPoolConfig.Builder().setHost("127.0.0.1").build(); DataStream stream = ...; stream.addSink(new RedisSink >(conf, new RedisExampleMapper());
// 输出到Socket,注意类型匹配,输出为字符串 stream.map(Access::toString).writeToSocket("localhost", 9528, new SimpleStringSchema());
https://nightlies.apache.org/flink/flink-docs-release-1.18/docs/connectors/datastream/kafka/
上一篇:HBase 进阶