只从play框架中执行一个webservice请求



我对play框架以及如何在Scala中使用它是个新手。我想为大Json对象建立一个代理。到目前为止,我已经实现了json存储在缓存中,如果不存在,则从Web服务请求。

然而,当两个请求以相同的端点为目标(webservice和路径相同)时,只应执行一个调用,另一个请求应等待第一个调用的结果。目前,它正在对每个请求执行对服务的调用。

这是我的控制器:

@Singleton
class CmsProxyController @Inject()(val cmsService: CmsProxyService) extends Controller {
implicit def ec : ExecutionContext =  play.api.libs.concurrent.Execution.defaultContext
def header(path: String) = Action.async { context =>
cmsService.head(path) map { title =>
Ok(Json.obj("title" -> title))
}
}
def teaser(path: String) = Action.async { context =>
cmsService.teaser(path) map { res =>
Ok(res).as(ContentTypes.JSON)
}
}
}

这就是服务:

trait CmsProxyService {
def head(path: String): Future[String]
def teaser(path: String): Future[String]
}
@Singleton
class DefaultCmsProxyService @Inject()(cache: CacheApi, cmsCaller:  CmsCaller) extends CmsProxyService {
private val BASE = "http://foo.com"
private val CMS = "bar/rest/"
private val log = Logger("application")
override def head(path: String) = { 
query(url(path), "$.payload[0].title")
}
override def teaser(path: String) = {
query(url(path), "$.payload[0].content.teaserText")
}
private def url(path: String) = s"${BASE}/${CMS}/${path}"
private def query(url: String, jsonPath: String): Future[String] = {
val key = s"${url}?${jsonPath}"
val payload = findInCache(key)
if (payload.isDefined) {
log.debug("found payload in cache")
Future.successful(payload.get)
} else {
val queried = parse(fetch(url)) map { json =>
JSONPath.query(jsonPath, json).as[String]
}
queried.onComplete(value => saveInCache(key, value.get))
queried
}
}
private def parse(fetched: Future[String]): Future[JsValue] = {
fetched map { jsonString =>
Json.parse(jsonString)
}
}
//retrieve the requested value from the cache or from ws
private def fetch(url: String): Future[String] = {
val body = findInCache(url)
if (body.isDefined) {
log.debug("found body in cache")
Future.successful(body.get)
} else {
cmsCaller.call(url)
}
}
private def findInCache(key: String): Option[String] = cache.get(key)
private def saveInCache(key: String, value: String, duration: FiniteDuration = 5.minutes) = cache.set(key, value, 5.minutes)
}

最后是对网络服务的调用:

trait CmsCaller {
def call(url: String): Future[String]
}
@Singleton
class DefaultCmsCaller @Inject()(wsClient: WSClient) extends CmsCaller {
import scala.concurrent.ExecutionContext.Implicits.global
//keep those futures which are currently requested
private val calls: Map[String, Future[String]] = TrieMap()
private val log = Logger("application")
override def call(url: String): Future[String] = {
if(calls.contains(url)) {
Future.successful("ok")
}else {
val f = doCall(url)
calls put(url, f)
f
}
}
//do the final call
private def doCall(url: String): Future[String] = {
val request = ws(url)
val response = request.get()
val mapped = mapResponse(response)
mapped.onComplete(_ => cmsCalls.remove(url))
mapped
}
private def ws(url: String): WSRequest = wsClient.url(url)
//currently executed with every request
private def mapResponse(f: Future[WSResponse]): Future[String] = {
f.onComplete(_ => log.debug("call completed"))
f map {res =>
val status = res.status
log.debug(s"ws called, response status: ${status}")
if (status == 200) {
res.body
} else {
""
}
}
}
}

我的问题是:如何只能执行一个对Web服务的调用?即使对同一目标有多个请求。我不想阻止它,另一个请求(不确定我在这里用的词是否正确)只会被告知已经有一个网络服务呼叫正在进行中。

头和预告片的请求(见控制器)应仅执行一次对Web服务的调用。

使用Scala惰性关键字的简单答案

def requestPayload(): String = ??? //do something
@Singleton
class SimpleCache @Inject() () {
lazy val result: Future[String] = requestPayload()
}
//Usage
@Singleton
class SomeController @Inject() (simpleCache: SimpleCache) {
def action = Action { req =>
simpleCache.result.map { result =>
Ok("success")
}
}
}

第一个请求将触发rest调用,所有其他请求都将使用缓存的结果。使用map和flatMap来连锁请求。

使用Actors的复杂答案

使用Actor对请求进行排队,并缓存第一个成功请求的json结果。所有其他请求都将读取第一个请求的结果。

case class Request(value: String)
class RequestManager extends Actor {
var mayBeResult: Option[String] = None
var reqs = List.empty[(ActorRef, Request)]
def receive = {
case req: Request => 
context become firstReq
self ! req
}
def firstReq = {
case req: Request =>
process(req).onSuccess { value =>
mayBeResult = Some(value)
context become done
self ! "clear_pending_reqs"
}
context become processing
}
def processing = {
case req: Request =>
//queue requests
reqs = reqs ++ List(sender -> req)
}
def done = {
case "clear_pending_reqs" => 
reqs.foreach { case (sender, _) =>
//send value to the sender
sender ! value.
}
}
}

处理第一个请求失败的情况。在上面的代码块中,如果第一个请求失败,那么actor将永远不会进入完成状态。

我通过同步服务中的缓存解决了问题。我不确定这是否是一个优雅的解决方案,但它对我有效。

trait SyncCmsProxyService {
def head(path: String): String
def teaser(path: String): String
}

@Singleton
class DefaultSyncCmsProxyService @Inject()(implicit cache: CacheApi, wsClient: WSClient) extends SyncCmsProxyService with UrlBuilder with CacheAccessor{
private val log = Logger("application")
override def head(path: String) = {
log.debug("looking for head ...")
query(url(path), "$.payload[0].title")
}
override def teaser(path: String) = {
log.debug("looking for teaser ...")
query(url(path), "$.payload[0].content.teaserText")
}
private def query(url: String, jsonPath: String) = {
val key = s"${url}?${jsonPath}"
val payload = findInCache(key)
if (payload.isDefined) {
payload.get
}else{
val json = Json.parse(body(url))
val queried = JSONPath.query(jsonPath, json).as[String]
saveInCache(key, queried)
}
}
private def body(url: String) = {
cache.synchronized {
val body = findInCache(url)
if (body.isDefined) {
log.debug("found body in cache")
body.get
} else {
saveInCache(url, doCall(url))
}
}
}
private def doCall(url : String): String = {
import scala.concurrent.ExecutionContext.Implicits.global
log.debug("calling...")
val req = wsClient.url(url).get()
val f = req map { res =>
val status = res.status
log.debug(s"endpoint called! response status: ${status}")
if (status == 200) {
res.body
} else {
""
}
}
Await.result(f, 15.seconds)
}
}

请注意,我在这里省略了特性UrlBuilder和CacheAccessor,因为它们很琐碎。

最新更新