SeaTunnel简介:Apache SeaTunnel 介绍-CSDN博客
部署
准备工作
在开始本地运行前,您需要确保您已经安装了SeaTunnel所需要的以下软件:
- 安装Java (Java 8 或 11, 其他高于Java 8的版本理论上也可以工作) 以及设置
JAVA_HOME
。
下载 SeaTunnel 发行包
下载二进制包
进入SeaTunnel下载页面下载最新版本的二进制安装包seatunnel-<version>-bin.tar.gz
或者您也可以通过终端下载:
export version="2.3.8"
wget "https://archive.apache.org/dist/seatunnel/${version}/apache-seatunnel-${version}-bin.tar.gz"
tar -xzvf "apache-seatunnel-${version}-bin.tar.gz"
下载连接器插件
从2.2.0-beta版本开始,二进制包不再默认提供连接器依赖,因此在第一次使用时,您需要执行以下命令来安装连接器:(当然,您也可以从 Apache Maven Repository 手动下载连接器,然后将其移动至connectors/
目录下,如果是2.3.5之前则需要放入connectors/seatunnel
目录下)。
sh bin/install-plugin.sh
如果您需要指定的连接器版本,以2.3.7为例,您需要执行如下命令:
sh bin/install-plugin.sh 2.3.8
通常情况下,你不需要所有的连接器插件。你可以通过配置config/plugin_config
来指定所需的插件。例如,如果你想让示例应用程序正常工作,你将需要connector-console
和connector-fake
插件。你可以修改plugin_config
配置文件,如下所示:
--seatunnel-connectors--
connector-fake
connector-console
--end--
您可以在${SEATUNNEL_HOME}/connectors/plugins-mapping.properties
下找到所有支持的连接器和相应的plugin_config配置名称。
提示
如果您想通过手动下载连接器的方式来安装连接器插件,则需要下载您所需要的连接器插件即可,并将它们放在${SEATUNNEL_HOME}/connectors/
目录下。
从源码构建SeaTunnel
下载源码
从源码构建SeaTunnel。下载源码的方式与下载二进制包的方式相同。 您可以从下载页面下载源码,或者从GitHub仓库克隆源码。
构建源码
cd seatunnel
sh ./mvnw clean install -DskipTests -Dskip.spotless=true
# 获取构建好的二进制包
cp seatunnel-dist/target/apache-seatunnel-2.3.8-bin.tar.gz /The-Path-You-Want-To-Copycd /The-Path-You-Want-To-Copy
tar -xzvf "apache-seatunnel-${version}-bin.tar.gz"
当从源码构建时,所有的连接器插件和一些必要的依赖(例如:mysql驱动)都包含在二进制包中。您可以直接使用连接器插件,而无需单独安装它们。
SeaTunnel 引擎快速开始
步骤 1: 部署SeaTunnel及连接器
在开始前,请确保您已经按照安装中的描述下载并部署了SeaTunnel。
步骤 2: 添加作业配置文件来定义作业
编辑config/v2.batch.config.template
,它决定了当seatunnel启动后数据输入、处理和输出的方式及逻辑。 下面是配置文件的示例,它与上面提到的示例应用程序相同。
env {parallelism = 1job.mode = "BATCH"
}source {FakeSource {result_table_name = "fake"row.num = 16schema = {fields {name = "string"age = "int"}}}
}transform {FieldMapper {source_table_name = "fake"result_table_name = "fake1"field_mapper = {age = agename = new_name}}
}sink {Console {source_table_name = "fake1"}
}
关于配置的更多信息请查看配置的基本概念
步骤 3: 运行SeaTunnel应用程序
您可以通过以下命令启动应用程序:
提示
从2.3.1版本开始,seatunnel.sh中的-e参数被废弃,请改用-m参数。
cd "apache-seatunnel-${version}"
./bin/seatunnel.sh --config ./config/v2.batch.config.template -m local
查看输出: 当您运行该命令时,您可以在控制台中看到它的输出。您可以认为这是命令运行成功或失败的标志。
SeaTunnel控制台将会打印一些如下日志信息:
2022-12-19 11:01:45,417 INFO org.apache.seatunnel.connectors.seatunnel.console.sink.ConsoleSinkWriter - output rowType: name<STRING>, age<INT>
2022-12-19 11:01:46,489 INFO org.apache.seatunnel.connectors.seatunnel.console.sink.ConsoleSinkWriter - subtaskIndex=0 rowIndex=1: SeaTunnelRow#tableId=-1 SeaTunnelRow#kind=INSERT: CpiOd, 8520946
2022-12-19 11:01:46,490 INFO org.apache.seatunnel.connectors.seatunnel.console.sink.ConsoleSinkWriter - subtaskIndex=0 rowIndex=2: SeaTunnelRow#tableId=-1 SeaTunnelRow#kind=INSERT: eQqTs, 1256802974
2022-12-19 11:01:46,490 INFO org.apache.seatunnel.connectors.seatunnel.console.sink.ConsoleSinkWriter - subtaskIndex=0 rowIndex=3: SeaTunnelRow#tableId=-1 SeaTunnelRow#kind=INSERT: UsRgO, 2053193072
2022-12-19 11:01:46,490 INFO org.apache.seatunnel.connectors.seatunnel.console.sink.ConsoleSinkWriter - subtaskIndex=0 rowIndex=4: SeaTunnelRow#tableId=-1 SeaTunnelRow#kind=INSERT: jDQJj, 1993016602
2022-12-19 11:01:46,490 INFO org.apache.seatunnel.connectors.seatunnel.console.sink.ConsoleSinkWriter - subtaskIndex=0 rowIndex=5: SeaTunnelRow#tableId=-1 SeaTunnelRow#kind=INSERT: rqdKp, 1392682764
2022-12-19 11:01:46,490 INFO org.apache.seatunnel.connectors.seatunnel.console.sink.ConsoleSinkWriter - subtaskIndex=0 rowIndex=6: SeaTunnelRow#tableId=-1 SeaTunnelRow#kind=INSERT: wCoWN, 986999925
2022-12-19 11:01:46,490 INFO org.apache.seatunnel.connectors.seatunnel.console.sink.ConsoleSinkWriter - subtaskIndex=0 rowIndex=7: SeaTunnelRow#tableId=-1 SeaTunnelRow#kind=INSERT: qomTU, 72775247
2022-12-19 11:01:46,490 INFO org.apache.seatunnel.connectors.seatunnel.console.sink.ConsoleSinkWriter - subtaskIndex=0 rowIndex=8: SeaTunnelRow#tableId=-1 SeaTunnelRow#kind=INSERT: jcqXR, 1074529204
2022-12-19 11:01:46,490 INFO org.apache.seatunnel.connectors.seatunnel.console.sink.ConsoleSinkWriter - subtaskIndex=0 rowIndex=9: SeaTunnelRow#tableId=-1 SeaTunnelRow#kind=INSERT: AkWIO, 1961723427
2022-12-19 11:01:46,490 INFO org.apache.seatunnel.connectors.seatunnel.console.sink.ConsoleSinkWriter - subtaskIndex=0 rowIndex=10: SeaTunnelRow#tableId=-1 SeaTunnelRow#kind=INSERT: hBoib, 929089763
2022-12-19 11:01:46,490 INFO org.apache.seatunnel.connectors.seatunnel.console.sink.ConsoleSinkWriter - subtaskIndex=0 rowIndex=11: SeaTunnelRow#tableId=-1 SeaTunnelRow#kind=INSERT: GSvzm, 827085798
2022-12-19 11:01:46,491 INFO org.apache.seatunnel.connectors.seatunnel.console.sink.ConsoleSinkWriter - subtaskIndex=0 rowIndex=12: SeaTunnelRow#tableId=-1 SeaTunnelRow#kind=INSERT: NNAYI, 94307133
2022-12-19 11:01:46,491 INFO org.apache.seatunnel.connectors.seatunnel.console.sink.ConsoleSinkWriter - subtaskIndex=0 rowIndex=13: SeaTunnelRow#tableId=-1 SeaTunnelRow#kind=INSERT: EexFl, 1823689599
2022-12-19 11:01:46,491 INFO org.apache.seatunnel.connectors.seatunnel.console.sink.ConsoleSinkWriter - subtaskIndex=0 rowIndex=14: SeaTunnelRow#tableId=-1 SeaTunnelRow#kind=INSERT: CBXUb, 869582787
2022-12-19 11:01:46,491 INFO org.apache.seatunnel.connectors.seatunnel.console.sink.ConsoleSinkWriter - subtaskIndex=0 rowIndex=15: SeaTunnelRow#tableId=-1 SeaTunnelRow#kind=INSERT: Wbxtm, 1469371353
2022-12-19 11:01:46,491 INFO org.apache.seatunnel.connectors.seatunnel.console.sink.ConsoleSinkWriter - subtaskIndex=0 rowIndex=16: SeaTunnelRow#tableId=-1 SeaTunnelRow#kind=INSERT: mIJDt, 995616438
扩展示例:从 MySQL 到 Doris 批处理模式
步骤1:下载连接器
首先,您需要在${SEATUNNEL_HOME}/config/plugin_config
文件中加入连接器名称,然后,执行命令来安装连接器(当然,您也可以从 Apache Maven Repository 手动下载连接器,然后将其移动至connectors/
目录下),最后,确认连接器connector-jdbc
、connector-doris
在${SEATUNNEL_HOME}/connectors/
目录下即可。
# 配置连接器名称
--seatunnel-connectors--
connector-jdbc
connector-doris
--end--
# 安装连接器
sh bin/install-plugin.sh
步骤2:放入 MySQL 驱动
您需要下载 jdbc driver jar package 驱动,并放置在 ${SEATUNNEL_HOME}/lib/
目录下
步骤3:添加作业配置文件来定义作业
cd seatunnel/job/vim st.confenv {parallelism = 2job.mode = "BATCH"
}
source {Jdbc {url = "jdbc:mysql://localhost:3306/test"driver = "com.mysql.cj.jdbc.Driver"connection_check_timeout_sec = 100user = "user"password = "pwd"table_path = "test.table_name"query = "select * from test.table_name"}
}sink {Doris {fenodes = "doris_ip:8030"username = "user"password = "pwd"database = "test_db"table = "table_name"sink.enable-2pc = "true"sink.label-prefix = "test-cdc"doris.config = {format = "json"read_json_by_line="true"}}
}
关于配置的更多信息请查看配置的基本概念
步骤 4: 运行SeaTunnel应用程序
您可以通过以下命令启动应用程序:
cd seatunnel/
./bin/seatunnel.sh --config ./job/st.conf -m local
查看输出: 当您运行该命令时,您可以在控制台中看到它的输出。您可以认为这是命令运行成功或失败的标志。
SeaTunnel控制台将会打印一些如下日志信息:
***********************************************Job Statistic Information
***********************************************
Start Time : 2024-08-13 10:21:49
End Time : 2024-08-13 10:21:53
Total Time(s) : 4
Total Read Count : 1000
Total Write Count : 1000
Total Failed Count : 0
***********************************************
Flink 引擎快速开始
步骤 1: 部署SeaTunnel及连接器
在开始前,请确保您已经按照部署中的描述下载并部署了SeaTunnel。
步骤 2: 部署并配置Flink
Flink安装可参考:Flink入门-CSDN博客
配置SeaTunnel: 修改config/seatunnel-env.sh
中的设置,将FLINK_HOME
配置设置为Flink的部署目录。
步骤 3: 添加作业配置文件来定义作业
编辑config/v2.streaming.conf.template
,它决定了SeaTunnel启动后数据输入、处理和输出的方式及逻辑。 下面是配置文件的示例,它与上面提到的示例应用程序相同。
env {parallelism = 1job.mode = "BATCH"
}source {FakeSource {result_table_name = "fake"row.num = 16schema = {fields {name = "string"age = "int"}}}
}transform {FieldMapper {source_table_name = "fake"result_table_name = "fake1"field_mapper = {age = agename = new_name}}
}sink {Console {source_table_name = "fake1"}
}
关于配置的更多信息请查看配置的基本概念
步骤 4: 运行SeaTunnel应用程序
您可以通过以下命令启动应用程序:
Flink版本1.12.x
到1.14.x
cd "apache-seatunnel-${version}"
./bin/start-seatunnel-flink-13-connector-v2.sh --config ./config/v2.streaming.conf.template
Flink版本1.15.x
到1.18.x
cd "apache-seatunnel-${version}"
./bin/start-seatunnel-flink-15-connector-v2.sh --config ./config/v2.streaming.conf.template
查看输出: 当您运行该命令时,您可以在控制台中看到它的输出。您可以认为这是命令运行成功或失败的标志。
SeaTunnel控制台将会打印一些如下日志信息:
fields : name, age
types : STRING, INT
row=1 : elWaB, 1984352560
row=2 : uAtnp, 762961563
row=3 : TQEIB, 2042675010
row=4 : DcFjo, 593971283
row=5 : SenEb, 2099913608
row=6 : DHjkg, 1928005856
row=7 : eScCM, 526029657
row=8 : sgOeE, 600878991
row=9 : gwdvw, 1951126920
row=10 : nSiKE, 488708928
row=11 : xubpl, 1420202810
row=12 : rHZqb, 331185742
row=13 : rciGD, 1112878259
row=14 : qLhdI, 1457046294
row=15 : ZTkRx, 1240668386
row=16 : SGZCr, 94186144
Spark 引擎快速开始
步骤 1: 部署SeaTunnel及连接器
在开始前,请确保您已经按照部署中的描述下载并部署了SeaTunnel。
步骤 2: 部署并配置Spark
请先下载Spark(需要版本 >= 2.4.0)。 更多信息您可以查看入门: Standalone模式
配置SeaTunnel: 修改config/seatunnel-env.sh
中的设置,它是基于你的引擎在部署时的安装路径。 将SPARK_HOME
修改为Spark的部署目录。
步骤 3: 添加作业配置文件来定义作业
编辑config/v2.streaming.conf.template
,它决定了当SeaTunnel启动后数据输入、处理和输出的方式及逻辑。 下面是配置文件的示例,它与上面提到的示例应用程序相同。
env {parallelism = 1job.mode = "BATCH"
}source {FakeSource {result_table_name = "fake"row.num = 16schema = {fields {name = "string"age = "int"}}}
}transform {FieldMapper {source_table_name = "fake"result_table_name = "fake1"field_mapper = {age = agename = new_name}}
}sink {Console {source_table_name = "fake1"}
}
关于配置的更多信息请查看配置的基本概念
步骤 4: 运行SeaTunnel应用程序
您可以通过以下命令启动应用程序:
Spark 2.4.x
cd "apache-seatunnel-${version}"
./bin/start-seatunnel-spark-2-connector-v2.sh \
--master local[4] \
--deploy-mode client \
--config ./config/v2.streaming.conf.template
Spark 3.x.x
cd "apache-seatunnel-${version}"
./bin/start-seatunnel-spark-3-connector-v2.sh \
--master local[4] \
--deploy-mode client \
--config ./config/v2.streaming.conf.template
查看输出: 当您运行该命令时,您可以在控制台中看到它的输出。您可以认为这是命令运行成功或失败的标志。
SeaTunnel控制台将会打印一些如下日志信息:
fields : name, age
types : STRING, INT
row=1 : elWaB, 1984352560
row=2 : uAtnp, 762961563
row=3 : TQEIB, 2042675010
row=4 : DcFjo, 593971283
row=5 : SenEb, 2099913608
row=6 : DHjkg, 1928005856
row=7 : eScCM, 526029657
row=8 : sgOeE, 600878991
row=9 : gwdvw, 1951126920
row=10 : nSiKE, 488708928
row=11 : xubpl, 1420202810
row=12 : rHZqb, 331185742
row=13 : rciGD, 1112878259
row=14 : qLhdI, 1457046294
row=15 : ZTkRx, 1240668386
row=16 : SGZCr, 94186144