1 回答
TA贡献1830条经验 获得超9个赞
这里有两件事:
1.) 通用状态处理程序:这些可以通过state_handlerskwarg 设置,并在每次状态更改时调用。状态处理程序需要具有签名state_handler(task: Task, old_state: State, new_state: State) -> Optional[State](这是您正在使用的签名);调用此处理程序后的任务状态将是处理程序返回的状态,或者返回new_stateif None。
2.) 关于失败回调:on_failure您在此处使用的 kwarg 旨在成为状态处理程序的便利 API;传递给此关键字的函数必须具有签名,并且仅在此任务进入状态fn(task: Task, state: State) -> None时才会被调用。Failed请注意,失败回调不能像状态处理程序那样改变任务的状态。
在您的示例中,您似乎混合了两个关键字参数。我相信下面的代码会做你所期望的:
from prefect.engine.state import Success
def handle_disambig_error(task, old_state, new_state):
if new_state.is_failed():
return_state = Success(result=pd.DataFrame())
else:
return_state = new_state
return return_state
@task(state_handlers=[handle_disambig_error])
def get_wiki_resource():
return df
添加回答
举报