Storm--DRPC

  DRPCDistributed RPC)分布式远程过程调用,Storm中的DRPC提供了集群中处理功能的访问接口。相当于集群向外暴露一个功能接口,用户可以在任何地方进行调用。DRPC的真正目的就是使用storm的实时并行计算功能。以一个输入流作为函数参数,以一个输出流的形式发射每个函数调用的结果。

DRPC介绍

Storm DRPC

  1. Storm是一个分布式实时处理框架,它支持以DRPC方式调用.可以理解为Storm是一个集群,DRPC提供了集群中处理功能的访问接口;
  2. DPRC是Storm提供的一套开发组建,使用DRPC可以极大的简化这一过程;
  3. Storm里面引入DRPC主要是利用storm的实时计算能力来并行化CPU intensive的计算。DRPC的storm topology以函数的参数流作为输入,而把这些函数调用的返回值作为topology的输出流;
  4. DRPC其实不能算是storm本身的一个特性,它是通过组合storm的原语spout,bolt, topology而成的一种模式(pattern);
  5. 本来应该把DRPC单独打成一个包的, 但是DRPC实在是太有用了,所以我们我们把它和storm捆绑在一起;
  6. 可以通过在Topoloye中的spout中建立一个TCP/HTTP监听来接收数据,在最后一个Bolt中将数据发送到指定位置也是可以的。

DRPC工作流程

DRPC工作流程

  1. 客户端给DRPC服务器发送要执行的方法的名字,以及这个方法的参数。
  2. 实现了这个函数的topology使用DRPCSpout从DRPC服务器接收函数调用流。每个函数调用被DRPC服务器标记了一个唯一的id。
  3. 这个topology然后计算结果,在topology的最后一个叫做ReturnResults的bolt会连接到DRPC服务器,并且把这个调用的结果发送给DRPC服务器(通过那个唯一的id标识)。
  4. DRPC服务器用那个唯一id来跟等待的客户端匹配上,唤醒这个客户端并且把结果发送给它。

DRPC使用

  • 一、启动Storm中的DRPC Server
  1. 修改Storm/conf/storm.yaml中的drpc server地址(所有节点都需要修改);
  2. storm drpc命令启动drpc server。
  • 二、创建一个DRPC 的Topology,提交到storm中运行,有两种方式创建:
  1. 直接使用 Storm 提供的LinearDRPCTopologyBuilder。 (不过该方法在0.82版本中显示为已过期,被trident取代)

    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    15
    16
    17
    18
    19
    20
    21
    22
    23
    24
    25
    26
    27
    28
    29
    30
    31
    32
    33
    34
    35
    36
    37
    38
    39
    40
    41
    42
    43
    44
    45
    46
    47
    48
    49
    50
    51
    52
    53
    54
    55
    56
    57
    58
    59
    60
    61
    62
    63
    64
    65
    66
    package com.xxo.drpc;

    import backtype.storm.Config;
    import backtype.storm.ILocalDRPC;
    import backtype.storm.LocalCluster;
    import backtype.storm.LocalDRPC;
    import backtype.storm.drpc.LinearDRPCTopologyBuilder;
    import backtype.storm.task.OutputCollector;
    import backtype.storm.task.TopologyContext;
    import backtype.storm.topology.OutputFieldsDeclarer;
    import backtype.storm.topology.base.BaseRichBolt;
    import backtype.storm.tuple.Fields;
    import backtype.storm.tuple.Tuple;
    import backtype.storm.tuple.Values;

    import java.util.Map;
    /**
    * DRPC
    * Created by xiaoxiaomo on 2016/6/10.
    */
    public class LocalDrpcTopology {
    public static class MyBolt extends BaseRichBolt{

    private OutputCollector collector;
    @Override
    public void prepare(Map stormConf, TopologyContext context,
    OutputCollector collector) {
    this.collector = collector;
    }

    /**
    * 这个tuple中封装两个元素
    * 第一个元素:请求的id
    * 第二个元素:请求的参数
    */
    @Override
    public void execute(Tuple input) {
    String value = input.getString(1);
    value = "hello "+value;
    this.collector.emit(new Values(input.getValue(0),value));
    }

    @Override
    public void declareOutputFields(OutputFieldsDeclarer declarer) {
    declarer.declare(new Fields("id","value"));
    }
    }

    /**
    * 这里使用本地调用
    */
    public static void main(String[] args) throws Exception {
    @SuppressWarnings("deprecation")
    //创建一个线性DRPC拓扑构建器
    LinearDRPCTopologyBuilder builder = new LinearDRPCTopologyBuilder("hello");
    builder.addBolt(new MyBolt());

    LocalCluster localCluster = new LocalCluster();
    String simpleName = local.class.getSimpleName();
    ILocalDRPC drpc = new LocalDRPC();
    local.submitTopology(simpleName, new Config(), builder.createLocalTopology(drpc));

    String result = drpc.execute("hello", "storm");
    System.out.println("客户端调用的结果:" + result);
    }
    }
  2. 直接使用 Storm 提供的通用TopologyBuilder。 不过需要自己手动加上开始的DRPCSpout和结束的ReturnResults。(LinearDRPCTopologyBuilder也是这样封装的)。

    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    TopologyBuilder builder = new TopologyBuilder();
    //开始的Spout
    DRPCSpout drpcSpout = new DRPCSpout("DrpcBuildTopology");
    builder.setSpout("drpc-input", drpcSpout, 5);

    //真正处理的Bolt
    builder.setBolt("cpp", new MyBolt(), 5).noneGrouping("drpc-input");

    //结束的ReturnResults
    builder.setBolt("return", new ReturnResults(),5).noneGrouping("cpp");

    StormSubmitter.submitTopology("DrpcBuildTopology", new Config(), builder.createTopology());

当前网速较慢或者你使用的浏览器不支持博客特定功能,请尝试刷新或换用Chrome、Firefox等现代浏览器