3 回答
TA贡献1893条经验 获得超10个赞
没有等效项 或 。您可以使用块管道创建类似的行为,或者如果这是昂贵的初始化,则可以使用连接池。但是,您可能需要重新考虑您的问题,TPL-Dataflow可能不是最适合的。如果不更多地了解确切的问题来解决它,很难说。但通常,任何一次初始化/每个输入都应在流外部完成并传入。loclaInitlocalFinally
但就像我说的,你可以使用管道来获得类似的东西,尽管它可能不是你真正想要的。Parallel.Foreach
public class DataflowPipeline
{
private TransformBlock<IEnumerable<int>, IEnumerable<Locals>> Initialize { get; }
private TransformManyBlock<IEnumerable<Locals>, Locals> Distribute { get; }
private TransformBlock<Locals, Result> Compute { get; }
//other blocks, results, disposal etc.
public DataflowPipeline()
{
var sequential = new ExecutionDataflowBlockOptions() { MaxDegreeOfParallelism = 1 };
var parallel = new ExecutionDataflowBlockOptions() { MaxDegreeOfParallelism = 6 };
Initialize = new TransformBlock<IEnumerable<int>, IEnumerable<Locals>>(
inputs => inputs.Select(x => new Locals() { ExpensiveItem = string.Empty, Input = x }),
sequential);
Distribute = new TransformManyBlock<IEnumerable<Locals>, Locals>(x => x, sequential);
Compute = new TransformBlock<Locals, Result>(
local => new Result() { ExpensiveItem = local.ExpensiveItem, Output = local.Input * 2 },
parallel);
//Other blocks, link, complete etc.
}
}
TA贡献1812条经验 获得超5个赞
如果要有一个有状态块(或),请创建一个函数来创建该块,并将状态放在局部变量中并捕获它们:TransformBlockActionBlock
private IPropagatorBlock<int,int> CreateMyBlock()
{
var state = 0;
return new TransformBlock<int,int>( x => x+state++ );
}
这样,编译器将隐式创建类。
TA贡献1828条经验 获得超3个赞
我想我有一个更好的例子 - 我需要从航空公司获得几千张机票记录(实际上是GDS)。为此,我需要先建立一个开销大的会话,然后才能发送 SOAP 或 REST 请求。会话受到限制,因此我真的不想为每个票证创建一个新的会话。它使每个请求所需的时间加倍,并浪费了金钱和资源。
创建自定义块似乎是解决方案,但实际上并不是那么好。数据流建立处理消息流的处理块的管道。试图使它们以不同的方式工作将与数据流模型的基本假设相冲突。
例如,任务用于并行性、限制和负载平衡 - MaxMessagesPerTask 选项在收到最大消息数后终止任务,以便一个任务不会长时间占用 CPU。为每个任务创建和销毁会话会破坏该机制,并最终创建不必要的会话。
池
处理这个问题的一种方法是使用一个对象池,其中包含块(在本例中为 Sessions)将使用的“昂贵”对象。令人愤怒的是,Microsoft.Extensions.ObjectPool软件包提供了这样一个池。这些文档不存在,它们被欺骗性地放置在树中,但这是一个独立的.NET Standard 2.0包。Github 源代码看似简单,该类使用 Interlocked.CompareExchange 来避免锁定。甚至还有一个LeakTrackingObjectPool实现。ASP.NET
如果我过去知道这一点,我可以写:
var pool = new DefaultObjectPool<Session>(new DefaultPooledObjectPolicy<Session>());
默认池对象策略仅用于创建新实例。但是,创建一个新策略很容易,例如,使用自己的创建逻辑甚至工厂方法的策略:new
public class SessionPolicy : DefaultPooledObjectPolicy<Session>
{
public override Session Create()
{
//Do whatever is needed here
return session;
}
}
重定向
另一种选择是使用多个块实例,并让源块链接到所有这些实例。为了避免将所有消息发送到第一个块,需要有界容量。假设我们有这个工厂方法:
TransformBlock<TIn,TOut> CreateThatBlockWithSession<TIn,TOut>(Settings someSettings)
{
var session=CreateSomeSessionFrom(someSettings);
var bounded=new DataflowBlockOptions {BoundedCapacity =1};
return new TransformBlock<TIn,TOut>(msg=>FunctionThatUses(msg,session),bounded);
}
并用它来创建多个块:
_blocks=Enumerable.Range(0,10)
.Select(_=>CreateThatBlockWithSession(settings))
.ToArray();
源块可以连接到所有这些块:
foreach(var target in _blocks)
{
_source.LinkTo(target,options);
}
然后,将所有这些块链接到下一个块。这里棘手的部分是我们不能只是传播完成。如果其中一个块完成,它将强制下一个块完成,即使其他块中有消息等待。
解决方案是使用和propaget完成到下一个块:Task.WhenAllContinueWith
foreach(var target in _blocks)
{
target.LinkTo(_nextBlock);
}
var allTasks=_blocks.Select(blk=>blk.Completion);
Task.WhenAll(allTasks)
.ContinueWith(_=>_nextBlock.Complete());
更强大的实现将检查所有任务的状态,如果其中一个任务失败,则调用下一个块IsFaultedFault()
- 3 回答
- 0 关注
- 86 浏览
添加回答
举报