阿卡:有没有一个从不拉动的水槽



需要一个永不拉取的Sink,以便在单元测试中使用。是否已经可用,还是我需要自己编码?

请注意,Sink.ignore()无济于事,因为它总是拉动。我需要一个永不拉动的水槽。

直接回答

您可以创建一个从不调用Subscription.requestorg.reactivestreams.Subscriber

import org.reactivestreams.Subscriber
def nonSubscriber[T] : Subscriber[T] = new Subscriber[T] {
  override def onComplete() : Unit = {}
  override def onError(throwable: java.lang.Throwable) : Unit = {}
  //should never be called therefore definition is not implemented
  override def onNext(t: T) : Unit = ???
  //does not call s.request
  override def onSubscribe(s: Subscription) : Unit = {}
} 

然后,此订阅服务器可用于实例化Sink

import akka.NotUsed
import akka.stream.scaladsl.Sink
def nonSubscribingSink[T] : Sink[T, NotUsed] = 
  Sink.fromSubscriber[T](nonSubscriber[T])

间接答案

问题的性质表明您正在将"业务逻辑"与 akka 流逻辑混合在一起。 您可能需要考虑重新设计,这可能会使您的问题的答案变得不必要。

最终创建了我自己的实现:

// sink that does not pull
val snkStage = object : GraphStage<SinkShape<Message>>() {
    val shape = SinkShape(Inlet.create<Message>("in"))
    override fun shape() = shape
    override fun createLogic(inheritedAttributes: Attributes): GraphStageLogic = object : GraphStageLogic(shape) {
        init {
            setHandler(shape.`in`()) {}
        }
    }
}

但后来决定使用更传统的Sink.ignore()Source.maybe()组合。

最新更新