您的位置:首页 > 游戏 > 游戏 > 国内外免费开源cms_小皮怎么创建网站_高报师培训机构排名_网络营销策划书3000字

国内外免费开源cms_小皮怎么创建网站_高报师培训机构排名_网络营销策划书3000字

2025/1/8 15:50:01 来源:https://blog.csdn.net/qq_73339471/article/details/142469110  浏览:    关键词:国内外免费开源cms_小皮怎么创建网站_高报师培训机构排名_网络营销策划书3000字
国内外免费开源cms_小皮怎么创建网站_高报师培训机构排名_网络营销策划书3000字

Flink加载维度数据

1、为何要加载维度数据?

在我们构建实时数仓时,不能光有事实数据,也需要加载维度数据来标明这些事实数据的具体含义。若只含有事实数据的话,就相当于只有数据本身在不断地变化,而并不知道这些数据具体表示什么意思。因此,我们应当加载维度数据进来。

2、加载维度数据的方式

此处,将提供两种常见的用于加载维度数据的方式。

方式一:缓存文件

district.txt文件:存放于resources资源目录下

1   nanjing
2   suzhou
3   changzhou
4   xuzhou

主体代码

package recovery;import modules.env.Environments;
import org.apache.flink.api.common.JobExecutionResult;
import org.apache.flink.api.common.RuntimeExecutionMode;
import org.apache.flink.api.common.eventtime.TimestampAssigner;
import org.apache.flink.api.common.eventtime.TimestampAssignerSupplier;
import org.apache.flink.api.common.eventtime.WatermarkStrategy;
import org.apache.flink.api.common.functions.RichMapFunction;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.core.execution.JobClient;
import org.apache.flink.core.execution.JobListener;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.source.SourceFunction;
import scala.Tuple3;import javax.annotation.Nullable;
import java.io.BufferedReader;
import java.io.File;
import java.io.FileReader;
import java.util.Arrays;
import java.util.HashMap;
import java.util.Map;
import java.util.Objects;
import java.util.concurrent.TimeUnit;/*** 缓存文件的注册与获取*/
public class TestCache {public static void main(String[] args) throws Exception {// 创建环境StreamExecutionEnvironment see = new Environments().build().enableCheckpoint("file:///D:/phase/flink_state_backend", 3, 1, 1).enableRetries(3, 1).enableStateBackend("hashmap", true, false).finish(RuntimeExecutionMode.STREAMING, 1, 3);// 1.注册缓存文件String path = Thread.currentThread().getContextClassLoader().getResource("district.txt").getPath();// 获取静态文件district.txt的路径see.registerCachedFile(path,"district"); // 缓存至环境中// 2.注册任务侦听器see.registerJobListener(new JobListener() {@Overridepublic void onJobSubmitted(@Nullable JobClient jobClient, @Nullable Throwable throwable) {// 任务提交时// 任务正常:输出jobClient,任务异常:throwableif (Objects.nonNull(jobClient)) {// 输出IDSystem.out.println(jobClient.getJobID().toString());// 输出状态try {System.err.println(jobClient.getJobStatus().get(10, TimeUnit.SECONDS).name());} catch (Exception e) {System.err.println(e.getMessage());}}else if (Objects.nonNull(throwable)) {// 异常不为空System.err.println(throwable.getMessage());}}@Overridepublic void onJobExecuted(@Nullable JobExecutionResult jobExecutionResult, @Nullable Throwable throwable) {// 任务执行// 任务正常:输出jobExecutionResult,任务异常:throwableif (Objects.nonNull(jobExecutionResult)) {System.out.println(jobExecutionResult);}else if (Objects.nonNull(throwable)){System.err.println(throwable.getMessage());}}});// 3.数据:ID,温度,时间戳// 生成水位线TimestampAssignerSupplier<Tuple3> supplier = new TimestampAssignerSupplier<Tuple3>() {@Overridepublic TimestampAssigner<Tuple3> createTimestampAssigner(Context context) {return (element,recordTimestamp) -> (Long) element._3();}};WatermarkStrategy<Tuple3> watermark = WatermarkStrategy.<Tuple3>forMonotonousTimestamps().withTimestampAssigner(supplier);// 数据see.fromCollection(Arrays.asList(new Tuple3(1,34,System.currentTimeMillis()),new Tuple3(2,36,System.currentTimeMillis()+1000),new Tuple3(1,35,System.currentTimeMillis()+2000),new Tuple3(3,32,System.currentTimeMillis()+3000),new Tuple3(2,33,System.currentTimeMillis()+4000)))// 4.将缓存文件中地址内容来替代数据中的ID号【通过ID关联】.setParallelism(1).assignTimestampsAndWatermarks(watermark).map(new RichMapFunction<Tuple3, Tuple3>() {Map<Integer,String> idName = new HashMap<>(); // 全局Map// 初始化资源@Overridepublic void open(Configuration parameters) throws Exception {// 读取缓存文件File district = getRuntimeContext().getDistributedCache().getFile("district");try(BufferedReader br = new BufferedReader(new FileReader(district))){ // 会自动释放()内资源String line;while (Objects.nonNull(line = br.readLine())) {String[] s = line.split("\\s+");idName.put(Integer.valueOf(s[0]),s[1]);}}catch (Exception ex){ex.printStackTrace();}}@Overridepublic Tuple3 map(Tuple3 value) throws Exception {return new Tuple3(idName.get(value._1()),value._2(),value._3());}// 释放资源@Overridepublic void close() throws Exception {idName.clear();}}).print();see.execute("cache-test");}
}

结果展示

(nanjing,34,1727094791401)
(suzhou,36,1727094792401)
(nanjing,35,1727094793401)
(changzhou,32,1727094794401)
(suzhou,33,1727094795401)

方式二:广播变量

主要代码

package recovery;import modules.env.Environments;
import modules.time.Timer;
import org.apache.flink.api.common.RuntimeExecutionMode;
import org.apache.flink.api.common.eventtime.SerializableTimestampAssigner;
import org.apache.flink.api.common.eventtime.WatermarkStrategy;
import org.apache.flink.api.common.state.MapStateDescriptor;
import org.apache.flink.streaming.api.datastream.BroadcastStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.KeyedProcessFunction;
import org.apache.flink.streaming.api.functions.co.BroadcastProcessFunction;
import org.apache.flink.streaming.api.functions.sink.SinkFunction;
import org.apache.flink.streaming.api.functions.windowing.ProcessWindowFunction;
import org.apache.flink.streaming.api.windowing.assigners.WindowStagger;
import org.apache.flink.streaming.api.windowing.windows.TimeWindow;
import org.apache.flink.util.Collector;
import scala.Tuple2;
import scala.Tuple3;import java.util.Arrays;
import java.util.Iterator;
import java.util.concurrent.TimeUnit;/*** 广播变量的发送与获取* 连接流 connect*/
public class TestBroadcastConnect {public static void main(String[] args) throws Exception {// 1.创建环境StreamExecutionEnvironment see = new Environments().build().enableCheckpoint("file:///D:/phase/flink_state_backend", 3, 1, 1).enableRetries(3, 1).enableStateBackend("hashmap", true, false).finish(RuntimeExecutionMode.STREAMING, 1, 3);// 2.广播变量MapStateDescriptor desc1 = new MapStateDescriptor("idCity", Integer.class, String.class); // 描述特征BroadcastStream<Tuple2> broadcastStream = see.fromCollection(Arrays.asList(// 广播出去的内容new Tuple2(1, "nanjing"),new Tuple2(2, "suzhou"),new Tuple2(3, "wuxi"))).broadcast(desc1); // 广播流// 3.数据:ID,温度,时间戳see.fromCollection(Arrays.asList(new Tuple3(1,34,System.currentTimeMillis()),new Tuple3(2,36,System.currentTimeMillis()+1000),new Tuple3(1,35,System.currentTimeMillis()+2000),new Tuple3(3,32,System.currentTimeMillis()+3000),new Tuple3(2,33,System.currentTimeMillis()+4000))).setParallelism(1).assignTimestampsAndWatermarks(WatermarkStrategy.<Tuple3>forMonotonousTimestamps().withTimestampAssigner((SerializableTimestampAssigner<Tuple3>) (element,recordTimestamp) -> (Long) element._3()))// 4.连接流:与广播流数据进行连接(获取广播变量,变为广播连接流).connect(broadcastStream)// 5.将广播变量中地址内容来替代数据中的ID号【通过ID关联】.process(new BroadcastProcessFunction<Tuple3, Tuple2, Tuple3>() {@Overridepublic void processElement(Tuple3 value, BroadcastProcessFunction<Tuple3, Tuple2, Tuple3>.ReadOnlyContext ctx, Collector<Tuple3> out) throws Exception {Object v = ctx.getBroadcastState(desc1).get(value._1()); // 取out.collect(new Tuple3(v,value._2(),value._3()));}@Overridepublic void processBroadcastElement(Tuple2 value, BroadcastProcessFunction<Tuple3, Tuple2, Tuple3>.Context ctx, Collector<Tuple3> out) throws Exception {ctx.getBroadcastState(desc1).put(value._1,value._2); // 存}})// 6.业务: 平均温度.keyBy(t3->t3._1().toString()).window(Timer.tumbling(5,0,TimeUnit.SECONDS ,WindowStagger.NATURAL)).process(new ProcessWindowFunction<Tuple3, Tuple2, String, TimeWindow>() {@Overridepublic void process(String city, ProcessWindowFunction<Tuple3, Tuple2, String, TimeWindow>.Context context, Iterable<Tuple3> elements, Collector<Tuple2> out) throws Exception {float avg = 0.0f;int count = 0;Iterator<Tuple3> it = elements.iterator();while(it.hasNext()){count++;avg += (Integer) it.next()._2();}avg /= count;// 将平均温度往后送out.collect(new Tuple2(city,avg));}})// 相当于print()操作.addSink(new SinkFunction<Tuple2>() {@Overridepublic void invoke(Tuple2 value, Context context) throws Exception {System.out.println(value);}});see.execute("broadcast-connect");}
}

结果展示

(nanjing,34.5)
(suzhou,36.0)
(wuxi,32.0)
(suzhou,33.0)

版权声明:

本网仅为发布的内容提供存储空间,不对发表、转载的内容提供任何形式的保证。凡本网注明“来源:XXX网络”的作品,均转载自其它媒体,著作权归作者所有,商业转载请联系作者获得授权,非商业转载请注明出处。

我们尊重并感谢每一位作者,均已注明文章来源和作者。如因作品内容、版权或其它问题,请及时与我们联系,联系邮箱:809451989@qq.com,投稿邮箱:809451989@qq.com