我有多个从 CSV 文件加载的 DataFrame,想根据某一列把它们连接(join)起来,下面是我目前的做法。我希望把这个过程写得更通用,并使其自动化。
// Local Spark session used by this example.
val spark = SparkSession.builder.master("local").appName("my-spark-app").getOrCreate()
import spark.sqlContext.implicits._

// Reads one CSV file with a fresh reader: schema inference on, first row used as header.
def loadCsv(path: String) =
  spark.read.option("inferSchema", "true").option("header", "true").csv(path)

val df1 = loadCsv("C:/Users/mhattabi/Desktop/dataTestCsvFile/dataTest1.txt")
val df2 = loadCsv("C:/Users/mhattabi/Desktop/dataTestCsvFile/dataTest2.txt")
val df3 = loadCsv("C:/Users/mhattabi/Desktop/dataTestCsvFile/dataTest3.txt")

// Print every input before joining, in load order.
Seq(df1, df2, df3).foreach(_.show())

// Outer join on the "time" column, folding left to right: (df1 join df2) join df3.
val df = Seq(df2, df3).foldLeft(df1) { (joined, next) =>
  joined.join(next, Seq("time"), joinType = "outer")
}
df.show()
问题是它只连接了其中两个 DataFrame,而不是全部:结果里只有两个被连接在一起。谢谢。
以下是根据答案得到的解决方案:
// Reads one CSV file with a fresh reader: schema inference on, first row used as header.
def readWithHeader(path: String) =
  spark.read.option("inferSchema", "true").option("header", "true").csv(path)

val df1 = readWithHeader("C:/Users/mhattabi/Desktop/dataTestCsvFile/dataTest1.txt")
val df2 = readWithHeader("C:/Users/mhattabi/Desktop/dataTestCsvFile/dataTest2.txt")
val df3 = readWithHeader("C:/Users/mhattabi/Desktop/dataTestCsvFile/dataTest3.txt")

// Combine all three inputs through the recursive join helper and print the result.
val df_result = recursiveJoinOnDate(df1 :: df2 :: df3 :: Nil)
df_result.show()
}
/**
 * Outer-joins every DataFrame in `list` on the join key passed as the
 * back-quoted column name `time.1`.
 *
 * The joins nest to the right: the first element is joined with the result of
 * joining the remaining elements, i.e. `a.join(b.join(c))` for `List(a, b, c)`.
 *
 * @param list the DataFrames to combine
 * @return `null` for an empty list, the sole element for a one-element list,
 *         otherwise the joined DataFrame
 */
def recursiveJoinOnDate(list: List[DataFrame]): DataFrame =
  // Matching on the list shape avoids calling `size` (a full traversal of a
  // List) at every recursion level.
  list match {
    // NOTE(review): `null` is kept so callers see the same result as before for
    // an empty list; consider an Option-returning variant for new code.
    case Nil => null
    case only :: Nil => only
    case first :: rest =>
      // The key is back-quoted, presumably because the column name itself
      // contains a dot -- confirm against the input schema.
      first.join(recursiveJoinOnDate(rest), Seq("`time.1`"), joinType = "outer")
  }
}