3 回答
TA贡献2016条经验 获得超9个赞
这里是最简单的算法,如果您只想在消息到达太快时丢弃它们(而不是排队它们,这是有道理的,因为队列可能会变得任意大):
rate = 5.0; // unit: messages
per = 8.0; // unit: seconds
allowance = rate; // unit: messages
last_check = now(); // floating-point, e.g. usec accuracy. Unit: seconds
when (message_received):
current = now();
time_passed = current - last_check;
last_check = current;
allowance += time_passed * (rate / per);
if (allowance > rate):
allowance = rate; // throttle
if (allowance < 1.0):
discard_message();
else:
forward_message();
allowance -= 1.0;
在这个解决方案中没有数据结构,定时器等,它干净利落地工作:)为了看到这一点,'allowance'最多以每秒5/8单位的速度增长,即每8秒最多5个单位。转发的每条消息都会扣除一个单元,因此每八秒钟不能发送五条以上的消息。
请注意,rate应该是一个整数,即没有非零小数部分,否则算法将无法正常工作(实际速率不会rate/per)。例如rate=0.5; per=1.0;不起作用,因为allowance永远不会增长到1.0。但rate=1.0; per=2.0;工作正常。
TA贡献1871条经验 获得超13个赞
在排队的函数之前使用此装饰器@RateLimited(ratepersec)。
基本上,这会检查自上次以来是否已经过1 /速率秒,如果没有,则等待剩余的时间,否则它不会等待。这实际上限制了你的速率/秒。装饰器可以应用于您想要的速率限制的任何功能。
在您的情况下,如果您希望每8秒最多发送5条消息,请在sendToQueue函数之前使用@RateLimited(0.625)。
import timedef RateLimited(maxPerSecond): minInterval = 1.0 / float(maxPerSecond) def decorate(func): lastTimeCalled = [0.0] def rateLimitedFunction(*args,**kargs): elapsed = time.clock() - lastTimeCalled[0] leftToWait = minInterval - elapsed if leftToWait>0: time.sleep(leftToWait) ret = func(*args,**kargs) lastTimeCalled[0] = time.clock() return ret return rateLimitedFunction return decorate@RateLimited(2) # 2 per second at mostdef PrintNumber(num): print numif __name__ == "__main__": print "This should print 1,2,3... at about 2 per second." for i in range(1,100): PrintNumber(i)
TA贡献1786条经验 获得超11个赞
令牌桶实现起来相当简单。
从带有5个令牌的桶开始。
每5/8秒:如果存储桶少于5个令牌,请添加一个。
每次要发送消息时:如果存储桶有≥1个令牌,请取出一个令牌并发送消息。否则,等待/删除消息/等等。
(显然,在实际代码中,你使用整数计数器而不是真实的标记,你可以通过存储时间戳来优化每5/8步骤)
再次阅读问题,如果速率限制每8秒完全重置一次,那么这里是一个修改:
以时间戳开始,last_send
很久以前(例如,在纪元)。此外,从相同的5令牌桶开始。
每5/8秒执行一次规则。
每次发送消息时:首先,检查是否last_send
≥8秒前。如果是这样,请填充桶(将其设置为5个令牌)。其次,如果存储桶中有令牌,则发送消息(否则,丢弃/等待/等)。第三,设置last_send
为现在。
这应该适用于那种情况。
我实际上是用这样的策略编写了一个IRC机器人(第一种方法)。它在Perl中,而不是Python,但这里有一些代码来说明:
这里的第一部分处理向桶添加令牌。您可以看到基于时间(第2行到最后一行)添加令牌的优化,然后最后一行将桶内容限制为最大值(MESSAGE_BURST)
my $start_time = time; ... # Bucket handling my $bucket = $conn->{fujiko_limit_bucket}; my $lasttx = $conn->{fujiko_limit_lasttx}; $bucket += ($start_time-$lasttx)/MESSAGE_INTERVAL; ($bucket <= MESSAGE_BURST) or $bucket = MESSAGE_BURST;
$ conn是一种传递的数据结构。这是在一个常规运行的方法中(它计算下次有什么事情要做,并且长时间睡眠或直到它获得网络流量)。该方法的下一部分处理发送。它相当复杂,因为消息具有与之相关的优先级。
# Queue handling. Start with the ultimate queue. my $queues = $conn->{fujiko_queues}; foreach my $entry (@{$queues->[PRIORITY_ULTIMATE]}) { # Ultimate is special. We run ultimate no matter what. Even if # it sends the bucket negative. --$bucket; $entry->{code}(@{$entry->{args}}); } $queues->[PRIORITY_ULTIMATE] = [];
这是第一个队列,无论如何都会运行。即使它让我们的连接因洪水而死亡。用于非常重要的事情,比如响应服务器的PING。接下来,其余的队列:
# Continue to the other queues, in order of priority. QRUN: for (my $pri = PRIORITY_HIGH; $pri >= PRIORITY_JUNK; --$pri) { my $queue = $queues->[$pri]; while (scalar(@$queue)) { if ($bucket < 1) { # continue later. $need_more_time = 1; last QRUN; } else { --$bucket; my $entry = shift @$queue; $entry->{code}(@{$entry->{args}}); } } }
最后,存储桶状态被保存回$ conn数据结构(实际上稍晚于该方法;它首先计算它将在多长时间内完成更多工作)
# Save status. $conn->{fujiko_limit_bucket} = $bucket; $conn->{fujiko_limit_lasttx} = $start_time;
如您所见,实际的铲斗处理代码非常小 - 大约四行。其余代码是优先级队列处理。机器人具有优先级队列,例如,与之聊天的人不能阻止它执行其重要的启动/禁令职责。
添加回答
举报