Pyspark 从日期层次结构存储中读取选定的日期文件



我正在尝试使用 Pyspark 读取多个 CSV 文件,数据由 Amazon Kinesis Firehose 处理,因此它们以以下格式编写。

s3bucket/ 
    YYYY/
        mm/
            dd/
                hh/
                    files.gz
                    files.gz
                    files.gz

我实际上正在使用这段代码阅读一整天(例如 15/01/2019(,使用正则表达式:

data = spark.read.format("s3selectJson").options(compression="GZIP", multiline=True) 
    .load("s3://s3bucket/2019/01/15/*.gz".format(datetime_object.strftime("%Y/%m/%d")))

我的问题是,我如何读取知道我想要的日期的多天数据?有没有自动方法,或者我应该为我需要的日期制作正则表达式?

编辑:
我正在寻找的是以下
文档中DataFrameWriter.partitionBy(*cols(方法的反函数http://spark.apache.org/docs/latest/api/python/pyspark.sql.html?highlight=regex#pyspark.sql.DataFrameWriter

我担心,没有办法做到这一点。

如果你的数据结构如下(月份=,年份=...(,我们称之为分区。

s3bucket/ 
    year=YYYY/
        month=mm/
            day=dd/
                hour=hh/
                    files.gz
                    files.gz
                    files.gz

您可以轻松加载数据(在您的情况下,按特定日期加载(

data = spark.read.format("s3selectJson").options(compression="GZIP", multiline=True) 
  .load("s3://s3bucket/")
data_days = data.filter("day in (10, 20)")

使用分区时,Spark 仅加载特定日期,而不是所有日期。

我没有找到它的函数,但是,这是一个解决方法:

datetime_object = datetime.strptime("2019-01-31", '%Y-%m-%d')
delta_days = 10
base_bucket = "s3://s3bucket/{}/*/*.gz"
bucket_names = []
for date in [datetime_object - timedelta(days=x) for x in range(0, delta_days)]:
    bucket_names.append(base_bucket.format(date.strftime("%Y/%m/%d")))

幸运的是,.load()函数将列表作为源路径的参数,因此我根据所需的日期生成每个路径并将其提供给 load 函数。

data = spark.read.format("csv").options(compression="GZIP") 
        .load(bucket_names)

最新更新