如何从Akka-stream中的流中创建源?(编程反应性系统活动)



我正在尝试完成EFPL中的最后一个分配(命名为反应性关注者( - EDX平台的编程反应性系统课程。

我能够实现除outgoingFlow外的所有功能。

在我看来,我应该以某种方式从现有流中创建一个新来源,经过一些阅读,我仍然没有意识到如何执行流程以生成新来源的元素。

我尝试使用mapConcat,但没有成功。

我认为现有流是这样的:

eventParserFlow
.via(followersFlow)
.filter(p => isNotified(userId)(p))

现有Flow S的类型和我的暂定do实施outgoingFlow可以在此处看到:

val eventParserFlow: Flow[ByteString, Event, NotUsed]
val followersFlow: Flow[Event, (Event, Followers), NotUsed]
def outgoingFlow(userId: Int): Source[ByteString, NotUsed] = {
  eventParserFlow
    .via(followersFlow)
    .filter(p => isNotified(userId)(p))
    .mapConcat { case (e, _) => e.render }
  ???
}

任何人都可以向我指出一些阅读或我如何解决Akka中的类似问题的示例?

只是注释 - 因此,这不是这些问题的最佳资源。您应该使用的是相应的EDX课程中的讨论部分


关于您的问题 - 我不会给您明确的答案,只有很少的提示。

在akka-streams中,您不能仅在Flow中创建SourceFlow负责转换,而Source创建了新事件。在您的作业中,您只是忘记使用可用值之一。

  1. 仔细阅读class Server中的评论(不是object(。
  2. 仔细观察val (inboundSink, broadcastOut) = ...,并尝试弄清每种vals的目的以及它们如何相互关系以及应用程序本身。了解他们的类型是什么会有所帮助

这些提示应该足以了解如何实现outgoingFlow,即Source[ByteString, NotUsed]