摘要:
Apache Hudi 的 Incremental Query(增量查询) 是它区别于所有竞争对手的核心能力,也是”Hudi 是增量处理引擎”这一定位的最直接体现。它让下游 Spark 任务可以精确查询”自上次处理以来,哪些记录发生了变化”——而不是每次都扫描整张表。这一能力将全量 ETL 的算力消耗从”扫描全量 10TB 数据”缩减为”只处理过去 1 小时变化的 500MB 数据”,在大规模数据管道中通常带来 10-100 倍的计算量减少。本文深入剖析增量查询的实现机制、Checkpoint 状态管理、与 Spark Structured Streaming 的集成,以及在实际 CDC 增量 ETL 管道中的完整工程实践。
第 1 章 增量消费的本质需求
1.1 全量 ETL 的不可持续性
传统数据湖 ETL 管道的典型模式是全量覆盖:每天(或每小时)将整张业务表全量同步到数据湖,下游各层加工也是全量重算。这在数据量小的时候无关紧要,但随着数据规模增长,全量模式的代价变得不可持续:
案例:电商平台的订单表
- 历史总记录:100 亿条(约 10TB)
- 每小时新增/更新:500 万条(约 500MB)
- 更新比例:只有 0.05% 的数据在一小时内发生了变化
全量 ETL(每小时运行):
- 每次需要读取:10TB 数据
- 每次计算量:10TB 的扫描 + 全量 JOIN/聚合
- 每小时 Spark 集群成本:~$500(100 个节点 × 1 小时)
- 数据新鲜度延迟:处理 10TB 数据需要 45-60 分钟
增量 ETL(每小时运行,基于 Hudi Incremental Query):
- 每次需要读取:500MB(只有变化的数据)
- 每次计算量:500MB 的扫描 + 增量 JOIN/聚合
- 每小时 Spark 集群成本:~$5(相当于 1/100 的成本)
- 数据新鲜度延迟:处理 500MB 数据需要 2-3 分钟
这就是增量处理的核心价值:只处理变化的数据,计算量与变化量成正比,而不是与历史总量成正比。
1.2 为什么 Delta Lake 的 CDF 不是同等能力
Delta Lake 在 1.x 版本后也引入了 Change Data Feed(CDF),表面上提供类似的增量消费能力。但两者有几个重要差异:
差异 1:设计时间点不同
Hudi 的 Incremental Query 是从设计之初就内置的能力(2016 年),整个文件布局(Timeline、Commit 元数据中的文件级统计)都为增量查询设计。Delta Lake 的 CDF 是 2022 年才正式 GA 的后加功能,需要在建表时显式开启(delta.enableChangeDataFeed = true),且会额外写入 _change_data 目录存储变更日志——这意味着额外的存储开销。
差异 2:语义的细微差别
Hudi Incremental Query 返回的是:在时间范围 [T1, T2] 内,每条记录最新的状态(如果一条记录在这段时间内被更新了 3 次,只返回第 3 次的结果)。
Delta Lake CDF 返回的是:所有变更操作的明细,包括 insert、update_preimage、update_postimage、delete(类似 binlog 的行级变更流水)。
差异 3:对现有数据的影响
Hudi Incremental Query 对已有数据无任何影响,查询时通过扫描 Timeline 和过滤 _hoodie_commit_time 字段实现。
Delta Lake CDF 需要从开启 CDF 的那个版本开始才有变更日志,且关闭 CDF 会丢失历史变更记录。
两种增量消费语义的使用场景
如果下游需要的是”某时间段内每条记录的最新快照”(如:每小时刷新 DWD 层)→ Hudi Incremental Query 更自然 如果下游需要的是”完整的变更流水”(如:实时数仓的 SCD-2、CDC 审计日志)→ Delta Lake CDF 更合适
第 2 章 增量查询的技术实现
2.1 三个内置元字段
Hudi 在每条记录写入时,自动注入三个元字段,这些字段是增量查询的物理基础:
_hoodie_commit_time : 该记录最后一次被写入(INSERT 或 UPDATE)的 Commit 时间戳
例:20240101120000000
作用:增量查询的过滤条件
_hoodie_record_key : 记录的 recordKey(主键值)
例:trip_id=12345
作用:Index Lookup 和记录去重
_hoodie_partition_path: 记录所在的分区路径
例:date=2024-01-01
作用:分区路由
当你执行 SELECT * FROM trips,会自动看到这三个字段加上你的业务字段。
2.2 增量查询的 SQL 语法
Spark SQL 方式:
-- Step 1:注册表(指定为增量查询模式)
CREATE TABLE trips_incremental
USING hudi
OPTIONS (
path = 's3://bucket/trips/',
queryType = 'incremental', -- 关键:声明增量查询模式
beginInstant = '20240101110000000', -- 从这个 Commit 时间点之后开始(不含)
endInstant = '20240101120000000' -- 到这个 Commit 时间点(含),可省略(省略则到最新)
);
-- Step 2:查询(无需额外条件,Hudi 内部自动注入 _hoodie_commit_time 过滤)
SELECT trip_id, status, fare, _hoodie_commit_time
FROM trips_incremental;
-- 输出:只包含在 11:00 ~ 12:00 之间被写入的记录(最新版本)Spark DataFrame API 方式(更常用于程序化场景):
# 读取增量数据(从上次 Checkpoint 之后)
last_checkpoint = read_checkpoint_from_state_store() # 从状态存储读取上次处理的位置
incremental_df = spark.read.format("hudi") \
.option("hoodie.datasource.query.type", "incremental") \
.option("hoodie.datasource.read.begin.instanttime", last_checkpoint) \
# 可选:指定结束时间(不指定则读到最新)
# .option("hoodie.datasource.read.end.instanttime", "20240101130000000") \
.load("s3://bucket/trips/")
# incremental_df 只包含 last_checkpoint 之后被写入的记录
print(f"增量记录数: {incremental_df.count()}")
print(f"时间范围: {incremental_df.agg({'_hoodie_commit_time': 'min'}).collect()[0][0]}"
f" ~ {incremental_df.agg({'_hoodie_commit_time': 'max'}).collect()[0][0]}")
# 处理完成后,更新 Checkpoint
latest_commit = incremental_df.agg({"_hoodie_commit_time": "max"}).collect()[0][0]
write_checkpoint_to_state_store(latest_commit)2.3 增量查询的内部执行逻辑
理解增量查询如何工作,需要深入到 Spark 的物理执行层:
用户执行:
spark.read.format("hudi")
.option("queryType", "incremental")
.option("beginInstant", "20240101110000000")
.load("s3://bucket/trips/")
Hudi 内部执行步骤:
Step 1:Timeline 扫描
读取 s3://bucket/trips/.hoodie/ 目录
找到所有 timestamp > "20240101110000000" 的 COMPLETED Commit/DeltaCommit
Step 2:文件列表构建
从每个相关 Commit 的元数据中,提取写入的文件列表
对 MoR 表:包含 Parquet 基础文件和 .log 文件
对 CoW 表:只包含 Parquet 文件
Step 3:构建 Spark 的 FileScanRDD
只扫描上述文件(而不是整张表的所有文件)
这是性能优化的关键:文件集合大幅缩小
Step 4:谓词下推(Predicate Pushdown)
Hudi 自动在 Parquet 层注入过滤条件:
_hoodie_commit_time > '20240101110000000'
Parquet 的列统计(Column Statistics)可以进一步过滤文件内的 Row Group
Step 5:输出
只返回 _hoodie_commit_time 在范围内的记录
这些记录代表在时间范围内被最后一次写入的状态
性能优化关键点:增量查询的效率主要来自 Step 3 的文件列表缩小——如果过去 1 小时只有 10 个文件被写入,增量查询只需扫描这 10 个文件,而不是整张表的 10000 个文件。
第 3 章 Checkpoint 状态管理
3.1 为什么增量查询需要状态管理
增量查询本质上是一个有状态的流式消费:每次运行时,需要知道”上次处理到哪里了”(即上次处理的最大 Commit 时间戳),才能决定本次从哪个时间点开始拉取。
这个”上次处理位置”就是 Checkpoint。Checkpoint 必须被持久化到可靠的存储,否则:
- Job 重启后不知道从哪里继续,要么重复处理(数据重复),要么跳过数据(数据丢失)
3.2 Checkpoint 存储方案
方案 1:文件系统 Checkpoint(最简单)
import json
import datetime
CHECKPOINT_PATH = "s3://bucket/checkpoints/trips_etl_checkpoint.json"
def read_checkpoint(spark, path, default_begin="20000101000000000"):
"""从文件系统读取 Checkpoint"""
try:
checkpoint_df = spark.read.text(path)
return json.loads(checkpoint_df.collect()[0][0])["last_commit"]
except Exception:
return default_begin # 首次运行,从最早开始
def write_checkpoint(spark, path, commit_time):
"""将 Checkpoint 写回文件系统"""
checkpoint_data = json.dumps({"last_commit": commit_time,
"updated_at": str(datetime.datetime.now())})
spark.sparkContext.parallelize([checkpoint_data]) \
.saveAsTextFile(path + "_tmp")
# 原子替换(实际使用 hadoop fs 的 rename 操作)
# ...
# 使用示例
def run_incremental_etl(spark):
begin_time = read_checkpoint(spark, CHECKPOINT_PATH)
# 增量读取
incremental_df = spark.read.format("hudi") \
.option("hoodie.datasource.query.type", "incremental") \
.option("hoodie.datasource.read.begin.instanttime", begin_time) \
.load("s3://bucket/trips/")
if incremental_df.count() == 0:
print("没有新数据,跳过")
return
# 业务处理
processed_df = transform(incremental_df)
processed_df.write.format("hudi") \
.option("hoodie.datasource.write.operation", "upsert") \
.save("s3://bucket/trips_dwd/")
# 更新 Checkpoint
latest_commit = incremental_df.agg({"_hoodie_commit_time": "max"}).collect()[0][0]
write_checkpoint(spark, CHECKPOINT_PATH, latest_commit)方案 2:Spark Structured Streaming 内置 Checkpoint
Hudi 支持作为 Spark Structured Streaming 的 Source,利用 Streaming 内置的 Checkpoint 机制(WAL)管理消费状态:
# Hudi 作为 Streaming Source(自动管理 Checkpoint)
streaming_df = spark.readStream.format("hudi") \
.option("hoodie.datasource.query.type", "incremental") \
.option("hoodie.datasource.read.begin.instanttime",
"20240101000000000") \ # 初始起点
.load("s3://bucket/trips/")
# 定义 Sink(输出到另一张 Hudi 表)
query = streaming_df.writeStream \
.format("hudi") \
.option("hoodie.datasource.write.operation", "upsert") \
.option("hoodie.datasource.write.recordkey.field", "trip_id") \
.option("hoodie.datasource.write.partitionpath.field", "date") \
.option("hoodie.datasource.write.precombine.field", "updated_at") \
.option("checkpointLocation", "s3://bucket/checkpoints/trips_stream/") \ # Streaming Checkpoint
.option("path", "s3://bucket/trips_dwd/") \
.trigger(processingTime="5 minutes") \ # 每 5 分钟触发一次
.start()
query.awaitTermination()这种方式最简洁,Spark Streaming 的 Checkpoint 机制天然保证了 exactly-once 语义(每个 Hudi Commit 对应一个 Streaming Batch)。
3.3 精确消费(Exactly-Once)的保证
在增量 ETL 管道中,exactly-once 需要读写两端联合保证:
读端(Hudi Source):
- Timeline 的 COMPLETED 状态保证:只读完成的 Commit,不读 INFLIGHT 的数据
- Checkpoint 保证:每次从上次成功处理的 Commit 之后开始
- 幂等性:即使 Job 失败重试,只要 Checkpoint 未更新,会重新处理同一批数据
写端(Sink):
- 如果 Sink 也是 Hudi 表(Upsert),则写入天然幂等(相同数据 Upsert 两次结果相同)
- 如果 Sink 是非幂等的(如写 Kafka),需要在业务层做去重
Checkpoint 更新时机(关键!):
- 必须在 Sink 写入成功后,才更新 Checkpoint
- 即:先写数据,后更新 Checkpoint(At-Least-Once 语义)
- 结合幂等写入,实现 Effectively-Exactly-Once
第 4 章 增量查询的工程实践模式
4.1 多层 Lakehouse 的增量 ETL 链路
在典型的 Lakehouse 架构中,数据从 ODS(原始层)到 DWD(明细层)到 DWS(汇总层),每一层都可以用 Hudi Incremental Query 实现增量计算:
架构图:
MySQL CDC
↓(Flink CDC / Debezium)
Kafka(原始变更事件)
↓(Spark Streaming 每分钟一次 Commit)
ODS 层(Hudi 表,MoR,按日期分区)
↓(Spark 增量 ETL,每 15 分钟)
DWD 层(Hudi 表,MoR,清洗后的明细数据)
↓(Spark 增量聚合,每小时)
DWS 层(Hudi 表,CoW,聚合指标)
↓(Presto/Trino 即席查询)
BI 报表
每层的数据量放大比:
ODS → DWD:1:1(只是清洗,记录数接近)
DWD → DWS:100:1(聚合降维)
每层增量处理的数据量:
ODS 每15分钟增量:~100MB(相比 ODS 总量 10TB)
DWD 每15分钟增量:~100MB
DWS 每小时增量:~1MB(只聚合变化的维度)
4.2 增量 Join:只 Join 变化的数据
增量 ETL 中最常见的挑战是增量 Join——如何将增量的事实表数据与维表做 Join,同时考虑维表自身也在变化的情况:
def incremental_join_etl(spark, begin_time):
# 增量事实表(只有最近变化的订单)
incremental_orders = spark.read.format("hudi") \
.option("hoodie.datasource.query.type", "incremental") \
.option("hoodie.datasource.read.begin.instanttime", begin_time) \
.load("s3://bucket/ods/orders/")
# 维表(全量快照,用 Snapshot Query 读取最新状态)
users_dim = spark.read.format("hudi") \
.option("hoodie.datasource.query.type", "snapshot") \
.load("s3://bucket/dim/users/")
# 注意:这里必须用全量 Snapshot,因为维表的关联记录不一定在增量范围内
# Join(增量事实表 × 全量维表)
result = incremental_orders.join(
users_dim,
incremental_orders["user_id"] == users_dim["user_id"],
"left"
)
# 写入 DWD 层(Upsert,幂等)
result.write.format("hudi") \
.option("hoodie.datasource.write.operation", "upsert") \
.option("hoodie.datasource.write.recordkey.field", "order_id") \
.save("s3://bucket/dwd/order_detail/")维表变化的处理陷阱
如果维表也在频繁更新(如用户信息),上述方案在维表变化时不会触发事实表的重计算。 更完整的方案是:当维表有更新时,将维表的变化记录的关联事实记录也加入增量处理批次(“触发式回溯”)。这超出了 Hudi 本身的能力范围,需要在应用层实现。
4.3 增量查询与 Snapshot Query 的混合使用
实际数据管道中,通常需要混合使用两种查询模式:
def smart_incremental_read(spark, table_path, last_commit, full_refresh=False):
"""
智能增量读取:
- 正常情况:增量读取(只处理变化数据)
- 首次运行或强制全量:全量 Snapshot 读取
- 增量数据太大(超过阈值):降级为全量(防止增量 backlog 太大)
"""
if full_refresh or last_commit is None:
# 首次运行:全量读取
return spark.read.format("hudi") \
.option("hoodie.datasource.query.type", "snapshot") \
.load(table_path), "snapshot"
# 检查增量数据量
incremental_df = spark.read.format("hudi") \
.option("hoodie.datasource.query.type", "incremental") \
.option("hoodie.datasource.read.begin.instanttime", last_commit) \
.load(table_path)
incremental_count = incremental_df.count()
total_count = spark.read.format("hudi").load(table_path).count()
if incremental_count > total_count * 0.3:
# 增量超过总量 30%,降级为全量(全量反而更快)
return spark.read.format("hudi") \
.option("hoodie.datasource.query.type", "snapshot") \
.load(table_path), "snapshot_fallback"
return incremental_df, "incremental"第 5 章 与 Flink 的增量集成
5.1 Hudi 作为 Flink Source
除 Spark Structured Streaming 外,Hudi 也支持作为 Flink 的 Source,实现真正的流式增量消费:
// Flink 消费 Hudi 表的增量数据
StreamTableEnvironment tableEnv = ...;
tableEnv.executeSql(
"CREATE TABLE trips (" +
" trip_id STRING," +
" status STRING," +
" fare DOUBLE," +
" `_hoodie_commit_time` STRING" +
") WITH (" +
" 'connector' = 'hudi'," +
" 'path' = 's3://bucket/trips/'," +
" 'table.type' = 'MERGE_ON_READ'," +
// 关键:streaming.read 模式开启增量消费
" 'read.streaming.enabled' = 'true'," +
" 'read.start-commit' = '20240101000000000'," +
" 'read.streaming.check-interval' = '60'" + // 每 60 秒检查一次新 Commit
")"
);
// Flink 会持续监听 Hudi 的 Timeline,每有新 Commit 就触发增量读取
tableEnv.executeSql(
"INSERT INTO dwd_trips " +
"SELECT trip_id, status, fare FROM trips " +
"WHERE status IN ('COMPLETED', 'BILLED')"
);这种方式让 Hudi 充当了类似 Apache Kafka 的角色:作为一个持久化的、可回溯的变更日志存储,Flink 可以从任意历史时间点开始消费,且不会因为 Kafka 的 retention 限制而丢失历史数据。
小结
Hudi 的 Incremental Query 不是一个锦上添花的功能,而是 Hudi 整个架构设计的核心输出——Timeline 的三阶段提交保证了增量语义的正确性,Commit 元数据中的文件列表使得增量查询可以精确缩小扫描范围,_hoodie_commit_time 元字段使得记录级别的精确过滤成为可能。
在大规模数据湖中,将全量 ETL 改造为增量 ETL,通常是性价比最高的性能优化手段——不需要修改业务逻辑,只需要将 spark.read.format("hudi").load(path) 改为增量模式,配合 Checkpoint 状态管理,即可获得 10-100 倍的计算量降低。
最后一篇 06 Hudi vs Delta Lake vs Iceberg——架构设计的本质差异与选型 将综合本专栏所有篇章的内容,对三大数据湖方案进行全面的架构对比,给出明确的场景化选型决策框架。
思考题
- Hudi 的增量查询允许消费者只处理”自某个 Commit 时间点以来的所有变更记录”。如果某次增量中包含了
DELETE操作,增量查询如何返回这些被删除的记录,使下游能做出相应的删除处理?HoodieRecord中哪个字段标识了一条记录是”删除”?- Flink 持续写入 Hudi 表时可能同时有多个 INFLIGHT Commit。如果 Spark 在有 INFLIGHT Commit 的时刻开始增量查询,它看到的是截止到上一个 COMPLETED Commit 的增量,还是包括 INFLIGHT 的部分数据?这是否会导致数据不一致的消费?
- Hudi 的 Bootstrap 功能允许将现有 Parquet 文件”引导”进 Hudi 管理而无需重写数据。Bootstrap 后,Bloom Filter Index 不会索引历史数据,对历史记录的 UPDATE 需要走慢路径(全分区扫描)。Bootstrap 功能在什么场景下是合理的技术选择,而不是从头重建 Hudi 表?