摘要

特征工程在 Spark MLlib 中并非零散的“数据处理步骤”,而是一套以 Transformer 为核心、可嵌入 Pipeline 的标准化组件体系。它将特征处理分为三个层次:提取(从原始数据构造特征)、变换(对特征进行缩放/编码)、选择(筛选有效特征)。本文从“如何将非结构化文本/时间戳高效转换为数值向量”这一核心问题切入,深度解析 Spark 内置的 TokenizerVectorAssemblerStandardScaler 等组件的实现原理,以及 PCAChiSqSelector 等选择算法的分布式计算策略。通过源码级拆解 StringIndexer 的频次统计、HashingTF 的哈希碰撞处理、以及 ChiSqSelector 的卡方统计量计算,还原一次完整的特征工程 Pipeline 执行路径。结合生产案例,提供高基数列编码陷阱、PCA 内存溢出、Pipeline 持久化版本兼容等典型问题排查方案。最后,在 2026 年特征存储(Feast)与在线特征服务普及的背景下,讨论 Spark 特征工程从“批处理”向“批流一体”演进的方向。


一、核心概念与底层图景

1.1 定义

工程定义

Spark 特征工程是基于 Transformer 抽象的可复用特征处理组件集合,涵盖:

  • 特征提取:从原始文本/时间戳/JSON 构造数值特征
  • 特征变换:缩放、归一化、离散化、编码
  • 特征选择:基于统计或模型筛选重要特征

所有组件均实现 transform() 方法,可串联为 Pipeline,并在训练/推理时保持一致性。

类比:特征工程如同食材处理流水线——原始农产品(数据)需经过清洗、切配、腌制、调味(变换),最终成为标准化半成品(特征向量),供后厨(模型)烹饪。

1.2 架构全景图

graph TD
    classDef extract fill:#e1f5fe,stroke:#01579b,stroke-width:2px;
    classDef transform fill:#fff3e0,stroke:#e65100,stroke-width:2px;
    classDef select fill:#ffe0b2,stroke:#e65100,stroke-width:2px;
    classDef pipeline fill:#d1c4e9,stroke:#4a148c,stroke-width:2px;

    subgraph 原始数据
        Raw[原始 DataFrame<br/>文本/数值/类别/时间戳]:::extract
    end

    subgraph 特征提取层
        Tokenizer[Tokenizer<br/>文本→单词数组]:::extract
        HashingTF[HashingTF<br/>单词→频率向量]:::extract
        CountVectorizer[CountVectorizer<br/>词汇表→向量]:::extract
    end

    subgraph 特征变换层
        StringIndexer[StringIndexer<br/>类别→索引]:::transform
        OneHotEncoder[OneHotEncoder<br/>索引→独热向量]:::transform
        VectorAssembler[VectorAssembler<br/>多列→特征向量]:::transform
        StandardScaler[StandardScaler<br/>标准化]:::transform
        MinMaxScaler[MinMaxScaler<br/>归一化]:::transform
        Bucketizer[Bucketizer<br/>连续值→离散桶]:::transform
    end

    subgraph 特征选择层
        PCA[PCA<br/>主成分分析]:::select
        ChiSqSelector[ChiSqSelector<br/>卡方检验]:::select
        VarianceThresholdSelector[方差阈值<br/>过滤低方差]:::select
    end

    subgraph Pipeline
        Stages[Stage 1 → Stage 2 → ... → Stage N]:::pipeline
    end

    Raw --> Tokenizer --> HashingTF --> VectorAssembler --> StandardScaler --> PCA
    Raw --> StringIndexer --> OneHotEncoder --> VectorAssembler
    Raw --> Bucketizer --> VectorAssembler
    
    VectorAssembler --> ChiSqSelector
    
    Pipeline -->|fit| 训练数据
    Pipeline -->|transform| 测试数据

交互方向解读

  • 提取层:输入多为原始格式,输出为数值或索引数组。
  • 变换层:输入/输出均为数值向量,主要改变分布或表示形式。
  • 选择层:输入特征向量,输出降维后的特征向量。
  • Pipeline 串联:前一 Stage 的输出列作为后一 Stage 的输入列,中间结果可保留用于调试。

二、机制原理深度剖析

2.1 核心子模块拆解

子模块职责设计意图/为何独立
StringIndexer将字符串列映射为 Double 索引(按频次排序)类别编码基础:高频类别索引小,便于后续处理
IndexToString将索引映射回原始字符串反向转换:预测结果可读性
HashingTF使用哈希函数将词语映射到固定长度向量无状态:无需维护词汇表,适合流式场景
CountVectorizer构建词汇表,输出词频向量可解释性:保留词汇表,便于调试
VectorAssembler将多列数值合并为单一特征向量列统一接口:ML 算法仅接受 Vector 类型输入
StandardScaler计算均值/标准差,标准化特征模型收敛必需:避免大数值特征主导梯度
PCA主成分分析降维无监督降维:保留最大方差方向
ChiSqSelector基于卡方检验选择与标签最相关的特征有监督筛选:提升模型可解释性

2.2 核心流程可视化:StringIndexer + OneHotEncoder 执行路径

sequenceDiagram
    participant D as Driver
    participant E1 as Executor 1
    participant E2 as Executor 2
    participant DF as DataFrame

    D->>D: StringIndexer.fit()
    D->>E1: 1. 扫描分区 1 类别
    D->>E2: 2. 扫描分区 2 类别
    
    E1-->>D: 3. 返回 (category, count) 局部
    E2-->>D: 4. 返回 (category, count) 局部
    
    D->>D: 5. 聚合全局频次<br/>排序:高频索引小
    D->>D: 6. 生成映射表<br/>{"catA":0.0, "catB":1.0}
    
    D->>B: 7. 广播映射表
    
    E1->>E1: 8. transform():<br/>原始列→索引列
    E2->>E2: 9. transform()
    
    Note over E1,E2: OneHotEncoder stage
    
    D->>D: 10. OneHotEncoder.fit()<br/>无状态,仅检查
    D->>B: 11. 广播编码信息
    E1->>E1: 12. transform():<br/>索引列→独热向量

2.3 卡方特征选择算法

# 伪代码:ChiSqSelector 分布式计算
def chiSqSelector(df: DataFrame, labelCol: str, numTopFeatures: int):
    # 1. 收集特征列与标签列
    featureCols = df.columns - [labelCol]
    
    # 2. 对各特征并行计算卡方统计量
    chiSqValues = df.rdd.mapPartitions { partition =>
        for row in partition:
            for col in featureCols:
                # 构建观测频数矩阵
                observed[ row[col] ][ row[label] ] += 1
        emit (col -> observed)
    }.reduceByKey { (a, b) => a + b }  # 按特征聚合
    
    # 3. Driver 计算卡方值
    for col, observed in chiSqValues:
        expected = computeExpected(observed)
        chiSq = sum((o - e)^2 / e)
        pValue = chi2cdf(chiSq, df)
        results[col] = (chiSq, pValue)
    
    # 4. 按卡方值排序,取前 numTopFeatures
    topFeatures = results.sortBy(-_._2[0]).take(numTopFeatures)
    
    return topFeatures

关键决策点

  • 卡方适用条件:要求特征与标签均为离散值。连续特征需先分箱。
  • P值校正:多特征选择需考虑多重检验问题(FDR/Bonferroni),MLlib 默认返回原始 P 值,需用户自行校正。
  • 分布式计算:各特征并行统计频数,reduce 阶段合并,Driver 仅计算最终卡方值,无瓶颈。

三、内核/源码级实现

3.1 核心数据结构(Scala)

StringIndexer 模型存储

// 路径:mllib/src/main/scala/org/apache/spark/ml/feature/StringIndexer.scala
/**
 * StringIndexer 训练后的模型。
 */
class StringIndexerModel(
    override val uid: String,
    labels: Array[String]  // 按频次排序的类别数组
) extends Model[StringIndexerModel] with StringIndexerBase {
    
    // 类别 → 索引映射表
    private val labelToIndex: Map[String, Double] = 
        labels.zipWithIndex.map { case (label, idx) => 
            (label, idx.toDouble) 
        }.toMap
    
    /**
     * 将字符串列转换为索引列。
     */
    override def transform(dataset: Dataset[_]): DataFrame = {
        val indexer = udf { label: String =>
            labelToIndex.getOrElse(label, labels.length.toDouble) // 未知类别
        }
        dataset.withColumn($(outputCol), indexer(dataset($(inputCol))))
    }
}

HashingTF 实现(无状态)

// 路径:mllib/src/main/scala/org/apache/spark/ml/feature/HashingTF.scala
/**
 * 哈希特征变换。
 */
class HashingTF(override val uid: String) extends Transformer {
    
    private var numFeatures: Int = 1 << 18  // 默认 262144
    
    /**
     * 核心转换:将词语列表哈希为稀疏向量。
     */
    override def transform(dataset: Dataset[_]): DataFrame = {
        val hashUDF = udf { terms: Seq[String] =>
            val indices = ArrayBuffer.empty[Int]
            val values = ArrayBuffer.empty[Double]
            
            terms.foreach { term =>
                // murmur3 哈希,取模 numFeatures
                val index = nonNegativeMod(term.hashCode, numFeatures)
                indices += index
                values += 1.0  // 词频计数
            }
            
            // 构建稀疏向量
            Vectors.sparse(numFeatures, indices.toArray, values.toArray)
        }
        
        dataset.withColumn($(outputCol), hashUDF(dataset($(inputCol))))
    }
}

VectorAssembler 列合并

// 路径:mllib/src/main/scala/org/apache/spark/ml/feature/VectorAssembler.scala
/**
 * 将多列合并为特征向量。
 */
class VectorAssembler(override val uid: String) extends Transformer {
    
    override def transform(dataset: Dataset[_]): DataFrame = {
        val assembleUDF = udf { row: Row =>
            val indices = ArrayBuffer.empty[Int]
            val values = ArrayBuffer.empty[Double]
            var currentIndex = 0
            
            // 遍历所有输入列
            for (i <- 0 until row.size) {
                row.get(i) match {
                    case v: Double =>
                        indices += currentIndex
                        values += v
                        currentIndex += 1
                    case v: Vector =>
                        // 若为向量,将其元素逐个加入
                        v.foreachActive { (idx, value) =>
                            indices += currentIndex + idx
                            values += value
                        }
                        currentIndex += v.size
                }
            }
            
            Vectors.sparse(currentIndex, indices.toArray, values.toArray)
        }
        
        // 将多列组合为 Row 类型输入
        val assembled = dataset.withColumn(
            $(outputCol), 
            assembleUDF(struct($(inputCols).map(dataset(_)): _*))
        )
        assembled
    }
}

并发模型

  • fit() 阶段:需扫描全数据计算统计量(频次/均值/方差)。此阶段各分区并行计算局部统计,Driver 聚合。
  • transform() 阶段:完全并行,各分区独立处理,无 shuffle。
  • 瓶颈CountVectorizer 的词汇表构建需收集全部词语至 Driver,词汇量大时 Driver 内存压力大。

四、生产落地与 SRE 实战

4.1 场景化案例:高基数列 OneHotEncoder 导致特征爆炸

现象

  • 用户画像表,city 列有 10 万种不同城市。
  • 使用 StringIndexer + OneHotEncoder 后,特征维度从 50 飙升至 10 万+。
  • 模型训练 OOM,且 AUC 下降。

排查链路

  1. 检查编码后特征维度df.select('city_vec').first() 显示稀疏向量长度 = 10 万。
  2. 检查模型系数 → 逻辑回归对 10 万维特征需学习 10 万系数,过拟合。
  3. 根因:高基数类别不宜独热编码。

解决方案

# 方案A:使用频次编码
city_freq = df.groupBy('city').count().collect()
freq_dict = {row.city: row.count for row in city_freq}
freq_udf = udf(lambda x: float(freq_dict.get(x, 0)))
df = df.withColumn('city_freq', freq_udf('city'))
 
# 方案B:哈希技巧(HashingTF + 降维)
from pyspark.ml.feature import HashingTF
hashingTF = HashingTF(inputCol='city_words', outputCol='city_hash', numFeatures=1000)
 
# 方案C:目标编码(Target Encoding) - 需注意过拟合
# 需交叉验证或添加平滑

验证

特征维度降至 1000,AUC 提升 5%,训练时间减少 70%。

4.2 参数调优矩阵

参数名组件推荐值内核解释
numFeaturesHashingTF2^18 = 262144特征维度,调大减少碰撞,增加内存
minDFCountVectorizer2词语最小文档频次,过滤罕见词
maxCategoriesStringIndexer20自动判断是否为分类列时,类别数阈值
withMean / withStdStandardScalertrue / true是否中心化/标准化,对 SVM/回归必要
kPCA保留方差 95%降维维度,可用 explainedVariance 选择
numTopFeaturesChiSqSelector20-200保留特征数,依模型复杂度定
handleInvalidStringIndexerskip / keep测试集出现新类别时的处理策略

4.3 监控与诊断

关键指标(特征工程阶段)

指标名健康区间瓶颈阈值含义
StringIndexer 类别数< 1000> 10000高基数告警
特征向量平均非零值< 100> 1000稀疏度下降,可能编码方式不当
HashingTF 碰撞率< 1%> 5%numFeatures 太小,需调大
PCA 解释方差比> 0.95< 0.8降维后信息损失大,需增 k

诊断命令

# 查看编码后稀疏度
df.select('features').rdd.map(lambda x: x[0].numNonzeros()).stats()
 
# 检查 StringIndexer 类别分布
df.groupBy('indexed_city').count().orderBy('count', ascending=False).show(10)
 
# PCA 解释方差
pcaModel.explainedVariance.sum()

4.4 故障排查决策树

mindmap
  root((特征工程问题))
    维度爆炸
      OneHotEncoder 高基数
        对策: 改用频次/哈希编码
      VectorAssembler 重复列
        检查: 是否多次加入相同列
    内存溢出
      CountVectorizer Driver OOM
        指标: 词汇表过大
        对策: 用 HashingTF / 增大 minDF
      PCA 计算 OOM
        指标: 特征维度过高
        对策: 先降采样 / 用 RandomProjection
    稀疏度异常
      全为 0 的特征向量
        检查: VectorAssembler 输入列
        对策: 确保有数值特征输入
      非零值过多
        对策: 检查编码方式是否合适
    新类别问题
      StringIndexer 遇到未知类别
        设置: handleInvalid = "keep"
        后果: 未知类别统一编码

五、技术演进与未来视角(2026+)

5.1 历史设计约束与改进

版本变化动因/解决的问题
Spark 1.2 (2014)特征工程组件初版StringIndexerVectorAssembler 等基础组件
Spark 1.6 (2016)Pipeline API 成熟支持特征工程与模型串联
Spark 2.0 (2016)基于 DataFrame 重构统一 API,利用 Tungsten 优化
Spark 2.3 (2018)Imputer 引入缺失值填充组件
Spark 3.0 (2020)FeatureHasher 引入在线学习友好的哈希特征
Spark 3.4 (2023)RFormula 增强R 风格公式解析,支持交互项

5.2 2026 年仍存在的“遗留设计”

痛点1:特征工程与模型训练分离

Pipeline 虽能串联,但特征工程仍独立于模型优化,无法在调参时自动调整特征处理(如分箱数)。
现状:需手动搜索或借助第三方库(如 Featuretools)。

痛点2:在线推理特征不一致

训练时用 StringIndexer 生成映射表,在线服务需重新实现相同逻辑。
社区方案PipelineModel.save 保存所有元数据,Java/Python 端均可加载,但引入 MLlib 依赖。

痛点3:时间序列特征缺失

Spark MLlib 无原生滞后特征、滑动窗口生成器,需用户用 SQL/窗口函数手动实现。

5.3 未来趋势

  • 特征存储(Feature Store)集成
    Spark 特征工程组件可直接读写 Feast/Hopsworks,实现训练-推理特征一致性
  • 自动特征工程
    基于遗传算法/深度学习的自动特征生成(如 OpenFE)与 Spark 集成,在分布式环境下搜索有效特征。
  • 在线特征计算
    Spark Structured Streaming 与特征工程组件结合,实现实时特征更新(如实时计算用户过去 5 分钟点击率)。

十年后的 Spark 特征工程

它将不再是一组孤立的 Transformer,而是特征生命周期管理平台的一部分。从特征定义、计算、存储到监控,Spark 承担“批处理计算引擎”角色。但它的设计哲学——可组合、可复用、Pipeline 化——将影响所有后续特征工程工具。


参考文献