我们有以下用例:从主题读取(预期吞吐量是一个键每 2 秒记录一次),groupByKey 并执行 30 分钟窗口的窗口聚合,跳跃周期为 1 分钟。聚合只是附加收到的记录。当应用程序启动时,一切正常,但在后期阶段,当聚合大小增加时,应用程序会变慢并滞后拓扑结构:KStream<String, Foo> numericStream = builder.stream("topic", Consumed.with(Serdes.String(), FooSerde));static Duration WINDOW_MS = Duration.ofMinutes(30);static Duration ADVANCE_MS = Duration.ofMinutes(15);KStream<Windowed<String>, Foo1> windowedStream = numericStream.peek((key, value) -> System.out.println(value.getDateTime())) .groupByKey() .windowedBy((TimeWindows.of(WINDOW_MS).advanceBy(ADVANCE_MS)).grace(Duration.ofMillis(30))) .aggregate(new Initializer<Foo1>() { @Override public Foo1 apply() { return new Foo1(); }}, (key, value, aggregate) -> { aggregate.append(value); return aggregate; }, Materialized.<String, Foo1, WindowStore<Bytes,byte[]>>as("some_name").withValueSerde(Foo1Serde)) .toStream() .peek((key, value) -> System.out.println(" Key: "+key+ " Start: "+getISTTime(key.window().start()) + " End: "+ getISTTime(key.window().end()) +" Count: " + value.getCount() ));每条记录的大小约为20KB。当聚合大小超过 10MB 左右时,记录的处理时间会超过 2 秒,因此会出现滞后。COMMIT_INTERVAL_MS_CONFIG 设置为 0,因为状态存储应始终与最新数据包保持同步,并且状态存储会被查询并且间隔不同。如何消除应用程序的延迟,是否与 RocksDB I/O 操作有关?因为计数操作而不是聚合操作没有任何滞后每个主题有 3 个分区,但是具有相同键的记录会转到同一分区,那么线程/多个实例会有帮助吗?我们也在考虑在不使用窗口的情况下执行此操作,窗口是否会对较大的聚合产生这种滞后?
1 回答
慕田峪7331174
TA贡献1828条经验 获得超13个赞
由于您向 RocksDB 写入和读取越来越大的数据,因此可能会减慢处理速度。
是的,在一个实例中使用三个线程或启动三个实例各一个线程也可能在这种情况下有所帮助。通过您的拓扑和三个分区,处理分布在三个任务上。如果只有一个实例和一个线程,则所有三个任务将由同一线程运行。您可以通过指定一个具有三个线程的实例来进行纵向扩展,也可以通过在不同的计算节点上启动三个实例(每个实例具有一个线程)来进行横向扩展。两个实例之间的设置(一个具有两个线程,另一个具有一个线程)也可以工作。
如果没有窗口,聚合将永远不会过期,也永远不会从状态存储中删除。因此,状态存储中的数据将无限增长,并且可能会减慢状态存储的速度。
如果使用交互式查询来查询状态存储,则无需将 COMMIT_INTERVAL_MS_CONFIG 设置为 0,因为交互式查询还会查询状态存储前面的缓存。实际上,将 COMMIT_INTERVAL_MS_CONFIG 设置为零也可能会减慢处理速度,因为它会增加磁盘 I/O,因为您不断地将数据写入磁盘。
添加回答
举报
0/150
提交
取消