I have two DataFrames, a and b. a looks like
Column 1 | Column 2
abc      | 123
cde      | 23
and b looks like
Column 1
1
2
I would like to zip a and b (and possibly more DataFrames) into something like:
Column 1 | Column 2 | Column 3
abc      | 123      | 1
cde      | 23       | 2
How can I do that?
The DataFrame API does not support such an operation. It is possible to zip two RDDs, but for that to work you have to match both the number of partitions and the number of elements per partition. Assuming that is the case:
import org.apache.spark.sql.DataFrame
import org.apache.spark.sql.Row
import org.apache.spark.sql.types.{StructField, StructType, LongType}

val a: DataFrame = sc.parallelize(Seq(
  ("abc", 123), ("cde", 23))).toDF("column_1", "column_2")
val b: DataFrame = sc.parallelize(Seq(Tuple1(1), Tuple1(2))).toDF("column_3")

// Merge rows
val rows = a.rdd.zip(b.rdd).map{
  case (rowLeft, rowRight) => Row.fromSeq(rowLeft.toSeq ++ rowRight.toSeq)}

// Merge schemas
val schema = StructType(a.schema.fields ++ b.schema.fields)

// Create new data frame
val ab: DataFrame = sqlContext.createDataFrame(rows, schema)
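For the small sample data above both DataFrames come from two-element local collections parallelized the same way, so the zip lines up and ab.show() should print something like:

+--------+--------+--------+
|column_1|column_2|column_3|
+--------+--------+--------+
|     abc|     123|       1|
|     cde|      23|       2|
+--------+--------+--------+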
If those conditions are not met, the only option left is to add an index and join:
def addIndex(df: DataFrame) = sqlContext.createDataFrame(
  // Add index
  df.rdd.zipWithIndex.map{case (r, i) => Row.fromSeq(r.toSeq :+ i)},
  // Create schema
  StructType(df.schema.fields :+ StructField("_index", LongType, false))
)

// Add indices
val aWithIndex = addIndex(a)
val bWithIndex = addIndex(b)

// Join and clean
val ab = aWithIndex
  .join(bWithIndex, Seq("_index"))
  .drop("_index")
In the Scala DataFrame implementation there is no simple way to concatenate two DataFrames into one. We can simply work around this limitation by adding an index to every row of each DataFrame and then doing an inner join on those indices. This is the stub code of my implementation:
val a: DataFrame = sc.parallelize(Seq(("abc", 123), ("cde", 23))).toDF("column_1", "column_2")
val aWithId: DataFrame = a.withColumn("id",monotonicallyIncreasingId)
val b: DataFrame = sc.parallelize(Seq((1), (2))).toDF("column_3")
val bWithId: DataFrame = b.withColumn("id",monotonicallyIncreasingId)
aWithId.join(bWithId, "id")
For an easy read, take a look at how Python does this!
What about pure SQL?
SELECT
    room_name,
    sender_nickname,
    message_id,
    row_number() over (partition by room_name order by message_id) as message_index,
    row_number() over (partition by room_name, sender_nickname order by message_id) as user_message_index
FROM messages
ORDER BY room_name, message_id
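To run a query like that from Spark, a minimal, self-contained sketch would register a DataFrame as a temporary table first. The messages data below is purely hypothetical, and on Spark 1.x window functions require a HiveContext rather than a plain SQLContext:

// Hypothetical sample data matching the columns used in the query above
val messages = sc.parallelize(Seq(
  ("lobby", "alice", 1L),
  ("lobby", "bob",   2L),
  ("lobby", "alice", 3L)
)).toDF("room_name", "sender_nickname", "message_id")

messages.registerTempTable("messages")

val indexed = sqlContext.sql("""
  SELECT
      room_name,
      sender_nickname,
      message_id,
      row_number() over (partition by room_name order by message_id) as message_index,
      row_number() over (partition by room_name, sender_nickname order by message_id) as user_message_index
  FROM messages
  ORDER BY room_name, message_id
""")
indexed.show()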
I know the OP used Scala, but if, like me, you need to know how to do this in pyspark, try the Python code below. Like @zero323's first solution it relies on RDD.zip(), and it will therefore fail if the two DataFrames do not have the same number of partitions and the same number of rows in each partition.
from pyspark.sql import Row
from pyspark.sql.types import StructType


def zipDataFrames(left, right):
    CombinedRow = Row(*left.columns + right.columns)

    def flattenRow(row):
        left = row[0]
        right = row[1]
        combinedVals = [left[col] for col in left.__fields__] + [right[col] for col in right.__fields__]
        return CombinedRow(*combinedVals)

    zippedRdd = left.rdd.zip(right.rdd).map(lambda row: flattenRow(row))
    combinedSchema = StructType(left.schema.fields + right.schema.fields)
    return zippedRdd.toDF(combinedSchema)


joined = zipDataFrames(a, b)