例如,Flink的程序从一系列整数中连续减去1,直到它们达到零:闪烁迭代
DataStream<Long> someIntegers = env.generateSequence(0, 1000);
IterativeStream<Long> iteration = someIntegers.iterate();
DataStream<Long> minusOne = iteration.map(new MapFunction<Long, Long>() {
@Override
public Long map(Long value) throws Exception {
return value - 1 ;
}
});
DataStream<Long> stillGreaterThanZero = minusOne.filter(new FilterFunction<Long>() {
@Override
public boolean filter(Long value) throws Exception {
return (value > 0);
}
});
iteration.closeWith(stillGreaterThanZero);
DataStream<Long> lessThanZero = minusOne.filter(new FilterFunction<Long>() {
@Override
public boolean filter(Long value) throws Exception {
return (value <= 0);
}
});
没有什么能阻止您编写自己的。我会写一个尾部递归函数,它在驱动程序上运行,可以完成你想要的一切。上下文可以是维护状态的任何类型的对象,也可以是您想要做出决策的任何对象。
@tailrec
def iterateWhileYouCan[T](data: RDD[T], context: Boolean):RDD[T] = {
if (context)
data
iterateWhileYouCan(awesomeComputation(data), context)
}