为了账号安全,请及时绑定邮箱和手机立即绑定

使用 SemaphoreSlim 和 Dictionary 等待 NetMQ 回复

使用 SemaphoreSlim 和 Dictionary 等待 NetMQ 回复

C#
慕沐林林 2023-09-09 16:42:03
我有两个应用程序使用 NetMQ 相互通信。第一个应用程序内置了一个 API 控制器,第二个应用程序(引擎)在必要时与第一个应用程序来回通信。所有 API 控制器函数都是异步任务,因为我们期望同时发生许多请求。其中一个 API 控制器函数需要首先向第二个应用程序(引擎)发送 NetMQ 消息,并等待其响应,然后再将结果返回给 API 函数请求者(API 用户)。它需要以异步方式完成所有这些,而不需要从线程池中获取,因为正如我之前所说,我们可能有大量请求不断地访问此控制器。当 SemaphoreSlim 等待时,第二个应用程序(引擎)正在更新数据库中特定记录的状态,并且我需要在 API 控制器函数中读取该记录的状态并将其返回给 API 用户,如果我没有 SemaphoreSlim 等待设计,那么代码当然会提前读取数据库记录并且不会返回正确的状态。因此,解决方案是创建等待,允许引擎更新状态,然后通知 API 控制器函数它可以从数据库读取新更新的状态。一位同事告诉我,使用 SemaphoreSlim 和 Dictionary 设置通过锁定在函数中创建等待,并且仅在收到来自引擎的 NetMQ 回复时才从字典中释放 SemaphoreSlim。然后代码自然会继续,我将检查数据库的更新状态并将其返回给 API 用户。问题在于 API 控制器函数完全忽略了 SemaphoreSlim 锁并过早地从数据库读取。最有可能的是因为它是异步的,所以我的问题是,如何使 SempahoreSlim 锁定和读取数据库代码以异步方式一起工作,而不锁定线程?我相信代码会让您更好地理解我正在尝试做的事情。1. API控制器部分功能代码://Call EngineMessenging.Queue.Enqueue(OrderPositionText + " " + UserId + " " + AssetPairId + " " + OrderType + " " + OrderRequestId + " " + rAmount + " " + rPrice + " " + Stop);//Create SemaphoreSlim Lock and add it to global Dictionary     var NewSemaphore = new SemaphoreSlim(1, 1);await NewSemaphore.WaitAsync().ConfigureAwait(false);ApiHub.UserSemaphoreDictionary.TryAdd(UserId, NewSemaphore);//Lock was released lets now read record in Database then return appropriate result to API user.                Debug.WriteLine("Checking order status for " + uuid);                OrderRequest OrderRequestRec = await _context.OrderRequest.Where(x => x.UUID == uuid).FirstOrDefaultAsync();    if (OrderRequestRec != null)                    {                        Debug.WriteLine("ORDER STATUS: " + OrderRequestRec.Status);    }2.这是另一个类中接收NetMQ消息并释放SemaphoreSlim的代码:                        //API Semaphore Release lock                        SemaphoreSlim checkUserId;                        if (ApiHub.UserSemaphoreDictionary.TryGetValue(UserId, out checkUserId)) {                            Debug.WriteLine("RELEASING LOCK: " + UserId);                            checkUserId.Release();                        }结果是先读取Database,然后释放SemaphoreSlim。我需要释放 SemaphoreSlim,然后读取数据库。
查看完整描述

1 回答

?
慕婉清6462132

TA贡献1804条经验 获得超2个赞

对于这个场景,我使用 aTaskCompletionSource<bool>作为一种高级事件。它具有Task可直接等待的属性。然后,您可以从接收消息的类中调用SetResult(true)来完成任务并允许等待者继续。

请注意,类型bool是无关紧要的:任务必须具有某种结果类型,但在这里我们只是将其用作表示事件已发生的信号方式(也就是说,我们实际上只想要 a 等待,而不是TaskTask<T>) 。

编辑以响应OP的编辑:

是的,这是完全异步的。该await关键字使 C# 编译器将控制器的代码分解为多个部分,因此后面的部分成为await可以稍后恢复的“延续”。它将延续添加到任务的完成处理程序列表中,然后放弃线程;控制器方法不再执行,因此不会耗尽线程/堆栈。

在您调用的类中SetResult,这实际上会导致任务通知(执行)处理程序,从而导致控制器的代码唤醒并完成。无需创建额外的线程;现有的被使用,并且一旦没有立即要做的工作就被放弃。


查看完整回答
反对 回复 2023-09-09
  • 1 回答
  • 0 关注
  • 73 浏览

添加回答

举报

0/150
提交
取消
意见反馈 帮助中心 APP下载
官方微信