
A First Look at FlinkCDC

2024/10/5 22:30:49 Source: https://blog.csdn.net/weixin_44996457/article/details/141252529

1. POM dependencies

<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
    <modelVersion>4.0.0</modelVersion>
    <groupId>org.example</groupId>
    <artifactId>Flink1.17.2</artifactId>
    <version>1.0-SNAPSHOT</version>
    <name>Flink1.17.2</name>
    <!-- FIXME change it to the project's website -->
    <url>http://www.example.com</url>

    <repositories>
        <repository>
            <id>oss.sonatype.org-snapshot</id>
            <name>OSS Sonatype Snapshot Repository</name>
            <url>https://oss.sonatype.org/content/repositories/snapshots</url>
            <releases>
                <enabled>false</enabled>
            </releases>
            <snapshots>
                <enabled>true</enabled>
            </snapshots>
        </repository>
        <repository>
            <id>apache.snapshots</id>
            <name>Apache Development Snapshot Repository</name>
            <url>https://repository.apache.org/content/repositories/snapshots/</url>
            <releases>
                <enabled>false</enabled>
            </releases>
            <snapshots>
                <enabled>true</enabled>
            </snapshots>
        </repository>
    </repositories>

    <properties>
        <project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
        <scala.version>2.12.7</scala.version>
        <scala.binary.version>2.12</scala.binary.version>
        <flink.version>1.17.2</flink.version>
        <java.version>1.8</java.version>
        <maven.compiler.source>1.8</maven.compiler.source>
        <maven.compiler.target>1.8</maven.compiler.target>
    </properties>

    <dependencies>
<!--    <dependency>-->
<!--      <groupId>com.alibaba.flink</groupId>-->
<!--      <artifactId>datahub-connector</artifactId>-->
<!--      <version>0.1-SNAPSHOT</version>-->
<!--      <classifier>jar-with-dependencies</classifier>-->
<!--    </dependency>-->
        <!-- https://mvnrepository.com/artifact/com.alibaba.ververica/ververica-connector-datahub -->
        <dependency>
            <groupId>com.alibaba.ververica</groupId>
            <artifactId>ververica-connector-datahub</artifactId>
            <version>1.17-vvr-8.0.8</version>
            <scope>provided</scope>
        </dependency>
        <!-- https://mvnrepository.com/artifact/com.alibaba.ververica/ververica-connector-common -->
        <dependency>
            <groupId>com.alibaba.ververica</groupId>
            <artifactId>ververica-connector-common</artifactId>
            <version>1.17-vvr-8.0.8</version>
            <scope>provided</scope>
        </dependency>
        <dependency>
            <groupId>com.alibaba.ververica</groupId>
            <artifactId>ververica-connector-continuous-odps</artifactId>
            <version>1.17-vvr-8.0.8</version>
            <scope>provided</scope>
        </dependency>
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-connector-base</artifactId>
            <version>${flink.version}</version>
            <!-- <scope>provided</scope> -->
        </dependency>
        <!-- https://mvnrepository.com/artifact/org.apache.flink/flink-core -->
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-core</artifactId>
            <version>${flink.version}</version>
            <!-- <scope>provided</scope> -->
        </dependency>
        <!-- https://mvnrepository.com/artifact/org.apache.flink/flink-java -->
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-java</artifactId>
            <version>${flink.version}</version>
            <!-- <scope>provided</scope> -->
        </dependency>
        <!-- https://mvnrepository.com/artifact/org.apache.flink/flink-clients -->
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-clients</artifactId>
            <version>${flink.version}</version>
            <!-- <scope>provided</scope> -->
        </dependency>
        <!-- https://mvnrepository.com/artifact/org.apache.flink/flink-streaming-java -->
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-streaming-java</artifactId>
            <version>${flink.version}</version>
            <!-- <scope>provided</scope> -->
        </dependency>
        <!-- https://mvnrepository.com/artifact/org.apache.flink/flink-runtime -->
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-runtime</artifactId>
            <version>${flink.version}</version>
            <!-- <scope>provided</scope> -->
        </dependency>
        <!-- https://mvnrepository.com/artifact/org.apache.flink/flink-table-common -->
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-table-common</artifactId>
            <version>${flink.version}</version>
            <!-- <scope>provided</scope> -->
        </dependency>
        <!-- https://mvnrepository.com/artifact/org.apache.flink/flink-table-api-java-bridge -->
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-table-api-java-bridge</artifactId>
            <version>${flink.version}</version>
            <!-- <scope>provided</scope> -->
        </dependency>
        <!-- https://mvnrepository.com/artifact/org.apache.flink/flink-table-planner -->
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-table-planner_2.12</artifactId>
            <version>${flink.version}</version>
            <!-- <scope>test</scope> -->
        </dependency>
        <!-- https://mvnrepository.com/artifact/org.apache.flink/flink-connector-hive -->
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-connector-hive_2.12</artifactId>
            <version>${flink.version}</version>
            <scope>provided</scope>
        </dependency>
        <dependency>
            <groupId>org.scala-lang</groupId>
            <artifactId>scala-library</artifactId>
            <version>2.12.7</version>
            <!-- <scope>provided</scope> -->
        </dependency>
        <!-- https://mvnrepository.com/artifact/org.projectlombok/lombok -->
        <dependency>
            <groupId>org.projectlombok</groupId>
            <artifactId>lombok</artifactId>
            <version>1.18.34</version>
            <!-- <scope>provided</scope> -->
        </dependency>
        <dependency>
            <groupId>mysql</groupId>
            <artifactId>mysql-connector-java</artifactId>
            <version>8.0.23</version>
            <scope>provided</scope>
        </dependency>
        <!-- https://mvnrepository.com/artifact/com.ververica/flink-connector-debezium -->
        <dependency>
            <groupId>com.ververica</groupId>
            <artifactId>flink-connector-debezium</artifactId>
            <version>2.2.0</version>
            <!-- <scope>provided</scope> -->
        </dependency>
        <dependency>
            <groupId>com.ververica</groupId>
            <artifactId>flink-connector-mysql-cdc</artifactId>
            <version>2.2.0</version>
            <!-- <scope>provided</scope> -->
        </dependency>
        <dependency>
            <groupId>com.ververica</groupId>
            <artifactId>flink-sql-connector-mysql-cdc</artifactId>
            <version>2.2.0</version>
            <!-- <scope>provided</scope> -->
        </dependency>
    </dependencies>

    <build>
        <plugins>
            <plugin>
                <groupId>org.apache.maven.plugins</groupId>
                <artifactId>maven-compiler-plugin</artifactId>
                <version>3.1</version>
                <configuration>
                    <source>${java.version}</source>
                    <target>${java.version}</target>
                </configuration>
            </plugin>
            <plugin>
                <groupId>org.apache.maven.plugins</groupId>
                <artifactId>maven-shade-plugin</artifactId>
                <version>3.2.4</version>
                <executions>
                    <execution>
                        <phase>package</phase>
                        <goals>
                            <goal>shade</goal>
                        </goals>
                        <configuration>
                            <finalName>${project.artifactId}-${project.version}-flink-fat-jar</finalName>
                            <transformers>
                                <transformer implementation="org.apache.maven.plugins.shade.resource.ManifestResourceTransformer">
                                    <!-- Specify your main class, e.g. org.example.MyFlinkJob -->
                                    <mainClass>com.tbea.cdc.FlinkCDCStreamExample</mainClass>
                                </transformer>
                            </transformers>
                        </configuration>
                    </execution>
                </executions>
            </plugin>
        </plugins>
    </build>
</project>

2. FlinkCDC-SQL

package com.tbea.cdc;

import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;
import org.apache.flink.table.api.*;

public class FlinkCDCSqlExample {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        StreamTableEnvironment tableEnv = StreamTableEnvironment.create(env);

        // Create a HiveCatalog (needs flink-connector-hive and an import of org.apache.flink.table.catalog.hive.HiveCatalog)
//        String name            = "myhive";
//        String defaultDatabase = "mydatabase";
//        String hiveConfDir      = "/path/to/your/hive/conf"; // Replace with your Hive configuration directory
//        HiveCatalog hiveCatalog = new HiveCatalog(name, defaultDatabase, hiveConfDir);
//        tableEnv.registerCatalog("myhive", hiveCatalog);
//
//        // Make the HiveCatalog the current catalog
//        tableEnv.useCatalog("myhive");

        // Register the MySQL table
        String sourceDDL = "CREATE TABLE source_table (" +
                " id INT," +
                " name STRING," +
                " age INT," +
                " addr STRING," +
                " create_time TIMESTAMP," +
                " PRIMARY KEY (id) NOT ENFORCED" +
                ") WITH (" +
                " 'connector' = 'mysql-cdc'," +
                " 'hostname' = '192.168.140.1'," +     // Replace with your MySQL host
                " 'port' = '3306'," +
                " 'username' = 'flink'," +             // Replace with your MySQL user
                " 'password' = 'flink'," +             // Replace with your MySQL password
                " 'database-name' = 'flinkcdc'," +     // Replace with your database name
                " 'table-name' = 'user_info'," +       // Replace with your table name
//                " 'scan.startup.timestamp-millis' = '1000'," +
                " 'scan.incremental.snapshot.enabled' = 'true'," +
                " 'scan.incremental.snapshot.chunk.size' = '8096'," +
                " 'scan.startup.mode' = 'latest-offset'," +
                // 'latest' is not a valid Debezium snapshot.mode value; the start position is already set by scan.startup.mode
//                " 'debezium.snapshot.mode' = 'latest'," +
                " 'scan.newly-added-table.enabled' = 'true'" +
//                " 'debezium.skipped.operations' = 'd'" +
                ")";
        tableEnv.executeSql(sourceDDL);

        // Query and print the data. executeSql() submits the job itself, so no env.execute() is needed
        // (calling it here would fail because no DataStream operators were defined).
        TableResult result = tableEnv.executeSql("SELECT id, name, age, addr, create_time FROM source_table");
        result.print();
    }
}
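When the DDL is assembled by hand with + and inline // comments, a single misplaced comment can silently swallow the rest of a line. The minimal sketch below builds the same kind of statement from an ordered option map. CdcDdlBuilder is a hypothetical helper and is not part of Flink or Flink CDC. The option names are the ones used in the DDL above, and the returned string is meant to be passed to tableEnv.executeSql(...).

```java
import java.util.LinkedHashMap;
import java.util.Map;
import java.util.StringJoiner;

/** Hypothetical helper (not a Flink API): assembles "CREATE TABLE ... WITH (...)" text. */
public class CdcDdlBuilder {

    /** Quotes a value as a SQL string literal, doubling any embedded single quote. */
    static String quote(String value) {
        return "'" + value.replace("'", "''") + "'";
    }

    /**
     * columns: the column and constraint list as it should appear inside the parentheses.
     * options: connector options; iteration order becomes the order in the WITH clause.
     */
    static String buildDdl(String tableName, String columns, Map<String, String> options) {
        StringJoiner with = new StringJoiner(",\n  ", "WITH (\n  ", "\n)");
        for (Map.Entry<String, String> e : options.entrySet()) {
            with.add(quote(e.getKey()) + " = " + quote(e.getValue()));
        }
        return "CREATE TABLE " + tableName + " (\n  " + columns + "\n) " + with;
    }

    public static void main(String[] args) {
        Map<String, String> options = new LinkedHashMap<>(); // keeps insertion order
        options.put("connector", "mysql-cdc");
        options.put("hostname", "192.168.140.1");
        options.put("port", "3306");
        options.put("username", "flink");
        options.put("password", "flink");
        options.put("database-name", "flinkcdc");
        options.put("table-name", "user_info");
        options.put("scan.startup.mode", "latest-offset");

        String ddl = buildDdl("source_table",
                "id INT, name STRING, age INT, addr STRING, create_time TIMESTAMP, "
                        + "PRIMARY KEY (id) NOT ENFORCED",
                options);
        System.out.println(ddl); // pass this string to tableEnv.executeSql(...)
    }
}
```

The LinkedHashMap keeps the generated WITH clause in a predictable order. quote() doubles single quotes, which is how Flink SQL escapes a quote inside a string literal, so a password that contains ' does not break the statement.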

Output:

3. FlinkCDC-DataStream

package com.tbea.cdc;

import com.ververica.cdc.connectors.mysql.source.MySqlSource;
import com.ververica.cdc.connectors.mysql.table.StartupOptions;
import com.ververica.cdc.debezium.JsonDebeziumDeserializationSchema;
import org.apache.flink.api.common.eventtime.WatermarkStrategy;
import org.apache.flink.api.common.restartstrategy.RestartStrategies;
import org.apache.flink.api.common.time.Time;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

public class FlinkCDCStreamExample {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

        MySqlSource<String> mySqlSource = MySqlSource.<String>builder()
                .hostname("192.168.140.1")
                .port(3306)
                // Databases to capture
                .databaseList("flinkcdc")
                // Tables to capture, e.g. [product, user, address]; "flinkcdc.*" is a regex covering all tables
                .tableList("flinkcdc.*")
                .username("flink")
                .password("flink")
                .deserializer(new JsonDebeziumDeserializationSchema()) // Convert each SourceRecord to a JSON string
                // Also capture tables added to the database later
                .scanNewlyAddedTableEnabled(true)
                // Skip the initial snapshot and read only new binlog events
                .startupOptions(StartupOptions.latest())
                .build();

        // Your business logic goes here
        env.setParallelism(1);
        // Failure-rate restart strategy: failureRate = 1, failure interval = 3 s, delay between restarts = 5 s
        env.setRestartStrategy(RestartStrategies.failureRateRestart(1, Time.seconds(3L), Time.seconds(5L)));
        // Set a 3 s checkpoint interval
        env.enableCheckpointing(3000);

        DataStreamSource<String> dataStreamSource =
                env.fromSource(mySqlSource, WatermarkStrategy.noWatermarks(), "Mysql-Source");
        dataStreamSource.print();

        env.execute("Print MySQL Snapshot + Binlog");
    }
}
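The .tableList("flinkcdc.*") entry above is a regular expression, not a glob. This sketch assumes that each entry is matched as an anchored regex against the fully qualified database.table name, which is how the Debezium documentation describes table.include.list. TableListRegexDemo is a hypothetical class. It shows that the unescaped "." matches any character, not only the dot separator.

```java
import java.util.Arrays;
import java.util.List;
import java.util.regex.Pattern;

/**
 * Hypothetical demo. ASSUMPTION: a tableList entry is matched like Matcher.matches(),
 * i.e. anchored against the whole "database.table" string.
 */
public class TableListRegexDemo {

    static boolean captured(String tableListEntry, String qualifiedName) {
        return Pattern.compile(tableListEntry).matcher(qualifiedName).matches();
    }

    public static void main(String[] args) {
        List<String> names = Arrays.asList("flinkcdc.user_info", "flinkcdc_bak.user_info", "other.user_info");
        for (String name : names) {
            // Unescaped "." is a regex wildcard, so "flinkcdc.*" also accepts "flinkcdc_bak..."
            System.out.printf("%-24s loose=%-5b strict=%b%n",
                    name, captured("flinkcdc.*", name), captured("flinkcdc\\..*", name));
        }
    }
}
```

In this job databaseList("flinkcdc") already restricts capture to a single database, so the loose pattern does no harm. Escaping the dot (flinkcdc\\..*) starts to matter once several databases with similar names are in scope.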

Output:

{"before":null,"after":{"id":22,"name":"杨延昭","age":45,"addr":"辽宁","create_time":1723798965000},"source":{"version":"1.5.4.Final","connector":"mysql","name":"mysql_binlog_source","ts_ms":1723770167000,"snapshot":"false","db":"flinkcdc","sequence":null,"table":"user_info","server_id":1,"gtid":null,"file":"YANGYINGCHUN-bin.000004","pos":8640,"row":0,"thread":null,"query":null},"op":"c","ts_ms":1723770167469,"transaction":null}
{"before":{"id":17,"name":"储熊","age":28,"addr":"张北","create_time":null},"after":{"id":17,"name":"储熊","age":28,"addr":"张北","create_time":1723798410000},"source":{"version":"1.5.4.Final","connector":"mysql","name":"mysql_binlog_source","ts_ms":1723769613000,"snapshot":"false","db":"flinkcdc","sequence":null,"table":"user_info","server_id":1,"gtid":null,"file":"YANGYINGCHUN-bin.000004","pos":6540,"row":0,"thread":null,"query":null},"op":"u","ts_ms":1723769613737,"transaction":null}
{"before":{"id":8,"name":"赵刚","age":33,"addr":"延安","create_time":1724230872000},"after":null,"source":{"version":"1.5.4.Final","connector":"mysql","name":"mysql_binlog_source","ts_ms":1723770354000,"snapshot":"false","db":"flinkcdc","sequence":null,"table":"user_info","server_id":1,"gtid":null,"file":"YANGYINGCHUN-bin.000004","pos":8968,"row":0,"thread":null,"query":null},"op":"d","ts_ms":1723770354642,"transaction":null}
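Each printed record is a Debezium change-event envelope. "op" is c (insert), u (update) or d (delete), and the before/after fields hold the row images. Debezium also emits r for rows read during an initial snapshot. Those records do not appear above because the job starts from StartupOptions.latest(). The minimal, stdlib-only sketch below dispatches on the op field; ChangeEventOp is a hypothetical class. It pulls the field out with a regex purely for illustration and assumes no column is itself named op, so real code should parse the JSON with a proper library.

```java
import java.util.regex.Matcher;
import java.util.regex.Pattern;

/** Hypothetical sketch: classify a Debezium JSON change event by its top-level "op" field. */
public class ChangeEventOp {

    enum Op {
        CREATE("c"),  // insert: "before" is null, "after" holds the new row
        READ("r"),    // row read during a snapshot phase
        UPDATE("u"),  // update: "before" and "after" both hold a row image
        DELETE("d");  // delete: "before" holds the old row, "after" is null

        final String code;

        Op(String code) {
            this.code = code;
        }

        static Op fromCode(String code) {
            for (Op op : values()) {
                if (op.code.equals(code)) {
                    return op;
                }
            }
            throw new IllegalArgumentException("Unknown op code: " + code);
        }
    }

    // Regex shortcut for illustration only; assumes no column named "op" precedes the envelope field
    private static final Pattern OP_FIELD = Pattern.compile("\"op\"\\s*:\\s*\"([a-z])\"");

    static Op opOf(String json) {
        Matcher m = OP_FIELD.matcher(json);
        if (!m.find()) {
            throw new IllegalArgumentException("No op field in: " + json);
        }
        return Op.fromCode(m.group(1));
    }

    public static void main(String[] args) {
        String insert = "{\"before\":null,\"after\":{\"id\":22},\"op\":\"c\",\"ts_ms\":1723770167469}";
        String delete = "{\"before\":{\"id\":8},\"after\":null,\"op\":\"d\",\"ts_ms\":1723770354642}";
        System.out.println(opOf(insert)); // CREATE
        System.out.println(opOf(delete)); // DELETE
    }
}
```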
