Error when exporting from BigQuery to MySQL



I'm trying to export a table from BigQuery to a Google Cloud MySQL database.

I found an operator called BigQueryToMySqlOperator (documented here: https://airflow.apache.org/docs/apache-airflow-providers-google/stable/_api/airflow/providers/google/cloud/transfers/bigquery_to_mysql/index.html?highlight=bigquerytomysqloperator#module-airflow.providers.google.cloud.transfers.bigquery_to_mysql).

When I deploy a DAG containing this task to Cloud Composer, the task always fails with this error:

Traceback (most recent call last):
  File "/opt/python3.8/lib/python3.8/site-packages/airflow/models/taskinstance.py", line 1113, in _run_raw_task
    self._prepare_and_execute_task_with_callbacks(context, task)
  File "/opt/python3.8/lib/python3.8/site-packages/airflow/models/taskinstance.py", line 1287, in _prepare_and_execute_task_with_callbacks
    result = self._execute_task(context, task_copy)
  File "/opt/python3.8/lib/python3.8/site-packages/airflow/models/taskinstance.py", line 1317, in _execute_task
    result = task_copy.execute(context=context)
  File "/opt/python3.8/lib/python3.8/site-packages/airflow/providers/google/cloud/transfers/bigquery_to_mysql.py", line 166, in execute
    for rows in self._bq_get_data():
  File "/opt/python3.8/lib/python3.8/site-packages/airflow/providers/google/cloud/transfers/bigquery_to_mysql.py", line 138, in _bq_get_data
    response = cursor.get_tabledata(
  File "/opt/python3.8/lib/python3.8/site-packages/airflow/providers/google/cloud/hooks/bigquery.py", line 2508, in get_tabledata
    return self.hook.get_tabledata(*args, **kwargs)
  File "/opt/python3.8/lib/python3.8/site-packages/airflow/providers/google/cloud/hooks/bigquery.py", line 1284, in get_tabledata
    rows = self.list_rows(dataset_id, table_id, max_results, selected_fields, page_token, start_index)
  File "/opt/python3.8/lib/python3.8/site-packages/airflow/providers/google/common/hooks/base_google.py", line 412, in inner_wrapper
    raise AirflowException(
airflow.exceptions.AirflowException: You must use keyword arguments in this methods rather than positional

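I really don't understand why it throws this error. Can anyone help me figure out what is going wrong, or how I should export data from BigQuery to a MySQL DB? Thank you very much for your help!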

EDIT: My operator code basically looks like this:

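# Import path taken from the traceback above.
from airflow.providers.google.cloud.transfers.bigquery_to_mysql import BigQueryToMySqlOperator

transfer_data = BigQueryToMySqlOperator(
    task_id='task_id',
    dataset_table='origin_bq_table',  # source table in BigQuery
    mysql_table='dest_table_name',    # destination table in MySQL
    replace=True,
)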

Based on the stack trace, you are most likely using apache-airflow-providers-google==2.2.0.

airflow.exceptions.AirflowException: You must use keyword arguments in this methods rather than positional

This error originates in GoogleBaseHook and can be traced back to BigQueryToMySqlOperator:

BigQueryToMySqlOperator > BigQueryHook > BigQueryConnection > BigQueryCursor > get_tabledata

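You get the AirflowException because get_tabledata is called as part of the operator's execute method, and get_tabledata in turn calls list_rows with positional arguments, which the wrapper in GoogleBaseHook rejects.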

Unfortunately, the test for the operator is not comprehensive: it only checks that the method is called with the correct arguments.
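To illustrate why such a test can pass while the real call fails, here is a minimal sketch using unittest.mock. The names Hook, fetch, test_with_mock and test_with_real_hook are hypothetical and are not taken from the Airflow test suite; the only assumption is that the real method rejects positional arguments, as the traceback shows.

```python
from unittest import mock


class Hook:
    """Hypothetical hook whose method rejects positional arguments."""

    def list_rows(self, *args, **kwargs):
        if args:
            raise ValueError("You must use keyword arguments")
        return ["row"]


def fetch(hook):
    # Code under test: forwards its arguments positionally.
    return hook.list_rows("dataset", "table")


def test_with_mock():
    # A MagicMock accepts any call shape, so this only verifies the argument values.
    fake = mock.MagicMock()
    fetch(fake)
    fake.list_rows.assert_called_once_with("dataset", "table")
    return True


def test_with_real_hook():
    # Against the real (stand-in) hook, the same call raises.
    try:
        fetch(Hook())
    except ValueError:
        return False
    return True
```

The mocked test succeeds because the mock never runs the keyword-only check, which is presumably how the problem slipped through.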

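I think this will require a new release of the Google provider in which the cursor in BigQueryToMySqlOperator calls list_rows with keyword arguments, instead of calling get_tabledata, which calls list_rows with positional arguments.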
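The difference between the failing call and the proposed fix can be sketched as follows. This is a simplified stand-in, not the provider's actual code: keyword_only, FakeHook and both get_tabledata_* methods are hypothetical names, and AirflowException is redefined locally so the snippet runs without Airflow installed.

```python
import functools


class AirflowException(Exception):
    """Local stand-in for airflow.exceptions.AirflowException (assumption)."""


def keyword_only(func):
    """Simplified version of the check seen in the traceback: reject positional arguments."""

    @functools.wraps(func)
    def inner_wrapper(self, *args, **kwargs):
        if args:
            raise AirflowException(
                "You must use keyword arguments in this methods rather than positional"
            )
        return func(self, **kwargs)

    return inner_wrapper


class FakeHook:
    """Hypothetical hook; only the calling convention mirrors the traceback."""

    @keyword_only
    def list_rows(self, dataset_id=None, table_id=None, max_results=None):
        return [(dataset_id, table_id, max_results)]

    def get_tabledata_positional(self, dataset_id, table_id, max_results=None):
        # Mirrors the failing call: arguments are forwarded positionally.
        return self.list_rows(dataset_id, table_id, max_results)

    def get_tabledata_keyword(self, dataset_id, table_id, max_results=None):
        # Shape of the proposed fix: forward everything by keyword.
        return self.list_rows(
            dataset_id=dataset_id, table_id=table_id, max_results=max_results
        )
```

Calling FakeHook().get_tabledata_positional("ds", "tbl") raises the exception, while get_tabledata_keyword with the same arguments returns the rows.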

I have also opened a GitHub issue in the Airflow repository.
