SPARK SQL以编程方式参考列



我即将开发一个功能,该函数使用Spark SQL执行每列操作。在此功能中,我需要参考列名:

val input = Seq(
    (0, "A", "B", "C", "D"),
    (1, "A", "B", "C", "D"),
    (0, "d", "a", "jkl", "d"),
    (0, "d", "g", "C", "D"),
    (1, "A", "d", "t", "k"),
    (1, "d", "c", "C", "D"),
    (1, "c", "B", "C", "D")
  ).toDF("TARGET", "col1", "col2", "col3TooMany", "col4")

以下示例通过'column明确指列工作正常。

val pre1_1 = input.groupBy('col1).agg(mean($"TARGET").alias("pre_col1"))
val pre2_1 = input.groupBy('col1, 'TARGET).agg(count("*") / input.filter('TARGET === 1).count alias ("pre2_col1"))
input.as('a)
    .join(pre1_1.as('b), $"a.col1" === $"b.col1").drop($"b.col1")
    .join(pre2_1.as('b), ($"a.col1" === $"b.col1") and ($"a.TARGET" === $"b.TARGET")).drop($"b.col1").drop($"b.TARGET").show
When referring to the columns programmatically they can no longer be resolved. When 2 joins are performed one after the other which worked fine for the code snippet above.

我可以观察到,对于此代码段,df的第一个和初始col1从开始到结束。这可能是无法再解决的原因。但是到目前为止,我无法弄清楚如何在仅通过字符串/如何正确地引用函数中的colnames时访问列。

val pre1_1 = input.groupBy("col1").agg(mean('TARGET).alias("pre_" + "col1"))
val pre2_1 = input.groupBy("col1", "TARGET").agg(count("*") / input.filter('TARGET === 1).count alias ("pre2_" + "col1"))
  input.join(pre1_1, input("col1") === pre1_1("col1")).drop(pre1_1("col1"))
    .join(pre2_1, (input("col1") === pre2_1("col1")) and (input("TARGET") === pre2_1("TARGET"))).drop(pre2_1("col1")).drop(pre2_1("TARGET"))

以及一种替代方法:

df.as('a)
      .join(pre1_1.as('b), $"a.${col}" === $"b.${col}").drop($"b.${col}")

没有成功,因为$"a.${col}"不再解决a.Column,而是不存在的df("a.col1")

在复杂情况下,始终使用唯一的别名来引用具有共享谱系的列。这是确保正确和稳定行为的唯一方法。

import org.apache.spark.sql.functions.col
val pre1_1 = input.groupBy("col1").agg(mean('TARGET).alias("pre_" + "col1")).alias("pre1_1")
val pre2_1 = input.groupBy("col1", "TARGET").agg(count("*") / input.filter('TARGET === 1).count alias ("pre2_" + "col1")).alias("pre2_1")
input.alias("input")
  .join(pre1_1, col("input.col1") === col("pre1_1.col1"))
  .join(pre2_1, (col("input.col1") === col("pre2_1.col1")) and (col("input.TARGET") === col("pre2_1.TARGET")))

如果您检查日志,您实际上会看到以下警告:

警告列:构建琐碎的真实等值谓词,'col1#12 = col1#12'。也许您需要使用别名

和代码您仅使用工作,因为Spark Source中有"特殊情况"。

在这样的简单情况下,只需使用Equi-Join语法:

input.join(pre1_1, Seq("col1"))
  .join(pre2_1, Seq("col1", "TARGET"))

相关内容

  • 没有找到相关文章

最新更新