I have two dataframes and I want to know whether they differ, using one column as the key, and if they do, update one from the other.
val TMP_SITE = spark.load("jdbc", Map("url" -> "jdbc:oracle:thin:System/maher@//localhost:1521/XE", "dbtable" -> "IPTECH.TMP_SITE"))
.withColumn("SITE",'SITE.cast(LongType))
val local_pos = spark.load("jdbc", Map("url" -> url, "dbtable" -> "pos")).select("id","name")
TMP_SITE.printSchema()
local_pos.printSchema()
val join = TMP_SITE.join(local_pos, 'SITE === 'id, "inner")
root
|-- SITE: long (nullable = true)
|-- LIBELLE: string (nullable = false)
root
|-- id: long (nullable = false)
|-- name: string (nullable = true)
The result of the join is:
+---+----------------------+----+----------------------+
|id |name |SITE|LIBELLE |
+---+----------------------+----+----------------------+
|51 |Ezzahra |51 |Ezzahra |
|7 |BENIKHALLED |7 |BENIKHALLED |
|15 |Kram |15 |Kram |
|54 |El Mourouj |54 |El Mourouj |
|11 |LE BARDO |11 |LE BARDO |
|29 |Mini M Ksar said |29 |Mini M Ksar said |
|69 |ZAGHOUAN |69 |ZAGHOUAN |
|42 |BEB EL KHADHRA |42 |BEB EL KHADHRA |
|73 |Zaouit Kontech |73 |Zaouit Kontech |
|87 |Aouina |87 |Aouina |
|64 |Sousse I I |64 |Sousse I I |
|3 |SAHRA CONFORT : KORBA |3 |SAHRA CONFORT : KORBA |
|34 |SOUKRA SQUARE |34 |SOUKRA SQUARE |
|59 |SAHRA CONFORT : ZARZIS|59 |SAHRA CONFORT : ZARZIS|
|8 |Jerba |8 |Jerba |
|22 |Moknine |22 |Moknine |
|28 |RDAYEF |28 |RDAYEF |
|85 |MONASTIR ABSORBA |85 |MONASTIR ABSORBA |
|16 |BARDO HANAYA |16 |BARDO HANAYA |
|35 |Mini M Agba |35 |Mini M Agba |
+---+----------------------+----+----------------------+
I did this:
val temp = join.withColumn("changes", when($"LIBELLE" === $"name", lit("nothing")).otherwise("need an update"))
and I got this:
+---+----------------------+----+----------------------+--------------+
|id |name |SITE|LIBELLE |changes |
+---+----------------------+----+----------------------+--------------+
|51 |Ezzahra |51 |Ezzahra |nothing |
|7 |BENIKHALLED |7 |BENIKHALLED |nothing |
|15 |Kram |15 |Kram |nothing |
|54 |El Mourouj |54 |El Mourouj |nothing |
|11 |LE BARDO |11 |LE BARDO |nothing |
|29 |Mini M Ksar said |29 |Mini M Ksar said |nothing |
|69 |ZAGHOUAN |69 |ZAGHOUAN |nothing |
|42 |BEB EL KHADHRA |42 |BEB EL KHADHRA |nothing |
|73 |Zaouit Kontech |73 |Zaouit Kontech |need an update|
|87 |Aouina |87 |Aouina |nothing |
|64 |Sousse I I |64 |Sousse I I |nothing |
|3 |SAHRA CONFORT : KORBA |3 |SAHRA CONFORT : KORBA |nothing |
|34 |SOUKRA SQUARE |34 |SOUKRA SQUARE |nothing |
|59 |SAHRA CONFORT : ZARZIS|59 |SAHRA CONFORT : ZARZIS|nothing |
|8 |Jerba |8 |Jerba |nothing |
|22 |Moknine |22 |Moknine |need an update|
|28 |RDAYEF |28 |RDAYEF |nothing |
|85 |MONASTIR ABSORBA |85 |MONASTIR ABSORBA |nothing |
|16 |BARDO HANAYA |16 |BARDO HANAYA |nothing |
|35 |Mini M Agba |35 |Mini M Agba |nothing |
+---+----------------------+----+----------------------+--------------+
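I don't understand why it says "need an update" for some rows even though the values are the same. It should say "nothing" for every row, since they are all equal.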
Once you have a dataframe, it is all about working with columns and rows. So once you have the join dataframe as
+----+---------------------+----+---------------------+
|SITE|LIBELLE |id |name |
+----+---------------------+----+---------------------+
|48 |Mini M Boumhel |48 |Mini M Boumhel |
|67 |Lac |67 |Lac |
|992 |test2 |992 |test |
|44 |KAIROUAN |44 |KAIROUAN |
|61 |Tunis |61 |Tunis |
|9001|MONOPRIX |9001|MONOPRIX |
|3 |SAHRA CONFORT : KORBA|3 |SAHRA CONFORT : KORBA|
|37 |Mini M Borj Lozir |37 |Mini M Borj Lozir |
|83 |Jendouba |83 |Jendouba |
|12 |Bigro |12 |Bigro |
+----+---------------------+----+---------------------+
you can create another column with the logic you wrote, using the when function:
import org.apache.spark.sql.functions._
val temp = join.withColumn("changes", when($"LIBELLE" === $"name", lit("nothing")).otherwise("need an update"))
The temp dataframe will be
+----+---------------------+----+---------------------+--------------+
|SITE|LIBELLE |id |name |changes |
+----+---------------------+----+---------------------+--------------+
|48 |Mini M Boumhel |48 |Mini M Boumhel |nothing |
|67 |Lac |67 |Lac |nothing |
|992 |test2 |992 |test |need an update|
|44 |KAIROUAN |44 |KAIROUAN |nothing |
|61 |Tunis |61 |Tunis |nothing |
|9001|MONOPRIX |9001|MONOPRIX |nothing |
|3 |SAHRA CONFORT : KORBA|3 |SAHRA CONFORT : KORBA|nothing |
|37 |Mini M Borj Lozir |37 |Mini M Borj Lozir |nothing |
|83 |Jendouba |83 |Jendouba |nothing |
|12 |Bigro |12 |Bigro |nothing |
+----+---------------------+----+---------------------+--------------+
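To try the step above without a JDBC connection, here is a minimal sketch that rebuilds a few of the rows shown above in memory. The names tmpSite, localPos, joined and flagged, the local master setting, and the app name are assumptions made for illustration; they are not part of the original code.

```scala
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.functions.{lit, when}

// Local session for experimenting; in spark-shell this returns the existing session.
val spark = SparkSession.builder().master("local[*]").appName("diff-sketch").getOrCreate()
import spark.implicits._

// In-memory stand-ins for the two JDBC tables (hypothetical names, rows taken from the output above).
val tmpSite  = Seq((67L, "Lac"), (992L, "test2"), (61L, "Tunis")).toDF("SITE", "LIBELLE")
val localPos = Seq((67L, "Lac"), (992L, "test"), (61L, "Tunis")).toDF("id", "name")

// Same join and same when/otherwise logic as in the question.
val joined = tmpSite.join(localPos, $"SITE" === $"id", "inner")
val flagged = joined.withColumn(
  "changes",
  when($"LIBELLE" === $"name", lit("nothing")).otherwise(lit("need an update"))
)

flagged.show(false)
```

Note that name is nullable in the schema above; with ===, a null on either side makes the condition null, so such a row falls through to the otherwise branch.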
Now you can use the filter method on the dataframe:
temp.filter($"changes" === "need an update").show(false)
which should give you
+----+-------+---+----+--------------+
|SITE|LIBELLE|id |name|changes |
+----+-------+---+----+--------------+
|992 |test2 |992|test|need an update|
+----+-------+---+----+--------------+
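If rows are flagged even though the two names look identical in the printed table, one possible cause (an assumption, the question does not confirm it) is a difference that show() makes hard to see, such as trailing whitespace. The sketch below uses hypothetical data with a trailing space in one LIBELLE value, adds length columns to expose the difference, and compares trimmed values instead.

```scala
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.functions.{length, lit, trim, when}

val spark = SparkSession.builder().master("local[*]").appName("diff-diagnose").getOrCreate()
import spark.implicits._

// Hypothetical joined rows: "Moknine " has a trailing space, which is easy to miss in printed output.
val joined = Seq(
  (22L, "Moknine ", 22L, "Moknine"),
  (8L, "Jerba", 8L, "Jerba")
).toDF("SITE", "LIBELLE", "id", "name")

val inspected = joined
  .withColumn("libelle_len", length($"LIBELLE")) // length counts trailing spaces
  .withColumn("name_len", length($"name"))
  .withColumn(
    "changes",
    // Compare after trimming leading and trailing spaces on both sides.
    when(trim($"LIBELLE") === trim($"name"), lit("nothing")).otherwise(lit("need an update"))
  )

inspected.show(false)
```

If the lengths match and the rows are still flagged, the difference lies elsewhere (for example in case or in non-space invisible characters), and trim alone will not remove it.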
You just need to use select, groupBy, aggregations, filters and other built-in functions, or udf functions and so on. You can even convert to rdd and tuples, as in the example.