Storm--并行度

  storm的并行度,其实就是让storm中的组件使用多线程来运行,正常情况下,每一个组件都是一个线程来运行的。

Storm的并行度

  storm中的组件在运行的时候都会生成一些task(实例,还可以理解为new Spout或者new bolt)。这个task先要运行的话,需要在线程(executor)中运行,线程需要存在于进程(worker)内部。

简介

关系简介

  1. Worker进程存在于每个工作节点Supervisor中,一个Worker进程中可以含有一个或者多个Executor线程,每个Executor线程都会启动一个消息循环线程,用于接收、处理和发送消息,当Executor收到其下某一task的消息后,就会调用该Task对应的处理逻辑对消息进行处理;
  2. 1个topology可以有多个worker进程,1个worker进程只为1个topology服务。即1个worker进程执行的是1个topology的子集
  3. 一个线程Executor,运行时只会运行一个task,如果有多个,循环执行,即其他的task出去等待状态;
  4. task,最终运行spout或bolt中代码的执行单元,即task可能是spout组件也有可能是bolt组件
  5. 默认情况下: 1个supervisor节点最多可以启动4个worker进程,每1个topology默认占用1个worker进程每个spout或者bolt会占用1个executor每个executor启动1个task

Storm Supervisor

  • 注意 : 在同一个线程中,如果有多个task,这些task一定是相同组件实例;

  

官网实例

Storm 官网蓝绿黄例子
上图拓扑中有2个 worker 进程。

  1. 蓝色的 BlueSpout 有2个 executor线程,每个 executor 有1个 task,并行度为 2;
  2. 绿色的 GreenBolt 有2个 executor线程,每个 executor 有2个 task,并行度为 2;
  3. 黄色的 YellowBolt 有6个 executor线程,每个 executor 有1个 task,并行度为 6;
  4. 拓扑的总并行度就是 2 + 2 + 6 = 10。具体分配到每个 worker 就有 10 / 2 = 5 个 executor。

提高并行度

  • 并行度是基于线程数量来确定的,线程数被平均分配到Worker进程中。
  • 提高storm并行度的方法
  1. 最直接的就是提高某一个组件的executor线程数;
  2. 从worker层面和task层面:
    2.1. work:可以把很多线程分配到多个worker进程中;
    2.2. task:提高task任务数量(并不能提高并行度),可以为后期进行弹性计算(rebalance)即后期动态调整某一组件的并行度因为当topology提交到集群之后,task任务数目就不能改变了。

并行度设置

  • 配置worker进程数量
  1. 默认一个从节点上可以启动 4 个worker进程(defaults.yaml)。自己可以在storm配置文件中配置,参数supervisor.slots.ports;

    1
    2
    3
    4
    5
    supervisor.slots.ports: ##指定storm通讯端口,注意超线程
    - 6701
    - 6702
    - 6703
    - 6704
  2. 默认一个topology只使用一个worker进程,可以通过代码config.setNumWorkers(workers)来设置使用多个worker进程;
    2.1. 最好一台机器上的一个topology只使用一个worker,主要原因是减少了worker进程之间的数据传输(Netty);
    2.2. 如果worker使用完的话再提交topology就不会执行,会处于等待状态;

  3. 增加worker 是增加topology 计算能力的简单方法,spout 和bolt 组件都不需要做变更。

  • 配置executor线程数量
    默认情况下一个executor运行一个task,可以通过在代码中设置每个组件需要的执行线程数

    1
    2
    builder.setSpout(id, spout, parallelism_hint); ##parallelism_hint设置spout的线程数量
    builder.setBolt(id, bolt, parallelism_hint); ##parallelism_hint设置bolt的线程数量
  • 配置task数量

  1. task是通过 spout/bolt的声明中setNumTasks(num)设置对应spout/bolt的task个数,默认每个 executor 的 task 数为 1;
  2. executor的数量会小于等于task的数量(为了rebalance)。

实例

  • 下面就一上图【Storm 官网蓝绿黄例子】做一个并行度设置,如下图所示:
    Storm 官网蓝绿黄并行度设置

弹性计算

  • 从上面得知:
  1. topology提交到集群之后,task任务数目就不能改变了(线程数可以变);
  2. 因此弹性计算就需要提前给一个组件的线程创建多个task;
  • 动态调整:
  1. topology提交到Storm集群中运行后,通过storm rebalance 命令可对topology进行动态调整。比如增加Topology的worker数,修改Bolt,Spout的并行执行数量 parallelism等,从而实现topology的动态调整,达到弹性计算的目的。
  2. 命令:
    2.1. storm rebalance topology-name -n new-work-num ////调整指定topology的worknum;
    2.2. storm rebalance topology-name -e component=parallelism ////调整指定topology中指定component的并行数量。

并行度设置多少

  • 并行度设置多少合适?通过查阅资料大概总结如下:
  1. 单spout每秒大概可以发送500个tuple
  2. 单bolt每秒大概可以接收2000个tuple
  3. 单acker每秒大概可以接收6000个tuple
  4. 根据上面的指标可以根据当前业务的数据量对并行度进行动态调整。

当前网速较慢或者你使用的浏览器不支持博客特定功能,请尝试刷新或换用Chrome、Firefox等现代浏览器