我正在尝试构建一个协程框架,通过并行执行每个数据相关函数来实现批处理数据抓取。以下是我目前为止的内容:http://pastie.org/7147798
-
这行不通
def get(id: Long) = reset { // Is it not already cached? if (!cached.isDefinedAt(id)) { // Store the ID we want to fetch. queued += id // Come back later... shift { fetch[Object]() } : Seq[Any] @cps[ExecState[Object]] } // We should have the ID fetched now. Result(cached(id)) }
我得到以下错误
这是ashoat@ashoatmbp [~/project]# scala -P:continuations:enable Loader.scala /Users/ashoat/project/Loader.scala:134: error: type mismatch; found : Unit required: Any @util.continuations.package.cps[Main.$anon.Loader.ExecState[Main.$anon.Loader.Object]] if (!cached.isDefinedAt(id)) { ^ one error found
def get(id: Long) = reset { // Is it not already cached? if (!cached.isDefinedAt(id)) { // Store the ID we want to fetch. queued += id // Come back later... shift { fetch[Object]() } : Seq[Any] @cps[ExecState[Object]] // We should have the ID fetched now. Result(cached(id)) } else { // We should have the ID fetched now. Result(cached(id)) } }
-
这行不通
val getFive = reset { if (true) { Result(5) } else { val seq: Seq[Any] = shift { fetch[Int](Object.get(15181990251L)) } val Seq(obj: Object) = seq Result(obj.fields("test").toInt) } }
我得到以下错误
这是ashoat@ashoatmbp [~/project]# scala -P:continuations:enable Loader.scala /Users/ashoat/project/Loader.scala:170: error: cannot cps-transform expression new this.Loader.Result[Int](5): type arguments [this.Loader.Result[Int],this.Loader.Result[Int],Nothing] do not conform to method shiftUnit's type parameter bounds [A,B,C >: B] Result(5)// : Result[Int] @cps[Result[Int]] ^ one error found
val getFive = reset { if (true) { Result(5) : Result[Int] @cps[Result[Int]] } else { val seq: Seq[Any] = shift { fetch[Int](Object.get(15181990251L)) } val Seq(obj: Object) = seq Result(obj.fields("test").toInt) } }
但是我得到以下警告
ashoat@ashoatmbp [~/project]# scala -P:continuations:enable Loader.scala /Users/ashoat/project/Loader.scala:170: warning: expression (new this.Loader.Result[Int](5): this.Loader.Result[Int]) is cps-transformed unexpectedly Result(5) : Result[Int] @cps[Result[Int]] ^ one warning found 8
虽然我自己仍然不太理解延续,尽我所能,在你的例子中的关键问题是,你的代码并不总是提供一个shift
到reset
。
编译器期望在reset
中找到一些嵌套的shift
。然后它将CPS将shift
转换为ControlContext][A, B, C]
,并将shift
之后发生的代码转换为ControlContext.map
调用。
因为您有一个if
语句,在else分支的情况下,没有嵌套的shift
:
reset {
if (false) {
shift { ... }
}
Result(cached(id)) // no shift
}
也一样reset {
if (false) {
shift { ... }
} else {
Result(cached(id)) // no shift
}
}
不能转换为有效的CPS代码。
似乎你可以在if分支内进行重置,或者向else分支提供一个微不足道的shift语句:
if (!cached.isDefinedAt(id)) reset {
shift { ... }
Result(cached(id))
} else {
Result(cached(id))
}
// or
reset {
if (!cached.isDefinedAt(id)) {
shift { ... }
Result(cached(id))
} else {
shift[Result[Object], ExecState[Object], ExecState[Object]] { k =>
Result(cached(id))
}
}
}
编辑:它似乎有一些不一致的cps插件如何推断类型。例如:
var b = false
def test[A](a: A) = reset {
if (b) {
a
} else {
shift{ (k: Unit => A) => k() }
a
}
}
使用-Xprint:selectivecps
选项运行编译显示编译器推断类型为Reset[A, Nothing]
,那么运行代码将在运行时产生错误。如果 If 反转为:
var b = false
def test[A](a: A) = reset {
if (b) {
shift{ (k: Unit => A) => k() }
a
} else {
a
}
}
则编译器正确推断出reset[A, A]
。如果我像test[A](a: A) = reset[A, A] {
一样为reset
提供类型参数,那么它在两种情况下都有效。
也许将类型参数指定为reset
和shift
,而不是使用Result(5)
,使用shiftUnit[A, B, C]
方法将有助于减少不一致。