I have been searching and searching but have not found a solution for how to query dates stored as UTC milliseconds using Spark SQL. The schema I have pulled in from a NoSQL data source (JSON from MongoDB) has the target date as:
|-- dateCreated: struct (nullable = true)
| |-- $date: long (nullable = true)
The complete schema is as follows:
scala> accEvt.printSchema
root
|-- _id: struct (nullable = true)
| |-- $oid: string (nullable = true)
|-- appId: integer (nullable = true)
|-- cId: long (nullable = true)
|-- data: struct (nullable = true)
| |-- expires: struct (nullable = true)
| | |-- $date: long (nullable = true)
| |-- metadata: struct (nullable = true)
| | |-- another key: string (nullable = true)
| | |-- class: string (nullable = true)
| | |-- field: string (nullable = true)
| | |-- flavors: string (nullable = true)
| | |-- foo: string (nullable = true)
| | |-- location1: string (nullable = true)
| | |-- location2: string (nullable = true)
| | |-- test: string (nullable = true)
| | |-- testKey: string (nullable = true)
| | |-- testKey2: string (nullable = true)
|-- dateCreated: struct (nullable = true)
| |-- $date: long (nullable = true)
|-- id: integer (nullable = true)
|-- originationDate: struct (nullable = true)
| |-- $date: long (nullable = true)
|-- processedDate: struct (nullable = true)
| |-- $date: long (nullable = true)
|-- receivedDate: struct (nullable = true)
| |-- $date: long (nullable = true)
My goal is to write queries along the lines of:
SELECT COUNT(*) FROM myTable WHERE dateCreated BETWEEN [dateStoredAsLong0] AND [dateStoredAsLong1]
My process so far has been:
scala> val sqlContext = new org.apache.spark.sql.SQLContext(sc)
sqlContext: org.apache.spark.sql.SQLContext = org.apache.spark.sql.SQLContext@29200d25
scala> val accEvt = sqlContext.jsonFile("/home/bkarels/mongoexport/accomplishment_event.json")
...
14/10/29 15:03:38 INFO SparkContext: Job finished: reduce at JsonRDD.scala:46, took 4.668981083 s
accEvt: org.apache.spark.sql.SchemaRDD =
SchemaRDD[6] at RDD at SchemaRDD.scala:103
scala> accEvt.registerAsTable("accomplishmentEvent")
(At this point, the following baseline query executes successfully.)
scala> sqlContext.sql("select count(*) from accomplishmentEvent").collect.foreach(println)
...
[74475]
Now, the voodoo I cannot get right is how to form my select statement to reason about the dates. For example, the following executes without error, but returns zero rather than the count of all records (74475):
scala> sqlContext.sql("select count(*) from accomplishmentEvent where processedDate >= '1970-01-01'").collect.foreach(println)
...
[0]
I have also tried some ugly things like:
scala> val now = new java.util.Date()
now: java.util.Date = Wed Oct 29 15:05:15 CDT 2014
scala> val today = now.getTime
today: Long = 1414613115743
scala> val thirtydaysago = today - (30 * 24 * 60 * 60 * 1000)
thirtydaysago: Long = 1416316083039
scala> sqlContext.sql("select count(*) from accomplishmentEvent where processedDate <= %s and processedDate >= %s".format(today,thirtydaysago)).collect.foreach(println)
Following a suggestion, I have also run a select against a named field to make sure that works. So:
scala> sqlContext.sql("select receivedDate from accomplishmentEvent limit 10").collect.foreach(println)
Returns:
[[1376318850033]]
[[1376319429590]]
[[1376320804289]]
[[1376320832835]]
[[1376320832960]]
[[1376320835554]]
[[1376320914480]]
[[1376321041899]]
[[1376321109341]]
[[1376321121469]]
I then extended that to try to get some kind of date comparison working:
scala> sqlContext.sql("select cId from accomplishmentEvent where receivedDate.date > '1970-01-01' limit 5").collect.foreach(println)
which results in the error:
java.lang.RuntimeException: No such field date in StructType(ArrayBuffer(StructField($date,LongType,true)))
...
Prefixing the field name with $, as was also suggested, results in a different kind of error:
scala> sqlContext.sql("select cId from accomplishmentEvent where receivedDate.$date > '1970-01-01' limit 5").collect.foreach(println)
java.lang.RuntimeException: [1.69] failure: ``UNION'' expected but ErrorToken(illegal character) found
select cId from accomplishmentEvent where receivedDate.$date > '1970-01-01' limit 5
Clearly I am not getting how to select on dates stored this way. Can anyone help me fill in this gap?
I am relatively new to both Scala and Spark, so forgive me if this is an elementary question, but my searches through the forums and the Spark documentation have turned up empty.
Thank you.
Your JSON is not flat, so fields below the top level need to be addressed using qualified names, such as dateCreated.$date. Your specific date fields are all of long type, so you'll need to make numerical comparisons against them, and it looks like you were on the right track there.

An additional problem is that your field names contain the "$" character, and Spark SQL will not let you query on those. One solution is that, instead of reading the JSON directly as a SchemaRDD (as you have done), you first read it as an RDD[String], use the map method to perform the Scala string manipulations of your choice, and then use the SQLContext's jsonRDD method to create the SchemaRDD.
val lines = sc.textFile(...)
// you may want something less naive than global replacement of all "$" chars
val linesFixed = lines.map(s => s.replaceAllLiterally("$", ""))
val accEvt = sqlContext.jsonRDD(linesFixed)
I have tested this with Spark 1.1.0.
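With the "$" characters stripped as above, each date field becomes a struct containing a plain long of UTC milliseconds reachable as, e.g., receivedDate.date, so the qualified-name, numeric comparison described earlier should work. A minimal sketch (assuming today and thirtydaysago are Long millisecond values computed as in the question):
accEvt.registerAsTable("accomplishmentEvent")
// receivedDate.date is a long holding UTC milliseconds, so compare numerically
sqlContext.sql(
  "select count(*) from accomplishmentEvent where receivedDate.date >= %s and receivedDate.date <= %s"
    .format(thirtydaysago, today)
).collect.foreach(println)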
For reference, the lack of identifier quoting in Spark SQL has been noted in this bug report and others, and the fix appears to have been checked in recently, but it will take some time for it to make it into a release.
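Once that fix does make it into a release, backtick-quoting the offending identifier may let you query the original JSON without rewriting it; purely as a hypothetical sketch (assuming your Spark version supports quoted identifiers):
// hypothetical: assumes identifier quoting is available in your Spark release
sqlContext.sql(
  "select count(*) from accomplishmentEvent where receivedDate.`$date` >= %s".format(thirtydaysago)
).collect.foreach(println)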