Airflow DAG manipulating DB2 data raises jaydebeapi.Error



I built my Airflow DAG for connecting to DB2 following the official Airflow documentation. When I run the DAG to insert or update data, a jaydebeapi.Error is raised. Even though Airflow raises the jaydebeapi.Error, the data is still inserted/updated in DB2 successfully, but the DAG is marked as FAILED in the Airflow UI. I don't know which step I missed.

A snippet of my DAG code:
from airflow import DAG
from airflow.providers.jdbc.operators.jdbc import JdbcOperator

# default_args is defined elsewhere in the DAG file
with DAG("my_dag1", default_args=default_args,
         schedule_interval="@daily", catchup=False) as dag:
    creating_table = JdbcOperator(
        task_id='creating_table',
        jdbc_conn_id='db2',
        sql=r"""
        insert into DB2ECIF.T2(C1,C1_DATE) VALUES('TEST',CURRENT DATE);
        """,
        autocommit=True,
    )

DAG log:

[2022-06-20 02:16:03,743] {base.py:68} INFO - Using connection ID 'db2' for task execution.
[2022-06-20 02:16:04,785] {dbapi.py:213} INFO - Running statement: 
insert into DB2ECIF.T2(C1,C1_DATE) VALUES('TEST',CURRENT DATE);
, parameters: None
[2022-06-20 02:16:04,842] {dbapi.py:221} INFO - Rows affected: 1
[2022-06-20 02:16:04,844] {taskinstance.py:1889} ERROR - Task failed with exception
Traceback (most recent call last):
File "/home/airflow/.local/lib/python3.7/site-packages/airflow/providers/jdbc/operators/jdbc.py", line 76, in execute
return hook.run(self.sql, self.autocommit, parameters=self.parameters, handler=fetch_all_handler)
File "/home/airflow/.local/lib/python3.7/site-packages/airflow/hooks/dbapi.py", line 195, in run
result = handler(cur)
File "/home/airflow/.local/lib/python3.7/site-packages/airflow/providers/jdbc/operators/jdbc.py", line 30, in fetch_all_handler
return cursor.fetchall()
File "/home/airflow/.local/lib/python3.7/site-packages/jaydebeapi/__init__.py", line 596, in fetchall
row = self.fetchone()
File "/home/airflow/.local/lib/python3.7/site-packages/jaydebeapi/__init__.py", line 561, in fetchone
raise Error()
jaydebeapi.Error
[2022-06-20 02:16:04,847] {taskinstance.py:1400} INFO - Marking task as FAILED. dag_id=my_dag1, task_id=creating_table, execution_date=20210101T000000, start_date=, end_date=20220620T021604

I have installed the Python packages required by Airflow. The list is as follows:

Package/system name and version:

  1. Airflow/2.3.2
  2. IBM DB2/11.5.7
  3. OpenJDK/15.0.2
  4. JayDeBeApi/1.2.0
  5. JPype1/0.7.2
  6. apache-airflow-providers-jdbc/3.0.0

I have tried the latest versions of item 4 (1.2.3) and item 5 (1.4.0), and it still does not work. I also downgraded Airflow to 2.2.3 and 2.2.5 and got the same result.

How can I solve this problem?

This error does not come from the original insert query itself; it is caused by the fetchall handler introduced in this PR - https://github.com/apache/airflow/pull/23817

Using apache-airflow-providers-jdbc/2.1.3 might be a simple solution.
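If you prefer to stay on the newer provider, a minimal workaround sketch is shown below: it runs the INSERT through JdbcHook directly from a PythonOperator, so no fetch handler is passed and cursor.fetchall() is never called on a statement that returns no result set. The connection ID 'db2' and the table come from the question; the DAG id and start_date here are placeholders.

from datetime import datetime

from airflow import DAG
from airflow.operators.python import PythonOperator
from airflow.providers.jdbc.hooks.jdbc import JdbcHook


def insert_row():
    # JdbcHook.run() without a handler executes the statement and commits,
    # but does not try to fetch rows from the cursor.
    hook = JdbcHook(jdbc_conn_id="db2")
    hook.run(
        "insert into DB2ECIF.T2(C1,C1_DATE) VALUES('TEST', CURRENT DATE)",
        autocommit=True,
    )


with DAG(
    "my_dag1_workaround",             # placeholder DAG id
    start_date=datetime(2022, 1, 1),  # placeholder start date
    schedule_interval="@daily",
    catchup=False,
) as dag:
    insert_task = PythonOperator(
        task_id="insert_row",
        python_callable=insert_row,
    )

Alternatively, pinning the provider back (for example, pip install "apache-airflow-providers-jdbc==2.1.3") restores the old JdbcOperator behaviour that did not fetch results.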

To get to the root cause, set the logging level in Airflow to DEBUG and see what causes fetchall to fail. Having the full traceback would help.
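For reference, the DEBUG level can be set in airflow.cfg (the section and option names below are the standard Airflow 2.x ones):

[logging]
logging_level = DEBUG

or through an environment variable:

AIRFLOW__LOGGING__LOGGING_LEVEL=DEBUG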
