在Play Framework WebSockets中广播消息



我正在使用 Concurrent.unicast[JsValue] 在 Play Framework WebSockets 中推送消息,并希望优化向多个用户发送同一条消息的方式。是否可以使用多个 Concurrent.Channel 来广播消息?

简短回答

为每个用户维护一个单独的通道,并将用户与组相关联。

长回答

package controllers
import akka.actor.Actor
import play.api.libs.iteratee.Enumerator
import play.api.libs.iteratee.Concurrent.Channel
import play.api.libs.iteratee.Concurrent
import play.api.Logger
import play.api.libs.iteratee.Iteratee
import play.api.libs.concurrent.Execution.Implicits.defaultContext
/**
 * Message protocol for the `AdvancedRoom` actor.
 */
object AdvancedRoomMessages {
  /** Join request from the user called `name`. */
  case class Join(name: String)
  /** Send `msg` to the members of the group called `gName`. */
  case class BroadcastGroup(msg: String, gName: String)
  /** Send `msg` room-wide. */
  case class BroadcastAll(msg: String)
  /** Register a group called `gName`. */
  case class AddGroup(gName: String)
  /** Drop the group called `gName`. */
  case class RemoveGroup(gName: String)
  /** Put the user `userName` into the group `gName`. */
  case class AddUserToGroup(userName: String, gName: String)
  // NOTE(review): the lower-case class name breaks the UpperCamelCase convention
  // used by the other messages; renaming it would change the public protocol.
  /** Take the user `userName` out of the group `gName`. */
  case class removeUserFromGroup(userName: String, gName: String)
}

/**
 * Chat-room actor that gives every user a dedicated broadcast channel, so a
 * message can be pushed to a single named group or to every connected user.
 *
 * All mutable state (`users`, `groups`) is read and written only inside
 * `receive`; asynchronous callbacks talk to the actor through messages.
 * Lookups never throw on unknown users or groups, because an exception in
 * `receive` fails the actor and, under the default supervision strategy,
 * restarts it with empty state.
 */
class AdvancedRoom extends Actor {
  import scala.collection.mutable
  import AdvancedRoomMessages._

  /** Internal notification sent to `self` when a user's websocket input ends. */
  private case class UserLeft(name: String)

  /**
   * Room-wide channel/enumerator pair. `Join` never hands this enumerator to
   * clients, so it is only useful to code that subscribes to it directly.
   */
  val (enumerator, channel) = Concurrent.broadcast[String]

  /** Every user has his own (enumerator, channel) pair, keyed by user name. */
  val users = mutable.Map[String, (Enumerator[String], Channel[String])]()

  /** Group name -> members; `None` marks a group that has no members yet. */
  val groups = mutable.Map[String, Option[mutable.Set[String]]]()

  def receive = {
    case Join(name) =>
      users.get(name) match {
        case Some((existingEnumerator, _)) =>
          // Known user: reuse his enumerator and ignore the new socket's input.
          sender ! ((Iteratee.ignore[String], existingEnumerator))

        case None =>
          // New user: create a dedicated broadcast pair and remember it.
          val (userEnumerator, userChannel) = Concurrent.broadcast[String]
          users(name) = ((userEnumerator, userChannel))
          val room = self
          val iteratee = Iteratee.foreach[String] { _ =>
            // do something with the message
          }.map { _ =>
            // This callback runs outside the actor, so it must not touch
            // `users` or `groups` itself; the actor does the cleanup instead.
            room ! UserLeft(name)
          }
          sender ! ((iteratee, userEnumerator))
      }

    case UserLeft(name) =>
      // The user closed his websocket: close his channel and forget him,
      // including his membership in every group.
      users.remove(name).foreach { case (_, userChannel) => userChannel.eofAndEnd() }
      groups.values.foreach { members => members.foreach(_ -= name) }

    case BroadcastGroup(msg, gName) =>
      // An unknown group is treated like an empty one instead of throwing.
      groups.getOrElse(gName, None) match {
        case Some(gMates) =>
          gMates.foreach { person =>
            // Skip members that are not connected.
            users.get(person).foreach { case (_, userChannel) => userChannel.push(msg) }
          }
        case None => Logger.info("empty group") // ignore sending message
      }

    case BroadcastAll(msg) =>
      channel.push(msg)
      // Clients only listen on their own enumerator, so the message has to be
      // pushed into every per-user channel to actually reach them.
      users.values.foreach { case (_, userChannel) => userChannel.push(msg) }

    case AddGroup(gName) =>
      // Keep the members if the group already exists.
      if (!groups.contains(gName)) groups(gName) = None

    case RemoveGroup(gName) =>
      groups -= gName

    case AddUserToGroup(userName, gName) =>
      groups.get(gName) match {
        case Some(Some(gMates)) => gMates += userName
        // First member: store the new set rather than discarding it.
        case Some(None) => groups(gName) = Some(mutable.Set(userName))
        case None => Logger.info("unknown group") // ignore the request
      }

    case removeUserFromGroup(userName, gName) =>
      groups.getOrElse(gName, None).foreach(_ -= userName)
  }
}
/**
 * Builds an Enumeratee that lets through only the JSON messages whose
 * "group_id" field equals `group_id`.
 *
 * Messages without a string "group_id" field are dropped instead of failing
 * the stream.
 *
 * @param group_id the group whose messages should pass
 */
def filter(group_id: String) = Enumeratee.filter[JsValue] { json: JsValue =>
  // `\` looks the field up; asOpt gives None where `as` would throw.
  (json \ "group_id").asOpt[String] == Some(group_id)
}

此过滤器必须按如下方式应用:

/**
 * WebSocket endpoint for one group: every inbound JSON frame is pushed into
 * `public_channel`, and the client receives the `public_enumerator` stream
 * passed through `filter(group_id)`.
 */
def chat(group_id: String) = WebSocket.using[JsValue] { request =>
  // Inbound side: forward each received frame to the shared channel.
  val inbound = Iteratee.foreach[JsValue] { incoming =>
    public_channel.push(incoming)
  }
  // Outbound side: the shared stream, narrowed by the group filter.
  val outbound = public_enumerator &> filter(group_id)
  (inbound, outbound)
}

根据我的实验,Concurrent.broadcast 并不会把消息发送给所有人(也许是命名不太恰当?)。以下是我使用的代码,效果符合预期。

package controllers
import play.api._
import play.api.mvc._
import play.api.libs.iteratee.Concurrent
import play.api.libs.iteratee.Iteratee
import play.api.libs.concurrent.Execution.Implicits.defaultContext
import scala.collection.mutable.{Set => MS}
import scala.concurrent._ 
/**
 * Controller that keeps one broadcast channel per websocket connection and
 * can push a message to all of them.
 *
 * The registry `c` is shared by every request thread and by the iteratee
 * callbacks, so each access goes through `c.synchronized`.
 */
object Application extends Controller {
  /** Live connections as (channelID, channel) pairs; guard with `c.synchronized`. */
  val c: MS[(Int, Concurrent.Channel[String])] = MS() // (channelID, Channel))

  /** Sequential IDs never collide, unlike random ones. */
  private val nextChannelID = new java.util.concurrent.atomic.AtomicInteger(0)

  /** Pushes "hello" to ALL channels currently registered. */
  def pushHello = {
    // Copy under the lock, push outside it, so the lock is held only briefly.
    val snapshot = c.synchronized(c.toList)
    snapshot.foreach(_._2.push("hello"))
  }

  /**
   * WebSocket endpoint: registers a new channel for the connection, answers
   * each received message on that channel with "received:" + message, and
   * removes the channel from the registry when the client's input ends.
   */
  def index = WebSocket.async[String] { _ =>
    Future {
      val (out, channel) = Concurrent.broadcast[String]
      val channelID = nextChannelID.getAndIncrement()
      c.synchronized { c.add((channelID, channel)) }
      val in = Iteratee.foreach[String] { any =>
        channel.push("received:" + any) // push to current channel
      }.map { _ =>
        c.synchronized { c.retain(x => x._1 != channelID) }
      }
      (in, out)
    }
  }
}

简短回答


// One shared pair: messages pushed into `channel` come out of `enumerator`.
val (enumerator, channel) = Concurrent.broadcast[String]
全局使用上面这一对 (enumerator, channel):让所有客户端共享同一个 enumerator,推送到 channel 的消息就会广播给所有人。
长回答


package controllers
import play.api._
import play.api.mvc._
import play.libs.Akka
import play.api.libs.concurrent.Execution.Implicits.defaultContext
import play.api.libs.iteratee.Iteratee
import play.api.libs.iteratee.Enumerator
import akka.actor.Props
import akka.pattern.ask
import akka.util.Timeout
import Room._
import scala.concurrent.duration._
/**
 * Controller exposing the index page and the chat websocket, which is backed
 * by a single `Room` actor.
 */
object Application extends Controller {

  /** Renders the index view. */
  def index = Action {
    Ok(views.html.index("Your new application is ready."))
  }

  /** Reference to the room actor that all websocket connections talk to. */
  val room = Akka.system.actorOf(Props[Room])

  /**
   * Websocket action: asks the room to `Join(name)` and uses the
   * (iteratee, enumerator) pair from the reply as the socket's two ends.
   */
  def chat(name: String) = WebSocket.async[String] { implicit request =>
    // The ask below fails if the room does not answer within this timeout.
    implicit val timeout = Timeout(1.second)
    val reply = room ? Join(name)
    reply.mapTo[(Iteratee[String, _], Enumerator[String])]
  }
}

//Here is the actor
package controllers
import akka.actor.Actor
import play.api.libs.iteratee.Concurrent
import play.api.libs.iteratee.Iteratee
import play.api.libs.concurrent.Execution.Implicits.defaultContext
/**
 * Message protocol for the `Room` actor.
 */
object Room {
  /** Join request from the user called `name`. */
  case class Join(name: String)
  /** Push `msg` into the room's common channel. */
  case class Broadcast(msg: String)
  /** Close the common channel and stop the room actor. */
  case object Quit
}
/**
 * Chat-room actor in which all users share one broadcast channel: every
 * joiner receives the same enumerator, so a message pushed into the channel
 * reaches all of them.
 *
 * The `users` set is read and written only inside `receive`; the websocket
 * callbacks run outside the actor and therefore talk to it through messages.
 */
class Room extends Actor {
  import Room._

  /** Internal notification sent to `self` when a user's websocket input ends. */
  private case class UserLeft(name: String)

  /**
   * The common channel/enumerator pair. It is created once per actor so that
   * every case in `receive` works with the same pair.
   */
  val (enumerator, channel) = Concurrent.broadcast[String]

  /** Names of the users currently in the room. */
  val users = scala.collection.mutable.Set[String]()

  def receive = {
    case Join(name) =>
      if (users.contains(name)) {
        // Already joined: hand out the shared stream, ignore this socket's input.
        sender ! ((Iteratee.ignore[String], enumerator))
      } else {
        users += name
        val room = self
        val iteratee = Iteratee.foreach[String] { msg =>
          // Relay everything the user sends to the whole room.
          room ! Broadcast(msg)
        }.map { _ =>
          // The user closed his websocket. This callback runs outside the
          // actor, so the removal is delegated to `receive`.
          room ! UserLeft(name)
        }
        sender ! ((iteratee, enumerator))
      }

    case UserLeft(name) =>
      users -= name

    case Broadcast(msg) =>
      channel.push(msg)

    case Quit =>
      // Close the common channel only when the actor is stopped.
      channel.eofAndEnd()
      context.stop(self)
  }
}

相关内容

  • 没有找到相关文章

最新更新