Spark--从RDD的角度来看Spark内部原理

  - 上篇博客主要讲了Spark的执行流程,看完后应该是对Spark有一个整体的了解,对Spark各个组件的工作流程都应该是有一个很清晰的认识了。
本篇博客,笔者主要是继续接着“宜人蜂巢内部分享”,从RDD的角度来看Spark的内部原理,包括以下内容:

  1. RDD为什么是Spark的核心概念
  2. 通过一个wordCount例子来看一看RDD
  3. RDD的管理与操作(算子)
  4. 常见的RDD操作有哪些(包括RDD的分类)
  5. RDD的依赖关系(DAG)
  6. RDD依赖关系的划分(stage)

RDD为什么是Spark的核心概念

  • Spark建立在统一抽象的RDD之上,使得Spark可以很容易扩展,比如 Spark Streaming、Spark SQL、Machine Learning、Graph都是在spark RDD上面进行的扩展(可以看见RDD的核心地位了吧)
  • RDD是什么呢?理解一下概念
  1. RDD(Resilient Distributed Dataset):弹性分布式数据集。
  2. RDD是只读的,由多个partition组成
  3. Partition分区,和Block数据块是一一对应的
    RDD 与 Partition、Block 的对应关系图

通过一个wordCount例子来看一看RDD

  • 上面只是RDD的概念,下面举个wordCount的例子来说明一下:
    1
    2
    3
    val file = sc.textFile("hdfs://data/test.txt")

    val data = file.flatMap(_.split(" ")).map((_,1)).reduceByKey(_ + _)

wordCount 例子,执行示意图

  • Spark 程序具体的流程图
    Spark 程序具体的流程图
  1. 客户端提交任务,初始化sparkContext之后,sc.textFile(“hdfs://“),去hdfs加载文件
  2. 加载的文件比如有300MB,在hdfs中就有3个block块,对应 RDD 里面的三个partition
  3. 加载的这个过程其实就是RDD的创建(MapPartitionsRDD)
  4. 数据被加载到不同的partition中,通过构建的stage(task set)然后提交到executor中去并行计算
  5. 计算完成后输出结果
  • 在整个spark计算流程里面,RDD起到了一些什么作用?

作用当然大了!可以看出整个计算流程都是基于RDD在做计算,从数据加载,即RDD的创建,中途的计算(stage的划分,RDD的操作,shuffle)。到最后结果的输出,整个计算流程都是由RDD在贯穿

写博客其实挺累的,太耗时了

继续吧,既然已经开始…..

RDD的管理与操作(算子)

  • RDD管理
  1. RDD是一个分布式数据集,即数据分布存储在多台机器上。从上面也可以看到,每个RDD的数据都以Block的形式存储于多台机器上
  2. 在Spark任务运行时
  3. Driver 节点的BlockManagerMaster保存Block的元数据,并且管理RDD与Block的关系。
  4. Executor 会启动一个BlockManagerSlave,管理Block数据并向BlockManagerMaster注册该Block
  5. 当RDD不再需要存储的时候,BlockManagerMaster将向BlockManagerSlave发送指令删除相应的Block。
    RDD管理
  • RDD操作
    RDD还提供了一组丰富的操作来操作这些数据,这种操作叫做算子。比如map、flatMap、filter、join、groupBy、reduceByKey等

  • RDD分类,分为创建算子、转换、缓存、执行

  1. Transformation:转换算子,这类转换并不触发提交作业,完成作业中间过程处理。
  2. Action:行动算子,这类算子会触发SparkContext提交Job作业。
    RDD操作

常见的RDD操作有哪些(包括RDD的分类)

  • 常见的算子,如下图所示:
    常见的算子

  • 具体的来看一下,例如map算子示例如下:

RDD的依赖关系(DAG)

  • RDD的依赖关系有两种:窄依赖(narrow dependency)和宽依赖(wide dependency)。
  1. 窄依赖每一个parent RDD的Partition最多被子RDD的一个Partition使用
  2. 宽依赖多个子RDD的Partition会依赖同一个parent RDD的Partition
    RDD的依赖关系--血统(左边窄依赖,右边宽依赖)
  • 依赖的具体实现,以及怎么去知道是窄依赖还是宽依赖?
  1. 所有的依赖都要实现trait Dependency[T]

    abstract class Dependency[T] extends Serializable {
        def rdd: RDD[T]
    }

  2. 窄依赖是有两种具体实现 OneToOneDependencyRangeDependency

    abstract class NarrowDependencyT extends Dependency[T] {
        def getParents(partitionId: Int): Seq[Int]
        override def rdd: RDD[T] = _rdd
    }

    //OneToOneDependency
    class OneToOneDependencyT extends NarrowDependencyT {
        override def getParents(partitionId: Int) = List(partitionId)

    //RangeDependency
    class RangeDependencyT extends NarrowDependencyT {

      override def getParents(partitionId: Int): List[Int] = {
        if (partitionId >= outStart && partitionId < outStart + length) {
          List(partitionId - outStart + inStart)
        } else {
          Nil
        }
      }
    }

  3. 宽依赖的实现只有一种:ShuffleDependency

    class ShuffleDependency[K, V, C] extends Dependency[Product2[K, V]] { … }

  4. 窄依赖|宽依赖,可以通过dependencies方法来查看,以上面的wordCount为例,可以看到res4(reduceByKey)为宽依赖
    RDD依赖关系的划分(stage)

RDD依赖关系的划分(stage)

  • RDD依赖关系的划分,RDD怎么被划分到一个stage里面?
  1. 就是通过窄依赖和宽依赖来划分stage的
  2. 如果是窄依赖就他们放在一个Stage里面,遇到宽依赖就断开划分为另外一个stage
  3. 如上面的workCount例子,就划分为了两个stage,在reduceByKey的时候断开了
  4. 如下图,遇到groupByKey断开,为一个stage1,map、union为窄依赖遇到join断开划分为stage2,其余划分为stage3
    RDD依赖关系的划分(stage)
  • 参考:
  1. http://blog.csdn.net/wangxiaotongfan/article/details/51395769
  2. http://www.cnblogs.com/zlslch/p/5723403.html

当前网速较慢或者你使用的浏览器不支持博客特定功能,请尝试刷新或换用Chrome、Firefox等现代浏览器