摘要:

Flink 作业上线投产之后,运维工作才真正开始。一套完善的监控体系是生产作业稳定运行的基础——没有可观测性,就没有快速响应故障的能力。本文构建一套完整的 Flink 生产运维知识体系:从 Prometheus + Grafana 监控体系的搭建,到关键 Metrics 的含义与告警阈值设定(吞吐、延迟、反压率、Checkpoint 成功率),再到生产中最常见的 10 类故障(OOM、反压、Checkpoint 超时、TaskManager 频繁重启、数据延迟、Watermark 不推进等)的根因分析与解决思路。每类故障都按照”现象 → 排查路径 → 根因确认 → 解决方案”的结构展开,确保运维人员能够快速定位和恢复。


Flink 内置了丰富的 Metrics 指标,分为四个层级:

  • JVM 层:GC 次数、GC 时间、堆内存使用率、线程数
  • TaskManager 层:CPU 使用率、网络 Buffer 使用率、内存各区域使用量
  • 作业层:作业运行时长、重启次数、Checkpoint 成功/失败次数
  • 算子层(最重要):每个 Subtask 的输入/输出记录数、字节数、反压状态、延迟

Flink 的 Metrics 通过 MetricsReporter 上报给外部监控系统。生产标准方案是:Flink → Prometheus → Grafana

1.2 Prometheus Reporter 配置

# flink-conf.yaml
metrics.reporters: prometheus
 
metrics.reporter.prometheus.factory.class: org.apache.flink.metrics.prometheus.PrometheusReporterFactory
metrics.reporter.prometheus.port: 9249   # 每个 TaskManager 和 JobManager 暴露此端口
 
# 可选:PromethusPushGateway(当 TM 数量多,Prometheus scrape 困难时)
metrics.reporter.promgateway.factory.class: org.apache.flink.metrics.prometheus.PrometheusPushGatewayReporterFactory
metrics.reporter.promgateway.host: pushgateway-host
metrics.reporter.promgateway.port: 9091
metrics.reporter.promgateway.interval: 15 SECONDS
metrics.reporter.promgateway.jobName: flink-order-pipeline

Prometheus Scrape 配置

# prometheus.yml
scrape_configs:
  - job_name: 'flink-taskmanager'
    static_configs:
      - targets:
          - 'tm-host-1:9249'
          - 'tm-host-2:9249'
          - 'tm-host-3:9249'
    # 对于 Kubernetes 部署,使用 kubernetes_sd_configs 自动发现 TM Pod
  
  - job_name: 'flink-jobmanager'
    static_configs:
      - targets: ['jm-host:9249']

1.3 核心 Grafana 面板设计

一个合格的 Flink 监控 Dashboard 必须包含以下面板组:

Panel Group 1:作业整体健康度

面板 1:作业运行状态(RUNNING/RESTARTING/FAILED)
  Metric: flink_jobmanager_job_uptime
  告警:uptime 为 0 超过 30 秒 → 作业不在运行状态

面板 2:作业重启次数(30分钟滚动窗口)
  Metric: increase(flink_jobmanager_job_numRestarts[30m])
  告警:30分钟内重启次数 > 3 → 作业不稳定,需排查

面板 3:Checkpoint 成功率
  Metric: flink_jobmanager_job_numberOfCompletedCheckpoints /
         (flink_jobmanager_job_numberOfCompletedCheckpoints + flink_jobmanager_job_numberOfFailedCheckpoints)
  告警:成功率 < 80%(最近 1 小时)→ Checkpoint 频繁失败

Panel Group 2:吞吐与延迟

面板 4:端到端 Source 消费速率(每秒记录数)
  Metric: sum(rate(flink_taskmanager_job_task_operator_numRecordsInPerSecond[1m]))
         by (job_name, operator_name)
  告警:消费速率持续低于历史基线的 50% → Source 可能停止消费

面板 5:Kafka Consumer Lag(积压量)
  Metric: flink_taskmanager_job_task_operator_KafkaSourceReader_KafkaConsumer_records_lag_max
  告警:Lag > 阈值(根据业务容忍度设定)→ 消费跟不上生产

面板 6:Source-to-Sink 端到端延迟
  Metric: flink_taskmanager_job_latency_source_id_*_operator_id_*_operator_subtask_index_*_latency_p99
  告警:P99 延迟 > SLA 阈值

Panel Group 3:反压与资源

面板 7:各算子反压率(0-1,>0.5 = High Backpressure)
  Metric: flink_taskmanager_job_task_backPressuredTimeMsPerSecond / 1000
  告警:某算子反压率持续 > 0.8 超过 5 分钟

面板 8:TaskManager JVM 堆内存使用率
  Metric: flink_taskmanager_Status_JVM_Memory_Heap_Used /
         flink_taskmanager_Status_JVM_Memory_Heap_Max
  告警:堆内存使用率 > 85%(持续 5 分钟)→ 有 OOM 风险

面板 9:GC 停顿时间(每分钟)
  Metric: rate(flink_taskmanager_Status_JVM_GarbageCollector_G1_Old_Generation_Time[1m])
  告警:Old GC 时间/分钟 > 5000ms → 频繁 Full GC

Panel Group 4:Checkpoint 详情

面板 10:最近一次 Checkpoint 完成时间
  Metric: flink_jobmanager_job_lastCheckpointDuration
  告警:> Checkpoint 间隔的 80%(例如间隔 60s,则 > 48s 告警)

面板 11:Checkpoint 状态大小(字节)
  Metric: flink_jobmanager_job_lastCheckpointSize
  趋势:如果持续增长,说明状态没有被 TTL 清理,需要介入

面板 12:Checkpoint 对齐等待时间(Alignment Duration)
  Metric: flink_taskmanager_job_task_checkpointAlignmentTime
  告警:> 30s → 严重反压导致 Barrier 对齐阻塞

第 2 章 告警规则设计

2.1 告警分级

生产告警应该分级,避免”告警疲劳”(所有告警都是 P0 导致运维人员对告警脱敏):

级别含义响应时间通知渠道
P0 紧急作业已停止或数据完全丢失5 分钟内响应电话 + 短信 + 钉钉/微信
P1 严重作业运行但存在严重异常(Checkpoint 持续失败、Lag 快速增长)30 分钟内响应钉钉/微信群
P2 警告作业运行正常但有潜在风险(内存使用率高、GC 频繁)2 小时内处理邮件
P3 通知信息性告警(作业重启一次、Checkpoint 偶发失败)下个工作日处理邮件

2.2 关键告警规则(Prometheus AlertManager)

# flink-alerts.yml
groups:
  - name: flink.critical
    rules:
      # P0:作业停止运行(uptime 为 0)
      - alert: FlinkJobDown
        expr: flink_jobmanager_job_uptime == 0
        for: 2m
        labels:
          severity: critical
        annotations:
          summary: "Flink 作业 {{ $labels.job_name }} 已停止运行"
 
      # P0:Kafka Lag 超过绝对阈值(积压超过 100 万条且持续增长)
      - alert: FlinkKafkaLagCritical
        expr: |
          flink_taskmanager_job_task_operator_KafkaSourceReader_KafkaConsumer_records_lag_max > 1000000
          AND
          deriv(flink_taskmanager_job_task_operator_KafkaSourceReader_KafkaConsumer_records_lag_max[10m]) > 0
        for: 5m
        labels:
          severity: critical
        annotations:
          summary: "Kafka Lag 持续增长且超过 100 万条,作业消费能力不足"
 
  - name: flink.warning
    rules:
      # P1:Checkpoint 成功率低
      - alert: FlinkCheckpointFailureRate
        expr: |
          (
            rate(flink_jobmanager_job_numberOfFailedCheckpoints[30m]) /
            (rate(flink_jobmanager_job_numberOfCompletedCheckpoints[30m]) +
             rate(flink_jobmanager_job_numberOfFailedCheckpoints[30m]))
          ) > 0.2
        for: 10m
        labels:
          severity: warning
        annotations:
          summary: "Flink Checkpoint 失败率超过 20%"
 
      # P1:严重反压(反压率 > 0.8 持续 5 分钟)
      - alert: FlinkHighBackpressure
        expr: flink_taskmanager_job_task_backPressuredTimeMsPerSecond / 1000 > 0.8
        for: 5m
        labels:
          severity: warning
        annotations:
          summary: "算子 {{ $labels.operator_name }} Subtask {{ $labels.subtask_index }} 持续高反压"
 
      # P2:JVM 堆内存使用率高
      - alert: FlinkHeapMemoryHigh
        expr: |
          flink_taskmanager_Status_JVM_Memory_Heap_Used /
          flink_taskmanager_Status_JVM_Memory_Heap_Max > 0.85
        for: 5m
        labels:
          severity: warning
        annotations:
          summary: "TaskManager {{ $labels.tm_id }} 堆内存使用率超过 85%"

第 3 章 常见故障排查手册

3.1 故障一:作业频繁重启(Task 失败 → 自动重试 → 再次失败)

现象:Web UI 中看到作业处于 RESTARTING 状态循环,或者 numRestarts 指标持续增长。

排查路径

步骤 1:查看失败的 Task 日志
  Web UI → Job Graph → 点击失败的 Task → 查看 Task 日志
  或:SSH 到对应 TaskManager → /flink/logs/taskmanager.log(或 Kubernetes 中 kubectl logs)

步骤 2:识别根因类型
  常见的 Task 失败原因:
  a. OutOfMemoryError: Java heap space → JVM 堆 OOM(见故障二)
  b. OutOfMemoryError: GC overhead limit exceeded → GC 压力过大
  c. IOException: Connection reset by peer → 网络问题或 TM 心跳超时
  d. KafkaException: ... → Kafka 连接失败
  e. NullPointerException / ClassCastException → 业务代码 Bug
  f. RocksDBException: ... → RocksDB 文件损坏或磁盘满

步骤 3:根据根因处理

重启策略配置

// 固定延迟重启策略(生产推荐)
env.setRestartStrategy(RestartStrategies.fixedDelayRestart(
    3,                        // 最多重启 3 次
    Time.of(30, TimeUnit.SECONDS)  // 每次重启等待 30 秒(让外部系统恢复)
));
 
// 指数退避重启策略(大规模作业推荐,防止所有 Task 同时重启冲击集群)
env.setRestartStrategy(RestartStrategies.exponentialDelayRestart(
    Time.milliseconds(1000),  // 初始延迟 1 秒
    Time.milliseconds(60000), // 最大延迟 60 秒
    2.0,                      // 退避系数(每次失败延迟翻倍)
    Time.minutes(5),          // 重置间隔(5 分钟内没有失败则重置计数)
    0.1                       // 随机扰动(防止大量 Task 同时重启)
));

3.2 故障二:TaskManager OOM(Java Heap Space)

现象:TM 日志中出现 java.lang.OutOfMemoryError: Java heap space,Task 失败重启。

排查路径

路径一:状态过大(HashMapStateBackend)
  → 查看 Checkpoint 大小:flink_jobmanager_job_lastCheckpointSize
  → 如果 Checkpoint 体积持续增长,说明状态无限积累
  → 解决:1) 为状态添加 TTL(清理过期 Key)
          2) 切换到 EmbeddedRocksDBStateBackend(状态存 off-heap)

路径二:窗口积累大量数据
  → 使用了 ProcessWindowFunction(将窗口内所有数据存状态等到窗口关闭)
  → 数据量大时堆内存耗尽
  → 解决:改用 AggregateFunction + ProcessWindowFunction 组合(增量聚合,减少堆内对象)

路径三:用户代码内存泄漏
  → 某个算子中存在不受控的集合(如全局 static Map 不断 put)
  → 排查:通过 Java Profiler(JFR/Async-Profiler)分析堆内存对象分布
  → 解决:修复代码,避免无限增长的数据结构

路径四:JVM 堆内存分配过小
  → TM 总内存 8GB,但堆内存只有 1GB(大部分被 RocksDB Managed Memory 占用)
  → 解决:调整内存比例,减少 managed.fraction,增大堆内存

临时诊断命令

# 生成 Heap Dump(需要 TM 进程还在运行)
# 先找到 TM 的 PID
jps | grep TaskManagerRunner
 
# 生成 Heap Dump(会导致短暂 STW)
jmap -dump:format=b,file=/tmp/flink-tm-heap.hprof <TM_PID>
 
# 用 Eclipse MAT / jhat 分析 Heap Dump,找出大对象

3.3 故障三:Checkpoint 持续超时失败

现象:Web UI 的 Checkpoint History 中,大量 Checkpoint 显示 FAILED,失败原因是 Checkpoint expired before completing

排查路径

路径一:反压导致 Barrier 传播阻塞
  → 查看 Checkpoint Alignment Duration(对齐等待时间)
  → 如果某算子的 Alignment Duration 很长(>30s),说明是反压导致对齐阻塞
  → 解决:
    a. 排查反压根因(见第 2 章),解决根本问题
    b. 配置 aligned-checkpoint-timeout: 30s,超时后自动切换 Unaligned Checkpoint
    c. 增大 Checkpoint 超时时间(setCheckpointTimeout)

路径二:状态持久化 IO 瓶颈
  → 查看 Checkpoint Async Duration(异步写出时间)
  → 如果某算子 Async Duration 很长,说明状态序列化/写出慢
  → 解决:
    a. 切换到 RocksDB + 增量 Checkpoint(每次只上传增量 SST)
    b. 增大 HDFS/S3 写入带宽(或改用更快的存储)
    c. 增大 Checkpoint 超时时间

路径三:JobManager GC 导致 Checkpoint 协调超时
  → 查看 JM 的 GC 日志,如果有长时间 Full GC
  → 解决:增大 JM 内存,调整 JM 的 GC 参数

路径四:单个 Task 快照同步阶段耗时长
  → 查看 Sync Duration(同步阶段耗时)
  → 正常应该 < 5ms;如果某算子 > 100ms,说明同步阶段有大量操作
  → 对于 HashMapStateBackend:检查是否有非常大的 HashMap(CoW 扫描耗时)

3.4 故障四:Watermark 不推进,窗口永不触发

现象:事件时间窗口长时间没有输出,Web UI 中看到某个 Source Subtask 的 Watermark 一直停在很早的时间点。

排查路径

路径一:某个 Kafka 分区长期无数据(空闲分区阻塞)
  → 在 Web UI 的 Source 算子中,查看各 Subtask 的当前 Watermark
  → 如果某个 Subtask 的 Watermark 远低于其他 Subtask,找到对应消费的 Kafka 分区
  → 验证该分区是否真的无数据(用 Kafka CLI 查看分区 Offset)
  → 解决:配置 withIdleness(Duration.ofMinutes(1)),让空闲 Subtask 不阻塞全局 Watermark

路径二:事件时间戳提取逻辑有问题
  → 检查 WatermarkStrategy 中 timestampAssigner 的逻辑
  → 是否所有事件的时间戳都返回了 0 或负数?
  → 用临时的 .print() 输出时间戳验证

路径三:maxOutOfOrderness 设置过大
  → 如果设置了 forBoundedOutOfOrderness(Duration.ofHours(24))
  → Watermark = maxEventTimestamp - 24h
  → 数据刚开始流入时,Watermark 会非常落后(正常,等到 24h 的数据积累后才会追上)
  → 解决:合理评估乱序时间窗口(通常 5s-5min,而不是小时级)

路径四:Source 并行度为 1,但下游并行度更高
  → Source(parallelism=1) 发出的 Watermark 只会到达对应的下游 Subtask
  → 其他下游 Subtask 没有 Watermark 输入,Watermark 为 Long.MIN_VALUE
  → 解决:调整并行度,或在 Source 后增加 rebalance() 确保 Watermark 广播

3.5 故障五:数据处理延迟持续升高(Kafka Lag 不断增大)

现象:Prometheus 监控中,records_lag_max 指标持续增长,说明 Flink 消费 Kafka 的速度已经跟不上 Kafka 的生产速度。

排查步骤

步骤 1:确认是哪个阶段的瓶颈
  a. 查看 Source 算子的 numRecordsOutPerSecond:如果 Source 输出速率很高,说明 Source 读取正常
  b. 查看后续算子的反压状态:如果某算子高反压,Source 的消费速率被限制

步骤 2:检查是否是突发流量
  → 查看 Kafka 的生产速率历史,是否有突发峰值(如大促活动)
  → 如果是临时峰值:作业积压一段时间后会追上(正常现象)
  → 如果是持续增长:需要扩容

步骤 3:扩容方案
  a. 提高并行度(需要 Kafka 分区数 ≥ 新并行度)
     flink stop --savepointPath hdfs:///flink/savepoints/ <jobId>
     # 增大并行度后 flink run -s <savepoint> -p <new_parallelism>
  b. 增大 TaskManager 数量(保持并行度不变,增加资源)
  c. 优化慢算子逻辑(如果是算子本身慢,而不是资源不足)

3.6 故障六:RocksDB 磁盘空间耗尽

现象:TM 日志中出现 RocksDBException: IO error: No space left on device,Task 失败。

排查与解决

根因一:状态无限增长(无 TTL)
  → 解决:为所有有状态算子添加 TTL
  → 短期缓解:手动触发 Savepoint → 修改代码添加 TTL → 从 Savepoint 恢复

根因二:增量 Checkpoint 的历史 SST 文件积累
  → RocksDB 增量 Checkpoint 保留的历史 SST 文件越来越多
  → 解决:确保 state.checkpoints.num-retained 不要设置过大(建议 3)
  → Flink 会自动清理过期 Checkpoint 对应的 SST 引用

根因三:RocksDB Compaction 延迟,磁盘空间放大
  → RocksDB 的 LSM 结构在 Compaction 之前,同一个 Key 的多个版本同时存在
  → 磁盘空间放大系数可能达到 2-3x
  → 解决:增大 RocksDB Compaction 并发(setMaxBackgroundJobs)
  → 短期:增大磁盘容量;长期:调优 Compaction 策略

根因四:多个 Flink 作业共享同一磁盘
  → 另一个作业的 RocksDB 或 Checkpoint 文件占满磁盘
  → 解决:为不同作业配置不同的本地数据目录(state.backend.rocksdb.localdir)

3.7 故障七:TaskManager 与 JobManager 失去心跳

现象:JM 日志中出现 Lost heartbeat from TaskManager,TM 被标记为 dead,Task 被重新调度。

常见原因

原因一:Full GC 导致 TM 进程暂停(GC STW 期间无法发送心跳)
  → 查看 TM GC 日志,检查是否有长时间 Full GC(>10s)
  → 解决:调优 JVM GC 参数,切换到 RocksDB 减少堆内对象

原因二:TM 所在机器 CPU 负载过高,心跳线程无法及时执行
  → 检查机器 CPU 使用率,是否有其他进程竞争 CPU
  → 解决:增大 Flink 心跳超时时间(akka.ask.timeout)或减少同机器的进程数

原因三:网络问题(TM 和 JM 之间的网络延迟或丢包)
  → 检查网络连通性(ping、traceroute)
  → 解决:增大心跳超时配置:
    heartbeat.timeout: 60000      # 心跳超时 60 秒(默认 50000)
    heartbeat.interval: 10000     # 心跳间隔 10 秒(默认 10000)

原因四:Kubernetes 环境中 Pod 被驱逐(Eviction)
  → 节点内存压力导致 K8s 驱逐低优先级 Pod
  → 解决:为 Flink TM Pod 设置合理的 Request/Limit,设置 PriorityClass

3.8 故障八:作业从 Savepoint 恢复失败

现象flink run -s <savepoint> 启动失败,日志中出现 Cannot map checkpoint/savepoint state for operator xxx

常见原因与解决

原因一:算子 UID 未设置或 UID 在升级后变化
  → 错误:Savepoint contains operator with UID "xxx" which is unknown to the new job
  → 解决:确保所有有状态算子设置了显式 UID,且新版本代码中 UID 与旧版本一致

原因二:并行度超过 maxParallelism
  → 错误:Error restoring key groups (> maxParallelism)
  → 解决:新版本的并行度不能超过 maxParallelism(默认 128)
  → 如需超过:必须在第一次启动时就设置足够大的 maxParallelism

原因三:状态类型不兼容(如 ValueState<Integer> 改为 ValueState<Long>)
  → 错误:Failed to deserialize state
  → 解决:使用 Flink 的类型兼容机制(AvroSerializer 的 Schema Evolution)
  → 或:先忽略该算子的状态(--allowNonRestoredState),接受该算子从头计算

原因四:Savepoint 文件损坏
  → 检查 HDFS/S3 上的 Savepoint 文件是否完整
  → 解决:使用更早的 Savepoint(如有备份)

--allowNonRestoredState(谨慎使用)

# 忽略无法映射的算子状态(让作业能启动,代价是这部分状态从头计算)
flink run -s <savepoint> --allowNonRestoredState my-job.jar
# 使用前必须评估:忽略的状态是否会导致数据不一致(如计数器归零)

现象一GROUP BY 查询结果中有重复数据。

原因:下游 Sink 未处理 Changelog 流中的 UPDATE_BEFORE(撤回消息)
     Sink 只处理了 +I(INSERT)和 +U(UPDATE_AFTER),没有处理 -U(UPDATE_BEFORE)
     导致旧值和新值同时存在于下游

解决:
  a. 使用 Upsert Sink(如 upsert-kafka、upsert 模式的 JDBC Sink),按 PRIMARY KEY 覆写
  b. 下游处理时,接收到 -D 或 -U 时删除旧记录,接收到 +I 或 +U 时写入新记录

现象二:窗口触发时间比预期晚了几分钟。

原因一:maxOutOfOrderness 设置过大(等待过久才触发窗口)
  解决:减小 maxOutOfOrderness

原因二:某个 Source 分区的 Watermark 推进很慢(影响全局 Watermark)
  解决:配置 withIdleness(),忽略空闲分区的 Watermark

原因三:auto-watermark-interval 设置过大(Watermark 推进不及时)
  解决:减小 table.exec.source.idle-timeout 或增大 Watermark 更新频率

现象三:Lookup Join 的维度数据不刷新(用的是旧数据)。

原因:lookup.cache.ttl 未过期,本地缓存的旧维度数据被返回
解决:减小 lookup.cache.ttl(如从 60s 改为 10s)
     或完全禁用缓存(lookup.cache.max-rows = 0,但会增加外部系统压力)

3.10 故障十:作业运行正常,但 Metrics 数据不更新

现象:Grafana 上的 Metrics 面板显示”No Data”或数据停止更新,但作业在 Web UI 中显示 RUNNING。

排查路径

路径一:PrometheusReporter 未正确配置
  → 检查 flink-conf.yaml 中 metrics.reporters 和 metrics.reporter.prometheus.port 配置
  → 验证:curl http://<TM_HOST>:9249/metrics 是否有返回

路径二:Prometheus scrape 失败
  → 在 Prometheus UI(http://prometheus:9090/targets)查看 Flink TM Target 的状态
  → 如果显示 DOWN,检查网络连通性和端口是否开放

路径三:TaskManager 重启导致 Metrics 标签变化(Kubernetes 场景)
  → TM Pod 重启后,Pod IP 变化,Prometheus 的 static_configs 中的旧 IP 失效
  → 解决:使用 Kubernetes Service Discovery(kubernetes_sd_configs)自动发现新 TM Pod

路径四:作业重启后 Metrics Key(Job ID)变化
  → 每次作业重启,Job ID 都会变化,原来的 Metrics Key 不再更新
  → 在 Grafana 查询中,使用 job_name(稳定)而不是 job_id(每次重启变化)过滤

第 4 章 日常运维操作手册

# 查看所有运行中的作业
flink list -r
 
# 查看所有作业(包括已完成和失败)
flink list -a
 
# 手动触发 Savepoint(不停止作业)
flink savepoint <jobId> hdfs:///flink/savepoints/
 
# 优雅停止作业(触发 Savepoint 后停止)
flink stop --savepointPath hdfs:///flink/savepoints/ <jobId>
 
# 立刻取消作业(不触发 Savepoint,状态不保留)
flink cancel <jobId>
 
# 从 Savepoint/Checkpoint 恢复
flink run -s hdfs:///flink/savepoints/savepoint-xxx -c com.example.MyJob my-job.jar
 
# 查看作业日志(仅 Standalone 模式)
flink run -p 8 ... 2>&1 | tee job.log

4.2 作业升级标准流程

1. 准备阶段:
   → 阅读新版本 Flink Release Notes,确认无破坏性变更
   → 在测试环境验证新版本代码(从 Savepoint 恢复,验证状态连续性)
   → 确认所有有状态算子的 UID 在新版本中保持不变
   → 通知下游系统,准备好数据对账(升级过程中可能有短暂数据延迟)

2. 执行阶段:
   flink stop --savepointPath hdfs:///flink/savepoints/ <oldJobId>
   # 等待 Savepoint 创建完成,记录 Savepoint 路径
   # 部署新版本 JAR
   flink run -s <savepoint_path> -p <parallelism> -c com.example.MyJobV2 new-job.jar

3. 验证阶段:
   → 确认新作业状态为 RUNNING
   → 检查 Kafka Lag 是否在正常范围(升级期间的积压是否在追赶)
   → 验证关键业务指标(如订单统计数据是否连续)
   → 观察 Checkpoint 是否成功完成(第一次 Checkpoint 后确认状态正确)

4. 收尾阶段:
   → 保留旧 Savepoint(至少 48 小时,用于紧急回滚)
   → 更新监控告警规则(如有变化)
   → 更新运维文档

小结

本文构建了 Flink 生产运维的完整知识体系:

监控体系:Flink → Prometheus Reporter → Prometheus → Grafana,四个 Panel Group 覆盖作业健康度、吞吐延迟、反压资源、Checkpoint 详情。告警分级(P0-P3)避免告警疲劳。

关键 Metrics

  • 吞吐:numRecordsInPerSecond / numRecordsOutPerSecond
  • 延迟:latency_p99(Source-to-Sink 端到端延迟)
  • 反压:backPressuredTimeMsPerSecond / 1000(>0.8 = 高反压)
  • 积压:KafkaConsumer_records_lag_max
  • Checkpoint:lastCheckpointDuration / numberOfFailedCheckpoints

10 类常见故障的排查思路

  • Task 频繁重启 → 看 Task 失败日志,识别异常类型
  • 堆 OOM → 状态无 TTL / ProcessWindowFunction / 代码内存泄漏
  • Checkpoint 超时 → 反压对齐阻塞 / IO 瓶颈 / JM GC
  • Watermark 不推进 → 空闲分区阻塞(加 withIdleness()
  • Lag 持续增长 → 找反压算子并扩容
  • 磁盘满 → 状态无 TTL / Checkpoint 历史文件积累
  • TM 心跳超时 → Full GC / CPU 过载 / 网络问题
  • Savepoint 恢复失败 → UID 未设置 / 并行度超 maxParallelism / 类型不兼容
  • SQL 结果异常 → 未处理撤回消息 / Watermark 推进慢 / 缓存 TTL
  • Metrics 不更新 → PrometheusReporter 配置 / Prometheus scrape 失败

至此,「Flink 从入门到实战」专栏全部 10 篇文章已完成创作,覆盖从入门到生产运维的全生命周期。

思考题

  1. Flink 的 numRecordsInPerSecondnumBytesInPerSecond 是衡量算子吞吐量的关键指标,但这两个指标只反映了数据处理速率,不直接反映处理延迟。在反压场景下,这两个指标会同时下降——但是,下降是从上游传导下来的,还是当前算子本身是瓶颈?如何通过 Flink UI 的反压指标和 busyTimeMsPerSecond 精确定位反压的起源算子?
  2. Flink 作业的 Checkpoint 耗时受多种因素影响:状态大小、持久化存储速度、Task 端的 Snapshot 时间。当 Checkpoint 耗时接近 checkpointTimeout 时,作业处于 Checkpoint 失败的边缘。在不增加 checkpointTimeout 的前提下,有哪些手段可以减少 Checkpoint 的端到端时间?增量 Checkpoint(Incremental Checkpoint)在什么状态后端上才有效?
  3. Flink 作业中常见的 OOM 有两类:JVM 堆内存 OOM(java.lang.OutOfMemoryError: Java heap space)和直接内存 OOM(java.lang.OutOfMemoryError: Direct buffer memory)。这两类 OOM 分别对应 Flink 内存模型中的哪个区域?在 K8s 部署场景下,Flink 进程被 OOMKilled 后,如何从容器日志和 GC 日志中区分是哪种 OOM 导致的?