04 依赖关系的本质:宽依赖与窄依赖的结构定义与性能边界
摘要
上一篇推导了惰性求值如何让 DAGScheduler 获得”全局视野”,并在 ShuffleDependency 处切割 Stage。本文进入依赖关系本身的深层分析:窄依赖与宽依赖的区分标准究竟是什么?这个标准背后的工程逻辑是什么?ShuffleDependency 在源码层面携带了哪些信息?Stage 切割算法的完整逻辑是怎样的?宽依赖对容错代价的影响为何比窄依赖大得多?以及,在生产实践中如何利用对依赖关系的理解来消除不必要的 Shuffle。
第 1 章 依赖分类的核心标准:数据分区的流向
1.1 一个直觉性问题:父分区的数据”归谁所有”?
理解窄依赖与宽依赖的本质区分,最好从一个直觉性问题出发:一个父 RDD 分区的数据,最终会流向几个子 RDD 分区?
-
如果只流向一个子分区:这个父分区的数据是”专属”的,子分区和父分区可以组成一个独立的计算单元(Task),在单台机器上执行,不需要等待其他父分区——这就是窄依赖。
-
如果可能流向多个子分区:这个父分区的数据需要被”拆分”后发送到不同节点——这就是宽依赖,必须触发 Shuffle。
这个标准可以用形式化的方式表达:设父 RDD 有 个分区 ,子 RDD 有 个分区 。
- 窄依赖:对任意父分区 ,存在至多一个子分区 使用 的数据。(每个父分区最多被一个子分区使用)
- 宽依赖:存在某个父分区 ,有多个子分区 都需要 的部分数据。(父分区被多个子分区瓜分)
注意:窄依赖的定义允许多个父分区对应一个子分区(如 union,多对一),但不允许一个父分区对应多个子分区。宽依赖则恰恰是”一个父分区对应多个子分区”的情形。
1.2 从代码层面理解依赖分类
Spark 源码中,依赖类型的类层次结构如下:
Dependency[T]
├── NarrowDependency[T] (窄依赖基类)
│ ├── OneToOneDependency[T] (一对一,map/filter/flatMap)
│ └── RangeDependency[T] (范围,union)
└── ShuffleDependency[K, V, C] (宽依赖/Shuffle依赖)
窄依赖基类 NarrowDependency 定义了一个抽象方法:
abstract class NarrowDependency[T](_rdd: RDD[T]) extends Dependency[T] {
// 给定子分区序号,返回它依赖的所有父分区序号
def getParents(partitionId: Int): Seq[Int]
}这个方法的签名揭示了窄依赖的关键性质:给定一个子分区,可以精确地列举出它依赖的父分区集合。这种”可预知性”使得调度器无需任何全局通信,就能为每个 Task 独立确定它需要的数据范围。
OneToOneDependency 的实现极其简单:
class OneToOneDependency[T](rdd: RDD[T]) extends NarrowDependency[T](rdd) {
override def getParents(partitionId: Int): List[Int] = List(partitionId)
// 子分区 i 只依赖父分区 i,无需任何计算
}RangeDependency(用于 union)稍复杂:
class RangeDependency[T](rdd: RDD[T], inStart: Int, outStart: Int, length: Int)
extends NarrowDependency[T](rdd) {
override def getParents(partitionId: Int): List[Int] = {
// 子分区序号 partitionId 在 [outStart, outStart+length) 范围内
// 对应父分区序号 partitionId - outStart + inStart
if (partitionId >= outStart && partitionId < outStart + length) {
List(partitionId - outStart + inStart)
} else {
Nil
}
}
}宽依赖(ShuffleDependency)则没有对应的 getParents 方法——因为在 Shuffle 之前,你无法预知某个子分区需要哪些父分区的哪些记录(这取决于每条记录的 Key 和 Partitioner 的映射关系,只有实际执行时才能确定)。
第 2 章 窄依赖深析:流水线、独立性与局部容错
2.1 OneToOneDependency:完美的计算局部性
OneToOneDependency 是 Spark 中最理想的依赖形式。它保证了:
计算完全本地化:子分区 只需要父分区 的数据,而 的数据就在产生它的那台 Executor 的内存或磁盘上。只要 被调度到与 相同的机器(通过 getPreferredLocations 实现),整个计算链路完全本地化,零网络传输。
Task 之间完全独立:处理 的 Task 和处理 的 Task 没有任何依赖关系,可以真正并行。一个 Task 失败不影响其他 Task,也不需要重算其他分区。
Pipeline 优化的基础:多个连续的 OneToOneDependency 算子(如 filter → map → flatMap)可以被合并进同一个 Task 的 Iterator 链。每条记录在 JVM 调用栈中完成全部变换,不产生任何中间数据物化。
2.2 RangeDependency:Union 的实现原理
union(rdd2) 是 Spark 中唯一使用 RangeDependency 的核心算子。它的行为是:将两个 RDD 的分区列表直接拼接,而不触发任何数据移动。
rdd1: [P0, P1, P2] (3个分区)
rdd2: [P0, P1] (2个分区)
union: [P0, P1, P2, P3, P4] (5个分区)
分区映射关系:
- union.P0 → rdd1.P0 (RangeDependency: inStart=0, outStart=0, length=3)
- union.P1 → rdd1.P1
- union.P2 → rdd1.P2
- union.P3 → rdd2.P0 (RangeDependency: inStart=0, outStart=3, length=2)
- union.P4 → rdd2.P1
union 是窄依赖,不触发 Shuffle,这意味着你可以用 union 将多个数据源合并,而不产生任何额外的网络开销。每个 Task 独立处理其对应的原始分区数据。
union 的常见误用
很多开发者认为
union后数据是”混合均匀”的,实际上union只是分区列表的拼接。如果 rdd1 有数据倾斜(某些分区特别大),union 后的结果同样存在倾斜,且没有任何自动均衡。如果需要均衡分布,应在union后接repartition(n)(但这会触发 Shuffle)。
2.3 窄依赖的容错代价:精确的分区级重算
当窄依赖 Stage 中的某个 Task 失败时(Executor 崩溃、节点下线),调度器需要重算失败的分区。由于窄依赖的 getParents 可以精确给出依赖的父分区列表,重算路径是确定且最小的:
失败分区: MappedRDD.P3
重算路径(沿血缘向上追溯):
MappedRDD.P3
← 依赖 FilteredRDD.P3 (OneToOneDependency)
← 依赖 HadoopRDD.P3 (OneToOneDependency)
← 从 HDFS Block 3 重新读取
需要重跑的 Task 数: 1个
影响范围: 仅 P3,其他分区完全不受影响
这种”精确的分区级容错”是窄依赖最重要的工程价值之一——失败代价小,恢复快,不影响其他正在运行的 Task。
第 3 章 宽依赖(ShuffleDependency)深析:Shuffle 的完整机制
3.1 ShuffleDependency 携带的信息远超依赖关系本身
ShuffleDependency 不只是一个”我依赖这个 RDD”的标记,它携带了执行 Shuffle 所需的全部元数据:
class ShuffleDependency[K: ClassTag, V: ClassTag, C: ClassTag](
@transient private val _rdd: RDD[_ <: Product2[K, V]],
val partitioner: Partitioner, // 决定每条 KV 记录路由到哪个下游分区
val serializer: Serializer = ..., // 序列化方式(默认 Java 序列化,生产建议 Kryo)
val keyOrdering: Option[Ordering[K]] = None, // Key 排序(sortByKey 时设置)
val aggregator: Option[Aggregator[K, V, C]] = None, // 聚合器(reduceByKey 时设置)
val mapSideCombine: Boolean = false // 是否开启 Map 端预聚合
) extends Dependency[Product2[K, V]] {
// 全局唯一的 Shuffle ID,用于 MapOutputTracker 追踪 Shuffle 输出位置
val shuffleId: Int = _rdd.context.newShuffleId()
// 将 ShuffleDependency 注册到 ShuffleManager,申请 Shuffle 句柄
val shuffleHandle: ShuffleHandle = _rdd.context.env.shuffleManager
.registerShuffle(shuffleId, this)
}这些字段共同决定了 Shuffle 的行为模式:
partitioner(必须):决定 Shuffle 的”分流规则”。每条 (K, V) 记录调用 partitioner.getPartition(key),得到目标下游分区序号。所有相同 Key 的记录必须进入同一个分区,这是 groupByKey/reduceByKey 语义正确性的基础。
aggregator + mapSideCombine(可选):这两个字段共同决定了 Shuffle Write 阶段是否进行 Map 端预聚合。
reduceByKey:mapSideCombine = true,聚合器定义了如何将相同 Key 的多个 Value 合并为一个groupByKey:mapSideCombine = false,所有记录原样写入,不做任何合并
mapSideCombine 的工程价值极大。考虑一个词频统计场景:原始数据有 1 亿条 (word, 1) 记录,经过 Map 端预聚合后,可能只剩 100 万条 (word, count) 记录需要通过网络传输——数据量减少 100 倍,Shuffle 性能提升数倍到数十倍。
3.2 Shuffle 的完整数据流
一次 Shuffle 操作在物理执行层面分为两个阶段,被 Stage 边界分隔:
Shuffle Write(属于 ShuffleMapStage):
上游 Stage 的每个 Task(ShuffleMapTask)在完成自己的计算后,不是将结果返回 Driver,而是将结果按目标分区序号写入本地磁盘的 Shuffle 文件。
- 对每条记录,调用
partitioner.getPartition(key)计算目标分区 - 若开启 Map 端预聚合,先在内存中按 Key 聚合(使用
ExternalAppendOnlyMap),内存不足时 Spill 到磁盘 - 将数据按分区序号排序后写入 Shuffle 数据文件(
.data文件)和索引文件(.index文件,记录每个分区在数据文件中的偏移量) - 向
MapOutputTracker(Driver 端)汇报 Shuffle 输出的位置信息(哪台机器、哪个文件、哪个偏移区间)
Shuffle Read(属于 ResultStage 或下一个 ShuffleMapStage):
下游 Stage 的每个 Task(ShuffleReadTask)启动后:
- 向 Driver 的
MapOutputTracker查询:我需要的分区数据在哪些机器上? - 通过
BlockManager的网络模块(基于 Netty),从上游所有 Executor 拉取对应的数据块 - 对拉取的数据进行归并排序(若需要
keyOrdering)或直接聚合(若有aggregator) - 将合并后的数据作为 Iterator 传递给后续算子
graph TD subgraph "ShuffleMapStage -- N 个 Task 并行执行" T0["Task 0</br>处理分区 P0"] T1["Task 1</br>处理分区 P1"] T2["Task 2</br>处理分区 P2"] end subgraph "本地磁盘 -- Shuffle 文件" F0["Executor 0</br>map_0.data</br>含: 分区0数据, 分区1数据, 分区2数据"] F1["Executor 1</br>map_1.data"] F2["Executor 2</br>map_2.data"] end subgraph "MapOutputTracker (Driver 端)" MOT["记录: Shuffle 0</br>Task 0 输出 → Executor 0</br>Task 1 输出 → Executor 1</br>Task 2 输出 → Executor 2"] end subgraph "ResultStage -- M 个 Task 并行执行" R0["Reduce Task 0</br>拉取所有 Executor 上分区0的数据"] R1["Reduce Task 1</br>拉取所有 Executor 上分区1的数据"] end T0 --> F0 T1 --> F1 T2 --> F2 T0 -->|"汇报输出位置"| MOT T1 -->|"汇报输出位置"| MOT T2 -->|"汇报输出位置"| MOT MOT -->|"查询: 分区0在哪里?"| R0 MOT -->|"查询: 分区1在哪里?"| R1 F0 -->|"网络拉取: 分区0部分"| R0 F1 -->|"网络拉取: 分区0部分"| R0 F2 -->|"网络拉取: 分区0部分"| R0 classDef mapStage fill:#d4f1f9,stroke:#2c6e8a,stroke-width:2px; classDef reduceStage fill:#f9ebd4,stroke:#8a6e2c,stroke-width:2px; classDef disk fill:#f9d4d4,stroke:#cc3333,stroke-width:2px; classDef driver fill:#d4f9d4,stroke:#2c8a2c,stroke-width:2px; class T0,T1,T2 mapStage; class R0,R1 reduceStage; class F0,F1,F2 disk; class MOT driver;
3.3 宽依赖的容错代价:比窄依赖昂贵得多
宽依赖的容错代价远高于窄依赖,原因在于 Shuffle 数据的”多对多依赖”关系:
场景:ShuffleMapStage 有 3 个 Task,ResultStage 有 2 个 Task
Task 0 (Map) 写出: 分区0的数据 → Reduce Task 0 需要
分区1的数据 → Reduce Task 1 需要
Task 1 (Map) 写出: 分区0的数据 → Reduce Task 0 需要
分区1的数据 → Reduce Task 1 需要
Task 2 (Map) 写出: 分区0的数据 → Reduce Task 0 需要
分区1的数据 → Reduce Task 1 需要
情形一:某个 Reduce Task 失败(如 Reduce Task 0 所在的 Executor 崩溃)
- 只需重新调度 Reduce Task 0,从 3 个 Executor 重新拉取分区0的数据
- 代价:只重算 1 个 Task
情形二:某个 Map Task 的输出丢失(如存储 Map Task 1 输出的 Executor 崩溃)
- Map Task 1 的 Shuffle 文件丢失,导致 Reduce Task 0 和 Reduce Task 1 都无法获得完整数据
- 需要重新执行 Map Task 1,重新生成 Shuffle 文件
- 由于 ResultStage 已经在运行中(部分 Reduce Task 可能已完成),DAGScheduler 必须:
- 重新提交 Map Task 1(重算 ShuffleMapStage 的一部分)
- 等待新的 Shuffle 文件生成
- 重新提交所有依赖 Map Task 1 输出的 Reduce Task(可能包括已完成的 Reduce Task 0 和 1)
- 代价:重算成本可能是失败前已完成工作量的数倍
这就是为什么在宽依赖密集的 DAG 中(如多次 Shuffle 的复杂 Join),生产上必须对关键中间结果做 checkpoint——将数据持久化到可靠存储(HDFS),切断血缘链,避免一个节点故障引发的雪崩式重算。
第 4 章 Stage 划分算法:DAGScheduler 的核心逻辑
4.1 算法的输入与输出
输入:包含所有 RDD 对象和依赖关系的 DAG(从 Action 触发点可达的所有 RDD)
输出:一组 Stage(每个 Stage 包含一批可以流水线执行的 RDD),以及 Stage 之间的依赖关系(哪个 Stage 的输出是另一个 Stage 的输入)
4.2 递归创建 Stage 的核心逻辑
DAGScheduler 使用**深度优先搜索(DFS)**从 Action 作用的末端 RDD 向上遍历,递归地创建 Stage:
// 简化的 Stage 创建逻辑(参考 DAGScheduler.scala)
private def getOrCreateParentStages(rdd: RDD[_], firstJobId: Int): List[Stage] = {
// 获取当前 RDD 的所有 ShuffleDependency(宽依赖)
getShuffleDependencies(rdd).map { shuffleDep =>
// 对每个宽依赖,创建或复用对应的 ShuffleMapStage
getOrCreateShuffleMapStage(shuffleDep, firstJobId)
}.toList
}
private def getShuffleDependencies(rdd: RDD[_]): HashSet[ShuffleDependency[_, _, _]] = {
val parents = new HashSet[ShuffleDependency[_, _, _]]
val visited = new HashSet[RDD[_]]
val waitingForVisit = new ArrayStack[RDD[_]]
waitingForVisit.push(rdd)
while (waitingForVisit.nonEmpty) {
val toVisit = waitingForVisit.pop()
if (!visited(toVisit)) {
visited += toVisit
toVisit.dependencies.foreach {
case shuffleDep: ShuffleDependency[_, _, _] =>
parents += shuffleDep // 找到宽依赖,记录
case dependency =>
waitingForVisit.push(dependency.rdd) // 窄依赖,继续向上遍历
}
}
}
parents
}核心逻辑:getShuffleDependencies 从当前 RDD 开始,沿窄依赖向上追溯(因为窄依赖不是 Stage 边界,继续合并),直到遇到宽依赖(ShuffleDependency)为止。所有遇到的宽依赖就是当前 Stage 的”上边界”——它们对应的 ShuffleMapStage 是当前 Stage 的父 Stage。
4.3 一个完整示例:多 Shuffle 场景的 Stage 划分
val rdd1 = sc.textFile("hdfs://users.txt") // RDD A
val rdd2 = rdd1.map(parseUser) // RDD B (窄)
val rdd3 = rdd2.filter(_.age > 18) // RDD C (窄)
val rdd4 = sc.textFile("hdfs://orders.txt") // RDD D
val rdd5 = rdd4.map(parseOrder) // RDD E (窄)
val rdd6 = rdd3.join(rdd5) // RDD F (宽:两路输入都需要 Shuffle)
val rdd7 = rdd6.map(formatResult) // RDD G (窄)
val rdd8 = rdd7.groupByKey() // RDD H (宽)
val rdd9 = rdd8.map(aggregateGroup) // RDD I (窄)
rdd9.saveAsTextFile("hdfs://output/") // ActionStage 划分结果:
| Stage | 类型 | 包含的 RDD | 说明 |
|---|---|---|---|
| Stage 0 | ShuffleMapStage | A → B → C | 为 join 左侧准备 Shuffle 输出 |
| Stage 1 | ShuffleMapStage | D → E | 为 join 右侧准备 Shuffle 输出 |
| Stage 2 | ShuffleMapStage | F → G | join 结果,为 groupByKey 准备 Shuffle 输出 |
| Stage 3 | ResultStage | H → I | 最终 groupByKey 和 map,写入 HDFS |
执行顺序:Stage 0 和 Stage 1 可以并行执行(无依赖关系),两者都完成后 Stage 2 才能启动,Stage 2 完成后 Stage 3 才能启动。
graph TD subgraph "Stage 0 -- ShuffleMapStage" A["textFile</br>users.txt"] B["map</br>parseUser"] C["filter</br>age > 18"] A --> B --> C end subgraph "Stage 1 -- ShuffleMapStage" D["textFile</br>orders.txt"] E["map</br>parseOrder"] D --> E end C -->|"Shuffle</br>join 左侧"| F E -->|"Shuffle</br>join 右侧"| F subgraph "Stage 2 -- ShuffleMapStage" F["join</br>ShuffledRDD"] G["map</br>formatResult"] F --> G end G -->|"Shuffle</br>groupByKey"| H subgraph "Stage 3 -- ResultStage" H["groupByKey</br>ShuffledRDD"] I["map</br>aggregateGroup"] J["saveAsTextFile"] H --> I --> J end classDef s0 fill:#d4f1f9,stroke:#2c6e8a,stroke-width:2px; classDef s1 fill:#e8f5e9,stroke:#2e7d32,stroke-width:2px; classDef s2 fill:#fff3e0,stroke:#e65100,stroke-width:2px; classDef s3 fill:#f3e5f5,stroke:#6a1b9a,stroke-width:2px; class A,B,C s0; class D,E s1; class F,G s2; class H,I,J s3;
第 5 章 生产优化:基于依赖关系的性能调优
5.1 reduceByKey vs groupByKey:Map 端预聚合的价值
这是 Spark 最经典的优化建议,但很多人知其然而不知其所以然。
groupByKey 的数据流:
原始数据(1亿条 (word, 1))
→ Shuffle:全量数据网络传输
→ Reduce 端:按 Key 归并所有值为 List
→ 网络传输量:1亿条记录 × 每条大小
reduceByKey 的数据流:
原始数据(1亿条 (word, 1))
→ Map 端预聚合:每个分区内按 Key 合并(假设词汇表 10 万词,每个分区压缩为 10 万条)
→ Shuffle:只传输预聚合后的数据
→ Reduce 端:按 Key 做最终合并
→ 网络传输量:分区数 × 10 万条记录(若 100 分区则 1000 万条,减少 90%)
为什么不是所有聚合操作都能开启 Map 端预聚合?
Map 端预聚合要求聚合函数满足结合律和交换律(即 f(f(a, b), c) == f(a, f(b, c)))。reduceByKey 满足这一条件(加法、乘法等),而 groupByKey 不能预聚合(因为需要保留所有原始值)。某些复杂聚合(如计算精确中位数)也无法 Map 端预聚合,因为局部中位数的合并不等于全局中位数。
5.2 利用 Partitioner 相等性消除 Shuffle
当两个 RDD 使用相同的 Partitioner(分区数相同、分区规则相同)时,对它们进行 join 或 cogroup 会变成窄依赖,完全跳过 Shuffle:
val p = new HashPartitioner(200)
// 预分区(只在第一次加载时 Shuffle 一次)
val userRDD = sc.textFile("hdfs://users.txt")
.map(parseUser)
.partitionBy(p)
.cache() // 缓存分区结果
val orderRDD = sc.textFile("hdfs://orders.txt")
.map(parseOrder)
.partitionBy(p)
.cache()
// 后续所有 join 都不触发 Shuffle!因为两个 RDD 已经按相同规则分区
val joined = userRDD.join(orderRDD) // 窄依赖 join
val joined2 = userRDD.join(anotherRDD.partitionBy(p)) // 同样无 Shuffle这个优化对于需要反复 Join 同一个维度表的场景尤其有价值(如广告系统中反复关联用户画像表)。一次预分区 + 缓存,换取后续所有 Join 的零 Shuffle 代价。
5.3 Checkpoint 截断过长血缘链
在迭代计算(如机器学习算法的多轮迭代)中,每一轮迭代都在上一轮 RDD 的基础上增加新的 Transformation,血缘链逐轮增长:
第 1 轮:RDD_1
第 2 轮:RDD_1 → RDD_2
第 3 轮:RDD_1 → RDD_2 → RDD_3
...
第 N 轮:RDD_1 → ... → RDD_N(血缘链长度 N)
当血缘链过长时,两个问题随之而来:
- JVM 栈溢出:
compute方法的递归调用深度与血缘链长度成正比,血缘过长可能导致StackOverflowError - 容错代价指数增长:如果第 N 轮的某个分区丢失,需要从第 1 轮开始重算,代价是 N 轮的全量计算
解决方案是定期 Checkpoint:
sc.setCheckpointDir("hdfs://checkpoints/")
var current = initialRDD
for (i <- 1 to 100) {
current = iterate(current) // 每轮迭代增加若干 Transformation
if (i % 10 == 0) {
current.checkpoint() // 每 10 轮将当前状态持久化到 HDFS
current.count() // 触发一次 Action 使 checkpoint 真正执行
// checkpoint 完成后,current 的血缘链被截断,指向 HDFS 上的 checkpoint 数据
}
}checkpoint 与 cache 的本质区别:
cache:数据存储在 Executor 内存(或磁盘),Executor 崩溃后数据丢失,血缘链不截断checkpoint:数据写入可靠存储(HDFS),血缘链被截断(RDD 的dependencies被替换为指向 HDFS 文件的CheckpointRDD),即使所有 Executor 都崩溃也能恢复
5.4 避免小文件触发的 Shuffle 放大
repartition(n) 是一个常被滥用的算子。它内部实现是 coalesce(n, shuffle=true),强制触发一次全量 Shuffle。
常见误用场景:
// 错误:为了减少输出文件数,在最后 repartition
rdd.filter(...).map(...).repartition(10).saveAsTextFile(...)
// 问题:repartition 触发全量 Shuffle,所有数据网络传输一次,只是为了减少文件数
// 正确:使用 coalesce 不触发 Shuffle(合理场景下)
rdd.filter(...).map(...).coalesce(10).saveAsTextFile(...)
// coalesce(n, shuffle=false) 将相邻分区合并,无网络传输
// 注意:coalesce 无法增加分区数(只能减少),且可能导致数据倾斜repartition 的合理使用场景:
- 数据倾斜时需要重新均匀分布(
coalesce无法解决倾斜,因为它只合并相邻分区) - 需要增加分区数以提高并行度(
coalesce无法增加分区数) - Shuffle 读取后数据量比预期少得多,需要减少后续 Stage 的并行度
第 6 章 总结
依赖关系是 RDD 血缘体系最核心的设计点,也是 Spark 调优知识体系的基础框架:
- 窄依赖的本质是数据的”专属性”——父分区只服务于一个子分区,使得 Task 独立、Pipeline 可行、容错精确
- 宽依赖的本质是数据的”共享性”——父分区的数据被多个子分区瓜分,必须经过 Shuffle 这一全局同步点,代价是网络传输、磁盘 I/O 和同步等待
- Stage 划分是 DAGScheduler 对依赖类型的直接应用:沿窄依赖合并,在宽依赖处切割
- 生产优化的核心思路是:能用窄依赖的不用宽依赖,必须用宽依赖时尽量减少 Shuffle 数据量(Map 端预聚合、预分区复用)
在 下一篇文章 中,我们将从容错的角度,深入探讨血缘图如何支撑 Spark 的零副本容错机制,以及在什么情况下血缘容错会”失效”并需要 Checkpoint 介入。
思考题
rdd1.join(rdd2)中,如果 rdd1 已经用HashPartitioner(100)分区,而 rdd2 没有分区器,join 会触发几次 Shuffle?如果两者都用HashPartitioner(100)分区,join 会触发几次 Shuffle?为什么?sortByKey()是宽依赖操作,它内部使用RangePartitioner。但RangePartitioner的构造需要对父 RDD 采样(触发一次 Action)。这意味着sortByKey实际上触发了几次计算?如果你的数据已经近似均匀分布,有更高效的排序方式吗?- 在 Stage 2 已经有若干 Reduce Task 完成时,某个 Map Task 的输出所在 Executor 崩溃了。DAGScheduler 需要重跑哪些 Task?已完成的 Reduce Task 会被重新执行吗?这与 Spark 版本有关系吗?