- flink-connector-mysql-cdc:
- 01 mysql-cdc基础配置代码演示
- 02 mysql-cdc高级扩展
- 03 mysql-cdc常见问题汇总
- 04 mysql-cdc-kafka生产级代码分享
- 05 flink-kafka-doris生产级代码分享
- 06 flink-kafka-hudi生产级代码分享
- flink-cdc版本:3.2.0
- flink版本:flink-1.18.0
- mysql版本:8.0.26
- java版本:1.8
- maven版本:3.8.4
-
mysql-cdc同步从库数据
- 从库需要配置 log-slave-updates = 1 使从实例也能将从主实例同步的数据写入从库的 binlog 文件中,如果主库开启了gtid mode,从库也需要开启。
log-slave-updates = 1
gtid_mode = on
enforce_gtid_consistency = on
mysql-cdc同步分库分表的表
mysql cdc 的表名和库名均支持正则配置,比如 `.tableList("cdc_demo.flink_cdc_.*")` 可以匹配 cdc_demo.flink_cdc_01、cdc_demo.flink_cdc_02、cdc_demo.flink_cdc_a 等表。
注意:正则中匹配任意多个字符的写法是 `.*` 而不是单独的 `*`;其中点号(.)表示任意单个字符,星号(*)表示前一个元素重复 0 次或多次。databaseList 的配置同理。
MySqlSource<String> mySqlSource = MySqlSource.<String>builder().hostname("localhost").port(3306).username("flinkcdc").password("123456")// 表所在的数据库库名,如果需要捕获整个库的表,配置为:".*".;如果需要捕获多个数据库配置为:.databaseList("cdc01", "cdc02").databaseList("cdc_.*")// 设置需要捕获日志的表名,注意需要配置库名,大小敏感.tableList("cdc_demo.flink_cdc_.*").serverTimeZone("Asia/Shanghai")// 将 SourceRecord 转换为 JSON 字符串。.deserializer(new JsonDebeziumDeserializationSchema()).build();
自定义类型转换器
3.3.1 实现类型转换类
package com.toroidal.utils;import io.debezium.spi.converter.CustomConverter;
import io.debezium.spi.converter.RelationalColumn;
import org.apache.kafka.connect.data.SchemaBuilder;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;import java.sql.Date;
import java.time.Instant;
import java.time.LocalDate;
import java.time.ZoneId;
import java.time.ZoneOffset;
import java.time.format.DateTimeFormatter;
import java.util.Properties;public class DebeziumConverter implements CustomConverter<SchemaBuilder, RelationalColumn> {private static final Logger log = LoggerFactory.getLogger(DebeziumConverter.class);private static final String DATE_FORMAT = "yyyy-MM-dd";private static final String TIME_FORMAT = "HH:mm:ss";private static final String DATETIME_FORMAT = "yyyy-MM-dd HH:mm:ss";private DateTimeFormatter dateFormatter = DateTimeFormatter.ISO_DATE;private DateTimeFormatter timeFormatter;private DateTimeFormatter datetimeFormatter;private SchemaBuilder schemaBuilder;private String databaseType;private String schemaNamePrefix;// 获取默认时区private final ZoneId zoneId = ZoneOffset.systemDefault();@Overridepublic void configure(Properties properties) {// 必填参数:database.type。获取数据库的类型,暂时支持mysql、sqlserverthis.databaseType = properties.getProperty("database.type");// 如果未设置,或者设置的不是mysql、sqlserver,则抛出异常。if (this.databaseType == null || (!this.databaseType.equals("mysql") && !this.databaseType.equals("oracle") && !this.databaseType.equals("sqlserver"))) {throw new IllegalArgumentException("database.type 必须设置为: mysql、sqlserver或oracle");}// 选填参数:format.date、format.time、format.datetime。获取时间格式化的格式String dateFormat = properties.getProperty("format.date", DATE_FORMAT);String timeFormat = properties.getProperty("format.time", TIME_FORMAT);String datetimeFormat = properties.getProperty("format.datetime", DATETIME_FORMAT);// 获取自身类的包名+数据库类型为默认schema.nameString className = this.getClass().getName();// 查看是否设置schema.name.prefixthis.schemaNamePrefix = properties.getProperty("schema.name.prefix", className + "." + this.databaseType);// 初始化时间格式化器dateFormatter = DateTimeFormatter.ofPattern(dateFormat);timeFormatter = DateTimeFormatter.ofPattern(timeFormat);datetimeFormatter = DateTimeFormatter.ofPattern(datetimeFormat);}// mysql的转换器public void registerMysqlConverter(String columnType, ConverterRegistration<SchemaBuilder> converterRegistration) {String schemaName = this.schemaNamePrefix + "." 
+ columnType.toLowerCase();schemaBuilder = SchemaBuilder.string().name(schemaName);switch (columnType) {case "DATE":converterRegistration.register(schemaBuilder, value -> {if (value == null) {return null;} else if (value instanceof LocalDate) {return dateFormatter.format((LocalDate) value);} else if (value instanceof Integer) {LocalDate date = LocalDate.ofEpochDay((Integer) value);return dateFormatter.format(date);} else {return String.valueOf(value);}});break;case "TIME":converterRegistration.register(schemaBuilder, value -> {if (value == null) {return null;} else if (value instanceof java.time.Duration) {return timeFormatter.format(java.time.LocalTime.ofNanoOfDay(((java.time.Duration) value).toNanos()));} else if (value instanceof Integer) {LocalDate date = LocalDate.ofEpochDay((Integer) value);return dateFormatter.format(date);} else {return String.valueOf(value);}});break;case "DATETIME":case "TIMESTAMP":converterRegistration.register(schemaBuilder, value -> {if (value == null) {return null;} else if (value instanceof java.time.LocalDateTime) {return datetimeFormatter.format((java.time.LocalDateTime) value);} else if (value instanceof java.time.ZonedDateTime) {// 获取系统默认时区
// ZoneOffset zoneOffset = java.time.ZoneId.systemDefault().getRules().getOffset(java.time.Instant.now());
// return datetimeFormatter.format(((java.time.ZonedDateTime) value).withZoneSameInstant(zoneOffset).toLocalDateTime());return datetimeFormatter.format(((java.time.ZonedDateTime) value).withZoneSameInstant(zoneId).toLocalDateTime());} else if (value instanceof java.sql.Timestamp) {return datetimeFormatter.format(((java.sql.Timestamp) value).toLocalDateTime());} else if (value instanceof String) {// 初始化出现1970-01-01T00:00:00Zd的值,需要转换Instant instant = Instant.parse((String) value);java.time.LocalDateTime dateTime = java.time.LocalDateTime.ofInstant(instant, ZoneOffset.UTC);return datetimeFormatter.format(dateTime);} else {return String.valueOf(value);}});break;default:schemaBuilder = null;break;}}// oracle的转换器public void registerSqlserverConverter(String columnType, ConverterRegistration<SchemaBuilder> converterRegistration) {String schemaName = this.schemaNamePrefix + "." + columnType.toLowerCase();schemaBuilder = SchemaBuilder.string().name(schemaName);switch (columnType) {case "DATE":converterRegistration.register(schemaBuilder, value -> {System.out.println("120 value: " + value + " columnType: " + columnType);if (value == null) {return null;} else if (value instanceof Date) {return dateFormatter.format(((Date) value).toLocalDate());} else if (value instanceof Integer) {LocalDate date = LocalDate.ofEpochDay((Integer) value);return dateFormatter.format(date);} else {return String.valueOf(value).replace("TO_DATE('", "").replace("', 'YYYY-MM-DD HH24:MI:SS')", "");}});break;case "TIMESTAMP":case "TIMESTAMP(3)":case "TIMESTAMP(6)":case "TIMESTAMP(9)":converterRegistration.register(schemaBuilder, value -> {System.out.println("137 value: " + value + " columnType: " + columnType);if (value == null) {return null;} else if (value instanceof java.sql.Timestamp) {return timeFormatter.format(((java.sql.Timestamp) value).toLocalDateTime().toLocalTime());} else {return String.valueOf(value).replace("TO_TIMESTAMP('", "").replace("')", "");}});break;default:schemaBuilder = 
null;break;}}@Overridepublic void converterFor(RelationalColumn relationalColumn, ConverterRegistration<SchemaBuilder> converterRegistration) {// 获取字段类型String columnType = relationalColumn.typeName().toUpperCase();log.info("数据库:{},字段名称:{},字段类型:{},jdbc type :{}", this.databaseType, relationalColumn.name(), columnType, relationalColumn.jdbcType());// 根据数据库类型调用不同的转换器if (this.databaseType.equals("mysql")) {this.registerMysqlConverter(columnType, converterRegistration);} else if (this.databaseType.equals("oracle")) {this.registerSqlserverConverter(columnType, converterRegistration);} else {log.warn("===failed 不支持的数据库类型: {}", this.databaseType);schemaBuilder = null;}}private String getClassName(Object value) {if (value == null) {return null;}return value.getClass().getName();}
}
3.3.2 配置自定义类型转换器
// Register the custom converter through Debezium properties. Keys prefixed with the
// converter name ("dateConverters.") are the settings passed to that converter.
Properties prop = new Properties();
prop.setProperty("converters", "dateConverters");
prop.setProperty("dateConverters.type", "com.toroidal.utils.DebeziumConverter");
prop.setProperty("dateConverters.database.type", "mysql");

MySqlSource<String> mySqlSource =
        MySqlSource.<String>builder()
                .hostname("localhost")
                .port(3306)
                // Database name(s) to capture. Use ".*" to capture every database, or
                // list several explicitly, e.g. .databaseList("cdc01", "cdc02").
                .databaseList("cdc")
                // Table name(s) to capture. The database name must be included and the
                // match is case-sensitive.
                .tableList("cdc.flink_cdc_test")
                .username("flinkcdc")
                .serverTimeZone("Asia/Shanghai")
                // The server id must be an integer or an integer range such as
                // "5400-5404"; a free-form name like "flink-cdc-01" is rejected.
                .serverId("5400-5404")
                .password("123456")
                .debeziumProperties(prop)
                // Convert each SourceRecord into a JSON string.
                .deserializer(new JsonDebeziumDeserializationSchema())
                .build();