在 Scala 中定期获取数据而无需 var/mutable 集合的方法



我想定期从某个来源获取数据,每小时一次。我这样做,因为数据获取需要花费大量时间,大约 10 分钟。因此,我缓存了这些数据。

我现在有这样的代码:

import java.util._
object Loader {
@volatile private var map: Map[SomeKey, SomeValue] = Map()
def start() {
val timer = new Timer()
val timerTask = new TimerTask {
override def run() {
reload()
}
}
val oneHour = 60 * 60 * 1000
timer.schedule(timerTask, oneHour)
}
def reload() {
map = loadMap()
}
// this method invocation costs a lot, so, I cache it in reload()
def loadMap(): Map[SomeKey, SomeValue] = ...
def getValue(key: SomeKey): Option[SomeValue] = map.get(key)
}

此外,我在main()函数中Loader.start()调用。

这很好用,但我想知道,有没有办法以更实用的方式编写它:摆脱 var 而不只使用可变集合?

组合 IO 库 scalaz-stream 成功地在此类用例中保持代码的可变性。首先,依赖关系:

libraryDependencies ++= Seq(
"org.scalaz" %% "scalaz-core" % "7.2.8",
"org.scalaz" %% "scalaz-concurrent" % "7.2.8",
"org.scalaz.stream" %% "scalaz-stream" % "0.8.6a"
)

我们从scalaz-concurrentscalaz-stream的一些进口开始:

import java.util.concurrent.ScheduledExecutorService
import scala.concurrent.duration._
import scalaz.concurrent.Task
import scalaz.stream.time._
import scalaz.stream._
import scalaz.stream.Process.Env
import scalaz.stream.ReceiveY._

假设我们有一个能够提取其快照的命令式数据源。为了演示,它还会在每次提取时自行更新:

trait DataSource[Key, Value] {
def loadMap: Map[Key, Value]
}
object DataSourceStub extends DataSource[Int, String] {
private var externalSource: Map[Int, String]  = Map(1 -> "a")
def loadMap: Map[Int, String] = {
val snapshot = externalSource
val key = snapshot.keys.max
val value = snapshot(key)
val (newKey, newValue) = (key + 1) -> (value + "a")
val newSource = snapshot + (newKey -> newValue)
externalSource = newSource
snapshot
}
}

现在,我们通过引入timer来开始Loader实现,该在启动时立即发出单元事件,然后每refreshEvery秒发出一次。然后,通过将数据接收Task映射到每个事件并在流中评估它们,可以获得报告我们cacheStates的事件流。困难的部分是换向:我们需要将Request流(使用缓存中的数据执行某些操作的函数)与我们的周期性快照流交错。scalaz-stream提供了一个流换向工具,Wye,它允许我们说明处理来自输入流的事件的顺序。我们需要一个初始缓存快照才能使用,所以我们从wye.receiveL开始,移动到具有初始缓存状态的handleImpl。现在,我们可以通过以下方式接收任何receiveBoth事件:

  • 如果是缓存快照更新,我们会重复执行它而不产生输出;
  • 如果是请求,我们给它当前缓存状态,并将生成的Task发出到输出中,也会在当前状态下重复出现;
  • 如果其中一个输入流终止,我们将停止处理。

剩下的唯一事情是将我们的输入与handleWye连接起来,并将处理任务的副作用包含在流中,我们在processRequests中这样做。

class Loader[Key, Value](dataSource: DataSource[Key, Value], refreshEvery: Duration) {
type CacheState = Map[Key, Value]
type Request = CacheState => Task[Unit]
type ReaderEnv = Env[CacheState, Request]
implicit val scheduler: ScheduledExecutorService = DefaultScheduler
private val timer: Process[Task, Unit] =
Process.emit(()) ++ awakeEvery(refreshEvery).map(_ => ())
private val cacheStates: Process[Task, CacheState] =
timer.evalMap(_ => Task(dataSource.loadMap))
private val handle: Wye[CacheState, Request, Task[Unit]] = {
def handleImpl(current: CacheState): Wye[CacheState, Request, Task[Unit]] = {
import wye._
import Process._
receiveBoth {
case ReceiveL(i) => handleImpl(i)
case ReceiveR(i) => emit(i(current)) ++ handleImpl(current)
case HaltOne(rsn) => Halt(rsn)
}
}
wye.receiveL[CacheState, Request, Task[Unit]](handleImpl)
}
def processRequests(requests: Process[Task, Request]): Process[Task, Unit] =
cacheStates.wye(requests)(handle).eval
}

让我们通过按最大 id(每个 100 毫秒)向数据加载器发出 100 个数据请求来测试我们的数据加载器,同时每秒执行刷新:

object TestStreamBatching {
private val loader = new Loader(DataSourceStub, 1.second)
private def request(cache: loader.CacheState): Task[Unit] = Task {
Thread.sleep(100)
val key = cache.keys.max
val value = cache(key)
println(value)
}
private val requests: Process[Task, loader.Request] =
Process.unfold(100)(s => if(s > 0) Some((request, s - 1)) else None)
def main(args: Array[String]): Unit = {
loader.processRequests(requests).run.unsafePerformSync
}
}

通过运行它,您可以看到一个由'a'个字母组成的阶梯,每秒增加其 riser 大小,最终在 100 个输出后终止。

最新更新