How to join two RDDs on a common field



I am new to Scala and am learning to work with RDDs. I have two CSV files with the following headers and data:

csv1.txt

id,"location", "zipcode" 
1, "a", "12345"
2, "b", "67890"
3, "c" "54321"

csv2.txt

"location_x", "location_y", trip_hrs
"a", "b", 1
"a", "c", 3
"b", "c", 2
"a", "b", 1
"c", "b", 2

Basically, the csv1 data is a set of distinct locations and zip codes, while the csv2 data holds trip durations between location_x and location_y.

The information common to both data sets is location in csv1 and location_x in csv2, even though the two columns have different header names.

I want to create two RDDs, one holding the data from csv1 and the other the data from csv2.

I then want to join the two RDDs and return, for each location, the location, its zip code, and the sum of all trip hours, like this:

("a", "zipcode", 5)
("b", "zipcode", 2)
("c", "zipcode", 2)

I was wondering if any of you could help me with this. Thanks.

I will give you the code (a complete application in IntelliJ) and some explanation. I hope it helps.

For precise details, please read the Spark documentation.

Working with key-value pairs

This problem can also be solved with Spark DataFrames; you can try that yourself.
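The DataFrame route is left as an exercise in the answer; below is only a rough, untested sketch of what it could look like. It assumes the same `spark` session and `path` value defined in the full RDD application further down, the same hypothetical file names join1.csv/join2.csv, and that the CSV headers parse into clean column names (id, location, zipcode and location_x, location_y, trip_hrs).

import org.apache.spark.sql.functions.{col, sum}

// Rough sketch only: file names and column names are assumptions based on the question.
val locationsDF = spark.read.option("header", "true").option("inferSchema", "true").csv(s"${path}join1.csv")
val tripsDF     = spark.read.option("header", "true").option("inferSchema", "true").csv(s"${path}join2.csv")

val resultDF = tripsDF
  .groupBy(col("location_x"))                                // one row per starting location
  .agg(sum(col("trip_hrs")).as("total_hrs"))                 // sum all trip hours per location
  .join(locationsDF, col("location_x") === col("location"))  // join on the common location field
  .select(col("location"), col("zipcode"), col("total_hrs"))

resultDF.show()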

import org.apache.log4j.{Level, Logger}
import org.apache.spark.sql.SparkSession

object Joining {

  val spark = SparkSession
    .builder()
    .appName("Joining")
    .master("local[*]")
    .config("spark.sql.shuffle.partitions", "4") // Change to a more reasonable default number of partitions for our data
    .config("spark.app.id", "Joining")           // To silence Metrics warning
    .getOrCreate()

  val sc = spark.sparkContext
  val path = "/home/cloudera/files/tests/"

  def main(args: Array[String]): Unit = {

    Logger.getRootLogger.setLevel(Level.ERROR)

    try {
      // read the files
      val file1 = sc.textFile(s"${path}join1.csv")
      val header1 = file1.first // extract the header of the file
      val file2 = sc.textFile(s"${path}join2.csv")
      val header2 = file2.first // extract the header of the file

      val rdd1 = file1
        .filter(line => line != header1)        // to leave out the header
        .map(line => line.split(","))           // split the lines => Array[String]
        .map(arr => (arr(1).trim, arr(2).trim)) // to make up a pairRDD with arr(1) as key and zipcode as value

      val rdd2 = file2
        .filter(line => line != header2)              // to leave out the header
        .map(line => line.split(","))                 // split the lines => Array[String]
        .map(arr => (arr(0).trim, arr(2).trim.toInt)) // to make up a pairRDD with arr(0) as key and trip_hrs as value

      val joined = rdd1 // join the pairRDDs by their keys
        .join(rdd2)
        .cache()        // cache joined in memory

      joined.foreach(println) // checking data
      println("**************")
      //      ("c",("54321",2))
      //      ("b",("67890",2))
      //      ("a",("12345",1))
      //      ("a",("12345",3))
      //      ("a",("12345",1))

      val result = joined.reduceByKey({ case ((zip, time), (zip1, time1)) => (zip, time + time1) })

      result.map({ case (id, (zip, time)) => (id, zip, time) }).foreach(println) // checking output
      //      ("b","67890",2)
      //      ("c","54321",2)
      //      ("a","12345",5)

      // To have the opportunity to view the web console of Spark: http://localhost:4041/
      println("Type whatever to the console to exit......")
      scala.io.StdIn.readLine()
    } finally {
      sc.stop()
      println("SparkContext stopped")
      spark.stop()
      println("SparkSession stopped")
    }
  }
}

If you can already read the CSV files into RDDs, you can summarize the trips and then join them with the locations:

val tripsSummarized = trips
  .map({ case (location, _, hours) => (location, hours) })
  .reduceByKey((hoursTotal, hoursIncrement) => hoursTotal + hoursIncrement)

val result = locations
  .map({ case (_, location, zipCode) => (location, zipCode) })
  .join(tripsSummarized)
  .map({ case (location, (zipCode, hoursTotal)) => (location, zipCode, hoursTotal) })
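The snippet above assumes that `locations` and `trips` already exist as RDDs of tuples. As a rough sketch (my assumption, not part of the answer), they could be built from the question's files roughly like this, stripping quotes naively, which is enough for this sample data but not for CSV in general:

val locFile = sc.textFile("csv1.txt")
val locHeader = locFile.first
val locations = locFile
  .filter(_ != locHeader)                              // drop the header line
  .map(_.split(",").map(_.trim.replaceAll("\"", "")))  // split and strip quotes naively
  .map(arr => (arr(0), arr(1), arr(2)))                // (id, location, zipcode)

val tripFile = sc.textFile("csv2.txt")
val tripHeader = tripFile.first
val trips = tripFile
  .filter(_ != tripHeader)                             // drop the header line
  .map(_.split(",").map(_.trim.replaceAll("\"", "")))  // split and strip quotes naively
  .map(arr => (arr(0), arr(1), arr(2).toInt))          // (location_x, location_y, trip_hrs)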

If you also need locations that have no trips, you can use leftOuterJoin.
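A sketch of that variant, reusing `locations` and `tripsSummarized` from above; defaulting to 0 hours for locations with no trips is my own choice, not something stated in the answer:

val resultWithAllLocations = locations
  .map({ case (_, location, zipCode) => (location, zipCode) })
  .leftOuterJoin(tripsSummarized)   // the trip side becomes Option[Int] for locations without trips
  .map({ case (location, (zipCode, hoursOpt)) => (location, zipCode, hoursOpt.getOrElse(0)) })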
