我正在尝试完成EFPL中的最后一个分配(命名为反应性关注者( - EDX平台的编程反应性系统课程。
我能够实现除outgoingFlow
外的所有功能。
在我看来,我应该以某种方式从现有流中创建一个新来源,经过一些阅读,我仍然没有意识到如何执行流程以生成新来源的元素。
我尝试使用mapConcat
,但没有成功。
我认为现有流是这样的:
eventParserFlow
.via(followersFlow)
.filter(p => isNotified(userId)(p))
现有Flow
S的类型和我的暂定do实施outgoingFlow
可以在此处看到:
val eventParserFlow: Flow[ByteString, Event, NotUsed]
val followersFlow: Flow[Event, (Event, Followers), NotUsed]
def outgoingFlow(userId: Int): Source[ByteString, NotUsed] = {
eventParserFlow
.via(followersFlow)
.filter(p => isNotified(userId)(p))
.mapConcat { case (e, _) => e.render }
???
}
任何人都可以向我指出一些阅读或我如何解决Akka中的类似问题的示例?
只是注释 - 因此,这不是这些问题的最佳资源。您应该使用的是相应的EDX课程中的讨论部分
关于您的问题 - 我不会给您明确的答案,只有很少的提示。
在akka-streams中,您不能仅在Flow
中创建Source
。Flow
负责转换,而Source
创建了新事件。在您的作业中,您只是忘记使用可用值之一。
- 仔细阅读
class Server
中的评论(不是object
(。 - 仔细观察
val (inboundSink, broadcastOut) = ...
,并尝试弄清每种vals
的目的以及它们如何相互关系以及应用程序本身。了解他们的类型是什么会有所帮助
这些提示应该足以了解如何实现outgoingFlow
,即Source[ByteString, NotUsed]