Spark 2 / Java 8 / Cassandra 2. I am trying to read some data from Cassandra and then run a GROUP BY query in Spark. My DataFrame has only 2 columns: transdate (Date) and origin (String).
Dataset<Row> maxOrigindate = sparks.sql("SELECT origin, transdate, COUNT(*) AS cnt FROM origins GROUP BY (origin,transdate) ORDER BY cnt DESC LIMIT 1");
Getting this error:
`Exception in thread "main" org.apache.spark.sql.AnalysisException: expression 'origins.`origin`' is neither present in the group by, nor is it an aggregate function. Add to group by or wrap in first() (or first_value)`
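The GROUP BY problem was solved by removing the () in the GROUP BY clause, as shown below.
Full code (trying to get the maximum number of trans for an origin/location):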
JavaRDD<TransByDate> originDateRDD = javaFunctions(sc).cassandraTable("trans", "trans_by_date", CassandraJavaUtil.mapRowTo(TransByDate.class))
        .select(CassandraJavaUtil.column("origin"), CassandraJavaUtil.column("trans_date").as("transdate")).limit((long)100);
Dataset<Row> originDF = sparks.createDataFrame(originDateRDD, TransByDate.class);
String[] columns = originDF.columns();
System.out.println("originDF columns: " + columns[0] + " " + columns[1]); // prints: transdate origin
originDF.createOrReplaceTempView("origins");
Dataset<Row> maxOrigindate = sparks.sql("SELECT origin, transdate, COUNT(*) AS cnt FROM origins GROUP BY origin,transdate ORDER BY cnt DESC LIMIT 1");
List list = maxOrigindate.collectAsList(); // Exception here
int j = list.size();
originDF columns: transdate origin
public static class TransByDate implements Serializable {
    private String origin;
    private Date transdate;

    public TransByDate() { }

    public TransByDate(String origin, Date transdate) {
        this.origin = origin;
        this.transdate = transdate;
    }

    public String getOrigin() { return origin; }
    public void setOrigin(String origin) { this.origin = origin; }
    public Date getTransdate() { return transdate; }
    public void setTransdate(Date transdate) { this.transdate = transdate; }
}
Schema:
root
|-- transdate: struct (nullable = true)
| |-- date: integer (nullable = false)
| |-- day: integer (nullable = false)
| |-- hours: integer (nullable = false)
| |-- minutes: integer (nullable = false)
| |-- month: integer (nullable = false)
| |-- seconds: integer (nullable = false)
| |-- time: long (nullable = false)
| |-- timezoneOffset: integer (nullable = false)
| |-- year: integer (nullable = false)
|-- origin: string (nullable = true)
Exception:
ERROR Executor: Exception in task 0.0 in stage 2.0 (TID 12)
scala.MatchError: Sun Jan 01 00:00:00 PST 2012 (of class java.util.Date)
    at org.apache.spark.sql.catalyst.CatalystTypeConverters$StructConverter.toCatalystImpl(CatalystTypeConverters.scala:256)
    at org.apache.spark.sql.catalyst.CatalystTypeConverters$StructConverter.toCatalystImpl(CatalystTypeConverters.scala:251)
    at org.apache.spark.sql.catalyst.CatalystTypeConverters$CatalystTypeConverter.toCatalyst(CatalystTypeConverters.scala:103)
    ....
Exception in thread "main" org.apache.spark.SparkException: Job aborted due to stage failure: Task 0 in stage 2.0 failed 1 times, most recent failure: Lost task 0.0 in stage 2.0 (TID 12, localhost): scala.MatchError: Sun Jan 01 00:00:00 PST 2012 (of class java.util.Date)
    at org.apache.spark.sql.catalyst.CatalystTypeConverters$StructConverter.toCatalystImpl(CatalystTypeConverters.scala:256)
    ...
Driver stacktrace:
    at org.apache.spark.scheduler.DAGScheduler.org$apache$spark$scheduler$DAGScheduler$$failJobAndIndependentStages(DAGScheduler.scala:1454)
    at org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:1442)
    at org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:1441)
    ...
    at org.apache.spark.sql.Dataset$$anonfun$collectAsList$1.apply(Dataset.scala:2184)
    at org.apache.spark.sql.Dataset.withCallback(Dataset.scala:2559)
    at org.apache.spark.sql.Dataset.collectAsList(Dataset.scala:2184)
    at spark.SparkTest.sqlMaxCount(SparkTest.java:244) -> List list = maxOrigindate.collectAsList();
Caused by: scala.MatchError: Sun Jan 01 00:00:00 PST 2012 (of class java.util.Date)
    at org.apache.spark.sql.catalyst.CatalystTypeConverters$StructConverter.toCatalystImpl(CatalystTypeConverters.scala:256)
    at org.apache.spark.sql.catalyst.CatalystTypeConverters$StructConverter.toCatalystImpl(CatalystTypeConverters.scala:251)
The error you are getting is the following:
Caused by: scala.MatchError: Sun Jan 01 00:00:00 PST 2012 (of class java.util.Date) at
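This error occurs because Spark SQL supports the java.sql.Date type, not java.util.Date. Please check the Spark documentation on supported data types. You can also refer to SPARK-2562.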
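One way to see where the struct in the schema above comes from: as far as I understand, when createDataFrame is given a bean class, Spark infers fields from JavaBean getters, and a type it does not map to a SQL type is itself inspected as a bean. The sketch below uses only the JDK's java.beans.Introspector to list the readable properties of java.util.Date, which can be compared with the nested fields under transdate. The class name DateBeanProperties and the method readableProperties are hypothetical helpers for illustration, and this mirrors Spark's inference only under that assumption.

```java
import java.beans.IntrospectionException;
import java.beans.Introspector;
import java.beans.PropertyDescriptor;
import java.util.Set;
import java.util.TreeSet;

// Hypothetical helper, not part of Spark or of the original post.
final class DateBeanProperties {

    // Returns the names of all JavaBean properties of the given type that
    // have a getter, skipping the implicit "class" property from getClass().
    static Set<String> readableProperties(Class<?> type) {
        Set<String> names = new TreeSet<>();
        try {
            for (PropertyDescriptor pd : Introspector.getBeanInfo(type).getPropertyDescriptors()) {
                if (pd.getReadMethod() != null && !"class".equals(pd.getName())) {
                    names.add(pd.getName());
                }
            }
        } catch (IntrospectionException e) {
            throw new IllegalStateException("Could not introspect " + type, e);
        }
        return names;
    }

    public static void main(String[] args) {
        // Compare the printed names with the fields nested under transdate in the schema.
        System.out.println(readableProperties(java.util.Date.class));
    }
}
```

If the names match the nested fields, that suggests transdate was inferred as a struct of getters rather than as a date column, which would explain why the converter then fails on the actual java.util.Date value.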
Change the query to
Dataset<Row> maxOrigindate = sparks.sql("SELECT origin, "
        + "transdate, "
        + "COUNT(*) AS cnt FROM origins GROUP BY origin,transdate "
        + "ORDER BY cnt DESC LIMIT 1");
This will work.
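For the java.util.Date MatchError specifically, a minimal sketch of the bean from the question with transdate declared as java.sql.Date is shown here, together with a small conversion helper. The outer class SqlDateBeanSketch, the helper toSqlDate, and the sample origin value "SFO" are hypothetical names introduced for illustration. Whether the Cassandra connector's row mapper fills a java.sql.Date field directly has not been verified here; if it does not, one option is to read a java.util.Date first and convert it before building the DataFrame.

```java
import java.io.Serializable;

// Sketch only: SqlDateBeanSketch and toSqlDate are hypothetical names.
final class SqlDateBeanSketch {

    // Same bean as in the question, but using java.sql.Date for transdate.
    public static class TransByDate implements Serializable {
        private String origin;
        private java.sql.Date transdate;

        public TransByDate() { }

        public TransByDate(String origin, java.sql.Date transdate) {
            this.origin = origin;
            this.transdate = transdate;
        }

        public String getOrigin() { return origin; }
        public void setOrigin(String origin) { this.origin = origin; }
        public java.sql.Date getTransdate() { return transdate; }
        public void setTransdate(java.sql.Date transdate) { this.transdate = transdate; }
    }

    // Converts a java.util.Date to a java.sql.Date with the same epoch milliseconds.
    static java.sql.Date toSqlDate(java.util.Date date) {
        return date == null ? null : new java.sql.Date(date.getTime());
    }

    public static void main(String[] args) {
        java.util.Date utilDate = new java.util.Date(0L); // arbitrary example instant
        TransByDate row = new TransByDate("SFO", toSqlDate(utilDate)); // "SFO" is a made-up origin
        System.out.println(row.getOrigin() + " " + row.getTransdate());
    }
}
```

With a bean shaped like this, the transdate column should be inferred as a date rather than a struct, so it is worth printing the schema again after the change to confirm.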