我在Spark中有两个数据框架。我正在做df1.except(df2)
两个查找是否在两个数据框架之间发生了任何更改。
df1在这里
|001000900|aaaaa BELLOWS CORPORATION||N|
|001000905|ddddd DEPARTMENT OF LABOR AND EMPLOYMENT SECURITY|BUREAU OF COMPLIANCE|N|
|001001049|gggg RAVIOLI MFG CO INC|SPINELLI BKY RAVIOLI PASTRY SP|N|
|001001130|dddd ANGELES UNIFIED SCHOOL DISTRICT|TRANSPORTATION BRANCH|N|
|001001143|ffff MUSIC PARTIES, INC||N|
|001001155|BOSTON BRASS AND IRON CO||N|
|001001171|HANCOCK MARINE, INC.||N|
|001001184|TRILLION CORPORATION||N|
|001001192|HAWAII STATE CHIROPRACTIC ASSOCIATION INC||N|
|001001379|THE FRUIT SQUARE PEOPLE INC|L & M BAKERY|N|
|001001416|J & S MARKET||N|
df2如下
|001000145|PARADISE TAN||N|
|001000306|SHRUT & ASCH LEATHER COMPANY, INC.||N|
|001000355|HARRISON SPECIALTY CO., INC.||N|
|001000363|LOUIS M. GERSON CO., INC.||N|
|001000467|SAVE THE SEA TURTLES INTERNATIONAL|ADOPT THE BEACH HI|N|
|001000504|DIRIGO SPICE CORPORATION|CUNNINGHAM SPICE|N|
|001000744|FREEDMAN THREAD COMPANY|COLONIAL THREAD CO|N|
|001000756|AFFORDABLE AIR CONDITIONING|P R ENTERPRISE|N|
|001000900|CLIFLEX BELLOWS CORPORATION||N|
|001000905|FLORIDA DEPARTMENT OF LABOR AND EMPLOYMENT SECURITY|BUREAU OF COMPLIANCE|N|
|001001049|SPINELLI RAVIOLI MFG CO INC|SPINELLI BKY RAVIOLI PASTRY SP|N|
|001001130|LOS ANGELES UNIFIED SCHOOL DISTRICT|TRANSPORTATION BRANCH|N|
|001001143|TOSCO MUSIC PARTIES, INC||N|
|001001155|BOSTON BRASS AND IRON CO||N|
但是我想要的是,我必须根据一个列之间找到两个数据框架之间的差异。
我希望我的输出如下
|dunsnumber|filler1| businessname| tradestylename|registeredaddressindicator|
+----------+-------+--------------------+--------------------+--------------------------+
| 001001130| |dddd ANGELES UNIF...|TRANSPORTATION BR...| N|
| 001000900| |aaaaa BELLOWS COR...| | N|
| 001000905| |ddddd DEPARTMENT ...|BUREAU OF COMPLIANCE| N|
| 001001143| |ffff MUSIC PARTIE...| | N|
| 001001049| |gggg RAVIOLI MFG ...|SPINELLI BKY RAVI...| N|
+----------+-------+--------------------+--------------------+
这是我的代码
import org.apache.spark.sql.functions._
val textRdd1 = sc.textFile("/home/cloudera/TRF/PCFP/INCR")
val rowRdd1 = textRdd1.map(line => Row.fromSeq(line.split("\|", -1)))
var df1 = sqlContext.createDataFrame(rowRdd1, schema)
val textRdd2 = sc.textFile("/home/cloudera/TRF/PCFP/MAIN")
val rowRdd2 = textRdd2.map(line => Row.fromSeq(line.split("\|", -1)))
var df2 = sqlContext.createDataFrame(rowRdd2, schema)
val diffAnyColumnDF = df1.except(df2).where(df1.col("dunsnumber") ===
df2.col("dunsnumber")).show()
因此,如果我的主键'dunsnumber'匹配,则只能找到该主键是否更改的任何列。
我希望我的问题清楚。
dataFrame没有提取方法。您可以使用另一种方法。将数据转换为RDD,使用减去方法,返回您的数据框架。
嗨,所以这对我有用..
val diffAnyColumnDF = df1.except(df2)
val addDF= diffAnyColumnDF.join(df2, Seq("dunsnumber")).show()