Requirements:
1. Data is read from Kafka and written to ClickHouse.
2. The job configuration lives in MySQL and is read dynamically at startup.
3. The Kafka cluster in this example uses Kerberos authentication; if you do not need it, strip that part out yourself.
4. The target table is created in ClickHouse first, and its schema is then fetched dynamically.
5. Kafka messages are JSON; a FlatMap operator flattens them and packs the fields into a Row according to the table schema before writing.
6. For the write itself, the stream is registered as a temporary view and the insert is executed with Flink SQL.
7. For local testing, edit resources/flink_backup_local.yml and load the configuration via ConfigTools.initConf (a short sketch follows this list).
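A minimal sketch of what item 7 looks like in code, assuming ConfigTools exposes the parsed configuration through mapConf after initConf runs (as the CommonUtils class further down suggests):

// Local testing: read resources/flink_backup_local.yml instead of MySQL.
ConfigTools.initConf("local");
Map<String, Object> conf = ConfigTools.mapConf;                                      // whole job configuration
Map<String, Object> kafkaConsumerConf = (Map<String, Object>) conf.get("kafka-consumer");
Map<String, Object> hdfsConf = (Map<String, Object>) conf.get("hdfs");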
The dependencies in the pom.xml below are somewhat redundant; trim or keep them according to your own needs.
<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0"
         xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
  <modelVersion>4.0.0</modelVersion>

  <groupId>gaei.cn.x5l</groupId>
  <artifactId>kafka2ch</artifactId>
  <version>1.0.0</version>

  <properties>
    <project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
    <flink.version>1.14.0</flink.version>
    <scala.binary.version>2.12</scala.binary.version>
    <scala.version>2.12.10</scala.version>
    <target.java.version>1.8</target.java.version>
    <maven.compiler.source>${target.java.version}</maven.compiler.source>
    <maven.compiler.target>${target.java.version}</maven.compiler.target>
    <log4j.version>2.17.2</log4j.version>
    <hadoop.version>3.1.2</hadoop.version>
  </properties>

  <dependencies>
    <!-- Flink core / APIs (provided by the cluster) -->
    <dependency><groupId>org.apache.flink</groupId><artifactId>flink-java</artifactId><version>${flink.version}</version><scope>provided</scope></dependency>
    <dependency><groupId>org.apache.flink</groupId><artifactId>flink-streaming-java_${scala.binary.version}</artifactId><version>${flink.version}</version><scope>provided</scope></dependency>
    <dependency><groupId>org.apache.flink</groupId><artifactId>flink-clients_${scala.binary.version}</artifactId><version>${flink.version}</version><scope>provided</scope></dependency>
    <dependency><groupId>org.apache.flink</groupId><artifactId>flink-table-api-java-bridge_${scala.binary.version}</artifactId><version>1.14.0</version><scope>provided</scope></dependency>
    <dependency><groupId>org.apache.flink</groupId><artifactId>flink-table-planner_${scala.binary.version}</artifactId><version>${flink.version}</version><scope>provided</scope></dependency>
    <dependency><groupId>org.apache.flink</groupId><artifactId>flink-streaming-scala_${scala.binary.version}</artifactId><version>${flink.version}</version><scope>provided</scope></dependency>
    <dependency><groupId>org.apache.flink</groupId><artifactId>flink-table-common</artifactId><version>${flink.version}</version><scope>provided</scope></dependency>
    <dependency><groupId>org.apache.flink</groupId><artifactId>flink-cep_${scala.binary.version}</artifactId><version>${flink.version}</version></dependency>
    <dependency><groupId>org.apache.flink</groupId><artifactId>flink-json</artifactId><version>${flink.version}</version><scope>provided</scope></dependency>
    <dependency><groupId>org.apache.flink</groupId><artifactId>flink-csv</artifactId><version>${flink.version}</version><scope>provided</scope></dependency>
    <dependency><groupId>org.apache.flink</groupId><artifactId>flink-state-processor-api_${scala.binary.version}</artifactId><version>${flink.version}</version><scope>provided</scope></dependency>
    <dependency><groupId>commons-lang</groupId><artifactId>commons-lang</artifactId><version>2.5</version><scope>compile</scope></dependency>
    <dependency><groupId>org.apache.flink</groupId><artifactId>flink-runtime-web_${scala.binary.version}</artifactId><version>${flink.version}</version><scope>provided</scope></dependency>

    <!-- Logging -->
    <dependency><groupId>org.apache.logging.log4j</groupId><artifactId>log4j-slf4j-impl</artifactId><version>${log4j.version}</version><scope>runtime</scope></dependency>
    <dependency><groupId>org.apache.logging.log4j</groupId><artifactId>log4j-api</artifactId><version>${log4j.version}</version><scope>runtime</scope></dependency>
    <dependency><groupId>org.apache.logging.log4j</groupId><artifactId>log4j-core</artifactId><version>${log4j.version}</version><scope>runtime</scope></dependency>

    <!-- Hadoop (HDFS checkpoints / Kerberos) -->
    <dependency><groupId>org.apache.hadoop</groupId><artifactId>hadoop-client</artifactId><version>3.3.1</version></dependency>
    <dependency><groupId>org.apache.hadoop</groupId><artifactId>hadoop-auth</artifactId><version>${hadoop.version}</version></dependency>

    <!-- State backend -->
    <dependency><groupId>org.apache.flink</groupId><artifactId>flink-statebackend-rocksdb_${scala.binary.version}</artifactId><version>${flink.version}</version><scope>provided</scope></dependency>

    <!-- JSON / project libraries -->
    <dependency><groupId>com.alibaba</groupId><artifactId>fastjson</artifactId><version>1.1.23</version></dependency>
    <dependency><groupId>org.projectlombok</groupId><artifactId>lombok</artifactId><version>1.16.18</version><scope>provided</scope></dependency>
    <dependency>
      <groupId>gaei.cn.x5l.bigdata.common</groupId><artifactId>x5l-bigdata-common</artifactId><version>1.3-SNAPSHOT</version>
      <exclusions>
        <exclusion><groupId>org.apache.logging.log4j</groupId><artifactId>log4j-core</artifactId></exclusion>
        <exclusion><groupId>org.apache.logging.log4j</groupId><artifactId>log4j-api</artifactId></exclusion>
        <exclusion><groupId>org.apache.logging.log4j</groupId><artifactId>log4j-slf4j-impl</artifactId></exclusion>
      </exclusions>
    </dependency>

    <!-- Connectors and config handling -->
    <dependency><groupId>org.apache.flink</groupId><artifactId>flink-sql-connector-kafka_${scala.binary.version}</artifactId><version>${flink.version}</version></dependency>
    <dependency><groupId>org.apache.flink</groupId><artifactId>flink-connector-jdbc_${scala.binary.version}</artifactId><version>${flink.version}</version></dependency>
    <dependency><groupId>org.apache.flink</groupId><artifactId>flink-connector-clickhouse</artifactId><version>1.14.3-SNAPSHOT</version></dependency>
    <dependency><groupId>gaei.cn.x5l</groupId><artifactId>tsp-gb-decode</artifactId><version>1.0.0</version></dependency>
    <dependency><groupId>org.jyaml</groupId><artifactId>jyaml</artifactId><version>1.3</version></dependency>
    <dependency><groupId>mysql</groupId><artifactId>mysql-connector-java</artifactId><version>5.1.44</version><scope>runtime</scope></dependency>
    <dependency><groupId>gaei.cn.x5l.flink.common</groupId><artifactId>x5l-flink-common</artifactId><version>1.4-SNAPSHOT</version></dependency>
  </dependencies>

  <build>
    <plugins>
      <plugin>
        <groupId>org.apache.maven.plugins</groupId>
        <artifactId>maven-compiler-plugin</artifactId>
        <version>3.1</version>
        <configuration>
          <source>${target.java.version}</source>
          <target>${target.java.version}</target>
        </configuration>
      </plugin>
      <plugin>
        <groupId>org.apache.maven.plugins</groupId>
        <artifactId>maven-shade-plugin</artifactId>
        <version>3.0.0</version>
        <executions>
          <execution>
            <phase>package</phase>
            <goals><goal>shade</goal></goals>
            <configuration>
              <artifactSet>
                <excludes>
                  <exclude>org.apache.flink:force-shading</exclude>
                  <exclude>com.google.code.findbugs:jsr305</exclude>
                  <exclude>org.slf4j:*</exclude>
                  <exclude>org.apache.logging.log4j:*</exclude>
                  <exclude>org.apache.flink:flink-runtime-web_2.11</exclude>
                </excludes>
              </artifactSet>
              <filters>
                <filter>
                  <artifact>*:*</artifact>
                  <excludes>
                    <exclude>META-INF/*.SF</exclude>
                    <exclude>META-INF/*.DSA</exclude>
                    <exclude>META-INF/*.RSA</exclude>
                  </excludes>
                </filter>
              </filters>
              <transformers>
                <transformer implementation="org.apache.maven.plugins.shade.resource.ManifestResourceTransformer">
                  <mainClass>com.owp.flink.kafka.KafkaSourceDemo</mainClass>
                </transformer>
              </transformers>
            </configuration>
          </execution>
        </executions>
      </plugin>
    </plugins>

    <pluginManagement>
      <plugins>
        <!-- Keeps Eclipse/m2e from flagging the shade and compiler executions. -->
        <plugin>
          <groupId>org.eclipse.m2e</groupId>
          <artifactId>lifecycle-mapping</artifactId>
          <version>1.0.0</version>
          <configuration>
            <lifecycleMappingMetadata>
              <pluginExecutions>
                <pluginExecution>
                  <pluginExecutionFilter>
                    <groupId>org.apache.maven.plugins</groupId>
                    <artifactId>maven-shade-plugin</artifactId>
                    <versionRange>[3.0.0,)</versionRange>
                    <goals><goal>shade</goal></goals>
                  </pluginExecutionFilter>
                  <action><ignore/></action>
                </pluginExecution>
                <pluginExecution>
                  <pluginExecutionFilter>
                    <groupId>org.apache.maven.plugins</groupId>
                    <artifactId>maven-compiler-plugin</artifactId>
                    <versionRange>[3.1,)</versionRange>
                    <goals><goal>testCompile</goal><goal>compile</goal></goals>
                  </pluginExecutionFilter>
                  <action><ignore/></action>
                </pluginExecution>
              </pluginExecutions>
            </lifecycleMappingMetadata>
          </configuration>
        </plugin>
      </plugins>
    </pluginManagement>
  </build>
</project>
# appconfig.yml: MySQL connection used by RemoteConfigUtil to pull the job configuration
mysql.url: "jdbc:mysql://1.1.1.1:3306/test?useSSL=false&useUnicode=true&characterEncoding=UTF8&connectTimeout=60000&socketTimeout=60000"
mysql.username: "test"
mysql.password: "123456"
mysql.driver: "com.mysql.jdbc.Driver"
log4j.rootLogger=info, stdout
log4j.appender.stdout=org.apache.log4j.ConsoleAppender
log4j.appender.stdout.layout=org.apache.log4j.PatternLayout
log4j.appender.stdout.layout.ConversionPattern=%-4r [%t] %-5p %c %x - %m%n
# Job configuration: stored in the config_context column of base_app_config in MySQL;
# for local testing the same content goes into resources/flink_backup_local.yml.
clickhouse:
  connector: 'clickhouse'
  database-name: 'dwd'
  driver: 'ru.yandex.clickhouse.ClickHouseDriver'
  jdbcurl: 'jdbc:clickhouse://10.1.1.1:8123/dwd?socket_timeout=480000'
  password: 'X8v@123456!%$'
  reissueInterval: 3
  sink.batch-size: '200000'
  sink.flush-interval: '3000000'
  sink.ignore-delete: 'true'
  sink.max-retries: '3'
  sink.partition-key: 'toYYYYMMDD(sample_date_time)'
  sink.partition-strategy: 'balanced'
  table-name: 'test_local'
  url: 'clickhouse://10.1.1.1:8123,10.1.1.1:8123,10.1.1.1:8123,10.1.1.1:8123,10.1.1.1:8123'
  username: 'test'
hdfs:
  checkPointPath: 'hdfs://nameserver/user/flink/rocksdbcheckpoint'
  checkpointTimeout: 360000
  checkpointing: 300000
  maxConcurrentCheckpoints: 1
  minPauseBetweenCheckpoints: 10000
  restartInterval: 60
  restartStrategy: 3
kafka-consumer:
  prop:
    auto.offset.reset: 'earliest'
    bootstrap.servers: 'kfk01:9092,kfk02:9092,kfk03:9092'
    enable.auto.commit: 'false'
    fetch.max.bytes: '52428700'
    group.id: 'test'
    isKerberized: '1'
    keytab: 'D:/keytab/test.keytab'
    krb5Conf: 'D:/keytab/krb5.conf'
    max.poll.interval.ms: '300000'
    max.poll.records: '1000'
    principal: 'test@PRE.TEST.COM'
    security_protocol: 'SASL_PLAINTEXT'
    serviceName: 'kafka'
    session.timeout.ms: '600000'
    useTicketCache: 'false'
  topics: 'topicA,topicB'
kafka-producer:
  defaultTopic: 'kafka2hive_error'
  prop:
    acks: 'all'
    batch.size: '1048576'
    bootstrap.servers: 'kfk01:9092,kfk02:9092,kfk03:9092'
    compression.type: 'lz4'
    key.serializer: 'org.apache.kafka.common.serialization.StringSerializer'
    retries: '3'
    value.serializer: 'org.apache.kafka.common.serialization.StringSerializer'
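The top-level keys above are exactly what the job looks up at runtime. A short sketch, mirroring Kafka2chApp further down, with mapConf being the parsed configuration:

Map<String, Object> clickhouseConf = (Map<String, Object>) mapConf.get("clickhouse");
Map<String, Object> kafkaConsumerConf = (Map<String, Object>) mapConf.get("kafka-consumer");
Map<String, Object> hdfsConf = (Map<String, Object>) mapConf.get("hdfs");
String topics = (String) kafkaConsumerConf.get("topics");   // 'topicA,topicB'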
import java.sql.*;

public class DBConn {

    private static final String driver = "com.mysql.jdbc.Driver"; // MySQL JDBC driver
    private static Connection conn = null;
    private static PreparedStatement ps = null;
    private static ResultSet rs = null;
    private static final CallableStatement cs = null;

    /**
     * Open a database connection.
     */
    public static Connection conn(String url, String username, String password) {
        Connection conn = null;
        try {
            Class.forName(driver); // load the JDBC driver
            try {
                conn = DriverManager.getConnection(url, username, password); // connect to the database
            } catch (SQLException e) {
                e.printStackTrace();
            }
        } catch (ClassNotFoundException e) {
            e.printStackTrace();
        }
        return conn;
    }

    /**
     * Close the database connection.
     */
    public static void close() {
        if (conn != null) {
            try {
                conn.close();
            } catch (SQLException e) {
                e.printStackTrace();
            }
        }
    }
}
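A quick usage sketch for DBConn, using the appconfig.yml values shown above; note that conn() returns a local Connection object, so it is closed directly here:

Connection connection = DBConn.conn(
        "jdbc:mysql://1.1.1.1:3306/test?useSSL=false&useUnicode=true&characterEncoding=UTF8",
        "test", "123456");
// ... execute queries with the connection ...
connection.close();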
@Slf4j
public class CommonUtils {

    public static StreamExecutionEnvironment setCheckpoint(StreamExecutionEnvironment env) throws IOException {
        // ConfigTools.initConf("local");
        Map<String, Object> hdfsMap = (Map<String, Object>) ConfigTools.mapConf.get("hdfs");
        // Checkpointing delays offset commits back to Kafka
        env.enableCheckpointing(((Integer) hdfsMap.get("checkpointing")).longValue(), CheckpointingMode.EXACTLY_ONCE);
        env.getCheckpointConfig().setMinPauseBetweenCheckpoints(((Integer) hdfsMap.get("minPauseBetweenCheckpoints")).longValue());
        env.getCheckpointConfig().setCheckpointTimeout(((Integer) hdfsMap.get("checkpointTimeout")).longValue());
        env.getCheckpointConfig().setMaxConcurrentCheckpoints((Integer) hdfsMap.get("maxConcurrentCheckpoints"));
        env.getCheckpointConfig().enableExternalizedCheckpoints(CheckpointConfig.ExternalizedCheckpointCleanup.RETAIN_ON_CANCELLATION);
        env.setRestartStrategy(RestartStrategies.fixedDelayRestart(
                (Integer) hdfsMap.get("restartStrategy"), // restart attempts; distributed jobs fail for mundane reasons, so 3-5 is a sensible range
                Time.of(((Integer) hdfsMap.get("restartInterval")).longValue(), TimeUnit.SECONDS) // delay between attempts
        ));
        // Tolerated checkpoint failures; the default of 0 fails the job on the first failed checkpoint
        env.getCheckpointConfig().setTolerableCheckpointFailureNumber(2);
        // State backend
        env.setStateBackend(new RocksDBStateBackend((String) hdfsMap.get("checkPointPath"), true));
        // env.setStateBackend(new FsStateBackend((String) hdfsMap.get("checkPointPath"), true));
        // env.setStateBackend(new HashMapStateBackend());
        return env;
    }

    public static FlinkKafkaConsumer<ConsumerRecord<String, String>> getKafkaConsumer(Map<String, Object> kafkaConf) throws IOException {
        String[] topics = ((String) kafkaConf.get("topics")).split(",");
        log.info("topics to consume: {}", topics);
        Properties properties = new Properties();
        Map<String, String> kafkaProp = (Map<String, String>) kafkaConf.get("prop");
        for (String key : kafkaProp.keySet()) {
            properties.setProperty(key, kafkaProp.get(key).toString());
        }
        if (!StringUtils.isBlank((String) kafkaProp.get("isKerberized")) && "1".equals(kafkaProp.get("isKerberized"))) {
            System.setProperty("java.security.krb5.conf", kafkaProp.get("krb5Conf"));
            properties.put("security.protocol", kafkaProp.get("security_protocol"));
            properties.put("sasl.jaas.config", "com.sun.security.auth.module.Krb5LoginModule required "
                    + "useTicketCache=" + kafkaProp.get("useTicketCache") + " "
                    + "serviceName=\"" + kafkaProp.get("serviceName") + "\" "
                    + "useKeyTab=true "
                    + "keyTab=\"" + kafkaProp.get("keytab").toString() + "\" "
                    + "principal=\"" + kafkaProp.get("principal").toString() + "\";");
        }
        properties.put("key.serializer", "org.apache.flink.kafka.shaded.org.apache.kafka.common.serialization.ByteArrayDeserializer");
        properties.put("value.serializer", "org.apache.flink.kafka.shaded.org.apache.kafka.common.serialization.ByteArrayDeserializer");

        FlinkKafkaConsumer<ConsumerRecord<String, String>> consumerRecordFlinkKafkaConsumer =
                new FlinkKafkaConsumer<>(Arrays.asList(topics), new KafkaDeserializationSchema<ConsumerRecord<String, String>>() {
                    @Override
                    public TypeInformation<ConsumerRecord<String, String>> getProducedType() {
                        return TypeInformation.of(new TypeHint<ConsumerRecord<String, String>>() {
                        });
                    }

                    @Override
                    public boolean isEndOfStream(ConsumerRecord<String, String> stringStringConsumerRecord) {
                        return false;
                    }

                    @Override
                    public ConsumerRecord<String, String> deserialize(ConsumerRecord<byte[], byte[]> record) throws Exception {
                        return new ConsumerRecord<>(
                                record.topic(),
                                record.partition(),
                                record.offset(),
                                record.timestamp(),
                                record.timestampType(),
                                record.checksum(),
                                record.serializedKeySize(),
                                record.serializedValueSize(),
                                new String(record.key() == null ? "".getBytes(StandardCharsets.UTF_8) : record.key(), StandardCharsets.UTF_8),
                                new String(record.value() == null ? "{}".getBytes(StandardCharsets.UTF_8) : record.value(), StandardCharsets.UTF_8));
                    }
                }, properties);
        return consumerRecordFlinkKafkaConsumer;
    }
}
public class RemoteConfigUtil {

    private static final Logger log = LoggerFactory.getLogger(RemoteConfigUtil.class);
    private static Connection conn = null;
    private static PreparedStatement ps = null;
    private static ResultSet rs = null;
    public static Map<String, Object> mapConf;

    public RemoteConfigUtil() {
    }

    public static Map<String, Object> getByAppNameAndConfigName(String appName, String ConfigName) throws SQLException {
        if (mapConf != null && mapConf.size() > 0) {
            return mapConf;
        }
        Map<String, Object> ymlMap = LocalConfigUtil.getYmlMap("/appconfig");
        String username = (String) ymlMap.get("mysql.username");
        String password = (String) ymlMap.get("mysql.password");
        String url = (String) ymlMap.get("mysql.url");
        String driver = (String) ymlMap.get("mysql.driver");
        Connection conn = JdbcUtil.getConnection(url, username, password, driver);
        PreparedStatement preparedStatement = null;
        try {
            String sql = "select config_context from base_app_config where app_name = '%s' and config_name = '%s'";
            preparedStatement = conn.prepareStatement(String.format(sql, appName, ConfigName));
            ResultSet rs = preparedStatement.executeQuery();
            String config_context = "";
            while (rs.next()) {
                config_context = rs.getString("config_context");
            }
            rs.close();
            log.info("config_context: {}", config_context);
            if (StringUtils.isNotBlank(config_context)) {
                System.out.println(JSONObject.toJSONString(JSONObject.parseObject(config_context), new SerializerFeature[]{SerializerFeature.PrettyFormat}));
            }
            mapConf = (Map<String, Object>) JSON.parseObject(config_context, Map.class);
            return mapConf;
        } finally {
            if (preparedStatement != null) {
                preparedStatement.close();
            }
            if (conn != null) {
                conn.close();
            }
        }
    }
}
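RemoteConfigUtil expects a base_app_config table keyed by app_name and config_name, with the whole configuration in config_context; the exact DDL is not shown in this post, so the column list is an assumption taken from the query above. Typical usage:

// Assumed table shape: base_app_config(app_name, config_name, config_context)
Map<String, Object> mapConf = RemoteConfigUtil.getByAppNameAndConfigName(
        "gaei.cn.x5l.Kafka2CkODS",   // hypothetical app_name value
        "kafka2ch-ods");             // hypothetical config_name value
Map<String, Object> clickhouseConf = (Map<String, Object>) mapConf.get("clickhouse");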
public class ClickhouseUtil {

    public ClickhouseUtil() {
    }

    public static List<SchemaPo> getSchemaPoList(Map<String, Object> chMapConf) throws SQLException {
        List<SchemaPo> schemaPos = new ArrayList<>();
        Connection connection = null;
        try {
            String jdbcurl = (String) chMapConf.get("jdbcurl");
            String driver = (String) chMapConf.get("driver");
            String userName = (String) chMapConf.get("username");
            String password = (String) chMapConf.get("password");
            String databaseName = (String) chMapConf.get("database-name");
            String tableName = (String) chMapConf.get("table-name");
            connection = JdbcUtil.getConnection(jdbcurl, userName, password, driver);
            DatabaseMetaData metaData = connection.getMetaData();
            ResultSet colRet = metaData.getColumns(null, databaseName, tableName, "%");
            System.out.println("Table columns:");
            while (colRet.next()) {
                String columnName = colRet.getString("COLUMN_NAME");
                String columnType = colRet.getString("TYPE_NAME");
                schemaPos.add(new SchemaPo(columnName, columnType));
                System.out.println(columnName + " " + columnType);
            }
        } finally {
            try {
                if (connection != null) {
                    connection.close();
                }
            } catch (SQLException e) {
                e.printStackTrace();
            }
        }
        return schemaPos;
    }

    public static String getCreateSinkTableSql(Map<String, Object> clickhouse, String sinkTableName, List<SchemaPo> schemaPos) {
        StringBuilder sinkTableSql = new StringBuilder();
        String userName = (String) clickhouse.get("username");
        String password = (String) clickhouse.get("password");
        String connector = (String) clickhouse.get("connector");
        String databaseName = (String) clickhouse.get("database-name");
        String url = (String) clickhouse.get("url");
        String tableName = (String) clickhouse.get("table-name");
        String sinkBatchSize = (String) clickhouse.get("sink.batch-size");
        String sinkFlushInterval = (String) clickhouse.get("sink.flush-interval");
        String sinkMaxRetries = (String) clickhouse.get("sink.max-retries");
        String sinkPartitionStrategy = (String) clickhouse.get("sink.partition-strategy");
        String sinkPartitionKey = (String) clickhouse.get("sink.partition-key");
        String sinkIgnoreDelete = (String) clickhouse.get("sink.ignore-delete");

        sinkTableSql.append(String.format("CREATE TABLE %s (\n", sinkTableName));
        int i = 0;
        for (SchemaPo schemaPo : schemaPos) {
            ++i;
            String signal = schemaPo.getSignal();
            String type = schemaPo.getType();
            if ("UInt64".equalsIgnoreCase(type)) {
                type = "BIGINT";
            } else if ("Map(String,String)".equalsIgnoreCase(type)) {
                type = "Map<String,String>";
            } else if ("Datetime".equalsIgnoreCase(type)) {
                type = "Timestamp(0)";
            } else {
                type = "String";
            }
            sinkTableSql.append(String.format(" `%s` %s", signal, type));
            sinkTableSql.append(i == schemaPos.size() ? ")" : ",\n");
        }
        sinkTableSql.append("WITH(\n");
        sinkTableSql.append(String.format("'connector' = '%s',\n", connector));
        sinkTableSql.append(String.format("'url' = '%s',\n", url));
        sinkTableSql.append(String.format("'username' = '%s',\n", userName));
        sinkTableSql.append(String.format("'password' = '%s',\n", password));
        sinkTableSql.append(String.format("'database-name' = '%s',\n", databaseName));
        sinkTableSql.append(String.format("'table-name' = '%s',\n", tableName));
        sinkTableSql.append(String.format("'sink.batch-size' = '%s',\n", sinkBatchSize));
        sinkTableSql.append(String.format("'sink.flush-interval' = '%s',\n", sinkFlushInterval));
        sinkTableSql.append(String.format("'sink.max-retries' = '%s',\n", sinkMaxRetries));
        sinkTableSql.append(String.format("'sink.partition-strategy' = 'hash',\n"));
        sinkTableSql.append(String.format("'sink.partition-key' = 'sample_date_time',\n"));
        sinkTableSql.append(String.format("'sink.ignore-delete' = '%s'\n", sinkIgnoreDelete));
        sinkTableSql.append(" )");
        return sinkTableSql.toString();
    }

    // Convert one flattened record into the Row layout ClickHouse expects
    public static Row convertRow(Map<String, String> resultMap, List<SchemaPo> schemaPos) {
        Row row = new Row(schemaPos.size());
        for (int i = 0; i < schemaPos.size(); i++) {
            SchemaPo schemaPo = schemaPos.get(i);
            String valueStr = resultMap.get(schemaPo.getSignal());
            if (StringUtils.isBlank(valueStr)) {
                row.setField(i, null);
                continue;
            }
            if ("UInt64".equalsIgnoreCase(schemaPo.getType())) {
                Long svalue = Long.valueOf(valueStr);
                row.setField(i, Math.abs(svalue));
            } else if ("Int64".equalsIgnoreCase(schemaPo.getType())) {
                Long svalue = Long.valueOf(valueStr);
                row.setField(i, Math.abs(svalue));
            } else if ("Int32".equalsIgnoreCase(schemaPo.getType())) {
                Integer svalue = Integer.valueOf(valueStr);
                row.setField(i, svalue);
            } else if ("datetime".equalsIgnoreCase(schemaPo.getType())) {
                try {
                    Date svalue = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss").parse(valueStr);
                    Timestamp timestamp = new Timestamp(svalue.getTime());
                    row.setField(i, timestamp);
                } catch (Exception ex) {
                    System.out.println(ex.getMessage());
                    System.out.println(Arrays.toString(ex.getStackTrace()));
                }
            } else {
                row.setField(i, valueStr);
            }
        }
        return row;
    }
}
public interface FlatMapFunction {
    // Factory for the actual Flink FlatMapFunction that turns one Kafka record into a Row
    // laid out according to the given ClickHouse schema. The Flink interface is fully
    // qualified here because this project interface shares its simple name.
    org.apache.flink.api.common.functions.FlatMapFunction<ConsumerRecord<String, String>, Row> newInstance(List<SchemaPo> schemaPos);
}
public class ClickHouseCatalog extends AbstractCatalog {

    private static final Logger LOG = LoggerFactory.getLogger(ClickHouseCatalog.class);

    public static final String DEFAULT_DATABASE = "default";

    private final String baseUrl;
    private final String username;
    private final String password;
    private final boolean ignorePrimaryKey;
    private final Map<String, String> properties;
    private ClickHouseConnection connection;

    public ClickHouseCatalog(String catalogName, Map<String, String> properties) {
        this(catalogName, properties.get("database-name"), properties.get("url"),
                properties.get("username"), properties.get("password"), properties);
    }

    public ClickHouseCatalog(String catalogName, @Nullable String defaultDatabase, String baseUrl, String username, String password) {
        this(catalogName, defaultDatabase, baseUrl, username, password, Collections.emptyMap());
    }

    public ClickHouseCatalog(String catalogName, @Nullable String defaultDatabase, String baseUrl,
                             String username, String password, Map<String, String> properties) {
        super(catalogName, defaultDatabase == null ? DEFAULT_DATABASE : defaultDatabase);
        Preconditions.checkArgument(!StringUtils.isNullOrWhitespaceOnly(baseUrl), "baseUrl cannot be null or empty");
        Preconditions.checkArgument(!StringUtils.isNullOrWhitespaceOnly(username), "username cannot be null or empty");
        Preconditions.checkArgument(!StringUtils.isNullOrWhitespaceOnly(password), "password cannot be null or empty");
        this.baseUrl = baseUrl.endsWith("/") ? baseUrl : baseUrl + "/";
        this.username = username;
        this.password = password;
        this.ignorePrimaryKey = properties.get("catalog.ignore-primary-key") == null
                || Boolean.parseBoolean(properties.get("catalog.ignore-primary-key"));
        this.properties = Collections.unmodifiableMap(properties);
    }

    public void open() throws CatalogException {
        try {
            Properties configuration = new Properties();
            configuration.putAll(this.properties);
            configuration.setProperty(ClickHouseQueryParam.USER.getKey(), this.username);
            configuration.setProperty(ClickHouseQueryParam.PASSWORD.getKey(), this.password);
            configuration.setProperty("socket_timeout", "600000");
            String jdbcUrl = ClickHouseUtil.getJdbcUrl(this.baseUrl, this.getDefaultDatabase());
            BalancedClickhouseDataSource dataSource = new BalancedClickhouseDataSource(jdbcUrl, configuration);
            dataSource.actualize();
            this.connection = dataSource.getConnection();
            LOG.info("Created catalog {}, established connection to {}", this.getName(), jdbcUrl);
        } catch (Exception e) {
            throw new CatalogException(String.format("Opening catalog %s failed.", this.getName()), e);
        }
    }

    public synchronized void close() throws CatalogException {
        try {
            this.connection.close();
            LOG.info("Closed catalog {} ", this.getName());
        } catch (Exception e) {
            throw new CatalogException(String.format("Closing catalog %s failed.", this.getName()), e);
        }
    }

    public Optional<Factory> getFactory() {
        return Optional.of(new ClickHouseDynamicTableFactory());
    }

    // ---------------- databases ----------------

    public synchronized List<String> listDatabases() throws CatalogException {
        try (PreparedStatement stmt = this.connection.prepareStatement("SELECT name from `system`.databases");
             ResultSet rs = stmt.executeQuery()) {
            List<String> databases = new ArrayList<>();
            while (rs.next()) {
                databases.add(rs.getString(1));
            }
            return databases;
        } catch (Exception e) {
            throw new CatalogException(String.format("Failed listing database in catalog %s", this.getName()), e);
        }
    }

    public CatalogDatabase getDatabase(String databaseName) throws DatabaseNotExistException, CatalogException {
        if (this.listDatabases().contains(databaseName)) {
            return new CatalogDatabaseImpl(Collections.emptyMap(), null);
        }
        throw new DatabaseNotExistException(this.getName(), databaseName);
    }

    public boolean databaseExists(String databaseName) throws CatalogException {
        Preconditions.checkArgument(!StringUtils.isNullOrWhitespaceOnly(databaseName));
        return this.listDatabases().contains(databaseName);
    }

    public void createDatabase(String name, CatalogDatabase database, boolean ignoreIfExists) { throw new UnsupportedOperationException(); }

    public void dropDatabase(String name, boolean ignoreIfNotExists, boolean cascade) { throw new UnsupportedOperationException(); }

    public void alterDatabase(String name, CatalogDatabase newDatabase, boolean ignoreIfNotExists) { throw new UnsupportedOperationException(); }

    // ---------------- tables ----------------

    public synchronized List<String> listTables(String databaseName) throws DatabaseNotExistException, CatalogException {
        if (!this.databaseExists(databaseName)) {
            throw new DatabaseNotExistException(this.getName(), databaseName);
        }
        try (PreparedStatement stmt = this.connection.prepareStatement(
                String.format("SELECT name from `system`.tables where database = '%s'", databaseName));
             ResultSet rs = stmt.executeQuery()) {
            List<String> tables = new ArrayList<>();
            while (rs.next()) {
                tables.add(rs.getString(1));
            }
            return tables;
        } catch (Exception e) {
            throw new CatalogException(String.format("Failed listing tables in catalog %s database %s", this.getName(), databaseName), e);
        }
    }

    public List<String> listViews(String databaseName) throws DatabaseNotExistException, CatalogException {
        throw new UnsupportedOperationException();
    }

    public CatalogBaseTable getTable(ObjectPath tablePath) throws TableNotExistException, CatalogException {
        if (!this.tableExists(tablePath)) {
            throw new TableNotExistException(this.getName(), tablePath);
        }
        Map<String, String> configuration = new HashMap<>(this.properties);
        configuration.put("url", this.baseUrl);
        configuration.put("database-name", tablePath.getDatabaseName());
        configuration.put("table-name", tablePath.getObjectName());
        configuration.put("username", this.username);
        configuration.put("password", this.password);

        String databaseName = tablePath.getDatabaseName();
        String tableName = tablePath.getObjectName();
        try {
            // For a Distributed table, resolve the underlying local database/table.
            DistributedEngineFullSchema engineFullSchema = ClickHouseUtil.getAndParseDistributedEngineSchema(
                    this.connection, tablePath.getDatabaseName(), tablePath.getObjectName());
            if (engineFullSchema != null) {
                databaseName = engineFullSchema.getDatabase();
                tableName = engineFullSchema.getTable();
            }
        } catch (Exception e) {
            throw new CatalogException(String.format("Failed getting engine full of %s.%s.%s", this.getName(), databaseName, tableName), e);
        }
        return new CatalogTableImpl(this.createTableSchema(databaseName, tableName),
                this.getPartitionKeys(databaseName, tableName), configuration, "");
    }

    private synchronized TableSchema createTableSchema(String databaseName, String tableName) {
        try (PreparedStatement stmt = this.connection.prepareStatement(
                String.format("SELECT * from `%s`.`%s` limit 0", databaseName, tableName))) {
            ClickHouseResultSetMetaData metaData = stmt.getMetaData().unwrap(ClickHouseResultSetMetaData.class);
            Method getColMethod = metaData.getClass().getDeclaredMethod("getCol", Integer.TYPE);
            getColMethod.setAccessible(true);
            List<String> primaryKeys = this.getPrimaryKeys(databaseName, tableName);
            TableSchema.Builder builder = TableSchema.builder();
            for (int idx = 1; idx <= metaData.getColumnCount(); ++idx) {
                ClickHouseColumnInfo columnInfo = (ClickHouseColumnInfo) getColMethod.invoke(metaData, idx);
                String columnName = columnInfo.getColumnName();
                DataType columnType = ClickHouseTypeUtil.toFlinkType(columnInfo);
                if (primaryKeys.contains(columnName)) {
                    columnType = columnType.notNull();
                }
                builder.field(columnName, columnType);
            }
            if (!primaryKeys.isEmpty()) {
                builder.primaryKey(primaryKeys.toArray(new String[0]));
            }
            return builder.build();
        } catch (Exception e) {
            throw new CatalogException(String.format("Failed getting columns in catalog %s database %s table %s",
                    this.getName(), databaseName, tableName), e);
        }
    }

    private List<String> getPrimaryKeys(String databaseName, String tableName) {
        if (this.ignorePrimaryKey) {
            return Collections.emptyList();
        }
        try (PreparedStatement stmt = this.connection.prepareStatement(String.format(
                "SELECT name from `system`.columns where `database` = '%s' and `table` = '%s' and is_in_primary_key = 1",
                databaseName, tableName));
             ResultSet rs = stmt.executeQuery()) {
            List<String> primaryKeys = new ArrayList<>();
            while (rs.next()) {
                primaryKeys.add(rs.getString(1));
            }
            return primaryKeys;
        } catch (Exception e) {
            throw new CatalogException(String.format("Failed getting primary keys in catalog %s database %s table %s",
                    this.getName(), databaseName, tableName), e);
        }
    }

    private List<String> getPartitionKeys(String databaseName, String tableName) {
        try (PreparedStatement stmt = this.connection.prepareStatement(String.format(
                "SELECT name from `system`.columns where `database` = '%s' and `table` = '%s' and is_in_partition_key = 1",
                databaseName, tableName));
             ResultSet rs = stmt.executeQuery()) {
            List<String> partitionKeys = new ArrayList<>();
            while (rs.next()) {
                partitionKeys.add(rs.getString(1));
            }
            return partitionKeys;
        } catch (Exception e) {
            throw new CatalogException(String.format("Failed getting partition keys of %s.%s.%s", this.getName(), databaseName, tableName), e);
        }
    }

    public boolean tableExists(ObjectPath tablePath) throws CatalogException {
        try {
            return this.databaseExists(tablePath.getDatabaseName())
                    && this.listTables(tablePath.getDatabaseName()).contains(tablePath.getObjectName());
        } catch (DatabaseNotExistException e) {
            return false;
        }
    }

    public void dropTable(ObjectPath tablePath, boolean ignoreIfNotExists) { throw new UnsupportedOperationException(); }

    public void renameTable(ObjectPath tablePath, String newTableName, boolean ignoreIfNotExists) { throw new UnsupportedOperationException(); }

    public void createTable(ObjectPath tablePath, CatalogBaseTable table, boolean ignoreIfExists) { throw new UnsupportedOperationException(); }

    public void alterTable(ObjectPath tablePath, CatalogBaseTable newTable, boolean ignoreIfNotExists) { throw new UnsupportedOperationException(); }

    // ---------------- partitions ----------------

    public List<CatalogPartitionSpec> listPartitions(ObjectPath tablePath) { return Collections.emptyList(); }

    public List<CatalogPartitionSpec> listPartitions(ObjectPath tablePath, CatalogPartitionSpec partitionSpec) { return Collections.emptyList(); }

    public List<CatalogPartitionSpec> listPartitionsByFilter(ObjectPath tablePath, List<Expression> filters) { return Collections.emptyList(); }

    public CatalogPartition getPartition(ObjectPath tablePath, CatalogPartitionSpec partitionSpec) throws PartitionNotExistException {
        throw new PartitionNotExistException(this.getName(), tablePath, partitionSpec);
    }

    public boolean partitionExists(ObjectPath tablePath, CatalogPartitionSpec partitionSpec) { throw new UnsupportedOperationException(); }

    public void createPartition(ObjectPath tablePath, CatalogPartitionSpec partitionSpec, CatalogPartition partition, boolean ignoreIfExists) { throw new UnsupportedOperationException(); }

    public void dropPartition(ObjectPath tablePath, CatalogPartitionSpec partitionSpec, boolean ignoreIfNotExists) { throw new UnsupportedOperationException(); }

    public void alterPartition(ObjectPath tablePath, CatalogPartitionSpec partitionSpec, CatalogPartition newPartition, boolean ignoreIfNotExists) { throw new UnsupportedOperationException(); }

    // ---------------- functions ----------------

    public List<String> listFunctions(String dbName) { return Collections.emptyList(); }

    public CatalogFunction getFunction(ObjectPath functionPath) throws FunctionNotExistException {
        throw new FunctionNotExistException(this.getName(), functionPath);
    }

    public boolean functionExists(ObjectPath functionPath) { return false; }

    public void createFunction(ObjectPath functionPath, CatalogFunction function, boolean ignoreIfExists) { throw new UnsupportedOperationException(); }

    public void alterFunction(ObjectPath functionPath, CatalogFunction newFunction, boolean ignoreIfNotExists) { throw new UnsupportedOperationException(); }

    public void dropFunction(ObjectPath functionPath, boolean ignoreIfNotExists) { throw new UnsupportedOperationException(); }

    // ---------------- statistics ----------------

    public CatalogTableStatistics getTableStatistics(ObjectPath tablePath) { return CatalogTableStatistics.UNKNOWN; }

    public CatalogColumnStatistics getTableColumnStatistics(ObjectPath tablePath) { return CatalogColumnStatistics.UNKNOWN; }

    public CatalogTableStatistics getPartitionStatistics(ObjectPath tablePath, CatalogPartitionSpec partitionSpec) { return CatalogTableStatistics.UNKNOWN; }

    public CatalogColumnStatistics getPartitionColumnStatistics(ObjectPath tablePath, CatalogPartitionSpec partitionSpec) { return CatalogColumnStatistics.UNKNOWN; }

    public void alterTableStatistics(ObjectPath tablePath, CatalogTableStatistics tableStatistics, boolean ignoreIfNotExists) { throw new UnsupportedOperationException(); }

    public void alterTableColumnStatistics(ObjectPath tablePath, CatalogColumnStatistics columnStatistics, boolean ignoreIfNotExists) { throw new UnsupportedOperationException(); }

    public void alterPartitionStatistics(ObjectPath tablePath, CatalogPartitionSpec partitionSpec, CatalogTableStatistics partitionStatistics, boolean ignoreIfNotExists) { throw new UnsupportedOperationException(); }

    public void alterPartitionColumnStatistics(ObjectPath tablePath, CatalogPartitionSpec partitionSpec, CatalogColumnStatistics columnStatistics, boolean ignoreIfNotExists) { throw new UnsupportedOperationException(); }
}
public class Kafka2chApp {

    private static final Logger log = LoggerFactory.getLogger(Kafka2chApp.class);

    private static String SINK_TABLE = "sinkTable";
    private static String KAFKA_TEMP_VIEW = "kafkaTempView";

    /**
     * @param appName    value of the app_name column in the MySQL config table
     * @param configName value of the config_name column in the MySQL config table
     */
    public static void run(String appName, String configName, FlatMapFunction x5lFlatMapFunction) throws Exception {
        log.info("Kafka2chApp.run appName: {}, configName: {}", appName, configName);

        // Load the job configuration from MySQL
        Map<String, Object> mapConf = RemoteConfigUtil.getByAppNameAndConfigName(appName, configName);
        if (mapConf == null || mapConf.size() == 0) return;
        Map<String, Object> clickhouseConf = (Map<String, Object>) mapConf.get("clickhouse");
        Map<String, Object> kafkaConsumerConf = (Map<String, Object>) mapConf.get("kafka-consumer");
        Map<String, Object> hdfsConf = (Map<String, Object>) mapConf.get("hdfs");
        // long beforeTime2Dropout = System.currentTimeMillis() - (Long) mapConf.get("before2DropoutHourStep") * 3600;
        // long after2DropoutTime = System.currentTimeMillis();

        // Initialize the stream and table environments
        StreamExecutionEnvironment streamEnv = StreamEnv.getStreamEnv(hdfsConf);
        // streamEnv.setParallelism(ckP);   // ckP (sink parallelism) comes from code not shown in this post
        StreamTableEnvironment tableEnv = TableEnv.getTableEnv();

        // Fetch the ClickHouse schema and derive the row type
        List<SchemaPo> schemaPos = ClickhouseUtil.getSchemaPoList(clickhouseConf);
        TypeInformation[] types = getTypeInformationArray(schemaPos);
        String[] fieldNames = SchemaPoUtil.getFieldLists(schemaPos);

        // Build the FlatMapFunction that maps one Kafka record onto the schema
        org.apache.flink.api.common.functions.FlatMapFunction<ConsumerRecord<String, String>, Row> flatMapFunction =
                x5lFlatMapFunction.newInstance(schemaPos);

        // Source: Kafka
        DataStreamSource<ConsumerRecord<String, String>> stream =
                streamEnv.addSource(CommonUtils.getKafkaConsumer(kafkaConsumerConf));
        System.out.println("source parallelism: " + streamEnv.getParallelism());

        // Flatten the JSON records into Rows carrying the ClickHouse field names/types
        SingleOutputStreamOperator<Row> infos = stream.flatMap(flatMapFunction);
        infos = infos.map(e -> e, new RowTypeInfo(types, fieldNames));
        System.out.println("map parallelism: " + streamEnv.getParallelism());

        // Register the Kafka stream as a temporary view
        tableEnv.createTemporaryView(KAFKA_TEMP_VIEW, infos);

        // Register the ClickHouse catalog so the target table can be referenced directly
        // String createSinkTableSql = ClickhouseUtil.getCreateSinkTableSql(clickhouseConf, SINK_TABLE, schemaPos);
        Map<String, String> props = new HashMap<>();
        props.put(ClickHouseConfig.DATABASE_NAME, (String) clickhouseConf.get("database-name"));
        props.put(ClickHouseConfig.URL, (String) clickhouseConf.get("url"));
        props.put(ClickHouseConfig.USERNAME, (String) clickhouseConf.get("username"));
        props.put(ClickHouseConfig.PASSWORD, (String) clickhouseConf.get("password"));
        props.put(ClickHouseConfig.SINK_FLUSH_INTERVAL, (String) clickhouseConf.get("sink.flush-interval"));
        props.put(ClickHouseConfig.SINK_BATCH_SIZE, (String) clickhouseConf.get("sink.batch-size"));
        Catalog cHcatalog = new ClickHouseCatalog("clickhouse", props);
        tableEnv.registerCatalog("clickhouse", cHcatalog);
        tableEnv.useCatalog("clickhouse");
        // Arrays.stream(tableEnv.listCatalogs()).forEach(e -> System.out.println("catalog: " + e));
        // Arrays.stream(tableEnv.listDatabases()).forEach(e -> System.out.println("database: " + e));
        // System.out.println(tableEnv.listTables().length);
        // Arrays.stream(tableEnv.listTables()).forEach(e -> System.out.println("table: " + e));
        // tableEnv.executeSql(createSinkTableSql);
        // System.out.println(tableEnv.executeSql("select * from " + KAFKA_TEMP_VIEW).getTableSchema());

        // Build and run the insert statement
        String insertSql = "insert into `" + clickhouseConf.get("table-name")
                + "` select * from default_catalog.default_database." + KAFKA_TEMP_VIEW;
        // System.out.println("insertSql: " + insertSql);
        // log.info("insertSql: {}", insertSql);
        tableEnv.executeSql(insertSql);

        // For debugging, print the flattened rows instead:
        /*infos.print();
        streamEnv.executeAsync();*/
    }

    public static TypeInformation[] getTypeInformationArray(List<SchemaPo> schemaPos) {
        TypeInformation[] types = new TypeInformation[schemaPos.size()];
        int i = 0;
        for (SchemaPo po : schemaPos) {
            if ("String".equalsIgnoreCase(po.getType())) {
                types[i] = Types.STRING;
            } else if ("Int64".equalsIgnoreCase(po.getType())) {
                types[i] = Types.LONG;
            } else if ("UInt64".equalsIgnoreCase(po.getType())) {
                types[i] = Types.LONG;
            } else if ("Int32".equalsIgnoreCase(po.getType())) {
                types[i] = Types.INT;
            } else if ("Int8".equalsIgnoreCase(po.getType())) {
                types[i] = Types.INT;
            } else if ("datetime".equalsIgnoreCase(po.getType())) {
                types[i] = Types.SQL_TIMESTAMP;
            } else if ("Map(String,String)".equalsIgnoreCase(po.getType())) {
                types[i] = Types.MAP(Types.STRING, Types.STRING);
            } else {
                types[i] = Types.STRING;
            }
            i++;
        }
        return types;
    }
}
public class Kafka2CkODS implements FlatMapFunction {

    private static Logger logger = Logger.getLogger(Kafka2CkODS.class);

    public static void main(String[] args) throws Exception {
        // args[0] is the config_name to look up in MySQL
        Kafka2chApp.run(Kafka2CkODS.class.getName(), args[0], new Kafka2CkODS());
    }

    @Override
    public org.apache.flink.api.common.functions.FlatMapFunction<ConsumerRecord<String, String>, Row> newInstance(List<SchemaPo> schemaPos) {
        return new org.apache.flink.api.common.functions.FlatMapFunction<ConsumerRecord<String, String>, Row>() {
            @Override
            public void flatMap(ConsumerRecord<String, String> record, Collector<Row> out) throws Exception {
                // System.out.println("record ===> " + record); // debugging
                String value = record.value();
                try {
                    HashMap<String, String> infoMap = JSON.parseObject(value, HashMap.class);
                    // Lower-case the keys so they match the ClickHouse column names
                    Map<String, String> resultMap = new HashMap<>();
                    for (Map.Entry<String, String> entry : infoMap.entrySet()) {
                        resultMap.put(entry.getKey().toLowerCase(), entry.getValue());
                    }
                    Row row = TableEnv.getRowBySchemaPo1(resultMap, schemaPos);
                    out.collect(row);
                } catch (Exception e) {
                    e.printStackTrace();
                    System.out.printf("bad record, reason: %s, topic: %s, key: %s, value: %s%n",
                            e.getMessage(), record.topic(), record.key(), record.value());
                }
            }
        };
    }
}