为了账号安全,请及时绑定邮箱和手机立即绑定

反应式服务。分组和缓存流

反应式服务。分组和缓存流

C#
红糖糍粑 2023-09-16 16:05:33
新增内容:带有测试的完整源代码现在位于https://github.com/bboyle1234/ReactiveTest假设我们有一个视图状态对象,可以通过小的部分视图更改事件进行更新。以下是总视图、增量视图更新事件和Update构建总视图的累加器函数的一些示例模型:interface IDeviceView : ICloneable {    Guid DeviceId { get; }}class DeviceTotalView : IDeviceView {    public Guid DeviceId { get; set; }    public int Voltage { get; set; }    public int Currents { get; set; }    public object Clone() => this.MemberwiseClone();}class DeviceVoltagesUpdateView : IDeviceView {    public Guid DeviceId { get; set; }    public int Voltage { get; set; }    public object Clone() => this.MemberwiseClone();}class DeviceCurrentsUpdateView : IDeviceView {    public Guid DeviceId { get; set; }    public int Current { get; set; }    public object Clone() => this.MemberwiseClone();}class DeviceUpdateEvent {    public DeviceTotalView View;    public IDeviceView LastUpdate;}static DeviceUpdateEvent Update(DeviceUpdateEvent previousUpdate, IDeviceView update) {    if (update.DeviceId != previousUpdate.View.DeviceId) throw new InvalidOperationException("Device ids do not match (numskull exception).");    var view = (DeviceTotalView)previousUpdate.View.Clone();    switch (update) {        case DeviceVoltagesUpdateView x: {            view.Voltage = x.Voltage;            break;        }        case DeviceCurrentsUpdateView x: {            view.Currents = x.Current;            break;        }    }    return new DeviceUpdateEvent { View = view, LastUpdate = update };}接下来,假设我们已经有一个可注入服务,能够为所有设备生成小更新事件的可观察流,并且我们希望创建一个可以为各个设备生成聚合视图流的服务。这是我们要创建的服务的接口:interface IDeviceService {    /// <summary>    /// Gets an observable that produces aggregated update events for the device with the given deviceId.    /// On subscription, the most recent event is immediately pushed to the subscriber.    /// There can be multiple subscribers.    /// </summary>    IObservable<DeviceUpdateEvent> GetDeviceStream(Guid deviceId);}如何使用System.Reactive v4库中的反应式扩展(目标)来实现此接口及其要求.netstandard2.0?这是我的带有注释的锅炉代码,这是我所能得到的。
查看完整描述

2 回答

?
杨__羊羊

TA贡献1943条经验 获得超7个赞

接受的答案中给出的实现虽然对我来说是一次神奇的教育,但也有一些问题需要依次解决。第一个是线程竞争问题,第二个是系统中存在大量设备时的性能问题。我最终解决了线程竞争并通过这个修改后的实现显着提高了性能:


在构造函数中,分组和扫描的设备流直接订阅到 a BehaviorSubject,它实现了Replay(1).RefCount()立即通知新订阅者流中最新值所需的功能。


在该方法中,我们继续使用字典查找来查找设备流,如果字典中尚不存在则GetDeviceStream创建预加载。BehaviorSubject我们已经删除了Where上述问题中先前实现中存在的搜索。使用 where 搜索会导致线程竞争问题,该问题通过使分组流可重播得以解决。这导致了指数性能问题。替换它,将FirstOrDefault花费的时间减少一半,然后完全删除它以支持GetCreate字典技术,从而获得完美的性能 O(1) 而不是 O(n2)。


GetCreateSubject使用Lazy代理对象作为字典值,因为有时可以针对单个键多次ConcurrentDictionary调用该方法。向字典Create提供 a可确保仅在其中一个懒惰者上调用该属性,因此每个设备只创建一个。LazyValueBehaviorSubject


class DeviceService : IDeviceService, IDisposable {


    readonly CompositeDisposable _disposable = new CompositeDisposable();

    readonly ConcurrentDictionary<Guid, Lazy<BehaviorSubject<DeviceUpdateEvent>>> _streams = new ConcurrentDictionary<Guid, Lazy<BehaviorSubject<DeviceUpdateEvent>>>();

    BehaviorSubject<DeviceUpdateEvent> GetCreateSubject(Guid deviceId) {

        return _streams.GetOrAdd(deviceId, Create).Value;

        Lazy<BehaviorSubject<DeviceUpdateEvent>> Create(Guid id) {

            return new Lazy<BehaviorSubject<DeviceUpdateEvent>>(() => {

                var subject = new BehaviorSubject<DeviceUpdateEvent>(DeviceUpdateEvent.GetInitialView(deviceId));

                _disposable.Add(subject);

                return subject;

            });

        }

    }


    public DeviceService(IConnectableObservable<IDeviceView> source) {

        _disposable.Add(source

            .GroupBy(x => x.DeviceId)

            .Subscribe(deviceStream => {

                _disposable.Add(deviceStream

                    .Scan(DeviceUpdateEvent.GetInitialView(deviceStream.Key), DeviceUtils.Update)

                    .Subscribe(GetCreateSubject(deviceStream.Key)));

            }));

        _disposable.Add(source.Connect());

    }


    public void Dispose() {

        _disposable.Dispose();

    }


    public IObservable<DeviceUpdateEvent> GetDeviceStream(Guid deviceId) {

        return GetCreateSubject(deviceId).AsObservable();

    }

}


[TestMethod]

public async Task Test2() {

    var input = new AsyncProducerConsumerQueue<IDeviceView>();

    var source = new ConnectableObservableForAsyncProducerConsumerQueue<IDeviceView>(input);

    var service = new DeviceService(source);


    var ids = Enumerable.Range(0, 100000).Select(i => Guid.NewGuid()).ToArray();

    var idsRemaining = ids.ToHashSet();

    var t1 = Task.Run(async () => {

        foreach (var id in ids) {

            await input.EnqueueAsync(new DeviceVoltagesUpdateView { DeviceId = id, Voltage = 1 });

        }

    });

    var t2 = Task.Run(() => {

        foreach (var id in ids) {

            service.GetDeviceStream(id).Subscribe(x => idsRemaining.Remove(x.View.DeviceId));

        }

    });

    await Task.WhenAll(t1, t2);

    var sw = Stopwatch.StartNew();

    while (idsRemaining.Count > 0) {

        if (sw.Elapsed.TotalSeconds > 600) throw new Exception("Failed");

        await Task.Delay(100);

    }

}

在这里查看整个问题源代码和测试代码: https: //github.com/bboyle1234/ReactiveTest


查看完整回答
反对 回复 2023-09-16
?
九州编程

TA贡献1785条经验 获得超4个赞

你的要点中有一些奇怪的代码。这是我的工作内容:


public class DeviceService : IDeviceService, IDisposable

{


    readonly IObservable<IDeviceView> Source;

    private readonly Dictionary<Guid, IObservable<DeviceUpdateEvent>> _updateStreams = new Dictionary<Guid, IObservable<DeviceUpdateEvent>>();

    private readonly IObservable<(Guid, IObservable<DeviceUpdateEvent>)> _groupedStream;

    private readonly CompositeDisposable _disposable = new CompositeDisposable();


    public DeviceService(IObservable<IDeviceView> source)

    {

        Source = source;


        _groupedStream = source

            .GroupBy(v => v.DeviceId)

            .Select(o => (o.Key, o

                .Scan(new DeviceUpdateEvent { View = DeviceTotalView.GetInitialView(o.Key), LastUpdate = null }, (lastTotalView, newView) => lastTotalView.Update(newView))

                .Replay(1)

                .RefCount()

            ));


        var groupSubscription = _groupedStream.Subscribe(t =>

        {

            _updateStreams[t.Item1] = t.Item2;

            _disposable.Add(t.Item2.Subscribe());

        });

        _disposable.Add(groupSubscription);

    }


    public void Dispose()

    {

        _disposable.Dispose();

    }


    public IObservable<DeviceUpdateEvent> GetDeviceStream(Guid deviceId)

    {

        /// How do we implement this? The observable that we return must be pre-loaded with the latest update

        if(this._updateStreams.ContainsKey(deviceId))

            return this._updateStreams[deviceId];

        return _groupedStream

            .Where(t => t.Item1 == deviceId)

            .Select(t => t.Item2)

            .Switch();



    }

}

这里的肉就是_groupedStream一块。正如您所说,您按 DeviceId 进行分组,然后用于Scan更新状态。我还转移Update到静态类并使其成为扩展方法。你需要一个初始状态,所以我修改了你的DeviceTotalView类来获得它。相应修改:


public class DeviceTotalView : IDeviceView

{

    public Guid DeviceId { get; set; }

    public int Voltage { get; set; }

    public int Currents { get; set; }

    public object Clone() => this.MemberwiseClone();

    public static DeviceTotalView GetInitialView(Guid deviceId)

    {

        return new DeviceTotalView

        {

            DeviceId = deviceId,

            Voltage = 0,

            Currents = 0

        };

    }

}

接下来,它.Replay(1).Refcount()会记住最新的更新,然后在订阅时提供该更新。然后,我们将所有这些子可观察量填充到字典中,以便在方法调用时轻松检索。虚拟订阅 ( _disposable.Add(t.Item2.Subscribe())) 是Replay正常工作所必需的。


如果早期请求尚未更新的 DeviceId,我们会订阅该设备,它将_groupedStream等待第一次更新,生成该 Id 的可观察值,然后.Switch订阅该子可观察值。


然而,所有这些都在你的测试代码中失败了,我猜是因为这个ConnectableObservableForAsyncProducerConsumerQueue类。我不想调试它,因为我不建议这样做。一般来说,不建议混合 TPL 和 Rx 代码。他们解决的问题大部分是重叠的,而且互相妨碍。因此,我修改了您的测试代码,用重播主题替换了可连接的可观察队列。


我还添加了早期请求的测试用例(在该设备的更新到达之前):


DeviceUpdateEvent deviceView1 = null;

DeviceUpdateEvent deviceView2 = null;

DeviceUpdateEvent deviceView3 = null;


var subject = new ReplaySubject<IDeviceView>();


var id1 = Guid.NewGuid();

var id2 = Guid.NewGuid();

var id3 = Guid.NewGuid();


subject.OnNext(new DeviceVoltagesUpdateView { DeviceId = id1, Voltage = 1 });

subject.OnNext(new DeviceVoltagesUpdateView { DeviceId = id1, Voltage = 2 });

subject.OnNext(new DeviceVoltagesUpdateView { DeviceId = id2, Voltage = 100 });


var service = new DeviceService(subject);


service.GetDeviceStream(id1).Subscribe(x => deviceView1 = x);

service.GetDeviceStream(id2).Subscribe(x => deviceView2 = x);

service.GetDeviceStream(id3).Subscribe(x => deviceView3 = x);


/// I believe there is no need to pause here because the Subscribe method calls above 

/// block until the events have all been pushed into the subscribers above.


Assert.AreEqual(deviceView1.View.DeviceId, id1);

Assert.AreEqual(deviceView2.View.DeviceId, id2);

Assert.AreEqual(deviceView1.View.Voltage, 2);

Assert.AreEqual(deviceView2.View.Voltage, 100);

Assert.IsNull(deviceView3);


subject.OnNext(new DeviceVoltagesUpdateView { DeviceId = id2, Voltage = 101 });

Assert.AreEqual(deviceView2.View.Voltage, 101);


subject.OnNext(new DeviceVoltagesUpdateView { DeviceId = id3, Voltage = 101 });

Assert.AreEqual(deviceView3.View.DeviceId, id3);

Assert.AreEqual(deviceView3.View.Voltage, 101);

这一切都很好,并且可以在没有异步的情况下运行。


另外,作为一般提示,我建议使用Microsoft.Reactive.Testing包对 Rx 代码进行单元测试,而不是进行时间间隔测试。


查看完整回答
反对 回复 2023-09-16
  • 2 回答
  • 0 关注
  • 109 浏览

添加回答

举报

0/150
提交
取消
意见反馈 帮助中心 APP下载
官方微信