我正在尝试将大型Scala + Akka + PlayMini应用程序与外部REST API接口。这个想法是定期轮询(基本上每 1 到 10 分钟(一个根 URL,然后爬取子级 URL 以提取数据,然后将其发送到消息队列。
我想出了两种方法:
第一种方式
创建参与者层次结构以匹配 API 的资源路径结构。在谷歌纬度的情况下,这意味着,例如
- 演员"纬度/v1/当前位置"轮询 https://www.googleapis.com/latitude/v1/currentLocation
- 演员"纬度/v1/位置"轮询 https://www.googleapis.com/latitude/v1/location
- 演员"纬度/v1/位置/1"轮询 https://www.googleapis.com/latitude/v1/location/1
- 演员"纬度/v1/位置/2"轮询 https://www.googleapis.com/latitude/v1/location/2
- 演员"纬度/v1/位置/3"投票 https://www.googleapis.com/latitude/v1/location/3
- 等。
在这种情况下,每个参与者负责定期轮询其关联的资源,以及为下一级路径资源创建/删除子参与者(即参与者"latitude/v1/location"为它通过轮询 https://www.googleapis.com/latitude/v1/location 的所有位置创建参与者 1、2、3 等(。
第二种方式
创建一个相同的轮询参与者池,这些轮询参与者接收由路由器负载平衡的轮询请求(包含资源路径(,轮询 URL 一次,执行一些处理,并计划轮询请求(针对下一级资源和轮询的 URL(。在Google Latitude中,这意味着例如:
1 个路由器,n 个轮询器演员。对 https://www.googleapis.com/latitude/v1/location 的初始轮询请求会导致对 https://www.googleapis.com/latitude/v1/location/1、https://www.googleapis.com/latitude/v1/location/2 等的几个新的(立即(轮询请求,以及对同一资源(即 https://www.googleapis.com/latitude/v1/location(的一个(延迟(轮询请求。
我已经实现了这两种解决方案,并且无法立即观察到任何相关的性能差异,至少对于我感兴趣的 API 和轮询频率而言。我发现第一种方法比第二种方法(我需要scheduleOnce(...((更容易推理,并且可能更容易与system.scheduler.schedule(...(一起使用。此外,假设资源嵌套在多个级别并且生存期有点短(例如,每次轮询之间可能会添加/删除多个资源(,akka 的生命周期管理使得在第一种情况下很容易杀死整个分支。第二种方法应该(理论上(更快,代码更容易编写。
我的问题是:
- 哪种方法似乎是最好的(在性能、可扩展性、代码复杂性等方面(?
- 您认为这两种方法的设计(尤其是第一种方法(有什么问题吗?
- 有没有人尝试过实施类似的东西?它是怎么做到的?
谢谢!
为什么不创建一个主轮询器,然后按计划启动异步资源请求?
我不是使用 Akka 的专家,但我试了一下:
循环访问要提取的资源列表的轮询器对象:
import akka.util.duration._
import akka.actor._
import play.api.Play.current
import play.api.libs.concurrent.Akka
object Poller {
val poller = Akka.system.actorOf(Props(new Actor {
def receive = {
case x: String => Akka.system.actorOf(Props[ActingSpider], name=x.filter(_.isLetterOrDigit)) ! x
}
}))
def start(l: List[String]): List[Cancellable] =
l.map(Akka.system.scheduler.schedule(3 seconds, 3 seconds, poller, _))
def stop(c: Cancellable) {c.cancel()}
}
异步读取资源并触发更多异步读取的执行组件。 您可以将消息调度放在一个时间表上,而不是立即致电,如果它更友好:
import akka.actor.{Props, Actor}
import java.io.File
class ActingSpider extends Actor {
import context._
def receive = {
case name: String => {
println("reading " + name)
new File(name) match {
case f if f.exists() => spider(f)
case _ => println("File not found")
}
context.stop(self)
}
}
def spider(file: File) {
io.Source.fromFile(file).getLines().foreach(l => {
val k = actorOf(Props[ActingSpider], name=l.filter(_.isLetterOrDigit))
k ! l
})
}
}