Storm--DRPC
DRPC (Distributed RPC)分布式远程过程调用,Storm中的DRPC提供了集群中处理功能的访问接口。相当于集群向外暴露一个功能接口,用户可以在任何地方进行调用。DRPC的真正目的就是使用storm的实时并行计算功能。以一个输入流作为函数参数,以一个输出流的形式发射每个函数调用的结果。
DRPC介绍
- Storm是一个分布式实时处理框架,它支持以DRPC方式调用.可以理解为Storm是一个集群,DRPC提供了集群中处理功能的访问接口;
- DPRC是Storm提供的一套开发组建,使用DRPC可以极大的简化这一过程;
- Storm里面引入DRPC主要是利用storm的实时计算能力来并行化CPU intensive的计算。DRPC的storm topology以函数的参数流作为输入,而把这些函数调用的返回值作为topology的输出流;
- DRPC其实不能算是storm本身的一个特性,它是通过组合storm的原语spout,bolt, topology而成的一种模式(pattern);
- 本来应该把DRPC单独打成一个包的, 但是DRPC实在是太有用了,所以我们我们把它和storm捆绑在一起;
- 可以通过在Topoloye中的spout中建立一个TCP/HTTP监听来接收数据,在最后一个Bolt中将数据发送到指定位置也是可以的。
DRPC工作流程
- 客户端给DRPC服务器发送要执行的方法的名字,以及这个方法的参数。
- 实现了这个函数的topology使用DRPCSpout从DRPC服务器接收函数调用流。每个函数调用被DRPC服务器标记了一个唯一的id。
- 这个topology然后计算结果,在topology的最后一个叫做ReturnResults的bolt会连接到DRPC服务器,并且把这个调用的结果发送给DRPC服务器(通过那个唯一的id标识)。
- DRPC服务器用那个唯一id来跟等待的客户端匹配上,唤醒这个客户端并且把结果发送给它。
DRPC使用
- 一、启动Storm中的DRPC Server
- 修改Storm/conf/storm.yaml中的drpc server地址(所有节点都需要修改);
- storm drpc命令启动drpc server。
- 二、创建一个DRPC 的Topology,提交到storm中运行,有两种方式创建:
直接使用 Storm 提供的LinearDRPCTopologyBuilder。 (不过该方法在0.82版本中显示为已过期,被trident取代)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66package com.xxo.drpc;
import backtype.storm.Config;
import backtype.storm.ILocalDRPC;
import backtype.storm.LocalCluster;
import backtype.storm.LocalDRPC;
import backtype.storm.drpc.LinearDRPCTopologyBuilder;
import backtype.storm.task.OutputCollector;
import backtype.storm.task.TopologyContext;
import backtype.storm.topology.OutputFieldsDeclarer;
import backtype.storm.topology.base.BaseRichBolt;
import backtype.storm.tuple.Fields;
import backtype.storm.tuple.Tuple;
import backtype.storm.tuple.Values;
import java.util.Map;
/**
* DRPC
* Created by xiaoxiaomo on 2016/6/10.
*/
public class LocalDrpcTopology {
public static class MyBolt extends BaseRichBolt{
private OutputCollector collector;
@Override
public void prepare(Map stormConf, TopologyContext context,
OutputCollector collector) {
this.collector = collector;
}
/**
* 这个tuple中封装两个元素
* 第一个元素:请求的id
* 第二个元素:请求的参数
*/
@Override
public void execute(Tuple input) {
String value = input.getString(1);
value = "hello "+value;
this.collector.emit(new Values(input.getValue(0),value));
}
@Override
public void declareOutputFields(OutputFieldsDeclarer declarer) {
declarer.declare(new Fields("id","value"));
}
}
/**
* 这里使用本地调用
*/
public static void main(String[] args) throws Exception {
@SuppressWarnings("deprecation")
//创建一个线性DRPC拓扑构建器
LinearDRPCTopologyBuilder builder = new LinearDRPCTopologyBuilder("hello");
builder.addBolt(new MyBolt());
LocalCluster localCluster = new LocalCluster();
String simpleName = local.class.getSimpleName();
ILocalDRPC drpc = new LocalDRPC();
local.submitTopology(simpleName, new Config(), builder.createLocalTopology(drpc));
String result = drpc.execute("hello", "storm");
System.out.println("客户端调用的结果:" + result);
}
}直接使用 Storm 提供的通用TopologyBuilder。 不过需要自己手动加上开始的DRPCSpout和结束的ReturnResults。(LinearDRPCTopologyBuilder也是这样封装的)。
1
2
3
4
5
6
7
8
9
10
11
12TopologyBuilder builder = new TopologyBuilder();
//开始的Spout
DRPCSpout drpcSpout = new DRPCSpout("DrpcBuildTopology");
builder.setSpout("drpc-input", drpcSpout, 5);
//真正处理的Bolt
builder.setBolt("cpp", new MyBolt(), 5).noneGrouping("drpc-input");
//结束的ReturnResults
builder.setBolt("return", new ReturnResults(),5).noneGrouping("cpp");
StormSubmitter.submitTopology("DrpcBuildTopology", new Config(), builder.createTopology());
三、通过DRPCClient对Cluster进行访问
1
2
3DRPCClient client = new DRPCClient("192.168.1.171", 3772);
String result = drpcClient.execute("hello", "world");
System.out.println("服务端返回的结果:"+result);参考资料
http://storm.apache.org/releases/0.9.6/Distributed-RPC.html
http://www.dataguru.cn/article-5572-1.html