Flume -- Load Balancing and Failover
Flume Sink Processors: events from a source flow through a channel and into a sink. Sinks can be grouped: a sink group attaches several sinks to one logical entity, and the sink processor either load-balances events across all sinks in the group, or provides failover so that when one sink fails another one takes over.
- For the structure of sink processors and the full set of configuration options, see the Flume documentation on sink processors; a minimal sink-group declaration is sketched below.
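The sink group itself takes only a few properties, and processor.type selects the behaviour (a minimal sketch; g1, k1 and k2 are placeholder names):
a1.sinkgroups = g1
a1.sinkgroups.g1.sinks = k1 k2
# one of: default | load_balance | failover
a1.sinkgroups.g1.processor.type = load_balance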
Preparation
- Machine setup: I have three machines, xxo08, xxo09 and xxo10.
- xxo08: one agent; an exec source reads /opt/data/access.log and feeds four avro sinks.
- xxo09: two agents, on ports 44444 and 44445, each with an avro source receiving the data sent by xxo08's avro sinks.
- xxo10: two agents, on ports 44444 and 44445, each with an avro source receiving the data sent by xxo08's avro sinks (a sketch of such a receiving agent is given below).
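The configuration of the receiving agents on xxo09 and xxo10 is not reproduced in this post. Judging from the log output further down (LoggerSink events), each one is roughly an avro source feeding a logger sink; a minimal sketch for the agent listening on port 44444 (the names a2, r1, k1, c1 are placeholders) could be:
# sketch of a receiving agent; repeat with port 44445 for the second agent on each machine
a2.sources = r1
a2.sinks = k1
a2.channels = c1
# avro source that accepts the events sent by xxo08's avro sinks
a2.sources.r1.type = avro
a2.sources.r1.bind = 0.0.0.0
a2.sources.r1.port = 44444
# log every event, as seen in the output below
a2.sinks.k1.type = logger
a2.channels.c1.type = memory
a2.channels.c1.capacity = 1000
a2.channels.c1.transactionCapacity = 100
a2.sources.r1.channels = c1
a2.sinks.k1.channel = c1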
default
The default sink processor accepts only a single sink, so no sink group needs to be created.
# Default Sink Processor (default)
# Name the components on this agent
a1.sources = r1
a1.sinks = k1 k2 k3 k4
a1.channels = c1
# exec source
a1.sources.r1.type = exec
a1.sources.r1.command = tail -F /opt/data/access.log
# 4 avro sink
a1.sinks.k1.type = avro
a1.sinks.k1.hostname=xxo09
a1.sinks.k1.port=44444
a1.sinks.k2.type = avro
a1.sinks.k2.hostname=xxo10
a1.sinks.k2.port=44444
a1.sinks.k3.type = avro
a1.sinks.k3.hostname=xxo09
a1.sinks.k3.port=44445
a1.sinks.k4.type = avro
a1.sinks.k4.hostname=xxo10
a1.sinks.k4.port=44445
# Use a channel which buffers events in memory
a1.channels.c1.type = memory
#### maximum number of events stored in the channel ####
a1.channels.c1.capacity = 1000
#### maximum number of events per transaction taken from a source or given to a sink ####
a1.channels.c1.transactionCapacity = 100
# Bind the source and sink to the channel
a1.sources.r1.channels = c1
a1.sinks.k1.channel = c1
a1.sinks.k2.channel = c1
a1.sinks.k3.channel = c1
a1.sinks.k4.channel = c1
Next I write 450 lines into the access.log file in quick succession:
[root@xxo08 flume]# echo "3" >> /opt/data/access.log
[root@xxo08 flume]# echo "3" >> /opt/data/access.log
[root@xxo08 flume]# echo "3" >> /opt/data/access.log
[root@xxo08 flume]# echo "3" >> /opt/data/access.log
[root@xxo08 flume]# echo "3" >> /opt/data/access.log
[root@xxo08 flume]# echo "3" >> /opt/data/access.log
[root@xxo08 flume]# echo "3" >> /opt/data/access.log
[root@xxo08 flume]# echo "3" >> /opt/data/access.log
[root@xxo08 flume]# echo "3" >> /opt/data/access.log
[root@xxo08 flume]# echo "3" >> /opt/data/access.log
[root@xxo08 flume]# echo "3" >> /opt/data/access.log
......
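The same 450 lines can also be produced in one go with a simple loop instead of repeating echo by hand, for example:
[root@xxo08 flume]# for i in $(seq 1 450); do echo "3" >> /opt/data/access.log; done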
#### All of the data ended up at xxo09 on port 44445
2016-05-23 23:26:36,286 (New I/O worker #1) [INFO - org.apache.avro.ipc.NettyServer$NettyServerAvroHandler.handleUpstream(NettyServer.java:171)] [id: 0x39baaf57, /192.168.33.73:60764 => /192.168.33.74:44445] CONNECTED: /192.168.33.73:60764
2016-05-23 23:26:42,330 (SinkRunner-PollingRunner-DefaultSinkProcessor) [INFO - org.apache.flume.sink.LoggerSink.process(LoggerSink.java:94)] Event: { headers:{} body: 33 3 }
2016-05-23 23:26:42,330 (SinkRunner-PollingRunner-DefaultSinkProcessor) [INFO - org.apache.flume.sink.LoggerSink.process(LoggerSink.java:94)] Event: { headers:{} body: 33 3 }
2016-05-23 23:26:42,330 (SinkRunner-PollingRunner-DefaultSinkProcessor) [INFO - org.apache.flume.sink.LoggerSink.process(LoggerSink.java:94)] Event: { headers:{} body: 33 3 }
2016-05-23 23:26:42,331 (SinkRunner-PollingRunner-DefaultSinkProcessor) [INFO - org.apache.flume.sink.LoggerSink.process(LoggerSink.java:94)] Event: { headers:{} body: 33 3 }
......
Note: when the data is written slowly, which sink the events end up at is not fixed.
Load Balancing
- Load balancing: within the sink group a sink is selected according to a load-balancing algorithm (round_robin or random); by pointing the sinks at agents on different machines, the load can be spread across them.
random
The configuration is as follows:
# Load Balancing Sink Processor (random)
# Name the components on this agent
a1.sources = r1
a1.sinks = k1 k2 k3 k4
a1.channels = c1
# exec source
a1.sources.r1.type = exec
a1.sources.r1.command = tail -F /opt/data/access.log
# 4 avro sink
a1.sinks.k1.type = avro
a1.sinks.k1.hostname=xxo09
a1.sinks.k1.port=44444
a1.sinks.k2.type = avro
a1.sinks.k2.hostname=xxo10
a1.sinks.k2.port=44444
a1.sinks.k3.type = avro
a1.sinks.k3.hostname=xxo09
a1.sinks.k3.port=44445
a1.sinks.k4.type = avro
a1.sinks.k4.hostname=xxo10
a1.sinks.k4.port=44445
#define sinkgroups random
a1.sinkgroups = g1
a1.sinkgroups.g1.sinks = k1 k2 k3 k4
a1.sinkgroups.g1.processor.type = load_balance
a1.sinkgroups.g1.processor.backoff = true
a1.sinkgroups.g1.processor.selector = random
# Use a channel which buffers events in memory
a1.channels.c1.type = memory
a1.channels.c1.capacity = 1000
a1.channels.c1.transactionCapacity = 100
# Bind the source and sink to the channel
a1.sources.r1.channels = c1
a1.sinks.k1.channel = c1
a1.sinks.k2.channel = c1
a1.sinks.k3.channel = c1
a1.sinks.k4.channel = c1
The relevant Flume source (RandomOrderSelector) is:
public synchronized Iterator<T> createIterator() {
  List<Integer> indexList = getIndexList();
  int size = indexList.size();
  int[] indexOrder = new int[size];
  while (indexList.size() != 1) {
    int pick = random.nextInt(indexList.size());
    indexOrder[indexList.size() - 1] = indexList.remove(pick);
  }
  indexOrder[0] = indexList.get(0);
  return new SpecificOrderIterator<T>(indexOrder, getObjects());
}
Test data: 450 records
- I wrote 450 lines into /opt/data/access.log in quick succession.
- The memory channel is configured (as above) so that at most 100 events are delivered to a sink per transaction.
The test results are as follows:
xxo09, port 44444: 41 events
xxo09, port 44445: 100 events
xxo10, port 44444: 206 events
xxo10, port 44445: 10 events
On xxo10 port 44444 there is essentially no time gap between records 100 and 101, but there is a gap between records 200 and 201.
round_robin
Round robin simply pushes events to the sinks one after another in turn; only the selector differs from the random configuration, as sketched below, and the source code (RoundRobinOrderSelector) follows.
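The changed lines compared with the random configuration above would simply be (a sketch; the rest of the agent configuration stays the same):
a1.sinkgroups.g1.processor.type = load_balance
a1.sinkgroups.g1.processor.backoff = true
a1.sinkgroups.g1.processor.selector = round_robin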
public Iterator<T> createIterator() {
  List<Integer> activeIndices = getIndexList();
  int size = activeIndices.size();
  // possible that the size has shrunk so gotta adjust nextHead for that
  if (nextHead >= size) {
    nextHead = 0;
  }
  // nextHead is advanced by one on every call
  int begin = nextHead++;
  if (nextHead == activeIndices.size()) {
    nextHead = 0;
  }
  int[] indexOrder = new int[size];
  // the key part: starting at begin, walk around the ring of active sinks
  for (int i = 0; i < size; i++) {
    indexOrder[i] = activeIndices.get((begin + i) % size);
  }
  return new SpecificOrderIterator<T>(indexOrder, getObjects());
}
Testing round robin:
- The memory channel is again configured so that at most 100 events are delivered to a sink per transaction.
- I wrote 450 lines into /opt/data/access.log in quick succession.
- Result: three of the sinks received exactly 100 events each, and the remaining sink received 150; the log timestamps of those 150 records are as follows:
Failover
# Failover Sink Processor
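A failover configuration consistent with the test results below might look like the following sketch: k3 = xxo09:44445 gets the highest priority (10) as described, with xxo10:44445 as the failover target; the other priority values and maxpenalty are assumptions.
# Name the components on this agent
a1.sources = r1
a1.sinks = k1 k2 k3 k4
a1.channels = c1
# exec source
a1.sources.r1.type = exec
a1.sources.r1.command = tail -F /opt/data/access.log
# 4 avro sinks, same as in the earlier examples
a1.sinks.k1.type = avro
a1.sinks.k1.hostname=xxo09
a1.sinks.k1.port=44444
a1.sinks.k2.type = avro
a1.sinks.k2.hostname=xxo10
a1.sinks.k2.port=44444
a1.sinks.k3.type = avro
a1.sinks.k3.hostname=xxo09
a1.sinks.k3.port=44445
a1.sinks.k4.type = avro
a1.sinks.k4.hostname=xxo10
a1.sinks.k4.port=44445
# failover sink group: the highest-priority live sink wins, so k3 (xxo09:44445) goes first
# and k4 (xxo10:44445) takes over when k3 fails; k1/k2 priorities and maxpenalty are assumed
a1.sinkgroups = g1
a1.sinkgroups.g1.sinks = k1 k2 k3 k4
a1.sinkgroups.g1.processor.type = failover
a1.sinkgroups.g1.processor.priority.k1 = 1
a1.sinkgroups.g1.processor.priority.k2 = 2
a1.sinkgroups.g1.processor.priority.k3 = 10
a1.sinkgroups.g1.processor.priority.k4 = 5
a1.sinkgroups.g1.processor.maxpenalty = 10000
# Use a channel which buffers events in memory
a1.channels.c1.type = memory
a1.channels.c1.capacity = 1000
a1.channels.c1.transactionCapacity = 100
# Bind the source and sinks to the channel
a1.sources.r1.channels = c1
a1.sinks.k1.channel = c1
a1.sinks.k2.channel = c1
a1.sinks.k3.channel = c1
a1.sinks.k4.channel = c1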
- Test results:
- With the configuration above, xxo09 port 44445 has the highest priority (10); all 450 test records went there.
- If that agent is forcibly killed partway through, the remaining events move to the agent at xxo10 port 44445 (failover succeeds)!
- When the agent is killed, the following warnings are printed:
[root@xxo08 flume]# 2016-05-24 02:16:31,492 (SinkRunner-PollingRunner-FailoverSinkProcessor) [WARN - org.apache.flume.sink.FailoverSinkProcessor.process(FailoverSinkProcessor.java:185)] Sink k3 failed and has been sent to failover list
org.apache.flume.EventDeliveryException: Failed to send events
at org.apache.flume.sink.AbstractRpcSink.process(AbstractRpcSink.java:392)
at org.apache.flume.sink.FailoverSinkProcessor.process(FailoverSinkProcessor.java:182)
at org.apache.flume.SinkRunner$PollingRunner.run(SinkRunner.java:147)
at java.lang.Thread.run(Thread.java:745)
Caused by: org.apache.flume.EventDeliveryException: NettyAvroRpcClient { host: xxo09, port: 44445 }: Failed to send batch
at org.apache.flume.api.NettyAvroRpcClient.appendBatch(NettyAvroRpcClient.java:315)
at org.apache.flume.sink.AbstractRpcSink.process(AbstractRpcSink.java:376)
... 3 more
Caused by: org.apache.flume.EventDeliveryException: NettyAvroRpcClient { host: xxo09, port: 44445 }: RPC request exception
at org.apache.flume.api.NettyAvroRpcClient.appendBatch(NettyAvroRpcClient.java:365)
at org.apache.flume.api.NettyAvroRpcClient.appendBatch(NettyAvroRpcClient.java:303)
... 4 more
Caused by: java.util.concurrent.ExecutionException: java.io.IOException: Error connecting to xxo09/192.168.33.74:44445
at java.util.concurrent.FutureTask.report(FutureTask.java:122)
at java.util.concurrent.FutureTask.get(FutureTask.java:202)
at org.apache.flume.api.NettyAvroRpcClient.appendBatch(NettyAvroRpcClient.java:357)
... 5 more
Caused by: java.io.IOException: Error connecting to xxo09/192.168.33.74:44445
at org.apache.avro.ipc.NettyTransceiver.getChannel(NettyTransceiver.java:261)
at org.apache.avro.ipc.NettyTransceiver.getRemoteName(NettyTransceiver.java:386)
at org.apache.avro.ipc.Requestor.writeHandshake(Requestor.java:202)
at org.apache.avro.ipc.Requestor.access$300(Requestor.java:52)
at org.apache.avro.ipc.Requestor$Request.getBytes(Requestor.java:478)
at org.apache.avro.ipc.Requestor.request(Requestor.java:147)
at org.apache.avro.ipc.Requestor.request(Requestor.java:129)
at org.apache.avro.ipc.specific.SpecificRequestor.invoke(SpecificRequestor.java:84)
at com.sun.proxy.$Proxy4.appendBatch(Unknown Source)
at org.apache.flume.api.NettyAvroRpcClient$2.call(NettyAvroRpcClient.java:348)
at org.apache.flume.api.NettyAvroRpcClient$2.call(NettyAvroRpcClient.java:344)
at java.util.concurrent.FutureTask.run(FutureTask.java:262)
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145)
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615)
... 1 more
Caused by: java.net.ConnectException: Connection refused
at sun.nio.ch.SocketChannelImpl.checkConnect(Native Method)
at sun.nio.ch.SocketChannelImpl.finishConnect(SocketChannelImpl.java:739)
at org.jboss.netty.channel.socket.nio.NioClientSocketPipelineSink$Boss.connect(NioClientSocketPipelineSink.java:496)
at org.jboss.netty.channel.socket.nio.NioClientSocketPipelineSink$Boss.processSelectedKeys(NioClientSocketPipelineSink.java:452)
at org.jboss.netty.channel.socket.nio.NioClientSocketPipelineSink$Boss.run(NioClientSocketPipelineSink.java:365)
... 3 more
2016-05-24 02:16:35,651 (SinkRunner-PollingRunner-FailoverSinkProcessor) [INFO - org.apache.flume.sink.AbstractRpcSink.createConnection(AbstractRpcSink.java:206)] Rpc sink k3: Building RpcClient with hostname: xxo09, port: 44445
2016-05-24 02:16:35,651 (SinkRunner-PollingRunner-FailoverSinkProcessor) [INFO - org.apache.flume.sink.AvroSink.initializeRpcClient(AvroSink.java:126)] Attempting to create Avro Rpc client.
2016-05-24 02:16:35,652 (SinkRunner-PollingRunner-FailoverSinkProcessor) [WARN - org.apache.flume.api.NettyAvroRpcClient.configure(NettyAvroRpcClient.java:634)] Using default maxIOWorkers
......