10 生产容错调优手册:从告警到根因的系统性诊断

摘要

前九篇系统讲解了 Spark 容错体系的每一个核心机制。本篇是整个专栏的”实战落地”——将所有知识点聚合为一份可以在生产值班时随时查阅的诊断手册。覆盖批处理与流处理中最高频的十类容错问题:Task 反复失败触发作业超时、FetchFailedException 引发的 Stage 重试风暴、Driver OOM 导致的批处理作业反复重跑、Structured Streaming Epoch 积压、State Store OOM、Watermark 停滞导致状态无限膨胀、RocksDB State Store 本地磁盘耗尽、ForeachBatch Sink 重复写入、Checkpoint 目录损坏无法恢复、以及推测执行在数据倾斜场景下反而加剧问题。每类问题给出:告警信号(如何第一时间发现)、根因分析(从第一性原理推导)、即时止血措施、以及长期调优策略,并在文末附全参数速查表。


第 1 章 批处理:Task 反复失败触发作业超时

1.1 症状与告警信号

Spark UI 表现

  • Jobs 页面出现大量 FAILED Stage(橙色)
  • Stages 页面某些 Task 的 Attempt 列显示 3、4(接近或达到 maxFailures
  • 作业运行时间远超历史平均值

日志关键字

WARN TaskSetManager: Lost task X.N in stage Y.Z ...
WARN TaskSetManager: Stage X contains a task of very large size (... KB)
ERROR TaskSetManager: Task X in stage Y.Z failed 4 times; aborting job

监控指标executor.failedTasks 持续高于正常水位;作业 SLA(完成时间)告警触发。

1.2 根因分类

根因一:特定节点硬件问题(磁盘坏道、内存 ECC 错误)

表现:同一个节点上有大量 Task 失败,Failed Tasks 计数集中在某一个 Executor 上。

诊断步骤:

  1. 在 Spark UI → Executors 页面,找到 Failed Tasks 计数最高的 Executor
  2. 记录该 Executor 的 host 名,SSH 到该节点检查系统日志(dmesg | grep -i error/var/log/messages
  3. 检查磁盘健康状态(smartctl -a /dev/sda

根因二:Task 数据量过大(Spill 到磁盘导致超时)

表现:失败日志中有 TaskKilledException: task killed due to stage cancellation 或 Task 运行时间异常长(P99 远大于中位数)。

诊断:Spark UI → Stage 详情 → Task Metrics,查看 Shuffle Spill (Disk) 是否远高于 Shuffle Spill (Memory),说明内存不足引发大量 Spill,导致 Task 极慢甚至超时。

根因三:用户代码抛出非预期异常

表现:Task 失败日志中有明确的用户代码堆栈(如 NullPointerException at com.company.MyUDF:42)。

诊断:查看完整的 Task 失败 stderr 日志(Spark UI → Stage → 某个 FAILED Task → Logs → stderr)。

1.3 调优策略

根因即时止血长期策略
硬件问题节点手动将问题节点从集群摘除或加入黑名单(spark.blacklist.enabled=true修复硬件或永久下线节点
Spill 过多增加 Executor 内存(--executor-memory);减少单 Task 数据量(增大分区数 repartition(N)开启 Tungsten 堆外内存;优化数据倾斜
用户代码异常在用户代码中增加异常处理(try-catch);过滤问题数据修复代码 Bug;增加数据质量检查

第 2 章 批处理:FetchFailedException 引发的 Stage 重试风暴

2.1 症状与告警信号

日志关键字

WARN TaskSetManager: Lost task X.0 in stage Y.0 ... FetchFailed(...)
INFO DAGScheduler: Resubmitting ShuffleMapStage Z ... due to fetch failure
WARN DAGScheduler: Marking Stage Y as failed due to a fetch failure from Stage Z

Spark UI 表现

  • 某个 Stage 在 Completed 状态后又重新变为 RUNNING(Stage 回滚)
  • 同一个 Stage 出现多次 Attempt(如 Stage 3 Attempt 0、Attempt 1、Attempt 2…)
  • 作业整体进度”倒退”(已完成的 Stage 重新执行)

2.2 根因分类

根因一:Map 节点宕机(Shuffle 文件物理消失)

最常见的真正原因。Map Stage 完成后节点宕机,Shuffle 文件随节点不可访问,导致 Reduce Task 的 Fetch 失败。

根因二:Executor 内存压力导致 GC 暂停过长

Map 节点的 Executor 正处于 Full GC,所有线程(包括 Netty 的 Shuffle 服务线程)暂停。Reduce 端 Fetch 超时(spark.shuffle.io.retryWait × maxRetries),引发 FetchFailedException。GC 结束后节点恢复,但 Fetch 已被标记为失败。

诊断:检查 Map 端 Executor 的 GC 日志(-verbose:gc 开启),查看是否有 Full GC 停顿超过 spark.shuffle.io.retryWait(默认 5 秒)。

根因三:网络抖动(Rack 间带宽竞争)

大规模 Shuffle 期间,Rack 间网络带宽打满,个别 Fetch 请求超时。

2.3 调优策略

即时止血

  • 调大 Fetch 重试次数和等待时间,给网络抖动更多容忍空间:
    spark.shuffle.io.maxRetries=10
    spark.shuffle.io.retryWait=30s
  • 调大 Stage 重试上限(应对节点批量宕机):
    spark.stage.maxConsecutiveAttempts=10

长期策略

  • 部署 External Shuffle Service(ESS):Shuffle 文件由 ESS 服务进程(不随 Executor 生命周期)管理,Executor OOM 或 GC 不影响 Shuffle 数据可用性
    spark.shuffle.service.enabled=true
  • 在关键 Shuffle 前做 RDD Checkpoint,避免 Stage 回滚代价过高
  • 考虑迁移到 Apache Celeborn(RSS),彻底解耦 Shuffle 存储与计算节点

第 3 章 批处理:Driver OOM 导致作业反复失败

3.1 症状与告警信号

日志关键字

java.lang.OutOfMemoryError: Java heap space
  at org.apache.spark.scheduler.DAGScheduler ...

# 或 YARN 日志
Container killed by YARN for exceeding memory limits.
XX MB of XX MB physical memory used.

YARN 监控:ApplicationMaster Container 状态频繁在 RUNNING → FAILED → RUNNING 之间切换。

3.2 根因分类

根因一:Broadcast 变量过大

sc.broadcast(data) 将整个数据集缓存到 Driver 内存,如果 data 过大(几十 GB),Driver 堆直接 OOM。

诊断:检查代码中所有 broadcast 调用,查看被广播对象的大小(data.size 或序列化后的字节数)。

根因二:超长 Lineage 链导致 Driver GC 压力过大

迭代算法(如 100 次迭代的 ML 训练)中,每次迭代产生新的 RDD 对象,但旧 RDD 对象因为被 Lineage 引用(dependencies)而无法被 GC 回收。Driver 内存逐渐被大量 RDD 对象填满。

诊断:通过 Driver 的 JVM Heap Dump(jmap -dump:format=b,file=dump.hprof <pid>),分析对象占用(mat/jhat),看是否有大量 RDD 对象未被回收。

根因三:collectToDriver 类操作收集了过大的结果集

rdd.collect()df.toPandas()df.show(largeN) 等操作将 Executor 端的数据全量传输到 Driver,如果数据量超过 Driver 堆大小,OOM。

3.3 调优策略

根因即时止血长期策略
Broadcast 过大减小 Broadcast 数据(只 broadcast 需要的列/行);改用 Sort Merge Join审查所有 broadcast 调用,设置大小上限 spark.sql.autoBroadcastJoinThreshold
Lineage 过长在迭代算法中每 10-20 次迭代做一次 checkpoint() + persist()在所有迭代 ML 算法中内置 Checkpoint 逻辑
collect 过大增加 Driver 内存(--driver-memory);改用 take(N) 限制结果量改为 write 直接写出,不 collect 到 Driver

第 4 章 Structured Streaming:Epoch 积压(Processing Lag 持续增长)

4.1 症状与告警信号

监控指标batchDuration 图中每个 Epoch 的处理时间持续 > trigger 间隔(如 trigger = 10s 但 batchDuration 稳定在 30s)。

日志关键字

INFO MicroBatchExecution: Streaming query made progress: {
  "batchId" : 1234,
  "batchDuration" : 35000,  // 35 秒,远超 trigger 间隔 10 秒
  "durationMs" : { "triggerExecution" : 34800, "queryPlanning" : 200 }
}

现象:Kafka 端 Consumer Group Lag 持续增长(数据生产速率 > 消费速率)。

4.2 根因分类

根因一:状态数据量过大,State Store 操作成为瓶颈

State Store 的 get/put 操作占用了大量 Epoch 时间(HDFSBackedStateStore 的内存 HashMap 过大,GC 频繁;或 RocksDB StateStore 的 SST 文件层数过多,读放大严重)。

诊断:查看 Spark UI → Structured Streaming → State Operators,找 numRowsTotal 是否异常大;查看 Executor GC 时间(在 Stages 页面的 Task Metrics 中)。

根因二:数据倾斜(某些分区的数据量远大于其他分区)

整个 Epoch 的完成时间取决于最慢的那个 Task,如果某个 Kafka 分区的数据量是其他分区的 10 倍,对应的 Task 会成为瓶颈。

诊断:Spark UI → Stage → Task 时间分布,查看 Max / Median 比值是否 > 3。

根因三:下游 Sink 写出变慢(如 Delta Lake compaction 占用 I/O)

ForeachBatch 中的写出操作(如写 Delta Lake)因为触发了 Compaction 或其他后台任务而变慢。

4.3 调优策略

根因即时止血长期策略
State Store 太大配置 Watermark + TTL 清理过期状态;切换 RocksDB State Store审查有状态算子是否有 TTL,防止状态无限增长
数据倾斜增加 Kafka 分区数(分散数据);在 foreachBatch 中使用 repartition 重新分区在 Source 侧调整分区策略
Sink 变慢增大 Delta Lake Compaction 间隔;与写出路径解耦(异步写出)优化 Sink 实现,避免同步 Compaction 阻塞流处理

第 5 章 Structured Streaming:State Store OOM

5.1 症状与告警信号

日志关键字

java.lang.OutOfMemoryError: Java heap space
  at org.apache.spark.sql.execution.streaming.state.HDFSBackedStateStore.put
  
# 或 GC overhead limit
java.lang.OutOfMemoryError: GC overhead limit exceeded

Spark UI:State Operators 的 memoryUsedBytes 持续增长,接近或超过 Executor 堆大小。

5.2 根因分类

根因一:有状态算子没有配置 Watermark / TTL

dropDuplicates("id")groupBy(...).count()(Update 模式)没有 Watermark,状态永远不清理。

诊断:查看 numRowsRemoved(Spark UI State Operators)是否始终为 0。

根因二:Watermark threshold 设置过大

Watermark threshold = 7 天,意味着 State Store 中保留 7 天内所有 key 的状态。对于高基数 key,7 天的状态量可能极大。

根因三:自定义 GroupState 超时回调中忘记调用 state.remove()

状态超时被触发(hasTimedOut = true),用户回调中输出了结果,但忘记调用 state.remove(),状态仍然保留在 State Store 中。

5.3 调优策略

即时止血

# 增加 Executor 内存(治标)
spark.executor.memory=16g
 
# 切换 RocksDB State Store(彻底解决堆限制问题)
spark.sql.streaming.stateStore.providerClass=\
  org.apache.spark.sql.execution.streaming.state.RocksDBStateStoreProvider

长期策略

  1. 为所有有状态算子配置合理的 Watermark threshold(根据业务数据的最大延迟设置,通常 P99 延迟 × 2)
  2. 检查所有 mapGroupsWithState / flatMapGroupsWithState 的超时回调,确保 state.remove() 被调用
  3. 在 Spark UI 的 State Operators 面板中设置 numRowsTotal 的告警阈值

第 6 章 Structured Streaming:Watermark 停滞导致状态无限膨胀

6.1 症状与告警信号

Spark UI 表现:Streaming Statistics 页面的 Watermark 曲线长时间水平(不前进);numRowsRemoved 持续为 0。

业务影响:流聚合窗口长时间没有输出(窗口等待 Watermark 超过窗口结束时间才输出);State Store 持续膨胀。

6.2 根因分类

根因一:某个 Source 长期无数据(多 Source Join 场景)

两条 Kafka 流 Join 时,一条流断流(如上游服务故障导致某个 Topic 没有新数据),该流的 Watermark 停滞,全局 Watermark(取 min)也停滞。

诊断:查看各 Source 的 numInputRows,找到为 0 的 Source。

根因二:数据全是极度迟到的历史数据

进行历史数据 Backfill 时,导入的历史数据的事件时间远小于”现在”,但系统已经处理了大量近期数据,Watermark 已经很高。历史数据全部被视为”迟到”,不更新 Watermark。

根因三:Kafka 分区中有长时间不活跃的 Partition

某个 Kafka Partition 长时间没有新消息(生产者停止向该 Partition 写入),但 Spark 仍将该 Partition 的 Watermark 视为当前值(该 Partition 没有新消息 = 没有新的事件时间 = Watermark 不更新)。

6.3 调优策略

根因一(多 Source Watermark 取 min)

# 改为取最大值(接受部分迟到数据被丢弃,但 Watermark 不会因一个流断流而停滞)
spark.sql.streaming.multipleWatermarkPolicy=max

根因二(历史数据 Backfill):历史数据回放期间,暂时关闭 Watermark 相关的状态清理,或用独立的流处理作业处理历史数据,不与实时流共享状态。

根因三(Kafka Partition 不活跃):在 Kafka Source 配置中设置”空闲 Partition Watermark 推进”策略(Spark 3.1+):

df.readStream
  .format("kafka")
  .option("kafka.bootstrap.servers", "...")
  .option("subscribe", "my-topic")
  // 空闲分区推进 Watermark:超过 5 分钟无数据的分区,Watermark 继续推进
  .option("idleTimeoutMs", "300000")
  .load()

第 7 章 Structured Streaming:RocksDB State Store 本地磁盘耗尽

7.1 症状与告警信号

日志关键字

java.io.IOException: No space left on device
  at org.rocksdb.RocksDB.put(...)
  
# 或 K8s 上
OOMKilled: Container killed due to memory limits (if using tmpfs for local dir)

系统监控spark.local.dir 所在磁盘的使用率持续上涨至 90%+。

7.2 根因分类

根因一:状态数据量持续增长,没有 TTL

RocksDB 的 SST 文件随状态增长持续增大,最终耗尽磁盘。

根因二:RocksDB Compaction 的空间放大

LSM Tree 的空间放大(旧版本 key 在 Compaction 前占用额外空间)导致实际占用磁盘 = 有效数据量 × 1.5-2 倍。

根因三:Changelog 和 Snapshot 文件在 HDFS 上积累,但本地临时文件未清理

RocksDB 在生成 Snapshot 期间,HDFS 上传过程中的临时文件未及时清理,占用本地磁盘。

7.3 调优策略

即时止血

# 检查 RocksDB 本地目录大小
du -sh /mnt/nvme/rocksdb-state/*
 
# 手动清理旧的 SST 文件(谨慎操作,仅清理可以通过 Checkpoint 恢复的旧版本)
# 最好是重启受影响的 Executor,让 RocksDB 从 HDFS 重新下载 Checkpoint

长期策略

  1. 配置状态 TTL(Watermark 或 GroupState TTL),控制状态规模
  2. 为 RocksDB 配置独立的大容量 NVMe 磁盘,并设置磁盘使用率监控告警(阈值 70%)
  3. 调整 minVersionsToRetain 减少本地保留的历史版本数:
    spark.sql.streaming.stateStore.minVersionsToRetain=1

第 8 章 Structured Streaming:ForeachBatch Sink 重复写入

8.1 症状与告警信号

业务现象:下游数据表出现重复行;聚合指标值偏高(如 count 值是预期的 2 倍)。

Spark 日志:无明显错误(Exactly-once 保证失效通常是”静默”的——既不报错,也不告警,只是数据不对)。

8.2 根因分析

ForeachBatch Sink 重复写入的根本原因是:崩溃发生在”数据写出成功”但”Commit Log 写入前”,重启后重做该 Epoch,数据被再次写出(第 05 篇详解)。

具体到代码层面,常见的错误模式:

错误一:直接 JDBC INSERT(无幂等性)

// 问题:崩溃重启后会重复 INSERT
query.writeStream.foreachBatch { (df, epochId) =>
  df.write.mode("append").jdbc(url, "table", props)
}.start()

错误二:先删后插的”删”和”插”不在同一事务中

// 问题:如果在 DELETE 完成后、INSERT 开始前崩溃,数据会短暂丢失(然后重跑才恢复)
// 如果 INSERT 完成后才崩溃,重跑时 DELETE 会删掉已有的正确数据,再 INSERT 一遍
// 实际上这种写法是正确的(最终结果幂等),但 DELETE+INSERT 之间必须在同一事务
query.writeStream.foreachBatch { (df, epochId) =>
  val conn = getConnection()
  conn.setAutoCommit(false)
  conn.createStatement().execute(s"DELETE FROM t WHERE epoch_id = $epochId")
  df.write.jdbc(url, "t", props)  // 注意:这里的 jdbc write 使用的是另一个连接!事务不共享!
  conn.commit()  // 这个 commit 只提交了 DELETE,INSERT 在不同连接中自动提交了
}.start()

8.3 修复方案

推荐:使用 Delta Lake 作为 Sink(最简洁)

query.writeStream
  .format("delta")
  .option("checkpointLocation", "hdfs://path/ckpt")
  .start("hdfs://path/delta-table")
// Delta Lake 自动通过 (queryId, epochId) 保证幂等

JDBC Sink 的正确幂等写法(单连接事务)

query.writeStream.foreachBatch { (df, epochId) =>
  // 1. 将数据写到临时 DataFrame
  df.persist()
  
  // 2. 使用单个连接,在同一事务中完成 DELETE + INSERT
  val rows = df.collect()  // 注意:只在小数据量时适用
  
  val conn = DriverManager.getConnection(url, user, password)
  try {
    conn.setAutoCommit(false)
    
    // 检查幂等:该 epoch 是否已写过
    val checkStmt = conn.prepareStatement("SELECT 1 FROM epoch_log WHERE epoch_id=?")
    checkStmt.setLong(1, epochId)
    val rs = checkStmt.executeQuery()
    if (!rs.next()) {
      // 写数据
      val insertStmt = conn.prepareStatement("INSERT INTO target_table VALUES (?,?,?)")
      rows.foreach { row =>
        insertStmt.setString(1, row.getString(0))
        // ... 设置其他字段
        insertStmt.addBatch()
      }
      insertStmt.executeBatch()
      
      // 记录 epoch 完成
      val logStmt = conn.prepareStatement("INSERT INTO epoch_log(epoch_id) VALUES (?)")
      logStmt.setLong(1, epochId)
      logStmt.execute()
      
      conn.commit()
    }
  } catch {
    case e: Exception => conn.rollback(); throw e
  } finally {
    conn.close()
    df.unpersist()
  }
}.start()

第 9 章 Checkpoint 目录损坏无法恢复

9.1 症状与告警信号

日志关键字

org.apache.spark.sql.streaming.StreamingQueryException: 
  Failed to read streaming state store. Cannot start query from checkpoint location...

# 或
ERROR MicroBatchExecution: Query [id=abc] terminated with error
  Caused by: java.io.IOException: No FileSystem for scheme: hdfs
  
# 或 Schema 不兼容
AnalysisException: Detected incompatible evolution in streaming query plan.
  Checkpoint schema: ...  Current schema: ...

9.2 根因分类

根因症状处理方式
HDFS 目录被误删offsets/commits/ 目录不存在从备份恢复(如果有);否则删除整个 Checkpoint,从最新 offset 重新开始
部分 Checkpoint 文件损坏读取某个 offsets/N 文件时 JSON 解析失败手动删除损坏的文件,退回到上一个完整 Epoch
Schema 变更不兼容State Store 中存储的 Schema 与新代码期望的 Schema 不匹配删除整个 Checkpoint,从最新 offset 重新开始(接受状态丢失)
Spark 版本升级导致不兼容升级 Spark 大版本后 Checkpoint 格式变化在升级前先停流、备份 Checkpoint;升级后若不兼容则重建 Checkpoint

9.3 生产最佳实践

防患于未然

  1. 定期备份 Checkpoint 目录到另一个 HDFS 路径或对象存储(每日备份,保留 7 天)
  2. 在 Checkpoint 目录设置 HDFS ACL,防止误删(只有流处理服务账号有写权限,人工账号只读)
  3. Spark 大版本升级前,先用新版本测试 Checkpoint 兼容性(在 Staging 环境),不兼容时提前规划 Checkpoint 重建窗口

重建 Checkpoint 的标准操作流程

# 1. 停止流处理作业
spark-submit --kill <application-id>
 
# 2. 备份旧 Checkpoint
hdfs dfs -cp /path/to/ckpt /path/to/ckpt.backup.$(date +%Y%m%d)
 
# 3. 删除旧 Checkpoint
hdfs dfs -rm -r /path/to/ckpt
 
# 4. 重新启动流处理作业(从最新 Kafka offset 开始)
# 注意:历史状态全部丢失(聚合从零开始),需要业务侧接受
spark-submit --class MyStreamingApp myapp.jar \
  --conf spark.sql.streaming.checkpointLocation=/path/to/ckpt

第 10 章 推测执行在数据倾斜场景下加剧问题

10.1 症状与告警信号

开启推测执行后,发现:

  • 集群资源使用率异常高(同一 Task 有多个 Executor 在并发执行)
  • 特定 Task 的推测副本和原 Task 运行时间相近(推测没有加速)
  • 整体 Stage 完成时间反而增加(推测副本消耗了本可用于正常 Task 的资源)

10.2 根因分析

数据倾斜场景下(某个 Key 对应的数据量是其他 Key 的 100 倍),慢 Task 慢的根本原因是数据量大,而不是节点故障或性能抖动。推测副本被调度到另一个 Executor 上,处理的是同样大量的数据(相同的输入分区),因此同样很慢。

结果:原 Task 和推测副本都很慢,两者都在消耗资源,但最终完成时间与只运行原 Task 相比几乎没有提升,反而浪费了一份 Executor 资源。

10.3 解决策略

正确做法:解决数据倾斜,而不是依赖推测执行

  • Salting(加盐):给倾斜的 Key 加随机后缀,打散到多个分区
    // 对倾斜 key 加盐,分散到 N 个分区
    val saltedDF = df.withColumn("salted_key", 
      concat($"key", lit("_"), (rand() * 10).cast("int").cast("string")))
      .groupBy("salted_key").agg(...)
      .withColumn("key", split($"salted_key", "_")(0))
      .groupBy("key").agg(...)
  • Broadcast Join:将小表 broadcast,避免大表 Shuffle
  • AQE(Adaptive Query Execution,Spark 3.0+):自动检测数据倾斜并拆分倾斜分区

关闭推测执行(或调高阈值)

# 对于有大量数据倾斜的作业,关闭推测执行
spark.speculation=false
 
# 或提高推测阈值,减少误触发
spark.speculation.multiplier=3.0  # 必须达到中位数 3 倍才触发推测
spark.speculation.quantile=0.90   # 必须 90% 完成后才触发推测

附录:生产容错参数速查表

批处理核心参数

参数默认值说明推荐值(生产 Spot 实例场景)
spark.task.maxFailures4单 Task 最大失败次数8-10
spark.stage.maxConsecutiveAttempts4Stage 最大连续失败次数8-10
spark.network.timeout120sExecutor 心跳超时300s(节点频繁 GC 时)
spark.shuffle.io.maxRetries3Shuffle Fetch 最大重试次数10
spark.shuffle.io.retryWait5sShuffle Fetch 重试等待时间30s
spark.blacklist.enabledfalse开启节点黑名单true(硬件问题频繁时)
spark.speculationfalse开启推测执行true(非数据倾斜场景)
spark.speculation.multiplier1.5推测执行判定倍数2.0-3.0

Structured Streaming 核心参数

参数默认值说明推荐值
spark.sql.streaming.minBatchesToRetain100Offset/Commit Log 保留的最小 Epoch 数200
spark.sql.streaming.multipleWatermarkPolicymin多 Source Watermark 策略max(避免 Watermark 停滞)
spark.sql.streaming.stateStore.maintenanceInterval60sState Store 维护(快照)触发间隔30s(快速恢复场景)
spark.sql.streaming.stateStore.minVersionsToRetain2State Store 保留的最小版本数2-3

RocksDB State Store 核心参数

参数默认值说明推荐值
rocksdb.writeBufferSizeMB64MemTable 大小128-256
rocksdb.maxWriteBufferNumber4最大 MemTable 数量4-8
rocksdb.blockCacheSizeMB8Block Cache 大小64-256
rocksdb.changelogCheckpointing.enabledfalse(3.2)开启 Changelog 模式true
rocksdb.minDeltasForSnapshot200触发全量快照的 Changelog 数100-200
spark.executor.memoryOverhead10%堆外内存(需覆盖 RocksDB 堆外用量)2-4g

小结

至此,整个”Spark 容错与状态管理深度解析”专栏的 10 篇正文全部完成。

这份生产调优手册汇总了最高频的 10 类容错问题:

  • 批处理三大问题:Task 反复失败(黑名单 + 调大 maxFailures)、FetchFailedException 风暴(ESS + 调大 Fetch 重试)、Driver OOM(Checkpoint + 增大 Driver 内存)
  • 流处理七大问题:Epoch 积压(State TTL + RocksDB)、State Store OOM(Watermark + RocksDB + TTL)、Watermark 停滞(max 策略 + 空闲分区推进)、RocksDB 磁盘耗尽(TTL + 大磁盘 + 监控)、ForeachBatch 重复写入(Delta Lake / 事务幂等写法)、Checkpoint 损坏(备份 + 标准重建流程)、推测执行加剧倾斜(关闭推测 + AQE 解决倾斜)

每个问题的处理链路:告警信号 → 根因分类(用 Spark UI / 日志定位)→ 即时止血 → 长期策略,这是在生产环境中快速响应和系统性解决容错问题的标准方法论。


思考题

  1. FetchFailedException 引发的 Stage 重试风暴是一类常见的生产问题:一个 Executor 宕机 → 多个 Reducer 同时抛出 FetchFailedException → DAGScheduler 重新提交整个上游 Stage → 新 Stage 的 Task 重新占用资源 → 资源紧张引发更多 OOM → 更多 Executor 宕机,形成雪崩。如何通过 Spark 的配置参数(如 spark.stage.maxConsecutiveAttempts)和资源配置来打断这个雪崩循环?
  2. 生产中有时会遇到”作业卡住但没有报错”的问题:所有 Task 都在运行,但进度条长时间不动。这通常是某种形式的死锁或资源泄漏。在什么具体场景下,Spark 作业会陷入这种”假进行”状态?如何通过 Spark UI 的 Threads Dump 或 Task 时间轴来定位根因?
  3. 批处理作业的 SLA(如”每天 6 点前完成”)需要综合考虑数据量波动、集群负载变化和偶发性硬件故障。如何为 Spark 作业建立一套”容错预算”模型——即在保证 SLA 的前提下,允许多少次 Task 失败、多少次 Stage 重试、最多容忍多长时间的恢复延迟?这个模型如何指导 maxFailures、重试超时等参数的设置?

参考资料