10 生产调优手册:从症状到根因的系统性诊断

摘要

前九篇系统讲解了 Spark Shuffle 与内存管理的底层机制。本篇是一篇面向生产实战的诊断与调优手册,将理论知识落地为可操作的工程实践。核心思路是从症状出发、逆向推导根因、按优先级施策——而不是无头苍蝇式的参数堆砌。本文分为五个诊断维度:Shuffle Spill 过多、数据倾斜、Executor OOM、Shuffle Fetch 超时失败、GC 压力过大,每个维度给出诊断路径(如何在 Spark UI 和日志中识别)、根因分析(为什么会发生)和针对性调优策略(做什么、怎么做、为什么这样做)。文末提供一份按场景分类的参数速查表。


第 1 章 调优的基本方法论

1.1 先诊断,后调参

生产调优最常见的误区是:直接按照网上流传的”Spark 调优参数大全”逐一调整,观察效果,反复试错。这种方式不仅效率低下,而且可能在改善一个问题的同时引入另一个问题。

正确的调优顺序是:

第一步:定位瓶颈——通过 Spark UI、Executor 日志、系统监控(CPU、内存、磁盘 I/O、网络),确定作业的性能瓶颈在哪里(Shuffle?GC?数据倾斜?磁盘?网络?)

第二步:分析根因——基于对底层机制的理解,分析为什么这个地方成为瓶颈(内存不足导致频繁 Spill?数据分布不均导致某些 Task 处理数据量远超其他?序列化效率低?)

第三步:按优先级施策——优先从代码/算法层面优化(如将 groupByKey 改为 reduceByKey),其次从配置层面调整,最后才考虑扩充资源

第四步:验证效果——对比调优前后的关键指标(Stage 时间、Shuffle Write/Read 量、Spill 量、GC 时间),确认改善方向正确,没有引入新问题

1.2 调优的三个层次

调优的手段可以按收益/代价比从高到低分为三个层次:

层次手段典型操作收益代价
算法层改变计算逻辑,减少根本数据量reduceByKey 替代 groupByKey,Broadcast Join 替代 Shuffle Join最高(可减少数据量 10-100x)需要改代码,有风险
配置层调整 Spark 参数,在不改代码前提下优化增大 Executor 内存、调整分区数、开启压缩中等(通常改善 2-5x)风险低,但效果有上限
资源层增加集群资源增加 Executor 数量、使用更大内存的机型、换 SSD 磁盘线性改善成本最高

经验法则:先穷举算法层和配置层优化,资源层优化是最后手段

1.3 Spark UI 是调优的核心工具

在开始任何调优之前,必须熟练使用 Spark UI(默认端口 4040)。核心页面及其关键信息:

Jobs 页面:作业级别的执行时间和 Stage 数量。找出耗时最长的 Job,进入其 Stage 详情。

Stages 页面:Stage 级别的汇总指标,包括 Shuffle Write/Read 量、Shuffle Spill(Memory/Disk)、每个 Task 的时间分布(Min/Median/Max/P75/P95)。Task 时间分布的极值与中位数之比反映数据倾斜程度。

Executors 页面:每个 Executor 的内存使用、GC 时间、Shuffle Read/Write 量。GC 时间/Task 执行时间 > 10% 说明 GC 是瓶颈。

SQL 页面(针对 Spark SQL):SQL 执行计划树,每个算子的输入/输出行数和大小,可以看到哪个 Exchange(Shuffle)节点产生了大量数据。


第 2 章 Shuffle Spill 过多

2.1 诊断信号

Spark UI 信号

  • Stage 详情的 Summary Metrics 中,Shuffle Spill (Disk) 非零,且与 Shuffle Write Size 的比值 > 2
  • 某个或某几个 Task 的 Shuffle Spill (Disk) 远大于其他 Task(提示数据倾斜,见第 3 章)
  • Task 的 Duration 远超预期,但 CPU 使用率不高——磁盘 I/O 成为瓶颈

Executor 日志信号

INFO ExternalSorter: Thread N spilling in-memory map of XXX MB to disk (Y times so far)

Y 值超过 5 表示严重 Spill,超过 10 意味着磁盘 I/O 是主要瓶颈。

系统监控信号:Executor 节点的磁盘写入 I/O 持续高位(iostat 显示 %util > 80%),而 CPU 使用率不高——典型的 I/O 瓶颈特征。

2.2 根因分析

Spill 过多的根本原因是单个 Task 的 Execution Memory 不足以容纳其中间数据。触发原因有以下几类:

原因一:Executor 内存总量不足

spark.executor.memory 设置偏小,统一内存池太小,导致每个 Task 分配到的执行内存也太小。

原因二:内存被其他消费者占用

同一 Executor 上同时运行多个 Task,每个 Task 分享 Execution Memory;或者存储内存(RDD Cache)占用了太多统一内存池,导致执行内存被压缩。

原因三:单个 Task 数据量过大

Shuffle 分区数太少(spark.sql.shuffle.partitions 偏小),导致每个 Reduce Task 需要处理太多数据,峰值内存需求超出可用执行内存。

原因四:算法选择不当

使用了 groupByKey 而非 reduceByKey,没有 Map 端聚合,大量原始记录全部写入 Shuffle 文件,Reduce 端需要在内存中累积所有记录,内存压力倍增。

2.3 调优策略

策略一(算法层,最高优先级):使用 Map 端聚合算子

// 改造前:没有 Map 端聚合
rdd.groupByKey().mapValues(values => values.sum)
 
// 改造后:有 Map 端聚合,Shuffle 数据量减少 N 倍
rdd.reduceByKey(_ + _)

对于复杂聚合(多字段统计),使用 aggregateByKeycombineByKey 提供自定义的 Map 端聚合逻辑。

策略二(配置层):增大 Executor 内存和执行内存比例

spark.executor.memory=16g                     # 从 4g/8g 调大
spark.memory.fraction=0.75                    # 给统一内存池更多空间
spark.memory.storageFraction=0.3              # 若不需要大量 RDD 缓存,减少存储保护区

调整 memory.fraction 需要理解其含义:增大它意味着减少 User Memory(用户代码的堆空间),如果用户代码本身也分配大量对象,可能导致 User Memory 不足 OOM。

策略三(配置层):增大 Shuffle 分区数

spark.sql.shuffle.partitions=2000           # 默认 200,按数据量调大

经验法则:分区数 ≈ 总 Shuffle 数据量(GB) × 4 到 8。例如 100GB 数据,设置 400-800 个分区,每个 Task 处理 125-250MB,适合大多数内存配置。

策略四(配置层):开启 Spill 压缩和高性能序列化

spark.serializer=org.apache.spark.serializer.KryoSerializer
spark.shuffle.spill.compress=true
spark.io.compression.codec=lz4
spark.shuffle.file.buffer=64k

这组配置组合使用,可以将 Spill 文件大小减少 50-70%,Spill 写入速度提升 30-50%。

策略五(资源层):使用 SSD 本地磁盘

spark.local.dir=/data/nvme0/spark-tmp,/data/nvme1/spark-tmp

SSD 的顺序写入速度(1-3GB/s)比 HDD(100-200MB/s)快 10-30 倍,Spill 密集场景收益显著。


第 3 章 数据倾斜

3.1 诊断信号

数据倾斜是 Spark 生产问题中最普遍、最难彻底解决的问题类型。

Spark UI 诊断步骤

  1. 进入倾斜 Stage 的详情页
  2. Duration 降序排列 Tasks
  3. 观察 Max Task Duration vs Median Task Duration:比值 > 3 为轻度倾斜,> 10 为严重倾斜
  4. 找到最慢的几个 Task,查看其 Input Size / RecordsShuffle Read Size,确认其数据量是否远超其他 Task

代码侧诊断

// 查看 RDD 各分区的数据量分布
rdd.mapPartitionsWithIndex { case (idx, iter) =>
  Iterator((idx, iter.size))
}.collect().sortBy(-_._2).take(20)

如果某几个分区的数据量是其他分区的 10 倍以上,确认为数据倾斜。

常见倾斜 Key 识别

// 找出数量最多的 Key
rdd.map(_._1).countByValue().toSeq.sortBy(-_._2).take(20)

3.2 根因分析

数据倾斜的根本原因是分区函数将大量数据路由到同一个分区。常见场景:

场景一:Null/Default 值过多

如果数据中某个 Join Key 或 Group Key 有大量 null 或默认值(如 -1"unknown"),这些 Key 通过哈希函数都会被路由到同一个分区,该 Reduce Task 的数据量是其他 Task 的数百倍。

场景二:热点业务数据

电商场景中某个超级大商家/品牌的订单量远超平均水平,按商家 ID 聚合时该商家对应的分区数据量极大。

场景三:JOIN 时一侧数据不均

两表 JOIN 时,右表某个 Key 的记录数极多(如产品表中某个热门产品的销售记录),该 Key 对应的 Reduce Task 需要处理大量来自右表的数据。

3.3 调优策略(按场景分类)

策略一:过滤无效 Key(适用于 Null/Default 倾斜)

在 Shuffle 之前,过滤掉不需要的 Null 值或单独处理:

// 过滤 null key,单独统计 null 记录
val nonNullRdd = rdd.filter(_._1 != null)
val nullCount = rdd.filter(_._1 == null).count()
 
// 对非 null 数据正常聚合
val result = nonNullRdd.reduceByKey(_ + _)

策略二:随机盐值(Salting)——适用于热点 Key 聚合

核心思想:将热点 Key 打散到多个分区并行计算,最后合并局部结果。

val N = 100  // 盐值数量(根据数据量调整)
 
// 第一轮:加盐,热点 Key 被分散到 N 个分区
val salted = rdd.map { case (key, value) =>
  val saltedKey = (key, scala.util.Random.nextInt(N))
  (saltedKey, value)
}
val partialResult = salted.reduceByKey(_ + _)
 
// 第二轮:去盐,合并同一原始 Key 的局部结果
val finalResult = partialResult.map { case ((key, _), value) =>
  (key, value)
}.reduceByKey(_ + _)

策略三:Broadcast Join——适用于大表 join 小表

当 Join 的一侧数据量小于 spark.sql.autoBroadcastJoinThreshold(默认 10MB)时,Spark SQL 自动选择 Broadcast Join,完全消除 Shuffle。如果小表超过阈值,可以手动指定:

// Spark SQL
val result = bigDF.join(broadcast(smallDF), "key")
 
// RDD API
val broadcastSmall = sc.broadcast(smallRDD.collectAsMap())
val result = bigRDD.flatMap { case (key, value) =>
  broadcastSmall.value.get(key).map(smallValue => (key, value, smallValue))
}

策略四:热点 Key 单独处理

将热点 Key 与普通 Key 分开处理,热点 Key 用 Broadcast 或单独的小作业处理,普通 Key 正常 Shuffle:

val hotKeys = Set("hot_key_1", "hot_key_2")  // 预先确定热点 Key
 
val hotRDD = rdd.filter(x => hotKeys.contains(x._1))
val normalRDD = rdd.filter(x => !hotKeys.contains(x._1))
 
// 热点 Key:广播处理
val hotResult = sc.parallelize(hotRDD.groupByKey().collect())
 
// 普通 Key:正常 Shuffle 聚合
val normalResult = normalRDD.reduceByKey(_ + _)
 
// 合并结果
val finalResult = hotResult.union(normalResult)

策略五:Spark 3.0+ AQE(Adaptive Query Execution)自动处理倾斜

spark.sql.adaptive.enabled=true
spark.sql.adaptive.skewJoin.enabled=true                    # 自动检测并拆分倾斜分区
spark.sql.adaptive.skewJoin.skewedPartitionFactor=5        # 某分区数据量超过中位数的5倍认为倾斜
spark.sql.adaptive.skewJoin.skewedPartitionThresholdInBytes=256MB

AQE 会在运行时检测 Shuffle 后的分区大小分布,自动将过大的倾斜分区拆分成多个小分区并行处理,无需代码改动,是 Spark 3.x 最重要的自动调优特性之一。

核心概念

Spark 3.0 引入的 AQE(Adaptive Query Execution,自适应查询执行) 通过在运行时收集 Shuffle 统计信息(各分区实际大小),动态地调整后续阶段的执行计划。它能自动处理三类问题:合并小分区(Coalesce Partitions)、转换 Join 策略(如 Sort-Merge Join 转 Broadcast Join)、拆分倾斜分区(Skew Join Optimization)。AQE 是目前 Spark 最重要的运行时优化机制,生产环境应默认开启。


第 4 章 Executor OOM

4.1 诊断信号

Spark UI 信号

  • Job/Stage 失败,失败原因包含 ExecutorLostFailure
  • Executor 页面显示某个 Executor 的 Status = Dead

日志信号

# JVM 堆内 OOM
java.lang.OutOfMemoryError: Java heap space

# 堆外内存 OOM(YARN 杀死容器)
WARN YarnAllocator: Container ... killed by YARN for exceeding memory limits
Container is running beyond physical memory limits. Current usage: X MB

关键区别

  • Java heap space → JVM 堆内存耗尽(堆内执行/存储/用户内存超限)
  • Container killed by YARN → 整个 Container 内存超限(通常是堆外内存配置不当)

4.2 根因与对应处理

情形一:执行内存 OOM

症状:Java heap space + Task 日志中有大量 Spill 记录(说明执行内存已经在努力 Spill,但仍然不够)

根因:单个 Task 的数据量超出了所有内存腾出手段后的容量上限(Spill 后仍 OOM,通常是某次 Merge 或排序需要同时在内存中持有太多数据)

处理:

# 1. 增大执行内存
spark.executor.memory=32g
spark.memory.fraction=0.75

# 2. 增大分区数,减小单 Task 数据量
spark.sql.shuffle.partitions=2000

# 3. 开启堆外内存,执行内存挪到堆外,避免堆内 GC 和内存碎片
spark.memory.offHeap.enabled=true
spark.memory.offHeap.size=16g

情形二:用户代码 OOM

症状:Java heap space + 崩溃时没有 Spill 记录,堆转储(Heap Dump)显示大量用户自定义对象

根因:用户在 map/mapPartitions 等算子中,在内存中积累了大量对象(如把整个分区的数据收集到一个 List 中)

处理:

  • 检查用户代码,避免在 Task 中使用全量数据的内存结构
  • 对于需要迭代访问的场景,使用 Iterator 逐条处理而非 collect() 全量加载
  • 适当减小分区数据量(增大 spark.sql.shuffle.partitions

情形三:广播变量过大

症状:在创建广播变量后不久 OOM,堆转储显示大量来自同一广播变量的对象

根因:广播变量(sc.broadcast())的数据会完整地存在每个 Executor 的内存中,如果广播的数据集很大(>1GB),会严重挤占每个 Executor 的内存

处理:

  • 检查广播变量的大小:broadcastVar.value.size 或序列化后的字节数
  • 如果广播数据确实较大(>500MB),考虑改为 Shuffle Join
  • 调整 spark.sql.autoBroadcastJoinThreshold 的上限,防止 Spark SQL 自动广播过大的表:
    spark.sql.autoBroadcastJoinThreshold=50MB  # 默认 10MB,适当调大但不能无限大
    

情形四:YARN Container OOM(堆外内存超限)

症状:Container killed by YARN for exceeding memory limits,通常发生在开启了堆外内存后

根因:spark.executor.memoryOverhead(默认 max(executorMemory×0.1, 384MB))包含了 JVM 本身的堆外开销(JVM 元空间、线程栈、代码缓存等),但不包含 spark.memory.offHeap.size 指定的应用级堆外内存。如果同时开启了这两部分,YARN Container 的总内存上限必须覆盖两者之和。

处理:

# 假设:executor.memory=8g, offHeap.size=4g
# YARN Container 所需内存 = 8g + 4g + overhead
# overhead = max(8g*0.1, 384MB) = 819MB ≈ 1g

# 正确配置(在 YARN 模式下):
spark.executor.memory=8g
spark.memory.offHeap.enabled=true
spark.memory.offHeap.size=4g
spark.executor.memoryOverhead=5g  # 手动设置:4g offHeap + 1g 系统 overhead

生产避坑

Hadoop YARN 集群上,每个 Container 的内存上限由 YARN 的 yarn.scheduler.maximum-allocation-mb 和单个 Application 的 spark.yarn.executor.memoryOverhead 共同决定。常见错误是忘记同步调整 memoryOverhead,导致 Container 总内存超过 YARN 分配上限而被强制杀死。YARN 的内存限制检查是基于物理内存的实际使用量,不仅仅是 JVM 报告的堆大小。


第 5 章 Shuffle Fetch 失败

5.1 诊断信号

日志信号

WARN TaskSetManager: Lost task N in stage S:
  org.apache.spark.shuffle.FetchFailedException: Failed to connect to /host:port
  
# 或者
  org.apache.spark.shuffle.FetchFailedException: Connection from /host:port closed

Spark UI 信号

  • Stage 显示 Retry 标记(Stage 重试)
  • Executor 页面显示某个 Executor 曾经 Dead 后又恢复(GC 导致的临时不可达)

5.2 根因分析

Shuffle Fetch 失败的常见根因:

根因一:目标节点 GC 停顿

Map 端 Executor 发生长时间 Full GC,暂停时间超过了网络超时阈值。这是最常见的原因。

根因二:网络抖动

集群网络短暂不稳定,导致 TCP 连接超时或丢包,FetchRequest 没有收到响应。

根因三:Map Task 所在节点宕机或 Executor 被 Kill

节点物理故障,或者 YARN OOM Killer 杀死了 Executor 进程。

根因四:磁盘 I/O 过慢

Map 端 Executor 的磁盘写入正在进行(大量 Spill),导致响应 Fetch 请求的速度极慢,超时。

5.3 调优策略

策略一:增大重试次数和等待时间

spark.shuffle.io.maxRetries=10          # 默认 3,调大应对瞬时故障
spark.shuffle.io.retryWait=30s         # 默认 5s,调大应对 GC 导致的长时间停顿
spark.shuffle.io.connectionTimeout=300s # 默认 120s,适当调大

策略二:解决根本 GC 问题

如果 Fetch 失败源于目标节点的 GC 停顿:

# 开启 G1GC,减少 Full GC 频率
spark.executor.extraJavaOptions=-XX:+UseG1GC -XX:+PrintGCDetails -XX:+PrintGCTimeStamps

# 减少堆内对象数量,降低 GC 压力
spark.serializer=org.apache.spark.serializer.KryoSerializer  # 减少对象大小
spark.memory.offHeap.enabled=true                             # 将部分数据移出堆,减少 GC 压力

策略三:启用 ExternalShuffleService(ESS)

spark.shuffle.service.enabled=true
spark.shuffle.service.port=7337

ESS 将 Shuffle 文件的服务职责从 Executor 进程中分离出来,即使 Executor 因为 GC 暂停或被 Kill,已写出的 Shuffle 文件仍然可以通过 ESS 被其他 Executor 读取,大幅降低 FetchFailedException 的发生概率。

核心概念

ESS 是一个常驻于每个 Worker 节点的独立进程。启用 ESS 后,Map Task 写出的 Shuffle 文件由 ESS 注册和管理,Reduce Task 的 Fetch 请求直接发往 ESS,而不是 Map Task 所在的 Executor。这个解耦使得 Executor 的生命周期不再影响 Shuffle 数据的可用性,是高可靠性 Spark 集群的标准配置。


第 6 章 GC 压力过大

6.1 诊断信号

Spark UI 信号

  • Executors 页面中,GC Time / Task Time > 10%(超过 10% 说明 GC 是显著瓶颈,超过 30% 说明严重)
  • Stage 的任务时间分布中,部分 Task 的时间显著长于其他(可能是长时间 GC 导致)

Executor 日志信号(需要开启 -verbose:gc):

GC pause (G1 Humongous Allocation) 15.230ms
GC pause (G1 Mixed) 3.421ms
Full GC (Ergonomics) 15.123s  # Full GC > 1s 说明严重问题

6.2 根因分析

GC 压力大的常见来源:

来源一:大量小对象

Shuffle 操作(排序、聚合)产生大量临时 Java 对象((key, value) 对、中间聚合结果),在 Young Generation 中快速积累,触发频繁 Young GC。

来源二:大对象直接进入老年代

超过 Young GC 晋升阈值的对象(如大型 HashMap、大 Array)直接分配到 Old Generation,老年代快速填满,触发 Full GC。

来源三:内存碎片

长时间运行后,堆内存出现碎片化,大对象分配失败,触发 Full GC 进行碎片整理。

6.3 调优策略

策略一:切换到 G1 GC 并调优

G1GC 是处理大堆(> 4GB)时最推荐的 GC,相比 CMS 和默认 GC 有更短、更可预期的停顿时间:

spark.executor.extraJavaOptions=\
  -XX:+UseG1GC \
  -XX:G1HeapRegionSize=16M \       # 建议设为堆大小 / 2048
  -XX:MaxGCPauseMillis=200 \       # GC 停顿目标(ms)
  -XX:+G1HeapWastePercent=5 \      # 控制混合 GC 的触发频率
  -XX:InitiatingHeapOccupancyPercent=35  # 老年代使用率超35%触发并发标记

策略二:将 Shuffle 数据移出 JVM 堆

最根本的减少 GC 方式:让 Shuffle 的中间数据不在 JVM 堆上分配对象。

spark.memory.offHeap.enabled=true
spark.memory.offHeap.size=8g    # 将执行内存挪到堆外

UnsafeShuffleWriter 在堆外的 MemoryBlock 中存储序列化数据,完全不占用 JVM 堆,GC 几乎感知不到 Shuffle 的存在。

策略三:使用 Kryo 序列化减少对象大小

spark.serializer=org.apache.spark.serializer.KryoSerializer
spark.kryo.registrationRequired=false

Kryo 序列化产生的字节数比 Java 序列化少 30-50%,更小的对象意味着 GC 追踪的工作量也更少。

策略四:适当降低 Executor 并发

每个 Executor 的并发 Task 数由 spark.executor.cores 控制(默认 = Executor 的 CPU 核数)。并发 Task 越多,每个 Task 分到的 Execution Memory 越少,更容易触发 Spill,同时多个 Task 同时产生大量临时对象,GC 压力倍增:

# 对于内存密集型作业,降低并发 Task 数
spark.executor.cores=2   # 而不是 4 或 8
# 相应增大 executor 数量保持总并行度
spark.executor.instances=20

第 7 章 AQE 与自动化调优

7.1 AQE 的三大能力

Spark 3.0+ 的 AQE 大幅降低了手动调优的难度。开启方式:

spark.sql.adaptive.enabled=true   # Spark 3.2+ 默认开启

能力一:动态合并小分区(Coalesce Partitions)

Shuffle 后如果实际分区数据量远小于预期(如设置了 2000 个分区,但实际数据量较小),AQE 会自动合并相邻的小分区,减少 Task 数量,避免调度开销。

spark.sql.adaptive.coalescePartitions.enabled=true
spark.sql.adaptive.coalescePartitions.minPartitionNum=1
spark.sql.adaptive.advisoryPartitionSizeInBytes=128MB  # 目标合并分区大小

能力二:动态切换 Join 策略

在运行时,如果一侧的 Shuffle 数据量实际上很小(小于 autoBroadcastJoinThreshold),AQE 可以将计划中的 Sort-Merge Join 动态切换为 Broadcast Join,消除一侧的 Shuffle。

能力三:自动处理倾斜 Join

spark.sql.adaptive.skewJoin.enabled=true
spark.sql.adaptive.skewJoin.skewedPartitionFactor=5
spark.sql.adaptive.skewJoin.skewedPartitionThresholdInBytes=256MB

当某个分区的大小超过中位数 × skewedPartitionFactor 且超过 skewedPartitionThresholdInBytes 时,AQE 自动将这个倾斜分区拆分成多个子分区,与 Join 对侧的分区做多路并行 Join,最后合并结果。

7.2 AQE 的局限

AQE 不是万能的:

  • 只对 SQL/DataFrame 作业有效,对 RDD API 作业无效
  • 只能优化 Shuffle 之后的阶段,无法改变 Shuffle 之前的数据分布
  • 极度倾斜(某 Key 的数据量是总量的 30% 以上)时,即使拆分也很难完全均衡

对于极度严重的倾斜,仍需要代码层面的 Salting 方案配合。


第 8 章 参数速查表

8.1 内存配置参数

参数默认值调优方向适用场景
spark.executor.memory1g按需调大所有场景
spark.executor.memoryOverheadmax(10%, 384MB)开启堆外内存时需增大堆外内存场景
spark.memory.fraction0.6Shuffle 密集型调大到 0.75执行内存不足时
spark.memory.storageFraction0.5无 Cache 时调小(0.3);ML 训练调大(0.7)存储/执行内存比例优化
spark.memory.offHeap.enabledfalsetrueGC 压力大时
spark.memory.offHeap.size0按需配置(通常等于 executor.memory 的 50-100%)堆外内存场景

8.2 Shuffle 配置参数

参数默认值调优方向适用场景
spark.sql.shuffle.partitions200按数据量调整(推荐:总Shuffle量GB × 4-8)分区数调优
spark.shuffle.file.buffer32k调大到 64k-128k磁盘 I/O 瓶颈
spark.shuffle.spill.compresstrue保持默认通用
spark.io.compression.codeclz4保持 lz4(CPU/IO 最佳平衡)压缩调优
spark.reducer.maxSizeInFlight48MB内存充足时调大(96MB-192MB)Shuffle Read 并发
spark.shuffle.io.maxRetries3网络不稳定时调大(10-20)故障容忍
spark.shuffle.io.retryWait5sGC 导致超时时调大(30s-60s)故障容忍
spark.shuffle.service.enabledfalse生产环境建议 true高可靠性
spark.local.dir/tmp配置多个 SSD 目录Spill 性能

8.3 序列化与 GC 参数

参数默认值推荐值说明
spark.serializerJavaSerializerKryoSerializer速度和空间都更优
spark.kryo.registrationRequiredfalsefalse不强制注册,方便使用
spark.executor.extraJavaOptions--XX:+UseG1GC -XX:+PrintGCDetailsGC 调优和监控

8.4 AQE 参数(Spark 3.0+)

参数默认值(3.2+)建议
spark.sql.adaptive.enabledtrue保持 true
spark.sql.adaptive.skewJoin.enabledtrue保持 true
spark.sql.adaptive.coalescePartitions.enabledtrue保持 true
spark.sql.adaptive.advisoryPartitionSizeInBytes64MB调整为 128MB(减少分区合并后的过小分区)
spark.sql.autoBroadcastJoinThreshold10MB按集群内存情况调整(最大不超过 500MB)

第 9 章 调优清单:上线前的标准检查

以下是一份在作业上线或出现性能问题时的快速检查清单:

[ ] 1. 序列化器是否已配置 Kryo?

spark.serializer=org.apache.spark.serializer.KryoSerializer

[ ] 2. Shuffle 分区数是否合理?

  • 检查 Stage 的 Shuffle Read Size (Median per Task),目标是 128MB-512MB per Task
  • spark.sql.adaptive.enabled=true(Spark 3.x 默认)可以自动合并过小的分区

[ ] 3. 是否存在 groupByKey 可以替换为 reduceByKey 的情况?

[ ] 4. 是否存在可以 Broadcast 的小表 Join?

  • 检查 SQL 执行计划(explain(true) 或 Spark UI SQL 页面),确认 Sort-Merge Join 是否可以改为 Broadcast Join

[ ] 5. GC 时间是否超过 Task 时间的 10%?

  • 如是,考虑 G1GC + 堆外内存

[ ] 6. Executor 内存是否足够?

  • spark.executor.memory 至少为每核 2-4GB
  • 观察 Spill (Disk) / Write Size 比率,目标 < 1

[ ] 7. 数据倾斜诊断

  • Stage Task 的 Max Duration / Median Duration < 3(无明显倾斜)

[ ] 8. 是否开启了 ESS?(高可靠性要求时)

  • spark.shuffle.service.enabled=true

[ ] 9. local.dir 是否指向 SSD?

  • Spill 密集型作业必须配置 SSD 目录

[ ] 10. YARN Container 内存上限是否覆盖了堆外内存?

  • spark.executor.memoryOverhead = offHeap.size + 系统 overhead

小结

生产调优没有银弹,本文提供的是一个系统化的诊断框架:

  • 从症状定位瓶颈:使用 Spark UI 的 Stage 详情、Executors 页面、Task 时间分布和 GC 时间,快速识别瓶颈类型
  • 五类核心问题:Spill 过多(内存不足/分区太少)、数据倾斜(Salting/Broadcast/AQE)、OOM(堆内堆外分类处理)、Fetch 失败(重试+ESS+GC 根治)、GC 过重(G1+堆外内存)
  • 优先级原则:算法层 > 配置层 > 资源层,先消灭根因,再做参数微调
  • AQE 是最佳盟友:Spark 3.x 的 AQE 自动处理大部分分区数调优和轻度数据倾斜,应默认开启

第 11 篇(专栏最后一篇)将介绍 Remote Shuffle Service(RSS)——当 Shuffle 的规模超出单集群能力时,如何通过独立的 Shuffle 存储服务彻底解耦计算与存储,以及业界主流 RSS 方案(Apache Celeborn、Uber RSS、Linkedin Magnet)的设计对比。


思考题

  1. Shuffle Spill 过多和数据倾斜是两类常见的 Shuffle 性能问题,但它们在 Spark UI 上的表现有时非常相似(都会出现长尾 Task)。如何通过 Spark UI 的指标精确区分”因为内存不足被迫 Spill 导致的长尾”和”因为数据量不均衡导致的长尾”?关键判断指标有哪些?
  2. 增大 spark.executor.memory 是解决 OOM 和 Spill 最直接的手段,但这会减少可运行的 Executor 数量(在资源总量固定的情况下)。存在一个最优的内存 / 并行度权衡点,超过这个点后增大内存反而会导致作业变慢。如何找到这个平衡点?有没有经验性的调优公式?
  3. Remote Shuffle Service(RSS)将 Shuffle 数据从 Executor 本地磁盘迁移到独立的存储服务,使得 Executor 可以随时释放而不影响 Shuffle 数据。但 RSS 引入了额外的网络跳转(Executor → RSS Server → Reducer)。在什么网络拓扑和延迟条件下,RSS 的额外网络开销会抵消它带来的弹性伸缩收益?

参考资料