3 回答
![?](http://img1.sycdn.imooc.com/533e4c9c0001975102200220-100-100.jpg)
TA贡献1784条经验 获得超9个赞
这段代码将在一两秒内停止服务,而已经在计算的线程只有在完成实际工作后才会结束。正如您在服务中看到的那样,OnStop 方法立即接收信号。但是,TaskManager 显示与服务关联的进程只有在消费线程全部完成后才会停止。
这使用了一个单独的线程正在填充的字符串(路径)的 BlockingCollection。并且有许多低优先级的线程会消耗字符串。
public partial class Service1 : ServiceBase
{
private StreamWriter writer = File.AppendText(@"C:\temp\Log.txt");
const int nbTreads = 30;
BlockingCollection<string> dataItems;
bool stopCompute = false;
List<Thread> threads = new List<Thread>();
Thread threadProd;
private object aLock = new object();
public Service1()
{
InitializeComponent();
dataItems = new BlockingCollection<string>(nbTreads);
writer.AutoFlush = true;
}
protected override void OnStart(string[] args)
{
Log("--------------");
Log("OnStart");
threadProd = new Thread(new ThreadStart(ProduireNomFichier));
threadProd.Start();
Thread.Sleep(1000); // fill the collection a little
for (int i = 0; i < nbTreads; i++)
{
Thread threadRun = new Thread(() => Run());
threadRun.Priority = ThreadPriority.Lowest;
threadRun.Start();
threads.Add(threadRun);
}
}
private void ProduireNomFichier()
{
foreach (string nomFichier in Directory.EnumerateFiles(Environment.SystemDirectory))
{
dataItems.Add(nomFichier);
}
}
protected override void OnStop()
{
lock (aLock)
{
stopCompute = true;
}
Log("OnStop");
Log("--------------");
threadProd.Abort();
}
private void Log(string line)
{
writer.WriteLine(String.Format("{0:yyyy-MM-dd HH:mm:ss.fff}: [{1,2}] {2}",
DateTime.Now, Thread.CurrentThread.ManagedThreadId, line));
}
private void Run()
{
try
{
using (var sha = SHA256.Create())
{
while (dataItems.TryTake(out string fileName))
{
lock (aLock)
{
if (stopCompute) return;
}
try
{
var hash = sha.ComputeHash(File.ReadAllBytes(fileName).OrderBy(x => x).ToArray());
Log(String.Format("file={0}, sillyhash={1}", fileName, Convert.ToBase64String(hash)));
}
catch (Exception ex)
{
Log(String.Format("file={0}, exception={1}", fileName, ex.Message));
}
}
}
}
catch (Exception ex)
{
Log(String.Format("exception={0}", ex.Message));
}
}
}
![?](http://img1.sycdn.imooc.com/54584f3100019e9702200220-100-100.jpg)
TA贡献1807条经验 获得超9个赞
在 Parallel.Foreach 中,您读取文件的所有字节,然后使用 LINQ 对它们进行排序。这效率不高。尝试使用 Array.Sort。对于 25 Mb 的文件,速度可以提高 85%。
Array.Sort 2230 ms
OrderBy 14650 ms
而且因为 OnStop 方法会等待任何已经开始的迭代结束,所以它可以更快地停止您的服务。
var fileBinary = File.ReadAllBytes(fileName);
Array.Sort(fileBinary);
var hash = sha.ComputeHash(fileBinary);
![?](http://img1.sycdn.imooc.com/5333a0aa000121d702000200-100-100.jpg)
TA贡献1796条经验 获得超4个赞
这是一个工作代码。它立即停止。请注意,主要思想来自:SylF。
但我不能给出一个明确的解释为什么会发生......更新(在你下面的评论之后):你找到了原因,它很好地解释了为什么你有这种行为。谢谢!我真的很高兴知道。
尽管这项工作是在低优先级线程中完成的,但在 CPU 几乎没有工作的机器上,您应该不会注意到任何额外的延迟。
抱歉,我混淆了您的代码示例以实现一些测试。但主要思想是改变调度程序(似乎不推荐)。但这是我找到的唯一方法。
代码:
using System;
using System.Collections.Generic;
using System.ComponentModel;
using System.Data;
using System.Diagnostics;
using System.IO;
using System.Linq;
using System.Security.Cryptography;
using System.ServiceProcess;
using System.Text;
using System.Threading;
using System.Threading.Tasks;
namespace StackOverflowQuestionWindowsService1
{
public partial class Service1 : ServiceBase
{
private ManualResetEvent stopEvent = new ManualResetEvent(false);
private Task mainTask;
private StreamWriter writer = File.CreateText(@"C:\Temp\Log.txt"); //TAKE CARE - I do not append anymore ********
private CancellationTokenSource cancellationTokenSource = new CancellationTokenSource();
private int count = 0;
public Service1()
{
InitializeComponent();
writer.AutoFlush = true;
}
protected override void OnStart(string[] args)
{
Log("--------------");
Log("OnStart");
Task.Run(()=>Run());
}
protected override void OnStop()
{
Log("OnStop with actual thread count: " + Process.GetCurrentProcess().Threads.Count);
cancellationTokenSource.Cancel();
}
private void Log(string line)
{
writer.WriteLine(String.Format("{0:yyyy-MM-dd HH:mm:ss.fff}: [{1,2}] {2}",
DateTime.Now, Thread.CurrentThread.ManagedThreadId, line));
}
private void Run()
{
Stopwatch stopWatchTotal = new Stopwatch();
stopWatchTotal.Start();
try
{
using (var sha = SHA256.Create())
{
var parallelOptions = new ParallelOptions();
parallelOptions.MaxDegreeOfParallelism = -1;
parallelOptions.CancellationToken = cancellationTokenSource.Token;
parallelOptions.TaskScheduler = new PriorityScheduler(ThreadPriority.Lowest);
Parallel.ForEach(Directory.EnumerateFiles(Environment.SystemDirectory),
parallelOptions, (fileName, parallelLoopState) =>
{
// Thread.CurrentThread.Priority = ThreadPriority.Lowest;
Stopwatch stopWatch = new Stopwatch();
stopWatch.Start();
Interlocked.Increment(ref count);
if (parallelOptions.CancellationToken.IsCancellationRequested)
{
Log(String.Format($"{count}"));
return;
}
try
{
var hash = sha.ComputeHash(File.ReadAllBytes(fileName).OrderBy(x => x).ToArray());
stopWatch.Stop();
Log(FormatTicks(stopWatch.ElapsedTicks));
Log(String.Format($"{count}, {FormatTicks(stopWatch.ElapsedTicks)}, file={fileName}, sillyhash={Convert.ToBase64String(hash)}"));
}
catch (Exception ex)
{
Log(String.Format($"{count} file={fileName}, exception={ex.Message}"));
}
});
}
}
catch (Exception ex)
{
Log(String.Format("exception={0}", ex.Message));
}
stopWatchTotal.Stop();
Log(FormatTicks(stopWatchTotal.ElapsedTicks));
writer.Close();
Process.GetCurrentProcess().Kill();
}
private string FormatTicks(long ticks)
{
return new TimeSpan(ticks).ToString();
}
}
}
优先调度程序:(感谢 Roman Starkov 在:来自微软Bnaya Eshet 的StackOverflow)
using System;
using System.Collections.Concurrent;
using System.Collections.Generic;
using System.Linq;
using System.Text;
using System.Threading;
using System.Threading.Tasks;
namespace StackOverflowQuestionWindowsService1
{
public class PriorityScheduler : TaskScheduler
{
public static PriorityScheduler AboveNormal = new PriorityScheduler(ThreadPriority.AboveNormal);
public static PriorityScheduler BelowNormal = new PriorityScheduler(ThreadPriority.BelowNormal);
public static PriorityScheduler Lowest = new PriorityScheduler(ThreadPriority.Lowest);
private BlockingCollection<Task> _tasks = new BlockingCollection<Task>();
private Thread[] _threads;
private ThreadPriority _priority;
private readonly int _maximumConcurrencyLevel = Math.Max(1, Environment.ProcessorCount);
public PriorityScheduler(ThreadPriority priority)
{
_priority = priority;
}
public override int MaximumConcurrencyLevel
{
get { return _maximumConcurrencyLevel; }
}
protected override IEnumerable<Task> GetScheduledTasks()
{
return _tasks;
}
protected override void QueueTask(Task task)
{
_tasks.Add(task);
if (_threads == null)
{
_threads = new Thread[_maximumConcurrencyLevel];
for (int i = 0; i < _threads.Length; i++)
{
int local = i;
_threads[i] = new Thread(() =>
{
foreach (Task t in _tasks.GetConsumingEnumerable())
base.TryExecuteTask(t);
});
_threads[i].Name = string.Format("PriorityScheduler: ", i);
_threads[i].Priority = _priority;
_threads[i].IsBackground = true;
_threads[i].Start();
}
}
}
protected override bool TryExecuteTaskInline(Task task, bool taskWasPreviouslyQueued)
{
return false; // we might not want to execute task that should schedule as high or low priority inline
}
}
}
- 3 回答
- 0 关注
- 238 浏览
添加回答
举报