Executing native SQL with count, having and order by in Spark SQL



I am new to apache-spark.

I have this query, which combines the aggregate function count with HAVING and ORDER BY. It is a valid SQL query that I referenced from here.

I created a DataFrame from a parquet file and then tried to execute this query:

SparkSession spark = SparkSession
    .builder()
    .appName("Java Spark SQL basic example")
    .config("spark.master", "local")
    .getOrCreate();
final Dataset<Row> dataset = spark.read().parquet("src/main/resources/test2.parquet");
dataset.createOrReplaceTempView("customers");
final Dataset<Row> dataset1 = spark.sql(
    "SELECT count(customerid), customerid, country FROM customers " +
    "GROUP BY country, customerid " +
    "HAVING count(customerid) > 5 " +
    "ORDER BY count(customerid) DESC");
dataset1.show();

But I am getting this error:

java.lang.UnsupportedOperationException: Cannot evaluate expression: count(input[1, string, true])
at org.apache.spark.sql.catalyst.expressions.Unevaluable$class.doGenCode(Expression.scala:261)
at org.apache.spark.sql.catalyst.expressions.aggregate.AggregateExpression.doGenCode(interfaces.scala:87)
at org.apache.spark.sql.catalyst.expressions.Expression$$anonfun$genCode$2.apply(Expression.scala:108)
at org.apache.spark.sql.catalyst.expressions.Expression$$anonfun$genCode$2.apply(Expression.scala:105)
at scala.Option.getOrElse(Option.scala:121)
at org.apache.spark.sql.catalyst.expressions.Expression.genCode(Expression.scala:105)
at org.apache.spark.sql.catalyst.expressions.codegen.GenerateOrdering$$anonfun$3.apply(GenerateOrdering.scala:83)
at org.apache.spark.sql.catalyst.expressions.codegen.GenerateOrdering$$anonfun$3.apply(GenerateOrdering.scala:82)
at scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:234)
at scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:234)
at scala.collection.mutable.ResizableArray$class.foreach(ResizableArray.scala:59)
at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:48)
at scala.collection.TraversableLike$class.map(TraversableLike.scala:234)
at scala.collection.AbstractTraversable.map(Traversable.scala:104)
at org.apache.spark.sql.catalyst.expressions.codegen.GenerateOrdering$.genComparisons(GenerateOrdering.scala:82)
at org.apache.spark.sql.catalyst.expressions.codegen.GenerateOrdering$.create(GenerateOrdering.scala:164)
at org.apache.spark.sql.catalyst.expressions.codegen.GenerateOrdering$.create(GenerateOrdering.scala:43)
at org.apache.spark.sql.catalyst.expressions.codegen.CodeGenerator.generate(CodeGenerator.scala:1193)
at org.apache.spark.sql.catalyst.expressions.codegen.LazilyGeneratedOrdering.<init>(GenerateOrdering.scala:207)
at org.apache.spark.sql.catalyst.expressions.codegen.LazilyGeneratedOrdering.<init>(GenerateOrdering.scala:204)
at org.apache.spark.sql.execution.TakeOrderedAndProjectExec.executeCollect(limit.scala:135)
at org.apache.spark.sql.Dataset.org$apache$spark$sql$Dataset$$collectFromPlan(Dataset.scala:3384)
at org.apache.spark.sql.Dataset$$anonfun$head$1.apply(Dataset.scala:2545)
at org.apache.spark.sql.Dataset$$anonfun$head$1.apply(Dataset.scala:2545)
at org.apache.spark.sql.Dataset$$anonfun$53.apply(Dataset.scala:3365)
at org.apache.spark.sql.execution.SQLExecution$$anonfun$withNewExecutionId$1.apply(SQLExecution.scala:78)
at org.apache.spark.sql.execution.SQLExecution$.withSQLConfPropagated(SQLExecution.scala:125)
at org.apache.spark.sql.execution.SQLExecution$.withNewExecutionId(SQLExecution.scala:73)
at org.apache.spark.sql.Dataset.withAction(Dataset.scala:3364)
at org.apache.spark.sql.Dataset.head(Dataset.scala:2545)
at org.apache.spark.sql.Dataset.take(Dataset.scala:2759)
at org.apache.spark.sql.Dataset.getRows(Dataset.scala:255)
at org.apache.spark.sql.Dataset.showString(Dataset.scala:292)
at org.apache.spark.sql.Dataset.show(Dataset.scala:746)
at org.apache.spark.sql.Dataset.show(Dataset.scala:705)
at org.apache.spark.sql.Dataset.show(Dataset.scala:714)

The queryExecution output:

== Parsed Logical Plan ==
'Sort ['count('customerid) DESC NULLS LAST], true
+- 'Filter ('count('customerid) > 5)
+- 'Aggregate ['country, 'customerid], [unresolvedalias('count('customerid), None), 'customerid, 'country]
+- 'UnresolvedRelation `customers`
== Analyzed Logical Plan ==
count(customerid): bigint, customerid: string, country: string
Sort [count(customerid#0) DESC NULLS LAST], true
+- Project [count(customerid)#42L, customerid#0, country#6]
+- Filter (count(customerid#0)#45L > cast(5 as bigint))
+- Aggregate [country#6, customerid#0], [count(customerid#0) AS count(customerid)#42L, customerid#0, country#6, count(customerid#0) AS count(customerid#0)#45L]
+- SubqueryAlias `customers`
+- Relation[customerid#0,customername#1,contactname#2,address#3,city#4,postalcode#5,country#6] parquet
== Optimized Logical Plan ==
Sort [count(customerid#0) DESC NULLS LAST], true
+- Project [count(customerid)#42L, customerid#0, country#6]
+- Filter (count(customerid#0)#45L > 5)
+- Aggregate [country#6, customerid#0], [count(customerid#0) AS count(customerid)#42L, customerid#0, country#6, count(customerid#0) AS count(customerid#0)#45L]
+- Project [customerid#0, country#6]
+- Relation[customerid#0,customername#1,contactname#2,address#3,city#4,postalcode#5,country#6] parquet
== Physical Plan ==
*(3) Sort [count(customerid#0) DESC NULLS LAST], true, 0
+- Exchange rangepartitioning(count(customerid#0) DESC NULLS LAST, 200)
+- *(2) Project [count(customerid)#42L, customerid#0, country#6]
+- *(2) Filter (count(customerid#0)#45L > 5)
+- *(2) HashAggregate(keys=[country#6, customerid#0], functions=[count(customerid#0)], output=[count(customerid)#42L, customerid#0, country#6, count(customerid#0)#45L])
+- Exchange hashpartitioning(country#6, customerid#0, 200)
+- *(1) HashAggregate(keys=[country#6, customerid#0], functions=[partial_count(customerid#0)], output=[country#6, customerid#0, count#53L])
+- *(1) FileScan parquet [customerid#0,country#6] Batched: true, Format: Parquet, Location: InMemoryFileIndex[file:/D:.../src/main/resources/test2..., PartitionFilters: [], PushedFilters: [], ReadSchema: struct<customerid:string,country:string>

And the output of the data and the schema:

+----------+--------------------+------------------+--------------------+-----------+----------+-------+
|customerid|        customername|       contactname|             address|       city|postalcode|country|
+----------+--------------------+------------------+--------------------+-----------+----------+-------+
|         1| Alfreds Futterkiste|      Maria Anders|       Obere Str. 57|     Berlin|     12209|Germany|
|         2|Ana Trujillo Empa...|      Ana Trujillo|Avda. de la Const...|México D.F.|      5021| Mexico|
|         3|Antonio Moreno Ta...|    Antonio Moreno|      Mataderos 2312|México D.F.|      5023| Mexico|
|         4|     Around the Horn|      Thomas Hardy|     120 Hanover Sq.|     London|   WA1 1DP|     UK|
|         5|  Berglunds snabbköp|Christina Berglund|      Berguvsvägen 8|      Luleå|  S-958 22| Sweden|
+----------+--------------------+------------------+--------------------+-----------+----------+-------+
root
|-- customerid: string (nullable = true)
|-- customername: string (nullable = true)
|-- contactname: string (nullable = true)
|-- address: string (nullable = true)
|-- city: string (nullable = true)
|-- postalcode: string (nullable = true)
|-- country: string (nullable = true)

I don't understand what is going wrong here.

Try this - alias the aggregate in the SELECT and refer to that alias in HAVING and ORDER BY. That way the Sort no longer tries to evaluate a fresh count(customerid) expression outside the aggregation, which is what the Unevaluable error in the GenerateOrdering codegen above is complaining about:

dataset.createOrReplaceTempView("customers");
final Dataset<Row> dataset1 = spark.sql(
    "SELECT count(customerid) AS count, customerid, country FROM customers " +
    "GROUP BY country, customerid HAVING count > 5 ORDER BY count DESC");
dataset1.show();
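
If the SQL route still gives trouble, the same query can be expressed with the DataFrame API, where the alias is introduced once by agg and reused for the filter and sort. This is a minimal sketch assuming the dataset variable from the question; the column name cnt is an arbitrary choice:

import static org.apache.spark.sql.functions.col;
import static org.apache.spark.sql.functions.count;

// Equivalent of the fixed SQL: group, count, filter (HAVING) and sort by the alias.
final Dataset<Row> byCustomer = dataset
    .groupBy(col("country"), col("customerid"))
    .agg(count("customerid").alias("cnt"))  // alias the aggregate once
    .filter(col("cnt").gt(5))               // HAVING count(customerid) > 5
    .orderBy(col("cnt").desc());            // ORDER BY count(customerid) DESC
byCustomer.show();

Either way, the key point is the same: the filter and the sort reference the aliased output column of the aggregation rather than re-stating count(customerid).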
