为了账号安全,请及时绑定邮箱和手机立即绑定

如何确保在关闭 MessageReciever 期间完成 MessageHandler 委托?

如何确保在关闭 MessageReciever 期间完成 MessageHandler 委托?

C#
繁星点点滴滴 2021-08-22 17:29:03
我有一个服务,它使用MessageReceiverfromMicrosoft.Azure.ServiceBus连续收听 ServiceBus 中的订阅。当服务停止时,我想在进程被终止之前给所有操作完成的机会。这是我根据库提供的示例使用的代码:private async Task StartReceiveLoop(IMessageReceiver receiver, CancellationToken cancellationToken){    var doneReceiving = new TaskCompletionSource<bool>();    cancellationToken.Register(() =>    {        receiver.CloseAsync();        doneReceiving.SetResult(true);    });    receiver.RegisterMessageHandler(        async (message, ct) => await HandleMessage(receiver, message),        new MessageHandlerOptions(HandleException));    await doneReceiving.Task;}在服务停止时,我取消了任务,即使服务HandleMessage仍在运行,它也会立即被终止。有什么方法可以通过库本身检查操作是否仍在运行以延迟任务取消?我可以想到一种通过锁定所有正在运行的任务来进行自己计数的方法,但我希望有一种更好的方法可以让我知道正在运行的处理程序的数量。理想情况下,我想取消注册处理程序,以便消息泵停止,而接收器本身不会关闭以允许例如 CompleteAsync 调用。
查看完整描述

2 回答

?
一只萌萌小番薯

TA贡献1795条经验 获得超7个赞

由于MessageReceiver.CloseAsync()提到如下:

关闭客户端。关闭由它打开的连接。

按我的测试,称为后MessageReceiver.CloseAsync(),后续调用CompleteAsyncDeadLetterAsync将自实例失败IMessageReceiver已被释放。如果你仍然想完成你的队列消息,你需要创建一个新的MessageReceiver.

有什么方法可以通过库本身检查操作是否仍在运行以延迟任务取消?

AFAIK,SDK 目前不提供上述功能。此外,这里还有一个类似的关于正常关闭 Azure 服务总线的消息泵的反馈

当服务停止时,我想在进程被终止之前给所有操作完成的机会。

对于您的要求,我假设您需要自己实现它以确保即使在 MessageReceiver 关闭后也可以成功处理接收到的队列消息。或者您可以将CancellationToken参数传递到您的HandleMessage方法中以显式取消而不是完成检索到的消息。


查看完整回答
反对 回复 2021-08-22
?
开满天机

TA贡献1786条经验 获得超13个赞

另一个答案表明,不幸的是,现在无法实现该功能,即使该功能在库本身中不可用,也很受欢迎。一种替代方法是创建您自己的接收消息泵,但随后您就可以自己进行断开连接、管理等操作,尽管正常关闭本身并不难。使用当前的方法,我设法编写了一种解决方法,即使它很笨拙,它似乎也能正常工作。


private async Task StartReceiveLoop(IMessageReceiver receiver, CancellationToken cancellationToken)

{

    int activeMessageHandlersCount = 0;

    var doneReceiving = new TaskCompletionSource<bool>();


    cancellationToken.Register(() =>

    {

        lock (receiver)

        {

            int attemptCount = 0;

            while (attemptCount++ < 10 && activeMessageHandlersCount > 0)

            {

                Thread.Sleep(1000);

            }


            receiver.CloseAsync();

        }


        doneReceiving.SetResult(true);

    });


    receiver.RegisterMessageHandler(

        async (message, ct) =>

        {

            bool canBeProcessed;

            lock (receiver)

            {

                canBeProcessed = !cancellationToken.IsCancellationRequested;

                if (canBeProcessed)

                {

                    Interlocked.Increment(ref activeMessageHandlersCount);

                }

            }


            if (canBeProcessed)

            {

                try

                {

                    await HandleMessage(receiver, message);

                }

                finally

                {

                    Interlocked.Decrement(ref activeMessageHandlersCount);

                }

            }

            else

            {

                await Task.Delay(60000); // Otherwise message receiver will keep pumping message during graceful shutdown

            }

        }, new MessageHandlerOptions(HandleException));


    await doneReceiving.Task;

}

另一个缺点是 Receiver 将接收大量消息而不进行处理,并将它们保留到锁定到期。


查看完整回答
反对 回复 2021-08-22
  • 2 回答
  • 0 关注
  • 183 浏览

添加回答

举报

0/150
提交
取消
意见反馈 帮助中心 APP下载
官方微信