>我有两个数据帧,其中包含以下列:
df1.columns
// Array(ts, id, X1, X2)
和
df2.columns
// Array(ts, id, Y1, Y2)
在我做之后
val df_combined = df1.join(df2, Seq(ts,id))
我最终得到以下列:Array(ts, id, X1, X2, ts, id, Y1, Y2)
.我可以预期公共列将被删除。还有什么需要做的吗?
简单的答案(来自有关此问题的 Databricks 常见问题解答)是执行连接,其中连接的列表示为字符串数组(或一个字符串)而不是谓词。
下面是一个改编自 Databricks FAQ 的示例,但有两个连接列,以回答原始海报的问题。
下面是左侧数据帧:
val llist = Seq(("bob", "b", "2015-01-13", 4), ("alice", "a", "2015-04-23",10))
val left = llist.toDF("firstname","lastname","date","duration")
left.show()
/*
+---------+--------+----------+--------+
|firstname|lastname| date|duration|
+---------+--------+----------+--------+
| bob| b|2015-01-13| 4|
| alice| a|2015-04-23| 10|
+---------+--------+----------+--------+
*/
下面是正确的数据帧:
val right = Seq(("alice", "a", 100),("bob", "b", 23)).toDF("firstname","lastname","upload")
right.show()
/*
+---------+--------+------+
|firstname|lastname|upload|
+---------+--------+------+
| alice| a| 100|
| bob| b| 23|
+---------+--------+------+
*/
这是一个不正确的解决方案,其中连接列被定义为谓词left("firstname")===right("firstname") && left("lastname")===right("lastname")
。
不正确的结果是firstname
列和lastname
列在连接的数据框中重复:
left.join(right, left("firstname")===right("firstname") &&
left("lastname")===right("lastname")).show
/*
+---------+--------+----------+--------+---------+--------+------+
|firstname|lastname| date|duration|firstname|lastname|upload|
+---------+--------+----------+--------+---------+--------+------+
| bob| b|2015-01-13| 4| bob| b| 23|
| alice| a|2015-04-23| 10| alice| a| 100|
+---------+--------+----------+--------+---------+--------+------+
*/
正确的解决方案是将联接列定义为字符串数组Seq("firstname", "lastname")
。输出数据框没有重复的列:
left.join(right, Seq("firstname", "lastname")).show
/*
+---------+--------+----------+--------+------+
|firstname|lastname| date|duration|upload|
+---------+--------+----------+--------+------+
| bob| b|2015-01-13| 4| 23|
| alice| a|2015-04-23| 10| 100|
+---------+--------+----------+--------+------+
*/
这是预期的行为。 DataFrame.join
方法等效于像这样的 SQL 联接
SELECT * FROM a JOIN b ON joinExprs
如果要忽略重复的列,只需删除它们或之后选择感兴趣的列即可。如果要消除歧义,可以使用父DataFrames
访问这些:
val a: DataFrame = ???
val b: DataFrame = ???
val joinExprs: Column = ???
a.join(b, joinExprs).select(a("id"), b("foo"))
// drop equivalent
a.alias("a").join(b.alias("b"), joinExprs).drop(b("id")).drop(a("foo"))
或使用别名:
// As for now aliases don't work with drop
a.alias("a").join(b.alias("b"), joinExprs).select($"a.id", $"b.foo")
对于等连接,存在一种特殊的快捷语法,该语法采用字符串序列:
val usingColumns: Seq[String] = ???
a.join(b, usingColumns)
或作为单个字符串
val usingColumn: String = ???
a.join(b, usingColumn)
仅保留连接条件中使用的列的一个副本。
我已经坚持了一段时间,直到最近我才想出了一个非常简单的解决方案。
说一个是
scala> val a = Seq(("a", 1), ("b", 2)).toDF("key", "vala")
a: org.apache.spark.sql.DataFrame = [key: string, vala: int]
scala> a.show
+---+----+
|key|vala|
+---+----+
| a| 1|
| b| 2|
+---+----+
and
scala> val b = Seq(("a", 1)).toDF("key", "valb")
b: org.apache.spark.sql.DataFrame = [key: string, valb: int]
scala> b.show
+---+----+
|key|valb|
+---+----+
| a| 1|
+---+----+
我可以这样做来仅选择数据帧 a 中的值:
scala> a.join(b, a("key") === b("key"), "left").select(a.columns.map(a(_)) : _*).show
+---+----+
|key|vala|
+---+----+
| a| 1|
| b| 2|
+---+----+
简单地使用它
df1.join(df2, Seq("ts","id"),"TYPE-OF-JOIN")
这里的连接类型可以是
- 左
- 右
- 内
- 全外
例如,我有两个这样的数据帧:
// df1
word count1
w1 10
w2 15
w3 20
// df2
word count2
w1 100
w2 150
w5 200
如果你做完全外部连接,那么结果看起来像这样
df1.join(df2, Seq("word"),"fullouter").show()
word count1 count2
w1 10 100
w2 15 150
w3 20 null
w5 null 200
试试这个,
val df_combined = df1.join(df2, df1("ts") === df2("ts") && df1("id") === df2("id")).drop(df2("ts")).drop(df2("id"))
这是SQL的正常行为,我为此所做的是:
- 删除或重命名源列
- 执行联接
- 删除重命名的列(如果有)
在这里,我正在替换"全名"列:
Java中的一些代码:
this
.sqlContext
.read()
.parquet(String.format("hdfs:///user/blablacar/data/year=%d/month=%d/day=%d", year, month, day))
.drop("fullname")
.registerTempTable("data_original");
this
.sqlContext
.read()
.parquet(String.format("hdfs:///user/blablacar/data_v2/year=%d/month=%d/day=%d", year, month, day))
.registerTempTable("data_v2");
this
.sqlContext
.sql(etlQuery)
.repartition(1)
.write()
.mode(SaveMode.Overwrite)
.parquet(outputPath);
查询位置:
SELECT
d.*,
concat_ws('_', product_name, product_module, name) AS fullname
FROM
{table_source} d
LEFT OUTER JOIN
{table_updates} u ON u.id = d.id
我相信这是您只能使用 Spark 做的事情(从列表中删除列),非常非常有帮助!
如果有人正在使用 spark-SQL 并希望实现相同的目标,那么您可以在 join query 中使用USING
子句。
val spark = SparkSession.builder().master("local[*]").getOrCreate()
spark.sparkContext.setLogLevel("ERROR")
import spark.implicits._
val df1 = List((1, 4, 3), (5, 2, 4), (7, 4, 5)).toDF("c1", "c2", "C3")
val df2 = List((1, 4, 3), (5, 2, 4), (7, 4, 10)).toDF("c1", "c2", "C4")
df1.createOrReplaceTempView("table1")
df2.createOrReplaceTempView("table2")
spark.sql("select * from table1 inner join table2 using (c1, c2)").show(false)
/*
+---+---+---+---+
|c1 |c2 |C3 |C4 |
+---+---+---+---+
|1 |4 |3 |3 |
|5 |2 |4 |4 |
|7 |4 |5 |10 |
+---+---+---+---+
*/
内部连接是Spark中的默认连接,下面是它的简单语法。
leftDF.join(rightDF,"Common Col Nam")
对于其他联接,您可以遵循以下语法
leftDF.join(rightDF,Seq("Common Columns comma seperated","join type")
如果列名称不常见,则
leftDF.join(rightDF,leftDF.col("x")===rightDF.col("y),"join type")
<</div>
div class="one_answers"> 最佳做法是在连接两个 DF 之前使列名不同,并相应地删除它们。
df1.columns =[id, age, income]
df2.column=[id, age_group]
df1.join(df2, on=df1.id== df2.id,how='inner').write.saveAsTable('table_name')
将返回错误,而重复列的错误
试试这个,试试这个:
df2_id_renamed = df2.withColumnRenamed('id','id_2')
df1.join(df2_id_renamed, on=df1.id== df2_id_renamed.id_2,how='inner').drop('id_2')
将多个表连接在一起后,我通过一个简单的函数运行它们,以便在 DF 遇到重复项时重命名列。 或者,您也可以删除这些重复的列。
其中Names
是包含['Id', 'Name', 'DateId', 'Description']
列的表,Dates
是包含['Id', 'Date', 'Description']
列的表,则Id
列和Description
列在联接后将被复制。
Names = sparkSession.sql("SELECT * FROM Names")
Dates = sparkSession.sql("SELECT * FROM Dates")
NamesAndDates = Names.join(Dates, Names.DateId == Dates.Id, "inner")
NamesAndDates = deDupeDfCols(NamesAndDates, '_')
NamesAndDates.saveAsTable("...", format="parquet", mode="overwrite", path="...")
其中deDupeDfCols
定义为:
def deDupeDfCols(df, separator=''):
newcols = []
for col in df.columns:
if col not in newcols:
newcols.append(col)
else:
for i in range(2, 1000):
if (col + separator + str(i)) not in newcols:
newcols.append(col + separator + str(i))
break
return df.toDF(*newcols)
生成的数据框将包含列['Id', 'Name', 'DateId', 'Description', 'Id2', 'Date', 'Description2']
。
抱歉,这个答案是用 Python 写的 - 我不熟悉 Scala,但这是我在谷歌上搜索这个问题时出现的问题,我确信 Scala 代码并没有太大的不同。