描述
我有一个应用程序,它将数据发送到AWS Kinesis Firehose,并将数据写入我的S3存储桶。Firehose使用"yyyy/MM/dd/HH"格式写入文件。
类似于此示例S3路径:
s3://mybucket/2016/07/29/12
现在我有一个用Scala编写的Spark应用程序,我需要在其中读取特定时间段的数据。我有开始日期和结束日期。数据是JSON格式的,这就是为什么我使用sqlContext.read.json()
而不是sc.textFile()
。
如何快速高效地读取数据?
我试过什么
通配符-我可以从特定日期的所有小时或特定月份的所有日期中选择数据,例如:
val df = sqlContext.read.json("s3://mybucket/2016/07/29/*") val df = sqlContext.read.json("s3://mybucket/2016/07/*/*")
但是,如果我必须读取几天内的数据,例如2016-07-29-2016-07-30,我不能以同样的方式使用通配符方法。
这就引出了我的下一点。。。
- 在此解决方案中使用多个路径或所示的CSV目录。用逗号分隔目录似乎只适用于
sc.textFile()
,而不适用于sqlContext.read.json()
Union-cloud上一个链接中的第二个解决方案建议分别读取每个目录,然后将它们联合在一起。尽管他建议联合RDD,但也可以选择联合DataFrames。如果我手动从给定的日期段生成日期字符串,那么我可能会创建一个不存在的路径,而不是忽略它,整个读取都会失败。相反,我可以使用AWS SDK,并使用AmazonS3Client中的函数
listObjects
从上一个链接中获取所有密钥,如iMKanchwala的解决方案中的密钥。唯一的问题是我的数据在不断变化。如果
read.json()
函数将所有数据作为一个参数获取,它将读取所有必要的数据,并且足够聪明,可以从数据中推断json模式。如果我分别读取两个目录,但它们的模式不匹配,那么我认为统一这两个数据帧将成为一个问题。Glob(?)语法-nhahtdh的此解决方案比选项1和2好一点,因为它们提供了更详细地指定日期和目录的选项,并且是一个单一的"路径",因此它也适用于
read.json()
。但是,关于丢失的目录,又出现了一个常见的问题。假设我想要从20.07到30.07的所有数据,我可以这样声明:
val df = sqlContext.read.json("s3://mybucket/2016/07/[20-30]/*")
但是,如果我缺少7月25日的数据,那么路径
..16/07/25/
就不存在,整个函数就会失败。
很明显,当请求的周期为25.11.2015-12.02.2016时,这会变得更加困难,然后我需要通过编程(在我的Scala脚本中)创建一个字符串路径,比如:
"s3://mybucket/{2015/11/[25-30],2015/12/*,2016/01/*,2016/02/[01-12]}/*"
通过创建它,我需要以某种方式确保25-30和01-12的间隔都有相应的路径,如果缺少一个,它就会再次失败。(幸运的是,Asterisk处理了丢失的目录,因为它读取了所有存在的内容)
如何一次从单个目录路径读取所有必要的数据,而不会因为某个日期间隔之间缺少目录而失败
有一个简单得多的解决方案。如果您查看DataFrameReader API,您会注意到有一个.json(paths: String*)
方法。只需构建一个您想要的路径集合,根据您的喜好,使用not的globs,然后调用该方法,例如
val paths: Seq[String] = ...
val df = sqlContext.read.json(paths: _*)