引言

在Spark编程中,Transformation操作是构建数据处理流水线的核心。这些操作通过对RDD(弹性分布式数据集)进行转换,生成新的RDD,形成有向无环图(DAG)的计算逻辑。理解各种Transformation操作的语义、实现机制和性能特性,对于编写高效、可扩展的Spark应用至关重要。

本笔记将系统梳理Spark中常用的Transformation操作,从简单的映射过滤到复杂的聚合连接,深入分析每个操作的逻辑处理流程、数据依赖关系和适用场景。


一、基础映射与过滤操作

1.1 map()和mapValues()操作

语义:对RDD中的每个元素应用函数转换。

flowchart TD
    subgraph "输入RDD"
        P1["分区1<br>Record1<br>Record2"]
        P2["分区2<br>Record3<br>Record4"]
    end
    
    subgraph "转换过程"
        P1 --> F1["map(func)<br>逐个转换"]
        P2 --> F2["map(func)<br>逐个转换"]
    end
    
    subgraph "输出RDD"
        R1["分区1<br>func(Record1)<br>func(Record2)"]
        R2["分区2<br>func(Record3)<br>func(Record4)"]
    end
    
    F1 --> R1
    F2 --> R2

关键特性

  • 生成MapPartitionsRDD
  • 数据依赖关系:OneToOneDependency(窄依赖)
  • 输入输出一一对应,不改变分区结构

示例代码

// map()示例:将每个元素乘以2
val rdd1 = sc.parallelize(List(1, 2, 3, 4))
val rdd2 = rdd1.map(x => x * 2)
// 结果:List(2, 4, 6, 8)
 
// mapValues()示例:对键值对的Value部分进行转换
val kvRDD = sc.parallelize(List(("a", 1), ("b", 2), ("c", 3)))
val result = kvRDD.mapValues(x => x * 10)
// 结果:List(("a", 10), ("b", 20), ("c", 30))

1.2 filter()和filterByRange()操作

语义:根据条件筛选RDD中的元素。

flowchart TD
    subgraph "输入RDD"
        P1["分区1<br>(1,"a")<br>(2,"b")<br>(3,"c")"]
        P2["分区2<br>(4,"d")<br>(5,"e")<br>(6,"f")"]
    end
    
    subgraph "过滤过程"
        P1 --> F1["filter(条件)<br>逐条判断"]
        P2 --> F2["filter(条件)<br>逐条判断"]
    end
    
    subgraph "输出RDD"
        R1["分区1<br>符合条件的记录"]
        R2["分区2<br>符合条件的记录"]
    end
    
    F1 --> R1
    F2 --> R2

关键特性

  • 生成MapPartitionsRDD
  • 数据依赖关系:OneToOneDependency
  • 可能减少数据量,但分区数量不变

示例代码

// filter()示例:筛选Key为偶数的记录
val rdd1 = sc.parallelize(List((1,"a"), (2,"b"), (3,"c"), (4,"d"), (5,"e")))
val rdd2 = rdd1.filter{case (k, v) => k % 2 == 0}
// 结果:List((2,"b"), (4,"d"))
 
// filterByRange()示例:筛选Key在[2,4]范围内的记录
val rdd3 = rdd1.filterByRange(2, 4)
// 结果:List((2,"b"), (3,"c"), (4,"d"))

1.3 flatMap()和flatMapValues()操作

语义:将每个输入元素映射为多个输出元素(“展平”操作)。

flowchart TD
    subgraph "输入RDD"
        P1["分区1<br>句子1<br>句子2"]
    end
    
    subgraph "flatMap过程"
        P1 --> F1["flatMap(func)<br>分词并展平"]
    end
    
    subgraph "输出RDD"
        R1["分区1<br>单词1<br>单词2<br>单词3<br>单词4"]
    end
    
    F1 --> R1

关键特性

  • 生成MapPartitionsRDD
  • 数据依赖关系:OneToOneDependency
  • 可以改变数据条数,常用于文本分词、数据展开等场景

示例代码

// flatMap()示例:分词程序
val textRDD = sc.parallelize(List("Hello World", "Spark Programming"))
val wordsRDD = textRDD.flatMap(line => line.split(" "))
// 结果:List("Hello", "World", "Spark", "Programming")
 
// flatMapValues()示例:对Value进行展开
val kvRDD = sc.parallelize(List(("a", List(1,2)), ("b", List(3,4))))
val result = kvRDD.flatMapValues(list => list)
// 结果:List(("a",1), ("a",2), ("b",3), ("b",4))

二、采样操作

2.1 sample()操作

语义:对RDD进行随机采样。

flowchart TD
    subgraph "输入RDD"
        P1["分区1<br>Record1~Record5"]
        P2["分区2<br>Record6~Record10"]
    end
    
    subgraph "采样过程"
        P1 --> S1["sample(fraction)<br>按概率采样"]
        P2 --> S2["sample(fraction)<br>按概率采样"]
    end
    
    subgraph "输出RDD"
        R1["分区1<br>采样结果1"]
        R2["分区2<br>采样结果2"]
    end
    
    S1 --> R1
    S2 --> R2

关键特性

  • 生成PartitionwiseSampledRDD
  • 数据依赖关系:OneToOneDependency
  • 两种采样模式:
    • sample(false, fraction):伯努利抽样,每个元素以fraction概率被选中
    • sample(true, fraction):泊松分布抽样,可能产生比原始数据更多的样本

示例代码

val data = sc.parallelize(1 to 100)
 
// 无放回抽样:每个元素有20%的概率被选中
val sampled1 = data.sample(false, 0.2)
 
// 有放回抽样:使用泊松分布
val sampled2 = data.sample(true, 0.3)
 
// 获取约10%的数据作为训练集
val training = data.sample(false, 0.1)

2.2 sampleByKey()操作

语义:根据不同的Key设置不同的采样概率。

关键特性

  • 生成MapPartitionsRDD
  • 数据依赖关系:OneToOneDependency
  • 可以为每个Key指定不同的采样概率,适用于类别不平衡的数据集

示例代码

val data = sc.parallelize(List(
    (1, "a"), (1, "b"), (1, "c"), (1, "d"),
    (2, "e"), (2, "f"), (2, "g")
))
 
// 定义每个Key的采样概率
val fractions = Map(1 -> 0.5, 2 -> 0.8)
 
// 无放回模式
val sampled1 = data.sampleByKey(false, fractions)
 
// 有放回模式
val sampled2 = data.sampleByKey(true, fractions)

三、分区级操作

3.1 mapPartitions()和mapPartitionsWithIndex()操作

语义:对整个分区进行操作,而不是单个元素。

flowchart TD
    subgraph "输入RDD"
        P1["分区1<br>数据迭代器"]
        P2["分区2<br>数据迭代器"]
    end
    
    subgraph "分区处理"
        P1 --> F1["mapPartitions(func)<br>处理整个分区"]
        P2 --> F2["mapPartitions(func)<br>处理整个分区"]
    end
    
    subgraph "输出RDD"
        R1["分区1<br>处理结果"]
        R2["分区2<br>处理结果"]
    end
    
    F1 --> R1
    F2 --> R2

关键特性

  • 生成MapPartitionsRDD
  • 数据依赖关系:OneToOneDependency
  • 适用场景
    • 需要跨元素共享资源(如数据库连接、文件句柄)
    • 需要维护中间状态的处理
    • 批量操作比逐元素操作更高效的情况

示例代码

// mapPartitions()示例:计算每个分区中奇数和偶数的和
val data = sc.parallelize(1 to 10, 3)
 
val result = data.mapPartitions { iter =>
    var (oddSum, evenSum) = (0, 0)
    while (iter.hasNext) {
        val num = iter.next()
        if (num % 2 == 0) evenSum += num
        else oddSum += num
    }
    Iterator((oddSum, evenSum))
}
// 结果:每个分区输出(奇数之和, 偶数之和)
 
// mapPartitionsWithIndex()示例:查看分区信息
val data2 = sc.parallelize(List("a", "b", "c", "d", "e"), 2)
 
val withIndex = data2.mapPartitionsWithIndex { (index, iter) =>
    iter.map(value => s"分区$index: $value")
}
// 结果:List("分区0: a", "分区0: b", "分区0: c", "分区1: d", "分区1: e")
 
// 数据库操作示例(高效方式)
val dbData = data.mapPartitions { iter =>
    // 每个分区只建立一次数据库连接
    val connection = createDatabaseConnection()
    val result = new ArrayBuffer[String]
    
    while (iter.hasNext) {
        val record = iter.next()
        // 处理并插入数据库
        insertToDatabase(connection, record)
        result += s"已处理: $record"
    }
    
    connection.close()
    result.iterator
}

3.2 partitionBy()操作

语义:按照指定的分区器对RDD重新分区。

flowchart TD
    subgraph "输入RDD"
        P1["分区1<br>(1, 'a')<br>(3, 'c')"]
        P2["分区2<br>(2, 'b')<br>(4, 'd')"]
    end
    
    subgraph "重新分区 (Shuffle)"
        P1 --> RP["partitionBy(partitioner)<br>根据Key重新分配"]
        P2 --> RP
    end
    
    subgraph "输出RDD"
        R1["分区1<br>Hash(Key)=1的记录"]
        R2["分区2<br>Hash(Key)=2的记录"]
    end
    
    RP --> R1
    RP --> R2

关键特性

  • 改变数据分布,可能产生Shuffle
  • 常用分区器:
    • HashPartitioner:基于Key的哈希值分区
    • RangePartitioner:基于Key的范围分区(用于排序)
  • RangePartitioner不保证分区内数据有序

示例代码

val data = sc.parallelize(List(
    (1, "a"), (2, "b"), (3, "c"), (4, "d")
))
 
// 使用HashPartitioner重新分区
val hashPartitioned = data.partitionBy(new HashPartitioner(2))
 
// 使用RangePartitioner重新分区
val rangePartitioned = data.partitionBy(new RangePartitioner(2, data))
 
// 查看分区后的数据分布
rangePartitioned.mapPartitionsWithIndex { (index, iter) =>
    Iterator(s"分区$index: ${iter.toList}")
}.collect()

四、聚合操作

4.1 groupByKey()操作

语义:将相同Key的Value聚合到一起。

flowchart TD
    subgraph "场景分析"
        direction LR
        S1["分区器不同<br>需要Shuffle"]
        S2["分区器相同<br>无需Shuffle"]
        
        S1 --> C1["生成ShuffleDependency"]
        S2 --> C2["生成OneToOneDependency"]
    end
    
    subgraph "Shuffle流程"
        A["输入RDD<br>不同分区有相同Key"]
        B["Shuffle<br>数据重分配"]
        C["输出RDD<br>相同Key聚合"]
        
        A --> B --> C
    end

关键特性

  • 可能产生ShuffleDependency(宽依赖)
  • 当child和parent RDD的partitioner相同且分区数一致时,使用OneToOneDependency
  • 缺点:Shuffle时产生大量中间数据,内存占用大

示例代码

val data = sc.parallelize(List(
    ("apple", 1), ("banana", 2), ("apple", 3),
    ("banana", 4), ("orange", 5)
))
 
// 普通groupByKey(可能产生Shuffle)
val grouped1 = data.groupByKey()
// 结果:Map("apple" -> [1,3], "banana" -> [2,4], "orange" -> [5])
 
// 无Shuffle的groupByKey(当数据已预先分区好时)
val partitionedData = data.partitionBy(new HashPartitioner(3))
val grouped2 = partitionedData.groupByKey()

性能建议:在多数情况下,优先考虑使用reduceByKey(),因为它在Shuffle前会先在map端进行combine,减少数据传输量。

4.2 reduceByKey()操作

语义:对相同Key的Value使用指定函数进行归约。

flowchart LR
    subgraph "第一步:Map端Combine"
        P1["分区1"] --> C1["combine(func)<br>本地聚合"]
        P2["分区2"] --> C2["combine(func)<br>本地聚合"]
    end
    
    subgraph "第二步:Shuffle"
        C1 --> S["Shuffle<br>按Key重分配"]
        C2 --> S
    end
    
    subgraph "第三步:Reduce端聚合"
        S --> R1["reduce(func)<br>最终聚合"]
        S --> R2["reduce(func)<br>最终聚合"]
    end
    
    R1 --> O1["输出分区1"]
    R2 --> O2["输出分区2"]

关键特性

  1. 两阶段聚合
    • Map端combine:先在每个分区内进行本地聚合
    • Reduce端reduce:Shuffle后进行全局聚合
  2. 函数要求:func必须满足交换律结合律(因为Shuffle不保证数据到达顺序)
  3. 性能优势:相比groupByKey(),减少了数据传输量和内存使用

示例代码

val data = sc.parallelize(List(
    ("a", 1), ("b", 2), ("a", 3),
    ("b", 4), ("c", 5)
))
 
// 求和操作
val sumByKey = data.reduceByKey(_ + _)
// 结果:Map("a"->4, "b"->6, "c"->5)
 
// 求最大值
val maxByKey = data.reduceByKey(math.max)
// 结果:Map("a"->3, "b"->4, "c"->5)
 
// 连接字符串(注意:需要满足结合律的实现)
val concatByKey = data.reduceByKey((x, y) => x + "," + y)

适用场景

  • 统计求和、计数、平均值等聚合操作
  • 数据压缩,减少存储空间
  • 流处理中的窗口聚合

4.3 aggregateByKey()操作

语义:更灵活的聚合操作,允许指定初始值和不同的聚合函数。

flowchart TD
    subgraph "聚合流程"
        A["zeroValue<br>初始值"]
        B["seqOp<br>分区内聚合"]
        C["combOp<br>分区间聚合"]
        
        A --> B
        B --> C
    end
    
    subgraph "计算示例"
        D["分区数据: (k,1), (k,2), (k,3)"]
        E["seqOp: zero+1, 结果+2, 结果+3"]
        F["combOp: 合并不同分区的中间结果"]
        
        D --> E --> F
    end

关键特性

  1. 三个参数
    • zeroValue:聚合初始值
    • seqOp:分区内聚合函数 (U, V) => U
    • combOp:分区间合并函数 (U, U) => U
  2. 灵活性
    • seqOp和combOp可以使用不同的逻辑
    • zeroValue和record可以是不同类型
  3. 与reduceByKey关系reduceByKey()aggregateByKey()的特殊情况(seqOp=combOp)

示例代码

val data = sc.parallelize(List(
    ("a", 1), ("a", 2), ("b", 3),
    ("b", 4), ("b", 5)
))
 
// 示例1:计算每个key的平均值(避免溢出)
val avgByKey = data.aggregateByKey((0.0, 0))(
    // seqOp: 分区内累加和计数
    (acc, value) => (acc._1 + value, acc._2 + 1),
    // combOp: 合并不同分区的结果
    (acc1, acc2) => (acc1._1 + acc2._1, acc1._2 + acc2._2)
  ).mapValues { case (sum, count) => sum / count }
// 结果:Map("a"->1.5, "b"->4.0)
 
// 示例2:字符串连接(使用不同分隔符)
val concatResult = data.aggregateByKey("")(
    // 分区内用"_"连接
    (str, num) => if (str.isEmpty) num.toString else str + "_" + num,
    // 分区间用"@"连接
    (str1, str2) => str1 + "@" + str2
)

实际应用(Spark MLlib中的使用):

// FPGrowth算法中使用aggregateByKey统计频繁项
val transactions = sc.parallelize(Seq(
    Seq("a", "b", "c"),
    Seq("a", "b"),
    Seq("b", "c")
))
 
val itemCounts = transactions.flatMap(items => items.map(_ -> 1))
  .aggregateByKey(0)(_ + _, _ + _)
 
// NaiveBayes中使用aggregateByKey统计类别特征
val labeledData = sc.parallelize(Seq(
    (0, ("feature1", 1.0)),
    (1, ("feature1", 2.0)),
    (0, ("feature2", 1.5))
))
 
val aggregated = labeledData.aggregateByKey(Map[String, Double]())(
    // 分区内合并特征
    (map, (feature, value)) => map + (feature -> (map.getOrElse(feature, 0.0) + value)),
    // 分区间合并结果
    (map1, map2) => map1 ++ map2.map { case (k, v) => k -> (v + map1.getOrElse(k, 0.0)) }
)

4.4 combineByKey()操作

语义:最底层的聚合操作,提供最大的灵活性。

关键特性

  1. 三个函数参数
    • createCombiner: V => C:为每个Key创建初始累加器
    • mergeValue: (C, V) => C:将Value合并到累加器
    • mergeCombiners: (C, C) => C:合并不同分区的累加器
  2. 与aggregateByKey关系
    • aggregateByKey()是基于combineByKey()实现的
    • createCombiner对应zeroValue,但前者是函数,后者是固定值
    • mergeValue对应seqOp
    • mergeCombiners对应combOp

示例代码

val data = sc.parallelize(List(
    ("math", 85), ("math", 90), ("english", 88),
    ("math", 92), ("english", 95)
))
 
// 计算每科最高分和最低分
val scoreStats = data.combineByKey(
    // createCombiner: 创建初始值(分数,分数)
    score => (score, score),
    // mergeValue: 更新最高分和最低分
    (acc: (Int, Int), score: Int) => (math.max(acc._1, score), math.min(acc._2, score)),
    // mergeCombiners: 合并不同分区的结果
    (acc1: (Int, Int), acc2: (Int, Int)) => 
      (math.max(acc1._1, acc2._1), math.min(acc1._2, acc2._2))
)
// 结果:Map("math"->(92,85), "english"->(95,88))

4.5 foldByKey()操作

语义:带有初始值的reduceByKey。

关键特性

  • 介于reduceByKey()aggregateByKey()之间
  • 相比reduceByKey():多了初始值zeroValue
  • 相比aggregateByKey():seqOp和combOp使用相同的func
  • 基于aggregateByKey()实现

示例代码

val data = sc.parallelize(List(
    ("a", 1), ("a", 2), ("b", 3),
    ("b", 4), ("c", 5)
))
 
// 使用foldByKey计算乘积(初始值为1)
val productByKey = data.foldByKey(1)(_ * _)
// 结果:Map("a"->2, "b"->12, "c"->5)
 
// 字符串连接
val concatByKey = data.foldByKey("")((str, num) => str + num.toString)
// 结果:Map("a"->"12", "b"->"34", "c"->"5")

五、连接与集合操作

5.1 cogroup()/groupWith()操作

语义:将多个RDD按Key进行分组聚合。

flowchart TD
    subgraph "输入RDDs"
        RDD1["RDD1<br>分区1,2,3"]
        RDD2["RDD2<br>分区1,2,3"]
    end
    
    subgraph "依赖关系分析"
        RDD1 --> C1{"Partitioner相同<br>且分区数相同?"}
        RDD2 --> C2{"Partitioner相同<br>且分区数相同?"}
        
        C1 -->|"是"| D1["OneToOneDependency"]
        C1 -->|"否"| D2["ShuffleDependency"]
        C2 -->|"是"| D3["OneToOneDependency"]
        C2 -->|"否"| D4["ShuffleDependency"]
    end
    
    subgraph "输出"
        D1 --> O["CoGroupedRDD"]
        D2 --> O
        D3 --> O
        D4 --> O
    end

关键特性

  1. 多RDD聚合:最多支持4个RDD同时cogroup
  2. 智能依赖选择
    • 当parent和child RDD的partitioner相同且分区数一致时,使用OneToOneDependency
    • 否则使用ShuffleDependency
  3. 输出结构
    • 生成CoGroupedRDD(聚合数据)
    • 生成MapPartitionsRDD(转换数据类型为CompactBuffer

示例代码

val rdd1 = sc.parallelize(List((1, "a"), (2, "b"), (3, "c")))
val rdd2 = sc.parallelize(List((1, "x"), (2, "y"), (4, "z")))
 
// 基本cogroup
val cogrouped = rdd1.cogroup(rdd2)
/* 结果:
   1 -> (CompactBuffer("a"), CompactBuffer("x"))
   2 -> (CompactBuffer("b"), CompactBuffer("y"))
   3 -> (CompactBuffer("c"), CompactBuffer())
   4 -> (CompactBuffer(), CompactBuffer("z"))
*/
 
// 无Shuffle的cogroup(当数据已预先分区好时)
val partitioned1 = rdd1.partitionBy(new HashPartitioner(2))
val partitioned2 = rdd2.partitionBy(new HashPartitioner(2))
val efficientCogroup = partitioned1.cogroup(partitioned2)
 
// 多RDD cogroup
val rdd3 = sc.parallelize(List((1, "m"), (3, "n")))
val multiCogroup = rdd1.cogroup(rdd2, rdd3)

5.2 join()操作

语义:基于Key连接两个RDD。

flowchart LR
    subgraph "join实现原理"
        A["输入RDD1"] --> C["cogroup()<br>按Key聚合"]
        B["输入RDD2"] --> C
        
        C --> D["生成<K, (list1, list2)>"]
        D --> E["笛卡尔积计算<br>list1 × list2"]
        E --> F["输出<K, (v,w)>"]
    end
    
    subgraph "优化策略"
        G["Partitioner相同"] --> H["避免Shuffle"]
        I["Partitioner不同"] --> J["需要Shuffle"]
    end

关键特性

  1. 基于cogroup实现
    • 先用cogroup聚合相同Key的数据
    • 然后对两个list做笛卡尔积
  2. 四种执行计划(取决于partitioner):
    // 情况1:都需要Shuffle(默认)
    val result1 = rdd1.join(rdd2)
     
    // 情况2:rdd1已分区,避免Shuffle
    val partitioned1 = rdd1.partitionBy(new HashPartitioner(3))
    val result2 = partitioned1.join(rdd2)
     
    // 情况3:rdd2已分区,避免Shuffle
    val partitioned2 = rdd2.partitionBy(new HashPartitioner(3))
    val result3 = rdd1.join(partitioned2)
     
    // 情况4:都已分区,完全无Shuffle
    val result4 = partitioned1.join(partitioned2)
  3. 输出类型<K, (V, W)>

示例代码

val users = sc.parallelize(List(
    (1, "Alice"), (2, "Bob"), (3, "Charlie")
))
 
val orders = sc.parallelize(List(
    (1, "Order1"), (1, "Order2"), (2, "Order3"),
    (4, "Order4")
))
 
// 内连接
val userOrders = users.join(orders)
/* 结果:
   (1, ("Alice", "Order1"))
   (1, ("Alice", "Order2"))
   (2, ("Bob", "Order3"))
*/
 
// 左外连接(使用cogroup实现)
val leftJoin = users.leftOuterJoin(orders)
/* 结果:
   (1, ("Alice", Some("Order1")))
   (1, ("Alice", Some("Order2")))
   (2, ("Bob", Some("Order3")))
   (3, ("Charlie", None))
*/
 
// 右外连接
val rightJoin = users.rightOuterJoin(orders)
 
// 全外连接
val fullJoin = users.fullOuterJoin(orders)

5.3 cartesian()操作

语义:计算两个RDD的笛卡尔积。

flowchart TD
    subgraph "笛卡尔积分区策略"
        A["RDD1: m个分区"]
        B["RDD2: n个分区"]
        
        A --> C["两两组合<br>m×n个分区对"]
        B --> C
        
        C --> D["每个分区对生成<br>一个输出分区"]
    end
    
    subgraph "数据依赖"
        E["多对多<br>NarrowDependency"]
        F["非ShuffleDependency"]
        
        E --> G["高效并行计算"]
    end

关键特性

  • 生成m×n个分区(m为rdd1分区数,n为rdd2分区数)
  • 数据依赖:多对多的NarrowDependency(非Shuffle)
  • 输出分区数据 = 对应两个输入分区数据的笛卡尔积

示例代码

val rdd1 = sc.parallelize(List(1, 2, 3), 2)
val rdd2 = sc.parallelize(List("a", "b"), 2)
 
val cartesian = rdd1.cartesian(rdd2)
/* 结果:
   (1,"a"), (1,"b"), (2,"a"), (2,"b"), (3,"a"), (3,"b")
*/
 
// 应用场景:网格搜索参数
val params1 = sc.parallelize(List(0.1, 0.01, 0.001))
val params2 = sc.parallelize(List(10, 50, 100))
 
val paramGrid = params1.cartesian(params2)
// 结果:所有参数组合

5.4 sortByKey()操作

语义:按照Key对RDD进行排序。

flowchart LR
    subgraph "排序流程"
        A["输入RDD"] --> B["Shuffle<br>Range划分"]
        B --> C["分区内排序"]
        C --> D["全局有序输出"]
    end
    
    subgraph "Range划分特性"
        E["保证全局有序"]
        F["partition1的Key < partition2的Key"]
        G["分区内Key有序"]
        
        E --> F --> G
    end

关键特性

  1. 使用RangePartitioner:保证全局有序
  2. Shuffle操作:产生ShuffleDependency
  3. 排序范围
    • Key全局有序
    • 相同Key的Value无序
  4. 无map端combine

示例代码

val data = sc.parallelize(List(
    (3, "c"), (1, "a"), (4, "d"), (2, "b")
))
 
// 升序排序
val sortedAsc = data.sortByKey()
// 结果:List((1,"a"), (2,"b"), (3,"c"), (4,"d"))
 
// 降序排序
val sortedDesc = data.sortByKey(false)
// 结果:List((4,"d"), (3,"c"), (2,"b"), (1,"a"))
 
// 自定义排序(Secondary Sort方案)
case class CompositeKey(primary: Int, secondary: String) 
  extends Ordered[CompositeKey] {
  def compare(that: CompositeKey): Int = {
    val primaryCompare = this.primary.compareTo(that.primary)
    if (primaryCompare != 0) primaryCompare
    else this.secondary.compareTo(that.secondary)
  }
}
 
// 方法1:使用Secondary Sort
val secondarySorted = data.map { case (k, v) => 
  (CompositeKey(k, v), null)
}.sortByKey().map(_._1)
 
// 方法2:先groupByKey再排序Value
val groupAndSort = data.groupByKey()
  .mapValues(values => values.toList.sorted)
  .flatMap { case (k, vs) => vs.map(v => (k, v)) }

六、分区调整操作

6.1 coalesce()操作

语义:调整RDD的分区数量。

flowchart TD
    subgraph "四种使用模式"
        A["减少分区<br>无Shuffle"] --> A1["直接合并分区<br>可能数据倾斜"]
        B["增加分区<br>无Shuffle"] --> B1["分区数不变<br>无效操作"]
        C["减少分区<br>带Shuffle"] --> C1["均衡数据分布<br>解决倾斜"]
        D["增加分区<br>带Shuffle"] --> D1["真正增加分区<br>重新分配数据"]
    end
    
    subgraph "Shuffle实现机制"
        E["添加随机Key"] --> F["Hash划分"] --> G["移除随机Key"]
    end

四种使用场景

场景参数效果依赖关系
减少分区(默认)coalesce(N)直接合并相邻分区NarrowDependency
增加分区(默认)coalesce(M) M>原分区数无效,分区数不变NarrowDependency
减少分区(带Shuffle)coalesce(N, shuffle=true)均衡减少分区ShuffleDependency
增加分区(带Shuffle)coalesce(M, shuffle=true) M>原分区数真正增加分区ShuffleDependency

示例代码

val rdd = sc.parallelize(1 to 100, 5)  // 5个分区
 
// 场景1:减少分区到2个(无Shuffle)
val coalesced1 = rdd.coalesce(2)
println(s"分区数: ${coalesced1.getNumPartitions}")  // 输出: 2
 
// 场景2:增加分区到6个(无Shuffle,无效)
val coalesced2 = rdd.coalesce(6)
println(s"分区数: ${coalesced2.getNumPartitions}")  // 输出: 5
 
// 场景3:减少分区到2个(带Shuffle,均衡)
val coalesced3 = rdd.coalesce(2, shuffle = true)
 
// 场景4:增加分区到6个(带Shuffle,有效)
val coalesced4 = rdd.coalesce(6, shuffle = true)
println(s"分区数: ${coalesced4.getNumPartitions}")  // 输出: 6
 
// 查看数据分布
coalesced4.mapPartitionsWithIndex { (index, iter) =>
  Iterator(s"分区$index: ${iter.size}个元素")
}.collect()

6.2 repartition()操作

语义:重新分区,总是使用Shuffle。

关键特性

  • 相当于coalesce(numPartitions, shuffle = true)
  • 可以增加或减少分区
  • 重新均衡数据分布

示例代码

val rdd = sc.parallelize(1 to 100, 3)
 
// 增加分区
val repartitioned1 = rdd.repartition(6)
println(s"分区数: ${repartitioned1.getNumPartitions}")  // 输出: 6
 
// 减少分区
val repartitioned2 = rdd.repartition(2)
println(s"分区数: ${repartitioned2.getNumPartitions}")  // 输出: 2
 
// 应用场景:解决数据倾斜
val skewedData = sc.parallelize(
  List.fill(90)(1) ++ List.fill(5)(2) ++ List.fill(5)(3), 3
)
 
// 重新分区以均衡负载
val balancedData = skewedData.repartition(10)