您将如何调用带有参数的函数(在我的代码中称为 func)作为 event_loop 的一部分?考虑到这些函数应该在服务器传入请求时多次调用。async def my_func_1(json_message: dict, writer: StreamWriter) -> bool: return Trueasync def my_func_2(json_message: dict, writer: StreamWriter) -> bool: return Trueasync def my_func_3(json_message: dict, writer: StreamWriter) -> bool: return Trueswitcher = { "Forward": my_func_1, "Backward": my_func_2, "Up": my_func_3}async def dispatcher(reader: StreamReader, writer: StreamWriter): try: msg = await reader.readline() message = ujson.decode(msg.decode()) except Exception: print("unable to parse json from read stream:" + str(msg.decode())) if "method" in message: func = switcher.get(message['method'], UnknownMethod) # how would you invoke func with arguments as part of the event_loop? # considering these functions should be invoked multiple times.def init(): loop = asyncio.get_event_loop() coro = asyncio.start_server(dispatcher, '127.0.0.1', 666, loop=loop) server = loop.run_until_complete(coro) try: loop.run_forever() except KeyboardInterrupt: pass # Close the server server.close() loop.run_until_complete(server.wait_closed()) loop.close()if __name__ == '__main__': init()感谢所有的帮助
1 回答
慕工程0101907
TA贡献1887条经验 获得超5个赞
根据我对您问题的理解,答案是简单地使用所需参数调用该函数,就像在任何其他上下文中一样。
# ...
if "method" in message:
func = switcher.get(message['method'], UnknownMethod)
await func(message, writer)
# ...
添加回答
举报
0/150
提交
取消