1 回答
![?](http://img1.sycdn.imooc.com/56fb3e3d0001a10301000100-100-100.jpg)
TA贡献1836条经验 获得超5个赞
你走在正确的轨道上:
我还尝试过存储未处理项目的数量,并在每个线程上更新此数字时执行锁定。当未处理的项目为 0 时,我将调用 AddingCompleted 方法。
问题是你实际上是在计算活跃工人的数量,而不是未处理项目的数量。也就是说,当您开始处理某事时,您只会增加计数器,因此队列中可能有许多其他项目未由该计数器表示。要执行后者,您需要做的是每次向队列中添加内容时递增一个计数器,然后每次完成处理队列中的内容时递减计数器。
现在,如果您尝试过,您可能会遇到一个不同的问题:默认情况下,该Parallel.ForEach()方法会尝试从源中批量处理项目。这不适用于像BlockingCollection<T>在枚举期间可能阻塞的源,等待额外的数据。在您的示例中,这会导致死锁,在Parallel.ForEach()等待更多项目之前它将对最近的批次进行排队,同时BlockingCollection<T>等待更多项目被处理,从而导致更多项目排队。
方法等待集合,ForEach()集合等待ForEach()方法,就会出现死锁。
不过有一个解决方法:您可以提供ForEach()一个分区程序,该分区程序专门配置为不缓冲数据,而是在检索工作项时立即将其排队。
将这两种策略放在一起,你会得到一个看起来像这样的代码版本(我为诊断目的添加了一些小的输出更改):
static void Main(string[] args)
{
const int firstValue = 30;
const int secondValues = 20;
const int thirdValues = 10;
var process = new BlockingCollection<int>() { firstValue };
var parallelOptions = new ParallelOptions() { MaxDegreeOfParallelism = Environment.ProcessorCount };
int totalItemCount = process.Count;
OrderablePartitioner<int> partitioner = Partitioner.Create(process.GetConsumingEnumerable(), EnumerablePartitionerOptions.NoBuffering);
Parallel.ForEach(partitioner, parallelOptions, (item, state, i) =>
{
string message;
if (item > secondValues)
{
// Some add 2 items
Interlocked.Add(ref totalItemCount, 2);
process.Add(item - 1);
process.Add(item - 1);
message = $"{DateTime.Now.ToLongTimeString()}: process.Count: {process.Count} | item: {item} | Added: 2";
}
else if (item > thirdValues)
{
// Some add 1 item
Interlocked.Increment(ref totalItemCount);
process.Add(item - 1);
message = $"{DateTime.Now.ToLongTimeString()}: process.Count: {process.Count}| item: {item} | Added: 1";
}
else
{
// Some add 0 items
message = $"{DateTime.Now.ToLongTimeString()}: process.Count: {process.Count}| item: {item} | Added: 0";
}
int newCount = Interlocked.Decrement(ref totalItemCount);
if (newCount == 0)
{
process.CompleteAdding();
}
Console.WriteLine($"{message} | newCount: {newCount} | i: {i}");
});
// Parallel.ForEach will exit
Console.WriteLine("Completed Processing");
Console.ReadKey();
}
- 1 回答
- 0 关注
- 106 浏览
添加回答
举报