您的位置:首页 > 健康 > 美食 > flink wordcount

flink wordcount

2024/10/6 6:04:52 来源:https://blog.csdn.net/qq_45972323/article/details/142027535  浏览:    关键词:flink wordcount

Maven配置pom文件

<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0"xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd"><modelVersion>4.0.0</modelVersion><groupId>org.example</groupId><artifactId>com.atguigu</artifactId><version>1.0-SNAPSHOT</version><properties><flink.version>1.17.0</flink.version></properties><dependencies><dependency><groupId>org.apache.flink</groupId><artifactId>flink-streaming-java</artifactId><version>${flink.version}</version></dependency><dependency><groupId>org.apache.flink</groupId><artifactId>flink-clients</artifactId><version>${flink.version}</version></dependency></dependencies></project>

java编写wordcount代码

基于DataSet API(过时的,不推荐)
之后用 DataStream API

package com.atguigu.wc;import org.apache.flink.api.common.functions.FlatMapFunction;
import org.apache.flink.api.java.ExecutionEnvironment;
import org.apache.flink.api.java.operators.AggregateOperator;
import org.apache.flink.api.java.operators.DataSource;
import org.apache.flink.api.java.operators.FlatMapOperator;
import org.apache.flink.api.java.operators.UnsortedGrouping;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.util.Collector;public class WordCountBatchDemo {public static void main(String[] args) throws Exception {//1.创建执行环境ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();//2.读取数据,从文件中读取DataSource<String> lineDS = env.readTextFile("input/word.txt");//3.切分、转换(word,1)FlatMapOperator<String, Tuple2<String, Integer>> wordAndOne = lineDS.flatMap(new FlatMapFunction<String, Tuple2<String, Integer>>() {@Overridepublic void flatMap(String value, Collector<Tuple2<String, Integer>> out) throws Exception {//Todo3.1 按照空格 切分单词String[] words = value.split(" ");//Todo3.2 将单词转换为(word,1)for (String word : words) {Tuple2<String, Integer> wordTuple2 = Tuple2.of(word, 1);//Todo3.3 调用采集器collector 向下游发送数据out.collect(wordTuple2);}}});//4.按照word分组UnsortedGrouping<Tuple2<String, Integer>> wordAndOneGroupBy = wordAndOne.groupBy(0);//5.各分组内聚合AggregateOperator<Tuple2<String, Integer>> sum = wordAndOneGroupBy.sum(1);//6.输出sum.print();}
}

在这里插入图片描述

版权声明:

本网仅为发布的内容提供存储空间,不对发表、转载的内容提供任何形式的保证。凡本网注明“来源:XXX网络”的作品,均转载自其它媒体,著作权归作者所有,商业转载请联系作者获得授权,非商业转载请注明出处。

我们尊重并感谢每一位作者,均已注明文章来源和作者。如因作品内容、版权或其它问题,请及时与我们联系,联系邮箱:809451989@qq.com,投稿邮箱:809451989@qq.com