如何将多个列从 Spark 数据帧中的 When 子句发送到 udf?



我想基于full_outer_join联接两个数据帧,并尝试在联接的结果集中添加一个新列,该列告诉我匹配的记录,仅来自左侧数据帧的不匹配记录和来自仅来自右侧数据帧的不匹配记录。

这是我的火花代码:

val creditLoc ="/data/accounts/credits/year=2016/month=06/day=02"
val debitLoc = "/data/accounts/debits/year=2016/month=06/day=02"
val creditDF = sqlContext.read.avro(creditLoc)
val debitDF  = sqlContext.read.avro(debitLoc) 
val credit  =  creditDF.withColumnRenamed("account_id","credit_account_id").as("credit")
val debit   =  debitDF.withColumnRenamed("account_id","debit_account_id").as("debit")
val fullOuterDF =  credit.join(debit,credit("credit_account_id") === debit("debit_account_id"),"full_outer")
val CREDIT_DEBIT_CONSOLIDATE_SCHEMA=List(
("credit.credit_account_id","string"),
("credit.channel_name",  "string"),
("credit.service_key",  "string"),
("credit.trans_id", "string"),
("credit.trans_dt",  "string"),
("credit.trans_amount",  "string"),
("debit.debit_account_id","string"),
("debit.icf_number","string"),
("debit.debt_amount","string")
)
val columnNamesList = CREDIT_DEBIT_CONSOLIDATE_SCHEMA.map(elem => col(elem._1)).seq 
val df  = fullOuterDF.select(columnNamesList:_*)
val caseDF = df.withColumn("matching_type",
when(df("credit_account_id") === df("debit_account_id"),"MATCHING_CREDIT_DEBIT").otherwise(
when(df("debit_account_id").isNull,"UNMATCHED_CREDIT").otherwise(
when(df("credit_account_id").isNull,"UNMATCHED_DEBIT").otherwise("INVALID_MATCHING_TYPE")
)
)
)

到目前为止,我在when子句本身中应用了"matching_type"的逻辑,但现在我想在UDF中编写"matching_type"的逻辑。 如果像上面那样写,代码就可以工作了。

下面的UDFs 接受单个列作为参数,如何创建一个接受多个列的 udf 并根据该 udf 内部的条件返回布尔值?

val isUnMatchedCREDIT = udf[Boolean, String](credit_account_id => {
credit_account_id == null
})
val isUnMatchedDEBIT = udf[Boolean, String](debit_account_id => {
debit_account_id == null
})

val caseDF = df.withColumn("matching_type",
when(df("credit_account_id") === df("debit_account_id"),"MATCHING_CREDIT_DEBIT").otherwise(
when(isUnMatchedCREDIT(df("credit_account_id")),"UNMATCHED_CREDIT").otherwise(
when(isUnMatchedDEBIT(df("debit_account_id")),"UNMATCHED_DEBIT").otherwise("INVALID_MATCHING_TYPE")
)
)
)
)

基本上,我想创建另一个UDF作为isMatchedCREDITDEBIT(),它接受两列credit_account_iddebit_account_id,如果两个值都相等,则UDF应返回true,否则为false。简而言之,我想为以下逻辑创建一个UDF

when(df("credit_account_id") === df("debit_account_id"),"MATCHING_CREDIT_DEBIT")

我已经尝试过这个,但它抛出编译类型错误:

val isMatchedCREDITDEBIT()= udf[Boolean, String,String](credit_account_id => {
credit_account_id == debit_account_id 
})

有人可以帮助我吗?

您可以创建一个包含两列的udf,并像这样执行逻辑:

val isMatchedCREDITDEBIT = udf((credit_account_id: String, debit_account_id: String) => {
credit_account_id == debit_account_id
})

可以在when子句中调用

when(isMatchedCREDITDEBIT(df("credit_account_id"), df("debit_account_id")), "MATCHING_CREDIT_DEBIT")

但是,为在两列上执行的所有逻辑创建单个udf会更容易。下面的udf将两列都作为输入,并返回所需的字符串,而不是布尔值。

val isMatchedCREDITDEBIT = udf((credit_account_id: String, debit_account_id: String) => {
if(credit_account_id == null){
"UNMATCHED_CREDIT"
} else if (debit_account_id == null){
"UNMATCHED_DEBIT"
} else if (credit_account_id == debit_account_id){
"MATCHING_CREDIT_DEBIT"
} else {
"INVALID_MATCHING_TYPE"
}
})
val caseDF = df.withColumn("matching_type", 
isMatchedCREDITDEBIT(df("credit_account_id"), df("debit_account_id")))

相关内容

  • 没有找到相关文章

最新更新