为了账号安全,请及时绑定邮箱和手机立即绑定

如何处理可以无阻塞增长的队列

如何处理可以无阻塞增长的队列

Go
郎朗坤 2023-03-21 16:05:15
如果队列可以从处理函数本身增长,我试图了解如何在 Go 中处理队列。请参阅下面的代码。在此伪代码中,我想将我创建的处理程序数量限制为 10。因此我创建了 10 个处理队列的处理程序。然后我用一个 url 开始排队。我的问题是,根据文档,sender通道将阻塞,直到接收方接收到数据。在下面的代码中,每个进程都是一个处理新 url 的接收器。然而,很容易看出,如果一个进程向队列发送 11 个链接,它将阻塞,直到所有接收者都处理完这些新链接。如果这些接收者每个都有 1 个链接,那么它们也会在将新的 1 个链接发送到队列时阻塞。由于每个人都被阻止,所以什么都没有完成。我想知道 go 的一般解决方案是什么,用于处理可以从进程本身增长的队列。请注意,我认为我可以通过锁定名为 的数组来执行此操作queue,但我正在尝试了解如何使用通道来完成此操作。var queue = make(chan string)func process(){    for currentURL := range queue {        links, _ := ... // some http call that gets links from a url        for _, link := links {            queue <- link        }    }}func main () {   for i :=0; i < 10; i++ {        go process()   }   queue <- "https://stackoverflow.com"   ...   // block until receive some quit message   <-quit }
查看完整描述

2 回答

?
拉丁的传说

TA贡献1789条经验 获得超8个赞

您可以使用的一种简单方法是将用于将链接添加到频道的代码移动到它自己的 go 例程中。这样,您的主要处理可以继续,而阻塞的通道写入将阻塞一个单独的 go 例程。


func process(){

    for currentURL := range queue {

        links, _ := ... // some http call that gets links from a url

        for _, link := links {

            l := link // this is important! ...

            // the loop will re-set the value of link before the go routine is started


            go func(l) {

                queue <- link // we'll be blocked here...

                // but the "parent" routine can still iterate through the channel

                // which in turn un-blocks the write

            }(l)

        }

    }

}

使用信号量示例编辑以限制 go 例程:


func main () {

    maxWorkers := 5000

    sem := semaphore.NewWeighted(int64(maxWorkers))

    ctx := context.TODO()

    for i :=0; i < 10; i++ {

        go process(ctx)

    }


    queue <- "https://stackoverflow.com"

    // block until receive some quit message

    <-quit 

}


func process(ctx context.Context){

    for currentURL := range queue {

        links, _ := ... // some http call that gets links from a url

        for _, link := links {

            l := link // this is important! ...

            // the loop will re-set the value of link before the go routine is started


            // acquire a go routine...

            // if we are at the routine limit, this line will block until one becomes available

            sem.Acquire(ctx, 1)

            go func(l) {

                defer sem.Release(1)

                queue <- link // we'll be blocked here...

                // but the "parent" routine can still iterate through the channel

                // which in turn un-blocks the write

            }(l)

        }

    }

}

但是这个选项最终可能会导致死锁...假设所有的 go 例程都已声明,父循环可能会被锁定在sem.Acquire. 这将导致子例程永远不会添加到通道中,因此永远不会执行 deferred sem.Release。在我的脑海中,我正在努力想出一个很好的方法来处理这个问题。也许是外部内存队列而不是通道?


查看完整回答
反对 回复 2023-03-21
?
弑天下

TA贡献1818条经验 获得超8个赞

有两件事你可以做,要么使用缓冲通道不阻塞,即使另一端没有人接收。这样您就可以立即刷新通道内的值。


一种更有效的方法是检查通道中是否有任何可用值,或者通道是否关闭,这应该由发送方在发送所有值时关闭。


接收者可以通过为接收表达式分配第二个参数来测试通道是否已关闭。


v, ok := <-ch 

ok如果false没有更多的值可以接收并且通道关闭。使用 select as 检查通道内的值


package main


import (

    "fmt"

    "sync"

)


var queue = make(chan int)

var wg sync.WaitGroup


func process(){

        values := []int{1,2,5,3,9,7}

        for _, value := range values {

            queue <- value        

        }

}


func main () {

   for i :=0; i < 10; i++ {

        go process()

   }

   wg.Add(1)

   go func(){

      defer wg.Done()

      for j:=0;j<30;j++ {

          select {

             case <-queue:

        fmt.Println(<-queue)

          } 

      }

   }()

   wg.Wait()

   close(queue)

}


查看完整回答
反对 回复 2023-03-21
  • 2 回答
  • 0 关注
  • 74 浏览
慕课专栏
更多

添加回答

举报

0/150
提交
取消
意见反馈 帮助中心 APP下载
官方微信