Flume数据采集框架的使用

标签:无 1299人阅读 评论(0)
分类:

1. 概述

- Flume是一个分布式、可靠、和高可用的海量日志采集、聚合和传输的系统。

- Flume可以采集文件,socket数据包、文件、文件夹、kafka等各种形式源数据,又可以将采集到的数据(下沉sink)输出到HDFS、hbase、hive、kafka等众多外部存储系统中

- 一般的采集需求,通过对flume的简单配置即可实现

- Flume针对特殊场景也具备良好的自定义扩展能力,因此,flume可以适用于大部分的日常数据采集场景


2. 运行机制

flume的核心是把数据从数据源(source)收集过来,在将收集到的数据送到指定的目的地(sink)。为了保证输送的过程一定成功,在送到目的地(sink)之前,会先缓存数据(channel),待数据真正到达目的地(sink)后,flume在删除自己缓存的数据。

在整个数据的传输的过程中,流动的是event,即事务保证是在event级别进行的。event将传输的数据进行封装,是flume传输数据的基本单位,如果是文本文件,通常是一行记录,event也是事务的基本单位。event从source,流向channel,再到sink,本身为一个字节数组,并可携带headers(头信息)信息。event代表着一个数据的最小完整单元,从外部数据源来,向外部的目的地去。 一个完整的event包括:event headers、event body、event信息(即文本文件中的单行记录)

- Flume分布式系统中最核心的角色是agent,flume采集系统就是由一个个agent所连接起来形成的

- 每一个agent相当于一个数据传递员,内部有三个组件:

  - Source:采集组件,用于跟数据源对接,以获取数据

  - Sink:下沉组件,用于往下一级agent传递数据或者往最终存储系统传递数据

  - Channel:传输通道组件,用于从source将数据传递到sink


1.jpg



3. Flume采集系统结构图

1. 简单结构

- 单个agent采集数据


2.jpg



2. 复杂结构

- 两个agent之间串联


3.png



- 多级agent之间串联


4.png



- 多级channel



5.png


例子


监控一个文件实时采集新增的数据输出到Kafka

FLume采用 exec source + memory channel+ kafka sink


配置文件:

---------------------------------------------------



# 给agent命名

a1.sources = r1

a1.sinks = k1

a1.channels = c1

# 配置source

a1.sources.r1.type = exec

a1.sources.r1.command = tail -F /Users/null/data/flume-test.log

a1.sources.r1.channels = c1

# 配置sink

a1.sinks.k1.type = org.apache.flume.sink.kafka.KafkaSink

a1.sinks.k1.topic = kafka-test

a1.sinks.k1.brokerList = localhost:9092

a1.sinks.k1.requiredAcks = 1

a1.sinks.k1.batchSize = 2

a1.sinks.k1.channel = c1

# 配置memory channel

a1.channels.c1.type = memory

a1.channels.c1.capacity = 1000

a1.channels.c1.transactionCapacity = 100

# 配置关联

a1.sources.r1.channels = c1

a1.sinks.k1.channel = c1


---------------------------------------------------


启动zookeeper

Kafka需要zookeeper环境

bin/zkServer.sh start

启动Kafka

bin/kafka-server-start.sh config/server.properties

创建topic

kafka-test 为topic 名称与flume 配置中a1.sinks.k1.topic 对应

kafka-topics.sh --list --zookeeper localhost:2181 kafka-test

启动一个Consumer

kafka-console-consumer.sh --zookeeper localhost:2181 --topic kafka-test --from-beginning

启动FLume

--conf 指定配置文件所在位置 --conf-file为指定配置文件 --name 为agent的名称

flume-ng agent --conf $FLUME_HOME/conf --conf-file $FLUME_HOME/conf/exec-memory-kafca.conf --name a1 -Dflume.root.logger=INFO,console


在监控的文件中添加内容,可以查看到kafka发出并消费


查看评论

暂无评论

发表评论
  • 评论内容:
      
首页
团队介绍
发展历史
组织结构
MESA大事记
新闻中心
通知
组内动态
科研成果
专利
论文
项目
获奖
软著
人才培养
MESA毕业生
MESA在读生
MESA员工
招贤纳士
走进MESA
学长分享
招聘通知
招生宣传
知识库
文章
地址:北京市朝阳区华严北里甲22号楼五层 | 邮编:100029
邮箱:nelist@iie.ac.cn
京ICP备15019404号-1