背景信息
如果您通过DataStream的方式读写数据,则需要使用对应的DataStream连接器连接Flink全托管。Maven中央库中已经放置了VVR DataStream连接器,以供您在作业开发时直接使用。您可以通过以下任何一种方式来使用连接器:
-
(推荐)上传连接器JAR包到实时计算控制台后,填写配置信息
-
直接将连接器作为项目依赖打进作业JAR包
重要
-
请使用我们在支持的连接器中指明提供DataStream API的连接器。如果某个连接器未注明提供DataStream API,请勿自行使用,因为未来接口和参数可能会被修改。
-
DataStream连接器均添加了商业化加密保护,直接运行会报错。如需本地调试和运行,请参见本地运行和调试包含连接器的作业。
-
(推荐)上传连接器JAR包到实时计算控制台后,填写配置信息
-
登录实时计算控制台。
-
单击目标工作空间操作列下的控制台。
-
在左侧导航栏,单击文件管理。
-
单击上传资源,选择您要上传的目标链接器的JAR包。
您可以上传您自己开发的连接器,也可以上传Flink全托管产品提供的连接器。Flink全托管产品提供的连接器官方JAR包的下载地址,请参见Connector列表。
-
在目标作业开发页面附加依赖文件项,选择目标连接器的JAR包。
直接将连接器作为项目依赖打进作业JAR包
步骤一:准备DataStream作业开发环境
-
在Maven项目的pom.xml文件中添加以下配置以引用SNAPSHOT仓库。
<repositories><repository><id>oss.sonatype.org-snapshot</id><name>OSS Sonatype Snapshot Repository</name><url>http://oss.sonatype.org/content/repositories/snapshots</url><releases><enabled>false</enabled></releases><snapshots><enabled>true</enabled></snapshots></repository><repository><id>apache.snapshots</id><name>Apache Development Snapshot Repository</name><url>https://repository.apache.org/content/repositories/snapshots/</url><releases><enabled>false</enabled></releases><snapshots><enabled>true</enabled></snapshots></repository> </repositories>
-
检查您的settings.xml配置文件中是否存在
<mirrorOf>*</mirrorOf>
配置。<mirrorOf>
中包含*,表示当前mirror已经包含了所有仓库,maven不会从上述指定的两个 SNAPSHOT仓库中下载,这会导致Maven工程无法下载这两个仓库中的SNAPSHOT依赖。因此如果 settings.xml文件<mirrorOf>*</mirrorOf>
配置中包含*,您可以根据如下情况进行相应的修改:-
存在
<mirrorOf>*</mirrorOf>
配置,请将配置改为<mirrorOf>*,!oss.sonatype.org-snapshot,!apache.snapshots</mirrorOf>
。 -
存在
<mirrorOf>external:*</mirrorOf>
配置,请将配置改为<mirrorOf>external:*,!oss.sonatype.org-snapshot,!apache.snapshots</mirrorOf>
。 -
存在
<mirrorOf>external:http:*</mirrorOf>
配置,请将配置改为<mirrorOf>external:http:*,!oss.sonatype.org-snapshot,!apache.snapshots</mirrorOf>
。
-
-
在作业的Maven POM文件中添加您需要的连接器作为项目依赖。
每个Connector版本对应的Connector类型可能不同,建议您使用最新版本。完整的依赖信息请参见MaxCompute-Demo、DataHub-Demo、Kafka-Demo或RocketMQ-Demo示例中的pom.xml文件。MaxCompute增量源表的项目依赖代码示例如下。
<dependency><groupId>com.alibaba.ververica</groupId><artifactId>ververica-connector-continuous-odps</artifactId><version>${connector.version}</version> </dependency>
除connector外,项目还需要依赖connector的公共包
flink-connector-base
:<dependency><groupId>org.apache.flink</groupId><artifactId>flink-connector-base</artifactId><version>${flink.version}</version> </dependency>
其中,
${flink.version}
是作业运行环境对应的Flink版本,如您的作业运行在1.15-vvr-6.0.7
版本引擎上,其对应的Flink版本为1.15.0
。重要
-
您需要在SNAPSHOT仓库(oss.sonatype.org)查找带SNAPSHOT的Connector版本,在Maven中央库(search.maven.org)上会查找不到。
-
在使用多个Connector时,请注意META-INF目录需要Merge,即在pom.xml文件中添加如下代码。
<transformers><!-- The service transformer is needed to merge META-INF/services files --><transformer implementation="org.apache.maven.plugins.shade.resource.ServicesResourceTransformer"/><transformer implementation="org.apache.maven.plugins.shade.resource.ApacheNoticeResourceTransformer"><projectName>Apache Flink</projectName><encoding>UTF-8</encoding></transformer> </transformers>
-
步骤二:开发DataStream作业
DataStream连接配置信息和代码示例需要去查看对应的DataStream连接器文档,详情请参见:
-
MaxCompute
-
DataHub
-
Kafka
-
MySQL CDC
-
Hologres
-
RocketMQ
-
SLS
步骤三:打包并提交DataStream作业
使用Maven工具打包工程项目,并将生成的JAR包上传和提交到Flink全托管平台上,详细请参见部署JAR作业。