摘要:

Apache Paimon 最核心的流处理能力之一,是让数据湖的存储层直接充当可靠的 Changelog 来源——下游 Apache Flink 任务可以像消费 Apache Kafka 一样消费 Paimon 表的变更事件(+I-U+U-D),而不是像传统方案那样只能做全量快照查询。这个能力被称为 Changelog Producer。本文深入剖析 Changelog Producer 的两种核心实现模式:Full Compaction Producer(通过全量合并产生精确的 before/after 变更对)和 Lookup Changelog Producer(通过查找旧值产生实时 Changelog),解析两者在延迟、准确性、资源消耗和适用场景上的根本差异,以及 Paimon Changelog 如何与 Flink 的流式执行模型无缝结合,构建真正的流批一体实时数仓链路。


第 1 章 为什么数据湖需要 Changelog 能力

1.1 传统数据湖的”只读快照”困境

在 Paimon 出现之前,基于 Apache HudiApache Iceberg 构建的数据湖,其下游消费模式通常是:

消费模式 1:全量快照读取(Snapshot Query)
  每隔一段时间(如每小时),读取整张表的最新快照
  → 数据量 = 全量(即使只有 0.1% 的数据发生了变化)
  → 计算成本高,延迟高

消费模式 2:增量文件读取(Hudi Incremental Query / Iceberg Incremental Read)
  读取"上次处理以来新增/修改的文件"
  → 数据量 = 变更文件(比全量小很多)
  → 但仍然是文件粒度,无法知道"每条记录的变化方向(增/改/删)"

这两种模式都缺少一个关键信息:记录的变化方向(Change Type)

具体问题是:Hudi 增量查询返回的是”在时间范围内被最后修改的记录的当前状态”,但它不告诉你:这条记录是新增的?是从什么值更新到什么值?还是已经被删除了?

对于下游的 Flink 流计算任务,如果你需要基于”订单状态从 CREATED 变为 PAID”这个事件触发业务逻辑,增量快照文件是无法直接使用的——你还需要自己维护旧状态,与新文件做 JOIN 才能推断出变化方向。这显著增加了下游的开发复杂度和计算成本。

1.2 Kafka 的 CDC 流与数据湖的矛盾

另一种常见方案是:让 Apache Kafka 充当 Changelog 来源,Paimon/Hudi/Iceberg 只做”最终状态存储”:

架构:
  MySQL → Flink CDC → Kafka(Changelog 消息)→ 下游多路流消费
                                              ↓(同时写入)
                                    Paimon 数据湖(最终状态存储)

下游消费者:
  下游 A:从 Kafka 实时消费 Changelog,做实时指标计算
  下游 B:从 Paimon 读取最新状态,做批量报表
  下游 C:从 Kafka 消费 Changelog,更新另一张 Paimon 聚合表

这个架构的问题是:

问题 1:Kafka 的保留时间限制
  Kafka 默认保留 7 天的消息。超过 7 天的历史 Changelog 无法回溯。
  如果下游任务需要重跑历史数据(如修复 Bug),Kafka 的历史数据已经过期。
  而 Paimon 的历史 Snapshot 可以保留很久(只受存储成本限制)。

问题 2:Kafka 与数据湖的一致性
  写入 Kafka 和写入 Paimon 是两个独立操作。
  如果 Flink Job 在写 Paimon 成功、写 Kafka 失败时崩溃(或反之),
  会出现 Kafka 与 Paimon 数据不一致的问题。

问题 3:维护双写的复杂性
  需要同时维护 Kafka Topic 和 Paimon 表,数据双份存储,成本高。

Paimon 的 Changelog Producer 解决了这个矛盾:让 Paimon 本身成为 Changelog 的来源,消除对独立 Kafka CDC 流的依赖

1.3 Changelog 的四种变化类型

Paimon 遵循 Flink 的 Changelog 语义,使用四种 RowKind 表示记录的变化:

+I(INSERT):记录新增
  → {+I, order_id=1, status='CREATED', amount=99}
  → 表示 order_id=1 这条记录首次出现

-D(DELETE):记录删除
  → {-D, order_id=1, status='CREATED', amount=99}
  → 表示 order_id=1 这条记录被删除(含删除前的值)

-U(UPDATE BEFORE):更新前镜像
  → {-U, order_id=1, status='CREATED', amount=99}
  → 表示 order_id=1 的旧值(更新前)

+U(UPDATE AFTER):更新后镜像
  → {+U, order_id=1, status='PAID', amount=99}
  → 表示 order_id=1 的新值(更新后)
  → -U 和 +U 必须成对出现(先 -U 后 +U)

当 Flink 的下游算子(如 GROUP BYJOIN)消费这四种消息时,它们可以正确地维护聚合状态:

下游:统计各 status 的订单数量
  收到 {+I, status='CREATED'} → count['CREATED'] += 1
  收到 {-U, status='CREATED'} → count['CREATED'] -= 1  ← 撤回旧值
  收到 {+U, status='PAID'}    → count['PAID'] += 1      ← 加入新值
  → 最终:count['PAID']=1, count['CREATED']=0(正确!)

如果只有快照(没有 -U/+U 的撤回/更新):
  看到一条 {status='CREATED'} 记录被修改为 {status='PAID'}
  但不知道应该撤回哪个旧聚合值
  → 下游算子无法正确维护状态

第 2 章 Full Compaction Changelog Producer

2.1 设计思路:用合并过程产生 Changelog

Full Compaction Producer 的核心思想是:LSM-Tree 在做 Full Compaction 时,必然会遍历同一主键的所有历史版本(读取多层 SST 文件)——这个遍历过程包含了每条记录”旧值 → 新值”的完整信息,可以在 Compaction 过程中顺便输出 Changelog。

Full Compaction 过程中的 Changelog 生成:

待合并的 SST 文件中,key=order_1 有以下版本(按时间排序):
  Level 2(旧):{order_id=1, status='CREATED', amount=99}     ← 旧版本
  Level 0(新):{order_id=1, status='PAID',    amount=99}     ← 新版本(来自近期写入)

Full Compaction 合并时:
  读取旧版本:{order_id=1, status='CREATED', amount=99}
  读取新版本:{order_id=1, status='PAID',    amount=99}
  输出 Changelog:
    {-U, order_id=1, status='CREATED', amount=99}  ← UPDATE BEFORE
    {+U, order_id=1, status='PAID',    amount=99}  ← UPDATE AFTER
  最终 SST 只写入最新版本:{order_id=1, status='PAID', amount=99}

这些 Changelog 消息被写入专门的 Changelog 文件(存储在表目录下),下游 Flink 流任务可以读取这些文件消费变更事件。

2.2 Full Compaction Producer 的配置

CREATE TABLE dwd_orders (
    order_id   BIGINT,
    status     STRING,
    amount     DECIMAL(10,2),
    updated_at TIMESTAMP(3),
    PRIMARY KEY (order_id) NOT ENFORCED
) PARTITIONED BY (DATE_FORMAT(updated_at, 'yyyy-MM-dd'))
WITH (
    'bucket'              = '16',
    'changelog-producer'  = 'full-compaction',   -- 开启 Full Compaction Changelog
    'full-compaction.delta-commits' = '1',        -- 每隔多少次 Commit 触发一次 Full Compaction
    'changelog-producer.compaction-interval' = '2 min'  -- 最大等待 2 分钟触发 Compaction
);

2.3 Full Compaction Producer 的延迟分析

Full Compaction Producer 的数据延迟 = 最近一次 Full Compaction 完成时间,而不是写入时间。

时间线分析:
  t=0:   写入 {order_id=1, status='PAID'}(Flink 消费 Kafka)
  t=10s: MemTable Flush → Level 0 SST 生成,Snapshot 发布(数据可见,但 Changelog 未生成)
  t=2min: Full Compaction 触发(合并 L0 + L1 + L2 文件)
          → 遍历 order_id=1 的新旧版本
          → 生成 Changelog 文件
          → 发布新 Snapshot(含 Changelog 文件引用)
  t=2min: 下游 Flink 任务读取 Changelog:
          {-U, order_id=1, status='CREATED', ...}
          {+U, order_id=1, status='PAID', ...}

Changelog 延迟:约 2 分钟(Full Compaction 间隔)

这是 Full Compaction Producer 的主要缺点:Changelog 延迟与 Compaction 间隔绑定,通常在 1-5 分钟。

然而,Full Compaction Producer 有一个不可替代的优点:精确性。因为 Changelog 是在合并所有版本后生成的,即使同一条记录在短时间内被更新了 10 次,最终 Changelog 只输出 1 组 (-U, +U)(从第一个版本到最后一个版本),而不是 10 组中间状态的变更——这对下游聚合算子更友好(避免不必要的撤回重算)。

Full Compaction 与数据可见延迟的区别

注意区分两种”延迟”的概念:

  • 数据可见延迟:从写入到可被 Snapshot Query 查询的延迟 ≈ MemTable Flush 时间 ≈ 秒级
  • Changelog 延迟:从写入到 Changelog 事件被下游消费的延迟 ≈ Full Compaction 间隔 ≈ 分钟级 Full Compaction Producer 的 Changelog 延迟高,但 Snapshot 查询延迟仍然是秒级。如果下游只需要读最新状态(Lookup Join),Snapshot 延迟才是关键。

第 3 章 Lookup Changelog Producer

3.1 设计思路:写入时查找旧值

Lookup Changelog Producer 采用完全不同的策略——在写入时(而不是 Compaction 时)立刻查找该 key 的旧值,并立刻生成 Changelog:

Lookup Changelog 的写入流程:
  收到新记录:{order_id=1, status='PAID', amount=99}

  Step 1:在 MemTable 中查找 order_id=1 的旧值
    → 如果 MemTable 中有旧值:{order_id=1, status='CREATED', amount=99}
      立即生成:
        {-U, order_id=1, status='CREATED', amount=99}
        {+U, order_id=1, status='PAID', amount=99}

  Step 2:如果 MemTable 中没有旧值,查找 SST 文件(Level 0, 1, 2...)
    → 找到旧值
      生成:{-U, 旧值} + {+U, 新值}
    → 未找到(新记录)
      生成:{+I, 新值}

  Step 3:将新值写入 MemTable(覆盖旧值)

Lookup Changelog 的延迟

延迟分析:
  t=0:   写入 {order_id=1, status='PAID'}
  t=0:   Lookup 旧值(MemTable 命中或 SST 文件读取,约 1-10ms)
  t=0:   生成 Changelog,随 MemTable 数据一起缓冲
  t=10s: MemTable Flush,Changelog 文件写出
  t=10s: Snapshot 发布,下游可消费

Changelog 延迟:约 10-30 秒(与数据可见延迟相同)

Lookup Changelog Producer 的 Changelog 延迟与数据可见延迟几乎相同——几乎是实时的。

3.2 Lookup Changelog 的性能代价

Lookup Changelog 需要在每次写入时查找旧值,这个查找操作的代价是:

内存命中(MemTable 中有旧值):
  → 哈希查找,O(1),微秒级,代价极低

SST 文件查找(MemTable 中没有旧值):
  → 需要读取 SST 文件(Bloom Filter 过滤 + 二分查找)
  → 对于冷数据(旧记录在 Level 2 大文件中):
     一次 S3 GET 请求,约 10-50ms

写入吞吐量影响:
  如果大多数写入是对已有记录的更新(高更新率):
    → MemTable 命中率高 → 代价低
  如果大多数写入是新记录(高插入率):
    → 每条记录都需要 SST 查找(确认是否是新记录)→ 代价较高
    → 对于插入率 > 90% 的场景,Lookup 开销显著

对比 Full Compaction:
  Full Compaction 的查找发生在 Compaction 时(后台,不阻塞写入)
  Lookup 的查找发生在写入时(阻塞写入路径)
  → Lookup 对写入延迟有直接影响,Full Compaction 对写入延迟无影响

3.3 Lookup Changelog 的配置

CREATE TABLE dwd_orders (
    order_id   BIGINT,
    status     STRING,
    amount     DECIMAL(10,2),
    PRIMARY KEY (order_id) NOT ENFORCED
) WITH (
    'bucket'             = '16',
    'changelog-producer' = 'lookup',          -- 开启 Lookup Changelog
    'lookup.cache-rows'  = '100000',          -- 缓存最近 10 万条记录(减少 SST 查找)
    'lookup.cache-file-retention' = '1 h'    -- 缓存保留时间
);

Lookup Cache 是 Lookup Changelog Producer 的关键优化——将最近被查找过的旧值缓存在内存中,避免重复的 SST 文件读取:

Lookup Cache 的工作原理:
  首次查找 order_id=1 的旧值:
    → 读取 SST 文件(10ms)
    → 将结果存入 Cache:{order_id=1, status='CREATED', ...}

  1 秒后,order_id=1 的再次更新(如 status='SHIPPED'):
    → Cache 命中:直接返回 {status='PAID', ...}(上次写入的新值)
    → 不需要读取 SST 文件
    → 代价:微秒级

Cache 大小(lookup.cache-rows = 100000):
  缓存最近被更新的 10 万条记录
  适合:更新模式有局部性(热点记录频繁更新,冷数据很少更新)
  对于完全随机的更新模式(所有记录均匀更新),Cache 效果有限

开启 Changelog Producer 后,下游 Flink 任务可以通过 scan-mode = latest-fullscan-mode = from-snapshot 流式消费变更事件:

-- 下游 Flink SQL:实时消费 dwd_orders 的 Changelog 计算各状态订单数
 
-- Source:流式读取 Paimon dwd_orders(消费 Changelog)
CREATE TABLE orders_changelog_source (
    order_id   BIGINT,
    status     STRING,
    amount     DECIMAL(10,2)
) WITH (
    'connector'    = 'paimon',
    'path'         = 'hdfs:///data/dwd_orders',
    'scan.mode'    = 'latest-full',   -- 先读一次全量快照,再持续消费增量 Changelog
    'scan.snapshot-id' = '1'         -- 可指定起始 Snapshot(实现精确的一次语义)
);
 
-- Sink:目标聚合统计表
CREATE TABLE dws_order_status_count (
    status     STRING,
    order_count BIGINT,
    PRIMARY KEY (status) NOT ENFORCED
) WITH (
    'connector'          = 'paimon',
    'path'               = 'hdfs:///data/dws_order_status_count',
    'merge-engine'       = 'aggregation',
    'fields.order_count.aggregate-function' = 'sum'  -- Paimon AggregateMerge
);
 
-- 流式 SQL 逻辑:统计各状态订单数
INSERT INTO dws_order_status_count
SELECT status, COUNT(*) AS order_count
FROM orders_changelog_source
GROUP BY status;
-- GROUP BY 在 Flink 中是有状态的流式聚合,消费 Changelog 事件正确维护状态

4.2 流批一体的 Changelog 消费场景

Paimon 的 Changelog 使得”流批一体”从理论变为实际:

场景:构建实时数仓的 DWD → DWS 链路

Step 1:MySQL CDC → Flink CDC → Paimon DWD 层(主键表,Full Compaction Changelog)
  写入:Flink CDC 消费 MySQL Binlog,Upsert 写入 Paimon 主键表
  输出:每 1 分钟(Full Compaction 间隔)生成一批 Changelog 文件

Step 2:下游 Flink Job 流读 Paimon DWD 层的 Changelog → 写入 DWS 聚合层
  读取:Flink 流式消费 DWD 的 Changelog(-U/+U 事件)
  计算:GROUP BY 维度(如按 status、按 region)聚合
  写入:Upsert 到 Paimon DWS 主键表(AggregateMerge)

整体延迟:
  MySQL 变更 → DWS 查询可见 ≈ Full Compaction 间隔 ≈ 1-2 分钟
  (DWD 层数据写入 + DWD Changelog 生成 + DWS 消费和写入)

这个链路中,Paimon 完全替代了 Kafka 在变更事件传递上的角色——Paimon 既是存储(持久化、可查询),也是流(可订阅的 Changelog 来源)。

4.3 两种 Producer 的选型建议

维度Full Compaction ProducerLookup Changelog Producer
Changelog 延迟分钟级(Compaction 间隔)秒级(几乎实时)
Changelog 精确性高(只输出最终变化,跳过中间状态)中(每次写入都产生 Changelog,含中间状态)
写入吞吐影响无(Changelog 在后台 Compaction 生成)有(每次写入需要 Lookup 旧值)
适用写入模式任意(特别适合批量写入)高更新率(热点记录频繁更新,Cache 命中率高)
资源消耗Compaction 时 CPU/IO 高峰写入时分散的 IO(Lookup 开销)
适合场景CDC Upsert + 下游不需要每次中间变更实时维表、需要低延迟 Changelog 的流水线

小结

Paimon 的 Changelog Producer 是它在流批一体架构中的核心差异化能力:

  • Full Compaction Producer:以分钟级延迟换取 Changelog 精确性和写入零额外开销,适合对延迟不那么敏感但需要准确变更语义的 CDC 链路
  • Lookup Changelog Producer:以写入时的 Lookup 开销换取秒级 Changelog 延迟,适合实时性要求高的场景,搭配 Lookup Cache 可以大幅降低 SST 读取频率

两者都让 Paimon 成为一个”既能存储、又能流式消费”的数据湖存储——这是 Apache Hudi 的 Incremental Query 和 Apache Iceberg 的 Incremental Read 都无法完全覆盖的能力。

下一篇 05 Lookup Join 与维表——Paimon 在流计算中的实时维表查询 将聚焦 Paimon 作为 Flink Lookup Join 的维表来源,解析 Paimon 如何通过 LSM-Tree 的局部查找能力和 Lookup Cache 机制,实现比 HBase/Redis 更低运维复杂度的实时维表查询。

思考题

  1. Paimon 的 Changelog Producer 将数据湖的写入操作(Upsert)转化为标准的 CDC 变更流(+I、-U、+U、-D),使下游 Flink 作业能够增量消费变更,而不是全量读取快照。这个 Changelog 是”物化的”(写入时就记录变更,存储在专用的 Changelog 文件中)还是”计算的”(查询时通过比较两个快照的差异来推导变更)?两种实现的延迟和存储代价有什么差异?
  2. Full Compaction Changelog Producer 通过在每次 Full Compaction 时对比前后两个版本的 SST 文件来生成 Changelog。这意味着 Changelog 的生成频率等于 Full Compaction 的频率,而不是写入频率。如果 Full Compaction 的间隔是 10 分钟,下游消费 Changelog 的最小延迟就是 10 分钟。如何通过调整 Compaction 策略来降低 Changelog 的端到端延迟?
  3. Paimon 的 Changelog 消费是幂等的吗?如果下游 Flink 作业从 Changelog 消费中途重启,它能从上次消费的位置(Changelog 的 Offset)重新开始,而不会重复处理已经处理过的变更吗?Paimon 的 Changelog Consumer 如何管理消费进度(类似 Kafka 的 Consumer Group Offset)?