Flume数据采集框架的使用
1. 概述
- Flume是一个分布式、可靠、和高可用的海量日志采集、聚合和传输的系统。
- Flume可以采集文件,socket数据包、文件、文件夹、kafka等各种形式源数据,又可以将采集到的数据(下沉sink)输出到HDFS、hbase、hive、kafka等众多外部存储系统中
- 一般的采集需求,通过对flume的简单配置即可实现
- Flume针对特殊场景也具备良好的自定义扩展能力,因此,flume可以适用于大部分的日常数据采集场景
2. 运行机制
flume的核心是把数据从数据源(source)收集过来,在将收集到的数据送到指定的目的地(sink)。为了保证输送的过程一定成功,在送到目的地(sink)之前,会先缓存数据(channel),待数据真正到达目的地(sink)后,flume在删除自己缓存的数据。
在整个数据的传输的过程中,流动的是event,即事务保证是在event级别进行的。event将传输的数据进行封装,是flume传输数据的基本单位,如果是文本文件,通常是一行记录,event也是事务的基本单位。event从source,流向channel,再到sink,本身为一个字节数组,并可携带headers(头信息)信息。event代表着一个数据的最小完整单元,从外部数据源来,向外部的目的地去。 一个完整的event包括:event headers、event body、event信息(即文本文件中的单行记录)
- Flume分布式系统中最核心的角色是agent,flume采集系统就是由一个个agent所连接起来形成的
- 每一个agent相当于一个数据传递员,内部有三个组件:
- Source:采集组件,用于跟数据源对接,以获取数据
- Sink:下沉组件,用于往下一级agent传递数据或者往最终存储系统传递数据
- Channel:传输通道组件,用于从source将数据传递到sink
3. Flume采集系统结构图
1. 简单结构
- 单个agent采集数据
2. 复杂结构
- 两个agent之间串联
- 多级agent之间串联
- 多级channel
例子
监控一个文件实时采集新增的数据输出到Kafka
FLume采用 exec source + memory channel+ kafka sink
配置文件:
---------------------------------------------------
# 给agent命名
a1.sources = r1
a1.sinks = k1
a1.channels = c1
# 配置source
a1.sources.r1.type = exec
a1.sources.r1.command = tail -F /Users/null/data/flume-test.log
a1.sources.r1.channels = c1
# 配置sink
a1.sinks.k1.type = org.apache.flume.sink.kafka.KafkaSink
a1.sinks.k1.topic = kafka-test
a1.sinks.k1.brokerList = localhost:9092
a1.sinks.k1.requiredAcks = 1
a1.sinks.k1.batchSize = 2
a1.sinks.k1.channel = c1
# 配置memory channel
a1.channels.c1.type = memory
a1.channels.c1.capacity = 1000
a1.channels.c1.transactionCapacity = 100
# 配置关联
a1.sources.r1.channels = c1
a1.sinks.k1.channel = c1
---------------------------------------------------
启动zookeeper
Kafka需要zookeeper环境
bin/zkServer.sh start
启动Kafka
bin/kafka-server-start.sh config/server.properties
创建topic
kafka-test 为topic 名称与flume 配置中a1.sinks.k1.topic 对应
kafka-topics.sh --list --zookeeper localhost:2181 kafka-test
启动一个Consumer
kafka-console-consumer.sh --zookeeper localhost:2181 --topic kafka-test --from-beginning
启动FLume
--conf 指定配置文件所在位置 --conf-file为指定配置文件 --name 为agent的名称
flume-ng agent --conf $FLUME_HOME/conf --conf-file $FLUME_HOME/conf/exec-memory-kafca.conf --name a1 -Dflume.root.logger=INFO,console
在监控的文件中添加内容,可以查看到kafka发出并消费