为了账号安全,请及时绑定邮箱和手机立即绑定

使用 BLPOP 处理 Redis 队列会导致单元测试中出现竞争条件?

使用 BLPOP 处理 Redis 队列会导致单元测试中出现竞争条件?

Go
白猪掌柜的 2022-05-23 17:02:19
我正在尝试实现一个先进先出的任务队列,如Go语言中 Redis 电子书的第 6.4.1 章所述。出于测试目的,我将一个CommandExecutor接口传递给“worker”函数,如下所示:package serviceimport (    "context"    "github.com/gomodule/redigo/redis"    "github.com/pkg/errors"    "github.com/sirupsen/logrus")const commandsQueue = "queuedCommands:"var pool = redis.Pool{    MaxIdle:   50,    MaxActive: 1000,    Dial: func() (redis.Conn, error) {        conn, err := redis.Dial("tcp", ":6379")        if err != nil {            logrus.WithError(err).Fatal("initialize Redis pool")        }        return conn, err    },}// CommandExecutor executes a commandtype CommandExecutor interface {    Execute(string) error}func processQueue(ctx context.Context, done chan<- struct{}, executor CommandExecutor) error {    rc := pool.Get()    defer rc.Close()    for {        select {        case <-ctx.Done():            done <- struct{}{}            return nil        default:            // If the commands queue does not exist, BLPOP blocks until another client            // performs an LPUSH or RPUSH against it. The timeout argument of zero is            // used to block indefinitely.            reply, err := redis.Strings(rc.Do("BLPOP", commandsQueue, 0))            if err != nil {                logrus.WithError(err).Errorf("BLPOP %s %d", commandsQueue, 0)                return errors.Wrapf(err, "BLPOP %s %d", commandsQueue, 0)            }            if len(reply) < 2 {                logrus.Errorf("Expected a reply of length 2, got one of length %d", len(reply))                return errors.Errorf("Expected a reply of length 2, got one of length %d", len(reply))            }            done <- struct{}{}        }    }}
查看完整描述

1 回答

?
狐的传说

TA贡献1804条经验 获得超3个赞

尽管在第一个测试的父上下文上调用了 cancel()。


在写入done和调用之间有一些时间cancel(),这意味着第一个测试可能(并且确实)进入第二for/select次迭代而不是退出 on <-ctx.Done()。更具体地说,测试代码在取消之前包含 2 个断言:


    assert.Exactly(t, 1, len(executor.ExecuteCalls()))

    assert.Exactly(t, "foobar", executor.ExecuteCalls()[0].In1)

然后才defer cancel()开始,这似乎为时已晚,无法取消第一个 go 例程的上下文。


如果您cancel()在读取之前移动 call done,则测试通过:


func TestProcessQueue(t *testing.T) {

    ctx, cancel := context.WithCancel(context.Background())


    executor := &CommandExecutorMock{

        ExecuteFunc: func(string) error {

            return nil

        },

    }


    done := make(chan struct{})

    go processQueue(ctx, done, executor)


    rc := pool.Get()

    defer rc.Close()


    _, err := rc.Do("RPUSH", commandsQueue, "foobar")

    require.NoError(t, err)


    cancel() // note this change right here

    <-done


    assert.Exactly(t, 1, len(executor.ExecuteCalls()))

    assert.Exactly(t, "foobar", executor.ExecuteCalls()[0].In1)

}


查看完整回答
反对 回复 2022-05-23
  • 1 回答
  • 0 关注
  • 100 浏览
慕课专栏
更多

添加回答

举报

0/150
提交
取消
意见反馈 帮助中心 APP下载
官方微信