为了账号安全,请及时绑定邮箱和手机立即绑定

合并两个可观察量时保留排序

合并两个可观察量时保留排序

C#
森林海 2023-07-22 16:19:15
我想合并 2 个可观察值并保持顺序(可能基于选择器)。我还想对可观察的来源施加反压力。因此,选择器会选择其中一个项目通过可观察对象进行推送,而另一个项目也会等待另一个项目进行比较。Src1、Src2 和 Result 都是 类型IObservable<T>。Src1: { 1,3,6,8,9,10 }Src2: { 2,4,5,7,11,12 }Result: 1,2,3,4,5,6,7,8,9,10,11,12Timeline:Src1:    -1---3----6------8----9-10Src2:    --2-----4---5-7----11---------12Result:  --1--2--3-4-5-6--7-8--9-10-11-12在上面的示例中,src1 发出“1”并被阻塞,直到 src2 发出它的第一项“2”。应用一个选择器来选择最小的项目,该选择器从 src1 中选择项目。Src2 现在等待下一个项目(来自 src1)与其当前项目(“2”)进行比较。当 src1 发出下一个项目“3”时,再次运行选择,这次从 src2 中选择该项目。重复此过程,直到其中一个可观察量完成。然后,剩余的 observable 会推送项目直到完成。使用现有的 .net Rx 方法可以实现这一点吗?编辑:请注意,保证 2 个源可观察量是有序的。测试示例:var source1 = new List<int>() { 1, 4, 6, 7, 8, 10, 14 }.AsEnumerable();var source2 = new List<int>() { 2, 3, 5, 9, 11, 12, 13, 15 }.AsEnumerable();var src1 = source1.ToObservable();var src2 = source2.ToObservable();var res = src1.SortedMerge(src2, (a, b) =>    {       if (a <= b)           return a;       else           return b;    });res.Subscribe((x) => Console.Write($"{x}, "));期望结果:1,2,3,4,5,6,7,8,9,10,11,12,13,14,15
查看完整描述

1 回答

?
跃然一笑

TA贡献1826条经验 获得超6个赞

这很有趣。必须稍微调整一下算法。还可以进一步改进。

假设:

  1. 有两个通用类型 的streamA流。streamBT

  2. 两个流分别排序,使得streamA[i] < streamA[i+1]streamB[i] < stream[i+1]

  3. 您不能假设streamA[i]和之间有任何关系streamB[i]

  4. 流 A 和 B 是谨慎的:相同的元素不会从两者中发出。如果发生这种情况,我会扔掉NotImplementedException这个案子很容易处理,但我想避免歧义。

  5. 有一个min类型的函数T

  6. 没有对两条流的相对速度做出任何假设,但如果其中一条始终比另一条快,则背压将成为问题。

这是我使用的算法:

  • 设两个队列,qA并且qB

  • 当您从 获取一个项目时streamA,将其排队到qA

  • 当您从 获取一个项目时streamB,将其排队到qB

  • 当和 qA中都有一个项目时qB,比较两个队列的顶部项目。删除并发出这两者的最小值。如果两个队列仍然非空,则重复。

  • 如果 或streamA完成streamB,则转储队列的内容并终止。注意这无疑是懒惰的,可能应该更改为转储,然后继续返回未完成的 observable

这是代码:

public static IObservable<T> SortedMerge<T>(this IObservable<T> source, IObservable<T> other)

{

    return SortedMerge(source, other, (a, b) => Enumerable.Min(new[] { a, b}));

}


public static IObservable<T> SortedMerge<T>(this IObservable<T> source, IObservable<T> other, Func<T, T, T> min)

{

    return source

        .Select(i => (key: 1, value: i)).Materialize()

        .Merge(other.Select(i => (key: 2, value: i)).Materialize())

        .Scan((qA: ImmutableQueue<T>.Empty, qB: ImmutableQueue<T>.Empty, exception: (Exception)null, outputMessages: new List<T>()), 

            (state, message) =>

        {

            if (message.Kind == NotificationKind.OnNext)

            {

                var key = message.Value.key;

                var value = message.Value.value;

                var qA = state.qA;

                var qB = state.qB;

                if (key == 1)

                    qA = qA.Enqueue(value);

                else

                    qB = qB.Enqueue(value);

                var output = new List<T>();

                while(!qA.IsEmpty && !qB.IsEmpty)

                {

                    var aVal = qA.Peek();

                    var bVal = qB.Peek();

                    var minVal = min(aVal, bVal);

                    if(aVal.Equals(minVal) && bVal.Equals(minVal))

                        throw new NotImplementedException();


                    if(aVal.Equals(minVal))

                    {

                        output.Add(aVal);

                        qA = qA.Dequeue();

                    }

                    else

                    {

                        output.Add(bVal);

                        qB = qB.Dequeue();

                    }

                }

                return (qA, qB, null, output);

            }

            else if (message.Kind == NotificationKind.OnError)

            {

                return (state.qA, state.qB, message.Exception, new List<T>());

            }

            else //message.Kind == NotificationKind.OnCompleted

            {

                var output = state.qA.Concat(state.qB).ToList();

                return (ImmutableQueue<T>.Empty, ImmutableQueue<T>.Empty, null, output);

            }

        })

        .Publish(tuples => Observable.Merge(

            tuples

                .Where(t => t.outputMessages.Any() && (!t.qA.IsEmpty || !t.qB.IsEmpty))

                .SelectMany(t => t.outputMessages

                    .Select(v => Notification.CreateOnNext<T>(v))

                    .ToObservable()

            ),

            tuples

                .Where(t => t.outputMessages.Any() && t.qA.IsEmpty && t.qB.IsEmpty)

                .SelectMany(t => t.outputMessages

                    .Select(v => Notification.CreateOnNext<T>(v))

                    .ToObservable()

                    .Concat(Observable.Return(Notification.CreateOnCompleted<T>()))

            ),

            tuples

                .Where(t => t.exception != null)

                .Select(t => Notification.CreateOnError<T>(t.exception))

        ))

        .Dematerialize();

ImmutableQueue来自System.Collections.Immutable. Scan需要跟踪状态。由于OnCompleted处理需要具体化。诚然,这是一个复杂的解决方案,但我不确定是否有更干净的以 Rx 为中心的方法。


如果您需要进一步说明,请告诉我。


查看完整回答
反对 回复 2023-07-22
  • 1 回答
  • 0 关注
  • 107 浏览

添加回答

举报

0/150
提交
取消
意见反馈 帮助中心 APP下载
官方微信