第1章 大数据处理框架概览
本章主要介绍大数据处理框架的基本概念,包括大数据的概念、大数据处理框架的概念及其发展历程。其中还介绍了大数据处理框架的编程模型和大数据应用运行时的四层结构,即用户层、分布式数据并行处理层、资源管理与任务调度层、物理执行层。在介绍这些技术内容的同时会总结相关的研究工作,扩充大数据处理框架的知识内容。
1.1 大数据及其带来的挑战
大数据越来越成为工业界和学术界的重要研究对象。企业和科研机构需要收集和处理大量数据,进而从大数据中获取知识或经济效益。例如,搜索引擎每天都在收集、处理、分析海量的网页及多媒体数据,并对外提供数据查询服务;社交网站每天记录大量的用户数据,组织形成虚拟的人际网络,提供社交和通信等服务;商业智能公司分析企业生产和销售的数据,为企业提供商务决策支持;学术研究机构也在天文、地理、物理、化学、生命科学等领域不断积累大量的实验数据,并从中分析挖掘各种科学知识。
互联网、云计算、物联网等技术的发展使得数据的产生速度越来越快、数据规模越来越大、数据类型越来越多。例如,社交网站 Facebook 每天要处理约 25 亿条消息和 500+TB 的新数据,用户每天上传 3 亿张照片[1]。早在 2008 年,Google 每天就要处理约 20 000 TB(20PB)的数据[2],在 YouTube 网站上用户每分钟会上传约 48h 的视频[3]。早在 2012 年,在 Twitter 上用户每天大约发布 1.75 亿条微博[4]。
在这样的背景下,大数据的概念被提出。大数据具有数据量大(Volume)、数据类型多样(Variety)、产生与处理速度快(Velocity)、价值高(Value)的“4V”特性[5]。这些特性对传统数据处理系统提出了新的挑战,使得传统数据处理系统难以在可接受的时间范围内对大数据进行高效处理。虽然出现在 21 世纪 70 年代的关系数据库解决了关系型数据的存储与 OLTP(On-line Transaction Processing,在线事务处理)问题,以及之后出现的数据仓库解决了数据建模及 OLAP(On-line analytical processing,在线分析处理)问题,但是在大数据环境下,传统的数据库和数据仓库都面临着可扩展性的问题。该问题导致这些传统软件难以处理大数据或者处理效率低下。为了解决这个问题,经过工业界和学术界十多年的探索和实践,多种可扩展的大数据处理框架应运而生。
1.2 大数据处理框架
为了高效处理大数据,工业界和学术界提出了很多分布式大数据处理框架。2004 年 Google 在计算机系统领域顶级会议 OSDI 上提出了基于分治、归并和函数式编程思想的 MapReduce 分布式计算框架[6]。这一框架受到了广泛关注,也获得了巨大成功。随后,Apache 社区对 Google File System[7] 和 MapReduce 进行了开源实现,并命名为 Hadoop[8]。经过多年发展,Hadoop 已经形成一个完整的生态系统,被工业界和学术界广泛使用,成为当时大数据存储和处理的实际标准。2007 年微软公司提出了 Dryad 分布式计算框架[9]。Dryad 的思路跟 MapReduce 有相似之处,但更加灵活。不同于 MapReduce 固定的数据处理流程,Dryad 允许用户将任务处理组织成有向无环图(Directed Acyclic Graph,DAG)来获得更强的数据处理表达能力。2012 年 UC Berkeley 的 AMPLab 提出了基于内存,适合迭代计算的 Spark 分布式处理框架[10,11]。该框架允许用户将可重用的数据缓存(cache)到内存中,同时利用内存进行中间数据的聚合,极大缩短了数据处理的时间。这些大数据处理框架拥有共同的编程模型,即 MapReduce-like 模型,采用“分治-聚合”策略来对数据进行分布并行处理。
1.3 大数据应用及编程模型
大数据应用一般是指为满足业务需求,运行在大数据处理框架之上,对大数据进行分布处理的应用,如 Hadoop MapReduce 应用和 Spark 应用。大数据应用在工业界和学术界广泛存在,如网页索引的构建、日志挖掘、大数据 SQL 查询、机器学习、社交网络图分析等。
针对不同的大数据应用,需要解决的问题是如何将其转化为可以运行在特定大数据处理框架之上的程序。为了解决这一问题,大数据处理框架为用户提供了简单且具有扩展性的编程模型。没有任何并行和分布式应用开发经验的用户也可以通过简单的编程模型来开发数据密集型应用。目前通用的大数据处理框架,如 Hadoop、Dryad 和 Spark,都是以 MapReduce 编程模型为基础的。MapReduce 编程模型可以被简单地表示为:
- map 阶段:
map<K1, V1> ⇒ list<K2, V2> - reduce 阶段:
reduce<K2, list(V2)> ⇒ list<K3, V3>
在图 1.1 中,WordCount 应用使用 MapReduce 框架来统计一篇英文文章中每个单词出现的次数。我们首先将文章(Page)按行拆分为多个分块(input split),图 1.1 中的 Page 被拆分为 4 个分块,每个分块有两行,每行包含行号(lineNo)和该行的内容(line)。在 map 阶段,我们对每一行执行 map(K=lineNo, V=line) 函数。该函数对 line 进行分词,并统计 line 中每个 word (w) 出现的次数。例如,line 1 中包含 5 个 w1 和 2 个 w2,就输出两个键值对(record),即 <w1, 5> 和 <w2, 2>。在 reduce 阶段,我们将包含相同词的 record 聚合在一起,形成 <word, list(count)>。例如,将多个 <w1, count> record 聚合为 <w1, list(5, 1, 9, 6)>,之后对 list 中的值(Value)进行累加,得到并输出 <w1, 21>。
图1.1 WordCount 应用的 MapReduce 执行流程
图中展示了 Page 被拆分为 4 个 input split,每个 split 包含两行(lineNo, line)。map 函数处理每一行输出 word 与出现次数,reduce 函数将相同 word 的 count 累加输出最终结果。
从计算框架来说,Hadoop 支持标准的 MapReduce 编程模型,并得到了广泛使用。然而,MapReduce 编程模型也存在一些局限性。例如,该模型不能直接对多张表格数据进行 join()。为了提高 MapReduce 编程模型的通用性,Dryad 和 Spark 设计了一些更一般的、对用户更友好的操作符(operator),如 flatMap()、groupByKey()、reduceByKey()、cogroup() 和 join() 等。这些操作基于 map() 和 reduce() 的函数式编程思想构建,可以表达更复杂的数据处理流程。
除了基于如上所述的编程模型开发大数据应用,用户也可以借助构建于框架之上的高层语言或者高层库来开发大数据应用。例如,在 Hadoop MapReduce 之上,Yahoo! 开发了 SQL-like 的 Pig Latin 语言[12],可以将 SQL-like 脚本转化成 Hadoop MapReduce 作业;Facebook 开发的分布式数据仓库 Hive[13] 构建在 Hadoop MapReduce 之上,也可以将类 SQL 查询分析语言转化成 Hadoop MapReduce 作业;Apache Mahout[14] 提供了基于 Hadoop MapReduce 的机器学习库;在 Spark 之上,GraphX[15] 提供了面向大规模图处理的库,MLlib[16] 提供了面向大规模机器学习的库,Spark SQL[17] 提供了基于 Spark 的 SQL 查询框架及语言。
1.4 大数据处理框架的四层结构
一个大数据应用可以表示为 <输入数据, 用户代码, 配置参数>。应用的输入数据一般以分块(如以 128MB 为一块)形式预先存储在分布式文件系统(如 HDFS[18])之上。用户在向大数据处理框架提交应用之前,需要指定数据存储位置,撰写数据处理代码,并设定配置参数。之后,用户将应用提交给大数据处理框架运行。
大数据处理框架大体可以分为四层结构:用户层、分布式数据并行处理层、资源管理与任务调度层、物理执行层。以 Apache Spark 框架为例,其四层结构如图 1.2 所示。在用户层中,用户需要准备数据、开发用户代码、配置参数。之后,分布式数据并行处理层根据用户代码和配置参数,将用户代码转化成逻辑处理流程(数据单元及数据依赖关系),然后将逻辑处理流程转化为物理执行计划(执行阶段及执行任务)。资源管理与任务调度层根据用户提供的资源需求来分配资源容器,并将任务(task)调度到合适的资源容器上运行。物理执行层实际运行具体的数据处理任务。下面具体介绍每个层次的详细信息,以及工业界和学术界进行的一些相关工作。
图1.2 大数据处理框架的四层结构
图中展示了从用户层(数据、代码、配置)向下依次经过分布式数据并行处理层、资源管理与任务调度层,最终到物理执行层的层次关系。
1.4.1 用户层
用户层方便用户开发大数据应用。如前所述,我们将一个大数据应用表示为 <输入数据, 用户代码, 配置参数>。下面介绍用户在开发应用时需要准备的输入数据、用户代码和配置参数。
1. 输入数据
对于批式大数据处理框架,如 Hadoop、Spark,用户在提交作业(job)之前,需要提前准备好输入数据。输入数据一般以分块(如以 128MB 为一块)的形式预先存储,可以存放在分布式文件系统(如 Hadoop 的分布式文件系统 HDFS)和分布式 Key-Value 数据库(如 HBase[19])上,也可以存放到关系数据库中。输入数据在应用提交后会由框架进行自动分块,每个分块一般对应一个具体执行任务(task)。
对于流式大数据处理框架,如 Spark Streaming[20] 和 Apache Flink[21],输入数据可以来自网络流(socket)、消息队列(Kafka)等。数据以微批(多条数据形成一个微批,称为 mini-batch)或者连续(一条接一条,称为 continuous)的形式进入流式大数据处理框架。
对于大数据应用,数据的高效读取常常成为影响系统整体性能的重要因素。为了提高应用读取数据的性能,学术界研究了如何通过降低磁盘 I/O 来提高性能。例如,PACMan[22] 根据一定策略提前将 task 所需的部分数据缓存到内存中,以提高 task 的执行性能。为了加速不同的大数据应用(如 Hadoop、Spark 等)之间的数据传递和共享,Tachyon[23](现在更名为 Alluxio[24])构造了一个基于内存的分布式数据存储系统,用户可以将不同应用产生的中间数据缓存到 Alluxio 中,而不是直接缓存到框架中,这样可以加速中间数据的写入和读取,同时也可以降低框架的内存消耗。
2. 用户代码
用户代码可以是用户手写的 MapReduce 代码,或者是基于其他大数据处理框架的具体应用处理流程的代码。图 1.3 展示了在 Hadoop MapReduce 上实现 WordCount 的用户代码,其用于计算字符出现的次数。在 Hadoop MapReduce 上用户需要自定义 map() 和 reduce() 函数。除了 map() 和 reduce() 函数,用户为了优化应用性能还定义了一个“迷你”的 reduce(),叫作 combine()。combine() 可以在 reduce() 执行之前对中间数据进行聚合,这样可以减少 reduce() 从各个节点获取的输入数据量,进而减少网络 I/O 开销和 reduce() 的压力。combine() 和 reduce() 的代码实现一般是相同的。Hadoop MapReduce 提供的 map() 和 reduce() 函数的处理逻辑比较固定单一,难以支持复杂数据操作,如常见的排序操作 sort()、数据库表的关联操作 join() 等。为此,Dryad 和 Spark 提供了更加通用的数据操作符,如 flatMap() 等。图 1.4 展示了在 Spark 上实现 WordCount 的用户代码,对于同样的应用处理逻辑,基于 Spark 的用户代码比基于 Hadoop MapReduce 的用户代码要更加简洁。
图1.3 在 Hadoop MapReduce 上实现 WordCount 的用户代码
图中包含用户自定义的 MyMapper 类和 MyReducer 类,以及 MyCombiner 类(与 MyReducer 相同),包括 map 方法、reduce 方法以及 main 方法中的 Job 配置。
图1.4 在 Spark 上实现 WordCount 的用户代码
图中使用 SparkContext 的 textFile 读取数据,调用 flatMap、mapToPair、reduceByKey 等操作,代码更加简洁。
在实际系统中,用户撰写用户代码后,大数据处理框架会生成一个 Driver 程序,将用户代码提交给集群运行。例如,在 Hadoop MapReduce 中,Driver 程序负责设定输入/输出数据类型,并向 Hadoop MapReduce 框架提交作业;在 Spark 中,Driver 程序不仅可以产生数据、广播数据给各个 task,而且可以收集 task 的运行结果,最后在 Driver 程序的内存中计算出最终结果。图 1.5 展示了在 Spark 平台上 Driver 程序的运行模式。
图1.5 在 Spark 平台上 Driver 程序的运行模式
图中显示 Driver 程序运行在客户端,向集群资源管理器(如 Standalone 或 YARN)申请 Executor,并将 task 分发到 Executor 上执行,最后从 Executor 收集结果。
除了直接依赖底层操作手动撰写用户代码,用户还可以利用高层语言或者高层库来间接产生用户代码。例如,在图 1.6 中用户可以使用类似 SQL 的 Apache Pig 脚本自动转化生成 Hadoop MapReduce 代码。通过这种方式生成的代码是二进制的,map() 和 reduce() 等函数代码不可见。一些高层库还提供了更简单的方式生成用户代码,如使用 Spark 之上的机器学习库 MLlib 时,用户只需要选择算法和设置算法参数,MLlib 即可自动生成可执行的 Spark 作业了。
图1.6 使用类似 SQL 的 Apache Pig 脚本自动转化生成 Hadoop MapReduce 代码
图中展示一段 Pig Latin 脚本(如 A = LOAD …; B = GROUP …; C = FOREACH … GENERATE …),并示意其被自动转化为 MapReduce 作业。
除了 Apache Pig 和 MLlib,工业界和学术界也提出了很多更简单、更通用的高层语言和高层库,使用户不用手写较为烦琐的 map() 和 reduce() 代码。Google 提出了 FlumeJava[25],可以将多个 MapReduce 作业以流水线(pipeline)的形式串联起来,并提供了基本的数据操作符,如 group()、join(),使常见的编程任务变得简单。Cascading[26] 是用 Java 编写的,它是构建在 Hadoop 之上的一套数据操作函数库。与 FlumeJava 类似,Cascading 同样为用户提供了基本的数据操作符,可以方便用户构建出较为复杂的数据流程。Google 设计的 Sawzall[27] 是一种用于数据查询的脚本语言,偏向统计分析。Sawzall 脚本可以自动转化为 MapReduce 作业执行,使得分析人员不用直接写 MapReduce 程序就可以进行大数据分析。Google 还设计了 Tenzing[28],该模块构建在 MapReduce 框架之上,支持 SQL 查询语言,并实现高效、低延迟的数据查询服务。微软研究院也设计了自己的用户层语言 DryadLINQ[29] 和 SCOPE[30]。DryadLINQ 将针对数据对象操作的 LINQ 程序转化成 Dryad 任务,再利用 Dryad 框架来并行处理数据。SCOPE 与 Sawzall 在一个层次上,可以将 SQL 脚本转化成 Dryad DAG 任务,同样利用 Dryad 框架来并行处理数据。SCOPE 和 Dryad 是使用 C#/C++ 实现的。
在用户代码的优化方面,PeriSCOPE[31] 根据 job pipeline 的拓扑结构对用户代码采用类似编译的优化措施,自动优化运行在 SCOPE 上的 job 性能。
3. 配置参数
一个大数据应用可以有很多配置参数,如 Hadoop 支持 200 多个配置参数。这些配置参数可以分为两大类:一类是与资源相关的配置参数。例如,buffer size 定义框架缓冲区的大小,影响 map/reduce 任务的内存用量。在 Hadoop 中,map/reduce 任务实际启动一个 JVM 来运行,因此用户还要设置 JVM 的大小,也就是 heap size。在 Spark 中,map/reduce 任务在资源容器(Executor JVM)中以线程的方式执行,用户需要估算应用的资源需求量,并设置应用需要的资源容器个数、CPU 个数和内存大小。
另一类是与数据流相关的配置参数。例如,Hadoop 和 Spark 中都可以设置 partition() 函数、partition 个数和数据分块大小。partition() 函数定义如何划分 map() 的输出数据。partition 个数定义产生多少个数据分块,也就是有多少个 reduce 任务会被运行。数据分块大小定义 map 任务的输入数据大小。
由于 Hadoop/Spark 框架本身没有提供自动优化配置参数的功能,所以工业界和学术界研究了如何通过寻找最优配置参数来对应用进行性能调优。StarFish[32] 研究了如何选择性能最优的 Hadoop 应用配置参数,其核心是一个 Just-In-Time 的优化器,该优化器可以对 Hadoop 应用的历史运行信息进行分析,并根据分析结果来预测应用在不同配置参数下的执行时间,以选择最优参数。Verma 等[33,34] 讨论了在给定应用完成时限的情况下,如何为 Hadoop 应用分配最佳的资源(map/reduce slot)来保证应用能够在给定时限内完成。DynMR[35] 通过调整任务启动时间、启动顺序、任务个数来减少任务等待时间和由于过早启动而引起的任务之间的资源竞争。MROnline[36] 根据任务执行状态,使用爬山法寻找最优的缓冲区大小和任务内存大小,以减少应用执行时间。Xu 等[37] 研究了如何离线估计 MapReduce 应用内存用量,采用的方法是先用小样本数据运行应用,然后根据应用运行信息来估算应用在大数据上的实际内存消耗。SkewTune[38] 可以根据用户自定义的代价函数来优化数据划分算法,在保持数据输入顺序的同时,减少数据倾斜问题。
1.4.2 分布式数据并行处理层
分布式数据并行处理层首先将用户提交的应用转化为较小的计算任务,然后通过调用底层的资源管理与任务调度层实现并行执行。
在 Hadoop MapReduce 中,这个转化过程是直接的。因为 MapReduce 具有固定的执行流程(map — Shuffle — reduce),可以直接将包含 map/reduce 函数的作业划分为 map 和 reduce 两个阶段。map 阶段包含多个可以并行执行的 map 任务,reduce 阶段包含多个可以并行执行的 reduce 任务。map 任务负责将输入的分块数据进行 map() 处理,并将其输出结果写入缓冲区,然后对缓冲区中的数据进行分区、排序、聚合等操作,最后将数据输出到磁盘上的不同分区中。reduce 任务首先将 map 任务输出的对应分区数据通过网络传输拷贝到本地内存中,内存空间不够时,会将内存数据排序后写入磁盘,然后经过归并、排序等阶段产生 reduce() 的输入数据。reduce() 处理完输入数据后,将输出数据写入分布式文件系统中。
与 Hadoop MapReduce 不同,Spark 上应用的转化过程包含两层:逻辑处理流程、执行阶段与执行任务划分。如图 1.7 所示,Spark 首先根据用户代码中的数据操作语义和操作顺序,将代码转化为逻辑处理流程。逻辑处理流程包含多个数据单元和数据依赖,每个数据单元包含多个数据分块。然后,框架对逻辑处理流程进行划分,生成物理执行计划。该计划包含多个执行阶段(stage),每个执行阶段包含若干执行任务(task)。微软的大数据编程框架 DryadLINQ 也提供类似的编译过程,可以将用户编写的大数据应用程序(LINQ)编译为可分布运行的 Dryad 执行计划和任务。
为了将用户代码转化为逻辑处理流程,Spark 和 Dryad 对输入/输出、中间数据进行了更具体的抽象处理,将这些数据用一个统一的数据结构表示。在 Spark 中,输入/输出、中间数据被表示成 RDD(Resilient Distributed Datasets,弹性分布式数据集)。在 RDD 上可以执行多种数据操作,如简单的 map(),以及复杂的 cogroup()、join() 等。一个 RDD 可以包含多个数据分区(partition)。parent RDD 和 child RDD 之间通过数据依赖关系关联,支持一对一和多对一等数据依赖关系。数据依赖关系的类型由数据操作的类型决定。如图 1.7 所示,逻辑处理流程是一个有向无环图(Directed Acyclic Graph,简称 DAG 图),其中的节点是数据单元 RDD,每个数据单元里面的圆形是指 RDD 的多个数据分块,正方形专指输入数据分块。箭头是在 RDD 上的一些数据操作(也隐含了 parent RDD 和 child RDD 之间的依赖关系)。
图1.7 Spark 应用转化与执行流程
图中展示用户代码被转化为 DAG 逻辑处理流程(RDD 节点与操作箭头),然后划分为 stage 和 task,最终在集群上执行的过程。
为了将逻辑处理流程转化为物理执行计划,Spark 首先根据 RDD 之间的数据依赖关系,将整个流程划分为多个小的执行阶段(stage)。例如,图 1.7 中逻辑处理流程被划分为 3 个执行阶段。之后,在每个执行阶段形成计算任务(task),计算任务的个数一般与 RDD 中分区的个数一致。与 MapReduce 不同的是,一个 Spark job 可以包含很多个执行阶段,而且每个执行阶段可以包含多种计算任务,因此并不能严格地区分每个执行阶段中的任务是 map 任务还是 reduce 任务。另外,在 Spark 中,用户可以通过调用 cache() 接口使框架缓存可被重用的中间数据。例如,当前 job 的输出可能会被下一个 job 用到,那么用户可以使用 cache() 对这些数据进行缓存。
1.0 第1章 大数据处理框架概览
1.4.4 物理执行层
大数据处理框架的物理执行层负责启动 task,执行每个 task 的数据处理步骤。在 Hadoop MapReduce 中,一个应用需要经历 map、Shuffle、reduce 三个数据处理阶段。而在 Spark 中,一个应用可以有更多的执行阶段(stage),如迭代型应用可能有几十个执行阶段,每个执行阶段也包含多个 task。另外,这些执行阶段可以形成复杂的 DAG 图结构。在物理执行时首先执行上游 stage 中的 task,完成后执行下游 stage 中的 task。
在 Hadoop MapReduce 中,每个 task 对应一个进程,也就是说每个 task 以 JVM(Java 虚拟机)的方式来运行,所以在 Hadoop MapReduce 中 task 的内存用量指的是 JVM 的堆内存用量。在 Spark 中,每个 task 对应 JVM 中的一个线程,而一个 JVM 可能同时运行了多个 task,因此 JVM 的内存空间由 task 共享。在应用未运行前,我们难以预知 task 的内存消耗和执行时间,也难以预知 JVM 中的堆内存用量。
从应用特点来分析,我们可以将 task 执行过程中主要消耗内存的数据分为以下三类:
- 框架执行时的中间数据。例如,
map()输出到缓冲区的数据和 reduce task 在 Shuffle 阶段暂存到内存中的数据。 - 框架缓存数据。例如,在 Spark 中,用户调用
cache()接口缓存到内存中的数据。 - 用户代码产生的中间计算结果。例如,用户代码调用
map()、reduce()、combine(),在处理输入数据时会在内存中产生中间计算结果。
很多大数据处理框架在设计时就考虑了内存的使用问题,并进行了相应的优化设计。例如,Spark 框架是基于内存计算的,它将大量的输入数据和中间数据都缓存到内存中,这种设计能够有效地提高交互型 job 和迭代型 job 的执行效率。
由于大数据应用的内存消耗量很大,所以当前许多研究关注如何改进大数据处理框架的内存管理机制,以减少应用内存消耗。例如:
- UCSD 提出了 ThemisMR [43],重新设计了 MapReduce 的数据流及内存管理方案,有效地将中间数据磁盘读写次数降低为两次,从而提高了 job 的执行性能。
- Tachyon 构造了一个基于内存的分布式数据存储系统,主要用于在不同 Hadoop/Spark 应用之间共享数据。用户可以将不同应用产生的中间数据缓存到 Tachyon 中而非直接缓存到框架中,以降低框架的内存消耗。
- FAÇADE [44] 提供了用于降低用户代码内存消耗的代码编译和执行环境。FAÇADE 的设计目的是将数据存储和数据操作分开,方法是将数据存放到 JVM 的堆外内存中,将对堆内对象的数据操作转化为对 FAÇADE 的函数调用。
- 对于 Java 对象本身产生的 overhead(也就是 Java 对象自身所需的 header 和 reference),Bu 等 [45] 提出了一些优化方法,如将大量数据对象(record object)合并为少量的大的数据对象。
- Lu 等 [46] 提出了基于对象生命周期的内存管理机制,可以根据数据对象类型和生命周期,将对象分配到不同队列进行分配和回收。
- Xu 等 [47] 针对 Hadoop/Spark 等大数据框架经常出现的垃圾回收时间长、频繁等问题,通过实验分析主流 Java 垃圾回收算法在大数据环境下存在的性能缺陷,提出了垃圾回收算法的三种改进方法。
- Yak [48] 提出了一种混合 GC 算法,将堆内存划分为控制流区域和数据流区域,前者使用传统 GC 算法回收控制流代码的内存对象,后者使用基于时域区域(epoch-based region)的内存管理,并根据数据对象生命周期来回收内存。
- Spark 社区 采用堆外内存管理机制和基于堆外内存的 Shuffle 机制,提出了钨丝计划 [49]。
另外,如何预测大数据应用的执行时间也被一些研究人员关注。如果能够预测出 job 的执行时间可以为任务调度器提供决策依据,则方便用户了解 job 的执行进度。
- 华盛顿大学 的研究人员提出了 KAMD [50] 和 ParaTimer [51],可以根据 job 执行的历史信息并结合正在运行的 job 处理的数据量,使用启发式方法来估算 job 剩余的执行时间。
- UIUC 的研究人员提出了 ARIA [33],细粒度地分析了单个 MapReduce job 的执行阶段,并提出了基于上下界的时间估算公式,可以通过 job 的历史信息或调试信息来估算执行时间。
- 华盛顿大学 的研究人员后来又提出了 PerfXplain [52],通过对比两个包含同样处理逻辑的 job 的性能指标,来解释两个 job 执行效率不同的原因。
物理执行层的关键点
- Task 在 Hadoop MapReduce 中是进程,在 Spark 中是线程。
- 内存消耗分为三类:中间数据、缓存数据、用户代码计算结果。
- 多项研究致力于内存管理优化与执行时间预测。
1.5 错误容忍机制
由于不能避免系统和用户代码的 Bug、节点宕机、网络异常、磁盘损坏等软硬件可靠性问题,分布式文件系统在设计时一般都会考虑错误容忍机制,在实现时也会针对各种失效情况采取相应措施。分布式大数据并行处理框架也不例外,设计了各种针对 Master 节点失效、task 执行失败等问题的错误容忍机制。然而,对于 task 的执行失败问题,框架的错误容忍机制比较简单,只是选择合适节点重新运行该 task。对于某些可靠性问题引起的 task 执行失败,如内存溢出等,简单地重新运行 task 并不能解决问题,因为内存溢出的问题很有可能会重复出现。现有框架的另一个局限是,一般用户在出错时很难找到真正的出错原因,即使是十分熟悉框架运行细节的用户,在缺乏分析诊断工具的情况下,也难以快速找到出错原因。
下面我们总结分析一下在错误容忍机制方面的前沿工作。
1.5.1 大数据应用错误分析
- Li 等 [53] 研究了 250 个 SCOPE 作业(运行在微软的 Dryad 框架之上)的故障错误,发现错误发生的主要原因是未定义的列、错误的数据模式、不正确的行格式等。
- Kavulya 等 [54] 分析了 4100 个在 Yahoo! 管理的集群上执行失败的 Hadoop 作业,发现 36% 的故障是数组访问越界造成的,还有 23% 的故障是因为 I/O 异常。
- Xu 等 [55] 研究了 123 个 Hadoop/Spark 应用中的内存溢出错误,发现内存溢出的主要原因包括应用配置异常、数据流异常、代码空间复杂度过高和框架内存管理缺陷等。
1.5.2 大数据应用错误诊断
- Titian [56] 通过记录 Spark 应用中全部中间数据和数据依赖关系来追踪出错的数据路径。
- BigDebug [57] 为 Spark 应用提供了断点、观察点、细粒度数据追踪等调试功能。
- Xu 等 [58] 设计实现了 Hadoop MapReduce 的内存溢出错误诊断工具 Mprof,它可以建立执行任务内存用量与数据流量的定量关系。在此基础上,Mprof 设计了多种内存溢出错误诊断规则,这些规则根据应用配置、数据流量与任务内存用量的关联关系来定位内存溢出错误的相关代码、数据,以及不恰当的配置参数。
1.5.3 大数据应用错误修复
- Interruptile Tasks [59] 改进了现有的 task,使得 task 具备一定的错误容忍能力。当 task 在运行时遇到内存用量过大或者内存溢出的问题时,Interruptile Tasks 会暂停当前 task 的运行,回收部分运行数据及中间结果,并将不能回收的结果溢写(spill)到磁盘上,然后执行用户定义的 interrupt 逻辑,等到内存用量下降到一定程度后,再让 task 继续运行。
错误容忍机制的局限
简单重试无法解决内存溢出等重复性错误;用户缺乏有效的错误诊断工具。
1.6 其他大数据处理框架
除了本章介绍的分布式处理框架 MapReduce、Spark 和 Dryad,Yahoo! 还提出了 Map-Reduce merge [60] 框架,通过在 reduce 阶段后面加入 merge 阶段,提高了 MapReduce 对二维表的关系代数处理能力。UC Berkeley 提出了 MapReduce Online [61],改进了从 map 阶段到 reduce 阶段的数据流动方式,使得 mapper 输出的数据可以更快地流入 reducer 中,提高 MapReduce 对数据的在线处理能力。UCI 的 Bu 等 提出了 HaLoop [62],提高了 MapReduce 迭代型任务的执行性能。NYU 提出的面向内存计算应用的分布式计算编程模型 Piccolo [63] 可以提供 Key-Value 表的操作接口。与 MapReduce 相比,Piccolo 能够轻松地访问中间状态及中间数据。Spark Structured Streaming [64] 和 Apache Flink 统一了批式处理与流式处理的执行流程。
1.7 本章小结
本章探讨了大数据的基本概念和大数据处理所面临的挑战,介绍了基本的大数据处理框架,讨论了用于支持不同大数据应用的通用编程模型,最后重点讲述了大数据处理框架的四层结构。通过本章可以了解到:
- 用户层如何开发应用
- 分布式数据并行处理层如何执行数据处理流程
- 资源管理和任务调度层如何分配资源、调度任务
- 物理执行层如何执行具体的任务
- 大数据处理框架的错误容忍机制
在各个章节中,我们还介绍了与大数据系统相关的各种前沿研究工作。这些背景知识是探讨下面章节中的 Spark 设计与实现原理的基础。
1.8 扩展阅读
除了支持基本的 RDD 数据结构,Spark 还支持在 RDD 基础上扩展的、面向结构化数据(主要是表格数据)的高级数据结构 [65],即 DataSet 和 DataFrame。使用 DataSet、DataFrame 开发的应用可以更好地执行各种 SQL 操作,并利用 Spark SQL 引擎中的优化技术来对执行计划进行优化。读者可以参考 Spark: The Definitive Guide [66] 了解 DataSet、DataFrame 数据结构的使用方法。
本书主要讨论基于 RDD 数据结构的 Spark 应用,因为这些应用更为基础,也更有助于在分布式层面理解数据和计算任务的划分,以及生成规则。读者在理解这些应用的执行原理后,可以进一步参考 Spark SQL 论文和 《Spark SQL 内核剖析》 [67] 学习 Spark SQL 引擎中的优化技术。
另外需要注意的是,基于 RDD 应用的逻辑处理流程和物理执行计划与 Spark SQL 应用的 Logical plan 和 Physical plan 有所不同:
- 基于 RDD 应用:逻辑处理流程指的是一系列 RDD 操作形成的输入/输出、中间数据及数据之间的依赖关系;物理执行计划指的是具体的执行阶段(stage)和执行任务(task)。
- Spark SQL 应用:Logical plan 指的是将 SQL 脚本转化后的逻辑算子树,包含各种 SQL 操作,如
Project()、filter()、join()等;Physical plan 指的是对逻辑算子树进行转化后形成的物理算子树,树中的节点可以转化为 RDD 及其操作,也可以直接生成实现Project()、filter()、join()等操作的 Java 代码。
更多的介绍可以参考参考文献 [67]。
图片引用
- [Image 55 on Page 17]
- [Image 60 on Page 19]
- [Image 65 on Page 21]
- [Image 68 on Page 22]
- [Image 69 on Page 22]
- [Image 72 on Page 23]
- [Image 79 on Page 26]
- [Image 84 on Page 28]
- [Image 87 on Page 29]