第3章 Spark逻辑处理流程
本章主要介绍Spark是如何将应用程序转化为逻辑处理流程的,包括RDD数据模型概念、数据操作概念,以及数据依赖关系的建立规则等。本章还将详细介绍常用的数据操作,不仅给出相关的示例代码,还会详细给出其逻辑处理流程图、探讨相关的性能问题,为下一章讨论物理执行计划做准备。
3.1 Spark逻辑处理流程概览
在第2章中,我们了解了Spark在运行应用前,首先需要将应用程序转化为逻辑处理流程(Logical plan)。这一章我们将详细讨论这个转化过程。为了解释一些概念,我们假设Spark已经为一个典型应用生成了逻辑处理流程,如图3.1所示。图3.1表示了从数据源开始经过了哪些处理步骤得到最终结果,还有中间数据及其依赖关系。
这个典型的逻辑处理流程主要包含四部分。
(1)数据源(Data blocks):数据源表示的是原始数据,数据可以存放在本地文件系统和分布式文件系统中,如HDFS、分布式Key-Value数据库(HBase)等。在IntelliJ IDEA中运行单机测试时,数据源可以是内存数据结构,如list(1,2,3,4,5);对于流式处理来说,数据源还可以是网络流等。这里我们只讨论批式处理,所以限定数据源是静态数据。
图3.1 一个典型的Spark逻辑处理流程(因文本提取无法显示图片,此处保留文字描述:从数据源(Data blocks)开始,经过一系列RDD转换(如map、filter等),最终到达action操作,得到结果result,并发送到Driver端执行f()函数。中间涉及多个RDD及其分区,以及窄依赖和宽依赖关系。)
(2)数据模型:确定了数据源后,我们需要对数据进行操作处理。首要问题是如何对输入/输出、中间数据进行抽象表示,使得程序能够识别处理。在使用普通的面向对象程序(C++/Java程序)处理数据时,我们将数据抽象为对象(object),如doubleObject = new Double(2.0)、listObject = new ArrayList()。然后,我们可以在对象上定义数据操作,如doubleObject.longValue()可以将数据转化为long类型,listObject.add(i, Value)可以在list的第i个位置插入一个元素Value。Hadoop MapReduce框架将输入/输出、中间数据抽象为<K, V> record,这样map()/reduce()按照<K, V> record的形式读入并处理数据,最后输出为<K, V> record形式。这种数据表示方式的优点是简单易操作,缺点是过于细粒度。没有对这些<K, V> record进行更高层的抽象,导致只能使用map(K, V)这样的固定形式去处理数据,而无法使用类似面向对象程序的灵活数据处理方式,如records.operation()。
Spark认知到了这个缺点,将输入/输出、中间数据抽象表示为统一的数据模型(数据结构),命名为RDD(Resilient Distributed Datasets)。每个输入/输出、中间数据可以是一个具体的实例化的RDD,如第2章介绍的ParallelCollectionRDD等。RDD中可以包含各种类型的数据,可以是普通的Int、Double,也可以是<K, V> record等。RDD与普通数据结构(如ArrayList)的主要区别有两点:
- RDD只是一个逻辑概念,在内存中并不会真正地为某个RDD分配存储空间(除非该RDD需要被缓存)。RDD中的数据只会在计算中产生,而且在计算完成后就会消失,而ArrayList等数据结构常驻内存。
- RDD可以包含多个数据分区,不同数据分区可以由不同的任务(task)在不同节点进行处理。
(3)数据操作:定义了数据模型后,我们可以对RDD进行各种数据操作,Spark将这些数据操作分为两种:transformation()操作和action()操作。两者的区别是action()操作一般是对数据结果进行后处理(post-processing),产生输出结果,而且会触发Spark提交job真正执行数据处理任务(在下一章中详细介绍)。transformation()操作和action()操作的使用方式分别为rdd.transformation()和rdd.action(),如rdd2 = rdd1.map(func)表示对rdd1进行map()操作得到新的rdd2;还有二元操作,如rdd3 = rdd1.join(rdd2)表示对rdd1和rdd2进行join()操作得到rdd3。这里读者可能会问一个问题:为什么操作叫作transformation()?transformation这个词隐含了一个意思是单向操作,也就是rdd1使用transformation()后,会生成新的rdd2,而不能对rdd1本身进行修改。在普通C++/Java程序中,我们既可以对ArrayList上的数据进行统计分析再生成新的ArrayList,也可以对ArrayList中的数据进行修改,而且可以对每个元素进行细粒度的修改,如ArrayList[i] = ArrayList[i] + 1。然而,在Spark中,因为数据操作一般是单向操作,通过流水线执行(下一章介绍),还需要进行错误容忍等,所以RDD被设计成一个不可变类型,可以类比成一个不能修改其中元素的ArrayList。后续我们会更深入讨论这个问题。一直使用transformation()操作可以不断生成新的RDD,而action()操作用来计算最后的数据结果,如rdd1.count()操作可以统计rdd1中包含的元素个数,rdd1.collect()操作可以将rdd1中的所有元素汇集到Driver节点,并进行进一步处理。
(4)计算结果处理:由于RDD实际上是分布在不同机器上的,所以大数据应用的结果计算分为两种方式:一种方式是直接将计算结果存放到分布式文件系统中,如rdd.save("hdfs://file_location"),这种方式一般不需要在Driver端进行集中计算;另一种方式是需要在Driver端进行集中计算,如统计RDD中元素数目,需要先使用多个task统计每个RDD中分区(partition)的元素数目,然后将它们汇集到Driver端进行加和计算。例如,在图3.1中,每个分区进行action()操作得到部分计算结果result,然后将这些result发送到Driver端后对其执行f()函数,得到最终结果。
3.2 Spark逻辑处理流程生成方法
我们在写程序时会想到类似图3.1的逻辑处理流程。然而,Spark实际生成的逻辑处理流程图往往比我们头脑中想象的更加复杂,例如,会多出几个RDD,每个RDD会有不同的分区个数,RDD之间的数据依赖关系不同,等等。对于Spark来说,需要有一套通用的方法,其能够将应用程序自动转化为确定性的逻辑处理流程,也就是RDD及其之间的数据依赖关系。因此,需要解决以下3个问题。
- 根据应用程序如何产生RDD,产生什么样的RDD?
- 如何建立RDD之间的数据依赖关系?
- 如何计算RDD中的数据?
3.2.1 根据应用程序如何产生RDD,产生什么样的RDD
我们能想到的一种简单解决方法是对程序中的每一个数据进行操作,也就是用transformation()方法返回(创建)一个新的RDD。这种方法的主要问题是只适用于逻辑比较简单的transformation(),如在rdd1上使用map(func)进行操作时,是对rdd1中每一个元素执行func()函数得到一个新的元素,因此只会生成一个rdd2。然而,一些复杂的transformation(),如join()、distinct()等,需要对中间数据进行一系列子操作,那么一个transformation()会创建多个RDD。例如,rdd3 = rdd1.join(rdd2)需要先将rdd1和rdd2中的元素聚合在一起,然后使用笛卡儿积操作生成关联后的结果,在这个过程中会生成多个RDD。Spark依据这些子操作的顺序,将生成的多个RDD串联在一起,而且只返回给用户最后生成的RDD。这就是Spark实际创建出的RDD个数比我们想象的多一些的原因。
我们在第2章中看到RDD的类型有多种,如ParallelCollectionRDD、MapPartitionsRDD、ShuffledRDD等。为什么会有这么多不同类型的RDD,应该产生哪些RDD?虽然我们用RDD来对输入/输出、中间数据进行统一抽象,但这些数据本身可能具有不同的类型,而且是由不同的计算逻辑得到的,可能具有不同的依赖关系。因此,我们需要多种类型的RDD来表示这些不同的数据类型、不同的计算逻辑,以及不同的数据依赖。Spark实际产生的RDD类型和个数与transformation()的计算逻辑有关,官网上[69]也给出了典型的transformation()操作及其创建的RDD。然而,只看官网上的解释,很难理解某些操作的真正含义,我们会在3.3节中通过图示详细介绍每个操作的含义及产生的RDD。
3.2.2 如何建立RDD之间的数据依赖关系
我们已经知道transformation()操作会形成新的RDD,那么接下来的问题就是如何建立RDD之间的数据依赖关系?数据依赖关系包括两方面:一方面是RDD之间的依赖关系,如一些transformation()会对多个RDD进行操作,则需要建立这些RDD之间的关系。另一方面是RDD本身具有分区特性,需要建立RDD自身分区之间的关联关系。具体地,我们需要解决以下3个问题。
- 如何建立RDD之间的数据依赖关系?例如,生成的RDD是依赖于一个parent RDD,还是多个parent RDD?
- 新生成的RDD应该包含多少个分区?
- 新生成的RDD与其parent RDD中的分区间是什么依赖关系?是依赖parent RDD中的一个分区还是多个分区呢?
第1个问题可以很自然地解决,对于一元操作,如rdd2 = rdd1.transformation()可以确定rdd2只依赖rdd1,所以关联关系是“rdd1 ⇒ rdd2”。对于二元操作,如rdd3 = rdd1.join(rdd2),可以确定rdd3同时依赖rdd1和rdd2,关联关系是“(rdd1, rdd2) ⇒ rdd3”。二元以上的操作可以类比二元操作。
第2个问题是如何确定新生成的RDD的分区个数?在Spark中,新生成的RDD的分区个数由用户和parent RDD共同决定,对于一些transformation(),如join()操作,我们可以指定其生成的分区的个数,如果个数不指定,则一般取其parent RDD的分区个数最大值。还有一些操作,如map(),其生成的RDD的分区个数与数据源的分区个数相同,会在后面详细讨论。
第3个问题比较复杂,分区之间的依赖关系既与transformation()的语义有关,也与RDD的分区个数有关。例如,在执行rdd2 = rdd1.map()时,map()对rdd1的每个分区中的每个元素进行计算,可以得到新的元素,类似一一映射,所以并不需要改变分区个数,即rdd2的每个分区唯一依赖rdd1中对应的一个分区。而对于groupByKey()之类的聚合操作,在计算时需要对parent RDD中各个分区的元素进行计算,需要改变分区之间的依赖关系,使得RDD中的每个分区依赖其parent RDD中的多个分区,后面会详细展示。
那么Spark是怎么设计出一个通用的方法来解决第3个问题,即建立分区之间的依赖关系的呢?
理论上,分区之间的数据依赖关系可以灵活自定义,如一一映射、多对一映射、多对多映射或者任意映射等。但实际上,常见数据操作的数据依赖关系具有一定的规律,Spark通过总结这些数据操作的数据依赖关系,将其分为两大类,具体如下所述。
1)窄依赖(NarrowDependency)
窄依赖的官方解释是:“Base class for dependencies where each partition of the child RDD depends on a small number of partitions of the parent RDD. Narrow dependencies allow for pipelined execution。”
中文意思:“如果新生成的child RDD中每个分区都依赖parent RDD中的一部分分区,那么这个分区依赖关系被称为NarrowDependency。”
RDD及其分区之间的数据依赖关系类型如图3.2所示。窄依赖可以进一步细分为4种依赖。
图3.2 RDD及其分区之间的数据依赖关系类型(因文本提取无法显示图片,此处保留文字描述:图中展示了四种窄依赖关系和一种宽依赖关系。窄依赖包括:OneToOneDependency(一对一)、RangeDependency(区域)、ManyToOneDependency(多对一)、ManyToManyDependency(多对多)。宽依赖为ShuffleDependency(Shuffle依赖)。每个分支展示了parent RDD的分区和child RDD分区之间的连线关系。)
- 一对一依赖(OneToOneDependency):一对一依赖表示child RDD和parent RDD中的分区个数相同,并存在一一映射关系。典型的
transformation()包括map()、filter()等。 - 区域依赖(RangeDependency):表示child RDD和parent RDD的分区经过区域化后存在一一映射关系。典型的
transformation()包括union()等。 - 多对一依赖(ManyToOneDependency):表示child RDD中的一个分区同时依赖多个parent RDD中的分区。典型的
transformation()包括具有特殊性质的cogroup()、join()等,这个特殊性质将在3.3节中详细介绍。 - 多对多依赖(ManyToManyDependency):表示child RDD中的一个分区依赖parent RDD中的多个分区,同时parent RDD中的一个分区被child RDD中的多个分区依赖。典型的
transformation()是cartesian()。
注意:为了区别不同种类的依赖关系,本书定义了两种新的窄依赖关系,即ManyToOneDependency和ManyToManyDependency,实际上,在Spark代码中没有对这两种依赖关系进行命名,只是统称为NarrowDependency。另外,至于窄依赖为什么可以方便地进行流水线执行,将在下一章中介绍。
2)宽依赖(ShuffleDependency)
宽依赖的官方解释是:“Represents a dependency on the output of a shuffle stage。”这个解释是从实现角度来讲的,如果从数据流角度解释,宽依赖表示新生成的child RDD中的分区依赖parent RDD中的每个分区的一部分。什么是“依赖parent RDD中的每个分区的一部分”呢?我们对比图3.2中的ManyToManyDependency和ShuffleDependency,发现ShuffleDependency中RDD 2的每个分区虽然依赖RDD 1中的所有分区,但只依赖这些分区中id为1或2的部分,而ManyToManyDependency中RDD 2的每个分区依赖RDD 1中每个分区的所有部分。实际上,在ManyToManyDependency中,RDD 1中每个分区被依赖了2次,而在ShuffleDependency中每个分区只被依赖了1次。通常,parent RDD中的分区只需要被使用(处理)1次,因此ShuffleDependency更加常用。与NarrowDependency类似,ShuffleDependency也包含多个parent RDD的情况。在图3.2及后面的图中,我们用红色箭头来表示ShuffleDependency。
总的来说,窄依赖、宽依赖的区别是child RDD的各个分区是否完全依赖parent RDD的一个或者多个分区。根据数据操作语义和分区个数,Spark可以在生成逻辑处理流程时就明确child RDD是否需要parent RDD的一个或多个分区的全部数据。如果parent RDD的一个或者多个分区中的数据需要全部流入child RDD的某一个或者多个分区,则是窄依赖。如果parent RDD分区中的数据需要一部分流入child RDD的某一个分区,另外一部分流入child RDD的另外分区,则是宽依赖。
读者可能会问,“对数据依赖(Dependency)进行分类有什么用处”?这样做首先可以明确RDD分区之间的数据依赖关系,在执行时Spark可以确定从哪里获取数据,输出数据到哪里。其次,对数据依赖进行分类有利于生成物理执行计划。NarrowDependency在执行时可以在同一个阶段进行流水线(pipeline)操作,不需要进行Shuffle,而ShuffleDependency需要进行Shuffle(将在下一章的物理执行计划中详细介绍)。最后,对数据依赖进行分类有利于代码实现,如OneToOneDependency可以采用一种实现方式,而ShuffleDependency采用另一种实现方式。这样,Spark可以根据transformation()操作的计算逻辑选择合适的数据依赖进行实现。
了解RDD之间的分区依赖关系后,我们还需要解决的一个问题是如何对RDD内部的数据进行分区?常用的数据分区方法(Partitioner)包括3种:水平划分、Hash划分(HashPartitioner)和Range划分(RangePartitioner)。Spark采用了这3种分区方法,具体如下所述。
(1)水平划分:按照record的索引进行划分。例如,我们经常使用的sparkContext.parallelize(list(1, 2, 3, 4, 5, 6, 7, 8, 9), 3),就是按照元素的下标划分,(1,2,3)为一组,(4,5,6)为一组,(7,8,9)为一组。这种划分方式经常用于输入数据的划分,如使用Spark处理大数据时,我们先将输入数据上传到HDFS上,HDFS自动对数据进行水平划分,也就是按照128MB为单位将输入数据划分为很多个小数据块(block),之后每个Spark task可以只处理一个数据块。
(2)Hash划分(HashPartitioner):使用record的Hash值来对数据进行划分,该划分方法的好处是只需要知道分区个数,就能将数据确定性地划分到某个分区。在水平划分中,由于每个RDD中的元素数目和排列顺序不固定,同一个元素在不同RDD中可能被划分到不同分区。而使用HashPartitioner,可以根据元素的Hash值,确定性地得出该元素的分区。该划分方法经常被用于数据Shuffle阶段。
(3)Range划分(RangePartitioner):该划分方法一般适用于排序任务,核心思想是按照元素的大小关系将其划分到不同分区,每个分区表示一个数据区域。例如,我们想对一个数组进行Range划分经常采用抽样方法来估算数据区域边界。
3.2.3 如何计算RDD中的数据
在上面两小节中,我们理解了如何生成RDD,以及建立RDD之间的数据依赖关系,但还有一个问题是,如何计算RDD中的数据?RDD中的每个分区中包含n条数据,我们需要计算其中的每条数据,那么怎么计算这些数据呢?
在确定了数据依赖关系后,相当于我们知道了child RDD中每个分区的输入数据是什么,那么只需要使用transformation(func)处理这些输入数据,将生成的数据推送到child RDD中对应的分区即可。在普通程序中,我们得到输入数据后,可以写任意的控制逻辑程序进行处理。例如,输入一个数组,我们可以对数组进行前向迭代、后向迭代或者循环处理等。然而,Spark中的大多数transformation()类似数学中的映射函数,具有固定的计算方式(控制流),如map(func)操作需要每读入一个record,就进行处理,然后输出一个record。reduceByKey(func)操作中的func对中间结果和下一个record进行聚合计算并输出结果。当然,有些大数据应用需要更灵活的控制流,Spark也提供了一些类似普通程序的操作,如mapPartitions()可以对分区中的数据进行多次操作后再输出结果。
图3.3展示了数据操作map()和mapPartitions()的区别,rdd2 = rdd1.map(func)和rdd2 = rdd1.mapPartitions(func)都会生成一个新的rdd2,而且rdd2和rdd1之间是OneToOneDependency,不同的是,两个func的控制流不一样。假设rdd1中某个分区的数据是[1, 2, 3, 4, 5],rdd2 = rdd1.map(func)的计算逻辑类似于下面的单机程序:
// 伪代码示例
val rdd2 = new ArrayList[Int]()
for (record <- rdd1) {
rdd2.add(func(record))
}rdd2 = rdd1.mapPartitions(func)的计算逻辑类似于下面的单机程序:
// 伪代码示例
val rdd2 = func(rdd1.iterator) // func接收整个分区的迭代器,可进行多次操作后输出结果Spark中
mapPartitions()的计算逻辑更接近Hadoop MapReduce中的map()和cleanup()函数。在Hadoop MapReduce中,map()函数对每个到来的<K,V>record都进行处理,等对这些record处理完成后,使用cleanup()对处理结果进行集中输出。同样,Spark中的mapPartitions()可以在对分区中所有record处理后,再集中输出。
,经过map(func)后,每个元素单独经过func,输出新的分区[2,3,4,5,6](假设func(x)=x+1);右侧展示mapPartitions(func),func接收整个分区的迭代器,可以批量处理,例如先排序再映射,输出结果可能不同。两个操作都保持OneToOneDependency关系。)
至此,我们了解了逻辑处理流程的生成过程,接下来我们详细讨论常用的transformation()的语义、计算逻辑、产生的RDD及数据依赖关系,理解这些知识,将有助于开发Spark应用,并进行性能调优等。
3.3 常用 transformation() 数据操作
本节深入探讨常用 transformation() 操作的逻辑处理流程。针对每个操作,先简要介绍其语义,再给出逻辑处理流程图和示例代码,最后分析其特性与使用建议。
map() 和 mapValues() 操作
逻辑处理流程如图 3.4 所示。
图3.4
map()和mapValues()操作的逻辑处理流程示例:图中展示两个输入分区经过map()或mapValues()后各自生成一个对应的输出分区,每个分区内的记录逐一被转换,输出分区数与输入相同。
map() 示例代码:
// 假设 rdd1 为 RDD[(Int, String)]
val rdd2 = rdd1.map{ case (k, v) => (k, v.toUpperCase) }mapValues() 示例代码:
val rdd2 = rdd1.mapValues(_.toUpperCase)map() 和 mapValues() 均生成 MapPartitionsRDD,其数据依赖关系均为 OneToOneDependency。
filter() 和 filterByRange() 操作
逻辑处理流程如图 3.5 所示。
图3.5
filter()和filterByRange()操作的逻辑处理流程示例:每个分区中的记录经过条件筛选,符合条件的记录保留并传入下一 RDD,分区数不变。
filter() 示例代码(输出 rdd1 中 Key 为偶数的 record):
val rdd2 = rdd1.filter { case (k, _) => k % 2 == 0 }filterByRange() 示例代码(输出 rdd1 中 Key 在 [2, 4] 中的 record):
val rdd2 = rdd1.filterByRange(2, 4)两者均生成 MapPartitionsRDD,依赖关系为 OneToOneDependency。
flatMap() 和 flatMapValues() 操作
逻辑处理流程如图 3.6 所示。
图3.6
flatMap()和flatMapValues()操作的逻辑处理流程示例:每个输入记录可展开为 0 个或多个输出记录,分区内记录数可能变化,但分区结构保持不变。
flatMap() 示例代码(分词程序):
val words = linesRDD.flatMap(_.split(" "))flatMapValues() 示例代码(分词程序):
val pairs = rdd1.flatMapValues(_.split(" "))两者均生成 MapPartitionsRDD,依赖关系为 OneToOneDependency。
sample() 和 sampleByKey() 操作
逻辑处理流程如图 3.7 所示。
图3.7
sample()和sampleByKey()操作的逻辑处理流程示例:sample()对每个分区独立抽样,sampleByKey()则按 Key 设定不同抽样概率。图中显示sample(false)使用伯努利抽样,每个分区随机保留部分记录;sampleByKey()则根据 Key 的概率分布抽取。
sample() 生成 PartitionwiseSampledRDD,sampleByKey() 生成 MapPartitionsRDD,依赖关系均为 OneToOneDependency。
sample(false)采用伯努利抽样,每个 record 有fraction × 100%概率被选中。sample(true)采用泊松分布抽样,抽样结果可能包含多于原记录数的副本。sampleByKey()允许为每个 Key 单独设置抽取概率。
sample() 示例代码(无放回模式和有放回模式抽样):
val sampled1 = rdd1.sample(false, 0.5) // 无放回,约50%数据
val sampled2 = rdd1.sample(true, 0.5) // 有放回,泊松抽样sampleByKey() 示例代码(无放回模式和有放回模式抽样):
import org.apache.spark.SparkContext._
val fractions = Map(1 -> 0.8, 2 -> 0.5) // Key=1 抽80%,Key=2 抽50%
val sampled = rdd1.sampleByKey(false, fractions)两者均生成 MapPartitionsRDD,依赖关系为 OneToOneDependency。
mapPartitions() 和 mapPartitionsWithIndex() 操作
逻辑处理流程如图 3.8 所示。
图3.8
mapPartitions()和mapPartitionsWithIndex()操作的逻辑处理流程示例:每个分区作为一个整体被处理,一次可访问分区内所有记录,并输出任意大小的新分区。
mapPartitions() 示例代码(计算每个分区中奇数的和与偶数的和):
val result = rdd1.mapPartitions(iter => {
var oddSum = 0
var evenSum = 0
for ((k, v) <- iter) {
if (v % 2 == 0) evenSum += v else oddSum += v
}
Iterator(("oddSum", oddSum), ("evenSum", evenSum))
})mapPartitionsWithIndex() 示例代码(同样计算,但包含分区索引):
val result = rdd1.mapPartitionsWithIndex((index, iter) => {
var oddSum = 0
var evenSum = 0
for ((k, v) <- iter) {
if (v % 2 == 0) evenSum += v else oddSum += v
}
Iterator((index, "odd", oddSum), (index, "even", evenSum))
})实用技巧:使用 mapPartitionsWithIndex() 可输出每个分区的索引及其包含的记录,便于调试:
rdd1.mapPartitionsWithIndex((idx, iter) => Iterator((idx, iter.toList)))mapPartitions() 和 mapPartitionsWithIndex() 更贴近过程式编程:
- 可在处理一组数据时持有中间结果。
- 可输出任意大小、任意类型的数据。
- 适合实现数据库操作:例如在
mapPartitions()中建立一次数据库连接,然后逐条插入记录。相比之下,map()会对每条 record 创建连接,造成资源浪费。
partitionBy() 操作
逻辑处理流程如图 3.9 所示。
图3.9
partitionBy()的逻辑处理流程示例:
- 左图:使用
HashPartitioner重新分区。Key=2 和 Key=4 的记录被分到同个分区(partition 1),Key=1 和 Key=3 的记录被分到 partition 2。- 右图:使用
RangePartitioner重新分区。Key 值较小的记录集中在 partition 1,较大的集中在 partition 2。注意RangePartitioner不保证分区内数据有序。
示例代码:
val rdd2 = rdd1.partitionBy(new HashPartitioner(2))
// 或
val rdd2 = rdd1.partitionBy(new RangePartitioner(2, rdd1))groupByKey() 操作
逻辑处理流程如图 3.10 所示。
图3.10
groupByKey()操作的逻辑处理流程示例:
- 左图:
rdd1水平划分(3 个分区),rdd2使用 Hash 划分(2 个分区)。由于 partitioner 不同,需通过ShuffleDependency重新分配数据。- 右图:
rdd1已使用 Hash 划分且分区数与rdd2一致,则仅需在每个分区内执行groupByKey(),无需ShuffleDependency。
groupByKey() 示例代码(对 <K, V> 类型数据进行聚合):
val grouped = rdd1.groupByKey() // 默认使用 HashPartitioner,分区数等于父RDD无 Shuffle 的 groupByKey() 示例代码(当 parent RDD 与 child RDD 的 partitioner 一致且分区数相同时):
val rdd1 = rdd1.partitionBy(new HashPartitioner(2))
val grouped = rdd1.groupByKey() // 无需 Shuffle,仅窄依赖关键特性:
- 类似于 SQL 中的
GROUP BY,但并行执行。 - 引入
ShuffleDependency,可灵活调整分区数(通过numPartitions参数,默认与 parent 相同)。 - 缺点:Shuffle 时产生大量中间数据,内存占用高。多数情况下应优先使用
reduceByKey()。
reduceByKey() 操作
逻辑处理流程如图 3.11 所示。
图3.11
reduceByKey()操作的逻辑处理流程示例:
- 第1步:在
ShuffleDependency之前,对每个分区内的数据执行本地combine()操作(亦称 mini-reduce / map端 combine)。将相同 Key 的 Value 使用func聚合,这一步不形成新 RDD。- 第2步:
reduceByKey()生成ShuffledRDD,将来自不同分区的相同 Key 的数据再次使用func聚合。
示例代码(对 <K, V> 类型数据进行聚合):
val reduced = rdd1.reduceByKey((v1, v2) => v1 + v2)注意:
func必须满足交换律和结合律(因为 Shuffle 不保证数据到达顺序)。- Key 不宜使用复杂类型(如 Array),因为 Shuffle 依赖 Hash 划分。
reduceByKey()的combine()与reduce()使用相同逻辑,灵活性低于aggregateByKey()。- 相比
groupByKey(),reduceByKey()在 Shuffle 前进行预聚合,大幅减少数据传输量和内存占用,效率更高。
aggregateByKey() 操作
逻辑处理流程如图 3.12 所示。
图3.12
aggregateByKey()操作的逻辑处理流程示例:
- 对每个分区,首先使用
seqOp将分区内的<K, V>记录聚合为<K, V'>。- 然后在
ShuffledRDD中,使用combOp将不同分区中相同 Key 的<K, V'>进一步聚合。- 图中示例:第3个分区中对3个 record 执行
seqOp计算;在 ShuffledRDD 中对 Key=2 的记录执行combOp。
为什么需要 aggregateByKey()?
reduceByKey() 的 combine() 和 reduce() 使用同一函数,灵活性有限。例如,若希望在 combine() 中用 sum,在 reduce() 中用 max,则 reduceByKey() 不能满足。aggregateByKey() 将两个函数分开:seqOp(分区内聚合)和 combOp(跨分区聚合),并允许提供初始值 zeroValue。
示例代码(使用下划线和 @ 符号连接字符串):
val result = inputRDD.aggregateByKey("x")(
seqOp = (u, t) => u + "_" + t,
combOp = (u1, u2) => u1 + "@" + u2
)应用示例:
- FPGrowthModel 中使用
aggregateByKey()。 - NaiveBayes 代码中也使用
aggregateByKey()。
aggregateByKey() 与 reduceByKey() 的另一个区别:seqOp 的输入 Value 与输出类型可以不同(zeroValue 类型与输出相同,但与输入可以不同),而 reduceByKey() 要求输入输出类型一致。
补充:当 seqOp 处理的中间数据量很大并导致 Shuffle spill 时,Spark 会在 map 端自动执行 combOp(),将磁盘上经 seqOp 处理的数据与内存中的数据进行合并。reduceByKey() 可视为 seqOp = combOp = func 的特例。
combineByKey() 操作
逻辑处理流程如图 3.13 所示。
图3.13
combineByKey()的逻辑处理流程示例:先通过createCombiner为每个 record 创建初始值,然后mergeValue在分区内合并,最后mergeCombiners跨分区合并。
为什么需要 combineByKey()?它与 aggregateByKey() 本质相似,aggregateByKey() 基于 combineByKey() 实现:
zeroValue→createCombiner(函数而非固定值)seqOp→mergeValuecombOp→mergeCombiners
区别在于 createCombiner 是一个初始化函数,灵活性高于固定值 zeroValue。例如,可根据每条记录的 Value 定制初始值。
示例代码:
val result = rdd1.combineByKey(
createCombiner = (v: Int) => v.toString,
mergeValue = (c: String, v: Int) => c + "@" + v,
mergeCombiners = (c1: String, c2: String) => c1 + "#" + c2
)foldByKey() 操作
逻辑处理流程如图 3.14 所示。
图3.14
foldByKey()的逻辑处理流程示例:与aggregateByKey()类似,但seqOp = combOp = func,且提供初始值zeroValue。功能介于reduceByKey()与aggregateByKey()之间。
示例代码:
val result = rdd1.foldByKey("x")((u, t) => u + "_" + t)cogroup() / groupWith() 操作
逻辑处理流程如图 3.15 所示。
图3.15
cogroup()操作的逻辑处理流程示例:
- 上图:
CoGroupedRDD与rdd1和rdd2之间均为ShuffleDependency(两者 partitioner 均不同)。- 下图:
rdd1与CoGroupedRDD使用相同的HashPartitioner且分区数相同,因此为OneToOneDependency;而rdd2仍需要ShuffleDependency。
cogroup() 与 groupByKey() 的区别:cogroup() 可以聚合多个 RDD(最多 4 个)。其生成的 CoGroupedRDD 与多个 parent RDD 存在依赖关系,依赖类型取决于每个 parent 的 partitioner 与分区数。
对应上图示例代码(将 inputRDD1 与 inputRDD2 数据聚合):
val cogrouped = inputRDD1.cogroup(inputRDD2)对应下图示例代码(当 rdd1 已使用 `对应下图示例代码:
// 当 rdd1 已使用 HashPartitioner(2) 且分区数与 CoGroupedRDD 一致时
val rdd1 = sc.parallelize(Seq((1,"a"),(2,"b"),(3,"c"))).partitionBy(new HashPartitioner(2))
val rdd2 = sc.parallelize(Seq((1,"x"),(2,"y"),(4,"z")))
val cogrouped = rdd1.cogroup(rdd2)结论:Spark 决定 RDD 间数据依赖时,不仅考虑 transformation() 的计算逻辑,还考虑 child RDD 与 parent RDD 的分区信息。当分区个数和 partitioner 一致时,parent RDD 中的数据可直接流入 child RDD,无需 ShuffleDependency,从而避免数据传输,提升执行效率。
cogroup() 最多支持 4 个 RDD 同时进行 cogroup,例如:rdd5 = rdd1.cogroup(rdd2, rdd3, rdd4)。cogroup() 实际生成两个 RDD:CoGroupedRDD 负责将数据聚合在一起,MapPartitionsRDD 将数据类型转换为 CompactBuffer(类似于 Java 的 ArrayList)。当 cogroup 聚合的 RDD 包含大量数据时,Shuffle 这些中间数据会增加网络传输,且需要大量内存存储聚合后的数据,效率较低。
join() 操作
示例代码(将 inputRDD1 中的数据与 inputRDD2 中的数据关联在一起):
val joined = inputRDD1.join(inputRDD2)逻辑处理流程如图 3.16 所示。join() 操作建立在 cogroup() 之上:首先利用 CoGroupedRDD 将相同 Key 的 Value 聚合在一起,形成 <K, [list(V), list(W)]>,然后对 [list(V), list(W)] 进行笛卡儿积计算并输出结果 <K, (V, W)>(此处 list 表示 CompactBuffer)。实际实现中,join() 先调用 cogroup() 生成 CoGroupedRDD 和 MapPartitionsRDD(为节省空间,图中将两者画在一起),然后计算 MapPartitionsRDD 中 [list(V), list(W)] 的笛卡儿积,生成新的 MapPartitionsRDD。
在 cogroup() 中已介绍,child RDD 与 parent RDD 之间的依赖关系与 partitioner 类型和分区个数相关。对于示例代码,Spark 在不同情况下可生成 4 种不同的逻辑处理流程,如图 3.16 所示。
图 3.16
join()操作的不同逻辑处理流程示例:
- 第 1 个图:
rdd1、rdd2、CoGroupedRDD具有不同的partitioner。直接执行示例代码,由于各 RDD 的partitioner不同,相同 Key 的 record 分布在不同的分区中,因此需要ShuffleDependency使其聚合。- 第 2 个图:
rdd1与CoGroupedRDD的partitioner均为HashPartitioner(3)。若只去掉第 1 个注释代码(即预先对rdd1做 Hash 分区),则rdd1的数据分布与CoGroupedRDD一致,只需OneToOneDependency。- 第 3 个图:
rdd2与CoGroupedRDD的partitioner均为HashPartitioner(3)。若只去掉第 2 个注释代码,原理与第 2 个图相同。- 第 4 个图:
rdd1、rdd2、CoGroupedRDD的partitioner均为HashPartitioner(3)。若同时去掉两个注释代码,整个join()操作不存在ShuffleDependency,下一章将看到该逻辑处理流程图不会产生 Shuffle 阶段。
cartesian() 操作
示例代码(计算 inputRDD1 与 inputRDD2 中数据的笛卡儿积):
val cartesianRDD = inputRDD1.cartesian(inputRDD2)逻辑处理流程如图 3.17 所示。假设 rdd1 分区数为 m,rdd2 分区数为 n,cartesian() 操作生成 m × n 个分区。rdd1 和 rdd2 中的分区两两组合,形成 CartesianRDD 中的一个分区,该分区中的数据是相应两个分区中数据的笛卡儿积。
图 3.17
cartesian()操作的逻辑处理流程示例:展示rdd1(3 个分区)与rdd2(2 个分区)组合生成 6 个分区的过程。
cartesian() 形成的数据依赖关系虽较复杂,但归属于多对多的 NarrowDependency,而非 ShuffleDependency。
sortByKey() 操作
示例代码(将 inputRDD 中的数据按照 Key 排序):
val sortedRDD = inputRDD.sortByKey()逻辑处理流程如图 3.18 所示。sortByKey() 首先将 rdd1 中不同 Key 的 record 分发到 ShuffledRDD 的不同分区中,然后在每个分区内按照 Key 对 record 排序,形成的数据依赖关系为 ShuffleDependency。可以看到在 rdd2 中,所有 record 均按 Key 排序,但相同 Key 下 Value 是无序的。sortByKey() 与 groupByKey() 一样,不使用 map 端的 combine。
图 3.18
sortByKey()操作的逻辑处理流程示例:数据经过 Shuffle 后进入ShuffledRDD,每个分区内按 Key 排序,输出全局有序(按 Key)的结果。
与 reduceByKey() 等使用 Hash 划分不同,sortByKey() 为了保证全局有序(按 Key 排序),采用 Range 划分。Range 划分可以保证在生成的 RDD 中,partition 1 的所有 record 的 Key 均小于(或均大于)partition 2 中所有 record 的 Key。
深入问题:sortByKey() 的缺点是 Key 有序但 Value 无序,如何使 Value 也有序?
在 Hadoop MapReduce 中可使用 SecondarySort:将 Value 放入 Key 中,如 <Key, Value> → <(Key, Value), null>,并重新定义 Key 的排序函数。Spark 中有两种方法:
- 类似 Hadoop 的方法:先用
map()将<Key, Value>转换为<(Key, Value), null>,将(Key, Value)定义为新 class 并重写compare()排序函数,再用sortByKey()排序,最后只输出 Key 即可。 - 更复杂的方法:先用
groupByKey()聚合为<Key, list(Value)>,再使用rdd.mapValues(sort function)对list(Value)排序。
coalesce() 操作
示例代码:
val coalescedRDD = rdd1.coalesce(2)
val coalescedWithShuffle = rdd1.coalesce(2, shuffle = true)逻辑处理流程如图 3.19 所示。coalesce() 可改变 RDD 的分区个数,不同参数下具有不同逻辑处理流程,具体分为 4 种情况:
图 3.19
coalesce()操作的不同逻辑处理流程示例:
- ① 减少分区个数(无 Shuffle):
rdd1有 5 个分区,使用coalesce(2)时,Spark 将相邻分区直接合并,形成rdd2的 2 个分区,依赖关系为多对一的NarrowDependency。缺点:若rdd1各分区数据量差别大,直接合并易导致数据倾斜。- ② 增加分区个数(无 Shuffle):使用
coalesce(6)试图将分区数增至 6,但生成的rdd2分区数仍为 5,无变化。因为coalesce()默认使用NarrowDependency,不能将一个分区拆分为多份。- ③ 使用 Shuffle 来减少分区个数:
coalesce(2, shuffle = true)可解决数据倾斜问题。Spark 随机打乱数据,使各分区数据均衡。方法:为每个 record 添加一个特殊 Key(Int 类型,从[0, numPartitions)中随机生成),然后根据 Key 的 Hash 值分发数据到目标分区,最后去掉 Key(见图中最后的MapPartitionsRDD)。- ④ 使用 Shuffle 来增加分区个数:
coalesce(6, shuffle = true)通过ShuffleDependency可对分区进行拆分和重新组合,解决分区数无法增加的问题。
repartition() 操作
逻辑处理流程图与图 3.19 中的第 3 个和第 4 个图相同。实际上 repartition() 等价于 coalesce(numPartitions, shuffle = true),专用于改变分区数(增加或减少均使用 Shuffle)。
repartitionAndSortWithinPartitions() 操作
示例代码(对 inputRDD 中的数据重新划分并在分区内排序):
val repartitionedSorted = inputRDD.repartitionAndSortWithinPartitions(new HashPartitioner(3))逻辑处理流程如图 3.20 左图所示。repartitionAndSortWithinPartitions() 首先使用用户定义的 partitioner 将数据分发到不同分区,然后对每个分区中的数据按照 Key 排序。与 repartition() 相比,它多了分区内排序功能。与 sortByKey() 相比(图 3.20 右图),repartitionAndSortWithinPartitions() 的 partitioner 可由用户定义,不一定是 sortByKey() 默认的 RangePartitioner。因此,其结果不保证 Key 全局有序。
图 3.20
repartitionAndSortWithinPartitions()与sortByKey()的逻辑流程对比:左图使用自定义HashPartitioner分区后排序,右图使用RangePartitioner全局排序。
intersection() 操作
示例代码(计算两个 RDD 的交集):
val intersectionRDD = rdd1.intersection(rdd2)核心思想:先利用 cogroup() 将 rdd1 和 rdd2 的相同 record 聚合在一起,然后过滤出在两个 RDD 中都存在的 record。如图 3.21 所示,具体方法是:先将 rdd1 中的 record 转化为 <K, V> 类型(V 固定为 null),然后将 rdd1 和 rdd2 的 record 聚合在一起,过滤掉出现 () 的 record(即只存在于一个 RDD 中的 record),最后只保留 Key,得到交集元素。
图 3.21
intersection()的逻辑处理流程示例:数据经cogroup()后,过滤条件为两个CompactBuffer均非空,输出 Key。
distinct() 操作
示例代码(对 inputRDD 中的数据进行去重):
val distinctRDD = inputRDD.distinct()逻辑处理流程如图 3.22 所示。与 intersection() 类似,distinct() 先将数据转化为 <K, V> 类型(Value 为 null),然后使用 reduceByKey() 将这些 record 聚合在一起,最后使用 map() 只输出 Key,即得到去重后的元素。
图 3.22
distinct()的逻辑处理流程示例:数据经过reduceByKey()合并相同 Key 后,再通过map()提取 Key。
3.0 第3章 Spark逻辑处理流程
- 页面范围:58-130
- 本部分:3/4
distinct() 操作
示例代码(对 inputRDD 中的数据进行去重):
inputRDD.distinct()逻辑处理流程:如图 3.22 所示,与 intersection() 操作类似,distinct() 操作先将数据转化为 <K, V> 类型,其中 Value 为 null 类型,然后使用 reduceByKey() 将这些 record 聚合在一起,最后使用 map() 只输出 Key,即可得到去重后的元素。
Figure
图3.22 distinct() 的逻辑处理流程示例
union() 操作
示例代码1:
rdd1.union(rdd2) // 两个RDD使用不同分区器示例代码2:
rdd1.union(rdd2) // 两个RDD都使用HashPartitioner且分区数相同逻辑处理流程:如图 3.23 所示,union() 将 rdd1 和 rdd2 中的 record 组合在一起,形成新的 rdd3。形成的数据依赖关系是 RangeDependency。union() 形成的逻辑执行流程有以下两种:
- 第一种:如图3.23中的左图和示例代码1所示,rdd1 和 rdd2 是两个非空的 RDD,且两者的 partitioner 不一致,合并后的 rdd3 为 UnionRDD,其分区个数是 rdd1 和 rdd2 的分区个数之和,rdd3 的每个分区也一一对应 rdd1 或 rdd2 中相应的分区。
- 第二种:如图3.23中的右图和示例代码2所示,rdd1 和 rdd2 是两个非空的 RDD,且两者都使用 Hash 划分,得到 rdd1′ 和 rdd2′。因此,rdd1′ 和 rdd2′ 的 partitioner 是一致的,都是 Hash 划分且分区个数相同。rdd1′ 和 rdd2′ 合并后的 rdd3 为 PartitionerAwareUnionRDD,其分区个数与 rdd1 和 rdd2 的分区个数相同,且 rdd3 中的每个分区的数据是 rdd1′ 和 rdd2′ 对应分区合并后的结果。
Figure
图3.23 union() 的逻辑处理流程示例
zip() 操作
示例代码:
rdd1.zip(rdd2)逻辑处理流程:如图 3.24 所示,zip() 操作像拉链一样将 rdd1 和 rdd2 中的 record 按照一一对应关系连接在一起,形成新的 <K, V> record,生成的 RDD 名为 ZippedPartitionsRDD2,RDD2 的意思是对两个 RDD 进行连接。
Figure
图3.24 zip() 的逻辑处理流程示例
zipPartitions() 操作
示例代码:
rdd1.zipPartitions(rdd2){ (iter1, iter2) => ... }逻辑处理流程:如图 3.25 所示,zipPartitions() 操作首先像拉链一样将 rdd1 和 rdd2 中的分区(而非分区中的每个 record)按照一一对应关系连接在一起,并提供两个迭代器 rdd1Iter 和 rdd2Iter,来分别迭代每个分区中来自 rdd1 和 rdd2 的 record。例如,可以通过 rdd1Iter.next() 来依次访问来自 rdd1 的两个 record:(2, c) 和 (3, d)。在示例中,我们同时迭代 rdd1 和 rdd2 中的 record,并使用下划线连接索引相同的 record,如连接 (4, e) 和 (2, h) 得到 (4, e)_(2, h)。zipPartitions() 生成的 RDD 的类型同样是 ZippedPartitionsRDD2。
Figure
图3.25 zipPartitions() 的逻辑处理流程示例
zipPartitions() 可以同时连接 2、3 或者 4 个 RDD,如 rdd5 = rdd1.zipPartitions(rdd2, rdd3, rdd4)。zipPartitions() 的要求是参与连接的 RDD 都包含相同的分区个数,但不要求每个分区中的 record 数目相同。zipPartitions() 还有一个参数是 preservePartitioning,其默认值为 false,意即 zipPartitions() 生成的 RDD(如图3.25中的 rdd3)继承 parent RDD 的 partitioner,因为继承 partitioner 可以提升后续操作的执行效率(如避免 Shuffle 阶段)。假设 rdd1 和 rdd2 的 partitioner 都为 HashPartitioner(3),那么 preservePartitioning = true,表示 rdd3 的 partitioner 仍为 HashPartitioner(3);如果 preservePartitioning = false,那么 rdd3 的 partitioner = None,也就是 rdd3 被 Spark 认为是随机划分的。但这个 preservePartitioning 参数限制太强,因为参与 zipPartitions() 的 RDD 有多个,每个 RDD 的 partitioner 可能不同,仅当参与 zipPartitions() 的多个 RDD 具有相同的 partitioner 时,preservePartitioning 才有意义。
zipWithIndex() 和 zipWithUniqueId() 操作
示例代码:
rdd1.zipWithIndex()
rdd1.zipWithUniqueId()逻辑处理流程:如图 3.26 的左图所示,zipWithIndex() 对 rdd1 中的每个 record 都进行编号,编号方式是从 0 开始依次递增(跨分区)的,生成的 RDD 类型是 ZippedWithIndexRDD。
如图 3.26 的右图所示,zipWithUniqueId() 对 rdd1 中的每个 record 都进行编号,编号方式是按照 round-robin 方式,也就是像发扑克牌一样,将编号发给每个分区中的 record,不可以轮空。例如,在图 3.26 的右图中,编号 6 和 7 分别分配给了 partition1 中的第 3 个 record 和 partition2 中的第 3 个 record,但由于这两个分区中没有对应的第 3 个 record,这两个编号就作废了,接下来的编号 8 分配给了 partition3 中的第 3 个 record。zipWithUniqueId() 操作生成的 RDD 类型是 MapPartitionsRDD。
Figure
图3.26 zipWithIndex() 和 zipWithUniqueId() 的逻辑处理流程示例  给每个record分配全局递增的ID(跨分区连续),生成ZippedWithIndexRDD;右图:zipWithUniqueId() 按轮询(round-robin)方式分配唯一ID,每个分区内记录按顺序获得ID,但全局ID不连续,生成MapPartitionsRDD。)
subtractByKey() 操作
示例代码:
rdd1.subtractByKey(rdd2)逻辑处理流程:subtractByKey() 可以计算出 Key 在 rdd1 中而不在 rdd2 中的 record。如图 3.27 所示,该操作首先将 rdd1 和 rdd2 中的 <K, V> record 按 Key 聚合在一起,得到 SubtractedRDD,该过程类似 cogroup()。然后,只保留 [(a), (b)] 中 b 为 () 的 record,从而得到在 rdd1 中而不在 rdd2 中的元素。SubtractedRDD 结构和数据依赖模式都类似于 CoGroupedRDD,可以形成 OneToOneDependency 或者 ShuffleDependency,但实现比 CoGroupedRDD 更高效。
Figure
图3.27 subtractByKey() 的逻辑处理流程示例 
subtract() 操作
示例代码:
rdd1.subtract(rdd2)逻辑处理流程:subtract() 的语义与 subtractByKey() 类似,不同点是 subtract() 适用面更广,可以针对非 <K, V> 类型的 RDD。subtract() 的底层实现基于 subtractByKey() 来完成。如图 3.28 所示,先将 rdd1 和 rdd2 表示为 <K, V> record,Value 为 null,然后按照 Key 将这些 record 聚合在一起得到 SubtractedRDD,只保留 [(a), (b)] 中 b 为 () 的 record,从而得到在 rdd1 中但不在 rdd2 中的 record。
Figure
图3.28 subtract() 的逻辑处理流程示例
sortBy(func) 操作
示例代码(将 <K, V> 数据按照 Value 进行排序):
rdd1.sortBy(_._2)逻辑处理流程:sortBy(func) 与 sortByKey() 的语义类似,不同点是 sortByKey() 要求 RDD 是 <K, V> 类型,并且根据 Key 来排序,而 sortBy(func) 不对 RDD 类型作要求,只是根据每个 record 经过 func 的执行结果进行排序。sortBy(func) 基于 sortByKey() 实现。如图 3.29 所示,我们想对 rdd1 中的 <K, V> record 按照 Value 进行排序,那么我们设计的排序函数 func 为 record => record._2。为了利用 sortByKey() 对这些 record 进行排序,首先将 rdd1 中每个 record 的形式进行改变,将 <K, V> record 转化为 <V, (K, V)> record,如将 (D, 2) 转化为 <2, (D, 2)>,然后利用 sortByKey() 对转化后的 record 进行排序。最后,只保留第二项,也就是 <V, (K, V)> 中的 (K, V),即可得到排序后的 record,也就是 rdd2。
Figure
图3.29 sortBy() 的逻辑处理流程示例 
glom() 操作
示例代码:
rdd1.glom()逻辑处理流程:如图 3.30 所示,glom() 是一个简单的操作,直接将分区中的数据合并到一个 list 中。
Figure
图3.30 glom() 的逻辑处理流程示例
3.4 常用 action() 数据操作
我们知道 action() 数据操作是用来对计算结果进行后处理的,同时提交计算 job,经常在一连串 transformation() 后使用。然而,RDD 的数据操作具有各种各样的名字,我们如何判断一个操作是 action() 还是 transformation()?答案是看返回值:transformation() 操作一般返回 RDD 类型,而 action() 操作一般返回数值、数据结构(如 Map)或者不返回任何值(如写磁盘)。下面我们总结一下常用 action() 数据操作,分析这些操作如何得到最终计算结果。
count() / countByKey() / countByValue() 操作
示例代码:
rdd1.count()
rdd1.countByKey()
rdd1.countByValue()逻辑处理流程:
- count():如图 3.31 中第 1 个图所示,count() 操作首先计算每个分区中 record 的数目,然后在 Driver 端进行累加操作,得到最终结果。
- countByKey():如图 3.31 中第 2 个图所示,countByKey() 操作只统计每个 Key 出现的次数,因此首先利用 mapValues() 操作将
<K, V>record 的 Value 设置为 1(去掉原有的 Value),然后利用 reduceByKey() 统计每个 Key 出现的次数,最后汇总到 Driver 端,形成 Map。 - countByValue():如图 3.31 中第 3 个图所示,countByValue() 操作统计每个 record 出现的次数,先将 record 变为
<record, null>类型,这样转化是为了接下来使用 reduceByKey() 得到每个 record 出现的次数,最后汇总到 Driver 端,形成 Map。
Figure
图3.31 count() / countByKey() / countByValue() 的逻辑处理流程示例 
WARNING
从图3.31中可以看出,countByKey() 和 countByValue() 需要在 Driver 端存放一个 Map,当数据量比较大时,这个 Map 会超过 Driver 的内存大小。所以,在数据量较大时,建议先使用 reduceByKey() 对数据进行统计,然后将结果写入分布式文件系统(如 HDFS 等)。
collect() 和 collectAsMap() 操作
这两个操作的逻辑比较简单,都是将 RDD 中的数据直接汇总到 Driver 端,类似 count() 操作的流程图,因此不再单独给出逻辑处理流程图。collect() 将 record 直接汇总到 Driver 端,而 collectAsMap() 将 <K, V> record 都汇集到 Driver 端。在数据量较大时,两者都会造成大量内存消耗,所以需要注意内存用量。
foreach() 和 foreachPartition() 操作
示例代码:
rdd1.foreach(record => { if (record._1 >= 3) println(record) })
rdd1.foreachPartition(iter => { while (iter.hasNext) { val r = iter.next; if (r._1 >= 3) println(r) } })逻辑处理流程:如图 3.32 所示,foreach() 操作使用 func 对 rdd1 中的每个 record 进行计算,不同于 map() 操作的是,foreach() 操作一般会直接输出计算结果,并不形成新的 RDD。同理,foreachPartition() 的用法也类似于 mapPartitions(),但不形成新的 RDD。在图 3.32 中,我们自定义了 func 函数,只输出 Key ≥ 3 的 record,这些 record 被 print(打印输出)到控制台,并不形成新的 RDD。
Figure
图3.32 foreach() 和 foreachPartition() 的逻辑处理流程示例
fold() / reduce() / aggregate() 操作
逻辑处理流程:
- fold(func):如图 3.33 的第 1 个图所示,fold(func) 中的 func 的语义与 foldByKey(func) 中的 func 相同,区别是 foldByKey() 生成一个新的 RDD,而 fold() 直接计算出结果,并不生成新的 RDD。fold() 首先在 rdd1 的每个分区中计算局部结果,如
0_a_b_c,然后在 Driver 端将局部结果聚合成最终结果。需要注意的是,每次聚合时初始值zeroValue都会参与计算,而 foldByKey() 在聚合来自不同分区的 record 时并不使用初始值。 - reduce(func):如图 3.33 的第 2 个图所示,reduce(func) 的语义与去掉初始值的 fold(func) 相同。reduce(func) 可以看作是 aggregate(seqOp, combOp) 中
seqOp = combOp = func的场景。 - **aggreg(seqOp, combOp)中的 seqOp 和 combOp 的语义相同,区别是 aggregateByKey() 生成一个新的 RDD,而 aggregate() 直接计算出结果。aggregate() 使用 seqOp 在每个分区中计算局部结果,然后使用 combOp 在 Driver 端将局部结果聚合成最终结果。需要注意的是,在 aggregate() 中,seqOp 和 combOp 聚合时初始值 zeroValue 都会参与计算,而在 aggregateByKey() 中,初始值只参与 seqOp 的计算。
Figure
图3.33 fold()、reduce() 和 aggregate() 的逻辑处理流程示例 ;2. reduce: 分区内无初始值聚合,Driver再聚合;3. aggregate: 分区内seqOp聚合(带初始值),Driver端combOp聚合(带初始值)。)
示例代码:
rdd1.fold(0)(_+_)
rdd1.reduce(_+_)
rdd1.aggregate(0)(_+_, _+_)其他说明:为什么已经有 reduceByKey()、aggregateByKey() 等操作,还要定义 aggregate()、reduce() 等操作呢?答案是,有时候我们需要全局聚合。虽然 reduceByKey()、aggregateByKey() 等操作可以对每个分区中的 record,以及跨分区且具有相同 Key 的 record 进行聚合,但这些聚合都是在部分数据上(如 <K, func(list(V))> 使用 func 聚合函数)进行的,并不是针对所有 record 进行全局聚合的,即 func(<K, list(V)>)。当我们需要全局聚合结果时,需要对这些部分聚合结果 <K, func(list(V))> record 进行 merge,而这个 merge 操作就是 aggregate()、reduce() 等。这几个操作的共同问题是,当需要 merge 的部分结果很大时,数据传输量很大,而且 Driver 是单点 merge,存在效率和内存空间限制问题。为了解决这个问题,Spark 对这些聚合操作进行了优化,提出了 treeAggregate() 和 treeReduce() 操作。
treeAggregate() 和 treeReduce() 操作
treeAggregate() 的逻辑处理流程如图 3.34 所示,treeAggregate(seqOp, combOp) 的语义与 aggregate(seqOp, combOp) 的语义相同,区别是 treeAggregate() 使用树形聚合方法来优化全局聚合阶段,从而减轻了 Driver 端聚合的压力(数据传输量和内存用量)。树形聚合方法类似归并排序中的层次归并。那么如何实现这个树形聚合过程呢?如果树形聚合全部放在 Driver 端进行,则没有意义,因为没有减少数据传输量。换个角度思考,在树形聚合时,非根节点实际上是局部聚合,只有根节点是全局聚合,那么我们可以利用之前的聚合操作(如 reduceByKey()、aggregateByKey())来实现非根节点的局部聚合,而将最后的根节点聚合放在 Driver 端进行,只是我们需要为每个非根节点分配合理的数据。基于这个思想,Spark 采用 foldByKey() 来实现非根节点的聚合,并使用 fold() 来实现根节点的聚合。
具体实现过程如图 3.34 的上图所示,treeAggregate() 首先对 rdd1 中的每个分区进行局部聚合,然后不断利用 foldByKey() 进行树形聚合。因为图 3.34 的上图中只有 6 个分区,所以 depth=2 的树形聚合已经满足性能要求,如果有成百上千个分区,那么可以连续使用 foldByKey() 来进行多层(depth>2)树形聚合。由于 foldByKey() 需要 <K, V> 类型的数据,treeAggregate() 为每个 record 添加一个特殊的 Key,使得 rdd 中的数据被均分到每个非根节点进行聚合。foldByKey() 使用 ShuffleDependency,但实际上每个分区中只存在一个 record,如 <0, 0=0_1_2_3>,因此形式上是 ShuffleDependency,实际上数据传输时类似多对一的 NarrowDependency。当然,如果输入数据中的分区个数本来就很少,如图 3.34 的下图中只有 4 个分区,那么即使调用了 treeAggregate(),也会退化为类似 aggregate() 的方式进行处理。此时 treeAggregate() 与 aggregate() 的区别是,treeAggregate() 中的 zeroValue 会被多次使用(由于调用了 fold() 函数)。
Figure
图3.34 treeAggregate() 的逻辑处理流程示例
treeReduce() 的逻辑处理流程如图 3.35 所示。treeReduce() 实际上是调用 treeAggregate() 实现的,唯一区别是没有初始值 zeroValue,因此其逻辑处理流程图是简化版的 treeAggregate()。
Figure
图3.35 treeReduce() 的逻辑处理流程示例
示例代码:
rdd1.treeAggregate(0)(_+_, _+_, depth=2)
rdd1.treeReduce(_+_, depth=2)reduceByKeyLocality() 操作
示例代码:
rdd1.reduceByKeyLocally(_+_)逻辑处理流程:如图 3.36 所示,reduceByKeyLocality() 首先在 rdd1 的每个分区中对数据进行聚合,并使用 HashMap 来存储聚合结果,然后把数据汇总到 Driver 端进行全局聚合,仍然是将聚合结果存放到 HashMap 而不是 RDD 中。
Figure
图3.36 reduceByKeyLocality() 的逻辑处理流程示例
take() / first() / takeOrdered() / top() 操作
逻辑处理流程:
- take(num):如图 3.37 中的左图所示,take(num) 操作首先取出 rdd1 中第一个分区的前 num 个 record,如果 num 大于 partition1 中 record 的总数,则 take() 继续从后面的分区中取出 record。为了提高效率,Spark 在第一个分区中取 record 的时候会估计还需要对多少个后续的分区进行操作。
- first():等价于 take(1),不再画出其逻辑处理流程图。
- takeOrdered(num):如图 3.37 中的右图所示,takeOrdered(num) 的目的是从 rdd1 中找到最小的 num 个 record,因此要求 rdd1 中的 record 可比较。takeOrdered() 操作首先使用 map 在每个分区中寻找最小的 num 个 record,因为全局最小的 n 个元素一定是每个分区中最小的 n 个元素的子集。然后将这些 record 收集到 Driver 端,进行排序,然后取出前 num 个 record。top(num) 的执行逻辑与 takeOrdered(num) 相同,只是取出最大的 num 个 record。可以看到,这 4 个操作都需要将数据收集到 Driver 端,因此不适合 num 较大的情况。
Figure
图3.37 take() 和 takeOrdered() 的逻辑处理流程示例  从第一个分区开始逐个分区取记录直到取够num个;右图:takeOrdered(num) 每个分区内找最小的num个记录,然后收集到Driver排序取前num个。)
max() 和 min() 操作
max() 和 min() 操作都是基于 reduce(func) 实现的,func 的语义是选取最大值或最小值。逻辑处理流程如图 3.38 所示,两者的逻辑流程图与 reduce(func) 类似。
isEmpty() 操作
逻辑处理流程如图 3.38 所示。isEmpty() 操作主要用来判断 rdd 中是否包含 record。如果对 rdd 执行一些数据操作(如过滤、求交集等)后,rdd 为空的话,那么执行其他操作的结果肯定也为空,因此,提前判断 rdd 是否为空,可以避免提交冗余的 job。
Figure
图3.38 max() 和 min() 以及 isEmpty() 的逻辑处理流程示例
lookup() 操作
lookup() 操作查找包含特定 Key 的 record,并将这些 record 的 Value 组合成 list。逻辑处理流程如图 3.39 所示,lookup() 首先过滤出给定 Key 的 record,然后使用 map() 得到相应的 Value,最后使用 collect() 将这些 Value 收集到 Driver 端形成 list(也就是图 3.39 中的 WrappedArray)。如果 rdd1 的 partitioner 已经确定,如 HashPartitioner(3),那么在过滤前,可以通过 Hash(Key) 直接确定需要操作的分区,这样可以减少操作的数据。
Figure
图3.39 lookup() 的逻辑处理流程示例
saveAsTextFile() / saveAsObjectFile() / saveAsHadoopFile() / saveAsSequenceFile() 操作
之前介绍的 action() 数据操作都是将 rdd 中的数据聚合到 Driver 端输出的。实际上,Spark 应用产生的数据量很大,因此更多的需求是将 rdd 中的数据直接输出到分布式文件系统,如 HDFS 中。由于 rdd 中的数据可以是各种类型,如 Int、String、自定义对象等,我们需要各种输出操作来满足这些数据类型的输出需求。Spark 主要提供了 saveAsTextFile()、saveAsObjectFile()、saveAsSequenceFile() 等数据输出操作,这些操作都是将 rdd 中的 record 进行格式转化后,直接写入分布式文件系统中的,逻辑简单,此处不再给出具体的流程图。
- saveAsTextFile():针对 String 类型的 record,将 record 转化为
<NullWriter, Text>类型,然后一条条输出。NullWriter 的意思是空写,也就是每条输出数据只包含类型为文本的 Value。 - saveAsObjectFile():针对普通对象类型,将 record 进行序列化,并且以每 10 个 record 为一组转化为
SequenceFile<NullWritable, Array[Object]>格式,调用 saveAsSequenceFile() 写入 HDFS 中。 - saveAsSequenceFile():针对
<K, V>类型的 record,将 record 进行序列化后,以 SequenceFile 形式写入分布式文件系统中。
这些操作都是基于 saveAsHadoopFile() 实现的。saveAsHadoopFile() 连接 HDFS,进行必要的初始化和配置,然后把文件写入 HDFS 中。关于 SequenceFile 的存储格式,可以参考书籍《Hadoop 权威指南》[70]。
3.5 对比 MapReduce,Spark 的优缺点
我们已经知道 Spark 是如何设计和实现数据处理流程的,这里我们再深入思考一下,为什么 Spark 能够替代 MapReduce 成为主流的大数据处理框架呢?对比 MapReduce,Spark 究竟有哪些优势?
目前,我们只分析了 Spark 的编程模型和逻辑执行流程,从编程模型角度来说,Spark 的编程模型更具有通用性和易用性。
(1)通用性:基于函数式编程思想,MapReduce 将数据类型抽象为 <K, V> 格式,并将数据处理操作抽象为 map() 和 reduce() 两个算子,这两个算子可以表达一大部分数据处理任务。因此,MapReduce 为这两个算子设计了固定的处理流程 map—Shuffle—reduce。在 3.3 节和 3.4 节中,我们看到数据处理流程其实多种多样,map—Shuffle—reduce 模式只适用于表达类似 foldByKey()、reduceByKey()、aggregateByKey() 的处理流程,而像 cogroup()、join()、cartesian()、coalesce() 的流程需要更灵活的表达方式。那么如何达到灵活呢?Spark 转变了思路,在两方面进行了优化改进:一方面借鉴了 DryadLINQ/FlumeJava 的思想,将输入/输出、中间数据抽象表达为一个数据结构 RDD,相当于在 Java 中定义了 class,然后可以根据不同类型的中间数据,生成不同的 RDD(相当于 Java 中生成不同类型的 object)。这样,数据结构就变得灵活了,不再拘泥于 MapReduce 中的 <K, V> 格式,而且中间数据变得可定义、可表示、可操作、可连接。另一方面通过可定义的数据依赖关系来灵活连接中间数据。在 MapReduce 中,数据依赖关系只有 ShuffleDependency,而在 3.3 节和 3.4 节中,我们看到了数据处理操作包含多种多样的数据依赖关系,Spark 对这些数据依赖关系进行了分类,并总结出 ShuffleDependency、NarrowDependency(包含多种子依赖关系)。这样,Spark 可以根据数据操作的计算逻辑灵活选择数据依赖关系来实现。另外,Spark 使用 DAG 图来组合数据处理操作,比固定的 map—Shuffle—reduce 处理流程表达能力更强。
(2)易用性:基于灵活的数据结构和依赖关系,Spark 原生实现了很多常见的数据操作,如 MapReduce 中的 map()、reduceByKey(),SQL 中的 filter()、groupByKey()、join()、sortByKey(),Pig Latin 中的 cogroup(),集合操作 union()、intersection(),以及特殊的 zip() 等。通过使用和组合这些操作,开发人员容易实现复杂的数据处理流程。另外,由于数据结构 RDD 上的操作可以由 Spark 自动并行化,程序开发时更像在写普通程序,不用考虑这些操作到底是本地的还是由 Spark 分布执行的。另外,使用 RDD 上的数据操作,开发人员更容易将数据操作与普通程序的控制流进行结合。例如,在实现迭代程序时,可以使用普通程序的 while 语句,而 while 循环内部可以使用 RDD 操作。在 MapReduce 中,实现迭代程序比较困难,需要不断手动提交 job,而 Spark 提供了 action() 操作,job 分割和提交都完全由 Spark 框架来进行,易用性得到了进一步提高。
既然 Spark 有这么多优点,是不是意味着 Spark 可以完全满足大数据处理的需求呢?答案是否定的,Spark 的编程模型仍然存在一些缺点。虽然 Spark 比 MapReduce 更加通用、易用,但还不能达到普通语言(如 Java)的灵活性,具体存在两个缺点。
第一个,Spark 中的操作都是单向操作,单向的意思是中间数据不可修改。在普通 Java 程序中,在数据结构中存放的数据是可以直接被修改的,而 Spark 只能生成新的数据作为修改后的结果。
第二个,Spark 中的操作是粗粒度的。粗粒度操作是指 RDD 上的操作是面向分区的,也就是每个分区上的数据操作是相同的。假设我们处理 partition1 上的数据时需要 partition2 中的数据,并不能通过 RDD 的操作访问到 partition2 的数据,只能通过添加聚合操作来将数据汇总在一起处理,而普通 Java 程序的操作是细粒度的,我们随时可以访问数据结构中的数据。
当然,这两个缺点也是并行化设计权衡后的结果,即这两个缺点是并行化的优点,粗粒度可以方便并行执行,如一台机器处理一个分区,而单向操作有利于错误容忍,后面章节我们会具体讨论。
3.6 本章小结
本章首先介绍了 Spark 应用的逻辑处理流程,以及分布式数据结构 RDD、数据依赖关系建立的规则等,然后详细讨论了每个常见 transformation() 操作和 action() 操作。至此,我们已经学会 Spark 的基本操作,可以使用这些操作来开发大数据处理应用,就像学会了 Java API 我们可以开发应用程序一样。
然而,我们目前只讨论了应用在 Spark 中是如何表示的,并没有讨论这些应用是如何执行的。如果我们想开发高效的应用程序,那么需要进一步了解应用是怎么执行的。在下一章中,我们会介绍如何将应用转化为可分布执行的任务,也就是如何生成物理执行计划。
3.7 扩展阅读
Spark从1.6版本开始提供了DataSet、DataFrame数据结构和API。其中DataSet是RDD的升级版,既包含RDD的一些数据操作,如map()、union()、groupByKey()等,也包含一些面向SQL的数据操作,如select()、where()、orderBy()等。DataFrame相当于面向二维表数据的DataSet,将每个record表示为二维表中的Row。使用DataSet、Data