气流:灵气2。编程错误:无法适应类型"PythonOperator"



运行DAG时出现上述错误

psycopg2.ProgrammingError: can't adapt type 'PythonOperator'
The above exception was the direct cause of the following exception:
Traceback (most recent call last):
File "/home/airflow/.local/lib/python3.6/site-packages/airflow/models/taskinstance.py", line 1165, in _run_raw_task
self._prepare_and_execute_task_with_callbacks(context, task)
File "/home/airflow/.local/lib/python3.6/site-packages/airflow/models/taskinstance.py", line 1283, in _prepare_and_execute_task_with_callbacks
result = self._execute_task(context, task_copy)
File "/home/airflow/.local/lib/python3.6/site-packages/airflow/models/taskinstance.py", line 1313, in _execute_task
result = task_copy.execute(context=context)
File "/home/airflow/.local/lib/python3.6/site-packages/airflow/operators/python.py", line 150, in execute
return_value = self.execute_callable()
File "/home/airflow/.local/lib/python3.6/site-packages/airflow/operators/python.py", line 161, in execute_callable
return self.python_callable(*self.op_args, **self.op_kwargs)
File "/opt/airflow/dags/crypto_etl.py", line 61, in create_text_file
close_data = ti.xcom_pull(key=None, task_ids=[transform_data])
File "/home/airflow/.local/lib/python3.6/site-packages/airflow/utils/session.py", line 70, in wrapper
return func(*args, session=session, **kwargs)
File "/home/airflow/.local/lib/python3.6/site-packages/airflow/models/taskinstance.py", line 1966, in xcom_pull
for result in query.with_entities(XCom.task_id, XCom.value)
File "/home/airflow/.local/lib/python3.6/site-packages/sqlalchemy/orm/query.py", line 3535, in __iter__
return self._execute_and_instances(context)
File "/home/airflow/.local/lib/python3.6/site-packages/sqlalchemy/orm/query.py", line 3560, in _execute_and_instances
result = conn.execute(querycontext.statement, self._params)
File "/home/airflow/.local/lib/python3.6/site-packages/sqlalchemy/engine/base.py", line 1011, in execute
return meth(self, multiparams, params)
File "/home/airflow/.local/lib/python3.6/site-packages/sqlalchemy/sql/elements.py", line 298, in _execute_on_connection
return connection._execute_clauseelement(self, multiparams, params)
File "/home/airflow/.local/lib/python3.6/site-packages/sqlalchemy/engine/base.py", line 1130, in _execute_clauseelement
distilled_params,
File "/home/airflow/.local/lib/python3.6/site-packages/sqlalchemy/engine/base.py", line 1317, in _execute_context
e, statement, parameters, cursor, context
File "/home/airflow/.local/lib/python3.6/site-packages/sqlalchemy/engine/base.py", line 1511, in _handle_dbapi_exception
sqlalchemy_exception, with_traceback=exc_info[2], from_=e
File "/home/airflow/.local/lib/python3.6/site-packages/sqlalchemy/util/compat.py", line 182, in raise_
raise exception
File "/home/airflow/.local/lib/python3.6/site-packages/sqlalchemy/engine/base.py", line 1277, in _execute_context
cursor, statement, parameters, context
File "/home/airflow/.local/lib/python3.6/site-packages/sqlalchemy/engine/default.py", line 608, in do_execute
cursor.execute(statement, parameters)
sqlalchemy.exc.ProgrammingError: (psycopg2.ProgrammingError) can't adapt type 'PythonOperator'
[SQL: SELECT xcom.task_id AS xcom_task_id, xcom.value AS xcom_value 
FROM xcom 
WHERE xcom.task_id IN (%(task_id_1)s) AND xcom.dag_id = %(dag_id_1)s AND xcom.execution_date = %(execution_date_1)s ORDER BY xcom.execution_date DESC, xcom.timestamp DESC]
[parameters: {'task_id_1': <Task(PythonOperator): transform_data>, 'dag_id_1': 'crypto_analysis', 'execution_date_1': DateTime(2021, 9, 30, 10, 0, 0, tzinfo=Timezone('UTC'))}]
(Background on this error at: http://sqlalche.me/e/13/f405)
[2021-09-30 10:02:37,270] {taskinstance.py:1513} INFO - Marking task as FAILED. dag_id=crypto_analysis, task_id=create_text_file, execution_date=20210930T100000, start_date=20210930T100236, end_date=20210930T100237
[2021-09-30 10:02:37,463] {local_task_job.py:151} INFO - Task exited with return code 1
[2021-09-30 10:02:37,693] {local_task_job.py:261} INFO - 0 downstream tasks scheduled from follow-on schedule check

代码如下:

def transform_data(**kwargs):
ti = kwargs['ti']
binance = []
ftx = []
bybit = []
binance_ohlcv_data, ftx_ohlcv_data, fetch_bybit_ohlcv = ti.xcom_pull(key=None, task_ids=['fetch_binance_ohlcv',
                       'fetch_ftx_ohlcv',
                       'fetch_bybit_ohlcv'])
for record in binance_ohlcv_data:
binance.append({'ts': record[0], 'close': record[4], 'volume': record[5]})
for record in ftx_ohlcv_data:
ftx.append({'ts': record[0], 'close': record[4], 'volume': record[5]})
for record in fetch_bybit_ohlcv:
bybit.append({'ts': record[0], 'close': record[4], 'volume': record[5]})
# return {'binance': binance, 'ftx': ftx, 'bybit': bybit}
return binance

def create_text_file(**kwargs):
ti = kwargs['ti']
today = datetime.now()
time_part = today.strftime('%d%m%y%H%m%S')
# binance_ohlcv_data, ftx_ohlcv_data, fetch_bybit_ohlcv = ti.xcom_pull(key=None, task_ids=['fetch_binance_ohlcv',
#                                                                                          'fetch_ftx_ohlcv',
#                                                                                          'fetch_bybit_ohlcv'])
close_data = ti.xcom_pull(key=None, task_ids=[transform_data])
print(close_data)

这一行有问题:

close_data = ti.xcom_pull(key=None, task_ids=[transform_data])

transform_data函数是字符串列表,它适用于ti.xcom_pull函数。你应该像以前那样使用它:

ti.xcom_pull(key=None, task_ids=['fetch_binance_ohlcv', 
'fetch_ftx_ohlcv', 
'fetch_bybit_ohlcv'])

最新更新