Apache Spark流:如何比较2个Dstream中的2个数据范围



我是Apache Spark的初学者。我正在尝试运行一个流动作业,该流局将收回一些数据,将其转换为数据框架并运行一些处理,例如加入和删除重复项等。现在,我必须缓存此处理的数据,以便可以使用下一个Dstream(使用一些Union/Join)将其附加在一起。

我尝试使用dataframe.cache()缓存并在Next stream批次中重新使用。

例如,如果DF是由Dstream形成的RDD。

   foreachrdd{
new =df.unionAll(processed)
new.registerTempTable("TableScheme")
sql.( //perform inner join and some other processing)
processed=new
processed.cache();
}

当我们执行dataframe.cache或dataframe.persist()时,我们是在缓存实际数据还是应用的DAG/转换?当第二流到来时,我的程序将

退出
Caused by: org.apache.spark.sql.catalyst.errors.package$TreeNodeException: execute, tree:
org.apache.spark.SparkException: Attempted to use BlockRDD[1] at socketTextStream after its blocks have been removed!

cache()功能执行在DAG中定义的处理,直到cache()调用并将数据存储适当地存储,以便不重复计算,应执行多个操作。

> >

您想要的是在流批处理之间的持久性。

有几种方法:

  • streamingContext.remember(分钟(5))保留了Minutes(5)的以前批次的数据
  • window in数据跨数据移动一个固定的时间窗口,使您可以对一批数据进行操作
  • updateStateByKey&mapWithState提供了维持和转化跨批处理状态的机制

您选择的方法将在很大程度上取决于您的用例。

相关内容

  • 没有找到相关文章

最新更新