为了账号安全,请及时绑定邮箱和手机立即绑定

如何在基于 db 的多线程通知/电子邮件发件人中减少 CPU 使用率

如何在基于 db 的多线程通知/电子邮件发件人中减少 CPU 使用率

C#
慕妹3242003 2022-06-12 15:29:11
我正在尝试开发一个 Windows 服务来向订阅发送通知。数据保存在 SQL 服务器数据库中。通知是通过向 REST API 端点发出 Web POST 请求来创建的,并保存在数据库表中。该服务启动一个任务,该任务不断从该数据库表中读取通知并将它们添加到队列中。该服务还启动了一些任务,这些任务不断从队列中读取并执行实际的发送过程。代码运行良好并完成了所需的工作,但问题是运行服务时 CPU 使用率为 100%。我尝试使用 Thread.Sleep 或 Task.Delay 但都没有帮助我减少 CPU 使用率。我已在此 codeprojct页面中读到,我需要使用等待处理程序并且应该在某些条件下等待。我无法正常工作。那么谁能建议我可以做些什么来减少EnqueueTask和的CPU使用率DequeueTask?这是发件人代码:static class NotificationSender{    static ConcurrentQueue<NotificationDelivery> deliveryQueue = null;    static Task enqueueTask = null;    static Task[] dequeueTasks = null;    public static void StartSending(ServiceState serviceState)    {        PushService.InitServices();        enqueueTask = Task.Factory.StartNew(EnqueueTask, serviceState);        deliveryQueue = new ConcurrentQueue<NotificationDelivery>();        int dequeueTasksCount = 10;        dequeueTasks = new Task[dequeueTasksCount];        for (int i = 0; i < dequeueTasksCount; i++)        {            dequeueTasks[i] = Task.Factory.StartNew(DequeueTask, serviceState);        }    }    public static void EnqueueTask(object state)    {        ServiceState serviceState = (ServiceState)state;        using (DSTeckWebPushNotificationsContext db = new DSTeckWebPushNotificationsContext())        {            while (!serviceState.CancellationTokenSource.Token.IsCancellationRequested)            {                int toEnqueue = 100 - deliveryQueue.Count;
查看完整描述

1 回答

?
绝地无双

TA贡献1946条经验 获得超4个赞

我想我终于可以使用等待处理程序和计时器进行良好的更改以维持 CPU 使用率。


EnqueueTask如果没有获取到通知,将等待 5 秒,然后再尝试从通知表中获取数据。如果没有获取到通知,它将启动计时器并重置等待句柄。然后计时器经过的回调将设置等待句柄。


现在也DequeueTask正在使用等待句柄。如果队列中没有更多项目,它将重置等待句柄以停止出队空队列。EnqueueTask将项目添加到队列时将设置此等待句柄。


CPU 使用率现在 <= 10%


这是更新的NotificationSender代码:


static class NotificationSender

{

    static ConcurrentQueue<NotificationDelivery> deliveryQueue = null;

    static Task enqueueTask = null;

    static Task[] dequeueTasks = null;


    static ManualResetEvent enqueueSignal = null;

    static ManualResetEvent dequeueSignal = null;


    static System.Timers.Timer enqueueTimer = null;


    public static void StartSending(CancellationToken token)

    {

        PushService.InitServices();


        using (DSTeckWebPushNotificationsContext db = new DSTeckWebPushNotificationsContext())

        {

            NotificationDelivery[] queuedDeliveries = db.NotificationDeliveries

                        .Where(nd => nd.Status == NotificationDeliveryStatus.Queued)

                        .ToArray();


            foreach (NotificationDelivery delivery in queuedDeliveries)

            {

                delivery.Status = NotificationDeliveryStatus.Pending;

            }


            db.SaveChanges();

        }


        enqueueSignal = new ManualResetEvent(true);

        dequeueSignal = new ManualResetEvent(false);


        enqueueTimer = new System.Timers.Timer();

        enqueueTimer.Elapsed += EnqueueTimerCallback;

        enqueueTimer.Interval = 5000;

        enqueueTimer.AutoReset = false;

        enqueueTimer.Stop();


        enqueueTask = new Task(EnqueueTask, token, TaskCreationOptions.LongRunning);

        enqueueTask.Start();


        deliveryQueue = new ConcurrentQueue<NotificationDelivery>();


        int dequeueTasksCount = 10;

        dequeueTasks = new Task[dequeueTasksCount];

        for (int i = 0; i < dequeueTasksCount; i++)

        {

            dequeueTasks[i] = new Task(DequeueTask, token, TaskCreationOptions.LongRunning);

            dequeueTasks[i].Start();

        }

    }


    public static void EnqueueTimerCallback(Object source, ElapsedEventArgs e)

    {

        enqueueSignal.Set();

        enqueueTimer.Stop();

    }


    public static void EnqueueTask(object state)

    {

        CancellationToken token = (CancellationToken)state;


        using (DSTeckWebPushNotificationsContext db = new DSTeckWebPushNotificationsContext())

        {

            while (!token.IsCancellationRequested)

            {

                if (enqueueSignal.WaitOne())

                {

                    int toEnqueue = 100 - deliveryQueue.Count;


                    if (toEnqueue > 0)

                    {

                        // fetch some records from db to be enqueued

                        NotificationDelivery[] deliveries = db.NotificationDeliveries

                            .Include("Subscription")

                            .Include("Notification")

                            .Include("Notification.NotificationLanguages")

                            .Include("Notification.NotificationLanguages.Language")

                            .Where(nd => nd.Status == NotificationDeliveryStatus.Pending && DateTime.Now >= nd.StartSendingAt)

                            .OrderBy(nd => nd.StartSendingAt)

                            .Take(toEnqueue)

                            .ToArray();


                        foreach (NotificationDelivery delivery in deliveries)

                        {

                            delivery.Status = NotificationDeliveryStatus.Queued;

                            deliveryQueue.Enqueue(delivery);

                        }


                        if (deliveries.Length > 0)

                        {

                            // save Queued state, so not fetched again the next loop

                            db.SaveChanges();


                            // signal the DequeueTask

                            dequeueSignal.Set();

                        }

                        else

                        {

                            // no more notifications, wait 5 seconds before try fetching again

                            enqueueSignal.Reset();

                            enqueueTimer.Start();

                        }

                    }


                    // save any changes made by the DequeueTask

                    // an event may be used here to know if any changes made

                    db.SaveChanges();

                }

            }


            Task.WaitAll(dequeueTasks);

            db.SaveChanges();

        }

    }


    public async static void DequeueTask(object state)

    {

        CancellationToken token = (CancellationToken)state;


        while (!token.IsCancellationRequested)

        {

            if (dequeueSignal.WaitOne()) // block untill we have items in the queue

            {

                NotificationDelivery delivery = null;


                if (deliveryQueue.TryDequeue(out delivery))

                {

                    NotificationDeliveryStatus ns = NotificationDeliveryStatus.Pending;

                    if (delivery.Subscription.Status == SubscriptionStatus.Subscribed)

                    {

                        PushResult result = await PushService.DoPushAsync(delivery);


                        switch (result)

                        {

                            case PushResult.Pushed:

                                ns = NotificationDeliveryStatus.Delivered;

                                break;

                            case PushResult.Error:

                                ns = NotificationDeliveryStatus.FailureError;

                                break;

                            case PushResult.NotSupported:

                                ns = NotificationDeliveryStatus.FailureNotSupported;

                                break;

                            case PushResult.UnSubscribed:

                                ns = NotificationDeliveryStatus.FailureUnSubscribed;

                                delivery.Subscription.Status = SubscriptionStatus.UnSubscribed;

                                break;

                        }

                    }

                    else

                    {

                        ns = NotificationDeliveryStatus.FailureUnSubscribed;

                    }


                    delivery.Status = ns;

                    delivery.DeliveredAt = DateTime.Now;

                }

                else

                {

                    // empty queue, no more items

                    // stop dequeueing untill new items added by EnqueueTask

                    dequeueSignal.Reset();

                }

            }

        }

    }


    public static void Wait()

    {

        Task.WaitAll(enqueueTask);

        Task.WaitAll(dequeueTasks);


        enqueueTask.Dispose();

        for(int i = 0; i < dequeueTasks.Length; i++)

        {

            dequeueTasks[i].Dispose();

        }

    }

}



查看完整回答
反对 回复 2022-06-12
  • 1 回答
  • 0 关注
  • 93 浏览

添加回答

举报

0/150
提交
取消
微信客服

购课补贴
联系客服咨询优惠详情

帮助反馈 APP下载

慕课网APP
您的移动学习伙伴

公众号

扫描二维码
关注慕课网微信公众号