为了账号安全,请及时绑定邮箱和手机立即绑定

多个戈鲁丁在一个通道上有选择地监听

多个戈鲁丁在一个通道上有选择地监听

Go
芜湖不芜 2022-09-12 20:44:56
我看过这个,这个,这个和这个,但在这种情况下没有一个真正帮助我。我有多个戈鲁丁,如果通道中的值是针对该特定戈鲁廷的,则需要执行一些任务。var uuidChan chan stringfunc handleEntity(entityUuid string) {    go func() {        for {            select {            case uuid := <-uuidChan:                if uuid == entityUuid {                    // logic                    println(uuid)                    return                }            case <-time.After(time.Second * 5):                println("Timeout")                return            }        }    }()}func main() {    uuidChan = make(chan (string))    for i := 0; i < 5; i++ {        handleEntity(fmt.Sprintf("%d", i))    }    for i := 0; i < 4; i++ {        uuidChan <- fmt.Sprintf("%d", i)    }}https://play.golang.org/p/Pu5MhSP9Qtj在上面的逻辑中,uuid 由其中一个通道接收,但没有任何反应。为了解决这个问题,我尝试更改逻辑,以便在某个 uuid 的逻辑不在该例程中时将 uuid 重新插入到通道中。我知道这是一种不好的做法,这也行不通。func handleEntity(entityUuid string) {    go func() {        var notMe []string // stores list of uuids that can't be handled by this routine and re-inserts it in channel.        for {            select {            case uuid := <-uuidChan:                if uuid == entityUuid {                    // logic                    println(uuid)                    return                } else {                    notMe = append(notMe, uuid)                }            case <-time.After(time.Second * 5):                println("Timeout")                defer func() {                    for _, uuid := range notMe {                        uuidChan <- uuid                    }                }()                return            }        }    }()}https://play.golang.org/p/5On-Vd7UzqP正确的方法是什么?
查看完整描述

2 回答

?
泛舟湖上清波郎朗

TA贡献1818条经验 获得超3个赞

你有一个盒子里面有一个标签,所以收件人应该先阅读标签,然后决定如何处理它。如果将标签放在盒子内 - 您将迫使制造商打开盒子(请参阅解决方案1)。我鼓励您提供更好的邮政服务,并将标签至少放在盒子外面(请参阅解决方案3) - 或者更好地立即将盒子转移到正确的地址(请参阅解决方案2):

有很多解决方案可以解决这个问题,你只能受到想象力的限制:
1.由于你只有一个通道,里面有一个带有ID的数据,对于一个ID的消费者来说,你只能从通道中读取一次数据(假设通道内的数据很重要) - 你有一个简单的sulotion: 使用读取 goroutine 从通道读取数据,然后应用逻辑来决定如何处理此数据 - 例如,将其发送到另一个 goroutine 或运行任务。
试试这个

package main


import (

    "fmt"

    "sync"

    "time"

)


func main() {

    uuidChan := make(chan string)

    var wg sync.WaitGroup


    wg.Add(1)

    go func() {

        defer wg.Done()

        t := time.NewTimer(5 * time.Second)

        defer t.Stop()

        for {

            select {

            case uuid, ok := <-uuidChan:

                if !ok {

                    fmt.Println("Channel closed.")

                    return

                }

// logic:

                wg.Add(1)

                // Multiple goroutines listening selectively on one channel

                go consume(uuid, &wg)

                // switch uuid {case 1: go func1(); case 2: go func2()}


            case <-t.C:

                fmt.Println("Timeout")

                return

            }

        }

    }()


    for i := 0; i < 4; i++ {

        uuidChan <- fmt.Sprintf("%d", i)

    }

    close(uuidChan) // free up the goroutine


    wg.Wait() // wait until all consumers are done

    fmt.Println("All done.")

}


// Multiple goroutines listening selectively on one channel

func consume(uuid string, wg *sync.WaitGroup) {

    defer wg.Done()

// logic: or decide here based on uuid

    fmt.Println("job #:", uuid) // job

}

输出:


job #: 0

job #: 2

job #: 1

Channel closed.

job #: 3

All done.

使用每个戈鲁丁的通道,试试这个:

package main


import (

    "fmt"

    "sync"

    "time"

)


func handleEntity(uuidChan chan string, wg *sync.WaitGroup) {

    defer wg.Done()

    // for {

    select {

    case uuid, ok := <-uuidChan:

        if !ok {

            fmt.Println("closed")

            return // free up goroutine on chan closed

        }

        fmt.Println(uuid)

        return // job done


    case <-time.After(1 * time.Second):

        fmt.Println("Timeout")

        return

    }

    // }

}


func main() {

    const max = 5

    slice := make([]chan string, max)

    var wg sync.WaitGroup


    for i := 0; i < max; i++ {

        slice[i] = make(chan string, 1)


        wg.Add(1)

        go handleEntity(slice[i], &wg)

    }


    for i := 0; i < 4; i++ {

        slice[i] <- fmt.Sprintf("%d", i) // send to the numbered channel

    }


    wg.Wait()

    fmt.Println("All done.")

}

输出:


3

0

1

2

Timeout

All done.

使用和信号广播:

所以我们有一个盒子,并使用名为共享var的名称,我们在盒子的顶部添加接收者的地址。这里使用名为“首先将框设置为所需的ID”,然后使用信号广播通知所有正在收听的goroutines唤醒并检查和时间,以查看一个是否已寻址并且是否过期,然后全部返回等待状态,并且已寻址或已过期的通道继续读取无缓冲的信道或退出。然后使用 来表示剩余戈鲁廷的过期,最后让它们全部加入。请注意,第一个应该在之后调用 - 这意味着 goroutines 应该在第一个调用之前运行,所以一种方法是简单地使用另一个命名的 short for 。labelsync.Condlabellabellabellabeltime.AfterFuncwg.Wait()c.Broadcast()c.Wait()c.Broadcast()sync.WaitGroupw4wwait for wait

package main


import (

    "fmt"

    "sync"

    "time"

)


func handleEntity(entityUuid string) {

    defer wg.Done()

    t0 := time.Now()

    var expired, addressed bool


    w4w.Done()

    m.Lock()

    for !expired && !addressed {

        c.Wait()

        addressed = label == entityUuid

        expired = time.Since(t0) > d

    }

    m.Unlock()


    fmt.Println("id =", entityUuid, "addressed =", addressed, "expired =", expired)

    if !expired && addressed {

        uuid := <-uuidChan

        fmt.Println("matched =", entityUuid, uuid)

    }

    fmt.Println("done", entityUuid)

}


func main() {

    for i := 0; i < 5; i++ {

        w4w.Add(1)

        wg.Add(1)

        go handleEntity(fmt.Sprintf("%d", i))

    }

    w4w.Wait()


    time.AfterFunc(d, func() {

        // m.Lock()

        // label = "none"

        // m.Unlock()

        fmt.Println("expired")

        c.Broadcast() // expired

    })


    for i := 0; i < 4; i++ {

        m.Lock()

        label = fmt.Sprintf("%d", i)

        m.Unlock()

        c.Broadcast() // notify all

        uuidChan <- label

    }


    fmt.Println("...")

    wg.Wait()

    fmt.Println("all done")

}


var (

    label    string

    uuidChan = make(chan string)

    m        sync.Mutex

    c        = sync.NewCond(&m)

    w4w, wg  sync.WaitGroup

    d        = 1 * time.Second

)

输出:


id = 0 addressed = true expired = false

matched = 0 0

done 0

id = 1 addressed = true expired = false

matched = 1 1

done 1

id = 2 addressed = true expired = false

matched = 2 2

done 2

id = 3 addressed = true expired = false

matched = 3 3

done 3

...

expired

id = 4 addressed = false expired = true

done 4

all done


查看完整回答
反对 回复 2022-09-12
?
杨魅力

TA贡献1811条经验 获得超6个赞

也许你想映射你的频道,以发送消息到正确的戈鲁丁马上:


package main


import (

    "fmt"

    "time"

)


func worker(u string, c chan string) {

    for {

        fmt.Printf("got %s in %s\n", <-c, u)

    }

}


func main() {

    workers := make(map[string]chan string)


    for _, u := range []string{"foo", "bar", "baz"} {

        workers[u] = make(chan string)

        go worker(u, workers[u])

    }


    workers["foo"] <- "hello"

    workers["bar"] <- "world"

    workers["baz"] <- "!"


    fmt.Println()


    time.Sleep(time.Second)

}


查看完整回答
反对 回复 2022-09-12
  • 2 回答
  • 0 关注
  • 73 浏览
慕课专栏
更多

添加回答

举报

0/150
提交
取消
意见反馈 帮助中心 APP下载
官方微信