2 回答
TA贡献1943条经验 获得超7个赞
接受的答案中给出的实现虽然对我来说是一次神奇的教育,但也有一些问题需要依次解决。第一个是线程竞争问题,第二个是系统中存在大量设备时的性能问题。我最终解决了线程竞争并通过这个修改后的实现显着提高了性能:
在构造函数中,分组和扫描的设备流直接订阅到 a BehaviorSubject,它实现了Replay(1).RefCount()立即通知新订阅者流中最新值所需的功能。
在该方法中,我们继续使用字典查找来查找设备流,如果字典中尚不存在则GetDeviceStream创建预加载。BehaviorSubject我们已经删除了Where上述问题中先前实现中存在的搜索。使用 where 搜索会导致线程竞争问题,该问题通过使分组流可重播得以解决。这导致了指数性能问题。替换它,将FirstOrDefault花费的时间减少一半,然后完全删除它以支持GetCreate字典技术,从而获得完美的性能 O(1) 而不是 O(n2)。
GetCreateSubject使用Lazy代理对象作为字典值,因为有时可以针对单个键多次ConcurrentDictionary调用该方法。向字典Create提供 a可确保仅在其中一个懒惰者上调用该属性,因此每个设备只创建一个。LazyValueBehaviorSubject
class DeviceService : IDeviceService, IDisposable {
readonly CompositeDisposable _disposable = new CompositeDisposable();
readonly ConcurrentDictionary<Guid, Lazy<BehaviorSubject<DeviceUpdateEvent>>> _streams = new ConcurrentDictionary<Guid, Lazy<BehaviorSubject<DeviceUpdateEvent>>>();
BehaviorSubject<DeviceUpdateEvent> GetCreateSubject(Guid deviceId) {
return _streams.GetOrAdd(deviceId, Create).Value;
Lazy<BehaviorSubject<DeviceUpdateEvent>> Create(Guid id) {
return new Lazy<BehaviorSubject<DeviceUpdateEvent>>(() => {
var subject = new BehaviorSubject<DeviceUpdateEvent>(DeviceUpdateEvent.GetInitialView(deviceId));
_disposable.Add(subject);
return subject;
});
}
}
public DeviceService(IConnectableObservable<IDeviceView> source) {
_disposable.Add(source
.GroupBy(x => x.DeviceId)
.Subscribe(deviceStream => {
_disposable.Add(deviceStream
.Scan(DeviceUpdateEvent.GetInitialView(deviceStream.Key), DeviceUtils.Update)
.Subscribe(GetCreateSubject(deviceStream.Key)));
}));
_disposable.Add(source.Connect());
}
public void Dispose() {
_disposable.Dispose();
}
public IObservable<DeviceUpdateEvent> GetDeviceStream(Guid deviceId) {
return GetCreateSubject(deviceId).AsObservable();
}
}
[TestMethod]
public async Task Test2() {
var input = new AsyncProducerConsumerQueue<IDeviceView>();
var source = new ConnectableObservableForAsyncProducerConsumerQueue<IDeviceView>(input);
var service = new DeviceService(source);
var ids = Enumerable.Range(0, 100000).Select(i => Guid.NewGuid()).ToArray();
var idsRemaining = ids.ToHashSet();
var t1 = Task.Run(async () => {
foreach (var id in ids) {
await input.EnqueueAsync(new DeviceVoltagesUpdateView { DeviceId = id, Voltage = 1 });
}
});
var t2 = Task.Run(() => {
foreach (var id in ids) {
service.GetDeviceStream(id).Subscribe(x => idsRemaining.Remove(x.View.DeviceId));
}
});
await Task.WhenAll(t1, t2);
var sw = Stopwatch.StartNew();
while (idsRemaining.Count > 0) {
if (sw.Elapsed.TotalSeconds > 600) throw new Exception("Failed");
await Task.Delay(100);
}
}
在这里查看整个问题源代码和测试代码: https: //github.com/bboyle1234/ReactiveTest
TA贡献1785条经验 获得超4个赞
你的要点中有一些奇怪的代码。这是我的工作内容:
public class DeviceService : IDeviceService, IDisposable
{
readonly IObservable<IDeviceView> Source;
private readonly Dictionary<Guid, IObservable<DeviceUpdateEvent>> _updateStreams = new Dictionary<Guid, IObservable<DeviceUpdateEvent>>();
private readonly IObservable<(Guid, IObservable<DeviceUpdateEvent>)> _groupedStream;
private readonly CompositeDisposable _disposable = new CompositeDisposable();
public DeviceService(IObservable<IDeviceView> source)
{
Source = source;
_groupedStream = source
.GroupBy(v => v.DeviceId)
.Select(o => (o.Key, o
.Scan(new DeviceUpdateEvent { View = DeviceTotalView.GetInitialView(o.Key), LastUpdate = null }, (lastTotalView, newView) => lastTotalView.Update(newView))
.Replay(1)
.RefCount()
));
var groupSubscription = _groupedStream.Subscribe(t =>
{
_updateStreams[t.Item1] = t.Item2;
_disposable.Add(t.Item2.Subscribe());
});
_disposable.Add(groupSubscription);
}
public void Dispose()
{
_disposable.Dispose();
}
public IObservable<DeviceUpdateEvent> GetDeviceStream(Guid deviceId)
{
/// How do we implement this? The observable that we return must be pre-loaded with the latest update
if(this._updateStreams.ContainsKey(deviceId))
return this._updateStreams[deviceId];
return _groupedStream
.Where(t => t.Item1 == deviceId)
.Select(t => t.Item2)
.Switch();
}
}
这里的肉就是_groupedStream一块。正如您所说,您按 DeviceId 进行分组,然后用于Scan更新状态。我还转移Update到静态类并使其成为扩展方法。你需要一个初始状态,所以我修改了你的DeviceTotalView类来获得它。相应修改:
public class DeviceTotalView : IDeviceView
{
public Guid DeviceId { get; set; }
public int Voltage { get; set; }
public int Currents { get; set; }
public object Clone() => this.MemberwiseClone();
public static DeviceTotalView GetInitialView(Guid deviceId)
{
return new DeviceTotalView
{
DeviceId = deviceId,
Voltage = 0,
Currents = 0
};
}
}
接下来,它.Replay(1).Refcount()会记住最新的更新,然后在订阅时提供该更新。然后,我们将所有这些子可观察量填充到字典中,以便在方法调用时轻松检索。虚拟订阅 ( _disposable.Add(t.Item2.Subscribe())) 是Replay正常工作所必需的。
如果早期请求尚未更新的 DeviceId,我们会订阅该设备,它将_groupedStream等待第一次更新,生成该 Id 的可观察值,然后.Switch订阅该子可观察值。
然而,所有这些都在你的测试代码中失败了,我猜是因为这个ConnectableObservableForAsyncProducerConsumerQueue类。我不想调试它,因为我不建议这样做。一般来说,不建议混合 TPL 和 Rx 代码。他们解决的问题大部分是重叠的,而且互相妨碍。因此,我修改了您的测试代码,用重播主题替换了可连接的可观察队列。
我还添加了早期请求的测试用例(在该设备的更新到达之前):
DeviceUpdateEvent deviceView1 = null;
DeviceUpdateEvent deviceView2 = null;
DeviceUpdateEvent deviceView3 = null;
var subject = new ReplaySubject<IDeviceView>();
var id1 = Guid.NewGuid();
var id2 = Guid.NewGuid();
var id3 = Guid.NewGuid();
subject.OnNext(new DeviceVoltagesUpdateView { DeviceId = id1, Voltage = 1 });
subject.OnNext(new DeviceVoltagesUpdateView { DeviceId = id1, Voltage = 2 });
subject.OnNext(new DeviceVoltagesUpdateView { DeviceId = id2, Voltage = 100 });
var service = new DeviceService(subject);
service.GetDeviceStream(id1).Subscribe(x => deviceView1 = x);
service.GetDeviceStream(id2).Subscribe(x => deviceView2 = x);
service.GetDeviceStream(id3).Subscribe(x => deviceView3 = x);
/// I believe there is no need to pause here because the Subscribe method calls above
/// block until the events have all been pushed into the subscribers above.
Assert.AreEqual(deviceView1.View.DeviceId, id1);
Assert.AreEqual(deviceView2.View.DeviceId, id2);
Assert.AreEqual(deviceView1.View.Voltage, 2);
Assert.AreEqual(deviceView2.View.Voltage, 100);
Assert.IsNull(deviceView3);
subject.OnNext(new DeviceVoltagesUpdateView { DeviceId = id2, Voltage = 101 });
Assert.AreEqual(deviceView2.View.Voltage, 101);
subject.OnNext(new DeviceVoltagesUpdateView { DeviceId = id3, Voltage = 101 });
Assert.AreEqual(deviceView3.View.DeviceId, id3);
Assert.AreEqual(deviceView3.View.Voltage, 101);
这一切都很好,并且可以在没有异步的情况下运行。
另外,作为一般提示,我建议使用Microsoft.Reactive.Testing包对 Rx 代码进行单元测试,而不是进行时间间隔测试。
- 2 回答
- 0 关注
- 109 浏览
添加回答
举报