您的位置:首页 > 科技 > 能源 > 制作网站多少钱一个_网站设计制作一条龙免费_优秀网页设计赏析_北京网站建设公司大全

制作网站多少钱一个_网站设计制作一条龙免费_优秀网页设计赏析_北京网站建设公司大全

2024/11/17 16:46:25 来源:https://blog.csdn.net/ABU009/article/details/143571529  浏览:    关键词:制作网站多少钱一个_网站设计制作一条龙免费_优秀网页设计赏析_北京网站建设公司大全
制作网站多少钱一个_网站设计制作一条龙免费_优秀网页设计赏析_北京网站建设公司大全

滑动窗口的使用,主要是计算,在reduce之前添加滑动窗口,设置好间隔和所统计的时间,然后再进行reduce计算数据即可。

窗口设置好时间间隔,和处理时间窗口的时间,比如将滑动窗口的时间间隔都设置为5s,处理时间为15s,意思是每隔五秒,就处理15s秒的数据

滑动窗口(window)

比如打了3s的输入,到了第五秒的时候,滑动window开始处理15秒的数据,数据就像滑动一样,用一个线段展示。

代码展示:


import org.apache.flink.api.common.typeinfo.Types;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.datastream.KeyedStream;
import org.apache.flink.streaming.api.datastream.WindowedStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.windowing.assigners.SlidingProcessingTimeWindows;
import org.apache.flink.streaming.api.windowing.assigners.TumblingEventTimeWindows;
import org.apache.flink.streaming.api.windowing.time.Time;
import org.apache.flink.streaming.api.windowing.windows.TimeWindow;public class Demo4Window {public static void main(String[] args) throws Exception {//1、创建环境StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();//2、读取数据DataStream<String> linesDS = env.socketTextStream("master", 8888);//使用lambda表达式处理数据DataStream<String> wordsDS = linesDS.flatMap((line, out) -> {for (String word : line.split(",")) {out.collect(word);}}, Types.STRING);DataStream<Tuple2<String, Integer>> kvDS = wordsDS.map(word -> Tuple2.of(word, 1))//指定返回类型.returns(Types.TUPLE(Types.STRING, Types.INT));KeyedStream<Tuple2<String, Integer>, String> keyByDS = kvDS.keyBy(kv -> kv.f0);/** SlidingProcessingTimeWindows:滑动的处理时间窗口*/WindowedStream<Tuple2<String, Integer>, String, TimeWindow> windowDS = keyByDS//每隔5秒计算最近15秒的数据.window(SlidingProcessingTimeWindows.of(Time.seconds(15), Time.seconds(5)));//kv1代表之前的结果(状态),kv2代码最新一条数据//reduce:有状态计算DataStream<Tuple2<String, Integer>> countDS = windowDS.reduce((kv1, kv2) -> Tuple2.of(kv1.f0, kv1.f1 + kv2.f1));countDS.print();//execute方法会触发任务执行(任务调度)env.execute("lambda");}
}

滑动窗口(windowAll) 

将同一个窗口的数据放在一起计算,将之前计算的结果与最新统计的结果相加

 代码展示:

import org.apache.flink.api.common.typeinfo.Types;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.streaming.api.datastream.AllWindowedStream;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.datastream.KeyedStream;
import org.apache.flink.streaming.api.datastream.WindowedStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.windowing.assigners.SlidingProcessingTimeWindows;
import org.apache.flink.streaming.api.windowing.time.Time;
import org.apache.flink.streaming.api.windowing.windows.TimeWindow;public class Demo4WindowAll {public static void main(String[] args) throws Exception {//1、创建环境StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();//2、读取数据DataStream<String> linesDS = env.socketTextStream("master", 8888);//使用lambda表达式处理数据DataStream<String> wordsDS = linesDS.flatMap((line, out) -> {for (String word : line.split(",")) {out.collect(word);}}, Types.STRING);DataStream<Tuple2<String, Integer>> kvDS = wordsDS.map(word -> Tuple2.of(word, 1))//指定返回类型.returns(Types.TUPLE(Types.STRING, Types.INT));/** SlidingProcessingTimeWindows:滑动的处理时间窗口*/AllWindowedStream<Tuple2<String, Integer>, TimeWindow> windowAllDS = kvDS//每隔5秒计算最近15秒的数据//windowAll:将同一个窗口的数据发一起进行计算.windowAll(SlidingProcessingTimeWindows.of(Time.seconds(15), Time.seconds(5)));//kv1代表之前的结果(状态),kv2代码最新一条数据//reduce:有状态计算DataStream<Tuple2<String, Integer>> countDS = windowAllDS.reduce((kv1, kv2) -> Tuple2.of(kv1.f0, kv1.f1 + kv2.f1));countDS.print();//execute方法会触发任务执行(任务调度)env.execute("lambda");}
}

版权声明:

本网仅为发布的内容提供存储空间,不对发表、转载的内容提供任何形式的保证。凡本网注明“来源:XXX网络”的作品,均转载自其它媒体,著作权归作者所有,商业转载请联系作者获得授权,非商业转载请注明出处。

我们尊重并感谢每一位作者,均已注明文章来源和作者。如因作品内容、版权或其它问题,请及时与我们联系,联系邮箱:809451989@qq.com,投稿邮箱:809451989@qq.com