跳转至

Flows

Flow 是一个被 @flow 装饰的 Python 函数,用来把一组业务逻辑组织成工作流。

from prefect import flow

@flow(log_prints=True)
def my_flow():
    print("hello prefect")

核心能力

相比普通函数,Flow 主要多了这些能力:

  • 自动追踪运行状态
  • 支持参数校验和类型转换
  • 支持失败重试
  • 支持超时控制
  • 支持部署后远程触发
  • 可以调用 task 和其他 flow

Flow Run

每执行一次 flow,就会产生一次 flow run。Flow run 的生命周期如下:

flowchart TD
    A[Scheduled] -->|⏰ Time to run| B[Pending]
    B -->|🚀 Starting execution| C[Running]
    C -->|✅ Success| D[Completed]
    C -.->|❌ Error occurred| E[Failed]
    C -.->|🛑 User intervention| F[Cancelled]
    C -.->|💥 Infrastructure failure| G[Crashed]

    style A fill:#ff9,stroke:#333,stroke-width:2px,color:#000
    style B fill:#f9f,stroke:#333,stroke-width:2px,color:#000
    style C fill:#9ff,stroke:#333,stroke-width:2px,color:#000
    style D fill:#9f9,stroke:#333,stroke-width:2px,color:#000
    style E fill:#f99,stroke:#333,stroke-width:2px,color:#000
    style F fill:#faa,stroke:#333,stroke-width:2px,color:#000
    style G fill:#f66,stroke:#333,stroke-width:2px,color:#fff

    classDef default fill-opacity:0.9

Zombie Flow Run

如果一直没进入最终状态,可能变成 zombie flow run。常见的做法是 Automation + HEARTBEAT 机制来检测, 并进行清理/重试操作。


运行 Flow

最简单的方式就是直接调用:

my_flow()

其他常见方式还有:

  • 外部调度器触发, 例如 cronModal
  • Deployment 触发
  • Schedule 定时触发
  • Automation 事件触发

Flow 的参数

Flow 和普通函数一样可以接收参数:

@flow
def my_flow(x: int, y: str):
    ...

特点:

  • 支持类型注解
  • 支持 Pydantic
  • 参数会在运行前校验
  • 参数无效时,flow 会直接失败
  • 通过 API 调用时,必须传 关键字参数
  • 参数默认大小限制为 512 KB

实践上,大对象更适合传引用,而不是直接传内容。


Flow、Task、Subflow 设计

一个 flow 可以包含全部逻辑,但更推荐拆分。

嵌套 Flow(Subflow)

Flow 可以调用另一个 Flow,这就是 Nested Flow / Subflow。

  • 适合场景


    • 拆分复杂流程为可复用模块
    • 增强可观测性和追踪能力
    • 某段流程需单独参数化
    • 希望某组任务使用不同的 task runner
  • 注意点


    • 子 flow 会在 UI 中独立显示
    • 默认情况下子 flow 会阻塞父 flow,直到子 flow 完成
    • 子 flow 无法单独脱离父 flow 被取消
    • 若需独立取消/管理,建议将子 flow 单独部署
  • Flow 更适合


    • 编排业务流程
    • 部署
    • 参数化调用
    • 与 Prefect 服务端交互
  • Task 更适合


    • 细粒度执行
    • 并发
    • 缓存
    • 重试
    • 事务性语义

推荐分工

  • Flow:负责组织流程
  • Task:负责执行具体步骤
  • Subflow:负责把一组流程模块化

为什么这样拆:

  • 更容易复用
  • 更容易调试
  • 更利于并发和重试
  • 更利于观察运行情况

支持的函数类型

Prefect 的 flow 支持大多数常见 Python 函数形式,包括:

  • 同步函数
  • 异步函数
  • 实例方法
  • 类方法
  • 静态方法
  • 生成器

Generator 注意点

如果在 Flow 里直接 return generator,Prefect 会把它消费掉并转成列表,因为 Flow 的结果需要可序列化。

所以:

  • return generator:会被消费
  • yield generator:不会当作最终结果处理
from prefect import flow

def gen():
    yield from [1, 2, 3]
    print('Generator consumed!')

@flow
def f():
    return gen()

f()  # prints 'Generator consumed!'
from prefect import flow

def gen():
    yield from [1, 2, 3]
    print('Generator consumed!')

@flow
def f():
    yield gen()

generator = next(f())
list(generator) # prints 'Generator consumed!'

最终状态

Flow 的最终状态主要由 返回值和异常 决定。

规则可以简化为:

  • 直接抛异常 → Failed
  • 返回手动创建的 State → 以该状态为准
  • 返回多个 states / futures → 只要有失败项,整体可能失败
  • 正常返回普通对象 → Completed