Databend UDF的StageLocation支持


                                                                                                                                                <p>Stage 是 Databend 中最常用的“落地/共享”存储形态:无论是从对象存储导入数据,还是把查询结果导出给下游,几乎都要经过 Stage。借助 PR <a href="https://www.oschina.net/action/GoToLink?url=https%3A%2F%2Fgithub.com%2Fdatabendlabs%2Fdatabend%2Fpull%2F18833" target="_blank">feat: External UDF support STAGE_LOCATION param #18833</a>,外部 UDF 现在可以以 <code>STAGE_LOCATION 形式直接获取 Stage 元数据,避免手动拼接/解析路径,让自定义逻辑与对象存储无缝衔接。

设计目标与收益

  • 目标:在 SQL 中直接传入 @stage/path,解析后生成结构化 Stage 元数据,运行时注入到 UDF;STAGE_LOCATION 类型显式声明依赖,便于校验与文档化。
  • 对 Databend Query 的价值:解析 Stage literal,校验仅允许 External Stage 且需独立凭证;在物理计划中剥离 Stage 参数,只通过 databend-stage-mapping header 传递元数据,保持数据列纯净。
  • 对外部 UDF Server 的价值:装饰器 stage_refs 明确 Stage 形参,StageLocation 对象直接提供 storagerelative_pathraw_info 等字段,函数无需自写解析或处理敏感凭证。
  • 安全:拒绝内部 Stage、IAM Role、STS 等高权限路径;非法 Stage 在执行前即被拦截,降低泄露风险。

基本 Stage 使用

Stage 是 Databend 的虚拟存储位置,支持内部和外部对象存储,让数据能方便地进出 Databend。

-- 创建外部 Stage
CREATE STAGE my_external_stage
    URL = 's3://databend-doc'
    CONNECTION = (
        AWS_KEY_ID = '<您的密钥ID>',
        AWS_SECRET_KEY = '<您的密钥>'
    );

LIST @my_external_stage;

在明确文件格式时,可以直接查询 Stage 数据:

CREATE STAGE parquet_query_stage 
URL = 's3://load/parquet/' 
CONNECTION = (
    ACCESS_KEY_ID = '<your-access-key-id>' 
    SECRET_ACCESS_KEY = '<your-secret-access-key>'
);

CREATE FILE FORMAT parquet_query_format TYPE = PARQUET;

SELECT $1
FROM @parquet_query_stage
(
    FILE_FORMAT => 'parquet_query_format',
    PATTERN => '.*[.]parquet'
);

在 UDF 中使用 STAGE_LOCATION

过去需要把 Stage 路径当字符串传给 UDF,再手动解析;现在可以在 SQL 中直接声明 STAGE_LOCATION,在 Python 侧用装饰器列出 Stage 形参即可获取 StageLocation 对象。

@udf(stage_refs=["stage_loc"], input_types=["INT"], result_type="INT")
def gcd(stage: StageLocation, value):
    bucket_cfg = stage.storage      # 已解析的外部存储配置
    working_path = stage.relative_path
    ...

CREATE OR REPLACE FUNCTION gcd (
    stage_loc STAGE_LOCATION,
    a INT,
    b INT
)
RETURNS INT
LANGUAGE python
HANDLER = 'gcd'
ADDRESS = 'http://127.0.0.1:8815';

SELECT gcd(@gcd_stage/input/2024/, 21, 14);

StageLocation 字段示例:

class StageLocation:
    name: str          # SQL 传入的 @path 全量值
    stage_name: str    # Stage 名称
    stage_type: str    # 仅 External Stage 被接受
    storage: Dict[str, Any]     # 已解析的存储配置 (无 IAM Role/STS)
    relative_path: str          # 相对路径部分
    raw_info: Dict[str, Any]    # 完整元数据备查

最终用户体验

  • 单 Stage:
CREATE OR REPLACE FUNCTION gcd (
    STAGE_LOCATION stage_loc,
    INT a,
    INT b
) RETURNS INT
LANGUAGE python
HANDLER = 'gcd'
ADDRESS = 'http://127.0.0.1:8815';

SELECT gcd(@gcd_stage/input/2024/, 21, 14);

  • 多 Stage:
CREATE OR REPLACE FUNCTION process_data (
    STAGE_LOCATION input_stage,
    STAGE_LOCATION output_stage,
    INT batch_id
) RETURNS INT
LANGUAGE python
HANDLER = 'process_data'
ADDRESS = 'http://127.0.0.1:8815';

SELECT process_data(@raw_stage/2024/05/, @result_stage/2024/05/, 42);

  • 行为对照:
    • External Stage + 独立凭证:✅ 正常执行。
    • Internal/User/Legacy Stage:❌ 拒绝,提示“仅支持 External Stage”。
    • External Stage 使用 IAM Role / STS:❌ 拒绝,要求改用独立 AK/SK。
未经允许不得转载:紫竹林-程序员中文网 » Databend UDF的StageLocation支持

评论 抢沙发

  • 昵称 (必填)
  • 邮箱 (必填)
  • 网址
关于我们 免责申明 意见反馈 隐私政策
程序员中文网:公益在线网站,帮助学习者快速成长!
关注微信 技术交流
推荐文章
每天精选资源文章推送
推荐文章
随时随地碎片化学习
推荐文章
发现有趣的