为了账号安全,请及时绑定邮箱和手机立即绑定

多生产者/多消费者并发

多生产者/多消费者并发

Go
SMILET 2021-09-27 17:23:08
我可能遗漏了一些东西,或者不了解 Go 如何处理并发(或者我对并发本身的了解),我设计了一些代码来理解多个生产者/消费者。这是代码:package mainimport (    "fmt"    "time"    // "math/rand"    "sync")var seq uint64 = 0var generatorChan chan uint64var requestChan chan uint64func makeTimestamp() int64 {    return time.Now().UnixNano() / int64(time.Millisecond)}func generateStuff(genId int) {    var crap uint64    for {        crap = <-requestChan        // <- requestChan        seq = seq+1        fmt.Println("Gen ", genId, " - From : ", crap, " @", makeTimestamp())        generatorChan <- uint64(seq)    }}func concurrentPrint(id int, work *sync.WaitGroup) {    defer work.Done()    for i := 0; i < 5; i++ {        requestChan<-uint64(id)        fmt.Println("Conc", id, ": ", <-generatorChan)    }}func main() {    generatorChan = make(chan uint64)    requestChan = make(chan uint64)    var wg sync.WaitGroup    for i := 0; i < 20; i++ {        go generateStuff(i)    }    maximumWorker := 200    wg.Add(maximumWorker)    for i := 0; i < maximumWorker; i++ {        go concurrentPrint(i, &wg)    }    wg.Wait()}运行时,它会打印(主要是按顺序)从 1 到 1000 的所有数字(200 个消费者每个获得一个数字 5 次)。我原以为某些消费者会打印完全相同的数字,但即使有 20 个 goroutine 为generateStuff服务,通过增加全局变量来生成数字,但requestChan似乎正在像一个障碍一样工作,以防止这种情况发生。一般来说,我对 Go 或 Concurrency 有什么误解?我原以为会出现两个类似generateStuff类型的 go 例程会被一起唤醒并同时增加 seq 的情况,从而导致两个消费者两次打印相同的数字。在 playgolang 上编辑代码:http ://play.golang.org/p/eRzNXjdxtZ
查看完整描述

2 回答

?
收到一只叮咚

TA贡献1821条经验 获得超4个赞

您有多个可以同时运行的工作人员,并且所有工作人员都可以同时尝试和发出请求。由于requestChan是无缓冲的,它们都阻塞等待读取器同步并接受他们的请求。

您有多个生成器,它们将通过 与请求者同步requestChan,生成结果,然后阻塞无缓冲,generatorChan直到工作人员读取结果。请注意,它可能是不同的工人。

没有额外的同步,所以其他一切都是不确定的。

  • 一个生成器可以处理所有请求。

  • 生成器可以获取请求并seq 在任何其他生成器有机会运行之前通过递增。只有一个处理器,这甚至是可能的。

  • 所有的生成器都可以获取请求,并且最终都想要seq在完全相同的时间递增,从而导致各种问题。

  • 工作人员可以从他们碰巧发送到或来自完全不同的生成器的同一生成器中获得响应。

通常,如果不添加同步来强制执行这些行为中的一种,则无法确保其中任何一种行为确实发生。

请注意,对于数据竞争,这本身就是另一个非确定性事件。有可能获得任意值、程序崩溃等。假设在竞争条件下该值可能只是被一个或一些这样的相对无害的结果关闭是不安全的。

对于试验,您能做的最好的事情就是加速GOMAXPROCS。通过环境变量(例如类似env GOMAXPROCS=16 go run foo.goenv GOMAXPROCS=16 ./foo之后go build)或runtime.GOMAXPROCS(16)从您的程序调用。默认值为 1,这意味着可能会隐藏数据竞争或其他“奇怪”行为。

您还可以通过在不同的点添加调用runtime.Goschedtime.Sleep在不同的点上来影响一些事情。

如果您使用竞争检测器(例如使用go run -race foo.googo build -race),您还可以看到数据竞争。程序不仅应该在退出时显示“Found 1 data race(s)”,而且还应该在首次检测到竞争时转储大量带有堆栈跟踪的详细信息。

这是用于实验的代码的“清理”版本:

package main


import (

    "log"

    "sync"

    "sync/atomic"

)


var seq uint64 = 0

var generatorChan = make(chan uint64)

var requestChan = make(chan uint64)


func generator(genID int) {

    for reqID := range requestChan {

        // If you want to see a data race:

        //seq = seq + 1

        // Else:

        s := atomic.AddUint64(&seq, 1)

        log.Printf("Gen: %2d, from %3d", genID, reqID)

        generatorChan <- s

    }

}


func worker(id int, work *sync.WaitGroup) {

    defer work.Done()


    for i := 0; i < 5; i++ {

        requestChan <- uint64(id)

        log.Printf("\t\t\tWorker: %3d got %4d", id, <-generatorChan)

    }

}


func main() {

    log.SetFlags(log.Lmicroseconds)

    const (

        numGen    = 20

        numWorker = 200

    )

    var wg sync.WaitGroup

    for i := 0; i < numGen; i++ {

        go generator(i)

    }

    wg.Add(numWorker)

    for i := 0; i < numWorker; i++ {

        go worker(i, &wg)

    }

    wg.Wait()

    close(requestChan)

}

Playground(但请注意,playground 上的时间戳没有用,调用runtime.MAXPROCS可能没有任何作用)。进一步注意,playground 会缓存结果,因此重新运行完全相同的程序将始终显示相同的输出,您需要进行一些小的更改或仅在您自己的机器上运行它。


很多小的变化,比如关闭生成器,使用logvsfmt因为前者可以保证并发性,消除数据竞争,使输出看起来更好等。


查看完整回答
反对 回复 2021-09-27
?
拉莫斯之舞

TA贡献1820条经验 获得超10个赞

渠道类型

通道为并发执行函数提供了一种机制,通过发送和接收指定元素类型的值来进行通信。未初始化通道的值为 nil。

可以使用内置函数 make 创建一个新的初始化通道值,该函数将通道类型和可选容量作为参数:

make(chan int, 100)

容量(以元素数为单位)设置通道中缓冲区的大小。如果容量为零或不存在,则通道没有缓冲,只有当发送方和接收方都准备好时,通信才能成功。否则,如果缓冲区未满(发送)或非空(接收),则通道被缓冲并且通信成功而不会阻塞。一个 nil 通道永远不会准备好进行通信。

您正在使用无缓冲通道来限制通道通信。

例如,

generatorChan = make(chan uint64)
requestChan = make(chan uint64)


查看完整回答
反对 回复 2021-09-27
  • 2 回答
  • 0 关注
  • 217 浏览
慕课专栏
更多

添加回答

举报

0/150
提交
取消
意见反馈 帮助中心 APP下载
官方微信