如何创建流.从akka流源订阅开始的发布者



我有一个java接口,我必须实现它看起来像这样:

public Flow.Publisher<Packet> getLivePublisher();

该接口必须返回一个非活动的Flow.Publisher,直到它被订阅,并且订户调用Subscription.next(n)

到目前为止,我的实现看起来像
return Source
.fromIterator(() -> new LivePacketIterator())
.async("live-dispatcher")
.runWith(JavaFlowSupport.Sink.asPublisher(AsPublisher.WITHOUT_FANOUT), actorSystem);

不幸的是,这似乎立即开始从我的LivePacketIterator中获取元素,即使没有订阅者订阅返回的Flow.Publisher

我理解Source只是一种可订阅对象源的模板(我的理解是它就像一个出版商工厂),并且它只有在具体化后才转换为具体的活动源。因此,如果我理解正确的话,我需要以某种方式实现我的Source以获得Flow.Publisher。但是我想让它具体化,只有当它被订阅时才开始运行。

我也试过使用toMat()

return Source
.fromIterator(() -> new LivePacketIterator(maximumPacketSize))
.filter(OrderSnapshotPacket::isNotEmpty)
.async(dbDispatcher)
.toMat(JavaFlowSupport.Sink.asPublisher(AsPublisher.WITHOUT_FANOUT), Keep.right())
.???;

但我不确定如何处理产生的RunnableGraph.

我理解对了吗?有什么办法能做到我想做的吗?

不幸的是,这似乎立即开始从我的LivePacketIterator获取元素,即使没有订阅者订阅返回的Flow.Publisher。

你到底观察到什么来陈述这一点?我用了一个和你的非常相似的代码片段:

Flow.Publisher<Integer> integerPublisher =
Source.from(List.of(1,2,3,4,5))
.wireTap(System.out::println)
.async()
.runWith(
JavaFlowSupport.Sink.asPublisher(AsPublisher.WITHOUT_FANOUT),
ActorSystem.create());

这将不会开始从列表中发出项目,直到订阅了发布者。

我理解一个源只是一种模板的可订阅源的对象(我的理解是,它就像一个出版商的工厂),它只有转换为一个具体的活动源一旦它具体化

。所有Flow.*接口都是JVM响应式流规范的一部分。Akka Streams将这些接口视为SPI,不会直接在其API中使用它们。它引入了自己的抽象,如SourceFlowSink。Akka Streams允许您将其API中表示的处理流转换为低级流。就像你在片段里做的那样。如果你想将Akka Streams处理管道插件到其他响应式流实现(如RxJava或Project Reactor)中,这是非常有用的。所以Source是Akka Stream的抽象,在某种程度上等同于Flow.Publisher,也就是说,它是一个潜在无限数量的值的来源。您需要将Source连接到Sink(可能通过Flow),以便获得可以运行RunnableGraph。这将启动一切,在大多数情况下,将导致订阅链,元素将开始在流中流动。但这并不是JavaFlowSupport.Sink.asPublisherSink的唯一选择,运行RunnableGraph将把整个Akka流转换为Flow.Publisher的实例。这里的语义是,订阅被延迟,直到某个地方在该实例上调用subscribe。如果我没理解错的话,这正是你想要达到的目的。

最新更新