3 回答

TA贡献1785条经验 获得超4个赞
trio.current_time()为此使用恕我直言太复杂了。
进行速率限制的最简单方法是速率限制器,即基本上可以执行此操作的单独任务:
async def ratelimit(queue,tick, task_status=trio.TASK_STATUS_IGNORED):
with trio.open_cancel_scope() as scope:
task_status.started(scope)
while True:
await queue.get()
await trio.sleep(tick)
使用示例:
async with trio.open_nursery() as nursery:
q = trio.Queue(0)
limiter = await nursery.start(ratelimit, q, 1)
while whatever:
await q.put(None) # will return at most once per second
do_whatever()
limiter.cancel()
换句话说,您可以使用以下命令启动该任务
q = trio.Queue(0)
limiter = await nursery.start(ratelimit, q, 1)
然后您可以确定最多
await q.put(None)
零长度队列充当集合点,因此每秒将返回。完成后,致电
limiter.cancel()
停止限速任务,否则您的托儿所将不会退出。
如果您的用例包括开始的子任务,您需要在取消限制器之前完成这些子任务,那么最简单的方法是将它们冲洗到另一个托儿所中,而不是
while whatever:
await q.put(None) # will return at most once per second
do_whatever()
limiter.cancel()
你会用类似的东西
async with trio.open_nursery() as inner_nursery:
await start_tasks(inner_nursery, q)
limiter.cancel()
它将在触摸限制器之前等待任务完成。
注意:您可以轻松地将其调整为“突发”模式,即,只需增加队列的长度,就可以在速率限制生效之前允许一定数量的请求。

TA贡献1804条经验 获得超8个赞
此解决方案的动机和由来
自从我问了这个问题以来已经过去了几个月。从那时起,Python得到了改进,三人组(以及我对它们的了解)也有所改进。因此,我认为是时候使用带有类型注释和trio-0.10内存通道的Python 3.6进行一些更新了。
我对原始版本进行了自己的改进,但是在阅读@Roman Novatorov的出色解决方案后,再次进行了调整,这就是结果。对于函数的主要结构(以及使用httpbin.org进行说明的想法)表示敬意。我选择使用内存通道而不是互斥锁,以便能够从工作程序中删除所有令牌重新释放逻辑。
解决方案说明
我可以这样改写原来的问题:
我希望有许多工作人员彼此独立地启动请求(因此,它们将被实现为异步功能)。
在任何时候都释放零或一个令牌;向服务器发起请求的任何工作人员都将消耗一个令牌,并且直到经过最短时间后才会发出下一个令牌。在我的解决方案中,我使用三重奏的内存通道来协调令牌发行者和令牌使用者(工人)之间的关系
如果您不熟悉内存通道及其语法,可以在trio doc中阅读有关它们的信息。我想的逻辑async with memory_channel,并memory_channel.clone()能在第一时刻被混淆。
from typing import List, Iterator
import asks
import trio
asks.init('trio')
links: List[str] = [
'https://httpbin.org/delay/7',
'https://httpbin.org/delay/6',
'https://httpbin.org/delay/4'
] * 3
async def fetch_urls(urls: List[str], number_workers: int, throttle_rate: float):
async def token_issuer(token_sender: trio.abc.SendChannel, number_tokens: int):
async with token_sender:
for _ in range(number_tokens):
await token_sender.send(None)
await trio.sleep(1 / throttle_rate)
async def worker(url_iterator: Iterator, token_receiver: trio.abc.ReceiveChannel):
async with token_receiver:
for url in url_iterator:
await token_receiver.receive()
print(f'[{round(trio.current_time(), 2)}] Start loading link: {url}')
response = await asks.get(url)
# print(f'[{round(trio.current_time(), 2)}] Loaded link: {url}')
responses.append(response)
responses = []
url_iterator = iter(urls)
token_send_channel, token_receive_channel = trio.open_memory_channel(0)
async with trio.open_nursery() as nursery:
async with token_receive_channel:
nursery.start_soon(token_issuer, token_send_channel.clone(), len(urls))
for _ in range(number_workers):
nursery.start_soon(worker, url_iterator, token_receive_channel.clone())
return responses
responses = trio.run(fetch_urls, links, 5, 1.)
日志输出示例:
如您所见,所有页面请求之间的最短时间为一秒:
[177878.99] Start loading link: https://httpbin.org/delay/7
[177879.99] Start loading link: https://httpbin.org/delay/6
[177880.99] Start loading link: https://httpbin.org/delay/4
[177881.99] Start loading link: https://httpbin.org/delay/7
[177882.99] Start loading link: https://httpbin.org/delay/6
[177886.20] Start loading link: https://httpbin.org/delay/4
[177887.20] Start loading link: https://httpbin.org/delay/7
[177888.20] Start loading link: https://httpbin.org/delay/6
[177889.44] Start loading link: https://httpbin.org/delay/4
解决方案评论
由于对于异步代码而言并非不常见,因此该解决方案不会保留请求的URL的原始顺序。解决此问题的一种方法是将id与原始url相关联,例如使用元组结构,将响应放入响应字典中,然后依次抓取响应以将其放入响应列表中(保存排序并具有线性复杂)。
添加回答
举报