编写自定义加密函数：继承 ScalarFunction 类并实现 eval 方法，参数的个数、类型以及返回值类型可根据业务需要自行定义。
import org.apache.flink.table.functions.ScalarFunction; import javax.crypto.Cipher; import javax.crypto.KeyGenerator; import javax.crypto.SecretKey; import javax.crypto.spec.SecretKeySpec; import java.nio.charset.StandardCharsets; import java.security.Key; import java.security.NoSuchAlgorithmException; import java.security.SecureRandom; import java.util.Base64; public class AESUtil extends ScalarFunction { private static String DEFAULT_CIPHER_ALGORITHM = "SHA1PRNG"; private static String KEY_ALGORITHM = "AES"; private static String key = "AD42F6697B035B75"; //必须有这个方法,在这个方法里实现业务逻辑 public String eval(String str) { return encrypt(str); } /** * 加密 * * @param key * @param messBytes * @return */ private static byte[] encrypt(Key key, byte[] messBytes) throws Exception { if (key != null) { Cipher cipher = Cipher.getInstance(KEY_ALGORITHM); cipher.init(Cipher.ENCRYPT_MODE, key); return cipher.doFinal(messBytes); } return null; } /** * AES(256)解密 * * @param key * @param cipherBytes * @return */ private static byte[] decrypt(Key key, byte[] cipherBytes) throws Exception { if (key != null) { Cipher cipher = Cipher.getInstance(KEY_ALGORITHM); cipher.init(Cipher.DECRYPT_MODE, key); return cipher.doFinal(cipherBytes); } return null; } /** * 生成加密秘钥 * * @return * @throws NoSuchAlgorithmException */ private static KeyGenerator getKeyGenerator() { KeyGenerator keygen = null; try { keygen = KeyGenerator.getInstance(KEY_ALGORITHM); SecureRandom secureRandom = SecureRandom.getInstance(DEFAULT_CIPHER_ALGORITHM); secureRandom.setSeed(key.getBytes()); keygen.init(128, secureRandom); } catch (NoSuchAlgorithmException e) { } return keygen; } public static String encrypt(String message) { try { KeyGenerator keygen = getKeyGenerator(); SecretKey secretKey = new SecretKeySpec(keygen.generateKey().getEncoded(), KEY_ALGORITHM); return Base64.getEncoder().encodeToString(encrypt(secretKey, message.getBytes(StandardCharsets.UTF_8))); } catch (Exception e) { } return null; } public static String 
decrypt(String ciphertext) { try { KeyGenerator keygen = getKeyGenerator(); SecretKey secretKey = new SecretKeySpec(keygen.generateKey().getEncoded(), KEY_ALGORITHM); return new String(decrypt(secretKey, Base64.getDecoder().decode(ciphertext)), StandardCharsets.UTF_8); } catch (Exception e) { } return null; }
Flink CDC 从 MySQL 同步到 MySQL 的业务代码：
import org.apache.flink.api.common.RuntimeExecutionMode; import org.apache.flink.api.common.restartstrategy.RestartStrategies; import org.apache.flink.streaming.api.CheckpointingMode; import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment; import org.apache.flink.table.api.EnvironmentSettings; import org.apache.flink.table.api.Table; import org.apache.flink.table.api.bridge.java.StreamTableEnvironment; import org.example.util.AESUtil; public class FlinkMysqlToMysql { public static void main(String[] args) { StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(); env.setRestartStrategy(RestartStrategies.fixedDelayRestart(3, 5000)); env.enableCheckpointing(5000); env.setRuntimeMode(RuntimeExecutionMode.AUTOMATIC); env.getCheckpointConfig().setCheckpointingMode(CheckpointingMode.EXACTLY_ONCE); // 创建Table环境 EnvironmentSettings settings = EnvironmentSettings.newInstance().useBlinkPlanner().inStreamingMode().build(); StreamTableEnvironment tEnv = StreamTableEnvironment.create(env, settings); // 注册源表和目标表 tEnv.executeSql("create table sourceTable(id bigint,test VARCHAR, PRIMARY KEY (id) NOT ENFORCED ) WITH (\n" + //源表连接器一定得是mysql-cdc "'connector' = 'mysql-cdc'," + "'hostname' = 'localhost',\n" + " 'port' = '3306',\n" + " 'database-name' = 'testdb',\n" + " 'table-name' = 'flinktest',\n" + " 'username' = 'root',\n" + " 'password' = 'admin'\n" + ")"); //这里注册加密函数 tEnv.createTemporarySystemFunction("encrypt", new AESUtil()); //sql里面使用自定义函数加密 Table result = tEnv.sqlQuery("SELECT id,encrypt(test) FROM sourceTable"); tEnv.registerTable("sourceTable", result); //创建skink表 tEnv.executeSql("create table targetTable(id bigint,test VARCHAR ,PRIMARY KEY (id) NOT ENFORCED ) WITH (\n" + //目标表连接器是jdbc "'connector' = 'jdbc'," + "'url' = 'jdbc:mysql://localhost:3306/testdb?serverTimezone=UTC&useUnicode=true&characterEncoding=utf-8&useSSL=false',\n" + " 'table-name' = 'flinktest2',\n" + " 'username' = 'root',\n" + " 'driver' = 
'com.mysql.cj.jdbc.Driver',\n" + " 'password' = 'admin'\n" + ")"); // 执行CDC过程 String query = "INSERT INTO targetTable SELECT * FROM sourceTable"; tEnv.executeSql(query).print(); } }
运行结果,加密成功