如何避免联接后出现重复列



>我有两个数据帧,其中包含以下列:

df1.columns
//  Array(ts, id, X1, X2)

df2.columns
//  Array(ts, id, Y1, Y2)

在我做之后

val df_combined = df1.join(df2, Seq(ts,id))

我最终得到以下列:Array(ts, id, X1, X2, ts, id, Y1, Y2) .我可以预期公共列将被删除。还有什么需要做的吗?

简单的答案(来自有关此问题的 Databricks 常见问题解答)是执行连接,其中连接的列表示为字符串数组(或一个字符串)而不是谓词。

下面是一个改编自 Databricks FAQ 的示例,但有两个连接列,以回答原始海报的问题。

下面是左侧数据帧:

val llist = Seq(("bob", "b", "2015-01-13", 4), ("alice", "a", "2015-04-23",10))
val left = llist.toDF("firstname","lastname","date","duration")
left.show()
/*
+---------+--------+----------+--------+
|firstname|lastname|      date|duration|
+---------+--------+----------+--------+
|      bob|       b|2015-01-13|       4|
|    alice|       a|2015-04-23|      10|
+---------+--------+----------+--------+
*/

下面是正确的数据帧:

val right = Seq(("alice", "a", 100),("bob", "b", 23)).toDF("firstname","lastname","upload")
right.show()
/*
+---------+--------+------+
|firstname|lastname|upload|
+---------+--------+------+
|    alice|       a|   100|
|      bob|       b|    23|
+---------+--------+------+
*/

这是一个不正确的解决方案,其中连接列被定义为谓词left("firstname")===right("firstname") && left("lastname")===right("lastname")

不正确的结果是firstname列和lastname列在连接的数据框中重复:

left.join(right, left("firstname")===right("firstname") &&
                 left("lastname")===right("lastname")).show
/*
+---------+--------+----------+--------+---------+--------+------+
|firstname|lastname|      date|duration|firstname|lastname|upload|
+---------+--------+----------+--------+---------+--------+------+
|      bob|       b|2015-01-13|       4|      bob|       b|    23|
|    alice|       a|2015-04-23|      10|    alice|       a|   100|
+---------+--------+----------+--------+---------+--------+------+
*/

正确的解决方案是将联接列定义为字符串数组Seq("firstname", "lastname")。输出数据框没有重复的列:

left.join(right, Seq("firstname", "lastname")).show
/*
+---------+--------+----------+--------+------+
|firstname|lastname|      date|duration|upload|
+---------+--------+----------+--------+------+
|      bob|       b|2015-01-13|       4|    23|
|    alice|       a|2015-04-23|      10|   100|
+---------+--------+----------+--------+------+
*/

这是预期的行为。 DataFrame.join方法等效于像这样的 SQL 联接

SELECT * FROM a JOIN b ON joinExprs

如果要忽略重复的列,只需删除它们或之后选择感兴趣的列即可。如果要消除歧义,可以使用父DataFrames访问这些:

val a: DataFrame = ???
val b: DataFrame = ???
val joinExprs: Column = ???
a.join(b, joinExprs).select(a("id"), b("foo"))
// drop equivalent 
a.alias("a").join(b.alias("b"), joinExprs).drop(b("id")).drop(a("foo"))

或使用别名:

// As for now aliases don't work with drop
a.alias("a").join(b.alias("b"), joinExprs).select($"a.id", $"b.foo")

对于等连接,存在一种特殊的快捷语法,该语法采用字符串序列:

val usingColumns: Seq[String] = ???
a.join(b, usingColumns)

或作为单个字符串

val usingColumn: String = ???
a.join(b, usingColumn)

仅保留连接条件中使用的列的一个副本。

我已经坚持了一段时间,直到最近我才想出了一个非常简单的解决方案。

说一个是

scala> val a  = Seq(("a", 1), ("b", 2)).toDF("key", "vala")
a: org.apache.spark.sql.DataFrame = [key: string, vala: int]
scala> a.show
+---+----+
|key|vala|
+---+----+
|  a|   1|
|  b|   2|
+---+----+
and 
scala> val b  = Seq(("a", 1)).toDF("key", "valb")
b: org.apache.spark.sql.DataFrame = [key: string, valb: int]
scala> b.show
+---+----+
|key|valb|
+---+----+
|  a|   1|
+---+----+

我可以这样做来仅选择数据帧 a 中的值:

scala> a.join(b, a("key") === b("key"), "left").select(a.columns.map(a(_)) : _*).show
+---+----+
|key|vala|
+---+----+
|  a|   1|
|  b|   2|
+---+----+
你可以

简单地使用它

df1.join(df2, Seq("ts","id"),"TYPE-OF-JOIN")

这里的连接类型可以是

  • 全外

例如,我有两个这样的数据帧:

// df1
word   count1
w1     10   
w2     15  
w3     20
// df2
word   count2
w1     100   
w2     150  
w5     200

如果你做完全外部连接,那么结果看起来像这样

df1.join(df2, Seq("word"),"fullouter").show()
word   count1  count2
w1     10      100
w2     15      150
w3     20      null
w5     null    200

试试这个,

val df_combined = df1.join(df2, df1("ts") === df2("ts") && df1("id") === df2("id")).drop(df2("ts")).drop(df2("id"))

这是SQL的正常行为,我为此所做的是:

  • 删除或重命名源列
  • 执行联接
  • 删除重命名的列(如果有)

在这里,我正在替换"全名"列:

Java中的一些代码:

this
    .sqlContext
    .read()
    .parquet(String.format("hdfs:///user/blablacar/data/year=%d/month=%d/day=%d", year, month, day))
    .drop("fullname")
    .registerTempTable("data_original");
this
    .sqlContext
    .read()
    .parquet(String.format("hdfs:///user/blablacar/data_v2/year=%d/month=%d/day=%d", year, month, day))
    .registerTempTable("data_v2");
 this
    .sqlContext
    .sql(etlQuery)
    .repartition(1)
    .write()
    .mode(SaveMode.Overwrite)
    .parquet(outputPath);

查询位置:

SELECT
    d.*,
   concat_ws('_', product_name, product_module, name) AS fullname
FROM
    {table_source} d
LEFT OUTER JOIN
    {table_updates} u ON u.id = d.id

我相信这是您只能使用 Spark 做的事情(从列表中删除列),非常非常有帮助!

如果有人正在使用 spark-SQL 并希望实现相同的目标,那么您可以在 join query 中使用USING子句。

val spark = SparkSession.builder().master("local[*]").getOrCreate()
spark.sparkContext.setLogLevel("ERROR")
import spark.implicits._
val df1 = List((1, 4, 3), (5, 2, 4), (7, 4, 5)).toDF("c1", "c2", "C3")
val df2 = List((1, 4, 3), (5, 2, 4), (7, 4, 10)).toDF("c1", "c2", "C4")
df1.createOrReplaceTempView("table1")
df2.createOrReplaceTempView("table2")
spark.sql("select * from table1  inner join  table2  using (c1, c2)").show(false)
/*
+---+---+---+---+
|c1 |c2 |C3 |C4 |
+---+---+---+---+
|1  |4  |3  |3  |
|5  |2  |4  |4  |
|7  |4  |5  |10 |
+---+---+---+---+
*/

内部连接是Spark中的默认连接,下面是它的简单语法。

leftDF.join(rightDF,"Common Col Nam")

对于其他联接,您可以遵循以下语法

leftDF.join(rightDF,Seq("Common Columns comma seperated","join type")

如果列名称不常见,则

leftDF.join(rightDF,leftDF.col("x")===rightDF.col("y),"join type")
<</div> div class="one_answers">

最佳做法是在连接两个 DF 之前使列名不同,并相应地删除它们。

df1.columns =[id, age, income]
df2.column=[id, age_group]
df1.join(df2, on=df1.id== df2.id,how='inner').write.saveAsTable('table_name')

将返回错误,而重复列的错误

试试这个

,试试这个:

df2_id_renamed = df2.withColumnRenamed('id','id_2')
df1.join(df2_id_renamed, on=df1.id== df2_id_renamed.id_2,how='inner').drop('id_2')

将多个表连接在一起后,我通过一个简单的函数运行它们,以便在 DF 遇到重复项时重命名列。 或者,您也可以删除这些重复的列。

其中Names是包含['Id', 'Name', 'DateId', 'Description']列的表,Dates是包含['Id', 'Date', 'Description']列的表,则Id列和Description列在联接后将被复制。

Names = sparkSession.sql("SELECT * FROM Names")
Dates = sparkSession.sql("SELECT * FROM Dates")
NamesAndDates = Names.join(Dates, Names.DateId == Dates.Id, "inner")
NamesAndDates = deDupeDfCols(NamesAndDates, '_')
NamesAndDates.saveAsTable("...", format="parquet", mode="overwrite", path="...")

其中deDupeDfCols定义为:

def deDupeDfCols(df, separator=''):
    newcols = []
    for col in df.columns:
        if col not in newcols:
            newcols.append(col)
        else:
            for i in range(2, 1000):
                if (col + separator + str(i)) not in newcols:
                    newcols.append(col + separator + str(i))
                    break
    return df.toDF(*newcols)

生成的数据框将包含列['Id', 'Name', 'DateId', 'Description', 'Id2', 'Date', 'Description2']

抱歉,这个答案是用 Python 写的 - 我不熟悉 Scala,但这是我在谷歌上搜索这个问题时出现的问题,我确信 Scala 代码并没有太大的不同。

相关内容

  • 没有找到相关文章

最新更新