3 回答
TA贡献1875条经验 获得超3个赞
尝试这个:
runningTasks
.Select(getFlightPriceTask => getFlightPriceTask.ToObservable())
.Merge()
.Subscribe(flightPrices => UpdateCheapestFlight(flightPrices))
TA贡献1848条经验 获得超2个赞
(使用Merge()就是诀窍!),我想对此发表评论并提出替代解决方案。
评论 Shlomo 的解决方案
这个解决方案非常简单,表达了 Rx 的优雅。唯一的问题是不能等待完成。这在生产代码中通常不是问题,我们只关心更新然后绑定到 UI 的属性。我的另一个评论是计算是在Subscribe()- 有些人喜欢保持订阅超轻量级的,但我认为这主要是个人喜好。
runningTasks
// Get all tasks and turn them into Observables.
.Select(getFlightPriceTask => getFlightPriceTask.ToObservable())
// Merge all tasks (in my case 3) into one "lane". Think of cars trying
// to leave a three lane highway and going for a one lane exit.
.Merge()
// For every task "leaving the highway" calculate the minimum price.
.Subscribe(flightPrices => UpdateCheapestFlight(flightPrices))
备选方案 1:使用 Do()
这根本没有使用Subscribe(),这有点违背 Rx 的想法,但它可以等待,因此表现得像原始版本。
await runningTasks
.Select(getFlightPriceTask => getFlightPriceTask.ToObservable())
.Merge()
// Process result of each task.
.Do(flightPrices => UpdateCheapestFlight(flightPrices))
// Taking all elements will only complete if all three tasks have completed.
.Take(runningTasks.Count);
备选方案 2:消除 UpdateCheapestFlight()
最后,我认为做这种更具 Rx 风格的方法是根本不使用原始的辅助方法,而是讲述一个易于阅读的“Rx-story”。
var minFlightPrice = await runningTasks
// Get all the tasks and turn them into Observables
.Select(getFlightPriceTask => getFlightPriceTask.ToObservable())
// Merge all three into one "lane".
.Merge()
// Get local minimum value of each airline
.Select(x => x.Min())
// Take all the local minimums...
.Take(runningTasks.Count)
// ...and find the minimum of them.
.Min();
TA贡献1859条经验 获得超6个赞
这是另一个解决方案:
await runningTasks
.ToObservable()
.Merge()
.Do(result => UpdateCheapestFlight(result))
.DefaultIfEmpty();
它看起来类似于 Shlomo 的解决方案,但有一个细微的区别:任务不是投影到嵌套的 observable ( IObservable<IObservable<TResult>>),而是投影到任务的 observable ( IObservable<Task<TResult>>)。Rx 包含Merge对这两种结构都起作用的运算符的重载。后者稍微更有效,因为它避免了创建任务的一些一次性包装器。当我们从异步委托而不是已经物化的任务开始时,前者更强大,因为它允许控制并发级别(通过不一次启动所有任务),还因为它可以处理任何挂起任务的自动取消以防结果 observable 因任何原因(包括任何任务中发生的错误)在任何时间取消订阅。
该Do运营商用于处理的任务的结果在他们的完成,一次一个结果的顺序。
最后DefaultIfEmpty需要操作员,以防止InvalidOperationException在初始任务列表为空的情况下发生。这是因为等待生成的 observable,并且等待 observable 需要返回一个值(最后发出的值)。
以下是上述示例中使用的 Rx 运算符的签名:
// Converts an enumerable sequence to an observable sequence.
public static IObservable<TSource> ToObservable<TSource>(
this IEnumerable<TSource> source);
// Merges results from all source tasks into a single observable sequence.
public static IObservable<TSource> Merge<TSource>(
this IObservable<Task<TSource>> sources);
// Invokes an action for each element in the observable sequence, and propagates
// all observer messages through the result sequence. This method can be used for
// debugging, logging, etc. of query behavior by intercepting the message stream
// to run arbitrary actions for messages on the pipeline.
public static IObservable<TSource> Do<TSource>(this IObservable<TSource> source,
Action<TSource> onNext);
// Returns the elements of the specified sequence or the type parameter's default
// value in a singleton sequence if the sequence is empty.
public static IObservable<TSource> DefaultIfEmpty<TSource>(
this IObservable<TSource> source);
- 3 回答
- 0 关注
- 239 浏览
添加回答
举报