摘要:

窗口(Window)是流处理中将无界数据切割为有限数据集进行聚合计算的核心机制。Flink 提供了四种窗口类型(滚动、滑动、会话、全局),三类窗口函数(ReduceFunction、AggregateFunction、ProcessWindowFunction),以及可插拔的触发器(Trigger)和驱逐器(Evictor)机制。本文不只列举 API,而是深入每种窗口的设计语义:滑动窗口为什么会导致数据被”重复计算”?会话窗口如何在没有固定边界的情况下动态划分窗口?AggregateFunctionProcessWindowFunction 各自适合什么场景,如何组合使用以兼顾性能与灵活性?读完本文,你将能够根据具体的业务需求选择正确的窗口类型和函数,并理解窗口状态管理对内存的影响。


第 1 章 窗口的本质:为无界流划定计算边界

1.1 为什么流处理需要窗口

流数据是无终点的——Kafka 的订单 Topic 会持续不断地收到新消息,用户行为日志流永远不会结束。但业务上的计算往往需要一个”时间范围”:过去 1 分钟的订单总金额、过去 1 小时的活跃用户数、某次用户会话期间的总点击次数。

如果没有窗口机制,唯一能做的是”全局聚合”——对从作业启动到当前时刻的所有数据做累加。这有两个问题:

  1. 状态无限增长:累加值需要存储所有历史数据,内存最终会耗尽
  2. 业务无意义:业务关心的是”最近 N 分钟”而不是”历史所有”

窗口将无界流切割为有限的时间(或数量)片段,让聚合计算变得有意义且内存可控。

1.2 窗口的四个核心属性

理解任何一种窗口,都需要回答四个问题:

  1. 窗口如何划分(Window Assigner):一条记录属于哪些窗口?一条记录可以属于多个窗口(滑动窗口)还是只属于一个(滚动窗口)?
  2. 窗口何时触发计算(Window Trigger):什么条件下触发窗口函数执行?默认是 Watermark 超过窗口结束时间
  3. 窗口如何计算(Window Function):对窗口内的所有数据执行什么操作?
  4. 窗口数据何时清理(Evictor + allowedLateness):窗口状态何时从内存中清除?

第 2 章 四种窗口类型详解

2.1 滚动窗口(Tumbling Window):互不重叠的时间分片

是什么:滚动窗口将时间轴划分为一系列大小固定、互不重叠的窗口。每条记录恰好属于一个窗口。

时间轴:  0    10   20   30   40   50 ...(秒)
窗口:   [0,10) [10,20) [20,30) [30,40) [40,50) ...
记录分配:t=5 → [0,10);t=15 → [10,20);t=25 → [20,30)

代码示例(事件时间,每 5 分钟一个窗口)

stream
    .keyBy(order -> order.getUserId())
    .window(TumblingEventTimeWindows.of(Time.minutes(5)))
    .sum("amount");
 
// 处理时间版本
stream
    .keyBy(order -> order.getUserId())
    .window(TumblingProcessingTimeWindows.of(Time.minutes(5)))
    .sum("amount");

窗口偏移(Window Offset):默认情况下,窗口以 Unix 时间戳的整数倍为边界(如 5 分钟窗口的边界是 00:00、00:05、00:10…)。如果需要对齐到特定时区(如东八区的整点),需要配置 offset:

// 东八区(UTC+8):偏移 -8 小时,使窗口边界对齐到北京时间整点
.window(TumblingEventTimeWindows.of(Time.hours(1), Time.hours(-8)))
// 这样窗口边界是 UTC 的 16:00、17:00(= 北京时间 00:00、01:00)

适用场景:所有需要”每 N 分钟/小时统计一次”的场景——每分钟的 PV/UV、每小时的销售额、每天的新增用户数。

内存特性:每个 Key 在任意时刻最多只有 1 个活跃窗口(不重叠),内存占用最小。

2.2 滑动窗口(Sliding Window):重叠的时间窗口

是什么:滑动窗口有两个参数:窗口大小(size)滑动步长(slide)。窗口每隔 slide 时间滑动一次,相邻窗口有 size - slide 的重叠区间。

size=10s, slide=5s:
时间轴:  0    5    10   15   20   25 ...(秒)
窗口1:  [0,10)
窗口2:       [5,15)
窗口3:             [10,20)
窗口4:                   [15,25)

记录 t=7 属于:[0,10) 和 [5,15) 两个窗口(被计算了两次!)
记录 t=12 属于:[5,15) 和 [10,20) 两个窗口

每条记录属于 ceil(size/slide) 个窗口。例如 size=1hour、slide=5min,每条记录被分配到 12 个窗口。

stream
    .keyBy(order -> order.getUserId())
    // 窗口大小 10 分钟,每 5 分钟滑动一次
    .window(SlidingEventTimeWindows.of(Time.minutes(10), Time.minutes(5)))
    .aggregate(new AverageAmountAggregator());

为什么要有滑动窗口(不这样会怎样)

考虑”过去 10 分钟的平均响应时间”这个监控指标。如果用滚动窗口(10 分钟),指标每 10 分钟才更新一次,实时性差。如果用滑动窗口(size=10min, slide=1min),指标每 1 分钟更新一次,但每次都反映过去完整的 10 分钟数据——这就是滑动窗口的价值:高频输出、长范围覆盖

内存开销警告:每条记录被多个窗口持有,在窗口状态未触发时,同一条数据被存储了 ceil(size/slide) 份(或者说,有 ceil(size/slide) 个窗口同时持有对这条数据的引用)。slide 越小,内存开销越大。

生产避坑:滑动窗口的状态膨胀

size=1hour, slide=1min 意味着每条数据被分配到 60 个窗口,同时有 60 个活跃窗口在维护状态。如果每个窗口的数据量大,这 60 份状态会消耗大量内存,容易触发 OOM 或导致 Checkpoint 体积过大。

替代方案:使用滚动窗口(1 分钟)+ 多窗口结果流 Join,或者用 ProcessFunction + 环形 Buffer 自定义实现,按需滑动计算。

2.3 会话窗口(Session Window):基于活跃度的动态窗口

是什么:会话窗口不基于固定时间,而是根据数据活跃程度动态划分窗口。如果两条记录之间的时间间隔超过 sessionGap(会话超时时间),它们就被分配到两个不同的窗口。

sessionGap = 5s:
事件流:  t=1  t=3  t=4  t=10  t=11  t=20  t=22  t=23
间隔:    2s   1s   6s!  1s    9s!   2s    1s

窗口划分:
  [t=1, t=4+5) = [1, 9)      ← t=10 与 t=4 间隔 6s > 5s,新窗口
  [t=10, t=11+5) = [10, 16)  ← t=20 与 t=11 间隔 9s > 5s,新窗口
  [t=20, t=23+5) = [20, 28)
stream
    .keyBy(event -> event.getUserId())
    // 静态会话间隔
    .window(EventTimeSessionWindows.withGap(Time.minutes(30)))
    .aggregate(new SessionSummaryAggregator());
 
// 动态会话间隔(每个 Key 可以有不同的超时时间)
.window(EventTimeSessionWindows.withDynamicGap(
    (element) -> element.getSessionTimeout()  // 从记录中提取超时时间
))

会话窗口的实现难题:会话窗口的边界不是预先知道的(不像滚动窗口知道每个窗口的 [start, end)),而是随着新数据到来动态延伸。这给实现带来了挑战:

当收到新记录 t=23 时,它可能被分配到已有的窗口(如果前一个记录是 t=20,间隔 3s < 5s),也可能开启新窗口(如果前一个记录是 t=12,间隔 11s > 5s)。更复杂的是:如果有两个相邻的窗口 [1, 6)[5, 10),它们应该被合并[1, 10)

Flink 通过 MergingWindowAssigner 机制处理窗口合并:每次新记录到来时检查是否需要与已有窗口合并,触发 mergeWindows() 方法。窗口状态也需要随之合并。

适用场景:用户行为分析(一次会话内的所有点击/浏览)、网络安全(同一次攻击的所有请求)、客服系统(一次对话的所有消息)。

内存特性:每个活跃用户有一个(或多个尚未合并的)活跃窗口,窗口大小不固定,取决于用户的活跃时间。长时间活跃的用户窗口可能积累大量数据。

2.4 全局窗口(Global Window):手动控制的无限窗口

是什么:全局窗口将所有数据分配到一个不会自动触发的窗口,需要配合**自定义 Trigger(触发器)**使用。

stream
    .keyBy(event -> event.getUserId())
    .window(GlobalWindows.create())
    .trigger(new MyCustomTrigger())  // 必须配合自定义触发器,否则窗口永远不触发
    .aggregate(new MyAggregator());

适用场景:当四种标准窗口都不满足需求时(如”每收到 100 条记录触发一次计算”、“当某个特定事件到来时触发窗口”),使用全局窗口 + 自定义 Trigger 实现灵活的触发逻辑。


第 3 章 窗口函数:对窗口数据执行什么操作

窗口函数(Window Function)决定了对窗口内的数据执行什么计算。Flink 提供三类函数,性能和灵活性各有侧重。

3.1 ReduceFunction:增量聚合,内存最优

是什么ReduceFunction增量聚合函数——每来一条新记录,立刻与当前的聚合结果做一次 reduce 操作,只保留一个聚合值,不缓存原始记录。

内存模型:窗口状态中只存储一个”中间聚合值”,与窗口内的数据量无关。无论窗口内有 1 条还是 100 万条记录,状态大小始终是一个固定大小的对象。

// 需求:每分钟统计每个用户的最大订单金额
stream
    .keyBy(order -> order.getUserId())
    .window(TumblingEventTimeWindows.of(Time.minutes(1)))
    .reduce(
        (order1, order2) -> order1.getAmount() > order2.getAmount() ? order1 : order2
        // ReduceFunction:输入输出类型必须相同
    );

约束:输入类型和输出类型必须相同(ReduceFunction<T> 的输入和输出都是 T)。如果需要不同的输入/输出类型(如输入 OrderEvent,输出 OrderSummary),需要用 AggregateFunction

3.2 AggregateFunction:增量聚合,支持类型转换

是什么AggregateFunction<IN, ACC, OUT> 是增量聚合函数的增强版,支持输入类型(IN)、累加器类型(ACC)和输出类型(OUT)三者不同。

三个方法

  • createAccumulator():创建初始累加器
  • add(IN value, ACC accumulator):将新记录合并到累加器
  • getResult(ACC accumulator):从累加器生成最终输出
// 需求:计算每 1 分钟每个用户的平均订单金额
// 输入:OrderEvent;中间状态:(totalAmount: Double, count: Long);输出:Double
 
public class AverageAmountAggregator 
        implements AggregateFunction<OrderEvent, Tuple2<Double, Long>, Double> {
 
    @Override
    public Tuple2<Double, Long> createAccumulator() {
        return Tuple2.of(0.0, 0L);  // 初始:金额=0,数量=0
    }
 
    @Override
    public Tuple2<Double, Long> add(OrderEvent order, Tuple2<Double, Long> acc) {
        return Tuple2.of(acc.f0 + order.getAmount(), acc.f1 + 1);
    }
 
    @Override
    public Double getResult(Tuple2<Double, Long> acc) {
        return acc.f1 == 0 ? 0.0 : acc.f0 / acc.f1;  // 平均值
    }
 
    @Override
    public Tuple2<Double, Long> merge(Tuple2<Double, Long> a, Tuple2<Double, Long> b) {
        // 会话窗口合并时调用(非会话窗口可以留空实现)
        return Tuple2.of(a.f0 + b.f0, a.f1 + b.f1);
    }
}
 
// 使用
stream
    .keyBy(order -> order.getUserId())
    .window(TumblingEventTimeWindows.of(Time.minutes(1)))
    .aggregate(new AverageAmountAggregator());

内存模型:与 ReduceFunction 相同,窗口状态中只存储一个 ACC 类型的累加器对象,内存占用极低。

AggregateFunction vs ReduceFunction

特性ReduceFunctionAggregateFunction
输入/输出类型必须相同(T → T可以不同(IN → ACC → OUT
灵活性较低较高
实现复杂度低(只需一个方法)稍高(三个方法)
性能相同(都是增量聚合)相同
适用场景简单聚合(求和、取最大)复杂聚合(平均值、百分位数)

3.3 ProcessWindowFunction:全量计算,最灵活

是什么ProcessWindowFunction<IN, OUT, KEY, W> 是全量窗口函数——窗口触发时,所有窗口内的原始记录都会被缓存并一次性传递给函数,函数可以遍历所有记录,访问窗口元数据(WindowContext)。

// 需求:统计每分钟每个用户的 Top-3 订单,并在输出中附带窗口时间范围
stream
    .keyBy(order -> order.getUserId())
    .window(TumblingEventTimeWindows.of(Time.minutes(1)))
    .process(new ProcessWindowFunction<OrderEvent, TopOrderResult, String, TimeWindow>() {
 
        @Override
        public void process(
                String userId,
                Context context,            // 窗口上下文:可访问窗口时间、状态等
                Iterable<OrderEvent> orders, // 窗口内所有原始记录
                Collector<TopOrderResult> out) {
 
            // 获取窗口时间范围
            TimeWindow window = context.window();
            long windowStart = window.getStart();
            long windowEnd = window.getEnd();
 
            // 遍历所有记录,找 Top-3
            List<OrderEvent> sortedOrders = StreamSupport
                .stream(orders.spliterator(), false)
                .sorted(Comparator.comparingDouble(OrderEvent::getAmount).reversed())
                .limit(3)
                .collect(Collectors.toList());
 
            out.collect(new TopOrderResult(userId, windowStart, windowEnd, sortedOrders));
        }
    });

内存代价(重要)ProcessWindowFunction 需要将窗口内的所有原始记录缓存在状态中,直到窗口触发。如果窗口大小为 1 小时,1 小时内的所有数据都需要在内存/状态后端中暂存。对于高吞吐场景,这可能消耗大量内存。

何时必须用 ProcessWindowFunction

  • 需要访问窗口的时间信息(window.getStart()window.getEnd()
  • 需要遍历窗口内所有原始记录(如排序、Top-K)
  • 需要访问窗口级别的状态(context.windowState())或全局状态(context.globalState()
  • 需要向侧输出流写数据(context.output()

3.4 AggregateFunction + ProcessWindowFunction 组合:兼顾性能与灵活性

这是生产中最推荐的模式:用 AggregateFunction 做增量聚合(减少内存占用),用 ProcessWindowFunction 在窗口触发时补充窗口元数据(如时间范围)。

// 需求:统计每分钟每个用户的订单总金额,输出中需要包含窗口时间范围
stream
    .keyBy(order -> order.getUserId())
    .window(TumblingEventTimeWindows.of(Time.minutes(1)))
    .aggregate(
        // 第一个参数:增量聚合(只存储一个 Double,不缓存原始记录)
        new AggregateFunction<OrderEvent, Double, Double>() {
            public Double createAccumulator() { return 0.0; }
            public Double add(OrderEvent o, Double acc) { return acc + o.getAmount(); }
            public Double getResult(Double acc) { return acc; }
            public Double merge(Double a, Double b) { return a + b; }
        },
        // 第二个参数:ProcessWindowFunction,接收聚合结果(仅 1 个 Double)+ 窗口元数据
        new ProcessWindowFunction<Double, OrderWindowSummary, String, TimeWindow>() {
            @Override
            public void process(String userId, Context ctx,
                                Iterable<Double> aggregatedAmounts,
                                Collector<OrderWindowSummary> out) {
                // aggregatedAmounts 只有一个元素(AggregateFunction 的结果)
                double totalAmount = aggregatedAmounts.iterator().next();
                TimeWindow w = ctx.window();
                out.collect(new OrderWindowSummary(userId, w.getStart(), w.getEnd(), totalAmount));
            }
        }
    );

这种组合的优势

  • 状态中只存储 AggregateFunction 的累加器(1 个 Double),不缓存原始记录
  • 窗口触发时,ProcessWindowFunction 接收到的 Iterable 中只有 1 个元素(聚合结果),迭代开销极低
  • 既能享受增量聚合的内存优势,又能在最终输出时访问窗口元数据

第 4 章 Trigger:自定义窗口触发逻辑

4.1 默认 Trigger 的行为

每种窗口类型都有对应的默认 Trigger:

  • 事件时间窗口(EventTimeWindows):EventTimeTrigger,当 Watermark 超过窗口结束时间时触发
  • 处理时间窗口(ProcessingTimeWindows):ProcessingTimeTrigger,当系统时钟超过窗口结束时间时触发
  • 会话窗口:EventTimeTriggerProcessingTimeTrigger(视时间语义)

4.2 Trigger 的四个方法

public abstract class Trigger<T, W extends Window> {
 
    // 每来一条记录调用(决定是否立即触发)
    public abstract TriggerResult onElement(T element, long timestamp, W window, TriggerContext ctx);
 
    // 事件时间定时器触发时调用
    public abstract TriggerResult onEventTime(long time, W window, TriggerContext ctx);
 
    // 处理时间定时器触发时调用
    public abstract TriggerResult onProcessingTime(long time, W window, TriggerContext ctx);
 
    // 窗口被清理时调用(用于清理 Trigger 自身的状态)
    public abstract void clear(W window, TriggerContext ctx);
}

TriggerResult 有四种返回值:

  • CONTINUE:不触发,继续等待
  • FIRE:触发窗口计算,但不清除窗口状态(窗口可以再次触发)
  • PURGE:不触发,但清除窗口状态
  • FIRE_AND_PURGE:触发后清除窗口状态(默认行为)

4.3 自定义 Trigger 示例:计数+时间双重触发

// 需求:每收到 100 条记录触发一次,或者 1 分钟没有达到 100 条也触发一次
public class CountOrTimeTrigger extends Trigger<OrderEvent, TimeWindow> {
 
    private final long maxCount;
    private final long timeoutMs;
 
    // 用于记录每个窗口的当前计数
    private final ReducingStateDescriptor<Long> countDesc =
        new ReducingStateDescriptor<>("count", Long::sum, Long.class);
 
    @Override
    public TriggerResult onElement(OrderEvent element, long timestamp,
                                   TimeWindow window, TriggerContext ctx) throws Exception {
        // 注册超时定时器(第一次进入窗口时)
        ctx.registerEventTimeTimer(window.getStart() + timeoutMs);
 
        // 更新计数
        ReducingState<Long> count = ctx.getPartitionedState(countDesc);
        count.add(1L);
 
        // 计数达到阈值,立即触发
        if (count.get() >= maxCount) {
            count.clear();
            return TriggerResult.FIRE;
        }
        return TriggerResult.CONTINUE;
    }
 
    @Override
    public TriggerResult onEventTime(long time, TimeWindow window, TriggerContext ctx)
            throws Exception {
        // 超时定时器触发,无论数量是否达到阈值都触发
        if (time == window.getStart() + timeoutMs) {
            ctx.getPartitionedState(countDesc).clear();
            return TriggerResult.FIRE;
        }
        return TriggerResult.CONTINUE;
    }
 
    @Override
    public TriggerResult onProcessingTime(long time, TimeWindow window, TriggerContext ctx) {
        return TriggerResult.CONTINUE;
    }
 
    @Override
    public void clear(TimeWindow window, TriggerContext ctx) throws Exception {
        ctx.getPartitionedState(countDesc).clear();
        ctx.deleteEventTimeTimer(window.getStart() + timeoutMs);
    }
}

第 5 章 Evictor:窗口数据的驱逐机制

5.1 Evictor 的作用

Evictor 是可选的窗口组件,允许在窗口函数执行从窗口中删除部分记录。注意:Evictor 与增量聚合不兼容——只要使用了 Evictor,窗口就必须缓存所有原始记录(因为需要在任意时刻删除部分记录),无法做增量聚合。

5.2 内置 Evictor

// CountEvictor:窗口触发前,只保留最新的 N 条记录,删除较早的记录
stream
    .keyBy(e -> e.getUserId())
    .window(GlobalWindows.create())
    .trigger(CountTrigger.of(10))        // 每 10 条触发
    .evictor(CountEvictor.of(5))         // 触发前只保留最新 5 条
    .process(new MyProcessWindowFunction());
 
// TimeEvictor:窗口触发前,删除时间戳比最新记录早超过 N 毫秒的记录
.evictor(TimeEvictor.of(Time.seconds(60)))  // 只保留最近 60 秒的记录
 
// DeltaEvictor:基于自定义 Delta 函数删除记录
.evictor(DeltaEvictor.of(threshold, new DeltaFunction<...>() { ... }))

生产避坑:Evictor 禁用增量聚合

使用 Evictor 后,Flink 无法使用 ReduceFunction 或 AggregateFunction 做增量聚合,所有原始记录都会被缓存在状态中。对于高吞吐场景,这会导致状态急剧膨胀、Checkpoint 体积很大。

大多数情况下,Evictor 的需求可以通过 ProcessWindowFunction 中自行过滤数据来替代,性能更可控。


第 6 章 窗口的生命周期与状态管理

6.1 窗口状态的创建与清理时机

窗口状态的创建:当一条记录被分配到某个窗口,且该窗口尚不存在时,Flink 创建该窗口的状态。

窗口状态的触发:当 Trigger 返回 FIREFIRE_AND_PURGE 时,窗口函数被调用执行计算。

窗口状态的清理:以下时机清理窗口状态:

  1. Trigger 返回 PURGEFIRE_AND_PURGE
  2. Watermark 超过 windowEnd + allowedLateness,状态被自动清理
  3. 算子调用了 clear() 方法(如自定义 Trigger 中)

内存泄漏风险:如果 Trigger 持续返回 FIRE(而不是 FIRE_AND_PURGE),窗口状态不会被清理,会持续积累。全局窗口(GlobalWindows)配合自定义 Trigger 时尤其需要注意这一点。

6.2 滑动窗口的状态放大问题

size=1hour, slide=1min 的滑动窗口为例(并发度=1,每秒 1000 条记录):

每条记录被分配到 60 个窗口(1小时/1分钟 = 60)
同时有 60 个活跃窗口

如果使用 ProcessWindowFunction(缓存原始记录):
  每个窗口缓存 1小时 × 1000条/秒 = 3,600,000 条记录
  60 个窗口 = 60 × 3,600,000 = 216,000,000 条记录在状态中
  假设每条记录 100 字节,总状态 = 21.6 GB!

如果使用 AggregateFunction(增量聚合):
  每个窗口只有 1 个累加器(如一个 Double = 8 字节)
  60 个窗口 = 60 × 8 = 480 字节
  状态几乎可以忽略不计

这个对比说明:对于大窗口的高吞吐场景,选择增量聚合(ReduceFunction/AggregateFunction)vs 全量聚合(ProcessWindowFunction)的差异是几个数量级的内存占用差距

6.3 窗口对 Checkpoint 的影响

窗口状态是 Flink 状态的一部分,会在 Checkpoint 时被持久化。窗口越大、数据量越大、窗口函数选择不当(ProcessWindowFunction),Checkpoint 体积就越大。

Checkpoint 体积过大的排查思路

  1. 查看 Web UI 中每个算子的 State Size Metrics,定位哪个算子的状态最大
  2. 对于窗口算子,检查是否使用了 ProcessWindowFunction(可改为 Aggregate + ProcessWindow 组合)
  3. 检查 allowedLateness 是否过长(延长了窗口状态保留时间)
  4. 检查是否有未被触发的全局窗口(Global Window + 未正确触发的 Trigger)

小结

Flink 窗口体系的核心知识点总结:

四种窗口类型的选型原则

  • 固定周期统计 → 滚动窗口(内存最省,每条记录只属于一个窗口)
  • 高频输出长范围指标 → 滑动窗口(注意内存放大:每条记录属于 size/slide 个窗口)
  • 用户行为会话分析 → 会话窗口(动态边界,支持窗口合并)
  • 自定义触发逻辑 → 全局窗口 + 自定义 Trigger

三类窗口函数的性能比较

  • ReduceFunction:增量聚合,内存最优,仅支持相同输入输出类型
  • AggregateFunction:增量聚合,内存最优,支持不同输入/累加器/输出类型
  • ProcessWindowFunction:全量缓存,内存最大,但可访问所有原始记录和窗口元数据
  • AggregateFunction + ProcessWindowFunction 组合:最推荐的生产模式,兼顾性能与灵活性

窗口状态管理要点

  • 增量聚合的状态大小 = O(1);全量聚合的状态大小 = O(窗口内记录数)
  • 滑动窗口的实际状态 = 单窗口状态 × (size/slide)
  • Evictor 与增量聚合不兼容,会强制切换到全量缓存模式

下一篇 06 状态管理与 Checkpoint 实战 将深入 Flink 的状态类型(ValueState、ListState、MapState、AggregatingState)、状态后端选型,以及 Checkpoint 和 Savepoint 的生产配置与使用实践。

思考题

  1. 会话窗口(Session Window)的核心挑战是”动态合并”——同一 Key 的相邻事件如果间隔小于 sessionGap,就属于同一个 Session。在分布式场景下,属于同一 Key 的事件分布在多个 SubTask 上(通过 keyBy() 确保同 Key 到同一 SubTask),Session 的合并逻辑是在 Task 级别完成的,不需要跨 SubTask 通信。但如果同一个 Session 的事件因为某种原因(如 keyBy 前的 Rebalance)出现在不同 SubTask 上,会发生什么?
  2. 滑动窗口中,每条记录属于 windowSize / slideSize 个窗口(如 10 分钟窗口每 1 分钟滑动一次,每条记录属于 10 个窗口)。Flink 实现滑动窗口有两种思路:(1)将每条记录复制 N 份分别放入 N 个窗口的 State;(2)只存储一份数据,用区间查询来检索属于某个窗口的数据。Flink 实际采用的是哪种思路?在滑动比很大时,这个实现有什么性能瓶颈?
  3. WindowFunction(全窗口函数)需要等窗口内所有数据收集完毕后才计算,而 AggregateFunction(增量聚合函数)在每条数据到来时立即更新聚合中间状态。当窗口非常大(如 24 小时窗口)且数据量极大时,WindowFunction 的内存压力会非常高。在必须使用复杂窗口逻辑(如 Top-N)又不想在内存中保存所有数据时,如何结合两者的优点?