为了账号安全,请及时绑定邮箱和手机立即绑定

asyncio.start_unix_server 和 redis 的套接字错误

asyncio.start_unix_server 和 redis 的套接字错误

潇潇雨雨 2023-11-09 21:29:42
我正在尝试使用 asyncio 和 Unix 域套接字在 Python 中构建一个玩具内存 Redis 服务器。baz我的最小示例只返回每个请求的值:import asyncioclass RedisServer:    def __init__(self):        self.server_address = "/tmp/redis.sock"    async def handle_req(self, reader, writer):        await reader.readline()        writer.write(b"$3\r\nbaz\r\n")        await writer.drain()        writer.close()        await writer.wait_closed()    async def main(self):        server = await asyncio.start_unix_server(self.handle_req, self.server_address)        async with server:            await server.serve_forever()    def run(self):        asyncio.run(self.main())RedisServer().run()当我使用以下脚本使用客户端库测试两个连续的客户端请求时,它可以redis工作:import timeimport redisr = redis.Redis(unix_socket_path="/tmp/redis.sock")r.get("foo")time.sleep(1)r.get("bar")但是,如果我删除time.sleep(1),有时它会起作用,有时第二个请求会失败,并出现以下任一情况:Traceback (most recent call last):  File "/tmp/env/lib/python3.8/site-packages/redis/connection.py", line 706, in send_packed_command    sendall(self._sock, item)  File "/tmp/env/lib/python3.8/site-packages/redis/_compat.py", line 9, in sendall    return sock.sendall(*args, **kwargs)BrokenPipeError: [Errno 32] Broken pipeDuring handling of the above exception, another exception occurred:Traceback (most recent call last):  File "test.py", line 9, in <module>    r.get("bar")  File "/tmp/env/lib/python3.8/site-packages/redis/client.py", line 1606, in get    return self.execute_command('GET', name)  File "/tmp/env/lib/python3.8/site-packages/redis/client.py", line 900, in execute_command    conn.send_command(*args)  File "/tmp/env/lib/python3.8/site-packages/redis/connection.py", line 725, in send_command    self.send_packed_command(self.pack_command(*args),  File "/tmp/env/lib/python3.8/site-packages/redis/connection.py", line 717, in send_packed_command    raise ConnectionError("Error %s while writing to socket. %s." %redis.exceptions.ConnectionError: Error 32 while writing to socket. Broken pipe.看来我的实现缺少客户端库期望的一些关键行为(可能是由于它是异步的)。我缺少什么?
查看完整描述

1 回答

?
拉风的咖菲猫

TA贡献1995条经验 获得超2个赞

write_eof()如果您想在每次请求后关闭套接字,则需要使用

缓冲的写入数据刷新后,关闭流的写入端。

您的代码稍加修改将如下所示:

async def handle_req(self, reader, writer):

    await reader.readline()

    writer.write(b"$3\r\nbaz\r\n")

    await writer.drain()

    writer.write_eof()

    writer.close()

    await writer.wait_closed()

通常,您不会在每次请求后关闭套接字。


以下示例仅用于说明目的,旨在表明套接字不需要关闭。当然,您总是会读取一行,然后根据 Redis 协议解释数据。我们知道这里发送了两个 GET 命令(每行 5 行,包含 2 个元素的数组的指示符,字符串的指示符,字符串值“GET”,以及字符串指示符和相应的值,即键)


async def handle_req(self, reader, writer):

    print("start")

    for i in range(0, 2):

        for x in range(0, 5):

            print(await reader.readline())

        writer.write(b"$3\r\nbaz\r\n")

        await writer.drain()

    writer.write_eof()

    writer.close()

    await writer.wait_closed()

在客户端发送是这样完成的:


print(r.get("foo"))

print(r.get("bar"))

time.sleep(1)

最后一次time.sleep是为了确保客户端不会立即退出。


控制台上的输出是:


start

b'*2\r\n'

b'$3\r\n'

b'GET\r\n'

b'$3\r\n'

b'foo\r\n'

b'*2\r\n'

b'$3\r\n'

b'GET\r\n'

b'$3\r\n'

b'bar\r\n'

请注意,start仅输出一次,这表明我们可以处理多个请求,而不必立即关闭套接字。


查看完整回答
反对 回复 2023-11-09
  • 1 回答
  • 0 关注
  • 86 浏览
慕课专栏
更多

添加回答

举报

0/150
提交
取消
意见反馈 帮助中心 APP下载
官方微信