为了账号安全,请及时绑定邮箱和手机立即绑定

Redis golang 客户端定期丢弃错误的 PubSub 连接(EOF)

Redis golang 客户端定期丢弃错误的 PubSub 连接(EOF)

Go
狐的传说 2023-07-17 17:06:57
我做了什么:我正在使用golang来自 的 Redis 库github.com/go-redis/redis。我的客户端监听一个名为“control”的 PubSub 通道。每当消息到达时,我都会处理它并继续接收下一条消息。我没完没了地听着,信息经常出现,有时好几天都没有。我的期望:我希望 Redis 通道能够无限地保持打开状态并在发送消息时接收消息。我的经历:通常它会运行几天,但偶尔client.Receive()会返回EOF错误。发生此错误后,客户端不再在该通道上接收消息。在内部,redis 客户端向 stdout 打印以下消息:redis: 2019/08/29 14:18:57 pubsub.go:151: redis: 丢弃坏的 PubSub 连接: EOF免责声明:我不确定这个错误是导致我停止接收消息的原因,它只是看起来相关。附加问题:我想了解为什么会发生这种情况,如果这是正常的,并且client.Subscribe()每当我遇到这种行为时重新连接到频道是否是一个很好的补救措施,或者我应该解决根本问题,无论它是什么。代码:这是处理我的客户端的完整代码(连接到 redis、订阅频道、无休止地接收消息):func InitAndListenAsync(log *log.Logger, sseHandler func(string, string) error) error {    rootLogger = log.With(zap.String("component", "redis-client"))    host := env.RedisHost    port := env.RedisPort    pass := env.RedisPass    addr := fmt.Sprintf("%s:%s", host, port)    tlsCfg := &tls.Config{}    client = redis.NewClient(&redis.Options{        Addr:      addr,        Password:  pass,        TLSConfig: tlsCfg,    })    if _, err := client.Ping().Result(); err != nil {        return err    }    go func() {        controlSub := client.Subscribe("control")        defer controlSub.Close()        for {            in, err := controlSub.Receive()  // *** SOMETIMES RETURNS EOF ERROR ***            if err != nil {                rootLogger.Error("failed to get feedback", zap.Error(err))                break            }            switch in.(type) {            case *redis.Message:                cm := comm.ControlMessageEvent{}                payload := []byte(in.(*redis.Message).Payload)                if err := json.Unmarshal(payload, &cm); err != nil {                    rootLogger.Error("failed to parse control message", zap.Error(err))                } else if err := handleIncomingEvent(&cm); err != nil {                    rootLogger.Error("failed to handle control message", zap.Error(err))                }
查看完整描述

3 回答

?
慕容708150

TA贡献1831条经验 获得超4个赞

pubsub.Channel()我通过遍历从而不是返回的通道来解决断开连接Receive()。


这是新代码:



func listenToControlChannel(client *redis.Client) {

    pubsub := client.Subscribe("control")

    defer pubsub.Close()


    if _, err := pubsub.Receive(); err != nil {

        rootLogger.Error("failed to receive from control PubSub", zap.Error(err))

        return

    }


    controlCh := pubsub.Channel()

    fmt.Println("start listening on control PubSub")


    // Endlessly listen to control channel,

    for msg := range controlCh {

        cm := ControlMessageEvent{}

        payload := []byte(msg.Payload)

        if err := json.Unmarshal(payload, &cm); err != nil {

            fmt.Printf("failed to parse control message: %s\n", err.Error())

        } else if err := handleIncomingEvent(&cm); err != nil {

            fmt.Printf("failed to handle control message: %s\n", err.Error())

        }

    }

}


查看完整回答
反对 回复 2023-07-17
?
繁星点点滴滴

TA贡献1803条经验 获得超3个赞

我不知道如果这是正确的方法,但在创建新的 Redis 客户端时,将ReadTimeout属性设置为-1解决了我的问题。


redisClient := redis.NewClient(&redis.Options{

    Addr:        addr,

    Password:    redisConf.Password,

    DB:          0, // Default DB

    ReadTimeout: -1,

})

注意:我使用的是 go-redis/v9


查看完整回答
反对 回复 2023-07-17
?
冉冉说

TA贡献1877条经验 获得超1个赞

我的看法是,如果 Redis 认为客户端空闲,它可能会断开你的客户端的连接。

解决这个问题的方法似乎是这样的:

  1. 使用ReceiveTimeout而不是Receive.

  2. 如果操作超时,则发出Ping并等待回复。

  3. 冲洗,重复。

这样,您就可以确保连接上存在一些流量,无论是否实际发布了任何数据。


查看完整回答
反对 回复 2023-07-17
  • 3 回答
  • 0 关注
  • 356 浏览
慕课专栏
更多

添加回答

举报

0/150
提交
取消
意见反馈 帮助中心 APP下载
官方微信