跳转至

Tasks

Task 是被 @task 装饰的 Python 函数,表示工作流中的原子工作单元

from prefect import task

@task(log_prints=True)
def my_task():
    print("hello prefect task")

核心能力

Task 的特点可以概括为:

  • 可重试
  • 可缓存
  • 可并发执行
  • 支持超时控制
  • 支持事务语义
  • 自动记录运行状态和元数据
  • 能处理上游 Task 的 future 依赖

Task Run

每执行一次 Task,就会产生一次 Task Run。Task Run 的生命周期如下:

flowchart TD
    A[Pending] -->|🚀 Starting execution| B[Running]
    A -->|📦 Cached result available| C[Completed]
    B -->|✅ Success| C[Completed]
    B -.->|❌ Error occurred| D[Retrying/AwaitingRetry]
    B -.->|💥 Infrastructure failure| E[Crashed]
    D -->|🔄 Retry attempt| B
    D -.->|😵 Retries exhausted| F[Failed]

    style A fill:#f9f,stroke:#333,stroke-width:2px,color:#000
    style B fill:#9ff,stroke:#333,stroke-width:2px,color:#000
    style C fill:#9f9,stroke:#333,stroke-width:2px,color:#000
    style D fill:#ff9,stroke:#333,stroke-width:2px,color:#000
    style E fill:#f66,stroke:#333,stroke-width:2px,color:#fff
    style F fill:#f99,stroke:#333,stroke-width:2px,color:#000

    classDef default fill-opacity:0.9

运行 Task

运行 Task 的三种方式:

  • 直接调用


    result = my_task()
    
    • 会阻塞
    • 直接返回结果
    • 适合简单、顺序执行
  • .submit()


    future = my_task.submit()
    result = future.result()
    
    • 不阻塞,立即返回 PrefectFuture
    • 适合 Flow 内并发执行
    • 结果通常要在同一上下文里获取
  • .delay()


    future = my_task.delay()
    
    • 不阻塞
    • 适合后台任务、异步派发
    • 由独立 Task Worker 执行
    • 不要求在提交它的上下文中等待结果

三种调用方式对比

方式 阻塞? 返回值 可在同一上下
文中获取结果?
适用场景
task() 直接结果 N/A 简单顺序执行
task.submit() PrefectFuture Flow 内并发执行
task.delay() PrefectFuture 在独立基础设施上后台执行

编排模型

客户端侧编排

Prefect 的 Task 编排主要是 客户端侧编排(client-side orchestration)

这意味着:

  • Task Run 的创建和更新主要在本地完成
  • 适合大规模任务执行
  • 网络暂时不稳定时更有韧性
  • UI 中的状态更新可能是“最终一致”,不是每一步都实时同步

状态依赖

Task 会根据数据流自动推断依赖。

自动建立依赖关系

例如,一个 task 使用了上游 task 的结果或 future,Prefect 就会自动建立依赖关系:

  • 上游完成后,下游才能开始

此外,也可以通过 wait_for 显式指定状态依赖。

参数解析

当一个 task 接收参数时,Prefect 会自动做两件事:

  1. 解析参数里的 future / state。 如果参数中包含:

    • PrefectFuture
    • State

    Prefect 会递归遍历参数,把它们解析成实际值

  2. 可能会“重建对象”。 如果参数是复杂对象(例如 dataclass / namedtuple):

    • Prefect 在解析字段后,会用 dataclasses.replace() 创建一个新对象
    • 这个过程会触发 __init__()__post_init__() 方法`
危险情况
from dataclasses import dataclass
from prefect import flow, task

init_count = 0

@dataclass
class Pipeline:
    base_rate: float
    adjusted_rate: float = None

    def __post_init__(self):
        global init_count
        init_count += 1
        if self.adjusted_rate is None:
            self.adjusted_rate = self.base_rate * 1.5

@task
def get_rate() -> float:
    return 100.0

@task(log_prints=True)
def run_pipeline(pipeline: Pipeline):
    print(f"init_count inside task = {init_count}")
    print(f"adjusted_rate = {pipeline.adjusted_rate}")

@flow(log_prints=True)
def my_flow():
    rate_future = get_rate.submit()
    pipeline = Pipeline(base_rate=42.0, adjusted_rate=rate_future)
    print(f"init_count before task = {init_count}")
    run_pipeline(pipeline)

my_flow()

看这个例子核心逻辑:

pipeline = Pipeline(
    base_rate=42.0,
    adjusted_rate=rate_future  # future
)

Step 1:创建对象

adjusted_rate = future
__post_init__ 执行一次

Step 2:传入 task

Prefect 会:

  1. 把 future → 100.0
  2. 因为字段变了 → 触发 dataclasses.replace()
  3. 创建一个新对象

Step 3:对象被重建

__post_init__ 再执行一次 

结果:

  • init_count 被增加两次
  • adjusted_rate 可能被重新计算 / 覆盖

如果 dataclass, 在 __post_init__ 里:

  • 修改字段
  • 做计算
  • 有副作用(计数、日志等)

⚠️ 那就可能被执行两次, 因此 Prefect 提供了 quoteopaque 来避免这个问题。

模式 是否解析 future 是否递归遍历 是否重建对象
默认
quote
opaque

Task 和 Flow 的关系

Task 通常被组织在 Flow 中,用来构成完整工作流。Flow 用于定义整体流程,Task 用于定义具体步骤。

Task 的价值在于:

  • 粒度更细
  • 更容易重试
  • 更适合并发
  • 观察和排障更清晰
  • 可以在多个 flow 中复用

后台任务

后台任务(Background Tasks)是 Prefect 的另一种执行模式,核心就是:

  • 一个进程提交任务
  • 另一个 worker 执行任务

通常通过 .delay() 使用。

适合场景

  • Web 应用里派发耗时任务,不阻塞 HTTP 响应
  • 把任务分发到专门的执行资源池
  • 让任务执行能力独立扩缩容