Akka Streams丢弃消息



我是Akka Streams的新手。我在Java中使用它。(akka-stream_2.12,版本:2.5.14(。

我写了以下课程:

package main;
import java.io.IOException;
import akka.actor.ActorSystem;
import akka.stream.ActorMaterializer;
import akka.stream.Materializer;
import akka.stream.OverflowStrategy;
import akka.stream.javadsl.Sink;
import akka.stream.javadsl.Source;
import akka.stream.javadsl.SourceQueueWithComplete;
public class AkkaTest {
public static void main(String[] args) throws IOException, InterruptedException {
final ActorSystem actorSystem = ActorSystem.create("VehicleSystem");
final Materializer materializer = ActorMaterializer.create(actorSystem);
SourceQueueWithComplete<Object> componentA_outPort1 = 
Source.<Object>queue(100, OverflowStrategy.backpressure()).async()
.to(Sink.foreach(str -> System.out.println(str)))
.run(materializer);
for(int i=1; i<100000; i++)
componentA_outPort1.offer("Akka rocks: " + i);
System.in.read();
actorSystem.terminate();
System.out.println("Done.");
}
}

我希望代码能打印100000条消息,因为这是迭代次数。相反,它只打印1-101条消息,然后从大约61000条开始打印消息(即"Akka rocks:61000"(。

因此,大多数信息都没有打印出来。你能解释一下原因吗?

这里问题的第一个提示是"Done."最后没有打印到控制台。相反,它被印在开头或"阿卡岩石"印刷品之间的某个地方。

原因是SourceQueue.offer是异步的。它返回一个CompletionStage,您不需要等待它的完成。一些流元素"丢失"的事实可以通过方法的文档来解释,特别是以下部分:

此外,当使用背压溢出策略时:-如果缓冲区已满,则在缓冲区中有空间之前,Future不会完成-在这种情况下,在Future完成之前调用报价将返回失败的Future

您可以通过执行以下操作进行验证:

SourceQueueWithComplete<Object> componentA_outPort1 = 
Source.<Object>queue(100, OverflowStrategy.backpressure()).async()
.to(Sink.foreach(str -> System.out.println(str)))
.run(materializer);
for (int i=1; i<100000; i++) {
CompletionStage<QueueOfferResult> result = componentA_outPort1.offer("Akka rocks: " + i);
System.out.println(result);
}

你会看到很多这样的"scala.current.java8.FuturesConvertersImpl$CF@39471dfa[未完成]">

为了解决这个问题,你应该等待报价的CompletionStage完成,有效地使整个呼叫同步,这似乎是你的意图:

SourceQueueWithComplete<Object> componentA_outPort1 = 
Source.<Object>queue(100, OverflowStrategy.backpressure()).async()
.to(Sink.foreach(str -> System.out.println(str)))
.run(materializer);
for (int i=1; i<100000; i++) {
componentA_outPort1.offer("Akka rocks: " + i).toCompletableFuture().join();
}

仍然"完成"不一定会在最后打印出来,因为报价的完成只保证队列接受了元素,而不是它被完全处理了。此外,请记住,actorSystem.terminate()也是异步的。

上面的方法适用于您的情况,但在某些情况下,可能不希望阻塞当前线程。对于像您这样的简单案例,使用不同的Source:可以轻松避免

Source.range(1,  1000).map(i -> "Akka rocks: " + i)

对于更复杂的情况,请考虑Source的其他静态方法,如采用Iterable的Source.fromSource.fromIterator

两件事:

  1. 喂一个SourceQueue的惯用方法是喂另一个Source(正如Pedro在回答中提到的(
  2. 您可能会在流处理完成之前终止actor系统

一旦接收器的物化值完成,就关闭系统:

import java.util.concurrent.CompletionStage;
import akka.Done;
import akka.japi.Pair;
import akka.stream.javadsl.Keep;
import akka.stream.javadsl.RunnableGraph;
// other imports...
Sink<String, CompletionStage<Done>> sink =
Sink.foreach(str -> System.out.println(str));
Source<String, SourceQueueWithComplete<String>> outPort =
Source.<String>queue(100, OverflowStrategy.backpressure()).async();
RunnableGraph<Pair<SourceQueueWithComplete<String>, CompletionStage<Done>>> stream =
outPort.toMat(sink, Keep.both());
Pair<SourceQueueWithComplete<String>, CompletionStage<Done>> pair = stream.run();
Source.range(1, 100000)
.map(i -> "Akka rocks: " + i)
.mapAsync(1, s -> pair.first().offer(s))
.runWith(Sink.ignore(), materializer);
pair.second().thenRun(() -> actorSystem.terminate());

最新更新