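Apache SeaTunnel, formerly known as Waterdrop, was renamed SeaTunnel in October 2021 and applied to join the Apache Incubator. It has since published more than 40 releases and is used in production by many companies, including J.P.Morgan, ByteDance, Stey, China Mobile, Foxconn, Tencent Cloud, Gridsum, Zhongke Big Data Academy, 360, Shopee, Bilibili, Sina, Sogou and Vipshop. It is widely used for massive heterogeneous data integration, CDC data synchronization, SaaS data integration and multi-source data processing.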
On December 9, 2021, Apache SeaTunnel was accepted into the Apache Incubator by a unanimous vote.
On May 17, 2023, the ASF Board passed the resolution for Apache SeaTunnel's graduation, ending its 18-month incubation and making Apache SeaTunnel an Apache Top-Level Project.
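Apache SeaTunnel is a next-generation, high-performance, distributed tool for massive data integration. It supports hundreds of data sources (Database/Cloud/SaaS), handles both real-time CDC and batch synchronization of massive data, and can synchronize trillions of records stably and efficiently.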
Apache SeaTunnel is a simple, easy-to-use, high-performance data integration platform that supports both real-time streaming and offline batch processing.
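The SeaTunnel Engine is SeaTunnel's default engine, and the SeaTunnel installation package already contains everything the SeaTunnel Engine needs. Download the binary package (version 2.3.3 is used throughout this article):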
https://dlcdn.apache.org/seatunnel/2.3.3/apache-seatunnel-2.3.3-bin.tar.gz
After downloading, upload the package to the server and extract it:
# Extract into /opt/module
tar -zxvf apache-seatunnel-2.3.3-bin.tar.gz -C /opt/module
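If you script the installation instead of typing the tar command, Python's standard `tarfile` module can do the same extraction. The sketch below is a hypothetical helper (`extract_release` is not part of SeaTunnel); it assumes the archive has a single top-level directory, as the 2.3.3 package appears to, and refuses entries that would escape the target directory.

```python
import tarfile
from pathlib import Path


def extract_release(tarball: str, dest: str) -> Path:
    """Extract a .tar.gz package into dest, like `tar -zxvf <tarball> -C <dest>`.

    Returns the archive's top-level directory under dest (for the 2.3.3
    binary package this is presumably apache-seatunnel-2.3.3).
    """
    dest_root = Path(dest).resolve()
    dest_root.mkdir(parents=True, exist_ok=True)
    with tarfile.open(tarball, "r:gz") as tar:
        members = tar.getmembers()
        if not members:
            raise ValueError(f"{tarball} is empty")
        # Refuse entries that would land outside dest (absolute paths, "..").
        for member in members:
            target = (dest_root / member.name).resolve()
            if target != dest_root and dest_root not in target.parents:
                raise ValueError(f"unsafe path in archive: {member.name}")
        tar.extractall(dest_root)
    # Assumption: the first member's first path component is the package root.
    top_level = members[0].name.split("/")[0]
    return dest_root / top_level
```

For example, `extract_release("apache-seatunnel-2.3.3-bin.tar.gz", "/opt/module")` would return the directory from which the `./bin/...` commands below are run.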
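Starting with 2.2.0-beta, the binary package no longer ships the connector dependencies, so before the first use you need to run the following command from the extracted directory to install the connectors. (Alternatively, download the connectors manually from the Apache Maven Repository [https://repo.maven.apache.org/maven2/org/apache/seatunnel/] and move them into the connectors/seatunnel directory.)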
sh bin/install-plugin.sh 2.3.3
To install connectors of a specific version, for example 2.3.3, run:
sh bin/install-plugin.sh 2.3.3
You usually don't need every connector plugin, so you can list only the ones you need in config/plugin_config. For example, if you only need the connector-console plugin, change config/plugin_config to:
--seatunnel-connectors--
connector-console
--end--
For the example application to work, add the following plugins:
--seatunnel-connectors--
connector-fake
connector-console
--end--
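The plugin_config format above is simple enough to generate. The sketch below writes it from a list of connector names; `render_plugin_config` and `write_plugin_config` are hypothetical helpers, and the only format assumed is the `--seatunnel-connectors--` / `--end--` markers shown above.

```python
from pathlib import Path


def render_plugin_config(connectors: list[str]) -> str:
    """Render plugin_config content: start marker, one connector per line, end marker."""
    # Drop blanks and duplicates while keeping the order the user gave.
    unique = list(dict.fromkeys(c.strip() for c in connectors if c.strip()))
    return "\n".join(["--seatunnel-connectors--", *unique, "--end--"]) + "\n"


def write_plugin_config(seatunnel_home: str, connectors: list[str]) -> Path:
    """Write ${SEATUNNEL_HOME}/config/plugin_config and return its path."""
    path = Path(seatunnel_home) / "config" / "plugin_config"
    path.parent.mkdir(parents=True, exist_ok=True)
    path.write_text(render_plugin_config(connectors), encoding="utf-8")
    return path
```

After writing the file, `sh bin/install-plugin.sh 2.3.3` installs just the listed connectors.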
All supported connectors and their corresponding plugin_config names are listed in ${SEATUNNEL_HOME}/connectors/plugin-mapping.properties.
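To find which connector artifact a job needs, you can look the plugin names up in plugin-mapping.properties. This sketch assumes entries of the form `seatunnel.<type>.<PluginName> = <artifact>` (for example `seatunnel.source.Jdbc = connector-jdbc`); check your copy of the file before relying on it. The function names are hypothetical.

```python
def parse_plugin_mapping(text: str) -> dict[tuple[str, str], str]:
    """Map (plugin type, plugin name) -> connector artifact name.

    Assumes lines like `seatunnel.source.Jdbc = connector-jdbc`;
    blank lines, `#` comments and other keys are skipped.
    """
    mapping: dict[tuple[str, str], str] = {}
    for raw in text.splitlines():
        line = raw.strip()
        if not line or line.startswith("#") or "=" not in line:
            continue
        key, value = (part.strip() for part in line.split("=", 1))
        parts = key.split(".")
        if len(parts) != 3 or parts[0] != "seatunnel":
            continue
        _, plugin_type, name = parts
        mapping[(plugin_type, name)] = value
    return mapping


def connectors_for(mapping: dict[tuple[str, str], str],
                   plugins: list[tuple[str, str]]) -> list[str]:
    """Return the sorted, de-duplicated artifacts needed by the given plugins."""
    return sorted({mapping[p] for p in plugins})
```

The result can be fed straight into the plugin_config list, e.g. a JDBC source and a JDBC sink both resolve to a single `connector-jdbc`.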
To install V2 connector plugins manually, download only the V2 connector plugins you need and put them into ${SEATUNNEL_HOME}/connectors/seatunnel.
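For a first test we can use the official template config/v2.batch.config.template as-is. It defines three blocks: env, source and sink.
env: runtime settings, such as parallelism, job mode (BATCH/STREAMING), checkpoint settings, etc.
source: definition of the data source
sink: definition of the target the data is written to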
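The env/source/sink layout is nested HOCON, so a job file can be generated from nested dictionaries. The sketch below is a hypothetical renderer (`to_hocon` is not a SeaTunnel API). The `JOB` options are modeled on what the batch template appears to contain (a FakeSource feeding a Console sink); treat them as assumptions and compare with your copy of the template.

```python
import json


def to_hocon(block: dict, indent: int = 0) -> str:
    """Render nested dicts as HOCON-style blocks like SeaTunnel job files."""
    pad = "  " * indent
    lines = []
    for key, value in block.items():
        if isinstance(value, dict):
            lines.append(f"{pad}{key} {{")
            inner = to_hocon(value, indent + 1)
            if inner:
                lines.append(inner)
            lines.append(f"{pad}}}")
        else:
            # json.dumps yields HOCON-compatible literals: "text", 16, true
            lines.append(f"{pad}{key} = {json.dumps(value)}")
    return "\n".join(lines)


# Assumed options, modeled on config/v2.batch.config.template.
JOB = {
    "env": {"parallelism": 2, "job.mode": "BATCH"},
    "source": {
        "FakeSource": {
            "result_table_name": "fake",
            "row.num": 16,
            "schema": {"fields": {"name": "string", "age": "int"}},
        }
    },
    "sink": {"Console": {}},
}

if __name__ == "__main__":
    print(to_hocon(JOB))
```

Keys such as `job.mode` are written unquoted, which HOCON reads as a path; this matches how the template itself writes them.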
Run the job:
./bin/seatunnel.sh --config ./config/v2.batch.config.template -e local
With 2.3.3, the job fails after starting because a class is missing: java.lang.NoClassDefFoundError: com/sun/jersey/client/impl/CopyOnWriteHashMap
Error message:
2023-11-08 17:47:32,243 ERROR org.apache.seatunnel.core.starter.SeaTunnel - Fatal Error,
2023-11-08 17:47:32,243 ERROR org.apache.seatunnel.core.starter.SeaTunnel - Please submit bug report in https://github.com/apache/seatunnel/issues
2023-11-08 17:47:32,243 ERROR org.apache.seatunnel.core.starter.SeaTunnel - Reason:SeaTunnel job executed failed
2023-11-08 17:47:32,244 ERROR org.apache.seatunnel.core.starter.SeaTunnel - Exception StackTrace:org.apache.seatunnel.core.starter.exception.CommandExecuteException: SeaTunnel job executed failed
    at org.apache.seatunnel.core.starter.seatunnel.command.ClientExecuteCommand.execute(ClientExecuteCommand.java:191)
    at org.apache.seatunnel.core.starter.SeaTunnel.run(SeaTunnel.java:40)
    at org.apache.seatunnel.core.starter.seatunnel.SeaTunnelClient.main(SeaTunnelClient.java:34)
Caused by: java.util.concurrent.CompletionException: java.lang.NoClassDefFoundError: com/sun/jersey/client/impl/CopyOnWriteHashMap
    at com.hazelcast.spi.impl.AbstractInvocationFuture.wrapInCompletionException(AbstractInvocationFuture.java:1347)
    at com.hazelcast.spi.impl.AbstractInvocationFuture.cascadeException(AbstractInvocationFuture.java:1340)
    at com.hazelcast.spi.impl.AbstractInvocationFuture.access$200(AbstractInvocationFuture.java:65)
    at com.hazelcast.spi.impl.AbstractInvocationFuture$ApplyNode.execute(AbstractInvocationFuture.java:1478)
    at com.hazelcast.spi.impl.AbstractInvocationFuture.unblockOtherNode(AbstractInvocationFuture.java:797)
    at com.hazelcast.spi.impl.AbstractInvocationFuture.unblockAll(AbstractInvocationFuture.java:759)
    at com.hazelcast.spi.impl.AbstractInvocationFuture.complete0(AbstractInvocationFuture.java:1235)
    at com.hazelcast.spi.impl.AbstractInvocationFuture.completeExceptionallyInternal(AbstractInvocationFuture.java:1223)
    at com.hazelcast.spi.impl.AbstractInvocationFuture.completeExceptionally(AbstractInvocationFuture.java:709)
    at com.hazelcast.client.impl.spi.impl.ClientInvocation.completeExceptionally(ClientInvocation.java:294)
    at com.hazelcast.client.impl.spi.impl.ClientInvocation.notifyExceptionWithOwnedPermission(ClientInvocation.java:321)
    at com.hazelcast.client.impl.spi.impl.ClientInvocation.notifyException(ClientInvocation.java:304)
    at com.hazelcast.client.impl.spi.impl.ClientResponseHandlerSupplier.handleResponse(ClientResponseHandlerSupplier.java:164)
    at com.hazelcast.client.impl.spi.impl.ClientResponseHandlerSupplier.process(ClientResponseHandlerSupplier.java:141)
    at com.hazelcast.client.impl.spi.impl.ClientResponseHandlerSupplier.access$300(ClientResponseHandlerSupplier.java:60)
    at com.hazelcast.client.impl.spi.impl.ClientResponseHandlerSupplier$DynamicResponseHandler.accept(ClientResponseHandlerSupplier.java:251)
    at com.hazelcast.client.impl.spi.impl.ClientResponseHandlerSupplier$DynamicResponseHandler.accept(ClientResponseHandlerSupplier.java:243)
    at com.hazelcast.client.impl.connection.tcp.TcpClientConnection.handleClientMessage(TcpClientConnection.java:245)
    at com.hazelcast.client.impl.protocol.util.ClientMessageDecoder.handleMessage(ClientMessageDecoder.java:135)
    at com.hazelcast.client.impl.protocol.util.ClientMessageDecoder.onRead(ClientMessageDecoder.java:89)
    at com.hazelcast.internal.networking.nio.NioInboundPipeline.process(NioInboundPipeline.java:136)
    at com.hazelcast.internal.networking.nio.NioThread.processSelectionKey(NioThread.java:383)
    at com.hazelcast.internal.networking.nio.NioThread.processSelectionKeys(NioThread.java:368)
    at com.hazelcast.internal.networking.nio.NioThread.selectLoop(NioThread.java:294)
    at com.hazelcast.internal.networking.nio.NioThread.executeRun(NioThread.java:249)
    at com.hazelcast.internal.util.executor.HazelcastManagedThread.run(HazelcastManagedThread.java:102)
Caused by: java.lang.NoClassDefFoundError: com/sun/jersey/client/impl/CopyOnWriteHashMap
    at org.apache.seatunnel.engine.server.checkpoint.CheckpointPlan$Builder.<init>(CheckpointPlan.java:66)
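In a long log the real cause is buried under the Hazelcast frames. A small scan for `NoClassDefFoundError` pulls out the missing classes. This is a hypothetical helper; the only class-to-jar mapping it knows is the one from this article, and anything else is reported as unknown rather than guessed.

```python
import re

# Only this entry comes from the article; extend it as you hit other errors.
KNOWN_FIXES = {
    "com/sun/jersey/client/impl/CopyOnWriteHashMap":
        "put seatunnel-hadoop3-3.1.4-uber-2.3.2-optional.jar into $SEATUNNEL_HOME/lib",
}

MISSING_CLASS = re.compile(r"NoClassDefFoundError: ([\w/$.]+)")


def diagnose(log_text: str) -> list[tuple[str, str]]:
    """Return (missing class, suggested fix) pairs, each class reported once."""
    found: dict[str, str] = {}
    for raw in MISSING_CLASS.findall(log_text):
        # Normalize "a.b.C" and trailing sentence dots to the "a/b/C" form.
        cls = raw.rstrip(".").replace(".", "/")
        found.setdefault(cls, KNOWN_FIXES.get(cls, "unknown: find which jar provides this class"))
    return list(found.items())
```

Run on the error above, it reports the Jersey class once even though the stack trace mentions it twice.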
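Solution: download the following jar.
Download link: [https://repo.maven.apache.org/maven2/org/apache/seatunnel/seatunnel-hadoop3-3.1.4-uber/2.3.2/seatunnel-hadoop3-3.1.4-uber-2.3.2-optional.jar]
Put the jar into $SEATUNNEL_HOME/lib. (Note that this is the 2.3.2 build of the jar, used here with SeaTunnel 2.3.3.)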
Run it again:
./bin/seatunnel.sh --config ./config/v2.batch.config.template -e local
Output:
2023-11-08 17:55:32,575 INFO org.apache.seatunnel.engine.server.checkpoint.CheckpointCoordinator - wait checkpoint completed: 9223372036854775807 2023-11-08 17:55:32,621 INFO org.apache.seatunnel.engine.server.checkpoint.CheckpointCoordinator - pending checkpoint(9223372036854775807/1@774572734674894849) notify finished! 2023-11-08 17:55:32,621 INFO org.apache.seatunnel.engine.server.checkpoint.CheckpointCoordinator - start notify checkpoint completed, checkpoint:org.apache.seatunnel.engine.server.checkpoint.CompletedCheckpoint@7e4b2ce6 2023-11-08 17:55:32,627 INFO org.apache.seatunnel.engine.server.checkpoint.CheckpointCoordinator - start clean pending checkpoint cause CheckpointCoordinator completed. 2023-11-08 17:55:32,628 INFO org.apache.seatunnel.engine.server.checkpoint.CheckpointCoordinator - Turn checkpoint_state_774572734674894849_1 state from null to FINISHED 2023-11-08 17:55:32,672 INFO org.apache.seatunnel.engine.server.TaskExecutionService - [localhost]:5801 [seatunnel-610439] [5.1] taskDone, taskId = 20000, taskGroup = TaskGroupLocation{jobId=774572734674894849, pipelineId=1, taskGroupId=1} 2023-11-08 17:55:32,672 INFO org.apache.seatunnel.engine.server.TaskExecutionService - [localhost]:5801 [seatunnel-610439] [5.1] Task TaskGroupLocation{jobId=774572734674894849, pipelineId=1, taskGroupId=1} complete with state FINISHED 2023-11-08 17:55:32,672 INFO org.apache.seatunnel.engine.server.CoordinatorService - [localhost]:5801 [seatunnel-610439] [5.1] Received task end from execution TaskGroupLocation{jobId=774572734674894849, pipelineId=1, taskGroupId=1}, state FINISHED 2023-11-08 17:55:32,674 INFO org.apache.seatunnel.engine.server.dag.physical.PhysicalVertex - Job SeaTunnel_Job (774572734674894849), Pipeline: [(1/1)], task: [pipeline-1 [Source[0]-FakeSource-fake]-SplitEnumerator (1/1)] turn to end state FINISHED. 
2023-11-08 17:55:32,674 INFO org.apache.seatunnel.engine.server.dag.physical.PhysicalVertex - Job SeaTunnel_Job (774572734674894849), Pipeline: [(1/1)], task: [pipeline-1 [Source[0]-FakeSource-fake]-SplitEnumerator (1/1)] end with state FINISHED 2023-11-08 17:55:32,684 INFO org.apache.seatunnel.engine.server.TaskExecutionService - [localhost]:5801 [seatunnel-610439] [5.1] taskDone, taskId = 50000, taskGroup = TaskGroupLocation{jobId=774572734674894849, pipelineId=1, taskGroupId=30000} 2023-11-08 17:55:32,684 INFO org.apache.seatunnel.engine.server.TaskExecutionService - [localhost]:5801 [seatunnel-610439] [5.1] taskDone, taskId = 50001, taskGroup = TaskGroupLocation{jobId=774572734674894849, pipelineId=1, taskGroupId=30001} 2023-11-08 17:55:33,480 INFO org.apache.seatunnel.engine.server.TaskExecutionService - [localhost]:5801 [seatunnel-610439] [5.1] taskDone, taskId = 40001, taskGroup = TaskGroupLocation{jobId=774572734674894849, pipelineId=1, taskGroupId=30001} 2023-11-08 17:55:33,480 INFO org.apache.seatunnel.engine.server.TaskExecutionService - [localhost]:5801 [seatunnel-610439] [5.1] taskDone, taskId = 40000, taskGroup = TaskGroupLocation{jobId=774572734674894849, pipelineId=1, taskGroupId=30000} 2023-11-08 17:55:33,480 INFO org.apache.seatunnel.engine.server.TaskExecutionService - [localhost]:5801 [seatunnel-610439] [5.1] Task TaskGroupLocation{jobId=774572734674894849, pipelineId=1, taskGroupId=30001} complete with state FINISHED 2023-11-08 17:55:33,480 INFO org.apache.seatunnel.engine.server.TaskExecutionService - [localhost]:5801 [seatunnel-610439] [5.1] Task TaskGroupLocation{jobId=774572734674894849, pipelineId=1, taskGroupId=30000} complete with state FINISHED 2023-11-08 17:55:33,480 INFO org.apache.seatunnel.engine.server.CoordinatorService - [localhost]:5801 [seatunnel-610439] [5.1] Received task end from execution TaskGroupLocation{jobId=774572734674894849, pipelineId=1, taskGroupId=30001}, state FINISHED 2023-11-08 17:55:33,480 INFO 
org.apache.seatunnel.engine.server.CoordinatorService - [localhost]:5801 [seatunnel-610439] [5.1] Received task end from execution TaskGroupLocation{jobId=774572734674894849, pipelineId=1, taskGroupId=30000}, state FINISHED 2023-11-08 17:55:33,482 INFO org.apache.seatunnel.engine.server.dag.physical.PhysicalVertex - Job SeaTunnel_Job (774572734674894849), Pipeline: [(1/1)], task: [pipeline-1 [Source[0]-FakeSource-fake]-SourceTask (1/2)] turn to end state FINISHED. 2023-11-08 17:55:33,482 INFO org.apache.seatunnel.engine.server.dag.physical.PhysicalVertex - Job SeaTunnel_Job (774572734674894849), Pipeline: [(1/1)], task: [pipeline-1 [Source[0]-FakeSource-fake]-SourceTask (2/2)] turn to end state FINISHED. 2023-11-08 17:55:33,482 INFO org.apache.seatunnel.engine.server.dag.physical.PhysicalVertex - Job SeaTunnel_Job (774572734674894849), Pipeline: [(1/1)], task: [pipeline-1 [Source[0]-FakeSource-fake]-SourceTask (1/2)] end with state FINISHED 2023-11-08 17:55:33,482 INFO org.apache.seatunnel.engine.server.dag.physical.PhysicalVertex - Job SeaTunnel_Job (774572734674894849), Pipeline: [(1/1)], task: [pipeline-1 [Source[0]-FakeSource-fake]-SourceTask (2/2)] end with state FINISHED 2023-11-08 17:55:33,482 INFO org.apache.seatunnel.engine.server.dag.physical.SubPlan - Job SeaTunnel_Job (774572734674894849), Pipeline: [(1/1)] end with state FINISHED 2023-11-08 17:55:33,506 INFO org.apache.seatunnel.engine.server.master.JobMaster - release the pipeline Job SeaTunnel_Job (774572734674894849), Pipeline: [(1/1)] resource 2023-11-08 17:55:33,507 INFO org.apache.seatunnel.engine.server.service.slot.DefaultSlotService - received slot release request, jobID: 774572734674894849, slot: SlotProfile{worker=[localhost]:5801, slotID=1, ownerJobID=774572734674894849, assigned=true, resourceProfile=ResourceProfile{cpu=CPU{core=0}, heapMemory=Memory{bytes=0}}, sequence='82bd6da9-17b1-41ba-b745-9f6d4fea5378'} 2023-11-08 17:55:33,507 INFO 
org.apache.seatunnel.engine.server.service.slot.DefaultSlotService - received slot release request, jobID: 774572734674894849, slot: SlotProfile{worker=[localhost]:5801, slotID=2, ownerJobID=774572734674894849, assigned=true, resourceProfile=ResourceProfile{cpu=CPU{core=0}, heapMemory=Memory{bytes=0}}, sequence='82bd6da9-17b1-41ba-b745-9f6d4fea5378'} 2023-11-08 17:55:33,507 INFO org.apache.seatunnel.engine.server.service.slot.DefaultSlotService - received slot release request, jobID: 774572734674894849, slot: SlotProfile{worker=[localhost]:5801, slotID=3, ownerJobID=774572734674894849, assigned=true, resourceProfile=ResourceProfile{cpu=CPU{core=0}, heapMemory=Memory{bytes=0}}, sequence='82bd6da9-17b1-41ba-b745-9f6d4fea5378'} 2023-11-08 17:55:33,510 INFO org.apache.seatunnel.engine.server.dag.physical.SubPlan - Job SeaTunnel_Job (774572734674894849), Pipeline: [(1/1)] turn to end state FINISHED. 2023-11-08 17:55:33,511 INFO org.apache.seatunnel.engine.server.dag.physical.PhysicalPlan - Job SeaTunnel_Job (774572734674894849) end with state FINISHED 2023-11-08 17:55:33,523 INFO org.apache.seatunnel.engine.client.job.ClientJobProxy - Job (774572734674894849) end with state FINISHED 2023-11-08 17:55:33,546 INFO org.apache.seatunnel.core.starter.seatunnel.command.ClientExecuteCommand - *********************************************** Job Statistic Information *********************************************** Start Time : 2023-11-08 17:55:30 End Time : 2023-11-08 17:55:33 Total Time(s) : 2 Total Read Count : 32 Total Write Count : 32 Total Failed Count : 0 *********************************************** 2023-11-08 17:55:33,546 INFO com.hazelcast.core.LifecycleService - hz.client_1 [seatunnel-610439] [5.1] HazelcastClient 5.1 (20220228 - 21f20e7) is SHUTTING_DOWN 2023-11-08 17:55:33,548 INFO com.hazelcast.internal.server.tcp.TcpServerConnection - [localhost]:5801 [seatunnel-610439] [5.1] Connection[id=1, /127.0.0.1:5801->/127.0.0.1:33602, qualifier=null, 
endpoint=[127.0.0.1]:33602, remoteUuid=38fbe3e1-876c-48e9-b145-1989888393ab, alive=false, connectionType=JVM, planeIndex=-1] closed. Reason: Connection closed by the other side 2023-11-08 17:55:33,548 INFO com.hazelcast.client.impl.connection.ClientConnectionManager - hz.client_1 [seatunnel-610439] [5.1] Removed connection to endpoint: [localhost]:5801:6f7f4921-7d71-42af-b26a-6f11a7960118, connection: ClientConnection{alive=false, connectionId=1, channel=NioChannel{/127.0.0.1:33602->localhost/127.0.0.1:5801}, remoteAddress=[localhost]:5801, lastReadTime=2023-11-08 17:55:33.543, lastWriteTime=2023-11-08 17:55:33.523, closedTime=2023-11-08 17:55:33.547, connected server version=5.1} 2023-11-08 17:55:33,548 INFO com.hazelcast.core.LifecycleService - hz.client_1 [seatunnel-610439] [5.1] HazelcastClient 5.1 (20220228 - 21f20e7) is CLIENT_DISCONNECTED 2023-11-08 17:55:33,549 INFO com.hazelcast.client.impl.ClientEndpointManager - [localhost]:5801 [seatunnel-610439] [5.1] Destroying ClientEndpoint{connection=Connection[id=1, /127.0.0.1:5801->/127.0.0.1:33602, qualifier=null, endpoint=[127.0.0.1]:33602, remoteUuid=38fbe3e1-876c-48e9-b145-1989888393ab, alive=false, connectionType=JVM, planeIndex=-1], clientUuid=38fbe3e1-876c-48e9-b145-1989888393ab, clientName=hz.client_1, authenticated=true, clientVersion=5.1, creationTime=1699437330660, latest 
clientAttributes=lastStatisticsCollectionTime=1699437330713,enterprise=false,clientType=JVM,clientVersion=5.1,clusterConnectionTimestamp=1699437330653,clientAddress=127.0.0.1,clientName=hz.client_1,credentials.principal=null,os.committedVirtualMemorySize=7048228864,os.freePhysicalMemorySize=15730012160,os.freeSwapSpaceSize=0,os.maxFileDescriptorCount=65535,os.openFileDescriptorCount=51,os.processCpuTime=5290000000,os.systemLoadAverage=0.86,os.totalPhysicalMemorySize=33566306304,os.totalSwapSpaceSize=0,runtime.availableProcessors=8,runtime.freeMemory=994659352,runtime.maxMemory=1029177344,runtime.totalMemory=1029177344,runtime.uptime=2424,runtime.usedMemory=34517992, labels=[]} 2023-11-08 17:55:33,550 INFO com.hazelcast.core.LifecycleService - hz.client_1 [seatunnel-610439] [5.1] HazelcastClient 5.1 (20220228 - 21f20e7) is SHUTDOWN 2023-11-08 17:55:33,550 INFO org.apache.seatunnel.core.starter.seatunnel.command.ClientExecuteCommand - Closed SeaTunnel client...... 2023-11-08 17:55:33,551 INFO com.hazelcast.core.LifecycleService - [localhost]:5801 [seatunnel-610439] [5.1] [localhost]:5801 is SHUTTING_DOWN 2023-11-08 17:55:33,553 INFO com.hazelcast.internal.partition.impl.MigrationManager - [localhost]:5801 [seatunnel-610439] [5.1] Shutdown request of Member [localhost]:5801 - 6f7f4921-7d71-42af-b26a-6f11a7960118 this is handled 2023-11-08 17:55:33,557 INFO com.hazelcast.instance.impl.Node - [localhost]:5801 [seatunnel-610439] [5.1] Shutting down connection manager... 2023-11-08 17:55:33,558 INFO com.hazelcast.instance.impl.Node - [localhost]:5801 [seatunnel-610439] [5.1] Shutting down node engine... 2023-11-08 17:55:35,980 INFO com.hazelcast.instance.impl.NodeExtension - [localhost]:5801 [seatunnel-610439] [5.1] Destroying node NodeExtension. 2023-11-08 17:55:35,980 INFO com.hazelcast.instance.impl.Node - [localhost]:5801 [seatunnel-610439] [5.1] Hazelcast Shutdown is completed in 2427 ms. 
2023-11-08 17:55:35,980 INFO com.hazelcast.core.LifecycleService - [localhost]:5801 [seatunnel-610439] [5.1] [localhost]:5801 is SHUTDOWN 2023-11-08 17:55:35,980 INFO org.apache.seatunnel.core.starter.seatunnel.command.ClientExecuteCommand - Closed HazelcastInstance ...... 2023-11-08 17:55:35,981 INFO org.apache.seatunnel.core.starter.seatunnel.command.ClientExecuteCommand - Closed metrics executor service ...... 2023-11-08 17:55:35,981 INFO org.apache.seatunnel.core.starter.seatunnel.command.ClientExecuteCommand - run shutdown hook because get close signal
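If the log ends like the above (the job ends with state FINISHED, Total Read Count equals Total Write Count, and Total Failed Count is 0), the setup works.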
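The same check can be automated by reading the "Job Statistic Information" banner from the log. The sketch below assumes the counters are printed as `Total Read Count : 32` and so on, as in the output above; the function names are hypothetical.

```python
import re

STAT_LINE = re.compile(r"(Total Read Count|Total Write Count|Total Failed Count)\s*:\s*(\d+)")


def job_statistics(log_text: str) -> dict[str, int]:
    """Extract the counters printed in the 'Job Statistic Information' banner."""
    return {name: int(value) for name, value in STAT_LINE.findall(log_text)}


def job_succeeded(log_text: str) -> bool:
    """True if the job FINISHED with no failures and read count == write count."""
    stats = job_statistics(log_text)
    required = {"Total Read Count", "Total Write Count", "Total Failed Count"}
    if not required.issubset(stats):
        return False
    return (
        "end with state FINISHED" in log_text
        and stats["Total Failed Count"] == 0
        and stats["Total Read Count"] == stats["Total Write Count"]
    )
```

If a log contains several jobs, the dictionary keeps the last banner's values, which is usually the one you care about in a single-job run.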
Next, let's test a MySQL-to-MySQL synchronization with SeaTunnel.
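First, install the JDBC connector: either add connector-jdbc to config/plugin_config and rerun bin/install-plugin.sh, or put the connector-jdbc jar directly into $SEATUNNEL_HOME/connectors/seatunnel. The MySQL JDBC driver jar itself must also be available; for the SeaTunnel engine it goes into $SEATUNNEL_HOME/lib.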
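A quick pre-flight check avoids a failed run caused by a missing jar. This hypothetical helper only looks for files by name; the patterns `connector-jdbc-*.jar` and `mysql-connector-*.jar` are assumptions about how the connector and driver jars are named (they match e.g. `connector-jdbc-2.3.3.jar` and `mysql-connector-j-8.0.33.jar`).

```python
from pathlib import Path


def _has_jar(directory: Path, pattern: str) -> bool:
    """True if directory exists and contains at least one file matching pattern."""
    return directory.is_dir() and any(directory.glob(pattern))


def check_jdbc_setup(seatunnel_home: str) -> list[str]:
    """Return a list of problems; an empty list means both jars were found."""
    home = Path(seatunnel_home)
    problems = []
    if not _has_jar(home / "connectors" / "seatunnel", "connector-jdbc-*.jar"):
        problems.append("connector-jdbc jar missing from connectors/seatunnel")
    if not _has_jar(home / "lib", "mysql-connector-*.jar"):
        problems.append("MySQL JDBC driver jar missing from lib")
    return problems
```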
Create the source and target databases and tables:
create database test_01;
create table test_01.user(userid int(4) primary key not null auto_increment, username varchar(16) not null);
create database test_02;
create table test_02.user(userid int(4) primary key not null auto_increment, username varchar(16) not null);
Insert some test rows into the source table:
insert into test_01.user (username) values ('zhangsan');
insert into test_01.user (username) values ('lisi');
Save the following job definition as config/mysql-to-mysql.conf:
env {
  job.mode = "BATCH"
}
# Source database
source {
  jdbc {
    url = "jdbc:mysql://172.1.1.54:3306/test_01"
    driver = "com.mysql.cj.jdbc.Driver"
    user = "admin"
    password = "xxxxxxx"
    generate_sink_sql = true
    database = "test_01"
    table = "user"
    query = "select * from test_01.user"
  }
}
transform {
}
# Target database
sink {
  jdbc {
    url = "jdbc:mysql://172.1.1.54:3306/test_02"
    driver = "com.mysql.cj.jdbc.Driver"
    user = "admin"
    password = "xxxxxx"
    generate_sink_sql = true
    database = "test_02"
    table = "user"
  }
}
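Hard-coding the password in a config you copy around is easy to regret. The sketch below renders an equivalent job file from parameters and reads the password from an environment variable (the name `MYSQL_PASSWORD` and all function names are hypothetical). It leaves `generate_sink_sql`, `database` and `table` out of the source block, on the assumption that these are sink-side JDBC options; the rendered file still contains the password, so it is written with owner-only permissions.

```python
import json
import os

MYSQL_DRIVER = "com.mysql.cj.jdbc.Driver"


def _jdbc_url(host: str, database: str, port: int = 3306) -> str:
    return f"jdbc:mysql://{host}:{port}/{database}"


def _jdbc_block(options: dict) -> str:
    # json.dumps quotes strings and renders True as true, matching HOCON.
    body = "\n".join(f"    {key} = {json.dumps(value)}" for key, value in options.items())
    return "  jdbc {\n" + body + "\n  }"


def mysql_to_mysql_conf(host, src_db, dst_db, table, user, password_env="MYSQL_PASSWORD"):
    """Render a batch MySQL -> MySQL job like the one above."""
    # Read the password from the environment instead of hard-coding it here.
    password = os.environ[password_env]
    common = {"driver": MYSQL_DRIVER, "user": user, "password": password}
    source = _jdbc_block({"url": _jdbc_url(host, src_db), **common,
                          "query": f"select * from {src_db}.{table}"})
    sink = _jdbc_block({"url": _jdbc_url(host, dst_db), **common,
                        "generate_sink_sql": True, "database": dst_db, "table": table})
    return (
        'env {\n  job.mode = "BATCH"\n}\n'
        f"source {{\n{source}\n}}\n"
        f"sink {{\n{sink}\n}}\n"
    )


def write_private(path: str, text: str) -> None:
    """Create/overwrite path with mode 0600 (new files only) and write text."""
    fd = os.open(path, os.O_WRONLY | os.O_CREAT | os.O_TRUNC, 0o600)
    with os.fdopen(fd, "w", encoding="utf-8") as f:
        f.write(text)
```

For example, `write_private("config/mysql-to-mysql.conf", mysql_to_mysql_conf("172.1.1.54", "test_01", "test_02", "user", "admin"))` produces a file usable with the command below.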
Run:
./bin/seatunnel.sh -e LOCAL -c ./config/mysql-to-mysql.conf
Output like the following means the synchronization succeeded:
2023-11-08 20:55:00,468 INFO org.apache.seatunnel.engine.server.service.slot.DefaultSlotService - received slot release request, jobID: 774617897816293377, slot: SlotProfile{worker=[localhost]:5801, slotID=1, ownerJobID=774617897816293377, assigned=true, resourceProfile=ResourceProfile{cpu=CPU{core=0}, heapMemory=Memory{bytes=0}}, sequence='36c05043-4938-47e0-927c-94e7cac4f749'} 2023-11-08 20:55:00,468 INFO org.apache.seatunnel.engine.server.service.slot.DefaultSlotService - received slot release request, jobID: 774617897816293377, slot: SlotProfile{worker=[localhost]:5801, slotID=2, ownerJobID=774617897816293377, assigned=true, resourceProfile=ResourceProfile{cpu=CPU{core=0}, heapMemory=Memory{bytes=0}}, sequence='36c05043-4938-47e0-927c-94e7cac4f749'} 2023-11-08 20:55:00,470 INFO org.apache.seatunnel.engine.server.dag.physical.SubPlan - Job SeaTunnel_Job (774617897816293377), Pipeline: [(1/1)] turn to end state FINISHED. 2023-11-08 20:55:00,471 INFO org.apache.seatunnel.engine.server.dag.physical.PhysicalPlan - Job SeaTunnel_Job (774617897816293377) end with state FINISHED 2023-11-08 20:55:00,483 INFO org.apache.seatunnel.engine.client.job.ClientJobProxy - Job (774617897816293377) end with state FINISHED 2023-11-08 20:55:00,511 INFO org.apache.seatunnel.core.starter.seatunnel.command.ClientExecuteCommand - *********************************************** Job Statistic Information *********************************************** Start Time : 2023-11-08 20:54:58 End Time : 2023-11-08 20:55:00 Total Time(s) : 1 Total Read Count : 2 Total Write Count : 2 Total Failed Count : 0 *********************************************** 2023-11-08 20:55:00,511 INFO com.hazelcast.core.LifecycleService - hz.client_1 [seatunnel-250845] [5.1] HazelcastClient 5.1 (20220228 - 21f20e7) is SHUTTING_DOWN 2023-11-08 20:55:00,514 INFO com.hazelcast.internal.server.tcp.TcpServerConnection - [localhost]:5801 [seatunnel-250845] [5.1] Connection[id=1, /127.0.0.1:5801->/127.0.0.1:33254,
qualifier=null, endpoint=[127.0.0.1]:33254, remoteUuid=332c6983-64e1-4d2a-8d4b-7b40d9677a39, alive=false, connectionType=JVM, planeIndex=-1] closed. Reason: Connection closed by the other side 2023-11-08 20:55:00,514 INFO com.hazelcast.client.impl.connection.ClientConnectionManager - hz.client_1 [seatunnel-250845] [5.1] Removed connection to endpoint: [localhost]:5801:1373b501-f19f-47a2-9ef2-66d14e5a31c0, connection: ClientConnection{alive=false, connectionId=1, channel=NioChannel{/127.0.0.1:33254->localhost/127.0.0.1:5801}, remoteAddress=[localhost]:5801, lastReadTime=2023-11-08 20:55:00.508, lastWriteTime=2023-11-08 20:55:00.483, closedTime=2023-11-08 20:55:00.512, connected server version=5.1} 2023-11-08 20:55:00,515 INFO com.hazelcast.core.LifecycleService - hz.client_1 [seatunnel-250845] [5.1] HazelcastClient 5.1 (20220228 - 21f20e7) is CLIENT_DISCONNECTED 2023-11-08 20:55:00,516 INFO com.hazelcast.client.impl.ClientEndpointManager - [localhost]:5801 [seatunnel-250845] [5.1] Destroying ClientEndpoint{connection=Connection[id=1, /127.0.0.1:5801->/127.0.0.1:33254, qualifier=null, endpoint=[127.0.0.1]:33254, remoteUuid=332c6983-64e1-4d2a-8d4b-7b40d9677a39, alive=false, connectionType=JVM, planeIndex=-1], clientUuid=332c6983-64e1-4d2a-8d4b-7b40d9677a39, clientName=hz.client_1, authenticated=true, clientVersion=5.1, creationTime=1699448098390, latest 
clientAttributes=lastStatisticsCollectionTime=1699448098443,enterprise=false,clientType=JVM,clientVersion=5.1,clusterConnectionTimestamp=1699448098383,clientAddress=127.0.0.1,clientName=hz.client_1,credentials.principal=null,os.committedVirtualMemorySize=7048228864,os.freePhysicalMemorySize=12655243264,os.freeSwapSpaceSize=0,os.maxFileDescriptorCount=65535,os.openFileDescriptorCount=51,os.processCpuTime=5340000000,os.systemLoadAverage=0.24,os.totalPhysicalMemorySize=33566306304,os.totalSwapSpaceSize=0,runtime.availableProcessors=8,runtime.freeMemory=994543624,runtime.maxMemory=1029177344,runtime.totalMemory=1029177344,runtime.uptime=2526,runtime.usedMemory=34633720, labels=[]} 2023-11-08 20:55:00,516 INFO com.hazelcast.core.LifecycleService - hz.client_1 [seatunnel-250845] [5.1] HazelcastClient 5.1 (20220228 - 21f20e7) is SHUTDOWN 2023-11-08 20:55:00,516 INFO org.apache.seatunnel.core.starter.seatunnel.command.ClientExecuteCommand - Closed SeaTunnel client...... 2023-11-08 20:55:00,517 INFO com.hazelcast.core.LifecycleService - [localhost]:5801 [seatunnel-250845] [5.1] [localhost]:5801 is SHUTTING_DOWN 2023-11-08 20:55:00,520 INFO com.hazelcast.internal.partition.impl.MigrationManager - [localhost]:5801 [seatunnel-250845] [5.1] Shutdown request of Member [localhost]:5801 - 1373b501-f19f-47a2-9ef2-66d14e5a31c0 this is handled 2023-11-08 20:55:00,523 INFO com.hazelcast.instance.impl.Node - [localhost]:5801 [seatunnel-250845] [5.1] Shutting down connection manager... 2023-11-08 20:55:00,525 INFO com.hazelcast.instance.impl.Node - [localhost]:5801 [seatunnel-250845] [5.1] Shutting down node engine... 2023-11-08 20:55:03,377 INFO com.hazelcast.instance.impl.NodeExtension - [localhost]:5801 [seatunnel-250845] [5.1] Destroying node NodeExtension. 2023-11-08 20:55:03,377 INFO com.hazelcast.instance.impl.Node - [localhost]:5801 [seatunnel-250845] [5.1] Hazelcast Shutdown is completed in 2858 ms. 
2023-11-08 20:55:03,378 INFO com.hazelcast.core.LifecycleService - [localhost]:5801 [seatunnel-250845] [5.1] [localhost]:5801 is SHUTDOWN 2023-11-08 20:55:03,378 INFO org.apache.seatunnel.core.starter.seatunnel.command.ClientExecuteCommand - Closed HazelcastInstance ...... 2023-11-08 20:55:03,378 INFO org.apache.seatunnel.core.starter.seatunnel.command.ClientExecuteCommand - Closed metrics executor service ...... 2023-11-08 20:55:03,379 INFO org.apache.seatunnel.core.starter.seatunnel.command.ClientExecuteCommand - run shutdown hook because get close signal
Check the target table:
MySQL [(none)]> select * from test_02.user;
+--------+----------+
| userid | username |
+--------+----------+
|      1 | zhangsan |
|      2 | lisi     |
+--------+----------+
2 rows in set (0.00 sec)
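Eyeballing two rows is fine here; for bigger tables a row-by-row comparison is more reliable. The sketch works with any Python DB-API connection that can see both schemas. With MySQL that could be, for instance, a `pymysql.connect(host=..., user=..., password=...)` connection (an assumption: any DB-API driver should do). The test below uses SQLite's `ATTACH` as a stand-in for the two databases. Function names are hypothetical, and table/key names are interpolated directly, so pass only trusted identifiers.

```python
def fetch_rows(conn, table: str, key: str) -> list[tuple]:
    """Return all rows of table ordered by key, as a list of tuples."""
    cur = conn.cursor()
    # Identifiers cannot be bound as parameters; only pass trusted names.
    cur.execute(f"SELECT * FROM {table} ORDER BY {key}")
    rows = [tuple(row) for row in cur.fetchall()]
    cur.close()
    return rows


def tables_match(conn, source: str, target: str, key: str = "userid") -> bool:
    """True if both tables contain exactly the same rows."""
    return fetch_rows(conn, source, key) == fetch_rows(conn, target, key)
```

Usage after a sync: `tables_match(conn, "test_01.user", "test_02.user")` should return `True`.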
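To run the same job on Flink or Spark, edit config/seatunnel-env.sh: set FLINK_HOME to the Flink installation directory and SPARK_HOME to the Spark installation directory (no spaces around "=", since this is a shell file):
FLINK_HOME=/data/flink-1.14.5/
SPARK_HOME=/data/spark-2.4.6/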
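If you provision machines with scripts, the edit can be done programmatically. This sketch assumes config/seatunnel-env.sh contains plain `VAR=...` lines (for example `FLINK_HOME=${FLINK_HOME:-/opt/flink}`); it replaces the first such line or appends one. `set_engine_home` is a hypothetical helper.

```python
import re


def set_engine_home(env_sh: str, var: str, path: str) -> str:
    """Replace the first `VAR=...` line in seatunnel-env.sh text, or append one."""
    pattern = re.compile(rf"^{re.escape(var)}=.*$", re.MULTILINE)
    line = f"{var}={path}"
    if pattern.search(env_sh):
        # A function replacement avoids backslash escapes in the path.
        return pattern.sub(lambda _m: line, env_sh, count=1)
    prefix = env_sh.rstrip("\n")
    return (prefix + "\n" if prefix else "") + line + "\n"
```

Read the file, apply the function once per variable, and write the result back.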
Start the Flink cluster:
${FLINK_HOME}/bin/start-cluster.sh
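Note: when syncing MySQL, the JDBC driver jar has to be placed in Flink's lib directory (${FLINK_HOME}/lib). This works just like any other data synchronization job on Flink: all dependency jars must be made available to Flink.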
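Because this cluster runs Flink 1.14.5, submit the job with the flink-13 starter script (the output below confirms it runs seatunnel-flink-13-starter.jar):
./bin/start-seatunnel-flink-13-connector-v2.sh --config ./config/mysql-to-mysql.conf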
Output:
Execute SeaTunnel Flink Job: ${FLINK_HOME}/bin/flink run -c org.apache.seatunnel.core.starter.flink.SeaTunnelFlink /mnt/apache-seatunnel-2.3.3/starter/seatunnel-flink-13-starter.jar --config ./config/mysql-to-mysql.conf --name SeaTunnel
SLF4J: Class path contains multiple SLF4J bindings.
SLF4J: Found binding in [jar:file:/mnt/kmr/flink1/1/flink-1.14.5/lib/log4j-slf4j-impl-2.17.1.jar!/org/slf4j/impl/StaticLoggerBinder.class]
SLF4J: Found binding in [jar:file:/mnt/kmr/hadoop/1/hadoop-3.1.1/share/hadoop/common/lib/slf4j-log4j12-1.7.25.jar!/org/slf4j/impl/StaticLoggerBinder.class]
SLF4J: See http://www.slf4j.org/codes.html#multiple_bindings for an explanation.
SLF4J: Actual binding is of type [org.apache.logging.slf4j.Log4jLoggerFactory]
Job has been submitted with JobID a643a1990705f817479b1d2880c9f038
Program execution finished
Job with JobID a643a1990705f817479b1d2880c9f038 has finished.
Job Runtime: 2356 ms
Before testing the Spark engine, empty the target table:
MySQL [(none)]> truncate table test_02.user;
Query OK, 0 rows affected (0.01 sec)
Submit the job with the Spark 2 starter in local mode:
./bin/start-seatunnel-spark-2-connector-v2.sh \
  --master local[2] \
  --deploy-mode client \
  --config ./config/mysql-to-mysql.conf
Output (end of the log):
23/11/08 21:19:57 INFO executor.FieldNamedPreparedStatement: PrepareStatement sql is: INSERT INTO `test_02`.`user` (`userid`, `username`) VALUES (?, ?)
23/11/08 21:19:57 INFO v2.DataWritingSparkTask: Commit authorized for partition 0 (task 0, attempt 0, stage 0.0)
23/11/08 21:19:57 INFO v2.DataWritingSparkTask: Committed partition 0 (task 0, attempt 0, stage 0.0)
23/11/08 21:19:57 INFO executor.Executor: Finished task 0.0 in stage 0.0 (TID 0). 1148 bytes result sent to driver
23/11/08 21:19:57 INFO scheduler.TaskSetManager: Finished task 0.0 in stage 0.0 (TID 0) in 1486 ms on localhost (executor driver) (1/1)
23/11/08 21:19:57 INFO scheduler.TaskSchedulerImpl: Removed TaskSet 0.0, whose tasks have all completed, from pool
23/11/08 21:19:57 INFO scheduler.DAGScheduler: ResultStage 0 (save at SinkExecuteProcessor.java:117) finished in 1.568 s
23/11/08 21:19:57 INFO scheduler.DAGScheduler: Job 0 finished: save at SinkExecuteProcessor.java:117, took 1.610054 s
23/11/08 21:19:57 INFO v2.WriteToDataSourceV2Exec: Data source writer org.apache.seatunnel.translation.spark.sink.writer.SparkDataSourceWriter@25ea068e is committing.
23/11/08 21:19:57 INFO v2.WriteToDataSourceV2Exec: Data source writer org.apache.seatunnel.translation.spark.sink.writer.SparkDataSourceWriter@25ea068e committed.
23/11/08 21:19:57 INFO execution.SparkExecution: Spark Execution started
23/11/08 21:19:57 INFO spark.SparkContext: Invoking stop() from shutdown hook
23/11/08 21:19:57 INFO server.AbstractConnector: Stopped Spark@6e8a9c30{HTTP/1.1,[http/1.1]}{0.0.0.0:4040}
23/11/08 21:19:57 INFO ui.SparkUI: Stopped Spark web UI at http://kmr-b55b8d33-gn-0a6e9139-az1-master-1-2.ksc.com:4040
23/11/08 21:19:57 INFO spark.MapOutputTrackerMasterEndpoint: MapOutputTrackerMasterEndpoint stopped!
23/11/08 21:19:57 INFO memory.MemoryStore: MemoryStore cleared
23/11/08 21:19:57 INFO storage.BlockManager: BlockManager stopped
23/11/08 21:19:57 INFO storage.BlockManagerMaster: BlockManagerMaster stopped
23/11/08 21:19:57 INFO scheduler.OutputCommitCoordinator$OutputCommitCoordinatorEndpoint: OutputCommitCoordinator stopped!
23/11/08 21:19:57 INFO spark.SparkContext: Successfully stopped SparkContext
23/11/08 21:19:57 INFO util.ShutdownHookManager: Shutdown hook called
23/11/08 21:19:57 INFO util.ShutdownHookManager: Deleting directory /tmp/spark-6dfc2825-0da5-4e4c-9623-bddcccab0849
Query the target table: the data was imported successfully.
MySQL [(none)]> select * from test_02.user;
+--------+----------+
| userid | username |
+--------+----------+
|      1 | zhangsan |
|      2 | lisi     |
+--------+----------+
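The three launch commands used in this article differ only in the starter script and a few engine-specific flags, so they can be collected in one place. The sketch below covers just the scripts that appear here (plus the flink-15 script named in the SeaTunnel bin directory); `build_command` and the engine keys are hypothetical.

```python
STARTERS = {
    "seatunnel": "./bin/seatunnel.sh",
    "flink-13": "./bin/start-seatunnel-flink-13-connector-v2.sh",
    "flink-15": "./bin/start-seatunnel-flink-15-connector-v2.sh",
    "spark-2": "./bin/start-seatunnel-spark-2-connector-v2.sh",
}


def build_command(engine: str, config: str, master: str = "local[2]",
                  deploy_mode: str = "client") -> list[str]:
    """Return the argv list for submitting config with the given engine."""
    if engine not in STARTERS:
        raise ValueError(f"unknown engine {engine!r}, expected one of {sorted(STARTERS)}")
    cmd = [STARTERS[engine], "--config", config]
    if engine == "seatunnel":
        cmd += ["-e", "local"]  # local mode, as used earlier in this article
    elif engine.startswith("spark"):
        cmd += ["--master", master, "--deploy-mode", deploy_mode]
    return cmd
```

Run it from the SeaTunnel directory, e.g. `subprocess.run(build_command("spark-2", "./config/mysql-to-mysql.conf"), cwd="/opt/module/apache-seatunnel-2.3.3", check=True)`. Passing a list rather than a shell string also keeps `local[2]` from being treated as a glob pattern.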