How to zip two (or more) DataFrames in Spark



I have two DataFrames a and b. a is like:

Column 1 | Column 2
abc      |  123
cde      |  23 

b is like:

Column 1 
1      
2      

I want to zip a and b (or even more DataFrames) into something like:

Column 1 | Column 2 | Column 3
abc      |  123     |   1
cde      |  23      |   2

How can I do it?

An operation like this is not supported by the DataFrame API. It is possible to zip two RDDs, but to make it work you have to match both the number of partitions and the number of elements per partition. Assuming that is the case:

import org.apache.spark.sql.DataFrame
import org.apache.spark.sql.Row
import org.apache.spark.sql.types.{StructField, StructType, LongType}
val a: DataFrame = sc.parallelize(Seq(
  ("abc", 123), ("cde", 23))).toDF("column_1", "column_2")
val b: DataFrame = sc.parallelize(Seq(Tuple1(1), Tuple1(2))).toDF("column_3")
// Merge rows
val rows = a.rdd.zip(b.rdd).map{
  case (rowLeft, rowRight) => Row.fromSeq(rowLeft.toSeq ++ rowRight.toSeq)}
// Merge schemas
val schema = StructType(a.schema.fields ++ b.schema.fields)
// Create new data frame
val ab: DataFrame = sqlContext.createDataFrame(rows, schema)
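
If the partitioning assumptions hold, ab now contains the zipped rows. A quick sanity check (just a sketch; the exact formatting may differ, but the values should match the example above):

ab.show()
// +--------+--------+--------+
// |column_1|column_2|column_3|
// +--------+--------+--------+
// |     abc|     123|       1|
// |     cde|      23|       2|
// +--------+--------+--------+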

If the above conditions are not met, the only option left is to add an index and join:

def addIndex(df: DataFrame) = sqlContext.createDataFrame(
  // Add index
  df.rdd.zipWithIndex.map{case (r, i) => Row.fromSeq(r.toSeq :+ i)},
  // Create schema
  StructType(df.schema.fields :+ StructField("_index", LongType, false))
)
// Add indices
val aWithIndex = addIndex(a)
val bWithIndex = addIndex(b)
// Join and clean
val ab = aWithIndex
  .join(bWithIndex, Seq("_index"))
  .drop("_index")

In the Scala implementation of DataFrames, there is no simple way to concatenate two DataFrames into one. We can simply work around this limitation by adding an index to each row of the DataFrames. Then we can do an inner join on those indices. This is my stub code for this implementation:

import org.apache.spark.sql.DataFrame
import org.apache.spark.sql.functions.monotonicallyIncreasingId

val a: DataFrame = sc.parallelize(Seq(("abc", 123), ("cde", 23))).toDF("column_1", "column_2")
val aWithId: DataFrame = a.withColumn("id", monotonicallyIncreasingId)  // tag each row with a generated id
val b: DataFrame = sc.parallelize(Seq((1), (2))).toDF("column_3")
val bWithId: DataFrame = b.withColumn("id", monotonicallyIncreasingId)
aWithId.join(bWithId, "id")                                             // join on the generated id
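
Note that the generated ids generally only line up across the two DataFrames when they have the same number of partitions and the same number of rows per partition. Assuming that holds, you will probably also want to drop the helper column afterwards; a small follow-up sketch:

val joined = aWithId.join(bWithId, "id").drop("id")
joined.show()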

An easy read - take a look at how Python does this!

What about pure SQL?

SELECT 
    room_name, 
    sender_nickname, 
    message_id, 
    row_number() over (partition by room_name order by message_id) as message_index, 
    row_number() over (partition by room_name, sender_nickname order by message_id) as user_message_index
from messages
order by room_name, message_id
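
The same row_number() idea can also be expressed with the DataFrame API against a and b from the question. A sketch (the _rn column and variable names are just for illustration; an unpartitioned window pulls all rows into a single partition, so this only makes sense for small data):

import org.apache.spark.sql.expressions.Window
import org.apache.spark.sql.functions.row_number

// Number the rows of each DataFrame, then join on the row number
val aNumbered = a.withColumn("_rn", row_number().over(Window.orderBy("column_1")))
val bNumbered = b.withColumn("_rn", row_number().over(Window.orderBy("column_3")))
val zipped = aNumbered.join(bNumbered, Seq("_rn")).drop("_rn")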

I know the OP was using Scala, but if, like me, you need to know how to do this in pyspark, then try the Python code below. Like @zero323's first solution, it relies on RDD.zip() and will therefore fail if the two DataFrames don't have the same number of partitions and the same number of rows in each partition.

from pyspark.sql import Row
from pyspark.sql.types import StructType

def zipDataFrames(left, right):
    # Row class whose fields are the columns of both DataFrames
    CombinedRow = Row(*(left.columns + right.columns))

    def flattenRow(row):
        # row is a pair (leftRow, rightRow) produced by RDD.zip()
        leftRow, rightRow = row
        combinedVals = [leftRow[col] for col in leftRow.__fields__] + \
                       [rightRow[col] for col in rightRow.__fields__]
        return CombinedRow(*combinedVals)

    # RDD.zip() requires the same number of partitions and rows per partition
    zippedRdd = left.rdd.zip(right.rdd).map(flattenRow)
    combinedSchema = StructType(left.schema.fields + right.schema.fields)
    return zippedRdd.toDF(combinedSchema)

joined = zipDataFrames(a, b)
