为了账号安全,请及时绑定邮箱和手机立即绑定

使用 TestSchedulers、Rx 和 BlockingCollection 进行死锁测试

使用 TestSchedulers、Rx 和 BlockingCollection 进行死锁测试

C#
莫回无 2023-09-16 15:01:30
我有以下类,它基本上订阅int observable 并将值乘以 2。出于现实目的,我添加了 Thread.Sleep 来模拟繁重的处理。public class WorkingClass{    private BlockingCollection<int> _collection = new BlockingCollection<int>(1);    public WorkingClass(IObservable<int> rawValues)    {        rawValues.Subscribe(x => _collection.Add(x));    }    public IObservable<int> ProcessedValues()    {        return Observable.Create<int>(observer =>        {            while (true)            {                int value;                try                {                    value = _collection.Take();                }                catch (Exception ex)                {                    observer.OnError(ex);                    break;                }                Thread.Sleep(1000); //Simulate long work                observer.OnNext(value * 2);            }            return Disposable.Empty;        });    }}我在测试它时遇到了麻烦,在下面的测试中我只想断言如果源流发出值 1,SUT 将发出值 2:[Test]public void SimpleTest(){    var sourceValuesScheduler = new TestScheduler();    var newThreadScheduler = new TestScheduler();    var source = sourceValuesScheduler.CreateHotObservable(         new Recorded<Notification<int>>(1000, Notification.CreateOnNext(1)));    var sut = new WorkingClass(source);    var observer = sourceValuesScheduler.CreateObserver<int>();    sut.ProcessedValues()        .SubscribeOn(newThreadScheduler) //The cold part (i.e, the while loop) of the ProcessedValues Observable should run in a different thread        .Subscribe(observer);    sourceValuesScheduler.AdvanceTo(1000);    observer.Messages.AssertEqual(new Recorded<Notification<int>>(1000, Notification.CreateOnNext(2)));}如果我运行此测试,则断言会失败,因为 newThreadScheduler 从未启动,因此从未创建 ProcessedValues observable。如果我这样做: sourceValuesScheduler.AdvanceTo(1000); newThreadScheduler.AdvanceTo(1000); 它也不起作用,因为 newThreadScheduler 使用与 sourceValuesScheduler 相同的线程,因此测试将在处理后的值被发出后立即挂起,在以下行:value = _collection.Take();有没有办法让多个 TestScheduler 在不同的线程上运行?否则我怎么能测试这样的课程呢?
查看完整描述

1 回答

?
料青山看我应如是

TA贡献1772条经验 获得超8个赞

Take()阻塞,直到有一个项目可以从 中删除BlockingCollection<int>或者您调用CompleteAdding()它。


ProcessedValues()鉴于您当前的实现,您订阅并执行循环的线程while将永远不会完成。


您应该BlockingCollection<int>在单独的线程上使用它。Task例如,您可以在ProcessedValues()调用时创建消耗。考虑以下实现,它也处理BlockingCollection<int>:


public sealed class WorkingClass : IDisposable

{

    private BlockingCollection<int> _collection = new BlockingCollection<int>(1);

    private List<Task> _consumerTasks = new List<Task>();


    public WorkingClass(IObservable<int> rawValues)

    {

        rawValues.Subscribe(x => _collection.Add(x));

    }


    public IObservable<int> ProcessedValues()

    {

        return Observable.Create<int>(observer =>

        {

            _consumerTasks.Add(Task.Factory.StartNew(() => Consume(observer), TaskCreationOptions.LongRunning));

            return Disposable.Empty;

        });

    }


    private void Consume(IObserver<int> observer)

    {

        try

        {

            foreach (int value in _collection.GetConsumingEnumerable())

            {

                Thread.Sleep(1000); //Simulate long work

                observer.OnNext(value * 2);

            }

        }

        catch (Exception ex)

        {

            observer.OnError(ex);

        }

    }


    public void Dispose()

    {

        _collection.CompleteAdding();

        Task.WaitAll(_consumerTasks.ToArray());

        _collection.Dispose();

    }

}

可以使用以下代码进行测试:


var sourceValuesScheduler = new TestScheduler();


var source = sourceValuesScheduler.CreateHotObservable(

    new Recorded<Notification<int>>(1000, Notification.CreateOnNext(1)));


var observer = sourceValuesScheduler.CreateObserver<int>();


using (var sut = new WorkingClass(source))

{

    sourceValuesScheduler.AdvanceTo(1000); //add to collection

    sut.ProcessedValues().Subscribe(observer); //consume

} //...and wait until the loop exists


observer.Messages.AssertEqual(new Recorded<Notification<int>>(1000, Notification.CreateOnNext(2)));



查看完整回答
反对 回复 2023-09-16
  • 1 回答
  • 0 关注
  • 78 浏览

添加回答

举报

0/150
提交
取消
意见反馈 帮助中心 APP下载
官方微信