Storm--故障容忍和消息可靠性

  如果Storm集群中某个Worker挂了会怎样?Nimbus和Supervisor挂掉了又会怎样?Storm流式处理数据又是怎样保证每条数据都能完全被处理的呢?这将是本博客讨论的重点,Storm的故障容忍,以及Storm的消息可靠性Acker机制

故障容忍

概念

  • Worker进程不会因Nimbus或者Supervisor的挂掉而受到影响

    1. worker进程死掉: Supervisor会重启它。如果这个Worker连续在启动时失败,并且无法让Nimbus观察到它的心跳,Nimbus将这个Worker重新分配到另一台机器上。
    2. supervisor进程死掉: 这样不会影响之前已经提交的topology的运行,只是后期不会再向这个节点分配任务了。
    3. nimbus进程死掉: 这样不会影响之前已经提交的topology的运行,只是后期不能向集群中提交topology了。
  • Nimbus和Supervisor daemon进程,设计成快速失败(无论何时当遇到任何异常情况,将会执行自毁)和无状态(所有的状态都保存在Zookeeper或者磁盘上)。

  • Nimbus和Supervisor daemon进程,必须在监控下运行,如使用daemontools或者monit工具。
  • Nimbus是会有单点故障的问题,但Nimbus进程挂掉也不会引起任何灾难发生。

示例

  • 我这里有一个一主两从的节点,从节点主机名为xxo09、xxo10。然后向集群中提交了一个求和的topology,如下图所示:
    首先,向Storm集群提交了一个求和topology
    查看Storm UI,发现spout和bolt都在xxo10上
  • kill worker进程
    kill 掉xxo10上worker进程,自动在xxo09上启动了
    Worker总topology重新分配到xxo09机器上

  • kill supervisor进程
    kill 掉supervisor进程
    worker继续在工作

消息可靠性

上面我们讨论了,storm的故障容错,下面我们继续来看一看strom是如何保证消息的可靠性的。

可靠性

  1. 在Topology中,Spout通过SpoutOutputCollector的emit()方法发射一个tuple(源)即消息。而后经过在该Topology中定义的多个Bolt处理时,可能会产生一个或多个新的Tuple。源Tuple和新产生的Tuple便构成了一个Tuple树
  2. 当整棵Tuple树被处理完成,才算一个Tuple被完全处理,其中任何一个节点的Tuple处理失败或超时,则整棵Tuple树处理失败。
  3. Storm的消息可靠性 : 指的是storm保证每个tuple都能被toplology完全处理。而且处理的结果要么成功要么失败。出现失败的原因可能有两种即节点处理失败或者处理超时。
    1
    this.spoutOutputCollector.emit( new Values(i) );

可靠性设置

  • 如何开启:在spout中发射数据的时候,带上messageid,messageid和tuple中的元素的关系需要程序员自己维护

    1
    this.collector.emit(new Values("num"),messageid)
  • 如果对消息进去确认:这个工作其实是由acker线程进行追踪tuple的状态,

    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    //当在bolt中对数据处理成功的时候,需要调用
    this.collector.ack(tuple) //这个时候,spout类中的ack方法会被调用。(这个方法是一个回调方法)

    //如果在bolt中处理tuple的时候,最后没有调用ack方法(前提是:开起来acker消息确认机制)
    //这个时候,当达到tuple的处理超时时间的时候,这个方法会被默认处理失败。这样spout类中的fail方法就会被调用。
    config.put(Config.TOPOLOGY_MESSAGE_TIMEOUT_SECS, 10); //设置超时时间


    //针对多个bolt的topology,需要把bolt发射出来的tuple作为上一级tuple的一个衍生tuple,
    //相当于把这些tuple组成一个tuple树,
    //此时,只有所有衍生的tuple都调用ack方法的时候,spout中的ack方法才会被调用。保证tuple被完全处理
    this.collector.emit(input,new Values(value));

acker示例

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
package com.xxo.acker;

import backtype.storm.*;
import backtype.storm.generated.*;
import backtype.storm.spout.*;
import backtype.storm.task.*;
import backtype.storm.topology.*;
import backtype.storm.topology.base.*;
import backtype.storm.tuple.*;
import backtype.storm.utils.Utils;
import org.slf4j.*;

import java.util.Map;

/**
* 求和
* Created by xiaoxiaomo on 2016/6/6.
*/
public class AckerSumTopology {
private static final Logger logger = LoggerFactory.getLogger( AckerSumTopology.class ) ;
/**
* 自定义Spout
*/
public static class SumSpout extends BaseRichSpout {
private SpoutOutputCollector collector;
int i = 0 ;
/**
* 该方法只会被调用一次
* 做一些初始化的操作
* @param map
* @param context
* @param collector
*/
public void open(Map map, TopologyContext context, SpoutOutputCollector collector) {
this.collector = collector;
i = 0 ;
}

/**
* 会不停的执行,像一个死循环
*/
public void nextTuple() {
i++;

//messageid和tuple是一一对应的
//可以认为messageid 是tuple里面的数据主键id
//messageid和tuple关系需要程序员自己去维护
//注意:只有在发spout中发射tuple的时候带上messageid,才说明开启了消息确认机制
this.collector.emit( new Values(i) , i );
Utils.sleep(1000);
}

/**
*
* @param declarer
*/
public void declareOutputFields(OutputFieldsDeclarer declarer) {
declarer.declare( new Fields("num"));
}

@Override
public void ack(Object msgId) {
System.out.println( "消息msgId:"+msgId + ",处理成功!" );
}

@Override
public void fail(Object msgId) {
System.out.println("消息msgId:" + msgId + ",处理失败!");
}
}

/**
* 自定义Bolt
*/
public static class SumBolt extends BaseRichBolt{
private OutputCollector collector;
/**
* 做初始化操作
* @param map
* @param context
* @param collector
*/
public void prepare(Map map, TopologyContext context, OutputCollector collector) {
this.collector = collector ;
}

/**
* 循环执行,当对应的spout有输出时,该方法就会被调用一次
* @param tuple
*/
public void execute(Tuple tuple) {

try {
System.out.println("求和:" + tuple.getIntegerByField("num"));
// if( tuple.getIntegerByField("num") > 10 ){ //test 手动抛出异常
// throw new RuntimeException("num > 10!") ;
// }
// outputCollector.ack( tuple );
} catch ( Exception e ){
collector.fail( tuple);
logger.error(" execute tuple error! " , e );
}

}
public void declareOutputFields(OutputFieldsDeclarer declarer) {}
}

public static void main(String[] args) {

TopologyBuilder builder = new TopologyBuilder();
builder.setSpout("spout_1", new SumSpout());
//通过shuffleGrouping,可以指定bolt接收哪个组件发射出来的数据
builder.setBolt("bolt_1",new SumBolt()).setNumTasks(5).shuffleGrouping("spout_1");

LocalCluster cluster = new LocalCluster();
Config conf = new Config();

//使用.setNumTasks(5) 预留task,方便后期进行rebalance

//设置超时
////在指定的时间内,如果bolt没有确认tuple处理成功,那么系统会默认这个tuple处理失败
conf.put( Config.TOPOLOGY_MESSAGE_TIMEOUT_SECS , 10 ) ;

//集群
if( args != null && args.length > 0 ){
try {
StormSubmitter.submitTopology(
AckerSumTopology.class.getSimpleName() , conf , builder.createTopology() );
} catch (Exception e) {
e.printStackTrace();
}

}

//本地
else {
cluster.submitTopology( "cluster_1", conf ,builder.createTopology());
}
}
}

当前网速较慢或者你使用的浏览器不支持博客特定功能,请尝试刷新或换用Chrome、Firefox等现代浏览器