【Flink-Kafka-To-ClickHouse】Writing Kafka Data into ClickHouse with Flink
Author: mmseoamin  Date: 2024-02-05


  • 1) Import Dependencies
  • 2) Code Implementation
    • 2.1.resources
      • 2.1.1.appconfig.yml
      • 2.1.2.log4j.properties
      • 2.1.3.log4j2.xml
      • 2.1.4.flink_backup_local.yml
    • 2.2.utils
      • 2.2.1.DBConn
      • 2.2.2.CommonUtils
      • 2.2.3.RemoteConfigUtil
      • 2.2.4.ClickhouseUtil
    • 2.3.flatmap
      • 2.3.1.FlatMapFunction
    • 2.4.sink
      • 2.4.1.ClickHouseCatalog
    • 2.5.Kafka2ClickHouse
      • 2.5.1.Kafka2chApp
      • 2.5.2.Kafka2Ck-ODS

Requirements:

1. Data is read from Kafka and written into ClickHouse.

2. All job configuration is stored in MySQL and read dynamically at startup.

3. The Kafka cluster in this example uses Kerberos authentication; remove that part yourself if you do not need it.

4. The target table is created in ClickHouse first, and its schema is then fetched dynamically at runtime.

5. The Kafka messages are JSON; a FlatMap step flattens each message and packs its fields into a Row according to the table schema before writing.

6. For the actual write, the stream is registered as a temporary view and the insert is done with Flink SQL (see the sketch after this list).

7. For local testing you can edit resources/flink_backup_local.yml and load it through ConfigTools.initConf.
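As a rough illustration of item 6, the write path looks like the following sketch (variable and table names here are placeholders; the real wiring lives in Kafka2chApp, listed in section 2.5):

// Minimal sketch, assuming a DataStream<Row> produced by the FlatMap step
// and a sink DDL generated by ClickhouseUtil.getCreateSinkTableSql(...).
StreamTableEnvironment tableEnv = StreamTableEnvironment.create(env);

// Register the flattened stream as a temporary view.
tableEnv.createTemporaryView("ods_view", rowStream);

// Create the ClickHouse sink table from the generated DDL, then insert via Flink SQL.
tableEnv.executeSql(createSinkTableSql);
tableEnv.executeSql("INSERT INTO sink_table SELECT * FROM ods_view");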

1) Import Dependencies

The dependency list below is fairly bloated; trim or keep entries according to your own needs.

                
                
<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0"
         xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
    <modelVersion>4.0.0</modelVersion>
    <groupId>gaei.cn.x5l</groupId>
    <artifactId>kafka2ch</artifactId>
    <version>1.0.0</version>

    <properties>
        <project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
        <scala.version>2.12.10</scala.version>
        <scala.binary.version>2.12</scala.binary.version>
        <flink.version>1.14.0</flink.version>
        <log4j.version>2.17.2</log4j.version>
        <!-- assumed: the original POM also pinned 2.3.3 / 3.1.1 / 3.0.2 / 3.1.2, but only hadoop.version is referenced below -->
        <hadoop.version>3.1.1</hadoop.version>
        <target.java.version>1.8</target.java.version>
        <maven.compiler.source>${target.java.version}</maven.compiler.source>
        <maven.compiler.target>${target.java.version}</maven.compiler.target>
    </properties>

    <dependencies>
        <!-- Apache Flink core (provided by the cluster at runtime) -->
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-java</artifactId>
            <version>${flink.version}</version>
            <scope>provided</scope>
        </dependency>
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-streaming-java_${scala.binary.version}</artifactId>
            <version>${flink.version}</version>
            <scope>provided</scope>
        </dependency>
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-clients_${scala.binary.version}</artifactId>
            <version>${flink.version}</version>
            <scope>provided</scope>
        </dependency>
        <!-- Table API / SQL -->
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-table-api-java-bridge_${scala.binary.version}</artifactId>
            <version>1.14.0</version>
            <scope>provided</scope>
        </dependency>
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-table-planner_${scala.binary.version}</artifactId>
            <version>${flink.version}</version>
            <scope>provided</scope>
        </dependency>
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-streaming-scala_${scala.binary.version}</artifactId>
            <version>${flink.version}</version>
            <scope>provided</scope>
        </dependency>
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-table-common</artifactId>
            <version>${flink.version}</version>
            <scope>provided</scope>
        </dependency>
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-cep_${scala.binary.version}</artifactId>
            <version>${flink.version}</version>
        </dependency>
        <!-- Formats -->
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-json</artifactId>
            <version>${flink.version}</version>
            <scope>provided</scope>
        </dependency>
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-csv</artifactId>
            <version>${flink.version}</version>
            <scope>provided</scope>
        </dependency>
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-state-processor-api_${scala.binary.version}</artifactId>
            <version>${flink.version}</version>
            <scope>provided</scope>
        </dependency>
        <dependency>
            <groupId>commons-lang</groupId>
            <artifactId>commons-lang</artifactId>
            <version>2.5</version>
            <scope>compile</scope>
        </dependency>
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-runtime-web_${scala.binary.version}</artifactId>
            <version>${flink.version}</version>
            <scope>provided</scope>
        </dependency>
        <!-- Logging (only needed for local runs) -->
        <dependency>
            <groupId>org.apache.logging.log4j</groupId>
            <artifactId>log4j-slf4j-impl</artifactId>
            <version>${log4j.version}</version>
            <scope>runtime</scope>
        </dependency>
        <dependency>
            <groupId>org.apache.logging.log4j</groupId>
            <artifactId>log4j-api</artifactId>
            <version>${log4j.version}</version>
            <scope>runtime</scope>
        </dependency>
        <dependency>
            <groupId>org.apache.logging.log4j</groupId>
            <artifactId>log4j-core</artifactId>
            <version>${log4j.version}</version>
            <scope>runtime</scope>
        </dependency>
        <!-- Hadoop / HDFS (for checkpoints and Kerberos) -->
        <dependency>
            <groupId>org.apache.hadoop</groupId>
            <artifactId>hadoop-client</artifactId>
            <version>3.3.1</version>
        </dependency>
        <dependency>
            <groupId>org.apache.hadoop</groupId>
            <artifactId>hadoop-auth</artifactId>
            <version>${hadoop.version}</version>
        </dependency>
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-statebackend-rocksdb_${scala.binary.version}</artifactId>
            <version>${flink.version}</version>
            <scope>provided</scope>
        </dependency>
        <dependency>
            <groupId>com.alibaba</groupId>
            <artifactId>fastjson</artifactId>
            <version>1.1.23</version>
        </dependency>
        <dependency>
            <groupId>org.projectlombok</groupId>
            <artifactId>lombok</artifactId>
            <version>1.16.18</version>
            <scope>provided</scope>
        </dependency>
        <dependency>
            <groupId>gaei.cn.x5l.bigdata.common</groupId>
            <artifactId>x5l-bigdata-common</artifactId>
            <version>1.3-SNAPSHOT</version>
            <exclusions>
                <exclusion>
                    <groupId>org.apache.logging.log4j</groupId>
                    <artifactId>log4j-core</artifactId>
                </exclusion>
                <exclusion>
                    <groupId>org.apache.logging.log4j</groupId>
                    <artifactId>log4j-api</artifactId>
                </exclusion>
                <exclusion>
                    <groupId>org.apache.logging.log4j</groupId>
                    <artifactId>log4j-slf4j-impl</artifactId>
                </exclusion>
            </exclusions>
        </dependency>
        <!-- Connectors -->
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-sql-connector-kafka_${scala.binary.version}</artifactId>
            <version>${flink.version}</version>
        </dependency>
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-connector-jdbc_${scala.binary.version}</artifactId>
            <version>${flink.version}</version>
        </dependency>
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-connector-clickhouse</artifactId>
            <version>1.14.3-SNAPSHOT</version>
        </dependency>
        <dependency>
            <groupId>gaei.cn.x5l</groupId>
            <artifactId>tsp-gb-decode</artifactId>
            <version>1.0.0</version>
        </dependency>
        <dependency>
            <groupId>org.jyaml</groupId>
            <artifactId>jyaml</artifactId>
            <version>1.3</version>
        </dependency>
        <dependency>
            <groupId>mysql</groupId>
            <artifactId>mysql-connector-java</artifactId>
            <version>5.1.44</version>
            <scope>runtime</scope>
        </dependency>
        <dependency>
            <groupId>gaei.cn.x5l.flink.common</groupId>
            <artifactId>x5l-flink-common</artifactId>
            <version>1.4-SNAPSHOT</version>
        </dependency>
    </dependencies>

    <build>
        <plugins>
            <!-- Java compiler -->
            <plugin>
                <groupId>org.apache.maven.plugins</groupId>
                <artifactId>maven-compiler-plugin</artifactId>
                <version>3.1</version>
                <configuration>
                    <source>${target.java.version}</source>
                    <target>${target.java.version}</target>
                </configuration>
            </plugin>
            <!-- Shade plugin: build a fat jar, excluding classes already provided by the Flink runtime -->
            <plugin>
                <groupId>org.apache.maven.plugins</groupId>
                <artifactId>maven-shade-plugin</artifactId>
                <version>3.0.0</version>
                <executions>
                    <execution>
                        <phase>package</phase>
                        <goals>
                            <goal>shade</goal>
                        </goals>
                        <configuration>
                            <artifactSet>
                                <excludes>
                                    <exclude>org.apache.flink:force-shading</exclude>
                                    <exclude>com.google.code.findbugs:jsr305</exclude>
                                    <exclude>org.slf4j:*</exclude>
                                    <exclude>org.apache.logging.log4j:*</exclude>
                                    <exclude>org.apache.flink:flink-runtime-web_2.11</exclude>
                                </excludes>
                            </artifactSet>
                            <filters>
                                <filter>
                                    <artifact>*:*</artifact>
                                    <excludes>
                                        <exclude>META-INF/*.SF</exclude>
                                        <exclude>META-INF/*.DSA</exclude>
                                        <exclude>META-INF/*.RSA</exclude>
                                    </excludes>
                                </filter>
                            </filters>
                            <transformers>
                                <transformer implementation="org.apache.maven.plugins.shade.resource.ManifestResourceTransformer">
                                    <mainClass>com.owp.flink.kafka.KafkaSourceDemo</mainClass>
                                </transformer>
                            </transformers>
                        </configuration>
                    </execution>
                </executions>
            </plugin>
        </plugins>

        <pluginManagement>
            <plugins>
                <!-- Keeps Eclipse m2e quiet; has no effect on the Maven build itself -->
                <plugin>
                    <groupId>org.eclipse.m2e</groupId>
                    <artifactId>lifecycle-mapping</artifactId>
                    <version>1.0.0</version>
                    <configuration>
                        <lifecycleMappingMetadata>
                            <pluginExecutions>
                                <pluginExecution>
                                    <pluginExecutionFilter>
                                        <groupId>org.apache.maven.plugins</groupId>
                                        <artifactId>maven-shade-plugin</artifactId>
                                        <versionRange>[3.0.0,)</versionRange>
                                        <goals>
                                            <goal>shade</goal>
                                        </goals>
                                    </pluginExecutionFilter>
                                    <action>
                                        <ignore/>
                                    </action>
                                </pluginExecution>
                                <pluginExecution>
                                    <pluginExecutionFilter>
                                        <groupId>org.apache.maven.plugins</groupId>
                                        <artifactId>maven-compiler-plugin</artifactId>
                                        <versionRange>[3.1,)</versionRange>
                                        <goals>
                                            <goal>testCompile</goal>
                                            <goal>compile</goal>
                                        </goals>
                                    </pluginExecutionFilter>
                                    <action>
                                        <ignore/>
                                    </action>
                                </pluginExecution>
                            </pluginExecutions>
                        </lifecycleMappingMetadata>
                    </configuration>
                </plugin>
            </plugins>
        </pluginManagement>
    </build>
</project>

2) Code Implementation

                2.1.resources

                2.1.1.appconfig.yml

                mysql.url: "jdbc:mysql://1.1.1.1:3306/test?useSSL=false&useUnicode=true&characterEncoding=UTF8&connectTimeout=60000&socketTimeout=60000"
                mysql.username: "test"
                mysql.password: "123456"
                mysql.driver: "com.mysql.jdbc.Driver"
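
This file only holds the MySQL connection used to bootstrap everything else; RemoteConfigUtil (section 2.2.3) reads it roughly as follows (LocalConfigUtil and JdbcUtil come from the in-house x5l common libraries referenced in the POM):

// Read the bootstrap MySQL settings from resources/appconfig.yml ...
Map ymlMap = LocalConfigUtil.getYmlMap("/appconfig");
String url      = (String) ymlMap.get("mysql.url");
String username = (String) ymlMap.get("mysql.username");
String password = (String) ymlMap.get("mysql.password");
String driver   = (String) ymlMap.get("mysql.driver");
// ... and open the connection that the job-level configuration is loaded from.
Connection conn = JdbcUtil.getConnection(url, username, password, driver);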
                

                2.1.2.log4j.properties

                log4j.rootLogger=info, stdout
                log4j.appender.stdout=org.apache.log4j.ConsoleAppender
                log4j.appender.stdout.layout=org.apache.log4j.PatternLayout
                log4j.appender.stdout.layout.ConversionPattern=%-4r [%t] %-5p %c %x - %m%n
                

2.1.3.log4j2.xml

                2.1.4.flink_backup_local.yml

                clickhouse:
                  connector: 'clickhouse'
                  database-name: 'dwd'
                  driver: 'ru.yandex.clickhouse.ClickHouseDriver'
                  jdbcurl: 'jdbc:clickhouse://10.1.1.1:8123/dwd?socket_timeout=480000'
                  password: 'X8v@123456!%$'
                  reissueInterval: 3
                  sink.batch-size: '200000'
                  sink.flush-interval: '3000000'
                  sink.ignore-delete: 'true'
                  sink.max-retries: '3'
                  sink.partition-key: 'toYYYYMMDD(sample_date_time)'
                  sink.partition-strategy: 'balanced'
                  table-name: 'test_local'
                  url: 'clickhouse://10.1.1.1:8123,10.1.1.1:8123,10.1.1.1:8123,10.1.1.1:8123,10.1.1.1:8123'
                  username: 'test'
                hdfs:
                  checkPointPath: 'hdfs://nameserver/user/flink/rocksdbcheckpoint'
                  checkpointTimeout: 360000
                  checkpointing: 300000
                  maxConcurrentCheckpoints: 1
                  minPauseBetweenCheckpoints: 10000
                  restartInterval: 60
                  restartStrategy: 3
                kafka-consumer:
                  prop:
                    auto.offset.reset: 'earliest'
                    bootstrap.servers: 'kfk01:9092,kfk02:9092,kfk03:9092'
                    enable.auto.commit: 'false'
                    fetch.max.bytes: '52428700'
                    group.id: 'test'
                    isKerberized: '1'
                    keytab: 'D:/keytab/test.keytab'
                    krb5Conf: 'D:/keytab/krb5.conf'
                    max.poll.interval.ms: '300000'
                    max.poll.records: '1000'
                    principal: 'test@PRE.TEST.COM'
                    security_protocol: 'SASL_PLAINTEXT'
                    serviceName: 'kafka'
                    session.timeout.ms: '600000'
                    useTicketCache: 'false'
                  topics: 'topicA,topicB'
                kafka-producer:
                  defaultTopic: 'kafka2hive_error'
                  prop:
                    acks: 'all'
                    batch.size: '1048576'
                    bootstrap.servers: 'kfk01:9092,kfk02:9092,kfk03:9092'
                    compression.type: 'lz4'
                    key.serializer: 'org.apache.kafka.common.serialization.StringSerializer'
                    retries: '3'
                    value.serializer: 'org.apache.kafka.common.serialization.StringSerializer'
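
Requirement 7 relies on this file: for local runs the same structure that normally lives in MySQL is read from the classpath instead. A minimal sketch of how it is picked up (ConfigTools comes from the in-house common libraries; "local" selects this backup file):

// Local testing: load resources/flink_backup_local.yml instead of the MySQL-hosted config.
ConfigTools.initConf("local");
Map<String, Object> clickhouseConf = (Map<String, Object>) ConfigTools.mapConf.get("clickhouse");
Map<String, Object> kafkaConf = (Map<String, Object>) ConfigTools.mapConf.get("kafka-consumer");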
                

                2.2.utils

                2.2.1.DBConn

import java.sql.*;

public class DBConn {
    private static final String driver = "com.mysql.jdbc.Driver";        // MySQL driver class
    private static Connection conn = null;
    private static PreparedStatement ps = null;
    private static ResultSet rs = null;
    private static final CallableStatement cs = null;

    /**
     * Open a database connection.
     */
    public static Connection conn(String url, String username, String password) {
        try {
            Class.forName(driver);  // load the JDBC driver
            try {
                // keep the connection in the static field so close() can release it later
                conn = DriverManager.getConnection(url, username, password);
            } catch (SQLException e) {
                e.printStackTrace();
            }
        } catch (ClassNotFoundException e) {
            e.printStackTrace();
        }
        return conn;
    }

    /**
     * Close the database connection.
     */
    public static void close() {
        if (conn != null) {
            try {
                conn.close();  // close the connection
            } catch (SQLException e) {
                e.printStackTrace();
            }
        }
    }
}
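
A quick usage sketch (URL and credentials are the placeholders from appconfig.yml):

Connection connection = DBConn.conn(
        "jdbc:mysql://1.1.1.1:3306/test?useSSL=false", "test", "123456");
// ... execute statements on the connection ...
DBConn.close();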
                

                2.2.2.CommonUtils

@Slf4j
public class CommonUtils {
    public static StreamExecutionEnvironment setCheckpoint(StreamExecutionEnvironment env) throws IOException {
//        ConfigTools.initConf("local");
        Map<String, Object> hdfsMap = (Map<String, Object>) ConfigTools.mapConf.get("hdfs");
        // Checkpointing also drives offset commits, so it adds some commit latency.
        env.enableCheckpointing(((Integer) hdfsMap.get("checkpointing")).longValue(), CheckpointingMode.EXACTLY_ONCE);
        env.getCheckpointConfig().setMinPauseBetweenCheckpoints(((Integer) hdfsMap.get("minPauseBetweenCheckpoints")).longValue());
        env.getCheckpointConfig().setCheckpointTimeout(((Integer) hdfsMap.get("checkpointTimeout")).longValue());
        env.getCheckpointConfig().setMaxConcurrentCheckpoints((Integer) hdfsMap.get("maxConcurrentCheckpoints"));
        env.getCheckpointConfig().enableExternalizedCheckpoints(CheckpointConfig.ExternalizedCheckpointCleanup.RETAIN_ON_CANCELLATION);
        env.setRestartStrategy(RestartStrategies.fixedDelayRestart(
                (Integer) hdfsMap.get("restartStrategy"), // restart attempts; don't set this too low, distributed jobs occasionally fail even when healthy (3-5 is a good default)
                Time.of(((Integer) hdfsMap.get("restartInterval")).longValue(), TimeUnit.SECONDS) // delay between attempts
        ));
        // Tolerable checkpoint failures; the default 0 means no failure is tolerated.
        env.getCheckpointConfig().setTolerableCheckpointFailureNumber(2);
        // State backend
        env.setStateBackend(new RocksDBStateBackend((String) hdfsMap.get("checkPointPath"), true));
//        env.setStateBackend(new FsStateBackend((String) hdfsMap.get("checkPointPath"), true));
//        env.setStateBackend(new HashMapStateBackend());
        return env;
    }

    public static FlinkKafkaConsumer<ConsumerRecord<String, String>> getKafkaConsumer(Map<String, Object> kafkaConf) throws IOException {
        String[] topics = ((String) kafkaConf.get("topics")).split(",");
        log.info("subscribed topics: {}", Arrays.toString(topics));
        Properties properties = new Properties();
        Map<String, Object> kafkaProp = (Map<String, Object>) kafkaConf.get("prop");
        for (String key : kafkaProp.keySet()) {
            properties.setProperty(key, kafkaProp.get(key).toString());
        }
        if (!StringUtils.isBlank((String) kafkaProp.get("isKerberized")) && "1".equals(kafkaProp.get("isKerberized"))) {
            System.setProperty("java.security.krb5.conf", (String) kafkaProp.get("krb5Conf"));
            properties.put("security.protocol", kafkaProp.get("security_protocol"));
            properties.put("sasl.jaas.config", "com.sun.security.auth.module.Krb5LoginModule required "
                    + "useTicketCache=" + kafkaProp.get("useTicketCache") + " "
                    + "serviceName=\"" + kafkaProp.get("serviceName") + "\" "
                    + "useKeyTab=true "
                    + "keyTab=\"" + kafkaProp.get("keytab").toString() + "\" "
                    + "principal=\"" + kafkaProp.get("principal").toString() + "\";");
        }
        properties.put("key.deserializer", "org.apache.flink.kafka.shaded.org.apache.kafka.common.serialization.ByteArrayDeserializer");
        properties.put("value.deserializer", "org.apache.flink.kafka.shaded.org.apache.kafka.common.serialization.ByteArrayDeserializer");
        FlinkKafkaConsumer<ConsumerRecord<String, String>> consumerRecordFlinkKafkaConsumer = new FlinkKafkaConsumer<>(
                Arrays.asList(topics),
                new KafkaDeserializationSchema<ConsumerRecord<String, String>>() {
                    @Override
                    public TypeInformation<ConsumerRecord<String, String>> getProducedType() {
                        return TypeInformation.of(new TypeHint<ConsumerRecord<String, String>>() {
                        });
                    }

                    @Override
                    public boolean isEndOfStream(ConsumerRecord<String, String> stringStringConsumerRecord) {
                        return false;
                    }

                    @Override
                    public ConsumerRecord<String, String> deserialize(ConsumerRecord<byte[], byte[]> record) throws Exception {
                        return new ConsumerRecord<>(
                                record.topic(),
                                record.partition(),
                                record.offset(),
                                record.timestamp(),
                                record.timestampType(),
                                record.checksum(),
                                record.serializedKeySize(),
                                record.serializedValueSize(),
                                new String(record.key() == null ? "".getBytes(StandardCharsets.UTF_8) : record.key(), StandardCharsets.UTF_8),
                                new String(record.value() == null ? "{}".getBytes(StandardCharsets.UTF_8) : record.value(), StandardCharsets.UTF_8));
                    }
                }, properties);
        return consumerRecordFlinkKafkaConsumer;
    }
}
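
Wired together with the configuration helpers, the source side of the job looks roughly like this (a sketch; the real driver is Kafka2chApp in section 2.5):

StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
CommonUtils.setCheckpoint(env);  // checkpointing, restart strategy, RocksDB backend

Map<String, Object> kafkaConf = (Map<String, Object>) ConfigTools.mapConf.get("kafka-consumer");
DataStream<ConsumerRecord<String, String>> source =
        env.addSource(CommonUtils.getKafkaConsumer(kafkaConf)).name("kafka-source");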
                

                2.2.3.RemoteConfigUtil

public class RemoteConfigUtil {
    private static final Logger log = LoggerFactory.getLogger(RemoteConfigUtil.class);
    private static Connection conn = null;
    private static PreparedStatement ps = null;
    private static ResultSet rs = null;
    public static Map<String, Object> mapConf;

    public RemoteConfigUtil() {
    }

    public static Map<String, Object> getByAppNameAndConfigName(String appName, String configName) throws SQLException {
        // The configuration is loaded once and then cached for the lifetime of the job.
        if (mapConf != null && mapConf.size() > 0) {
            return mapConf;
        }
        // Bootstrap MySQL connection info comes from resources/appconfig.yml.
        Map ymlMap = LocalConfigUtil.getYmlMap("/appconfig");
        String username = (String) ymlMap.get("mysql.username");
        String password = (String) ymlMap.get("mysql.password");
        String url = (String) ymlMap.get("mysql.url");
        String driver = (String) ymlMap.get("mysql.driver");
        Connection conn = JdbcUtil.getConnection(url, username, password, driver);
        PreparedStatement preparedStatement = null;
        try {
            String sql = "select config_context from base_app_config where app_name = '%s' and config_name = '%s'";
            preparedStatement = conn.prepareStatement(String.format(sql, appName, configName));
            ResultSet rs = preparedStatement.executeQuery();
            String configContext = "";
            while (rs.next()) {
                configContext = rs.getString("config_context");
            }
            rs.close();
            log.info("config_context: {}", configContext);
            if (StringUtils.isNotBlank(configContext)) {
                System.out.println(JSONObject.toJSONString(JSONObject.parseObject(configContext), new SerializerFeature[]{SerializerFeature.PrettyFormat}));
            }
            mapConf = (Map<String, Object>) JSON.parseObject(configContext, Map.class);
            return mapConf;
        } finally {
            if (preparedStatement != null) {
                preparedStatement.close();
            }
            if (conn != null) {
                conn.close();
            }
        }
    }
}
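
The appName / configName pair identifies a row in the base_app_config table; hypothetical values are used below:

// Load the job configuration from MySQL once at startup ("kafka2ch" / "prod" are example row keys).
Map<String, Object> mapConf = RemoteConfigUtil.getByAppNameAndConfigName("kafka2ch", "prod");
Map<String, Object> clickhouseConf = (Map<String, Object>) mapConf.get("clickhouse");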
                

                2.2.4.ClickhouseUtil

                public class ClickhouseUtil {
                    public ClickhouseUtil() {
                    }
    public static List<SchemaPo> getSchemaPoList(Map<String, Object> chMapConf) throws SQLException {
        List<SchemaPo> schemaPos = new ArrayList<>();
        Connection connection = null;
                        try {
                            String jdbcurl = (String) chMapConf.get("jdbcurl");
                            String driver = (String) chMapConf.get("driver");
                            String userName = (String) chMapConf.get("username");
                            String password = (String) chMapConf.get("password");
                            String databaseName = (String) chMapConf.get("database-name");
                            String tableName = (String) chMapConf.get("table-name");
                            connection = JdbcUtil.getConnection(jdbcurl, userName, password, driver);
                            DatabaseMetaData metaData = connection.getMetaData();
                            ResultSet colRet = metaData.getColumns((String) null, databaseName, tableName, "%");
                            System.out.println("表字段信息:");
                            while (colRet.next()) {
                                String columnName = colRet.getString("COLUMN_NAME");
                                String columnType = colRet.getString("TYPE_NAME");
                                schemaPos.add(new SchemaPo(columnName, columnType));
                                System.out.println(columnName + "   " + columnType);
                            }
                        } finally {
                            try {
                                if (connection != null) {
                                    connection.close();
                                }
                            } catch (SQLException var18) {
                                var18.printStackTrace();
                            }
                        }
                        return schemaPos;
                    }
    public static String getCreateSinkTableSql(Map<String, Object> clickhouse, String sinkTableName, List<SchemaPo> schemaPos) {
        StringBuilder sinkTableSql = new StringBuilder();
                        String userName = (String) clickhouse.get("username");
                        String password = (String) clickhouse.get("password");
                        String connector = (String) clickhouse.get("connector");
                        String databaseName = (String) clickhouse.get("database-name");
                        String url = (String) clickhouse.get("url");
                        String tableName = (String) clickhouse.get("table-name");
                        String sinkBatchSize = (String) clickhouse.get("sink.batch-size");
                        String sinkFlushInterval = (String) clickhouse.get("sink.flush-interval");
                        String sinkMaxRetries = (String) clickhouse.get("sink.max-retries");
                        String sinkPartitionStrategy = (String) clickhouse.get("sink.partition-strategy");
                        String sinkPartitionKey = (String) clickhouse.get("sink.partition-key");
                        String sinkIgnoreDelete = (String) clickhouse.get("sink.ignore-delete");
                        sinkTableSql.append(String.format("CREATE TABLE %s (\n", sinkTableName));
                        int i = 0;
                        Iterator var17 = schemaPos.iterator();
                        while (var17.hasNext()) {
                            SchemaPo schemaPo = (SchemaPo) var17.next();
                            ++i;
                            String signal = schemaPo.getSignal();
                            String type = schemaPo.getType();
                            if ("UInt64".equalsIgnoreCase(type)) {
                                type = "BIGINT";
                            } else if ("Map(String,String)".equalsIgnoreCase(type)) {
                                type = "Map";
                            } else if ("Datetime".equalsIgnoreCase(type)) {
                                type = "Timestamp(0)";
                            } else {
                                type = "String";
                            }
                            sinkTableSql.append(String.format("    `%s` %s", signal, type));
                            sinkTableSql.append(i == schemaPos.size() ? ")" : ",\n");
                        }
                        sinkTableSql.append("WITH(\n");
                        sinkTableSql.append(String.format("'connector' = '%s',\n", connector));
                        sinkTableSql.append(String.format("'url' = '%s',\n", url));
                        sinkTableSql.append(String.format("'username' = '%s',\n", userName));
                        sinkTableSql.append(String.format("'password' = '%s',\n", password));
                        sinkTableSql.append(String.format("'url' = '%s',\n", url));
                        sinkTableSql.append(String.format("'database-name' = '%s',\n", databaseName));
                        sinkTableSql.append(String.format("'table-name' = '%s',\n", tableName));
                        sinkTableSql.append(String.format("'sink.batch-size' = '%s',\n", sinkBatchSize));
                        sinkTableSql.append(String.format("'sink.flush-interval' = '%s',\n", sinkFlushInterval));
                        sinkTableSql.append(String.format("'sink.max-retries' = '%s',\n", sinkMaxRetries));
                        sinkTableSql.append(String.format("'sink.partition-strategy' = 'hash',\n"));
                        sinkTableSql.append(String.format("'sink.partition-key' = 'sample_date_time',\n"));
                        sinkTableSql.append(String.format("'sink.ignore-delete' = '%s'\n", sinkIgnoreDelete));
                        sinkTableSql.append(" )");
                        return sinkTableSql.toString();
                    }
    // Convert a flattened JSON record into the Row layout expected by the ClickHouse sink.
    public static Row convertRow(Map<String, String> resultMap, List<SchemaPo> schemaPos) {
        Row row = new Row(schemaPos.size());
        for (int i = 0; i < schemaPos.size(); i++) {
            SchemaPo schemaPo = schemaPos.get(i);
            String valueStr = resultMap.get(schemaPo.getSignal());
                            if (StringUtils.isBlank(valueStr)) {
                                row.setField(i, null);
                                continue;
                            }
                            if ("UInt64".equalsIgnoreCase(schemaPo.getType())) {
                                Long svalue = Long.valueOf(valueStr);
                                row.setField(i, Math.abs(svalue));
                            } else if ("Int64".equalsIgnoreCase(schemaPo.getType())) {
                                Long svalue = Long.valueOf(valueStr);
                                row.setField(i, Math.abs(svalue));
                            } else if ("Int32".equalsIgnoreCase(schemaPo.getType())) {
                                Integer svalue = Integer.valueOf(valueStr);
                                row.setField(i, svalue);
                            } else if ("datetime".equalsIgnoreCase(schemaPo.getType())) {
                                try {
                                    Date svalue = (new SimpleDateFormat("yyyy-MM-dd HH:mm:ss")).parse(valueStr);
                                    Timestamp timestamp = new Timestamp(svalue.getTime());
                                    row.setField(i, timestamp);
                                } catch (Exception ex) {
                                    System.out.println(ex.getMessage());
                                    System.out.println(Arrays.toString(ex.getStackTrace()));
                                }
                            } else {
                                row.setField(i, valueStr);
                            }
                        }
                        return row;
                    }
                }
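
Taken together, these three helpers cover the whole sink side: read the physical schema, emit the Flink SQL DDL, and convert each flattened record. A sketch (tableEnv and resultMap are placeholders from the surrounding job):

Map<String, Object> chConf = (Map<String, Object>) ConfigTools.mapConf.get("clickhouse");

// 1. Pull the column list of the existing ClickHouse table.
List<SchemaPo> schemaPos = ClickhouseUtil.getSchemaPoList(chConf);

// 2. Build and register the sink table DDL ("ods_sink" is an arbitrary name).
tableEnv.executeSql(ClickhouseUtil.getCreateSinkTableSql(chConf, "ods_sink", schemaPos));

// 3. In the FlatMap step, convert each flattened JSON map into a Row in schema order.
Row row = ClickhouseUtil.convertRow(resultMap, schemaPos);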
                

                2.3.flatmap

                2.3.1.FlatMapFunction

public interface FlatMapFunction {
    // Factory that builds a Flink FlatMapFunction turning a Kafka ConsumerRecord into a Row
    // laid out according to the ClickHouse table schema.
    public org.apache.flink.api.common.functions.FlatMapFunction<ConsumerRecord<String, String>, Row> newInstance(List<SchemaPo> schemaPos);
}
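
A hypothetical implementation, just to make the contract concrete (the field handling is simplified; the real one follows the JSON layout of your topics):

import com.alibaba.fastjson.JSON;
import com.alibaba.fastjson.JSONObject;
import org.apache.flink.types.Row;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

public class JsonToRowFlatMapFunction implements FlatMapFunction {
    @Override
    public org.apache.flink.api.common.functions.FlatMapFunction<ConsumerRecord<String, String>, Row> newInstance(List<SchemaPo> schemaPos) {
        return (record, out) -> {
            // Parse the Kafka value as JSON and flatten it into column-name -> string-value pairs ...
            Map<String, String> resultMap = new HashMap<>();
            JSONObject json = JSON.parseObject(record.value());
            for (String key : json.keySet()) {
                resultMap.put(key, json.getString(key));
            }
            // ... then pack the values into a Row in schema order and emit it.
            out.collect(ClickhouseUtil.convertRow(resultMap, schemaPos));
        };
    }
}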
                

                2.4.sink

                2.4.1.ClickHouseCatalog
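
This catalog comes from the flink-connector-clickhouse dependency; registering it lets Flink SQL resolve ClickHouse databases and tables directly. A minimal sketch of how it is wired in (catalog name and property values are illustrative, taken from the backup yml above):

Map<String, String> props = new HashMap<>();
props.put("database-name", "dwd");
props.put("url", "clickhouse://10.1.1.1:8123");
props.put("username", "test");
props.put("password", "***");

// Register and activate the catalog, after which dwd.test_local is queryable from Flink SQL.
ClickHouseCatalog catalog = new ClickHouseCatalog("clickhouse", props);
tableEnv.registerCatalog("clickhouse", catalog);
tableEnv.useCatalog("clickhouse");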

                public class ClickHouseCatalog extends AbstractCatalog {
                    private static final Logger LOG = LoggerFactory.getLogger(ClickHouseCatalog.class);
                    public static final String DEFAULT_DATABASE = "default";
                    private final String baseUrl;
                    private final String username;
                    private final String password;
                    private final boolean ignorePrimaryKey;
    private final Map<String, String> properties;
    private ClickHouseConnection connection;
    public ClickHouseCatalog(String catalogName, Map<String, String> properties) {
        this(catalogName, properties.get("database-name"), properties.get("url"), properties.get("username"), properties.get("password"), properties);
    }
                    public ClickHouseCatalog(String catalogName, @Nullable String defaultDatabase, String baseUrl, String username, String password) {
                        this(catalogName, defaultDatabase, baseUrl, username, password, Collections.emptyMap());
                    }
    public ClickHouseCatalog(String catalogName, @Nullable String defaultDatabase, String baseUrl, String username, String password, Map<String, String> properties) {
                        super(catalogName, defaultDatabase == null ? "default" : defaultDatabase);
                        Preconditions.checkArgument(!StringUtils.isNullOrWhitespaceOnly(baseUrl), "baseUrl cannot be null or empty");
                        Preconditions.checkArgument(!StringUtils.isNullOrWhitespaceOnly(username), "username cannot be null or empty");
                        Preconditions.checkArgument(!StringUtils.isNullOrWhitespaceOnly(password), "password cannot be null or empty");
                        this.baseUrl = baseUrl.endsWith("/") ? baseUrl : baseUrl + "/";
                        this.username = username;
                        this.password = password;
                        this.ignorePrimaryKey = properties.get("catalog.ignore-primary-key") == null || Boolean.parseBoolean((String)properties.get("catalog.ignore-primary-key"));
                        this.properties = Collections.unmodifiableMap(properties);
                    }
                    public void open() throws CatalogException {
                        try {
                            Properties configuration = new Properties();
                            configuration.putAll(this.properties);
                            configuration.setProperty(ClickHouseQueryParam.USER.getKey(), this.username);
                            configuration.setProperty(ClickHouseQueryParam.PASSWORD.getKey(), this.password);
                            configuration.setProperty("socket_timeout", "600000");
                            String jdbcUrl = ClickHouseUtil.getJdbcUrl(this.baseUrl, this.getDefaultDatabase());
                            BalancedClickhouseDataSource dataSource = new BalancedClickhouseDataSource(jdbcUrl, configuration);
                            dataSource.actualize();
                            this.connection = dataSource.getConnection();
                            LOG.info("Created catalog {}, established connection to {}", this.getName(), jdbcUrl);
                        } catch (Exception var4) {
                            throw new CatalogException(String.format("Opening catalog %s failed.", this.getName()), var4);
                        }
                    }
                    public synchronized void close() throws CatalogException {
                        try {
                            this.connection.close();
                            LOG.info("Closed catalog {} ", this.getName());
                        } catch (Exception var2) {
                            throw new CatalogException(String.format("Closing catalog %s failed.", this.getName()), var2);
                        }
                    }
    public Optional<Factory> getFactory() {
                        return Optional.of(new ClickHouseDynamicTableFactory());
                    }
    public synchronized List<String> listDatabases() throws CatalogException {
                        try {
                            PreparedStatement stmt = this.connection.prepareStatement("SELECT name from `system`.databases");
                            Throwable var2 = null;
                            try {
                                ResultSet rs = stmt.executeQuery();
                                Throwable var4 = null;
                                try {
                                    List<String> databases = new ArrayList<>();
                                    while(rs.next()) {
                                        databases.add(rs.getString(1));
                                    }
                                    return databases;
                                } catch (Throwable var31) {
                                    var4 = var31;
                                    throw var31;
                                } finally {
                                    if (rs != null) {
                                        if (var4 != null) {
                                            try {
                                                rs.close();
                                            } catch (Throwable var30) {
                                                var4.addSuppressed(var30);
                                            }
                                        } else {
                                            rs.close();
                                        }
                                    }
                                }
                            } catch (Throwable var33) {
                                var2 = var33;
                                throw var33;
                            } finally {
                                if (stmt != null) {
                                    if (var2 != null) {
                                        try {
                                            stmt.close();
                                        } catch (Throwable var29) {
                                            var2.addSuppressed(var29);
                                        }
                                    } else {
                                        stmt.close();
                                    }
                                }
                            }
                        } catch (Exception var35) {
                            throw new CatalogException(String.format("Failed listing database in catalog %s", this.getName()), var35);
                        }
                    }
                    public CatalogDatabase getDatabase(String databaseName) throws DatabaseNotExistException, CatalogException {
                        if (this.listDatabases().contains(databaseName)) {
                            return new CatalogDatabaseImpl(Collections.emptyMap(), (String)null);
                        } else {
                            throw new DatabaseNotExistException(this.getName(), databaseName);
                        }
                    }
                    public boolean databaseExists(String databaseName) throws CatalogException {
                        Preconditions.checkArgument(!StringUtils.isNullOrWhitespaceOnly(databaseName));
                        return this.listDatabases().contains(databaseName);
                    }
                    public void createDatabase(String name, CatalogDatabase database, boolean ignoreIfExists) throws DatabaseAlreadyExistException, CatalogException {
                        throw new UnsupportedOperationException();
                    }
                    public void dropDatabase(String name, boolean ignoreIfNotExists, boolean cascade) throws DatabaseNotEmptyException, CatalogException {
                        throw new UnsupportedOperationException();
                    }
                    public void alterDatabase(String name, CatalogDatabase newDatabase, boolean ignoreIfNotExists) throws DatabaseNotExistException, CatalogException {
                        throw new UnsupportedOperationException();
                    }
    public synchronized List<String> listTables(String databaseName) throws DatabaseNotExistException, CatalogException {
                        if (!this.databaseExists(databaseName)) {
                            throw new DatabaseNotExistException(this.getName(), databaseName);
                        } else {
                            try {
                                PreparedStatement stmt = this.connection.prepareStatement(String.format("SELECT name from `system`.tables where database = '%s'", databaseName));
                                Throwable var3 = null;
                                try {
                                    ResultSet rs = stmt.executeQuery();
                                    Throwable var5 = null;
                                    try {
                                        List<String> tables = new ArrayList<>();
                                        while(rs.next()) {
                                            tables.add(rs.getString(1));
                                        }
                                        return tables;
                                    } catch (Throwable var32) {
                                        var5 = var32;
                                        throw var32;
                                    } finally {
                                        if (rs != null) {
                                            if (var5 != null) {
                                                try {
                                                    rs.close();
                                                } catch (Throwable var31) {
                                                    var5.addSuppressed(var31);
                                                }
                                            } else {
                                                rs.close();
                                            }
                                        }
                                    }
                                } catch (Throwable var34) {
                                    var3 = var34;
                                    throw var34;
                                } finally {
                                    if (stmt != null) {
                                        if (var3 != null) {
                                            try {
                                                stmt.close();
                                            } catch (Throwable var30) {
                                                var3.addSuppressed(var30);
                                            }
                                        } else {
                                            stmt.close();
                                        }
                                    }
                                }
                            } catch (Exception var36) {
                                throw new CatalogException(String.format("Failed listing tables in catalog %s database %s", this.getName(), databaseName), var36);
                            }
                        }
                    }
    public List<String> listViews(String databaseName) throws DatabaseNotExistException, CatalogException {
                        throw new UnsupportedOperationException();
                    }
                    public CatalogBaseTable getTable(ObjectPath tablePath) throws TableNotExistException, CatalogException {
                        if (!this.tableExists(tablePath)) {
                            throw new TableNotExistException(this.getName(), tablePath);
                        } else {
            Map<String, String> configuration = new HashMap<>(this.properties);
                            configuration.put("url", this.baseUrl);
                            configuration.put("database-name", tablePath.getDatabaseName());
                            configuration.put("table-name", tablePath.getObjectName());
                            configuration.put("username", this.username);
                            configuration.put("password", this.password);
                            String databaseName = tablePath.getDatabaseName();
                            String tableName = tablePath.getObjectName();
                            try {
                                DistributedEngineFullSchema engineFullSchema = ClickHouseUtil.getAndParseDistributedEngineSchema(this.connection, tablePath.getDatabaseName(), tablePath.getObjectName());
                                if (engineFullSchema != null) {
                                    databaseName = engineFullSchema.getDatabase();
                                    tableName = engineFullSchema.getTable();
                                }
                            } catch (Exception var6) {
                                throw new CatalogException(String.format("Failed getting engine full of %s.%s.%s", this.getName(), databaseName, tableName), var6);
                            }
                            return new CatalogTableImpl(this.createTableSchema(databaseName, tableName), this.getPartitionKeys(databaseName, tableName), configuration, "");
                        }
                    }
                    private synchronized TableSchema createTableSchema(String databaseName, String tableName) {
                        try {
                            PreparedStatement stmt = this.connection.prepareStatement(String.format("SELECT * from `%s`.`%s` limit 0", databaseName, tableName));
                            Throwable var4 = null;
                            TableSchema var24;
                            try {
                                ClickHouseResultSetMetaData metaData = (ClickHouseResultSetMetaData)stmt.getMetaData().unwrap(ClickHouseResultSetMetaData.class);
                                Method getColMethod = metaData.getClass().getDeclaredMethod("getCol", Integer.TYPE);
                                getColMethod.setAccessible(true);
                                List<String> primaryKeys = this.getPrimaryKeys(databaseName, tableName);
                                TableSchema.Builder builder = TableSchema.builder();
                                for(int idx = 1; idx <= metaData.getColumnCount(); ++idx) {
                                    ClickHouseColumnInfo columnInfo = (ClickHouseColumnInfo)getColMethod.invoke(metaData, idx);
                                    String columnName = columnInfo.getColumnName();
                                    DataType columnType = ClickHouseTypeUtil.toFlinkType(columnInfo);
                                    if (primaryKeys.contains(columnName)) {
                                        columnType = (DataType)columnType.notNull();
                                    }
                                    builder.field(columnName, columnType);
                                }
                                if (!primaryKeys.isEmpty()) {
                                    builder.primaryKey((String[])primaryKeys.toArray(new String[0]));
                                }
                                var24 = builder.build();
                            } catch (Throwable var21) {
                                var4 = var21;
                                throw var21;
                            } finally {
                                if (stmt != null) {
                                    if (var4 != null) {
                                        try {
                                            stmt.close();
                                        } catch (Throwable var20) {
                                            var4.addSuppressed(var20);
                                        }
                                    } else {
                                        stmt.close();
                                    }
                                }
                            }
                            return var24;
                        } catch (Exception var23) {
                            throw new CatalogException(String.format("Failed getting columns in catalog %s database %s table %s", this.getName(), databaseName, tableName), var23);
                        }
                    }
    private List<String> getPrimaryKeys(String databaseName, String tableName) {
                        if (this.ignorePrimaryKey) {
                            return Collections.emptyList();
                        } else {
                            try {
                                PreparedStatement stmt = this.connection.prepareStatement(String.format("SELECT name from `system`.columns where `database` = '%s' and `table` = '%s' and is_in_primary_key = 1", databaseName, tableName));
                                Throwable var4 = null;
                                try {
                                    ResultSet rs = stmt.executeQuery();
                                    Throwable var6 = null;
                                    try {
                                        List<String> primaryKeys = new ArrayList<>();
                                        while(rs.next()) {
                                            primaryKeys.add(rs.getString(1));
                                        }
                                        return primaryKeys;
                                    } catch (Throwable var33) {
                                        var6 = var33;
                                        throw var33;
                                    } finally {
                                        if (rs != null) {
                                            if (var6 != null) {
                                                try {
                                                    rs.close();
                                                } catch (Throwable var32) {
                                                    var6.addSuppressed(var32);
                                                }
                                            } else {
                                                rs.close();
                                            }
                                        }
                                    }
                                } catch (Throwable var35) {
                                    var4 = var35;
                                    throw var35;
                                } finally {
                                    if (stmt != null) {
                                        if (var4 != null) {
                                            try {
                                                stmt.close();
                                            } catch (Throwable var31) {
                                                var4.addSuppressed(var31);
                                            }
                                        } else {
                                            stmt.close();
                                        }
                                    }
                                }
                            } catch (Exception var37) {
                                throw new CatalogException(String.format("Failed getting primary keys in catalog %s database %s table %s", this.getName(), databaseName, tableName), var37);
                            }
                        }
                    }
                     private List<String> getPartitionKeys(String databaseName, String tableName) {
                         // Columns flagged in system.columns as part of the partition key
                         try (PreparedStatement stmt = this.connection.prepareStatement(String.format("SELECT name from `system`.columns where `database` = '%s' and `table` = '%s' and is_in_partition_key = 1", databaseName, tableName));
                              ResultSet rs = stmt.executeQuery()) {
                             List<String> partitionKeys = new ArrayList<>();
                             while (rs.next()) {
                                 partitionKeys.add(rs.getString(1));
                             }
                             return partitionKeys;
                         } catch (Exception e) {
                             throw new CatalogException(String.format("Failed getting partition keys of %s.%s.%s", this.getName(), databaseName, tableName), e);
                         }
                     }
                    public boolean tableExists(ObjectPath tablePath) throws CatalogException {
                        try {
                            return this.databaseExists(tablePath.getDatabaseName()) && this.listTables(tablePath.getDatabaseName()).contains(tablePath.getObjectName());
                        } catch (DatabaseNotExistException var3) {
                            return false;
                        }
                    }
                    public void dropTable(ObjectPath tablePath, boolean ignoreIfNotExists) throws TableNotExistException, CatalogException {
                        throw new UnsupportedOperationException();
                    }
                    public void renameTable(ObjectPath tablePath, String newTableName, boolean ignoreIfNotExists) throws TableNotExistException, TableAlreadyExistException, CatalogException {
                        throw new UnsupportedOperationException();
                    }
                    public void createTable(ObjectPath tablePath, CatalogBaseTable table, boolean ignoreIfExists) throws TableAlreadyExistException, DatabaseNotExistException, CatalogException {
                        throw new UnsupportedOperationException();
                    }
                    public void alterTable(ObjectPath tablePath, CatalogBaseTable newTable, boolean ignoreIfNotExists) throws TableNotExistException, CatalogException {
                        throw new UnsupportedOperationException();
                    }
                     public List<CatalogPartitionSpec> listPartitions(ObjectPath tablePath) throws TableNotExistException, TableNotPartitionedException, CatalogException {
                        return Collections.emptyList();
                    }
                     public List<CatalogPartitionSpec> listPartitions(ObjectPath tablePath, CatalogPartitionSpec partitionSpec) throws TableNotExistException, TableNotPartitionedException, PartitionSpecInvalidException, CatalogException {
                        return Collections.emptyList();
                    }
                     public List<CatalogPartitionSpec> listPartitionsByFilter(ObjectPath tablePath, List<Expression> filters) throws TableNotExistException, TableNotPartitionedException, CatalogException {
                        return Collections.emptyList();
                    }
                    public CatalogPartition getPartition(ObjectPath tablePath, CatalogPartitionSpec partitionSpec) throws PartitionNotExistException, CatalogException {
                        throw new PartitionNotExistException(this.getName(), tablePath, partitionSpec);
                    }
                    public boolean partitionExists(ObjectPath tablePath, CatalogPartitionSpec partitionSpec) throws CatalogException {
                        throw new UnsupportedOperationException();
                    }
                    public void createPartition(ObjectPath tablePath, CatalogPartitionSpec partitionSpec, CatalogPartition partition, boolean ignoreIfExists) throws TableNotExistException, TableNotPartitionedException, PartitionSpecInvalidException, PartitionAlreadyExistsException, CatalogException {
                        throw new UnsupportedOperationException();
                    }
                    public void dropPartition(ObjectPath tablePath, CatalogPartitionSpec partitionSpec, boolean ignoreIfNotExists) throws PartitionNotExistException, CatalogException {
                        throw new UnsupportedOperationException();
                    }
                    public void alterPartition(ObjectPath tablePath, CatalogPartitionSpec partitionSpec, CatalogPartition newPartition, boolean ignoreIfNotExists) throws PartitionNotExistException, CatalogException {
                        throw new UnsupportedOperationException();
                    }
                     public List<String> listFunctions(String dbName) throws DatabaseNotExistException, CatalogException {
                        return Collections.emptyList();
                    }
                    public CatalogFunction getFunction(ObjectPath functionPath) throws FunctionNotExistException, CatalogException {
                        throw new FunctionNotExistException(this.getName(), functionPath);
                    }
                    public boolean functionExists(ObjectPath functionPath) throws CatalogException {
                        return false;
                    }
                    public void createFunction(ObjectPath functionPath, CatalogFunction function, boolean ignoreIfExists) throws FunctionAlreadyExistException, DatabaseNotExistException, CatalogException {
                        throw new UnsupportedOperationException();
                    }
                    public void alterFunction(ObjectPath functionPath, CatalogFunction newFunction, boolean ignoreIfNotExists) throws FunctionNotExistException, CatalogException {
                        throw new UnsupportedOperationException();
                    }
                    public void dropFunction(ObjectPath functionPath, boolean ignoreIfNotExists) throws FunctionNotExistException, CatalogException {
                        throw new UnsupportedOperationException();
                    }
                    public CatalogTableStatistics getTableStatistics(ObjectPath tablePath) throws TableNotExistException, CatalogException {
                        return CatalogTableStatistics.UNKNOWN;
                    }
                    public CatalogColumnStatistics getTableColumnStatistics(ObjectPath tablePath) throws TableNotExistException, CatalogException {
                        return CatalogColumnStatistics.UNKNOWN;
                    }
                    public CatalogTableStatistics getPartitionStatistics(ObjectPath tablePath, CatalogPartitionSpec partitionSpec) throws PartitionNotExistException, CatalogException {
                        return CatalogTableStatistics.UNKNOWN;
                    }
                    public CatalogColumnStatistics getPartitionColumnStatistics(ObjectPath tablePath, CatalogPartitionSpec partitionSpec) throws PartitionNotExistException, CatalogException {
                        return CatalogColumnStatistics.UNKNOWN;
                    }
                    public void alterTableStatistics(ObjectPath tablePath, CatalogTableStatistics tableStatistics, boolean ignoreIfNotExists) throws TableNotExistException, CatalogException {
                        throw new UnsupportedOperationException();
                    }
                    public void alterTableColumnStatistics(ObjectPath tablePath, CatalogColumnStatistics columnStatistics, boolean ignoreIfNotExists) throws TableNotExistException, CatalogException, TablePartitionedException {
                        throw new UnsupportedOperationException();
                    }
                    public void alterPartitionStatistics(ObjectPath tablePath, CatalogPartitionSpec partitionSpec, CatalogTableStatistics partitionStatistics, boolean ignoreIfNotExists) throws PartitionNotExistException, CatalogException {
                        throw new UnsupportedOperationException();
                    }
                    public void alterPartitionColumnStatistics(ObjectPath tablePath, CatalogPartitionSpec partitionSpec, CatalogColumnStatistics columnStatistics, boolean ignoreIfNotExists) throws PartitionNotExistException, CatalogException {
                        throw new UnsupportedOperationException();
                    }
                }
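
                 With the catalog in place, it can be smoke-tested on its own before wiring it into the streaming job. The snippet below is only a minimal sketch (the connection values are placeholders, not the project's real configuration): it registers the ClickHouseCatalog with a TableEnvironment and lists the databases and tables the catalog resolves from ClickHouse's system tables.

                 import java.util.HashMap;
                 import java.util.Map;
                 import org.apache.flink.table.api.EnvironmentSettings;
                 import org.apache.flink.table.api.TableEnvironment;

                 public class ClickHouseCatalogSmokeTest {
                     public static void main(String[] args) {
                         // Placeholder connection settings -- replace with values from your own configuration
                         Map<String, String> props = new HashMap<>();
                         props.put(ClickHouseConfig.DATABASE_NAME, "default");
                         props.put(ClickHouseConfig.URL, "clickhouse://127.0.0.1:8123");
                         props.put(ClickHouseConfig.USERNAME, "default");
                         props.put(ClickHouseConfig.PASSWORD, "");

                         TableEnvironment tableEnv = TableEnvironment.create(EnvironmentSettings.inStreamingMode());
                         // Register and switch to the ClickHouse catalog, the same way Kafka2chApp does below
                         tableEnv.registerCatalog("clickhouse", new ClickHouseCatalog("clickhouse", props));
                         tableEnv.useCatalog("clickhouse");

                         // Databases and tables are read from ClickHouse's system.databases / system.tables
                         for (String db : tableEnv.listDatabases()) {
                             System.out.println("database: " + db);
                         }
                         for (String tbl : tableEnv.listTables()) {
                             System.out.println("table: " + tbl);
                         }
                     }
                 }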
                

                2.5.Kafka2ClickHouse

                2.5.1.Kafka2chApp

                public class Kafka2chApp {
                    private static final Logger log = LoggerFactory.getLogger(Kafka2chApp.class);
                    private static String SINK_TABLE = "sinkTable";
                    private static String KAFKA_TEMP_VIEW = "kafkaTempView";
                    /**
                     * @param appName            corresponding field in the MySQL config table
                     * @param configName         corresponding field in the MySQL config table
                     * @param x5lFlatMapFunction factory that builds the record-to-Row FlatMapFunction for this job
                     * @throws Exception
                     */
                    public static void run(String appName, String configName, FlatMapFunction x5lFlatMapFunction) throws Exception {
                        log.info("Kafka2chApp.run传参appName:{}, configName:{}", appName, configName);
                        // 获得数据库中的配置
                        Map mapConf = RemoteConfigUtil.getByAppNameAndConfigName(appName, configName);
                        if (mapConf == null || mapConf.size() == 0) return;
                        Map clickhouseConf = (Map) mapConf.get("clickhouse");
                        Map kafkaConsumerConf = (Map) mapConf.get("kafka-consumer");
                        Map hdfsConf = (Map) mapConf.get("hdfs");
                        // long beforeTime2Dropout = System.currentTimeMillis() - (Long) mapConf.get("before2DropoutHourStep") * 3600;
                        // long after2DropoutTime = System.currentTimeMillis();
                        // Initialize the stream/table environments and obtain the Kafka source stream
                        StreamExecutionEnvironment streamEnv = StreamEnv.getStreamEnv(hdfsConf);
                        // ckP: job parallelism; in the full project it is read from the MySQL configuration (that lookup is omitted here)
                        streamEnv.setParallelism(ckP);
                        StreamTableEnvironment tableEnv = TableEnv.getTableEnv();
                        // Derive the target schema from ClickHouse: field names plus matching Flink type information
                        List<SchemaPo> schemaPos = ClickhouseUtil.getSchemaPoList(clickhouseConf);
                        TypeInformation[] types = getTypeInformationArray(schemaPos);
                        String[] fieldNames = SchemaPoUtil.getFieldLists(schemaPos);
                        FlatMapFunction<ConsumerRecord<String, String>, Row> flatMapFunction = x5lFlatMapFunction.newInstance(schemaPos);
                        // Consume from Kafka, flatten each JSON record into a Row and attach the explicit row type
                        DataStreamSource<ConsumerRecord<String, String>> stream = streamEnv.addSource(CommonUtils.getKafkaConsumer(kafkaConsumerConf));
                        System.out.println("Source parallelism: " + streamEnv.getParallelism());
                        SingleOutputStreamOperator<Row> infos = stream.flatMap(flatMapFunction);
                        infos = infos.map(e -> e, new RowTypeInfo(types, fieldNames));
                        System.out.println("Map parallelism: " + streamEnv.getParallelism());
                        // Register the Kafka stream as a temporary view
                        tableEnv.createTemporaryView(KAFKA_TEMP_VIEW, infos);
                        // Build the connection properties for the ClickHouse catalog that exposes the sink table
                        // String createSinkTableSql = ClickhouseUtil.getCreateSinkTableSql(clickhouseConf, SINK_TABLE, schemaPos);
                        Map<String, String> props = new HashMap<>();
                        props.put(ClickHouseConfig.DATABASE_NAME, (String) clickhouseConf.get("database-name"));
                        props.put(ClickHouseConfig.URL, (String) clickhouseConf.get("url"));
                        props.put(ClickHouseConfig.USERNAME, (String) clickhouseConf.get("username"));
                        props.put(ClickHouseConfig.PASSWORD, (String) clickhouseConf.get("password"));
                        props.put(ClickHouseConfig.SINK_FLUSH_INTERVAL, (String) clickhouseConf.get("sink.flush-interval"));
                        props.put(ClickHouseConfig.SINK_BATCH_SIZE, (String) clickhouseConf.get("sink.batch-size"));
                        Catalog cHcatalog = new ClickHouseCatalog("clickhouse", props);
                        tableEnv.registerCatalog("clickhouse", cHcatalog);
                        tableEnv.useCatalog("clickhouse");
                        // Arrays.stream(tableEnv.listCatalogs()).forEach(e -> System.out.println("catalog: " + e));
                        // Arrays.stream(tableEnv.listDatabases()).forEach(e -> System.out.println("database: " + e));
                        // System.out.println(tableEnv.listTables().length);
                        // Arrays.stream(tableEnv.listTables()).forEach(e -> System.out.println("table: " + e));
                        // tableEnv.executeSql(createSinkTableSql);
                        // System.out.println(tableEnv.executeSql("select * from " + KAFKA_TEMP_VIEW).getTableSchema());
                        // Assemble the INSERT statement that moves data from the temporary view into ClickHouse
                        String insertSql = "insert into `" + clickhouseConf.get("table-name") + "` select * from default_catalog.default_database." + KAFKA_TEMP_VIEW;
                        // System.out.println("insertSql: " + insertSql);
                        // log.info("insertSql: {}", insertSql);
                        // Execute the INSERT; this submits the streaming job
                        tableEnv.executeSql(insertSql);
                        // For local debugging, print the rows instead:
                        /*infos.print();
                        streamEnv.executeAsync();*/
                    }
                    
                    public static TypeInformation[] getTypeInformationArray(List<SchemaPo> schemaPos) {
                        // String[] fieldNames = new String[columnTypeMap.size()];
                        TypeInformation[] types = new TypeInformation[schemaPos.size()];
                        int i = 0;
                        for (SchemaPo po : schemaPos) {
                            if ("String".equalsIgnoreCase(po.getType())) {
                                types[i] = Types.STRING;
                            } else if ("Int64".equalsIgnoreCase(po.getType())) {
                                types[i] = Types.LONG;
                            } else if ("UInt64".equalsIgnoreCase(po.getType())) {
                                types[i] = Types.LONG;
                            } else if ("Int32".equalsIgnoreCase(po.getType())) {
                                types[i] = Types.INT;
                            } else if ("Int8".equalsIgnoreCase(po.getType())) {
                                types[i] = Types.INT;
                            } else if ("datetime".equalsIgnoreCase(po.getType())) {
                                types[i] = Types.SQL_TIMESTAMP;
                            } else if ("Map(String,String)".equalsIgnoreCase(po.getType())) {
                                types[i] = Types.MAP(Types.STRING, Types.STRING);
                            } else {
                                types[i] = Types.STRING;
                            }
                            i++;
                        }
                        return types;
                    }
                }
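
                 getTypeInformationArray is what allows the Row stream to be registered with an explicit RowTypeInfo instead of relying on type extraction. The snippet below is a small worked sketch of that mapping; the two-argument SchemaPo constructor and the field names are assumed purely for illustration (in the real job the list comes from ClickhouseUtil.getSchemaPoList).

                 import java.util.Arrays;
                 import java.util.List;
                 import org.apache.flink.api.common.typeinfo.TypeInformation;
                 import org.apache.flink.api.java.typeutils.RowTypeInfo;

                 public class TypeMappingExample {
                     public static void main(String[] args) {
                         // Hypothetical (name, type) construction -- in the project the SchemaPo list is built from system.columns
                         List<SchemaPo> schemaPos = Arrays.asList(
                                 new SchemaPo("vin", "String"),
                                 new SchemaPo("collect_time", "Int64"),
                                 new SchemaPo("signals", "Map(String,String)"));

                         TypeInformation[] types = Kafka2chApp.getTypeInformationArray(schemaPos);
                         String[] fieldNames = {"vin", "collect_time", "signals"};
                         // Yields a RowTypeInfo of (String, Long, Map<String, String>) for the temporary view
                         RowTypeInfo rowTypeInfo = new RowTypeInfo(types, fieldNames);
                         System.out.println(rowTypeInfo);
                     }
                 }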
                

                2.5.2.Kafka2Ck-ODS

                public class Kafka2CkODS implements FlatMapFunction {
                    private static Logger logger = Logger.getLogger(Kafka2CkODS.class);
                    public static void main(String[] args) throws Exception {
                        Kafka2chApp.run(Kafka2CkODS.class.getName(), args[0], new Kafka2CkODS());
                    }
                    @Override
                    public FlatMapFunction<ConsumerRecord<String, String>, Row> newInstance(List<SchemaPo> schemaPos) {
                        return new FlatMapFunction<ConsumerRecord<String, String>, Row>() {
                            @Override
                            public void flatMap(ConsumerRecord<String, String> record, Collector<Row> out) throws Exception {
                                // System.out.println("record ===> " + record); // debug output
                                String value = record.value();
                                try {
                                    // Parse the Kafka JSON payload into a map
                                    HashMap<String, String> infoMap = JSON.parseObject(value, HashMap.class);
                                    // Lower-case the keys so they line up with the ClickHouse column names
                                    // (the original code flattens a nested dataListMap extracted from the payload; that extraction is simplified here)
                                    Map<String, String> resultMap = new HashMap<>();
                                    for (Map.Entry<String, String> entry : infoMap.entrySet()) {
                                        String key = entry.getKey();
                                        String value1 = entry.getValue();
                                        resultMap.put(key.toLowerCase(), value1);
                                    }
                                    // Wrap the flattened fields into a Row that follows the ClickHouse schema
                                    Row row = TableEnv.getRowBySchemaPo1(resultMap, schemaPos);
                                    out.collect(row);
                                } catch (Exception e) {
                                    e.printStackTrace();
                                    System.out.printf("Bad record, cause: %s, topic: %s, key: %s, value: %s%n", e.getMessage(), record.topic(), record.key(), record.value());
                                }
                            }
                        };
                    }
                }
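
                The only contract between the Kafka JSON payload and the ClickHouse table is that, after lower-casing, the payload keys match the column names. Below is a self-contained sketch of that normalization step, using the same fastjson JSON.parseObject call as the flatMap above; the sample payload and its fields are made up for illustration.

                import com.alibaba.fastjson.JSON;
                import java.util.HashMap;
                import java.util.Map;

                public class FlattenExample {
                    public static void main(String[] args) {
                        // Illustrative payload only -- real messages come from the Kafka topic
                        String value = "{\"VIN\":\"LSJA0000000000001\",\"CollectTime\":\"1700000000000\"}";
                        HashMap<String, String> infoMap = JSON.parseObject(value, HashMap.class);

                        // Same normalization as the flatMap: lower-case keys so they match ClickHouse columns
                        Map<String, String> resultMap = new HashMap<>();
                        for (Map.Entry<String, String> entry : infoMap.entrySet()) {
                            resultMap.put(entry.getKey().toLowerCase(), entry.getValue());
                        }
                        System.out.println(resultMap); // {vin=..., collecttime=...}
                    }
                }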