Detect Zombie Flows
当 Prefect flow run 所在的基础设施突然异常,比如机器宕机、容器被驱逐、进程被强杀,run 可能会一直停留在 Running 状态,但实际上已经不再继续执行,
这种 flow run 通常被称为 zombie flow run。
Abstract
Prefect 官方推荐的做法不是简单地“运行太久就判定异常”,而是让 flow run 持续发送 heartbeat 事件,再通过 Automation 检测心跳是否中断。如果在预期时间内没有收到新的 heartbeat,就把该 flow run 标记为 Crashed,必要时再追加通知、重试或告警动作。
核心思路
这个方案本质上是:
- flow run 进入运行态后,周期性发出
prefect.flow-run.heartbeat - Automation 在每次 heartbeat 之后,期待在一段时间窗口内看到:
- 下一次 heartbeat
- 或者该 flow run 进入最终状态(如
Completed/Failed/Cancelled/Crashed)
- 如果时间窗口内什么都没等到,就认为这个 run 已经“失联”
- Automation 把它改成
Crashed
所以,zombie flows 检测的关键不是看“运行多久”,而是看“是否还在持续上报自己活着”。
前置条件
要启用这套机制,至少需要满足以下条件:
- Prefect 版本
>= 3.1.8 - 设置
PREFECT_FLOWS_HEARTBEAT_FREQUENCY - 这个值必须是一个 大于等于 30 的整数
- 官方文档说明该机制面向 由 deployment 触发的 flow runs
例如:
export PREFECT_FLOWS_HEARTBEAT_FREQUENCY=30
创建 Automation
下面这个示例基本就是官方方案的核心:如果某个 flow run 在 heartbeat 之后 90 秒内既没有新的 heartbeat,也没有进入最终状态,就把它标记为 Crashed。
from datetime import timedelta
from prefect.automations import Automation
from prefect.client.schemas.objects import StateType
from prefect.events.actions import ChangeFlowRunState
from prefect.events.schemas.automations import EventTrigger, Posture
from prefect.events.schemas.events import ResourceSpecification
my_automation = Automation(
name="Crash zombie flows",
trigger=EventTrigger(
after={"prefect.flow-run.heartbeat"},
expect={
"prefect.flow-run.heartbeat",
"prefect.flow-run.Completed",
"prefect.flow-run.Failed",
"prefect.flow-run.Cancelled",
"prefect.flow-run.Crashed",
},
match=ResourceSpecification(
{"prefect.resource.id": ["prefect.flow-run.*"]}
),
for_each={"prefect.resource.id"},
posture=Posture.Proactive,
threshold=1,
within=timedelta(seconds=90),
),
actions=[
ChangeFlowRunState(
state=StateType.CRASHED,
message="Flow run marked as crashed due to missing heartbeats.",
)
],
)
if __name__ == "__main__":
my_automation.create()
配置参数说明
after
表示每次收到 heartbeat 后,都开始一次后续观察。
after={"prefect.flow-run.heartbeat"}
expect
表示在观察窗口里,系统期望看到以下任意一种事件:
- 下一次 heartbeat
- flow run 正常结束
- flow run 失败结束
- flow run 被取消
- flow run 已经被标记为 crashed
只要这些事件之一出现,就说明这个 flow run 不是 zombie。
expect={
"prefect.flow-run.heartbeat",
"prefect.flow-run.Completed",
"prefect.flow-run.Failed",
"prefect.flow-run.Cancelled",
"prefect.flow-run.Crashed",
}
match 和 for_each
这两项组合起来的效果是:
- 只匹配 flow run 资源
- 对每个 flow run 单独判断
也就是说,不会把不同 run 的 heartbeat 混在一起统计。
match=ResourceSpecification({"prefect.resource.id": ["prefect.flow-run.*"]})
for_each={"prefect.resource.id"}
posture=Posture.Proactive
这是一个 主动式 检测规则。意思不是“收到异常事件再响应”,而是“如果应该出现的事件没出现,也要触发”。
僵尸检测本质上就是一种“缺失事件检测”,所以这里必须是 proactive 类型。
threshold=1
表示只要出现 1 次“不符合预期”的窗口,就触发动作。
threshold=1
within=timedelta(seconds=90)
表示从某次 heartbeat 开始计时,90 秒内如果没有等到下一次 heartbeat 或最终状态事件,就判定异常。
within=timedelta(seconds=90)
为什么是 90 秒
这个时间窗口应该和 PREFECT_FLOWS_HEARTBEAT_FREQUENCY 一起看。
例如:
- heartbeat 频率是
30s - Automation 的窗口是
90s
那就等价于:
- 连续错过约 3 次 heartbeat 后,run 会被判定为 zombie
这也是官方文档给出的典型例子。
调整触发灵敏度
你主要调两个参数:
PREFECT_FLOWS_HEARTBEAT_FREQUENCYwithin
它们的关系可以简单理解为:
- heartbeat 越频繁,越快发现异常
within越短,越敏感,但越容易误判within越长,越稳妥,但故障发现越慢
常见思路:
| 场景 | 建议 |
|---|---|
| 希望更快发现机器/容器崩溃 | 减小 within,并适度提高 heartbeat 频率 |
| 网络偶发抖动较多 | 增大 within,避免误判 |
| 运行环境较稳定、故障代价高 | 适当提高检测灵敏度 |
实用经验
within 通常可以设置成 heartbeat 周期的 2-3 倍。
实践建议
-
僵尸检测更适合“基础设施失联”,不是“业务变慢”
如果 flow 本身还在跑,只是某一步非常慢,那么 heartbeat 仍然会继续发送,此时它不应该被判为 zombie。
所以这个方案解决的是:
- 进程死了
- worker 所在节点没了
- 容器被回收了
- 运行环境异常退出
而不是:SQL 跑很久、API 很慢、task 执行时间超长等业务异常情况。
后者更适合用:超时控制、SLA / 告警、自定义监控指标等手段来解决。
-
Crashed比Failed更准确zombie 场景通常不是业务代码抛异常,而是运行基础设施出了问题。
因此把状态改成
Crashed会比改成Failed更符合语义,也更方便后续排查。 -
可以在 Automation 里追加更多动作
官方文档明确提到,你可以在检测到 zombie run 时增加其他 action,比如:
- 发送通知
- 发 Slack / 邮件告警
- 触发补救流程
- 记录审计事件
也就是说,把 flow run 改成
Crashed只是第一步,不一定是全部。