2 回答
TA贡献1795条经验 获得超7个赞
由于MessageReceiver.CloseAsync()提到如下:
关闭客户端。关闭由它打开的连接。
按我的测试,称为后MessageReceiver.CloseAsync()
,后续调用CompleteAsync
,DeadLetterAsync
将自实例失败IMessageReceiver
已被释放。如果你仍然想完成你的队列消息,你需要创建一个新的MessageReceiver
.
有什么方法可以通过库本身检查操作是否仍在运行以延迟任务取消?
AFAIK,SDK 目前不提供上述功能。此外,这里还有一个类似的关于正常关闭 Azure 服务总线的消息泵的反馈。
当服务停止时,我想在进程被终止之前给所有操作完成的机会。
对于您的要求,我假设您需要自己实现它以确保即使在 MessageReceiver 关闭后也可以成功处理接收到的队列消息。或者您可以将CancellationToken
参数传递到您的HandleMessage
方法中以显式取消而不是完成检索到的消息。
TA贡献1786条经验 获得超13个赞
另一个答案表明,不幸的是,现在无法实现该功能,即使该功能在库本身中不可用,也很受欢迎。一种替代方法是创建您自己的接收消息泵,但随后您就可以自己进行断开连接、管理等操作,尽管正常关闭本身并不难。使用当前的方法,我设法编写了一种解决方法,即使它很笨拙,它似乎也能正常工作。
private async Task StartReceiveLoop(IMessageReceiver receiver, CancellationToken cancellationToken)
{
int activeMessageHandlersCount = 0;
var doneReceiving = new TaskCompletionSource<bool>();
cancellationToken.Register(() =>
{
lock (receiver)
{
int attemptCount = 0;
while (attemptCount++ < 10 && activeMessageHandlersCount > 0)
{
Thread.Sleep(1000);
}
receiver.CloseAsync();
}
doneReceiving.SetResult(true);
});
receiver.RegisterMessageHandler(
async (message, ct) =>
{
bool canBeProcessed;
lock (receiver)
{
canBeProcessed = !cancellationToken.IsCancellationRequested;
if (canBeProcessed)
{
Interlocked.Increment(ref activeMessageHandlersCount);
}
}
if (canBeProcessed)
{
try
{
await HandleMessage(receiver, message);
}
finally
{
Interlocked.Decrement(ref activeMessageHandlersCount);
}
}
else
{
await Task.Delay(60000); // Otherwise message receiver will keep pumping message during graceful shutdown
}
}, new MessageHandlerOptions(HandleException));
await doneReceiving.Task;
}
另一个缺点是 Receiver 将接收大量消息而不进行处理,并将它们保留到锁定到期。
- 2 回答
- 0 关注
- 183 浏览
添加回答
举报