为了账号安全,请及时绑定邮箱和手机立即绑定

Golang - 为到不同服务器的多个连接扩展 websocket 客户端

Golang - 为到不同服务器的多个连接扩展 websocket 客户端

Go
胡子哥哥 2023-06-05 16:54:42
我有一个 websocket 客户端。实际上,它比下面显示的基本代码复杂得多。我现在需要扩展此客户端代码以打开到多个服务器的连接。最终,从服务器接收到消息时需要执行的任务是相同的。处理这个问题的最佳方法是什么?正如我上面所说,接收消息时执行的实际代码比示例中显示的要复杂得多。package mainimport (        "flag"        "log"        "net/url"        "os"        "os/signal"        "time"        "github.com/gorilla/websocket")var addr = flag.String("addr", "localhost:1234", "http service address")func main() {        flag.Parse()        log.SetFlags(0)        interrupt := make(chan os.Signal, 1)        signal.Notify(interrupt, os.Interrupt)        // u := url.URL{Scheme: "ws", Host: *addr, Path: "/echo"}        u := url.URL{Scheme: "ws", Host: *addr, Path: "/"}        log.Printf("connecting to %s", u.String())        c, _, err := websocket.DefaultDialer.Dial(u.String(), nil)        if err != nil {                log.Fatal("dial:", err)        }        defer c.Close()        done := make(chan struct{})        go func() {                defer close(done)                for {                        _, message, err := c.ReadMessage()                        if err != nil {                                log.Println("read:", err)                                return                        }                        log.Printf("recv: %s", message)                }        }()        ticker := time.NewTicker(time.Second)        defer ticker.Stop()        for {                select {                case <-done:                        return                case t := <-ticker.C:                        err := c.WriteMessage(websocket.TextMessage, []byte(t.String()))                        if err != nil {                                log.Println("write:", err)                                return                        }
查看完整描述

2 回答

?
眼眸繁星

TA贡献1873条经验 获得超9个赞

修改中断处理以在中断时关闭通道。这允许多个 goroutines 通过等待通道关闭来等待事件。


shutdown := make(chan struct{})

interrupt := make(chan os.Signal, 1)

signal.Notify(interrupt, os.Interrupt)

go func() {

    <-interrupt

    log.Println("interrupt")

    close(shutdown)

}()

将每个连接代码移动到一个函数中。这段代码是从问题中复制粘贴的,有两个变化:中断通道被关闭通道替换;该函数在函数完成时通知 sync.WaitGroup。


func connect(u string, shutdown chan struct{}, wg *sync.WaitGroup) {

    defer wg.Done()


    log.Printf("connecting to %s", u)

    c, _, err := websocket.DefaultDialer.Dial(u, nil)

    if err != nil {

        log.Fatal("dial:", err)

    }

    defer c.Close()


    done := make(chan struct{})


    go func() {

        defer close(done)

        for {

            _, message, err := c.ReadMessage()

            if err != nil {

                log.Println("read:", err)

                return

            }

            log.Printf("recv: %s", message)

        }

    }()


    ticker := time.NewTicker(time.Second)

    defer ticker.Stop()


    for {

        select {

        case <-done:

            return

        case t := <-ticker.C:

            err := c.WriteMessage(websocket.TextMessage, []byte(t.String()))

            if err != nil {

                log.Println("write:", err)

                return

            }

        case <-shutdown:

            // Cleanly close the connection by sending a close message and then

            // waiting (with timeout) for the server to close the connection.

            err := c.WriteMessage(websocket.CloseMessage, websocket.FormatCloseMessage(websocket.CloseNormalClosure, ""))

            if err != nil {

                log.Println("write close:", err)

                return

            }

            select {

            case <-done:

            case <-time.After(time.Second):

            }

            return

        }

    }

}

在中声明一个sync.WaitGroupmain()。对于要连接到的每个 websocket 端点,递增 WaitGroup 并启动 goroutine 以连接该端点。启动 goroutine 后,在 WaitGroup 上等待 goroutine 完成。


var wg sync.WaitGroup

for _, u := range endpoints { // endpoints is []string 

                              // where elements are URLs 

                              // of endpoints to connect to.

    wg.Add(1)

    go connect(u, shutdown, &wg)

}

wg.Wait()


查看完整回答
反对 回复 2023-06-05
?
开满天机

TA贡献1786条经验 获得超12个赞

与每个不同服务器的通信是否完全独立于其他服务器?如果是的话,我会以这样的方式四处走动:

  • main中创建一个带有取消函数的上下文

  • 在 main 中创建一个等待组来跟踪启动的 goroutines

  • 对于每个服务器,添加到等待组,从传递上下文和等待组引用的主函数启动一个新的 goroutine

  • main进入一个 for/select 循环,监听信号,如果信号到达,调用 cancelfunc 并等待等待组。

  • main还可以监听来自 goroutines 的结果 chan,并可能自己打印结果,因为 goroutines 不应该直接这样做。

  • 正如我们所说,每个goroutine都有 wg 的引用、上下文和可能的 chan 以返回结果。现在,如果 goroutine 必须只做一件事,或者它是否需要做一系列事情,这个方法就会分裂。对于第一种方法

  • 如果只有一件事要做,我们会遵循此处描述的方法(观察到异步他会依次启动一个新的 goroutine 来执行 DoSomething() 步骤,该步骤将在通道上返回结果)这允许它能够随时接受取消信号。由您决定您想要的非阻塞程度以及您想要响应取消信号的速度。此外,将关联的上下文传递给 goroutines 的好处是您可以调用 Context enabled大多数库函数的版本。例如,如果您希望您的拨号超时为 1 分钟,您可以创建一个新的上下文,该上下文从传递的超时开始,然后使用该超时创建 DialContext。这允许拨号从超时或父(您在 main 中创建的)上下文的 cancelfunc 被调用时停止。

  • 如果需要做更多的事情,我通常更喜欢用 goroutine 做一件事,让它调用一个新的 goroutine 来执行下一步(将所有引用传递到管道中)然后退出。

这种方法可以很好地扩展取消,并能够在任何步骤停止流水线,并且可以轻松地为可能需要太长时间的步骤支持带有 dealines 的上下文。


查看完整回答
反对 回复 2023-06-05
  • 2 回答
  • 0 关注
  • 218 浏览
慕课专栏
更多

添加回答

举报

0/150
提交
取消
意见反馈 帮助中心 APP下载
官方微信