为了账号安全,请及时绑定邮箱和手机立即绑定

如何知道何时停止并行 foreach,其中消费者也是 C# 中的生产者

如何知道何时停止并行 foreach,其中消费者也是 C# 中的生产者

C#
梦里花落0921 2022-11-21 20:32:42
我正在尝试使用 Parallel.ForEach() 并行处理 BlockingCollection 中的一些项目。当处理一个项目时,它可以生成 0-2 个以上的项目来处理。要处理的项目数最终总是会达到 0。我的问题是,由于消费者也是生产者(处理项目可以生成更多要处理的项目),当 BlockingCollection 为空时,我无法调用 BlockingCollection 的 CompleteAdding(),因为当前可能有其他线程正在处理将生成更多项目的项目项目。因此我不知道如何让 BlockingCollection/Parallel.ForEach 知道它可以退出。这是情况的示例(为简单起见进行了修改)using System;using System.Collections.Concurrent;using System.Threading.Tasks;namespace Example{    class Example    {        static void Main(string[] args)        {            var process = new BlockingCollection<int>() { 30 };            var parallelOptions = new ParallelOptions() { MaxDegreeOfParallelism = Environment.ProcessorCount };            Parallel.ForEach(process.GetConsumingEnumerable(), parallelOptions, item =>            {                if (item > 20)                {                    // Some add 2 items                    process.Add(item - 1);                    process.Add(item - 1);                    Console.WriteLine($"process.Count: {process.Count} | item: {item} | Added: 2");                }                else if (item > 10)                {                    // Some add 1 item                    process.Add(item-1);                    Console.WriteLine($"process.Count: {process.Count}| item: {item} | Added: 1");                }                else                {                    // Some add 0 items                    Console.WriteLine($"process.Count: {process.Count}| item: {item} | Added: 0");                }            });            // Parallel.ForEach never exits            Console.WriteLine("Completed Processing");            Console.ReadKey();        }    }}我尝试将 Parallel.ForEach 期间的 MaxDegreeOfParallelism 修改为要处理的项目数和 Environment.ProcessorCount 中的最小值,但这在 Parallel.ForEach 期间没有任何作用。我还尝试过存储未处理项目的数量,并在每个线程上更新此数字时执行锁定。当未处理的项目为 0 时,我将调用 AddingCompleted 方法。这也不管用。
查看完整描述

1 回答

?
一只甜甜圈

TA贡献1836条经验 获得超5个赞

你走在正确的轨道上:


我还尝试过存储未处理项目的数量,并在每个线程上更新此数字时执行锁定。当未处理的项目为 0 时,我将调用 AddingCompleted 方法。


问题是你实际上是在计算活跃工人的数量,而不是未处理项目的数量。也就是说,当您开始处理某事时,您只会增加计数器,因此队列中可能有许多其他项目未由该计数器表示。要执行后者,您需要做的是每次向队列中添加内容时递增一个计数器,然后每次完成处理队列中的内容时递减计数器。


现在,如果您尝试过,您可能会遇到一个不同的问题:默认情况下,该Parallel.ForEach()方法会尝试从源中批量处理项目。这不适用于像BlockingCollection<T>在枚举期间可能阻塞的源,等待额外的数据。在您的示例中,这会导致死锁,在Parallel.ForEach()等待更多项目之前它将对最近的批次进行排队,同时BlockingCollection<T>等待更多项目被处理,从而导致更多项目排队。


方法等待集合,ForEach()集合等待ForEach()方法,就会出现死锁。


不过有一个解决方法:您可以提供ForEach()一个分区程序,该分区程序专门配置为不缓冲数据,而是在检索工作项时立即将其排队。


将这两种策略放在一起,你会得到一个看起来像这样的代码版本(我为诊断目的添加了一些小的输出更改):


static void Main(string[] args)

{

    const int firstValue = 30;

    const int secondValues = 20;

    const int thirdValues = 10;


    var process = new BlockingCollection<int>() { firstValue };


    var parallelOptions = new ParallelOptions() { MaxDegreeOfParallelism = Environment.ProcessorCount };

    int totalItemCount = process.Count;


    OrderablePartitioner<int> partitioner = Partitioner.Create(process.GetConsumingEnumerable(), EnumerablePartitionerOptions.NoBuffering);


    Parallel.ForEach(partitioner, parallelOptions, (item, state, i) =>

    {

        string message;


        if (item > secondValues)

        {

            // Some add 2 items

            Interlocked.Add(ref totalItemCount, 2);

            process.Add(item - 1);

            process.Add(item - 1);

            message = $"{DateTime.Now.ToLongTimeString()}: process.Count: {process.Count} | item: {item} | Added: 2";

        }

        else if (item > thirdValues)

        {

            // Some add 1 item

            Interlocked.Increment(ref totalItemCount);

            process.Add(item - 1);

            message = $"{DateTime.Now.ToLongTimeString()}: process.Count: {process.Count}| item: {item} | Added: 1";

        }

        else

        {

            // Some add 0 items

            message = $"{DateTime.Now.ToLongTimeString()}: process.Count: {process.Count}| item: {item} | Added: 0";

        }


        int newCount = Interlocked.Decrement(ref totalItemCount);


        if (newCount == 0)

        {

            process.CompleteAdding();

        }


        Console.WriteLine($"{message} | newCount: {newCount} | i: {i}");

    });


    // Parallel.ForEach will exit

    Console.WriteLine("Completed Processing");    

    Console.ReadKey();

}



查看完整回答
反对 回复 2022-11-21
  • 1 回答
  • 0 关注
  • 106 浏览

添加回答

举报

0/150
提交
取消
意见反馈 帮助中心 APP下载
官方微信