Flume简介 Flume是由cloudera软件公司产出的可分布式日志收集系统,后与2009年被捐赠了Apache软件基金会,为Hadoop相关组件之一。尤其近几年随着Flume的不断完善以及升级版本的逐一推出,特别是Flume-ng;同时Flume内部的各种组件不断丰富,用户再开发的过程中使用的便利性得到很大的改善,现已成为Apache top项目之一。
Flume
定义:Apache Flume是一个从可以收集例如日志,事件等数据资源,并将这些数量庞大的数据从各项数据资源中集中起来存储的工具/服务,或者数集中机制;Flume是分布式的日志收集系统,它将各个服务器中的数据收集起来并送到目的地。
Flume结构 Flume内部有一个或者多个Agent,然而对于每一个Agent来说,它就是一共独立的守护进程(JVM),它从客户端哪儿接收收集,或者从其他的Agent哪儿接收,然后迅速的将获取的数据传给下一个目的节点sink,或者Agent
Flume的Agent(主要由:source、channel、sink三个组件组成)
Source :采集源,用于跟数据源对接,以获取数据
从数据发生器接收数据,并将接收的数据以Flume的event格式 传递给一个或者多个通道channal,Flume提供多种数据接收的方式,比如Avro、Thrift等
Channel :存储容器,它将从source处接收到的event格式的数据缓存起来,直到它们被sinks消费掉
channel是一个完整的事务,这一点保证了数据在收发的时候的一致性,并且它可以和任意数量的source和sink链接,支持的类型有:JDBC channel,File System channel、Memort channel等
Sink :下沉地,采集数据的传送目的地,用于往下一级Agent传递数据或者最终存储系统传递数据
sink将数据存储到集中存储器比如HBase和HDFS,它从channels消费数据(events)并将其传递给目标地
Flume就是将数据从数据源(source)收集过来,Flume会先缓存数据到channel,再将收集到的数据送到指定的目的地(sink),最后Flume在删除自己缓存的数据
Flume事件 事件作为Flume内部数据传输的最基本单元,它是由一个转载数据的字节数组(该数据组是从数据源接入点传入,并传输给传输器,也就是HDFS/HBase和一个可选头部构成
一个完整的event包括:event header、event body信息,其中event信息就是Flume收集到的日志记录
event将传输的数据进行封装。如果是文本文件,通常是一行记录,event也是十五的基本单位。event从source,流向chanel,再到sink,本身为一个字节数组,并可携带headers(头信息)信息,event代表着一个数据的最小完整单元,从外部数据源来,向外部的目的地去。
Flume采集结构图
安装
前置条件
Java 1.7 or later
为sources,channels和sinks提供充足的内存
为channles提供充足的磁盘空间
为agent提供读和写权限
Flume 的安装非常简单,上传安装包到数据源所在节点上然后解压 tar -zxvf apache-flume-1.9.0-bin.tar.gz
,然后进入 flume 的目录,修改 conf 下的 flume-env.sh,在里面配置 JAVA_HOME。配置flume环境变量:
1 2 3 4 5 vi ~/.bash_profile export FLUME_HOME=/root/bigdata/flume/bin export PATH=$FLUME_HOME/bin:$PATH source /root/.bash_profile
检查是否配置成功:flume-ng version查看flume版本。根据数据采集需求配置采集方案 ,描述在配置文件中(文件名可任意自定义)
Flume使用 在Flume配置文件中,需要
需要命名当前使用的Agent的名称
命名Agent下的source的名字
命名Agent下的channel的名字
命名Agent下的sink的名字
将source和sink通过channel绑定起来
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 # 为source、channel、sink命名 a1.sources = r1 a1.sinks = k1 a1.channels = c1 # 定义采集源 a1.sources.r1.type = netcat a1.sources.r1.bind = 127.0.0.1 a1.sources.r1.port = 44444 # 定义下沉地 a1.sinks.k1.type = logger # 定义channel a1.channels.c1.type = memory a1.channels.c1.capacity = 1000 a1.channels.c1.transactionCapacity = 100 # 把source、sink通过channel连接起来 a1.sources.r1.channels = c1 a1.sinks.k1.channel = c1
启动agent命令
1 flume-ng路径 -c conf -f 上方配置文件路径 -n a1 -Dflume.root.logger=INFO,console
-c conf:指定Flume自身的配置文件
-f conf/netcat-logger.conf:指定我们所描述的采集方案
-n a1:指定Agent的名字
测试
1 2 3 4 telnet 127.0.0.1 44444 1234 19/06/26 17:27:25 INFO sink.LoggerSink: Event: { headers:{} body: 31 32 33 34 0D 1234. }
Flume可选配置参数
Flume支持各种各样的source,sinks,channels,它们支持的类型如下:
Source
Avro Source序列化数据源
ThriftSource序列化数据源
Exec Source 执行Linux命令行的数据源
NETCAT Source通过指定端口,ip监控的数据源
Kafka Source直接对接Kafka的数据源
自定义Source
Channel
Memory Channel 内存
File Channel 磁盘
Kafka Channel 存在kafka
JDBC Channel
Sink
HDFS Sink写入到HDFS
Hive Sink 写入到Hive
Avro Sink 写入到序列化
HBase Sinks写入到HBase
HBase Sink 同步写入到HBase
Async HBase Sink 异步写入到HBase
配置过程
一般Flume中会存在多个Agent,所以需要分别取名字来区分他们,名字不能重复
1 2 3 4 5 # Agent取名为 agent_name # source 取名为 source_name ,一次类推agent_name.source = source_name ,source_name1 agent_name.channels = channel_name,channel_name1 agent_name.sinks = sink_name,sink_name1
1 2 3 agent_name.sources. source_name.type = value agent_name.sources. source_name.property2 = value agent_name.sources. source_name.property3 = value
Flume在source和sink配间提供各种管道来传递数据
1 2 3 agent_name.channels.channel_name.type = value agent_name.channels.channel_name. property2 = value agent_name.channels.channel_name. property3 = value
1 2 3 agent_name.sinks. sink_name.type = value agent_name.sinks. sink_name.property2 = value agent_name.sinks. sink_name.property3 = value
1 2 agent_name.sources.source_name.channels = channel_name agent_name.sinks.sink_name.channels = channel_name
Flume配置参数
source
Avro:监听Avro端口,从Avro client streams接收events。Avro Source被设计为高扩展的RPC服务器,能从其他的Flume Agent的Avro Sink或者使用Flume的SDK发生数据的客户端应用,接受数据到一个Flume Agent中
利用Avro Source可以实现多级流动,扇出流,扇入流等效果。另外也可以接受通过Flume提供Avro客户端发送日志信息
Thrift Source
ThriftSource 与Avro Source 基本一致。只要把source的类型改成thrift即可,例如a1.sources.r1.type = thrift,比较简单。
Spooling Directory Source Spooling Directory Source监测配置的目录下新增的文件,并将文件中的数据读取出来。其中,Spool Source有2个注意地方,第一个是拷贝到spool目录下的文件不可以再打开编辑,第二个是spool目录下不可包含相应的子目录。这个主要用途作为对日志的准实时监控。可选参数过多,不展示。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 #监控指定的目录,如果有新文件产生,那么将文件的内容显示到控制台 #配置一个agent agent的名称可以自定义 #指定agent的 sources,sinks,channels #分别指定 agent的 sources,sinks,channels 的名称 名称可以自定义 a1.sources=s1 a1.channels=c1 a1.sinks=k1 #配置 source 根据 agent的 sources 的名称来对 source 进行配置 #source 的参数是根据 不同的数据源 配置不同---在文档查找即可 #配置目录 source flume这个文件夹用于存储需要读取的文件 a1.sources.s1.type=spooldir a1.sources.s1.spoolDir=/home/hadoop/apps/apache-flume-1.8.0-bin/flume #配置 channel 根据 agent的 channels的名称来对 channels 进行配置 #配置内存 channel a1.channels.c1.type=memory #配置 sink 根据 agent的sinks 的名称来对 sinks 进行配置 #配置一个 logger sink a1.sinks.k1.type=logger #绑定 特别注意 source的channel 的绑定有 s,sink的 channel的绑定没有 s a1.sources.s1.channels=c1 a1.sinks.k1.channel=c1
实例 从A服务器到B服务器
需求:将A服务器中日志实时采集到B服务器
A服务器:监控一个文件实时采集新增的数据输出B服务器
B服务器:从指定网络端口采集数据输出到控制台
A服务器conf
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 a.sources = a_source a.sinks = a_sink a.channels = memory-channel a.sources.a_source.type = exec a.sources.a_source.command = tail -F /root/logs/collect.log a.sources.a_source.shell = /bin /sh -c a.sinks.a_sink.type = avro a.sinks.a_sink.hostname = A_IP a.sinks.a_sink.port = 44444 exec -memory-avro.channels.memory-channel.type = memorya.sources.a_source.channels = memory-channel a.sinks.a_sink.channel = memory-channel
B服务器
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 b.sources = b_source b.sinks = b_sink b.channels = memory-channel b.sources.b_source.type = avro b.sources.b_source.bind = B_IP b.sources.b_source.port = 44444 avro-memory-logger.sinks.logger-sink.type = logger avro-memory-logger.channels.memory-channel.type = memory b.sources.b_source.channels = memory-channel b.sinks.b_sink.channel = memory-channel
先启动B,再启动A
1 2 3 4 # B /root/bigdata/flume/bin/flume-ng agent --conf conf --conf-file /root/bigdata/flume/conf/b.conf --name b -Dflume.root.logger=INFO,console # A /root/bigdata/flume/bin/flume-ng agent --conf conf --conf-file /root/bigdata/flume/conf/a.conf --name a -Dflume.root.logger=INFO,console
采集目录到HDFS中 采集需求:服务器特定目录下,会不断产生新文件,每当有新文件出现,就需要把文件采集到HDFS中去
采集源:spooldir
下沉目标:hdfs sink
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 #命名 exec-memory-hdfs.sources = exec-source exec-memory-hdfs.sinks = hdfs-sink exec-memory-hdfs.channels = memory-channel # 定义source exec-memory-hdfs.sources.exec-source.type = exec exec-memory-hdfs.sources.exec-source.command = tail -F /root/logs/collect.log exec-memory-avro.sources.exec-source.shell=/bin/sh -c # 定义sink exec-memory-hdfs.sinks.hdfs-sink.type = hdfs exec-memory-hdfs.sinks.hdfs-sink.hdfs.path = hdfs://IP:9000/headlines/events/%Y-%m-%d/ exec-memory-hdfs.sinks.hdfs-sink.hdfs.filePrefix = events- exec-memory-hdfs.sinks.hdfs-sink.hdfs.round = true exec-memory-hdfs.sinks.hdfs-sink.hdfs.roundValue = 10 exec-memory-hdfs.sinks.hdfs-sink.hdfs.roundUnit = minute exec-memory-hdfs.sinks.hdfs-sink.hdfs.rollInterval = 3 exec-memory-hdfs.sinks.hdfs-sink.hdfs.rollSize = 20 exec-memory-hdfs.sinks.hdfs-sink.hdfs.rollCount = 5 exec-memory-hdfs.sinks.hdfs-sink.hdfs.batchSize = 1 exec-memory-hdfs.sinks.hdfs-sink.hdfs.useLocalTimeStamp = true #生成的文件类型,默认是 Sequencefile,可用 DataStream,则为普通文本 exec-memory-hdfs.sinks.hdfs-sink.hdfs.fileType = DataStream # 定义channel exec-memory-hdfs.channels.memory-channel.type = memory exec-memory-hdfs.channels.memory-channel.capacity = 1000 exec-memory-hdfs.channels.memory-channel.transactionCapacity = 100 # 连接source和sink exec-memory-hdfs.sources.exec-source.channels = memory-channel exec-memory-hdfs.sinks.hdfs-sink.channel = memory-channel
1 2 # 启动 /root/bigdata/flume/bin/flume-ng agent --conf conf --conf-file /root/bigdata/flume/conf/spool-memory-hdfs.conf --name exec-memory-hdfs -Dflume.root.logger=INFO,console
channel参数解释:
capacity:默认该通道中最大的可以存储的 event 数量
trasactionCapacity:每次最大可以从 source 中拿到或者送到 sink 中的 event数量
sinks参数解析:
rollInterval:默认值:30
hdfs sink 间隔多长将临时文件滚动成最终目标文件,单位:秒;如果设置成 0,则表示不根据时间来滚动文件;注:滚动(roll)指的是,hdfs sink 将临时文件重命名成最终目标文件,并新打开一个临时文件来写入数据;
rollSize:默认值:1024
当临时文件达到该大小(单位:bytes)时,滚动成目标文件;如果设置成 0,则表示不根据临时文件大小来滚动文件;
rollCount:默认值:10,当 events 数据达到该数量时候,将临时文件滚动成目标文件;如果设置成 0,则表示不根据 events 数据来滚动文件;
round:默认值:false,是否启用时间上的“舍弃”,这里的“舍弃”,类似于“四舍五入”。
roundValue:默认值:1时间上进行“舍弃”的值;
roundUnit:默认值:seconds,时间上进行“舍弃”的单位,包含:second,minute,hour
exec-memory-hdfs.sinks.hdfs-sink.hdfs.useLocalTimeStamp = true:添加时间戳,创建文件可以使用时间(也可以通过FLume的拦截器插件实现)
Flume插件
interceptors拦截器:
用于source和channel之间,用来更改或者检查Flume的events数据
管道选择器channel Selectors:
再多管道是被用来选择使用哪一条管道来传递数据(events).管道选择器有分为两种:
默认管道选择器:每个管道传递的都是相同的events
多路复用通道选择器:依据每一个event的头部header的地址选择管道
Flume中的拦截器,用户Source读取events读取Sink的时候,再events header中加入一些有用的信息,或者对events的内容进行过滤,完成初步的数据清洗。
Interceptors拦截器
UUID Interceptor:UUID拦截器,用于在每个events header中生成一个UUID字符串,例如:b5755073-77a9-43c1-8fad-b7a586fc1b97。生成的UUID可以在sink中读取并使用。
Host Interceptor:主机名拦截器。将运行Flume agent的主机名或者IP地址加入到events header中,key名字为:host(也可自定义)。
日志收集案例
案例场景:A、B 两台日志服务机器实时生产日志主要类型为 access.log、nginx.log、web.log
现在要求:把 A、B 机器中的 access.log、nginx.log、web.log 采集汇总到 C 机器上
然后统一收集到hdfs中。但是在hdfs中要求目录为:
1 2 3 4 5 /source/logs/access/20160101/** /source/logs/nginx/20160101/** /source/logs/web/20160101/**
**A、B服务器上的配置文件
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 exec -memory-avro.sources = access-source nginx-source web-sourceexec -memory-avro.sinks = avro-sinkexec -memory-avro.channels = memory-channelexec -memory-avro.sources.access-source.type = exec exec -memory-avro.sources.access-source.command = tail -F /root/logs/access.logexec -memory-avro.sources.access-source.interceptors = i1exec -memory-avro.sources.access-source.interceptors.i1.type = staticexec -memory-avro.sources.access-source.interceptors.i1.key = type exec -memory-avro.sources.access-source.interceptors.i1.value = accessexec -memory-avro.sources.nginx-source.type = exec exec -memory-avro.sources.nginx-source.command = tail -F /root/logs/nginx.logexec -memory-avro.sources.nginx-source.interceptors = i2exec -memory-avro.sources.nginx-source.interceptors.i2.type = staticexec -memory-avro.sources.nginx-source.interceptors.i2.key = type exec -memory-avro.sources.nginx-source.interceptors.i2.value = nginxexec -memory-avro.sources.web-source.type = exec exec -memory-avro.sources.web-source.command = tail -F /root/logs/web.logexec -memory-avro.sources.web-source.interceptors = i3exec -memory-avro.sources.web-source.interceptors.i3.type = staticexec -memory-avro.sources.web-source.interceptors.i3.key = type exec -memory-avro.sources.web-source.interceptors.i3.value = webexec -memory-avro.sinks.avro-sink.type = avroexec -memory-avro.sinks.avro-sink.hostname = 192.168 .19 .137 exec -memory-avro.sinks.avro-sink.port = 44444 exec -memory-avro.channels.memory-channel.type = memoryexec -memory-avro.channels.memory-channel.capacity = 20000 exec -memory-avro.channels.memory-channel.transactionCapacity = 10000 exec -memory-avro.sources.access-source.channels = memory-channelexec -memory-avro.sources.nginx-source.channels = memory-channelexec -memory-avro.sources.web-source.channels = memory-channelexec -memory-avro.sinks.avro-sink.channel = memory-channel
C服务器
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 avro-memory-hdfs.sources = avro-source avro-memory-hdfs.sinks = hdfs-sink avro-memory-hdfs.channels = memory-channel avro-memory-hdfs.sources.avro-source.type = avro avro-memory-hdfs.sources.avro-source.bind = 192.168 .19 .137 avro-memory-hdfs.sources.avro-source.port =44444 avro-memory-hdfs.sources.avro-source.interceptors = i1 avro-memory-hdfs.sources.avro-source.interceptors.i1.type = org.apache.flume.interceptor.TimestampInterceptor$Builder avro-memory-hdfs.sinks.hdfs-sink.type = hdfs avro-memory-hdfs.sinks.hdfs-sink.hdfs.path=hdfs://192.168 .19 .137 :9000 /headlines/logs/%{type }/%Y-%m-%d avro-memory-hdfs.sinks.hdfs-sink.hdfs.filePrefix =events avro-memory-hdfs.sinks.hdfs-sink.hdfs.fileType = DataStream avro-memory-hdfs.sinks.hdfs-sink.hdfs.writeFormat = Text avro-memory-hdfs.sinks.hdfs-sink.hdfs.useLocalTimeStamp = true avro-memory-hdfs.sinks.hdfs-sink.hdfs.rollCount = 0 avro-memory-hdfs.sinks.hdfs-sink.hdfs.rollInterval = 30 avro-memory-hdfs.sinks.hdfs-sink.hdfs.rollSize = 10485760 avro-memory-hdfs.sinks.hdfs-sink.hdfs.batchSize = 10000 avro-memory-hdfs.sinks.hdfs-sink.hdfs.threadsPoolSize=10 avro-memory-hdfs.sinks.hdfs-sink.hdfs.callTimeout=30000 avro-memory-hdfs.channels.memory-channel.type = memory avro-memory-hdfs.channels.memory-channel.capacity = 20000 avro-memory-hdfs.channels.memory-channel.transactionCapacity = 10000 avro-memory-hdfs.sources.avro-source.channels = memory-channel avro-memory-hdfs.sinks.hdfs-sink.channel = memory-channel
先启动C再分别启动A和B
1 2 3 4 # C /root/bigdata/flume/bin/flume-ng agent -c conf -f /root/bigdata/flume/conf/avro-memory-hdfs.conf -name avro-memory-hdfs -Dflume.root.logger=DEBUG,console # A、B /root/bigdata/flume/bin/flume-ng agent -c conf -f /root/bigdata/flume/conf/exec-memory-avro.conf -name exec-memory-avro - Dflume.root.logger=DEBUG,console
Flume优化
负载均衡(load balance)
容错(failover)
load balance 负载均衡是用于解决一台机器(一个进程)无法解决所有请求而产生的一种算法。Load balancing Sink Processor能够实现load balance功能,如下图Agent1是一个路由节点,负责将Channel暂存的event均衡到对应的多个Sink组件上,而每个Sink组件分别连接到一个独立的Agent上
常见配置:
1 2 3 4 5 6 7 8 9 10 11 a1.sinkgroups = g1 a1.sinkgroups.g1.sinks = k1 k2 k3 a1.sinkgroups.g1.processor.type = load_balance a1.sinkgroups.g1.processor.backoff = true # 如果开启,则将失败的 sink 放入黑名单 a1.sinkgroups.g1.processor.selector = round_robin # 另外还支持 random a1.sinkgroups.g1.processor.selector.maxTimeOut=10000 #在黑名单放置的超时时间,超时结束时,若仍然无法接收,则超时时间呈指数增长
failover Failover Sink Processor 能够实现 failover 功能,具体流程类似 load balance,但是内部处理机制与 load balance 完全不同。
Failover Sink Processor 维护一个优先级 Sink 组件列表,只要有一个 Sink组件可用,Event 就被传递到下一个组件。 故障转移机制的作用是将失败的 Sink降级到一个池,在这些池中它们被分配一个冷却时间,随着故障的连续,在重试之前冷却时间增加。一旦 Sink 成功发送一个事件,它将恢复到活动池。 Sink 具有与之相关的优先级,数量越大,优先级越高。 例如,具有优先级为 100 的 sink 在优先级为 80 的 Sink 之前被激活。如果在发送事件时汇聚失败,则接下来将尝试下一个具有最高优先级的 Sink 发送事件。如果没有指定优先级,则根据在配置中指定 Sink 的顺序来确定优先级。
常见配置:
1 2 3 4 5 6 7 a1.sinkgroups = g1 a1.sinkgroups.g1.sinks = k1 k2 k3 a1.sinkgroups.g1.processor.type = failover a1.sinkgroups.g1.processor.priority.k1 = 5 a1.sinkgroups.g1.processor.priority.k2 = 7 a1.sinkgroups.g1.processor.priority.k3 = 6 a1.sinkgroups.g1.processor.maxpenalty = 20000
转载请注明来源,欢迎对文章中的引用来源进行考证,欢迎指出任何有错误或不够清晰的表达。可以在下面评论区评论,也可以邮件至 zoubinbf@163.com