跳转至

Assets

Asset = 工作流产生的数据或结果。 在 Prefect 中,Asset 用来表示:

  • 数据(文件、表、模型等)
  • 工作流输出
  • 数据之间的依赖关系(lineage)

核心概念

Asset Key

每个 Asset 都由一个唯一的 URI 标识:

  • s3://data-lake/file.csv
  • postgres://db/table

作用:

  • 唯一标识资产
  • 自动分组(按 scheme
  • 支持层级结构(路径)

Asset 的三种状态

  • Materialized:被 workflow 生成或更新
  • Referenced:被 workflow 引用但未生成
  • External:外部已存在,仅作为依赖

Asset 生命周期

Materialization

当 workflow 创建或更新数据时:

from prefect.assets import materialize

@materialize("s3://data/file.csv")
def my_task():
    ...

特点:

  • 类似 Task,但用于声明“我要产出一个数据”
  • Prefect 会记录这次“产出尝试”
  • 成功 / 失败由 Task 状态决定

Reference

当一个 asset 被作为输入使用:

  • Prefect 会自动记录依赖关系
  • 不需要手动声明(通常)

也可以手动指定:

@materialize(
    "s3://output.csv",
    asset_deps=["s3://input.csv"]
)

依赖建模

Prefect 会构建 数据依赖图,方式有两种:

  1. 自动推断(推荐)

    • 基于 Task 输入输出
    • 数据流自动建立依赖
  2. 手动声明

    from prefect.assets import materialize
    
    
    @materialize(
        "s3://warehouse/enriched-data.csv",
        asset_deps=["postgres://db/reference-tables", "s3://external/vendor-data.csv"]
    )
    def enrich_data():
        # Explicitly depends on external database and vendor data
        pass
    

    适合:

    • 外部数据源
    • 无法从代码推断的依赖

Metadata

Asset 可以附带元信息:

  • Name:人类可读的标识符,用于唯一标识该 asset
  • Description:详细文档,支持 Markdown 格式,便于提供丰富的说明与上下文
  • Owners:负责方,Prefect 用户和团队在 UI 中有特殊显示
  • URL:用于访问或查看该 asset 的网页地址

动态 metadata

可以在运行时追加:add_asset_metadata(...)

适合记录:

  • 行数
  • 处理时间
  • 数据质量指标

健康状态

基于最近一次 materialization:

状态 含义
🟢 Green 最近成功
🔴 Red 最近失败
⚪ Gray 没有产出(或仅被引用)

事件系统

Asset 会自动发事件。

事件类型

  1. Materialization 事件, 表示数据是否成功产出

    • prefect.asset.materialization.succeeded
    • prefect.asset.materialization.failed
  2. Reference 事件, 表示这个数据被使用了

    • prefect.asset.referenced

触发规则

触发场景 事件类型 说明
成功产出 prefect.asset.materialization.succeeded asset 成功 materialize
产出失败 prefect.asset.materialization.failed asset materialize 失败
任意产出/使用 (上游 asset)prefect.asset.referenced 只要被依赖就会发 referenced
缓存命中 不发事件 复用已有产物时

事件举例

  • 当一个 asset 成功 materialize,会依次发送:
    1. 对所有上游 asset 发出 referenced 事件
    2. 自身发出 materialization.succeeded
  • 如果产出失败:
    1. 对所有上游 asset 发 referenced
    2. 自身发 materialization.failed
  • 如果是缓存命中,只标记已经引用,不会发 materialization 相关事件

组织方式

Prefect UI 自动组织 assets:

按 URI 分组

s3://
postgres://
snowflake://

按路径层级

s3://data-lake/
    ├── raw/
    └── processed/

支持搜索

基于: - 名称 - 描述 - owner