可以发出值的akka.streams.Source(类似于monix.BehaviorSubject)



我正在寻找akka.stream.scaladsl.Source构造方法,该方法将允许我简单地从代码的不同位置发出下一个值(例如,监视系统事件(。

  • 我需要一些类似Promise的东西。Promise向Future发出单个值。我需要向Source发出多个值
  • monix.reactive.subjects.BehaviorSubject.onNext(_)
  • 我不太在乎背压

目前我已经使用monix&akka流(下面的代码(,但我认为应该有只有akka流的解决方案:

import akka.stream.scaladsl.{Flow, Sink, Source}
import monix.reactive.subjects.BehaviorSubject
import monix.execution.Scheduler.Implicits.global
val bs = BehaviorSubject("") //monix subject is sink and source at the same time
//this is how it is currently implemented
def createSource() = { 
    val s1 = Source.fromPublisher(bs.toReactivePublisher) //here we create source from subject
    Flow.fromSinkAndSourceCoupled[String, String](Sink.ignore, s1)
}
//somewhere else in code... some event happened
//this is how it works in monix.
val res:Future[Ack] = bs.onNext("Event #1471 just happened!") //here we emit value

也许您正在寻找Actor Source

文档中的一个示例:

import akka.actor.typed.ActorRef
import akka.stream.OverflowStrategy
import akka.stream.scaladsl.{ Sink, Source }
import akka.stream.typed.scaladsl.ActorSource
trait Protocol
case class Message(msg: String) extends Protocol
case object Complete extends Protocol
case class Fail(ex: Exception) extends Protocol
val source: Source[Protocol, ActorRef[Protocol]] = ActorSource.actorRef[Protocol](completionMatcher = {
  case Complete =>
}, failureMatcher = {
  case Fail(ex) => ex
}, bufferSize = 8, overflowStrategy = OverflowStrategy.fail)
val ref = source
  .collect {
    case Message(msg) => msg
  }
  .to(Sink.foreach(println))
  .run()
ref ! Message("msg1")

通过这种方式,您将能够通过actor系统向actor发送消息,并且这些消息将从ActorSource向下流发送。

Source抽象,顾名思义,提供API来处理数据源。相反,您需要关注消耗数据的抽象——Sink。而Sink.foreach操作正是您想要的,很可能是:https://doc.akka.io/docs/akka/current/stream/operators/Sink/foreach.html

在您的情况下,代码看起来像:

import akka.stream.scaladsl.{Sink, Source}
val s1 = Source.// your WS akka stream source
s1.runWith(Sink.foreach(write))

希望这能有所帮助!

我想您想要的是sink.foreach。它为接收到的每个元素调用一个给定的过程。我认为代码将看起来像:

s1.runWith(Sink.foreach(write))

从本质上讲,所做的是,对于流一个源,sink尝试写入该流的每个元素。

编辑

我想你在找maybe。它创建了一个源,一旦物化的Promise完成并具有值,该源就会发出。查看此文档

编辑

futureSource也可能起作用。一旦成功完成,它就会流式传输给定未来源的元素

如果有帮助,请告诉我!!

https://doc.akka.io/docs/akka/current/stream/operators/Source/fromIterator.html或https://doc.akka.io/docs/akka/current/stream/operators/Source/fromPublisher.html是您所需要的,这取决于您的Source从何处使用数据。

相关内容

  • 没有找到相关文章

最新更新