Airflow: pickle maximum recursion depth exceeded during XCOM insert



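Hi, I'm running a task with a PythonOperator. The task itself appears to run fine, and the returned value is what I expect (a large XML output from an API call). However, I then get ERROR - (builtins.RecursionError) maximum recursion depth exceeded in comparison. My Python callable returns a value, so I assume an XCOM push happens and Airflow is trying to serialize the output so downstream operators can consume it. I'm not sure how to fix this, because I don't see any configuration for either a) increasing the recursion depth of the pickle serializer (as suggested here) or b) error handling during the XCOM push.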
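As the question assumes, a value returned from the callable is pushed to XCOM under the key `return_value`, and the traceback below shows that push pickling the value (through `dill`) before the `INSERT`. The sketch below mimics only that serialization step with the standard-library `pickle` module; it is an approximation, not Airflow's own code path, and `try_serialize` is a hypothetical helper. It suggests that size alone is not the trigger: a flat string of roughly the logged size pickles fine, while a deeply nested structure (standing in for a parsed XML object graph) hits the recursion limit.

```python
import pickle


def try_serialize(value):
    """Hypothetical helper approximating the pickling done during an XCOM push.

    Returns the pickled bytes, or None if pickling exceeds the recursion limit.
    """
    try:
        return pickle.dumps(value)
    except RecursionError:
        return None


# A flat string of roughly the size seen in the log (about 12 million characters).
flat_xml = "<QueryResult>" + "x" * 12_000_000 + "</QueryResult>"

# A deeply nested structure, an assumed stand-in for a parsed XML object graph.
nested = []
for _ in range(100_000):
    nested = [nested]

print(try_serialize(flat_xml) is not None)  # True: a string is written as one opcode
print(try_serialize(nested) is None)        # True: every nesting level recurses
```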

My full traceback is below:

INFO - Subtask: [2017-11-08 14:00:14,545] {models.py:1342} INFO - Executing <Task(PythonOperator): test_task_xml> on 2017-11-07 00:00:00
INFO - Subtask: [2017-11-08 14:00:31,817] {python_operator.py:81} INFO - Done. Returned value was: <QueryResult><Query><Answer>12345</Answer> ... (12321456 characters truncated) ... </Query></QueryResult>
INFO - Subtask: [2017-11-08 14:00:31,839] {models.py:1417} ERROR - (builtins.RecursionError) maximum recursion depth exceeded in comparison [SQL: 'INSERT INTO xcom (key, value, timestamp, execution_date, task_id, dag_id) VALUES (%(key)s, %(value)s, now(), %(execution_date)s, %(task_id)s, %(dag_id)s) RETURNING xcom.id'] [parameters: [{'dag_id': 'test_dag', 'key': 'return_value', 'value': <QueryResult><Query><Answer>12345</Answer> ... (12321456 characters truncated) ... </Query></QueryResult>, 'task_id': 'test_task_xml', 'execution_date': datetime.datetime(2017, 11, 7, 0, 0)}]]
INFO - Subtask: Traceback (most recent call last):
INFO - Subtask:   File "/lib/python3.6/site-packages/sqlalchemy/engine/base.py", line 1116, in _execute_context
INFO - Subtask:     context = constructor(dialect, self, conn, *args)
INFO - Subtask:   File "/lib/python3.6/site-packages/sqlalchemy/engine/default.py", line 690, in _init_compiled
INFO - Subtask:     for key in compiled_params
INFO - Subtask:   File "/lib/python3.6/site-packages/sqlalchemy/engine/default.py", line 690, in <genexpr>
INFO - Subtask:     for key in compiled_params
INFO - Subtask:   File "/lib/python3.6/site-packages/sqlalchemy/sql/sqltypes.py", line 1516, in process
INFO - Subtask:     value = dumps(value, protocol)
INFO - Subtask:     dump(obj, file, protocol, byref, fmode, recurse)#, strictio)
INFO - Subtask:   File "/lib/python3.6/site-packages/dill/dill.py", line 274, in dump
INFO - Subtask:     pik.dump(obj)
INFO - Subtask:   File "/lib/python3.6/pickle.py", line 409, in dump
INFO - Subtask:     self.save(obj)
INFO - Subtask:   File "/lib/python3.6/pickle.py", line 521, in save
INFO - Subtask:     self.save_reduce(obj=obj, *rv)
INFO - Subtask:   File "/lib/python3.6/pickle.py", line 634, in save_reduce
INFO - Subtask:     save(state)
INFO - Subtask:   File "/lib/python3.6/pickle.py", line 476, in save
INFO - Subtask:     f(self, obj) # Call unbound method with explicit self
INFO - Subtask:   File "/lib/python3.6/site-packages/dill/dill.py", line 871, in save_module_dict
INFO - Subtask:     StockPickler.save_dict(pickler, obj)
INFO - Subtask:   File "/lib/python3.6/pickle.py", line 821, in save_dict
INFO - Subtask:     self._batch_setitems(obj.items())
INFO - Subtask:   File "/lib/python3.6/pickle.py", line 847, in _batch_setitems
INFO - Subtask:     save(v)
INFO - Subtask:   File "/lib/python3.6/pickle.py", line 476, in save
INFO - Subtask:     f(self, obj) # Call unbound method with explicit self
INFO - Subtask:   File "/lib/python3.6/pickle.py", line 781, in save_list
INFO - Subtask:     self._batch_appends(obj)
INFO - Subtask:   File "/lib/python3.6/pickle.py", line 805, in _batch_appends
INFO - Subtask:     save(x)
INFO - Subtask:   File "/lib/python3.6/pickle.py", line 521, in save
INFO - Subtask:     self.save_reduce(obj=obj, *rv)
INFO - Subtask:   File "/lib/python3.6/pickle.py", line 634, in save_reduce
INFO - Subtask:     save(state)
INFO - Subtask:   File "/lib/python3.6/pickle.py", line 476, in save
INFO - Subtask:     f(self, obj) # Call unbound method with explicit self
INFO - Subtask:   File "/lib/python3.6/site-packages/dill/dill.py", line 871, in save_module_dict
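The frames in the traceback repeat a fixed cycle: `save` → `save_reduce` → `save(state)` → `save_dict` → `_batch_setitems` → `save_list` → `_batch_appends` → `save`. That pattern is what pickling an object whose state dict holds a list of child objects of the same kind (a tree) looks like, so each level of the tree costs several Python frames in the pure-Python pickler that `dill` builds on. The sketch below reproduces the cycle under stated assumptions: `Node` and `build_chain` are hypothetical stand-ins for the parsed XML, and `pickle._Pickler` is CPython's private pure-Python pickler (the `pickle.py` frames in the log), used here only to mirror that code path.

```python
import io
import pickle


class Node:
    """Hypothetical stand-in for one element of a parsed XML tree."""

    def __init__(self, tag):
        self.tag = tag
        self.children = []  # pickled as part of the instance's state dict


def build_chain(depth):
    """Build a tree that is `depth` levels deep, with one child per level."""
    root = Node("QueryResult")
    current = root
    for i in range(depth):
        child = Node(f"level{i}")
        current.children.append(child)
        current = child
    return root


def pickles_with_pure_python_pickler(obj):
    """Pickle with pickle._Pickler, the pure-Python class dill subclasses.

    pickle._Pickler is a private CPython name; relying on it is an assumption.
    """
    try:
        pickle._Pickler(io.BytesIO(), protocol=pickle.HIGHEST_PROTOCOL).dump(obj)
        return True
    except RecursionError:
        return False


print(pickles_with_pure_python_pickler(build_chain(10)))     # True
print(pickles_with_pure_python_pickler(build_chain(1_000)))  # False
```

With the default recursion limit of 1000 frames and several frames per level, a tree only a few hundred levels deep is enough to fail, which is why raising the limit tends to move the problem rather than remove it.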
It turns out that the database BLOB (Binary Large Object) sets a limit on pickling Python objects. To work around this, you can:

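  • try using file streams
  • dump the file into a temporary folder and push only the file path via XCOM
  • handle the whole process in a single task and push only selected values via XCOM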
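The temporary-folder option can be sketched in plain Python as follows. This is an illustration, not Airflow code: `fetch_and_stage` and `consume_staged` are hypothetical callables. The idea is that the first one's return value (a short path string) is all Airflow would push to XCOM, while the second would receive that path from an XCOM pull in the downstream task (for example `ti.xcom_pull(task_ids='test_task_xml')`), stream the file, and return only a small summary, which also matches the "push only selected values" option.

```python
import os
import tempfile


def fetch_and_stage(xml_text):
    """Hypothetical upstream callable: write the XML to disk, return its path.

    `xml_text` stands in for the API response; only the returned path,
    a short string, would be pushed to XCOM.
    """
    fd, path = tempfile.mkstemp(suffix=".xml")
    with os.fdopen(fd, "w", encoding="utf-8") as fh:
        fh.write(xml_text)
    return path


def consume_staged(path, chunk_size=1 << 20):
    """Hypothetical downstream callable: stream the staged file in chunks.

    Returns a small summary value (here, the character count) that is
    cheap to push to XCOM, then deletes the staged file.
    """
    total = 0
    with open(path, encoding="utf-8") as fh:
        while True:
            chunk = fh.read(chunk_size)  # read at most chunk_size characters
            if not chunk:
                break
            total += len(chunk)
    os.remove(path)  # clean up once the file has been consumed
    return total


staged = fetch_and_stage("<QueryResult><Query><Answer>12345</Answer></Query></QueryResult>")
print(consume_staged(staged))
```

A local temporary folder is only visible to downstream tasks that run on the same worker; with several workers, the path would need to point at a shared filesystem or object store instead.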
