#工作记录#
在同一个消费组消费kafka里面的数据时我们一般会通过:scan.startup.mode
--从kafka最开始点位消费
'scan.startup.mode'='earliest-offset'
--从kafka最新点位开始消费
'scan.startup.mode'='latest-offset'
一般kafka上的数据不特殊设置都是保存7天。加入我们想指定时间点位来消费的话可以通过以下方式设置
--消费方式设置为时间戳
'scan.startup.mode'='timestamp',
--指定消费点位的时间戳即可,时间戳为13位
'scan.startup.timestamp-millis'='1725811200000',
以上就是kafka两种消费的设置方式。
以下是我flinksql设置的语句:
,PRIMARY KEY(ID,NAME) NOT ENFORCED
) with (
'connector'='kafka',
'topic'='DEBEZIUM_KAFKA',
--'scan.startup.mode'='earliest-offset',
'scan.startup.mode'='timestamp',
'scan.startup.timestamp-millis'='1725811200000',
'properties.group.id'='KAFKA_HUDI1',
'properties.bootstrap.servers'='111.111.11.111:9092,111.111.11.112:9092,111.111.11.113:9092',
'value.format'='debezium-json',
'scan.topic-partition-discovery.interval' = '10000',
'value.debezium-json.ignore-parse-errors' = 'true'
);