我们在:akka-stream-experimental_2.11 1.0.
受例子启发
我们写了一个TCP接收器,如下所示:
def bind(address: String, port: Int, target: ActorRef)
(implicit system: ActorSystem, actorMaterializer: ActorMaterializer): Future[ServerBinding] = {
val sink = Sink.foreach[Tcp.IncomingConnection] { conn =>
val serverFlow = Flow[ByteString]
.via(Framing.delimiter(ByteString("n"), maximumFrameLength = 256, allowTruncation = true))
.map(message => {
target ? new Message(message); ByteString.empty
})
conn handleWith serverFlow
}
val connections = Tcp().bind(address, port)
connections.to(sink).run()
}
然而,我们的意图是让接收者根本不响应,只接收消息。(TCP消息发布者不关心响应)。
这可能吗?因为akka.stream.scaladsl.Tcp.IncomingConnection接受的流类型为:flow [ByteString, ByteString, Unit]
如果是,请给予指导。提前谢谢。
下面的一个尝试通过了我的单元测试,但不确定它是否是最好的主意:
def bind(address: String, port: Int, target: ActorRef)
(implicit system: ActorSystem, actorMaterializer: ActorMaterializer): Future[ServerBinding] = {
val sink = Sink.foreach[Tcp.IncomingConnection] { conn =>
val targetSubscriber = ActorSubscriber[Message](system.actorOf(Props(new TargetSubscriber(target))))
val targetSink = Flow[ByteString]
.via(Framing.delimiter(ByteString("n"), maximumFrameLength = 256, allowTruncation = true))
.map(Message(_))
.to(Sink(targetSubscriber))
conn.flow.to(targetSink).runWith(Source(Promise().future))
}
val connections = Tcp().bind(address, port)
connections.to(sink).run()
}
你做对了。为了保持在某个时刻关闭连接的可能性,您可能希望遵守承诺并稍后完成它。一旦完成了一个元素,这个元素就由源发布。然而,由于您不希望在连接上发布任何元素,您可以使用drop(1)
来确保源永远不会发出任何元素。
这是你的例子的更新版本(未经测试):
val promise = Promise[ByteString]()
// this source will complete when the promise is fulfilled
// or it will complete with an error if the promise is completed with an error
val completionSource = Source(promise.future).drop(1)
completionSource // only used to complete later
.via(conn.flow) // I reordered the flow for better readability (arguably)
.runWith(targetSink)
// to close the connection later complete the promise:
def closeConnection() = promise.success(ByteString.empty) // dummy element, will be dropped
// alternatively to fail the connection later, complete with an error
def failConnection() = promise.failure(new RuntimeException)