为了账号安全,请及时绑定邮箱和手机立即绑定

缓冲 observable 以稳定慢观察者的可变延迟

缓冲 observable 以稳定慢观察者的可变延迟

C#
眼眸繁星 2021-11-07 20:06:42
我有一个 observable 产生一系列数字,这些数字之间的延迟范围从 0 到 1 秒(随机):var random = new Random();var randomDelaysObservable = Observable.Create<int>(async observer =>{    var value = 0;    while (true)    {        // delay from 0 to 1 second        var randomDelay = TimeSpan.FromSeconds(random.NextDouble());        await Task.Delay(randomDelay);        observer.OnNext(value++);    }    return Disposable.Empty;    // ReSharper disable once FunctionNeverReturns});我希望有一个消费者使用这些数字并将它们写出到控制台,但每 2 秒(恰好每两秒)只取一个数字。现在,我有这个观察者的代码(虽然我知道使用 是不正确的await):var delayedConsoleWritingObserver = Observer.Create<int>(async value =>{    // fixed delay of 2 seconds    var fixedDelay = TimeSpan.FromSeconds(2);    await Task.Delay(fixedDelay);    Console.WriteLine($"[{DateTime.Now:O}] Received value: {value}.");});randomDelaysObservable.Subscribe(delayedConsoleWritingObserver);如果生产者每 0 到 1 秒产生一个数字,而消费者只能每 2 秒消耗一个数字,很明显,生产者产生数字的速度比消费者消耗它们的速度快(背压)。我想要做的是能够提前“预加载”例如来自生产者的 10 或 20 个数字(如果消费者不能足够快地处理它们),以便消费者可以在没有随机延迟的情况下消费它们(但不是所有这些都因为可观察序列是无限的,如果它运行了一段时间,我们就会耗尽内存)。如果我有一个较慢的消费者,这将在某种程度上稳定来自生产者的可变延迟。不过,我不认为一个可能的解决方案如何在ReactiveX运营商做到这一点,我看的文件Buffer,Sample,Debounce和Window,和他们没有像我期待的东西。关于这如何可能的任何想法?请注意,即使我的观察者代码使用async/也不是真正正确await,但我想不出更好的方法来说明我想要实现的目标。
查看完整描述

2 回答

?
慕标5832272

TA贡献1966条经验 获得超4个赞

以下是使用纯 Rx 时您的 observables 的样子:


var producer = Observable.Generate(

    (r: new Random(), i: 0),                    // initial state

    _ => true,                                  // condition

    t => (t.r, t.i + 1),                        // iterator

    t => t.i,                                   // result selector

    t => TimeSpan.FromSeconds(t.r.NextDouble()) // timespan generator

);


var consumer = producer.Zip(

    Observable.Interval(TimeSpan.FromSeconds(2)),

    (_, i) => i

);

然而,“毫不拖延地抓住第一个 n”并不是一件容易的事情。所以我们可以创建一个非时间间隔的生产者:


var rawProducer = Observable.Range(0, int.MaxValue);

然后分别创建时间间隔:


var timeGaps = Observable.Repeat(TimeSpan.Zero).Take(10) //or 20

    .Concat(Observable.Generate(new Random(), r => true, r => r, r => TimeSpan.FromSeconds(r.NextDouble())));

然后结合这两个:


var timeGappedProducer = rawProducer.Zip(timeGaps, (i, ts) => Observable.Return(i).Delay(ts))

    .Concat();

消费者看起来基本相同:


var lessPressureConsumer = timeGappedProducer .Zip(

    Observable.Interval(TimeSpan.FromSeconds(2)),

    (_, i) => i

);

鉴于所有这些,我真的不明白你为什么要这样做。这不是处理背压的好方法,而且这个问题听起来有点像XY 问题。您提到的运算符 ( Sample、Throttle等) 是处理背压的更好方法。


查看完整回答
反对 回复 2021-11-07
?
慕神8447489

TA贡献1780条经验 获得超1个赞

您所描述的问题非常适合在生产者和消费者之间共享的简单有界缓冲区。生产者必须有一个与写入缓冲区相关的条件,说明缓冲区不能满。消费者必须有一个条件,说明缓冲区不能为空。请参阅以下使用 Ada 语言的示例。


with Ada.Text_IO; use Ada.Text_IO;


procedure Main is

   type Order_Nums is range 1..10_000;

   Type Index is mod 10;

   type Buf_T is array(Index) of Order_Nums;


   protected Orders is

      entry Prepare(Order : in Order_Nums);

      entry Sell(Order : out Order_Nums);

   private

      Buffer  : Buf_T;

      P_Index : Index := Index'First;

      S_Index : Index := Index'First;

      Count   : Natural := 0;

   end Orders;


   protected body Orders is

      entry Prepare(Order : in Order_Nums) when Count < Index'Modulus is

      begin

         Buffer(P_Index) := Order;

         P_Index := P_Index + 1;

         Count := Count + 1;

      end Prepare;


      entry Sell(Order : out Order_Nums) when Count > 0 is

      begin

         Order := Buffer(S_Index);

         S_Index := S_Index + 1;

         Count := Count - 1;

      end Sell;

   end Orders;


   task Chef is

      Entry Stop;

   end Chef;


   task Seller is

      Entry Stop;

   end Seller;


   task body Chef is

      The_Order : Order_Nums := Order_Nums'First;

   begin

      loop

         select

            accept Stop;

            exit;

         else

            delay 1.0; -- one second

            Orders.Prepare(The_Order);

            Put_Line("Chef made order number " & The_Order'Image);

            The_Order := The_Order + 1;

            exit when The_Order = Order_Nums'Last;

         end select;

      end loop;

   end Chef;


   task body Seller is

      The_Order : Order_Nums;

   begin

      loop

         select

            accept Stop;

            exit;

         else

            delay 2.0; -- two seconds

            Orders.Sell(The_Order);

            Put_Line("Sold order number " & The_Order'Image);

         end select;

      end loop;

   end Seller;


begin

   delay 60.0; -- 60 seconds

   Chef.Stop;

   Seller.Stop;

end Main;

共享缓冲区名为 Orders。Orders 包含一个 10 Order_Nums 的循环缓冲区。包含订单的数组的索引被声明为mod 10包含 0 到 9 的值。Ada 模块化类型表现出环绕算术,因此递增超过 9 环绕为 0。Prepare 条目具有边界条件,要求Count < Index'Modulus其计算结果为 Count < 10在这种情况下。卖出条目有一个边界条件Count < 0。Chef 任务等待 1 秒来制作比萨饼,但一直等到缓冲区中有空间。只要缓冲区中有空间,Chef 就会生成一个订单。卖家等待 2 秒以消费订单。每个任务在其停止条目被调用时终止。Main 等待 60 秒,然后为每个任务调用停止条目。


查看完整回答
反对 回复 2021-11-07
  • 2 回答
  • 0 关注
  • 285 浏览

添加回答

举报

0/150
提交
取消
意见反馈 帮助中心 APP下载
官方微信