为了账号安全,请及时绑定邮箱和手机立即绑定

从 Redis 退订似乎不起作用

从 Redis 退订似乎不起作用

Go
繁星点点滴滴 2022-06-06 15:15:48
我正在尝试在 Redis 中使用 pub-sub。我要做的是打开两个redis-cli. 第一个我发出命令flushall以确保以绿色启动。然后在另一个终端中,我打开MONITOR以打印来自 Golang 示例客户端的所有命令(代码如下)。这是我从 MONITOR 打印的内容:1590207069.340860 [0 127.0.0.1:58910] "smembers" "user:Ali:rooms"1590207069.341380 [0 127.0.0.1:58910] "sadd" "user:Ali:rooms" "New"1590207069.345266 [0 127.0.0.1:58910] "smembers" "user:Ali:rooms"1590207069.353706 [0 127.0.0.1:58910] "sadd" "user:Ali:rooms" "Old"1590207069.354219 [0 127.0.0.1:58912] "subscribe" "New"1590207069.354741 [0 127.0.0.1:58910] "smembers" "user:Ali:rooms"1590207069.355444 [0 127.0.0.1:58912] "unsubscribe" "New" "Old"1590207069.356754 [0 127.0.0.1:58910] "sadd" "user:Ali:rooms" "OldPlusPlus"1590207069.357206 [0 127.0.0.1:58914] "subscribe" "New" "Old"1590207069.357656 [0 127.0.0.1:58910] "smembers" "user:Ali:rooms"1590207069.358362 [0 127.0.0.1:58912] "unsubscribe" "OldPlusPlus" "New" "Old"1590207069.361030 [0 127.0.0.1:58916] "subscribe" "OldPlusPlus" "New" "Old"我试图让客户端对随着时间的推移打开的所有通道都有一个连接。而不是一个连接/线程来处理 Redis 的每个通道。因此,每当需要新的订阅请求时,我都会尝试从客户端删除所有以前的订阅,并对旧频道和新频道进行新订阅。但似乎该unsubscribe命令没有按预期工作(或者我遗漏了一些东西)!因为当我尝试获取每个频道的客户端数量时,从第一个终端:127.0.0.1:6379> pubsub numsub OldPlusPlus New Old1) "OldPlusPlus"2) (integer) 13) "New"4) (integer) 25) "Old"6) (integer) 2除了当我尝试向“新”频道发送消息时,我的 go 客户端收到了两次消息!
查看完整描述

1 回答

?
冉冉说

TA贡献1877条经验 获得超1个赞

问题与订阅频道的 *redis.PubSub 类型的对象不是用于取消订阅频道的对象有关。


因此,我必须维护对此类对象的引用,然后使用该引用取消订阅所有频道。


这是修改和工作的代码:



package main


import (

    "fmt"

    "github.com/go-redis/redis/v7"

    "log"

)


type user struct {

    name        string

    rooms       []string

    stopRunning chan bool

    running     bool

    roomsPubsub map[string]*redis.PubSub

}


func (u *user) connect(rdb *redis.Client) error {

    // get all user rooms (from DB) and start subscribe

    r, err := rdb.SMembers(fmt.Sprintf("user:%s:rooms", u.name)).Result()

    if err != nil {

        return err

    }

    u.rooms = r


    if len(u.rooms) == 0 {

        return nil

    }


    u.doSubscribe("", rdb)


    return nil

}


func (u *user) subscribe(room string, rdb *redis.Client) error {

    // check if already subscribed

    for i := range u.rooms {

        if u.rooms[i] == room {

            return nil

        }

    }


    // add room to user

    userRooms := fmt.Sprintf("user:%s:rooms", u.name)

    if err := rdb.SAdd(userRooms, room).Err(); err != nil {

        return err

    }


    // get all user rooms (from DB) and start subscribe

    r, err := rdb.SMembers(userRooms).Result()

    if err != nil {

        return err

    }

    u.rooms = r


    if u.running {

        u.stopRunning <- true

    }


    u.doSubscribe(room, rdb)


    return nil

}


func (u *user) doSubscribe(room string, rdb *redis.Client) {

    pubSub := rdb.Subscribe(u.rooms...)

    if len(room) > 0 {

        u.roomsPubsub[room] = pubSub

    }


    go func() {

        u.running = true

        fmt.Println("starting the listener for user:", u.name, "on rooms:", u.rooms)

        for {

            select {


            case msg, ok := <-pubSub.Channel():

                if !ok {

                    break

                }

                fmt.Println(msg.Payload, msg.Channel)


            case <-u.stopRunning:

                fmt.Println("Stop listening for user:", u.name, "on old rooms")


                for k, v := range u.roomsPubsub {

                    if err := v.Unsubscribe(); err != nil {

                        fmt.Println("unable to unsubscribe", err)

                    }

                    delete(u.roomsPubsub, k)

                }

                break

            }

        }

    }()

}


func (u *user) unsubscribe(room string, rdb *redis.Client) error {

    return nil

}


var rdb *redis.Client


func main() {


    rdb = redis.NewClient(&redis.Options{Addr: "localhost:6379"})


    u := &user{

        name:        "Wael",

        stopRunning: make(chan bool),

        roomsPubsub: make(map[string]*redis.PubSub),

    }


    if err := u.connect(rdb); err != nil {

        log.Fatal(err)

    }


    if err := u.subscribe("New", rdb); err != nil {

        log.Fatal(err)

    }


    if err := u.subscribe("Old", rdb); err != nil {

        log.Fatal(err)

    }


    if err := u.subscribe("OldPlusPlus", rdb); err != nil {

        log.Fatal(err)

    }


    select {}

}



查看完整回答
反对 回复 2022-06-06
  • 1 回答
  • 0 关注
  • 136 浏览
慕课专栏
更多

添加回答

举报

0/150
提交
取消
意见反馈 帮助中心 APP下载
官方微信