气流 - 在 2 个不同的项目中从/向 Google Cloud Storage 复制 blob



我正在尝试使用 Airflow 将 blob 从项目 X 中的 GCS 存储桶 A 复制到项目 Y 中的存储桶 B。

似乎可用的运算符(GCSToGCSOperator)仅在同一项目中的两个存储桶之间运行良好。

我怎样才能在我的案例中实现副本?

我想避免使用 BashOperator...

谢谢!!

选项 1:使用CloudDataTransferServiceCreateJobOperator使用 Google API 创建传输作业。您可以在文档中找到有关它的信息。请注意,这要求服务帐户有权访问两者。如果不是这种情况,则尚不支持 请参阅使用 Google 存储传输 API 将数据从外部 GCS 传输到我的 GCS 中

选项 2:GCSToLocalFilesystemOperator与项目 1 一起使用,然后使用项目 2LocalFilesystemToGCSOperator

此解决方案的框架:

from airflow import DAG
from airflow.providers.google.cloud.transfers.local_to_gcs import LocalFilesystemToGCSOperator
from airflow.providers.google.cloud.transfers.gcs_to_local import GCSToLocalFilesystemOperator
with DAG(
"example", schedule_interval="@daily", start_date=datetime(2021, 1, 1), catchup=False
) as dag:
download = GCSToLocalFilesystemOperator(
task_id="download_task",
bucket='some_bucket',
filename='/tmp/fake1.csv',
object_name="test/test1.csv",
gcp_conn_id='google_cloud_origin'
)

upload = LocalFilesystemToGCSOperator(
task_id='upload_task',
bucket='some_bucket',
src='/tmp/fake1.csv',
dst='test/test1.csv',
gcp_conn_id='google_cloud_dest'
)
download >> upload

虽然这不是理想的解决方案。这实际上取决于您的工作数量和频率。使用此解决方案,您可以通过本地磁盘传输文件 - 小批量可以。此解决方案适用于两个不同帐户的情况,因为每个运营商都与不同的 Google 连接相关联。

最新更新