为了账号安全,请及时绑定邮箱和手机立即绑定

队列上有多路复用

队列上有多路复用

Go
莫回无 2021-04-27 09:49:38
如何queue.Queue同时在多个对象上进行“选择” ?Golang的频道具有所需的功能:select {case i1 = <-c1:    print("received ", i1, " from c1\n")case c2 <- i2:    print("sent ", i2, " to c2\n")case i3, ok := (<-c3):  // same as: i3, ok := <-c3    if ok {        print("received ", i3, " from c3\n")    } else {        print("c3 is closed\n")    }default:    print("no communication\n")}其中第一个要解除阻塞的通道执行相应的块。我将如何在Python中实现这一目标?更新0根据tux21b的答案中给出的链接,所需的队列类型具有以下属性:多生产者/多消费者队列(MPMC)提供按生产者的FIFO / LIFO当队列为空/完整的消费者/生产者被阻止时此外,渠道可能会被阻塞,生产者将阻塞,直到消费者取回该物品为止。我不确定Python的Queue是否可以做到这一点。
查看完整描述

3 回答

?
PIPIONE

TA贡献1829条经验 获得超9个赞

生产者-消费者队列有许多不同的实现方式,例如queue.Queue可用。它们通常具有许多不同的属性,例如Dmitry Vyukov在此出色的文章中列出的属性。如您所见,可能有超过1万种不同的组合。根据要求,用于此类队列的算法也相差很大。仅扩展现有队列算法以保证其他属性是不可能的,因为这通常需要不同的内部数据结构和不同的算法。


Go的频道提供了相对较高的保证属性,因此这些频道可能适用于许多程序。最困难的要求之一是支持一次读取/阻塞多个通道(select语句),并且如果select语句中可以有多个分支可以继续进行,则要公平地选择一个通道,这样就不会留下任何消息。 。Python的queue.Queue不提供此功能,因此根本无法使用它来存档相同的行为。


因此,如果要继续使用queue.Queue,则需要找到解决该问题的方法。但是,变通办法有其自身的缺点列表,并且较难维护。寻找另一个提供所需功能的生产者-消费者队列可能是一个更好的主意!无论如何,这是两个可能的解决方法:


轮询


while True:

  try:

    i1 = c1.get_nowait()

    print "received %s from c1" % i1

  except queue.Empty:

    pass

  try:

    i2 = c2.get_nowait()

    print "received %s from c2" % i2

  except queue.Empty:

    pass

  time.sleep(0.1)

在轮询通道时,这可能会占用大量CPU周期,并且在有很多消息时可能会变慢。使用具有指数退避时间(而不是此处显示的恒定0.1秒)的time.sleep()可能会大大改善此版本。


单个通知队列


queue_id = notify.get()

if queue_id == 1:

  i1 = c1.get()

  print "received %s from c1" % i1

elif queue_id == 2:

  i2 = c2.get()

  print "received %s from c2" % i2

使用此设置,您必须在发送到c1或c2之后将某些内容发送到通知队列。只要您只有一个这样的通知队列就足够了(即您没有多个“选择”,每个“选择”阻塞在您通道的不同子集上),这可能对您有用。


另外,您也可以考虑使用Go。无论如何,Go的goroutines和并发支持比Python的有限线程功能强大得多。


查看完整回答
反对 回复 2021-05-10
?
红颜莎娜

TA贡献1842条经验 获得超12个赞

如果使用queue.PriorityQueue,则可以使用通道对象作为优先级获得类似的行为:


import threading, logging

import random, string, time

from queue import PriorityQueue, Empty

from contextlib import contextmanager


logging.basicConfig(level=logging.NOTSET,

                    format="%(threadName)s - %(message)s")


class ChannelManager(object):

    next_priority = 0


    def __init__(self):

        self.queue = PriorityQueue()

        self.channels = []


    def put(self, channel, item, *args, **kwargs):

        self.queue.put((channel, item), *args, **kwargs)


    def get(self, *args, **kwargs):

        return self.queue.get(*args, **kwargs)


    @contextmanager

    def select(self, ordering=None, default=False):

        if default:

            try:

                channel, item = self.get(block=False)

            except Empty:

                channel = 'default'

                item = None

        else:

            channel, item = self.get()

        yield channel, item



    def new_channel(self, name):

        channel = Channel(name, self.next_priority, self)

        self.channels.append(channel)

        self.next_priority += 1

        return channel



class Channel(object):

    def __init__(self, name, priority, manager):

        self.name = name

        self.priority = priority

        self.manager = manager


    def __str__(self):

        return self.name


    def __lt__(self, other):

        return self.priority < other.priority


    def put(self, item):

        self.manager.put(self, item)



if __name__ == '__main__':

    num_channels = 3

    num_producers = 4

    num_items_per_producer = 2

    num_consumers = 3

    num_items_per_consumer = 3


    manager = ChannelManager()

    channels = [manager.new_channel('Channel#{0}'.format(i))

                for i in range(num_channels)]


    def producer_target():

        for i in range(num_items_per_producer):

            time.sleep(random.random())

            channel = random.choice(channels)

            message = random.choice(string.ascii_letters)

            logging.info('Putting {0} in {1}'.format(message, channel))

            channel.put(message)


    producers = [threading.Thread(target=producer_target,

                                  name='Producer#{0}'.format(i))

                 for i in range(num_producers)]

    for producer in producers:

        producer.start()

    for producer in producers:

        producer.join()

    logging.info('Producers finished')


    def consumer_target():

        for i in range(num_items_per_consumer):

            time.sleep(random.random())

            with manager.select(default=True) as (channel, item):

                if channel:

                    logging.info('Received {0} from {1}'.format(item, channel))

                else:

                    logging.info('No data received')


    consumers = [threading.Thread(target=consumer_target,

                                  name='Consumer#{0}'.format(i))

                 for i in range(num_consumers)]

    for consumer in consumers:

        consumer.start()

    for consumer in consumers:

        consumer.join()

    logging.info('Consumers finished')

输出示例:


Producer#0 - Putting x in Channel#2

Producer#2 - Putting l in Channel#0

Producer#2 - Putting A in Channel#2

Producer#3 - Putting c in Channel#0

Producer#3 - Putting z in Channel#1

Producer#1 - Putting I in Channel#1

Producer#1 - Putting L in Channel#1

Producer#0 - Putting g in Channel#1

MainThread - Producers finished

Consumer#1 - Received c from Channel#0

Consumer#2 - Received l from Channel#0

Consumer#0 - Received I from Channel#1

Consumer#0 - Received L from Channel#1

Consumer#2 - Received g from Channel#1

Consumer#1 - Received z from Channel#1

Consumer#0 - Received A from Channel#2

Consumer#1 - Received x from Channel#2

Consumer#2 - Received None from default

MainThread - Consumers finished

在这个例子中,ChannelManager只是一个包装器queue.PriorityQueue,将select方法实现contextmanager为使其看起来类似于selectGo中的语句。

注意事项:

  • 定购

    • 在Go示例中,select如果有多个通道可用的数据,则在语句中写入通道的顺序确定将执行哪个通道的代码。

    • 在python示例中,顺序由分配给每个通道的优先级确定。但是,可以将优先级动态分配给每个通道(如示例中所示),因此可以使用更复杂的select方法来更改顺序,该方法将根据该方法的参数来分配新的优先级。同样,一旦上下文管理器完成,可以重新建立旧的顺序。

  • 封锁

    • 在Go示例中,select如果default存在案例,则该语句将阻塞。

    • 在python示例中,必须将boolean参数传递给该select方法,以在需要阻止/非阻止时使其清晰可见。在非阻塞情况下,上下文管理器返回的通道只是字符串,'default'因此在内部代码中很容易在with语句内部的代码中检测到此情况。

  • 线程:queue如示例中所示,模块中的对象已经为多生产者,多消费者的场景做好了准备。


查看完整回答
反对 回复 2021-05-10
?
SMILET

TA贡献1796条经验 获得超4个赞

pychan项目复制在Python围棋频道,包括复用。它实现了与Go相同的算法,因此符合您所有需要的属性:

  • 多个生产者和消费者可以通过Chan进行交流。当生产者和消费者都准备就绪时,他们对

  • 生产者和消费者按到达顺序得到服务(FIFO)

  • 空(满)队列将阻止使用者(生产者)。

您的示例如下所示:

c1 = Chan(); c2 = Chan(); c3 = Chan()


try:

    chan, value = chanselect([c1, c3], [(c2, i2)])

    if chan == c1:

        print("Received %r from c1" % value)

    elif chan == c2:

        print("Sent %r to c2" % i2)

    else:  # c3

        print("Received %r from c3" % value)

except ChanClosed as ex:

    if ex.which == c3:

        print("c3 is closed")

    else:

        raise

(完全公开:我写了这个库)


查看完整回答
反对 回复 2021-05-10
  • 3 回答
  • 0 关注
  • 229 浏览
慕课专栏
更多

添加回答

举报

0/150
提交
取消
意见反馈 帮助中心 APP下载
官方微信