使用aleph处理websocket客户端消息



在学习Clojure的过程中,我目前面临着设置websocket社区的问题。经过许多不同的方法,我最终使用了aleph。

我设法实现的目标:

  • 处理新客户端连接
  • 处理客户端断开连接
  • 从服务器随意与客户端对话

我缺少的是每当连接的客户端之一通过websocket发送东西时触发处理程序函数的方法。

到目前为止我的代码:

(ns wonders7.core.handler
  (:require [compojure.core :refer :all]
            [compojure.route :as route]
            [ring.middleware.defaults :refer [wrap-defaults site-defaults]]
            [aleph.http :as http]
            [manifold.stream :as stream]
            [clojure.tools.logging :refer [info]]))
(defn uuid [] (str (java.util.UUID/randomUUID)))
(def clients (atom {}))
(defn ws-create-handler [req]
  (let [ws @(http/websocket-connection req)]
    (info "ws-create-handler")
    (stream/on-closed ws #(swap! clients dissoc ws))
    (swap! clients assoc ws (uuid))))
(defroutes app-routes
  (GET "/ws" [] ws-create-handler)
  (route/not-found "Not Found"))
(def app
  (wrap-defaults app-routes site-defaults))
(defn msg-to-client [[client-stream uuid]]
  (stream/put! client-stream "The server side says hello!"))
(defn msg-broadcast []
  (map #(msg-to-client %) @clients))
;(stream/take! (first (first @clients)))
;(http/start-server app {:port 8080})

我用注释掉的http/start server aleph调用启动Netty服务器。我还设法通过手动流/获取从客户端获取消息!电话(也被评论掉了)。我需要弄清楚的是,当有东西进来时,如何自动触发这种获取

提前感谢您的帮助!

您要查找的函数是(manifold.stream/consume callback stream),它将调用流中每条消息的回调。

在这个例子中,作者使用aleph中的recieve-allsiphon来完成一个非常相似的任务,我将大致解释为:

(let [chat (named-channel room (receive-all ch #(println "message: " %)))]
  (siphon chat ch)