>我有两个数据帧,
val df1 = sqlContext.csvFile("/data/testData.csv")
val df2 = sqlContext.csvFile("/data/someValues.csv")
df1=
startTime name cause1 cause2
15679 CCY 5 7
15683 2 5
15685 1 9
15690 9 6
df2=
cause description causeType
3 Xxxxx cause1
1 xxxxx cause1
3 xxxxx cause2
4 xxxxx
2 Xxxxx
我想将一个复杂的函数getTimeCust
应用于 cause1 和 cause2 以确定最终原因,然后在 df2 中匹配此最终原因代码的描述。我必须有一个包含以下列的新 df(或 rdd):
startTime name cause descriptionCause
我的解决方案是
val rdd2 = df1.map(row => {
val (cause, descriptionCause) = getTimeCust(row.getInt(2), row.getInt(3), df2)
Row (row(0),row(1),cause,descriptionCause)
})
如果运行下面的代码,我有一个NullPointerException
,因为 df2 不可见。
该函数getTimeCust(Int, Int, DataFrame)
地图之外运行良好。
使用 df1.join(df2, <join condition>)
将数据帧联接在一起,然后从联接的数据帧中选择所需的字段。
您不能在执行器上运行的代码中使用 Spark 的分布式结构(rdd、数据帧等)(例如在映射中)。
尝试这样的事情:
def f1(cause1: Int, cause2: Int): Int = some logic to calculate cause
import org.apache.spark.sql.functions.udf
val dfCause = df1.withColumn("df1_cause", udf(f1)($"cause1", $"cause2"))
val dfJoined = dfCause.join(df2, on= df1Cause("df1_cause")===df2("cause"))
dfJoined.select("cause", "description").show()
谢谢你@Assaf。感谢您的回答和数据框的火花 udf。我已经解决了这个问题。解决方案是:
val getTimeCust= udf((cause1: Any, cause2: Any) => {
var lastCause = 0
var categoryCause=""
var descCause=""
lastCause= .............
categoryCause= ........
(lastCause, categoryCause)
})
并在调用 udf 后:
val dfWithCause = df1.withColumn("df1_cause", getTimeCust( $"cause1", $"cause2"))
终于加入了
val dfFinale=dfWithCause.join(df2, dfWithCause.col("df1_cause._1") === df2.col("cause") and dfWithCause.col("df1_cause._2") === df2.col("causeType"),'outer' )