为了账号安全,请及时绑定邮箱和手机立即绑定

在低于正常优先级的 Windows 服务中停止 Parallel.ForEach

在低于正常优先级的 Windows 服务中停止 Parallel.ForEach

C#
jeck猫 2021-11-28 20:04:07
Parallel.ForEach我的 Windows 服务中有一个代码。如果ParallelOptions.MaxDegreeOfParallelism设置为 -1,我将使用大部分 CPU。但是停止服务会持续半分钟。一些应该接收服务应该停止的信号的内部控制器线程因处理器时间不足而被饿死。我将进程优先级设置为低于正常值,但这可能与此处无关。即使所有线程都忙,如何缩短停止服务的时间?我想暂时降低线程池中线程的优先级,因为我没有任何异步代码,但 Internet 说这是一个坏主意,所以在这里要求一种“正确”的方法。螺纹(包括操作系统和.NET)在所有情况下之间的不同OnStart和OnStop。此外,如果停止时间很长,那么OnStop有时最终会被调用的 OS 线程是一个新线程,不会更早地显示在日志中。要构建此代码,请创建新的 Windows 服务项目,从设计器添加 ProjectInstaller 类,将 Account 更改为 LocalService,并使用 InstallUtil 安装一次。确保 LocalService 可以写入 C:\Temp。public partial class Service1 : ServiceBase{    private ManualResetEvent stopEvent = new ManualResetEvent(false);    private Task mainTask;    private StreamWriter writer = File.AppendText(@"C:\Temp\Log.txt");    public Service1()    {        InitializeComponent();        writer.AutoFlush = true;    }    protected override void OnStart(string[] args)    {        Log("--------------");        Log("OnStart");        mainTask = Task.Run(new Action(Run));    }    protected override void OnStop()    {        Log("OnStop");        stopEvent.Set();        mainTask.Wait();        Log("--------------");    }    private void Log(string line)    {        writer.WriteLine(String.Format("{0:yyyy-MM-dd HH:mm:ss.fff}: [{1,2}] {2}",            DateTime.Now, Thread.CurrentThread.ManagedThreadId, line));    }
查看完整描述

3 回答

?
千万里不及你

TA贡献1784条经验 获得超9个赞

这段代码将在一两秒内停止服务,而已经在计算的线程只有在完成实际工作后才会结束。正如您在服务中看到的那样,OnStop 方法立即接收信号。但是,TaskManager 显示与服务关联的进程只有在消费线程全部完成后才会停止。


这使用了一个单独的线程正在填充的字符串(路径)的 BlockingCollection。并且有许多低优先级的线程会消耗字符串。


public partial class Service1 : ServiceBase

{

    private StreamWriter writer = File.AppendText(@"C:\temp\Log.txt");


    const int nbTreads = 30;

    BlockingCollection<string> dataItems;

    bool stopCompute = false;

    List<Thread> threads = new List<Thread>();

    Thread threadProd;

    private object aLock = new object();


    public Service1()

    {

        InitializeComponent();


        dataItems = new BlockingCollection<string>(nbTreads);


        writer.AutoFlush = true;

    }



    protected override void OnStart(string[] args)

    {

        Log("--------------");

        Log("OnStart");

        threadProd = new Thread(new ThreadStart(ProduireNomFichier));

        threadProd.Start();

        Thread.Sleep(1000); // fill the collection a little

        for (int i = 0; i < nbTreads; i++)

        {

            Thread threadRun = new Thread(() => Run());

            threadRun.Priority = ThreadPriority.Lowest;

            threadRun.Start();

            threads.Add(threadRun);

        }

    }


    private void ProduireNomFichier()

    {

        foreach (string nomFichier in Directory.EnumerateFiles(Environment.SystemDirectory))

        {

            dataItems.Add(nomFichier);

        }

    }


    protected override void OnStop()

    {

        lock (aLock)

        {

            stopCompute = true;

        }

        Log("OnStop");

        Log("--------------");

        threadProd.Abort();

    }


    private void Log(string line)

    {

        writer.WriteLine(String.Format("{0:yyyy-MM-dd HH:mm:ss.fff}: [{1,2}] {2}",

            DateTime.Now, Thread.CurrentThread.ManagedThreadId, line));

    }


    private void Run()

    {

        try

        {

            using (var sha = SHA256.Create())

            {

                while (dataItems.TryTake(out string fileName))

                {

                    lock (aLock)

                    {

                        if (stopCompute) return;

                    }

                    try

                    {

                        var hash = sha.ComputeHash(File.ReadAllBytes(fileName).OrderBy(x => x).ToArray());

                        Log(String.Format("file={0}, sillyhash={1}", fileName, Convert.ToBase64String(hash)));

                    }

                    catch (Exception ex)

                    {

                        Log(String.Format("file={0}, exception={1}", fileName, ex.Message));

                    }

                }

            }

        }

        catch (Exception ex)

        {

            Log(String.Format("exception={0}", ex.Message));

        }

    }

}


查看完整回答
反对 回复 2021-11-28
?
函数式编程

TA贡献1807条经验 获得超9个赞

在 Parallel.Foreach 中,您读取文件的所有字节,然后使用 LINQ 对它们进行排序。这效率不高。尝试使用 Array.Sort。对于 25 Mb 的文件,速度可以提高 85%。


Array.Sort 2230 ms

OrderBy 14650 ms

而且因为 OnStop 方法会等待任何已经开始的迭代结束,所以它可以更快地停止您的服务。


var fileBinary = File.ReadAllBytes(fileName);

Array.Sort(fileBinary);

var hash = sha.ComputeHash(fileBinary);


查看完整回答
反对 回复 2021-11-28
?
SMILET

TA贡献1796条经验 获得超4个赞

这是一个工作代码。它立即停止。请注意,主要思想来自:SylF。


但我不能给出一个明确的解释为什么会发生......更新(在你下面的评论之后):你找到了原因,它很好地解释了为什么你有这种行为。谢谢!我真的很高兴知道。


尽管这项工作是在低优先级线程中完成的,但在 CPU 几乎没有工作的机器上,您应该不会注意到任何额外的延迟。


抱歉,我混淆了您的代码示例以实现一些测试。但主要思想是改变调度程序(似乎不推荐)。但这是我找到的唯一方法。


代码:


using System;

using System.Collections.Generic;

using System.ComponentModel;

using System.Data;

using System.Diagnostics;

using System.IO;

using System.Linq;

using System.Security.Cryptography;

using System.ServiceProcess;

using System.Text;

using System.Threading;

using System.Threading.Tasks;


namespace StackOverflowQuestionWindowsService1

{

    public partial class Service1 : ServiceBase

    {

        private ManualResetEvent stopEvent = new ManualResetEvent(false);

        private Task mainTask;

        private StreamWriter writer = File.CreateText(@"C:\Temp\Log.txt");     //TAKE CARE - I do not append anymore  ********

        private CancellationTokenSource cancellationTokenSource = new CancellationTokenSource();

        private int count = 0;


        public Service1()

        {

            InitializeComponent();


            writer.AutoFlush = true;

        }


        protected override void OnStart(string[] args)

        {

            Log("--------------");

            Log("OnStart");


            Task.Run(()=>Run());

        }


        protected override void OnStop()

        {

            Log("OnStop with actual thread count: " + Process.GetCurrentProcess().Threads.Count);


            cancellationTokenSource.Cancel();

        }


        private void Log(string line)

        {

            writer.WriteLine(String.Format("{0:yyyy-MM-dd HH:mm:ss.fff}: [{1,2}] {2}",

                DateTime.Now, Thread.CurrentThread.ManagedThreadId, line));

        }


        private void Run()

        {

            Stopwatch stopWatchTotal = new Stopwatch();

            stopWatchTotal.Start();


            try

            {

                using (var sha = SHA256.Create())

                {

                    var parallelOptions = new ParallelOptions();

                    parallelOptions.MaxDegreeOfParallelism = -1;

                    parallelOptions.CancellationToken = cancellationTokenSource.Token;

                    parallelOptions.TaskScheduler = new PriorityScheduler(ThreadPriority.Lowest);


                    Parallel.ForEach(Directory.EnumerateFiles(Environment.SystemDirectory),

                        parallelOptions, (fileName, parallelLoopState) =>

                        {

                            // Thread.CurrentThread.Priority = ThreadPriority.Lowest;

                            Stopwatch stopWatch = new Stopwatch();

                            stopWatch.Start();


                            Interlocked.Increment(ref count);


                            if (parallelOptions.CancellationToken.IsCancellationRequested)

                            {

                                Log(String.Format($"{count}"));

                                return;

                            }


                            try

                            {

                                var hash = sha.ComputeHash(File.ReadAllBytes(fileName).OrderBy(x => x).ToArray());

                                stopWatch.Stop();

                                Log(FormatTicks(stopWatch.ElapsedTicks));

                                Log(String.Format($"{count}, {FormatTicks(stopWatch.ElapsedTicks)}, file={fileName}, sillyhash={Convert.ToBase64String(hash)}"));

                            }

                            catch (Exception ex)

                            {

                                Log(String.Format($"{count} file={fileName}, exception={ex.Message}"));

                            }

                        });

                }

            }

            catch (Exception ex)

            {

                Log(String.Format("exception={0}", ex.Message));

            }


            stopWatchTotal.Stop();


            Log(FormatTicks(stopWatchTotal.ElapsedTicks));


            writer.Close();

            Process.GetCurrentProcess().Kill();

        }


        private string FormatTicks(long ticks)

        {

            return new TimeSpan(ticks).ToString();

        }

    }

}

优先调度程序:(感谢 Roman Starkov 在:来自微软Bnaya Eshet 的StackOverflow)


using System;

using System.Collections.Concurrent;

using System.Collections.Generic;

using System.Linq;

using System.Text;

using System.Threading;

using System.Threading.Tasks;


namespace StackOverflowQuestionWindowsService1

{

    public class PriorityScheduler : TaskScheduler

    {

        public static PriorityScheduler AboveNormal = new PriorityScheduler(ThreadPriority.AboveNormal);

        public static PriorityScheduler BelowNormal = new PriorityScheduler(ThreadPriority.BelowNormal);

        public static PriorityScheduler Lowest = new PriorityScheduler(ThreadPriority.Lowest);


        private BlockingCollection<Task> _tasks = new BlockingCollection<Task>();

        private Thread[] _threads;

        private ThreadPriority _priority;

        private readonly int _maximumConcurrencyLevel = Math.Max(1, Environment.ProcessorCount);


        public PriorityScheduler(ThreadPriority priority)

        {

            _priority = priority;

        }


        public override int MaximumConcurrencyLevel

        {

            get { return _maximumConcurrencyLevel; }

        }


        protected override IEnumerable<Task> GetScheduledTasks()

        {

            return _tasks;

        }


        protected override void QueueTask(Task task)

        {

            _tasks.Add(task);


            if (_threads == null)

            {

                _threads = new Thread[_maximumConcurrencyLevel];

                for (int i = 0; i < _threads.Length; i++)

                {

                    int local = i;

                    _threads[i] = new Thread(() =>

                    {

                        foreach (Task t in _tasks.GetConsumingEnumerable())

                            base.TryExecuteTask(t);

                    });

                    _threads[i].Name = string.Format("PriorityScheduler: ", i);

                    _threads[i].Priority = _priority;

                    _threads[i].IsBackground = true;

                    _threads[i].Start();

                }

            }

        }


        protected override bool TryExecuteTaskInline(Task task, bool taskWasPreviouslyQueued)

        {

            return false; // we might not want to execute task that should schedule as high or low priority inline

        }

    }

}



查看完整回答
反对 回复 2021-11-28
  • 3 回答
  • 0 关注
  • 238 浏览

添加回答

举报

0/150
提交
取消
意见反馈 帮助中心 APP下载
官方微信