您的位置:首页 > 科技 > IT业 > 深圳专业做网站的公司哪家好_广东seo点击排名软件哪家好_查询网 网站查询_找网站设计公司

深圳专业做网站的公司哪家好_广东seo点击排名软件哪家好_查询网 网站查询_找网站设计公司

2024/12/25 21:29:16 来源:https://blog.csdn.net/weixin_52642840/article/details/143992670  浏览:    关键词:深圳专业做网站的公司哪家好_广东seo点击排名软件哪家好_查询网 网站查询_找网站设计公司
深圳专业做网站的公司哪家好_广东seo点击排名软件哪家好_查询网 网站查询_找网站设计公司

keyBy案例

package com.bigdata.day02;public class _04_keyBy {public static void main(String[] args) throws Exception {//1. env-准备环境StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();env.setRuntimeMode(RuntimeExecutionMode.AUTOMATIC);DataStreamSource<String> dataStreamSource = env.readTextFile("datas/a.log");//2. source-加载数据dataStreamSource.map(new MapFunction<String, LogBean>() {@Overridepublic LogBean map(String s) throws Exception {String[] line = s.split("\\s+");LogBean logBean = new LogBean();logBean.setIp(line[0]);logBean.setUserId(Integer.parseInt(line[1]));logBean.setMethod(line[3]);// 17/05/2015:10:05:30SimpleDateFormat simpleDateFormat = new SimpleDateFormat("dd/MM/yyyy:hh:mm:ss");Date date = simpleDateFormat.parse(line[2]);// 另一种方法Date date1 = DateUtils.parseDate(line[2], "dd/MM/yyyy:hh:mm:ss");logBean.setTimestamp(date1.getTime());logBean.setPath(line[4]);return logBean;}}).map(new MapFunction<LogBean, Tuple2<String, Integer>>() {@Overridepublic Tuple2<String, Integer> map(LogBean logBean) throws Exception {return new Tuple2<>(logBean.getIp(), 1);}}).keyBy(new KeySelector<Tuple2<String, Integer>, String>() {@Overridepublic String getKey(Tuple2<String, Integer> stringIntegerTuple2) throws Exception {return stringIntegerTuple2.f0;}}).print();env.execute();}
}
可以认为是将key值一样的数据放在一个分区中
new KeySelector<Tuple2<String, Integer>, String>()
第一个参数表示传入的值为Tuple2 ,String表示key是该类型的
public String getKey(Tuple2<String, Integer> stringIntegerTuple2)
可以看出返回值是String  类型的,也就是说可以随意指定按照某个key进行group(有点类似)

元组类型

单个字段keyBy
//用字段位置(已经被废弃)
wordAndOne.keyBy(0)//用字段表达式
同上
多个字段keyBy
//用字段位置
wordAndOne.keyBy(0, 1);//用KeySelector
wordAndOne.keyBy(new KeySelector<Tuple2<String, Integer>, Tuple2<String, Integer>>() {@Overridepublic Tuple2<String, Integer> getKey(Tuple2<String, Integer> value) throws Exception {return Tuple2.of(value.f0, value.f1);}
});//也就是说多个字段返回值就为Tuple类型

POJO

单个字段keyBy
source.keyBy(a -> a.getProvince());
多个字段keyBy
source.keyBy(new KeySelector<PeopleCount, Tuple2<String, String>>() {@Overridepublic Tuple2<String, String> getKey(PeopleCount value) throws Exception {return Tuple2.of(value.getProvince(), value.getCity());}
});
//也就是说多个字段返回值就为Tuple类型

Reduce案例

——sum的底层是reduce

package com.bigdata.day02;public class _04_keyBy {public static void main(String[] args) throws Exception {StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();env.setRuntimeMode(RuntimeExecutionMode.AUTOMATIC);DataStreamSource<String> dataStreamSource = env.readTextFile("datas/a.log");KeyedStream<Tuple2<String, Integer>, String> tuple2StringKeyedStream = dataStreamSource.map(new MapFunction<String, LogBean>() {@Overridepublic LogBean map(String s) throws Exception {String[] line = s.split("\\s+");LogBean logBean = new LogBean();logBean.setIp(line[0]);logBean.setUserId(Integer.parseInt(line[1]));logBean.setMethod(line[3]);// 17/05/2015:10:05:30Date date = DateUtils.parseDate(line[2], "dd/MM/yyyy:hh:mm:ss");logBean.setTimestamp(date.getTime());logBean.setPath(line[4]);   return logBean;}}).map(new MapFunction<LogBean, Tuple2<String, Integer>>() {@Overridepublic Tuple2<String, Integer> map(LogBean logBean) throws Exception {return new Tuple2<>(logBean.getIp(), 1);}}).keyBy(new KeySelector<Tuple2<String, Integer>, String>() {@Overridepublic String getKey(Tuple2<String, Integer> stringIntegerTuple2) throws Exception {return stringIntegerTuple2.f0;}});//        tuple2StringKeyedStream.sum(1).print();tuple2StringKeyedStream.reduce(new ReduceFunction<Tuple2<String, Integer>>() {@Overridepublic Tuple2<String, Integer> reduce(Tuple2<String, Integer> t1, Tuple2<String, Integer> t2) throws Exception {return new Tuple2<>(t1.f0,t1.f1+t2.f1);}}).print();env.execute();}
}

new ReduceFunction<Tuple2<String, Integer>>()

public Tuple2<String, Integer> reduce(Tuple2<String, Integer> t1, Tuple2<String, Integer> t2)

t1 用于表示每一条数据,t2 用于表示 结果的累计 ,

其中t1 和 t2 的第一个值是一样的,t2 的第二个值 一直再累加,而t1的为1 1 1 1 1 1

版权声明:

本网仅为发布的内容提供存储空间,不对发表、转载的内容提供任何形式的保证。凡本网注明“来源:XXX网络”的作品,均转载自其它媒体,著作权归作者所有,商业转载请联系作者获得授权,非商业转载请注明出处。

我们尊重并感谢每一位作者,均已注明文章来源和作者。如因作品内容、版权或其它问题,请及时与我们联系,联系邮箱:809451989@qq.com,投稿邮箱:809451989@qq.com