06 窗口聚合:滚动窗口、滑动窗口与会话窗口的实现机制
摘要
窗口聚合(Window Aggregation)是流处理中最核心的计算模式:将无界的事件流按时间维度切分为有界的窗口,在每个窗口内做聚合统计。Structured Streaming 支持三种窗口类型:滚动窗口(Tumbling Window,窗口不重叠、无间隙,每个事件只属于一个窗口)、滑动窗口(Sliding Window,窗口可重叠,每个事件可属于多个窗口)、会话窗口(Session Window,窗口大小由事件间隔动态决定,无数据时窗口自然结束)。三种窗口类型看似只是 API 的差别,但背后的 State Store 存储结构、状态更新逻辑、窗口合并(Merge)算法都完全不同——特别是会话窗口,需要在运行时动态合并相邻窗口,实现复杂度远超前两种。本文从物理实现角度深度讲解三种窗口的 State 存储模型、与 Watermark 交互的状态清理时机,以及在高基数(High Cardinality)场景下窗口状态的内存压力控制。
第 1 章 窗口的本质:按时间分组的特殊 GROUP BY
1.1 窗口聚合 vs 普通 GROUP BY
普通 GROUP BY 按某个值分组(如 GROUP BY category);窗口聚合按某个时间范围分组(如”每 5 分钟为一组”)。在 Structured Streaming 的实现中,窗口聚合被转化为一个特殊的 GROUP BY:
# 窗口聚合的 SQL 写法
spark.sql("""
SELECT window, category, SUM(amount) as total
FROM events
GROUP BY window(event_time, '5 minutes'), category
""")
# DataFrame API
from pyspark.sql.functions import window, sum as spark_sum
result = events \
.withWatermark("event_time", "10 minutes") \
.groupBy(window("event_time", "5 minutes"), "category") \
.agg(spark_sum("amount").alias("total"))window("event_time", "5 minutes") 本质上是一个列变换函数:对每一行,根据其 event_time 计算出该事件所属的窗口区间([start, end) 的 struct 类型),然后用这个窗口区间作为 GROUP BY 的键之一。
这样,窗口聚合就被统一为 GROUP BY (window_struct, category) 的普通聚合,复用了 Structured Streaming 现有的有状态聚合框架(State Store + HashAggregate)。
第 2 章 滚动窗口:最简单的窗口类型
2.1 滚动窗口的定义
滚动窗口(Tumbling Window):窗口大小固定、相邻窗口不重叠、覆盖所有时间(无间隙)。每个事件恰好属于一个窗口。
# 5 分钟滚动窗口
window("event_time", "5 minutes")
# 生成的窗口:[10:00, 10:05), [10:05, 10:10), [10:10, 10:15), ...对于事件时间 10:03:27,它属于窗口 [10:00, 10:05)(因为 10:00 <= 10:03:27 < 10:05)。
2.2 滚动窗口的 State 存储
滚动窗口的 State Key = (window_start, window_end, group_by_keys...):
State Store 中存储的 Key-Value:
Key: (window=[10:00,10:05), category=food) → Value: {sum=350, count=3}
Key: (window=[10:00,10:05), category=drink) → Value: {sum=120, count=2}
Key: (window=[10:05,10:10), category=food) → Value: {sum=200, count=1}
每收到一条新事件,按其 event_time 计算所属窗口,然后更新对应 Key 的 State:
事件:{event_time=10:03:45, category=food, amount=80}
计算所属窗口:[10:00, 10:05)
State Key:(window=[10:00,10:05), category=food)
State 更新:{sum: 350+80=430, count: 3+1=4}
2.3 滚动窗口的状态清理
配合 Watermark,当 Watermark > window_end 时,该窗口的状态被清理:
当前 Watermark = 10:06:20
窗口 [10:00, 10:05) 的 window_end = 10:05:00
判断:10:05:00 < 10:06:20 → 窗口关闭,状态清理,结果输出(Append 模式)
窗口 [10:05, 10:10) 的 window_end = 10:10:00
判断:10:10:00 > 10:06:20 → 窗口未关闭,继续保留状态
第 3 章 滑动窗口:事件属于多个窗口
3.1 滑动窗口的定义与参数
滑动窗口(Sliding Window):窗口大小固定,但相邻窗口可以重叠。由两个参数定义:
- 窗口大小(Window Duration):每个窗口的时间跨度
- 滑动步长(Slide Duration):相邻窗口起始时间的间距
当 slide < window 时,窗口重叠;当 slide = window 时,退化为滚动窗口;slide > window 时,窗口之间有间隙(某些事件不属于任何窗口)。
# 10 分钟窗口,每 5 分钟滑动一次
window("event_time", "10 minutes", "5 minutes")
# 生成的窗口:
# [09:55, 10:05), [10:00, 10:10), [10:05, 10:15), [10:10, 10:20), ...
# 对于事件时间 10:03:27:
# 属于 [09:55, 10:05)(因为 09:55 <= 10:03:27 < 10:05)
# 也属于 [10:00, 10:10)(因为 10:00 <= 10:03:27 < 10:10)
# → 该事件属于 2 个窗口(window/slide = 10/5 = 2 个)每个事件属于几个窗口:ceil(window_duration / slide_duration) 个窗口。
3.2 滑动窗口的 State 开销
滑动窗口的核心代价:每条事件需要更新多个窗口的 State,State Store 的写入量是滚动窗口的 window/slide 倍。
window=1小时,slide=1分钟 → 每条事件属于 60 个窗口
→ 每条事件需要更新 60 个 State Key
→ State Store 中同时维护 60 倍于滚动窗口的状态
→ State Store 内存压力 60 倍于同等场景的滚动窗口
生产中,window/slide 比值越大,State 开销越高。建议:
- 不要设置过小的
slide(如 1 秒滑动)配合较大的窗口(如 1 小时),除非内存充足 - 如果
window/slide = 1(滚动),直接用滚动窗口 API,性能更优
第 4 章 会话窗口:动态大小的智能窗口
4.1 会话窗口是什么,为什么出现
会话窗口(Session Window)(Spark 3.2 引入):窗口大小不固定,由事件间隔动态决定——如果两个事件的时间间隔 < 指定的 gap(会话超时),它们属于同一个会话窗口;如果间隔 >= gap,则分属不同的会话窗口。
为什么需要会话窗口:
滚动/滑动窗口都是基于绝对时间边界切分的,与用户行为模式无关。但很多业务场景的”窗口”是由用户行为本身定义的:
- 电商会话:用户连续浏览商品的行为构成一个会话,30 分钟无操作则会话结束
- 日志分析:同一用户的一次访问行为(连续的请求日志,超过 10 分钟无新日志则访问结束)
- 传感器异常检测:连续异常读数构成一次异常事件,恢复正常超过 5 分钟则异常事件结束
这些场景中,“窗口”的边界是由数据本身决定的,而不是固定的时间切片。会话窗口正是为此设计的。
# 会话窗口:30 分钟无操作则会话结束
from pyspark.sql.functions import session_window
result = user_events \
.withWatermark("event_time", "30 minutes") \
.groupBy(session_window("event_time", "30 minutes"), "userId") \
.agg(count("*").alias("page_views"), sum("duration").alias("total_duration"))4.2 会话窗口的动态合并机制
会话窗口最复杂的地方在于:窗口边界在运行时可能改变。
userId=user_001 的事件序列:
10:00:00 → 创建会话 [10:00:00, 10:30:00)(初始窗口 = [event_time, event_time + gap))
10:15:00 → 此事件在已有会话窗口内(10:15 < 10:30),更新会话:
[10:00:00, 10:45:00)(窗口结束延伸到 10:15 + 30分 = 10:45)
10:40:00 → 此事件在已有会话窗口内(10:40 < 10:45),更新会话:
[10:00:00, 11:10:00)(窗口结束延伸到 10:40 + 30分 = 11:10)
12:00:00 → 此事件在已有会话窗口外(12:00 > 11:10),创建新会话:
[12:00:00, 12:30:00)
最终:user_001 有两个会话:
会话 1:[10:00:00, 11:10:00),包含 3 个事件
会话 2:[12:00:00, ...],还在进行中
更复杂的情况:两个原本独立的窗口可能因为中间事件的到来而合并。
当前 State:
user_002 会话 A:[10:00, 10:30)(event at 10:00)
user_002 会话 B:[11:00, 11:30)(event at 11:00)
新到事件:event_time=10:45,userId=user_002
10:45 在会话 A 的结束时间 10:30 之后 → 会话 A 需要延伸到 10:45+30=11:15
延伸后会话 A 变为 [10:00, 11:15)
但 11:15 > 11:00(会话 B 的开始),会话 A 和 B 重叠了!
→ 必须将会话 A 和 B 合并为 [10:00, 11:30)
这种动态合并(Merge)是会话窗口区别于滚动/滑动窗口的核心复杂性。Structured Streaming 的会话窗口通过 MergingSessionsStateManager 实现这一合并逻辑,State Store 中对会话窗口的操作包含了 merge 操作(而滚动/滑动窗口只有 update)。
4.3 会话窗口的 State 存储与性能
会话窗口的 State Key = (session_start, session_end, group_by_keys...),但由于窗口边界动态变化,每次合并都需要:
- 删除旧的两个窗口的 State 条目
- 创建新的合并后的窗口 State 条目
这比滚动/滑动窗口的”只更新”操作代价更高(需要额外的 delete + insert)。会话窗口的并发度和 State 访问模式也更复杂,通常比同等数据量的滚动窗口消耗 2-5 倍的 State Store I/O。
第 5 章 高基数窗口的内存压力控制
5.1 高基数场景的 State 膨胀
“高基数”指 GROUP BY 的 Key 种类极多(如 GROUP BY userId, window,用户数亿)。此时 State Store 中同时维护数亿个 Key 的窗口状态,内存压力极大。
State 大小的计算:
State 大小 ≈ 并发窗口数 × 平均 Key 数 × 每 Key State 大小
= (窗口持续时间 / 滑动步长) × 活跃用户数 × 状态字节数
例:5分钟滑动,步长1分钟,1000万活跃用户,每 Key 状态约 100 字节
并发窗口数 = 5/1 = 5
State 大小 = 5 × 1000万 × 100B = 5GB
5.2 控制高基数 State 的策略
策略一:使用 RocksDB State Store
HDFS-backed State Store 将状态存在内存中(超过内存时序列化到磁盘),高基数场景容易 OOM;RocksDB State Store 将状态存储在本地 SSD 中,内存只缓存热点数据,可以处理远超内存大小的状态:
spark.conf.set(
"spark.sql.streaming.stateStore.providerClass",
"org.apache.spark.sql.execution.streaming.state.RocksDBStateStoreProvider"
)策略二:增大 Watermark 延迟阈值 = 增加状态保留时间,减小 = 降低内存
Watermark 延迟阈值决定了过期状态被清理的速度。适当减小延迟阈值(接受更多迟到数据丢失),可以更快地清理过期 State,降低内存压力。
策略三:预聚合降低基数
在窗口聚合之前,先做一层降维聚合(如按 region 预聚合),减少传入窗口聚合的行数和 Key 种类:
# 不推荐:直接按 userId 做高基数窗口聚合
events.groupBy(window("event_time", "5 min"), "userId").agg(sum("amount"))
# 推荐:先按 region 预聚合,降低基数
events \
.groupBy(window("event_time", "1 min"), "region") \
.agg(sum("amount").alias("region_total")) \
.groupBy(window("window.start", "5 min"), "region") \
.agg(sum("region_total"))小结
三种窗口类型覆盖了不同的业务语义需求:
- 滚动窗口:最简单,每事件属于一个窗口,State 开销最低;适合”每 N 分钟的统计”类需求
- 滑动窗口:事件属于多个窗口,State 开销是滚动的
window/slide倍;适合”过去 N 分钟的滑动统计”;window/slide比值不宜过大 - 会话窗口:窗口大小动态,运行时可能合并;实现最复杂,State 开销最高;适合”用户会话”类需求(窗口由行为间隔定义)
所有窗口类型都需要配合 Watermark 实现 State 的自动清理和 Append 模式输出;高基数场景优先使用 RocksDB State Store 避免内存 OOM。
第 07 篇深入 flatMapGroupsWithState:自定义状态机的终极工具,讲解 GroupState 的读写接口、超时机制(ProcessingTime vs EventTime)的底层原理,以及如何用它实现复杂的业务逻辑(欺诈检测、行为序列分析)。
思考题
- 滑动窗口的一个事件会同时属于多个窗口(窗口大小 / 滑动步长个窗口)。Spark 在实现上通过将一条记录”展开”成多条记录(每条对应一个窗口)来处理这个问题。这个展开操作在 State Store 中意味着状态存储量是滚动窗口的 N 倍。当滑动比(窗口大小 / 步长)很大时,这种实现有什么性能风险?有没有更省内存的实现思路?
- 会话窗口(Session Window)的边界不是固定的,而是由事件的间隔时间动态决定的。这使得会话窗口在分布式实现上比滚动 / 滑动窗口复杂得多——同一个 Key 的不同 Session 可能在不同批次中被合并或拆分。Spark 的会话窗口是如何处理跨批次的 Session 合并问题的?
- 窗口聚合结合 Watermark 后,State Store 可以安全清理已关闭窗口的状态。但如果忘记设置 Watermark,State Store 会无限增长。在生产中,如何通过监控指标提前发现”State 无限增长”的风险,而不是等到 OOM 才发现?
参考资料
- Apache Spark 官方文档:Structured Streaming Window Operations
- Apache Spark 源码:
org.apache.spark.sql.functions.window - Apache Spark 源码:
org.apache.spark.sql.execution.streaming.state.MergingSessionsStateManager - Session Window Support in Structured Streaming(SPARK-10816)