摘要:

Apache HudiIncremental Query(增量查询) 是它区别于所有竞争对手的核心能力,也是”Hudi 是增量处理引擎”这一定位的最直接体现。它让下游 Spark 任务可以精确查询”自上次处理以来,哪些记录发生了变化”——而不是每次都扫描整张表。这一能力将全量 ETL 的算力消耗从”扫描全量 10TB 数据”缩减为”只处理过去 1 小时变化的 500MB 数据”,在大规模数据管道中通常带来 10-100 倍的计算量减少。本文深入剖析增量查询的实现机制、Checkpoint 状态管理、与 Spark Structured Streaming 的集成,以及在实际 CDC 增量 ETL 管道中的完整工程实践。


第 1 章 增量消费的本质需求

1.1 全量 ETL 的不可持续性

传统数据湖 ETL 管道的典型模式是全量覆盖:每天(或每小时)将整张业务表全量同步到数据湖,下游各层加工也是全量重算。这在数据量小的时候无关紧要,但随着数据规模增长,全量模式的代价变得不可持续:

案例:电商平台的订单表
  - 历史总记录:100 亿条(约 10TB)
  - 每小时新增/更新:500 万条(约 500MB)
  - 更新比例:只有 0.05% 的数据在一小时内发生了变化

全量 ETL(每小时运行):
  - 每次需要读取:10TB 数据
  - 每次计算量:10TB 的扫描 + 全量 JOIN/聚合
  - 每小时 Spark 集群成本:~$500(100 个节点 × 1 小时)
  - 数据新鲜度延迟:处理 10TB 数据需要 45-60 分钟

增量 ETL(每小时运行,基于 Hudi Incremental Query):
  - 每次需要读取:500MB(只有变化的数据)
  - 每次计算量:500MB 的扫描 + 增量 JOIN/聚合
  - 每小时 Spark 集群成本:~$5(相当于 1/100 的成本)
  - 数据新鲜度延迟:处理 500MB 数据需要 2-3 分钟

这就是增量处理的核心价值:只处理变化的数据,计算量与变化量成正比,而不是与历史总量成正比

1.2 为什么 Delta Lake 的 CDF 不是同等能力

Delta Lake 在 1.x 版本后也引入了 Change Data Feed(CDF),表面上提供类似的增量消费能力。但两者有几个重要差异:

差异 1:设计时间点不同

Hudi 的 Incremental Query 是从设计之初就内置的能力(2016 年),整个文件布局(Timeline、Commit 元数据中的文件级统计)都为增量查询设计。Delta Lake 的 CDF 是 2022 年才正式 GA 的后加功能,需要在建表时显式开启(delta.enableChangeDataFeed = true),且会额外写入 _change_data 目录存储变更日志——这意味着额外的存储开销。

差异 2:语义的细微差别

Hudi Incremental Query 返回的是:在时间范围 [T1, T2] 内,每条记录最新的状态(如果一条记录在这段时间内被更新了 3 次,只返回第 3 次的结果)。

Delta Lake CDF 返回的是:所有变更操作的明细,包括 insertupdate_preimageupdate_postimagedelete(类似 binlog 的行级变更流水)。

差异 3:对现有数据的影响

Hudi Incremental Query 对已有数据无任何影响,查询时通过扫描 Timeline 和过滤 _hoodie_commit_time 字段实现。

Delta Lake CDF 需要从开启 CDF 的那个版本开始才有变更日志,且关闭 CDF 会丢失历史变更记录。

两种增量消费语义的使用场景

如果下游需要的是”某时间段内每条记录的最新快照”(如:每小时刷新 DWD 层)→ Hudi Incremental Query 更自然 如果下游需要的是”完整的变更流水”(如:实时数仓的 SCD-2、CDC 审计日志)→ Delta Lake CDF 更合适


第 2 章 增量查询的技术实现

2.1 三个内置元字段

Hudi 在每条记录写入时,自动注入三个元字段,这些字段是增量查询的物理基础:

_hoodie_commit_time   : 该记录最后一次被写入(INSERT 或 UPDATE)的 Commit 时间戳
                        例:20240101120000000
                        作用:增量查询的过滤条件

_hoodie_record_key    : 记录的 recordKey(主键值)
                        例:trip_id=12345
                        作用:Index Lookup 和记录去重

_hoodie_partition_path: 记录所在的分区路径
                        例:date=2024-01-01
                        作用:分区路由

当你执行 SELECT * FROM trips,会自动看到这三个字段加上你的业务字段。

2.2 增量查询的 SQL 语法

Spark SQL 方式

-- Step 1:注册表(指定为增量查询模式)
CREATE TABLE trips_incremental
USING hudi
OPTIONS (
  path         = 's3://bucket/trips/',
  queryType    = 'incremental',      -- 关键:声明增量查询模式
  beginInstant = '20240101110000000', -- 从这个 Commit 时间点之后开始(不含)
  endInstant   = '20240101120000000'  -- 到这个 Commit 时间点(含),可省略(省略则到最新)
);
 
-- Step 2:查询(无需额外条件,Hudi 内部自动注入 _hoodie_commit_time 过滤)
SELECT trip_id, status, fare, _hoodie_commit_time
FROM trips_incremental;
 
-- 输出:只包含在 11:00 ~ 12:00 之间被写入的记录(最新版本)

Spark DataFrame API 方式(更常用于程序化场景):

# 读取增量数据(从上次 Checkpoint 之后)
last_checkpoint = read_checkpoint_from_state_store()  # 从状态存储读取上次处理的位置
 
incremental_df = spark.read.format("hudi") \
    .option("hoodie.datasource.query.type", "incremental") \
    .option("hoodie.datasource.read.begin.instanttime", last_checkpoint) \
    # 可选:指定结束时间(不指定则读到最新)
    # .option("hoodie.datasource.read.end.instanttime", "20240101130000000") \
    .load("s3://bucket/trips/")
 
# incremental_df 只包含 last_checkpoint 之后被写入的记录
print(f"增量记录数: {incremental_df.count()}")
print(f"时间范围: {incremental_df.agg({'_hoodie_commit_time': 'min'}).collect()[0][0]}"
      f" ~ {incremental_df.agg({'_hoodie_commit_time': 'max'}).collect()[0][0]}")
 
# 处理完成后,更新 Checkpoint
latest_commit = incremental_df.agg({"_hoodie_commit_time": "max"}).collect()[0][0]
write_checkpoint_to_state_store(latest_commit)

2.3 增量查询的内部执行逻辑

理解增量查询如何工作,需要深入到 Spark 的物理执行层:

用户执行:
spark.read.format("hudi")
    .option("queryType", "incremental")
    .option("beginInstant", "20240101110000000")
    .load("s3://bucket/trips/")

Hudi 内部执行步骤:

Step 1:Timeline 扫描
  读取 s3://bucket/trips/.hoodie/ 目录
  找到所有 timestamp > "20240101110000000" 的 COMPLETED Commit/DeltaCommit

Step 2:文件列表构建
  从每个相关 Commit 的元数据中,提取写入的文件列表
  对 MoR 表:包含 Parquet 基础文件和 .log 文件
  对 CoW 表:只包含 Parquet 文件

Step 3:构建 Spark 的 FileScanRDD
  只扫描上述文件(而不是整张表的所有文件)
  这是性能优化的关键:文件集合大幅缩小

Step 4:谓词下推(Predicate Pushdown)
  Hudi 自动在 Parquet 层注入过滤条件:
    _hoodie_commit_time > '20240101110000000'
  Parquet 的列统计(Column Statistics)可以进一步过滤文件内的 Row Group

Step 5:输出
  只返回 _hoodie_commit_time 在范围内的记录
  这些记录代表在时间范围内被最后一次写入的状态

性能优化关键点:增量查询的效率主要来自 Step 3 的文件列表缩小——如果过去 1 小时只有 10 个文件被写入,增量查询只需扫描这 10 个文件,而不是整张表的 10000 个文件。


第 3 章 Checkpoint 状态管理

3.1 为什么增量查询需要状态管理

增量查询本质上是一个有状态的流式消费:每次运行时,需要知道”上次处理到哪里了”(即上次处理的最大 Commit 时间戳),才能决定本次从哪个时间点开始拉取。

这个”上次处理位置”就是 Checkpoint。Checkpoint 必须被持久化到可靠的存储,否则:

  • Job 重启后不知道从哪里继续,要么重复处理(数据重复),要么跳过数据(数据丢失)

3.2 Checkpoint 存储方案

方案 1:文件系统 Checkpoint(最简单)

import json
import datetime
 
CHECKPOINT_PATH = "s3://bucket/checkpoints/trips_etl_checkpoint.json"
 
def read_checkpoint(spark, path, default_begin="20000101000000000"):
    """从文件系统读取 Checkpoint"""
    try:
        checkpoint_df = spark.read.text(path)
        return json.loads(checkpoint_df.collect()[0][0])["last_commit"]
    except Exception:
        return default_begin  # 首次运行,从最早开始
 
def write_checkpoint(spark, path, commit_time):
    """将 Checkpoint 写回文件系统"""
    checkpoint_data = json.dumps({"last_commit": commit_time,
                                  "updated_at": str(datetime.datetime.now())})
    spark.sparkContext.parallelize([checkpoint_data]) \
        .saveAsTextFile(path + "_tmp")
    # 原子替换(实际使用 hadoop fs 的 rename 操作)
    # ...
 
# 使用示例
def run_incremental_etl(spark):
    begin_time = read_checkpoint(spark, CHECKPOINT_PATH)
 
    # 增量读取
    incremental_df = spark.read.format("hudi") \
        .option("hoodie.datasource.query.type", "incremental") \
        .option("hoodie.datasource.read.begin.instanttime", begin_time) \
        .load("s3://bucket/trips/")
 
    if incremental_df.count() == 0:
        print("没有新数据,跳过")
        return
 
    # 业务处理
    processed_df = transform(incremental_df)
    processed_df.write.format("hudi") \
        .option("hoodie.datasource.write.operation", "upsert") \
        .save("s3://bucket/trips_dwd/")
 
    # 更新 Checkpoint
    latest_commit = incremental_df.agg({"_hoodie_commit_time": "max"}).collect()[0][0]
    write_checkpoint(spark, CHECKPOINT_PATH, latest_commit)

方案 2:Spark Structured Streaming 内置 Checkpoint

Hudi 支持作为 Spark Structured Streaming 的 Source,利用 Streaming 内置的 Checkpoint 机制(WAL)管理消费状态:

# Hudi 作为 Streaming Source(自动管理 Checkpoint)
streaming_df = spark.readStream.format("hudi") \
    .option("hoodie.datasource.query.type", "incremental") \
    .option("hoodie.datasource.read.begin.instanttime",
            "20240101000000000") \  # 初始起点
    .load("s3://bucket/trips/")
 
# 定义 Sink(输出到另一张 Hudi 表)
query = streaming_df.writeStream \
    .format("hudi") \
    .option("hoodie.datasource.write.operation", "upsert") \
    .option("hoodie.datasource.write.recordkey.field", "trip_id") \
    .option("hoodie.datasource.write.partitionpath.field", "date") \
    .option("hoodie.datasource.write.precombine.field", "updated_at") \
    .option("checkpointLocation", "s3://bucket/checkpoints/trips_stream/") \  # Streaming Checkpoint
    .option("path", "s3://bucket/trips_dwd/") \
    .trigger(processingTime="5 minutes") \  # 每 5 分钟触发一次
    .start()
 
query.awaitTermination()

这种方式最简洁,Spark Streaming 的 Checkpoint 机制天然保证了 exactly-once 语义(每个 Hudi Commit 对应一个 Streaming Batch)。

3.3 精确消费(Exactly-Once)的保证

在增量 ETL 管道中,exactly-once 需要读写两端联合保证:

读端(Hudi Source):
  - Timeline 的 COMPLETED 状态保证:只读完成的 Commit,不读 INFLIGHT 的数据
  - Checkpoint 保证:每次从上次成功处理的 Commit 之后开始
  - 幂等性:即使 Job 失败重试,只要 Checkpoint 未更新,会重新处理同一批数据

写端(Sink):
  - 如果 Sink 也是 Hudi 表(Upsert),则写入天然幂等(相同数据 Upsert 两次结果相同)
  - 如果 Sink 是非幂等的(如写 Kafka),需要在业务层做去重

Checkpoint 更新时机(关键!):
  - 必须在 Sink 写入成功后,才更新 Checkpoint
  - 即:先写数据,后更新 Checkpoint(At-Least-Once 语义)
  - 结合幂等写入,实现 Effectively-Exactly-Once

第 4 章 增量查询的工程实践模式

4.1 多层 Lakehouse 的增量 ETL 链路

在典型的 Lakehouse 架构中,数据从 ODS(原始层)到 DWD(明细层)到 DWS(汇总层),每一层都可以用 Hudi Incremental Query 实现增量计算:

架构图:

MySQL CDC
   ↓(Flink CDC / Debezium)
Kafka(原始变更事件)
   ↓(Spark Streaming 每分钟一次 Commit)
ODS 层(Hudi 表,MoR,按日期分区)
   ↓(Spark 增量 ETL,每 15 分钟)
DWD 层(Hudi 表,MoR,清洗后的明细数据)
   ↓(Spark 增量聚合,每小时)
DWS 层(Hudi 表,CoW,聚合指标)
   ↓(Presto/Trino 即席查询)
BI 报表

每层的数据量放大比:
  ODS → DWD:1:1(只是清洗,记录数接近)
  DWD → DWS:100:1(聚合降维)

每层增量处理的数据量:
  ODS 每15分钟增量:~100MB(相比 ODS 总量 10TB)
  DWD 每15分钟增量:~100MB
  DWS 每小时增量:~1MB(只聚合变化的维度)

4.2 增量 Join:只 Join 变化的数据

增量 ETL 中最常见的挑战是增量 Join——如何将增量的事实表数据与维表做 Join,同时考虑维表自身也在变化的情况:

def incremental_join_etl(spark, begin_time):
    # 增量事实表(只有最近变化的订单)
    incremental_orders = spark.read.format("hudi") \
        .option("hoodie.datasource.query.type", "incremental") \
        .option("hoodie.datasource.read.begin.instanttime", begin_time) \
        .load("s3://bucket/ods/orders/")
 
    # 维表(全量快照,用 Snapshot Query 读取最新状态)
    users_dim = spark.read.format("hudi") \
        .option("hoodie.datasource.query.type", "snapshot") \
        .load("s3://bucket/dim/users/")
        # 注意:这里必须用全量 Snapshot,因为维表的关联记录不一定在增量范围内
 
    # Join(增量事实表 × 全量维表)
    result = incremental_orders.join(
        users_dim,
        incremental_orders["user_id"] == users_dim["user_id"],
        "left"
    )
 
    # 写入 DWD 层(Upsert,幂等)
    result.write.format("hudi") \
        .option("hoodie.datasource.write.operation", "upsert") \
        .option("hoodie.datasource.write.recordkey.field", "order_id") \
        .save("s3://bucket/dwd/order_detail/")

维表变化的处理陷阱

如果维表也在频繁更新(如用户信息),上述方案在维表变化时不会触发事实表的重计算。 更完整的方案是:当维表有更新时,将维表的变化记录的关联事实记录也加入增量处理批次(“触发式回溯”)。这超出了 Hudi 本身的能力范围,需要在应用层实现。

4.3 增量查询与 Snapshot Query 的混合使用

实际数据管道中,通常需要混合使用两种查询模式:

def smart_incremental_read(spark, table_path, last_commit, full_refresh=False):
    """
    智能增量读取:
    - 正常情况:增量读取(只处理变化数据)
    - 首次运行或强制全量:全量 Snapshot 读取
    - 增量数据太大(超过阈值):降级为全量(防止增量 backlog 太大)
    """
    if full_refresh or last_commit is None:
        # 首次运行:全量读取
        return spark.read.format("hudi") \
            .option("hoodie.datasource.query.type", "snapshot") \
            .load(table_path), "snapshot"
 
    # 检查增量数据量
    incremental_df = spark.read.format("hudi") \
        .option("hoodie.datasource.query.type", "incremental") \
        .option("hoodie.datasource.read.begin.instanttime", last_commit) \
        .load(table_path)
 
    incremental_count = incremental_df.count()
    total_count = spark.read.format("hudi").load(table_path).count()
 
    if incremental_count > total_count * 0.3:
        # 增量超过总量 30%,降级为全量(全量反而更快)
        return spark.read.format("hudi") \
            .option("hoodie.datasource.query.type", "snapshot") \
            .load(table_path), "snapshot_fallback"
 
    return incremental_df, "incremental"

除 Spark Structured Streaming 外,Hudi 也支持作为 Flink 的 Source,实现真正的流式增量消费:

// Flink 消费 Hudi 表的增量数据
StreamTableEnvironment tableEnv = ...;
 
tableEnv.executeSql(
    "CREATE TABLE trips (" +
    "  trip_id STRING," +
    "  status STRING," +
    "  fare DOUBLE," +
    "  `_hoodie_commit_time` STRING" +
    ") WITH (" +
    "  'connector' = 'hudi'," +
    "  'path' = 's3://bucket/trips/'," +
    "  'table.type' = 'MERGE_ON_READ'," +
    // 关键:streaming.read 模式开启增量消费
    "  'read.streaming.enabled' = 'true'," +
    "  'read.start-commit' = '20240101000000000'," +
    "  'read.streaming.check-interval' = '60'" +  // 每 60 秒检查一次新 Commit
    ")"
);
 
// Flink 会持续监听 Hudi 的 Timeline,每有新 Commit 就触发增量读取
tableEnv.executeSql(
    "INSERT INTO dwd_trips " +
    "SELECT trip_id, status, fare FROM trips " +
    "WHERE status IN ('COMPLETED', 'BILLED')"
);

这种方式让 Hudi 充当了类似 Apache Kafka 的角色:作为一个持久化的、可回溯的变更日志存储,Flink 可以从任意历史时间点开始消费,且不会因为 Kafka 的 retention 限制而丢失历史数据。


小结

Hudi 的 Incremental Query 不是一个锦上添花的功能,而是 Hudi 整个架构设计的核心输出——Timeline 的三阶段提交保证了增量语义的正确性,Commit 元数据中的文件列表使得增量查询可以精确缩小扫描范围,_hoodie_commit_time 元字段使得记录级别的精确过滤成为可能。

在大规模数据湖中,将全量 ETL 改造为增量 ETL,通常是性价比最高的性能优化手段——不需要修改业务逻辑,只需要将 spark.read.format("hudi").load(path) 改为增量模式,配合 Checkpoint 状态管理,即可获得 10-100 倍的计算量降低。

最后一篇 06 Hudi vs Delta Lake vs Iceberg——架构设计的本质差异与选型 将综合本专栏所有篇章的内容,对三大数据湖方案进行全面的架构对比,给出明确的场景化选型决策框架。

思考题

  1. Hudi 的增量查询允许消费者只处理”自某个 Commit 时间点以来的所有变更记录”。如果某次增量中包含了 DELETE 操作,增量查询如何返回这些被删除的记录,使下游能做出相应的删除处理?HoodieRecord 中哪个字段标识了一条记录是”删除”?
  2. Flink 持续写入 Hudi 表时可能同时有多个 INFLIGHT Commit。如果 Spark 在有 INFLIGHT Commit 的时刻开始增量查询,它看到的是截止到上一个 COMPLETED Commit 的增量,还是包括 INFLIGHT 的部分数据?这是否会导致数据不一致的消费?
  3. Hudi 的 Bootstrap 功能允许将现有 Parquet 文件”引导”进 Hudi 管理而无需重写数据。Bootstrap 后,Bloom Filter Index 不会索引历史数据,对历史记录的 UPDATE 需要走慢路径(全分区扫描)。Bootstrap 功能在什么场景下是合理的技术选择,而不是从头重建 Hudi 表?