Storm--并行度
storm的并行度,其实就是让storm中的组件使用多线程来运行,正常情况下,每一个组件都是一个线程来运行的。
storm中的组件在运行的时候都会生成一些task(实例,还可以理解为new Spout或者new bolt)。这个task先要运行的话,需要在线程(executor)中运行,线程需要存在于进程(worker)内部。
简介
关系简介
- Worker进程存在于每个工作节点Supervisor中,一个Worker进程中可以含有一个或者多个Executor线程,每个Executor线程都会启动一个消息循环线程,用于接收、处理和发送消息,当Executor收到其下某一task的消息后,就会调用该Task对应的处理逻辑对消息进行处理;
- 1个topology可以有多个worker进程,1个worker进程只为1个topology服务。即1个worker进程执行的是1个topology的子集;
- 一个线程Executor,运行时只会运行一个task,如果有多个,循环执行,即其他的task出去等待状态;
- task,最终运行spout或bolt中代码的执行单元,即task可能是spout组件也有可能是bolt组件;
- 默认情况下:
1个supervisor
节点最多可以启动4个worker进程
,每1个topology
默认占用1个worker进程
,每个spout或者bolt
会占用1个executor
,每个executor
启动1个task
。
- 注意 : 在同一个线程中,如果有多个task,这些task一定是相同组件实例;
官网实例
上图拓扑中有2个 worker 进程。
- 蓝色的 BlueSpout 有2个 executor线程,每个 executor 有1个 task,并行度为 2;
- 绿色的 GreenBolt 有2个 executor线程,每个 executor 有2个 task,并行度为 2;
- 黄色的 YellowBolt 有6个 executor线程,每个 executor 有1个 task,并行度为 6;
- 拓扑的总并行度就是 2 + 2 + 6 = 10。具体分配到每个 worker 就有 10 / 2 = 5 个 executor。
提高并行度
- 并行度是基于线程数量来确定的,线程数被平均分配到Worker进程中。
- 提高storm并行度的方法:
- 最直接的就是提高某一个组件的
executor
线程数; - 从worker层面和task层面:
2.1.work
:可以把很多线程分配到多个worker进程中;
2.2.task
:提高task任务数量(并不能提高并行度),可以为后期进行弹性计算(rebalance)即后期动态调整某一组件的并行度。因为当topology提交到集群之后,task任务数目就不能改变了。
并行度设置
- 配置worker进程数量
默认一个从节点上可以启动 4 个worker进程(defaults.yaml)。自己可以在storm配置文件中配置,参数supervisor.slots.ports;
1
2
3
4
5supervisor.slots.ports: ##指定storm通讯端口,注意超线程
- 6701
- 6702
- 6703
- 6704默认一个topology只使用一个worker进程,可以通过代码
config.setNumWorkers(workers)
来设置使用多个worker进程;
2.1. 最好一台机器上的一个topology只使用一个worker,主要原因是减少了worker进程之间的数据传输(Netty);
2.2. 如果worker使用完的话再提交topology就不会执行,会处于等待状态;增加worker 是增加topology 计算能力的简单方法,spout 和bolt 组件都不需要做变更。
配置executor线程数量
默认情况下一个executor运行一个task,可以通过在代码中设置每个组件需要的执行线程数1
2builder.setSpout(id, spout, parallelism_hint); ##parallelism_hint设置spout的线程数量
builder.setBolt(id, bolt, parallelism_hint); ##parallelism_hint设置bolt的线程数量配置task数量
- task是通过 spout/bolt的声明中setNumTasks(num)设置对应spout/bolt的task个数,默认每个 executor 的 task 数为 1;
- executor的数量会小于等于task的数量(为了rebalance)。
实例
- 下面就一上图【Storm 官网蓝绿黄例子】做一个并行度设置,如下图所示:
弹性计算
- 从上面得知:
- 当
topology
提交到集群之后,task任务
数目就不能改变了(线程数可以变); - 因此弹性计算就需要提前给一个组件的线程创建多个task;
- 动态调整:
- topology提交到Storm集群中运行后,通过storm rebalance 命令可对topology进行动态调整。比如增加Topology的worker数,修改Bolt,Spout的并行执行数量 parallelism等,从而实现topology的动态调整,达到弹性计算的目的。
- 命令:
2.1.storm rebalance topology-name -n new-work-num
////调整指定topology的worknum;
2.2.storm rebalance topology-name -e component=parallelism
////调整指定topology中指定component的并行数量。
并行度设置多少
- 并行度设置多少合适?通过查阅资料大概总结如下:
- 单spout每秒大概可以发送500个tuple
- 单bolt每秒大概可以接收2000个tuple
- 单acker每秒大概可以接收6000个tuple
- 根据上面的指标可以根据当前业务的数据量对并行度进行动态调整。