我有一个 RabbitMQ 队列,我可以批量异步读取它,但我必须保留这些消息的顺序。我有一个名为的字段ServiceNumber,它定义了消息的唯一编号,我必须保持这个顺序。例如 SN1 SN2 SN1 SN1 SN1 SN2 1 2 3 4 5 6 在这种情况下,我们可以同时处理消息 1 和 2(它们具有不同的 SN),然后我们可以处理 3 和 6,然后是 4,然后是 5。我尝试通过ContinueWith以下方式通过链来实现它:private readonly Dictionary<string, Task> _currentTasks = new Dictionary<string, Task>();private readonly SemaphoreSlim _semaphore = new SemaphoreSlim(1);private async Task WrapMessageInQueue(string serviceNumber, Func<Task> taskFunc){ Task taskToAwait; await _semaphore.WaitAsync(); try { _currentTasks.TryGetValue(serviceNumber, out var task); if (task == null) task = Task.CompletedTask; taskToAwait = _currentTasks[serviceNumber] = task.ContinueWith(_ => taskFunc()); } finally { _semaphore.Release(); } await taskToAwait.ConfigureAwait(false);}void Main(){ Task.Run(async () => { var task1 = Task.Run(() => { return WrapMessageInQueue("10", async () => { await Task.Delay(5000); Console.WriteLine("first task finished"); }); }); while (task1.Status == TaskStatus.WaitingForActivation) { Console.WriteLine("waiting task to be picked by a scheduler. Status = {0}", task1.Status); await Task.Delay(100); } var task2 = Task.Run(() => { return WrapMessageInQueue("10", async () => { Console.WriteLine("second task finished"); }); }); await Task.WhenAll(new[] {task1, task2}); }).Wait();}这里的主要思想是第一个 RUNNED 任务应该在所有其余任务开始之前完成。所以我实现了一个字典,我在其中存储一个任务,然后每个后续任务都被添加到ContinueWith链中。因此,它会在前一个执行后严格执行。当第三个任务到达时,它会在队列中占据一席之地,依此类推。但由于某种原因它不起作用,输出是第二个任务完成第一个任务完成这段代码有什么问题?有没有更好的办法?
3 回答
陪伴而非守候
TA贡献1757条经验 获得超8个赞
有趣的方法。但是由于消息的处理(由 SN)无论如何都必须是顺序的,为什么每个消息都有一个任务?它只会让事情变得更复杂,因为您需要控制任务的执行顺序。
为什么没有收集器任务将传入的消息分类到队列中(按 SN)并为每个 SN 启动一个任务来处理队列?
- 3 回答
- 0 关注
- 184 浏览
添加回答
举报
0/150
提交
取消