相关推荐recommended
SpringBoot集成flink
作者:mmseoamin日期:2024-04-27

Flink是一个批处理和流处理结合的统一计算框架,其核心是一个提供了数据分发以及并行化计算的流数据处理引擎。

最大亮点是流处理,最适合的应用场景是低时延的数据处理。

场景:高并发pipeline处理数据,时延毫秒级,且兼具可靠性。

环境搭建:

①、安装flink

https://nightlies.apache.org/flink/flink-docs-master/zh/docs/try-flink/local_installation/

②、安装Netcat

Netcat(又称为NC)是一个计算机网络工具,它可以在两台计算机之间建立 TCP/IP 或 UDP 连接。

用于测试网络中的端口,发送文件等操作。

进行网络调试和探测,也可以进行加密连接和远程管理等高级网络操作

yum install -y nc # 安装nc命令
nc -lk 8888 # 启动socket端口

无界流之读取socket文本流

一、依赖



    
        springboot-demo
        com.et
        1.0-SNAPSHOT
    
    4.0.0
    flink
    
        8
        8
    
    
        
            org.springframework.boot
            spring-boot-starter-web
        
        
            org.springframework.boot
            spring-boot-autoconfigure
        
        
            org.springframework.boot
            spring-boot-starter-test
            test
        
        
        
            org.apache.flink
            flink-streaming-java
            1.17.0
        
        
        
            org.apache.flink
            flink-java
            1.17.0
        
        
        
            org.apache.flink
            flink-clients
            1.17.0
        
        
        
            org.apache.flink
            flink-connector-base
            1.17.0
        
        
            org.apache.flink
            flink-connector-files
            1.17.0
        
        
        
            org.apache.flink
            flink-connector-kafka
            1.17.0
        
        
            org.apache.flink
            flink-runtime-web
            1.17.0
        
    
    
        
            
                org.apache.maven.plugins
                maven-shade-plugin
                
                    
                        package
                        
                            shade
                        
                        
                            
                                
                                    META-INF/spring.handlers
                                
                                
                                    META-INF/spring.factories
                                
                                
                                    META-INF/spring.schemas
                                
                                
                                
                                    com.et.flink.job.SocketJob
                                
                            
                        
                    
                
            
        
    

二、SoketJob

public class SocketJob{
	
	public static void main(String[] args)throws Exception{
		
		// 创建执行环境
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        // 指定并行度,默认电脑线程数
        env.setParallelism(3);
        // 读取数据socket文本流 指定监听 IP 端口 只有在接收到数据才会执行任务
        DataStreamSource socketDS = env.socketTextStream("172.24.4.193", 8888);
        // 处理数据: 切换、转换、分组、聚合 得到统计结果
        SingleOutputStreamOperator> sum = socketDS
                .flatMap(
                        (String value, Collector> out) -> {
                            String[] words = value.split(" ");
                            for (String word : words) {
                                out.collect(Tuple2.of(word, 1));
                            }
                        }
                )
                .setParallelism(2)
                // // 显式地提供类型信息:对于flatMap传入Lambda表达式,系统只能推断出返回的是Tuple2类型,而无法得到Tuple2。只有显式设置系统当前返回类型,才能正确解析出完整数据
                .returns(new TypeHint>() {
                })
//                .returns(Types.TUPLE(Types.STRING,Types.INT))
                .keyBy(value -> value.f0)
                .sum(1);
        // 输出
        sum.print();
        // 执行
        env.execute();
	}
}

测试:

启动socket流:

nc -l 8888

本地执行:直接ideal启动main程序,在socket流中输入

abc bcd cde
bcd cde fgh
cde fgh hij

SpringBoot集成flink,在这里插入图片描述,第1张

集群执行:

执行maven打包,将打包的jar上传到集群中

SpringBoot集成flink,在这里插入图片描述,第2张