我如何创建一个TCP接收器,它只使用akka流来消费消息



我们在:akka-stream-experimental_2.11 1.0.

受例子启发

我们写了一个TCP接收器,如下所示:

def bind(address: String, port: Int, target: ActorRef)
          (implicit system: ActorSystem, actorMaterializer: ActorMaterializer): Future[ServerBinding] = {
    val sink = Sink.foreach[Tcp.IncomingConnection] { conn =>
      val serverFlow = Flow[ByteString]
        .via(Framing.delimiter(ByteString("n"), maximumFrameLength = 256, allowTruncation = true))
        .map(message => {
        target ? new Message(message); ByteString.empty
      })
      conn handleWith serverFlow
    }
    val connections = Tcp().bind(address, port)
    connections.to(sink).run()
  }

然而,我们的意图是让接收者根本不响应,只接收消息。(TCP消息发布者不关心响应)。

这可能吗?因为akka.stream.scaladsl.Tcp.IncomingConnection接受的流类型为:flow [ByteString, ByteString, Unit]

如果是,请给予指导。提前谢谢。

下面的一个尝试通过了我的单元测试,但不确定它是否是最好的主意:

def bind(address: String, port: Int, target: ActorRef)
          (implicit system: ActorSystem, actorMaterializer: ActorMaterializer): Future[ServerBinding] = {
    val sink = Sink.foreach[Tcp.IncomingConnection] { conn =>
      val targetSubscriber = ActorSubscriber[Message](system.actorOf(Props(new TargetSubscriber(target))))
      val targetSink = Flow[ByteString]
        .via(Framing.delimiter(ByteString("n"), maximumFrameLength = 256, allowTruncation = true))
        .map(Message(_))
        .to(Sink(targetSubscriber))
      conn.flow.to(targetSink).runWith(Source(Promise().future))
    }
    val connections = Tcp().bind(address, port)
    connections.to(sink).run()
  }

你做对了。为了保持在某个时刻关闭连接的可能性,您可能希望遵守承诺并稍后完成它。一旦完成了一个元素,这个元素就由源发布。然而,由于您不希望在连接上发布任何元素,您可以使用drop(1)来确保源永远不会发出任何元素。

这是你的例子的更新版本(未经测试):

val promise = Promise[ByteString]()
// this source will complete when the promise is fulfilled
// or it will complete with an error if the promise is completed with an error
val completionSource = Source(promise.future).drop(1)
completionSource  // only used to complete later
  .via(conn.flow) // I reordered the flow for better readability (arguably)
  .runWith(targetSink)
// to close the connection later complete the promise:
def closeConnection() = promise.success(ByteString.empty) // dummy element, will be dropped
// alternatively to fail the connection later, complete with an error
def failConnection() = promise.failure(new RuntimeException)

最新更新