如何使用spark-df读取s3中某个目录下的所有文件



我在这个模式中有490个JSON文件657438009821376.JSON,所有不同的文件都有不同的编号。

我能用吗"val input=spark.read.option("header",true(.json("/path/to/data/[0-9]*.json"(">

我需要将所有490个文件读入一个DF

您提供文件路径或目录路径作为源。https://spark.apache.org/docs/latest/sql-data-sources-json.html

在实际加载特定文件之前,没有从中筛选出这些文件的选项。加载它们时,您可以映射它们的源文件名&过滤掉不必要的文件,但我不建议做

我建议你可以分几个步骤来完成:

  • 使用boto3客户端列出s3存储桶中的所有匹配文件,smth类似
import boto
import boto.s3
import re
pattern = re.compile("your_regexp_pattern")
bucket = boto.s3.connect_to_region('eu-central-1').get_bucket("BUCKET_NAME")
files = bucket.list("","/path/to-dir")
filteredFiles = filter(lambda filename: pattern.match(filename), files)
  • 向spark提供从s3读取为json的文件列表
your_schema_class = StructType() 
.add("first_name", StringType()) 
.add("last_name", StringType())
# From step 1:
# files = ["s3://1.json", "s3://2.json"]
df = spark.read.json(files, your_schema_class)
df.show()

相关内容

  • 没有找到相关文章

最新更新