1 回答
TA贡献1772条经验 获得超8个赞
Take() 会一直阻塞,直到有一个项目可以从 BlockingCollection<int> 中取出,或者您对该集合调用了 CompleteAdding()。
鉴于您当前的实现,订阅 ProcessedValues() 并执行 while 循环的那个线程将永远不会结束。
您应该在单独的线程上消费 BlockingCollection<int>。例如,您可以在 ProcessedValues() 被调用时创建一个用于消费的 Task。考虑以下实现,它同时也会释放(Dispose)BlockingCollection<int>:
/// <summary>
/// Buffers values from a source observable in a bounded <see cref="BlockingCollection{T}"/>
/// and exposes them, doubled, through <see cref="ProcessedValues"/>. Each subscriber to
/// <see cref="ProcessedValues"/> gets its own long-running consumer task, so the blocking
/// consume loop never runs on the subscribing thread.
/// </summary>
public sealed class WorkingClass : IDisposable
{
    // Capacity 1: Add blocks the producer while one value is still waiting to be consumed.
    private readonly BlockingCollection<int> _collection = new BlockingCollection<int>(1);
    private readonly List<Task> _consumerTasks = new List<Task>();

    // Kept so the source can be detached before the collection is completed/disposed;
    // otherwise a late source emission would call Add on a finished collection and throw.
    private readonly IDisposable _sourceSubscription;

    private bool _disposed;

    /// <summary>
    /// Subscribes to <paramref name="rawValues"/> and adds every emitted value to the
    /// internal collection.
    /// </summary>
    /// <param name="rawValues">Source sequence whose values are queued for processing.</param>
    public WorkingClass(IObservable<int> rawValues)
    {
        _sourceSubscription = rawValues.Subscribe(x => _collection.Add(x));
    }

    /// <summary>
    /// Returns an observable that, on each subscription, starts a long-running task which
    /// drains the internal collection and pushes the processed values to the observer.
    /// </summary>
    /// <returns>
    /// An observable of processed values. Disposing the returned subscription does not
    /// stop the consumer task (the subscription is <c>Disposable.Empty</c>).
    /// </returns>
    public IObservable<int> ProcessedValues()
    {
        return Observable.Create<int>(observer =>
        {
            _consumerTasks.Add(Task.Factory.StartNew(() => Consume(observer), TaskCreationOptions.LongRunning));
            return Disposable.Empty;
        });
    }

    /// <summary>
    /// Consumes values until adding is completed and the collection is empty, forwarding
    /// <c>value * 2</c> to <paramref name="observer"/>. Any exception is routed to
    /// <c>OnError</c>.
    /// </summary>
    /// <param name="observer">Observer that receives the processed values.</param>
    private void Consume(IObserver<int> observer)
    {
        // NOTE(review): OnCompleted is never signalled when the loop ends; kept as-is so
        // existing observers see the same notifications as before.
        try
        {
            foreach (int value in _collection.GetConsumingEnumerable())
            {
                Thread.Sleep(1000); //Simulate long work
                observer.OnNext(value * 2);
            }
        }
        catch (Exception ex)
        {
            observer.OnError(ex);
        }
    }

    /// <summary>
    /// Detaches from the source, marks the collection as complete, waits for all consumer
    /// tasks to drain it, and then disposes the collection. Safe to call more than once.
    /// </summary>
    public void Dispose()
    {
        if (_disposed)
        {
            return;
        }

        _disposed = true;

        // Stop producing first so nothing calls Add after CompleteAdding.
        _sourceSubscription.Dispose();
        _collection.CompleteAdding();

        try
        {
            Task.WaitAll(_consumerTasks.ToArray());
        }
        finally
        {
            // Release the collection even if a consumer task faulted.
            _collection.Dispose();
        }
    }
}
可以使用以下代码进行测试:
// Arrange: a hot source that emits the value 1 at virtual time 1000, plus a recording observer.
TestScheduler scheduler = new TestScheduler();
ITestableObservable<int> rawValues = scheduler.CreateHotObservable(
    new Recorded<Notification<int>>(1000, Notification.CreateOnNext(1)));
ITestableObserver<int> recorder = scheduler.CreateObserver<int>();

// Act: push the value into the collection, then start consuming it.
WorkingClass worker = new WorkingClass(rawValues);
try
{
    scheduler.AdvanceTo(1000);                     // source emits -> value is added to the collection
    worker.ProcessedValues().Subscribe(recorder);  // starts the consumer task
}
finally
{
    // Dispose completes adding and waits until the consumer loop exits.
    worker.Dispose();
}

// Assert: exactly one notification was recorded, carrying the doubled value.
Recorded<Notification<int>> expected =
    new Recorded<Notification<int>>(1000, Notification.CreateOnNext(2));
recorder.Messages.AssertEqual(expected);
- 1 回答
- 0 关注
- 78 浏览
添加回答
举报