4 回答
TA贡献1827条经验 获得超9个赞
这看起来简单得多:
int numberProcessed = 0;
Parallel.ForEach(listProxies,
new ParallelOptions { MaxDegreeOfParallelism = nThreadsNum },
(p)=> {
var result = CheckProxyServer(p.sIPAddress, s.nPort, Thread.CurrentThread.ManagedThreadId);
UpdateProxyDBRecord(result.sIPAddress, result.bOnlineStatus);
Interlocked.Increment(numberProcessed);
});
带插槽:
var obj = new Object();
var slots = new List<int>();
Parallel.ForEach(listProxies,
new ParallelOptions { MaxDegreeOfParallelism = nThreadsNum },
(p)=> {
int threadId = Thread.CurrentThread.ManagedThreadId;
int slot = slots.IndexOf(threadId);
if (slot == -1)
{
lock(obj)
{
slots.Add(threadId);
}
slot = slots.IndexOf(threadId);
}
var result = CheckProxyServer(p.sIPAddress, s.nPort, slot);
UpdateProxyDBRecord(result.sIPAddress, result.bOnlineStatus);
});
我在那里采取了一些捷径来保证线程安全。您不必执行正常的检查-锁定-检查舞蹈,因为永远不会有两个线程尝试将相同的 threadid 添加到列表中,因此第二次检查将始终失败并且不需要。其次,出于同样的原因,我认为您也不需要锁定外部 IndexOf 。这使得它成为一个非常高效的并发例程,无论可枚举中有多少项,都很少锁定(它应该只锁定 nThreadsNum 次)。
TA贡献1895条经验 获得超7个赞
另一个解决方案是使用 aSemaphoreSlim
或使用 的生产者-消费者模式BlockinCollection<T>
。两种解决方案都支持取消。
信号量瘦身
private async Task CheckProxyServerAsync(IEnumerable<object> proxies)
{
var tasks = new List<Task>();
int currentThreadNumber = 0;
int maxNumberOfThreads = 8;
using (semaphore = new SemaphoreSlim(maxNumberOfThreads, maxNumberOfThreads))
{
foreach (var proxy in proxies)
{
// Asynchronously wait until thread is available if thread limit reached
await semaphore.WaitAsync();
string proxyIP = proxy.IPAddress;
int port = proxy.Port;
tasks.Add(Task.Run(() => CheckProxyServer(proxyIP, port, Interlocked.Increment(ref currentThreadNumber)))
.ContinueWith(
(task) =>
{
ProxyAddress result = task.Result;
// Method call must be thread-safe!
UpdateProxyDbRecord(result.IPAddress, result.OnlineStatus);
Interlocked.Decrement(ref currentThreadNumber);
// Allow to start next thread if thread limit was reached
semaphore.Release();
},
TaskContinuationOptions.OnlyOnRanToCompletion));
}
// Asynchronously wait until all tasks are completed
// to prevent premature disposal of semaphore
await Task.WhenAll(tasks);
}
}
生产者-消费者模式
// Uses a fixed number of same threads
private async Task CheckProxyServerAsync(IEnumerable<ProxyInfo> proxies)
{
var pipe = new BlockingCollection<ProxyInfo>();
int maxNumberOfThreads = 8;
var tasks = new List<Task>();
// Create all threads (count == maxNumberOfThreads)
for (int currentThreadNumber = 0; currentThreadNumber < maxNumberOfThreads; currentThreadNumber++)
{
tasks.Add(
Task.Run(() => ConsumeProxyInfo(pipe, currentThreadNumber)));
}
proxies.ToList().ForEach(pipe.Add);
pipe.CompleteAdding();
await Task.WhenAll(tasks);
}
private void ConsumeProxyInfo(BlockingCollection<ProxyInfo> proxiesPipe, int currentThreadNumber)
{
while (!proxiesPipe.IsCompleted)
{
if (proxiesPipe.TryTake(out ProxyInfo proxy))
{
int port = proxy.Port;
string proxyIP = proxy.IPAddress;
ProxyAddress result = CheckProxyServer(proxyIP, port, currentThreadNumber);
// Method call must be thread-safe!
UpdateProxyDbRecord(result.IPAddress, result.OnlineStatus);
}
}
}
TA贡献1828条经验 获得超6个赞
我建议稍微改变一下你的方法。不要启动和停止线程,而是将代理服务器数据放入并发队列中,每个代理服务器对应一个项目。然后创建固定数量的线程(或异步任务)来处理队列。在我看来,这更有可能提供平稳的性能(您不会一遍又一遍地启动和停止线程,这会产生开销)并且更容易编码。
一个简单的例子:
class ProxyChecker
{
private ConcurrentQueue<ProxyInfo> _masterQueue = new ConcurrentQueue<ProxyInfo>();
public ProxyChecker(IEnumerable<ProxyInfo> listProxies)
{
foreach (var proxy in listProxies)
{
_masterQueue.Enqueue(proxy);
}
}
public async Task RunChecks(int maximumConcurrency)
{
var count = Math.Max(maximumConcurrency, _masterQueue.Count);
var tasks = Enumerable.Range(0, count).Select( i => WorkerTask() ).ToList();
await Task.WhenAll(tasks);
}
private async Task WorkerTask()
{
ProxyInfo proxyInfo;
while ( _masterList.TryDequeue(out proxyInfo))
{
DoTheTest(proxyInfo.IP, proxyInfo.Port)
}
}
}
TA贡献1804条经验 获得超8个赞
如果我正确理解你的问题,这实际上是相当简单的await Task.WhenAny。基本上,您保留所有正在运行的任务的集合。一旦运行的任务达到一定数量,您将等待一个或多个任务完成,然后从集合中删除已完成的任务并继续添加更多任务。
下面是我的意思的一个例子:
var tasks = new List<Task>();
for (int i = 0; i < 20; i++)
{
// I want my list of tasks to contain at most 5 tasks at once
if (tasks.Count == 5)
{
// Wait for at least one of the tasks to complete
await Task.WhenAny(tasks.ToArray());
// Remove all of the completed tasks from the list
tasks = tasks.Where(t => !t.IsCompleted).ToList();
}
// Add some task to the list
tasks.Add(Task.Factory.StartNew(async delegate ()
{
await Task.Delay(1000);
}));
}
- 4 回答
- 0 关注
- 112 浏览
添加回答
举报