I need some help interpreting an error log from Spark. My understanding is that caching should not trigger all data being sent to the driver. Here is an abbreviated stack trace:
Exception in thread "main" org.apache.spark.SparkException: Exception thrown in awaitResult:
at org.apache.spark.util.ThreadUtils$.awaitResult(ThreadUtils.scala:205)
at org.apache.spark.sql.execution.exchange.BroadcastExchangeExec.doExecuteBroadcast(BroadcastExchangeExec.scala:136)
...
at org.apache.spark.sql.Dataset.persist(Dataset.scala:2902)
at org.apache.spark.sql.Dataset.cache(Dataset.scala:2912)
...
Caused by: org.apache.spark.SparkException: Job aborted due to stage failure: Total size of serialized results of 16 tasks (1076.4 MB) is bigger than spark.driver.maxResultSize (1024.0 MB)
at org.apache.spark.scheduler.DAGScheduler.org$apache$spark$scheduler$DAGScheduler$$failJobAndIndependentStages(DAGScheduler.scala:1750)
...
at org.apache.spark.rdd.RDD.collect(RDD.scala:938)
at org.apache.spark.sql.execution.SparkPlan.executeCollectIterator(SparkPlan.scala:304)
at org.apache.spark.sql.execution.exchange.BroadcastExchangeExec$$anonfun$relationFuture$1$$anonfun$apply$1.apply(BroadcastExchangeExec.scala:76)
at org.apache.spark.sql.execution.exchange.BroadcastExchangeExec$$anonfun$relationFuture$1$$anonfun$apply$1.apply(BroadcastExchangeExec.scala:73)
at org.apache.spark.sql.execution.SQLExecution$.withExecutionId(SQLExecution.scala:97)
at org.apache.spark.sql.execution.exchange.BroadcastExchangeExec$$anonfun$relationFuture$1.apply(BroadcastExchangeExec.scala:72)
at org.apache.spark.sql.execution.exchange.BroadcastExchangeExec$$anonfun$relationFuture$1.apply(BroadcastExchangeExec.scala:72)
...
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
It looks like the cache kicked off a broadcast, which eventually called collect on an RDD, which then triggered the "Job aborted due to stage failure: Total size of serialized results of 16 tasks (1076.4 MB) is bigger than spark.driver.maxResultSize (1024.0 MB)" error.
Why I'm seeing this error is a bit confusing to me — what I've read about .cache is that it keeps data on the nodes and shouldn't have to move all the data to the driver.
The code looks like this. We have a job that rolls up a series of events by visit_id. It reads the events, projects some fields, and then aggregates them like this:
def aggregateRows: sql.DataFrame = {
projected
.orderBy("headerTimestamp")
.groupBy(groupBys.head, groupBys.tail: _*)
.agg(
first("accountState", ignoreNulls = true).alias("accountState"),
first("userId", ignoreNulls = true).alias("userId"),
first("subaffiliateId", ignoreNulls = true).alias("subaffiliateId"),
first("clientPlatform", ignoreNulls = true).alias("clientPlatform"),
first("localTimestamp", ignoreNulls = true).alias("localTimestamp"),
first("page", ignoreNulls = true).alias("firstPage")
)
}
(As an aside, I believe this code is incorrect, since groupBy apparently does not maintain ordering, but this is the code that was running when I got this error.)
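For what it's worth, one common way to make "first event per visit" deterministic is to rank rows within each group by timestamp rather than relying on an orderBy before groupBy. A hypothetical sketch, assuming the same projected DataFrame, groupBys keys, and headerTimestamp column as above:

```scala
import org.apache.spark.sql.expressions.Window
import org.apache.spark.sql.functions.{col, row_number}

// Rank events within each visit by timestamp, then keep only the earliest row.
// Unlike orderBy followed by groupBy/first, the result here is deterministic.
val w = Window
  .partitionBy(groupBys.head, groupBys.tail: _*)
  .orderBy(col("headerTimestamp"))

val firstPerVisit = projected
  .withColumn("rn", row_number().over(w))
  .filter(col("rn") === 1)
  .drop("rn")
```

This trades the wide aggregation for a window shuffle, but every output column comes from the same (earliest) event row.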
We then join this visit rollup on user_id (we create a temp view named "visits" using createOrReplaceTempView and Spark SQL):
SELECT
u.days_since_last_active,
u.user_id,
v.appName as app_name,
v.clientPlatform as client_platform,
v.countryCode as country_code,
v.llChannel as ll_channel,
v.llSource as ll_source,
v.referralKey as referral_key,
v.visitTimestamp as resurrection_time,
v.subaffiliateId as subaffiliateId,
v.visitDate as resurrection_date,
v.accountState as account_state,
v.ipAddress as ip_address,
v.localTimestamp as resurrection_local_time,
v.visitId as visit_id,
v.firstPage as resurrection_page,
row_number() OVER (PARTITION BY u.days_since_last_active, u.user_id ORDER BY v.visitTimestamp) as rn
FROM ubdm u
LEFT OUTER JOIN visits v ON v.userId = u.user_id
AND u.date = '$dateStr'
AND (u.days_since_last_active > 30
OR (u.days_since_signup > 30 AND u.days_since_last_active IS NULL))
Then we call cache on it, and write the dataframe out as both TSV and Parquet:
val cached = generateDataFrame().cache()
writeParquet(cached.write, parquetPath)
writeTsv(cached.write, tsvPath)
.write returns a DataFrameWriter. Finally, for Parquet for example, we call the following on the DataFrameWriter:
def writeParquet[A](df: DataFrameWriter[A], outputPath: String, saveMode: SaveMode = SaveMode.Overwrite): Unit = {
df.mode(saveMode)
.parquet(outputPath)
}
As I understand it — everything is working as designed. No, cache does not trigger a collect.
Remember — Spark has transformations and actions. Transformations are only executed when triggered by actions. collect is an action, and it triggered computation of the underlying RDDs, with cache happening along the way.
You got the error because you tried to collect too much data — it doesn't fit on your driver node.
P.S. By the way, if you could share the code, that would be great.
On Spark 2.3, cache() does trigger collecting broadcast data on the driver. This is a bug (SPARK-23880) — it was fixed in version 2.4.0.
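If upgrading to 2.4.0 isn't an option, a workaround that is sometimes used is to prevent the broadcast from being planned in the first place, or to raise the driver-side limit. A sketch with illustrative values (assuming a SparkSession named spark; tune the numbers to your cluster):

```scala
// Disable automatic broadcast joins so the planner falls back to a shuffle join,
// avoiding the driver-side collect of the broadcast side.
spark.conf.set("spark.sql.autoBroadcastJoinThreshold", -1)

// Alternatively, raise the cap on serialized results sent to the driver.
// (Set at session/submit time; e.g. 2 GB instead of the 1 GB default.)
spark.conf.set("spark.driver.maxResultSize", "2g")
```

Disabling the broadcast is usually the safer of the two, since raising maxResultSize just moves the ceiling and still pressures driver memory.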
As for transformations vs. actions: some Spark transformations involve an additional action, e.g. sortByKey on RDDs. So dividing all Spark operations into either transformations or actions is a bit of an oversimplification.
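For example (a hypothetical sketch, assuming a SparkSession named spark): sortByKey needs a RangePartitioner, and building one samples the keys, which launches a job even though no action has been called yet.

```scala
val pairs = spark.sparkContext.parallelize(Seq((3, "c"), (1, "a"), (2, "b")))

// sortByKey is nominally a transformation, but constructing its
// RangePartitioner samples the key distribution, so a sampling job
// shows up in the Spark UI before any explicit action runs.
val sorted = pairs.sortByKey()
```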