第4章 Spark物理执行计划

本章首先以一个典型的Spark应用为例,概览该应用对应的物理执行计划,然后讨论Spark物理执行计划生成方法,最后讨论常用数据操作生成的物理执行计划。

4.1 Spark物理执行计划概览

在第3章中,我们详细讨论了Spark应用生成的逻辑处理流程,本章我们讨论如何将应用的逻辑处理流程转化为物理执行计划,使得应用程序可以被分布执行。

Spark应用生成的逻辑处理流程多种多样,为了方便讨论,我们首先构建一个较为复杂但比较典型的逻辑处理流程,然后以该流程为例讨论如何将其转化为物理执行计划。如图4.1所示,我们构建了一个ComplexApplication应用,该应用包含map()partitionBy()union()join()等多种数据操作。

图4.1 ComplexApplication应用的逻辑处理流程

ComplexApplication应用的示例代码:

// 示例代码(假设)
val rdd1 = sc.parallelize(/* ... */)
val rdd2 = sc.parallelize(/* ... */)
val rdd3 = sc.parallelize(/* ... */)
val mappedRDD1 = rdd1.map(/* func */)
val mappedRDD2 = rdd2.map(/* func */)
val mappedRDD3 = rdd3.map(/* func */)
val unionRDD = mappedRDD2.union(mappedRDD3)
val shuffledRDD = mappedRDD1.partitionBy(new HashPartitioner(3))
val joinedRDD = shuffledRDD.join(unionRDD) // 内部生成CoGroupedRDD等
val resultRDD = joinedRDD.map(/* func */)
resultRDD.foreach(/* action */)

本章的核心问题是如何将逻辑处理流程转化为物理执行计划。MapReduce、Spark等大数据处理框架的核心思想是将大的应用拆分为小的执行任务,那么面对这么复杂的数据处理流程,Spark如何将其拆分为小的执行任务呢?

想法1:一个直观想法是将每个具体的数据操作作为一个执行阶段,也就是将前后关联的RDD组成一个执行阶段,图4.1中的每个箭头都生成一个执行任务。对于2个RDD聚合成1个RDD的情况(见图4.1中的ShuffledRDD、UnionRDD、CoGroupedRDD),将这3个RDD组成一个stage。这样虽然可以解决任务划分问题,但存在多个性能问题。第1个性能问题是会产生很多个任务,如图4.1中有36个箭头,会生成36个任务,当然我们可以对ShuffleDependency进行优化,将指向child RDD中同一个分区的箭头合并为一个task,使得一个task从parent RDD中的多个分区中获取数据,但是仍然会有多达21个任务。过多的任务不仅会增加调度压力,而且会产生第2个严重的性能问题,即需要存储大量的中间数据。一般来说,每个任务需要将执行结果存到磁盘或者内存中,这样方便下一个任务读取数据、进行计算。如果每个箭头都是计算任务的话,那么存储这些中间计算结果(RDD中的数据)需要大量的内存和磁盘空间,效率较低。

想法2:既然想法1中生成的任务过多会造成中间数据量过大,那么第2个想法是减少任务数量。仔细观察一下逻辑处理流程图会发现中间数据只是暂时有用的,中间数据(RDD)产生以后,只用于下一步计算操作(图4.1中的箭头),而下一步计算操作完成后,中间数据可以被删除。那么,一个大胆的想法是将这些计算操作串联起来,只用一个执行阶段来执行这些串联的多个操作,使得上一步操作在内存中生成的数据被下一步操作处理完后能够及时回收,减少内存消耗。

基于这个串联思想,接下来需要解决的两个问题分别是:

  1. 每个RDD包含多个分区,那么需要生成多少个任务计算?如图4.2所示,我们观察到RDD中每个分区的计算逻辑相同,可以独立计算,因此我们可以将每个分区上的操作串联为一个task,也就是为最后的MapPartitionsRDD的每个分区分配一个task。
  2. 如何串联操作?遇到复杂依赖关系(如ShuffleDependency)如何处理?因为某些操作,如cogroup()join()的输入数据(RDD)可以有多个,而输出数据(RDD)一般只有一个,所以我们将串联的顺序调整为从后往前。如图4.2中黑色粗箭头所示,从图中最后的MapPartitionsRDD开始向前串联,当遇到ShuffleDependency时,我们采用的处理方法是将该分区所依赖的上游数据(parent RDD)及操作都纳入一个task中。然而,这个方案仍然存在性能问题,当遇到ShuffleDependency时,task包含很多数据依赖和操作,导致划分出的task可能太大,而且会出现重复计算。例如,在图4.2中,从rdd2到UnionRDD所有的数据和操作都被纳入task0中,造成task0的计算量过大,而且其他task会重复处理这些数据,如使用虚线表示的task1仍然需要计算rdd2 UnionRDD中的数据。当然我们可以在task0计算完后缓存这些需要重复计算的数据,以便后续task的计算,但这样缓存会占用存储空间,而且会使得task0与其他task不能同时并行计算,降低了并行度。

图4.2 一种物理执行计划的生成方法

想法3:想法2的缺点是task会变得很大,降低了并行度。问题根源是遇到ShuffleDependency时会出现重复计算,不能有效地划分任务。既然ShuffleDependency包含复杂的多对多的依赖关系,导致任务划分困难,为何不对该ShuffleDependency关系进行划分呢?如将ShuffleDependency前后的计算逻辑分开,形成不同的计算阶段和任务,这样就不会出现task过大的情况。Spark实际上就是基于这个思想设计的,下面我们详细讨论Spark的划分方案。

4.2 Spark物理执行计划生成方法

1. 执行步骤

Spark具体采用3个步骤来生成物理执行计划:首先根据action()操作顺序将应用划分为作业(job),然后根据每个job的逻辑处理流程中的ShuffleDependency依赖关系,将job划分为执行阶段(stage),最后在每个stage中,根据最后生成的RDD的分区个数生成多个计算任务(task),具体如下所述。

(1)根据action()操作顺序将应用划分为作业(job)

这一步主要解决何时生成job,以及如何生成job逻辑处理流程的问题。当应用程序出现action()操作时,如resultRDD.action(),表示应用会生成一个job,该job的逻辑处理流程为从输入数据到resultRDD的逻辑处理流程。例如,在示例代码中,我们在join()之后使用了foreach()这一action()操作,因此,Spark会生成图4.3这样的逻辑处理流程(这里添加了输入数据的表示Data blocks,因为一般的job是从分布式文件系统读取数据的)。如果应用程序中有很多action()操作,那么Spark会按照顺序为每个action()操作生成一个job,每个job的逻辑处理流程也都是从输入数据到最后action()操作的。

(2)根据ShuffleDependency依赖关系将job划分为执行阶段(stage)

对于每个job,从其最后的RDD(图4.3中连接results的MapPartitionsRDD)往前回溯整个逻辑处理流程,如果遇到NarrowDependency,则将当前RDD的parent RDD纳入,并继续往前回溯。当遇到ShuffleDependency时,停止回溯,将当前已经纳入的所有RDD按照其依赖关系建立一个执行阶段,命名为stage i。如图4.3所示,首先从results之前的MapPartitionsRDD开始向前回溯,回溯到CoGroupedRDD时,发现其包含两个parent RDD,其中一个是UnionRDD。因为CoGroupedRDD与UnionRDD的依赖关系是ShuffleDependency,对其进行划分,并继续从CoGroupedRDD的另一个parent RDD回溯,回溯到ShuffledRDD时,同样发现了ShuffleDependency,对其进行划分得到了一个执行阶段stage 2。接着从stage 2之前的UnionRDD开始向前回溯,由于都是NarrowDependency,将一直回溯到读取输入数据的RDD2和RDD3中,形成stage 1。最后,只剩余RDD1成为一个stage 0。

图4.3 ComplexApplication应用生成的物理执行图

(3)根据分区计算将各个stage划分为计算任务(task)

执行第2步后,我们可以发现整个job被划分成了大小适中(相比想法2中的划分方法)、逻辑分明的执行阶段stage。接下来的问题是如何生成计算任务。我们之前的想法是每个分区上的计算逻辑相同,而且是独立的,因此每个分区上的计算可以独立成为一个task。Spark也采用了这个策略,根据每个stage中最后一个RDD的分区个数决定生成task的个数。如在图4.3的stage 2中,最后一个MapPartitionsRDD的分区个数为3,那么stage 2就生成3个task。如图4.3中粗箭头所示,在stage 2中,每个task负责ShuffledRDD CoGroupedRDD MapPartitionsRDD MapPartitionsRDD中一个分区的计算。同样,在stage 1中生成4个task,前2个task负责Data blocks RDD2 MapPartitionsRDD UnionRDD中2个分区的计算,后2个task负责Data blocks RDD3 UnionRDD中2个分区的计算。在stage 0中,生成3个task,负责Data blocks RDD1的计算。

2. 相关问题

经过以上3个步骤,Spark可以将一个应用的逻辑处理流程划分为多个job,每个job又可以划分为多个stage,每个stage可以生成多个task,而同一个阶段中的task可以同时分发到不同的机器并行执行。看起来已经很完美了,但还有3个执行方面的问题:一个应用生成了多个job、stage和task,如何确定它们的计算顺序?task内部如何存储和计算中间数据?前后stage中的task间如何传递和计算数据?

(1)job、stage和task的计算顺序

job的提交时间与action()被调用的时间有关,当应用程序执行到rdd.action()时,就会立即将rdd.action()形成的job提交给Spark。job的逻辑处理流程实际上是一个DAG图,经过stage划分后,仍然是DAG图形状。每个stage的输入数据要么是job的输入数据,要么是上游stage的输出结果。因此,计算顺序从包含输入数据的stage开始,从前到后依次执行,仅当上游的stage都执行完成后,再执行下游的stage。在图4.3中,stage 0和stage 1由于都包含了job的输入数据,两者都可以先开始计算,仅当两者都完成后,stage 2才开始计算。stage中每个task因为是独立而且同构的,可以并行运行没有先后之分。

(2)task内部数据的存储与计算问题(流水线计算)

讨论完job、stage、task的执行顺序后,我们聚焦在task内部,讨论task如何计算每个RDD中的数据。在想法2中,我们提出的解决方案是每计算出一个中间数据(也就是RDD中的一个分区)就将其存放到内存中,等下一个操作处理完成并生成新的RDD中的一个分区后,回收上一个RDD在内存中的数据。这种方案虽然可以减少内存空间,但当某个RDD中一个分区个数太多时,仍然会占用大量内存。为了解决这个问题,进一步观察RDD分区之间的关系,发现上游分区包含的record和下游分区包含的record之间经常存在一对一的数据依赖关系。例如,在图4.4中的第1个计算模式(pattern)中,f()g()函数每次读取一个record,计算并生成一个新record,这类型的操作包括map()filter()等。也就是说,f()函数在计算record1的时候,并不需要知道record2是什么,同样,g()函数在计算record1′的时候也只需要f()函数输出record1′,并不需要f()函数此时就计算出record2′。因此,在第1个pattern中执行f()g()函数的时候,可以采用以下步骤进行“流水线”式的计算。

  • 读取record1 f(record1) record1′ g(record1′) 输出record1′′
  • 读取record2 f(record2) record2′ g(record2′) 输出record2′′
  • 读取record3 f(record3) record3′ g(record3′) 输出record3′′

“流水线”式计算的好处是可以有效地减少内存使用空间,在task计算时只需要在内存中保留当前被处理的单个record即可,不需要保存其他record或者已经被处理完的record。例如,在第1个pattern中,没有必要在执行f(record1)之前将record2和record3提前算出来放入内存中。

图4.4 task内部中间数据的4种计算模式,即根据上游数据计算下游数据的不同方式

对于其他类型的操作,是否还可以采用“流水线”式计算呢?第2个pattern中的g()函数、第3个pattern中的f()函数、第4个pattern中的f()g()函数都需要一次性读取上游分区中所有的record来计算。这样的函数主要出现在mapPartitions(func f)zipPartitions(func g)等操作中。举个例子,f()g()函数可以是第3章中介绍的mapPartitions(func),具体代码如下,其中iter是读取上游分区中record的迭代器。

// 示例:mapPartitions(func) 的简化示意
val resultRDD = rdd.mapPartitions { iter =>
  // 函数内部可能需要一次性读取所有record
  iter.map(/* ... */)
}

在第2个pattern中,由于f()函数仍然是一一映射的,所以仍然可以采用“流水线”式计算,计算流程如下:

  1. 读取record1 f(record1) record1′ g(record1′) record1′进入g()函数中的iter.next()进行计算 g()函数将计算结果存入g()函数中的list。
  2. 读取record2 f(record2) record2′ g(record2′) record2′进入g()函数中的iter.next()进行计算 g()函数将计算结果存入g()函数中的list。
  3. 读取record3 f(record3) record3′ g(record3′) record3′进入g()函数中的iter.next()进行计算 g()函数将计算结果存入g()函数中的list。
  4. g()函数一条条输出list中的record。

从计算流程可以看到,f()函数每生成一条数据,都进入类似上面mapPartitions()的例子g()函数的iter.next()中进行计算,g()函数需要在内存中保存这些中间计算结果,并在输出时将中间结果依次输出。当然,有些g()函数逻辑简单,不需要使用数据结构来保存中间结果,如求record的max值,只需要保存当前最大的record即可。

在第3个pattern中,由于f()函数需要将[record1,record2,record3]都算出后才能计算得到[record1′,record2′,record3′],因此会先执行f()函数,完成后再计算g()函数。实际的执行过程:首先执行f()函数算出[record1′,record2′,record3′],然后使用g()函数依次计算g(record1′) => record1′′g(record2′) => record2′′g(record3′) => record3′′。也就是说,f()函数的输出结果需要保存在内存中,而g()函数计算完每个record′并得到record′′后,可以对record′进行回收。

在第4个pattern中,计算顺序仍然是从前到后,但不能进行record的“流水线”式计算。与第3个pattern类似,f()函数需要一次性读取[record1,record2,record3]后才能算出[record1′,record2′,record3′],同样,g()函数需要一次性读取[record1′,record2′,record3′]且计算后才能输出[record1′′,record2′′,record3′′]。这两个函数只是依次执行,“流水线”式计算退化到“计算-回收”模式:每执行完一个操作,回收之前的中间计算结果。

总结

Spark采用“流水线”式计算来提高task的执行效率,减少内存使用量。这也是Spark可以在有限内存中处理大规模数据的原因。然而,对于某些需要聚合中间计算结果的操作,还是需要占用一定的内存空间,也会在一定程度上影响流水线计算的效率。

(3)task间的数据传递与计算问题

讨论了task内部数据计算后,还有一个问题是不同stage之间的task如何传递数据进行计算。回顾一下,stage之间存在的依赖关系是ShuffleDependency,而ShuffleDependency是部分依赖的,也就是下游stage中的每个task需要从parent RDD的每个分区中获取部分数据。ShuffleDependency的数据划分方法包括Hash划分、Range划分等,也就是要求上游stage预先将输出数据进行划分,按照分区存放,分区个数与下游task的个数一致,这个过程被称为“Shuffle Write”。按照分区存放完成后,下游的task将属于自己分区的数据通过网络传输获取,然后将来自上游不同分区的数据聚合在一起进行处理,这个过程被称为“Shuffle Read”。总的来说,不同stage的task之间通过Shuffle Write + Shuffle Read传递数据,至于如何具体进行Shuffle操作,分区数据是写内存还是磁盘,如何对这些数据进行聚合,这些问题将在后续章节中详细介绍。

3. stage和task命名方式

3. stage和task命名方式

在MapReduce中,stage只包含两类:map stage和reduce stage,map stage中包含多个执行map()函数的任务,被称为map task;reduce stage中包含多个执行reduce()函数的任务,被称为reduce task。而在Spark中,stage可以有多个,有些stage既包含类似reduce()的聚合操作又包含map()操作,所以一般不区分是map stage还是reduce stage,而直接使用stage i来命名,只有当生成的逻辑处理流程类似MapReduce的两个执行阶段时,我们才会依据习惯区分map/reduce stage。虽然在Spark中一般不区分map/reduce stage,但可以对stage中的task使用不同的命名,如果task的输出结果需要进行Shuffle Write,以便传输给下一个stage,那么这些task被称为ShuffleMapTasks;而如果task的输出结果被汇总到Driver端或者直接写入分布式文件系统,那么这些task被称为ResultTasks。如图4.3所示,stage 0和stage 1中的task是ShuffleMapTasks,stage 2中的task是ResultTasks,直接输出结果。

4. 快速了解一个应用的物理执行计划

虽然我们理解了Spark生成物理执行计划的过程,但面对一个新的应用时,如何快速知道该应用会产生哪些stage?每个stage包含多少个task?答案是我们可以利用Spark UI界面提供的信息快速分析出Spark应用的物理执行图,具体步骤如下所述。

(1)查看job信息

例如,在ComplexApplication的用户代码中,只包含一个action()操作(foreach()操作),因此只生成一个job。我们可以在Spark job界面看到foreach()生成的job信息,如图4.5所示。

图4.5 ComplexApplication应用生成的job信息

(2)查看job包含的stage

从Details for job 0界面可以看到该job包含3个stage,如图4.6所示,其中stage 0包含3个task,共Shuffle Write了376.0B,stage 1包含4个task,共Shuffle Write了988.0B,而stage 2包含3个task,一共Shuffle Read了1364.0B=376.0B+988.0B。

图4.6 ComplexApplication应用生成的stage信息

还可以单击“DAG Visualization”查看stage之间的数据依赖关系,如图4.7所示。

图4.7 ComplexApplication应用中stage之间的依赖关系

图4.7展示了ComplexApplication应用的job被划分为3个stage,每个stage包含一个或多个数据操作,每个黑色实心圆圈代表一个RDD。但这个图稍显混乱,stage 0中parallelize操作生成的RDD应该是被stage 2中的partitionBy处理的,与stage 1中的parallelize无关,也就是stage 0到stage 2的横箭头不应该贯穿stage 1。如果想进一步了解黑色实心圆圈代表哪些RDD,则可以进入stage的UI界面或者直接单击图4.7中的stage,最终可以看到图4.8这样的stage结构图,可以看到每个stage的信息与我们之前分析的一致。与图4.3不同的是,图4.8详细展示了每个操作会生成哪些RDD(如join()操作生成了CoGroupedRDD及两个MapPartitionsRDD),但没有展示stage之间的连接关系。

图4.8 ComplexApplication应用中每个stage包含的操作及生成的RDD信息

(3)查看每个stage包含的task

进入Details for stage i的界面可以看到每个stage包含的task信息。如图4.9所示,stage 0包含3个task,每个task都进行了Shuffle Write,写入了2~3个record,也就是说Spark UI中也会统计Shuffle Write/Read的record数目。

图4.9 ComplexApplication应用中stage 0包含的task信息

如图4.10所示,stage 1包含4个task,每个task都进行了Shuffle Write,写入了2个record。

图4.10 ComplexApplication应用中stage 1包含的task信息

如图4.11所示,stage 2包含3个task,每个task从上游的stage 0/1那里Shuffle Read了5~6个record。

图4.11 ComplexApplication应用中stage 2包含的task信息

4.3 常用数据操作生成的物理执行计划

4.2节我们讨论了Spark划分stage和task的一般原则,那么具体到每个常用的数据操作,如何进行划分呢?我们首先回顾一下NarrowDependencyShuffleDependency数据依赖关系,然后探讨相关的stage划分原则,最后挑选一些具有代表性的数据操作来详细说明。

先回顾一下数据依赖关系,宽依赖(ShuffleDependency)和窄依赖(NarrowDependency)的区别是child RDD的各个分区中的数据是否完全依赖其parent RDD的一个或者多个分区。完全依赖指parent RDD中的一个分区不需要进行划分就可以流入child RDD的分区中。如果是完全依赖,那么数据依赖关系是窄依赖。如果是不完全依赖,也就是parent RDD的一个分区中的数据需要经过划分(如HashPartition或者RangePartition)后才能流入child RDD的不同分区中,那么数据依赖关系是宽依赖。

再看一下NarrowDependencyShuffleDependency的stage划分原则,如图4.12所示。对于NarrowDependency,parent RDD和child RDD的分区之间是完全依赖的,我们可以将parent RDD和child RDD直接合并为一个stage。在合成的stage中(图4.12中为stage 0),对于OneToOneDependency,每个task读取parent RDD中的一个分区,并计算child RDD中的一个分区。对于ManyToOneDependency或者ManyToManyDependency,每个task读取parent RDD中多个分区,并计算出child RDD中的一个分区。对于ShuffleDependency,如图4.13所示,将parent RDD和child RDD进行划分,形成两个或多个stage,每个stage产生多个task,stage之间通过Shuffle WriteShuffle Read来传递数据。下面,我们将常用数据操作归类到这几种依赖关系中,并举例详细介绍其物理执行计划。

图4.12 NarrowDependency的stage划分原则

示意图描述:显示一个parent RDD(包含多个分区)通过窄依赖(如OneToOne、ManyToOne、ManyToMany)连接到child RDD,共同形成一个stage(标记为stage 0)。每个task负责从parent分区读取并计算child分区。

graph LR
    subgraph Stage0
        A[Parent RDD partitions] --> B[Child RDD partitions]
    end
    style A fill:#f9f,stroke:#333
    style B fill:#ccf,stroke:#333

图4.13 ShuffleDependency的stage划分原则

示意图描述:显示两个独立的stage(stage 0 和 stage 1)。stage 0中的task通过Shuffle Write输出数据,stage 1中的task通过Shuffle Read获取数据。中间以Shuffle Dependency分隔。

graph LR
    subgraph Stage0
        A[Parent RDD partitions] -- Shuffle Write --> B[Shuffled data]
    end
    subgraph Stage1
        C[Child RDD partitions] -- Shuffle Read --> B
    end
    style A fill:#f9f,stroke:#333
    style C fill:#ccf,stroke:#333

OneToOneDependency类型的操作

图4.14展示了flatMap()mapPartitionsWithIndex()操作的stage和task划分图。这两个操作都生成了一个stage,stage中不同颜色的箭头表示不同的task。每个task负责处理一个分区,进行流水线计算,且计算逻辑清晰。这两个操作唯一不同的是flatMap()每读入一条record就处理和输出一条,而mapPartitionsWithIndex()等到全部record都处理完后再输出record。图4.14右图中的mapPartitionsWithIndex()是计算每个分区中奇数的和及偶数的和。

图4.14 OneToOneDependency数据依赖划分举例

示意图描述:左侧为flatMap操作,parent RDD的一个分区输入给task,输出多条record;右侧为mapPartitionsWithIndex操作,相同结构,但task内部先处理整个分区再输出。两个操作都属于OneToOneDependency,合并为一个stage。

graph LR
    subgraph Stage
        P1[Parent Partition] --> T1[Task: flatMap]
        T1 --> C1[Child Partition]
        P2[Parent Partition] --> T2[Task: flatMap]
        T2 --> C2[Child Partition]
    end
    style P1 fill:#f9f
    style P2 fill:#f9f

RangeDependency类型的操作

图4.15展示了在一般情况下union()操作的stage和task划分图。该操作将两个RDD合并为一个RDD,只生成了一个stage,stage中不同颜色的箭头表示不同的task,每个task负责处理一个分区。

图4.15 RangeDependency数据依赖划分举例

示意图描述:两个parent RDD(各有分区),通过union操作合并为一个child RDD,每个task读取一个parent分区(来自任一parent)并输出到对应child分区。属于RangeDependency,合并为一个stage。

graph LR
    subgraph Stage
        A1[Parent A Partition 1] --> T1[Task 1]
        A2[Parent A Partition 2] --> T2[Task 2]
        B1[Parent B Partition 1] --> T1
        B2[Parent B Partition 2] --> T2
        T1 --> C1[Child Partition 1]
        T2 --> C2[Child Partition 2]
    end
    style A1 fill:#f9f
    style A2 fill:#f9f
    style B1 fill:#ccf
    style B2 fill:#ccf

ManyToOneDependency类型的操作

如图4.16所示,coalesce(shuffle=false)、特殊情况下的union()(见第3章的说明),以及zipPartitions()操作对应的数据依赖关系都是ManyToOneDependency,child RDD中的每个分区需要从parent RDD中获取所依赖的多个分区的全部数据。由于ManyToOneDependency是窄依赖,所以Spark将parent RDD和child RDD组合为一个stage,该stage生成的task个数与最后的RDD的分区个数相等。与OneToOneDependency形成的task相比,这里每个task需要同时在parent RDD中获取多个分区中的数据。

图4.16 ManyToOneDependency数据依赖划分举例

示意图描述:多个parent分区(来自一个或多个parent RDD)被合并到一个child分区。例如,coalesce将三个parent分区合并为一个child分区,形成一个task。属于ManyToOneDependency,合并为一个stage。

graph LR
    subgraph Stage
        A1[Parent Partition 1] --> T1[Task]
        A2[Parent Partition 2] --> T1
        A3[Parent Partition 3] --> T1
        T1 --> C1[Child Partition]
    end
    style A1 fill:#f9f
    style A2 fill:#f9f
    style A3 fill:#f9f

ManyToManyDependency类型的操作

如图4.17所示,cartesian()操作对应的数据依赖关系是ManyToManyDependency,child RDD中的每个分区需要从两个parent RDD中获取所依赖的分区的全部数据。虽然ManyToManyDependency形似ShuffleDependency,却属于NarrowDependency,因此Spark将parent RDD和child RDD组合为一个stage,该stage生成的task个数与最后的RDD的分区个数相等。与ManyToOneDependency形成的task相比,这里每个task需要同时在多个parent RDD中获取分区中的数据。

图4.17 ManyToManyDependency数据依赖划分举例

示意图描述:两个parent RDD(RDD A和RDD B)各有多个分区,通过cartesian操作生成child RDD,每个child分区由A的一个分区和B的一个分区笛卡尔乘积得到。一个task负责读取A的一个分区和B的一个分区,生成child的一个分区。属于ManyToManyDependency,合并为一个stage。

graph LR
    subgraph Stage
        A1[Parent A Partition 1] --> T1[Task 1]
        B1[Parent B Partition 1] --> T1
        T1 --> C1[Child Partition 1-1]
        A2[Parent A Partition 2] --> T2[Task 2]
        B2[Parent B Partition 2] --> T2
        T2 --> C2[Child Partition 2-2]
    end
    style A1 fill:#f9f
    style A2 fill:#f9f
    style B1 fill:#ccf
    style B2 fill:#ccf

单一ShuffleDependency类型的操作

如图4.18所示,aggregateByKey()sortByKey()操作形成的是单一的ShuffleDependency数据依赖关系,也就是只与一个parent RDD形成ShuffleDependency。根据划分原则,Spark将parent RDD和child RDD分开,分别形成一个stage,每个stage中的task个数与该stage中最后一个RDD中的分区个数相等。为了进行跨stage的数据传递,上游stage中的task将输出数据进行Shuffle Write,child stage中的task通过Shuffle Read同时获取parent RDD中多个分区中的数据。与NarrowDependency不同,这里从parent RDD的分区中获取的数据是划分后的部分数据。

图4.18 单一ShuffleDependency数据依赖划分举例

示意图描述:一个parent RDD分为两个stage。Stage 0包含parent RDD的分区,每个task进行Shuffle Write;Stage 1包含child RDD的分区,每个task通过Shuffle Read从所有上游分区获取划分后的数据。

graph LR
    subgraph Stage0
        P1[Parent Partition 1] -- Shuffle Write --> SW1[Shuffle data]
        P2[Parent Partition 2] -- Shuffle Write --> SW2[Shuffle data]
    end
    subgraph Stage1
        C1[Child Partition 1] -- Shuffle Read --> SW1 & SW2
        C2[Child Partition 2] -- Shuffle Read --> SW1 & SW2
    end
    style P1 fill:#f9f
    style P2 fill:#f9f
    style C1 fill:#ccf
    style C2 fill:#ccf

多ShuffleDependency类型的操作

如图4.19所示,join()操作在不同配置下会生成多种不同类型的数据依赖关系。

  • 图4.19(d)中,由于rdd1rdd2CoGroupedRDD具有相同的partitioner,parent RDD和child RDD之间只存在窄依赖ManyToOneDependency,因此只形成一个stage。
  • 图4.19(b)、图4.19(c)都同时包含OneToOneDependencyShuffleDependency,根据Spark的stage划分原则,只对ShuffleDependency进行划分,得到两个stage,stage 1中的task既需要读取上游stage中的多个分区中的数据,也需要处理通过OneToOneDependency连接的RDD中的数据。
  • 图4.19(a)最复杂,包含了多个ShuffleDependency,依据Spark的划分原则,需要对多个ShuffleDependency都进行划分,得到多个stage(这里划分出3个stage)。下游stage需要等待上游stage完成后再执行,Shuffle Read获取上游stage的输出数据。

图4.19 多ShuffleDependency数据依赖划分举例

示意图描述:四张子图(a)(b)(c)(d)表示不同配置下join操作的stage划分结果。

graph TB
    subgraph "(a) 多个ShuffleDependency -> 3个stage"
        Stage0_a[Stage 0: parent RDDs]
        Stage1_a[Stage 1: shuffle 1]
        Stage2_a[Stage 2: shuffle 2 + join]
        Stage0_a --Shuffle Write--> Stage1_a
        Stage1_a --Shuffle Read--> Stage2_a
    end
    subgraph "(b) 一个Shuffle + OneToOne -> 2个stage"
        Stage0_b[Stage 0: parent + narrow]
        Stage1_b[Stage 1: shuffle + narrow]
        Stage0_b --Shuffle Write--> Stage1_b
    end
    subgraph "(c) 类似(b)"
        Stage0_c[Stage 0: parent + narrow]
        Stage1_c[Stage 1: shuffle + narrow]
        Stage0_c --Shuffle Write--> Stage1_c
    end
    subgraph "(d) 只有窄依赖 -> 1个stage"
        Stage0_d[Stage 0: all narrow dependencies]
    end
    style Stage0_a fill:#f9f
    style Stage1_a fill:#ccf
    style Stage2_a fill:#ffc

4.4 本章小结

本章主要讨论了Spark将逻辑处理流程转化为物理执行计划的一般过程及其典型实例。至此,给定一个Spark应用,读者可以分析出其逻辑处理流程和物理执行计划。然而,如果想深入理解Spark,还需要探究很多细节问题,如task如何在分布式集群中运行,Shuffle机制如何设计与实现,一些更复杂的应用(如迭代应用)的逻辑处理流程和物理执行计划有什么不同,这些问题我们会在后续章节中详细讨论。

4.5 扩展阅读

第3章及本章介绍的逻辑处理流程和物理执行计划是完全根据用户定义的RDD操作顺序和数据依赖关系生成的。这样存在的一个问题是用户自己定义的数据处理流程可能不是最优的,如将用户代码rdd1.join(rdd2).map(func)中的操作调整顺序后形成的rdd1.map(func).join(rdd2)的执行效率可能更高;再如在某个数据集上先进行分区再进行join()的效率要比直接进行join()的效率高,然而一般用户并不清楚操作顺序对性能的影响。

实际上,为了优化数据处理流程,尤其是SQL数据处理流程,数据库领域已经有很多相关的优化工作,包括基于规则的优化、基于性能模型的优化和基于自适应的优化等。Spark SQL引擎也将这些优化技术引入了用户代码的逻辑处理流程和物理执行计划转化过程中。Spark SQL使用基于规则的优化技术,如谓词下推、算子组合、常量折叠等技术来对逻辑处理流程进行优化;使用基于性能模型的优化技术选择最优的物理执行计划;使用基于自适应的优化执行技术,根据应用运行时的信息来动态调整执行计划(包括自动Shuffle分区个数的确定、数据倾斜的处理等),提高执行效率。读者可以进一步阅读Spark SQL论文和相关技术文档来了解更多的优化技术。