使用PySpark可以在单个目录下读取特定文件吗



我遇到以下情况,在一个目录下有多个文件,如下所示:

AT_BookingRequests_16102021_05082022.xlsx
AT_Bookings_16102021_05082022.xlsx
SWE_BookingRequests_08042019_05082022.xlsx
SWE_Bookings_06082020_05082021.xlsx
SWE_Bookings_06082021_05082022.xlsx
SWE_Bookings_08042019_05082020.xlsx

现在我需要将它们加载到两个不同的表中(比如booking_request和bookings(。文件夹中的数据每天都会被加载,我需要只读取每天加载的新记录。

到目前为止,我考虑将文件名存储在特定的表和中

#  List all files under the base folder
fileList =[]
for x in dbutils.fs.ls(base_source_path):
fileList.append(x)
booking_requests_files = []
bookings_files = []
for i in fileList: 
file = i[0].split('/')[-1]
if 'BookingRequests' in file:
booking_requests_files.append(file)
else:
bookings_files.append(file)
loaded_booking_req_files = spark.sql(f"select distinct filename from {booking_requests_table}").rdd.flatMap(lambda x: x).collect()
loaded_bookings_files = spark.sql(f"select distinct filename from {bookings_table}").rdd.flatMap(lambda x: x).collect()
for file in booking_requests_files:
filepath = base_source_path + '/' + file
print(filepath)
if file not in loaded_booking_req_files:
df_req_read = spark.read.format("com.crealytics.spark.excel")
.option("header", "true")
.load(filepath)
for file in bookings_files :
filepath = base_source_path + '/' + file
print(filepath)
if file not in loaded_bookings_files :
df_req_read = spark.read.format("com.crealytics.spark.excel")
.option("header", "true")
.load(filepath)

我试图实现的是避免对文件进行迭代,如图所示。我已经尝试将所有文件作为列表传递,但失败了。

此外,实现这一点的最佳方法是什么?

在我看来像是一种流媒体方法。有多种方法可以满足你的需求。我在批处理和流媒体方面都有类似的事情。我们的案例:

  1. 批处理:我们使用的是标志系统。每次处理文件时,我们都会使用原始文件名创建一个空文件。所以,下次处理时,我们会检查文件是否已经处理过
  2. 结构化流:使用检查点。这里有很多东西可以分享,但如果你不了解结构化流媒体,你可以阅读文档:https://spark.apache.org/docs/latest/structured-streaming-programming-guide.html

希望我能帮助你。

最新更新