为了账号安全,请及时绑定邮箱和手机立即绑定

将python-trio中的信号量和时间限制与asks HTTP请求相结合

将python-trio中的信号量和时间限制与asks HTTP请求相结合

繁花如伊 2021-03-31 12:52:32
我正在尝试以异步方式使用Python,以便加快对服务器的请求。服务器的响应时间很慢(通常为几秒钟,但有时也快于一秒钟),但并行运行良好。我无权访问此服务器,也无法更改任何内容。因此,我有一个很大的URL列表(在下面的代码中pages),这些列表是我事先知道的,并且希望通过一次发出NO_TASKS=5请求来加快它们的加载速度。另一方面,我不想使服务器超载,因此我希望每个请求之间的最小间隔为1秒(即每秒1个请求的限制)。到目前为止,我已经使用Trio队列成功实现了信号量部分(一次五个请求)。import asksimport timeimport trioNO_TASKS = 5asks.init('trio')asks_session = asks.Session()queue = trio.Queue(NO_TASKS)next_request_at = 0results = []pages = [    'https://www.yahoo.com/',    'http://www.cnn.com',    'http://www.python.org',    'http://www.jython.org',    'http://www.pypy.org',    'http://www.perl.org',    'http://www.cisco.com',    'http://www.facebook.com',    'http://www.twitter.com',    'http://www.macrumors.com/',    'http://arstechnica.com/',    'http://www.reuters.com/',    'http://abcnews.go.com/',    'http://www.cnbc.com/',]async def async_load_page(url):    global next_request_at    sleep = next_request_at    next_request_at = max(trio.current_time() + 1, next_request_at)    await trio.sleep_until(sleep)    next_request_at = max(trio.current_time() + 1, next_request_at)    print('start loading page {} at {} seconds'.format(url, trio.current_time()))    req = await asks_session.get(url)    results.append(req.text)async def producer(url):    await queue.put(url)  async def consumer():    while True:        if queue.empty():            print('queue empty')            return        url = await queue.get()        await async_load_page(url)async def main():    async with trio.open_nursery() as nursery:        for page in pages:            nursery.start_soon(producer, page)        await trio.sleep(0.2)        for _ in range(NO_TASKS):            nursery.start_soon(consumer)start = time.time()trio.run(main)但是,我缺少限制部分的实现,即max。的实现。每秒1个请求。您可以在我尝试这样做的上方看到(的前五行async_load_page)
查看完整描述

3 回答

?
九州编程

TA贡献1785条经验 获得超4个赞

trio.current_time()为此使用恕我直言太复杂了。


进行速率限制的最简单方法是速率限制器,即基本上可以执行此操作的单独任务:


async def ratelimit(queue,tick, task_status=trio.TASK_STATUS_IGNORED):

    with trio.open_cancel_scope() as scope:

        task_status.started(scope)

        while True:

            await queue.get()

            await trio.sleep(tick)

使用示例:


async with trio.open_nursery() as nursery:

    q = trio.Queue(0)

    limiter = await nursery.start(ratelimit, q, 1)

    while whatever:

        await q.put(None) # will return at most once per second

        do_whatever()

    limiter.cancel()

换句话说,您可以使用以下命令启动该任务


q = trio.Queue(0)

limiter = await nursery.start(ratelimit, q, 1)

然后您可以确定最多


await q.put(None)

零长度队列充当集合点,因此每秒将返回。完成后,致电


 limiter.cancel()

停止限速任务,否则您的托儿所将不会退出。


如果您的用例包括开始的子任务,您需要在取消限制器之前完成这些子任务,那么最简单的方法是将它们冲洗到另一个托儿所中,而不是


while whatever:

    await q.put(None) # will return at most once per second

    do_whatever()

limiter.cancel()

你会用类似的东西


async with trio.open_nursery() as inner_nursery:

    await start_tasks(inner_nursery, q)

limiter.cancel()

它将在触摸限制器之前等待任务完成。


注意:您可以轻松地将其调整为“突发”模式,即,只需增加队列的长度,就可以在速率限制生效之前允许一定数量的请求。


查看完整回答
反对 回复 2021-04-27
?
胡说叔叔

TA贡献1804条经验 获得超8个赞

此解决方案的动机和由来

自从我问了这个问题以来已经过去了几个月。从那时起,Python得到了改进,三人组(以及我对它们的了解)也有所改进。因此,我认为是时候使用带有类型注释和trio-0.10内存通道的Python 3.6进行一些更新了。


我对原始版本进行了自己的改进,但是在阅读@Roman Novatorov的出色解决方案后,再次进行了调整,这就是结果。对于函数的主要结构(以及使用httpbin.org进行说明的想法)表示敬意。我选择使用内存通道而不是互斥锁,以便能够从工作程序中删除所有令牌重新释放逻辑。


解决方案说明

我可以这样改写原来的问题:


我希望有许多工作人员彼此独立地启动请求(因此,它们将被实现为异步功能)。

在任何时候都释放零或一个令牌;向服务器发起请求的任何工作人员都将消耗一个令牌,并且直到经过最短时间后才会发出下一个令牌。在我的解决方案中,我使用三重奏的内存通道来协调令牌发行者和令牌使用者(工人)之间的关系

如果您不熟悉内存通道及其语法,可以在trio doc中阅读有关它们的信息。我想的逻辑async with memory_channel,并memory_channel.clone()能在第一时刻被混淆。


from typing import List, Iterator


import asks

import trio


asks.init('trio')


links: List[str] = [

    'https://httpbin.org/delay/7',

    'https://httpbin.org/delay/6',

    'https://httpbin.org/delay/4'

] * 3



async def fetch_urls(urls: List[str], number_workers: int, throttle_rate: float):


    async def token_issuer(token_sender: trio.abc.SendChannel, number_tokens: int):

        async with token_sender:

            for _ in range(number_tokens):

                await token_sender.send(None)

                await trio.sleep(1 / throttle_rate)


    async def worker(url_iterator: Iterator, token_receiver: trio.abc.ReceiveChannel):

        async with token_receiver:

            for url in url_iterator:

                await token_receiver.receive()


                print(f'[{round(trio.current_time(), 2)}] Start loading link: {url}')

                response = await asks.get(url)

                # print(f'[{round(trio.current_time(), 2)}] Loaded link: {url}')

                responses.append(response)


    responses = []

    url_iterator = iter(urls)

    token_send_channel, token_receive_channel = trio.open_memory_channel(0)


    async with trio.open_nursery() as nursery:

        async with token_receive_channel:

            nursery.start_soon(token_issuer, token_send_channel.clone(), len(urls))

            for _ in range(number_workers):

                nursery.start_soon(worker, url_iterator, token_receive_channel.clone())


    return responses


responses = trio.run(fetch_urls, links, 5, 1.)

日志输出示例:

如您所见,所有页面请求之间的最短时间为一秒:


[177878.99] Start loading link: https://httpbin.org/delay/7

[177879.99] Start loading link: https://httpbin.org/delay/6

[177880.99] Start loading link: https://httpbin.org/delay/4

[177881.99] Start loading link: https://httpbin.org/delay/7

[177882.99] Start loading link: https://httpbin.org/delay/6

[177886.20] Start loading link: https://httpbin.org/delay/4

[177887.20] Start loading link: https://httpbin.org/delay/7

[177888.20] Start loading link: https://httpbin.org/delay/6

[177889.44] Start loading link: https://httpbin.org/delay/4

解决方案评论

由于对于异步代码而言并非不常见,因此该解决方案不会保留请求的URL的原始顺序。解决此问题的一种方法是将id与原始url相关联,例如使用元组结构,将响应放入响应字典中,然后依次抓取响应以将其放入响应列表中(保存排序并具有线性复杂)。


查看完整回答
反对 回复 2021-04-27
  • 3 回答
  • 0 关注
  • 210 浏览
慕课专栏
更多

添加回答

举报

0/150
提交
取消
微信客服

购课补贴
联系客服咨询优惠详情

帮助反馈 APP下载

慕课网APP
您的移动学习伙伴

公众号

扫描二维码
关注慕课网微信公众号