联接来自同一源的两个数据帧



我正在使用pyspark(Apache Spark)的DataFrame API,遇到了以下问题:

当我联接来自同一源数据帧的两个数据帧时,生成的 DF 将分解为大量行。举个简单的例子:

我从磁盘加载包含 n 行的数据帧:

df = sql_context.parquetFile('data.parquet')

然后,我从该源创建两个数据帧。

df_one = df.select('col1', 'col2')
df_two = df.select('col1', 'col3')

最后,我想(内部)将它们重新连接在一起:

df_joined = df_one.join(df_two, df_one['col1'] == df_two['col1'], 'inner')

col1中的密钥是唯一的。生成的数据帧应具有 n 行,但它确实具有n*n行。

当我直接从磁盘加载df_onedf_two时,不会发生这种情况。我在 Spark 1.3.0 上,但这也会发生在当前的 1.4.0 快照上。

谁能解释为什么会这样?

如果我

没看错,df_two没有col2

    df_one = df.select('col1', 'col2')
    df_two = df.select('col1', 'col3')

因此,当您这样做时:

    df_one.join(df_two, df_one['col1'] == df_two['col2'], 'inner')

这应该失败。如果你的意思是说

    df_one.join(df_two, df_one['col1'] == df_two['col1'], 'inner')

但是,从同一数据框加载的事实应该不会产生影响。我建议你这样做:

    df_one.show()
    df_two.show()

确保所选数据符合预期。

我在 Spark 1.3 上的大型数据集中也看到了这个问题。不幸的是,在我编造的小而人为的例子中,"加入"工作正常。我觉得加入之前的步骤中可能存在一些潜在的错误

执行连接(注意:日期时间只是一个字符串):

> join = df1.join(df2, df1.DateTime == df2.DateTime, "inner")
> join.count()
250000L

这显然是返回完整的 500*500 笛卡尔连接。

对我有用的是切换到 SQL:

  > sqlc.registerDataFrameAsTable(df1, "df1")
  > sqlc.registerDataFrameAsTable(df2, "df2")
  > join = sqlc.sql("select * from df1, df2 where df1.DateTime = df2.DateTime")
  > join.count()
  471L

该值看起来是正确的。

看到这一点,我个人不会使用 pyspark 的 DataFrame.join(),直到我能更好地理解这种差异。

相关内容

  • 没有找到相关文章

最新更新