Akka WebSocket:如何由服务器端主动关闭连接



这是我的 WebSocket 服务器实现。

// Accepts GET requests on the root path (with or without a trailing slash)
// and hands the connection over to the WebSocket flow.
val route = (get & pathEndOrSingleSlash) {
  handleWebSocketMessages(websocketFlow)
}
/** Builds the per-connection WebSocket flow.
  *
  * Strict text frames are decoded with `protocol.hydrate`, routed through the
  * chat actor flow under a freshly generated connection id, and every event
  * coming back is serialized into a strict text frame. Any other kind of
  * incoming message is not matched by the `collect` and is therefore dropped.
  */
def websocketFlow: Flow[Message, Message, Any] = {
  val connectionId = UUID.randomUUID()

  // Only strict text frames are decoded; everything else is filtered out.
  val decoded = Flow[Message].collect {
    case TextMessage.Strict(payload) => protocol.hydrate(payload)
  }

  decoded
    .via(chatActorFlow(connectionId))
    .map(outgoing => TextMessage.Strict(protocol.serialize(outgoing)))
}

/** Bridges one WebSocket connection to `chatRef`.
  *
  * Inbound: every message is wrapped in a `Protocol.SignedMessage` carrying
  * `connectionId` and forwarded to `chatRef`; when the inbound stream completes,
  * `chatRef` receives `Protocol.CloseConnection(connectionId)`.
  *
  * Outbound: an actor-backed source is materialized per connection and its
  * `ActorRef` is announced to `chatRef` via `Protocol.OpenConnection`, so the
  * chat actor can push events to this client.
  *
  * @param connectionId identifier attached to everything sent to `chatRef`
  * @return a flow whose input side feeds `chatRef` and whose output side emits
  *         whatever is sent to the materialized source actor
  */
def chatActorFlow(connectionId: UUID) : Flow[Protocol.Message, Protocol.Event, Any] = {
  val sink = Flow[Protocol.Message]
    .map(msg => Protocol.SignedMessage(connectionId, msg))
    .to(Sink.actorRef(chatRef, Protocol.CloseConnection(connectionId)))

  // `mapMaterializedValue` is an operator on a Source instance, not on the
  // `Source` companion object, so the actor-backed source has to be created
  // first. Fully qualified so the snippet needs no additional import.
  val source = Source
    .actorRef[Protocol.Event](16, akka.stream.OverflowStrategy.fail)
    .mapMaterializedValue {
      actor : ActorRef => {
        chatRef ! Protocol.OpenConnection(actor, connectionId)
      }
    }

  Flow.fromSinkAndSource(sink, source)
}

我想知道:一旦 chatRef 发出 ConnectionClosed 类型的消息,服务器端是否有办法主动关闭该连接?

下面的解决方案通过终止由 Source.actorRef 阶段物化(materialize)出来的 actor,从服务器端断开连接。做法很简单:向该 actor 发送 PoisonPill 即可。

不过,我仍然不清楚您打算如何在连接建立时识别“被禁止”的客户端。如果您想在任意时刻按其他策略踢出某个客户端,同样的逻辑依然适用:向该客户端对应的源 actor(source actor)发送 PoisonPill 即可。

/** Minimal WebSocket echo/chat server demonstrating how the server side can
  * drop a connection: the actor materialized by `Source.actorRef` for a client
  * is sent a `PoisonPill`, which completes that client's outbound stream.
  *
  * Here the policy is a simple capacity limit (`maximumClients`); any other
  * policy can reuse the same mechanism.
  */
object ChatApp extends App {
  implicit val system: ActorSystem = ActorSystem("chat")
  implicit val executor: ExecutionContextExecutor = system.dispatcher
  implicit val materializer: ActorMaterializer = ActorMaterializer()

  // GET on the root path (with or without trailing slash) upgrades to WebSocket.
  val route = get {
    pathEndOrSingleSlash {
      handleWebSocketMessages(websocketFlow)
    }
  }

  /** Maximum number of simultaneously registered clients. */
  val maximumClients = 1

  /** Registry of connected clients, keyed by connection id.
    *
    * State is kept immutably and swapped with `context.become`.
    */
  class ChatRef extends Actor {
    override def receive: Receive = withClients(Map.empty[UUID, ActorRef])

    /** Behavior for the given set of registered clients.
      *
      * @param clients connection id -> source actor of that connection
      */
    def withClients(clients: Map[UUID, ActorRef]): Receive = {
      // Send the message back to the connection it came from, if registered.
      case Protocol.SignedMessage(uuid, msg) =>
        clients.get(uuid).foreach(_ ! msg)

      // At capacity: kill the new client's source actor, which ends its stream.
      case Protocol.OpenConnection(ar, _) if clients.size >= maximumClients =>
        ar ! PoisonPill

      case Protocol.OpenConnection(ar, uuid) =>
        context.become(withClients(clients.updated(uuid, ar)))

      case Protocol.CloseConnection(uuid) =>
        context.become(withClients(clients - uuid))
    }
  }

  /** Messages exchanged between the connection flows and [[ChatRef]]. */
  object Protocol {
    case class SignedMessage(uuid: UUID, msg: String)
    case class OpenConnection(actor: ActorRef, uuid: UUID)
    case class CloseConnection(uuid: UUID)
  }

  val chatRef = system.actorOf(Props[ChatRef])

  /** Per-connection WebSocket flow: text in, text out.
    *
    * Strict text frames are used as-is, streamed text frames are concatenated
    * into a single string, and a binary frame fails the stream.
    */
  def websocketFlow: Flow[Message, Message, Any] =
    Flow[Message]
      .mapAsync(1) {
        case TextMessage.Strict(s) => Future.successful(s)
        case TextMessage.Streamed(s) => s.runFold("")(_ + _)
        case b: BinaryMessage => throw new Exception("Binary message cannot be handled")
      }.via(chatActorFlow(UUID.randomUUID()))
      .map(TextMessage(_))

  /** Bridges one connection to `chatRef`.
    *
    * Inbound strings are wrapped in `Protocol.SignedMessage` and sent to
    * `chatRef`; `Protocol.CloseConnection` is sent when the inbound side
    * completes. The outbound side is an actor-backed source whose `ActorRef`
    * is registered with `chatRef` through `Protocol.OpenConnection`.
    *
    * @param connectionId identifier of this connection
    */
  def chatActorFlow(connectionId: UUID) : Flow[String, String, Any] = {
    val sink = Flow[String]
      .map(msg => Protocol.SignedMessage(connectionId, msg))
      .to(Sink.actorRef(chatRef, Protocol.CloseConnection(connectionId)))
    val source = Source.actorRef[String](16, OverflowStrategy.fail)
      .mapMaterializedValue {
        actor : ActorRef => {
          chatRef ! Protocol.OpenConnection(actor, connectionId)
        }
      }
    Flow.fromSinkAndSource(sink, source)
  }

  // Report the outcome of binding; a failed bind (e.g. port already in use)
  // would otherwise be swallowed and leave an idle actor system running.
  Http().bindAndHandle(route, "0.0.0.0", 8080).onComplete {
    case scala.util.Success(_) => println(s"Started server...")
    case scala.util.Failure(cause) =>
      println(s"Failed to start server: ${cause.getMessage}")
      system.terminate()
  }
}

最新更新