Does Spark cache() cause a collect() to the driver in this case?



I need some help interpreting some error logs from Spark. My understanding is that caching should not trigger all the data to be sent to the driver. I have an abbreviated stack trace that looks like this:

Exception in thread "main" org.apache.spark.SparkException: Exception thrown in awaitResult: 
at org.apache.spark.util.ThreadUtils$.awaitResult(ThreadUtils.scala:205)
at org.apache.spark.sql.execution.exchange.BroadcastExchangeExec.doExecuteBroadcast(BroadcastExchangeExec.scala:136)
...
at org.apache.spark.sql.Dataset.persist(Dataset.scala:2902)
at org.apache.spark.sql.Dataset.cache(Dataset.scala:2912)
...
Caused by: org.apache.spark.SparkException: Job aborted due to stage failure: Total size of serialized results of 16 tasks (1076.4 MB) is bigger than spark.driver.maxResultSize (1024.0 MB)
at org.apache.spark.scheduler.DAGScheduler.org$apache$spark$scheduler$DAGScheduler$$failJobAndIndependentStages(DAGScheduler.scala:1750)
...
at org.apache.spark.rdd.RDD.collect(RDD.scala:938)
at org.apache.spark.sql.execution.SparkPlan.executeCollectIterator(SparkPlan.scala:304)
at org.apache.spark.sql.execution.exchange.BroadcastExchangeExec$$anonfun$relationFuture$1$$anonfun$apply$1.apply(BroadcastExchangeExec.scala:76)
at org.apache.spark.sql.execution.exchange.BroadcastExchangeExec$$anonfun$relationFuture$1$$anonfun$apply$1.apply(BroadcastExchangeExec.scala:73)
at org.apache.spark.sql.execution.SQLExecution$.withExecutionId(SQLExecution.scala:97)
at org.apache.spark.sql.execution.exchange.BroadcastExchangeExec$$anonfun$relationFuture$1.apply(BroadcastExchangeExec.scala:72)
at org.apache.spark.sql.execution.exchange.BroadcastExchangeExec$$anonfun$relationFuture$1.apply(BroadcastExchangeExec.scala:72)
...
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)

It looks like the cache kicks off a broadcast exchange, which eventually calls collect on the RDD, which in turn triggers the "Job aborted due to stage failure: Total size of serialized results of 16 tasks (1076.4 MB) is bigger than spark.driver.maxResultSize (1024.0 MB)" error.

Why I am seeing this error is a bit confusing to me, since what I have read about .cache is that it keeps the data on the worker nodes and should not have to move all of it to the driver.

Here is roughly what the code looks like. We have a job that rolls up a series of events by visit_id. It reads the events, projects a few fields, and then aggregates them like this:

def aggregateRows: sql.DataFrame = {
  projected
    .orderBy("headerTimestamp")
    .groupBy(groupBys.head, groupBys.tail: _*)
    .agg(
      first("accountState", ignoreNulls = true).alias("accountState"),
      first("userId", ignoreNulls = true).alias("userId"),
      first("subaffiliateId", ignoreNulls = true).alias("subaffiliateId"),
      first("clientPlatform", ignoreNulls = true).alias("clientPlatform"),
      first("localTimestamp", ignoreNulls = true).alias("localTimestamp"),
      first("page", ignoreNulls = true).alias("firstPage")
    )
}

(As an aside, I believe this code is incorrect, since groupBy apparently does not preserve the ordering, but this is the code that was running when I got this error.)
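
For what it's worth, one way to make the "first row per visit" deterministic would be to rank the rows with a window function instead of relying on orderBy before groupBy. The sketch below assumes the same projected DataFrame and groupBys columns as above; note it keeps the whole earliest row rather than the first non-null value per column, so it is not an exact drop-in replacement:

import org.apache.spark.sql.DataFrame
import org.apache.spark.sql.expressions.Window
import org.apache.spark.sql.functions.{col, row_number}

// Hypothetical alternative: number the rows within each group by headerTimestamp
// and keep only the earliest one, so the result no longer depends on groupBy ordering.
def aggregateRowsDeterministic(projected: DataFrame, groupBys: Seq[String]): DataFrame = {
  val byVisit = Window
    .partitionBy(groupBys.map(col): _*)
    .orderBy(col("headerTimestamp"))

  projected
    .withColumn("rn", row_number().over(byVisit))
    .filter(col("rn") === 1)
    .drop("rn")
}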

We then join this visit rollup on user_id like this (we create a temp view named "visits" using createOrReplaceTempView and Spark SQL):

SELECT
u.days_since_last_active,
u.user_id,
v.appName as app_name,
v.clientPlatform as client_platform,
v.countryCode as country_code,
v.llChannel as ll_channel,
v.llSource as ll_source,
v.referralKey as referral_key,
v.visitTimestamp as resurrection_time,
v.subaffiliateId as subaffiliateId,
v.visitDate as resurrection_date,
v.accountState as account_state,
v.ipAddress as ip_address,
v.localTimestamp as resurrection_local_time,
v.visitId as visit_id,
v.firstPage as resurrection_page,
row_number() OVER (PARTITION BY u.days_since_last_active, u.user_id ORDER BY v.visitTimestamp) as rn
FROM ubdm u
LEFT OUTER JOIN visits v ON v.userId = u.user_id
AND u.date = '$dateStr'
AND (u.days_since_last_active > 30
OR (u.days_since_signup > 30 AND u.days_since_last_active IS NULL))

We then call cache on the result and write the DataFrame out as TSV and Parquet:

val cached = generateDataFrame().cache()
writeParquet(cached.write, parquetPath)
writeTsv(cached.write, tsvPath)

.write returns a DataFrameWriter. Finally, for Parquet for example, we call the following on the DataFrameWriter:

def writeParquet[A](df: DataFrameWriter[A], outputPath: String, saveMode: SaveMode = SaveMode.Overwrite): Unit = {
  df.mode(saveMode)
    .parquet(outputPath)
}

As far as I understand, everything here works as it should. No, cache does not trigger a collect.

As you remember, Spark has transformations and actions. Transformations are only executed when an action triggers them. collect is an action, and it is what triggered the computation of the RDD, with the cache happening along the way.
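
A tiny illustration of that laziness (a sketch, assuming an existing SparkSession called spark):

val rdd = spark.sparkContext.parallelize(1 to 1000000)
val doubled = rdd.map(_ * 2)    // transformation: nothing runs yet
val cachedRdd = doubled.cache() // also lazy: only marks the RDD to be cached
val n = cachedRdd.count()       // action: this is what actually launches the job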

You are getting the error because you are trying to collect too much data (it does not fit on your driver node).
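
If the data coming back to the driver really does need to exceed the default limit, that limit is configurable. A sketch of raising it when building the session (the app name and the 2g value are just placeholders):

import org.apache.spark.sql.SparkSession

val spark = SparkSession.builder()
  .appName("visit-rollup")                    // hypothetical name
  .config("spark.driver.maxResultSize", "2g") // default is 1g; 0 means unlimited
  .getOrCreate()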

P.S. It would be great if you could share the code, by the way.

On Spark 2.3, cache() does trigger collecting broadcast data on the driver. This is a bug (SPARK-23880) that was fixed in version 2.4.0.
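
If upgrading is not an option, one possible workaround on 2.3 (my assumption, not something stated in the ticket) is to keep the planner from choosing a broadcast join for this query, so nothing has to be collected to the driver in the first place:

// Disable automatic broadcast joins; the join should fall back to a sort-merge join.
spark.conf.set("spark.sql.autoBroadcastJoinThreshold", "-1")

val cached = generateDataFrame().cache()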

As for transformations vs. actions: some Spark transformations involve an additional action, e.g. sortByKey on RDDs. So dividing all Spark operations into either transformations or actions is a bit of an oversimplification.
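
For example, sortByKey has to sample the data to compute range-partition boundaries, so it launches a job before any explicit action is called. A minimal sketch (again assuming an existing SparkSession called spark):

val pairs = spark.sparkContext.parallelize(Seq((3, "c"), (1, "a"), (2, "b")))

// sortByKey is nominally a transformation, but it runs a sampling job to build
// its RangePartitioner, so a job shows up in the UI before collect() is called.
val sorted = pairs.sortByKey()

sorted.collect().foreach(println)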
