在Scala中组合两个RDD



第一个RDDuser_person,是一个Hive表,记录每个人的信息:

+---------+---+----+
|person_id|age| bmi|
+---------+---+----+
|     -100|  1|null|
|        3|  4|null|
...

下面是我的第二个RDD,一个只有40行并且只包括基本信息的Hive表:

| id|startage|endage|energy|    
|  1|       0|   0.2|     1| 
|  1|       2|    10|     3| 
|  1|      10|    20|     5| 

我想根据每一行的年龄范围来计算每个人的能量需求。

例如,一个人的年龄是4岁,因此需要3种能量。我想将该信息添加到RDDuser_person中。

我该怎么做?

首先,使用enableHiveSupport()初始化spark会话,并将Hive配置文件(Hive-site.xml、core-site.xml和hdfs-site.xml(复制到spark/conf/目录,以使spark能够从Hive中读取。

val sparkSession = SparkSession.builder()
.appName("spark-scala-read-and-write-from-hive")
.config("hive.metastore.warehouse.dir", params.hiveHost + "user/hive/warehouse")
.enableHiveSupport()
.getOrCreate()

将配置单元表作为数据帧读取,如下所示:

val personDF= spark.sql("SELECT * from user_person")
val infoDF = spark.sql("SELECT * from person_info")

使用以下表达式连接这两个数据帧:

val outputDF = personDF.join(infoDF, $"age" >= $"startage" && $"age" < $"endage")

outputDF数据帧包含输入数据帧的所有列。

最新更新