摘要:
窗口(Window)是流处理中将无界数据切割为有限数据集进行聚合计算的核心机制。Flink 提供了四种窗口类型(滚动、滑动、会话、全局),三类窗口函数(ReduceFunction、AggregateFunction、ProcessWindowFunction),以及可插拔的触发器(Trigger)和驱逐器(Evictor)机制。本文不只列举 API,而是深入每种窗口的设计语义:滑动窗口为什么会导致数据被”重复计算”?会话窗口如何在没有固定边界的情况下动态划分窗口?AggregateFunction 和 ProcessWindowFunction 各自适合什么场景,如何组合使用以兼顾性能与灵活性?读完本文,你将能够根据具体的业务需求选择正确的窗口类型和函数,并理解窗口状态管理对内存的影响。
第 1 章 窗口的本质:为无界流划定计算边界
1.1 为什么流处理需要窗口
流数据是无终点的——Kafka 的订单 Topic 会持续不断地收到新消息,用户行为日志流永远不会结束。但业务上的计算往往需要一个”时间范围”:过去 1 分钟的订单总金额、过去 1 小时的活跃用户数、某次用户会话期间的总点击次数。
如果没有窗口机制,唯一能做的是”全局聚合”——对从作业启动到当前时刻的所有数据做累加。这有两个问题:
- 状态无限增长:累加值需要存储所有历史数据,内存最终会耗尽
- 业务无意义:业务关心的是”最近 N 分钟”而不是”历史所有”
窗口将无界流切割为有限的时间(或数量)片段,让聚合计算变得有意义且内存可控。
1.2 窗口的四个核心属性
理解任何一种窗口,都需要回答四个问题:
- 窗口如何划分(Window Assigner):一条记录属于哪些窗口?一条记录可以属于多个窗口(滑动窗口)还是只属于一个(滚动窗口)?
- 窗口何时触发计算(Window Trigger):什么条件下触发窗口函数执行?默认是 Watermark 超过窗口结束时间
- 窗口如何计算(Window Function):对窗口内的所有数据执行什么操作?
- 窗口数据何时清理(Evictor + allowedLateness):窗口状态何时从内存中清除?
第 2 章 四种窗口类型详解
2.1 滚动窗口(Tumbling Window):互不重叠的时间分片
是什么:滚动窗口将时间轴划分为一系列大小固定、互不重叠的窗口。每条记录恰好属于一个窗口。
时间轴: 0 10 20 30 40 50 ...(秒)
窗口: [0,10) [10,20) [20,30) [30,40) [40,50) ...
记录分配:t=5 → [0,10);t=15 → [10,20);t=25 → [20,30)
代码示例(事件时间,每 5 分钟一个窗口):
stream
.keyBy(order -> order.getUserId())
.window(TumblingEventTimeWindows.of(Time.minutes(5)))
.sum("amount");
// 处理时间版本
stream
.keyBy(order -> order.getUserId())
.window(TumblingProcessingTimeWindows.of(Time.minutes(5)))
.sum("amount");窗口偏移(Window Offset):默认情况下,窗口以 Unix 时间戳的整数倍为边界(如 5 分钟窗口的边界是 00:00、00:05、00:10…)。如果需要对齐到特定时区(如东八区的整点),需要配置 offset:
// 东八区(UTC+8):偏移 -8 小时,使窗口边界对齐到北京时间整点
.window(TumblingEventTimeWindows.of(Time.hours(1), Time.hours(-8)))
// 这样窗口边界是 UTC 的 16:00、17:00(= 北京时间 00:00、01:00)适用场景:所有需要”每 N 分钟/小时统计一次”的场景——每分钟的 PV/UV、每小时的销售额、每天的新增用户数。
内存特性:每个 Key 在任意时刻最多只有 1 个活跃窗口(不重叠),内存占用最小。
2.2 滑动窗口(Sliding Window):重叠的时间窗口
是什么:滑动窗口有两个参数:窗口大小(size) 和 滑动步长(slide)。窗口每隔 slide 时间滑动一次,相邻窗口有 size - slide 的重叠区间。
size=10s, slide=5s:
时间轴: 0 5 10 15 20 25 ...(秒)
窗口1: [0,10)
窗口2: [5,15)
窗口3: [10,20)
窗口4: [15,25)
记录 t=7 属于:[0,10) 和 [5,15) 两个窗口(被计算了两次!)
记录 t=12 属于:[5,15) 和 [10,20) 两个窗口
每条记录属于 ceil(size/slide) 个窗口。例如 size=1hour、slide=5min,每条记录被分配到 12 个窗口。
stream
.keyBy(order -> order.getUserId())
// 窗口大小 10 分钟,每 5 分钟滑动一次
.window(SlidingEventTimeWindows.of(Time.minutes(10), Time.minutes(5)))
.aggregate(new AverageAmountAggregator());为什么要有滑动窗口(不这样会怎样):
考虑”过去 10 分钟的平均响应时间”这个监控指标。如果用滚动窗口(10 分钟),指标每 10 分钟才更新一次,实时性差。如果用滑动窗口(size=10min, slide=1min),指标每 1 分钟更新一次,但每次都反映过去完整的 10 分钟数据——这就是滑动窗口的价值:高频输出、长范围覆盖。
内存开销警告:每条记录被多个窗口持有,在窗口状态未触发时,同一条数据被存储了 ceil(size/slide) 份(或者说,有 ceil(size/slide) 个窗口同时持有对这条数据的引用)。slide 越小,内存开销越大。
生产避坑:滑动窗口的状态膨胀
size=1hour, slide=1min意味着每条数据被分配到 60 个窗口,同时有 60 个活跃窗口在维护状态。如果每个窗口的数据量大,这 60 份状态会消耗大量内存,容易触发 OOM 或导致 Checkpoint 体积过大。替代方案:使用滚动窗口(1 分钟)+ 多窗口结果流 Join,或者用
ProcessFunction+ 环形 Buffer 自定义实现,按需滑动计算。
2.3 会话窗口(Session Window):基于活跃度的动态窗口
是什么:会话窗口不基于固定时间,而是根据数据活跃程度动态划分窗口。如果两条记录之间的时间间隔超过 sessionGap(会话超时时间),它们就被分配到两个不同的窗口。
sessionGap = 5s:
事件流: t=1 t=3 t=4 t=10 t=11 t=20 t=22 t=23
间隔: 2s 1s 6s! 1s 9s! 2s 1s
窗口划分:
[t=1, t=4+5) = [1, 9) ← t=10 与 t=4 间隔 6s > 5s,新窗口
[t=10, t=11+5) = [10, 16) ← t=20 与 t=11 间隔 9s > 5s,新窗口
[t=20, t=23+5) = [20, 28)
stream
.keyBy(event -> event.getUserId())
// 静态会话间隔
.window(EventTimeSessionWindows.withGap(Time.minutes(30)))
.aggregate(new SessionSummaryAggregator());
// 动态会话间隔(每个 Key 可以有不同的超时时间)
.window(EventTimeSessionWindows.withDynamicGap(
(element) -> element.getSessionTimeout() // 从记录中提取超时时间
))会话窗口的实现难题:会话窗口的边界不是预先知道的(不像滚动窗口知道每个窗口的 [start, end)),而是随着新数据到来动态延伸。这给实现带来了挑战:
当收到新记录 t=23 时,它可能被分配到已有的窗口(如果前一个记录是 t=20,间隔 3s < 5s),也可能开启新窗口(如果前一个记录是 t=12,间隔 11s > 5s)。更复杂的是:如果有两个相邻的窗口 [1, 6) 和 [5, 10),它们应该被合并为 [1, 10)。
Flink 通过 MergingWindowAssigner 机制处理窗口合并:每次新记录到来时检查是否需要与已有窗口合并,触发 mergeWindows() 方法。窗口状态也需要随之合并。
适用场景:用户行为分析(一次会话内的所有点击/浏览)、网络安全(同一次攻击的所有请求)、客服系统(一次对话的所有消息)。
内存特性:每个活跃用户有一个(或多个尚未合并的)活跃窗口,窗口大小不固定,取决于用户的活跃时间。长时间活跃的用户窗口可能积累大量数据。
2.4 全局窗口(Global Window):手动控制的无限窗口
是什么:全局窗口将所有数据分配到一个不会自动触发的窗口,需要配合**自定义 Trigger(触发器)**使用。
stream
.keyBy(event -> event.getUserId())
.window(GlobalWindows.create())
.trigger(new MyCustomTrigger()) // 必须配合自定义触发器,否则窗口永远不触发
.aggregate(new MyAggregator());适用场景:当四种标准窗口都不满足需求时(如”每收到 100 条记录触发一次计算”、“当某个特定事件到来时触发窗口”),使用全局窗口 + 自定义 Trigger 实现灵活的触发逻辑。
第 3 章 窗口函数:对窗口数据执行什么操作
窗口函数(Window Function)决定了对窗口内的数据执行什么计算。Flink 提供三类函数,性能和灵活性各有侧重。
3.1 ReduceFunction:增量聚合,内存最优
是什么:ReduceFunction 是增量聚合函数——每来一条新记录,立刻与当前的聚合结果做一次 reduce 操作,只保留一个聚合值,不缓存原始记录。
内存模型:窗口状态中只存储一个”中间聚合值”,与窗口内的数据量无关。无论窗口内有 1 条还是 100 万条记录,状态大小始终是一个固定大小的对象。
// 需求:每分钟统计每个用户的最大订单金额
stream
.keyBy(order -> order.getUserId())
.window(TumblingEventTimeWindows.of(Time.minutes(1)))
.reduce(
(order1, order2) -> order1.getAmount() > order2.getAmount() ? order1 : order2
// ReduceFunction:输入输出类型必须相同
);约束:输入类型和输出类型必须相同(ReduceFunction<T> 的输入和输出都是 T)。如果需要不同的输入/输出类型(如输入 OrderEvent,输出 OrderSummary),需要用 AggregateFunction。
3.2 AggregateFunction:增量聚合,支持类型转换
是什么:AggregateFunction<IN, ACC, OUT> 是增量聚合函数的增强版,支持输入类型(IN)、累加器类型(ACC)和输出类型(OUT)三者不同。
三个方法:
createAccumulator():创建初始累加器add(IN value, ACC accumulator):将新记录合并到累加器getResult(ACC accumulator):从累加器生成最终输出
// 需求:计算每 1 分钟每个用户的平均订单金额
// 输入:OrderEvent;中间状态:(totalAmount: Double, count: Long);输出:Double
public class AverageAmountAggregator
implements AggregateFunction<OrderEvent, Tuple2<Double, Long>, Double> {
@Override
public Tuple2<Double, Long> createAccumulator() {
return Tuple2.of(0.0, 0L); // 初始:金额=0,数量=0
}
@Override
public Tuple2<Double, Long> add(OrderEvent order, Tuple2<Double, Long> acc) {
return Tuple2.of(acc.f0 + order.getAmount(), acc.f1 + 1);
}
@Override
public Double getResult(Tuple2<Double, Long> acc) {
return acc.f1 == 0 ? 0.0 : acc.f0 / acc.f1; // 平均值
}
@Override
public Tuple2<Double, Long> merge(Tuple2<Double, Long> a, Tuple2<Double, Long> b) {
// 会话窗口合并时调用(非会话窗口可以留空实现)
return Tuple2.of(a.f0 + b.f0, a.f1 + b.f1);
}
}
// 使用
stream
.keyBy(order -> order.getUserId())
.window(TumblingEventTimeWindows.of(Time.minutes(1)))
.aggregate(new AverageAmountAggregator());内存模型:与 ReduceFunction 相同,窗口状态中只存储一个 ACC 类型的累加器对象,内存占用极低。
AggregateFunction vs ReduceFunction:
| 特性 | ReduceFunction | AggregateFunction |
|---|---|---|
| 输入/输出类型 | 必须相同(T → T) | 可以不同(IN → ACC → OUT) |
| 灵活性 | 较低 | 较高 |
| 实现复杂度 | 低(只需一个方法) | 稍高(三个方法) |
| 性能 | 相同(都是增量聚合) | 相同 |
| 适用场景 | 简单聚合(求和、取最大) | 复杂聚合(平均值、百分位数) |
3.3 ProcessWindowFunction:全量计算,最灵活
是什么:ProcessWindowFunction<IN, OUT, KEY, W> 是全量窗口函数——窗口触发时,所有窗口内的原始记录都会被缓存并一次性传递给函数,函数可以遍历所有记录,访问窗口元数据(WindowContext)。
// 需求:统计每分钟每个用户的 Top-3 订单,并在输出中附带窗口时间范围
stream
.keyBy(order -> order.getUserId())
.window(TumblingEventTimeWindows.of(Time.minutes(1)))
.process(new ProcessWindowFunction<OrderEvent, TopOrderResult, String, TimeWindow>() {
@Override
public void process(
String userId,
Context context, // 窗口上下文:可访问窗口时间、状态等
Iterable<OrderEvent> orders, // 窗口内所有原始记录
Collector<TopOrderResult> out) {
// 获取窗口时间范围
TimeWindow window = context.window();
long windowStart = window.getStart();
long windowEnd = window.getEnd();
// 遍历所有记录,找 Top-3
List<OrderEvent> sortedOrders = StreamSupport
.stream(orders.spliterator(), false)
.sorted(Comparator.comparingDouble(OrderEvent::getAmount).reversed())
.limit(3)
.collect(Collectors.toList());
out.collect(new TopOrderResult(userId, windowStart, windowEnd, sortedOrders));
}
});内存代价(重要):ProcessWindowFunction 需要将窗口内的所有原始记录缓存在状态中,直到窗口触发。如果窗口大小为 1 小时,1 小时内的所有数据都需要在内存/状态后端中暂存。对于高吞吐场景,这可能消耗大量内存。
何时必须用 ProcessWindowFunction:
- 需要访问窗口的时间信息(
window.getStart()、window.getEnd()) - 需要遍历窗口内所有原始记录(如排序、Top-K)
- 需要访问窗口级别的状态(
context.windowState())或全局状态(context.globalState()) - 需要向侧输出流写数据(
context.output())
3.4 AggregateFunction + ProcessWindowFunction 组合:兼顾性能与灵活性
这是生产中最推荐的模式:用 AggregateFunction 做增量聚合(减少内存占用),用 ProcessWindowFunction 在窗口触发时补充窗口元数据(如时间范围)。
// 需求:统计每分钟每个用户的订单总金额,输出中需要包含窗口时间范围
stream
.keyBy(order -> order.getUserId())
.window(TumblingEventTimeWindows.of(Time.minutes(1)))
.aggregate(
// 第一个参数:增量聚合(只存储一个 Double,不缓存原始记录)
new AggregateFunction<OrderEvent, Double, Double>() {
public Double createAccumulator() { return 0.0; }
public Double add(OrderEvent o, Double acc) { return acc + o.getAmount(); }
public Double getResult(Double acc) { return acc; }
public Double merge(Double a, Double b) { return a + b; }
},
// 第二个参数:ProcessWindowFunction,接收聚合结果(仅 1 个 Double)+ 窗口元数据
new ProcessWindowFunction<Double, OrderWindowSummary, String, TimeWindow>() {
@Override
public void process(String userId, Context ctx,
Iterable<Double> aggregatedAmounts,
Collector<OrderWindowSummary> out) {
// aggregatedAmounts 只有一个元素(AggregateFunction 的结果)
double totalAmount = aggregatedAmounts.iterator().next();
TimeWindow w = ctx.window();
out.collect(new OrderWindowSummary(userId, w.getStart(), w.getEnd(), totalAmount));
}
}
);这种组合的优势:
- 状态中只存储
AggregateFunction的累加器(1 个 Double),不缓存原始记录 - 窗口触发时,
ProcessWindowFunction接收到的Iterable中只有 1 个元素(聚合结果),迭代开销极低 - 既能享受增量聚合的内存优势,又能在最终输出时访问窗口元数据
第 4 章 Trigger:自定义窗口触发逻辑
4.1 默认 Trigger 的行为
每种窗口类型都有对应的默认 Trigger:
- 事件时间窗口(
EventTimeWindows):EventTimeTrigger,当 Watermark 超过窗口结束时间时触发 - 处理时间窗口(
ProcessingTimeWindows):ProcessingTimeTrigger,当系统时钟超过窗口结束时间时触发 - 会话窗口:
EventTimeTrigger或ProcessingTimeTrigger(视时间语义)
4.2 Trigger 的四个方法
public abstract class Trigger<T, W extends Window> {
// 每来一条记录调用(决定是否立即触发)
public abstract TriggerResult onElement(T element, long timestamp, W window, TriggerContext ctx);
// 事件时间定时器触发时调用
public abstract TriggerResult onEventTime(long time, W window, TriggerContext ctx);
// 处理时间定时器触发时调用
public abstract TriggerResult onProcessingTime(long time, W window, TriggerContext ctx);
// 窗口被清理时调用(用于清理 Trigger 自身的状态)
public abstract void clear(W window, TriggerContext ctx);
}TriggerResult 有四种返回值:
CONTINUE:不触发,继续等待FIRE:触发窗口计算,但不清除窗口状态(窗口可以再次触发)PURGE:不触发,但清除窗口状态FIRE_AND_PURGE:触发后清除窗口状态(默认行为)
4.3 自定义 Trigger 示例:计数+时间双重触发
// 需求:每收到 100 条记录触发一次,或者 1 分钟没有达到 100 条也触发一次
public class CountOrTimeTrigger extends Trigger<OrderEvent, TimeWindow> {
private final long maxCount;
private final long timeoutMs;
// 用于记录每个窗口的当前计数
private final ReducingStateDescriptor<Long> countDesc =
new ReducingStateDescriptor<>("count", Long::sum, Long.class);
@Override
public TriggerResult onElement(OrderEvent element, long timestamp,
TimeWindow window, TriggerContext ctx) throws Exception {
// 注册超时定时器(第一次进入窗口时)
ctx.registerEventTimeTimer(window.getStart() + timeoutMs);
// 更新计数
ReducingState<Long> count = ctx.getPartitionedState(countDesc);
count.add(1L);
// 计数达到阈值,立即触发
if (count.get() >= maxCount) {
count.clear();
return TriggerResult.FIRE;
}
return TriggerResult.CONTINUE;
}
@Override
public TriggerResult onEventTime(long time, TimeWindow window, TriggerContext ctx)
throws Exception {
// 超时定时器触发,无论数量是否达到阈值都触发
if (time == window.getStart() + timeoutMs) {
ctx.getPartitionedState(countDesc).clear();
return TriggerResult.FIRE;
}
return TriggerResult.CONTINUE;
}
@Override
public TriggerResult onProcessingTime(long time, TimeWindow window, TriggerContext ctx) {
return TriggerResult.CONTINUE;
}
@Override
public void clear(TimeWindow window, TriggerContext ctx) throws Exception {
ctx.getPartitionedState(countDesc).clear();
ctx.deleteEventTimeTimer(window.getStart() + timeoutMs);
}
}第 5 章 Evictor:窗口数据的驱逐机制
5.1 Evictor 的作用
Evictor 是可选的窗口组件,允许在窗口函数执行前或后从窗口中删除部分记录。注意:Evictor 与增量聚合不兼容——只要使用了 Evictor,窗口就必须缓存所有原始记录(因为需要在任意时刻删除部分记录),无法做增量聚合。
5.2 内置 Evictor
// CountEvictor:窗口触发前,只保留最新的 N 条记录,删除较早的记录
stream
.keyBy(e -> e.getUserId())
.window(GlobalWindows.create())
.trigger(CountTrigger.of(10)) // 每 10 条触发
.evictor(CountEvictor.of(5)) // 触发前只保留最新 5 条
.process(new MyProcessWindowFunction());
// TimeEvictor:窗口触发前,删除时间戳比最新记录早超过 N 毫秒的记录
.evictor(TimeEvictor.of(Time.seconds(60))) // 只保留最近 60 秒的记录
// DeltaEvictor:基于自定义 Delta 函数删除记录
.evictor(DeltaEvictor.of(threshold, new DeltaFunction<...>() { ... }))生产避坑:Evictor 禁用增量聚合
使用 Evictor 后,Flink 无法使用 ReduceFunction 或 AggregateFunction 做增量聚合,所有原始记录都会被缓存在状态中。对于高吞吐场景,这会导致状态急剧膨胀、Checkpoint 体积很大。
大多数情况下,Evictor 的需求可以通过 ProcessWindowFunction 中自行过滤数据来替代,性能更可控。
第 6 章 窗口的生命周期与状态管理
6.1 窗口状态的创建与清理时机
窗口状态的创建:当一条记录被分配到某个窗口,且该窗口尚不存在时,Flink 创建该窗口的状态。
窗口状态的触发:当 Trigger 返回 FIRE 或 FIRE_AND_PURGE 时,窗口函数被调用执行计算。
窗口状态的清理:以下时机清理窗口状态:
- Trigger 返回
PURGE或FIRE_AND_PURGE - Watermark 超过
windowEnd + allowedLateness,状态被自动清理 - 算子调用了
clear()方法(如自定义 Trigger 中)
内存泄漏风险:如果 Trigger 持续返回 FIRE(而不是 FIRE_AND_PURGE),窗口状态不会被清理,会持续积累。全局窗口(GlobalWindows)配合自定义 Trigger 时尤其需要注意这一点。
6.2 滑动窗口的状态放大问题
以 size=1hour, slide=1min 的滑动窗口为例(并发度=1,每秒 1000 条记录):
每条记录被分配到 60 个窗口(1小时/1分钟 = 60)
同时有 60 个活跃窗口
如果使用 ProcessWindowFunction(缓存原始记录):
每个窗口缓存 1小时 × 1000条/秒 = 3,600,000 条记录
60 个窗口 = 60 × 3,600,000 = 216,000,000 条记录在状态中
假设每条记录 100 字节,总状态 = 21.6 GB!
如果使用 AggregateFunction(增量聚合):
每个窗口只有 1 个累加器(如一个 Double = 8 字节)
60 个窗口 = 60 × 8 = 480 字节
状态几乎可以忽略不计
这个对比说明:对于大窗口的高吞吐场景,选择增量聚合(ReduceFunction/AggregateFunction)vs 全量聚合(ProcessWindowFunction)的差异是几个数量级的内存占用差距。
6.3 窗口对 Checkpoint 的影响
窗口状态是 Flink 状态的一部分,会在 Checkpoint 时被持久化。窗口越大、数据量越大、窗口函数选择不当(ProcessWindowFunction),Checkpoint 体积就越大。
Checkpoint 体积过大的排查思路:
- 查看 Web UI 中每个算子的 State Size Metrics,定位哪个算子的状态最大
- 对于窗口算子,检查是否使用了 ProcessWindowFunction(可改为 Aggregate + ProcessWindow 组合)
- 检查
allowedLateness是否过长(延长了窗口状态保留时间) - 检查是否有未被触发的全局窗口(Global Window + 未正确触发的 Trigger)
小结
Flink 窗口体系的核心知识点总结:
四种窗口类型的选型原则:
- 固定周期统计 → 滚动窗口(内存最省,每条记录只属于一个窗口)
- 高频输出长范围指标 → 滑动窗口(注意内存放大:每条记录属于 size/slide 个窗口)
- 用户行为会话分析 → 会话窗口(动态边界,支持窗口合并)
- 自定义触发逻辑 → 全局窗口 + 自定义 Trigger
三类窗口函数的性能比较:
- ReduceFunction:增量聚合,内存最优,仅支持相同输入输出类型
- AggregateFunction:增量聚合,内存最优,支持不同输入/累加器/输出类型
- ProcessWindowFunction:全量缓存,内存最大,但可访问所有原始记录和窗口元数据
- AggregateFunction + ProcessWindowFunction 组合:最推荐的生产模式,兼顾性能与灵活性
窗口状态管理要点:
- 增量聚合的状态大小 = O(1);全量聚合的状态大小 = O(窗口内记录数)
- 滑动窗口的实际状态 = 单窗口状态 × (size/slide)
- Evictor 与增量聚合不兼容,会强制切换到全量缓存模式
下一篇 06 状态管理与 Checkpoint 实战 将深入 Flink 的状态类型(ValueState、ListState、MapState、AggregatingState)、状态后端选型,以及 Checkpoint 和 Savepoint 的生产配置与使用实践。
思考题
- 会话窗口(Session Window)的核心挑战是”动态合并”——同一 Key 的相邻事件如果间隔小于
sessionGap,就属于同一个 Session。在分布式场景下,属于同一 Key 的事件分布在多个 SubTask 上(通过keyBy()确保同 Key 到同一 SubTask),Session 的合并逻辑是在 Task 级别完成的,不需要跨 SubTask 通信。但如果同一个 Session 的事件因为某种原因(如keyBy前的 Rebalance)出现在不同 SubTask 上,会发生什么?- 滑动窗口中,每条记录属于
windowSize / slideSize个窗口(如 10 分钟窗口每 1 分钟滑动一次,每条记录属于 10 个窗口)。Flink 实现滑动窗口有两种思路:(1)将每条记录复制 N 份分别放入 N 个窗口的 State;(2)只存储一份数据,用区间查询来检索属于某个窗口的数据。Flink 实际采用的是哪种思路?在滑动比很大时,这个实现有什么性能瓶颈?WindowFunction(全窗口函数)需要等窗口内所有数据收集完毕后才计算,而AggregateFunction(增量聚合函数)在每条数据到来时立即更新聚合中间状态。当窗口非常大(如 24 小时窗口)且数据量极大时,WindowFunction的内存压力会非常高。在必须使用复杂窗口逻辑(如 Top-N)又不想在内存中保存所有数据时,如何结合两者的优点?