我有一个问题,我有一个简单的订阅者和简单的发布者,它们看起来像:public async Task SendRequest(){ var topic = "SomeTopic"; var requestHash = Helpers.ReturnUniqueKey(DateTime.Now, topic); requestKeys.Add(requestHash); Console.WriteLine($"Key count {requestKeys.Count}"); var responseHandler = new Action<ResponseMessage>(response => { Console.WriteLine($"Key count {requestKeys.Count}"); foreach (var key in requestKeys) { Console.WriteLine($"Response { BitConverter.ToString(response.IdentyficationHash) } - Key { BitConverter.ToString(key) }"); if (!key.SequenceEqual(response.IdentyficationHash)) return; requestKeys.Remove(key); } }); bus.Subscribe(BusController.ManualRequest, responseHandler, configuration => configuration.WithTopic(BusController.ManualRequest)); bus.Publish(someRequest, topic); async Task WaitForItToWorkAsync() { var retry = 0; var complete = false; while (!complete) { if (retry >= 20) return ; // Ill ass some msg leater complete = !requestKeys.Contains(requestHash); retry += 1; await Task.Delay(1000); } return // Ill ass some msg leater } await WaitForItToWorkAsync()}主要想法是我通过一些请求向某些服务发送消息并等待到达(我知道我可以使用 rpc,但可以有任何服务并且 rpc 不支持主题),这条路径有效,问题是 requestKeys HashSet 它类中的一个字段private readonly HashSet<byte[]> requestKeys;正如您在每个方法调用中看到的那样,我将 Key 添加到该字段,如果我发出第一个请求,它可以正常工作,但其他请求不会更新此密钥集合,我的意思是在 Action 之外它会更新,但在它之外是一个问题。我能做什么来解决这个问题?
1 回答
Smart猫小萌
TA贡献1911条经验 获得超7个赞
如果您想SendRequest()在收到响应之前阻止完成,您可以使用 aSemaphoreSlim而不是在 a 中添加和删除键HashSet,例如:
public async Task SendRequest()
{
var topic = "SomeTopic";
SemaphoreSlim semaphoreSlim = new SemaphoreSlim(0, 1);
var responseHandler = new Action<ResponseMessage>(response =>
{
//signal that the response has arrived
semaphoreSlim.Release();
});
bus.Subscribe(BusController.ManualRequest, responseHandler, configuration => configuration.WithTopic(BusController.ManualRequest));
bus.Publish(someRequest, topic);
//wait for the response to arrive
await semaphoreSlim.WaitAsync();
semaphoreSlim.Dispose();
}
- 1 回答
- 0 关注
- 91 浏览
添加回答
举报
0/150
提交
取消