文章目录
- Flink基础
- 今日课程内容目标
- Flink的执行环境
- 编程入口
- 执行模式
- 基本source算子
- 基于集合的Source(测试用)
- 基于Socket的Source(测试用)
- 基于文件的Source(测试用)
- Kafka Source(生产常用)
- 自定义Source
- 自定义Source,实现自定义&并行度为1的source
- 自定义Source,实现一个支持并行度的source
- 自定义Source,实现一个支持并行度的富类source
- 自定义Source,实现消费MySQL中的数据
- 数据的转换操作(Transformation算子)
- 映射算子
- map映射(DataStream → DataStream)
- flatMap扁平化映射(DataStream → DataStream)
- 过滤算子
- filter过滤(DataStream → DataStream)
- 分组算子
- keyBy按key分组(DataStream → KeyedStream)
- 聚合算子
- sum
- min、minBy
- max、maxBy
- reduce归约
- 基本sink算子
- 打印到控制台:print
- 打印到文件:StreamFileSink
- Row格式文件输出代码示例
- Bulk列式存储文件输出代码示例1
- Bulk列式存储文件输出代码示例2
- Bulk列式存储文件输出代码示例3
- 输出到Kafka
- 输出到 MySQL(JDBC)
- 不保证Exactly-Once的JdbcSink
- 保证Exactly-Once的JdbcSink
- 今日总结
Flink基础
今日课程内容目标
- Flink的执行环境
- 基本source算子
- 数据的转换操作(Transformation算子)
- 基本sink算子
Flink的执行环境
编程入口
- 流式计算入口(用于测试/生产)
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
- 流批一体入口(用于测试/生产)
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
// 为env设置环境参数
ExecutionConfig config = env.getConfig();
// 设置为批处理模式
config.setExecutionMode(ExecutionMode.BATCH);
- 开启webui的本地运行环境(用于测试)
Configuration conf = new Configuration();
conf.setInteger("rest.port",8081);
StreamExecutionEnvironment env = StreamExecutionEnvironment.createLocalEnvironmentWithWebUI(conf);
-
要开启本地webui功能,需要添加依赖
<dependency><groupId>org.apache.flink</groupId><artifactId>flink-runtime-web</artifactId><version>${flink.version}</version> </dependency>
执行模式
- 流执行(STREAMING)模式。
- STREAMING模式是DataStream API最经典的模式,一般用于需要持续实时处理的无界数据流。在默认情况下,程序使用的就是STREAMING模式。
- 批执行(BATCH)模式。
- BATCH模式是专门用于批处理的执行模式,在这种模式下,Flink处理作业的方式类似于MapReduce框架。对于不会持续计算的有界数据,用这种模式处理会更方便。
- 自动(AUTOMATIC)模式。
- 在AUTOMATIC模式下,将由程序根据输入数据源是否有界来自动选择执行模式。
执行模式的配置方法(以BATCH为例,默认是STREAMING模式):
-
通过命令行配置(在提交作业时,增加execution.runtime-mode参数,进行指定)
bin/flink run -Dexecution.runtime-mode=BATCH ...
-
通过代码配置(在代码中调用setRuntimeMode方法指定)
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment (); env.setRuntimeMode(RuntimeExecutionMode.BATCH);
不要在代码中配置,而是使用命令行。这同设置并行度是类似的,在提交作业时指定参数可以更加灵活,同一段应用在程序写好之后,既可以用于批处理,又可以用于流处理,而在代码中进行硬编码的方式的可扩展性比较差,一般都不推荐。
基本source算子
基于集合的Source(测试用)
可将一个普通的Java集合、迭代器或者可变参数转换成一个分布式数据流DataStream;
-
fromElements
非并行的Source,可以将一到多个数据作为可变参数传入到该方法中,返回DataStreamSource。
public class FromElementDemo {public static void main(String[] args) throws Exception {//创建流计算执行上下文环境StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();//指定多个相同类型的数据创建DataStreamDataStreamSource<String> words = env.fromElements("flink", "hadoop", "flink");//调用Sink将数据在控制台打印words.print();//执行env.execute("FromElementDemo");} }
-
fromCollection
非并行的Source,可以将一个Collection作为参数传入到该方法中,返回一个DataStreamSource。
//创建一个List List<String> wordList = Arrays.asList("flink", "spark", "hadoop", "flink"); //将List并行化成DataStream DataStreamSource<String> words = env.fromCollection(wordList);// fromParallelCollection fromParallelCollection(SplittableIterator, Class) 方法是一个并行的Source(并行度可以使用env的setParallelism来设置),该方法需要传入两个参数,第一个是继承SplittableIterator的实现类的迭代器,第二个是迭代器中数据的类型。 StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();env.setParallelism(3); //设置并行度为3//调用env的fromParallelCollection创建并行的生成数据的DataStreamSource DataStreamSource<Long> numbers = env.fromParallelCollection(new NumberSequenceIterator(1L, 10L), // 生成数组的rangeLong.class //输出数据的类型 );
-
generateSequence
并行的Source(并行度也可以通过调用该方法后,再调用setParallelism来设置)通过指定的起始值和结束值来生成数据序列流;
//调用env的generateSequence生成并行的DataSource,输出的数字是1到100 DataStreamSource<Long> numbers = env.generateSequence(1L, 100L).setParallelism(3);
基于Socket的Source(测试用)
非并行的Source,通过socket通信来获取数据得到数据流;
该方法还有多个重载的方法,如:
socketTextStream(String hostname, int port, String delimiter, long maxRetry)
可以指定行分隔符和最大重新连接次数。
//调用env的socketTextStream方法,从指定的Socket地址和端口创建DataStream
DataStreamSource<String> lines = env.socketTextStream("node1", 9999);
注意:socketSource是一个非并行source,如果使用socketTextStream读取数据,在启动Flink程序之前,必须先启动一个Socket服务,为了方便,Mac或Linux用户可以在命令行终端输入nc -lk 9999启动一个Socket服务并在命令行中向该Socket服务发送数据。
基于文件的Source(测试用)
基于文件的Source,本质上就是使用指定的FileInputFormat组件读取数据,可以指定TextInputFormat、CsvInputFormat、BinaryInputFormat等格式;
底层都是ContinuousFileMonitoringFunction,这个类继承了RichSourceFunction
,都是非并行的Source;
-
readFile
readFile(FileInputFormat inputFormat, String filePath) 方法可以指定读取文件的FileInputFormat 格式,参数FileProcessingMode,可取值:
- PROCESS_ONCE,只读取文件中的数据一次,读取完成后,程序退出
- PROCESS_CONTINUOUSLY,会一直监听指定的文件,文件的内容发生变化后,会将以前的内容和新的内容全部都读取出来,进而造成数据重复读取
String path = "D://word.txt"; //PROCESS_CONTINUOUSLY模式是一直监听指定的文件或目录,2秒钟检测一次文件是否发生变化 DataStreamSource<String> lines = env.readFile(new TextInputFormat(null), path,FileProcessingMode.PROCESS_CONTINUOUSLY, 2000);
-
readTextFile
readTextFile(String filePath) 可以从指定的目录或文件读取数据,默认使用的是TextInputFormat格式读取数据,还有一个重载的方法readTextFile(String filePath, String charsetName)可以传入读取文件指定的字符集,默认是UTF-8编码。该方法是一个有限的数据源,数据读完后,程序就会退出,不能一直运行。该方法底层调用的是readFile方法,FileProcessingMode为PROCESS_ONCE
DataStreamSource<String> lines = env.readTextFile(path);
Kafka Source(生产常用)
在实际生产环境中,为了保证flink可以高效地读取数据源中的数据,通常是跟一些分布式消息中件结合使用,例如Apache Kafka。Kafka的特点是分布式、多副本、高可用、高吞吐、可以记录偏移量等。Flink和Kafka整合可以高效的读取数据,并且可以保证Exactly Once(精确一次性语义)。
首先在maven项目的pom.xml文件中导入Flink跟Kafka整合的依赖
<dependency><groupId>org.apache.flink</groupId><artifactId>flink-connector-kafka</artifactId><version>3.3.0-1.20</version>
</dependency>
参考代码:
KafkaSource<String> kafkaSource = KafkaSource.<String>builder()//指定kafka集群的地址.setBootstrapServers("node1.itcast.cn:9092")//设置订阅的主题.setTopics("test01")//制定消费者组id.setGroupId("itcast001")//指定起始消费位移//OffsetsInitializer.committedOffsets(OffsetResetStrategy.LATEST):消费起始位移选择之前所递交的偏移量(如果没有,则重置为Latest)//OffsetsInitializer.earliest():消费起始位移直接选择为最早//OffsetsInitializer.latest():消费起始位移直接选择为最新//OffsetsInitializer.offsets(Map<TopicPartition, Long>):消费起始位移选择为:方法所传入的每个分区对应的起始偏移量.setStartingOffsets(OffsetsInitializer.committedOffsets(OffsetResetStrategy.LATEST))//设置value数据的反序列化起(使用默认的字符串解析器).setValueOnlyDeserializer(new SimpleStringSchema())//设置动态分区检查(10秒钟检查一次新的分区).setProperty("partition.discovery.interval.ms", "10000")//设置开启kafka底层消费者的自动位移递交机制// TODO 注释:他会把最新的消费位移递交到kafka的consumer_offset中// TODO 注释:建议在于flink整合的时候不要设置true,由flink决定是否递交偏移量可以保证业务的端对端的一次性语义.setProperty("auto.offset.commit", "true")// TODO 注释:将这个source段子设置为Bounded属性(有界流)// 将来source去读取数据的时候,读取到指定的位置,停止读取并退出// 经常用来进行补数或者重跑一段历史的数据// .setBounded(OffsetsInitializer.committedOffsets())// TODO 注释:将这个source段子设置为UnBounded属性(无界流)// 并不会一直读取数据,而是达到了指定位置就停止读取,但是程序不会退出// 主要应用场景式:需要从kafka中读取某一段固定长度数据,然后拿着数据去跟另外一个真正的无界流联合处理// .setUnbounded(OffsetsInitializer.latest()).build();
// 将实例化成功的对象作为参数传入到fromSource方法中
DataStreamSource<String> kafkaStreamSource = env.fromSource(kafkaSource, WatermarkStrategy.noWatermarks(), "kafka-source");
自定义Source
Flink的DataStream API可以让开发者根据实际需要,灵活的自定义Source,本质上就是定义一个类,实现SourceFunction或继承RichParallelSourceFunction,实现run方法和cancel方法。
准备工作:
定义一个JavaBean对象
@NoArgsConstructor
@AllArgsConstructor
@Getter
@Setter
@ToString
class EventLog{//唯一标识符private long guid;//会话IDprivate String sessionId;//事件idprivate String eventId;//时间戳private long timeStamp;//事件信息private Map<String,String> eventInfo;
}
自定义Source,实现自定义&并行度为1的source
自定义source,实现SourceFunction接口,实现一个没有并行度的案例
功能:每隔 1s 进行生成一个****EventLog****
实现的方法:run(),作为数据源,所有数据的产生都在 run() 方法中实现
public class CustomSourceFunctionDemo {public static void main(String[] args) throws Exception {StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();env.setParallelism(1);DataStreamSource<EventLog> dataStreamSource = env.addSource(new MySourceFunction());dataStreamSource.map(JSON::toJSONString).print();env.execute();}
}
class MySourceFunction implements SourceFunction<EventLog> {volatile boolean flag = true;@Overridepublic void run(SourceContext<EventLog> ctx) throws Exception {EventLog eventLog = new EventLog();String[] events = {"hadoop","spark","flink","hbase","kafka","hdfs","mapreduce"};HashMap<String, String> eventInfoMap = new HashMap<>();while(flag) {eventLog.setGuid(RandomUtils.nextLong(1, 1000));//该方法随机生成一个包含大小写字母的字符串,一个参数表示该字符串包含的字母的个数eventLog.setSessionId(RandomStringUtils.randomAlphabetic(12).toUpperCase());eventLog.setTimeStamp(System.currentTimeMillis());eventLog.setEventId(events[RandomUtils.nextInt(0, events.length)]);//该方法随机生成一个包含大小写字母的字符串,但是该字符串的长度是随机的生成的,随机的范围就是该方法的两个参数,第一个表示随机的最小值,第二个表示随机的最大值eventInfoMap.put(RandomStringUtils.randomAlphabetic(1), RandomStringUtils.randomAlphabetic(2));eventLog.setEventInfo(eventInfoMap);ctx.collect(eventLog);eventInfoMap.clear();Thread.sleep(1000);}}@Overridepublic void cancel() {flag = false;}
}
自定义Source,实现一个支持并行度的source
实现ParallelSourceFunction接口
该接口只是个标记接口,用于标识继承该接口的Source都是并行执行的。其直接实现类是RichParallelSourceFunction,它是一个抽象类并继承自 AbstractRichFunction(从名称可以看出,它应该兼具 rich 和 parallel 两个特性,这里的rich体现在它定义了 open 和 close 这两个方法)。
public class CustomParallelSourceFunctionDemo {public static void main(String[] args) throws Exception {StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();env.setParallelism(1);//DataStreamSource<EventLog> dataStreamSource = env.addSource(new MySourceFunction());DataStreamSource<EventLog> dataStreamSource = env.addSource(new MyParallelSourceFunction()).setParallelism(2);dataStreamSource.map(JSON::toJSONString).print();env.execute();}
}
class MyParallelSourceFunction implements ParallelSourceFunction<EventLog> {volatile boolean flag = true;@Overridepublic void run(SourceContext<EventLog> ctx) throws Exception {EventLog eventLog = new EventLog();String[] events = {"hadoop","spark","flink","hbase","kafka","hdfs","mapreduce"};HashMap<String, String> eventInfoMap = new HashMap<>();while(flag) {eventLog.setGuid(RandomUtils.nextLong(1, 1000));//该方法随机生成一个包含大小写字母的字符串,一个参数表示该字符串包含的字母的个数eventLog.setSessionId(RandomStringUtils.randomAlphabetic(12).toUpperCase());eventLog.setTimeStamp(System.currentTimeMillis());eventLog.setEventId(events[RandomUtils.nextInt(0, events.length)]);//该方法随机生成一个包含大小写字母的字符串,但是该字符串的长度是随机的生成的,随机的范围就是该方法的两个参数,第一个表示随机的最小值,第二个表示随机的最大值eventInfoMap.put(RandomStringUtils.randomAlphabetic(1), RandomStringUtils.randomAlphabetic(2));eventLog.setEventInfo(eventInfoMap);ctx.collect(eventLog);eventInfoMap.clear();Thread.sleep(1000);}}
自定义Source,实现一个支持并行度的富类source
RichParallelSourceFunction 中的rich体现在额外提供open和close方法
针对source中如果需要获取其他链接资源,那么可以在open方法中获取资源链接,在close中关闭资源链接。
public class CustomSourceFunctionDemo {public static void main(String[] args) throws Exception {StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();env.setParallelism(1);//DataStreamSource<EventLog> dataStreamSource = env.addSource(new MySourceFunction());//DataStreamSource<EventLog> dataStreamSource = env.addSource(new MyParallelSourceFunction()).setParallelism(2);DataStreamSource<EventLog> dataStreamSource = env.addSource(new MyRichSourceFunction());// DataStreamSource<EventLog> dataStreamSource = env.addSource(new MyRichParallelSourceFunction()).setParallelism(2);dataStreamSource.map(JSON::toJSONString).print();env.execute();}
}class MyRichSourceFunction extends RichSourceFunction<EventLog> {volatile boolean flag = true;/*** source组件初始化** @param parameters* @throws Exception*/@Overridepublic void open(Configuration parameters) throws Exception {RuntimeContext runtimeContext = getRuntimeContext();// 可以从运行时上下文中,取到本算子所属的 task 的task名String taskName = runtimeContext.getTaskName();// 可以从运行时上下文中,取到本算子所属的 subTask 的subTaskIdint indexOfThisSubtask = runtimeContext.getIndexOfThisSubtask();// 如果需要获取其他链接资源,那么可以在open方法中获取资源链接System.out.println("资源链接.. ");}/*** source组件生成数据的过程(核心工作逻辑)** @param ctx* @throws Exception*/@Overridepublic void run(SourceContext<EventLog> ctx) throws Exception {EventLog eventLog = new EventLog();String[] events = {"hadoop", "spark", "flink", "hbase", "kafka", "hdfs", "mapreduce"};HashMap<String, String> eventInfoMap = new HashMap<>();while (flag) {eventLog.setGuid(RandomUtils.nextLong(1, 1000));eventLog.setSessionId(RandomStringUtils.randomAlphabetic(12).toUpperCase());eventLog.setTimeStamp(System.currentTimeMillis());eventLog.setEventId(events[RandomUtils.nextInt(0, events.length)]);eventInfoMap.put(RandomStringUtils.randomAlphabetic(1), RandomStringUtils.randomAlphabetic(2));eventLog.setEventInfo(eventInfoMap);ctx.collect(eventLog);eventInfoMap.clear();Thread.sleep(RandomUtils.nextInt(500, 1500));}}/*** job取消调用的方法*/@Overridepublic void cancel() {flag = false;}/*** 组件关闭调用的方法* @throws Exception*/@Overridepublic void close() throws Exception {// 在close中关闭资源链接System.out.println("资源关闭.. ");}
}
自定义Source,实现消费MySQL中的数据
这个更加接近实际的案例,上面我们已经使用了自定义数据源和Flink自带的Kafka source,那么接下来就模仿着写一个从 MySQL 中读取数据的 Source。
- mysql建表语句
create database if not exists flinkdemo;
use flinkdemo;
DROP TABLE IF EXISTS `user`;
CREATE TABLE `user` (`id` int(11) NOT NULL,`username` varchar(255) CHARACTER SET utf8 COLLATE utf8_general_ci NULL DEFAULT NULL,`password` varchar(255) CHARACTER SET utf8 COLLATE utf8_general_ci NULL DEFAULT NULL,`name` varchar(255) CHARACTER SET utf8 COLLATE utf8_general_ci NULL DEFAULT NULL,PRIMARY KEY (`id`) USING BTREE
) ENGINE = InnoDB CHARACTER SET = utf8 COLLATE = utf8_general_ci ROW_FORMAT = Compact;-- ----------------------------
-- Records of user
-- ----------------------------
INSERT INTO `user` VALUES (10, 'dazhuang', '123456', '大壮');
INSERT INTO `user` VALUES (11, 'erya', '123456', '二丫');
INSERT INTO `user` VALUES (12, 'sanpang', '123456', '三胖');SET FOREIGN_KEY_CHECKS = 1;
- 创建自定义Source类,继承 RichSourceFunction
public class CustomSourceFunctionDemo {public static void main(String[] args) throws Exception {StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();env.setParallelism(1);DataStreamSource<UserInfo> dataStreamSource = env.addSource(new MyRichParallelSourceFunction()).setParallelism(2);dataStreamSource.map(JSON::toJSONString).print();env.execute();}
}class MyRichParallelSourceFunction extends RichParallelSourceFunction<UserInfo> {private Connection connection = null; // 定义数据库连接对象private PreparedStatement ps = null; // 定义PreparedStatement对象/*使用open方法, 这个方法在实例化类的时候会执行一次, 比较适合用来做数据库连接*/@Overridepublic void open(Configuration parameters) throws Exception {super.open(parameters);// 加载数据库驱动Class.forName("com.mysql.jdbc.Driver");// 创建数据库连接String url = "jdbc:mysql://node1:3306/flinkdemo?useUnicode=true&characterEncoding=utf-8&useSSL=false";this.connection = DriverManager.getConnection(url, "root", "123456");// 准备PreparedStatement对象this.ps = connection.prepareStatement("SELECT id, username, password, name FROM user");System.out.println("资源链接.. ");}/*使用close方法, 这个方法在销毁实例的时候会执行一次, 比较适合用来关闭连接*/@Overridepublic void close() throws Exception {super.close();// 关闭资源if (this.ps != null) this.ps.close();if (this.connection != null) this.connection.close();System.out.println("资源关闭.. ");}@Overridepublic void run(SourceContext<UserInfo> ctx) throws Exception {ResultSet resultSet = ps.executeQuery();while (resultSet.next()) {int id = resultSet.getInt("id");String username = resultSet.getString("username");String password = resultSet.getString("password");String name = resultSet.getString("name");ctx.collect(new UserInfo(id, username, password, name));}}@Overridepublic void cancel() {System.out.println("任务被取消......");}
}
/**数据定义类, POJO*/
@Data
@AllArgsConstructor
@NoArgsConstructor
class UserInfo {int id;String username;String password;String name;
}
数据的转换操作(Transformation算子)
映射算子
map映射(DataStream → DataStream)
map(new MapFunction )
MapFunction: (x)-> y [1条变1条]
public class MapDemo {public static void main(String[] args) throws Exception {StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();//调用env的fromElements创建一个非并行的DataStreamSourceDataStreamSource<String> words = env.fromElements("hadoop","spark","flink","hbase","flink","spark");//在map方法中传入MapFunction实现类实例,重写map方法DataStream<String> upperWords = words.map(new MapFunction<String, String>() {@Overridepublic String map(String value) throws Exception {//将每一个单词转成大写return value.toUpperCase();}});//调用Sink将数据打印在控制台upperWords.print();env.execute("MapDemo");}
}
flatMap扁平化映射(DataStream → DataStream)
flatMap( new FlatMapFcuntion)
FlatMapFunction: x-> x1, x2,x3,x4 [1条变多条,并展平]
DataStream<Tuple2<String, Integer>> wordAndOne = lines.flatMap(new FlatMapFunction<String, Tuple2<String, Integer>>() {@Overridepublic void flatMap(String line, Collector<Tuple2<String, Integer>> out) throws Exception {//将一行字符串按空格切分成一个字符串数组String[] arr = line.split(" ");for (String word : arr) {//将单词转成小写放入到Collector中out.collect(Tuple2.of(word.toLowerCase(), 1));}}}
);
如果是调用flatMap方法时传入Lambda表达式,需要在调用flatMap方法后,在调用returns方法指定返回的数据的类型。不然Flink无法自动推断出返回的数据类型,会出现异常。
DataStream<Tuple2<String, Integer>> wAndOne = lines.flatMap((String line, Collector<Tuple2<String, Integer>> out) -> {Arrays.asList(line.split("\\W+")).forEach(word -> {out.collect(Tuple2.of(word.toLowerCase(), 1));});}
).returns(Types.TUPLE(Types.STRING, Types.INT)); //使用returns指定返回数据的类型
过滤算子
filter过滤(DataStream → DataStream)
filter(new FilterFunction)
FilterFunction : x -> true/false
DataStreamSource<Integer> numbers = env.fromElements(1, 2, 3, 4, 5, 6, 7, 8, 9, 10);
//过滤掉奇数,保留偶数
DataStream<Integer> even = numbers.filter(new FilterFunction<Integer>() {@Overridepublic boolean filter(Integer value) throws Exception {return value % 2 == 0; //过滤掉返回false的数组}
});
分组算子
keyBy按key分组(DataStream → KeyedStream)
//按照Tuple2中的第0个位置进行分组,分组后得到KeyedStream
KeyedStream<Tuple2<String, Integer>, Tuple> keyed = wordAndOne.keyBy(0);
//按照Bean中的属性名word进行分组
KeyedStream<CountBean, Tuple> keyed = wordAndOne.keyBy("word");
聚合算子
有了按键分区的数据流 KeyedStream,我们就可以基于它进行聚合操作了。
-
此处所说的聚合算子,是多个聚合算子的统称,有sum、min、minBy、max、maxBy;
-
这些算子的底层逻辑都是维护一个聚合值,并使用每条流入的数据对聚合值进行滚动更新;
-
这些算子都只能在KeyedStream上调用(就是必须keyby后调用);
sum
该算子实现实时滚动相加的功能,即新输入的数据和历史数据进行相加。
//按照Tuple2中的第0个位置进行分组,分组后得到KeyedStream
KeyedStream<Tuple2<String, Integer>, Tuple> keyed = wordAndOne.keyBy(0);//将分组后的数据进行sum运算
DataStream<Tuple2<String, Integer>> result = keyed.sum(1);
//如果是自定义的POJO类型数据,可以传入一个要聚合的字段名称。
//按照Bean中的属性名word进行分组
KeyedStream<CountBean, Tuple> keyed = wordAndOne.keyBy("word");
//按照Bean中的属性名count进行sum聚合
DataStream<CountBean> result = keyed.sum("count");
min、minBy
这两个算子都是求最小值;min和minBy的区别在于:
-
min的返回值,最小值字段以外,其他字段是第一条输入数据的值;
-
minBy返回值,就是最小值字段所在的那条数据;
底层原理:滚动更新时是更新一个字段,还是更新整条数据的区别;
KeyedStream<Tuple3<String, String, Integer>, Tuple> keyedStream = wordAndCount.keyBy(1);
//将分组后的数据进行调用min、minBy
DataStream<Tuple3<String, String, Integer>> min1 = keyedStream.min(2);
DataStream<Tuple3<String, String, Integer>> min2 = keyedStream.minBy(2);
DataStream<Tuple3<String, String, Integer>> min3 = keyedStream.minBy(2, false);
//调用print sink打印结果
min1.print("min");
min2.print("minBy");
min3.print("minBy last");
max、maxBy
这两个算子都是求最大值,用法和min、minBy相同。
reduce归约
它的滚动聚合逻辑没有写死,而是由用户通过ReduceFunction来传入。
//按照Tuple2中的第0个位置进行分组,分组后得到KeyedStream
KeyedStream<Tuple2<String, Integer>, Tuple> keyed = wordAndOne.keyBy(0);
//将分组后的数据进行reduce
DataStream<Tuple2<String, Integer>> reduced = keyed.reduce(new ReduceFunction<Tuple2<String, Integer>>() {@Overridepublic Tuple2<String, Integer> reduce(Tuple2<String, Integer> t1, Tuple2<String, Integer> t2) throws Exception {t1.f1 += t2.f1; //将元组对应的次数进行累加return t1; //返回累加后的元组}}
);
基本sink算子
-
sink算子是将计算结果最终输出的算子
-
不同的sink算子可以将数据输出到不同的目标,如写入到的文件、输出到指定的网络端口、消息中间件、外部的文件系统或者是打印到控制台。
打印到控制台:print
打印是最简单的一个Sink,通常是用来做实验和测试时使用。
DataStream<Tuple2<String, Integer>> result = wordAndOne.keyBy(0).sum(1);
result.print();
打印到文件:StreamFileSink
该Sink不但可以将数据写入到各种文件系统中,而且整合了checkpoint机制来保证Exacly Once语义,还可以对文件进行分桶存储,还支持以列式存储的格式写入,功能更强大。
streamFileSink中输出的文件,其生命周期会经历3中状态:
- in-progress Files
- Pending Files
- Finished Files
Bucket:FileSink可向由Flink FileSystem抽象支持的文件系统写入分区文件(因为是流式写入,数据被视为无界)。该分区行为可配,默认按时间,具体来说每小时写入一个Bucket,该Bucket包括若干文件,内容是这一小时间隔内流中收到的所有record。
PartFile:每个Bukcket内部分为多个PartFile来存储输出数据,该Bucket生命周期内接收到数据的sink的每个子任务至少有一个PartFile。
而额外文件滚动由可配的滚动策略决定,默认策略是根据文件大小和打开超时(文件可以被打开的最大持续时间)以及文件最大不活动超时等决定是否滚动。
Bucket和SubTask、PartFile关系如图所示:

FileSink 支持****行编码(Row-encoded)*和*批量编码(Bulk-encoded,比如 Parquet)格式****。这两种不同的方式都有各自的构建器(builder),调用方法也非常简单,可以直接调用StreamingFileSink 的静态方法:
-
行编码:FileSink.forRowFormat(basePath,rowEncoder)
-
批量编码:FileSink.forBulkFormat(basePath,bulkWriterFactory)
在创建行或批量编码 Sink 时,我们需要传入两个参数,用来指定存储桶的基本路径(basePath)和数据的编码逻辑(rowEncoder 或 bulkWriterFactory)。
需要添加依赖:
<dependency><groupId>org.apache.flink</groupId><artifactId>flink-avro</artifactId><version>${flink.version}</version>
</dependency>
<dependency><groupId>org.apache.parquet</groupId><artifactId>parquet-avro</artifactId><version>${parquet-avro}</version>
</dependency>
<dependency><groupId>org.apache.hadoop</groupId><artifactId>hadoop-client</artifactId><version>${hadoop.version}</version>
</dependency>
Row格式文件输出代码示例
需求:编写Flink程序,每隔1s生成字符串数据,然后将接收到的数据流式方式存储到hdfs。
-
自定义数据源,每秒钟生成一条数据
public class MySourceFunction implements SourceFunction<EventLog>{volatile boolean flag = true;@Overridepublic void run(SourceContext<EventLog> ctx) throws Exception {EventLog eventLog = new EventLog();String[] events = { "hadoop", "spark", "flink", "hbase", "kafka", "hdfs", "mapreduce"};HashMap<String, String> eventInfoMap = new HashMap<>();while (flag) {eventLog.setGuid(RandomUtils.nextLong(1, 1000));eventLog.setSessionId(RandomStringUtils.randomAlphabetic(12).toUpperCase());eventLog.setTimeStamp(System.currentTimeMillis());eventLog.setEventId(events[RandomUtils.nextInt(0, events.length)]);eventInfoMap.put(RandomStringUtils.randomAlphabetic(1), RandomStringUtils.randomAlphabetic(2));eventLog.setEventInfo(eventInfoMap);ctx.collect(eventLog);eventInfoMap.clear();Thread.sleep(RandomUtils.nextInt(200, 1500));}}@Overridepublic void cancel() {flag = false;} }
-
示例代码
public class FileSinkRowFormat_Demo {public static void main(String[] args) throws Exception {StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();env.enableCheckpointing(2000, CheckpointingMode.EXACTLY_ONCE);env.getCheckpointConfig().setCheckpointStorage("file:///d:/ckpt");env.setParallelism(2);DataStreamSource<EventLog> streamSource = env.addSource(new MySourceFunction());/*** 应用 FileSink 算子,来将数据输出到 文件系统*//*** 1. 输出为 行格式*/// 构造一个FileSink对象FileSink<String> rowSink = FileSink.forRowFormat(new Path("d:/filesink/rowformat"), new SimpleStringEncoder<String>("utf-8"))// 文件的滚动策略 (间隔时长10s,或文件大小达到 5M,就进行文件切换.withRollingPolicy(DefaultRollingPolicy.builder()//至少包含多少时间的数据.withRolloverInterval(Duration.ofSeconds(10))//多少时间没有新的数据.withInactivityInterval(Duration.ofSeconds(10))//数据达到多大1G.withMaxPartSize(MemorySize.ofMebiBytes(1)).build())// 分桶的策略(划分子文件夹的策略).withBucketAssigner(new DateTimeBucketAssigner<String>())// 输出文件的文件名相关配置.withOutputFileConfig(OutputFileConfig.builder().withPartPrefix("itcast").withPartSuffix(".txt").build()).build();// 然后添加到流,进行输出streamSource.map(JSON::toJSONString)//.addSink() /* SinkFunction实现类对象,用addSink() 来添加*/.sinkTo(rowSink).uid("fileSink"); /*Sink 的实现类对象,用 sinkTo()来添加 */env.execute();} }
Bulk列式存储文件输出代码示例1
需求:手动构建Avro的Schema对象,得到ParquetWriterFactory的方式实现如上需求
public class FileSinkBulkFormat_Demo1 {public static void main(String[] args) throws Exception {StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();// 开启checkpointenv.enableCheckpointing(5000, CheckpointingMode.EXACTLY_ONCE);env.getCheckpointConfig().setCheckpointStorage("file:///d:/ckpt");env.setParallelism(1);/*** 方式一:* 核心逻辑:* - 构造一个schema* - 利用schema构造一个parquetWriterFactory* - 利用parquetWriterFactory构造一个FileSink算子* - 将原始数据转成GenericRecord流,输出到FileSink算子*/// 1. 先定义GenericRecord的数据模式Schema schema = SchemaBuilder.builder().record("DataRecord").namespace("cn.itcast.flink.avro.schema").fields().requiredInt("gid").requiredLong("ts").requiredString("eventId").requiredString("sessionId").name("eventInfo").type().map().values().type("string").noDefault().endRecord();// 构造好一个数据流DataStreamSource<EventLog> streamSource = env.addSource(new MySourceFunction());// 2. 通过定义好的schema模式,来得到一个parquetWriterParquetWriterFactory<GenericRecord> writerFactory = AvroParquetWriters.forGenericRecord(schema);// 3. 利用生成好的parquetWriter,来构造一个 支持列式输出parquet文件的 sink算子FileSink<GenericRecord> sink1 = FileSink.forBulkFormat(new Path("d:/filesink/bulkformat"), writerFactory).withBucketAssigner(new DateTimeBucketAssigner<GenericRecord>("yyyy-MM-dd--HH")).withRollingPolicy(OnCheckpointRollingPolicy.build()).withOutputFileConfig(OutputFileConfig.builder().withPartPrefix("itcast").withPartSuffix(".parquet").build()).build();// 4. 将自定义javabean的流,转成 上述sink算子中parquetWriter所需要的 GenericRecord流SingleOutputStreamOperator<GenericRecord> recordStream = streamSource.map((MapFunction<EventLog, GenericRecord>) eventLog -> {// 构造一个Record对象GenericData.Record record = new GenericData.Record(schema);// 将数据填入recordrecord.put("gid", (int) eventLog.getGuid());record.put("eventId", eventLog.getEventId());record.put("ts", eventLog.getTimeStamp());record.put("sessionId", eventLog.getSessionId());record.put("eventInfo", eventLog.getEventInfo());return record;}).returns(new GenericRecordAvroTypeInfo(schema)); // 由于avro的相关类、对象需要用avro的序列化器,所以需要显式指定AvroTypeInfo来提供AvroSerializer// 5. 输出数据recordStream.sinkTo(sink1).uid("fileSink");env.execute();}
}
Bulk列式存储文件输出代码示例2
需求:编写avsc 配置文件,并利用插件生成“特定JavaBean”,得到ParquetWriterFactory的方式实现如上需求。
开发步骤:
编写一个avsc文本文件(json),来描述数据模式
{"namespace": "cn.itcast.chapter5.avro.schema","type": "record","name": "AvroEventLog","fields": [{"name": "guid", "type": "long"},{"name": "sessionId", "type": "string"},{"name": "eventId", "type": "string"},{"name": "timeStamp", "type": "long"},{"name": "eventInfo", "type": { "type":"map","values": "string"} }]
}
-
添加 maven代码生成器插件,来针对上述的avsc生成avro特定格式的JavaBean类
-
利用代码生成器生成的 JavaBean,来构造一个 parquetWriterFactory
-
利用parquetWriterFactory构造一个FileSink算子
-
将原始数据流 转成 特定格式JavaBean流,输出到 FileSink算子
示例代码:
public class FileSinkBulkFormat_Demo2 {public static void main(String[] args) throws Exception {StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();// 开启checkpointenv.enableCheckpointing(5000, CheckpointingMode.EXACTLY_ONCE);env.getCheckpointConfig().setCheckpointStorage("file:///d:/ckpt");// 构造好一个数据流DataStreamSource<EventLog> streamSource = env.addSource(new MySourceFunction());/*** 方式二:* 核心逻辑:* - 编写一个avsc文本文件(json),来描述数据模式* - 添加 maven代码生成器插件,来针对上述的avsc生成avro特定格式的JavaBean类* - 利用代码生成器生成的 JavaBean,来构造一个 parquetWriterFactory* - 利用parquetWriterFactory构造一个FileSink算子* - 将原始数据流 转成 特定格式JavaBean流,输出到 FileSink算子*/// 1. 先定义avsc文件放在resources文件夹中,并用maven的插件,来编译一下,生成特定格式的JavaBean : AvroEventLog// 这种根据avsc生成的JavaBean类,自身就已经带有了Schema对象// AvroEventLog avroEventLog = new AvroEventLog();// Schema schema = avroEventLog.getSchema();// 2. 通过自动生成 AvroEventLog类,来得到一个parquetWriterParquetWriterFactory<AvroEventLog> parquetWriterFactory = AvroParquetWriters.forSpecificRecord(AvroEventLog.class);// 3. 利用生成好的parquetWriter,来构造一个 支持列式输出parquet文件的 sink算子FileSink<AvroEventLog> bulkSink = FileSink.forBulkFormat(new Path("d:/filesink/bulkformat2"), parquetWriterFactory).withBucketAssigner(new DateTimeBucketAssigner<AvroEventLog>("yyyy-MM-dd--HH")).withRollingPolicy(OnCheckpointRollingPolicy.build()).withOutputFileConfig(OutputFileConfig.builder().withPartPrefix("itcast").withPartSuffix(".parquet").build()).build();// 4. 将自定义javabean的 EventLog 流,转成 上述sink算子中parquetWriter所需要的 AvroEventLog 流SingleOutputStreamOperator<AvroEventLog> avroEventLogStream = streamSource.map(new MapFunction<EventLog, AvroEventLog>() {@Overridepublic AvroEventLog map(EventLog eventLog) throws Exception {HashMap<CharSequence, CharSequence> eventInfo1 = new HashMap<>();// 进行hashmap<charsequenct,charsequence>类型的数据转移Map<String, String> eventInfo2 = eventLog.getEventInfo();Set<Map.Entry<String, String>> entries = eventInfo2.entrySet();for (Map.Entry<String, String> entry : entries) {eventInfo1.put(entry.getKey(), entry.getValue());}return new AvroEventLog(eventLog.getGuid(), eventLog.getSessionId(), eventLog.getEventId(), eventLog.getTimeStamp(), eventInfo1);}});// 5. 输出数据avroEventLogStream.sinkTo(bulkSink);env.execute();}
}
Bulk列式存储文件输出代码示例3
需求:直接利用普通JavaBean,利用工具自身的反射机制,得到ParquetWriterFactory的方式实现如上需求;
public class FileSinkBulkFormat_Demo3 {public static void main(String[] args) throws Exception {StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();// 开启checkpointenv.enableCheckpointing(5000, CheckpointingMode.EXACTLY_ONCE);env.getCheckpointConfig().setCheckpointStorage("file:///d:/ckpt");// 构造好一个数据流DataStreamSource<EventLog> streamSource = env.addSource(new MySourceFunction());// 将上面的数据流输出到文件系统(假装成一个经过了各种复杂计算后的结果数据流)/*** 方式三:* 核心逻辑:* - 利用自己的JavaBean类,来构造一个 parquetWriterFactory* - 利用parquetWriterFactory构造一个FileSink算子* - 将原始数据流,输出到 FileSink算子*/// 2. 通过自己的JavaBean类,来得到一个parquetWriterParquetWriterFactory<EventLog> parquetWriterFactory = AvroParquetWriters.forReflectRecord(EventLog.class);// 3. 利用生成好的parquetWriter,来构造一个 支持列式输出parquet文件的 sink算子FileSink<EventLog> bulkSink = FileSink.forBulkFormat(new Path("d:/filesink/bulkformat3"), parquetWriterFactory).withBucketAssigner(new DateTimeBucketAssigner<EventLog>("yyyy-MM-dd--HH")).withRollingPolicy(OnCheckpointRollingPolicy.build()).withOutputFileConfig(OutputFileConfig.builder().withPartPrefix("itcast").withPartSuffix(".parquet").build()).build();// 5. 输出数据streamSource.sinkTo(bulkSink);env.execute();}
}
输出到Kafka
Kafka 是一个分布式的基于发布/订阅的消息系统,本身处理的也是流式数据,所以跟Flink经常放到一起使用,作为 Flink 的输入数据源和输出系统。Flink 官方为 Kafka 提供了 Source和 Sink 的连接器,我们可以用它方便地从 Kafka 读写数据。如果仅仅是支持读写,那还说明不了 Kafka 和 Flink 关系的亲密;真正让它们密不可分的是,*Flink 与 Kafka 的连接器提供了端到端的精确一次(exactly once)语义保证,这在实际项目中是最高级别的一致性保证*。
import cn.itcast.chapter6.beans.EventLog;
import cn.itcast.chapter6.utils.source.MySourceFunction;
import com.alibaba.fastjson.JSON;
import org.apache.flink.api.common.serialization.SimpleStringSchema;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.connector.base.DeliveryGuarantee;
import org.apache.flink.connector.kafka.sink.KafkaRecordSerializationSchema;
import org.apache.flink.connector.kafka.sink.KafkaSink;
import org.apache.flink.streaming.api.CheckpointingMode;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;/*** @author : www.itcast.cn* @date : 22.10.22 13:21* @Desc:* 利用KafkaSink将数据流写入kafka* 测试准备,创建目标topic:* kafka-topics.sh --create --topic event-log --partitions 3 --replication-factor 2 --zookeeper node1.itcast.cn:2181**/
public class KafkaSink_Demo {public static void main(String[] args) throws Exception {Configuration configuration = new Configuration();StreamExecutionEnvironment env = StreamExecutionEnvironment.createLocalEnvironmentWithWebUI(configuration);// 开启checkpointenv.enableCheckpointing(5000, CheckpointingMode.EXACTLY_ONCE);env.getCheckpointConfig().setCheckpointStorage("file:///d:/ckpt");// 构造好一个数据流DataStreamSource<EventLog> streamSource = env.addSource(new MySourceFunction());// 把数据写入kafka// 1. 构造一个kafka的sink算子KafkaSink<String> kafkaSink = KafkaSink.<String>builder().setBootstrapServers("node1.itcast.cn:9092,node2.itcast.cn:9092").setRecordSerializer(KafkaRecordSerializationSchema.<String>builder().setTopic("event-log").setValueSerializationSchema(new SimpleStringSchema()).build()).setDeliverGuarantee(DeliveryGuarantee.AT_LEAST_ONCE).build();// 2. 把数据流输出到构造好的sink算子streamSource.map(JSON::toJSONString).disableChaining().sinkTo(kafkaSink);env.execute();}
}
注意:如果使用DeliveryGuarantee.EXACTLY_ONCE 的语义保证,则需要使用 setTransactionalIdPrefix(String),如:
.setDeliveryGuarantee(DeliveryGuarantee.EXACTLY_ONCE)
.setTransactionalIdPrefix(“itcast-” + RandomUtils.nextInt(1, 100))
.setProperty(ProducerConfig.TRANSACTION_TIMEOUT_CONFIG , “36000”)
输出到 MySQL(JDBC)
不保证Exactly-Once的JdbcSink
-
创建表结构
CREATE TABLE flinkdemo.t_eventlog ( guid BIGINT NOT NULL, sessionId varchar(100) NULL, eventId varchar(100) NULL, `timeStamp` BIGINT NULL, eventInfo varchar(500) NULL, CONSTRAINT t_eventlog_PK PRIMARY KEY (guid) ) ENGINE=InnoDB DEFAULT CHARSET=utf8 COLLATE=utf8_general_ci;
-
示例代码
public class JdbcSink_Demo {public static void main(String[] args) throws Exception {Configuration configuration = new Configuration();configuration.setInteger("rest.port", 8081);StreamExecutionEnvironment env = StreamExecutionEnvironment.createLocalEnvironmentWithWebUI(configuration);// 开启checkpointenv.enableCheckpointing(5000, CheckpointingMode.EXACTLY_ONCE);env.getCheckpointConfig().setCheckpointStorage("file:///d:/ckpt");// 构造好一个数据流DataStreamSource<EventLog> streamSource = env.addSource(new MySourceFunction());/*** 一、 不保证 EOS语义的方式*/SinkFunction<EventLog> jdbcSink = JdbcSink.sink("insert into t_eventlog values (?,?,?,?,?) on duplicate key update sessionId=?,eventId=?,ts=?,eventInfo=? ",new JdbcStatementBuilder<EventLog>() {@Overridepublic void accept(PreparedStatement preparedStatement, EventLog eventLog) throws SQLException {preparedStatement.setLong(1, eventLog.getGuid());preparedStatement.setString(2, eventLog.getSessionId());preparedStatement.setString(3, eventLog.getEventId());preparedStatement.setLong(4, eventLog.getTimeStamp());preparedStatement.setString(5, JSON.toJSONString(eventLog.getEventInfo()));preparedStatement.setString(6, eventLog.getSessionId());preparedStatement.setString(7, eventLog.getEventId());preparedStatement.setLong(8, eventLog.getTimeStamp());preparedStatement.setString(9, JSON.toJSONString(eventLog.getEventInfo()));}},JdbcExecutionOptions.builder().withMaxRetries(3).withBatchSize(1).build(),new JdbcConnectionOptions.JdbcConnectionOptionsBuilder().withUrl("jdbc:mysql://node1:3306/test?serverTimezone=Asia/Shanghai&useUnicode=true&characterEncoding=UTF-8").withUsername("root").withPassword("123456").build());// 输出数据streamSource.addSink(jdbcSink);env.execute();} }
保证Exactly-Once的JdbcSink
- 示例代码
public class JdbcSink_Demo {public static void main(String[] args) throws Exception {Configuration configuration = new Configuration();configuration.setInteger("rest.port", 8081);StreamExecutionEnvironment env = StreamExecutionEnvironment.createLocalEnvironmentWithWebUI(configuration);// 开启checkpointenv.enableCheckpointing(5000, CheckpointingMode.EXACTLY_ONCE);env.getCheckpointConfig().setCheckpointStorage("file:///d:/ckpt");// 构造好一个数据流DataStreamSource<EventLog> streamSource = env.addSource(new MySourceFunction());/*** 二、可以提供 EOS 语义保证的 sink*/SinkFunction<EventLog> exactlyOnceSink = JdbcSink.exactlyOnceSink("insert into t_eventlog values (?,?,?,?,?) on duplicate key update sessionId=?,eventId=?,ts=?,eventInfo=? ",new JdbcStatementBuilder<EventLog>() {@Overridepublic void accept(PreparedStatement preparedStatement, EventLog eventLog) throws SQLException {preparedStatement.setLong(1, eventLog.getGuid());preparedStatement.setString(2, eventLog.getSessionId());preparedStatement.setString(3, eventLog.getEventId());preparedStatement.setLong(4, eventLog.getTimeStamp());preparedStatement.setString(5, JSON.toJSONString(eventLog.getEventInfo()));preparedStatement.setString(6, eventLog.getSessionId());preparedStatement.setString(7, eventLog.getEventId());preparedStatement.setLong(8, eventLog.getTimeStamp());preparedStatement.setString(9, JSON.toJSONString(eventLog.getEventInfo()));}},JdbcExecutionOptions.builder().withMaxRetries(0).withBatchSize(1).build(),JdbcExactlyOnceOptions.builder()// mysql不支持同一个连接上存在并行的多个事务,必须把该参数设置为true.withTransactionPerConnection(true).build(),new SerializableSupplier<XADataSource>() {@Overridepublic XADataSource get() {// XADataSource就是jdbc连接,不过它是支持分布式事务的连接// 而且它的构造方法,不同的数据库构造方法不同MysqlXADataSource xaDataSource = new MysqlXADataSource();xaDataSource.setUrl("jdbc:mysql://node1:3306/test?serverTimezone=Asia/Shanghai&useUnicode=true&characterEncoding=UTF-8");xaDataSource.setUser("root");xaDataSource.setPassword("123456");return xaDataSource;}});// 输出数据streamSource.addSink(exactlyOnceSink);env.execute();}
}
今日总结
本章节依次讲解了执行环境的创建、数据源的读取、数据流的转换操作,和最终结果数据的输出,对各种常见的转换操作 API 和外部系统的连接都做了详细介绍,这些api都是最基本的api,除此之外还有更复杂的api将会在后面的章节进行详细的介绍,通过这个章节主要让大家掌握 DataStream API 的基本用法以及熟悉 Flink 的编程习惯。