为了账号安全,请及时绑定邮箱和手机立即绑定

将 python asyncio 与 pyee 事件发射器相结合

将 python asyncio 与 pyee 事件发射器相结合

慕桂英3389331 2023-03-01 15:46:14
我正在尝试使用pyee 库AsyncIOEventEmitter中的,但没有成功。由于某种原因,发出的事件“Hi”永远不会到达完成 asyncio 的未来。我也没有在网上找到合适的例子。此外,我尝试提供当前事件并为 ,使用新的事件循环,但两者产生相同的结果。async_handlerAsyncIOEventEmitter有人可以帮我吗?下面的示例单元测试:import asyncioimport loggingimport pytestfrom pyee import AsyncIOEventEmitterLOG = logging.getLogger(__name__)@pytest.mark.asyncioasync def test_setup(event_loop):    LOG.info("1 - start")    event_emitter = AsyncIOEventEmitter(asyncio.new_event_loop())    # Create a new Future object.    future_result = event_loop.create_future()    LOG.info("2 - emit event")    event_emitter.emit("event", "Hi")    @event_emitter.on("event")    async def async_handler(message):        LOG.info(">>> %s", message)        future_result.set_result(message)        return future_result    # Wait until *future_result* has a result and print it.    LOG.info(await future_result)
查看完整描述

1 回答

?
胡说叔叔

TA贡献1804条经验 获得超8个赞

好的,明白了,该async_handler方法必须在测试的早期定义...


这现在有效:


"""Event emitter playground"""

import asyncio

import logging

import pytest

from pyee import AsyncIOEventEmitter


LOG = logging.getLogger(__name__)



@pytest.mark.asyncio

async def test_setup(event_loop):

    """Receive event from emitter and complete future!"""

    LOG.info("1 - start")

    event_emitter = AsyncIOEventEmitter(asyncio.new_event_loop())


    @event_emitter.on("event")

    def async_handler(message):

        LOG.info(">>> %s", message)

        future_result.set_result(message)


    future_result = event_loop.create_future()

    LOG.info("2 - emit event")

    event_emitter.emit("event", "Hi")


    LOG.info(await future_result)


查看完整回答
反对 回复 2023-03-01
  • 1 回答
  • 0 关注
  • 369 浏览
慕课专栏
更多

添加回答

举报

0/150
提交
取消
意见反馈 帮助中心 APP下载
官方微信