Joining two data files in HDFS with Spark



I have two datasets that were partitioned using the same partitioner and stored in HDFS. These datasets are the output of two different Spark jobs over which we have no control. Now I want to join the two datasets to produce different information.

Example:
Data Set 1:
ORDER_ID  CUSTOMER_ID ITEMS
OD1        C1          1,2,3   -> partition 0
OD2        C2          3,4,5   -> partition 0
OD3        C4          1,2,3   -> partition 1
OD4        C3          1,3     -> partition 1
Data Set 2:
ORDER_ID  CUSTOMER_ID  REFUND_ITEMS
OD1        C1          1     -> partition 0
OD2        C2          5     -> partition 0
OD3        C4          2,3   -> partition 1
OD4        C3          3     -> partition 1
Options are:
1) Create two RDDs from the datasets and join them.
2) Create one RDD using one of the datasets.
   -> For each partition in the RDD, get the actual partition id, i.e. OD1 -> 0, OD3 -> 1 (using some custom logic)
   -> Load the data for that partition of dataset 2 from HDFS
   -> Iterate over both datasets and produce the combined result.
For option 2 I don't know how to read a specific file from HDFS inside a Spark executor. (I have the full URI for the location of the file)
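For option 2, one common approach is to open the matching dataset-2 file directly from executor code using the Hadoop FileSystem API inside mapPartitionsWithIndex. The sketch below assumes dataset 1 is already loaded as an RDD of ((ORDER_ID, CUSTOMER_ID), ITEMS) pairs, and that the helper partitionFileUri (hypothetical, along with the namenode address in it) maps a partition id to the corresponding dataset-2 file:

```scala
import org.apache.hadoop.conf.Configuration
import org.apache.hadoop.fs.{FileSystem, Path}
import scala.io.Source

// Hypothetical helper: maps a partition id to the dataset-2 file URI for
// that partition. Replace with your own partition-to-file logic.
def partitionFileUri(partitionId: Int): String =
  f"hdfs://namenode:8020/data/ds2/part-$partitionId%05d"

// dataset1Rdd: RDD[((String, String), String)] keyed by (ORDER_ID, CUSTOMER_ID)
val joined = dataset1Rdd.mapPartitionsWithIndex { (partitionId, rows) =>
  // This closure runs on the executor: open the dataset-2 file that
  // corresponds to this partition via the Hadoop FileSystem API.
  val path = new Path(partitionFileUri(partitionId))
  val fs = path.getFileSystem(new Configuration())
  val stream = fs.open(path)
  val ds2ByKey = Source.fromInputStream(stream).getLines()
    .map(_.split("\t"))
    .map(cols => ((cols(0), cols(1)), cols(2))) // key by (ORDER_ID, CUSTOMER_ID)
    .toMap
  stream.close()
  // Join this partition of dataset 1 against the in-memory map of dataset 2.
  rows.flatMap { case (key, items) =>
    ds2ByKey.get(key).map(refundItems => (key, items, refundItems))
  }
}
```

Note that `new Configuration()` on the executor picks up whatever Hadoop config is on the classpath; if your cluster config is not there, you may need to broadcast the relevant settings. This also assumes each dataset-2 partition file fits in executor memory.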

You can try creating two dataframes and joining them using SQL. Please find the code below.

// For implicit conversions from RDDs to DataFrames
import spark.implicits._

case class struc_dataset(ORDER_ID: String, CUSTOMER_ID: String, ITEMS: String)
case class struc_refunds(ORDER_ID: String, CUSTOMER_ID: String, REFUND_ITEMS: String)

// Read file1 (tab-separated: ORDER_ID, CUSTOMER_ID, ITEMS)
val File1DF = spark.sparkContext
   .textFile("temp/src/file1.txt")
   .map(_.split("\t"))
   .map(attributes => struc_dataset(attributes(0), attributes(1), attributes(2))).toDF()
// Register as temp view - Dataset1
File1DF.createOrReplaceTempView("Dataset1")

// Read file2 (tab-separated: ORDER_ID, CUSTOMER_ID, REFUND_ITEMS)
val File2DF = spark.sparkContext
   .textFile("temp/src/file2.txt")
   .map(_.split("\t"))
   .map(attributes => struc_refunds(attributes(0), attributes(1), attributes(2))).toDF()
// Register as temp view - Dataset2
File2DF.createOrReplaceTempView("Dataset2")

// SQL statement to create the final dataframe (JOIN); selecting explicit
// columns avoids duplicate ORDER_ID/CUSTOMER_ID columns in the result
val finalDF = spark.sql("""
  SELECT ds1.ORDER_ID, ds1.CUSTOMER_ID, ds1.ITEMS, ds2.REFUND_ITEMS
  FROM Dataset1 ds1
  JOIN Dataset2 ds2
    ON ds1.ORDER_ID = ds2.ORDER_ID AND ds1.CUSTOMER_ID = ds2.CUSTOMER_ID""")
finalDF.show()
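Equivalently, the same join can be written with the DataFrame API; passing the join keys as a Seq of column names keeps a single copy of ORDER_ID and CUSTOMER_ID in the output (a sketch assuming the File1DF and File2DF dataframes above):

```scala
// Inner join on the shared key columns; the result has one ORDER_ID and
// one CUSTOMER_ID column plus ITEMS and REFUND_ITEMS.
val joinedDF = File1DF.join(File2DF, Seq("ORDER_ID", "CUSTOMER_ID"), "inner")
joinedDF.show()
```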
