在当今大数据时代,有效管理和处理海量日志数据对于企业获取洞察和保持运营效率至关重要。为此目的设计的强有力工具之一是 Apache Flume。本文将带您探索Flume,了解其功能、安装方法以及一些实际用例,以展示其在处理日志数据方面的有效性。
Flume简介
Apache Flume 是一个分布式、可靠且可用的服务,旨在高效地收集、聚合和移动来自各种来源的大量日志数据到集中式数据存储。它在处理管道中扮演着关键角色,充当数据流的导管。
Flume 常被形象地比喻为“水管”,一端连接着数据源(水源),另一端连接着数据存储(水桶)。它特别适合于实时日志数据的收集与传输。
Flume的核心组件
- Agent:Flume的基本工作单元,每个Flume配置文件可以包含一个或多个Agent。
- Source:数据的来源,可以是日志文件、网络端口等。
- Channel:数据的通道,用于暂存从Source到Sink的数据。
- Sink:数据的目的地,可以是HDFS、HBase或其他存储系统。
- Event:Flume中数据的基本单位。
Flume的关键特性
- 可靠的数据处理: Flume确保在传输过程中不会丢失数据,提供端到端的可靠性。
- 持久性: 它使用通道支持持久化存储,确保在故障情况下数据不会丢失。
- 灵活性: Flume可以通过各种源、通道和接收器进行定制,以适应特定的数据流需求。
- 可扩展性: 它旨在随着数据量的增加而扩展,适合大规模数据处理。
安装和配置
通过网盘分享的文件
apache-flume-1.9.0-bin.tar.gz
安装Flume
Flume的安装相对简单,以Flume 1.9.0版本为例
下载并解压
把安装包拉到/opt/modules目录下
进入/opt/modules目录下
tar -zxvf apache-flume-1.9.0-bin.tar.gz -C /opt/installs/
进入/opt/installs目录下
mv apache-flume-1.9.0-bin/ flume
配置环境变量
进入/etc/profile配置环境变量
export FLUME_HOME=/opt/installs/flume
export PATH=$PATH:$FLUME_HOME/bin
使用命令配置环境变量
echo 'export FLUME_HOME=/opt/installs/flume' >> /etc/profile
echo 'export PATH=$PATH:$FLUME_HOME/bin' >> /etc/profile
刷新环境变量
source /etc/profile
修改配置文件
在/opt/installs/flume/conf路径下复制并修改flume-env.sh
文件。
cp flume-env.sh.template flume-env.sh
修改 JAVA_HOME 的路径为自己的 jdk 路径。
export JAVA_HOME=/opt/installs/jdk
Flume的数据模型
Flume支持单一数据流和多数据流模型,可以根据实际需求灵活配置。
单一数据流模型
单一数据流模型包含一个Agent,适用于简单的日志收集场景。
多数据流模型
多数据流模型可以包含多个Agent,Agent之间可以进行数据的流转和处理。
Flume的使用
Flume的使用主要依赖于配置文件,通过定义Source、Channel和Sink的组件及其关系来实现数据的流动。
编写 conf文件
flume 的使用是编写 conf文件的,运行的时候指定该文件
# 定义组件的名字
<Agent>.sources = <Source>
a1.sources=s1
<Agent>.channels = <Channel1> <Channel2>
a1.channels=c1
<Agent>.sinks = <Sink>
a1.sinks=sink1# 设置source 和 channel 之间的关系
<Agent>.sources.<Source>.channels = <Channel1> <Channel2> ...
a1.sources.s1.channels=c1# 设置sink 和 channel 之间的关系
<Agent>.sinks.<Sink>.channel = <Channel1>
a1.sinks.sink1.channel=c1先定义agent的名字,再定义agent中三大组件的名字
接着定义各个组件之间的关联关系
# list the sources, sinks and channels for the agent
agent_foo.sources = avro-appserver-src-1
agent_foo.channels = mem-channel-1
agent_foo.sinks = hdfs-sink-1# set channel for source
agent_foo.sources.avro-appserver-src-1.channels = mem-channel-1# set channel for sink
agent_foo.sinks.hdfs-sink-1.channel = mem-channel-1
常见的组件
参考网址:Flume 1.9用户手册中文版 — 可能是目前翻译最完整的版本了
常见的Source
常见的channel
常见的sink
总结:Kafka 可以充当flume中的各个组件。三个组件可以两两组合在一起,所以使用场景非常的多。
案例展示
1.)Avro+Memory+Logger(Avro + 内存 + 日志)
此设置适用于演示目的。它监听指定端口,通过内存收集数据,并将其记录到控制台。
先找source 中的avro看需要设置什么参数
#编写s1的类型是什么
a1.sources.s1.type = avro
a1.sources.s1.bind = 192.168.32.128
a1.sources.s1.port = 4141
a1.sources.s1.channels = c1
找到channel中的memory类型,再设置一下
a1.channels.c1.type = memory
a1.channels.c1.capacity = 10000
#source 或者 sink 每个事务中存取 Event 的操作数量
a1.channels.c1.transactionCapacity = 10000
接着查找sink,sink的类型是logger
a1.sinks.s2.channel = c1
a1.sinks.s2.type = logger
最终合并起来的文件就是:
配置
a1.sources = r1
a1.channels = c1
a1.sources.r1.type = avro
a1.sources.r1.bind = localhost
a1.sources.r1.port = 4141
a1.sources.r1.channels = c1
a1.channels.c1.type = memory
a1.sinks = k1
a1.sinks.k1.type = logger
a1.sinks.k1.channel = c1
在flume文件夹下创建一个文件夹 myconf,用于存放我们写好的文件
进入后创建 avro-memory-log.conf
将配置文件的内容拷贝进去
启动命令
flume-ng agent -c ../ -f avro-memory-log.conf -n a1 -Dflume.root.logger=INFO,console
-c 后面跟上 配置文件的路径
-f 跟上自己编写的conf文件
-n agent的名字
-Dflume.root.logger=INFO,console INFO 日志输出级别 Debug,INFO,warn,error 等接着向端口中发送数据:
flume-ng avro-client -c /opt/installs/flume/conf/ -H bigdata01 -p 4141 -F /home/hivedata/arr1.txt
给avro发消息,使用avro-client
flume是没有运行结束时间的,它一直监听某个Ip的端口,有消息就处理,没消息,就等着,反正不可能运行结束。
如果想停止,可以使用ctrl + c 终止flume。
2)Exec + Memory + HDFS(执行命令 + 内存 + HDFS)
此配置用于持续监控日志文件,并将它们存储在HDFS中。
配置
以下版本演示的是没有时间语义的案例:
a1.sources = r1
a1.channels = c1
a1.sources.r1.type = exec
a1.sources.r1.command = tail -F /home/hivedata/arr1.txt
a1.sources.r1.channels = c1a1.channels.c1.type = memory
a1.channels.c1.capacity = 10000
a1.channels.c1.transactionCapacity = 10000
a1.channels.c1.byteCapacityBufferPercentage = 20
a1.channels.c1.byteCapacity = 800000a1.sinks = k1
a1.sinks.k1.type = hdfs
a1.sinks.k1.channel = c1
a1.sinks.k1.hdfs.path = /flume/event/
接着我们演示hdfs文件中含有时间转义字符怎么办?
a1.sources = r1
a1.channels = c1
a1.sources.r1.type = exec
a1.sources.r1.command = tail -F /opt/data/c.txta1.sources.r1.channels = c1a1.channels.c1.type = memory
a1.channels.c1.capacity = 10000
a1.channels.c1.transactionCapacity = 10000a1.sinks = k1
a1.sinks.k1.type = hdfs
a1.sinks.k1.channel = c1
a1.sinks.k1.hdfs.path = /flume/events/%y-%m-%d/%H%M/
# round 用于控制含有时间转义符的文件夹的生成规则
a1.sinks.k1.hdfs.round = true
a1.sinks.k1.hdfs.roundValue = 10
a1.sinks.k1.hdfs.roundUnit = minute
# 使用本地时间,否则会报时间戳是null的异常
a1.sinks.k1.hdfs.useLocalTimeStamp=true
在flume文件夹下创建一个文件夹 myconf,用于存放我们写好的文件
进入后创建 exec-memory-hdfs.conf
将配置文件的内容拷贝进去
启动命令
flume-ng agent -c ./ -f exec-memory-hdfs.conf -n a1 -Dflume.root.logger=INFO,console
假如hdfs中有时间转义字符,必须给定时间,给定时间有两种方式,要么通过header 传递一个时间,要么使用本地时间。
假如hdfs中使用了时间转义字符,配置文件中必须二选一:
1)useLocalTimeStamp=true
2)使用时间戳拦截器时间需要转义,没有时间无法翻译为具体的值 %d 就无法翻译为 日期
实现不断的向a.txt中存入数据的效果
echo "Hello World" >> a.txt
3)Spool +File + HDFS(Spooldir + 文件 + HDFS)
此设置非常适合处理包含多个不断更新的日志文件的目录。
配置
a1.channels = ch-1
a1.sources = src-1a1.sources.src-1.type = spooldir
a1.sources.src-1.channels = ch-1
a1.sources.src-1.spoolDir = /home/scripts/
a1.sources.src-1.fileHeader = truea1.channels.ch-1.type = filea1.sinks = k1
a1.sinks.k1.type = hdfs
a1.sinks.k1.channel = ch-1
a1.sinks.k1.hdfs.path = /flume/
在flume文件夹下创建一个文件夹 myconf,用于存放我们写好的文件
进入后创建 Spool_File_hdfs.conf
将配置文件的内容拷贝进去
启动命令
flume-ng agent -c ./ -f Spool_File_hdfs.conf -n a1 -Dflume.root.logger=INFO,console
数据采集的意思:不管是一个文件还是一个文件夹,数据都是不断产生的,特别是生产环境下更是如此。
以上的采集只能采集到文件夹中是否有新的文件产生,不能采集变化的文件。
抽取一个文件夹中的所有文件,子文件夹中的文件是不抽取的,抽取过的文件,数据发生了变化,也不会再抽取一次。
假如你的channel是 file类型,必定会有临时文件产生,产生的文件在
总结:
channel 一般常用的只要两个,一个是memory,一个是file ,最高效的是memory ,也是最常用的。
4)TailDir+Memory+HDFS(TailDir+日志+HDFS)
这个案例演示了如何监控文件内容的变化,将变化的内容实时上传至HDFS。
配置
a1.sources = r1
a1.channels = c1
a1.sources.r1.type = TAILDIR
a1.sources.r1.channels = c1
a1.sources.r1.filegroups = f1
# . 代表的意思是一个任意字符 * 代表前面的字符出现0到多次
a1.sources.r1.filegroups.f1 = /home/scripts/datas/.*txt.*a1.channels.c1.type = memory
a1.channels.c1.capacity = 10000
a1.channels.c1.transactionCapacity = 10000a1.sinks = k1
a1.sinks.k1.type = hdfs
a1.sinks.k1.channel = c1
a1.sinks.k1.hdfs.path = /flume3/logs
在flume文件夹下创建一个文件夹 myconf,用于存放我们写好的文件
进入后创建 taildir_memory_hdfs.conf
将配置文件的内容拷贝进去
启动命令
flume-ng agent -c ./ -f taildir_memory_hdfs.conf -n a1 -Dflume.root.logger=INFO,console
数据不断的发生变化,每发生一次变化,就传递一次,源源不断的抽取出来。
刚才为什么抽取了那么多个文件还没有抽取完?
由于我们刚才的一些数据非常的大,根据如下参数可以疯狂创建文件:
hdfs.rollInterval 30 当前文件写入达到该值时间后触发滚动创建新文件(0表示不按照时间来分割文件),单位:秒
hdfs.rollSize 1024 当前文件写入达到该大小后触发滚动创建新文件(0表示不根据文件大小来分割文件),单位:字节
hdfs.rollCount 10 当前文件写入Event达到该数量后触发滚动创建新文件(0表示不根据 Event 数量来分割文件)再次抽取,发现不抽了,原因是有一个文件记录了以前抽取过的内容,将其删除:
rm -rf /root/.flume/taildir_position.json修改1.txt 里面的内容,flume会再次抽取数据
echo "hello" >> 1.txt
视频链接
01-flume的介绍_哔哩哔哩_bilibili
02-flume的安装_哔哩哔哩_bilibili
03-flume的数据模型_哔哩哔哩_bilibili
04-flume中的conf的语法_哔哩哔哩_bilibili
05-avro+mem+log的演示_哔哩哔哩_bilibili
06-vi和touch创建文件后有何不同_哔哩哔哩_bilibili
07-exec+mem+hdfs的演示_哔哩哔哩_bilibili
08-spool+file+hdfs_哔哩哔哩_bilibili
09-taildir+mem+hdfs_哔哩哔哩_bilibili
结论
Apache Flume是一个多功能工具,它简化了收集、聚合和移动大量日志数据的过程。其在配置上的灵活性和数据处理的稳健性使其成为处理大数据的理想选择。无论您是在实时监控日志文件还是在多个源中聚合数据,Flume都提供了一个可扩展且可靠的解决方案。本文介绍了Flume的基础概念、安装配置以及几个典型的使用案例,希望能够帮助读者更好地理解和使用Flume。