我正在尝试使用python将文件从一个S3桶复制到另一个S3桶。
我的代码
from airflow import DAG
from datetime import datetime, timedelta
from utils import FAILURE_EMAILS
from airflow.providers.amazon.aws.operators.s3_copy_object import S3CopyObjectOperator
default_args = {
'owner': 'airflow',
'depends_on_past': False,
'start_date': datetime(2020, 1, 1),
'email': FAILURE_EMAILS,
'email_on_failure': False,
'email_on_retry': False,
'retries': 0,
'retry_delay': timedelta(minutes=5)
}
dag = DAG(
'S3_COPY_S3',
default_args=default_args,
catchup=False,
schedule_interval=None,
max_active_runs=1
)
copy_step = S3CopyObjectOperator(
source_bucket_key='source_file',
dest_bucket_key='dest_file',
aws_conn_id='aws_connection_id',
source_bucket_name='source-bucket',
dest_bucket_name='dest-bucket',
dag=dag
)
正确吗?谁能验证一下
假设您提供的值被替换为适当的桶名和s3键,您唯一缺少的是S3CopyObjectOperator的task_id。
在实例化操作符时,需要在关联DAG的上下文中提供唯一的task_id。