为了账号安全,请及时绑定邮箱和手机立即绑定

如何从主线程退出

如何从主线程退出

Go
繁花不似锦 2023-08-07 10:55:32
func GoCountColumns(in chan []string, r chan Result, quit chan int) {    for {        select {        case data := <-in:            r <- countColumns(data) // some calculation function        case <-quit:            return // stop goroutine        }    }}func main() {    fmt.Println("Welcome to the csv Calculator")    file_path := os.Args[1]    fd, _ := os.Open(file_path)    reader := csv.NewReader(bufio.NewReader(fd))    var totalColumnsCount int64 = 0    var totallettersCount int64 = 0    linesCount := 0    numWorkers := 10000    rc := make(chan Result, numWorkers)    in := make(chan []string, numWorkers)    quit := make(chan int)    t1 := time.Now()    for i := 0; i < numWorkers; i++ {        go GoCountColumns(in, rc, quit)    }    //start worksers    go func() {        for {            record, err := reader.Read()            if err == io.EOF {                break            }            if err != nil {                log.Fatal(err)            }            if linesCount%1000000 == 0 {                fmt.Println("Adding to the channel")            }            in <- record            //data := countColumns(record)            linesCount++            //totalColumnsCount = totalColumnsCount + data.ColumnCount            //totallettersCount = totallettersCount + data.LettersCount        }        close(in)    }()    for i := 0; i < numWorkers; i++ {        quit <- 1 // quit goroutines from main    }    close(rc)    for i := 0; i < linesCount; i++ {        data := <-rc        totalColumnsCount = totalColumnsCount + data.ColumnCount        totallettersCount = totallettersCount + data.LettersCount    }    fmt.Printf("I counted %d lines\n", linesCount)    fmt.Printf("I counted %d columns\n", totalColumnsCount)    fmt.Printf("I counted %d letters\n", totallettersCount)    elapsed := time.Now().Sub(t1)    fmt.Printf("It took %f seconds\n", elapsed.Seconds())}My Hello World 是一个读取 csv 文件并将其传递到通道的程序。然后 goroutine 应该从这个通道消费。我的问题是我不知道如何从主线程检测所有数据都已处理并且我可以退出程序。
查看完整描述

3 回答

?
慕桂英546537

TA贡献1848条经验 获得超10个赞

在其他答案之上。


请(非常)小心,关闭通道应该发生在写入调用站点上,而不是读取调用站点上。在正在写入的GoCountColumns通道中r,关闭通道的责任落在GoCountColumns函数上。技术原因是,它是唯一确定该通道将不再被写入的参与者,因此可以安全关闭。

    func GoCountColumns(in chan []string, r chan Result, quit chan int) {

        defer close(r)     // this line.

        for {

            select {

            case data := <-in:

                r <- countColumns(data) // some calculation function

            case <-quit:

                return // stop goroutine

            }

        }

    }

如果我可以说,函数参数命名约定是将目标作为第一个参数,将源作为第二个参数,然后使用其他参数。GoCountColumns优选地写成:

    func GoCountColumns(dst chan Result, src chan []string, quit chan int) {

        defer close(dst)

        for {

            select {

            case data := <-src:

                dst <- countColumns(data) // some calculation function

            case <-quit:

                return // stop goroutine

            }

        }

    }

quit您在流程开始后立即致电。这是不合逻辑的。该quit命令是强制退出序列,一旦检测到退出信号就应该调用它,以尽可能以最佳状态(可能全部损坏)强制退出当前处理。换句话说,您应该依赖该signal.Notify包来捕获退出事件,并通知您的工作人员退出。请参阅https://golang.org/pkg/os/signal/#example_Notify

为了编写更好的并行代码,首先列出管理程序生命周期所需的例程,确定需要阻塞的例程以确保程序在退出之前完成。


在您的代码中,存在read, map。为了确保处理完整,程序主函数必须确保在退出时捕获信号,map然后再退出。请注意,该read功能并不重要。


然后,您还需要从用户输入捕获退出事件所需的代码。


总的来说,我们似乎需要阻止两个事件来管理生命周期。示意性地说,


func main(){

    go read()

    go map(mapDone)

    go signal()

    select {

        case <-mapDone:

        case <-sig:

    }

}

这个简单的代码很好process or die。事实上,当捕获到用户事件时,程序立即退出,而不给其他例程机会执行停止时所需的操作。


为了改善这些行为,您首先需要一种方法来表明程序想要离开其他例程,其次需要一种方法来等待这些例程在离开之前完成其停止序列。


要发出退出事件或取消信号,您可以使用 a context.Context,将其传递给工作人员,让他们听。


再次,示意性地,


func main(){

    ctx,cancel := context.WithCancel(context.WithBackground())

    go read(ctx)

    go map(ctx,mapDone)

    go signal()

    select {

        case <-mapDone:

        case <-sig:

            cancel()

    }

}

(稍后将详细阅读和绘制地图)


要等待完成,很多事情都是可能的,只要它们是线程安全的。通常,sync.WaitGroup使用 a。或者,在像您这样的情况下,只有一个例程需要等待,我们可以重新使用当前mapDone通道。


func main(){

    ctx,cancel := context.WithCancel(context.WithBackground())

    go read(ctx)

    go map(ctx,mapDone)

    go signal()

    select {

        case <-mapDone:

        case <-sig:

            cancel()

            <-mapDone

    }

}

这很简单也很直接。但这并不完全正确。最后一个mapDone chan可能会永远阻塞并使程序无法停止。因此,您可以实现第二个信号处理程序或超时。


示意性地,超时解决方案是


func main(){

    ctx,cancel := context.WithCancel(context.WithBackground())

    go read(ctx)

    go map(ctx,mapDone)

    go signal()

    select {

        case <-mapDone:

        case <-sig:

            cancel()

            select {

                case <-mapDone:

                case <-time.After(time.Second):

            }

    }

}

您还可以在最后一次选择中累积信号处理和超时。


最后,有几件事要讲read和map上下文聆听。


首先map,实现需要定期读取context.Done通道来检测cancellation。


这是简单的部分,只需要更新 select 语句。


    func GoCountColumns(ctx context.Context, dst chan Result, src chan []string) {

        defer close(dst)

        for {

            select {

            case <-ctx.Done():

                <-time.After(time.Minute) // do something more useful.

                return // quit. Notice the defer will be called.

            case data := <-src:

                dst <- countColumns(data) // some calculation function

            }

        }

    }

现在这read部分有点棘手,因为它是一个 IO,它不提供select强大的编程接口,并且监听上下文通道取消可能看起来很矛盾。这是。由于 IO 是阻塞的,因此无法侦听上下文。并且在从上下文通道读取时,无法读取 IO。在您的情况下,解决方案需要了解您的读取循环与您的程序生命周期无关(还记得我们只监听mapDone吗?),并且我们可以忽略上下文。


在其他情况下,例如,如果您想在读取最后一个字节时重新启动(因此在每次读取时,我们都会增加 n,计算字节数,并且我们希望在停止时保存该值)。然后,需要启动一个新的例程,因此,多个例程需要等待完成。在这种情况下,async.WaitGroup会更合适。


示意性地说,


func main(){

    var wg sync.WaitGroup

    processDone:=make(chan struct{})

    ctx,cancel := context.WithCancel(context.WithBackground())

    go read(ctx)

    wg.Add(1)

    go saveN(ctx,&wg)

    wg.Add(1)

    go map(ctx,&wg)

    go signal()

    go func(){

        wg.Wait()

        close(processDone)

    }()

    select {

        case <-processDone:

        case <-sig:

            cancel()

            select {

                case <-processDone:

                case <-time.After(time.Second):

            }

    }

}

在最后的代码中,正在传递等待组。例程负责调用wg.Done(),当所有例程完成后,processDone通道关闭,以发出选择信号。


    func GoCountColumns(ctx context.Context, dst chan Result, src chan []string, wg *sync.WaitGroup) {

        defer wg.Done()

        defer close(dst)

        for {

            select {

            case <-ctx.Done():

                <-time.After(time.Minute) // do something more useful.

                return // quit. Notice the defer will be called.

            case data := <-src:

                dst <- countColumns(data) // some calculation function

            }

        }

    }

尚未确定哪种模式是首选,但您也可能会看到waitgroup仅在调用站点进行管理。


func main(){

    var wg sync.WaitGroup

    processDone:=make(chan struct{})

    ctx,cancel := context.WithCancel(context.WithBackground())

    go read(ctx)

    wg.Add(1)

    go func(){

        defer wg.Done()

        saveN(ctx)

    }()

    wg.Add(1)

    go func(){

        defer wg.Done()

        map(ctx)

    }()

    go signal()

    go func(){

        wg.Wait()

        close(processDone)

    }()

    select {

        case <-processDone:

        case <-sig:

            cancel()

            select {

                case <-processDone:

                case <-time.After(time.Second):

            }

    }

}

除了所有这些问题和 OP 问题之外,您必须始终预先评估并行处理对于给定任务的相关性。没有独特的秘诀,练习和衡量你的代码性能。参见 pprof.


查看完整回答
反对 回复 2023-08-07
?
慕盖茨4494581

TA贡献1850条经验 获得超11个赞

这段代码中发生的事情太多了。您应该将代码重组为服务于特定目的的短函数,以便其他人可以轻松地帮助您(也可以帮助您自己)。

有多种方法可以让一个 go-routine 等待其他工作完成。最常见的方法是使用等待组(我提供的示例)或通道。

func processSomething(...) {

    ...

}


func main() {

    workers := &sync.WaitGroup{}


    for i := 0; i < numWorkers; i++ {

        workers.Add(1) // you want to call this from the calling go-routine and before spawning the worker go-routine


        go func() {

            defer workers.Done() // you want to call this from the worker go-routine when the work is done (NOTE the defer, which ensures it is called no matter what)

            processSomething(....) // your async processing

        }()

    }


    // this will block until all workers have finished their work

    workers.Wait()

}


查看完整回答
反对 回复 2023-08-07
?
www说

TA贡献1775条经验 获得超8个赞

您可以使用通道来阻塞,main直到 Goroutine 完成。


package main


import (

    "log"

    "time"

)


func main() {

    c := make(chan struct{})


    go func() {

        time.Sleep(3 * time.Second)

        log.Println("bye")

        close(c)

    }()


    // This blocks until the channel is closed by the routine

    <-c

}

无需向通道写入任何内容。读取会被阻塞,直到读取数据或者我们在这里使用的通道关闭为止。


查看完整回答
反对 回复 2023-08-07
  • 3 回答
  • 0 关注
  • 148 浏览
慕课专栏
更多

添加回答

举报

0/150
提交
取消
意见反馈 帮助中心 APP下载
官方微信