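Throttling asynchronous tasks

I would like to run a bunch of async tasks, with a limit on how many tasks may be pending completion at any given time.

Say you have 1000 URLs, and you only want to have 50 requests open at a time; but as soon as one request completes, you open a connection to the next URL in the list. That way, there are never more than 50 connections open at a time, until the URL list is exhausted.

I also want to utilize a given number of threads if possible.

I came up with an extension method, ThrottleTasksAsync, that does what I want. Is there a simpler solution already out there? I would assume that this is a common scenario.

Usage:

class Program
{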
    static void Main(string[] args)
    {
        Enumerable.Range(1, 10).ThrottleTasksAsync(5, 2, async i => { Console.WriteLine(i); return i; }).Wait();

        Console.WriteLine("Press a key to exit...");
        Console.ReadKey(true);
    }
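}

Here is the code:

using System;
using System.Collections.Concurrent;
using System.Collections.Generic;
using System.Threading;
using System.Threading.Tasks;

static class IEnumerableExtensions
{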
    public static async Task<Result_T[]> ThrottleTasksAsync<Enumerable_T, Result_T>(
        this IEnumerable<Enumerable_T> enumerable,
        int maxConcurrentTasks,
        int maxDegreeOfParallelism,
        Func<Enumerable_T, Task<Result_T>> taskToRun)
    {
        var blockingQueue = new BlockingCollection<Enumerable_T>(new ConcurrentBag<Enumerable_T>());

        var semaphore = new SemaphoreSlim(maxConcurrentTasks);

        // Run the throttler on a separate thread.
        var t = Task.Run(() =>
        {
            foreach (var item in enumerable)
            {
                // Wait for the semaphore
                semaphore.Wait();
                blockingQueue.Add(item);
            }

            blockingQueue.CompleteAdding();
        });

        var taskList = new List<Task<Result_T>>();

        Parallel.ForEach(IterateUntilTrue(() => blockingQueue.IsCompleted), new ParallelOptions { MaxDegreeOfParallelism = maxDegreeOfParallelism },
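        _ =>

However, the thread pool gets exhausted fast, and you can't do async/await.

Bonus: To get around the problem in BlockingCollection where an exception is thrown in Take() when CompleteAdding() is called, I'm using the TryTake overload with a timeout. If I didn't use the timeout in TryTake, it would defeat the purpose of using a BlockingCollection, since TryTake won't block. Is there a better way? Ideally, there would be a TakeAsync method.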
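For comparison, the throttling requirement described at the top of the question (at most N operations pending, start the next one as soon as a slot frees up) can be sketched without BlockingCollection or Parallel.ForEach by awaiting SemaphoreSlim.WaitAsync. This is only a minimal sketch, not the code from this thread: ThrottleSketch, ThrottleAsync and RunOneAsync are hypothetical names, and the sketch does not try to pin the work to a given number of threads.

```csharp
using System;
using System.Collections.Generic;
using System.Linq;
using System.Threading;
using System.Threading.Tasks;

static class ThrottleSketch
{
    // Hypothetical helper (not from the original post): runs taskToRun for every
    // item, with at most maxConcurrentTasks operations pending at any time.
    public static async Task<TResult[]> ThrottleAsync<T, TResult>(
        IEnumerable<T> items, int maxConcurrentTasks, Func<T, Task<TResult>> taskToRun)
    {
        var semaphore = new SemaphoreSlim(maxConcurrentTasks);
        var tasks = new List<Task<TResult>>();

        foreach (var item in items)
        {
            // Asynchronously wait for a free slot; no thread is blocked while waiting.
            await semaphore.WaitAsync();
            tasks.Add(RunOneAsync(item, taskToRun, semaphore));
        }

        // Task.WhenAll returns the results in the order the tasks were added.
        return await Task.WhenAll(tasks);
    }

    private static async Task<TResult> RunOneAsync<T, TResult>(
        T item, Func<T, Task<TResult>> taskToRun, SemaphoreSlim semaphore)
    {
        try
        {
            return await taskToRun(item);
        }
        finally
        {
            // Free the slot so the loop above can start the next item.
            semaphore.Release();
        }
    }
}

class Program
{
    static async Task Main()
    {
        // Ten items, at most five pending at once; Task.Delay stands in for a request.
        var results = await ThrottleSketch.ThrottleAsync(
            Enumerable.Range(1, 10), 5,
            async i => { await Task.Delay(100); return i * 2; });

        Console.WriteLine(string.Join(", ", results));
    }
}
```

Because the waiting is done with await rather than a blocking Wait(), no dedicated throttler thread is needed in this variant.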
3 Answers
至尊宝的传说
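As requested, here's the code I ended up going with.
The work is set up in a master-detail configuration, and each master is processed as a batch. Each unit of work is queued up in this fashion: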
var success = true;

// Start processing all the master records.
Master master;
while (null != (master = await StoredProcedures.ClaimRecordsAsync(...)))
{
    await masterBuffer.SendAsync(master);
}

// Finished sending master records
masterBuffer.Complete();

// Now, wait for all the batches to complete.
await batchAction.Completion;

return success;
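Masters are buffered one at a time to save work for other outside processes. The details for each master are dispatched for work via the masterTransform TransformManyBlock. A BatchedJoinBlock is also created to collect the details in one batch.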
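To make the buffering and fan-out step concrete, here is a minimal sketch that uses int master ids and made-up detail strings in place of the Master and Detail types. FanOutSketch and ExpandAsync are hypothetical names, and three details per master is an arbitrary assumption. TPL Dataflow lives in the System.Threading.Tasks.Dataflow library (a NuGet package on .NET Framework).

```csharp
using System;
using System.Collections.Generic;
using System.Linq;
using System.Threading.Tasks;
using System.Threading.Tasks.Dataflow;

static class FanOutSketch
{
    // Hypothetical helper: pushes master ids through a bounded buffer and
    // expands each one into several detail strings.
    public static async Task<List<string>> ExpandAsync(IEnumerable<int> masters)
    {
        // Holds at most one master; SendAsync below waits while the buffer is full.
        var masterBuffer = new BufferBlock<int>(
            new DataflowBlockOptions { BoundedCapacity = 1 });

        // One input produces many outputs (three invented details per master).
        var masterTransform = new TransformManyBlock<int, string>(
            master => Enumerable.Range(1, 3).Select(n => $"master {master}, detail {n}"),
            new ExecutionDataflowBlockOptions { BoundedCapacity = 1 });

        // ActionBlock handles one message at a time by default,
        // so the plain List is only touched by one delegate call at a time.
        var details = new List<string>();
        var collector = new ActionBlock<string>(detail => details.Add(detail));

        var link = new DataflowLinkOptions { PropagateCompletion = true };
        masterBuffer.LinkTo(masterTransform, link);
        masterTransform.LinkTo(collector, link);

        foreach (var master in masters)
        {
            await masterBuffer.SendAsync(master);
        }

        // Completion flows down the chain because of PropagateCompletion.
        masterBuffer.Complete();
        await collector.Completion;
        return details;
    }
}

class Program
{
    static async Task Main()
    {
        foreach (var detail in await FanOutSketch.ExpandAsync(new[] { 1, 2 }))
        {
            Console.WriteLine(detail);
        }
    }
}
```

The BoundedCapacity of 1 is what provides the back-pressure: the producer loop cannot run ahead of the pipeline by more than one master.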
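The actual work is done asynchronously in the detailTransform TransformBlock, 150 at a time. BoundedCapacity is set to 300 to ensure that too many Masters don't get buffered at the beginning of the chain, while also leaving room for enough detail records to be queued to allow 150 records to be processed at one time. The block outputs an object to its targets, because the output is filtered across the links depending on whether it is a Detail or an Exception.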
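The pattern of one TransformBlock emitting object and the links deciding where each result goes can be sketched in isolation as follows. This is an illustrative sketch only: FilteredLinkSketch and ProcessAsync are hypothetical names, the numbers 4 and 8 stand in for the 150 and 300 above, and failing on every multiple of 5 is an invented rule to produce some errors.

```csharp
using System;
using System.Threading.Tasks;
using System.Threading.Tasks.Dataflow;

static class FilteredLinkSketch
{
    // Hypothetical helper: processes the numbers 1..count and reports how many
    // results were routed to the success link and how many to the error link.
    public static async Task<(int ok, int failed)> ProcessAsync(int count)
    {
        // Up to 4 items are processed concurrently, and the block holds at most
        // 8 items (queued or in flight) before SendAsync has to wait.
        var detailTransform = new TransformBlock<int, object>(async detail =>
        {
            try
            {
                await Task.Delay(20); // stand-in for the real asynchronous work
                if (detail % 5 == 0)
                {
                    throw new InvalidOperationException($"detail {detail} failed");
                }
                return detail; // boxed int on success
            }
            catch (Exception e)
            {
                return e; // the exception travels through the pipeline as data
            }
        }, new ExecutionDataflowBlockOptions { MaxDegreeOfParallelism = 4, BoundedCapacity = 8 });

        // Each counter is only touched by its own ActionBlock, which handles
        // one message at a time by default.
        int ok = 0, failed = 0;
        var successes = new ActionBlock<object>(_ => ok++);
        var failures = new ActionBlock<object>(_ => failed++);

        // The predicate on each link decides which target receives a result.
        var link = new DataflowLinkOptions { PropagateCompletion = true };
        detailTransform.LinkTo(successes, link, result => result is int);
        detailTransform.LinkTo(failures, link, result => result is Exception);

        for (var i = 1; i <= count; i++)
        {
            await detailTransform.SendAsync(i);
        }

        detailTransform.Complete();
        await Task.WhenAll(successes.Completion, failures.Completion);
        return (ok, failed);
    }
}

class Program
{
    static async Task Main()
    {
        var (ok, failed) = await FilteredLinkSketch.ProcessAsync(10);
        Console.WriteLine($"{ok} succeeded, {failed} failed");
    }
}
```

Returning the exception instead of letting it escape keeps the block from faulting, so one bad record does not stop the rest of the batch.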
The batchAction ActionBlock collects the output from all the batches and performs bulk database updates, error logging, etc. for each batch.
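There will be several BatchedJoinBlocks, one for each master. Since each ISourceBlock outputs sequentially and each batch only accepts the number of detail records associated with one master, the batches will be processed in order. Each block only outputs one group and is unlinked on completion. Only the last batch block propagates its completion to the final ActionBlock.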
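The one-group-per-block behaviour can be tried out on its own with a small sketch. It assumes a batch size of 4 and posts items by hand instead of linking from a transform block; SingleBatchSketch and RunAsync are hypothetical names. The expectation, based on how MaxNumberOfGroups is documented, is that the block emits its single group, declines anything offered afterwards, and then completes by itself.

```csharp
using System;
using System.Collections.Generic;
using System.Threading.Tasks;
using System.Threading.Tasks.Dataflow;

static class SingleBatchSketch
{
    // Hypothetical helper: feeds one batch worth of results into a
    // BatchedJoinBlock that is limited to a single group.
    public static async Task<(int details, int errors, bool lateAccepted)> RunAsync()
    {
        const int batchSize = 4; // in the post this is the master's detail count

        var join = new BatchedJoinBlock<int, Exception>(
            batchSize, new GroupingDataflowBlockOptions { MaxNumberOfGroups = 1 });

        int details = 0, errors = 0;
        var batchAction = new ActionBlock<Tuple<IList<int>, IList<Exception>>>(batch =>
        {
            details = batch.Item1.Count; // items that arrived on Target1
            errors = batch.Item2.Count;  // items that arrived on Target2
        });

        join.LinkTo(batchAction, new DataflowLinkOptions { PropagateCompletion = true });

        Exception error = new InvalidOperationException("detail 3 failed");
        join.Target1.Post(1);
        join.Target1.Post(2);
        join.Target2.Post(error);
        join.Target1.Post(4); // fourth item across both targets: the group is emitted

        // Offered after the group limit has been reached.
        var lateAccepted = join.Target1.Post(5);

        // No explicit Complete() call: this relies on the block completing
        // once it has produced its one allowed group.
        await batchAction.Completion;
        return (details, errors, lateAccepted);
    }
}

class Program
{
    static async Task Main()
    {
        var (details, errors, lateAccepted) = await SingleBatchSketch.RunAsync();
        Console.WriteLine($"{details} details, {errors} errors, late item accepted: {lateAccepted}");
    }
}
```

In the answer's network the same idea is applied once per master, which is why each BatchedJoinBlock can be unlinked as soon as its Completion task finishes.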
The dataflow network:
// The dataflow network
BufferBlock<Master> masterBuffer = null;
TransformManyBlock<Master, Detail> masterTransform = null;
TransformBlock<Detail, object> detailTransform = null;
ActionBlock<Tuple<IList<object>, IList<object>>> batchAction = null;

// Buffer master records to enable efficient throttling.
masterBuffer = new BufferBlock<Master>(new DataflowBlockOptions { BoundedCapacity = 1 });

// Sequentially transform master records into a stream of detail records.
masterTransform = new TransformManyBlock<Master, Detail>(async masterRecord =>
{
    var records = await StoredProcedures.GetObjectsAsync(masterRecord);

    // Filter the master records based on some criteria here
    var filteredRecords = records;

    // Only propagate completion to the last batch
    var propagateCompletion = masterBuffer.Completion.IsCompleted && masterTransform.InputCount == 0;

    // Create a batch join block to encapsulate the results of the master record.
    var batchjoinblock = new BatchedJoinBlock<object, object>(records.Count(), new GroupingDataflowBlockOptions { MaxNumberOfGroups = 1 });

    // Add the batch block to the detail transform pipeline's link queue,
    // and link the batch block to the batch action block.
    var detailLink1 = detailTransform.LinkTo(batchjoinblock.Target1, detailResult => detailResult is Detail);
    var detailLink2 = detailTransform.LinkTo(batchjoinblock.Target2, detailResult => detailResult is Exception);
    var batchLink = batchjoinblock.LinkTo(batchAction, new DataflowLinkOptions { PropagateCompletion = propagateCompletion });

    // Unlink batchjoinblock upon completion.
    // (the returned task does not need to be awaited, despite the warning.)
    batchjoinblock.Completion.ContinueWith(task =>
    {
        detailLink1.Dispose();
        detailLink2.Dispose();
        batchLink.Dispose();
    });

    return filteredRecords;
}, new ExecutionDataflowBlockOptions { BoundedCapacity = 1 });

// Process each detail record asynchronously, 150 at a time.
detailTransform = new TransformBlock<Detail, object>(async detail =>
{
    try
    {
        // Perform the action for each detail here asynchronously
        await DoSomethingAsync();

        return detail;
    }
    catch (Exception e)
    {
        success = false;
        return e;
    }
}, new ExecutionDataflowBlockOptions { MaxDegreeOfParallelism = 150, BoundedCapacity = 300 });

// Perform the proper action for each batch
batchAction = new ActionBlock<Tuple<IList<object>, IList<object>>>(async batch =>
{
    var details = batch.Item1.Cast<Detail>();
    var errors = batch.Item2.Cast<Exception>();

    // Do something with the batch here
}, new ExecutionDataflowBlockOptions { MaxDegreeOfParallelism = 4 });

masterBuffer.LinkTo(masterTransform, new DataflowLinkOptions { PropagateCompletion = true });
masterTransform.LinkTo(detailTransform, new DataflowLinkOptions { PropagateCompletion = true });