How to generate a new DataFrame in Spark with Scala using a JSON mapping file



I have two DataFrames, DF1 and DF2, and a JSON file that I need to use as a mapping file to create another DataFrame (DF3).

DF1:

+-------+-------+-------+
|column1|column2|column3|
+-------+-------+-------+
|    100|   John| Mumbai|
|    101|   Alex|  Delhi|
|    104|  Divas|Kolkata|
|    108|  Jerry|Chennai|
+-------+-------+-------+

DF2:

+-------+-----------+-------+
|column4|    column5|column6|
+-------+-----------+-------+
|     S1|        New|    xxx|
|     S2|        Old|    yyy|
|     S5|replacement|    zzz|
|    S10|        New|    ppp|
+-------+-----------+-------+

In addition, I have a mapping file in JSON format that will be used to generate DF3.

Here is the JSON mapping file:

{"targetColumn":"newColumn1","sourceField1":"column2","sourceField2":"column4"}
{"targetColumn":"newColumn2","sourceField1":"column7","sourceField2":"column5"}
{"targetColumn":"newColumn3","sourceField1":"column8","sourceField2":"column6"}

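So, using this JSON file, I need to create DF3 with the columns given in the targetColumn part of each mapping. For each mapping, it should check whether the source column sourceField1 exists in DF1: if it does, map it from DF1; otherwise, map sourceField2 from DF2.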
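The lookup rule described above can be sketched in plain Scala, independent of Spark. This is a minimal sketch that assumes the mapping entries are already loaded into memory; Mapping, resolveSource and ResolveSourceDemo are hypothetical names used only for illustration.

```scala
// Hypothetical model of one line of the JSON mapping file
case class Mapping(targetColumn: String, sourceField1: String, sourceField2: String)

object ResolveSourceDemo {
  // Use sourceField1 if DF1 has that column, otherwise fall back to sourceField2 (from DF2)
  def resolveSource(m: Mapping, df1Columns: Set[String]): String =
    if (df1Columns.contains(m.sourceField1)) m.sourceField1 else m.sourceField2

  // Column names of DF1 from the question
  val df1Columns: Set[String] = Set("column1", "column2", "column3")

  // The three mapping lines from the question
  val mappings: List[Mapping] = List(
    Mapping("newColumn1", "column2", "column4"),
    Mapping("newColumn2", "column7", "column5"),
    Mapping("newColumn3", "column8", "column6")
  )

  // targetColumn -> chosen source column, in mapping order
  val resolved: List[(String, String)] =
    mappings.map(m => m.targetColumn -> resolveSource(m, df1Columns))

  def main(args: Array[String]): Unit = resolved.foreach(println)
}
```

Only column2 exists in DF1, so newColumn1 comes from DF1 while newColumn2 and newColumn3 fall back to column5 and column6 from DF2, which matches the expected output.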

Below is the expected output:

+----------+-----------+----------+
|newColumn1| newColumn2|newColumn3|
+----------+-----------+----------+
|      John|        New|       xxx|
|      Alex|        Old|       yyy|
|     Divas|replacement|       zzz|
|     Jerry|        New|       ppp|
+----------+-----------+----------+

Any help here would be appreciated.

Parse the JSON and create a List of the following custom objects:

case class SrcTgtMapping(targetColumn:String,sourceField1:String,sourceField2:String)
val srcTgtMappingList=List(SrcTgtMapping("newColumn1","column2","column4"),SrcTgtMapping("newColumn2","column7","column5"),SrcTgtMapping("newColumn3","column8","column6"))
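The list above is hard-coded. A minimal sketch of building it from the JSON Lines text instead, assuming every line is a flat object whose values are plain strings without escaped quotes (true for the mapping file in the question). The regex-based parseLine helper and ParseMappingDemo are hypothetical; for anything more complex, a real JSON parser is the safer choice.

```scala
case class SrcTgtMapping(targetColumn: String, sourceField1: String, sourceField2: String)

object ParseMappingDemo {
  // Matches "key":"value" pairs; assumes no escaped quotes inside keys or values
  private val Pair = """"(\w+)"\s*:\s*"([^"]*)"""".r

  // Turn one JSON line into a SrcTgtMapping (throws if a key is missing)
  def parseLine(line: String): SrcTgtMapping = {
    val fields = Pair.findAllMatchIn(line).map(m => m.group(1) -> m.group(2)).toMap
    SrcTgtMapping(fields("targetColumn"), fields("sourceField1"), fields("sourceField2"))
  }

  // Contents of the mapping file from the question, one JSON object per line
  val jsonLines: String =
    """{"targetColumn":"newColumn1","sourceField1":"column2","sourceField2":"column4"}
      |{"targetColumn":"newColumn2","sourceField1":"column7","sourceField2":"column5"}
      |{"targetColumn":"newColumn3","sourceField1":"column8","sourceField2":"column6"}""".stripMargin

  // Skip blank lines, parse the rest
  val srcTgtMappingList: List[SrcTgtMapping] =
    jsonLines.split("\n").toList.map(_.trim).filter(_.nonEmpty).map(parseLine)

  def main(args: Array[String]): Unit = srcTgtMappingList.foreach(println)
}
```

Inside a Spark job, the same list can usually be obtained with spark.read.json(path).as[SrcTgtMapping].collect().toList (with import spark.implicits._ in scope), where path points at the mapping file, since the field names match the case class.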

index column的基础上,将伪index column添加到两个dataframes,并加入两个dataframes

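import org.apache.spark.sql.expressions.Window
import org.apache.spark.sql.functions._
// row_number over monotonically_increasing_id gives consecutive indexes 1..n in row order, so both
// DataFrames get matching indexes (monotonically_increasing_id alone is unique but not consecutive)
val indexWindow = Window.orderBy(monotonically_increasing_id())
val df1WithIndex = df1.withColumn("index", row_number().over(indexWindow))
val df2WithIndex = df2.withColumn("index", row_number().over(indexWindow))
val joinedDf = df1WithIndex.join(df2WithIndex, Seq("index"))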
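The index join pairs the i-th row of DF1 with the i-th row of DF2. A plain-Scala sketch of that semantics, using zipWithIndex on hypothetical in-memory rows (Seq[Map[String, String]] stand-ins for the DataFrames, not Spark code), shows that the result is the same as zipping the two sequences by position. It assumes both sides have the same number of rows; with an inner join, indexes present on only one side are dropped.

```scala
object IndexJoinDemo {
  // Hypothetical in-memory stand-ins for the rows of DF1 and DF2
  val df1Rows: Seq[Map[String, String]] = Seq(
    Map("column1" -> "100", "column2" -> "John", "column3" -> "Mumbai"),
    Map("column1" -> "101", "column2" -> "Alex", "column3" -> "Delhi"),
    Map("column1" -> "104", "column2" -> "Divas", "column3" -> "Kolkata"),
    Map("column1" -> "108", "column2" -> "Jerry", "column3" -> "Chennai")
  )
  val df2Rows: Seq[Map[String, String]] = Seq(
    Map("column4" -> "S1", "column5" -> "New", "column6" -> "xxx"),
    Map("column4" -> "S2", "column5" -> "Old", "column6" -> "yyy"),
    Map("column4" -> "S5", "column5" -> "replacement", "column6" -> "zzz"),
    Map("column4" -> "S10", "column5" -> "New", "column6" -> "ppp")
  )

  // Index DF2 rows by position (0-based here, whereas row_number starts at 1)
  val df2ByIndex: Map[Int, Map[String, String]] = df2Rows.zipWithIndex.map(_.swap).toMap

  // Inner join on the index: keep DF1 rows whose index exists in DF2 and merge their columns
  val joined: Seq[Map[String, String]] = df1Rows.zipWithIndex.flatMap { case (row1, i) =>
    df2ByIndex.get(i).map(row2 => row1 ++ row2)
  }

  def main(args: Array[String]): Unit = joined.foreach(println)
}
```

This is why the indexes on both sides must be consecutive and start at the same value: if DF1 and DF2 received different index values, rows would be mismatched or silently dropped by the inner join.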

Build the list of select columns (query) and run it:

// Columns available in DF1 (DF2's column list is not needed for the lookup)
val df1Columns = df1WithIndex.columns.toList
// For each mapping, take sourceField1 if DF1 has it, otherwise sourceField2 from DF2, renamed to targetColumn
val query = srcTgtMappingList.map { stm =>
  val source = if (df1Columns.contains(stm.sourceField1)) stm.sourceField1 else stm.sourceField2
  joinedDf.col(source).alias(stm.targetColumn)
}
val output = joinedDf.select(query: _*)
output.show()
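What joinedDf.select(query: _*) does can be sketched without Spark: each entry of query is essentially a (source column, alias) pair, and the select projects and renames every joined row. This is a minimal sketch over hypothetical in-memory joined rows (SelectDemo and its data are illustrative, not Spark API); rows are kept in their original order here, whereas the Spark join does not guarantee any order.

```scala
object SelectDemo {
  case class SrcTgtMapping(targetColumn: String, sourceField1: String, sourceField2: String)

  val srcTgtMappingList: List[SrcTgtMapping] = List(
    SrcTgtMapping("newColumn1", "column2", "column4"),
    SrcTgtMapping("newColumn2", "column7", "column5"),
    SrcTgtMapping("newColumn3", "column8", "column6")
  )
  val df1Columns: List[String] = List("column1", "column2", "column3")

  // Hypothetical joined rows (column1 and column3 omitted for brevity)
  val joinedRows: Seq[Map[String, String]] = Seq(
    Map("column2" -> "John", "column4" -> "S1", "column5" -> "New", "column6" -> "xxx"),
    Map("column2" -> "Alex", "column4" -> "S2", "column5" -> "Old", "column6" -> "yyy"),
    Map("column2" -> "Divas", "column4" -> "S5", "column5" -> "replacement", "column6" -> "zzz"),
    Map("column2" -> "Jerry", "column4" -> "S10", "column5" -> "New", "column6" -> "ppp")
  )

  // Counterpart of query: (source column, alias) per mapping, in mapping order
  val query: List[(String, String)] = srcTgtMappingList.map { stm =>
    val source = if (df1Columns.contains(stm.sourceField1)) stm.sourceField1 else stm.sourceField2
    source -> stm.targetColumn
  }

  // Counterpart of select(query: _*): project and rename each row, keeping column order
  val output: Seq[List[(String, String)]] =
    joinedRows.map(row => query.map { case (source, alias) => alias -> row(source) })

  def main(args: Array[String]): Unit = output.foreach(println)
}
```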

Sample output (row order is not guaranteed after a join, so it can differ from the expected output):

+----------+-----------+----------+
|newColumn1| newColumn2|newColumn3|
+----------+-----------+----------+
|      John|        New|       xxx|
|      Alex|        Old|       yyy|
|     Jerry|        New|       ppp|
|     Divas|replacement|       zzz|
+----------+-----------+----------+

Hope this approach helps.
