为了账号安全,请及时绑定邮箱和手机立即绑定

如何将多个 goroutine 同步到选定 goroutine 的终止

如何将多个 goroutine 同步到选定 goroutine 的终止

Go
慕哥9229398 2021-10-04 09:36:51
我在上一个问题中问过这个问题,但有些人觉得我最初的问题不够详细(“为什么你想要一个定时条件等待??”)所以这里有一个更具体的问题。我有一个 goroutine 正在运行,称之为服务器。它已经启动,将执行一段时间,并做它的事情。然后,它会退出,因为它完成了。在它的执行过程中,一些其他的 goroutine 会启动。如果您愿意,可以称它们为“客户端”线程。他们运行步骤 A 和步骤 B。然后,他们必须等待“服务器”goroutine 完成指定的时间,如果“服务器未完成,则退出状态,如果完成则说运行步骤 C。”(请不要告诉我如何重组此工作流程。它是假设的和给定的。无法更改。)一个正常的、合理的方法是让服务器线程用 selectAll 或 Broadcast 函数通知一个条件变量,并让其他线程处于定时等待状态监视条件变量。func (s *Server) Join(timeMillis int) error {  s.mux.Lock()  defer s.mux.Unlock()  while !s.isFinished {     err = s.cond.Wait(timeMillis)     if err != nil {        stepC()     }  }  return err}服务器将进入一个状态,其中 isFinished 变为 true 并广播关于互斥锁的条件变量。除非这是不可能的,因为 Go 不支持定时条件等待。(但有一个 Broadcast())那么,什么是“以 Go 为中心”的方式来做到这一点?我已经了解了所有的 Go 博客和文档,这种模式或其等价物,尽管很明显,但从未出现,也没有对基本问题进行任何等效的“重构”——即 IPC 风格的通道介于一个例程和一个例程之间其他例行公事。是的,有扇入/扇出,但请记住这些线程不断出现和消失。这应该很简单 - 并且至关重要的是/不要让成千上万的“等待状态”goroutine 在复用通道的另一个“分支”(计时器)发出信号时等待服务器死亡/。请注意,上面的一些“客户端”可能在服务器 goroutine 启动之前启动(这是通常创建通道的时间),一些可能出现在期间,一些可能出现在之后......在所有情况下,如果和,他们应该运行 stepC仅当服务器在进入 Join() 函数后 timeMillis 毫秒后运行并退出时...一般来说,当有多个消费者时,渠道设施似乎非常缺乏。“首先构建一个将侦听器映射到的通道注册表”和“这是一个非常漂亮的递归数据结构,它通过它作为字段保存的通道发送自己”是这样的: 等待(forSomeTime)
查看完整描述

1 回答

?
慕田峪9158850

TA贡献1794条经验 获得超7个赞

我认为可以通过在单个共享通道上进行选择,然后在完成后让服务器关闭它来完成您想要的操作。


假设我们创建了一个全局“退出通道”,它在所有 goroutine 之间共享。它可以在创建“服务器”goroutine 之前创建。重要的部分是服务器 goroutine 从不向通道发送任何内容,而只是将其关闭。


现在客户端 goroutines,只需执行以下操作:


select {

    case <- ch:

    fmt.Println("Channel closed, server is done!")

    case <-time.After(time.Second):

    fmt.Println("Timed out. do recovery stuff")


}

服务器 goroutine 只做:


close(ch)

更完整的例子:


package main


import(

    "fmt"

    "time"


)



func waiter(ch chan struct{}) {

    fmt.Println("Doing stuff")


    fmt.Println("Waiting...")


    select {

        case <- ch:

        fmt.Println("Channel closed")

        case <-time.After(time.Second):

        fmt.Println("Timed out. do recovery stuff")


    }

}



func main(){


    ch := make(chan struct{})


    go waiter(ch)

    go waiter(ch)

    time.Sleep(100*time.Millisecond)

    fmt.Println("Closing channel")

    close(ch)


    time.Sleep(time.Second)


}

这可以抽象为以下实用程序 API:


type TimedCondition struct {

    ch chan struct{}

}


func NewTimedCondition()*TimedCondition {

    return &TimedCondition {

        ch: make(chan struct{}),

    }

}


func (c *TimedCondition)Broadcast() {

    close(c.ch)

}


func (c *TimedCondition)Wait(t time.Duration) error {

    select {

        // channel closed, meaning broadcast was called

        case <- c.ch:

            return nil

        case <-time.After(t):

            return errors.New("Time out")   

    }

}


查看完整回答
反对 回复 2021-10-04
  • 1 回答
  • 0 关注
  • 158 浏览
慕课专栏
更多

添加回答

举报

0/150
提交
取消
意见反馈 帮助中心 APP下载
官方微信