为了账号安全,请及时绑定邮箱和手机立即绑定

C# 并发:使用多个 AutoResetEvent 是个好主意吗?

C# 并发:使用多个 AutoResetEvent 是个好主意吗?

C#
汪汪一只猫 2022-12-24 14:28:33
假设有很多线程调用Do(),只有一个工作线程处理实际工作。void Do(Job job){    concurrentQueue.Enqueue(job);    // wait for job done}void workerThread(){    while (true)    {        Job job;        if (concurrentQueue.TryDequeue(out job))        {            // do job        }    }}Do() 应该等到作业完成后再返回。所以我写了下面的代码:class Task {    public Job job;    public AutoResetEvent ev;}void Do(Job job){    using (var ev = new AutoResetEvent(false))    {        concurrentQueue.Enqueue(new Task { job = job, ev = ev }));        ev.WaitOne();    }}void workerThread(){    while (true)    {        Task task;        if (concurrentQueue.TryDequeue(out task))        {            // do job            task.ev.Set();        }    }}经过一些测试,我发现它按预期工作。但是我不确定这是分配许多 AutoResetEvents 的好方法,还是有更好的方法来完成?
查看完整描述

3 回答

?
ABOUTYOU

TA贡献1812条经验 获得超5个赞

由于所有客户端都必须等待一个线程来完成这项工作,因此实际上没有必要使用队列。所以我建议改用这个Monitor类,特别是Wait/Pulse功能。虽然它有点低级和冗长。

class Worker<TResult> : IDisposable

{

    private readonly object _outerLock = new object();

    private readonly object _innerLock = new object();

    private Func<TResult> _currentJob;

    private TResult _currentResult;

    private Exception _currentException;

    private bool _disposed;


    public Worker()

    {

        var thread = new Thread(MainLoop);

        thread.IsBackground = true;

        thread.Start();

    }


    private void MainLoop()

    {

        lock (_innerLock)

        {

            while (true)

            {

                Monitor.Wait(_innerLock); // Wait for client requests

                if (_disposed) break;

                try

                {

                    _currentResult = _currentJob.Invoke();

                    _currentException = null;

                }

                catch (Exception ex)

                {

                    _currentException = ex;

                    _currentResult = default;

                }

                Monitor.Pulse(_innerLock); // Notify the waiting client that the job is done

            }

        } // We are done

    }


    public TResult DoWork(Func<TResult> job)

    {

        TResult result;

        Exception exception;

        lock (_outerLock) // Accept only one client at a time

        {

            lock (_innerLock) // Acquire inner lock

            {

                if (_disposed) throw new InvalidOperationException();

                _currentJob = job;

                Monitor.Pulse(_innerLock); // Notify worker thread about the new job

                Monitor.Wait(_innerLock); // Wait for worker thread to process the job

                result = _currentResult;

                exception = _currentException;

                // Clean up

                _currentJob = null;

                _currentResult = default;

                _currentException = null;

            }

        }

        // Throw the exception, if occurred, preserving the stack trace

        if (exception != null) ExceptionDispatchInfo.Capture(exception).Throw();

        return result;

    }


    public void Dispose()

    {

        lock (_outerLock)

        {

            lock (_innerLock)

            {

                _disposed = true;

                Monitor.Pulse(_innerLock); // Notify worker thread to exit loop

            }

        }

    }

}

使用示例:


var worker = new Worker<int>();

int result = worker.DoWork(() => 1); // Accepts a function as argument

Console.WriteLine($"Result: {result}");

worker.Dispose();

输出:


Result: 1

更新:之前的解决方案对等待不友好,所以这里有一个允许适当等待的解决方案。它TaskCompletionSource为每个作业使用一个,存储在一个BlockingCollection.


class Worker<TResult> : IDisposable

{

    private BlockingCollection<TaskCompletionSource<TResult>> _blockingCollection

        = new BlockingCollection<TaskCompletionSource<TResult>>();


    public Worker()

    {

        var thread = new Thread(MainLoop);

        thread.IsBackground = true;

        thread.Start();

    }


    private void MainLoop()

    {

        foreach (var tcs in _blockingCollection.GetConsumingEnumerable())

        {

            var job = (Func<TResult>)tcs.Task.AsyncState;

            try

            {

                var result = job.Invoke();

                tcs.SetResult(result);

            }

            catch (Exception ex)

            {

                tcs.TrySetException(ex);

            }

        }

    }


    public Task<TResult> DoWorkAsync(Func<TResult> job)

    {

        var tcs = new TaskCompletionSource<TResult>(job,

            TaskCreationOptions.RunContinuationsAsynchronously);

        _blockingCollection.Add(tcs);

        return tcs.Task;

    }


    public TResult DoWork(Func<TResult> job) // Synchronous call

    {

        var task = DoWorkAsync(job);

        try { task.Wait(); } catch { } // Swallow the AggregateException

        // Throw the original exception, if occurred, preserving the stack trace

        if (task.IsFaulted) ExceptionDispatchInfo.Capture(task.Exception.InnerException).Throw();

        return task.Result;

    }


    public void Dispose()

    {

        _blockingCollection.CompleteAdding();

    }

}

使用示例


var worker = new Worker<int>();

int result = await worker.DoWorkAsync(() => 1); // Accepts a function as argument

Console.WriteLine($"Result: {result}");

worker.Dispose();

输出:


Result: 1


查看完整回答
反对 回复 2022-12-24
?
Smart猫小萌

TA贡献1911条经验 获得超7个赞

从同步的角度来看,这工作正常。

但是这样做似乎没有用。如果你想一个接一个地执行作业,你可以使用锁:

lock (lockObject) {
  RunJob();
}

您对这段代码的意图是什么?

还有一个效率问题,因为每个任务都会创建一个操作系统事件并等待它。如果您使用更现代TaskCompletionSource的,如果您同步等待该任务,这将在引擎盖下使用相同的东西。您可以使用异步等待 ( await myTCS.Task;) 来稍微提高效率。当然,这会用 async/await 感染整个调用堆栈。如果这是一个相当低的交易量操作,您将不会获得太多收益。


查看完整回答
反对 回复 2022-12-24
?
吃鸡游戏

TA贡献1829条经验 获得超7个赞

总的来说,我认为可行,尽管当您说“许多”线程正在调用 Do() 时,这可能无法很好地扩展……挂起的线程使用资源。

这段代码的另一个问题是,在空闲时间,您将在“workerThread”中出现“硬循环”,这将导致您的应用程序返回高 CPU 使用率时间。您可能希望将此代码添加到“workerThread”:

if (concurrentQueue.IsEmpty) Thread.Sleep(1);

您可能还想为 WaitOne 调用引入超时以避免日志堵塞。


查看完整回答
反对 回复 2022-12-24
  • 3 回答
  • 0 关注
  • 112 浏览

添加回答

举报

0/150
提交
取消
意见反馈 帮助中心 APP下载
官方微信