Spark Streaming是否支持Flink那样的迭代



例如,Flink的程序从一系列整数中连续减去1,直到它们达到零:闪烁迭代

DataStream<Long> someIntegers = env.generateSequence(0, 1000);
IterativeStream<Long> iteration = someIntegers.iterate();
DataStream<Long> minusOne = iteration.map(new MapFunction<Long, Long>() {
  @Override
  public Long map(Long value) throws Exception {
    return value - 1 ;
  }
});
DataStream<Long> stillGreaterThanZero = minusOne.filter(new FilterFunction<Long>() {
  @Override
  public boolean filter(Long value) throws Exception {
    return (value > 0);
 }
});
iteration.closeWith(stillGreaterThanZero);
DataStream<Long> lessThanZero = minusOne.filter(new FilterFunction<Long>() {
    @Override
    public boolean filter(Long value) throws Exception {
       return (value <= 0);
    }
});

没有什么能阻止您编写自己的。我会写一个尾部递归函数,它在驱动程序上运行,可以完成你想要的一切。上下文可以是维护状态的任何类型的对象,也可以是您想要做出决策的任何对象。

@tailrec
def iterateWhileYouCan[T](data: RDD[T], context: Boolean):RDD[T] = {
    if (context)
      data
    iterateWhileYouCan(awesomeComputation(data), context)
  }

相关内容

  • 没有找到相关文章

最新更新