摘要:

Table API 和 Flink SQL 是构建在 DataStream API 之上的高阶声明式编程接口,让开发者能用 SQL 的方式表达复杂的流处理逻辑,极大降低了开发门槛。但”用 SQL 写流处理”并不是把传统数据库 SQL 搬过来就行——流处理的时间语义、乱序处理、持续更新等特性带来了一系列 SQL 扩展:时间属性列、窗口 TVF(表值函数)、流式 Join 的语义差异、Changelog 流与动态表的对应关系。本文从 Table API 的环境初始化出发,系统讲解 DDL 建表(Kafka Source/Sink)、常用 DML(SELECT、窗口聚合、JOIN)、DataStream 与 Table 的互转,以及生产中最常见的”SQL 作业结果不符合预期”问题的根因分析。


第 1 章 Table API 的统一编程模型

前面几篇文章介绍的 DataStream API 功能强大但相对底层:要实现一个”每 5 分钟统计每个品类的 GMV,按 GMV 倒序输出 Top 10”,需要手写 keyBy + window + AggregateFunction + ProcessWindowFunction 等大量样板代码。更麻烦的是,如果查询逻辑发生变化(如改为每 10 分钟、或增加过滤条件),需要修改多处代码,且不熟悉 Flink 的同学(如数据分析师)无法直接参与开发。

Flink SQL 解决了这个问题——同样的需求,用一条 SQL 表达:

SELECT
    category,
    TUMBLE_START(rowtime, INTERVAL '5' MINUTE) AS window_start,
    SUM(amount) AS gmv,
    ROW_NUMBER() OVER (PARTITION BY TUMBLE_START(rowtime, INTERVAL '5' MINUTE)
                       ORDER BY SUM(amount) DESC) AS rn
FROM orders
GROUP BY category, TUMBLE(rowtime, INTERVAL '5' MINUTE)
HAVING rn <= 10

SQL 的声明式特性让你只需描述”要什么”,不需要描述”怎么做”。更重要的是,Flink SQL 与批处理 SQL 使用相同的语法——同一个 SQL 可以在流处理和批处理模式下执行(一致的 API,不同的执行引擎)。

1.2 动态表(Dynamic Table):流与表的统一

Table API 引入了**动态表(Dynamic Table)**这个核心概念,用来统一”流”和”表”的语义:

  • 静态表(批处理):数据是完整的、固定的集合。SELECT * FROM users WHERE age > 18 在某个时刻执行,返回一个完整的结果集。
  • 动态表(流处理):数据持续到来,表的内容随时间不断变化。对动态表执行 SELECT * FROM orders WHERE amount > 100 得到的不是一个固定结果集,而是一个持续更新的结果流。

动态表与 Changelog

动态表的变化可以用 Changelog(变更日志)来表示,Changelog 有三种操作类型:

  • INSERT(+I):新增一行
  • UPDATE_BEFORE(-U):更新前的旧值(配合 UPDATE_AFTER 使用)
  • UPDATE_AFTER(+U):更新后的新值
  • DELETE(-D):删除一行

例如,对一个动态表执行 COUNT(*) GROUP BY category,随着数据流入:

输入流:
  +I: {category="手机", amount=1000}   → 输出 Changelog: +I {category="手机", cnt=1}
  +I: {category="手机", amount=500}    → 输出 Changelog: -U {category="手机", cnt=1}, +U {category="手机", cnt=2}
  +I: {category="电脑", amount=2000}   → 输出 Changelog: +I {category="电脑", cnt=1}

这种 Changelog 流就是 Flink SQL 的输出格式,下游 Sink 需要能处理这种”更新”语义(如 Upsert Kafka、MySQL 主键更新)。

设计哲学:流表二象性

“流”和”表”是同一事物的两面:表是流在某个时刻的快照,流是表随时间变化的序列。Flink 的动态表理论统一了这两种视角,让流处理 SQL 和批处理 SQL 在语法层面完全一致,区别只在执行引擎。这种统一性是 Flink 能够成为”批流一体”平台的理论基础。


第 2 章 Table 环境初始化

2.1 TableEnvironment 的创建

// 方式一:纯 Table API / SQL 模式(推荐,当不需要混用 DataStream API 时)
TableEnvironment tableEnv = TableEnvironment.create(EnvironmentSettings.newInstance()
    .inStreamingMode()   // 流处理模式(默认)
    // .inBatchMode()    // 批处理模式
    .build()
);
 
// 方式二:StreamTableEnvironment(需要与 DataStream API 混用时)
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
env.setParallelism(4);
env.enableCheckpointing(60_000);
 
StreamTableEnvironment tableEnv = StreamTableEnvironment.create(env);
// StreamTableEnvironment 支持 DataStream ↔ Table 互转,适合混合编程

2.2 全局配置

// 时区配置(影响时间函数、窗口对齐)
tableEnv.getConfig().setLocalTimeZone(ZoneId.of("Asia/Shanghai"));
 
// SQL Hint 配置(影响优化器行为)
tableEnv.getConfig().set("table.exec.mini-batch.enabled", "true");
tableEnv.getConfig().set("table.exec.mini-batch.allow-latency", "5 s");
tableEnv.getConfig().set("table.exec.mini-batch.size", "5000");
// mini-batch 优化:将微批次数据一起处理,减少状态访问次数(对聚合类作业效果显著)

第 3 章 DDL:定义 Source 与 Sink 表

3.1 Kafka Source 表

-- 通过 SQL DDL 定义 Kafka Source 表
CREATE TABLE orders (
    order_id      BIGINT,
    user_id       STRING,
    product_id    STRING,
    category      STRING,
    amount        DOUBLE,
    order_time    TIMESTAMP(3),          -- 事件时间字段(精度 3 = 毫秒)
    WATERMARK FOR order_time AS order_time - INTERVAL '5' SECOND  -- Watermark 策略
) WITH (
    'connector' = 'kafka',
    'topic' = 'orders',
    'properties.bootstrap.servers' = 'kafka:9092',
    'properties.group.id' = 'flink-sql-consumer',
    'scan.startup.mode' = 'earliest-offset',  -- 从最早 offset 消费
    'format' = 'json',                         -- 数据格式
    'json.fail-on-missing-field' = 'false',    -- 字段缺失时不报错(容错)
    'json.ignore-parse-errors' = 'true'        -- 解析错误时跳过该记录
);

关键字段解析

  • TIMESTAMP(3):带精度的时间戳类型,括号内的数字表示秒以下的精度(3 = 毫秒)
  • WATERMARK FOR order_time AS order_time - INTERVAL '5' SECOND:声明 order_time 为事件时间属性列,Watermark = 事件时间 - 5 秒(对应 DataStream API 中的 forBoundedOutOfOrderness(Duration.ofSeconds(5))
  • 'scan.startup.mode' = 'earliest-offset':等同于 DataStream API 中的 OffsetsInitializer.earliest()

数据格式(format)支持

Format说明典型用途
jsonJSON 格式,每条消息是一个 JSON 对象最常用
avroApache Avro 二进制格式Schema Registry 场景
csv逗号分隔文本日志类数据
debezium-jsonDebezium CDC 格式MySQL Binlog 入 Flink
canal-jsonCanal CDC 格式MySQL Binlog 入 Flink(阿里生态)
raw原始字节/字符串自定义解析场景

3.2 Kafka Sink 表(Upsert 模式)

-- Upsert Kafka Sink:支持 UPDATE 语义(通过 Kafka 的 compacted topic 实现)
-- 必须指定 PRIMARY KEY,作为 Kafka 消息的 Key(相同 Key 的消息会被 compaction)
CREATE TABLE order_stats (
    category     STRING,
    window_start TIMESTAMP(3),
    window_end   TIMESTAMP(3),
    total_gmv    DOUBLE,
    order_count  BIGINT,
    PRIMARY KEY (category, window_start) NOT ENFORCED  -- 联合主键
) WITH (
    'connector' = 'upsert-kafka',            -- upsert-kafka 而不是 kafka
    'topic' = 'order-stats',
    'properties.bootstrap.servers' = 'kafka:9092',
    'key.format' = 'json',                   -- Key 的序列化格式
    'value.format' = 'json'                  -- Value 的序列化格式
);
 
-- 普通 Kafka Sink(只支持 INSERT,不支持 UPDATE)
CREATE TABLE order_events_sink (
    order_id   BIGINT,
    event_type STRING,
    ts         TIMESTAMP(3)
) WITH (
    'connector' = 'kafka',
    'topic' = 'order-events-processed',
    'properties.bootstrap.servers' = 'kafka:9092',
    'format' = 'json'
);

Upsert Kafka vs 普通 Kafka Sink

普通 Kafka Sink 只支持追加写入(INSERT),如果上游查询产生了 UPDATE 或 DELETE 消息(如 COUNT GROUP BY 的结果会不断更新),普通 Kafka Sink 无法处理,会报错。

Upsert Kafka Sink 将 UPDATE 操作转为 Kafka 消息写入:DELETE 对应 null Value 的消息,INSERT/UPDATE 对应有 Value 的消息。配合 Kafka topic 的 Log Compaction,消费者能看到每个 Key 的最新值(实现 UPSERT 语义)。

3.3 Print 表与 BlackHole 表(调试用)

-- Print 表:将结果打印到控制台(调试时替代生产 Sink)
CREATE TABLE print_sink (
    category STRING,
    gmv      DOUBLE
) WITH (
    'connector' = 'print'
);
 
-- BlackHole 表:丢弃所有数据(性能测试时用,不关心输出)
CREATE TABLE blackhole_sink (
    id BIGINT
) WITH (
    'connector' = 'blackhole'
);

第 4 章 常用查询:SELECT、过滤、聚合

4.1 基础 SELECT 与过滤

-- 基础查询:过滤高金额订单
SELECT
    order_id,
    user_id,
    amount,
    order_time
FROM orders
WHERE amount > 1000
  AND category IN ('手机', '电脑', '平板');
 
-- 计算列:增加派生字段
SELECT
    order_id,
    user_id,
    amount,
    amount * 0.1 AS tax,                                  -- 计算税额
    CASE WHEN amount > 1000 THEN '高价值' ELSE '普通' END AS tier,
    DATE_FORMAT(order_time, 'yyyy-MM-dd HH:mm') AS time_str
FROM orders;

4.2 窗口聚合:TVF(表值函数)语法

Flink 1.13 引入了基于**表值函数(Table-Valued Function,TVF)**的窗口语法,比旧的 GROUP BY TUMBLE/HOP/SESSION 语法更强大(支持窗口 Top-N、窗口 Join 等)。

滚动窗口(TUMBLE)

-- 每 5 分钟统计每个品类的 GMV
SELECT
    category,
    window_start,
    window_end,
    SUM(amount)   AS total_gmv,
    COUNT(*)      AS order_count,
    AVG(amount)   AS avg_amount
FROM TABLE(
    TUMBLE(TABLE orders, DESCRIPTOR(order_time), INTERVAL '5' MINUTE)
)
GROUP BY category, window_start, window_end;

滑动窗口(HOP)

-- 每 1 分钟输出一次过去 10 分钟的统计(size=10min, slide=1min)
SELECT
    category,
    window_start,
    window_end,
    SUM(amount) AS total_gmv
FROM TABLE(
    HOP(TABLE orders, DESCRIPTOR(order_time), INTERVAL '1' MINUTE, INTERVAL '10' MINUTE)
    -- HOP(table, timecol, slide, size)
)
GROUP BY category, window_start, window_end;

会话窗口(SESSION)

-- 按用户会话统计(30分钟无活动则会话结束)
SELECT
    user_id,
    window_start,
    window_end,
    COUNT(*) AS click_count,
    SUM(amount) AS session_gmv
FROM TABLE(
    SESSION(TABLE orders, DESCRIPTOR(order_time), INTERVAL '30' MINUTE)
)
GROUP BY user_id, window_start, window_end;

4.3 Over 聚合(滑动聚合)

Over 聚合在每条记录上计算一个基于历史数据的聚合值(类似数据库的窗口函数),不等到窗口关闭就实时输出:

-- 每来一条订单,计算该用户过去 1 小时的累计消费金额
SELECT
    user_id,
    order_id,
    amount,
    order_time,
    SUM(amount) OVER (
        PARTITION BY user_id
        ORDER BY order_time
        RANGE BETWEEN INTERVAL '1' HOUR PRECEDING AND CURRENT ROW
    ) AS cumulative_1h
FROM orders;

ROWS BETWEEN vs RANGE BETWEEN

  • ROWS BETWEEN 99 PRECEDING AND CURRENT ROW:基于行数的滑动窗口,包含当前行及前 99 行(共 100 行)
  • RANGE BETWEEN INTERVAL '1' HOUR PRECEDING AND CURRENT ROW:基于时间的滑动窗口,包含过去 1 小时内的所有行(行数不固定)

第 5 章 流式 JOIN:最复杂但最重要的查询

5.1 流与静态表的 Join(Lookup Join)

场景:订单流与用户维度表做关联,补充用户信息。

-- 定义用户维度表(存在 MySQL 中)
CREATE TABLE users (
    user_id    STRING,
    user_name  STRING,
    city       STRING,
    level      INT,
    PRIMARY KEY (user_id) NOT ENFORCED
) WITH (
    'connector' = 'jdbc',
    'url' = 'jdbc:mysql://mysql:3306/db',
    'table-name' = 'users',
    'username' = 'root',
    'password' = 'password',
    'lookup.cache.max-rows' = '10000',  -- 本地缓存最多 10000 行
    'lookup.cache.ttl' = '60s'          -- 缓存 TTL 60 秒
);
 
-- Lookup Join:用订单流中的 user_id 去 MySQL 中查询用户信息
SELECT
    o.order_id,
    o.user_id,
    u.user_name,
    u.city,
    o.amount,
    o.order_time
FROM orders AS o
JOIN users FOR SYSTEM_TIME AS OF o.order_time AS u
ON o.user_id = u.user_id;
-- FOR SYSTEM_TIME AS OF o.order_time:时态 Join(Temporal Join)
-- 意味着:用 order_time 时刻的 users 表数据来做关联
-- 如果不加这个子句,也可以做普通的 Lookup Join(实时查询当前最新值)

Lookup Join 的缓存机制:每条订单都去 MySQL 查询是不可接受的(高延迟、高压力)。Flink 通过本地 LRU 缓存来减少对 MySQL 的访问,lookup.cache.max-rowslookup.cache.ttl 控制缓存大小和过期时间。

时态 Join(Temporal Join)与普通 Lookup Join 的区别

  • 普通 Lookup Join:用当前时刻 MySQL 中的最新数据关联(可能因数据更新而导致历史订单的关联结果不一致)
  • 时态 JoinFOR SYSTEM_TIME AS OF event_time):用事件发生时刻的维度数据关联——即使用户的 level 之后升级了,历史订单仍然用旧的 level 关联

5.2 双流 Interval Join

场景:订单支付流与订单下单流的关联(下单后 15 分钟内的支付才算有效)。

-- 双流 Interval Join:两条流在时间范围内做 Join
SELECT
    o.order_id,
    o.user_id,
    o.amount,
    p.pay_time,
    p.pay_method,
    p.pay_time - o.order_time AS payment_latency
FROM orders AS o
JOIN payments AS p
ON o.order_id = p.order_id
AND p.pay_time BETWEEN o.order_time AND o.order_time + INTERVAL '15' MINUTE;
-- 语义:payment 的 pay_time 必须在 order_time 之后且在 15 分钟之内

Interval Join 的状态管理:Flink 需要在状态中缓存一侧的数据,等待另一侧到来进行匹配。order_time + INTERVAL '15' MINUTE 定义了等待窗口——超过 15 分钟后,过期的订单/支付数据会从状态中清除,不会无限积累。

生产避坑:双流 Join 的状态开销

没有时间约束的双流 JOIN(如 ON o.order_id = p.order_id 不带时间条件)意味着 Flink 需要永久缓存两侧的数据等待匹配——状态无限增长,最终 OOM。双流 Join 必须带时间范围约束(Interval Join),或者使用窗口 Join(在同一窗口内 Join)。

5.3 窗口 Join(Window Join)

-- 窗口 Join:两个流在相同的时间窗口内 Join
SELECT
    o.category,
    window_start,
    window_end,
    SUM(o.amount) AS order_gmv,
    SUM(c.click_count) AS total_clicks
FROM TABLE(
    TUMBLE(TABLE orders, DESCRIPTOR(order_time), INTERVAL '5' MINUTE)
) AS o
JOIN TABLE(
    TUMBLE(TABLE page_clicks, DESCRIPTOR(click_time), INTERVAL '5' MINUTE)
) AS c
ON o.category = c.category
AND o.window_start = c.window_start  -- 同一窗口
GROUP BY o.category, window_start, window_end;

第 6 章 DataStream 与 Table 的互转

6.1 DataStream → Table

StreamTableEnvironment tableEnv = StreamTableEnvironment.create(env);
 
// 场景:DataStream 已有复杂的自定义处理,结果需要用 SQL 继续查询
DataStream<OrderEvent> enrichedOrders = ... // 经过自定义 Map/Filter 处理的流
 
// 方式一:直接转换(字段名从 POJO 字段名自动推断)
Table orderTable = tableEnv.fromDataStream(enrichedOrders);
 
// 方式二:指定时间属性(需要在转换时显式声明事件时间)
Table orderTable = tableEnv.fromDataStream(
    enrichedOrders,
    Schema.newBuilder()
        .columnByMetadata("rowtime", DataTypes.TIMESTAMP_LTZ(3), "timestamp", true)
        .watermark("rowtime", "rowtime - INTERVAL '5' SECONDS")
        .build()
);
 
// 之后可以用 SQL 继续查询
tableEnv.createTemporaryView("enriched_orders", orderTable);
Table result = tableEnv.sqlQuery(
    "SELECT category, SUM(amount) FROM enriched_orders GROUP BY category"
);

6.2 Table → DataStream

// 场景:SQL 处理完,需要用 DataStream API 做复杂的自定义输出
 
Table resultTable = tableEnv.sqlQuery("SELECT user_id, SUM(amount) FROM orders GROUP BY user_id");
 
// Append 模式(只有 INSERT 的结果,如窗口聚合)
DataStream<Row> appendStream = tableEnv.toDataStream(resultTable);
 
// Retract 模式(有 UPDATE/DELETE 的结果,如非窗口的 GROUP BY)
// toChangelogStream 返回 DataStream<Row>,Row 中包含 RowKind(INSERT/UPDATE/DELETE)
DataStream<Row> changelogStream = tableEnv.toChangelogStream(resultTable);
 
changelogStream.process(new ProcessFunction<Row, Void>() {
    @Override
    public void processElement(Row row, Context ctx, Collector<Void> out) {
        RowKind kind = row.getKind();
        if (kind == RowKind.INSERT || kind == RowKind.UPDATE_AFTER) {
            // 新增或更新后的值:写入下游
            String userId = (String) row.getField("user_id");
            Double totalAmount = (Double) row.getField("EXPR$1");
            // ... 处理逻辑
        }
        // UPDATE_BEFORE 和 DELETE 根据业务决定是否处理
    }
});

第 7 章 生产常见问题

7.1 GROUP BY 查询为什么结果会”撤回”

现象:执行 SELECT user_id, COUNT(*) FROM orders GROUP BY user_id,输出中不只有 INSERT,还有很多 UPDATE_BEFORE、UPDATE_AFTER 消息,下游处理比较麻烦。

原因:非窗口的聚合查询(没有时间窗口约束的 GROUP BY)会持续更新——每来一条新订单,该 user_id 的 COUNT 就增加 1,Flink 需要先撤回旧值(-U),再发出新值(+U)。这是**回撤流(Retract Stream)**的正常语义。

解决方案

  • 如果只关心最终结果(不需要中间更新),使用窗口聚合(TUMBLE/HOP),窗口关闭后才输出,不会有中间更新
  • 如果下游是数据库(如 MySQL),使用 Upsert Sink(配合 PRIMARY KEY),每次 UPDATE_AFTER 覆写旧记录即可
  • 如果下游是 Kafka,使用 Upsert Kafka Connector

7.2 SQL 作业中时区导致窗口边界错误

现象:设置了 5 分钟滚动窗口,期望窗口边界是整点(如 12:00、12:05、12:10),但实际边界是 UTC 时间(如 04:00、04:05…)。

原因:Flink SQL 的时间窗口默认对齐到 UTC 时区的零点。如果业务数据是北京时间(UTC+8),窗口边界需要用北京时间对齐。

解决

// 方式一:设置 TableEnvironment 的本地时区
tableEnv.getConfig().setLocalTimeZone(ZoneId.of("Asia/Shanghai"));
 
// 方式二:在 SQL 中使用时区感知的时间类型
-- 使用 TIMESTAMP_LTZ(Timestamp with Local Time Zone)而不是 TIMESTAMP
CREATE TABLE orders (
    ...
    order_time TIMESTAMP_LTZ(3),  -- 时区感知的时间戳
    WATERMARK FOR order_time AS order_time - INTERVAL '5' SECOND
) WITH (...);

7.3 Lookup Join 结果不更新(维度数据已变更但 Join 结果未刷新)

现象:MySQL 中的用户等级(level)已经更新,但 Lookup Join 的结果仍然是旧的 level。

原因:Lookup Join 使用了本地 LRU 缓存(lookup.cache.ttl),缓存 TTL 未过期时使用缓存数据,不会实时查询 MySQL。

解决

  • 减小 lookup.cache.ttl(如从 60s 改为 10s),缩短数据陈旧的时间窗口
  • 如果维度数据变更需要立刻生效,考虑用 connect + BroadcastStream(DataStream API)实现动态维度更新,而不是 Lookup Join

小结

Flink Table API / SQL 的核心知识点:

动态表与 Changelog:流是持续变化的动态表,SQL 查询产生的是 Changelog 流(INSERT/UPDATE/DELETE 消息)。非窗口聚合会产生回撤流(Retract),窗口聚合只产生追加流(Append)。

DDL 建表关键配置

  • Kafka Source:WATERMARK FOR ts AS ts - INTERVAL 'N' SECOND 定义事件时间
  • Kafka Sink:有 UPDATE 语义时用 upsert-kafka + PRIMARY KEY
  • Lookup Join:配置 lookup.cache.* 减少外部系统压力

三种流式 Join

  • Lookup Join:流 Join 静态维度表,通过本地缓存加速
  • Interval Join:双流在时间范围内 Join,必须有时间约束防止状态无限增长
  • Window Join:在相同时间窗口内 Join 两条流

DataStream ↔ Table 互转

  • fromDataStream():DataStream → Table(指定时间属性需用 Schema Builder)
  • toDataStream() / toChangelogStream():Table → DataStream(有更新语义时用 Changelog)

下一篇 08 Flink 与 Kafka 端到端精确一次实战 将深入 Flink + Kafka 精确一次的完整配置,分析两阶段提交在 Kafka 事务中的实际工作方式,以及 Exactly-Once 的性能代价与取舍。

思考题

  1. Flink SQL 的”动态表”(Dynamic Table)抽象将流数据建模为一张不断变化的表,Retract 流(ChangeLog)传递的是行级别的增量变更(+I、-U、+U、-D)。当一个聚合结果需要被”撤回”(比如窗口关闭后输出,然后因迟到数据触发更新),Sink 必须支持 Upsert 或 Retract 语义。如果 Sink 是一个只支持追加写入的系统(如 Kafka 主题),如何处理包含撤回消息的 ChangeLog 流?
  2. Flink SQL 的 OVER 窗口(滑动聚合)与 GROUP BY 窗口在语义上有什么本质差异?OVER 窗口计算的是每条记录到达时的”滑动聚合值”,不等待窗口关闭就输出结果。这种实时输出特性需要维护多少历史状态?如果 OVER 窗口的时间范围是”过去 7 天”,State 的大小如何随时间变化?
  3. 在 Flink SQL 中执行 Temporal Table Join(时态表 Join)可以将实时流与一个维表的”历史版本”关联——即根据事件时间查找维表在该时刻的值,而不是维表的当前值。这在维表频繁更新的场景下非常有用。但时态表 Join 要求维表实现 LookupTableSource 接口。如果维表存储在 HBase 中,每次 Join 都需要一次网络请求,高并发下会有什么性能问题?如何通过异步 I/O 和缓存来优化?