为了账号安全,请及时绑定邮箱和手机立即绑定

加载带有和不带有 go-routines 的地图

加载带有和不带有 go-routines 的地图

Go
大话西游666 2022-04-26 15:01:30
这是我遇到的一个有趣的情况。在使用 go-routines 进行一些数据操作之后,我需要从文件中读取,并根据我们发现的内容填充地图。这是简化的问题陈述和示例:生成运行所需的数据gen_data.sh#!/bin/bash rm some.dat || : for i in `seq 1 10000`; do     echo "$i `date` tx: $RANDOM rx:$RANDOM" >> some.datdone如果我使用 将这些行读some.dat入map[int]string没有 go-routines 中loadtoDict.go,它会保持对齐。(因为第一个和第二个词是一样的,见下面的o/p。)在现实生活中,我确实需要在将线条加载到地图之前对其进行处理(昂贵),使用 go-routines 加快了我的字典创建速度,这是解决实际问题的重要要求。loadtoDict.gopackage mainimport (    "bufio"    "fmt"    "log"    "os")var (    fileName = "some.dat")func checkerr(err error) {    if err != nil {        fmt.Println(err)        log.Fatal(err)    }}func main() {    ourDict := make(map[int]string)    f, err := os.Open(fileName)    checkerr(err)    defer f.Close()    fscanner := bufio.NewScanner(f)    indexPos := 1    for fscanner.Scan() {        text := fscanner.Text()        //fmt.Println("text", text)        ourDict[indexPos] = text        indexPos++    }    for i, v := range ourDict {        fmt.Printf("%d: %s\n", i, v)    }}跑步:$ ./loadtoDict...8676: 8676 Mon Dec 23 15:52:24 PST 2019 tx: 17718 rx:11332234: 2234 Mon Dec 23 15:52:20 PST 2019 tx: 13170 rx:159623436: 3436 Mon Dec 23 15:52:21 PST 2019 tx: 17519 rx:54196177: 6177 Mon Dec 23 15:52:23 PST 2019 tx: 5731 rx:5449注意第一个和第二个词是如何“对齐”的。但是,如果我使用 go-routines 加载我的地图,这会出错:async_loadtoDict.gopackage mainimport (    "bufio"    "fmt"    "log"    "os"    "sync")var (    fileName = "some.dat"    mu       = &sync.RWMutex{}    MAX = 9000)func checkerr(err error) {    if err != nil {        fmt.Println(err)        log.Fatal(err)    }}
查看完整描述

3 回答

?
泛舟湖上清波郎朗

TA贡献1818条经验 获得超3个赞

您的信号量sem不起作用,因为您对其进行了深度缓冲。


一般来说,这是为此类任务设置地图的错误方法,因为读取文件会很慢。如果你有一个更复杂的任务——例如,读一行,想很多,设置一些东西——你会想要这个作为你的伪代码结构:


type workType struct {

    index int

    line  string

}


var wg sync.WaitGroup

wg.Add(nWorkers)

// I made this buffered originally but there's no real point, so

// fixing that in an edit

work := make(chan workType)

for i := 0; i < nWorkers; i++ {

    go readAndDoWork(work, &wg)

}


for i := 1; fscanner.Scan(); i++ {

    work <- workType{index: i, line: fscanner.Text()}

}

close(work)

wg.Wait()


... now your dictionary is ready ...

工人这样做:


func readAndDoWork(ch chan workType, wg *sync.WorkGroup) {

    for item := range ch {

        ... do computation ...

        insertIntoDict(item.index, result)

    }

    wg.Done()

}

抓住互斥锁(以insertIntoDict保护映射从索引到结果)并写入字典。(如果你愿意,你可以内联它。)


这里的想法是设置一定数量的工作人员——可能基于可用 CPU 的数量——每个工作人员抓取下一个工作项并处理它。主 goroutine 只是打包工作,然后关闭工作通道——这将导致所有工作人员看到输入结束——然后等待他们发出信号表明他们已经完成了计算。


(如果您愿意,您可以再创建一个 goroutine 来读取工作人员计算的结果并将它们放入映射中。这样您就不需要映射本身的互斥体。)


查看完整回答
反对 回复 2022-04-26
?
噜噜哒

TA贡献1784条经验 获得超7个赞

正如我在评论中提到的,您无法控制 goroutine 的执行顺序,因此不应从它们内部更改索引。


这是一个示例,其中与地图的交互在单个 goroutine 中,而您在其他 goroutine 中进行处理:


package main


import (

    "bufio"

    "fmt"

    "log"

    "os"

    "sync"

)


var (

    fileName = "some.dat"

    MAX      = 9000

)


func checkerr(err error) {

    if err != nil {

        fmt.Println(err)

        log.Fatal(err)

    }

}


type result struct {

    index int

    data string

}


func main() {

    ourDict := make(map[int]string)

    f, err := os.Open(fileName)

    checkerr(err)

    defer f.Close()


    fscanner := bufio.NewScanner(f)


    var wg sync.WaitGroup

    sem := make(chan struct{}, MAX) // Use empty structs for semaphores as they have no allocation

    defer close(sem)

    out := make(chan result)

    defer close(out)

    indexPos := 1


    for fscanner.Scan() {

        text := fscanner.Text()

        wg.Add(1)

        sem <- struct{}{}


        go func(index int, data string) {

            // Defer the release of your resources, otherwise if any error occur in your goroutine

            // you'll have a deadlock

            defer func() {

                wg.Done()

                <-sem

            }()

            // Process your data

            out <- result{index, data}

        }(indexPos, text) // Pass in the data that will change on the iteration, go optimizer will move it around better


        indexPos++

    }


    // The goroutine is the only one to write to the dict, so no race condition

    go func() {

        for {

            if entry, ok := <-out; ok {

                ourDict[entry.index] = entry.data

            } else {

                return // Exit goroutine when channel closes

            }

        }

    }()


    wg.Wait()


    for i, v := range ourDict {

        fmt.Printf("%d: %s\n", i, v)

    }


}


查看完整回答
反对 回复 2022-04-26
?
元芳怎么了

TA贡献1798条经验 获得超7个赞

好的,我已经弄清楚了。通过复制给goroutine一个值来挂起,似乎有效。


改变:


for fscanner.Scan() {

    text := fscanner.Text()

    wg.Add(1)

    sem <- 1

    go func() {

        mu.Lock()

        defer mu.Unlock()

        ourDict[indexPos] = text

        indexPos++

        <- sem

        wg.Done()

    }()


}


for fscanner.Scan() {

        text := fscanner.Text()

        wg.Add(1)

        sem <- 1

        go func(mypos int) {

                mu.Lock()

                defer mu.Unlock()

                ourDict[mypos] = text

                <-sem

                wg.Done()

        }(indexPos)

        indexPos++

}

完整代码: https: //play.golang.org/p/dkHaisPHyHz


使用工人池,


package main


import (

    "bufio"

    "fmt"

    "log"

    "os"

    "sync"

)


const (

    MAX      = 10

    fileName = "some.dat"

)


type gunk struct {

    line string

    id   int

}


func main() {

    ourDict := make(map[int]string)

    wg := sync.WaitGroup{}

    mu := sync.RWMutex{}


    cha := make(chan gunk)


    for i := 0; i < MAX; i++ {

        wg.Add(1)

        go func(id int) {

            defer wg.Done()

            for {

                textin, ok := <-cha

                if !ok {

                    return

                }

                mu.Lock()

                ourDict[textin.id] = textin.line

                mu.Unlock()

            }

        }(i)

    }


    f, err := os.Open(fileName)

    checkerr(err)

    defer f.Close()

    fscanner := bufio.NewScanner(f)

    indexPos := 1


    for fscanner.Scan() {

        text := fscanner.Text()

        thisgunk := gunk{line: text, id: indexPos}

        cha <- thisgunk

        indexPos++

    }


    close(cha)

    wg.Wait()

    for i, v := range ourDict {

        fmt.Printf("%d: %s\n", i, v)

    }


}


func checkerr(err error) {

    if err != nil {

        fmt.Println(err)

        log.Fatal(err)

    }

}



查看完整回答
反对 回复 2022-04-26
  • 3 回答
  • 0 关注
  • 105 浏览
慕课专栏
更多

添加回答

举报

0/150
提交
取消
意见反馈 帮助中心 APP下载
官方微信