2 回答
TA贡献1966条经验 获得超4个赞
以下是使用纯 Rx 时您的 observables 的样子:
var producer = Observable.Generate(
(r: new Random(), i: 0), // initial state
_ => true, // condition
t => (t.r, t.i + 1), // iterator
t => t.i, // result selector
t => TimeSpan.FromSeconds(t.r.NextDouble()) // timespan generator
);
var consumer = producer.Zip(
Observable.Interval(TimeSpan.FromSeconds(2)),
(_, i) => i
);
然而,“毫不拖延地抓住第一个 n”并不是一件容易的事情。所以我们可以创建一个非时间间隔的生产者:
var rawProducer = Observable.Range(0, int.MaxValue);
然后分别创建时间间隔:
var timeGaps = Observable.Repeat(TimeSpan.Zero).Take(10) //or 20
.Concat(Observable.Generate(new Random(), r => true, r => r, r => TimeSpan.FromSeconds(r.NextDouble())));
然后结合这两个:
var timeGappedProducer = rawProducer.Zip(timeGaps, (i, ts) => Observable.Return(i).Delay(ts))
.Concat();
消费者看起来基本相同:
var lessPressureConsumer = timeGappedProducer .Zip(
Observable.Interval(TimeSpan.FromSeconds(2)),
(_, i) => i
);
鉴于所有这些,我真的不明白你为什么要这样做。这不是处理背压的好方法,而且这个问题听起来有点像XY 问题。您提到的运算符 ( Sample、Throttle等) 是处理背压的更好方法。
TA贡献1780条经验 获得超1个赞
您所描述的问题非常适合在生产者和消费者之间共享的简单有界缓冲区。生产者必须有一个与写入缓冲区相关的条件,说明缓冲区不能满。消费者必须有一个条件,说明缓冲区不能为空。请参阅以下使用 Ada 语言的示例。
with Ada.Text_IO; use Ada.Text_IO;
procedure Main is
type Order_Nums is range 1..10_000;
Type Index is mod 10;
type Buf_T is array(Index) of Order_Nums;
protected Orders is
entry Prepare(Order : in Order_Nums);
entry Sell(Order : out Order_Nums);
private
Buffer : Buf_T;
P_Index : Index := Index'First;
S_Index : Index := Index'First;
Count : Natural := 0;
end Orders;
protected body Orders is
entry Prepare(Order : in Order_Nums) when Count < Index'Modulus is
begin
Buffer(P_Index) := Order;
P_Index := P_Index + 1;
Count := Count + 1;
end Prepare;
entry Sell(Order : out Order_Nums) when Count > 0 is
begin
Order := Buffer(S_Index);
S_Index := S_Index + 1;
Count := Count - 1;
end Sell;
end Orders;
task Chef is
Entry Stop;
end Chef;
task Seller is
Entry Stop;
end Seller;
task body Chef is
The_Order : Order_Nums := Order_Nums'First;
begin
loop
select
accept Stop;
exit;
else
delay 1.0; -- one second
Orders.Prepare(The_Order);
Put_Line("Chef made order number " & The_Order'Image);
The_Order := The_Order + 1;
exit when The_Order = Order_Nums'Last;
end select;
end loop;
end Chef;
task body Seller is
The_Order : Order_Nums;
begin
loop
select
accept Stop;
exit;
else
delay 2.0; -- two seconds
Orders.Sell(The_Order);
Put_Line("Sold order number " & The_Order'Image);
end select;
end loop;
end Seller;
begin
delay 60.0; -- 60 seconds
Chef.Stop;
Seller.Stop;
end Main;
共享缓冲区名为 Orders。Orders 包含一个 10 Order_Nums 的循环缓冲区。包含订单的数组的索引被声明为mod 10包含 0 到 9 的值。Ada 模块化类型表现出环绕算术,因此递增超过 9 环绕为 0。Prepare 条目具有边界条件,要求Count < Index'Modulus其计算结果为 Count < 10在这种情况下。卖出条目有一个边界条件Count < 0。Chef 任务等待 1 秒来制作比萨饼,但一直等到缓冲区中有空间。只要缓冲区中有空间,Chef 就会生成一个订单。卖家等待 2 秒以消费订单。每个任务在其停止条目被调用时终止。Main 等待 60 秒,然后为每个任务调用停止条目。
- 2 回答
- 0 关注
- 285 浏览
添加回答
举报