我根据Airflow的官方网站制作了我的用于连接DB2的Airflow DAG。当我运行DAG插入数据或更新数据时,将引发jaydebeapi.Error。尽管气流会扬起一只知更鸟。错误,数据仍然在DB2中插入/更新成功。气流界面上的DAG将标记为FAILED。我不知道我错过了什么步骤。
我的DAG代码片段:with DAG("my_dag1", default_args=default_args,
schedule_interval="@daily", catchup=False) as dag:
cerating_table = JdbcOperator(
task_id='creating_table',
jdbc_conn_id='db2',
sql=r"""
insert into DB2ECIF.T2(C1,C1_DATE) VALUES('TEST',CURRENT DATE);
""",
autocommit=True,
dag=dag
)
DAG日志:
[2022-06-20 02:16:03,743] {base.py:68} INFO - Using connection ID 'db2' for task execution.
[2022-06-20 02:16:04,785] {dbapi.py:213} INFO - Running statement:
insert into DB2ECIF.T2(C1,C1_DATE) VALUES('TEST',CURRENT DATE);
, parameters: None
[2022-06-20 02:16:04,842] {dbapi.py:221} INFO - Rows affected: 1
[2022-06-20 02:16:04,844] {taskinstance.py:1889} ERROR - Task failed with exception
Traceback (most recent call last):
File "/home/airflow/.local/lib/python3.7/site-packages/airflow/providers/jdbc/operators/jdbc.py", line 76, in execute
return hook.run(self.sql, self.autocommit, parameters=self.parameters, handler=fetch_all_handler)
File "/home/airflow/.local/lib/python3.7/site-packages/airflow/hooks/dbapi.py", line 195, in run
result = handler(cur)
File "/home/airflow/.local/lib/python3.7/site-packages/airflow/providers/jdbc/operators/jdbc.py", line 30, in fetch_all_handler
return cursor.fetchall()
File "/home/airflow/.local/lib/python3.7/site-packages/jaydebeapi/__init__.py", line 596, in fetchall
row = self.fetchone()
File "/home/airflow/.local/lib/python3.7/site-packages/jaydebeapi/__init__.py", line 561, in fetchone
raise Error()
jaydebeapi.Error
[2022-06-20 02:16:04,847] {taskinstance.py:1400} INFO - Marking task as FAILED. dag_id=my_dag1, task_id=creating_table, execution_date=20210101T000000, start_date=, end_date=20220620T021604
我已经安装了所需的python气流包。列表如下:
包(系统)名称/版本
2.3.2- 气流/ IBM DB2/11.5.7
- OpenJDK/15.0.2
- JayDeBeApi/1.2.0
- JPype1/0.7.2
- apache-airflow-providers-jdbc/3.0.0
我已经尝试使用最新版本的第4项(1.2.3)和第5项(1.4.0)仍然不能工作。我也把气流版本降级到2.2.3或2.2.5得到了同样的结果。
如何解决这个问题?
这个错误不是在原来的插入查询中发生的,而是由于在这个PR中引入的fetchall - https://github.com/apache/airflow/pull/23817
使用apache-airflow-providers-jdbc/2.1.3
可能是一个简单的解决方案。
要获得根本原因,在气流中设置DEBUG日志级别并查看fetchall导致错误的原因。拥有完整的回溯将有助于