斯帕克·斯卡拉。在映射中使用外部变量"数据帧"



>我有两个数据帧,

val df1 = sqlContext.csvFile("/data/testData.csv")
val df2 = sqlContext.csvFile("/data/someValues.csv")

 df1=
 startTime  name    cause1  cause2
 15679       CCY    5         7
 15683              2         5
 15685              1         9
 15690              9         6
df2=
cause   description causeType
3       Xxxxx       cause1
1       xxxxx       cause1
3       xxxxx       cause2
4       xxxxx
2       Xxxxx

我想将一个复杂的函数getTimeCust应用于 cause1 和 cause2 以确定最终原因,然后在 df2 中匹配此最终原因代码的描述。我必须有一个包含以下列的新 df(或 rdd):

startTime   name    cause   descriptionCause

我的解决方案是

  val rdd2 = df1.map(row => {
  val (cause, descriptionCause) = getTimeCust(row.getInt(2), row.getInt(3), df2)
  Row (row(0),row(1),cause,descriptionCause)
  })

如果运行下面的代码,我有一个NullPointerException,因为 df2 不可见。

该函数getTimeCust(Int, Int, DataFrame)地图之外运行良好。

使用 df1.join(df2, <join condition>) 将数据帧联接在一起,然后从联接的数据帧中选择所需的字段。

您不能在执行器上运行的代码中使用 Spark 的分布式结构(rdd、数据帧等)(例如在映射中)。

尝试这样的事情:

def f1(cause1: Int, cause2: Int): Int = some logic to calculate cause
import org.apache.spark.sql.functions.udf
val dfCause = df1.withColumn("df1_cause", udf(f1)($"cause1", $"cause2"))
val dfJoined = dfCause.join(df2, on= df1Cause("df1_cause")===df2("cause"))
dfJoined.select("cause", "description").show()

谢谢你@Assaf。感谢您的回答和数据框的火花 udf。我已经解决了这个问题。解决方案是:

   val getTimeCust= udf((cause1: Any, cause2: Any) => {
   var lastCause = 0
   var categoryCause=""
   var descCause=""
   lastCause= .............
   categoryCause= ........
    (lastCause, categoryCause)
  })

并在调用 udf 后:

  val dfWithCause = df1.withColumn("df1_cause", getTimeCust( $"cause1", $"cause2"))

终于加入了

 val dfFinale=dfWithCause.join(df2, dfWithCause.col("df1_cause._1") === df2.col("cause") and dfWithCause.col("df1_cause._2") === df2.col("causeType"),'outer' )

相关内容

  • 没有找到相关文章

最新更新