第6章 Shuffle机制
本章首先介绍Shuffle的意义及设计挑战,然后介绍Shuffle的设计思想、Spark中Shuffle框架的设计,以及支持高效聚合和排序的数据结构。最后,与Hadoop MapReduce的Shuffle机制对比。
6.1 Shuffle的意义及设计挑战
第4章介绍了Spark如何将应用的逻辑处理流程转化为物理执行计划,也介绍了如何执行计算任务(task),但是没有详细讨论上游和下游stage之间是如何传递数据的,即运行在不同stage、不同节点上的task间如何进行数据传递。这个数据传递过程通常被称为Shuffle机制。Shuffle解决的问题是如何将数据重新组织,使其能够在上游和下游task之间进行传递和计算。如果是单纯的数据传递,则只需要将数据进行分区、通过网络传输即可,没有太大难度,但Shuffle机制还需要进行各种类型的计算(如聚合、排序),而且数据量一般会很大。如何支持这些不同类型的计算,如何提高Shuffle的性能都是Shuffle机制设计的难点问题。
如图6.1所示,我们通过观察包含ShuffleDependency的典型数据操作会发现,Shuffle的设计和实现需要面对多个挑战。
图6.1 包含ShuffleDependency的典型数据操作的逻辑处理流程
(图例说明:图中展示了一个包含ShuffleDependency的典型数据操作流程,涉及map task与reduce task之间的数据传递及聚合操作。)
挑战一:计算的多样性
Shuffle机制分为Shuffle Write和Shuffle Read两个阶段,前者主要解决上游stage输出数据的分区问题,后者主要解决下游stage从上游stage获取数据、重新组织、并为后续操作提供数据的问题。如图6.1所示,在进行Shuffle Write/Read时,有些操作需要对数据进行一定的计算。例如:
- 聚合:
groupByKey()需要将Shuffle Read的<K, V>record聚合为<K, list(V)>record(图6.1第1个图中的<K, CompactBuffer(V)>,Spark采用CompactBuffer来实现list)。 - combine:
reduceByKey()需要在Shuffle Write端进行combine()。 - 排序:
sortByKey()需要对Shuffle Read的数据按照Key进行排序。
那么,如何建立一个统一的Shuffle框架来支持这些操作呢?如何根据不同数据操作的特点,灵活地构建Shuffle Write/Read过程呢?如何确定聚合函数、数据分区、数据排序的执行顺序呢?
挑战二:计算的耦合性
如图6.1所示,有些操作包含用户自定义聚合函数,如aggregateByKey(seqOp, combOp)中的seqOp和combOp,以及reduceByKey(func)中的func,这些函数的计算过程和数据的Shuffle Write/Read过程耦合在一起。例如,aggregateByKey(seqOp, combOp)在Shuffle Write数据时需要调用seqOp来进行combine(),在Shuffle Read数据时需要调用combOp来对数据进行聚合。对于Shuffle Read数据需要聚合的情况,具体在什么时候调用这些聚合函数呢?是先读取数据再进行聚合,还是边读取数据边进行聚合呢?
挑战三:中间数据存储问题
在Shuffle机制中需要对数据进行重新组织(分区、聚合、排序等),也需要进行一些计算(执行聚合函数),那么在Shuffle Write/Read过程中的中间数据如何表示?如何组织?如何存放?如果Shuffle的数据量太大,那么内存无法存下怎么办?
上述问题使得Shuffle机制的设计和实现要考虑得非常全面,事实上,很难设计出一个完美方案来解决上述所有问题。下面我们讨论一些解决上述问题可能的方法,以及Spark采用的实现方法。
6.2 Shuffle的设计思想
现在我们切换到Spark设计者视角,思考如何解决Shuffle机制的技术问题。我们先从简单的问题着手,一步步优化我们的设计。为了方便讨论,本章主要讨论包含一个ShuffleDependency的数据操作,其Shuffle解决方案可以直接推广到包含多个ShuffleDependency的情况,如join(),即join()中的每个ShuffleDependency可以复用单个ShuffleDependency的解决方案。
为了方便讨论,在单个ShuffleDependency情况下,我们将上游的stage称为map stage,将下游stage称为reduce stage。相应地,map stage包含多个map task,reduce stage包含多个reduce task。
6.2.1 解决数据分区和数据聚合问题
解决Shuffle机制中最基础的两个问题:数据分区问题和数据聚合问题。
(1)数据分区问题
该问题针对Shuffle Write阶段:如何对map task输出结果进行分区,使得reduce task可以通过网络获取相应的数据?
数据分区问题解决方案:该问题包含两个子问题。
第1个子问题:如何确定分区个数?分区个数与下游stage的task个数一致。在第4章中讨论过,分区个数可以由用户自定义,如groupByKey(numPartitions)中的numPartitions一般被定义为集群中可用CPU个数的1~2倍,即将每个map task的输出数据划分为numPartitions份,相应地,在reduce stage中启动numPartitions个task来获取并处理这些数据。如果用户没有定义,则默认分区个数是parent RDD的分区个数的最大值[^96]。如图6.2的左图所示,在没有定义join(numPartitions)中的分区个数numPartitions的情况下,取两个parent RDD的分区的最大值为2。
第2个子问题:如何对map task输出数据进行分区?解决方法是对map task输出的每一个<K, V> record,根据Key计算其partitionId,具有不同partitionId的record被输出到不同的分区(文件)中。如图6.2的右图所示,下游stage中只有两个task,分区个数为2。map task需要将其输出数据分为两份,方法是让map()操作计算每个输出record的partitionId = Hash(Key) % 2,根据partitionId将record直接输出到不同分区中。这种方法非常简单,容易实现,但不支持Shuffle Write端的combine()操作。
图6.2
join()的逻辑处理流程与Shuffle Write/Read过程(左图:join()的逻辑处理流程,展示了两个parent RDD的分区;右图:Shuffle Write/Read过程,map task根据partitionId将数据分到两个分区,reduce task获取后聚合。)
(2)数据聚合问题
该问题针对Shuffle Read阶段:如何获取上游不同task的输出数据并按照Key进行聚合?如groupByKey()中需要将不同task获取到的<K, V> record聚合为<K, list(V)>(实现时为<K, CompactBuffer(V)>),reduceByKey()将<K, V> record聚合为<K, func(list(V))>。
数据聚合问题解决方案:数据聚合的本质是将相同Key的record放在一起,并进行必要的计算,这个过程可以利用C++/Java语言中的HashMap实现。方法是使用两步聚合(two-phase aggregation):先将不同tasks获取到的<K, V> record存放到HashMap中,HashMap中的Key是K,Value是list(V)。然后,对于HashMap中每一个<K, list(V)> record,使用func计算得到<K, func(list(V))> record。如图6.2的右图所示,join()在Shuffle Read阶段将来自不同task的数据以HashMap方式聚合在一起,由于join()没有聚合函数,将record按Key聚合后直接执行下一步操作,使用cartesian()计算笛卡儿积。而对于reduceByKey(func)来说,需要进一步使用func()对相同Key的record进行聚合。如图6.3的左图所示,两步聚合的第1步是将record存放到HashMap中,第2步是使用func()(此处是sum())函数对list(V)进行计算,得到最终结果。
图6.3 两步聚合和在线聚合的区别
(左图:两步聚合——先存入HashMap,再对每个Key的list执行func;右图:在线聚合——每个record加入HashMap时立即执行func并更新结果。)
两步聚合方案的优点是:可以解决数据聚合问题,逻辑清晰、容易实现。缺点是:所有Shuffle的record都会先被存放在HashMap中,占用内存空间较大。另外,对于包含聚合函数的操作,如reduceByKey(func),需要先将数据聚合到HashMap中以后再执行func()聚合函数,效率较低。
优化方案:对于reduceByKey(func)等包含聚合函数的操作来说,我们可以采用一种**在线聚合(Online aggregation)**的方法来减少内存空间占用。如图6.3的右图所示,该方案在每个record加入HashMap时,同时进行func()聚合操作,并更新相应的聚合结果。具体地,对于每一个新来的<K, V> record,首先从HashMap中get出已经存在的结果V' = HashMap.get(K),然后执行聚合函数得到新的中间结果V'' = func(V, V'),最后将V''写入HashMap中,即HashMap.put(K, V'')。一般来说,聚合函数的执行结果会小于原始数据规模,即Size(func(list(V))) < Size(list(V)),如sum()、max()等,所以在线聚合可以减少内存消耗。在线聚合将Shuffle Read和聚合函数计算耦合在一起,可以加速计算。但是,对于不包含聚合函数的操作,如groupByKey()等,在线聚合和两步聚合没有差别,因为这些操作不包含聚合函数,无法减少中间数据规模。
6.2.2 解决map()端combine问题
有了基本的Shuffle Write端数据分区功能和Shuffle Read端数据聚合功能以后,我们开始完善方案,首先考虑如何支持Shuffle Write端的combine功能。
需要进行combine操作:进行combine操作的目的是减少Shuffle的数据量。根据第3章的分析,只有包含聚合函数的数据操作需要进行map()端的combine,具体包括reduceByKey()、foldByKey()、aggregateByKey()、combineByKey()、distinct()等。对于不包含聚合函数的操作,如groupByKey(),我们即使进行了combine操作,也不能减少中间数据的规模。
combine解决方案:从本质上讲,combine和Shuffle Read端的聚合过程没有区别,都是将<K, V> record聚合成<K, func(list(V))>。不同的是,Shuffle Read端聚合的是来自所有map task输出的数据,而combine聚合的是来自单一task输出的数据。因此仍然可以采用Shuffle Read端基于HashMap的解决方案。具体地,首先利用HashMap进行combine,然后对HashMap中每一个record进行分区,输出到对应的分区文件中。
6.2.3 解决sort问题
支持了Shuffle Write端的combine功能后,我们还要考虑如何支持数据排序功能。有些操作如sortByKey()、sortBy()需要将数据按照Key进行排序,那么如何在Shuffle机制中完成排序呢?该问题包含以下两个子问题。
(1)在哪里执行sort?
首先,在Shuffle Read端必须执行sort,因为从每个task获取的数据组合起来以后不是全局按Key进行排序的。其次,理论上,在Shuffle Write端不需要排序,但如果进行了排序,那么Shuffle Read获取到(来自不同task)的数据是已经部分有序的数据,可以减少Shuffle Read端排序的复杂度。
(2)何时进行排序,即如何确定排序和聚合的顺序?
-
第1种方案:先排序再聚合。这种方案需要先使用线性数据结构如Array,存储Shuffle Read的
<K, V>record,然后对Key进行排序,排序后的数据可以直接从前到后进行扫描聚合,不需要再使用HashMap进行hash-based聚合。这种方案也是Hadoop MapReduce采用的方案。优点是既可以满足排序要求又可以满足聚合要求;缺点是需要较大内存空间来存储线性数据结构,同时排序和聚合过程不能同时进行,即不能使用在线聚合,效率较低。 -
第2种方案:排序和聚合同时进行。我们可以使用带有排序功能的Map,如TreeMap来对中间数据进行聚合。每次Shuffle Read获取到一个record,就将其放入TreeMap中与现有的record进行聚合,过程与HashMap类似,只是TreeMap自带排序功能。这种方案的优点是排序和聚合可以同时进行;缺点是相比HashMap,TreeMap的排序复杂度较高,TreeMap的插入时间复杂度是O(n log n),而且需要不断调整树的结构,不适合数据规模非常大的情况。
-
第3种方案:先聚合再排序。即维持现有基于HashMap的聚合方案不变,将HashMap中的record或record的引用放入线性数据结构中进行排序。这种方案的优点是聚合和排序过程独立,灵活性较高,而且之前的在线聚合方案不需要改动;缺点是需要复制(copy)数据或引用,空间占用较大。Spark选择的是第3种方案,设计了特殊的HashMap来高效完成先聚合再排序的任务,这会在6.4节中详细介绍。
6.2.4 解决内存不足问题
上述方案已经解决了Shuffle机制中的分区和计算问题,但还有一个性能问题:Shuffle数据量过大导致内存放不下怎么办?由于我们使用HashMap对数据进行combine和聚合,在数据量大的时候,会出现内存溢出。这个问题既可能出现在Shuffle Write阶段,又可能出现在Shuffle Read阶段。
解决方案:使用内存+磁盘混合存储方案。先在内存(如HashMap)中进行数据聚合,如果内存空间不足,则将内存中的数据spill到磁盘上,此时空闲出来的内存可以继续处理新的数据。此过程可以不断重复,直到数据处理完成。然而,问题是spill到磁盘上的数据实际上是部分聚合的结果,并没有和后续的数据进行过聚合。因此,为了得到完整的聚合结果,我们需要在进行下一步数据操作之前对磁盘上和内存中的数据进行再次聚合,这个过程我们称为“全局聚合”。为了加速全局聚合,我们需要将数据spill到磁盘上时进行排序,这样全局聚合才能够按顺序读取spill到磁盘上的数据,并减少磁盘I/O。具体做法将在后面详细描述。
6.3 Spark中Shuffle框架的设计
6.2节我们介绍了解决Shuffle机制中数据分区、聚合、排序和内存不足问题的核心思想和方法,但是如何将这些方法融合到一个统一的Shuffle框架中,使得Spark可以根据不同数据操作的特点,灵活构建合适的Shuffle机制?本节我们将介绍Spark构建Shuffle框架的主要方法。
在Shuffle机制中Spark典型数据操作的计算需求如表6.1所示。
表6.1 在Shuffle机制中Spark典型数据操作的计算需求
数据操作 Shuffle Write端 Shuffle Read端 groupByKey() 分区 聚合 reduceByKey() 分区 + combine 聚合 sortByKey() 分区 排序 aggregateByKey() 分区 + combine 聚合 join() 分区 聚合 distinct() 分区 + combine 聚合 repartition() 分区 — coalesce() 分区 — (注:表中“—”表示不需要该阶段功能。更多操作可参考Spark文档。)
通过表6.1分析可以知道,在Shuffle Write端,目前只支持combine功能,并不支持按Key排序功能。当然,未来有些数据操作可能同时需要这两个功能,所以,Shuffle框架还是需要支持全部的功能。下面我们讨论Spark设计和实现的Shuffle Write/Read框架。
6.3.1 Shuffle Write框架设计和实现
在Shuffle Write阶段,数据操作需要分区、聚合和排序3个功能,但如表6.1所示,每个数据操作只需要其中的一个或两个功能。Spark为了支持所有的情况,设计了一个通用的Shuffle Write框架,框架的计算顺序为“map()输出 → 数据聚合 → 排序 → 分区”输出。
如图6.4所示,map task每计算出一个record及其partitionId,就将record放入类似HashMap的数据结构中进行聚合;聚合完成后,再将HashMap中的数据放入类似Array的数据结构中进行排序,既可按照partitionId,也可以按照partitionId + Key进行排序;最后根据partitionId将数据写入不同的数据分区中,存放到本地磁盘上。其中,聚合(aggregate,即combine)和排序(sort)过程是可选的,如果数据操作不需要聚合或者排序,那么可以去掉相应的聚合或排序过程。
在实现过程中,Spark对不同的情况进行了分类,以及针对性的优化调整,形成了不同的Shuffle Write方式。下面我们介绍在Shuffle Write框架下,Spark如何针对不同情况构建最适合的Shuffle Write方式。
图6.4 通用的Shuffle Write框架(包含“map()输出 → 数据聚合 → 排序 → 分区输出”的过程)
(流程示意:map task输出record → 放入HashMap聚合 → 将HashMap数据转入Array排序(按partitionId或partitionId+Key)→ 根据partitionId写入分区文件。)
(1)不需要map()端聚合(combine)和排序
这种情况最简单,只需要实现分区功能。如图6.5所示,map()依次输出<K, V> record,并计算其partitionId(PID),Spark根据partitionId,将record依次输出到不同的buffer中,每当buffer填满就将record溢写到磁盘上的分区文件中。分配buffer的原因是map()输出record的速度很快,需要进行缓冲来减少磁盘I/O。在实现代码中,Spark将这种Shuffle Write方式称为BypassMergeSortShuffleWriter,即不需要进行排序的Shuffle Write方式。
图6.5 不需要map()端聚合(combine)和排序的Shuffle Write流程(BypassMergeSortShuffleWriter)
(流程示意:map()输出record → 计算partitionId → 根据partitionId写入对应buffer → buffer满则溢写到磁盘分区文件。共n个分区,对应n个buffer和n个分区文件。)
该模式的优缺点:
- 优点:速度快,直接将record输出到不同的分区文件中。
- 缺点:资源消耗过高,每个分区都需要一个buffer(大小由
spark.shuffle.file.buffer控制,默认为32KB),且同时需要建立多个分区文件进行溢写。当分区个数太大,如10 000时,每个map task需要约320 MB的内存,会造成内存消耗过大,而且每个task需要同时建立和打开10 000个文件,造成资源不足。因此,该Shuffle方案适合分区个数较少的情况(<200)。
该模式适用的操作类型:map()端不需要聚合(combine)、Key不需要排序且分区个数较少(⇐spark.shuffle.sort.bypassMergeThreshold,默认值为200)。例如,groupByKey(100),partitionBy(100),sortByKey(100)等。
(2)不需要map()端聚合(combine),但需要排序
在这种情况下需要按照partitionId + Key进行排序。如图6.6所示,Spark采用的实现方法是建立一个Array(图6.6中的PartitionedPairBuffer)来存放map()输出的record,并对Array中元素的Key进行精心设计,将每个<K, V> record转化为<(PID, K), V> record存储;然后按照partitionId + Key对record进行排序;最后将所有record写入一个文件中,通过建立索引来标示每个分区。
图6.6 不需要map()端聚合(combine),但需要排序的Shuffle Write流程设计(SortShuffleWriter)
(流程示意:map()输出record → 将<K,V>转换为<(PID,K),V>存入PartitionedPairBuffer(Array结构)→ 对Array按(PID,K)排序 → 写入单个分区文件并建立索引。)
如果Array存放不下,则会先扩容,如果还存放不下,就将Array中的record排序后spill到磁盘上,等待map()输出完以后,再将Array中的record与磁盘上已排序的record进行全局排序,得到最终有序的record,并写入文件中。
该Shuffle模式被命名为SortShuffleWriter(KeyOrdering=true),使用的Array被命名为PartitionedPairBuffer。
该Shuffle模式的优缺点:
- 优点:只需要一个Array结构就可以支持按照
partitionId + Key进行排序,Array大小可控,而且具有扩容和spill到磁盘上的功能,支持从小规模到大规模数据的排序。同时,输出的数据已经按照partitionId进行排序,因此只需要一个分区文件存储,即可标示不同的分区数据,克服了BypassMergeSortShuffleWriter中建立文件数过多的问题,适用于分区个数很大的情况。 - 缺点:排序增加计算时延。
该Shuffle模式适用的操作:map()端不需要聚合(combine)、Key需要排序、分区个数无限制。目前,Spark本身没有提供这种排序类型的数据操作,但不排除用户会自定义,或者系统未来会提供这种类型的操作。sortByKey()操作虽然需要按Key进行排序,但这个排序过程在Shuffle Read端完成即可,不需要在Shuffle Write端进行排序。
另外,回想上一个BypassMergeSortShuffleWriter模式的缺点是,分区个数一旦过多(>200),就会出现buffer过大、建立和打开的文件数过多的问题。在这种情况下,应该采用什么样的Shuffle模式呢?
我们刚才分析了SortShuffleWriter的优点是只需要分配一个Array,大小可控,同时只输出一个文件就可以标示出不同的分区,可以用于解决BypassMergeSortShuffleWriter存在的buffer分配过多的问题。唯一缺点是,需要按PartitionId + Key进行排序,而BypassMergeSortShuffleWriter面向的操作不需要按Key进行排序。因此,我们只需要将“按PartitionId + Key排序”改为“只按PartitionId排序”,就可以支持“不需要map()端combine、不需要按照Key进行排序,分区个数过大”的操作。例如,groupByKey(300)、partitionBy(300)、sortByKey(300)。
(3)需要map()端聚合(combine),需要或者不需要按Key进行排序
在这种情况下,需要实现按Key进行聚合(combine)的功能。如图6.7的上图所示,Spark采用的实现方法是建立一个类似HashMap的数据结构对map()输出的record进行聚合。HashMap中的Key是“partitionId + Key”,HashMap中的Value是经过相同combine的聚合结果。在图6.7中,combine()是sum()函数,那么Value中存放的是多个record对应的Value相加的结果。聚合完成后,Spark对HashMap中的record进行排序。如果不需要按Key进行排序,如图6.7的上图所示,那么只按partitionId进行排序;如果需要按Key进行排序,如图6.7的下图所示,那么按partitionId + Key进行排序。最后,将排序后的record写入一个分区文件中。
如果HashMap存放不下,则会先扩容为两倍大小,如果还存放不下,就将HashMap中的record排序后spill到磁盘上。此时,HashMap被清空,可以继续对map()输出的record进行聚合,如果内存再次不够用,那么继续spill到磁盘上,此过程可以重复多次。当map()输出完成以后,将此时HashMap中的record与磁盘上已排序的record进行再次聚合(merge),得到最终的record,输出到分区文件中。
图6.7 包含combine的Shuffle Write流程设计(SortShuffleWriterWithCombine)
(上图:不需要按Key排序——map()输出record → 放入HashMap(Key为partitionId+Key,Value为combine结果)→ 按partitionId排序 → 写入分区文件。下图:需要按Key排序——同上,但排序时按partitionId+Key排序。)
该Shuffle模式的优缺点:
- 优点:只需要一个HashMap结构就可以支持map()端的combine功能,HashMap具有扩容和spill到磁盘上的功能,支持小规模到大规模数据的聚合,也适用于分区个数很大的情况。在聚合后使用Array排序,可以灵活支持不同的排序需求。
- 缺点:在内存中进行聚合,内存消耗较大,需要额外的数组进行排序,而且如果有数据spill到磁盘上,还需要再次进行聚合。在实现中,Spark在Shuffle Write端使用一个经过特殊设计和优化的HashMap,命名为PartitionedAppendOnlyMap,可以同时支持聚合和排序操作,相当于HashMap和Array的合体,其实现细节将在6.4节中介绍。
该Shuffle模式适用的操作:适合map()端聚合(combine)、需要或者不需要按Key进行排序、分区个数无限制的应用,如reduceByKey()、aggregateByKey()等。
总结:Shuffle Write框架需要执行的3个步骤是“数据聚合 → 排序 → 分区”。如果应用中的数据操作不需要聚合,也不需要排序,而且分区个数很少,那么可以采用直接输出模式,即BypassMergeSortShuffleWriter。为了克服BypassMergeSortShuffleWriter打开文件过多、buffer分配过多的缺点,也为了支持需要按Key进行排序的操作,Spark提供了SortShuffleWriter,使用基于Array的方法来按partitionId或partitionId + Key进行排序,只输出单一的分区文件即可。最后,为了支持map()端combine操作,Spark提供了基于HashMap的SortShuffleWriter,将Array替换为类似HashMap的操作来支持聚合操作,在聚合后根据partitionId或partitionId + Key对record进行排序,并输出分区文件。因为SortShuffleWriter按partitionId进行了排序,所以被称为sort-based Shuffle Write。
6.3.2 Shuffle Read框架设计和实现
在Shuffle Read阶段,数据操作需要3个功能:跨节点数据获取、聚合和排序。如表6.1所示,每个数据操作都需要其中的部分功能。Spark为了支持所有的情况,设计了一个通用的Shuffle Read框架,框架的计算顺序为“数据获取 → 聚合 → 排序”输出。
如图6.8所示,reduce task不断从各个map task的分区文件中获取数据(Fetch records),然后使用类似HashMap的结构来对数据进行聚合(aggregate),该过程是边获取数据边聚合。聚合完成后,将HashMap中的数据放入类似Array的数据结构中按照Key进行排序(sort by Key),最后将排序结果输出或者传递给下一个操作。如果不需要聚合或者排序,则可以去掉相应的聚合或排序过程。
图6.8 通用的Shuffle Read框架(包含“数据获取 → 聚合 → 排序 → 输出”的过程)
(流程示意:reduce task从各map task分区文件获取数据 → 边获取边用HashMap聚合 → 聚合完成后转入Array按Key排序 → 输出排序结果。)
6.0 第6章 Shuffle机制
6.3 Shuffle Read框架(续)
每个数据操作都需要其中的部分功能。Spark 为了支持所有的情况,设计了一个通用的 Shuffle Read 框架,框架的计算顺序为“数据获取 → 聚合 → 排序输出”。
通用流程
reduce task 不断从各个 map task 的分区文件中获取数据(Fetch records),然后使用类似 HashMap 的结构来对数据进行聚合(aggregate),该过程是边获取数据边聚合。聚合完成后,将 HashMap 中的数据放入类似 Array 的数据结构中按照 Key 进行排序(sort by Key),最后将排序结果输出或者传递给下一个操作。如果不需要聚合或者排序,则可以去掉相应的聚合或排序过程。
图6.8 通用的 Shuffle Read 框架(包含“数据获取→聚合→排序输出”的过程)
(图片占位 — 示意图描述:reduce task 从多个 map task 分区文件获取数据,经过聚合、排序后输出)
(1)不需要聚合,不需要按 Key 进行排序
这种情况最简单,只需要实现数据获取功能即可。如图6.9所示,等待所有的 map task 结束后,reduce task 开始不断从各个 map task 获取 <K, V> record,并将 record 输出到一个 buffer 中(大小为 spark.reducer.maxSizeInFlight=48MB),下一个操作直接从 buffer 中获取数据即可。
图6.9 不需要聚合,不需要按 Key 进行排序的 Shuffle Read 流程设计
(图片占位 — 示意图描述:reduce task 直接从各个 map task 的文件中拉取 record 到 buffer,然后输出给下一个操作)
优缺点
- 优点:逻辑和实现简单,内存消耗很小。
- 缺点:不支持聚合、排序等复杂功能。
适用操作:适合既不需要聚合也不需要排序的应用,如 partitionBy() 等。
(2)不需要聚合,需要按 Key 进行排序
在这种情况下,需要实现数据获取和按 Key 排序的功能。如图6.10所示,获取数据后,将 buffer 中的 record 依次输出到一个 Array 结构(PartitionedPairBuffer)中。由于这里采用了本来用于 Shuffle Write 端的 PartitionedPairBuffer 结构,所以还保留了每个 record 的 partitionId。然后,对 Array 中的 record 按照 Key 进行排序,并将排序结果输出或者传递给下一步操作。
当内存无法存下所有的 record 时,PartitionedPairBuffer 将 record 排序后 spill 到磁盘上,最后将内存中和磁盘上的 record 进行全局排序,得到最终排序后的 record。
图6.10 不需要聚合、需要按 Key 进行排序的 Shuffle Read 流程设计
(图片占位 — 示意图描述:数据从多个 map task 获取后进入 PartitionedPairBuffer,排序后输出,内存不足时 spill 到磁盘再全局排序)
优缺点
- 优点:只需要一个 Array 结构就可以支持按照 Key 进行排序,Array 大小可控,而且具有扩容和 spill 到磁盘上的功能,不受数据规模限制。
- 缺点:排序增加计算时延。
适用操作:适合 reduce 端不需要聚合,但需要按 Key 进行排序的操作,如 sortByKey()、sortBy() 等。
(3)需要聚合,不需要或需要按 Key 进行排序
在这种情况下,需要实现按照 Key 进行聚合,根据需要按 Key 进行排序的功能。如图6.11的上图所示,获取 record 后,Spark 建立一个类似 HashMap 的数据结构(ExternalAppendOnlyMap)对 buffer 中的 record 进行聚合。HashMap 中的 Key 是 record 中的 Key,Value 是经过相同聚合函数(func())计算后的结果。在图6.11中,聚合函数是 sum() 函数,那么 Value 中存放的是多个 record 对应 Value 相加后的结果。之后,如果需要按照 Key 进行排序,如图6.11的下图所示,则建立一个 Array 结构,读取 HashMap 中的 record,并对 record 按 Key 进行排序,排序完成后,将结果输出或者传递给下一步操作。
图6.11 需要聚合,不需要或需要按 Key 进行排序的 Shuffle Read 流程设计
(图片占位 — 上图:数据进入 ExternalAppendOnlyMap 进行聚合;下图:聚合后的 record 导入 Array 进行排序输出)
Spill 机制
如果 HashMap 存放不下,则会先扩容为两倍大小,如果还存放不下,就将 HashMap 中的 record 排序后 spill 到磁盘上。此时,HashMap 被清空,可以继续对 buffer 中的 record 进行聚合。如果内存再次不够用,那么继续 spill 到磁盘上,此过程可以重复多次。当聚合完成以后,将此时 HashMap 中的 record 与磁盘上已排序的 record 进行再次聚合,得到最终的 record,输出到分区文件中。
优缺点
- 优点:只需要一个 HashMap 和一个 Array 结构就可以支持 reduce 端的聚合和排序功能,HashMap 具有扩容和 spill 到磁盘上的功能,支持小规模到大规模数据的聚合。边获取数据边聚合,效率较高。
- 缺点:需要在内存中进行聚合,内存消耗较大,如果有数据 spill 到磁盘上,还需要进行再次聚合。另外,经过 HashMap 聚合后的数据仍然需要拷贝到 Array 中进行排序,内存消耗较大。在实现中,Spark 使用的 HashMap 是一个经过特殊优化的 HashMap,命名为
ExternalAppendOnlyMap,可以同时支持聚合和排序操作,相当于 HashMap 和 Array 的合体,其实现细节将在6.4节中介绍。
适用操作:适合 reduce 端需要聚合、不需要或需要按 Key 进行排序的操作,如 reduceByKey()、aggregateByKey() 等。
Shuffle Read 框架需要执行的3个步骤是“数据获取 → 聚合 → 排序输出”。如果应用中的数据操作不需要聚合,也不需要排序,那么获取数据后直接输出。对于需要按 Key 进行排序的操作,Spark 使用基于 Array 的方法来对 Key 进行排序。对于需要聚合的操作,Spark 提供了基于 HashMap 的聚合方法,同时可以再次使用 Array 来支持按照 Key 进行排序。总体来讲,Shuffle Read 框架使用的技术和数据结构与 Shuffle Write 过程类似,而且由于不需要分区,过程比 Shuffle Write 更为简单。当然,还有一些可优化的地方,如聚合和排序如何进行统一来减少内存 copy 和磁盘 I/O 等,这部分内容将在6.4节中介绍。
6.4 支持高效聚合和排序的数据结构
为了提高聚合和排序性能,Spark 为 Shuffle Write/Read 的聚合和排序过程设计了3种数据结构,如表6.2所示。这几种数据结构的基本思想是在内存中对 record 进行聚合和排序,如果存放不下,则进行扩容,如果还存放不下,就将数据排序后 spill 到磁盘上,最后将磁盘和内存中的数据进行聚合、排序,得到最终结果。
表6.2 支持高效聚合和排序的数据结构
(表格占位 — 应包含数据结构名称、用途、特点等列,原文表格内容未提供)
数据结构设计特征
仔细观察 Shuffle Write/Read 过程,我们会发现 Shuffle 机制中使用的数据结构的两个特征:
- 只需要支持 record 的插入和更新操作,不需要支持删除操作,这样我们可以对数据结构进行优化,减少内存消耗。
- 只有内存放不下时才需要 spill 到磁盘上,因此数据结构设计以内存为主,磁盘为辅。
Spark 中的 PartitionedAppendOnlyMap 和 ExternalAppendOnlyMap 都基于 AppendOnlyMap 实现。因此,我们先介绍 AppendOnlyMap 的原理。
6.4.1 AppendOnlyMap 的原理
AppendOnlyMap 实际上是一个只支持 record 添加和对 Value 进行更新的 HashMap。与 Java HashMap 采用“数组+链表”实现不同,AppendOnlyMap 只使用数组来存储元素,根据元素的 Hash 值确定存储位置,如果存储元素时发生 Hash 值冲突,则使用 二次地址探测法(Quadratic probing)[^97] 来解决 Hash 值冲突。
对于每个新来的 <K, V> record,先使用 Hash(K) 计算其存放位置,如果存放位置为空,就把 record 存放到该位置。如果该位置已经被占用,就使用二次探测法来找下一个空闲位置。对于图6.12中新来的 <K6, V6> record 来说,第1次找到的位置 Hash(K6) 已被 K2 占用。按照二次探测法向后递增1个 record 位置,也就是 Hash(K6)+1^2,发现位置已被 K3 占用,然后向后递增4个 record 位置(指数递增,Hash(K6)+2^2),发现位置没有被占用,放进去即可。
图6.12 使用数组和二次地址探测法来模拟 HashMap
(图片占位 — 蓝色部分存储 Key,白色部分存储 Value;图中展示 K1..K5 已存储,K6 经过三次探测找到空位)
假设又新来了一个 <K6, V7> record,需要与刚存放进 AppendOnlyMap 中的 <K6, V6> 进行聚合,聚合函数为 func(),那么首先查找 K6 所在的位置,查找过程与刚才的插入过程类似,经过3次查找取出 <K6, V6> record 中的 V6,进行 V′=func(V6, V7) 运算,最后将 V′ 写入 V6 的位置。
扩容
AppendOnlyMap 使用数组来实现的问题是,如果插入的 record 太多,则很快会被填满。Spark 的解决方案是,如果 AppendOnlyMap 的利用率达到 70%,那么就扩张一倍,扩张意味着原来的
Hash()失效,因此对所有 Key 进行 rehash,重新排列每个 Key 的位置。
排序
由于 AppendOnlyMap 采用了数组作为底层存储结构,可以支持快速排序等排序算法。实现方法,如图6.13所示,先将数组中所有的
<K, V>record 转移到数组的前端,用begin和end来标示起始位置,然后调用排序算法对[begin, end]中的 record 进行排序。对于需要按 Key 进行排序的操作,如sortByKey(),可以按照 Key 值进行排序;对于其他操作,只按照 Key 的 Hash 值进行排序即可。
输出
迭代 AppendOnlyMap 数组中的 record,从前到后扫描输出即可。
图6.13 对 AppendOnlyMap 中的元素进行排序输出
(图片占位 — 示意图描述:数组中的 record 被紧凑到前端,然后调用排序算法排序后输出)
6.4.2 ExternalAppendOnlyMap
AppendOnlyMap 的优点是能够将聚合和排序功能很好地结合在一起,缺点是只能使用内存,难以适用于内存空间不足的情况。为了解决这个问题,Spark 基于 AppendOnlyMap 设计实现了基于 内存+磁盘 的 ExternalAppendOnlyMap,用于 Shuffle Read 端大规模数据聚合。同时,由于 Shuffle Write 端聚合需要考虑 partitionId,Spark 也设计了带有 partitionId 的 ExternalAppendOnlyMap,名为 PartitionedAppendOnlyHashMap。这两个数据结构功能类似,我们先介绍 ExternalAppendOnlyMap。
工作原理:先持有一个 AppendOnlyMap 来不断接收和聚合新来的 record,当 AppendOnlyMap 快被装满时检查一下内存剩余空间是否可以扩展,如果可直接在内存中扩展则扩展,否则对 AppendOnlyMap 中的 record 进行排序,然后将 record 都 spill 到磁盘上。因为 record 不断到来,可能会多次填满 AppendOnlyMap,所以这个 spill 过程可以出现多次,最终形成多个 spill 文件。等 record 都处理完,此时 AppendOnlyMap 中可能还留存一些聚合后的 record,磁盘上也有多个 spill 文件。因为这些数据都经过了部分聚合,还需要进行全局聚合(merge)。因此,ExternalAppendOnlyMap 的最后一步是将内存中 AppendOnlyMap 的数据与磁盘上 spill 文件中的数据进行全局聚合,得到最终结果。
上述过程中涉及3个核心问题:
- 如何获知当前
AppendOnlyMap的大小?因为AppendOnlyMap中不断添加和更新 record,其大小是动态变化的,什么时候会超过内存界限是难以确定的。 - 如何设计 spill 的文件结构,使得可以支持高效的全局聚合?
- 怎样进行全局聚合?
(1)AppendOnlyMap 的大小估计
虽然我们知道 AppendOnlyMap 中持有的数组的长度和大小,但数组里面存放的是 Key 和 Value 的引用,并不是它们的实际对象(object)大小,而且 Value 会不断被更新,实际大小不断变化。因此,想准确得到 AppendOnlyMap 的大小比较困难。一种简单的解决方法是在每次插入 record 或对现有 record 的 Value 进行更新后,都扫描一下 AppendOnlyMap 中存放的 record,计算每个 record 的实际对象大小并相加,但这样会非常耗时。而且一般 AppendOnlyMap 会插入几万甚至几百万个 record,如果每个 record 进入 AppendOnlyMap 都计算一遍,则开销会很大。Spark 设计了一个 增量式的高效估算算法,在每个 record 插入或更新时根据历史统计值和当前变化量直接估算当前 AppendOnlyMap 的大小,算法的复杂度是 O(1),开销很小。在 record 插入和聚合过程中会定期对当前 AppendOnlyMap 中的 record 进行抽样,然后精确计算这些 record 的总大小、总个数、更新个数及平均值等,并作为历史统计值。进行抽样是因为 AppendOnlyMap 中的 record 可能有上万个,难以对每个都精确计算。之后,每当有 record 插入或更新时,会根据历史统计值和历史平均的变化值,增量估算 AppendOnlyMap 的总大小,详见 Spark 源码中的 SizeTracker.estimateSize() 方法。抽样也会定期进行,更新统计值以获得更高的精度。
(2)Spill 过程与排序
当 AppendOnlyMap 达到内存限制时,会将 record 排序后写入磁盘中。排序是为了方便下一步全局聚合(聚合内存和磁盘上的 record)时可以采用更高效的 merge-sort(外部排序+聚合)。那么,问题是根据什么对 record 进行排序的?自然想到的是根据 record 的 Key 进行排序,但是这就要求操作定义 Key 的排序方法,如 sortByKey() 等操作定义了按照 Key 进行的排序。大部分操作,如 groupByKey(),并没有定义 Key 的排序方法,也不需要输出结果是按照 Key 进行排序的。在这种情况下,Spark 采用 按照 Key 的 Hash 值进行排序 的方法,这样既可以进行 merge-sort,又不要求操作定义 Key 排序的方法。然而,这种方法的问题是会出现 Hash 值冲突,也就是不同的 Key 具有相同的 Hash 值。为了解决这个问题,Spark 在 merge-sort 的同时会比较 Key 的 Hash 值是否相等,以及 Key 的实际值是否相等。
解决了 spill 时如何对 record 进行排序的问题后,每当 AppendOnlyMap 超过内存限制,就会将其内部的 record 排序后 spill 到磁盘上,如图6.14所示,AppendOnlyMap 被填满了4次,也被 spill 到磁盘上4#### (3)全局聚合(merge-sort)
前面提到过,由于最终的 spill 文件和内存中的 AppendOnlyMap 都是经过部分聚合后的结果,其中可能存在相同 Key 的 record,因此还需要一个全局聚合阶段将 AppendOnlyMap 中的 record 与 spill 文件中的 record 进行聚合,得到最终聚合后的结果。全局聚合的方法就是建立一个最小堆或最大堆,每次从各个 spill 文件中读取前几个具有相同 Key(或者相同 Key 的 Hash 值)的 record,然后与 AppendOnlyMap 中的 record 进行聚合,并输出聚合后的结果。在图6.14中,在全局聚合时,Spark 分别从4个 spill 文件中提取第1个 <K, V> record,与还留在 AppendOnlyMap 中的第1个 record 组成最小堆,然后不断从最小堆中提取具有相同 Key 的 record 进行聚合(merge)。然后,Spark 继续读取 spill 文件及 AppendOnlyMap 中的 record 填充最小堆,直到所有 record 处理完成。由于每个 spill 文件中的 record 是经过排序的,按顺序读取和聚合可以保证能够对每个 record 得到全局聚合的结果。
总结
ExternalAppendOnlyMap是一个高性能的 HashMap,只支持数据插入和更新,但可以同时利用内存和磁盘对大规模数据进行聚合和排序,满足了 Shuffle Read 阶段数据聚合、排序的需求。
6.4.3 PartitionedAppendOnlyMap
PartitionedAppendOnlyMap 用于在 Shuffle Write 端对 record 进行聚合(combine)。PartitionedAppendOnlyMap 的功能和实现与 ExternalAppendOnlyMap 的功能和实现基本一样,唯一区别是 PartitionedAppendOnlyMap 中的 Key 是 “PartitionId + Key”,这样既可以根据 partitionId 进行排序(面向不需要按 Key 进行排序的操作),也可以根据 partitionId+Key 进行排序(面向需要按 Key 进行排序的操作),从而在 Shuffle Write 阶段可以进行聚合、排序和分区。
6.4.4 PartitionedPairBuffer
PartitionedPairBuffer 本质上是一个基于内存 + 磁盘的 Array,随着数据添加,不断地扩容,当到达内存限制时,就将 Array 中的数据按照 partitionId 或 partitionId+Key 进行排序,然后 spill 到磁盘上,该过程可以进行多次,最后对内存中和磁盘上的数据进行全局排序,输出或者提供给下一个操作。
6.5 与 Hadoop MapReduce 的 Shuffle 机制对比
至此,我们已经理解了 Spark Shuffle 机制要解决的问题、设计的原则,以及 Shuffle 框架的具体设计和实现。实际上,Spark 的 Shuffle 机制已经进行过多次演变。最早在 0.4 版本中,Shuffle Read 过程还只有基于 Java HashMap 的实现,后面经过不断的性能测试和设计调整,才有了现在完整的 Shuffle 框架、灵活的 Shuffle 策略,以及完善的数据结构支持。那么,最早在 Hadoop MapReduce 中已经有了 Shuffle 的策略和实现,为何不直接照搬过来呢?
回答这个问题前,先介绍一下 Hadoop MapReduce 的 Shuffle 机制是怎么工作的。Hadoop MapReduce 有明显的两个阶段,即 map stage 和 reduce stage。如图6.15所示,在 map stage 中,每个 map task 首先执行 map(K, V) 函数,再读取每个 record,并输出新的 <K, V> record。这些 record 首先被输出到一个固定大小的 spill buffer 里(一般为 100MB),spill buffer 如果被填满就将 spill buffer 中的 record 按照 Key 排序后输出到磁盘上。这个过程类似 Spark 将 map task 输出的 record 放到一个排序数组(PartitionedPairBuffer)中,不同的是 Hadoop MapReduce 是 严格按照 Key 进行排序 的,而 PartitionedPairBuffer 排序更灵活(可以按照 partitionId 进行排序,也可以按照 partitionId+Key 进行排序)。另外,由于 spill buffer 中的 record 只进行排序,不能完成聚合(combine)功能,所以 Hadoop MapReduce 在完成 map()、等待所有的 record 都 spill 到磁盘上后,启动一个专门的聚合阶段(图6.15中的 merge phase),使用 combine() 将所有 spill 文件中的 record 进行全局聚合,得到最终聚合结果。注意,这里需要进行多次全局聚合,因为每次只针对某个分区的 spill 文件进行聚合。
在 Shuffle Read 阶段,Hadoop MapReduce 先将每个 map task 输出的相应分区文件通过网络获取,然后放入内存,如果内存放不下,就先对当前内存中的 record 进行聚合和排序,再 spill 到磁盘上。图6.15中的 a, b, c, d, … 代表从不同 map task 获取的分区文件,每个文件里面包含许多个 record。由于每个分区文件中包含的 record 已经按 Key 进行了排序,聚合时只需要一个最小堆或者最大堆保存当前每个文件中的前几个 record 即可,聚合效率比较高,但需要占用大量内存空间来存储这些分区文件。等获取所有的分区文件时,此时可能存在多个 spill 文件及内存中剩余的分区文件,这时再启动一个专门的 reduce 阶段(图6.15中的 reduce phase)来将这些内存和磁盘上的数据进行全局聚合,这个过程与 Spark 的全局聚合过程没有什么区别,最后得到聚合后的结果。
图6.15 Hadoop MapReduce 的 Shuffle 机制
(图片占位 — 示意图描述:map 阶段:map task → spill buffer(排序)→ spill 到磁盘;merge phase:combine 聚合;reduce 阶段:从多个 map 拉取分区文件,用最小堆全局聚合)
下面总结一下 Hadoop MapReduce 的 Shuffle 机制的优点和缺点。
优点
- 流程固定、阶段分明:每个阶段读取什么数据、进行什么操作、输出什么数据都是确定性的。这种确定性使得实现起来比较容易。
- 内存消耗确定:map 阶段框架只需要一个大的 spill buffer,reduce 阶段框架只需要一个大的数组(
MergeQueue)来存放获取的分区文件中的 record。这样,什么时候将数据 spill 到磁盘上是确定的,也易于实现和内存管理。当然,用户定义的聚合函数,如combine()和reduce()的内存消耗是不确定的。- 严格排序带来高效聚合:对 Key 进行了严格排序,使得可以使用最小堆或最大堆进行聚合,非常高效。而且可以原生支持
sortByKey()。- 大规模数据稳定性:按 Key 进行排序和 spill 到磁盘上的功能,可以在 Shuffle 大规模数据时仍然保证能够顺利进行。
缺点
- 强制排序增加开销:强制按 Key 进行排序,大多数应用其实不需要严格地按照 Key 进行排序,如
groupByKey(),排序增加计算量。- 不能在线聚合,资源浪费:不管是 map() 端还是 reduce() 端,都是先将数据存放到内存或者磁盘上后,再执行聚合操作的,存储这些数据需要消耗大量的内存和磁盘空间。如果能够一边获取 record 一边聚合,那么对于大多数聚合操作,可以有效地减少存储空间,并减少时延。
- 临时文件过多:如果 map task 个数为 M,reduce task 个数为 N,那么 map 阶段集群会产生 M×N 个分区文件,当 M 和 N 较大时,总的临时文件个数过多。
Spark 如何克服这些缺点?
- 克服第1个缺点(强制排序)的方法是对操作类型进行分类,如 Spark 提供了按 partitionId 排序、按 Key 排序等多种方式来灵活应对不同操作的排序需求。
- 克服第2个缺点(不能在线聚合)的方法是采用 hash-based 聚合,也就是利用 HashMap 的在线聚合特性,将 record 插入 HashMap 时自动完成聚合过程,即 Spark 为什么设计
AppendOnlyMap等数据结构。 - 克服第3个缺点(临时文件问题)的方法是将多个分区文件合并为一个文件,按照 partitionId 的顺序存储,这也是 Spark 为什么要按照 partitionId 进行排序的原因。
总的来说,Spark 采用的是 hash + sort-based Shuffle 的方法,融合了 hash-based 和 sort-based Shuffle 的优点,根据不同操作的需求,灵活选择最合适的 Shuffle 方法。
另外,由于 Hadoop MapReduce 采用独立阶段聚合,而 Spark 采用在线聚合的方法,两者的聚合函数还有一个大的区别。MapReduce 的聚合函数 reduce() 接收的是一个 <K, list(V)> record,可以对每个 record 中的 list(V) 进行任意处理,而 Spark 中的聚合函数每当接收到一个 <K, V> record 时,就要立即进行处理,在流程上有一些受限。两者的区别类似下面的处理逻辑。
由此可见,在聚合过程中 Spark 需要对每个到来的 record 进行立即处理,而 Hadoop MapReduce 没有这个要求,所以更加灵活。
6.6 本章小结
本章涉及的术语和流程较为复杂,其中既包含了 Spark 当前实现方式的说明,也包含对整个 Shuffle 机制设计的抽象和思考。某些 Shuffle 方式虽然还没有对应的操作使用,但不排除未来会有一些操作被用到。因此,我们还是从更通用的角度对 Shuffle 机制进行总结。实际上,有些 Shuffle 的实现方式可以进行进一步优化来减少内存使用和提高效率,如后面会看到基于序列化的 Shuffle 方式——SerializedShuffle。