第11章

批处理

如果一个系统过于受单个人影响,它就无法成功。当初始设计完成并足够健壮后,真正的考验随着许多持有不同观点的人开始他们自己的实验而到来。
—Donald Knuth,《The Errors of TeX》(1989)

到目前为止,本书大部分内容都在讨论请求与查询及其对应的响应或结果。许多现代数据系统都假定采用这种数据处理风格:你请求某样东西,或发送一条指令,系统就会尽可能快地给你答案。

一个请求页面的网页浏览器、调用远程API的服务、数据库、缓存、搜索索引以及许多其他系统都按这种方式工作。我们称这些为在线系统。响应时间通常是它们主要的性能度量指标,并且它们通常需要容错性来确保高可用性。

然而,有时你需要运行一个比交互式请求所能处理的计算更大或数据量更多的计算任务。也许你需要训练一个AI模型,或者将大量数据从一种形式转换为另一种形式,或者对一个非常大的数据集进行计算分析。我们将这些任务称为批处理作业,而处理它们的系统有时被称为离线系统

一个批处理作业接收输入数据(只读),并产生输出数据(每次运行时从头生成)。它通常不会像读/写事务那样改变数据。因此,输出是从输入派生而来的(如第10页“记录系统与派生数据”中讨论的那样)。如果你不喜欢输出,你可以删除它,调整作业逻辑,然后重新运行作业。

通过将输入视为不可变并避免副作用(例如写入外部数据库),批处理作业能够实现良好的性能以及其他好处:

  • 如果在代码中引入了一个错误,导致输出错误或损坏,你可以简单地回滚到代码的先前版本并重新运行作业,输出就会再次正确。或者,更简单的方法是,你可以将旧输出保留在另一个目录中并切换回去。大多数对象存储和开放表格式(参见第135页“云数据仓库”)都支持这一特性,称为数据时间旅行(time travel)。大多数具有读写事务的数据库不具备这一属性:如果部署了写入错误数据的错误代码,回滚代码并不能修复那些数据。这种能从错误代码中恢复的想法被称为人类容错(human fault tolerance) [1]。

  • 由于回滚的便利性,功能开发的速度可以比在错误可能导致不可逆损害的环境中更快。这种最小化不可逆性的原则对敏捷软件开发是有益的 [2]。

  • 同一组文件可以用作各种类型作业的输入,包括监控作业,这些作业计算指标并评估作业的输出是否具有预期的特征(例如,通过与之前运行的输出进行比较并测量差异)。

  • 批处理框架能高效利用计算资源。尽管可以通过在线数据系统(如OLTP数据库和应用服务器)对数据进行批处理,但这样做在所需资源方面可能会昂贵得多。

批处理已被证明在广泛的用例中非常有用,我们将在第476页的“批处理用例”中重新讨论。然而,它也带来了挑战。对于大多数框架,只有在整个作业完成后,输出才能被其他作业处理。批处理也可能效率低下;对输入数据的任何更改——即使是一个字节——都需要批处理作业重新处理整个输入数据集。

批处理作业可能需要很长时间来运行:几分钟、几小时,甚至几天。作业可能被安排定期运行(例如每天一次)。性能的主要度量指标通常是吞吐量:作业在单位时间内能处理多少数据。有些批处理系统通过简单地中止并重启整个作业来处理故障,而另一些则具有容错能力,即使某些节点崩溃,作业也能成功完成。

在线系统和批处理系统之间的界限并不总是清晰的;一个长时间运行的数据库查询看起来很像一个批处理过程。但批处理也具有一些特定特性,使其成为构建可靠、可扩展和可维护应用程序的有用构建块。例如,它通常在数据集成中发挥作用——组合多个数据系统来实现单个系统无法完成的事情。第7页“数据仓库”中讨论的ETL就是一个例子。

NOTE

批处理的另一种选择是流处理,其中作业在处理完输入后不会结束运行,而是持续监控输入,并在输入发生变化后不久就处理这些变化。我们将在第12章转向流处理。

现代批处理深受MapReduce的影响,MapReduce是Google在2004年发表的一种批处理算法 [3],随后在多个开源数据系统中实现,包括Hadoop、CouchDB和MongoDB。MapReduce是一种相当底层的编程模型,不如数据仓库中常见的并行查询执行引擎那么复杂 [4,5]。当它刚出现时,MapReduce在商用硬件上能实现的处理规模方面是一大进步,但现在它已在很大程度上过时,Google已不再使用 [6,7]。

如今的批处理更多是使用像Spark或Flink这样的框架,或数据仓库查询引擎来完成的。与MapReduce一样,它们严重依赖分片(参见第7章)和并行执行,但它们拥有更复杂的缓存和执行策略。随着这些系统的成熟,运维上的问题已基本得到解决,因此焦点转向了可用性。新的处理模型,如数据流API、查询语言和DataFrame API,现已得到广泛支持。作业和工作流编排也已成熟。以Hadoop为中心的工作流调度器(如Oozie和Azkaban)已被更通用的解决方案(如Airflow、Dagster和Prefect)所取代,这些方案支持广泛的批处理框架和云数据仓库。

云计算已变得无处不在。批处理存储层正从分布式文件系统(DFS,如HDFS、GlusterFS和CephFS)向对象存储系统(如S3)迁移。可扩展的云数据仓库(如BigQuery和Snowflake)正在模糊数据仓库和批处理之间的界限。

为了建立对批处理概念的直观理解,我们将在本章开始时以一个在单机上使用标准Unix工具的示例开始。然后我们将探讨如何将数据处理扩展到分布式系统中的多台机器。我们将看到,类似于操作系统,分布式批处理框架拥有一个调度器和一个文件系统。然后我们将探索用于编写批处理作业的各种处理模型。最后,我们将讨论常见的批处理用例。


使用Unix工具的批处理

假设你有一个Web服务器,它在每次服务请求时都会向日志文件追加一行。例如,使用NGINX默认的访问日志格式,日志中的一行可能如下所示:

216.58.210.78 - - [27/Jun/2025:17:55:11 +0000] "GET /css/typography.css HTTP/1.1"
200 3377 "https://martin.kleppmann.com/" "Mozilla/5.0 (Macintosh; Intel Mac OS X
10_15_7) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/137.0.0.0 Safari/537.36"

(这实际上是一行;为了可读性这里被拆成多行。)那一行中包含大量信息。要解释它,你需要查看日志格式的定义,如下所示:

$remote_addr - $remote_user [$time_local] "$request"
$status $body_bytes_sent "$http_referer" "$http_user_agent"

所以,这一行日志表明:在2025年6月27日17:55:11 UTC,服务器收到一个对文件/css/typography.css的请求,来自客户端IP地址216.58.210.78。用户未经过身份验证,因此$remote_user被设为连字符(-)。响应状态为200(即请求成功),响应大小为3377字节。Web浏览器是Chrome 137,它加载该文件是因为它在URL https://martin.kleppmann.com/ 的页面中被引用。

尽管日志解析看起来可能有点做作,但它是许多现代技术公司运营中的关键部分,并被用于从广告管道到支付处理的一切场景。事实上,它是推动MapReduce和“大数据”运动快速采用的驱动力之一。

简单日志分析

各种工具可以获取这些日志文件并生成关于网站流量的漂亮报告,但为了练习,让我们使用基本的Unix工具自己构建一个。例如,假设你想找出网站上最受欢迎的五个页面。你可以在Unix shell中按如下方式操作:

cat /var/log/nginx/access.log |   # 读取日志文件.(严格来说,这里的cat是不必要的,因为输入文件可以直接作为awk的参数给出.但这样写出线性管道会更明显.)
  awk '{print $7}' |               # 将每一行拆分为字段,仅输出第7个字段(即请求的URL,假设为$7,实际上在awk中$7代表第7个字段,对应日志格式中的"$request"中的URL部分).
  sort             |               # 按字母顺序对请求的URL进行排序.如果许多页面被多次请求,则相同URL会相邻出现.
  uniq -c          |               # uniq命令通过统计相邻重复行出现的次数来压缩它们.然后输出计数和URL.
  sort -r -n       |               # 按第一列(数字计数)进行反向数值排序,使得最受欢迎的页面排在顶部.
  head -n 5                        # 输出前五行(即最受欢迎的五个URL).

(注:在原始文本中,awk '{print $7}'中的$7是示例,实际应根据日志格式调整;这里保持原文不变。)


[Page 451]
[Page 452]
[Page 453]
[Page 454]

(页码标注保留,代表原始印刷页码。在文本中原始页码出现在每个段落之后,此处为清晰已集中列出。)

第11章:批处理

使用Unix工具进行批处理

sort             | 
  uniq -c          | 
  sort -r -n       | 
  head -n 5

读取日志文件。(严格来说,此处cat并非必要,因为输入文件可直接作为参数传递给awk。但这样写能使线性管道更清晰。)

按空白字符分割每一行,并只输出每行的第七个字段,该字段恰好是请求的URL。在我们的示例行中,该URL是/css/typography.css

对请求的URL列表进行字母排序。排序的目的是确保:如果一个URL被请求了n次,排序后的文件中该URL将连续重复出现n次。

uniq命令通过检查输入中相邻两行是否相同来过滤掉重复行。-c选项使其同时输出计数器:对于每个不同的URL,报告其在输入中出现的次数。

第二次sort按每行开头的数字(-n)进行排序,该数字是URL被请求的次数。然后以逆序(-r)返回结果——最大数字排在最前。

最后,head只输出输入的前五行(-n 5),丢弃其余部分。

该命令序列的输出类似如下:

4189 /favicon.ico
3631 /2016/02/08/how-to-do-distributed-locking.html
2124 /2020/11/18/distributed-systems-and-elliptic-curves.html
1369 /
 915 /css/typography.css

尽管上述命令行如果对Unix工具不熟悉可能看起来有些晦涩,但它极其强大。它能在几秒钟内处理数GB的日志文件,并且你可以轻松修改分析以满足需求。例如,如果想从报告中忽略CSS文件,可以将awk参数改为$7 !~ /\.css$/ {print $7};如果想统计客户端IP地址(而非页面)的排名,可将awk参数改为{print $1},等等。

本书篇幅有限,无法详尽探索Unix工具,但它们非常值得学习。许多数据分析工作可以通过组合awksedgrepsortuniqxargs在几分钟内完成,并且性能惊人地好 [8]。

命令链 vs 自定义程序

除了Unix命令链,你也可以编写一个简单的程序来完成相同的任务。例如,用Python可能如下所示:

from collections import defaultdict
 
counts = defaultdict(int) 
with open('/var/log/nginx/access.log', 'r') as file:
    for line in file:
        url = line.split()[6] 
        counts[url] += 1 
 
top5 = sorted(((count, url) for url, count in counts.items()),
              reverse=True)[:5] 
 
for count, url in top5:  
    print(f"{count} {url}")
  • 初始化counts为一个哈希表,为每个URL保持一个计数器,初始值为0。
  • 从日志每一行中获取请求的URL,即第七个空白分隔字段(数组索引为6,因为Python数组从0开始)。
  • 递增当前行中URL的计数器。
  • 按计数器值(降序)对哈希表内容排序,并取前五个条目。
  • 打印前五个条目。

这个程序不如Unix命令链简洁,但相当可读,两者偏好部分取决于个人品味。然而,除了两者表面的语法差异外,执行流程存在很大差异,这在处理大文件时会显现出来。

排序 vs 内存内聚合

Python脚本在内存中维护一个URL的哈希表,每个URL映射到其出现次数。而Unix管道示例没有这样的哈希表,而是依赖于对URL列表进行排序,其中同一URL的多次出现只是简单重复。

哪种方法更好?取决于你拥有的URL数量。对于大多数中小型网站,你可能可以将所有不同URL以及每个URL的计数器放入(例如)1 GB内存中。在此示例中,作业的工作集(作业需要随机访问的内存量)仅取决于不同URL的数量。如果有一个URL有一百万条日志条目,哈希表中所需的空间仍然只是一个URL加上计数器的大小。如果这个工作集足够小,内存内哈希表就能很好地工作——即使在笔记本电脑上。

另一方面,如果作业的工作集大于可用内存,排序方法则具有能够高效利用磁盘的优势。这与我们在“日志结构存储”(第118页)中讨论的原理相同:数据块可以在内存中排序并写入磁盘作为段文件,然后多个已排序的段可以合并成一个更大的已排序文件。归并排序具有顺序访问模式,这在磁盘上表现良好(参见“SSD上的顺序与随机写入”,第130页)。

GNU Coreutils(Linux)中的sort工具通过溢出到磁盘自动处理大于内存的数据集,并自动跨多个CPU核心并行排序 [9]。这意味着我们刚才看到的简单Unix命令链能够轻松扩展到大数据集,而不会耗尽内存。瓶颈很可能是从磁盘读取输入文件的速度。

Unix工具的一个限制是它们运行在单台机器上。对于太大而无法放入内存或本地磁盘的数据集,这就成了问题——而这正是分布式批处理框架的用武之地。

分布式系统中的批处理

运行我们Unix工具示例的那台机器有许多协同工作的组件来处理日志数据:

  • 通过操作系统文件系统接口访问的存储设备
  • 确定进程何时运行以及如何分配CPU资源的调度器
  • 一系列Unix程序,它们的标准输入和标准输出(stdin和stdout)通过管道连接在一起

这些相同的组件也存在于分布式数据处理框架中。实际上,你可以将这些框架视为分布式操作系统;它们拥有文件系统、作业调度器以及通过文件系统或其他通信渠道相互发送数据的程序。

分布式文件系统

操作系统提供的文件系统由多个层组成:

  • 在最底层,块设备驱动程序直接与磁盘通信,并允许上层读取和写入原始块。
  • 块层之上是一个页缓存,将最近访问的块保留在内存中以加快访问速度。
  • 块API被封装在文件系统层中,该层将大文件分解为块,并跟踪文件元数据,如inode、目录和文件。例如,Linux上的两种常见实现是ext4和XFS。
  • 最后,操作系统通过一个名为虚拟文件系统(VFS)的通用API向应用程序公开不同的文件系统。VFS使得应用程序能够以标准方式读取和写入,而不管底层文件系统是什么。

分布式文件系统的工作方式类似。文件被分解成块,并分布到多台机器上。DFS块通常比本地块大得多。HDFS默认值为128 MB,而JuiceFS和许多对象存储使用4 MB块——远大于ext4的4096字节。更大的块意味着需要跟踪的元数据更少,这对PB级数据集影响巨大。更大的块还能降低相对于读取块时的寻道开销。

大多数物理存储设备无法写入部分块,因此操作系统要求写入使用完整的块,即使数据没有填满整个块。由于分布式文件系统具有更大的块,并且通常实现于操作系统文件系统之上,它们没有这个要求。例如,一个900 MB的文件使用128 MB块存储时,将有七个块使用128 MB,一个块使用4 MB。

DFS块通过向集群中存储该块的机器发送网络请求来读取。每台机器运行一个守护进程,暴露一个API,允许远程进程在其本地文件系统上将块作为文件读取和写入。HDFS将这些守护进程称为DataNode,而GlusterFS称其为glusterfsd进程。本书中我们将它们称为数据节点

分布式文件系统也实现了页面缓存的分布式等效物。由于DFS块作为文件存储在数据节点上,读取和写入会通过每个数据节点的操作系统,其中包括内存页缓存。这使得频繁读取的数据块保留在数据节点的内存中。一些分布式文件系统还实现了更多缓存层级,例如JuiceFS中的客户端缓存和本地磁盘缓存。

数据本地性

分布式文件系统的关键设计原则之一是数据本地性:将计算移动到数据所在的位置,而不是将数据移动到计算处。这减少了网络流量,并允许利用数据节点的本地磁盘和内存缓存。MapReduce和Spark等框架在调度任务时会优先考虑数据本地性,即在存储所需数据的节点附近启动任务。

Chapter 11: Batch Processing Filesystems such as ext4 and XFS keep track of storage metadata including free space, file block locations, directory structures, permission settings, and more. Distributed filesystems also need a way to track file locations spread across machines, permission settings, and so on. Hadoop has a service called the NameNode that maintains metadata for the cluster. DeepSeek’s 3FS has a metadata service that persists its data to a key-value store such as FoundationDB. Above the filesystem sits the VFS. A close analogue in batch processing is a distributed filesystem’s protocol. Distributed filesystems must expose a protocol or interface so that batch processing systems can read and write files. This protocol acts as a pluggable interface; any DFS may be used so long as it implements the protocol. For example, Amazon S3’s API has been widely adopted by storage systems such as MinIO, Cloudflare’s R2, Tigris, Backblaze’s B2, and many others. Batch processing systems with S3 support can use any of these storage systems. Some DFSs implement POSIX-compliant filesystems that appear to the operating system’s VFS like any other filesystem. Filesystem in Userspace (FUSE) or the Network File System (NFS) protocol are often used to integrate into the VFS. NFS is perhaps the most well-known distributed filesystem protocol. The protocol was originally developed to allow multiple clients to read and write data on a single server. More recently, filesystems such as Amazon Elastic File System (EFS) and Archil provide NFS-compatible distributed filesystem implementations that are far more scalable. NFS clients still connect to one endpoint, but underneath, these systems communicate with distributed metadata services and data nodes to read and write data. Distributed Filesystems and Network Storage Distributed filesystems are based on the shared-nothing principle (see “Shared-Memory, Shared-Disk, and Shared-Nothing Architectures” on page 51), in contrast to the shared-disk approach of network attached storage (NAS) and storage area network (SAN) architectures. Shared-disk storage is implemented by a centralized storage appliance, often using custom hardware and special network infrastructure such as Fibre Channel. On the other hand, the shared-nothing approach requires no special hardware, only computers connected by a conventional datacenter network. Many distributed filesystems are built on commodity hardware, which is less expensive but has higher failure rates than enterprise-grade hardware. In order to tolerate machine and disk failures, file blocks are replicated on multiple machines. This also allows schedulers to more evenly distribute workloads since they can execute a task on any node that contains a replica of the task’s input data. Replication may mean simply keeping several copies of the same data on multiple machines, as described in Chapter 6, or using an erasure coding scheme such as Reed–Solomon codes, which allows lost data to be recovered with lower storage overhead than full replication [10, 11, 12]. The techniques are similar to RAID, which provides redundancy across several disks attached to the same machine; the difference is that in a distributed filesystem, file access and replication are done over a conventional datacenter network without special hardware. Object Stores Object storage services such as Amazon S3, Google Cloud Storage, Azure Blob Storage, and OpenStack Swift have become a popular alternative to distributed filesystems for batch processing jobs. In fact, the line between the two is somewhat blurry. As we saw in the previous section and “Databases Backed by Object Storage” on page 202, FUSE drivers allow users to treat object stores such as S3 as a filesystem. Some DFS implementations, such as JuiceFS and Ceph, offer both object storage and filesystem APIs. However, their APIs, performance, and consistency guarantees are very different. Care must be taken when adopting such systems to make sure they behave as expected, even if they seem to implement the requisite APIs. Each object in an object store has a URL such as s3://my-photo-bucket/2025/04/01/birthday.png. The host portion of the URL (my-photo-bucket) describes the bucket where objects are stored, and the part that follows is the object’s key (/2025/04/01/birthday.png in our example). A bucket has a globally unique name, and each object’s key must be unique within its bucket. Objects are read using a get call and written using a put call. Unlike files on a filesystem, objects are immutable once written. To update an object, it must be fully rewritten using a put call, similarly to a key-value store. Azure Blob Storage and S3 Express One Zone support appends, but most other stores do not. There are no file handle APIs with functions like fopen and fseek. Objects may look as if they are organized into directories, which is somewhat confusing because object stores do not have the concept of directories. The path structure is simply a convention, and the slashes are a part of the object’s key. This convention allows you to perform something similar to a directory listing by requesting a list of objects with a particular prefix. However, listing objects by prefix is different from a filesystem directory listing in two ways:

  • A prefix list operation behaves like a recursive ls -R call on a Unix system. It returns all objects that start with the prefix, and objects in subpaths are included.
  • Empty directories are not possible. If you were to remove all objects underneath s3://my-photo-bucket/2025/04/01, then 01 would no longer appear when you called list on s3://my-photo-bucket/2025/04. It’s common practice to create a zero-byte object as a way to represent an empty directory (e.g., creating an empty s3://my-photo-bucket/2025/04/01 file to keep it present when all child objects are deleted). DFS implementations often support common filesystem operations such as hard links, symbolic links, file locking, and atomic renames. Such features are missing from object stores. Linking and locks are typically not supported, while renames are nonatomic; they’re accomplished by copying the object to the new key and then deleting the old object. If you want to rename a directory, you have to individually rename every object within it, since the directory name is a part of the key. The key-value stores we discussed in Chapter 4 are optimized for small values (typically kilobytes) and frequent, low-latency reads/writes. In contrast, distributed filesystems and object stores are generally optimized for large objects (megabytes to gigabytes) and less frequent, larger reads. Recently, however, object stores have begun to add support for frequent and smaller reads/writes. For example, S3 Express One Zone now offers single-millisecond latency and a pricing model that is more similar to key-value stores. Another difference between distributed filesystems and object stores is that DFSs such as HDFS allow computing tasks to be run on the machine that stores a copy of a particular file. This allows the task to read that file without having to send it over the network, which saves bandwidth if the executable code of the task is smaller than the file it needs to read. On the other hand, object stores usually keep storage and computation separate. Doing so might use more bandwidth, but modern datacenter networks are very fast, so this is often acceptable. This architecture also allows machine resources such as CPU and memory to be scaled independently of storage since the two are decoupled. Distributed Job Orchestration Our operating system analogy also applies to job orchestration. When you execute a Unix batch job, something needs to actually run the awk, sort, uniq, and head processes. Data needs to be transferred from one process’s output to another process’s input, memory must be allocated for each process, instructions from each process must be scheduled fairly and executed on the CPU, memory and I/O boundaries must be enforced, and so on. On a single machine, an operating system’s kernel is responsible for such work. In a distributed environment, this is the role of a job orchestrator. Batch processing frameworks send a request to an orchestrator’s scheduler to run a job. Requests to start a job contain metadata such as this:
  • The number of tasks to execute
  • The amount of memory, CPU, and disk needed for each task
  • A job identifier
  • Access credentials
  • Job parameters such as input and output data
  • Required hardware details such as GPUs or disk types
  • The location of the job’s executable code Orchestrators such as Kubernetes and Hadoop YARN (Yet Another Resource Negotiator) [13] combine this information with cluster metadata to execute the job using the following components: Task executors An executor daemon such as YARN’s NodeManager or Kubernetes’s kubelet runs on each node in the cluster. Executors are responsible for running job tasks, sending heartbeats to signal their liveness, and tracking task status and resource allocation on the node. When a task-start request is sent to an executor, it retrieves the job’s executable code and runs a command to start the task. The executor then monitors the process until it finishes or fails, at which point it updates the task status metadata accordingly. Many executors also work with the operating system to provide both security and performance isolation. YARN and Kubernetes use Linux cgroups, for example. This prevents tasks from accessing data without permission or from negatively affecting the performance of other tasks on the node by using excessive resources. Resource manager An orchestrator’s resource manager stores metadata about each node, including available hardware (CPUs, GPUs, memory, disks, and so on), task statuses, network location, node status, and other relevant information. Thus, the manager provides a global view of the cluster’s current state. The centralized nature of the resource manager can lead to both scalability and availability bottlenecks. YARN uses ZooKeeper, and Kubernetes uses etcd, to store cluster state (see “Coordination Services” on page 437). Scheduler Orchestrators usually have a centralized scheduler subsystem, which receives requests to start, stop, or check on the status of a job. For example, a scheduler might receive a request to start a job with 10 tasks using a specific Docker image on nodes that have a specific type of GPU. The scheduler uses the information from the request and the state of the resource manager to determine which tasks to run on which nodes. The task executors are then informed of their assigned work and begin execution.

第 11 章:批处理

尽管每个编排器使用不同的术语,但在几乎所有编排系统中都能找到这些组件。

调度决策有时需要应用特定的调度器,以便考虑特殊需求,例如当达到某个查询阈值时自动扩缩读取副本。集中式调度器和应用特定的调度器协同工作,以确定如何最好地执行任务。YARN 将其子调度器称为 ApplicationMaster,而 Kubernetes 则将其称为 operator。

资源分配

调度器在作业编排中扮演着特别具有挑战性的角色。它们必须找出如何在具有竞争需求的作业之间最优地分配集群的有限资源。从根本上说,这些决策必须平衡公平性和效率。

假设一个小型集群有五个节点,总共有 160 个 CPU 核心可用。集群的调度器收到两个作业请求,每个请求需要 100 个核心来完成其工作。最佳的工作负载调度方式是什么?

  • 调度器可以决定为每个作业运行 80 个任务,并在较早的任务完成后开始运行每个作业剩余的 20 个任务。
  • 调度器可以先运行一个作业的所有任务,然后只在有 100 个核心可用时才开始运行第二个作业的任务(这种策略称为 群体调度 gang scheduling)。
  • 如果第二个作业请求比第一个晚很多到达,调度器掌握的信息就不完整。它必须决定是将所有 100 个核心分配给第一个作业,还是预留一些,以应对可能到来也可能永远不会到来的未来作业。

这是一个非常简单的例子,但我们已经看到了许多困难的权衡。例如,在群体调度场景中,如果调度器保留 CPU 核心,直到所有 100 个核心同时可用,节点将会闲置。集群的资源利用率会下降,如果其他作业也试图预留 CPU 核心,还可能发生死锁。另一方面,如果调度器只是等待 100 个核心变得可用,其他作业可能会在此期间抢占这些核心。集群可能长时间无法提供 100 个核心,导致饥饿。或者,调度器可以决定抢占第一个作业的部分任务,将其杀死,为第二个作业腾出空间。然而,任务抢占也会降低集群效率,因为被杀死任务稍后需要重新启动。

现在设想一个调度器,它必须为成百上千甚至数百万个这样的作业请求做出分配决策。找到最优解似乎难以处理。事实上,该问题是 NP 难的,这意味着除了最小的示例外,计算最优解的速度会慢得令人望而却步 [14, 15]。

因此,在实践中,调度器使用启发式算法来做出非最优但合理的决策。常用的算法有几种,包括先进先出(FIFO)、主资源公平性(DRF)、优先级队列、基于容量或配额的调度,以及各种装箱算法。这些算法的细节超出了本书的范围,但它们是一个迷人的研究领域。

调度工作流

“简单日志分析”(第 454 页)中的 Unix 工具示例涉及由 Unix 管道连接的多个命令的链条。分布式批处理过程中也出现了相同的模式:一个作业的输出通常需要成为另一个或多个作业的输入,并且每个作业可能有多个由其他作业产生的输入。这称为作业的工作流有向无环图(DAG)

在“持久执行与工作流”(第 187 页)中,我们看到了提供步骤序列持久执行的工作流引擎,通常执行 RPC。在批处理上下文中,“工作流”具有不同的含义:它是一系列批处理过程,每个过程接收输入数据并产生输出数据,但通常不对外部服务进行 RPC。持久执行引擎通常每个请求处理的数据量少于其批处理对应物,尽管界线有些模糊。

可能需要一个由多个作业组成的工作流,原因如下:

  • 如果一个作业的输出需要成为其他几个作业的输入(这些作业由不同团队维护),那么第一个作业最好将其写入一个所有其他作业都能读取的位置。然后,这些消费作业可以安排在每次数据更新时运行,或者按其他计划运行。
  • 你可能希望将数据从一种处理工具转移到另一种处理工具。例如,Spark 作业可能将其输出到 HDFS,然后 Python 脚本可能触发 Trino SQL 查询(见“云数据仓库”,第 135 页),该查询对 HDFS 文件进行进一步处理并输出到 S3。
  • 某些数据流水线内部需要多个处理阶段。例如,如果一个阶段需要按键分片数据,而下一阶段需要按不同键分片,则第一阶段可以输出以第二阶段所需方式分片的数据。

在 Unix 工具示例中,将一个命令的输出连接到另一个命令输入的管道仅使用一个小的内存缓冲区,不将数据写入文件。如果该缓冲区填满,生产者进程需要等待消费者进程从缓冲区读取一些数据后才能输出更多——这是一种背压形式。Spark、Flink 和其他批处理执行引擎支持类似的模型,即一个任务的输出直接传递给另一个任务(如果任务在不同机器上运行,则通过网络传递)。

然而,更典型的情况是工作流中的一个作业将其输出写入分布式文件系统或对象存储,下一个作业从那里读取。这使作业彼此解耦,允许它们在不同时间运行。如果一个作业有多个输入,工作流调度器通常会等待所有产生其输入的作业成功完成,然后才运行消费这些输入的作业。

编排框架(如 YARN 的 ResourceManager 或 Spark 的内置调度器)中的调度器不管理整个工作流;它们按作业进行调度。为了处理作业执行之间的这些依赖关系,已经开发了各种工作流调度器,包括 Airflow、Dagster 和 Prefect。工作流调度器具有管理功能,对于维护大量批处理作业非常有用。由 50 到 100 个作业组成的工作流在许多数据流水线中很常见,在大型组织中,许多团队可能运行跨多个系统读取彼此输出的作业或工作流。对于管理如此复杂的数据流,工具支持非常重要。

处理故障

批处理作业通常运行很长时间。具有许多并行任务的长时间运行作业很可能会在运行过程中遇到至少一个任务失败。如“硬件和软件故障”(第 44 页)和“不可靠的网络”(第 347 页)所述,这可能是由于硬件故障(尤其是在商品硬件上)或网络中断等原因造成的。

任务可能无法完成运行的另一个原因是调度器可能会主动抢占(杀死)它。如果你有多个优先级级别,例如运行成本较低的低优先级任务和成本较高的高优先级任务,抢占特别有用。低优先级任务只要有空闲计算能力就可以运行,但如果高优先级任务到达,它们随时面临被抢占的风险。这种更便宜的低优先级虚拟机在 Amazon EC2 上称为竞价实例,在 Azure 上称为竞价虚拟机,在 Google Cloud 上称为可抢占实例 [16]。

由于批处理通常用于对时间不敏感的作业,因此非常适合使用低优先级任务和竞价实例来降低运行作业的成本。本质上,这些作业可以使用原本会闲置的备用计算资源,从而提高集群利用率。然而,这也意味着这些任务更有可能被调度器杀死,因为抢占发生的频率高于硬件故障 [17]。

由于批处理作业每次运行时都会从头重新生成输出,因此任务失败比在线系统更容易处理。系统可以删除失败执行的中间输出,并将任务安排在另一台机器上重新运行。然而,因为单个任务失败就重新运行整个作业会很浪费。因此,MapReduce 及其后继者保持并行任务的执行相互独立,以便它们可以在单个任务粒度上重试工作 [3]。

当一个任务的输出成为工作流中另一个任务的输入时,容错更加棘手。MapReduce 通过始终将这些中间数据写回分布式文件系统,并在允许其他任务读取数据之前等待写入任务成功完成来解决此问题。即使在抢占很常见的环境中,这也能工作,但这意味着大量向 DFS 写入,可能效率低下。

Spark 将中间数据保留在内存中(如果容纳不下则“溢出”到本地磁盘),并且只将最终结果写入 DFS。它还跟踪中间数据的计算方式,以便在中间数据丢失时可以由 Spark 重新计算 [18]。Flink 使用了不同的方法,基于定期对任务快照进行检查点 [19]。我们将在“数据流引擎”(第 468 页)中回到这个主题。

批处理模型

我们已经了解了如何调度批处理作业的分布式环境。现在让我们把注意力转向批处理框架如何处理数据。最常见的两种模型是 MapReduce 和数据流引擎。尽管在实践中数据流引擎已在很大程度上取代了 MapReduce,但了解 MapReduce 的工作原理仍然很有用,因为它影响了许多现代批处理框架。

MapReduce 和数据流引擎已经发展出支持多种编程模型,包括低级编程 API、关系查询语言和 DataFrame API。多种选项使应用工程师、分析工程师、业务分析师甚至非技术员工能够处理公司数据,以满足各种用例,我们将在“批处理用例”(第 476 页)中讨论。

MapReduce

MapReduce 中的数据处理模式与“简单日志分析”(第 454 页)中的 Web 服务器日志分析示例非常相似:

  1. 读取一组输入文件并将其分解为记录。在 Web 服务器日志示例中,每条记录是日志中的一行(即 \n 是记录分隔符)。在 Hadoop 的 MapReduce 中,输入文件存储在像 HDFS 这样的分布式文件系统或像 S3 这样的对象存储中。使用了各种文件格式,例如 Apache Parquet(一种列式存储格式)。

第11章:批处理

MapReduce 与函数式编程(续)

在 Web 服务器日志示例中:

  1. 将数据拆分成记录。每条记录是日志中的一行(即 \n 是记录分隔符)。在 Hadoop 的 MapReduce 中,输入文件存储在分布式文件系统(如 HDFS)或对象存储(如 S3)中。使用多种文件格式,例如 Apache Parquet(一种列式存储,详见第136页的“列式存储”)或 Apache Avro(一种行式格式,详见第172页的“Avro”)。
  2. 对每条输入记录调用 mapper 函数来提取键和值。在 Unix 工具示例中,mapper 函数是 awk '{print $7}',它提取 URL($7)作为键,值留空。
  3. 按键对所有键值对排序。在日志示例中,这由第一个 sort 命令完成。
  4. 调用 reducer 函数遍历排序后的键值对。如果同一个键出现多次,排序已使它们在列表中相邻,因此可以轻松合并这些值,无需在内存中保留大量状态。在 Unix 工具示例中,reducer 由命令 uniq -c 实现,它统计具有相同键的相邻记录的数量。

这四步可以由一个 MapReduce 作业完成。步骤 2(map)和步骤 4(reduce)是你编写自定义数据处理代码的地方。步骤 1(将文件拆分成记录)由输入格式解析器处理。步骤 3(排序步骤)在 MapReduce 中是隐式的——你不需要编写它,因为 mapper 的输出在交给 reducer 之前总是经过排序。这个排序步骤是基础的批处理算法,我们将在第469页的“数据混洗”中再次讨论。

要创建一个 MapReduce 作业,你需要实现两个回调函数:mapper 和 reducer,它们的行为如下:

  • Mapper:对每条输入记录调用一次,其任务是从记录中提取键和值。对于每个输入,它可以生成任意数量的键值对(包括零个)。它不会在输入记录之间保留任何状态,因此每条记录都是独立处理的。可以有多个 mapper 并行运行在输入的不同部分上。
  • Reducer:MapReduce 框架收集 mapper 产生的键值对,将所有属于同一个键的值汇集在一起,然后使用一个迭代器调用 reducer,该迭代器遍历这些值的集合。reducer 可以产生输出记录(例如同一个 URL 的出现次数)。不同键的 reducer 也可以并行运行。

在 Web 服务器日志示例中,我们在步骤 5 中还有第二个 sort 命令,它按请求次数对 URL 进行排名。在 MapReduce 中,如果需要第二个排序阶段,可以通过编写第二个 MapReduce 作业并将第一个作业的输出作为第二个作业的输入来实现。如此看来,mapper 的角色是通过将数据放入适合排序的形式来准备数据,而 reducer 的角色是处理已排序的数据。

批处理模型 | 467


MapReduce 与函数式编程

尽管 MapReduce 用于批处理,但其编程模型源自函数式编程。Lisp 引入了 mapreduce(或 fold)作为列表上的高阶函数,并已进入主流语言如 Python、Rust 和 Java。

许多常见的数据处理操作(包括 SQL 提供的操作)都可以在 MapReduce 之上实现。避免可变状态的函数式编程原则有助于实现并行执行。由于对 mapper 和 reducer 的每次调用仅依赖于 MapReduce 框架显式传递给这些函数的数据,因此框架可以在不同节点上自由并行运行独立的调用。如果某个任务失败,框架可以在另一个节点上以相同输入再次调用 mapper 和 reducer。

使用原始的 MapReduce API 实现复杂处理作业实际上相当费力——例如,作业使用的任何连接算法都需要从头开始实现 [20]。与更现代的批处理引擎相比,MapReduce 也相当慢。一个原因是其基于文件的 I/O 阻止了作业流水线化(即,在上游作业完成之前,下游作业无法处理输出数据)。


数据流引擎

为了解决 MapReduce 的一些问题,开发了几种新的分布式批量计算执行引擎,其中最著名的是 Spark [18, 21] 和 Flink [19]。它们的设计不同,但有一个共同点:它们将整个工作流作为一个作业来处理,而不是将其拆分为独立的子作业。

由于这些系统显式地建模数据流通过多个处理阶段,因此被称为 数据流引擎。与 MapReduce 类似,它们支持低级 API,该 API 重复调用用户定义的函数一次处理一条记录,但它们也提供更高级的运算符,如 joingroup by。它们通过分片输入来并行化工作,并将一个任务的输出通过网络复制到另一个任务的输入。与 MapReduce 不同的是,运算符不必承担交替 map 和 reduce 的严格角色,而是可以以更灵活的方式组合。

这些数据流 API 通常使用关系型风格的构建块来表达计算:根据字段值连接数据集;按键分组元组;按条件过滤;以及通过计数、求和或其他函数聚合元组。在内部,这些操作使用我们将在下一节讨论的混洗算法来实现。

468 | 第11章:批处理

这种处理引擎风格基于 Dryad [22] 和 Nephele [23] 等研究系统,与 MapReduce 模型相比具有多个优势:

  • 昂贵的操作(如排序)只需在必要的地方执行,而不是在每次 map 和 reduce 阶段之间都默认执行。
  • 当连续有多个不改变数据集分片的运算符(如 mapfilter)时,它们可以合并到单个任务中,减少数据复制开销。
  • 由于工作流中的所有连接和数据依赖关系都显式声明,调度器可以全面了解需要什么数据以及在哪里需要,因此可以进行局部性优化。例如,它可以尝试将消费某些数据的任务放置在产生这些数据的同一台机器上,这样数据可以通过共享内存缓冲区交换,而无需通过网络复制。
  • 通常,运算符之间的中间状态只需保存在内存或写入本地磁盘即可,这比写入分布式文件系统或对象存储(需要复制到多个机器并在每个副本上写入磁盘)所需的 I/O 更少。MapReduce 已经对 mapper 输出使用了这种优化,但数据流引擎将此想法推广到所有中间状态。
  • 运算符可以在输入准备好后立即开始执行,无需等待整个前一阶段完成后再开始下一阶段。
  • 可以重用现有进程来运行新运算符,与 MapReduce(为每个任务启动新 JVM)相比,减少了启动开销。

你可以使用数据流引擎实现与 MapReduce 工作流相同的计算,并且由于这里描述的优化,它们通常会执行得更快。


数据混洗

正如我们所见,本章开头的 Unix 工具示例和 MapReduce 都基于排序。批处理引擎必须能够对 PB 级的数据集进行排序,这些数据集太大,无法容纳在单台机器上。因此,它们需要一种分布式排序算法,其中输入和输出都是分片的。这样的算法称为混洗(shuffle)

批处理模型 | 469

混洗不是随机的

“混洗”这个术语可能令人困惑。当你洗牌时,最终得到的是随机顺序。相反,我们这里讨论的混洗产生的是排序后的顺序,没有随机性。

混洗是批处理引擎的基础算法,用于连接和聚合。MapReduce、Spark、Flink、Daft、Dataflow 和 BigQuery [24] 都实现了可扩展且高性能的混洗算法,以处理大型数据集。我们将以 Hadoop MapReduce [25] 中的混洗为例进行说明,但本节中的概念也适用于其他系统。

图 11-1 显示了 MapReduce 作业中的数据流。我们假设作业的输入是分片的,并且这些分片标记为 m₁、m₂ 和 m₃。例如,每个分片可能是 HDFS 上的一个单独文件或对象存储中的一个单独对象,并且属于同一数据集的所有分片被分组到同一个 HDFS 目录中,或者在对象存储桶中具有相同的键前缀。

图 11-1. 一个包含三个 mapper 和三个 reducer 的 MapReduce 作业

框架为每个输入分片启动一个独立的 map 任务。任务读取其分配的文件,一次将一条记录传递给 mapper 回调。计算的 reduce 侧也是分片的。虽然 map 任务的数量由输入分片的数量决定,但 reduce 任务的数量由作业作者配置(它可以不同于 map 任务的数量)。

470 | 第11章:批处理

(图 11-1 的数据流示意:三个 mapper 分别处理三个输入分片,每个 mapper 输出中间键值对;这些中间数据经过分区(通常根据键的哈希值)被发送到三个 reducer;每个 reducer 从所有 mapper 接收属于其分区的数据,按键排序后传递给 reducer 函数,最终产生输出。)

第11章:批处理

映射器的输出由键值对组成,框架需要确保:如果两个映射器输出相同的键,这些键值对最终由同一个归约器任务处理。为实现这一点,每个映射器在其本地磁盘上为每个归约器创建一个独立的输出文件(例如,图11-1中的文件m1, r2是映射器1创建的、旨在发送给归约器2的文件)。当映射器输出一个键值对时,通常由键的哈希值决定写入哪个归约器文件(类似于按键哈希分片所描述的过程,见第258页)。

在映射器写入这些文件时,它还会对每个文件内部的键值对进行排序。这可以通过我们在“日志结构存储”(第118页)中看到的技术来完成:首先将一批批的键值对收集到内存中的排序数据结构中,然后写出为已排序的段文件,并将较小的段文件逐步合并为较大的文件。

每个映射器完成后,归约器连接到该映射器,并将相应已排序的键值对文件复制到自己的本地磁盘。一旦归约任务从所有映射器那里获得了其应处理的输出份额,它就会以归并排序的方式将这些文件合并在一起,保持排序顺序。具有相同键的键值对现在连续排列,即使它们来自不同的映射器。然后,每个键调用一次归约函数,每次提供一个迭代器,返回该键的所有值。

归约函数输出的任何记录都按顺序写入一个文件,每个归约任务对应一个文件。这些文件(图11-1中的r1、r2和r3)成为作业输出数据集的分片,并被写回分布式文件系统或对象存储。

尽管MapReduce在其映射和归约步骤之间执行洗牌步骤,但现代数据流引擎和云数据仓库更为复杂。像BigQuery这样的系统已经优化了其洗牌算法,以将数据保留在内存中,并写入外部排序服务[24]。此类服务加快洗牌速度,并通过复制洗牌后的数据来提供韧性。

连接与分组

让我们看看排序数据如何简化分布式连接和聚合。为了说明,我们将继续以MapReduce为例,尽管这些概念适用于大多数批处理系统。

图11-2展示了一个批处理作业中连接的典型示例。左侧是描述登录用户网站上操作的事件日志(称为活动事件或点击流数据),右侧是用户数据库。你可以将此示例视为星型模式的一部分(参见第77页的“星型和雪花型:分析用模式”);事件日志是事实表,用户数据库是维度之一。

图11-2. 用户活动事件日志与用户配置文件数据库之间的连接

如果你想要对活动事件进行分析,同时考虑来自用户数据库的信息(例如,利用用户配置文件中的出生日期字段,判断某些页面更受年轻用户还是年长用户欢迎),则需要计算这两个表之间的连接。假设这两个表都非常大,必须分片,如何计算该连接?

你可以利用MapReduce的一个特性:洗牌会将所有具有相同键的键值对(无论它们最初位于哪个分片)汇集到同一个归约器。这里,用户ID可以作为键。因此,你可以编写一个映射器,遍历用户活动事件,并发出以用户ID为键的页面视图URL,如图11-3所示。另一个映射器逐行遍历用户数据库,提取用户ID作为键,用户的出生日期作为值。

洗牌确保归约函数能够同时访问特定用户的出生日期和该用户的所有页面视图事件。MapReduce作业甚至可以安排记录的排序顺序,使得归约器总是先看到用户数据库的记录,然后是按时间戳排序的活动事件。这种技术被称为二次排序 [25]。

归约器现在可以轻松执行实际的连接逻辑。第一个值预期是出生日期,归约器将其存储在本地变量中。然后它遍历具有相同用户ID的活动事件,输出每个被查看的URL以及查看者的出生日期。由于归约器一次性处理特定用户ID的所有记录,它任何时候只需在内存中保留一条用户记录,并且永远不需要通过网络发出任何请求。这种算法被称为排序合并连接,因为映射器的输出按键排序,归约器随后将来自连接两侧的已排序记录列表合并在一起。

工作流中的下一个MapReduce作业可以计算每个URL的查看者年龄分布。为此,作业首先以URL为键对数据进行洗牌。排序后,归约器遍历单个URL的所有页面视图(带有查看者出生日期),为每个年龄组维护一个查看计数,并对每个页面视图递增相应的计数器。这样,你就可以实现分组操作和聚合。

图11-3. 基于用户ID的排序合并连接;如果输入数据集被分片到多个文件中,每个文件可以由多个映射器并行处理

查询语言

多年来,分布式批处理的执行引擎已经成熟。基础设施现在足够稳健,可以在超过10,000台机器的集群上存储和处理数PB的数据。随着在如此规模下物理运行批处理的问题基本上得到解决,人们的注意力转向了改进编程模型。

MapReduce、数据流引擎和云数据仓库都采用了SQL作为批处理的标准通用语言。这是很自然的,因为传统数据仓库使用SQL,数据分析与ETL工具已经支持它,而且所有开发人员和分析师都熟悉它。

与手写MapReduce作业相比,这些查询语言接口不仅需要的代码更少,还允许交互式使用:你可以编写分析查询并从终端或GUI运行它们。这种交互式查询风格对于业务分析师、产品经理、销售和财务团队以及其他人员在批处理环境中探索数据而言,是一种高效且自然的方式。SQL的支持也使分布式批处理系统适用于探索性查询。

高级查询语言不仅让使用该系统的用户更高效,还提高了机器级别的作业执行效率。正如我们在“云数据仓库”(第135页)中所看到的,查询引擎负责将SQL查询转换为要在集群中执行的批处理作业。从查询到语法树再到物理操作符的这种转换步骤使得引擎能够优化查询。像Hive、Trino、Spark和Flink这样的查询引擎拥有基于成本的查询优化器,可以分析连接输入的属性,并自动决定最适合当前任务的算法。优化器甚至可以改变连接的顺序,以最小化中间状态的数量 [19, 26, 27, 28]。

虽然SQL是最流行的通用批处理查询语言,但其他语言仍然用于特定需求。例如,Apache Pig是一种基于关系操作符的语言,允许逐步指定数据管道,而不是作为一个大的SQL查询。DataFrame(在下一节讨论)具有类似的特性,而Morel是一种更现代的语言,受Pig影响。其他用户采用了JSON查询语言,例如jq、JMESPath或JSONPath。

在“图式数据模型”(第84页)中,我们讨论了使用图来建模数据,并使用图查询语言遍历图中的边和顶点。许多图处理框架也通过诸如Apache TinkerPop的Gremlin之类的查询语言支持批计算。我们将在“批处理用例”(第476页)中更详细地查看图处理场景。

批处理与云数据仓库的融合

历史上,数据仓库运行在专用硬件设备上,支持基于SQL的、关系型数据上的分析查询。而像MapReduce这样的批处理框架旨在通过支持用通用编程语言编写的处理逻辑,提供更大的可扩展性和灵活性,允许读写任意数据格式。

随着时间的推移,二者已经变得非常相似。现代批处理框架现在支持SQL作为编写批处理作业的语言,并通过使用列式存储格式(如Parquet)和优化的查询执行引擎(参见第142页的“查询执行:编译与向量化”)在关系型查询上取得了良好性能。同时,数据仓库通过迁移到云(参见第135页的“云数据仓库”)并实现了分布式批处理框架所使用的许多相同调度、容错和洗牌技术,从而变得更加可扩展。许多数据仓库也使用分布式文件系统。

正如批处理系统采用SQL作为处理模型一样,云仓库也采用了替代处理模型。例如,BigQuery提供了DataFrames库,Snowflake的Snowpark库与Pandas集成。批处理工作流编排器(如Airflow、Prefect和Dagster)也与云仓库集成。

第11章:批处理

并非所有批处理作业都能轻易用SQL表达,包括PageRank等迭代图算法、复杂机器学习任务以及许多其他工作流。AI数据处理(涉及非关系型和多模态数据,如图像、视频和音频)也难以用SQL表达。

云数据仓库在某些工作负载上也存在困难。逐行计算在使用列式存储格式时效率较低;在这种情况下,更适合使用替代的仓库API或批处理系统。云数据仓库的成本通常也高于其他批处理系统。在Spark或Flink等批处理系统中运行大型作业往往更具成本效益。

最终,在批处理系统和数据仓库之间处理数据的决策通常取决于成本、便捷性、实现难度和可用性等因素。大多数大型企业拥有多个数据处理系统,这使他们在决策上具有灵活性。小型公司通常只用一个系统就能应付。

数据框(DataFrames)

数据科学家和统计学家通常习惯于使用R和Pandas中的DataFrame数据模型(参见第105页的“DataFrames、矩阵和数组”)。DataFrame类似于关系数据库中的表:它是行的集合,同一列中的所有值具有相同类型。用户无需编写一个大的SQL查询,而是调用与关系运算符对应的函数来执行过滤、连接、排序、聚合等操作。

最初,DataFrame操作通常在本地内存中执行。因此,DataFrame仅限于适合单台机器的数据集。数据科学家希望使用他们熟悉的DataFrame API来与批处理环境中的大型数据集交互,因为SQL和MapReduce不太适合他们的需求。Spark、Flink和Daft等分布式数据处理框架已采用DataFrame API来满足这一需求。不过,它们的实现方式有所不同:本地DataFrame通常带有索引和排序,而分布式DataFrame通常不具备29。这可能导致在迁移到批处理框架时出现性能意外。

DataFrame API看起来与数据流API类似,但实现方式各异。Pandas在调用DataFrame方法时会立即执行操作,而Spark首先将所有DataFrame API调用转换为查询计划,并在基于其分布式数据流引擎执行工作流之前运行查询优化。

Daft等框架甚至同时支持客户端和服务端计算。较小的内存操作在客户端执行,较大的数据集则在服务端处理。Apache Arrow等列式存储格式提供了统一的数据模型,客户端和服务端执行引擎均可共享。

批处理模型

批处理用例

现在我们已经了解了批处理的工作方式,接下来看看它如何应用于一系列应用程序。批处理作业非常适合批量处理大型数据集,但不适合低延迟场景。因此,凡是有大量数据且数据新鲜度不重要的地方,你都会发现批处理作业。这听起来可能有限制,但事实证明,大量数据处理任务都符合这一模型。例如:

  • 会计和库存对账(公司验证交易是否与银行账户和库存一致)通常作为批处理作业执行30
  • 在制造业中,需求预测通常作为定期批处理作业运行31
  • 电子商务、媒体和社交媒体公司通过批处理作业训练其推荐模型32, 33
  • 许多金融系统基于批处理;例如,美国银行网络几乎完全依靠批处理作业运行34

在以下章节中,我们将讨论几乎每个行业都会遇到的一些批处理用例。

提取-转换-加载(ETL)

第7页的“数据仓库”介绍了ETL和ELT,其中数据处理管道从生产数据库中提取数据,进行转换,然后将结果加载到下游系统中(在本节中,我们使用“ETL”来代表ETL和ELT两种工作负载)。批处理作业通常用于此类工作负载,尤其是当下游系统为数据仓库时。

批处理作业的并行特性使其非常适合数据转换,其中许多转换涉及“令人尴尬的并行”工作负载。过滤数据、投影字段以及许多其他常见的数据仓库转换都可以并行完成。

批处理环境还配备了强大的工作流调度器,可以轻松地调度、编排和调试ETL数据管道作业。当发生故障时,调度器通常会重试作业以缓解可能出现的临时问题。反复失败的作业将被标记为失败,这有助于开发人员轻松查看数据管道中哪个作业停止工作。像Airflow这样的调度器甚至内置了针对MySQL、PostgreSQL、Snowflake、Spark、Flink以及数十种其他流行系统的源、汇和查询运算符。调度器与数据处理系统之间的紧密集成简化了数据集成。

我们还了解到,当出现问题时,批处理作业易于排查和修复。这一特性在调试数据管道时非常宝贵。可以轻松检查失败的文件以查看问题所在,并且可以修复并重新运行ETL批处理作业。例如,如果输入文件不包含转换批处理作业打算使用的字段,数据工程师可以轻松发现该字段缺失,并更新转换逻辑或生成输入的作业。

过去,数据管道通常由单个数据工程团队管理,因为要求其他负责产品功能的团队编写和管理复杂的批处理数据管道被认为是不合理的。最近,批处理模型和元数据管理方面的改进使得整个组织的工程师可以更轻松地贡献和管理自己的数据管道。数据网格35, 36、数据合约37和数据编织38实践提供了标准和工具,帮助团队安全地发布数据,供组织中的任何人使用。

数据管道和分析查询不仅开始共享处理模型,还共享执行引擎。许多批处理ETL作业现在与读取其输出的分析查询运行在相同的系统上。数据管道转换和分析查询都作为SparkSQL、Trino或DuckDB查询运行的情况并不少见。这种架构进一步模糊了应用工程、数据工程、分析工程和业务分析之间的界限。

分析

在第3页的“运营系统与分析系统”中,我们看到分析查询(OLAP)通常扫描大量记录,执行分组和聚合。可以在批处理系统中与其它批处理工作负载一起运行此类工作负载。分析师编写在查询引擎上执行的SQL查询,该引擎从分布式文件系统或对象存储中读取并写入。表元数据(如表到文件的映射、名称和类型)通过Apache Iceberg等表格式和Unity等目录进行管理(参见第135页的“云数据仓库”)。这种架构称为数据湖仓39

与ETL类似,SQL查询接口的改进意味着许多组织现在使用Spark等批处理框架进行分析。此类查询模式有两种风格:

预聚合查询:数据被汇总到OLAP多维数据集或数据集市中以加速查询(参见第143页的“物化视图和数据立方体”)。预聚合数据在仓库中查询,或推送到专门的实时OLAP系统,如Apache Druid或Apache Pinot。预聚合通常按计划的时间间隔进行。第464页“调度工作流”中讨论的工作流调度器用于管理这些工作负载。

临时查询:用户运行这些查询来回答特定的业务问题、调查用户行为、调试操作问题等。对于此用例,响应时间很重要。分析师在获得响应并进一步了解所调查的数据时,会迭代地运行查询。具有快速查询执行的批处理框架有助于减少分析师的等待时间。

SQL支持使批处理框架能够与电子表格和数据可视化工具(如Tableau、Power BI、Looker和Apache Superset)集成。例如,Tableau提供SparkSQL和Presto连接器,而Apache Superset支持Trino、Hive、Spark SQL、Presto以及许多其他最终通过执行批处理作业来查询数据的系统。

机器学习

机器学习(ML)频繁使用批处理。数据科学家、机器学习工程师和AI工程师使用批处理框架来调查数据模式、转换数据和训练ML模型。常见用途包括:

特征工程:将原始数据过滤并转换为模型可训练的数据。预测模型通常需要数值型数据,因此工程师必须将其他形式的数据(如文本或离散值)转换为所需格式。

模型训练:训练数据是批处理过程的输入,训练后的模型权重是输出。

批量推理:如果数据集很大且不需要实时结果,可以使用训练好的模型进行批量预测。这包括在测试数据集上评估模型的预测结果。

批处理框架提供了专门用于这些用例的工具。例如,Apache Spark的MLlib和Apache Flink的FlinkML提供了广泛的特征工程工具、统计函数和分类器。

推荐引擎和排序系统等ML应用也大量使用图处理(参见第84页的“图状数据模型”)。许多图算法通过一次一条边地遍历来表达,将一个顶点与相邻顶点连接以传播某些信息,并重复直到满足特定条件——例如,直到没有更多边可走,或者直到某个指标收敛。

批量同步并行(BSP)计算模型40在批处理图中变得流行;它由Apache Giraph 20、Spark的GraphX

第11章:批处理

…直到满足某个条件——例如,直到没有更多的边可以跟随,或者直到某个指标收敛。

同步并行批量(BSP)计算模型 [40] 在批处理图数据方面变得流行;它由 Apache Giraph [20]、Spark 的 GraphX API 以及 Flink 的 Gelly API [41] 等实现。它也被称为 Pregel 模型,因为 Google 的 Pregel 论文推广了这种处理图的方法 [42]。

批处理也是大型语言模型数据准备和训练中不可或缺的一部分。原始文本输入数据(例如网站内容)通常位于分布式文件系统或对象存储中。这些数据必须经过预处理才能适用于训练。非常适合批处理框架的预处理步骤包括:

  • 从 HTML 中提取纯文本并修复格式错误的文本。
  • 检测并移除低质量、不相关和重复的文档。
  • 对文本进行分词(将其拆分为单词)并将其转换为嵌入(即每个单词的数字表示)。

诸如 Kubeflow、Flyte 和 Ray 之类的批处理框架是专门为此类工作负载构建的。例如,OpenAI 在其 ChatGPT 训练过程中使用了 Ray [43]。这些框架与 PyTorch、TensorFlow、XGBoost 等 LLM 和 AI 库有内置集成。它们还提供对特征工程、模型训练、批量推理和微调(针对特定用例调整基础模型)的内置支持。

最后,数据科学家经常在交互式笔记本(如 Jupyter 或 Hex)中实验数据。笔记本由单元格组成,这些单元格是 Markdown、Python 或 SQL 的小块。单元格按顺序执行以生成电子表格、图表或数据。许多笔记本通过 DataFrame API 使用批处理,或使用 SQL 查询此类系统。

服务派生数据

批作业通常用于构建预计算或派生数据集,例如产品推荐、面向用户的报告和 ML 模型的特征。这些数据集通常从生产数据库、键值存储或搜索引擎中提供服务。

无论使用什么系统,预计算数据都需要从批处理器的分布式文件系统或对象存储传输到服务于实时流量的数据库中。

你可能倾向于在批作业中直接使用你喜爱数据库的客户端库,并逐条记录直接写入数据库服务器。这样做是可行的(假设你的防火墙规则允许从批处理环境直接访问生产数据库),但出于以下几个原因,这是一个坏主意:

WARNING

  • 性能严重下降:为每条记录发起一个网络请求比批处理任务的正常吞吐量慢几个数量级。即使客户端库支持批处理,性能也可能很差。
  • 数据库不堪重负:批处理框架通常并行运行许多任务。如果所有任务以批处理预期的速率同时写入同一个输出数据库,该数据库很容易被压垮,其查询性能可能会严重下降。这反过来可能导致系统中其他部分的运行问题 [44]。
  • 外部副作用与原子性冲突:通常,批处理作业为作业输出提供干净的全有或全无保证。如果作业成功,结果是每个任务恰好执行一次的输出(即使某些任务失败并在过程中被重试);如果整个作业失败,则不产生任何输出。然而,从作业内部写入外部系统会产生外部可见的副作用,这些副作用无法以这种方式隐藏。因此,你必须担心部分完成的作业的结果被其他系统看到。如果任务失败并重启,它可能会重复失败执行时的输出。

一个更好的解决方案是让批处理作业将预计算的数据集推送到诸如 Kafka 主题之类的流中,我们将在第 12 章进一步讨论。像 Elasticsearch 这样的搜索引擎、Apache Pinot 和 Apache Druid 这样的实时 OLAP 系统、Venice [45] 这样的派生数据存储,以及 ClickHouse 这样的云数据仓库,都内置了从 Kafka 摄取数据到其系统的能力。通过流式系统推送数据解决了上述几个问题:

  • 流式系统针对顺序写入进行了优化,这使其更适合批处理作业的批量写入工作负载。
  • 流式系统可以作为批处理作业和生产数据库之间的缓冲区。下游系统可以调节其读取速率,以确保它们能够继续舒适地服务生产流量。
  • 单个批处理作业的输出可以被多个下游系统消费。
  • 流式系统可以作为批处理环境和生产网络之间的安全边界。它们可以部署在所谓的非军事区 (DMZ) 网络中,该网络位于批处理网络和生产网络之间。

流式本身并不能固有地解决全有或全无保证的问题。为了实现这一点,在批处理作业完成后,必须向下游系统发送通知,告知其作业已完成且数据现在可以提供服务。流的消费者需要能够将收到的数据保持对查询不可见(就像具有读已提交隔离级别的未提交事务,参见第 290 页的“读已提交”),直到它们收到作业完成的通知。

另一种在引导数据库时更常见的模式是在批处理作业内部构建一个全新的数据库,然后将这些文件从分布式文件系统、对象存储或本地文件系统批量直接加载到数据库中。许多数据系统提供批量导入工具,例如 TiDB 的 Lightning 和 Apache Pinot 的 Hadoop 导入作业。RocksDB 也提供一个 API,用于从批处理作业批量导入排序字符串表 (SST) 文件。

在批处理中构建数据库并批量导入数据非常快速,并且使系统更容易在数据集版本之间进行原子切换。另一方面,从构建全新数据库的批处理作业中增量更新数据集可能具有挑战性。当同时需要引导和增量加载时,通常采用混合方法。例如,Venice 支持允许批量基于行的更新和完整数据集交换的混合存储。

总结

在本章中,我们探讨了批处理系统的设计和实现。我们从经典的 Unix 工具链(awk、sort、uniq 等)开始,以说明排序和计数等基本批处理原语。

然后我们扩展到分布式批处理系统。批处理框架处理不可变的、有界的输入数据集以产生输出数据,允许重新运行和调试而不会产生副作用。这种处理涉及三个主要组成部分:一个编排层,决定作业运行的地点和时间;一个存储层,用于持久化数据;以及一个计算层,处理实际数据。

我们研究了分布式文件系统和对象存储如何通过基于块的复制、缓存和元数据服务来管理大文件,以及现代批处理框架如何通过可插拔 API 与这些系统交互。我们还讨论了作业编排器如何调度任务、分配资源以及处理大型集群中的故障,并将它们与管理工作流编排器进行比较,后者管理按依赖关系图运行的一组作业的生命周期。

我们调研了批处理模型,从 MapReduce 及其经典的 map 和 reduce 函数开始。接下来,我们转向了像 Spark 和 Flink 这样的数据流引擎,它们提供了更易于使用的数据流 API 和更好的性能。为了理解批处理作业如何扩展,我们介绍了 shuffle 算法,这是一种支持分组、连接和聚合的基础操作。

我们看到,随着批处理系统的成熟,重点转向了易用性。增加了对高级查询语言(如 SQL 和 DataFrame API)的支持,使批处理作业更易于访问和优化。批处理框架自动确定如何以这些语言编写的作业在集群机器上高效执行。

我们以对常见批处理用例的调研结束了本章,包括以下内容:

  • ETL 管道:使用计划的工作流在系统之间提取、转换和加载数据。
  • 分析:批处理作业支持预聚合查询和即席查询。
  • 机器学习:批处理作业用于准备和处理大型训练数据集。
  • 从批处理输出填充生产系统:通常通过流或批量加载工具,将派生数据提供给用户。

在下一章中,我们将转向流处理,其中输入是无界的——即作业的输入是永无止境的数据流。这意味着作业永远不会完成,因为随时可能有更多的工作进入。我们将看到流处理和批处理在某些方面是相似的,但无界流的假设也对如何构建系统有重大影响。

参考文献

[1] Nathan Marz. “How to Beat the CAP Theorem.” nathanmarz.com, October 2011. Archived at perma.cc/4BS9-R9A4

[2] Molly Bartlett Dishman and Martin Fowler. “Agile Architecture.” At O’Reilly Software Architecture Conference, March 2015.

[3] Jeffrey Dean and Sanjay Ghemawat. “MapReduce: Simplified Data Processing on Large Clusters.” At 6th USENIX Symposium on Operating System Design and Implementation (OSDI), December 2004.

[4] Shivnath Babu and Herodotos Herodotou. “Massively Parallel Databases and MapReduce Systems.” Foundations and Trends in Databases, volume 5, issue 1, pages 1–104, November 2013. doi:10.1561/1900000036

[5] David J. DeWitt and Michael Stonebraker. “MapReduce: A Major Step Backwards.” Originally published at databasecolumn.vertica.com, January 2008. Archived at perma.cc/U8PA-K48V

[6] Henry Robinson. “The Elephant Was a Trojan Horse: On the Death of Map-Reduce at Google.” the-paper-trail.org, June 2014. Archived at perma.cc/9FEM-X787

[7] Urs Hölzle. “R.I.P. MapReduce. After having served us well since 2003, today we removed the remaining internal codebase for good.” x.com, September 2019. Archived at perma.cc/B34T-LLY7

第11章:批处理

[8] Adam Drake. “命令行工具比Hadoop集群快235倍。” aadrake.com,2014年1月。存档于 perma.cc/87SP-ZMCY

[9] “sort: 排序文本文件。” GNU Coreutils 9.7 文档,自由软件基金会公司,2025年。存档于 perma.cc/68KN-E8TL

[10] Michael Ovsiannikov, Silvius Rus, Damian Reeves, Paul Sutter, Sriram Rao, 和 Jim Kelly. “Quantcast文件系统。” VLDB基金会会刊,第6卷,第11期,第1092–1101页,2013年8月。doi:10.14778/2536222.2536234

[11] Andrew Wang, Zhe Zhang, Kai Zheng, Uma Maheswara G., 和 Vinayakumar B. “Apache Hadoop中HDFS纠删码简介。” blog.cloudera.com,2015年9月。存档于 archive.org

[12] Andy Warfield. “构建和运营一个名为S3的巨大存储系统。” allthingsdistributed.com,2023年7月。存档于 perma.cc/7LPK-TP7V

[13] Vinod Kumar Vavilapalli, Arun C. Murthy, Chris Douglas, Sharad Agarwal, Mahadev Konar, Robert Evans, Thomas Graves, Jason Lowe, Hitesh Shah, Siddharth Seth, Bikas Saha, Carlo Curino, Owen O’Malley, Sanjay Radia, Benjamin Reed, 和 Eric Baldeschwieler. “Apache Hadoop YARN: 又一个资源协商器。” 在第4届云计算年度研讨会 (SoCC) 上,2013年10月。doi:10.1145/2523616.2523633

[14] Richard M. Karp. “组合问题之间的可归约性。” 计算机计算的复杂性。IBM研究研讨会系列。Springer,1972年。doi:10.1007/978-1-4684-2001-2_9

[15] J. D. Ullman. “NP完全调度问题。” 计算机与系统科学杂志,第10卷,第3期,第384–393页,1975年6月。doi:10.1016/S0022-0000(75)80008-0

[16] Gilad David Maayan. “AWS、Azure和GCP上竞价实例的完整指南。” datacenterdynamics.com,2021年3月。存档于 archive.org

[17] Abhishek Verma, Luis Pedrosa, Madhukar Korupolu, David Oppenheimer, Eric Tune, 和 John Wilkes. “使用Borg在Google进行大规模集群管理。” 在第10届欧洲计算机系统会议 (EuroSys) 上,2015年4月。doi:10.1145/2741948.2741964

[18] Matei Zaharia, Mosharaf Chowdhury, Tathagata Das, Ankur Dave, Justin Ma, Murphy McCauley, Michael J. Franklin, Scott Shenker, 和 Ion Stoica. “弹性分布式数据集:一种用于内存集群计算的容错抽象。” 在第9届USENIX网络系统设计与实现研讨会 (NSDI) 上,2012年4月。

摘要 | 483

[19] Paris Carbone, Stephan Ewen, Seif Haridi, Asterios Katsifodimos, Volker Markl, 和 Kostas Tzoumas. “Apache Flink:单一引擎中的流处理和批处理。” IEEE计算机学会数据工程技术委员会公报,第38卷,第4期,第28–38页,2015年12月。存档于 perma.cc/G3N3-BKX5

[20] Mark Grover, Ted Malaska, Jonathan Seidman, 和 Gwen Shapira. Hadoop应用架构。O’Reilly Media,2015年。ISBN: 9781491900048

[21] Jules S. Damji, Brooke Wenig, Tathagata Das, 和 Denny Lee. 学习Spark,第2版。O’Reilly Media,2020年。ISBN: 9781492050049

[22] Michael Isard, Mihai Budiu, Yuan Yu, Andrew Birrell, 和 Dennis Fetterly. “Dryad:由顺序构建模块构成的分布式数据并行程序。” 在第2届欧洲计算机系统会议 (EuroSys) 上,2007年3月。doi:10.1145/1272996.1273005

[23] Daniel Warneke 和 Odej Kao. “Nephele:云中的高效并行数据处理。” 在第2届网格与超级计算机多任务计算研讨会 (MTAGS) 上,2009年11月。doi:10.1145/1646468.1646476

[24] Hossein Ahmadi. “Google BigQuery中的内存查询执行。” cloud.google.com,2016年8月。存档于 perma.cc/DGG2-FL9W

[25] Tom White. Hadoop权威指南,第4版。O’Reilly Media,2015年。ISBN: 9781491901632

[26] Fabian Hüske. “窥探Apache Flink的引擎室。” flink.apache.org,2015年3月。存档于 perma.cc/44BW-ALJX

[27] Mostafa Mokhtar. “Hive 0.14基于代价的优化器(CBO)技术概述。” hortonworks.com,2015年3月。存档于 archive.org

[28] Michael Armbrust, Reynold S. Xin, Cheng Lian, Yin Huai, Davies Liu, Joseph K. Bradley, Xiangrui Meng, Tomer Kaftan, Michael J. Franklin, Ali Ghodsi, 和 Matei Zaharia. “Spark SQL:Spark中的关系数据处理。” 在ACM国际数据管理会议 (SIGMOD) 上,2015年6月。doi:10.1145/2723372.2742797

[29] Kaya Kupferschmidt. “Spark vs. Pandas,第2部分——Spark。” towardsdatascience.com,2020年10月。存档于 perma.cc/5BRK-G4N5

[30] Ammar Chalifah. “大规模跟踪支付。” bolt.eu.com,2025年6月。存档于 perma.cc/Q4KX-8K3J

[31] Nafi Ahmet Turgut, Hamza Akyıldız, Hasan Burak Yel, Mehmet İkbal Özmen, Mutlu Polatcan, Pinar Baki, 和 Esra Kayabali. “使用Amazon Forecast在Getir进行需求预测。” aws.amazon.com,2023年5月。存档于 perma.cc/H3H6-GNL7

484 | 第11章:批处理

[32] Jason (Siyu) Zhu. “通过利用大规模语料稀疏ID嵌入增强主页Feed相关性。” linkedin.com,2023年8月。存档于 archive.org

[33] Avery Ching, Sital Kedia, 和 Shuojie Wang. “Apache Spark @Scale:一个60 TB+的生产用例。” engineering.fb.com,2016年8月。存档于 perma.cc/F7R5-YFAV

[34] Edward Kim. “ACH如何工作:开发者视角——第1部分。” engineering.gusto.com,2014年4月。存档于 perma.cc/F67P-VBLK

[35] Zhamak Dehghani. “如何超越单体数据湖走向分布式数据网格。” martinfowler.com,2019年5月。存档于 perma.cc/LN2L-L4VC

[36] Chris Riccomini. “数据网格到底是什么?!” cnr.sh,2021年6月。存档于 perma.cc/NEJ2-BAX3

[37] Chad Sanderson, Mark Freeman, 和 B. E. Schmidt. 数据契约。O’Reilly Media,2025年。ISBN: 9781098157623

[38] Daniel Abadi. “数据结构 vs. 数据网格:有什么区别?” starburst.io,2021年11月。存档于 perma.cc/RSK3-HXDK

[39] Michael Armbrust, Ali Ghodsi, Reynold Xin, 和 Matei Zaharia. “Lakehouse:统一数据仓库和高级分析的新一代开放平台。” 在第11届创新数据系统研究年度会议 (CIDR) 上,2021年1月。存档于 perma.cc/7C6D-T9NR

[40] Leslie G. Valiant. “并行计算的桥接模型。” ACM通讯,第33卷,第8期,第103–111页,1990年8月。doi:10.1145/79173.79181

[41] Stephan Ewen, Kostas Tzoumas, Moritz Kaufmann, 和 Volker Markl. “加速迭代数据流。” VLDB基金会会刊,第5卷,第11期,第1268–1279页,2012年7月。doi:10.14778/2350229.2350245

[42] Grzegorz Malewicz, Matthew H. Austern, Aart J. C. Bik, James C. Dehnert, Ilan Horn, Naty Leiser, 和 Grzegorz Czajkowski. “Pregel:大规模图处理系统。” 在ACM国际数据管理会议 (SIGMOD) 上,2010年6月。doi:10.1145/1807167.1807184

[43] Richard MacManus. “OpenAI在Anyscale的Ray峰会上讨论LLM规模化。” thenewstack.io,2023年9月。存档于 perma.cc/YJD6-KUXU

[44] Jay Kreps. “为什么本地状态是流处理中的基本原语。” oreilly.com,2014年7月。存档于 perma.cc/P8HU-R5LA

[45] Félix GV. “开源Venice——LinkedIn的衍生数据平台。” linkedin.com,2022年9月。存档于 archive.org

摘要 | 485