我正在使用Spark 1.5。
我有两个表单的数据帧:
scala> libriFirstTable50Plus3DF
res1: org.apache.spark.sql.DataFrame = [basket_id: string, family_id: int]
scala> linkPersonItemLessThan500DF
res2: org.apache.spark.sql.DataFrame = [person_id: int, family_id: int]
libriFirstTable50Plus3DF
有766,151 条记录,而linkPersonItemLessThan500DF
有26,694,353 条记录。请注意,我在linkPersonItemLessThan500DF
上使用repartition(number)
,因为我打算稍后加入这两个。我正在跟进上面的代码:
val userTripletRankDF = linkPersonItemLessThan500DF
.join(libriFirstTable50Plus3DF, Seq("family_id"))
.take(20)
.foreach(println(_))
为此,我得到了以下输出:
16/12/13 15:07:10 INFO scheduler.TaskSetManager: Finished task 172.0 in stage 3.0 (TID 473) in 520 ms on mlhdd01.mondadori.it (199/200)
java.util.concurrent.TimeoutException: Futures timed out after [300 seconds]
at scala.concurrent.impl.Promise$DefaultPromise.ready(Promise.scala:219)
at scala.concurrent.impl.Promise$DefaultPromise.result(Promise.scala:223)
at scala.concurrent.Await$$anonfun$result$1.apply(package.scala:107)
at scala.concurrent.BlockContext$DefaultBlockContext$.blockOn(BlockContext.scala: at scala.concurrent.Await$.result(package.scala:107)
at org.apache.spark.sql.execution.joins.BroadcastHashJoin.doExecute(BroadcastHashJoin.scala:110)
at org.apache.spark.sql.execution.SparkPlan$$anonfun$execute$5.apply(SparkPlan.scala:140)
at org.apache.spark.sql.execution.SparkPlan$$anonfun$execute$5.apply(SparkPlan.scala:138)
at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:147)
at org.apache.spark.sql.execution.SparkPlan.execute(SparkPlan.scala:138)
at org.apache.spark.sql.execution.TungstenProject.doExecute(basicOperators.scala:86)
at org.apache.spark.sql.execution.SparkPlan$$anonfun$execute$5.apply(SparkPlan.scala:140)
at org.apache.spark.sql.execution.SparkPlan$$anonfun$execute$5.apply(SparkPlan.scala:138)
at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:147)
at org.apache.spark.sql.execution.SparkPlan.execute(SparkPlan.scala:138)
at org.apache.spark.sql.execution.ConvertToSafe.doExecute(rowFormatConverters.scala:63)
at org.apache.spark.sql.execution.SparkPlan$$anonfun$execute$5.apply(SparkPlan.scala:140)
at org.apache.spark.sql.execution.SparkPlan$$anonfun$execute$5.apply(SparkPlan.scala:138)
at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:147)
at org.apache.spark.sql.execution.SparkPlan.execute(SparkPlan.scala:138)
at org.apache.spark.sql.execution.SparkPlan.executeTake(SparkPlan.scala:190)
at org.apache.spark.sql.execution.Limit.executeCollect(basicOperators.scala:207)
at org.apache.spark.sql.DataFrame$$anonfun$collect$1.apply(DataFrame.scala:1386)
at org.apache.spark.sql.DataFrame$$anonfun$collect$1.apply(DataFrame.scala:1386)
at org.apache.spark.sql.execution.SQLExecution$.withNewExecutionId(SQLExecution.scala:56)
at org.apache.spark.sql.DataFrame.withNewExecutionId(DataFrame.scala:1904)
at org.apache.spark.sql.DataFrame.collect(DataFrame.scala:1385)
at org.apache.spark.sql.DataFrame.head(DataFrame.scala:1315)
at org.apache.spark.sql.DataFrame.take(DataFrame.scala:1378)
at org.apache.spark.sql.DataFrame.showString(DataFrame.scala:178)
at org.apache.spark.sql.DataFrame.show(DataFrame.scala:402)
at org.apache.spark.sql.DataFrame.show(DataFrame.scala:363)
at org.apache.spark.sql.DataFrame.show(DataFrame.scala:371)
at $iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC.<init>(<console>:72)
at $iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC.<init>(<console>:77)
at $iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC.<init>(<console>:79)
at $iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC.<init>(<console>:81)
at $iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC.<init>(<console>:83)
at $iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC.<init>(<console>:85)
at $iwC$$iwC$$iwC$$iwC$$iwC$$iwC.<init>(<console>:87)
at $iwC$$iwC$$iwC$$iwC$$iwC.<init>(<console>:89)
at $iwC$$iwC$$iwC$$iwC.<init>(<console>:91)
at $iwC$$iwC$$iwC.<init>(<console>:93)
at $iwC$$iwC.<init>(<console>:95)
at $iwC.<init>(<console>:97)
at <init>(<console>:99)
at .<init>(<console>:103)
at .<clinit>(<console>)
at .<init>(<console>:7)
at .<clinit>(<console>)
at $print(<console>)
at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:57)
at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
at java.lang.reflect.Method.invoke(Method.java:606)
at org.apache.spark.repl.SparkIMain$ReadEvalPrint.call(SparkIMain.scala:1065)
at org.apache.spark.repl.SparkIMain$Request.loadAndRun(SparkIMain.scala:1340)
at org.apache.spark.repl.SparkIMain.loadAndRunReq$1(SparkIMain.scala:840)
at org.apache.spark.repl.SparkIMain.interpret(SparkIMain.scala:871)
at org.apache.spark.repl.SparkIMain.interpret(SparkIMain.scala:819)
at org.apache.spark.repl.SparkILoop.reallyInterpret$1(SparkILoop.scala:857)
at org.apache.spark.repl.SparkILoop.interpretStartingWith(SparkILoop.scala:902)
at org.apache.spark.repl.SparkILoop.command(SparkILoop.scala:814)
at org.apache.spark.repl.SparkILoop.processLine$1(SparkILoop.scala:657)
at org.apache.spark.repl.SparkILoop.innerLoop$1(SparkILoop.scala:665)
at org.apache.spark.repl.SparkILoop.org$apache$spark$repl$SparkILoop$$loop(SparkILoop.scala:670)
at org.apache.spark.repl.SparkILoop$$anonfun$org$apache$spark$repl$SparkILoop$$process$1.apply$mcZ$sp(SparkILoop.scala:997)
at org.apache.spark.repl.SparkILoop$$anonfun$org$apache$spark$repl$SparkILoop$$process$1.apply(SparkILoop.scala:945)
at org.apache.spark.repl.SparkILoop$$anonfun$org$apache$spark$repl$SparkILoop$$process$1.apply(SparkILoop.scala:945)
at scala.tools.nsc.util.ScalaClassLoader$.savingContextLoader(ScalaClassLoader.scala:135)
at org.apache.spark.repl.SparkILoop.org$apache$spark$repl$SparkILoop$$process(SparkILoop.scala:945)
at org.apache.spark.repl.SparkILoop.process(SparkILoop.scala:1059)
at org.apache.spark.repl.Main$.main(Main.scala:31)
at org.apache.spark.repl.Main.main(Main.scala)
at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:57)
at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
at java.lang.reflect.Method.invoke(Method.java:606)
at org.apache.spark.deploy.SparkSubmit$.org$apache$spark$deploy$SparkSubmit$$runMain(SparkSubmit.scala:672)
at org.apache.spark.deploy.SparkSubmit$.doRunMain$1(SparkSubmit.scala:180)
at org.apache.spark.deploy.SparkSubmit$.submit(SparkSubmit.scala:205)
at org.apache.spark.deploy.SparkSubmit$.main(SparkSubmit.scala:120)
at org.apache.spark.deploy.SparkSubmit.main(SparkSubmit.scala)
我不明白问题是什么。这就像增加等待时间那么简单吗?加入是否过于密集?我需要更多的内存吗?洗牌是否密集?谁能帮忙?
发生这种情况是因为 Spark 尝试执行广播哈希联接,并且其中一个数据帧非常大,因此发送它会消耗大量时间。
您可以:
- 设置更高的
spark.sql.broadcastTimeout
以增加超时 -spark.conf.set("spark.sql.broadcastTimeout", newValueForExample36000)
persist()
两个数据帧,则 Spark 将使用随机连接 - 参考来自这里
PySpark
在 PySpark 中,您可以在构建 Spark 上下文时按以下方式设置配置:
spark = SparkSession
.builder
.appName("Your App")
.config("spark.sql.broadcastTimeout", "36000")
.getOrCreate()
只是为了给 @T. Gawęda 非常简洁的答案添加一些代码上下文。
在您的 Spark 应用程序中,Spark SQL 确实为连接选择了广播哈希联接,因为"libriFirstTable50Plus3DF 有 766,151 条记录",恰好小于所谓的广播阈值(默认为 10MB)。
可以使用 spark.sql.autoBroadcastJoinThreshold 配置属性控制广播阈值。
spark.sql.autoBroadcastJoinThreshold配置在执行联接时将广播到所有工作器节点的表的最大大小(以字节为单位)。通过将此值设置为 -1,可以禁用广播。请注意,当前统计信息仅支持已运行命令"分析表计算统计信息"noscan 的 Hive 元存储表。
您可以在堆栈跟踪中找到该特定类型的联接:
org.apache.spark.sql.execution.joins.BroadcastHashJoin.doExecute(BroadcastHashJoin.scala:110)
Spark SQL 中的BroadcastHashJoin
物理运算符使用广播变量将较小的数据集分发到 Spark 执行程序(而不是随每个任务一起发送它的副本)。
如果您使用explain
来查看物理查询计划,您会注意到查询使用 BroadcastExchangeExec 物理运算符。在这里,您可以看到用于广播较小表(和超时)的底层机制。
override protected[sql] def doExecuteBroadcast[T](): broadcast.Broadcast[T] = {
ThreadUtils.awaitResult(relationFuture, timeout).asInstanceOf[broadcast.Broadcast[T]]
}
doExecuteBroadcast
是Spark SQL中的每个物理运算符都遵循的SparkPlan
契约的一部分,如果需要,允许广播。BroadcastExchangeExec
恰好需要它。
超时参数就是您要查找的参数。
private val timeout: Duration = {
val timeoutValue = sqlContext.conf.broadcastTimeout
if (timeoutValue < 0) {
Duration.Inf
} else {
timeoutValue.seconds
}
}
如您所见,您可以完全禁用它(使用负值),这意味着无限期地等待广播变量传送到执行器或使用sqlContext.conf.broadcastTimeout
这正是 spark.sql.broadcastTimeout 配置属性。默认值为5 * 60
秒,您可以在堆栈跟踪中看到:
java.util.concurrent.TimeoutException: Futures 在 [300 秒] 后超时
除了增加spark.sql.broadcastTimeout
或 persist() 两个数据帧,
您可以尝试:
1.通过将spark.sql.autoBroadcastJoinThreshold
设置为-1
来禁用广播
2.通过将spark.driver.memory
设置为更高的值来增加火花驱动程序内存。
就我而言,这是由大型数据帧上的广播引起的:
df.join(broadcast(largeDF))
因此,根据之前的答案,我通过删除广播来修复它:
df.join(largeDF)