我如何使用pyspark从s3读取文件,这是在特定时间后创建的



我需要使用pyspark从s3读取json文件。S3位置可能包含数十万个文件。每个文件都有相同的元数据。但是每次我只需要读取在特定时间之后创建的文件。我怎么能做到呢?

如果您可以访问创建这些文件的系统,那么最简单的方法就是在编写这些文件时添加一个date分区:

s3://mybucket/myfolder/date=20210901/myfile1.json
s3://mybucket/myfolder/date=20210901/myfile1.json
s3://mybucket/myfolder/date=2021831/myfileA.json

然后你可以用过滤器读取它们;Pyspark只会将需要的文件加载到内存中。

start_dt = '20210831'
end_dt = '20210901'
df = (
spark
.read
.json(path)
.filter(F.col("date").between(start_dt, end_dt))
)

请注意,我没有明确地用JSON文件测试过这个,只是用Parquet测试过,所以这个方法可能需要调整。

如果你不能修改文件的写入方式,我认为Pyspark不能直接访问文件的元数据。相反,您将希望使用boto3直接查询S3以生成文件列表,使用boto3元数据过滤它们,然后将文件列表传递给read方法:

# generate this by querying via boto3
recent_files = ['s3://mybucket/file1.json', 's3://mybucket/file2.json']
df = spark.read.json(*recent_files)

关于从boto3列表文件的信息。

可以为DataFrameReader提供modifiedAftermodifiedBefore参数。json函数。

modifiedBefore an optional timestamp to only include files with
modification times occurring before the specified time. The provided timestamp must be in the following format: YYYY-MM-DDTHH:mm:ss (e.g. 2020-06-01T13:00:00)
modifiedAfter an optional timestamp to only include files with
modification times occurring after the specified time. The provided timestamp must be in the following format: YYYY-MM-DDTHH:mm:ss (e.g. 2020-06-01T13:00:00)

from datetime import datetime
# Fill this variable with your last date
lowerbound = datetime(2021, 9, 1, 13, 0, 0)
# Current execution
upperbound = datetime.now()
df = spark.read.json(source_path, 
modifiedAfter=lowerbound.strftime('%Y-%m-%dT%H:%M:%S'), 
modifiedBefore=upperbound.strftime('%Y-%m-%dT%H:%M:%S'))

在Kafels回答的讨论中提到,modifiedBeforemodifiedAfter不能使用S3作为数据源。这真是太遗憾了!

下一个最佳选择是使用boto3列出分区中的所有对象,然后在结果中的lastModified元素上过滤结果。结果不包含创建时间戳,因此lastModified是您能做的最好的。在对象数量很大的情况下,您还需要小心处理分页。

像这样的操作应该可以检索匹配的键:

import boto3
def get_matching_s3_keys(bucket, prefix="", after_date=None):
"""
List keys in an S3 bucket that match specified criteria.
:param bucket: Name of the S3 bucket.
:param prefix: Only get objects whose key starts with
this prefix
:param after_date: Only get objects that were last modified
after this date. Note: this needs to be a timezone-aware date
"""
paginator = s3.get_paginator("list_objects_v2")
kwargs = {'Bucket': bucket, 'Prefix': prefix}
for page in paginator.paginate(**kwargs):
try:
contents = page["Contents"]
except KeyError:
break
for obj in contents:
last_modified = obj["LastModified"]
if after_date is None or last_modified > after_date:
yield obj["Key"]

最新更新