3 回答
TA贡献1829条经验 获得超9个赞
生产者-消费者队列有许多不同的实现方式,例如queue.Queue可用。它们通常具有许多不同的属性,例如Dmitry Vyukov在此出色的文章中列出的属性。如您所见,可能有超过1万种不同的组合。根据要求,用于此类队列的算法也相差很大。仅扩展现有队列算法以保证其他属性是不可能的,因为这通常需要不同的内部数据结构和不同的算法。
Go的频道提供了相对较高的保证属性,因此这些频道可能适用于许多程序。最困难的要求之一是支持一次读取/阻塞多个通道(select语句),并且如果select语句中可以有多个分支可以继续进行,则要公平地选择一个通道,这样就不会留下任何消息。 。Python的queue.Queue不提供此功能,因此根本无法使用它来存档相同的行为。
因此,如果要继续使用queue.Queue,则需要找到解决该问题的方法。但是,变通办法有其自身的缺点列表,并且较难维护。寻找另一个提供所需功能的生产者-消费者队列可能是一个更好的主意!无论如何,这是两个可能的解决方法:
轮询
while True:
try:
i1 = c1.get_nowait()
print "received %s from c1" % i1
except queue.Empty:
pass
try:
i2 = c2.get_nowait()
print "received %s from c2" % i2
except queue.Empty:
pass
time.sleep(0.1)
在轮询通道时,这可能会占用大量CPU周期,并且在有很多消息时可能会变慢。使用具有指数退避时间(而不是此处显示的恒定0.1秒)的time.sleep()可能会大大改善此版本。
单个通知队列
queue_id = notify.get()
if queue_id == 1:
i1 = c1.get()
print "received %s from c1" % i1
elif queue_id == 2:
i2 = c2.get()
print "received %s from c2" % i2
使用此设置,您必须在发送到c1或c2之后将某些内容发送到通知队列。只要您只有一个这样的通知队列就足够了(即您没有多个“选择”,每个“选择”阻塞在您通道的不同子集上),这可能对您有用。
另外,您也可以考虑使用Go。无论如何,Go的goroutines和并发支持比Python的有限线程功能强大得多。
TA贡献1842条经验 获得超12个赞
如果使用queue.PriorityQueue,则可以使用通道对象作为优先级获得类似的行为:
import threading, logging
import random, string, time
from queue import PriorityQueue, Empty
from contextlib import contextmanager
logging.basicConfig(level=logging.NOTSET,
format="%(threadName)s - %(message)s")
class ChannelManager(object):
next_priority = 0
def __init__(self):
self.queue = PriorityQueue()
self.channels = []
def put(self, channel, item, *args, **kwargs):
self.queue.put((channel, item), *args, **kwargs)
def get(self, *args, **kwargs):
return self.queue.get(*args, **kwargs)
@contextmanager
def select(self, ordering=None, default=False):
if default:
try:
channel, item = self.get(block=False)
except Empty:
channel = 'default'
item = None
else:
channel, item = self.get()
yield channel, item
def new_channel(self, name):
channel = Channel(name, self.next_priority, self)
self.channels.append(channel)
self.next_priority += 1
return channel
class Channel(object):
def __init__(self, name, priority, manager):
self.name = name
self.priority = priority
self.manager = manager
def __str__(self):
return self.name
def __lt__(self, other):
return self.priority < other.priority
def put(self, item):
self.manager.put(self, item)
if __name__ == '__main__':
num_channels = 3
num_producers = 4
num_items_per_producer = 2
num_consumers = 3
num_items_per_consumer = 3
manager = ChannelManager()
channels = [manager.new_channel('Channel#{0}'.format(i))
for i in range(num_channels)]
def producer_target():
for i in range(num_items_per_producer):
time.sleep(random.random())
channel = random.choice(channels)
message = random.choice(string.ascii_letters)
logging.info('Putting {0} in {1}'.format(message, channel))
channel.put(message)
producers = [threading.Thread(target=producer_target,
name='Producer#{0}'.format(i))
for i in range(num_producers)]
for producer in producers:
producer.start()
for producer in producers:
producer.join()
logging.info('Producers finished')
def consumer_target():
for i in range(num_items_per_consumer):
time.sleep(random.random())
with manager.select(default=True) as (channel, item):
if channel:
logging.info('Received {0} from {1}'.format(item, channel))
else:
logging.info('No data received')
consumers = [threading.Thread(target=consumer_target,
name='Consumer#{0}'.format(i))
for i in range(num_consumers)]
for consumer in consumers:
consumer.start()
for consumer in consumers:
consumer.join()
logging.info('Consumers finished')
输出示例:
Producer#0 - Putting x in Channel#2
Producer#2 - Putting l in Channel#0
Producer#2 - Putting A in Channel#2
Producer#3 - Putting c in Channel#0
Producer#3 - Putting z in Channel#1
Producer#1 - Putting I in Channel#1
Producer#1 - Putting L in Channel#1
Producer#0 - Putting g in Channel#1
MainThread - Producers finished
Consumer#1 - Received c from Channel#0
Consumer#2 - Received l from Channel#0
Consumer#0 - Received I from Channel#1
Consumer#0 - Received L from Channel#1
Consumer#2 - Received g from Channel#1
Consumer#1 - Received z from Channel#1
Consumer#0 - Received A from Channel#2
Consumer#1 - Received x from Channel#2
Consumer#2 - Received None from default
MainThread - Consumers finished
在这个例子中,ChannelManager只是一个包装器queue.PriorityQueue,将select方法实现contextmanager为使其看起来类似于selectGo中的语句。
注意事项:
定购
在Go示例中,
select
如果有多个通道可用的数据,则在语句中写入通道的顺序确定将执行哪个通道的代码。在python示例中,顺序由分配给每个通道的优先级确定。但是,可以将优先级动态分配给每个通道(如示例中所示),因此可以使用更复杂的
select
方法来更改顺序,该方法将根据该方法的参数来分配新的优先级。同样,一旦上下文管理器完成,可以重新建立旧的顺序。封锁
在Go示例中,
select
如果default
存在案例,则该语句将阻塞。在python示例中,必须将boolean参数传递给该
select
方法,以在需要阻止/非阻止时使其清晰可见。在非阻塞情况下,上下文管理器返回的通道只是字符串,'default'
因此在内部代码中很容易在with
语句内部的代码中检测到此情况。线程:
queue
如示例中所示,模块中的对象已经为多生产者,多消费者的场景做好了准备。
TA贡献1796条经验 获得超4个赞
该pychan项目复制在Python围棋频道,包括复用。它实现了与Go相同的算法,因此符合您所有需要的属性:
多个生产者和消费者可以通过Chan进行交流。当生产者和消费者都准备就绪时,他们对
生产者和消费者按到达顺序得到服务(FIFO)
空(满)队列将阻止使用者(生产者)。
您的示例如下所示:
c1 = Chan(); c2 = Chan(); c3 = Chan()
try:
chan, value = chanselect([c1, c3], [(c2, i2)])
if chan == c1:
print("Received %r from c1" % value)
elif chan == c2:
print("Sent %r to c2" % i2)
else: # c3
print("Received %r from c3" % value)
except ChanClosed as ex:
if ex.which == c3:
print("c3 is closed")
else:
raise
(完全公开:我写了这个库)
- 3 回答
- 0 关注
- 229 浏览
添加回答
举报