一、介绍
- Table API 和 SQL 进行基于时间的操作(比如时间窗口)时,需要定义相关的时间语义和时间数据来源的信息。因此需要给表单独提供一个逻辑上的时间字段,专门用来在表处理程序中指示时间。
- 时间属性(time attributes)其实就是每个表模式结构(schema)的一部分。它可以在创建表的 DDL 里直接定义为一个字段,也可以在 DataStream 转换成表时定义。一旦定义了时间属性,就可以把它作为一个普通字段引用,并且可以在基于时间的操作中使用。
- 时间属性的数据类型为 TIMESTAMP,类似于常规时间戳,可以直接访问并且进行计算。
- 按照时间语义的不同,可以把时间属性的定义分成事件时间(event time)和处理时间(processing time)两种。
二、处理时间定义
/**
 * Demonstrates three ways to declare a processing-time attribute in the Flink Table API:
 * when converting a DataStream to a Table, through the connector/descriptor API, and through
 * a DDL computed column using {@code PROCTIME()}.
 */
public class TestTableProcessingTime {
    /** Input file shared by every table definition below, so they all read the same data. */
    private static final String INPUT_PATH = "./sensor.txt";

    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.setParallelism(1);
        StreamTableEnvironment tableEnv = StreamTableEnvironment.create(env);

        // 1. DataStream -> Table: the "pt.proctime" expression appends an extra
        //    processing-time attribute named pt to the converted table.
        DataStream<String> inputStream = env.readTextFile(INPUT_PATH);
        DataStream<SensorReading> dataStream = inputStream.map(line -> {
            String[] fields = line.split(",");
            return new SensorReading(fields[0], Long.valueOf(fields[1]), Double.valueOf(fields[2]));
        });
        Table sensorTable =
                tableEnv.fromDataStream(dataStream, "id, timestamp as ts, temperature, pt.proctime");
        // Make the otherwise unused conversion observable: the schema shows the pt attribute.
        sensorTable.printSchema();

        // 2. Connector/descriptor API: proctime() marks the last declared field (pt) as the
        //    processing-time attribute.
        tableEnv.connect(new FileSystem().path(INPUT_PATH))
                .withFormat(new Csv())
                .withSchema(new Schema()
                        .field("id", DataTypes.STRING())
                        .field("timestamp", DataTypes.BIGINT())
                        .field("temperature", DataTypes.DOUBLE())
                        .field("pt", DataTypes.TIMESTAMP(3)).proctime())
                .createTemporaryTable("sensorTable");

        // 3. DDL: pt is a computed column whose value is PROCTIME(). The table is named
        //    dataTable rather than sensorTable so it is not shadowed by the temporary table
        //    registered above.
        // NOTE(review): computed columns in DDL are only supported by the Blink planner;
        // confirm which planner StreamTableEnvironment.create(env) selects in this Flink version.
        String sourceDDL = "create table dataTable (" +
                " id varchar(20) not null, " +
                " ts bigint, " +
                " temperature double, " +
                " pt AS PROCTIME() " +
                ") with (" +
                " 'connector.type' = 'filesystem', " +
                " 'connector.path' = '" + INPUT_PATH + "', " +
                " 'format.type' = 'csv')";
        tableEnv.sqlUpdate(sourceDDL);

        // Runs the DataStream pipeline built above. It has no sink, so the job only reads and
        // maps the file.
        env.execute();
    }
}
三、事件时间定义
public class TestTableEventTime {public static void main(String[] args) throws Exception {StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();env.setParallelism(1);env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime);StreamTableEnvironment tableEnv = StreamTableEnvironment.create(env);DataStream<String> inputStream = env.readTextFile("./sensor.txt");DataStream<SensorReading> dataStream = inputStream.map(line -> {String[] fields = line.split(",");return new SensorReading(fields[0], new Long(fields[1]), new Double(fields[2]));}).assignTimestampsAndWatermarks(new BoundedOutOfOrdernessTimestampExtractor<SensorReading>(Time.seconds(2)) {@Overridepublic long extractTimestamp(SensorReading element) {return element.getTimestamp() * 1000L;}});Table sensorTable = tableEnv.fromDataStream(dataStream, "id, timestamp as ts, temperature, rt.rowtime"); tableEnv.connect(new FileSystem().path("./sensor.txt")) .withFormat(new Csv()) .withSchema(new Schema() .field("id", DataTypes.STRING()) .field("timestamp", DataTypes.BIGINT()).rowtime( new RowTime().timestampsFromField("timestamp") .watermarksPeriodicBounded(1000) ).field("temperature", DataTypes.DOUBLE())).createTemporaryTable("sensorTable");String sinkDDL = "create table dataTable (" +" id varchar(20) not null, " + " ts bigint, " +" temperature double, " +" rt AS TO_TIMESTAMP( FROM_UNIXTIME(ts) ), " +" watermark for rt as rt - interval '1' second" +") with (" +" 'connector.type' = 'filesystem', " +" 'connector.path' = '/sensor.txt', " +" 'format.type' = 'csv')";tableEnv.sqlUpdate(sinkDDL);env.execute();}
}