05 触发器 Trigger:ProcessingTime、Once、AvailableNow 与 Continuous

摘要

Trigger(触发器)决定了 Structured Streaming 查询何时启动下一个 MicroBatch——是按固定时间间隔、还是数据就绪时立即触发、还是只运行一次后退出。四种 Trigger 类型覆盖了从实时流处理(秒级批次)到离线增量处理(一次性消费积压)的全部场景。本文深度讲解每种 Trigger 的触发逻辑、与 MicroBatch 执行模型的交互、延迟与吞吐量的权衡,以及 Spark 3.3 新增的 AvailableNow 如何解决 Once 触发器的根本缺陷(单批次内存压力)、成为定时增量 ETL 场景的最佳选择。


第 1 章 Trigger 的本质:控制批次启动节奏

1.1 Trigger 在 MicroBatch 循环中的位置

第 01 篇讲到,MicroBatch 执行模型是一个无限循环:等待 Trigger → 读取 Offset → 执行 Job → 提交 Commit → 再等待 Trigger。Trigger 控制的是”等待”这一步的行为:

┌──────────────────────────────────────────────────────┐
│               MicroBatch 触发循环                      │
│                                                      │
│  [等待 Trigger 条件] ←──────────────────────┐         │
│         │                                  │         │
│         ↓                                  │         │
│  [获取 Source Offset]                       │         │
│         │                                  │         │
│         ↓                                  │         │
│  [执行 Spark Job]                           │         │
│         │                                  │         │
│         ↓                                  │         │
│  [写 Commit 文件] ──────────────────────────┘         │
└──────────────────────────────────────────────────────┘

不同 Trigger 对”等待 Trigger 条件”步骤的实现:

  • ProcessingTime:计时器,等满指定间隔后触发
  • Once:不等待,立即触发一次,完成后退出整个循环
  • AvailableNow:不等待,立即触发,但可以触发多次直到处理完所有积压数据后退出
  • Continuous:不使用 MicroBatch 循环,走完全不同的 Continuous Processing 路径

第 2 章 ProcessingTime:生产实时流的默认选择

2.1 ProcessingTime 的触发逻辑

# 每 10 秒触发一次
query = df.writeStream \
    .trigger(processingTime="10 seconds") \
    .format("kafka") \
    .start()
 
# 等价写法
from pyspark.sql.streaming import Trigger
query = df.writeStream \
    .trigger(Trigger.ProcessingTime("10 seconds")) \
    .start()
 
# 特殊值:processingTime="0 seconds"(尽可能快地触发,上一批次完成立即开始下一批次)
query = df.writeStream \
    .trigger(processingTime="0 seconds") \
    .start()

计时逻辑:计时从上一批次完成时开始(不是从上一批次开始时)。如果批次执行时间 > 触发间隔(如批次执行需要 15 秒,但间隔只有 10 秒),Spark 不会启动重叠的批次——而是在上一批次完成后立即启动下一批次(不再等待剩余的间隔时间)。

场景:processingTime="10 seconds",批次执行时间 15 秒

T=0:   批次 0 开始
T=15:  批次 0 完成(超过了 10 秒间隔)
T=15:  批次 1 立即开始(不再等 10 秒,因为已经"欠"了 5 秒)
T=30:  批次 1 完成
T=40:  批次 2 开始(等满了 10 秒)
...

2.2 批次间隔的选择原则

选择合适的批次间隔是吞吐量与延迟的核心权衡:

间隔过小的代价

  • 每个批次读取的数据量少,批次启动/结束的固定开销(Task 调度、Checkpoint 写入)占比上升
  • 批次间隔 < 批次执行时间时,系统陷入”追赶模式”,Kafka Lag 持续增长
  • 频繁的 State Store checkpoint 操作(每批次都触发 State 写入)IO 压力大

间隔过大的代价

  • 端到端延迟增大(事件需要等待更长时间才被处理)
  • 单批次数据量大,内存压力高,可能 Spill

经验法则

场景推荐批次间隔
实时告警、监控仪表盘1-5 秒
实时数仓(准实时)30 秒 - 5 分钟
近实时 ETL5-15 分钟
高吞吐 ETL(延迟不敏感)15-60 分钟

第 3 章 Once:单次执行的历史包袱

3.1 Once 的用途与行为

# 只执行一次 MicroBatch,然后退出
query = df.writeStream \
    .trigger(once=True) \
    .format("parquet") \
    .option("checkpointLocation", "/ckpt/") \
    .start()
query.awaitTermination()  # 等待这一次执行完成

Once 触发器的行为:启动后立即触发一个批次,从 Checkpoint 记录的上次 Offset 到当前 Kafka 最新 Offset 的所有数据,一次性处理完后退出。下次调用时,从上次的 endOffset 继续。

设计初衷:用于定时批量处理场景——不想维持一个长期运行的流查询,而是通过定时调度(如 cron job 或 Airflow)周期性地启动 Spark 作业,每次消费上次到现在的增量数据。

3.2 Once 的根本缺陷

Once 触发器的致命问题:将从上次 Offset 到当前 Kafka 最新 Offset 的所有积压数据放入单个 MicroBatch

如果上次运行是昨天,今天触发时 Kafka 中积累了数亿条消息,Once 会尝试在一个批次内处理所有数亿条消息——这往往导致:

  1. OOM(内存溢出):单批次数据量超过 Executor 内存
  2. State Store 压力:有状态查询在单批次内需要处理海量状态更新
  3. 运行时间极长:本应分散到多批次的处理集中到一个批次

这就是 AvailableNow 被引入的原因。


第 4 章 AvailableNow:定时增量 ETL 的最佳选择

4.1 AvailableNow 是什么,为什么出现

AvailableNow(Spark 3.3 引入)解决了 Once 的单批次大数据量问题:它也会在处理完所有当前可用数据后退出,但分多个批次逐步处理,而不是一个巨型批次:

# Spark 3.3+
query = df.writeStream \
    .trigger(availableNow=True) \
    .option("maxOffsetsPerTrigger", 1000000) \  # 每批最多 100 万条
    .format("parquet") \
    .option("checkpointLocation", "/ckpt/") \
    .start()
query.awaitTermination()

行为

  1. 查询启动时,记录当前各 Source 的最新 Offset(称为 “termination offset”)
  2. 开始执行 MicroBatch,每批次按 maxOffsetsPerTrigger 的限制处理一部分数据
  3. 当所有 Source 都处理到 termination offset 时,查询自动停止
Kafka 中积压了 1000 万条消息,maxOffsetsPerTrigger=100 万

AvailableNow 执行过程:
  批次 0:处理 Offset [0, 100万)
  批次 1:处理 Offset [100万, 200万)
  ...
  批次 9:处理 Offset [900万, 1000万)
  → 处理完成,自动退出

Once 执行过程:
  批次 0:处理所有 Offset [0, 1000万)  ← 单批次 1000 万条,可能 OOM
  → 处理完成,退出

4.2 AvailableNow 与 Once 的对比

维度OnceAvailableNow
引入版本Spark 2.0Spark 3.3
批次数量1 个批次(所有积压)多个批次(受 maxOffsets 控制)
内存压力高(积压越多越危险)可控(每批次固定大小)
State 更新单批次海量更新分批次渐进更新
失败恢复失败需从头重跑(整批次)失败从最近完成的批次续跑
适用场景积压量小且可控的场景所有定时增量 ETL 场景

设计哲学

AvailableNow 本质上把”定时流处理”的两个需求解耦:执行有界(处理完当前积压就退出,不持续监听)和批次有界(每个 MicroBatch 数据量可控)。这使它成为在 Airflow/DolphinScheduler 中调度 Spark 流处理作业的首选模式——定时触发、自动退出、不会因积压而 OOM。

4.3 生产中的 AvailableNow 最佳实践

# 完整的 AvailableNow 生产模式
spark = SparkSession.builder \
    .config("spark.sql.shuffle.partitions", "200") \
    .getOrCreate()
 
# 读取 Kafka
df = spark.readStream \
    .format("kafka") \
    .option("kafka.bootstrap.servers", "broker:9092") \
    .option("subscribe", "events") \
    .option("startingOffsets", "earliest") \
    .option("maxOffsetsPerTrigger", 2000000) \  # 每批最多 200 万条
    .load()
 
# 业务逻辑
result = df.selectExpr("CAST(value AS STRING)") \
    .filter(...) \
    .groupBy(...) \
    .agg(...)
 
# AvailableNow 触发器
query = result.writeStream \
    .trigger(availableNow=True) \
    .outputMode("append") \
    .format("delta") \
    .option("checkpointLocation", "s3://bucket/checkpoint/events-etl/") \
    .option("path", "s3://bucket/output/events/") \
    .start()
 
# 等待执行完成后退出(Airflow 任务结束)
query.awaitTermination()

第 5 章 默认触发器(无 Trigger 配置)

当不调用 .trigger() 方法时,Structured Streaming 使用默认触发器,等价于 processingTime="0 seconds"——上一批次完成后尽可能快地启动下一批次(不等待任何间隔)。

# 等价写法
query = df.writeStream.format("console").start()
# 等价于:
query = df.writeStream.trigger(processingTime="0 seconds").format("console").start()

这种模式在数据持续涌入的场景下,每批次处理尽量多的数据,吞吐量最高,但 CPU 和 I/O 负载也最高(持续高强度运行)。适合吞吐量优先、延迟不是首要约束的场景。


小结

四种 Trigger 各有其精确适用场景:

  • ProcessingTime:生产实时流处理的默认选择,通过批次间隔平衡延迟与吞吐量;间隔选择依据是”批次执行时间的 2-3 倍”作为上限
  • Once:历史遗留,定时增量 ETL 场景;积压量不可控时有 OOM 风险,新项目优先用 AvailableNow
  • AvailableNow(Spark 3.3+):定时增量 ETL 的最佳实践;多批次消费积压、内存可控、失败可续跑;配合 Airflow 是理想的”流批一体”调度模式
  • Continuous:毫秒级延迟的实验性模式,不支持聚合/状态,生产中慎用
  • 默认(无 Trigger):等价于 processingTime="0 seconds",最高吞吐,持续全力运行

第 06 篇深入三种窗口的实现机制:滚动窗口(Tumbling Window)、滑动窗口(Sliding Window)与会话窗口(Session Window)在 State Store 中的存储结构,以及窗口状态的清理时机。


思考题

  1. ProcessingTime("0 seconds") 触发器会在上一个批次处理完成后立即启动下一个批次,实现”尽可能快”的处理节奏。但这会导致 Driver 端的调度循环持续高负载。在数据源速率不稳定的场景下(忽高忽低),这种触发模式与 ProcessingTime("5 seconds") 相比,在资源利用率和延迟上有什么差异?
  2. AvailableNow 触发器会在启动时一次性处理所有当前可用数据,然后自动停止。这看起来像是一次全量批处理,但它是作为流作业运行的。与直接用 Spark 批处理读取相同数据相比,AvailableNow 有哪些额外优势(如 Checkpoint 记录进度、精确一次语义)?在什么场景下应该选择 AvailableNow 而不是直接批处理?
  3. Continuous 触发器使用长时间运行的 Task(Epoch-based 执行模型),而不是 MicroBatch 的短 Task 模式。这意味着 Continuous 模式的 Task 失败后,必须从上一个 Epoch Checkpoint 重放数据,而不能像 MicroBatch 那样只重试单个批次。在 Continuous 模式下,如何平衡 Epoch 间隔(Checkpoint 粒度)与故障恢复代价之间的关系?

参考资料