Spark SQL "Futures timed out after 300 seconds" when filtering



I get an exception when running what seems to be a simple Spark SQL filtering job:

    someOtherDF
      .filter(/*somecondition*/)
      .select($"eventId")
      .createOrReplaceTempView("myTempTable")
    records
      .filter(s"eventId NOT IN (SELECT eventId FROM myTempTable)")

Any idea how I can solve this?

Notes:

  • someOtherDF contains between ~1M and 5M rows after filtering, and the eventId values are GUIDs.
  • records contains between 40M and 50M rows.

Error:

Stacktrace:
org.apache.spark.SparkException: Exception thrown in awaitResult:
        at org.apache.spark.util.ThreadUtils$.awaitResultInForkJoinSafely(ThreadUtils.scala:215)
        at org.apache.spark.sql.execution.exchange.BroadcastExchangeExec.doExecuteBroadcast(BroadcastExchangeExec.scala:131)
        at org.apache.spark.sql.execution.SparkPlan$$anonfun$executeBroadcast$1.apply(SparkPlan.scala:124)
        at org.apache.spark.sql.execution.SparkPlan$$anonfun$executeBroadcast$1.apply(SparkPlan.scala:124)
        at org.apache.spark.sql.execution.SparkPlan$$anonfun$executeQuery$1.apply(SparkPlan.scala:135)
        at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:151)
        at org.apache.spark.sql.execution.SparkPlan.executeQuery(SparkPlan.scala:132)
        at org.apache.spark.sql.execution.SparkPlan.executeBroadcast(SparkPlan.scala:123)
        at org.apache.spark.sql.execution.joins.BroadcastNestedLoopJoinExec.doExecute(BroadcastNestedLoopJoinExec.scala:343)
        at org.apache.spark.sql.execution.SparkPlan$$anonfun$execute$1.apply(SparkPlan.scala:114)
        at ...
Caused by: java.util.concurrent.TimeoutException: Futures timed out after [300 seconds]
    at scala.concurrent.impl.Promise$DefaultPromise.ready(Promise.scala:219)
    at scala.concurrent.impl.Promise$DefaultPromise.result(Promise.scala:223)
    at org.apache.spark.util.ThreadUtils$.awaitResultInForkJoinSafely(ThreadUtils.scala:212)
    ... 84 more
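The stacktrace shows that the `NOT IN` subquery was planned as a `BroadcastNestedLoopJoinExec`, and that broadcasting the subquery result did not finish within `spark.sql.broadcastTimeout` (which defaults to 300 seconds, matching the error message). As a sketch of one possible mitigation (the exact values here are illustrative assumptions, not tuned recommendations), you can raise that timeout or disable automatic broadcast joins on the session:

```scala
// Raise the broadcast timeout (in seconds); the default is 300.
spark.conf.set("spark.sql.broadcastTimeout", "1200")

// Or disable automatic broadcast joins entirely, so Spark may fall back
// to a non-broadcast plan instead of timing out while broadcasting.
spark.conf.set("spark.sql.autoBroadcastJoinThreshold", "-1")
```

These settings only work around the timeout; the join rewrite below avoids the problematic plan altogether.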

Using the following two related questions, (1) "How to exclude rows that don't join with another table?" and (2) "Spark duplicate columns in DataFrame after join":

I was able to solve my problem with a left outer join, like this:

    val leftColKey = records("eventId")
    val rightColKey = someOtherDF("eventId")
    val toAppend: DataFrame = records
      .join(someOtherDF, leftColKey === rightColKey, "left_outer")
      .filter(rightColKey.isNull) // Keep rows without a match in 'someOtherDF'. See (1)
      .drop(rightColKey) // Needed to discard duplicate column. See (2)

Performance is really good, and it does not suffer from the "Futures timed out" problem.

编辑

As a colleague pointed out to me, the "left_anti" join type is more efficient.
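The left-anti variant can be sketched as follows. Joining on the column name (via `Seq("eventId")`) rather than on a column expression also means the right-hand `eventId` column never appears in the result, so the explicit `drop` from the left-outer version is no longer needed:

```scala
// Keep only the rows of 'records' whose eventId has no match in 'someOtherDF'.
// A left-anti join returns left-side rows without a match, and only left-side columns.
val toAppend: DataFrame = records
  .join(someOtherDF, Seq("eventId"), "left_anti")
```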
