您的位置:首页 > 健康 > 美食 > 做app推广被警察传唤_大连房产网_网站流量分析的指标有哪些_西安做网站公司

做app推广被警察传唤_大连房产网_网站流量分析的指标有哪些_西安做网站公司

2025/1/10 8:58:44 来源:https://blog.csdn.net/qq_45972323/article/details/144817928  浏览:    关键词:做app推广被警察传唤_大连房产网_网站流量分析的指标有哪些_西安做网站公司
做app推广被警察传唤_大连房产网_网站流量分析的指标有哪些_西安做网站公司

医疗数仓配置Flume

  • Flume配置概述

Flume配置概述

Flume需要将Kafka中各topic的数据传输到HDFS,因此选用KafkaSource以及HDFSSink。对于安全性要求高的数据(不允许丢失)选用FileChannel,允许部分丢失的数据如日志可以选用MemoryChannel以追求更高的效率。此处采集的是业务数据,不允许丢失,选用FileChannel,生产环境根据实际情况选择合适的组件。

KafkaSource订阅Kafka medical_ods主题的数据,HDFSSink将不同topic的数据写入不同路径,路径中应包含表名及日期,前者用于区分来源于不同业务表的数据,后者按天对数据进行划分。关键配置如下:
在这里插入图片描述
在这里插入图片描述
2)Flume配置实操
(1)创建Flume配置文件
在hadoop104节点的Flume家目录下创建job目录,在job下创建medical_kafka_to_hdfs.conf。

[atguigu@hadoop104 flume]$ mkdir job 
[atguigu@hadoop104 flume]$ cd job/
[atguigu@hadoop104 job]$ vim medical_kafka_to_hdfs.conf 

(2)配置文件内容如下

a1.sources = r1
a1.channels = c1
a1.sinks = k1a1.sources.r1.type = org.apache.flume.source.kafka.KafkaSource
a1.sources.r1.batchSize = 5000
a1.sources.r1.batchDurationMillis = 2000
a1.sources.r1.kafka.bootstrap.servers = hadoop102:9092,hadoop103:9092,hadoop104:9092
a1.sources.r1.kafka.topics = topic_db
a1.sources.r1.kafka.consumer.group.id = medical-flume
a1.sources.r1.interceptors = i1
a1.sources.r1.interceptors.i1.type = com.atguigu.medical.flume.interceptors.TimestampAndTableNameInterceptor$Buildera1.channels.c1.type = file
a1.channels.c1.checkpointDir = /opt/module/flume/checkpoint/medical
a1.channels.c1.dataDirs = /opt/module/flume/data/medical
a1.channels.c1.maxFileSize = 2146435071
a1.channels.c1.capacity = 1000000
a1.channels.c1.keep-alive = 6## sink1
a1.sinks.k1.type = hdfs
a1.sinks.k1.hdfs.path = /origin_data/medical/%{tableName}_inc/%Y-%m-%d
a1.sinks.k1.hdfs.filePrefix = db
a1.sinks.k1.hdfs.round = falsea1.sinks.k1.hdfs.rollInterval = 10
a1.sinks.k1.hdfs.rollSize = 134217728
a1.sinks.k1.hdfs.rollCount = 0a1.sinks.k1.hdfs.fileType = CompressedStream
a1.sinks.k1.hdfs.codeC = gzip## 拼装
a1.sources.r1.channels = c1
a1.sinks.k1.channel= c1

(3)编写Flume拦截器
① 创建名为medical-flume-interceptor的项目
在这里插入图片描述
② 在pom文件中添加如下内容。

<dependencies><dependency><groupId>org.apache.flume</groupId><artifactId>flume-ng-core</artifactId><version>1.10.1</version><scope>provided</scope></dependency><dependency><groupId>com.alibaba</groupId><artifactId>fastjson</artifactId><version>1.2.68</version></dependency>
</dependencies><build><plugins><plugin><groupId>org.apache.maven.plugins</groupId><artifactId>maven-assembly-plugin</artifactId><version>3.0.0</version><configuration><descriptorRefs><descriptorRef>jar-with-dependencies</descriptorRef></descriptorRefs></configuration><executions><execution><id>make-assembly</id><phase>package</phase><goals><goal>single</goal></goals></execution></executions></plugin></plugins>
</build>

③ 在com.atguigu.medical.flume.interceptors包下创建TimestampAndTableNameInterceptor类

package com.atguigu.medical.flume.interceptors;import com.alibaba.fastjson.JSONObject;
import org.apache.flume.Context;
import org.apache.flume.Event;
import org.apache.flume.interceptor.Interceptor;import java.nio.charset.StandardCharsets;
import java.util.List;
import java.util.Map;public class TimestampAndTableNameInterceptor implements Interceptor {@Overridepublic void initialize() {}@Overridepublic Event intercept(Event event) {Map<String, String> headers = event.getHeaders();String log = new String(event.getBody(), StandardCharsets.UTF_8);JSONObject jsonObject = JSONObject.parseObject(log);Long ts = jsonObject.getLong("ts");String tableName = jsonObject.getString("table");headers.put("timestamp", ts * 1000 + "");headers.put("tableName", tableName);return event;}@Overridepublic List<Event> intercept(List<Event> events) {for (Event event : events) {intercept(event);}return events;}@Overridepublic void close() {}public static class Builder implements Interceptor.Builder {@Overridepublic Interceptor build() {return new TimestampAndTableNameInterceptor();}@Overridepublic void configure(Context context) {}}
}

④ 打包
在这里插入图片描述
⑤ 在target目录下查看打好的包
在这里插入图片描述
⑥ 将打好的包放入到hadoop104的/opt/module/flume/lib文件夹下

[atguigu@hadoop104 lib]$ ls | grep medical-flume-interceptor
medical-flume-interceptor-1.0-SNAPSHOT-jar-with-dependencies.jar

3)通道测试
(1)启动Zookeeper、Kafka集群
(2)启动hadoop104的Flume
[atguigu@hadoop104 flume]$ bin/flume-ng agent -n a1 -c conf/ -f job/medical_kafka_to_hdfs.conf -Dflume.root.logger=INFO,console
(3)执行模拟数据jar包,生成数据
[atguigu@hadoop102 bin]$ medical_mock.sh 1
(4)观察HDFS上的目标路径,如下。
在这里插入图片描述
增量表目录已生成。

4)编写Flume启停脚本
为方便使用,此处编写一个Flume的启停脚本
(1)在hadoop102节点的/home/atguigu/bin目录下创建脚本medical-f1.sh
[atguigu@hadoop102 bin]$ vim medical-f1.sh
在脚本中填写如下内容

#!/bin/bashcase $1 in
"start")echo " --------启动 hadoop104 业务数据flume-------"ssh hadoop104 "nohup /opt/module/flume/bin/flume-ng agent -n a1 -c /opt/module/flume/conf -f /opt/module/flume/job/medical_kafka_to_hdfs.conf > /opt/module/flume/medical-f1.log 2>&1 &"
;;
"stop")echo " --------停止 hadoop104 业务数据flume-------"ssh hadoop104 "ps -ef | grep medical_kafka_to_hdfs | grep -v grep |awk '{print \$2}' | xargs -n1 kill"
;;
esac

(2)增加脚本执行权限
[atguigu@hadoop102 bin]$ chmod +x medical-f1.sh
(3)medical-f1启动
[atguigu@hadoop102 module]$ medical-f1.sh start
(4)medical-f1停止
[atguigu@hadoop102 module]$ medical-f1.sh stop

版权声明:

本网仅为发布的内容提供存储空间,不对发表、转载的内容提供任何形式的保证。凡本网注明“来源:XXX网络”的作品,均转载自其它媒体,著作权归作者所有,商业转载请联系作者获得授权,非商业转载请注明出处。

我们尊重并感谢每一位作者,均已注明文章来源和作者。如因作品内容、版权或其它问题,请及时与我们联系,联系邮箱:809451989@qq.com,投稿邮箱:809451989@qq.com