您的位置:首页 > 新闻 > 热点要闻 > 网页设计作品源代码彼岸花坊_移动端显卡天梯图2024_郑州百度推广公司电话_聚合搜索引擎

网页设计作品源代码彼岸花坊_移动端显卡天梯图2024_郑州百度推广公司电话_聚合搜索引擎

2025/3/10 23:28:49 来源:https://blog.csdn.net/weixin_52642840/article/details/144226512  浏览:    关键词:网页设计作品源代码彼岸花坊_移动端显卡天梯图2024_郑州百度推广公司电话_聚合搜索引擎
网页设计作品源代码彼岸花坊_移动端显卡天梯图2024_郑州百度推广公司电话_聚合搜索引擎

流——>表

方式一

方式二

方式一:写sql 
DataStreamSource<String> source = env.socketTextStream("localhost", 8881);
// 表名,流,字段名称
tableEnv.createTemporaryView("t_1",source,$("word"));方式二:使用dsl
DataStreamSource<String> source = env.socketTextStream("localhost", 8881);
// 表名,流,字段名称
Table table = tableEnv.fromDataStream(source,$("word"));

 表——>流

Table table = tEnv.sqlQuery("select word,count(1) wordCount from t_1 group by word");// 方式一:toAppendStream
DataStream<Row> appendStream = tEnv.toAppendStream(table, Row.class);// 报错:toAppendStream doesn't support consuming update changes which is produced by node GroupAggregate(groupBy=[word], select=[word, SUM(num) AS sumNum])// 这个不支持分组和聚合操作,若出现聚合操作使用方式二将表转为流//方式二:toRetractStream
DataStream<Tuple2<Boolean, Row>> retractStream = tEnv.toRetractStream(table, Row.class);

 wordCount案例

方式一:使用sql

package com.bigdata.day07;import org.apache.flink.api.common.RuntimeExecutionMode;
import org.apache.flink.api.common.functions.FilterFunction;
import org.apache.flink.api.common.functions.FlatMapFunction;
import org.apache.flink.api.common.functions.MapFunction;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.table.api.Table;
import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;
import org.apache.flink.types.Row;
import org.apache.flink.util.Collector;import static org.apache.flink.table.api.Expressions.$;/*** @基本功能:* @program:flinkProject* @author: 堇年* @create:2024-11-28 14:42:27**/
public class _06_flink_wordcounnt {public static void main(String[] args) throws Exception {//1. env-准备环境StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();env.setRuntimeMode(RuntimeExecutionMode.AUTOMATIC);// 获取tableEnv对象// 通过env 获取一个table 环境StreamTableEnvironment tEnv = StreamTableEnvironment.create(env);DataStreamSource<String> source = env.socketTextStream("localhost", 8881);SingleOutputStreamOperator<String> flatMap = source.flatMap(new FlatMapFunction<String, String>() {@Overridepublic void flatMap(String value, Collector<String> out) throws Exception {String[] split = value.split(",");for (String s : split) {out.collect(s);}}});//2. 创建表对象tEnv.createTemporaryView("t_1",flatMap,$("word"));//3. 编写sql语句Table table = tEnv.sqlQuery("select word,count(1) wordCount from t_1 group by word");//4. 将Table变为stream流//使用toAppendStream时会报错 因为有聚合操作//DataStream<Row> appendStream = tEnv.toAppendStream(table, Row.class);// toAppendStream doesn't support consuming update changes which is produced by node GroupAggregate(groupBy=[word], select=[word, SUM(num) AS sumNum])// 在这里可以映射为ROW对象,也可以映射为自己定义的实体类DataStream<Tuple2<Boolean, Row>> retractStream = tEnv.toRetractStream(table, Row.class);retractStream.filter(new FilterFunction<Tuple2<Boolean, Row>>() {@Overridepublic boolean filter(Tuple2<Boolean, Row> value) throws Exception {return value.f0;}}).print();//5. execute-执行env.execute();}
}

方式二:使用dsl语句 

package com.bigdata.day07;import org.apache.flink.api.common.RuntimeExecutionMode;
import org.apache.flink.api.common.functions.FilterFunction;
import org.apache.flink.api.common.functions.FlatMapFunction;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.table.api.Table;
import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;
import org.apache.flink.types.Row;
import org.apache.flink.util.Collector;import static org.apache.flink.table.api.Expressions.$;public class _06_flink_wordcounnt_dsl {public static void main(String[] args) throws Exception {//1. env-准备环境StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();env.setRuntimeMode(RuntimeExecutionMode.AUTOMATIC);// 获取tableEnv对象// 通过env 获取一个table 环境StreamTableEnvironment tEnv = StreamTableEnvironment.create(env);DataStreamSource<String> source = env.socketTextStream("localhost", 8881);SingleOutputStreamOperator<String> flatMap = source.flatMap(new FlatMapFunction<String, String>() {@Overridepublic void flatMap(String value, Collector<String> out) throws Exception {String[] split = value.split(",");for (String s : split) {out.collect(s);}}});//2. 创建表对象Table table = tEnv.fromDataStream(flatMap,$("word"));//3. 编写sql语句Table rsTable = table.groupBy($("word")).select($("word"),$("word").count().as("wordcount"));rsTable.printSchema();//4. 将Table变为stream流DataStream<Tuple2<Boolean, Row>> retractStream = tEnv.toRetractStream(rsTable, Row.class);retractStream.filter(new FilterFunction<Tuple2<Boolean, Row>>() {@Overridepublic boolean filter(Tuple2<Boolean, Row> value) throws Exception {return value.f0;}}).print();//5. execute-执行env.execute();}
}

结果展示 

+I 表示有一条新数据进行了插入
+U 表示有一条已存在的数据有插入了一条,需要进行更新
-U 在+U前表示,先删除原本的,在update新的

 

版权声明:

本网仅为发布的内容提供存储空间,不对发表、转载的内容提供任何形式的保证。凡本网注明“来源:XXX网络”的作品,均转载自其它媒体,著作权归作者所有,商业转载请联系作者获得授权,非商业转载请注明出处。

我们尊重并感谢每一位作者,均已注明文章来源和作者。如因作品内容、版权或其它问题,请及时与我们联系,联系邮箱:809451989@qq.com,投稿邮箱:809451989@qq.com