摘要:
Flink 作业上线投产之后,运维工作才真正开始。一套完善的监控体系是生产作业稳定运行的基础——没有可观测性,就没有快速响应故障的能力。本文构建一套完整的 Flink 生产运维知识体系:从 Prometheus + Grafana 监控体系的搭建,到关键 Metrics 的含义与告警阈值设定(吞吐、延迟、反压率、Checkpoint 成功率),再到生产中最常见的 10 类故障(OOM、反压、Checkpoint 超时、TaskManager 频繁重启、数据延迟、Watermark 不推进等)的根因分析与解决思路。每类故障都按照”现象 → 排查路径 → 根因确认 → 解决方案”的结构展开,确保运维人员能够快速定位和恢复。
第 1 章 Flink 监控体系搭建
1.1 Flink Metrics 体系概述
Flink 内置了丰富的 Metrics 指标,分为四个层级:
- JVM 层:GC 次数、GC 时间、堆内存使用率、线程数
- TaskManager 层:CPU 使用率、网络 Buffer 使用率、内存各区域使用量
- 作业层:作业运行时长、重启次数、Checkpoint 成功/失败次数
- 算子层(最重要):每个 Subtask 的输入/输出记录数、字节数、反压状态、延迟
Flink 的 Metrics 通过 MetricsReporter 上报给外部监控系统。生产标准方案是:Flink → Prometheus → Grafana。
1.2 Prometheus Reporter 配置
# flink-conf.yaml
metrics.reporters: prometheus
metrics.reporter.prometheus.factory.class: org.apache.flink.metrics.prometheus.PrometheusReporterFactory
metrics.reporter.prometheus.port: 9249 # 每个 TaskManager 和 JobManager 暴露此端口
# 可选:PromethusPushGateway(当 TM 数量多,Prometheus scrape 困难时)
metrics.reporter.promgateway.factory.class: org.apache.flink.metrics.prometheus.PrometheusPushGatewayReporterFactory
metrics.reporter.promgateway.host: pushgateway-host
metrics.reporter.promgateway.port: 9091
metrics.reporter.promgateway.interval: 15 SECONDS
metrics.reporter.promgateway.jobName: flink-order-pipelinePrometheus Scrape 配置:
# prometheus.yml
scrape_configs:
- job_name: 'flink-taskmanager'
static_configs:
- targets:
- 'tm-host-1:9249'
- 'tm-host-2:9249'
- 'tm-host-3:9249'
# 对于 Kubernetes 部署,使用 kubernetes_sd_configs 自动发现 TM Pod
- job_name: 'flink-jobmanager'
static_configs:
- targets: ['jm-host:9249']1.3 核心 Grafana 面板设计
一个合格的 Flink 监控 Dashboard 必须包含以下面板组:
Panel Group 1:作业整体健康度
面板 1:作业运行状态(RUNNING/RESTARTING/FAILED)
Metric: flink_jobmanager_job_uptime
告警:uptime 为 0 超过 30 秒 → 作业不在运行状态
面板 2:作业重启次数(30分钟滚动窗口)
Metric: increase(flink_jobmanager_job_numRestarts[30m])
告警:30分钟内重启次数 > 3 → 作业不稳定,需排查
面板 3:Checkpoint 成功率
Metric: flink_jobmanager_job_numberOfCompletedCheckpoints /
(flink_jobmanager_job_numberOfCompletedCheckpoints + flink_jobmanager_job_numberOfFailedCheckpoints)
告警:成功率 < 80%(最近 1 小时)→ Checkpoint 频繁失败
Panel Group 2:吞吐与延迟
面板 4:端到端 Source 消费速率(每秒记录数)
Metric: sum(rate(flink_taskmanager_job_task_operator_numRecordsInPerSecond[1m]))
by (job_name, operator_name)
告警:消费速率持续低于历史基线的 50% → Source 可能停止消费
面板 5:Kafka Consumer Lag(积压量)
Metric: flink_taskmanager_job_task_operator_KafkaSourceReader_KafkaConsumer_records_lag_max
告警:Lag > 阈值(根据业务容忍度设定)→ 消费跟不上生产
面板 6:Source-to-Sink 端到端延迟
Metric: flink_taskmanager_job_latency_source_id_*_operator_id_*_operator_subtask_index_*_latency_p99
告警:P99 延迟 > SLA 阈值
Panel Group 3:反压与资源
面板 7:各算子反压率(0-1,>0.5 = High Backpressure)
Metric: flink_taskmanager_job_task_backPressuredTimeMsPerSecond / 1000
告警:某算子反压率持续 > 0.8 超过 5 分钟
面板 8:TaskManager JVM 堆内存使用率
Metric: flink_taskmanager_Status_JVM_Memory_Heap_Used /
flink_taskmanager_Status_JVM_Memory_Heap_Max
告警:堆内存使用率 > 85%(持续 5 分钟)→ 有 OOM 风险
面板 9:GC 停顿时间(每分钟)
Metric: rate(flink_taskmanager_Status_JVM_GarbageCollector_G1_Old_Generation_Time[1m])
告警:Old GC 时间/分钟 > 5000ms → 频繁 Full GC
Panel Group 4:Checkpoint 详情
面板 10:最近一次 Checkpoint 完成时间
Metric: flink_jobmanager_job_lastCheckpointDuration
告警:> Checkpoint 间隔的 80%(例如间隔 60s,则 > 48s 告警)
面板 11:Checkpoint 状态大小(字节)
Metric: flink_jobmanager_job_lastCheckpointSize
趋势:如果持续增长,说明状态没有被 TTL 清理,需要介入
面板 12:Checkpoint 对齐等待时间(Alignment Duration)
Metric: flink_taskmanager_job_task_checkpointAlignmentTime
告警:> 30s → 严重反压导致 Barrier 对齐阻塞
第 2 章 告警规则设计
2.1 告警分级
生产告警应该分级,避免”告警疲劳”(所有告警都是 P0 导致运维人员对告警脱敏):
| 级别 | 含义 | 响应时间 | 通知渠道 |
|---|---|---|---|
| P0 紧急 | 作业已停止或数据完全丢失 | 5 分钟内响应 | 电话 + 短信 + 钉钉/微信 |
| P1 严重 | 作业运行但存在严重异常(Checkpoint 持续失败、Lag 快速增长) | 30 分钟内响应 | 钉钉/微信群 |
| P2 警告 | 作业运行正常但有潜在风险(内存使用率高、GC 频繁) | 2 小时内处理 | 邮件 |
| P3 通知 | 信息性告警(作业重启一次、Checkpoint 偶发失败) | 下个工作日处理 | 邮件 |
2.2 关键告警规则(Prometheus AlertManager)
# flink-alerts.yml
groups:
- name: flink.critical
rules:
# P0:作业停止运行(uptime 为 0)
- alert: FlinkJobDown
expr: flink_jobmanager_job_uptime == 0
for: 2m
labels:
severity: critical
annotations:
summary: "Flink 作业 {{ $labels.job_name }} 已停止运行"
# P0:Kafka Lag 超过绝对阈值(积压超过 100 万条且持续增长)
- alert: FlinkKafkaLagCritical
expr: |
flink_taskmanager_job_task_operator_KafkaSourceReader_KafkaConsumer_records_lag_max > 1000000
AND
deriv(flink_taskmanager_job_task_operator_KafkaSourceReader_KafkaConsumer_records_lag_max[10m]) > 0
for: 5m
labels:
severity: critical
annotations:
summary: "Kafka Lag 持续增长且超过 100 万条,作业消费能力不足"
- name: flink.warning
rules:
# P1:Checkpoint 成功率低
- alert: FlinkCheckpointFailureRate
expr: |
(
rate(flink_jobmanager_job_numberOfFailedCheckpoints[30m]) /
(rate(flink_jobmanager_job_numberOfCompletedCheckpoints[30m]) +
rate(flink_jobmanager_job_numberOfFailedCheckpoints[30m]))
) > 0.2
for: 10m
labels:
severity: warning
annotations:
summary: "Flink Checkpoint 失败率超过 20%"
# P1:严重反压(反压率 > 0.8 持续 5 分钟)
- alert: FlinkHighBackpressure
expr: flink_taskmanager_job_task_backPressuredTimeMsPerSecond / 1000 > 0.8
for: 5m
labels:
severity: warning
annotations:
summary: "算子 {{ $labels.operator_name }} Subtask {{ $labels.subtask_index }} 持续高反压"
# P2:JVM 堆内存使用率高
- alert: FlinkHeapMemoryHigh
expr: |
flink_taskmanager_Status_JVM_Memory_Heap_Used /
flink_taskmanager_Status_JVM_Memory_Heap_Max > 0.85
for: 5m
labels:
severity: warning
annotations:
summary: "TaskManager {{ $labels.tm_id }} 堆内存使用率超过 85%"第 3 章 常见故障排查手册
3.1 故障一:作业频繁重启(Task 失败 → 自动重试 → 再次失败)
现象:Web UI 中看到作业处于 RESTARTING 状态循环,或者 numRestarts 指标持续增长。
排查路径:
步骤 1:查看失败的 Task 日志
Web UI → Job Graph → 点击失败的 Task → 查看 Task 日志
或:SSH 到对应 TaskManager → /flink/logs/taskmanager.log(或 Kubernetes 中 kubectl logs)
步骤 2:识别根因类型
常见的 Task 失败原因:
a. OutOfMemoryError: Java heap space → JVM 堆 OOM(见故障二)
b. OutOfMemoryError: GC overhead limit exceeded → GC 压力过大
c. IOException: Connection reset by peer → 网络问题或 TM 心跳超时
d. KafkaException: ... → Kafka 连接失败
e. NullPointerException / ClassCastException → 业务代码 Bug
f. RocksDBException: ... → RocksDB 文件损坏或磁盘满
步骤 3:根据根因处理
重启策略配置:
// 固定延迟重启策略(生产推荐)
env.setRestartStrategy(RestartStrategies.fixedDelayRestart(
3, // 最多重启 3 次
Time.of(30, TimeUnit.SECONDS) // 每次重启等待 30 秒(让外部系统恢复)
));
// 指数退避重启策略(大规模作业推荐,防止所有 Task 同时重启冲击集群)
env.setRestartStrategy(RestartStrategies.exponentialDelayRestart(
Time.milliseconds(1000), // 初始延迟 1 秒
Time.milliseconds(60000), // 最大延迟 60 秒
2.0, // 退避系数(每次失败延迟翻倍)
Time.minutes(5), // 重置间隔(5 分钟内没有失败则重置计数)
0.1 // 随机扰动(防止大量 Task 同时重启)
));3.2 故障二:TaskManager OOM(Java Heap Space)
现象:TM 日志中出现 java.lang.OutOfMemoryError: Java heap space,Task 失败重启。
排查路径:
路径一:状态过大(HashMapStateBackend)
→ 查看 Checkpoint 大小:flink_jobmanager_job_lastCheckpointSize
→ 如果 Checkpoint 体积持续增长,说明状态无限积累
→ 解决:1) 为状态添加 TTL(清理过期 Key)
2) 切换到 EmbeddedRocksDBStateBackend(状态存 off-heap)
路径二:窗口积累大量数据
→ 使用了 ProcessWindowFunction(将窗口内所有数据存状态等到窗口关闭)
→ 数据量大时堆内存耗尽
→ 解决:改用 AggregateFunction + ProcessWindowFunction 组合(增量聚合,减少堆内对象)
路径三:用户代码内存泄漏
→ 某个算子中存在不受控的集合(如全局 static Map 不断 put)
→ 排查:通过 Java Profiler(JFR/Async-Profiler)分析堆内存对象分布
→ 解决:修复代码,避免无限增长的数据结构
路径四:JVM 堆内存分配过小
→ TM 总内存 8GB,但堆内存只有 1GB(大部分被 RocksDB Managed Memory 占用)
→ 解决:调整内存比例,减少 managed.fraction,增大堆内存
临时诊断命令:
# 生成 Heap Dump(需要 TM 进程还在运行)
# 先找到 TM 的 PID
jps | grep TaskManagerRunner
# 生成 Heap Dump(会导致短暂 STW)
jmap -dump:format=b,file=/tmp/flink-tm-heap.hprof <TM_PID>
# 用 Eclipse MAT / jhat 分析 Heap Dump,找出大对象3.3 故障三:Checkpoint 持续超时失败
现象:Web UI 的 Checkpoint History 中,大量 Checkpoint 显示 FAILED,失败原因是 Checkpoint expired before completing。
排查路径:
路径一:反压导致 Barrier 传播阻塞
→ 查看 Checkpoint Alignment Duration(对齐等待时间)
→ 如果某算子的 Alignment Duration 很长(>30s),说明是反压导致对齐阻塞
→ 解决:
a. 排查反压根因(见第 2 章),解决根本问题
b. 配置 aligned-checkpoint-timeout: 30s,超时后自动切换 Unaligned Checkpoint
c. 增大 Checkpoint 超时时间(setCheckpointTimeout)
路径二:状态持久化 IO 瓶颈
→ 查看 Checkpoint Async Duration(异步写出时间)
→ 如果某算子 Async Duration 很长,说明状态序列化/写出慢
→ 解决:
a. 切换到 RocksDB + 增量 Checkpoint(每次只上传增量 SST)
b. 增大 HDFS/S3 写入带宽(或改用更快的存储)
c. 增大 Checkpoint 超时时间
路径三:JobManager GC 导致 Checkpoint 协调超时
→ 查看 JM 的 GC 日志,如果有长时间 Full GC
→ 解决:增大 JM 内存,调整 JM 的 GC 参数
路径四:单个 Task 快照同步阶段耗时长
→ 查看 Sync Duration(同步阶段耗时)
→ 正常应该 < 5ms;如果某算子 > 100ms,说明同步阶段有大量操作
→ 对于 HashMapStateBackend:检查是否有非常大的 HashMap(CoW 扫描耗时)
3.4 故障四:Watermark 不推进,窗口永不触发
现象:事件时间窗口长时间没有输出,Web UI 中看到某个 Source Subtask 的 Watermark 一直停在很早的时间点。
排查路径:
路径一:某个 Kafka 分区长期无数据(空闲分区阻塞)
→ 在 Web UI 的 Source 算子中,查看各 Subtask 的当前 Watermark
→ 如果某个 Subtask 的 Watermark 远低于其他 Subtask,找到对应消费的 Kafka 分区
→ 验证该分区是否真的无数据(用 Kafka CLI 查看分区 Offset)
→ 解决:配置 withIdleness(Duration.ofMinutes(1)),让空闲 Subtask 不阻塞全局 Watermark
路径二:事件时间戳提取逻辑有问题
→ 检查 WatermarkStrategy 中 timestampAssigner 的逻辑
→ 是否所有事件的时间戳都返回了 0 或负数?
→ 用临时的 .print() 输出时间戳验证
路径三:maxOutOfOrderness 设置过大
→ 如果设置了 forBoundedOutOfOrderness(Duration.ofHours(24))
→ Watermark = maxEventTimestamp - 24h
→ 数据刚开始流入时,Watermark 会非常落后(正常,等到 24h 的数据积累后才会追上)
→ 解决:合理评估乱序时间窗口(通常 5s-5min,而不是小时级)
路径四:Source 并行度为 1,但下游并行度更高
→ Source(parallelism=1) 发出的 Watermark 只会到达对应的下游 Subtask
→ 其他下游 Subtask 没有 Watermark 输入,Watermark 为 Long.MIN_VALUE
→ 解决:调整并行度,或在 Source 后增加 rebalance() 确保 Watermark 广播
3.5 故障五:数据处理延迟持续升高(Kafka Lag 不断增大)
现象:Prometheus 监控中,records_lag_max 指标持续增长,说明 Flink 消费 Kafka 的速度已经跟不上 Kafka 的生产速度。
排查步骤:
步骤 1:确认是哪个阶段的瓶颈
a. 查看 Source 算子的 numRecordsOutPerSecond:如果 Source 输出速率很高,说明 Source 读取正常
b. 查看后续算子的反压状态:如果某算子高反压,Source 的消费速率被限制
步骤 2:检查是否是突发流量
→ 查看 Kafka 的生产速率历史,是否有突发峰值(如大促活动)
→ 如果是临时峰值:作业积压一段时间后会追上(正常现象)
→ 如果是持续增长:需要扩容
步骤 3:扩容方案
a. 提高并行度(需要 Kafka 分区数 ≥ 新并行度)
flink stop --savepointPath hdfs:///flink/savepoints/ <jobId>
# 增大并行度后 flink run -s <savepoint> -p <new_parallelism>
b. 增大 TaskManager 数量(保持并行度不变,增加资源)
c. 优化慢算子逻辑(如果是算子本身慢,而不是资源不足)
3.6 故障六:RocksDB 磁盘空间耗尽
现象:TM 日志中出现 RocksDBException: IO error: No space left on device,Task 失败。
排查与解决:
根因一:状态无限增长(无 TTL)
→ 解决:为所有有状态算子添加 TTL
→ 短期缓解:手动触发 Savepoint → 修改代码添加 TTL → 从 Savepoint 恢复
根因二:增量 Checkpoint 的历史 SST 文件积累
→ RocksDB 增量 Checkpoint 保留的历史 SST 文件越来越多
→ 解决:确保 state.checkpoints.num-retained 不要设置过大(建议 3)
→ Flink 会自动清理过期 Checkpoint 对应的 SST 引用
根因三:RocksDB Compaction 延迟,磁盘空间放大
→ RocksDB 的 LSM 结构在 Compaction 之前,同一个 Key 的多个版本同时存在
→ 磁盘空间放大系数可能达到 2-3x
→ 解决:增大 RocksDB Compaction 并发(setMaxBackgroundJobs)
→ 短期:增大磁盘容量;长期:调优 Compaction 策略
根因四:多个 Flink 作业共享同一磁盘
→ 另一个作业的 RocksDB 或 Checkpoint 文件占满磁盘
→ 解决:为不同作业配置不同的本地数据目录(state.backend.rocksdb.localdir)
3.7 故障七:TaskManager 与 JobManager 失去心跳
现象:JM 日志中出现 Lost heartbeat from TaskManager,TM 被标记为 dead,Task 被重新调度。
常见原因:
原因一:Full GC 导致 TM 进程暂停(GC STW 期间无法发送心跳)
→ 查看 TM GC 日志,检查是否有长时间 Full GC(>10s)
→ 解决:调优 JVM GC 参数,切换到 RocksDB 减少堆内对象
原因二:TM 所在机器 CPU 负载过高,心跳线程无法及时执行
→ 检查机器 CPU 使用率,是否有其他进程竞争 CPU
→ 解决:增大 Flink 心跳超时时间(akka.ask.timeout)或减少同机器的进程数
原因三:网络问题(TM 和 JM 之间的网络延迟或丢包)
→ 检查网络连通性(ping、traceroute)
→ 解决:增大心跳超时配置:
heartbeat.timeout: 60000 # 心跳超时 60 秒(默认 50000)
heartbeat.interval: 10000 # 心跳间隔 10 秒(默认 10000)
原因四:Kubernetes 环境中 Pod 被驱逐(Eviction)
→ 节点内存压力导致 K8s 驱逐低优先级 Pod
→ 解决:为 Flink TM Pod 设置合理的 Request/Limit,设置 PriorityClass
3.8 故障八:作业从 Savepoint 恢复失败
现象:flink run -s <savepoint> 启动失败,日志中出现 Cannot map checkpoint/savepoint state for operator xxx。
常见原因与解决:
原因一:算子 UID 未设置或 UID 在升级后变化
→ 错误:Savepoint contains operator with UID "xxx" which is unknown to the new job
→ 解决:确保所有有状态算子设置了显式 UID,且新版本代码中 UID 与旧版本一致
原因二:并行度超过 maxParallelism
→ 错误:Error restoring key groups (> maxParallelism)
→ 解决:新版本的并行度不能超过 maxParallelism(默认 128)
→ 如需超过:必须在第一次启动时就设置足够大的 maxParallelism
原因三:状态类型不兼容(如 ValueState<Integer> 改为 ValueState<Long>)
→ 错误:Failed to deserialize state
→ 解决:使用 Flink 的类型兼容机制(AvroSerializer 的 Schema Evolution)
→ 或:先忽略该算子的状态(--allowNonRestoredState),接受该算子从头计算
原因四:Savepoint 文件损坏
→ 检查 HDFS/S3 上的 Savepoint 文件是否完整
→ 解决:使用更早的 Savepoint(如有备份)
--allowNonRestoredState(谨慎使用):
# 忽略无法映射的算子状态(让作业能启动,代价是这部分状态从头计算)
flink run -s <savepoint> --allowNonRestoredState my-job.jar
# 使用前必须评估:忽略的状态是否会导致数据不一致(如计数器归零)3.9 故障九:Flink SQL 查询结果不符合预期
现象一:GROUP BY 查询结果中有重复数据。
原因:下游 Sink 未处理 Changelog 流中的 UPDATE_BEFORE(撤回消息)
Sink 只处理了 +I(INSERT)和 +U(UPDATE_AFTER),没有处理 -U(UPDATE_BEFORE)
导致旧值和新值同时存在于下游
解决:
a. 使用 Upsert Sink(如 upsert-kafka、upsert 模式的 JDBC Sink),按 PRIMARY KEY 覆写
b. 下游处理时,接收到 -D 或 -U 时删除旧记录,接收到 +I 或 +U 时写入新记录
现象二:窗口触发时间比预期晚了几分钟。
原因一:maxOutOfOrderness 设置过大(等待过久才触发窗口)
解决:减小 maxOutOfOrderness
原因二:某个 Source 分区的 Watermark 推进很慢(影响全局 Watermark)
解决:配置 withIdleness(),忽略空闲分区的 Watermark
原因三:auto-watermark-interval 设置过大(Watermark 推进不及时)
解决:减小 table.exec.source.idle-timeout 或增大 Watermark 更新频率
现象三:Lookup Join 的维度数据不刷新(用的是旧数据)。
原因:lookup.cache.ttl 未过期,本地缓存的旧维度数据被返回
解决:减小 lookup.cache.ttl(如从 60s 改为 10s)
或完全禁用缓存(lookup.cache.max-rows = 0,但会增加外部系统压力)
3.10 故障十:作业运行正常,但 Metrics 数据不更新
现象:Grafana 上的 Metrics 面板显示”No Data”或数据停止更新,但作业在 Web UI 中显示 RUNNING。
排查路径:
路径一:PrometheusReporter 未正确配置
→ 检查 flink-conf.yaml 中 metrics.reporters 和 metrics.reporter.prometheus.port 配置
→ 验证:curl http://<TM_HOST>:9249/metrics 是否有返回
路径二:Prometheus scrape 失败
→ 在 Prometheus UI(http://prometheus:9090/targets)查看 Flink TM Target 的状态
→ 如果显示 DOWN,检查网络连通性和端口是否开放
路径三:TaskManager 重启导致 Metrics 标签变化(Kubernetes 场景)
→ TM Pod 重启后,Pod IP 变化,Prometheus 的 static_configs 中的旧 IP 失效
→ 解决:使用 Kubernetes Service Discovery(kubernetes_sd_configs)自动发现新 TM Pod
路径四:作业重启后 Metrics Key(Job ID)变化
→ 每次作业重启,Job ID 都会变化,原来的 Metrics Key 不再更新
→ 在 Grafana 查询中,使用 job_name(稳定)而不是 job_id(每次重启变化)过滤
第 4 章 日常运维操作手册
4.1 常用 Flink CLI 命令速查
# 查看所有运行中的作业
flink list -r
# 查看所有作业(包括已完成和失败)
flink list -a
# 手动触发 Savepoint(不停止作业)
flink savepoint <jobId> hdfs:///flink/savepoints/
# 优雅停止作业(触发 Savepoint 后停止)
flink stop --savepointPath hdfs:///flink/savepoints/ <jobId>
# 立刻取消作业(不触发 Savepoint,状态不保留)
flink cancel <jobId>
# 从 Savepoint/Checkpoint 恢复
flink run -s hdfs:///flink/savepoints/savepoint-xxx -c com.example.MyJob my-job.jar
# 查看作业日志(仅 Standalone 模式)
flink run -p 8 ... 2>&1 | tee job.log4.2 作业升级标准流程
1. 准备阶段:
→ 阅读新版本 Flink Release Notes,确认无破坏性变更
→ 在测试环境验证新版本代码(从 Savepoint 恢复,验证状态连续性)
→ 确认所有有状态算子的 UID 在新版本中保持不变
→ 通知下游系统,准备好数据对账(升级过程中可能有短暂数据延迟)
2. 执行阶段:
flink stop --savepointPath hdfs:///flink/savepoints/ <oldJobId>
# 等待 Savepoint 创建完成,记录 Savepoint 路径
# 部署新版本 JAR
flink run -s <savepoint_path> -p <parallelism> -c com.example.MyJobV2 new-job.jar
3. 验证阶段:
→ 确认新作业状态为 RUNNING
→ 检查 Kafka Lag 是否在正常范围(升级期间的积压是否在追赶)
→ 验证关键业务指标(如订单统计数据是否连续)
→ 观察 Checkpoint 是否成功完成(第一次 Checkpoint 后确认状态正确)
4. 收尾阶段:
→ 保留旧 Savepoint(至少 48 小时,用于紧急回滚)
→ 更新监控告警规则(如有变化)
→ 更新运维文档
小结
本文构建了 Flink 生产运维的完整知识体系:
监控体系:Flink → Prometheus Reporter → Prometheus → Grafana,四个 Panel Group 覆盖作业健康度、吞吐延迟、反压资源、Checkpoint 详情。告警分级(P0-P3)避免告警疲劳。
关键 Metrics:
- 吞吐:
numRecordsInPerSecond/numRecordsOutPerSecond - 延迟:
latency_p99(Source-to-Sink 端到端延迟) - 反压:
backPressuredTimeMsPerSecond / 1000(>0.8 = 高反压) - 积压:
KafkaConsumer_records_lag_max - Checkpoint:
lastCheckpointDuration/numberOfFailedCheckpoints
10 类常见故障的排查思路:
- Task 频繁重启 → 看 Task 失败日志,识别异常类型
- 堆 OOM → 状态无 TTL / ProcessWindowFunction / 代码内存泄漏
- Checkpoint 超时 → 反压对齐阻塞 / IO 瓶颈 / JM GC
- Watermark 不推进 → 空闲分区阻塞(加
withIdleness()) - Lag 持续增长 → 找反压算子并扩容
- 磁盘满 → 状态无 TTL / Checkpoint 历史文件积累
- TM 心跳超时 → Full GC / CPU 过载 / 网络问题
- Savepoint 恢复失败 → UID 未设置 / 并行度超 maxParallelism / 类型不兼容
- SQL 结果异常 → 未处理撤回消息 / Watermark 推进慢 / 缓存 TTL
- Metrics 不更新 → PrometheusReporter 配置 / Prometheus scrape 失败
至此,「Flink 从入门到实战」专栏全部 10 篇文章已完成创作,覆盖从入门到生产运维的全生命周期。
思考题
- Flink 的
numRecordsInPerSecond和numBytesInPerSecond是衡量算子吞吐量的关键指标,但这两个指标只反映了数据处理速率,不直接反映处理延迟。在反压场景下,这两个指标会同时下降——但是,下降是从上游传导下来的,还是当前算子本身是瓶颈?如何通过 Flink UI 的反压指标和busyTimeMsPerSecond精确定位反压的起源算子?- Flink 作业的 Checkpoint 耗时受多种因素影响:状态大小、持久化存储速度、Task 端的 Snapshot 时间。当 Checkpoint 耗时接近
checkpointTimeout时,作业处于 Checkpoint 失败的边缘。在不增加checkpointTimeout的前提下,有哪些手段可以减少 Checkpoint 的端到端时间?增量 Checkpoint(Incremental Checkpoint)在什么状态后端上才有效?- Flink 作业中常见的 OOM 有两类:JVM 堆内存 OOM(
java.lang.OutOfMemoryError: Java heap space)和直接内存 OOM(java.lang.OutOfMemoryError: Direct buffer memory)。这两类 OOM 分别对应 Flink 内存模型中的哪个区域?在 K8s 部署场景下,Flink 进程被OOMKilled后,如何从容器日志和 GC 日志中区分是哪种 OOM 导致的?