我有两个文件/表,如下所示
File1:
101,10,20
102,30,40
103,50,60
和
File2:
101,10,20
104,70,80
103,50,55
在比较两个文件后,我需要创建新文件为:
File3:
102,30,40,D
104,70,80,I
103,50,55,U
其中D
为"已删除",I
为"已插入",U
为"已更新"。
我尝试过RDD subtract
和SparkSQL,但在Spark 1.x中对子查询有限制。
我想你正在寻找类似下面的东西。 这里我有两个数据帧 df1 和 df2。 DF1 是具有键列 A1 的主数据集,它正在与具有键列 B1 的辅助数据集 DF2 进行比较。 因此,如果键的字段 A2,A3 和 B2,B3 相同,则忽略这些记录。
- 如果键存在于 df1 中而不是 df2 中,则记录将标记为 D。
- 如果键存在于 df2 中而不是 df1 中,则记录将标记为 I。
- 如果键同时存在于 DF1 和 DF2 中,但值字段不同,则记录将标记为 U。
下面是代码片段。
from pyspark.sql.functions import udf
from pyspark.sql.types import *
from pyspark.sql import functions as F
input1 = [[101,10,20], [102,30,40], [103,50,60]]
input2 = [[101,10,20], [104,70,80], [103,50,55]]
df1 = sc.parallelize(input1).toDF(schema= StructType([StructField("a1", IntegerType(), False),StructField("a2", IntegerType(), False),StructField("a3", IntegerType(), False)]))
df2 = sc.parallelize(input2).toDF(schema=StructType([StructField("b1", IntegerType(), False),StructField("b2", IntegerType(), False),StructField("b3", IntegerType(), False)]))
joindf = df1.join(df2, [df1.a1 == df2.b1], 'outer').filter(((df1.a2 != df2.b2) | (df1.a3 != df2.b3)) | df1.a1.isNull() | df2.b1.isNull())
def check_row(a1, b1):
if not a1:
return 'D'
elif not b1:
return 'I'
else:
return 'U'
flagger = udf(check_row)
joindf.withColumn("flag", flagger(joindf.a1, joindf.b1)).select(F.when(joindf.a1.isNull(), joindf.b1).otherwise(joindf.a1).alias('a1'),F.when(joindf.a2.isNull(), joindf.b2).otherwise(joindf.a2).alias('a2'),F.when(joindf.a3.isNull(), joindf.b3).otherwise(joindf.a3).alias('a3'),'flag').show()
+---+---+---+----+
| a1| a2| a3|flag|
+---+---+---+----+
|103| 50| 60| U|
|102| 30| 40| I|
|104| 70| 80| D|
+---+---+---+----+
或者,如果您更喜欢Spark-SQL,请使用以下代码片段。
sqlContext.registerDataFrameAsTable(df1, 'df1')
sqlContext.registerDataFrameAsTable(df2, 'df2')
sqlContext.sql("""
SELECT
CASE WHEN a1 IS NULL THEN b1 ELSE a1 END as c1,
CASE WHEN a2 IS NULL THEN b2 ELSE a1 END as c2,
CASE WHEN a3 IS NULL THEN b3 ELSE a1 END as c3,
CASE
WHEN a1 IS NULL THEN 'I'
WHEN b1 is NULL THEN 'D'
ELSE 'U' END as flag
FROM df1 FULL OUTER JOIN df2 ON df1.a1 = df2.b1
WHERE (df1.a2 <> df2.b2 or df1.a3 <> df2.b3) or (df1.a1 is null) or (df2.b1 is null)
""").show()
+---+---+---+----+
| c1| c2| c3|flag|
+---+---+---+----+
|103|103|103| U|
|102|102|102| D|
|104| 70| 80| I|
+---+---+---+----+
一个可能的想法是使用 keyBy 函数按您的偏置键对两个 RDD 进行分组,然后应用不同的操作来计算 D、I、U。
- D :使用减法键函数计算文件1中的元素,而不是文件2中的元素;
- I :相同的减去键函数来计算文件2中的元素而不是文件1中的元素;
- U :使用连接函数计算文件1和文件2之间的通信元素(键!
请记住,连接函数将与 commun 中的关键元素一起运行。所以在你的例子中(101,10,20)也将是连接的结果。您必须筛选此结果才能仅获取包含更改的键。
我认为我们可能需要稍微更改 spark sql 的代码以包含更新条件。
sqlContext.sql("""
SELECT
CASE when a1 IS NULL then b1 ELSE a1 END as c1,
CASE when a2 IS NULL then b2
when a1 = b1 then b2
else a2 END as c2,
CASE when a3 IS NULL then b3
when a1 = b1 then b3
else a3 END as c3,
CASE
when a1 IS NULL then 'I'
when b1 is NULL then 'D'
ELSE 'U' END as flag
FROM df1 FULL OUTER JOIN df2 ON df1.a1 = df2.b1
WHERE (df1.a2 <> df2.b2 or df1.a3 <> df2.b3) or (df1.a1 is null) or (df2.b1 is null)
""").show()