我有一个java接口,我必须实现它看起来像这样:
public Flow.Publisher<Packet> getLivePublisher();
该接口必须返回一个非活动的Flow.Publisher
,直到它被订阅,并且订户调用Subscription.next(n)
。
return Source
.fromIterator(() -> new LivePacketIterator())
.async("live-dispatcher")
.runWith(JavaFlowSupport.Sink.asPublisher(AsPublisher.WITHOUT_FANOUT), actorSystem);
不幸的是,这似乎立即开始从我的LivePacketIterator
中获取元素,即使没有订阅者订阅返回的Flow.Publisher
。
我理解Source
只是一种可订阅对象源的模板(我的理解是它就像一个出版商工厂),并且它只有在具体化后才转换为具体的活动源。因此,如果我理解正确的话,我需要以某种方式实现我的Source
以获得Flow.Publisher
。但是我想让它具体化,只有当它被订阅时才开始运行。
我也试过使用toMat()
return Source
.fromIterator(() -> new LivePacketIterator(maximumPacketSize))
.filter(OrderSnapshotPacket::isNotEmpty)
.async(dbDispatcher)
.toMat(JavaFlowSupport.Sink.asPublisher(AsPublisher.WITHOUT_FANOUT), Keep.right())
.???;
但我不确定如何处理产生的RunnableGraph
.
我理解对了吗?有什么办法能做到我想做的吗?
不幸的是,这似乎立即开始从我的LivePacketIterator获取元素,即使没有订阅者订阅返回的Flow.Publisher。
你到底观察到什么来陈述这一点?我用了一个和你的非常相似的代码片段:
Flow.Publisher<Integer> integerPublisher =
Source.from(List.of(1,2,3,4,5))
.wireTap(System.out::println)
.async()
.runWith(
JavaFlowSupport.Sink.asPublisher(AsPublisher.WITHOUT_FANOUT),
ActorSystem.create());
这将不会开始从列表中发出项目,直到订阅了发布者。
我理解一个源只是一种模板的可订阅源的对象(我的理解是,它就像一个出版商的工厂),它只有转换为一个具体的活动源一旦它具体化
。所有Flow.*
接口都是JVM响应式流规范的一部分。Akka Streams将这些接口视为SPI,不会直接在其API中使用它们。它引入了自己的抽象,如Source
、Flow
和Sink
。Akka Streams允许您将其API中表示的处理流转换为低级流。就像你在片段里做的那样。如果你想将Akka Streams处理管道插件到其他响应式流实现(如RxJava或Project Reactor)中,这是非常有用的。所以Source
是Akka Stream的抽象,在某种程度上等同于Flow.Publisher
,也就是说,它是一个潜在无限数量的值的来源。您需要将Source
连接到Sink
(可能通过Flow
),以便获得可以运行的RunnableGraph
。这将启动一切,在大多数情况下,将导致订阅链,元素将开始在流中流动。但这并不是JavaFlowSupport.Sink.asPublisher
Sink
的唯一选择,运行RunnableGraph
将把整个Akka流转换为Flow.Publisher
的实例。这里的语义是,订阅被延迟,直到某个地方在该实例上调用subscribe
。如果我没理解错的话,这正是你想要达到的目的。