使用Apache SeaTunnel进行数据库同步(MySQL to MySQL)
作者:mmseoamin日期:2023-12-13
  • Apache SeaTunnel 起到的主要作用是什么?

    目前,大数据体系里有各种各样的数据引擎,有大数据生态的 Hadoop、Hive、Kudu、Kafka、HDFS,也有泛大数据库体系的 MongoDB、Redis、ClickHouse、Doris,更有云上的 AWS S3、Redshift、BigQuery、Snowflake,还有各种各样数据生态 MySQL、PostgresSQL、IoTDB、TDEngine、Salesforce、Workday 等。我们需要工具让这些数据之间能互联互通,那么 Apache SeaTunnel 就是打通这些复杂数据源的利器,它可以简单、准确、实时地把各种数据源整合到目标数据源当中,成为大数据流动的“高速公路”。

    简单理解:数据库同步工具,类似阿里的canal

    本文介绍使用seaTunnel的MYSQL-CDC方式进行mysql与mysql相互同步


    seaTunnel官网:https://seatunnel.apache.org

    • 依赖:Java8+,hazelcast-5.0.1

      安装hazelcast-5.0.1

      wget https://repository.hazelcast.com/rpm/stable/hazelcast-rpm-stable.repo -O hazelcast-rpm-stable.repo
      sudo mv hazelcast-rpm-stable.repo /etc/yum.repos.d/
      sudo yum install hazelcast-5.0.1
      

      下载seaTunnel

      创建目录

      mkdir -p /home/seatunnel

      cd /home/seatunnel

      执行:

      export version="2.3.2"
      wget "https://archive.apache.org/dist/seatunnel/${version}/apache-seatunnel-${version}-bin.tar.gz"
      tar -xzvf "apache-seatunnel-${version}-bin.tar.gz"
      

      创建执行上下文

      vim /etc/profile.d/seatunnel.sh

      输入

      export SEATUNNEL_HOME=/home/seatunnel/apache-seatunnel-2.3.2
      export PATH=$PATH:$SEATUNNEL_HOME/bin
      

      生效配置

      source /etc/profile

      打开:$SEATUNNEL_HOME/bin/seatunnel-cluster.sh

      在第一行添加JVM配置

      JAVA_OPTS="-Xms2G -Xmx2G"

      安装插件:

      cd apache-seatunnel-2.3.2/config

      mv plugin_config plugin_config.bak

      vim plugin_config

      输入以下内容:

      --connectors-v2--
      connector-cdc-mysql
      connector-jdbc
      connector-starrocks
      --end--
      

      需要其它插件可以从plugin_config.bak中挑选

      保存后执行命令安装插件:

      sh bin/install-plugin.sh 2.3.2

      添加mysql驱动

      mysql-connector-java-8.0.30.jar 点击下载

      将mysql-connector-java-8.0.30.jar放到$SEATUNNEL_HOME/lib/目录中

      配置checkpoint存储

      cd config

      vim seatunnel.yaml

      修改namespace,fs.defaultFS 默认在tmp目录

      seatunnel:
        engine:
          backup-count: 1
          queue-type: blockingqueue
          print-execution-info-interval: 60
          print-job-metrics-info-interval: 60
          slot-service:
            dynamic-slot: true
          checkpoint:
            interval: 10000
            timeout: 60000
            max-concurrent: 5
            tolerable-failure: 2
            storage:
              type: hdfs
              max-retained: 3
              plugin-config:
                namespace: /opt/seatunnel/checkpoint_snapshot
                storage.type: hdfs
                fs.defaultFS: file:///opt/fs/ # Ensure that the directory has written permission
      

      配置MySQL CDC

      cd config
      cp v2.streaming.conf.template test.config
      vim test.config
      

      输入一下内容:

          env {
            # You can set SeaTunnel environment configuration here
            execution.parallelism = 2
            job.mode = "STREAMING"
            # 10秒检查一次,可以适当加大这个值
            checkpoint.interval = 10000
            #execution.checkpoint.interval = 10000
            #execution.checkpoint.data-uri = "hdfs://localhost:9000/checkpoint"
          }
          # 配置数据源
          source {
            MySQL-CDC {
              # 数据库账号
              username = "canal"
              password = "canal"
              # 源表,格式:数据库名.表名
              table-names = ["canalold.uc_user"]
              base-url = "jdbc:mysql://172.16.4.196:3306/canalold"
            }
          }
          # 配置目标库
          sink {
            jdbc {
              url = "jdbc:mysql://172.16.4.175:3306/canalnew"
              driver = "com.mysql.cj.jdbc.Driver"
              user = "canal"
              password = "canal"
              
              generate_sink_sql = true
      		# 目标数据库名
              database = "canalnew"
              # 目标表名
              table = "uc_user"
              # 主键名称
              primary_keys = ["id"]
            }
          }
      

      一张表一个配置