1 回答
TA贡献1779条经验 获得超6个赞
如果您尝试借助input(key_press示例中的函数)模拟USB数据流,则必须使用multithreading模块,因为它input是一个阻塞函数,它将停止asyncio线程中的循环工作。要asyncio与input函数结合,您必须使用另一个线程,请查找以下示例:
import asyncio
import threading
async def do_work(i):
"""take input key and do some work with the input"""
await asyncio.sleep(5)
print(i)
async def main_thread_loop_work(_key):
"""simulate multiple tasks caused by key input"""
t1 = asyncio.create_task(do_work(_key))
t2 = asyncio.create_task(do_work(_key))
t3 = asyncio.create_task(do_work(_key))
await t1
await t2
await t3
def thead_worker(_key):
"""target function for another thread"""
asyncio.run(main_thread_loop_work(_key))
if __name__ == '__main__':
while True:
some_key = input("Please provide any key: ")
th = threading.Thread(target=thead_worker, args=(some_key, ))
th.start()
添加回答
举报