Tasks
Task 是被 @task 装饰的 Python 函数,表示工作流中的原子工作单元。
from prefect import task
@task(log_prints=True)
def my_task():
print("hello prefect task")
核心能力
Task 的特点可以概括为:
- 可重试
- 可缓存
- 可并发执行
- 支持超时控制
- 支持事务语义
- 自动记录运行状态和元数据
- 能处理上游 Task 的 future 依赖
Task Run
每执行一次 Task,就会产生一次 Task Run。Task Run 的生命周期如下:
flowchart TD
A[Pending] -->|🚀 Starting execution| B[Running]
A -->|📦 Cached result available| C[Completed]
B -->|✅ Success| C[Completed]
B -.->|❌ Error occurred| D[Retrying/AwaitingRetry]
B -.->|💥 Infrastructure failure| E[Crashed]
D -->|🔄 Retry attempt| B
D -.->|😵 Retries exhausted| F[Failed]
style A fill:#f9f,stroke:#333,stroke-width:2px,color:#000
style B fill:#9ff,stroke:#333,stroke-width:2px,color:#000
style C fill:#9f9,stroke:#333,stroke-width:2px,color:#000
style D fill:#ff9,stroke:#333,stroke-width:2px,color:#000
style E fill:#f66,stroke:#333,stroke-width:2px,color:#fff
style F fill:#f99,stroke:#333,stroke-width:2px,color:#000
classDef default fill-opacity:0.9
运行 Task
运行 Task 的三种方式:
-
直接调用
result = my_task()- 会阻塞
- 直接返回结果
- 适合简单、顺序执行
-
.submit()
future = my_task.submit() result = future.result()- 不阻塞,立即返回 PrefectFuture
- 适合 Flow 内并发执行
- 结果通常要在同一上下文里获取
-
.delay()
future = my_task.delay()- 不阻塞
- 适合后台任务、异步派发
- 由独立 Task Worker 执行
- 不要求在提交它的上下文中等待结果
三种调用方式对比
| 方式 | 阻塞? | 返回值 | 可在同一上下 文中获取结果? |
适用场景 |
|---|---|---|---|---|
task() |
是 | 直接结果 | N/A | 简单顺序执行 |
task.submit() |
否 | PrefectFuture |
是 | Flow 内并发执行 |
task.delay() |
否 | PrefectFuture |
否 | 在独立基础设施上后台执行 |
编排模型
客户端侧编排
Prefect 的 Task 编排主要是 客户端侧编排(client-side orchestration)。
这意味着:
- Task Run 的创建和更新主要在本地完成
- 适合大规模任务执行
- 网络暂时不稳定时更有韧性
- UI 中的状态更新可能是“最终一致”,不是每一步都实时同步
状态依赖
Task 会根据数据流自动推断依赖。
自动建立依赖关系
例如,一个 task 使用了上游 task 的结果或 future,Prefect 就会自动建立依赖关系:
- 上游完成后,下游才能开始
此外,也可以通过 wait_for 显式指定状态依赖。
参数解析
当一个 task 接收参数时,Prefect 会自动做两件事:
-
解析参数里的 future / state。 如果参数中包含:
PrefectFutureState
Prefect 会递归遍历参数,把它们解析成实际值。
-
可能会“重建对象”。 如果参数是复杂对象(例如
dataclass/namedtuple):- Prefect 在解析字段后,会用
dataclasses.replace()创建一个新对象 - 这个过程会触发
__init__()和__post_init__()方法`
- Prefect 在解析字段后,会用
危险情况
from dataclasses import dataclass
from prefect import flow, task
init_count = 0
@dataclass
class Pipeline:
base_rate: float
adjusted_rate: float = None
def __post_init__(self):
global init_count
init_count += 1
if self.adjusted_rate is None:
self.adjusted_rate = self.base_rate * 1.5
@task
def get_rate() -> float:
return 100.0
@task(log_prints=True)
def run_pipeline(pipeline: Pipeline):
print(f"init_count inside task = {init_count}")
print(f"adjusted_rate = {pipeline.adjusted_rate}")
@flow(log_prints=True)
def my_flow():
rate_future = get_rate.submit()
pipeline = Pipeline(base_rate=42.0, adjusted_rate=rate_future)
print(f"init_count before task = {init_count}")
run_pipeline(pipeline)
my_flow()
看这个例子核心逻辑:
pipeline = Pipeline(
base_rate=42.0,
adjusted_rate=rate_future # future
)
Step 1:创建对象
adjusted_rate = future
__post_init__ 执行一次
Step 2:传入 task
Prefect 会:
- 把 future → 100.0
- 因为字段变了 → 触发
dataclasses.replace() - 创建一个新对象
Step 3:对象被重建
__post_init__ 再执行一次 ❗
结果:
init_count被增加两次adjusted_rate可能被重新计算 / 覆盖
如果 dataclass, 在 __post_init__ 里:
- 修改字段
- 做计算
- 有副作用(计数、日志等)
⚠️ 那就可能被执行两次, 因此 Prefect 提供了 quote 和 opaque 来避免这个问题。
| 模式 | 是否解析 future | 是否递归遍历 | 是否重建对象 |
|---|---|---|---|
| 默认 | ✅ | ✅ | ✅ |
| quote | ❌ | ❌ | ❌ |
| opaque | ✅ | ❌ | ❌ |
Task 和 Flow 的关系
Task 通常被组织在 Flow 中,用来构成完整工作流。Flow 用于定义整体流程,Task 用于定义具体步骤。
Task 的价值在于:
- 粒度更细
- 更容易重试
- 更适合并发
- 观察和排障更清晰
- 可以在多个 flow 中复用
后台任务
后台任务(Background Tasks)是 Prefect 的另一种执行模式,核心就是:
- 一个进程提交任务
- 另一个 worker 执行任务
通常通过 .delay() 使用。
适合场景
- Web 应用里派发耗时任务,不阻塞 HTTP 响应
- 把任务分发到专门的执行资源池
- 让任务执行能力独立扩缩容