我正在使用消息队列中的消息并使用Task.Run(). 但我想将消费速度限制在某个最大线程数,并且在线程数低于该数量之前不从消息队列中消费。假设我想要最多 100 个线程。在这种情况下,当达到 100 个线程时,它应该停止从消息队列中消费。当消息处理任务完成并且线程数下降到 99 时,它应该从队列中再消费一条消息。我尝试用于TransformBlock此目的,这里是用于演示目的的示例代码:public partial class MainWindow : Window { object syncObj = new object(); int i = 0; public MainWindow() { InitializeComponent(); } private async Task<bool> ProcessMessage(string message) { await Task.Delay(5000); lock (syncObj) { i++; System.Diagnostics.Debug.WriteLine(i); } return true; } private async void Button_Click(object sender, RoutedEventArgs e) { var processor = new TransformBlock<string, bool>( (str) => ProcessMessage(str), new ExecutionDataflowBlockOptions { MaxDegreeOfParallelism = 100 } ); for(int i = 0; i < 1000; i++) { await processor.SendAsync("a"); } }}限制并行任务的数量按预期工作,但所有消息都会立即发送到 TransformBlock,因此SendAsync循环在任务处理之前结束。我希望它继续接受消息,只要线程数低于最大值。允许并行度,但在达到 100 时等待。有没有办法使用 TransformBlock 来做到这一点,或者我应该诉诸其他方法?
1 回答
偶然的你
TA贡献1841条经验 获得超3个赞
数据流块具有输入缓冲区。该输入缓冲区充当队列。
如果您想将消息保留在自己的队列中,您可以通过限制数据流块愿意接收的项目数量来完成您想要的事情:
var processor = new TransformBlock<string, bool>(
(str) => ProcessMessage(str),
new ExecutionDataflowBlockOptions
{
BoundedCapacity = 100,
MaxDegreeOfParallelism = 100,
}
);
请注意,这BoundedCapacity包括块正在处理的项目。由于BoundedCapacity == MaxDegreeOfParallelism,这实际上会关闭数据流块的队列。
因此 SendAsync 循环在任务处理之前结束。
当有(最多)100 个任务需要处理时,它仍然会结束。如果您想等到所有项目完成处理,请调用Complete()和await Completed。
- 1 回答
- 0 关注
- 73 浏览
添加回答
举报
0/150
提交
取消