Today's big-data landscape contains a wide variety of data engines: the Hadoop ecosystem with Hive, Kudu, Kafka, and HDFS; the broader "big database" family with MongoDB, Redis, ClickHouse, and Doris; cloud services such as AWS S3, Redshift, BigQuery, and Snowflake; and many other data systems such as MySQL, PostgreSQL, IoTDB, TDengine, Salesforce, and Workday. We need tools that let data flow between all of these, and Apache SeaTunnel is exactly that kind of tool for connecting heterogeneous data sources: it can integrate various sources into a target data store simply, accurately, and in real time, acting as a "highway" for big-data movement.
In simple terms: it is a database synchronization tool, similar to Alibaba's Canal.
This article shows how to use SeaTunnel's MySQL-CDC connector to synchronize data from MySQL to MySQL.
SeaTunnel official site: https://seatunnel.apache.org
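MySQL-CDC reads the source database's binlog, so before anything else the source MySQL must have binlog enabled in ROW format, and the connector account needs replication privileges. A minimal pre-flight check, assuming the source host and the canal account used later in this article:

# Binlog must be ON and in ROW format on the source instance
mysql -h 172.16.4.196 -uroot -p -e "SHOW VARIABLES LIKE 'log_bin'; SHOW VARIABLES LIKE 'binlog_format';"

# Grant the privileges a Debezium-based CDC reader typically needs
mysql -h 172.16.4.196 -uroot -p -e "GRANT SELECT, RELOAD, SHOW DATABASES, REPLICATION SLAVE, REPLICATION CLIENT ON *.* TO 'canal'@'%'; FLUSH PRIVILEGES;"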
Install Hazelcast:
wget https://repository.hazelcast.com/rpm/stable/hazelcast-rpm-stable.repo -O hazelcast-rpm-stable.repo
sudo mv hazelcast-rpm-stable.repo /etc/yum.repos.d/
sudo yum install hazelcast-5.0.1
Create the installation directory:
mkdir -p /home/seatunnel
cd /home/seatunnel
Download and extract the distribution:
export version="2.3.2" wget "https://archive.apache.org/dist/seatunnel/${version}/apache-seatunnel-${version}-bin.tar.gz" tar -xzvf "apache-seatunnel-${version}-bin.tar.gz"
Set up the execution context (environment variables):
vim /etc/profile.d/seatunnel.sh
Add the following:
export SEATUNNEL_HOME=/home/seatunnel/apache-seatunnel-2.3.2
export PATH=$PATH:$SEATUNNEL_HOME/bin
Apply the configuration:
source /etc/profile
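To confirm the variables are in effect:

echo $SEATUNNEL_HOME    # should print /home/seatunnel/apache-seatunnel-2.3.2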
Open $SEATUNNEL_HOME/bin/seatunnel-cluster.sh and add the JVM heap settings at the top of the file:
JAVA_OPTS="-Xms2G -Xmx2G"
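After the edit, the top of the script would look roughly like this (keep the shebang as the very first line and put JAVA_OPTS right after it; the 2G heap is this article's choice and can be tuned to your machine):

#!/bin/bash
JAVA_OPTS="-Xms2G -Xmx2G"
# ... remainder of the original seatunnel-cluster.sh, unchanged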
cd apache-seatunnel-2.3.2/config
mv plugin_config plugin_config.bak
vim plugin_config
Enter the following:
--connectors-v2--
connector-cdc-mysql
connector-jdbc
connector-starrocks
--end--
If you need other connectors, pick them from plugin_config.bak.
After saving, return to $SEATUNNEL_HOME and run the installer to download the selected connectors:
cd $SEATUNNEL_HOME
sh bin/install-plugin.sh 2.3.2
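The installer fetches the connector jars from Maven; once it finishes you can verify they landed (the exact subdirectory can vary slightly between versions):

ls $SEATUNNEL_HOME/connectors/seatunnel/
# expect connector-cdc-mysql-2.3.2.jar, connector-jdbc-2.3.2.jar, connector-starrocks-2.3.2.jar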
Download mysql-connector-java-8.0.30.jar and place it in the $SEATUNNEL_HOME/lib/ directory.
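For example, the driver can be fetched straight from Maven Central:

wget https://repo1.maven.org/maven2/mysql/mysql-connector-java/8.0.30/mysql-connector-java-8.0.30.jar -P $SEATUNNEL_HOME/lib/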
cd config
vim seatunnel.yaml
Modify namespace and fs.defaultFS, which by default point at the tmp directory:
seatunnel:
  engine:
    backup-count: 1
    queue-type: blockingqueue
    print-execution-info-interval: 60
    print-job-metrics-info-interval: 60
    slot-service:
      dynamic-slot: true
    checkpoint:
      interval: 10000
      timeout: 60000
      max-concurrent: 5
      tolerable-failure: 2
      storage:
        type: hdfs
        max-retained: 3
        plugin-config:
          namespace: /opt/seatunnel/checkpoint_snapshot
          storage.type: hdfs
          fs.defaultFS: file:///opt/fs/ # Ensure that the directory has write permission
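Because fs.defaultFS here points at the local filesystem, create the checkpoint directories up front and make sure the account running SeaTunnel can write to them (paths taken from the config above):

mkdir -p /opt/fs /opt/seatunnel/checkpoint_snapshot
# adjust ownership/permissions for the user that runs the SeaTunnel cluster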
Still in the config directory, create a job config from the streaming template:
cp v2.streaming.conf.template test.config
vim test.config
Enter the following:
env {
  # You can set SeaTunnel environment configuration here
  execution.parallelism = 2
  job.mode = "STREAMING"
  # Checkpoint every 10 seconds; increase this value as appropriate
  checkpoint.interval = 10000
  #execution.checkpoint.interval = 10000
  #execution.checkpoint.data-uri = "hdfs://localhost:9000/checkpoint"
}

# Source configuration
source {
  MySQL-CDC {
    # Database credentials
    username = "canal"
    password = "canal"
    # Source table, format: database.table
    table-names = ["canalold.uc_user"]
    base-url = "jdbc:mysql://172.16.4.196:3306/canalold"
  }
}

# Target database configuration
sink {
  jdbc {
    url = "jdbc:mysql://172.16.4.175:3306/canalnew"
    driver = "com.mysql.cj.jdbc.Driver"
    user = "canal"
    password = "canal"
    generate_sink_sql = true
    # Target database name
    database = "canalnew"
    # Target table name
    table = "uc_user"
    # Primary key
    primary_keys = ["id"]
  }
}
Use one configuration file per table.
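For example, to sync a second table, copy the config and change the table references (uc_order below is a hypothetical table name):

cp test.config uc_order.config
# in uc_order.config, change:
#   table-names = ["canalold.uc_order"]
#   table       = "uc_order"
# then submit it as a separate job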
Start the cluster (run from $SEATUNNEL_HOME):
cd $SEATUNNEL_HOME
mkdir -p $SEATUNNEL_HOME/logs
nohup bin/seatunnel-cluster.sh 2>&1 &
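You can verify the server process came up with jps (the exact main-class name varies by version):

jps -l | grep -i seatunnel
# a SeaTunnel server process should be listed; startup output goes to nohup.out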
Submit the job:
nohup ./bin/seatunnel.sh --config ./config/test.config 2>&1 &
The job only needs to be submitted once.
Check the logs directory to confirm the job was submitted successfully.
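For example (log file names differ between versions; the submission client's own output lands in nohup.out in the working directory):

ls $SEATUNNEL_HOME/logs/
tail -n 100 nohup.out
grep -ri "error" $SEATUNNEL_HOME/logs/ | tail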
Cluster deployment guide: https://seatunnel.apache.org/docs/2.3.2/seatunnel-engine/deployment
Troubleshooting: if the job fails with the error "sql must not null", add the following configuration to the jdbc sink:
database = "canalnew" table = "uc_user"
If these two options are not set, the query option must be configured instead; that is, query and the two options above are alternatives (choose one), and the two options above take precedence when both are present.
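For reference, a minimal sketch of the query variant of the sink (the column list is hypothetical and must match the real uc_user schema; generate_sink_sql, database, and table are dropped in this form):

sink {
  jdbc {
    url = "jdbc:mysql://172.16.4.175:3306/canalnew"
    driver = "com.mysql.cj.jdbc.Driver"
    user = "canal"
    password = "canal"
    # hypothetical column list; replace with the actual columns of uc_user
    query = "INSERT INTO uc_user (id, name) VALUES (?, ?)"
  }
}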