我即将开发一个功能,该函数使用Spark SQL执行每列操作。在此功能中,我需要参考列名:
val input = Seq(
(0, "A", "B", "C", "D"),
(1, "A", "B", "C", "D"),
(0, "d", "a", "jkl", "d"),
(0, "d", "g", "C", "D"),
(1, "A", "d", "t", "k"),
(1, "d", "c", "C", "D"),
(1, "c", "B", "C", "D")
).toDF("TARGET", "col1", "col2", "col3TooMany", "col4")
以下示例通过'column
明确指列工作正常。
val pre1_1 = input.groupBy('col1).agg(mean($"TARGET").alias("pre_col1"))
val pre2_1 = input.groupBy('col1, 'TARGET).agg(count("*") / input.filter('TARGET === 1).count alias ("pre2_col1"))
input.as('a)
.join(pre1_1.as('b), $"a.col1" === $"b.col1").drop($"b.col1")
.join(pre2_1.as('b), ($"a.col1" === $"b.col1") and ($"a.TARGET" === $"b.TARGET")).drop($"b.col1").drop($"b.TARGET").show
When referring to the columns programmatically they can no longer be resolved. When 2 joins are performed one after the other which worked fine for the code snippet above.
我可以观察到,对于此代码段,df
的第一个和初始col1
从开始到结束。这可能是无法再解决的原因。但是到目前为止,我无法弄清楚如何在仅通过字符串/如何正确地引用函数中的colnames时访问列。
val pre1_1 = input.groupBy("col1").agg(mean('TARGET).alias("pre_" + "col1"))
val pre2_1 = input.groupBy("col1", "TARGET").agg(count("*") / input.filter('TARGET === 1).count alias ("pre2_" + "col1"))
input.join(pre1_1, input("col1") === pre1_1("col1")).drop(pre1_1("col1"))
.join(pre2_1, (input("col1") === pre2_1("col1")) and (input("TARGET") === pre2_1("TARGET"))).drop(pre2_1("col1")).drop(pre2_1("TARGET"))
以及一种替代方法:
df.as('a)
.join(pre1_1.as('b), $"a.${col}" === $"b.${col}").drop($"b.${col}")
没有成功,因为$"a.${col}"
不再解决a.Column
,而是不存在的df("a.col1")
。
在复杂情况下,始终使用唯一的别名来引用具有共享谱系的列。这是确保正确和稳定行为的唯一方法。
import org.apache.spark.sql.functions.col
val pre1_1 = input.groupBy("col1").agg(mean('TARGET).alias("pre_" + "col1")).alias("pre1_1")
val pre2_1 = input.groupBy("col1", "TARGET").agg(count("*") / input.filter('TARGET === 1).count alias ("pre2_" + "col1")).alias("pre2_1")
input.alias("input")
.join(pre1_1, col("input.col1") === col("pre1_1.col1"))
.join(pre2_1, (col("input.col1") === col("pre2_1.col1")) and (col("input.TARGET") === col("pre2_1.TARGET")))
如果您检查日志,您实际上会看到以下警告:
警告列:构建琐碎的真实等值谓词,'col1#12 = col1#12'。也许您需要使用别名
和代码您仅使用工作,因为Spark Source中有"特殊情况"。
在这样的简单情况下,只需使用Equi-Join语法:
input.join(pre1_1, Seq("col1"))
.join(pre2_1, Seq("col1", "TARGET"))