akka 计时器给出消息 取消计时器 [消息] 与生成 [583]



我正在使用 akka 计时器

我正在使用推特流,我正在尝试在 5 秒内获取推文数量 这是我的代码

case class PerSecond(tweet:Tweet)
case class TweetPerSecondCount(tweet:Tweet)
class TweetPerSecondActor extends Actor with Timers{
var counter=0
def receive: PartialFunction[Any, Unit] = {
case PerSecond(tweet) =>
log.info("Actor TweetPerSecondActor recevied the message PerSecond")
timers.startPeriodicTimer("perSecond", TweetPerSecondCount(tweet), 5.second)
case TweetPerSecondCount(tweet) =>
log.info("Actor TweetPerSecondActor recevied the message TweetPerSecondCount")
log.info("got the tweet {}",getCounter+"in 5 seconds")
case message =>
log.warn("Actor TweetPerSecondActor: Unhandled message received : {}", message)
unhandled(message)
}
}

在 Play 框架的控制器操作中,我正在从推特流中获取推文对象(连续流而不弯腰(

class Mycontroller extends Controller {
val tweetPerSecondActor = system.actorOf......//create actor

def tweetAveragePerSecond = Action {
log.debug("in the action tweetAveragePerSecond")
def getTweet: PartialFunction[StreamingMessage, Unit] = {
case tweet: Tweet =>
val future = ask(tweetPerSecondActor, PerSecond(tweet))
}
val streaming: Future[TwitterStream] = handleTwitterStreamClient.getStreamingCLient.sampleStatuses(stall_warnings = true)(getTweet)
Ok("tweet average per second")
}
}

当我到达路线时,日志显示

TweetPerSecondActor INFO - Actor TweetPerSecondActor recevied the message PerSecond
16:42:44.335 28939 [ArteciateActorSystem-akka.actor.default-dispatcher-5] TimerScheduler DEBUG - Cancel timer [perSecond] with generation [758]
16:42:44.335 28939 [ArteciateActorSystem-akka.actor.default-dispatcher-5] TimerScheduler DEBUG - Start timer [perSecond] with generation [759]
6:42:44.335 28939 [TwitterActorSystem-akka.actor.default-dispatcher-6] TweetPerSecondActor INFO - Actor TweetPerSecondActor recevied the message PerSecond
16:42:44.335 28939 [TwitterActorSystem-akka.actor.default-dispatcher-5] TimerScheduler DEBUG - Cancel timer [perSecond] with generation [758]
16:42:44.335 28939 [TwitterActorSystem-akka.actor.default-dispatcher-5] TimerScheduler DEBUG - Start timer [perSecond] with generation [759]

如果我传递虚拟字符串值而不是运行流并传递 Twitter 对象,计时器工作正常 像下面一样 案例类 Persecond(推文:字符串( 案例类 TweetPerSecondCount(tweet:String(

class TweetPerSecondActor extends Actor with Timers{
var counter=0
def receive: PartialFunction[Any, Unit] = {
case PerSecond(tweet) =>
log.info("Actor TweetPerSecondActor recevied the message PerSecond")
timers.startPeriodicTimer("perSecond", TweetPerSecondCount(tweet), 5.second)
case TweetPerSecondCount(tweet) =>
log.info("Actor TweetPerSecondActor recevied the message TweetPerSecondCount")
log.info("got the tweet {}",getCounter+"in 5 seconds")
case message =>
log.warn("Actor TweetPerSecondActor: Unhandled message received : {}", message)
unhandled(message)
}
}
class Mycontroller extends Controller {
val tweetPerSecondActor = system.actorOf......//create actor

def tweetAveragePerSecond = Action {
log.debug("in the action tweetAveragePerSecond")
val future = ask(tweetPerSecondActor, PerSecond("dummy value"))

Ok("tweet average per second")
}
}

Akka 中的计时器是键控的(在您的情况下,键是"perSecond"(,API 保证

每个计时器都有一个键,

如果启动具有相同键的新计时器,则取消前一个

计时器

因此,每次调用getTweet函数(鉴于它是为了效果而不是为了值而调用,我更喜欢"过程",但这可能是特殊的(时,计时器都会被取消。

根据您要完成的任务,解决方案可能包括:

  • 每个请求的参与者。 您需要让请求返回一些内容,这些内容可以传递给未来的请求,以便从参与者那里获取信息。 参与者生命周期管理也可能是可取的。
  • 为整个控制器保留一个执行组件,但每个请求使用唯一的计时器键。 如果这些是周期性计时器,则必须跟踪哪些计时器键处于活动状态,并取消不再需要的计时器键(保持任意多个定期计时器运行最终可能会降低性能(

最新更新