Scala 中无限流的嵌套迭代



我有时发现自己想在 Scala 中对无限流执行嵌套迭代以进行推导,但指定循环终止条件可能有点棘手。有没有更好的方法来做这种事情?

我想到的用例是,我不一定预先知道我正在迭代的每个无限流需要多少元素(但显然我知道它不会是无限数字)。假设每个流的终止条件可能以某种复杂的方式取决于 for 表达式中其他元素的值。

最初的想法是尝试将流终止条件编写为for表达式中的iffilter 子句,但是在循环嵌套的无限流时会遇到麻烦,因为无法在第一个无限流上短路迭代,最终导致 OutOfMemoryError。我理解为什么会这样,考虑到表达式如何映射到映射flatMapwithFilter方法调用 - 我的问题是是否有更好的成语来做这种事情(也许根本不涉及理解)。

为了给出一个有点人为的例子来说明刚才描述的问题,请考虑以下(非常幼稚的)代码来生成数字 1 和 2 的所有配对:

val pairs = for {
i <- Stream.from(1) 
if i < 3 
j <- Stream.from(1) 
if j < 3
} 
yield (i, j)
pairs.take(2).toList 
// result: List[(Int, Int)] = List((1,1), (1,2)) 
pairs.take(4).toList
// 'hoped for' result: List[(Int, Int)] = List((1,1), (1,2), (2,1), (2,2))
// actual result:
//  java.lang.OutOfMemoryError: Java heap space
//      at scala.collection.immutable.Stream$.from(Stream.scala:1105)

显然,在这个简单的示例中,通过将if过滤器移动到原始流上的takeWhile方法调用中,可以轻松避免该问题,如下所示:

val pairs = for {
i <- Stream.from(1).takeWhile(_ < 3) 
j <- Stream.from(1).takeWhile(_ < 3) 
}    
yield (i, j)

但出于问题的目的,假设一个更复杂的用例,其中流终止条件不能轻易移动到流表达式本身。

一种可能性是将Stream包装到您自己的类中,该类以不同的方式处理filter,在本例中,takeWhile如下所示:

import scala.collection._
import scala.collection.generic._
class MyStream[+A]( val underlying: Stream[A] ) {
def flatMap[B, That](f: (A) => GenTraversableOnce[B])(implicit bf: CanBuildFrom[Stream[A], B, That]): That = underlying.flatMap(f);
def map[B, That](f: (A) ⇒ B)(implicit bf: CanBuildFrom[Stream[A], B, That]): That = underlying.map(f);
def filter(p: A => Boolean): Stream[A] = underlying.takeWhile(p);
//                                       ^^^^^^^^^^^^^^^^^^^^^^^^
}
object MyStream extends App {
val pairs = for {
i <- new MyStream(Stream.from(1))
if i < 3
j <- new MyStream(Stream.from(1))
if j < 3
} yield (i, j);
print(pairs.toList);
}

这将打印List((1,1), (1,2), (2,1), (2,2)).

我已经调整了 Petr 的建议,提出了我认为更普遍可用的解决方案,因为它没有限制if过滤器在理解中的定位(尽管它有更多的语法开销)。

这个想法再次是将底层流包含在包装器对象中,该对象委托flatMapmapfilter方法而不进行修改,但首先对底层流应用takeWhile调用,谓词为!isTruncated,其中isTruncated是属于包装对象的字段。在任何时候调用包装对象上的truncate都会翻转isTruncated标志并有效地终止对流的进一步迭代。 这在很大程度上依赖于以下事实:对基础流的takeWhile调用是延迟计算的,因此在迭代的后期阶段执行的代码可能会影响其行为。

缺点是您必须保留对流的引用,您希望能够通过将|| s.truncate附加到筛选器表达式(其中s是对包装流的引用)来截断您希望能够在迭代中截断的流。您还需要确保在每次通过流进行新迭代之前对包装器对象(或使用新的包装器对象)调用reset,除非您知道重复迭代每次的行为都相同。

import scala.collection._
import scala.collection.generic._
class TruncatableStream[A]( private val underlying: Stream[A]) {
private var isTruncated = false;
private var active = underlying.takeWhile(a => !isTruncated)
def flatMap[B, That](f: (A) => GenTraversableOnce[B])(implicit bf: CanBuildFrom[Stream[A], B, That]): That = active.flatMap(f);
def map[B, That](f: (A) => B)(implicit bf: CanBuildFrom[Stream[A], B, That]): That = active.map(f);
def filter(p: A => Boolean): Stream[A] = active.filter(p);
def truncate() = {
isTruncated = true
false
}
def reset() = {
isTruncated = false
active = underlying.takeWhile(a => !isTruncated)
}
}
val s1 = new TruncatableStream(Stream.from(1))
val s2 = new TruncatableStream(Stream.from(1))
val pairs = for {
i <- s1
// reset the nested iteration at the start of each outer iteration loop 
// (not strictly required here as the repeat iterations are all identical)
// alternatively, could just write: s2 = new TruncatableStream(Stream.from(1))  
_ = _s2.reset()      
j <- s2
if i < 3 || s1.truncate
if j < 3 || s2.truncate
} 
yield (i, j)
pairs.take(2).toList  // res1: List[(Int, Int)] = List((1,1), (1,2))
pairs.take(4).toList  // res2: List[(Int, Int)] = List((1,1), (1,2), (2,1), (2,2))

毫无疑问,这可以改进,但这似乎是解决问题的合理解决方案。

相关内容

  • 没有找到相关文章

最新更新