将WebSocket从Play移动到Akka HTTP



我有一个WebSocket服务器在Play应用程序,我想把它移动到一个akka-http服务。我目前使用的是ActorFlow.actorRef,这是Akka中不存在的Play的一部分。

当WebSocket被接受时,我订阅一个RabbitMQ队列,并将每条消息转发给WebSocket。当我收到来自WebSocket的消息时,我在本地处理一些,并将其他消息转发到RabbitMQ交换机。

我如何使用akka-http做同样的事情?我可以使用Sink.actorRef创建一个接收器,并在那里处理入站消息,但是源呢?

我可以创建一个源与Source.actorRef,但我如何获得访问演员发送消息时,它的物化?我应该使用其他类型的源来从RabbitMQ订阅的foreach发送消息吗?

一旦我有了这些,看起来我可以使用Flow.fromSinkAndSource返回所需的流。

我会解释你的要求

你有一个websocket端点需要

  1. 本地处理一些请求并将响应发送回客户端
  2. 转发一些请求到RabbitMQ
  3. 订阅rabbitMQ,从rabbitMQ转发消息到websocket客户端

我的建议是避免使用actor,除非必要,actor是强大的,但我发现流更容易阅读和推理,当它适合模型

下面是如何在没有actor的情况下将Source和Sink连接在一起的

def wshandler: Flow[Message, Message, _] = {
    val rabbit = new Rabbit()
    val src =
      Source
        .actorRef(100, OverflowStrategy.dropBuffer)
        .mapMaterializedValue(ref => {
          rabbit
            .subscribe[String]("updates", queueName, topics) { 
              (body, topic) =>
                log.debug("Received from rabbit")
                // here you forward everything from rabbitmq to    
                // client using materialized actorRef
                ref ! TextMessage(body)
            }
        })
    // you need to implement your own pattern matching logic to differentiate between request to process 
    // locally and request to route to rabbitMQ
    val sink = Sink.foreach[Message](m => m match {
      case localReq => // your request response processing logic
      case rabbitMq => // publish to rabbitMQ
    })
    Flow.fromSinkAndSource(sink, src)
  }

这个片段没有实现你所展示的要点,希望它能解决你的问题

相关内容

  • 没有找到相关文章

最新更新