为了账号安全,请及时绑定邮箱和手机立即绑定

ZMQ 无法接收来自多个发布者的消息

ZMQ 无法接收来自多个发布者的消息

Go
扬帆大鱼 2023-05-22 17:19:10
我正在实施 ZMQ 的浓缩咖啡模式。我想连接很多订阅者 <> 代理 <> 许多发布者但是,代理中的侦听器仅接收来自一个发布者的消息。因此,订阅者只能从那个特定的发布者那里接收。我不知道我的代码有什么问题。package playgroundimport (    zmq "github.com/pebbe/zmq4"    "fmt"    "math/rand"    "time"    "testing")func subscriber_thread(id int) {    subscriber, _ := zmq.NewSocket(zmq.SUB)    subscriber.Connect("tcp://localhost:6001")    subscriber.SetSubscribe("")    defer subscriber.Close()    for {        msg, err := subscriber.RecvMessage(0)        if err != nil {            panic(err)        }        fmt.Println("subscriber id:", id,"received:", msg)    }}func publisher_thread(n int) {    publisher, _ := zmq.NewSocket(zmq.PUB)    publisher.Bind("tcp://*:6000")    for {        s := fmt.Sprintf("%c-%05d", n +'A', rand.Intn(100000))        _, err := publisher.SendMessage(s)        if err != nil {            panic(err)        }        fmt.Println("publisher sent:", s)        time.Sleep(100 * time.Millisecond) //  Wait for 1/10th second    }}//  The listener receives all messages flowing through the proxy, on its//  pipe. In CZMQ, the pipe is a pair of ZMQ_PAIR sockets that connects//  attached child threads. In other languages your mileage may vary:func listener_thread() {    pipe, _ := zmq.NewSocket(zmq.PAIR)    pipe.Bind("inproc://pipe")    //  Print everything that arrives on pipe    for {        msg, err := pipe.RecvMessage(0)        if err != nil {            break //  Interrupted        }        fmt.Printf("%q\n", msg)    }}func TestZmqEspresso(t *testing.T) {    go publisher_thread(0)    go publisher_thread(1)    go publisher_thread(2)    go subscriber_thread(1)    go subscriber_thread(2)    go listener_thread()    time.Sleep(100 * time.Millisecond)    subscriber, _ := zmq.NewSocket(zmq.XSUB)    subscriber.Connect("tcp://localhost:6000")    publisher, _ := zmq.NewSocket(zmq.XPUB)    publisher.Bind("tcp://*:6001")    listener, _ := zmq.NewSocket(zmq.PAIR)    listener.Connect("inproc://pipe")    zmq.Proxy(subscriber, publisher, listener)    fmt.Println("interrupted")}
查看完整描述

1 回答

?
慕桂英3389331

TA贡献2036条经验 获得超8个赞

解决方案:代理端的 XPUB/XSUB 套接字应该绑定(Bind)端口,而 PUB 和 SUB 工作线程应该连接(Connect)到这些端口。


下面是可以正常运行的代码:


package playground


import (

    zmq "github.com/pebbe/zmq4"


    "fmt"

    "log"

    "math/rand"

    "testing"

    "time"

)


func subscriber_thread(id int) {

    subscriber, err := zmq.NewSocket(zmq.SUB)

    if err != nil {

        panic(err)

    }

    err = subscriber.Connect("tcp://localhost:6001")

    if err != nil {

        panic(err)

    }

    err = subscriber.SetSubscribe("")

    if err != nil {

        panic(err)

    }

    defer subscriber.Close()


    for {

        msg, err := subscriber.RecvMessage(0)

        if err != nil {

            panic(err)

        }

        fmt.Println("subscriber id:", id, "received:", msg)

    }

}


func publisher_thread(n int) {

    publisher, err := zmq.NewSocket(zmq.PUB)

    if err != nil {

        panic(err)

    }

    //err = publisher.Bind("tcp://*:6000")

    err = publisher.Connect("tcp://localhost:6000")

    if err != nil {

        panic(err)

    }


    for {

        s := fmt.Sprintf("%c-%05d", n+'A', rand.Intn(100000))

        _, err := publisher.SendMessage(s)

        if err != nil {

            panic(err)

        }

        fmt.Println("publisher sent:", s)

        time.Sleep(100 * time.Millisecond) //  Wait for 1/10th second

    }

}


//  The listener receives all messages flowing through the proxy, on its

//  pipe. In CZMQ, the pipe is a pair of ZMQ_PAIR sockets that connects

//  attached child threads. In other languages your mileage may vary:


func listener_thread() {

    pipe, _ := zmq.NewSocket(zmq.PAIR)

    pipe.Bind("inproc://pipe")


    //  Print everything that arrives on pipe

    for {

        msg, err := pipe.RecvMessage(0)

        if err != nil {

            break //  Interrupted

        }

        fmt.Printf("%q\n", msg)

    }

}


func TestZmqEspresso(t *testing.T) {

    log.SetFlags(log.LstdFlags | log.Lmicroseconds | log.Lshortfile)


    go publisher_thread(0)

    go publisher_thread(1)

    go publisher_thread(2)


    go subscriber_thread(1)

    go subscriber_thread(2)


    go listener_thread()


    time.Sleep(100 * time.Millisecond)


    subscriber, err := zmq.NewSocket(zmq.XSUB)

    if err != nil {

        panic(err)

    }

    //err = subscriber.Connect("tcp://localhost:6000")

    err = subscriber.Bind("tcp://*:6000")

    if err != nil {

        panic(err)

    }


    publisher, err := zmq.NewSocket(zmq.XPUB)

    if err != nil {

        panic(err)

    }

    err = publisher.Bind("tcp://*:6001")

    if err != nil {

        panic(err)

    }


    listener, _ := zmq.NewSocket(zmq.PAIR)

    listener.Connect("inproc://pipe")


    err = zmq.Proxy(subscriber, publisher, listener)

    if err != nil {

        panic(err)

    }


    fmt.Println("interrupted")


}


查看完整回答
反对 回复 2023-05-22
  • 1 回答
  • 0 关注
  • 196 浏览
慕课专栏
更多

添加回答

举报

0/150
提交
取消
意见反馈 帮助中心 APP下载
官方微信