
Flink CDC Basics and MySQL-to-MySQL Synchronization

Source: https://blog.csdn.net/L_15156024189/article/details/140747170

Table of Contents

Welcome to Flink CDC
Core Concepts
  Data Pipeline
  Data Source
  Data Sink
  Table ID
  Transform
  Route
Connectors
  Pipeline Connectors
    Supported Connectors
    Developing Your Own Connector
  Flink Sources
    Supported Connectors
    Supported Flink Versions
    Features
MySQL-to-MySQL Synchronization
  DataStream Implementation
    Required Dependencies (pom.xml)
    Preparation
    Code
    Testing
  SQL Implementation
    Required Dependencies (pom.xml)
    Code
    Testing


This article is based on Flink CDC v2.4.2 and Flink 1.17.1.

Welcome to Flink CDC

Flink CDC is a streaming data integration tool that aims to offer users a more powerful API. It lets users describe their ETL pipeline logic elegantly in YAML, automatically generates the corresponding Flink operators, and submits the job (a minimal example follows the checklist below). Flink CDC focuses on streamlining the task submission process and provides enhanced features such as schema evolution, data transformation, full database synchronization, and exactly-once semantics.

Deeply integrated with and powered by Apache Flink, Flink CDC provides:

✅ An end-to-end data integration framework
✅ An API that makes it easy for users to build integration jobs
✅ Multi-table support on source and sink
✅ Synchronization of entire databases
✅ Schema evolution capability
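As a concrete taste of the YAML style mentioned above, here is a minimal pipeline sketch in the Flink CDC 3.x syntax; the hosts, credentials, and table patterns are illustrative, and the v2.4.2 release this article uses does not yet ship the YAML pipeline:

source:
  type: mysql              # pipeline source connector
  hostname: localhost
  port: 3306
  username: root
  password: "123456"
  tables: app_db.\.*       # regex: every table in app_db

sink:
  type: doris              # pipeline sink connector
  fenodes: 127.0.0.1:8030
  username: root
  password: ""

pipeline:
  name: Sync MySQL Database to Doris
  parallelism: 2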


Core Concepts

Data Pipeline

Because events in Flink CDC flow from upstream to downstream through a pipeline, the whole ETL task is called a Data Pipeline.

Data Source

A data source is used to access metadata and read changed data from external systems. One data source can read from multiple tables at the same time.

Note that "data source" here does not mean the external system itself; it is the source Flink defines internally and uses to read changed data from the external system.

Data Sink

A data sink is used to apply schema changes and write change data to external systems. One data sink can write to multiple tables at the same time.

Table ID

When connecting to an external system, a mapping to the external system's storage objects must be established; the identifier that uniquely determines a storage object is the Table ID. For compatibility with most external systems, a table ID is represented as a 3-tuple: (namespace, schemaName, tableName). Connectors should map table IDs to storage objects in the external system. The table below lists which parts make up a table ID in different data systems:

| Data system | Table ID components | Example |
| --- | --- | --- |
| Oracle/PostgreSQL | database, schema, table | mydb.default.orders |
| MySQL/Doris/StarRocks | database, table | mydb.orders |
| Kafka | topic | orders |

Transform

The Transform module helps users delete or extend data columns based on a table's columns. It can also filter out unneeded data during synchronization; a sketch follows below.
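For illustration, a transform rule in the 3.x-style pipeline YAML (transform rules arrived in pipeline releases newer than the v2.4.2 used here, so check the docs of your version; the table name and expressions are illustrative):

transform:
  - source-table: app_db.orders                                     # table the rule applies to
    projection: id, order_id, UPPER(product_name) AS product_name   # keep and derive columns
    filter: id > 10                                                 # drop rows that do not match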

Route

A route specifies rules that match a list of source tables and map them to target tables. The most typical scenario is merging sharded databases and tables: routing multiple upstream source tables into the same target table, as sketched below.
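A minimal example of such a merge rule in the 3.x YAML syntax (table names are illustrative):

route:
  - source-table: app_db.order_\.*   # regex matching every order shard
    sink-table: ods_db.ods_orders    # all shards land in one target table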


Connectors

Connectors are split across two chapters here, so it is worth clarifying the relationship between connector, source, and sink. Both sources and sinks can be called connectors; put differently, "connector" covers both. For historical reasons, the terms source and sink came first, and "connector" was later adopted as the unified name for the two.

Pipeline Connectors

Flink CDC provides several source and sink connectors for interacting with external systems. You can use these connectors out of the box by adding the released jars to your Flink CDC environment and specifying the connector in the YAML pipeline definition.

Supported Connectors

| Connector | Type | Supported external systems |
| --- | --- | --- |
| Apache Doris | Sink | Apache Doris: 1.2.x, 2.x.x |
| Kafka | Sink | Kafka |
| MySQL | Source | MySQL: 5.6, 5.7, 8.0.x; RDS MySQL: 5.6, 5.7, 8.0.x; PolarDB MySQL: 5.6, 5.7, 8.0.x; Aurora MySQL: 5.6, 5.7, 8.0.x; MariaDB: 10.x; PolarDB X: 2.0.1 |
| Paimon | Sink | Paimon: 0.6, 0.7, 0.8 |
| StarRocks | Sink | StarRocks: 2.x, 3.x |

Developing Your Own Connector

If the provided connectors do not meet your requirements, you can develop your own connector to bring your external system into Flink CDC pipelines. Check the Flink CDC APIs to learn how to develop your own connector.

Flink Sources

Flink CDC sources are a set of source connectors for Apache Flink that ingest changes from different databases using change data capture (CDC). Some of the CDC sources integrate Debezium as the engine that captures data changes, so they can take full advantage of Debezium's capabilities. See the Debezium documentation to learn more about what Debezium is.


Supported Connectors

| Connector | Database | Driver |
| --- | --- | --- |
| mongodb-cdc | MongoDB: 3.6, 4.x, 5.0, 6.0, 6.1 | MongoDB Driver: 4.9.1 |
| mysql-cdc | MySQL: 5.6, 5.7, 8.0.x; RDS MySQL: 5.6, 5.7, 8.0.x; PolarDB MySQL: 5.6, 5.7, 8.0.x; Aurora MySQL: 5.6, 5.7, 8.0.x; MariaDB: 10.x; PolarDB X: 2.0.1 | JDBC Driver: 8.0.28 |
| oceanbase-cdc | OceanBase CE: 3.1.x, 4.x; OceanBase EE: 2.x, 3.x, 4.x | OceanBase Driver: 2.4.x |
| oracle-cdc | Oracle: 11, 12, 19, 21 | Oracle Driver: 19.3.0.0 |
| postgres-cdc | PostgreSQL: 9.6, 10, 11, 12, 13, 14 | JDBC Driver: 42.5.1 |
| sqlserver-cdc | SQL Server: 2012, 2014, 2016, 2017, 2019 | JDBC Driver: 9.4.1.jre8 |
| tidb-cdc | TiDB: 5.1.x, 5.2.x, 5.3.x, 5.4.x, 6.0.0 | JDBC Driver: 8.0.27 |
| db2-cdc | Db2: 11.5 | Db2 Driver: 11.5.0.0 |
| vitess-cdc | Vitess: 8.0.x, 9.0.x | MySQL JDBC Driver: 8.0.26 |

Supported Flink Versions

| Flink CDC version | Flink version |
| --- | --- |
| 1.0.0 | 1.11.* |
| 1.1.0 | 1.11.* |
| 1.2.0 | 1.12.* |
| 1.3.0 | 1.12.* |
| 1.4.0 | 1.13.* |
| 2.0.* | 1.13.* |
| 2.1.* | 1.13.* |
| 2.2.* | 1.13.*, 1.14.* |
| 2.3.* | 1.13.*, 1.14.*, 1.15.*, 1.16.* |
| 2.4.* | 1.13.*, 1.14.*, 1.15.*, 1.16.*, 1.17.* |
| 3.0.* | 1.14.*, 1.15.*, 1.16.*, 1.17.*, 1.18.* |

Features

1. Supports reading database snapshots and continues reading binlogs with exactly-once processing, even when failures occur.
2. CDC connectors for the DataStream API: users can consume changes on multiple databases and tables in a single job without deploying Debezium and Kafka.
3. CDC connectors for the Table/SQL API: users can create a CDC source with SQL DDL to monitor changes on a single table.

Each connector is characterized by four capabilities: lock-free read, parallel read, exactly-once read, and incremental snapshot read. The connectors covered are mongodb-cdc, mysql-cdc, oracle-cdc, postgres-cdc, sqlserver-cdc, oceanbase-cdc, tidb-cdc, db2-cdc, and vitess-cdc; see the upstream documentation for each connector's support matrix.

MySQL-to-MySQL Synchronization

DataStream Implementation

Required Dependencies (pom.xml)

<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0"
         xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
    <modelVersion>4.0.0</modelVersion>

    <groupId>com.leboop.www</groupId>
    <artifactId>flink</artifactId>
    <version>1.0-SNAPSHOT</version>

    <properties>
        <flink.version>1.17.1</flink.version>
        <scala.binary.version>2.12</scala.binary.version>
        <maven.compiler.source>1.8</maven.compiler.source>
        <maven.compiler.target>1.8</maven.compiler.target>
    </properties>

    <dependencies>
        <!-- Flink client -->
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-clients</artifactId>
            <version>${flink.version}</version>
        </dependency>
        <!-- Table API for Java -->
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-table-api-java</artifactId>
            <version>${flink.version}</version>
        </dependency>
        <!-- Flink CDC for MySQL -->
        <dependency>
            <groupId>com.ververica</groupId>
            <artifactId>flink-connector-mysql-cdc</artifactId>
            <version>2.4.2</version>
        </dependency>
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-connector-base</artifactId>
            <version>1.18.0</version>
        </dependency>
        <!-- JSON parsing -->
        <dependency>
            <groupId>com.alibaba</groupId>
            <artifactId>fastjson</artifactId>
            <version>1.2.78</version> <!-- use the latest version -->
        </dependency>
    </dependencies>
</project>

If a dependency is missing, you may get an error like:

Exception in thread "main" java.lang.NoClassDefFoundError: org/apache/flink/connector/base/source/reader/RecordEmitterat com.leboop.cdc.MysqlCDCDemo.main(MysqlCDCDemo.java:17)
Caused by: java.lang.ClassNotFoundException: org.apache.flink.connector.base.source.reader.RecordEmitterat java.net.URLClassLoader.findClass(URLClassLoader.java:382)at java.lang.ClassLoader.loadClass(ClassLoader.java:424)at sun.misc.Launcher$AppClassLoader.loadClass(Launcher.java:349)at java.lang.ClassLoader.loadClass(ClassLoader.java:357)... 1 more

Adding the following dependency fixes it:

<dependency>
    <groupId>org.apache.flink</groupId>
    <artifactId>flink-connector-base</artifactId>
    <version>1.18.0</version>
</dependency>

If you see the following error:

Exception in thread "main" java.lang.IllegalStateException: No ExecutorFactory found to execute the application.at org.apache.flink.core.execution.DefaultExecutorServiceLoader.getExecutorFactory(DefaultExecutorServiceLoader.java:88)at org.apache.flink.streaming.api.environment.StreamExecutionEnvironment.getPipelineExecutor(StreamExecutionEnvironment.java:2717)at org.apache.flink.streaming.api.environment.StreamExecutionEnvironment.executeAsync(StreamExecutionEnvironment.java:2194)at org.apache.flink.streaming.api.environment.StreamExecutionEnvironment.execute(StreamExecutionEnvironment.java:2084)at org.apache.flink.streaming.api.environment.LocalStreamEnvironment.execute(LocalStreamEnvironment.java:68)at org.apache.flink.streaming.api.environment.StreamExecutionEnvironment.execute(StreamExecutionEnvironment.java:2058)at com.leboop.cdc.MysqlCDCDemo.main(MysqlCDCDemo.java:62)

add this dependency:

<dependency>
    <groupId>org.apache.flink</groupId>
    <artifactId>flink-clients</artifactId>
    <version>${flink.version}</version>
</dependency>

Preparation

This article is only a demonstration; MySQL 8.0.30 was installed locally on Windows.

Prepare two databases, cdc_demo and cdc_sink, to serve as the source and sink for this example.

The DDL for the two tables:


CREATE TABLE `human` (
  `id` bigint NOT NULL AUTO_INCREMENT,
  `name` varchar(100) DEFAULT NULL,
  `age` int DEFAULT NULL,
  PRIMARY KEY (`id`)
) ENGINE=InnoDB AUTO_INCREMENT=8 DEFAULT CHARSET=utf8mb4 COLLATE=utf8mb4_0900_ai_ci;

CREATE TABLE `human_sink` (
  `id` bigint NOT NULL AUTO_INCREMENT,
  `name` varchar(100) DEFAULT NULL,
  `age` int DEFAULT NULL,
  PRIMARY KEY (`id`)
) ENGINE=InnoDB AUTO_INCREMENT=8 DEFAULT CHARSET=utf8mb4 COLLATE=utf8mb4_0900_ai_ci;

MySQL CDC requires the binlog to be enabled. Run the following SQL to check whether it is:

SHOW VARIABLES LIKE 'log_bin';

If Value is ON, the binlog is enabled. If it is OFF, see the configuration sketch below.
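A minimal my.cnf sketch for enabling the binlog (MySQL 8.0 already enables it by default; the values are illustrative, and row-based format is what MySQL CDC expects):

[mysqld]
server-id        = 1          # unique id for this MySQL server
log-bin          = mysql-bin  # turn on binary logging
binlog_format    = ROW        # MySQL CDC requires row-based events
binlog_row_image = FULL       # keep full before/after row images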

Code

package com.leboop.cdc;

import com.alibaba.fastjson.JSON;
import com.alibaba.fastjson.JSONObject;
import com.ververica.cdc.connectors.mysql.source.MySqlSource;
import com.ververica.cdc.debezium.JsonDebeziumDeserializationSchema;
import org.apache.flink.api.common.eventtime.WatermarkStrategy;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.sink.RichSinkFunction;

import java.sql.Connection;
import java.sql.DriverManager;
import java.sql.PreparedStatement;

/**
 * Description TODO.
 * Date 2024/7/28 15:48
 *
 * @author leb
 * @version 2.0
 */
public class MysqlCDCDemo {
    public static void main(String[] args) throws Exception {
        // Flink source; the source type is MySQL
        MySqlSource<String> mySqlSource = MySqlSource.<String>builder()
                .hostname("localhost")
                .port(80)
                .databaseList("cdc_demo")
                .tableList("cdc_demo.human")
                .username("root")
                .password("root")
                .deserializer(new JsonDebeziumDeserializationSchema())
                .serverId("1")
                .build();

        // Initialize the environment.
        final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        // enable checkpointing
        env.enableCheckpointing(3000);

        DataStreamSource<String> stringDataStreamSource = env
                .fromSource(mySqlSource, WatermarkStrategy.noWatermarks(), "MySQL Source")
                // set 1 parallel source task
                .setParallelism(1);

        // Print the records to the console.
        stringDataStreamSource.print().setParallelism(1); // use parallelism 1 for sink

        // Synchronize the records to MySQL
        stringDataStreamSource.addSink(new RichSinkFunction<String>() {
            private Connection connection = null;
            private PreparedStatement preparedStatement = null;

            @Override
            public void open(Configuration parameters) throws Exception {
                if (connection == null) {
                    Class.forName("com.mysql.cj.jdbc.Driver"); // load the JDBC driver
                    connection = DriverManager.getConnection("jdbc:mysql://localhost:80", "root", "root"); // open the connection
                    connection.setAutoCommit(false); // disable auto-commit
                }
            }

            @Override
            public void invoke(String value, Context context) throws Exception {
                JSONObject jsonObject = JSON.parseObject(value);
                String op = jsonObject.getString("op");
                if ("r".equals(op)) { // initial snapshot read
                    System.out.println("Truncating the sink table");
                    connection.prepareStatement("truncate table cdc_sink.human_sink").execute(); // clear the target table
                    JSONObject after = jsonObject.getJSONObject("after");
                    Integer id = after.getInteger("id");
                    String name = after.getString("name");
                    Integer age = after.getInteger("age");
                    preparedStatement = connection.prepareStatement("insert into cdc_sink.human_sink values (?,?,?)");
                    preparedStatement.setInt(1, id);
                    preparedStatement.setString(2, name);
                    preparedStatement.setInt(3, age);
                    preparedStatement.execute();
                    connection.commit(); // commit once the statement has run
                } else if ("c".equals(op)) { // insert
                    JSONObject after = jsonObject.getJSONObject("after");
                    Integer id = after.getInteger("id");
                    String name = after.getString("name");
                    Integer age = after.getInteger("age");
                    preparedStatement = connection.prepareStatement("insert into cdc_sink.human_sink values (?,?,?)");
                    preparedStatement.setInt(1, id);
                    preparedStatement.setString(2, name);
                    preparedStatement.setInt(3, age);
                    preparedStatement.execute();
                    connection.commit(); // commit once the statement has run
                } else if ("d".equals(op)) { // delete
                    JSONObject before = jsonObject.getJSONObject("before");
                    Integer id = before.getInteger("id");
                    preparedStatement = connection.prepareStatement("delete from cdc_sink.human_sink where id = ?");
                    preparedStatement.setInt(1, id);
                    preparedStatement.execute();
                    connection.commit(); // commit once the statement has run
                } else if ("u".equals(op)) { // update
                    JSONObject after = jsonObject.getJSONObject("after");
                    Integer id = after.getInteger("id");
                    String name = after.getString("name");
                    Integer age = after.getInteger("age");
                    preparedStatement = connection.prepareStatement("update cdc_sink.human_sink set name = ?, age = ? where id = ?");
                    preparedStatement.setString(1, name);
                    preparedStatement.setInt(2, age);
                    preparedStatement.setInt(3, id);
                    preparedStatement.execute();
                    connection.commit(); // commit once the statement has run
                } else {
                    System.out.println("Unsupported operation op=" + op);
                }
            }

            @Override
            public void close() throws Exception {
                System.out.println("close() called");
                if (preparedStatement != null) {
                    preparedStatement.close();
                }
                if (connection != null) {
                    connection.close();
                }
            }
        });

        env.execute("Print MySQL Snapshot + Binlog");
    }
}

(1) The Flink source

The following code connects to the local MySQL database cdc_demo:

MySqlSource<String> mySqlSource = MySqlSource.<String>builder()
        .hostname("localhost")
        .port(80)
        .databaseList("cdc_demo")
        .tableList("cdc_demo.human")
        .username("root")
        .password("root")
        .deserializer(new JsonDebeziumDeserializationSchema())
        .serverId("1")
        .build();

new JsonDebeziumDeserializationSchema() deserializes the binlog records read from MySQL into JSON strings, which the console output shown later makes visible.

(2) Server id

Every MySQL database client that reads the binlog should have a unique id, called the server id. The MySQL server uses this id to maintain network connections and binlog positions. If different jobs share the same server id, reads may start from the wrong binlog position. It is therefore recommended to give every source reader its own server id; for example, with a source parallelism of 4, the range '5401-5404' assigns a unique server id to each of the 4 source readers, as in the sketch below.
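A sketch using the builder from this article (the id range is the example value from the upstream docs; the remaining settings mirror the demo code):

MySqlSource<String> mySqlSource = MySqlSource.<String>builder()
        .hostname("localhost")
        .port(80)
        .databaseList("cdc_demo")
        .tableList("cdc_demo.human")
        .username("root")
        .password("root")
        .deserializer(new JsonDebeziumDeserializationSchema())
        .serverId("5401-5404")   // at least one id per source reader
        .build();

env.fromSource(mySqlSource, WatermarkStrategy.noWatermarks(), "MySQL Source")
        .setParallelism(4);      // parallelism must not exceed the size of the id range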

(3) Reading from the MySQL source

DataStreamSource<String> stringDataStreamSource = env
        .fromSource(mySqlSource, WatermarkStrategy.noWatermarks(), "MySQL Source")
        // set 1 parallel source task
        .setParallelism(1);

This reads from the MySQL source with a read parallelism of 1. If the parallelism were 4, the earlier server id setting would need 4 ids, e.g. "1-4".

(4) Printing the MySQL data to the console

// Print the records to the console.
stringDataStreamSource.print().setParallelism(1); // use parallelism 1 for sink

This is only for inspecting what the binlog records look like once converted to JSON strings. Three such records are shown below:

{"before":null,"after":{"id":7,"name":"lisi","age":23},"source":{"version":"1.9.7.Final","connector":"mysql","name":"mysql_binlog_source","ts_ms":0,"snapshot":"false","db":"cdc_demo","sequence":null,"table":"human","server_id":0,"gtid":null,"file":"","pos":0,"row":0,"thread":null,"query":null},"op":"r","ts_ms":1722218198388,"transaction":null}
{"before":{"id":6,"name":"zhangsan","age":12},"after":null,"source":{"version":"1.9.7.Final","connector":"mysql","name":"mysql_binlog_source","ts_ms":1722218564000,"snapshot":"false","db":"cdc_demo","sequence":null,"table":"human","server_id":1,"gtid":null,"file":"LEBOOP-bin.000005","pos":75954,"row":0,"thread":57,"query":null},"op":"d","ts_ms":1722218564587,"transaction":null}
{"before":{"id":7,"name":"lisi","age":23},"after":{"id":7,"name":"lisi","age":10},"source":{"version":"1.9.7.Final","connector":"mysql","name":"mysql_binlog_source","ts_ms":1722218597000,"snapshot":"false","db":"cdc_demo","sequence":null,"table":"human","server_id":1,"gtid":null,"file":"LEBOOP-bin.000005","pos":76582,"row":0,"thread":57,"query":null},"op":"u","ts_ms":1722218597551,"transaction":null}

The first JSON record, pretty-printed:

{"before": null,"after": {"id": 7,"name": "lisi","age": 23},"source": {"version": "1.9.7.Final","connector": "mysql","name": "mysql_binlog_source","ts_ms": 0,"snapshot": "false","db": "cdc_demo","sequence": null,"table": "human","server_id": 0,"gtid": null,"file": "","pos": 0,"row": 0,"thread": null,"query": null},"op": "r","ts_ms": 1722218198388,"transaction": null
}

Here, before is the row before the operation and after is the row after it. op is the operation type:

  • "op": "d" 代表删除操作

  • "op": "u" 代表更新操作

  • "op": "c" 代表新增操作

  • "op": "r" 代表全量读取,而不是来自 binlog 的增量读取

For example, the first record above comes from the initial full snapshot of the human table in cdc_demo, so before is null, after carries the row, and op is r. Similarly, the second record is a delete (op is d), and the third is an update (op is u).

(5) The sink

The MySQL sink is implemented here as an anonymous RichSinkFunction subclass.

Testing

First, insert two rows into the human table:

insert into cdc_demo.human(id,name,age) values(1,"zhangsan",12);
insert into cdc_demo.human(id,name,age) values(2,"lisi",23);

Then start the program. It logs the following:

SLF4J: Failed to load class "org.slf4j.impl.StaticLoggerBinder".
SLF4J: Defaulting to no-operation (NOP) logger implementation
SLF4J: See http://www.slf4j.org/codes.html#StaticLoggerBinder for further details.
log4j:WARN No appenders could be found for logger (org.apache.flink.shaded.netty4.io.netty.util.internal.logging.InternalLoggerFactory).
log4j:WARN Please initialize the log4j system properly.
log4j:WARN See http://logging.apache.org/log4j/1.2/faq.html#noconfig for more info.
SLF4J: Failed to load class "org.slf4j.impl.StaticMDCBinder".
SLF4J: Defaulting to no-operation MDCAdapter implementation.
SLF4J: See http://www.slf4j.org/codes.html#no_static_mdc_binder for further details.
{"before":null,"after":{"id":1,"name":"zhangsan","age":12},"source":{"version":"1.9.7.Final","connector":"mysql","name":"mysql_binlog_source","ts_ms":0,"snapshot":"false","db":"cdc_demo","sequence":null,"table":"human","server_id":0,"gtid":null,"file":"","pos":0,"row":0,"thread":null,"query":null},"op":"r","ts_ms":1722219401429,"transaction":null}
{"before":null,"after":{"id":2,"name":"lisi","age":23},"source":{"version":"1.9.7.Final","connector":"mysql","name":"mysql_binlog_source","ts_ms":0,"snapshot":"false","db":"cdc_demo","sequence":null,"table":"human","server_id":0,"gtid":null,"file":"","pos":0,"row":0,"thread":null,"query":null},"op":"r","ts_ms":1722219401430,"transaction":null}
Jul 29, 2024 10:16:42 AM com.github.shyiko.mysql.binlog.BinaryLogClient connect
INFO: Connected to localhost:80 at LEBOOP-bin.000005/80097 (sid:1, cid:803)
Truncating the sink table
Truncating the sink table

Looking at the human_sink table, the two rows from human have been synchronized.

Next, run the following update, delete, and insert statements:

update cdc_demo.human set age = 10 where id = 1;
delete from cdc_demo.human where id = 2;
insert into cdc_demo.human(id,name,age) values(3,"zhangsan",12);

The output log is:

{"before":{"id":1,"name":"zhangsan","age":12},"after":{"id":1,"name":"zhangsan","age":10},"source":{"version":"1.9.7.Final","connector":"mysql","name":"mysql_binlog_source","ts_ms":1722219563000,"snapshot":"false","db":"cdc_demo","sequence":null,"table":"human","server_id":1,"gtid":null,"file":"LEBOOP-bin.000005","pos":81312,"row":0,"thread":57,"query":null},"op":"u","ts_ms":1722219563829,"transaction":null}
{"before":{"id":2,"name":"lisi","age":23},"after":null,"source":{"version":"1.9.7.Final","connector":"mysql","name":"mysql_binlog_source","ts_ms":1722219563000,"snapshot":"false","db":"cdc_demo","sequence":null,"table":"human","server_id":1,"gtid":null,"file":"LEBOOP-bin.000005","pos":81647,"row":0,"thread":57,"query":null},"op":"d","ts_ms":1722219563849,"transaction":null}
{"before":null,"after":{"id":3,"name":"zhangsan","age":12},"source":{"version":"1.9.7.Final","connector":"mysql","name":"mysql_binlog_source","ts_ms":1722219563000,"snapshot":"false","db":"cdc_demo","sequence":null,"table":"human","server_id":1,"gtid":null,"file":"LEBOOP-bin.000005","pos":81954,"row":0,"thread":57,"query":null},"op":"c","ts_ms":1722219563872,"transaction":null}


In the end, the two tables contain identical data.

SQL Implementation

Required Dependencies (pom.xml)

In addition to the DataStream dependencies, the following are required:

<dependency>
    <groupId>org.apache.flink</groupId>
    <artifactId>flink-table-planner_${scala.binary.version}</artifactId>
    <version>${flink.version}</version>
</dependency>
<dependency>
    <groupId>org.apache.flink</groupId>
    <artifactId>flink-connector-jdbc</artifactId>
    <version>3.0.0-1.16</version>
</dependency>

If you see the following error:

Exception in thread "main" org.apache.flink.table.api.ValidationException: Could not find any factories that implement 'org.apache.flink.table.delegation.ExecutorFactory' in the classpath.at org.apache.flink.table.factories.FactoryUtil.discoverFactory(FactoryUtil.java:534)at org.apache.flink.table.api.internal.TableEnvironmentImpl.create(TableEnvironmentImpl.java:277)at org.apache.flink.table.api.TableEnvironment.create(TableEnvironment.java:95)at com.leboop.cdc.MysqlCDCSqlDemo.main(MysqlCDCSqlDemo.java:30)

add this dependency:

<dependency>
    <groupId>org.apache.flink</groupId>
    <artifactId>flink-table-planner_${scala.binary.version}</artifactId>
    <version>${flink.version}</version>
</dependency>

If you see this error:

Exception in thread "main" org.apache.flink.table.api.ValidationException: Unable to create a sink for writing table 'default_catalog.default_database.human_sink'.Table options are:'connector'='jdbc'
'driver'='com.mysql.cj.jdbc.Driver'
'password'='******'
'table-name'='human_sink'
'url'='jdbc:mysql://localhost:80/cdc_sink'
'username'='root'at org.apache.flink.table.factories.FactoryUtil.createDynamicTableSink(FactoryUtil.java:270)at org.apache.flink.table.planner.delegation.PlannerBase.getTableSink(PlannerBase.scala:459)at org.apache.flink.table.planner.delegation.PlannerBase.translateToRel(PlannerBase.scala:236)at org.apache.flink.table.planner.delegation.PlannerBase.$anonfun$translate$1(PlannerBase.scala:194)at scala.collection.TraversableLike.$anonfun$map$1(TraversableLike.scala:234)at scala.collection.Iterator.foreach(Iterator.scala:929)at scala.collection.Iterator.foreach$(Iterator.scala:929)at scala.collection.AbstractIterator.foreach(Iterator.scala:1406)at scala.collection.IterableLike.foreach(IterableLike.scala:71)at scala.collection.IterableLike.foreach$(IterableLike.scala:70)at scala.collection.AbstractIterable.foreach(Iterable.scala:54)at scala.collection.TraversableLike.map(TraversableLike.scala:234)at scala.collection.TraversableLike.map$(TraversableLike.scala:227)at scala.collection.AbstractTraversable.map(Traversable.scala:104)at org.apache.flink.table.planner.delegation.PlannerBase.translate(PlannerBase.scala:194)at org.apache.flink.table.api.internal.TableEnvironmentImpl.translate(TableEnvironmentImpl.java:1803)at org.apache.flink.table.api.internal.TableEnvironmentImpl.executeInternal(TableEnvironmentImpl.java:881)at org.apache.flink.table.api.internal.TableEnvironmentImpl.executeInternal(TableEnvironmentImpl.java:989)at org.apache.flink.table.api.internal.TableEnvironmentImpl.executeSql(TableEnvironmentImpl.java:765)at com.leboop.cdc.MysqlCDCSqlDemo.main(MysqlCDCSqlDemo.java:68)
Caused by: org.apache.flink.table.api.ValidationException: Cannot discover a connector using option: 'connector'='jdbc'at org.apache.flink.table.factories.FactoryUtil.enrichNoMatchingConnectorError(FactoryUtil.java:736)at org.apache.flink.table.factories.FactoryUtil.discoverTableFactory(FactoryUtil.java:710)at org.apache.flink.table.factories.FactoryUtil.createDynamicTableSink(FactoryUtil.java:265)... 19 more
Caused by: org.apache.flink.table.api.ValidationException: Could not find any factory for identifier 'jdbc' that implements 'org.apache.flink.table.factories.DynamicTableFactory' in the classpath.Available factory identifiers are:blackhole
datagen
mysql-cdc
printat org.apache.flink.table.factories.FactoryUtil.discoverFactory(FactoryUtil.java:546)at org.apache.flink.table.factories.FactoryUtil.enrichNoMatchingConnectorError(FactoryUtil.java:732)... 21 more

add this dependency:

<dependency>
    <groupId>org.apache.flink</groupId>
    <artifactId>flink-connector-jdbc</artifactId>
    <version>3.0.0-1.16</version>
</dependency>

Code

package com.leboop.cdc;

import org.apache.flink.table.api.EnvironmentSettings;
import org.apache.flink.table.api.TableEnvironment;
import org.apache.flink.table.api.TableResult;

/**
 * Description TODO.
 * Date 2024/7/28 15:48
 *
 * @author leb
 * @version 2.0
 */
public class MysqlCDCSqlDemo {
    public static void main(String[] args) throws Exception {
        EnvironmentSettings settings = EnvironmentSettings.newInstance().inStreamingMode().build();
        TableEnvironment tableEnv = TableEnvironment.create(settings);
        tableEnv.getConfig().getConfiguration().setLong("execution.checkpointing.interval", 3000L);

        // source
        TableResult createSourceTable = tableEnv.executeSql("CREATE TABLE flink_human ( \n" +
                "id BIGINT ,\n" +
                "name STRING ,\n" +
                "age INT ,\n" +
                "PRIMARY KEY (id) NOT ENFORCED \n" +
                ") WITH (\n" +
                " 'connector' = 'mysql-cdc',\n" +
                " 'hostname' = 'localhost',\n" +
                " 'port' = '80',\n" +
                " 'username' = 'root',\n" +
                " 'password' = 'root',\n" +
                " 'database-name' = 'cdc_demo',\n" +
                " 'table-name' = 'human') ");
        // Print the result of creating the source table
        createSourceTable.print();
        System.out.println("Source table created");

        // sink
        TableResult createSinkTable = tableEnv.executeSql("CREATE TABLE flink_human_sink (" +
                "id BIGINT ," +
                "name STRING ," +
                "age INT ," +
                "PRIMARY KEY(id) NOT ENFORCED " +
                ") WITH (" +
                " 'connector' = 'jdbc'," +
                " 'url' = 'jdbc:mysql://localhost:80/cdc_sink', " +
                " 'driver' = 'com.mysql.cj.jdbc.Driver', " +
                " 'username' = 'root'," +
                " 'password' = 'root'," +
                " 'table-name' = 'human_sink' )");
        createSinkTable.print();
        System.out.println("Sink table created");

        // insert (this statement performs the synchronization)
        tableEnv.executeSql("insert into flink_human_sink select id,name,age from flink_human");
        System.out.println("Insert into sink table submitted");
    }
}

(1) Creating the source table

The following code creates the source table inside Flink. Why "inside Flink"? Because it maps the MySQL table human to the Flink table flink_human; the rest of the code then works against flink_human:

// source
TableResult createSourceTable = tableEnv.executeSql("CREATE TABLE flink_human ( \n" +
        "id BIGINT ,\n" +
        "name STRING ,\n" +
        "age INT ,\n" +
        "PRIMARY KEY (id) NOT ENFORCED \n" +
        ") WITH (\n" +
        " 'connector' = 'mysql-cdc',\n" +
        " 'hostname' = 'localhost',\n" +
        " 'port' = '80',\n" +
        " 'username' = 'root',\n" +
        " 'password' = 'root',\n" +
        " 'database-name' = 'cdc_demo',\n" +
        " 'table-name' = 'human') ");

Note that the connector here must be mysql-cdc.

(2) Creating the target table

The code:

// sink
TableResult createSinkTable = tableEnv.executeSql("CREATE TABLE flink_human_sink (" +
        "id BIGINT ," +
        "name STRING ," +
        "age INT ," +
        "PRIMARY KEY(id) NOT ENFORCED " +
        ") WITH (" +
        " 'connector' = 'jdbc'," +
        " 'url' = 'jdbc:mysql://localhost:80/cdc_sink', " +
        " 'driver' = 'com.mysql.cj.jdbc.Driver', " +
        " 'username' = 'root'," +
        " 'password' = 'root'," +
        " 'table-name' = 'human_sink' )");

Here the connector value must be jdbc, i.e. the sink goes through the JDBC connector. Because the table declares a primary key, the JDBC connector runs in upsert mode, which is how source-side updates and deletes propagate to MySQL.

(3) Synchronizing the data

The following SQL performs the synchronization:

        tableEnv.executeSql("insert into flink_human_sink select id,name,age from flink_human");

Testing

The testing procedure is the same as for the DataStream implementation.


Note: insert, update, and delete operations on MySQL are synchronized, but some operations are not, for example truncate.
