第7章 数据缓存机制
本章首先介绍数据缓存的意义,之后介绍Spark中数据缓存机制的设计原理,包括哪些数据需要缓存、应用包含缓存时的逻辑处理流程和物理执行计划、缓存级别、缓存数据写入和读取方法、用户接口的设计、缓存数据的替换与回收方法。最后,与Hadoop MapReduce的缓存机制进行对比。
7.1 数据缓存的意义
在第2章中,我们以GroupByTest为例展示了Spark的数据缓存机制。数据缓存机制的主要目的是加速计算。具体地,在应用执行过程中,数据缓存机制对某些需要多次使用(重用)的数据进行缓存。这样,当应用需要再次访问这些数据时,可以直接从缓存中读取,避免再次计算,从而减少应用的执行时间。
迭代型应用与交互式应用
- 迭代型应用:如果每轮迭代时都需要读取一个固定数据(如训练数据的特征矩阵或输入图)来进行计算,可以将这个固定数据进行缓存,加快读取和计算速度。
- 交互式应用(如交互式SQL):需要不断地对一个固定数据进行查询分析(执行不同的SQL语句),如果对这个固定数据进行缓存,则可以加快查询分析速度。
7.2 数据缓存机制的设计原理
既然数据缓存能够加速计算,那么如何设计一个高效的缓存机制呢?这其中涉及决定哪些数据需要被缓存,包含数据缓存操作的逻辑处理流程和物理执行计划,缓存级别,缓存数据的写入方法,缓存数据的读取方法,用户接口的设计,缓存数据的替换与回收方法等内容。下面我们介绍一下这些内容,然后在后续章节中介绍具体实现。
7.2.1 决定哪些数据需要被缓存
对于用户来说,首先要知道的是哪些数据需要被缓存,我们通过一个简单的例子来回答这个问题。
示例:共享mappedRDD的应用程序
该示例的具体逻辑处理流程如图7.1所示。示例应用首先对输入数据进行map()计算,得到mappedRDD,然后对mappedRDD依次进行两种计算:一种是reduceByKey+foreach(println),另一种是groupByKey+foreach(println)。由于该应用有两个foreach()操作,所以会形成两个job。这两个job虽然都是在mappedRDD上进行计算的,但由于用户没有对mappedRDD进行缓存,Spark仍然认为这两个job都是从inputRDD开始计算的。
图7.1 示例程序生成的两个job,红色箭头表示具有ShuffleDependency
观察图7.1可以发现,生成的两个job中
inputRDD => mappedRDD的计算流程一样,那么理论上第2个job可以直接从mappedRDD开始进行计算。理想中的数据处理流程应该如图7.2所示,第1个job不变,第2个job变为mappedRDD: MapPartitionsRDD => ShuffledRDD => groupedRDD: MapPartitionsRDD。为了实现图7.2中的流程,用户可以在程序中声明mappedRDD需要被缓存,即在foreach()操作之前添加mappedRDD.cache()语句,去掉示例中的注释。
重要说明
- ①
cache()操作表示将数据(此处是mappedRDD)直接写入内存。- ②
cache()操作是lazy操作,不是立即执行的,即执行到mappedRDD.cache()时,只标记mappedRDD需要被缓存到内存中,此时并不真正执行缓存操作,只有等到reducedRDD.foreach(println)生成job,job运行时再将mappedRDD写入内存。- ③
cache()操作只是将数据缓存到内存中,如果用户想将数据缓存到内存和磁盘中,那么可以使用persist(MEMORY_AND_DISK)接口,在后续章节中会详细介绍。
回到图7.2中,对mappedRDD进行缓存后可以避免第2个job再进行map()计算,但代价是需要占用空间来存储mappedRDD。当mappedRDD很大时,如包含上亿个record,存储mappedRDD会消耗大量存储空间,这时,需要权衡计算代价和存储代价。在这个例子中,我们发现map()操作的计算逻辑很简单,只需要非常少量的计算(仅仅对Key加1)即可从原始数据inputRDD中得到mappedRDD。也就是说,mappedRDD的计算代价很低。此时,若mappedRDD需要很大存储空间时,那么我们可以不对mappedRDD进行缓存,而直接从原始数据中计算得到。因此,是否缓存数据不仅需要考虑数据的计算代价,也需要考虑存储代价。
缓存决策要点
总的来说,缓存机制实际上是一种空间换时间的方法。具体地,如果数据满足以下3条,就可以进行缓存:
- 会被重复使用的数据。更确切地,会被多个job共享使用的数据。被共享使用的次数越多,那么缓存该数据的性价比越高。一般来说,对于迭代型和交互型应用非常适合。
- 数据不宜过大。过大会占用大量存储空间,导致内存不足,也会降低数据计算时可使用的空间。虽然缓存数据过大时也可以存放到磁盘中,但磁盘的I/O代价比较高,有时甚至不如重新计算快。
- 非重复缓存的数据。重复缓存的意思是如果缓存了某个RDD,那么该RDD通过OneToOneDependency连接的parent RDD就不需要被缓存了。例如,在图7.2中,我们已经对
mappedRDD进行了缓存,就没有必要再对inputRDD进行缓存了,除非有新的job需要使用inputRDD,且该job不使用mappedRDD。
图7.2 对mappedRDD进行缓存后生成的两个job,黄色圆圈表示被缓存的数据分区
除了RDD可以被缓存,广播数据和task计算结果数据也可以被缓存,我们会在第9章中讨论。另外,面向结构化的数据结构DataSet、DataFrame与RDD一样也可以被缓存。
7.2.2 包含数据缓存操作的逻辑处理流程和物理执行计划
在7.2.1节中以mappedRDD为例介绍了数据缓存的意义和场景,那么当应用存在数据缓存时,Spark生成逻辑处理流程和物理执行计划的规则与第3章、第4章介绍的规则有什么区别呢?
包含数据缓存操作的应用执行流程生成的规则
- Spark首先假设应用没有数据缓存,按照第3章的规则正常生成逻辑处理流程(RDD之间的数据依赖关系)。
- 然后从第2个job开始,将cached RDD之前的RDD都去掉,得到削减后的逻辑处理流程。
- 最后,按照第4章给出的正常规则将逻辑处理流程转化为物理执行计划。
我们举一个更复杂的例子来说明这些规则,在7.2.1节的例子基础上再添加一个job,并对groupedRDD、reducedRDD进行缓存和join(),如下。
复杂数据缓存示例CacheTest:包含3个RDD cache()操作和生成3个job。
根据生成原则,这个复杂的例子会生成3个job:如图7.3所示,第1个job是inputRDD => mappedRDD => reducedRDD => foreach()。第2个job是mappedRDD => ShuffledRDD => groupedRDD => foreach()。第3个job是(reducedRDD, groupedRDD) => CoGroupedRDD & MapPartitionsRDD => foreach()。
图7.3 包含3个RDD cached()操作的复杂应用生成的3个job,黄色圆圈表示被缓存的分区
图7.4 复杂应用生成的3个job(Web UI截图描述)
图7.5 复杂应用生成的8个stage(Web UI截图描述)
按照本节给出的生成规则,在没有cache()操作的情况下,确实会生成8个stage,如表7.1所示。cache()使得一些stage可以不必实际运行,我们通过删除线来删除不需要运行的stage及不需要计算的RDD,这样可以得到6个实际被运行的stage。
表7.1 复杂应用生成的8个stage
| Stage编号 | 描述 | 是否实际运行 |
|---|---|---|
| … | … | … |
缓存对stage的影响
如果没有对job0中的
reducedRDD进行缓存,那么job2要从mappedRDD开始计算,也就是要多计算stage 4和stage5中的mappedRDD => reducedRDD;如果没有对job1中的groupedRDD进行缓存,那么job1执行完以后,job2仍然需要再次计算stage 6和stage 7中的mappedRDD => ShuffledRDD => groupedRDD。
7.2.3 缓存级别
前面两节中,我们只是说明一些RDD可以被缓存,那么这些RDD具体被缓存到了哪里?如何存储呢?
为了满足不同的缓存需求,Spark从3个方面考虑了缓存级别(StorageLevel)。
- 存储位置:可以将数据缓存到内存和磁盘中,内存空间小但读写速度快,磁盘空间大但读写速度慢。
- 是否序列化存储:如果对数据(record以Java objects形式)进行序列化,则可以减少存储空间,方便网络传输,但是在计算时需要对数据进行反序列化,会增加计算时延。
- 是否将缓存数据进行备份:将缓存数据复制多份并分配到多个节点,可以应对节点失效带来的缓存数据丢失问题,但需要更多的存储空间。
最终,Spark将缓存级别分为12类,如表7.2所示。用户可以使用rdd.persist(StorageLevel)对RDD按这些缓存级别进行缓存,如mappedRDD.persist(MEMORY_AND_DISK)。
表7.2 Spark中的数据缓存级别
| 缓存级别 | 描述 |
|---|---|
MEMORY_ONLY | 只使用内存进行缓存,如果某个分区在内存中存放不下,就不对该分区进行缓存。当后续job中的task计算需要这个分区中的数据时,需要重新计算得到该分区。 |
MEMORY_AND_DISK | 如果内存不足时,则会将部分数据存放到磁盘上。 |
DISK_ONLY | 只使用磁盘进行缓存。 |
MEMORY_ONLY_SER | 将数据按照序列化方式存储,减少存储空间,但需要序列化/反序列化,增加计算延时。 |
MEMORY_AND_DISK_SER | 同上,但内存不足时写入磁盘。 |
| … | … (其他级别) |
缓存级别的存储特点
- 对于
MEMORY_ONLY级别:例如,在图7.6中,如果mappedRDD中的第1个分区没有被缓存,那么需要先执行task 0,算出mappedRDD第1个分区中的数据,然后才能执行task1、task2、task3。- 对于
MEMORY_AND_DISK和MEMORY_AND_DISK_SER:当内存不足时使用磁盘作为溢出存储。- 因为存储到磁盘前需要对数据进行序列化,所以
DISK_ONLY级别也需要序列化存储。
用户选择缓存级别时的考量
目前,Spark需要用户在缓存数据时自己选择缓存级别。不同应用的缓存级别需求不同,用户选择时需要考虑两个问题:
- 是否有足够内存、磁盘空间进行缓存?没有足够的内存、磁盘空间但又需要进行数据缓存,可以选择
MEMORY_AND_DISK或者MEMORY_AND_DISK_SER级别缓存数据。- 如果数据缓存到磁盘上,那么读取数据的时间是否大于重新计算出该数据的时间?如果是,则可以选择不缓存或者分配更大的内存来进行缓存。
7.2.4 缓存数据的写入方法
前面7.2.1节提到过,缓存操作是lazy操作,只有等到action()操作触发job运行时才实际执行缓存操作。更进一步,当需要进行数据缓存时,Spark既要将数据写入内存或磁盘,也需要执行下一步数据操作,那么如何决定缓存和计算的先后顺序呢?如图7.7所示,在task0中的map()、persist()、combine()的执行顺序又是怎样的呢?
图7.7 数据缓存与下一步操作的计算顺序问题(先执行缓存再执行下一步操作)
我们将图7.7中的上图放大为图7.8,根据流水线机制,map()每计算出一个record,如(1, a) => (2, a)后,就将其放入HashMap结构中进行combine()聚合。聚合后,mappedRDD中的(2, a)就可以被清除了。然后,map()再读入下一个record,计算得到(2, b) => (3, b),放入HashMap进行聚合,清除mappedRDD中的(3, b)。最后以同样操作读入(3, c)进行处理。
假设先执行combine(),再执行persist(),那么当combine()执行后,mappedRDD中的数据就已经被清除,无法再进行persist()。所以正确的执行顺序是map()每计算出mappedRDD中的一个record后,就执行persist()将该record写入内存或磁盘,然后再执行下一步操作。
缓存写入总结
总结:
rdd.cache()只是对RDD进行缓存标记的,不是立即执行的,实际在action()操作的job计算过程中进行缓存。当需要缓存的RDD中的record被计算出来时,及时进行缓存,再进行下一步操作。
缓存数据写入的实现细节:在实现中,Spark在每个Executor进程中分配一个区域,以进行数据缓存,该区域由BlockManager来管理。在图7.8中,task0和task1运行在同一个Executor进程中。对于task0,当计算出mappedRDD中的partition0后,将partition0存放到BlockManager中的memoryStore内。memoryStore包含了一个LinkedHashMap,用来存储RDD的分区。该LinkedHashMap中的Key是blockId,即rddId+partitionId,如rdd_1_1,Value是分区中的数据,LinkedHashMap基于双向链表实现。在图7.8中,task0和task1都将各自需要缓存的分区存放到了LinkedHashMap中。
图7.8 在task0和task1运行过程中对partition0和partition1进行缓存
7.2.5 缓存数据的读取方法
7.2.4节介绍了缓存数据的写入方法,这一节我们讨论如何读取缓存数据。
首先,Spark如何判断一个job是否需要读取缓存数据?当某个RDD被缓存后,该RDD的分区成为CachedPartitions。例如,在图7.2的例子中,当mappedRDD: MapPartitionsRDD被缓存后,mappedRDD的3个分区成为CachedPartitions。我们可以使用reducedRDD.toDebugString()来查看mappedRDD的3个CachedPartitions的存储位置及占用的空间大小。如下所示,mappedRDD被缓存到了内存中,占用872.0B的内存空间。
下一个在groupedRDD.toDebugString()中的job的计算过程如下:
(此处为代码输出示例,原文本中未提供具体输出,可参考Spark实际输出)
这个计算过程说明mappedRDD: MapPartitionsRDD中的3个CachedPartitions在第2个job计算过程中被读取,那么具体的读取过程是怎样的呢?
7.2.4节介绍过RDD的分区被缓存到BlockManager的memoryStore(也就是LinkedHashMap)中。如图7.9所示,假设mappedRDD的partition0和partition1被Worker节点1中的BlockManager缓存,而partition2被Worker节点2中的BlockManager缓存,那么当第2个job需要读取mappedRDD中的分区时,首先去本地的BlockManager中查找该分区是否被缓存。
在图7.9中,第2个job的3个task都被分到了Worker节点1上,其中task3和task4对应的CachedPartition在本地,因此直接通过Worker节点1的memoryStore读取即可。而task5对应的CachedPartition在Worker节点2上,需要通过远程访问,也就是通过getRemote()读取。远程访问需要对数据进行序列化和反序列化,远程读取时是一条条record读取,并得到及时处理的。
图7.9 Spark task读取本地和远程缓存数据的过程
7.2.6 7.2.6 用户接口的设计
前面在7.2.3节中提到,Spark提供了一个通用的缓存操作rdd.persist(StorageLevel),可以使用不同类型的缓存级别,如mappedRDD.persist(MEMORY_AND_DISK)。对于cache(),实际上等同于persist(MEMORY_ONLY)。那么,当用户想回收缓存数据时怎么办呢?Spark也提供了一个unpersist()操作来回收缓存数据,如mappedRDD.unpersist()。
接口限制
不管
persist()还是unpersist()都只能针对用户可见的RDD进行操作。如图7.10所示,在intersection()操作中,用户在程序中可见的是蓝色部分的RDD,即rdd1、rdd2和rdd3,在执行过程中的MapPartitionsRDD和CoGroupedRDD由Spark自动生成,并不能被用户操作。
图7.10 intersection()的逻辑处理流程示例
7.2.7 缓存数据的替换与回收方法
在7.2.2节介绍的复杂例子CacheTest中,如图7.3所示,我们缓存了3个RDD:mappedRDD、reducedRDD和groupedRDD。实际上,当对reducedRDD和groupedRDD完成缓存后,可以回收mappedRDD,因为第3个job只需要使用reducedRDD和groupedRDD。另外,在内存不足时,我们可以进行缓存替换。例如,当需要缓存reducedRDD而内存空间不足时,可以及时将mappedRDD进行替换,以腾出空间存储reducedRDD。因此,在内存空间有限的情况下,Spark需要缓存替换与回收机制。
缓存替换与回收是一个传统问题,如CPU中包含一级缓存和二级缓存,内存管理也存在页面置换机制,数据库中也包含缓存。研究人员针对缓存管理问题开发了多种缓存替换算法[98],如先入先出(FIFO, First Input First Output)替换算法、最近最久未使用(LRU, Least Recently Used)替换算法、最近最常被使用(MRU, Most Recently Used)替换算法等。
1. 自动缓存替换
那么针对Spark的缓存数据特点,如何设计和实现其缓存替换与回收策略呢?
缓存替换指的是当需要缓存的RDD大于当前可利用的空间时,使用新的RDD替换旧的RDD(可能有多个)。该过程由系统自动完成,对用户来说是无感知的。在Spark中,自动缓存替换需要解决以下两个问题。
(1)选择哪些RDD进行替换?
直观上来讲,如果旧的RDD会被再次利用,那么不应该被替换。然而,当前Spark采用动态生成job的方式,即在执行到一个action()操作时才会生成一个job,仅当遇到下一个action()时再生成下一个job。在执行过程中,Spark只知道cached RDD是否会被当前job用到,而不能预知cached RDD是否会被后续的job用到。因此,Spark决定一个cached RDD是否要被替换的权衡之计是根据该cached RDD的访问历史来判断。目前Spark采用LRU替换算法,即优先替换掉当前最长时间没有被使用过的RDD。这种方式有可能替换掉后续还会被使用的RDD。
(2)需要替换多少个旧的RDD,才开始存储新的RDD?
前面章节讨论过,如果需要缓存某个RDD,那么Spark会在计算该RDD过程中对其进行缓存,而且是每计算一个record就进行存储。因此,在缓存结束前,Spark不能预知该RDD需要的存储空间,也就无法判断需要替换多少个旧的RDD。为了解决这个问题,Spark采用动态替换策略:在当前可用内存空间不足时,每次通过LRU替换一个或多个RDD(具体数目与一个动态的阈值相关),然后开始存储新的RDD,如果中途存放不下,就暂停,继续使用LRU替换一个或多个RDD,依此类推,直到存放完新的RDD。当然,如果替换掉所有旧的RDD都存不下新的RDD,那么需要分两种情况处理:
- 如果新的RDD的存储级别里包含磁盘,那么可以将新的RDD存放到磁盘中;
- 如果新的RDD的存储级别只是内存,那么就不存储该RDD了。
2. Spark LRU算法的实现及讨论
LRU替换策略的确切含义是优先替换掉当前最久未被使用的RDD。如果查看Spark的源码,则会发现好像没有相关的LRU算法实现代码。实际上,Spark直接利用了7.2.4节中介绍的LinkedHashMap自带的LRU功能实现了缓存替换。LinkedHashMap使用双向链表实现,每当Spark插入或读取其中的RDD分区数据时,LinkedHashMap自动调整链表结构,将最近插入或者最近被读取的分区数据放在表头,这样链表尾部的分区数据就是最近最久未被访问的分区数据,替换时直接将链表尾部的分区数据删除。因此,LinkedHashMap本身就形成了一个LRU cache。LinkedHashMap中的Key存放blockId,如blockId=rdd_0_1表示rdd0的第2个分区。Spark目前采用LRU替换策略,但同时也在开发新的策略,具体可见SPARK-14289 (Support multiple eviction strategies for cached RDD partitions)[99]。
注意
此外,在进行缓存替换时,RDD的分区数据不能被该RDD的其他分区数据替换。例如,Spark在缓存中存放了
newRDD的partition0和partition1后,就没有空间再放入newRDD的partition2了。此时,Spark不能删除newRDD的partition0和partition1来缓存partition2,因为被替换的RDD和要缓存的RDD是同一个RDD。
3. 用户主动回收缓存数据
上面我们提到Spark难以获取cached RDD的生命周期,也就难以精确、智能地进行缓存替换。Spark为了弥补这个缺点,允许用户自己设置进行回收的RDD和回收的时间。方法是使用unpersist()。不同于persist()的延时生效,unpersist()操作是立即生效的。用户还可以设定unpersist()是同步阻塞的还是异步执行的,如unpersist(blocking=true)表示同步阻塞,即程序需要等待unpersist()结束后再进行下一步操作,这也是Spark的默认设定。而unpersist(blocking=false)表示异步执行,即边执行unpersist()边进行下一步操作。
由于unpersist()和persist()执行方式的区别,导致如果unpersist()语句设置的位置不当,则会造成与用户预期效果不一致的结果。下面例子展示了如果unpersist()语句放在不同位置,则会得到不同的执行效果。
示例:不同位置的unpersist()语句对应用执行的影响。
(1)将mappedRDD.unpersist()直接放在reducedRDD之后、foreach之前:导致的结果是不会对mappedRDD进行缓存。由于在action()之前既执行了cache()又执行了unpersist(),所以删除了Spark刚设置的mappedRDD缓存,意味着不对mappedRDD进行缓存。该情况生成的job信息,如图7.11所示,可以看到WebUI中的job在图中没有以绿色点出现,即没有任何RDD被缓存。
图7.11 unpersist()被设置在第一个foreach之前,没有进行缓存操作
(2)将mappedRDD.unpersist()放在groupedRDD之后、foreach之前:该情况可以正常对mappedRDD进行缓存,但第2个job无法读到缓存数据。由于在第1个job中,即reducedRDD.foreach()运行前设置了mappedRDD.cache(),所以mappedRDD被正常缓存,如图7.12所示,绿色点代表mappedRDD被成功缓存。然而,由于在第2个job中,即groupedRDD.foreach()运行前设置了mappedRDD.unpersist(),该操作立即回收了mappedRDD,因此在第2个job执行时不能读取到cached mappedRDD数据,需要重新计算mappedRDD,也没有绿色点出现。
图7.12 unpersist()被设置在第二个foreach之前,第二个job无法读取缓存数据
(3)mappedRDD.unpersist()被设置在末尾:该情况可以正常缓存和读取数据。由于unpersist()被设置在末尾,第1个job和第2个job正常执行,mappedRDD在第1个job中被缓存,也被第2个job正常读取。因此,如图7.13所示,两个job都以绿色点出现。两个job都结束后,mappedRDD被回收。此时如果还有下一个job且下一个job没有直接或间接使用mappedRDD,那么当前mappedRDD.unpersist()设置的位置是合理的。
图7.13 unpersist()被设置在末尾,两个job都可以正常缓存和读取数据
7.3 与Hadoop MapReduce的缓存机制进行对比
本章介绍了Spark缓存机制设计时面临的问题和解决方法。对比Hadoop MapReduce,会发现Spark的缓存机制是Spark的优势之一。Hadoop MapReduce虽然设计了一个DistributedCache缓存机制[100],但不是用于存放job运行的中间结果的,而是用于缓存job运行所需的文件的,如所需的jar文件、每个map task需要读取的辅助文件(如一部词典)、一些文本文件等。而且DistributedCache将缓存文件存放在每个worker的本地磁盘上,并不是内存中。Spark job一般包含多个操作,按照DAG图方式执行,也适用于迭代型应用,因此会产生大量中间数据和可复用的数据。Spark为这些数据设计了基于内存和磁盘的缓存机制,可以更好地加速应用执行。
然而,当前Spark的缓存机制也不是完美的,还存在很多缺陷。例如,缓存的RDD数据是只读的,不能修改;当前的缓存机制不能根据RDD的生命周期进行自动缓存替换等。
另外,当前的缓存机制只能用在每个Spark应用内部,即缓存数据只能在job之间共享,应用之间不能共享缓存数据。例如,当一个用户提交的WordCount1应用计算出了RDD后,即使对其进行缓存,也不能用于该用户的另一个WordCount2应用。为了解决应用间缓存数据共享问题,Spark研究者又开发了分布式内存文件系统Alluxio。感兴趣的读者可以参考《Alluxio:大数据统一存储原理与实践》[101]。
7.0 第7章 数据缓存机制
7.4 本章小结
本章讨论了Spark独特的缓存机制:如果用户设置某个RDD需要被缓存,那么Spark会在计算得到这个RDD时,及时将其存放到内存或者磁盘上,同时进行下一步操作。缓存数据可以被后续job读取,从而节省计算时间。当有多个RDD需要缓存且内存不足时,可能会引发缓存替换与回收问题,Spark设计了相应的自动替换机制,同时也为用户开放了缓存回收接口,允许用户自己回收数据。本章的知识点将有助于开发效率更高的Spark应用,也有助于研究人员进一步优化缓存机制。
图像列表
以下为本章涉及的图像索引(按所在页码列出):
- Image 781 on Page 229
- Image 784 on Page 230
- Image 789 on Page 232
- Image 792 on Page 233
- Image 795 on Page 234
- Image 796 on Page 234
- Image 799 on Page 235
- Image 800 on Page 235
- Image 805 on Page 237
- Image 808 on Page 238
- Image 813 on Page 240
- Image 818 on Page 242
- Image 821 on Page 243
- Image 822 on Page 243
- Image 825 on Page 244
- Image 828 on Page 245
- Image 835 on Page 248
- Image 838 on Page 249
- Image 839 on Page 249
- Image 842 on Page 250