为了账号安全,请及时绑定邮箱和手机立即绑定

使用并行性时限制消息队列消耗

使用并行性时限制消息队列消耗

C#
慕田峪9158850 2023-09-09 16:21:58
我正在使用消息队列中的消息并使用Task.Run(). 但我想将消费速度限制在某个最大线程数,并且在线程数低于该数量之前不从消息队列中消费。假设我想要最多 100 个线程。在这种情况下,当达到 100 个线程时,它应该停止从消息队列中消费。当消息处理任务完成并且线程数下降到 99 时,它应该从队列中再消费一条消息。我尝试用于TransformBlock此目的,这里是用于演示目的的示例代码:public partial class MainWindow : Window    {        object syncObj = new object();        int i = 0;        public MainWindow()        {            InitializeComponent();        }        private async Task<bool> ProcessMessage(string message)        {            await Task.Delay(5000);            lock (syncObj)            {                i++;                System.Diagnostics.Debug.WriteLine(i);            }            return true;        }        private async void Button_Click(object sender, RoutedEventArgs e)        {            var processor = new TransformBlock<string, bool>(                    (str) => ProcessMessage(str),                    new ExecutionDataflowBlockOptions { MaxDegreeOfParallelism = 100 }                    );            for(int i = 0; i < 1000; i++)            {                await processor.SendAsync("a");            }    }}限制并行任务的数量按预期工作,但所有消息都会立即发送到 TransformBlock,因此SendAsync循环在任务处理之前结束。我希望它继续接受消息,只要线程数低于最大值。允许并行度,但在达到 100 时等待。有没有办法使用 TransformBlock 来做到这一点,或者我应该诉诸其他方法?
查看完整描述

1 回答

?
偶然的你

TA贡献1841条经验 获得超3个赞

数据流块具有输入缓冲区。该输入缓冲区充当队列。


如果您想将消息保留在自己的队列中,您可以通过限制数据流块愿意接收的项目数量来完成您想要的事情:


var processor = new TransformBlock<string, bool>(

    (str) => ProcessMessage(str),

    new ExecutionDataflowBlockOptions

    {

      BoundedCapacity = 100,

      MaxDegreeOfParallelism = 100,

    }

);

请注意,这BoundedCapacity包括块正在处理的项目。由于BoundedCapacity == MaxDegreeOfParallelism,这实际上会关闭数据流块的队列。


因此 SendAsync 循环在任务处理之前结束。


当有(最多)100 个任务需要处理时,它仍然会结束。如果您想等到所有项目完成处理,请调用Complete()和await Completed。


查看完整回答
反对 回复 2023-09-09
  • 1 回答
  • 0 关注
  • 73 浏览

添加回答

举报

0/150
提交
取消
意见反馈 帮助中心 APP下载
官方微信