1 回答
TA贡献1829条经验 获得超4个赞
我现在知道如何解决它,如果我开始在map材料化值中使用广播中心的结果源,它就会起作用:
Publisher<String> stringPublisher = Source .from(Lists.newArrayList("Hello", "World", "!")) .runWith(Sink.asPublisher(AsPublisher.WITH_FANOUT), materializer); Source .fromPublisher(stringPublisher) .alsoToMat(BroadcastHub.of(String.class, 256), Keep.right()) .mapMaterializedValue(source -> source .runWith(Sink.foreach(System.out::println, materializer)) .run(materializer) .toCompletableFuture() .get();
编辑:TL;DR:在光弯论坛上有这样的解释:
此处发生的情况是,当您附加另一个流时,主流流已经完成。有时,在完成之前查看一些元素可能足够快。
---
因此,看起来广播中心实际上在消费者附加到广播中心创建的源之前删除了元素。
文档说它不会掉落:
如果没有订阅者连接到此集线器,则它不会丢弃任何元素,而是对上游生产者进行背压,直到订阅者到达。
https://doc.akka.io/docs/akka/current/stream/stream-dynamic.html
实际上,在大多数情况下都是如此,但是我发现有些情况下它的行为不正确:
public void testBH3() throws ExecutionException, InterruptedException {
Publisher<String> stringPublisher = Source
.from(Lists.newArrayList("Hello", "World", "!"))
.runWith(Sink.asPublisher(AsPublisher.WITH_FANOUT), materializer);
Source<String, NotUsed> allMessages = Source
.fromPublisher(stringPublisher)
.toMat(BroadcastHub.of(String.class, 256), Keep.right())
.run(materializer);
allMessages
.runForeach(System.out::println, materializer)
.toCompletableFuture()
.get();
}
public void repeat() throws ExecutionException, InterruptedException {
for (int i = 0; i < 100; i++) {
testBH3();
System.out.println("------");
}
}
这适用于 100 个案例中的大约 3 个。但以下方法适用于所有情况(我只是添加了一个节流阀来产生较慢的元素):
public void testBH3() throws ExecutionException, InterruptedException {
Publisher<String> stringPublisher = Source
.from(Lists.newArrayList("Hello", "World", "!"))
.runWith(Sink.asPublisher(AsPublisher.WITH_FANOUT), materializer);
Source<String, NotUsed> allMessages = Source
.fromPublisher(stringPublisher)
.throttle(1, Duration.ofSeconds(1))
.toMat(BroadcastHub.of(String.class, 256), Keep.right())
.run(materializer);
allMessages
.runForeach(System.out::println, materializer)
.toCompletableFuture()
.get();
}
因此,在我看来,当没有已经连接Sink时,广播中心有时会丢弃元素。
添加回答
举报