为了账号安全,请及时绑定邮箱和手机立即绑定

如何等待执行

如何等待执行

Go
忽然笑 2021-11-29 16:59:40
我有一个大型日志文件,您想对其进行并行分析。我有这个代码:package mainimport (    "bufio"    "fmt"    "os"    "time")func main() {    filename := "log.txt"    threads := 10    // Read the  file    file, err := os.Open(filename)    if err != nil {        fmt.Println("Could not open file with the database.")        os.Exit(1)    }    defer file.Close()    scanner := bufio.NewScanner(file)    // Channel for strings    tasks := make(chan string)    // Run the threads that catch events from the channel and understand one line of the log file    for i := 0; i < threads; i++ {        go parseStrings(tasks)    }    // Start a thread load lines from a file into the channel    go getStrings(scanner, tasks)    // At this point I have to wait until all of the threads executed    // For example, I set the sleep    for {        time.Sleep(1 * time.Second)    }}func getStrings(scanner *bufio.Scanner, tasks chan<- string) {    for scanner.Scan() {        s := scanner.Text()        tasks <- s    }}func parseStrings(tasks <-chan string) {    for {        s := <-tasks        event := parseLine(s)        fmt.Println(event)    }}func parseLine(line string) string {    return line}实际上,当我等待所有线程结束时?有人建议我创建一个单独的线程,在其中添加结果,但如何添加?
查看完整描述

3 回答

?
繁华开满天机

TA贡献1816条经验 获得超4个赞

使用管道模式和“扇出/扇入”模式:


package main


import (

    "bufio"

    "fmt"

    "strings"

    "sync"

)


func main() {

    file := "here is first line\n" +

        "here is second line\n" +

        "here is line 3\n" +

        "here is line 4\n" +

        "here is line 5\n" +

        "here is line 6\n" +

        "here is line 7\n"

    scanner := bufio.NewScanner(strings.NewReader(file))


    // all lines onto one channel

    in := getStrings(scanner)


    // FAN OUT

    // Multiple functions reading from the same channel until that channel is closed

    // Distribute work across multiple functions (ten goroutines) that all read from in.

    xc := fanOut(in, 10)


    // FAN IN

    // multiplex multiple channels onto a single channel

    // merge the channels from c0 through c9 onto a single channel

    for n := range merge(xc) {

        fmt.Println(n)

    }

}


func getStrings(scanner *bufio.Scanner) <-chan string {

    out := make(chan string)

    go func() {

        for scanner.Scan() {

            out <- scanner.Text()

        }

        close(out)

    }()

    return out

}


func fanOut(in <-chan string, n int) []<-chan string {

    var xc []<-chan string

    for i := 0; i < n; i++ {

        xc = append(xc, parseStrings(in))

    }

    return xc

}


func parseStrings(in <-chan string) <-chan string {

    out := make(chan string)

    go func() {

        for n := range in {

            out <- parseLine(n)

        }

        close(out)

    }()

    return out

}


func parseLine(line string) string {

    return line

}


func merge(cs []<-chan string) <-chan string {

    var wg sync.WaitGroup

    wg.Add(len(cs))


    out := make(chan string)

    for _, c := range cs {

        go func(c <-chan string) {

            for n := range c {

                out <- n

            }

            wg.Done()

        }(c)

    }


    go func() {

        wg.Wait()

        close(out)

    }()

    return out

}


查看完整回答
反对 回复 2021-11-29
?
眼眸繁星

TA贡献1873条经验 获得超9个赞

只需使用sync.WaitGroup


package main


import(

    "sync"

)


func stuff(wg *sync.WaitGroup) {

    defer wg.Done() // tell the WaitGroup it's done

    /* stuff */

}


func main() {

    count := 50

    wg := new(sync.WaitGroup)

    wg.Add(count) // add number of gorutines to the WaitGroup

    for i := 0; i < count; i++ {

        go stuff(wg)

    }

    wg.Wait() // wait for all

}


查看完整回答
反对 回复 2021-11-29
?
肥皂起泡泡

TA贡献1829条经验 获得超6个赞

var wg sync.WaitGroup

启动每个 goroutine 时执行以下操作:

wg.Add(1)

当 goroutine 工作完成时,计数器递减

wg.Done()

结果,而不是

for {
    time.Sleep(1 * time.Second)}

 wg.Wait()


查看完整回答
反对 回复 2021-11-29
  • 3 回答
  • 0 关注
  • 153 浏览
慕课专栏
更多

添加回答

举报

0/150
提交
取消
意见反馈 帮助中心 APP下载
官方微信