我有一个WebSocket服务器在Play应用程序,我想把它移动到一个akka-http服务。我目前使用的是ActorFlow.actorRef
,这是Akka中不存在的Play的一部分。
当WebSocket被接受时,我订阅一个RabbitMQ队列,并将每条消息转发给WebSocket。当我收到来自WebSocket的消息时,我在本地处理一些,并将其他消息转发到RabbitMQ交换机。
我如何使用akka-http做同样的事情?我可以使用Sink.actorRef
创建一个接收器,并在那里处理入站消息,但是源呢?
我可以创建一个源与Source.actorRef
,但我如何获得访问演员发送消息时,它的物化?我应该使用其他类型的源来从RabbitMQ订阅的foreach
发送消息吗?
一旦我有了这些,看起来我可以使用Flow.fromSinkAndSource
返回所需的流。
我会解释你的要求
你有一个websocket端点需要
- 本地处理一些请求并将响应发送回客户端
- 转发一些请求到RabbitMQ
- 订阅rabbitMQ,从rabbitMQ转发消息到websocket客户端
我的建议是避免使用actor,除非必要,actor是强大的,但我发现流更容易阅读和推理,当它适合模型
下面是如何在没有actor的情况下将Source和Sink连接在一起的
def wshandler: Flow[Message, Message, _] = {
val rabbit = new Rabbit()
val src =
Source
.actorRef(100, OverflowStrategy.dropBuffer)
.mapMaterializedValue(ref => {
rabbit
.subscribe[String]("updates", queueName, topics) {
(body, topic) =>
log.debug("Received from rabbit")
// here you forward everything from rabbitmq to
// client using materialized actorRef
ref ! TextMessage(body)
}
})
// you need to implement your own pattern matching logic to differentiate between request to process
// locally and request to route to rabbitMQ
val sink = Sink.foreach[Message](m => m match {
case localReq => // your request response processing logic
case rabbitMq => // publish to rabbitMQ
})
Flow.fromSinkAndSource(sink, src)
}
这个片段没有实现你所展示的要点,希望它能解决你的问题