为了账号安全,请及时绑定邮箱和手机立即绑定

如何将任务列表转换为 Observable 并在完成后处理元素?

如何将任务列表转换为 Observable 并在完成后处理元素?

C#
德玛西亚99 2021-10-31 20:03:11
给定一个集合Tasks:var americanAirlines = new FlightPriceChecker("AA");...var runningTasks = new List<Task<IList<FlightPrice>>>{    americanAirlines.GetPricesAsync(from, to),    delta.GetPricesAsync(from, to),    united.GetPricesAsync(from, to)};我想GetPricesAsync()以它们到达的任何顺序处理结果。目前,我正在使用 while 循环来实现这一点:while (runningTasks.Any()){    // Wait for any task to finish    var completed = await Task.WhenAny(runningTasks);    // Remove from running list       runningTasks.Remove(completed);    // Process the completed task (updates a property we may be binding to)    UpdateCheapestFlight(completed.Result);}这是一个可以使用 Rx 更优雅地解决的问题吗?我尝试使用类似下面的代码但卡住了,因为我必须在某个地方await每个getFlightPriceTask都会阻塞,然后才执行下一个,而不是执行第一个完成的然后等待下一个:runningTasks  .ToObservable()  .Select(getFlightPriceTask => .???.)
查看完整描述

3 回答

?
翻过高山走不出你

TA贡献1875条经验 获得超3个赞

尝试这个:


runningTasks

  .Select(getFlightPriceTask => getFlightPriceTask.ToObservable())

  .Merge()

  .Subscribe(flightPrices => UpdateCheapestFlight(flightPrices))


查看完整回答
反对 回复 2021-10-31
?
慕尼黑5688855

TA贡献1848条经验 获得超2个赞

(使用Merge()就是诀窍!),我想对此发表评论并提出替代解决方案。


评论 Shlomo 的解决方案


这个解决方案非常简单,表达了 Rx 的优雅。唯一的问题是不能等待完成。这在生产代码中通常不是问题,我们只关心更新然后绑定到 UI 的属性。我的另一个评论是计算是在Subscribe()- 有些人喜欢保持订阅超轻量级的,但我认为这主要是个人喜好。


runningTasks

  // Get all tasks and turn them into Observables.

  .Select(getFlightPriceTask => getFlightPriceTask.ToObservable())

  // Merge all tasks (in my case 3) into one "lane". Think of cars trying

  // to leave a three lane highway and going for a one lane exit.

  .Merge()

  // For every task "leaving the highway" calculate the minimum price.

  .Subscribe(flightPrices => UpdateCheapestFlight(flightPrices))

备选方案 1:使用 Do()


这根本没有使用Subscribe(),这有点违背 Rx 的想法,但它可以等待,因此表现得像原始版本。


await runningTasks

    .Select(getFlightPriceTask => getFlightPriceTask.ToObservable())

    .Merge()

    // Process result of each task.

    .Do(flightPrices => UpdateCheapestFlight(flightPrices))

    // Taking all elements will only complete if all three tasks have completed.

    .Take(runningTasks.Count);

备选方案 2:消除 UpdateCheapestFlight()


最后,我认为做这种更具 Rx 风格的方法是根本不使用原始的辅助方法,而是讲述一个易于阅读的“Rx-story”。


var minFlightPrice = await runningTasks

    // Get all the tasks and turn them into Observables 

    .Select(getFlightPriceTask => getFlightPriceTask.ToObservable())

    // Merge all three into one "lane".

    .Merge()

    // Get local minimum value of each airline

    .Select(x => x.Min())

    // Take all the local minimums...

    .Take(runningTasks.Count)

    // ...and find the minimum of them.

    .Min();


查看完整回答
反对 回复 2021-10-31
?
BIG阳

TA贡献1859条经验 获得超6个赞

这是另一个解决方案:


await runningTasks

    .ToObservable()

    .Merge()

    .Do(result => UpdateCheapestFlight(result))

    .DefaultIfEmpty();

它看起来类似于 Shlomo 的解决方案,但有一个细微的区别:任务不是投影到嵌套的 observable ( IObservable<IObservable<TResult>>),而是投影到任务的 observable ( IObservable<Task<TResult>>)。Rx 包含Merge对这两种结构都起作用的运算符的重载。后者稍微更有效,因为它避免了创建任务的一些一次性包装器。当我们从异步委托而不是已经物化的任务开始时,前者更强大,因为它允许控制并发级别(通过不一次启动所有任务),还因为它可以处理任何挂起任务的自动取消以防结果 observable 因任何原因(包括任何任务中发生的错误)在任何时间取消订阅。


该Do运营商用于处理的任务的结果在他们的完成,一次一个结果的顺序。


最后DefaultIfEmpty需要操作员,以防止InvalidOperationException在初始任务列表为空的情况下发生。这是因为等待生成的 observable,并且等待 observable 需要返回一个值(最后发出的值)。


以下是上述示例中使用的 Rx 运算符的签名:


// Converts an enumerable sequence to an observable sequence.

public static IObservable<TSource> ToObservable<TSource>(

    this IEnumerable<TSource> source);


// Merges results from all source tasks into a single observable sequence.

public static IObservable<TSource> Merge<TSource>(

    this IObservable<Task<TSource>> sources);


// Invokes an action for each element in the observable sequence, and propagates

// all observer messages through the result sequence. This method can be used for

// debugging, logging, etc. of query behavior by intercepting the message stream

// to run arbitrary actions for messages on the pipeline.

public static IObservable<TSource> Do<TSource>(this IObservable<TSource> source,

    Action<TSource> onNext);


// Returns the elements of the specified sequence or the type parameter's default

// value in a singleton sequence if the sequence is empty.

public static IObservable<TSource> DefaultIfEmpty<TSource>(

    this IObservable<TSource> source);


查看完整回答
反对 回复 2021-10-31
  • 3 回答
  • 0 关注
  • 239 浏览

添加回答

举报

0/150
提交
取消
意见反馈 帮助中心 APP下载
官方微信