Create a third table from two tables using Spark-SQL or PySpark (without using Pandas (Python))



I am trying to create a third table from two existing tables using Spark-SQL or PySpark (without using Pandas (Python)).

Dataframe One:

+---------+---------+------------+-----------+
|  NAME   | NAME_ID |   CLIENT   | CLIENT_ID |
+---------+---------+------------+-----------+
| RISHABH |       1 | SINGH      |         5 |
| RISHABH |       1 | PATHAK     |         3 |
| RISHABH |       1 | KUMAR      |         2 |
| KEDAR   |       2 | PATHAK     |         3 |
| KEDAR   |       2 | JADHAV     |         1 |
| ANKIT   |       3 | SRIVASTAVA |         6 |
| ANKIT   |       3 | KUMAR      |         2 |
| SUMIT   |       4 | SINGH      |         5 |
| SUMIT   |       4 | SHARMA     |         4 |
+---------+---------+------------+-----------+

Dataframe Two:

+---------+---------+------------+-----------+
|  NAME   | NAME_ID |   CLIENT   | CLIENT_ID |
+---------+---------+------------+-----------+
| RISHBAH | _____   | SRIVASTAVA | _____     |
| KEDAR   | _____   | KUMAR      | _____     |
| RISHABH | _____   | SINGH      | _____     |
| KEDAR   | _____   | PATHAK     | _____     |
+---------+---------+------------+-----------+

Required Dataframe Output:

+---------+---------+------------+-----------+
| NAME    | NAME_ID | CLIENT     | CLIENT_ID |
+---------+---------+------------+-----------+
| RISHBAH | 1       | SRIVASTAVA | 6         |
| KEDAR   | 2       | KUMAR      | 2         |
| RISHABH | 1       | SINGH      | 5         |
| KEDAR   | 2       | PATHAK     | 3         |
+---------+---------+------------+-----------+

Using Spark-SQL or PySpark.

I tried df1.join(df2, df1.NAME == df2.NAME, "left"), but I am not getting the required output.

I would suggest the following spark-sql approach:

val df1 = <assuming data loaded>
val df2 = <assuming data loaded>
// create views on top of the dataframes
df1.createOrReplaceTempView("tbl1")
df2.createOrReplaceTempView("tbl2")
// extract the unique names and nameIds from the first df
val uniqueNameDF = sparkSession.sql("select distinct name, name_id from tbl1")
// extract the unique client names and clientIds
val uniqueClientDF = sparkSession.sql("select distinct client, client_id from tbl1")
// create views on these intermediate results
uniqueNameDF.createOrReplaceTempView("name")
uniqueClientDF.createOrReplaceTempView("client")
// join the above views with tbl2 to get the desired result
val resultDF = sparkSession.sql("select n.name, n.name_id, c.client, c.client_id from tbl2 join name n on tbl2.name = n.name join client c on tbl2.client = c.client")
# FROM DATAFRAME ONE AS df_with_key
# SPLIT OUT DISTINCT BY NAME AND CLIENT
nameDF=df_with_key.select("NAME","NAME_ID").distinct()
clientDF=df_with_key.select("CLIENT","CLIENT_ID").distinct()
# DATAFRAME TWO AS df_with_client
+-------+-------+----------+---------+   
|   NAME|NAME_ID|    CLIENT|CLIENT_ID|
+-------+-------+----------+---------+
|  KEDAR|   null|     KUMAR|     null|  
|  KEDAR|   null|    PATHAK|     null|   
|RISHABH|   null|     SINGH|     null|    
|RISHBAH|   null|SRIVASTAVA|     null|   
+-------+-------+----------+---------+
# NOW JOIN FIRST WITH NAME AND THEN CLIENT
result = (df_with_client.drop("NAME_ID")
          .join(nameDF, nameDF.NAME == df_with_client.NAME, "LEFT")
          .drop(nameDF.NAME)
          .drop("CLIENT_ID")
          .join(clientDF, df_with_client.CLIENT == clientDF.CLIENT)
          .drop(clientDF.CLIENT)
          .select("NAME", "NAME_ID", "CLIENT", "CLIENT_ID"))
result.show()
+-------+-------+----------+---------+
|   NAME|NAME_ID|    CLIENT|CLIENT_ID|
+-------+-------+----------+---------+
|  KEDAR|      2|     KUMAR|        2|
|  KEDAR|      2|    PATHAK|        3|
|RISHABH|      1|     SINGH|        5|
|RISHBAH|   null|SRIVASTAVA|        6|
+-------+-------+----------+---------+
