我正在使用Concurrent.unicast[JsValue]
在Play Framework WebSockets中推送消息,我想优化向多个用户发送相同的消息。是否有可能使用多个Concurrent.Channel
广播消息?
简短回答
为每个用户维护单独的通道,并与用户关联组
长回答package controllers
import akka.actor.Actor
import play.api.libs.iteratee.Enumerator
import play.api.libs.iteratee.Concurrent.Channel
import play.api.libs.iteratee.Concurrent
import play.api.Logger
import play.api.libs.iteratee.Iteratee
import play.api.libs.concurrent.Execution.Implicits.defaultContext
object AdvancedRoomMessages {
case class Join(name: String)
case class BroadcastGroup(msg: String, gName: String)
case class BroadcastAll(msg: String)
case class AddGroup(gName: String)
case class RemoveGroup(gName: String)
case class AddUserToGroup(userName: String, gName: String)
case class removeUserFromGroup(userName: String, gName: String)
}
class AdvancedRoom extends Actor {
import scala.collection.mutable._
/**
* common channel for communication
*/
val (enumerator, channel) = Concurrent.broadcast[String]
/**
* every user has his own channel
*/
val users = Map[String, (Enumerator[String],Channel[String])]()
/**
* users can be grouped
*/
val groups = Map[String, Option[Set[String]]]()
import AdvancedRoomMessages._
def receive = {
case Join(name) => {
/**
* join request from the user
*/
if(users contains name) {
/**
* existing user
*/
val iteratee = Iteratee.ignore[String]
sender ! ((iteratee, users(name)._1))
}else {
/**
* join request from a new user
*/
/**
* create new broadcast channel
*/
val (enumerator, channel) = Concurrent.broadcast[String]
users += ((name, (enumerator, channel)))
val iteratee = Iteratee.foreach[String](msg => {
//do something with the message
}).map{ _ => {
/**
* user closed his websocket client, so remove the user
* warning ... also remove the corresponding user name in groups
*/
users(name)._2.eofAndEnd()
users -= name
}}
sender ! (iteratee, enumerator)
}
}
case BroadcastGroup(msg, gName) => {
groups(gName) match {
case Some(gMates) => {
gMates.foreach { person => users(person)._2.push(msg)}
}
case None => Logger.info("empty group") //ignore sending message
}
}
case BroadcastAll(msg) => {
channel push msg
}
case AddGroup(gName: String) => {
groups += ((gName, None))
}
case RemoveGroup(gName: String) => {
groups -= gName
}
case AddUserToGroup(userName, gName) => {
groups(gName) match {
case Some(gMates) => gMates += userName
case None => Set(userName)
}
}
}
}
def filter(group_id: String) = Enumeratee.filter[JsValue]{ json: JsValue =>
group_id == (json"group_id").as[String]
}
此过滤器必须应用为
def chat(group_id: String) = WebSocket.using[JsValue] { request =>
val in = Iteratee.foreach[JsValue]{ msg=>
public_channel.push(msg)
}
(in, public_enumerator &> filter(group_id))
}
从我的实验,Concurrent.broadcast
不发送给每个人(一些不幸的命名也许?)以下是我所使用的,效果如预期。
package controllers
import play.api._
import play.api.mvc._
import play.api.libs.iteratee.Concurrent
import play.api.libs.iteratee.Iteratee
import play.api.libs.concurrent.Execution.Implicits.defaultContext
import scala.collection.mutable.{Set => MS}
import scala.concurrent._
object Application extends Controller {
val c:MS[(Int, Concurrent.Channel[String])] = MS() // (channelID, Channel))
def pushHello = c.foreach(_._2.push("hello")) // push to ALL channels
def index = WebSocket.async[String] { _ => future{
val (out,channel) = Concurrent.broadcast[String]
val channelID = scala.util.Random.nextInt
c.add((channelID, channel))
val in = Iteratee.foreach[String] {
_ match {
case any => channel.push("received:"+any) // push to current channel
}
}.map { _ => c.retain(x => x._1 != channelID) }
(in, out)
}
}
}
简短回答
val (enumerator, channel) = Concurrent.broadcast[String]
use above thing globally
长回答package controllers
import play.api._
import play.api.mvc._
import play.libs.Akka
import play.api.libs.concurrent.Execution.Implicits.defaultContext
import play.api.libs.iteratee.Iteratee
import play.api.libs.iteratee.Enumerator
import akka.actor.Props
import akka.pattern.ask
import akka.util.Timeout
import Room._
import scala.concurrent.duration._
object Application extends Controller {
def index = Action {
Ok(views.html.index("Your new application is ready."))
}
/**
* get actor ref
*/
val room = Akka.system.actorOf(Props[Room])
/**
* websocket action
*/
def chat(name: String) = WebSocket.async[String](implicit request => {
implicit val timeout = Timeout(1 seconds)
(room ? Join(name)).mapTo[(Iteratee[String, _], Enumerator[String])]
})
}
//Here is the actor
package controllers
import akka.actor.Actor
import play.api.libs.iteratee.Concurrent
import play.api.libs.iteratee.Iteratee
import play.api.libs.concurrent.Execution.Implicits.defaultContext
object Room {
case class Join(name: String)
case class Broadcast(msg: String)
case object Quit
}
class Room extends Actor {
/**
* here is the meat
* Creating channel globally is important here
* This can be accessed across all cases in receive method
* pushing the message into this channel and returning this enumerator to all ,
* broadcasts the message
*/
val (enumerator, channel) = Concurrent.broadcast[String]
/**
* keep track of users
*/
val users = scala.collection.mutable.Set[String]()
import Room._
def receive = {
case Join(name) => {
/**
* add new users
*/
if(!users.contains(name)) {
users += name
val iteratee = Iteratee.foreach[String]{
msg => {
/**
* process messages from users
* here we are broadcast it to all other users
*/
self ! Broadcast(msg)
}
}.map( _ => {
/**
* user closed his websocket.
* remove him from users
*/
users -= name
})
/**
* send iteratee, enumerator pair to the sender of join message
*/
sender ! (iteratee, enumerator)
} else {
/**
* already added users
*/
val iteratee = Iteratee.ignore[String]
/**
* send iteratee and enumerator pair
*/
sender ! (iteratee, enumerator)
}
}
case Broadcast(msg) => channel push(msg)
/**
* close the common channel only when actor is stopped
*/
case Quit => channel eofAndEnd(); context.stop(self)
}
}