// Delta Lake 源码中的版本提交逻辑(简化)// org.apache.spark.sql.delta.storage.LogStoredef write(path: Path, actions: Seq[String]): Unit = { // 尝试原子写入,如果文件已存在则抛出 FileAlreadyExistsException if (fs.exists(path)) throw new FileAlreadyExistsException(path.toString) // 写入内容 val stream = fs.create(path, false) // false = 不覆盖 actions.foreach(a => stream.write((a + "\n").getBytes("UTF-8"))) stream.close()}
2.3 提交冲突检测的核心逻辑
假设事务 A 在 readVersion=10 时开始,计算完成后准备提交 version=11,但此时发现 version=11 已经被事务 B 提交了。事务 A 需要判断:事务 B 的提交与我的操作是否冲突?
不冲突的情况(可以将 readVersion 提升为 11,继续尝试提交 version=12):
场景:事务 A 是 Append-only 写入(isBlindAppend=true)
→ 我的写入不依赖任何现有数据,也不修改任何现有数据
→ 事务 B 写了什么我不关心
→ 直接将 readVersion 更新为 11,尝试提交 version=12
场景:事务 B 修改了分区 partition=2026-01,我的操作只涉及 partition=2026-02
→ 事务 B 的变更与我的操作作用于不同的数据分区
→ 不冲突,继续尝试提交
冲突的情况(事务 A 必须重试,从头重新读取数据并重新计算):
场景:事务 A 是 UPDATE,readVersion=10 时读取了文件 F1 并计划 remove F1 + add F1'
事务 B(version=11)也 remove 了文件 F1(例如 B 也更新了 F1 中的数据)
→ 事务 A 的 remove F1 与事务 B 的 remove F1 冲突(F1 已被 B 删除,A 无法再删除)
→ 事务 A 必须从 readVersion=11 重新读取数据,重新计算 UPDATE
场景:事务 A 执行 MERGE(读了部分文件判断是否需要更新)
事务 B 向这些文件所在的分区追加了新数据
→ 事务 A 读取时没有看到 B 追加的数据,A 的 MERGE 结果可能是不完整的
→ 冲突(取决于隔离级别:WriteSerializable 下不冲突,Serializable 下冲突)
冲突检测算法(简化版):
事务 A 的冲突检测输入:
- A 在 readVersion=10 时读取的文件集合(readFiles)
- A 计划 remove 的文件集合(filesToRemove)
- 事务 B(version=11)的 add 和 remove 集合
冲突条件:
1. A 的 filesToRemove 与 B 的 filesToRemove 有交集
(A 和 B 都想删除同一个文件)
2. B 的 add 的文件覆盖了 A 读取的数据范围(且 A 不是 isBlindAppend)
(B 在 A 读取之后修改了 A 读取的数据)
第 3 章 并发操作的冲突矩阵
3.1 哪些并发操作不冲突
理解哪些操作可以安全并发,对设计高吞吐的 Delta Lake 写入管道至关重要:
操作 A
操作 B
是否冲突
原因
INSERT(Append)
INSERT(Append)
不冲突
双方都是 isBlindAppend,互不干扰
INSERT(新分区)
INSERT(不同分区)
不冲突
作用于不同分区文件
INSERT(Append)
UPDATE
不冲突(WriteSerializable)
A 是 Append,不读取现有数据,B 的修改不影响 A
INSERT(Append)
DELETE
不冲突(WriteSerializable)
同上
OPTIMIZE
INSERT
不冲突
OPTIMIZE 的文件替换(dataChange=false)不与新数据冲突
SELECT(读)
任何写
永不冲突
MVCC:读操作固定在某个 Snapshot 版本
3.2 哪些并发操作会冲突
操作 A
操作 B
是否冲突
原因
UPDATE(分区 P1)
UPDATE(分区 P1)
冲突
双方都要删除和重写 P1 的文件
DELETE(全表扫描)
INSERT(Append)
冲突(Serializable)
A 读取了所有文件,B 在 A 读取后增加了新文件(A 未看到)
MERGE
INSERT(同分区)
冲突(Serializable)
MERGE 读取了目标分区,B 在之后追加了新数据到该分区
MERGE
UPDATE(同文件)
冲突
双方都要修改同一个文件
3.3 隔离级别对冲突判断的影响
WriteSerializable(默认):只保证写操作之间的串行化,读操作不参与冲突检测。
WriteSerializable 下,以下不冲突:
- 事务 A(MERGE)读取了文件 F1,事务 B(INSERT Append)之后向 F1 所在分区追加了数据
- 原因:A 是写操作,B 也是写操作,但 B 的 INSERT 是 isBlindAppend,
两者的写入目标文件没有交集
Serializable 下,以下冲突(更保守):
- 同样场景中,A 在 readVersion=10 时读取了文件 F1
- B 在 version=11 时向 F1 所在分区追加了新数据
- A 的 MERGE 结果可能不包含 B 追加的数据(A 读取时没有看到 B 的数据)
- 为了保证串行等价性,A 必须重试,基于 version=11 重新执行 MERGE