我是AKKA Streams的新手。(使用 Akka v 2.4.4)我正在尝试创建一个可以将新通知推送到订阅客户端的 Websocket。我的策略是实现一个 ActorPublisher,稍后我可以向其发送消息,然后将其推送到客户端。
首先,我复制了一个ActorPublisher的示例:
case class Tick()
class TickActor extends ActorPublisher[Int] {
import scala.concurrent.duration._
implicit val ec = context.dispatcher
val tick = context.system.scheduler.schedule(1 second, 1 second, self, `Tick())`
var cnt = 0
var buffer = Vector.empty[Int]
override def receive: Receive = {
case Tick() => {
cnt = cnt + 1
if (buffer.isEmpty && totalDemand > 0) {
onNext(cnt)
}
else {
buffer :+= cnt
if (totalDemand > 0) {
val (use,keep) = buffer.splitAt(totalDemand.toInt)
buffer = keep
use foreach onNext
}
}
}
}
override def postStop() = tick.cancel()
}
我的问题是我不知道如何使用它作为来源。
我尝试了以下方法:
val source: Source[Strict, ActorRef] = Source.actorPublisher(Props[TickActor]).map(i => TextMessage(i.toString))
optionalHeaderValueByType[akka.http.scaladsl.model.ws.UpgradeToWebSocket]() {
case Some(upgrade) =>
complete(
upgrade.handleMessagesWithSinkSource(Sink.ignore,source))
case None =>
reject(akka.http.scaladsl.server.ExpectedWebSocketRequestRejection)
}
但是当我与客户端连接时,我得到以下 ClassCastException:java.lang.ClassCastException:java.lang.Integer 不能强制转换为 scala.runtime.Nothing$
如果我将源更改为:
val src: Source[Strict, NotUsed] = Source.fromIterator(() => Iterator.continually(ThreadLocalRandom.current.nextInt()))
.filter(i => i > 0 && i % 2 == 0).map(i => TextMessage(i.toString))
它运行得很好。
我在连接点时有点挣扎,所以希望你能引导我朝着正确的方向前进。
我尝试了您的示例并能够重现该问题。我只做了一个更改来解决问题。这是添加的类型参数,现在有意义,因为在 akka 流中的某个地方,有一个类似 elem.asInstanceOf[T]
.因此,当 actorPublisher 中缺少该类型时,该类型被推断为Nothing
val source = Source.actorPublisher[Int](Props[TickActor]).map(i => TextMessage(i.toString))