为了账号安全,请及时绑定邮箱和手机立即绑定

如何在 for-select 循环中更改通道

如何在 for-select 循环中更改通道

Go
慕沐林林 2022-09-19 14:54:23
我想做一个动态通道池,它可以监听数十万个通道,所有这些都在控制之下,正如我所排除的那样,我希望它能够自动升级,如果有太多的通道监听(goroutine =>反射=>选择)但是在selectN通道观察器编码期间,我被通道替换阻止了我想在运行时替换chan,这是在选择循环中,我已经尝试了一段时间以使其可用,但事情进展不顺利。func Test_Change(t *testing.T) {type A struct {    ch chan interface{}}a := &A{    ch: make(chan interface{}),}go func() {    for {        select {        case v := <-a.ch:            fmt.Println(v)        }    }}()newCh := make(chan interface{})go func() {    for i := 0; i < 200; i++ {        a.ch <- i    }    a.ch = newCh}()go func() {    for i := 1000; i < 1010; i++ {        newCh <- i    }}()for {    select {}}}它阻止了func Test_Change(t *testing.T) {type A struct {    ch chan interface{}    bh chan interface{}}a := &A{    ch: make(chan interface{}),    bh: make(chan interface{}),}notify := make(chan struct{})go func() {    for {        select {        case v := <-a.ch:            fmt.Println(v)        case <-notify:            fmt.Println("notify")        }    }}()newCh := make(chan interface{})go func() {    for i := 0; i < 200; i++ {        a.ch <- i    }    a.ch = newCh    notify <- struct{}{}}()go func() {    for i := 1000; i < 1010; i++ {        newCh <- i    }}()for {    select {}}}它奏效了
查看完整描述

3 回答

?
慕斯709654

TA贡献1840条经验 获得超5个赞

您已经正确确定,使用常规语法(固定数量的事例)不可能在任意大的动态 chan 上设置一个块,并且可以使用反射包。select


但是,我不确定这是实现目标的最佳方法。如果您确实有数千个频道需要观看(例如,同时连接数千个远程客户端),则可以使用“扇入”模式将所有内容写入非常少的固定数量的频道,并选择该频道。


而不是


    for {

        select {

        case <-sigterm:

            cleanup()

            os.Exit(1)

        case msg := <-client1:

            // process msg...

        case msg := <-client2:

            // process msg...

        // HOW CAN I DYNAMICALLY ADD AND REMOVE A CLIENT HERE?

        }

    }

想想像这样:


    for {

        select {

        case <-sigterm:

            cleanup()

            os.Exit(1)

        case msg := <-clients:

            // process msg...

        }

    }

func addClient(client chan Message) {

    // Fan-in: read all future messages from client, and write them

    // to clients.

    go func(){

        for msg := range client {

            clients <- msg

        }

    }()

}

替换通道变量的值不是线程安全的(可以是数据争用),但是让多个戈鲁廷同时写入和读取同一通道是完全可以的。clients


查看完整回答
反对 回复 2022-09-19
?
潇湘沐

TA贡献1816条经验 获得超6个赞

数据竞赛是一个严重的错误和设计缺陷。您可以通过 使用 运行测试或使用 运行程序来检测数据争用。a.chgo test -racego run -race program.go


可以替换在 for/select 循环中使用的 chan 的值,只要它是在事例的主体内正确完成的,而不是在另一个并发 goroutine 的代码中。


    replace := time.After(3 * time.Second)


    for {

        select {

        case v, ok := <-ch1:

            // use v...

        case v, ok := <-ch2:

            // use v...

        case <-replace:

            ch1 = anotherChannel

        }

    }

此示例可运行代码不雅(不要这样做)。您可以将其保存在工作站上,然后尝试使用数据竞速检测器。

固定的示例代码不具有不雅性。


查看完整回答
反对 回复 2022-09-19
?
炎炎设计

TA贡献1808条经验 获得超4个赞

也许这可能有效;


非常非常广泛地测试它,它是可行的,但很难把它弄对。


我预计它不会是完美的。而且它缺乏关于“该做”和“不该做”的大量文档。


它也不是合并输入通道的版本,它总是一次只消耗一个输入通道。这可能会对性能造成问题。


我给出的唯一保证是它没有比赛。


虽然我把写一个同人版的任务留给读者作为练习。


package main


import (

    "fmt"

)


func main() {

    m := New()

    go m.Run()

    input := m.Resize(0)

    input <- 5

    input <- 4

    close(input)


    input = m.Resize(10)

    input <- 6

    input <- 7

    close(input)


    input = m.Resize(2)

    input <- 8

    input <- 9

    close(input)


    m.Close()


    fmt.Println()


}


type masterOfThings struct {

    notify    chan notification

    wantClose chan chan bool

}


func New() masterOfThings {

    return masterOfThings{

        notify:    make(chan notification, 1),

        wantClose: make(chan chan bool),

    }

}


type notification struct {

    N   int

    out chan chan interface{}

}


func (m masterOfThings) Resize(n int) chan<- interface{} {

    N := notification{

        N:   n,

        out: make(chan chan interface{}, 1),

    }

    m.notify <- N

    return <-N.out

}


func (m masterOfThings) Close() {

    closed := make(chan bool)

    m.wantClose <- closed

    <-closed

}


func (m masterOfThings) Run() {

    var input chan interface{}

    inputs := []chan interface{}{}

    closers := []chan bool{}

    defer func() {

        for _, c := range closers {

            close(c)

        }

    }()

    var wantClose bool

    for {

        select {

        case m := <-m.wantClose:

            closers = append(closers, m)

            wantClose = true

            if len(inputs) < 1 && input == nil {

                return

            }

        case n, ok := <-input:

            if !ok {

                input = nil

                if len(inputs) > 0 {

                    input = inputs[0]

                    copy(inputs, inputs[1:])

                    inputs = inputs[:len(inputs)-1]

                } else if wantClose {

                    return

                }

                continue

            }

            fmt.Println(n)

        case n := <-m.notify:

            nInput := make(chan interface{}, n.N)

            if input == nil {

                input = nInput

            } else {

                inputs = append(inputs, nInput)

            }

            n.out <- nInput

        }

    }

}


查看完整回答
反对 回复 2022-09-19
  • 3 回答
  • 0 关注
  • 113 浏览
慕课专栏
更多

添加回答

举报

0/150
提交
取消
意见反馈 帮助中心 APP下载
官方微信