Flows
Flow 是一个被 @flow 装饰的 Python 函数,用来把一组业务逻辑组织成工作流。
from prefect import flow
@flow(log_prints=True)
def my_flow():
print("hello prefect")
核心能力
相比普通函数,Flow 主要多了这些能力:
- 自动追踪运行状态
- 支持参数校验和类型转换
- 支持失败重试
- 支持超时控制
- 支持部署后远程触发
- 可以调用 task 和其他 flow
Flow Run
每执行一次 flow,就会产生一次 flow run。Flow run 的生命周期如下:
flowchart TD
A[Scheduled] -->|⏰ Time to run| B[Pending]
B -->|🚀 Starting execution| C[Running]
C -->|✅ Success| D[Completed]
C -.->|❌ Error occurred| E[Failed]
C -.->|🛑 User intervention| F[Cancelled]
C -.->|💥 Infrastructure failure| G[Crashed]
style A fill:#ff9,stroke:#333,stroke-width:2px,color:#000
style B fill:#f9f,stroke:#333,stroke-width:2px,color:#000
style C fill:#9ff,stroke:#333,stroke-width:2px,color:#000
style D fill:#9f9,stroke:#333,stroke-width:2px,color:#000
style E fill:#f99,stroke:#333,stroke-width:2px,color:#000
style F fill:#faa,stroke:#333,stroke-width:2px,color:#000
style G fill:#f66,stroke:#333,stroke-width:2px,color:#fff
classDef default fill-opacity:0.9
Zombie Flow Run
如果一直没进入最终状态,可能变成 zombie flow run。常见的做法是 Automation + HEARTBEAT 机制来检测, 并进行清理/重试操作。
运行 Flow
最简单的方式就是直接调用:
my_flow()
其他常见方式还有:
- 外部调度器触发, 例如
cron、 Modal - Deployment 触发
- Schedule 定时触发
- Automation 事件触发
Flow 的参数
Flow 和普通函数一样可以接收参数:
@flow
def my_flow(x: int, y: str):
...
特点:
- 支持类型注解
- 支持 Pydantic
- 参数会在运行前校验
- 参数无效时,flow 会直接失败
- 通过 API 调用时,必须传 关键字参数
- 参数默认大小限制为 512 KB
实践上,大对象更适合传引用,而不是直接传内容。
Flow、Task、Subflow 设计
一个 flow 可以包含全部逻辑,但更推荐拆分。
嵌套 Flow(Subflow)
Flow 可以调用另一个 Flow,这就是 Nested Flow / Subflow。
-
适合场景
- 拆分复杂流程为可复用模块
- 增强可观测性和追踪能力
- 某段流程需单独参数化
- 希望某组任务使用不同的 task runner
-
注意点
- 子 flow 会在 UI 中独立显示
- 默认情况下子 flow 会阻塞父 flow,直到子 flow 完成
- 子 flow 无法单独脱离父 flow 被取消
- 若需独立取消/管理,建议将子 flow 单独部署
-
Flow 更适合
- 编排业务流程
- 部署
- 参数化调用
- 与 Prefect 服务端交互
-
Task 更适合
- 细粒度执行
- 并发
- 缓存
- 重试
- 事务性语义
推荐分工
- Flow:负责组织流程
- Task:负责执行具体步骤
- Subflow:负责把一组流程模块化
为什么这样拆:
- 更容易复用
- 更容易调试
- 更利于并发和重试
- 更利于观察运行情况
支持的函数类型
Prefect 的 flow 支持大多数常见 Python 函数形式,包括:
- 同步函数
- 异步函数
- 实例方法
- 类方法
- 静态方法
- 生成器
Generator 注意点
如果在 Flow 里直接 return generator,Prefect 会把它消费掉并转成列表,因为 Flow 的结果需要可序列化。
所以:
- return generator:会被消费
- yield generator:不会当作最终结果处理
from prefect import flow
def gen():
yield from [1, 2, 3]
print('Generator consumed!')
@flow
def f():
return gen()
f() # prints 'Generator consumed!'
from prefect import flow
def gen():
yield from [1, 2, 3]
print('Generator consumed!')
@flow
def f():
yield gen()
generator = next(f())
list(generator) # prints 'Generator consumed!'
最终状态
Flow 的最终状态主要由 返回值和异常 决定。
规则可以简化为:
- 直接抛异常 → Failed
- 返回手动创建的 State → 以该状态为准
- 返回多个 states / futures → 只要有失败项,整体可能失败
- 正常返回普通对象 → Completed