将HiveQL转换为Spark Scala



我想将带有窗口函数的HiveQL查询转换为Scala Spark查询...但我不断收到相同的异常。

问题上下文:mytablecategoryproduct字段组成。我想获得每个类别的前 N 个常用产品的列表。 下面的DF是一个HiveContext对象

原始查询(正常工作):

SELECT category, product, freq FROM (
    SELECT category, product, COUNT(*) AS freq, 
    ROW_NUMBER() OVER (PARTITION BY category ORDER BY COUNT(*) DESC) as seqnum
    FROM mytable GROUP BY category, product) ci 
WHERE seqnum <= 10;

我现在拥有的(部分转换,不起作用):

val w = row_number().over(Window.partitionBy("category").orderBy(count("*").desc))
val result = df.select("category", "product").groupBy("category", "product").agg(count("*").as("freq"))
val new_res = result.withColumn("seqNum", w).where(col("seqNum") <= 10).drop("seqNum")

不断收到以下异常:

线程"main"中的异常 org.apache.spark.sql.AnalysisException:表达式"类别"既不存在于分组依据中,也不是聚合函数。添加到分组依据或包装 first()(或 first_value) 如果你不在乎你得到哪个值。

这里可能出了什么问题?

你的错误是在 orderBy 子句中使用 aggregation:

.orderBy(count("*").desc)

如果这样写,表达式会引入新的聚合表达式。相反,您应该按名称引用现有聚合:

.orderBy("freq")

因此,您的代码应如下所示:

val w = row_number().over(
  Window.partitionBy("category").orderBy("freq"))
val result = df.select("category", "product")
  .groupBy("category", "product")
  .agg(count("*").as("freq"))
val new_res = result
  .withColumn("seqNum", w).where(col("seqNum") <= 10)
  .drop("seqNum")

相关内容

  • 没有找到相关文章

最新更新