通过SPARK EMR在嵌套目录中读取S3文件



我想出了如何从s3目录中读取文件中的pyspark shell(和脚本(,例如。通过使用:

rdd = sc.wholeTextFiles('s3n://bucketname/dir/*')

但是,尽管让我在一个目录中读取所有文件,但我想读取所有目录中的每个文件。

我不想将它们弄平或一次加载,因为我会有内存问题。

相反,我需要以批处理方式自动从每个子目录中加载所有文件。这可能吗?

这是我的目录结构:

s3_bucket_name->年度(2016年或2017年( ->月(最大12个文件夹( -> day(最大31个文件夹( -> sub -Day文件夹(最大30;基本上只是每天都对收集进行分区(。

类似的东西,除了它会持续12个月,最多31天...

BucketName
|
|
|---Year(2016)
|       |
|       |---Month(11)
|       |      |
|       |      |---Day(01)
|       |      |      |
|       |      |      |---Sub-folder(01)
|       |      |      |
|       |      |      |---Sub-folder(02)
|       |      |      |
|       |      |---Day(02)
|       |      |      |
|       |      |      |---Sub-folder(01)
|       |      |      |
|       |      |      |---Sub-folder(02)
|       |      |      |
|       |---Month(12)
|
|---Year(2017)
|       |
|       |---Month(1)
|       |      |
|       |      |---Day(01)
|       |      |      |
|       |      |      |---Sub-folder(01)
|       |      |      |
|       |      |      |---Sub-folder(02)
|       |      |      |
|       |      |---Day(02)
|       |      |      |
|       |      |      |---Sub-folder(01)
|       |      |      |
|       |      |      |---Sub-folder(02)
|       |      |      |
|       |---Month(2)

上面的每个箭头代表叉子。例如我已经收集了2年的数据,因此"年"叉子有2年。然后每年最多12个月,然后每个月,最多31个可能的日间文件夹。在每天,最多将有30个文件夹,只是因为我以这种方式将其拆分...

我希望这是有道理的...

我正在查看另一篇文章(来自S3或本地文件系统的Spark的子目录递归读取文件(,我相信他们建议使用通配符,所以类似:

rdd = sc.wholeTextFiles('s3n://bucketname/*/data/*/*') 

但是问题在于,它试图在各个子目录之间找到一个共同的文件夹 - 在这种情况下,没有保证,我只需要所有东西。

但是,在那一条推理方面,我想如果我做了什么..

rdd = sc.wholeTextFiles("s3n://bucketname/*/*/*/*/*')

,但问题是我现在遇到了记忆错误,可能是因为它一次加载了所有内容并吓坏了。

理想情况下,我能做的就是:

转到一天的子目录水平并阅读其中,例如。

首次阅读2016/12/01,然后是2016/12/02,直到2012/12/31,然后是2017/01/01,然后2017/01/02,... 2017/01/31等等。

这样,我不像上面的那样使用五个通配符(*(,而是以某种方式知道在" Day"级别上每个子目录看起来都很低。

我想到使用Python词典来指定每天的文件路径,但这似乎是一种相当麻烦的方法。我的意思是:

file_dict = { 
    0:'2016/12/01/*/*', 
    1:'2016/12/02/*/*', 
    ...
    30:'2016/12/31/*/*',
}

基本上适用于所有文件夹,然后通过它们迭代并使用类似的东西加载它们:

sc.wholeTextFiles('s3n://bucketname/' + file_dict[i])

,但我不想手动输入所有这些路径。我希望这是有道理的...

编辑:

提出问题的另一种方法是,如何以批处理方式从嵌套的子目录结构中读取文件?如何在Python中列举我的S3存储桶中的所有可能的文件夹名称?也许那会有所帮助...

edit2:

我每个文件中数据的结构如下:

{json object 1},
{json object 2},
{json object 3},
...
{json object n},

要使它成为" true json",它要么只需要像上面的情况一样,而无需尾随的逗号,要么是类似的东西(注意方括号,并且缺少最终的尾随逗号:

[
   {json object 1},
   {json object 2},
   {json object 3},
   ...
   {json object n}
 ]

我完全在pyspark中这样做的原因是我提交的脚本是因为我强迫自己手动处理这种格式化的怪癖。如果我使用蜂巢/雅典娜,我不确定如何处理。

为什么不使用Hive,甚至更好,Athena?这些都将部署文件系统的表格,以使您可以访问所有数据。然后,您可以将其捕获到Spark

另外,我相信您也可以在SPARK中使用HiveQL来设置文件系统位置的tempTable ONTOP,并且它将其全部注册为Hive表,您可以对其执行SQL。自从我这样做已经有一段时间了,但是绝对是可行的

最新更新