Flume: Load Balancing and Failover

  Flume Sink Processors: events flow from a source, through a channel, into a sink. Sinks can be grouped: a sink group binds several sinks to one logical entity, and a sink processor decides how the group is used. It can load-balance events across all sinks in the group, or run in failover mode and switch to another sink when one fails.

  • Sink Processors structure diagram: see the diagram in the original post (not reproduced here).
  • For the full set of options, see the Flume Sink Processors section of the Flume user guide; a minimal sketch of the configuration shape follows.
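  • A minimal sketch of that configuration shape (agent, group, and sink names are placeholders; the property keys are the ones used throughout this post):

    a1.sinkgroups = g1
    a1.sinkgroups.g1.sinks = k1 k2
    # one of: default | load_balance | failover
    a1.sinkgroups.g1.processor.type = load_balance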

Preparation

  • Machine setup: I have three machines, xxo08, xxo09, and xxo10 (a sketch of the receiving agents and a start command follows this list):
    xxo08 : one agent with an exec source that reads /opt/data/access.log and four avro sinks;
    xxo09 : two agents, on ports 44444 and 44445, each with an avro source receiving data from xxo08's avro sinks;
    xxo10 : two agents, on ports 44444 and 44445, each with an avro source receiving data from xxo08's avro sinks;
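  • The receiving agents on xxo09 and xxo10 are not shown in the original post. A minimal sketch of one of them, assuming an avro source feeding a logger sink through a memory channel (the agent name c1, the config file name, and the 0.0.0.0 bind address are my own choices), plus a typical start command:

    # collector-44445.conf -- one receiving agent (e.g. xxo09, port 44445)
    c1.sources = r1
    c1.sinks = k1
    c1.channels = ch1

    c1.sources.r1.type = avro
    c1.sources.r1.bind = 0.0.0.0
    c1.sources.r1.port = 44445

    # log received events to the console, as seen in the output below
    c1.sinks.k1.type = logger

    c1.channels.ch1.type = memory
    c1.channels.ch1.capacity = 1000
    c1.channels.ch1.transactionCapacity = 100

    c1.sources.r1.channels = ch1
    c1.sinks.k1.channel = ch1

    # start it (the same command shape is used for the sending agent a1 on xxo08)
    bin/flume-ng agent --conf conf --conf-file conf/collector-44445.conf \
      --name c1 -Dflume.root.logger=INFO,console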

default

  • The default sink processor accepts only a single sink, so no sink group needs to be created; the configuration below simply declares four independent avro sinks on one channel.

    # Default Sink Processor

    # Name the components on this agent
    a1.sources = r1
    a1.sinks = k1 k2 k3 k4
    a1.channels = c1

    # exec source
    a1.sources.r1.type = exec
    a1.sources.r1.command = tail -F /opt/data/access.log

    # 4 avro sink
    a1.sinks.k1.type = avro
    a1.sinks.k1.hostname=xxo09
    a1.sinks.k1.port=44444

    a1.sinks.k2.type = avro
    a1.sinks.k2.hostname=xxo10
    a1.sinks.k2.port=44444

    a1.sinks.k3.type = avro
    a1.sinks.k3.hostname=xxo09
    a1.sinks.k3.port=44445

    a1.sinks.k4.type = avro
    a1.sinks.k4.hostname=xxo10
    a1.sinks.k4.port=44445

    # Use a channel which buffers events in memory
    a1.channels.c1.type = memory
    # Maximum number of events stored in the channel
    a1.channels.c1.capacity = 1000
    # Maximum number of events per transaction taken from the source or given to a sink
    a1.channels.c1.transactionCapacity = 100


    # Bind the source and sink to the channel
    a1.sources.r1.channels = c1
    a1.sinks.k1.channel = c1
    a1.sinks.k2.channel = c1
    a1.sinks.k3.channel = c1
    a1.sinks.k4.channel = c1
  • Next I write 450 records into access.log in quick succession (a one-line loop for this is sketched at the end of this list):

    [root@xxo08 flume]# echo "3" >> /opt/data/access.log
    [root@xxo08 flume]# echo "3" >> /opt/data/access.log
    [root@xxo08 flume]# echo "3" >> /opt/data/access.log
    [root@xxo08 flume]# echo "3" >> /opt/data/access.log
    [root@xxo08 flume]# echo "3" >> /opt/data/access.log
    [root@xxo08 flume]# echo "3" >> /opt/data/access.log
    [root@xxo08 flume]# echo "3" >> /opt/data/access.log
    [root@xxo08 flume]# echo "3" >> /opt/data/access.log
    [root@xxo08 flume]# echo "3" >> /opt/data/access.log
    [root@xxo08 flume]# echo "3" >> /opt/data/access.log
    [root@xxo08 flume]# echo "3" >> /opt/data/access.log
    ......

    #### All of the data ended up at xxo09, port 44445
    2016-05-23 23:26:36,286 (New I/O worker #1) [INFO - org.apache.avro.ipc.NettyServer$NettyServerAvroHandler.handleUpstream(NettyServer.java:171)] [id: 0x39baaf57, /192.168.33.73:60764 => /192.168.33.74:44445] CONNECTED: /192.168.33.73:60764
    2016-05-23 23:26:42,330 (SinkRunner-PollingRunner-DefaultSinkProcessor) [INFO - org.apache.flume.sink.LoggerSink.process(LoggerSink.java:94)] Event: { headers:{} body: 33 3 }
    2016-05-23 23:26:42,330 (SinkRunner-PollingRunner-DefaultSinkProcessor) [INFO - org.apache.flume.sink.LoggerSink.process(LoggerSink.java:94)] Event: { headers:{} body: 33 3 }
    2016-05-23 23:26:42,330 (SinkRunner-PollingRunner-DefaultSinkProcessor) [INFO - org.apache.flume.sink.LoggerSink.process(LoggerSink.java:94)] Event: { headers:{} body: 33 3 }
    2016-05-23 23:26:42,331 (SinkRunner-PollingRunner-DefaultSinkProcessor) [INFO - org.apache.flume.sink.LoggerSink.process(LoggerSink.java:94)] Event: { headers:{} body: 33 3 }
    ......
  • Note: when data is written slowly, which sink receives it is not deterministic; with the default processor each sink has its own runner polling the same channel, so whichever polls first takes the events.
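  • A quick way to produce those 450 records in one go (a sketch; any fast append to the tailed file works):

    # append 450 lines rapidly to the file tailed by the exec source
    for i in $(seq 1 450); do echo "3" >> /opt/data/access.log; done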

Load Balancing

  • Load balancing means the sink group picks a sink according to a selection algorithm (round_robin or random); by pointing the sinks at agents on different machines, the load is spread across those machines. The relevant processor properties are sketched below.
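  • For reference, the processor properties involved (keys as in the Flume user guide; maxTimeOut is not set in the configs below and defaults to 30000 ms):

    a1.sinkgroups.g1.processor.type = load_balance
    # back off from a failing sink for an exponentially growing interval
    a1.sinkgroups.g1.processor.backoff = true
    # round_robin (the default) or random
    a1.sinkgroups.g1.processor.selector = random
    # cap on the backoff interval, in milliseconds
    a1.sinkgroups.g1.processor.selector.maxTimeOut = 30000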

random

  • The configuration is as follows:

    # Load Balancing Sink Processor (random)

    # Name the components on this agent
    a1.sources = r1
    a1.sinks = k1 k2 k3 k4
    a1.channels = c1

    # exec source
    a1.sources.r1.type = exec
    a1.sources.r1.command = tail -F /opt/data/access.log

    # 4 avro sink
    a1.sinks.k1.type = avro
    a1.sinks.k1.hostname=xxo09
    a1.sinks.k1.port=44444

    a1.sinks.k2.type = avro
    a1.sinks.k2.hostname=xxo10
    a1.sinks.k2.port=44444

    a1.sinks.k3.type = avro
    a1.sinks.k3.hostname=xxo09
    a1.sinks.k3.port=44445

    a1.sinks.k4.type = avro
    a1.sinks.k4.hostname=xxo10
    a1.sinks.k4.port=44445

    #define sinkgroups random
    a1.sinkgroups = g1
    a1.sinkgroups.g1.sinks = k1 k2 k3 k4
    a1.sinkgroups.g1.processor.type = load_balance
    a1.sinkgroups.g1.processor.backoff = true
    a1.sinkgroups.g1.processor.selector = random


    # Use a channel which buffers events in memory
    a1.channels.c1.type = memory
    a1.channels.c1.capacity = 1000
    a1.channels.c1.transactionCapacity = 100


    # Bind the source and sink to the channel
    a1.sources.r1.channels = c1
    a1.sinks.k1.channel = c1
    a1.sinks.k2.channel = c1
    a1.sinks.k3.channel = c1
    a1.sinks.k4.channel = c1
  • The relevant source, RandomOrderSelector.createIterator, is shown below, followed by a standalone sketch of the same shuffle:

    @Override
    public synchronized Iterator<T> createIterator() {
      List<Integer> indexList = getIndexList();

      int size = indexList.size();
      int[] indexOrder = new int[size];

      while (indexList.size() != 1) {
        int pick = random.nextInt(indexList.size());
        indexOrder[indexList.size() - 1] = indexList.remove(pick);
      }

      indexOrder[0] = indexList.get(0);

      return new SpecificOrderIterator<T>(indexOrder, getObjects());
    }
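  • To see what try-order this produces, here is a standalone sketch of the same shuffle with the Flume classes stripped out (class and variable names are mine):

    import java.util.ArrayList;
    import java.util.Arrays;
    import java.util.List;
    import java.util.Random;

    // Fills indexOrder from the back with randomly removed indices,
    // i.e. a uniformly random permutation of the active sink indices.
    public class RandomOrderDemo {
      public static void main(String[] args) {
        Random random = new Random();
        List<Integer> indexList = new ArrayList<>(Arrays.asList(0, 1, 2, 3)); // four sinks

        int size = indexList.size();
        int[] indexOrder = new int[size];
        while (indexList.size() != 1) {
          int pick = random.nextInt(indexList.size());
          indexOrder[indexList.size() - 1] = indexList.remove(pick);
        }
        indexOrder[0] = indexList.get(0);

        System.out.println(Arrays.toString(indexOrder)); // e.g. [2, 0, 3, 1]
      }
    }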
  • Test: 450 records

  1. I write 450 records into /opt/data/access.log in quick succession.
  2. As configured above, the memory channel's transactionCapacity is 100, so each transaction delivers at most 100 events to a sink.
  3. The results are as follows (a sketch of how such per-port counts can be obtained follows this list):

    xxo09 port 44444: 41 events
    xxo09 port 44445: 100 events
    xxo10 port 44444: 206 events
    xxo10 port 44445: 10 events
  4. Also, in the xxo10 port 44444 output there is almost no time gap between events 100 and 101 (that sink happened to be randomly selected again right away), while there is a clear gap between events 200 and 201 (the random selection did not pick it again for a while).
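  • One way to obtain such per-port counts, assuming each collector's console output was redirected to a file (file paths here are hypothetical):

    grep -c "Event:" /opt/logs/xxo09-44444.log
    grep -c "Event:" /opt/logs/xxo10-44444.log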

round_robin

  • Round robin is straightforward: batches are pushed to the sinks one after another, in turn. The source of RoundRobinOrderSelector.createIterator is shown below, followed by a sketch of the corresponding configuration change:

    @Override
    public Iterator<T> createIterator() {
      List<Integer> activeIndices = getIndexList();
      int size = activeIndices.size();
      // possible that the size has shrunk so gotta adjust nextHead for that
      if (nextHead >= size) {
        nextHead = 0;
      }

      // nextHead advances by one on every call
      int begin = nextHead++;
      if (nextHead == activeIndices.size()) {
        nextHead = 0;
      }

      int[] indexOrder = new int[size];

      // the key part: visit the active sinks in order, starting at 'begin'
      for (int i = 0; i < size; i++) {
        indexOrder[i] = activeIndices.get((begin + i) % size);
      }

      return new SpecificOrderIterator<T>(indexOrder, getObjects());
    }
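  • The post does not list the round-robin configuration separately; relative to the random configuration above, only the selector changes (a sketch):

    a1.sinkgroups = g1
    a1.sinkgroups.g1.sinks = k1 k2 k3 k4
    a1.sinkgroups.g1.processor.type = load_balance
    a1.sinkgroups.g1.processor.backoff = true
    # round_robin is also the default selector
    a1.sinkgroups.g1.processor.selector = round_robin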
  • Testing round robin

  1. As configured, the memory channel's transactionCapacity is 100, so each transaction delivers at most 100 events to a sink.
  2. I write 450 records into /opt/data/access.log in quick succession.
  3. Result: three sinks receive exactly 100 events each and the fourth receives 150. That is what round robin predicts: the 450 events are taken in batches of 100, 100, 100, 100 and 50, and the fifth (50-event) batch rotates back to the sink that took the first batch. The timestamps of the 150-event sink are shown in a screenshot in the original post.

Failover

  • The failover sink processor sends all events to the available sink with the highest priority and switches to the next-highest-priority sink when it fails. Configuration:

# Failover Sink Processor

# Name the components on this agent
a1.sources = r1
a1.sinks = k1 k2 k3 k4
a1.channels = c1

# exec source
a1.sources.r1.type = exec
a1.sources.r1.command = tail -F /opt/data/access.log

# 4 avro sink
a1.sinks.k1.type = avro
a1.sinks.k1.hostname=xxo09
a1.sinks.k1.port=44444

a1.sinks.k2.type = avro
a1.sinks.k2.hostname=xxo10
a1.sinks.k2.port=44444

a1.sinks.k3.type = avro
a1.sinks.k3.hostname=xxo09
a1.sinks.k3.port=44445

a1.sinks.k4.type = avro
a1.sinks.k4.hostname=xxo10
a1.sinks.k4.port=44445

#define sinkgroups Failover
a1.sinkgroups = g1
a1.sinkgroups.g1.sinks = k1 k2 k3 k4
a1.sinkgroups.g1.processor.type = failover
a1.sinkgroups.g1.processor.priority.k1 = 5
a1.sinkgroups.g1.processor.priority.k2 = 6
a1.sinkgroups.g1.processor.priority.k3 = 10
a1.sinkgroups.g1.processor.priority.k4 = 8
a1.sinkgroups.g1.processor.maxpenalty = 10000


# Use a channel which buffers events in memory
a1.channels.c1.type = memory
a1.channels.c1.capacity = 1000
a1.channels.c1.transactionCapacity = 100


# Bind the source and sink to the channel
a1.sources.r1.channels = c1
a1.sinks.k1.channel = c1
a1.sinks.k2.channel = c1
a1.sinks.k3.channel = c1
a1.sinks.k4.channel = c1
  • Test results
  1. With the configuration above, k3 (xxo09, port 44445) has the highest priority (10); all 450 test records go there.
  2. If that agent is forcibly killed mid-test, the remaining events move to the agent on xxo10, port 44445 (k4, the next-highest priority, 8): failover works. A sketch of how to locate and kill the process follows the log below.
  3. When the agent is killed, the sending agent prints the following:
    [root@xxo08 flume]# 2016-05-24 02:16:31,492 (SinkRunner-PollingRunner-FailoverSinkProcessor) [WARN - org.apache.flume.sink.FailoverSinkProcessor.process(FailoverSinkProcessor.java:185)] Sink k3 failed and has been sent to failover list
    org.apache.flume.EventDeliveryException: Failed to send events
    at org.apache.flume.sink.AbstractRpcSink.process(AbstractRpcSink.java:392)
    at org.apache.flume.sink.FailoverSinkProcessor.process(FailoverSinkProcessor.java:182)
    at org.apache.flume.SinkRunner$PollingRunner.run(SinkRunner.java:147)
    at java.lang.Thread.run(Thread.java:745)
    Caused by: org.apache.flume.EventDeliveryException: NettyAvroRpcClient { host: xxo09, port: 44445 }: Failed to send batch
    at org.apache.flume.api.NettyAvroRpcClient.appendBatch(NettyAvroRpcClient.java:315)
    at org.apache.flume.sink.AbstractRpcSink.process(AbstractRpcSink.java:376)
    ... 3 more
    Caused by: org.apache.flume.EventDeliveryException: NettyAvroRpcClient { host: xxo09, port: 44445 }: RPC request exception
    at org.apache.flume.api.NettyAvroRpcClient.appendBatch(NettyAvroRpcClient.java:365)
    at org.apache.flume.api.NettyAvroRpcClient.appendBatch(NettyAvroRpcClient.java:303)
    ... 4 more
    Caused by: java.util.concurrent.ExecutionException: java.io.IOException: Error connecting to xxo09/192.168.33.74:44445
    at java.util.concurrent.FutureTask.report(FutureTask.java:122)
    at java.util.concurrent.FutureTask.get(FutureTask.java:202)
    at org.apache.flume.api.NettyAvroRpcClient.appendBatch(NettyAvroRpcClient.java:357)
    ... 5 more
    Caused by: java.io.IOException: Error connecting to xxo09/192.168.33.74:44445
    at org.apache.avro.ipc.NettyTransceiver.getChannel(NettyTransceiver.java:261)
    at org.apache.avro.ipc.NettyTransceiver.getRemoteName(NettyTransceiver.java:386)
    at org.apache.avro.ipc.Requestor.writeHandshake(Requestor.java:202)
    at org.apache.avro.ipc.Requestor.access$300(Requestor.java:52)
    at org.apache.avro.ipc.Requestor$Request.getBytes(Requestor.java:478)
    at org.apache.avro.ipc.Requestor.request(Requestor.java:147)
    at org.apache.avro.ipc.Requestor.request(Requestor.java:129)
    at org.apache.avro.ipc.specific.SpecificRequestor.invoke(SpecificRequestor.java:84)
    at com.sun.proxy.$Proxy4.appendBatch(Unknown Source)
    at org.apache.flume.api.NettyAvroRpcClient$2.call(NettyAvroRpcClient.java:348)
    at org.apache.flume.api.NettyAvroRpcClient$2.call(NettyAvroRpcClient.java:344)
    at java.util.concurrent.FutureTask.run(FutureTask.java:262)
    at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145)
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615)
    ... 1 more
    Caused by: java.net.ConnectException: Connection refused
    at sun.nio.ch.SocketChannelImpl.checkConnect(Native Method)
    at sun.nio.ch.SocketChannelImpl.finishConnect(SocketChannelImpl.java:739)
    at org.jboss.netty.channel.socket.nio.NioClientSocketPipelineSink$Boss.connect(NioClientSocketPipelineSink.java:496)
    at org.jboss.netty.channel.socket.nio.NioClientSocketPipelineSink$Boss.processSelectedKeys(NioClientSocketPipelineSink.java:452)
    at org.jboss.netty.channel.socket.nio.NioClientSocketPipelineSink$Boss.run(NioClientSocketPipelineSink.java:365)
    ... 3 more
    2016-05-24 02:16:35,651 (SinkRunner-PollingRunner-FailoverSinkProcessor) [INFO - org.apache.flume.sink.AbstractRpcSink.createConnection(AbstractRpcSink.java:206)] Rpc sink k3: Building RpcClient with hostname: xxo09, port: 44445
    2016-05-24 02:16:35,651 (SinkRunner-PollingRunner-FailoverSinkProcessor) [INFO - org.apache.flume.sink.AvroSink.initializeRpcClient(AvroSink.java:126)] Attempting to create Avro Rpc client.
    2016-05-24 02:16:35,652 (SinkRunner-PollingRunner-FailoverSinkProcessor) [WARN - org.apache.flume.api.NettyAvroRpcClient.configure(NettyAvroRpcClient.java:634)] Using default maxIOWorkers
    ......
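  • For reference, the forced kill in step 2 can be done roughly like this on xxo09 (a sketch; the grep pattern depends on how the agent was started):

    # find the Flume agent serving port 44445 and kill it
    ps -ef | grep flume
    kill -9 <pid-of-the-44445-agent>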
