我是Apache Spark的初学者。我正在尝试运行一个流动作业,该流局将收回一些数据,将其转换为数据框架并运行一些处理,例如加入和删除重复项等。现在,我必须缓存此处理的数据,以便可以使用下一个Dstream(使用一些Union/Join)将其附加在一起。
我尝试使用dataframe.cache()缓存并在Next stream批次中重新使用。
例如,如果DF是由Dstream形成的RDD。
foreachrdd{
new =df.unionAll(processed)
new.registerTempTable("TableScheme")
sql.( //perform inner join and some other processing)
processed=new
processed.cache();
}
当我们执行dataframe.cache或dataframe.persist()时,我们是在缓存实际数据还是应用的DAG/转换?当第二流到来时,我的程序将
退出Caused by: org.apache.spark.sql.catalyst.errors.package$TreeNodeException: execute, tree:
org.apache.spark.SparkException: Attempted to use BlockRDD[1] at socketTextStream after its blocks have been removed!
cache()
功能执行在DAG中定义的处理,直到cache()
调用并将数据存储适当地存储,以便不重复计算,应执行多个操作。
您想要的是在流批处理之间的持久性。
有几种方法:
-
streamingContext.remember
(分钟(5))保留了Minutes(5)
的以前批次的数据 -
window
in数据跨数据移动一个固定的时间窗口,使您可以对一批数据进行操作 -
updateStateByKey
&mapWithState
提供了维持和转化跨批处理状态的机制
您选择的方法将在很大程度上取决于您的用例。