如何在Scala中使用Stream.conf编写无泄漏的尾部递归函数



在编写对Stream(s)进行操作的函数时,有不同的递归概念。第一个简单意义在编译器级别上不是递归的,因为尾部如果没有立即求值,那么函数会立即返回,但返回的流是递归的:

final def simpleRec[A](as: Stream[A]): Stream[B] = 
if (a.isEmpty) Stream.empty              
else someB(a.head) #:: simpleRec(a.tail) 

上面的递归概念不会引起任何问题。第二个是编译器级别上真正的尾部递归:

@tailrec
final def rec[A](as: Stream[A]): Stream[B] = 
if (a.isEmpty) Stream.empty              // A) degenerated
else if (someCond) rec(a.tail)           // B) tail recursion
else someB(a.head) #:: rec(a.tail)       // C) degenerated

这里的问题是,即使没有实际执行调用,编译器也会将C)情况检测为非tailrec调用。这可以通过将流尾分解为一个辅助函数来避免:

@tailrec
final def rec[A](as: Stream[A]): Stream[B] = 
if (a.isEmpty) Stream.empty              
else if (someCond) rec(a.tail)          // B)
else someB(a.head) #:: recHelp(a.tail)  
@tailrec
final def recHelp[A](as: Stream[A]): Stream[B] = 
rec(as)

在编译时,这种方法最终会导致内存泄漏。由于尾部递归rec最终是从recHelp函数调用的,因此recHelp函数的堆栈帧保留了对蒸汽头的引用,并且在rec调用返回之前不会让流被垃圾收集,这可能相当长(就递归步骤而言),这取决于对B)的调用次数。

请注意,即使在没有帮助的情况下,如果编译器允许@tailrec,内存泄漏也可能仍然存在,因为惰性流尾部实际上会创建一个匿名对象,其中包含对流头的引用。

一个可能的解决方法是使recHelp方法不包含对流头的引用。这可以通过将封装流传递给它,并对封装器进行更改以从中删除引用来实现:

@tailrec
final def rec[A](as: Stream[A]): Stream[B] = 
if (a.isEmpty) Stream.empty              
else if (someCond) rec(a.tail)          
else {
// don't inline and don't define as def,
// or anonymous lazy wrapper object would hold reference
val tailRef = new AtomicReference(a.tail)
someB(a.head) #:: recHelp(tailRef)  
}
@tailrec
final def recHelp[A](asRef: AtomicReference[Stream[A]]): Stream[B] = 
// Note: don't put the content of the holder into a local variable
rec(asRef.getAndSet(null))

AtomicReference只是方便,在这种情况下不需要原子性,任何简单的holder对象都可以

还要注意,由于recHelp被封装在流Cons的尾部中,因此它将只被评估一次,并且Cons还负责同步。

正如您所暗示的,问题是在粘贴的代码中,filterHelp函数保留了head(因此您的解决方案将其删除)。

最好的答案是简单地避免这种令人惊讶的行为,使用Scalaz EphemeralStream,并看到它既不oom,又运行得更快,因为它对gc好得多。使用它并不总是那么简单,例如head是a()=>a而不是a,没有提取器等,但它都适用于一个目标,可靠的流使用。

你的filterHelper函数通常不必关心它是否保留了一个引用:

import scalaz.EphemeralStream
@scala.annotation.tailrec
def filter[A](s: EphemeralStream[A], f: A => Boolean): EphemeralStream[A] = 
if (s.isEmpty) 
s
else
if (f(s.head())) 
EphemeralStream.cons(s.head(), filterHelp(s.tail() , f) )
else
filter(s.tail(), f)
def filterHelp[A](s: EphemeralStream[A], f: A => Boolean) =
filter(s, f)
def s1 = EphemeralStream.range(1, big)

我想说的是,除非你有一个令人信服的理由使用Stream(其他库依赖项等),然后只使用EphemeralStream,否则惊喜会少得多。

最新更新