如何处理Apache气流中已经处理过的文件



假设我有一个raw_data文件夹,其中有我想要转换的文件,然后加载到clean_data文件夹。我希望我的DAG嗅探raw_data文件夹,只处理以前未转换和加载的文件。为此,我会使用一个日志文件来跟踪我已经处理过的文件,但我不确定这是最好的方法。

如何继续忽略已经处理过的文件,可能在Apache气流中有一个内置的功能?

我的目录如下

│   extract.py
│   track_actions.log
│
├───clean_data
└───raw_data
file1.txt
file2.txt
file3.txt

我的Dag函数如下

import os
import shutil
raw_files = os.listdir('./raw_data')

# would be my dag function
with open('track_actions.log', 'r') as log_file:
already_treated_files = log_file.read().split()
with open('track_actions.log', 'w') as log_file:
for filename in raw_files:
if filename not in already_treated_files:
# apply some transformation here
shutil.copy(f'./raw_data/{filename}', f'./clean_data/{filename[:-4]}_transformed.txt')
log_file.write(filename + 'n')
else:
print(filename, 'already treated')

我假设数据在你的" raw_data ";文件夹定期到达,您希望气流处理它。气流有"运行时上下文"的概念,这是运行时的几个变量,其中包含有关间隔的开始和结束日期等信息。参见文档中的data_interval_startdata_interval_end: https://airflow.apache.org/docs/apache-airflow/stable/templates-ref.html.

在你的函数中,你可以在这个间隔内选择一个创建日期的文件,并安排你的DAG运行,例如每小时一次(取决于你的数据到达的频率)。然后,您将只处理最近一小时内创建的文件。

最新更新