气流操作员之间共享大文件



假设气流DAG由以下两个操作符组成:

  • Op1(GCSToLocalFilesystemOperator(:从GCS 下载文件XYZ

  • Op2(PythonOperator(:需要文件XYZ对其进行处理

DAG将由GCP的Composer执行。我知道可以将这两个运算符组合起来,并将它们实现为一个PythonOperator,但我想做得更模块化,并尽可能多地使用内置运算符。

候选解决方案:

  • 使用临时文件:不能使用临时文件。如果XYZ存储为/tmp/XXYZ,Op2可能会也可能找不到它,因为这两个操作员可能由不同的工作人员运行。

  • xcom:xcom也不能在这里使用,因为文件相当大。

  • 将GCS用作共享存储器:在本例中不起作用。我们回到了现在的位置,因为一个运营商仍然需要为第二个运营商下载和利用文件。

那么,这里有什么好的解决方案?一般来说,有没有一种方法可以将文件(或字符串(从一个运算符传递给另一个运算符?

我找到了这里描述的解决方案。composer pod上存在一个gcsfuse文件系统。在您的composer代码中,您可以通过/home/airflow/gcs本地访问它。

因此,在上面的例子中,Op1需要将XYZ从另一个bucket复制到composer bucket。这可以使用GCSToGCSOperator:来完成

from airflow.operators.python_operator import PythonOperator
from airflow.providers.google.cloud.transfers.gcs_to_gcs import GCSToGCSOperator
MY_BUCKET='Original Bucket containing XYZ'
PATH_TO_XYZ='path/to/XYZ'
COMPOSER_BUCKET = os.environ.get('GCS_BUCKET')
XYZ_GCS_PATH = 'data/my_dir/XYZ'
XYZ_LOCAL_PATH = f'/home/airflow/gcs/{XYZ_GCS_PATH}'
def my_function(filepath):
with open(filepath) as f:
content = f.read()
print(content)

with models.DAG(...) as dag:
Op1 = GCSToGCSOperator(
task_id='download_data',
source_bucket=MY_BUCKET,
source_object=PATH_TO_XYZ,
destination_bucket=COMPOSER_BUCKET,
destination_object=XYZ_GCS_PATH,
)
Op1 = PythonOperator(
task_id='read_file',
python_callable=my_function,
op_kwargs={'filepath': XYZ_LOCAL_PATH}
)
Op1 >> Op2

是,中间文件。使用xcom传递文件的路径

最新更新