第12章 流处理
一个能正常运行的复杂系统,总是从一个能正常运行的简单系统演化而来。反之亦然:从头设计的复杂系统从来都不能正常运行,也无法通过修补使其正常运行。
—John Gall, Systemantics (1975)
在第11章中,我们讨论了批处理——将一组文件作为输入,并生成一组新的输出文件的技术。输出是一种派生数据;也就是说,如果需要,可以通过再次运行批处理来重新创建该数据集。我们看到了如何使用这个简单但强大的想法来创建搜索索引、推荐系统、分析系统等。
然而,第11章始终存在一个大前提:即输入是有界的——大小已知且有限——因此批处理过程知道何时完成对输入的读取。例如,MapReduce核心的排序操作必须在读取完所有输入后才能开始产生输出,因为最后一个输入记录可能是具有最小键的记录,因此它需要成为第一个输出记录,所以提前开始输出是不可行的。
实际上,许多数据是无界的,因为它们随时间逐渐到达。你的用户昨天和今天产生了数据,他们明天还会继续产生更多数据。除非你停业,否则这个过程永远不会结束,因此数据集在任何有意义的层面上永远不会“完整” [1]。因此,批处理器必须人为地将数据分成固定时长的块——例如,每天结束时处理一天的数据,或每小时结束时处理一小时的数据。
每日批处理的问题在于,输入的变化会一天后才反映到输出中,这对于许多不耐烦的用户来说太慢了。为了减少延迟,我们可以更频繁地运行处理——例如,每秒结束时处理一秒的数据——甚至连续运行,完全放弃固定的时间片,并在每个事件发生时立即处理它。这就是流处理背后的思想。
一般来说,流指的是随时间增量到达的数据。这个概念出现在许多地方:Unix的stdin和stdout、编程语言(惰性列表)[2]、文件系统API(如Java的FileInputStream)、TCP连接、通过互联网传输的音频和视频,等等。
在本章中,我们将研究作为数据管理机制的事件流:它是上一章中我们看到的有界批处理数据的无界、增量处理对应物。我们将首先讨论流如何表示、存储和通过网络传输,然后研究流与数据库之间的关系。最后,在“处理流”部分(第513页),我们将探讨持续处理这些流的方法和工具,以及如何利用它们构建应用程序。
传输事件流
在批处理世界中,一个作业的输入和输出是文件(可能在分布式文件系统上)。流处理的对等物是什么样子的?
当输入是一个文件(字节序列)时,第一个处理步骤通常是将其解析为记录序列。在流处理上下文中,记录更常被称为事件,但本质上是相同的东西:一个小的、自包含的、不可变的对象,包含某个时间点发生的事件的详细信息。事件通常包含一个时间戳,指示根据一日时钟(参见第359页的“单调时钟与一日时钟”)发生的时间。
例如,发生的事件可能是用户采取的操作,例如查看页面或进行购买。它也可能来自机器,例如温度传感器的定期测量或CPU利用率指标。在第454页的“使用Unix工具进行批处理”示例中,Web服务器日志的每一行都是一个事件。
事件可以编码为文本字符串、JSON,或者可能是二进制形式,如第5章所述。这种编码允许你存储事件——例如,通过将其追加到文件、插入关系表或写入文档数据库。编码还允许你通过网络将事件发送到另一个节点进行处理。
在批处理中,文件被写入一次,然后可能被多个作业读取。类似地,在流处理术语中,事件由生产者(也称为发布者或发送者)生成一次,然后可能被多个消费者(订阅者或接收者)处理 [3]。在文件系统中,文件名标识一组相关记录;在流系统中,相关事件通常被分组到主题或流中。
原则上,文件或数据库足以连接生产者和消费者。生产者将其生成的每个事件写入数据存储,每个消费者定期轮询数据存储以检查上次运行以来出现的事件。这本质上就是批处理过程在每天结束时处理一天数据时所做的事情。
然而,当转向低延迟的持续处理时,如果数据存储不是为这种使用方式设计的,轮询会变得昂贵。你轮询得越频繁,返回新事件的请求占比就越低,因此开销就越高。相反,更好的做法是当新事件出现时通知消费者。
数据库传统上不太支持这种通知机制。关系数据库通常有触发器,可以对更改做出反应(例如,向表中插入一行),但它们的功能非常有限,并且在数据库设计中有点事后补救的意味 [4]。相反,专门用于传递事件通知的工具已经被开发出来。
消息系统
通知消费者新事件的常见方法是使用消息系统:生产者发送包含事件的消息,然后将其推送给消费者。我们在第189页的“事件驱动架构”中提到了这些系统,现在我们将更详细地了解它们。
像Unix管道或TCP连接这样的直接通信通道是生产者与消费者之间实现消息系统的一种简单方式。然而,大多数消息系统在此基本模型基础上进行了扩展。具体来说,Unix管道和TCP恰好连接一个发送者和一个接收者,而消息系统允许多个生产者节点向同一主题发送消息,并允许多个消费者节点接收主题中的消息。
在这个发布/订阅模型中,不同的系统采用了广泛的方法,没有一种正确的方法适用于所有目的。为了区分系统,提出以下两个问题特别有帮助:
如果生产者发送消息的速度快于消费者处理消息的速度,会发生什么?
大致来说,系统有三种选择:丢弃消息、在队列中缓冲消息或应用背压(也称为流控,这会阻塞生产者发送更多消息)。例如,Unix管道和TCP使用背压;它们有一个固定大小的缓冲区,如果它满了,发送者会被阻塞,直到接收者从缓冲区中取出数据(参见第353页的“网络拥塞和排队”)。
如果消息在队列中缓冲,那么了解当队列增长时会发生什么很重要。如果队列不再适合内存,系统会崩溃吗?还是会将消息写入磁盘?如果是后者,磁盘访问如何影响消息系统的性能 [5]?当磁盘满了会发生什么 [6]?
如果节点崩溃或暂时离线,消息是否会丢失?
与数据库一样,持久性可能需要结合写入磁盘和/或复制(参见第283页的边栏“复制与持久性”),这需要成本。如果你能承受偶尔丢失消息,那么在相同硬件上你可能会获得更高的吞吐量和更低的延迟。
消息丢失是否可以接受,在很大程度上取决于应用程序。例如,对于定期传输的传感器读数和指标,偶尔丢失一个数据点可能并不重要,因为一个更新的值将在短时间内再次发送。但要注意,如果大量消息被丢弃,可能不会立即察觉到指标不正确 [7]。如果你在计数事件,那么可靠传递更为重要,因为每丢失一条消息都意味着计数器不正确。
我们在第11章中探索的批处理系统的一个很好的特性是它们提供了强大的可靠性保证。失败的任务会自动重试,失败任务的部分输出会自动丢弃。这意味着输出与没有发生故障时相同,这有助于简化编程模型。在本章后面,我们将研究如何在流上下文中提供类似的保证。
从生产者到消费者的直接消息传递
许多消息系统使用生产者和消费者之间的直接网络通信,而不使用中间节点:
- UDP 多播 在金融行业被广泛用于低延迟至关重要的流,例如股票市场行情 [8]。尽管UDP本身不可靠,但应用层协议可以恢复丢失的数据包(生产者必须记住它已发送的数据包,以便在需要时重新传输它们)。
- 无代理消息库,如 ZeroMQ 和 nanomsg,采用类似的方法,通过TCP或IP多播实现发布/订阅消息传递。
第12章:流处理
- 一些指标收集代理(例如 StatsD [9])使用不可靠的 UDP 消息从网络上的所有机器收集指标并进行监控。(在 StatsD 协议中,只有所有消息都被接收时计数器指标才是正确的;使用 UDP 使得指标最多是近似值 [10]。另请参见第354页的“TCP 与 UDP”。)
- 如果消费者在网络上暴露一个服务,生产者可以直接发送 HTTP 或 RPC 请求(参见第180页的“通过服务的数据流:REST 和 RPC”)将消息推送给消费者。这就是 webhook [11] 背后的思想,一种模式:一个服务的回调 URL 注册到另一个服务,每当事件发生时,该服务就会向该 URL 发起请求。
尽管这些直接消息传递系统在其设计场景中运行良好,但它们通常要求应用程序代码意识到消息可能丢失。它们能够容忍的故障非常有限。即使协议检测到网络中丢失的数据包并重新传输,它们通常也假设生产者和消费者始终在线。
如果消费者离线,它可能会错过在其不可达期间发送的消息。某些协议允许生产者重试失败的消息投递,但如果生产者崩溃,丢失了本应重试的消息缓冲区,这种方法可能会失效。
消息代理
一种广泛使用的替代方案是通过消息代理(也称为消息队列)发送消息,它本质上是一种针对处理消息流而优化的数据库 [12]。它作为服务器运行,生产者和消费者作为客户端连接到它。生产者向代理写入消息,代理将消息投递给消费者。
通过在代理中集中管理数据,这些系统可以更容易地容忍客户端的连接、断开和崩溃,而持久性的问题则转移到了代理身上。某些消息代理仅在内存中保存消息,而其他代理(取决于配置)则将消息写入磁盘,以避免代理崩溃时丢失消息。面对缓慢的消费者,它们通常允许无界队列(而不是丢弃消息或使用背压),不过这个选择也可能取决于配置。
队列带来的一个后果是消费者通常是异步的。当生产者发送消息时,它通常只等待代理确认已缓冲该消息,而不等待消息被消费者处理完毕。消息投递给消费者将在未来的某个不确定时间点发生——通常在几分之一秒内,但如果存在队列积压,有时会显著延迟。
消息代理与数据库的比较
某些消息代理甚至可以使用 XA 或 JTA 参与两阶段提交协议(参见第328页的“跨不同系统的分布式事务”)。这一特性使它们在本质上与数据库非常相似,尽管消息代理和数据库在实际中仍存在重要区别:
- 数据库通常保留数据直到显式删除,而某些消息代理会在消息成功投递给消费者后自动删除该消息。此类消息代理不适用于长期数据存储。
- 由于快速删除消息,大多数消息代理假设其工作集相当小——即队列很短。如果因为消费者缓慢而需要代理缓冲大量消息(可能因为内存不足而将消息溢出到磁盘),每条消息的处理时间会变长,整体吞吐量可能下降 [5]。
- 数据库通常支持二级索引以及使用查询语言搜索数据的多种方式,而消息代理通常支持某种方式订阅与模式匹配的某个主题子集。两者本质上都是客户端选择它想了解的数据部分的方式,但数据库通常提供更高级的查询功能。
- 查询数据库时,结果通常基于数据在某个时间点的快照。如果另一个客户端随后写入数据库并改变了查询结果,第一个客户端不会发现其先前的结果现在已经过时(除非它重复查询或轮询变更)。相比之下,消息代理不支持任意查询,也不允许消息发送后更新,但它们会在数据变化时(即新消息可用时)通知客户端。
这是消息代理的传统观点,它被封装在如 JMS [13] 和 AMQP [14] 等标准中,并在 RabbitMQ、ActiveMQ、HornetQ、Qpid、TIBCO Enterprise Message Service、IBM MQ、Azure Service Bus 和 Google Cloud Pub/Sub [15] 等软件中实现。虽然可以使用数据库作为队列,但调整它们以获得良好性能并不简单 [16]。
多个消费者
当多个消费者读取同一主题的消息时,通常使用两种消息传递模式,如图12-1所示:
- 负载均衡:每条消息被投递给其中一个消费者,因此消费者可以分担处理该主题消息的工作。代理可以任意地将消息分配给消费者。这种模式适用于消息处理开销较大时,因此希望通过添加消费者来并行处理。(在 AMQP 中,可以通过让多个客户端从同一队列消费来实现负载均衡,在 JMS 中称为共享订阅。)
- 扇出:每条消息被投递给所有消费者。扇出允许几个独立的消费者各自“收听”同一广播消息,而不相互影响——这相当于多个批处理作业读取同一个输入文件的流式等价物。(此功能由 JMS 中的主题订阅和 AMQP 中的交换绑定提供。)
图12-1. (a) 负载均衡在消费者之间共享消费主题的工作;(b) 扇出模式下,每条消息被投递给多个消费者。
这两种模式可以组合——例如,使用 Kafka 的消费者组功能。当一个消费者组订阅一个主题时,主题中的每条消息被发送给组内的一个消费者(在组内进行负载均衡)。如果两个不同的消费者组订阅同一个主题,每条消息被发送给每个组中的一个消费者(跨消费者组进行扇出)。
确认与重新投递
消费者可能随时崩溃。因此,代理可能将一条消息投递给消费者,但消费者从未处理它,或者处理了一部分就崩溃了。为确保消息不丢失,消息代理使用确认机制:客户端必须显式告知代理它已完成消息处理,这样代理才能从队列中移除该消息。
如果到客户端的连接关闭或超时,而代理未收到确认,则假定该消息未被处理,因此它会将该消息重新投递给另一个消费者。(注意,实际可能消息已被完全处理,但确认在网络中丢失。处理这种情况需要原子提交协议,如第329页“仅一次消息处理”中所讨论的,除非该操作是幂等的或不需要仅一次语义。)
当与负载均衡结合时,这种重新投递行为会对消息顺序产生有趣的影响。在图12-2中,消费者通常按生产者发送的顺序处理消息。然而,消费者2在处理消息m3时崩溃,同时消费者1正在处理消息m4。未被确认的消息m3随后被重新投递给消费者1,结果消费者1处理消息的顺序为m4, m3, m5。因此,m3和m4的投递顺序与生产者1发送的顺序不同。
图12-2. 消费者2在处理m3时崩溃,因此稍后它将重新投递给消费者1。
即使消息代理在其他方面努力保持消息顺序(如JMS和AMQP标准所要求的),负载均衡与重新投递的组合不可避免地会导致消息重新排序。要避免此问题,可以为每个消费者使用单独的队列(即不使用负载均衡功能)。如果消息彼此完全独立,消息重新排序不是问题;但如果消息之间存在因果依赖关系,则可能非常重要,我们将在本章后面看到。
494 |
第十二章:流处理
重新投递带来的问题与死信队列
重新投递还可能导致流中的资源浪费、资源枯竭或永久阻塞。一个常见的情景是生产者不当序列化消息——例如,在 JSON 编码的对象中遗漏了必需的键。如果缺少该键的消息导致消费者崩溃并重启,消费者将不会确认该消息,因此代理会重新发送它,这会导致另一个消费者失败。这个循环会无限重复。如果代理保证强顺序性,则无法再取得任何进展。允许消息重新排序的代理可以继续取得进展,但会在永远无法确认的消息上浪费资源。
死信队列(Dead Letter Queues,DLQ)用于处理此问题。不是将消息保留在当前队列中并无限重试,而是将消息移动到另一个队列以解除消费者阻塞 [17, 18]。通常在 DLQ 上设置监控——队列中的任何消息都表示一个错误。一旦检测到新消息,操作员可以决定永久丢弃它、手动修改并重新生成消息,或修复消费者代码以适当处理该消息。DLQ 在大多数排队系统中很常见,但现在基于日志的消息系统(如 Apache Pulsar)和流处理系统(如 Kafka Streams)也支持它们 [19]。
基于日志的消息代理
通过网络发送数据包或向网络服务发出请求通常是一种瞬态操作,不会留下永久痕迹。尽管可以永久记录此类操作(使用数据包捕获和日志记录),但我们通常不这么认为。AMQP/JMS 风格的消息代理继承了这种瞬态消息传递的思维方式。即使它们可能将消息写入磁盘,在将消息传递给消费者后,它们也会很快再次删除这些消息。
数据库和文件系统采用相反的方法:通常期望写入数据库或文件的所有内容都被永久记录,至少直到有人明确选择再次删除它。
这种思维方式的差异对派生数据的创建方式有很大影响。正如第 11 章所讨论的,批处理的一个关键特性是可以重复运行它们,试验处理步骤,而不会对输入造成损害(因为输入是只读的)。AMQP/JMS 风格的消息传递则并非如此:如果确认消息会导致代理将其删除,则接收消息是破坏性的,因此无法再次运行同一个消费者并期望获得相同的结果。
如果向消息系统添加新的消费者,它通常只开始接收自注册时间之后发送的消息;任何先前的消息都已消失且无法恢复。相比之下,文件和数据库允许随时添加新客户端,并且它可以读取任意时间之前写入的数据(只要应用程序未明确覆盖或删除)。
为什么不能混合使用数据库的持久存储方法和消息传递的低延迟通知功能?这就是基于日志的消息代理背后的想法,近年来变得非常流行。
使用日志进行消息存储
日志只是磁盘上仅追加的记录序列。我们之前在第 4 章中讨论了日志结构存储引擎和预写日志的上下文,在第 6 章中讨论了复制的上下文,在第 10 章中讨论了共识的一种形式。
相同的结构可用于实现消息代理。生产者通过将消息追加到日志末尾来发送消息,消费者通过顺序读取日志来接收消息。如果消费者到达日志末尾,它会等待新消息追加的通知。Unix 工具 tail -f(监视文件并等待数据追加)的工作方式与此类似。
为了扩展到单个磁盘无法提供的高吞吐量,可以对日志进行分片(在第 7 章的意义上)。然后可以将不同的分片托管在不同的机器上,使每个分片成为一个独立的日志,可以独立于其他分片进行读写,并且可以将主题定义为一组分片,所有分片都携带相同类型的消息。图 12-3 说明了这种方法。
图 12-3. 生产者通过将消息追加到主题分区文件来发送消息,消费者顺序读取这些文件。
在每个分片内(Kafka 称之为分区),代理为每条消息分配一个单调递增的序列号或偏移量(在图 12-3 中,方框中的数字是消息偏移量)。这样的序列号是有意义的,因为分区(分片)是仅追加的,因此分区内的消息是完全有序的。不同分区之间没有顺序保证。
Apache Kafka [20] 和 Amazon Kinesis Streams 是像这样工作的基于日志的消息代理。Google Cloud Pub/Sub 在架构上类似,但公开了 JMS 风格的 API,而不是日志抽象 [15]。尽管这些消息代理将所有消息写入磁盘,但它们通过跨多台机器分片来实现每秒数百万条消息的吞吐量,并通过复制消息来实现容错 [21, 22]。
日志与传统消息传递的比较
基于日志的方法轻松支持扇出消息传递,因为多个消费者可以独立读取日志而互不影响;读取消息不会将其从日志中删除。为了在消费者组之间实现负载均衡,代理可以将整个分片分配给消费者组中的节点,而不是将单个消息分配给消费者客户端。
然后,每个客户端依序消费其被分配的分片中的所有消息。通常,当一个消费者被分配了一个日志分片时,它会以简单的单线程方式顺序读取该分片中的消息。这种粗粒度的负载均衡方法有以下缺点:
- 共享消费主题工作的节点数最多只能等于该主题中的日志分片数,因为同一分片内的消息被传递到同一节点。(可以创建一种负载均衡方案,其中两个消费者通过读取完整消息集来共享处理分片的工作,但一个只考虑偶数偏移的消息,另一个处理奇数偏移的消息。或者,也可以将消息处理分散到线程池中,但这种方法会使消费者偏移管理复杂化。通常,单线程处理分片更可取,并且可以通过使用更多分片来增加并行性。)
- 如果单个消息处理缓慢,则会阻塞该分片中后续消息的处理(一种队头阻塞形式;参见第 37 页的“描述性能”)。
因此,当消息处理代价高昂并且希望逐条消息并行处理,且消息顺序不那么重要时,JMS/AMQP 风格的消息代理更可取。另一方面,在消息吞吐量高、每条消息处理速度快且消息顺序重要的场景下,基于日志的方法效果非常好 [23, 24]。然而,这两种架构之间的区别正在变得模糊,因为基于日志的消息系统(如 Kafka)现在支持 JMS/AMQP 风格的消费者组,允许多个消费者从同一个分区接收消息 [25, 26]。
由于分片日志通常仅在单个分片内保留消息顺序,因此所有需要一致排序的消息都需要路由到同一个分片。例如,应用程序可能要求与特定用户相关的事件以固定顺序出现。这可以通过根据事件的用户 ID 选择事件的分片来实现(换句话说,使用户 ID 成为分区键)。
消费者偏移量
顺序消费分片可以很容易地判断哪些消息已被处理。所有偏移量小于消费者当前偏移量的消息都已处理,所有偏移量更大的消息尚未被看到。因此,代理不需要跟踪每条消息的确认,而只需定期记录消费者偏移量。这种方法减少了簿记开销,并提供了批处理和流水线操作的机会,有助于提高基于日志的系统的吞吐量。然而,如果消费者失败,它将从最后记录的偏移量恢复,而不是它看到的最后偏移量。这可能导致消费者再次看到某些消息两次。
实际上,偏移量非常类似于单主数据库复制中常见的日志序列号,我们在第 201 页的“设置新跟随者”中讨论过这一点。在数据库复制中,日志序列号允许跟随者在断开连接后重新连接到领导者,并恢复复制而不跳过任何写入。这里使用了完全相同的原理:消息代理的行为类似于领导者数据库,消费者行为类似于跟随者。
如果消费者节点发生故障,消费者组中的另一个节点将被分配给故障消费者的分片,并从最后记录的偏移量开始消费消息。如果消费者已处理后续消息但尚未记录其偏移量,则这些消息在重新启动时将再次被处理。我们将在本章后面讨论处理此问题的方法。
磁盘空间使用
如果只向日志追加数据,最终会用尽磁盘空间。为了回收磁盘空间,日志被分成段,并定期删除旧段或将其移动到存档存储。(我们将在第 505 页的“日志压缩”中讨论一种更复杂的释放磁盘空间的方法。)
这意味着,如果慢速消费者无法跟上消息速率,并且落后太多以至于其消费者偏移量指向已删除的段,它将错过一些消息。实际上,日志实现了一个有界大小的缓冲区,当缓冲区满时会丢弃旧消息,也称为循环缓冲区或环形缓冲区。然而,由于该缓冲区在磁盘上,它可以非常大。
第12章:流处理
让我们做一个粗略的估算。在撰写本文时,一块典型的大容量硬盘容量为20 TB,顺序写入吞吐量为250 MB/s。如果以最快速度写入消息,大约22小时后硬盘就会写满,需要开始删除最旧的消息。这意味着基于磁盘的日志总能至少缓冲22小时的消息,即使有许多磁盘和多台机器(磁盘越多,可用空间和总写入带宽都增加)。实践中,部署很少会耗尽磁盘的全部写入带宽,因此日志通常可以保持数天甚至数周的消息缓冲区。
许多基于日志的消息代理现在将消息存储在对象存储中,以增加其存储容量,类似于数据库(参见第202页的“由对象存储支持的数据库”)。像Apache Kafka和Redpanda这样的消息代理,将较旧的消息从对象存储中提供出来,作为其分层层存储的一部分。其他如WarpStream、Confluent Freight和Bufstream,则将所有数据存储在对象存储中。除了成本效率,这种架构还使数据集成更容易:对象存储中的消息以Iceberg表的形式存储,这使得批处理和数据仓库作业可以直接在数据上执行,而无需将数据复制到另一个系统。
当消费者无法跟上生产者时
在第489页“消息系统”的开头,我们讨论了如果消费者无法跟上生产者发送消息的速率时的三种选择:丢弃消息、缓冲或应用背压。在这种分类中,基于日志的方法是一种缓冲形式,具有大但固定大小的缓冲区(受可用磁盘空间限制)。
如果消费者落后太多,以至于它需要的消息比磁盘上保留的消息更旧,那么它将无法读取这些消息——因此代理实际上会丢弃比缓冲区大小所能容纳更早的旧消息。你可以监控消费者落后日志头部的距离,并在其落后显著时发出警报。由于缓冲区很大,操作员有足够的时间修复慢速消费者,使其在开始丢失消息之前赶上。
即使消费者确实落后太多并开始丢失消息,也只有该消费者受到影响;它不会干扰其他消费者的服务。这是一个很大的操作优势。你可以出于开发、测试或调试目的,实验性地消费生产日志,而不必担心破坏生产服务。当消费者关闭或崩溃时,它停止消耗资源——唯一剩下的就是它的消费者偏移量。
发送事件流 | 499
这种行为与传统消息代理形成对比,在传统消息代理中,你需要小心删除任何消费者已关闭的队列,以避免它们不必要地累积消息并从活动消费者那里占用内存。
重播旧消息
我们之前提到,在AMQP和JMS风格的消息代理中,处理和确认消息是一种破坏性操作,因为它会导致消息在代理上被删除。另一方面,在基于日志的消息代理中,消费消息更像是从文件中读取:它是一种只读操作,不会改变日志。
处理的唯一副作用(除了消费者的任何输出)是消费者偏移量向前移动。但偏移量由消费者控制,因此必要时可以轻松操作——例如,你可以启动一个使用昨日偏移量的消费者副本,并将输出写入不同的位置,以重新处理过去一天的消息。你可以任意多次重复此操作,并更改处理代码。
这一方面使得基于日志的消息传递更类似于上一章讨论的批处理过程,其中派生数据通过可重复的转换过程与输入数据清晰分离。它允许更多的实验,更容易从错误和缺陷中恢复,使其成为组织内集成数据流的良好工具[27]。
数据库与流
我们已经对消息代理和数据库进行了一些比较。尽管传统上它们被视为不同的工具类别,但我们看到基于日志的消息代理成功地借鉴了数据库的思想并将其应用于消息传递。我们也可以反过来做,将消息和流的思想应用于数据库。
一种方法是使用事件流作为存储数据的记录系统(参见第10页的“记录系统与派生数据”)。这正是我们在第101页的“事件溯源与CQRS”中讨论的事件溯源。与其通过更新和删除来改变数据模型来存储数据,不如将每个状态变化建模为不可变的事件,并将其写入仅追加日志。任何读优化的物化视图都是通过这些事件派生出来的。基于日志的消息代理(配置为从不删除旧事件)非常适合事件溯源,因为它们使用仅追加存储,并且可以以低延迟通知消费者新事件。
但你不需要完全采用事件溯源;即使使用可变数据模型,事件流对数据库也很有用。事实上,每次对数据库的写入都是一个可以被捕获、存储和处理的事件。数据库与流之间的联系不仅仅是磁盘上日志的物理存储——它相当根本。
例如,复制日志(见第206页“复制日志的实现”)是一个数据库写入事件流,由主节点在处理事务时产生。从节点应用该写入流到自己的数据库副本,从而最终得到相同数据的准确副本。复制日志中的事件描述了发生的数据变化。
我们还遇到了第433页“使用共享日志”中的状态机复制原则,该原则指出:如果每个事件表示对数据库的一次写入,并且每个副本以相同顺序处理相同的事件,那么所有副本最终将达到相同的最终状态。(处理事件被认为是确定性操作。)这只是事件流的另一个例子!
在本节中,我们首先将研究异构数据系统中出现的一个问题,然后探讨如何通过将事件流的思想引入数据库来解决它。
保持系统同步
正如我们在整本书中看到的,没有单一系统能够满足所有数据存储、查询和处理需求。在实践中,大多数重要的应用需要结合多种技术来满足其需求——例如,使用OLTP数据库来服务用户请求,使用缓存来加速常见请求,使用全文索引来处理搜索查询,以及使用数据仓库进行分析。每个系统都有自己的数据副本,存储在自己优化过的表示形式中。
由于相同或相关的数据出现在多个地方,它们需要彼此保持同步。如果数据库中的某个条目被更新,它也需要在缓存、搜索索引和数据仓库中更新。对于数据仓库,这种同步通常由ETL过程执行(见第7页“数据仓库”),通常是通过获取数据库的完整副本、转换它、然后批量加载到数据仓库中——换句话说,是一个批处理过程。类似地,我们在第476页“批处理用例”中看到,搜索索引、推荐系统和其他派生数据系统可以使用批处理过程创建。
如果定期的数据库完整转储太慢,有时使用的替代方案是双重写入,即应用程序代码在数据变化时显式写入每个系统——例如,先写入数据库,然后更新搜索索引,再使缓存条目失效(甚至并发执行这些写入)。
数据库与流 | 501
然而,双重写入存在严重问题,其中之一是图12-4所示的竞态条件。在此示例中,两个客户端并发想要更新项X。客户端1想将值设置为A,客户端2想设置为B。两个客户端首先将新值写入数据库,然后写入搜索索引。由于不巧的时间,请求交织在一起。数据库首先看到客户端1将值设置为A的写入,然后看到客户端2将值设置为B的写入,因此数据库中的最终值为B。搜索索引首先看到客户端2的写入,然后看到客户端1的写入,因此搜索索引中的最终值为A。现在这两个系统永久不一致,即使没有发生错误。
除非你有额外的并发检测机制,例如我们在第237页“检测并发写入”中讨论的版本向量,否则你甚至不会注意到并发写入的发生。一个值只会静默地覆盖另一个值。
双重写入的另一个问题是,其中一个写入可能失败而另一个成功。这是一个容错问题而非并发问题,但同样会导致两个系统彼此不一致。确保两者要么都成功要么都失败是原子提交问题的一个实例,解决起来代价高昂(见第324页“两阶段提交”)。
如果你只有一个带单一主节点的复制数据库,那么该主节点决定写入顺序,因此在数据库副本之间状态机复制方法是可行的。然而,在图12-4中,并没有单一主节点。数据库可能有一个主节点,搜索索引可能有一个主节点,但两者互不跟随,因此可能发生冲突(见第215页“多主节点复制”)。
如果确实只有一个主节点——例如数据库——并且我们可以使搜索索引成为数据库的从节点,那么情况会更好。但这在实践中可能吗?
第12章:流处理
变更数据捕获
大多数数据库的复制日志存在一个根本问题:长期以来,它们被视为数据库的内部实现细节,而非公共API。客户端应通过数据模型和查询语言来查询数据库,而不是解析复制日志并尝试从中提取数据。
数十年来,许多数据库根本没有提供一种有文档记录的方式来获取写入其中的变更日志。这使得将数据库中的所有更改提取出来,并复制到不同的存储技术(如搜索索引、缓存或数据仓库)变得十分困难。
最近,人们对变更数据捕获(Change Data Capture,CDC)的兴趣日益增长。CDC是一个过程,它观察写入数据库的所有数据变更,并将其提取为一种可以复制到其他系统的形式[28]。当变更在写入时立即以流的形式提供时,CDC尤其有趣。
例如,你可以捕获数据库中的变更,并持续将相同的变更应用到搜索索引。如果变更日志按相同顺序应用,你可以预期搜索索引中的数据与数据库中的数据保持一致。搜索索引和其他任何派生数据系统都只是变更流的消费者。
图12-5展示了如何用CDC解决图12-4中的并发问题。即使两个分别将X设为A和B的请求并发到达数据库,数据库会决定其执行顺序,并按该顺序写入复制日志。搜索索引拾取这些变更,并按相同顺序应用。如果你需要另一个系统中的数据(如数据仓库),只需将其添加为CDC事件流的另一个消费者即可。
图12-5
将提交到数据库的变更,按相同顺序传播给下游系统。 (图中描述:双主复制中的并发写入问题,通过CDC将数据库作为单一主节点,搜索索引作为从节点跟随变更日志解决。)
实现CDC
我们可以将日志消费者称为派生数据系统,正如第10页“记录系统与派生数据”中讨论的那样。存储在搜索索引和数据仓库中的数据仅仅是记录系统中数据的另一种视图。CDC是一种确保记录系统所做的所有变更也能反映在派生数据系统中的机制,从而使派生系统拥有数据的准确副本。
本质上,CDC使一个数据库成为领导者(从中捕获变更的数据库),并将其他数据库变为追随者。基于日志的消息代理非常适合将变更事件从源数据库传输到派生系统,因为它能保持消息的顺序(避免了图12-2中的重新排序问题)。
可以使用逻辑复制日志来实现CDC(参见第208页“逻辑(基于行的)日志复制”),尽管这会带来一些挑战,例如处理模式变更和正确建模更新。开源项目Debezium致力于解决这些挑战。该项目包含针对MySQL、PostgreSQL、Oracle、SQL Server、Db2、Cassandra以及许多其他数据库的源连接器。这些连接器连接到数据库的复制日志,并以标准事件模式的形式呈现变更。然后,消息可以经过转换并写入下游数据库。
Kafka Connect框架也为多种数据库提供了CDC连接器。Maxwell通过解析MySQL的binlog实现了类似的功能[29],GoldenGate为Oracle提供了类似设施,pgcapture则为PostgreSQL提供了相同功能。
与消息代理类似,CDC通常是异步的:记录系统数据库在提交变更之前,不会等待消费者应用该变更。这种设计在运维上的优点是:添加一个慢速消费者不会对记录系统造成太大影响;但其缺点是,复制延迟的所有问题(参见第209页“复制延迟问题”)都会显现。
初始快照
如果你拥有数据库中曾经发生的所有变更日志,你可以通过重放日志来重建数据库的完整状态。然而,在许多情况下,永久保留所有变更将需要过多的磁盘空间,并且重放它们会花费太长时间,因此日志需要被截断。
例如,构建一个全新的全文索引需要数据库的完整副本。仅应用近期变更的日志是不够的,因为它会缺失那些近期未更新的条目。因此,如果你没有完整的日志历史记录,则需要从一个一致的快照开始,如第201页“设置新从节点”中所述。
数据库的快照必须对应变更日志中一个已知的位置或偏移量,这样在处理完快照后,你才能知道从哪个点开始应用变更。一些CDC工具集成了此快照功能,而其他工具则将其留作手动操作。Debezium使用Netflix的DBLog水印算法来提供增量快照[30, 31]。
日志压缩
如果你只能保留有限数量的日志历史记录,那么每次添加新的派生数据系统时,都需要执行快照过程。然而,日志压缩提供了一个很好的替代方案。
我们之前在第118页“日志结构存储”中,结合日志结构存储引擎讨论过日志压缩(参见图4-3中的示例)。其原理很简单:存储引擎定期查找具有相同键的日志记录,丢弃所有重复项,并只保留每个键的最新更新。这可以使日志段变得小得多,因此这些段也可以在压缩过程中合并,如图12-6所示。这个过程在后台运行。
图12-6
在这个键值对日志中,键是猫视频的ID(mew、purr、scratch或yawn),值是它被播放的次数;日志压缩只保留每个键的最新值。 (图中描述:日志压缩过程,重复键的早期版本被丢弃,只保留最近更新。)
在日志结构存储引擎中,带有特殊空值(墓碑标记)的更新表示某个键已被删除,并导致它在日志压缩期间被移除。但只要一个键没有被覆盖或删除,它就会永远保留在日志中。这样一个压缩日志所需的磁盘空间仅取决于数据库的当前内容,而不是数据库中曾经发生的写入次数。如果同一个键被频繁覆盖,先前的值最终会被垃圾回收,只保留最新的值。
同样的思路也适用于基于日志的消息代理和CDC。如果CDC系统的设置使得每个变更都有一个主键,并且每个键的更新都替换了该键的先前值,那么仅保留特定键的最新写入就足够了。
现在,每当你想重建派生数据系统(如搜索索引)时,你可以从日志压缩主题的偏移量0处启动一个新的消费者,并按顺序扫描日志中的所有消息。该日志保证包含数据库中每个键的最新值(可能还有一些较旧的值)。换句话说,你可以使用它来获取数据库内容的完整副本,而无需对CDC源数据库进行另一次快照。
Apache Kafka支持此日志压缩功能。正如我们将在本章后面看到的,它允许消息代理用于持久化存储,而不仅仅是瞬态消息传递。
变更流的API支持
现在,大多数流行的数据库都将变更流作为一等接口暴露出来,而不是像过去那样进行事后逆向工程的CDC工作。诸如MySQL和PostgreSQL之类的关系型数据库,通常会通过它们用于自身副本的同一个复制日志来发送变更。大多数云厂商也为其产品提供CDC解决方案——例如,Datastream为Google Cloud的关系型数据库和数据仓库提供了流式数据访问。
即使是像Cassandra这样最终一致、基于仲裁的数据库,现在也支持CDC。正如我们在第411页“实现线性化系统”中看到的,客户端必须将写入持久化到多数节点后,写入才被视为可见。对于仲裁写入的CDC支持具有挑战性,因为没有一个单一的事实来源可供订阅。数据是否可见取决于每个读取器的一致性偏好。Cassandra通过为每个节点暴露原始日志段来规避这个问题,而不是提供单一的变更流。希望消费数据的系统必须读取每个节点的原始日志段,并决定如何最好地将其合并为单一流(这很像仲裁读取器的工作方式)[32]。
Kafka Connect [33] 将多种数据库系统的CDC工具与Kafka集成在一起。一旦变更事件流进入Kafka,它就可以用于更新派生数据系统(如搜索索引),并输入到流处理系统中(本章后面将讨论)。
第十二章:流处理
CDC 与事件溯源的对比
CDC 与事件溯源有何不同?与 CDC 类似,事件溯源涉及将应用程序状态的所有变更存储为变更事件的日志。最大的区别在于,事件溯源在不同的抽象层次上应用了这一思想:
- 在 CDC 中,应用程序以可变方式使用数据库,可以随意更新和删除记录。变更日志是从数据库的低层(例如,通过解析复制日志)提取的,这确保了从数据库提取的写入顺序与它们实际写入的顺序一致,避免了图 12-4 中的竞态条件。
- 在 事件溯源 中,应用程序逻辑明确构建在写入事件日志的不可变事件之上。在这种情况下,事件存储是仅追加的,不鼓励或禁止更新或删除事件。事件旨在反映应用层面发生的事情,而非低层的状态变更。
哪种方式更好取决于你的具体情况。对于一个尚未采用事件溯源的应用程序来说,采用它是一个重大改变;它有许多利弊,我们在第 101 页的“事件溯源与 CQRS”中讨论过。相比之下,CDC 可以以最小的更改添加到现有数据库中;写入数据库的应用程序甚至可能不知道 CDC 正在发生。
变更数据捕获与数据库模式
尽管 CDC 看起来比事件溯源更容易采用,但它也面临一系列挑战。在微服务架构中,数据库通常只由一个服务访问。其他服务通过该服务的公共 API 与之交互,通常不会直接访问数据库。这使得数据库成为服务的内部实现细节,允许开发人员更改其数据库模式而不影响公共 API。
然而,CDC 系统在复制数据时通常使用上游数据库的模式,这使得这些模式变成了公共 API,必须像服务的公共 API 一样进行管理。删除数据库表中的列会破坏依赖该字段的下游消费者。这类挑战在数据管道中一直存在,但通常只影响数据仓库的 ETL。由于 CDC 通常实现为数据流,其他生产服务也可能成为消费者。此类消费者的故障可能导致面向客户的停机 [34]。数据契约通常用于防止这些故障。
一种将内部模式与外部模式解耦的常见方法是使用发件箱模式。发件箱是具有自己模式的表,它们暴露给 CDC 系统,而不是数据库中的内部领域模型 [35, 36]。开发人员可以根据需要修改内部模式,同时保持发件箱表不变。这看起来像是双写——确实如此。然而,发件箱避免了我们在第 501 页“保持系统同步”中讨论的挑战,将两次写入保留在同一个系统(数据库)中。这种设计允许两次写出现在单个事务中。
不过,发件箱也存在一些权衡。开发人员仍然需要维护内部模式与发件箱模式之间的转换,这可能具有挑战性。发件箱还增加了数据库必须写入底层存储的数据量,可能引发性能问题。
与 CDC 类似,重放事件日志可以重建系统的当前状态。然而,日志压缩需要以不同的方式处理:
- CDC 中,更新记录的事件通常包含记录的整个新版本,因此主键的当前值完全由该主键最近的事件决定,日志压缩可以丢弃同一键的先前事件。
- 相反,在事件溯源中,事件在更高层次上建模。事件通常表达用户操作的意图,而不是作为操作结果发生的状态更新的机制。在这种情况下,后续事件通常不会覆盖先前事件,因此你需要完整的事件历史来重建最终状态。日志压缩不能以相同的方式进行。
使用事件溯源的应用程序通常有一种机制来存储从事件日志派生的当前状态的快照,这样它们就不需要重复处理整个日志。然而,这只是在读取和崩溃恢复时加速读取的性能优化;系统的意图是能够永久存储所有原始事件,并在需要时重新处理整个事件日志。我们在第 512 页的“不可变性的限制”中讨论了这一假设。
状态、流与不可变性
我们在第 11 章中看到,批处理受益于其输入文件的不可变性,因此你可以在现有输入文件上运行实验性处理作业,而不必担心损坏它们。这种不可变性原则也是事件溯源和 CDC 如此强大的原因。
我们通常认为数据库存储的是应用程序的当前状态。这种表示针对读取进行了优化,通常最适合查询。状态的性质是它会发生变更,因此数据库支持更新、删除和插入数据。这与不可变性如何兼容?
每当状态发生变更时,该状态都是随时间变化而改变事件的结果。例如,你当前可用座位的列表是你已处理的预订的结果,当前账户余额是账户上的贷项和借项的结果,而你的 Web 服务器的响应时间图是所有已发生 Web 请求的个体响应时间的聚合。
无论状态如何变化,总有一个事件序列导致了那些变化。即使事情做了又撤销,这些事件发生过的事实仍然成立。关键思想是,可变状态和不可变事件的仅追加日志并不矛盾;它们是同一枚硬币的两面。所有变更的日志(或 changelog)代表了状态随时间的演化。
如果你有数学倾向,你可能会说:应用程序状态是你随时间对事件流进行积分得到的结果,而变更流是你随时间对状态进行微分得到的结果,如图 12-7 所示 [37, 38]。这个类比存在局限性(例如,状态的二阶导数似乎没有意义),但它是一个思考数据的有用起点。
图 12-7:当前应用程序状态与事件流之间的关系
如果你持久化存储变更日志,这实际上使状态变得可复现。如果你将事件日志视为你的记录系统,并将任何可变状态视为派生于它,那么推理数据如何在系统中流动就变得更容易。正如 Jim Gray 和 Andreas Reuter 在 1992 年所言 [39]:
根本不需要保留数据库;日志包含了所有信息。存储数据库(即日志的当前末端)的唯一原因是提高检索操作的性能。
日志压缩是弥合日志与数据库状态之间区别的一种方式。压缩仅保留每个记录的最新版本,并丢弃被覆盖的版本。
不可变事件的优势
数据库中的不可变性是一个古老的想法。例如,会计师在金融记账中已经使用不可变性数百年。当发生交易时,它被记录在一个仅追加的分类账中,这本质上是一个描述货币、商品或服务易手的事件日志。账户,如损益表或资产负债表,是通过对分类账中的交易进行汇总得出的 [40]。
如果发生错误,会计师不会擦除或更改分类账中的错误交易。相反,他们添加另一笔交易来补偿错误——例如,退还错误的收费。错误的交易永远保留在分类账中,因为它可能对审计很重要。如果从错误分类账中得出的错误数字已经发布,则下一个会计期间的数字会包含更正。这个过程在会计中是完全正常的 [41]。
尽管这种可审计性在金融系统中尤其重要,但对于许多不受如此严格监管的其他系统也是有益的。如果你意外部署了有缺陷的代码,向数据库写入了错误数据,如果代码能够破坏性地覆盖数据,恢复就会困难得多。有了不可变事件的仅追加日志,诊断问题和从问题中恢复就更容易了。同样,客户服务可以使用审计日志来诊断客户请求和投诉。
不可变事件捕获的信息也远不止当前状态。例如,在购物网站上,客户可能将商品加入购物车,然后又移除。尽管从订单履行的角度来看,第二个事件抵消了第一个事件,但从分析角度知道客户曾考虑过某个商品但最终决定不买可能是有用的。也许他们将来会购买,或者他们找到了替代品。这些信息记录在事件日志中,但在数据库(当商品被移除时删除它们)中则会丢失。
从同一事件日志中派生多个视图
通过分离可变状态与不可变事件日志,你可以从同一事件日志中派生多个面向读取的表示。这就像拥有流的多个消费者一样(图 12-5)——例如,分析数据库 Druid 使用这种方法直接从 Kafka 摄取数据,Kafka Connect 的接收器可以将数据从 Kafka 导出到各种数据库和索引 [33]。
拥有从事件日志到数据库的显式转换步骤,使得随着时间的推移更容易演进你的应用程序。如果你想引入一个新功能,以新的方式呈现现有数据,你可以使用事件日志为新功能构建一个单独的读取优化视图,并与现有系统并行运行,而无需修改它们。同时运行新旧系统通常比在现有系统中执行复杂的模式迁移更容易。一旦读者切换到新系统并且旧系统不再需要,你可以直接关闭它并回收其资源 [42, 43]。
我们在第 101 页的“事件溯源与 CQRS”中遇到过这样的想法:以一种写优化的形式写入数据,然后根据需要将其转换为不同的读优化表示。这是存储事件的核心思想——事件是发生了什么的事实记录。存储所有发生的事件,而不是记录两件事之间的当前关系,可以避免许多问题:
- 它可以更容易地检测因果关系,因为事件发生的时间顺序被保留(参见“时间顺序的保证”)。
- 它保留了更丰富的信息,基于这些信息可以推导出复杂的派生(参见“批处理工作流的输出”)。
要点:将状态视为事件流的派生结果,使得系统更具弹性、可审计性和演进能力。不可变事件日志是支撑事件溯源和 CDC 的核心。
第12章:流处理
与CQRS的事件溯源
我们在第101页的“事件溯源与CQRS”中遇到了一种理念:以写优化形式写入数据,再根据需要将其转换为不同的读优化表示形式。这一过程并不一定需要事件溯源;你也可以从CDC事件流中构建多个物化视图 [44]。
传统的数据库和模式设计方法基于一个错误假设:数据的写入形式必须与查询形式相同。关于规范化与反规范化的争论(见第72页的“规范化、反规范化和连接”),如果你能够将数据从写优化的事件日志转换为读优化的应用程序状态,那么这些争论在很大程度上就变得无关紧要了。在读优化的视图中对数据进行反规范化是完全合理的,因为转换过程提供了一种机制,使其与事件日志保持一致。
在第34页的“案例研究:社交网络主页时间线”中,我们讨论了一个社交网络的主页时间线,即用户关注的人最近帖子的缓存(类似邮箱)。这是读优化状态的另一个例子:主页时间是高度反规范化的,因为你的帖子会被复制到所有关注你的用户的时间线中。然而,扇出服务使这种重复状态与新帖子和新关注关系保持同步,从而使重复变得可管理。
并发控制
CQRS最大的缺点在于:事件日志的消费者通常是异步的,因此用户可能先写入日志,然后从派生视图中读取,却发现其写入尚未在视图中反映。我们在第210页的“读取你自己的写入”中讨论了这个问题及潜在的解决方案。
一种解决方案是将读视图的更新与事件追加到日志同步执行。这需要在事件日志和派生视图之间进行分布式事务,或者某种方式等待事件在视图中被反映。这两种方法通常都不切实际,因此视图通常是异步更新的。
另一方面,从事件日志推导当前状态也简化了并发控制的某些方面。多对象事务的需求(见第284页的“单对象和多对象操作”)通常源于单个用户操作需要在多个位置更改数据。使用事件溯源,你可以将事件设计为对用户操作的完整描述。这样一来,用户操作只需要在单个位置进行一次写入——即将事件追加到日志中——这很容易实现原子性。
如果事件日志和应用程序状态以相同方式分片(例如,处理分片3中客户的事件只需要更新应用程序状态的分片3),那么一个简单的单线程日志消费者就不需要对写入进行并发控制。通过构造,它一次只处理一个事件(见第309页的“实际串行执行”)。日志通过定义分片中事件的串行顺序,消除了并发的非确定性 [27]。如果一个事件涉及多个状态分片,则需要额外的工作,我们将在第13章中讨论。
许多不使用事件溯源模型的系统仍然依赖不可变性来进行并发控制。各种数据库内部使用不可变数据结构或多版本数据来支持时间点快照(见第298页的“索引与快照隔离”)。版本控制系统(如Git、Mercurial和Fossil)也依赖不可变数据来保留文件的版本历史。
不可变性的局限性
永远保留所有变更的不可变历史在多大程度上是可行的?答案取决于数据集的变更量。某些工作负载主要是添加数据,很少更新或删除;这类工作负载很容易实现不可变性。其他工作负载在相对较小的数据集上有很高的更新和删除率;在这种情况下,不可变历史可能会变得过大,碎片化可能成为问题,并且压缩和垃圾回收的性能对于运维稳健性至关重要 [45, 46]。
除了性能原因外,尽管具有不可变性,你可能还需要出于管理或法律原因删除数据。例如,GDPR等隐私法规要求根据请求删除用户的个人信息并移除错误信息,或者可能需要限制敏感信息的意外泄露。
在这种情况下,仅向日志追加另一个事件以指示先前的数据应被视为已删除是不够的——你实际上希望重写历史,假装数据从未被写入过。例如,Datomic将此功能称为 excision [47],而Fossil版本控制系统有一个类似的概念称为 shunning [48]。
真正删除数据异常困难 [49],因为副本可能存在于许多地方。存储引擎、文件系统和固态硬盘通常写入新位置,而不是就地覆盖数据 [41],并且备份通常故意不可变以防止意外删除或损坏。
一种实现不可变数据删除的方法是加密粉碎 [50]。你未来可能想要删除的数据以加密形式存储,当你想要销毁它时,就忘记加密密钥。加密数据仍然存在,但无人能使用。
从某种意义上说,这只不过是将问题转移了:实际数据仍然不可变,但你的密钥存储是可变的。此外,你必须提前决定用同一密钥加密哪些数据,以及何时使用不同的密钥——这是一个重要的决策,因为你以后可以对特定密钥加密的所有数据或全部数据(但不能是部分数据)进行加密粉碎。为每个单独的数据项存储单独的密钥会变得过于笨拙,因为密钥存储的大小将与主数据存储相同。更复杂的方案(如可穿孔加密 [51])使得能够选择性撤销密钥的解密能力,但尚未广泛使用。
总的来说,删除更多的是“使检索数据变得更难”而不是“使检索数据变得不可能”。然而,有时你不得不尝试,正如我们在第596页的“立法与自我监管”中所看到的。
处理流
到目前为止,本章我们讨论了流的来源(用户活动事件、传感器和数据库写入)以及流的传输方式(通过直接消息传递、消息代理和事件日志)。
剩下要讨论的是:一旦你获得了流,你能用它做什么?你可以处理它。广义上,你有三个选项:
- 你可以将事件中的数据写入数据库、缓存、搜索引擎或类似的存储系统,然后其他客户端可以从中查询。如图12-5所示,这是一种保持数据库与系统中其他部分变化同步的好方法——特别是当流消费者是唯一写入数据库的客户端时。写入存储系统是我们在第476页“批处理用例”中所讨论内容的流式等价物。
- 你可以以某种方式将事件推送给用户——例如,发送电子邮件警报或推送通知,或者将事件流式传输到实时仪表板进行可视化。在这种情况下,人类是流的最终消费者。
- 你可以处理一个或多个输入流,以产生一个或多个输出流。流可能经过由几个这样的处理阶段组成的管道,最终到达输出(选项1或2)。
在本章的剩余部分,我们将讨论选项3:处理流以生成其他派生流。像这样处理流的代码片段被称为运算符或作业。它与我们在第11章中讨论的Unix进程和MapReduce作业密切相关,数据流的模式类似:流处理器以只读方式消费输入流,并以仅追加方式将其输出写入不同位置。
流处理器中的分片和并行化模式也与MapReduce以及我们在第11章中看到的数据流引擎非常相似,因此这里不再重复这些主题。基本的映射操作(如转换和过滤记录)也以相同方式工作。
与批处理作业的一个关键区别在于流永远不会结束。这一区别有诸多影响。如本章开头所述,对无界数据集进行排序没有意义,因此排序-合并连接(见第471页的“连接与分组”)无法使用。容错机制也必须改变。对于运行了几分钟的批处理作业,失败的任务可以简单地从头重新启动;但对于已经运行了几年的流作业,崩溃后从头重新启动可能不是一个可行的选项。
流处理的用途
流处理长期以来被用于监控目的,组织希望在特定事件发生时收到警报。例如:
- 欺诈检测系统需要确定信用卡的使用模式是否意外变化,如果卡可能被盗,则阻止该卡。
- 交易系统需要检查金融市场的价格变化,并根据指定规则执行交易。
- 制造系统需要监控工厂中机器的状态,并在发生故障时快速识别问题。
- 军事和情报系统需要跟踪潜在对手的活动,并在任何攻击迹象出现时发出警报。
这类应用需要相当复杂的模式匹配和关联。然而,随着时间的推移,流处理的其他用途也逐渐出现。在本节中,我们将简要比较和对比其中一些应用。
复杂事件处理
复杂事件处理(CEP)是20世纪90年代发展起来的一种分析事件流的方法,特别适用于需要搜索特定事件模式的应用 [52]。类似于正则表达式允许你在字符串中搜索特定字符模式,CEP允许你指定规则来搜索流中的特定事件模式。
CEP系统通常使用高级声明式查询语言(如SQL)或图形用户界面来描述应检测的事件模式。这些查询被提交给一个处理引擎,该引擎消费输入流并在内部维护一个执行必要匹配的状态机。当找到匹配时,引擎会发出一个复杂事件(因此得名),其中包含检测到的事件模式的详细信息 [53]。
在这些系统中,查询和数据之间的关系与普通数据库相反。通常,数据库持久存储数据,并将查询视为临时的。当查询到来时,数据库搜索与查询匹配的数据。
第12章:流处理
流处理
复杂事件处理(CEP)
在这些系统中,查询与数据之间的关系与常规数据库相反。通常,数据库持久化存储数据,并将查询视为临时性的。当查询到来时,数据库搜索匹配该查询的数据,并在完成后忘记该查询。CEP引擎则反转了这些角色:查询被长期存储;每当事件到达时,引擎检查是否已观察到与该引擎的某个常驻查询相匹配的事件模式54。
CEP的具体实现包括Esper、Apama和TIBCO StreamBase。分布式流处理器(如Flink和Spark Streaming)也支持对流进行声明式查询的SQL。
流分析
流处理也用于流上的分析。CEP与流分析之间的界限是模糊的,但一般而言,流分析更少关注检测特定事件序列,而更侧重于对大量事件进行聚合和统计度量。应用示例如下:
- 测量某类事件的发生速率(每时间间隔内发生的频率)。
- 计算某值在一段时间内的滚动平均值。
- 将当前统计量与先前时间间隔进行比较(例如,检测趋势,或对与上周同期相比异常高或低的指标发出警报)。
此类统计量通常在固定时间间隔内计算——例如,你可能希望了解过去五分钟内某服务的每秒平均查询数及其99百分位响应时间。对几分钟内的数据取平均,可以平滑掉每秒之间的无关波动,同时仍能及时反映流量模式的变化。用于聚合的时间间隔称为窗口;我们将在第518页的“关于时间的推理”一节中更详细地探讨窗口。
流分析系统有时会使用概率算法,例如用于集合成员检查的布隆过滤器(我们在第122页的“布隆过滤器”中遇到过)、用于基数估计的HyperLogLog 55,以及各种百分位估计算法(参见第42页的“计算百分位数”)。概率算法产生近似结果,但其优点是所需内存远少于精确算法。这种算法近似性的使用有时会让人认为流处理系统总是有损且不精确的,但这是错误的。流处理本身并非近似,使用概率算法只是一种优化手段56。
许多开源分布式流处理框架,如Apache Storm、Spark Streaming、Flink、Samza、Apache Beam和Kafka Streams,都是为分析而设计的57。托管服务包括Google Cloud Dataflow和Azure Stream Analytics。
维护物化视图
我们已经看到,数据库变更流可用于保持派生数据系统(如缓存、搜索索引和数据仓库)与源数据库同步。这些是维护物化视图的示例:从数据集派生替代视图,以便高效查询,并在底层数据发生变化时更新该视图37。
类似地,在事件溯源中,通过应用事件日志来维护应用状态;这里,应用状态也是一种物化视图。与流分析场景不同,仅考虑特定时间窗口内的事件通常不够。构建物化视图可能需要在任意时间段内的所有事件,除了那些可能通过日志压缩丢弃的过时事件。实际上,你需要一个一直延伸到时间起点的窗口。
原则上,任何流处理器都可以用于物化视图维护,但需要无限期保存事件这一点与某些面向分析的框架(主要操作有限时间窗口)的假设相悖。Kafka Streams和Confluent的ksqlDB支持这种用法,其构建在Kafka对日志压缩的支持之上58。
增量视图维护
数据库可能看起来非常适合物化视图维护;它们毕竟设计用于保存完整的数据集。许多数据库也支持物化视图。我们在第143页的“物化视图与数据立方体”中看到,数据仓库常见的分析查询可以物化为OLAP立方体。
不幸的是,数据库通常使用定期批处理作业或按需请求(如PostgreSQL的REFRESH MATERIALIZED VIEW)来刷新物化视图表,而不是在源数据每次更新时立即更新。这种方法有两个显著缺点,使其不适合流处理视图维护:
- 效率低下:每次更新视图时都重新处理所有数据,而大部分数据可能没有变化。
- 数据新鲜度差:源数据的变化不会反映在物化视图中,直到其查询再次运行(在下一次计划更新时)。
可以编写数据库触发器,在数据易于分区且计算天然可增量时高效地更新物化视图。例如,如果物化视图维护每日销售总收入,则每次发生新销售时都可以更新相应天的行。定制解决方案在少数情况下有效,但许多SQL查询不能轻松或高效地转换为增量计算。
增量视图维护(Incremental View Maintenance, IVM) 是上述问题更通用的解决方案。IVM技术将用SQL或其他语言编写的查询转换为能够进行增量计算的算子。IVM算法不处理整个数据集,而是仅重新计算和更新已更改的数据38, 59, 60。这样视图计算效率大大提高;这意味着可以更频繁地运行更新,从而显著提高数据新鲜度。
诸如Materialize 61、RisingWave、ClickHouse和Feldera等数据库都使用IVM技术来提供高效的增量物化视图。这些数据库摄取事件流,以实时暴露物化视图。近期事件在内存中缓冲,并定期用于更新磁盘上的物化视图。读取操作结合近期事件和物化数据,提供单一的实时视图。由于读取通常用SQL表达,且物化视图通常以OLAP风格的格式存储,这些系统也支持第11章讨论的大规模数据仓库风格的查询。
流搜索
除了允许搜索由多个事件组成的模式的CEP之外,有时也需要基于复杂标准(如全文搜索查询)搜索单个事件。例如,媒体监控服务订阅来自媒体的新闻文章和广播的源,并搜索提及公司、产品或感兴趣话题的新闻。这通过预先制定搜索查询,然后持续将新闻流与该查询进行匹配来实现。某些网站也存在类似功能——例如,房地产网站的用户可以请求在出现符合搜索条件的新房源时收到通知。Elasticsearch的percolator功能62是实现这种流搜索的一种选择。
传统搜索引擎首先对文档建立索引,然后在索引上运行查询。相比之下,搜索流则颠倒处理方式:查询被存储,文档被针对它们进行评估,如同在CEP中一样。最简单的情况下,你可以对每个文档测试每个查询,但如果查询数量很大,这会很慢。为了优化该过程,可以对查询和文档都建立索引,从而缩小可能匹配的查询集合63。
事件驱动架构与RPC
在第189页的“事件驱动架构”中,我们讨论了消息传递系统作为RPC的替代方案。这种服务间的通信机制例如在Actor模型中使用。
尽管这些系统也基于消息和事件,但我们通常不将它们视为流处理器,原因如下:
- Actor框架主要是一种管理并发和通信模块分布式执行的机制,而流处理主要是一种数据管理技术。
- Actor之间的通信通常是短暂且一对一的,而事件日志是持久化且多订阅者的。
- Actor可以以任意方式通信(包括循环的请求/响应模式),而流处理器通常被设置为无环管道,其中每个流是某个特定作业的输出,并源自一组明确定义的输入流。
尽管如此,RPC类系统与流处理之间存在一些交叉。例如,Apache Storm具有称为分布式RPC的功能,允许将用户查询分配给也处理事件流的一组节点;这些查询然后与输入流中的事件交错,结果可以聚合并发回给用户。
也可以使用Actor框架来处理流。然而,许多此类框架在崩溃时不能保证消息传递,因此除非实现额外的重试逻辑,否则处理不是容错的。
关于时间的推理
流处理器经常需要处理时间,尤其是在运行分析任务时,这些任务经常使用时间窗口,例如“过去五分钟的平均值”。“过去五分钟”的含义看似明确清晰,但不幸的是这个概念的微妙之处令人吃惊。
在批处理过程中,处理任务快速处理大量历史事件。如果需要按时间进行某种分类,批处理需要查看嵌入在每个事件中的时间戳。查看运行进程的机器上的系统时钟是没有意义的,因为运行这些进程的时间与事件实际发生的时间无关。批处理可能在几分钟内读取一年的历史事件;在大多数情况下,感兴趣的时间线是那一年的历史,而不是处理的那几分钟。此外,使用事件中的时间戳允许处理是确定性的:多次运行相同的批处理会得到相同的结果,而不依赖于处理发生的时间。
另一方面,许多流处理框架使用处理机器上的本地时钟(进程的时间)来确定窗口。这种方法的优点是简单,但存在一个问题:如果事件的处理与事件实际发生的时间之间存在显著延迟,则时间窗口会失去其含义。在极端情况下,由于处理节点上的严重资源短缺(排队)、网络问题或中间消息代理上的竞争,事件延迟可能高达数秒甚至数小时。使用本地时钟会使系统对处理中任何中断或延迟敏感,从而使结果取决于事件何时被处理而非事件何时发生。
例如,假设某个移动应用程序向服务器报告使用情况统计信息。如果移动设备离线,它可能会在线时立即发送大量累积事件。在这种情况下,应用程序在其报告中包含了移动设备上事件的实际时间戳。处理系统应尊重这些时间戳,因为事件的实际发生时间比服务器接收事件的时间更具相关性。因此,事件的时间戳不应与处理时间混淆。
在分布式系统中,由于时钟歪斜(请参阅第287页的“不可靠的时钟”),不同节点上的本地时钟可能不同意。有许多数据库依赖于时钟来保持时间戳同步,例如Google的Spanner(请参阅第290页的“使用时钟获取信任”),但这对于日常的流处理器来说并非常见。
为处理事件时间戳和本地时钟之间的差异,流处理器可以支持将消息代理中的消息传递延迟考虑在内。例如,Apache Kafka的0.10及以上版本在消息上包含一个时间戳,该时间戳可以设置为事件生成时间或消息被代理接收的时间。此外,由于可用时间戳在不同来源的可靠性方面可能存在差异——有些来自非常可靠的服务器集群,有些来自未经校准的移动设备——因此考虑时钟歪斜可能是有用的,例如,通过跟踪跨数据中心的时钟差异。
时间戳的可靠性
事件时间戳的来源可能差异很大——有些来自经过良好同步的服务器集群,有些来自未经校准的移动设备。处理系统应考虑时钟歪斜的影响,例如通过跟踪跨不同来源的时钟差异。
窗口类型
窗口通常分为几种类型,各有不同的用途和挑战:
- 滚动窗口:固定长度、不重叠的时间段(例如,每五分钟一个窗口)。每个事件恰好属于一个滚动窗口。
- 滑动窗口:固定长度但可能重叠的时间段(例如,每30秒滑动一次的最后五分钟窗口)。一个事件可能属于多个滑动窗口。
- 会话窗口:由活动突发定义,后跟不活动期(例如,用户购物会话)。会话窗口的长度不固定;它基于事件间的时间间隔。
窗口可以在事件时间或处理时间上定义。事件时间窗口尊重事件中的时间戳,而处理时间窗口使用处理器上的时钟。事件时间窗口更符合现实,但处理起来更复杂,因为事件可能无序到达,延迟可能会发生变化。
事件时间 vs 处理时间
尽可能使用事件时间窗口,以保持与分析目标一致。但处理事件时间需要处理迟到事件和无序数据,这通常通过允许一定程度的延迟或使用水印来处理。
水印是流处理中的一个概念,用于表示在某个时间点之前的所有事件预计都已被观察到的时间戳。例如,如果水印设置为时间T,则系统假定所有时间戳⇐T的事件都已到达。一旦水印前进,具有更早时间戳的事件被视为迟到事件,可以被丢弃或放入侧输出进行特殊处理。
处理迟到事件
在实践中,很难保证所有事件都严格按时间戳顺序到达。网络延迟、客户端缓存、重试和分布式系统的其他特性都可能导致事件乱序到达。在处理事件时间窗口时,系统通常采用以下策略之一:
- 忽略迟到事件:窗口一旦触发,忽略在此之后到达的事件。这简单但可能丢失数据。
- 重新触发窗口:当迟到事件到达时,更新窗口的结果并触发更新输出。这确保准确性,但可能导致下游结果出现多次更新。
- 等待并允许一定程度的迟到:指定一个最大允许延迟(例如,设为事件时间后10分钟)。在此延迟内到达的事件将被包含,之后到达的事件被丢弃或单独处理。
对于实现这些策略的流处理框架,Flink提供了丰富的支持,通过水印、允许的延迟(allowedLateness)和侧输出(side outputs)来处理迟到数据。
处理时间与事件时间权衡
处理时间窗口实现简单但结果不可重复且对延迟敏感。事件时间窗口更准确但需要处理乱序和延迟,通常需要设置水印和容忍延迟。
以上内容涵盖了你提供的文本,从“第十二章:流处理”开始,完整翻译并保留所有信息、示例、引用和解释。未包含[CONTEXT_OVERLAP]部分。
12.1 流处理
事件时间与处理时间
批处理可以在几分钟内读取一年历史事件;在大多数情况下,感兴趣的时间线是那一年的历史,而非那几分钟的处理过程。此外,使用事件中的时间戳可以使处理变得确定性:在相同输入上重复运行同一进程会得到相同结果。
另一方面,许多流处理框架使用处理机器的本地系统时钟(即处理时间)来确定窗口[64]。这种方法简单,且如果事件创建到事件处理之间的延迟极短,它也是合理的。但只要有显著的处理滞后(即处理发生时间明显晚于事件发生时间),这种方法就会失效。
事件时间 vs 处理时间
处理可能因多种原因延迟,包括排队、网络故障、导致消息代理或处理器争用的性能问题、流消费者的重启,或在故障恢复或代码错误修复后重新处理过去的事件。
消息延迟还可能导致消息顺序不可预测。例如,假设用户先发出一个 Web 请求(由 Web 服务器 A 处理),然后第二个请求(由服务器 B 处理)。A 和 B 发出描述它们处理的请求的事件,但 B 的事件先于 A 的事件到达消息代理。现在流处理器会先看到 B 事件,再看到 A 事件,尽管它们的发生顺序相反。
如果类比有助于理解,请考虑《星球大战》系列电影。第四部于1977年上映,第五部1980年,第六部1983年,随后是第一部、第二部和第三部于1999、2002和2005年上映,第七、八、九部于2015、2017和2019年上映[65]。如果你按照上映顺序观看,那么你的处理顺序与叙事顺序是不一致的(集数编号就像事件时间戳,观看电影的时间就是处理时间)。作为人类,我们能应对这种不连续,但流处理算法需要专门编写以适应这些时间与排序问题。
混淆事件时间和处理时间会导致错误数据。例如,假设你的流处理器测量请求速率(每秒请求数)。如果重新部署流处理器,它可能关闭一分钟,恢复后处理积压事件。如果基于处理时间测量速率,那么在处理积压时看起来就像是请求突然出现异常尖峰,而实际请求速率是平稳的(图12-8)。
图12-8:按处理时间设置窗口会因处理速率变化而产生伪影。
处理迟到事件
根据事件时间定义窗口时的一个棘手问题是你永远无法确定是否已收到某个特定窗口的所有事件,还是仍有部分事件尚未到达。
例如,假设你将事件分组到 1 分钟窗口,以便统计每分钟的请求数。你已经统计了时间戳落在第37分钟内的一些事件,时间已推移;现在大部分传入事件属于第38和第39分钟。你何时声明第37分钟的窗口已完成并输出计数值?
你可以设置超时,如果在某段时间内没有看到该窗口的新事件,就声明窗口就绪。然而,某些事件可能缓冲在其他机器上,因网络中断而延迟。你需要能够处理这些在窗口已被声明完成后才到达的迟到事件。大体上有两种选择[1]:
- 忽略迟到事件:因为在正常情况下它们可能只占事件的一小部分。你可以将丢弃事件的数量作为指标跟踪,如果开始丢弃大量数据则发出警报。
- 发布修正:包含迟到事件的窗口更新值。你可能还需要撤销之前的输出。
在某些情况下,可以使用特殊消息来指示“从现在起,不会再有时间戳早于 t 的消息”,消费者可以利用它触发窗口[66]。但是,如果多个在不同机器上的生产者生成事件,每个都有自己的最小时间戳阈值,消费者需要单独跟踪每个生产者。这种情况下添加和移除生产者会更加棘手。
你到底使用谁的时钟?
当事件在系统中多个点被缓冲时,为事件分配时间戳会变得更加困难。例如,考虑一个移动应用将使用指标事件报告给服务器。该应用可能在设备离线时被使用,这将导致事件在设备本地缓冲,并在下次互联网连接时发送给服务器(可能延迟数小时甚至数天)。对于该流的任何消费者来说,这些事件看起来就像是极度延迟的迟到事件。
在这种情况下,事件上的时间戳应该真正是用户交互发生的时间(根据移动设备的本地时钟)。然而,用户控制设备上的时钟通常不可信,因为它可能被意外或故意设为错误时间(参见第360页的“时钟同步与准确性”)。事件被服务器接收的时间(根据服务器时钟)更可能准确,因为服务器在你的控制之下,但在描述用户交互方面意义较小。
为了调整不准确的设备时钟,一种方法是记录三个时间戳[67]:
- 事件发生的时间(根据设备时钟)
- 事件发送到服务器的时间(根据设备时钟)
- 事件被服务器接收的时间(根据服务器时钟)
通过从第三个时间戳中减去第二个时间戳,你可以估计设备时钟与服务器时钟之间的偏移(假设网络延迟相对于所需的时间戳精度可以忽略不计)。然后你可以将该偏移应用于事件时间戳,从而估计事件发生的真实时间(假设事件发生到发送到服务器之间设备时钟偏移没有改变)。
这个问题并不独属于流处理;批处理也面临着完全相同的时间推理问题。只是在流处理上下文中,我们对时间的流逝更加敏感。
窗口类型
一旦你确定了如何确定事件的时间戳,下一步就是决定如何定义时间段上的窗口。窗口随后可用于聚合——例如,统计事件数或计算窗口内值的平均值。常见窗口类型有[64, 68]:
滚动窗口(Tumbling window):固定长度,每个事件恰好属于一个窗口。例如,如果有一个1分钟的滚动窗口,那么时间戳在10:03:00到10:03:59之间的所有事件归为一组,10:04:00到10:04:59的事件归入下一组,依此类推。你可以通过将每个事件时间戳向下取整到最近的分钟来实现1分钟滚动窗口,以确定它所属的窗口。
跳跃窗口(Hopping window):固定长度,但连续窗口之间存在重叠以提供平滑。例如,一个5分钟窗口,跳跃大小为1分钟,将包含10:03:00到10:07:59的事件,下一个窗口覆盖10:04:00到10:08:59的事件,依此类推。你可以先计算1分钟滚动窗口,然后聚合多个相邻窗口来实现。
滑动窗口(Sliding window):包含彼此在某个时间间隔内发生的所有事件。例如,一个5分钟的滑动窗口将覆盖10:03:39和10:08:12的事件,因为它们相隔不到5分钟(注意,5分钟的滚动和跳跃窗口不会将这两个事件放在同一窗口,因为它们使用固定边界)。滑动窗口可以通过维护一个按时间排序的事件缓冲区,并在事件过期时将其移除来实现。
会话窗口(Session window):与其他窗口类型不同,会话窗口没有固定时长。相反,它通过将同一用户在时间上紧邻发生的所有事件分组来定义,当用户处于不活动状态一段时间(例如,30分钟内无事件发生)时窗口结束。会话化是网站分析的常见需求。
窗口操作通常维护临时状态。在某些情况下,无论窗口多大或事件多少,状态大小是固定的——例如,计数操作无论窗口大小或事件数量如何,只有一个计数器。另一方面,滑动窗口或流连接(我们在下一节讨论)需要将事件缓冲到窗口结束。因此,大窗口大小或高吞吐量流可能导致流处理器维护大量临时状态。这时必须确保运行流处理任务的机器有足够容量来维护这些状态,无论是在内存还是磁盘中。
第12章:流处理
流连接
在第471页的“连接与分组”中,我们讨论了批处理作业如何通过键连接数据集,以及这种连接如何构成数据管道的重要组成部分。由于流处理将数据管道泛化为对无界数据集的增量处理,因此对流上的连接存在完全相同的需求。
然而,新事件随时可能出现在流上这一事实,使得流上的连接比批处理作业更具挑战性。为了更好地理解情况,我们区分三种类型的连接:流–流连接、流–表连接和表–表连接。以下各节通过示例逐一说明。
流–流连接(窗口连接)
假设你的网站有一个搜索功能,你想检测所搜索 URL 的最新趋势。每当有人输入搜索查询时,你记录一个包含查询和返回结果的事件。每当有人点击其中一个搜索结果时,你记录另一个事件,记录该点击。要计算搜索结果的每个 URL 的点击率,你需要将搜索动作和点击动作的事件关联起来,它们通过相同的会话 ID 连接。广告系统中也需要类似的分析[69]。
如果用户放弃搜索,点击可能永远不会发生;即使发生,搜索与点击之间的时间也可能变化很大。在许多情况下可能是几秒钟,但也可能长达数天或数周(如果用户运行搜索后忘记浏览器标签页,稍后返回并点击结果)。由于网络延迟可变,点击事件甚至可能在搜索事件之前到达。你可以为连接选择一个合适的窗口——例如,你可以选择连接一个点击和一个搜索,如果它们相隔最多一小时发生。
注意,将搜索细节嵌入点击事件并不等同于连接事件。这样做只能告诉你用户点击了搜索结果的情况,而不能告诉你用户未点击任何结果的情况。要衡量搜索质量,你需要准确的点击率,这就要求同时拥有搜索事件和点击事件。
要实现这种类型的连接,流处理器需要维护状态——例如,过去一小时内发生的所有事件,按会话 ID 索引。每当搜索事件或点击事件发生时,它被添加到相应的索引中,流处理器还检查另一个索引,看是否已经到达同一会话 ID 的另一个事件。如果有匹配事件,你发出一个事件说明哪个搜索结果被点击。如果搜索事件过期而没有匹配的点击事件,你发出一个事件说明哪些搜索结果未被点击。
流–流连接需要维护一个时间窗口内的状态。
流–表连接(流富化)
在第471页的“连接与分组”(图11-2)中,我们看到了一个批处理作业连接两个数据集的例子:一组用户活动事件和一个用户配置文件数据库。将用户活动事件视为流,并在流处理器中持续执行相同的连接是很自然的想法。输入是包含用户ID的活动事件流,输出是用户ID已用该用户的配置文件信息增强过的活动事件流。这个过程有时被称为从数据库中富化活动事件。
要执行此连接,流处理需要一次处理一个活动事件,在数据库中查找该事件的用户ID,并将配置文件信息添加到活动事件中。数据库查找可以通过查询远程数据库来实现;然而,正如第471页的“连接与分组”所讨论的,这样的远程查询可能很慢,并有使数据库过载的风险[58]。
另一种方法是将数据库的副本加载到流处理器中,以便在没有网络往返的情况下在本地查询。这种技术称为哈希连接,因为数据库的本地副本如果足够小,可能是内存中的哈希表,或者是本地磁盘上的索引。
与批处理作业的区别在于,批处理作业将数据库的时间点快照作为输入,而流处理器是长时间运行的,数据库的内容可能随时间变化,因此流处理器的数据库本地副本需要保持最新。这个问题可以通过CDC解决。流处理器可以订阅用户配置文件数据库的变更日志以及活动事件流。当配置文件被创建或修改时,流处理器更新其本地副本。这样,我们获得了两个流之间的连接:活动事件和配置文件更新。
流–表连接与流–流连接非常相似。最大的区别在于,对于表变更日志流,连接使用一个回退到“时间起点”(概念上的无限窗口)的窗口,更新的记录版本会覆盖旧的版本。对于流输入,连接可能根本不维护窗口。
通过CDC将数据库变更作为流处理,可以实现低延迟的流–表连接。
表–表连接(物化视图维护)
考虑我们在第34页的“案例研究:社交媒体主页时间线”中讨论的社交网络时间线示例。我们说,当用户想要查看他们的主页时间线时,迭代所有关注的人,找到他们最近的帖子并合并起来,成本太高。
相反,我们想要一个时间线缓存,一种每用户的“收件箱”,帖子在发送时被写入其中,这样读取时间线只需要一次查找。物化并维护这个缓存需要以下事件处理:
- 当用户 u 发送新帖子时,该帖子被添加到所有关注 u 的用户的时间线中。
- 当用户删除帖子或删除整个账户时,该帖子从所有用户的时间线中移除。
- 当用户 u1 开始关注用户 u2 时,u2 的最近帖子被添加到 u1 的时间线。
- 当用户 u1 取消关注 u2 时,u2 的帖子从 u1 的时间线中移除。
要在流处理器中实现此缓存维护,你需要帖子事件(发送和删除)和关注关系事件(关注和取消关注)的流。流处理需要维护一个数据库,包含每个用户的关注者集合,以便知道当新帖子到达时需要更新哪些时间线。
这种流处理的另一种看法是,它维护了一个连接两个表(帖子 and 关注)的查询的物化视图——类似如下查询:
SELECT follows.follower_id AS timeline_id,
array_agg(posts.* ORDER BY posts.timestamp DESC)
FROM posts
JOIN follows ON follows.followee_id = posts.sender_id
GROUP BY follows.follower_id流之间的连接直接对应于此查询中的表连接。时间线实际上是查询结果的一个缓存,每次底层表发生变化时更新。
NOTE
如果将流视为表的导数,如图12-7所示,并将连接视为两个表的乘积 u·v,那么物化连接的变更流遵循乘积规则 (u·v)′ = u′v + uv′。任何帖子的变更与当前关注者连接,任何关注关系的变更与当前帖子连接[37]。
连接的时间依赖性
这里描述的三种连接类型(流–流、流–表、表–表)有很多共同点。它们都要求流处理器维护一个状态(搜索和点击事件、用户配置文件或关注者列表),该状态来自一个连接输入,并在处理另一个输入中的记录时查询该状态。
维护状态的事件顺序很重要——例如,是先关注然后取消关注用户,还是相反,会影响结果。在像 Kafka 这样的分片事件日志中,单个分片(分区)内的事件顺序得以保留,但不同流或分片之间通常没有顺序保证。
这就提出了一个问题:如果不同流上的事件发生在接近的时间,它们按什么顺序被处理?在流–表连接示例中,例如,如果用户更新了他们的配置文件,哪些活动事件与旧配置文件(在配置文件更新之前处理)连接,哪些与新配置文件(在配置文件更新之后处理)连接?换句话说:如果状态随时间变化,并且你与一个状态连接,你使用哪个时间点进行连接?
这种时间依赖性可能出现在许多地方。例如,如果你销售东西,你需要对发票应用正确的税率,这取决于国家或州、产品类型和销售日期(因为税率随时变化)。在将销售数据与税率表连接时,你可能希望与销售时的税率连接,如果你正在重新处理历史数据,这可能与当前税率不同。
如果流之间的事件的顺序不确定,连接将变得非确定性[70],这意味着你不能在相同的输入上重新运行相同的作业并保证获得相同的结果。当再次运行作业时,输入流上的事件可能以不同的方式交错。
在数据仓库中,这个问题被称为缓慢变化维度(SCD),通常通过为连接记录的特定版本使用唯一标识符来解决——例如,每次税率变化时,给它一个新的标识符,发票包含销售时税率的标识符[71, 72]。这种改变使连接变得确定,但后果是日志压缩变得不可能,因为表中记录的所有版本都需要保留。或者,你可以反规范化数据,将适用的税率直接包含在每个销售事件中。
时间依赖性可能导致非确定性连接,需要小心处理(如使用SCD或反规范化)。
容错
在本章的最后一节,我们考虑流处理器如何容忍故障。在第11章中,我们看到批处理框架可以相当容易地容忍故障:如果一个任务失败,它可以简单地在另一台机器上重新启动,失败任务的输出被丢弃。这种透明的重试是可能的,因为输入文件是不可变的,每个任务将其输出写入一个单独的文件,并且输出只有在任务成功完成时才可见。
特别地,批处理方法的容错确保即使某些任务失败,批处理作业的输出也与没有任何错误发生时的输出相同。它
第12章:流处理
看起来仿佛每条输入记录都被恰好处理一次——没有记录被跳过,也没有记录被处理两次。尽管重启任务意味着记录可能被处理多次,但在输出中的可见效果却如同它们只被处理过一次。这一原则被称为恰好一次语义,尽管“有效一次”可能是一个更具描述性的术语 [73]。
同样的容错问题也出现在流处理中,但处理起来不那么直接。等到任务完成再使其输出可见是不可行的,因为流是无限的,因此你永远无法完成处理。
微批处理与检查点
一种解决方案是将流分解成小块,并将每个块视为一个微型的批处理过程。这种方法称为微批处理,用于 Spark Streaming [74]。批处理大小通常约为1秒,这是性能权衡的结果:更小的批处理会产生更大的调度和协调开销,而更大的批处理则意味着流处理器结果变得可见的延迟更长。
微批处理还隐式提供了一个等于批处理大小的滚动窗口(按处理时间而非事件时间戳进行窗口化);任何需要更大窗口的作业必须显式地将状态从一个微批次传递到下一个微批次。
另一种变体方法,用于 Apache Flink,是定期生成状态的滚动检查点并将其写入持久化存储 [75, 76]。如果流操作符崩溃,它可以从最近的检查点重新启动,并丢弃在最后检查点与崩溃之间生成的任何输出。检查点由消息流中的屏障触发,类似于微批次之间的边界,但不会强制特定的窗口大小。
在流处理框架的范围内,微批处理和检查点方法提供了与批处理相同的恰好一次语义。然而,一旦输出离开流处理器(例如当它写入数据库、向外部消息代理发布消息或触发发送电子邮件时),框架便无法再丢弃失败微批次的输出。在这种情况下,重启失败的任务会导致外部副作用发生两次,仅靠微批处理或检查点不足以防止此问题。
重新审视原子提交
为了在出现故障时呈现恰好一次处理的效果,我们需要确保处理事件的所有输出和副作用当且仅当处理成功时才持久化。这包括发送给下游操作符或外部消息系统(包括电子邮件或推送通知)的任何消息、数据库写入、操作符状态的更改以及对输入消息的确认(包括在基于日志的消息代理中向前移动消费者偏移量)。
这些操作必须全部原子性地发生:要么全部发生,要么一个都不发生。如果这种方法听起来耳熟,那是因为我们在“恰好一次消息处理”(第329页)中,在分布式事务和两阶段提交的上下文中讨论过它。
我们在第8章中考虑了传统分布式事务实现(如XA)的问题。然而,在更受限的环境中,有可能高效地实现这样的原子提交设施。这种方法用于 Google Cloud Dataflow [66, 75]、VoltDB [77] 和 Apache Kafka [78, 79]。与 XA 不同,这些实现不会试图在异构技术间提供事务,而是通过将状态变更和消息传递都管理在流处理框架内部来保持事务的内部性。通过在一个事务中处理多条输入消息,可以摊销事务协议的开销。
幂等性
我们的目标是丢弃任何失败任务的部分输出,以便它们可以被安全地重试。分布式事务是实现这一目标的一种方式;另一种方式是依赖幂等性,正如我们在“持久化执行与工作流”(第187页)中所见 [80]。
幂等操作是指可以执行多次,且效果与只执行一次相同的操作。例如,在键值存储中删除一个键是幂等的(再次删除没有进一步影响),而递增计数器则不是幂等的(再次递增会使值增加两次)。
即使一个操作本质不是幂等的,通常也可以用一点额外的元数据使其变得幂等。例如,从 Kafka 消费消息时,每条消息都有一个持久的、单调递增的偏移量。当向外部数据库写入值时,你可以将触发最后一次写入的消息偏移量也包含在该值中。这样,你就能判断更新是否已经被应用,并避免再次执行相同的更新。Storm 的 Trident 中的状态处理基于类似的想法。
依赖幂等性意味着若干假设:重启失败的任务必须以相同的顺序重放相同的消息(基于日志的消息代理可以做到这一点),处理必须是确定性的,且没有其他节点可以同时更新同一个值 [81, 82]。当从一个处理节点故障转移到另一个节点时,可能需要隔离(见“分布式锁与租约”,第373页)来防止一个被认为已死亡但实际上还活着的节点造成干扰。尽管有这些注意事项,幂等操作可以成为一种有效的方式,仅用很小的开销就能实现恰好一次语义。
失败后重建状态
任何需要状态的流处理——例如,窗口聚合(如计数器、平均值和直方图)以及用于连接的任何表和索引——必须确保该状态在失败后可以恢复。
一种选择是将状态保存在远程数据存储中并进行复制,尽管为每条消息查询远程数据库可能会很慢。另一种选择是将状态本地保存在流处理器中并定期复制。这样,当流处理器从故障中恢复时,新任务可以读取复制后的状态并继续处理,而不会丢失数据。
例如,Flink 定期捕获操作符状态的快照并将其写入持久化存储,如分布式文件系统 [75, 76];Kafka Streams 通过将状态变更发送到带有日志压缩的专用 Kafka 主题来复制状态变更,类似于 CDC [83]。VoltDB 通过在多个节点上冗余处理每条输入消息来复制状态(见“实际的串行执行”,第309页)。
在某些情况下,甚至可能不需要复制状态,因为它可以从输入流中重建。例如,如果状态由相当短窗口的聚合组成,那么重放该窗口对应的输入事件可能足够快。如果状态是通过 CDC 维护的数据库的本地副本,那么也可以从日志压缩的变更流中重建数据库。
所有这些都取决于底层基础设施的性能特征。在某些系统中,网络延迟可能低于磁盘访问延迟,网络带宽可能与磁盘带宽相当。没有一种解决方案对所有情况都普遍理想,随着存储和网络技术的发展,本地状态与远程状态的优劣也可能发生转变。
总结
在本章中,我们讨论了事件流、它们的作用以及如何处理它们。在某些方面,流处理与我们在第11章中讨论的批处理非常相似,但它是持续地作用于无界(永无止境)的流,而不是固定大小的输入 [84]。从这个角度来看,消息代理和事件日志扮演着流式文件系统的角色。
我们花了一些时间比较两种类型的消息代理:
-
AMQP/JMS风格的消息代理:代理将单个消息分配给消费者,消费者在成功处理消息后确认单个消息。消息在确认后从代理中删除。这种方法适合作为异步形式的 RPC(另见“事件驱动架构”,第189页)——例如,在任务队列中,消息处理的确切顺序不重要,并且在处理完毕后不需要回头重新读取旧消息。
-
基于日志的消息代理:代理将分片中的所有消息分配给同一个消费者节点,并始终以相同的顺序传递消息。并行性通过分片实现,消费者通过检查点记录最后处理消息的偏移量来跟踪进度。代理将消息保留在磁盘上,因此如果必要,可以跳回并重新读取旧消息。
基于日志的方法与数据库中的复制日志(见第6章)和日志结构存储引擎(见第4章)有相似之处。它也是一种共识形式,正如我们在第10章中所见。我们看到这种方法特别适合用于消费输入流并生成派生状态或派生输出流的流处理系统。
关于流的来源,我们讨论了几种可能性:用户活动事件、传感器提供的周期性读数以及数据源(例如金融中的市场数据)自然地表示为流。我们还看到,将数据库写入视为流也很有用。我们可以捕获变更日志——数据库所有变更的历史——要么隐式地通过 CDC,要么显式地通过事件溯源。日志压缩允许流保留数据库内容的完整副本。
将数据库表示为流为系统集成开辟了强大的机会。通过消费变更日志并将其应用于派生系统,你可以保持派生数据系统(如搜索索引、缓存和分析系统)持续更新。你甚至可以通过从头开始并消费从开始到现在的整个变更日志,在现有数据上构建全新的视图。
用于将状态作为流维护和重放消息的设施,也是支撑各种流处理框架中流连接和容错技术的基础。我们讨论了流处理的几个目的,包括搜索事件模式(复杂事件处理)、计算窗口聚合(流分析)以及保持派生数据系统更新(物化视图)。
然后,我们讨论了在流处理器中推理时间的困难,包括处理时间与事件时间戳之间的区别,以及处理在你认为窗口已完成后到达的延迟事件(乱序事件)的问题。
| 530 |
第12章:流处理
我们区分了流处理中可能出现的三种连接类型:
- 流–流连接:两个输入流均由活动事件组成,连接运算符在某个时间窗口内搜索相关事件。例如,连接运算符可能匹配同一用户30分钟内执行的两个操作。如果希望在同一个流中查找相关事件,两个连接输入实际上可以是同一个流(自连接)。
- 流–表连接:一个输入流由活动事件组成,另一个是数据库变更日志。变更日志使数据库的本地副本保持最新。对于每个活动事件,连接运算符查询数据库并输出一个增强的活动事件。
- 表–表连接:两个输入流都是数据库变更日志。在这种情况下,每一侧发生的每次变更都与另一侧的最新状态进行连接。结果是两个表之间连接的物化视图的变更流。
最后,我们讨论了在流处理器中实现容错和恰好一次语义的技术。与批处理类似,我们需要丢弃任何失败任务的部分输出。然而,由于流处理运行时间长且持续产生输出,我们不能简单地丢弃所有输出。相反,可以使用更细粒度的恢复机制,基于微批处理、检查点、事务或幂等写入。
参考文献
[1] Tyler Akidau, Robert Bradshaw, Craig Chambers, Slava Chernyak, Rafael J. Fernández-Moctezuma, Reuven Lax, Sam McVeety, Daniel Mills, Frances Perry, Eric Schmidt, and Sam Whittle. “The Dataflow Model: A Practical Approach to Balancing Correctness, Latency, and Cost in Massive-Scale, Unbounded, Out-of-Order Data Processing.” Proceedings of the VLDB Endowment, volume 8, issue 12, pages 1792–1803, August 2015. doi:10.14778/2824032.2824076
[2] Harold Abelson, Gerald Jay Sussman, and Julie Sussman. Structure and Interpretation of Computer Programs, 2nd edition. MIT Press, 1996. ISBN: 9780262510875. Archived at archive.org
[3] Patrick Th. Eugster, Pascal A. Felber, Rachid Guerraoui, and Anne-Marie Kermarrec. “The Many Faces of Publish/Subscribe.” ACM Computing Surveys, volume 35, issue 2, pages 114–131, June 2003. doi:10.1145/857076.857078
[4] Don Carney, Uğur Çetintemel, Mitch Cherniack, Christian Convey, Sangdon Lee, Greg Seidman, Michael Stonebraker, Nesime Tatbul, and Stan Zdonik. “Monitoring Streams—A New Class of Data Management Applications.” At 28th International Conference on Very Large Data Bases (VLDB), August 2002. doi:10.1016/B978-155860869-6/50027-5
[5] Matthew Sackman. “Pushing Back.” wellquite.org, May 2016. Archived at perma.cc/3KCZ-RUFY
[6] Thomas Figg (tef). “How (Not) to Write a Pipeline.” cohost.org, June 2023. Archived at perma.cc/A3V8-NYCM
[7] Vicent Martí. “Brubeck, a statsd-Compatible Metrics Aggregator.” github.blog, June 2015. Archived at perma.cc/TP3Q-DJYM
[8] Seth Lowenberger. “MoldUDP64 Protocol Specification V 1.00.” nasdaqtrader.com, July 2009. Archived at perma.cc/7CRQ-QBD7
[9] Ian Malpass. “Measure Anything, Measure Everything.” codeascraft.com, February 2011. Archived at archive.org
[10] Dieter Plaetinck. “25 Graphite, Grafana and statsd Gotchas.” grafana.com, March 2016. Archived at perma.cc/3NP3-67U7
[11] Jeff Lindsay. “Web Hooks to Revolutionize the Web.” progrium.com, May 2007. Archived at perma.cc/BF9U-XNX4
[12] Jim N. Gray. “Queues Are Databases.” Microsoft Research Technical Report MSR-TR-95-56, December 1995. Archived at arxiv.org
[13] Mark Hapner, Rich Burridge, Rahul Sharma, Joseph Fialli, Kate Stout, and Nigel Deakin. “JSR-343 Java Message Service (JMS) 2.0 Specification.” jms-spec.java.net, March 2013. Archived at perma.cc/E4YG-46TA
[14] Sanjay Aiyagari, Matthew Arrott, Mark Atwell, Jason Brome, Alan Conway, Robert Godfrey, Robert Greig, Pieter Hintjens, John O’Hara, Matthias Radestock, Alexis Richardson, Martin Ritchie, Shahrokh Sadjadi, Rafael Schloming, Steven Shaw, Martin Sustrik, Carl Trieloff, Kim van der Riet, and Steve Vinoski. “AMQP: Advanced Message Queuing Protocol Specification.” Version 0-9-1, November 2008. Archived at perma.cc/6YJJ-GM9X
[15] “Architectural Overview of Pub/Sub.” cloud.google.com, 2025. Archived at perma.cc/VWF5-ABP4
[16] Aris Tzoumas. “Lessons from Scaling PostgreSQL Queues to 100k Events Per Second.” rudderstack.com, July 2025. Archived at perma.cc/QD8C-VA4Y
[17] Robin Moffatt. “Kafka Connect Deep Dive—Error Handling and Dead Letter Queues.” confluent.io, March 2019. Archived at perma.cc/KQ5A-AB28
[18] Dunith Danushka. ” Message Reprocessing: How to Implement the Dead Letter Queue.” redpanda.com. Archived at perma.cc/R7UB-WEWF
[19] Damien Gasparina, Loic Greffier, and Sebastien Viale. “KIP-1034: Dead Letter Queue in Kafka Streams.” cwiki.apache.org, April 2024. Archived at perma.cc/3VXV-QXAN
[20] Jay Kreps, Neha Narkhede, and Jun Rao. “Kafka: A Distributed Messaging System for Log Processing.” At 6th International Workshop on Networking Meets Databases (NetDB), June 2011. Archived at perma.cc/CSW7-TCQ5
[21] Jay Kreps. “Benchmarking Apache Kafka: 2 Million Writes Per Second (On Three Cheap Machines).” engineering.linkedin.com, April 2014. Archived at archive.org
[22] Kartik Paramasivam. “How We’re Improving and Advancing Kafka at LinkedIn.” engineering.linkedin.com, September 2015. Archived at perma.cc/3S3V-JCYJ
[23] Philippe Dobbelaere and Kyumars Sheykh Esmaili. “Kafka Versus RabbitMQ: A Comparative Study of Two Industry Reference Publish/Subscribe Implementations.” At 11th ACM International Conference on Distributed and Event-Based Systems (DEBS), June 2017. doi:10.1145/3093742.3093908
[24] Kate Holterhoff. “Why Message Queues Endure: A History.” redmonk.com, December 2024. Archived at perma.cc/6DX8-XK4W
[25] Andrew Schofield. “KIP-932: Queues for Kafka.” cwiki.apache.org, May 2023. Archived at perma.cc/LBE4-BEMK
[26] Jack Vanlightly. “The Advantages of Queues on Logs.” jack-vanlightly.com, October 2023. Archived at perma.cc/WJ7V-287K
[27] Jay Kreps. “The Log: What Every Software Engineer Should Know About Real-Time Data’s Unifying Abstraction.” engineering.linkedin.com, December 2013. Archived at perma.cc/2JHR-FR64
[28] Andy Hattemer. “Change Data Capture Is Having a Moment. Why?” materialize.com, September 2021. Archived at perma.cc/AL37-P53C
[29] Prem Santosh Udaya Shankar. “Streaming MySQL Tables in Real-Time to Kafka.” engineeringblog.yelp.com, August 2016. Archived at perma.cc/5ZR3-2GVV
[30] Andreas Andreakis, Ioannis Papapanagiotou. “DBLog: A Watermark Based Change-Data-Capture Framework.” Archived at arXiv:2010.12597, October 2020.
[31] Jiri Pechanec. “Percolator.” debezium.io, October 2021. Archived at perma.cc/EQ8E-W6KQ
[32] Debezium maintainers. “Debezium Connector for Cassandra.” debezium.io. Archived at perma.cc/WR6K-EKMD
[33] Neha Narkhede. “Announcing Kafka Connect: Building Large-Scale Low-Latency Data Pipelines.” confluent.io, February 2016. Archived at perma.cc/8WXJ-L6GF
[34] Chris Riccomini. “Kafka Change Data Capture Breaks Database Encapsulation.” cnr.sh, November 2018. Archived at perma.cc/P572-9MKF
[35] Gunnar Morling. “‘Change Data Capture Breaks Encapsulation’. Does It, Though?” decodable.co, November 2023. Archived at perma.cc/YX2P-WNWR
[36] Gunnar Morling. “Revisiting the Outbox Pattern.” decodable.co, October 2024. Archived at perma.cc/M5ZL-RPS9
[37] Ashish Gupta and Inderpal Singh Mumick. “Maintenance of Materialized Views: Problems, Techniques, and Applications.” IEEE Data Engineering Bulletin, volume 18, issue 2, pages 3–18, June 1995. Archived at archive.org
[38] Mihai Budiu, Tej Chajed, Frank McSherry, Leonid Ryzhyk, and Val Tannen. “DBSP: Incremental Computation on Streams and Its Applications to Databases.” SIGMOD Record, volume 53, issue 1, pages 87–95, March 2024. doi:10.1145/3665252.3665271
[39] Jim Gray and Andreas Reuter. Transaction Processing: Concepts and Techniques. Morgan Kaufmann, 1992. ISBN: 9781558601901
[40] Martin Kleppmann. “Accounting for Computer Scientists.” martin.kleppmann.com, March 2011. Archived at perma.cc/9EGX-P38N
[41] Pat Helland. “Immutability Changes Everything.” At 7th Biennial Conference on Innovative Data Systems Research (CIDR), January 2015. Archived at perma.cc/33WX-3669
[42] Martin Kleppmann. Making Sense of Stream Processing. Report, O’Reilly Media, May 2016. Archived at perma.cc/RAY4-JDVX
[43] Kartik Paramasivam. “Stream Processing Hard Problems—Part 1: Killing Lambda.” engineering.linkedin.com, June 2016. Archived at archive.org
[44] Stéphane Derosiaux. “CQRS: What? Why? How?” sderosiaux.medium.com, September 2019. Archived at perma.cc/FZ3U-HVJ4
[45] Baron Schwartz. “Immutability, MVCC, and Garbage Collection.” xaprb.com, December 2013. Archived at archive.org
[46] Daniel Eloff, Slava Akhmechet, Jay Kreps, et al. “Re: Turning the Database Inside-out with Apache Samza.” Hacker News discussion, news.ycombinator.com, March 2015. Archived at perma.cc/ML9E-JC83
[47] Cognitect, Inc. “Datomic Documentation: Excision.” docs.datomic.com. Archived at perma.cc/J5QQ-SH32
第12章:流处理
[48] “Fossil Documentation: Deleting Content from Fossil.” fossil-scm.org, 2025. Archived at perma.cc/DS23-GTNG
[49] Jay Kreps. “The irony of distributed systems is that data loss is really easy but deleting data is surprisingly hard.” x.com, March 2015. Archived at perma.cc/7RRZ-V7B7
[50] Brent Robinson. “Crypto Shredding: How It Can Solve Modern Data Retention Challenges.” medium.com, January 2019. Archived at perma.cc/4LFK-S6XE
[51] Matthew D. Green and Ian Miers. “Forward Secure Asynchronous Messaging from Puncturable Encryption.” At IEEE Symposium on Security and Privacy, May 2015. doi:10.1109/SP.2015.26
[52] David C. Luckham. “What’s the Difference Between ESP and CEP?” complexevents.com, June 2019. Archived at perma.cc/E7PZ-FDEF
[53] Arvind Arasu, Shivnath Babu, and Jennifer Widom. “The CQL Continuous Query Language: Semantic Foundations and Query Execution.” The VLDB Journal, volume 15, issue 2, pages 121–142, June 2006. doi:10.1007/s00778-004-0147-z
[54] Julian Hyde. “Data in Flight: How Streaming SQL Technology Can Help Solve the Web 2.0 Data Crunch.” ACM Queue, volume 7, issue 11, December 2009. doi:10.1145/1661785.1667562
[55] Philippe Flajolet, Éric Fusy, Olivier Gandouet, and Frédéric Meunier. “HyperLogLog: The Analysis of a Near-Optimal Cardinality Estimation Algorithm.” At Conference on Analysis of Algorithms (AofA), June 2007. doi:10.46298/dmtcs.3545
[56] Jay Kreps. “Questioning the Lambda Architecture.” oreilly.com, July 2014. Archived at perma.cc/2WY5-HC8Y
[57] Ian Reppel. “An Overview of Apache Streaming Technologies.” ianreppel.org, March 2016. Archived at perma.cc/BB3E-QJLW
[58] Jay Kreps. “Why Local State Is a Fundamental Primitive in Stream Processing.” oreilly.com, July 2014. Archived at perma.cc/P8HU-R5LA
[59] RisingWave Labs. “Deep Dive into the RisingWave Stream Processing Engine —Part 2: Computational Model.” risingwave.com, November 2023. Archived at perma.cc/LM74-XDEL
[60] Frank McSherry, Derek G. Murray, Rebecca Isaacs, and Michael Isard. “Differential Dataflow.” At 6th Biennial Conference on Innovative Data Systems Research (CIDR), January 2013. Archived at perma.cc/T83W-ZBR2
[61] Andy Hattemer. “Incremental Computation in the Database.” materialize.com, March 2020. Archived at perma.cc/AL94-YVRN
摘要 | 535
[62] Shay Banon. “Percolator.” elastic.co, February 2011. Archived at perma.cc/LS5R-4FQX
[63] Alan Woodward and Martin Kleppmann. “Real-Time Full-Text Search with Luwak and Samza.” martin.kleppmann.com, April 2015. Archived at perma.cc/2U92-Q7R4
[64] Tyler Akidau. “The World Beyond Batch: Streaming 102.” oreilly.com, January 2016. Archived at perma.cc/4XF9-8M2K
[65] Stephan Ewen. “Streaming Analytics with Apache Flink.” At Kafka Summit, April 2016. Archived at perma.cc/QBQ4-F9MR
[66] Tyler Akidau, Alex Balikov, Kaya Bekiroğlu, Slava Chernyak, Josh Haberman, Reuven Lax, Sam McVeety, Daniel Mills, Paul Nordstrom, and Sam Whittle. “MillWheel: Fault-Tolerant Stream Processing at Internet Scale.” Proceedings of the VLDB Endowment, volume 6, issue 11, pages 1033–1044, August 2013. doi:10.14778/2536222.2536229
[67] Alex Dean. “Improving Snowplow’s Understanding of Time.” snowplow.io, September 2015. Archived at perma.cc/6CT9-Z3Q2
[68] “Azure Stream Analytics: Windowing Functions.” Microsoft Azure Reference, learn.microsoft.com, July 2025. Archived at archive.org
[69] Rajagopal Ananthanarayanan, Venkatesh Basker, Sumit Das, Ashish Gupta, Haifeng Jiang, Tianhao Qiu, Alexey Reznichenko, Deomid Ryabkov, Manpreet Singh, and Shivakumar Venkataraman. “Photon: Fault-Tolerant and Scalable Joining of Continuous Data Streams.” At ACM International Conference on Management of Data (SIGMOD), June 2013. doi:10.1145/2463676.2465272
[70] Ben Kirwin. “Doing the Impossible: Exactly-Once Messaging Patterns in Kafka.” ben.kirw.in, November 2014. Archived at perma.cc/A5QL-QRX7
[71] Pat Helland. “Data on the Outside Versus Data on the Inside.” At 2nd Biennial Conference on Innovative Data Systems Research (CIDR), January 2005. Archived at perma.cc/K9AH-LQPS
[72] Ralph Kimball and Margy Ross. The Data Warehouse Toolkit: The Definitive Guide to Dimensional Modeling, 3rd edition. John Wiley & Sons, 2013. ISBN: 9781118530801
[73] Viktor Klang. “I’m coining the phrase ‘effectively-once’ for message processing with at-least-once + idempotent operations.” x.com, October 2016. Archived at perma.cc/7DT9-TDG2
536 | 第12章:流处理
[74] Matei Zaharia, Tathagata Das, Haoyuan Li, Scott Shenker, and Ion Stoica. “Discretized Streams: An Efficient and Fault-Tolerant Model for Stream Processing on Large Clusters.” At 4th USENIX Conference in Hot Topics in Cloud Computing (HotCloud), June 2012.
[75] Kostas Tzoumas, Stephan Ewen, and Robert Metzger. “High-Throughput, Low-Latency, and Exactly-Once Stream Processing with Apache Flink.” ververica.com, August 2015. Archived at archive.org
[76] Paris Carbone, Gyula Fóra, Stephan Ewen, Seif Haridi, and Kostas Tzoumas. “Lightweight Asynchronous Snapshots for Distributed Dataflows.” arXiv:1506.08603, June 2015.
[77] Ryan Betts and John Hugg. Fast Data: Smart and at Scale. Report, O’Reilly Media, October 2015. Archived at perma.cc/VQ6S-XQQY
[78] Neha Narkhede and Guozhang Wang. “Exactly-Once Semantics Are Possible: Here’s How Kafka Does It.” confluent.io, June 2019. Archived at perma.cc/Q2AU-Q2ED
[79] Jason Gustafson, Flavio Junqueira, Apurva Mehta, Sriram Subramanian, and Guozhang Wang. “KIP-98—Exactly Once Delivery and Transactional Messaging.” cwiki.apache.org, November 2016. Archived at perma.cc/95PT-RCTG
[80] Pat Helland. “Idempotence Is Not a Medical Condition.” Communications of the ACM, volume 55, issue 5, pages 56–65, May 2012. doi:10.1145/2160718.2160734
[81] Jay Kreps. “Re: Trying to Achieve Deterministic Behavior on Recovery/Rewind.” Email to samza-dev mailing list, September 2014. Archived at perma.cc/7DPD-GJNL
[82] E. N. (Mootaz) Elnozahy, Lorenzo Alvisi, Yi-Min Wang, and David B. Johnson. “A Survey of Rollback-Recovery Protocols in Message-Passing Systems.” ACM Computing Surveys, volume 34, issue 3, pages 375–408, September 2002. doi:10.1145/568522.568525
[83] Adam Warski. “Kafka Streams—How Does It Fit the Stream Processing Landscape?” softwaremill.com, June 2016. Archived at perma.cc/WQ5Q-H2J2
[84] Stephan Ewen, Fabian Hueske, and Xiaowei Jiang. “Batch as a Special Case of Streaming and Alibaba’s contribution of Blink.” flink.apache.org, February 2019. Archived at perma.cc/A529-SKA9
摘要 | 537
图片上下文
[Image 11261 on Page 517]
[Image 11275 on Page 518]
[Image 11307 on Page 520]
[Image 11433 on Page 526]
[Image 11455 on Page 527]
[Image 11497 on Page 529]
[Image 11571 on Page 533]
[Image 11791 on Page 544]
[Image 105 on Page 549]
图片内容分析(由视觉模型提取)
- 第517页,图片1:该图片无法加载,因此无法分析其内容。
- 第518页,图片1:图片无法显示,无法分析其内容。
- 第520页,图片1:图片内容无法识别。
- 第526页,图片1:图片未能显示,无法分析。
- 第527页,图片1:图片内容无法获取,无法进行分析。
- 第529页,图片1:由于图片无法加载,无法分析其具体内容。请提供可访问的图片。
- 第533页,图片1:图片内容为空,无法分析。
- 第544页,图片1:图片内容无法识别,请重新上传。
- 第549页,图片1:图片无法显示,根据上下文推测为流处理相关技术示意图或流程图。