为了账号安全,请及时绑定邮箱和手机立即绑定

为什么 pyzmq 订阅者与 asyncio 的行为不同?

为什么 pyzmq 订阅者与 asyncio 的行为不同?

慕慕森 2023-10-11 09:59:14
我有一个 XPUB/XSUB 设备和多个模拟发布者在一个进程中运行。在一个单独的过程中,我想连接订阅者并将收到的消息打印到终端。下面我将展示一个简单函数的两个变体来实现这一点。我将这些函数包装为命令行实用程序。我的问题是 asyncio 变体永远不会接收消息。另一方面,非异步变体工作得很好。我已经测试了 ipc 和 tcp 传输的所有情况。在我的测试中,发布过程从未改变,除非我重新启动它以更改传输。这些消息是短字符串,大约每秒发布一次,因此我们不考虑性能问题。用户程序无限期地在线msg = await sock.receive_multipart()。在 XPUB/XSUB 设备中,我有显示消息转发的仪器sock.setsockopt(zmq.SUBSCRIBE, channel.encode()),与非异步变体连接时相同。asyncio 变体(不起作用,如所述)def subs(url, channel):    import asyncio    import zmq    import zmq.asyncio    ctx = zmq.asyncio.Context.instance()    sock = ctx.socket(zmq.SUB)    sock.connect(url)    sock.setsockopt(zmq.SUBSCRIBE, channel.encode())    async def task():        while True:            msg = await sock.recv_multipart()            print(' | '.join(m.decode() for m in msg))    try:        asyncio.run(task())    finally:        sock.setsockopt(zmq.LINGER, 0)        sock.close()常规阻塞变体(工作正常)def subs(url, channel):    import zmq    ctx = zmq.Context.instance()    sock = ctx.socket(zmq.SUB)    sock.connect(url)    sock.setsockopt(zmq.SUBSCRIBE, channel.encode())    def task():        while True:            msg = sock.recv_multipart()            print(' | '.join(m.decode() for m in msg))    try:        task()    finally:        sock.setsockopt(zmq.LINGER, 0)        sock.close()对于这个特定的工具,不需要使用 asyncio。但是,我在代码的其他地方也遇到了这个问题,异步接收永远不会收到。因此,我希望通过在这个简单的案例中弄清楚它,我将了解一般情况下出了什么问题。我的版本是import zmqzmq.zmq_version()  # '4.3.2'zmq.__version__  # '19.0.2'我使用的是 MacOS 10.13.6。我完全没有主意了。互联网,请帮忙!
查看完整描述

1 回答

?
catspeake

TA贡献1111条经验 获得超0个赞

一个有效的异步变体是


def subs(url, channel):

    import asyncio


    import zmq

    import zmq.asyncio


    ctx = zmq.asyncio.Context.instance()


    async def task():

        sock = ctx.socket(zmq.SUB)

        sock.connect(url)

        sock.setsockopt(zmq.SUBSCRIBE, channel.encode())


        try:

            while True:

                msg = await sock.recv_multipart()

                print(' | '.join(m.decode() for m in msg))

        finally:

            sock.setsockopt(zmq.LINGER, 0)

            sock.close()


    asyncio.run(task())

我的结论是,当使用 asyncio zmq 时,必须通过在等待套接字的事件循环上运行的调用来创建套接字。尽管原始形式没有对事件循环做任何花哨的事情,但套接字的事件循环似乎与asyncio.run. 我不知道为什么,我没有用 pyzmq 提出问题,因为他们的文档显示了这个答案中的用法,没有评论。


编辑回应评论:


asyncio.run总是创建一个新的事件循环,因此可能为传递到的协同例程之外实例化的套接字创建的循环asyncio.run(如原始问题中的 asyncio 变体)明显不同。


查看完整回答
反对 回复 2023-10-11
  • 1 回答
  • 0 关注
  • 88 浏览
慕课专栏
更多

添加回答

举报

0/150
提交
取消
意见反馈 帮助中心 APP下载
官方微信