为了账号安全,请及时绑定邮箱和手机立即绑定

Golang 在 goroutine 之间共享大量数据

Golang 在 goroutine 之间共享大量数据

Go
红颜莎娜 2022-01-10 18:47:13
我需要从另一个 goroutine 读取结构字段集,afaik 直接这样做,即使确定不会有并发访问(在读取发生之前写入完成,通过 发出信号chan struct{})可能会导致数据陈旧考虑到我可以保证没有并发访问,发送指向结构的指针(在第一个 goroutine 中创建,在第二个 goroutine 中修改,由第三个读取)会解决可能的过时问题吗?我想避免复制,因为结构很大并且包含填充在第二个 goroutine 中的巨大 Bytes.Buffer,我需要从第三个读取有一个锁定选项,但考虑到我知道不会有并发访问,这似乎有点过头了
查看完整描述

2 回答

?
扬帆大鱼

TA贡献1799条经验 获得超9个赞

这个有很多答案,这取决于你的数据结构和程序逻辑。

请参阅: 如何在并发 goroutines 期间锁定/同步对 Go 中变量的访问?
以及: 如何在 Golang 中使用 RWMutex?

1- 使用 有状态的 Goroutine和通道
2- 使用sync.Mutex
3- 使用同步/原子
4- 使用 WaitGroup
5- 使用程序逻辑(信号量
...


1:有状态的 Goroutines和通道:
我模拟了非常相似的示例(假设您想从一个 SSD 读取并以不同的速度写入另一个 SSD):
在这个示例代码中,一个 goroutine(名为 write)做了一些工作准备数据并填充大struct,另一个 goroutine(名为 read)从 big struct 读取数据然后做一些工作,而 manger goroutine 保证不会并发访问相同的数据。三个 goroutine 之间的通信是通过通道完成的。在您的情况下,您可以将指针用于通道数据,或像此示例这样的全局结构。
输出将是这样的:
mean= 36.6920166015625 stdev= 6.068973186592054

我希望这可以帮助您了解这个想法。
工作示例代码:

package main


import (

    "fmt"

    "math"

    "math/rand"

    "runtime"

    "sync"

    "time"

)


type BigStruct struct {

    big     []uint16

    rpos    int

    wpos    int

    full    bool

    empty   bool

    stopped bool

}


func main() {

    wg.Add(1)

    go write()

    go read()

    go manage()

    runtime.Gosched()

    stopCh <- <-time.After(5 * time.Second)

    wg.Wait()

    mean := Mean(hist)

    stdev := stdDev(hist, mean)

    fmt.Println("mean=", mean, "stdev=", stdev)

}


const N = 1024 * 1024 * 1024


var wg sync.WaitGroup

var stopCh chan time.Time = make(chan time.Time)


var hist []int = make([]int, 65536)


var s *BigStruct = &BigStruct{empty: true,

    big: make([]uint16, N), //2GB

}


var rc chan uint16 = make(chan uint16)

var wc chan uint16 = make(chan uint16)


func next(pos int) int {

    pos++

    if pos >= N {

        pos = 0

    }

    return pos

}


func manage() {

    dataReady := false

    var data uint16

    for {

        if !dataReady && !s.empty {

            dataReady = true

            data = s.big[s.rpos]

            s.rpos++

            if s.rpos >= N {

                s.rpos = 0

            }

            s.empty = s.rpos == s.wpos

            s.full = next(s.wpos) == s.rpos

        }

        if dataReady {

            select {

            case rc <- data:

                dataReady = false

            default:

                runtime.Gosched()

            }

        }

        if !s.full {

            select {

            case d := <-wc:

                s.big[s.wpos] = d

                s.wpos++

                if s.wpos >= N {

                    s.wpos = 0

                }

                s.empty = s.rpos == s.wpos

                s.full = next(s.wpos) == s.rpos

            default:

                runtime.Gosched()

            }

        }

        if s.stopped {

            if s.empty {

                wg.Done()

                return

            }

        }


    }

}


func read() {

    for {

        d := <-rc

        hist[d]++

    }

}


func write() {

    for {

        wc <- uint16(rand.Intn(65536))

        select {

        case <-stopCh:

            s.stopped = true

            return

        default:

            runtime.Gosched()

        }

    }

}


func stdDev(data []int, mean float64) float64 {

    sum := 0.0

    for _, d := range data {

        sum += math.Pow(float64(d)-mean, 2)

    }

    variance := sum / float64(len(data)-1)

    return math.Sqrt(variance)

}

func Mean(data []int) float64 {

    sum := 0.0

    for _, d := range data {

        sum += float64(d)

    }

    return sum / float64(len(data))

}

5:某些用例的另一种方式(更快):

这里使用共享数据结构的另一种方式来读取作业/写入作业/处理作业,它在第一篇文章中被分开,现在这里做同样的 3 个没有通道和没有互斥体的工作。


工作样本:


package main


import (

    "fmt"

    "math"

    "math/rand"

    "time"

)


type BigStruct struct {

    big     []uint16

    rpos    int

    wpos    int

    full    bool

    empty   bool

    stopped bool

}


func manage() {

    for {

        if !s.empty {

            hist[s.big[s.rpos]]++ //sample read job with any time len

            nextPtr(&s.rpos)

        }

        if !s.full && !s.stopped {

            s.big[s.wpos] = uint16(rand.Intn(65536)) //sample wrire job with any time len

            nextPtr(&s.wpos)

        }

        if s.stopped {

            if s.empty {

                return

            }

        } else {

            s.stopped = time.Since(t0) >= 5*time.Second

        }

    }

}


func main() {

    t0 = time.Now()

    manage()

    mean := Mean(hist)

    stdev := StdDev(hist, mean)

    fmt.Println("mean=", mean, "stdev=", stdev)

    d0 := time.Since(t0)

    fmt.Println(d0) //5.8523347s

}


var t0 time.Time


const N = 100 * 1024 * 1024


var hist []int = make([]int, 65536)


var s *BigStruct = &BigStruct{empty: true,

    big: make([]uint16, N), //2GB

}


func next(pos int) int {

    pos++

    if pos >= N {

        pos = 0

    }

    return pos

}

func nextPtr(pos *int) {

    *pos++

    if *pos >= N {

        *pos = 0

    }


    s.empty = s.rpos == s.wpos

    s.full = next(s.wpos) == s.rpos

}


func StdDev(data []int, mean float64) float64 {

    sum := 0.0

    for _, d := range data {

        sum += math.Pow(float64(d)-mean, 2)

    }

    variance := sum / float64(len(data)-1)

    return math.Sqrt(variance)

}

func Mean(data []int) float64 {

    sum := 0.0

    for _, d := range data {

        sum += float64(d)

    }

    return sum / float64(len(data))

}




查看完整回答
反对 回复 2022-01-10
?
largeQ

TA贡献2039条经验 获得超7个赞

为了防止在保留读取能力的同时对结构进行并发修改,您通常会嵌入一个sync.RWMutex。这不是豁免。您可以在传输过程中简单地锁定结构以进行写入,并在您方便的时间点将其解锁。


package main


import (

    "fmt"

    "sync"

    "time"

)


// Big simulates your big struct

type Big struct {

    sync.RWMutex

    value string

}


// pump uses a groutine to take the slice of pointers to Big,

// locks the underlying structs and sends the pointers to

// the locked instances of Big downstream

func pump(bigs []*Big) chan *Big {


    // We make the channel buffered for this example

    // for illustration purposes

    c := make(chan *Big, 3)


    go func() {

        for _, big := range bigs {

            // We lock the struct before sending it to the channel

            // so it can not be changed via pointer while in transit

            big.Lock()

            c <- big

        }

        close(c)

    }()


    return c

}


// sink reads pointers to the locked instances of Big

// reads them and unlocks them

func sink(c chan *Big) {


    for big := range c {

        fmt.Println(big.value)

        time.Sleep(1 * time.Second)

        big.Unlock()


    }

}


// modify tries to achieve locks to the instances and modify them

func modify(bigs []*Big) {

    for _, big := range bigs {


        big.Lock()

        big.value = "modified"

        big.Unlock()

    }

}


func main() {


    bigs := []*Big{&Big{value: "Foo"}, &Big{value: "Bar"}, &Big{value: "Baz"}}

    c := pump(bigs)


    // For the sake of this example, we wait until all entries are

    // send into the channel and hence are locked

    time.Sleep(1 * time.Second)


    // Now we try to modify concurrently before we even start to read

    // the struct of which the pointers were sent into the channel

    go modify(bigs)

    sink(c)


    // We use sleep here to keep waiting for modify() to finish simple.

    // Usually, you'd use a sync.waitGroup

    time.Sleep(1 * time.Second)


    for _, big := range bigs {

        fmt.Println(big.value)

    }


}


查看完整回答
反对 回复 2022-01-10
  • 2 回答
  • 0 关注
  • 228 浏览
慕课专栏
更多

添加回答

举报

0/150
提交
取消
意见反馈 帮助中心 APP下载
官方微信