为了账号安全,请及时绑定邮箱和手机立即绑定

C# 带槽的多线程

C# 带槽的多线程

C#
莫回无 2023-09-24 16:00:35
我有一个检查代理服务器的函数,目前它仅检查多个线程并等待所有线程完成,直到下一组线程开始。是否可以在允许的最大线程完成后立即启动一个新线程?for (int i = 0; i < listProxies.Count(); i+=nThreadsNum){                                  for (nCurrentThread = 0; nCurrentThread < nThreadsNum; nCurrentThread++)    {        if (nCurrentThread < nThreadsNum)        {           string strProxyIP = listProxies[i + nCurrentThread].sIPAddress;           int nPort = listProxies[i + nCurrentThread].nPort;                    tasks.Add(Task.Factory.StartNew<ProxyAddress>(() => CheckProxyServer(strProxyIP, nPort, nCurrentThread)));        }     }                     Task.WaitAll(tasks.ToArray());     foreach (var tsk in tasks)     {        ProxyAddress result = tsk.Result;        UpdateProxyDBRecord(result.sIPAddress, result.bOnlineStatus);     }     tasks.Clear();                }
查看完整描述

4 回答

?
素胚勾勒不出你

TA贡献1827条经验 获得超9个赞

这看起来简单得多:


int numberProcessed = 0;

Parallel.ForEach(listProxies,

  new ParallelOptions { MaxDegreeOfParallelism = nThreadsNum },

  (p)=> {

    var result = CheckProxyServer(p.sIPAddress, s.nPort, Thread.CurrentThread.ManagedThreadId);

    UpdateProxyDBRecord(result.sIPAddress, result.bOnlineStatus);

    Interlocked.Increment(numberProcessed);

});

带插槽:


var obj = new Object();

var slots = new List<int>();

Parallel.ForEach(listProxies,

  new ParallelOptions { MaxDegreeOfParallelism = nThreadsNum },

  (p)=> {

    int threadId = Thread.CurrentThread.ManagedThreadId;

    int slot = slots.IndexOf(threadId);

    if (slot == -1)

    {

      lock(obj)

      {

        slots.Add(threadId);

      }

      slot = slots.IndexOf(threadId);

    }

    var result = CheckProxyServer(p.sIPAddress, s.nPort, slot);

    UpdateProxyDBRecord(result.sIPAddress, result.bOnlineStatus);

});

我在那里采取了一些捷径来保证线程安全。您不必执行正常的检查-锁定-检查舞蹈,因为永远不会有两个线程尝试将相同的 threadid 添加到列表中,因此第二次检查将始终失败并且不需要。其次,出于同样的原因,我认为您也不需要锁定外部 IndexOf 。这使得它成为一个非常高效的并发例程,无论可枚举中有多少项,都很少锁定(它应该只锁定 nThreadsNum 次)。


查看完整回答
反对 回复 2023-09-24
?
人到中年有点甜

TA贡献1895条经验 获得超7个赞

另一个解决方案是使用 aSemaphoreSlim或使用 的生产者-消费者模式BlockinCollection<T>。两种解决方案都支持取消。

信号量瘦身


private async Task CheckProxyServerAsync(IEnumerable<object> proxies)

{

  var tasks = new List<Task>();

  int currentThreadNumber = 0;

  int maxNumberOfThreads = 8;


  using (semaphore = new SemaphoreSlim(maxNumberOfThreads, maxNumberOfThreads))

  {

    foreach (var proxy in proxies)

    {

      // Asynchronously wait until thread is available if thread limit reached

      await semaphore.WaitAsync();


      string proxyIP = proxy.IPAddress;

      int port = proxy.Port;

      tasks.Add(Task.Run(() => CheckProxyServer(proxyIP, port, Interlocked.Increment(ref currentThreadNumber)))

        .ContinueWith(

          (task) =>

          {

            ProxyAddress result = task.Result;


            // Method call must be thread-safe!

            UpdateProxyDbRecord(result.IPAddress, result.OnlineStatus);


            Interlocked.Decrement(ref currentThreadNumber);


            // Allow to start next thread if thread limit was reached

            semaphore.Release();

          },

          TaskContinuationOptions.OnlyOnRanToCompletion));

    }


    // Asynchronously wait until all tasks are completed

    // to prevent premature disposal of semaphore

    await Task.WhenAll(tasks);

  }

}

生产者-消费者模式


// Uses a fixed number of same threads

private async Task CheckProxyServerAsync(IEnumerable<ProxyInfo> proxies)

{

  var pipe = new BlockingCollection<ProxyInfo>();

  int maxNumberOfThreads = 8;

  var tasks = new List<Task>();


  // Create all threads (count == maxNumberOfThreads)

  for (int currentThreadNumber = 0; currentThreadNumber < maxNumberOfThreads; currentThreadNumber++)

  {

    tasks.Add(

      Task.Run(() => ConsumeProxyInfo(pipe, currentThreadNumber)));

  }


  proxies.ToList().ForEach(pipe.Add);

  pipe.CompleteAdding();


  await Task.WhenAll(tasks);

}


private void ConsumeProxyInfo(BlockingCollection<ProxyInfo> proxiesPipe, int currentThreadNumber)

{

  while (!proxiesPipe.IsCompleted)

  {

    if (proxiesPipe.TryTake(out ProxyInfo proxy))

    {

      int port = proxy.Port;

      string proxyIP = proxy.IPAddress;

      ProxyAddress result = CheckProxyServer(proxyIP, port, currentThreadNumber); 


      // Method call must be thread-safe!

      UpdateProxyDbRecord(result.IPAddress, result.OnlineStatus);

    }

  }

}




查看完整回答
反对 回复 2023-09-24
?
30秒到达战场

TA贡献1828条经验 获得超6个赞

我建议稍微改变一下你的方法。不要启动和停止线程,而是将代理服务器数据放入并发队列中,每个代理服务器对应一个项目。然后创建固定数量的线程(或异步任务)来处理队列。在我看来,这更有可能提供平稳的性能(您不会一遍又一遍地启动和停止线程,这会产生开销)并且更容易编码。


一个简单的例子:


class ProxyChecker

{

    private ConcurrentQueue<ProxyInfo> _masterQueue = new ConcurrentQueue<ProxyInfo>();


    public ProxyChecker(IEnumerable<ProxyInfo> listProxies)

    {

        foreach (var proxy in listProxies)

        {

            _masterQueue.Enqueue(proxy);

        }

    }


    public async Task RunChecks(int maximumConcurrency)

    {

        var count = Math.Max(maximumConcurrency, _masterQueue.Count);

        var tasks = Enumerable.Range(0, count).Select( i => WorkerTask() ).ToList();

        await Task.WhenAll(tasks);

    }


    private async Task WorkerTask()

    {

        ProxyInfo proxyInfo;

        while ( _masterList.TryDequeue(out proxyInfo))

        {

            DoTheTest(proxyInfo.IP, proxyInfo.Port)

        }

    }


查看完整回答
反对 回复 2023-09-24
?
胡说叔叔

TA贡献1804条经验 获得超8个赞

如果我正确理解你的问题,这实际上是相当简单的await Task.WhenAny。基本上,您保留所有正在运行的任务的集合。一旦运行的任务达到一定数量,您将等待一个或多个任务完成,然后从集合中删除已完成的任务并继续添加更多任务。


下面是我的意思的一个例子:


        var tasks = new List<Task>();


        for (int i = 0; i < 20; i++)

        {

            // I want my list of tasks to contain at most 5 tasks at once

            if (tasks.Count == 5)

            {

                // Wait for at least one of the tasks to complete

                await Task.WhenAny(tasks.ToArray());


                // Remove all of the completed tasks from the list

                tasks = tasks.Where(t => !t.IsCompleted).ToList();

            }


            // Add some task to the list

            tasks.Add(Task.Factory.StartNew(async delegate ()

                {

                    await Task.Delay(1000);

                }));

        }


查看完整回答
反对 回复 2023-09-24
  • 4 回答
  • 0 关注
  • 103 浏览

添加回答

举报

0/150
提交
取消
意见反馈 帮助中心 APP下载
官方微信