为了账号安全,请及时绑定邮箱和手机立即绑定

相当于 Parallel.For 的 localInit 和 localFinally

相当于 Parallel.For 的 localInit 和 localFinally

C#
芜湖不芜 2022-08-20 16:27:04
我有一个有.我还发现,传递给块的构造函数(为每个发布的项目执行)的那个在逻辑上可以分解为一个昂贵的初始化例程和一个改变函数局部变量的主体。如果我可以将函数重构为一个名为 的类,每个并发操作执行一次初始化(就像 的回调一样),然后允许 TPL Dataflow 确保状态不会一次被多个项目改变,那会更有效率。TransformBlock<int, int>MaxDegreeOfParallelism = 6Func<int, int>TransformBlockStateParallel.ForlocalInit重构之前:Func<int, int> original = x => {    // method local variables    // expensive initialization routine to setup locals    // perform action on local variables    // potentially expensive teardown}重构后:public sealed class TransformBlockState<TIn, TOut> : IDisposable{    // instance state    public TransformBlockState()    {        // expensive initialization routine    }    public TOut Transform(TIn value)    {        // called many times but never concurrently for the same instance    }    public void Dispose()    {        // tear down state    }}TPL 数据流库中是否已经存在类似于 (for ) 和 (for ) 回调的内容?localInit.ctorlocalFinallyDispose我想避免有一个(大量不必要的锁定),我想避免将存储在字段中(因为不能保证不会在多个线程上运行(显然按顺序)或在单个线程上运行多个线程(也许在I / O上全部阻塞))。ConcurrentStack<TransformBlockState>TransformBlockState[ThreadStatic]TaskTask
查看完整描述

3 回答

?
白猪掌柜的

TA贡献1893条经验 获得超10个赞

没有等效项 或 。您可以使用块管道创建类似的行为,或者如果这是昂贵的初始化,则可以使用连接池。但是,您可能需要重新考虑您的问题,TPL-Dataflow可能不是最适合的。如果不更多地了解确切的问题来解决它,很难说。但通常,任何一次初始化/每个输入都应在流外部完成并传入。loclaInitlocalFinally


但就像我说的,你可以使用管道来获得类似的东西,尽管它可能不是你真正想要的。Parallel.Foreach


public class DataflowPipeline

{

    private TransformBlock<IEnumerable<int>, IEnumerable<Locals>> Initialize { get; }

    private TransformManyBlock<IEnumerable<Locals>, Locals> Distribute { get; }

    private TransformBlock<Locals, Result> Compute { get; }

    //other blocks, results, disposal etc.



    public DataflowPipeline()

    {

        var sequential = new ExecutionDataflowBlockOptions() { MaxDegreeOfParallelism = 1 };

        var parallel = new ExecutionDataflowBlockOptions() { MaxDegreeOfParallelism = 6 };


        Initialize = new TransformBlock<IEnumerable<int>, IEnumerable<Locals>>(

            inputs => inputs.Select(x => new Locals() { ExpensiveItem = string.Empty, Input = x }),

            sequential);

        Distribute = new TransformManyBlock<IEnumerable<Locals>, Locals>(x => x, sequential);

        Compute = new TransformBlock<Locals, Result>(

            local => new Result() { ExpensiveItem = local.ExpensiveItem, Output = local.Input * 2 },

            parallel);


        //Other blocks, link, complete etc.

    }

}


查看完整回答
反对 回复 2022-08-20
?
慕雪6442864

TA贡献1812条经验 获得超5个赞

如果要有一个有状态块(或),请创建一个函数来创建该块,并将状态放在局部变量中并捕获它们:TransformBlockActionBlock


private IPropagatorBlock<int,int> CreateMyBlock()

{

    var state = 0;

    return new TransformBlock<int,int>( x => x+state++ );

}

这样,编译器将隐式创建类。


查看完整回答
反对 回复 2022-08-20
?
倚天杖

TA贡献1828条经验 获得超3个赞

我想我有一个更好的例子 - 我需要从航空公司获得几千张机票记录(实际上是GDS)。为此,我需要先建立一个开销大的会话,然后才能发送 SOAP 或 REST 请求。会话受到限制,因此我真的不想为每个票证创建一个新的会话。它使每个请求所需的时间加倍,并浪费了金钱和资源。


创建自定义块似乎是解决方案,但实际上并不是那么好。数据流建立处理消息流的处理块的管道。试图使它们以不同的方式工作将与数据流模型的基本假设相冲突。


例如,任务用于并行性、限制和负载平衡 - MaxMessagesPerTask 选项在收到最大消息数后终止任务,以便一个任务不会长时间占用 CPU。为每个任务创建和销毁会话会破坏该机制,并最终创建不必要的会话。



处理这个问题的一种方法是使用一个对象池,其中包含块(在本例中为 Sessions)将使用的“昂贵”对象。令人愤怒的是,Microsoft.Extensions.ObjectPool软件包提供了这样一个池。这些文档不存在,它们被欺骗性地放置在树中,但这是一个独立的.NET Standard 2.0包。Github 源代码看似简单,该类使用 Interlocked.CompareExchange 来避免锁定。甚至还有一个LeakTrackingObjectPool实现。ASP.NET


如果我过去知道这一点,我可以写:


var pool = new DefaultObjectPool<Session>(new DefaultPooledObjectPolicy<Session>());

默认池对象策略仅用于创建新实例。但是,创建一个新策略很容易,例如,使用自己的创建逻辑甚至工厂方法的策略:new


public class SessionPolicy : DefaultPooledObjectPolicy<Session>

{

    public override Session Create()

    {

        //Do whatever is needed here

        return session;

    }

}

重定向


另一种选择是使用多个块实例,并让源块链接到所有这些实例。为了避免将所有消息发送到第一个块,需要有界容量。假设我们有这个工厂方法:


TransformBlock<TIn,TOut> CreateThatBlockWithSession<TIn,TOut>(Settings someSettings)

{

    var session=CreateSomeSessionFrom(someSettings);

    var bounded=new DataflowBlockOptions {BoundedCapacity =1};

    return new TransformBlock<TIn,TOut>(msg=>FunctionThatUses(msg,session),bounded);

}

并用它来创建多个块:


_blocks=Enumerable.Range(0,10)

                  .Select(_=>CreateThatBlockWithSession(settings))

                  .ToArray();

源块可以连接到所有这些块:


foreach(var target in _blocks)

{

    _source.LinkTo(target,options);

}

然后,将所有这些块链接到下一个块。这里棘手的部分是我们不能只是传播完成。如果其中一个块完成,它将强制下一个块完成,即使其他块中有消息等待。


解决方案是使用和propaget完成到下一个块:Task.WhenAllContinueWith


foreach(var target in _blocks)

{

    target.LinkTo(_nextBlock);

}


var allTasks=_blocks.Select(blk=>blk.Completion);

Task.WhenAll(allTasks)

    .ContinueWith(_=>_nextBlock.Complete());

更强大的实现将检查所有任务的状态,如果其中一个任务失败,则调用下一个块IsFaultedFault()


查看完整回答
反对 回复 2022-08-20
  • 3 回答
  • 0 关注
  • 86 浏览

添加回答

举报

0/150
提交
取消
意见反馈 帮助中心 APP下载
官方微信