CREATE CATALOG mypg WITH('type' = 'jdbc','default-database' = 'postgres','username' = 'postgres','password' = '','base-url' = 'jdbc:postgresql://10.50.108.42:5432'
);create table soc_all (
WATERMARK FOR collectorreceipttime AS collectorreceipttime - INTERVAL '5' SECOND
)
WITH('connector' = 'jdbc','url' = 'jdbc:postgresql://10.50.108.42:5432/postgres','username' = 'postgres','password' = '','driver' = 'org.postgresql.Driver'
)
LIKE `mypg`.`postgres`.`public.soc_local`;CREATE TABLE sink_pg(srcUserName STRING,eventTime TIMESTAMP(3),ip STRING,baseline STRING,alert BOOLEAN
)
WITH ('connector' = 'jdbc','url' = 'jdbc:postgresql://10.50.108.42:5432/postgres','table-name' = 'alert','username' = 'postgres','password' = '','driver' = 'org.postgresql.Driver'
);-- 改成全部小写
insert into sink_pg
select'' as srcusername,ceil(collectorreceipttime to hour) as eventtime,srcaddress as ip,'' as baseline,false
from soc_all
where collectorreceipttime >= '2024-10-16'
and collectorreceipttime < '2024-10-27'
and srcaddress <> ''
limit 100;
踩坑
flink 水印字段要求 timestamp(0-3)
postgres字段类型 timestamp 默认 timestamp(6),需要转换后才能作为水印字段
ALTER TABLE public.soc_local
ALTER COLUMN collectorReceiptTime TYPE TIMESTAMP(3);
postgresql 大小写不敏感
pg 中查询大写字段需要加双引号,但是flink sql 不支持引号,所以用flinksql查询pg大写字段会报错,见参考链接。
Caused by: java.lang.IllegalArgumentException: open() failed.ERROR: column "collectorreceipttime" does not existHint: Perhaps you meant to reference the column "soc_local.collectorReceiptTime".
解决方案,把pg中字段改成全小写
参考
- https://issues.apache.org/jira/browse/FLINK-23324
- https://stackoverflow.com/questions/77383157/flink-postgres-jdbc-source-connector-read-uppercase-field-failed