10 生产容错调优手册:从告警到根因的系统性诊断
摘要
前九篇系统讲解了 Spark 容错体系的每一个核心机制。本篇是整个专栏的”实战落地”——将所有知识点聚合为一份可以在生产值班时随时查阅的诊断手册。覆盖批处理与流处理中最高频的十类容错问题:Task 反复失败触发作业超时、FetchFailedException 引发的 Stage 重试风暴、Driver OOM 导致的批处理作业反复重跑、Structured Streaming Epoch 积压、State Store OOM、Watermark 停滞导致状态无限膨胀、RocksDB State Store 本地磁盘耗尽、ForeachBatch Sink 重复写入、Checkpoint 目录损坏无法恢复、以及推测执行在数据倾斜场景下反而加剧问题。每类问题给出:告警信号(如何第一时间发现)、根因分析(从第一性原理推导)、即时止血措施、以及长期调优策略,并在文末附全参数速查表。
第 1 章 批处理:Task 反复失败触发作业超时
1.1 症状与告警信号
Spark UI 表现:
- Jobs 页面出现大量
FAILEDStage(橙色) - Stages 页面某些 Task 的
Attempt列显示 3、4(接近或达到maxFailures) - 作业运行时间远超历史平均值
日志关键字:
WARN TaskSetManager: Lost task X.N in stage Y.Z ...
WARN TaskSetManager: Stage X contains a task of very large size (... KB)
ERROR TaskSetManager: Task X in stage Y.Z failed 4 times; aborting job
监控指标:executor.failedTasks 持续高于正常水位;作业 SLA(完成时间)告警触发。
1.2 根因分类
根因一:特定节点硬件问题(磁盘坏道、内存 ECC 错误)
表现:同一个节点上有大量 Task 失败,Failed Tasks 计数集中在某一个 Executor 上。
诊断步骤:
- 在 Spark UI → Executors 页面,找到
Failed Tasks计数最高的 Executor - 记录该 Executor 的 host 名,SSH 到该节点检查系统日志(
dmesg | grep -i error、/var/log/messages) - 检查磁盘健康状态(
smartctl -a /dev/sda)
根因二:Task 数据量过大(Spill 到磁盘导致超时)
表现:失败日志中有 TaskKilledException: task killed due to stage cancellation 或 Task 运行时间异常长(P99 远大于中位数)。
诊断:Spark UI → Stage 详情 → Task Metrics,查看 Shuffle Spill (Disk) 是否远高于 Shuffle Spill (Memory),说明内存不足引发大量 Spill,导致 Task 极慢甚至超时。
根因三:用户代码抛出非预期异常
表现:Task 失败日志中有明确的用户代码堆栈(如 NullPointerException at com.company.MyUDF:42)。
诊断:查看完整的 Task 失败 stderr 日志(Spark UI → Stage → 某个 FAILED Task → Logs → stderr)。
1.3 调优策略
| 根因 | 即时止血 | 长期策略 |
|---|---|---|
| 硬件问题节点 | 手动将问题节点从集群摘除或加入黑名单(spark.blacklist.enabled=true) | 修复硬件或永久下线节点 |
| Spill 过多 | 增加 Executor 内存(--executor-memory);减少单 Task 数据量(增大分区数 repartition(N)) | 开启 Tungsten 堆外内存;优化数据倾斜 |
| 用户代码异常 | 在用户代码中增加异常处理(try-catch);过滤问题数据 | 修复代码 Bug;增加数据质量检查 |
第 2 章 批处理:FetchFailedException 引发的 Stage 重试风暴
2.1 症状与告警信号
日志关键字:
WARN TaskSetManager: Lost task X.0 in stage Y.0 ... FetchFailed(...)
INFO DAGScheduler: Resubmitting ShuffleMapStage Z ... due to fetch failure
WARN DAGScheduler: Marking Stage Y as failed due to a fetch failure from Stage Z
Spark UI 表现:
- 某个 Stage 在 Completed 状态后又重新变为 RUNNING(Stage 回滚)
- 同一个 Stage 出现多次 Attempt(如 Stage 3 Attempt 0、Attempt 1、Attempt 2…)
- 作业整体进度”倒退”(已完成的 Stage 重新执行)
2.2 根因分类
根因一:Map 节点宕机(Shuffle 文件物理消失)
最常见的真正原因。Map Stage 完成后节点宕机,Shuffle 文件随节点不可访问,导致 Reduce Task 的 Fetch 失败。
根因二:Executor 内存压力导致 GC 暂停过长
Map 节点的 Executor 正处于 Full GC,所有线程(包括 Netty 的 Shuffle 服务线程)暂停。Reduce 端 Fetch 超时(spark.shuffle.io.retryWait × maxRetries),引发 FetchFailedException。GC 结束后节点恢复,但 Fetch 已被标记为失败。
诊断:检查 Map 端 Executor 的 GC 日志(-verbose:gc 开启),查看是否有 Full GC 停顿超过 spark.shuffle.io.retryWait(默认 5 秒)。
根因三:网络抖动(Rack 间带宽竞争)
大规模 Shuffle 期间,Rack 间网络带宽打满,个别 Fetch 请求超时。
2.3 调优策略
即时止血:
- 调大 Fetch 重试次数和等待时间,给网络抖动更多容忍空间:
spark.shuffle.io.maxRetries=10 spark.shuffle.io.retryWait=30s - 调大 Stage 重试上限(应对节点批量宕机):
spark.stage.maxConsecutiveAttempts=10
长期策略:
- 部署 External Shuffle Service(ESS):Shuffle 文件由 ESS 服务进程(不随 Executor 生命周期)管理,Executor OOM 或 GC 不影响 Shuffle 数据可用性
spark.shuffle.service.enabled=true - 在关键 Shuffle 前做 RDD Checkpoint,避免 Stage 回滚代价过高
- 考虑迁移到 Apache Celeborn(RSS),彻底解耦 Shuffle 存储与计算节点
第 3 章 批处理:Driver OOM 导致作业反复失败
3.1 症状与告警信号
日志关键字:
java.lang.OutOfMemoryError: Java heap space
at org.apache.spark.scheduler.DAGScheduler ...
# 或 YARN 日志
Container killed by YARN for exceeding memory limits.
XX MB of XX MB physical memory used.
YARN 监控:ApplicationMaster Container 状态频繁在 RUNNING → FAILED → RUNNING 之间切换。
3.2 根因分类
根因一:Broadcast 变量过大
sc.broadcast(data) 将整个数据集缓存到 Driver 内存,如果 data 过大(几十 GB),Driver 堆直接 OOM。
诊断:检查代码中所有 broadcast 调用,查看被广播对象的大小(data.size 或序列化后的字节数)。
根因二:超长 Lineage 链导致 Driver GC 压力过大
迭代算法(如 100 次迭代的 ML 训练)中,每次迭代产生新的 RDD 对象,但旧 RDD 对象因为被 Lineage 引用(dependencies)而无法被 GC 回收。Driver 内存逐渐被大量 RDD 对象填满。
诊断:通过 Driver 的 JVM Heap Dump(jmap -dump:format=b,file=dump.hprof <pid>),分析对象占用(mat/jhat),看是否有大量 RDD 对象未被回收。
根因三:collectToDriver 类操作收集了过大的结果集
rdd.collect()、df.toPandas()、df.show(largeN) 等操作将 Executor 端的数据全量传输到 Driver,如果数据量超过 Driver 堆大小,OOM。
3.3 调优策略
| 根因 | 即时止血 | 长期策略 |
|---|---|---|
| Broadcast 过大 | 减小 Broadcast 数据(只 broadcast 需要的列/行);改用 Sort Merge Join | 审查所有 broadcast 调用,设置大小上限 spark.sql.autoBroadcastJoinThreshold |
| Lineage 过长 | 在迭代算法中每 10-20 次迭代做一次 checkpoint() + persist() | 在所有迭代 ML 算法中内置 Checkpoint 逻辑 |
| collect 过大 | 增加 Driver 内存(--driver-memory);改用 take(N) 限制结果量 | 改为 write 直接写出,不 collect 到 Driver |
第 4 章 Structured Streaming:Epoch 积压(Processing Lag 持续增长)
4.1 症状与告警信号
监控指标:batchDuration 图中每个 Epoch 的处理时间持续 > trigger 间隔(如 trigger = 10s 但 batchDuration 稳定在 30s)。
日志关键字:
INFO MicroBatchExecution: Streaming query made progress: {
"batchId" : 1234,
"batchDuration" : 35000, // 35 秒,远超 trigger 间隔 10 秒
"durationMs" : { "triggerExecution" : 34800, "queryPlanning" : 200 }
}
现象:Kafka 端 Consumer Group Lag 持续增长(数据生产速率 > 消费速率)。
4.2 根因分类
根因一:状态数据量过大,State Store 操作成为瓶颈
State Store 的 get/put 操作占用了大量 Epoch 时间(HDFSBackedStateStore 的内存 HashMap 过大,GC 频繁;或 RocksDB StateStore 的 SST 文件层数过多,读放大严重)。
诊断:查看 Spark UI → Structured Streaming → State Operators,找 numRowsTotal 是否异常大;查看 Executor GC 时间(在 Stages 页面的 Task Metrics 中)。
根因二:数据倾斜(某些分区的数据量远大于其他分区)
整个 Epoch 的完成时间取决于最慢的那个 Task,如果某个 Kafka 分区的数据量是其他分区的 10 倍,对应的 Task 会成为瓶颈。
诊断:Spark UI → Stage → Task 时间分布,查看 Max / Median 比值是否 > 3。
根因三:下游 Sink 写出变慢(如 Delta Lake compaction 占用 I/O)
ForeachBatch 中的写出操作(如写 Delta Lake)因为触发了 Compaction 或其他后台任务而变慢。
4.3 调优策略
| 根因 | 即时止血 | 长期策略 |
|---|---|---|
| State Store 太大 | 配置 Watermark + TTL 清理过期状态;切换 RocksDB State Store | 审查有状态算子是否有 TTL,防止状态无限增长 |
| 数据倾斜 | 增加 Kafka 分区数(分散数据);在 foreachBatch 中使用 repartition 重新分区 | 在 Source 侧调整分区策略 |
| Sink 变慢 | 增大 Delta Lake Compaction 间隔;与写出路径解耦(异步写出) | 优化 Sink 实现,避免同步 Compaction 阻塞流处理 |
第 5 章 Structured Streaming:State Store OOM
5.1 症状与告警信号
日志关键字:
java.lang.OutOfMemoryError: Java heap space
at org.apache.spark.sql.execution.streaming.state.HDFSBackedStateStore.put
# 或 GC overhead limit
java.lang.OutOfMemoryError: GC overhead limit exceeded
Spark UI:State Operators 的 memoryUsedBytes 持续增长,接近或超过 Executor 堆大小。
5.2 根因分类
根因一:有状态算子没有配置 Watermark / TTL
dropDuplicates("id")、groupBy(...).count()(Update 模式)没有 Watermark,状态永远不清理。
诊断:查看 numRowsRemoved(Spark UI State Operators)是否始终为 0。
根因二:Watermark threshold 设置过大
Watermark threshold = 7 天,意味着 State Store 中保留 7 天内所有 key 的状态。对于高基数 key,7 天的状态量可能极大。
根因三:自定义 GroupState 超时回调中忘记调用 state.remove()
状态超时被触发(hasTimedOut = true),用户回调中输出了结果,但忘记调用 state.remove(),状态仍然保留在 State Store 中。
5.3 调优策略
即时止血:
# 增加 Executor 内存(治标)
spark.executor.memory=16g
# 切换 RocksDB State Store(彻底解决堆限制问题)
spark.sql.streaming.stateStore.providerClass=\
org.apache.spark.sql.execution.streaming.state.RocksDBStateStoreProvider长期策略:
- 为所有有状态算子配置合理的 Watermark threshold(根据业务数据的最大延迟设置,通常 P99 延迟 × 2)
- 检查所有
mapGroupsWithState/flatMapGroupsWithState的超时回调,确保state.remove()被调用 - 在 Spark UI 的 State Operators 面板中设置
numRowsTotal的告警阈值
第 6 章 Structured Streaming:Watermark 停滞导致状态无限膨胀
6.1 症状与告警信号
Spark UI 表现:Streaming Statistics 页面的 Watermark 曲线长时间水平(不前进);numRowsRemoved 持续为 0。
业务影响:流聚合窗口长时间没有输出(窗口等待 Watermark 超过窗口结束时间才输出);State Store 持续膨胀。
6.2 根因分类
根因一:某个 Source 长期无数据(多 Source Join 场景)
两条 Kafka 流 Join 时,一条流断流(如上游服务故障导致某个 Topic 没有新数据),该流的 Watermark 停滞,全局 Watermark(取 min)也停滞。
诊断:查看各 Source 的 numInputRows,找到为 0 的 Source。
根因二:数据全是极度迟到的历史数据
进行历史数据 Backfill 时,导入的历史数据的事件时间远小于”现在”,但系统已经处理了大量近期数据,Watermark 已经很高。历史数据全部被视为”迟到”,不更新 Watermark。
根因三:Kafka 分区中有长时间不活跃的 Partition
某个 Kafka Partition 长时间没有新消息(生产者停止向该 Partition 写入),但 Spark 仍将该 Partition 的 Watermark 视为当前值(该 Partition 没有新消息 = 没有新的事件时间 = Watermark 不更新)。
6.3 调优策略
根因一(多 Source Watermark 取 min):
# 改为取最大值(接受部分迟到数据被丢弃,但 Watermark 不会因一个流断流而停滞)
spark.sql.streaming.multipleWatermarkPolicy=max根因二(历史数据 Backfill):历史数据回放期间,暂时关闭 Watermark 相关的状态清理,或用独立的流处理作业处理历史数据,不与实时流共享状态。
根因三(Kafka Partition 不活跃):在 Kafka Source 配置中设置”空闲 Partition Watermark 推进”策略(Spark 3.1+):
df.readStream
.format("kafka")
.option("kafka.bootstrap.servers", "...")
.option("subscribe", "my-topic")
// 空闲分区推进 Watermark:超过 5 分钟无数据的分区,Watermark 继续推进
.option("idleTimeoutMs", "300000")
.load()第 7 章 Structured Streaming:RocksDB State Store 本地磁盘耗尽
7.1 症状与告警信号
日志关键字:
java.io.IOException: No space left on device
at org.rocksdb.RocksDB.put(...)
# 或 K8s 上
OOMKilled: Container killed due to memory limits (if using tmpfs for local dir)
系统监控:spark.local.dir 所在磁盘的使用率持续上涨至 90%+。
7.2 根因分类
根因一:状态数据量持续增长,没有 TTL
RocksDB 的 SST 文件随状态增长持续增大,最终耗尽磁盘。
根因二:RocksDB Compaction 的空间放大
LSM Tree 的空间放大(旧版本 key 在 Compaction 前占用额外空间)导致实际占用磁盘 = 有效数据量 × 1.5-2 倍。
根因三:Changelog 和 Snapshot 文件在 HDFS 上积累,但本地临时文件未清理
RocksDB 在生成 Snapshot 期间,HDFS 上传过程中的临时文件未及时清理,占用本地磁盘。
7.3 调优策略
即时止血:
# 检查 RocksDB 本地目录大小
du -sh /mnt/nvme/rocksdb-state/*
# 手动清理旧的 SST 文件(谨慎操作,仅清理可以通过 Checkpoint 恢复的旧版本)
# 最好是重启受影响的 Executor,让 RocksDB 从 HDFS 重新下载 Checkpoint长期策略:
- 配置状态 TTL(Watermark 或 GroupState TTL),控制状态规模
- 为 RocksDB 配置独立的大容量 NVMe 磁盘,并设置磁盘使用率监控告警(阈值 70%)
- 调整
minVersionsToRetain减少本地保留的历史版本数:spark.sql.streaming.stateStore.minVersionsToRetain=1
第 8 章 Structured Streaming:ForeachBatch Sink 重复写入
8.1 症状与告警信号
业务现象:下游数据表出现重复行;聚合指标值偏高(如 count 值是预期的 2 倍)。
Spark 日志:无明显错误(Exactly-once 保证失效通常是”静默”的——既不报错,也不告警,只是数据不对)。
8.2 根因分析
ForeachBatch Sink 重复写入的根本原因是:崩溃发生在”数据写出成功”但”Commit Log 写入前”,重启后重做该 Epoch,数据被再次写出(第 05 篇详解)。
具体到代码层面,常见的错误模式:
错误一:直接 JDBC INSERT(无幂等性)
// 问题:崩溃重启后会重复 INSERT
query.writeStream.foreachBatch { (df, epochId) =>
df.write.mode("append").jdbc(url, "table", props)
}.start()错误二:先删后插的”删”和”插”不在同一事务中
// 问题:如果在 DELETE 完成后、INSERT 开始前崩溃,数据会短暂丢失(然后重跑才恢复)
// 如果 INSERT 完成后才崩溃,重跑时 DELETE 会删掉已有的正确数据,再 INSERT 一遍
// 实际上这种写法是正确的(最终结果幂等),但 DELETE+INSERT 之间必须在同一事务
query.writeStream.foreachBatch { (df, epochId) =>
val conn = getConnection()
conn.setAutoCommit(false)
conn.createStatement().execute(s"DELETE FROM t WHERE epoch_id = $epochId")
df.write.jdbc(url, "t", props) // 注意:这里的 jdbc write 使用的是另一个连接!事务不共享!
conn.commit() // 这个 commit 只提交了 DELETE,INSERT 在不同连接中自动提交了
}.start()8.3 修复方案
推荐:使用 Delta Lake 作为 Sink(最简洁):
query.writeStream
.format("delta")
.option("checkpointLocation", "hdfs://path/ckpt")
.start("hdfs://path/delta-table")
// Delta Lake 自动通过 (queryId, epochId) 保证幂等JDBC Sink 的正确幂等写法(单连接事务):
query.writeStream.foreachBatch { (df, epochId) =>
// 1. 将数据写到临时 DataFrame
df.persist()
// 2. 使用单个连接,在同一事务中完成 DELETE + INSERT
val rows = df.collect() // 注意:只在小数据量时适用
val conn = DriverManager.getConnection(url, user, password)
try {
conn.setAutoCommit(false)
// 检查幂等:该 epoch 是否已写过
val checkStmt = conn.prepareStatement("SELECT 1 FROM epoch_log WHERE epoch_id=?")
checkStmt.setLong(1, epochId)
val rs = checkStmt.executeQuery()
if (!rs.next()) {
// 写数据
val insertStmt = conn.prepareStatement("INSERT INTO target_table VALUES (?,?,?)")
rows.foreach { row =>
insertStmt.setString(1, row.getString(0))
// ... 设置其他字段
insertStmt.addBatch()
}
insertStmt.executeBatch()
// 记录 epoch 完成
val logStmt = conn.prepareStatement("INSERT INTO epoch_log(epoch_id) VALUES (?)")
logStmt.setLong(1, epochId)
logStmt.execute()
conn.commit()
}
} catch {
case e: Exception => conn.rollback(); throw e
} finally {
conn.close()
df.unpersist()
}
}.start()第 9 章 Checkpoint 目录损坏无法恢复
9.1 症状与告警信号
日志关键字:
org.apache.spark.sql.streaming.StreamingQueryException:
Failed to read streaming state store. Cannot start query from checkpoint location...
# 或
ERROR MicroBatchExecution: Query [id=abc] terminated with error
Caused by: java.io.IOException: No FileSystem for scheme: hdfs
# 或 Schema 不兼容
AnalysisException: Detected incompatible evolution in streaming query plan.
Checkpoint schema: ... Current schema: ...
9.2 根因分类
| 根因 | 症状 | 处理方式 |
|---|---|---|
| HDFS 目录被误删 | offsets/ 或 commits/ 目录不存在 | 从备份恢复(如果有);否则删除整个 Checkpoint,从最新 offset 重新开始 |
| 部分 Checkpoint 文件损坏 | 读取某个 offsets/N 文件时 JSON 解析失败 | 手动删除损坏的文件,退回到上一个完整 Epoch |
| Schema 变更不兼容 | State Store 中存储的 Schema 与新代码期望的 Schema 不匹配 | 删除整个 Checkpoint,从最新 offset 重新开始(接受状态丢失) |
| Spark 版本升级导致不兼容 | 升级 Spark 大版本后 Checkpoint 格式变化 | 在升级前先停流、备份 Checkpoint;升级后若不兼容则重建 Checkpoint |
9.3 生产最佳实践
防患于未然:
- 定期备份 Checkpoint 目录到另一个 HDFS 路径或对象存储(每日备份,保留 7 天)
- 在 Checkpoint 目录设置 HDFS ACL,防止误删(只有流处理服务账号有写权限,人工账号只读)
- Spark 大版本升级前,先用新版本测试 Checkpoint 兼容性(在 Staging 环境),不兼容时提前规划 Checkpoint 重建窗口
重建 Checkpoint 的标准操作流程:
# 1. 停止流处理作业
spark-submit --kill <application-id>
# 2. 备份旧 Checkpoint
hdfs dfs -cp /path/to/ckpt /path/to/ckpt.backup.$(date +%Y%m%d)
# 3. 删除旧 Checkpoint
hdfs dfs -rm -r /path/to/ckpt
# 4. 重新启动流处理作业(从最新 Kafka offset 开始)
# 注意:历史状态全部丢失(聚合从零开始),需要业务侧接受
spark-submit --class MyStreamingApp myapp.jar \
--conf spark.sql.streaming.checkpointLocation=/path/to/ckpt第 10 章 推测执行在数据倾斜场景下加剧问题
10.1 症状与告警信号
开启推测执行后,发现:
- 集群资源使用率异常高(同一 Task 有多个 Executor 在并发执行)
- 特定 Task 的推测副本和原 Task 运行时间相近(推测没有加速)
- 整体 Stage 完成时间反而增加(推测副本消耗了本可用于正常 Task 的资源)
10.2 根因分析
数据倾斜场景下(某个 Key 对应的数据量是其他 Key 的 100 倍),慢 Task 慢的根本原因是数据量大,而不是节点故障或性能抖动。推测副本被调度到另一个 Executor 上,处理的是同样大量的数据(相同的输入分区),因此同样很慢。
结果:原 Task 和推测副本都很慢,两者都在消耗资源,但最终完成时间与只运行原 Task 相比几乎没有提升,反而浪费了一份 Executor 资源。
10.3 解决策略
正确做法:解决数据倾斜,而不是依赖推测执行
- Salting(加盐):给倾斜的 Key 加随机后缀,打散到多个分区
// 对倾斜 key 加盐,分散到 N 个分区 val saltedDF = df.withColumn("salted_key", concat($"key", lit("_"), (rand() * 10).cast("int").cast("string"))) .groupBy("salted_key").agg(...) .withColumn("key", split($"salted_key", "_")(0)) .groupBy("key").agg(...) - Broadcast Join:将小表 broadcast,避免大表 Shuffle
- AQE(Adaptive Query Execution,Spark 3.0+):自动检测数据倾斜并拆分倾斜分区
关闭推测执行(或调高阈值):
# 对于有大量数据倾斜的作业,关闭推测执行
spark.speculation=false
# 或提高推测阈值,减少误触发
spark.speculation.multiplier=3.0 # 必须达到中位数 3 倍才触发推测
spark.speculation.quantile=0.90 # 必须 90% 完成后才触发推测附录:生产容错参数速查表
批处理核心参数
| 参数 | 默认值 | 说明 | 推荐值(生产 Spot 实例场景) |
|---|---|---|---|
spark.task.maxFailures | 4 | 单 Task 最大失败次数 | 8-10 |
spark.stage.maxConsecutiveAttempts | 4 | Stage 最大连续失败次数 | 8-10 |
spark.network.timeout | 120s | Executor 心跳超时 | 300s(节点频繁 GC 时) |
spark.shuffle.io.maxRetries | 3 | Shuffle Fetch 最大重试次数 | 10 |
spark.shuffle.io.retryWait | 5s | Shuffle Fetch 重试等待时间 | 30s |
spark.blacklist.enabled | false | 开启节点黑名单 | true(硬件问题频繁时) |
spark.speculation | false | 开启推测执行 | true(非数据倾斜场景) |
spark.speculation.multiplier | 1.5 | 推测执行判定倍数 | 2.0-3.0 |
Structured Streaming 核心参数
| 参数 | 默认值 | 说明 | 推荐值 |
|---|---|---|---|
spark.sql.streaming.minBatchesToRetain | 100 | Offset/Commit Log 保留的最小 Epoch 数 | 200 |
spark.sql.streaming.multipleWatermarkPolicy | min | 多 Source Watermark 策略 | max(避免 Watermark 停滞) |
spark.sql.streaming.stateStore.maintenanceInterval | 60s | State Store 维护(快照)触发间隔 | 30s(快速恢复场景) |
spark.sql.streaming.stateStore.minVersionsToRetain | 2 | State Store 保留的最小版本数 | 2-3 |
RocksDB State Store 核心参数
| 参数 | 默认值 | 说明 | 推荐值 |
|---|---|---|---|
rocksdb.writeBufferSizeMB | 64 | MemTable 大小 | 128-256 |
rocksdb.maxWriteBufferNumber | 4 | 最大 MemTable 数量 | 4-8 |
rocksdb.blockCacheSizeMB | 8 | Block Cache 大小 | 64-256 |
rocksdb.changelogCheckpointing.enabled | false(3.2) | 开启 Changelog 模式 | true |
rocksdb.minDeltasForSnapshot | 200 | 触发全量快照的 Changelog 数 | 100-200 |
spark.executor.memoryOverhead | 10% | 堆外内存(需覆盖 RocksDB 堆外用量) | 2-4g |
小结
至此,整个”Spark 容错与状态管理深度解析”专栏的 10 篇正文全部完成。
这份生产调优手册汇总了最高频的 10 类容错问题:
- 批处理三大问题:Task 反复失败(黑名单 + 调大 maxFailures)、FetchFailedException 风暴(ESS + 调大 Fetch 重试)、Driver OOM(Checkpoint + 增大 Driver 内存)
- 流处理七大问题:Epoch 积压(State TTL + RocksDB)、State Store OOM(Watermark + RocksDB + TTL)、Watermark 停滞(max 策略 + 空闲分区推进)、RocksDB 磁盘耗尽(TTL + 大磁盘 + 监控)、ForeachBatch 重复写入(Delta Lake / 事务幂等写法)、Checkpoint 损坏(备份 + 标准重建流程)、推测执行加剧倾斜(关闭推测 + AQE 解决倾斜)
每个问题的处理链路:告警信号 → 根因分类(用 Spark UI / 日志定位)→ 即时止血 → 长期策略,这是在生产环境中快速响应和系统性解决容错问题的标准方法论。
思考题
FetchFailedException引发的 Stage 重试风暴是一类常见的生产问题:一个 Executor 宕机 → 多个 Reducer 同时抛出 FetchFailedException → DAGScheduler 重新提交整个上游 Stage → 新 Stage 的 Task 重新占用资源 → 资源紧张引发更多 OOM → 更多 Executor 宕机,形成雪崩。如何通过 Spark 的配置参数(如spark.stage.maxConsecutiveAttempts)和资源配置来打断这个雪崩循环?- 生产中有时会遇到”作业卡住但没有报错”的问题:所有 Task 都在运行,但进度条长时间不动。这通常是某种形式的死锁或资源泄漏。在什么具体场景下,Spark 作业会陷入这种”假进行”状态?如何通过 Spark UI 的 Threads Dump 或 Task 时间轴来定位根因?
- 批处理作业的 SLA(如”每天 6 点前完成”)需要综合考虑数据量波动、集群负载变化和偶发性硬件故障。如何为 Spark 作业建立一套”容错预算”模型——即在保证 SLA 的前提下,允许多少次 Task 失败、多少次 Stage 重试、最多容忍多长时间的恢复延迟?这个模型如何指导
maxFailures、重试超时等参数的设置?
参考资料
- Apache Spark 官方文档:Configuration Guide(spark.apache.org)
- Apache Spark 官方文档:Structured Streaming Programming Guide
- Spark 容错机制(liyichao.github.io)
- RocksDB 101: Optimizing Stateful Streaming in Apache Spark(AWS Blog)
- How Spark Structured Streaming Recovers After Failures(Canadian Data Guy)
- 本专栏前九篇文章(01-09)