我需要一些帮助
当我使用for循环更新数据框架时,我有apache-spark的问题。它的大小一直无限制地增长,尽管它的计数没有增长
你能建议我如何修复它或指导我为什么我的数据帧大小一直在增长吗?(T ^ T)//
我的程序运行在本地[6]使用spark2.0.1
@this is my code
def main(args: Array[String]): Unit = {
val df1 = initial dataframe(read from db)
while(){
val word_count_df = processAndCountText() // query data from database and do word count
val temp_df1 = update(df1,word_count_df )
temp_df1.persist(StorageLevel.MEMORY_AND_DISK)
df1.unpersist()
df1 = temp_df1
println(temp_df1.count())
println(s"${SizeEstimator.estimate(temp_df1) / 1073741824.0} GB")
}
}
//
编辑这是一个更新函数,用于更新某些在word_count_df中有键的行。
我试图将其拆分为2个数据帧并分别计算,然后返回2个数据帧的并集,但这需要太多时间,因为它需要启用"spark.sql.crossJoin.enabled"
def update(u_stateful_df : DataFrame, word_count_df : DataFrame) : DataFrame = {
val run_time = current_end_time_m - start_time_ms / 60000
val calPenalty = udf { (last_update_duration: Long, run_time: Long) => calculatePenalty(last_update_duration, run_time) }
//calculatePenalty is simple math function using for loop and return double
val calVold = udf { (v_old: Double, penalty_power: Double) => v_old * Math.exp(penalty_power) }
//(word_new,count_new)
val word_count_temp_df = word_count_df
.withColumnRenamed("word", "word_new")
.withColumnRenamed("count", "count_new")
//u_stateful_df (word,u,v,a,last_update,count)
val state_df = u_stateful_df
.join(word_count_temp_df, u_stateful_df("word") === word_count_temp_df("word_new"), "outer")
.na.fill(Map("last_update" -> start_time_ms / 60000))
.na.fill(0.0)
.withColumn("word", when(col("word").isNotNull, col("word")).otherwise(col("word_new")))
.withColumn("count", when(col("word_new").isNotNull, col("count_new")).otherwise(-1))
.drop("count_new")
.withColumn("current_end_time_m", lit(current_end_time_m))
.withColumn("last_update_duration", col("current_end_time_m") - col("last_update"))
.filter(col("last_update_duration") < ResourceUtility.one_hour_duration_ms / 60000)
.withColumn("run_time", when(col("word_new").isNotNull, lit(run_time)))
.withColumn("penalty_power", when(col("word_new").isNotNull, calPenalty(col("last_update_duration"), col("run_time"))))
.withColumn("v_old_penalty", when(col("word_new").isNotNull, calVold(col("v"), col("penalty_power"))))
.withColumn("v_new", when(col("word_new").isNotNull, col("count") / run_time))
.withColumn("v_sum", when(col("word_new").isNotNull, col("v_old_penalty") + col("v_new")))
.withColumn("a", when(col("word_new").isNotNull, (col("v_sum") - col("v")) / col("last_update_duration")).otherwise(col("a")))
.withColumn("last_update", when(col("word_new").isNotNull, lit(current_end_time_m)).otherwise(col("last_update")))
.withColumn("u", when(col("word_new").isNotNull, col("v")).otherwise(col("u")))
.withColumn("v", when(col("word_new").isNotNull, col("v_sum")).otherwise(col("v")))
state_df.select("word", "u", "v", "a", "last_update", "count")
}
@this is my log
u_stateful_df : 1408665
size of dataframe size : 0.8601360470056534 GB
u_stateful_df : 1408665
size of dataframe size : 1.3347024470567703 GB
u_stateful_df : 268498
size of dataframe size : 1.5012029185891151 GB
u_stateful_df : 147232
size of dataframe size : 3.287795402109623 GB
u_stateful_df : 111950
size of dataframe size : 4.761911824345589 GB
....
....
u_stateful_df : 72067
size of dataframe size : 14.510709017515182 GB
@这是日志当我写入文件
I save df1 as CSV in the file system. below is the size of dataframe in file system, count and size(track by org.apache.spark.util.SizeEstimator).
csv size 84.2 MB
u_stateful_df : 1408665
size of dataframe size : 0.4460855945944786 GB
csv size 15.2 MB
u_stateful_df : 183315
size of dataframe size : 0.522 GB
csv size 9.96 MB
u_stateful_df : 123381
size of dataframe size : 0.630GB
csv size 4.63 MB
u_stateful_df : 56896
size of dataframe size : 0.999 GB
...
...
...
csv size 3.47 MB
u_stateful_df : 43104
size of dataframe size : 3.1956922858953476 GB
看起来像是Spark内部有泄漏。通常当你在Dataframe上调用persist
或cache
时,然后count
Spark生成结果并将其存储在分布式内存或磁盘上,但也知道整个执行计划,以便在丢失执行器或其他情况下重建该Dataframe。但是不应该占用那么多空间……
据我所知,没有选项来"崩溃"数据框架(告诉Spark忘记整个执行计划),除了简单地写入存储,然后从这个存储读取。