2 回答
TA贡献1789条经验 获得超8个赞
您可以使用的一种简单方法是将用于将链接添加到频道的代码移动到它自己的 go 例程中。这样,您的主要处理可以继续,而阻塞的通道写入将阻塞一个单独的 go 例程。
func process(){
for currentURL := range queue {
links, _ := ... // some http call that gets links from a url
for _, link := links {
l := link // this is important! ...
// the loop will re-set the value of link before the go routine is started
go func(l) {
queue <- link // we'll be blocked here...
// but the "parent" routine can still iterate through the channel
// which in turn un-blocks the write
}(l)
}
}
}
使用信号量示例编辑以限制 go 例程:
func main () {
maxWorkers := 5000
sem := semaphore.NewWeighted(int64(maxWorkers))
ctx := context.TODO()
for i :=0; i < 10; i++ {
go process(ctx)
}
queue <- "https://stackoverflow.com"
// block until receive some quit message
<-quit
}
func process(ctx context.Context){
for currentURL := range queue {
links, _ := ... // some http call that gets links from a url
for _, link := links {
l := link // this is important! ...
// the loop will re-set the value of link before the go routine is started
// acquire a go routine...
// if we are at the routine limit, this line will block until one becomes available
sem.Acquire(ctx, 1)
go func(l) {
defer sem.Release(1)
queue <- link // we'll be blocked here...
// but the "parent" routine can still iterate through the channel
// which in turn un-blocks the write
}(l)
}
}
}
但是这个选项最终可能会导致死锁...假设所有的 go 例程都已声明,父循环可能会被锁定在sem.Acquire. 这将导致子例程永远不会添加到通道中,因此永远不会执行 deferred sem.Release。在我的脑海中,我正在努力想出一个很好的方法来处理这个问题。也许是外部内存队列而不是通道?
TA贡献1818条经验 获得超8个赞
有两件事你可以做,要么使用缓冲通道不阻塞,即使另一端没有人接收。这样您就可以立即刷新通道内的值。
一种更有效的方法是检查通道中是否有任何可用值,或者通道是否关闭,这应该由发送方在发送所有值时关闭。
接收者可以通过为接收表达式分配第二个参数来测试通道是否已关闭。
v, ok := <-ch
ok如果false没有更多的值可以接收并且通道关闭。使用 select as 检查通道内的值
package main
import (
"fmt"
"sync"
)
var queue = make(chan int)
var wg sync.WaitGroup
func process(){
values := []int{1,2,5,3,9,7}
for _, value := range values {
queue <- value
}
}
func main () {
for i :=0; i < 10; i++ {
go process()
}
wg.Add(1)
go func(){
defer wg.Done()
for j:=0;j<30;j++ {
select {
case <-queue:
fmt.Println(<-queue)
}
}
}()
wg.Wait()
close(queue)
}
- 2 回答
- 0 关注
- 74 浏览
添加回答
举报