摘要:

Apache Paimon 将所有表分为两种基本类型:Primary Key Table(主键表)Append-Only Table(追加表)。这个分类不是表面上的”有没有主键”,而是两套完全不同的存储引擎逻辑——主键表基于 LSM-Tree 实现按主键的 Upsert 语义,追加表则退回到与 Apache Iceberg 类似的不可变文件追加模型。理解为什么 Paimon 要维护这两套模型,以及每种模型在写入路径、文件组织、查询行为和 Compaction 策略上的根本差异,是正确使用 Paimon 的基础。本文还深入解析主键表的 MergeFunction(合并函数) 机制——这是 Paimon 最独特的工程能力之一,允许相同主键的多条记录在 Compaction 时按自定义逻辑合并(如 SUM 聚合、LAST_NON_NULL 填充),从而在存储层实现实时 OLAP 指标的维护,避免大量重复的下游聚合计算。


第 1 章 为什么需要两种表类型

1.1 主键表的诞生背景:CDC Upsert 场景

主键表是 Paimon 的核心设计,直接对应 Apache Flink CDC 场景的需求:

典型场景:MySQL 订单表的 CDC 同步

MySQL 订单表(在线业务):
  INSERT INTO orders ...  ← 新订单
  UPDATE orders SET status='PAID' WHERE order_id=12345  ← 支付
  UPDATE orders SET status='SHIPPED' ...  ← 发货
  UPDATE orders SET status='COMPLETED' ...  ← 完成

Flink CDC 捕获变更 → Kafka 变更事件流:
  {op: '+I', order_id: 12345, status: 'CREATED', amount: 99.9}
  {op: '-U', order_id: 12345, status: 'CREATED', ...}  ← 更新前镜像
  {op: '+U', order_id: 12345, status: 'PAID', ...}      ← 更新后镜像
  {op: '-U', order_id: 12345, status: 'PAID', ...}
  {op: '+U', order_id: 12345, status: 'SHIPPED', ...}
  ...

写入 Paimon 主键表的需求:
  按 order_id 维护订单的最新状态(最新一次 +U 或 +I)
  旧版本记录不保留(Upsert 语义)
  支持查询任意时刻的订单快照(Time Travel)
  支持下游流读取变更事件(Changelog)

这个场景要求存储层:有主键(order_id)、按主键 Upsert(覆盖旧版本)、产生可消费的 Changelog(供下游 Flink 任务继续处理)。LSM-Tree 是自然的选择。

1.2 追加表的诞生背景:事件日志场景

追加表对应完全不同的场景:

典型场景:用户行为事件流的存储

用户行为事件(无主键,只追加):
  {user_id: 1001, action: 'click', page: '/home', ts: 2024-01-01T12:00:00}
  {user_id: 1002, action: 'view',  page: '/item/123', ts: 2024-01-01T12:00:01}
  {user_id: 1001, action: 'add_cart', item: 456, ts: 2024-01-01T12:00:05}
  ...

特点:
  每条记录都是独立事件,没有"更新旧记录"的概念
  数据量巨大(每秒百万级事件)
  查询模式:范围扫描(最近 1 小时的事件)、聚合统计
  不需要 Upsert,不需要 Changelog

这个场景使用 LSM-Tree 的代价:
  LSM-Tree 的 MemTable 需要按主键排序 → 事件流没有主键,排序无意义
  Compaction 需要合并旧版本 → 追加流没有旧版本,Compaction 变成纯文件合并
  → 使用 LSM-Tree 引入了不必要的 CPU 开销(排序 + Compaction 的复杂性)

对于追加场景,Paimon 提供了更简单的追加表模型——直接以 Iceberg/Hudi 风格的不可变文件追加,不需要 LSM-Tree 的开销。

1.3 两种表的核心差异总览

维度Primary Key Table(主键表)Append-Only Table(追加表)
存储模型LSM-Tree(MemTable + 分层 SST)不可变 Parquet/ORC 文件追加
写入语义Upsert(相同 key 覆盖)Append(只追加,不更新)
主键必须定义(NOT ENFORCED)无主键
Compaction必须(维护 LSM 层级 + 去除旧版本)可选(只是文件大小优化)
Changelog支持(Full Compaction / Lookup 模式)不支持(无更新事件)
查询性能需合并多层 SST(读放大)直接扫描不可变文件(无读放大)
Bucket 分区强制(按主键哈希路由)可选(Fixed Bucket 或 Dynamic Bucket)

第 2 章 主键表的写入路径深度解析

2.1 Bucket:主键路由的基本单元

在讨论写入路径之前,需要理解 Bucket(桶) 的概念。Paimon 的主键表中,每个分区(Partition)下进一步细分为若干 Bucket,按主键哈希路由:

表:events,按 DATE(event_ts) 分区,16 个 Bucket

写入 event_id='ev-12345' 的记录:
  1. 计算分区:DATE(2024-01-01T12:00:00) = '2024-01-01'
     → 文件路径前缀:partition=2024-01-01/

  2. 计算 Bucket:hash('ev-12345') % 16 = 7
     → 文件路径前缀:partition=2024-01-01/bucket-7/

  3. 写入 partition=2024-01-01/bucket-7/ 下的 MemTable

关键约束:
  相同主键(event_id)的所有版本,必须在同一个 Bucket 中
  这保证了 Compaction 时不需要跨 Bucket 合并(同一 key 只在一个地方)
  也保证了读取时只需要访问一个 Bucket 的文件(不需要全局扫描)

Bucket 数量的选择

Bucket 数 = 并行度 × 数据均衡系数

太少(如 1-4 个 Bucket):
  → 写入瓶颈集中在少量 Flink Task(每个 Task 负责一个 Bucket)
  → 数据热点(某些 Bucket 的数据量远大于其他)

太多(如 256+ 个 Bucket):
  → 每个 Bucket 的文件很小(SST 文件 < 10MB)
  → 大量小文件在 S3 上的管理开销增大
  → 查询时需要并发打开过多文件

经验法则:
  Bucket 数 ≈ 预期分区数据量(GB)× 8 / 128
  (目标是每个 Bucket 的 Level 2 文件约 128MB)

例:每个日期分区预期 32GB 数据
  Bucket 数 = 32 × 8 / 128 ≈ 2 → 取最近 2 的幂次 = 4

2.2 写入 MemTable 的并发控制

每个 Flink Task 实例负责若干个 Bucket 的写入。Paimon 通过 sequence number(序号) 机制保证同一主键的多个版本有全局排序:

sequence number 的来源:
  Flink 任务级别的单调递增计数器(在 Flink Operator State 中维护)
  每写入一条记录,sequence number +1

写入 MemTable 时的合并逻辑:
  如果 MemTable 中已有相同 key 的记录(相同 Checkpoint 周期内的重复 key):
    → 比较 sequence number
    → 保留 sequence number 更大的(更新的)那条
    → 丢弃旧的(MergeFunction:RetainLatest)

  如果 MemTable 中没有相同 key 的记录:
    → 直接写入(新 key 或来自旧 SST 文件的 key 不在内存中)

这保证了 MemTable 中每个 key 只有一个版本(最新版本)

2.3 主键表的完整写入流程

完整写入流程(Flink 流写主键表):

t=0: Flink 消费 Kafka 事件
  → 对每条记录,计算 Partition + Bucket
  → 路由到对应 Flink Task(按 Bucket 的 KeyBy)

t=0~N: 每条记录写入对应 Bucket 的 MemTable
  → MemTable 中按主键有序(SortBuffer 维护)
  → 相同 key 的新版本覆盖旧版本(in-memory Upsert)

t=N(MemTable 满 256MB):
  → 触发 MemTable Flush
  → 将 MemTable 按主键排序,写出 Level 0 SST 文件
  → 更新 Snapshot 元数据(数据对读者可见,延迟 = MemTable 刷写时间 ≈ 秒级)

t=M(后台 Compaction 触发):
  → 将多个 Level 0 SST 与 Level 1 SST 合并
  → 去除旧版本记录,提升文件层级
  → 更新 Snapshot 元数据

t=Checkpoint(Flink Checkpoint 触发,每 5 分钟):
  → 所有 MemTable 强制 Flush
  → Flink Operator State 持久化(记录 sequence number + 当前 Snapshot ID)
  → Commit:将本次 Checkpoint 内的所有文件变更原子提交到 Snapshot

关系:
  数据可见 ≠ Checkpoint(每次 MemTable Flush 即可发布 Snapshot,无需等 Checkpoint)
  数据持久 = Checkpoint(Checkpoint 后才保证崩溃恢复时不丢)
  Exactly-once = Checkpoint 保证(两次 Checkpoint 之间的数据在崩溃时会重放)

第 3 章 MergeFunction:主键表最独特的工程能力

3.1 MergeFunction 是什么,解决什么问题

MergeFunction(合并函数) 是 Paimon 主键表中,当同一主键存在多条记录时,决定”最终保留哪些字段值”的逻辑。

默认情况下,MergeFunction 是 DeduplicateMerge——相同 key 只保留最新版本(Upsert 语义)。但 Paimon 还提供了多种高级 MergeFunction,允许对不同列应用不同的聚合逻辑:

场景:实时 UV(独立访客数)统计表

设计:
  primary key = (date, page_id)  ← 按日期和页面统计
  visit_count: BIGINT            ← 每次有访问就 +1(累加,不是覆盖)
  last_visit_ts: TIMESTAMP       ← 最后一次访问时间(取最大值)
  page_name: STRING              ← 页面名称(取最新值,防止 NULL 覆盖)

如果用 DeduplicateMerge(默认):
  {date=2024-01-01, page_id=100, visit_count=1, last_visit_ts=12:00, page_name='首页'}
  {date=2024-01-01, page_id=100, visit_count=1, last_visit_ts=12:01, page_name=NULL}
  → 结果:{visit_count=1, last_visit_ts=12:01, page_name=NULL}  ← 新记录覆盖旧记录,丢失 page_name!

如果用 PartialUpdateMerge:
  → 结果:{visit_count=1, last_visit_ts=12:01, page_name='首页'}  ← 只覆盖非 NULL 字段
  还是错的!visit_count 应该累加,而不是取最新值(最新值=1,不是总计数)

如果用 AggregateMerge(不同列不同聚合函数):
  visit_count: SUM           ← 累加
  last_visit_ts: MAX         ← 取最大值
  page_name: LAST_NON_NULL   ← 取最新的非 NULL 值
  → 结果:{visit_count=2, last_visit_ts=12:01, page_name='首页'}  ← 正确!

3.2 四种内置 MergeFunction

MergeFunction 1:DeduplicateMerge(去重,默认)

-- 相同 key 只保留最新版本(sequence number 最大的)
CREATE TABLE orders (
    order_id BIGINT,
    status STRING,
    updated_at TIMESTAMP(3),
    PRIMARY KEY (order_id) NOT ENFORCED
) WITH (
    'merge-engine' = 'deduplicate'  -- 默认值,可省略
);
 
-- 行为:
--   INSERT {order_id=1, status='CREATED', updated_at=10:00}
--   INSERT {order_id=1, status='PAID',    updated_at=10:01}
--   → 最终保留:{order_id=1, status='PAID', updated_at=10:01}

MergeFunction 2:PartialUpdateMerge(部分更新)

-- 只更新非 NULL 字段(允许多个上游流分别更新不同字段)
CREATE TABLE user_profile (
    user_id BIGINT,
    name STRING,             -- 用户基础信息服务更新
    avatar_url STRING,       -- 用户头像服务更新
    vip_level INT,           -- 会员服务更新
    PRIMARY KEY (user_id) NOT ENFORCED
) WITH (
    'merge-engine' = 'partial-update'
);
 
-- 场景:三个上游流分别更新用户资料的不同字段
-- 流 A(基础信息):INSERT {user_id=1, name='张三', avatar_url=NULL, vip_level=NULL}
-- 流 B(头像):    INSERT {user_id=1, name=NULL, avatar_url='/img/123.jpg', vip_level=NULL}
-- 流 C(会员):    INSERT {user_id=1, name=NULL, avatar_url=NULL, vip_level=2}
-- → Paimon 合并结果:{user_id=1, name='张三', avatar_url='/img/123.jpg', vip_level=2}
 
-- 这是典型的"宽表拼接"场景:
-- 多个维度的信息从不同数据源流入,最终汇聚到一张宽表

Partial Update 的典型应用:实时宽表

在 Flink 流计算中构建实时宽表(如 DWD 层的用户行为宽表)时,传统做法需要多路流 JOIN(Flink 的 Regular Join 或 Temporal Join)。这些 JOIN 在 Flink State 中维护双边数据,State 大小随数据量增长。 Paimon 的 Partial Update 提供了另一种思路:各路流独立写入宽表的对应字段(其他字段传 NULL),Paimon 在存储层完成字段拼接——这避免了 Flink 大状态 JOIN 的内存开销,本质上把 JOIN 下推到了存储层。

MergeFunction 3:AggregateMerge(聚合合并)

-- 为不同字段指定不同的聚合函数
CREATE TABLE page_stats (
    date_str  STRING,
    page_id   BIGINT,
    pv        BIGINT,   -- Page View(累加)
    uv        BIGINT,   -- Unique Visitor(用 HyperLogLog 近似计数)
    avg_stay  DOUBLE,   -- 平均停留时间(加权平均)
    PRIMARY KEY (date_str, page_id) NOT ENFORCED
) WITH (
    'merge-engine' = 'aggregation',
    'fields.pv.aggregate-function'      = 'sum',
    'fields.uv.aggregate-function'      = 'hll_union',  -- HyperLogLog 合并
    'fields.avg_stay.aggregate-function' = 'sum'        -- 分子累加(配合分母字段)
);
 
-- 内置聚合函数:
--   sum       → 求和(BIGINT/DOUBLE/DECIMAL)
--   min/max   → 最小值/最大值
--   count     → 计数
--   last_value → 最新值(相当于 DeduplicateMerge 的单字段版本)
--   last_non_null_value → 最新的非 NULL 值(适合 Partial Update 场景)
--   hll_union  → HyperLogLog 合并(UV 近似计数)
--   listagg    → 字符串聚合

MergeFunction 4:FirstRowMerge(首次写入保留)

-- 相同 key 只保留第一次写入的记录(不允许后续更新)
CREATE TABLE order_creation_log (
    order_id BIGINT,
    created_at TIMESTAMP(3),
    original_amount DECIMAL(10,2),  -- 原始下单金额,不允许更新
    PRIMARY KEY (order_id) NOT ENFORCED
) WITH (
    'merge-engine' = 'first-row'
);
 
-- 行为:只保留第一次插入的值,后续写入同一 order_id 的记录被忽略
-- 适合:幂等写入场景(保证只写一次)

3.3 MergeFunction 的执行时机

MergeFunction 在以下两个时机执行:

时机 1:MemTable Flush(内存合并)
  写入 MemTable 时,如果检测到相同 key,立即合并
  → 减少 SST 文件中的重复记录
  → 减少后续 Compaction 的合并工作量

时机 2:Compaction(磁盘合并)
  多个 SST 文件中相同 key 的记录,在 Compaction 时再次合并
  → 最终只保留合并后的结果
  → 这是 MergeFunction 的"最终结果"(MemTable 内的合并是临时结果)

注意:查询时也可能触发合并(如果 Compaction 不及时)
  → 多层 SST 中相同 key 的记录,在查询读取时在内存中合并
  → 这是读放大的来源,也是及时 Compaction 重要的原因

第 4 章 追加表的设计与适用场景

4.1 追加表的存储模型

追加表(Append-Only Table)完全绕过 LSM-Tree,直接以不可变文件的方式追加数据:

追加表写入流程:
  Flink 消费 Kafka 事件流
  → 数据直接写入 Parquet/ORC 文件(不经过 MemTable)
  → 文件写满(达到 target-file-size)后关闭
  → Flink Checkpoint 时,将本次写入的文件提交到 Snapshot
  → 数据对读者可见

文件结构(无 Bucket 目录):
partition=2024-01-01/
  data-00001.orc   ← 第一个文件(128MB)
  data-00002.orc   ← 第二个文件
  ...

注意:追加表也支持 Bucket 模式(Fixed Bucket),通过配置 bucketbucket-key 将数据按某列哈希分布到多个 Bucket,提升特定查询的文件扫描效率(类似 Iceberg 的 bucket 分区变换)。

4.2 追加表的 Compact 策略

追加表的 Compact 目标是解决小文件问题(而不是去除旧版本):

-- 追加表配置(带小文件合并)
CREATE TABLE user_events (
    user_id   BIGINT,
    event_type STRING,
    event_ts  TIMESTAMP(3),
    payload   STRING
) PARTITIONED BY (DATE_FORMAT(event_ts, 'yyyy-MM-dd'))
WITH (
    'bucket'                  = '-1',          -- 动态 Bucket(不固定分桶数)
    'target-file-size'        = '128 mb',      -- 目标文件大小
    'compaction.min-file-num' = '5',           -- 至少 5 个文件才触发合并
    'compaction.max-file-num' = '50',          -- 超过 50 个文件强制合并
    'write-only'              = 'false'        -- 允许后台 Compaction
);

4.3 追加表的三种使用场景

场景 1:日志/事件流存储

Flink 写入 Kafka 用户行为事件到 Paimon 追加表
→ 用 Trino/Presto 查询(类似 Iceberg 的 OLAP 查询场景)
→ 无需 Upsert,无需 Changelog
→ 追加表是最轻量的选择

场景 2:实时数仓的 ODS 层(原始数据层)

MySQL Binlog → Flink CDC → Paimon 追加表(保留 +I/-U/+U/-D 等原始事件)
→ 下游加工层通过 Flink 流读这张表(消费原始变更事件)
→ 追加表保留完整的原始变更历史(不合并,不去重)
→ 与主键表的区别:主键表只保留最新版本,追加表保留所有版本历史

场景 3:时序数据存储

IoT 设备传感器数据(每秒每个设备一条):
  {device_id: 'D001', temperature: 23.5, ts: 2024-01-01T12:00:00}
  {device_id: 'D001', temperature: 23.7, ts: 2024-01-01T12:00:01}

特点:每条记录都是独立时间点的读数,不存在"更新"
→ 追加表是正确选择(主键表的 Upsert 会错误地覆盖历史读数)

第 5 章 主键表与追加表的选型决策

5.1 决策树

Question 1:数据有没有"更新旧记录"的语义?
  是 → 主键表
      Question 2:需要多字段分别聚合(如 SUM、MAX)吗?
        是 → 主键表 + AggregateMerge
        否:Question 3:需要多个上游分别更新不同字段(宽表拼接)吗?
          是 → 主键表 + PartialUpdateMerge
          否 → 主键表 + DeduplicateMerge(默认 Upsert)
  否 → 追加表
      Question 4:需要按某列分桶(提升特定查询效率)吗?
        是 → 追加表 + Fixed Bucket
        否 → 追加表(无 Bucket,最简单)

5.2 混合使用模式(实时数仓的典型架构)

在实际的实时数仓中,主键表和追加表往往混合使用:

典型三层架构:

ODS 层(原始数据):追加表
  → MySQL CDC 的原始 Binlog 事件,保留 +I/-U/+U/-D 所有事件
  → 数据完整性优先,不压缩不聚合

DWD 层(明细数据):主键表(DeduplicateMerge)
  → 从 ODS 层读取变更事件,按主键合并为最新状态
  → 每条业务记录(如订单、用户)只保留最新版本
  → 支持下游 Lookup Join 作为维表

DWS 层(汇总数据):主键表(AggregateMerge)
  → 从 DWD 层读取变更事件,实时维护聚合指标
  → 如:按 (date, user_id) 统计 pv、uv、消费总额
  → 避免每次查询时的全表聚合

小结

Paimon 的双表模型体现了清晰的工程哲学:

  • 主键表(LSM-Tree):为高频 Upsert 和 CDC 场景设计,MergeFunction 是核心竞争力,允许存储层内置 OLAP 聚合逻辑(SUM、MAX、HyperLogLog),避免重复的下游计算
  • 追加表(不可变文件):为日志/事件流和时序数据设计,无写放大,无读放大,代价最低

选型的核心问题是:这张表里的数据有没有”同一业务实体的多个版本”?有则主键表,无则追加表。

下一篇 04 Changelog Producer——CDC 语义的流式生产与消费 将聚焦 Paimon 最独特的流处理能力:如何从存储层直接生产可被 Flink 消费的 Changelog(变更日志),以及 Full Compaction Producer 和 Lookup Changelog Producer 两种模式在延迟、准确性和资源消耗上的取舍。

思考题

  1. Paimon 的主键表(Primary Key Table)强制要求每条记录有唯一主键,支持 Upsert/Delete 语义;追加表(Append-only Table)不要求主键,只支持追加写入,类似 Kafka 的日志语义。在同一个数据管道中,CDC 同步表(需要处理 UPDATE/DELETE)使用主键表,而事件日志表(只追加)使用追加表——两者在同一个 Flink 作业中并存时,有没有什么资源管理上的冲突?
  2. 主键表在查询时需要合并多层 LSM 文件(Compaction 前有读放大),而追加表不需要合并(每个文件是独立的、不重叠的数据段)。对于读多写少的 OLAP 分析场景(如每天批量查询历史数据),主键表的读放大是否会导致比追加表显著更高的查询延迟?如何通过 Compaction 策略使主键表在批量读取时的性能接近追加表?
  3. Paimon 的追加表支持”局部有序”(bucket-key 指定桶内排序键),允许在追加写入的同时,保证同一 Bucket 内的数据按指定键有序。这个特性在什么查询场景下能显著提升性能(如范围查询、Join)?与主键表的全局有序相比,追加表的局部有序在存储和查询上的权衡是什么?