DAG import error - invalid arguments were passed



I am trying to load data from PostgreSQL (local) to Google Cloud Storage using Airflow on Docker, but I get the following error: https://i.stack.imgur.com/pHzAF.png

Broken DAG: [/opt/airflow/dags/postgres_to_bigquery.py] Traceback (most recent call last):
File "/home/airflow/.local/lib/python3.7/site-packages/airflow/models/baseoperator.py", line 408, in apply_defaults
result = func(self, **kwargs, default_args=default_args)
File "/home/airflow/.local/lib/python3.7/site-packages/airflow/models/baseoperator.py", line 756, in __init__
f"Invalid arguments were passed to {self.__class__.__name__} (task_id: {task_id}). "
airflow.exceptions.AirflowException: Invalid arguments were passed to PostgresToGCSOperator (task_id: postgres_to_gcs). Invalid arguments were:
**kwargs: {'google_cloud_storage_conn_id': 'gcp_conn'}

Here is the relevant part of my code:

GCS_CONN = Variable.get('GCS_CONN')

default_args = {
    'owner': 'airflow',
    'retries': 0,
    'retry_delay': timedelta(minutes=5),
}

with DAG(
    dag_id = 'postgres_to_bigquery',
    default_args = default_args,
    start_date = datetime(2022, 10, 3),
    schedule_interval = '@once'
) as dag:

    start = DummyOperator(
        task_id = 'start',
    )

    postgres_to_gcs = PostgresToGCSOperator(
        task_id = f'postgres_to_gcs',
        postgres_conn_id = 'postgres_localhost',
        sql = f'select * from orders;',
        bucket = 'airflow_fakri',
        filename = f'airflow_fakri/data/orders.csv',
        export_format = 'csv',
        gzip = False,
        use_server_side_cursor = False,
        google_cloud_storage_conn_id = GCS_CONN
    )

It looks like you did pass an invalid argument. From the documentation: https://airflow.apache.org/docs/apache-airflow-providers-google/stable/_api/airflow/providers/google/cloud/transfers/postgres_to_gcs/index.html

PostgresToGCSOperator takes postgres_conn_id for the Postgres connection and gcp_conn_id for the Google Cloud connection. google_cloud_storage_conn_id is not an accepted parameter in recent versions of the Google provider, which is why the DAG fails to import.
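A minimal sketch of the corrected DAG, assuming a recent apache-airflow-providers-google release where the GCS connection parameter is named gcp_conn_id; the connection IDs, bucket and file names below are copied from the question and may need adjusting for your environment:

from datetime import datetime, timedelta

from airflow import DAG
from airflow.models import Variable
from airflow.operators.dummy import DummyOperator
from airflow.providers.google.cloud.transfers.postgres_to_gcs import PostgresToGCSOperator

GCS_CONN = Variable.get('GCS_CONN')

default_args = {
    'owner': 'airflow',
    'retries': 0,
    'retry_delay': timedelta(minutes=5),
}

with DAG(
    dag_id='postgres_to_bigquery',
    default_args=default_args,
    start_date=datetime(2022, 10, 3),
    schedule_interval='@once',
) as dag:
    start = DummyOperator(task_id='start')

    postgres_to_gcs = PostgresToGCSOperator(
        task_id='postgres_to_gcs',
        postgres_conn_id='postgres_localhost',
        sql='select * from orders;',
        bucket='airflow_fakri',
        filename='airflow_fakri/data/orders.csv',
        export_format='csv',
        gzip=False,
        use_server_side_cursor=False,
        gcp_conn_id=GCS_CONN,  # renamed from google_cloud_storage_conn_id
    )

    start >> postgres_to_gcs  # assumed task ordering

Note that the GCS_CONN Variable must hold the ID of an existing Google Cloud connection configured in Airflow.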
