使用Spark访问数据框架中的列



我正在使用SCALA开发SPARK 1.6.1版本,遇到了一个不寻常的问题。当使用同一执行期间创建的现有列创建新列时,获得"org.apache.spark.sql.AnalysisException"。
工作:

 val resultDataFrame = dataFrame.withColumn("FirstColumn",lit(2021)).withColumn("SecondColumn",when($"FirstColumn" - 2021 === 0, 1).otherwise(10))
    resultDataFrame.printSchema().

NOT WORKING

val resultDataFrame = dataFrame.withColumn("FirstColumn",lit(2021)).withColumn("SecondColumn",when($"FirstColumn" - **max($"FirstColumn")** === 0, 1).otherwise(10))
resultDataFrame.printSchema().

这里我使用在同一执行期间创建的FirstColumn创建我的SecondColumn。问题是为什么它在使用avg/max函数时不起作用。请告诉我如何解决这个问题。

如果您想将聚合函数与"正常"列一起使用,则函数应该在groupBy之后或与窗口定义子句一起使用。在这些情况下,它们毫无意义。例子:

val result = df.groupBy($"col1").max("col2").as("max") // This works

在上面的例子中,生成的DataFrame将同时具有"col1"one_answers"max"作为列。

val max = df.select(min("col2"), max("col2")) 

这样做是因为查询中只有聚合函数。但是,以下命令将不起作用:

val result = df.filter($"col1" === max($"col2"))

因为我正在尝试将非聚合列与聚合列混合。

如果要将列与聚合值进行比较,可以尝试使用join:

val maxDf = df.select(max("col2").as("maxValue"))
val joined = df.join(maxDf)
val result = joined.filter($"col1" === $"maxValue").drop("maxValue")

或者使用简单的值:

val maxValue = df.select(max("col2")).first.get(0)
val result = filter($"col1" === maxValue)

相关内容

  • 没有找到相关文章

最新更新