flume的工作流程及操作

1 什么是flume

Flume是一个分布式的、可靠的、高可用的数据采集、聚合和大型数据很迁移(log)系统。

web server.log

ipaddr visittime visiturl duration
ip地址 访问时间 来访url 停留时长

这些日志有啥用?

2. 什么时候用Flume?

需要进行数据采集时。

3 Flume体系

img

组件内部的关系

  1. 一个Source可以将events传送给一个或者多个Channel,通常一个Source对应一个Channel
  2. 一个Channel可以接入多个Sources,即多个Sources可以将events发给一个Channel。同时多个Sinks可以从一个Channel中消费消息
  3. Sink的上游为Channel,一个Sink只能从一个Channel中消费消息。
  4. Source将消息传送给Channel,以及Sink从Channel中消费消息,均为在内部的事务中进行。

source 和 sink 是异步执行的

#### 3.1 可靠性

​ events 存储在每个agent的channel中。然后events事件流到下一个agent或终端如hdfs中。只有当这些events存储在hdfs或下一个agent的channel中才会将原来的events删除。

​ flume 通过事务保证event交付的可靠性。

3.2 可恢复性

​ Flume支持一个由本地文件系统支持的持久化文件通道。

​ 将event保存在内存中,如果发生故障数据是无法恢复的。当然在内存中速度会更快。

3.3 Flume常用组件

4 Flume 常用操作

4.1 常用配置

# 每个组件的名称
a1.sources = r1
a1.sinks = k1
a1.channels = c1

# netcat监控方式监控的iplocalhost端口44444
a1.sources.r1.type = netcat
a1.sources.r1.bind = localhost
a1.sources.r1.port = 44444

# sink 的方式 logger
a1.sinks.k1.type = logger

# 写入到内存
a1.channels.c1.type = memory

# 绑定source和sink到channel
a1.sources.r1.channels = c1
a1.sinks.k1.channel = c1

4.2 Source

Exec source

通过Linux命令(cat [name pipe] or tail -F [file])监控指定文件的日志, 如果源意外退出,将不会产生更多数据。

a1.sources = r1
a1.channels = c1
# 指定类型、命令
a1.sources.r1.type = exec
a1.sources.r1.command = tail -F /var/log/secure

a1.sources.r1.channels = c1

Exec Source详细参数

Spooling Directory Source

允许将数据放在磁盘上,比exec source更具可靠性,数据不会丢失,但效率更比exec source低。

a1.channels = ch-1
a1.sources = src-1

fs.sources.r3.type=spooldir
fs.sources.r3.spoolDir=/opt/modules/apache-flume-1.6.0-bin/flume_template
fs.sources.r3.fileHeader=true
fs.sources.r3.ignorePattern=^(.)*\\.out$ # 过滤out结尾的文件

Spooling Directory Source 详细参数

4.3 Channel

Memory Channel

数据在内存中,临时存储

a1.channels = c1
a1.channels.c1.type = memory
a1.channels.c1.capacity = 10000
a1.channels.c1.transactionCapacity = 10000
a1.channels.c1.byteCapacityBufferPercentage = 20
a1.channels.c1.byteCapacity = 800000

Memory Channel 详细参数

File Channel

先把数据存储到本地,再进行数据传输,增加可靠性

a1.channels = c1
a1.channels.c1.type = file
a1.channels.c1.checkpointDir = /mnt/flume/checkpoint
a1.channels.c1.dataDirs = /mnt/flume/data

File Channel 详细参数

Sink

Info级别日志等级,通常用于测试环境/调试

a1.channels = c1
a1.sinks = k1
a1.sinks.k1.type = logger
a1.sinks.k1.channel = c1

HDFS Sink

事件下沉到HDFS上。它支持序列文件和txt文本文件。

a1.channels = c1
a1.sinks = k1
a1.sinks.k1.type = hdfs
a1.sinks.k1.channel = c1
a1.sinks.k1.hdfs.path = /flume/events/%y-%m-%d/%H%M/%S
a1.sinks.k1.hdfs.filePrefix = events-
a1.sinks.k1.hdfs.round = true
a1.sinks.k1.hdfs.roundValue = 10
a1.sinks.k1.hdfs.roundUnit = minute

详细参数

5 实战

5.1 hive source(实时采集hive日志)

hive.sources = r2
hive.channels = c2
hive.sinks = k2

# define sources
hive.sources.r2.type = exec
hive.sources.r2.command = tail -F /var/log/secure
hive.sources.r2.shell = /bin/bash -c
hive.sources.r2.logStdErr = false
hive.sources.r2.channels = c2

# def channels
hive.channels.c2.type = memory
hive.channels.c2.capacity = 10000
hive.channels.c2.transactionCapacity = 100

# def sinks
hive.sinks.k2.type = hdfs
hive.sinks.k2.hdfs.path = hdfs://hadoop1:8020/user/root/flume-hive/%y-%m-%d/%H%M/%S
hive.sinks.k2.hdfs.fileType = DataStream
hive.sinks.k2.hdfs.writeFormat = Text
hive.sinks.k2.hdfs.round = true
hive.sinks.k2.hdfs.roundValue = 10
hive.sinks.k2.hdfs.roundUnit = minute
hive.sinks.k2.hdfs.rollInterval = 30
hive.sinks.k2.hdfs.rollSize = 0
hive.sinks.k2.hdfs.rollCount = 0
hive.sources.r2.channels = c2

Flume结构以及使用