我正在尝试创建一个需要在多个任务之间传递数据的Flyte工作流程。我查看了文档中的示例之一,但尝试尽可能少地重新创建 blob 传递,但我仍然无法让它工作。这是我的完整工作流程定义(当然,我的实际用例会产生更多数据):from flytekit.sdk.tasks import python_task, outputs, inputsfrom flytekit.sdk.types import Typesfrom flytekit.sdk.workflow import workflow_class, Output, Input@inputs(the_text=Types.String)@outputs(the_blob=Types.Blob)@python_taskdef create_blob(wf_params, the_text, the_blob): fname = "a-file.txt" with open(fname, "w") as f: f.write(the_text) the_blob.set(fname)@inputs(the_blob=Types.Blob)@outputs(the_text=Types.String)@python_taskdef read_blob(wf_params, the_blob, the_text): the_blob.download() with open(the_blob.local_path) as f: the_text.set(f.read())@workflow_classclass PassBlob: input_text = Input(Types.String, required=True, help="The text to write to the file") create = create_blob(the_text=input_text) read = read_blob(the_blob=create.outputs.the_blob) output_text = Output(read.outputs.the_text, sdk_type=Types.String, help="The text read from the file")此工作流部署成功,当我运行它时,会发生以下情况:该create任务成功运行,并说明其输出:the_blob: type: single uri: /4u/fe0c7c6326294497dac9-create-0/9c684e85918080341a14478b5f013ee6在任务之间传递 s 的正确方法是什么Types.Blob?我该如何进行这项工作?
1 回答
www说
TA贡献1775条经验 获得超8个赞
我假设您正在本地沙箱环境中运行它(您正在使用 minio,它是我们在沙箱环境中部署的测试 blob 存储)。您能否分享您用于注册工作流程的 Flytekit.config 文件。
因此,Flyte 根据您的配置方式自动将中间数据存储在存储桶 (S3 / GCS) 中。
前缀设置用于自动将数据上传到配置的存储桶和前缀 https://github.com/lyft/flytesnacks/blob/b980963e48eac4ab7e4a9a3e58b353ad523cee47/cookbook/sandbox.config#L7
v0.7.0 之前的版本 - 使用配置中的分片格式化程序设置 - https://github.com/lyft/flytesnacks/blob/b980963e48eac4ab7e4a9a3e58b353ad523cee47/cookbook/sandbox.config#L14-L17
另请告诉我们您正在运行的 Flyte 版本。请加入 Slack 频道,我可以帮助您入门。抱歉给大家带来的麻烦
添加回答
举报
0/150
提交
取消