Spark (sSala) 合并从映射函数生成的可变数量的数据帧



我有两个DataFrame(dfMaindf(都是String类型的3列("col1", "col2", "col3"(。我需要将一个函数映射到df的每一行(r(,并根据一些标准,选择那些dfMain并满足标准的元素,然后进行采样。

最终结果应作为积分返回DataFrame(即 Dataset[Row] (。我的错误不完整(scala(解决方案如下。由于map中的标准可能会变得更加复杂,因此我希望有一个更通用的解决方案,并允许合并要合并的map的结果(即生成的DataFrames(。

case class record(col1: String, col2:String, col3:String)
def myFun(df: DataFrame) : DataFrame = {
  df.as[record].map{
    r => dfMain.filter($"col1" !== r.col1 &&
                       $"col2" === r.col2 && 
                       $"col3" === r.col3 )
               .sample(false,0.2)
  }
}

假设您有两个dataframes

dfMain 
+----+----+----+
|col1|col2|col3|
+----+----+----+
|a1  |b1  |c1  |
|a   |b2  |c3  |
|a   |b3  |c4  |
+----+----+----+

df
+----+----+----+
|col1|col2|col3|
+----+----+----+
|a   |b1  |c1  |
|a   |b2  |c2  |
|a   |b3  |c3  |
+----+----+----+

现在查看有问题的代码,您正在尝试从col1不等于 df col1 且其余两列相等的dfMainfilter in rows,即 .filter($"col1" !== r.col1 && $"col2" === r.col2 && $"col3" === r.col3) .如果是这种情况,那么您应该选择join

dfMain.as("main").join(df.as("table"), $"main.col1" =!= $"table.col1" && $"main.col2" === $"table.col2" && $"main.col3" === $"table.col3", "inner")

应该给你

+----+----+----+----+----+----+
|col1|col2|col3|col1|col2|col3|
+----+----+----+----+----+----+
|a1  |b1  |c1  |a   |b1  |c1  |
+----+----+----+----+----+----+

现在,如果您只想要dfMaincolumns,您应该这样做

dfMain.as("main").join(df.as("table"), $"main.col1" =!= $"table.col1" && $"main.col2" === $"table.col2" && $"main.col3" === $"table.col3", "inner")
    .select($"main.col1", $"main.col2",$"main.col3")

应该是

+----+----+----+
|col1|col2|col3|
+----+----+----+
|a1  |b1  |c1  |
+----+----+----+

如果你只想要df columns,你应该这样做

dfMain.as("main").join(df.as("table"), $"main.col1" =!= $"table.col1" && $"main.col2" === $"table.col2" && $"main.col3" === $"table.col3", "inner")
    .select($"table.col1", $"table.col2",$"table.col3")

应将数据帧生成为

+----+----+----+
|col1|col2|col3|
+----+----+----+
|a   |b1  |c1  |
+----+----+----+

或者您可以同时输出两种dataframe joined

相关内容

  • 没有找到相关文章

最新更新