当您连接两个具有相似列名的 DF 时:
df = df1.join(df2, df1['id'] == df2['id'])
联接工作正常,但无法调用id
列,因为它不明确,并且会出现以下异常:
pyspark.sql.utils.AnalysisException: "引用'id'是模棱两可的,可能是: id#5691, id#5918。"
这使得id
不再可用...
以下函数解决了这个问题:
def join(df1, df2, cond, how='left'):
df = df1.join(df2, cond, how=how)
repeated_columns = [c for c in df1.columns if c in df2.columns]
for col in repeated_columns:
df = df.drop(df2[col])
return df
我不喜欢的是我必须遍历列名并删除它们。这看起来真的很笨重...
您是否知道任何其他解决方案可以更优雅地联接和删除重复项或删除多列而不遍历每一列?
如果两个数据框中的连接列具有相同的名称,并且您只需要 equi 连接,则可以将连接列指定为列表,在这种情况下,结果将仅保留其中一个连接列:
df1.show()
+---+----+
| id|val1|
+---+----+
| 1| 2|
| 2| 3|
| 4| 4|
| 5| 5|
+---+----+
df2.show()
+---+----+
| id|val2|
+---+----+
| 1| 2|
| 1| 3|
| 2| 4|
| 3| 5|
+---+----+
df1.join(df2, ['id']).show()
+---+----+----+
| id|val1|val2|
+---+----+----+
| 1| 2| 2|
| 1| 2| 3|
| 2| 3| 4|
+---+----+----+
否则,您需要为连接数据框指定别名,并在稍后通过别名引用重复的列:
df1.alias("a").join(
df2.alias("b"), df1['id'] == df2['id']
).select("a.id", "a.val1", "b.val2").show()
+---+----+----+
| id|val1|val2|
+---+----+----+
| 1| 2| 2|
| 1| 2| 3|
| 2| 3| 4|
+---+----+----+
df.join(other, on, how)
当on
是列名字符串或列名字符串列表时,返回的数据帧将防止重复列。当on
是联接表达式时,将导致重复的列。我们可以使用.drop(df.a)
来删除重复的列。例:
cond = [df.a == other.a, df.b == other.bb, df.c == other.ccc]
# result will have duplicate column a
result = df.join(other, cond, 'inner').drop(df.a)
假设"a"是列为"id"的数据帧,而"b"是列为"id"的另一个数据帧
我使用以下两种方法来删除重复项:
方法 1:使用字符串连接表达式而不是布尔表达式。这会自动为您删除重复的列
a.join(b, 'id')
方法 2:在联接之前重命名列,并在联接之后删除列
b.withColumnRenamed('id', 'b_id')
joinexpr = a['id'] == b['b_id']
a.join(b, joinexpr).drop('b_id')
下面的代码适用于 Spark 1.6.0 及更高版本。
salespeople_df.show()
+---+------+-----+
|Num| Name|Store|
+---+------+-----+
| 1| Henry| 100|
| 2| Karen| 100|
| 3| Paul| 101|
| 4| Jimmy| 102|
| 5|Janice| 103|
+---+------+-----+
storeaddress_df.show()
+-----+--------------------+
|Store| Address|
+-----+--------------------+
| 100| 64 E Illinos Ave|
| 101| 74 Grand Pl|
| 102| 2298 Hwy 7|
| 103|No address available|
+-----+--------------------+
在此示例中,假设共享列的名称相同:
joined=salespeople_df.join(storeaddress_df, ['Store'])
joined.orderBy('Num', ascending=True).show()
+-----+---+------+--------------------+
|Store|Num| Name| Address|
+-----+---+------+--------------------+
| 100| 1| Henry| 64 E Illinos Ave|
| 100| 2| Karen| 64 E Illinos Ave|
| 101| 3| Paul| 74 Grand Pl|
| 102| 4| Jimmy| 2298 Hwy 7|
| 103| 5|Janice|No address available|
+-----+---+------+--------------------+
.join
将防止共享列重复。
假设您要删除此示例中的列Num
,则可以只使用.drop('colname')
joined=joined.drop('Num')
joined.show()
+-----+------+--------------------+
|Store| Name| Address|
+-----+------+--------------------+
| 103|Janice|No address available|
| 100| Henry| 64 E Illinos Ave|
| 100| Karen| 64 E Illinos Ave|
| 101| Paul| 74 Grand Pl|
| 102| Jimmy| 2298 Hwy 7|
+-----+------+--------------------+
将多个表连接在一起后,我通过一个简单的函数运行它们,以便在从左到右行走时遇到重复项时将列放在 DF 中。 或者,您也可以重命名这些列。
其中Names
是包含列['Id', 'Name', 'DateId', 'Description']
的表,Dates
是具有['Id', 'Date', 'Description']
列的表,列Id
和Description
在连接后将被复制。
Names = sparkSession.sql("SELECT * FROM Names")
Dates = sparkSession.sql("SELECT * FROM Dates")
NamesAndDates = Names.join(Dates, Names.DateId == Dates.Id, "inner")
NamesAndDates = dropDupeDfCols(NamesAndDates)
NamesAndDates.saveAsTable("...", format="parquet", mode="overwrite", path="...")
其中dropDupeDfCols
定义为:
def dropDupeDfCols(df):
newcols = []
dupcols = []
for i in range(len(df.columns)):
if df.columns[i] not in newcols:
newcols.append(df.columns[i])
else:
dupcols.append(i)
df = df.toDF(*[str(i) for i in range(len(df.columns))])
for dupcol in dupcols:
df = df.drop(str(dupcol))
return df.toDF(*newcols)
生成的数据框将包含列['Id', 'Name', 'DateId', 'Description', 'Date']
。
删除重复列的简单解决方案
final_result=df1.join(df2,(df1['subjectid']==df2['subjectid']),"left").drop(df1['subjectid'])
就我而言,我有一个数据帧,在联接后有多个重复列,我试图以 csv 格式与该数据帧相同,但由于列重复,我遇到了错误。我按照以下步骤删除了重复的列。代码在 scala 中
1) Rename all the duplicate columns and make new dataframe
2) make separate list for all the renamed columns
3) Make new dataframe with all columns (including renamed - step 1)
4) drop all the renamed column
private def removeDuplicateColumns(dataFrame:DataFrame): DataFrame = {
var allColumns: mutable.MutableList[String] = mutable.MutableList()
val dup_Columns: mutable.MutableList[String] = mutable.MutableList()
dataFrame.columns.foreach((i: String) =>{
if(allColumns.contains(i))
if(allColumns.contains(i))
{allColumns += "dup_" + i
dup_Columns += "dup_" +i
}else{
allColumns += i
}println(i)
})
val columnSeq = allColumns.toSeq
val df = dataFrame.toDF(columnSeq:_*)
val unDF = df.drop(dup_Columns:_*)
unDF
}
to call the above function use below code and pass your dataframe which contains duplicate columns
val uniColDF = removeDuplicateColumns(df)
如果您加入列表或字符串,则会自动删除 dup cols]1这是一个scala解决方案,您可以将相同的想法翻译成任何语言
// get a list of duplicate columns or use a list/seq
// of columns you would like to join on (note that this list
// should include columns for which you do not want duplicates)
val duplicateCols = df1.columns.intersect(df2.columns)
// no duplicate columns in resulting DF
df1.join(df2, duplicateCols.distinct.toSet)
Spark SQL 版本的答案:
df1.createOrReplaceTempView("t1")
df2.createOrReplaceTempView("t2")
spark.sql("select * from t1 inner join t2 using (id)").show()
# +---+----+----+
# | id|val1|val2|
# +---+----+----+
# | 1| 2| 2|
# | 1| 2| 3|
# | 2| 3| 4|
# +---+----+----+
当多列用于连接并且需要删除多个非字符串类型的列时,这对我有用。
final_data = mdf1.alias("a").join(df3.alias("b")
(mdf1.unique_product_id==df3.unique_product_id) &
(mdf1.year_week==df3.year_week) ,"left" ).select("a.*","b.promotion_id")
给出 a.* 从一个表中选择所有列,从另一个表中选择特定列。