如何通过 Akka HTTP WebSocket 发送活动 WebSocket 连接的实时计数?(仅适用于 Akka Streams)



如何通过 Akka WebSocket 发送活动 WebSocket 连接的实时计数?似乎 Akka HTTP WebSocket 改变了 Akka Streams 中 prepend 阶段的运行方式。

在下面的第一个代码块中,只有当客户端是唯一的连接时,计数才会被立即发送;而在计数已超过 1 时才连接的客户端,要等到下一个客户端连接之后才会收到更新。

第二个代码块删除了 WebSocket 代码,以便仅使用 Akka Streams,并且每个流订阅都会立即收到一个计数,如 stdout 所示。

使用 Akka HTTP WebSocket

运行此代码,并用多个浏览器窗口访问 http://localhost:8080,即可在各窗口的浏览器控制台中观察到上述现象。

import akka.actor.ActorSystem
import akka.http.scaladsl.Http
import akka.http.scaladsl.model.ws.TextMessage
import akka.http.scaladsl.model.{ContentTypes, HttpEntity}
import akka.http.scaladsl.server.Directives._
import akka.http.scaladsl.server.Route
import akka.stream.scaladsl.{BroadcastHub, Flow, Keep, Sink, Source}
import akka.stream.{ActorMaterializer, OverflowStrategy}
import scala.concurrent.ExecutionContextExecutor
/**
 * Akka HTTP demo that pushes the running number of open WebSocket connections to
 * every connected browser.
 *
 * Each materialization of [[clientFlow]] offers `+1` to a shared queue and offers `-1`
 * once its stream terminates; the running total of those deltas is fanned out to all
 * subscribers through a `BroadcastHub`.
 */
object CounterFlow {
  private implicit val actorSystem: ActorSystem = ActorSystem("CounterFlowTest")
  private implicit val materializer: ActorMaterializer = ActorMaterializer()
  private implicit val executionContext: ExecutionContextExecutor = actorSystem.dispatcher

  /**
   * `counterSourceQueue` accepts connection deltas (`+1` / `-1`); `counterSource` is the
   * hub-backed source that emits the running total to every subscriber.
   *
   * NOTE(review): with `bufferSize = 1` this setup was reported to deliver the count
   * immediately only to the first client; later clients see it when the next client
   * connects. The value is kept here because it is the configuration under discussion.
   */
  val (counterSourceQueue, counterSource) =
    Source.queue[Int](0, OverflowStrategy.backpressure)
      .conflate(_ + _) // add up deltas that pile up while downstream is busy
      .scan(0)(_ + _)  // running total, starting from 0
      .toMat(BroadcastHub.sink(bufferSize = 1))(Keep.both)
      .run()

  /**
   * Per-connection outbound stream of the current count as text messages.
   *
   * The prepended source emits no elements; its iterator factory is used only for its
   * side effect of registering the new connection (`+1`). The decrement is attached with
   * `onComplete` rather than `foreach` so that it also runs when the stream terminates
   * with a failure, which would otherwise leave the count too high.
   */
  val clientFlow: Source[TextMessage.Strict, Unit] =
    counterSource
      .map(_.toString)
      .map(TextMessage.Strict)
      .prepend(Source.fromIterator(() => {
        counterSourceQueue.offer(1)
        Iterator.empty
      }))
      .watchTermination()((_, done) => done.onComplete(_ => counterSourceQueue.offer(-1)))

  /**
   * Binds an HTTP server on port 8080 serving a small HTML page at `/` and the
   * connection-count WebSocket at `/ws`.
   */
  def main(args: Array[String]): Unit = {
    val route: Route =
      pathEndOrSingleSlash {
        get {
          // The inner double quotes of the JavaScript snippet must be escaped inside the
          // Scala string literal.
          complete(HttpEntity(ContentTypes.`text/html(UTF-8)`,
            "<h1>Check browser console for connection count</h1>" +
              "<script>const ws = new WebSocket(\"ws://localhost:8080/ws\");ws.onmessage = e => console.log(e.data);</script>"))
        }
      } ~
        path("ws") {
          // Incoming messages are ignored; only the count is sent to the client.
          handleWebSocketMessages(
            Flow.fromSinkAndSourceCoupled(
              Sink.ignore,
              clientFlow))
        }
    Http().bindAndHandle(route, "0.0.0.0", 8080)
    () // discard non-Unit value
  }
}

仅使用 Akka Streams

如 stdout 所示,每个客户端都会根据需要立即收到计数。

import akka.actor.ActorSystem
import akka.stream.scaladsl.{BroadcastHub, Keep, Sink, Source}
import akka.stream.{ActorMaterializer, OverflowStrategy}
import scala.concurrent.ExecutionContext.Implicits.global
/**
 * Akka Streams-only variant of the connection counter: subscribers are plain stream
 * materializations that print every count they receive to stdout.
 */
object CounterFlow {
  private implicit val actorSystem: ActorSystem = ActorSystem("CounterFlowTEst")
  private implicit val materializer: ActorMaterializer = ActorMaterializer()

  /**
   * `counterSourceQueue` accepts subscriber deltas (`+1` / `-1`); `counterSource` is the
   * hub-backed source that emits the running total to every subscriber.
   */
  val (counterSourceQueue, counterSource) =
    Source.queue[Int](0, OverflowStrategy.backpressure)
      .conflate(_ + _) // add up deltas that pile up while downstream is busy
      .scan(0)(_ + _)  // running total, starting from 0
      .toMat(BroadcastHub.sink)(Keep.both)
      .run()

  /**
   * Subscribes a named client to the counter and prints each received count, followed by
   * `"<clientName> Done"` when the subscription completes successfully.
   *
   * The prepended source emits no elements; its iterator factory is used only for its
   * side effect of registering the subscriber (`+1`). The decrement is attached with
   * `onComplete` rather than `foreach` so that it also runs when the stream terminates
   * with a failure, which would otherwise leave the count too high.
   *
   * @param clientName label prefixed to every line printed for this subscriber
   */
  def subscribeClient(clientName: String): Unit =
    counterSource
      .prepend(Source.fromIterator(() => {
        counterSourceQueue.offer(1)
        Iterator.empty
      }))
      .watchTermination()((_, done) => done.onComplete(_ => counterSourceQueue.offer(-1)))
      .runWith(Sink.foreach(msg => println(s"$clientName $msg")))
      .foreach(_ => println(s"$clientName Done"))

  /** Subscribes clients A and B right away, then client C after a 500 ms pause. */
  def main(args: Array[String]): Unit = {
    subscribeClient("A")
    subscribeClient("B")
    Thread.sleep(500L) // demo only: lets A and B settle before C joins
    subscribeClient("C")
  }
}

将 BroadcastHub.sink(bufferSize = 1) 改为 BroadcastHub.sink(与 Akka Streams 版本一致)之后,每个客户端都会立即收到计数。

这是我测试的版本:

import akka.actor.ActorSystem
import akka.http.scaladsl.Http
import akka.http.scaladsl.model.ws.TextMessage
import akka.http.scaladsl.model.{ContentTypes, HttpEntity}
import akka.http.scaladsl.server.Directives._
import akka.http.scaladsl.server.Route
import akka.stream.scaladsl.{BroadcastHub, Flow, Keep, Sink, Source}
import akka.stream.{ActorMaterializer, OverflowStrategy}
import scala.concurrent.ExecutionContextExecutor
/**
 * Akka HTTP demo that pushes the running number of open WebSocket connections to
 * every connected browser, using `BroadcastHub.sink` without an explicit buffer size.
 *
 * Each materialization of [[clientFlow]] offers `+1` to a shared queue and offers `-1`
 * once its stream terminates; the running total of those deltas is fanned out to all
 * subscribers through the hub.
 */
object CounterFlow {
  private implicit val actorSystem: ActorSystem = ActorSystem("CounterFlowTest")
  private implicit val materializer: ActorMaterializer = ActorMaterializer()
  private implicit val executionContext: ExecutionContextExecutor = actorSystem.dispatcher

  /**
   * `counterSourceQueue` accepts connection deltas (`+1` / `-1`); `counterSource` is the
   * hub-backed source that emits the running total to every subscriber.
   */
  val (counterSourceQueue, counterSource) =
    Source.queue[Int](0, OverflowStrategy.backpressure)
      .conflate(_ + _) // add up deltas that pile up while downstream is busy
      .scan(0)(_ + _)  // running total, starting from 0
      .toMat(BroadcastHub.sink)(Keep.both)
      .run()

  /**
   * Per-connection outbound stream of the current count as text messages.
   *
   * The prepended source emits no elements; its iterator factory is used only for its
   * side effect of registering the new connection (`+1`). The decrement is attached with
   * `onComplete` rather than `foreach` so that it also runs when the stream terminates
   * with a failure, which would otherwise leave the count too high.
   */
  val clientFlow: Source[TextMessage.Strict, Unit] =
    counterSource
      .map(_.toString)
      .map(TextMessage.Strict)
      .prepend(Source.fromIterator(() => {
        counterSourceQueue.offer(1)
        Iterator.empty
      }))
      .watchTermination()((_, done) => done.onComplete(_ => counterSourceQueue.offer(-1)))

  /**
   * Binds an HTTP server on port 8080 serving a small HTML page at `/` that displays the
   * count, and the connection-count WebSocket at `/ws`.
   */
  def main(args: Array[String]): Unit = {
    val route: Route =
      pathEndOrSingleSlash {
        get {
          // The inner double quotes of the JavaScript snippet must be escaped inside the
          // Scala string literal.
          complete(HttpEntity(ContentTypes.`text/html(UTF-8)`,
            "<p>Connections: <span id='c'>?</span></p>" +
              "<script>const ws = new WebSocket(\"ws://localhost:8080/ws\");ws.onmessage = e => { console.log(e.data); document.getElementById('c').innerText = e.data; };</script>"))
        }
      } ~
        path("ws") {
          // Incoming messages are ignored; only the count is sent to the client.
          handleWebSocketMessages(
            Flow.fromSinkAndSourceCoupled(
              Sink.ignore,
              clientFlow))
        }
    Http().bindAndHandle(route, "0.0.0.0", 8080)
    () // discard non-Unit value
  }
}

PS:很好的例子!

相关内容

  • 没有找到相关文章

最新更新