Flume--日志收集

  Flume是一个分布式、高可靠、高可用的日志收集系统。能够有效的收集、聚合、移动大量的日志。把各种类型的数据源采集汇总到各种类型的目的地,flume有一个口号:“我们不生产数据,我们是数据的搬运工。”

  • 那我们怎么去搬运数据呢?让我们想想,那就需要一个入口(采集数据),和一个出口(推送数据),中间再加上一些队列(暂存数据,让数据流动起来),这样一来我们就可以去搬运数据了,自己也可以去实现一个简单的搬运数据的。当然我们已经不需要了,因为大牛已经帮我们实现了就是flume,能采集各种数据(各种source),推动到各种目的地(sink)。下面我们来看看flume的组件结构:
    flume组件

Flume 理解

Flume 的核心(agent)就是把数据从数据源收集过来,再送到目的地。为了保证高可靠输送一定成功,在送到目的地之前,会先缓存数据,待数据真正到达目的地后,删除自己缓存的数据。

  1. Flume : 传输的数据的基本单位是 Event,如果是文本文件,通常是一行记录,这也是事务的基本单位。
  2. Event : (包含:headers:{} 、body) 从 Source,流向 Channel,再到 Sink,本身为一个 byte 数组。
  3. Source : 对数据进行收集,分成transtion 和 event 打入到channel之中
  4. Channel : 就像一个管道(队列),接收 Source 的输出,再推送给 Sink 消费。数据直到进入到下一个Channel中或者进入终端才会被删除。即:中转Event临时存储,在 sources 和 sinks之间起一个连接作用
  5. Sink : 取出 Channel 中的数据,然后送给外部源(HDFS、HBase)或者其他 Source
  • Flume处理流程总结:
    Flume由事件(Event)贯穿了整个数据流动。事件是Flume的基本数据单位,它携带日志数据(字节数组形式)和头信息,这些Event由Agent外部的Source生成,当Source捕获事件后会进行特定的格式化,然后Source会把事件推入(单个或多个)Channel中(缓冲区),它将保存事件直到Sink处理完该事件。Sink负责持久化日志或者把事件推向另一个Source

Flume 组件

  • Flume提供了大量内置的Source、Channel和Sink。不同类型的Source,Channel和Sink可以自由组合。比如:source 来源可以是日志文件,Avro和Thrift端口Kafka等, Channel可以把事件暂存在内存里,也可以持久化到本地硬盘上。Sink可以把日志写入HDFS、Hive、HBase,甚至是另外一个Source等。

Source

  1. Avro Source:监听一个 avro 服务端口,采集Avro数据序列化后的数据;
    Avro Source
    type:avrosource的类型,必须是avro。
    bind:要监听的(本机的)主机名或者ip。此监听不是过滤发送方。一台电脑不是说只有一个IP。有多网卡的电脑,对应多个IP。
    port:绑定的本地的端口。

  2. Exec Source:基于Unix的command在标准输出上采集数据(tail -F);
    Exec Source
    type:source的类型:必须是exec。
    command:要执行命令。

  3. NetCat Source绑定的端口(tcp、udp),将流经端口的每一个文本行数据作为Event输入;
    NetCat Source
    type:source的类型,必须是netcat。
    bind:要监听的(本机的)主机名或者ip。此监听不是过滤发送方。一台电脑不是说只有一个IP。有多网卡的电脑,对应多个IP。
    port:绑定的本地的端口。

  4. Kafka Source:从 kafka 服务中采集数据;
    Kafka Source

  5. Thrift Source:监听一个 Thrift 服务端口,采集Thrift数据序列化后的数据;
    Thrift Source

  6. JMS Source Java 消息服务数据源,Java消息服务是一个与具体平台无关的API,这是支持jms规范的数据源采集;

  7. Spooling Directory Source:通过文件夹里的新增的文件作为数据源的采集;
  8. HTTP Source:监听 HTTP POST和 GET 产生的数据的采集;

Channel

  1. Memory Channel:使用 内存 作为数据的存储(详情大家可以去官网,或者阅读下载下来的doc文档)。
    Memory Channel
    channel的类型 : 必须为memory
    capacity : channel中的最大event数目
    transactionCapacity : channel中允许事务的最大event数目

  2. JDBC Channel:使用 jdbc 数据源来作为数据的存储。

  3. Kafka Channel:使用 kafka服务 来作为数据的存储。
  4. File Channel:使用 文件 来作为数据的存储。
  5. Spillable Memory Channel:使用内存和文件作为数据的存储,即:先存在内存中,如果内存中数据达到阀值则flush到文件中。

Sink

  1. HDFS Sink:将数据传输到 hdfs 集群中。
    HDFS Sink
    type : sink的类型 必须是hdfs。
    hdfs.path : hdfs的上传路径。
    hdfs.filePrefix : hdfs文件的前缀。默认是:FlumeData
    hdfs.rollInterval : 间隔多久产生新文件,默认是:30(秒) 0表示不以时间间隔为准。
    hdfs.rollSize : 文件到达多大再产生一个新文件,默认是:1024(bytes)0表示不以文件大小为准。
    hdfs.rollCount : event达到多大再产生一个新文件,默认是:10(个)0表示不以event数目为准。
    hdfs.batchSize : 每次往hdfs里提交多少个event,默认为100
    hdfs.fileType : hdfs文件的格式
    hdfs.codeC : 压缩方式:gzip, bzip2, lzo, lzop, snappy

  2. Hive Sink:将数据传输到 hive 的表中。

  3. Logger Sink:将数据作为 日志 处理(根据flume中的设置的日志的级别显示)。
    要在控制台显示在运行agent的时候加入:-Dflume.root.logger=INFO,console 。
    type : sink的类型:必须是 logger。
    maxBytesToLog : 打印body的最长的字节数 默认为16

  4. Avro Sink:数据被转换成 Avro Event ,然后发送到指定的服务端口上。

  5. Thrift Sink:数据被转换成 Thrift Event ,然后发送到指定的的服务端口上。
  6. Kafka Sink:将数据发送到 kafka服务 中。(注意依赖类库)
  7. IRC Sink:数据向指定的 IRC服务 和端口中发送。
  8. File Roll Sink:数据传输到 本地文件 中。
  9. Null Sink取消数据的传输,即不发送到任何目的地。
  10. HBaseSink:将数据发往 hbase 数据库中。
  11. MorphlineSolrSink:数据发送到 Solr搜索服务器 (集群)。
  12. ElasticSearchSink:数据发送到 Elastic Search 搜索服务器(集群)。

Interceptor

  1. Timestamp Interceptor : 时间戳拦截器 在header里加入key为timestamp,value为当前时间。
    type : 拦截器的类型,必须为timestamp
    preserveExisting : 如果此拦截器增加的key已经存在,如果这个值设置为true则保持原来的值,否则覆盖原来的值。默认为false

  2. Host Interceptor : 主机名或者ip拦截器,在header里加入ip或者主机名
    type : 拦截器的类型,必须为host
    preserveExisting : 如果此拦截器增加的key已经存在,如果这个值设置为true则保持原来的值,否则覆盖原来的值。默认为false
    useIP : 如果设置为true则使用ip地址,否则使用主机名,默认为true
    hostHeader : 使用的header的key名字,默认为host

  3. Static Interceptor静态拦截器,是在header里加入固定的key和value。
    type : avrosource的类型,必须是static。
    preserveExisting : 如果此拦截器增加的key已经存在,如果这个值设置为true则保持原来的值,否则覆盖原来的值。默认为false
    key : 静态拦截器添加的key的名字
    value : 静态拦截器添加的key对应的value值

Channel Selector

  • Multiplexing Channel Selector 根据header的key的值分配channel

    selector.type 默认为replicating
    selector.header:选择作为判断的key
    selector.default:默认的channel配置
    selector.mapping.*:匹配到的channel的配置

flume 整体结构图

当前网速较慢或者你使用的浏览器不支持博客特定功能,请尝试刷新或换用Chrome、Firefox等现代浏览器