为了账号安全,请及时绑定邮箱和手机立即绑定

如何处理完美的任务失败并使用 on_failure 参数返回 SUCCESS?

如何处理完美的任务失败并使用 on_failure 参数返回 SUCCESS?

四季花海 2022-07-12 18:04:33
我有一个Flowin prefectwhich 和 atask的输出是 a dataframe。在下面提供的示例中,它总是失败。我希望task返回一个空dataframe的状态为SUCCESSusing @task(on_failure=handle_task_fail)。实现此目的的正确语法是什么?from pprint import pprintimport pandas as pdfrom prefect import Flow, taskfrom prefect.engine.signals import SUCCESSdef handle_disambig_error(task, old_state, new_state):    if new_state.is_failed():        new_state.result["wiki_df"] = pd.DataFrame()        # Is this needed?        #set state to SUCCESS    return new_state@task(on_failure=handle_disambig_error)def get_wiki_resource():    wiki_df = pd.DataFrame(        {            "a":[1],            "b":[1/0]        }    )    return wiki_dfwith Flow("Always Fail") as flow:    wiki_df = get_wiki_resource()state = flow.run()task_state = state.result[wiki_df]pprint(task_state.result)追溯:Traceback (most recent call last):  File "/miniconda3/lib/python3.7/site-packages/prefect/engine/runner.py", line 161, in handle_state_change    new_state = self.call_runner_target_handlers(old_state, new_state)  File "/miniconda3/lib/python3.7/site-packages/prefect/engine/task_runner.py", line 120, in call_runner_target_handlers    new_state = handler(self.task, old_state, new_state) or new_state  File "/miniconda3/lib/python3.7/site-packages/prefect/utilities/notifications.py", line 69, in state_handler    fn(obj, new_state)TypeError: handle_disambig_error() missing 1 required positional argument: 'new_state'[2020-01-28 17:39:41,759] INFO - prefect.TaskRunner | Task 'get_wiki_resource': finished task run for task with final state: 'Failed'[2020-01-28 17:39:41,762] INFO - prefect.FlowRunner | Flow run FAILED: some reference tasks failed.
查看完整描述

1 回答

?
慕标琳琳

TA贡献1830条经验 获得超9个赞

这里有两件事:


1.) 通用状态处理程序:这些可以通过state_handlerskwarg 设置,并在每次状态更改时调用。状态处理程序需要具有签名state_handler(task: Task, old_state: State, new_state: State) -> Optional[State](这是您正在使用的签名);调用此处理程序后的任务状态将是处理程序返回的状态,或者返回new_stateif None。


2.) 关于失败回调:on_failure您在此处使用的 kwarg 旨在成为状态处理程序的便利 API;传递给此关键字的函数必须具有签名,并且仅在此任务进入状态fn(task: Task, state: State) -> None时才会被调用。Failed请注意,失败回调不能像状态处理程序那样改变任务的状态。


在您的示例中,您似乎混合了两个关键字参数。我相信下面的代码会做你所期望的:


from prefect.engine.state import Success



def handle_disambig_error(task, old_state, new_state):

    if new_state.is_failed():

        return_state = Success(result=pd.DataFrame())

    else:

        return_state = new_state

    return return_state


@task(state_handlers=[handle_disambig_error])

def get_wiki_resource():

   return df


查看完整回答
反对 回复 2022-07-12
  • 1 回答
  • 0 关注
  • 107 浏览
慕课专栏
更多

添加回答

举报

0/150
提交
取消
意见反馈 帮助中心 APP下载
官方微信